
I'm looking at some Kafka topics that generate roughly 30K messages per second. I have a Flink topology set up to read one of them, aggregate a little (a 5-second window), and then (eventually) write to a database.

When I run the topology and strip out everything except the read -> aggregate steps, I only get about 30K messages per minute. There is nowhere for backpressure to build up.

What am I doing wrong?


Edit:

  1. I cannot change anything about the topic layout. Each topic has a single partition, and there are hundreds of them.
  2. Each message is a compressed Thrift object, averaging 2-3 KB in size.

It appears I can only get ~1.5 MB/s (30,000 msg/min × ~3 KB per message ÷ 60 s ≈ 1.5 MB/s). Nowhere near the 100 MB/s that gets mentioned.

Current code path:

DataStream<byte[]> dataStream4 = env.addSource(
        new FlinkKafkaConsumer081<>("data_4", new RawSchema(), parameterTool.getProperties()))
        .setParallelism(1); // single-partition topic, so one source task
DataStream<Tuple4<Long, Long, Integer, String>> ds4 =
        dataStream4.rebalance().flatMap(new mapper2("data_4")).setParallelism(4); // fan out to 4 mappers

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.util.Collector;

public class mapper2 implements FlatMapFunction<byte[], Tuple4<Long, Long, Integer, String>> {
    private final String mapId;

    public mapper2(String mapId) {
        this.mapId = mapId;
    }

    @Override
    public void flatMap(byte[] bytes, Collector<Tuple4<Long, Long, Integer, String>> collector) throws Exception {
        // decode the compressed Thrift payload
        TimeData timeData = (TimeData) ts_thriftDecoder.fromBytes(bytes);

        Tuple4<Long, Long, Integer, String> tuple4 = new Tuple4<>();
        tuple4.f0 = timeData.getId();
        tuple4.f1 = timeData.getOtherId();
        tuple4.f2 = timeData.getSections().size();
        tuple4.f3 = mapId;

        collector.collect(tuple4);
    }
}

2 Answers


From the code, I see two components that could potentially cause performance issues:

  • The FlinkKafkaConsumer
  • The Thrift deserializer

To understand where the bottleneck is, I would first measure the raw read performance of Flink reading from the Kafka topic.

So, could you run the following code on your cluster?

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer081;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RawKafka {

    private static final Logger LOG = LoggerFactory.getLogger(RawKafka.class);

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        // RawSchema is the deserialization schema from the question
        DataStream<byte[]> dataStream4 = env.addSource(
                new FlinkKafkaConsumer081<>("data_4", new RawSchema(), parameterTool.getProperties()))
                .setParallelism(1);

        // nothing is emitted downstream; this flatMap only counts and logs
        dataStream4.flatMap(new FlatMapFunction<byte[], Integer>() {
            long received = 0;
            long logfreq = 50000;
            long lastLog = -1;
            long lastElements = 0;

            @Override
            public void flatMap(byte[] element, Collector<Integer> collector) throws Exception {
                received++;
                if (received % logfreq == 0) {
                    long now = System.currentTimeMillis();

                    // throughput for the last "logfreq" elements
                    if (lastLog == -1) {
                        // init (the first measurement)
                        lastLog = now;
                        lastElements = received;
                    } else {
                        long timeDiff = now - lastLog;
                        long elementDiff = received - lastElements;
                        double ex = (1000 / (double) timeDiff); // scale factor to elements/second
                        // the GB estimate assumes ~2500 bytes per message
                        LOG.info("During the last {} ms, we received {} elements. That's {} elements/second/core. GB received {}",
                                timeDiff, elementDiff, elementDiff * ex, (received * 2500) / 1024 / 1024 / 1024);
                        // reinit
                        lastLog = now;
                        lastElements = received;
                    }
                }
            }
        });

        env.execute("Raw kafka throughput");
    }
}

This code measures the time between every 50k elements read from Kafka and logs the number of elements read so far. On my local machine I got a throughput of about 330k elements/core/second:

16:09:34,028 INFO  RawKafka                                                      - During the last 88 ms, we received 30000 elements. That's 340909.0909090909 elements/second/core. GB received 0
16:09:34,028 INFO  RawKafka                                                      - During the last 86 ms, we received 30000 elements. That's 348837.20930232556 elements/second/core. GB received 0
16:09:34,028 INFO  RawKafka                                                      - During the last 85 ms, we received 30000 elements. That's 352941.17647058825 elements/second/core. GB received 0
16:09:34,028 INFO  RawKafka                                                      - During the last 88 ms, we received 30000 elements. That's 340909.0909090909 elements/second/core. GB received 0
16:09:34,030 INFO  RawKafka                                                      - During the last 90 ms, we received 30000 elements. That's 333333.3333333333 elements/second/core. GB received 0
16:09:34,030 INFO  RawKafka                                                      - During the last 91 ms, we received 30000 elements. That's 329670.3296703297 elements/second/core. GB received 0
16:09:34,030 INFO  RawKafka                                                      - During the last 85 ms, we received 30000 elements. That's 352941.17647058825 elements/second/core. GB received 0

I would really be interested to see what read throughput you are getting from Kafka.
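
If the raw read rate turns out to be healthy, the Thrift deserializer is the second suspect worth timing in isolation. Here is a minimal sketch of such a micro-benchmark; it reuses ts_thriftDecoder and TimeData from your code, and the sample-message file path is a hypothetical placeholder for one raw message you capture from the topic yourself:

import java.nio.file.Files;
import java.nio.file.Paths;

public class ThriftDecodeBench {
    public static void main(String[] args) throws Exception {
        // args[0]: path to a file holding one raw message captured from the topic (placeholder)
        byte[] sampleBytes = Files.readAllBytes(Paths.get(args[0]));

        int iterations = 500_000; // enough to let the JIT warm up; ignore the first run
        long checksum = 0;        // consume the result so the decode can't be optimized away
        long start = System.nanoTime();
        for (int i = 0; i < iterations; i++) {
            // exactly the decode call from mapper2
            TimeData timeData = (TimeData) ts_thriftDecoder.fromBytes(sampleBytes);
            checksum += timeData.getId();
        }
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        System.out.println(iterations + " decodes in " + elapsedMs + " ms -> "
                + (iterations * 1000L / Math.max(1, elapsedMs))
                + " decodes/second (checksum " + checksum + ")");
    }
}

If decoding alone turns out to be far slower than the raw read rate, the deserializer rather than the consumer is your bottleneck.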

answered 2015-11-25T15:15:56.787

I have never used Flink or its KafkaConsumer, but I do have experience with Kafka in a Storm environment. Here are some of my thoughts. There are a lot of variables that determine how fast Kafka goes; below are some things to consider and investigate. Please add more detail to your question as you find it.

  • Adding more partitions should increase your throughput. So yes, adding more partitions and consumers should give you a roughly linear gain in performance.
  • Kafka throughput is tied to message size. If you have large messages, throughput will suffer accordingly.
  • Do you have any evidence backing your expectation that the Kafka consumer should be faster? While I agree that 30K msg/min is really slow, do you have proof to back up your expectation? Something like a general speed test using the FlinkKafkaConsumer (something like this), or consuming with a plain Kafka consumer to see the consumption rate and then comparing it against Flink's consumer (see the sketch after this list)?
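
On that last point, here is a minimal sketch of such a plain-consumer baseline using Kafka 0.8's high-level consumer API; the zookeeper.connect address and group id are assumptions you would adjust to your cluster. (Kafka also ships a kafka-consumer-perf-test.sh script that makes a similar measurement.)

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

public class PlainConsumerCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181"); // assumption: your ZooKeeper address
        props.put("group.id", "plain-consumer-check");    // hypothetical group id
        props.put("auto.offset.reset", "smallest");       // read the topic from the beginning

        ConsumerConnector connector =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        // one stream, since the topic from the question has a single partition
        Map<String, List<KafkaStream<byte[], byte[]>>> streams =
                connector.createMessageStreams(Collections.singletonMap("data_4", 1));
        KafkaStream<byte[], byte[]> stream = streams.get("data_4").get(0);

        long count = 0;
        long lastLog = System.currentTimeMillis();
        for (MessageAndMetadata<byte[], byte[]> ignored : stream) {
            count++;
            if (count % 50000 == 0) { // same log frequency as the Flink test above
                long now = System.currentTimeMillis();
                System.out.println("50000 messages in " + (now - lastLog) + " ms (~"
                        + (50000L * 1000 / Math.max(1, now - lastLog)) + " msg/s)");
                lastLog = now;
            }
        }
    }
}

If this plain consumer is also stuck at ~500 msg/s, the limit is on the Kafka/broker side rather than in Flink.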

There can be a lot of reasons why it consumes slowly; I have tried to highlight some of the general Kafka-related ones. I'm sure there are also things you can do in Flink to speed up consumption that I don't know about, since I have never used it.

answered 2015-11-24T22:42:20.583