/*
 * HA-JDBC: High-Availability JDBC
 * Copyright (C) 2012 Paul Ferraro
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
package net.sf.hajdbc.state.distributed;

import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import net.sf.hajdbc.Database;
import net.sf.hajdbc.DatabaseCluster;
import net.sf.hajdbc.Messages;
import net.sf.hajdbc.distributed.CommandDispatcher;
import net.sf.hajdbc.distributed.CommandDispatcherFactory;
import net.sf.hajdbc.distributed.Member;
import net.sf.hajdbc.distributed.MembershipListener;
import net.sf.hajdbc.distributed.Remote;
import net.sf.hajdbc.distributed.Stateful;
import net.sf.hajdbc.durability.InvocationEvent;
import net.sf.hajdbc.durability.InvokerEvent;
import net.sf.hajdbc.logging.Level;
import net.sf.hajdbc.logging.Logger;
import net.sf.hajdbc.logging.LoggerFactory;
import net.sf.hajdbc.state.DatabaseEvent;
import net.sf.hajdbc.state.StateManager;

/**
 * A {@link StateManager} that wraps the state manager of a database cluster and
 * relays state changes and durability events to other members through a
 * {@link CommandDispatcher}.
 * <p>
 * Queries and updates of the active database set are delegated to the wrapped
 * (local) state manager. Activation and deactivation events are applied locally
 * and then dispatched as commands; invocation and invoker events are dispatched
 * as commands only. This object also serves as the {@link StateCommandContext}
 * handed to those commands, as the dispatcher's {@link MembershipListener}, and
 * as the {@link Stateful} source and sink of the initial cluster state.
 *
 * @author Paul Ferraro
 * @param <Z> the connection source type of the cluster's databases
 * @param <D> the database descriptor type
 */
public class DistributedStateManager<Z, D extends Database<Z>> implements StateManager, StateCommandContext<Z, D>, MembershipListener, Stateful
{
	private final Logger logger = LoggerFactory.getLogger(this.getClass());

	private final DatabaseCluster<Z, D> cluster;
	private final StateManager stateManager;
	private final CommandDispatcher<StateCommandContext<Z, D>> dispatcher;
	// Invoker events tracked per group member; entries are created in added(...) and consumed in removed(...).
	private final ConcurrentMap<Member, Map<InvocationEvent, Map<String, InvokerEvent>>> remoteInvokerMap = new ConcurrentHashMap<Member, Map<InvocationEvent, Map<String, InvokerEvent>>>();

	/**
	 * Wraps the state manager currently configured on the specified cluster and
	 * creates the command dispatcher used to communicate with other members.
	 * The dispatcher is created with the identifier <code>cluster.getId() + ".state"</code>
	 * and receives this object as command context, state provider and membership listener.
	 *
	 * @param cluster the database cluster whose state is to be distributed
	 * @param dispatcherFactory the factory used to create the command dispatcher
	 * @throws Exception if the command dispatcher could not be created
	 */
	public DistributedStateManager(DatabaseCluster<Z, D> cluster, CommandDispatcherFactory dispatcherFactory) throws Exception
	{
		this.cluster = cluster;
		this.stateManager = cluster.getStateManager();
		// Typed local so that the context type parameter of the dispatcher resolves to StateCommandContext<Z, D>
		StateCommandContext<Z, D> context = this;
		this.dispatcher = dispatcherFactory.createCommandDispatcher(cluster.getId() + ".state", context, this, this);
	}

	/**
	 * Returns the active databases as known to the local state manager.
	 *
	 * @return the identifiers reported by the wrapped state manager
	 * @see net.sf.hajdbc.state.StateManager#getActiveDatabases()
	 */
	@Override
	public Set<String> getActiveDatabases()
	{
		return this.stateManager.getActiveDatabases();
	}

	/**
	 * Replaces the active databases of the local state manager.
	 * Nothing is dispatched to other members by this method.
	 *
	 * @param databases the identifiers to hand to the wrapped state manager
	 * @see net.sf.hajdbc.state.StateManager#setActiveDatabases(java.util.Set)
	 */
	@Override
	public void setActiveDatabases(Set<String> databases)
	{
		this.stateManager.setActiveDatabases(databases);
	}

	/**
	 * Records the activation with the local state manager, then dispatches an
	 * {@link ActivationCommand} for the same event.
	 *
	 * @param event the activation event
	 * @see net.sf.hajdbc.DatabaseClusterListener#activated(net.sf.hajdbc.state.DatabaseEvent)
	 */
	@Override
	public void activated(DatabaseEvent event)
	{
		this.stateManager.activated(event);
		// NOTE(review): the local member is passed as the second argument, presumably to exclude it
		// because the local state manager was already updated above - confirm against CommandDispatcher.
		this.dispatcher.executeAll(new ActivationCommand<Z, D>(event), this.dispatcher.getLocal());
	}

	/**
	 * Records the deactivation with the local state manager, then dispatches a
	 * {@link DeactivationCommand} for the same event.
	 *
	 * @param event the deactivation event
	 * @see net.sf.hajdbc.DatabaseClusterListener#deactivated(net.sf.hajdbc.state.DatabaseEvent)
	 */
	@Override
	public void deactivated(DatabaseEvent event)
	{
		this.stateManager.deactivated(event);
		// NOTE(review): same second argument as in activated(...) - presumably the excluded member; confirm.
		this.dispatcher.executeAll(new DeactivationCommand<Z, D>(event), this.dispatcher.getLocal());
	}

	/**
	 * Dispatches a {@link PostInvocationCommand} describing the event and the local member.
	 *
	 * @param event the completed invocation
	 * @see net.sf.hajdbc.durability.DurabilityListener#afterInvocation(net.sf.hajdbc.durability.InvocationEvent)
	 */
	@Override
	public void afterInvocation(InvocationEvent event)
	{
		this.dispatcher.executeAll(new PostInvocationCommand<Z, D>(this.getRemoteDescriptor(event)));
	}

	/**
	 * Dispatches an {@link InvokerCommand} describing the event and the local member.
	 * The same command type is used by {@link #beforeInvoker(InvokerEvent)}.
	 *
	 * @param event the completed invoker event
	 * @see net.sf.hajdbc.durability.DurabilityListener#afterInvoker(net.sf.hajdbc.durability.InvokerEvent)
	 */
	@Override
	public void afterInvoker(InvokerEvent event)
	{
		this.dispatcher.executeAll(new InvokerCommand<Z, D>(this.getRemoteDescriptor(event)));
	}

	/**
	 * Dispatches a {@link PreInvocationCommand} describing the event and the local member.
	 *
	 * @param event the invocation about to be performed
	 * @see net.sf.hajdbc.durability.DurabilityListener#beforeInvocation(net.sf.hajdbc.durability.InvocationEvent)
	 */
	@Override
	public void beforeInvocation(InvocationEvent event)
	{
		this.dispatcher.executeAll(new PreInvocationCommand<Z, D>(this.getRemoteDescriptor(event)));
	}

	/**
	 * Dispatches an {@link InvokerCommand} describing the event and the local member.
	 * The same command type is used by {@link #afterInvoker(InvokerEvent)}.
	 *
	 * @param event the invoker event about to be performed
	 * @see net.sf.hajdbc.durability.DurabilityListener#beforeInvoker(net.sf.hajdbc.durability.InvokerEvent)
	 */
	@Override
	public void beforeInvoker(InvokerEvent event)
	{
		this.dispatcher.executeAll(new InvokerCommand<Z, D>(this.getRemoteDescriptor(event)));
	}

	/**
	 * Pairs an invocation event with the local member.
	 */
	private RemoteInvocationDescriptor getRemoteDescriptor(InvocationEvent event)
	{
		return new RemoteInvocationDescriptorImpl(event, this.dispatcher.getLocal());
	}

	/**
	 * Pairs an invoker event with the local member.
	 */
	private RemoteInvokerDescriptor getRemoteDescriptor(InvokerEvent event)
	{
		return new RemoteInvokerDescriptorImpl(event, this.dispatcher.getLocal());
	}

	/**
	 * Indicates whether the local member is the coordinator, as reported by the dispatcher.
	 */
	private boolean isCoordinator()
	{
		return this.dispatcher.getLocal().equals(this.dispatcher.getCoordinator());
	}

	/**
	 * Starts the local state manager, then the command dispatcher.
	 *
	 * @throws Exception if either component fails to start
	 * @see net.sf.hajdbc.Lifecycle#start()
	 */
	@Override
	public void start() throws Exception
	{
		this.stateManager.start();
		this.dispatcher.start();
	}

	/**
	 * Stops the command dispatcher, then the local state manager.
	 * The local state manager is stopped even if stopping the dispatcher throws,
	 * so that a failing dispatcher cannot leave it running.
	 *
	 * @see net.sf.hajdbc.Lifecycle#stop()
	 */
	@Override
	public void stop()
	{
		try
		{
			this.dispatcher.stop();
		}
		finally
		{
			this.stateManager.stop();
		}
	}

	/**
	 * Indicates whether the local state manager is enabled and the local member
	 * is currently the coordinator.
	 *
	 * @return true only if both conditions hold
	 */
	@Override
	public boolean isEnabled()
	{
		return this.stateManager.isEnabled() && this.isCoordinator();
	}

	/**
	 * Returns the database cluster supplied at construction.
	 *
	 * @return the database cluster
	 * @see net.sf.hajdbc.state.distributed.StateCommandContext#getDatabaseCluster()
	 */
	@Override
	public DatabaseCluster<Z, D> getDatabaseCluster()
	{
		return this.cluster;
	}

	/**
	 * Returns the wrapped, local state manager.
	 *
	 * @return the state manager obtained from the cluster at construction
	 * @see net.sf.hajdbc.state.distributed.StateCommandContext#getLocalStateManager()
	 */
	@Override
	public StateManager getLocalStateManager()
	{
		return this.stateManager;
	}

	/**
	 * Returns the invoker events tracked for the member of the specified remote descriptor.
	 *
	 * @param remote identifies the member
	 * @return the map registered for that member, or null if there is no entry for it
	 * @see net.sf.hajdbc.state.distributed.StateCommandContext#getRemoteInvokers(net.sf.hajdbc.distributed.Remote)
	 */
	@Override
	public Map<InvocationEvent, Map<String, InvokerEvent>> getRemoteInvokers(Remote remote)
	{
		return this.remoteInvokerMap.get(remote.getMember());
	}

	/**
	 * Reads the set of active databases in the format produced by
	 * {@link #writeState(ObjectOutput)}: a count followed by that many UTF strings.
	 * The resulting set is logged and handed to the local state manager.
	 * Nothing is read or changed if the input reports no available bytes.
	 *
	 * @param input the state source
	 * @throws IOException if the state could not be read
	 * @see net.sf.hajdbc.distributed.Stateful#readState(java.io.ObjectInput)
	 */
	@Override
	public void readState(ObjectInput input) throws IOException
	{
		if (input.available() > 0)
		{
			Set<String> databases = new TreeSet<String>();

			int size = input.readInt();

			for (int i = 0; i < size; ++i)
			{
				databases.add(input.readUTF());
			}

			this.logger.log(Level.INFO, Messages.INITIAL_CLUSTER_STATE_REMOTE.getMessage(databases, this.dispatcher.getCoordinator()));

			this.stateManager.setActiveDatabases(databases);
		}
	}

	/**
	 * Writes the identifiers of the databases in the cluster's balancer: a count
	 * followed by that many UTF strings.
	 *
	 * @param output the state sink
	 * @throws IOException if the state could not be written
	 * @see net.sf.hajdbc.distributed.Stateful#writeState(java.io.ObjectOutput)
	 */
	@Override
	public void writeState(ObjectOutput output) throws IOException
	{
		// Copy first: the count written below must equal the number of identifiers that follow,
		// which sizing and then iterating the live set cannot guarantee if the set changes in between.
		List<D> databases = new ArrayList<D>(this.cluster.getBalancer());

		output.writeInt(databases.size());

		for (D database: databases)
		{
			output.writeUTF(database.getId());
		}
	}

	/**
	 * Registers an empty invoker map for the new member, unless one already exists.
	 *
	 * @param member the member that joined
	 * @see net.sf.hajdbc.distributed.MembershipListener#added(net.sf.hajdbc.distributed.Member)
	 */
	@Override
	public void added(Member member)
	{
		this.remoteInvokerMap.putIfAbsent(member, new HashMap<InvocationEvent, Map<String, InvokerEvent>>());
	}

	/**
	 * If the local member is the coordinator, discards the invoker map of the
	 * removed member and passes it to the cluster's durability for recovery.
	 * Members that are not the coordinator leave their map untouched.
	 *
	 * @param member the member that left
	 * @see net.sf.hajdbc.distributed.MembershipListener#removed(net.sf.hajdbc.distributed.Member)
	 */
	@Override
	public void removed(Member member)
	{
		if (this.isCoordinator())
		{
			Map<InvocationEvent, Map<String, InvokerEvent>> invokers = this.remoteInvokerMap.remove(member);

			if (invokers != null)
			{
				this.cluster.getDurability().recover(invokers);
			}
		}
	}

	/**
	 * Delegates recovery to the local state manager.
	 *
	 * @return the result of the wrapped state manager's recover()
	 * @see net.sf.hajdbc.state.StateManager#recover()
	 */
	@Override
	public Map<InvocationEvent, Map<String, InvokerEvent>> recover()
	{
		return this.stateManager.recover();
	}

	/**
	 * Serializable holder of the member from which a descriptor originates.
	 */
	private static class RemoteDescriptor implements Remote, Serializable
	{
		private static final long serialVersionUID = 3717630867671175936L;

		private final Member member;

		RemoteDescriptor(Member member)
		{
			this.member = member;
		}

		@Override
		public Member getMember()
		{
			return this.member;
		}
	}

	/**
	 * Pairs an invocation event with its originating member.
	 */
	private static class RemoteInvocationDescriptorImpl extends RemoteDescriptor implements RemoteInvocationDescriptor
	{
		private static final long serialVersionUID = 7782082258670023082L;

		private final InvocationEvent event;

		RemoteInvocationDescriptorImpl(InvocationEvent event, Member member)
		{
			super(member);

			this.event = event;
		}

		@Override
		public InvocationEvent getEvent()
		{
			return this.event;
		}

		/**
		 * Returns the string form of the wrapped event.
		 *
		 * @see java.lang.Object#toString()
		 */
		@Override
		public String toString()
		{
			return this.event.toString();
		}
	}

	/**
	 * Pairs an invoker event with its originating member.
	 */
	private static class RemoteInvokerDescriptorImpl extends RemoteDescriptor implements RemoteInvokerDescriptor
	{
		private static final long serialVersionUID = 6991831573393882786L;

		private final InvokerEvent event;

		RemoteInvokerDescriptorImpl(InvokerEvent event, Member member)
		{
			super(member);

			this.event = event;
		}

		@Override
		public InvokerEvent getEvent()
		{
			return this.event;
		}

		/**
		 * Returns the string form of the wrapped event.
		 *
		 * @see java.lang.Object#toString()
		 */
		@Override
		public String toString()
		{
			return this.event.toString();
		}
	}
}