This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import redis.clients.jedis.Jedis; | |
import java.util.Scanner; | |
public class RedisQueueProducer { | |
public static void main(String[] args) { | |
Jedis jedis = new Jedis("localhost"); | |
Scanner scanIn = new Scanner(System.in); | |
String input = ""; | |
while(!input.equals("bye")){ | |
input = getInput(scanIn); | |
jedis.rpush("queue", input); | |
} | |
scanIn.close(); | |
} | |
private static String getInput(Scanner scanIn) | |
{ | |
System.out.println("Enter the message: "); | |
String inputString = scanIn.nextLine(); | |
return inputString; | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import redis.clients.jedis.Jedis; | |
import java.util.List; | |
import java.util.Random; | |
import java.util.concurrent.CompletableFuture; | |
import java.util.concurrent.ExecutorService; | |
import java.util.concurrent.Executors; | |
public class RedisQueueConsumer { | |
public static void main(String[] args) { | |
ExecutorService executorService = Executors.newFixedThreadPool(10); | |
Jedis jedis = new Jedis("localhost"); | |
List<String> messages; | |
while (true) { | |
try { | |
//System.out.println("Waiting for a message in the queue"); | |
messages = jedis.blpop(0, "queue"); | |
//System.out.println("KEY:" + messages.get(0) + " VALUE:" + messages.get(1)); | |
String payload = messages.get(1); | |
//Do some processing with the payload | |
consumeFromRedisQueue(executorService, payload); | |
} catch (Exception e) { | |
System.out.println("Exception during async processing.." + e.getMessage()); | |
} | |
} | |
} | |
private static void consumeFromRedisQueue(ExecutorService executorService, String payload) throws InterruptedException, java.util.concurrent.ExecutionException { | |
CompletableFuture<String> stringCompletableFuture = CompletableFuture.supplyAsync(() -> { | |
String nextMessageFromRedisQueue = payload; | |
try { | |
Thread.sleep(new Random().longs(300, 700).findFirst().getAsLong()); | |
} catch (InterruptedException e) { | |
} | |
System.out.println("nextMessageFromRedisQueue = " + nextMessageFromRedisQueue + " is processed"); | |
return nextMessageFromRedisQueue + "_processed"; | |
}, executorService); | |
String asyncProcessedMessageResult = stringCompletableFuture.get(); | |
System.out.println("asyncProcessedMessageResult = " + asyncProcessedMessageResult); | |
} | |
} |
No comments:
Post a Comment