001/*
002 * HA-JDBC: High-Availability JDBC
003 * Copyright (C) 2012  Paul Ferraro
004 *
005 * This program is free software: you can redistribute it and/or modify
006 * it under the terms of the GNU Lesser General Public License as published by
007 * the Free Software Foundation, either version 3 of the License, or
008 * (at your option) any later version.
009 *
010 * This program is distributed in the hope that it will be useful,
011 * but WITHOUT ANY WARRANTY; without even the implied warranty of
012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
013 * GNU Lesser General Public License for more details.
014 *
015 * You should have received a copy of the GNU Lesser General Public License
016 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
017 */
018package net.sf.hajdbc.state.sqlite;
019
020import java.io.File;
021import java.text.MessageFormat;
022import java.util.Collections;
023import java.util.EnumMap;
024import java.util.HashMap;
025import java.util.Map;
026import java.util.Set;
027import java.util.TreeSet;
028import java.util.concurrent.locks.Lock;
029import java.util.concurrent.locks.ReadWriteLock;
030import java.util.concurrent.locks.ReentrantReadWriteLock;
031
032import net.sf.hajdbc.Database;
033import net.sf.hajdbc.DatabaseCluster;
034import net.sf.hajdbc.ExceptionType;
035import net.sf.hajdbc.durability.Durability;
036import net.sf.hajdbc.durability.DurabilityListener;
037import net.sf.hajdbc.durability.InvocationEvent;
038import net.sf.hajdbc.durability.InvocationEventImpl;
039import net.sf.hajdbc.durability.InvokerEvent;
040import net.sf.hajdbc.durability.InvokerEventImpl;
041import net.sf.hajdbc.durability.InvokerResult;
042import net.sf.hajdbc.logging.Level;
043import net.sf.hajdbc.logging.Logger;
044import net.sf.hajdbc.logging.LoggerFactory;
045import net.sf.hajdbc.pool.Pool;
046import net.sf.hajdbc.pool.PoolFactory;
047import net.sf.hajdbc.state.DatabaseEvent;
048import net.sf.hajdbc.state.DurabilityListenerAdapter;
049import net.sf.hajdbc.state.SerializedDurabilityListener;
050import net.sf.hajdbc.state.StateManager;
051import net.sf.hajdbc.tx.TransactionIdentifierFactory;
052import net.sf.hajdbc.util.Objects;
053
054import org.tmatesoft.sqljet.core.SqlJetException;
055import org.tmatesoft.sqljet.core.SqlJetTransactionMode;
056import org.tmatesoft.sqljet.core.schema.ISqlJetSchema;
057import org.tmatesoft.sqljet.core.table.ISqlJetCursor;
058import org.tmatesoft.sqljet.core.table.ISqlJetTable;
059import org.tmatesoft.sqljet.core.table.SqlJetDb;
060
061/**
062 * @author Paul Ferraro
063 */
064public class SQLiteStateManager<Z, D extends Database<Z>> implements StateManager, SerializedDurabilityListener
065{
066        // SQLite has minimal concurrency support - and only supports a single writer per-database
067        // So, mitigate this by using separate databases per table.
068        private enum DB { STATE, INVOCATION }
069        
070        private static final Logger logger = LoggerFactory.getLogger(SQLiteStateManager.class);
071        private static final String STATE_TABLE = "cluster_state";
072        private static final String DATABASE_COLUMN = "database_id";
073
074        private static final String INVOCATION_TABLE = "cluster_invocation";
075        private static final String INVOKER_TABLE = "cluster_invoker";
076        private static final String INVOKER_TABLE_INDEX = "cluster_invoker_index";
077        private static final String TRANSACTION_COLUMN = "tx_id";
078        private static final String PHASE_COLUMN = "phase_id";
079        private static final String EXCEPTION_COLUMN = "exception_id";
080        private static final String RESULT_COLUMN = "result";
081
082        static final String CREATE_INVOCATION_SQL = MessageFormat.format("CREATE TABLE {0} ({1} BLOB NOT NULL, {2} INTEGER NOT NULL, {3} INTEGER NOT NULL, PRIMARY KEY ({1}, {2}))", INVOCATION_TABLE, TRANSACTION_COLUMN, PHASE_COLUMN, EXCEPTION_COLUMN);
083        static final String CREATE_INVOKER_SQL = MessageFormat.format("CREATE TABLE {0} ({1} BLOB NOT NULL, {2} INTEGER NOT NULL, {3} TEXT NOT NULL, {4} BLOB, PRIMARY KEY ({1}, {2}, {3}))", INVOKER_TABLE, TRANSACTION_COLUMN, PHASE_COLUMN, DATABASE_COLUMN, RESULT_COLUMN);
084        static final String CREATE_INVOKER_INDEX = MessageFormat.format("CREATE INDEX {0} ON {1} ({2}, {3})", INVOKER_TABLE_INDEX, INVOKER_TABLE, TRANSACTION_COLUMN, PHASE_COLUMN);
085        static final String CREATE_STATE_SQL = MessageFormat.format("CREATE TABLE {0} ({1} TEXT NOT NULL, PRIMARY KEY ({1}))", STATE_TABLE, DATABASE_COLUMN);
086
087        private final DatabaseCluster<Z, D> cluster;
088        private final DurabilityListener listener;
089        private final File file;
090        private final PoolFactory poolFactory;
091        
092        // Control concurrency ourselves, instead of relying of sqljet lock polling.
093        private final Map<DB, ReadWriteLock> locks = new EnumMap<DB, ReadWriteLock>(DB.class);
094        private final Map<DB, Pool<SqlJetDb, SqlJetException>> pools = new EnumMap<DB, Pool<SqlJetDb, SqlJetException>>(DB.class);
095
096        public SQLiteStateManager(DatabaseCluster<Z, D> cluster, File file, PoolFactory poolFactory)
097        {
098                this.cluster = cluster;
099                this.file = file;
100                this.poolFactory = poolFactory;
101                this.listener = new DurabilityListenerAdapter(this, cluster.getTransactionIdentifierFactory());
102        }
103        
104        @Override
105        public boolean isEnabled()
106        {
107                return true;
108        }
109
110        /**
111         * {@inheritDoc}
112         * @see net.sf.hajdbc.DatabaseClusterListener#activated(net.sf.hajdbc.state.DatabaseEvent)
113         */
114        @Override
115        public void activated(final DatabaseEvent event)
116        {
117                Transaction transaction = new Transaction()
118                {
119                        @Override
120                        public void execute(SqlJetDb db) throws SqlJetException
121                        {
122                                db.getTable(STATE_TABLE).insert(event.getSource());
123                        }
124                };
125                try
126                {
127                        this.execute(transaction, DB.STATE);
128                }
129                catch (SqlJetException e)
130                {
131                        logger.log(Level.ERROR, e);
132                }
133        }
134
135        /**
136         * {@inheritDoc}
137         * @see net.sf.hajdbc.DatabaseClusterListener#deactivated(net.sf.hajdbc.state.DatabaseEvent)
138         */
139        @Override
140        public void deactivated(final DatabaseEvent event)
141        {
142                Transaction transaction = new Transaction()
143                {
144                        @Override
145                        public void execute(SqlJetDb db) throws SqlJetException
146                        {
147                                ISqlJetTable table = db.getTable(STATE_TABLE);
148                                ISqlJetCursor cursor = table.lookup(table.getPrimaryKeyIndexName(), event.getSource());
149                                try
150                                {
151                                        if (!cursor.eof())
152                                        {
153                                                cursor.delete();
154                                        }
155                                }
156                                finally
157                                {
158                                        close(cursor);
159                                }
160                        }
161                };
162                try
163                {
164                        this.execute(transaction, DB.STATE);
165                }
166                catch (SqlJetException e)
167                {
168                        logger.log(Level.ERROR, e);
169                }
170        }
171
172        /**
173         * {@inheritDoc}
174         * @see net.sf.hajdbc.durability.DurabilityListener#beforeInvocation(net.sf.hajdbc.durability.InvocationEvent)
175         */
176        @Override
177        public void beforeInvocation(InvocationEvent event)
178        {
179                this.listener.beforeInvocation(event);
180        }
181
182        /**
183         * {@inheritDoc}
184         * @see net.sf.hajdbc.durability.DurabilityListener#afterInvocation(net.sf.hajdbc.durability.InvocationEvent)
185         */
186        @Override
187        public void afterInvocation(InvocationEvent event)
188        {
189                this.listener.afterInvocation(event);
190        }
191
192        /**
193         * {@inheritDoc}
194         * @see net.sf.hajdbc.durability.DurabilityListener#beforeInvoker(net.sf.hajdbc.durability.InvokerEvent)
195         */
196        @Override
197        public void beforeInvoker(InvokerEvent event)
198        {
199                this.listener.beforeInvoker(event);
200        }
201
202        /**
203         * {@inheritDoc}
204         * @see net.sf.hajdbc.durability.DurabilityListener#afterInvoker(net.sf.hajdbc.durability.InvokerEvent)
205         */
206        @Override
207        public void afterInvoker(InvokerEvent event)
208        {
209                this.listener.afterInvoker(event);
210        }
211
212        /**
213         * {@inheritDoc}
214         * @see net.sf.hajdbc.Lifecycle#start()
215         */
216        @Override
217        public void start() throws Exception
218        {
219                for (DB db: DB.values())
220                {
221                        this.locks.put(db, new ReentrantReadWriteLock());
222                        this.pools.put(db, this.poolFactory.createPool(new SQLiteDbPoolProvider(new File(this.file.toURI().resolve(db.name().toLowerCase())))));
223                }
224                
225                Transaction stateTransaction = new Transaction()
226                {
227                        @Override
228                        public void execute(SqlJetDb database) throws SqlJetException
229                        {
230                                ISqlJetSchema schema = database.getSchema();
231                                if (schema.getTable(STATE_TABLE) == null)
232                                {
233                                        database.createTable(CREATE_STATE_SQL);
234                                }
235                                else if (Boolean.getBoolean(StateManager.CLEAR_LOCAL_STATE))
236                                {
237                                        database.getTable(STATE_TABLE).clear();
238                                }
239                        }
240                };
241                Transaction invocationTransaction = new Transaction()
242                {
243                        @Override
244                        public void execute(SqlJetDb database) throws SqlJetException
245                        {
246                                ISqlJetSchema schema = database.getSchema();
247                                if (schema.getTable(INVOCATION_TABLE) == null)
248                                {
249                                        database.createTable(CREATE_INVOCATION_SQL);
250                                }
251                                if (schema.getTable(INVOKER_TABLE) == null)
252                                {
253                                        database.createTable(CREATE_INVOKER_SQL);
254                                        database.createIndex(CREATE_INVOKER_INDEX);
255                                }
256                        }
257                };
258                
259                this.execute(stateTransaction, DB.STATE);
260                this.execute(invocationTransaction, DB.INVOCATION);
261        }
262
263        /**
264         * {@inheritDoc}
265         * @see net.sf.hajdbc.Lifecycle#stop()
266         */
267        @Override
268        public void stop()
269        {
270                for (Pool<SqlJetDb, SqlJetException> pool: this.pools.values())
271                {
272                        pool.close();
273                }
274                this.pools.clear();
275        }
276
277        /**
278         * {@inheritDoc}
279         * @see net.sf.hajdbc.state.StateManager#getActiveDatabases()
280         */
281        @Override
282        public Set<String> getActiveDatabases()
283        {
284                Query<Set<String>> query = new Query<Set<String>>()
285                {
286                        @Override
287                        public Set<String> execute(SqlJetDb database) throws SqlJetException
288                        {
289                                Set<String> set = new TreeSet<String>();
290                                ISqlJetTable table = database.getTable(STATE_TABLE);
291                                ISqlJetCursor cursor = table.lookup(table.getPrimaryKeyIndexName());
292                                try
293                                {
294                                        if (!cursor.eof())
295                                        {
296                                                do
297                                                {
298                                                        set.add(cursor.getString(DATABASE_COLUMN));
299                                                }
300                                                while (cursor.next());
301                                        }
302                                        return set;
303                                }
304                                finally
305                                {
306                                        close(cursor);
307                                }
308                        }
309                };
310                
311                try
312                {
313                        return this.execute(query, DB.STATE);
314                }
315                catch (SqlJetException e)
316                {
317                        logger.log(Level.ERROR, e);
318                        return Collections.emptySet();
319                }
320        }
321
322        /**
323         * {@inheritDoc}
324         * @see net.sf.hajdbc.state.StateManager#setActiveDatabases(java.util.Set)
325         */
326        @Override
327        public void setActiveDatabases(final Set<String> databases)
328        {
329                Transaction transaction = new Transaction()
330                {
331                        @Override
332                        public void execute(SqlJetDb db) throws SqlJetException
333                        {
334                                ISqlJetTable table = db.getTable(STATE_TABLE);
335                                table.clear();
336                                for (String database: databases)
337                                {
338                                        table.insert(database);
339                                }
340                        }
341                };
342                try
343                {
344                        this.execute(transaction, DB.STATE);
345                }
346                catch (SqlJetException e)
347                {
348                        logger.log(Level.ERROR, e);
349                }
350        }
351
352        /**
353         * {@inheritDoc}
354         * @see net.sf.hajdbc.state.StateManager#recover()
355         */
356        @Override
357        public Map<InvocationEvent, Map<String, InvokerEvent>> recover()
358        {
359                final TransactionIdentifierFactory<?> txIdFactory = this.cluster.getTransactionIdentifierFactory();
360                
361                Query<Map<InvocationEvent, Map<String, InvokerEvent>>> invocationQuery = new Query<Map<InvocationEvent, Map<String, InvokerEvent>>>()
362                {
363                        @Override
364                        public Map<InvocationEvent, Map<String, InvokerEvent>> execute(SqlJetDb database) throws SqlJetException
365                        {
366                                Map<InvocationEvent, Map<String, InvokerEvent>> map = new HashMap<InvocationEvent, Map<String, InvokerEvent>>();
367                                ISqlJetCursor cursor = database.getTable(INVOCATION_TABLE).open();
368                                try
369                                {
370                                        if (!cursor.eof())
371                                        {
372                                                do
373                                                {
374                                                        Object txId = txIdFactory.deserialize(cursor.getBlobAsArray(TRANSACTION_COLUMN));
375                                                        Durability.Phase phase = Durability.Phase.values()[(int) cursor.getInteger(PHASE_COLUMN)];
376                                                        ExceptionType type = ExceptionType.values()[(int) cursor.getInteger(EXCEPTION_COLUMN)];
377                                                        map.put(new InvocationEventImpl(txId, phase, type), new HashMap<String, InvokerEvent>());
378                                                }
379                                                while (cursor.next());
380                                        }
381                                }
382                                finally
383                                {
384                                        cursor.close();
385                                }
386                                cursor = database.getTable(INVOKER_TABLE).open();
387                                try
388                                {
389                                        if (!cursor.eof())
390                                        {
391                                                do
392                                                {
393                                                        Object txId = txIdFactory.deserialize(cursor.getBlobAsArray(TRANSACTION_COLUMN));
394                                                        Durability.Phase phase = Durability.Phase.values()[(int) cursor.getInteger(PHASE_COLUMN)];
395                                                        
396                                                        Map<String, InvokerEvent> invokers = map.get(new InvocationEventImpl(txId, phase, null));
397                                                        if (invokers != null)
398                                                        {
399                                                                String databaseId = cursor.getString(DATABASE_COLUMN);
400                                                                InvokerEvent event = new InvokerEventImpl(txId, phase, databaseId);
401                                                                
402                                                                if (!cursor.isNull(RESULT_COLUMN))
403                                                                {
404                                                                        byte[] result = cursor.getBlobAsArray(RESULT_COLUMN);
405                                                                        event.setResult(Objects.<InvokerResult>deserialize(result));
406                                                                }
407
408                                                                invokers.put(databaseId, event);
409                                                        }
410                                                }
411                                                while (cursor.next());
412                                        }
413                                }
414                                finally
415                                {
416                                        cursor.close();
417                                }
418                                return map;
419                        }
420                };
421                try
422                {
423                        return this.execute(invocationQuery, DB.INVOCATION);
424                }
425                catch (SqlJetException e)
426                {
427                        throw new IllegalStateException(e);
428                }
429        }
430
431        /**
432         * {@inheritDoc}
433         * @see net.sf.hajdbc.state.SerializedDurabilityListener#beforeInvocation(byte[], byte, byte)
434         */
435        @Override
436        public void beforeInvocation(final byte[] transactionId, final byte phase, final byte exceptionType)
437        {
438                Transaction transaction = new Transaction()
439                {
440                        @Override
441                        public void execute(SqlJetDb db) throws SqlJetException
442                        {
443                                db.getTable(INVOCATION_TABLE).insert(transactionId, phase, exceptionType);
444                        }
445                };
446                try
447                {
448                        this.execute(transaction, DB.INVOCATION);
449                }
450                catch (SqlJetException e)
451                {
452                        logger.log(Level.ERROR, e);
453                }
454        }
455
456        /**
457         * {@inheritDoc}
458         * @see net.sf.hajdbc.state.SerializedDurabilityListener#afterInvocation(byte[], byte)
459         */
460        @Override
461        public void afterInvocation(final byte[] transactionId, final byte phase)
462        {
463                Transaction transaction = new Transaction()
464                {
465                        @Override
466                        public void execute(SqlJetDb db) throws SqlJetException
467                        {
468                                ISqlJetTable table = db.getTable(INVOCATION_TABLE);
469                                ISqlJetCursor cursor = table.lookup(table.getPrimaryKeyIndexName(), transactionId, phase);
470                                try
471                                {
472                                        if (!cursor.eof())
473                                        {
474                                                cursor.delete();
475                                        }
476                                }
477                                finally
478                                {
479                                        close(cursor);
480                                }
481                                table = db.getTable(INVOKER_TABLE);
482                                cursor = table.lookup(INVOKER_TABLE_INDEX, transactionId, phase);
483                                try
484                                {
485                                        if (!cursor.eof())
486                                        {
487                                                do
488                                                {
489                                                        cursor.delete();
490                                                }
491                                                while (cursor.next());
492                                        }
493                                }
494                                finally
495                                {
496                                        close(cursor);
497                                }
498                        }
499                };
500                try
501                {
502                        this.execute(transaction, DB.INVOCATION);
503                }
504                catch (SqlJetException e)
505                {
506                        logger.log(Level.ERROR, e);
507                }
508        }
509
510        /**
511         * {@inheritDoc}
512         * @see net.sf.hajdbc.state.SerializedDurabilityListener#beforeInvoker(byte[], byte, java.lang.String)
513         */
514        @Override
515        public void beforeInvoker(final byte[] transactionId, final byte phase, final String databaseId)
516        {
517                Transaction transaction = new Transaction()
518                {
519                        @Override
520                        public void execute(SqlJetDb db) throws SqlJetException
521                        {
522                                db.getTable(INVOKER_TABLE).insert(transactionId, phase, databaseId);
523                        }
524                };
525                try
526                {
527                        this.execute(transaction, DB.INVOCATION);
528                }
529                catch (SqlJetException e)
530                {
531                        logger.log(Level.ERROR, e);
532                }
533        }
534
535        /**
536         * {@inheritDoc}
537         * @see net.sf.hajdbc.state.SerializedDurabilityListener#afterInvoker(byte[], byte, java.lang.String, byte[])
538         */
539        @Override
540        public void afterInvoker(final byte[] transactionId, final byte phase, final String databaseId, final byte[] result)
541        {
542                Transaction transaction = new Transaction()
543                {
544                        @Override
545                        public void execute(SqlJetDb db) throws SqlJetException
546                        {
547                                ISqlJetTable table = db.getTable(INVOKER_TABLE);
548                                ISqlJetCursor cursor = table.lookup(table.getPrimaryKeyIndexName(), transactionId, phase, databaseId);
549                                try
550                                {
551                                        if (!cursor.eof())
552                                        {
553                                                cursor.updateByFieldNames(Collections.<String, Object>singletonMap(RESULT_COLUMN, result));
554                                        }
555                                }
556                                finally
557                                {
558                                        close(cursor);
559                                }
560                        }
561                };
562                try
563                {
564                        this.execute(transaction, DB.INVOCATION);
565                }
566                catch (SqlJetException e)
567                {
568                        logger.log(Level.ERROR, e);
569                }
570        }
571
572        static void close(ISqlJetCursor cursor)
573        {
574                try
575                {
576                        cursor.close();
577                }
578                catch (SqlJetException e)
579                {
580                        logger.log(Level.WARN, e);
581                }
582        }
583
584        private void execute(Transaction transaction, DB db) throws SqlJetException
585        {
586                Pool<SqlJetDb, SqlJetException> pool = this.pools.get(db);
587                Lock lock = this.locks.get(db).writeLock();
588                
589                SqlJetDb database = pool.take();
590                
591                lock.lock();
592                
593                try
594                {
595                        database.beginTransaction(SqlJetTransactionMode.WRITE);
596                        
597                        try
598                        {
599                                transaction.execute(database);
600                                
601                                database.commit();
602                        }
603                        catch (SqlJetException e)
604                        {
605                                try
606                                {
607                                        database.rollback();
608                                }
609                                catch (SqlJetException ex)
610                                {
611                                        logger.log(Level.WARN, ex);
612                                }
613                                throw e;
614                        }
615                }
616                finally
617                {
618                        lock.unlock();
619                        pool.release(database);
620                }
621        }
622
623        private <T> T execute(Query<T> query, DB db) throws SqlJetException
624        {
625                Pool<SqlJetDb, SqlJetException> pool = this.pools.get(db);
626                Lock lock = this.locks.get(db).readLock();
627                
628                SqlJetDb database = pool.take();
629                
630                lock.lock();
631                
632                try
633                {
634                        database.beginTransaction(SqlJetTransactionMode.READ_ONLY);
635                        
636                        try
637                        {
638                                return query.execute(database);
639                        }
640                        finally
641                        {
642                                database.commit();
643                        }
644                }
645                finally
646                {
647                        lock.unlock();
648                        
649                        pool.release(database);
650                }
651        }
652
653        interface Query<T>
654        {
655                T execute(SqlJetDb database) throws SqlJetException;
656        }
657
658        interface Transaction
659        {
660                void execute(SqlJetDb database) throws SqlJetException;
661        }
662}