go-redis:Stream 消息队列详解

redis官方命令文档:redis                redis中文命令文档:redis               go-redis官方地址:go-redis        Github地址:go-redis

一、连接

1.拉取go-redis

如果您使用的是Redis 7,请安装go-redis/v9 (当前处于测试版)。

go get github.com/go-redis/redis/v9

2.连接Redis服务器

rdb := redis.NewClient(&redis.Options{
		Addr:     "127.0.0.1:6379",
		Password: "",
		DB:       0,
	}
)

ctx := context.Background()

Go Context基本使用

二、相关命令

1.消息队列相关命令:

• XAdd – 添加消息到末尾

err := rdb.XAdd(ctx, &redis.XAddArgs{
		Stream:     "stream_name",                                         // stream key
		NoMkStream: false,                                                 // * 默认false,当为false时,key不存在,会新建
		MaxLen:     100000,                                                // * 指定stream的最大长度,当队列长度超过上限后,旧消息会被删除,只保留固定长度的新消息
		Approx:     false,                                                 // * 默认false,当为true时,模糊指定stream的长度
		ID:         "*",                                                   // 消息 id,我们使用 * 表示由 redis 生成
		Values:     []interface{}{"field1", "value1", "field2", "value2"}, // 表示消息内容键值对
		// MinID: "id",            // * 超过阈值,丢弃设置的小于MinID消息id【基本不用】
		// Limit: 1000,            // * 限制长度【基本不用】
	}).Err()

• XTrim – 对流进行修剪,限制长度

err := rdb.XTrimMaxLen(ctx, "stream_name", 1).Err()

• XDel – 删除消息

err := rdb.XDel(ctx, "stream_name", "1667739612182-0").Err()

• XLen – 获取流包含的元素数量,即消息长度

len, err := rdb.XLen(ctx, "stream_name").Result()

• XRange – 获取消息列表,会自动过滤已经删除的消息

// "-":起始ID   "+":终止ID
re, err := rdb.XRange(ctx, "stream_name", "-", "+").Result()

re, err := rdb.XRangeN(ctx, "stream_name", "-", "+", 1).Result()

• XRevRange – 反向获取消息列表,ID 从大到小

re, err := rdb.XRevRange(ctx, "stream_name", "+", "-").Result()

re, err := rdb.XRevRangeN(ctx, "stream_name", "+", "-",1).Result()

• XRead – 以阻塞或非阻塞方式获取消息列表

XRead只是读取消息,读取完之后并不会删除消息。 使用XRead读取消息,是完全独立与消费者组的,多个客户端可以同时读取消息。

re, err := rdb.XRead(ctx, &redis.XReadArgs{
		Streams: []string{"stream_name", "0-0"},
		Count:   1,
		Block:   time.Second * 10,   // 设置为0 表示一直阻塞
	}).Result()

2.消费者组相关命令:

• XGroupCreate – 创建消费者组

err = rdb.XGroupCreate(ctx, "stream_name", "group1", "0").Err()   // 0:从头获取   $:从最新获取

• XReadGroup  – 读取消费者组中的消息

r, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
	Group:    "group1",
	Consumer: "consumer1",
	Streams:  []string{"stream_name", ">"},  
	Count:    1,
	Block:    0,
	NoAck:    true,   //为true,表示读取消息时确认消息
}).Result()

id为>,表示着消费者只希望接收从未传递给任何其他消费者的消息
id为0或其他,表示可获取已读但未确认的消息,请注意,在这种情况下,BLOCK和NOACK都将被忽略。

• XPending – 显示待处理消息的相关信息

r, err := rdb.XPending(ctx, "stream_name", "group1").Result()

返回:
&{1 1668089463435-0 1668089463435-0 map[consumer1:1]}

对应返回信息字段:
• 该消费者组的待处理消息总数,该总数为1
• 待处理消息中最小的ID
• 待处理消息中最大的ID
• 该消费者组中的每个有待处理的消费者,以及它拥有的待处理消息数。

• XAck – 将消息标记为”已处理”

err = rdb.XAck(ctx, "stream_name", "group1", "1668089463435-0").Err()

• XGroupSetId – 为消费者组设置新的最后递送消息ID

r,err := rdb.XGroupSetID(ctx, "stream_name", "group1", "0").Result()

返回值:
OK

如果您希望消费者组中的消费者重新处理流中的所有消息,您可能希望将其下一个ID设置为0

• XGoupDelConsumer– 删除消费者

r, err := rdb.XGroupDelConsumer(ctx, "stream_name", "group1", "consumer1").Result()

返回值:
3

对应返回信息字段
整数: 消费者在删除之前拥有的待处理消息数

• XGroupDestroy – 删除消费者组

r, err := rdb.XGroupDestroy(ctx, "stream_name", "group1").Result()

返回值:
1

对应返回信息字段:
整数: 被销毁的消费者组数量 (0或1)

• XClaim – 转移消息的归属权

r, err := rdb.XClaim(ctx, &redis.XClaimArgs{
	Stream:   "stream_name",
	Group:    "group1",
	Consumer: "consumer2",
	MinIdle:  time.Second * 10,   // 表示需要转移消息最少空闲了10s才能转移
	Messages: []string{"1668084000859-0"},
}).Result()

返回值:
[{1668084000859-0 map[field1:value1 field2:value2]}]

转移消息是指从同一个消费者组,不同消费者所读取的消息,相互转移

• XInfoStream – 打印流信息

r, err := rdb.XInfoStream(ctx, "stream_name").Result()

返回:
&{3 1 2 1 1668085056353-0 0-0 3 {1668084000859-0 map[field1:value1 field2:value2]} {1668085056353-0 map[field1:value1 field2:value2]} 1668084000859-0}

对应返回信息字段:
• length: stream中的消息数量
• radix-tree-keys: 基础基数数据结构中的键数
• radix-tree-nodes: 基础基数数据结构中的节点数
• groups: 该流的消费者组数量
• last-generated-id: 该流中最新一条消息的ID
• max-deleted-entry-id: 从流中删除的最大消息ID
• entries-added: 流在其生命周期内添加到的所有消息的计数
• first-entry: 流中第一个消息的ID和字段值组
• last-entry: 流中最后一个消息的ID和字段值组
• recorded-first-entry-id:第一个进入该流的消息ID

• XInfoGroups – 打印消费者组的信息

r, err := rdb.XInfoGroups(ctx, "stream_name").Result()

返回:
[{group1 0 0 1668084000859-0 0 2}]

对应返回信息字段:
• name: 消费者组名
• consumers: 组内消费者数量
• pending: 已读取未确认的消息数量
• last-delivered-id: 组内消费者消费的最后一个消息ID
• entries-read: 组内消费者消费计数器,需要注意的是,这个计数器只是一个启发式的而不是一个精确的计数器,因此使用了术语 “逻辑”。
• lag: 流中仍在等待传递给组消费者的消息数量,或者当无法确定该数量时,为NULL。

• XInfoConsumer – 打印消费者的相关信息

r, err := rdb.XInfoConsumers(ctx, "stream_name", "group1").Result()

返回值:
[{consumer1 0 6m53.121s} {consumer2 2 5m53.161s}]

• name: 消费者名
• pending: 待处理消息数,这些消息是已传递但尚未确认的消息
• idle: 自消费者上次与服务器交互以来经过的毫秒数

 

除非注明,否则均为呆小猴博客原创文章,转载必须以链接形式标明本文链接!付费资源为虚拟物品,一经出售,概不退款!
呆小猴 » go-redis:Stream 消息队列详解

发表回复

呆小猴 · 专注安全学习与分享

关于作者 联系作者