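I'm trying to find a workaround for the problem I described in my previous question, where DataWeave doesn't properly read CSV files that don't conform to RFC 4180. I decided to write a Java class that implements org.mule.api.lifecycle.Callable, reads the InputStream I have (which could come from a file or an HTTP response), removes the lines that can't be handled (I know this approach is overkill here), and pipes the readable lines into a new stream. Unfortunately, I keep getting a 0-byte output and an exception from DataWeave.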
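EDIT: To clarify, this question can stand on its own apart from the other one; I've wondered before how to do something like this. It just happens that I'm tackling it now as a workaround for the other problem.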
Here is my CSV file:
Column A,Column B,Column C,Column D
A,B,C,D
A,BB,CCCC,DDDDDDDD
A,BBB,CCCCCCCCC,DDDDDDDDDDDDDDDDDDDDDDDDDDD
A,Something Weird",C,D
A,B,Something Else" Weird,D,
A,",S,o,m,e,t,h,i,n,g, ,N,o,r,m,a,l,",C,D
A,B,C,D
A,B,C,
A,B,,D
A,B,,
A,,C,D
A,,C,
A,,,D
A,,,
,B,C,D
,B,C,
,B,,D
,B,,
,,C,D
,,C,
,,,D
,,,
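To see exactly which rows the quote filter drops, the same rule (drop every line that contains a double quote) can be run over a few sample rows outside Mule. This is a minimal plain-Java sketch; QuoteLineFilter and keepUnquotedLines are hypothetical names and are not part of the flow.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that applies the same rule as removeLinesWithQuotes
// (drop every line containing a double quote) to an in-memory string.
public class QuoteLineFilter {

    public static List<String> keepUnquotedLines(String csv) {
        List<String> kept = new ArrayList<>();
        try (BufferedReader br = new BufferedReader(new StringReader(csv))) {
            String line;
            while ((line = br.readLine()) != null) {
                if (!line.contains("\"")) {
                    kept.add(line);
                }
            }
        } catch (IOException e) {
            // StringReader does no real I/O, so this is not expected to happen.
            throw new UncheckedIOException(e);
        }
        return kept;
    }

    public static void main(String[] args) {
        // A few rows taken from the sample file, including one quoted row.
        String sample = String.join("\n",
                "Column A,Column B,Column C,Column D",
                "A,B,C,D",
                "A,Something Weird\",C,D",
                "A,,,D",
                ",,,");
        // Prints every row except A,Something Weird",C,D
        for (String line : keepUnquotedLines(sample)) {
            System.out.println(line);
        }
    }
}
```

Applied to the whole file above, this rule removes the three rows that contain a double quote and keeps the other 20 lines (header included), which matches the CSV output shown further down when DataWeave is omitted.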
Here is my flow:
<flow name="nonrfcFlow">
    <file:inbound-endpoint path="C:\tmp\" moveToPattern="nonrfc-read.csv" moveToDirectory="C:\tmp\" connector-ref="File-Configuration" responseTimeout="10000" mimeType="application/csv" metadata:id="a344bc19-5643-4bfb-b8c2-2994d7997c75" doc:name="File">
        <file:filename-regex-filter pattern="nonrfc\.csv" caseSensitive="true"/>
    </file:inbound-endpoint>
    <flow-ref name="removequotedlines" doc:name="removequotedlines"/>
    <dw:transform-message doc:name="Transform Message" metadata:id="5b01fcc4-2a1c-42fb-9cab-2defed9a1161">
        <dw:set-payload><![CDATA[%dw 1.0
%input payload application/csv
%output application/json
---
payload
]]></dw:set-payload>
    </dw:transform-message>
    <file:outbound-endpoint path="C:\tmp" outputPattern="nonrfc-output.json" connector-ref="OutputFileConfiguration" responseTimeout="10000" doc:name="File"/>
</flow>
<sub-flow name="removequotedlines">
    <component class="com.stackoverflow.removeLinesWithQuotes" doc:name="Remove Lines with Quotes"/>
</sub-flow>
Here is the Java file:
package com.stackoverflow;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import org.apache.log4j.Logger;
import org.jfree.data.io.CSV;
import org.mule.api.MuleEventContext;
import org.mule.api.lifecycle.Callable;
import org.mule.transformer.types.ListDataType;

public class removeLinesWithQuotes implements Callable {

    @Override
    public Object onCall(MuleEventContext eventContext) throws Exception {
        Logger logger = Logger.getLogger(removeLinesWithQuotes.class);

        // The incoming payload (from the file or HTTP endpoint) as a stream
        InputStream is = (InputStream) eventContext.getMessage().getPayload();
        BufferedReader br = new BufferedReader(new InputStreamReader(is));

        // Pipe that receives the filtered lines; its input side is returned as the new payload
        PipedOutputStream pos = new PipedOutputStream();
        PipedInputStream pis = new PipedInputStream();
        pis.connect(pos);
        BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(pos));

        // Copy only the lines that contain no double quote
        String line;
        while ((line = br.readLine()) != null) {
            if (!line.contains("\"")) {
                bw.write(line);
                logger.debug(line);
                bw.write("\r\n");
                bw.flush();
            }
        }
        is.close();

        //eventContext.getMessage().setPayload(pis); // doesn't work
        //eventContext.getMessage().setPayload(pis, new ListDataType<CSV>(CSV.class, "application/csv")); // doesn't work
        return pis; // doesn't work
    }
}
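One plausible cause, judging from the stack trace (an assumption, not verified here): the BufferedWriter is never closed, so once DataWeave has drained the pipe, PipedInputStream keeps waiting for more data and eventually throws "Pipe broken" when the thread that wrote into it is no longer alive. Writing on the same thread that hands the pipe off also means any output larger than the pipe's internal buffer (1024 bytes by default) would block before anyone reads it. The plain-Java sketch below (no Mule; PipedFilterSketch, filtered and readAll are hypothetical names) writes on a separate thread and closes the writer when done, so the reading side gets a normal end-of-stream.

```java
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical standalone sketch (no Mule): filter lines through a pipe without
// blocking the caller and without leaving the reader waiting for more data.
public class PipedFilterSketch {

    // Returns a stream of the lines in 'source' that contain no double quote.
    // Assumes UTF-8 input; the original class uses the platform default charset.
    public static InputStream filtered(InputStream source) throws IOException {
        PipedInputStream pis = new PipedInputStream();
        PipedOutputStream pos = new PipedOutputStream(pis);

        // Copy on a separate thread so output larger than the pipe buffer
        // (1024 bytes by default) cannot block the thread that returns 'pis'.
        Thread copier = new Thread(() -> {
            // try-with-resources closes the writer (and thus the pipe) at the end,
            // which lets the reading side see end-of-stream (-1).
            try (BufferedReader br = new BufferedReader(
                         new InputStreamReader(source, StandardCharsets.UTF_8));
                 BufferedWriter bw = new BufferedWriter(
                         new OutputStreamWriter(pos, StandardCharsets.UTF_8))) {
                String line;
                while ((line = br.readLine()) != null) {
                    if (!line.contains("\"")) {
                        bw.write(line);
                        bw.write("\r\n");
                    }
                }
            } catch (IOException e) {
                // Sketch only: a real component needs a way to report this to the reader.
                e.printStackTrace();
            }
        }, "csv-quote-filter");
        copier.setDaemon(true);
        copier.start();
        return pis;
    }

    // Reads a stream to the end (Java 8 compatible) and decodes it as UTF-8.
    public static String readAll(InputStream in) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buf = new byte[4096];
        int n;
        while ((n = in.read(buf)) != -1) {
            out.write(buf, 0, n);
        }
        return new String(out.toByteArray(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws IOException {
        String csv = "Column A,Column B\nA,B\nA,\"B\"\n";
        try (InputStream in = filtered(
                new ByteArrayInputStream(csv.getBytes(StandardCharsets.UTF_8)))) {
            System.out.print(readAll(in)); // the quoted row is dropped
        }
    }
}
```

The key differences from the class above are the separate writer thread and closing the writer; how this maps onto Mule's threading for a Callable component is not covered by this sketch.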
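According to this answer, I should be able to return an InputStream, and PipedInputStream is one. If I comment out either the flow-ref or the DataWeave component, I do get output: respectively, JSON full of garbage (because the DataWeave CSV parser behaves oddly when the RFC isn't followed) or the CSV with the offending lines removed. In other words, the Java class and the DataWeave component each work on their own, but not together.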
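Since any InputStream should do as a return value, another option that avoids pipes and extra threads entirely is to filter the whole payload into memory and return a ByteArrayInputStream. A minimal sketch, assuming the CSV is small enough to hold in memory and is UTF-8; BufferedFilterSketch is a hypothetical name, and wiring it into onCall is left out.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

// Hypothetical alternative: materialize the filtered CSV instead of piping it.
public class BufferedFilterSketch {

    // Reads 'source' to the end, keeps lines without a double quote, and returns
    // them as an in-memory stream. No second thread and no pipe are involved.
    public static InputStream filtered(InputStream source) throws IOException {
        StringBuilder sb = new StringBuilder();
        try (BufferedReader br = new BufferedReader(
                new InputStreamReader(source, StandardCharsets.UTF_8))) {
            String line;
            while ((line = br.readLine()) != null) {
                if (!line.contains("\"")) {
                    sb.append(line).append("\r\n");
                }
            }
        }
        return new ByteArrayInputStream(sb.toString().getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) throws IOException {
        String csv = "Column A,Column B\nA,B\nA,\"B\"\n";
        InputStream in = filtered(new ByteArrayInputStream(csv.getBytes(StandardCharsets.UTF_8)));
        try (BufferedReader r = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
            r.lines().forEach(System.out::println); // prints the header and A,B
        }
    }
}
```

The trade-off is memory: the whole filtered file is held at once, whereas a pipe streams it.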
Here is the output when the Java class is left out of the flow:
[
  {
    "Column A": "A",
    "Column B": "B",
    "Column C": "C",
    "Column D": "D"
  },
  {
    "Column A": "A",
    "Column B": "BB",
    "Column C": "CCCC",
    "Column D": "DDDDDDDD"
  },
  {
    "Column A": "A",
    "Column B": "BBB",
    "Column C": "CCCCCCCCC",
    "Column D": "DDDDDDDDDDDDDDDDDDDDDDDDDDD"
  },
  {
    "Column A": "A",
    "Column B": ",C,D\r\nA,B,Something Else",
    "Column C": "D",
    "Column D": ""
  },
  {
    "Column A": "A",
    "Column B": ",S,o,m,e,t,h,i,n,g, ,N,o,r,m,a,l,",
    "Column C": "C",
    "Column D": "D "
  },
  {
    "Column A": "A",
    "Column B": "B",
    "Column C": "C",
    "Column D": "D"
  },
  {
    "Column A": "A",
    "Column B": "B",
    "Column C": "C",
    "Column D": ""
  },
  {
    "Column A": "A",
    "Column B": "B",
    "Column C": "",
    "Column D": "D"
  },
  {
    "Column A": "A",
    "Column B": "B",
    "Column C": "",
    "Column D": ""
  },
  {
    "Column A": "A",
    "Column B": "",
    "Column C": "C",
    "Column D": "D"
  },
  {
    "Column A": "A",
    "Column B": "",
    "Column C": "C",
    "Column D": ""
  },
  {
    "Column A": "A",
    "Column B": "",
    "Column C": "",
    "Column D": "D"
  },
  {
    "Column A": "A",
    "Column B": "",
    "Column C": "",
    "Column D": ""
  },
  {
    "Column A": "",
    "Column B": "B",
    "Column C": "C",
    "Column D": "D"
  },
  {
    "Column A": "",
    "Column B": "B",
    "Column C": "C",
    "Column D": ""
  },
  {
    "Column A": "",
    "Column B": "B",
    "Column C": "",
    "Column D": "D"
  },
  {
    "Column A": "",
    "Column B": "B",
    "Column C": "",
    "Column D": ""
  },
  {
    "Column A": "",
    "Column B": "",
    "Column C": "C",
    "Column D": "D"
  },
  {
    "Column A": "",
    "Column B": "",
    "Column C": "C",
    "Column D": ""
  },
  {
    "Column A": "",
    "Column B": "",
    "Column C": "",
    "Column D": "D"
  },
  {
    "Column A": "",
    "Column B": "",
    "Column C": "",
    "Column D": ""
  }
]
Here is the output when DataWeave is left out:
Column A,Column B,Column C,Column D
A,B,C,D
A,BB,CCCC,DDDDDDDD
A,BBB,CCCCCCCCC,DDDDDDDDDDDDDDDDDDDDDDDDDDD
A,B,C,D
A,B,C,
A,B,,D
A,B,,
A,,C,D
A,,C,
A,,,D
A,,,
,B,C,D
,B,C,
,B,,D
,B,,
,,C,D
,,C,
,,,D
,,,
Here is the exception:
INFO 2015-12-03 17:01:54,716 [[asdf].OutputFileConfiguration.dispatcher.01] org.mule.transport.file.FileConnector: Writing file to: C:\tmp\nonrfc-output.json
ERROR 2015-12-03 17:02:56,817 [[asdf].OutputFileConfiguration.dispatcher.01] org.mule.exception.DefaultMessagingExceptionStrategy:
********************************************************************************
Message : Failed to route event via endpoint: DefaultOutboundEndpoint{endpointUri=file:///C:/tmp/, connector=FileConnector
{
name=OutputFileConfiguration
lifecycle=start
this=6f3e19b3
numberOfConcurrentTransactedReceivers=4
createMultipleTransactedReceivers=true
connected=true
supportedProtocols=[file]
serviceOverrides=<none>
}
, name='endpoint..C.tmp', mep=ONE_WAY, properties={outputPattern=nonrfc-output.json}, transactionConfig=Transaction{factory=null, action=INDIFFERENT, timeout=0}, deleteUnacceptedMessages=false, initialState=started, responseTimeout=10000, endpointEncoding=UTF-8, disableTransportTransformer=false}. Message payload is of type: WeaveMessageProcessor$WeaveOutputHandler
Type : org.mule.api.transport.DispatchException
Code : MULE_ERROR--2
JavaDoc : http://www.mulesoft.org/docs/site/current3/apidocs/org/mule/api/transport/DispatchException.html
Payload : com.mulesoft.weave.mule.WeaveMessageProcessor$WeaveOutputHandler@3e274324
********************************************************************************
Exception stack is:
1. Pipe broken (java.io.IOException)
java.io.PipedInputStream:-1 (null)
2. Failed to route event via endpoint: DefaultOutboundEndpoint{endpointUri=file:///C:/tmp/, connector=FileConnector
{
name=OutputFileConfiguration
lifecycle=start
this=6f3e19b3
numberOfConcurrentTransactedReceivers=4
createMultipleTransactedReceivers=true
connected=true
supportedProtocols=[file]
serviceOverrides=<none>
}
, name='endpoint..C.tmp', mep=ONE_WAY, properties={outputPattern=nonrfc-output.json}, transactionConfig=Transaction{factory=null, action=INDIFFERENT, timeout=0}, deleteUnacceptedMessages=false, initialState=started, responseTimeout=10000, endpointEncoding=UTF-8, disableTransportTransformer=false}. Message payload is of type: WeaveMessageProcessor$WeaveOutputHandler (org.mule.api.transport.DispatchException)
org.mule.transport.AbstractMessageDispatcher:117 (http://www.mulesoft.org/docs/site/current3/apidocs/org/mule/api/transport/DispatchException.html)
********************************************************************************
Root Exception stack trace:
java.io.IOException: Pipe broken
at java.io.PipedInputStream.read(Unknown Source)
at java.io.PipedInputStream.read(Unknown Source)
at com.mulesoft.weave.reader.DefaultSeekableStream.readUntil(SeekableStream.scala:146)
at com.mulesoft.weave.reader.DefaultSeekableStream.delegate$lzycompute(SeekableStream.scala:153)
at com.mulesoft.weave.reader.DefaultSeekableStream.delegate(SeekableStream.scala:151)
at com.mulesoft.weave.reader.DefaultSeekableStream.seek(SeekableStream.scala:189)
at com.mulesoft.weave.reader.UTF8StreamSourceReader.seek(StreamSourceReader.scala:121)
at com.mulesoft.weave.reader.csv.parser.CSVParser.parse(CSVParser.scala:93)
at com.mulesoft.weave.reader.csv.parser.CSVParser.elementAt(CSVParser.scala:54)
at com.mulesoft.weave.reader.csv.parser.CSVParser.contains(CSVParser.scala:38)
at com.mulesoft.weave.reader.csv.CSVRecordsValue$$anon$1.hasNext(CSVReader.scala:52)
at scala.collection.Iterator$class.toStream(Iterator.scala:1188)
at com.mulesoft.weave.reader.csv.CSVRecordsValue$$anon$1.toStream(CSVReader.scala:50)
at com.mulesoft.weave.writer.json.JsonWriter.writeArray(JsonWriter.scala:156)
at com.mulesoft.weave.writer.json.JsonWriter.writeValue(JsonWriter.scala:137)
at com.mulesoft.weave.model.values.Value$class.write(Value.scala:31)
at com.mulesoft.weave.reader.csv.CSVRecordsValue.write(CSVReader.scala:47)
at com.mulesoft.weave.model.values.wrappers.DelegateValue$class.write(DelegateValue.scala:29)
at com.mulesoft.weave.engine.ast.variables.VariableReferenceNode.write(VariableReferenceNode.scala:9)
at com.mulesoft.weave.engine.Engine.internalExecute(Engine.scala:89)
at com.mulesoft.weave.engine.Engine.execute(Engine.scala:54)
at com.mulesoft.weave.engine.Engine.execute(Engine.scala:169)
at com.mulesoft.weave.mule.WeaveMessageProcessor$WeaveOutputHandler.write(WeaveMessageProcessor.scala:159)
at org.mule.transport.file.FileMessageDispatcher.doDispatch(FileMessageDispatcher.java:75)
at org.mule.transport.AbstractMessageDispatcher.process(AbstractMessageDispatcher.java:107)
at org.mule.transport.AbstractConnector$DispatcherMessageProcessor.process(AbstractConnector.java:2686)
at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:24)
at org.mule.execution.MessageProcessorNotificationExecutionInterceptor.execute(MessageProcessorNotificationExecutionInterceptor.java:107)
at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:44)
at org.mule.processor.BlockingProcessorExecutor.executeNext(BlockingProcessorExecutor.java:94)
at org.mule.processor.BlockingProcessorExecutor.execute(BlockingProcessorExecutor.java:56)
at org.mule.interceptor.AbstractEnvelopeInterceptor.processBlocking(AbstractEnvelopeInterceptor.java:58)
at org.mule.processor.AbstractRequestResponseMessageProcessor.process(AbstractRequestResponseMessageProcessor.java:47)
at org.mule.processor.AsyncInterceptingMessageProcessor.processNextTimed(AsyncInterceptingMessageProcessor.java:123)
at org.mule.processor.AsyncInterceptingMessageProcessor$AsyncMessageProcessorWorker$1.process(AsyncInterceptingMessageProcessor.java:208)
at org.mule.processor.AsyncInterceptingMessageProcessor$AsyncMessageProcessorWorker$1.process(AsyncInterceptingMessageProcessor.java:201)
at org.mule.execution.ExecuteCallbackInterceptor.execute(ExecuteCallbackInterceptor.java:16)
at org.mule.execution.CommitTransactionInterceptor.execute(CommitTransactionInterceptor.java:35)
at org.mule.execution.CommitTransactionInterceptor.execute(CommitTransactionInterceptor.java:22)
at org.mule.execution.HandleExceptionInterceptor.execute(HandleExceptionInterceptor.java:30)
at org.mule.execution.HandleExceptionInterceptor.execute(HandleExceptionInterceptor.java:14)
at org.mule.execution.BeginAndResolveTransactionInterceptor.execute(BeginAndResolveTransactionInterceptor.java:67)
at org.mule.execution.ResolvePreviousTransactionInterceptor.execute(ResolvePreviousTransactionInterceptor.java:44)
at org.mule.execution.SuspendXaTransactionInterceptor.execute(SuspendXaTransactionInterceptor.java:50)
at org.mule.execution.ValidateTransactionalStateInterceptor.execute(ValidateTransactionalStateInterceptor.java:40)
at org.mule.execution.IsolateCurrentTransactionInterceptor.execute(IsolateCurrentTransactionInterceptor.java:41)
at org.mule.execution.ExternalTransactionInterceptor.execute(ExternalTransactionInterceptor.java:48)
at org.mule.execution.RethrowExceptionInterceptor.execute(RethrowExceptionInterceptor.java:28)
at org.mule.execution.RethrowExceptionInterceptor.execute(RethrowExceptionInterceptor.java:13)
at org.mule.execution.TransactionalErrorHandlingExecutionTemplate.execute(TransactionalErrorHandlingExecutionTemplate.java:110)
at org.mule.execution.TransactionalErrorHandlingExecutionTemplate.execute(TransactionalErrorHandlingExecutionTemplate.java:30)
at ...
********************************************************************************