001/*
002 * HA-JDBC: High-Availability JDBC
003 * Copyright (C) 2012  Paul Ferraro
004 *
005 * This program is free software: you can redistribute it and/or modify
006 * it under the terms of the GNU Lesser General Public License as published by
007 * the Free Software Foundation, either version 3 of the License, or
008 * (at your option) any later version.
009 *
010 * This program is distributed in the hope that it will be useful,
011 * but WITHOUT ANY WARRANTY; without even the implied warranty of
012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
013 * GNU Lesser General Public License for more details.
014 *
015 * You should have received a copy of the GNU Lesser General Public License
016 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
017 */
018package net.sf.hajdbc.invocation;
019
020import java.util.ArrayList;
021import java.util.List;
022import java.util.Map;
023import java.util.SortedMap;
024
025import net.sf.hajdbc.Database;
026import net.sf.hajdbc.DatabaseCluster;
027import net.sf.hajdbc.ExceptionFactory;
028import net.sf.hajdbc.Messages;
029import net.sf.hajdbc.dialect.Dialect;
030import net.sf.hajdbc.logging.Level;
031import net.sf.hajdbc.logging.Logger;
032import net.sf.hajdbc.logging.LoggerFactory;
033import net.sf.hajdbc.sql.ProxyFactory;
034import net.sf.hajdbc.state.StateManager;
035
036/**
037 * @author paul
038 *
039 */
040public class InvokeOnManyInvocationStrategy implements InvocationStrategy
041{
042        private static Logger logger = LoggerFactory.getLogger(InvokeOnManyInvocationStrategy.class);
043        
044        public static interface ResultsCollector
045        {
046                <Z, D extends Database<Z>, T, R, E extends Exception> Map.Entry<SortedMap<D, R>, SortedMap<D, E>> collectResults(ProxyFactory<Z, D, T, E> map, Invoker<Z, D, T, R, E> invoker);
047        }
048
049        private final ResultsCollector collector;
050        
051        public InvokeOnManyInvocationStrategy(ResultsCollector collector)
052        {
053                this.collector = collector;
054        }
055
056        /**
057         * {@inheritDoc}
058         */
059        @Override
060        public <Z, D extends Database<Z>, T, R, E extends Exception> SortedMap<D, R> invoke(ProxyFactory<Z, D, T, E> factory, Invoker<Z, D, T, R, E> invoker) throws E
061        {
062                Map.Entry<SortedMap<D, R>, SortedMap<D, E>> results = this.collector.collectResults(factory, invoker);
063                ExceptionFactory<E> exceptionFactory = factory.getExceptionFactory();
064                SortedMap<D, R> resultMap = results.getKey();
065                SortedMap<D, E> exceptionMap = results.getValue();
066                
067                if (!exceptionMap.isEmpty())
068                {
069                        DatabaseCluster<Z, D> cluster = factory.getDatabaseCluster();
070                        Dialect dialect = cluster.getDialect();
071                        
072                        List<D> failedDatabases = new ArrayList<D>(exceptionMap.size());
073                        
074                        // Determine which exceptions are due to failures
075                        for (Map.Entry<D, E> entry: exceptionMap.entrySet())
076                        {
077                                if (exceptionFactory.indicatesFailure(entry.getValue(), dialect))
078                                {
079                                        failedDatabases.add(entry.getKey());
080                                }
081                        }
082
083                        StateManager stateManager = cluster.getStateManager();
084                        
085                        // Deactivate failed databases, unless all failed
086                        if (!resultMap.isEmpty() || (failedDatabases.size() < exceptionMap.size()))
087                        {
088                                for (D failedDatabase: failedDatabases)
089                                {
090                                        E exception = exceptionMap.remove(failedDatabase);
091                                        
092                                        if (cluster.deactivate(failedDatabase, stateManager))
093                                        {
094                                                logger.log(Level.ERROR, exception, Messages.DATABASE_DEACTIVATED.getMessage(), failedDatabase, cluster);
095                                        }
096                                }
097                        }
098                        
099                        if (!exceptionMap.isEmpty())
100                        {
101                                // If primary database threw exception
102                                if (resultMap.isEmpty() || !exceptionMap.headMap(resultMap.firstKey()).isEmpty())
103                                {
104                                        D primaryDatabase = exceptionMap.firstKey();
105                                        E primaryException = exceptionMap.get(primaryDatabase);
106                                        
107                                        // Deactivate databases with non-matching exceptions
108                                        for (Map.Entry<D, E> entry: exceptionMap.tailMap(primaryDatabase).entrySet())
109                                        {
110                                                E exception = entry.getValue();
111                                                
112                                                if (!exceptionFactory.equals(exception, primaryException))
113                                                {
114                                                        D database = entry.getKey();
115                                                        
116                                                        if (cluster.deactivate(database, stateManager))
117                                                        {
118                                                                logger.log(Level.ERROR, exception, Messages.DATABASE_INCONSISTENT.getMessage(), database, cluster, primaryException, exception);
119                                                        }
120                                                }
121                                        }
122        
123                                        // Deactivate databases with results
124                                        for (Map.Entry<D, R> entry: resultMap.entrySet())
125                                        {
126                                                D database = entry.getKey();
127                                                
128                                                if (cluster.deactivate(database, stateManager))
129                                                {
130                                                        logger.log(Level.ERROR, Messages.DATABASE_INCONSISTENT.getMessage(), database, cluster, primaryException, entry.getValue());
131                                                }
132                                        }
133                                        
134                                        throw primaryException;
135                                }
136                        }
137                        // Else primary was successful
138                        // Deactivate databases with exceptions
139                        for (Map.Entry<D, E> entry: exceptionMap.entrySet())
140                        {
141                                D database = entry.getKey();
142                                E exception = entry.getValue();
143                                
144                                if (cluster.deactivate(database, stateManager))
145                                {
146                                        logger.log(Level.ERROR, exception, Messages.DATABASE_DEACTIVATED.getMessage(), database, cluster);
147                                }
148                        }
149                }
150                
151                return resultMap;
152        }
153}