概述

Stream 数据类型是在 Redis 版本 5.0 中添加的,它表示消息的仅追加日志。redis.io 上记录的所有 stream related commands 已在 StackExchange.Redis 客户端库中实现。阅读"Introduction to Redis Streams",以获取有关原始 Redis 命令以及如何使用流的更多信息。

写入流

流中的每条消息或条目均由 StreamEntry 类型表示。每个流的条目包含一个唯一的ID和一个名称/值对数组。名称/值对由 NameValueEntry 类型表示。

使用以下命令将具有单个名称/值对的简单消息添加到流中:

var db = redis.GetDatabase();
var messageId = db.StreamAdd("event_stream", "foo_name", "bar_value");
// messageId = 1518951480106-0

StreamAdd 返回的消息ID由将消息添加到流中的毫秒时间和序列号组成。如果在同一毫秒时间创建了两个或更多消息,则序列号用于防止ID冲突。

可以使用以下方法将多个名称/值对写入流:

var values = new NameValueEntry[]
{
    new NameValueEntry("sensor_id", "1234"),
    new NameValueEntry("temp", "19.8")
};

var db = redis.GetDatabase();
var messageId = db.StreamAdd("sensor_stream", values);

You also have the option to override the auto-generated message ID by passing your own ID to the StreamAdd method. Other optional parameters allow you to trim the stream's length.

您还可以选择通过将自己的ID传递给StreamAdd方法来覆盖自动生成的消息ID。其他可选参数使您可以调整流的长度。

db.StreamAdd("event_stream", "foo_name", "bar_value", messageId: "0-1", maxLength: 100);

读取流

通过使用 StreamReadStreamRange 方法从流中进行读取。

var messages = db.StreamRead("event_stream", "0-0");

上面的代码将从ID "0-0" 到流的末尾读取所有消息。你可以选择使用可选的 count 参数来限制返回的消息数。

StreamRead 方法还允许你一次从多个流中读取:

var streams = db.StreamRead(new StreamPosition[]
{
    new StreamPosition("event_stream", "0-0"),
    new StreamPosition("score_stream", "0-0")
});

Console.WriteLine($"Stream = {streams.First().Key}");
Console.WriteLine($"Length = {streams.First().Entries.Length}");

你可以使用 countPerStream 可选参数来限制每个流返回的消息数。

StreamRange 方法允许你返回流中的一系列条目。

var messages = db.StreamRange("event_stream", minId: "-", maxId: "+");

-+ 特殊字符表示可能的最小和最大ID。这些值是没有设置参数时的默认值。你还可以选择通过使用 messageOrder 参数来反向读取流。StreamRange 方法还提供了通过使用 count 参数来限制返回的条目数的功能。

var messages = db.StreamRange("event_stream",
    minId: "0-0",
    maxId: "+",
    count: 100,
    messageOrder: Order.Descending);

流的信息

StreamInfo 方法提供了读取有关流的基本信息的能力:流的第一个和最后一个条目,流的长度,使用者组的数量等。此信息可用于以更有效的方式处理流。

var info = db.StreamInfo("event_stream");

Console.WriteLine(info.Length);
Console.WriteLine(info.FirstEntry.Id);
Console.WriteLine(info.LastEntry.Id);

消费者组

使用使用者组可让你扩展跨多个工作人员或使用者的流的处理。请阅读“Introduction to Redis Streams”,以获取有关消费者群体的详细信息。

以下内容创建了一个使用者组,并告诉 Redis 从流中的哪个位置开始读取。如果你在第一次创建流之前调用该方法,则默认情况下,StreamCreateConsumerGroup 方法将为你创建流。你可以通过为createStream可选参数传递false来覆盖此默认行为。

// Returns true if created, otherwise false.
db.StreamCreateConsumerGroup("events_stream", "events_consumer_group", "$");
// or
db.StreamCreateConsumerGroup("events_stream", "events_consumer_group", StreamPosition.NewMessages);

特殊字符 "$" 表示使用者组将只读取在创建使用者组之后创建的消息。如果要阅读流中已经存在的消息,则可以提供流中的任何位置。

// Begin reading from the first position in the stream.
db.StreamCreateConsumerGroup("events_stream", "events_consumer_group", "0-0");

使用 StreamReadGroup 方法将消息读入使用者。此方法接受消息ID作为参数之一。当ID传递给 StreamReadGroup 时,Redis 将仅返回给定使用者的待处理消息,换句话说,它将仅返回使用者已读取的消息。

要将新消息读入使用者,请使用特殊字符 ">"StreamPosition.NewMessages">" 特殊字符表示从未读取过的消息,从未传递给其他消费者。请注意,消费者组中的消费者是在调用 StreamReadGroup 方法时首次使用时自动创建的。

// Read 5 messages into two consumers.
var consumer_1_messages = db.StreamReadGroup("events_stream", "events_cg", "consumer_1", ">", count: 5);
var consumer_2_messages = db.StreamReadGroup("events_stream", "events_cg", "consumer_2", ">", count: 5);

消费者读取了一条消息后,其状态对于该消费者变为“待处理”状态,其他任何消费者都无法通过 StreamReadGroup 方法读取该消息。可以通过使用 StreamReadGroup 方法并为消费者提供在待处理消息范围内的ID来读取消费者的待处理消息。

// Read the first pending message for the "consumer_1" consumer.
var message = db.StreamReadGroup("events_stream", "events_cg", "consumer_1", "0-0", count: 1);

还可以通过调用 StreamPendingStreamPendingMessages 方法来检索待处理的消息信息。StreamPending 返回有关待处理消息数,每个使用者的待处理消息以及最高和最低待处理消息ID的高级信息。

var pendingInfo = db.StreamPending("events_stream", "events_cg");

Console.WriteLine(pendingInfo.PendingMessageCount);
Console.WriteLine(pendingInfo.LowestPendingMessageId);
Console.WriteLine(pendingInfo.HighestPendingMessageId);
Console.WriteLine($"Consumer count: {pendingInfo.Consumers.Length}.");
Console.WriteLine(pendingInfo.Consumers.First().Name);
Console.WriteLine(pendingInfo.Consumers.First().PendingMessageCount);

使用 StreamPendingMessages 方法检索有关给定使用者的待处理消息的详细信息。

// Read the first pending message for the consumer.
var pendingMessages = db.StreamPendingMessages("events_stream",
    "events_cg",
    count: 1,
    consumerName: "consumer_1",
    minId: pendingInfo.LowestPendingMessageId);

Console.WriteLine(pendingMessages.Single().MessageId);
Console.WriteLine(pendingMessages.Single().IdleTimeInMilliseconds);

消息在等待消费者处理之前,直到通过调用 StreamAcknowledge 方法得到确认为止。消息被确认后, StreamReadGroup 不能再访问。

// Returns the number of messages acknowledged.
db.StreamAcknowledge("events_stream", "events_cg", pendingMessage.MessageId);

StreamClaim 方法可用于将消费者使用的消息所有权更改为其他消费者。

// Change ownership to consumer_2 for the first 5 messages pending for consumer_1.
var pendingMessages = db.StreamPendingMessages("events_stream",
    "events_cg",
    count: 5,
    consumerName: "consumer_1",
    minId: "0-0");

db.StreamClaim("events_stream",
    "events_cg",
    claimingConsumer: "consumer_2",
    minIdleTimeInMs: 0,
    messageIds: pendingMessages.Select(m => m.MessageId).ToArray());

还有其他几种使用使用者组处理流的方法。请参考 Streams 单元测试以了解这些方法及其用法。

原文地址:Stream

译文地址: https://www.cnblogs.com/liang24/p/13847215.html