1

作为第一次尝试,我想从文件中读取 JSON 数据并将其传递给 Flink。我定义了一个源(逐行读取 JSON 字符串)和一个占位符过滤器。见代码:

from org.apache.flink.streaming.api.functions.source import SourceFunction
from org.apache.flink.api.common.functions import FilterFunction
import json
import sys

class Json_reader(SourceFunction):
    def readjason(self, ctx):
        sys.stdin = open('capture.json', 'r')
        for line in sys.stdin:
            ctx.collect(json.loads(line))


class Dummy_Filter(FilterFunction):
    def filter(self, value):
        return True

#
# The pipeline definition.
#
def main(factory):
    env = factory.get_execution_environment()
    env.create_python_source(Json_reader()) \
        .filter(Dummy_Filter()) \
        .output()
    env.execute()

当我构建作业并将其移动到我启动的 Flink 集群时,我收到以下错误消息:

PythonStreamExecutionEnvironment.execute(PythonStreamExecutionEnvironment.java:245) 在 sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 在 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 在 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java: 43) 在 java.lang.reflect.Method.invoke(Method.java:498) org.apache.flink.client.program.ProgramInvocationException: org.apache.flink.client.program.ProgramInvocationException: 作业失败。(职位编号:31615948194c951be03d46576929aa23)43) 在 java.lang.reflect.Method.invoke(Method.java:498) org.apache.flink.client.program.ProgramInvocationException: org.apache.flink.client.program.ProgramInvocationException: 作业失败。(职位编号:31615948194c951be03d46576929aa23)43) 在 java.lang.reflect.Method.invoke(Method.java:498) org.apache.flink.client.program.ProgramInvocationException: org.apache.flink.client.program.ProgramInvocationException: 作业失败。(职位编号:31615948194c951be03d46576929aa23)

该程序不包含 Flink 作业。也许您忘记在执行环境上调用 execute()。

我没有忘记调用execute()。

4

1 回答 1

1

我发现了问题。Fast 期望 SourceFunction 中有一个 run() 函数。

于 2019-03-07T09:24:35.413 回答