001/*
002 * HA-JDBC: High-Availability JDBC
003 * Copyright (C) 2012  Paul Ferraro
004 *
005 * This program is free software: you can redistribute it and/or modify
006 * it under the terms of the GNU Lesser General Public License as published by
007 * the Free Software Foundation, either version 3 of the License, or
008 * (at your option) any later version.
009 *
010 * This program is distributed in the hope that it will be useful,
011 * but WITHOUT ANY WARRANTY; without even the implied warranty of
012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
013 * GNU Lesser General Public License for more details.
014 *
015 * You should have received a copy of the GNU Lesser General Public License
016 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
017 */
018package net.sf.hajdbc.util.concurrent;
019
020import java.util.ArrayList;
021import java.util.Collection;
022import java.util.Collections;
023import java.util.LinkedList;
024import java.util.List;
025import java.util.concurrent.AbstractExecutorService;
026import java.util.concurrent.Callable;
027import java.util.concurrent.CancellationException;
028import java.util.concurrent.CountDownLatch;
029import java.util.concurrent.ExecutionException;
030import java.util.concurrent.ExecutorService;
031import java.util.concurrent.Executors;
032import java.util.concurrent.Future;
033import java.util.concurrent.TimeUnit;
034import java.util.concurrent.TimeoutException;
035import java.util.concurrent.atomic.AtomicReference;
036
037import net.sf.hajdbc.util.Reversed;
038
/**
 * Executor service that performs (part of) its work in the caller thread.
 * <p>
 * Tasks handed to {@code execute(...)} or {@code submit(...)} are run to completion in the calling thread before the call returns.
 * Bulk invocations ({@code invokeAll(...)}, {@code invokeAny(...)}) split the tasks between the calling thread and the
 * executor supplied at construction; see {@link #invokeAll(Collection, long, TimeUnit)} for the exact rules.
 * All life-cycle methods are delegated to the supplied executor.
 * 
 * @author Paul Ferraro
 */
public class SynchronousExecutor extends AbstractExecutorService
{
	private final ExecutorService executor;
	private final boolean reverse;
	
	/**
	 * Constructs an executor that processes bulk invocations in iteration order.
	 * @param executor the executor used for life-cycle operations and for the tasks of a bulk invocation that are not run in the caller thread
	 */
	public SynchronousExecutor(ExecutorService executor)
	{
		this(executor, false);
	}
	
	/**
	 * Constructs an executor.
	 * @param executor the executor used for life-cycle operations and for the tasks of a bulk invocation that are not run in the caller thread
	 * @param reverse if true, bulk invocations process their tasks from last to first and run the caller-thread task last instead of first
	 */
	public SynchronousExecutor(ExecutorService executor, boolean reverse)
	{
		this.executor = executor;
		this.reverse = reverse;
	}
	
	/**
	 * Delegates to the underlying executor.
	 * @see java.util.concurrent.ExecutorService#awaitTermination(long, java.util.concurrent.TimeUnit)
	 */
	@Override
	public boolean awaitTermination(long time, TimeUnit unit) throws InterruptedException
	{
		return this.executor.awaitTermination(time, unit);
	}

	/**
	 * Delegates to the underlying executor.
	 * @see java.util.concurrent.ExecutorService#isShutdown()
	 */
	@Override
	public boolean isShutdown()
	{
		return this.executor.isShutdown();
	}

	/**
	 * Delegates to the underlying executor.
	 * @see java.util.concurrent.ExecutorService#isTerminated()
	 */
	@Override
	public boolean isTerminated()
	{
		return this.executor.isTerminated();
	}

	/**
	 * Delegates to the underlying executor.
	 * @see java.util.concurrent.ExecutorService#shutdown()
	 */
	@Override
	public void shutdown()
	{
		this.executor.shutdown();
	}

	/**
	 * Delegates to the underlying executor.
	 * @see java.util.concurrent.ExecutorService#shutdownNow()
	 */
	@Override
	public List<Runnable> shutdownNow()
	{
		return this.executor.shutdownNow();
	}

	/**
	 * Runs the task in the calling thread; anything the task throws propagates to the caller.
	 * @see java.util.concurrent.Executor#execute(java.lang.Runnable)
	 */
	@Override
	public void execute(Runnable task)
	{
		task.run();
	}

	/**
	 * Runs the task in the calling thread and returns an already completed future.
	 * @see java.util.concurrent.AbstractExecutorService#submit(java.lang.Runnable)
	 */
	@Override
	public Future<?> submit(Runnable task)
	{
		return this.submit(Executors.callable(task));
	}

	/**
	 * Runs the task in the calling thread and returns an already completed future yielding the given result.
	 * @see java.util.concurrent.AbstractExecutorService#submit(java.lang.Runnable, java.lang.Object)
	 */
	@Override
	public <T> Future<T> submit(Runnable task, T result)
	{
		return this.submit(Executors.callable(task, result));
	}

	/**
	 * Runs the task in the calling thread and returns an already completed future.
	 * A failure of the task is captured in the future, not thrown from this method.
	 * @throws NullPointerException if the task is null
	 * @see java.util.concurrent.AbstractExecutorService#submit(java.util.concurrent.Callable)
	 */
	@Override
	public <T> Future<T> submit(Callable<T> task)
	{
		return new EagerFuture<T>(task);
	}

	/**
	 * Invokes all tasks without a time limit.
	 * Equivalent to {@link #invokeAll(Collection, long, TimeUnit)} with a timeout of {@link Long#MAX_VALUE}.
	 * @see java.util.concurrent.AbstractExecutorService#invokeAll(java.util.Collection)
	 */
	@Override
	public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException
	{
		return this.invokeAll(tasks, Long.MAX_VALUE, TimeUnit.MILLISECONDS);
	}

	/**
	 * Invokes the specified tasks, splitting them between the calling thread and the underlying executor.
	 * <ul>
	 * <li>Forward mode: the first task is run immediately in the calling thread.
	 * If exactly one task follows, it is run in the calling thread as well; if several follow, they are submitted to the underlying executor.</li>
	 * <li>Reverse mode: tasks are processed from last to first. All but the finally processed task are submitted to the underlying executor;
	 * the finally processed task (the first element of the collection) is run in the calling thread, after the submitted tasks have been awaited.
	 * A collection containing a single task is submitted to the underlying executor.</li>
	 * </ul>
	 * The timeout only bounds the time spent waiting for futures; a task running in the calling thread is never interrupted by it.
	 * A timeout of {@link Long#MAX_VALUE} (in any unit) waits indefinitely.
	 * Futures that are not complete when the timeout elapses, or when this method exits abnormally, are cancelled.
	 * 
	 * @return the futures of the tasks, in the iteration order of the task collection, regardless of mode
	 * @throws NullPointerException if a task run in the calling thread is null
	 * @see java.util.concurrent.AbstractExecutorService#invokeAll(java.util.Collection, long, java.util.concurrent.TimeUnit)
	 */
	@Override
	public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException
	{
		if (tasks.isEmpty()) return Collections.emptyList();

		// Measure elapsed time with the monotonic clock and compare by subtraction,
		// so that large finite timeouts cannot overflow into an already expired deadline.
		boolean timed = (timeout != Long.MAX_VALUE);
		long limit = timed ? unit.toNanos(timeout) : 0L;
		long start = System.nanoTime();
		
		List<Callable<T>> ordered = new ArrayList<Callable<T>>(tasks);
		
		if (this.reverse)
		{
			Collections.reverse(ordered);
		}
		
		// Futures in the order of the original task collection (the result)
		LinkedList<Future<T>> futures = new LinkedList<Future<T>>();
		// Futures in the order in which their tasks were processed (used for waiting and cancelling)
		List<Future<T>> started = new ArrayList<Future<T>>(ordered.size());
		
		try
		{
			boolean synchronous = !this.reverse;
			int remaining = ordered.size();
			
			for (Callable<T> task: ordered)
			{
				remaining -= 1;
				
				Future<T> future;

				if (synchronous)
				{
					future = this.reverse ? new LazyFuture<T>(task) : new EagerFuture<T>(task);
					
					// Execute remaining tasks in parallel, if there are multiple
					if (remaining > 1)
					{
						synchronous = false;
					}
				}
				else
				{
					future = this.executor.submit(task);
					
					// In reverse mode, the finally processed task is run by the caller
					if (this.reverse && (remaining == 1))
					{
						synchronous = true;
					}
				}
				
				started.add(future);
				
				if (this.reverse)
				{
					futures.addFirst(future);
				}
				else
				{
					futures.addLast(future);
				}
			}
			
			// Wait until all tasks have finished
			for (Future<T> future: started)
			{
				if (future.isDone()) continue;
				
				try
				{
					if (timed)
					{
						long left = limit - (System.nanoTime() - start);
						
						// Out of time - the unfinished futures are cancelled below
						if (left <= 0) break;
						
						future.get(left, TimeUnit.NANOSECONDS);
					}
					else
					{
						future.get();
					}
				}
				catch (ExecutionException e)
				{
					// Ignore - the failure is reported by the returned future
				}
				catch (CancellationException e)
				{
					// Ignore - the cancellation is reported by the returned future
				}
			}
		}
		catch (TimeoutException e)
		{
			// Ignore - the unfinished futures are cancelled below
		}
		finally
		{
			// If timed out, interrupted or rejected, cancel any unfinished tasks
			for (Future<T> future: started)
			{
				if (!future.isDone())
				{
					future.cancel(true);
				}
			}
		}
		
		return futures;
	}
	
	/**
	 * Invokes all tasks as described by {@link #invokeAll(Collection)} and returns the result of one that succeeded.
	 * @throws IllegalArgumentException if the task collection is empty
	 * @throws ExecutionException if no task succeeded
	 * @see java.util.concurrent.AbstractExecutorService#invokeAny(java.util.Collection)
	 */
	@Override
	public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException
	{
		return this.getAny(this.invokeAll(tasks));
	}

	/**
	 * Invokes all tasks as described by {@link #invokeAll(Collection, long, TimeUnit)} and returns the result of one that succeeded.
	 * @throws IllegalArgumentException if the task collection is empty
	 * @throws ExecutionException if no task succeeded
	 * @throws CancellationException if no task succeeded and the first processed task was cancelled, e.g. because of the timeout
	 * @see java.util.concurrent.AbstractExecutorService#invokeAny(java.util.Collection, long, java.util.concurrent.TimeUnit)
	 */
	@Override
	public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException
	{
		return this.getAny(this.invokeAll(tasks, timeout, unit));
	}

	/**
	 * Selects a result from the futures of a bulk invocation.
	 * The future of the first processed task (first element in forward mode, last element in reverse mode) is preferred.
	 * Only if it failed or was cancelled are the other completed futures consulted, in processing order.
	 * If none succeeded, the outcome of the preferred future is reported.
	 */
	private <T> T getAny(List<Future<T>> futures) throws InterruptedException, ExecutionException
	{
		if (futures.isEmpty()) throw new IllegalArgumentException();
		
		List<Future<T>> candidates = new ArrayList<Future<T>>(futures);
		
		if (this.reverse)
		{
			Collections.reverse(candidates);
		}
		
		Future<T> preferred = candidates.get(0);
		
		try
		{
			return preferred.get();
		}
		catch (ExecutionException e)
		{
			// Look for another task that succeeded
		}
		catch (CancellationException e)
		{
			// Look for another task that succeeded
		}
		
		for (Future<T> candidate: candidates.subList(1, candidates.size()))
		{
			// Never block on a fallback candidate
			if (candidate.isDone() && !candidate.isCancelled())
			{
				try
				{
					return candidate.get();
				}
				catch (ExecutionException e)
				{
					// Try the next candidate
				}
				catch (CancellationException e)
				{
					// Try the next candidate
				}
			}
		}
		
		// Nothing succeeded - asking the completed future again re-throws its failure
		return preferred.get();
	}
	
	/**
	 * Future that doesn't execute its task until get(...).
	 * The task is executed at most once, by the first thread to call get(...); other threads wait for that execution to finish.
	 * The future can only be cancelled before its task has been claimed for execution.
	 * @param <T> the result type
	 */
	private static class LazyFuture<T> implements Future<T>
	{
		private enum State
		{
			/** Neither claimed for execution nor cancelled */
			NEW,
			/** Cancelled before execution */
			CANCELLED,
			/** Claimed for execution - the latch tells whether the execution has finished */
			DONE;
		}
		private final Callable<T> task;
		private volatile T result;
		private volatile ExecutionException exception;
		private final AtomicReference<State> state = new AtomicReference<State>(State.NEW);
		private final CountDownLatch latch = new CountDownLatch(1);
		
		LazyFuture(Callable<T> task)
		{
			if (task == null) throw new NullPointerException();
			
			this.task = task;
		}

		/**
		 * Cancels this future, provided its task was not yet claimed for execution.
		 * A running task is never interrupted, whatever the argument.
		 * @see java.util.concurrent.Future#cancel(boolean)
		 */
		@Override
		public boolean cancel(boolean interrupt)
		{
			return this.state.compareAndSet(State.NEW, State.CANCELLED);
		}

		/**
		 * Executes the task in the calling thread, if no other thread did so yet, and returns its outcome.
		 * @see java.util.concurrent.Future#get()
		 */
		@Override
		public T get() throws ExecutionException, InterruptedException
		{
			this.runIfNew();
			
			this.latch.await();
			
			return this.report();
		}

		/**
		 * Executes the task in the calling thread, if no other thread did so yet, and returns its outcome.
		 * The timeout only applies while waiting for an execution performed by another thread;
		 * an execution in the calling thread always runs to completion.
		 * @throws TimeoutException if another thread is executing the task and did not finish in time
		 * @see java.util.concurrent.Future#get(long, java.util.concurrent.TimeUnit)
		 */
		@Override
		public T get(long time, TimeUnit unit) throws ExecutionException, InterruptedException, TimeoutException
		{
			this.runIfNew();
			
			if (!this.latch.await(time, unit))
			{
				throw new TimeoutException();
			}
			
			return this.report();
		}

		/**
		 * Executes the task if this thread wins the claim.
		 * @throws CancellationException if this future was cancelled
		 */
		private void runIfNew()
		{
			if (this.state.compareAndSet(State.NEW, State.DONE))
			{
				try
				{
					this.result = this.task.call();
				}
				catch (Throwable e)
				{
					this.exception = new ExecutionException(e);
				}
				finally
				{
					// Release waiting threads, whatever happened
					this.latch.countDown();
				}
			}
			
			if (this.state.get() == State.CANCELLED)
			{
				throw new CancellationException();
			}
		}
		
		/**
		 * Returns the result of the finished execution, or throws its failure.
		 */
		private T report() throws ExecutionException
		{
			if (this.exception != null)
			{
				throw this.exception;
			}
			
			return this.result;
		}

		/**
		 * {@inheritDoc}
		 * @see java.util.concurrent.Future#isCancelled()
		 */
		@Override
		public boolean isCancelled()
		{
			return this.state.get() == State.CANCELLED;
		}

		/**
		 * Indicates whether this future was cancelled or its task has finished executing.
		 * @see java.util.concurrent.Future#isDone()
		 */
		@Override
		public boolean isDone()
		{
			return (this.state.get() == State.CANCELLED) || (this.latch.getCount() == 0);
		}
	}

	/**
	 * Light-weight future implementation that executes its task on construction.
	 * The future is therefore complete from the start and cannot be cancelled.
	 * @param <T> the result type
	 */
	private static class EagerFuture<T> implements Future<T>
	{
		private final T result;
		private final ExecutionException exception;
		
		EagerFuture(Callable<T> task)
		{
			// Reject a null task up front, instead of reporting it as a failed execution
			if (task == null) throw new NullPointerException();
			
			T value = null;
			ExecutionException failure = null;
			
			try
			{
				value = task.call();
			}
			catch (Throwable e)
			{
				failure = new ExecutionException(e);
			}
			
			this.result = value;
			this.exception = failure;
		}
		
		@Override
		public boolean cancel(boolean mayInterruptIfRunning)
		{
			return false;
		}

		@Override
		public T get() throws ExecutionException
		{
			if (this.exception != null) throw this.exception;
			
			return this.result;
		}

		@Override
		public T get(long time, TimeUnit unit) throws ExecutionException
		{
			return this.get();
		}

		@Override
		public boolean isCancelled()
		{
			return false;
		}

		@Override
		public boolean isDone()
		{
			return true;
		}
	}
}