问题标签 [scalaz-stream]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
3 回答
6497 浏览

scala - http4s - 将请求正文作为字符串或 InputStream

我正在尝试定义HttpService接收json并将其解析为带有json4s库的案例类:

我怎样才能org.json4s.JsonInputreq.bodyor得到req.bodyAsText

我知道json4s也有StringInputStreamInput并且继承自JsonInput用于使用StringInputStream所以我认为我需要转换req.bodyInputStreamreq.bodyAsTextString但我仍然不明白如何。

我是 Scala 的新手,我还没有完全理解一些概念,例如scalaz.stream.Process.

0 投票
2 回答
178 浏览

scala - 如何将两个 scalaz 流与谓词选择器结合起来?

我想将两个 scalaz 流与一个谓词结合起来,该谓词从任一流中选择下一个元素。例如,我希望这个测试通过:

正如你所看到的,我们不能做一些聪明的事情,比如zip对这两个元素进行排序,因为有时可能会连续选择其中一个过程。

我尝试了一个我认为可行的解决方案。它编译!但该死的,如果它什么都不做。JVM只是挂起:(

请注意,以上是我的第二次尝试。在我的第一次尝试中,我尝试创建一个Tee但我无法弄清楚如何取消消费失败者元素。我觉得我需要像这里一样的递归。

我正在使用流版本0.7.3a

非常感谢任何提示(包括增量提示,因为我想简单地学习如何自己解决这些问题)!

0 投票
1 回答
86 浏览

scala - 具有 scalaz 流的 Monad 转换器

在此代码段y.run中不进行类型检查。

编译器显示此错误:

could not find implicit value for parameter C: scalaz.Catchable[[x]Test.StateStringTask[x]]

我必须创建一个Catchable实例StateStringTask吗?我怎么做?或者在运行时是否有更简单的方法来处理有状态的效果Process

0 投票
0 回答
83 浏览

scalaz-stream - 有没有办法使用 http4s 将从 http 端点接收到的数据直接流式传输到 kafka?

http4s 使用 scalaz 流,并且有针对 kafka 的 scalaz 流实现。我们可以直接将在 http 端点接收到的数据流式传输到 kafka 中,http 端点是源,kafka 是接收器。

喜欢,

0 投票
1 回答
101 浏览

scalaz-stream - 如何停止由 time.awakeEvery 创建的 ScalaZ 进程?

我了解到,scalaz.stream.time.awakeEvery(1.second)我可以创建一个每秒创建一个事件的流程。很明显。然后我可以映射该过程以每秒完成一些任务。到目前为止,一切都很好。

如果我想停止此活动怎么办?我试过了,创建的进程在p.kill.run.run哪里,但它不起作用。ptime.awakeEvery(1.second)

所以代码看起来像这样:

尽管如此,它仍然会在打印“Killed”之后打印时间,并且一直持续下去。

如何停止该进程并释放它使用的资源(线程)?我发现我可以通过关闭 ScheduledThreadPoolExecutor 来阻止它,但是没有更优雅的方法吗?

0 投票
1 回答
846 浏览

scala - 带有块和任务的 Scala fs2 流?

0 投票
1 回答
132 浏览

scala - 从scalaz中的队列创建进程

我正在尝试遵循https://github.com/functional-streams-for-scala/fs2/wiki/Binding-to-asynchronous-processes上的第一个示例

填补一些空白并添加一些调试打印,我得到了以下代码:

当我尝试运行它时,thread2 中没有收到任何内容:

我做错了什么?这是哪里堵的?

(我正在使用 scalaz-stream 0.8.6)

0 投票
1 回答
675 浏览

scala - 同时执行两个 fs2 任务(非确定性)

使用 Scalaz Task,我可以使用scalaz.Nondeterminism.both

或与Nondeterminism[Task].gatherUnordered().

我怎样才能对fs2 0.9.x 版本的任务做同样的事情?

0 投票
0 回答
597 浏览

scala - 如何不确定地展平无限 FS2 流

我正在使用 Scala 的FS2 流库

我有一个Stream[F, [Stream[F, A]]内部流和外部流都是无限的(带有适当Async的实例F)。我想最终得到一个Stream[F, A]同时从外部流和内部流中提取的结果,其中外部流中的每个新元素都会替换我从中提取的当前内部流。特别是我想“最终”至少尝试从所有内部流中拉出(尽管我可能会在这样做之前被外部流打断)。

我尝试使用包含当前内部流的外部Async引用来执行此操作似乎以Interrupt抛出异常而告终。

我不要一个简单的flatMap甚至concurrent.join。因为我的内部流是无限的,所以它们永远不会超过有限数量的内部流。

有没有办法通过 FS2 实现这一点?

0 投票
1 回答
523 浏览

scala - FS2 join 不能证明 Seq[fs2.Stream[cats.effect.IO,Int]] <:< fs2.Stream[cats.effect.IO,O2]

我正在尝试使用 fs2 流 0.10.0-M9 和 doobie 版本 0.5.0-M9 从 http 调用中获取一系列对象,然后将其插入到 postgres 数据库中,但我在构建此代码时遇到问题,得到以下错误:

错误:(49, 12) 无法证明 Seq[fs2.Stream[cats.effect.IO,Int]] <:< fs2.Stream[cats.effect.IO,O2]。.join(100)

我想要做的是在对 Web 服务的调用返回后同时运行插入语句。这是代码: