I've got an application which pulls items off a queue in a callback. The callback runs in a separate thread, so I can't make the HTTP request directly in the callback; instead, the callback sends each item over a channel and a future on the receiving end does the work. Each item on the queue includes a URL and a message body to be POSTed to that URL. Based on the response, I either remove the item from the queue or re-enqueue it.
Here's what I've got:
    let (mut sender, receiver) = mpsc::channel(1000);
    let mut core = Core::new().expect("Error creating core");
    let client = Client::new(&core.handle());

    // Consumer callback closure
    let consumer = move |_: &mut Channel, deliver: basic::Deliver, headers: basic::BasicProperties, data: Vec<u8>|
    {
        let body = match String::from_utf8(data) {
            Ok(body) => body,
            Err(error) => {
                error!("Error parsing message body as UTF8: {}", error);
                return;
            }
        };
        if let Err(error) = sender.try_send(QueueData {
            deliver: deliver,
            headers: headers,
            data: body,
        }) {
            error!("Error sending queue data over channel: {}", error);
        }
    };

    let work = receiver.for_each(|queue_data| {
        let message = match Message::parse(&queue_data.data, &queue_data.headers) {
            Ok(message) => message,
            Err(error) => return err(Box::new(error) as Box<Error>),
        };
        let uri = match Uri::from_str(&message.body.destination) {
            Ok(uri) => uri,
            Err(error) => return err(Box::new(error) as Box<Error>),
        };
        let mut request: Request = Request::new(Method::Post, uri);
        request.headers_mut().set(ContentType::json());
        request.headers_mut().set(ContentLength(queue_data.data.len() as u64));
        request.set_body(queue_data.data.clone().into_bytes());
        client.request(request)
    });

which gives me the following error:

    error[E0271]: type mismatch resolving `<futures::FutureResult<(), std::boxed::Box<std::error::Error>> as futures::IntoFuture>::Error == ()`
       --> src/main.rs:116:29
        |
    116 |         let work = receiver.for_each(|queue_data| {
        |                             ^^^^^^^^ expected struct `std::boxed::Box`, found ()
        |
        = note: expected type `std::boxed::Box<std::error::Error>`
                   found type `()`

    error[E0308]: mismatched types
       --> src/main.rs:132:13
        |
    132 |             client.request(request)
        |             ^^^^^^^^^^^^^^^^^^^^^^^ expected struct `futures::FutureResult`, found struct `hyper::client::FutureResponse`
        |
        = note: expected type `futures::FutureResult<(), std::boxed::Box<std::error::Error>>`
                   found type `hyper::client::FutureResponse`

which makes sense, but I can't find any sort of way to convert a FutureResponse to a FutureResult. Am I missing something here? Can I not use a Hyper client within a future like this?
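
To check my reading of the two errors, here is a std-only sketch of what I think the compiler is asking for. The `Work` trait, the `Ready` and `Response` types and the `handle` function are hypothetical stand-ins I made up for illustration; they are not futures or hyper APIs. It shows the two constraints as I understand them: every exit from the closure has to produce the same type (here a boxed trait object), and the error has to end up as `()`, which is what the first error says the receiver expects.

```rust
// Hypothetical stand-in for a future: something that eventually yields
// Ok(()) or Err(()). Not part of futures or hyper.
trait Work {
    fn finish(self: Box<Self>) -> Result<(), ()>;
}

// Plays the role of FutureResult: a result that is available immediately.
struct Ready(Result<(), String>);

// Plays the role of FutureResponse: the outcome of a pretend HTTP request.
struct Response {
    status: u16,
}

impl Work for Ready {
    fn finish(self: Box<Self>) -> Result<(), ()> {
        let Ready(result) = *self;
        // Log the error and discard it so the error type becomes ().
        result.map_err(|error| eprintln!("error: {}", error))
    }
}

impl Work for Response {
    fn finish(self: Box<Self>) -> Result<(), ()> {
        // Treat any status below 400 as success; this threshold is an assumption.
        if self.status < 400 { Ok(()) } else { Err(()) }
    }
}

// Both the early return and the final expression produce Box<dyn Work>,
// even though the concrete types behind the box differ.
fn handle(destination: &str) -> Box<dyn Work> {
    if !destination.starts_with("http") {
        return Box::new(Ready(Err(format!("bad URI: {}", destination))));
    }
    // No real request is made; the 200 status is hard-coded for the sketch.
    Box::new(Response { status: 200 })
}
```

If this reading is right, the analogue in my real code would presumably be to box every branch as the same future trait object and to use `map` and `map_err` on the hyper response so that its item and error types both become `()`, but I have not verified that against the closure above.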