0

我目前正在评估在新项目中使用 spring cloud stream kinesis binder 的可能性,但我遇到了一些问题。

当我启用 kcl-kpl-enabled: false 时,一切正常。但是,当我启用 kcl-kop 时,我不断收到以下错误:

org.springframework.core.serializer.support.SerializationFailedException: Failed to deserialize payload. Is the byte array a result of corresponding serialization for DefaultDeserializer?; nested exception is java.io.StreamCorruptedException: invalid stream header: 61686868

这是我目前的配置:

spring:
  cloud:
    stream:
      kinesis:
        binder:
          checkpoint:
            create-delay: 0
            table: feeder_mycollection_changes_table
          kpl-kcl-enabled: true
        bindings:
          processEvent-in-0:
            consumer:
              shardIteratorType: TRIM_HORIZON
      bindings:
        processEvent-in-0:
          destination: mycollection_changes_stream
          content-type: application/json
          consumer:
            headerMode: none

我在测试中使用的依赖项的版本是:

<spring-cloud.version>Hoxton.RC2</spring-cloud.version>
<spring-cloud-stream.version>Horsham.RC2</spring-cloud-stream.version>
<spring-cloud-stream-kinesis.version>2.0.0.BUILD-SNAPSHOT</spring-cloud-stream-kinesis.version>
4

1 回答 1

1

修复在这里:https ://github.com/spring-cloud/spring-cloud-stream-binder-aws-kinesis/commit/b64cd10c5b5aac209b61399f81e2801f24fbbaf4

converter问题是中的默认值KclMessageDrivenChannelAdapterDeserializingConverter.

Spring Cloud Stream 不处理 Java 序列化,它有自己的转换机制byte[]。因此,我们需要修复一个 Binder 实现以依赖 Spring Cloud Stream 转换基础架构。

于 2019-11-18T23:26:37.270 回答