1

使用fromElements函数创建DataStream时出错

下面是expeption -

原因:java.io.IOException:无法从源中反序列化元素。如果您使用的是用户定义的序列化(Value 和 Writable 类型),请检查序列化函数。序列化器是 org.apache.flink.streaming.api.functions.source.FromElementsFunction.run(FromElementsFunction.java:121) 的 org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer@599fcdda .flink.streaming.api.operators.StreamSource.run(StreamSource.java:58) 在 org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55) 在 org.apache.flink.streaming .runtime.tasks.StreamTask.invoke(StreamTask.java:218) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584) at java.lang.Thread.run(Thread.java:745 )

4

2 回答 2

1

为什么要处理InputStreamReader元组?我想这里有一些误解。泛型类型指定要处理的数据的类型。例如

DataStream<Integer> intStream = env.fromElements(1, 2, 3, 4, 5);

生成具有 5 个Integer元组的有限数据流。我假设您实际上想使用 anInputStreamReader来生成实际的元组。

如果您想阅读,您可以按如下HttpURLConnection方式实现您自己的SourceFunction(或)(替换为您要使用的实际数据类型——也考虑 Flinks到类型):RichSourceFunctionOUTTuple0Tuple25

env.addSource(new SourceFunction<OUT> {
    private volatile boolean isRunning = true;

    @Override
    public void run(SourceContext<OUT> ctx) {
        InputStreamReader isr = null;
        try {
            URL url = new URL("ex.in/res");
            HttpURLConnection httpconn = (HttpURLConnection) url.openConnection();
            if (httpconn.getResponseCode() != 200)
                throw new RuntimeException("Failed : HTTP error code : " + httpconn.getResponseCode());
            isr = new InputStreamReader((httpconn.getInputStream()));
        } catch (Exception e) {
            // clean up; log error
            return;
        }

        while(isRunning) {
            OUT tuple = ... // get data from isr
            ctx.collect(tuple);
        }
    }

    @Override
    public void cancel() {
         this.isRunning = false;
    }
});
于 2015-12-20T20:48:04.117 回答
0

由于不可序列化,您无法创建DataStream<InputStreamReader>with 。这是方法所要求的。此外,在 . 上工作可能没有多大意义。我想最好简单地从中读取数据,然后继续处理这些数据。fromElementsInputStreamReaderfromElementsInputStreamReadersHttpURLConnection

于 2015-12-20T18:27:30.313 回答