我刚开始接触 Apache Camel,觉得它的文档很难读懂。下面是我简化后的路由(route)定义。
// Route "importSuppression": strips a BOM from the incoming stream, parses it as CSV,
// prepares the INSERT statement, then splits the rows, converts and filters each one,
// and batch-inserts the accepted rows in groups of up to 10000.
from("direct:importSuppression")
    // Route-scoped error handler: exceptions are marked as handled and handed to setException.
    .onException(Exception::class.java).handled(true)
        .process { exchange -> setException(exchange) }.id("setException")
    .end()
    .routeId("importSuppression")
    .setExchangePattern(ExchangePattern.InOut)
    .process { exchange -> exchange.getIn().body = BOMInputStream(exchange.getIn().body as InputStream) }.id("unwrapBOM")
    .process { exchange -> unmarshal(exchange) }.id("unmarshalCSV")
    .process { exchange -> createInsertQuery(exchange) }.id("createInsertQuery")
    .split(body()).streaming()
        .process { exchange -> forceAggregateCompletionAtEndOfSplit(exchange) }
        .process { exchange -> convertRowValues(exchange) }.id("convertRowValues")
        // filter() opens its own block in the DSL, exactly like split() and aggregate(),
        // and therefore needs its own end(). Without it the end() calls below close
        // aggregate and filter only, and setResult ends up inside the split.
        // NOTE(review): if the last split row is rejected here, the completion header set by
        // forceAggregateCompletionAtEndOfSplit never reaches the aggregator, so the final
        // partial batch may only be flushed on stop — confirm and handle if needed.
        .filter { exchange -> filterInvalidRows(exchange) }.id("filterInvalidRows")
            .aggregate(constant(true), GroupedBodyAggregationStrategy())
                .completionSize(10000)
                .completeAllOnStop()
                // The grouped rows become the batch parameters; the body becomes the SQL text.
                .setHeader(SqlConstants.SQL_PARAMETERS).body()
                .setBody().header(SQL_INSERT_QUERY)
                .to("sql://query?useMessageBodyForSql=true&batch=true&dataSource=#dataSource").id("sqlBatchInsert")
            .end() // closes aggregate
        .end() // closes filter
    .end() // closes split
    .process { exchange -> setResult(exchange) }
简而言之,我想读取一个 CSV 文件,并把其中所有有效的行插入到 SQL 表中。在我添加 .filter 来剔除包含无效值的行之前,一切都运行正常。我不知道自己哪里做错了……
以下是路由中用到的各个处理(process)和过滤(filter)方法:
/**
 * Parses the CSV stream held in the incoming message body and replaces the body
 * with whatever the CSV data format's unmarshal call returns.
 *
 * Resets the per-import state ([exception] and [rowCount]) before parsing.
 *
 * NOTE(review): the commented-out settings that accompanied this method suggest the data
 * format is configured with isUseOrderedMaps = true, isLazyLoad = true and ',' as the
 * delimiter — confirm against getCSVDelimiter.
 *
 * @param exchange the exchange whose incoming body is cast to an [InputStream]
 */
@Throws(Exception::class)
private fun unmarshal(exchange: Exchange) {
    // Start every import from a clean slate.
    rowCount = 0
    exception = null

    val dataFormat = getCSVDelimiter(exchange).csvDataFormat
    dataFormat.start()

    val message = exchange.getIn()
    val csvStream = message.body as InputStream?
    message.body = dataFormat.unmarshal(exchange, csvStream)
}
/**
 * Builds the parameterized INSERT statement and stores it in the [SQL_INSERT_QUERY]
 * header of the incoming message.
 *
 * The statement names the table returned by getTableName, lists every column key
 * returned by getColumnTypes, and carries one `?` placeholder per column.
 *
 * @param exchange the exchange the table name and column types are resolved from
 */
private fun createInsertQuery(exchange: Exchange) {
    val columnNames = getColumnTypes(exchange).keys
    val tableName = getTableName(exchange)

    // One "?" per column, comma separated.
    val placeholders = List(columnNames.size) { "?" }.joinToString(",")
    val columnList = columnNames.joinToString(",")

    val message = exchange.getIn()
    message.headers[SQL_INSERT_QUERY] = format("INSERT INTO %s (%s) VALUES(%s)", tableName, columnList, placeholders)
}
/**
 * Flags the last message of a split() so that a downstream aggregate() completes
 * immediately instead of waiting for an aggregation timeout.
 *
 * When the "CamelSplitComplete" exchange property is true, the
 * [Exchange.AGGREGATION_COMPLETE_ALL_GROUPS_INCLUSIVE] header is set to true on the
 * incoming message; otherwise the exchange is left untouched.
 */
private fun forceAggregateCompletionAtEndOfSplit(exchange: Exchange) {
    // The property defaults to false when it is absent.
    val isLastSplitMessage = exchange.getProperty("CamelSplitComplete", false, Boolean::class.java)
    if (!isLastSplitMessage) {
        return
    }
    val message = exchange.getIn()
    message.headers[Exchange.AGGREGATION_COMPLETE_ALL_GROUPS_INCLUSIVE] = true
}
/**
 * Replaces the row map in the message body with a flat list of values.
 *
 * The list starts with the value returned by getQueryId, followed by the values of
 * every entry whose key is one of the column names returned by getColumnTypes, in the
 * map's iteration order. Also increments [rowCount].
 *
 * @param exchange the exchange whose incoming body is cast to a `Map<String, String>`
 */
private fun convertRowValues(exchange: Exchange) {
    @Suppress("UNCHECKED_CAST")
    val line = exchange.getIn().body as Map<String, String>

    // Resolve the known columns once instead of once per map entry.
    val knownColumns = getColumnTypes(exchange).keys
    val rowValues = line.filterKeys { it in knownColumns }.values

    // Build a real mutable list instead of downcasting the read-only result of map().
    val convertedLine = ArrayList<Any>(rowValues.size + 1)
    convertedLine.add(getQueryId(exchange))
    convertedLine.addAll(rowValues)

    rowCount++
    exchange.getIn().body = convertedLine
}
/**
 * Predicate intended to drop rows that would make the batch insert fail.
 *
 * Currently a placeholder: the body is cast to a list but not inspected, and every
 * row is accepted. The result was hard-coded (true or false) for debugging; the
 * author reports that neither value works.
 *
 * @return always true
 */
private fun filterInvalidRows(exchange: Exchange): Boolean {
    val message = exchange.getIn()
    // Not inspected yet; the cast still requires the body to be a List.
    val row: List<Any> = message.body as List<Any>
    return true
}
/**
 * Stores the exception caught by the route in [exception] so that it is still
 * available later when the final result is built.
 *
 * @param exchange the exchange carrying the [Exchange.EXCEPTION_CAUGHT] property
 */
private fun setException(exchange: Exchange) {
    val caught = exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception::class.java)
    exception = caught
}
/**
 * Writes the final [ImportResult] into the exchange message body.
 *
 * If an exception was recorded in [exception], the result is a failure carrying
 * that exception; otherwise it is a success. Both variants carry [rowCount].
 */
private fun setResult(exchange: Exchange) {
    // Read the mutable property once so the null check and the value passed on agree.
    val failure = exception
    exchange.message.body = if (failure != null) {
        ImportResult(false, rowCount, failure)
    } else {
        ImportResult(true, rowCount)
    }
}
另外,有没有 Camel 高手聚集的 Slack 频道可以让我去求助?欢迎任何建议。