我正在尝试使用 flink 来丰富多个数据流的数据。
这里我在 account_stream 和 status_stream 中有一些数据。我想将该数据添加到来自多个不同来源的所有其他流中。所有的流在它们的数据中都有一个共同的字段:“account_id”。
这是我采取的方法。
account_stream.connect(status_stream)
.flat_map(EnrichmentFunction())
.filter(lambda x: x['name'] != "-" and x['date'] != "0000-00-00 00:00:00")
.key_by(lambda row: row['account_id'])
.connect(stream1)
.flat_map(function_2())
.filter(lambda x: x!="2")
.key_by(lambda row: row['account_id'])
.connect(stream2)
.flat_map(function_2())
.key_by(lambda row: row['account_id'])
.connect(stream3)
.flat_map(function_3())
.key_by(lambda row: row['account_id'])
.connect(stream4)
.flat_map(function_4())
.key_by(lambda row: row['account_id'])
.connect(stream5)
.flat_map(function_5())
.key_by(lambda row: row['account_id'])
.connect(stream6)
.flat_map(function_6())
.key_by(lambda row: row['account_id'])
.connect(stream7)
.flat_map(function_7())
.key_by(lambda row: row['account_id'])
.connect(stream_8)
.flat_map(function_8())
.map(lambda a: str(a),Types.STRING())
.add_sink(kafka_producer)
我正在将必要的数据保存在状态中,并使用 flat_map 函数将其附加到所有流中。最后添加一个 kafka 接收器来发送所有丰富的状态流。
现在,一旦我执行此操作,就会收到此错误:'' java.io.IOException: Insufficient number of network buffers: required 17, but only 8 available. 网络缓冲区的总数当前设置为 2048 个,每个 32768 字节。''
我尝试在 flink 配置文件中将taskmanager.memory.network.fraction 更改为 0.5,taskmanager.memory.network.max 更改为 15 gb,taskmanager.memory.process.size 更改为 10 gb 。但它仍然给出了同样的错误。除了保存它以查看更改反映在 flink 作业中之外,我是否还需要做其他事情?还是问题是别的?
还让我知道这种方法是否对任务无效,是否还有其他我应该尝试的方法?
我使用单个 32gb ram、8 核服务器在 python 中使用 pyflink 库运行它,kafka 和 elastic 在同一台服务器上运行。
谢谢你。