001/* 002 * HA-JDBC: High-Availability JDBC 003 * Copyright (C) 2012 Paul Ferraro 004 * 005 * This program is free software: you can redistribute it and/or modify 006 * it under the terms of the GNU Lesser General Public License as published by 007 * the Free Software Foundation, either version 3 of the License, or 008 * (at your option) any later version. 009 * 010 * This program is distributed in the hope that it will be useful, 011 * but WITHOUT ANY WARRANTY; without even the implied warranty of 012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 013 * GNU Lesser General Public License for more details. 014 * 015 * You should have received a copy of the GNU Lesser General Public License 016 * along with this program. If not, see <http://www.gnu.org/licenses/>. 017 */ 018package net.sf.hajdbc.state.bdb; 019 020import java.io.File; 021import java.io.Serializable; 022import java.util.Arrays; 023import java.util.HashMap; 024import java.util.Iterator; 025import java.util.Map; 026import java.util.Set; 027import java.util.TreeSet; 028 029import net.sf.hajdbc.DatabaseCluster; 030import net.sf.hajdbc.ExceptionType; 031import net.sf.hajdbc.durability.Durability; 032import net.sf.hajdbc.durability.DurabilityListener; 033import net.sf.hajdbc.durability.InvocationEvent; 034import net.sf.hajdbc.durability.InvocationEventImpl; 035import net.sf.hajdbc.durability.InvokerEvent; 036import net.sf.hajdbc.durability.InvokerEventImpl; 037import net.sf.hajdbc.durability.InvokerResult; 038import net.sf.hajdbc.pool.CloseablePoolProvider; 039import net.sf.hajdbc.pool.Pool; 040import net.sf.hajdbc.pool.PoolFactory; 041import net.sf.hajdbc.state.DatabaseEvent; 042import net.sf.hajdbc.state.DurabilityListenerAdapter; 043import net.sf.hajdbc.state.SerializedDurabilityListener; 044import net.sf.hajdbc.state.StateManager; 045import net.sf.hajdbc.tx.TransactionIdentifierFactory; 046import net.sf.hajdbc.util.Objects; 047 048import com.sleepycat.bind.ByteArrayBinding; 049import com.sleepycat.bind.EntryBinding; 050import com.sleepycat.bind.tuple.TupleBinding; 051import com.sleepycat.collections.StoredKeySet; 052import com.sleepycat.collections.StoredMap; 053import com.sleepycat.je.Database; 054import com.sleepycat.je.DatabaseConfig; 055import com.sleepycat.je.DatabaseEntry; 056import com.sleepycat.je.DatabaseException; 057import com.sleepycat.je.Environment; 058import com.sleepycat.je.EnvironmentConfig; 059import com.sleepycat.je.Transaction; 060 061/** 062 * @author paul 063 */ 064public class BerkeleyDBStateManager extends CloseablePoolProvider<Environment, DatabaseException> implements StateManager, SerializedDurabilityListener 065{ 066 private static final String STATE = "state"; 067 private static final String INVOCATION = "invocation"; 068 private static final String INVOKER = "invoker"; 069 private static final EntryBinding<InvocationKey> INVOCATION_KEY_BINDING = new KeyBinding<InvocationKey>(); 070 private static final EntryBinding<InvokerKey> INVOKER_KEY_BINDING = new KeyBinding<InvokerKey>(); 071 private static final EntryBinding<byte[]> BLOB_BINDING = new ByteArrayBinding(); 072 static final byte[] NULL = new byte[0]; 073 074 private final DatabaseCluster<?, ?> cluster; 075 private final File file; 076 private final PoolFactory poolFactory; 077 private final EnvironmentConfig config; 078 private final DurabilityListener durabilityListener; 079 080 private volatile Pool<Environment, DatabaseException> pool; 081 082 public BerkeleyDBStateManager(DatabaseCluster<?, ?> cluster, File file, EnvironmentConfig config, PoolFactory poolFactory) 083 { 084 super(Environment.class, DatabaseException.class); 085 this.cluster = cluster; 086 this.file = file; 087 this.poolFactory = poolFactory; 088 this.config = config; 089 this.durabilityListener = new DurabilityListenerAdapter(this, cluster.getTransactionIdentifierFactory()); 090 } 091 092 @Override 093 public void start() 094 { 095 this.file.mkdirs(); 096 this.pool = this.poolFactory.createPool(this); 097 Environment env = this.pool.take(); 098 try 099 { 100 env.openDatabase(null, STATE, new DatabaseConfig().setAllowCreate(true)).close(); 101 env.openDatabase(null, INVOCATION, new DatabaseConfig().setAllowCreate(true)).close(); 102 env.openDatabase(null, INVOKER, new DatabaseConfig().setAllowCreate(true)).close(); 103 } 104 finally 105 { 106 this.pool.release(env); 107 } 108 } 109 110 @Override 111 public void stop() 112 { 113 this.pool.close(); 114 } 115 116 @Override 117 public Set<String> getActiveDatabases() 118 { 119 DatabaseQuery<Set<String>> query = new DatabaseQuery<Set<String>>(STATE) 120 { 121 @Override 122 Set<String> execute(Database database) 123 { 124 return new TreeSet<String>(createStateSet(database, true)); 125 } 126 }; 127 return this.execute(query); 128 } 129 130 @Override 131 public void setActiveDatabases(final Set<String> databases) 132 { 133 DatabaseOperation operation = new DatabaseOperation(STATE) 134 { 135 @Override 136 void execute(Database database) 137 { 138 createStateSet(database, false).retainAll(databases); 139 } 140 }; 141 this.execute(operation); 142 } 143 144 @Override 145 public void activated(final DatabaseEvent event) 146 { 147 DatabaseOperation operation = new DatabaseOperation(STATE) 148 { 149 @Override 150 void execute(Database database) 151 { 152 createStateSet(database, false).add(event.getSource()); 153 } 154 }; 155 this.execute(operation); 156 } 157 158 @Override 159 public void deactivated(final DatabaseEvent event) 160 { 161 DatabaseOperation operation = new DatabaseOperation(STATE) 162 { 163 @Override 164 void execute(Database database) 165 { 166 createStateSet(database, false).remove(event.getSource()); 167 } 168 }; 169 this.execute(operation); 170 } 171 172 Set<String> createStateSet(Database database, boolean readOnly) 173 { 174 return new StoredKeySet<String>(database, TupleBinding.getPrimitiveBinding(String.class), !readOnly); 175 } 176 177 @Override 178 public void beforeInvocation(InvocationEvent event) 179 { 180 this.durabilityListener.beforeInvocation(event); 181 } 182 183 @Override 184 public void afterInvocation(InvocationEvent event) 185 { 186 this.durabilityListener.afterInvocation(event); 187 } 188 189 @Override 190 public void beforeInvoker(InvokerEvent event) 191 { 192 this.durabilityListener.beforeInvoker(event); 193 } 194 195 @Override 196 public void afterInvoker(InvokerEvent event) 197 { 198 this.durabilityListener.afterInvoker(event); 199 } 200 201 @Override 202 public void beforeInvocation(final byte[] transactionId, final byte phase, final byte exceptionType) 203 { 204 DatabaseOperation operation = new DatabaseOperation(INVOCATION) 205 { 206 @Override 207 void execute(Database database) 208 { 209 createInvocationMap(database, false).put(new InvocationKey(transactionId, phase), exceptionType); 210 } 211 }; 212 this.execute(operation); 213 } 214 215 @Override 216 public void afterInvocation(final byte[] transactionId, final byte phase) 217 { 218 DatabaseOperation invokerperation = new DatabaseOperation(INVOKER) 219 { 220 @Override 221 void execute(Database database) 222 { 223 Iterator<InvokerKey> keys = createInvokerMap(database, false).keySet().iterator(); 224 while (keys.hasNext()) 225 { 226 InvokerKey key = keys.next(); 227 if ((key.getPhase() == phase) && Arrays.equals(key.getTransactionId(), transactionId)) 228 { 229 keys.remove(); 230 } 231 } 232 } 233 }; 234 DatabaseOperation invocationOperation = new DatabaseOperation(INVOCATION) 235 { 236 @Override 237 void execute(Database database) 238 { 239 createInvocationMap(database, false).remove(new InvocationKey(transactionId, phase)); 240 } 241 }; 242 this.execute(invokerperation, invocationOperation); 243 } 244 245 @Override 246 public void beforeInvoker(final byte[] transactionId, final byte phase, final String databaseId) 247 { 248 DatabaseOperation operation = new DatabaseOperation(INVOKER) 249 { 250 @Override 251 void execute(Database database) 252 { 253 createInvokerMap(database, false).put(new InvokerKey(transactionId, phase, databaseId), NULL); 254 } 255 }; 256 this.execute(operation); 257 } 258 259 @Override 260 public void afterInvoker(final byte[] transactionId, final byte phase, final String databaseId, final byte[] result) 261 { 262 DatabaseOperation operation = new DatabaseOperation(INVOKER) 263 { 264 @Override 265 void execute(Database database) 266 { 267 createInvokerMap(database, false).put(new InvokerKey(transactionId, phase, databaseId), result); 268 } 269 }; 270 this.execute(operation); 271 } 272 273 @Override 274 public Map<InvocationEvent, Map<String, InvokerEvent>> recover() 275 { 276 final Map<InvocationEvent, Map<String, InvokerEvent>> result = new HashMap<InvocationEvent, Map<String, InvokerEvent>>(); 277 final TransactionIdentifierFactory<?> txIdFactory = this.cluster.getTransactionIdentifierFactory(); 278 DatabaseQuery<Void> query = new DatabaseQuery<Void>(INVOCATION) 279 { 280 @Override 281 Void execute(Database database) 282 { 283 for (Map.Entry<InvocationKey, Byte> entry: createInvocationMap(database, true).entrySet()) 284 { 285 InvocationKey key = entry.getKey(); 286 result.put(new InvocationEventImpl(txIdFactory.deserialize(key.getTransactionId()), Durability.Phase.values()[key.getPhase()], ExceptionType.values()[entry.getValue()]), new HashMap<String, InvokerEvent>()); 287 } 288 return null; 289 } 290 }; 291 this.execute(query); 292 query = new DatabaseQuery<Void>(INVOKER) 293 { 294 @Override 295 Void execute(Database database) 296 { 297 for (Map.Entry<InvokerKey, byte[]> entry: createInvokerMap(database, true).entrySet()) 298 { 299 InvokerKey key = entry.getKey(); 300 Map<String, InvokerEvent> invokers = result.get(new InvocationEventImpl(txIdFactory.deserialize(key.getTransactionId()), Durability.Phase.values()[key.getPhase()], null)); 301 if (invokers != null) 302 { 303 InvokerEvent invoker = new InvokerEventImpl(txIdFactory.deserialize(key.getTransactionId()), Durability.Phase.values()[key.getPhase()], key.getDatabaseId()); 304 byte[] value = entry.getValue(); 305 if (value.length > 0) 306 { 307 invoker.setResult(Objects.<InvokerResult>deserialize(value)); 308 } 309 invokers.put(key.getDatabaseId(), invoker); 310 } 311 } 312 return null; 313 } 314 }; 315 this.execute(query); 316 return result; 317 } 318 319 private static class InvocationKey implements Serializable 320 { 321 private static final long serialVersionUID = -9033714764207519351L; 322 private final byte[] transactionId; 323 private final byte phase; 324 325 InvocationKey(byte[] transactionId, byte phase) 326 { 327 this.transactionId = transactionId; 328 this.phase = phase; 329 } 330 331 byte[] getTransactionId() 332 { 333 return this.transactionId; 334 } 335 336 byte getPhase() 337 { 338 return this.phase; 339 } 340 } 341 342 private static class InvokerKey extends InvocationKey 343 { 344 private static final long serialVersionUID = 400751577923581135L; 345 private final String databaseId; 346 347 InvokerKey(byte[] transactionId, byte phase, String databaseId) 348 { 349 super(transactionId, phase); 350 this.databaseId = databaseId; 351 } 352 353 String getDatabaseId() 354 { 355 return this.databaseId; 356 } 357 } 358 359 Map<InvocationKey, Byte> createInvocationMap(Database database, boolean readOnly) 360 { 361 return new StoredMap<InvocationKey, Byte>(database, INVOCATION_KEY_BINDING, TupleBinding.getPrimitiveBinding(Byte.class), !readOnly); 362 } 363 364 Map<InvokerKey, byte[]> createInvokerMap(Database database, boolean readOnly) 365 { 366 return new StoredMap<InvokerKey, byte[]>(database, INVOKER_KEY_BINDING, BLOB_BINDING, !readOnly); 367 } 368 369 @Override 370 public boolean isEnabled() 371 { 372 return true; 373 } 374 375 /** 376 * {@inheritDoc} 377 * @see net.sf.hajdbc.pool.PoolProvider#create() 378 */ 379 @Override 380 public Environment create() throws DatabaseException 381 { 382 return new Environment(this.file, this.config); 383 } 384 385 /** 386 * {@inheritDoc} 387 * @see net.sf.hajdbc.pool.PoolProvider#isValid(java.lang.Object) 388 */ 389 @Override 390 public boolean isValid(Environment environment) 391 { 392 try 393 { 394 environment.checkHandleIsValid(); 395 396 return true; 397 } 398 catch (DatabaseException e) 399 { 400 return false; 401 } 402 } 403 404 private static abstract class DatabaseOperation 405 { 406 private final String databaseName; 407 408 DatabaseOperation(String databaseName) 409 { 410 this.databaseName = databaseName; 411 } 412 413 String getDatabaseName() 414 { 415 return this.databaseName; 416 } 417 418 abstract void execute(Database database); 419 } 420 421 private void execute(DatabaseOperation... dbOperations) 422 { 423 Operation[] operations = new Operation[dbOperations.length]; 424 for (int i = 0; i < dbOperations.length; ++i) 425 { 426 final DatabaseOperation operation = dbOperations[i]; 427 operations[i] = new Operation() 428 { 429 @Override 430 public void execute(Environment env, Transaction transaction) 431 { 432 Database database = env.openDatabase(transaction, operation.getDatabaseName(), new DatabaseConfig().setTransactional(true)); 433 try 434 { 435 operation.execute(database); 436 } 437 finally 438 { 439 database.close(); 440 } 441 } 442 }; 443 } 444 this.execute(operations); 445 } 446 447 private abstract static class DatabaseQuery<T> 448 { 449 private final String databaseName; 450 451 DatabaseQuery(String databaseName) 452 { 453 this.databaseName = databaseName; 454 } 455 456 String getDatabaseName() 457 { 458 return this.databaseName; 459 } 460 461 abstract T execute(Database database); 462 } 463 464 private <T> T execute(final DatabaseQuery<T> dbQuery) 465 { 466 Query<T> query = new Query<T>() 467 { 468 @Override 469 public T execute(Environment env) 470 { 471 Database database = env.openDatabase(null, dbQuery.getDatabaseName(), new DatabaseConfig().setReadOnly(true)); 472 try 473 { 474 return dbQuery.execute(database); 475 } 476 finally 477 { 478 database.close(); 479 } 480 } 481 }; 482 return this.execute(query); 483 } 484 485 private static interface Operation 486 { 487 void execute(Environment env, Transaction transaction); 488 } 489 490 private void execute(Operation... operations) 491 { 492 Environment env = this.pool.take(); 493 try 494 { 495 Transaction transaction = env.beginTransaction(null, null); 496 try 497 { 498 for (Operation operation: operations) 499 { 500 operation.execute(env, transaction); 501 } 502 transaction.commit(); 503 } 504 catch (RuntimeException e) 505 { 506 transaction.abort(); 507 throw e; 508 } 509 } 510 finally 511 { 512 this.pool.release(env); 513 } 514 } 515 516 private static interface Query<T> 517 { 518 T execute(Environment env); 519 } 520 521 private <T> T execute(Query<T> query) 522 { 523 Environment env = this.pool.take(); 524 try 525 { 526 return query.execute(env); 527 } 528 finally 529 { 530 this.pool.release(env); 531 } 532 } 533 534 static class KeyBinding<T> implements EntryBinding<T> 535 { 536 @Override 537 public T entryToObject(DatabaseEntry entry) 538 { 539 return Objects.deserialize(entry.getData()); 540 } 541 542 @Override 543 public void objectToEntry(T object, DatabaseEntry entry) 544 { 545 entry.setData(Objects.serialize(object)); 546 } 547 } 548}