0

我刚接触 Apache Camel,读它的文档很吃力。下面是我简化后的路由(route)。

// Route "importSuppression": unwraps the BOM, unmarshals the CSV body, splits it row by row,
// drops the rows rejected by filterInvalidRows and batch-inserts the remaining rows in
// groups of up to 10000 through the SQL component. setResult runs once, after the split.
from("direct:importSuppression")
        // Route-scoped error handler: the exception is marked as handled and stored by
        // setException so that setResult can report it.
        .onException(Exception::class.java).handled(true)
            .process { exchange -> setException(exchange) }.id("setException")
            .end()
        .routeId("importSuppression")
        .setExchangePattern(ExchangePattern.InOut)
        .process { exchange -> exchange.getIn().body = BOMInputStream(exchange.getIn().body as InputStream) }.id("unwrapBOM")
        .process { exchange -> unmarshal(exchange) }.id("unmarshalCSV")
        .process { exchange -> createInsertQuery(exchange) }.id("createInsertQuery")
        .split(body()).streaming()
            .process { exchange -> forceAggregateCompletionAtEndOfSplit(exchange) }
            .process { exchange -> convertRowValues(exchange) }.id("convertRowValues")
            // filter() opens its own block: every step up to its matching end() only sees
            // the rows for which the predicate returned true.
            .filter { exchange -> filterInvalidRows(exchange) }.id("filterInvalidRows")
                // NOTE(review): the "complete all groups" header is set on the last split row.
                // If that row is rejected by the filter it never reaches the aggregator, so the
                // final partial batch would presumably not be flushed — confirm and handle.
                .aggregate(constant(true), GroupedBodyAggregationStrategy())
                    .completionSize(10000)
                    .completeAllOnStop()
                    .setHeader(SqlConstants.SQL_PARAMETERS).body()
                    .setBody().header(SQL_INSERT_QUERY)
                    .to("sql://query?useMessageBodyForSql=true&batch=true&dataSource=#dataSource").id("sqlBatchInsert")
                .end() // closes aggregate
            .end() // closes filter (this end() was missing, which left the split open)
        .end() // closes split
        .process { exchange -> setResult(exchange) }

简而言之,我想读取一个 CSV 文件,并把所有有效行插入 SQL 表。在我添加 .filter 来剔除包含无效值的行之前,一切都正常。我不知道自己哪里做错了……

以下是路由中用到的处理(process)和过滤(filter)方法:

/**
 * Unmarshals the CSV stream found in the incoming message body and stores the
 * result back into the body.
 *
 * The per-import state (`exception`, `rowCount`) is reset before parsing starts.
 *
 * Settings noted by the original author for the CSV data format (presumably applied
 * where the data format is built — confirm):
 *  - csvDataFormat.isUseOrderedMaps = Boolean.TRUE
 *  - csvDataFormat.isLazyLoad = true
 *  - csvDataFormat.delimiter = ','
 */
@Throws(Exception::class)
private fun unmarshal(exchange: Exchange) {
    rowCount = 0
    exception = null

    val format = getCSVDelimiter(exchange).csvDataFormat
    format.start()

    val message = exchange.getIn()
    val csvStream = message.body as InputStream?
    message.body = format.unmarshal(exchange, csvStream)
}

/**
 * Builds the parameterized INSERT statement for the target table and stores it in the
 * SQL_INSERT_QUERY header of the incoming message.
 *
 * The statement lists every column name returned by getColumnTypes and contains one
 * `?` placeholder per column.
 */
private fun createInsertQuery(exchange: Exchange) {
    val columnNames = getColumnTypes(exchange).keys
    val columnList = columnNames.joinToString(",")
    // One "?" per column, e.g. three columns -> "?,?,?".
    val placeholders = columnNames.joinToString(",") { "?" }

    val insertQuery = "INSERT INTO ${getTableName(exchange)} ($columnList) VALUES($placeholders)"

    exchange.getIn().headers[SQL_INSERT_QUERY] = insertQuery
}

/**
 * Makes a Camel aggregate() complete on the last message produced by a split(), so the
 * final group does not have to wait for an aggregation timeout.
 *
 * When the "CamelSplitComplete" exchange property is true, the
 * AGGREGATION_COMPLETE_ALL_GROUPS_INCLUSIVE header is set on the incoming message;
 * otherwise the exchange is left untouched.
 */
private fun forceAggregateCompletionAtEndOfSplit(exchange: Exchange) {
    val isLastSplitMessage = exchange.getProperty("CamelSplitComplete", false, Boolean::class.java)
    if (!isLastSplitMessage) {
        return
    }
    exchange.getIn().headers[Exchange.AGGREGATION_COMPLETE_ALL_GROUPS_INCLUSIVE] = true
}

/**
 * Replaces the row map in the message body with the list of values to insert.
 *
 * The resulting list starts with the value returned by getQueryId, followed by the
 * values of every entry whose key is one of the column names returned by
 * getColumnTypes, in the iteration order of the row map. `rowCount` is incremented
 * once per call.
 */
private fun convertRowValues(exchange: Exchange) {
    @Suppress("UNCHECKED_CAST")
    val line = exchange.getIn().body as Map<String, String>
    // Loop-invariant: resolve the known column names once per row, not once per cell.
    val columnNames = getColumnTypes(exchange).keys

    // Build a genuinely mutable list. The former `toList() as MutableList<Any>` cast
    // depended on the runtime class returned by toList(), which only promises a
    // read-only List (immutable instances are returned for empty or single-element
    // results), so the cast or the insertion at index 0 could fail.
    val convertedLine = mutableListOf<Any>(getQueryId(exchange))
    convertedLine.addAll(line.filterKeys { it in columnNames }.values)

    rowCount++

    exchange.getIn().body = convertedLine
}

/**
 * Predicate meant to drop the rows that would make the batch insert fail.
 *
 * Currently a debugging stub: the body is cast to a list of values and every row is
 * accepted. The original author reports that neither `true` nor `false` works here.
 */
private fun filterInvalidRows(exchange: Exchange): Boolean {
    val rowValues = exchange.getIn().body as List<Any>
    return true
}

/**
 * Stores the exception caught by the route in the `exception` field, so that it can be
 * reported later when the final result is built.
 */
private fun setException(exchange: Exchange) {
    val caught = exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception::class.java)
    exception = caught
}

/**
 * Writes the final ImportResult into the message body: a failed result carrying the
 * stored exception when one was recorded, a successful result otherwise. Both variants
 * include the current `rowCount`.
 */
private fun setResult(exchange: Exchange) {
    val failure = exception
    exchange.message.body = if (failure == null) {
        ImportResult(true, rowCount)
    } else {
        ImportResult(false, rowCount, failure)
    }
}

另外,有没有 Camel 专家所在的 Slack 频道可以帮到我?欢迎任何建议。

4

0 回答 0