0

专家们,

我正在尝试对 larget 数据集执行一些 ETL 操作作为批处理。我的要求是提取数据,对其进行转换,然后保存到 mongoDB。我正在使用 Apache FLINK,但性能非常慢,因为我正在对每一行进行 mongoDB 更新。

有什么方法可以让我们作为批量记录下沉,以便提高性能。就像在所有转换之后我们在 mongoDB 上进行批量更新一样。我们可以将它们全部聚合起来,最后像流 [.aggregate() .sink({bulk update})] 一样将其下沉到数据库中


private DataEnrichmentDO submitToFlinkJobManager(ExecutionEnvironment env,
            List<Tuple2<String, Integer>> inputCollection,long collectionSize) throws Exception  {
        
        try {
            DataSet<Tuple2<String, Integer>> inputCollectionData = env.fromCollection(inputCollection);
            DataSet<String> enrichmentContext = env.fromElements(this.clientContext.toString(),
                    this.collectionContext.toString(), this.enrichColumnDefinitions.toString(),
                    this.lookupDefinitions.toString(), this.quantityUnitConversions.toString(),
                    this.technicalDataTypes.toString(), this.errorContext.toString(), this.errorCodeCache.toString(),
                    this.subCurrencyConversions.toString());
            List<DataEnrichmentDO> result = inputCollectionData
                                            .rebalance()
                                            .map(new DataEnrichmentExpressionEvaluator())
                                            .withBroadcastSet(enrichmentContext, "enrichmentContext")
                                            .collect();

我们可以在转换后收集整个集合,然后进行批量 mongoDB 更新吗?目前在地图功能上我正在做更新操作。我已将并行度设置为 8 { setParallelism(8);}

4

0 回答 0