001/*
002 * HA-JDBC: High-Availability JDBC
003 * Copyright (C) 2012  Paul Ferraro
004 *
005 * This program is free software: you can redistribute it and/or modify
006 * it under the terms of the GNU Lesser General Public License as published by
007 * the Free Software Foundation, either version 3 of the License, or
008 * (at your option) any later version.
009 *
010 * This program is distributed in the hope that it will be useful,
011 * but WITHOUT ANY WARRANTY; without even the implied warranty of
012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
013 * GNU Lesser General Public License for more details.
014 *
015 * You should have received a copy of the GNU Lesser General Public License
016 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
017 */
018package net.sf.hajdbc.sql;
019
020import java.sql.Connection;
021import java.sql.SQLException;
022import java.util.ArrayList;
023import java.util.List;
024import java.util.Map;
025import java.util.Set;
026import java.util.TreeSet;
027import java.util.concurrent.CopyOnWriteArrayList;
028import java.util.concurrent.ExecutorService;
029import java.util.concurrent.ThreadFactory;
030import java.util.concurrent.locks.Lock;
031
032import javax.management.JMException;
033
034import net.sf.hajdbc.Database;
035import net.sf.hajdbc.DatabaseCluster;
036import net.sf.hajdbc.DatabaseClusterConfiguration;
037import net.sf.hajdbc.DatabaseClusterConfigurationListener;
038import net.sf.hajdbc.DatabaseClusterListener;
039import net.sf.hajdbc.Messages;
040import net.sf.hajdbc.SynchronizationListener;
041import net.sf.hajdbc.SynchronizationStrategy;
042import net.sf.hajdbc.TransactionMode;
043import net.sf.hajdbc.Version;
044import net.sf.hajdbc.balancer.Balancer;
045import net.sf.hajdbc.cache.DatabaseMetaDataCache;
046import net.sf.hajdbc.codec.Decoder;
047import net.sf.hajdbc.dialect.Dialect;
048import net.sf.hajdbc.distributed.CommandDispatcherFactory;
049import net.sf.hajdbc.durability.Durability;
050import net.sf.hajdbc.durability.InvocationEvent;
051import net.sf.hajdbc.durability.InvokerEvent;
052import net.sf.hajdbc.io.InputSinkStrategy;
053import net.sf.hajdbc.lock.LockManager;
054import net.sf.hajdbc.lock.distributed.DistributedLockManager;
055import net.sf.hajdbc.logging.Level;
056import net.sf.hajdbc.logging.Logger;
057import net.sf.hajdbc.logging.LoggerFactory;
058import net.sf.hajdbc.management.Description;
059import net.sf.hajdbc.management.MBean;
060import net.sf.hajdbc.management.MBeanRegistrar;
061import net.sf.hajdbc.management.ManagedAttribute;
062import net.sf.hajdbc.management.ManagedOperation;
063import net.sf.hajdbc.state.DatabaseEvent;
064import net.sf.hajdbc.state.StateManager;
065import net.sf.hajdbc.state.distributed.DistributedStateManager;
066import net.sf.hajdbc.sync.SynchronizationContext;
067import net.sf.hajdbc.sync.SynchronizationContextImpl;
068import net.sf.hajdbc.tx.TransactionIdentifierFactory;
069import net.sf.hajdbc.util.Resources;
070import net.sf.hajdbc.util.concurrent.cron.CronExpression;
071import net.sf.hajdbc.util.concurrent.cron.CronThreadPoolExecutor;
072
073/**
074 * @author paul
075 *
076 */
077@MBean
078public class DatabaseClusterImpl<Z, D extends Database<Z>> implements DatabaseCluster<Z, D>
079{
080        static final Logger logger = LoggerFactory.getLogger(DatabaseClusterImpl.class);
081        
082        private final String id;
083        
084        final DatabaseClusterConfiguration<Z, D> configuration;
085        
086        private Balancer<Z, D> balancer;
087        private Dialect dialect;
088        private Durability<Z, D> durability;
089        private DatabaseMetaDataCache<Z, D> databaseMetaDataCache;
090        private ExecutorService executor;
091        private Decoder decoder;
092        private CronThreadPoolExecutor cronExecutor;
093        private LockManager lockManager;
094        private StateManager stateManager;
095        private InputSinkStrategy<? extends Object> sinkSourceFactory;
096        
097        private boolean active = false;
098        
099        private final List<DatabaseClusterConfigurationListener<Z, D>> configurationListeners = new CopyOnWriteArrayList<DatabaseClusterConfigurationListener<Z, D>>(); 
100        private final List<DatabaseClusterListener> clusterListeners = new CopyOnWriteArrayList<DatabaseClusterListener>();
101        private final List<SynchronizationListener> synchronizationListeners = new CopyOnWriteArrayList<SynchronizationListener>();
102        
103        public DatabaseClusterImpl(String id, DatabaseClusterConfiguration<Z, D> configuration, DatabaseClusterConfigurationListener<Z, D> listener)
104        {
105                this.id = id;
106                this.configuration = configuration;
107                
108                if (listener != null)
109                {
110                        this.configurationListeners.add(listener);
111                }
112        }
113
114        /**
115         * Deactivates the specified database.
116         * @param databaseId a database identifier
117         * @throws IllegalArgumentException if no database exists with the specified identifier.
118         */
119        @ManagedOperation
120        public void deactivate(String databaseId)
121        {
122                this.deactivate(this.getDatabase(databaseId), this.stateManager);
123        }
124
125        /**
126         * Synchronizes, using the default strategy, and reactivates the specified database.
127         * @param databaseId a database identifier
128         * @throws IllegalArgumentException if no database exists with the specified identifier.
129         * @throws IllegalStateException if synchronization fails.
130         */
131        @ManagedOperation
132        public void activate(String databaseId)
133        {
134                this.activate(databaseId, this.configuration.getDefaultSynchronizationStrategy());
135        }
136
137        /**
138         * Synchronizes, using the specified strategy, and reactivates the specified database.
139         * @param databaseId a database identifier
140         * @param strategyId the identifer of a synchronization strategy
141         * @throws IllegalArgumentException if no database exists with the specified identifier, or no synchronization strategy exists with the specified identifier.
142         * @throws IllegalStateException if synchronization fails.
143         */
144        @ManagedOperation
145        public void activate(String databaseId, String strategyId)
146        {
147                SynchronizationStrategy strategy = this.configuration.getSynchronizationStrategyMap().get(strategyId);
148                
149                if (strategy == null)
150                {
151                        throw new IllegalArgumentException(Messages.INVALID_SYNC_STRATEGY.getMessage(strategyId));
152                }
153                
154                try
155                {
156                        if (this.activate(this.getDatabase(databaseId), strategy))
157                        {
158                                logger.log(Level.INFO, Messages.DATABASE_ACTIVATED.getMessage(this, databaseId));
159                        }
160                }
161                catch (SQLException e)
162                {
163                        logger.log(Level.WARN, e, Messages.DATABASE_ACTIVATE_FAILED.getMessage(this, databaseId));
164                        
165                        SQLException exception = e.getNextException();
166                        
167                        while (exception != null)
168                        {
169                                logger.log(Level.ERROR, exception);
170                                
171                                exception = exception.getNextException();
172                        }
173
174                        throw new IllegalStateException(e.toString());
175                }
176                catch (InterruptedException e)
177                {
178                        logger.log(Level.WARN, e);
179                        
180                        Thread.currentThread().interrupt();
181                }
182        }
183        
184        /**
185         * Determines whether or not the specified database is responsive
186         * @param databaseId a database identifier
187         * @return true, if the database is alive, false otherwise
188         * @throws IllegalArgumentException if no database exists with the specified identifier.
189         */
190        @ManagedOperation
191        public boolean isAlive(String databaseId)
192        {
193                return this.isAlive(this.getDatabase(databaseId), Level.WARN);
194        }
195        
196        /**
197         * Returns a collection of active databases in this cluster.
198         * @return a list of database identifiers
199         */
200        @ManagedAttribute
201        public Set<String> getActiveDatabases()
202        {
203                Set<String> databases = new TreeSet<String>();
204                
205                for (D database: this.balancer)
206                {
207                        databases.add(database.getId());
208                }
209                
210                return databases;
211        }
212        
213        /**
214         * Returns a collection of inactive databases in this cluster.
215         * @return a collection of database identifiers
216         */
217        @ManagedAttribute
218        public Set<String> getInactiveDatabases()
219        {
220                Set<String> databases = new TreeSet<String>(this.configuration.getDatabaseMap().keySet());
221                
222                for (D database: this.balancer)
223                {
224                        databases.remove(database.getId());
225                }
226                
227                return databases;
228        }
229        
230        /**
231         * Return the current HA-JDBC version
232         * @return the current version
233         */
234        @ManagedAttribute
235        public String getVersion()
236        {
237                return Version.CURRENT.toString();
238        }
239
240        /**
241         * Removes the specified database from the cluster.
242         * @param databaseId a database identifier
243         * @throws JMException 
244         * @throws IllegalArgumentException if database already exists.
245         */
246        @ManagedOperation
247        public void add(String databaseId) throws JMException
248        {
249                D database = this.configuration.getDatabaseFactory().createDatabase(databaseId);
250                
251                if (this.configuration.getDatabaseMap().putIfAbsent(databaseId, database) != null)
252                {
253                        throw new IllegalArgumentException(Messages.DATABASE_ALREADY_EXISTS.getMessage(databaseId, this));
254                }
255                
256                this.configuration.getMBeanRegistrar().register(this, database);
257                
258                for (DatabaseClusterConfigurationListener<Z, D> listener: this.configurationListeners)
259                {
260                        listener.added(database, this.configuration);
261                }
262        }
263        
264        /**
265         * Removes the specified database from the cluster.
266         * @param databaseId a database identifier
267         * @throws IllegalStateException if database is still active.
268         */
269        @ManagedOperation
270        public void remove(String databaseId)
271        {
272                D database = this.getDatabase(databaseId);
273                
274                if (this.balancer.contains(database))
275                {
276                        throw new IllegalStateException(Messages.DATABASE_STILL_ACTIVE.getMessage(this, databaseId));
277                }
278
279                this.configuration.getMBeanRegistrar().unregister(this, database);
280                
281                this.configuration.getDatabaseMap().remove(databaseId);
282
283                for (DatabaseClusterConfigurationListener<Z, D> listener: this.configurationListeners)
284                {
285                        listener.removed(database, this.configuration);
286                }
287        }
288
289        /**
290         * {@inheritDoc}
291         * @see net.sf.hajdbc.DatabaseCluster#getId()
292         */
293        @ManagedAttribute
294        @Override
295        public String getId()
296        {
297                return this.id;
298        }
299
300        @Override
301        public String toString()
302        {
303                return this.id;
304        }
305
306        /**
307         * {@inheritDoc}
308         * @see net.sf.hajdbc.DatabaseCluster#isActive()
309         */
310        @ManagedAttribute
311        @Override
312        public boolean isActive()
313        {
314                return this.active;
315        }
316        
317        /**
318         * Returns the set of synchronization strategies available to this cluster.
319         * @return a set of synchronization strategy identifiers
320         */
321        @ManagedAttribute
322        public Set<String> getSynchronizationStrategies()
323        {
324                return new TreeSet<String>(this.configuration.getSynchronizationStrategyMap().keySet());
325        }
326        
327        /**
328         * Returns the default synchronization strategy used by this cluster.
329         * @return a synchronization strategy identifier
330         */
331        @ManagedAttribute
332        public String getDefaultSynchronizationStrategy()
333        {
334                return this.configuration.getDefaultSynchronizationStrategy();
335        }
336
337        /**
338         * Flushes this cluster's cache of DatabaseMetaData.
339         */
340        @ManagedOperation
341        @Description("Flushes this cluster's cache of database meta data")
342        public void flushMetaDataCache()
343        {
344                try
345                {
346                        this.databaseMetaDataCache.flush();
347                }
348                catch (SQLException e)
349                {
350                        throw new IllegalStateException(e.toString(), e);
351                }
352        }
353
354        /**
355         * {@inheritDoc}
356         * @see net.sf.hajdbc.DatabaseCluster#addConfigurationListener(net.sf.hajdbc.DatabaseClusterConfigurationListener)
357         */
358        @ManagedOperation
359        @Override
360        public void addConfigurationListener(DatabaseClusterConfigurationListener<Z, D> listener)
361        {
362                this.configurationListeners.add(listener);
363        }
364
365        /**
366         * {@inheritDoc}
367         * @see net.sf.hajdbc.DatabaseCluster#addListener(net.sf.hajdbc.DatabaseClusterListener)
368         */
369        @ManagedOperation
370        @Override
371        public void addListener(DatabaseClusterListener listener)
372        {
373                this.clusterListeners.add(listener);
374        }
375
376        /**
377         * {@inheritDoc}
378         * @see net.sf.hajdbc.DatabaseCluster#addSynchronizationListener(net.sf.hajdbc.SynchronizationListener)
379         */
380        @ManagedOperation
381        @Override
382        public void addSynchronizationListener(SynchronizationListener listener)
383        {
384                this.synchronizationListeners.add(listener);
385        }
386
387        /**
388         * {@inheritDoc}
389         * @see net.sf.hajdbc.DatabaseCluster#removeConfigurationListener(net.sf.hajdbc.DatabaseClusterConfigurationListener)
390         */
391        @ManagedOperation
392        @Override
393        public void removeConfigurationListener(DatabaseClusterConfigurationListener<Z, D> listener)
394        {
395                this.configurationListeners.remove(listener);
396        }
397
398        /**
399         * {@inheritDoc}
400         * @see net.sf.hajdbc.DatabaseCluster#removeListener(net.sf.hajdbc.DatabaseClusterListener)
401         */
402        @ManagedOperation
403        @Override
404        public void removeListener(DatabaseClusterListener listener)
405        {
406                this.clusterListeners.remove(listener);
407        }
408
409        /**
410         * {@inheritDoc}
411         * @see net.sf.hajdbc.DatabaseCluster#removeSynchronizationListener(net.sf.hajdbc.SynchronizationListener)
412         */
413        @ManagedOperation
414        @Override
415        public void removeSynchronizationListener(SynchronizationListener listener)
416        {
417                this.synchronizationListeners.remove(listener);
418        }
419        
420        /**
421         * {@inheritDoc}
422         * @see net.sf.hajdbc.DatabaseCluster#activate(net.sf.hajdbc.Database, net.sf.hajdbc.state.StateManager)
423         */
424        @Override
425        public boolean activate(D database, StateManager manager)
426        {
427                boolean added = this.balancer.add(database);
428                
429                if (added)
430                {
431                        database.setActive(true);
432                        
433                        if (database.isDirty())
434                        {
435                                database.clean();
436                        }
437                        
438                        DatabaseEvent event = new DatabaseEvent(database);
439
440                        manager.activated(event);
441                        
442                        for (DatabaseClusterListener listener: this.clusterListeners)
443                        {
444                                listener.activated(event);
445                        }
446                }
447                
448                return added;
449        }
450        
451        /**
452         * {@inheritDoc}
453         * @see net.sf.hajdbc.DatabaseCluster#deactivate(net.sf.hajdbc.Database, net.sf.hajdbc.state.StateManager)
454         */
455        @Override
456        public boolean deactivate(D database, StateManager manager)
457        {
458                boolean removed = this.balancer.remove(database);
459                
460                if (removed)
461                {
462                        database.setActive(false);
463                        
464                        DatabaseEvent event = new DatabaseEvent(database);
465
466                        manager.deactivated(event);
467                        
468                        for (DatabaseClusterListener listener: this.clusterListeners)
469                        {
470                                listener.deactivated(event);
471                        }
472                }
473                
474                return removed;
475        }
476
477        /**
478         * {@inheritDoc}
479         * @see net.sf.hajdbc.DatabaseCluster#getBalancer()
480         */
481        @Override
482        public Balancer<Z, D> getBalancer()
483        {
484                return this.balancer;
485        }
486
487        /**
488         * {@inheritDoc}
489         * @see net.sf.hajdbc.DatabaseCluster#getDatabase(java.lang.String)
490         */
491        @Override
492        public D getDatabase(String id)
493        {
494                D database = this.configuration.getDatabaseMap().get(id);
495                
496                if (database == null)
497                {
498                        throw new IllegalArgumentException(Messages.INVALID_DATABASE.getMessage(this, id));
499                }
500                
501                return database;
502        }
503
504        /**
505         * {@inheritDoc}
506         * @see net.sf.hajdbc.DatabaseCluster#getDatabaseMetaDataCache()
507         */
508        @Override
509        public DatabaseMetaDataCache<Z, D> getDatabaseMetaDataCache()
510        {
511                return this.databaseMetaDataCache;
512        }
513
514        /**
515         * {@inheritDoc}
516         * @see net.sf.hajdbc.DatabaseCluster#getDialect()
517         */
518        @Override
519        public Dialect getDialect()
520        {
521                return this.dialect;
522        }
523
524        /**
525         * {@inheritDoc}
526         * @see net.sf.hajdbc.DatabaseCluster#getDurability()
527         */
528        @Override
529        public Durability<Z, D> getDurability()
530        {
531                return this.durability;
532        }
533
534        /**
535         * {@inheritDoc}
536         * @see net.sf.hajdbc.DatabaseCluster#getLockManager()
537         */
538        @Override
539        public LockManager getLockManager()
540        {
541                return this.lockManager;
542        }
543
544        /**
545         * {@inheritDoc}
546         * @see net.sf.hajdbc.DatabaseCluster#getExecutor()
547         */
548        @Override
549        public ExecutorService getExecutor()
550        {
551                return this.executor;
552        }
553
554        /**
555         * {@inheritDoc}
556         * @see net.sf.hajdbc.DatabaseCluster#getTransactionMode()
557         */
558        @Override
559        public TransactionMode getTransactionMode()
560        {
561                return this.configuration.getTransactionMode();
562        }
563
564        /**
565         * {@inheritDoc}
566         * @see net.sf.hajdbc.DatabaseCluster#getStateManager()
567         */
568        @Override
569        public StateManager getStateManager()
570        {
571                return this.stateManager;
572        }
573
574        @Override
575        public ThreadFactory getThreadFactory()
576        {
577                return this.configuration.getThreadFactory();
578        }
579
580        @Override
581        public Decoder getDecoder()
582        {
583                return this.decoder;
584        }
585
586        @Override
587        public InputSinkStrategy<? extends Object> getInputSinkStrategy()
588        {
589                return this.sinkSourceFactory;
590        }
591
592        /**
593         * {@inheritDoc}
594         * @see net.sf.hajdbc.DatabaseCluster#getTransactionIdentifierFactory()
595         */
596        @Override
597        public TransactionIdentifierFactory<? extends Object> getTransactionIdentifierFactory()
598        {
599                return this.configuration.getTransactionIdentifierFactory();
600        }
601
602        /**
603         * {@inheritDoc}
604         * @see net.sf.hajdbc.DatabaseCluster#isCurrentDateEvaluationEnabled()
605         */
606        @Override
607        public boolean isCurrentDateEvaluationEnabled()
608        {
609                return this.configuration.isCurrentDateEvaluationEnabled();
610        }
611
612        /**
613         * {@inheritDoc}
614         * @see net.sf.hajdbc.DatabaseCluster#isCurrentTimeEvaluationEnabled()
615         */
616        @Override
617        public boolean isCurrentTimeEvaluationEnabled()
618        {
619                return this.configuration.isCurrentTimeEvaluationEnabled();
620        }
621
622        /**
623         * {@inheritDoc}
624         * @see net.sf.hajdbc.DatabaseCluster#isCurrentTimestampEvaluationEnabled()
625         */
626        @Override
627        public boolean isCurrentTimestampEvaluationEnabled()
628        {
629                return this.configuration.isCurrentTimestampEvaluationEnabled();
630        }
631
632        /**
633         * {@inheritDoc}
634         * @see net.sf.hajdbc.DatabaseCluster#isIdentityColumnDetectionEnabled()
635         */
636        @Override
637        public boolean isIdentityColumnDetectionEnabled()
638        {
639                return this.configuration.isIdentityColumnDetectionEnabled();
640        }
641
642        /**
643         * {@inheritDoc}
644         * @see net.sf.hajdbc.DatabaseCluster#isRandEvaluationEnabled()
645         */
646        @Override
647        public boolean isRandEvaluationEnabled()
648        {
649                return this.configuration.isRandEvaluationEnabled();
650        }
651
652        /**
653         * {@inheritDoc}
654         * @see net.sf.hajdbc.DatabaseCluster#isSequenceDetectionEnabled()
655         */
656        @Override
657        public boolean isSequenceDetectionEnabled()
658        {
659                return this.configuration.isSequenceDetectionEnabled();
660        }
661
662        /**
663         * {@inheritDoc}
664         * @see net.sf.hajdbc.Lifecycle#start()
665         */
666        @Override
667        public synchronized void start() throws Exception
668        {
669                if (this.active) return;
670                
671                this.decoder = this.configuration.getDecoderFactory().createDecoder(this.id);
672                this.lockManager = this.configuration.getLockManagerFactory().createLockManager();
673                this.stateManager = this.configuration.getStateManagerFactory().createStateManager(this);
674                
675                CommandDispatcherFactory dispatcherFactory = this.configuration.getDispatcherFactory();
676                
677                if (dispatcherFactory != null)
678                {
679                        this.lockManager = new DistributedLockManager(this, dispatcherFactory);
680                        this.stateManager = new DistributedStateManager<Z, D>(this, dispatcherFactory);
681                }
682                
683                this.balancer = this.configuration.getBalancerFactory().createBalancer(new TreeSet<D>());
684                this.dialect = this.configuration.getDialectFactory().createDialect();
685                this.durability = this.configuration.getDurabilityFactory().createDurability(this);
686                this.executor = this.configuration.getExecutorProvider().getExecutor(this.configuration.getThreadFactory());
687                this.sinkSourceFactory = this.configuration.getInputSinkProvider().createInputSinkStrategy();
688                
689                this.lockManager.start();
690                this.stateManager.start();
691                
692                Set<String> databases = this.stateManager.getActiveDatabases();
693                
694                if (!databases.isEmpty())
695                {
696                        for (String databaseId: databases)
697                        {
698                                D database = this.getDatabase(databaseId);
699                                
700                                if (database != null)
701                                {
702                                        this.balancer.add(database);
703                                        database.setActive(true);
704                                }
705                                else
706                                {
707                                        logger.log(Level.WARN, Messages.DATABASE_IGNORED.getMessage(), this, databaseId);
708                                }
709                        }
710                }
711                else
712                {
713                        for (D database: this.configuration.getDatabaseMap().values())
714                        {
715                                if (this.isAlive(database, Level.WARN))
716                                {
717                                        this.activate(database, this.stateManager);
718                                }
719                        }
720                }
721
722                Map<InvocationEvent, Map<String, InvokerEvent>> invokers = this.stateManager.recover();
723                if (!invokers.isEmpty())
724                {
725                        this.durability.recover(invokers);
726                }
727                
728                this.databaseMetaDataCache = this.configuration.getDatabaseMetaDataCacheFactory().createCache(this);
729                
730                try
731                {
732                        this.flushMetaDataCache();
733                }
734                catch (IllegalStateException e)
735                {
736                        // Ignore - cache will initialize lazily.
737                }
738                
739                CronExpression failureDetectionExpression = this.configuration.getFailureDetectionExpression();
740                CronExpression autoActivationExpression = this.configuration.getAutoActivationExpression();
741                int threads = requiredThreads(failureDetectionExpression) + requiredThreads(autoActivationExpression);
742                
743                if (threads > 0)
744                {
745                        this.cronExecutor = new CronThreadPoolExecutor(threads, this.configuration.getThreadFactory());
746                        
747                        if (failureDetectionExpression != null)
748                        {
749                                this.cronExecutor.schedule(new FailureDetectionTask(), failureDetectionExpression);
750                        }
751                        
752                        if (autoActivationExpression != null)
753                        {
754                                this.cronExecutor.schedule(new AutoActivationTask(), autoActivationExpression);
755                        }
756                }
757                
758                MBeanRegistrar<Z, D> registrar = this.configuration.getMBeanRegistrar();
759
760                if (registrar != null)
761                {
762                        registrar.register(this);
763                        
764                        for (D database: this.configuration.getDatabaseMap().values())
765                        {
766                                registrar.register(this, database);
767                        }
768                }
769                
770                this.active = true;
771        }
772
773        private static int requiredThreads(CronExpression expression)
774        {
775                return (expression != null) ? 1 : 0;
776        }
777        
778        /**
779         * {@inheritDoc}
780         * @see net.sf.hajdbc.Lifecycle#stop()
781         */
782        @Override
783        public synchronized void stop()
784        {
785                this.active = false;
786/*              Don't do this until we can distinguish between embedded databases, and local databases in a separate processes
787                if (this.balancer != null)
788                {
789                        // Proactively deactivate any local databases
790                        for (D database: this.balancer)
791                        {
792                                if (database.isLocal())
793                                {
794                                        this.deactivate(database, this.stateManager);
795                                }
796                        }
797                }
798*/
799                MBeanRegistrar<Z, D> registrar = this.configuration.getMBeanRegistrar();
800
801                if (registrar != null)
802                {
803                        registrar.unregister(this);
804                        
805                        for (D database: this.configuration.getDatabaseMap().values())
806                        {
807                                registrar.unregister(this, database);
808                        }
809                }
810                
811                if (this.cronExecutor != null)
812                {
813                        this.cronExecutor.shutdownNow();
814                }
815                
816                if (this.stateManager != null)
817                {
818                        this.stateManager.stop();
819                }
820                
821                if (this.lockManager != null)
822                {
823                        this.lockManager.stop();
824                }
825
826                if (this.executor != null)
827                {
828                        this.executor.shutdownNow();
829                }
830
831                if (this.balancer != null)
832                {
833                        this.balancer.clear();
834                }
835        }
836
837        boolean isAlive(D database, Level level)
838        {
839                try
840                {
841                        Connection connection = database.connect(database.getConnectionSource(), database.decodePassword(this.decoder));
842                        try
843                        {
844                                return this.dialect.isValid(connection);
845                        }
846                        finally
847                        {
848                                Resources.close(connection);
849                        }
850                }
851                catch (SQLException e)
852                {
853                        logger.log(level, e);
854                        return false;
855                }
856        }
857
858        boolean activate(D database, SynchronizationStrategy strategy) throws SQLException, InterruptedException
859        {
860                if (!this.isAlive(database, Level.DEBUG)) return false;
861                
862                Lock lock = this.lockManager.writeLock(null);
863                
864                lock.lockInterruptibly();
865                
866                try
867                {
868                        if (this.balancer.contains(database)) return false;
869                        
870                        if (!this.balancer.isEmpty())
871                        {
872                                SynchronizationContext<Z, D> context = new SynchronizationContextImpl<Z, D>(this, database);
873                                
874                                try
875                                {
876                                        DatabaseEvent event = new DatabaseEvent(database);
877                                        
878                                        logger.log(Level.INFO, Messages.DATABASE_SYNC_START.getMessage(this, database));
879                                        
880                                        for (SynchronizationListener listener: this.synchronizationListeners)
881                                        {
882                                                listener.beforeSynchronization(event);
883                                        }
884                                        
885                                        strategy.synchronize(context);
886        
887                                        logger.log(Level.INFO, Messages.DATABASE_SYNC_END.getMessage(this, database));
888                                        
889                                        for (SynchronizationListener listener: this.synchronizationListeners)
890                                        {
891                                                listener.afterSynchronization(event);
892                                        }
893                                }
894                                finally
895                                {
896                                        context.close();
897                                }
898                        }
899                        
900                        return this.activate(database, this.stateManager);
901                }
902                finally
903                {
904                        lock.unlock();
905                }
906        }
907
908        class FailureDetectionTask implements Runnable
909        {
910                @Override
911                public void run()
912                {
913                        if (!DatabaseClusterImpl.this.getStateManager().isEnabled()) return;
914                        
915                        Set<D> databases = DatabaseClusterImpl.this.getBalancer();
916                        
917                        int size = databases.size();
918                        
919                        if ((size > 1) || DatabaseClusterImpl.this.configuration.isEmptyClusterAllowed())
920                        {
921                                List<D> deadList = new ArrayList<D>(size);
922                                
923                                for (D database: databases)
924                                {
925                                        if (!DatabaseClusterImpl.this.isAlive(database, Level.WARN))
926                                        {
927                                                deadList.add(database);
928                                        }
929                                }
930
931                                if ((deadList.size() < size) || DatabaseClusterImpl.this.configuration.isEmptyClusterAllowed())
932                                {
933                                        for (D database: deadList)
934                                        {
935                                                if (DatabaseClusterImpl.this.deactivate(database, DatabaseClusterImpl.this.getStateManager()))
936                                                {
937                                                        logger.log(Level.ERROR, Messages.DATABASE_DEACTIVATED.getMessage(), database, DatabaseClusterImpl.this);
938                                                }
939                                        }
940                                }
941                        }
942                }
943        }       
944        
945        class AutoActivationTask implements Runnable
946        {
947                @Override
948                public void run()
949                {
950                        if (!DatabaseClusterImpl.this.getStateManager().isEnabled()) return;
951                        
952                        try
953                        {
954                                Set<D> activeDatabases = DatabaseClusterImpl.this.getBalancer();
955                                
956                                if (!activeDatabases.isEmpty())
957                                {
958                                        for (D database: DatabaseClusterImpl.this.configuration.getDatabaseMap().values())
959                                        {
960                                                if (!activeDatabases.contains(database))
961                                                {
962                                                        try
963                                                        {
964                                                                if (DatabaseClusterImpl.this.activate(database, DatabaseClusterImpl.this.configuration.getSynchronizationStrategyMap().get(DatabaseClusterImpl.this.configuration.getDefaultSynchronizationStrategy())))
965                                                                {
966                                                                        logger.log(Level.INFO, Messages.DATABASE_ACTIVATED.getMessage(), database, DatabaseClusterImpl.this);
967                                                                }
968                                                        }
969                                                        catch (SQLException e)
970                                                        {
971                                                                logger.log(Level.DEBUG, e);
972                                                        }
973                                                }
974                                        }
975                                }
976                        }
977                        catch (InterruptedException e)
978                        {
979                                Thread.currentThread().interrupt();
980                        }
981                }
982        }
983}