
I read a CSV file (http://data.gdeltproject.org/events/index.html) from disk with Flink (Java, Maven version 8.1) and get the following exception:

ERROR operators.DataSinkTask: Error in user code: Channel received an event before completing the current partial record.:  DataSink(Print to System.out) (4/4)
java.lang.IllegalStateException: Channel received an event before completing the current partial record.
    at org.apache.flink.runtime.io.network.channels.InputChannel.readRecord(InputChannel.java:158)
    at org.apache.flink.runtime.io.network.gates.InputGate.readRecord(InputGate.java:176)
    at org.apache.flink.runtime.io.network.api.MutableRecordReader.next(MutableRecordReader.java:51)
    at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:53)
    at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:170)
    at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:257)
    at java.lang.Thread.run(Thread.java:745)

My code:

public static void main(String[] args) {
    // set up execution environment
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    //env.setDegreeOfParallelism(1);
    // get input points
    DataSet<GeoTimeDataTupel> points = getPointDataSet(env);
    points.print();
    // execute program
    try {
        env.execute("KMeans Flink");
    } catch (Exception e) {
        e.printStackTrace();
    }
}
private static DataSet<GeoTimeDataTupel> getPointDataSet(ExecutionEnvironment env) {
    // load properties
    Properties pro = new Properties();
    try {
        pro.load(new FileInputStream("./resources/config.properties"));
    } catch (Exception e) {
        e.printStackTrace();
    }
    String inputFile = pro.getProperty("input");
    // map csv file
    return env.readCsvFile(inputFile)
        .ignoreInvalidLines()
        .fieldDelimiter('\u0009')
        .lineDelimiter("\n")
        .includeFields(true, true, false, false, false, false, false, false, false, false, false
                , false, false, false, false, false, false, false, false, false, false
                , false, false, false, false, false, false, false, false, false, false
                , false, false, false, false, false, false, false, false, true, true
                , false, false, false, false, false, false, false, false, false, false
                , false, false, false, false, false, false, false)
        .types(String.class, Long.class, Double.class, Double.class)
        .map(new TuplePointConverter());
}

Maybe someone has a solution?

Best regards, Paul


1 Answer


I'm posting the answer from the Apache Flink mailing list here, so people don't have to read through the mailing-list archive:

The cause of the error is custom serialization logic: the deserialization function is faulty and does not consume all of the data.

The latest master has an improved error message for this.

As background:

Flink supports two type interfaces that allow programmers to implement their own serialization routines: Writables (Hadoop's core type interface) and Values (Flink's own custom serialization interface).
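To make the failure mode concrete, here is a minimal, Flink-free sketch of the contract these interfaces impose: a deserializer must read back exactly the bytes the serializer wrote. The record fields and class name below are illustrative, not from the original post; the sketch uses plain `java.io` streams to show what "a partial record left in the channel" looks like when the read side under-consumes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical two-field record, mimicking a custom serializer.
public class PartialReadDemo {

    // Write both fields, as a correct write()/serialize() would.
    static byte[] serialize(String name, long count) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeUTF(name);   // 2-byte length prefix + UTF-8 payload
        out.writeLong(count); // 8 bytes
        return bytes.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        byte[] buf = serialize("point", 42L);
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(buf));

        // Buggy deserializer: reads only the first field...
        String name = in.readUTF();
        // ...and never calls readLong(). The 8 bytes of the long stay
        // unconsumed, so the stream still holds a partial record -- the
        // state Flink's InputChannel rejects with the exception above.
        System.out.println("leftover bytes: " + in.available());
    }
}
```

In Flink terms: if a `Value`'s `read()` (or a Writable's `readFields()`) pulls fewer bytes than `write()` emitted, the leftover bytes sit in the input channel when the next event arrives, producing exactly the "Channel received an event before completing the current partial record" error.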

Answered 2015-05-13T21:57:27.657