All Aboard with MassTransit

Stephen Darlington

In previous stories on this blog, we’ve discussed both RabbitMQ and event-driven architectures. Today, we’re going to bring those threads together by creating a simple geocoding program using MassTransit.

In the project’s own words:

MassTransit is a free, open-source distributed application framework for .NET. MassTransit makes it easy to create applications and services that leverage message-based, loosely-coupled asynchronous communication for higher availability, reliability, and scalability.

Before you get started, you’ll need .NET Core and RabbitMQ installed. To completely follow along, you’ll also need an API Key for PC*Miler Web Services. Lucky for you, you can get a free key from the Trimble Maps Developer Portal in just minutes!

Solution Overview

The solution we’re going to build today will consist of three different parts:

  1. A .NET Core Web API
  2. RabbitMQ
  3. A .NET Core geocoding worker

Our customer will hit the API, which will publish messages that the worker will consume. The worker will then call PC*Miler Web Services to get a geocoded result and return that to the API.

The request/response pattern is one of the more straightforward implementations in MassTransit, but you can also implement fully asynchronous workflows where your API publishes a message and immediately returns to your customer a status indicating their request is being worked on. Once your worker is done with its work, it can publish its own messages saying it’s done and you can notify your customer via SignalR, a web hook, or even insert the output into a database for your customers to poll.
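To make that fully asynchronous variant a little more concrete, here is a rough sketch of what the API side could look like. It is not part of the solution we build below: the GeocodeSubmitted message, the controller name, and the route are all made up for illustration, and it assumes MassTransit is registered the same way as in the rest of this post.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;
using Microsoft.AspNetCore.Mvc;

// Hypothetical message for the fire-and-forget variant (not used elsewhere in this post)
public class GeocodeSubmitted
{
    public Guid RequestId { get; set; }
    public string City { get; set; }
    public string State { get; set; }
}

[ApiController]
[Route("[controller]")] // route template is an assumption
public class AsyncLocationController : ControllerBase
{
    private readonly IPublishEndpoint _publishEndpoint;

    public AsyncLocationController(IPublishEndpoint publishEndpoint)
    {
        _publishEndpoint = publishEndpoint;
    }

    [HttpPost]
    public async Task<IActionResult> Submit(GeocodeSubmitted message)
    {
        // give the customer an id they can use to look up the result later
        message.RequestId = Guid.NewGuid();

        // publish and return right away; nothing here waits for the worker
        await _publishEndpoint.Publish(message);

        // 202 Accepted: the request was queued, not completed
        return Accepted(new { message.RequestId });
    }
}
```

The worker would consume GeocodeSubmitted just like any other message and publish its own "done" message when it finishes; how you deliver that to the customer (SignalR, a web hook, or polling) is up to you.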

The API

The first thing you’re going to do is create a new Web API project in Visual Studio or VS Code. I’m going to assume you’ve done this before, so I won’t go step-by-step.

Next, you’ll need to install the following NuGet packages:

MassTransit.AspNetCore
MassTransit.Extensions.DependencyInjection
MassTransit.RabbitMQ

After those are installed, you’ll need to register MassTransit in Startup.cs:

public void ConfigureServices(IServiceCollection services)
{
    // add MassTransit
    services.AddMassTransit(x =>
    {
        x.UsingRabbitMq();
        x.AddRequestClient<GeocodeRequest>();
    });
    services.AddMassTransitHostedService();

    services.AddControllers();
}

UsingRabbitMq() tells MassTransit to use RabbitMQ as the message transport; called with no arguments, it connects to a broker on localhost with the default guest credentials. AddMassTransitHostedService() registers a hosted service that starts and stops the bus along with the application. (We can also use x.UsingInMemory() to use the in-memory transport instead of RabbitMQ.) The piece we want to pay close attention to here is x.AddRequestClient<GeocodeRequest>(). This registers a request client that sends a message of type GeocodeRequest to a consumer listening for messages of that type and waits for its reply.
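We haven’t looked at the message types themselves, so here is a minimal sketch of what the two contracts might look like. The request properties mirror the ones the worker reads later in this post (City, State, PostalCode, Region), and I am assuming they are all strings. The namespace and the response properties are placeholders: in the worker below, GeocodeResponse is deserialized straight from the PC*Miler payload, so its real properties need to mirror that JSON.

```csharp
namespace Geocoding.Contracts // hypothetical namespace shared by the API and the worker
{
    // Sent by the API; property types are an assumption
    public class GeocodeRequest
    {
        public string City { get; set; }
        public string State { get; set; }
        public string PostalCode { get; set; }
        public string Region { get; set; }
    }

    // Sent back by the worker; these two properties are placeholders only
    public class GeocodeResponse
    {
        public double Latitude { get; set; }
        public double Longitude { get; set; }
    }
}
```

MassTransit matches messages by their full type name, namespace included, so both projects should reference the same contract classes, for example from a small shared class library.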

Once MassTransit is configured, it’s time to move on to the controller. We will need to inject an IRequestClient<GeocodeRequest> into the controller and call it in our action. This is fairly straightforward:

locationcontroller.cs

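using System.Threading.Tasks;
using MassTransit;
using Microsoft.AspNetCore.Mvc;

[ApiController]
[Route("[controller]")] // route template is an assumption
public class LocationController : ControllerBase
{
    private readonly IRequestClient<GeocodeRequest> _requestClient;

    public LocationController(IRequestClient<GeocodeRequest> requestClient)
    {
        _requestClient = requestClient;
    }

    [HttpPost]
    public async Task<IActionResult> GeocodeLocation(GeocodeRequest req)
    {
        var response = await _requestClient.GetResponse<GeocodeResponse>(req);
        return Ok(response.Message);
    }
}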

When you invoke _requestClient.GetResponse<>, MassTransit inserts a GeocodeRequest message into RabbitMQ for any consumer listening for messages of that type to consume. The _requestClient will then wait for a reply of type GeocodeResponse or an error to come back on a reply channel.
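If the worker is down or fails, GetResponse<> does not return normally: it throws once the request times out (30 seconds by default, to my knowledge) or when a fault comes back. Here is a hedged variation on the controller above that shortens the timeout and maps both cases to HTTP status codes. The 10-second timeout, the status codes, and the error text are my own choices for illustration, and the snippet reuses the GeocodeRequest and GeocodeResponse types from this post.

```csharp
using System.Threading.Tasks;
using MassTransit;
using Microsoft.AspNetCore.Mvc;

[ApiController]
[Route("[controller]")] // route template is an assumption
public class LocationController : ControllerBase
{
    private readonly IRequestClient<GeocodeRequest> _requestClient;

    public LocationController(IRequestClient<GeocodeRequest> requestClient)
    {
        _requestClient = requestClient;
    }

    [HttpPost]
    public async Task<IActionResult> GeocodeLocation(GeocodeRequest req)
    {
        try
        {
            // wait at most 10 seconds for the worker to reply
            var response = await _requestClient.GetResponse<GeocodeResponse>(
                req, timeout: RequestTimeout.After(s: 10));
            return Ok(response.Message);
        }
        catch (RequestTimeoutException)
        {
            // no worker answered in time
            return StatusCode(504, "The geocoding worker did not respond in time.");
        }
        catch (RequestFaultException ex)
        {
            // the worker threw while handling the request
            return StatusCode(502, ex.Message);
        }
    }
}
```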

Now that our API is done, it’s time to implement the worker.

The Worker

For our geocoding worker, you will need to install the same three NuGet packages as above. You’ll also need to register MassTransit in Startup.cs, but this time it will be for a message consumer and not a publisher.

consumerstartup.cs

public void ConfigureServices(IServiceCollection services)
{
    services.AddMassTransit(x =>
    {
        x.AddConsumer<LocationConsumer>();

        // automatically create endpoints for any registered consumers
        x.SetKebabCaseEndpointNameFormatter();
        x.UsingRabbitMq((context, cfg) =>
        {
            cfg.ConfigureEndpoints(context);
        });
    });

    services.AddMassTransitHostedService();
}

The first line registers our consumer. A consumer is any class that implements the IConsumer<> interface from MassTransit. We’ll take a look at LocationConsumer shortly. The remaining lines set up the endpoints our consumers listen on: SetKebabCaseEndpointNameFormatter() gives the generated queues kebab-case names, and cfg.ConfigureEndpoints(context) sets up the necessary queues and exchanges in RabbitMQ for each one of our registered consumers automatically. You can also set up each consumer’s endpoint by hand.
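For comparison, here is a sketch of the by-hand alternative, where we name the queue ourselves instead of calling cfg.ConfigureEndpoints(context). The queue name "geocode-requests" is made up for illustration; LocationConsumer is the consumer from this post.

```csharp
using MassTransit;
using Microsoft.Extensions.DependencyInjection;

public class Startup
{
    public void ConfigureServices(IServiceCollection services)
    {
        services.AddMassTransit(x =>
        {
            x.AddConsumer<LocationConsumer>();

            x.UsingRabbitMq((context, cfg) =>
            {
                // "geocode-requests" is a hypothetical queue name
                cfg.ReceiveEndpoint("geocode-requests", e =>
                {
                    // attach the registered consumer to this queue
                    e.ConfigureConsumer<LocationConsumer>(context);
                });
            });
        });

        services.AddMassTransitHostedService();
    }
}
```

Configuring endpoints by hand is mostly useful when you need control over queue names or per-queue settings; for a single consumer like ours, the automatic version is less to maintain.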

LocationConsumer.cs is where we make the request to geocode our address:

locationconsumer.cs

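using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Text.Json;
using System.Threading.Tasks;
using MassTransit;

public class LocationConsumer : IConsumer<GeocodeRequest>
{
    private const string API_KEY = "your-key";

    // consumers are created per message, so share a single HttpClient across all of them
    private static readonly HttpClient _httpClient = CreateHttpClient();

    private static HttpClient CreateHttpClient()
    {
        var client = new HttpClient();
        client.DefaultRequestHeaders.TryAddWithoutValidation("Authorization", API_KEY);
        return client;
    }

    public async Task Consume(ConsumeContext<GeocodeRequest> context)
    {
        var request = context.Message;
        Console.WriteLine($"Received request to geocode {request.City}.");

        GeocodeResponse location = null;

        // geocode with PC*Miler Web Services, escaping each value for use in the query string
        var url = "https://pcmiler.alk.com/APIs/REST/v1.0/service.svc/locations" +
            $"?city={Escape(request.City)}" +
            $"&state={Escape(request.State)}" +
            $"&postcode={Escape(request.PostalCode)}" +
            $"&region={Escape(request.Region)}";
        var response = await _httpClient.GetAsync(url);

        if (response.IsSuccessStatusCode)
        {
            var responseText = await response.Content.ReadAsStringAsync();
            var tmp = JsonSerializer.Deserialize<List<GeocodeResponse>>(responseText);
            location = tmp?.FirstOrDefault();
        }

        // a null message cannot be sent; throwing makes MassTransit report a fault to the requester
        if (location == null)
        {
            throw new InvalidOperationException($"No geocode result for {request.City}.");
        }

        await context.RespondAsync(location);
    }

    // treats null as empty and percent-encodes spaces and reserved characters
    private static string Escape(object value) => Uri.EscapeDataString(value?.ToString() ?? "");
}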

This class must implement the IConsumer<> interface and its Consume() method. The geocode request from our API is included in the Message property of the context object. We pull the model out of the context, send a request to PC*Miler Web Services to perform the geocode, and parse the response. The final line of the method, await context.RespondAsync(location), tells MassTransit to send a GeocodeResponse to the client that requested the geocode. At this point, the response will get back to the action in our API controller and will be returned to the customer.
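One thing to watch in the parsing step: System.Text.Json matches property names case-sensitively by default, so a payload whose casing differs from your model deserializes without error but leaves the properties empty. Below is a small sketch of a case-insensitive helper. The SampleLocation model and the JSON in the usage note are made up for illustration and do not describe the real PC*Miler response.

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Text.Json;

// Made-up model for illustration only
public class SampleLocation
{
    public string City { get; set; }
    public string State { get; set; }
}

public static class GeocodeJson
{
    // match "city" to City, "STATE" to State, and so on
    private static readonly JsonSerializerOptions Options =
        new JsonSerializerOptions { PropertyNameCaseInsensitive = true };

    // Returns the first element of a JSON array, or null when there is none
    public static T FirstOrNull<T>(string json) where T : class
    {
        var items = JsonSerializer.Deserialize<List<T>>(json, Options);
        return items?.FirstOrDefault();
    }
}
```

For example, GeocodeJson.FirstOrNull<SampleLocation>("[{\"city\":\"Princeton\",\"state\":\"NJ\"}]") fills in City and State even though the JSON is lowercase, and an empty array gives you null instead of an exception.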

Wrapping Up

As I have shown, MassTransit is a simple way to set up your application for decoupled workflows. Because the API and the worker talk to each other only through messages, you can scale each part of your solution independently and keep one part running while the other is busy or being redeployed.
