001/*
002 * HA-JDBC: High-Availability JDBC
003 * Copyright (C) 2012  Paul Ferraro
004 *
005 * This program is free software: you can redistribute it and/or modify
006 * it under the terms of the GNU Lesser General Public License as published by
007 * the Free Software Foundation, either version 3 of the License, or
008 * (at your option) any later version.
009 *
010 * This program is distributed in the hope that it will be useful,
011 * but WITHOUT ANY WARRANTY; without even the implied warranty of
012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
013 * GNU Lesser General Public License for more details.
014 *
015 * You should have received a copy of the GNU Lesser General Public License
016 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
017 */
018package net.sf.hajdbc.sync;
019
020import java.sql.Connection;
021import java.sql.ResultSet;
022import java.sql.SQLException;
023import java.sql.Statement;
024import java.sql.Types;
025import java.text.MessageFormat;
026import java.util.Collection;
027import java.util.HashMap;
028import java.util.Map;
029import java.util.Set;
030import java.util.concurrent.Callable;
031import java.util.concurrent.ExecutionException;
032import java.util.concurrent.ExecutorService;
033import java.util.concurrent.Future;
034
035import net.sf.hajdbc.Database;
036import net.sf.hajdbc.ExceptionType;
037import net.sf.hajdbc.ForeignKeyConstraint;
038import net.sf.hajdbc.IdentityColumnSupport;
039import net.sf.hajdbc.Messages;
040import net.sf.hajdbc.SequenceProperties;
041import net.sf.hajdbc.SequenceSupport;
042import net.sf.hajdbc.TableProperties;
043import net.sf.hajdbc.UniqueConstraint;
044import net.sf.hajdbc.dialect.Dialect;
045import net.sf.hajdbc.logging.Level;
046import net.sf.hajdbc.logging.Logger;
047import net.sf.hajdbc.logging.LoggerFactory;
048import net.sf.hajdbc.util.Resources;
049import net.sf.hajdbc.util.Strings;
050
051/**
052 * Default {@link SynchronizationSupport} implementation.
053 * @author Paul Ferraro
054 */
055public class SynchronizationSupportImpl<Z, D extends Database<Z>> implements SynchronizationSupport
056{
057        private final Logger logger = LoggerFactory.getLogger(this.getClass());
058        
059        private final SynchronizationContext<Z, D> context;
060        
061        public SynchronizationSupportImpl(SynchronizationContext<Z, D> context)
062        {
063                this.context = context;
064        }
065        
066        /**
067         * {@inheritDoc}
068         * @see net.sf.hajdbc.sync.SynchronizationSupport#dropForeignKeys()
069         */
070        @Override
071        public void dropForeignKeys() throws SQLException
072        {
073                Dialect dialect = this.context.getDialect();
074                
075                Connection connection = this.context.getConnection(this.context.getTargetDatabase());
076                boolean autoCommit = connection.getAutoCommit();
077                try
078                {
079                        connection.setAutoCommit(true);
080                        
081                        Statement statement = connection.createStatement();
082                        try
083                        {
084                                for (TableProperties table: this.context.getTargetDatabaseProperties().getTables())
085                                {
086                                        for (ForeignKeyConstraint constraint: table.getForeignKeyConstraints())
087                                        {
088                                                String sql = dialect.getDropForeignKeyConstraintSQL(constraint);
089                                                
090                                                this.logger.log(Level.DEBUG, sql);
091                                                
092                                                statement.addBatch(sql);
093                                        }
094                                }
095                                statement.executeBatch();
096                        }
097                        finally
098                        {
099                                Resources.close(statement);
100                        }
101                }
102                finally
103                {
104                        connection.setAutoCommit(autoCommit);
105                }
106        }
107        
108        /**
109         * {@inheritDoc}
110         * @see net.sf.hajdbc.sync.SynchronizationSupport#restoreForeignKeys()
111         */
112        @Override
113        public void restoreForeignKeys() throws SQLException
114        {
115                Dialect dialect = this.context.getDialect();
116                
117                Connection connection = this.context.getConnection(this.context.getTargetDatabase());
118                boolean autoCommit = connection.getAutoCommit();
119                try
120                {
121                        connection.setAutoCommit(true);
122                        
123                        Statement statement = connection.createStatement();
124                        try
125                        {
126                                for (TableProperties table: this.context.getSourceDatabaseProperties().getTables())
127                                {
128                                        for (ForeignKeyConstraint constraint: table.getForeignKeyConstraints())
129                                        {
130                                                String sql = dialect.getCreateForeignKeyConstraintSQL(constraint);
131                                                
132                                                this.logger.log(Level.DEBUG, sql);
133                                                
134                                                statement.addBatch(sql);
135                                        }
136                                }
137                                
138                                statement.executeBatch();
139                        }
140                        finally
141                        {
142                                Resources.close(statement);
143                        }
144                }
145                finally
146                {
147                        connection.setAutoCommit(autoCommit);
148                }
149        }
150        
151        /**
152         * {@inheritDoc}
153         * @see net.sf.hajdbc.sync.SynchronizationSupport#synchronizeSequences()
154         */
155        @Override
156        public void synchronizeSequences() throws SQLException
157        {
158                SequenceSupport support = this.context.getDialect().getSequenceSupport();
159                
160                if (support != null)
161                {
162                        Collection<SequenceProperties> sequences = this.context.getSourceDatabaseProperties().getSequences();
163
164                        if (!sequences.isEmpty())
165                        {
166                                D sourceDatabase = this.context.getSourceDatabase();
167                                
168                                Set<D> databases = this.context.getActiveDatabaseSet();
169
170                                ExecutorService executor = this.context.getExecutor();
171                                
172                                Map<SequenceProperties, Long> sequenceMap = new HashMap<SequenceProperties, Long>();
173                                Map<D, Future<Long>> futureMap = new HashMap<D, Future<Long>>();
174
175                                for (SequenceProperties sequence: sequences)
176                                {
177                                        final String sql = support.getNextSequenceValueSQL(sequence);
178                                        
179                                        this.logger.log(Level.DEBUG, sql);
180
181                                        for (final D database: databases)
182                                        {
183                                                final SynchronizationContext<Z, D> context = this.context;
184                                                
185                                                Callable<Long> task = new Callable<Long>()
186                                                {
187                                                        @Override
188                                                        public Long call() throws SQLException
189                                                        {
190                                                                Statement statement = context.getConnection(database).createStatement();
191                                                                try
192                                                                {
193                                                                        ResultSet resultSet = statement.executeQuery(sql);
194                                                                        
195                                                                        resultSet.next();
196                                                                        
197                                                                        return resultSet.getLong(1);
198                                                                }
199                                                                finally
200                                                                {
201                                                                        Resources.close(statement);
202                                                                }
203                                                        }
204                                                };
205                                                
206                                                futureMap.put(database, executor.submit(task));
207                                        }
208
209                                        try
210                                        {
211                                                Long sourceValue = futureMap.get(sourceDatabase).get();
212                                                
213                                                sequenceMap.put(sequence, sourceValue);
214                                                
215                                                for (D database: databases)
216                                                {
217                                                        if (!database.equals(sourceDatabase))
218                                                        {
219                                                                Long value = futureMap.get(database).get();
220                                                                
221                                                                if (!value.equals(sourceValue))
222                                                                {
223                                                                        throw new SQLException(Messages.SEQUENCE_OUT_OF_SYNC.getMessage(sequence, database, value, sourceDatabase, sourceValue));
224                                                                }
225                                                        }
226                                                }
227                                        }
228                                        catch (InterruptedException e)
229                                        {
230                                                throw new SQLException(e);
231                                        }
232                                        catch (ExecutionException e)
233                                        {
234                                                throw ExceptionType.SQL.<SQLException>getExceptionFactory().createException(e.getCause());
235                                        }
236                                }
237                                
238                                Connection targetConnection = this.context.getConnection(this.context.getTargetDatabase());
239                                Statement targetStatement = targetConnection.createStatement();
240                                try
241                                {
242                                        for (SequenceProperties sequence: sequences)
243                                        {
244                                                String sql = support.getAlterSequenceSQL(sequence, sequenceMap.get(sequence) + 1);
245                                                
246                                                this.logger.log(Level.DEBUG, sql);
247                                                
248                                                targetStatement.addBatch(sql);
249                                        }
250                                        
251                                        targetStatement.executeBatch();
252                                }
253                                finally
254                                {
255                                        Resources.close(targetStatement);
256                                }
257                        }
258                }
259        }
260        
261        /**
262         * {@inheritDoc}
263         * @see net.sf.hajdbc.sync.SynchronizationSupport#synchronizeIdentityColumns()
264         */
265        @Override
266        public void synchronizeIdentityColumns() throws SQLException
267        {
268                IdentityColumnSupport support = this.context.getDialect().getIdentityColumnSupport();
269                
270                if (support != null)
271                {
272                        Statement sourceStatement = this.context.getConnection(this.context.getSourceDatabase()).createStatement();
273                        try
274                        {
275                                Statement targetStatement = this.context.getConnection(this.context.getTargetDatabase()).createStatement();
276                                try
277                                {
278                                        for (TableProperties table: this.context.getSourceDatabaseProperties().getTables())
279                                        {
280                                                Collection<String> columns = table.getIdentityColumns();
281                                                
282                                                if (!columns.isEmpty())
283                                                {
284                                                        String selectSQL = MessageFormat.format("SELECT max({0}) FROM {1}", Strings.join(columns, "), max("), table.getName()); //$NON-NLS-1$ //$NON-NLS-2$
285                                                        
286                                                        this.logger.log(Level.DEBUG, selectSQL);
287                                                        
288                                                        Map<String, Long> map = new HashMap<String, Long>();
289                                                        ResultSet resultSet = sourceStatement.executeQuery(selectSQL);
290                                                        try
291                                                        {
292                                                                if (resultSet.next())
293                                                                {
294                                                                        int i = 0;
295                                                                        
296                                                                        for (String column: columns)
297                                                                        {
298                                                                                map.put(column, resultSet.getLong(++i));
299                                                                        }
300                                                                }
301                                                        }
302                                                        finally
303                                                        {
304                                                                Resources.close(resultSet);
305                                                        }
306                                                        
307                                                        if (!map.isEmpty())
308                                                        {
309                                                                for (Map.Entry<String, Long> mapEntry: map.entrySet())
310                                                                {
311                                                                        String alterSQL = support.getAlterIdentityColumnSQL(table, table.getColumnProperties(mapEntry.getKey()), mapEntry.getValue() + 1);
312                                                                        
313                                                                        if (alterSQL != null)
314                                                                        {
315                                                                                this.logger.log(Level.DEBUG, alterSQL);
316                                                                                
317                                                                                targetStatement.addBatch(alterSQL);
318                                                                        }
319                                                                }
320                                                                
321                                                                targetStatement.executeBatch();
322                                                        }
323                                                }
324                                        }
325                                }
326                                finally
327                                {
328                                        Resources.close(targetStatement);
329                                }
330                        }
331                        finally
332                        {
333                                Resources.close(sourceStatement);
334                        }
335                }
336        }
337
338        /**
339         * {@inheritDoc}
340         * @see net.sf.hajdbc.sync.SynchronizationSupport#dropUniqueConstraints()
341         */
342        @Override
343        public void dropUniqueConstraints() throws SQLException
344        {
345                Dialect dialect = this.context.getDialect();
346
347                Connection connection = this.context.getConnection(this.context.getTargetDatabase());
348                boolean autoCommit = connection.getAutoCommit();
349                try
350                {
351                        connection.setAutoCommit(true);
352                        Statement statement = connection.createStatement();
353                        try
354                        {
355                                for (TableProperties table: this.context.getTargetDatabaseProperties().getTables())
356                                {
357                                        for (UniqueConstraint constraint: table.getUniqueConstraints())
358                                        {
359                                                String sql = dialect.getDropUniqueConstraintSQL(constraint);
360                                                
361                                                this.logger.log(Level.DEBUG, sql);
362                                                
363                                                statement.addBatch(sql);
364                                        }
365                                }
366                                
367                                statement.executeBatch();
368                        }
369                        finally
370                        {
371                                Resources.close(statement);
372                        }
373                }
374                finally
375                {
376                        connection.setAutoCommit(autoCommit);
377                }
378        }
379        
380        /**
381         * {@inheritDoc}
382         * @see net.sf.hajdbc.sync.SynchronizationSupport#restoreUniqueConstraints()
383         */
384        @Override
385        public void restoreUniqueConstraints() throws SQLException
386        {
387                Dialect dialect = this.context.getDialect();
388
389                Connection connection = this.context.getConnection(this.context.getTargetDatabase());
390                boolean autoCommit = connection.getAutoCommit();
391                try
392                {
393                        connection.setAutoCommit(true);
394                        
395                        Statement statement = connection.createStatement();
396                        try
397                        {
398                                for (TableProperties table: this.context.getSourceDatabaseProperties().getTables())
399                                {
400                                        // Drop unique constraints on the current table
401                                        for (UniqueConstraint constraint: table.getUniqueConstraints())
402                                        {
403                                                String sql = dialect.getCreateUniqueConstraintSQL(constraint);
404                                                
405                                                this.logger.log(Level.DEBUG, sql);
406                                                
407                                                statement.addBatch(sql);
408                                        }
409                                }
410                                
411                                statement.executeBatch();
412                        }
413                        finally
414                        {
415                                Resources.close(statement);
416                        }
417                }
418                finally
419                {
420                        connection.setAutoCommit(autoCommit);
421                }
422        }
423
424        /**
425         * {@inheritDoc}
426         * @see net.sf.hajdbc.sync.SynchronizationSupport#rollback(java.sql.Connection)
427         */
428        @Override
429        public void rollback(Connection connection)
430        {
431                try
432                {
433                        connection.rollback();
434                }
435                catch (SQLException e)
436                {
437                        this.logger.log(Level.WARN, e);
438                }
439        }
440
441        /**
442         * {@inheritDoc}
443         * @see net.sf.hajdbc.sync.SynchronizationSupport#getObject(java.sql.ResultSet, int, int)
444         */
445        @Override
446        public Object getObject(ResultSet resultSet, int index, int type) throws SQLException
447        {
448                switch (type)
449                {
450                        case Types.BLOB:
451                        {
452                                return resultSet.getBlob(index);
453                        }
454                        case Types.CLOB:
455                        {
456                                return resultSet.getClob(index);
457                        }
458                        default:
459                        {
460                                return resultSet.getObject(index);
461                        }
462                }
463        }
464}