0

尝试在新的 tokio 线程中调用异步函数会导致某些函数出错。

在这个最小的例子中,使用了 crates tokio 和 iota-streams。方法 send_announce() 是异步的并返回一个地址。等待此方法会导致编译错误,说明未实现 std::Marker::Send 特征

dyn std::future::Future<Output = Result<GenericMessage<TangleAddress, BinaryBody>, iota_streams::iota_streams_core::Error>>
dyn std::future::Future<Output = Result<(), iota_streams::iota_streams_core::Error>>
dyn std::future::Future<Output = Result<&mut iota_streams::iota_streams_ddml::command::sizeof::Context<KeccakF1600>, iota_streams::iota_streams_core::Error>>
dyn std::future::Future<Output = Result<&mut iota_streams::iota_streams_ddml::command::wrap::Context<KeccakF1600, &mut [u8]>, iota_streams::iota_streams_core::Error>>

至于我的理解,问题在于缺少一个或多个 Sync/Send 特征实现,并且能够在线程之间传递数据 Rust 需要整个链来实现 Sync 和 Send。

文档指出,上述结构实现了 Sync 和 Send 作为自动特征:(iota_streams_core::Error, wrap::Context, TangleAddress, BinaryBody, command::sizeof::Context, KeccakF1600 ...)

在主线程中调用相同的函数可以正常工作。

我尝试将来自 send_announce() 的结果包装到一个 Box 中,实现 Send trait unsafe 并将响应包装到一个结构中......,而不改变编译错误。

在这种情况下,动态的未来反应似乎是有问题的。我是 rust 新手,我将不胜感激我能得到的关于如何解决这个问题的每一个帮助或想法。这种方法甚至可能吗?

我的程序应该通过调用调用并在单独的线程中处理请求。在此线程内,例如生成此公告链接。

所示示例是将问题减少到重要部分的最小示例。在 Ubuntu 上进行了 rust-stable 和 nightly 测试。

// main.rs
use iota_streams::{
    app::transport::tangle::client::Client,
    app_channels::api::tangle::{Author, ChannelType},
    core::Result,
};
use rand::Rng;

#[tokio::main]
async fn main() -> Result<()> {
    //
    // This works fine
    //
    let seed = generate_seed();
    let client = Client::new_from_url("https://chrysalis-nodes.iota.org");
    let mut author = Author::new(&seed, ChannelType::SingleBranch, client.clone());
    //
    // No error occurs here
    //
    let announcement_link = author.send_announce().await?;
    //
    // Spawn new thread
    //
    tokio::spawn(async move {
        let seed = generate_seed();
        let client = Client::new_from_url("https://chrysalis-nodes.iota.org");
        //
        // Error occurs here
        //
        let announcement_link = author.send_announce().await?;
        Ok(())
    });

    Ok(())
}
// Make a seed
const ALPH9: &str = "ABCDEFGHIJKLMNOPQRSTUVWXYZ9";
fn generate_seed() -> String {
    let seed: String = (0..81)
        .map(|_| { ALPH9
                .chars().nth(rand::thread_rng().gen_range(0..27)).unwrap()
        }).collect::<String>();
    seed
}
# Cargo.toml
[package]
name = "example"
version = "0.1.0"
edition = "2021"

[dependencies]
iota-streams = { git = "https://github.com/iotaledger/streams", branch  = "develop"}
tokio = { version = "1.17.0", features = ["full"] }
rand = "0.8.5"
error: generator cannot be sent between threads safely
   --> src/main.rs:17:5
    |
17  |     tokio::spawn(async {
    |     ^^^^^^^^^^^^ future created by async block is not `Send`
    |
    = help: the trait `std::marker::Send` is not implemented for `dyn std::future::Future<Output = Result<GenericMessage<TangleAddress, BinaryBody>, iota_streams::iota_streams_core::Error>>`
note: required by a bound in `tokio::spawn`
   --> /home/-/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.17.0/src/task/spawn.rs:127:21
    |
127 |         T: Future + Send + 'static,
    |                     ^^^^ required by this bound in `tokio::spawn`

error: generator cannot be sent between threads safely
   --> src/main.rs:17:5
    |
17  |     tokio::spawn(async {
    |     ^^^^^^^^^^^^ future created by async block is not `Send`
    |
    = help: the trait `std::marker::Send` is not implemented for `dyn std::future::Future<Output = Result<(), iota_streams::iota_streams_core::Error>>`
note: required by a bound in `tokio::spawn`
   --> /home/-/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.17.0/src/task/spawn.rs:127:21
    |
127 |         T: Future + Send + 'static,
    |                     ^^^^ required by this bound in `tokio::spawn`

error: generator cannot be sent between threads safely
   --> src/main.rs:17:5
    |
17  |     tokio::spawn(async {
    |     ^^^^^^^^^^^^ future created by async block is not `Send`
    |
    = help: the trait `std::marker::Send` is not implemented for `dyn std::future::Future<Output = Result<&mut iota_streams::iota_streams_ddml::command::sizeof::Context<KeccakF1600>, iota_streams::iota_streams_core::Error>>`
note: required by a bound in `tokio::spawn`
   --> /home/-/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.17.0/src/task/spawn.rs:127:21
    |
127 |         T: Future + Send + 'static,
    |                     ^^^^ required by this bound in `tokio::spawn`

error: generator cannot be sent between threads safely
   --> src/main.rs:17:5
    |
17  |     tokio::spawn(async {
    |     ^^^^^^^^^^^^ future created by async block is not `Send`
    |
    = help: the trait `std::marker::Send` is not implemented for `dyn std::future::Future<Output = Result<&mut iota_streams::iota_streams_ddml::command::wrap::Context<KeccakF1600, &mut [u8]>, iota_streams::iota_streams_core::Error>>`
note: required by a bound in `tokio::spawn`
   --> /home/-/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.17.0/src/task/spawn.rs:127:21
    |
127 |         T: Future + Send + 'static,
    |                     ^^^^ required by this bound in `tokio::spawn`

error: could not compile `teststh` due to 4 previous errors
4

1 回答 1

0

返回的未来author.send_announce()不 impl Send,所以你不能在tokio::spawn().

您可以尝试使用tokio::task::LocalSetwhich 可以让您生成非Send期货tokio::task::spawn_local。这通过运行您在创建LocalSet.

相反,如果您想在线程池上生成非Send期货,您可以使用tokio_util::task::LocalPoolHandle,它通过将工作分配到给定数量的操作系统线程上来工作,每个线程都有自己的LocalSet.

于 2022-03-05T12:50:16.983 回答