使用Redis实现消息队列主要有三种方法:

  • List队列
  • 发布/订阅模型 Pub/Sub
  • Stream (Redis5+)

下面分别对这三种方法进行介绍,并编写简单例子。

List队列

看到队列,你会想到Redis有个数据类型List,List能很好符合队列的要求。List的底层是一个链表,在头部和尾部进行操作的时间复杂度都是O(1)。 使用List进行队列操作,你可以这样使用。 生产者使用LPUSH进行消息发布 image.png 消费者使用RPOP对消息进行消费 image.png 这里存在着一个问题,如果LIST没有消息时,消费者执行RPOP时,会返回null(nil)。 我们在编写消费者逻辑时,一般是循环不断从队列中消费数据进行处理,如果此时队列为空,那消费者依旧会频繁拉取消息,这会造成「CPU 空转」,不仅浪费 CPU 资源,还会对 Redis 造成压力。Redis提供了阻塞式拉起命令BRPOP / BLPOP,使用 BRPOP 这种阻塞式方式拉取消息时,还支持传入一个「超时时间」,如果设置为 0,则表示不设置超时,直到有新消息才返回,否则会在指定的超时时间后返回 null,既兼顾了效率还避免了CPU空转问题。 这是List队列的代码例子:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
// 生产者
package main

import (
	"context"
    
	"github.com/go-redis/redis/v8"
)

func main() {
	rdb := redis.NewClient(&redis.Options{
		Addr:     "localhost:6379",
		Password: "",
		DB:       1,
	})
	data :=[]string{`{"name": "jinzhu11221212321", "age": 18, "tags": ["tag211", "tag2"], "orgs": {"orga": "orga"}`,`{"name": "asd", "age": 18, "tags": ["tag211", "tag2"], "orgs": {"orga": "orga"}`}
	rdb.LPush(context.Background(), "queue", data)
}

// 消费者
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/go-redis/redis/v8"
)

func main() {
	rdb := redis.NewClient(&redis.Options{
		Addr:     "localhost:6379",
		Password: "",
		DB:       1,
	})
	for true {
		//result, err := rdb.RPop(context.Background(), "queue").Result()
		result, err := rdb.BRPop(context.Background(), 0, "queue").Result()
		if err == redis.Nil {
			continue
		} else if err != nil {
			log.Println(err)
		}
		fmt.Printf("consumer msg %v\n", result)

	}
}

List队列的缺点:

  1. 不支持重复消费:消费者拉取消息后,这条消息就从 List 中删除了,无法被其它消费者再次消费,即不支持多个消费者消费同一批数据,仅支持1对1。
  2. 消息丢失:消费者拉取到消息后,如果发生异常宕机,那这条消息就丢失了,无论消费者是否处理成功,这条消息都没办法再次消费了。

发布/订阅模型 Pub/Sub

从名字就能看出来,这个模块是 Redis 专门是针对「发布/订阅」这种队列模型设计的。 它正好可以解决前面提到的第一个问题:重复消费。 即多组生产者、消费者的场景,我们来看它是如何做的。 Redis 提供了 PUBLISH / SUBSCRIBE 命令,来完成发布、订阅的操作。 image.png image.png 使用Pub/Sub就能很好的解决不能重复消费的问题,多个消费者能够消费同一个生产的消息。 除此之外,Pub/Sub 还提供了「匹配订阅」模式,允许消费者根据一定规则,订阅「多个」自己感兴趣的队列。 代码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
// 订阅

package main

import (
	"context"
	"fmt"
	"github.com/go-redis/redis/v8"
)


func main() {
	rdb := redis.NewClient(&redis.Options{
		Addr:     "localhost:6379",
		Password: "",
		DB:       1,
	})
	sub := rdb.Subscribe(context.TODO(), "queue")
	ch := sub.Channel()

	for msg := range ch {
		fmt.Println(msg.Channel, msg.Payload)
	}

}

// 发布
package main

import (
	"context"
	"github.com/go-redis/redis/v8"
)

func main() {
	rdb := redis.NewClient(&redis.Options{
		Addr:     "localhost:6379",
		Password: "",
		DB:       1,
	})
	data :=[]string{`{"name": "jinzhu11221212321", "age": 18, "tags": ["tag211", "tag2"], "orgs": {"orga": "orga"}`,`{"name": "asd", "age": 18, "tags": ["tag211", "tag2"], "orgs": {"orga": "orga"}`}
	for i:=range data {
		err := rdb.Publish(context.TODO(), "queue", data[i]).Err()
		if err != nil {
			panic(err)
		}
	}

}

Pub/Sub 最大问题是:丢数据。 如果发生以下场景,就有可能导致数据丢失:

  1. 消费者下线
  2. Redis 宕机
  3. 消息堆积

Pub/Sub的优缺点:

  1. 支持发布 / 订阅,支持多组生产者、消费者处理消息
  2. 消费者下线,数据会丢失
  3. 不支持数据持久化,Redis 宕机,数据也会丢失
  4. 消息堆积,缓冲区溢出,消费者会被强制踢下线,数据也会丢失

Stream

首先,Stream 通过 XADD 和 XREAD 完成最简单的生产、消费模型:

  • XADD:发布消息
  • XREAD:读取消息

生产者发布 2 条消息:

1
2
3
4
5
// *表示让Redis自动生成消息ID
127.0.0.1:6379> XADD queue * name zhangsan
"1618469123380-0"
127.0.0.1:6379> XADD queue * name lisi
"1618469127777-0"

使用 XADD 命令发布消息,其中的「*」表示让 Redis 自动生成唯一的消息 ID。 这个消息 ID 的格式是「时间戳-自增序号」。 消费者拉取消息:

1
2
3
4
5
6
7
8
9
// 从开头读取5条消息,0-0表示从开头读取
127.0.0.1:6379> XREAD COUNT 5 STREAMS queue 0-0
1) 1) "queue"
   2) 1) 1) "1618469123380-0"
         2) 1) "name"
            2) "zhangsan"
      2) 1) "1618469127777-0"
         2) 1) "name"
            2) "lisi"

如果想继续拉取消息,需要传入上一条消息的 ID:

1
2
127.0.0.1:6379> XREAD COUNT 5 STREAMS queue 1618469127777-0
(nil)

没有消息,Redis 会返回 NULL。

以上就是 Stream 最简单的生产、消费。

  1. Stream 是否支持「阻塞式」拉取消息? 可以的,在读取消息时,只需要增加 BLOCK 参数即可。
1
2
// BLOCK 0 表示阻塞等待,不设置超时时间
127.0.0.1:6379> XREAD COUNT 5 BLOCK 0 STREAMS queue 1618469127777-0

这时,消费者就会阻塞等待,直到生产者发布新的消息才会返回。 2) Stream 是否支持发布 / 订阅模式? 也没问题,Stream 通过以下命令完成发布订阅:

  • XGROUP:创建消费者组
  • XREADGROUP:在指定消费组下,开启消费者拉取消息

下面我们来看具体如何做? 首先,生产者依旧发布 2 条消息:

1
2
3
4
127.0.0.1:6379> XADD queue * name zhangsan
"1618470740565-0"
127.0.0.1:6379> XADD queue * name lisi
"1618470743793-0"

之后,我们想要开启 2 组消费者处理同一批数据,就需要创建 2 个消费者组:

1
2
3
4
5
6
// 创建消费者组1,0-0表示从头拉取消息
127.0.0.1:6379> XGROUP CREATE queue group1 0-0
OK
// 创建消费者组2,0-0表示从头拉取消息
127.0.0.1:6379> XGROUP CREATE queue group2 0-0
OK

消费者组创建好之后,我们可以给每个「消费者组」下面挂一个「消费者」,让它们分别处理同一批数据。 第一个消费组开始消费:

1
2
3
4
5
6
7
8
9
// group1的consumer开始消费,>表示拉取最新数据
127.0.0.1:6379> XREADGROUP GROUP group1 consumer COUNT 5 STREAMS queue >
1) 1) "queue"
   2) 1) 1) "1618470740565-0"
         2) 1) "name"
            2) "zhangsan"
      2) 1) "1618470743793-0"
         2) 1) "name"
            2) "lisi"

同样地,第二个消费组开始消费:

1
2
3
4
5
6
7
8
9
// group2的consumer开始消费,>表示拉取最新数据
127.0.0.1:6379> XREADGROUP GROUP group2 consumer COUNT 5 STREAMS queue >
1) 1) "queue"
   2) 1) 1) "1618470740565-0"
         2) 1) "name"
            2) "zhangsan"
      2) 1) "1618470743793-0"
         2) 1) "name"
            2) "lisi"

我们可以看到,这 2 组消费者,都可以获取同一批数据进行处理了。 这样一来,就达到了多组消费者「订阅」消费的目的。

  1. 消息处理时异常,Stream 能否保证消息不丢失,重新消费? 除了上面拉取消息时用到了消息 ID,这里为了保证重新消费,也要用到这个消息 ID。 当一组消费者处理完消息后,需要执行 XACK 命令告知 Redis,这时 Redis 就会把这条消息标记为「处理完成」。
1
2
// group1下的 1618472043089-0 消息已处理完成
127.0.0.1:6379> XACK queue group1 1618472043089-0

如果消费者异常宕机,肯定不会发送 XACK,那么 Redis 就会依旧保留这条消息。 待这组消费者重新上线后,Redis 就会把之前没有处理成功的数据,重新发给这个消费者。这样一来,即使消费者异常,也不会丢失数据了。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
// 消费者重新上线,0-0表示重新拉取未ACK的消息
127.0.0.1:6379> XREADGROUP GROUP group1 consumer1 COUNT 5 STREAMS queue 0-0
// 之前没消费成功的数据,依旧可以重新消费
1) 1) "queue"
   2) 1) 1) "1618472043089-0"
         2) 1) "name"
            2) "zhangsan"
      2) 1) "1618472045158-0"
         2) 1) "name"
            2) "lisi"
  1. Stream 数据会写入到 RDB 和 AOF 做持久化吗? Stream 是新增加的数据类型,它与其它数据类型一样,每个写操作,也都会写入到 RDB 和 AOF 中。 我们只需要配置好持久化策略,这样的话,就算 Redis 宕机重启,Stream 中的数据也可以从 RDB 或 AOF 中恢复回来。
  2. 消息堆积时,Stream 是怎么处理的? 其实,当消息队列发生消息堆积时,一般只有 2 个解决方案:
  1. 生产者限流:避免消费者处理不及时,导致持续积压
  2. 丢弃消息:中间件丢弃旧消息,只保留固定长度的新消息

而 Redis 在实现 Stream 时,采用了第 2 个方案。 在发布消息时,你可以指定队列的最大长度,防止队列积压导致内存爆炸。

1
2
3
// 队列长度最大10000
127.0.0.1:6379> XADD queue MAXLEN 10000 * name zhangsan
"1618473015018-0"

当队列长度超过上限后,旧消息会被删除,只保留固定长度的新消息。 这么来看,Stream 在消息积压时,如果指定了最大长度,还是有可能丢失消息的。 代码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
package main

import (
	"context"
	"github.com/go-redis/redis/v8"
)

func main() {
	rdb := redis.NewClient(&redis.Options{
		Addr:     "localhost:6379",
		Password: "",
		DB:       1,
	})
	data := []string{`{"name": "jinzhu11221212321", "age": 18, "tags": ["tag211", "tag2"], "orgs": {"orga": "orga"}`, `{"name": "asd", "age": 18, "tags": ["tag211", "tag2"], "orgs": {"orga": "orga"}`}
	rdb.XAdd(context.TODO(), &redis.XAddArgs{
		Stream: "queue1",
		Values: data,
	})

}


package main

import (
	"context"
	"fmt"
	"github.com/go-redis/redis/v8"
)

func main() {
	rdb := redis.NewClient(&redis.Options{
		Addr:     "localhost:6379",
		Password: "",
		DB:       1,
	})
	result, err := rdb.XRead(context.TODO(), &redis.XReadArgs{
		Streams: []string{"queue1","0"},
		Count:   0,
		Block:   0,
	}).Result()
	if err != nil {
		panic(err)
	}
	for r:= range result {
		for m:=range result[r].Messages {
			fmt.Println(result[r].Messages[m].Values)
		}

	}

}

参考

[1] https://redis.uptrace.dev/ [2] https://redis.io/commands [3] https://mp.weixin.qq.com/s/RthQvzLHZRGNo-z6X_7jQQ [4] https://github.com/overstarry/queue