If you use Samza's OutgoingMessageEnvelope to send a message with this constructor:
public OutgoingMessageEnvelope(SystemStream systemStream,
java.lang.Object partitionKey,
java.lang.Object key,
java.lang.Object message)
Constructs a new OutgoingMessageEnvelope from specified components.
Parameters:
systemStream - Object representing the appropriate stream of which this envelope will be sent on.
partitionKey - A key representing which partition of the systemStream to send this envelope on.
key - A deserialized key to be used for the message.
message - A deserialized message to be sent in this envelope.
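and you call it in the stream task's process() method to route incoming messages to the appropriate partition, will Samza create the partitions for you when you call it?
For example: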
MessageA = {"id": "idA", "key": "keyA", "body":"some details"}
MessageB = {"id": "idB", "key": "keyB", "body":"some more details"}
Suppose that in my stream task's process() method, where msg is a message instance, I call:
public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
    // ...
    // msg is assumed here to be a Map<String, Object> holding the deserialized message
    String partition = (String) msg.get("id");
    String key = (String) msg.get("key");
    collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "PartitionedMessages"), partition, key, msg));
    // ...
}
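Will this automatically create partitions idA and idB for me, or do I need to create these partitions before sending messages to them? I want to route messages to the appropriate partition, and I also want to use a separate message key for log compaction.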
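
To make the question concrete, here is a minimal sketch of an alternative I suspect may apply. It assumes, without confirmation from the Samza documentation, that the partition key does not name a partition and is instead hashed onto a fixed set of partitions that already exist. The class PartitionSketch, the method partitionFor, and the partition count of 4 are hypothetical and chosen only for illustration.

```java
public class PartitionSketch {

    // Hypothetical helper: maps an arbitrary partition key onto one of
    // numPartitions pre-existing partitions using hash-modulo.
    // This is an assumed scheme, not Samza's confirmed implementation.
    static int partitionFor(Object partitionKey, int numPartitions) {
        // floorMod keeps the result non-negative even for negative hash codes
        return Math.floorMod(partitionKey.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 4; // assumed fixed when the topic was created

        // Under this scheme "idA" and "idB" land on numbered partitions;
        // no partitions named "idA" or "idB" are ever created.
        System.out.println("idA -> partition " + partitionFor("idA", numPartitions));
        System.out.println("idB -> partition " + partitionFor("idB", numPartitions));
    }
}
```

If this is how it works, then all messages with the same id would consistently go to the same partition, while the separate message key would still be available for log compaction.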