001/*
002 * HA-JDBC: High-Availability JDBC
003 * Copyright (C) 2012  Paul Ferraro
004 *
005 * This program is free software: you can redistribute it and/or modify
006 * it under the terms of the GNU Lesser General Public License as published by
007 * the Free Software Foundation, either version 3 of the License, or
008 * (at your option) any later version.
009 *
010 * This program is distributed in the hope that it will be useful,
011 * but WITHOUT ANY WARRANTY; without even the implied warranty of
012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
013 * GNU Lesser General Public License for more details.
014 *
015 * You should have received a copy of the GNU Lesser General Public License
016 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
017 */
018package net.sf.hajdbc.sync;
019
020import java.sql.Connection;
021import java.sql.PreparedStatement;
022import java.sql.ResultSet;
023import java.sql.SQLException;
024import java.sql.Statement;
025import java.util.Collection;
026import java.util.Collections;
027import java.util.concurrent.Callable;
028import java.util.concurrent.ExecutionException;
029import java.util.concurrent.Future;
030
031import net.sf.hajdbc.Database;
032import net.sf.hajdbc.DatabaseCluster;
033import net.sf.hajdbc.ExceptionType;
034import net.sf.hajdbc.Messages;
035import net.sf.hajdbc.SynchronizationStrategy;
036import net.sf.hajdbc.TableProperties;
037import net.sf.hajdbc.logging.Level;
038import net.sf.hajdbc.logging.Logger;
039import net.sf.hajdbc.logging.LoggerFactory;
040import net.sf.hajdbc.util.Resources;
041import net.sf.hajdbc.util.Strings;
042
043/**
044 * Database-independent synchronization strategy that does full record transfer between two databases.
045 * This strategy is best used when there are <em>many</em> differences between the active database and the inactive database (i.e. very much out of sync).
046 * The following algorithm is used:
047 * <ol>
048 *  <li>Drop the foreign keys on the inactive database (to avoid integrity constraint violations)</li>
049 *  <li>For each database table:
050 *   <ol>
051 *    <li>Delete all rows in the inactive database table</li>
052 *    <li>Query all rows on the active database table</li>
053 *    <li>For each row in active database table:
054 *     <ol>
055 *      <li>Insert new row into inactive database table</li>
056 *     </ol>
057 *    </li>
058 *   </ol>
059 *  </li>
060 *  <li>Re-create the foreign keys on the inactive database</li>
061 *  <li>Synchronize sequences</li>
062 * </ol>
063 * @author  Paul Ferraro
064 */
065public class FullSynchronizationStrategy implements SynchronizationStrategy, TableSynchronizationStrategy
066{
067        private static final long serialVersionUID = 9190347092842178162L;
068
069        static Logger logger = LoggerFactory.getLogger(FullSynchronizationStrategy.class);
070
071        private SynchronizationStrategy strategy = new PerTableSynchronizationStrategy(this);
072        private int maxBatchSize = 100;
073        private int fetchSize = 0;
074
075        @Override
076        public String getId()
077        {
078                return "full";
079        }
080
081        @Override
082        public <Z, D extends Database<Z>> void init(DatabaseCluster<Z, D> cluster)
083        {
084                this.strategy.init(cluster);
085        }
086
087        @Override
088        public <Z, D extends Database<Z>> void synchronize(SynchronizationContext<Z, D> context) throws SQLException
089        {
090                this.strategy.synchronize(context);
091        }
092
093        @Override
094        public <Z, D extends Database<Z>> void destroy(DatabaseCluster<Z, D> cluster)
095        {
096                this.strategy.destroy(cluster);
097        }
098
099        @Override
100        public <Z, D extends Database<Z>> void synchronize(SynchronizationContext<Z, D> context, TableProperties table) throws SQLException
101        {
102                final String tableName = table.getName().getDMLName();
103                final Collection<String> columns = table.getColumns();
104                
105                final String commaDelimitedColumns = Strings.join(columns, Strings.PADDED_COMMA);
106                
107                final String selectSQL = String.format("SELECT %s FROM %s", commaDelimitedColumns, tableName);
108                final String deleteSQL = context.getDialect().getTruncateTableSQL(table);
109                final String insertSQL = String.format("INSERT INTO %s (%s) VALUES (%s)", tableName, commaDelimitedColumns, Strings.join(Collections.nCopies(columns.size(), Strings.QUESTION), Strings.PADDED_COMMA));
110                
111                Connection sourceConnection = context.getConnection(context.getSourceDatabase());
112                final Statement selectStatement = sourceConnection.createStatement();
113                try
114                {
115                        selectStatement.setFetchSize(this.fetchSize);
116                        
117                        Callable<ResultSet> callable = new Callable<ResultSet>()
118                        {
119                                @Override
120                                public ResultSet call() throws SQLException
121                                {
122                                        logger.log(Level.DEBUG, selectSQL);
123                                        return selectStatement.executeQuery(selectSQL);
124                                }
125                        };
126        
127                        Future<ResultSet> future = context.getExecutor().submit(callable);
128                        
129                        Connection targetConnection = context.getConnection(context.getTargetDatabase());
130                        Statement deleteStatement = targetConnection.createStatement();
131                        
132                        try
133                        {
134                                logger.log(Level.DEBUG, deleteSQL);
135                                int deletedRows = deleteStatement.executeUpdate(deleteSQL);
136                
137                                logger.log(Level.INFO, Messages.DELETE_COUNT.getMessage(), deletedRows, tableName);
138                        }
139                        finally
140                        {
141                                Resources.close(deleteStatement);
142                        }
143                        
144                        logger.log(Level.DEBUG, insertSQL);
145                        PreparedStatement insertStatement = targetConnection.prepareStatement(insertSQL);
146                        
147                        try
148                        {
149                                int statementCount = 0;
150                                
151                                ResultSet resultSet = future.get();
152                                
153                                while (resultSet.next())
154                                {
155                                        int index = 0;
156                                        
157                                        for (String column: table.getColumns())
158                                        {
159                                                index += 1;
160                                                
161                                                int type = context.getDialect().getColumnType(table.getColumnProperties(column));
162                                                
163                                                Object object = context.getSynchronizationSupport().getObject(resultSet, index, type);
164                                                
165                                                if (resultSet.wasNull())
166                                                {
167                                                        insertStatement.setNull(index, type);
168                                                }
169                                                else
170                                                {
171                                                        insertStatement.setObject(index, object, type);
172                                                }
173                                        }
174                                        
175                                        insertStatement.addBatch();
176                                        statementCount += 1;
177                                        
178                                        if ((statementCount % this.maxBatchSize) == 0)
179                                        {
180                                                insertStatement.executeBatch();
181                                                insertStatement.clearBatch();
182                                        }
183                                        
184                                        insertStatement.clearParameters();
185                                }
186                                
187                                if ((statementCount % this.maxBatchSize) > 0)
188                                {
189                                        insertStatement.executeBatch();
190                                }
191                
192                                logger.log(Level.INFO, Messages.INSERT_COUNT.getMessage(), statementCount, table);
193                        }
194                        catch (ExecutionException e)
195                        {
196                                throw ExceptionType.SQL.<SQLException>getExceptionFactory().createException(e.getCause());
197                        }
198                        catch (InterruptedException e)
199                        {
200                                Thread.currentThread().interrupt();
201                                throw new SQLException(e);
202                        }
203                        finally
204                        {
205                                Resources.close(insertStatement);
206                        }
207                }
208                finally
209                {
210                        Resources.close(selectStatement);
211                }
212        }
213        
214        @Override
215        public <Z, D extends Database<Z>> void dropConstraints(SynchronizationContext<Z, D> context) throws SQLException
216        {
217                context.getSynchronizationSupport().dropForeignKeys();
218        }
219
220        @Override
221        public <Z, D extends Database<Z>> void restoreConstraints(SynchronizationContext<Z, D> context) throws SQLException
222        {
223                context.getSynchronizationSupport().restoreForeignKeys();
224        }
225
226        /**
227         * @return the fetchSize.
228         */
229        public int getFetchSize()
230        {
231                return this.fetchSize;
232        }
233
234        /**
235         * @param fetchSize the fetchSize to set.
236         */
237        public void setFetchSize(int fetchSize)
238        {
239                this.fetchSize = fetchSize;
240        }
241        
242        /**
243         * @return the maxBatchSize.
244         */
245        public int getMaxBatchSize()
246        {
247                return this.maxBatchSize;
248        }
249
250        /**
251         * @param maxBatchSize the maxBatchSize to set.
252         */
253        public void setMaxBatchSize(int maxBatchSize)
254        {
255                this.maxBatchSize = maxBatchSize;
256        }
257}