问题标签 [python-trio]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
47 浏览

python-3.x - 我正在使用 Trio,如何获取已经打开的套接字的 IP 地址?

这是一个简单的问题,我相信有一个解决方案。我正在尝试获取已连接到的套接字的 IP 地址。我使用 trio.serve_tcp(handler, port) 创建了一个服务器,现在我想知道是否有人与它建立了数千个连接,但我需要知道如何获取已接受连接的 IP 地址。

0 投票
1 回答
197 浏览

python - 将带有 websockets 的 Quart 项目从 asyncio 迁移到 trio

我正在尝试将我的 asyncio 项目转换为 trio。我知道我必须使用内存通道而不是队列,但由于某种原因,我没有得到我期望的结果。

我的主要问题是,当我运行两个客户端时,如果第二个客户端离开,第一个客户端不会收到通知(从服务器广播“部分”消息会引发错误)。另一个问题是,有时客户端在打开 websocket 时会立即退出。当我使用 asyncio 时,一切正常。

这是我在第二个客户端断开连接时得到的堆栈跟踪:

这是代码(设置TRIOFalse使用 asyncio):

服务器.py

客户端.py

编辑:我使用 anyio 进行了测试,虽然 anyio+trio 的行为相同,但 anyio+asyncio 重现了问题(无任何例外)。所以我猜它来自队列替换。

0 投票
2 回答
101 浏览

concurrency - 如何在 CPU 内核上执行函数,并在完成后获得回调?

如何在 CPU 内核上执行函数,并在完成后获得回调?


语境

我正在接收一个流:

我需要使用多个 CPU 内核来加速它。

handler['ABC'](eg) 持有状态,但它与 (eg) 的状态不相交handler['DFG']

基本上我不能同时运行 2 个内核,例如handler['ABC'].


到目前为止我的方法

我提出了以下解决方案,但它是部分伪代码,因为我看不到如何实现它。

所以我的问题是:如何将最后一条语句转换为工作 Python 代码?

PS更一般地说,我的方法是最优的吗?

0 投票
0 回答
53 浏览

python - 无法通过键盘中断 trio/asyncio 程序 (ctrl+c) 退出

以下代码有效,但无法在 CTRL+c 时退出

谁能解释为什么,以及如何解决?

这是 asyncio 代码,但我使用 trio-asyncio(前瞻性规划)来允许我也使用 trio 代码。

我已经放置except KeyboardInterrupt在 2 个地方,尽管它似乎什么也没做。

0 投票
1 回答
22 浏览

python - 可靠地检测到没有更多的数据包到来

我正在编写一个 TCP SYN 端口扫描器。我启动了 15 个任务,每个任务将 64K / 15 个数据包发送到 64K 端口,并且我有一个位于循环中并打印响应的接收器。然而,一些数据包没有返回,使得接收者永远在循环中等待。

我正在寻找一种方法来可靠地检测到没有更多的数据包到来,以便我可以重试获取丢失的数据包。

0 投票
1 回答
42 浏览

python - 从 AF_PACKET 套接字获取重复数据包

我在使用AF_PACKET套接字时遇到了一个奇怪的问题,我想知道这是否是由于可能trio代表我的滥用行为,还是其他原因。

我正在编写一个 TCP SYN 端口扫描器,目标是使其具有高性能。假设我要扫描 64K 端口;程序将把这个间隔分成 8 个 8K 的间隔,并且对于每个间隔,产生一个multiprocessing.Processtrio.run作为参数。在这个过程中,由于使用了异步套接字,一切都是异步的。说到套接字,我正在做我自己的过滤BPF——它是一个简单的过滤器,只允许来自目标并发往侦听端口的数据包通过。

然而,问题是我recv()似乎不止一次地随机数据包,我不知道为什么会这样。我已经使用 Wireshark 检查了流量,但没有看到重复的数据包,所以我认为问题一定出在程序的某个地方。

我正在寻找有关为什么会发生这种情况的见解,以及有关如何克服该问题的建议。

我也不确定这种 nmultiprocessing.Process运行 ntrio事件循环的设计是否被认为是可取的。

如果有人想看,我附上代码:

谢谢。

0 投票
3 回答
186 浏览

python - 用于异步计算/获取的 Python Asyncio/Trio

我正在寻找一种方法来有效地从磁盘中获取一大块值,然后对该块执行计算/计算。我的想法是一个 for 循环,它首先运行磁盘获取任务,然后对获取的数据运行计算。我想让我的程序在运行计算时获取下一批,这样我就不必在每次计算完成时等待另一个数据获取。我预计计算将比从磁盘获取数据花费更长的时间,并且由于单个计算任务已经将 cpu 使用率固定在接近 100%,因此可能无法真正并行完成。

我在下面使用 trio 的 python 中提供了一些代码(但也可以与 asyncio 一起使用以达到相同的效果)来说明我使用异步编程执行此操作的最佳尝试:

此代码将通过生成 30 个随机矩阵来模拟从磁盘获取数据,这会使用少量 cpu。然后它将对生成的矩阵执行矩阵求逆,该矩阵使用 100% cpu(在 numpy 中使用 openblas/mkl 配置)。我通过对同步和异步操作计时来比较运行任务所花费的时间。

据我所知,这两个作业完成的时间完全相同,这意味着异步操作并没有加快执行速度。观察每个计算的行为,顺序操作按顺序运行提取和计算,异步操作首先运行所有提取,然后是所有计算。

有没有办法使用异步获取和计算?也许有期货或类似收集()的东西?Asyncio 具有这些功能,而 trio 将它们放在单独的包trio_future中。我也对通过其他方法(线程和多处理)的解决方案持开放态度。

我相信可能存在一种多处理解决方案,可以使磁盘读取操作在单独的进程中运行。但是,进程间通信和阻塞就变得很麻烦,因为由于内存限制,我需要某种信号量来控制一次可以生成多少块,并且多处理往往非常繁重和缓慢。

编辑

谢谢VPfB的回答。我无法在操作中休眠(0),但我认为即使我这样做了,它也必然会阻止计算以支持执行磁盘操作。我认为这可能是 python 线程和 asyncio 的一个硬限制,它一次只能执行 1 个线程。如果除了等待一些外部资源从您的 CPU 响应之外,两个不同的进程都需要任何东西,那么同时运行两个不同的进程是不可能的。

也许有一种方法可以使用多处理池的执行器。我在下面添加了以下代码:

尽管计时,它仍然需要与同步示例相同的时间。我认为我将不得不采用一个更复杂的解决方案,因为似乎 async 和 await 对于正确执行此类任务切换的工具来说太粗糙了。

0 投票
1 回答
202 浏览

python - 将 anyio.TaskGroup 与 fastapi.StreamingResponse 一起使用

anyio是 FastAPI 的一部分starlette,因此也是 FastAPI 的一部分。我发现使用它的任务组对我的一个 API 服务器之外的外部服务执行并发请求非常方便。

另外,我想在结果准备好后立即发布。fastapi.StreamingResponse可以解决问题,但我仍然需要能够在返回后保持任务组正常运行StreamingResponse,但这听起来与结构化并发的想法背道而驰。

使用异步生成器可能看起来是一个明显的解决方案,但yield通常不能在任务组上下文中使用,根据此:https ://trio.readthedocs.io/en/stable/reference-core.html#cancel-范围和托儿所

有一个 FastAPI 服务器的示例似乎可以工作,尽管它会在返回之前聚合响应:

所以,问题是,是否有类似于@trio_util.trio_async_generatorin 的东西anyio,或者是否可以@trio_util.trio_async_generator直接与 FastAPI 一起使用?

也许还有其他解决方案?

0 投票
0 回答
69 浏览

python - 对 ipywidget 事件做出反应,同时单元格在 `await` 中被阻塞(使用 `%autoawait` 魔法)

我想等待异步 jupyter 小部件事件,而单元被阻止执行异步代码(因此,通常是空闲的,等待 I/O,利用%autowait魔法)。不幸的是,当单元格阻塞时,jupyter notebook 似乎没有发送任何 traitlet 事件。上面的文档通过将回调发送到“后台”来规避这一点,因此它可以在单元格完成执行后做出反应。我想避免这种情况,因为它使对后台任务的推理变得比需要的更难;这恰恰违背了我最喜欢的 python 异步事件循环的原则:trio。有没有办法让它工作?

这是适用于 trio 的 jupyter 文档中的示例(尽管它与 asyncio 以相同的方式失败):

0 投票
1 回答
175 浏览

python - 使用 trio Nursery 作为带有 FastAPI 的 Sever Sent Events 的生成器?

我正在尝试使用 FastAPI 构建一个服务器发送事件端点,但我不确定我想要完成的事情是否可行或者我将如何去做。

问题简介

基本上假设我有一个run_task(limit, task)异步函数,它发送异步请求、进行事务或类似的东西。假设每个任务run_task都可以返回一些 JSON 数据。

我想run_task(limit, task)异步运行多个任务(多个),为此我使用三重奏和托儿所,如下所示:

最后,我想通过 FastAPI 端点返回每个任务的结果

起初,我只是创建了一个包含一个列表的对象,并将该对象(通过引用)传递给 each run_task,当任务完成时,我会将 JSON 数据作为字典推送,并通过端点返回整个对象任务完成了。

这可行,但我发现效率低下,发送请求的客户端需要等待所有任务完成才能显示数据,但是,某些任务可能会很慢,这意味着从其他任务获取的数据最终会停滞不前.

我想完成什么

每当任务完成时,我希望 API 直接返回所述任务的数据(我之前会添加到对象中),以便客户端可以实时显示所述数据。

那时我发现了服务器发送事件和 Web 套接字是什么。服务器发送的事件似乎是解决我的问题的合适方法,因为我不需要双向通信。

由于 FastAPI 是在 Starlette 上构建的,我决定使用sse-Starlette来构建一个带有服务器发送事件的端点,为此我需要像这样构建一个端点

实际问题

顾名思义status_event_generator, sse-starlette 需要返回一个事件生成器,这就是我卡住的地方。我希望生成器在完成时产生任务的数据(以便客户端可以实时接收每个任务的数据),但是,这些任务在异步三重奏托儿所内,所以我不确定如何继续

根据异步生成器功能中的托儿所内部产生是否不好?,看来(如果我理解正确的话)我不能只投入收益run_task(limit, task)并期望它起作用