001/* 002 * HA-JDBC: High-Availability JDBC 003 * Copyright (C) 2012 Paul Ferraro 004 * 005 * This program is free software: you can redistribute it and/or modify 006 * it under the terms of the GNU Lesser General Public License as published by 007 * the Free Software Foundation, either version 3 of the License, or 008 * (at your option) any later version. 009 * 010 * This program is distributed in the hope that it will be useful, 011 * but WITHOUT ANY WARRANTY; without even the implied warranty of 012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 013 * GNU Lesser General Public License for more details. 014 * 015 * You should have received a copy of the GNU Lesser General Public License 016 * along with this program. If not, see <http://www.gnu.org/licenses/>. 017 */ 018package net.sf.hajdbc.sync; 019 020import java.sql.Connection; 021import java.sql.SQLException; 022import java.util.AbstractMap; 023import java.util.HashMap; 024import java.util.Map; 025import java.util.Set; 026import java.util.concurrent.ExecutorService; 027import java.util.concurrent.Executors; 028 029import net.sf.hajdbc.Database; 030import net.sf.hajdbc.DatabaseCluster; 031import net.sf.hajdbc.DatabaseProperties; 032import net.sf.hajdbc.balancer.Balancer; 033import net.sf.hajdbc.cache.DatabaseMetaDataCache; 034import net.sf.hajdbc.codec.Decoder; 035import net.sf.hajdbc.dialect.Dialect; 036import net.sf.hajdbc.logging.Level; 037import net.sf.hajdbc.logging.Logger; 038import net.sf.hajdbc.logging.LoggerFactory; 039import net.sf.hajdbc.util.Resources; 040 041/** 042 * @author Paul Ferraro 043 * @param <D> Driver or DataSource 044 */ 045public class SynchronizationContextImpl<Z, D extends Database<Z>> implements SynchronizationContext<Z, D> 046{ 047 private static final Logger logger = LoggerFactory.getLogger(SynchronizationContextImpl.class); 048 049 private final Set<D> activeDatabaseSet; 050 private final D sourceDatabase; 051 private final D targetDatabase; 052 private final DatabaseCluster<Z, D> cluster; 053 private final DatabaseProperties sourceDatabaseProperties; 054 private final DatabaseProperties targetDatabaseProperties; 055 private final Map<D, Map.Entry<Connection, Boolean>> connectionMap = new HashMap<D, Map.Entry<Connection, Boolean>>(); 056 private final ExecutorService executor; 057 058 /** 059 * @param cluster 060 * @param database 061 * @throws SQLException 062 */ 063 public SynchronizationContextImpl(DatabaseCluster<Z, D> cluster, D database) throws SQLException 064 { 065 this.cluster = cluster; 066 067 Balancer<Z, D> balancer = cluster.getBalancer(); 068 069 this.sourceDatabase = balancer.next(); 070 071 this.activeDatabaseSet = balancer; 072 this.targetDatabase = database; 073 this.executor = Executors.newFixedThreadPool(this.activeDatabaseSet.size(), this.cluster.getThreadFactory()); 074 075 DatabaseMetaDataCache<Z, D> cache = cluster.getDatabaseMetaDataCache(); 076 077 this.targetDatabaseProperties = cache.getDatabaseProperties(this.targetDatabase, this.getConnection(this.targetDatabase)); 078 this.sourceDatabaseProperties = cache.getDatabaseProperties(this.sourceDatabase, this.getConnection(this.sourceDatabase)); 079 } 080 081 /** 082 * @see net.sf.hajdbc.sync.SynchronizationContext#getConnection(net.sf.hajdbc.Database) 083 */ 084 @Override 085 public Connection getConnection(D database) throws SQLException 086 { 087 Map.Entry<Connection, Boolean> entry = this.connectionMap.get(database); 088 089 if (entry == null) 090 { 091 Connection connection = database.connect(database.getConnectionSource(), database.decodePassword(this.cluster.getDecoder())); 092 entry = new AbstractMap.SimpleImmutableEntry<Connection, Boolean>(connection, connection.getAutoCommit()); 093 094 this.connectionMap.put(database, entry); 095 } 096 097 return entry.getKey(); 098 } 099 100 /** 101 * @see net.sf.hajdbc.sync.SynchronizationContext#getSourceDatabase() 102 */ 103 @Override 104 public D getSourceDatabase() 105 { 106 return this.sourceDatabase; 107 } 108 109 /** 110 * @see net.sf.hajdbc.sync.SynchronizationContext#getTargetDatabase() 111 */ 112 @Override 113 public D getTargetDatabase() 114 { 115 return this.targetDatabase; 116 } 117 118 /** 119 * @see net.sf.hajdbc.sync.SynchronizationContext#getActiveDatabaseSet() 120 */ 121 @Override 122 public Set<D> getActiveDatabaseSet() 123 { 124 return this.activeDatabaseSet; 125 } 126 127 /** 128 * @see net.sf.hajdbc.sync.SynchronizationContext#getSourceDatabaseProperties() 129 */ 130 @Override 131 public DatabaseProperties getSourceDatabaseProperties() 132 { 133 return this.sourceDatabaseProperties; 134 } 135 136 /** 137 * @see net.sf.hajdbc.sync.SynchronizationContext#getTargetDatabaseProperties() 138 */ 139 @Override 140 public DatabaseProperties getTargetDatabaseProperties() 141 { 142 return this.targetDatabaseProperties; 143 } 144 145 /** 146 * @see net.sf.hajdbc.sync.SynchronizationContext#getDialect() 147 */ 148 @Override 149 public Dialect getDialect() 150 { 151 return this.cluster.getDialect(); 152 } 153 154 /** 155 * {@inheritDoc} 156 * @see net.sf.hajdbc.sync.SynchronizationContext#getDecoder() 157 */ 158 @Override 159 public Decoder getDecoder() 160 { 161 return this.cluster.getDecoder(); 162 } 163 164 /** 165 * @see net.sf.hajdbc.sync.SynchronizationContext#getExecutor() 166 */ 167 @Override 168 public ExecutorService getExecutor() 169 { 170 return this.executor; 171 } 172 173 /** 174 * {@inheritDoc} 175 * @see net.sf.hajdbc.sync.SynchronizationContext#getSynchronizationSupport() 176 */ 177 @Override 178 public SynchronizationSupport getSynchronizationSupport() 179 { 180 return new SynchronizationSupportImpl<Z, D>(this); 181 } 182 183 /** 184 * @see net.sf.hajdbc.sync.SynchronizationContext#close() 185 */ 186 @Override 187 public void close() 188 { 189 for (Map.Entry<Connection, Boolean> entry: this.connectionMap.values()) 190 { 191 Connection connection = entry.getKey(); 192 193 try 194 { 195 connection.setAutoCommit(entry.getValue()); 196 } 197 catch (SQLException e) 198 { 199 logger.log(Level.WARN, e); 200 } 201 finally 202 { 203 Resources.close(connection); 204 } 205 } 206 207 this.executor.shutdown(); 208 } 209}