001/* 002 * HA-JDBC: High-Availability JDBC 003 * Copyright (C) 2012 Paul Ferraro 004 * 005 * This program is free software: you can redistribute it and/or modify 006 * it under the terms of the GNU Lesser General Public License as published by 007 * the Free Software Foundation, either version 3 of the License, or 008 * (at your option) any later version. 009 * 010 * This program is distributed in the hope that it will be useful, 011 * but WITHOUT ANY WARRANTY; without even the implied warranty of 012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 013 * GNU Lesser General Public License for more details. 014 * 015 * You should have received a copy of the GNU Lesser General Public License 016 * along with this program. If not, see <http://www.gnu.org/licenses/>. 017 */ 018package net.sf.hajdbc.sync; 019 020import java.sql.Connection; 021import java.sql.PreparedStatement; 022import java.sql.ResultSet; 023import java.sql.SQLException; 024import java.sql.Statement; 025import java.util.ArrayList; 026import java.util.Collection; 027import java.util.Collections; 028import java.util.List; 029import java.util.concurrent.Callable; 030import java.util.concurrent.ExecutionException; 031import java.util.concurrent.Future; 032import java.util.regex.Pattern; 033 034import net.sf.hajdbc.Database; 035import net.sf.hajdbc.DatabaseCluster; 036import net.sf.hajdbc.ExceptionType; 037import net.sf.hajdbc.Messages; 038import net.sf.hajdbc.SynchronizationStrategy; 039import net.sf.hajdbc.TableProperties; 040import net.sf.hajdbc.UniqueConstraint; 041import net.sf.hajdbc.logging.Level; 042import net.sf.hajdbc.logging.Logger; 043import net.sf.hajdbc.logging.LoggerFactory; 044import net.sf.hajdbc.util.Objects; 045import net.sf.hajdbc.util.Resources; 046import net.sf.hajdbc.util.Strings; 047 048/** 049 * Database-independent synchronization strategy that only updates differences between two databases. 050 * This strategy is best used when there are <em>few</em> differences between the active database and the inactive database (i.e. barely out of sync). 051 * The following algorithm is used: 052 * <ol> 053 * <li>Drop the foreign keys on the inactive database (to avoid integrity constraint violations)</li> 054 * <li>For each database table: 055 * <ol> 056 * <li>Drop the unique constraints on the table (to avoid integrity constraint violations)</li> 057 * <li>Find the primary key(s) of the table</li> 058 * <li>Query all rows in the inactive database table, sorting by the primary key(s)</li> 059 * <li>Query all rows on the active database table</li> 060 * <li>For each row in table: 061 * <ol> 062 * <li>If primary key of the rows are the same, determine whether or not row needs to be updated</li> 063 * <li>Otherwise, determine whether row should be deleted, or a new row is to be inserted</li> 064 * </ol> 065 * </li> 066 * <li>Re-create the unique constraints on the table (to avoid integrity constraint violations)</li> 067 * </ol> 068 * </li> 069 * <li>Re-create the foreign keys on the inactive database</li> 070 * <li>Synchronize sequences</li> 071 * </ol> 072 * @author Paul Ferraro 073 */ 074public class DifferentialSynchronizationStrategy implements SynchronizationStrategy, TableSynchronizationStrategy 075{ 076 private static final long serialVersionUID = -2785092229503649831L; 077 078 static Logger logger = LoggerFactory.getLogger(DifferentialSynchronizationStrategy.class); 079 080 private final SynchronizationStrategy strategy = new PerTableSynchronizationStrategy(this); 081 private int fetchSize = 0; 082 private int maxBatchSize = 100; 083 private Pattern versionPattern = null; 084 085 @Override 086 public String getId() 087 { 088 return "diff"; 089 } 090 091 @Override 092 public <Z, D extends Database<Z>> void synchronize(SynchronizationContext<Z, D> context) throws SQLException 093 { 094 this.strategy.synchronize(context); 095 } 096 097 /** 098 * {@inheritDoc} 099 * @see net.sf.hajdbc.SynchronizationStrategy#init(net.sf.hajdbc.DatabaseCluster) 100 */ 101 @Override 102 public <Z, D extends Database<Z>> void init(DatabaseCluster<Z, D> cluster) 103 { 104 this.strategy.init(cluster); 105 } 106 107 /** 108 * {@inheritDoc} 109 * @see net.sf.hajdbc.SynchronizationStrategy#destroy(net.sf.hajdbc.DatabaseCluster) 110 */ 111 @Override 112 public <Z, D extends Database<Z>> void destroy(DatabaseCluster<Z, D> cluster) 113 { 114 this.strategy.destroy(cluster); 115 } 116 117 @Override 118 public <Z, D extends Database<Z>> void dropConstraints(SynchronizationContext<Z, D> context) throws SQLException 119 { 120 SynchronizationSupport support = context.getSynchronizationSupport(); 121 support.dropForeignKeys(); 122 support.dropUniqueConstraints(); 123 } 124 125 @Override 126 public <Z, D extends Database<Z>> void restoreConstraints(SynchronizationContext<Z, D> context) throws SQLException 127 { 128 SynchronizationSupport support = context.getSynchronizationSupport(); 129 support.restoreUniqueConstraints(); 130 support.restoreForeignKeys(); 131 } 132 133 /** 134 * {@inheritDoc} 135 * @see net.sf.hajdbc.SynchronizationStrategy#synchronize(net.sf.hajdbc.sync.SynchronizationContext) 136 */ 137 @Override 138 public <Z, D extends Database<Z>> void synchronize(SynchronizationContext<Z, D> context, TableProperties table) throws SQLException 139 { 140 String tableName = table.getName().getDMLName(); 141 142 UniqueConstraint primaryKey = table.getPrimaryKey(); 143 144 if (primaryKey == null) 145 { 146 throw new SQLException(Messages.PRIMARY_KEY_REQUIRED.getMessage(this.getClass().getName(), tableName)); 147 } 148 149 List<String> primaryKeyColumns = primaryKey.getColumnList(); 150 151 Collection<String> columns = table.getColumns(); 152 153 List<String> nonPrimaryKeyColumns = new ArrayList<String>(columns.size()); 154 List<String> versionColumns = new ArrayList<String>(columns.size()); 155 156 for (String column: columns) 157 { 158 if (!primaryKeyColumns.contains(column)) 159 { 160 // Try to find a version column 161 if ((this.versionPattern != null) && this.versionPattern.matcher(column).matches()) 162 { 163 versionColumns.add(column); 164 } 165 166 nonPrimaryKeyColumns.add(column); 167 } 168 } 169 170 // List of columns for select statement - starting with primary key 171 List<String> allColumns = new ArrayList<String>(columns.size()); 172 allColumns.addAll(primaryKeyColumns); 173 allColumns.addAll(nonPrimaryKeyColumns); 174 175 List<String> selectColumns = allColumns; 176 if (!versionColumns.isEmpty()) 177 { 178 selectColumns = new ArrayList<String>(primaryKeyColumns.size() + versionColumns.size()); 179 selectColumns.addAll(primaryKeyColumns); 180 selectColumns.addAll(versionColumns); 181 } 182 183 // Retrieve table rows in primary key order 184 final String selectSQL = String.format("SELECT %s FROM %s ORDER BY %s", Strings.join(selectColumns, Strings.PADDED_COMMA), tableName, Strings.join(primaryKeyColumns, Strings.PADDED_COMMA)); //$NON-NLS-1$ 185 String primaryKeyWhereClause = Strings.join(new StringBuilder(), primaryKeyColumns, " = ? AND ").append(" = ?").toString(); //$NON-NLS-1$ 186 String selectAllSQL = !versionColumns.isEmpty() ? String.format("SELECT %s FROM %s WHERE %s", Strings.join(nonPrimaryKeyColumns, Strings.PADDED_COMMA), tableName, primaryKeyWhereClause) : null; 187 String deleteSQL = String.format("DELETE FROM %s WHERE %s", tableName, primaryKeyWhereClause); 188 String insertSQL = String.format("INSERT INTO %s (%s) VALUES (%s)", tableName, Strings.join(allColumns, Strings.PADDED_COMMA), Strings.join(Collections.nCopies(allColumns.size(), Strings.QUESTION), Strings.PADDED_COMMA)); //$NON-NLS-1$ 189 String updateSQL = !nonPrimaryKeyColumns.isEmpty() ? String.format("UPDATE %s SET %s = ? WHERE %s", tableName, Strings.join(nonPrimaryKeyColumns, " = ?, "), primaryKeyWhereClause) : null; 190 191 Connection targetConnection = context.getConnection(context.getTargetDatabase()); 192 final Statement targetStatement = targetConnection.createStatement(); 193 194 try 195 { 196 targetStatement.setFetchSize(this.fetchSize); 197 198 Callable<ResultSet> callable = new Callable<ResultSet>() 199 { 200 @Override 201 public ResultSet call() throws SQLException 202 { 203 logger.log(Level.DEBUG, selectSQL); 204 return targetStatement.executeQuery(selectSQL); 205 } 206 }; 207 208 Future<ResultSet> future = context.getExecutor().submit(callable); 209 210 Connection sourceConnection = context.getConnection(context.getSourceDatabase()); 211 Statement sourceStatement = sourceConnection.createStatement(); 212 213 try 214 { 215 sourceStatement.setFetchSize(this.fetchSize); 216 217 ResultSet sourceResultSet = sourceStatement.executeQuery(selectSQL); 218 219 ResultSet targetResultSet = future.get(); 220 221 PreparedStatement selectAllStatement = null; 222 if (!versionColumns.isEmpty()) 223 { 224 logger.log(Level.DEBUG, selectAllSQL); 225 selectAllStatement = targetConnection.prepareStatement(selectAllSQL); 226 } 227 228 try 229 { 230 logger.log(Level.DEBUG, deleteSQL); 231 PreparedStatement deleteStatement = targetConnection.prepareStatement(deleteSQL); 232 233 try 234 { 235 logger.log(Level.DEBUG, insertSQL); 236 PreparedStatement insertStatement = targetConnection.prepareStatement(insertSQL); 237 238 try 239 { 240 PreparedStatement updateStatement = null; 241 242 if (!nonPrimaryKeyColumns.isEmpty()) 243 { 244 logger.log(Level.DEBUG, updateSQL); 245 updateStatement = targetConnection.prepareStatement(updateSQL); 246 } 247 248 try 249 { 250 boolean hasMoreSourceResults = sourceResultSet.next(); 251 boolean hasMoreTargetResults = targetResultSet.next(); 252 253 int insertCount = 0; 254 int updateCount = 0; 255 int deleteCount = 0; 256 257 while (hasMoreSourceResults || hasMoreTargetResults) 258 { 259 int compare = 0; 260 261 if (!hasMoreSourceResults) 262 { 263 compare = 1; 264 } 265 else if (!hasMoreTargetResults) 266 { 267 compare = -1; 268 } 269 else 270 { 271 for (int i = 1; i <= primaryKeyColumns.size(); ++i) 272 { 273 Object sourceObject = sourceResultSet.getObject(i); 274 Object targetObject = targetResultSet.getObject(i); 275 276 // We assume that the primary keys column types are Comparable 277 compare = compare(sourceObject, targetObject); 278 279 if (compare != 0) 280 { 281 break; 282 } 283 } 284 } 285 286 if (compare > 0) 287 { 288 deleteStatement.clearParameters(); 289 290 for (int i = 1; i <= primaryKeyColumns.size(); ++i) 291 { 292 int type = context.getDialect().getColumnType(table.getColumnProperties(allColumns.get(i - 1))); 293 294 deleteStatement.setObject(i, targetResultSet.getObject(i), type); 295 } 296 297 deleteStatement.addBatch(); 298 299 deleteCount += 1; 300 301 if ((deleteCount % this.maxBatchSize) == 0) 302 { 303 deleteStatement.executeBatch(); 304 deleteStatement.clearBatch(); 305 } 306 } 307 else if (compare < 0) 308 { 309 insertStatement.clearParameters(); 310 311 for (int i = 1; i <= primaryKeyColumns.size(); ++i) 312 { 313 int type = context.getDialect().getColumnType(table.getColumnProperties(allColumns.get(i - 1))); 314 315 insertStatement.setObject(i, sourceResultSet.getObject(i), type); 316 } 317 318 if (versionColumns.isEmpty()) 319 { 320 for (int i = primaryKeyColumns.size() + 1; i <= allColumns.size(); ++i) 321 { 322 int type = context.getDialect().getColumnType(table.getColumnProperties(allColumns.get(i - 1))); 323 324 Object object = context.getSynchronizationSupport().getObject(sourceResultSet, i, type); 325 326 if (sourceResultSet.wasNull()) 327 { 328 insertStatement.setNull(i, type); 329 } 330 else 331 { 332 insertStatement.setObject(i, object, type); 333 } 334 } 335 } 336 else 337 { 338 if (selectAllStatement != null) 339 { 340 selectAllStatement.clearParameters(); 341 342 for (int i = 1; i <= primaryKeyColumns.size(); ++i) 343 { 344 int type = context.getDialect().getColumnType(table.getColumnProperties(allColumns.get(i - 1))); 345 346 selectAllStatement.setObject(i, sourceResultSet.getObject(i), type); 347 } 348 349 ResultSet selectAllResultSet = selectAllStatement.executeQuery(); 350 351 for (int i = primaryKeyColumns.size() + 1; i <= allColumns.size(); ++i) 352 { 353 int type = context.getDialect().getColumnType(table.getColumnProperties(allColumns.get(i - 1))); 354 355 Object object = context.getSynchronizationSupport().getObject(selectAllResultSet, i - primaryKeyColumns.size(), type); 356 357 if (selectAllResultSet.wasNull()) 358 { 359 insertStatement.setNull(i, type); 360 } 361 else 362 { 363 insertStatement.setObject(i, object, type); 364 } 365 } 366 } 367 } 368 369 insertStatement.addBatch(); 370 371 insertCount += 1; 372 373 if ((insertCount % this.maxBatchSize) == 0) 374 { 375 insertStatement.executeBatch(); 376 insertStatement.clearBatch(); 377 } 378 } 379 else if (updateStatement != null) // if (compare == 0) 380 { 381 updateStatement.clearParameters(); 382 383 boolean updated = false; 384 385 for (int i = primaryKeyColumns.size() + 1; i <= selectColumns.size(); ++i) 386 { 387 int type = context.getDialect().getColumnType(table.getColumnProperties(selectColumns.get(i - 1))); 388 389 Object sourceObject = context.getSynchronizationSupport().getObject(sourceResultSet, i, type); 390 Object targetObject = context.getSynchronizationSupport().getObject(targetResultSet, i, type); 391 392 int index = i - primaryKeyColumns.size(); 393 394 if (sourceResultSet.wasNull()) 395 { 396 updateStatement.setNull(index, type); 397 398 updated |= !targetResultSet.wasNull(); 399 } 400 else 401 { 402 updateStatement.setObject(index, sourceObject, type); 403 404 updated |= targetResultSet.wasNull(); 405 updated |= !Objects.equals(sourceObject, targetObject); 406 } 407 } 408 409 if (updated) 410 { 411 if (selectAllStatement != null) 412 { 413 selectAllStatement.clearParameters(); 414 415 for (int i = 1; i <= primaryKeyColumns.size(); ++i) 416 { 417 int type = context.getDialect().getColumnType(table.getColumnProperties(allColumns.get(i - 1))); 418 419 selectAllStatement.setObject(i, sourceResultSet.getObject(i), type); 420 } 421 422 ResultSet selectAllResultSet = selectAllStatement.executeQuery(); 423 424 for (int i = primaryKeyColumns.size() + 1; i <= allColumns.size(); ++i) 425 { 426 int type = context.getDialect().getColumnType(table.getColumnProperties(allColumns.get(i - 1))); 427 428 int index = i - primaryKeyColumns.size(); 429 430 Object object = context.getSynchronizationSupport().getObject(selectAllResultSet, index, type); 431 432 if (selectAllResultSet.wasNull()) 433 { 434 updateStatement.setNull(index, type); 435 } 436 else 437 { 438 updateStatement.setObject(index, object, type); 439 } 440 } 441 } 442 443 for (int i = 1; i <= primaryKeyColumns.size(); ++i) 444 { 445 int type = context.getDialect().getColumnType(table.getColumnProperties(allColumns.get(i - 1))); 446 447 updateStatement.setObject(i + nonPrimaryKeyColumns.size(), targetResultSet.getObject(i), type); 448 } 449 450 updateStatement.addBatch(); 451 452 updateCount += 1; 453 454 if ((updateCount % this.maxBatchSize) == 0) 455 { 456 updateStatement.executeBatch(); 457 updateStatement.clearBatch(); 458 } 459 } 460 } 461 462 if (hasMoreSourceResults && (compare <= 0)) 463 { 464 hasMoreSourceResults = sourceResultSet.next(); 465 } 466 467 if (hasMoreTargetResults && (compare >= 0)) 468 { 469 hasMoreTargetResults = targetResultSet.next(); 470 } 471 } 472 473 if ((deleteCount % this.maxBatchSize) > 0) 474 { 475 deleteStatement.executeBatch(); 476 } 477 478 if ((insertCount % this.maxBatchSize) > 0) 479 { 480 insertStatement.executeBatch(); 481 } 482 483 if (updateStatement != null) 484 { 485 if ((updateCount % this.maxBatchSize) > 0) 486 { 487 updateStatement.executeBatch(); 488 } 489 } 490 491 logger.log(Level.INFO, Messages.INSERT_COUNT.getMessage(), insertCount, tableName); 492 logger.log(Level.INFO, Messages.UPDATE_COUNT.getMessage(), updateCount, tableName); 493 logger.log(Level.INFO, Messages.DELETE_COUNT.getMessage(), deleteCount, tableName); 494 } 495 finally 496 { 497 if (updateStatement != null) 498 { 499 Resources.close(updateStatement); 500 } 501 } 502 } 503 finally 504 { 505 Resources.close(insertStatement); 506 } 507 } 508 finally 509 { 510 Resources.close(deleteStatement); 511 } 512 } 513 finally 514 { 515 if (selectAllStatement != null) 516 { 517 Resources.close(selectAllStatement); 518 } 519 } 520 } 521 catch (InterruptedException e) 522 { 523 Thread.currentThread().interrupt(); 524 throw new SQLException(e); 525 } 526 catch (ExecutionException e) 527 { 528 throw ExceptionType.SQL.<SQLException>getExceptionFactory().createException(e.getCause()); 529 } 530 finally 531 { 532 Resources.close(sourceStatement); 533 } 534 } 535 finally 536 { 537 Resources.close(targetStatement); 538 } 539 } 540 541 private static int compare(Object object1, Object object2) 542 { 543 @SuppressWarnings("unchecked") 544 Comparable<Object> comparable = (Comparable<Object>) object1; 545 546 return comparable.compareTo(object2); 547 } 548 549 /** 550 * @return the fetchSize. 551 */ 552 public int getFetchSize() 553 { 554 return this.fetchSize; 555 } 556 557 /** 558 * @param fetchSize the fetchSize to set. 559 */ 560 public void setFetchSize(int fetchSize) 561 { 562 this.fetchSize = fetchSize; 563 } 564 565 /** 566 * @return Returns the maxBatchSize. 567 */ 568 public int getMaxBatchSize() 569 { 570 return this.maxBatchSize; 571 } 572 573 /** 574 * @param maxBatchSize The maxBatchSize to set. 575 */ 576 public void setMaxBatchSize(int maxBatchSize) 577 { 578 this.maxBatchSize = maxBatchSize; 579 } 580 581 /** 582 * @return the versionPattern 583 */ 584 public String getVersionPattern() 585 { 586 return (this.versionPattern != null) ? this.versionPattern.pattern() : null; 587 } 588 589 /** 590 * @param versionPattern the versionPattern to set 591 */ 592 public void setVersionPattern(String versionPattern) 593 { 594 this.versionPattern = (versionPattern != null) ? Pattern.compile(versionPattern, Pattern.CASE_INSENSITIVE) : null; 595 } 596}