我正在使用 Apache Flink 开发一个项目,并且正在使用 junit 来测试我的操作员。
但是我面临一个问题:由于并行性,flink 将以“随机”行顺序写入其输出 csv 文件,因此我无法轻易断言输出文件等于 Junit 的预期输出文件。
性能不是问题,因为我们讨论的是小文件(<100 行)并且仅用于测试。
有简单的解决方案吗?
我正在使用 Apache Flink 开发一个项目,并且正在使用 junit 来测试我的操作员。
但是我面临一个问题:由于并行性,flink 将以“随机”行顺序写入其输出 csv 文件,因此我无法轻易断言输出文件等于 Junit 的预期输出文件。
性能不是问题,因为我们讨论的是小文件(<100 行)并且仅用于测试。
有简单的解决方案吗?
您可以分两个阶段检查您的程序:
单独测试您的单个函数,例如 MapFunction。在这里,您只检查您自己的代码,并且输出应该是确定性的(假设您的函数是确定性的)。
测试完整的程序。在这里,您的代码将由 Flink 执行,并且结果的顺序是不确定的(除非您对其进行排序)。在 Flink 中,我们有一些实用程序类来测试完整的程序(主要用于运行我们自己的集成测试)。这些类启动一个小的本地 Flink 实例,运行测试,并将其与预期结果(排序或无序)进行比较。查看MultipleProgramsTestBase以及如何在DegreesITCase中使用它。您可以通过包含flink-test-utils Maven 依赖项来使用 MultipleProgramsTestBase。根据您使用的 Flink 版本,情况可能与当前的 master 有点不同。如果您有任何问题,请在此处发表评论或 ping Flink 用户邮件列表。
测试您是否有预期的行数。然后,将预期的行存储在列表中。遍历实际行并确保它们“在”预期的行列表中,并从列表中删除该条目。类似于(groovy 伪代码):
assert actualLines.size() == expectedLines.size()
actualLines.each { line->
assert line in expectedLines
expectedLines.remove(line)
}
这应该测试您是否具有预期的行数,并且您是否具有预期的行值而不考虑顺序。