跳转到主要内容

Stream 简介

Stream(流)是 KeyDB 5.0 引入的一种新数据类型,它以一种更抽象的方式模拟了*日志数据结构*。然而,日志的本质仍然保留:就像一个通常以仅追加模式打开的日志文件一样,KeyDB Stream 主要是一种仅追加的数据结构。至少在概念上是这样,因为作为一种在内存中表示的抽象数据类型,KeyDB Stream 实现了强大的操作来克服日志文件的限制。

尽管 Stream 自身的数据结构相当简单,但它却是 KeyDB 中最复杂的数据类型,原因是它实现了一些额外的、非强制性的功能:一组阻塞操作,允许消费者等待生产者向流中添加新数据;此外,还有一个名为**消费者组 (Consumer Groups)** 的概念。

消费者组最初由流行的消息系统 Kafka (TM) 引入。KeyDB 以完全不同的方式重新实现了类似的想法,但目标是相同的:允许一组客户端协作消费同一个消息流的不同部分。

Stream 基础#

为了理解 KeyDB Stream 是什么以及如何使用它,我们将忽略所有高级功能,而专注于数据结构本身,即用于操作和访问它的命令。这基本上是大多数其他 KeyDB 数据类型(如列表、集合、有序集合等)所共有的部分。然而,请注意,列表也有一个可选的、更复杂的阻塞 API,由 **BLPOP** 及类似命令提供。所以在这方面,Stream 与列表没有太大区别,只是其附加的 API 更复杂、更强大。

由于 Stream 是一种仅追加的数据结构,其基本的写入命令 **XADD** 会将一个新条目追加到指定的流中。一个流条目不仅仅是一个字符串,而是由一个或多个字段-值对组成。这样,流的每个条目都已经结构化了,就像一个以 CSV 格式编写的仅追加文件,每行都包含多个分隔的字段。

> XADD mystream * sensor-id 1234 temperature 19.8
1518951480106-0

上述对 **XADD** 命令的调用,将一个条目 `sensor-id: 1234, temperature: 19.8` 添加到键为 `mystream` 的流中,并使用一个自动生成的条目 ID,该 ID 即为命令的返回值,具体为 `1518951480106-0`。它的第一个参数是键名 `mystream`,第二个参数是条目 ID,用于标识流中的每个条目。然而,在这个例子中,我们传递了 `*`,因为我们希望服务器为我们生成一个新的 ID。每个新 ID 都是单调递增的,简单来说,每个新添加的条目都会有一个比所有过去条目都大的 ID。由服务器自动生成 ID 几乎总是你想要的方式,而明确指定 ID 的原因非常罕见。我们稍后会详细讨论这一点。每个 Stream 条目都有一个 ID,这一事实与日志文件有另一个相似之处,在日志文件中,行号或文件内的字节偏移量可用于标识给定条目。回到我们的 **XADD** 示例,在键名和 ID 之后,接下来的参数是构成我们流条目的字段-值对。

可以使用 **XLEN** 命令获取 Stream 中的项目数量:

> XLEN mystream
(integer) 1

条目 ID (Entry IDs)#

**XADD** 命令返回的条目 ID,用于唯一标识给定流中的每个条目,由两部分组成:

<毫秒时间戳>-<序列号>

毫秒时间戳部分实际上是生成流 ID 的 KeyDB 本地节点上的本地时间。但是,如果当前毫秒时间恰好小于前一个条目的时间,则会使用前一个条目的时间,因此即使时钟向后跳,单调递增的 ID 属性仍然成立。序列号用于在同一毫秒内创建的条目。由于序列号是 64 位宽的,所以在同一毫秒内可以生成的条目数量实际上没有限制。

这种 ID 格式乍一看可能有些奇怪,读者可能会想知道为什么时间是 ID 的一部分。原因是 KeyDB Stream 支持按 ID 进行范围查询。因为 ID 与条目生成的时间相关,这使得按时间范围查询基本是免费的。我们很快将在介绍 **XRANGE** 命令时看到这一点。

如果由于某种原因,用户需要与时间无关、而是与另一个外部系统 ID 关联的增量 ID,如前所述,**XADD** 命令可以接受一个明确的 ID,而不是触发自动生成的 `*` 通配符 ID,如下例所示:

> XADD somestream 0-1 field value
0-1
> XADD somestream 0-2 foo bar
0-2

请注意,在这种情况下,最小 ID 是 0-1,并且该命令不接受等于或小于前一个 ID 的 ID:

> XADD somestream 0-1 foo bar
(error) ERR The ID specified in XADD is equal or smaller than the target stream top item (错误:XADD 中指定的 ID 等于或小于目标流顶部项目的 ID)

从 Stream 中获取数据#

现在我们终于能通过 **XADD** 向流中追加条目了。然而,虽然向流中追加数据很直观,但查询流以提取数据的方式却不那么明显。如果我们继续使用日志文件的类比,一个显而易见的方式是模仿我们通常使用 Unix 命令 `tail -f` 所做的事情,也就是说,我们可以开始监听以获取追加到流中的新消息。请注意,与 KeyDB 的阻塞列表操作不同(在这些操作中,一个给定的元素将到达一个在*弹出式*操作如 **BLPOP** 中阻塞的单个客户端),对于流,我们希望多个消费者能看到追加到流中的新消息(就像多个 `tail -f` 进程可以看到添加到日志中的内容一样)。用传统术语来说,我们希望流能够将消息*扇出 (fan out)*给多个客户端。

然而,这只是一种潜在的访问模式。我们也可以用完全不同的方式看待流:不是作为一个消息系统,而是作为一个*时间序列存储*。在这种情况下,获取新追加的消息可能也很有用,但另一种自然的查询模式是按时间范围获取消息,或者使用游标迭代消息以增量地检查所有历史记录。这绝对是另一种有用的访问模式。

最后,如果我们从消费者的角度看待流,我们可能希望以另一种方式访问流,即作为一个可以被分区给多个处理这些消息的消费者的消息流,这样消费者组只能看到到达单个流中的消息子集。通过这种方式,可以在不同的消费者之间扩展消息处理,而单个消费者不必处理所有消息:每个消费者只会得到不同的消息来处理。这基本上就是 Kafka (TM) 通过消费者组所做的事情。通过消费者组读取消息是 KeyDB Stream 的另一种有趣的读取模式。

KeyDB Stream 通过不同的命令支持上述所有三种查询模式。接下来的部分将展示所有这些模式,从最简单、最直接的范围查询开始。

按范围查询:XRANGE 和 XREVRANGE#

要按范围查询流,我们只需要指定两个 ID,*start* 和 *end*。返回的范围将包括 ID 为 start 或 end 的元素,因此范围是包含性的。两个特殊的 ID `-` 和 `+` 分别表示最小和最大的可能 ID。

> XRANGE mystream - +
1) 1) 1518951480106-0
2) 1) "sensor-id"
2) "1234"
3) "temperature"
4) "19.8"
2) 1) 1518951482479-0
2) 1) "sensor-id"
2) "9999"
3) "temperature"
4) "18.2"

返回的每个条目都是一个包含两项的数组:ID 和字段-值对列表。我们已经说过条目 ID 与时间有关,因为 `-` 字符左边的部分是创建流条目的本地节点上的 Unix 毫秒时间(但请注意,流是通过完全指定的 **XADD** 命令复制的,因此副本的 ID 将与主节点相同)。这意味着我可以使用 **XRANGE** 查询一个时间范围。然而,为此,我可能想要省略 ID 的序列号部分:如果省略,范围的起始部分将被假定为 0,而结束部分将被假定为可用的最大序列号。这样,仅使用两个毫秒级的 Unix 时间进行查询,我们就能以包含性的方式获得在该时间范围内生成的所有条目。例如,如果我想查询一个两毫秒的周期,我可以使用:

> XRANGE mystream 1518951480106 1518951480107
1) 1) 1518951480106-0
2) 1) "sensor-id"
2) "1234"
3) "temperature"
4) "19.8"

在这个范围内我只有一个条目,但在真实的数据集中,我可以查询数小时的范围,或者在短短两毫秒内就可能有很多项目,返回的结果可能会非常大。因此,**XRANGE** 在末尾支持一个可选的 **COUNT** 选项。通过指定一个 count,我可以只获取前 *N* 个项目。如果我想要更多,我可以获取返回的最后一个 ID,将其序列号部分加一,然后再次查询。让我们在下面的例子中看看这个过程。我们首先用 **XADD** 添加 10 个项目(我不会展示这个过程,我们假设 `mystream` 流中已经填充了 10 个项目)。为了开始我的迭代,每个命令获取 2 个项目,我从完整的范围开始,但 count 为 2。

> XRANGE mystream - + COUNT 2
1) 1) 1519073278252-0
2) 1) "foo"
2) "value_1"
2) 1) 1519073279157-0
2) 1) "foo"
2) "value_2"

为了继续用接下来的两个项目进行迭代,我需要取回返回的最后一个 ID,即 `1519073279157-0`,并在其前面加上前缀 `(`。由此产生的开区间,即本例中的 `(1519073279157-0`,现在可以用作下一次 **XRANGE** 调用的新的 *start* 参数:

> XRANGE mystream (1519073279157-0 + COUNT 2
1) 1) 1519073280281-0
2) 1) "foo"
2) "value_3"
2) 1) 1519073281432-0
2) 1) "foo"
2) "value_4"

依此类推。由于 **XRANGE** 的复杂度是 *O(log(N))* 用于查找,然后是 *O(M)* 用于返回 M 个元素,所以使用小的 count 时,命令具有对数时间复杂度,这意味着迭代的每一步都很快。因此,**XRANGE** 也是事实上的*流迭代器*,并且不需要 **XSCAN** 命令。

命令 **XREVRANGE** 等同于 **XRANGE**,但它按相反的顺序返回元素,因此 **XREVRANGE** 的一个实际用途是检查 Stream 中的最后一个项目是什么:

> XREVRANGE mystream + - COUNT 1
1) 1) 1519073287312-0
2) 1) "foo"
2) "value_10"

注意 **XREVRANGE** 命令的 *start* 和 *stop* 参数是反向的。

使用 XREAD 监听新项目#

当我们不想按范围访问流中的项目时,通常我们想要的是*订阅*流中到达的新项目。这个概念可能看起来与 KeyDB 的发布/订阅 (Pub/Sub) 有关,即你订阅一个频道,或者与 KeyDB 的阻塞列表有关,即你等待一个键获得新元素来获取,但消费流的方式有根本的不同:

  1. 一个流可以有多个客户端(消费者)等待数据。默认情况下,每个新项目将被传递给*每个*在给定流中等待数据的消费者。这种行为与阻塞列表不同,在阻塞列表中每个消费者会得到一个不同的元素。然而,能够*扇出*给多个消费者的能力类似于发布/订阅。
  2. 在发布/订阅中,消息是*即发即忘 (fire and forget)*的,绝不会被存储;而在使用阻塞列表时,当客户端收到消息后,它会从列表中*弹出*(有效移除),但流的工作方式完全不同。所有消息都无限期地追加到流中(除非用户明确要求删除条目):不同的消费者通过记住收到的最后一条消息的 ID 来知道什么是新消息。
  3. 流消费者组提供了发布/订阅或阻塞列表无法达到的控制水平,包括为同一个流设置不同的组、对已处理项目的显式确认、检查待处理项目的能力、声明未处理消息,以及为每个客户端提供一致的历史可见性,即每个客户端只能看到其私有的过去消息历史。

提供监听流中新消息到达能力的命令叫做 **XREAD**。它比 **XRANGE** 稍微复杂一些,所以我们先展示简单的形式,稍后将提供完整的命令布局。

> XREAD COUNT 2 STREAMS mystream 0
1) 1) "mystream"
2) 1) 1) 1519073278252-0
2) 1) "foo"
2) "value_1"
2) 1) 1519073279157-0
2) 1) "foo"
2) "value_2"

上面是 **XREAD** 的非阻塞形式。注意 **COUNT** 选项不是强制的,实际上该命令唯一强制的选项是 **STREAMS** 选项,它指定了一个键列表以及调用消费者对每个流已经见过的相应最大 ID,这样命令只会向客户端提供 ID 大于我们指定 ID 的消息。

在上面的命令中,我们写了 `STREAMS mystream 0`,所以我们想要流 `mystream` 中所有 ID 大于 `0-0` 的消息。如上例所示,该命令返回了键名,因为实际上可以用多个键调用此命令,以同时从不同的流中读取。例如,我可以写:`STREAMS mystream otherstream 0 0`。注意在 **STREAMS** 选项之后,我们需要提供键名,然后是 ID。因此,**STREAMS** 选项必须始终是最后一个。

除了 **XREAD** 可以一次访问多个流,以及我们能够指定自己拥有的最后一个 ID 来只获取更新的消息之外,在这种简单的形式下,这个命令与 **XRANGE** 并没有太大的不同。然而,有趣的是,我们可以通过指定 **BLOCK** 参数,轻松地将 **XREAD** 变成一个*阻塞命令*:

> XREAD BLOCK 0 STREAMS mystream $

请注意,在上面的例子中,除了移除 **COUNT**,我还指定了新的 **BLOCK** 选项,超时时间为 0 毫秒(意味着永不超时)。此外,我没有为 `mystream` 流传递一个普通的 ID,而是传递了特殊 ID `$`。这个特殊 ID 意味着 **XREAD** 应该使用 `mystream` 流中已存储的最大 ID 作为最后一个 ID,这样我们将只接收*新*消息,从我们开始监听时算起。这在某种程度上类似于 Unix 的 `tail -f` 命令。

请注意,当使用 **BLOCK** 选项时,我们不必使用特殊 ID `$`。我们可以使用任何有效的 ID。如果命令能够立即服务我们的请求而无需阻塞,它就会这样做,否则它将阻塞。通常,如果我们想从新条目开始消费流,我们会以 ID `$` 开始,之后我们继续使用收到的最后一条消息的 ID 进行下一次调用,依此类推。

阻塞形式的 **XREAD** 也能够监听多个流,只需指定多个键名即可。如果请求可以同步处理,因为至少有一个流的元素大于我们指定的相应 ID,它就会带着结果返回。否则,该命令将阻塞,并返回第一个获得新数据的流的项目(根据指定的 ID)。

与阻塞列表操作类似,阻塞流读取从等待数据的客户端的角度来看是*公平的*,因为语义是 FIFO(先进先出)风格。第一个为给定流阻塞的客户端将在新项目可用时第一个被解除阻塞。

**XREAD** 除了 **COUNT** 和 **BLOCK** 之外没有其他选项,所以它是一个非常基础的命令,其特定目的是将消费者附加到一个或多个流上。使用消费者组 API 可以获得更强大的消费流的功能,但是通过消费者组读取是由一个名为 **XREADGROUP** 的不同命令实现的,本指南的下一节将对此进行介绍。

消费者组#

当任务是从不同的客户端消费同一个流时,**XREAD** 已经提供了一种向 N 个客户端*扇出 (fan-out)*的方法,还可能使用副本以提供更多的读取可伸缩性。然而,在某些问题中,我们想做的不是向许多客户端提供相同的消息流,而是向许多客户端提供来自同一流的*不同消息子集*。一个明显的有用案例是处理速度慢的消息:让 N 个不同的工作者接收流的不同部分的能力,使我们能够通过将不同的消息路由到准备好做更多工作的不同工作者来扩展消息处理。

实际上,如果我们想象有三个消费者 C1、C2、C3,以及一个包含消息 1、2、3、4、5、6、7 的流,那么我们希望按照以下图示来分发消息:

1 -> C1
2 -> C2
3 -> C3
4 -> C1
5 -> C2
6 -> C3
7 -> C1

为了实现这一点,KeyDB 使用了一个叫做*消费者组*的概念。非常重要的一点是,要理解 KeyDB 消费者组从实现的角度来看,与 Kafka (TM) 消费者组没有任何关系。然而,它们在功能上是相似的,所以我决定保留 Kafka (TM) 的术语,因为它最初普及了这个想法。

一个消费者组就像一个从流中获取数据的*伪消费者*,它实际上服务于多个消费者,并提供某些保证:

  1. 每条消息都只分发给一个消费者,因此同一条消息不可能被传递给多个消费者。
  2. 消费者在一个消费者组内通过一个名称来标识,这个名称是一个区分大小写的字符串,实现消费者的客户端必须选择。这意味着即使在断开连接后,流消费者组也会保留所有状态,因为客户端会再次声明自己是同一个消费者。然而,这也意味着客户端有责任提供一个唯一的标识符。
  3. 每个消费者组都有一个*从未被消费的第一个 ID* 的概念,因此,当一个消费者请求新消息时,它可以只提供之前未被分发过的消息。
  4. 然而,消费一条消息需要使用一个特定的命令进行显式确认。KeyDB 将确认解释为:这条消息已正确处理,可以从消费者组中移除。
  5. 一个消费者组会跟踪所有当前待处理的消息,即那些已经分发给消费者组中的某个消费者,但尚未被确认为已处理的消息。由于这个功能,当访问流的消息历史时,每个消费者*只会看到分发给它的消息*。

在某种程度上,一个消费者组可以被想象成关于一个流的*一些状态*:

+----------------------------------------+
| consumer_group_name: mygroup |
| consumer_group_stream: somekey |
| last_delivered_id: 1292309234234-92 |
| |
| consumers: |
| "consumer-1" 带有待处理消息 |
| 1292309234234-4 |
| 1292309234232-8 |
| "consumer-42" 带有待处理消息 |
| ... (依此类推) |
+----------------------------------------+

如果你从这个角度看,就很容易理解消费者组能做什么,它如何能只向消费者提供他们待处理消息的历史记录,以及请求新消息的消费者如何只会被分发 ID 大于 `last_delivered_id` 的消息。同时,如果你将消费者组看作是 KeyDB 流的辅助数据结构,很明显一个流可以有多个消费者组,它们有不同的消费者集合。实际上,同一个流甚至可以让客户端通过 **XREAD** 在没有消费者组的情况下读取,同时也有客户端通过 **XREADGROUP** 在不同的消费者组中读取。

现在是时候深入了解消费者组的基本命令了。它们如下:

  • **XGROUP** 用于创建、销毁和管理消费者组。
  • **XREADGROUP** 用于通过消费者组从流中读取。
  • **XACK** 是允许消费者将待处理消息标记为已正确处理的命令。

创建消费者组#

假设我有一个已存在的类型为 stream 的键 `mystream`,为了创建一个消费者组,我只需要做以下操作:

> XGROUP CREATE mystream mygroup $
OK

如上命令所示,创建消费者组时,我们必须指定一个 ID,在示例中是 `$`。这是必需的,因为消费者组除了其他状态外,还需要知道在第一个消费者连接时接下来要分发哪条消息,也就是说,在组刚创建时的*最后一条消息 ID*是什么。如果我们像刚才那样提供 `$`,那么从现在起只有新到达流中的消息才会被提供给组内的消费者。如果我们指定 `0`,那么消费者组将从流历史中的*所有*消息开始消费。当然,你也可以指定任何其他有效的 ID。你所知道的是,消费者组将开始分发大于你指定 ID 的消息。因为 `$` 意味着流中当前最大的 ID,所以指定 `$` 的效果是只消费新消息。

`XGROUP CREATE` 也支持在流不存在时自动创建它,方法是使用可选的 `MKSTREAM` 子命令作为最后一个参数:

> XGROUP CREATE newstream mygroup $ MKSTREAM
OK

现在消费者组已经创建,我们可以立即尝试使用 **XREADGROUP** 命令通过消费者组读取消息。我们将从名为 Alice 和 Bob 的消费者处读取,以观察系统如何向 Alice 或 Bob 返回不同的消息。

**XREADGROUP** 与 **XREAD** 非常相似,并提供相同的 **BLOCK** 选项,否则它是一个同步命令。但是,有一个*强制*选项必须始终指定,那就是 **GROUP**,它有两个参数:消费者组的名称,以及尝试读取的消费者的名称。**COUNT** 选项也受支持,并且与 **XREAD** 中的相同。

在从流中读取之前,让我们先在里面放一些消息:

> XADD mystream * message apple
1526569495631-0
> XADD mystream * message orange
1526569498055-0
> XADD mystream * message strawberry
1526569506935-0
> XADD mystream * message apricot
1526569535168-0
> XADD mystream * message banana
1526569544280-0

注意:*这里的 message 是字段名,水果是相关联的值,请记住流项目是小字典。*

现在是时候尝试使用消费者组读取一些东西了:

> XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream >
1) 1) "mystream"
2) 1) 1) 1526569495631-0
2) 1) "message"
2) "apple"

**XREADGROUP** 的回复就像 **XREAD** 的回复一样。但请注意上面提供的 `GROUP `。它声明了我想使用消费者组 `mygroup` 从流中读取,并且我是消费者 `Alice`。每当消费者使用消费者组执行操作时,它都必须指定其名称,以唯一地标识该组内的这个消费者。

在上面的命令行中还有一个非常重要的细节,在强制性的 **STREAMS** 选项之后,为 `mystream` 键请求的 ID 是特殊 ID `>`。这个特殊 ID 仅在消费者组的上下文中有效,它意味着:**迄今为止从未传递给其他消费者的消息**。

这几乎总是你想要的,但是也可以指定一个真实的 ID,比如 `0` 或任何其他有效的 ID。然而,在这种情况下,我们实际上是请求 **XREADGROUP** 只向我们提供**待处理消息的历史记录**,并且在这种情况下,将永远不会看到组中的新消息。所以基本上 **XREADGROUP** 根据我们指定的 ID 有以下行为:

  • 如果 ID 是特殊 ID `>`,那么该命令将只返回迄今为止从未分发给其他消费者的新消息,并且作为副作用,会更新消费者组的*最后 ID*。
  • 如果 ID 是任何其他有效的数字 ID,那么该命令将让我们访问我们的*待处理消息历史记录*。也就是说,那些已经分发给这个指定消费者(由提供的名称标识)但至今未用 **XACK** 确认的消息集合。

我们可以立即测试这种行为,指定一个 ID 为 0,不带任何 **COUNT** 选项:我们只会看到唯一一条待处理的消息,即关于苹果的那条:

> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
2) 1) 1) 1526569495631-0
2) 1) "message"
2) "apple"

但是,如果我们确认消息已处理,它将不再是待处理消息历史的一部分,所以系统将不再报告任何内容:

> XACK mystream mygroup 1526569495631-0
(integer) 1
> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
2) (empty list or set) (空列表或集合)

如果你还不知道 **XACK** 如何工作,别担心,其思想就是已处理的消息不再是我们可访问的历史的一部分。

现在轮到 Bob 读点东西了:

> XREADGROUP GROUP mygroup Bob COUNT 2 STREAMS mystream >
1) 1) "mystream"
2) 1) 1) 1526569498055-0
2) 1) "message"
2) "orange"
2) 1) 1526569506935-0
2) 1) "message"
2) "strawberry"

Bob 请求最多两条消息,并且通过同一个组 `mygroup` 读取。所以发生的情况是 KeyDB 只报告*新*消息。如你所见,“apple” 消息没有被分发,因为它已经被分发给了 Alice,所以 Bob 得到了 orange 和 strawberry,依此类推。

通过这种方式,Alice、Bob 和组中的任何其他消费者都能从同一个流中读取不同的消息,读取他们待处理消息的历史,或者将消息标记为已处理。这允许为从流中消费消息创建不同的拓扑和语义。

有几点需要记住:

  • 消费者在第一次被提及(使用)时会自动创建,无需显式创建。
  • 即使使用 **XREADGROUP**,你也可以同时从多个键读取,但要实现这一点,你需要在每个流中创建同名的消费者组。这并不常见,但值得一提的是,这个功能在技术上是可用的。
  • **XREADGROUP** 是一个*写命令*,因为即使它从流中读取,消费者组也会作为读取的副作用被修改,所以它只能在主实例上调用。

一个使用消费者组的消费者实现示例,用 Ruby 语言编写,可能如下所示。这段 Ruby 代码旨在让任何有经验的程序员都能读懂,即使他们不懂 Ruby:

require 'redis'
if ARGV.length == 0
puts "Please specify a consumer name" (请输入消费者名称)
exit 1
end
ConsumerName = ARGV[0]
GroupName = "mygroup"
r = redis.new
def process_message(id,msg)
puts "[#{ConsumerName}] #{id} = #{msg.inspect}"
end
$lastid = '0-0'
puts "Consumer #{ConsumerName} starting..." (消费者 #{ConsumerName} 正在启动...)
check_backlog = true
while true
# 根据迭代选择ID:第一次我们想要
# 读取我们的待处理消息,以防我们崩溃后正在恢复。
# 一旦我们消费完我们的历史记录,我们就可以开始获取新消息。
if check_backlog
myid = $lastid
else
myid = '>'
end
items = r.xreadgroup('GROUP',GroupName,ConsumerName,'BLOCK','2000','COUNT','10','STREAMS',:my_stream_key,myid)
if items == nil
puts "Timeout!" (超时!)
next
end
# 如果我们收到一个空的回复,这意味着我们正在消费我们的历史记录
# 并且历史记录现在是空的。让我们开始消费新消息。
check_backlog = false if items[0][1].length == 0
items[0][1].each{|i|
id,fields = i
# 处理消息
process_message(id,fields)
# 确认消息已处理
r.xack(:my_stream_key,GroupName,id)
$lastid = id
}
end

如你所见,这里的想法是首先消费历史记录,也就是我们待处理消息的列表。这很有用,因为消费者可能之前崩溃过,所以在重启时我们想重新读取那些已经分发给我们但没有被确认的消息。请注意,我们可能会多次或一次处理一条消息(至少在消费者故障的情况下是这样,但也涉及到 KeyDB 持久化和复制的限制,请参阅关于此主题的特定部分)。

一旦历史记录被消费完毕,并且我们收到了一个空的消息列表,我们就可以切换到使用 `>` 特殊 ID 来消费新消息。

从永久性故障中恢复#

上面的例子允许我们编写参与同一个消费者组的消费者,每个消费者都处理一部分消息,并在从故障中恢复时重新读取只分发给它们自己的待处理消息。然而,在现实世界中,消费者可能会永久性失败并且永远无法恢复。那些因任何原因停止后永不恢复的消费者的待处理消息会怎么样呢?

KeyDB 消费者组提供了一个在这种情况下使用的功能,用于*声明 (claim)* 一个给定消费者的待处理消息,这样这些消息的所有权就会改变,并被重新分配给一个不同的消费者。这个功能非常明确。一个消费者必须检查待处理消息的列表,并且必须使用一个特殊的命令来声明特定的消息,否则服务器将让这些消息永远待处理并分配给旧的消费者。通过这种方式,不同的应用程序可以选择是否使用这个功能,以及具体如何使用它。

这个过程的第一步只是一个提供消费者组中待处理条目可观察性的命令,叫做 **XPENDING**。这是一个只读命令,调用它总是安全的,不会改变任何消息的所有权。在其最简单的形式中,该命令用两个参数调用,即流的名称和消费者组的名称。

> XPENDING mystream mygroup
1) (integer) 2
2) 1526569498055-0
3) 1526569506935-0
4) 1) 1) "Bob"
2) "2"

当以这种方式调用时,命令会输出消费者组中待处理消息的总数(本例中为两个),待处理消息中的最低和最高消息 ID,以及最后是一个消费者列表和他们拥有的待处理消息数量。我们只有 Bob 有两条待处理消息,因为 Alice 请求的那条消息已经用 **XACK** 确认了。

我们可以通过给 **XPENDING** 更多参数来请求更多信息,因为完整的命令签名如下:

XPENDING <key> <groupname> [<start-id> <end-id> <count> [<consumer-name>]]

通过提供一个开始和结束 ID(可以像在 **XRANGE** 中一样是 `-` 和 `+`)以及一个 count 来控制命令返回的信息量,我们能够了解更多关于待处理消息的信息。可选的最后一个参数,即消费者名称,用于如果我们想将输出限制为只针对某个特定消费者待处理的消息,但在下面的例子中我们不会使用这个功能。

> XPENDING mystream mygroup - + 10
1) 1) 1526569498055-0
2) "Bob"
3) (integer) 74170458
4) (integer) 1
2) 1) 1526569506935-0
2) "Bob"
3) (integer) 74170458
4) (integer) 1

现在我们有了每条消息的详细信息:ID、消费者名称、以毫秒为单位的*空闲时间*(即自上次将消息分发给某个消费者以来经过了多少毫秒),以及最后是给定消息被分发的次数。我们有来自 Bob 的两条消息,它们已经空闲了 74170458 毫秒,大约 20 小时。

请注意,没有人阻止我们通过使用 **XRANGE** 来检查第一条消息的内容是什么。

> XRANGE mystream 1526569498055-0 1526569498055-0
1) 1) 1526569498055-0
2) 1) "message"
2) "orange"

我们只需要在参数中重复两次相同的ID。现在我们有了一些想法,Alice 可能会决定,在 20 小时不处理消息之后,Bob 可能无法及时恢复,是时候*声明*这些消息并代替 Bob 继续处理了。为此,我们使用 **XCLAIM** 命令。

这个命令在其完整形式下非常复杂且充满了选项,因为它用于复制消费者组的变更,但我们只使用我们通常需要的参数。在这种情况下,它很简单:

XCLAIM <key> <group> <consumer> <min-idle-time> <ID-1> <ID-2> ... <ID-N>

基本上我们说,对于这个特定的键和组,我希望指定的消息 ID 改变所有权,并被分配给指定的消费者名称 ``。然而,我们还提供了一个最小空闲时间,这样只有当提到的消息的空闲时间大于指定的空闲时间时,操作才会成功。这很有用,因为可能有两个客户端同时试图声明一条消息:

客户端 1: XCLAIM mystream mygroup Alice 3600000 1526569498055-0
客户端 2: XCLAIM mystream mygroup Lora 3600000 1526569498055-0

然而,作为副作用,声明一条消息会重置其空闲时间并增加其分发次数计数器,因此第二个客户端声明它时会失败。通过这种方式,我们避免了对消息的简单重复处理(尽管在一般情况下你无法获得精确一次的处理)。

这是命令执行的结果:

> XCLAIM mystream mygroup Alice 3600000 1526569498055-0
1) 1) 1526569498055-0
2) 1) "message"
2) "orange"

该消息已成功被 Alice 声明,她现在可以处理并确认该消息,即使原始消费者没有恢复,也能推动事务前进。

从上面的例子可以清楚地看出,作为成功声明给定消息的副作用,**XCLAIM** 命令也会返回它。然而,这不是强制性的。可以使用 **JUSTID** 选项来只返回成功声明的消息的 ID。如果你想减少客户端和服务器之间使用的带宽(以及命令的性能),并且你对消息不感兴趣,因为你的消费者实现方式是会不时地重新扫描待处理消息的历史记录,那么这会很有用。

声明也可以由一个单独的进程来实现:一个只检查待处理消息列表,并将空闲消息分配给看起来活跃的消费者的进程。活跃的消费者可以通过 KeyDB 流的可观察性功能之一来获取。这是下一节的主题。

声明与分发计数器#

你在 **XPENDING** 输出中观察到的计数器是每条消息的分发次数。该计数器通过两种方式递增:当通过 **XCLAIM** 成功声明一条消息时,或者当使用 **XREADGROUP** 调用访问待处理消息的历史记录时。

当出现故障时,消息被多次分发是正常的,但最终它们通常会被处理并确认。然而,处理某个特定消息可能会出现问题,因为它已损坏或其构造方式触发了处理代码中的一个错误。在这种情况下,消费者将不断地处理这条特定消息失败。因为我们有分发尝试的计数器,我们可以使用该计数器来检测由于某种原因无法处理的消息。因此,一旦分发计数器达到你选择的某个较大数值,将此类消息放入另一个流并向系统管理员发送通知可能是更明智的做法。这基本上是 KeyDB Stream 实现*死信 (dead letter)*概念的方式。

流的可观察性#

缺乏可观察性的消息系统很难使用。不知道谁在消费消息,哪些消息待处理,特定流中有哪些活跃的消费者组,这些都使一切变得不透明。因此,KeyDB 流和消费者组有不同的方式来观察正在发生的事情。我们已经介绍了 **XPENDING**,它允许我们检查在特定时刻正在处理的消息列表,以及它们的空闲时间和分发次数。

然而,我们可能想做的更多,**XINFO** 命令是一个可观察性接口,可以与子命令一起使用,以获取有关流或消费者组的信息。

该命令使用子命令来显示有关流及其消费者组状态的不同信息。例如,**XINFO STREAM [key]** 报告有关流本身的信息。

> XINFO STREAM mystream
1) length (长度)
2) (integer) 13
3) radix-tree-keys (基数树键)
4) (integer) 1
5) radix-tree-nodes (基数树节点)
6) (integer) 2
7) groups (组)
8) (integer) 2
9) first-entry (首个条目)
10) 1) 1526569495631-0
2) 1) "message"
2) "apple"
11) last-entry (末个条目)
12) 1) 1526569544280-0
2) 1) "message"
2) "banana"

输出显示了有关流内部编码方式的信息,还显示了流中的第一个和最后一个消息。另一个可用的信息是与此流关联的消费者组的数量。我们可以通过请求更多关于消费者组的信息来进一步挖掘。

> XINFO GROUPS mystream
1) 1) name (名称)
2) "mygroup"
3) consumers (消费者)
4) (integer) 2
5) pending (待处理)
6) (integer) 2
7) last-delivered-id (最后分发ID)
8) "1588152489012-0"
2) 1) name (名称)
2) "some-other-group"
3) consumers (消费者)
4) (integer) 1
5) pending (待处理)
6) (integer) 0
7) last-delivered-id (最后分发ID)
8) "1588152498034-0"

正如你在本次和上次的输出中所见,**XINFO** 命令输出一系列字段-值项。由于它是一个可观察性命令,这使得人类用户能立即理解报告的信息,并允许该命令在未来通过添加更多字段来报告更多信息,而不会破坏与旧客户端的兼容性。其他必须更注重带宽效率的命令,如 **XPENDING**,则只报告信息而不带字段名。

上面例子中使用了 **GROUPS** 子命令,其输出通过观察字段名应该很清楚。我们可以通过检查注册在组中的消费者来更详细地检查特定消费者组的状态。

> XINFO CONSUMERS mystream mygroup
1) 1) name (名称)
2) "Alice"
3) pending (待处理)
4) (integer) 1
5) idle (空闲)
6) (integer) 9104628
2) 1) name (名称)
2) "Bob"
3) pending (待处理)
4) (integer) 1
5) idle (空闲)
6) (integer) 83841983

如果你不记得命令的语法,只需向命令本身请求帮助:

> XINFO HELP
1) XINFO <subcommand> arg arg ... arg. 子命令是:
2) CONSUMERS <key> <groupname> -- 显示组 <groupname> 的消费者组。
3) GROUPS <key> -- 显示流的消费者组。
4) STREAM <key> -- 显示有关流的信息。
5) HELP -- 打印此帮助。

与 Kafka (TM) 分区的区别#

KeyDB 流中的消费者组在某种程度上可能类似于 Kafka (TM) 基于分区的消费者组,但请注意,KeyDB 流在实践中非常不同。分区只是*逻辑上的*,消息只是被放入单个 KeyDB 键中,因此不同客户端的服务方式是基于谁准备好处理新消息,而不是客户端从哪个分区读取。例如,如果消费者 C3 在某个时候永久失败,KeyDB 将继续向 C1 和 C2 提供所有新到达的消息,就好像现在只有两个*逻辑*分区一样。

同样,如果某个消费者的消息处理速度远快于其他消费者,那么该消费者在相同单位时间内将按比例接收更多消息。这是可能的,因为 KeyDB 明确跟踪所有未确认的消息,并记住谁收到了哪条消息以及从未分发给任何消费者的第一条消息的 ID。

然而,这也意味着在 KeyDB 中,如果你真的想将同一流中的消息分区到多个 KeyDB 实例中,你必须使用多个键和某种分片系统,如 KeyDB 集群或其他特定于应用程序的分片系统。单个 KeyDB 流不会自动分区到多个实例。

我们可以说,示意性地,以下是成立的:

  • 如果你使用 1 个流 -> 1 个消费者,你是按顺序处理消息的。
  • 如果你使用 N 个流和 N 个消费者,以便只有一个给定的消费者命中 N 个流的一个子集,你可以扩展上述 1 个流 -> 1 个消费者的模型。
  • 如果你使用 1 个流 -> N 个消费者,你是在向 N 个消费者进行负载均衡,但在这种情况下,关于同一个逻辑项目的消息可能会被无序消费,因为一个给定的消费者可能比另一个消费者处理消息 4 更快地处理消息 3。

所以基本上,Kafka 分区更类似于使用 N 个不同的 KeyDB 键,而 KeyDB 消费者组是一个服务器端的负载均衡系统,将来自给定流的消息分发给 N 个不同的消费者。

有上限的流 (Capped Streams)#

许多应用程序不想永远将数据收集到一个流中。有时,让一个流中最多只有给定数量的项目是很有用的;其他时候,一旦达到给定的大小,将数据从 KeyDB 移动到一个非内存、速度较慢但适合存储历史(可能长达数十年)的存储中是很有用的。KeyDB 流对此提供了一些支持。一个是 **XADD** 命令的 **MAXLEN** 选项。这个选项使用起来非常简单:

> XADD mystream MAXLEN 2 * value 1
1526654998691-0
> XADD mystream MAXLEN 2 * value 2
1526654999635-0
> XADD mystream MAXLEN 2 * value 3
1526655000369-0
> XLEN mystream
(整数) 2
> XRANGE mystream - +
1) 1) 1526654999635-0
2) 1) "value"
2) "2"
2) 1) 1526655000369-0
2) 1) "value"
2) "3"

使用 **MAXLEN** 时,当达到指定长度时,旧的条目会自动被驱逐,从而使流保持恒定大小。目前没有选项告诉流只保留不早于某个时期的项目,因为这样的命令为了持续运行,可能会为了驱逐项目而长时间阻塞。想象一下,例如,如果出现插入高峰,然后是长时间的暂停,接着又是一次插入,所有都使用相同的最大时间。流会阻塞以驱逐在暂停期间变得太旧的数据。因此,用户需要做一些规划,并了解所期望的最大流长度是多少。此外,虽然流的长度与使用的内存成正比,但按时间修剪则不那么容易控制和预测:它取决于插入率,而插入率通常会随时间变化(当它不变化时,那么仅按大小修剪就很简单了)。

然而,使用 **MAXLEN** 进行修剪可能会很昂贵:流由基数树中的宏节点表示,以便非常节省内存。改变单个宏节点(由几十个元素组成)并不是最优的。所以可以按以下特殊形式使用该命令:

XADD mystream MAXLEN ~ 1000 * ... entry fields here ...

**MAXLEN** 选项和实际数量之间的 `~` 参数意味着,我并不真的需要它恰好是 1000 个项目。它可以是 1000 或 1010 或 1030,只要确保至少保存 1000 个项目。有了这个参数,修剪只在我们能移除整个节点时才执行。这使得它效率高得多,而且通常也是你想要的。

还有一个 **XTRIM** 命令,它执行与上述 **MAXLEN** 选项非常相似的操作,不同之处在于它可以单独运行:

> XTRIM mystream MAXLEN 10

或者,与 **XADD** 选项一样:

> XTRIM mystream MAXLEN ~ 10

然而,**XTRIM** 被设计为接受不同的修剪策略。另一种修剪策略是 **MINID**,它会驱逐 ID 低于指定 ID 的条目。

由于 **XTRIM** 是一个显式命令,用户应该了解不同修剪策略可能存在的缺点。

未来可能添加到 **XTRIM** 的另一个有用的驱逐策略是按 ID 范围移除,以便在需要时更容易地使用 **XRANGE** 和 **XTRIM** 将数据从 KeyDB 移动到其他存储系统。

流 API 中的特殊 ID#

你可能已经注意到,在 KeyDB API 中有几个可以使用的特殊 ID。这里有一个简短的回顾,以便它们在未来更有意义。

前两个特殊 ID 是 `-` 和 `+`,用于 `XRANGE` 命令的范围查询。这两个 ID 分别表示可能的最小 ID(基本上是 `0-1`)和可能的最大 ID(即 `18446744073709551615-18446744073709551615`)。如你所见,写 `-` 和 `+` 比写这些数字要简洁得多。

然后有些 API 中我们想说,流中具有最大 ID 的项目的 ID。这就是 `$` 的意思。例如,如果我只想要 `XREADGROUP` 的新条目,我使用这个 ID 来表示我已经拥有所有现有条目,但不包括将来将插入的新条目。同样,当我创建或设置消费者组的 ID 时,我可以将最后分发的项目设置为 `$`,以便只向组中的消费者分发新条目。

如你所见,`$` 并不意味着 `+`,它们是两回事,因为 `+` 是任何可能流中可能的最大 ID,而 `$` 是包含给定条目的特定流中的最大 ID。此外,API 通常只理解 `+` 或 `$`,但避免给一个符号赋予多种含义是很有用的。

另一个特殊 ID 是 `>`,它只有一个特殊含义,只与消费者组相关,并且只在 `XREADGROUP` 命令中使用。这个特殊 ID 意味着我们只想要那些迄今为止从未分发给其他消费者的条目。所以基本上,`>` ID 是消费者组的*最后分发 ID*。

最后是特殊 ID `*`,只能与 `XADD` 命令一起使用,意思是为新条目自动为我们选择一个 ID。

所以我们有 `-`、`+`、`$`、`>` 和 `*`,它们都有不同的含义,并且大多数时候,可以在不同的上下文中使用。

持久化、复制和消息安全#

一个流,像任何其他 KeyDB 数据结构一样,会异步复制到副本,并持久化到 AOF 和 RDB 文件中。然而,可能不那么明显的是,消费者组的完整状态也会传播到 AOF、RDB 和副本中,所以如果一条消息在主节点上是待处理的,副本也会有相同的信息。同样,在重启后,AOF 将恢复消费者组的状态。

但请注意,KeyDB 流和消费者组是使用 KeyDB 默认复制机制进行持久化和复制的,所以:

  • 如果消息的持久性在你的应用程序中很重要,AOF 必须与强 fsync 策略一起使用。
  • 默认情况下,异步复制不保证 **XADD** 命令或消费者组状态更改被复制:在故障转移后,根据副本从主节点接收数据的能力,可能会丢失一些内容。
  • **WAIT** 命令可用于强制将更改传播到一组副本。但请注意,虽然这使得数据丢失的可能性非常小,但由 Sentinel 或 KeyDB Cluster 执行的 KeyDB 故障转移过程仅执行*尽力而为*的检查,以故障转移到最新的副本,并且在某些特定的故障条件下,可能会提升一个缺少某些数据的副本。

因此,在设计使用 KeyDB 流和消费者组的应用程序时,请确保了解您的应用程序在故障期间应具有的语义属性,并相应地进行配置,评估它是否对您的用例足够安全。

从流中移除单个项目#

流还有一个特殊的命令,用于仅通过 ID 从流的中间移除项目。通常对于一个只追加的数据结构来说,这可能看起来是一个奇怪的功能,但它实际上对涉及隐私法规等应用程序很有用。该命令称为 **XDEL**,它接收流的名称,后跟要删除的 ID:

> XRANGE mystream - + COUNT 2
1) 1) 1526654999635-0
2) 1) "value"
2) "2"
2) 1) 1526655000369-0
2) 1) "value"
2) "3"
> XDEL mystream 1526654999635-0
(integer) 1
> XRANGE mystream - + COUNT 2
1) 1) 1526655000369-0
2) 1) "value"
2) "3"

然而,在当前的实现中,直到一个宏节点完全为空,内存才会被真正回收,所以你不应该滥用这个功能。

零长度流#

流和其他 KeyDB 数据结构之间的一个区别是,当其他数据结构不再有任何元素时,作为调用移除元素命令的副作用,键本身将被移除。例如,当 **ZREM** 调用移除了有序集合中的最后一个元素时,该有序集合将被完全移除。而流,则允许保持零元素状态,这可能是使用零计数的 **MAXLEN** 选项(**XADD** 和 **XTRIM** 命令)的结果,也可能是因为调用了 **XDEL**。

存在这种不对称性的原因是因为流可能有相关的消费者组,而我们不希望仅仅因为流中不再有任何项目而丢失消费者组定义的状态。目前,即使流没有关联的消费者组,它也不会被删除,但这在未来可能会改变。

消费消息的总延迟#

非阻塞流命令如 XRANGE 和 XREAD 或不带 BLOCK 选项的 XREADGROUP 会像任何其他 KeyDB 命令一样同步地处理,因此讨论这些命令的延迟是没有意义的:更有趣的是查看 KeyDB 文档中命令的时间复杂度。可以说,流命令在提取范围时至少和有序集合命令一样快,而且 `XADD` 非常快,如果使用流水线,在普通机器上每秒可以轻松插入五十万到一百万个项目。

然而,如果我们想了解在消费者组中阻塞消费者的上下文中处理一条消息的延迟,从通过 `XADD` 产生消息的那一刻起,到消费者因为 `XREADGROUP` 返回消息而获得消息的那一刻,延迟就成了一个有趣的参数。

服务阻塞消费者的工作原理#

在提供已执行测试的结果之前,了解 KeyDB 使用何种模型来路由流消息(以及通常如何管理任何等待数据的阻塞操作)是很有趣的。

  • 被阻塞的客户端被引用在一个哈希表中,该哈希表将至少有一个阻塞消费者的键映射到一个等待该键的消费者列表。这样,给定一个接收到数据的键,我们就可以解析出所有等待该数据的客户端。
  • 当发生写入时,在本例中是调用 `XADD` 命令时,它会调用 `signalKeyAsReady()` 函数。这个函数会将键放入一个需要处理的键列表中,因为这些键可能对阻塞的消费者有新数据。请注意,这些*就绪的键*将在稍后处理,因此在同一个事件循环周期内,该键可能会接收到其他写入。
  • 最后,在返回事件循环之前,*就绪的键*最终被处理。对于每个键,会扫描等待数据的客户端列表,如果适用,这些客户端将接收到达的新数据。在流的情况下,数据是消费者请求的适用范围内的消息。

如你所见,基本上,在返回事件循环之前,调用 `XADD` 的客户端和阻塞以消费消息的客户端都会在它们的输出缓冲区中收到回复,所以 `XADD` 的调用者应该与消费者接收新消息的时间大约相同地从 KeyDB 收到回复。

这个模型是*推送式*的,因为向消费者缓冲区添加数据将由调用 `XADD` 的动作直接执行,所以延迟往往是相当可预测的。

延迟测试结果#

为了检查这种延迟特性,我们进行了一项测试,使用多个 Ruby 程序实例推送消息,这些消息有一个额外的字段,即计算机的毫秒级时间,还有一些 Ruby 程序从消费者组读取并处理这些消息。消息处理步骤包括将当前的计算机时间与消息时间戳进行比较,以了解总延迟。

这些程序没有经过优化,并且在一个也运行着 KeyDB 的小型双核实例上执行,以试图提供你在非最佳条件下可以预期的延迟数据。消息以每秒 10k 的速率产生,有十个并发消费者从同一个 KeyDB 流和消费者组中消费并确认消息。

获得的结果:

在 0 到 1 毫秒之间处理 -> 74.11%
在 1 到 2 毫秒之间处理 -> 25.80%
在 2 到 3 毫秒之间处理 -> 0.06%
在 3 到 4 毫秒之间处理 -> 0.01%
在 4 到 5 毫秒之间处理 -> 0.02%

因此,99.9% 的请求延迟 <= 2 毫秒,异常值仍然非常接近平均值。

向流中添加几百万未确认的消息并不会改变基准测试的要点,大多数查询仍然以非常短的延迟处理。

一些备注:

  • 在这里,我们每次迭代最多处理 10k 条消息,这意味着 XREADGROUP 的 `COUNT` 参数被设置为 10000。这增加了很多延迟,但是为了让慢速消费者能够跟上消息流,这是必需的。所以你可以预期一个真实世界的延迟会小得多。
  • 用于此基准测试的系统与今天的标准相比非常慢。