问题标签 [faust]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
501 浏览

python-3.x - FAUST Asynchronous kafka message processing concurrency is not working

Currently, I'm trying to read the data from kafka topic and call the rest-API asynchronously with data that I fetched from kafka topic. here rest-api gives response instantly if msg is Meher else response will takes 5 sec

kafka-data

below is the code:

current-output:

expected-output:

expected is to get the responses for all the rest calls with an instant response first and late response should come after that, but currently it is working as sequentially.

If I increase the concurrency to 5, It is giving expected output, but should work with the same in case of concurrency 1. Not sure, if i'm missing something....any help on this ?

update1:

I have tried the samething with normal python asyncIO..It is working as expected

0 投票
1 回答
234 浏览

python - 如何在 python 的浮士德库中运行 hello_world 示例

我的 kafka 部署在 kubernetes 中,所以我转发了 9092 端口

kubectl port-forward -n kafka svc/kafka 9092

并尝试运行hello_world.py 但遇到异常

[2020-10-01 00:31:52,331] [3503] [错误] 无法连接到 id 为 0 的节点:[Errno -2] 名称或服务未知
[2020-10-01 00:31:52,341] [3503 ] [ERROR] [^Worker]: Error: ConnectionError('No connection to node with id 0',)

完整的堆栈跟踪:

你能解释一下,我应该解决什么问题

0 投票
0 回答
117 浏览

python - 尝试使用 faust 在 django 中创建一个 cronjob

我正在尝试使用 faust 在 django 中创建一个 cronjob。如果我创建一个简单的 cronjob 在屏幕上打印一些东西,它可以工作,但如果我尝试使用一些 ORM 东西,它就不行。

我也尝试这样做:

0 投票
3 回答
1061 浏览

python-3.x - 使用python编译的protobuf pb2作为键值序列化器

我正在尝试从已使用 google 的 protobuf 序列化的 kafka topiv 中读取数据。

protoc我使用生成的文件编译了原型pb2文件。

现在我正在尝试使用 faust 并创建一个流处理器,但我找不到将 pb2 文件用作key_serializervalue_serializer.

这是我尝试过的:

有人知道如何在序列化程序中使用 pb2 吗?

0 投票
1 回答
403 浏览

python-3.x - 使用 kafka 运行 Faust 会导致 ConsumerStoppedError 崩溃

我刚刚安装了 Faust 并运行了一个基本程序来通过 Kafka 发送和接收消息。我使用了(发布到 kafka 主题的 Faust 示例)中提到的示例代码,最初运行程序时它连接到 Kafka(它也在我的机器)。但是在尝试使用 Kafka 时断开连接,应用程序崩溃并出现以下异常

在调试消费者断开连接的原因时,我看到在 alokafka 消费者的 fetcher.py 中,连接由于以下异常而关闭

软件版本如下

  • Mac 操作系统:10.15.4
  • 卡夫卡:2_12.2.1.1
  • 爱奥卡夫卡:1.1.6
  • 蟒蛇:3.9.0
  • 浮士德:1.10.4

请在这里帮忙。

0 投票
0 回答
113 浏览

python - 使从数据类创建的 faust.Record 与可选字段一起使用

目前我正在编写一个 python 测试,所以我可以确保我的make_faust_record()方法有效。这就是它的样子:

Order是使用 Google ProtocolBuffers 生成的,如您所见,第一个字段是id

在此处输入图像描述

这是我得到的输出make_faust_record

在此处输入图像描述

问题是,当我调用from时,.from_data()它抱怨第一个字段它不在有效负载()中:@classmethodfaust.Recordid

在此处输入图像描述

我希望能够使用 INCOMPLETE PAYLOAD.from_data()对我的klass( ) 运行,因此只包含我在此 INCOMPLETE PAYLOAD 中提供的属性。faust.Recordfaust.Record

这是我的make_faust_record()逻辑:

0 投票
0 回答
736 浏览

python - Faust for Python:如何启动 worker 来运行 hello world

在 Spyder 中,我正在尝试 Faust 官方网站上的 Hello World 应用程序:

我启动 Zookeeper 和 Kafka(kafka 在 Spyder 中使用 Consumer/Producer 示例)

我被困在这个阶段开始一个工人

我在命令行中尝试过,但$ faust无法识别;我也在 Python 解释器中尝试过,但没有用(我没想到这会起作用)。

我阅读了有关守护进程的信息,所以我想这与此有关,但我可以再次使用 python 解释器运行 Kafka 消费者/生产者示例。

谢谢你

0 投票
1 回答
260 浏览

python - 未来中的未来总是悬而未决

PS 开始了一个问题https://github.com/robinhood/faust/issues/702

开发浮士德-app:

当方法sink.send_soon返回始终处于挂起状态的FutureMessage(asyncio.Future, Awaitable[RecordMetadata])时遇到问题。

这就是未来在另一个未来中的情况。

笔记。函数句柄应该是同步的,因为不能将异步函数传递给 ProcessPollExecutor。方法send_soon是同步方法。根据此示例https://github.com/robinhood/faust/blob/b5e159f1d104ad4a6aa674d14b6ba0be19b5f9f5/examples/windowed_aggregation.py#L47不一定等待。

如果有什么方法可以处理未决的未来?

也试过这个:

但它也没有奏效。

似乎循环甚至没有机会运行 send_soon 方法。

0 投票
1 回答
251 浏览

python - 将消息重新发送到主题的正确方法

我将消息从 kafka 主题加载到数据库。加载到数据库可能会失败。我也不想丢失未发送的消息。

应用代码:

也许有更标准的方式来处理这种情况?添加 app.topic('source_topic', acks=False) 仅在重新启动应用程序后才有效。

0 投票
1 回答
1347 浏览

python - 浮士德与 Kafka-python 之间的区别

我找不到任何答案:Faustkafka-python有什么区别?
偏爱其中任何一个有什么优点/缺点吗?
据我了解:

  • Kafka是用Java编写的,Kafka-python是一个与“Java 流”通信的Python客户端
  • Faust是一个纯粹的“Python 流”

所以,如果我打算只使用 Python,那么 Faust 应该是更好的选择,如果我想拥有更广泛的兼容性(Go、.NET、C/C#、Java、Python),那么使用 Kafka + Kafka-python?

注意:我是使用 Kafka 的新手,我正在尝试了解不同解决方案的优缺点。

我将非常感谢任何建议!