2

我有一些 Samza 作业运行所有从 Kafka 主题读取消息并将新消息写入新主题。为了发送新消息,我使用了 Samza 内置的 OutgoingMessageEnvelope。还使用 MessageCollector 发送新消息。它看起来像这样:

collector.send(new OutgoingMessageEnvelope(SystemStream, newMessage))

有没有办法可以使用它向 Kafka 主题添加分区?例如对用户 ID 进行分区或类似的东西。

或者,如果有更好的方法,我很想听!

4

1 回答 1

3

您应该能够使用分区键发送消息,

    public OutgoingMessageEnvelope(SystemStream systemStream,
                               java.lang.Object partitionKey,
                               java.lang.Object key,
                               java.lang.Object message)
Constructs a new OutgoingMessageEnvelope from specified components.
Parameters:
systemStream - Object representing the appropriate stream of which this envelope will be sent on.
partitionKey - A key representing which partition of the systemStream to send this envelope on.
key - A deserialized key to be used for the message.
message - A deserialized message to be sent in this envelope.

使用此方法将对您的数据进行分区。但是我认为,如果您正在考虑以编程方式控制分区数量,您应该使用 kafka API 来创建/更改主题,如此处所述

于 2015-09-04T11:21:20.737 回答