Friday, June 12, 2015

Things to remember ... -- Submitting tasks to the ExecutorService (newFixedThreadPool )

The Javadoc says:

java.util.concurrent.Executors.newFixedThreadPool: "If additional tasks are submitted when all threads are active, they will wait in the queue until a thread is available."

Note the wording: "they will wait".

This does not mean that any thread waits. It means that the submitted tasks wait in a queue until a pool thread is free to process them.

Submitting a task is not a blocking operation. A fixed thread pool uses an unbounded queue to hold the submitted tasks.

Task Submitter Thread will not be blocked ...

And the code for copy & paste & run & bye ...

import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Demonstrates that submitting tasks to a fixed thread pool does not block the
 * submitting thread: tasks that cannot run immediately are parked in the pool's
 * work queue, whose size is reported periodically by a monitor thread.
 *
 * <p>Created by ssunel on 6/1/2015.
 */
public class TaskSubmitTest {

    /** Number of worker threads in the fixed pool. */
    private static final int POOL_SIZE = 5;

    /** Number of tasks submitted by {@link #main(String[])}. */
    private static final int TASK_COUNT = 1000000;

    /** Pause between two monitor reports, in seconds. */
    private static final int MONITOR_PERIOD_SECONDS = 3;

    /** Simulated duration of a single task, in milliseconds. */
    private static final long TASK_DURATION_MILLIS = 5000L;

    /**
     * Submits {@code TASK_COUNT} tasks to a fixed pool, waits for all of them to
     * finish, and then stops the monitor so the JVM can exit.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(POOL_SIZE);

        // newFixedThreadPool is only declared to return ExecutorService; this cast
        // relies on the JDK implementation handing back a ThreadPoolExecutor.
        MyMonitorThread monitor =
                new MyMonitorThread((ThreadPoolExecutor) executor, MONITOR_PERIOD_SECONDS);

        Thread monitorThread = new Thread(monitor);
        monitorThread.start();

        try {
            for (int i = 0; i < TASK_COUNT; i++) {
                Runnable worker = new WorkerThread("" + i);
                executor.execute(worker);
            }
            System.out.println("All tasks are submitted !!!");

            executor.shutdown();
            // Block instead of spinning on isTerminated(), which would burn a CPU core.
            while (!executor.awaitTermination(1, TimeUnit.MINUTES)) {
                // Tasks are still running or queued; keep waiting.
            }
            System.out.println("Finished all threads");
        } catch (InterruptedException e) {
            // Preserve the interrupt for whoever is further up the stack.
            Thread.currentThread().interrupt();
        } finally {
            // A no-op once the pool has terminated; otherwise abandons queued work
            // so the pool threads do not keep the JVM alive after a failure.
            executor.shutdownNow();
            // The monitor runs on a non-daemon thread; without this the JVM never exits.
            monitor.shutdown();
        }
    }

    /** A task that announces itself, sleeps for a fixed time, and announces completion. */
    static class WorkerThread implements Runnable {

        private final String command;

        /**
         * @param s label printed when the task starts and returned by {@link #toString()}
         */
        public WorkerThread(String s) {
            this.command = s;
        }

        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + " Start. Command = " + command);
            processCommand();
            System.out.println(Thread.currentThread().getName() + " End.");
        }

        /** Simulates work by sleeping; returns early if the thread is interrupted. */
        private void processCommand() {
            try {
                Thread.sleep(TASK_DURATION_MILLIS);
            } catch (InterruptedException e) {
                // Restore the flag so the pool can observe the interruption.
                Thread.currentThread().interrupt();
            }
        }

        @Override
        public String toString() {
            return this.command;
        }
    }

    /**
     * Periodically prints statistics of a {@link ThreadPoolExecutor} until
     * {@link #shutdown()} is called or the running thread is interrupted.
     */
    static class MyMonitorThread implements Runnable {

        private final ThreadPoolExecutor executor;

        private final int seconds;

        /** Written by the thread calling shutdown(), read by the monitor thread. */
        private volatile boolean run = true;

        /** Thread currently executing {@link #run()}, or {@code null} if none has started. */
        private volatile Thread runner;

        /**
         * @param executor pool whose statistics are printed
         * @param delay    pause between two reports, in seconds
         */
        public MyMonitorThread(ThreadPoolExecutor executor, int delay) {
            this.executor = executor;
            this.seconds = delay;
        }

        /**
         * Asks the monitor to stop. If the monitor is sleeping between two reports
         * it is woken up, so the request takes effect without waiting out the delay.
         */
        public void shutdown() {
            this.run = false;
            Thread current = this.runner;
            if (current != null) {
                current.interrupt();
            }
        }

        @Override
        public void run() {
            // Publish the thread before the first check of 'run' so that a concurrent
            // shutdown() either sees the thread or the loop sees run == false.
            this.runner = Thread.currentThread();
            while (run) {
                System.out.println(
                        String.format("[monitor] [%d/%d] Active: %d, Completed: %d, Task: %d, isShutdown: %s, isTerminated: %s, taskQueueSize: %d",
                                this.executor.getPoolSize(),
                                this.executor.getCorePoolSize(),
                                this.executor.getActiveCount(),
                                this.executor.getCompletedTaskCount(),
                                this.executor.getTaskCount(),
                                this.executor.isShutdown(),
                                this.executor.isTerminated(),
                                this.executor.getQueue().size()));

                try {
                    // 1000L forces long arithmetic so large delays cannot overflow int.
                    Thread.sleep(seconds * 1000L);
                } catch (InterruptedException e) {
                    // Interruption means "stop": restore the flag and leave the loop.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
    }
}