我们的 Rust 应用程序似乎存在内存泄漏,我已将问题提炼为下面的代码示例。我仍然看不出问题出在哪里。
我的期望是,在第 (500,000 + 1) 条消息上,应用程序的内存将恢复到低水平。相反,我观察到以下内容:
- 在发送 500,000 条消息之前,内存使用量为 124KB
- 发送 500,000 条消息后,内存使用量攀升至 27MB
- 发送 500,000 + 1 条消息后,内存使用量降至 15.5MB
在尝试了很多东西之后,我找不到 15.5MB 的隐藏位置。释放内存的唯一方法是终止应用程序。Valgrind 没有检测到任何内存泄漏。解决方法,解决方案或指向正确方向的方法都将不胜感激。
可以在此处找到包含以下代码的演示项目:https ://github.com/loriopatrick/mem-help
笔记
- 如果我删除
self.items.push(data);内存使用量不会增加,所以我认为这不是 Sender/Receiver 的问题 - 包装
items: Vec<String>在Arc<Mutex<..>>没有可观察到的内存差异中
应该管理内存的任务
struct Processor {
items: Vec<String>,
}
impl Processor {
pub fn new() -> Self {
Processor {
items: Vec::new(),
}
}
pub async fn task(mut self, mut receiver: Receiver<String>) {
while let Some(data) = receiver.next().await {
self.items.push(data);
if self.items.len() > 500000 {
{
std::mem::replace(&mut self.items, Vec::new());
}
println!("Emptied items array");
}
}
println!("Processor task closing in 5 seconds");
tokio::time::delay_for(Duration::from_secs(5)).await;
}
}
完整的可运行示例
use std::time::Duration;
use tokio::stream::StreamExt;
use tokio::runtime::Runtime;
use tokio::sync::mpsc::{channel, Receiver, Sender};
struct Processor {
items: Vec<String>,
}
impl Processor {
pub fn new() -> Self {
Processor {
items: Vec::new(),
}
}
pub async fn task(mut self, mut receiver: Receiver<String>) {
while let Some(data) = receiver.next().await {
self.items.push(data);
if self.items.len() > 500000 {
{
std::mem::replace(&mut self.items, Vec::new());
}
println!("Emptied items array");
}
}
println!("Processor task closing in 5 seconds");
tokio::time::delay_for(Duration::from_secs(5)).await;
}
}
pub fn main() {
{
let mut runtime: Runtime = tokio::runtime::Builder::new()
.threaded_scheduler()
.core_threads(1)
.enable_all()
.build()
.expect("Failed to build runtime");
let (mut sender, receiver) = channel(1024);
let p = Processor::new();
runtime.spawn(async move {
println!("Before send, waiting 5 seconds");
tokio::time::delay_for(Duration::from_secs(5)).await;
for i in 0..500000 {
sender.send("Hello".to_string()).await;
}
println!("Sent 500,000 items, waiting 5 seconds");
tokio::time::delay_for(Duration::from_secs(5)).await;
sender.send("Hello".to_string()).await;
println!("Send message to clear items");
tokio::time::delay_for(Duration::from_secs(3)).await;
println!("Closing sender in 5 seconds");
tokio::time::delay_for(Duration::from_secs(5)).await;
});
runtime.block_on(async move {
{
p.task(receiver).await;
}
println!("Task is done, waiting 5 seconds");
tokio::time::delay_for(Duration::from_secs(5)).await;
});
}
println!("Runtime closed, waiting 5 seconds");
std::thread::sleep(Duration::from_secs(5));
}
货运.toml
[package]
name = "mem-help"
version = "0.1.0"
authors = ["Patrick Lorio <dev@plorio.com>"]
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
futures = "0.3.1"
tokio = { version = "0.2.6", features = ["full"] }