1

我需要在同一个 TcpStream 上发送和接收正常数据,同时定期发送心跳数据。在当前的实现中, Arc<Mutex> 用于我的目的,但它编译时出错。如何修复这些错误,或者是否有其他方法可以实现相同的目标?

use anyhow::Result;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;

#[tokio::main]
async fn main() -> Result<()> {
    let stream = TcpStream::connect("127.0.0.1:8888").await.unwrap();
    let stream = Arc::new(Mutex::new(stream));

    let common_stream = stream.clone();
    let handler1 = tokio::spawn(async {
        loop {
            let mut stream = common_stream.lock().unwrap();
            let mut buf = [0u8; 10];
            stream.read_exact(&mut buf).await.unwrap();
            buf.reverse();
            stream.write(&buf).await.unwrap();
        }
    });

    let heartbeat_stream = stream.clone();
    let handler2 = tokio::spawn(async {
        loop {
            let mut stream = heartbeat_stream.lock().unwrap();
            stream.write_u8(1).await.unwrap();

            thread::sleep(Duration::from_millis(200));
        }
    });

    handler1.await?;
    handler2.await?;

    Ok(())
}
error: future cannot be sent between threads safely
   --> src\main.rs:14:20
    |
14  |     let handler1 = tokio::spawn(async {
    |                    ^^^^^^^^^^^^ future created by async block is not `Send`
    |
    = help: within `impl Future<Output = [async output]>`, the trait `Send` is not implemented for `std::sync::MutexGuard<'_, tokio::net::TcpStream>`
note: future is not `Send` as this value is used across an await
   --> src\main.rs:20:31
    |
16  |             let mut stream = common_stream.lock().unwrap();
    |                 ---------- has type `std::sync::MutexGuard<'_, tokio::net::TcpStream>` which is not `Send`
...
20  |             stream.write(&buf).await.unwrap();
    |                               ^^^^^^ await occurs here, with `mut stream` maybe used later
21  |         }
    |         - `mut stream` is later dropped here
note: required by a bound in `tokio::spawn`
   --> .cargo\registry\src\mirrors.tuna.tsinghua.edu.cn-df7c3c540f42cdbd\tokio-1.17.0\src\task\spawn.rs:127:21
    |
127 |         T: Future + Send + 'static,
    |                     ^^^^ required by this bound in `tokio::spawn`

error: future cannot be sent between threads safely
   --> src\main.rs:25:20
    |
25  |     let handler2 = tokio::spawn(async {
    |                    ^^^^^^^^^^^^ future created by async block is not `Send`
    |
    = help: within `impl Future<Output = [async output]>`, the trait `Send` is not implemented for `std::sync::MutexGuard<'_, tokio::net::TcpStream>`
note: future is not `Send` as this value is used across an await
   --> src\main.rs:28:31
    |
27  |             let mut stream = heartbeat_stream.lock().unwrap();
    |                 ---------- has type `std::sync::MutexGuard<'_, tokio::net::TcpStream>` which is not `Send`
28  |             stream.write_u8(1).await.unwrap();
    |                               ^^^^^^ await occurs here, with `mut stream` maybe used later
...
31  |         }
    |         - `mut stream` is later dropped here
note: required by a bound in `tokio::spawn`
   --> .cargo\registry\src\mirrors.tuna.tsinghua.edu.cn-df7c3c540f42cdbd\tokio-1.17.0\src\task\spawn.rs:127:21
    |
127 |         T: Future + Send + 'static,
    |                     ^^^^ required by this bound in `tokio::spawn`

在这里欣赏任何建议。

谢谢。

4

2 回答 2

1

您的代码中有几个错误,尽管它背后的想法几乎是好的。您应该尽可能使用任何可用的异步工具。一些需要/期望的更改:

  • 使用tokio::time::sleep因为它是异步的,否则调用被阻塞
  • 使用互斥锁的异步版本(futures例如来自 crate 的那个)
  • 使用某种通用错误处理(anyhow会有所帮助)
use futures::lock::Mutex;
use anyhow::Error;
use tokio::time::sleep;
use std::sync::Arc;
use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;

#[tokio::main]
async fn main() -> Result<(), Error> {
    let stream = TcpStream::connect("127.0.0.1:8888").await.unwrap();
    let stream = Arc::new(Mutex::new(stream));

    let common_stream = stream.clone();
    let handler1 = tokio::spawn(async move {
        loop {
            let mut stream = common_stream.lock().await;
            let mut buf = [0u8; 10];
            stream.read_exact(&mut buf).await.unwrap();
            buf.reverse();
            stream.write(&buf).await.unwrap();
        }
    });

    let heartbeat_stream = stream.clone();
    let handler2 = tokio::spawn(async move {
        loop {
            let mut stream = heartbeat_stream.lock().await;
            stream.write_u8(1).await.unwrap();

            sleep(Duration::from_millis(200)).await;
        }
    });

    handler1.await?;
    handler2.await?;

    Ok(())
}

操场

于 2022-03-05T20:59:21.107 回答
1

这是一个将流分成两部分进行读取和写入的解决方案,并且在循环中执行:

  • 等待心跳事件并在发生这种情况时发送一个字节来写入流的一半
  • 等待读取一半的数据(10 个字节),将其反转并再次写入以写入一半

此外,这不会产生线程,并且在当前没有锁的情况下可以很好地完成所有工作。

use anyhow::Result;
use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;

#[tokio::main]
async fn main() -> Result<()> {
    let mut stream = TcpStream::connect("127.0.0.1:8888").await?;
    let (mut read, mut write) = stream.split();
    let mut heartbeat_interval = tokio::time::interval(Duration::from_millis(200));
    let mut buf = [0u8; 10];

    loop {
        tokio::select! {
            _ = heartbeat_interval.tick() => {
                write.write(&[1]).await?;
            }

            result = read.read_exact(&mut buf) => {
                let _bytes_read = result?;
                buf.reverse();
                write.write(&buf).await?;
            }
        }
    }
}
于 2022-03-05T21:05:54.503 回答