在我的SmtpService
情况下,我想在处理完成后立即发送响应头和正文。这应该遵循 SMTP 交换:
C: DATA
S: 354 Start mail input
C: ... data ...
C: ... more ...
C: .
S: 250 Ok
我在操场上有这么多:
#[macro_use]
extern crate log;
extern crate bytes;
extern crate tokio_proto;
extern crate tokio_service;
extern crate futures;
use std::io;
use bytes::Bytes;
use tokio_service::Service;
use tokio_proto::streaming::{Message, Body};
use futures::{future, Future, Stream};
use futures::sync::oneshot;
//use model::request::SmtpCommand;
//use model::response::SmtpReply;
#[derive(Eq, PartialEq, Debug)]
enum SmtpCommand {
Data,
}
#[derive(Eq, PartialEq, Debug)]
enum SmtpReply {
OkInfo,
StartMailInputChallenge,
TransactionFailure,
CommandNotImplementedFailure
}
pub struct SmtpService;
impl Service for SmtpService {
// For non-streaming protocols, service errors are always io::Error
type Error = io::Error;
// These types must match the corresponding protocol types:
type Request = Message<SmtpCommand, Body<Bytes, Self::Error>>;
type Response = Message<SmtpReply, Body<SmtpReply, Self::Error>>;
// The future for computing the response; box it for simplicity.
type Future = Box<Future<Item = Self::Response, Error = Self::Error>>;
// Produce a future for computing a response from a request.
fn call(&self, command: Self::Request) -> Self::Future {
info!("Received {:?}", command);
match command {
Message::WithBody(cmd, cmd_body) => {
match cmd {
SmtpCommand::Data => {
// start => SmtpReply::StartMailInputChallenge
// ok => SmtpReply::OkInfo
// err => SmtpReply::TransactionFailure
let (tx, rx) = oneshot::channel();
let fut = cmd_body
.inspect(|chunk| info!("data: {:?}", chunk))
.map(|_| tx.send(SmtpReply::OkInfo))
.map_err(|_| tx.send(SmtpReply::TransactionFailure))
.map(|_| Body::from(rx));
// ??? How to wire the fut future into the response message?
let msg = Message::WithBody(SmtpReply::StartMailInputChallenge, fut);
Box::new(future::ok(msg)) as Self::Future
}
_ => Box::new(future::ok(Message::WithoutBody(
SmtpReply::CommandNotImplementedFailure,
))),
}
}
Message::WithoutBody(cmd) => {
Box::new(future::ok(Message::WithoutBody(match cmd {
_ => SmtpReply::CommandNotImplementedFailure,
})))
}
}
}
}
fn main() {
println!("Hello, world!");
}
我想知道是否有可能,或者我是否需要生成两条消息——一条用于 DATA,另一条用于实际字节流?
我得到的错误显示消息结构不匹配;身体/未来显然不合适:
error[E0271]: type mismatch resolving `<futures::FutureResult<tokio_proto::streaming::Message<SmtpReply, futures::stream::Map<futures::stream::MapErr<futures::stream::Map<futures::stream::Inspect<tokio_proto::streaming::Body<bytes::Bytes, std::io::Error>, [closure@src/main.rs:57:42: 57:76]>, [closure@src/main.rs:58:38: 58:68 tx:futures::Sender<SmtpReply>]>, [closure@src/main.rs:59:42: 59:84 tx:futures::Sender<SmtpReply>]>, [closure@src/main.rs:60:38: 60:56 rx:futures::Receiver<SmtpReply>]>>, std::io::Error> as futures::Future>::Item == tokio_proto::streaming::Message<SmtpReply, tokio_proto::streaming::Body<SmtpReply, std::io::Error>>`
--> src/main.rs:66:25
|
66 | Box::new(future::ok(msg)) as Self::Future
| ^^^^^^^^^^^^^^^^^^^^^^^^^ expected struct `futures::stream::Map`, found struct `tokio_proto::streaming::Body`
|
= note: expected type `tokio_proto::streaming::Message<_, futures::stream::Map<futures::stream::MapErr<futures::stream::Map<futures::stream::Inspect<tokio_proto::streaming::Body<bytes::Bytes, std::io::Error>, [closure@src/main.rs:57:42: 57:76]>, [closure@src/main.rs:58:38: 58:68 tx:futures::Sender<SmtpReply>]>, [closure@src/main.rs:59:42: 59:84 tx:futures::Sender<SmtpReply>]>, [closure@src/main.rs:60:38: 60:56 rx:futures::Receiver<SmtpReply>]>>`
found type `tokio_proto::streaming::Message<_, tokio_proto::streaming::Body<SmtpReply, std::io::Error>>`
= note: required for the cast to the object type `futures::Future<Error=std::io::Error, Item=tokio_proto::streaming::Message<SmtpReply, tokio_proto::streaming::Body<SmtpReply, std::io::Error>>>`