001/*
002 * HA-JDBC: High-Availability JDBC
003 * Copyright (C) 2012  Paul Ferraro
004 *
005 * This program is free software: you can redistribute it and/or modify
006 * it under the terms of the GNU Lesser General Public License as published by
007 * the Free Software Foundation, either version 3 of the License, or
008 * (at your option) any later version.
009 *
010 * This program is distributed in the hope that it will be useful,
011 * but WITHOUT ANY WARRANTY; without even the implied warranty of
012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
013 * GNU Lesser General Public License for more details.
014 *
015 * You should have received a copy of the GNU Lesser General Public License
016 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
017 */
018package net.sf.hajdbc.state.bdb;
019
020import java.io.File;
021import java.io.Serializable;
022import java.util.Arrays;
023import java.util.HashMap;
024import java.util.Iterator;
025import java.util.Map;
026import java.util.Set;
027import java.util.TreeSet;
028
029import net.sf.hajdbc.DatabaseCluster;
030import net.sf.hajdbc.ExceptionType;
031import net.sf.hajdbc.durability.Durability;
032import net.sf.hajdbc.durability.DurabilityListener;
033import net.sf.hajdbc.durability.InvocationEvent;
034import net.sf.hajdbc.durability.InvocationEventImpl;
035import net.sf.hajdbc.durability.InvokerEvent;
036import net.sf.hajdbc.durability.InvokerEventImpl;
037import net.sf.hajdbc.durability.InvokerResult;
038import net.sf.hajdbc.pool.CloseablePoolProvider;
039import net.sf.hajdbc.pool.Pool;
040import net.sf.hajdbc.pool.PoolFactory;
041import net.sf.hajdbc.state.DatabaseEvent;
042import net.sf.hajdbc.state.DurabilityListenerAdapter;
043import net.sf.hajdbc.state.SerializedDurabilityListener;
044import net.sf.hajdbc.state.StateManager;
045import net.sf.hajdbc.tx.TransactionIdentifierFactory;
046import net.sf.hajdbc.util.Objects;
047
048import com.sleepycat.bind.ByteArrayBinding;
049import com.sleepycat.bind.EntryBinding;
050import com.sleepycat.bind.tuple.TupleBinding;
051import com.sleepycat.collections.StoredKeySet;
052import com.sleepycat.collections.StoredMap;
053import com.sleepycat.je.Database;
054import com.sleepycat.je.DatabaseConfig;
055import com.sleepycat.je.DatabaseEntry;
056import com.sleepycat.je.DatabaseException;
057import com.sleepycat.je.Environment;
058import com.sleepycat.je.EnvironmentConfig;
059import com.sleepycat.je.Transaction;
060
061/**
062 * @author paul
063 */
064public class BerkeleyDBStateManager extends CloseablePoolProvider<Environment, DatabaseException> implements StateManager, SerializedDurabilityListener
065{
066        private static final String STATE = "state";
067        private static final String INVOCATION = "invocation";
068        private static final String INVOKER = "invoker";
069        private static final EntryBinding<InvocationKey> INVOCATION_KEY_BINDING = new KeyBinding<InvocationKey>();
070        private static final EntryBinding<InvokerKey> INVOKER_KEY_BINDING = new KeyBinding<InvokerKey>();
071        private static final EntryBinding<byte[]> BLOB_BINDING = new ByteArrayBinding();
072        static final byte[] NULL = new byte[0];
073        
074        private final DatabaseCluster<?, ?> cluster;
075        private final File file;
076        private final PoolFactory poolFactory;
077        private final EnvironmentConfig config;
078        private final DurabilityListener durabilityListener;
079        
080        private volatile Pool<Environment, DatabaseException> pool;
081
082        public BerkeleyDBStateManager(DatabaseCluster<?, ?> cluster, File file, EnvironmentConfig config, PoolFactory poolFactory)
083        {
084                super(Environment.class, DatabaseException.class);
085                this.cluster = cluster;
086                this.file = file;
087                this.poolFactory = poolFactory;
088                this.config = config;
089                this.durabilityListener = new DurabilityListenerAdapter(this, cluster.getTransactionIdentifierFactory());
090        }
091
092        @Override
093        public void start()
094        {
095                this.file.mkdirs();
096                this.pool = this.poolFactory.createPool(this);
097                Environment env = this.pool.take();
098                try
099                {
100                        env.openDatabase(null, STATE, new DatabaseConfig().setAllowCreate(true)).close();
101                        env.openDatabase(null, INVOCATION, new DatabaseConfig().setAllowCreate(true)).close();
102                        env.openDatabase(null, INVOKER, new DatabaseConfig().setAllowCreate(true)).close();
103                }
104                finally
105                {
106                        this.pool.release(env);
107                }
108        }
109
110        @Override
111        public void stop()
112        {
113                this.pool.close();
114        }
115
116        @Override
117        public Set<String> getActiveDatabases()
118        {
119                DatabaseQuery<Set<String>> query = new DatabaseQuery<Set<String>>(STATE)
120                {
121                        @Override
122                        Set<String> execute(Database database)
123                        {
124                                return new TreeSet<String>(createStateSet(database, true));
125                        }
126                };
127                return this.execute(query);
128        }
129
130        @Override
131        public void setActiveDatabases(final Set<String> databases)
132        {
133                DatabaseOperation operation = new DatabaseOperation(STATE)
134                {
135                        @Override
136                        void execute(Database database)
137                        {
138                                createStateSet(database, false).retainAll(databases);
139                        }
140                };
141                this.execute(operation);
142        }
143
144        @Override
145        public void activated(final DatabaseEvent event)
146        {
147                DatabaseOperation operation = new DatabaseOperation(STATE)
148                {
149                        @Override
150                        void execute(Database database)
151                        {
152                                createStateSet(database, false).add(event.getSource());
153                        }
154                };
155                this.execute(operation);
156        }
157
158        @Override
159        public void deactivated(final DatabaseEvent event)
160        {
161                DatabaseOperation operation = new DatabaseOperation(STATE)
162                {
163                        @Override
164                        void execute(Database database)
165                        {
166                                createStateSet(database, false).remove(event.getSource());
167                        }
168                };
169                this.execute(operation);
170        }
171
172        Set<String> createStateSet(Database database, boolean readOnly)
173        {
174                return new StoredKeySet<String>(database, TupleBinding.getPrimitiveBinding(String.class), !readOnly);
175        }
176        
177        @Override
178        public void beforeInvocation(InvocationEvent event)
179        {
180                this.durabilityListener.beforeInvocation(event);
181        }
182
183        @Override
184        public void afterInvocation(InvocationEvent event)
185        {
186                this.durabilityListener.afterInvocation(event);
187        }
188
189        @Override
190        public void beforeInvoker(InvokerEvent event)
191        {
192                this.durabilityListener.beforeInvoker(event);
193        }
194
195        @Override
196        public void afterInvoker(InvokerEvent event)
197        {
198                this.durabilityListener.afterInvoker(event);
199        }
200
201        @Override
202        public void beforeInvocation(final byte[] transactionId, final byte phase, final byte exceptionType)
203        {
204                DatabaseOperation operation = new DatabaseOperation(INVOCATION)
205                {
206                        @Override
207                        void execute(Database database)
208                        {
209                                createInvocationMap(database, false).put(new InvocationKey(transactionId, phase), exceptionType);
210                        }
211                };
212                this.execute(operation);
213        }
214
215        @Override
216        public void afterInvocation(final byte[] transactionId, final byte phase)
217        {
218                DatabaseOperation invokerperation = new DatabaseOperation(INVOKER)
219                {
220                        @Override
221                        void execute(Database database)
222                        {
223                                Iterator<InvokerKey> keys = createInvokerMap(database, false).keySet().iterator();
224                                while (keys.hasNext())
225                                {
226                                        InvokerKey key = keys.next();
227                                        if ((key.getPhase() == phase) && Arrays.equals(key.getTransactionId(), transactionId))
228                                        {
229                                                keys.remove();
230                                        }
231                                }
232                        }
233                };
234                DatabaseOperation invocationOperation = new DatabaseOperation(INVOCATION)
235                {
236                        @Override
237                        void execute(Database database)
238                        {
239                                createInvocationMap(database, false).remove(new InvocationKey(transactionId, phase));
240                        }
241                };
242                this.execute(invokerperation, invocationOperation);
243        }
244
245        @Override
246        public void beforeInvoker(final byte[] transactionId, final byte phase, final String databaseId)
247        {
248                DatabaseOperation operation = new DatabaseOperation(INVOKER)
249                {
250                        @Override
251                        void execute(Database database)
252                        {
253                                createInvokerMap(database, false).put(new InvokerKey(transactionId, phase, databaseId), NULL);
254                        }
255                };
256                this.execute(operation);
257        }
258
259        @Override
260        public void afterInvoker(final byte[] transactionId, final byte phase, final String databaseId, final byte[] result)
261        {
262                DatabaseOperation operation = new DatabaseOperation(INVOKER)
263                {
264                        @Override
265                        void execute(Database database)
266                        {
267                                createInvokerMap(database, false).put(new InvokerKey(transactionId, phase, databaseId), result);
268                        }
269                };
270                this.execute(operation);
271        }
272
273        @Override
274        public Map<InvocationEvent, Map<String, InvokerEvent>> recover()
275        {
276                final Map<InvocationEvent, Map<String, InvokerEvent>> result = new HashMap<InvocationEvent, Map<String, InvokerEvent>>();
277                final TransactionIdentifierFactory<?> txIdFactory = this.cluster.getTransactionIdentifierFactory();
278                DatabaseQuery<Void> query = new DatabaseQuery<Void>(INVOCATION)
279                {
280                        @Override
281                        Void execute(Database database)
282                        {
283                                for (Map.Entry<InvocationKey, Byte> entry: createInvocationMap(database, true).entrySet())
284                                {
285                                        InvocationKey key = entry.getKey();
286                                        result.put(new InvocationEventImpl(txIdFactory.deserialize(key.getTransactionId()), Durability.Phase.values()[key.getPhase()], ExceptionType.values()[entry.getValue()]), new HashMap<String, InvokerEvent>());
287                                }
288                                return null;
289                        }
290                };
291                this.execute(query);
292                query = new DatabaseQuery<Void>(INVOKER)
293                {
294                        @Override
295                        Void execute(Database database)
296                        {
297                                for (Map.Entry<InvokerKey, byte[]> entry: createInvokerMap(database, true).entrySet())
298                                {
299                                        InvokerKey key = entry.getKey();
300                                        Map<String, InvokerEvent> invokers = result.get(new InvocationEventImpl(txIdFactory.deserialize(key.getTransactionId()), Durability.Phase.values()[key.getPhase()], null));
301                                        if (invokers != null)
302                                        {
303                                                InvokerEvent invoker = new InvokerEventImpl(txIdFactory.deserialize(key.getTransactionId()), Durability.Phase.values()[key.getPhase()], key.getDatabaseId());
304                                                byte[] value = entry.getValue();
305                                                if (value.length > 0)
306                                                {
307                                                        invoker.setResult(Objects.<InvokerResult>deserialize(value));
308                                                }
309                                                invokers.put(key.getDatabaseId(), invoker);
310                                        }
311                                }
312                                return null;
313                        }
314                };
315                this.execute(query);
316                return result;
317        }
318        
319        private static class InvocationKey implements Serializable
320        {
321                private static final long serialVersionUID = -9033714764207519351L;
322                private final byte[] transactionId;
323                private final byte phase;
324                
325                InvocationKey(byte[] transactionId, byte phase)
326                {
327                        this.transactionId = transactionId;
328                        this.phase = phase;
329                }
330                
331                byte[] getTransactionId()
332                {
333                        return this.transactionId;
334                }
335                
336                byte getPhase()
337                {
338                        return this.phase;
339                }
340        }
341
342        private static class InvokerKey extends InvocationKey
343        {
344                private static final long serialVersionUID = 400751577923581135L;
345                private final String databaseId;
346                
347                InvokerKey(byte[] transactionId, byte phase, String databaseId)
348                {
349                        super(transactionId, phase);
350                        this.databaseId = databaseId;
351                }
352                
353                String getDatabaseId()
354                {
355                        return this.databaseId;
356                }
357        }
358        
359        Map<InvocationKey, Byte> createInvocationMap(Database database, boolean readOnly)
360        {
361                return new StoredMap<InvocationKey, Byte>(database, INVOCATION_KEY_BINDING, TupleBinding.getPrimitiveBinding(Byte.class), !readOnly);
362        }
363        
364        Map<InvokerKey, byte[]> createInvokerMap(Database database, boolean readOnly)
365        {
366                return new StoredMap<InvokerKey, byte[]>(database, INVOKER_KEY_BINDING, BLOB_BINDING, !readOnly);
367        }
368
369        @Override
370        public boolean isEnabled()
371        {
372                return true;
373        }
374
375        /**
376         * {@inheritDoc}
377         * @see net.sf.hajdbc.pool.PoolProvider#create()
378         */
379        @Override
380        public Environment create() throws DatabaseException
381        {
382                return new Environment(this.file, this.config);
383        }
384
385        /**
386         * {@inheritDoc}
387         * @see net.sf.hajdbc.pool.PoolProvider#isValid(java.lang.Object)
388         */
389        @Override
390        public boolean isValid(Environment environment)
391        {
392                try
393                {
394                        environment.checkHandleIsValid();
395                        
396                        return true;
397                }
398                catch (DatabaseException e)
399                {
400                        return false;
401                }
402        }
403
404        private static abstract class DatabaseOperation
405        {
406                private final String databaseName;
407
408                DatabaseOperation(String databaseName)
409                {
410                        this.databaseName = databaseName;
411                }
412
413                String getDatabaseName()
414                {
415                        return this.databaseName;
416                }
417
418                abstract void execute(Database database);
419        }
420        
421        private void execute(DatabaseOperation... dbOperations)
422        {
423                Operation[] operations = new Operation[dbOperations.length];
424                for (int i = 0; i < dbOperations.length; ++i)
425                {
426                        final DatabaseOperation operation = dbOperations[i];
427                        operations[i] = new Operation()
428                        {
429                                @Override
430                                public void execute(Environment env, Transaction transaction)
431                                {
432                                        Database database = env.openDatabase(transaction, operation.getDatabaseName(), new DatabaseConfig().setTransactional(true));
433                                        try
434                                        {
435                                                operation.execute(database);
436                                        }
437                                        finally
438                                        {
439                                                database.close();
440                                        }
441                                }
442                        };
443                }
444                this.execute(operations);
445        }
446
447        private abstract static class DatabaseQuery<T>
448        {
449                private final String databaseName;
450                
451                DatabaseQuery(String databaseName)
452                {
453                        this.databaseName = databaseName;
454                }
455                
456                String getDatabaseName()
457                {
458                        return this.databaseName;
459                }
460                
461                abstract T execute(Database database);
462        }
463        
464        private <T> T execute(final DatabaseQuery<T> dbQuery)
465        {
466                Query<T> query = new Query<T>()
467                {
468                        @Override
469                        public T execute(Environment env)
470                        {
471                                Database database = env.openDatabase(null, dbQuery.getDatabaseName(), new DatabaseConfig().setReadOnly(true));
472                                try
473                                {
474                                        return dbQuery.execute(database);
475                                }
476                                finally
477                                {
478                                        database.close();
479                                }
480                        }
481                };
482                return this.execute(query);
483        }
484        
485        private static interface Operation
486        {
487                void execute(Environment env, Transaction transaction);
488        }
489        
490        private void execute(Operation... operations)
491        {
492                Environment env = this.pool.take();
493                try
494                {
495                        Transaction transaction = env.beginTransaction(null, null);
496                        try
497                        {
498                                for (Operation operation: operations)
499                                {
500                                        operation.execute(env, transaction);
501                                }
502                                transaction.commit();
503                        }
504                        catch (RuntimeException e)
505                        {
506                                transaction.abort();
507                                throw e;
508                        }
509                }
510                finally
511                {
512                        this.pool.release(env);
513                }
514        }
515        
516        private static interface Query<T>
517        {
518                T execute(Environment env);
519        }
520        
521        private <T> T execute(Query<T> query)
522        {
523                Environment env = this.pool.take();
524                try
525                {
526                        return query.execute(env);
527                }
528                finally
529                {
530                        this.pool.release(env);
531                }
532        }
533        
534        static class KeyBinding<T> implements EntryBinding<T>
535        {
536                @Override
537                public T entryToObject(DatabaseEntry entry)
538                {
539                        return Objects.deserialize(entry.getData());
540                }
541
542                @Override
543                public void objectToEntry(T object, DatabaseEntry entry)
544                {
545                        entry.setData(Objects.serialize(object));
546                }
547        }
548}