17

编者注:这个问题来自 Rust 1.0 之前的版本,使用了 Rust 1.0 代码中不存在的术语和函数。表达的概念仍然相关。

我需要通过 Rust 程序中的 POSIX 文件描述符读取外部进程提供的数据。文件描述符连接保持了很长时间(几小时),另一方不时将数据传递给我。我需要连续读取和处理数据流。

为此,我编写了一个循环调用libc::read()readv实际上)读取数据并在收到数据时对其进行处理。由于这会阻塞整个调度程序,我将在新调度程序 ( task::spawn_sched(SingleThreaded)) 上生成一个任务。只要它运行,它就可以正常工作,但我找不到干净地关闭循环的方法。

由于循环大部分时间都在阻塞,我不能使用端口/通道来通知循环退出。

我试图通过使用失败的链接任务将其取消来终止循环任务(生成受监督的循环任务,在其中生成一个链接任务并等待端口上的信号发生,然后再fail!()使用它来取消循环任务) . 它在测试中运行良好,但libc::read()不会中断(任务在读取完成之前不会失败,并且task::yield()有时会命中。

我从 libcore 源代码中学到了很多东西,但我似乎找不到合适的解决方案。

  1. 有没有办法杀死 Rust 中的(子)任务,即使它正在执行一些长的外部函数调用,比如阻塞读取?
  2. 有没有办法对 POSIX 文件描述符进行非阻塞读取,以便 Rust 保持对任务的控制?
  3. 我如何对信号做出反应,例如,SIGTERM如果用户终止我的程序?Rust中似乎还没有类似sigaction()的东西。
4

2 回答 2

1
  1. 根据mozila的说法,现在不再可能杀死任务,更不用说阻止读取了。
  2. 之后可以这样做mozilla/rust/pull/11410,另请参阅我的另一个关于 rust-zmq 的问题报告,erickt/rust-zmq/issues/24这也取决于此。(对不起链接)
  3. 也许信号监听器会为你工作。
于 2014-03-18T05:34:40.027 回答
0

有没有办法杀死 Rust 中的(子)任务,即使它正在执行一些长的外部函数调用,比如阻塞读取?

不。

也可以看看:

有没有办法进行非阻塞读取 [...] 以便 Rust 保持对任务的控制?

是的。

也可以看看:

在 POSIX 文件描述符上

是的。

也可以看看:

我如何对信号做出反应

决定你想要的平台支持,然后选择一个合适的 crate。

也可以看看:

把它们放在一起

use future::Either;
use signal_hook::iterator::Signals;
use std::os::unix::io::FromRawFd;
use tokio::{fs::File, io, prelude::*};

type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;

fn main() -> Result<()> {
    let signals = Signals::new(&[signal_hook::SIGUSR1])?;
    let signals = signals.into_async()?;

    let input = unsafe { std::fs::File::from_raw_fd(5) };
    let input = File::from_std(input);
    let lines = io::lines(std::io::BufReader::new(input));

    let signals = signals.map(Either::A);
    let lines = lines.map(Either::B);

    let combined = signals.select(lines);

    tokio::run({
        combined
            .map_err(|e| panic!("Early error: {}", e))
            .for_each(|v| match v {
                Either::A(signal) => {
                    println!("Got signal: {:?}", signal);
                    Err(())
                }
                Either::B(data) => {
                    println!("Got data: {:?}", data);
                    Ok(())
                }
            })
    });

    Ok(())
}

货运.toml

[package]
name = "future_example"
version = "0.1.0"
authors = ["An Devloper <an.devloper@example.com>"]
edition = "2018"

[dependencies]
tokio = "0.1.22"
signal-hook = { version = "0.1.9", features = ["tokio-support"] }

垫片

#!/bin/bash

set -eu

exec 5< /tmp/testpipe
exec ./target/debug/future_example

执行

cargo build
mkfifo /tmp/testpipe
./shim.sh

另一个终端

printf 'hello\nthere\nworld' > /tmp/testpipe
kill -s usr1 $PID_OF_THE_PROCESS
于 2019-07-06T17:56:04.377 回答