专家们,
我正在尝试对 larget 数据集执行一些 ETL 操作作为批处理。我的要求是提取数据,对其进行转换,然后保存到 mongoDB。我正在使用 Apache FLINK,但性能非常慢,因为我正在对每一行进行 mongoDB 更新。
有什么方法可以让我们作为批量记录下沉,以便提高性能。就像在所有转换之后我们在 mongoDB 上进行批量更新一样。我们可以将它们全部聚合起来,最后像流 [.aggregate() .sink({bulk update})] 一样将其下沉到数据库中
private DataEnrichmentDO submitToFlinkJobManager(ExecutionEnvironment env,
List<Tuple2<String, Integer>> inputCollection,long collectionSize) throws Exception {
try {
DataSet<Tuple2<String, Integer>> inputCollectionData = env.fromCollection(inputCollection);
DataSet<String> enrichmentContext = env.fromElements(this.clientContext.toString(),
this.collectionContext.toString(), this.enrichColumnDefinitions.toString(),
this.lookupDefinitions.toString(), this.quantityUnitConversions.toString(),
this.technicalDataTypes.toString(), this.errorContext.toString(), this.errorCodeCache.toString(),
this.subCurrencyConversions.toString());
List<DataEnrichmentDO> result = inputCollectionData
.rebalance()
.map(new DataEnrichmentExpressionEvaluator())
.withBroadcastSet(enrichmentContext, "enrichmentContext")
.collect();
我们可以在转换后收集整个集合,然后进行批量 mongoDB 更新吗?目前在地图功能上我正在做更新操作。我已将并行度设置为 8 { setParallelism(8);}