Skip to main content

BullMQ

tip
  • 只依赖 Redis
  • 支持 delay, debounce, flow(parent,children), repeat, rate limit
  • taskforcesh/bullmq
    • MIT, TS, Redis
  • OptimalBits/bull
    • MIT, JS, Redis
    • maintenance mode, 推荐 BullMQ
  • queueName vs jobName
    • Worker 使用 queue 来区分
    • job 包含 jobName - 不能用来分流给 Worker,但可以让 Worker 处理多种 job
  • 去重机制
    • debounce - 会生成 debounced 事件
    • jobId - 会生成 duplicated 事件

Client

import { Queue } from 'bullmq';

const queue = new Queue('Paint');

queue.add('cars', { color: 'blue' });

Worker

import { Worker } from 'bullmq';

const worker = new Worker('Paint', async (job) => {
if (job.name === 'cars') {
await paintCar(job.data.color);
}
});

Listener

import { QueueEvents } from 'bullmq';

const queueEvents = new QueueEvents('Paint');

queueEvents.on('completed', ({ jobId }) => {
console.log('done painting');
});

queueEvents.on('failed', ({ jobId, failedReason }: { jobId: string; failedReason: string }) => {
console.error('error painting', failedReason);
});

Notes

export type FinishedStatus = 'completed' | 'failed';
export type JobState = FinishedStatus | 'active' | 'delayed' | 'prioritized' | 'waiting' | 'waiting-children';
export type JobType = JobState | 'paused' | 'repeat' | 'wait';
  • QueueEvents
    • XREAD BLOCK 10000 STREAMS $key $lastEventId
  • 处理中可以 job.moveToDelayed 然后 throw DelayedError
    • 让 job 进入等待状态
  • 处理中可以 job.moveToWaitingChildren 然后 throw WaitingChildrenError
    • 让 job 进入等待 children 的状态
  • throw UnrecoverableError 可以避免 retry
  • throw throw Worker.RateLimitError 可以实现手动 rate-limit
  • repeatable jobs -> delayed jobs
    • jobId 用于生成 deplayed job,而不用于本身 job 去重
      • repeat:KEY:1724148000000
    • key
      • 默认通过 options 生成,自定义 key 创建相同配置的 repeatable job
      • 相同 key 创建可以覆盖之前的 options

job.toJSON

{
"name": "pull",
"opts": {
"attempts": 0,
"delay": 30121,
"repeat": {
"every": 90000,
"key": "WecomArchivePull",
"count": 1
},
"jobId": "repeat:WecomArchivePull:1724148000000",
"timestamp": 1724147969879,
"prevMillis": 1724148000000
},
"id": "repeat:WecomArchivePull:1724148000000",
"progress": 0,
"returnvalue": null,
"stacktrace": null,
"attemptsStarted": 0,
"attemptsMade": 0,
"delay": 30121,
"repeatJobKey": "WecomArchivePull",
"timestamp": 1724147969879,
"queueQualifiedName": "bull:WecomArchive"
}

Lifecycle

  • priority 0 - 2^21
    • 0 最高
  • Worker Pickup - prioritized > wait
    • pickup 后变为 active
title: Lifecycle of a Job - Flow Producer
---

stateDiagram-v2

state added <<fork>>
[*] --> added: added

state no_children <<fork>>
added --> no_children: 无 children
no_children --> delayed: delay > 0
no_children --> prioritized: priority > 0

waiting_children: waiting-children
added --> waiting_children: 有 chdilren


waiting_children --> wait: children completed
waiting_children --> delayed: delay > 0
waiting_children --> prioritized: priority > 0
waiting_children --> failed: child fail & failParentOnFailure

wait --> wait: rate limit
wait --> active

active --> wait: dyanmic rate limit <br/> error retry
active --> delayed: error & backoff
active --> prioritized: rate limit & priority > 0
active --> failed: 失败
active --> completed: 成功

prioritized --> active
delayed --> prioritized: priority > 0
delayed --> wait

state finished <<join>>
failed --> finished
completed --> finished
finished --> [*]: finished

FAQ

BullMQ vs Bull

vsbullmqbull
Year20192013
job eventstreampubsub
Workers and Event listenersseparatesame
Scheduler Process单独 Schedulersame
Job Dependencies
Priority Queueing有,支持 rate-limited, scheduling