/*
 * HA-JDBC: High-Availability JDBC
 * Copyright (C) 2012  Paul Ferraro
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */
package net.sf.hajdbc.invocation;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.AbstractMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

import net.sf.hajdbc.Database;
import net.sf.hajdbc.DatabaseCluster;
import net.sf.hajdbc.ExceptionFactory;
import net.sf.hajdbc.Messages;
import net.sf.hajdbc.sql.ProxyFactory;

/**
 * Results collector that runs an invoker against every database in the
 * cluster's balancer set and gathers both the per-database results and the
 * per-database failures.
 * The invocations are submitted together to an {@link ExecutorService}
 * supplied by an {@link ExecutorProvider}.
 *
 * @author Paul Ferraro
 */
public class AllResultsCollector implements InvokeOnManyInvocationStrategy.ResultsCollector
{
    /**
     * Supplies the executor service on which the invocations for a given
     * database cluster are run.
     */
    public static interface ExecutorProvider
    {
        /**
         * Returns the executor to use for the specified cluster.
         * @param cluster the database cluster whose databases will be invoked
         * @return the executor service on which to run the invocations
         */
        <Z, D extends Database<Z>> ExecutorService getExecutor(DatabaseCluster<Z, D> cluster);
    }

    private final ExecutorProvider provider;

    /**
     * Constructs a new collector.
     * @param provider source of the executor used to run the invocations
     */
    public AllResultsCollector(ExecutorProvider provider)
    {
        this.provider = provider;
    }

    /**
     * Invokes the specified invoker once per database in the cluster's
     * balancer set and waits for all invocations to complete.
     * <p>
     * A successful invocation contributes an entry to the result map.
     * An invocation that failed contributes an entry to the exception map,
     * unless the database is no longer contained in the balancer set when the
     * failure is inspected, in which case the failure is dropped.
     * </p>
     *
     * @param factory supplies the cluster, the exception factory and the
     *        per-database object handed to the invoker
     * @param invoker the operation to run against each database
     * @return an immutable entry whose key is the map of results and whose
     *         value is the map of exceptions, both keyed by database
     * @throws IllegalStateException if the calling thread is interrupted
     *         while waiting for the invocations to complete; the thread's
     *         interrupt status is restored before this is thrown
     */
    @Override
    public <Z, D extends Database<Z>, T, R, E extends Exception> Map.Entry<SortedMap<D, R>, SortedMap<D, E>> collectResults(ProxyFactory<Z, D, T, E> factory, final Invoker<Z, D, T, R, E> invoker)
    {
        DatabaseCluster<Z, D> cluster = factory.getDatabaseCluster();
        ExceptionFactory<E> exceptionFactory = factory.getExceptionFactory();
        Set<D> databaseSet = cluster.getBalancer();

        if (databaseSet.isEmpty())
        {
            // NOTE(review): the exception created here is neither thrown nor
            // recorded, so this branch has no visible effect in this method.
            // This method does not declare "throws E", so it cannot simply be
            // thrown here; presumably the caller reports the empty result
            // itself - confirm before changing.
            exceptionFactory.createException(Messages.NO_ACTIVE_DATABASES.getMessage(cluster));
        }

        int size = databaseSet.size();

        List<Invocation<Z, D, T, R, E>> invocationList = new ArrayList<Invocation<Z, D, T, R, E>>(size);

        for (D database: databaseSet)
        {
            invocationList.add(new Invocation<Z, D, T, R, E>(invoker, database, factory.get(database)));
        }

        try
        {
            // The returned futures are in the same order as invocationList,
            // which lets the loop below pair them up by index.
            List<Future<R>> futureList = this.provider.getExecutor(cluster).invokeAll(invocationList);

            final SortedMap<D, R> resultMap = new TreeMap<D, R>();
            final SortedMap<D, E> exceptionMap = new TreeMap<D, E>();

            for (int i = 0; i < invocationList.size(); ++i)
            {
                D database = invocationList.get(i).getDatabase();

                try
                {
                    resultMap.put(database, futureList.get(i).get());
                }
                catch (ExecutionException e)
                {
                    // If this database was concurrently deactivated, just ignore the failure
                    if (databaseSet.contains(database))
                    {
                        exceptionMap.put(database, exceptionFactory.createException(e.getCause()));
                    }
                }
                catch (InterruptedException e)
                {
                    // Restore the interrupt status and record the interruption
                    // as this database's failure.
                    Thread.currentThread().interrupt();

                    exceptionMap.put(database, exceptionFactory.createException(e));
                }
            }

            return new AbstractMap.SimpleImmutableEntry<SortedMap<D, R>, SortedMap<D, E>>(resultMap, exceptionMap);
        }
        catch (InterruptedException e)
        {
            // Restore the interrupt status before translating to an unchecked
            // exception, so that callers further up the stack can still
            // observe that this thread was interrupted.
            Thread.currentThread().interrupt();

            throw new IllegalStateException(e);
        }
    }

    /**
     * Callable that applies an invoker to a single database and its
     * associated object, remembering the database so that the outcome can be
     * attributed to it afterwards.
     */
    private static class Invocation<Z, D extends Database<Z>, T, R, E extends Exception> implements Callable<R>
    {
        private final Invoker<Z, D, T, R, E> invoker;
        private final D database;
        private final T object;

        /**
         * @param invoker the operation to run
         * @param database the database to run it against
         * @param object the object passed to the invoker along with the database
         */
        Invocation(Invoker<Z, D, T, R, E> invoker, D database, T object)
        {
            this.invoker = invoker;
            this.database = database;
            this.object = object;
        }

        /**
         * @return the database this invocation targets
         */
        D getDatabase()
        {
            return this.database;
        }

        /**
         * {@inheritDoc}
         * @see java.util.concurrent.Callable#call()
         */
        @Override
        public R call() throws E
        {
            return this.invoker.invoke(this.database, this.object);
        }
    }
}