001/* 002 * HA-JDBC: High-Availability JDBC 003 * Copyright (C) 2012 Paul Ferraro 004 * 005 * This program is free software: you can redistribute it and/or modify 006 * it under the terms of the GNU Lesser General Public License as published by 007 * the Free Software Foundation, either version 3 of the License, or 008 * (at your option) any later version. 009 * 010 * This program is distributed in the hope that it will be useful, 011 * but WITHOUT ANY WARRANTY; without even the implied warranty of 012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 013 * GNU Lesser General Public License for more details. 014 * 015 * You should have received a copy of the GNU Lesser General Public License 016 * along with this program. If not, see <http://www.gnu.org/licenses/>. 017 */ 018package net.sf.hajdbc.sync; 019 020import java.sql.Connection; 021import java.sql.PreparedStatement; 022import java.sql.ResultSet; 023import java.sql.SQLException; 024import java.sql.Statement; 025import java.util.Collection; 026import java.util.Collections; 027import java.util.concurrent.Callable; 028import java.util.concurrent.ExecutionException; 029import java.util.concurrent.Future; 030 031import net.sf.hajdbc.Database; 032import net.sf.hajdbc.DatabaseCluster; 033import net.sf.hajdbc.ExceptionType; 034import net.sf.hajdbc.Messages; 035import net.sf.hajdbc.SynchronizationStrategy; 036import net.sf.hajdbc.TableProperties; 037import net.sf.hajdbc.logging.Level; 038import net.sf.hajdbc.logging.Logger; 039import net.sf.hajdbc.logging.LoggerFactory; 040import net.sf.hajdbc.util.Resources; 041import net.sf.hajdbc.util.Strings; 042 043/** 044 * Database-independent synchronization strategy that does full record transfer between two databases. 045 * This strategy is best used when there are <em>many</em> differences between the active database and the inactive database (i.e. very much out of sync). 046 * The following algorithm is used: 047 * <ol> 048 * <li>Drop the foreign keys on the inactive database (to avoid integrity constraint violations)</li> 049 * <li>For each database table: 050 * <ol> 051 * <li>Delete all rows in the inactive database table</li> 052 * <li>Query all rows on the active database table</li> 053 * <li>For each row in active database table: 054 * <ol> 055 * <li>Insert new row into inactive database table</li> 056 * </ol> 057 * </li> 058 * </ol> 059 * </li> 060 * <li>Re-create the foreign keys on the inactive database</li> 061 * <li>Synchronize sequences</li> 062 * </ol> 063 * @author Paul Ferraro 064 */ 065public class FullSynchronizationStrategy implements SynchronizationStrategy, TableSynchronizationStrategy 066{ 067 private static final long serialVersionUID = 9190347092842178162L; 068 069 static Logger logger = LoggerFactory.getLogger(FullSynchronizationStrategy.class); 070 071 private SynchronizationStrategy strategy = new PerTableSynchronizationStrategy(this); 072 private int maxBatchSize = 100; 073 private int fetchSize = 0; 074 075 @Override 076 public String getId() 077 { 078 return "full"; 079 } 080 081 @Override 082 public <Z, D extends Database<Z>> void init(DatabaseCluster<Z, D> cluster) 083 { 084 this.strategy.init(cluster); 085 } 086 087 @Override 088 public <Z, D extends Database<Z>> void synchronize(SynchronizationContext<Z, D> context) throws SQLException 089 { 090 this.strategy.synchronize(context); 091 } 092 093 @Override 094 public <Z, D extends Database<Z>> void destroy(DatabaseCluster<Z, D> cluster) 095 { 096 this.strategy.destroy(cluster); 097 } 098 099 @Override 100 public <Z, D extends Database<Z>> void synchronize(SynchronizationContext<Z, D> context, TableProperties table) throws SQLException 101 { 102 final String tableName = table.getName().getDMLName(); 103 final Collection<String> columns = table.getColumns(); 104 105 final String commaDelimitedColumns = Strings.join(columns, Strings.PADDED_COMMA); 106 107 final String selectSQL = String.format("SELECT %s FROM %s", commaDelimitedColumns, tableName); 108 final String deleteSQL = context.getDialect().getTruncateTableSQL(table); 109 final String insertSQL = String.format("INSERT INTO %s (%s) VALUES (%s)", tableName, commaDelimitedColumns, Strings.join(Collections.nCopies(columns.size(), Strings.QUESTION), Strings.PADDED_COMMA)); 110 111 Connection sourceConnection = context.getConnection(context.getSourceDatabase()); 112 final Statement selectStatement = sourceConnection.createStatement(); 113 try 114 { 115 selectStatement.setFetchSize(this.fetchSize); 116 117 Callable<ResultSet> callable = new Callable<ResultSet>() 118 { 119 @Override 120 public ResultSet call() throws SQLException 121 { 122 logger.log(Level.DEBUG, selectSQL); 123 return selectStatement.executeQuery(selectSQL); 124 } 125 }; 126 127 Future<ResultSet> future = context.getExecutor().submit(callable); 128 129 Connection targetConnection = context.getConnection(context.getTargetDatabase()); 130 Statement deleteStatement = targetConnection.createStatement(); 131 132 try 133 { 134 logger.log(Level.DEBUG, deleteSQL); 135 int deletedRows = deleteStatement.executeUpdate(deleteSQL); 136 137 logger.log(Level.INFO, Messages.DELETE_COUNT.getMessage(), deletedRows, tableName); 138 } 139 finally 140 { 141 Resources.close(deleteStatement); 142 } 143 144 logger.log(Level.DEBUG, insertSQL); 145 PreparedStatement insertStatement = targetConnection.prepareStatement(insertSQL); 146 147 try 148 { 149 int statementCount = 0; 150 151 ResultSet resultSet = future.get(); 152 153 while (resultSet.next()) 154 { 155 int index = 0; 156 157 for (String column: table.getColumns()) 158 { 159 index += 1; 160 161 int type = context.getDialect().getColumnType(table.getColumnProperties(column)); 162 163 Object object = context.getSynchronizationSupport().getObject(resultSet, index, type); 164 165 if (resultSet.wasNull()) 166 { 167 insertStatement.setNull(index, type); 168 } 169 else 170 { 171 insertStatement.setObject(index, object, type); 172 } 173 } 174 175 insertStatement.addBatch(); 176 statementCount += 1; 177 178 if ((statementCount % this.maxBatchSize) == 0) 179 { 180 insertStatement.executeBatch(); 181 insertStatement.clearBatch(); 182 } 183 184 insertStatement.clearParameters(); 185 } 186 187 if ((statementCount % this.maxBatchSize) > 0) 188 { 189 insertStatement.executeBatch(); 190 } 191 192 logger.log(Level.INFO, Messages.INSERT_COUNT.getMessage(), statementCount, table); 193 } 194 catch (ExecutionException e) 195 { 196 throw ExceptionType.SQL.<SQLException>getExceptionFactory().createException(e.getCause()); 197 } 198 catch (InterruptedException e) 199 { 200 Thread.currentThread().interrupt(); 201 throw new SQLException(e); 202 } 203 finally 204 { 205 Resources.close(insertStatement); 206 } 207 } 208 finally 209 { 210 Resources.close(selectStatement); 211 } 212 } 213 214 @Override 215 public <Z, D extends Database<Z>> void dropConstraints(SynchronizationContext<Z, D> context) throws SQLException 216 { 217 context.getSynchronizationSupport().dropForeignKeys(); 218 } 219 220 @Override 221 public <Z, D extends Database<Z>> void restoreConstraints(SynchronizationContext<Z, D> context) throws SQLException 222 { 223 context.getSynchronizationSupport().restoreForeignKeys(); 224 } 225 226 /** 227 * @return the fetchSize. 228 */ 229 public int getFetchSize() 230 { 231 return this.fetchSize; 232 } 233 234 /** 235 * @param fetchSize the fetchSize to set. 236 */ 237 public void setFetchSize(int fetchSize) 238 { 239 this.fetchSize = fetchSize; 240 } 241 242 /** 243 * @return the maxBatchSize. 244 */ 245 public int getMaxBatchSize() 246 { 247 return this.maxBatchSize; 248 } 249 250 /** 251 * @param maxBatchSize the maxBatchSize to set. 252 */ 253 public void setMaxBatchSize(int maxBatchSize) 254 { 255 this.maxBatchSize = maxBatchSize; 256 } 257}