1

我试图找到一个解决方法来解决我在之前的问题中描述的问题,其中 DataWeave 没有读取不符合RFC 4180的 CSV 文件。我决定编写一个 Java 类来实现org.mule.api.lifecycle.Callable读取InputStream我所拥有的(可能来自文件或 HTTP 响应),删除无法处理的行(我知道这里的实现太多了),并将可读行通过管道传输到新流中。不幸的是,我不断从 DataWeave 得到一个 0 字节的输出和一个异常。

编辑:为了澄清,这个问题可能独立于另一个问题而存在,但我以前想知道如何做这样的事情。碰巧我正试图解决这个问题,作为另一个问题的变通解决方案。

这是我的 CSV 文件:

Column A,Column B,Column C,Column D
A,B,C,D
A,BB,CCCC,DDDDDDDD
A,BBB,CCCCCCCCC,DDDDDDDDDDDDDDDDDDDDDDDDDDD
A,Something Weird",C,D
A,B,Something Else" Weird,D,
A,",S,o,m,e,t,h,i,n,g, ,N,o,r,m,a,l,",C,D   
A,B,C,D
A,B,C,
A,B,,D
A,B,,
A,,C,D
A,,C,
A,,,D
A,,,
,B,C,D
,B,C,
,B,,D
,B,,
,,C,D
,,C,
,,,D
,,,

这是我的流程:

<flow name="nonrfcFlow">
    <file:inbound-endpoint path="C:\tmp\" moveToPattern="nonrfc-read.csv" moveToDirectory="C:\tmp\" connector-ref="File-Configuration" responseTimeout="10000" mimeType="application/csv" metadata:id="a344bc19-5643-4bfb-b8c2-2994d7997c75" doc:name="File">
        <file:filename-regex-filter pattern="nonrfc\.csv" caseSensitive="true"/>
    </file:inbound-endpoint>
    <flow-ref name="removequotedlines" doc:name="removequotedlines"/>
    <dw:transform-message doc:name="Transform Message" metadata:id="5b01fcc4-2a1c-42fb-9cab-2defed9a1161">
        <dw:set-payload><![CDATA[%dw 1.0
%input payload application/csv
%output application/json
---
payload
]]></dw:set-payload>
    </dw:transform-message>
    <file:outbound-endpoint path="C:\tmp" outputPattern="nonrfc-output.json" connector-ref="OutputFileConfiguration" responseTimeout="10000" doc:name="File"/>
</flow>

<sub-flow name="removequotedlines">
    <component class="com.stackoverflow.removeLinesWithQuotes" doc:name="Remove Lines with Quotes"/>
</sub-flow>

这是Java文件:

package com.stackoverflow;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;

import org.apache.log4j.Logger;
import org.jfree.data.io.CSV;
import org.mule.api.MuleEventContext;
import org.mule.api.lifecycle.Callable;
import org.mule.transformer.types.ListDataType;

public class removeLinesWithQuotes implements Callable {

    @Override
    public Object onCall(MuleEventContext eventContext) throws Exception {
        Logger logger = Logger.getLogger(removeLinesWithQuotes.class);
        InputStream is = (InputStream) eventContext.getMessage().getPayload();
        BufferedReader br = new BufferedReader(new InputStreamReader(is));
        PipedOutputStream pos = new PipedOutputStream();
        PipedInputStream pis = new PipedInputStream();
        pis.connect(pos);
        BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(pos));
        String line;
        while ((line = br.readLine()) != null) {
            if (!line.contains("\"")) {
                bw.write(line);
                logger.debug(line);
                bw.write("\r\n");
                bw.flush();
            }
        }
        is.close();

        //eventContext.getMessage().setPayload(pis); // doesn't work
        //eventContext.getMessage().setPayload(pis, new ListDataType<CSV>(CSV.class, "application/csv")); // doesn't work
        return pis; // doesn't work
    }
}

根据这个答案,我应该能够返回一个InputStream,这PipedInputStream是一个。如果我注释掉流引用或 DataWeave,我会得到输出——分别是带有废话的 JSON(因为当不遵循 RFC 时 DataWeave CSV 解析器的行为很奇怪)或删除了行的 CSV。也就是说,Java 类或 DataWeave 组件独立工作,但不能一起工作。

这是从流程中省略 Java 类时的输出:

[
  {
    "Column A": "A",
    "Column B": "B",
    "Column C": "C",
    "Column D": "D"
  },
  {
    "Column A": "A",
    "Column B": "BB",
    "Column C": "CCCC",
    "Column D": "DDDDDDDD"
  },
  {
    "Column A": "A",
    "Column B": "BBB",
    "Column C": "CCCCCCCCC",
    "Column D": "DDDDDDDDDDDDDDDDDDDDDDDDDDD"
  },
  {
    "Column A": "A",
    "Column B": ",C,D\r\nA,B,Something Else",
    "Column C": "D",
    "Column D": ""
  },
  {
    "Column A": "A",
    "Column B": ",S,o,m,e,t,h,i,n,g, ,N,o,r,m,a,l,",
    "Column C": "C",
    "Column D": "D   "
  },
  {
    "Column A": "A",
    "Column B": "B",
    "Column C": "C",
    "Column D": "D"
  },
  {
    "Column A": "A",
    "Column B": "B",
    "Column C": "C",
    "Column D": ""
  },
  {
    "Column A": "A",
    "Column B": "B",
    "Column C": "",
    "Column D": "D"
  },
  {
    "Column A": "A",
    "Column B": "B",
    "Column C": "",
    "Column D": ""
  },
  {
    "Column A": "A",
    "Column B": "",
    "Column C": "C",
    "Column D": "D"
  },
  {
    "Column A": "A",
    "Column B": "",
    "Column C": "C",
    "Column D": ""
  },
  {
    "Column A": "A",
    "Column B": "",
    "Column C": "",
    "Column D": "D"
  },
  {
    "Column A": "A",
    "Column B": "",
    "Column C": "",
    "Column D": ""
  },
  {
    "Column A": "",
    "Column B": "B",
    "Column C": "C",
    "Column D": "D"
  },
  {
    "Column A": "",
    "Column B": "B",
    "Column C": "C",
    "Column D": ""
  },
  {
    "Column A": "",
    "Column B": "B",
    "Column C": "",
    "Column D": "D"
  },
  {
    "Column A": "",
    "Column B": "B",
    "Column C": "",
    "Column D": ""
  },
  {
    "Column A": "",
    "Column B": "",
    "Column C": "C",
    "Column D": "D"
  },
  {
    "Column A": "",
    "Column B": "",
    "Column C": "C",
    "Column D": ""
  },
  {
    "Column A": "",
    "Column B": "",
    "Column C": "",
    "Column D": "D"
  },
  {
    "Column A": "",
    "Column B": "",
    "Column C": "",
    "Column D": ""
  }
]

这是省略 DataWeave 时的输出:

Column A,Column B,Column C,Column D
A,B,C,D
A,BB,CCCC,DDDDDDDD
A,BBB,CCCCCCCCC,DDDDDDDDDDDDDDDDDDDDDDDDDDD
A,B,C,D
A,B,C,
A,B,,D
A,B,,
A,,C,D
A,,C,
A,,,D
A,,,
,B,C,D
,B,C,
,B,,D
,B,,
,,C,D
,,C,
,,,D
,,,

这是一个例外:

INFO  2015-12-03 17:01:54,716 [[asdf].OutputFileConfiguration.dispatcher.01] org.mule.transport.file.FileConnector: Writing file to: C:\tmp\nonrfc-output.json
ERROR 2015-12-03 17:02:56,817 [[asdf].OutputFileConfiguration.dispatcher.01] org.mule.exception.DefaultMessagingExceptionStrategy: 
********************************************************************************
Message               : Failed to route event via endpoint: DefaultOutboundEndpoint{endpointUri=file:///C:/tmp/, connector=FileConnector
{
  name=OutputFileConfiguration
  lifecycle=start
  this=6f3e19b3
  numberOfConcurrentTransactedReceivers=4
  createMultipleTransactedReceivers=true
  connected=true
  supportedProtocols=[file]
  serviceOverrides=<none>
}
,  name='endpoint..C.tmp', mep=ONE_WAY, properties={outputPattern=nonrfc-output.json}, transactionConfig=Transaction{factory=null, action=INDIFFERENT, timeout=0}, deleteUnacceptedMessages=false, initialState=started, responseTimeout=10000, endpointEncoding=UTF-8, disableTransportTransformer=false}. Message payload is of type: WeaveMessageProcessor$WeaveOutputHandler
Type                  : org.mule.api.transport.DispatchException
Code                  : MULE_ERROR--2
JavaDoc               : http://www.mulesoft.org/docs/site/current3/apidocs/org/mule/api/transport/DispatchException.html
Payload               : com.mulesoft.weave.mule.WeaveMessageProcessor$WeaveOutputHandler@3e274324
********************************************************************************
Exception stack is:
1. Pipe broken (java.io.IOException)
  java.io.PipedInputStream:-1 (null)
2. Failed to route event via endpoint: DefaultOutboundEndpoint{endpointUri=file:///C:/tmp/, connector=FileConnector
{
  name=OutputFileConfiguration
  lifecycle=start
  this=6f3e19b3
  numberOfConcurrentTransactedReceivers=4
  createMultipleTransactedReceivers=true
  connected=true
  supportedProtocols=[file]
  serviceOverrides=<none>
}
,  name='endpoint..C.tmp', mep=ONE_WAY, properties={outputPattern=nonrfc-output.json}, transactionConfig=Transaction{factory=null, action=INDIFFERENT, timeout=0}, deleteUnacceptedMessages=false, initialState=started, responseTimeout=10000, endpointEncoding=UTF-8, disableTransportTransformer=false}. Message payload is of type: WeaveMessageProcessor$WeaveOutputHandler (org.mule.api.transport.DispatchException)
  org.mule.transport.AbstractMessageDispatcher:117 (http://www.mulesoft.org/docs/site/current3/apidocs/org/mule/api/transport/DispatchException.html)
********************************************************************************
Root Exception stack trace:
java.io.IOException: Pipe broken
    at java.io.PipedInputStream.read(Unknown Source)
    at java.io.PipedInputStream.read(Unknown Source)
    at com.mulesoft.weave.reader.DefaultSeekableStream.readUntil(SeekableStream.scala:146)
    at com.mulesoft.weave.reader.DefaultSeekableStream.delegate$lzycompute(SeekableStream.scala:153)
    at com.mulesoft.weave.reader.DefaultSeekableStream.delegate(SeekableStream.scala:151)
    at com.mulesoft.weave.reader.DefaultSeekableStream.seek(SeekableStream.scala:189)
    at com.mulesoft.weave.reader.UTF8StreamSourceReader.seek(StreamSourceReader.scala:121)
    at com.mulesoft.weave.reader.csv.parser.CSVParser.parse(CSVParser.scala:93)
    at com.mulesoft.weave.reader.csv.parser.CSVParser.elementAt(CSVParser.scala:54)
    at com.mulesoft.weave.reader.csv.parser.CSVParser.contains(CSVParser.scala:38)
    at com.mulesoft.weave.reader.csv.CSVRecordsValue$$anon$1.hasNext(CSVReader.scala:52)
    at scala.collection.Iterator$class.toStream(Iterator.scala:1188)
    at com.mulesoft.weave.reader.csv.CSVRecordsValue$$anon$1.toStream(CSVReader.scala:50)
    at com.mulesoft.weave.writer.json.JsonWriter.writeArray(JsonWriter.scala:156)
    at com.mulesoft.weave.writer.json.JsonWriter.writeValue(JsonWriter.scala:137)
    at com.mulesoft.weave.model.values.Value$class.write(Value.scala:31)
    at com.mulesoft.weave.reader.csv.CSVRecordsValue.write(CSVReader.scala:47)
    at com.mulesoft.weave.model.values.wrappers.DelegateValue$class.write(DelegateValue.scala:29)
    at com.mulesoft.weave.engine.ast.variables.VariableReferenceNode.write(VariableReferenceNode.scala:9)
    at com.mulesoft.weave.engine.Engine.internalExecute(Engine.scala:89)
    at com.mulesoft.weave.engine.Engine.execute(Engine.scala:54)
    at com.mulesoft.weave.engine.Engine.execute(Engine.scala:169)
    at com.mulesoft.weave.mule.WeaveMessageProcessor$WeaveOutputHandler.write(WeaveMessageProcessor.scala:159)
    at org.mule.transport.file.FileMessageDispatcher.doDispatch(FileMessageDispatcher.java:75)
    at org.mule.transport.AbstractMessageDispatcher.process(AbstractMessageDispatcher.java:107)
    at org.mule.transport.AbstractConnector$DispatcherMessageProcessor.process(AbstractConnector.java:2686)
    at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:24)
    at org.mule.execution.MessageProcessorNotificationExecutionInterceptor.execute(MessageProcessorNotificationExecutionInterceptor.java:107)
    at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:44)
    at org.mule.processor.BlockingProcessorExecutor.executeNext(BlockingProcessorExecutor.java:94)
    at org.mule.processor.BlockingProcessorExecutor.execute(BlockingProcessorExecutor.java:56)
    at org.mule.interceptor.AbstractEnvelopeInterceptor.processBlocking(AbstractEnvelopeInterceptor.java:58)
    at org.mule.processor.AbstractRequestResponseMessageProcessor.process(AbstractRequestResponseMessageProcessor.java:47)
    at org.mule.processor.AsyncInterceptingMessageProcessor.processNextTimed(AsyncInterceptingMessageProcessor.java:123)
    at org.mule.processor.AsyncInterceptingMessageProcessor$AsyncMessageProcessorWorker$1.process(AsyncInterceptingMessageProcessor.java:208)
    at org.mule.processor.AsyncInterceptingMessageProcessor$AsyncMessageProcessorWorker$1.process(AsyncInterceptingMessageProcessor.java:201)
    at org.mule.execution.ExecuteCallbackInterceptor.execute(ExecuteCallbackInterceptor.java:16)
    at org.mule.execution.CommitTransactionInterceptor.execute(CommitTransactionInterceptor.java:35)
    at org.mule.execution.CommitTransactionInterceptor.execute(CommitTransactionInterceptor.java:22)
    at org.mule.execution.HandleExceptionInterceptor.execute(HandleExceptionInterceptor.java:30)
    at org.mule.execution.HandleExceptionInterceptor.execute(HandleExceptionInterceptor.java:14)
    at org.mule.execution.BeginAndResolveTransactionInterceptor.execute(BeginAndResolveTransactionInterceptor.java:67)
    at org.mule.execution.ResolvePreviousTransactionInterceptor.execute(ResolvePreviousTransactionInterceptor.java:44)
    at org.mule.execution.SuspendXaTransactionInterceptor.execute(SuspendXaTransactionInterceptor.java:50)
    at org.mule.execution.ValidateTransactionalStateInterceptor.execute(ValidateTransactionalStateInterceptor.java:40)
    at org.mule.execution.IsolateCurrentTransactionInterceptor.execute(IsolateCurrentTransactionInterceptor.java:41)
    at org.mule.execution.ExternalTransactionInterceptor.execute(ExternalTransactionInterceptor.java:48)
    at org.mule.execution.RethrowExceptionInterceptor.execute(RethrowExceptionInterceptor.java:28)
    at org.mule.execution.RethrowExceptionInterceptor.execute(RethrowExceptionInterceptor.java:13)
    at org.mule.execution.TransactionalErrorHandlingExecutionTemplate.execute(TransactionalErrorHandlingExecutionTemplate.java:110)
    at org.mule.execution.TransactionalErrorHandlingExecutionTemplate.execute(TransactionalErrorHandlingExecutionTemplate.java:30)
    at ...
********************************************************************************
4

0 回答 0