RabbitMQ
RabbitMQ is a lightweight, open-source message broker that supports multiple messaging protocols. It can be deployed in distributed and federated configurations to meet high-scale, high-availability requirements. It is also among the most widely deployed message brokers, used worldwide at small startups and large enterprises.
Installation#
To start building RabbitMQ-based microservices, first install the required packages:
$ npm i --save amqplib amqp-connection-manager
Overview#
To use the RabbitMQ transporter, pass the following options object to the createMicroservice() method:
const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
  transport: Transport.RMQ,
  options: {
    urls: ['amqp://localhost:5672'],
    queue: 'cats_queue',
    queueOptions: {
      durable: false
    },
  },
});
const app = await NestFactory.createMicroservice(AppModule, {
  transport: Transport.RMQ,
  options: {
    urls: ['amqp://localhost:5672'],
    queue: 'cats_queue',
    queueOptions: {
      durable: false
    },
  },
});
Hint: The Transport enum is imported from the @nestjs/microservices package.
Options#
The options property is specific to the chosen transporter. The RabbitMQ transporter exposes the properties described below.
urls | An array of connection URLs to try in order |
queue | Queue name which your server will listen to |
prefetchCount | Sets the prefetch count for the channel |
isGlobalPrefetchCount | Enables per channel prefetching |
noAck | If false, manual acknowledgment mode is enabled |
consumerTag | Consumer tag identifier: a name the server uses to distinguish message deliveries for the consumer. It must not already be in use on the channel. It is usually easier to omit it, in which case the server generates a random name and supplies it in the reply (read more here) |
queueOptions | Additional queue options (read more here) |
socketOptions | Additional socket options (read more here) |
headers | Headers to be sent along with every message |
replyQueue | Reply queue for the producer. Default is amq.rabbitmq.reply-to |
persistent | If truthy, the message will survive broker restarts provided it’s in a queue that also survives restarts |
noAssert | When true, the queue will not be asserted before consuming |
wildcards | Set to true only if you want to use Topic Exchange for routing messages to queues. Enabling this will allow you to use wildcards (*, #) as message and event patterns |
exchange | Name for the exchange. Defaults to the queue name when "wildcards" is set to true |
exchangeType | Type of the exchange. Default is topic. Valid values are direct, fanout, topic, and headers |
routingKey | Additional routing key for the topic exchange |
maxConnectionAttempts | Maximum number of connection attempts. Applies only to the consumer configuration. -1 === infinite |
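As an illustration of how several of these options combine, the sketch below pairs persistent with a durable queue (per the table, a persistent message survives a broker restart only if its queue does too) and turns on manual acknowledgment together with a prefetch limit. The concrete values, such as a prefetch count of 10, are illustrative assumptions rather than recommendations. The object would be passed as the options property shown in the Overview.

```typescript
// Illustrative values only; tune them for your own workload.
const rmqOptions = {
  urls: ['amqp://localhost:5672'],
  queue: 'cats_queue',
  // Manual acknowledgment mode: handlers must ack each message themselves.
  noAck: false,
  // Limit on unacknowledged messages delivered at once (assumed value).
  prefetchCount: 10,
  // Persistent messages survive a broker restart only in a queue that does too...
  persistent: true,
  queueOptions: {
    // ...so the queue is declared durable here.
    durable: true,
  },
};
```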
Client#
Like other microservice transporters, you have several options for creating a RabbitMQ ClientProxy instance.
One method for creating an instance is to use the ClientsModule. To create a client instance with the ClientsModule, import it and use the register() method to pass an options object with the same properties shown above in the createMicroservice() method, as well as a name property to be used as the injection token. Read more about ClientsModule here.
@Module({
  imports: [
    ClientsModule.register([
      {
        name: 'MATH_SERVICE',
        transport: Transport.RMQ,
        options: {
          urls: ['amqp://localhost:5672'],
          queue: 'cats_queue',
          queueOptions: {
            durable: false
          },
        },
      },
    ]),
  ]
  ...
})
Other options to create a client (either ClientProxyFactory or @Client()) can be used as well. You can read about them here.
Context#
In more complex scenarios, you may need to access additional information about the incoming request. When using the RabbitMQ transporter, you can access the RmqContext object.
@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: RmqContext) {
  console.log(`Pattern: ${context.getPattern()}`);
}
@Bind(Payload(), Ctx())
@MessagePattern('notifications')
getNotifications(data, context) {
  console.log(`Pattern: ${context.getPattern()}`);
}
Hint: @Payload(), @Ctx() and RmqContext are imported from the @nestjs/microservices package.
To access the original RabbitMQ message (with the properties, fields, and content), use the getMessage() method of the RmqContext object, as follows:
@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: RmqContext) {
  console.log(context.getMessage());
}
@Bind(Payload(), Ctx())
@MessagePattern('notifications')
getNotifications(data, context) {
  console.log(context.getMessage());
}
To retrieve a reference to the RabbitMQ channel, use the getChannelRef method of the RmqContext object, as follows:
@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: RmqContext) {
  console.log(context.getChannelRef());
}
@Bind(Payload(), Ctx())
@MessagePattern('notifications')
getNotifications(data, context) {
  console.log(context.getChannelRef());
}
Message acknowledgement#
To make sure a message is never lost, RabbitMQ supports message acknowledgements. An acknowledgement is sent back by the consumer to tell RabbitMQ that a particular message has been received and processed, and that RabbitMQ is free to delete it. If a consumer dies (its channel is closed, connection is closed, or TCP connection is lost) without sending an ack, RabbitMQ will understand that the message wasn't fully processed and will re-queue it.
To enable manual acknowledgment mode, set the noAck property to false:
options: {
  urls: ['amqp://localhost:5672'],
  queue: 'cats_queue',
  noAck: false,
  queueOptions: {
    durable: false
  },
},
When manual consumer acknowledgements are turned on, we must send a proper acknowledgement from the worker to signal that we are done with a task.
@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: RmqContext) {
  const channel = context.getChannelRef();
  const originalMsg = context.getMessage();
  channel.ack(originalMsg);
}
@Bind(Payload(), Ctx())
@MessagePattern('notifications')
getNotifications(data, context) {
  const channel = context.getChannelRef();
  const originalMsg = context.getMessage();
  channel.ack(originalMsg);
}
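A handler that throws before reaching channel.ack() leaves its message unacknowledged. One possible way to make the outcome explicit is to acknowledge on success and negatively acknowledge on failure. The sketch below is an illustration under stated assumptions, not part of the transporter: settleMessage and AckChannel are hypothetical names, and AckChannel only mirrors the ack(message) and nack(message, allUpTo, requeue) methods that amqplib channels expose. It is written synchronously for brevity and exercised against a fake channel.

```typescript
// Hypothetical minimal view of a channel: just the two methods used here.
interface AckChannel<M> {
  ack(message: M): void;
  nack(message: M, allUpTo?: boolean, requeue?: boolean): void;
}

// Runs `work`, then acks on success or nacks (without requeue) on failure.
function settleMessage<M, R>(
  channel: AckChannel<M>,
  message: M,
  work: () => R,
): R | undefined {
  try {
    const result = work();
    channel.ack(message);
    return result;
  } catch {
    // requeue = false: the broker discards or dead-letters the message
    // instead of redelivering it.
    channel.nack(message, false, false);
    return undefined;
  }
}

// Usage with a fake channel that records which method was called.
const calls: string[] = [];
const fakeChannel: AckChannel<string> = {
  ack: (m) => { calls.push(`ack:${m}`); },
  nack: (m) => { calls.push(`nack:${m}`); },
};

settleMessage(fakeChannel, 'ok', () => 42);
settleMessage(fakeChannel, 'bad', () => {
  throw new Error('boom');
});
```

Inside a real handler, the same helper could presumably be called with context.getChannelRef() and context.getMessage() as its first two arguments. Whether a failed message should be dropped, dead-lettered, or re-queued depends on the application, so the requeue flag is a design choice rather than a fixed rule.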
Record builders#
To configure message options, you can use the RmqRecordBuilder class (note: this is doable for event-based flows as well). For example, to set headers and priority properties, use the setOptions method, as follows:
const message = ':cat:';
const record = new RmqRecordBuilder(message)
  .setOptions({
    headers: {
      ['x-version']: '1.0.0',
    },
    priority: 3,
  })
  .build();
this.client.send('replace-emoji', record).subscribe(...);
Hint: The RmqRecordBuilder class is exported from the @nestjs/microservices package.
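One caveat about the priority option: on classic queues, RabbitMQ honors message priorities only when the queue was declared with an x-max-priority argument. The sketch below shows what the corresponding configuration might look like, assuming queueOptions is forwarded to the queue declaration. The maximum of 10 is an arbitrary illustrative value.

```typescript
// Assumed queue setup so that `priority: 3` on a record can take effect.
const priorityQueueOptions = {
  urls: ['amqp://localhost:5672'],
  queue: 'cats_queue',
  queueOptions: {
    durable: false,
    // Declares the queue as a priority queue with priorities up to 10 (illustrative).
    arguments: { 'x-max-priority': 10 },
  },
};
```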
You can also read these values on the server side by accessing the RmqContext, as follows:
@MessagePattern('replace-emoji')
replaceEmoji(@Payload() data: string, @Ctx() context: RmqContext): string {
  const { properties: { headers } } = context.getMessage();
  return headers['x-version'] === '1.0.0' ? '🐱' : '🐈';
}
@Bind(Payload(), Ctx())
@MessagePattern('replace-emoji')
replaceEmoji(data, context) {
  const { properties: { headers } } = context.getMessage();
  return headers['x-version'] === '1.0.0' ? '🐱' : '🐈';
}
Instance status updates#
To get real-time updates on the connection and the state of the underlying driver instance, you can subscribe to the status stream. This stream provides status updates specific to the chosen driver. For the RMQ driver, the status stream emits connected and disconnected events.
this.client.status.subscribe((status: RmqStatus) => {
  console.log(status);
});
Hint: The RmqStatus type is imported from the @nestjs/microservices package.
Similarly, you can subscribe to the server's status stream to receive notifications about the server's status.
const server = app.connectMicroservice<MicroserviceOptions>(...);
server.status.subscribe((status: RmqStatus) => {
  console.log(status);
});
Listening to RabbitMQ events#
In some cases, you might want to listen to internal events emitted by the microservice. For example, you could listen for the error event to trigger additional operations when an error occurs. To do this, use the on() method, as shown below:
this.client.on('error', (err) => {
  console.error(err);
});
Similarly, you can listen to the server's internal events:
server.on<RmqEvents>('error', (err) => {
  console.error(err);
});
Hint: The RmqEvents type is imported from the @nestjs/microservices package.
Underlying driver access#
For more advanced use cases, you may need to access the underlying driver instance. This can be useful for scenarios like manually closing the connection or using driver-specific methods. However, keep in mind that for most cases, you shouldn't need to access the driver directly.
To do so, you can use the unwrap() method, which returns the underlying driver instance. The generic type parameter should specify the type of driver instance you expect.
const managerRef =
  this.client.unwrap<import('amqp-connection-manager').AmqpConnectionManager>();
Similarly, you can access the server's underlying driver instance:
const managerRef =
  server.unwrap<import('amqp-connection-manager').AmqpConnectionManager>();
Wildcards#
RabbitMQ supports the use of wildcards in routing keys to allow for flexible message routing. The # wildcard matches zero or more words, while the * wildcard matches exactly one word.
For example, the routing key cats.# matches cats, cats.meow, and cats.meow.purr. The routing key cats.* matches cats.meow but not cats.meow.purr.
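To make these matching rules concrete, the following self-contained sketch reimplements them as a plain function. It only illustrates the semantics described above. It is not how RabbitMQ or the transporter performs routing, and matchesTopic is a hypothetical helper name.

```typescript
// Returns true if a dot-separated routing key matches a topic pattern,
// where '*' stands for exactly one word and '#' for zero or more words.
function matchesTopic(pattern: string, routingKey: string): boolean {
  const patternWords = pattern.split('.');
  const keyWords = routingKey === '' ? [] : routingKey.split('.');

  const match = (i: number, j: number): boolean => {
    // Pattern exhausted: succeed only if the key is exhausted too.
    if (i === patternWords.length) return j === keyWords.length;

    if (patternWords[i] === '#') {
      // Let '#' consume 0, 1, 2, ... of the remaining key words.
      for (let next = j; next <= keyWords.length; next++) {
        if (match(i + 1, next)) return true;
      }
      return false;
    }

    // Any other pattern word needs a key word to compare against.
    if (j === keyWords.length) return false;
    if (patternWords[i] === '*' || patternWords[i] === keyWords[j]) {
      return match(i + 1, j + 1);
    }
    return false;
  };

  return match(0, 0);
}

console.log(matchesTopic('cats.#', 'cats'));           // true
console.log(matchesTopic('cats.#', 'cats.meow.purr')); // true
console.log(matchesTopic('cats.*', 'cats.meow'));      // true
console.log(matchesTopic('cats.*', 'cats.meow.purr')); // false
```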
To enable wildcard support in your RabbitMQ microservice, set the wildcards configuration option to true in the options object:
const app = await NestFactory.createMicroservice<MicroserviceOptions>(
  AppModule,
  {
    transport: Transport.RMQ,
    options: {
      urls: ['amqp://localhost:5672'],
      queue: 'cats_queue',
      wildcards: true,
    },
  },
);
With this configuration, you can use wildcards in your routing keys when subscribing to events/messages. For example, to listen for messages with the routing key cats.#, you can use the following code:
@MessagePattern('cats.#')
getCats(@Payload() data: { message: string }, @Ctx() context: RmqContext) {
  console.log(`Received message with routing key: ${context.getPattern()}`);
  return {
    message: 'Hello from the cats service!',
  };
}
To send a message with a specific routing key, you can use the send() method of the ClientProxy instance:
this.client.send('cats.meow', { message: 'Meow!' }).subscribe((response) => {
  console.log(response);
});