001/* 002 * HA-JDBC: High-Availability JDBC 003 * Copyright (C) 2012 Paul Ferraro 004 * 005 * This program is free software: you can redistribute it and/or modify 006 * it under the terms of the GNU Lesser General Public License as published by 007 * the Free Software Foundation, either version 3 of the License, or 008 * (at your option) any later version. 009 * 010 * This program is distributed in the hope that it will be useful, 011 * but WITHOUT ANY WARRANTY; without even the implied warranty of 012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 013 * GNU Lesser General Public License for more details. 014 * 015 * You should have received a copy of the GNU Lesser General Public License 016 * along with this program. If not, see <http://www.gnu.org/licenses/>. 017 */ 018package net.sf.hajdbc.distributed.jgroups; 019 020import java.io.InputStream; 021import java.io.ObjectInputStream; 022import java.io.ObjectOutput; 023import java.io.ObjectOutputStream; 024import java.io.OutputStream; 025import java.util.Collections; 026import java.util.Map; 027import java.util.TreeMap; 028import java.util.concurrent.atomic.AtomicReference; 029 030import net.sf.hajdbc.Messages; 031import net.sf.hajdbc.distributed.Command; 032import net.sf.hajdbc.distributed.CommandDispatcher; 033import net.sf.hajdbc.distributed.Member; 034import net.sf.hajdbc.distributed.MembershipListener; 035import net.sf.hajdbc.distributed.Stateful; 036import net.sf.hajdbc.logging.Level; 037import net.sf.hajdbc.logging.Logger; 038import net.sf.hajdbc.logging.LoggerFactory; 039import net.sf.hajdbc.util.Objects; 040 041import org.jgroups.Address; 042import org.jgroups.Channel; 043import org.jgroups.Message; 044import org.jgroups.MessageListener; 045import org.jgroups.View; 046import org.jgroups.blocks.MessageDispatcher; 047import org.jgroups.blocks.RequestHandler; 048import org.jgroups.blocks.RequestOptions; 049import org.jgroups.blocks.ResponseMode; 050import org.jgroups.util.Rsp; 051 052/** 053 * A JGroups-based command dispatcher. 054 * 055 * @author Paul Ferraro 056 * @see org.jgroups.blocks.MessageDispatcher 057 * @param <C> the execution context type 058 */ 059public class JGroupsCommandDispatcher<C> implements RequestHandler, CommandDispatcher<C>, org.jgroups.MembershipListener, MessageListener 060{ 061 private final Logger logger = LoggerFactory.getLogger(this.getClass()); 062 063 private final String id; 064 private final long timeout; 065 private final MessageDispatcher dispatcher; 066 private final C context; 067 private final AtomicReference<View> viewReference = new AtomicReference<View>(); 068 private final MembershipListener membershipListener; 069 private final Stateful stateful; 070 071 /** 072 * Constructs a new ChannelCommandDispatcher. 073 * @param id the channel name 074 * @param channel a JGroups channel 075 * @param timeout the command timeout 076 * @param context the execution context 077 * @param stateful the state transfer handler 078 * @param membershipListener notified of membership changes 079 * @throws Exception if channel cannot be created 080 */ 081 public JGroupsCommandDispatcher(String id, Channel channel, long timeout, C context, Stateful stateful, MembershipListener membershipListener) throws Exception 082 { 083 this.id = id; 084 this.context = context; 085 this.stateful = stateful; 086 this.membershipListener = membershipListener; 087 088 this.dispatcher = new MessageDispatcher(channel, this, this, this); 089 this.timeout = timeout; 090 } 091 092 /** 093 * {@inheritDoc} 094 * @see net.sf.hajdbc.Lifecycle#start() 095 */ 096 @Override 097 public void start() throws Exception 098 { 099 Channel channel = this.dispatcher.getChannel(); 100 101 channel.setDiscardOwnMessages(false); 102 103 // Connect and fetch state 104 channel.connect(this.id, null, 0); 105 } 106 107 /** 108 * {@inheritDoc} 109 * @see net.sf.hajdbc.Lifecycle#stop() 110 */ 111 @Override 112 public void stop() 113 { 114 Channel channel = this.dispatcher.getChannel(); 115 116 if (channel.isOpen()) 117 { 118 if (channel.isConnected()) 119 { 120 channel.disconnect(); 121 } 122 123 channel.close(); 124 } 125 } 126 127 @Override 128 public <R> Map<Member, R> executeAll(Command<R, C> command, Member... excludedMembers) 129 { 130 Message message = new Message(null, this.getLocalAddress(), Objects.serialize(command)); 131 RequestOptions options = new RequestOptions(ResponseMode.GET_ALL, this.timeout); 132 133 if ((excludedMembers != null) && (excludedMembers.length > 0)) 134 { 135 Address[] exclusions = new Address[excludedMembers.length]; 136 for (int i = 0; i < excludedMembers.length; ++i) 137 { 138 exclusions[i] = ((AddressMember) excludedMembers[i]).getAddress(); 139 } 140 options.setExclusionList(exclusions); 141 } 142 143 try 144 { 145 Map<Address, Rsp<R>> responses = this.dispatcher.castMessage(null, message, options); 146 147 if (responses == null) return Collections.emptyMap(); 148 149 Map<Member, R> results = new TreeMap<Member, R>(); 150 151 for (Map.Entry<Address, Rsp<R>> entry: responses.entrySet()) 152 { 153 Rsp<R> response = entry.getValue(); 154 155 results.put(new AddressMember(entry.getKey()), response.wasReceived() ? response.getValue() : null); 156 } 157 158 return results; 159 } 160 catch (Exception e) 161 { 162 return null; 163 } 164 } 165 166 /** 167 * {@inheritDoc} 168 * @see net.sf.hajdbc.distributed.CommandDispatcher#executeCoordinator(net.sf.hajdbc.distributed.Command) 169 */ 170 @Override 171 public <R> R execute(Command<R, C> command, Member member) 172 { 173 Message message = new Message(((AddressMember) member).getAddress(), this.getLocalAddress(), Objects.serialize(command)); 174 175 try 176 { 177 return this.dispatcher.sendMessage(message, new RequestOptions(ResponseMode.GET_ALL, this.timeout)); 178 } 179 catch (Exception e) 180 { 181 this.logger.log(Level.WARN, e); 182 return null; 183 } 184 } 185 186 /** 187 * {@inheritDoc} 188 * @see net.sf.hajdbc.distributed.CommandDispatcher#getLocal() 189 */ 190 @Override 191 public AddressMember getLocal() 192 { 193 return new AddressMember(this.getLocalAddress()); 194 } 195 196 private Address getLocalAddress() 197 { 198 return this.dispatcher.getChannel().getAddress(); 199 } 200 201 /** 202 * {@inheritDoc} 203 * @see net.sf.hajdbc.distributed.CommandDispatcher#getCoordinator() 204 */ 205 @Override 206 public AddressMember getCoordinator() 207 { 208 return new AddressMember(this.getCoordinatorAddress()); 209 } 210 211 private Address getCoordinatorAddress() 212 { 213 return this.dispatcher.getChannel().getView().getMembers().get(0); 214 } 215 216 /** 217 * {@inheritDoc} 218 * @see org.jgroups.blocks.RequestHandler#handle(org.jgroups.Message) 219 */ 220 @Override 221 public Object handle(Message message) 222 { 223 Command<Object, C> command = Objects.deserialize(message.getRawBuffer()); 224 225 this.logger.log(Level.DEBUG, Messages.COMMAND_RECEIVED.getMessage(command, message.getSrc())); 226 227 return command.execute(this.context); 228 } 229 230 /** 231 * {@inheritDoc} 232 * @see org.jgroups.MembershipListener#viewAccepted(org.jgroups.View) 233 */ 234 @Override 235 public void viewAccepted(View view) 236 { 237 if (this.membershipListener != null) 238 { 239 View oldView = this.viewReference.getAndSet(view); 240 241 for (Address address: view.getMembers()) 242 { 243 if ((oldView == null) || !oldView.containsMember(address)) 244 { 245 this.membershipListener.added(new AddressMember(address)); 246 } 247 } 248 249 if (oldView != null) 250 { 251 for (Address address: oldView.getMembers()) 252 { 253 if (!view.containsMember(address)) 254 { 255 this.membershipListener.removed(new AddressMember(address)); 256 } 257 } 258 } 259 } 260 } 261 262 /** 263 * {@inheritDoc} 264 * @see org.jgroups.MessageListener#getState(java.io.OutputStream) 265 */ 266 @Override 267 public void getState(OutputStream output) throws Exception 268 { 269 ObjectOutput out = new ObjectOutputStream(output); 270 this.stateful.writeState(out); 271 out.flush(); 272 } 273 274 /** 275 * {@inheritDoc} 276 * @see org.jgroups.MessageListener#setState(java.io.InputStream) 277 */ 278 @Override 279 public void setState(InputStream input) throws Exception 280 { 281 this.stateful.readState(new ObjectInputStream(input)); 282 } 283 284 /** 285 * {@inheritDoc} 286 * @see org.jgroups.MembershipListener#suspect(org.jgroups.Address) 287 */ 288 @Override 289 public void suspect(Address member) 290 { 291 } 292 293 /** 294 * {@inheritDoc} 295 * @see org.jgroups.MembershipListener#block() 296 */ 297 @Override 298 public void block() 299 { 300 } 301 302 /** 303 * {@inheritDoc} 304 * @see org.jgroups.MembershipListener#unblock() 305 */ 306 @Override 307 public void unblock() 308 { 309 } 310 311 /** 312 * {@inheritDoc} 313 * @see org.jgroups.MessageListener#receive(org.jgroups.Message) 314 */ 315 @Override 316 public void receive(Message message) 317 { 318 } 319}