Tuesday, January 3, 2012

Jgroups LockService Example

import org.jgroups.*;
import org.jgroups.blocks.locking.LockService;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;

public class TheOne {

    private volatile AtomicBoolean becomeMaster = new AtomicBoolean (false);
    private JChannel channel;
    private LockService lock_service;
    private Thread acquiringThread;
    private Thread statusPrinter;

    public static void main(String[] args) throws Exception {
        Thread.currentThread().setName("MyMainThread");
        TheOne master = new TheOne();
        master.channel = new JChannel("/home/ssunel/udp.xml");
        master.channel.connect("lock-cluster");
        master.lock_service = new LockService(master.channel);
        master.startAcquiringThread();
        master.startStatusPrinterThread();
    }

    public void startAcquiringThread() {
        acquiringThread = new Thread() {
            @Override
            public void run() {
                try {
                    Thread.currentThread().setName("MyAcquiringThread");                   
                    getLock();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        };
        acquiringThread.setDaemon(true);
        acquiringThread.start();
       
    }

    private void getLock() throws Exception{
        Lock lock = lock_service.getLock("mylock");
        lock.lock();
        becomeMaster.set(true);
        System.out.println(" -- I GOT THE LOCK !!!");
    }

    private void startStatusPrinterThread() {
        statusPrinter = new Thread(){
            @Override
            public void run() {
                Thread.currentThread().setName("MyStatusPrinterThread");                   
               
                while (true) {
                    try {
                        System.out.println("becomeMaster ? -> { " + becomeMaster + " }");
                        System.out.println("thread state ? -> { " + acquiringThread.getState() + " }");
                        sleep(2000L);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        };
        statusPrinter.setDaemon(true);
        statusPrinter.start();
    }
}
Content of UDP.XML
open the jgroups.3.1-final.jar and edit the udp.xml inside it .
Add CENTRAL_LOCK tag 

Note : DistributedLockManager is Depracated in jgroups.3.1-final release .For older release example refer to http://javabender.blogspot.com/2010/10/solving-leader-election-problem-in.html


udp.xml content will be something like this :

<!--
  Default stack using IP multicasting. It is similar to the "udp"
  stack in stacks.xml, but doesn't use streaming state transfer and flushing
  author: Bela Ban
-->

<config xmlns="urn:org:jgroups"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="urn:org:jgroups http://www.jgroups.org/schema/JGroups-3.0.xsd">
    <UDP
        mcast_port="${jgroups.udp.mcast_port:45588}"
         tos="8"
         ucast_recv_buf_size="20M"
         ucast_send_buf_size="640K"
         mcast_recv_buf_size="25M"
         mcast_send_buf_size="640K"
         loopback="true"
         level="WARN"
         log_discard_msgs="false"
         discard_incompatible_packets="true"
         max_bundle_size="64K"
         max_bundle_timeout="30"
         ip_ttl="${jgroups.udp.ip_ttl:8}"
         enable_bundling="true"
         enable_diagnostics="true"
         thread_naming_pattern="cl"

         timer_type="new"
         timer.min_threads="4"
         timer.max_threads="10"
         timer.keep_alive_time="3000"
         timer.queue_max_size="500"

         thread_pool.enabled="true"
         thread_pool.min_threads="2"
         thread_pool.max_threads="8"
         thread_pool.keep_alive_time="5000"
         thread_pool.queue_enabled="true"
         thread_pool.queue_max_size="10000"
         thread_pool.rejection_policy="discard"

         oob_thread_pool.enabled="true"
         oob_thread_pool.min_threads="1"
         oob_thread_pool.max_threads="8"
         oob_thread_pool.keep_alive_time="5000"
         oob_thread_pool.queue_enabled="false"
         oob_thread_pool.queue_max_size="100"
         oob_thread_pool.rejection_policy="Run"/>

    <PING timeout="2000"
            num_initial_members="3"/>
    <MERGE2 max_interval="30000"
            min_interval="10000"/>
    <FD_SOCK/>
    <FD_ALL/>
    <VERIFY_SUSPECT timeout="1500"  />
    <BARRIER />
    <pbcast.NAKACK exponential_backoff="300"
                   xmit_stagger_timeout="200"
                   use_mcast_xmit="false"
                   discard_delivered_msgs="true"/>
    <UNICAST />
    <pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000"
                   max_bytes="4M"/>
    <pbcast.GMS print_local_addr="true" join_timeout="3000"
                view_bundling="true"/>
    <UFC max_credits="2M"
         min_threshold="0.4"/>
    <MFC max_credits="2M"
         min_threshold="0.4"/>
    <FRAG2 frag_size="60K"  />
    <pbcast.STATE_TRANSFER />
    <CENTRAL_LOCK />
    <!-- pbcast.FLUSH  /-->
</config>