问题标签 [rust-tokio]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
698 浏览

rust - 将新任务添加到 tokio 事件循环并在失败时重试任务

我正在尝试编写一个可以从同一服务器执行获取请求的 tokio 事件循环,具有以下特征:

  • 应该使用连接池
  • get请求一般比较慢(>1s),所以需要并行执行
  • 服务器可能没有响应,所以我需要超时。如果未收到请求,请再次发送
  • 轮询接收器以获取必须下载的新 URL。它们应该被添加到事件循环中

到目前为止,在我的尝试中,我已经设法让这 4 个项目的不同组合起作用,但从来没有一起工作。我的主要问题是我不太明白如何向 tokio 事件循环添加新的期货。

我假设我需要使用loop_fn轮询接收器的主循环并handle.spawn生成新任务?handle.spawn只允许未来Result<(),()>,所以我不能使用它的输出在失败时重新生成作业,所以我需要将重试检查移到那个未来?

以下是批量接受和处理 url 的尝试(因此没有连续轮询),并且有超时(但没有重试):

loop_fn可悲的是,我的尝试失败了。

0 投票
1 回答
387 浏览

rust - 从 Tokio Core 导入通道模块失败

在 lib.rs 我有这个:

最终出现此错误:

查看tokio_core crate 的 lib.rs 文件,它导出channel如下:

对于我的生活,我无法弄清楚为什么这不起作用。我在 Rust 1.29 和 1.30.1 上都试过这个。

0 投票
1 回答
472 浏览

asynchronous - 如何在 Tokio 未来链的多个分支中使用 TcpStream?

我有一个 Rust Tokio TCP 服务器。每个客户端都由 Tokio 未来链处理,如下所示:

问题是我不能TcpStream在链的多个分支中使用相同的,因为tokio::io::write_all消耗流,即使它应该以顺序方式使用。根据是否存在(例如,数据库错误)发送不同的数据至关重要。

我该如何克服这个问题?也许有不同的API?

0 投票
1 回答
717 浏览

rust - 使用 Futures 和 Tokio 进行 Rust 并发执行

我有一些目前看起来像这样的 Rust 代码

它使用 tokio 和 futures,目的是运行一些“cpu 繁重”工作(由sleep_for函数模拟),然后将一些东西输出到stdout.

当我运行它时,一切似乎都很好,我得到了这个输出

带有该值的第一个输出2完全符合预期,我看到 200 毫秒后打印的时间戳。但是对于下一个输入,很明显该sleep_for函数是按顺序执行的,而不是同时执行的。

我想看到的输出是

似乎要获得我正在寻找的输出,我想同时sleep_for(10)执行sleep_for(1)。我将如何使用期货和 tokio 在 Rust 中执行此操作?

(注意:时间戳的实际值并不重要,我更多地使用它们来显示程序中的执行顺序)

0 投票
1 回答
733 浏览

asynchronous - 我如何 read_until 未来链中的 tokio::net::TcpStream ?

我想从中读取数据,TcpStream直到遇到“\ 0”。问题是tokio::io::read_until需要流是BufRead.

我怎样才能从TcpStream这种方式读取数据?

0 投票
2 回答
981 浏览

file - 如何在 Rust 中异步计算硬盘上文件的校验和?

我在 Rust / Tokio 堆栈中有一个 TCP 文件服务器。

当客户端上传文件时,数据正在从 a 读取tokio::net::TcpStream并写入 a futures_fs::FsWriteSink,它已在单独的futures_fs::FsPool.

文件完全上传后,我需要通过检查其校验和与客户端发送的校验和来检查其一致性。

异步计算校验和的最简单方法是什么,尤其是在文件不适合 RAM 的情况下?

0 投票
2 回答
4479 浏览

asynchronous - 如何优雅地关闭 Tokio 运行时以响应 SIGTERM?

我有一个main函数,我在其中创建一个 Tokio 运行时并在其上运行两个期货。

我如何收到一个SIGTERM,等待所有未完成NotReady的任务并退出应用程序?

0 投票
0 回答
474 浏览

asynchronous - Rust 中的异步连接池实现

我有一个 Tokio TCP 后端应用程序,简单地说,在收到请求后,它会从 Redis 读取一些内容,向 PostgreSQL 写入一些内容,通过 HTTP 上传一些内容,向 RabbitMQ 发送一些内容等。处理每个请求需要很长时间,所以为每个请求创建一个单独的任务。由于在异步模型中共享连接是不可能的,因此需要一些连接池。目前,每个请求都会建立新的连接,这非常过分。

我一直在寻找 Rust 中的异步连接池实现,但没有找到最新的。

我想听听一些关于如何自己实施它的建议。

我想出的唯一想法是:

  1. 实现一个Stream/Sink具有内部连接集合的对象。它是 LIFO 还是 FIFO 无关紧要,因为连接是相同的。在应用程序启动时,分配了 N 个连接。
  2. 现在我不确定是否可以在任务之间共享这样一个池,但如果可能的话,任务将轮询流以获取连接实例(而不是建立自己的连接实例),使用它,然后放回去。
  3. 如果没有可用的连接,则流可能会建立更多连接或要求任务挂起(取决于其配置)。
  4. 如果连接失败,它将被丢弃,并且池现在包含 N-1 个连接,因此它可能决定在下一个请求时分配一个新连接。

所以我有两个问题,我无法在任何地方找到正确的答案:

  1. 我必须/可以/应该以某种方式在任务之间共享流/接收池吗?无论如何,我在箱子里看到了一些Shared未来。futures

  2. tokio/futures 教程中有一些令人沮丧的地方。例如,它没有解释我如何通知最上面的任务,也就是说,我如何实现神话中的最里面的未来,它本身不汇集任何东西,但仍然必须通知上层的未来。

还是我的方法完全错误?我可以自己开始玩它,但我强烈怀疑我错过了一些东西,例如一键式解决方案。

0 投票
2 回答
1080 浏览

rust - 发送到数组中的每个 futures::sync::mpsc::Sender

我有一个动态集合futures::sync::mpsc::Sender,我想为每个传入连接向他们每个人发送一条消息。

我可以使用它UnboundedSender,因为我可以这样做(见下文)但Sender会消耗自己,所以我需要将其删除并重新插入Vec发送后。我怎样才能做到这一点?如果Sender阻塞,它不应该发送更多消息,而是切换到处理接收器上的传入连接。

UnboundedSender实现如下,否则我失败的尝试会被内联注释掉(只需将前面的行替换为注释掉的行)

UnboundedSender (作品)

发件人(无效)

0 投票
0 回答
225 浏览

rust - 带有多队列的 tokio 有时会挂起,有时会工作

我正在尝试用 tokio 对 crate多队列Stream进行基准测试,以通过制作可以迭代的 s来实现发布者/订阅者的某些东西。我不相信效率(我可能需要数十或数百个侦听器来过滤项目,并且单个发布者将每毫秒发布大约 10 条消息),所以我想在我承诺之前对方法进行基准测试它。但是,现在,我遇到了一个奇怪的错误,有时tokio::timer::Interval似乎根本没有触发。

完整代码如下:

我正在运行它cargo benchfutures在版本上"0.1"tokio在版本上"0.1"multiqueue在版本上"0.3"

有时,整个测试以许多“[0-2] Got a num! 100”和“Sending 100”的消息完成,但有时它会挂在中间(在几个“Sending”和“Got a”消息之后)或仅挂起 3 条“开始”消息。

我怀疑这可能是我可以同时运行的任务数量的问题tokio,但我真的不明白为什么这会是我遇到的限制,因为我正在生成两种类型的任务时间经常给执行人。

我怎样才能使它更可靠?