1

我们的 Rust 应用程序似乎存在内存泄漏,我已将问题提炼为下面的代码示例。我仍然看不出问题出在哪里。

我的期望是,在第 (500,000 + 1) 条消息上,应用程序的内存将恢复到低水平。相反,我观察到以下内容:

  • 在发送 500,000 条消息之前,内存使用量为 124KB
  • 发送 500,000 条消息后,内存使用量攀升至 27MB
  • 发送 500,000 + 1 条消息后,内存使用量降至 15.5MB

在尝试了很多东西之后,我找不到 15.5MB 的隐藏位置。释放内存的唯一方法是终止应用程序。Valgrind 没有检测到任何内存泄漏。解决方法,解决方案或指向正确方向的方法都将不胜感激。

可以在此处找到包含以下代码的演示项目:https ://github.com/loriopatrick/mem-help

笔记

  • 如果我删除self.items.push(data);内存使用量不会增加,所以我认为这不是 Sender/Receiver 的问题
  • 包装items: Vec<String>Arc<Mutex<..>>没有可观察到的内存差异中

应该管理内存的任务

struct Processor {
    items: Vec<String>,
}

impl Processor {
    pub fn new() -> Self {
        Processor {
            items: Vec::new(),
        }
    }

    pub async fn task(mut self, mut receiver: Receiver<String>) {
        while let Some(data) = receiver.next().await {
            self.items.push(data);

            if self.items.len() > 500000 {
                {
                    std::mem::replace(&mut self.items, Vec::new());
                }
                println!("Emptied items array");
            }
        }

        println!("Processor task closing in 5 seconds");
        tokio::time::delay_for(Duration::from_secs(5)).await;
    }
}

完整的可运行示例

use std::time::Duration;

use tokio::stream::StreamExt;
use tokio::runtime::Runtime;
use tokio::sync::mpsc::{channel, Receiver, Sender};

struct Processor {
    items: Vec<String>,
}

impl Processor {
    pub fn new() -> Self {
        Processor {
            items: Vec::new(),
        }
    }

    pub async fn task(mut self, mut receiver: Receiver<String>) {
        while let Some(data) = receiver.next().await {
            self.items.push(data);

            if self.items.len() > 500000 {
                {
                    std::mem::replace(&mut self.items, Vec::new());
                }
                println!("Emptied items array");
            }
        }

        println!("Processor task closing in 5 seconds");
        tokio::time::delay_for(Duration::from_secs(5)).await;
    }
}

pub fn main() {
    {
        let mut runtime: Runtime = tokio::runtime::Builder::new()
            .threaded_scheduler()
            .core_threads(1)
            .enable_all()
            .build()
            .expect("Failed to build runtime");

        let (mut sender, receiver) = channel(1024);
        let p = Processor::new();

        runtime.spawn(async move {
            println!("Before send, waiting 5 seconds");
            tokio::time::delay_for(Duration::from_secs(5)).await;

            for i in 0..500000 {
                sender.send("Hello".to_string()).await;
            }

            println!("Sent 500,000 items, waiting 5 seconds");
            tokio::time::delay_for(Duration::from_secs(5)).await;
            sender.send("Hello".to_string()).await;

            println!("Send message to clear items");
            tokio::time::delay_for(Duration::from_secs(3)).await;

            println!("Closing sender in 5 seconds");
            tokio::time::delay_for(Duration::from_secs(5)).await;
        });

        runtime.block_on(async move {
            {
                p.task(receiver).await;
            }
            println!("Task is done, waiting 5 seconds");
            tokio::time::delay_for(Duration::from_secs(5)).await;
        });
    }

    println!("Runtime closed, waiting 5 seconds");
    std::thread::sleep(Duration::from_secs(5));
}

货运.toml

[package]
name = "mem-help"
version = "0.1.0"
authors = ["Patrick Lorio <dev@plorio.com>"]
edition = "2018"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]

futures = "0.3.1"
tokio = { version = "0.2.6", features = ["full"] }
4

0 回答 0