我可以使用 MongoExchangeSchema 写入 Kafka 主题数组而不是字符串吗?我想将我的收藏中的所有字段都发送到主题并使用此参数
"output.schema.value":"{\"name\":\"MongoExchangeSchema\",\"type\":\"record\",\"namespace\":\"com.mongoexchange.avro\",\"fields\":[{\"name\": \"client_id\",\"type\": \"string\"},{\"name\": \"contacts\",\"type\": \"string\"}]}",
我再次想使用 KSQL 从数组中提取一些字段。但是我的流没有像数组一样定义字段“联系人”。感谢您的任何想法!
我如何创建流
CREATE STREAM CONTACTS_TEST_STREAM (
client_id STRING,
contacts Array<String>
) WITH (
KEY_FORMAT='NONE',
VALUE_FORMAT='AVRO',
KAFKA_TOPIC='t_cb.Contacts'
);
配置连接
"connector.class":"com.mongodb.kafka.connect.MongoSourceConnector",
"schema.compatibility": "NONE",
"auto.offset.reset" : "latest",
"key.converter":"org.apache.kafka.connect.storage.StringConverter",
"output.schema.value":"{\"name\":\"MongoExchangeSchema\",\"type\":\"record\",\"namespace\":\"com.mongoexchange.avro\",\"fields\":[{\"name\": \"client_id\",\"type\": \"string\"},{\"name\": \"contacts\",\"type\": \"string\"}]}",
"value.converter":"io.confluent.connect.avro.AvroConverter"
Mongo 集合中的示例
db.Contacts.insert(
{
"client_id" : "000000000",
"contacts" : [
{
"contact_id" : "1",
"contact_name" : "First",
"deleted" : false,
"contact_numbers" : [
{
"contact_num" : "+4900000000",
}
]
}
]
}
);