Skip to main content

Redis 流简介

· 13 min read

概述#

对 Redis 的印象可能很多人都还只停留在 2.8 的阶段,一个结构化的内存存储(嗯,好像也没什么问题)。虽然距离 4.0 发布(2017.7.14)已经一年过去了,但相信很多人已经不再去关心 Redis 的新特性了,因为从 2.8 后的 Redis 已经足够好用了。😄

Redis 3.0 添加了集群的能力,4.0 添加了模块化能力,5.0 添加了流类型。如果说 3.0 和 4.0 添加的新特性对于一般用户来说无足轻重,那 5.0 新的流类型就不可忽视啦!

在没有 Stream 类型之前,其实 Redis 也支持各种类似于流的处理模式,例如 Fire and forget 模式的 Pub/Sub,阻塞队列 BLPOP,时间序列 zsort 存储,等各种方式都能模拟类似的场景,但却都觉得有点欠缺,终于,流类型成功的解决了以上所有问题,并能支持其他的常见使用场景。

说到流消息就不得不说到 Kafka 啦,我相信大家应该都听说过消息中间件 Kafka,至于 RocketMQ 或者 MetaQ 就不再赘述他们与 Kafka 的关系啦,Redis 作者在实现流类型时大量参考了 Kafka 中的概念,例如消费模型,流消息的概念。当然所有的参考只局限于 Kafka 的文档,与 Kafka 的代码实现没有任何关系哦。

心动不如心动,那先一睹为快吧。

环境准备#

如果你是 macOs 用户,并且安装了 brew(如果没有安装,那建议先安装 /usr/bin/ruby -e "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/master/install)"),那么只需要

brew install redis

即可,如果你是非 macOs 用户,那要嘛考虑换 mac,要嘛使用 docker 启动

docker run --rm -it -p 6379:6379 -v $PWD:/data --name redis redis:alpine# 题外话: 使用 alpine 更小更省心,强烈推荐,有任何使用问题都可以交流哦,至于有多好,在这里怕是说不完。
# 验证安装的版本docker exec redis redis-cli info server# 客户端链接docker exec -it redis redis-cli

一切准备就绪,就开始实践吧。既然流是新的数据类型,那我们就先从支持的操作开始吧。

命令列表#

Stream 类型一共支持 13 个命令,这里简单列举一下支持的命令。

命令功能概述
xinfo获取消费者,分组和流信息
xadd添加消息到流
xtrim将流重置为指定大小
xdel通过 ID 删除
xrange返回范围内的消息,特殊起始 + -
xrevrange与 xrange 相同,但返回顺序相反
xlen获取流长度
xread从流中指定 id 开始读取指定量消息,可选择阻塞返回
xgroup管理消费组
xreadgroup以订阅组成员的身份读取流消息 - 即订阅/消费消息
xack响应消息被正确处理
xpending查询正在处理中的消息 - 尚未 ACK 的消息
xclaim获取正在处理中的消息

在开始之前,简单概述一下使用过程中需要注意的点

  1. 流消息内容是字典 - 即 KV 结构
  2. 每个消息有一个 ID - 128bit - 由时间戳和序列号组成
  3. 插入消息时使用 * 作为 ID 则是由服务端生成 ID
  4. ID 必须递增
  5. -/+ 分别表示最小和最大消息 ID
  6. $ 表示最新的消息位置,在创建消费组时使用
  7. > 表示最新消费的消息位置,在消费消息时使用

命令交互#

# 往流中添加消息 - 会返回消息 ID# 手动指定 IDxadd s 1-0 name wener age 18xadd s 1-1 name wen age 17# 由服务端生成消息 IDxadd s * name xx age 16# 返回所有消息xrange s - +# 返回第一条xrange s - + count 1# 返回最后一条xrevrange s + - count 1# 返回消息长度xlen s
# 读第一条消息xread count 1 streams s1 0-0# 读取第二条 - 指定的消息 ID 是 1-0 ,会返回这个 ID 之后的消息xread count 1 streams s1 1-0
# 模拟消息的 Roling 处理# ------del s# 在插入消息时,可限制消息的最大长度,类似于 rolling 日志文件的逻辑# 逻辑等同于先 add 再 trim# 插入时限制最大长度 2xadd s MAXLEN 2 * ts 1xadd s MAXLEN 2 * ts 2xadd s MAXLEN 2 * ts 3xadd s MAXLEN 2 * ts 4# 流中只会有 3 4 这两条消息xrange s - +
# 消费组# ======# 重置流内容del s# 创建消费组 g1 并将消费位置置为最新消息位置 $# 因为 s 不存在,指定 MKSTREAM 会自动创建一个空的流 sxgroup create s g1 $ MKSTREAM# 添加新的消息xadd s 1-1 name zz age 16# 会返回最新插入的消息,当前消费者为 c1xreadgroup group g1 c1 count 1 streams s ># 当消息处理完成后对服务端进行响应xack s g1 1-1
# 模拟消息处理失败场景# ------# 添加新的消息xadd s 1-2 name aa age 16# 由 c2 消费xreadgroup group g1 c2 count 1 streams s ># 但在处理过程中异常,未 ACK,此时通过 pending 查看 c2 堆积的消息xpending s g1 - + 1 c2# c1 有能力处理,因此可以将 c2 处理失败的消息拿过来处理# retrycount 由应用自己维护,记录重试次数# 500 为表示该消息的处理时间超过 500ms 才能“拿”过来xclaim s g1 c1 500 1-2 retrycount 2# c1 成功处理该消息xack s g1 1-2

Stream 的操作相当简介,能实现什么样的功能主要取决于业务的设计。使用 cli 完成了基本的操作再来看看 Java 的操作吧。

Java 交互#

lettuce 是一个基于 Netty 的异步 Redis 客户端,在最新版中支持了 Stream 的操作。

生产和消费

public void stream() throws InterruptedException {    RedisClient client = RedisClient.create("redis://localhost");    StatefulRedisConnection<String, String> connection = client.connect();    // 流的名字    String streamName = "s";    // 消费组名    String groupName = "g1";
    AtomicInteger counter = new AtomicInteger();    // 总消息量    long total = 1000000;    // 并发生产    int producerCount = 2;    // 并发消费    int consumerCount = 4;    for (int i = 0; i < producerCount; i++) {        int id = i;        CompletableFuture.runAsync(() -> {            String name = "producer." + id;            StatefulRedisConnection<String, String> connect = client.connect();            while (true) {                int n = counter.incrementAndGet();                if (n > total) {                    return;                }                // 同步生产                Timer.Context context = metrics.timer(name).time();                connect                        .sync()                        .xadd(streamName, "ts", String.valueOf(System.currentTimeMillis()), "i", String.valueOf(n))                ;                context.close();            }        });    }
    for (int i = 0; i < consumerCount; i++) {        StatefulRedisConnection<String, String> connect = client.connect();
        // 消费的上下文        ConsumerContext c = new ConsumerContext();        c                .setConnection(connect)                .setConsumer(Consumer.from(groupName, "c" + i))                .setStreamName(streamName)                .setGroupName(groupName)                .setName("consumer." + i)                .setXReadArgs(XReadArgs.Builder.block(Duration.ofSeconds(5)))                .setXreadLastOffset(XReadArgs.StreamOffset.lastConsumed(streamName))        ;        // 异步消费        consume(c);    }
    Thread.sleep(Duration.ofMinutes(10).toMillis());}

private CompletionStage<?> consume(ConsumerContext c) {    Timer.Context context = metrics.timer(c.name).time();    return c.connection            .async()            .xreadgroup(c.consumer, c.xReadArgs, c.xreadLastOffset)            // 消息处理            .thenCompose(v -> {                context.close();                if (v.isEmpty()) {                    metrics.meter(c.name + ".empty").mark();                    return CompletableFuture.completedFuture(null);                }                StreamMessage<String, String> message = v.get(0);
                // 输出一定日志量                if (ThreadLocalRandom.current().nextDouble() < 0.01) {                    log.info("[{}] {}", c.name, message.getBody());                }
                // 成功处理                return c.connection.async().xack(c.streamName, c.groupName, message.getId());            })            // 异常处理            .exceptionally(e -> {                metrics.meter(c.name + ".error").mark();                return null;            })            // 循环 - 没有推出逻辑            .thenCompose((v) -> consume(c));}

@Data@Accessors(chain = true)public static class ConsumerContext {    String name;    String streamName;    String groupName;    Consumer<String> consumer;
    StatefulRedisConnection<String, String> connection;
    XReadArgs.StreamOffset<String> xreadLastOffset;
    XReadArgs xReadArgs;}

处理未成功的消息

同步操作,逻辑相对清晰

public void testClaimPendingSingleThreadSync() {    RedisClient client = RedisClient.create("redis://localhost");    StatefulRedisConnection<String, String> connection = client.connect();    String streamName = "s";    String groupName = "g1";
    RedisCommands<String, String> sync = connection.sync();    Consumer<String> consumer = Consumer.from(groupName, "c1");    Range<String> fullRange = Range.create("-", "+");
    while (true) {        try (Timer.Context ignored = metrics.timer(consumer.getName() + ".pending").time()) {            PendingResult result = PendingResult.of(sync.xpending(streamName, consumer, fullRange, Limit.from(1)));
            if (!result.hasPending()) {                break;            }
            List<StreamMessage<String, String>> list = sync.xclaim(                    streamName,                    consumer,                    new XClaimArgs().minIdleTime(500).retryCount(result.getDeliverCount() + 1),                    result.getMessageId()            );            if (list.isEmpty()) {                continue;            }            StreamMessage<String, String> message = list.get(0);            if (ThreadLocalRandom.current().nextDouble() < 0.001) {                log.info("[{}] {}", consumer.getName(), message.getBody());            }            sync.xack(streamName, groupName, message.getId());        }    }}
/** * Pending 返回的结果处理 */interface PendingResult {    static PendingResult of(List<Object> v) {        return () -> v;    }
    List<Object> getResult();
    default boolean hasPending() {        List<Object> result = getResult();        if (result != null && !result.isEmpty()) {            List list = (List) result.get(0);            return !list.isEmpty() && list.get(0) != null;        }        return false;    }
    default String getMessageId() {        return String.valueOf(((List) getResult().get(0)).get(0));    }
    default String getConsumer() {        return String.valueOf(((List) getResult().get(0)).get(1));    }
    default long getElapseTime() {        return ((Number) ((List) getResult().get(0)).get(2)).longValue();    }
    default long getDeliverCount() {        return ((Number) ((List) getResult().get(0)).get(3)).longValue();    }}

某次的性能统计

Java CPU 70%c1.pending             count = 769903         mean rate = 3360.66 calls/second     1-minute rate = 3267.08 calls/second     5-minute rate = 2855.18 calls/second    15-minute rate = 2538.15 calls/second               min = 0.22 milliseconds               max = 1.80 milliseconds              mean = 0.30 milliseconds            stddev = 0.13 milliseconds            median = 0.26 milliseconds              75% <= 0.32 milliseconds              95% <= 0.49 milliseconds              98% <= 0.65 milliseconds              99% <= 0.88 milliseconds            99.9% <= 1.66 milliseconds

处理未成功的消息

异步操作,逻辑相对不那么清晰~

public void testClaimPendingSingleThreadAsync() throws ExecutionException, InterruptedException {    RedisClient client = RedisClient.create("redis://localhost");    StatefulRedisConnection<String, String> connection = client.connect();    String streamName = "s";    String groupName = "g1";
    RedisAsyncCommands<String, String> async = connection.async();    Consumer<String> consumer = Consumer.from(groupName, "c2");    Range<String> fullRange = Range.create("-", "+");
    AtomicReference<Supplier<CompletionStage<?>>> process = new AtomicReference<>();    AtomicReference<Timer.Context> context = new AtomicReference<>();    // 一次处理    process.set(() -> {        context.set(metrics.timer(consumer.getName() + ".process").time());        return async                .xpending(streamName, consumer, fullRange, Limit.from(1))                .thenCompose(v -> {                    PendingResult result = PendingResult.of(v);                    if (!result.hasPending()) {                        throw new RuntimeException("DONE");                    }
                    return async.xclaim(                            streamName,                            consumer,                            new XClaimArgs().minIdleTime(500).retryCount(result.getDeliverCount() + 1),                            result.getMessageId()                    );                })                .thenCompose(list -> {                    StreamMessage<String, String> message = list.get(0);                    if (ThreadLocalRandom.current().nextDouble() < 0.001) {                        log.info("[{}] {}", consumer.getName(), message.getBody());                    }                    return async.xack(streamName, groupName, message.getId());                })                .thenCompose(v -> {                    context.get().close();                    return process.get().get();                });    });    // 循环    process.get()            .get()            .whenComplete((v, e) -> {                if (e != null) {                    e.printStackTrace();                }                log.info("Complete");            })            .toCompletableFuture()            .get();}

但性能会比同步操作的性能要好呢,Java 的 CPU 也比同步的更低

Java CPU 50%redis-server CPU 50%c2.process            count = 879207        mean rate = 5145.76 calls/second    1-minute rate = 5128.23 calls/second    5-minute rate = 3779.55 calls/second15-minute rate = 3132.92 calls/second            min = 0.14 milliseconds            max = 0.81 milliseconds            mean = 0.18 milliseconds        stddev = 0.06 milliseconds        median = 0.16 milliseconds            75% <= 0.19 milliseconds            95% <= 0.29 milliseconds            98% <= 0.34 milliseconds            99% <= 0.40 milliseconds        99.9% <= 0.81 milliseconds

总结#

当什么时候选择 Redis 的流呢?

  1. 内存存储满足需求
  2. 速度要求高
  3. 能接收 Redis 的持久化保障 - (保障是不一定持久 😄)

合理的应用也是需要合理的场景。

流总的来说还是很不错的,还有很多可能使用的场景在这里不做一一赘述,流的内部实现也是非常的有意思的,等有时间再做另外的一个分享。此外 Redis 4 的 Module 也是非常有魅力,例如甚至可以用 Golang 来实现模块添加新的命令功能,嗯嗯,机会多多。