
Drivers

In this section, we will see how you can use the various drivers in your application.

This package currently supports the following drivers:

| Driver | Constant |
| --- | --- |
| Sync (runs your code synchronously) | SyncQueueDriver |
| AWS SQS | SqsQueueDriver |
| Redis | RedisQueueDriver |

Sync Driver

This is the simplest driver of all. In some cases, such as testing or debugging, you may want to run your code synchronously. To keep that simple, this package supports running your jobs synchronously out of the box.

import { Module } from '@nestjs/common';
import { QueueModule, SyncQueueDriver } from '@squareboat/nest-queue';

@Module({
  imports: [
    QueueModule.register({
      isGlobal: true,
      default: 'notifications', // must match a key in `connections`
      connections: {
        notifications: {
          driver: SyncQueueDriver, // you can use any available driver here
        },
      },
    }),
  ],
})
export class AppModule {}

SQS Driver

AWS SQS is one of the most popular managed queue services. To use this driver, you need to install it first:

npm i @squareboat/nest-queue-sqs

Then configure the connection as shown below:

import { Module } from '@nestjs/common';
import { QueueModule } from '@squareboat/nest-queue';
import { SqsQueueDriver } from '@squareboat/nest-queue-sqs';

@Module({
  imports: [
    QueueModule.register({
      isGlobal: true,
      default: 'notifications',
      connections: {
        notifications: {
          driver: SqsQueueDriver, // you can use any available driver here
          apiVersion: '2012-11-05',
          profile: 'default',
          prefix: 'https://sqs.us-east-1.amazonaws.com/123456789012',
          queue: 'MyQueue',
          suffix: '',
          region: 'us-east-1',
        },
      },
    }),
  ],
})
export class AppModule {}

SqsQueueDriver expects the following attributes:

| Attribute | Description |
| --- | --- |
| apiVersion | API version to be used by the SQS client |
| profile | Profile of the credentials that the AWS SDK reads from the ~/.aws/credentials file |
| region | Region where the queue exists |
| prefix | URL prefix of the queue |
| suffix | URL suffix of the queue |
| queue | Name of the queue |
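
To make the relationship between prefix, queue, and suffix concrete, here is a minimal sketch of how the three values could combine into a full queue URL. Joining prefix and queue with a "/" mirrors what the custom driver example in this document does. Appending suffix directly after the queue name is an assumption, and buildQueueUrl is a hypothetical helper that is not part of the package.

```typescript
// Hypothetical helper for illustration; not part of @squareboat/nest-queue-sqs.
interface SqsUrlParts {
  prefix: string; // e.g. 'https://sqs.us-east-1.amazonaws.com/123456789012'
  queue: string; // e.g. 'MyQueue'
  suffix?: string; // ASSUMPTION: appended directly after the queue name
}

function buildQueueUrl({ prefix, queue, suffix = '' }: SqsUrlParts): string {
  // Drop trailing slashes so the joined URL contains exactly one separator.
  const base = prefix.replace(/\/+$/, '');
  return `${base}/${queue}${suffix}`;
}

console.log(
  buildQueueUrl({
    prefix: 'https://sqs.us-east-1.amazonaws.com/123456789012',
    queue: 'MyQueue',
  }),
);
// prints "https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue"
```

With the values from the configuration above, the prefix carries the region endpoint and account ID, so only the queue name changes between queues in the same account.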

Redis Driver

You can also use Redis as your queue driver. Install it as shown below:

npm i @squareboat/nest-queue-redis

Then configure the connection as shown below:

import { Module } from '@nestjs/common';
import { QueueModule } from '@squareboat/nest-queue';
import { RedisQueueDriver } from '@squareboat/nest-queue-redis';

@Module({
  imports: [
    QueueModule.register({
      isGlobal: true,
      default: 'notifications',
      connections: {
        notifications: {
          driver: RedisQueueDriver, // you can use any available driver here
          queue: 'MyQueue',
          host: 'localhost',
          port: '6379',
          database: 0,
        },
      },
    }),
  ],
})
export class AppModule {}

RedisQueueDriver expects the following attributes:

| Attribute | Description |
| --- | --- |
| host | Host of the Redis server |
| port | Port of the Redis server |
| database | Database to be used for the queue |
| queue | Name of the queue |
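
In practice these values often differ between environments. The sketch below shows one possible way to build the attributes from environment variables with local defaults. The variable names (REDIS_HOST, REDIS_PORT, REDIS_DB, REDIS_QUEUE) and the redisOptionsFromEnv helper are hypothetical and not defined by the package. The driver field is left out so the snippet runs on its own.

```typescript
// Shape of the attributes listed above (without the `driver` field).
interface RedisConnectionOptions {
  queue: string;
  host: string;
  port: string; // kept as a string to match the configuration example
  database: number;
}

// Hypothetical helper; the environment variable names are assumptions.
function redisOptionsFromEnv(
  env: Record<string, string | undefined>,
): RedisConnectionOptions {
  const rawDatabase = env['REDIS_DB'] ?? '0';
  const database = Number.parseInt(rawDatabase, 10);
  if (Number.isNaN(database) || database < 0) {
    throw new Error(`REDIS_DB must be a non-negative integer, got "${rawDatabase}"`);
  }
  return {
    queue: env['REDIS_QUEUE'] ?? 'MyQueue',
    host: env['REDIS_HOST'] ?? 'localhost',
    port: env['REDIS_PORT'] ?? '6379',
    database,
  };
}

// Unset variables fall back to the local defaults.
console.log(redisOptionsFromEnv({ REDIS_HOST: 'redis.internal', REDIS_DB: '2' }));
```

The returned object could then be spread into the connection next to driver: RedisQueueDriver.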

Custom Driver

If the available drivers do not meet your requirements, you can create your own custom driver for queues such as beanstalkd, RabbitMQ, etc.

First, install the @squareboat/nest-queue-strategy package:

npm i @squareboat/nest-queue-strategy

Now you need to create two classes, MyCustomQueueDriver and MyCustomQueueJob. For illustration, we will use AWS SQS in our custom driver.

info

Messages are always serialized before being pushed to the queue, and are automatically parsed back into objects when processed by the QueueWorker.
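
As a rough illustration of that round trip, the sketch below serializes a message to a string and parses it back. It assumes JSON as the serialization format, which this document does not confirm. The ExampleMessage shape and the serialize and parse helpers are hypothetical.

```typescript
// Hypothetical message shape, for illustration only.
interface ExampleMessage {
  job: string;
  data: Record<string, unknown>;
}

// What a producer might do before the string reaches a driver's push().
// ASSUMPTION: JSON is used as the wire format.
function serialize(message: ExampleMessage): string {
  return JSON.stringify(message);
}

// What a worker might do with the string returned by a job's getMessage().
function parse(raw: string): ExampleMessage {
  return JSON.parse(raw) as ExampleMessage;
}

const original: ExampleMessage = {
  job: 'SEND_MAIL',
  data: { to: 'user@example.com' },
};
const wire = serialize(original); // the driver only ever sees this string
const restored = parse(wire);

console.log(typeof wire, restored.job); // prints "string SEND_MAIL"
```

If the format is indeed JSON, payloads should stay plain data: a Date, for example, comes back as a string after the round trip.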

MyCustomQueueDriver.ts
import { QueueDriver, InternalMessage } from "@squareboat/nest-queue-strategy";
import AWS = require("aws-sdk");
import { MyCustomQueueJob } from "./MyCustomQueueJob";

export class MyCustomQueueDriver implements QueueDriver {
  private client: AWS.SQS;

  constructor(private options: Record<string, any>) {
    // Load credentials for the configured profile from ~/.aws/credentials.
    AWS.config.update({ region: options.region });
    AWS.config.credentials = new AWS.SharedIniFileCredentials({
      profile: options.profile,
    });
    this.client = new AWS.SQS({ apiVersion: options.apiVersion });
  }

  // Builds the full queue URL from the configured prefix and a queue name.
  private getQueueUrl(queue: string): string {
    return this.options.prefix + "/" + queue;
  }

  // Sends the already-serialized message to the queue named in the payload.
  async push(message: string, rawPayload: InternalMessage): Promise<void> {
    const params = {
      DelaySeconds: rawPayload.delay,
      MessageBody: message,
      QueueUrl: this.getQueueUrl(rawPayload.queue),
    };
    await this.client.sendMessage(params).promise();
  }

  // Fetches at most one message and wraps it in a job, or returns null.
  async pull(options: Record<string, any>): Promise<MyCustomQueueJob | null> {
    const params = {
      MaxNumberOfMessages: 1,
      MessageAttributeNames: ["All"],
      QueueUrl: this.getQueueUrl(options.queue),
      VisibilityTimeout: 30,
      WaitTimeSeconds: 0,
    };
    const response = await this.client.receiveMessage(params).promise();
    const message = response.Messages ? response.Messages[0] : null;
    return message ? new MyCustomQueueJob(message) : null;
  }

  // Deletes a processed message using its receipt handle.
  async remove(
    job: MyCustomQueueJob,
    options: Record<string, any>
  ): Promise<void> {
    const params = {
      QueueUrl: this.getQueueUrl(options.queue),
      ReceiptHandle: job.data.ReceiptHandle,
    };
    await this.client.deleteMessage(params).promise();
  }

  // Removes all messages from the queue.
  async purge(options: Record<string, any>): Promise<void> {
    const params = {
      QueueUrl: this.getQueueUrl(options.queue),
    };
    await this.client.purgeQueue(params).promise();
  }

  // Returns the approximate number of messages waiting in the queue.
  async count(options: Record<string, any>): Promise<number> {
    const params = {
      QueueUrl: this.getQueueUrl(options.queue),
      AttributeNames: ["ApproximateNumberOfMessages"],
    };
    const response: Record<string, any> = await this.client
      .getQueueAttributes(params)
      .promise();
    return +response.Attributes.ApproximateNumberOfMessages;
  }
}
MyCustomQueueJob.ts
import { DriverJob } from "@squareboat/nest-queue-strategy";

export class MyCustomQueueJob extends DriverJob {
  // Returns the serialized message body of the underlying SQS message.
  public getMessage(): string {
    return this.data.Body;
  }
}
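
To see the same five operations (push, pull, remove, purge, count) without any AWS dependency, here is a minimal in-memory sketch. It defines its own stand-in types so that it runs on its own. The real QueueDriver, InternalMessage, and DriverJob types from @squareboat/nest-queue-strategy may differ, and InMemoryQueueDriver, InMemoryJob, and demo are hypothetical names. Unlike SQS, this sketch ignores delay and has no visibility timeout.

```typescript
// Local stand-ins so the sketch runs without the package installed.
// ASSUMPTION: the real package types may have more fields than these.
interface ExampleInternalMessage {
  queue: string;
  delay?: number; // ignored by this in-memory sketch
}
interface ExampleQueueOptions {
  queue: string;
}

class InMemoryJob {
  constructor(public data: { id: number; Body: string }) {}
  getMessage(): string {
    return this.data.Body;
  }
}

class InMemoryQueueDriver {
  private queues = new Map<string, InMemoryJob[]>();
  private nextId = 1;

  // Returns the job list for a queue, creating it on first use.
  private list(queue: string): InMemoryJob[] {
    let jobs = this.queues.get(queue);
    if (!jobs) {
      jobs = [];
      this.queues.set(queue, jobs);
    }
    return jobs;
  }

  async push(message: string, rawPayload: ExampleInternalMessage): Promise<void> {
    const job = new InMemoryJob({ id: this.nextId++, Body: message });
    this.list(rawPayload.queue).push(job);
  }

  // Returns the oldest job without removing it; remove() acknowledges it.
  async pull(options: ExampleQueueOptions): Promise<InMemoryJob | null> {
    return this.list(options.queue)[0] ?? null;
  }

  async remove(job: InMemoryJob, options: ExampleQueueOptions): Promise<void> {
    const jobs = this.list(options.queue);
    const index = jobs.findIndex((j) => j.data.id === job.data.id);
    if (index !== -1) jobs.splice(index, 1);
  }

  async purge(options: ExampleQueueOptions): Promise<void> {
    this.queues.set(options.queue, []);
  }

  async count(options: ExampleQueueOptions): Promise<number> {
    return this.list(options.queue).length;
  }
}

// Pushes one message, processes it, and reports the counts before and after.
async function demo(): Promise<number[]> {
  const driver = new InMemoryQueueDriver();
  const target = { queue: 'notifications' };
  await driver.push('{"job":"SEND_MAIL"}', target);
  const before = await driver.count(target);
  const job = await driver.pull(target);
  if (job) await driver.remove(job, target);
  const after = await driver.count(target);
  return [before, after];
}

demo().then((counts) => console.log(counts));
```

A driver like this can be handy in unit tests, where the pull-then-remove sequence in demo stands in for what a worker does with each job.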
note

If you feel that your driver could be useful to other developers like us, we highly recommend publishing it. We will add it to the official documentation 😃