1

我想创建一个由kafka自动压缩的主题,使用浮士德流。我使用如下代码:

import asyncio
import random
import faust

app = faust.App(
    'mysample',
    broker=<...>,
    value_serializer='raw',
    key_serializer='raw',
    topic_replication_factor=3,
)

topic = app.topic('mytest5', partitions=1, compacting=True, acks=False)

count = 1
@app.timer(2.0, on_leader=True)
async def publish_greetings():
    global count
    print('PUBLISHING ON LEADER!')
    await topic.send(key=str(count), value=str(random.random()))
    count += 1

@app.agent(topic)
async def say(greetings):
    async for k, v in greetings.items():
        print(f'{k}, {v}')

我也尝试了 options internal, retention,deleting创建主题,并且每次更改主题名称时,都应该创建新主题。但我没有看到卡夫卡实际上压缩了这个话题。每次阅读该主题时,我都会看到完整的事件历史记录。

如何用浮士德创建一个紧凑的主题?

4

0 回答 0