001/*
002 * HA-JDBC: High-Availability JDBC
003 * Copyright (C) 2012  Paul Ferraro
004 *
005 * This program is free software: you can redistribute it and/or modify
006 * it under the terms of the GNU Lesser General Public License as published by
007 * the Free Software Foundation, either version 3 of the License, or
008 * (at your option) any later version.
009 *
010 * This program is distributed in the hope that it will be useful,
011 * but WITHOUT ANY WARRANTY; without even the implied warranty of
012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
013 * GNU Lesser General Public License for more details.
014 *
015 * You should have received a copy of the GNU Lesser General Public License
016 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
017 */
018package net.sf.hajdbc.state.distributed;
019
020import java.io.IOException;
021import java.io.ObjectInput;
022import java.io.ObjectOutput;
023import java.io.Serializable;
024import java.util.HashMap;
025import java.util.Map;
026import java.util.Set;
027import java.util.TreeSet;
028import java.util.concurrent.ConcurrentHashMap;
029import java.util.concurrent.ConcurrentMap;
030
031import net.sf.hajdbc.Database;
032import net.sf.hajdbc.DatabaseCluster;
033import net.sf.hajdbc.Messages;
034import net.sf.hajdbc.distributed.CommandDispatcher;
035import net.sf.hajdbc.distributed.CommandDispatcherFactory;
036import net.sf.hajdbc.distributed.Member;
037import net.sf.hajdbc.distributed.MembershipListener;
038import net.sf.hajdbc.distributed.Remote;
039import net.sf.hajdbc.distributed.Stateful;
040import net.sf.hajdbc.durability.InvocationEvent;
041import net.sf.hajdbc.durability.InvokerEvent;
042import net.sf.hajdbc.logging.Level;
043import net.sf.hajdbc.logging.Logger;
044import net.sf.hajdbc.logging.LoggerFactory;
045import net.sf.hajdbc.state.DatabaseEvent;
046import net.sf.hajdbc.state.StateManager;
047
048/**
049 * @author Paul Ferraro
050 */
051public class DistributedStateManager<Z, D extends Database<Z>> implements StateManager, StateCommandContext<Z, D>, MembershipListener, Stateful
052{
053        private final Logger logger = LoggerFactory.getLogger(this.getClass());
054        private final DatabaseCluster<Z, D> cluster;
055        private final StateManager stateManager;
056        private final CommandDispatcher<StateCommandContext<Z, D>> dispatcher;
057        private final ConcurrentMap<Member, Map<InvocationEvent, Map<String, InvokerEvent>>> remoteInvokerMap = new ConcurrentHashMap<Member, Map<InvocationEvent, Map<String, InvokerEvent>>>();
058        
059        public DistributedStateManager(DatabaseCluster<Z, D> cluster, CommandDispatcherFactory dispatcherFactory) throws Exception
060        {
061                this.cluster = cluster;
062                this.stateManager = cluster.getStateManager();
063                StateCommandContext<Z, D> context = this;
064                this.dispatcher = dispatcherFactory.createCommandDispatcher(cluster.getId() + ".state", context, this, this);
065        }
066
067        /**
068         * {@inheritDoc}
069         * @see net.sf.hajdbc.state.StateManager#getActiveDatabases()
070         */
071        @Override
072        public Set<String> getActiveDatabases()
073        {
074                return this.stateManager.getActiveDatabases();
075        }
076
077        /**
078         * {@inheritDoc}
079         * @see net.sf.hajdbc.state.StateManager#setActiveDatabases(java.util.Set)
080         */
081        @Override
082        public void setActiveDatabases(Set<String> databases)
083        {
084                this.stateManager.setActiveDatabases(databases);
085        }
086
087        /**
088         * {@inheritDoc}
089         * @see net.sf.hajdbc.DatabaseClusterListener#activated(net.sf.hajdbc.state.DatabaseEvent)
090         */
091        @Override
092        public void activated(DatabaseEvent event)
093        {
094                this.stateManager.activated(event);
095                this.dispatcher.executeAll(new ActivationCommand<Z, D>(event), this.dispatcher.getLocal());
096        }
097
098        /**
099         * {@inheritDoc}
100         * @see net.sf.hajdbc.DatabaseClusterListener#deactivated(net.sf.hajdbc.state.DatabaseEvent)
101         */
102        @Override
103        public void deactivated(DatabaseEvent event)
104        {
105                this.stateManager.deactivated(event);
106                this.dispatcher.executeAll(new DeactivationCommand<Z, D>(event), this.dispatcher.getLocal());
107        }
108
109        /**
110         * {@inheritDoc}
111         * @see net.sf.hajdbc.durability.DurabilityListener#afterInvocation(net.sf.hajdbc.durability.InvocationEvent)
112         */
113        @Override
114        public void afterInvocation(InvocationEvent event)
115        {
116                this.dispatcher.executeAll(new PostInvocationCommand<Z, D>(this.getRemoteDescriptor(event)));
117        }
118
119        /**
120         * {@inheritDoc}
121         * @see net.sf.hajdbc.durability.DurabilityListener#afterInvoker(net.sf.hajdbc.durability.InvokerEvent)
122         */
123        @Override
124        public void afterInvoker(InvokerEvent event)
125        {
126                this.dispatcher.executeAll(new InvokerCommand<Z, D>(this.getRemoteDescriptor(event)));
127        }
128
129        /**
130         * {@inheritDoc}
131         * @see net.sf.hajdbc.durability.DurabilityListener#beforeInvocation(net.sf.hajdbc.durability.InvocationEvent)
132         */
133        @Override
134        public void beforeInvocation(InvocationEvent event)
135        {
136                this.dispatcher.executeAll(new PreInvocationCommand<Z, D>(this.getRemoteDescriptor(event)));
137        }
138
139        /**
140         * {@inheritDoc}
141         * @see net.sf.hajdbc.durability.DurabilityListener#beforeInvoker(net.sf.hajdbc.durability.InvokerEvent)
142         */
143        @Override
144        public void beforeInvoker(InvokerEvent event)
145        {
146                this.dispatcher.executeAll(new InvokerCommand<Z, D>(this.getRemoteDescriptor(event)));
147        }
148
149        private RemoteInvocationDescriptor getRemoteDescriptor(InvocationEvent event)
150        {
151                return new RemoteInvocationDescriptorImpl(event, this.dispatcher.getLocal());
152        }
153        
154        private RemoteInvokerDescriptor getRemoteDescriptor(InvokerEvent event)
155        {
156                return new RemoteInvokerDescriptorImpl(event, this.dispatcher.getLocal());
157        }
158        
159        /**
160         * {@inheritDoc}
161         * @see net.sf.hajdbc.Lifecycle#start()
162         */
163        @Override
164        public void start() throws Exception
165        {
166                this.stateManager.start();
167                this.dispatcher.start();
168        }
169
170        /**
171         * {@inheritDoc}
172         * @see net.sf.hajdbc.Lifecycle#stop()
173         */
174        @Override
175        public void stop()
176        {
177                this.dispatcher.stop();
178                this.stateManager.stop();
179        }
180
181        @Override
182        public boolean isEnabled()
183        {
184                return this.stateManager.isEnabled() && this.dispatcher.getLocal().equals(this.dispatcher.getCoordinator());
185        }
186
187        /**
188         * {@inheritDoc}
189         * @see net.sf.hajdbc.state.distributed.StateCommandContext#getDatabaseCluster()
190         */
191        @Override
192        public DatabaseCluster<Z, D> getDatabaseCluster()
193        {
194                return this.cluster;
195        }
196
197        /**
198         * {@inheritDoc}
199         * @see net.sf.hajdbc.state.distributed.StateCommandContext#getLocalStateManager()
200         */
201        @Override
202        public StateManager getLocalStateManager()
203        {
204                return this.stateManager;
205        }
206
207        /**
208         * {@inheritDoc}
209         * @see net.sf.hajdbc.state.distributed.StateCommandContext#getRemoteInvokers(net.sf.hajdbc.distributed.Remote)
210         */
211        @Override
212        public Map<InvocationEvent, Map<String, InvokerEvent>> getRemoteInvokers(Remote remote)
213        {
214                return this.remoteInvokerMap.get(remote.getMember());
215        }
216
217        /**
218         * {@inheritDoc}
219         * @see net.sf.hajdbc.distributed.Stateful#readState(java.io.ObjectInput)
220         */
221        @Override
222        public void readState(ObjectInput input) throws IOException
223        {
224                if (input.available() > 0)
225                {
226                        Set<String> databases = new TreeSet<String>();
227                        
228                        int size = input.readInt();
229                        
230                        for (int i = 0; i < size; ++i)
231                        {
232                                databases.add(input.readUTF());
233                        }
234                        
235                        this.logger.log(Level.INFO, Messages.INITIAL_CLUSTER_STATE_REMOTE.getMessage(databases, this.dispatcher.getCoordinator()));
236                        
237                        this.stateManager.setActiveDatabases(databases);
238                }
239        }
240
241        /**
242         * {@inheritDoc}
243         * @see net.sf.hajdbc.distributed.Stateful#writeState(java.io.ObjectOutput)
244         */
245        @Override
246        public void writeState(ObjectOutput output) throws IOException
247        {
248                Set<D> databases = this.cluster.getBalancer();
249                output.writeInt(databases.size());
250                
251                for (D database: databases)
252                {
253                        output.writeUTF(database.getId());
254                }
255        }
256
257        /**
258         * {@inheritDoc}
259         * @see net.sf.hajdbc.distributed.MembershipListener#added(net.sf.hajdbc.distributed.Member)
260         */
261        @Override
262        public void added(Member member)
263        {
264                this.remoteInvokerMap.putIfAbsent(member, new HashMap<InvocationEvent, Map<String, InvokerEvent>>());
265        }
266
267        /**
268         * {@inheritDoc}
269         * @see net.sf.hajdbc.distributed.MembershipListener#removed(net.sf.hajdbc.distributed.Member)
270         */
271        @Override
272        public void removed(Member member)
273        {
274                if (this.dispatcher.getLocal().equals(this.dispatcher.getCoordinator()))
275                {
276                        Map<InvocationEvent, Map<String, InvokerEvent>> invokers = this.remoteInvokerMap.remove(member);
277                        
278                        if (invokers != null)
279                        {
280                                this.cluster.getDurability().recover(invokers);
281                        }
282                }
283        }
284        
285        /**
286         * {@inheritDoc}
287         * @see net.sf.hajdbc.state.StateManager#recover()
288         */
289        @Override
290        public Map<InvocationEvent, Map<String, InvokerEvent>> recover()
291        {
292                return this.stateManager.recover();
293        }
294
295        private static class RemoteDescriptor implements Remote, Serializable
296        {
297                private static final long serialVersionUID = 3717630867671175936L;
298                
299                private final Member member;
300                
301                RemoteDescriptor(Member member)
302                {
303                        this.member = member;
304                }
305
306                @Override
307                public Member getMember()
308                {
309                        return this.member;
310                }
311        }
312        
313        private static class RemoteInvocationDescriptorImpl extends RemoteDescriptor implements RemoteInvocationDescriptor
314        {
315                private static final long serialVersionUID = 7782082258670023082L;
316                
317                private final InvocationEvent event;
318                
319                RemoteInvocationDescriptorImpl(InvocationEvent event, Member member)
320                {
321                        super(member);
322                        
323                        this.event = event;
324                }
325                
326                @Override
327                public InvocationEvent getEvent()
328                {
329                        return this.event;
330                }
331
332                /**
333                 * {@inheritDoc}
334                 * @see java.lang.Object#toString()
335                 */
336                @Override
337                public String toString()
338                {
339                        return this.event.toString();
340                }
341        }
342        
343        private static class RemoteInvokerDescriptorImpl extends RemoteDescriptor implements RemoteInvokerDescriptor
344        {
345                private static final long serialVersionUID = 6991831573393882786L;
346                
347                private final InvokerEvent event;
348                
349                RemoteInvokerDescriptorImpl(InvokerEvent event, Member member)
350                {
351                        super(member);
352                        
353                        this.event = event;
354                }
355                
356                @Override
357                public InvokerEvent getEvent()
358                {
359                        return this.event;
360                }
361
362                /**
363                 * {@inheritDoc}
364                 * @see java.lang.Object#toString()
365                 */
366                @Override
367                public String toString()
368                {
369                        return this.event.toString();
370                }
371        }
372}