1

我在 mongoDB 中有以下三个集合:

//Drinkers
{
    "drinkerId" : "ID1",
    "name" : "name",
    "lastName" : "lastname"
}

//Drinks
{
    "drinkId" : "ID2",
    "drinkName" : "wine",
    "alcoholPercent" : 0.12
}
//Happy-Hours
{
    "drinkerId" : "ID1",
    "drinkId" : "ID1"
}

我还使用以下 mongo 管道在欢乐时光集合上提交了一个 kafka 连接器实例。

    [
      {
        "$match": {
          "operationType": "insert"
        }
      },
      {
        "$lookup": {
          "from": "drinkers",
          "localField": "fullDocument.drinkerId",
          "foreignField": "drinkerId",
          "as": "drinkerInfo"
        }
      },
      {
        "$unwind": "$drinkerInfo"
      },
      {
        "$lookup": {
          "from": "drinks",
          "localField": "fullDocument.drinkId",
          "foreignField": "drinkId",
          "as": "drinkInfo"
        }
      },
      {
        "$unwind": "$drinkInfo"
      }
    ]

当我查看 kafka-connect 日志时,我看到以下消息:

WARN 无法恢复更改流:$changeStream 管道中不允许 $lookup 20

并且消息没有发布到主题。

我真正的用例是将大型 mongodb 文档插入到饮酒者和饮料集合中,并且我想将较小的文档插入到欢乐时光集合中。最终消费者需要饮酒者和饮料信息。

我是否有其他方法可以检索饮酒者和饮料信息并获取发布到该主题的详细消息?

4

0 回答 0