/*
 * HA-JDBC: High-Availability JDBC
 * Copyright (C) 2012  Paul Ferraro
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */
package net.sf.hajdbc.sql;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.locks.Lock;

import javax.management.JMException;

import net.sf.hajdbc.Database;
import net.sf.hajdbc.DatabaseCluster;
import net.sf.hajdbc.DatabaseClusterConfiguration;
import net.sf.hajdbc.DatabaseClusterConfigurationListener;
import net.sf.hajdbc.DatabaseClusterListener;
import net.sf.hajdbc.Messages;
import net.sf.hajdbc.SynchronizationListener;
import net.sf.hajdbc.SynchronizationStrategy;
import net.sf.hajdbc.TransactionMode;
import net.sf.hajdbc.Version;
import net.sf.hajdbc.balancer.Balancer;
import net.sf.hajdbc.cache.DatabaseMetaDataCache;
import net.sf.hajdbc.codec.Decoder;
import net.sf.hajdbc.dialect.Dialect;
import net.sf.hajdbc.distributed.CommandDispatcherFactory;
import net.sf.hajdbc.durability.Durability;
import net.sf.hajdbc.durability.InvocationEvent;
import net.sf.hajdbc.durability.InvokerEvent;
import net.sf.hajdbc.io.InputSinkStrategy;
import net.sf.hajdbc.lock.LockManager;
import net.sf.hajdbc.lock.distributed.DistributedLockManager;
import net.sf.hajdbc.logging.Level;
import net.sf.hajdbc.logging.Logger;
import net.sf.hajdbc.logging.LoggerFactory;
import net.sf.hajdbc.management.Description;
import net.sf.hajdbc.management.MBean;
import net.sf.hajdbc.management.MBeanRegistrar;
import net.sf.hajdbc.management.ManagedAttribute;
import net.sf.hajdbc.management.ManagedOperation;
import net.sf.hajdbc.state.DatabaseEvent;
import net.sf.hajdbc.state.StateManager;
import net.sf.hajdbc.state.distributed.DistributedStateManager;
import net.sf.hajdbc.sync.SynchronizationContext;
import net.sf.hajdbc.sync.SynchronizationContextImpl;
import net.sf.hajdbc.tx.TransactionIdentifierFactory;
import net.sf.hajdbc.util.Resources;
import net.sf.hajdbc.util.concurrent.cron.CronExpression;
import net.sf.hajdbc.util.concurrent.cron.CronThreadPoolExecutor;

/**
 * {@link DatabaseCluster} implementation driven by a {@link DatabaseClusterConfiguration}.
 * <p>
 * The set of currently active databases is held by the {@link Balancer}; every database
 * known to the configuration but absent from the balancer is considered inactive.
 * Most collaborators (balancer, dialect, lock/state managers, executors, ...) are only
 * created by {@link #start()}, so the corresponding getters return {@code null} before
 * the cluster has been started.
 *
 * @author paul
 * @param <Z> the type parameter of the {@link Database}s in this cluster
 * @param <D> the database descriptor type
 */
@MBean
public class DatabaseClusterImpl<Z, D extends Database<Z>> implements DatabaseCluster<Z, D>
{
	static final Logger logger = LoggerFactory.getLogger(DatabaseClusterImpl.class);

	private final String id;

	final DatabaseClusterConfiguration<Z, D> configuration;

	// The following collaborators are created in start().
	private Balancer<Z, D> balancer;
	private Dialect dialect;
	private Durability<Z, D> durability;
	private DatabaseMetaDataCache<Z, D> databaseMetaDataCache;
	private ExecutorService executor;
	private Decoder decoder;
	private CronThreadPoolExecutor cronExecutor;
	private LockManager lockManager;
	private StateManager stateManager;
	private InputSinkStrategy<? extends Object> sinkSourceFactory;

	private boolean active = false;

	private final List<DatabaseClusterConfigurationListener<Z, D>> configurationListeners = new CopyOnWriteArrayList<DatabaseClusterConfigurationListener<Z, D>>();
	private final List<DatabaseClusterListener> clusterListeners = new CopyOnWriteArrayList<DatabaseClusterListener>();
	private final List<SynchronizationListener> synchronizationListeners = new CopyOnWriteArrayList<SynchronizationListener>();

	/**
	 * Constructs a new, not yet started, database cluster.
	 * @param id the identifier of this cluster
	 * @param configuration the configuration from which all collaborators are obtained
	 * @param listener an initial configuration listener; ignored if {@code null}
	 */
	public DatabaseClusterImpl(String id, DatabaseClusterConfiguration<Z, D> configuration, DatabaseClusterConfigurationListener<Z, D> listener)
	{
		this.id = id;
		this.configuration = configuration;

		if (listener != null)
		{
			this.configurationListeners.add(listener);
		}
	}

	/**
	 * Deactivates the specified database.
	 * @param databaseId a database identifier
	 * @throws IllegalArgumentException if no database exists with the specified identifier.
	 */
	@ManagedOperation
	public void deactivate(String databaseId)
	{
		this.deactivate(this.getDatabase(databaseId), this.stateManager);
	}

	/**
	 * Synchronizes, using the default strategy, and reactivates the specified database.
	 * @param databaseId a database identifier
	 * @throws IllegalArgumentException if no database exists with the specified identifier.
	 * @throws IllegalStateException if synchronization fails.
	 */
	@ManagedOperation
	public void activate(String databaseId)
	{
		this.activate(databaseId, this.configuration.getDefaultSynchronizationStrategy());
	}

	/**
	 * Synchronizes, using the specified strategy, and reactivates the specified database.
	 * If the calling thread is interrupted while waiting, the interruption is logged and
	 * the thread's interrupt status is restored; no exception is thrown in that case.
	 * @param databaseId a database identifier
	 * @param strategyId the identifier of a synchronization strategy
	 * @throws IllegalArgumentException if no database exists with the specified identifier, or no synchronization strategy exists with the specified identifier.
	 * @throws IllegalStateException if synchronization fails; the causing {@link SQLException} is attached as the cause.
	 */
	@ManagedOperation
	public void activate(String databaseId, String strategyId)
	{
		SynchronizationStrategy strategy = this.configuration.getSynchronizationStrategyMap().get(strategyId);

		if (strategy == null)
		{
			throw new IllegalArgumentException(Messages.INVALID_SYNC_STRATEGY.getMessage(strategyId));
		}

		try
		{
			if (this.activate(this.getDatabase(databaseId), strategy))
			{
				logger.log(Level.INFO, Messages.DATABASE_ACTIVATED.getMessage(this, databaseId));
			}
		}
		catch (SQLException e)
		{
			logger.log(Level.WARN, e, Messages.DATABASE_ACTIVATE_FAILED.getMessage(this, databaseId));

			// Chained SQLExceptions are not part of the regular cause chain, so log each one explicitly.
			SQLException exception = e.getNextException();

			while (exception != null)
			{
				logger.log(Level.ERROR, exception);

				exception = exception.getNextException();
			}

			// Preserve the original exception as the cause, consistent with flushMetaDataCache().
			throw new IllegalStateException(e.toString(), e);
		}
		catch (InterruptedException e)
		{
			logger.log(Level.WARN, e);

			Thread.currentThread().interrupt();
		}
	}

	/**
	 * Determines whether or not the specified database is responsive
	 * @param databaseId a database identifier
	 * @return true, if the database is alive, false otherwise
	 * @throws IllegalArgumentException if no database exists with the specified identifier.
	 */
	@ManagedOperation
	public boolean isAlive(String databaseId)
	{
		return this.isAlive(this.getDatabase(databaseId), Level.WARN);
	}

	/**
	 * Returns a collection of active databases in this cluster.
	 * @return a sorted set of the identifiers of the databases currently held by the balancer
	 */
	@ManagedAttribute
	public Set<String> getActiveDatabases()
	{
		Set<String> databases = new TreeSet<String>();

		for (D database: this.balancer)
		{
			databases.add(database.getId());
		}

		return databases;
	}

	/**
	 * Returns a collection of inactive databases in this cluster.
	 * @return a sorted set of the identifiers of the configured databases not currently held by the balancer
	 */
	@ManagedAttribute
	public Set<String> getInactiveDatabases()
	{
		// Start from every configured database, then subtract the active ones.
		Set<String> databases = new TreeSet<String>(this.configuration.getDatabaseMap().keySet());

		for (D database: this.balancer)
		{
			databases.remove(database.getId());
		}

		return databases;
	}

	/**
	 * Return the current HA-JDBC version
	 * @return the current version
	 */
	@ManagedAttribute
	public String getVersion()
	{
		return Version.CURRENT.toString();
	}

	/**
	 * Adds a new database, with the specified identifier, to the cluster configuration.
	 * The database is created via the configured database factory, registered with the
	 * MBean registrar (if one is configured) and announced to all configuration listeners.
	 * This method does not add the database to the balancer.
	 * @param databaseId a database identifier
	 * @throws JMException if thrown while creating or registering the database
	 * @throws IllegalArgumentException if database already exists.
	 */
	@ManagedOperation
	public void add(String databaseId) throws JMException
	{
		D database = this.configuration.getDatabaseFactory().createDatabase(databaseId);

		if (this.configuration.getDatabaseMap().putIfAbsent(databaseId, database) != null)
		{
			throw new IllegalArgumentException(Messages.DATABASE_ALREADY_EXISTS.getMessage(databaseId, this));
		}

		// The registrar is optional - see start()/stop().
		MBeanRegistrar<Z, D> registrar = this.configuration.getMBeanRegistrar();

		if (registrar != null)
		{
			registrar.register(this, database);
		}

		for (DatabaseClusterConfigurationListener<Z, D> listener: this.configurationListeners)
		{
			listener.added(database, this.configuration);
		}
	}

	/**
	 * Removes the specified database from the cluster.
	 * The database is unregistered from the MBean registrar (if one is configured),
	 * removed from the configuration and the removal is announced to all configuration listeners.
	 * @param databaseId a database identifier
	 * @throws IllegalArgumentException if no database exists with the specified identifier.
	 * @throws IllegalStateException if database is still active.
	 */
	@ManagedOperation
	public void remove(String databaseId)
	{
		D database = this.getDatabase(databaseId);

		if (this.balancer.contains(database))
		{
			throw new IllegalStateException(Messages.DATABASE_STILL_ACTIVE.getMessage(this, databaseId));
		}

		// The registrar is optional - see start()/stop().
		MBeanRegistrar<Z, D> registrar = this.configuration.getMBeanRegistrar();

		if (registrar != null)
		{
			registrar.unregister(this, database);
		}

		this.configuration.getDatabaseMap().remove(databaseId);

		for (DatabaseClusterConfigurationListener<Z, D> listener: this.configurationListeners)
		{
			listener.removed(database, this.configuration);
		}
	}

	/**
	 * {@inheritDoc}
	 * @see net.sf.hajdbc.DatabaseCluster#getId()
	 */
	@ManagedAttribute
	@Override
	public String getId()
	{
		return this.id;
	}

	/**
	 * Returns the identifier of this cluster.
	 * @return the cluster identifier
	 */
	@Override
	public String toString()
	{
		return this.id;
	}

	/**
	 * {@inheritDoc}
	 * @see net.sf.hajdbc.DatabaseCluster#isActive()
	 */
	@ManagedAttribute
	@Override
	public boolean isActive()
	{
		return this.active;
	}

	/**
	 * Returns the set of synchronization strategies available to this cluster.
	 * @return a set of synchronization strategy identifiers
	 */
	@ManagedAttribute
	public Set<String> getSynchronizationStrategies()
	{
		return new TreeSet<String>(this.configuration.getSynchronizationStrategyMap().keySet());
	}

	/**
	 * Returns the default synchronization strategy used by this cluster.
	 * @return a synchronization strategy identifier
	 */
	@ManagedAttribute
	public String getDefaultSynchronizationStrategy()
	{
		return this.configuration.getDefaultSynchronizationStrategy();
	}

	/**
	 * Flushes this cluster's cache of DatabaseMetaData.
	 * @throws IllegalStateException if the flush fails with a {@link SQLException}
	 */
	@ManagedOperation
	@Description("Flushes this cluster's cache of database meta data")
	public void flushMetaDataCache()
	{
		try
		{
			this.databaseMetaDataCache.flush();
		}
		catch (SQLException e)
		{
			throw new IllegalStateException(e.toString(), e);
		}
	}

	/**
	 * {@inheritDoc}
	 * @see net.sf.hajdbc.DatabaseCluster#addConfigurationListener(net.sf.hajdbc.DatabaseClusterConfigurationListener)
	 */
	@ManagedOperation
	@Override
	public void addConfigurationListener(DatabaseClusterConfigurationListener<Z, D> listener)
	{
		this.configurationListeners.add(listener);
	}

	/**
	 * {@inheritDoc}
	 * @see net.sf.hajdbc.DatabaseCluster#addListener(net.sf.hajdbc.DatabaseClusterListener)
	 */
	@ManagedOperation
	@Override
	public void addListener(DatabaseClusterListener listener)
	{
		this.clusterListeners.add(listener);
	}

	/**
	 * {@inheritDoc}
	 * @see net.sf.hajdbc.DatabaseCluster#addSynchronizationListener(net.sf.hajdbc.SynchronizationListener)
	 */
	@ManagedOperation
	@Override
	public void addSynchronizationListener(SynchronizationListener listener)
	{
		this.synchronizationListeners.add(listener);
	}

	/**
	 * {@inheritDoc}
	 * @see net.sf.hajdbc.DatabaseCluster#removeConfigurationListener(net.sf.hajdbc.DatabaseClusterConfigurationListener)
	 */
	@ManagedOperation
	@Override
	public void removeConfigurationListener(DatabaseClusterConfigurationListener<Z, D> listener)
	{
		this.configurationListeners.remove(listener);
	}

	/**
	 * {@inheritDoc}
	 * @see net.sf.hajdbc.DatabaseCluster#removeListener(net.sf.hajdbc.DatabaseClusterListener)
	 */
	@ManagedOperation
	@Override
	public void removeListener(DatabaseClusterListener listener)
	{
		this.clusterListeners.remove(listener);
	}

	/**
	 * {@inheritDoc}
	 * @see net.sf.hajdbc.DatabaseCluster#removeSynchronizationListener(net.sf.hajdbc.SynchronizationListener)
	 */
	@ManagedOperation
	@Override
	public void removeSynchronizationListener(SynchronizationListener listener)
	{
		this.synchronizationListeners.remove(listener);
	}

	/**
	 * {@inheritDoc}
	 * @see net.sf.hajdbc.DatabaseCluster#activate(net.sf.hajdbc.Database, net.sf.hajdbc.state.StateManager)
	 */
	@Override
	public boolean activate(D database, StateManager manager)
	{
		boolean added = this.balancer.add(database);

		// Only notify when the database was not already active.
		if (added)
		{
			database.setActive(true);

			if (database.isDirty())
			{
				database.clean();
			}

			DatabaseEvent event = new DatabaseEvent(database);

			manager.activated(event);

			for (DatabaseClusterListener listener: this.clusterListeners)
			{
				listener.activated(event);
			}
		}

		return added;
	}

	/**
	 * {@inheritDoc}
	 * @see net.sf.hajdbc.DatabaseCluster#deactivate(net.sf.hajdbc.Database, net.sf.hajdbc.state.StateManager)
	 */
	@Override
	public boolean deactivate(D database, StateManager manager)
	{
		boolean removed = this.balancer.remove(database);

		// Only notify when the database was actually active.
		if (removed)
		{
			database.setActive(false);

			DatabaseEvent event = new DatabaseEvent(database);

			manager.deactivated(event);

			for (DatabaseClusterListener listener: this.clusterListeners)
			{
				listener.deactivated(event);
			}
		}

		return removed;
	}

	/**
	 * {@inheritDoc}
	 * @see net.sf.hajdbc.DatabaseCluster#getBalancer()
	 */
	@Override
	public Balancer<Z, D> getBalancer()
	{
		return this.balancer;
	}

	/**
	 * {@inheritDoc}
	 * @throws IllegalArgumentException if no database exists with the specified identifier.
	 * @see net.sf.hajdbc.DatabaseCluster#getDatabase(java.lang.String)
	 */
	@Override
	public D getDatabase(String id)
	{
		D database = this.configuration.getDatabaseMap().get(id);

		if (database == null)
		{
			throw new IllegalArgumentException(Messages.INVALID_DATABASE.getMessage(this, id));
		}

		return database;
	}

	/**
	 * {@inheritDoc}
	 * @see net.sf.hajdbc.DatabaseCluster#getDatabaseMetaDataCache()
	 */
	@Override
	public DatabaseMetaDataCache<Z, D> getDatabaseMetaDataCache()
	{
		return this.databaseMetaDataCache;
	}

	/**
	 * {@inheritDoc}
	 * @see net.sf.hajdbc.DatabaseCluster#getDialect()
	 */
	@Override
	public Dialect getDialect()
	{
		return this.dialect;
	}

	/**
	 * {@inheritDoc}
	 * @see net.sf.hajdbc.DatabaseCluster#getDurability()
	 */
	@Override
	public Durability<Z, D> getDurability()
	{
		return this.durability;
	}

	/**
	 * {@inheritDoc}
	 * @see net.sf.hajdbc.DatabaseCluster#getLockManager()
	 */
	@Override
	public LockManager getLockManager()
	{
		return this.lockManager;
	}

	/**
	 * {@inheritDoc}
	 * @see net.sf.hajdbc.DatabaseCluster#getExecutor()
	 */
	@Override
	public ExecutorService getExecutor()
	{
		return this.executor;
	}

	/**
	 * {@inheritDoc}
	 * @see net.sf.hajdbc.DatabaseCluster#getTransactionMode()
	 */
	@Override
	public TransactionMode getTransactionMode()
	{
		return this.configuration.getTransactionMode();
	}

	/**
	 * {@inheritDoc}
	 * @see net.sf.hajdbc.DatabaseCluster#getStateManager()
	 */
	@Override
	public StateManager getStateManager()
	{
		return this.stateManager;
	}

	/**
	 * {@inheritDoc}
	 * @see net.sf.hajdbc.DatabaseCluster#getThreadFactory()
	 */
	@Override
	public ThreadFactory getThreadFactory()
	{
		return this.configuration.getThreadFactory();
	}

	/**
	 * {@inheritDoc}
	 * @see net.sf.hajdbc.DatabaseCluster#getDecoder()
	 */
	@Override
	public Decoder getDecoder()
	{
		return this.decoder;
	}

	/**
	 * {@inheritDoc}
	 * @see net.sf.hajdbc.DatabaseCluster#getInputSinkStrategy()
	 */
	@Override
	public InputSinkStrategy<? extends Object> getInputSinkStrategy()
	{
		return this.sinkSourceFactory;
	}

	/**
	 * {@inheritDoc}
	 * @see net.sf.hajdbc.DatabaseCluster#getTransactionIdentifierFactory()
	 */
	@Override
	public TransactionIdentifierFactory<? extends Object> getTransactionIdentifierFactory()
	{
		return this.configuration.getTransactionIdentifierFactory();
	}

	/**
	 * {@inheritDoc}
	 * @see net.sf.hajdbc.DatabaseCluster#isCurrentDateEvaluationEnabled()
	 */
	@Override
	public boolean isCurrentDateEvaluationEnabled()
	{
		return this.configuration.isCurrentDateEvaluationEnabled();
	}

	/**
	 * {@inheritDoc}
	 * @see net.sf.hajdbc.DatabaseCluster#isCurrentTimeEvaluationEnabled()
	 */
	@Override
	public boolean isCurrentTimeEvaluationEnabled()
	{
		return this.configuration.isCurrentTimeEvaluationEnabled();
	}

	/**
	 * {@inheritDoc}
	 * @see net.sf.hajdbc.DatabaseCluster#isCurrentTimestampEvaluationEnabled()
	 */
	@Override
	public boolean isCurrentTimestampEvaluationEnabled()
	{
		return this.configuration.isCurrentTimestampEvaluationEnabled();
	}

	/**
	 * {@inheritDoc}
	 * @see net.sf.hajdbc.DatabaseCluster#isIdentityColumnDetectionEnabled()
	 */
	@Override
	public boolean isIdentityColumnDetectionEnabled()
	{
		return this.configuration.isIdentityColumnDetectionEnabled();
	}

	/**
	 * {@inheritDoc}
	 * @see net.sf.hajdbc.DatabaseCluster#isRandEvaluationEnabled()
	 */
	@Override
	public boolean isRandEvaluationEnabled()
	{
		return this.configuration.isRandEvaluationEnabled();
	}

	/**
	 * {@inheritDoc}
	 * @see net.sf.hajdbc.DatabaseCluster#isSequenceDetectionEnabled()
	 */
	@Override
	public boolean isSequenceDetectionEnabled()
	{
		return this.configuration.isSequenceDetectionEnabled();
	}

	/**
	 * {@inheritDoc}
	 * <p>
	 * Does nothing if this cluster is already active. Otherwise creates all collaborators
	 * from the configuration, restores (or probes for) the set of active databases,
	 * recovers any invocations reported by the state manager, schedules the optional
	 * failure detection and auto-activation tasks and registers the management beans.
	 * @see net.sf.hajdbc.Lifecycle#start()
	 */
	@Override
	public synchronized void start() throws Exception
	{
		if (this.active) return;

		this.decoder = this.configuration.getDecoderFactory().createDecoder(this.id);
		this.lockManager = this.configuration.getLockManagerFactory().createLockManager();
		this.stateManager = this.configuration.getStateManagerFactory().createStateManager(this);

		CommandDispatcherFactory dispatcherFactory = this.configuration.getDispatcherFactory();

		if (dispatcherFactory != null)
		{
			// The distributed managers are constructed from this cluster, i.e. after the local managers above were assigned.
			this.lockManager = new DistributedLockManager(this, dispatcherFactory);
			this.stateManager = new DistributedStateManager<Z, D>(this, dispatcherFactory);
		}

		this.balancer = this.configuration.getBalancerFactory().createBalancer(new TreeSet<D>());
		this.dialect = this.configuration.getDialectFactory().createDialect();
		this.durability = this.configuration.getDurabilityFactory().createDurability(this);
		this.executor = this.configuration.getExecutorProvider().getExecutor(this.configuration.getThreadFactory());
		this.sinkSourceFactory = this.configuration.getInputSinkProvider().createInputSinkStrategy();

		this.lockManager.start();
		this.stateManager.start();

		Set<String> databases = this.stateManager.getActiveDatabases();

		if (!databases.isEmpty())
		{
			// The state manager already knows which databases are active - restore that state.
			for (String databaseId: databases)
			{
				// Look the database up directly: getDatabase(...) throws for unknown identifiers,
				// which would abort startup instead of merely ignoring a stale identifier.
				D database = this.configuration.getDatabaseMap().get(databaseId);

				if (database != null)
				{
					this.balancer.add(database);
					database.setActive(true);
				}
				else
				{
					logger.log(Level.WARN, Messages.DATABASE_IGNORED.getMessage(), this, databaseId);
				}
			}
		}
		else
		{
			// No recorded state - activate every configured database that responds.
			for (D database: this.configuration.getDatabaseMap().values())
			{
				if (this.isAlive(database, Level.WARN))
				{
					this.activate(database, this.stateManager);
				}
			}
		}

		Map<InvocationEvent, Map<String, InvokerEvent>> invokers = this.stateManager.recover();
		if (!invokers.isEmpty())
		{
			this.durability.recover(invokers);
		}

		this.databaseMetaDataCache = this.configuration.getDatabaseMetaDataCacheFactory().createCache(this);

		try
		{
			this.flushMetaDataCache();
		}
		catch (IllegalStateException e)
		{
			// Ignore - cache will initialize lazily.
		}

		CronExpression failureDetectionExpression = this.configuration.getFailureDetectionExpression();
		CronExpression autoActivationExpression = this.configuration.getAutoActivationExpression();
		// One thread per scheduled task.
		int threads = requiredThreads(failureDetectionExpression) + requiredThreads(autoActivationExpression);

		if (threads > 0)
		{
			this.cronExecutor = new CronThreadPoolExecutor(threads, this.configuration.getThreadFactory());

			if (failureDetectionExpression != null)
			{
				this.cronExecutor.schedule(new FailureDetectionTask(), failureDetectionExpression);
			}

			if (autoActivationExpression != null)
			{
				this.cronExecutor.schedule(new AutoActivationTask(), autoActivationExpression);
			}
		}

		MBeanRegistrar<Z, D> registrar = this.configuration.getMBeanRegistrar();

		if (registrar != null)
		{
			registrar.register(this);

			for (D database: this.configuration.getDatabaseMap().values())
			{
				registrar.register(this, database);
			}
		}

		this.active = true;
	}

	/**
	 * Returns the number of scheduler threads required for the specified cron expression.
	 * @param expression a cron expression, possibly {@code null}
	 * @return 1 if the expression is defined, 0 otherwise
	 */
	private static int requiredThreads(CronExpression expression)
	{
		return (expression != null) ? 1 : 0;
	}

	/**
	 * {@inheritDoc}
	 * <p>
	 * Marks this cluster inactive, unregisters the management beans and shuts down
	 * every collaborator that was created by {@link #start()}. Each collaborator is
	 * null-checked, since it may not have been created.
	 * @see net.sf.hajdbc.Lifecycle#stop()
	 */
	@Override
	public synchronized void stop()
	{
		this.active = false;
/* Don't do this until we can distinguish between embedded databases, and local databases in a separate processes
		if (this.balancer != null)
		{
			// Proactively deactivate any local databases
			for (D database: this.balancer)
			{
				if (database.isLocal())
				{
					this.deactivate(database, this.stateManager);
				}
			}
		}
*/
		MBeanRegistrar<Z, D> registrar = this.configuration.getMBeanRegistrar();

		if (registrar != null)
		{
			registrar.unregister(this);

			for (D database: this.configuration.getDatabaseMap().values())
			{
				registrar.unregister(this, database);
			}
		}

		if (this.cronExecutor != null)
		{
			this.cronExecutor.shutdownNow();
		}

		if (this.stateManager != null)
		{
			this.stateManager.stop();
		}

		if (this.lockManager != null)
		{
			this.lockManager.stop();
		}

		if (this.executor != null)
		{
			this.executor.shutdownNow();
		}

		if (this.balancer != null)
		{
			this.balancer.clear();
		}
	}

	/**
	 * Determines whether the specified database is responsive, by opening a connection
	 * and asking the dialect whether that connection is valid.
	 * The connection is always closed before returning.
	 * @param database a database descriptor
	 * @param level the level at which a connection failure is logged
	 * @return true, if a valid connection could be obtained, false otherwise
	 */
	boolean isAlive(D database, Level level)
	{
		try
		{
			Connection connection = database.connect(database.getConnectionSource(), database.decodePassword(this.decoder));
			try
			{
				return this.dialect.isValid(connection);
			}
			finally
			{
				Resources.close(connection);
			}
		}
		catch (SQLException e)
		{
			logger.log(level, e);
			return false;
		}
	}

	/**
	 * Synchronizes, if necessary, and activates the specified database while holding
	 * the write lock obtained from the lock manager.
	 * Synchronization is skipped when the balancer is empty, i.e. when there is no
	 * active database to synchronize with.
	 * @param database the database to activate
	 * @param strategy the strategy used to synchronize the database
	 * @return true, if the database was activated, false if it is not alive or was already active
	 * @throws SQLException if synchronization fails
	 * @throws InterruptedException if the calling thread is interrupted while waiting for the lock
	 */
	boolean activate(D database, SynchronizationStrategy strategy) throws SQLException, InterruptedException
	{
		if (!this.isAlive(database, Level.DEBUG)) return false;

		// NOTE(review): a null lock object presumably denotes the cluster-wide lock - confirm against LockManager.
		Lock lock = this.lockManager.writeLock(null);

		lock.lockInterruptibly();

		try
		{
			// Re-check under the lock, since another thread may have activated the database in the meantime.
			if (this.balancer.contains(database)) return false;

			if (!this.balancer.isEmpty())
			{
				SynchronizationContext<Z, D> context = new SynchronizationContextImpl<Z, D>(this, database);

				try
				{
					DatabaseEvent event = new DatabaseEvent(database);

					logger.log(Level.INFO, Messages.DATABASE_SYNC_START.getMessage(this, database));

					for (SynchronizationListener listener: this.synchronizationListeners)
					{
						listener.beforeSynchronization(event);
					}

					strategy.synchronize(context);

					logger.log(Level.INFO, Messages.DATABASE_SYNC_END.getMessage(this, database));

					for (SynchronizationListener listener: this.synchronizationListeners)
					{
						listener.afterSynchronization(event);
					}
				}
				finally
				{
					context.close();
				}
			}

			return this.activate(database, this.stateManager);
		}
		finally
		{
			lock.unlock();
		}
	}

	/**
	 * Scheduled task that deactivates the active databases that are no longer alive.
	 * Unless the configuration allows an empty cluster, the task never checks a cluster
	 * with a single active database and never deactivates all active databases at once.
	 */
	class FailureDetectionTask implements Runnable
	{
		@Override
		public void run()
		{
			if (!DatabaseClusterImpl.this.getStateManager().isEnabled()) return;

			Set<D> databases = DatabaseClusterImpl.this.getBalancer();

			int size = databases.size();

			if ((size > 1) || DatabaseClusterImpl.this.configuration.isEmptyClusterAllowed())
			{
				List<D> deadList = new ArrayList<D>(size);

				for (D database: databases)
				{
					if (!DatabaseClusterImpl.this.isAlive(database, Level.WARN))
					{
						deadList.add(database);
					}
				}

				// If every database appears dead, leave the cluster as it is unless an empty cluster is allowed.
				if ((deadList.size() < size) || DatabaseClusterImpl.this.configuration.isEmptyClusterAllowed())
				{
					for (D database: deadList)
					{
						if (DatabaseClusterImpl.this.deactivate(database, DatabaseClusterImpl.this.getStateManager()))
						{
							logger.log(Level.ERROR, Messages.DATABASE_DEACTIVATED.getMessage(), database, DatabaseClusterImpl.this);
						}
					}
				}
			}
		}
	}

	/**
	 * Scheduled task that attempts to activate, using the default synchronization
	 * strategy, every configured database that is not currently active.
	 * The task does nothing while the cluster has no active database.
	 */
	class AutoActivationTask implements Runnable
	{
		@Override
		public void run()
		{
			if (!DatabaseClusterImpl.this.getStateManager().isEnabled()) return;

			try
			{
				Set<D> activeDatabases = DatabaseClusterImpl.this.getBalancer();

				if (!activeDatabases.isEmpty())
				{
					DatabaseClusterConfiguration<Z, D> config = DatabaseClusterImpl.this.configuration;
					// Resolve the default strategy once, rather than for every inactive database.
					SynchronizationStrategy strategy = config.getSynchronizationStrategyMap().get(config.getDefaultSynchronizationStrategy());

					for (D database: config.getDatabaseMap().values())
					{
						if (!activeDatabases.contains(database))
						{
							try
							{
								if (DatabaseClusterImpl.this.activate(database, strategy))
								{
									logger.log(Level.INFO, Messages.DATABASE_ACTIVATED.getMessage(), database, DatabaseClusterImpl.this);
								}
							}
							catch (SQLException e)
							{
								// A failed attempt is expected while the database is still down; retry on the next run.
								logger.log(Level.DEBUG, e);
							}
						}
					}
				}
			}
			catch (InterruptedException e)
			{
				Thread.currentThread().interrupt();
			}
		}
	}
}