问题标签 [zio-streams]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
1006 浏览

scala - 将 ZIO Stream 的输出写入文件

我正在尝试将 ZIO 流的结果写入文件。以下示例应用程序采用整数序列,将它们转换为字节,使用 gzip 转换器压缩它们,但我不知道如何将它们写入文件。

我想我需要使用ZSink.fromOutputStream,但我不确定它如何适合代码。

0 投票
1 回答
147 浏览

scala - 如何通过 ZStream 读取来自 ZHub 的消息?

我是 ZHub 和 ZStream 的新手,想熟悉他们的 API。

不幸的是,我无法使这个简单的示例工作:

我怀疑这个程序不会终止,因为我正在尝试收集无限流。我还尝试使用stream.tap(...)或关闭集线器打印消息。没有任何帮助。

我在这里想念什么?任何帮助表示赞赏,谢谢。

0 投票
1 回答
329 浏览

scala - ZIO Streams:ZSink 和 ZTransducer 有什么区别?

我正在研究 ZIO Streams,使用版本 1.0.9 的库zio-streams。我找不到任何可以显示 aZSink和 a之间区别的参考资料ZTransducer

有什么不同?

0 投票
1 回答
187 浏览

json - 将 Zio-Kafka 消费者与 Zio-Json 反序列化器集成

我正在研究图书馆zio-kafka,我想用zio-jsonJSON 格式反序列化消息的值。

我有一个简单的案例类及其解码器和编码器:

现在,我创建了一个使用上述解码器/编码器的Serde从 a 开始:Serde.string

这是对的吗?还有其他(更好的)方法吗?

0 投票
0 回答
133 浏览

scala - 我可以使用 ZSink 在 Zio-Kafka 中提交偏移量吗?

我正在使用库学习 ZIO 与 Apache Kafka 的集成zio-kafka。在Github 主项目页面的示例中,他们使用一个mapM函数来提交一个块的偏移量:

但是,恕我直言,提交偏移量是流上的终端操作。使用 a 有什么区别ZSink

0 投票
0 回答
71 浏览

scala - 如何在 ZStream (ZIO) 的帮助下构建批处理请求并理解响应?

我有 api 得到这样的请求:

并返回这样的响应:

另外,我有发送此请求并创建用户的方法:

我想:

  1. 编写创建 ids 批处理的 ZStream
  2. 每 1 秒需要 10 个 id 并创建 UsersRequest
  3. 获取用户信息响应
  4. 使用 id 了解必须获取的信息
  5. 返回信息

所以我可以做到这一点,我应该创建类似的东西:

我不知道我怎么能做到。如何构建批处理并发送请求并在匹配后通过 id 获得结果?

0 投票
1 回答
64 浏览

scala - 将 ByteArrayOutputStream 流式传输到 akka http 响应

我正在使用 ZIO Streams 创建一个 ByteArrayOutputStream,即:

这工作正常 - 现在我需要使用 akka http 将这些数据流式传输回客户端。我可以做这个:

这有效,但当然 toByteArray 将输出流带入内存,即我不流式传输数据。我遗漏了一些明显的东西 - 有没有简单的方法可以做到这一点?

0 投票
1 回答
39 浏览

zio - 中断 ZStream mapMPar 处理

我有以下代码,由于 Excel 最大行数限制,限制为约 100 万行:

一切都相当简单,而且效果很好。我跟踪流式传输的行数,然后停止写入数据。但是,我想中断在 mapMPar 中生成的所有子纤维,如下所示:

不幸的是,该过程在此处立即中断。我可能遗漏了一些明显的东西......

编辑帖子,因为它需要一些清晰度。

我的数据流是由一个昂贵的过程生成的,在这个过程中,数据是从远程服务器中提取的(这个数据本身是由一个昂贵的过程计算出来的),带有n 个Fiber。然后我处理流,然后将它们流式传输到客户端。一旦处理的行数达到约 100 万,我需要停止从远程服务器提取数据(即中断所有光纤)并结束该过程。