Showing posts with label Mastership Claim with jgroups. Show all posts

Tuesday, January 3, 2012

Jgroups LockService Example

import org.jgroups.*;
import org.jgroups.blocks.locking.LockService;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;

/**
 * Demonstrates distributed leader election ("mastership claim") with the
 * JGroups {@link LockService}: every node in the cluster tries to acquire the
 * same cluster-wide lock, and the single node that succeeds becomes master.
 */
public class TheOne {

    // Flipped to true once the distributed lock has been won. An
    // AtomicBoolean already guarantees visibility of its value, so the
    // field only needs to be final (the original 'volatile' modifier on
    // the reference was redundant).
    private final AtomicBoolean becomeMaster = new AtomicBoolean(false);
    private JChannel channel;
    private LockService lock_service;
    private Thread acquiringThread;
    private Thread statusPrinter;

    /**
     * Joins the "lock-cluster" group and starts the two worker threads.
     * The JGroups stack configuration defaults to /home/ssunel/udp.xml but
     * may be overridden by passing a path as the first program argument.
     *
     * NOTE(review): both worker threads are daemons, so JVM liveness after
     * main() returns depends on JGroups keeping non-daemon threads alive —
     * confirm for the JGroups version in use.
     *
     * @param args optional: args[0] = path to the JGroups XML configuration
     * @throws Exception if the channel cannot be created or connected
     */
    public static void main(String[] args) throws Exception {
        Thread.currentThread().setName("MyMainThread");
        // Allow the stack configuration to be supplied on the command line;
        // fall back to the original hard-coded path for compatibility.
        String config = args.length > 0 ? args[0] : "/home/ssunel/udp.xml";
        TheOne master = new TheOne();
        master.channel = new JChannel(config);
        master.channel.connect("lock-cluster");
        master.lock_service = new LockService(master.channel);
        master.startAcquiringThread();
        master.startStatusPrinterThread();
    }

    /** Starts the daemon thread that blocks until the cluster lock is won. */
    public void startAcquiringThread() {
        acquiringThread = new Thread() {
            @Override
            public void run() {
                try {
                    Thread.currentThread().setName("MyAcquiringThread");
                    getLock();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        };
        acquiringThread.setDaemon(true);
        acquiringThread.start();
    }

    /**
     * Blocks until the cluster-wide lock "mylock" is acquired, then marks
     * this node as master. The lock is deliberately never released: holding
     * it for the lifetime of the process is what makes this node the master;
     * JGroups releases it automatically if this member leaves or crashes.
     *
     * @throws Exception propagated from the lock service
     */
    private void getLock() throws Exception {
        Lock lock = lock_service.getLock("mylock");
        lock.lock();
        becomeMaster.set(true);
        System.out.println(" -- I GOT THE LOCK !!!");
    }

    /**
     * Starts a daemon thread that prints the mastership flag and the state
     * of the acquiring thread every two seconds.
     */
    private void startStatusPrinterThread() {
        statusPrinter = new Thread() {
            @Override
            public void run() {
                Thread.currentThread().setName("MyStatusPrinterThread");
                while (true) {
                    try {
                        System.out.println("becomeMaster ? -> { " + becomeMaster + " }");
                        System.out.println("thread state ? -> { " + acquiringThread.getState() + " }");
                        sleep(2000L);
                    } catch (InterruptedException e) {
                        // Restore the interrupt status and stop printing
                        // instead of swallowing the interruption (original
                        // bug: the flag was cleared and the loop kept going).
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        };
        statusPrinter.setDaemon(true);
        statusPrinter.start();
    }
}
Content of UDP.XML
Open the jgroups-3.1-final.jar and edit the udp.xml inside it.
Add the CENTRAL_LOCK tag.

Note: DistributedLockManager is deprecated in the jgroups-3.1-final release. For an older-release example, refer to http://javabender.blogspot.com/2010/10/solving-leader-election-problem-in.html


udp.xml content will be something like this :

<!--
  Default stack using IP multicasting. It is similar to the "udp"
  stack in stacks.xml, but doesn't use streaming state transfer and flushing
  author: Bela Ban
-->

<config xmlns="urn:org:jgroups"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="urn:org:jgroups http://www.jgroups.org/schema/JGroups-3.0.xsd">
    <UDP
        mcast_port="${jgroups.udp.mcast_port:45588}"
         tos="8"
         ucast_recv_buf_size="20M"
         ucast_send_buf_size="640K"
         mcast_recv_buf_size="25M"
         mcast_send_buf_size="640K"
         loopback="true"
         level="WARN"
         log_discard_msgs="false"
         discard_incompatible_packets="true"
         max_bundle_size="64K"
         max_bundle_timeout="30"
         ip_ttl="${jgroups.udp.ip_ttl:8}"
         enable_bundling="true"
         enable_diagnostics="true"
         thread_naming_pattern="cl"

         timer_type="new"
         timer.min_threads="4"
         timer.max_threads="10"
         timer.keep_alive_time="3000"
         timer.queue_max_size="500"

         thread_pool.enabled="true"
         thread_pool.min_threads="2"
         thread_pool.max_threads="8"
         thread_pool.keep_alive_time="5000"
         thread_pool.queue_enabled="true"
         thread_pool.queue_max_size="10000"
         thread_pool.rejection_policy="discard"

         oob_thread_pool.enabled="true"
         oob_thread_pool.min_threads="1"
         oob_thread_pool.max_threads="8"
         oob_thread_pool.keep_alive_time="5000"
         oob_thread_pool.queue_enabled="false"
         oob_thread_pool.queue_max_size="100"
         oob_thread_pool.rejection_policy="Run"/>

    <PING timeout="2000"
            num_initial_members="3"/>
    <MERGE2 max_interval="30000"
            min_interval="10000"/>
    <FD_SOCK/>
    <FD_ALL/>
    <VERIFY_SUSPECT timeout="1500"  />
    <BARRIER />
    <pbcast.NAKACK exponential_backoff="300"
                   xmit_stagger_timeout="200"
                   use_mcast_xmit="false"
                   discard_delivered_msgs="true"/>
    <UNICAST />
    <pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000"
                   max_bytes="4M"/>
    <pbcast.GMS print_local_addr="true" join_timeout="3000"
                view_bundling="true"/>
    <UFC max_credits="2M"
         min_threshold="0.4"/>
    <MFC max_credits="2M"
         min_threshold="0.4"/>
    <FRAG2 frag_size="60K"  />
    <pbcast.STATE_TRANSFER />
    <CENTRAL_LOCK />
    <!-- pbcast.FLUSH  /-->
</config>