001/*
002 * HA-JDBC: High-Availability JDBC
003 * Copyright (C) 2012  Paul Ferraro
004 *
005 * This program is free software: you can redistribute it and/or modify
006 * it under the terms of the GNU Lesser General Public License as published by
007 * the Free Software Foundation, either version 3 of the License, or
008 * (at your option) any later version.
009 *
010 * This program is distributed in the hope that it will be useful,
011 * but WITHOUT ANY WARRANTY; without even the implied warranty of
012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
013 * GNU Lesser General Public License for more details.
014 *
015 * You should have received a copy of the GNU Lesser General Public License
016 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
017 */
018package net.sf.hajdbc.sync;
019
020import java.sql.Connection;
021import java.sql.PreparedStatement;
022import java.sql.ResultSet;
023import java.sql.SQLException;
024import java.sql.Statement;
025import java.util.ArrayList;
026import java.util.Collection;
027import java.util.Collections;
028import java.util.List;
029import java.util.concurrent.Callable;
030import java.util.concurrent.ExecutionException;
031import java.util.concurrent.Future;
032import java.util.regex.Pattern;
033
034import net.sf.hajdbc.Database;
035import net.sf.hajdbc.DatabaseCluster;
036import net.sf.hajdbc.ExceptionType;
037import net.sf.hajdbc.Messages;
038import net.sf.hajdbc.SynchronizationStrategy;
039import net.sf.hajdbc.TableProperties;
040import net.sf.hajdbc.UniqueConstraint;
041import net.sf.hajdbc.logging.Level;
042import net.sf.hajdbc.logging.Logger;
043import net.sf.hajdbc.logging.LoggerFactory;
044import net.sf.hajdbc.util.Objects;
045import net.sf.hajdbc.util.Resources;
046import net.sf.hajdbc.util.Strings;
047
048/**
049 * Database-independent synchronization strategy that only updates differences between two databases.
050 * This strategy is best used when there are <em>few</em> differences between the active database and the inactive database (i.e. barely out of sync).
051 * The following algorithm is used:
052 * <ol>
053 *  <li>Drop the foreign keys on the inactive database (to avoid integrity constraint violations)</li>
054 *  <li>For each database table:
055 *   <ol>
056 *    <li>Drop the unique constraints on the table (to avoid integrity constraint violations)</li>
057 *    <li>Find the primary key(s) of the table</li>
058 *    <li>Query all rows in the inactive database table, sorting by the primary key(s)</li>
059 *    <li>Query all rows on the active database table</li>
060 *    <li>For each row in table:
061 *     <ol>
062 *      <li>If primary key of the rows are the same, determine whether or not row needs to be updated</li>
063 *      <li>Otherwise, determine whether row should be deleted, or a new row is to be inserted</li>
064 *     </ol>
065 *    </li>
066 *    <li>Re-create the unique constraints on the table (to avoid integrity constraint violations)</li>
067 *   </ol>
068 *  </li>
069 *  <li>Re-create the foreign keys on the inactive database</li>
070 *  <li>Synchronize sequences</li>
071 * </ol>
072 * @author  Paul Ferraro
073 */
074public class DifferentialSynchronizationStrategy implements SynchronizationStrategy, TableSynchronizationStrategy
075{
076        private static final long serialVersionUID = -2785092229503649831L;
077
078        static Logger logger = LoggerFactory.getLogger(DifferentialSynchronizationStrategy.class);
079
080        private final SynchronizationStrategy strategy = new PerTableSynchronizationStrategy(this);
081        private int fetchSize = 0;
082        private int maxBatchSize = 100;
083        private Pattern versionPattern = null;
084        
085        @Override
086        public String getId()
087        {
088                return "diff";
089        }
090
091        @Override
092        public <Z, D extends Database<Z>> void synchronize(SynchronizationContext<Z, D> context) throws SQLException
093        {
094                this.strategy.synchronize(context);
095        }
096
097        /**
098         * {@inheritDoc}
099         * @see net.sf.hajdbc.SynchronizationStrategy#init(net.sf.hajdbc.DatabaseCluster)
100         */
101        @Override
102        public <Z, D extends Database<Z>> void init(DatabaseCluster<Z, D> cluster)
103        {
104                this.strategy.init(cluster);
105        }
106
107        /**
108         * {@inheritDoc}
109         * @see net.sf.hajdbc.SynchronizationStrategy#destroy(net.sf.hajdbc.DatabaseCluster)
110         */
111        @Override
112        public <Z, D extends Database<Z>> void destroy(DatabaseCluster<Z, D> cluster)
113        {
114                this.strategy.destroy(cluster);
115        }
116
117        @Override
118        public <Z, D extends Database<Z>> void dropConstraints(SynchronizationContext<Z, D> context) throws SQLException
119        {
120                SynchronizationSupport support = context.getSynchronizationSupport();
121                support.dropForeignKeys();
122                support.dropUniqueConstraints();
123        }
124
125        @Override
126        public <Z, D extends Database<Z>> void restoreConstraints(SynchronizationContext<Z, D> context) throws SQLException
127        {
128                SynchronizationSupport support = context.getSynchronizationSupport();
129                support.restoreUniqueConstraints();
130                support.restoreForeignKeys();
131        }
132
133        /**
134         * {@inheritDoc}
135         * @see net.sf.hajdbc.SynchronizationStrategy#synchronize(net.sf.hajdbc.sync.SynchronizationContext)
136         */
137        @Override
138        public <Z, D extends Database<Z>> void synchronize(SynchronizationContext<Z, D> context, TableProperties table) throws SQLException
139        {
140                String tableName = table.getName().getDMLName();
141                
142                UniqueConstraint primaryKey = table.getPrimaryKey();
143                
144                if (primaryKey == null)
145                {
146                        throw new SQLException(Messages.PRIMARY_KEY_REQUIRED.getMessage(this.getClass().getName(), tableName));
147                }
148                
149                List<String> primaryKeyColumns = primaryKey.getColumnList();
150                
151                Collection<String> columns = table.getColumns();
152                
153                List<String> nonPrimaryKeyColumns = new ArrayList<String>(columns.size());
154                List<String> versionColumns = new ArrayList<String>(columns.size());
155                
156                for (String column: columns)
157                {
158                        if (!primaryKeyColumns.contains(column))
159                        {
160                                // Try to find a version column
161                                if ((this.versionPattern != null) && this.versionPattern.matcher(column).matches())
162                                {
163                                        versionColumns.add(column);
164                                }
165                                
166                                nonPrimaryKeyColumns.add(column);
167                        }
168                }
169                
170                // List of columns for select statement - starting with primary key
171                List<String> allColumns = new ArrayList<String>(columns.size());
172                allColumns.addAll(primaryKeyColumns);
173                allColumns.addAll(nonPrimaryKeyColumns);
174
175                List<String> selectColumns = allColumns;
176                if (!versionColumns.isEmpty())
177                {
178                        selectColumns = new ArrayList<String>(primaryKeyColumns.size() + versionColumns.size());
179                        selectColumns.addAll(primaryKeyColumns);
180                        selectColumns.addAll(versionColumns);
181                }
182                
183                // Retrieve table rows in primary key order
184                final String selectSQL = String.format("SELECT %s FROM %s ORDER BY %s", Strings.join(selectColumns, Strings.PADDED_COMMA), tableName, Strings.join(primaryKeyColumns, Strings.PADDED_COMMA)); //$NON-NLS-1$
185                String primaryKeyWhereClause = Strings.join(new StringBuilder(), primaryKeyColumns, " = ? AND ").append(" = ?").toString(); //$NON-NLS-1$
186                String selectAllSQL = !versionColumns.isEmpty() ? String.format("SELECT %s FROM %s WHERE %s", Strings.join(nonPrimaryKeyColumns, Strings.PADDED_COMMA), tableName, primaryKeyWhereClause) : null;
187                String deleteSQL = String.format("DELETE FROM %s WHERE %s", tableName, primaryKeyWhereClause);
188                String insertSQL = String.format("INSERT INTO %s (%s) VALUES (%s)", tableName, Strings.join(allColumns, Strings.PADDED_COMMA), Strings.join(Collections.nCopies(allColumns.size(), Strings.QUESTION), Strings.PADDED_COMMA)); //$NON-NLS-1$
189                String updateSQL = !nonPrimaryKeyColumns.isEmpty() ? String.format("UPDATE %s SET %s = ? WHERE %s", tableName, Strings.join(nonPrimaryKeyColumns, " = ?, "), primaryKeyWhereClause) : null;
190                
191                Connection targetConnection = context.getConnection(context.getTargetDatabase());
192                final Statement targetStatement = targetConnection.createStatement();
193
194                try
195                {
196                        targetStatement.setFetchSize(this.fetchSize);
197                        
198                        Callable<ResultSet> callable = new Callable<ResultSet>()
199                        {
200                                @Override
201                                public ResultSet call() throws SQLException
202                                {
203                                        logger.log(Level.DEBUG, selectSQL);
204                                        return targetStatement.executeQuery(selectSQL);
205                                }
206                        };
207        
208                        Future<ResultSet> future = context.getExecutor().submit(callable);
209                        
210                        Connection sourceConnection = context.getConnection(context.getSourceDatabase());
211                        Statement sourceStatement = sourceConnection.createStatement();
212                        
213                        try
214                        {
215                                sourceStatement.setFetchSize(this.fetchSize);
216                                
217                                ResultSet sourceResultSet = sourceStatement.executeQuery(selectSQL);
218                
219                                ResultSet targetResultSet = future.get();
220                                
221                                PreparedStatement selectAllStatement = null;
222                                if (!versionColumns.isEmpty())
223                                {
224                                        logger.log(Level.DEBUG, selectAllSQL);
225                                        selectAllStatement = targetConnection.prepareStatement(selectAllSQL);
226                                }
227                                
228                                try
229                                {
230                                        logger.log(Level.DEBUG, deleteSQL);
231                                        PreparedStatement deleteStatement = targetConnection.prepareStatement(deleteSQL);
232                                        
233                                        try
234                                        {
235                                                logger.log(Level.DEBUG, insertSQL);
236                                                PreparedStatement insertStatement = targetConnection.prepareStatement(insertSQL);
237                                                
238                                                try
239                                                {
240                                                        PreparedStatement updateStatement = null;
241                                                        
242                                                        if (!nonPrimaryKeyColumns.isEmpty())
243                                                        {
244                                                                logger.log(Level.DEBUG, updateSQL);
245                                                                updateStatement = targetConnection.prepareStatement(updateSQL);
246                                                        }
247                                                        
248                                                        try
249                                                        {
250                                                                boolean hasMoreSourceResults = sourceResultSet.next();
251                                                                boolean hasMoreTargetResults = targetResultSet.next();
252                                                                
253                                                                int insertCount = 0;
254                                                                int updateCount = 0;
255                                                                int deleteCount = 0;
256                                                                
257                                                                while (hasMoreSourceResults || hasMoreTargetResults)
258                                                                {
259                                                                        int compare = 0;
260                                                                        
261                                                                        if (!hasMoreSourceResults)
262                                                                        {
263                                                                                compare = 1;
264                                                                        }
265                                                                        else if (!hasMoreTargetResults)
266                                                                        {
267                                                                                compare = -1;
268                                                                        }
269                                                                        else
270                                                                        {
271                                                                                for (int i = 1; i <= primaryKeyColumns.size(); ++i)
272                                                                                {
273                                                                                        Object sourceObject = sourceResultSet.getObject(i);
274                                                                                        Object targetObject = targetResultSet.getObject(i);
275                                                                                        
276                                                                                        // We assume that the primary keys column types are Comparable
277                                                                                        compare = compare(sourceObject, targetObject);
278                                                                                        
279                                                                                        if (compare != 0)
280                                                                                        {
281                                                                                                break;
282                                                                                        }
283                                                                                }
284                                                                        }
285                                                                        
286                                                                        if (compare > 0)
287                                                                        {
288                                                                                deleteStatement.clearParameters();
289                                                                                
290                                                                                for (int i = 1; i <= primaryKeyColumns.size(); ++i)
291                                                                                {
292                                                                                        int type = context.getDialect().getColumnType(table.getColumnProperties(allColumns.get(i - 1)));
293                                                                                        
294                                                                                        deleteStatement.setObject(i, targetResultSet.getObject(i), type);
295                                                                                }
296                                                                                
297                                                                                deleteStatement.addBatch();
298                                                                                
299                                                                                deleteCount += 1;
300                                                                                
301                                                                                if ((deleteCount % this.maxBatchSize) == 0)
302                                                                                {
303                                                                                        deleteStatement.executeBatch();
304                                                                                        deleteStatement.clearBatch();
305                                                                                }
306                                                                        }
307                                                                        else if (compare < 0)
308                                                                        {
309                                                                                insertStatement.clearParameters();
310                                                                                
311                                                                                for (int i = 1; i <= primaryKeyColumns.size(); ++i)
312                                                                                {
313                                                                                        int type = context.getDialect().getColumnType(table.getColumnProperties(allColumns.get(i - 1)));
314                                                                                        
315                                                                                        insertStatement.setObject(i, sourceResultSet.getObject(i), type);
316                                                                                }
317                                                                                
318                                                                                if (versionColumns.isEmpty())
319                                                                                {
320                                                                                        for (int i = primaryKeyColumns.size() + 1; i <= allColumns.size(); ++i)
321                                                                                        {
322                                                                                                int type = context.getDialect().getColumnType(table.getColumnProperties(allColumns.get(i - 1)));
323                                                
324                                                                                                Object object = context.getSynchronizationSupport().getObject(sourceResultSet, i, type);
325                                                                                                
326                                                                                                if (sourceResultSet.wasNull())
327                                                                                                {
328                                                                                                        insertStatement.setNull(i, type);
329                                                                                                }
330                                                                                                else
331                                                                                                {
332                                                                                                        insertStatement.setObject(i, object, type);
333                                                                                                }
334                                                                                        }
335                                                                                }
336                                                                                else
337                                                                                {
338                                                                                        if (selectAllStatement != null)
339                                                                                        {
340                                                                                                selectAllStatement.clearParameters();
341                                                                                                
342                                                                                                for (int i = 1; i <= primaryKeyColumns.size(); ++i)
343                                                                                                {
344                                                                                                        int type = context.getDialect().getColumnType(table.getColumnProperties(allColumns.get(i - 1)));
345                                                                                                        
346                                                                                                        selectAllStatement.setObject(i, sourceResultSet.getObject(i), type);
347                                                                                                }
348                                                                                                
349                                                                                                ResultSet selectAllResultSet = selectAllStatement.executeQuery();
350                                                
351                                                                                                for (int i = primaryKeyColumns.size() + 1; i <= allColumns.size(); ++i)
352                                                                                                {
353                                                                                                        int type = context.getDialect().getColumnType(table.getColumnProperties(allColumns.get(i - 1)));
354                                                                                                        
355                                                                                                        Object object = context.getSynchronizationSupport().getObject(selectAllResultSet, i - primaryKeyColumns.size(), type);
356                                                                                                        
357                                                                                                        if (selectAllResultSet.wasNull())
358                                                                                                        {
359                                                                                                                insertStatement.setNull(i, type);
360                                                                                                        }
361                                                                                                        else
362                                                                                                        {
363                                                                                                                insertStatement.setObject(i, object, type);
364                                                                                                        }
365                                                                                                }
366                                                                                        }
367                                                                                }
368                                                                                        
369                                                                                insertStatement.addBatch();
370                                                                                
371                                                                                insertCount += 1;
372                                                                                
373                                                                                if ((insertCount % this.maxBatchSize) == 0)
374                                                                                {
375                                                                                        insertStatement.executeBatch();
376                                                                                        insertStatement.clearBatch();
377                                                                                }
378                                                                        }
379                                                                        else if (updateStatement != null) // if (compare == 0)
380                                                                        {
381                                                                                updateStatement.clearParameters();
382                                                                                
383                                                                                boolean updated = false;
384                                                                                
385                                                                                for (int i = primaryKeyColumns.size() + 1; i <= selectColumns.size(); ++i)
386                                                                                {
387                                                                                        int type = context.getDialect().getColumnType(table.getColumnProperties(selectColumns.get(i - 1)));
388                                                                                        
389                                                                                        Object sourceObject = context.getSynchronizationSupport().getObject(sourceResultSet, i, type);
390                                                                                        Object targetObject = context.getSynchronizationSupport().getObject(targetResultSet, i, type);
391                                                                                        
392                                                                                        int index = i - primaryKeyColumns.size();
393                                                                                        
394                                                                                        if (sourceResultSet.wasNull())
395                                                                                        {
396                                                                                                updateStatement.setNull(index, type);
397                                                                                                
398                                                                                                updated |= !targetResultSet.wasNull();
399                                                                                        }
400                                                                                        else
401                                                                                        {
402                                                                                                updateStatement.setObject(index, sourceObject, type);
403                                                                                                
404                                                                                                updated |= targetResultSet.wasNull();
405                                                                                                updated |= !Objects.equals(sourceObject, targetObject);
406                                                                                        }
407                                                                                }
408                                                                                
409                                                                                if (updated)
410                                                                                {
411                                                                                        if (selectAllStatement != null)
412                                                                                        {
413                                                                                                selectAllStatement.clearParameters();
414                                                                                                
415                                                                                                for (int i = 1; i <= primaryKeyColumns.size(); ++i)
416                                                                                                {
417                                                                                                        int type = context.getDialect().getColumnType(table.getColumnProperties(allColumns.get(i - 1)));
418                                                                                                        
419                                                                                                        selectAllStatement.setObject(i, sourceResultSet.getObject(i), type);
420                                                                                                }
421                                                                                                
422                                                                                                ResultSet selectAllResultSet = selectAllStatement.executeQuery();
423                                                
424                                                                                                for (int i = primaryKeyColumns.size() + 1; i <= allColumns.size(); ++i)
425                                                                                                {
426                                                                                                        int type = context.getDialect().getColumnType(table.getColumnProperties(allColumns.get(i - 1)));
427                                                                                                        
428                                                                                                        int index = i - primaryKeyColumns.size();
429                                                                                                        
430                                                                                                        Object object = context.getSynchronizationSupport().getObject(selectAllResultSet, index, type);
431                                                                                                        
432                                                                                                        if (selectAllResultSet.wasNull())
433                                                                                                        {
434                                                                                                                updateStatement.setNull(index, type);
435                                                                                                        }
436                                                                                                        else
437                                                                                                        {
438                                                                                                                updateStatement.setObject(index, object, type);
439                                                                                                        }
440                                                                                                }
441                                                                                        }
442                                                                                        
443                                                                                        for (int i = 1; i <= primaryKeyColumns.size(); ++i)
444                                                                                        {
445                                                                                                int type = context.getDialect().getColumnType(table.getColumnProperties(allColumns.get(i - 1)));
446                                                                                                
447                                                                                                updateStatement.setObject(i + nonPrimaryKeyColumns.size(), targetResultSet.getObject(i), type);
448                                                                                        }
449                                                                                        
450                                                                                        updateStatement.addBatch();
451                                                                                        
452                                                                                        updateCount += 1;
453                                                                                        
454                                                                                        if ((updateCount % this.maxBatchSize) == 0)
455                                                                                        {
456                                                                                                updateStatement.executeBatch();
457                                                                                                updateStatement.clearBatch();
458                                                                                        }
459                                                                                }
460                                                                        }
461                                                                        
462                                                                        if (hasMoreSourceResults && (compare <= 0))
463                                                                        {
464                                                                                hasMoreSourceResults = sourceResultSet.next();
465                                                                        }
466                                                                        
467                                                                        if (hasMoreTargetResults && (compare >= 0))
468                                                                        {
469                                                                                hasMoreTargetResults = targetResultSet.next();
470                                                                        }
471                                                                }
472                                                                
473                                                                if ((deleteCount % this.maxBatchSize) > 0)
474                                                                {
475                                                                        deleteStatement.executeBatch();
476                                                                }
477                                                                
478                                                                if ((insertCount % this.maxBatchSize) > 0)
479                                                                {
480                                                                        insertStatement.executeBatch();
481                                                                }
482                                                                
483                                                                if (updateStatement != null)
484                                                                {
485                                                                        if ((updateCount % this.maxBatchSize) > 0)
486                                                                        {
487                                                                                updateStatement.executeBatch();
488                                                                        }
489                                                                }
490                                                                
491                                                                logger.log(Level.INFO, Messages.INSERT_COUNT.getMessage(), insertCount, tableName);
492                                                                logger.log(Level.INFO, Messages.UPDATE_COUNT.getMessage(), updateCount, tableName);
493                                                                logger.log(Level.INFO, Messages.DELETE_COUNT.getMessage(), deleteCount, tableName);
494                                                        }
495                                                        finally
496                                                        {
497                                                                if (updateStatement != null)
498                                                                {
499                                                                        Resources.close(updateStatement);
500                                                                }
501                                                        }
502                                                }
503                                                finally
504                                                {
505                                                        Resources.close(insertStatement);
506                                                }
507                                        }
508                                        finally
509                                        {
510                                                Resources.close(deleteStatement);
511                                        }
512                                }
513                                finally
514                                {
515                                        if (selectAllStatement != null)
516                                        {
517                                                Resources.close(selectAllStatement);
518                                        }
519                                }
520                        }
521                        catch (InterruptedException e)
522                        {
523                                Thread.currentThread().interrupt();
524                                throw new SQLException(e);
525                        }
526                        catch (ExecutionException e)
527                        {
528                                throw ExceptionType.SQL.<SQLException>getExceptionFactory().createException(e.getCause());
529                        }
530                        finally
531                        {
532                                Resources.close(sourceStatement);
533                        }
534                }
535                finally
536                {
537                        Resources.close(targetStatement);
538                }
539        }
540        
541        private static int compare(Object object1, Object object2)
542        {
543                @SuppressWarnings("unchecked")
544                Comparable<Object> comparable = (Comparable<Object>) object1;
545                
546                return comparable.compareTo(object2);
547        }
548
549        /**
550         * @return the fetchSize.
551         */
552        public int getFetchSize()
553        {
554                return this.fetchSize;
555        }
556
557        /**
558         * @param fetchSize the fetchSize to set.
559         */
560        public void setFetchSize(int fetchSize)
561        {
562                this.fetchSize = fetchSize;
563        }
564
565        /**
566         * @return Returns the maxBatchSize.
567         */
568        public int getMaxBatchSize()
569        {
570                return this.maxBatchSize;
571        }
572
573        /**
574         * @param maxBatchSize The maxBatchSize to set.
575         */
576        public void setMaxBatchSize(int maxBatchSize)
577        {
578                this.maxBatchSize = maxBatchSize;
579        }
580
581        /**
582         * @return the versionPattern
583         */
584        public String getVersionPattern()
585        {
586                return (this.versionPattern != null) ? this.versionPattern.pattern() : null;
587        }
588
589        /**
590         * @param versionPattern the versionPattern to set
591         */
592        public void setVersionPattern(String versionPattern)
593        {
594                this.versionPattern = (versionPattern != null) ? Pattern.compile(versionPattern, Pattern.CASE_INSENSITIVE) : null;
595        }
596}