1

就我而言,我有一些原始 JSON 字符串数据发送到主题并且无法对 POJO 类进行硬编码,我想使用 pulsar 模式功能来验证结构。我有一个主题“我的主题”并与下面的 JSON 模式相关联,然后我尝试传输一些消息。

var producer = client.newProducer(Schema.AUTO_PRODUCE_BYTES();
producer.send("{\"y\": 1}".getBytes()); // here! the value is 1(number) not string.

var reader = client.newReader(Schema.AUTO_CONSUME())
var message = reader.readNext();
I got {"y": 1}

我的问题是脉冲星模式是如何工作的?该消息应被拒绝。

{
  "version": 1,
  "schemaInfo": {
    "name": "my-topic",
    "schema": {
      "type": "record",
      "name": "Data",
      "namespace": "com.iot.test",
      "fields": [
        {
          "name": "y",
          "type": [
            "null",
            "string"
          ]
        }
      ]
    },
    "type": "JSON",
    "properties": {
      "__alwaysAllowNull": "true"
    }
  }
}
4

2 回答 2

1

我的错。只需要设置

v2.5.0
bin/pulsar-admin namespaces set-is-allow-auto-update-schema --disable iot/test

v2.4.2
bin/pulsar-admin namespaces set-schema-autoupdate-strategy --disable iot/test
于 2020-03-23T07:23:39.970 回答
0

Schema.AUTO_PRODUCE_BYTES设置对于将数据从生产者传输到具有模式的 Pulsar 主题很有用,因为它确保发送的消息与主题的模式兼容。但是,我看不到您在哪里为该主题指定了架构。

当您连接类型化的生产者或消费者时,会自动为主题分配模式,例如

Producer producer = client.newProducer(JSONSchema.of(SensorReading.class))
    .topic("sensor-data")
    .sendTimeout(3, TimeUnit.SECONDS)
    .create();

但是你已经说过你不能这样做,因为你“不能硬编码 POJO”。因此,将模式分配给主题(因此它可以强制消息模式兼容性)的唯一其他选择是使用REST API 调用进行手动模式管理。

根据您的架构,您的架构定义文件将如下所示:

{
  "type": "JSON",
  "schema": "{\"type\":\"record\",\"name\":\"Data\",\"namespace\":\"com.iot.test\",\"fields\":[{\"name\":\"y\",\"type\":[\"null\",\"string\"],\"default\":null}}",
  "properties": {}
}

高温高压

于 2020-03-22T18:40:31.557 回答