001/* 002 * HA-JDBC: High-Availability JDBC 003 * Copyright (C) 2012 Paul Ferraro 004 * 005 * This program is free software: you can redistribute it and/or modify 006 * it under the terms of the GNU Lesser General Public License as published by 007 * the Free Software Foundation, either version 3 of the License, or 008 * (at your option) any later version. 009 * 010 * This program is distributed in the hope that it will be useful, 011 * but WITHOUT ANY WARRANTY; without even the implied warranty of 012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 013 * GNU Lesser General Public License for more details. 014 * 015 * You should have received a copy of the GNU Lesser General Public License 016 * along with this program. If not, see <http://www.gnu.org/licenses/>. 017 */ 018package net.sf.hajdbc.util.concurrent.cron; 019 020import java.util.Date; 021import java.util.concurrent.CancellationException; 022import java.util.concurrent.RejectedExecutionException; 023import java.util.concurrent.RejectedExecutionHandler; 024import java.util.concurrent.ScheduledThreadPoolExecutor; 025import java.util.concurrent.ThreadFactory; 026import java.util.concurrent.TimeUnit; 027 028/** 029 * Scheduled thread-pool executor implementation that leverages a Quartz CronExpression to calculate future execution times for scheduled tasks. 030 * 031 * @author Paul Ferraro 032 * @since 1.1 033 */ 034public class CronThreadPoolExecutor extends ScheduledThreadPoolExecutor implements CronExecutorService 035{ 036 /** 037 * Constructs a new CronThreadPoolExecutor. 038 * @param corePoolSize 039 */ 040 public CronThreadPoolExecutor(int corePoolSize) 041 { 042 super(corePoolSize); 043 } 044 045 /** 046 * Constructs a new CronThreadPoolExecutor. 047 * @param corePoolSize 048 */ 049 public CronThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) 050 { 051 super(corePoolSize, threadFactory); 052 } 053 054 /** 055 * Constructs a new CronThreadPoolExecutor. 056 * @param corePoolSize 057 */ 058 public CronThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) 059 { 060 super(corePoolSize, handler); 061 } 062 063 /** 064 * Constructs a new CronThreadPoolExecutor. 065 * @param corePoolSize 066 * @param handler 067 */ 068 public CronThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) 069 { 070 super(corePoolSize, threadFactory, handler); 071 } 072 073 /** 074 * {@inheritDoc} 075 * @see net.sf.hajdbc.util.concurrent.cron.CronExecutorService#schedule(java.lang.Runnable, net.sf.hajdbc.util.concurrent.cron.CronExpression) 076 */ 077 @Override 078 public void schedule(final Runnable task, final CronExpression expression) 079 { 080 if (task == null) throw new NullPointerException(); 081 082 this.setCorePoolSize(this.getCorePoolSize() + 1); 083 084 Runnable scheduleTask = new Runnable() 085 { 086 /** 087 * @see java.lang.Runnable#run() 088 */ 089 @Override 090 public void run() 091 { 092 Date now = new Date(); 093 Date time = expression.getNextValidTimeAfter(now); 094 095 try 096 { 097 while (time != null) 098 { 099 CronThreadPoolExecutor.this.schedule(task, time.getTime() - now.getTime(), TimeUnit.MILLISECONDS); 100 101 while (now.before(time)) 102 { 103 Thread.sleep(time.getTime() - now.getTime()); 104 105 now = new Date(); 106 } 107 108 time = expression.getNextValidTimeAfter(now); 109 } 110 } 111 catch (RejectedExecutionException e) 112 { 113 // Occurs if executor was already shutdown when schedule() is called 114 } 115 catch (CancellationException e) 116 { 117 // Occurs when scheduled, but not yet executed tasks are canceled during shutdown 118 } 119 catch (InterruptedException e) 120 { 121 // Occurs when executing tasks are interrupted during shutdownNow() 122 Thread.currentThread().interrupt(); 123 } 124 } 125 }; 126 127 this.execute(scheduleTask); 128 } 129}