Event Bus Working Sample

This article shows how to build an Event Bus on RabbitMQ using Work Queues, which distribute time-consuming tasks among multiple workers.

Getting started with an Event Bus is fast and easy. This quick start guide uses RabbitMQ with .NET Core. RabbitMQ must be installed first; see the RabbitMQ installation instructions.

The main idea behind Work Queues (aka Task Queues) is to avoid doing a resource-intensive task immediately and having to wait for it to complete. Instead, we schedule the task to be done later. We encapsulate a task as a message and send it to a queue. A worker process running in the background takes tasks off the queue and executes them. When you run many workers, the tasks are shared between them.
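The idea can be tried out without a broker at all. The following is a minimal in-memory sketch, assuming a BlockingCollection stands in for the RabbitMQ queue; the WorkQueueSketch class and its Run method are hypothetical names made up for this illustration and are not part of the sample project.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

// Illustrative only: an in-memory stand-in for a work queue, not RabbitMQ.
public static class WorkQueueSketch
{
    // Enqueues taskCount tasks, lets workerCount background workers drain
    // the queue, and returns how many tasks each worker handled.
    public static int[] Run(int taskCount, int workerCount)
    {
        var queue = new BlockingCollection<string>();
        var handled = new int[workerCount];

        // Start the workers first; each blocks until a task is available.
        var workers = Enumerable.Range(0, workerCount).Select(id => Task.Run(() =>
        {
            foreach (var task in queue.GetConsumingEnumerable())
            {
                handled[id]++; // each worker only writes to its own slot
            }
        })).ToArray();

        // The producer side: wrap each task as a message and queue it.
        for (int i = 0; i < taskCount; i++)
        {
            queue.Add($"task {i}");
        }

        queue.CompleteAdding(); // lets the workers finish once the queue is empty
        Task.WaitAll(workers);
        return handled;
    }
}
```

Calling WorkQueueSketch.Run(50, 3) returns three counts that always add up to 50, although how they split between the workers depends on thread scheduling.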

This concept is especially useful in web applications where it’s impossible to handle a complex task during a short HTTP request window.

The code is largely self-explanatory: NewTask puts 50 tasks on the queue, and Worker retrieves the queued items one at a time, pausing for 5 seconds on each to simulate some work being done.

Round-robin dispatching

One of the advantages of using a Task Queue is the ability to easily parallelise work. If we are building up a backlog of work, we can just add more workers and that way, scale easily.

Try running two Worker instances at the same time. Both will receive messages from the queue, and you will notice that each worker picks new items off the queue in turn. By default, RabbitMQ will send each message to the next consumer, in sequence. On average every consumer will get the same number of messages. This way of distributing messages is called round-robin. Try this out with three or more workers.
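To see what round-robin means for the 50 messages in this sample, here is a small in-memory sketch. It models the default dispatch order only and does not talk to RabbitMQ; RoundRobinSketch and Dispatch are hypothetical names used for illustration.

```csharp
using System;
using System.Collections.Generic;

// Illustrative in-memory model of round-robin dispatch; it does not use RabbitMQ.
public static class RoundRobinSketch
{
    // Returns, for each worker index, the message numbers that worker would receive.
    public static List<List<int>> Dispatch(int messageCount, int workerCount)
    {
        if (workerCount <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(workerCount));
        }

        var perWorker = new List<List<int>>();
        for (int w = 0; w < workerCount; w++)
        {
            perWorker.Add(new List<int>());
        }

        // Message m goes to the next worker in sequence, wrapping around.
        for (int m = 0; m < messageCount; m++)
        {
            perWorker[m % workerCount].Add(m);
        }

        return perWorker;
    }
}
```

With Dispatch(50, 2) each worker receives 25 messages, one taking the even numbers and the other the odd ones. With Dispatch(50, 3) the split is 17, 17 and 16.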

Message Acknowledgement

Doing a task can take a few seconds. You may wonder what happens if one of the consumers starts a long task and dies with it only partly done. With automatic acknowledgement (autoAck: true), RabbitMQ marks a message for deletion as soon as it delivers it to the consumer. In that case, if you kill a worker we lose the message it was processing, along with all the messages that were dispatched to that worker but not yet handled.

But we don’t want to lose any tasks. If a worker dies, we’d like the task to be delivered to another worker.

In order to make sure a message is never lost, RabbitMQ supports message acknowledgements. An ack(nowledgement) is sent back by the consumer to tell RabbitMQ that a particular message has been received, processed and that RabbitMQ is free to delete it.

If a consumer dies (its channel is closed, the connection is closed, or TCP connection is lost) without sending an ack, RabbitMQ will understand that a message wasn’t processed fully and will re-queue it. If there are other consumers online at the same time, it will then quickly redeliver it to another consumer. That way you can be sure that no message is lost, even if the workers occasionally die.

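RabbitMQ redelivers a message when the consumer dies, not when processing is merely slow, so it is fine if a message takes a long time to process. Note that recent RabbitMQ releases do enforce a delivery acknowledgement timeout (30 minutes by default), so very long-running tasks may need that limit raised.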
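The bookkeeping behind acknowledgements can be modelled in a few lines. The sketch below is an in-memory simplification, not the RabbitMQ client API; AckQueueSketch and its members are hypothetical names, and it puts requeued messages at the back of the queue, which is a simplification of what a real broker does.

```csharp
using System;
using System.Collections.Generic;

// Illustrative in-memory model of manual acknowledgement; not the RabbitMQ client API.
public sealed class AckQueueSketch
{
    private readonly Queue<string> ready = new Queue<string>();
    private readonly Dictionary<ulong, (string Consumer, string Message)> unacked =
        new Dictionary<ulong, (string Consumer, string Message)>();
    private ulong nextTag = 1;

    public int ReadyCount => ready.Count;
    public int UnackedCount => unacked.Count;

    public void Publish(string message) => ready.Enqueue(message);

    // Hands the next ready message to a consumer and remembers it until it is acked.
    public (ulong Tag, string Message)? Deliver(string consumer)
    {
        if (ready.Count == 0)
        {
            return null;
        }

        var message = ready.Dequeue();
        var tag = nextTag++;
        unacked[tag] = (consumer, message);
        return (tag, message);
    }

    // The consumer finished the work, so the message can be dropped for good.
    public void Ack(ulong tag) => unacked.Remove(tag);

    // The consumer went away: put its unacknowledged messages back on the queue.
    public void ConsumerDied(string consumer)
    {
        var lost = new List<ulong>();
        foreach (var entry in unacked)
        {
            if (entry.Value.Consumer == consumer)
            {
                lost.Add(entry.Key);
            }
        }

        foreach (var tag in lost)
        {
            ready.Enqueue(unacked[tag].Message);
            unacked.Remove(tag);
        }
    }
}
```

If worker "w1" receives a message and dies before calling Ack, ConsumerDied("w1") moves that message back to the ready queue, and the next Deliver call hands it to another worker.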

using System;
using RabbitMQ.Client;
using System.Text;

namespace RabbitMQ_NewTask
{
    class Program
    {
        public static void Main()
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using var connection = factory.CreateConnection();

            // One channel is enough for the whole batch, so open it outside the loop
            using var channel = connection.CreateModel();

            // Make the queue durable so that it survives a broker restart
            channel.QueueDeclare(queue: "RabbitMQ_task_queue", durable: true, exclusive: false, autoDelete: false, arguments: null);

            // Mark messages as persistent so that the broker writes them to disk
            var properties = channel.CreateBasicProperties();
            properties.Persistent = true;

            // Publish 50 messages to the queue for processing
            for (int i = 0; i < 50; i++)
            {
                var message = $"Hello World - {i}";
                var body = Encoding.UTF8.GetBytes(message);

                channel.BasicPublish(exchange: "", routingKey: "RabbitMQ_task_queue", basicProperties: properties, body: body);
                Console.WriteLine(" [x] Sent {0}", message);
            }
        }
    }
}

NewTask

using System;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
using System.Threading;

namespace RabbitMQ_Worker
{
    class Program
    {
        public static void Main()
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.QueueDeclare(queue: "RabbitMQ_task_queue", durable: true, exclusive: false, autoDelete: false, arguments: null);

                // Deliver at most one unacknowledged message to this worker at a time
                channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

                Console.WriteLine(" [*] Waiting for messages.");

                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
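                    // In RabbitMQ.Client 5.x, Body is a byte[]; from 6.0 it is ReadOnlyMemory<byte>,
                    // in which case pass ea.Body.ToArray() to GetString instead
                    var body = ea.Body;
                    var message = Encoding.UTF8.GetString(body);
                    Console.WriteLine(" [x] Received {0}", message);

                    // Simulate 5 seconds of work
                    Thread.Sleep(5000);

                    Console.WriteLine(" [x] Done");

                    // Acknowledge only after the work is done, so that an unfinished task is redelivered
                    channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);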
                };
                channel.BasicConsume(queue: "RabbitMQ_task_queue", autoAck: false, consumer: consumer);

                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }
    }

}

Worker

For a fuller explanation of this code, head over to the RabbitMQ tutorials: https://www.rabbitmq.com/tutorials/tutorial-two-dotnet.html

The source can be found here:
https://bitbucket.org/bryanavery/rabbitmq

To learn more about this, you can view the Pluralsight course Scaling Applications with Microservices, MassTransit, and RabbitMQ.