前言
事情是这个发生的……
因为公司新专案需要用到 MQ,所以我们选用了 NATS Streaming (STAN) 当作我们专案的 MQ。
没想到如火如荼的密集开发一年后,某天在 Github 查看 STAN 的资料时才发现原来 STAN 要被开发团队弃用了……
(来源: https://github.com/nats-io/nats-streaming-server)
……三小?
看上面说明,开发团队建议改用他们的新产品 JetStream,取代现有的 STAN。
所以 JetStream 是啥?听都没听过?
于是决定来研究一下 JetStream 究竟是何方神圣?并借此机会顺便回顾一下先前的 Core NATS 和 NATS Streaming 两套产品,并且比较一下新旧产品的差异。
这次分享的主题主要有三个部分:
- 一是简单介绍 Message Queue (MQ) 的概念和其用途;
- 二是分别介绍 NATS、NATS Streaming 和这次的重点 JetStream;
- 最后再做个简易的评测。
Message Queue (MQ)
Message Queue (MQ),时常翻译为「讯息伫列」或「消息队列」,常见的开源选择有 RabbitMQ、Kafka 和今天要谈的 NATS。
Message Queue 本身可以简单想像成是一个服务级别的 Queue,同样讯息先进先出,差别在因为这是独立的服务,所以通常必须异步处理;另一个分别是通常 Queue 是一进一出,一则讯息被一个消费者接收,另一个就收不到,但 Message Queue 可以做到让每个消费者都能收到全部的讯息(这通常是可选的)。
MQ 概念上大致可以分别两个角色,分别是:
- 生产者 (Producer)
- 消费者 (Consumer)
生产者负责生产讯息 (Message),并丢进 MQ,而消费者负责接收并处理讯息。两者可以完全不用知道对方,只要和 MQ 沟通即可。
换句话说,只要生产者产生的讯息符合消费者能接收的格式,那么其实不一定具体非得由哪个生产者才能生产。因为无论是单个生产者还是多个不同的生产者,对身为接收端的消费者都无所谓,只要能正常收到符合条件格式的讯息即可。
反过来也是如此,生产者只管生产,至于后端究竟有多少个消费者消费,对生产者来说无关紧要,也不会影响程式码,可以让生产者和消费者各自都拥有最大的弹性。
套用我流解释,使用 MQ 可以有几个好处:
- 更简单
- 更可靠
- 更大更强
更简单 - 程式简化和解耦
使用 MQ 可以方便让不同服务解耦,正如前面所说,所有服务不管是生产者还是消费者 ,全部都统一都只和 MQ 沟通,生产者不用管是谁处理或是什么时候处理,而消费者也不用管是谁生产的内容。
因此无论是生产者还是消费者都可以自由的拆分成多个服务,让每个服务都只负责一件事,程式码可以很单纯。
也就是说只要能和 MQ 沟通,不管是用什么程式语言、用什么方式处理皆无所谓,就算后面其实是一只鸡在处理也可以。
更进一步说,其实连时间也解耦了,因为中间隔了一层 MQ,所以不一定需要生产者和消费者同时在线上。
生产者在生产讯息时,没人规定消费者非得即时在线上处理;反之亦然,消费者在处理讯息时,生产者也不一定要同时在线上生产讯息。
更可靠 - 服务挂掉也没差的能力
在这个架构下,因为生产者或消费者不需要直接连结,所以即使服务挂掉,系统还能一定程度的继续运作。
因为 MQ 通常都有一定程度的储存讯息的能力,所以即使某一个消费者挂掉,也可以等到它复活后再继续把之前没推送成功的讯息再推送给它。
虽然处理时间多少会受影响,但至少讯息不会掉,在多数情境下,这样也不会影响到系统的运作。
更大更强 - 大流量的缓冲
网路服务的流量并不一定是恒定的,系统有时可能会突然面临超大量的网路请求,但是即使要开更多台服务器也需要一点时间,这时 MQ 就可以当作「漏斗」一样的功能,充当缓充。
等到足够数量的服务启动完毕,可以跟上讯息生产的速度了,就可以处理之前来不及处理的讯息了。
Note
另一种假设是这类超大量的网路请求不会一直持续,所以如果业务许可,也可以选择不启动新的服务器,让 MQ 先接收下来就好,之后再让消费者慢慢消化,用时间换取资源,也是一种选择。
Core NATS
这里首先介绍 Core NATS,此处有个很容易混淆的点是 NATS 其实有三项产品:一是 Core NATS、另一是 NATS Streaming,最后还有最新的 JetStream。
因为官网的文件是直接把这三者的内容写在一起,所以一开始没看清楚很容易会以为是同样的东西。但其实这是各自拥有不同概念的三项产品。
Core NATS(以下简称 NATS),一个开源、云原生、用 Golang 写的讯息传递系统,也是 NATS 最基础的产品。
NATS 使用「发布」和「订阅」的方式和程式沟通,并且不做任何持久性的处理,非常单纯,所以效能也相当好。
程式操作也非常简单,这里介绍一下简单的例子。
范例 - NATS 连线
// 连线
natsConn, err := nats.Connect("nats://localhost:4222")
if err != nil {
log.Fatal("连不上 NATS")
}
defer natsConn.Close()
范例 - NATS 发送讯息
// 发送讯息
err = natsConn.Publish("subject", []byte("Hello world"))
if err != nil {
log.Fatal("送不出去")
}
// 清空缓冲
err = natsConn.Flush()
if err != nil {
log.Fatal("清空失败")
}
Note
基于效能因素,函式库会先缓充要发送的讯息,直到一定量的时候才会发送。但如果应用程式有需求要确保想处理的已经确实处理好了,可以使用 flush 函式立即处理。
(来源:https://docs.nats.io/developing-with-nats/sending/caches)
范例 - NATS 接收讯息
// 接收讯息
_, err = natsConn.Subscribe("subject", func(msg *nats.Msg) {
fmt.Println("收到了", string(msg.Data))
})
if err != nil {
log.Fatal("订阅失败")
}
Queue Groups
NATS 内建 Load balancing 的功能,你可以在订阅的时候随便指定一个 Queue 的名称,NATS 会确保讯息能自动分配讯息到同一个 Queue Group 的不同的消费者。
// 订阅时直接指定 Queue 的名称,不需要用设定档预先设定
_, err = natsConn.QueueSubscribe("subject", "queue", func(msg *nats.Msg) {
fmt.Println("收到了", string(msg.Data))
})
if err != nil {
log.Fatal("订阅失败")
}
这样一来,即使一个 Subject 有多个消费者一起处理,每则讯息也只会被处理一次,非常方便。
「最多一次」交付模型
前面说到,NATS 本身不做任何持久性处理,换句话说,如果 NATS 发送讯息的时候如果没人接,掉了就掉了, NATS 也不会理它,讯息便有可能会丢失。
虽然 NATS 本身不管这件事,但它提供了一种称为 Request / Reply 的解决方案。
简单来说就两件事:
- 消费者主动回传「收到」的讯息给生产者知道有收到
- 让开发者自行解决有没有收到的问题
范例 - Request / Reply 机制
// 自动建立一个唯一 subject
reply := nats.NewInbox()
// 发送讯息
err = natsConn.PublishRequest("subject", reply, []byte("Hello world"))
if err != nil {
log.Fatal("送不出去")
}
此例的 reply 其实就是一个普通的 Subject,可以正常透过 Subscribe 监听,用来接收消费者回传的「收到」讯息。
// 接收讯息
_, err = natsConn.Subscribe("subject", func(msg *nats.Msg) {
fmt.Println("收到了", string(msg.Data))
<span class="nx">msg</span><span class="p">.</span><span class="nx">Respond</span><span class="p">([]</span><span class="nb">byte</span><span class="p">(</span><span class="s">"我收到了"</span><span class="p">))</span> <span class="c1">// 生产者会监听 reply,来确认消费者有没有收到</span>
})
if err != nil {
log.Fatal("订阅失败")
}
Respond 意思就是回传讯息给上例的 reply,只要生产者有监听,就可以收到该讯息。借此让生产者自行处理「收到」或是「没收到」的情况。
当然,这是有需要判断有没有收到的场景才需要这么做,如果本身是不介意掉讯息的场景那就没差了。
NATS 小结
简单来说,Core NATS 的好处就是速度很快,简单易用,而且对我来说因为是用 Golang 写的,有什么问题比较容易自己找到原因。
但因为有可能因为以消费者挂掉而掉讯息,所以适合需要大量、低延迟的场景 (比 Kafka 低很多),而且不担心漏讯息的场景 (或是能自行维护也行)。
NATS Streaming (STAN)
NATS Streaming,缩写为 STAN,与前述的 Core NATS 相比,最重要的就是新增了持久化的功能,可以说就是「有持久化功能的 NATS」。
具体的应用场景,大约有下列四种情况:
- 需要讯息的历史纪录 (需要 Replay data 的时候)
- Producer 和 Consumer 高度解偶,有可能不是同时在线
- Producer 和 Consumer 需要按照自己的节奏发送、或是接收资料
- 最后一条讯息对 Consumer 是必须的 (Producer 可能离线)
根据官方的说法,其实大部分用原始的 NATS 即可,如果要确保收到,可以透过前述的 Request / Reply 机制解决,官方相信自行在应用端管理,长久下来会比直接用 STAN 更加稳定。
(当然啦,身为苦逼的开发者,不一定都有机会可以能长远的看待问题就是了……)
独立的 STAN
虽然乍听起来, STAN 好像只是 NATS 多了持久化的功能而已,但其实两者几乎是完全不同的东西, STAN 有完全属于自己的概念,有自己独立的函式库,只是函式库内部使用 NATS 连线而已。
由于 STAN 只是将 NATS 当作连线工具使用,会用自己的方式将资讯做包装,如果你直接透过 NATS 来观察,你发现完全看不出什么鬼。
简单来说, STAN 与 NATS 是不同的东西。
好比说客户端在连上 STAN 需要指定使用的 Cluster ID,也要自行指定自己的 Client ID。
而这边的 Client ID 是专属于 NATS Streaming 的概念,并不是 NATS 的 Client ID (但因为是用 NATS 连线,所以同时仍然也会有 NATS 的 Client ID)。
范例 - STAN 连线
// 连线
stanConn, err := stan.Connect(
"test-cluster", // Cluster ID
"clientID", // 客户端自设的 Client ID
stan.NatsURL("nats://localhost:4222"),
stan.NatsOptions(
nats.Name("NATS 连线名称"),
),
)
if err != nil {
log.Fatal("连不上 STAN")
}
defer stanConn.Close()
此外, STAN 使用的是 Channel 而非 Subject,虽然看似相同,但实际却有差别。
NATS 原生的 Subject 可以支援 wildcard,我们可以在直接订阅 chicken.* ,那么无论是 chicken.a 又或是 chicken.b 也都能收到讯息,但 NATS Streaming 的 Channel 就不支援这么做。
Note
不知为何,虽然官网说 STAN 用的是 Channel 而非 Subject,但函式库的命名还是使用 subject
(来源: https://docs.nats.io/developing-with-nats-streaming/streaming)
而 STAN 使用的讯息也是不同的物件,一个是 nats.Msg 另一个是 stan.Msg 。
范例 - STAN 发送讯息
// 发送讯息
err = stanConn.Publish("channel", []byte("Hello world"))
if err != nil {
log.Fatal("送不出去")
}
范例 - STAN 接收讯息
// 接收讯息
_, err = stanConn.Subscribe("channel", func(msg *stan.Msg) { // 使用 stan.Msg
fmt.Println("收到了", string(msg.Data))
})
if err != nil {
log.Fatal("订阅失败", err)
}
Durable
因为 STAN 多了持久化的功能,所以消费者端这边就不用在生产者生产讯息的时候即时接收,只要讯息还存在 STAN 里,就可以自行选择任意时间和位置开始接收讯息。
但是如果消费者端每次都要随时自己记得自己收到哪里也很麻烦,所以 STAN 也多了 Durable 的概念。
STAN 本身会帮忙记录消费者收到哪里,如果消费者断线回复,STAN 会自动从断线的地方开始送。
消费者可以在订阅的时候指定 Durable 名称,STAN 会把消费者的 Client ID 和 Durable 当作 Key 记录当前接收到的位置。假若消费者因故断线重连,那么 STAN 就会根据 Client ID 和 Durable 判断从哪个位置开始发送。
由于 STAN 也支援前述的 Queue Groups 的功能,所以 STAN 的订阅其实有四种组合,分别为:
类型 | 说明 |
---|---|
Regular | 最基本的订阅模式,当应用关掉、取消订阅时,就会失去位置,下次订阅需要重新指定 |
Durable | 消费者断线时会保留位置,下次订阅还会从上次最后接收的位置开始 (不包含主动取消订阅) |
Queue | 多个消费者共享位置,但全部断线就会失去位置 |
Durable / Queue | 多个消费者共享位置,但即使全部断线也不会失去位置 (除非最后一个主动取消订阅) |
简单来说, Durable 就是保留位置,而 Queue 就是共用位置,两两相乘就是四种可能。
至于 Durable 和 Durable / Queue 的差别在于前者以 ClientID 和 Durable 为 Key 记录最后收到的讯息位置,而后者则是以 Queue 和 Durable 为 Key 来记录。所以对于前者来说,如果不同 ClientID ,就会各自当不同的订阅,而后者则会共用同一个。
Durable | Server 会维护一份订阅纪录 (ClientID + Durable 为 Key) 记录最后收到的讯息位置 |
Durable / Queue | Server 同样会维护一订阅纪录 (Queue + Durable 为 Key) 记录最后收到的讯息位置 (这种情况下 ClientID 不重要) |
(来源:https://github.com/nats-io/nats-streaming-server/issues/723#issuecomment-452361690)
「至少一次」交付模型
如果说 NATS 提供的是「最多一次」的交付模型,那么 STAN 就是「至少一次」的交付模型,因为多了持久化的功能,所以 STAN 可以保留之前的讯息,如果消费者端没收到就自动重送。
而为了确认消费者端有没有收到讯息,所以 STAN 也多了 Ack 的概念,让消费者端可以回报 STAN 说这个讯息处理成功了。如果 STAN 这端等太久没收到 Ack,就会认为消费者没有收到讯息而进行重送。
有时因为一些网路的原因,有可能会发生 STAN 认为消费者端没收到,但其实有的情况,好比说消费者的 Ack 太慢送,导致 STAN 发生 Timeout 认为没送到再送一次。一旦发生这种情况,相同的讯息就有可能会重送,所以实作上要设计成幂等的,系统要支持重复的讯息而不会发生错误才行。
我们可以自行选择使用自动 Ack 或是手动 Ack ,预设是自动 Ack,所以只要有正常收到,基本就当你成功了。
但我们通常不会把「收到讯息」就当作成功,而是把讯息当作一个「任务」,必须成功做完某件事才当作成功,不然就都算失败,需要重做。
所以实务上通常会建议用手动,这样才能确保自己能控制这个任务究竟是成功还是失败。
opts := []stan.SubscriptionOption {
stan.SetManualAckMode(), // 手动 Ack 模式
}
_, err = stanConn.Subscribe("channel", func(msg *stan.Msg) {
fmt.Println("收到了", string(msg.Data))
msg.Ack() // 手动 Ack
}, opts...)
if err != nil {
log.Fatal("订阅失败", err)
}
某种程度而言,其实 STAN 就是在 Core NATS 之上再做了 Request / Reply 的功能。 Ack 就是类似 Reply 的效果。而原始的 Core NATS 如果没收到 Reply,生产者端通常能做的就是重送,而 STAN 接手了这件事情,代替生产者端做同样的事情。
Note
订阅本身不影响 Channel 保留的内容, Ack 完的讯息也不会因此被删掉
STAN 的坑
听起来 STAN 似乎很美好,但实际使用时其实有很多坑,刚刚提到 STAN 其实是一个独立的服务,它有自己的术语,有自己的函式库,只是把 NATS 当作系统的底层。
我觉得概念本身没问题,但问题是 STAN 并没有完全把 NATS 隐藏起来,从之前的例子可知如果要调整一些设定,还是得引入 NATS 的函式库,我认为这不是好的设计。
前面说过,NATS 有 Client ID (由服务端分配),STAN 也有 Client ID (由客户端自行指定),STAN 没能做到完全隐藏 NATS 的 Client ID,所以就会让使用者感到困惑。
在连线的时候,如果要调整参数,还是得引入 NATS 的函式库,没办法只用 STAN 的函式库就好。
// 连线
stanConn, err := stan.Connect(
"test-cluster",
"clientID",
stan.NatsURL("nats://localhost:4222"),
stan.NatsOptions(
nats.Name("NATS 连线名称"), // 这项设定需要引入 nats 函式库
),
)
if err != nil {
log.Fatal("连不上 STAN")
}
defer stanConn.Close()
上述的问题可能影响不大,但由于 STAN 和 NATS 是各自独立的服务器,而且连结并没有想像中紧密,好比说 NATS 和 STAN 两者各自都有自己的断线判断,而最糟糕的是--两者判断可能不同。
有可能 NATS 认定断线,但 STAN 没有;又或是相反,STAN 认定断线,但 NATS 没有。这时就会碰到很大的麻烦,有可能会发生表面上 NATS 还在连线,但其实没有办法收到任何讯息的状况。
简单来说,就是它本身的断线重连机制根本没办法正常运作,无法做到用户无感知,必须自行处理,自行重新订阅才行。这件事一直在我写这篇文章时似乎都没有好的解法,我目前的做法就是只要侦测到断线,就直接整个重连(包含 NATS 和 STAN 的连线)。
STAN 小结
这边做个简单的小结,STAN 就是有持久化功能的 NATS,效能也相当不错,延迟同样比 Kafka 好,但因为最初设计的一些原因,所以也带来了许多的坑。
使用上其实没有太大的问题,除了……被开发团队放生以外?
JetStream
最后则是本篇的重头戏--JetStream。
它是开发团队用来取代 STAN 的新方案,所以也提供了 STAN 类似的功能,但功能更丰富也更强大,同时还修正了 STAN 碰到的问题。
这次的 JetStream 不再和 STAN 一样是独立的服务,而是 NATS 本身的子系统,第一个显而易见的好处不用再分别启动 NATS 和 STAN 不同的服务器,只要在 NATS 的服务器简单加了一个参数就可以用 JetStream 了,可以显著的减少维运的成本。
sudo docker run nats:2.6.1 -js # 加上 -js 即可支援 JetStream
在开发上,也不用再引入不同的函式库,直接使用 NATS 本身的函式库就好。如果要使用 JetStream,只要在 NATS 连线的基础上直接取得 JetStream 的 Context 即可,非常简单。
范例 - 取得 JetStream 的 Context
// 连线
natsConn, err := nats.Connect("nats://localhost:4222")
if err != nil {
log.Fatal("连不上 NATS")
}
defer natsConn.Close()
// 取得 JetStream 的 Context
js, err := natsConn.JetStream()
if err != nil {
log.Fatalf("取得 JetStream 的 Context 失败: %v", err)
}
同时 JetStream 也带来更多更强大的功能,比如可以更细致的调整讯息的保留方式,除了可以像 STAN 一样定义讯息的保留时间、大小、数量外,还可以进一步设定「如果没 Ack 过就永久保留」或是「没有任何订阅就删除」等更进阶的功能。
而且讯息接收方式除了能由 JetStream 主动推讯息外,还多了可以让消费者自行拉取讯息的模式。
使用上,JetStream 明确定义了两个新概念:
- Stream - 负责管理存储
- Consumer - 负责管理消费
Stream
Stream 定义了 NATS 讯息保留的规则,如果一条 NATS 讯息符合 Stream 设定的 Subject,就会被 JetStream 存下来。而 JetStream 就是透过管理 Stream 间接做持久化。
我们可以设定多个不同的 Stream,来对应多个 Subject,同时每个 Stream 也可以支援不同的存储规则,像是可以自行选择讯息保留的方式、丢弃的方式等。
而这一切都不需要预先定义,可以在程式运作的过程中动态产生。
范例 - 动态建立新的 Stream
// 建立 Stream
_, err = js.AddStream(&nats.StreamConfig{
Name: "Stream名称",
Subjects: []string{
"subject.*", // 支援 wildcard
},
Storage: nats.FileStorage, // 储存的方式 (预设 FileStorage)
Retention: nats.WorkQueuePolicy, // 保留的策略
Discard: nats.DiscardOld, // 丢弃的策略
// ...
})
if err != nil {
log.Fatalf("建立 Stream 失败: %v", err)
}
而代价则是开发者需要在程式里显式管理 Stream,无论是发送和接收,Subject 都必须确保有对应的 Stream 存在,不然就会报错。
所以开发者一开始第一个可能碰到的坑,就是想如同用 NATS 一样直接推送一则讯息,然后就会发现会因为没有对应的 Stream 而推送失败。
Note
补充: JetStream 和 NATS 相同,Subject 都能支援 wildcard,因为 JetStream 的讯息其实就是 NATS 的讯息,当然可以支援。
Consumer
Consumer 则是定义了消费者接收的规则,消费者在订阅某个 Subject 时,会自动产生对应的 Consumer。 Consumer 会包含相关的设定,同时还会维护一份纪录,记录消费者接收到的位置。
JetStream 同样也有 Durable 的概念,用法和 STAN 也基本相同,差别在于 JetStream 明确定义了 Consumer 的概念,所以对于 JetStream 来说,一个 Durable 就代表一个 Consumer。
具体来说就是如果消费者订阅的时候指定了 Durable Name,那么 JetStream 就会找寻同样名称的 Consumer,如果有,就直接从该 Consumer 记录的位置开始发送讯息,而不是从头开始。
Push / Pull Subscription
除此之外, JetStream 还多了 Push 和 Pull 的概念,过去 NATS 和 STAN 都是用 Push 的方式由 MQ 推送讯息给消费者,而 JetStream 则再多了 Pull 的方法,让消费者可以主动和 MQ 要讯息,更好的区分不同的用途。
简单来说 Push 的方式就是 JetStream 会不管三七二十一狂推,适合量少需要极低延迟的任务,比如说即时监控,Pull 的话就是由消费者主动拉讯息,适合当 Worker 使用。
两种方式比较,虽然 Push 会有更低的延迟,更快的速度,但如果对方收不到这么快也没用,还可能被当成 Slow consumer 而被踢掉,所以两种方式各有用途。
Note
要减少 Slow consumer 的问题,可以设定 RateLimit 或是直接用 Max Pending 来解决。
范例 - Push Subscription
_, err = js.Subscribe("subject", func(msg *nats.Msg) {
fmt.Println("收到了", string(msg.Data))
})
if err != nil {
log.Fatal("订阅失败", err)
}
用法和 NATS 的几乎一模一样,差别是改用 JetStream 的 Context 来操作 (此例为 js)。
范例 - Pull Subscription
sub, err := js.PullSubscribe("subject", "durable") // Pull 模式必须要用 Durable
if err != nil {
return xerrors.Errorf("订阅失败: %w", err)
}
for {
msgs, err := sub.Fetch(10) // 决定一次收几条
if err != nil {
return xerrors.Errorf("接收失败: %w", err)
}
<span class="k">for</span> <span class="nx">_</span><span class="p">,</span> <span class="nx">msg</span> <span class="o">:=</span> <span class="k">range</span> <span class="nx">msgs</span> <span class="p">{</span>
<span class="nx">fmt</span><span class="p">.</span><span class="nx">Println</span><span class="p">(</span><span class="s">"收到了"</span><span class="p">,</span> <span class="nb">string</span><span class="p">(</span><span class="nx">msg</span><span class="p">.</span><span class="nx">Data</span><span class="p">))</span>
<span class="nx">msg</span><span class="p">.</span><span class="nx">Ack</span><span class="p">()</span> <span class="c1">// 要手动 Ack</span>
<span class="p">}</span>
}
在 Pull 模式,使用差异比较大,消费者要自行主动拉资料,可以决定一次要拉几条,订阅的时候必须使用 durable,而且必须强制手动 Ack。
Ack
提到 Ack,JetStream 也带来了相比 STAN 更丰富的 Ack 机制,除了能回传代表成功的 Ack,也多了代表失败的 Nak 或是还没好的 Progress 等等。
原本在 STAN 中,如果讯息处理失败的时候,就只能让 STAN 等到 Timeout,才能判断失败。但现在 JetStream 可以让消费者主动回传 Nak,让服务器能更快知道该讯息处理失败了。
丰富的 Ack 机制
功能 | 简易说明 |
---|---|
AckAck | 搞好了 |
AckNak | 没搞成 |
AckProgress | 还在搞 |
AckNext | 先搞下一个 |
AckTerm | 这个我不搞 |
多样的 Ack 策略
策略 | 说明 |
---|---|
AckExplicit [预设] | 每个讯息都要 Ack (每个都要明确的说搞好了) |
AckNone | 不用 ack(不用说搞好了没) |
AckAll | 只需要 ack 最后一笔 (搞了这个,就当已经搞好之前所有的讯息) |
「保证一次」交付模型
JetStream 和 STAN 提供的都是「至少一次」的交付模型,但在限定条件下,它可以做到「保证一次」,来确保消费者不会收到重覆的讯息。
具体而言,JetStream 提供了两种机制来确保「发送端不会重送」并且「接收端不会重收」两件事,依此做到「保证一次」的效果。
要保证「发送端不会重送」,JetStream 的做法是让生产者可以为每一则讯息自行指定「讯息 ID」, JetStream 会负责确保同样的「讯息 ID」只会送一次。
简单来说,它会在送完一笔讯息后,在一定时间内无视之后传来所有相同「讯息 ID」的讯息,来达成不会重送的要求。
_, err = js.Publish("subject", []byte("Hello world"), nats.MsgId("讯息ID"))
if err != nil {
log.Fatal("送不出去")
}
之所以要确保「发送端不会重送」,是因为发送端有可能会因为网路原因,明明讯息有送成功了,但却没收到 MQ 回传的「收到讯息」以为自己没发送成功,而再送一次的状况。
如果没有让发送端自行指定讯息 ID,对于 JetStream 来说,它其实无法判断某一则讯息到底是不是重复的。因为即使是完全同样的讯息内容,在不同的业务中仍然可能代表不同的讯息,因此是否重复只有开发者才能决定。而 JetStream 的做法就是让生产者自行决定讯息的 ID,如果是一样的,就代表同一个讯息,反之则不是。
第二件事就是要确保「接收端不会重收」,这里似乎有许多不同的说法,但大概念都是类似,要由消费者端主动确认来解决。
举例来说,可以让消费者透过 AckSync 或是限制时间的 Ack 来向 MQ 确认是否已经有消费者已经收到讯息了。
_, err = js.Subscribe("subject", func(msg *nats.Msg) {
fmt.Println("收到了", string(msg.Data))
<span class="nx">err</span> <span class="o">:=</span> <span class="nx">msg</span><span class="p">.</span><span class="nx">AckSync</span><span class="p">()</span>
<span class="k">if</span> <span class="nx">err</span> <span class="o">!=</span> <span class="kc">nil</span> <span class="p">{</span>
<span class="nx">log</span><span class="p">.</span><span class="nx">Println</span><span class="p">(</span><span class="s">"Ack 没送成功或是这个讯息 Ack 过了"</span><span class="p">)</span>
<span class="p">}</span>
})
AckSync 就是用同步的方式 Ack,也就是让消费者端在 Ack 的时候能同步确认 MQ 有收到自己的 Ack。
之所以要这么做是因为 JetStream 有可能因为网路原因没收到消费者传来的 Ack 而以为自己没成功发送讯息而重送。
而这件事同样也只能由消费者端主动确认 MQ 是否有收到自己的 Ack,来确保 MQ 不会因为没收到消费者端的 Ack 而重送(即使重送了也可以判断出来)。
虽说 JetStream 号称可以做到「只有一次」,不过我觉得这样的代价似乎过大,每个 Ack 都要双重确认绝对会显著拖慢效能,感觉没有必要强求只有一次。
JetStream 的坑
说了这么多 JetStream 的好处,但 JetStream 其实也有很多问题,首先是它是全新的东西,因此可以想见稳定性自然是比较差的,甚至有些语言的实作到目前为止(2022/01/17)都还是 Beta 版。
同时也因为是新东西,所以文件也非常少,有时必须要直接去 github 查程式码才行。
而且更讨厌的是官网 NATS、NATS Streaming 和 JetStream 三套产品全部放在同一份文件里,而三个工具各有不同的概念,名词定义也有差异,却可能共用同样的名字,所以非常容易混淆。
偏偏 JetStream 也不能只看 JetStream 的文件,因为是共用 NATS 的函式库,所以许多概念还会延用,所以在查文件的时候就会觉得非常困扰。
另外虽说共用 NATS 函式库很方便,但产生的缺点就是不同工具的方法也混杂在一起,好比说 NATS 和 JetStream 的讯息都是共用 nats.Msg ,并没有明确的分隔,所以可能会写出令人困惑的程式码。
_, err := natsConn.Subscribe(subject, func(msg *nats.Msg) {
msg.Nak() // 这玩意儿是给 JetStream 用的,但也不会报错
})
最后一点则是 JetStream 本身的机制似乎也还没成熟,光是我刚刚提的「保证一次」这件机制就有不同的说法,而且也都不是很明确,我觉得这也是个很严重的问题。
JetStream 小结
同样做个小结, JetStream 相比 STAN 确实有许多优势。
好比说架设方便,由于 JetStream 是 NATS Server 的子系统,加参数就可以使用,不用付出维护两套服务器的成本。
而且 JetStream 不但功能比 STAN 更丰富更强大,而且效能还更好(这个后面会提)。
并且还因为是直接使用 NATS 本身的函式库,使用上也比 STAN 简单,几乎在各方面都能辗压 STAN。
但缺点是因为是新东西,稳定性可能比较差,同时有些语言的实作还是 beta 版,所以文件也很少,有时必须得直接看程式码。
我流简易评测
这里我简单做一个效能评测,因为现在 JetStream 还是很新的东西,可能不稳定,比较没有参考价值,所以我只挑选几项我个人比较在意的项目做比较。
测试环境
三台 Server
- 8 Core CPU
- 32GB ram
发布相关效能比较
从下图可以很明显的看出 JetStream 的发布效能比较好。
同时接收效能也是 JetStream 比较佳,同时也有更好的延迟表现。
最后再附上 JetStream 的各种接收方式的效能比较,可以看出 Subscribe 和 Chan Subscribe 差不多,而 Pull Subscribe 则较差,但是会随著一次取得越多而越快。
结语
一个工具,有好有坏,适合自己公司的需求才是最重要的,虽然我个人研究了半天,但基于各种原因,我们最后还是没用 JetStream。因此这边单纯只是做个简单的纪录,分享给有需要的人。
以上。
参考资料
- NATS Docs
- nats-io/nats.go: Golang client for NATS, the cloud native messaging system.
- NATS-Server(JetStream)和NATS Streaming Server对比
- 基于NATS JetStream构建分布式事件流系统