1

我正在尝试构建一个定义通用数据源的库,该数据源可以同步和异步地从各种源中提取数据。在构建异步片段时,我遇到了以下编译问题,我不知道如何解决:

这是我的简化代码(游乐场链接

extern crate futures; // futures = "0.1.24"
extern crate tokio; // tokio = "0.1.8"
extern crate serde_json;

use futures::Future;
use serde_json::Value;

use std::collections::HashMap;

trait DataSource {
    type Data;

    fn read_async(&self, Option<HashMap<String, Value>>) -> Box<futures::Future<Item=Option<Self::Data>, Error=String>> 
        where Self::Data: 'static + Send;
}

struct DataSourceImpl;
impl DataSource for DataSourceImpl {
    type Data = Vec<String>;

    fn read_async(&self, _params: Option<HashMap<String, Value>>) -> Box<futures::Future<Item=Option<Self::Data>, Error=String>> 
        where Self::Data: 'static + Send 
    {
        Box::new(futures::future::ok(Some(vec!["some data".to_string()])))
    }

}

fn main() {
    let datasource = DataSourceImpl{};

    let params = HashMap::new();
    tokio::run(datasource.read_async(Some(params))
        .map(|content| {
            println!("Content read = {:?}", &content);
            ()
        })
        .map_err(|err| {
            println!("Error {}", &err);
            ()
        })
    );
}

我收到以下编译错误:

error[E0277]: `dyn futures::Future<Item=std::option::Option<std::vec::Vec<std::string::String>>, Error=std::string::String>` cannot be sent between threads safely
  --> src/main.rs:45:13
   |
45 |     runtime.spawn(future);
   |             ^^^^^ `dyn futures::Future<Item=std::option::Option<std::vec::Vec<std::string::String>>, Error=std::string::String>` cannot be sent between threads safely
   |
   = help: the trait `std::marker::Send` is not implemented for `dyn futures::Future<Item=std::option::Option<std::vec::Vec<std::string::String>>, Error=std::string::String>`
   = note: required because of the requirements on the impl of `std::marker::Send` for `std::ptr::Unique<dyn futures::Future<Item=std::option::Option<std::vec::Vec<std::string::String>>, Error=std::string::String>>`
   = note: required because it appears within the type `std::boxed::Box<dyn futures::Future<Item=std::option::Option<std::vec::Vec<std::string::String>>, Error=std::string::String>>`
   = note: required because it appears within the type `futures::Map<std::boxed::Box<dyn futures::Future<Item=std::option::Option<std::vec::Vec<std::string::String>>, Error=std::string::String>>, [closure@src/main.rs:34:14: 37:10]>`
   = note: required because it appears within the type `futures::MapErr<futures::Map<std::boxed::Box<dyn futures::Future<Item=std::option::Option<std::vec::Vec<std::string::String>>, Error=std::string::String>>, [closure@src/main.rs:34:14: 37:10]>, [closure@src/main.rs:38:18: 41:10]>`

然而,在查看标准库时,我发现了以下实现:

  • impl<T: ?Sized> Send for Box<T> where T: Send
  • impl<T> Send for Option<T> where T: Send
  • impl<T> Send for Vec<T> where T: Send
  • impl Send for String
  • impl Send for [failure::]Error

我错过了什么?

如果我摆脱这个特征并替换它Box<Future<...>>impl Future<...>那么它就可以工作(新代码的游乐场链接);但我不明白特征和Box实现有什么问题......

extern crate failure;
extern crate futures; // futures = "0.1.24"
extern crate tokio; // tokio = "0.1.8"
extern crate serde_json;

use futures::Future;
use serde_json::Value;

use std::collections::HashMap;

fn read_async(_params: Option<HashMap<String, Value>>) -> impl futures::Future<Item=Option<Vec<String>>, Error=failure::Error> {
    futures::future::ok(Some(vec!["some data".to_string()]))
}

fn main() {
    let params = HashMap::new();
    let future = read_async(Some(params))
        .map(|content| {
            println!("Content read = {:?}", &content);
            ()
        })
        .map_err(|err| {
            println!("Error {}", &err);
            ()
        });

    tokio::run(future);
}
4

1 回答 1

0

看起来我只需要将函数签名更改为

fn read_async(
    &self,
    _: Option<HashMap<String, Value>>,
) -> Box<Future<Item = Option<Self::Data>, Error = String> + Send> {
//                                                        ^^^^^^^

确实,Box<T>应该是Sendwhen Tis Send,但我必须把它拼出来,因为Future它不是手动/自动派生的。

感谢Tobz指出这一点。

于 2018-09-27T15:42:56.433 回答