我有发出多个版本记录的 kafka 主题。在我的解决方案中,我只想过滤掉非常具体的消息,例如
一些消息如下所示:
{
magicByte: 2,
attributes: 0,
timestamp: '1594661912299',
offset: '136862',
key: <Buffer 32 30 30 39 36 37 38 7c 6d 65 6d 67 72 6f 75 70 5f 63 6f 6d 70 6f 73 69 74 65 2d 32>,
value: Memgroup_composite_2 {
...
memGroupAddress: [ [MemGroupAddressrecord] ],
...
memGroupTIN: [ [MemGroupTINrecord] ],
...
memGroupAlias: [ [MemGroupAliasrecord] ],
...
},
headers: { ... },
...
}
而其他人看起来像这样:
{
magicByte: 2,
attributes: 0,
timestamp: '1594661912299',
offset: '136862',
key: <Buffer 32 30 30 39 36 37 38 7c 6d 65 6d 67 72 6f 75 70 5f 63 6f 6d 70 6f 73 69 74 65 2d 32>,
value: Memgroup_composite_1 {...}
}
订阅消息流并注销整个消息时,我可以在控制台中看到 Avro 类型,但是在使用 message.value 时,该信息会丢失。我最终希望在 allowMessageFilter() 中有一些过滤逻辑来确定允许哪些记录。在这种情况下,我只想过滤“Memgroup_composite_2”
consumer.run({
eachMessage: async ({ topic, partition, message }) => {
if(!allowMessageFilter(message)){ return; }
console.log(message);
let msg = JSON.parse(message.value);
console.log(msg);
},
});
任何帮助将不胜感激。