8

我的目标是使用 Kafka 作为源和 Flink 作为流处理引擎来设置一个高吞吐量的集群。这就是我所做的。

我已经在主节点和工作节点上设置了一个 2 节点集群,配置如下。

掌握 flink-conf.yaml

jobmanager.rpc.address: <MASTER_IP_ADDR> #localhost

jobmanager.rpc.port: 6123

jobmanager.heap.mb: 256

taskmanager.heap.mb: 512

taskmanager.numberOfTaskSlots: 50

parallelism.default: 100

工人 flink-conf.yaml

jobmanager.rpc.address: <MASTER_IP_ADDR> #localhost

jobmanager.rpc.port: 6123

jobmanager.heap.mb: 512 #256

taskmanager.heap.mb: 1024 #512

taskmanager.numberOfTaskSlots: 50

parallelism.default: 100

Master 节点上的slaves文件如下所示:

<WORKER_IP_ADDR>
localhost

两个节点上的 flink 设置位于同名文件夹中。我通过运行在主服务器上启动集群

bin/start-cluster-streaming.sh

这将启动 Worker 节点上的任务管理器。

我的输入源是 Kafka。这是片段。

final StreamExecutionEnvironment env = 
    StreamExecutionEnvironment.getExecutionEnvironment();

DataStreamSource<String> stream = 
    env.addSource(
    new KafkaSource<String>(kafkaUrl,kafkaTopic, new SimpleStringSchema()));
stream.addSink(stringSinkFunction);

env.execute("Kafka stream");

这是我的接收器功能

public class MySink implements SinkFunction<String> {

    private static final long serialVersionUID = 1L;

    public void invoke(String arg0) throws Exception {
        processMessage(arg0);
        System.out.println("Processed Message");
    }
}

这是我的 pom.xml 中的 Flink 依赖项。

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-core</artifactId>
    <version>0.9.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>0.9.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>0.9.0</version>
</dependency>

然后我在master上用这个命令运行打包的jar

bin/flink run flink-test-jar-with-dependencies.jar

SinkFunction但是,当我将消息插入 Kafka 主题时,我能够仅在主节点上考虑来自我的 Kafka 主题的所有消息(通过我的实现的调用方法中的调试消息)。

在作业管理器 UI 中,我可以看到 2 个任务管理器,如下所示: 作业管理器仪表板 - 任务管理器

仪表板也看起来像这样: 问题:在此处输入图像描述

  1. 为什么工作节点没有收到任务?
  2. 我错过了一些配置吗?
4

1 回答 1

14

在 Flink 中从 Kafka 源读取时,源任务的最大并行度受给定 Kafka 主题的分区数限制。Kafka 分区是 Flink 中源任务可以消耗的最小单位。如果分区多于源任务,那么有些任务会消耗多个分区。

因此,为了为所有 100 个任务提供输入,您应该确保您的 Kafka 主题至少有 100 个分区。

如果您无法更改主题的分区数,那么也可以使用该setParallelism方法使用较低的并行度从 Kafka 中进行初始读取。或者,您可以使用该rebalance方法将您的数据在前面操作的所有可用任务中打乱。

于 2015-09-01T10:14:08.617 回答