问题标签 [scalaz-stream]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
scala - http4s - 将请求正文作为字符串或 InputStream
我正在尝试定义HttpService
接收json并将其解析为带有json4s
库的案例类:
我怎样才能org.json4s.JsonInput
从req.body
or得到req.bodyAsText
?
我知道json4s
也有StringInput
,StreamInput
并且继承自JsonInput
用于使用String
,InputStream
所以我认为我需要转换req.body
为InputStream
或req.bodyAsText
,String
但我仍然不明白如何。
我是 Scala 的新手,我还没有完全理解一些概念,例如scalaz.stream.Process
.
scala - 如何将两个 scalaz 流与谓词选择器结合起来?
我想将两个 scalaz 流与一个谓词结合起来,该谓词从任一流中选择下一个元素。例如,我希望这个测试通过:
正如你所看到的,我们不能做一些聪明的事情,比如zip
对这两个元素进行排序,因为有时可能会连续选择其中一个过程。
我尝试了一个我认为可行的解决方案。它编译!但该死的,如果它什么都不做。JVM只是挂起:(
请注意,以上是我的第二次尝试。在我的第一次尝试中,我尝试创建一个Tee
但我无法弄清楚如何取消消费失败者元素。我觉得我需要像这里一样的递归。
我正在使用流版本0.7.3a
。
非常感谢任何提示(包括增量提示,因为我想简单地学习如何自己解决这些问题)!
scala - 具有 scalaz 流的 Monad 转换器
在此代码段y.run
中不进行类型检查。
编译器显示此错误:
could not find implicit value for parameter C: scalaz.Catchable[[x]Test.StateStringTask[x]]
我必须创建一个Catchable
实例StateStringTask
吗?我怎么做?或者在运行时是否有更简单的方法来处理有状态的效果Process
?
scalaz-stream - 有没有办法使用 http4s 将从 http 端点接收到的数据直接流式传输到 kafka?
http4s 使用 scalaz 流,并且有针对 kafka 的 scalaz 流实现。我们可以直接将在 http 端点接收到的数据流式传输到 kafka 中,http 端点是源,kafka 是接收器。
喜欢,
scalaz-stream - 如何停止由 time.awakeEvery 创建的 ScalaZ 进程?
我了解到,scalaz.stream.time.awakeEvery(1.second)
我可以创建一个每秒创建一个事件的流程。很明显。然后我可以映射该过程以每秒完成一些任务。到目前为止,一切都很好。
如果我想停止此活动怎么办?我试过了,创建的进程在p.kill.run.run
哪里,但它不起作用。p
time.awakeEvery(1.second)
所以代码看起来像这样:
尽管如此,它仍然会在打印“Killed”之后打印时间,并且一直持续下去。
如何停止该进程并释放它使用的资源(线程)?我发现我可以通过关闭 ScheduledThreadPoolExecutor 来阻止它,但是没有更优雅的方法吗?
scala - 从scalaz中的队列创建进程
我正在尝试遵循https://github.com/functional-streams-for-scala/fs2/wiki/Binding-to-asynchronous-processes上的第一个示例
填补一些空白并添加一些调试打印,我得到了以下代码:
当我尝试运行它时,thread2 中没有收到任何内容:
我做错了什么?这是哪里堵的?
(我正在使用 scalaz-stream 0.8.6)
scala - 同时执行两个 fs2 任务(非确定性)
使用 Scalaz Task,我可以使用scalaz.Nondeterminism.both
:
或与Nondeterminism[Task].gatherUnordered()
.
我怎样才能对fs2 0.9.x 版本的任务做同样的事情?
scala - 如何不确定地展平无限 FS2 流
我正在使用 Scala 的FS2 流库。
我有一个Stream[F, [Stream[F, A]]
内部流和外部流都是无限的(带有适当Async
的实例F
)。我想最终得到一个Stream[F, A]
同时从外部流和内部流中提取的结果,其中外部流中的每个新元素都会替换我从中提取的当前内部流。特别是我想“最终”至少尝试从所有内部流中拉出(尽管我可能会在这样做之前被外部流打断)。
我尝试使用包含当前内部流的外部Async
引用来执行此操作似乎以Interrupt
抛出异常而告终。
我不要一个简单的flatMap
甚至concurrent.join
。因为我的内部流是无限的,所以它们永远不会超过有限数量的内部流。
有没有办法通过 FS2 实现这一点?
scala - FS2 join 不能证明 Seq[fs2.Stream[cats.effect.IO,Int]] <:< fs2.Stream[cats.effect.IO,O2]
我正在尝试使用 fs2 流 0.10.0-M9 和 doobie 版本 0.5.0-M9 从 http 调用中获取一系列对象,然后将其插入到 postgres 数据库中,但我在构建此代码时遇到问题,得到以下错误:
错误:(49, 12) 无法证明 Seq[fs2.Stream[cats.effect.IO,Int]] <:< fs2.Stream[cats.effect.IO,O2]。.join(100)
我想要做的是在对 Web 服务的调用返回后同时运行插入语句。这是代码: