我是 Rust 的新手,我想通过实施一些小项目来学习这门语言并更好地理解。我的第一次尝试是解析从 MQTT 代理收到的 JSON 数据。tornado
我很高兴在and的帮助下很容易做到这一点serde
。然而,很快出现了一种我想(理想情况下)提取到特征中的模式。
let person_stream = sender.subscribe().filter_map(|data| {
if let Ok(value) = data {
return serde_json::from_slice::<Person>(&value).ok();
}
None
});
其中sender
通常是 atokio::sync::*::Sender
并且Person
会实现serde::de::DeserializeOwned
Trait。
因此,目标是实现将 atokio::stream::StreamExt<Item = Vec<u8>>
和transform
它转换StreamExt
为关联类型Item
将实现的另一个的东西DeserializOwned
。
我花了很长时间才弄清楚(因为我最初想使用 Trait 或带有泛型的函数),然后我想出了
fn transform<T, U, V>(stream: U) -> impl StreamExt<Item = T>
where
T: serde::de::DeserializeOwned,
U: StreamExt<Item = Result<Vec<u8>, V>>,
{
stream.filter_map(|data| {
if let Ok(value) = data {
return serde_json::from_slice::<T>(&value).ok();
}
None
})
}
虽然这行得通,但我最初想要一个像
trait Transform<T>
{
fn transform(self) -> T;
}
或实现Into
实际上是相同的,我可以为StreamExt<Item = Vec<u8>>
. 由于impl Trait
Traits 无法返回,因此这是我认为的唯一选择。但是,我在实现这一点时遇到了几个问题。
- 使用
tokio::stream::filter_map::FilterMap<_,_>
forT
(这是 的返回类型filter_map()
)是不可能的,因为模块filter_map
是private
. Box<dyn StreamExt>
也不可能使用,因为返回StreamExt
几个Self
函数。不过,我一开始不希望堆开销;)
所以我的问题是:鉴于返回类型是对象安全的和不是对象安全的,我能在这里做些什么来获得 Trait 实现的语法filter_map()
糖private
吗StreamExt
?使用会很酷
let person_stream = receiver.transform();
或into()
。显然我有一个可行的实现,所以这对我来说并不是一个真正的关键问题。但正如我一开始所说的,我想首先对 Rust 有更深入和更好的理解。
我注意到有tokio-serde
箱子,但乍一看它只处理框架数据,所以我没有深入研究它。
transform
PS:当类型推断失败时,我也遇到了我实现的自由函数的问题。例如,当将新流传递给类似的函数时
async fn debug_sink<T>(mut receiver: T)
where
T: StreamExt + Unpin,
T::Item: std::fmt::Debug,
{
while let Some(item) = receiver.next().await {
println!("Item: {:#?}", item);
}
}
在这种情况下,它显然无法T
与transfer
像
async fn person_sink(mut receiver: impl StreamExt<Item = Person> + Unpin) {
while let Some(person) = receiver.next().await {
println!("Person: {:#?}", person);
}
}
但是我不想注释所有类型参数,只注释它无法推断的一个。经过反复试验,我发现我可以使用
transform::<Person, _, _>(stream);
我完全不知道。不过,我在文档中找不到这个。这是一些隐藏的功能,还是我只是未能正确 rtfm?:)