Skip to main content

NATS JetStream

tip
  • 持久化消息队列 - 对标 Kafka
  • 替代 STAN
  • nats-io/jetstream
  • 视图 - 内置 Service 实现
  • Stream
    • $JS.API.>
    • domain - 多租户
  • 特性
    • At-least-once delivery; exactly once within a window
    • Store messages and replay by time or sequence
    • Wildcard support
    • Account aware
    • Data at rest encryption
    • Cleanse specific messages (GDPR)
    • Horizontal scalability
    • Persist Streams and replay via Consumers
    • WebSocket
  • Replica
    • Source
      • 创建的 stream 指定 source 后会去消费,可多个 source
      • 可被视为副本 - 该 stream 多节点运行可实现多副本
      • 配置不同的持久化策略
    • Mirror
      • 镜像另外一个 stream
      • 只能消费
      • 例如 mirror 一个 副本 stream 提供近期查询和消费
  • 集群
    • Raft 实现
    • 所有节点加入 Meta Group
    • 每个 stream 组成一个 Stream Group
    • 每个 consumer 组成一个 Consumer Group
    • 推荐混合 JetStream 和一般 Nats
      • 因为 JetStream 需要存储 - 配置后可针对节点使用 JetStream 优化存储
  • nats://demo.nats.io:4222
  • 参考
  • 基于 jetstream 的功能
    • kv
    • object store
    • service rpc
info
  • 存储尚不支持集群
caution
  • 建议名字少于 32 字符
# 启动 jetstream
nats-server -js
# 配置启动
nats-server -c js.conf

# 容器启动
# 默认 scrach 镜像只包含 /nats-server
docker run --rm -it -p 4222:4222 --name js nats:alpine -js

# nats stream
# ==================
# 创建 Stream
nats str add ORDERS --subjects "ORDERS.*" --ack --max-msgs=-1 --max-bytes=-1 --max-age=1y --storage file --retention limits --max-msg-size=-1 --discard=old --replicas 3 --dupe-window=2m
# 输出 stream 配置
nats str info ORDERS -j | jq .config
# 通过配置创建
nats str add ORDERS --config orders.json

# 所有 stream
nats str ls
# stream 信息
nats str info ORDERS
# 复制 stream
nats str cp ORDERS ARCHIVE --subjects "ORDERS_ARCVHIVE.*" --max-age 2y
# 修改 stream 单项配置
nats str edit ORDERS --subjects "ORDERS.*"
# 配置覆盖
nats str edit ORDERS --config orders.json

# 发布消息
nats pub ORDERS.scratch hello
# 发布带 ACK - 确认收到持久化
nats req ORDERS.scratch hello
# 清除所有消息
nats str purge ORDERS -f
# 删除一条消息 - SEQ
nats str rmm ORDERS 1 -f

# 移除 steam
nats str rm ORDERS -f

# nats consumer
# ==================
# 所有 consumer
nats con ls ORDERS
nats con add ORDERS DISPATCH --filter ORDERS.processed --ack explicit --pull --deliver all --sample 100 --max-deliver 20 --replay instant --max-pending 0
nats con add ORDERS NEW --filter ORDERS.received --ack explicit --pull --deliver all --max-deliver=-1 --sample 100
# 配置
nats con info ORDERS DISPATCH -j | jq .config
# MONITOR push
nats con add ORDERS MONITOR --filter '' --ack none --target monitor.ORDERS --deliver last --replay instant

# consumer 状态
nats con info ORDERS DISPATCH

nats pub ORDERS.processed "order 1"
nats pub ORDERS.processed "order 2"
nats pub ORDERS.processed "order 3"

# Pull 消费 ACK 消息
# --no-ack 不 ACK 继续消费
# ACK 执行一次消费一条
nats con next ORDERS DISPATCH
nats req '$JS.API.CONSUMER.MSG.NEXT.ORDERS.DISPATCH' ''

nats con info ORDERS MONITOR
# push 消费 - 会一次性消费所有
nats sub monitor.ORDERS

# nats 监控
# ==========
nats event --js-advisory

# nats 其他
# ==========
# 报告统计
nats s report
# 发送多条消息
nats req ORDERS.new "ORDER {{Count}}" --count 100

# 输出副本关系为 dot 图
nats s report --dot replication.dot

# 创建模板 - 在 pub 的时候生成
# 删除 template 会删除 所有 stream
nats str template add CLIENTS --subjects "CLIENT.*" --ack --max-msgs=-1 --max-bytes=-1 --max-age=1y --storage file --retention limits --max-msg-size 2048 --max-streams 1024 --discard old
jetstream {
store_dir=nats
}

stream.json

{
"name": "ORDERS",
"subjects": ["ORDERS.*"],
"retention": "limits",
"max_consumers": -1,
"max_msgs": -1,
"max_bytes": -1,
"max_age": 31536000000000000,
"max_msg_size": -1,
"storage": "file",
"discard": "old",
"num_replicas": 1,
"duplicate_window": 120000000000
}

Notes

Stream 属性

attrdefaultmean
Name
Storage
Subjects消费的主题 - 支持通配符
Replicas集群副本 - 最多 5
MaxAge消息留存时间
MaxBytes消息数据量
MaxMsgs消息数量
MaxMsgSize
MaxConsumers
NoAck禁用 ACK
RententionLimitsPolicy留存策略 - LimitsPolicy,InterestPolicy,WorkQueuePolicy
DiscardDiscardOld丢弃策略 - DiscardNew,DiscardOld
Duplicates去重时间窗口
Sealed
DenyDelete
DenyPurge
AllowRollup
  • 去重
    • 基于 Nats-Msg-Id
  • stream
    • name - [^\s.*>/]
    • duration, size, interest
    • subjects
      • 捕获所有的这些消息并存储
  • client
    • pull/push
  • $JS.API.SERVER
  • $JS.API.INFO
  • $JS.API.$KV
  • $JS.API.$OBJ
  • $JS.API.STREAM.>
  • $JS.API.META.>
  • $JS.API.ACCOUNT.>
  • $JS.API.CONSUMER.>
  • $JS.API.DIRECT.>
  • domain 通过映射实现
    • $JS.<DOMAIN>.API -> $JS.API

FAQ

Cannot read properties of undefined (reading ack_policy)

await jsc.pullSubscribe('send.*', {
mack: true,
// 少了 config
config: {
durable_name: 'agent',
ack_policy: AckPolicy.Explicit,
ack_wait: 10_000_000, // 10s
},
});

invalid stream name - stream name cannot contain :

nats 可以创建,但是 nats.ws 用不了