001/* 002 * HA-JDBC: High-Availability JDBC 003 * Copyright (C) 2012 Paul Ferraro 004 * 005 * This program is free software: you can redistribute it and/or modify 006 * it under the terms of the GNU Lesser General Public License as published by 007 * the Free Software Foundation, either version 3 of the License, or 008 * (at your option) any later version. 009 * 010 * This program is distributed in the hope that it will be useful, 011 * but WITHOUT ANY WARRANTY; without even the implied warranty of 012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 013 * GNU Lesser General Public License for more details. 014 * 015 * You should have received a copy of the GNU Lesser General Public License 016 * along with this program. If not, see <http://www.gnu.org/licenses/>. 017 */ 018package net.sf.hajdbc.lock.distributed; 019 020import java.io.IOException; 021import java.io.ObjectInput; 022import java.io.ObjectInputStream; 023import java.io.ObjectOutput; 024import java.io.ObjectOutputStream; 025import java.util.HashMap; 026import java.util.Map; 027import java.util.Set; 028import java.util.concurrent.ConcurrentHashMap; 029import java.util.concurrent.ConcurrentMap; 030import java.util.concurrent.TimeUnit; 031import java.util.concurrent.locks.Condition; 032import java.util.concurrent.locks.Lock; 033 034import net.sf.hajdbc.Database; 035import net.sf.hajdbc.DatabaseCluster; 036import net.sf.hajdbc.distributed.CommandDispatcher; 037import net.sf.hajdbc.distributed.CommandDispatcherFactory; 038import net.sf.hajdbc.distributed.Member; 039import net.sf.hajdbc.distributed.MembershipListener; 040import net.sf.hajdbc.distributed.Remote; 041import net.sf.hajdbc.distributed.Stateful; 042import net.sf.hajdbc.lock.LockManager; 043import net.sf.hajdbc.util.Objects; 044 045/** 046 * @author Paul Ferraro 047 */ 048public class DistributedLockManager implements LockManager, LockCommandContext, Stateful, MembershipListener 049{ 050 final CommandDispatcher<LockCommandContext> dispatcher; 051 052 private final LockManager lockManager; 053 private final ConcurrentMap<Member, Map<LockDescriptor, Lock>> remoteLockDescriptorMap = new ConcurrentHashMap<Member, Map<LockDescriptor, Lock>>(); 054 055 public <Z, D extends Database<Z>> DistributedLockManager(DatabaseCluster<Z, D> cluster, CommandDispatcherFactory dispatcherFactory) throws Exception 056 { 057 this.lockManager = cluster.getLockManager(); 058 LockCommandContext context = this; 059 this.dispatcher = dispatcherFactory.createCommandDispatcher(cluster.getId() + ".lock", context, this, this); 060 } 061 062 /** 063 * {@inheritDoc} 064 * @see net.sf.hajdbc.lock.LockManager#readLock(java.lang.String) 065 */ 066 @Override 067 public Lock readLock(String id) 068 { 069 return this.lockManager.readLock(id); 070 } 071 072 /** 073 * {@inheritDoc} 074 * @see net.sf.hajdbc.lock.LockManager#writeLock(java.lang.String) 075 */ 076 @Override 077 public Lock writeLock(String id) 078 { 079 return this.getDistibutedLock(new RemoteLockDescriptorImpl(id, LockType.WRITE, this.dispatcher.getLocal())); 080 } 081 082 /** 083 * {@inheritDoc} 084 * @see net.sf.hajdbc.lock.distributed.LockCommandContext#getLock(net.sf.hajdbc.lock.distributed.LockDescriptor) 085 */ 086 @Override 087 public Lock getLock(LockDescriptor lock) 088 { 089 String id = lock.getId(); 090 091 switch (lock.getType()) 092 { 093 case READ: 094 { 095 return this.lockManager.readLock(id); 096 } 097 case WRITE: 098 { 099 return this.lockManager.writeLock(id); 100 } 101 default: 102 { 103 throw new IllegalStateException(); 104 } 105 } 106 } 107 108 /** 109 * {@inheritDoc} 110 * @see net.sf.hajdbc.lock.distributed.LockCommandContext#getDistibutedLock(net.sf.hajdbc.lock.distributed.RemoteLockDescriptor) 111 */ 112 @Override 113 public Lock getDistibutedLock(RemoteLockDescriptor descriptor) 114 { 115 return new DistributedLock(descriptor, this.getLock(descriptor), this.dispatcher); 116 } 117 118 /** 119 * {@inheritDoc} 120 * @see net.sf.hajdbc.Lifecycle#start() 121 */ 122 @Override 123 public void start() throws Exception 124 { 125 this.lockManager.start(); 126 this.dispatcher.start(); 127 } 128 129 /** 130 * {@inheritDoc} 131 * @see net.sf.hajdbc.Lifecycle#stop() 132 */ 133 @Override 134 public void stop() 135 { 136 this.dispatcher.stop(); 137 this.lockManager.stop(); 138 } 139 140 /** 141 * {@inheritDoc} 142 * @see net.sf.hajdbc.lock.distributed.LockCommandContext#getRemoteLocks(net.sf.hajdbc.distributed.Remote) 143 */ 144 @Override 145 public Map<LockDescriptor, Lock> getRemoteLocks(Remote remote) 146 { 147 return this.remoteLockDescriptorMap.get(remote.getMember()); 148 } 149 150 /** 151 * {@inheritDoc} 152 * @see net.sf.hajdbc.distributed.Stateful#writeState(java.io.ObjectOutput) 153 */ 154 @Override 155 public void writeState(ObjectOutput output) throws IOException 156 { 157 output.writeInt(this.remoteLockDescriptorMap.size()); 158 159 for (Map.Entry<Member, Map<LockDescriptor, Lock>> entry: this.remoteLockDescriptorMap.entrySet()) 160 { 161 output.writeObject(entry.getKey()); 162 163 Set<LockDescriptor> descriptors = entry.getValue().keySet(); 164 165 output.writeInt(descriptors.size()); 166 167 for (LockDescriptor descriptor: descriptors) 168 { 169 output.writeUTF(descriptor.getId()); 170 output.writeByte(descriptor.getType().ordinal()); 171 } 172 } 173 } 174 175 /** 176 * {@inheritDoc} 177 * @see net.sf.hajdbc.distributed.Stateful#readState(java.io.ObjectInput) 178 */ 179 @Override 180 public void readState(ObjectInput input) throws IOException 181 { 182 // Is this valid? or should we unlock/clear? 183 //assert this.remoteLockDescriptorMap.isEmpty(); 184 185 int size = input.readInt(); 186 187 LockType[] types = LockType.values(); 188 189 for (int i = 0; i < size; ++i) 190 { 191 Member member = Objects.readObject(input); 192 193 Map<LockDescriptor, Lock> map = new HashMap<LockDescriptor, Lock>(); 194 195 int locks = input.readInt(); 196 197 for (int j = 0; j < locks; ++j) 198 { 199 String id = input.readUTF(); 200 LockType type = types[input.readByte()]; 201 202 LockDescriptor descriptor = new RemoteLockDescriptorImpl(id, type, member); 203 204 Lock lock = this.getLock(descriptor); 205 206 lock.lock(); 207 208 map.put(descriptor, lock); 209 } 210 211 this.remoteLockDescriptorMap.put(member, map); 212 } 213 } 214 215 /** 216 * {@inheritDoc} 217 * @see net.sf.hajdbc.distributed.MembershipListener#added(net.sf.hajdbc.distributed.Member) 218 */ 219 @Override 220 public void added(Member member) 221 { 222 this.remoteLockDescriptorMap.putIfAbsent(member, new HashMap<LockDescriptor, Lock>()); 223 } 224 225 /** 226 * {@inheritDoc} 227 * @see net.sf.hajdbc.distributed.MembershipListener#removed(net.sf.hajdbc.distributed.Member) 228 */ 229 @Override 230 public void removed(Member member) 231 { 232 Map<LockDescriptor, Lock> locks = this.remoteLockDescriptorMap.remove(member); 233 234 if (locks != null) 235 { 236 for (Lock lock: locks.values()) 237 { 238 lock.unlock(); 239 } 240 } 241 } 242 243 private static class DistributedLock implements Lock 244 { 245 private final RemoteLockDescriptor descriptor; 246 private final Lock lock; 247 private final CommandDispatcher<LockCommandContext> dispatcher; 248 249 DistributedLock(RemoteLockDescriptor descriptor, Lock lock, CommandDispatcher<LockCommandContext> dispatcher) 250 { 251 this.descriptor = descriptor; 252 this.lock = lock; 253 this.dispatcher = dispatcher; 254 } 255 256 @Override 257 public void lock() 258 { 259 boolean locked = false; 260 261 while (!locked) 262 { 263 Member coordinator = this.dispatcher.getCoordinator(); 264 265 if (this.dispatcher.getLocal().equals(coordinator)) 266 { 267 this.lock.lock(); 268 269 try 270 { 271 locked = this.lockMembers(coordinator); 272 } 273 finally 274 { 275 if (!locked) 276 { 277 this.lock.unlock(); 278 } 279 } 280 } 281 else 282 { 283 locked = this.lockCoordinator(coordinator, Long.MAX_VALUE); 284 } 285 286 if (!locked) 287 { 288 Thread.yield(); 289 } 290 } 291 } 292 293 @Override 294 public void lockInterruptibly() throws InterruptedException 295 { 296 boolean locked = false; 297 298 while (!locked) 299 { 300 Member coordinator = this.dispatcher.getCoordinator(); 301 302 if (this.dispatcher.getLocal().equals(coordinator)) 303 { 304 this.lock.lockInterruptibly(); 305 306 try 307 { 308 locked = this.lockMembers(coordinator); 309 } 310 finally 311 { 312 if (!locked) 313 { 314 this.lock.unlock(); 315 } 316 } 317 } 318 else 319 { 320 locked = this.lockCoordinator(coordinator, Long.MAX_VALUE); 321 } 322 323 if (Thread.currentThread().isInterrupted()) 324 { 325 throw new InterruptedException(); 326 } 327 328 if (!locked) 329 { 330 Thread.yield(); 331 } 332 } 333 } 334 335 @Override 336 public boolean tryLock() 337 { 338 boolean locked = false; 339 340 Member coordinator = this.dispatcher.getCoordinator(); 341 342 if (this.dispatcher.getLocal().equals(coordinator)) 343 { 344 if (this.lock.tryLock()) 345 { 346 try 347 { 348 locked = this.lockMembers(coordinator); 349 } 350 finally 351 { 352 if (!locked) 353 { 354 this.lock.unlock(); 355 } 356 } 357 } 358 } 359 else 360 { 361 locked = this.lockCoordinator(coordinator, 0); 362 } 363 364 return locked; 365 } 366 367 @Override 368 public boolean tryLock(long time, TimeUnit unit) throws InterruptedException 369 { 370 boolean locked = false; 371 372 Member coordinator = this.dispatcher.getCoordinator(); 373 374 if (this.dispatcher.getLocal().equals(coordinator)) 375 { 376 if (this.lock.tryLock(time, unit)) 377 { 378 try 379 { 380 locked = this.lockMembers(coordinator); 381 } 382 finally 383 { 384 if (!locked) 385 { 386 this.lock.unlock(); 387 } 388 } 389 } 390 } 391 else 392 { 393 locked = this.lockCoordinator(coordinator, unit.toMillis(time)); 394 } 395 396 return locked; 397 } 398 399 private boolean lockMembers(Member coordinator) 400 { 401 boolean locked = true; 402 403 Map<Member, Boolean> results = this.dispatcher.executeAll(new MemberAcquireLockCommand(this.descriptor), coordinator); 404 405 for (Map.Entry<Member, Boolean> entry: results.entrySet()) 406 { 407 locked &= entry.getValue(); 408 } 409 410 if (!locked) 411 { 412 this.unlockMembers(coordinator); 413 } 414 415 return locked; 416 } 417 418 private boolean lockCoordinator(Member coordinator, long timeout) 419 { 420 Boolean result = this.dispatcher.execute(new CoordinatorAcquireLockCommand(this.descriptor, timeout), coordinator); 421 return (result != null) ? result.booleanValue() : false; 422 } 423 424 @Override 425 public void unlock() 426 { 427 Member coordinator = this.dispatcher.getCoordinator(); 428 429 if (this.dispatcher.getLocal().equals(coordinator)) 430 { 431 this.unlockMembers(coordinator); 432 433 this.lock.unlock(); 434 } 435 else 436 { 437 this.unlockCoordinator(coordinator); 438 } 439 } 440 441 private void unlockMembers(Member coordinator) 442 { 443 this.dispatcher.executeAll(new MemberReleaseLockCommand(this.descriptor), coordinator); 444 } 445 446 private void unlockCoordinator(Member coordinator) 447 { 448 this.dispatcher.execute(new CoordinatorReleaseLockCommand(this.descriptor), coordinator); 449 } 450 451 @Override 452 public Condition newCondition() 453 { 454 throw new UnsupportedOperationException(); 455 } 456 } 457 458 private static class RemoteLockDescriptorImpl implements RemoteLockDescriptor 459 { 460 private static final long serialVersionUID = 1950781245453120790L; 461 462 private final String id; 463 private transient LockType type; 464 private final Member member; 465 466 RemoteLockDescriptorImpl(String id, LockType type, Member member) 467 { 468 this.id = id; 469 this.type = type; 470 this.member = member; 471 } 472 473 @Override 474 public String getId() 475 { 476 return this.id; 477 } 478 479 @Override 480 public LockType getType() 481 { 482 return this.type; 483 } 484 485 @Override 486 public Member getMember() 487 { 488 return this.member; 489 } 490 491 private void writeObject(ObjectOutputStream out) throws IOException 492 { 493 out.defaultWriteObject(); 494 495 out.writeByte(this.type.ordinal()); 496 } 497 498 private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException 499 { 500 in.defaultReadObject(); 501 502 this.type = LockType.values()[in.readByte()]; 503 } 504 505 /** 506 * {@inheritDoc} 507 * @see java.lang.Object#equals(java.lang.Object) 508 */ 509 @Override 510 public boolean equals(Object object) 511 { 512 if ((object == null) || !(object instanceof RemoteLockDescriptor)) return false; 513 514 String id = ((RemoteLockDescriptor) object).getId(); 515 516 return ((this.id != null) && (id != null)) ? this.id.equals(id) : (this.id == id); 517 } 518 519 /** 520 * {@inheritDoc} 521 * @see java.lang.Object#hashCode() 522 */ 523 @Override 524 public int hashCode() 525 { 526 return this.id != null ? this.id.hashCode() : 0; 527 } 528 529 /** 530 * {@inheritDoc} 531 * @see java.lang.Object#toString() 532 */ 533 @Override 534 public String toString() 535 { 536 return String.format("%sLock(%s)", this.type.name().toLowerCase(), (this.id != null) ? this.id : ""); 537 } 538 } 539}