在使用 Kafka 尝试重现 Faust 文档中的示例时,我得到了以下堆栈跟踪:
Traceback (most recent call last):
  File "src/mainsend.py", line 10, in <module>
    loop.run_until_complete(send_value())
  File "/usr/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
    return future.result()
  File "src/mainsend.py", line 6, in send_value
    print(await adding.ask(Add(a=4, b=4)))
  File "/home/ralonso/work/smu/ms-lc-mapper/env/lib/python3.8/site-packages/faust/agents/agent.py", line 752, in ask
    p = await self.ask_nowait(
  File "/home/ralonso/work/smu/ms-lc-mapper/env/lib/python3.8/site-packages/faust/agents/agent.py", line 788, in ask_nowait
    await self.channel.send(
  File "/home/ralonso/work/smu/ms-lc-mapper/env/lib/python3.8/site-packages/faust/topics.py", line 184, in send
    return await self._send_now(
  File "/home/ralonso/work/smu/ms-lc-mapper/env/lib/python3.8/site-packages/faust/channels.py", line 300, in _send_now
    return await self.publish_message(
  File "/home/ralonso/work/smu/ms-lc-mapper/env/lib/python3.8/site-packages/faust/topics.py", line 413, in publish_message
    fut2 = cast(asyncio.Future, await producer.send(
  File "/home/ralonso/work/smu/ms-lc-mapper/env/lib/python3.8/site-packages/faust/transport/drivers/confluent.py", line 516, in send
    self._quick_produce(
TypeError: produce() got an unexpected keyword argument 'timestamp'
错误发生在调用 self._quick_produce( 时,错误信息为:TypeError: produce() got an unexpected keyword argument 'timestamp'
worker(工作进程)的代码如下:
import logging
import boto3
from decouple import config
import faust
from typing import AsyncIterable
from faust import StreamT
import uuid
import datetime
# Kafka connection settings, looked up by name through decouple's `config`.
KF_BOOTSTRAP_SERVERS = config('BOOTSTRAP_SERVERS')
KF_API_KEY = config('API_KEY')
KF_API_SECRET = config('API_SECRET')

# Faust application used by the worker.
# NOTE(review): the 'confluent://' broker scheme sends through
# faust/transport/drivers/confluent.py, the module where the reported
# "produce() got an unexpected keyword argument 'timestamp'" TypeError is
# raised. Presumably another transport scheme (e.g. 'kafka://') avoids that
# code path -- confirm against the Faust `broker` setting documentation.
app = faust.App(
    'ms-mapper',
    debug=True,
    broker=f'confluent://{KF_BOOTSTRAP_SERVERS}',
    protocol='SASL_SSL',
    mechanisms='PLAIN',
    broker_credentials=faust.SASLCredentials(
        username=KF_API_KEY,
        password=KF_API_SECRET,
    ),
)
class Add(faust.Record):
    """Record holding the two integer operands ``a`` and ``b``."""

    a: int
    b: int
# Topic consumed by the `adding` agent; its values are declared as `Add`.
inbound_topic = app.topic(
    'lifecycle.geo.transaction.new.in',
    value_type=Add,
)
@app.agent(inbound_topic)
async def adding(stream):
    """Yield ``a + b`` for every value received from ``stream``."""
    async for operands in stream:
        # Each incoming value is expected to be an Add record.
        total = operands.a + operands.b
        yield total
发送端(sender)的代码如下:
# examples/send_to_agent.py
import asyncio
from maintest import Add, adding
async def send_value() -> None:
    """Send ``Add(a=4, b=4)`` to the ``adding`` agent and print the reply."""
    request = Add(a=4, b=4)
    reply = await adding.ask(request)
    print(reply)
if __name__ == '__main__':
    # Run the sender coroutine to completion on the current event loop.
    asyncio.get_event_loop().run_until_complete(send_value())
我尝试过使用其他传输编解码器,也尝试过在记录中添加时间戳,但都没有解决问题。