0

我开始使用从 AWS Kinesis 读取的 Beam 项目,所以我有一个简单的 DoFn,它接受 KinesisRecord 并记录内容。我想编写一个单元测试来运行这个 DoFn 并证明它有效。不过,使用 KinesisRecord 进行单元测试已被证明具有挑战性。

当我尝试使用时出现此错误Create.of(testKinesisRecord)

java.lang.IllegalArgumentException: Unable to infer a coder and no Coder was specified. Please set a coder by invoking Create.withCoder() explicitly  or a schema by invoking Create.withSchema().

正如错误所暗示的那样,我已尝试使用“withCoder”显式提供 KinesisRecordCoder,但它是一个私有类。也许还有另一种对 DoFn 进行单元测试的方法?

测试代码:

public class MyProjectTests {
    @Rule
    public TestPipeline p = TestPipeline.create();

    @Test
    public void testPoC() {
        var testKinesisRecord = new KinesisRecord(
                ByteBuffer.wrap("SomeData".getBytes()),
                "seq01",
                12,
                "pKey",
                Instant.now().minus(Duration.standardHours(4)),
                Instant.now(),
                "MyStream",
                "shard-001"
        );


        PCollection<Void> output =
                p.apply(Create.of(testKinesisRecord))
                        .apply(ParDo.of(new MyProject.PrintRecordFn()));

        var result = p.run();
        result.waitUntilFinish();
        result.metrics().allMetrics().getCounters().forEach(longMetricResult -> {
            Assertions.assertEquals(1, longMetricResult.getCommitted().intValue());
        });
    }
}

自由度代码:

  static class PrintRecordFn extends DoFn<KinesisRecord, Void> {
    private static final Logger LOG = LoggerFactory.getLogger(PrintRecordFn.class);
    private final Counter items = Metrics.counter(PrintRecordFn.class, "itemsProcessed");

    @ProcessElement
    public void processElement(@Element KinesisRecord element) {
      items.inc();

      LOG.info("Stream: `{}` Shard: `{}` Arrived at `{}`\nData: {}",
              element.getStreamName(),
              element.getShardId(),
              element.getApproximateArrivalTimestamp(),
              element.getDataAsBytes());
    }
  }
4

1 回答 1

1

KinesisRecordCoder应该用于内部目的,所以它是包私有的。同时,您可以提供自定义AWSClientsProvider并使用它来生成测试数据。作为示例,请查看KinesisMockReadTest和自定义提供程序

于 2019-11-13T14:58:55.920 回答