Update: this seems to be more a question of correctly generating and trusting a self-signed certificate.

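I'm building a server and a client with tokio-rs. Everything was working, but I'm now trying to add SSL/TLS to the system. As far as I can tell I've generated a self-signed certificate and installed it correctly, but every time the client tries to connect to the server I get the following error: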

thread 'main' panicked at 'called `Result::unwrap()` on an `Err` value: Failure(Ssl(ErrorStack([Error { code: 336134278, library: "SSL routines", function: "ssl3_get_server_certificate", reason: "certificate verify failed", file: "s3_clnt.c", line: 1264 }])))', /buildslave/rust-buildbot/slave/stable-dist-rustc-linux/build/src/libcore/result.rs:837

I'm using the following script to generate the crt, key and pfx files:

openssl req -nodes -x509 -newkey rsa:2048 -config ssl.conf -extensions ext -subj /C=CA/ST=EH/L=Canadia/O=Dis/CN=localhost -keyout localhost.key -out localhost.crt -days 365
openssl pkcs12 -export -nodes -inkey localhost.key -in localhost.crt -out localhost.pfx
mv localhost.pfx ../

With this conf file:

[req]
distinguished_name=dn
[ dn ]
CN=localhost
[ ext ]
basicConstraints=CA:FALSE,pathlen:0
subjectAltName = @alt_names
[alt_names]
DNS.1 = localhost

The pfx file is moved up one level and then copied into the debug folder at build time.

I'm running Ubuntu and have copied localhost.crt to /etc/ssl/certs and localhost.key to /etc/ssl/private.

My server code uses tokio-proto together with the tokio-tls server protocol wrapper for tokio-proto.

Protocol:

use tokio_proto::pipeline::ServerProto;
use tokio_io::codec::{Framed};
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_tls::{TlsAcceptorExt};
use rem::codec::CacheCodec;
use std::io;

pub struct CacheProto {}

impl<T: AsyncRead + AsyncWrite + 'static> ServerProto<T> for CacheProto {
    /// For this protocol style, `Request` matches the codec `In` type
    type Request = String;

    /// For this protocol style, `Response` matches the codec `Out` type
    type Response = String;

    /// A bit of boilerplate to hook in the codec:
    type Transport = Framed<T, CacheCodec>;
    type BindTransport = Result<Self::Transport, io::Error>;
    fn bind_transport(&self, io: T) -> Self::BindTransport {
        Ok(io.framed(CacheCodec{}))
    }
}

Server:

use std::string::String;
use std::sync::{Arc, Mutex};

use rem::cache::Cache;
use rem::service::CacheService;
use rem::proto::CacheProto;

use futures_cpupool::CpuPool;

use tokio_tls::{TlsAcceptorExt};
use native_tls::{Pkcs12, TlsAcceptor};

use tokio_tls::proto::Server;
use tokio_proto::TcpServer;

use std::fs::File;
use std::io::{Read};

pub fn launch(ip: String, port: String) {
    // Specify the localhost address
    let addr = format!("{}:{}", ip, port).parse().unwrap();

    let pkcs12 = get_pkcs12();
    let acceptor = TlsAcceptor::builder(pkcs12).unwrap().build().unwrap();

    let proto = Server::new(CacheProto{}, acceptor);

    // The builder requires a protocol and an address
    let server = TcpServer::new(proto, addr);

    let pool = Box::new(CpuPool::new_num_cpus());

    // We provide a way to *instantiate* the service for each new
    // connection; here, we just immediately return a new instance.
    let cache = Arc::new(Mutex::new(Cache::new()));
    let cache_service = CacheService {
        cache: cache.clone(),
        pool : pool
    };

    server.serve( move || Ok(cache_service.clone()));
}

fn get_pkcs12() -> Pkcs12{
    let mut file = File::open("localhost.pfx").unwrap();
    let mut pkcs12 = vec![];
    file.read_to_end(&mut pkcs12).unwrap();
    let pkcs12 = Pkcs12::from_der(&pkcs12, "password").unwrap();
    return pkcs12;
}

Client (does not use tokio-proto, only native-tls and TCP):

use std::io::prelude::*;
use std::io;
use std::string::String;
use std::vec::Vec;
use std::net::{TcpStream};
use std::mem;

use native_tls::{TlsConnector, TlsStream};

use rem::op;
use rem::error::*;

pub fn launch(ip: String, port: String) {
    info!("Connection to {}:{}", ip, port);

    match TcpStream::connect(format!("{}:{}", ip, port).as_str()) {
        Ok(mut tcp_stream) => {
            let connector = TlsConnector::builder().unwrap().build().unwrap();
            let mut stream = connector.connect(ip.as_str(), tcp_stream).unwrap();
            loop {
                // Continue looping, executing any commands from the user
                let handle = io::stdin();
                for line_res in handle.lock().lines() {
                    let line: String = line_res.unwrap();
                    let args:Vec<String> = parse_input(line);
                    if args.len() > 0{
                        let arg_ref = args[0].as_ref();
                        match arg_ref {
                            "write" => {
                                if args.len() == 3 {
                                    match client_exec_write(&args[1], &args[2], &mut stream){
                                        Ok(_) => (),
                                        Err(why) => why.log()
                                    }
                                }else{
                                    error!("Write expects two arguments - key and value");
                                }
                            },
                            "read" => {
                                if args.len() == 2 {
                                    match client_exec_read(&args[1], &mut stream){
                                        Ok(_) => (),
                                        Err(why) => why.log()
                                    }
                                }else{
                                    error!("Read expects one argument - key");
                                }
                            },
                            "delete" => {
                                if args.len() == 2 {
                                    match client_exec_delete(&args[1], &mut stream){
                                        Ok(_) => (),
                                        Err(why) => why.log()
                                    }
                                }else{
                                    error!("Delete expects one argument - key");
                                }
                            }
                            _ => error!("Not a valid command")
                        }
                    }
                }
            }
        }
        Err(e) => {
            panic!("Failed to connect to server. Error '{}'", e);
        }
    }
}


/// Executes a write operation by parsing the client command and converting it to REM format
/// ex: write abc def would be converted to 9|W$abc:def and sent to the REM server
fn client_exec_write(key:&String, val:&String, mut stream: &mut TlsStream<TcpStream>)-> Result<(), RemError> {
    let sized_val = String::from(format!("W${}:{}", key, val));
    let res = op::write_str_to_stream_with_size(&mut stream, sized_val);
    try!(print_response(&mut stream));
    return res;
}

/// Executes a read operation by parsing the client command and converting it to REM format
/// ex: read abc would be converted to 5|R$abc and sent to the REM server
/// The response from the REM server is written to stdout
/// If stdout::flush fails a warning will be logged
fn client_exec_read(key: &String, mut stream: &mut TlsStream<TcpStream>)-> Result<(), RemError>{
    let cmd_val = String::from(format!("R${}", key));
    try!(op::write_str_to_stream_with_size(&mut stream, cmd_val));
    try!(print_response(&mut stream));
    return Ok(());
}

/// Executes a delete operation by parsing the client command and converting it to REM format
/// ex: delete abc would be converted to 5|D$abc and sent to the REM server
fn client_exec_delete(key: &String, mut stream: &mut TlsStream<TcpStream>) -> Result<(), RemError>{
    let cmd_val = String::from(format!("D${}", key));
    let res = op::write_str_to_stream_with_size(&mut stream, cmd_val);
    try!(print_response(&mut stream));
    return res;
}

fn print_response(mut stream: &mut TlsStream<TcpStream>) -> Result<(), RemError>{
    let val: String = try!(op::string_from_stream(&mut stream));
    println!("{}", val);
    try!(io::stdout().flush());
    return Ok(());
}

struct InputParser{
    args:Vec<String>,
    current:String,
    consumed_double_quote:bool,
    consumed_single_quote:bool
}

impl InputParser{

    /// Consumes a space character taking quotes into consideration
    /// If the parser has consumed an opening quote then the space will be consumed as a character
    pub fn consume_space(&mut self){
        // If neither a single quote nor a double quote has been consumed then it's a new argument
        if !self.consumed_double_quote && !self.consumed_single_quote {
            self.push_current();
        }else{
            self.current.push(' ');
        }
    }

    /// Consumes a double quote, keeping track of whether it is an opening or closing quote
    /// Takes single quotes into account when determining if the double quote is a delimiter or character
    pub fn consume_double_quote(&mut self){
        // If a single quote hasn't been consumed we're at the end or 
        // beginning of an argument in double quotes
        if  !self.consumed_single_quote {
            if self.consumed_double_quote{
                self.push_current();
            }
            // Flip the value so we know the state for the next double quote that is consumed
            self.consumed_double_quote = !self.consumed_double_quote;
        }else{
            // If we're in single quotes just treat the double quote as a regular character
            self.current.push('"');
        }
    }

    /// Consumes a single quote, keeping track of whether it is an opening or closing quote
    /// Takes double quotes into account when determining if the single quote is a delimiter or character
    pub fn consume_single_quote(&mut self){
        // If a double quote hasn't been consumed we're at the end or
        // beginning of an argument in single quotes
        if !self.consumed_double_quote {
            if self.consumed_single_quote{
                self.push_current();
            }
            // Flip the value so we know the state for the next single quote that is consumed
            self.consumed_single_quote = !self.consumed_single_quote;
        }else{
            // If we're in double quotes just treat the single quote as a regular character 
            self.current.push('\'');
        }
    }

    /// Adds the character onto the current argument
    pub fn consume_char(&mut self, c:char){
        self.current.push(c);
    }

    /// To be called when everything has been parsed
    pub fn end(&mut self){
        self.push_current();
    }

    /// Pushes the current string into the list of args
    /// If the length of current is 0 no actions are performed
    pub fn push_current(&mut self){
        if self.current.len() > 0 {
            let arg = mem::replace(&mut self.current, String::new());
            self.args.push(arg);
        }
    }

}

/// Parses the arguments out of an input string taking quotes and spaces into consideration
pub fn parse_input(input: String) -> Vec<String>{
    let mut parser = InputParser{
        args:Vec::new(),
        current:String::new(),
        consumed_double_quote:false,
        consumed_single_quote:false
    };    
    for c in input.chars(){
        match c {
            '"'  => parser.consume_double_quote(),
            ' '  => parser.consume_space(),
            '\'' => parser.consume_single_quote(), 
            _    => parser.consume_char(c)
        }
    }
    parser.end();

    return parser.args;
}

Operations (op):

use std::io::prelude::*;
use std::string::String;
use std::vec::Vec;
use std::net::{TcpStream};
use std::sync::{Mutex};

use native_tls::{TlsConnector, TlsStream};

use rem::cache::Cache;
use rem::error::*;


pub fn read_value_from_cache(key: String,
                         cache_mtx: &Mutex<Cache>)
                         -> Result<(Vec<u8>), RemError> {
    let cache = cache_mtx.lock().unwrap();
    let cache_opt: Option<Box<Vec<u8>>> = try!(cache.read_item(key));
    match cache_opt {
        Some(boxed_val) => {
            let val: Vec<u8> = *boxed_val;
            return Ok(val.clone());
        }
        None => {
            return Err(RemError::with_reason(String::from(REM_00005)));
        }
    }
}

/// Parses a TCP input stream and extracts the data
/// Allocates a 64 byte buffer which is used to read the input info from the stream
/// The expected format is ```{size}|{content}```
/// Ex. ```5|W$a:b```
pub fn string_from_stream(stream: &mut TlsStream<TcpStream>) -> Result<String, RemError> {
    // Read in the first 64 bytes of the stream
    //try!(stream.set_nodelay(true));
    let mut buf_arr: [u8; 64] = [0; 64];
    try!(stream.read(&mut buf_arr));
    // Parse the message size
    let mut size_str = String::new();
    let mut buf_size: usize = 0;
    for i in 0..64 {
        buf_size += 1;
        if buf_arr[i] == '|' as u8 {
            break;
        }
        size_str.push(buf_arr[i as usize] as char);
    }

    // Convert the size string to a usize so it can be used to drain the buffer
    let upper_idx: usize = try!(size_str.parse::<i32>()) as usize;
    let mut buf_temp: Vec<u8> = buf_arr.to_vec();
    // Create a new buffer using the parsed indices
    let buf: Vec<u8> = buf_temp.drain(buf_size..upper_idx + buf_size).collect();

    stream.flush().unwrap();

    // Return the value as a string
    let buf_str: String = String::from_utf8(buf).unwrap();
    return Ok(buf_str);
}

pub fn write_stream_str_to_cache(stream_str: String,
                             cache_mtx: &Mutex<Cache>)
                             -> Result<(), RemError> {
    let mut key: String = String::new();
    let mut val: String = String::new();
    let mut idx = 0;
    let chars_iter = stream_str.chars();
    for c in chars_iter {
        idx += 1;
        if c == ':' {
            val = String::from(&stream_str[idx..]);
            break;
        }
        key.push(c);
    }
    let bytes = val.into_bytes();
    let mut cache = cache_mtx.lock().unwrap();
    return cache.cache_item(key.as_str(), bytes);
}

pub fn delete_value_from_cache(key: String, cache_mtx: &Mutex<Cache>) -> Result<(), RemError> {
    let mut cache = cache_mtx.lock().unwrap();
    return cache.delete_item(key);
}

pub fn write_str_to_stream_with_size(stream: &mut TlsStream<TcpStream>, value: String) -> Result<(), RemError> {
    let sized_val = String::from(format!("{}|{}", value.len(), value));
    try!(stream.write(String::from(sized_val).as_bytes()));
    try!(stream.flush());
    return Ok(());
}
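
The `{size}|{content}` framing used by `string_from_stream` and `write_str_to_stream_with_size` can be sketched independently of TLS. The version below is only an illustration under a few assumptions: the helper names `write_frame` and `read_frame` are hypothetical, it is generic over any `Read`/`Write` instead of `TlsStream<TcpStream>`, and it reads the size prefix byte by byte and then uses `read_exact`, so it does not depend on the whole message arriving in a single 64-byte read.

```rust
use std::io::{self, Read, Write};

/// Writes `value` as `{size}|{content}`, where size is the byte length of the content.
/// (Hypothetical helper, not part of the original project.)
fn write_frame<W: Write>(out: &mut W, value: &str) -> io::Result<()> {
    // write_all keeps writing until the whole frame is sent; plain `write` may stop early.
    out.write_all(format!("{}|{}", value.len(), value).as_bytes())?;
    out.flush()
}

/// Reads one `{size}|{content}` frame and returns the content as a String.
/// (Hypothetical helper, not part of the original project.)
fn read_frame<R: Read>(input: &mut R) -> io::Result<String> {
    // Collect decimal digits up to the '|' separator.
    let mut size_digits = String::new();
    let mut byte = [0u8; 1];
    loop {
        input.read_exact(&mut byte)?;
        match byte[0] {
            b'|' => break,
            d @ b'0'..=b'9' => size_digits.push(d as char),
            other => {
                return Err(io::Error::new(
                    io::ErrorKind::InvalidData,
                    format!("unexpected byte {} in size prefix", other),
                ))
            }
        }
    }
    let size: usize = size_digits
        .parse()
        .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;

    // Read exactly `size` bytes of content, however many reads that takes.
    let mut content = vec![0u8; size];
    input.read_exact(&mut content)?;
    String::from_utf8(content).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
}
```

Because both helpers only require `Write` or `Read`, they could be called with a `&mut TlsStream<TcpStream>` as well as with an in-memory `Vec<u8>` or `Cursor` when testing.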

There are more files in the project, but I don't think they are relevant; I can add them if anything is unclear.

1 Answer

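This is more of an OpenSSL issue than a Rust one. Assuming the client is also running Ubuntu, copy the *.crt file to its /usr/local/share/ca-certificates/ directory and then run sudo /usr/sbin/update-ca-certificates. This tells the client to trust your certificate, and OpenSSL will no longer report an error about it.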
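
If you would rather script those two steps from Rust than run them by hand, a rough sketch could look like the following. The function names `install_cert` and `refresh_trust_store` are made up for illustration, the process needs root privileges to write to the system directory, and the `.crt` extension is forced because `update-ca-certificates` only picks up files ending in `.crt`.

```rust
use std::fs;
use std::io;
use std::path::{Path, PathBuf};
use std::process::Command;

/// Copies `cert` into `ca_dir` with a `.crt` extension and returns the destination.
/// (Hypothetical helper; on Ubuntu `ca_dir` would be /usr/local/share/ca-certificates.)
fn install_cert(cert: &Path, ca_dir: &Path) -> io::Result<PathBuf> {
    let file_name = cert.file_name().ok_or_else(|| {
        io::Error::new(io::ErrorKind::InvalidInput, "certificate path has no file name")
    })?;
    // Keep the original base name but make sure the extension is `.crt`.
    let dest = ca_dir.join(Path::new(file_name).with_extension("crt"));
    fs::copy(cert, &dest)?;
    Ok(dest)
}

/// Runs update-ca-certificates and reports whether it exited successfully.
/// (Hypothetical helper; must be run as root.)
fn refresh_trust_store() -> io::Result<bool> {
    let status = Command::new("/usr/sbin/update-ca-certificates").status()?;
    Ok(status.success())
}
```

On the client machine this would be used as `install_cert(Path::new("localhost.crt"), Path::new("/usr/local/share/ca-certificates"))` followed by `refresh_trust_store()`.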

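(Edit:) You also have a problem in that the certificate's Common Name (CN) does not match the name the client uses to reach the server. Your ssl.conf gives it a CN of localhost, but the client is asking for it by IP. Obviously it won't work for a client on another machine to request https://localhost/ either, since that name resolves to the client itself. So you should regenerate the certificate with a different DNS name, have the client trust the new certificate, and then make the request using that name. If necessary, you can use a name like rustsslserver and put it in the client's /etc/hosts.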
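
To make the mismatch concrete, here is a deliberately simplified sketch of the name check a TLS client performs. It is not the actual OpenSSL or native-tls code: `name_matches` is a hypothetical helper that compares the name handed to `connect` with the DNS names in the certificate, and it ignores wildcards and IP-type subject alternative names.

```rust
/// Returns true if `requested` equals one of the certificate's DNS names.
/// DNS names are compared case-insensitively; wildcards and IP SANs are not handled.
/// (Hypothetical helper for illustration only.)
fn name_matches(requested: &str, cert_dns_names: &[&str]) -> bool {
    cert_dns_names
        .iter()
        .any(|name| name.eq_ignore_ascii_case(requested))
}
```

With a certificate whose only name is `localhost`, `name_matches("127.0.0.1", &["localhost"])` is false while `name_matches("localhost", &["localhost"])` is true. The client in the question passes `ip.as_str()` as the domain argument of `connector.connect`, so verification fails whenever that value is a numeric address rather than a name listed in the certificate.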

I have a git repo that shows a working example here. Note that it uses hyper rather than tokio, but the result should be the same.

Answered 2017-03-30T03:31:43.513