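I have the following configuration for KinesisMessageDrivenChannelAdapter. When I remove the dynamoDbMetaDataStore as the checkpoint store, messages are received correctly, but when I add it back, the records are always empty. I debugged the code, and KinesisMessageDrivenChannelAdapter.processTask() at line 776 (version 2.0.0.M2) returns empty records.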

Update:

  public DynamoDbMetaDataStore dynamoDbMetaDataStore() {
    String url = consumerClientProperties.getDynamoDB().getUrl();
    final AmazonDynamoDBAsync amazonDynamoDB = AmazonDynamoDBAsyncClientBuilder.standard()
        .withEndpointConfiguration(new EndpointConfiguration(
            url,
            Regions.fromName(awsRegion).getName()))
        .withClientConfiguration(new ClientConfiguration()
            .withMaxErrorRetry(consumerClientProperties.getDynamoDB().getRetries())
            .withConnectionTimeout(consumerClientProperties.getDynamoDB().getConnectionTimeout())).build();
    DynamoDbMetaDataStore dynamoDbMetaDataStore = new DynamoDbMetaDataStore(amazonDynamoDB, "consumer-test");
    return dynamoDbMetaDataStore;
  }

  public KinesisMessageDrivenChannelAdapter kinesisInboundChannel(
      AmazonKinesis amazonKinesis, String[] streamNames) {
    KinesisMessageDrivenChannelAdapter adapter =
        new KinesisMessageDrivenChannelAdapter(amazonKinesis, streamNames);
    adapter.setConverter(null);
    adapter.setOutputChannel(kinesisReceiveChannel());
    adapter.setCheckpointStore(dynamoDbMetaDataStore());
    adapter.setConsumerGroup(consumerClientProperties.getName());
    adapter.setCheckpointMode(CheckpointMode.manual);
    adapter.setListenerMode(ListenerMode.record);
    adapter.setStartTimeout(10000);
    adapter.setDescribeStreamRetries(1);
    adapter.setConcurrency(10);
    return adapter;
  }

Thanks

1 Answer

I recommend using the latest 2.0.0.BUILD-SNAPSHOT.

It already has an option like this:

/**
 * Specify a {@link LockRegistry} for an exclusive access to provided streams.
 * This is not used when shards-based configuration is provided.
 * @param lockRegistry the {@link LockRegistry} to use.
 * @since 2.0
 */
public void setLockRegistry(LockRegistry lockRegistry) {

You need to inject a DynamoDbLockRegistry there for better checkpoint management.

For that, you also need to add this dependency:

compile("com.amazonaws:dynamodb-lock-client:1.0.0")
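
As a rough sketch of that wiring, the fragment below registers a DynamoDbLockRegistry and passes it to the adapter through setLockRegistry(). It assumes the 2.0.x line of spring-integration-aws, where DynamoDbLockRegistry can be constructed from an AmazonDynamoDBAsync client (worth checking against the snapshot you actually pull in). The class name KinesisLockConfig, the stream name "my-stream", and the channel name are placeholders, not taken from the question.

```java
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBAsync;
import com.amazonaws.services.kinesis.AmazonKinesis;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter;
import org.springframework.integration.aws.lock.DynamoDbLockRegistry;
import org.springframework.integration.aws.metadata.DynamoDbMetaDataStore;

// Hypothetical configuration class; names are illustrative only.
@Configuration
public class KinesisLockConfig {

    // Assumption: this constructor exists in the version in use and
    // falls back to the registry's default lock table name.
    @Bean
    public DynamoDbLockRegistry dynamoDbLockRegistry(AmazonDynamoDBAsync amazonDynamoDB) {
        return new DynamoDbLockRegistry(amazonDynamoDB);
    }

    @Bean
    public KinesisMessageDrivenChannelAdapter kinesisInboundChannel(
            AmazonKinesis amazonKinesis,
            DynamoDbMetaDataStore checkpointStore,
            DynamoDbLockRegistry lockRegistry) {
        // "my-stream" is a placeholder stream name.
        KinesisMessageDrivenChannelAdapter adapter =
                new KinesisMessageDrivenChannelAdapter(amazonKinesis, "my-stream");
        // Placeholder channel name; resolved from the application context at startup.
        adapter.setOutputChannelName("kinesisReceiveChannel");
        adapter.setCheckpointStore(checkpointStore);
        // Exclusive access to the streams; per the Javadoc above, this is
        // not used when a shards-based configuration is provided.
        adapter.setLockRegistry(lockRegistry);
        return adapter;
    }
}
```

The rest of the adapter settings from the question (consumer group, checkpoint mode, listener mode, and so on) would be set on the same adapter instance.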

There may indeed be some problems with filtering in M2...

answered 2018-06-25T19:12:44.603