Sunday, July 18, 2010

Three ways to make an object thread-safe


There are basically three approaches you can take to make an object such as RGBThread thread-safe:

  1. Synchronize critical sections
  2. Make it immutable
  3. Use a thread-safe wrapper 
design techniques and thread safety
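
The three approaches above can be sketched side by side. This is a minimal illustration, not the code from the referenced article: the names `ThreadSafetySketch`, `SynchronizedColor`, `ImmutableColor`, and `newSharedList` are hypothetical, and the wrapper shown is the JDK's own `Collections.synchronizedList`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class ThreadSafetySketch {

    // 1. Synchronize critical sections: every access to the mutable
    //    fields goes through a method that holds the object's lock.
    static class SynchronizedColor {
        private int r, g, b;

        synchronized void setColor(int r, int g, int b) {
            this.r = r;
            this.g = g;
            this.b = b;
        }

        // Returns a fresh array, so callers never share internal state.
        synchronized int[] getColor() {
            return new int[] { r, g, b };
        }
    }

    // 2. Make it immutable: all fields are final and no method changes
    //    them; a "modification" produces a new object instead.
    static final class ImmutableColor {
        private final int r, g, b;

        ImmutableColor(int r, int g, int b) {
            this.r = r;
            this.g = g;
            this.b = b;
        }

        int getRed() { return r; }

        ImmutableColor withRed(int red) {
            return new ImmutableColor(red, g, b);
        }
    }

    // 3. Use a thread-safe wrapper: the JDK wraps a plain list so that
    //    each individual call is synchronized.
    static List<String> newSharedList() {
        return Collections.synchronizedList(new ArrayList<String>());
    }

    public static void main(String[] args) {
        SynchronizedColor shared = new SynchronizedColor();
        shared.setColor(255, 128, 0);
        System.out.println("red = " + shared.getColor()[0]);

        ImmutableColor orange = new ImmutableColor(255, 128, 0);
        ImmutableColor darker = orange.withRed(200);
        System.out.println(orange.getRed() + " -> " + darker.getRed());

        List<String> names = newSharedList();
        names.add("orange");
        System.out.println(names);
    }
}
```

Note that the wrapper only makes single calls atomic; a compound action such as check-then-add still needs its own synchronization.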

    Why worry about thread safety?

    Given the structure of the JVM, local variables, method parameters, and return values are inherently "thread-safe." But instance variables and class variables will only be thread-safe if you design your class appropriately.

    -- Bill Venners

    Preventing Memory Locks on a Multi-CPU Machine

    Problem
       Multithreaded apps create new objects at the same time
       New objects are always created in the EDEN space
       During object creation, memory is locked
       On a multi CPU machine (threads run concurrently) there can be contention

    Solution
        Allow each thread to have a private piece of the EDEN space
    Thread Local Allocation Buffer
        -XX:+UseTLAB
        -XX:TLABSize=
        -XX:+ResizeTLAB
        (On by default on multi CPU machines and newer JDK)
    Analyse TLAB usage
        -XX:+PrintTLAB
    JDK 1.5 and higher (GC ergonomics)
        Dynamic sizing algorithm, tuned to each thread


    INSIDE THE JAVA VIRTUAL MACHINE / Filip Hanik

    Sun recommended JVM Settings

    GC Settings
       -XX:+UseConcMarkSweepGC
       -XX:+CMSIncrementalMode
       -XX:+CMSIncrementalPacing
       -XX:CMSIncrementalDutyCycleMin=0
       -XX:CMSIncrementalDutyCycle=10
       -XX:+UseParNewGC
       -XX:+CMSPermGenSweepingEnabled
    To analyze what is going on
       -XX:+PrintGCDetails
       -XX:+PrintGCTimeStamps
       -XX:+TraceClassUnloading
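
To check from inside a running JVM which `-XX` options were actually passed and which collectors are active, the standard `java.lang.management` beans can be queried. A minimal sketch, assuming a hypothetical helper class named `GcReport`; the collector names it prints depend on the JVM and on the flags in use.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.List;

class GcReport {

    // The -XX options this JVM was started with.
    static List<String> jvmFlags() {
        List<String> flags = new ArrayList<String>();
        for (String arg : ManagementFactory.getRuntimeMXBean().getInputArguments()) {
            if (arg.startsWith("-XX:")) {
                flags.add(arg);
            }
        }
        return flags;
    }

    // One line per active collector: name, collection count, total time.
    static List<String> collectorSummaries() {
        List<String> lines = new ArrayList<String>();
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            lines.add(gc.getName()
                    + ": collections=" + gc.getCollectionCount()
                    + ", timeMs=" + gc.getCollectionTime());
        }
        return lines;
    }

    public static void main(String[] args) {
        System.out.println("Flags: " + jvmFlags());
        for (String line : collectorSummaries()) {
            System.out.println(line);
        }
    }
}
```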
                                       
    Minor Notes
    -XX:+UseParallelGC is not the same as -XX:+UseParNewGC
    -XX:ParallelGCThreads=
        Use with ParallelGC setting
    If you have 4 cpus and 1 JVM
        Set value to 4
    If you have 4 cpus and 2 JVM
        Set value to 2
    If you have 4 cpus and 6 JVM
        Set value to 2


    INSIDE THE JAVA VIRTUAL MACHINE / Filip Hanik

    Thread Safety Rules

    Safety Rules

    Sometimes, a set of data is interdependent. For example, we might have two fields corresponding to a street address and a zip code. Changing an address might involve changing both of these fields. If the zip code is changed without a corresponding change to the street address, the data may be inconsistent or incoherent. Such a set of operations, which must be done as a unit -- i.e., either all of the operations are executed or none are -- in order to ensure consistency of the data, is called a transaction. The property of "doing all or none" is called atomicity. A system in which all transactions are atomic is transaction-safe.
    The following rule suffices to ensure that your system is transaction-safe:
      All (potentially changeable) shared data is accessed only through the synchronized methods of a single object; no interdependent piece can be accessed independently.
    Note that this means that shared data cannot be returned by these methods for access by other methods. If shared data is to be returned, a (non-shared) copy must be made. Further, if interdependent values are to be returned (i.e., a portion of the shared data is to be used by other methods), all of the relevant values must be returned in a single transaction.
    For example, the address and zip code of the previous example should not be returned by two separate method calls if they are to be assumed consistent.
    public class AddressData {
        private String streetAddress;
        private String zipCode;
        public AddressData( String streetAddress, String zipCode) {
            this.setAddress( streetAddress, zipCode );
            ....
        }
        public synchronized void setAddress( String streetAddress,
                                             String zipCode) {
            // validity checks
            ....
            // set fields
            ....
        }
        public synchronized String getStreetAddress() { // problematic!
            return this.streetAddress;
        }
        public synchronized String getZipCode() { // problematic!
            return this.zipCode;
        }
    }
    If this class definition were used, e.g. for
    printMailingLabel( address.getStreetAddress(),
                       address.getZipCode() );
    it would in principle be possible to get an inconsistent address. For example, between the calls to address.getStreetAddress() and address.getZipCode(), it is possible that a call to address.setAddress could occur. In this case, getStreetAddress would return the old street address, while getZipCode() would return the new zip code.
    Instead, getStreetAddress() and getZipCode() should be replaced by a single synchronized method which returns a copy of the fields of the AddressData object:
    public synchronized SimpleAddressData getAddress() {
        return new SimpleAddressData( this.streetAddress,
                                      this.zipCode );
    }
    The SimpleAddressData class can contain just public streetAddress and zipCode fields, without accessors. It is being used solely to return the two objects at the same time.
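
Putting the pieces together, a runnable sketch of the single-snapshot design might look like the following. The outer class `AddressDemo`, the helper `mailingLabel`, the null check standing in for the elided validity checks, and the `final` modifiers on the `SimpleAddressData` fields are assumptions added for illustration; they are not part of the quoted text.

```java
class AddressDemo {

    // Plain carrier for the two values; used only to return both at once.
    static final class SimpleAddressData {
        public final String streetAddress;
        public final String zipCode;

        SimpleAddressData(String streetAddress, String zipCode) {
            this.streetAddress = streetAddress;
            this.zipCode = zipCode;
        }
    }

    static class AddressData {
        private String streetAddress;
        private String zipCode;

        AddressData(String streetAddress, String zipCode) {
            setAddress(streetAddress, zipCode);
        }

        // Both fields change under one lock, so no reader sees half an update.
        synchronized void setAddress(String streetAddress, String zipCode) {
            // Placeholder validity check (assumption, the original elides it).
            if (streetAddress == null || zipCode == null) {
                throw new IllegalArgumentException("address parts must not be null");
            }
            this.streetAddress = streetAddress;
            this.zipCode = zipCode;
        }

        // Returns a consistent, non-shared copy of both fields.
        synchronized SimpleAddressData getAddress() {
            return new SimpleAddressData(streetAddress, zipCode);
        }
    }

    // Reads both values from one snapshot instead of two separate calls.
    static String mailingLabel(AddressData address) {
        SimpleAddressData snapshot = address.getAddress();
        return snapshot.streetAddress + ", " + snapshot.zipCode;
    }

    public static void main(String[] args) {
        AddressData address = new AddressData("1 Main St", "02139");
        System.out.println(mailingLabel(address));
    }
}
```

Because the snapshot is a separate object, a later call to `setAddress` cannot change values a caller has already read.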

    Introduction to Interactive Programming by Lynn Andrea Stein

    Reentrant Synchronization

    Recall that a thread cannot acquire a lock owned by another thread. But a thread can acquire a lock that it already owns. Allowing a thread to acquire the same lock more than once enables reentrant synchronization. This describes a situation where synchronized code, directly or indirectly, invokes a method that also contains synchronized code, and both sets of code use the same lock. Without reentrant synchronization, synchronized code would have to take many additional precautions to avoid having a thread cause itself to block.
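
A small sketch of the situation described above, using a hypothetical `ReentrantCounter` class: one synchronized method calls another synchronized method on the same object, so the calling thread acquires a lock it already owns.

```java
class ReentrantCounter {
    private int count;

    synchronized void increment() {
        count++;
    }

    // The thread already holds this object's lock here; each call to
    // increment() acquires the same lock again without blocking.
    synchronized void incrementTwice() {
        increment();
        increment();
    }

    synchronized int get() {
        return count;
    }

    public static void main(String[] args) {
        ReentrantCounter counter = new ReentrantCounter();
        counter.incrementTwice();
        System.out.println("count = " + counter.get());
    }
}
```

If intrinsic locks were not reentrant, `incrementTwice()` would block forever on its first call to `increment()`.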

    Intrinsic Locks and Synchronization

    Cooperation

    Java's monitor supports two kinds of thread synchronization: mutual exclusion and cooperation. Mutual exclusion, which is supported in the Java virtual machine via object locks, enables multiple threads to independently work on shared data without interfering with each other. Cooperation, which is supported in the Java virtual machine via the wait and notify methods of class Object, enables threads to work together towards a common goal.

    Cooperation is important when one thread needs some data to be in a particular state and another thread is responsible for getting the data into that state. For example, one thread, a "read thread," may be reading data from a buffer that another thread, a "write thread," is filling. The read thread needs the buffer to be in a "not empty" state before it can read any data out of the buffer. If the read thread discovers that the buffer is empty, it must wait. The write thread is responsible for filling the buffer with data. Once the write thread has done some more writing, the read thread can do some more reading. 



    Thread Synchronization by Bill Venners





    package monitor;
    import java.util.Vector;



    class Producer extends Thread {
        static final int MAXQUEUE = 5;
        // Bounded message queue, guarded by this Producer's intrinsic lock.
        private final Vector<String> messages = new Vector<String>();

        public void run() {
            try {
                while ( true ) {
                    putMessage();
                    sleep( 1000 );
                }
            }
            catch( InterruptedException e ) {
                // Interrupted: leave the loop and let the thread end.
            }
        }

        private synchronized void putMessage() throws InterruptedException {
            // Wait in a loop: the condition is rechecked after every wakeup.
            while ( messages.size() == MAXQUEUE )
                wait();
            messages.addElement( new java.util.Date().toString() );
            notify(); // wake the consumer if it is waiting for a message
        }

        // Called by Consumer
        public synchronized String getMessage() throws InterruptedException {
            while ( messages.size() == 0 )
                wait();
            String message = messages.firstElement();
            messages.removeElementAt( 0 );
            notify(); // wake the producer if it is waiting for free space
            return message;
        }

    }



    class Consumer extends Thread {
        Producer producer;

        Consumer(Producer p) {
            producer = p;
        }

        public void run() {
            try {
                while ( true ) {
                    String message = producer.getMessage();
                    System.out.println("Got message: " + message);
                    sleep( 2000 );
                }
            }
           catch( InterruptedException e ) { }
        } 
     
        public static void main(String args[]) {
            Producer producer = new Producer();
            producer.start();
            new Consumer( producer ).start();
        }

    }
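
For comparison, the same bounded hand-off can be written with `java.util.concurrent.ArrayBlockingQueue`, which does the waiting and notifying internally. This is a swapped-in technique, not part of the example above; the class `BlockingQueueHandOff` and its `transfer` method are hypothetical names, and the message count is fixed so the sketch terminates.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class BlockingQueueHandOff {
    static final int MAXQUEUE = 5;

    // Sends `count` messages from a producer thread to the calling thread.
    static List<String> transfer(final int count) {
        final BlockingQueue<String> queue = new ArrayBlockingQueue<String>(MAXQUEUE);

        Thread producer = new Thread(new Runnable() {
            public void run() {
                try {
                    for (int i = 0; i < count; i++) {
                        queue.put("message " + i); // blocks while the queue is full
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        producer.start();

        List<String> received = new ArrayList<String>();
        try {
            for (int i = 0; i < count; i++) {
                received.add(queue.take()); // blocks while the queue is empty
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted during transfer", e);
        }
        return received;
    }

    public static void main(String[] args) {
        for (String message : transfer(12)) {
            System.out.println("Got message: " + message);
        }
    }
}
```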
     


    Wednesday, July 14, 2010

    How to detect JVM Shutdown

    Runtime.getRuntime().addShutdownHook(new ShutdownHook());

    // volatile, so that other threads see the value written by the hook thread
    volatile boolean jvmShuttingDown = false;


    private class ShutdownHook extends Thread {
            public void run() {
                   jvmShuttingDown = true;
            }
    }

    Friday, July 9, 2010

    How to lock a table in Mysql

    In one session, run:
    --> lock tables mySchema.myTable write;
    In another session, try to read from the table:
    --> select count(*) from mySchema.myTable;

    The second session blocks until the first session runs:

    --> unlock tables;

    How to get OpenJpa Sql Traces

    Put the following item in your persistence.xml

    <properties>
                <property name="openjpa.Log" value="SQL=TRACE"/>
    </properties>

    Monday, July 5, 2010

    Let's do some XStream tests

    Aim: Keep a finite state machine configuration in an XML file.

    A sample synthetic scenario for this:
        Get the users' message boxes and put them into a blocking queue. Some threads process the message boxes, read the new and old messages from them, and separate them into different queues.
        Then process the new messages (for example, delete a message if it has been kept longer than expected) and queue the data for a notification SMS to the owner of the expired message. Some threads process this queue, send the SMSes, and put some data into the EDR queue.
        If the message is old (already read), the threads on the oldMessageQueue put data into the EDR queue directly, without sending an SMS.

    Maybe we want to use these states and conditions in a state-manager base implementation/code generator, which is not written yet ;)

    public class StateList {
        public StateList () {
        }

        public List getFsm () {
            return fsm;
        }

        public void setFsm (List fsm) {
            this.fsm = fsm;
        }

        private List fsm = new ArrayList();
    }


    public class State {
        public State () {
        }

        private String queueName;
        private int threadCount;
        private int queueSize;
        private String executorClassName;
        private List transitions = new java.util.ArrayList();

        public String getExecutorClassName () {
            return executorClassName;
        }

        public String getQueueName () {
            return queueName;
        }

        public int getThreadCount () {
            return threadCount;
        }

        public int getQueueSize () {
            return queueSize;
        }

        public List getTransitions () {
            return transitions;
        }

        public void setExecutorClassName (String executorClassName) {
            this.executorClassName = executorClassName;
        }

        public void setQueueName (String queueName) {
            this.queueName = queueName;
        }

        public void setThreadCount (int threadCount) {
            this.threadCount = threadCount;
        }

        public void setQueueSize (int queueSize) {
            this.queueSize = queueSize;
        }

        public void setTransitions (List transitions) {
            this.transitions = transitions;
        }
    }


    public class Transition {
        public Transition () {
        }

        public String getCondition () {
            return condition;
        }

        public String getName () {
            return name;
        }

        public void setCondition (String condition) {
            this.condition = condition;
        }

        public void setName (String name) {
            this.name = name;
        }

        private String name ;
        private String condition;
    }


    package XstreamTester;

    import java.util.*;
    import com.thoughtworks.xstream.*;
    import java.io.*;


    public class XStreamTester {
        public XStreamTester () {
     
        /* messageboxGetter->//newMsgProcessor ->SmsSender->EdrWrite
                                           ->//oldMsgProcessor ->EdrWrite
         */

        }

        public static void main (String[] args) {
            XStreamTester xstreamtester = new XStreamTester ();

            XStream stateConfiguration = new XStream ();
            stateConfiguration.alias ("state", State.class);
            stateConfiguration.alias ("transition", Transition.class);
            stateConfiguration.alias ("states", StateList.class);

            StateList stateConfig = new StateList();
            List stateList = new ArrayList();
            // messagebox state
            State mboxState = new State ();
            mboxState.setExecutorClassName ("MboxGetter");

            List mboxTransitions = new ArrayList();
            Transition newMessageTransition = new Transition ();
            newMessageTransition.setName ("newMessageQueue");
            newMessageTransition.setCondition ("new");
            mboxTransitions.add (newMessageTransition);
            Transition oldMessageTransition = new Transition ();
            oldMessageTransition.setName ("oldMessageQueue");
            oldMessageTransition.setCondition ("old");
            mboxTransitions.add (oldMessageTransition);
            mboxState.setTransitions (mboxTransitions);

            mboxState.setQueueName ("messageBoxQueue");
            mboxState.setThreadCount (10);
            mboxState.setQueueSize (1000);
            stateList.add (mboxState);


            // new message state
            State newMsgState = new State ();
            newMsgState.setExecutorClassName ("newMessageProcessor");

            List newMsgTransitionList = new ArrayList();
            Transition smsTransition = new Transition ();
            smsTransition.setName ("smsQueue");
            smsTransition.setCondition ("deleted");
            newMsgTransitionList.add (smsTransition);
            newMsgState.setTransitions(newMsgTransitionList);

            newMsgState.setQueueName ("newMessageQueue");
            newMsgState.setThreadCount (10);
            newMsgState.setQueueSize (1000);
            stateList.add (newMsgState);

            // old message state
            State oldMsgState = new State ();
            oldMsgState.setExecutorClassName ("oldMessageProcessor");

            List oldMsgTransitionList = new ArrayList();
            Transition edrTransition = new Transition ();
            edrTransition.setName ("edrQueue");
            edrTransition.setCondition ("deleted");
            oldMsgTransitionList.add (edrTransition);
            oldMsgState.setTransitions(oldMsgTransitionList);

            oldMsgState.setQueueName ("oldMessageQueue");
            oldMsgState.setThreadCount (10);
            oldMsgState.setQueueSize (1000);
            stateList.add (oldMsgState);


            //sms state
            State smsState = new State ();
            smsState.setExecutorClassName ("smsQueueProcessor");

            List smsTransitionList = new ArrayList();
            Transition sms2EdrTransition = new Transition ();
            sms2EdrTransition.setName ("edr");
            sms2EdrTransition.setCondition ("smsSent");
            smsTransitionList.add (sms2EdrTransition);

            smsState.setTransitions(smsTransitionList);
            smsState.setQueueName ("smsQueue");
            smsState.setThreadCount (10);
            smsState.setQueueSize (1000);
            stateList.add (smsState);


            //edr state
            State edrState = new State ();
            edrState.setExecutorClassName ("edrQueueProcessor");

            edrState.setQueueName ("edrQueue");
            edrState.setThreadCount (10);
            edrState.setQueueSize (1000);
            stateList.add (edrState);


            stateConfig.setFsm(stateList);
            String xml = stateConfiguration.toXML (stateConfig);
            try {
                BufferedWriter out = new BufferedWriter (new FileWriter (
                    "/home/serkans/jbproject/UM_Test/xml/stateConfiguration.xml"));
                out.write (xml);
                out.close ();
            } catch (IOException e) {
                e.printStackTrace ();
            }
            System.exit (0);
        }
    }


    The generated XML will look something like this:
    <states>
      <fsm>
        <state>
          <queueName>messageBoxQueue</queueName>
          <threadCount>10</threadCount>
          <queueSize>1000</queueSize>
          <executorClassName>MboxGetter</executorClassName>
          <transitions>
            <transition>
              <name>newMessageQueue</name>
              <condition>new</condition>
            </transition>
            <transition>
              <name>oldMessageQueue</name>
              <condition>old</condition>
            </transition>
          </transitions>
        </state>
        <state>
          <queueName>newMessageQueue</queueName>
          <threadCount>10</threadCount>
          <queueSize>1000</queueSize>
          <executorClassName>newMessageProcessor</executorClassName>
          <transitions>
            <transition>
              <name>smsQueue</name>
              <condition>deleted</condition>
            </transition>
          </transitions>
        </state>
        <state>
          <queueName>oldMessageQueue</queueName>
          <threadCount>10</threadCount>
          <queueSize>1000</queueSize>
          <executorClassName>oldMessageProcessor</executorClassName>
          <transitions>
            <transition>
              <name>edrQueue</name>
              <condition>deleted</condition>
            </transition>
          </transitions>
        </state>
        <state>
          <queueName>smsQueue</queueName>
          <threadCount>10</threadCount>
          <queueSize>1000</queueSize>
          <executorClassName>smsQueueProcessor</executorClassName>
          <transitions>
            <transition>
              <name>edr</name>
              <condition>smsSent</condition>
            </transition>
          </transitions>
        </state>
        <state>
          <queueName>edrQueue</queueName>
          <threadCount>10</threadCount>
          <queueSize>1000</queueSize>
          <executorClassName>edrQueueProcessor</executorClassName>
          <transitions/>
        </state>
      </fsm>
    </states>