Building a Simple Message Queue Using Redis and Node.js

Posted August 2, 2017 - 5 min read

Hello there! This time I am going to discuss the process of building a message queue. Many rock-solid, time-proven, enterprise-ready solutions already exist (RabbitMQ, ZeroMQ, etc.).

So why did I bother writing my own message queue from scratch?

One day, during the development of a project, I wondered how I could use Redis as a message queue. I did not want to introduce complicated layers or use a solution that provided more than I needed for a medium-sized project.

I looked around for existing libraries, but I was not satisfied with the projects I found.

So I decided to develop my own message queue from scratch, called RedisSMQ (Redis Simple Message Queue), which makes the most of Redis and Node.js for performance and ease of use.

One of the main requirements for RedisSMQ is that it should not be inferior in any respect to existing solid solutions in terms of:

  1. Persistence.
  2. Atomicity.
  3. Fault tolerance and high availability.
  4. Monitoring.
  5. Simplicity and ease of use.

RedisSMQ is built on Redis itself, which already provides atomic operations and can be configured to be highly available.

Redis can also be tuned to match our exact requirements for the trade-off between persistence and performance (for example, by choosing between RDB snapshots and the append-only file).

RedisSMQ architecture overview


                      ┌─────► Yes ──────────────────────────────────────────┐
                      │                   Move message to Dead-letter queue │
                      │                                                     │
                      ├─────► No ─────────┐  ┌──────────┐                   │
                      │          Re-queue │  │ Producer │                   │
                      │                   │  └────┬─────┘                   │
              Has message retry           │       │              ┌─┬────────▼──────────┬─┐
             threshold exceeded?          │       │              │ │ Dead-letter Queue │ │
                      |                   │ ┌─────┘       .......│ │        1.1        │ │
                      |                   │ │             .      └─┴───────────────────┴─┘
                      |             ┌─┬───▼─▼─────┬─┐......
               ┌────────────────────┤ │   Queue   │ ├──────────────────────┐
               │      |             │ │     1     │ │       Fetch messages │
               │      |             └─┴─────┬─────┴─┘                      │
               │      |                     │                              │
               │      |                     │                              │
  ┌────────────▼────────────┐   ┌───────────▼──────────────┐  ┌────────────▼─────────────┐
  │        Consumer   |     │   │        Consumer          │  │        Consumer          │
  │           1.1     |     │   │           1.2            │  │           1.3            │
  │ ┌─────────┐ ┌─────┴───┐ │   │ ┌─────────┐ ┌──────────┐ │  │ ┌─────────┐ ┌──────────┐ │
  │ │         │ │ Garbage ◄─┼─┐ │ │         │ │ Garbage  │ │  │ │         │ │ Garbage  │ │
  │ │Heartbeat│ │collector│ │ │ │ │Heartbeat│ │collector │ │  │ │Heartbeat│ │collector │ │
  │ │         │ │(Active) │ │ │ │ │         │ │(Failover)│ │  │ │         │ │(Failover)│ │
  └─┴─────────┴─┴─────────┴─┘ │ └─┴─────────┴─┴──────────┴─┘  └─┴─────────┴─┴──────────┴─┘
                              │
                              │ Collect failed messages
               ┌──────────────┴──────────────┬─────────────────────────────┐
               │                             │                             │
    ┌─┬────────┴─────────┬─┐      ┌─┬────────┴─────────┬─┐      ┌─┬────────┴─────────┬─┐
    │ │ Processing queue │ │      │ │ Processing queue │ │      │ │ Processing queue │ │
    │ │      1.1.1       │ │      │ │      1.2.1       │ │      │ │      1.3.1       │ │
    └─┴──────────────────┴─┘      └─┴──────────────────┴─┘      └─┴──────────────────┴─┘

Queues

Queues are simply Redis lists shared by multiple producers and consumers.

To publish a message to a queue, producers use the LPUSH command.
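To see why pushing with LPUSH and popping from the opposite end gives first-in, first-out order, here is a minimal sketch that models a Redis list as a plain JavaScript array. It does not talk to Redis: `lpush` and `rpop` are hypothetical helpers that only mimic the semantics of the Redis LPUSH and RPOP commands.

```javascript
// In-memory stand-in for a Redis list (index 0 is the head, the last index is the tail).
// This only mimics LPUSH/RPOP semantics; it is not a Redis client.
const queue = [];

// Like LPUSH: insert each value at the head, in order, and return the new length.
function lpush(list, ...values) {
  for (const value of values) list.unshift(value);
  return list.length;
}

// Like RPOP: remove and return the tail element, or null if the list is empty.
function rpop(list) {
  return list.length > 0 ? list.pop() : null;
}

// Producers publish three messages.
lpush(queue, "msg-1");
lpush(queue, "msg-2");
lpush(queue, "msg-3");
console.log(queue); // [ 'msg-3', 'msg-2', 'msg-1' ]

// A consumer popping from the tail receives messages in publish order (FIFO).
const received = [];
let message;
while ((message = rpop(queue)) !== null) received.push(message);
console.log(received); // [ 'msg-1', 'msg-2', 'msg-3' ]
```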

Each consumer has a unique processing queue, created every time the consumer is launched.

A processing queue can hold only one message at a time. A message is either acknowledged, if it has been processed successfully, or unacknowledged otherwise.

Once a message is acknowledged or unacknowledged, it is removed from the processing queue and the consumer starts waiting for a new message again.

An unacknowledged message is collected by the garbage collector. Depending on the message retry threshold, which is the maximum number of times a message can be re-queued, the message is either re-queued or moved to a dead-letter queue.
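The acknowledge/unacknowledge cycle and the retry threshold can be sketched in memory as below. This is an illustrative model under stated assumptions, not RedisSMQ's implementation: arrays stand in for Redis lists, and the `attempts` field, `RETRY_THRESHOLD = 3`, `collect`, and `consumeOne` are hypothetical names. Here a handler that throws counts as an unacknowledged message.

```javascript
// Hypothetical in-memory model of the ack/unack flow; arrays stand in for Redis lists.
const RETRY_THRESHOLD = 3; // hypothetical: max number of times a message may be re-queued

const queue = [];           // the main queue (head = index 0)
const deadLetterQueue = []; // receives messages once the threshold is exceeded
const processingQueue = []; // per-consumer list, holds at most one message

// Collect an unacknowledged message: re-queue it or move it to the dead-letter queue.
function collect(message) {
  if (message.attempts < RETRY_THRESHOLD) {
    message.attempts += 1;
    queue.unshift(message); // re-queue at the head, like LPUSH
    return "requeued";
  }
  deadLetterQueue.unshift(message);
  return "dead-lettered";
}

// Consume one message: move it into the processing queue, run the handler, then ack/unack.
function consumeOne(handler) {
  if (queue.length === 0) return null;
  processingQueue.unshift(queue.pop()); // like RPOPLPUSH queue -> processingQueue
  const message = processingQueue[0];
  let outcome;
  try {
    handler(message);
    outcome = "acknowledged";
  } catch {
    outcome = collect(message); // unacknowledged
  }
  processingQueue.length = 0; // either way, the processing queue is emptied
  return outcome;
}

// A message whose handler always fails: re-queued 3 times, then dead-lettered.
queue.unshift({ id: "m1", attempts: 0 });
const outcomes = [];
while (queue.length > 0) {
  outcomes.push(consumeOne(() => { throw new Error("processing failed"); }));
}
console.log(outcomes); // [ 'requeued', 'requeued', 'requeued', 'dead-lettered' ]
```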

Consumers

A consumer is a standalone process that can be launched from the command line. In this way, consumers can be regarded as “workers”.

Each queue can have any number of concurrent consumers.

To wait for a message, a consumer uses BRPOPLPUSH, which blocks until a message is available, then takes it from the queue and moves it to the consumer's processing queue.

This operation is atomic and guarantees that each message is delivered to only one consumer, even when many consumers are running on the same queue.
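The delivery guarantee can be modeled without a Redis server. In the sketch below, a synchronous `rpoplpush` helper (hypothetical, in-memory) plays the role of the atomic move, and three consumers take turns fetching from one shared queue; the round-robin loop is a simplification of real blocking consumers. With an actual client such as ioredis, the fetch would be a single `brpoplpush(queue, processingQueue, timeout)` call. Note also that since Redis 6.2, BRPOPLPUSH is deprecated in favor of BLMOVE (with the RIGHT and LEFT arguments).

```javascript
// In-memory model of RPOPLPUSH: pop from the tail of `source`, push onto the head of
// `destination`, and return the moved element (or null). Because this function runs
// synchronously on Node's single thread, no other consumer can observe the message
// "in between", which mirrors the atomicity Redis gives BRPOPLPUSH.
function rpoplpush(source, destination) {
  if (source.length === 0) return null;
  const message = source.pop();
  destination.unshift(message);
  return message;
}

const queue = [];
for (let i = 1; i <= 9; i++) queue.unshift(`msg-${i}`); // LPUSH nine messages

// Three hypothetical consumers, each with its own processing queue.
const consumers = ["1.1", "1.2", "1.3"].map((id) => ({ id, processingQueue: [], handled: [] }));

// Consumers take turns fetching; each acknowledges (removes) its message right away.
let active = true;
while (active) {
  active = false;
  for (const consumer of consumers) {
    const message = rpoplpush(queue, consumer.processingQueue);
    if (message === null) continue;
    active = true;
    consumer.handled.push(message);
    consumer.processingQueue.length = 0; // acknowledge: clear the processing queue
  }
}

const all = consumers.flatMap((c) => c.handled);
console.log(all.length, new Set(all).size); // 9 9 (every message delivered exactly once)
```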

Heartbeat

In computer science, a heartbeat is a periodic signal generated by hardware or software to indicate normal operation or to synchronize other parts of a computer system.

Wikipedia

A consumer is considered alive and operating normally if its last heartbeat was sent less than 10 seconds ago.

Each consumer sends a heartbeat signal every second.
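The liveness rule boils down to a check on the timestamp of the last heartbeat. The sketch below keeps heartbeats in an in-memory `Map` instead of Redis and uses simulated timestamps rather than real timers; `sendHeartbeat`, `isAlive`, and the constant names are hypothetical.

```javascript
// Hypothetical in-memory heartbeat registry; in RedisSMQ this state lives in Redis.
const HEARTBEAT_INTERVAL_MS = 1000; // a heartbeat is sent every second
const HEARTBEAT_TIMEOUT_MS = 10000; // alive if the last heartbeat is less than 10 s old

const lastHeartbeat = new Map(); // consumerId -> timestamp (ms) of the last heartbeat

function sendHeartbeat(consumerId, now = Date.now()) {
  lastHeartbeat.set(consumerId, now);
}

function isAlive(consumerId, now = Date.now()) {
  const last = lastHeartbeat.get(consumerId);
  return last !== undefined && now - last < HEARTBEAT_TIMEOUT_MS;
}

// Simulated timeline (timestamps in ms) instead of real timers:
// consumer 1.1 beats for 5 seconds, then its process dies.
for (let t = 0; t <= 5000; t += HEARTBEAT_INTERVAL_MS) sendHeartbeat("1.1", t);

console.log(isAlive("1.1", 9000));  // true  (last beat at 5000, 4 s ago)
console.log(isAlive("1.1", 15000)); // false (last beat 10 s ago, not less than 10 s)
console.log(isAlive("1.2", 15000)); // false (never sent a heartbeat)
```

In a running consumer, something like `setInterval(() => sendHeartbeat(id), HEARTBEAT_INTERVAL_MS)` would send the signal every second.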

Garbage collector

The garbage collector is responsible for collecting failed messages from a given queue.

Each consumer has its own garbage collector instance.

However many consumers a queue has, only one garbage collector instance is active for that queue at any time; the instances of the other consumers stay on standby as failovers.

The active garbage collector instance periodically scans all processing queues of its queue.

A message in a processing queue is considered garbage (failed) and is collected when the consumer that owns the processing queue is dead, that is, when its heartbeat has stopped.
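A single garbage collector pass can be sketched as follows: for every processing queue whose owner has no recent heartbeat, take the message out and either re-queue it or dead-letter it according to the retry threshold. Everything here is a hypothetical in-memory model (the `state` shape, `collectGarbage`, `RETRY_THRESHOLD = 3`, and the 10-second timeout reused from the heartbeat rule), not RedisSMQ's code.

```javascript
// Hypothetical in-memory model of one garbage collector pass over a queue's
// processing queues. Names, data shapes, and RETRY_THRESHOLD are illustrative.
const RETRY_THRESHOLD = 3;
const HEARTBEAT_TIMEOUT_MS = 10000;

function isAlive(lastHeartbeatMs, now) {
  return lastHeartbeatMs !== undefined && now - lastHeartbeatMs < HEARTBEAT_TIMEOUT_MS;
}

// state: { queue, deadLetterQueue, processingQueues: Map<id, message[]>, heartbeats: Map<id, ms> }
function collectGarbage(state, now) {
  const collected = [];
  for (const [consumerId, processingQueue] of state.processingQueues) {
    if (isAlive(state.heartbeats.get(consumerId), now)) continue; // owner alive: skip
    for (const message of processingQueue.splice(0)) { // empty the dead consumer's list
      if (message.attempts < RETRY_THRESHOLD) {
        message.attempts += 1;
        state.queue.unshift(message); // re-queue (like LPUSH)
      } else {
        state.deadLetterQueue.unshift(message); // retry threshold exceeded
      }
      collected.push(message.id);
    }
  }
  return collected;
}

const state = {
  queue: [],
  deadLetterQueue: [],
  processingQueues: new Map([
    ["1.1", [{ id: "a", attempts: 0 }]], // consumer 1.1 is alive
    ["1.2", [{ id: "b", attempts: 1 }]], // consumer 1.2 died
    ["1.3", [{ id: "c", attempts: 3 }]], // consumer 1.3 died, retries exhausted
  ]),
  heartbeats: new Map([["1.1", 19500], ["1.2", 4000], ["1.3", 2000]]),
};

console.log(collectGarbage(state, 20000)); // [ 'b', 'c' ]
console.log(state.queue.map((m) => m.id), state.deadLetterQueue.map((m) => m.id)); // [ 'b' ] [ 'c' ]
```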

When the consumer running the active garbage collector goes offline, the garbage collector instance of another consumer takes over.
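The post does not say how RedisSMQ ensures that only one garbage collector is active per queue. One common Redis pattern, assumed here purely for illustration, is a lock key with an expiry (`SET <key> <owner> NX PX <ttl>`) that the active instance keeps renewing; when its consumer dies, the lock expires and a failover instance acquires it. The sketch models that lock in memory; the key name, TTL, and `acquireOrRenew` are hypothetical.

```javascript
// Hypothetical in-memory model of a per-queue garbage collector lock with a TTL,
// mimicking the Redis pattern `SET <lockKey> <ownerId> NX PX <ttl>` plus renewal.
// This is an assumption for illustration; the post does not describe the mechanism.
const LOCK_TTL_MS = 10000; // hypothetical lock lifetime

const locks = new Map(); // lockKey -> { owner, expiresAt }

// Try to become (or stay) the active garbage collector for `queueName`.
function acquireOrRenew(queueName, consumerId, now) {
  const key = `gc-lock:${queueName}`; // hypothetical key name
  const lock = locks.get(key);
  if (lock === undefined || lock.expiresAt <= now || lock.owner === consumerId) {
    locks.set(key, { owner: consumerId, expiresAt: now + LOCK_TTL_MS });
    return true; // this instance is the active garbage collector
  }
  return false; // another consumer's instance holds the lock: stay on failover
}

// Consumer 1.1 becomes active; the others stay on standby.
console.log(acquireOrRenew("queue-1", "1.1", 0));    // true
console.log(acquireOrRenew("queue-1", "1.2", 1000)); // false
console.log(acquireOrRenew("queue-1", "1.1", 5000)); // true (renewal by the holder)

// 1.1 dies at t = 5000 and stops renewing; once the lock expires, a failover takes over.
console.log(acquireOrRenew("queue-1", "1.3", 14000)); // false (lock valid until 15000)
console.log(acquireOrRenew("queue-1", "1.2", 15000)); // true
```

Against a real Redis server, the renewal step has to check ownership and extend the TTL atomically (for example in a Lua script), since `SET ... NX` alone cannot renew a key that already exists.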

Statistics and monitoring

RedisSMQ collects statistics from all consumers and producers and updates them every second. Statistics and monitoring help us:

  • To get an overview of the current state of the message queue.
  • To check that the message queue is performing as expected.
  • To troubleshoot and debug issues such as delays or poor performance.
  • To determine whether more consumers are needed to process messages.

Statistics are provided in “real-time” and include:

  • Aggregated message rates from all consumers (processing/acknowledged/unacknowledged message rates) and producers (message input rates)
  • Message rates for each consumer/producer instance.
  • Consumer resource usage (memory and CPU)
  • Queue sizes
  • Etc.
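Per-second rates like those listed above can be produced by counting events per instance and resetting the counters every second; aggregated rates are then the sum across consumers. This is a hypothetical sketch (`createRateCounter`, `aggregate`, and the input numbers are made up for illustration), not RedisSMQ's statistics code.

```javascript
// Hypothetical per-instance counters, reset every second to produce per-second rates.
function createRateCounter() {
  const counts = { processing: 0, acknowledged: 0, unacknowledged: 0 };
  return {
    increment(kind) { counts[kind] += 1; },
    // Called once per second: return the rates for the last second and reset.
    snapshot() {
      const rates = { ...counts };
      for (const kind of Object.keys(counts)) counts[kind] = 0;
      return rates;
    },
  };
}

// Sum per-consumer rates into the aggregated rates shown for the whole queue.
function aggregate(rateSnapshots) {
  const total = { processing: 0, acknowledged: 0, unacknowledged: 0 };
  for (const rates of rateSnapshots) {
    for (const kind of Object.keys(total)) total[kind] += rates[kind];
  }
  return total;
}

// Two consumers' activity during one second (made-up numbers for illustration).
const c1 = createRateCounter();
const c2 = createRateCounter();
for (let i = 0; i < 5; i++) { c1.increment("processing"); c1.increment("acknowledged"); }
for (let i = 0; i < 3; i++) c2.increment("processing");
c2.increment("acknowledged");
c2.increment("unacknowledged");
c2.increment("unacknowledged");

const perConsumer = [c1.snapshot(), c2.snapshot()];
console.log(aggregate(perConsumer)); // { processing: 8, acknowledged: 6, unacknowledged: 2 }
console.log(c1.snapshot());          // { processing: 0, acknowledged: 0, unacknowledged: 0 }
```

In a live setup, a `setInterval` with a 1000 ms period would call `snapshot()` on each instance and publish the aggregate.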

To monitor and manage the message queue, RedisSMQ provides a web UI that consumes the statistics published over WebSockets and visualizes various metrics as charts.

Summary

In this post, I discussed many aspects of RedisSMQ to give an overview of its design.

As its name suggests, RedisSMQ is a simple message queue, yet it has demonstrated excellent performance.

For more details about RedisSMQ and its implementation, please take a look at the project page on GitHub.

Update: The project has gone through many development iterations, and the MQ architecture has been updated. You can find the latest architecture overview here: RedisSMQ Architecture Overview