问题标签 [rust-tokio]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
rust - 将新任务添加到 tokio 事件循环并在失败时重试任务
我正在尝试编写一个可以从同一服务器执行获取请求的 tokio 事件循环,具有以下特征:
- 应该使用连接池
- get请求一般比较慢(>1s),所以需要并行执行
- 服务器可能没有响应,所以我需要超时。如果未收到请求,请再次发送
- 轮询接收器以获取必须下载的新 URL。它们应该被添加到事件循环中
到目前为止,在我的尝试中,我已经设法让这 4 个项目的不同组合起作用,但从来没有一起工作。我的主要问题是我不太明白如何向 tokio 事件循环添加新的期货。
我假设我需要使用loop_fn
轮询接收器的主循环并handle.spawn
生成新任务?handle.spawn
只允许未来Result<(),()>
,所以我不能使用它的输出在失败时重新生成作业,所以我需要将重试检查移到那个未来?
以下是批量接受和处理 url 的尝试(因此没有连续轮询),并且有超时(但没有重试):
loop_fn
可悲的是,我的尝试失败了。
rust - 从 Tokio Core 导入通道模块失败
在 lib.rs 我有这个:
最终出现此错误:
查看tokio_core crate 的 lib.rs 文件,它导出channel
如下:
对于我的生活,我无法弄清楚为什么这不起作用。我在 Rust 1.29 和 1.30.1 上都试过这个。
asynchronous - 如何在 Tokio 未来链的多个分支中使用 TcpStream?
我有一个 Rust Tokio TCP 服务器。每个客户端都由 Tokio 未来链处理,如下所示:
问题是我不能TcpStream
在链的多个分支中使用相同的,因为tokio::io::write_all
消耗流,即使它应该以顺序方式使用。根据是否存在(例如,数据库错误)发送不同的数据至关重要。
我该如何克服这个问题?也许有不同的API?
rust - 使用 Futures 和 Tokio 进行 Rust 并发执行
我有一些目前看起来像这样的 Rust 代码
它使用 tokio 和 futures,目的是运行一些“cpu 繁重”工作(由sleep_for
函数模拟),然后将一些东西输出到stdout
.
当我运行它时,一切似乎都很好,我得到了这个输出
带有该值的第一个输出2
完全符合预期,我看到 200 毫秒后打印的时间戳。但是对于下一个输入,很明显该sleep_for
函数是按顺序执行的,而不是同时执行的。
我想看到的输出是
似乎要获得我正在寻找的输出,我想同时sleep_for(10)
执行sleep_for(1)
。我将如何使用期货和 tokio 在 Rust 中执行此操作?
(注意:时间戳的实际值并不重要,我更多地使用它们来显示程序中的执行顺序)
asynchronous - 我如何 read_until 未来链中的 tokio::net::TcpStream ?
我想从中读取数据,TcpStream
直到遇到“\ 0”。问题是tokio::io::read_until
需要流是BufRead
.
我怎样才能从TcpStream
这种方式读取数据?
file - 如何在 Rust 中异步计算硬盘上文件的校验和?
我在 Rust / Tokio 堆栈中有一个 TCP 文件服务器。
当客户端上传文件时,数据正在从 a 读取tokio::net::TcpStream
并写入 a futures_fs::FsWriteSink
,它已在单独的futures_fs::FsPool
.
文件完全上传后,我需要通过检查其校验和与客户端发送的校验和来检查其一致性。
异步计算校验和的最简单方法是什么,尤其是在文件不适合 RAM 的情况下?
asynchronous - 如何优雅地关闭 Tokio 运行时以响应 SIGTERM?
我有一个main
函数,我在其中创建一个 Tokio 运行时并在其上运行两个期货。
我如何收到一个SIGTERM
,等待所有未完成NotReady
的任务并退出应用程序?
asynchronous - Rust 中的异步连接池实现
我有一个 Tokio TCP 后端应用程序,简单地说,在收到请求后,它会从 Redis 读取一些内容,向 PostgreSQL 写入一些内容,通过 HTTP 上传一些内容,向 RabbitMQ 发送一些内容等。处理每个请求需要很长时间,所以为每个请求创建一个单独的任务。由于在异步模型中共享连接是不可能的,因此需要一些连接池。目前,每个请求都会建立新的连接,这非常过分。
我一直在寻找 Rust 中的异步连接池实现,但没有找到最新的。
我想听听一些关于如何自己实施它的建议。
我想出的唯一想法是:
- 实现一个
Stream/Sink
具有内部连接集合的对象。它是 LIFO 还是 FIFO 无关紧要,因为连接是相同的。在应用程序启动时,分配了 N 个连接。 - 现在我不确定是否可以在任务之间共享这样一个池,但如果可能的话,任务将轮询流以获取连接实例(而不是建立自己的连接实例),使用它,然后放回去。
- 如果没有可用的连接,则流可能会建立更多连接或要求任务挂起(取决于其配置)。
- 如果连接失败,它将被丢弃,并且池现在包含 N-1 个连接,因此它可能决定在下一个请求时分配一个新连接。
所以我有两个问题,我无法在任何地方找到正确的答案:
我必须/可以/应该以某种方式在任务之间共享流/接收池吗?无论如何,我在箱子里看到了一些
Shared
未来。futures
tokio/futures 教程中有一些令人沮丧的地方。例如,它没有解释我如何通知最上面的任务,也就是说,我如何实现神话中的最里面的未来,它本身不汇集任何东西,但仍然必须通知上层的未来。
还是我的方法完全错误?我可以自己开始玩它,但我强烈怀疑我错过了一些东西,例如一键式解决方案。
rust - 发送到数组中的每个 futures::sync::mpsc::Sender
我有一个动态集合futures::sync::mpsc::Sender
,我想为每个传入连接向他们每个人发送一条消息。
我可以使用它UnboundedSender
,因为我可以这样做(见下文)但Sender
会消耗自己,所以我需要将其删除并重新插入Vec
发送后。我怎样才能做到这一点?如果Sender
阻塞,它不应该发送更多消息,而是切换到处理接收器上的传入连接。
这UnboundedSender
实现如下,否则我失败的尝试会被内联注释掉(只需将前面的行替换为注释掉的行)
UnboundedSender (作品)
发件人(无效)
rust - 带有多队列的 tokio 有时会挂起,有时会工作
我正在尝试用 tokio 对 crate多队列Stream
进行基准测试,以通过制作可以迭代的 s来实现发布者/订阅者的某些东西。我不相信效率(我可能需要数十或数百个侦听器来过滤项目,并且单个发布者将每毫秒发布大约 10 条消息),所以我想在我承诺之前对方法进行基准测试它。但是,现在,我遇到了一个奇怪的错误,有时tokio::timer::Interval
似乎根本没有触发。
完整代码如下:
我正在运行它cargo bench
。futures
在版本上"0.1"
,tokio
在版本上"0.1"
,multiqueue
在版本上"0.3"
。
有时,整个测试以许多“[0-2] Got a num! 100”和“Sending 100”的消息完成,但有时它会挂在中间(在几个“Sending”和“Got a”消息之后)或仅挂起 3 条“开始”消息。
我怀疑这可能是我可以同时运行的任务数量的问题tokio
,但我真的不明白为什么这会是我遇到的限制,因为我正在生成两种类型的任务时间经常给执行人。
我怎样才能使它更可靠?