0

我正在使用 ksqlDB,我从流中创建了一个表。当我在该表中触发选择查询时,它会正确地为我提供所有记录。现在我想在 MongoDB 中下沉那个表。我还能够在 Kafka 表和 MongoDB 之间创建一个接收器。但不知何故,它只将一条记录沉入其中(MongoDB)。而在表中我有 100 条记录。下面是我的 MongoDB 接收器连接器。

{
  "name": "MongoSinkConnectorConnector_1",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "topics": "FEEDS",
    "connection.uri": "mongodb://xxx:xxx@x.x.x.x:27017/",
    "database": "xxx",
    "max.num.retries": "1000000",
    "writemodel.strategy": "com.mongodb.kafka.connect.sink.writemodel.strategy.UpdateOneTimestampsStrategy",
    "value.projection.type": "allowlist",
    "value.projection.list": "id",
    "document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy",
    "buffer.capacity": "20000",
    "value.converter.schema.registry.url": "http://x.x.x.x:8081",
    "key.converter.schemas.enable": "false",
    "insert.mode": "upsert"
  }
}

我无法理解,这背后的原因是什么。任何帮助表示赞赏。谢谢

4

1 回答 1

0

您可以在 Sink Connector 属性中设置“batch.size”属性,您还可以通过 Mongo DB Source Connector 官方文档https://docs.confluent.io/cloud/current/阅读更好的写入模型策略连接器/cc-mongo-db-source.html

于 2021-06-18T02:17:31.270 回答