1

在我的 `SmtpService` 中,我想立即发送响应头,并在处理完成后再发送响应正文。这应该遵循如下 SMTP 交互:

C: DATA
S: 354 Start mail input
C: ... data ...
C: ... more ...
C: .
S: 250 Ok

我在 Playground 上写了下面这些代码:

#[macro_use] 
extern crate log;
extern crate bytes;
extern crate tokio_proto;
extern crate tokio_service;
extern crate futures;

use std::io;
use bytes::Bytes;
use tokio_service::Service;
use tokio_proto::streaming::{Message, Body};
use futures::{future, Future, Stream};
use futures::sync::oneshot;
//use model::request::SmtpCommand;
//use model::response::SmtpReply;

/// Commands understood by the SMTP service in this example.
#[derive(Debug, PartialEq, Eq)]
enum SmtpCommand {
    /// The `DATA` command, whose request body carries the mail content.
    Data,
}
/// Replies the SMTP service can send back to the client.
#[derive(Debug, PartialEq, Eq)]
enum SmtpReply {
    /// Positive completion ("ok") reply.
    OkInfo,
    /// Intermediate reply inviting the client to start sending mail input.
    StartMailInputChallenge,
    /// Reply for a transaction that ended in an error.
    TransactionFailure,
    /// Reply for any command the service does not handle.
    CommandNotImplementedFailure,
}

/// Stateless SMTP service type; it has no fields.
pub struct SmtpService;

// First attempt at a streaming SMTP service.
//
// NOTE: this version does not compile. The `DATA` branch puts a chain of
// stream adapters where `Self::Response` requires a
// `Body<SmtpReply, io::Error>`, which the compiler rejects with E0271.
impl Service for SmtpService {
    // For non-streaming protocols, service errors are always io::Error
    type Error = io::Error;
    // These types must match the corresponding protocol types:
    // the request carries a command plus a streamed body of `Bytes` chunks,
    // the response carries a reply plus a streamed body of further replies.
    type Request = Message<SmtpCommand, Body<Bytes, Self::Error>>;
    type Response = Message<SmtpReply, Body<SmtpReply, Self::Error>>;

    // The future for computing the response; box it for simplicity.
    type Future = Box<Future<Item = Self::Response, Error = Self::Error>>;

    // Produce a future for computing a response from a request.
    fn call(&self, command: Self::Request) -> Self::Future {

        info!("Received {:?}", command);

        match command {
            // A command that arrives together with a streamed body.
            Message::WithBody(cmd, cmd_body) => {
                match cmd {
                    SmtpCommand::Data => {
                        // start => SmtpReply::StartMailInputChallenge
                        // ok => SmtpReply::OkInfo
                        // err => SmtpReply::TransactionFailure

                        // Intended to carry the final reply once the body has been read.
                        let (tx, rx) = oneshot::channel();

                        // Despite its name, `fut` is a stream (Inspect/Map/MapErr/Map
                        // adapters layered over `cmd_body`), not a future and not a `Body`.
                        // NOTE(review): `tx` is captured by two closures here; if
                        // `send` consumes the sender this cannot work either — confirm.
                        let fut = cmd_body
                                .inspect(|chunk| info!("data: {:?}", chunk))
                                .map(|_| tx.send(SmtpReply::OkInfo))
                                .map_err(|_| tx.send(SmtpReply::TransactionFailure))
                                .map(|_| Body::from(rx));

                        // ??? How to wire the fut future into the response message?

                        // This is the source of the type mismatch: the second field must
                        // be a `Body<SmtpReply, io::Error>`, but `fut` is the adapter chain.
                        let msg = Message::WithBody(SmtpReply::StartMailInputChallenge, fut);

                        Box::new(future::ok(msg)) as Self::Future
                    }
                    // Any other command with a body is answered as "not implemented".
                    _ => Box::new(future::ok(Message::WithoutBody(
                        SmtpReply::CommandNotImplementedFailure,
                    ))),
                }
            }
            // Commands without a body are all answered as "not implemented".
            Message::WithoutBody(cmd) => {
                Box::new(future::ok(Message::WithoutBody(match cmd {
                    _ => SmtpReply::CommandNotImplementedFailure,
                })))
            }
        }
    }
}

/// Placeholder entry point: prints a greeting and does not start the service.
fn main() {
    let greeting = "Hello, world!";
    println!("{}", greeting);
}

我想知道这样做是否可行,还是说我需要生成两条消息——一条用于 DATA 命令,另一条用于实际的字节流?

我得到的错误显示消息结构不匹配;body/future 的类型显然对不上:

error[E0271]: type mismatch resolving `<futures::FutureResult<tokio_proto::streaming::Message<SmtpReply, futures::stream::Map<futures::stream::MapErr<futures::stream::Map<futures::stream::Inspect<tokio_proto::streaming::Body<bytes::Bytes, std::io::Error>, [closure@src/main.rs:57:42: 57:76]>, [closure@src/main.rs:58:38: 58:68 tx:futures::Sender<SmtpReply>]>, [closure@src/main.rs:59:42: 59:84 tx:futures::Sender<SmtpReply>]>, [closure@src/main.rs:60:38: 60:56 rx:futures::Receiver<SmtpReply>]>>, std::io::Error> as futures::Future>::Item == tokio_proto::streaming::Message<SmtpReply, tokio_proto::streaming::Body<SmtpReply, std::io::Error>>`
  --> src/main.rs:66:25
   |
66 |                         Box::new(future::ok(msg)) as Self::Future
   |                         ^^^^^^^^^^^^^^^^^^^^^^^^^ expected struct `futures::stream::Map`, found struct `tokio_proto::streaming::Body`
   |
   = note: expected type `tokio_proto::streaming::Message<_, futures::stream::Map<futures::stream::MapErr<futures::stream::Map<futures::stream::Inspect<tokio_proto::streaming::Body<bytes::Bytes, std::io::Error>, [closure@src/main.rs:57:42: 57:76]>, [closure@src/main.rs:58:38: 58:68 tx:futures::Sender<SmtpReply>]>, [closure@src/main.rs:59:42: 59:84 tx:futures::Sender<SmtpReply>]>, [closure@src/main.rs:60:38: 60:56 rx:futures::Receiver<SmtpReply>]>>`
              found type `tokio_proto::streaming::Message<_, tokio_proto::streaming::Body<SmtpReply, std::io::Error>>`
   = note: required for the cast to the object type `futures::Future<Error=std::io::Error, Item=tokio_proto::streaming::Message<SmtpReply, tokio_proto::streaming::Body<SmtpReply, std::io::Error>>>`
4

1 回答 1

1

`call` 返回的 future 在产生 `Response` 时即告完成;你无法在该 future 里继续“驱动”后续的操作。

这意味着你需要派生(spawn)一个新任务来产生(流式传输的)响应正文;为此你需要一个来自 `tokio_core` 的 `Handle`。

`Body` 需要由 `mpsc` 通道创建,而不是 `oneshot` 通道;这样你就可以发送多个正文块。

Playground

#[macro_use] 
extern crate log;
extern crate bytes;
extern crate tokio_core;
extern crate tokio_proto;
extern crate tokio_service;
extern crate futures;

use std::io;
use bytes::Bytes;
use tokio_service::Service;
use tokio_proto::streaming::{Message, Body};
use futures::{future, Future, Stream, Sink};
use futures::sync::mpsc;
//use model::request::SmtpCommand;
//use model::response::SmtpReply;

/// Commands understood by the SMTP service in this example.
#[derive(Debug, PartialEq, Eq)]
pub enum SmtpCommand {
    /// The `DATA` command, whose request body carries the mail content.
    Data,
}
/// Replies the SMTP service can send back to the client.
#[derive(Debug, PartialEq, Eq)]
pub enum SmtpReply {
    /// Sent as the response body once the whole mail body was read successfully.
    OkInfo,
    /// Sent immediately as the response head to a `DATA` command.
    StartMailInputChallenge,
    /// Sent as the response body when reading the mail body failed.
    TransactionFailure,
    /// Sent for any command the service does not handle.
    CommandNotImplementedFailure,
}

/// SMTP service that owns a reactor handle.
pub struct SmtpService {
    // Reactor handle; `call` uses it to spawn the task that reads the request
    // body and then sends the final reply.
    handle: tokio_core::reactor::Handle,
}

impl Service for SmtpService {
    // For non-streaming protocols, service errors are always io::Error
    type Error = io::Error;
    // These types must match the corresponding protocol types:
    type Request = Message<SmtpCommand, Body<Bytes, Self::Error>>;
    type Response = Message<SmtpReply, Body<SmtpReply, Self::Error>>;

    // The future for computing the response; box it for simplicity.
    type Future = Box<Future<Item = Self::Response, Error = Self::Error>>;

    // Produce a future for computing a response from a request.
    fn call(&self, command: Self::Request) -> Self::Future {

        info!("Received {:?}", command);

        match command {
            Message::WithBody(cmd, cmd_body) => {
                match cmd {
                    SmtpCommand::Data => {
                        // start => SmtpReply::StartMailInputChallenge
                        // ok => SmtpReply::OkInfo
                        // err => SmtpReply::TransactionFailure

                        let (tx, rx) = mpsc::channel::<io::Result<SmtpReply>>(1);

                        let fut = cmd_body
                            // read cmd stream; for_each results in a Future,
                            // which completes when the stream is finished
                            .for_each(|chunk| {
                                info!("data: {:?}", chunk);
                                Ok(())
                            })
                            // now send the result body
                            .then(move |r| match r {
                                Ok(_) => tx.send(Ok(SmtpReply::OkInfo)),
                                Err(_) => tx.send(Ok(SmtpReply::TransactionFailure)),
                            })
                            // could send further body messages:
                            // .and_then(|tx| tx.send(...))
                            // ignore any send errors; spawn needs a future with
                            // Item=() and Error=().
                            .then(|_| Ok(()))
                        ;

                        self.handle.spawn(fut);

                        let body : Body<SmtpReply, Self::Error> = Body::from(rx);
                        let msg : Self::Response = Message::WithBody(SmtpReply::StartMailInputChallenge, body);

                        Box::new(future::ok(msg)) as Self::Future
                    }
                    _ => Box::new(future::ok(Message::WithoutBody(
                        SmtpReply::CommandNotImplementedFailure,
                    ))),
                }
            }
            Message::WithoutBody(cmd) => {
                Box::new(future::ok(Message::WithoutBody(match cmd {
                    _ => SmtpReply::CommandNotImplementedFailure,
                })))
            }
        }
    }
}

/// Placeholder entry point: prints a greeting; the service is not constructed here.
fn main() {
    let greeting = "Hello, world!";
    println!("{}", greeting);
}
于 2017-10-25T10:09:31.807 回答