问题标签 [faust]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
50 浏览

python - 如何从 Docker 运行 Faust - 错误:python-rocksdb 的构建轮失败

我正在尝试从 Docker 运行 Python Faust。

基于此文档:https ://faust.readthedocs.io/en/latest/userguide/installation.html

我创建了一个简单的 Docker 文件:

当我创建 docker 文件时,我在第 5 阶段收到错误(步骤 5/7:RUN pip install "faust[rocksdb]")

---> 在 1e42a5e50cbe 中运行 已满足要求:/usr/local/lib/python3.10/site-packages (1.10.4) 中的 faust[rocksdb] 已满足要求:/usr/ 中的 terminaltables<4.0,>=3.1 local/lib/python3.10/site-packages (from faust[rocksdb]) (3.1.10) 已经满足要求:点击/usr/local/lib/python3.10/site-packages中的<8.0,>=6.7 (来自 faust[rocksdb]) (7.1.2) 要求已经满足:yarl<2.0,>=1.0 in /usr/local/lib/python3.10/site-packages (来自 faust[rocksdb]) (1.7.2) 要求已经满足:/usr/local/lib/python3.10/site-packages 中的 aiohttp-cors<2.0,>=0.7(来自 faust[rocksdb])(0.7.0) 已经满足要求:/usr/ 中的 mypy-extensions local/lib/python3.10/site-packages (from faust[rocksdb]) (0.4.3) 已经满足要求:/usr/local/lib/python3 中的 colorclass<3.0,>=2.2。10/site-packages (来自 faust[rocksdb]) (2.2.2) 要求已经满足:/usr/local/lib/python3.10/site-packages 中的 opentracing<2.0.0,>=1.3.0 (来自 faust [rocksdb]) (1.3.0) 要求已经满足:mode<4.4,>=4.3.2 in /usr/local/lib/python3.10/site-packages (from faust[rocksdb]) (4.3.2) 要求已经满足:venusian<2.0,>=1.1 in /usr/local/lib/python3.10/site-packages (from faust[rocksdb]) (1.2.0) 要求已经满足:aiohttp<4.0,>=3.5.2在 /usr/local/lib/python3.10/site-packages 中(来自 faust[rocksdb])(3.8.1) 已经满足要求:robinhood-aiokafka<1.2,>=1.1.6 在 /usr/local/lib/ python3.10/site-packages (来自 faust[rocksdb]) (1.1.6) 要求已经满足:/usr/local/lib/python3.10/site-packages 中的 croniter>=0.3.16 (来自 faust[rocksdb] ) (1.1.0) 收集 python-rocksdb>=0.6.7 下载 python-rocksdb-0.7.0.tar.gz (219 kB) 准备元数据 (setup.py): 开始准备元数据 (setup.py): 完成状态“完成”已满足要求:/usr/local/lib/python3.10/site-packages 中的 aiosignal>=1.1.2(来自 aiohttp<4.0,>=3.5.2->faust[rocksdb])(1.2.0)已满足要求满意:/usr/local/lib/python3.10/site-packages 中的 attrs>=17.3.0(来自 aiohttp<4.0,>=3.5.2->faust[rocksdb])(21.2.0) 要求已经满足: /usr/local/lib/python3.10/site-packages 中的 frozenlist>=1.1.1 (来自 aiohttp<4.0,>=3.5.2->faust[rocksdb]) (1.2.0) 已满足要求:charset- /usr/local/lib/python3.10/site-packages 中的 normalizer<3.0,>=2.0 (来自 aiohttp<4.0,>=3.5.2->faust[rocksdb]) (2.0.9) 已满足要求:/usr/local/lib/python3.10/site-packages 中的 multidict<7.0,>=4.5 (来自 aiohttp<4.0,>=3.5.2->faust[rocksdb]) (5.2.0) 已满足要求:异步-timeout<5.0,>=4.0.0a3 in /usr/local/lib/python3.10/site-packages (来自 aiohttp<4.0,>=3.5.2->faust[rocksdb]) (4.0.2) 已经有要求满意:/usr/local/lib/python3.10/site-packages 中的 python-dateutil(来自 croniter>=0.3.16->faust[rocksdb])(2.8.2) 已满足要求:colorlog>=2.9.0在 /usr/local/lib/python3.10/site-packages (from mode<4.4,>=4.3.2->faust[rocksdb]) (6.6.0) 要求已经满足:setuptools>=25 in /usr/ local/lib/python3.10/site-packages (from python-rocksdb>=0.6.7->faust[rocksdb]) (57.5.0) 要求已经满足:kafka-python<1.5,>=1.4.6 in / usr/local/lib/python3.10/site-packages (from robinhood-aiokafka<1.2,>=1.1.6->faust[rocksdb]) (1.4.7) 要求已经满足:idna>=2.0 in /usr/local/lib/python3.10/站点包(来自 yarl<2.0,>=1.0->faust[rocksdb])(3.3)已经满足要求:/usr/local/lib/python3.10/site-packages 中的六个>=1.5(来自 python-dateutil ->croniter>=0.3.16->faust[rocksdb]) (1.16.0)

还有一个错误部分

为收集的包构建轮子:python-rocksdb 为 python-rocksdb (setup.py) 构建轮子:已启动错误:命令错误退出状态为 1:命令:/usr/local/bin/python -u -c 'import io,操作系统、系统、设置工具、标记化;sys.argv[0] = '"'"'/tmp/pip-install-b8y7g4hs/python-rocksdb_b1c08993fd134ac4bc5​​9e6f5d18bcd91/setup.py'"'"'; file ='"'"'/tmp/pip-install-b8y7g4hs/python-rocksdb_b1c08993fd134ac4bc5​​9e6f5d18bcd91/setup.py'"'"';f = getattr(tokenize, '"'"'open'"'"', open)( file ) if os.path.exists( file ) else io.StringIO('"'"'from setuptools import setup; setup()'"'"');code = f.read()., '"'"'exec'"'"'))' bdist_wheel -d /tmp/pip-wheel-9_o4ek6z cwd: /tmp/pip-install-b8y7g4hs/python-rocksdb_b1c08993fd134ac4bc5​​9e6f5d18bcd91/ 完整输出(64 行):运行 bdist_wheel运行 build
运行 build_py 创建 build 创建 build/lib.linux-x86_64-3.10 创建 build/lib.linux-x86_64-3.10/rocksdb 复制rocksdb/interfaces.py -> build/lib.linux-x86_64-3.10/rocksdb 复制rocksdb/ errors.py -> build/lib.linux-x86_64-3.10/rocksdb 复制rocksdb/merge_operators.py -> build/lib.linux-x86_64-3.10/rocksdb
复制rocksdb/ init .py -> build/lib.linux-x86_64 -3.10/rocksdb
创建 build/lib.linux-x86_64-3.10/rocksdb/tests 复制rocksdb/tests/test_memtable.py -> build/lib.linux-x86_64-3.10/rocksdb/tests 复制rocksdb/tests/test_db.py -> build/lib .linux-x86_64-3.10/rocksdb/tests 复制rocksdb/tests/ init.py -> build/lib.linux-x86_64-3.10/rocksdb/tests 复制rocksdb/tests/test_options.py -> build/lib.linux-x86_64-3.10/rocksdb/tests 运行 egg_info 写入 python_rocksdb.egg-info/PKG -INFO 将dependency_links 写入python_rocksdb.egg-info/dependency_links.txt 将要求写入python_rocksdb.egg-info/requires.txt 将顶级名称写入python_rocksdb.egg-info/top_level.txt 读取清单文件'python_rocksdb.egg-info /SOURCES.txt'读取清单模板'MANIFEST.in'写入清单文件'python_rocksdb.egg-info/SOURCES.txt'复制rocksdb/_rocksdb.cpp -> build/lib.linux-x86_64-3.10/rocksdb复制rocksdb/rockdb.pyx -> build/lib.linux-x86_64-3.10/rocksdb 复制rocksdb/backup.pxd -> build/lib.linux-x86_64-3.10/rocksdb 复制rocksdb/cache.pxd -> build/lib.linux-x86_64 -3.10/rocksdb 复制rocksdb/comparator.pxd -> build/lib.linux-x86_64-3.10/rocksdb 复制rocksdb/db.pxd -> build/lib.linux-x86_64-3.10/rocksdb 复制rocksdb/env.pxd -> build/lib.linux-x86_64-3.10/rocksdb 复制rocksdb/filter_policy.pxd -> build/lib.linux-x86_64-3.10/rocksdb
复制rocksdb/iterator.pxd -> build/lib.linux-x86_64-3.10/rocksdb
复制rockdb/logger.pxd -> build/lib.linux-x86_64-3.10/rocksdb
复制rocksdb/memtablerep.pxd -> build/lib.linux-x86_64-3.10/rocksdb 复制rocksdb/merge_operator.pxd -> build/lib.linux-x86_64-3.10/rocksdb 复制rocksdb/options.pxd -> build/lib。 linux-x86_64-3.10/rocksdb 复制rocksdb/slice
.pxd -> build/lib.linux-x86_64-3.10/rocksdb 复制rocksdb/slice_transform.pxd -> build/lib.linux-x86_64-3.10/rocksdb
复制rocksdb/snapshot。 pxd -> build/lib.linux-x86_64-3.10/rocksdb
复制rocksdb/status.pxd -> build/lib.linux-x86_64-3.10/rocksdb
复制rocksdb/std_memory.pxd -> build/lib.linux-x86_64-3.10/rocksdb 复制rocksdb/table_factory.pxd -> build/lib.linux-x86_64-3.10/rocksdb 复制rocksdb/universal_compaction.pxd -> build/lib。 linux-x86_64-3.10/rocksdb 创建 build/lib.linux-x86_64-3.10/rocksdb/cpp 复制rocksdb/cpp/comparator_wrapper.hpp -> build/lib.linux-x86_64-3.10/rocksdb/cpp 复制rocksdb/cpp/filter_policy_wrapper .hpp -> build/lib.linux-x86_64-3.10/rocksdb/cpp 复制rocksdb/cpp/memtable_factories.hpp -> build/lib.linux-x86_64-3.10/rocksdb/cpp 复制rocksdb/cpp/merge_operator_wrapper.hpp -> build/lib.linux-x86_64-3.10/rocksdb/cpp 复制rocksdb/cpp/slice_transform_wrapper.hpp -> build/lib.linux-x86_64-3.10/rocksdb/cpp 复制rocksdb/cpp/utils.hpp -> build/lib。 linux-x86_64-3.10/rocksdb/cpp
复制rocksdb/cpp/write_batch_iter_helper.hpp -> build/lib.linux-x86_64-3.10/rocksdb/cpp 运行build_ext
cythoning rocksdb/_rocksdb.pyx 到rocksdb/_rocksdb.cpp
/tmp/pip-install-b8y7g4hs/python-rocksdb_b1c08993fd134ac4bc5​​9e6f5d18 .eggs/Cython-0.29.26-py3.10-linux-x86_64.egg/Cython/Compiler/Main.py:369:FutureWarning:未设置 Cython 指令“language_level”,目前使用 2 (Py2)。这将在以后的版本中更改!文件:/tmp/pip-install-b8y7g4hs/python-rocksdb_b1c08993fd134ac4bc5​​9e6f5d18bcd91/rocksdb/_rocksdb.pyx tree = Parsing.p_module(s, pxd, full_module_name) building 'rocksdb._rocksdb' extension 创建 build/temp.linux-x86_64-3.10
创建 build/temp.linux-x86_64-3.10/rocksdb gcc -pthread -Wno-unused-result -Wsign-compare -DNDEBUG -g -fwrapv -O3 -Wall -fPIC -I/usr/local/include/python3.10 - c rockdb/_rocksdb.cpp -o build/temp.linux-x86_64-3.10/rocksdb/_rocksdb.o -std=c++11 -O3 -Wall -Wextra -Wconversion -fno-strict-aliasing -fno-rtti rocksdb/ _rocksdb.cpp:705:10:致命错误:rocksdb/slice.h:没有这样的文件或目录 705 | #include "rocksdb/slice.h" | ^~~~~~~~~~~~~~~~~ 编译终止。错误:命令 '/usr/bin/gcc' 失败,退出代码 1
---------------------------------- ------ python-rocksdb (setup.py) 的构建轮:完成状态“错误”错误:python-rocksdb 的构建轮失败

任何人都可以帮助我继续前进吗?我想在 Kubernetes 上使用来自 Docker 的 Faust。

0 投票
0 回答
9 浏览

python-3.x - 浮士德不喜欢相对路径

我正在尝试清理我的代码并将 models.py 文件移到顶层,因为除了浮士德之外的其他模块现在将使用它。文件夹结构如下(尽管为简单起见被删减)

fastapi_server.py需要agent_a.py访问models.py。如果我从App目录运行服务器,它可以正常工作。但是,当我尝试运行以下命令以从 App 目录启动 faust 代理时,它会返回No module named 'kafka.agent_a'错误:

奇怪的是,当我从一个完全不同的目录运行相同的命令时,它只包含 faust/kafka 的东西。报告错误可能会发生什么?

但还要注意,当我使用以下命令运行服务器时:

它根本不抱怨该模块。如果我尝试使用以下命令运行 faust 应用程序:

然后它抱怨模型不是模块。所以我对为什么一个python脚本运行正常而另一个没有运行完全感到困惑......但它确实在不同的目录中正常运行

我总是发现 python 中的导入很荒谬,让我无法理解,但这个更令人震惊。

0 投票
0 回答
89 浏览

python - 如何使用 python 向 kafka 生产者发送大消息?

如果我将最大的 Json 发送到 Kafka 服务器,它会显示这种错误,我该如何增加message.max.bytes=15728640replica.fetch.max.bytes=15728640在 Kafka 中。我试图增加字节级别,如下所示它不起作用

套接字服务器使用的发送缓冲区(SO_SNDBUF)

socket.send.buffer.bytes=15728640

套接字服务器使用的接收缓冲区(SO_RCVBUF)

socket.receive.buffer.bytes=15728640

错误:=>

0 投票
0 回答
22 浏览

apache-spark - Spark 流 - 在 2 个数据流中按时间戳查找最接近的记录

我已经加入了 2 个 kafka 流,它们在 spark 中流式传输 3 条记录/秒。

stream_id 时间 v1
1 1643627396.2 3
2 1643627396.2 5
1 1643627396.22 4
2 1643627396.22 6
1 1643627396.24 7
2 1643627396.23 3
1 1643627396.26 2
2 1643627396.27 3

对于从中选择的记录,stream_id = 1我必须按时间找到最接近的匹配 项stream_id = 2

例如。如果在这种情况下:

stream_id 时间 v1
2 1643627396.23 3
1 1643627396.26 2
2 1643627396.27 3

如果我们选择与 的记录stream_id = 1,则与最接近的匹配 stream_id = 2将具有1643627396.27作为timestamp值。

基于v1这两条记录,我必须计算差异:3-2=1并将该计算与时间戳一起存储在新数据框中,最终看起来像这样:

最终数据框如下所示:

time_stream_1 time_stream_2 v1_diff
1643627396.2 1643627396.2 2
1643627396.22 1643627396.22 2
1643627396.24 1643627396.23 4
1643627396.26 1643627396.27 1

基于如果值变得大于某个值,v1_diff我必须发出 API 命令。v1_diff

问题是如何在 spark 中迭代数据帧以执行这样的操作?我认为 Spark 并不是真的为此而构建的……也许吧。我需要一些关于下一步最佳的指示,卡夫卡,浮士德也许?

关于火花:也许我应该采取 0.35 秒的批处理窗口从两个数据帧中获取 1-2 条记录,如果 v1_diff 太大,则执行计算和火灾响应。想法是尽快做出反应,最好是在记录出现时立即做出反应。

文档中提到:

在内部,默认情况下,结构化流查询使用微批处理引擎处理,该引擎将数据流作为一系列小批量作业处理,从而实现低至 100 毫秒的端到端延迟和一次性容错保证. 但是,从 Spark 2.3 开始,我们引入了一种新的低延迟处理模式,称为Continuous Processing,它可以实现低至 1 毫秒的端到端延迟,并保证至少一次。在不更改查询中的 Dataset/DataFrame 操作的情况下,您将能够根据您的应用程序要求选择模式。

100ms由于差异很大并且很重要,我如何为我的案例实现连续处理1ms

0 投票
0 回答
19 浏览

python-3.x - 浮士德:无法异步处理多条消息

我是 faust 和 asyncio 的新手,我试图理解为什么这段代码最终会同步打印它的输出。

例如,我在问候主题中预先创建了 3 条消息,使用:

我假设我会先得到所有“开始”,然后 20 秒后得到所有“你好:......”,但我有这个:

有人可以帮助理解我做错了什么或者我的假设是否正确吗?谢谢 !

0 投票
0 回答
19 浏览

avro - faust Record 没有正确反序列化 avro Union 类型

我正在使用faust-streaming并将类python-schema-registry-client序列化为faust.Recordavro。

当我尝试反序列化两个复杂类型的 Union 时,faust 无法重建正确的记录,并且只提供字典而不是faust.Record类。

我有以下代码和架构:

如果我现在尝试序列化record然后反序列化它:

我明白了:

而我希望得到这个:

如果我删除 Union 类型并更改架构,以便我可以拥有一个faust.Record只有这个字段的:Optional[Company],我确实得到了正确的反序列化。

0 投票
2 回答
20 浏览

python - 如何使用 Faust 向 kafka 主题发送无值(墓碑)

我正在尝试使用 Faust 将数据从 Kafka 中的一个主题发送到另一个主题。如果原始主题中的值为 None (消息是墓碑),我将带有 None 值的当前键发送到目标主题。

我希望它成为目标主题中的墓碑,但事实并非如此。它有一个删除标题和以下值:

"ERROR" :{ "message":"src 属性必须是有效的 json 对象" }

我是浮士德的新手,所以我可能遗漏了一些东西......有没有办法用它发送墓碑?