001/* 002 * HA-JDBC: High-Availability JDBC 003 * Copyright (C) 2012 Paul Ferraro 004 * 005 * This program is free software: you can redistribute it and/or modify 006 * it under the terms of the GNU Lesser General Public License as published by 007 * the Free Software Foundation, either version 3 of the License, or 008 * (at your option) any later version. 009 * 010 * This program is distributed in the hope that it will be useful, 011 * but WITHOUT ANY WARRANTY; without even the implied warranty of 012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 013 * GNU Lesser General Public License for more details. 014 * 015 * You should have received a copy of the GNU Lesser General Public License 016 * along with this program. If not, see <http://www.gnu.org/licenses/>. 017 */ 018package net.sf.hajdbc.sync; 019 020import java.sql.Connection; 021import java.sql.ResultSet; 022import java.sql.SQLException; 023import java.sql.Statement; 024import java.sql.Types; 025import java.text.MessageFormat; 026import java.util.Collection; 027import java.util.HashMap; 028import java.util.Map; 029import java.util.Set; 030import java.util.concurrent.Callable; 031import java.util.concurrent.ExecutionException; 032import java.util.concurrent.ExecutorService; 033import java.util.concurrent.Future; 034 035import net.sf.hajdbc.Database; 036import net.sf.hajdbc.ExceptionType; 037import net.sf.hajdbc.ForeignKeyConstraint; 038import net.sf.hajdbc.IdentityColumnSupport; 039import net.sf.hajdbc.Messages; 040import net.sf.hajdbc.SequenceProperties; 041import net.sf.hajdbc.SequenceSupport; 042import net.sf.hajdbc.TableProperties; 043import net.sf.hajdbc.UniqueConstraint; 044import net.sf.hajdbc.dialect.Dialect; 045import net.sf.hajdbc.logging.Level; 046import net.sf.hajdbc.logging.Logger; 047import net.sf.hajdbc.logging.LoggerFactory; 048import net.sf.hajdbc.util.Resources; 049import net.sf.hajdbc.util.Strings; 050 051/** 052 * Default {@link SynchronizationSupport} implementation. 053 * @author Paul Ferraro 054 */ 055public class SynchronizationSupportImpl<Z, D extends Database<Z>> implements SynchronizationSupport 056{ 057 private final Logger logger = LoggerFactory.getLogger(this.getClass()); 058 059 private final SynchronizationContext<Z, D> context; 060 061 public SynchronizationSupportImpl(SynchronizationContext<Z, D> context) 062 { 063 this.context = context; 064 } 065 066 /** 067 * {@inheritDoc} 068 * @see net.sf.hajdbc.sync.SynchronizationSupport#dropForeignKeys() 069 */ 070 @Override 071 public void dropForeignKeys() throws SQLException 072 { 073 Dialect dialect = this.context.getDialect(); 074 075 Connection connection = this.context.getConnection(this.context.getTargetDatabase()); 076 boolean autoCommit = connection.getAutoCommit(); 077 try 078 { 079 connection.setAutoCommit(true); 080 081 Statement statement = connection.createStatement(); 082 try 083 { 084 for (TableProperties table: this.context.getTargetDatabaseProperties().getTables()) 085 { 086 for (ForeignKeyConstraint constraint: table.getForeignKeyConstraints()) 087 { 088 String sql = dialect.getDropForeignKeyConstraintSQL(constraint); 089 090 this.logger.log(Level.DEBUG, sql); 091 092 statement.addBatch(sql); 093 } 094 } 095 statement.executeBatch(); 096 } 097 finally 098 { 099 Resources.close(statement); 100 } 101 } 102 finally 103 { 104 connection.setAutoCommit(autoCommit); 105 } 106 } 107 108 /** 109 * {@inheritDoc} 110 * @see net.sf.hajdbc.sync.SynchronizationSupport#restoreForeignKeys() 111 */ 112 @Override 113 public void restoreForeignKeys() throws SQLException 114 { 115 Dialect dialect = this.context.getDialect(); 116 117 Connection connection = this.context.getConnection(this.context.getTargetDatabase()); 118 boolean autoCommit = connection.getAutoCommit(); 119 try 120 { 121 connection.setAutoCommit(true); 122 123 Statement statement = connection.createStatement(); 124 try 125 { 126 for (TableProperties table: this.context.getSourceDatabaseProperties().getTables()) 127 { 128 for (ForeignKeyConstraint constraint: table.getForeignKeyConstraints()) 129 { 130 String sql = dialect.getCreateForeignKeyConstraintSQL(constraint); 131 132 this.logger.log(Level.DEBUG, sql); 133 134 statement.addBatch(sql); 135 } 136 } 137 138 statement.executeBatch(); 139 } 140 finally 141 { 142 Resources.close(statement); 143 } 144 } 145 finally 146 { 147 connection.setAutoCommit(autoCommit); 148 } 149 } 150 151 /** 152 * {@inheritDoc} 153 * @see net.sf.hajdbc.sync.SynchronizationSupport#synchronizeSequences() 154 */ 155 @Override 156 public void synchronizeSequences() throws SQLException 157 { 158 SequenceSupport support = this.context.getDialect().getSequenceSupport(); 159 160 if (support != null) 161 { 162 Collection<SequenceProperties> sequences = this.context.getSourceDatabaseProperties().getSequences(); 163 164 if (!sequences.isEmpty()) 165 { 166 D sourceDatabase = this.context.getSourceDatabase(); 167 168 Set<D> databases = this.context.getActiveDatabaseSet(); 169 170 ExecutorService executor = this.context.getExecutor(); 171 172 Map<SequenceProperties, Long> sequenceMap = new HashMap<SequenceProperties, Long>(); 173 Map<D, Future<Long>> futureMap = new HashMap<D, Future<Long>>(); 174 175 for (SequenceProperties sequence: sequences) 176 { 177 final String sql = support.getNextSequenceValueSQL(sequence); 178 179 this.logger.log(Level.DEBUG, sql); 180 181 for (final D database: databases) 182 { 183 final SynchronizationContext<Z, D> context = this.context; 184 185 Callable<Long> task = new Callable<Long>() 186 { 187 @Override 188 public Long call() throws SQLException 189 { 190 Statement statement = context.getConnection(database).createStatement(); 191 try 192 { 193 ResultSet resultSet = statement.executeQuery(sql); 194 195 resultSet.next(); 196 197 return resultSet.getLong(1); 198 } 199 finally 200 { 201 Resources.close(statement); 202 } 203 } 204 }; 205 206 futureMap.put(database, executor.submit(task)); 207 } 208 209 try 210 { 211 Long sourceValue = futureMap.get(sourceDatabase).get(); 212 213 sequenceMap.put(sequence, sourceValue); 214 215 for (D database: databases) 216 { 217 if (!database.equals(sourceDatabase)) 218 { 219 Long value = futureMap.get(database).get(); 220 221 if (!value.equals(sourceValue)) 222 { 223 throw new SQLException(Messages.SEQUENCE_OUT_OF_SYNC.getMessage(sequence, database, value, sourceDatabase, sourceValue)); 224 } 225 } 226 } 227 } 228 catch (InterruptedException e) 229 { 230 throw new SQLException(e); 231 } 232 catch (ExecutionException e) 233 { 234 throw ExceptionType.SQL.<SQLException>getExceptionFactory().createException(e.getCause()); 235 } 236 } 237 238 Connection targetConnection = this.context.getConnection(this.context.getTargetDatabase()); 239 Statement targetStatement = targetConnection.createStatement(); 240 try 241 { 242 for (SequenceProperties sequence: sequences) 243 { 244 String sql = support.getAlterSequenceSQL(sequence, sequenceMap.get(sequence) + 1); 245 246 this.logger.log(Level.DEBUG, sql); 247 248 targetStatement.addBatch(sql); 249 } 250 251 targetStatement.executeBatch(); 252 } 253 finally 254 { 255 Resources.close(targetStatement); 256 } 257 } 258 } 259 } 260 261 /** 262 * {@inheritDoc} 263 * @see net.sf.hajdbc.sync.SynchronizationSupport#synchronizeIdentityColumns() 264 */ 265 @Override 266 public void synchronizeIdentityColumns() throws SQLException 267 { 268 IdentityColumnSupport support = this.context.getDialect().getIdentityColumnSupport(); 269 270 if (support != null) 271 { 272 Statement sourceStatement = this.context.getConnection(this.context.getSourceDatabase()).createStatement(); 273 try 274 { 275 Statement targetStatement = this.context.getConnection(this.context.getTargetDatabase()).createStatement(); 276 try 277 { 278 for (TableProperties table: this.context.getSourceDatabaseProperties().getTables()) 279 { 280 Collection<String> columns = table.getIdentityColumns(); 281 282 if (!columns.isEmpty()) 283 { 284 String selectSQL = MessageFormat.format("SELECT max({0}) FROM {1}", Strings.join(columns, "), max("), table.getName()); //$NON-NLS-1$ //$NON-NLS-2$ 285 286 this.logger.log(Level.DEBUG, selectSQL); 287 288 Map<String, Long> map = new HashMap<String, Long>(); 289 ResultSet resultSet = sourceStatement.executeQuery(selectSQL); 290 try 291 { 292 if (resultSet.next()) 293 { 294 int i = 0; 295 296 for (String column: columns) 297 { 298 map.put(column, resultSet.getLong(++i)); 299 } 300 } 301 } 302 finally 303 { 304 Resources.close(resultSet); 305 } 306 307 if (!map.isEmpty()) 308 { 309 for (Map.Entry<String, Long> mapEntry: map.entrySet()) 310 { 311 String alterSQL = support.getAlterIdentityColumnSQL(table, table.getColumnProperties(mapEntry.getKey()), mapEntry.getValue() + 1); 312 313 if (alterSQL != null) 314 { 315 this.logger.log(Level.DEBUG, alterSQL); 316 317 targetStatement.addBatch(alterSQL); 318 } 319 } 320 321 targetStatement.executeBatch(); 322 } 323 } 324 } 325 } 326 finally 327 { 328 Resources.close(targetStatement); 329 } 330 } 331 finally 332 { 333 Resources.close(sourceStatement); 334 } 335 } 336 } 337 338 /** 339 * {@inheritDoc} 340 * @see net.sf.hajdbc.sync.SynchronizationSupport#dropUniqueConstraints() 341 */ 342 @Override 343 public void dropUniqueConstraints() throws SQLException 344 { 345 Dialect dialect = this.context.getDialect(); 346 347 Connection connection = this.context.getConnection(this.context.getTargetDatabase()); 348 boolean autoCommit = connection.getAutoCommit(); 349 try 350 { 351 connection.setAutoCommit(true); 352 Statement statement = connection.createStatement(); 353 try 354 { 355 for (TableProperties table: this.context.getTargetDatabaseProperties().getTables()) 356 { 357 for (UniqueConstraint constraint: table.getUniqueConstraints()) 358 { 359 String sql = dialect.getDropUniqueConstraintSQL(constraint); 360 361 this.logger.log(Level.DEBUG, sql); 362 363 statement.addBatch(sql); 364 } 365 } 366 367 statement.executeBatch(); 368 } 369 finally 370 { 371 Resources.close(statement); 372 } 373 } 374 finally 375 { 376 connection.setAutoCommit(autoCommit); 377 } 378 } 379 380 /** 381 * {@inheritDoc} 382 * @see net.sf.hajdbc.sync.SynchronizationSupport#restoreUniqueConstraints() 383 */ 384 @Override 385 public void restoreUniqueConstraints() throws SQLException 386 { 387 Dialect dialect = this.context.getDialect(); 388 389 Connection connection = this.context.getConnection(this.context.getTargetDatabase()); 390 boolean autoCommit = connection.getAutoCommit(); 391 try 392 { 393 connection.setAutoCommit(true); 394 395 Statement statement = connection.createStatement(); 396 try 397 { 398 for (TableProperties table: this.context.getSourceDatabaseProperties().getTables()) 399 { 400 // Drop unique constraints on the current table 401 for (UniqueConstraint constraint: table.getUniqueConstraints()) 402 { 403 String sql = dialect.getCreateUniqueConstraintSQL(constraint); 404 405 this.logger.log(Level.DEBUG, sql); 406 407 statement.addBatch(sql); 408 } 409 } 410 411 statement.executeBatch(); 412 } 413 finally 414 { 415 Resources.close(statement); 416 } 417 } 418 finally 419 { 420 connection.setAutoCommit(autoCommit); 421 } 422 } 423 424 /** 425 * {@inheritDoc} 426 * @see net.sf.hajdbc.sync.SynchronizationSupport#rollback(java.sql.Connection) 427 */ 428 @Override 429 public void rollback(Connection connection) 430 { 431 try 432 { 433 connection.rollback(); 434 } 435 catch (SQLException e) 436 { 437 this.logger.log(Level.WARN, e); 438 } 439 } 440 441 /** 442 * {@inheritDoc} 443 * @see net.sf.hajdbc.sync.SynchronizationSupport#getObject(java.sql.ResultSet, int, int) 444 */ 445 @Override 446 public Object getObject(ResultSet resultSet, int index, int type) throws SQLException 447 { 448 switch (type) 449 { 450 case Types.BLOB: 451 { 452 return resultSet.getBlob(index); 453 } 454 case Types.CLOB: 455 { 456 return resultSet.getClob(index); 457 } 458 default: 459 { 460 return resultSet.getObject(index); 461 } 462 } 463 } 464}