Friday, December 7, 2018

Sample Simple Stupid Redis Message Queue Producer Consumer Example

Just copy, paste, and run... and then customize and enhance.
import redis.clients.jedis.Jedis;
import java.util.Scanner;
/**
 * Console producer: reads lines from standard input and appends each one to the
 * tail of the Redis list {@code "queue"} on {@code localhost}.
 *
 * <p>The loop ends once the line {@code "bye"} has been read; that line is pushed
 * to the list like any other message before the loop exits. The loop also ends
 * when standard input is exhausted.
 */
public class RedisQueueProducer {

    /**
     * Runs the prompt/read/push loop until {@code "bye"} is entered or input ends.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // try-with-resources releases both the Redis connection and the scanner,
        // even if rpush throws part-way through the session.
        try (Jedis jedis = new Jedis("localhost");
             Scanner scanIn = new Scanner(System.in)) {
            String input = "";
            while (!input.equals("bye")) {
                input = getInput(scanIn);
                if (input == null) {
                    // Standard input was closed (EOF / piped input ran out).
                    break;
                }
                jedis.rpush("queue", input);
            }
        }
    }

    /**
     * Prompts for and reads one line of input.
     *
     * @param scanIn scanner to read from
     * @return the line that was read, or {@code null} if no further line is available
     */
    private static String getInput(Scanner scanIn) {
        System.out.println("Enter the message: ");
        // Checking first avoids the NoSuchElementException nextLine() throws at EOF.
        return scanIn.hasNextLine() ? scanIn.nextLine() : null;
    }
}
import redis.clients.jedis.Jedis;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * Blocking consumer for the Redis list {@code "queue"} on {@code localhost}.
 *
 * <p>Each popped payload is handed to a fixed pool of 10 threads for (simulated)
 * processing. The main thread waits for each task's result before popping the
 * next message, so at most one message is in flight at any time.
 */
public class RedisQueueConsumer {

    /** Pause after an unexpected runtime failure so a persistent error does not spin the loop. */
    private static final long RETRY_DELAY_MILLIS = 1000L;

    /** Shared source for the simulated processing delay (avoids allocating one per message). */
    private static final Random RANDOM = new Random();

    /**
     * Pops messages from the list and processes them until the main thread is interrupted.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        try (Jedis jedis = new Jedis("localhost")) {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    // Timeout 0 is passed so the call waits for an element.
                    // The reply is [key, value]: index 0 is the list name, index 1 the payload.
                    List<String> messages = jedis.blpop(0, "queue");
                    if (messages == null || messages.size() < 2) {
                        // No usable reply; try again.
                        continue;
                    }
                    String payload = messages.get(1);
                    consumeFromRedisQueue(executorService, payload);
                } catch (InterruptedException e) {
                    // Restore the flag so the loop condition sees it and we shut down cleanly.
                    Thread.currentThread().interrupt();
                } catch (java.util.concurrent.ExecutionException e) {
                    // Processing of this one message failed; report it and move on.
                    System.out.println("Exception during async processing.." + e.getMessage());
                } catch (RuntimeException e) {
                    // Typically a Redis/connection problem: report it and back off briefly
                    // rather than retrying in a tight loop.
                    System.out.println("Exception during async processing.." + e.getMessage());
                    pauseBeforeRetry();
                }
            }
        } finally {
            // Non-daemon pool threads would otherwise keep the JVM alive.
            executorService.shutdown();
        }
    }

    /**
     * Processes one payload on the given executor and waits for the result.
     *
     * @param executorService pool that runs the processing task
     * @param payload message body popped from the queue
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws java.util.concurrent.ExecutionException if the processing task fails
     */
    private static void consumeFromRedisQueue(ExecutorService executorService, String payload) throws InterruptedException, java.util.concurrent.ExecutionException {
        CompletableFuture<String> processed = CompletableFuture.supplyAsync(() -> {
            try {
                // Simulated work: a random delay in [300, 700) milliseconds.
                Thread.sleep(300L + RANDOM.nextInt(400));
            } catch (InterruptedException e) {
                // Keep the interrupt visible to the pool instead of swallowing it.
                Thread.currentThread().interrupt();
            }
            System.out.println("nextMessageFromRedisQueue = " + payload + " is processed");
            return payload + "_processed";
        }, executorService);
        // Blocks until the task completes, which is what serializes message handling.
        String asyncProcessedMessageResult = processed.get();
        System.out.println("asyncProcessedMessageResult = " + asyncProcessedMessageResult);
    }

    /** Sleeps for {@link #RETRY_DELAY_MILLIS}, preserving the interrupt flag if interrupted. */
    private static void pauseBeforeRetry() {
        try {
            Thread.sleep(RETRY_DELAY_MILLIS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

No comments:

Post a Comment