001/*
002 * HA-JDBC: High-Availability JDBC
003 * Copyright (C) 2012  Paul Ferraro
004 *
005 * This program is free software: you can redistribute it and/or modify
006 * it under the terms of the GNU Lesser General Public License as published by
007 * the Free Software Foundation, either version 3 of the License, or
008 * (at your option) any later version.
009 *
010 * This program is distributed in the hope that it will be useful,
011 * but WITHOUT ANY WARRANTY; without even the implied warranty of
012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
013 * GNU Lesser General Public License for more details.
014 *
015 * You should have received a copy of the GNU Lesser General Public License
016 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
017 */
018package net.sf.hajdbc.invocation;
019
020import java.util.ArrayList;
021import java.util.List;
022import java.util.Map;
023import java.util.Set;
024import java.util.SortedMap;
025import java.util.TreeMap;
026import java.util.AbstractMap;
027import java.util.concurrent.Callable;
028import java.util.concurrent.ExecutionException;
029import java.util.concurrent.ExecutorService;
030import java.util.concurrent.Future;
031
032import net.sf.hajdbc.Database;
033import net.sf.hajdbc.DatabaseCluster;
034import net.sf.hajdbc.ExceptionFactory;
035import net.sf.hajdbc.Messages;
036import net.sf.hajdbc.sql.ProxyFactory;
037
038/**
039 * @author Paul Ferraro
040 */
041public class AllResultsCollector implements InvokeOnManyInvocationStrategy.ResultsCollector
042{
043        public static interface ExecutorProvider
044        {
045                <Z, D extends Database<Z>> ExecutorService getExecutor(DatabaseCluster<Z, D> cluster);
046        }
047        
048        private final ExecutorProvider provider;
049        
050        public AllResultsCollector(ExecutorProvider provider)
051        {
052                this.provider = provider;
053        }
054        
055        /**
056         * {@inheritDoc}
057         */
058        @Override
059        public <Z, D extends Database<Z>, T, R, E extends Exception> Map.Entry<SortedMap<D, R>, SortedMap<D, E>> collectResults(ProxyFactory<Z, D, T, E> factory, final Invoker<Z, D, T, R, E> invoker)
060        {
061                DatabaseCluster<Z, D> cluster = factory.getDatabaseCluster();
062                ExceptionFactory<E> exceptionFactory = factory.getExceptionFactory();
063                Set<D> databaseSet = cluster.getBalancer();
064                
065                if (databaseSet.isEmpty())
066                {
067                        exceptionFactory.createException(Messages.NO_ACTIVE_DATABASES.getMessage(cluster));
068                }
069
070                int size = databaseSet.size();
071                
072                List<Invocation<Z, D, T, R, E>> invocationList = new ArrayList<Invocation<Z, D, T, R, E>>(size);
073                
074                for (D database: databaseSet)
075                {
076                        invocationList.add(new Invocation<Z, D, T, R, E>(invoker, database, factory.get(database)));
077                }
078                
079                try
080                {
081                        List<Future<R>> futureList = this.provider.getExecutor(cluster).invokeAll(invocationList);
082                        
083                        final SortedMap<D, R> resultMap = new TreeMap<D, R>();
084                        final SortedMap<D, E> exceptionMap = new TreeMap<D, E>();
085                        
086                        for (int i = 0; i < invocationList.size(); ++i)
087                        {
088                                D database = invocationList.get(i).getDatabase();
089                                
090                                try
091                                {
092                                        resultMap.put(database, futureList.get(i).get());
093                                }
094                                catch (ExecutionException e)
095                                {
096                                        // If this database was concurrently deactivated, just ignore the failure
097                                        if (databaseSet.contains(database))
098                                        {
099                                                exceptionMap.put(database, exceptionFactory.createException(e.getCause()));
100                                        }
101                                }
102                                catch (InterruptedException e)
103                                {
104                                        Thread.currentThread().interrupt();
105                                        
106                                        exceptionMap.put(database, exceptionFactory.createException(e));
107                                }
108                        }
109                
110                        return new AbstractMap.SimpleImmutableEntry<SortedMap<D, R>, SortedMap<D, E>>(resultMap, exceptionMap);
111                }
112                catch (InterruptedException e)
113                {
114                        throw new IllegalStateException(e);
115                }
116        }
117        
118        private static class Invocation<Z, D extends Database<Z>, T, R, E extends Exception> implements Callable<R>
119        {
120                private final Invoker<Z, D, T, R, E> invoker;
121                private final D database;
122                private final T object;
123                
124                Invocation(Invoker<Z, D, T, R, E> invoker, D database, T object)
125                {
126                        this.invoker = invoker;
127                        this.database = database;
128                        this.object = object;
129                }
130                
131                D getDatabase()
132                {
133                        return this.database;
134                }
135                
136                /**
137                 * {@inheritDoc}
138                 * @see java.util.concurrent.Callable#call()
139                 */
140                @Override
141                public R call() throws E
142                {
143                        return this.invoker.invoke(this.database, this.object);
144                }
145        }
146}