001/*
002 * HA-JDBC: High-Availability JDBC
003 * Copyright (C) 2012  Paul Ferraro
004 *
005 * This program is free software: you can redistribute it and/or modify
006 * it under the terms of the GNU Lesser General Public License as published by
007 * the Free Software Foundation, either version 3 of the License, or
008 * (at your option) any later version.
009 *
010 * This program is distributed in the hope that it will be useful,
011 * but WITHOUT ANY WARRANTY; without even the implied warranty of
012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
013 * GNU Lesser General Public License for more details.
014 *
015 * You should have received a copy of the GNU Lesser General Public License
016 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
017 */
018package net.sf.hajdbc.distributed.jgroups;
019
020import java.io.InputStream;
021import java.io.ObjectInputStream;
022import java.io.ObjectOutput;
023import java.io.ObjectOutputStream;
024import java.io.OutputStream;
025import java.util.Collections;
026import java.util.Map;
027import java.util.TreeMap;
028import java.util.concurrent.atomic.AtomicReference;
029
030import net.sf.hajdbc.Messages;
031import net.sf.hajdbc.distributed.Command;
032import net.sf.hajdbc.distributed.CommandDispatcher;
033import net.sf.hajdbc.distributed.Member;
034import net.sf.hajdbc.distributed.MembershipListener;
035import net.sf.hajdbc.distributed.Stateful;
036import net.sf.hajdbc.logging.Level;
037import net.sf.hajdbc.logging.Logger;
038import net.sf.hajdbc.logging.LoggerFactory;
039import net.sf.hajdbc.util.Objects;
040
041import org.jgroups.Address;
042import org.jgroups.Channel;
043import org.jgroups.Message;
044import org.jgroups.MessageListener;
045import org.jgroups.View;
046import org.jgroups.blocks.MessageDispatcher;
047import org.jgroups.blocks.RequestHandler;
048import org.jgroups.blocks.RequestOptions;
049import org.jgroups.blocks.ResponseMode;
050import org.jgroups.util.Rsp;
051
052/**
053 * A JGroups-based command dispatcher.
054 * 
055 * @author Paul Ferraro
056 * @see org.jgroups.blocks.MessageDispatcher
057 * @param <C> the execution context type
058 */
059public class JGroupsCommandDispatcher<C> implements RequestHandler, CommandDispatcher<C>, org.jgroups.MembershipListener, MessageListener
060{
061        private final Logger logger = LoggerFactory.getLogger(this.getClass());
062        
063        private final String id;
064        private final long timeout;
065        private final MessageDispatcher dispatcher;
066        private final C context;
067        private final AtomicReference<View> viewReference = new AtomicReference<View>();
068        private final MembershipListener membershipListener;
069        private final Stateful stateful;
070        
071        /**
072         * Constructs a new ChannelCommandDispatcher.
073         * @param id the channel name
074         * @param channel a JGroups channel
075         * @param timeout the command timeout
076         * @param context the execution context
077         * @param stateful the state transfer handler
078         * @param membershipListener notified of membership changes
079         * @throws Exception if channel cannot be created
080         */
081        public JGroupsCommandDispatcher(String id, Channel channel, long timeout, C context, Stateful stateful, MembershipListener membershipListener) throws Exception
082        {
083                this.id = id;
084                this.context = context;
085                this.stateful = stateful;
086                this.membershipListener = membershipListener;
087                
088                this.dispatcher = new MessageDispatcher(channel, this, this, this);
089                this.timeout = timeout;
090        }
091
092        /**
093         * {@inheritDoc}
094         * @see net.sf.hajdbc.Lifecycle#start()
095         */
096        @Override
097        public void start() throws Exception
098        {
099                Channel channel = this.dispatcher.getChannel();
100                
101                channel.setDiscardOwnMessages(false);
102                
103                // Connect and fetch state
104                channel.connect(this.id, null, 0);
105        }
106
107        /**
108         * {@inheritDoc}
109         * @see net.sf.hajdbc.Lifecycle#stop()
110         */
111        @Override
112        public void stop()
113        {
114                Channel channel = this.dispatcher.getChannel();
115                
116                if (channel.isOpen())
117                {
118                        if (channel.isConnected())
119                        {
120                                channel.disconnect();
121                        }
122                        
123                        channel.close();
124                }
125        }
126
127        @Override
128        public <R> Map<Member, R> executeAll(Command<R, C> command, Member... excludedMembers)
129        {
130                Message message = new Message(null, this.getLocalAddress(), Objects.serialize(command));
131                RequestOptions options = new RequestOptions(ResponseMode.GET_ALL, this.timeout);
132
133                if ((excludedMembers != null) && (excludedMembers.length > 0))
134                {
135                        Address[] exclusions = new Address[excludedMembers.length];
136                        for (int i = 0; i < excludedMembers.length; ++i)
137                        {
138                                exclusions[i] = ((AddressMember) excludedMembers[i]).getAddress();
139                        }
140                        options.setExclusionList(exclusions);
141                }
142
143                try
144                {
145                        Map<Address, Rsp<R>> responses = this.dispatcher.castMessage(null, message, options);
146                        
147                        if (responses == null) return Collections.emptyMap();
148                        
149                        Map<Member, R> results = new TreeMap<Member, R>();
150                        
151                        for (Map.Entry<Address, Rsp<R>> entry: responses.entrySet())
152                        {
153                                Rsp<R> response = entry.getValue();
154        
155                                results.put(new AddressMember(entry.getKey()), response.wasReceived() ? response.getValue() : null);
156                        }
157                        
158                        return results;
159                }
160                catch (Exception e)
161                {
162                        return null;
163                }
164        }
165        
166        /**
167         * {@inheritDoc}
168         * @see net.sf.hajdbc.distributed.CommandDispatcher#executeCoordinator(net.sf.hajdbc.distributed.Command)
169         */
170        @Override
171        public <R> R execute(Command<R, C> command, Member member)
172        {
173                Message message = new Message(((AddressMember) member).getAddress(), this.getLocalAddress(), Objects.serialize(command));
174                
175                try
176                {
177                        return this.dispatcher.sendMessage(message, new RequestOptions(ResponseMode.GET_ALL, this.timeout));
178                }
179                catch (Exception e)
180                {
181                        this.logger.log(Level.WARN, e);
182                        return null;
183                }
184        }
185        
186        /**
187         * {@inheritDoc}
188         * @see net.sf.hajdbc.distributed.CommandDispatcher#getLocal()
189         */
190        @Override
191        public AddressMember getLocal()
192        {
193                return new AddressMember(this.getLocalAddress());
194        }
195        
196        private Address getLocalAddress()
197        {
198                return this.dispatcher.getChannel().getAddress();
199        }
200        
201        /**
202         * {@inheritDoc}
203         * @see net.sf.hajdbc.distributed.CommandDispatcher#getCoordinator()
204         */
205        @Override
206        public AddressMember getCoordinator()
207        {
208                return new AddressMember(this.getCoordinatorAddress());
209        }
210
211        private Address getCoordinatorAddress()
212        {
213                return this.dispatcher.getChannel().getView().getMembers().get(0);
214        }
215
216        /**
217         * {@inheritDoc}
218         * @see org.jgroups.blocks.RequestHandler#handle(org.jgroups.Message)
219         */
220        @Override
221        public Object handle(Message message)
222        {
223                Command<Object, C> command = Objects.deserialize(message.getRawBuffer());
224
225                this.logger.log(Level.DEBUG, Messages.COMMAND_RECEIVED.getMessage(command, message.getSrc()));
226                
227                return command.execute(this.context);
228        }
229
230        /**
231         * {@inheritDoc}
232         * @see org.jgroups.MembershipListener#viewAccepted(org.jgroups.View)
233         */
234        @Override
235        public void viewAccepted(View view)
236        {
237                if (this.membershipListener != null)
238                {
239                        View oldView = this.viewReference.getAndSet(view);
240                        
241                        for (Address address: view.getMembers())
242                        {
243                                if ((oldView == null) || !oldView.containsMember(address))
244                                {
245                                        this.membershipListener.added(new AddressMember(address));
246                                }
247                        }
248                        
249                        if (oldView != null)
250                        {
251                                for (Address address: oldView.getMembers())
252                                {
253                                        if (!view.containsMember(address))
254                                        {
255                                                this.membershipListener.removed(new AddressMember(address));
256                                        }
257                                }
258                        }
259                }
260        }
261
262        /**
263         * {@inheritDoc}
264         * @see org.jgroups.MessageListener#getState(java.io.OutputStream)
265         */
266        @Override
267        public void getState(OutputStream output) throws Exception
268        {
269                ObjectOutput out = new ObjectOutputStream(output);
270                this.stateful.writeState(out);
271                out.flush();
272        }
273
274        /**
275         * {@inheritDoc}
276         * @see org.jgroups.MessageListener#setState(java.io.InputStream)
277         */
278        @Override
279        public void setState(InputStream input) throws Exception
280        {
281                this.stateful.readState(new ObjectInputStream(input));
282        }
283
284        /**
285         * {@inheritDoc}
286         * @see org.jgroups.MembershipListener#suspect(org.jgroups.Address)
287         */
288        @Override
289        public void suspect(Address member)
290        {
291        }
292
293        /**
294         * {@inheritDoc}
295         * @see org.jgroups.MembershipListener#block()
296         */
297        @Override
298        public void block()
299        {
300        }
301
302        /**
303         * {@inheritDoc}
304         * @see org.jgroups.MembershipListener#unblock()
305         */
306        @Override
307        public void unblock()
308        {
309        }
310
311        /**
312         * {@inheritDoc}
313         * @see org.jgroups.MessageListener#receive(org.jgroups.Message)
314         */
315        @Override
316        public void receive(Message message)
317        {
318        }
319}