001/*
002 * HA-JDBC: High-Availability JDBC
003 * Copyright (C) 2012  Paul Ferraro
004 *
005 * This program is free software: you can redistribute it and/or modify
006 * it under the terms of the GNU Lesser General Public License as published by
007 * the Free Software Foundation, either version 3 of the License, or
008 * (at your option) any later version.
009 *
010 * This program is distributed in the hope that it will be useful,
011 * but WITHOUT ANY WARRANTY; without even the implied warranty of
012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
013 * GNU Lesser General Public License for more details.
014 *
015 * You should have received a copy of the GNU Lesser General Public License
016 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
017 */
018package net.sf.hajdbc.lock.distributed;
019
020import java.io.IOException;
021import java.io.ObjectInput;
022import java.io.ObjectInputStream;
023import java.io.ObjectOutput;
024import java.io.ObjectOutputStream;
025import java.util.HashMap;
026import java.util.Map;
027import java.util.Set;
028import java.util.concurrent.ConcurrentHashMap;
029import java.util.concurrent.ConcurrentMap;
030import java.util.concurrent.TimeUnit;
031import java.util.concurrent.locks.Condition;
032import java.util.concurrent.locks.Lock;
033
034import net.sf.hajdbc.Database;
035import net.sf.hajdbc.DatabaseCluster;
036import net.sf.hajdbc.distributed.CommandDispatcher;
037import net.sf.hajdbc.distributed.CommandDispatcherFactory;
038import net.sf.hajdbc.distributed.Member;
039import net.sf.hajdbc.distributed.MembershipListener;
040import net.sf.hajdbc.distributed.Remote;
041import net.sf.hajdbc.distributed.Stateful;
042import net.sf.hajdbc.lock.LockManager;
043import net.sf.hajdbc.util.Objects;
044
045/**
046 * @author Paul Ferraro
047 */
048public class DistributedLockManager implements LockManager, LockCommandContext, Stateful, MembershipListener
049{
050        final CommandDispatcher<LockCommandContext> dispatcher;
051        
052        private final LockManager lockManager;
053        private final ConcurrentMap<Member, Map<LockDescriptor, Lock>> remoteLockDescriptorMap = new ConcurrentHashMap<Member, Map<LockDescriptor, Lock>>();
054        
055        public <Z, D extends Database<Z>> DistributedLockManager(DatabaseCluster<Z, D> cluster, CommandDispatcherFactory dispatcherFactory) throws Exception
056        {
057                this.lockManager = cluster.getLockManager();
058                LockCommandContext context = this;
059                this.dispatcher = dispatcherFactory.createCommandDispatcher(cluster.getId() + ".lock", context, this, this);
060        }
061        
062        /**
063         * {@inheritDoc}
064         * @see net.sf.hajdbc.lock.LockManager#readLock(java.lang.String)
065         */
066        @Override
067        public Lock readLock(String id)
068        {
069                return this.lockManager.readLock(id);
070        }
071
072        /**
073         * {@inheritDoc}
074         * @see net.sf.hajdbc.lock.LockManager#writeLock(java.lang.String)
075         */
076        @Override
077        public Lock writeLock(String id)
078        {
079                return this.getDistibutedLock(new RemoteLockDescriptorImpl(id, LockType.WRITE, this.dispatcher.getLocal()));
080        }
081
082        /**
083         * {@inheritDoc}
084         * @see net.sf.hajdbc.lock.distributed.LockCommandContext#getLock(net.sf.hajdbc.lock.distributed.LockDescriptor)
085         */
086        @Override
087        public Lock getLock(LockDescriptor lock)
088        {
089                String id = lock.getId();
090                
091                switch (lock.getType())
092                {
093                        case READ:
094                        {
095                                return this.lockManager.readLock(id);
096                        }
097                        case WRITE:
098                        {
099                                return this.lockManager.writeLock(id);
100                        }
101                        default:
102                        {
103                                throw new IllegalStateException();
104                        }
105                }
106        }
107
108        /**
109         * {@inheritDoc}
110         * @see net.sf.hajdbc.lock.distributed.LockCommandContext#getDistibutedLock(net.sf.hajdbc.lock.distributed.RemoteLockDescriptor)
111         */
112        @Override
113        public Lock getDistibutedLock(RemoteLockDescriptor descriptor)
114        {
115                return new DistributedLock(descriptor, this.getLock(descriptor), this.dispatcher);
116        }
117
118        /**
119         * {@inheritDoc}
120         * @see net.sf.hajdbc.Lifecycle#start()
121         */
122        @Override
123        public void start() throws Exception
124        {
125                this.lockManager.start();
126                this.dispatcher.start();
127        }
128
129        /**
130         * {@inheritDoc}
131         * @see net.sf.hajdbc.Lifecycle#stop()
132         */
133        @Override
134        public void stop()
135        {
136                this.dispatcher.stop();
137                this.lockManager.stop();
138        }
139
140        /**
141         * {@inheritDoc}
142         * @see net.sf.hajdbc.lock.distributed.LockCommandContext#getRemoteLocks(net.sf.hajdbc.distributed.Remote)
143         */
144        @Override
145        public Map<LockDescriptor, Lock> getRemoteLocks(Remote remote)
146        {
147                return this.remoteLockDescriptorMap.get(remote.getMember());
148        }
149
150        /**
151         * {@inheritDoc}
152         * @see net.sf.hajdbc.distributed.Stateful#writeState(java.io.ObjectOutput)
153         */
154        @Override
155        public void writeState(ObjectOutput output) throws IOException
156        {
157                output.writeInt(this.remoteLockDescriptorMap.size());
158                
159                for (Map.Entry<Member, Map<LockDescriptor, Lock>> entry: this.remoteLockDescriptorMap.entrySet())
160                {
161                        output.writeObject(entry.getKey());
162                        
163                        Set<LockDescriptor> descriptors = entry.getValue().keySet();
164                        
165                        output.writeInt(descriptors.size());
166                        
167                        for (LockDescriptor descriptor: descriptors)
168                        {
169                                output.writeUTF(descriptor.getId());
170                                output.writeByte(descriptor.getType().ordinal());
171                        }
172                }
173        }
174
175        /**
176         * {@inheritDoc}
177         * @see net.sf.hajdbc.distributed.Stateful#readState(java.io.ObjectInput)
178         */
179        @Override
180        public void readState(ObjectInput input) throws IOException
181        {
182                // Is this valid?  or should we unlock/clear?
183                //assert this.remoteLockDescriptorMap.isEmpty();
184                
185                int size = input.readInt();
186                
187                LockType[] types = LockType.values();
188                
189                for (int i = 0; i < size; ++i)
190                {
191                        Member member = Objects.readObject(input);
192                        
193                        Map<LockDescriptor, Lock> map = new HashMap<LockDescriptor, Lock>();
194                        
195                        int locks = input.readInt();
196                        
197                        for (int j = 0; j < locks; ++j)
198                        {
199                                String id = input.readUTF();
200                                LockType type = types[input.readByte()];
201                                
202                                LockDescriptor descriptor = new RemoteLockDescriptorImpl(id, type, member);
203                                
204                                Lock lock = this.getLock(descriptor);
205                                
206                                lock.lock();
207                                
208                                map.put(descriptor, lock);
209                        }
210                        
211                        this.remoteLockDescriptorMap.put(member, map);
212                }
213        }
214
215        /**
216         * {@inheritDoc}
217         * @see net.sf.hajdbc.distributed.MembershipListener#added(net.sf.hajdbc.distributed.Member)
218         */
219        @Override
220        public void added(Member member)
221        {
222                this.remoteLockDescriptorMap.putIfAbsent(member, new HashMap<LockDescriptor, Lock>());
223        }
224
225        /**
226         * {@inheritDoc}
227         * @see net.sf.hajdbc.distributed.MembershipListener#removed(net.sf.hajdbc.distributed.Member)
228         */
229        @Override
230        public void removed(Member member)
231        {
232                Map<LockDescriptor, Lock> locks = this.remoteLockDescriptorMap.remove(member);
233                
234                if (locks != null)
235                {
236                        for (Lock lock: locks.values())
237                        {
238                                lock.unlock();
239                        }
240                }
241        }
242        
243        private static class DistributedLock implements Lock
244        {
245                private final RemoteLockDescriptor descriptor;
246                private final Lock lock;
247                private final CommandDispatcher<LockCommandContext> dispatcher;
248                
249                DistributedLock(RemoteLockDescriptor descriptor, Lock lock, CommandDispatcher<LockCommandContext> dispatcher)
250                {
251                        this.descriptor = descriptor;
252                        this.lock = lock;
253                        this.dispatcher = dispatcher;
254                }
255                
256                @Override
257                public void lock()
258                {
259                        boolean locked = false;
260                        
261                        while (!locked)
262                        {
263                                Member coordinator = this.dispatcher.getCoordinator();
264                                
265                                if (this.dispatcher.getLocal().equals(coordinator))
266                                {
267                                        this.lock.lock();
268                                        
269                                        try
270                                        {
271                                                locked = this.lockMembers(coordinator);
272                                        }
273                                        finally
274                                        {
275                                                if (!locked)
276                                                {
277                                                        this.lock.unlock();
278                                                }
279                                        }
280                                }
281                                else
282                                {
283                                        locked = this.lockCoordinator(coordinator, Long.MAX_VALUE);
284                                }
285                                
286                                if (!locked)
287                                {
288                                        Thread.yield();
289                                }
290                        }
291                }
292
293                @Override
294                public void lockInterruptibly() throws InterruptedException
295                {
296                        boolean locked = false;
297                        
298                        while (!locked)
299                        {
300                                Member coordinator = this.dispatcher.getCoordinator();
301                                
302                                if (this.dispatcher.getLocal().equals(coordinator))
303                                {
304                                        this.lock.lockInterruptibly();
305                                        
306                                        try
307                                        {
308                                                locked = this.lockMembers(coordinator);
309                                        }
310                                        finally
311                                        {
312                                                if (!locked)
313                                                {
314                                                        this.lock.unlock();
315                                                }
316                                        }
317                                }
318                                else
319                                {
320                                        locked = this.lockCoordinator(coordinator, Long.MAX_VALUE);
321                                }
322                                
323                                if (Thread.currentThread().isInterrupted())
324                                {
325                                        throw new InterruptedException();
326                                }
327                                
328                                if (!locked)
329                                {
330                                        Thread.yield();
331                                }
332                        }
333                }
334
335                @Override
336                public boolean tryLock()
337                {
338                        boolean locked = false;
339                        
340                        Member coordinator = this.dispatcher.getCoordinator();
341                        
342                        if (this.dispatcher.getLocal().equals(coordinator))
343                        {
344                                if (this.lock.tryLock())
345                                {
346                                        try
347                                        {
348                                                locked = this.lockMembers(coordinator);
349                                        }
350                                        finally
351                                        {
352                                                if (!locked)
353                                                {
354                                                        this.lock.unlock();
355                                                }
356                                        }
357                                }
358                        }
359                        else
360                        {
361                                locked = this.lockCoordinator(coordinator, 0);
362                        }
363                        
364                        return locked;
365                }
366
367                @Override
368                public boolean tryLock(long time, TimeUnit unit) throws InterruptedException
369                {
370                        boolean locked = false;
371                        
372                        Member coordinator = this.dispatcher.getCoordinator();
373                        
374                        if (this.dispatcher.getLocal().equals(coordinator))
375                        {
376                                if (this.lock.tryLock(time, unit))
377                                {
378                                        try
379                                        {
380                                                locked = this.lockMembers(coordinator);
381                                        }
382                                        finally
383                                        {
384                                                if (!locked)
385                                                {
386                                                        this.lock.unlock();
387                                                }
388                                        }
389                                }
390                        }
391                        else
392                        {
393                                locked = this.lockCoordinator(coordinator, unit.toMillis(time));
394                        }
395                        
396                        return locked;
397                }
398
399                private boolean lockMembers(Member coordinator)
400                {
401                        boolean locked = true;
402                        
403                        Map<Member, Boolean> results = this.dispatcher.executeAll(new MemberAcquireLockCommand(this.descriptor), coordinator);
404                        
405                        for (Map.Entry<Member, Boolean> entry: results.entrySet())
406                        {
407                                locked &= entry.getValue();
408                        }
409                        
410                        if (!locked)
411                        {
412                                this.unlockMembers(coordinator);
413                        }
414                        
415                        return locked;
416                }
417                
418                private boolean lockCoordinator(Member coordinator, long timeout)
419                {
420                        Boolean result = this.dispatcher.execute(new CoordinatorAcquireLockCommand(this.descriptor, timeout), coordinator);
421                        return (result != null) ? result.booleanValue() : false;
422                }
423                
424                @Override
425                public void unlock()
426                {
427                        Member coordinator = this.dispatcher.getCoordinator();
428                        
429                        if (this.dispatcher.getLocal().equals(coordinator))
430                        {
431                                this.unlockMembers(coordinator);
432                                
433                                this.lock.unlock();
434                        }
435                        else
436                        {
437                                this.unlockCoordinator(coordinator);
438                        }
439                }
440                
441                private void unlockMembers(Member coordinator)
442                {
443                        this.dispatcher.executeAll(new MemberReleaseLockCommand(this.descriptor), coordinator);
444                }
445                
446                private void unlockCoordinator(Member coordinator)
447                {
448                        this.dispatcher.execute(new CoordinatorReleaseLockCommand(this.descriptor), coordinator);
449                }
450
451                @Override
452                public Condition newCondition()
453                {
454                        throw new UnsupportedOperationException();
455                }
456        }
457        
458        private static class RemoteLockDescriptorImpl implements RemoteLockDescriptor
459        {
460                private static final long serialVersionUID = 1950781245453120790L;
461                
462                private final String id;
463                private transient LockType type;
464                private final Member member;
465                
466                RemoteLockDescriptorImpl(String id, LockType type, Member member)
467                {
468                        this.id = id;
469                        this.type = type;
470                        this.member = member;
471                }
472                
473                @Override
474                public String getId()
475                {
476                        return this.id;
477                }
478
479                @Override
480                public LockType getType()
481                {
482                        return this.type;
483                }
484
485                @Override
486                public Member getMember()
487                {
488                        return this.member;
489                }
490
491                private void writeObject(ObjectOutputStream out) throws IOException
492                {
493                        out.defaultWriteObject();
494                        
495                        out.writeByte(this.type.ordinal());
496                }
497                
498                private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException
499                {
500                        in.defaultReadObject();
501                        
502                        this.type = LockType.values()[in.readByte()];
503                }
504                
505                /**
506                 * {@inheritDoc}
507                 * @see java.lang.Object#equals(java.lang.Object)
508                 */
509                @Override
510                public boolean equals(Object object)
511                {
512                        if ((object == null) || !(object instanceof RemoteLockDescriptor)) return false;
513                        
514                        String id = ((RemoteLockDescriptor) object).getId();
515                        
516                        return ((this.id != null) && (id != null)) ?  this.id.equals(id) : (this.id == id);
517                }
518
519                /**
520                 * {@inheritDoc}
521                 * @see java.lang.Object#hashCode()
522                 */
523                @Override
524                public int hashCode()
525                {
526                        return this.id != null ? this.id.hashCode() : 0;
527                }
528
529                /**
530                 * {@inheritDoc}
531                 * @see java.lang.Object#toString()
532                 */
533                @Override
534                public String toString()
535                {
536                        return String.format("%sLock(%s)", this.type.name().toLowerCase(), (this.id != null) ? this.id : "");
537                }
538        }
539}