0

我有这个玩具管道

from pyflink.datastream import StreamExecutionEnvironment


def pipeline():
    # Create environment
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)

    ds = env.read_text_file('file:///home/user/myfile.json')
    # ds.map(lambda i: i)
    ds.print()

    # Execute job
    env.execute('DynamicStockpilePipeline')


if __name__ == '__main__':
    pipeline()

运行得很好,但是每次我尝试取消对映射阶段的注释时,无论是虚拟内联 lambda 还是 MapFunction,它都会爆炸说:

Caused by: java.io.IOException: Failed to execute the command: python
-c import pyflink;import os;print(os.path.join(os.path.abspath(os.path.dirname(pyflink.__file__)), 'bin')) output: Traceback (most recent call last):   File "<string>",line 1, in <module> ModuleNotFoundError: No module named 'pyflink'

我正在使用 pyenv 3.8 解释器,有人知道基本数据源和输出是如何运行的,但地图显示 pyflink 模块丢失了吗?

附录:这只发生在 PyCharm 上,当我在控制台上运行脚本时不会发生,所以我担心它是 PyCharm 和 pyenv 之间的东西

4

2 回答 2

1

我通过将 pyenv 3.8 分配为虚拟环境而不是从 PyCharm 中的系统解释器选项分配 pyenv 3.8 来消除此错误。我猜关于 pythonpath 的某些内容被上一个选项破坏了

于 2021-09-22T10:09:04.067 回答
0

也许您可以在报告错误时发布代码。

根据上下文,这里报的错误可能是使用错误,打印功能需要跟随map功能

from pyflink.datastream import StreamExecutionEnvironment


def pipeline():
    # Create environment
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)

    ds = env.read_text_file('file:///home/user/myfile.json')
    ds.map(lambda i: i).print()

    # Execute job
    env.execute('DynamicStockpilePipeline')


if __name__ == '__main__':
    pipeline()
于 2021-09-22T09:16:27.567 回答