所以我有一个用 PyFlink SQL API 编写的简单聚合作业。该作业从 AWS kinesis 读取数据并将结果输出到 Kinesis。
我很好奇我是否可以用 pytest 对我的管道进行单元测试?我猜我需要用文件系统连接器模拟源和接收器?但是如何创建本地 Flink 会话来在 pytest 中运行作业?我们在这里有最佳实践建议吗?
谢谢!
所以我有一个用 PyFlink SQL API 编写的简单聚合作业。该作业从 AWS kinesis 读取数据并将结果输出到 Kinesis。
我很好奇我是否可以用 pytest 对我的管道进行单元测试?我猜我需要用文件系统连接器模拟源和接收器?但是如何创建本地 Flink 会话来在 pytest 中运行作业?我们在这里有最佳实践建议吗?
谢谢!
你应该看看 PyFlink 本身的测试是如何实现的。它为实现表测试用例设置了各种基类; PyFlinkStreamTableTestCase
可能是一个很好的起点。使用它可以编写像我从这里复制的这样的测试:
def test_sql_query(self):
t_env = self.t_env
source = t_env.from_elements([(1, "Hi", "Hello"), (2, "Hello", "Hello")], ["a", "b", "c"])
field_names = ["a", "b", "c"]
field_types = [DataTypes.BIGINT(), DataTypes.STRING(), DataTypes.STRING()]
t_env.register_table_sink(
"sinks",
source_sink_utils.TestAppendSink(field_names, field_types))
result = t_env.sql_query("select a + 1, b, c from %s" % source)
result.execute_insert("sinks").wait()
actual = source_sink_utils.results()
expected = ['+I[2, Hi, Hello]', '+I[3, Hello, Hello]']
self.assert_equals(actual, expected)
还有更多测试来自该测试。