/*
 * HA-JDBC: High-Availability JDBC
 * Copyright (C) 2012  Paul Ferraro
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */
package net.sf.hajdbc.state.sqlite;

import java.io.File;
import java.text.MessageFormat;
import java.util.Collections;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import net.sf.hajdbc.Database;
import net.sf.hajdbc.DatabaseCluster;
import net.sf.hajdbc.ExceptionType;
import net.sf.hajdbc.durability.Durability;
import net.sf.hajdbc.durability.DurabilityListener;
import net.sf.hajdbc.durability.InvocationEvent;
import net.sf.hajdbc.durability.InvocationEventImpl;
import net.sf.hajdbc.durability.InvokerEvent;
import net.sf.hajdbc.durability.InvokerEventImpl;
import net.sf.hajdbc.durability.InvokerResult;
import net.sf.hajdbc.logging.Level;
import net.sf.hajdbc.logging.Logger;
import net.sf.hajdbc.logging.LoggerFactory;
import net.sf.hajdbc.pool.Pool;
import net.sf.hajdbc.pool.PoolFactory;
import net.sf.hajdbc.state.DatabaseEvent;
import net.sf.hajdbc.state.DurabilityListenerAdapter;
import net.sf.hajdbc.state.SerializedDurabilityListener;
import net.sf.hajdbc.state.StateManager;
import net.sf.hajdbc.tx.TransactionIdentifierFactory;
import net.sf.hajdbc.util.Objects;

import org.tmatesoft.sqljet.core.SqlJetException;
import org.tmatesoft.sqljet.core.SqlJetTransactionMode;
import org.tmatesoft.sqljet.core.schema.ISqlJetSchema;
import org.tmatesoft.sqljet.core.table.ISqlJetCursor;
import org.tmatesoft.sqljet.core.table.ISqlJetTable;
import org.tmatesoft.sqljet.core.table.SqlJetDb;

/**
 * {@link StateManager} that persists the set of active databases and the
 * durability (invocation/invoker) records in SQLite database files accessed
 * through SQLJet.
 * <p>
 * Two separate database files are used, one per {@link DB} value, each guarded
 * by its own {@link ReadWriteLock} and served by its own connection pool.
 *
 * @author Paul Ferraro
 * @param <Z> the connection source type of the cluster's databases
 * @param <D> the database descriptor type
 */
public class SQLiteStateManager<Z, D extends Database<Z>> implements StateManager, SerializedDurabilityListener
{
	// SQLite has minimal concurrency support - and only supports a single writer per-database
	// So, mitigate this by using separate databases per table.
	private enum DB { STATE, INVOCATION }

	private static final Logger logger = LoggerFactory.getLogger(SQLiteStateManager.class);

	private static final String STATE_TABLE = "cluster_state";
	private static final String DATABASE_COLUMN = "database_id";

	private static final String INVOCATION_TABLE = "cluster_invocation";
	private static final String INVOKER_TABLE = "cluster_invoker";
	private static final String INVOKER_TABLE_INDEX = "cluster_invoker_index";
	private static final String TRANSACTION_COLUMN = "tx_id";
	private static final String PHASE_COLUMN = "phase_id";
	private static final String EXCEPTION_COLUMN = "exception_id";
	private static final String RESULT_COLUMN = "result";

	static final String CREATE_INVOCATION_SQL = MessageFormat.format("CREATE TABLE {0} ({1} BLOB NOT NULL, {2} INTEGER NOT NULL, {3} INTEGER NOT NULL, PRIMARY KEY ({1}, {2}))", INVOCATION_TABLE, TRANSACTION_COLUMN, PHASE_COLUMN, EXCEPTION_COLUMN);
	static final String CREATE_INVOKER_SQL = MessageFormat.format("CREATE TABLE {0} ({1} BLOB NOT NULL, {2} INTEGER NOT NULL, {3} TEXT NOT NULL, {4} BLOB, PRIMARY KEY ({1}, {2}, {3}))", INVOKER_TABLE, TRANSACTION_COLUMN, PHASE_COLUMN, DATABASE_COLUMN, RESULT_COLUMN);
	static final String CREATE_INVOKER_INDEX = MessageFormat.format("CREATE INDEX {0} ON {1} ({2}, {3})", INVOKER_TABLE_INDEX, INVOKER_TABLE, TRANSACTION_COLUMN, PHASE_COLUMN);
	static final String CREATE_STATE_SQL = MessageFormat.format("CREATE TABLE {0} ({1} TEXT NOT NULL, PRIMARY KEY ({1}))", STATE_TABLE, DATABASE_COLUMN);

	private final DatabaseCluster<Z, D> cluster;
	private final DurabilityListener listener;
	private final File file;
	private final PoolFactory poolFactory;

	// Control concurrency ourselves, instead of relying on sqljet lock polling.
	private final Map<DB, ReadWriteLock> locks = new EnumMap<DB, ReadWriteLock>(DB.class);
	private final Map<DB, Pool<SqlJetDb, SqlJetException>> pools = new EnumMap<DB, Pool<SqlJetDb, SqlJetException>>(DB.class);

	/**
	 * Creates a state manager; no files are touched until {@link #start()} is called.
	 * @param cluster the cluster whose state is managed; also supplies the transaction identifier factory
	 * @param file base location against which the per-{@link DB} database file names are resolved
	 * @param poolFactory factory used to create one connection pool per database file
	 */
	public SQLiteStateManager(DatabaseCluster<Z, D> cluster, File file, PoolFactory poolFactory)
	{
		this.cluster = cluster;
		this.file = file;
		this.poolFactory = poolFactory;
		// Object-based durability events are converted by the adapter and routed back
		// to the serialized (byte[]) listener methods of this class.
		this.listener = new DurabilityListenerAdapter(this, cluster.getTransactionIdentifierFactory());
	}

	/**
	 * {@inheritDoc}
	 * @return always {@code true}
	 */
	@Override
	public boolean isEnabled()
	{
		return true;
	}

	/**
	 * Records the activated database by inserting its identifier into the state table.
	 * Failures are logged at ERROR level and not propagated.
	 * {@inheritDoc}
	 * @see net.sf.hajdbc.DatabaseClusterListener#activated(net.sf.hajdbc.state.DatabaseEvent)
	 */
	@Override
	public void activated(final DatabaseEvent event)
	{
		Transaction transaction = new Transaction()
		{
			@Override
			public void execute(SqlJetDb db) throws SqlJetException
			{
				db.getTable(STATE_TABLE).insert(event.getSource());
			}
		};

		try
		{
			this.execute(transaction, DB.STATE);
		}
		catch (SqlJetException e)
		{
			logger.log(Level.ERROR, e);
		}
	}

	/**
	 * Removes the deactivated database's row from the state table, if one exists.
	 * Failures are logged at ERROR level and not propagated.
	 * {@inheritDoc}
	 * @see net.sf.hajdbc.DatabaseClusterListener#deactivated(net.sf.hajdbc.state.DatabaseEvent)
	 */
	@Override
	public void deactivated(final DatabaseEvent event)
	{
		Transaction transaction = new Transaction()
		{
			@Override
			public void execute(SqlJetDb db) throws SqlJetException
			{
				ISqlJetTable table = db.getTable(STATE_TABLE);
				ISqlJetCursor cursor = table.lookup(table.getPrimaryKeyIndexName(), event.getSource());
				try
				{
					if (!cursor.eof())
					{
						cursor.delete();
					}
				}
				finally
				{
					close(cursor);
				}
			}
		};

		try
		{
			this.execute(transaction, DB.STATE);
		}
		catch (SqlJetException e)
		{
			logger.log(Level.ERROR, e);
		}
	}

	/**
	 * Delegates to the adapter created in the constructor.
	 * {@inheritDoc}
	 * @see net.sf.hajdbc.durability.DurabilityListener#beforeInvocation(net.sf.hajdbc.durability.InvocationEvent)
	 */
	@Override
	public void beforeInvocation(InvocationEvent event)
	{
		this.listener.beforeInvocation(event);
	}

	/**
	 * Delegates to the adapter created in the constructor.
	 * {@inheritDoc}
	 * @see net.sf.hajdbc.durability.DurabilityListener#afterInvocation(net.sf.hajdbc.durability.InvocationEvent)
	 */
	@Override
	public void afterInvocation(InvocationEvent event)
	{
		this.listener.afterInvocation(event);
	}

	/**
	 * Delegates to the adapter created in the constructor.
	 * {@inheritDoc}
	 * @see net.sf.hajdbc.durability.DurabilityListener#beforeInvoker(net.sf.hajdbc.durability.InvokerEvent)
	 */
	@Override
	public void beforeInvoker(InvokerEvent event)
	{
		this.listener.beforeInvoker(event);
	}

	/**
	 * Delegates to the adapter created in the constructor.
	 * {@inheritDoc}
	 * @see net.sf.hajdbc.durability.DurabilityListener#afterInvoker(net.sf.hajdbc.durability.InvokerEvent)
	 */
	@Override
	public void afterInvoker(InvokerEvent event)
	{
		this.listener.afterInvoker(event);
	}

	/**
	 * Creates the lock and connection pool for each database file, then creates any
	 * missing tables. If the state table already exists and the
	 * {@link StateManager#CLEAR_LOCAL_STATE} system property is {@code true}, the
	 * state table is emptied.
	 * {@inheritDoc}
	 * @see net.sf.hajdbc.Lifecycle#start()
	 */
	@Override
	public void start() throws Exception
	{
		for (DB db: DB.values())
		{
			this.locks.put(db, new ReentrantReadWriteLock());
			// Locale.ROOT keeps the file name independent of the JVM's default locale
			// (e.g. a Turkish default locale would otherwise lower-case 'I' to a dotless 'i').
			String name = db.name().toLowerCase(Locale.ROOT);
			this.pools.put(db, this.poolFactory.createPool(new SQLiteDbPoolProvider(new File(this.file.toURI().resolve(name)))));
		}

		Transaction stateTransaction = new Transaction()
		{
			@Override
			public void execute(SqlJetDb database) throws SqlJetException
			{
				ISqlJetSchema schema = database.getSchema();
				if (schema.getTable(STATE_TABLE) == null)
				{
					database.createTable(CREATE_STATE_SQL);
				}
				else if (Boolean.getBoolean(StateManager.CLEAR_LOCAL_STATE))
				{
					database.getTable(STATE_TABLE).clear();
				}
			}
		};

		Transaction invocationTransaction = new Transaction()
		{
			@Override
			public void execute(SqlJetDb database) throws SqlJetException
			{
				ISqlJetSchema schema = database.getSchema();
				if (schema.getTable(INVOCATION_TABLE) == null)
				{
					database.createTable(CREATE_INVOCATION_SQL);
				}
				if (schema.getTable(INVOKER_TABLE) == null)
				{
					database.createTable(CREATE_INVOKER_SQL);
					database.createIndex(CREATE_INVOKER_INDEX);
				}
			}
		};

		this.execute(stateTransaction, DB.STATE);
		this.execute(invocationTransaction, DB.INVOCATION);
	}

	/**
	 * Closes every connection pool and forgets them.
	 * {@inheritDoc}
	 * @see net.sf.hajdbc.Lifecycle#stop()
	 */
	@Override
	public void stop()
	{
		for (Pool<SqlJetDb, SqlJetException> pool: this.pools.values())
		{
			pool.close();
		}
		this.pools.clear();
	}

	/**
	 * Reads the identifiers stored in the state table.
	 * {@inheritDoc}
	 * @return a sorted set of database identifiers, or an empty set if the read
	 *         fails (the failure is logged at ERROR level)
	 * @see net.sf.hajdbc.state.StateManager#getActiveDatabases()
	 */
	@Override
	public Set<String> getActiveDatabases()
	{
		Query<Set<String>> query = new Query<Set<String>>()
		{
			@Override
			public Set<String> execute(SqlJetDb database) throws SqlJetException
			{
				Set<String> set = new TreeSet<String>();
				ISqlJetTable table = database.getTable(STATE_TABLE);
				ISqlJetCursor cursor = table.lookup(table.getPrimaryKeyIndexName());
				try
				{
					if (!cursor.eof())
					{
						do
						{
							set.add(cursor.getString(DATABASE_COLUMN));
						}
						while (cursor.next());
					}
					return set;
				}
				finally
				{
					close(cursor);
				}
			}
		};

		try
		{
			return this.execute(query, DB.STATE);
		}
		catch (SqlJetException e)
		{
			logger.log(Level.ERROR, e);
			return Collections.emptySet();
		}
	}

	/**
	 * Replaces the content of the state table with the given identifiers, within a
	 * single write transaction. Failures are logged at ERROR level and not propagated.
	 * {@inheritDoc}
	 * @see net.sf.hajdbc.state.StateManager#setActiveDatabases(java.util.Set)
	 */
	@Override
	public void setActiveDatabases(final Set<String> databases)
	{
		Transaction transaction = new Transaction()
		{
			@Override
			public void execute(SqlJetDb db) throws SqlJetException
			{
				ISqlJetTable table = db.getTable(STATE_TABLE);
				table.clear();
				for (String database: databases)
				{
					table.insert(database);
				}
			}
		};

		try
		{
			this.execute(transaction, DB.STATE);
		}
		catch (SqlJetException e)
		{
			logger.log(Level.ERROR, e);
		}
	}

	/**
	 * Rebuilds the recorded invocations, and the invokers recorded for each of them,
	 * from the invocation database.
	 * {@inheritDoc}
	 * @return a map of each recorded invocation to its invokers, keyed by database identifier
	 * @throws IllegalStateException if the records cannot be read
	 * @see net.sf.hajdbc.state.StateManager#recover()
	 */
	@Override
	public Map<InvocationEvent, Map<String, InvokerEvent>> recover()
	{
		final TransactionIdentifierFactory<?> txIdFactory = this.cluster.getTransactionIdentifierFactory();

		Query<Map<InvocationEvent, Map<String, InvokerEvent>>> invocationQuery = new Query<Map<InvocationEvent, Map<String, InvokerEvent>>>()
		{
			@Override
			public Map<InvocationEvent, Map<String, InvokerEvent>> execute(SqlJetDb database) throws SqlJetException
			{
				Map<InvocationEvent, Map<String, InvokerEvent>> map = new HashMap<InvocationEvent, Map<String, InvokerEvent>>();

				// Pass 1: one (initially empty) invoker map per recorded invocation.
				ISqlJetCursor cursor = database.getTable(INVOCATION_TABLE).open();
				try
				{
					if (!cursor.eof())
					{
						do
						{
							Object txId = txIdFactory.deserialize(cursor.getBlobAsArray(TRANSACTION_COLUMN));
							Durability.Phase phase = Durability.Phase.values()[(int) cursor.getInteger(PHASE_COLUMN)];
							ExceptionType type = ExceptionType.values()[(int) cursor.getInteger(EXCEPTION_COLUMN)];
							map.put(new InvocationEventImpl(txId, phase, type), new HashMap<String, InvokerEvent>());
						}
						while (cursor.next());
					}
				}
				finally
				{
					// Use the logging helper, like the rest of this class, so that a failed
					// close neither masks an earlier exception nor discards recovered data.
					close(cursor);
				}

				// Pass 2: attach each invoker record to its invocation; invokers without
				// a matching invocation are ignored.
				cursor = database.getTable(INVOKER_TABLE).open();
				try
				{
					if (!cursor.eof())
					{
						do
						{
							Object txId = txIdFactory.deserialize(cursor.getBlobAsArray(TRANSACTION_COLUMN));
							Durability.Phase phase = Durability.Phase.values()[(int) cursor.getInteger(PHASE_COLUMN)];

							// NOTE(review): the lookup key carries a null exception type, so this
							// presumably relies on InvocationEventImpl equality ignoring it - confirm.
							Map<String, InvokerEvent> invokers = map.get(new InvocationEventImpl(txId, phase, null));
							if (invokers != null)
							{
								String databaseId = cursor.getString(DATABASE_COLUMN);
								InvokerEvent event = new InvokerEventImpl(txId, phase, databaseId);

								if (!cursor.isNull(RESULT_COLUMN))
								{
									byte[] result = cursor.getBlobAsArray(RESULT_COLUMN);
									event.setResult(Objects.<InvokerResult>deserialize(result));
								}

								invokers.put(databaseId, event);
							}
						}
						while (cursor.next());
					}
				}
				finally
				{
					close(cursor);
				}

				return map;
			}
		};

		try
		{
			return this.execute(invocationQuery, DB.INVOCATION);
		}
		catch (SqlJetException e)
		{
			throw new IllegalStateException(e);
		}
	}

	/**
	 * Inserts an invocation record. Failures are logged at ERROR level and not propagated.
	 * {@inheritDoc}
	 * @see net.sf.hajdbc.state.SerializedDurabilityListener#beforeInvocation(byte[], byte, byte)
	 */
	@Override
	public void beforeInvocation(final byte[] transactionId, final byte phase, final byte exceptionType)
	{
		Transaction transaction = new Transaction()
		{
			@Override
			public void execute(SqlJetDb db) throws SqlJetException
			{
				db.getTable(INVOCATION_TABLE).insert(transactionId, phase, exceptionType);
			}
		};

		try
		{
			this.execute(transaction, DB.INVOCATION);
		}
		catch (SqlJetException e)
		{
			logger.log(Level.ERROR, e);
		}
	}

	/**
	 * Deletes the invocation record for the given transaction and phase, together with
	 * every invoker record that belongs to it. Failures are logged at ERROR level and
	 * not propagated.
	 * {@inheritDoc}
	 * @see net.sf.hajdbc.state.SerializedDurabilityListener#afterInvocation(byte[], byte)
	 */
	@Override
	public void afterInvocation(final byte[] transactionId, final byte phase)
	{
		Transaction transaction = new Transaction()
		{
			@Override
			public void execute(SqlJetDb db) throws SqlJetException
			{
				ISqlJetTable table = db.getTable(INVOCATION_TABLE);
				ISqlJetCursor cursor = table.lookup(table.getPrimaryKeyIndexName(), transactionId, phase);
				try
				{
					if (!cursor.eof())
					{
						cursor.delete();
					}
				}
				finally
				{
					close(cursor);
				}

				table = db.getTable(INVOKER_TABLE);
				cursor = table.lookup(INVOKER_TABLE_INDEX, transactionId, phase);
				try
				{
					// SQLJet's documented deletion idiom: delete() leaves the cursor on the
					// following record, so loop on eof() only. Calling next() after each
					// delete() would skip every other matching row.
					while (!cursor.eof())
					{
						cursor.delete();
					}
				}
				finally
				{
					close(cursor);
				}
			}
		};

		try
		{
			this.execute(transaction, DB.INVOCATION);
		}
		catch (SqlJetException e)
		{
			logger.log(Level.ERROR, e);
		}
	}

	/**
	 * Inserts an invoker record (without a result). Failures are logged at ERROR level
	 * and not propagated.
	 * {@inheritDoc}
	 * @see net.sf.hajdbc.state.SerializedDurabilityListener#beforeInvoker(byte[], byte, java.lang.String)
	 */
	@Override
	public void beforeInvoker(final byte[] transactionId, final byte phase, final String databaseId)
	{
		Transaction transaction = new Transaction()
		{
			@Override
			public void execute(SqlJetDb db) throws SqlJetException
			{
				db.getTable(INVOKER_TABLE).insert(transactionId, phase, databaseId);
			}
		};

		try
		{
			this.execute(transaction, DB.INVOCATION);
		}
		catch (SqlJetException e)
		{
			logger.log(Level.ERROR, e);
		}
	}

	/**
	 * Stores the serialized result on the matching invoker record, if that record exists.
	 * Failures are logged at ERROR level and not propagated.
	 * {@inheritDoc}
	 * @see net.sf.hajdbc.state.SerializedDurabilityListener#afterInvoker(byte[], byte, java.lang.String, byte[])
	 */
	@Override
	public void afterInvoker(final byte[] transactionId, final byte phase, final String databaseId, final byte[] result)
	{
		Transaction transaction = new Transaction()
		{
			@Override
			public void execute(SqlJetDb db) throws SqlJetException
			{
				ISqlJetTable table = db.getTable(INVOKER_TABLE);
				ISqlJetCursor cursor = table.lookup(table.getPrimaryKeyIndexName(), transactionId, phase, databaseId);
				try
				{
					if (!cursor.eof())
					{
						cursor.updateByFieldNames(Collections.<String, Object>singletonMap(RESULT_COLUMN, result));
					}
				}
				finally
				{
					close(cursor);
				}
			}
		};

		try
		{
			this.execute(transaction, DB.INVOCATION);
		}
		catch (SqlJetException e)
		{
			logger.log(Level.ERROR, e);
		}
	}

	/**
	 * Closes the cursor, logging (at WARN level) rather than propagating any failure.
	 * @param cursor the cursor to close
	 */
	static void close(ISqlJetCursor cursor)
	{
		try
		{
			cursor.close();
		}
		catch (SqlJetException e)
		{
			logger.log(Level.WARN, e);
		}
	}

	/**
	 * Runs the given work inside a SQLJet write transaction on the specified database,
	 * while holding that database's write lock. The transaction is committed on success
	 * and rolled back on any failure.
	 * @param transaction the work to perform
	 * @param db the database to operate on
	 * @throws SqlJetException if the work or the commit fails
	 */
	private void execute(Transaction transaction, DB db) throws SqlJetException
	{
		Pool<SqlJetDb, SqlJetException> pool = this.pools.get(db);
		Lock lock = this.locks.get(db).writeLock();

		SqlJetDb database = pool.take();

		lock.lock();

		try
		{
			database.beginTransaction(SqlJetTransactionMode.WRITE);

			boolean committed = false;
			try
			{
				transaction.execute(database);

				database.commit();
				committed = true;
			}
			finally
			{
				// Roll back on any failure, unchecked exceptions included, so that a
				// connection with an open write transaction is never returned to the pool.
				if (!committed)
				{
					try
					{
						database.rollback();
					}
					catch (SqlJetException ex)
					{
						logger.log(Level.WARN, ex);
					}
				}
			}
		}
		finally
		{
			lock.unlock();
			pool.release(database);
		}
	}

	/**
	 * Runs the given query inside a SQLJet read-only transaction on the specified
	 * database, while holding that database's read lock.
	 * @param <T> the query result type
	 * @param query the query to run
	 * @param db the database to operate on
	 * @return the query's result
	 * @throws SqlJetException if the query, or ending the transaction, fails
	 */
	private <T> T execute(Query<T> query, DB db) throws SqlJetException
	{
		Pool<SqlJetDb, SqlJetException> pool = this.pools.get(db);
		Lock lock = this.locks.get(db).readLock();

		SqlJetDb database = pool.take();

		lock.lock();

		try
		{
			database.beginTransaction(SqlJetTransactionMode.READ_ONLY);

			try
			{
				return query.execute(database);
			}
			finally
			{
				// Ends the read-only transaction whether or not the query succeeded.
				database.commit();
			}
		}
		finally
		{
			lock.unlock();

			pool.release(database);
		}
	}

	/**
	 * Unit of read-only work that produces a result.
	 * @param <T> the result type
	 */
	interface Query<T>
	{
		/**
		 * @param database the database to read from
		 * @return the query result
		 * @throws SqlJetException if the read fails
		 */
		T execute(SqlJetDb database) throws SqlJetException;
	}

	/**
	 * Unit of work that modifies the database.
	 */
	interface Transaction
	{
		/**
		 * @param database the database to modify
		 * @throws SqlJetException if the modification fails
		 */
		void execute(SqlJetDb database) throws SqlJetException;
	}
}