Simplified SQS Wrapper and Async Worker manager.
Features:
- Simple interface. ✅
- Promise based. ✅
- ES6. ✅
- Optimized async worker. ✅
# Using npm
$ npm install wtsqs --save
# Or using yarn
$ yarn add wtsqs
- WTSQS
A simplified sqs wrapper with interface similar to a normal queue data structure.
- WTSQSWorker
WTSQS worker job manager.
WTSQSWorker takes care of asynchronously fetching jobs from sqs while processing other jobs concurrently. It also takes care of deleting a job from the queue after successfully processing the message.
A simplified sqs wrapper with interface similar to a normal queue data structure.
Kind: global class
- WTSQS
- new WTSQS(options)
- .size() ⇒
Promise.<integer>
- .enqueueOne(payload, [options], [sqsOptions]) ⇒
Promise
- .enqueueMany(payloads, [options], [sqsOptions]) ⇒
Promise
- .peekOne([options], [sqsOptions]) ⇒
Promise.<(Message|null)>
- .peekMany([maxNumberOfMessages], [options], [sqsOptions]) ⇒
Promise.<Array.<Message>>
- .deleteOne(message) ⇒
Promise
- .deleteMany(messages) ⇒
Promise
- .deleteAll() ⇒
Promise
- .popOne([options], [sqsOptions]) ⇒
Promise.<(Message|null)>
- .popMany([maxNumberOfMessages], [options], [sqsOptions]) ⇒
Promise.<Array.<Message>>
Constructs WTSQS object.
Param | Type | Default | Description |
---|---|---|---|
options | Object |
Options object. | |
options.url | String |
SQS queue url. | |
[options.accessKeyId] | String |
AWS access key id. | |
[options.secretAccessKey] | String |
AWS secret access key. | |
[options.region] | String |
us-east-1 |
AWS regions where queue exists. |
[options.defaultMessageGroupId] | String |
FIFO queues only. Default tag assigned to a message that specifies it belongs to a specific message group. If not provided random uuid is assigned to each message which doesn't guarantee order but allows parallelism. | |
[options.defaultVisibilityTimeout] | Integer |
60 |
Default duration (in seconds) that the received messages are hidden from subsequent retrieve requests. |
[options.defaultPollWaitTime] | Integer |
10 |
Default duration (in seconds) for which read calls wait for a message to arrive in the queue before returning. |
[options.sqsOptions] | Object |
Additional options to extend/override the underlying SQS object creation. |
Example
const { WTSQS } = require('wtsqs')
// The most simple way to construct a WTSQS object
const wtsqs = new WTSQS({
url: '//queue-url',
accessKeyId: 'AWS_ACCESS_KEY_ID',
secretAccessKey: 'AWS_SECRET_ACCESS_KEY'
})
Get approximate total number of messages in the queue.
Kind: instance method of WTSQS
Example
const size = await wtsqs.size()
console.log(size) // output: 2
Enqueue single payload in the queue.
Kind: instance method of WTSQS
See: SQS#sendMessage
Param | Type | Default | Description |
---|---|---|---|
payload | Object |
JSON serializable object. | |
[options] | Object |
Options. | |
[options.messageGroupId] | String |
Message group id to override default id. | |
[sqsOptions] | Object |
{} |
Additional options to extend/override the underlying SQS sendMessage request. |
Example
const myObj = { a: 1 }
await wtsqs.enqueueOne(myObj)
Enqueue batch of payloads in the queue.
Kind: instance method of WTSQS
See: SQS#sendMessageBatch
Param | Type | Default | Description |
---|---|---|---|
payloads | Array.<Object> |
Array of JSON serializable objects. | |
[options] | Object |
Options object. | |
[options.messageGroupId] | String |
Message group id to override default id. | |
[sqsOptions] | Object |
{} |
Additional options to extend/override the underlying SQS sendMessageBatch request. |
Example
const myObjList = [{ a: 1 }, { b: 3 }]
await wtsqs.enqueueMany(myObjList)
Retrieve single message without deleting it.
Kind: instance method of WTSQS
Returns: Promise.<(Message|null)>
- Message object or null if queue is empty.
Param | Type | Default | Description |
---|---|---|---|
[options] | Object |
Options object. | |
[options.pollWaitTime] | Integer |
Duration (in seconds) for which read call waits for a message to arrive in the queue before returning. If no messages are available and the wait time expires, the call returns successfully with an empty list of messages. | |
[options.visibilityTimeout] | Integer |
Duration (in seconds) that the received messages are hidden from subsequent retrieve requests. | |
[sqsOptions] | Object |
{} |
Additional options to extend/override the underlying SQS receiveMessage request. |
Example
const myMessage = await wtsqs.peekOne()
console.log(myMessage)
// output:
{
id: 'messageId',
receiptHandle: 'messageReceiptHandle'
md5: 'messageMD5',
body: { a: 1 }
}
Retrieve batch of messages without deleting them.
Kind: instance method of WTSQS
Returns: Promise.<Array.<Message>>
- Array of retrieved messages.
See: SQS#receiveMessage
Param | Type | Default | Description |
---|---|---|---|
[maxNumberOfMessages] | Number |
10 |
Maximum number of messages to retrieve. Must be between 1 and 10. |
[options] | Object |
Options object. | |
[options.pollWaitTime] | Integer |
Duration (in seconds) for which read call waits for a message to arrive in the queue before returning. If no messages are available and the wait time expires, the call returns successfully with an empty list of messages. | |
[options.visibilityTimeout] | Integer |
Duration (in seconds) that the received messages are hidden from subsequent retrieve requests. | |
[sqsOptions] | Object |
{} |
Additional options to extend/override the underlying SQS receiveMessage request. |
Example
const myMessageList = await wtsqs.peekMany(2)
console.log(myMessageList)
// output:
[
{
id: 'messageId',
receiptHandle: 'messageReceiptHandle'
md5: 'messageMD5',
body: { a: 1 }
},
{
id: 'messageId',
receiptHandle: 'messageReceiptHandle'
md5: 'messageMD5',
body: { b: 3 }
}
]
Delete single message from queue.
Kind: instance method of WTSQS
See: SQS#deleteMessage
Param | Type | Description |
---|---|---|
message | Message |
Message to be deleted |
Example
const myMessage = await wtsqs.peekOne()
await wtsqs.deleteOne(myMessage)
Delete batch of messages from queue.
Kind: instance method of WTSQS
See: SQS#deleteMessageBatch
Param | Type | Description |
---|---|---|
messages | Array.<Message> |
Messages to be deleted |
Example
const myMessageList = await wtsqs.peekMany(2)
await wtsqs.deleteMany(myMessageList)
Delete ALL messages in the queue.
NOTE: Can only be called once every 60 seconds.
Kind: instance method of WTSQS
See: SQS#purgeQueue
Example
await wtsqs.deleteAll()
Retrieve single message and immediately delete it.
Kind: instance method of WTSQS
Returns: Promise.<(Message|null)>
- Message object or null if queue is empty.
Param | Type | Default | Description |
---|---|---|---|
[options] | Object |
Options object. | |
[options.pollWaitTime] | Integer |
Duration (in seconds) for which read call waits for a message to arrive in the queue before returning. If no messages are available and the wait time expires, the call returns successfully with an empty list of messages. | |
[options.visibilityTimeout] | Integer |
Duration (in seconds) that the received messages are hidden from subsequent retrieve requests. | |
[sqsOptions] | Object |
{} |
Additional options to extend/override the underlying SQS receiveMessage request. |
Example
const myMessage = await wtsqs.popOne()
// The message no longer exists in queue
console.log(myMessage)
// output:
{
id: 'messageId',
receiptHandle: 'messageReceiptHandle'
md5: 'messageMD5',
body: { a: 1 }
}
Retrieve batch of messages and immediately delete them.
Kind: instance method of WTSQS
Returns: Promise.<Array.<Message>>
- Array of retrieved messages.
Param | Type | Default | Description |
---|---|---|---|
[maxNumberOfMessages] | Number |
10 |
Maximum number of messages to retrieve. Must be between 1 and 10. |
[options] | Object |
Options object. | |
[options.pollWaitTime] | Integer |
Duration (in seconds) for which read call waits for a message to arrive in the queue before returning. If no messages are available and the wait time expires, the call returns successfully with an empty list of messages. | |
[options.visibilityTimeout] | Integer |
Duration (in seconds) that the received messages are hidden from subsequent retrieve requests. | |
[sqsOptions] | Object |
{} |
Additional options to extend/override the underlying SQS receiveMessage request. |
Example
const myMessageList = await wtsqs.popMany(2)
// Messages no longer exist in queue
console.log(myMessageList)
// output:
[
{
id: 'messageId',
receiptHandle: 'messageReceiptHandle'
md5: 'messageMD5',
body: { a: 1 }
},
{
id: 'messageId',
receiptHandle: 'messageReceiptHandle'
md5: 'messageMD5',
body: { b: 3 }
}
]
WTSQS worker job manager.
WTSQSWorker takes care of asynchronously fetching jobs from sqs while processing other jobs concurrently. It also takes care of deleting a job from the queue after successfully processing the message.
Kind: global class
- WTSQSWorker
- new WTSQSWorker(options)
- instance
- .run(handler)
- .shutdown() ⇒
Promise
- inner
- ~runHandler ⇒
Promise
- ~runHandler ⇒
Constructs WTSQSWorker object.
Param | Type | Default | Description |
---|---|---|---|
options | Object |
Options object. | |
options.wtsqs | WTSQS |
WTSQS instance to use for connecting to sqs. | |
[options.maxConcurrency] | Integer |
20 |
Maximum number of concurrent jobs. |
[options.pollWaitTime] | Integer |
5 |
Duration (in seconds) for which read calls wait for a job to arrive in the queue before returning. |
[options.visibilityTimeout] | Integer |
30 |
Duration (in seconds) that the received jobs are hidden from subsequent retrieve requests. |
[options.logger] | Object | String |
|
Object with debug, info, warn, error methods to use for logging. Or a string with log level to use default internal logger. |
Example
const { WTSQS, WTSQSWorker } = require('wtsqs')
const wtsqs = new WTSQS({
url: '//queue-url',
accessKeyId: 'AWS_ACCESS_KEY_ID',
secretAccessKey: 'AWS_SECRET_ACCESS_KEY'
})
const worker = new WTSQSWorker({ wtsqs })
worker.run(async (job) => {
await someAsyncFunction(job.body)
console.log(job)
})
Start fetching and processing jobs.
Kind: instance method of WTSQSWorker
Param | Type | Description |
---|---|---|
handler | runHandler |
Async function to process a single job. |
Shutsdown the worker and drain active jobs.
Kind: instance method of WTSQSWorker
Returns: Promise
- Resolves when all active jobs have been drained.
Async callback function to process single job.
Kind: inner typedef of WTSQSWorker
Param | Type | Description |
---|---|---|
job | Job |
A single job to process |
Received SQS Message
Kind: global typedef
Properties
Name | Type | Description |
---|---|---|
id | String |
Message id. |
receiptHandle | String |
Message receipt handle. |
md5 | String |
Message body md5 hash sum. |
body | Object |
Message body containing original payload. |
Worker Job
Kind: global typedef
Properties
Name | Type | Description |
---|---|---|
id | String |
Job id. |
receiptHandle | String |
Job receipt handle. |
md5 | String |
Job body md5 hash sum. |
body | Object |
Job body containing original payload. |