1

我的问题是 (1) 是否有更好的策略来解决我的问题 (2) 是否可以调整/改进我的解决方案以便它可以工作并且不会以可靠的方式拆分聚合 (3 不太重要的方式) 如何可以我更智能地调试它?弄清楚聚合器正在做的 wtf 是很困难的,因为它只会在由于大小而难以调试的大批量上失败。任何这些的答案都会非常有用,最重要的是前两个。

我认为问题在于我没有正确地向骆驼表达我需要它来将传入的 CSV 文件视为一个整体,并且我不希望聚合器停止直到所有记录都被聚合。

我正在编写一条路径来消化一百万行 CSV 文件,拆分然后聚合一些关键主字段上的数据,然后将聚合记录写入表

不幸的是,表的主要约束被违反(这也对应于聚合键),这意味着聚合器没有等待整个输入完成。

它适用于几千条记录的小文件,但在生产中实际面临的大文件(1,000,000 条记录)它失败了。

首先,它在 CSV 解组后在拆分时出现 JavaHeap 内存错误而失败。我用 .streaming() 解决了这个问题。这会影响聚合器,聚合器“完成”过早。

为了显示:

A 1 
A 2 
B 2
--- aggregator split --- 
B 1
A 2

--> A(3),B(2) ... A(2),B(1) = constraint violation because 2 lots of A's etc.
when what I want is A(5),B(3)

以 100、1000 等为例,记录它工作正常且正确。但是当它处理 1,000,000 条记录(这是它需要处理的实际大小)时,首先 split() 会收到 OutOfJavaHeapSpace 异常。

我觉得简单地更改堆大小将是一个短期解决方案,并且只是将问题推回直到下一个记录上限出现,所以我通过在拆分上使用 .streaming() 来解决它。

不幸的是,现在,聚合器正在滴灌记录,没有让它们陷入困境,而且它似乎提前完成并进行另一个聚合,这违反了我的主要约束

from( file://inbox )
.unmarshall().bindy().
.split().body().streaming()

.setHeader( "X" Expression building string of primary-key fields)
.aggregate( header("X") ... ).completionTimeout( 15000 )

etc.

我认为问题的一部分是我依赖于流式拆分的超时时间不超过固定的时间,这并不是万无一失的——例如,系统任务可能会合理地导致这种情况,等等。而且每次我增加这个timeout 它使调试和测试这些东西的时间越来越长。

可能更好的解决方案是读取传入的 CSV 文件的大小,并且在处理完每条记录之前不允许聚合器完成。但是,我不知道如何用骆驼表达这一点。

很可能我只是对我应该如何处理/描述这个问题有一个基本的策略误解。我不知道可能有更好(更简单)的方法。

还有如此大量的记录进入,我无法实际手动调试它们以了解正在发生的事情(我怀疑我也在打破聚合器的超时)

4

1 回答 1

3

您可以先将文件逐行拆分,然后将每一行转换为 CSV。然后您可以在流模式下运行拆分器,因此内存消耗低,并且能够读取具有一百万条记录的文件。

此页面http://camel.apache.org/articles有一些关于在 Camel 中拆分大文件的博客链接。它们虽然涵盖了 XML,但也与拆分大 CSV 文件有关。

于 2013-02-25T17:48:18.763 回答