001/*
002 * HA-JDBC: High-Availability JDBC
003 * Copyright (C) 2012  Paul Ferraro
004 *
005 * This program is free software: you can redistribute it and/or modify
006 * it under the terms of the GNU Lesser General Public License as published by
007 * the Free Software Foundation, either version 3 of the License, or
008 * (at your option) any later version.
009 *
010 * This program is distributed in the hope that it will be useful,
011 * but WITHOUT ANY WARRANTY; without even the implied warranty of
012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
013 * GNU Lesser General Public License for more details.
014 *
015 * You should have received a copy of the GNU Lesser General Public License
016 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
017 */
018package net.sf.hajdbc.sync;
019
020import java.sql.Connection;
021import java.sql.SQLException;
022import java.util.AbstractMap;
023import java.util.HashMap;
024import java.util.Map;
025import java.util.Set;
026import java.util.concurrent.ExecutorService;
027import java.util.concurrent.Executors;
028
029import net.sf.hajdbc.Database;
030import net.sf.hajdbc.DatabaseCluster;
031import net.sf.hajdbc.DatabaseProperties;
032import net.sf.hajdbc.balancer.Balancer;
033import net.sf.hajdbc.cache.DatabaseMetaDataCache;
034import net.sf.hajdbc.codec.Decoder;
035import net.sf.hajdbc.dialect.Dialect;
036import net.sf.hajdbc.logging.Level;
037import net.sf.hajdbc.logging.Logger;
038import net.sf.hajdbc.logging.LoggerFactory;
039import net.sf.hajdbc.util.Resources;
040
041/**
042 * @author Paul Ferraro
043 * @param <D> Driver or DataSource
044 */
045public class SynchronizationContextImpl<Z, D extends Database<Z>> implements SynchronizationContext<Z, D>
046{
047        private static final Logger logger = LoggerFactory.getLogger(SynchronizationContextImpl.class);
048        
049        private final Set<D> activeDatabaseSet;
050        private final D sourceDatabase;
051        private final D targetDatabase;
052        private final DatabaseCluster<Z, D> cluster;
053        private final DatabaseProperties sourceDatabaseProperties;
054        private final DatabaseProperties targetDatabaseProperties;
055        private final Map<D, Map.Entry<Connection, Boolean>> connectionMap = new HashMap<D, Map.Entry<Connection, Boolean>>();
056        private final ExecutorService executor;
057        
058        /**
059         * @param cluster
060         * @param database
061         * @throws SQLException
062         */
063        public SynchronizationContextImpl(DatabaseCluster<Z, D> cluster, D database) throws SQLException
064        {
065                this.cluster = cluster;
066                
067                Balancer<Z, D> balancer = cluster.getBalancer();
068                
069                this.sourceDatabase = balancer.next();
070                
071                this.activeDatabaseSet = balancer;
072                this.targetDatabase = database;
073                this.executor = Executors.newFixedThreadPool(this.activeDatabaseSet.size(), this.cluster.getThreadFactory());
074                
075                DatabaseMetaDataCache<Z, D> cache = cluster.getDatabaseMetaDataCache();
076                
077                this.targetDatabaseProperties = cache.getDatabaseProperties(this.targetDatabase, this.getConnection(this.targetDatabase));
078                this.sourceDatabaseProperties = cache.getDatabaseProperties(this.sourceDatabase, this.getConnection(this.sourceDatabase));
079        }
080        
081        /**
082         * @see net.sf.hajdbc.sync.SynchronizationContext#getConnection(net.sf.hajdbc.Database)
083         */
084        @Override
085        public Connection getConnection(D database) throws SQLException
086        {
087                Map.Entry<Connection, Boolean> entry = this.connectionMap.get(database);
088                
089                if (entry == null)
090                {
091                        Connection connection = database.connect(database.getConnectionSource(), database.decodePassword(this.cluster.getDecoder()));
092                        entry = new AbstractMap.SimpleImmutableEntry<Connection, Boolean>(connection, connection.getAutoCommit());
093                        
094                        this.connectionMap.put(database, entry);
095                }
096                
097                return entry.getKey();
098        }
099        
100        /**
101         * @see net.sf.hajdbc.sync.SynchronizationContext#getSourceDatabase()
102         */
103        @Override
104        public D getSourceDatabase()
105        {
106                return this.sourceDatabase;
107        }
108        
109        /**
110         * @see net.sf.hajdbc.sync.SynchronizationContext#getTargetDatabase()
111         */
112        @Override
113        public D getTargetDatabase()
114        {
115                return this.targetDatabase;
116        }
117        
118        /**
119         * @see net.sf.hajdbc.sync.SynchronizationContext#getActiveDatabaseSet()
120         */
121        @Override
122        public Set<D> getActiveDatabaseSet()
123        {
124                return this.activeDatabaseSet;
125        }
126        
127        /**
128         * @see net.sf.hajdbc.sync.SynchronizationContext#getSourceDatabaseProperties()
129         */
130        @Override
131        public DatabaseProperties getSourceDatabaseProperties()
132        {
133                return this.sourceDatabaseProperties;
134        }
135
136        /**
137         * @see net.sf.hajdbc.sync.SynchronizationContext#getTargetDatabaseProperties()
138         */
139        @Override
140        public DatabaseProperties getTargetDatabaseProperties()
141        {
142                return this.targetDatabaseProperties;
143        }
144
145        /**
146         * @see net.sf.hajdbc.sync.SynchronizationContext#getDialect()
147         */
148        @Override
149        public Dialect getDialect()
150        {
151                return this.cluster.getDialect();
152        }
153        
154        /**
155         * {@inheritDoc}
156         * @see net.sf.hajdbc.sync.SynchronizationContext#getDecoder()
157         */
158        @Override
159        public Decoder getDecoder()
160        {
161                return this.cluster.getDecoder();
162        }
163
164        /**
165         * @see net.sf.hajdbc.sync.SynchronizationContext#getExecutor()
166         */
167        @Override
168        public ExecutorService getExecutor()
169        {
170                return this.executor;
171        }
172
173        /**
174         * {@inheritDoc}
175         * @see net.sf.hajdbc.sync.SynchronizationContext#getSynchronizationSupport()
176         */
177        @Override
178        public SynchronizationSupport getSynchronizationSupport()
179        {
180                return new SynchronizationSupportImpl<Z, D>(this);
181        }
182
183        /**
184         * @see net.sf.hajdbc.sync.SynchronizationContext#close()
185         */
186        @Override
187        public void close()
188        {
189                for (Map.Entry<Connection, Boolean> entry: this.connectionMap.values())
190                {
191                        Connection connection = entry.getKey();
192                        
193                        try
194                        {
195                                connection.setAutoCommit(entry.getValue());
196                        }
197                        catch (SQLException e)
198                        {
199                                logger.log(Level.WARN, e);
200                        }
201                        finally
202                        {
203                                Resources.close(connection);
204                        }
205                }
206                
207                this.executor.shutdown();
208        }
209}