001/*
002 * HA-JDBC: High-Availability JDBC
003 * Copyright (C) 2012  Paul Ferraro
004 *
005 * This program is free software: you can redistribute it and/or modify
006 * it under the terms of the GNU Lesser General Public License as published by
007 * the Free Software Foundation, either version 3 of the License, or
008 * (at your option) any later version.
009 *
010 * This program is distributed in the hope that it will be useful,
011 * but WITHOUT ANY WARRANTY; without even the implied warranty of
012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
013 * GNU Lesser General Public License for more details.
014 *
015 * You should have received a copy of the GNU Lesser General Public License
016 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
017 */
018package net.sf.hajdbc.util.concurrent.cron;
019
020import java.util.Date;
021import java.util.concurrent.CancellationException;
022import java.util.concurrent.RejectedExecutionException;
023import java.util.concurrent.RejectedExecutionHandler;
024import java.util.concurrent.ScheduledThreadPoolExecutor;
025import java.util.concurrent.ThreadFactory;
026import java.util.concurrent.TimeUnit;
027
028/**
029 * Scheduled thread-pool executor implementation that leverages a Quartz CronExpression to calculate future execution times for scheduled tasks.
030 *
031 * @author  Paul Ferraro
032 * @since   1.1
033 */
034public class CronThreadPoolExecutor extends ScheduledThreadPoolExecutor implements CronExecutorService
035{
036        /**
037         * Constructs a new CronThreadPoolExecutor.
038         * @param corePoolSize
039         */
040        public CronThreadPoolExecutor(int corePoolSize)
041        {
042                super(corePoolSize);
043        }
044        
045        /**
046         * Constructs a new CronThreadPoolExecutor.
047         * @param corePoolSize
048         */
049        public CronThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory)
050        {
051                super(corePoolSize, threadFactory);
052        }
053
054        /**
055         * Constructs a new CronThreadPoolExecutor.
056         * @param corePoolSize
057         */
058        public CronThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler)
059        {
060                super(corePoolSize, handler);
061        }
062
063        /**
064         * Constructs a new CronThreadPoolExecutor.
065         * @param corePoolSize
066         * @param handler
067         */
068        public CronThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler)
069        {
070                super(corePoolSize, threadFactory, handler);
071        }
072
073        /**
074         * {@inheritDoc}
075         * @see net.sf.hajdbc.util.concurrent.cron.CronExecutorService#schedule(java.lang.Runnable, net.sf.hajdbc.util.concurrent.cron.CronExpression)
076         */
077        @Override
078        public void schedule(final Runnable task, final CronExpression expression)
079        {
080                if (task == null) throw new NullPointerException();
081                
082                this.setCorePoolSize(this.getCorePoolSize() + 1);
083                
084                Runnable scheduleTask = new Runnable()
085                {
086                        /**
087                         * @see java.lang.Runnable#run()
088                         */
089                        @Override
090                        public void run()
091                        {
092                                Date now = new Date();
093                                Date time = expression.getNextValidTimeAfter(now);
094                        
095                                try
096                                {
097                                        while (time != null)
098                                        {
099                                                CronThreadPoolExecutor.this.schedule(task, time.getTime() - now.getTime(), TimeUnit.MILLISECONDS);
100                                                
101                                                while (now.before(time))
102                                                {
103                                                        Thread.sleep(time.getTime() - now.getTime());
104                                                        
105                                                        now = new Date();
106                                                }
107                                                
108                                                time = expression.getNextValidTimeAfter(now);
109                                        }
110                                }
111                                catch (RejectedExecutionException e)
112                                {
113                                        // Occurs if executor was already shutdown when schedule() is called
114                                }
115                                catch (CancellationException e)
116                                {
117                                        // Occurs when scheduled, but not yet executed tasks are canceled during shutdown
118                                }
119                                catch (InterruptedException e)
120                                {
121                                        // Occurs when executing tasks are interrupted during shutdownNow()
122                                        Thread.currentThread().interrupt();
123                                }
124                        }
125                };
126                
127                this.execute(scheduleTask);
128        }
129}