0

所以我有一个用 PyFlink SQL API 编写的简单聚合作业。该作业从 AWS kinesis 读取数据并将结果输出到 Kinesis。

我很好奇我是否可以用 pytest 对我的管道进行单元测试?我猜我需要用文件系统连接器模拟源和接收器?但是如何创建本地 Flink 会话来在 pytest 中运行作业?我们在这里有最佳实践建议吗?

谢谢!

4

1 回答 1

1

你应该看看 PyFlink 本身的测试是如何实现的。它为实现表测试用例设置了各种基类; PyFlinkStreamTableTestCase可能是一个很好的起点。使用它可以编写像我从这里复制的这样的测试:

    def test_sql_query(self):
        t_env = self.t_env
        source = t_env.from_elements([(1, "Hi", "Hello"), (2, "Hello", "Hello")], ["a", "b", "c"])
        field_names = ["a", "b", "c"]
        field_types = [DataTypes.BIGINT(), DataTypes.STRING(), DataTypes.STRING()]
        t_env.register_table_sink(
            "sinks",
            source_sink_utils.TestAppendSink(field_names, field_types))

        result = t_env.sql_query("select a + 1, b, c from %s" % source)
        result.execute_insert("sinks").wait()
        actual = source_sink_utils.results()

        expected = ['+I[2, Hi, Hello]', '+I[3, Hello, Hello]']
        self.assert_equals(actual, expected)

还有更多测试来自该测试。

于 2021-11-12T11:08:32.743 回答