0

我可以使用 MongoExchangeSchema 写入 Kafka 主题数组而不是字符串吗?我想将我的收藏中的所有字段都发送到主题并使用此参数

  "output.schema.value":"{\"name\":\"MongoExchangeSchema\",\"type\":\"record\",\"namespace\":\"com.mongoexchange.avro\",\"fields\":[{\"name\": \"client_id\",\"type\": \"string\"},{\"name\": \"contacts\",\"type\": \"string\"}]}",

我再次想使用 KSQL 从数组中提取一些字段。但是我的流没有像数组一样定义字段“联系人”。感谢您的任何想法!

我如何创建流

CREATE STREAM CONTACTS_TEST_STREAM (
    client_id STRING,
    contacts Array<String>
  ) WITH (
    KEY_FORMAT='NONE',
    VALUE_FORMAT='AVRO',
    KAFKA_TOPIC='t_cb.Contacts'
  );

配置连接

     "connector.class":"com.mongodb.kafka.connect.MongoSourceConnector",
     "schema.compatibility": "NONE",
     "auto.offset.reset" : "latest",
     "key.converter":"org.apache.kafka.connect.storage.StringConverter",
     "output.schema.value":"{\"name\":\"MongoExchangeSchema\",\"type\":\"record\",\"namespace\":\"com.mongoexchange.avro\",\"fields\":[{\"name\": \"client_id\",\"type\": \"string\"},{\"name\": \"contacts\",\"type\": \"string\"}]}",
     "value.converter":"io.confluent.connect.avro.AvroConverter"

Mongo 集合中的示例


db.Contacts.insert(
{
    "client_id" : "000000000",
    "contacts" : [ 
        {
            "contact_id" : "1",
            "contact_name" : "First",
            "deleted" : false,
            "contact_numbers" : [ 
                {
                    "contact_num" : "+4900000000",
                   
                }
            ]
        }

    ]
}
);
4

0 回答 0