Scatter Gather In Masstransit C#

 

The Problem

We have to pull up product details from many different product vendors using product’s unique Id. All vendors have their own product detail APIs using which we can get product details. Each API call can give product detail with its pricing with price break for different quantities, additionally alternate product detail also being returned. So API could give list of responses for given single product Id. 

We have created one tool to get pricing with other detail where user can provide list of products to search. We need to call APIs for each vendors to get pricing, and as each call can give an array of result so we are not sure how many results we will get, Now the problem here is, we need to create an API endpoint which will internally call all these vendor APIs and once all results are collected then return those as a combined response, also we should be able to provide completed percentage and should be able to provide available results in between anytime. As we were already using Masstransit for this process, we have decided to use Scatter Gather pattern.

Scatter Gather Pattern

This pattern is similar to the RPC message exchange pattern in that the sender will be expecting a response from the receiver. The main difference here is the sender will create a range of queues where it will send responses, and there will be one aggregator service and that will combine all responses and compute completed percentage etc. Aggregator service is able to provide partial results in between anytime with completed percentage. So we can setup one other API endpoint which will ask aggregator service and provide response.

This is definitely a pattern which can be widely used in real applications out there that require 2 way communication with more than a single consumer.

Implementation

We are going to create two API endpoint

  1. SendRequest
  2. GetResponse

First API endpoint will publish RabbitMQ messages to multiple queues (separate queue for each Vendor). As in below diagram, all messages should be correlated with one common CorrelationId which can be used to keep track of each message progress and at the end the same CorrelationId can be used by an aggregator service to combine responses and to calculate completed percentage.

Masstransit Scatter Gather - Implementation

Aggregator service is also responsible to update status of each request (we can use a database table here instead of In-Memory status). There is a separate entry for status of each message published across different queues, and all are correlated with common CorrelationId.

Masstransit Scatter Gather - Implementation

Second API endpoint here will just return responses along with completion percentage. So here end-user is not waiting to complete entire process, he/she can get a response in between anytime even few vendor APIs are still running.

Timer

Aggregator service is using timer for each request, if any specific request has taken more than the maximum specified time due to any uncertain reason, then aggregator service will update the status table with timeout error and mark that request as completed.

Load Balancing And Concurrent Consumers

All consumers can be load balance easily, we just need to spinnup an extra instance of same service (clone copy). As Masstransit supports concurrent consumers out of the box, we can spinnup as many copies as we want. Message consumed by one instance will not consume by any other instance.

Mr. Rohit Chodvadiya