Monday, March 18, 2024

MicroService ASYNC Communication via Message Brokers [Spring, RabbitMQ, Microservice]

UserService is Communicationg with Balance Service using rabbitMQ. 

Async communication microservices 

RabbitMQ DynamicQueue Names + replyTO semantic


USER SERVICE
@RabbitListener(queues = "#{dynamicQueueNameResolver.resolveResponseQueueName()}")
public void handleResponse(@Payload UserBalanceResponse response, org.springframework.amqp.core.Message message) {
System.out.println("UserController Got a rabbit message = " + response);
String correlationId = message.getMessageProperties().getCorrelationId();
correlationIdResponseMap.put(correlationId, response);
CountDownLatch latch = correlationIdLatchMap.get(correlationId);
if (latch != null) {
latch.countDown();
}
}

@PostMapping("/get-user-balance")
public ResponseEntity<String> getUserBalance(@RequestBody UserRequest userRequest) throws InterruptedException {
// Generate a correlation ID for the request
String correlationId = UUID.randomUUID().toString();
// Setup latch to wait for the response
CountDownLatch latch = new CountDownLatch(1);
correlationIdLatchMap.put(correlationId, latch);

// Send request to the BalanceService with the correlation ID
rabbitTemplate.convertAndSend("user_balance_request_queue", userRequest, message -> {
message.getMessageProperties().setCorrelationId(correlationId);
// Set replyTo to a dynamic queue based on correlation ID
message.getMessageProperties().setReplyTo(dynamicQueueNameResolver.resolveResponseQueueName());
return message;
});
System.out.println("UserController has SENT a rabbit message = " + userRequest);

// Wait for response with a timeout
latch.await(5, TimeUnit.SECONDS);
correlationIdLatchMap.remove(correlationId);
UserBalanceResponse userBalance = correlationIdResponseMap.remove(correlationId);

if (userBalance != null) {
return ResponseEntity.ok("User balance for user ID " + userRequest.getUserId() + " is: " + userBalance);
} else {
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("Failed to get user balance for user ID: " + userRequest.getUserId());
}
}
BALANCE SERVICE
@RabbitListener(queues = "user_balance_request_queue")
public void processBalanceRequest(UserRequest userRequest, Message requestMessage) {
System.out.println("GOT userRequest = " + userRequest);
// Simulate processing the balance request
// In a real scenario, this could involve querying a database or external service
String userBalance = "Balance for user " + userRequest.getUserId() + ": $123"; // Example balance

// Construct the response object
UserBalanceResponse response = new UserBalanceResponse(userRequest.getUserId(), userBalance);

// Send the response back to the UserController using the replyTo queue specified in the request message
rabbitTemplate.convertAndSend(requestMessage.getMessageProperties().getReplyTo(), response, message -> {
message.getMessageProperties().
setCorrelationId(
requestMessage.getMessageProperties().getCorrelationId()
);
return message;
});
}