001/* 002 * HA-JDBC: High-Availability JDBC 003 * Copyright (C) 2012 Paul Ferraro 004 * 005 * This program is free software: you can redistribute it and/or modify 006 * it under the terms of the GNU Lesser General Public License as published by 007 * the Free Software Foundation, either version 3 of the License, or 008 * (at your option) any later version. 009 * 010 * This program is distributed in the hope that it will be useful, 011 * but WITHOUT ANY WARRANTY; without even the implied warranty of 012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 013 * GNU Lesser General Public License for more details. 014 * 015 * You should have received a copy of the GNU Lesser General Public License 016 * along with this program. If not, see <http://www.gnu.org/licenses/>. 017 */ 018package net.sf.hajdbc.invocation; 019 020import java.util.ArrayList; 021import java.util.List; 022import java.util.Map; 023import java.util.SortedMap; 024 025import net.sf.hajdbc.Database; 026import net.sf.hajdbc.DatabaseCluster; 027import net.sf.hajdbc.ExceptionFactory; 028import net.sf.hajdbc.Messages; 029import net.sf.hajdbc.dialect.Dialect; 030import net.sf.hajdbc.logging.Level; 031import net.sf.hajdbc.logging.Logger; 032import net.sf.hajdbc.logging.LoggerFactory; 033import net.sf.hajdbc.sql.ProxyFactory; 034import net.sf.hajdbc.state.StateManager; 035 036/** 037 * @author paul 038 * 039 */ 040public class InvokeOnManyInvocationStrategy implements InvocationStrategy 041{ 042 private static Logger logger = LoggerFactory.getLogger(InvokeOnManyInvocationStrategy.class); 043 044 public static interface ResultsCollector 045 { 046 <Z, D extends Database<Z>, T, R, E extends Exception> Map.Entry<SortedMap<D, R>, SortedMap<D, E>> collectResults(ProxyFactory<Z, D, T, E> map, Invoker<Z, D, T, R, E> invoker); 047 } 048 049 private final ResultsCollector collector; 050 051 public InvokeOnManyInvocationStrategy(ResultsCollector collector) 052 { 053 this.collector = collector; 054 } 055 056 /** 057 * {@inheritDoc} 058 */ 059 @Override 060 public <Z, D extends Database<Z>, T, R, E extends Exception> SortedMap<D, R> invoke(ProxyFactory<Z, D, T, E> factory, Invoker<Z, D, T, R, E> invoker) throws E 061 { 062 Map.Entry<SortedMap<D, R>, SortedMap<D, E>> results = this.collector.collectResults(factory, invoker); 063 ExceptionFactory<E> exceptionFactory = factory.getExceptionFactory(); 064 SortedMap<D, R> resultMap = results.getKey(); 065 SortedMap<D, E> exceptionMap = results.getValue(); 066 067 if (!exceptionMap.isEmpty()) 068 { 069 DatabaseCluster<Z, D> cluster = factory.getDatabaseCluster(); 070 Dialect dialect = cluster.getDialect(); 071 072 List<D> failedDatabases = new ArrayList<D>(exceptionMap.size()); 073 074 // Determine which exceptions are due to failures 075 for (Map.Entry<D, E> entry: exceptionMap.entrySet()) 076 { 077 if (exceptionFactory.indicatesFailure(entry.getValue(), dialect)) 078 { 079 failedDatabases.add(entry.getKey()); 080 } 081 } 082 083 StateManager stateManager = cluster.getStateManager(); 084 085 // Deactivate failed databases, unless all failed 086 if (!resultMap.isEmpty() || (failedDatabases.size() < exceptionMap.size())) 087 { 088 for (D failedDatabase: failedDatabases) 089 { 090 E exception = exceptionMap.remove(failedDatabase); 091 092 if (cluster.deactivate(failedDatabase, stateManager)) 093 { 094 logger.log(Level.ERROR, exception, Messages.DATABASE_DEACTIVATED.getMessage(), failedDatabase, cluster); 095 } 096 } 097 } 098 099 if (!exceptionMap.isEmpty()) 100 { 101 // If primary database threw exception 102 if (resultMap.isEmpty() || !exceptionMap.headMap(resultMap.firstKey()).isEmpty()) 103 { 104 D primaryDatabase = exceptionMap.firstKey(); 105 E primaryException = exceptionMap.get(primaryDatabase); 106 107 // Deactivate databases with non-matching exceptions 108 for (Map.Entry<D, E> entry: exceptionMap.tailMap(primaryDatabase).entrySet()) 109 { 110 E exception = entry.getValue(); 111 112 if (!exceptionFactory.equals(exception, primaryException)) 113 { 114 D database = entry.getKey(); 115 116 if (cluster.deactivate(database, stateManager)) 117 { 118 logger.log(Level.ERROR, exception, Messages.DATABASE_INCONSISTENT.getMessage(), database, cluster, primaryException, exception); 119 } 120 } 121 } 122 123 // Deactivate databases with results 124 for (Map.Entry<D, R> entry: resultMap.entrySet()) 125 { 126 D database = entry.getKey(); 127 128 if (cluster.deactivate(database, stateManager)) 129 { 130 logger.log(Level.ERROR, Messages.DATABASE_INCONSISTENT.getMessage(), database, cluster, primaryException, entry.getValue()); 131 } 132 } 133 134 throw primaryException; 135 } 136 } 137 // Else primary was successful 138 // Deactivate databases with exceptions 139 for (Map.Entry<D, E> entry: exceptionMap.entrySet()) 140 { 141 D database = entry.getKey(); 142 E exception = entry.getValue(); 143 144 if (cluster.deactivate(database, stateManager)) 145 { 146 logger.log(Level.ERROR, exception, Messages.DATABASE_DEACTIVATED.getMessage(), database, cluster); 147 } 148 } 149 } 150 151 return resultMap; 152 } 153}