redis官方命令文档:redis redis中文命令文档:redis go-redis官方地址:go-redis Github地址:go-redis
一、连接
1.拉取go-redis
如果您使用的是Redis 7,请安装go-redis/v9 (当前处于测试版)。
go get github.com/go-redis/redis/v9
2.连接Redis服务器
rdb := redis.NewClient(&redis.Options{
Addr: "127.0.0.1:6379",
Password: "",
DB: 0,
}
)
ctx := context.Background()
二、相关命令
1.消息队列相关命令:
• XAdd – 添加消息到末尾
err := rdb.XAdd(ctx, &redis.XAddArgs{
Stream: "stream_name", // stream key
NoMkStream: false, // * 默认false,当为false时,key不存在,会新建
MaxLen: 100000, // * 指定stream的最大长度,当队列长度超过上限后,旧消息会被删除,只保留固定长度的新消息
Approx: false, // * 默认false,当为true时,模糊指定stream的长度
ID: "*", // 消息 id,我们使用 * 表示由 redis 生成
Values: []interface{}{"field1", "value1", "field2", "value2"}, // 表示消息内容键值对
// MinID: "id", // * 超过阈值,丢弃设置的小于MinID消息id【基本不用】
// Limit: 1000, // * 限制长度【基本不用】
}).Err()
• XTrim – 对流进行修剪,限制长度
err := rdb.XTrimMaxLen(ctx, "stream_name", 1).Err()
• XDel – 删除消息
err := rdb.XDel(ctx, "stream_name", "1667739612182-0").Err()
• XLen – 获取流包含的元素数量,即消息长度
len, err := rdb.XLen(ctx, "stream_name").Result()
• XRange – 获取消息列表,会自动过滤已经删除的消息
// "-":起始ID "+":终止ID
re, err := rdb.XRange(ctx, "stream_name", "-", "+").Result()
re, err := rdb.XRangeN(ctx, "stream_name", "-", "+", 1).Result()
• XRevRange – 反向获取消息列表,ID 从大到小
re, err := rdb.XRevRange(ctx, "stream_name", "+", "-").Result()
re, err := rdb.XRevRangeN(ctx, "stream_name", "+", "-",1).Result()
• XRead – 以阻塞或非阻塞方式获取消息列表
XRead只是读取消息,读取完之后并不会删除消息。 使用XRead读取消息,是完全独立与消费者组的,多个客户端可以同时读取消息。
re, err := rdb.XRead(ctx, &redis.XReadArgs{
Streams: []string{"stream_name", "0-0"},
Count: 1,
Block: time.Second * 10, // 设置为0 表示一直阻塞
}).Result()
2.消费者组相关命令:
• XGroupCreate – 创建消费者组
err = rdb.XGroupCreate(ctx, "stream_name", "group1", "0").Err() // 0:从头获取 $:从最新获取
• XReadGroup – 读取消费者组中的消息
r, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
Group: "group1",
Consumer: "consumer1",
Streams: []string{"stream_name", ">"},
Count: 1,
Block: 0,
NoAck: true, //为true,表示读取消息时确认消息
}).Result()
id为>,表示着消费者只希望接收从未传递给任何其他消费者的消息
id为0或其他,表示可获取已读但未确认的消息,请注意,在这种情况下,BLOCK和NOACK都将被忽略。
• XPending – 显示待处理消息的相关信息
r, err := rdb.XPending(ctx, "stream_name", "group1").Result()
返回:
&{1 1668089463435-0 1668089463435-0 map[consumer1:1]}
对应返回信息字段:
• 该消费者组的待处理消息总数,该总数为1
• 待处理消息中最小的ID
• 待处理消息中最大的ID
• 该消费者组中的每个有待处理的消费者,以及它拥有的待处理消息数。
• XAck – 将消息标记为”已处理”
err = rdb.XAck(ctx, "stream_name", "group1", "1668089463435-0").Err()
• XGroupSetId – 为消费者组设置新的最后递送消息ID
r,err := rdb.XGroupSetID(ctx, "stream_name", "group1", "0").Result()
返回值:
OK
如果您希望消费者组中的消费者重新处理流中的所有消息,您可能希望将其下一个ID设置为0
• XGoupDelConsumer– 删除消费者
r, err := rdb.XGroupDelConsumer(ctx, "stream_name", "group1", "consumer1").Result()
返回值:
3
对应返回信息字段
整数: 消费者在删除之前拥有的待处理消息数
• XGroupDestroy – 删除消费者组
r, err := rdb.XGroupDestroy(ctx, "stream_name", "group1").Result()
返回值:
1
对应返回信息字段:
整数: 被销毁的消费者组数量 (0或1)
• XClaim – 转移消息的归属权
r, err := rdb.XClaim(ctx, &redis.XClaimArgs{
Stream: "stream_name",
Group: "group1",
Consumer: "consumer2",
MinIdle: time.Second * 10, // 表示需要转移消息最少空闲了10s才能转移
Messages: []string{"1668084000859-0"},
}).Result()
返回值:
[{1668084000859-0 map[field1:value1 field2:value2]}]
转移消息是指从同一个消费者组,不同消费者所读取的消息,相互转移
• XInfoStream – 打印流信息
r, err := rdb.XInfoStream(ctx, "stream_name").Result()
返回:
&{3 1 2 1 1668085056353-0 0-0 3 {1668084000859-0 map[field1:value1 field2:value2]} {1668085056353-0 map[field1:value1 field2:value2]} 1668084000859-0}
对应返回信息字段:
• length: stream中的消息数量
• radix-tree-keys: 基础基数数据结构中的键数
• radix-tree-nodes: 基础基数数据结构中的节点数
• groups: 该流的消费者组数量
• last-generated-id: 该流中最新一条消息的ID
• max-deleted-entry-id: 从流中删除的最大消息ID
• entries-added: 流在其生命周期内添加到的所有消息的计数
• first-entry: 流中第一个消息的ID和字段值组
• last-entry: 流中最后一个消息的ID和字段值组
• recorded-first-entry-id:第一个进入该流的消息ID
• XInfoGroups – 打印消费者组的信息
r, err := rdb.XInfoGroups(ctx, "stream_name").Result()
返回:
[{group1 0 0 1668084000859-0 0 2}]
对应返回信息字段:
• name: 消费者组名
• consumers: 组内消费者数量
• pending: 已读取未确认的消息数量
• last-delivered-id: 组内消费者消费的最后一个消息ID
• entries-read: 组内消费者消费计数器,需要注意的是,这个计数器只是一个启发式的而不是一个精确的计数器,因此使用了术语 “逻辑”。
• lag: 流中仍在等待传递给组消费者的消息数量,或者当无法确定该数量时,为NULL。
• XInfoConsumer – 打印消费者的相关信息
r, err := rdb.XInfoConsumers(ctx, "stream_name", "group1").Result()
返回值:
[{consumer1 0 6m53.121s} {consumer2 2 5m53.161s}]
• name: 消费者名
• pending: 待处理消息数,这些消息是已传递但尚未确认的消息
• idle: 自消费者上次与服务器交互以来经过的毫秒数