Friday, December 7, 2018

Sample Simple Stupid Redis Message Queue Producer Consumer Example

Just Copy & Paste & Run...And Then Customize & Enhance...
import redis.clients.jedis.Jedis;
import java.util.Scanner;
public class RedisQueueProducer {
public static void main(String[] args) {
Jedis jedis = new Jedis("localhost");
Scanner scanIn = new Scanner(System.in);
String input = "";
while(!input.equals("bye")){
input = getInput(scanIn);
jedis.rpush("queue", input);
}
scanIn.close();
jedis.close(); // release the Redis connection as well
}
private static String getInput(Scanner scanIn)
{
System.out.println("Enter the message: ");
String inputString = scanIn.nextLine();
return inputString;
}
}
import redis.clients.jedis.Jedis;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class RedisQueueConsumer {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(10);
Jedis jedis = new Jedis("localhost");
List<String> messages;
while (true) {
try {
//System.out.println("Waiting for a message in the queue");
messages = jedis.blpop(0, "queue");
//System.out.println("KEY:" + messages.get(0) + " VALUE:" + messages.get(1));
String payload = messages.get(1);
//Do some processing with the payload
consumeFromRedisQueue(executorService, payload);
} catch (Exception e) {
System.out.println("Exception during async processing.." + e.getMessage());
}
}
}
private static void consumeFromRedisQueue(ExecutorService executorService, String payload) {
// Hand the payload to the pool and return immediately, so the main loop can go back to blpop.
// Calling get() here would block the loop and process only one message at a time.
CompletableFuture.supplyAsync(() -> {
String nextMessageFromRedisQueue = payload;
try {
// Simulate 300-700 ms of work
Thread.sleep(new Random().longs(300, 700).findFirst().getAsLong());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("nextMessageFromRedisQueue = " + nextMessageFromRedisQueue + " is processed");
return nextMessageFromRedisQueue + "_processed";
}, executorService).thenAccept(asyncProcessedMessageResult ->
System.out.println("asyncProcessedMessageResult = " + asyncProcessedMessageResult));
}
}
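
If you want to try the producer/consumer flow before installing Redis, the same shape can be sketched in-process. This is only an analogue, not Redis: a LinkedBlockingQueue stands in for the "queue" list, add plays the role of rpush and take plays the role of blpop with timeout 0. The class name InMemoryQueueSketch and its methods are made up for this sketch, and stopping the consumer on "bye" is an assumption of the sketch (the consumer above runs forever).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class InMemoryQueueSketch {

    // Hypothetical stand-in for the Redis list named "queue".
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

    // Plays the role of jedis.rpush("queue", message): append to the tail.
    public void produce(String message) {
        queue.add(message);
    }

    // Plays the role of jedis.blpop(0, "queue"): block until the head element exists.
    public String consume() throws InterruptedException {
        return queue.take();
    }

    // Consumes in FIFO order until the "bye" sentinel arrives (an assumption of this sketch).
    public List<String> consumeUntilBye() {
        List<String> processed = new ArrayList<>();
        try {
            String message;
            while (!(message = consume()).equals("bye")) {
                processed.add(message + "_processed");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return processed;
    }

    public static void main(String[] args) {
        InMemoryQueueSketch sketch = new InMemoryQueueSketch();
        sketch.produce("hello");
        sketch.produce("world");
        sketch.produce("bye");
        System.out.println(sketch.consumeUntilBye()); // [hello_processed, world_processed]
    }
}
```

Unlike Redis, this queue lives inside one JVM, so it only illustrates the ordering and blocking behaviour, not the cross-process part.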

Sunday, December 2, 2018

Java 8 CompletableFuture parallel tasks and Timeout example

package com.company;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* Created by sunels on 02.12.2018.
*/
public class CompletableFutureTimeout {
public static void main (String[] args) {
int numOfParallelTasks = 10;
long startTime = System.currentTimeMillis ();
ExecutorService es = Executors.newFixedThreadPool (numOfParallelTasks);
List<CompletableFuture<String>> futures = new ArrayList<> ();
CompletableFutureTimeout r = new CompletableFutureTimeout ();
for (long i = 0; i < numOfParallelTasks; i++) {
final long sleepDuration = i * 1000L;
CompletableFuture<String> future = CompletableFuture.supplyAsync (() -> r.doSomeSleep (sleepDuration), es);
futures.add (future);
}
System.out.println ("Duration here = " + (System.currentTimeMillis () - startTime));
try {
Thread.sleep (5000L); // the threshold: wait at most 5 seconds
} catch (InterruptedException e) {
Thread.currentThread ().interrupt ();
}
List<Object> resultArray = new ArrayList<> ();
for (int i = 0; i < numOfParallelTasks; i++) {
Object result = futures.get (i).getNow (null);
resultArray.add (result);
}
es.shutdown ();
try {
if (!es.awaitTermination (1L, TimeUnit.NANOSECONDS)) {
es.shutdownNow ();
}
} catch (InterruptedException e) {
es.shutdownNow ();
}
System.out.println ("Collected Results = " + resultArray);
System.out.println ("Total Duration = " + (System.currentTimeMillis () - startTime));
}
public String doSomeSleep (long sleep) {
String result;
try {
Thread.sleep (sleep);
System.out.println (Thread.currentThread ().getName () + " - - > will sleep for secs :" + sleep);
result = sleep + "";
} catch (Exception e) {
result = e.getMessage ();
}
return result;
}
}

Suppose you have a list of items (an id list referring to a table, a url list to fetch page data from the internet, customer keys for a location query against a map service, etc.). You start parallel tasks for those ids, but you don't want to wait longer than your threshold. You also want to collect the data from the returned CompletableFutures that have not timed out.

How can you achieve this with CompletableFuture?

PS: There is no .orTimeout() option in Java 8; it was added in Java 9. The other option is .get(N, TimeUnit.SECONDS), but it will not give you what you want.



Outputs : 

pool-1-thread-1 - - > will sleep for secs :0
Duration here = 233
pool-1-thread-2 - - > will sleep for secs :1000
pool-1-thread-3 - - > will sleep for secs :2000
pool-1-thread-4 - - > will sleep for secs :3000
pool-1-thread-5 - - > will sleep for secs :4000
pool-1-thread-6 - - > will sleep for secs :5000
Collected Results = [0, 1000, 2000, 3000, 4000, 5000, null, null, null, null]
Total Duration = 5235
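
One thing the listing above does not handle: the fixed 5-second sleep always costs the full 5 seconds, even when every task finishes early. A possible Java 8 variation, sketched here under my own assumptions, is to wait on CompletableFuture.allOf with one shared deadline and then take the same getNow(null) snapshot. The names SingleDeadlineSketch, collectWithin, sleepThenReturn and demo are invented for this sketch, and the durations are shortened to milliseconds.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SingleDeadlineSketch {

    // Waits at most timeoutMillis for ALL futures together, then snapshots each one.
    // Futures that are not done yet (or that failed) contribute null.
    public static List<String> collectWithin(List<CompletableFuture<String>> futures, long timeoutMillis) {
        CompletableFuture<Void> all = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
        try {
            all.get(timeoutMillis, TimeUnit.MILLISECONDS); // returns early if everything finishes
        } catch (TimeoutException | ExecutionException e) {
            // Deadline hit or a task failed: fall through and collect what is available.
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        List<String> results = new ArrayList<>();
        for (CompletableFuture<String> f : futures) {
            try {
                results.add(f.getNow(null));
            } catch (CompletionException | CancellationException e) {
                results.add(null);
            }
        }
        return results;
    }

    static String sleepThenReturn(long millis) {
        try {
            Thread.sleep(millis);
            return String.valueOf(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    // Three tasks (0 ms, 100 ms, 5000 ms) collected with a 1000 ms shared deadline.
    public static List<String> demo() {
        ExecutorService es = Executors.newFixedThreadPool(3);
        try {
            List<CompletableFuture<String>> futures = new ArrayList<>();
            for (long millis : new long[] {0L, 100L, 5000L}) {
                futures.add(CompletableFuture.supplyAsync(() -> sleepThenReturn(millis), es));
            }
            return collectWithin(futures, 1000L);
        } finally {
            es.shutdownNow(); // interrupts the task that is still sleeping
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [0, 100, null]
    }
}
```

The total wait here is bounded by the single deadline, not by the number of tasks, and the call returns as soon as the last task finishes if that happens before the deadline.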

How did I get here? :) See the code below, which was my first attempt.
In that version the main thread waits up to 2 seconds for each uncompleted future, so in the worst case the total wait is N*2 seconds, which is probably not what you expect to see here.
And yes, the wait on the 20-second job is cut short by the timeout (get with a timeout stops waiting but does not cancel the task), so there is a small gain at that point. But if you had 20 such tasks, the picture would be very negative again.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class CompletableFutureTimeoutUnEfficientWay {
public static void main(String[] args) {
CompletableFutureTimeoutUnEfficientWay r = new CompletableFutureTimeoutUnEfficientWay();
long startTime = System.currentTimeMillis();
long numOfTasks = 10L;
List<CompletableFuture<String>> futures = new ArrayList<>();
for (long i = 0L; i < numOfTasks; i++) {
final long sleepDurationInSeconds = i;
futures.add(CompletableFuture.supplyAsync(() -> r.getSomeSleep(sleepDurationInSeconds)));
}
futures.add(CompletableFuture.supplyAsync(() -> r.getSomeSleep(20)));
System.out.println("Tasks are created here, duration = " + (System.currentTimeMillis() - startTime));
List<Object> collectedResults = Stream.of(futures.toArray(new CompletableFuture[futures.size()])).
map(f -> {
Object result;
System.out.println("Getting results");
try {
// So you will be waiting up to 2 seconds for each/any un-completed future here. Total loss!
result = f.get(2L, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
result = e;
}
return result;
}).
collect(Collectors.toList());
System.out.println("Total duration = " + (System.currentTimeMillis() - startTime));
System.out.println("CollectedResults = " + collectedResults);
}
public String getSomeSleep(long sleepSeconds) {
String result = sleepSeconds + "";
try {
System.out.println(Thread.currentThread().getName() + " - - > will sleep : " + sleepSeconds );
Thread.sleep(sleepSeconds * 1000L);
} catch (Exception e) {
result = e.getMessage();
}
return result;
}
}

Outputs : 

ForkJoinPool.commonPool-worker-1 - - > will sleep : 0
ForkJoinPool.commonPool-worker-5 - - > will sleep : 4
ForkJoinPool.commonPool-worker-3 - - > will sleep : 2
ForkJoinPool.commonPool-worker-4 - - > will sleep : 3
ForkJoinPool.commonPool-worker-7 - - > will sleep : 6
ForkJoinPool.commonPool-worker-1 - - > will sleep : 5
ForkJoinPool.commonPool-worker-2 - - > will sleep : 1
ForkJoinPool.commonPool-worker-6 - - > will sleep : 7
Tasks are created here, duration = 79
Getting results
Getting results
Getting results
ForkJoinPool.commonPool-worker-2 - - > will sleep : 8
Getting results
ForkJoinPool.commonPool-worker-3 - - > will sleep : 9
Getting results
ForkJoinPool.commonPool-worker-4 - - > will sleep : 20
Getting results
Getting results
Getting results
Getting results
Getting results
Getting results
Total duration = 13080
CollectedResults = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, java.util.concurrent.TimeoutException]
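
If you are not tied to Java 8, the newer timeout methods on CompletableFuture avoid the sequential waiting seen in the output above, because each future carries its own timer that starts when it is created. A minimal sketch using completeOnTimeout (Java 9 or later); the class name CompleteOnTimeoutSketch, the helper sleepThenReturn and the "timed-out" placeholder value are my own choices for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class CompleteOnTimeoutSketch {

    static String sleepThenReturn(long millis) {
        try {
            Thread.sleep(millis);
            return String.valueOf(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    // Each future gets its own 1-second timer; the timers run concurrently,
    // so the total wait is about 1 second regardless of the number of tasks.
    public static List<String> demo() {
        List<CompletableFuture<String>> futures = new ArrayList<>();
        for (long millis : new long[] {0L, 100L, 5000L}) {
            futures.add(CompletableFuture
                    .supplyAsync(() -> sleepThenReturn(millis))
                    .completeOnTimeout("timed-out", 1, TimeUnit.SECONDS)); // Java 9+
        }
        List<String> results = new ArrayList<>();
        for (CompletableFuture<String> f : futures) {
            results.add(f.join()); // never blocks past the future's own deadline
        }
        return results;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [0, 100, timed-out]
    }
}
```

Note that, as with get(N, TimeUnit.SECONDS), the slow task itself is not cancelled; only the future handed back to the caller is completed with the placeholder.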

Wednesday, July 25, 2018

weak wifi, remote jmx



HOW TO SEE CONNECTION POOL STATS ON COMMAND LINE VIA JMX

Tool page : https://nofluffjuststuff.com/blog/vladimir_vivien/2012/04/jmx_cli_a_command_line_console_to_jmx

wget https://github.com/downloads/vladimirvivien/jmx-cli/jmxcli-0.1.2-bin.zip
unzip jmxcli-0.1.2-bin.zip
cd jmxcli-0.1.2
cp /usr/lib/jvm/java-8-oracle/lib/tools.jar lib/
chmod 777 lib/tools.jar
java -jar cli.jar
list filter:"com.mchange.v2.c3p0:type=PooledDataSource*" label:true
desc bean:$0
exec bean:"com.mchange.v2.c3p0:type=PooledDataSource[z8kflt9w1cicerh10mnh44|20c29a6f]" get:"numBusyConnections"
exec bean:"com.mchange.v2.c3p0:type=PooledDataSource[z8kflt9w1jggz5o1xv9pi5|2101f18a]" get:"numBusyConnections"
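
The same list-then-get flow can also be sketched with the standard javax.management API, in case the CLI is not available. This is an assumption-laden sketch, not what jmx-cli does internally: it queries the MBean server of the current JVM, and because c3p0 is not on the classpath here it uses the platform bean java.lang:type=Threading as a stand-in for the c3p0 pattern. The class name JmxQuerySketch and its two helpers are invented for illustration.

```java
import java.lang.management.ManagementFactory;
import java.util.Set;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

public class JmxQuerySketch {

    // Similar in spirit to: list filter:"<pattern>"
    public static Set<ObjectName> list(MBeanServer server, String pattern) {
        try {
            return server.queryNames(new ObjectName(pattern), null);
        } catch (MalformedObjectNameException e) {
            throw new IllegalArgumentException("Bad ObjectName pattern: " + pattern, e);
        }
    }

    // Similar in spirit to: exec bean:"<name>" get:"<attribute>"
    public static Object get(MBeanServer server, ObjectName bean, String attribute) {
        try {
            return server.getAttribute(bean, attribute);
        } catch (JMException e) {
            throw new IllegalStateException("Cannot read " + attribute + " from " + bean, e);
        }
    }

    public static void main(String[] args) {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        // Stand-in pattern; inside an application that uses c3p0 it would be
        // "com.mchange.v2.c3p0:type=PooledDataSource*" with attribute "numBusyConnections".
        for (ObjectName bean : list(server, "java.lang:type=Threading*")) {
            System.out.println(bean + " ThreadCount=" + get(server, bean, "ThreadCount"));
        }
    }
}
```

For a remote JVM, the MBeanServer would be replaced by a connection obtained through javax.management.remote.JMXConnectorFactory; that part is left out of the sketch.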