0

我有这个代码:

extern crate actix;
extern crate actix_web;
extern crate env_logger;
extern crate futures; // 0.1.25
extern crate tokio; // 0.1.17
use futures::future::ok as fut_ok;
use futures::Future;
use tokio::runtime::Builder;

use std::sync::{Arc, Mutex};
extern crate serde_json;

type Error = ();

fn questions_data(
    val: i32,
) -> Box<Future<Item = serde_json::Value, Error = actix_web::error::Error>> {
    let f = std::fs::read_to_string("auth_token").unwrap();
    let token = f.trim();
    use actix_web::{client, HttpMessage};
    use std::time::Duration;
    Box::new(
        client::ClientRequest::get(
            "https://jsonplaceholder.typicode.com/todos/".to_owned() + &val.to_string(),
        )
        .header(
            actix_web::http::header::AUTHORIZATION,
            "Bearer ".to_owned() + token,
        )
        .finish()
        .unwrap()
        .send()
        .timeout(Duration::from_secs(30))
        .map_err(actix_web::error::Error::from) // <- convert SendRequestError to an Error
        .and_then(|resp| {
            resp.body().limit(67_108_864).from_err().and_then(|body| {
                let resp: serde_json::Value = serde_json::from_slice(&body).unwrap();
                fut_ok(resp)
            })
        }),
    )
}

fn main() {
    let num_workers = 8;

    let mut core = Builder::new().core_threads(num_workers).build().unwrap();

    let results = Arc::new(Mutex::new(Vec::new()));
    for n in 1..100 {
        let res = results.clone();
        core.spawn(questions_data(n).map(move |n| {
            res.lock().unwrap().push(n);
        }));
    }
    core.shutdown_on_idle().wait().unwrap();
    let data = results.lock().unwrap();
    println!("{:?}", *data);
}
[dependencies]
futures = "0.1.25"
tokio-core = "0.1.17"
futures-util = "0.2.1"
tokio = "0.1.11"
rand = "0.6.0"
actix-web = "0.7.14"
actix = "0.7.6"
env_logger = "0.6.0"
serde_json = "1.0.33"

我得到错误时cargo run

error[E0271]: type mismatch resolving `<std::boxed::Box<futures::Map<std::boxed::Box<dyn futures::Future<Item=serde_json::Value, Error=actix_web::Error>>, [closure@src/main.rs:52:51: 54:10 res:_]>> as futures::Future>::Error == ()`
  --> src/main.rs:52:14
   |
52 |         core.spawn(Box::new(questions_data(n).map(move |n| {
   |              ^^^^^ expected struct `actix_web::Error`, found ()
   |
   = note: expected type `actix_web::Error`
              found type `()`

error[E0277]: `dyn futures::Future<Item=serde_json::Value, Error=actix_web::Error>` cannot be sent between threads safely
  --> src/main.rs:52:14
   |
52 |         core.spawn(Box::new(questions_data(n).map(move |n| {
   |              ^^^^^ `dyn futures::Future<Item=serde_json::Value, Error=actix_web::Error>` cannot be sent between threads safely
   |
   = help: the trait `std::marker::Send` is not implemented for `dyn futures::Future<Item=serde_json::Value, Error=actix_web::Error>`
   = note: required because of the requirements on the impl of `std::marker::Send` for `std::ptr::Unique<dyn futures::Future<Item=serde_json::Value, Error=actix_web::Error>>`
   = note: required because it appears within the type `std::boxed::Box<dyn futures::Future<Item=serde_json::Value, Error=actix_web::Error>>`
   = note: required because it appears within the type `futures::Map<std::boxed::Box<dyn futures::Future<Item=serde_json::Value, Error=actix_web::Error>>, [closure@src/main.rs:52:51: 54:10 res:_]>`
   = note: required because of the requirements on the impl of `std::marker::Send` for `std::ptr::Unique<futures::Map<std::boxed::Box<dyn futures::Future<Item=serde_json::Value, Error=actix_web::Error>>, [closure@src/main.rs:52:51: 54:10 res:_]>>`
   = note: required because it appears within the type `std::boxed::Box<futures::Map<std::boxed::Box<dyn futures::Future<Item=serde_json::Value, Error=actix_web::Error>>, [closure@src/main.rs:52:51: 54:10 res:_]>>`

类似的代码,无需运行 actix-web 客户端,工作https://play.rust-lang.org/?version=stable&mode=debug&edition=2015&gist=e81185a73fcb40a3426875573c78accd

编辑:

也许类似map_err(|_| ())但不知道如何应用:

4

1 回答 1

0

如前所述rustc,这里有两个错误:

  1. 首先是关于错误类型不匹配。map_err正如您已经注意到的那样,它可以修复:

    questions_data(n)
        .map(move |n| {
            res.lock().unwrap().push(n);
        })
        .map_err(|e| println!("an error occurred: {}", e))
    
  2. 第二个(关于Send标记)有点令人困惑。基本上这意味着tokio::Runtime::spawn希望它的论点未来也是Send + 'static因为 tokio 可以将它移动到另一个线程。但是你的 future 不能在线程之间安全地发送,因为它dyn futures::Stream<Item=bytes::bytes::Bytes, Error=actix_web::Error> + 'static在类型中包含不可发送的类型 ( ) actix_web::client::ClientRequest

修复第二个错误的最简单方法可能是使用actix::spawn函数代替tokio::Runtime::spawn方法:它只需要'static对其参数的标记。

在这些更改之后,您的程序至少可以编译。

于 2018-11-24T10:41:27.937 回答