/*
 * HA-JDBC: High-Availability JDBC
 * Copyright (C) 2012  Paul Ferraro
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */
package net.sf.hajdbc.util.concurrent;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;

import net.sf.hajdbc.util.Reversed;

/**
 * Executor service that executes tasks in the caller thread.
 * <p>
 * Single tasks ({@link #execute(Runnable)} and the {@code submit(...)} methods) always run in the calling thread.
 * The {@code invokeAll(...)} methods run one task (or two, see {@link #invokeAll(Collection, long, TimeUnit)})
 * in the calling thread and hand the remaining tasks to the delegate executor supplied at construction.
 * Life cycle methods (shutdown, termination) are forwarded to the delegate executor.
 *
 * @author Paul Ferraro
 */
public class SynchronousExecutor extends AbstractExecutorService
{
	private final ExecutorService executor;
	private final boolean reverse;

	/**
	 * Creates a non-reversed synchronous executor.
	 * @param executor the delegate used for life cycle operations and for the tasks of an {@code invokeAll(...)} that are not run in the calling thread
	 */
	public SynchronousExecutor(ExecutorService executor)
	{
		this(executor, false);
	}

	/**
	 * Creates a synchronous executor.
	 * @param executor the delegate used for life cycle operations and for the tasks of an {@code invokeAll(...)} that are not run in the calling thread
	 * @param reverse if true, {@code invokeAll(...)} visits the tasks in reverse order and defers the caller-thread task until the other tasks have been awaited
	 */
	public SynchronousExecutor(ExecutorService executor, boolean reverse)
	{
		this.executor = executor;
		this.reverse = reverse;
	}

	/**
	 * @see java.util.concurrent.ExecutorService#awaitTermination(long, java.util.concurrent.TimeUnit)
	 */
	@Override
	public boolean awaitTermination(long time, TimeUnit unit) throws InterruptedException
	{
		return this.executor.awaitTermination(time, unit);
	}

	/**
	 * @see java.util.concurrent.ExecutorService#isShutdown()
	 */
	@Override
	public boolean isShutdown()
	{
		return this.executor.isShutdown();
	}

	/**
	 * @see java.util.concurrent.ExecutorService#isTerminated()
	 */
	@Override
	public boolean isTerminated()
	{
		return this.executor.isTerminated();
	}

	/**
	 * @see java.util.concurrent.ExecutorService#shutdown()
	 */
	@Override
	public void shutdown()
	{
		this.executor.shutdown();
	}

	/**
	 * @see java.util.concurrent.ExecutorService#shutdownNow()
	 */
	@Override
	public List<Runnable> shutdownNow()
	{
		return this.executor.shutdownNow();
	}

	/**
	 * Runs the task immediately in the calling thread.
	 * @see java.util.concurrent.Executor#execute(java.lang.Runnable)
	 */
	@Override
	public void execute(Runnable task)
	{
		task.run();
	}

	/**
	 * {@inheritDoc}
	 * @see java.util.concurrent.AbstractExecutorService#submit(java.lang.Runnable)
	 */
	@Override
	public Future<?> submit(Runnable task)
	{
		return this.submit(Executors.callable(task));
	}

	/**
	 * {@inheritDoc}
	 * @see java.util.concurrent.AbstractExecutorService#submit(java.lang.Runnable, java.lang.Object)
	 */
	@Override
	public <T> Future<T> submit(Runnable task, T result)
	{
		return this.submit(Executors.callable(task, result));
	}

	/**
	 * Runs the task in the calling thread before returning; the returned future is already complete.
	 * {@inheritDoc}
	 * @see java.util.concurrent.AbstractExecutorService#submit(java.util.concurrent.Callable)
	 */
	@Override
	public <T> Future<T> submit(Callable<T> task)
	{
		return new EagerFuture<T>(task);
	}

	/**
	 * Equivalent to {@link #invokeAll(Collection, long, TimeUnit)} without a deadline.
	 * {@inheritDoc}
	 * @see java.util.concurrent.AbstractExecutorService#invokeAll(java.util.Collection)
	 */
	@Override
	public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException
	{
		return this.invokeAll(tasks, Long.MAX_VALUE, TimeUnit.MILLISECONDS);
	}

	/**
	 * Executes the specified tasks, partly in the calling thread and partly using the delegate executor.
	 * <ul>
	 * <li>Normal mode: the first task runs in the calling thread straight away. If exactly two tasks were given,
	 * the second also runs in the calling thread; otherwise all remaining tasks are submitted to the delegate executor.</li>
	 * <li>Reverse mode: the tasks are visited through {@link Reversed}. Every visited task except the final one is
	 * submitted to the delegate executor; the final one is wrapped in a lazy future that runs in the calling thread
	 * when it is awaited. A single task is submitted to the delegate executor.</li>
	 * </ul>
	 * Execution failures and cancellations of individual tasks are not reported here; they surface from the returned futures.
	 * Any task still unfinished when this method exits (deadline reached or interruption) is cancelled.
	 * {@inheritDoc}
	 * @param tasks the tasks to execute
	 * @param timeout the maximum time to wait; {@link Long#MAX_VALUE} means wait indefinitely
	 * @param unit the unit of the timeout
	 * @return one future per task; an immutable empty list if no tasks were given
	 * @throws InterruptedException if the calling thread is interrupted while waiting
	 * @see java.util.concurrent.AbstractExecutorService#invokeAll(java.util.Collection, long, java.util.concurrent.TimeUnit)
	 */
	@Override
	public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException
	{
		if (tasks.isEmpty()) return Collections.emptyList();

		boolean timed = false;
		long end = 0;

		if (timeout != Long.MAX_VALUE)
		{
			long start = System.currentTimeMillis();
			long millis = unit.toMillis(timeout);

			// A deadline past Long.MAX_VALUE would wrap negative and look already expired,
			// so such a timeout is treated as unbounded.
			if (millis <= Long.MAX_VALUE - start)
			{
				timed = true;
				end = start + millis;
			}
		}

		boolean synchronous = !this.reverse;
		int remaining = tasks.size();
		LinkedList<Future<T>> futures = new LinkedList<Future<T>>();

		for (Callable<T> task: this.reverse ? new Reversed<Callable<T>>(new ArrayList<Callable<T>>(tasks)) : tasks)
		{
			remaining -= 1;

			Future<T> future;

			if (synchronous)
			{
				// In reverse mode the caller-thread task must not run until it is awaited
				future = this.reverse ? new LazyFuture<T>(task) : new EagerFuture<T>(task);

				// Execute remaining tasks in parallel, if there are multiple
				if (remaining > 1)
				{
					synchronous = false;
				}
			}
			else
			{
				future = this.executor.submit(task);

				// In reverse mode, the last visited task is the one kept for the calling thread
				if (this.reverse && (remaining == 1))
				{
					synchronous = true;
				}
			}

			// In reverse mode, prepending each future undoes the reversed visiting order
			if (this.reverse)
			{
				futures.addFirst(future);
			}
			else
			{
				futures.addLast(future);
			}
		}

		try
		{
			// Wait until all tasks have finished
			for (Future<T> future: this.reverse ? new Reversed<Future<T>>(futures) : futures)
			{
				if (!future.isDone())
				{
					try
					{
						if (!timed)
						{
							future.get();
						}
						else
						{
							long now = System.currentTimeMillis();
							if (now < end)
							{
								future.get(end - now, TimeUnit.MILLISECONDS);
							}
						}
					}
					catch (ExecutionException e)
					{
						// Ignore - the failure is reported by the future itself
					}
					catch (CancellationException e)
					{
						// Ignore - the cancellation is reported by the future itself
					}
				}
			}
		}
		catch (TimeoutException e)
		{
			// Ignore - unfinished tasks are cancelled below
		}
		finally
		{
			// If interrupted or timed out, cancel any unfinished tasks
			for (Future<T> future: this.reverse ? new Reversed<Future<T>>(futures) : futures)
			{
				if (!future.isDone())
				{
					future.cancel(true);
				}
			}
		}

		return futures;
	}

	/**
	 * Executes all tasks via {@link #invokeAll(Collection)} and returns the result of a single one of them (see {@code getAny}).
	 * {@inheritDoc}
	 * @see java.util.concurrent.AbstractExecutorService#invokeAny(java.util.Collection)
	 */
	@Override
	public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException
	{
		return this.getAny(this.invokeAll(tasks));
	}

	/**
	 * Executes all tasks via {@link #invokeAll(Collection, long, TimeUnit)} and returns the result of a single one of them (see {@code getAny}).
	 * {@inheritDoc}
	 * @see java.util.concurrent.AbstractExecutorService#invokeAny(java.util.Collection, long, java.util.concurrent.TimeUnit)
	 */
	@Override
	public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException
	{
		return this.getAny(this.invokeAll(tasks, timeout, unit));
	}

	/**
	 * Returns the result of the first future, or of the last future in reverse mode.
	 * @param futures the futures returned by an {@code invokeAll(...)} call
	 * @return the result of the selected future
	 * @throws IllegalArgumentException if there are no futures
	 * @throws ExecutionException if the selected task failed
	 * @throws InterruptedException if interrupted while waiting for the selected future
	 */
	private <T> T getAny(List<Future<T>> futures) throws InterruptedException, ExecutionException
	{
		if (futures.isEmpty()) throw new IllegalArgumentException();

		return futures.get(this.reverse ? (futures.size() - 1) : 0).get();
	}

	/**
	 * Future that doesn't execute its task until get(...).
	 * The first caller of get(...) runs the task in its own thread; any other caller waits for that run to complete.
	 */
	private static class LazyFuture<T> implements Future<T>
	{
		private enum State
		{
			// DONE is set when a caller claims the task, i.e. just before the task runs;
			// the latch, not this state, signals that the task has actually completed.
			NEW, CANCELLED, DONE;
		}
		private final Callable<T> task;
		private volatile T result;
		private volatile ExecutionException exception;
		private final AtomicReference<State> state = new AtomicReference<State>(State.NEW);
		private final CountDownLatch latch = new CountDownLatch(1);

		LazyFuture(Callable<T> task)
		{
			this.task = task;
		}

		/**
		 * Cancels the task if no caller has started it yet; a running task is never interrupted.
		 * {@inheritDoc}
		 * @see java.util.concurrent.Future#cancel(boolean)
		 */
		@Override
		public boolean cancel(boolean interrupt)
		{
			return this.state.compareAndSet(State.NEW, State.CANCELLED);
		}

		/**
		 * {@inheritDoc}
		 * @see java.util.concurrent.Future#get()
		 */
		@Override
		public T get() throws ExecutionException, InterruptedException
		{
			this.runIfNew();

			this.latch.await();

			return this.report();
		}

		/**
		 * The timeout only bounds the wait for a run started by another caller;
		 * a run started by this call is not limited by it.
		 * {@inheritDoc}
		 * @see java.util.concurrent.Future#get(long, java.util.concurrent.TimeUnit)
		 */
		@Override
		public T get(long time, TimeUnit unit) throws ExecutionException, InterruptedException, TimeoutException
		{
			this.runIfNew();

			// Previously the outcome of the wait was ignored, yielding a null result on timeout
			if (!this.latch.await(time, unit))
			{
				throw new TimeoutException();
			}

			return this.report();
		}

		/**
		 * Runs the task in the calling thread if no other caller has claimed it.
		 * @throws CancellationException if this future was cancelled
		 */
		private void runIfNew()
		{
			if (this.state.compareAndSet(State.NEW, State.DONE))
			{
				try
				{
					this.result = this.task.call();
				}
				catch (Throwable e)
				{
					this.exception = new ExecutionException(e);
				}
				finally
				{
					// Always release waiters, whatever the outcome
					this.latch.countDown();
				}
			}

			if (this.state.get() == State.CANCELLED)
			{
				throw new CancellationException();
			}
		}

		/**
		 * Returns the outcome of the completed task.
		 * @return the task result
		 * @throws ExecutionException if the task threw an exception
		 */
		private T report() throws ExecutionException
		{
			if (this.exception != null)
			{
				throw this.exception;
			}

			return this.result;
		}

		/**
		 * {@inheritDoc}
		 * @see java.util.concurrent.Future#isCancelled()
		 */
		@Override
		public boolean isCancelled()
		{
			return this.state.get() == State.CANCELLED;
		}

		/**
		 * Returns true once the task has completed (normally or exceptionally) or this future was cancelled.
		 * {@inheritDoc}
		 * @see java.util.concurrent.Future#isDone()
		 */
		@Override
		public boolean isDone()
		{
			return this.isCancelled() || (this.latch.getCount() == 0);
		}
	}

	/**
	 * Light-weight future implementation that executes its task on construction.
	 * Consequently it is always done and can never be cancelled.
	 * @param <T> the result type
	 */
	private static class EagerFuture<T> implements Future<T>
	{
		private final T result;
		private final ExecutionException exception;

		EagerFuture(Callable<T> task)
		{
			T value = null;
			ExecutionException failure = null;

			try
			{
				value = task.call();
			}
			catch (Throwable e)
			{
				failure = new ExecutionException(e);
			}

			this.result = value;
			this.exception = failure;
		}

		@Override
		public boolean cancel(boolean mayInterruptIfRunning)
		{
			return false;
		}

		@Override
		public T get() throws ExecutionException
		{
			if (this.exception != null) throw this.exception;

			return this.result;
		}

		@Override
		public T get(long time, TimeUnit unit) throws ExecutionException
		{
			return this.get();
		}

		@Override
		public boolean isCancelled()
		{
			return false;
		}

		@Override
		public boolean isDone()
		{
			return true;
		}
	}
}