001/* 002 * HA-JDBC: High-Availability JDBC 003 * Copyright (C) 2012 Paul Ferraro 004 * 005 * This program is free software: you can redistribute it and/or modify 006 * it under the terms of the GNU Lesser General Public License as published by 007 * the Free Software Foundation, either version 3 of the License, or 008 * (at your option) any later version. 009 * 010 * This program is distributed in the hope that it will be useful, 011 * but WITHOUT ANY WARRANTY; without even the implied warranty of 012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 013 * GNU Lesser General Public License for more details. 014 * 015 * You should have received a copy of the GNU Lesser General Public License 016 * along with this program. If not, see <http://www.gnu.org/licenses/>. 017 */ 018package net.sf.hajdbc.state.sql; 019 020import java.sql.Connection; 021import java.sql.Driver; 022import java.sql.PreparedStatement; 023import java.sql.ResultSet; 024import java.sql.SQLException; 025import java.sql.Statement; 026import java.sql.Types; 027import java.text.MessageFormat; 028import java.util.Collections; 029import java.util.HashMap; 030import java.util.Map; 031import java.util.Set; 032import java.util.TreeSet; 033 034import net.sf.hajdbc.Database; 035import net.sf.hajdbc.DatabaseCluster; 036import net.sf.hajdbc.DatabaseProperties; 037import net.sf.hajdbc.ExceptionType; 038import net.sf.hajdbc.IdentifiableMatcher; 039import net.sf.hajdbc.cache.lazy.LazyDatabaseProperties; 040import net.sf.hajdbc.cache.simple.SimpleDatabaseMetaDataProvider; 041import net.sf.hajdbc.dialect.Dialect; 042import net.sf.hajdbc.dialect.DialectFactory; 043import net.sf.hajdbc.durability.Durability; 044import net.sf.hajdbc.durability.DurabilityListener; 045import net.sf.hajdbc.durability.InvocationEvent; 046import net.sf.hajdbc.durability.InvocationEventImpl; 047import net.sf.hajdbc.durability.InvokerEvent; 048import net.sf.hajdbc.durability.InvokerEventImpl; 049import net.sf.hajdbc.durability.InvokerResult; 050import net.sf.hajdbc.logging.Level; 051import net.sf.hajdbc.logging.Logger; 052import net.sf.hajdbc.logging.LoggerFactory; 053import net.sf.hajdbc.pool.Pool; 054import net.sf.hajdbc.pool.PoolFactory; 055import net.sf.hajdbc.pool.sql.ConnectionFactory; 056import net.sf.hajdbc.pool.sql.ConnectionPoolProvider; 057import net.sf.hajdbc.sql.DriverDatabase; 058import net.sf.hajdbc.state.DatabaseEvent; 059import net.sf.hajdbc.state.DurabilityListenerAdapter; 060import net.sf.hajdbc.state.SerializedDurabilityListener; 061import net.sf.hajdbc.state.StateManager; 062import net.sf.hajdbc.tx.TransactionIdentifierFactory; 063import net.sf.hajdbc.util.Objects; 064import net.sf.hajdbc.util.Resources; 065import net.sf.hajdbc.util.ServiceLoaders; 066 067/** 068 * @author Paul Ferraro 069 */ 070public class SQLStateManager<Z, D extends Database<Z>> implements StateManager, ConnectionFactory, SerializedDurabilityListener 071{ 072 private static final String STATE_TABLE = "cluster_state"; 073 private static final String DATABASE_COLUMN = "database_id"; 074 075 private static final String INVOCATION_TABLE = "cluster_invocation"; 076 private static final String INVOKER_TABLE = "cluster_invoker"; 077 private static final String TRANSACTION_COLUMN = "tx_id"; 078 private static final String PHASE_COLUMN = "phase_id"; 079 private static final String EXCEPTION_COLUMN = "exception_id"; 080 private static final String RESULT_COLUMN = "result"; 081 082 static final String SELECT_STATE_SQL = MessageFormat.format("SELECT {1} FROM {0}", STATE_TABLE, DATABASE_COLUMN); 083 static final String INSERT_STATE_SQL = MessageFormat.format("INSERT INTO {0} ({1}) VALUES (?)", STATE_TABLE, DATABASE_COLUMN); 084 static final String DELETE_STATE_SQL = MessageFormat.format("DELETE FROM {0} WHERE {1} = ?", STATE_TABLE, DATABASE_COLUMN); 085 static final String TRUNCATE_STATE_SQL = MessageFormat.format("DELETE FROM {0}", STATE_TABLE); 086 087 static final String SELECT_INVOCATION_SQL = MessageFormat.format("SELECT {1}, {2}, {3} FROM {0}", INVOCATION_TABLE, TRANSACTION_COLUMN, PHASE_COLUMN, EXCEPTION_COLUMN); 088 static final String INSERT_INVOCATION_SQL = MessageFormat.format("INSERT INTO {0} ({1}, {2}, {3}) VALUES (?, ?, ?)", INVOCATION_TABLE, TRANSACTION_COLUMN, PHASE_COLUMN, EXCEPTION_COLUMN); 089 static final String DELETE_INVOCATION_SQL = MessageFormat.format("DELETE FROM {0} WHERE {1} = ? AND {2} = ?", INVOCATION_TABLE, TRANSACTION_COLUMN, PHASE_COLUMN); 090 091 static final String SELECT_INVOKER_SQL = MessageFormat.format("SELECT {1}, {2}, {3}, {4} FROM {0}", INVOKER_TABLE, TRANSACTION_COLUMN, PHASE_COLUMN, DATABASE_COLUMN, RESULT_COLUMN); 092 static final String INSERT_INVOKER_SQL = MessageFormat.format("INSERT INTO {0} ({1}, {2}, {3}) VALUES (?, ?, ?)", INVOKER_TABLE, TRANSACTION_COLUMN, PHASE_COLUMN, DATABASE_COLUMN); 093 static final String UPDATE_INVOKER_SQL = MessageFormat.format("UPDATE {0} SET {4} = ? WHERE {1} = ? AND {2} = ? AND {3} = ?", INVOKER_TABLE, TRANSACTION_COLUMN, PHASE_COLUMN, DATABASE_COLUMN, RESULT_COLUMN); 094 static final String DELETE_INVOKER_SQL = MessageFormat.format("DELETE FROM {0} WHERE {1} = ? AND {2} = ?", INVOKER_TABLE, TRANSACTION_COLUMN, PHASE_COLUMN); 095 096 private static final String CREATE_INVOCATION_SQL = MessageFormat.format("CREATE TABLE {0} ({1} {2} NOT NULL, {3} {4} NOT NULL, {5} {6} NOT NULL, PRIMARY KEY ({1}, {3}))", INVOCATION_TABLE, TRANSACTION_COLUMN, "{0}", PHASE_COLUMN, "{1}", EXCEPTION_COLUMN, "{2}"); 097 private static final String CREATE_INVOKER_SQL = MessageFormat.format("CREATE TABLE {0} ({1} {2} NOT NULL, {3} {4} NOT NULL, {5} {6} NOT NULL, {7} {8}, PRIMARY KEY ({1}, {3}, {5}))", INVOKER_TABLE, TRANSACTION_COLUMN, "{0}", PHASE_COLUMN, "{1}", DATABASE_COLUMN, "{2}", RESULT_COLUMN, "{3}"); 098 private static final String CREATE_STATE_SQL = MessageFormat.format("CREATE TABLE {0} ({1} {2} NOT NULL, PRIMARY KEY ({1}))", STATE_TABLE, DATABASE_COLUMN, "{0}"); 099 100 private static Logger logger = LoggerFactory.getLogger(SQLStateManager.class); 101 102 private final DurabilityListener listener; 103 private final DatabaseCluster<Z, D> cluster; 104 private final PoolFactory poolFactory; 105 private final DriverDatabase database; 106 107 private String password; 108 private Driver driver; 109 private Pool<Connection, SQLException> pool; 110 111 public SQLStateManager(DatabaseCluster<Z, D> cluster, DriverDatabase database, PoolFactory poolFactory) 112 { 113 this.cluster = cluster; 114 this.database = database; 115 this.poolFactory = poolFactory; 116 this.listener = new DurabilityListenerAdapter(this, cluster.getTransactionIdentifierFactory()); 117 } 118 119 /** 120 * {@inheritDoc} 121 * @see net.sf.hajdbc.state.StateManager#getActiveDatabases() 122 */ 123 @Override 124 public Set<String> getActiveDatabases() 125 { 126 Query<Set<String>> query = new Query<Set<String>>() 127 { 128 @Override 129 public Set<String> execute(Connection connection) throws SQLException 130 { 131 Set<String> set = new TreeSet<String>(); 132 133 PreparedStatement statement = connection.prepareStatement(SELECT_STATE_SQL); 134 135 try 136 { 137 ResultSet resultSet = statement.executeQuery(); 138 139 while (resultSet.next()) 140 { 141 set.add(resultSet.getString(1)); 142 } 143 144 return set; 145 } 146 finally 147 { 148 Resources.close(statement); 149 } 150 } 151 }; 152 153 try 154 { 155 return this.execute(query); 156 } 157 catch (SQLException e) 158 { 159 logger.log(Level.ERROR, e, e.getMessage()); 160 return Collections.emptySet(); 161 } 162 } 163 164 /** 165 * {@inheritDoc} 166 * @see net.sf.hajdbc.state.StateManager#setActiveDatabases(java.util.Set) 167 */ 168 @Override 169 public void setActiveDatabases(final Set<String> databases) 170 { 171 Transaction transaction = new Transaction() 172 { 173 @Override 174 public void execute(Connection connection) throws SQLException 175 { 176 Statement s = connection.createStatement(); 177 178 try 179 { 180 s.executeUpdate(TRUNCATE_STATE_SQL); 181 } 182 finally 183 { 184 Resources.close(s); 185 } 186 187 PreparedStatement statement = connection.prepareStatement(INSERT_STATE_SQL); 188 189 try 190 { 191 for (String database: databases) 192 { 193 statement.clearParameters(); 194 195 statement.setString(1, database); 196 197 statement.addBatch(); 198 } 199 200 statement.executeBatch(); 201 } 202 finally 203 { 204 Resources.close(statement); 205 } 206 } 207 }; 208 209 try 210 { 211 this.execute(transaction); 212 } 213 catch (SQLException e) 214 { 215 logger.log(Level.ERROR, e, e.getMessage()); 216 } 217 } 218 219 /** 220 * {@inheritDoc} 221 * @see net.sf.hajdbc.DatabaseClusterListener#activated(net.sf.hajdbc.state.DatabaseEvent) 222 */ 223 @Override 224 public void activated(final DatabaseEvent event) 225 { 226 Transaction transaction = new Transaction() 227 { 228 @Override 229 public void execute(Connection connection) throws SQLException 230 { 231 SQLStateManager.this.execute(connection, INSERT_STATE_SQL, event); 232 } 233 }; 234 235 try 236 { 237 this.execute(transaction); 238 } 239 catch (SQLException e) 240 { 241 logger.log(Level.ERROR, e, e.getMessage()); 242 } 243 } 244 245 /** 246 * {@inheritDoc} 247 * @see net.sf.hajdbc.DatabaseClusterListener#deactivated(net.sf.hajdbc.state.DatabaseEvent) 248 */ 249 @Override 250 public void deactivated(final DatabaseEvent event) 251 { 252 Transaction transaction = new Transaction() 253 { 254 @Override 255 public void execute(Connection connection) throws SQLException 256 { 257 SQLStateManager.this.execute(connection, DELETE_STATE_SQL, event); 258 } 259 }; 260 261 try 262 { 263 this.execute(transaction); 264 } 265 catch (SQLException e) 266 { 267 logger.log(Level.ERROR, e, e.getMessage()); 268 } 269 } 270 271 void execute(Connection connection, String sql, DatabaseEvent event) throws SQLException 272 { 273 PreparedStatement statement = connection.prepareStatement(sql); 274 275 try 276 { 277 statement.setString(1, event.getSource()); 278 279 statement.executeUpdate(); 280 } 281 finally 282 { 283 Resources.close(statement); 284 } 285 } 286 287 /** 288 * {@inheritDoc} 289 * @see net.sf.hajdbc.state.SerializedDurabilityListener#beforeInvocation(byte[], byte, byte) 290 */ 291 @Override 292 public void beforeInvocation(final byte[] transactionId, final byte phase, final byte exceptionType) 293 { 294 Transaction transaction = new Transaction() 295 { 296 @Override 297 public void execute(Connection connection) throws SQLException 298 { 299 PreparedStatement statement = connection.prepareStatement(INSERT_INVOCATION_SQL); 300 301 try 302 { 303 statement.setBytes(1, transactionId); 304 statement.setByte(2, phase); 305 statement.setByte(3, exceptionType); 306 307 statement.executeUpdate(); 308 } 309 finally 310 { 311 Resources.close(statement); 312 } 313 } 314 }; 315 316 try 317 { 318 this.execute(transaction); 319 } 320 catch (SQLException e) 321 { 322 logger.log(Level.ERROR, e, e.getMessage()); 323 } 324 } 325 326 /** 327 * {@inheritDoc} 328 * @see net.sf.hajdbc.state.SerializedDurabilityListener#afterInvocation(byte[], byte) 329 */ 330 @Override 331 public void afterInvocation(final byte[] transactionId, final byte phase) 332 { 333 Transaction transaction = new Transaction() 334 { 335 @Override 336 public void execute(Connection connection) throws SQLException 337 { 338 SQLStateManager.this.execute(connection, DELETE_INVOKER_SQL, transactionId, phase); 339 SQLStateManager.this.execute(connection, DELETE_INVOCATION_SQL, transactionId, phase); 340 } 341 }; 342 343 try 344 { 345 this.execute(transaction); 346 } 347 catch (SQLException e) 348 { 349 logger.log(Level.ERROR, e, e.getMessage()); 350 } 351 } 352 353 /** 354 * {@inheritDoc} 355 * @see net.sf.hajdbc.state.SerializedDurabilityListener#beforeInvoker(byte[], byte, java.lang.String) 356 */ 357 @Override 358 public void beforeInvoker(final byte[] transactionId, final byte phase, final String databaseId) 359 { 360 Transaction transaction = new Transaction() 361 { 362 @Override 363 public void execute(Connection connection) throws SQLException 364 { 365 PreparedStatement statement = connection.prepareStatement(INSERT_INVOKER_SQL); 366 367 try 368 { 369 statement.setBytes(1, transactionId); 370 statement.setByte(2, phase); 371 statement.setString(3, databaseId); 372 373 statement.executeUpdate(); 374 } 375 finally 376 { 377 Resources.close(statement); 378 } 379 } 380 }; 381 382 try 383 { 384 this.execute(transaction); 385 } 386 catch (SQLException e) 387 { 388 logger.log(Level.ERROR, e, e.getMessage()); 389 } 390 } 391 392 /** 393 * {@inheritDoc} 394 * @see net.sf.hajdbc.state.SerializedDurabilityListener#afterInvoker(byte[], byte, java.lang.String, byte[]) 395 */ 396 @Override 397 public void afterInvoker(final byte[] transactionId, final byte phase, final String databaseId, final byte[] result) 398 { 399 Transaction transaction = new Transaction() 400 { 401 @Override 402 public void execute(Connection connection) throws SQLException 403 { 404 PreparedStatement statement = connection.prepareStatement(UPDATE_INVOKER_SQL); 405 406 try 407 { 408 statement.setBytes(1, result); 409 statement.setBytes(2, transactionId); 410 statement.setByte(3, phase); 411 statement.setString(4, databaseId); 412 413 statement.executeUpdate(); 414 } 415 finally 416 { 417 Resources.close(statement); 418 } 419 } 420 }; 421 422 try 423 { 424 this.execute(transaction); 425 } 426 catch (SQLException e) 427 { 428 logger.log(Level.ERROR, e, e.getMessage()); 429 } 430 } 431 432 /** 433 * {@inheritDoc} 434 * @see net.sf.hajdbc.durability.DurabilityListener#beforeInvocation(net.sf.hajdbc.durability.InvocationEvent) 435 */ 436 @Override 437 public void beforeInvocation(InvocationEvent event) 438 { 439 this.listener.beforeInvocation(event); 440 } 441 442 /** 443 * {@inheritDoc} 444 * @see net.sf.hajdbc.durability.DurabilityListener#afterInvocation(net.sf.hajdbc.durability.InvocationEvent) 445 */ 446 @Override 447 public void afterInvocation(InvocationEvent event) 448 { 449 this.listener.afterInvocation(event); 450 } 451 452 void execute(Connection connection, String sql, byte[] transactionId, byte phase) throws SQLException 453 { 454 PreparedStatement statement = connection.prepareStatement(sql); 455 456 try 457 { 458 statement.setBytes(1, transactionId); 459 statement.setByte(2, phase); 460 461 statement.executeUpdate(); 462 } 463 finally 464 { 465 Resources.close(statement); 466 } 467 } 468 469 /** 470 * {@inheritDoc} 471 * @see net.sf.hajdbc.state.StateManager#recover() 472 */ 473 @Override 474 public Map<InvocationEvent, Map<String, InvokerEvent>> recover() 475 { 476 final TransactionIdentifierFactory<?> txIdFactory = this.cluster.getTransactionIdentifierFactory(); 477 Query<Map<InvocationEvent, Map<String, InvokerEvent>>> query = new Query<Map<InvocationEvent, Map<String, InvokerEvent>>>() 478 { 479 @Override 480 public Map<InvocationEvent, Map<String, InvokerEvent>> execute(Connection connection) throws SQLException 481 { 482 Map<InvocationEvent, Map<String, InvokerEvent>> map = new HashMap<InvocationEvent, Map<String, InvokerEvent>>(); 483 484 PreparedStatement statement = connection.prepareStatement(SELECT_INVOCATION_SQL); 485 486 try 487 { 488 ResultSet resultSet = statement.executeQuery(); 489 while (resultSet.next()) 490 { 491 Object txId = txIdFactory.deserialize(resultSet.getBytes(1)); 492 Durability.Phase phase = Durability.Phase.values()[resultSet.getInt(2)]; 493 ExceptionType type = ExceptionType.values()[resultSet.getInt(3)]; 494 map.put(new InvocationEventImpl(txId, phase, type), new HashMap<String, InvokerEvent>()); 495 } 496 } 497 finally 498 { 499 Resources.close(statement); 500 } 501 502 statement = connection.prepareStatement(SELECT_INVOKER_SQL); 503 504 try 505 { 506 ResultSet resultSet = statement.executeQuery(); 507 508 while (resultSet.next()) 509 { 510 Object txId = txIdFactory.deserialize(resultSet.getBytes(1)); 511 Durability.Phase phase = Durability.Phase.values()[resultSet.getByte(2)]; 512 513 Map<String, InvokerEvent> invokers = map.get(new InvocationEventImpl(txId, phase, null)); 514 515 if (invokers != null) 516 { 517 String databaseId = resultSet.getString(3); 518 519 InvokerEvent event = new InvokerEventImpl(txId, phase, databaseId); 520 521 byte[] bytes = resultSet.getBytes(4); 522 523 if (!resultSet.wasNull()) 524 { 525 event.setResult(Objects.<InvokerResult>deserialize(bytes)); 526 } 527 528 invokers.put(databaseId, event); 529 } 530 } 531 } 532 finally 533 { 534 Resources.close(statement); 535 } 536 537 return map; 538 } 539 }; 540 541 try 542 { 543 return this.execute(query); 544 } 545 catch (SQLException e) 546 { 547 throw new IllegalStateException(e); 548 } 549 } 550 551 /** 552 * {@inheritDoc} 553 * @see net.sf.hajdbc.durability.DurabilityListener#beforeInvoker(net.sf.hajdbc.durability.InvokerEvent) 554 */ 555 @Override 556 public void beforeInvoker(InvokerEvent event) 557 { 558 this.listener.beforeInvoker(event); 559 } 560 561 /** 562 * {@inheritDoc} 563 * @see net.sf.hajdbc.durability.DurabilityListener#afterInvoker(net.sf.hajdbc.durability.InvokerEvent) 564 */ 565 @Override 566 public void afterInvoker(InvokerEvent event) 567 { 568 this.listener.afterInvoker(event); 569 } 570 571 @Override 572 public boolean isEnabled() 573 { 574 return true; 575 } 576 577 /** 578 * {@inheritDoc} 579 * @see net.sf.hajdbc.Lifecycle#start() 580 */ 581 @Override 582 public void start() throws Exception 583 { 584 this.driver = this.database.getConnectionSource(); 585 this.password = this.database.decodePassword(this.cluster.getDecoder()); 586 this.pool = this.poolFactory.createPool(new ConnectionPoolProvider(this)); 587 588 DialectFactory factory = ServiceLoaders.findService(new IdentifiableMatcher<DialectFactory>(this.database.parseVendor()), DialectFactory.class); 589 if (factory == null) 590 { 591 // Use default dialect 592 factory = ServiceLoaders.findRequiredService(DialectFactory.class); 593 } 594 595 Dialect dialect = factory.createDialect(); 596 597 Connection connection = this.pool.take(); 598 599 try 600 { 601 connection.setAutoCommit(true); 602 603 DatabaseProperties properties = new LazyDatabaseProperties(new SimpleDatabaseMetaDataProvider(connection.getMetaData()), dialect); 604 605 String enumType = properties.findType(0, Types.TINYINT, Types.SMALLINT, Types.INTEGER); 606 String stringType = properties.findType(Database.ID_MAX_SIZE, Types.VARCHAR); 607 String binaryType = properties.findType(this.cluster.getTransactionIdentifierFactory().size(), Types.BINARY); 608 String varBinaryType = properties.findType(0, Types.VARBINARY); 609 610 Statement statement = connection.createStatement(); 611 612 try 613 { 614 createTableIfNotExists(statement, properties, STATE_TABLE, CREATE_STATE_SQL, stringType); 615 createTableIfNotExists(statement, properties, INVOCATION_TABLE, CREATE_INVOCATION_SQL, binaryType, enumType, enumType); 616 createTableIfNotExists(statement, properties, INVOKER_TABLE, CREATE_INVOKER_SQL, binaryType, enumType, stringType, varBinaryType); 617 618 if (Boolean.getBoolean(StateManager.CLEAR_LOCAL_STATE)) 619 { 620 statement.executeUpdate(TRUNCATE_STATE_SQL); 621 } 622 } 623 finally 624 { 625 Resources.close(statement); 626 } 627 } 628 finally 629 { 630 this.pool.release(connection); 631 } 632 } 633 634 private static void createTableIfNotExists(Statement statement, DatabaseProperties properties, String table, String pattern, String... types) throws SQLException 635 { 636 if (properties.findTable(table) == null) 637 { 638 String sql = MessageFormat.format(pattern, (Object[]) types); 639 logger.log(Level.DEBUG, sql); 640 statement.executeUpdate(sql); 641 } 642 } 643 644 /** 645 * {@inheritDoc} 646 * @see net.sf.hajdbc.Lifecycle#stop() 647 */ 648 @Override 649 public void stop() 650 { 651 if (this.pool != null) 652 { 653 this.pool.close(); 654 } 655 } 656 657 /** 658 * {@inheritDoc} 659 * @see net.sf.hajdbc.pool.sql.ConnectionFactory#getConnection() 660 */ 661 @Override 662 public Connection getConnection() throws SQLException 663 { 664 Connection connection = this.database.connect(this.driver, this.cluster.getDecoder().decode(this.password)); 665 666 connection.setAutoCommit(false); 667 668 return connection; 669 } 670 671 private <T> T execute(Query<T> query) throws SQLException 672 { 673 Connection connection = this.pool.take(); 674 675 try 676 { 677 return query.execute(connection); 678 } 679 finally 680 { 681 this.pool.release(connection); 682 } 683 } 684 685 interface Query<T> 686 { 687 T execute(Connection connection) throws SQLException; 688 } 689 690 private void execute(Transaction transaction) throws SQLException 691 { 692 Connection connection = this.pool.take(); 693 694 try 695 { 696 transaction.execute(connection); 697 698 connection.commit(); 699 } 700 catch (SQLException e) 701 { 702 try 703 { 704 connection.rollback(); 705 } 706 catch (SQLException ex) 707 { 708 logger.log(Level.WARN, ex); 709 } 710 throw e; 711 } 712 finally 713 { 714 this.pool.release(connection); 715 } 716 } 717 718 interface Transaction 719 { 720 void execute(Connection connection) throws SQLException; 721 } 722}