Showing posts with label Microservice async communication rabbitmq. Show all posts
Showing posts with label Microservice async communication rabbitmq. Show all posts

Monday, March 18, 2024

MicroService ASYNC Communication via Message Brokers [Spring, RabbitMQ, Microservice]

UserService communicates with BalanceService using RabbitMQ.

Async communication microservices 

RabbitMQ dynamic queue names + replyTo semantics


USER SERVICE
/**
 * Receives a balance reply from the response queue and hands it to the request
 * thread that is waiting for it.
 *
 * <p>The reply is matched to its waiter through the message's correlation ID.
 * A reply is stored only while a latch is still registered for that ID; a reply
 * with no correlation ID, or one that arrives after the waiter has already
 * deregistered its latch, is dropped so it cannot sit in
 * {@code correlationIdResponseMap} forever.
 *
 * @param response the deserialized reply payload
 * @param message  the raw AMQP message, used to read the correlation ID
 */
@RabbitListener(queues = "#{dynamicQueueNameResolver.resolveResponseQueueName()}")
public void handleResponse(@Payload UserBalanceResponse response, org.springframework.amqp.core.Message message) {
    System.out.println("UserController Got a rabbit message = " + response);

    String correlationId = message.getMessageProperties().getCorrelationId();
    if (correlationId == null) {
        // Nothing to match against; a null key would also be rejected by a
        // ConcurrentHashMap (map type is not visible here).
        System.out.println("UserController ignoring reply without a correlation ID");
        return;
    }

    // Look up the waiter BEFORE storing: if the latch is gone, nobody will ever
    // remove the stored response and the entry would leak.
    CountDownLatch latch = correlationIdLatchMap.get(correlationId);
    if (latch == null) {
        System.out.println("UserController ignoring late or unknown reply, correlationId = " + correlationId);
        return;
    }

    // Publish the response first, then release the waiter so it sees the value.
    // NOTE(review): a waiter that times out between the lookup above and this put
    // can still miss the response; closing that window needs a single shared
    // structure (e.g. a map of CompletableFuture) instead of two maps.
    correlationIdResponseMap.put(correlationId, response);
    latch.countDown();
}

/**
 * Requests a user's balance from the BalanceService over RabbitMQ and blocks
 * (up to 5 seconds) for the correlated reply.
 *
 * <p>Flow: register a latch under a fresh correlation ID, publish the request to
 * {@code user_balance_request_queue} with that ID and a reply-to queue name, wait
 * for the reply listener to count the latch down, then pick up the stored response.
 * The per-request map entries are removed in a {@code finally} block so they are
 * released even if publishing fails or the waiting thread is interrupted.
 *
 * @param userRequest the request body identifying the user
 * @return 200 with the balance text when a reply arrived in time, otherwise 500
 * @throws InterruptedException if the thread is interrupted while waiting for the reply
 */
@PostMapping("/get-user-balance")
public ResponseEntity<String> getUserBalance(@RequestBody UserRequest userRequest) throws InterruptedException {
    // Correlation ID ties the eventual reply back to this request
    String correlationId = UUID.randomUUID().toString();

    // Register the latch before sending so a fast reply always finds its waiter
    CountDownLatch latch = new CountDownLatch(1);
    correlationIdLatchMap.put(correlationId, latch);

    UserBalanceResponse userBalance;
    try {
        // Send request to the BalanceService with the correlation ID
        rabbitTemplate.convertAndSend("user_balance_request_queue", userRequest, message -> {
            message.getMessageProperties().setCorrelationId(correlationId);
            // Reply-to is the response queue name supplied by the resolver; the
            // correlation ID (not the queue name) is what distinguishes requests
            message.getMessageProperties().setReplyTo(dynamicQueueNameResolver.resolveResponseQueueName());
            return message;
        });
        System.out.println("UserController has SENT a rabbit message = " + userRequest);

        // Wait for response with a timeout; the outcome is decided below by
        // whether a response was stored, so the boolean result is not needed
        latch.await(5, TimeUnit.SECONDS);
    } finally {
        // Always deregister, including on send failure or interruption,
        // so neither map accumulates entries for abandoned requests
        correlationIdLatchMap.remove(correlationId);
        userBalance = correlationIdResponseMap.remove(correlationId);
    }

    if (userBalance == null) {
        return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("Failed to get user balance for user ID: " + userRequest.getUserId());
    }
    return ResponseEntity.ok("User balance for user ID " + userRequest.getUserId() + " is: " + userBalance);
}
BALANCE SERVICE
/**
 * Handles a balance request from {@code user_balance_request_queue} and sends the
 * reply to the queue named in the request's reply-to property, echoing the
 * request's correlation ID so the caller can match the reply.
 *
 * <p>A request that carries no reply-to is logged and skipped, since there is no
 * destination to publish the reply to.
 *
 * @param userRequest    the deserialized request payload
 * @param requestMessage the raw request message, used for reply-to and correlation ID
 */
@RabbitListener(queues = "user_balance_request_queue")
public void processBalanceRequest(UserRequest userRequest, Message requestMessage) {
    System.out.println("GOT userRequest = " + userRequest);

    String replyTo = requestMessage.getMessageProperties().getReplyTo();
    if (replyTo == null || replyTo.isEmpty()) {
        // Without a reply-to the response would be published with no routing key
        System.out.println("Dropping userRequest without replyTo = " + userRequest);
        return;
    }
    String correlationId = requestMessage.getMessageProperties().getCorrelationId();

    // Simulate processing the balance request
    // In a real scenario, this could involve querying a database or external service
    String userBalance = "Balance for user " + userRequest.getUserId() + ": $123"; // Example balance

    // Construct the response object
    UserBalanceResponse response = new UserBalanceResponse(userRequest.getUserId(), userBalance);

    // Send the response back to the requester on its reply-to queue,
    // copying the correlation ID from the request onto the reply
    rabbitTemplate.convertAndSend(replyTo, response, message -> {
        message.getMessageProperties().setCorrelationId(correlationId);
        return message;
    });
}