BullMQ
tip
- Depends only on Redis
- Supports delay, debounce, flow (parent, children), repeat, rate limit
- taskforcesh/bullmq
- MIT, TS, Redis
- OptimalBits/bull
- MIT, JS, Redis
- maintenance mode; BullMQ is recommended instead
- queueName vs jobName
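- Workers are bound to a queue; the queue name decides which Worker receives a job
- a job carries a jobName, which cannot route jobs to different Workers but lets one Worker handle several kinds of job
- for finer-grained routing, consider creating multiple Queues, e.g. Email:Send, Email:Receive
- BullMQ Pro supports groups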
- https://docs.bullmq.io/patterns/named-processor
- Deduplication
- debounce - emits a debounced event
- jobId - emits a duplicated event
Client
import { Queue } from 'bullmq';
const queue = new Queue('Paint');
queue.add('cars', { color: 'blue' });
Worker
import { Worker } from 'bullmq';
const worker = new Worker('Paint', async (job) => {
  if (job.name === 'cars') {
    await paintCar(job.data.color);
  }
});
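The worker above branches on job.name with an if. When one queue carries several job names, a lookup table of handlers is one way to keep the processor flat. The sketch below is typed against a hypothetical JobLike shape (only name and data) so it runs without Redis; createProcessor, the handler names, and the returned strings are illustrative and not part of BullMQ.

```typescript
// Hypothetical minimal job shape; BullMQ's Job also exposes `name` and `data`.
interface JobLike {
  name: string;
  data: unknown;
}

type Handler = (data: any) => Promise<unknown>;

// Builds a processor that dispatches on job.name.
function createProcessor(handlers: Map<string, Handler>) {
  return async (job: JobLike): Promise<unknown> => {
    const handler = handlers.get(job.name);
    if (!handler) {
      // Throwing makes an unexpected job name surface as a failed job.
      throw new Error(`No handler registered for job name "${job.name}"`);
    }
    return handler(job.data);
  };
}

// Illustrative handlers; the names and payloads are made up for this sketch.
const processPaint = createProcessor(
  new Map<string, Handler>([
    ['cars', async (data: { color: string }) => `painted car ${data.color}`],
    ['bikes', async (data: { color: string }) => `painted bike ${data.color}`],
  ]),
);
```

Because processPaint has the (job) => Promise shape, it could be passed as the processor in new Worker('Paint', processPaint).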
Listener
import { QueueEvents } from 'bullmq';
const queueEvents = new QueueEvents('Paint');
queueEvents.on('completed', ({ jobId }) => {
  console.log('done painting');
});
queueEvents.on('failed', ({ jobId, failedReason }: { jobId: string; failedReason: string }) => {
  console.error('error painting', failedReason);
});
Notes
export type FinishedStatus = 'completed' | 'failed';
export type JobState = FinishedStatus | 'active' | 'delayed' | 'prioritized' | 'waiting' | 'waiting-children';
export type JobType = JobState | 'paused' | 'repeat' | 'wait';
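The unions above nest: every FinishedStatus is also a JobState. A small type guard makes that relationship usable in code. This is only a sketch; isFinished and describeState are hypothetical helpers, not BullMQ exports, and the two types are copied locally so the block stands alone.

```typescript
// Local copies of the unions listed above.
type FinishedStatus = 'completed' | 'failed';
type JobState =
  | FinishedStatus
  | 'active'
  | 'delayed'
  | 'prioritized'
  | 'waiting'
  | 'waiting-children';

// Narrows a JobState to the two terminal states.
function isFinished(state: JobState): state is FinishedStatus {
  return state === 'completed' || state === 'failed';
}

// Example use of the guard: label a state for logging.
function describeState(state: JobState): string {
  return isFinished(state) ? `done (${state})` : `in progress (${state})`;
}
```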
- QueueEvents
XREAD BLOCK 10000 STREAMS $key $lastEventId
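- While processing, call job.moveToDelayed and then throw DelayedError
- makes the job wait in the delayed state
- While processing, call job.moveToWaitingChildren and then throw WaitingChildrenError
- puts the job into the waiting-children state
- throw UnrecoverableError to skip retries
- throw Worker.RateLimitError() (after calling worker.rateLimit(duration)) to apply a manual rate limit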
- repeatable jobs -> delayed jobs
- jobId is used to build the id of the generated delayed job, not to deduplicate the repeatable job itself
repeat:KEY:1724148000000
- key
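- generated from the options by default; a custom key allows creating several repeatable jobs with the same configuration
- creating a job with an existing key overrides the previous options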
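The processor-side notes above (job.moveToDelayed followed by DelayedError, and UnrecoverableError to stop retries) can be sketched as a single processor. To keep the block runnable without Redis, the job is typed against a hypothetical JobLike shape and the two error classes are injected; makeProcessor, Payload, and its fields are assumptions made for this example only.

```typescript
// Hypothetical minimal shape of the job members used here.
interface JobLike<T> {
  data: T;
  moveToDelayed(timestamp: number, token?: string): Promise<void>;
}

type ErrorCtor = new (message?: string) => Error;

// Illustrative payload: fetch `url`, but not before `notBefore` (epoch ms).
interface Payload {
  url: string;
  notBefore?: number;
}

function makeProcessor(
  errors: { DelayedError: ErrorCtor; UnrecoverableError: ErrorCtor },
  now: () => number = Date.now,
) {
  return async (job: JobLike<Payload>, token?: string): Promise<string> => {
    if (!job.data.url) {
      // Invalid input can never succeed, so skip the remaining attempts.
      throw new errors.UnrecoverableError('missing url');
    }
    if (job.data.notBefore !== undefined && job.data.notBefore > now()) {
      // Park the job until notBefore, then tell the worker it was moved.
      await job.moveToDelayed(job.data.notBefore, token);
      throw new errors.DelayedError();
    }
    return `fetched ${job.data.url}`;
  };
}
```

With BullMQ installed, the errors argument would be { DelayedError, UnrecoverableError } imported from 'bullmq', and the returned function would be the Worker's processor; the second processor argument (token) is what gets forwarded to moveToDelayed.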
job.toJSON
{
  "name": "pull",
  "opts": {
    "attempts": 0,
    "delay": 30121,
    "repeat": {
      "every": 90000,
      "key": "WecomArchivePull",
      "count": 1
    },
    "jobId": "repeat:WecomArchivePull:1724148000000",
    "timestamp": 1724147969879,
    "prevMillis": 1724148000000
  },
  "id": "repeat:WecomArchivePull:1724148000000",
  "progress": 0,
  "returnvalue": null,
  "stacktrace": null,
  "attemptsStarted": 0,
  "attemptsMade": 0,
  "delay": 30121,
  "repeatJobKey": "WecomArchivePull",
  "timestamp": 1724147969879,
  "queueQualifiedName": "bull:WecomArchive"
}
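The dump above shows how the delayed job's id encodes the repeat key and the scheduled run time (repeat:KEY:millis), and that delay equals the scheduled time minus timestamp. The sketch below parses that id and recomputes the delay. It assumes the id format seen in this dump, which may differ in other BullMQ versions; parseRepeatJobId and delayUntilRun are hypothetical helpers.

```typescript
interface RepeatJobId {
  key: string;
  scheduledAt: number; // epoch milliseconds of the planned run
}

// Splits "repeat:<key>:<millis>"; the key itself may contain colons.
function parseRepeatJobId(id: string): RepeatJobId | null {
  const match = /^repeat:(.+):(\d+)$/.exec(id);
  if (!match) return null;
  return { key: match[1], scheduledAt: Number(match[2]) };
}

// Delay between the job's creation timestamp and its scheduled run.
function delayUntilRun(id: string, timestamp: number): number | null {
  const parsed = parseRepeatJobId(id);
  return parsed ? parsed.scheduledAt - timestamp : null;
}

// Values copied from the job.toJSON dump above.
const dump = {
  id: 'repeat:WecomArchivePull:1724148000000',
  timestamp: 1724147969879,
};

console.log(parseRepeatJobId(dump.id)?.key); // WecomArchivePull
console.log(delayUntilRun(dump.id, dump.timestamp)); // 30121, matching "delay" in the dump
```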
Lifecycle
- priority 0 - 2^21
- 0 is the highest
- Worker Pickup - prioritized > wait
- becomes active after pickup
---
title: Lifecycle of a Job - Flow Producer
---
stateDiagram-v2
state added <<fork>>
[*] --> added: added
state no_children <<fork>>
added --> no_children: no children
no_children --> delayed: delay > 0
no_children --> prioritized: priority > 0
waiting_children: waiting-children
added --> waiting_children: has children
waiting_children --> wait: children completed
waiting_children --> delayed: delay > 0
waiting_children --> prioritized: priority > 0
waiting_children --> failed: child fail & failParentOnFailure
wait --> wait: rate limit
wait --> active
active --> wait: dynamic rate limit <br/> error retry
active --> delayed: error & backoff
active --> prioritized: rate limit & priority > 0
active --> failed: failure
active --> completed: success
prioritized --> active
delayed --> prioritized: priority > 0
delayed --> wait
state finished <<join>>
failed --> finished
completed --> finished
finished --> [*]: finished
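The edges of the diagram above can also be written down as a transition table, which is handy for sanity-checking a sequence of observed states. This is a sketch that only encodes the edges drawn in the diagram (the added and finished pseudo-states are left out); State, transitions, canTransition, and isPathValid are names invented for this example.

```typescript
type State =
  | 'wait'
  | 'waiting-children'
  | 'delayed'
  | 'prioritized'
  | 'active'
  | 'completed'
  | 'failed';

// One entry per source state, listing the targets drawn in the diagram.
const transitions: Record<State, readonly State[]> = {
  'waiting-children': ['wait', 'delayed', 'prioritized', 'failed'],
  wait: ['wait', 'active'], // wait -> wait is the rate-limit loop
  delayed: ['prioritized', 'wait'],
  prioritized: ['active'],
  active: ['wait', 'delayed', 'prioritized', 'failed', 'completed'],
  completed: [],
  failed: [],
};

function canTransition(from: State, to: State): boolean {
  return transitions[from].indexOf(to) !== -1;
}

// Checks that every consecutive pair in the path is an edge of the diagram.
function isPathValid(path: State[]): boolean {
  for (let i = 1; i < path.length; i++) {
    if (!canTransition(path[i - 1], path[i])) return false;
  }
  return true;
}
```

For example, isPathValid(['delayed', 'wait', 'active', 'completed']) holds, while a jump from delayed straight to active does not, since delayed jobs first move to wait or prioritized.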
FAQ
BullMQ vs Bull
vs | bullmq | bull |
---|---|---|
Year | 2019 | 2013 |
job event | stream | pubsub |
Workers and Event listeners | separate | same |
Scheduler Process | separate Scheduler | same |
Job Dependencies | Yes | No |
Priority Queueing | Yes, also supports rate-limited, scheduling | Yes |