0

我正在尝试使用 flink 来丰富多个数据流的数据。

这里我在 account_stream 和 status_stream 中有一些数据。我想将该数据添加到来自多个不同来源的所有其他流中。所有的流在它们​​的数据中都有一个共同的字段:“account_id”。

这是我采取的方法。

account_stream.connect(status_stream)
                     .flat_map(EnrichmentFunction())
                     .filter(lambda x: x['name'] != "-" and x['date'] != "0000-00-00 00:00:00")
                     .key_by(lambda row: row['account_id'])
                     .connect(stream1)
                     .flat_map(function_2())
                     .filter(lambda x: x!="2")
                     .key_by(lambda row: row['account_id'])
                     .connect(stream2)
                     .flat_map(function_2())
                     .key_by(lambda row: row['account_id'])
                     .connect(stream3)
                     .flat_map(function_3())
                     .key_by(lambda row: row['account_id'])
                     .connect(stream4)
                     .flat_map(function_4())
                     .key_by(lambda row: row['account_id'])
                     .connect(stream5)
                     .flat_map(function_5())
                     .key_by(lambda row: row['account_id'])
                     .connect(stream6)
                     .flat_map(function_6())
                     .key_by(lambda row: row['account_id'])
                     .connect(stream7)
                     .flat_map(function_7())
                     .key_by(lambda row: row['account_id'])
                     .connect(stream_8)
                     .flat_map(function_8())
                     .map(lambda a: str(a),Types.STRING())
                     .add_sink(kafka_producer)

我正在将必要的数据保存在状态中,并使用 flat_map 函数将其附加到所有流中。最后添加一个 kafka 接收器来发送所有丰富的状态流。

现在,一旦我执行此操作,就会收到此错误:'' java.io.IOException: Insufficient number of network buffers: required 17, but only 8 available. 网络缓冲区的总数当前设置为 2048 个,每个 32768 字节。''

我尝试在 flink 配置文件中将taskmanager.memory.network.fraction 更改为 0.5taskmanager.memory.network.max 更改为 15 gbtaskmanager.memory.process.size 更改为 10 gb 。但它仍然给出了同样的错误。除了保存它以查看更改反映在 flink 作业中之外,我是否还需要做其他事情?还是问题是别的?

还让我知道这种方法是否对任务无效,是否还有其他我应该尝试的方法?

我使用单个 32gb ram、8 核服务器在 python 中使用 pyflink 库运行它,kafka 和 elastic 在同一台服务器上运行。

谢谢你。

4

1 回答 1

0

如何为TaskManager配置网络内存可以参考官方文档的设置TaskManager内存页面。有几件事情需要注意:

  1. taskmanager.memory.network.fraction用作网络内存的总 flink 内存。如果派生的大小小于/大于配置的最小/最大大小,将使用最小/最大大小。
  2. 网络内存的大小不能超过总进程内存的大小。
  3. 您可以在 TaskManager 的日志开头找到当前网络内存的最大/最小值。检查它以查看您的配置是否有效。

如果你可以将你的 Flink 升级到 1.14,你可以尝试最新的功能:细粒度的资源管理。使用此功能,网络内存将自动配置为每个 TaskManager 所需的数量。但是,要使用此功能,您需要为每个算子设置 SlotSharingGroups 并为其配置 CPU 和内存资源。更多细节请参考官方文档

于 2021-10-12T11:05:12.733 回答