0

我正在使用flink stateful functions开发一个新项目。我已经使用FunctionTestHarness编写了一些基本的单元测试,但是使用此方法的测试无法测试有状态函数之间的交互。

flink测试文档(基本 flink,不适用于有状态函数)演示了如何使用作业的输出运行完整的作业MiniClusterWithClientResource,然后对作业的输出进行断言。我正在寻找一种与有状态函数类似的方法。

statefun -flink-harness-example看起来很有希望,但是RunnerTestusing Harness标记为 ,@Ignore因为它永远不会终止。这对于调试很有用,但不能用于自动化测试。

以下是我迄今为止发现的问题,这使得编写以Harness终止的测试变得困难:

  1. Harness使用SerializableSupplier来提供输入,而SerializableSupplier没有办法说它已经完成。这意味着任何使用Harness的测试总是在等待更多的输入。
  2. 如果Harness知道所有输入都已发送,那么一旦没有未决事件,它就需要一种终止方式。
  3. 作为一个额外的复杂因素,由于Context发送的延迟事件,一些系统仍然永远不会终止.sendAfter()

我认为这将是实现更有趣的自动化测试的常见需求,这些自动化测试可以从 CI/CD 流程运行。有没有人找到解决上述问题的方法,或者使用Harness以外的工具发现了一种完全不同的方法?

4

1 回答 1

4

Harness 还包含一个.withFlinkSourceFunction()允许使用任何 FlinkSourceFunction作为入口的方法。

您可以创建自己的源函数来生成有限的元素集合,例如:

class FiniteSource<T extends Serializable> implements SourceFunction<T> {
    private final List<T> items;

    FiniteSource(List<T> items) {
      this.items = items;
    }

    @Override
    public void run(SourceContext<T> sourceContext) {
      for (T item : items) {
        sourceContext.collect(item);
      }
    }

    @Override
    public void cancel() {}
  }

然后,您可以通过以下方式修改线束示例:

   FiniteSource<MyInputMessage> finiteSource = new FiniteSource<>(
            Arrays.asList(
                    new MyInputMessage("user-1", "hello"),
                    new MyInputMessage("user-2", "world")));

    Harness harness =
        new Harness()
            .withKryoMessageSerializer()
            .withFlinkSourceFunction(MyConstants.REQUEST_INGRESS,finiteSource)
            .withPrintingEgress(MyConstants.RESULT_EGRESS);

    harness.start();

这应该在将两个输入消息生成到入口后终止。如果你认为这是一个常见的要求,那么我鼓励你在 Flink 邮件列表中提出这个问题,我相信那里的友好社区会很乐意接受你的反馈和更多的贡献;)

有没有人找到解决上述问题的方法,或者使用 Harness 以外的工具发现了一种完全不同的方法?

对于 CI/CD 管道,我建议查看我们基于测试容器的 e2e 测试。(例如这个

于 2020-05-22T20:47:31.740 回答