0

我有发出多个版本记录的 kafka 主题。在我的解决方案中,我只想过滤掉非常具体的消息,例如

一些消息如下所示:

{
  magicByte: 2,
  attributes: 0,
  timestamp: '1594661912299',
  offset: '136862',
  key: <Buffer 32 30 30 39 36 37 38 7c 6d 65 6d 67 72 6f 75 70 5f 63 6f 6d 70 6f 73 69 74 65 2d 32>,
  value: Memgroup_composite_2 {
    ...
    memGroupAddress: [ [MemGroupAddressrecord] ],
    ...
    memGroupTIN: [ [MemGroupTINrecord] ],
    ...
    memGroupAlias: [ [MemGroupAliasrecord] ],
   ...
  },
  headers: { ... },
  ...
  }

而其他人看起来像这样:

{
  magicByte: 2,
  attributes: 0,
  timestamp: '1594661912299',
  offset: '136862',
  key: <Buffer 32 30 30 39 36 37 38 7c 6d 65 6d 67 72 6f 75 70 5f 63 6f 6d 70 6f 73 69 74 65 2d 32>,
  value: Memgroup_composite_1 {...}
}

订阅消息流并注销整个消息时,我可以在控制台中看到 Avro 类型,但是在使用 message.value 时,该信息会丢失。我最终希望在 allowMessageFilter() 中有一些过滤逻辑来确定允许哪些记录。在这种情况下,我只想过滤“Memgroup_composite_2”

consumer.run({
  eachMessage: async ({ topic, partition, message }) => {

    if(!allowMessageFilter(message)){ return; }

    console.log(message);
    let msg = JSON.parse(message.value);
    console.log(msg);
  },
});

任何帮助将不胜感激。

4

0 回答 0