RabbitMQ in Microservices with ASP.NET Core

What is RabbitMQ?

RabbitMQ is a messaging broker – an intermediary for messaging. It gives your applications a common platform to send and receive messages, and your messages a safe place to live until received.

Messaging enables software applications to connect and scale. Applications can connect to each other, as components of a larger application, or to user devices and data. Messaging is asynchronous, decoupling applications by separating sending and receiving data.

Why use RabbitMQ to send data?

RabbitMQ uses a queue to send data instead of sending it directly from one microservice to another. The main reasons for using a queue are:

  • Higher availability and better error handling
  • Better scalability
  • Share data with whoever wants/needs it
  • Better user experience due to asynchronous processing

Read more about RabbitMQ 

In this post I am going to show how to add RabbitMQ to microservices that are based on .NET Core (in our case, ProductMicroservice).

Implement RabbitMQ

I will implement a simple version that publishes data to a queue in the Product service and processes the data in another microservice (to be implemented later).

Publishing data

I need to install the RabbitMQ.Client NuGet package.

I am creating a new folder called RabbitMQMessaging in the ProductMicroservice project, and in this folder two subfolders: Options and SendMessage.

I am creating a RabbitMqConfig class in the Options folder as follows:

namespace ProductMicroservice.RabbitMQMessaging.Options
{
    public class RabbitMqConfig
    {
        public string Hostname { get; set; }
        public string QueueName { get; set; }
        public string UserName { get; set; }
        public string Password { get; set; }
    }
}

We need this class to hold the settings for connecting to the RabbitMQ queue.

In the SendMessage folder I am creating a ProductUpdateSender class and adding the functionality to send data.

We should add a RabbitMq section with the queue settings to appsettings.json:

"RabbitMq": {
    "Hostname": "rabbitmq",
    "QueueName": "CustomerQueue",
    "UserName": "user",
    "Password": "password"
}

I also have to register the ProductUpdateSender class in the Startup class and read the RabbitMq section there:

public void ConfigureServices(IServiceCollection services)
{
    // Register the sender and bind the "RabbitMq" section to RabbitMqConfig.
    services.AddTransient<IProductUpdateSender, ProductUpdateSender>();
    services.AddOptions();
    services.Configure<RabbitMqConfig>(Configuration.GetSection("RabbitMq"));
}

We need two methods in the ProductUpdateSender class:

  • CreateConnection(): creates the connection to the RabbitMQ queue
  • SendProduct(Product product): publishes the Product object to the RabbitMQ queue every time a product is updated

We also need to create an interface, IProductUpdateSender:

public interface IProductUpdateSender
{
    void SendProduct(Product product);
}

The code implementing the interface with these two methods is as follows:

using Microsoft.Extensions.Options;
using Newtonsoft.Json;
using RabbitMQ.Client;
using System;
using System.Text;
using ProductMicroservice.Models;
using ProductMicroservice.RabbitMQMessaging.Options;

namespace ProductMicroservice.RabbitMQMessaging.Sendmesage
{
    public class ProductUpdateSender : IProductUpdateSender
    {
        private readonly string _hostname;
        private readonly string _password;
        private readonly string _queueName;
        private readonly string _username;
        private IConnection _connection;

        public ProductUpdateSender(IOptions<RabbitMqConfig> rabbitMqOptions)
        {
            _queueName = rabbitMqOptions.Value.QueueName;
            _hostname = rabbitMqOptions.Value.Hostname;
            _username = rabbitMqOptions.Value.UserName;
            _password = rabbitMqOptions.Value.Password;
            CreateConnection();
        }

        public void SendProduct(Product product)
        {
            // Nothing is sent if the connection cannot be (re)created.
            if (ConnectionExists())
            {
                using (var channel = _connection.CreateModel())
                {
                    // Declaring the queue is idempotent: it is created only if it does not exist yet.
                    channel.QueueDeclare(queue: _queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);

                    // Serialize the product to JSON and encode it as UTF-8 bytes.
                    var json = JsonConvert.SerializeObject(product);
                    var body = Encoding.UTF8.GetBytes(json);

                    // Publish to the default exchange; the routing key is the queue name.
                    channel.BasicPublish(exchange: "", routingKey: _queueName, basicProperties: null, body: body);
                }
            }
        }

        private void CreateConnection()
        {
            try
            {
                var factory = new ConnectionFactory
                {
                    HostName = _hostname,
                    UserName = _username,
                    Password = _password
                };
                _connection = factory.CreateConnection();
            }
            catch (Exception ex)
            {
                Console.WriteLine($"Could not create connection: {ex.Message}");
            }
        }

        private bool ConnectionExists()
        {
            if (_connection != null)
            {
                return true;
            }

            // Try once more in case the broker was not reachable earlier.
            CreateConnection();
            return _connection != null;
        }
    }
}

CreateConnection(): uses a ConnectionFactory, setting HostName, UserName and Password to the values stored in the ProductUpdateSender class.

SendProduct(Product product): takes a product as parameter and first checks the connection; if it does not exist, a new connection to RabbitMQ is created from the hostname, username and password using the ConnectionFactory. The QueueDeclare method takes a couple of parameters, such as the queue name and whether the queue is durable. The product object is then converted to JSON using JsonConvert, and this JSON is encoded as UTF-8 (body). The resulting byte array (body) is published with BasicPublish, which takes the exchange, the routing key, the message properties and the body.
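The serialize-and-encode step described above, together with its mirror image on the receiving side, can be tried without a broker. The following is only a minimal sketch under stated assumptions: the Product class here is a hypothetical stand-in for ProductMicroservice.Models.Product (its real properties are not shown in this post), the MessageBodyCodec and CodecDemo names are made up for illustration, and System.Text.Json is used instead of Newtonsoft.Json only so that the example needs no NuGet package.

```csharp
using System;
using System.Text;
using System.Text.Json;

// Hypothetical stand-in for ProductMicroservice.Models.Product (assumption).
public class Product
{
    public int Id { get; set; }
    public string Name { get; set; }
    public decimal Price { get; set; }
}

// Illustrative helper, not part of the original project.
public static class MessageBodyCodec
{
    // Sender side: object -> JSON string -> UTF-8 bytes (the message body).
    public static byte[] Encode(Product product)
    {
        string json = JsonSerializer.Serialize(product);
        return Encoding.UTF8.GetBytes(json);
    }

    // Receiver side: UTF-8 bytes -> JSON string -> object.
    public static Product Decode(byte[] body)
    {
        string json = Encoding.UTF8.GetString(body);
        return JsonSerializer.Deserialize<Product>(json);
    }
}

public static class CodecDemo
{
    public static void Main()
    {
        var original = new Product { Id = 1, Name = "Keyboard", Price = 49.90m };

        // This byte array is what would be handed to BasicPublish as the body.
        byte[] body = MessageBodyCodec.Encode(original);

        // This is what a consumer would do with the received body.
        Product copy = MessageBodyCodec.Decode(body);

        Console.WriteLine(Encoding.UTF8.GetString(body));
        Console.WriteLine(copy.Id == original.Id && copy.Name == original.Name && copy.Price == original.Price);
    }
}
```

Keeping the encoding in one small helper like this makes it easy to unit test the message format separately from the RabbitMQ connection code.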

How and where to call the SendProduct method

The SendProduct method is called from the UpdateProductCommandHandler class (part of the CQRS implementation) when a Product is updated. The new code is as follows:

public class UpdateProductCommandHandler : IRequestHandler<UpdateProductCommand, Product>
{
    private readonly IProductRepository _productRepository;
    private readonly IProductUpdateSender _productUpdateSender;

    public UpdateProductCommandHandler(IProductRepository productRepository, IProductUpdateSender productUpdateSender)
    {
        _productRepository = productRepository;
        _productUpdateSender = productUpdateSender;
    }

    public async Task<Product> Handle(UpdateProductCommand request, CancellationToken cancellationToken)
    {
        // Update the product first, then publish the updated product to the queue.
        var product = await Task.FromResult(_productRepository.UpdateProduct(request.product));
        _productUpdateSender.SendProduct(product);
        return product;
    }
}

You can find the code on my GitHub.

Implementation of reading data from RabbitMQ

To read the data in another microservice (it is not implemented yet; let's call it PaymentMicroservice), we have to constantly check the queue for new messages and process them.

Implementing the reading side is similar to the sending side, but ProductUpdateSender is replaced by ProductUpdateReceiver.

In the PaymentMicroservice, create a new folder called RabbitMQMessaging with two subfolders, ReceiveMessage and Options, and install the RabbitMQ.Client NuGet package in the PaymentMicroservice project.

In the Options folder, create a class called RabbitMqConfiguration. The code is as follows:

public class RabbitMqConfiguration
{
    public string Hostname { get; set; }
    public string QueueName { get; set; }
    public string UserName { get; set; }
    public string Password { get; set; }
    public bool Enabled { get; set; }
}

In the ReceiveMessage folder, create a class called ProductUpdateReceiver.

I am going to write the code for this class once I have implemented the PaymentMicroservice.

The structure of this class will be roughly as follows:

using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Options;
using Newtonsoft.Json;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
// add necessary namespaces here

namespace PaymentMicroservice.RabbitMQMessaging.Receivemesage
{
    public class ProductUpdateReceiver : BackgroundService
    {
        private readonly string _hostname;
        private readonly string _queueName;
        private readonly string _username;
        private readonly string _password;
        private IConnection _connection;
        private IModel _channel;
        // and more..

        public ProductUpdateReceiver(IOptions<RabbitMqConfiguration> rabbitMqOptions /*, other params */)
        {
            _hostname = rabbitMqOptions.Value.Hostname;
            _queueName = rabbitMqOptions.Value.QueueName;
            _username = rabbitMqOptions.Value.UserName;
            _password = rabbitMqOptions.Value.Password;
            InitializeRabbitMqListener();
        }

        private void InitializeRabbitMqListener()
        {
            var factory = new ConnectionFactory
            {
                HostName = _hostname,
                UserName = _username,
                Password = _password
            };
            _connection = factory.CreateConnection();
            _connection.ConnectionShutdown += RabbitMQ_ConnectionShutdown;
            _channel = _connection.CreateModel();
            // Must use the same arguments as the declaration on the sending side.
            _channel.QueueDeclare(queue: _queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
        }

        protected override Task ExecuteAsync(CancellationToken stoppingToken)
        {
            stoppingToken.ThrowIfCancellationRequested();

            var consumer = new EventingBasicConsumer(_channel);
            consumer.Received += (ch, ea) =>
            {
                // Decode the UTF-8 body and deserialize the JSON payload.
                var content = Encoding.UTF8.GetString(ea.Body.ToArray());
                var updateProduct = JsonConvert.DeserializeObject<UpdateProduct>(content);
                // TODO: pass updateProduct to the service that updates the database.

                // Acknowledge the message so that RabbitMQ removes it from the queue.
                _channel.BasicAck(ea.DeliveryTag, false);
            };
            consumer.Shutdown += OnConsumerShutdown;
            consumer.Registered += OnConsumerRegistered;
            consumer.Unregistered += OnConsumerUnregistered;
            consumer.ConsumerCancelled += OnConsumerCancelled;

            // autoAck is false because messages are acknowledged manually above.
            _channel.BasicConsume(_queueName, false, consumer);
            return Task.CompletedTask;
        }

        private void OnConsumerCancelled(object sender, ConsumerEventArgs e)
        {
        }

        private void OnConsumerUnregistered(object sender, ConsumerEventArgs e)
        {
        }

        private void OnConsumerRegistered(object sender, ConsumerEventArgs e)
        {
        }

        private void OnConsumerShutdown(object sender, ShutdownEventArgs e)
        {
        }

        private void RabbitMQ_ConnectionShutdown(object sender, ShutdownEventArgs e)
        {
        }

        public override void Dispose()
        {
            _channel.Close();
            _connection.Close();
            base.Dispose();
        }
    }
}

The ProductUpdateReceiver class inherits from the BackgroundService class and overrides the ExecuteAsync method.

The queue is initialized (InitializeRabbitMqListener()) in the constructor of the class, in the same way as in the ProductMicroservice, using QueueDeclare, and the connection shutdown handler is registered there.

Method: ExecuteAsync(CancellationToken stoppingToken)

The ExecuteAsync method reads data from the queue by subscribing to the Received event. Whenever this event fires, it reads the message, decodes its body from UTF-8 and deserializes the JSON into the product object. This object is then used to call another service that performs the update in the database.

The last thing we have to do is to register the ProductUpdateReceiver class as a background service in the Startup class.

if (serviceClientSettings.Enabled)
{
    services.AddHostedService<ProductUpdateReceiver>();
}

The code above checks whether RabbitMq:Enabled is true and only then registers the background service. This check is useful because you can set the value in the appsettings.json file or override it with an environment variable.
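How the Enabled flag can come from the settings file and still be overridden by an environment variable can be sketched as follows. This is an illustration under assumptions, not the project's actual Startup code: an in-memory dictionary stands in for appsettings.json, the ConfigDemo class and BuildConfiguration method are hypothetical names, and the Microsoft.Extensions.Configuration packages (including the Binder and EnvironmentVariables providers, which ship with ASP.NET Core) are assumed to be available. In environment variables a double underscore replaces the colon, so RabbitMq__Enabled maps to RabbitMq:Enabled.

```csharp
using System;
using System.Collections.Generic;
using Microsoft.Extensions.Configuration;

public class RabbitMqConfiguration
{
    public string Hostname { get; set; }
    public string QueueName { get; set; }
    public string UserName { get; set; }
    public string Password { get; set; }
    public bool Enabled { get; set; }
}

// Illustrative demo class, not part of the original project.
public static class ConfigDemo
{
    public static IConfiguration BuildConfiguration()
    {
        // Stand-in for the "RabbitMq" section of appsettings.json (assumption).
        var fileValues = new Dictionary<string, string>
        {
            ["RabbitMq:Hostname"] = "rabbitmq",
            ["RabbitMq:QueueName"] = "CustomerQueue",
            ["RabbitMq:Enabled"] = "true"
        };

        // Providers added later win, so environment variables override the file values.
        return new ConfigurationBuilder()
            .AddInMemoryCollection(fileValues)
            .AddEnvironmentVariables()
            .Build();
    }

    public static void Main()
    {
        // Simulate an override such as one set in a Docker or test environment.
        Environment.SetEnvironmentVariable("RabbitMq__Enabled", "false");

        var settings = BuildConfiguration()
            .GetSection("RabbitMq")
            .Get<RabbitMqConfiguration>();

        // In Startup, this value would decide whether AddHostedService is called.
        Console.WriteLine($"Queue: {settings.QueueName}, Enabled: {settings.Enabled}");
    }
}
```

In the Startup class the same idea would presumably look like Configuration.GetSection("RabbitMq").Get<RabbitMqConfiguration>() assigned to serviceClientSettings, since ASP.NET Core's default host already adds both the JSON file and the environment variables.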

The RabbitMQ settings in appsettings.json should be as follows:

{
    "Logging": {
        "LogLevel": {
            "Default": "Warning"
        }
    },
    "AllowedHosts": "*",
    "RabbitMq": {
        "Hostname": "rabbitmq",
        "QueueName": "CustomerQueue",
        "UserName": "user",
        "Password": "password",
        "Enabled": true // set to false when running the ProductMicroservice tests
    }
}

That's all you have to do to publish and receive messages via RabbitMQ.

How to run RabbitMQ in Docker

The easiest way to start an instance of RabbitMQ is to run it in a Docker container (you need to have Docker Desktop installed).

Run the following two commands in PowerShell or bash:

docker run -d --hostname my-rabbit --name my-rabbit -e RABBITMQ_DEFAULT_USER=user -e RABBITMQ_DEFAULT_PASS=password rabbitmq:3-management
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management
Figure: Running RabbitMQ in a Docker container

If you check C:\ProgramData\Docker\containers, you can see that two containers were created:

C:\ProgramData\Docker\containers:
6bbeb7cdbac5c0db72530a28400af46296ef66f73bdaabaa37f695df54c424b3

07bc47fd519b77de4f8dabd36d36c11550a1efea817575057fe8637d2dd7fa60

In Docker Desktop you can also see that two containers are running.

Figure: Running containers

If you click the Open in browser icon, it opens http://localhost:15672, which is the port displayed under the rabbitmq container.

Description of the two commands:

The first command (docker run) creates a container named my-rabbit, which is a writable container layer on top of the rabbitmq:3-management image, with the hostname my-rabbit. It sets the default user to user and the default password to password, and starts the container.

-d (--detach): runs the container in the background and prints the container ID.

rabbitmq:3-management: the RabbitMQ version 3 image with the management plugin, which provides the management UI that you open in the browser.

The second command creates a container named rabbitmq and publishes its ports, so that the container can be reached from outside.

The first -p (-p 5672:5672) maps host port 5672 to port 5672 inside the container. This is the AMQP port that client applications connect to.

-it is short for --interactive + --tty. With these options the container runs in the foreground, attached to your terminal, so you see the RabbitMQ log output there.

The second -p (-p 15672:15672) maps host port 15672 to port 15672 inside the container. This is the port of the management UI.

--rm: automatically removes the container when it exits.

When RabbitMQ has started, navigate to localhost:15672 and log in with guest as user and guest as password. These are the RabbitMQ defaults, which apply to the second container because it does not set RABBITMQ_DEFAULT_USER or RABBITMQ_DEFAULT_PASS.

Figure: Login to the RabbitMQ management portal

Navigate to the Queues tab and you will see that there is no queue yet.

Figure: RabbitMQ with no queue

Testing RabbitMQ via two microservices to send and receive data

We can't test this now because I haven't implemented the PaymentMicroservice yet, as described above.

The test process will be as follows:

1- Start the rabbitmq container as described above.

2- Start both microservices that should exchange data. Let ProductMicroservice update a product while PaymentMicroservice listens for the messages from ProductMicroservice. Then open the RabbitMQ management UI (started above) and check on the Queues tab that the data arrives in the queue.
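Until the PaymentMicroservice exists, the broker itself can still be checked with a small console program. The following is a rough sketch, not part of the original project: it assumes the RabbitMQ.Client 6.x API (the same CreateModel-style API used in this post), the second container from above listening on localhost:5672 with the default guest/guest credentials, and a hypothetical QueueSmokeTest class with a made-up JSON payload. It publishes one message to CustomerQueue and tries to read it back with BasicGet.

```csharp
using System;
using System.Text;
using RabbitMQ.Client;

// Illustrative smoke test, assuming RabbitMQ.Client 6.x and a local broker.
public static class QueueSmokeTest
{
    public static ConnectionFactory CreateFactory(string host, string user, string password)
    {
        return new ConnectionFactory
        {
            HostName = host,
            UserName = user,
            Password = password
        };
    }

    public static void Main()
    {
        // Assumption: the container started with -p 5672:5672 and default credentials.
        var factory = CreateFactory("localhost", "guest", "guest");
        const string queueName = "CustomerQueue";

        using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            // Same declaration arguments as in ProductUpdateSender.
            channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);

            // Made-up payload, only for this check.
            var body = Encoding.UTF8.GetBytes("{\"Id\":1,\"Name\":\"smoke test\"}");
            channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: null, body: body);

            // BasicGet polls the queue once; it returns null if no message is available yet.
            BasicGetResult result = channel.BasicGet(queueName, autoAck: true);
            Console.WriteLine(result == null
                ? "No message received (try again, publishing is asynchronous)"
                : Encoding.UTF8.GetString(result.Body.ToArray()));
        }
    }
}
```

After running it, the CustomerQueue queue should also show up on the Queues tab of the management UI. Polling with BasicGet is fine for a one-off check like this; the real receiver should keep using a consumer as in ProductUpdateReceiver.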

Conclusion

In this post I have described RabbitMQ, why you should use a RabbitMQ queue to decouple your microservices, and how to implement RabbitMQ using Docker and ASP.NET Core 5. This was a very short implementation.

The code related to ProductMicroservice can be found on my GitHub.

In my next post I will explain upgrading microservices from .NET 5 to .NET 6.

This post is part of "Microservices-Step by step".
