0

在尝试重现浮士德文档和 Kafka 中显示的示例时,我得到了以下堆栈:

Traceback (most recent call last): 
File "src/mainsend.py", line 10, in <module> loop.run_until_complete(send_value()) 
File "/usr/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete return future.result() File "src/mainsend.py", line 6, in send_value print(await adding.ask(Add(a=4, b=4))) 
File "/home/ralonso/work/smu/ms-lc-mapper/env/lib/python3.8/site-packages/faust/agents/agent.py", line 752, in ask p = await self.ask_nowait( 
File "/home/ralonso/work/smu/ms-lc-mapper/env/lib/python3.8/site-packages/faust/agents/agent.py", line 788, in ask_nowait await self.channel.send( 
File "/home/ralonso/work/smu/ms-lc-mapper/env/lib/python3.8/site-packages/faust/topics.py", line 184, in send return await self._send_now( 
File "/home/ralonso/work/smu/ms-lc-mapper/env/lib/python3.8/site-packages/faust/channels.py", line 300, in _send_now return await self.publish_message( 
File "/home/ralonso/work/smu/ms-lc-mapper/env/lib/python3.8/site-packages/faust/topics.py", line 413, in publish_message fut2 = cast(asyncio.Future, await producer.send( 
File "/home/ralonso/work/smu/ms-lc-mapper/env/lib/python3.8/site-packages/faust/transport/drivers/confluent.py", line 516, in send self._quick_produce( TypeError: produce() got an unexpected keyword argument 'timestamp'

错误显示:self._quick_produce( TypeError: produce() got an unexpected keyword argument 'timestamp'

工人的代码用户是:

import logging
import boto3
from decouple import config
import faust
from typing import AsyncIterable
from faust import StreamT
import uuid
import datetime

KF_BOOTSTRAP_SERVERS = config('BOOTSTRAP_SERVERS')
KF_API_KEY = config('API_KEY')
KF_API_SECRET = config('API_SECRET')

app = faust.App('ms-mapper', 
    debug = True,
    broker = f'confluent://{KF_BOOTSTRAP_SERVERS}',
    protocol='SASL_SSL',
    mechanisms='PLAIN',
    broker_credentials = faust.SASLCredentials(
    username = KF_API_KEY,
    password = KF_API_SECRET
))

class Add(faust.Record):
    a: int
    b: int

inbound_topic = app.topic('lifecycle.geo.transaction.new.in', value_type=Add)

@app.agent(inbound_topic)
async def adding(stream):
    async for value in stream:
        # here we receive Add objects, add a + b.
        yield value.a + value.b

对于发件人,代码是:

# examples/send_to_agent.py
import asyncio
from maintest import Add, adding

async def send_value() -> None:
    print(await adding.ask(Add(a=4, b=4)))

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(send_value())

我尝试使用其他传输编解码器,在记录中添加时间戳,但它不起作用

4

0 回答 0