1

我的任务是使用 Camel 版本 2.20.0 创建一个 Camel 路由,该路由从 CSV 文件中获取一行,在 SQL 语句 where 子句中使用该行中的一个值,并合并结果并再次输出它们。如果我在 SQL 语句中对标识符进行硬编码,它可以正常工作,如果我尝试使用动态 URI,我会收到错误消息。

路线是:

from("file:///tmp?fileName=test.csv")
.split()
.tokenize("\n")
.streaming()
.parallelProcessing(true)
.setHeader("userID", constant("1001"))
//.enrich("sql:select emplid,name from employees where emplid = '1001'",
.enrich("sql:select name from employees where emplid = :#userID",
     new AggregationStrategy() {
        public Exchange aggregate(Exchange oldExchange,
                                     Exchange newExchange)    {...

正如我所说,如果我取消注释带有硬编码 1001 的行,它会查询数据库并按预期工作。但是使用 ':#userID' 语法我得到一个 Oracle 错误:

java.sql.SQLSyntaxErrorException: ORA-00942: table or view does not exist


    Message History
    ---------------------------------------------------------------------------------------------------------------------------------------
    RouteId              ProcessorId          Processor                                                                        Elapsed (ms)
    [route3            ] [route3            ] [file:///tmp?fileName=test.csv                                                 ] [        43]
    [route3            ] [log5              ] [log                                                                           ] [         2]
    [route3            ] [setHeader2        ] [setHeader[userID]                                                             ] [         0]
    [route3            ] [enrich2           ] [enrich[constant{sql:select name from employees where emplid = :#userID] [        40]

该表显然在那里,因为它在值被硬编码时工作,因此它与传递动态值有关。我已经尝试了很多关于如何在单引号内传递该变量的变体,使用来自正文而不是标题的值等,但还没有找到有效的组合,尽管我已经看到了很多类似的看似有效的例子.

我打开了跟踪它似乎标题也正确设置:

o.a.camel.processor.interceptor.Tracer   :  >>> (route3) setHeader[userID, 1001] --> enrich[constant{sql:select name from employees where emplid = :#userID}] <<< Pattern:InOnly, Headers:{CamelFileAbsolute=true, CamelFileAbsolutePath=/tmp/test.csv, CamelFileLastModified=1513116018000, CamelFileLength=26, CamelFileName=test.csv, CamelFileNameConsumed=test.csv, CamelFileNameOnly=test.csv, CamelFileParent=/tmp, CamelFilePath=/tmp/test.csv, CamelFileRelativePath=test.csv, userID=1001}, BodyType:String, Body:1001,SomeValue,MoreValues

需要改变什么才能使这项工作有效?

我还应该注意我已经尝试过这种方法,使用各种语法选项来引用标头值,但没有任何运气:

.enrich().simple("sql:select * from employees where emplid = :#${in.header.userID}").aggregate ...
4

2 回答 2

1

来自骆驼文档:

pollEnrich 或enrich不会访问来自当前Exchange 的任何数据,这意味着在轮询时它不能使用您可能在Exchange 上设置的任何现有标题。

实现您想要的目标的推荐方法是使用recipientList,所以我建议您阅读它。

编辑:

正如 Ricardo Zanini 在他的回答中正确指出的那样,从 2.16 开始,使用 Camel 版本实际上可以实现这一点。由于 OP 使用的是 2.20,我的回答无效。

但是,我会保留我的答案,但要指出的是,这仅在您使用的是 2.16 之前的旧版本时才有效。

于 2017-12-13T07:20:42.607 回答
1

文档

Camel 2.16 开始, enrichpollEnrich都支持使用 Expression 来计算 uri 的动态端点,这允许使用来自当前 Exchange 的数据。换句话说,上面所说的所有内容都不再适用,它只是有效。

当你使用 2.20 时,我想你可以试试这个例子:

from("file:///tmp?fileName=test.csv")
.split()
.tokenize("\n")
.streaming()
.parallelProcessing(true)
.setHeader("userID", constant("1001"))
//.enrich("sql:select emplid,name from employees where emplid = '1001'",
.enrich("sql:select name from employees where emplid = ':#${in.header.userID}'",
    new AggregationStrategy() {
        public Exchange aggregate(Exchange oldExchange,
                                    Exchange newExchange)    {...

查看文档中的表达式主题以获取更多示例。

总而言之,表达式可以是:

"sql:select name from employees where emplid = ':#${in.header.userID}'"

编辑

对不起,我错过了:# 后缀。你可以看到一个单元测试在这里工作

只需注意列类型。如果它是整数,则不需要引号。

干杯!

于 2017-12-13T11:51:34.843 回答