
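I'm trying to use RoutingPolicy.ISOLATED to create a dedicated connection between upstream and downstream processors, following this thread on creating a new custom partitioner in Jet.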

I'm also using DiagnosticProcessors.peekOutputP to verify that messages from the same partition are sent to the same downstream processor.

Below are the logs from Jet initialization:

45:29,049 Loading hazelcast-jet-default.xml from classpath.
45:29,225 Loading hazelcast-jet-member-default.xml from classpath.
45:30,293 [172.21.0.1]:5701 [jet] [0.5.1] Starting Jet 0.5.1 (20171206 - a2156c6)
45:30,294 [172.21.0.1]:5701 [jet] [0.5.1] Setting number of cooperative threads and default parallelism to 8
45:30,294 [172.21.0.1]:5701 [jet] [0.5.1] 
    o   o   o   o---o o---o o     o---o   o   o---o o-o-o        o o---o o-o-o
    |   |  / \     /  |     |     |      / \  |       |          | |       |  
    o---o o---o   o   o-o   |     o     o---o o---o   |          | o-o     |  
    |   | |   |  /    |     |     |     |   |     |   |      \   | |       |  
    o   o o   o o---o o---o o---o o---o o   o o---o   o       o--o o---o   o   
45:30,294 [172.21.0.1]:5701 [jet] [0.5.1] Copyright (c) 2008-2017, Hazelcast, Inc. All Rights Reserved.
45:32,256 [172.21.0.1]:5701 [jet] [0.5.1] 

Members {size:1, ver:1} [
    Member [172.21.0.1]:5701 - 71c9778e-ce8d-474e-9c8d-08616a229328 this
]

45:32,441 [172.21.0.1]:5701 [jet] [0.5.1] Starting job 0300-0f31-8123-97aa based on join/submit request from client
45:32,481 [172.21.0.1]:5701 [jet] [0.5.1] Start executing job 0300-0f31-8123-97aa, execution f272-4051-f961-34c2, status STARTING
dag
    .vertex("kafkaSource").localParallelism(2)
    .vertex("meta").localParallelism(2)
    .vertex("sink").localParallelism(1)
    .edge(between("kafkaSource", "meta").isolated())
    .edge(between("meta", "sink").partitioned(?))

45:32,551 [172.21.0.1]:5701 [jet] [0.5.1] Execution plan for job 0300-0f31-8123-97aa, execution f272-4051-f961-34c2 initialized
45:32,555 [172.21.0.1]:5701 [jet] [0.5.1] Start execution of job 0300-0f31-8123-97aa, execution f272-4051-f961-34c2 from coordinator [172.21.0.1]:5701
45:32,758 The configuration 'compression.type' was supplied but isn't a known config.
45:32,758 The configuration 'compression.type' was supplied but isn't a known config.
45:32,957 [172.21.0.1]:5701 [jet] [0.5.1] Partition assignments changed, new partitions: [jet-stream-0]
45:32,957 [172.21.0.1]:5701 [jet] [0.5.1] Partition assignments changed, new partitions: [jet-stream-1]

My Kafka producer routes messages with the same key to the same partition:

log4j:WARN No appenders could be found for logger (org.apache.kafka.clients.producer.ProducerConfig).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
key=src1 partition=1
key=src1 partition=1
key=src0 partition=0
key=src1 partition=1
key=src0 partition=0
key=src1 partition=1
key=src1 partition=1
key=src0 partition=0
key=src0 partition=0
key=src1 partition=1

According to the logs, it looks like the kafkaSource vertex is sending all messages to the same processor:

54:39,991 [172.21.0.1]:5701 [jet] [0.5.1] Output to 0: com.krios.iot.hazelcast.KafkaMessage@6da59b20
54:39,993 [172.21.0.1]:5701 [jet] [0.5.1] src1 / Frame #0
54:40,842 [172.21.0.1]:5701 [jet] [0.5.1] Output to 0: com.krios.iot.hazelcast.KafkaMessage@69c2e820
54:40,844 [172.21.0.1]:5701 [jet] [0.5.1] src1 / Frame #1
54:41,843 [172.21.0.1]:5701 [jet] [0.5.1] Output to 0: com.krios.iot.hazelcast.KafkaMessage@489aca4c
54:41,845 [172.21.0.1]:5701 [jet] [0.5.1] src0 / Frame #2
54:42,838 [172.21.0.1]:5701 [jet] [0.5.1] Output to 0: com.krios.iot.hazelcast.KafkaMessage@2e0d6d1f
54:42,838 [172.21.0.1]:5701 [jet] [0.5.1] src1 / Frame #3
54:43,840 [172.21.0.1]:5701 [jet] [0.5.1] Output to 0: com.krios.iot.hazelcast.KafkaMessage@e1043d3
54:43,841 [172.21.0.1]:5701 [jet] [0.5.1] src0 / Frame #4
54:44,853 [172.21.0.1]:5701 [jet] [0.5.1] Output to 0: com.krios.iot.hazelcast.KafkaMessage@4e6aabe6
54:44,854 [172.21.0.1]:5701 [jet] [0.5.1] src1 / Frame #5
54:45,842 [172.21.0.1]:5701 [jet] [0.5.1] Output to 0: com.krios.iot.hazelcast.KafkaMessage@4ca9eb8a
54:45,842 [172.21.0.1]:5701 [jet] [0.5.1] src1 / Frame #6
54:46,847 [172.21.0.1]:5701 [jet] [0.5.1] Output to 0: com.krios.iot.hazelcast.KafkaMessage@10c630f7
54:46,849 [172.21.0.1]:5701 [jet] [0.5.1] src0 / Frame #7
54:47,848 [172.21.0.1]:5701 [jet] [0.5.1] Output to 0: com.krios.iot.hazelcast.KafkaMessage@4639cdac
54:47,849 [172.21.0.1]:5701 [jet] [0.5.1] src0 / Frame #8
54:48,843 [172.21.0.1]:5701 [jet] [0.5.1] Output to 0: com.krios.iot.hazelcast.KafkaMessage@325499d7
54:48,844 [172.21.0.1]:5701 [jet] [0.5.1] src1 / Frame #9

Below is my Jet code:

DAG dag = new DAG();
DecodeKafkaMessage decodeKafkaMessage = new DecodeKafkaMessage();

Vertex kafkaSource = dag.newVertex("kafkaSource", DiagnosticProcessors.peekOutputP(streamKafkaP(properties, decodeKafkaMessage, topic)))
        .localParallelism(2);

Vertex meta = dag.newVertex("meta", mapP(LogLine::parse))
        .localParallelism(2);

Vertex sink = dag.newVertex("sink", DiagnosticProcessors.writeLoggerP())
        .localParallelism(1);


dag.edge(between(kafkaSource, meta)
        .isolated())
.edge(between(meta, sink)
        .allToOne());

Update

LogLine::parse

private static class LogLine {

    public LogLine() {
        // TODO Auto-generated constructor stub
    }

    public static String parse(KafkaMessage m) {
        return m.getSourceId() + " / " + m.getData();
    }       

}

DecodeKafkaMessage

public final class DecodeKafkaMessage implements DistributedBiFunction<Object, Object, Object>, Serializable {

    private static final long serialVersionUID = 478528451550904377L;

    @Override
    public Object apply(Object t, Object u) {
        Gson gson = new Gson();
        KafkaMessage kafkaMessage = gson.fromJson(u.toString(), KafkaMessage.class);
        byte[] encodedData = Base64.getDecoder().decode(kafkaMessage.getData());
        try {
            kafkaMessage.setData(new String(encodedData, "utf-8"));
        } catch (UnsupportedEncodingException e) {
            System.out.println(e.getMessage());
            e.printStackTrace();
        }
        return kafkaMessage;
    }

}

Update: 2018-02-01

53:42,802 kafkaSource#0 [172.19.0.1]:5701 [jet] [0.5.1] Output to 0: com.krios.iot.hazelcast.KafkaMessage@ada19fa
53:42,803 meta#0 [172.19.0.1]:5701 [jet] [0.5.1] Input from 0: com.krios.iot.hazelcast.KafkaMessage@ada19fa
53:42,805 sink#0 [172.19.0.1]:5701 [jet] [0.5.1] src0 / Frame #0

53:43,625 kafkaSource#1 [172.19.0.1]:5701 [jet] [0.5.1] Output to 0: com.krios.iot.hazelcast.KafkaMessage@11b98cff
53:43,625 meta#1 [172.19.0.1]:5701 [jet] [0.5.1] Input from 0: com.krios.iot.hazelcast.KafkaMessage@11b98cff
53:43,626 sink#0 [172.19.0.1]:5701 [jet] [0.5.1] src1 / Frame #1

53:44,627 kafkaSource#0 [172.19.0.1]:5701 [jet] [0.5.1] Output to 0: com.krios.iot.hazelcast.KafkaMessage@3c501af0
53:44,627 meta#0 [172.19.0.1]:5701 [jet] [0.5.1] Input from 0: com.krios.iot.hazelcast.KafkaMessage@3c501af0
53:44,628 sink#0 [172.19.0.1]:5701 [jet] [0.5.1] src0 / Frame #2

53:45,624 kafkaSource#0 [172.19.0.1]:5701 [jet] [0.5.1] Output to 0: com.krios.iot.hazelcast.KafkaMessage@1dd2234b
53:45,624 meta#0 [172.19.0.1]:5701 [jet] [0.5.1] Input from 0: com.krios.iot.hazelcast.KafkaMessage@1dd2234b
53:45,625 sink#0 [172.19.0.1]:5701 [jet] [0.5.1] src0 / Frame #3

53:46,627 kafkaSource#0 [172.19.0.1]:5701 [jet] [0.5.1] Output to 0: com.krios.iot.hazelcast.KafkaMessage@25700cd7
53:46,628 meta#0 [172.19.0.1]:5701 [jet] [0.5.1] Input from 0: com.krios.iot.hazelcast.KafkaMessage@25700cd7
53:46,629 sink#0 [172.19.0.1]:5701 [jet] [0.5.1] src0 / Frame #4

53:47,625 kafkaSource#0 [172.19.0.1]:5701 [jet] [0.5.1] Output to 0: com.krios.iot.hazelcast.KafkaMessage@39b10238
53:47,626 meta#0 [172.19.0.1]:5701 [jet] [0.5.1] Input from 0: com.krios.iot.hazelcast.KafkaMessage@39b10238
53:47,627 sink#0 [172.19.0.1]:5701 [jet] [0.5.1] src0 / Frame #5

53:48,660 kafkaSource#1 [172.19.0.1]:5701 [jet] [0.5.1] Output to 0: com.krios.iot.hazelcast.KafkaMessage@136fd05c
53:48,660 meta#1 [172.19.0.1]:5701 [jet] [0.5.1] Input from 0: com.krios.iot.hazelcast.KafkaMessage@136fd05c
53:48,661 sink#0 [172.19.0.1]:5701 [jet] [0.5.1] src1 / Frame #6

53:49,629 kafkaSource#0 [172.19.0.1]:5701 [jet] [0.5.1] Output to 0: com.krios.iot.hazelcast.KafkaMessage@50a0864
53:49,629 meta#0 [172.19.0.1]:5701 [jet] [0.5.1] Input from 0: com.krios.iot.hazelcast.KafkaMessage@50a0864
53:49,630 sink#0 [172.19.0.1]:5701 [jet] [0.5.1] src0 / Frame #7

53:50,634 kafkaSource#1 [172.19.0.1]:5701 [jet] [0.5.1] Output to 0: com.krios.iot.hazelcast.KafkaMessage@7ef68ceb
53:50,635 meta#1 [172.19.0.1]:5701 [jet] [0.5.1] Input from 0: com.krios.iot.hazelcast.KafkaMessage@7ef68ceb
53:50,636 sink#0 [172.19.0.1]:5701 [jet] [0.5.1] src1 / Frame #8

53:51,632 kafkaSource#1 [172.19.0.1]:5701 [jet] [0.5.1] Output to 0: com.krios.iot.hazelcast.KafkaMessage@4c6c252c
53:51,632 meta#1 [172.19.0.1]:5701 [jet] [0.5.1] Input from 0: com.krios.iot.hazelcast.KafkaMessage@4c6c252c
53:51,633 sink#0 [172.19.0.1]:5701 [jet] [0.5.1] src1 / Frame #9

1 Answer


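If you want to see which processor emits an item and which processor receives it, you can also attach peekInputP to your meta vertex. Enable printing of the logger name in your logging configuration (in log4j, add %c{1} to the ConversionPattern of the PatternLayout).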
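As a rough illustration of the logging change, a log4j 1.2 properties fragment could look like the one below. The appender name `stdout` and the timestamp pattern are assumptions chosen to resemble the log output in the question; only the `%c{1}` conversion specifier is the part the suggestion depends on.

```properties
# Hypothetical console appender; adapt the names to your existing configuration.
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
# %c{1} prints the last component of the logger name, e.g. "kafkaSource#0"
log4j.appender.stdout.layout.ConversionPattern=%d{mm:ss,SSS} %c{1} %m%n
```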

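The logger name ends with #X, where X is the global processor index. You'll see that items from one upstream instance always go to the same downstream instance.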
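One way to confirm this from captured output is to pair each `Output to` line with the `Input from` line that carries the same item string, then collect the downstream indices seen for each upstream index. Below is a minimal Java sketch, assuming log lines shaped like the ones in the 2018-02-01 update. The class and method names (`IsolatedEdgeLogCheck`, `routes`) are made up for illustration, and matching items by their default `toString()` only works while both processors log the same object instance on a single member. Note that the trailing `0` in `Output to 0` seems to be the edge ordinal rather than a processor index, since `kafkaSource#1` prints it as well.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Jet: checks peek logs for one-to-one routing.
class IsolatedEdgeLogCheck {

    // Matches e.g. "kafkaSource#0 [..] Output to 0: com.example.KafkaMessage@ada19fa"
    private static final Pattern LINE = Pattern.compile(
            "(\\w+)#(\\d+) .*(Output to|Input from) \\d+: (\\S+)");

    // Returns, for each upstream processor index, the downstream indices that received its items.
    static Map<Integer, Set<Integer>> routes(List<String> lines, String upstream, String downstream) {
        Map<String, Integer> emittedBy = new HashMap<>();
        Map<String, Integer> receivedBy = new HashMap<>();
        for (String line : lines) {
            Matcher m = LINE.matcher(line);
            if (!m.find()) {
                continue; // not a peek line (for example, sink output)
            }
            String vertex = m.group(1);
            int index = Integer.parseInt(m.group(2));
            boolean isOutput = m.group(3).startsWith("Output");
            String item = m.group(4);
            if (isOutput && vertex.equals(upstream)) {
                emittedBy.put(item, index);
            } else if (!isOutput && vertex.equals(downstream)) {
                receivedBy.put(item, index);
            }
        }
        // Join the two maps on the item string.
        Map<Integer, Set<Integer>> routes = new TreeMap<>();
        for (Map.Entry<String, Integer> e : emittedBy.entrySet()) {
            Integer to = receivedBy.get(e.getKey());
            if (to != null) {
                routes.computeIfAbsent(e.getValue(), k -> new TreeSet<>()).add(to);
            }
        }
        return routes;
    }

    public static void main(String[] args) {
        List<String> lines = List.of(
            "53:42,802 kafkaSource#0 [172.19.0.1]:5701 [jet] [0.5.1] Output to 0: com.krios.iot.hazelcast.KafkaMessage@ada19fa",
            "53:42,803 meta#0 [172.19.0.1]:5701 [jet] [0.5.1] Input from 0: com.krios.iot.hazelcast.KafkaMessage@ada19fa",
            "53:43,625 kafkaSource#1 [172.19.0.1]:5701 [jet] [0.5.1] Output to 0: com.krios.iot.hazelcast.KafkaMessage@11b98cff",
            "53:43,625 meta#1 [172.19.0.1]:5701 [jet] [0.5.1] Input from 0: com.krios.iot.hazelcast.KafkaMessage@11b98cff");
        // Each upstream index should map to exactly one downstream index.
        System.out.println(routes(lines, "kafkaSource", "meta")); // prints {0=[0], 1=[1]}
    }
}
```

If any upstream index maps to more than one downstream index, the edge is not behaving as a dedicated one-to-one connection for that run.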

answered 2018-01-31T08:11:35.940