I've got an application which pulls items off a queue in a callback. The callback is in a separate thread, so I can't do any of this directly in the callback. The items on the queue include a URL and a message body to be POSTed to the URL. Based on the response, I either remove the item from the queue or re-enqueue it.

Here's what I've got:

let (mut sender, receiver) = mpsc::channel(1000);
let mut core = Core::new().expect("Error creating core");
let client = Client::new(&core.handle());

// Consumer callback closure
let consumer = move |_: &mut Channel, deliver: basic::Deliver, headers: basic::BasicProperties, data: Vec<u8>|
{
    let body = match String::from_utf8(data) {
        Ok(body) => body,
        Err(error) => {
            error!("Error parsing message body as UTF8: {}", error);
            return;
        }
    };
    if let Err(error) = sender.try_send(QueueData {
        deliver: deliver,
        headers: headers,
        data: body,
    }) {
        error!("Error sending queue data over channel: {}", error);
    }
};

let work = receiver.for_each(|queue_data| {
    let message = match Message::parse(&queue_data.data, &queue_data.headers) {
        Ok(message) => message,
        Err(error) => return err(Box::new(error) as Box<Error>),
    };

    let uri = match Uri::from_str(&message.body.destination) {
        Ok(uri) => uri,
        Err(error) => return err(Box::new(error) as Box<Error>),
    };

    let mut request: Request = Request::new(Method::Post, uri);
    request.headers_mut().set(ContentType::json());
    request.headers_mut().set(ContentLength(queue_data.data.len() as u64));
    request.set_body(queue_data.data.clone().into_bytes());

    client.request(request)
});

This gives me the following errors:

error[E0271]: type mismatch resolving `<futures::FutureResult<(), std::boxed::Box<std::error::Error>> as futures::IntoFuture>::Error == ()`
   --> src/main.rs:116:29
    |
116 |         let work = receiver.for_each(|queue_data| {
    |                             ^^^^^^^^ expected struct `std::boxed::Box`, found ()
    |
    = note: expected type `std::boxed::Box<std::error::Error>`
               found type `()`

error[E0308]: mismatched types
   --> src/main.rs:132:13
    |
132 |             client.request(request)
    |             ^^^^^^^^^^^^^^^^^^^^^^^ expected struct `futures::FutureResult`, found struct `hyper::client::FutureResponse`
    |
    = note: expected type `futures::FutureResult<(), std::boxed::Box<std::error::Error>>`
               found type `hyper::client::FutureResponse`

which makes sense, but I can't find any sort of way to convert a FutureResponse to a FutureResult. Am I missing something here? Can I not use a Hyper client within a future like this?

1 Answer

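After a lot of digging, I found the answer.

First, I needed to use map instead of for_each, because for_each doesn't expect the closure to return a value (its future must resolve to ()). Second, I needed to wrap the return value in futures::future::ok, so my updated code looks like this: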

let work = receiver.map(|queue_data| {
    let message = match Message::parse(&queue_data.data, &queue_data.headers) {
        Ok(message) => message,
        Err(error) => return err(Box::new(error) as Box<Error>),
    };

    let uri = match Uri::from_str(&message.body.destination) {
        Ok(uri) => uri,
        Err(error) => return err(Box::new(error) as Box<Error>),
    };

    let mut request: Request = Request::new(Method::Post, uri);
    request.headers_mut().set(ContentType::json());
    request.headers_mut().set(ContentLength(queue_data.data.len() as u64));
    request.set_body(queue_data.data.clone().into_bytes());

    ok(client.request(request))
});

I have to say, though, that error message isn't the clearest...
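
As a side note, the hand-off described in the question (a callback on another thread pushes items into a bounded channel, and a different thread drains them and decides whether to remove or re-enqueue each one) can be sketched with only the standard library. This is a rough illustration, not the hyper/futures code above: QueueItem, Outcome, decide, run, and the example URL are all hypothetical names, and treating any 2xx status as success is an assumption.

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

// Hypothetical stand-in for the QueueData struct in the question.
#[derive(Debug, Clone, PartialEq)]
struct QueueItem {
    url: String,
    body: String,
}

// Hypothetical outcome of handling one item.
#[derive(Debug, PartialEq)]
enum Outcome {
    Remove,
    Requeue,
}

// Assumption: any 2xx status means the POST succeeded and the item can be
// removed; anything else means it should be re-enqueued.
fn decide(status: u16) -> Outcome {
    if (200..300).contains(&status) {
        Outcome::Remove
    } else {
        Outcome::Requeue
    }
}

// Simulates the consumer callback thread pushing items into a bounded
// channel, then drains the channel on the calling thread.
fn run(bodies: Vec<String>) -> Vec<QueueItem> {
    let (sender, receiver) = sync_channel::<QueueItem>(1000);

    let producer = thread::spawn(move || {
        for body in bodies {
            let item = QueueItem {
                url: "http://example.invalid/hook".to_string(),
                body,
            };
            // try_send never blocks; it fails if the channel is full or closed.
            if let Err(error) = sender.try_send(item) {
                eprintln!("Error sending queue data over channel: {}", error);
            }
        }
        // The sender is dropped here, which ends the receiving iterator below.
    });

    let received: Vec<QueueItem> = receiver.iter().collect();
    producer.join().expect("producer thread panicked");
    received
}

fn main() {
    let items = run(vec!["{\"a\":1}".to_string(), "{\"b\":2}".to_string()]);
    for item in &items {
        println!("would POST {} bytes to {}", item.body.len(), item.url);
    }
    println!("{:?} {:?}", decide(204), decide(503));
}
```

In the real application the receiving side would be the futures stream from the question rather than a blocking iterator, and decide would be fed the status of the actual response.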

Answered 2018-03-05T02:48:36.733