I am using Flink (Java, Maven version 8.1) to read a CSV file (http://data.gdeltproject.org/events/index.html) from disk, and I get the following exception:
ERROR operators.DataSinkTask: Error in user code: Channel received an event before completing the current partial record.: DataSink(Print to System.out) (4/4)
java.lang.IllegalStateException: Channel received an event before completing the current partial record.
at org.apache.flink.runtime.io.network.channels.InputChannel.readRecord(InputChannel.java:158)
at org.apache.flink.runtime.io.network.gates.InputGate.readRecord(InputGate.java:176)
at org.apache.flink.runtime.io.network.api.MutableRecordReader.next(MutableRecordReader.java:51)
at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:53)
at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:170)
at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:257)
at java.lang.Thread.run(Thread.java:745)
My code:
public static void main(String[] args) {
    // set up execution environment
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    //env.setDegreeOfParallelism(1);
    // get input points
    DataSet<GeoTimeDataTupel> points = getPointDataSet(env);
    points.print();
    // execute program
    try {
        env.execute("KMeans Flink");
    } catch (Exception e) {
        e.printStackTrace();
    }
}

private static DataSet<GeoTimeDataTupel> getPointDataSet(ExecutionEnvironment env) {
    // load properties
    Properties pro = new Properties();
    try {
        pro.load(new FileInputStream("./resources/config.properties"));
    } catch (Exception e) {
        e.printStackTrace();
    }
    String inputFile = pro.getProperty("input");
    // map csv file
    return env.readCsvFile(inputFile)
        .ignoreInvalidLines()
        .fieldDelimiter('\u0009')
        .lineDelimiter("\n")
        .includeFields(true, true, false, false, false, false, false, false, false, false, false
            , false, false, false, false, false, false, false, false, false, false
            , false, false, false, false, false, false, false, false, false, false
            , false, false, false, false, false, false, false, false, true, true
            , false, false, false, false, false, false, false, false, false, false
            , false, false, false, false, false, false, false)
        .types(String.class, Long.class, Double.class, Double.class)
        .map(new TuplePointConverter());
}
Does anyone have a solution?
Best regards, Paul