问题标签 [haskell-pipes]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
haskell - 管道中的错误处理
背景故事
我有许多数据文件,每个文件都包含一个数据记录列表(每行一个)。与 CSV 类似,但完全不同,我更愿意编写自己的解析器,而不是使用 CSV 库。出于这个问题的目的,我将使用一个每行仅包含一个数字的简化数据文件:
如您所见,文件可能包含格式错误的数据,在这种情况下,应将整个文件视为格式错误。
我想做的那种数据处理可以用地图和折叠来表达。所以,我认为这将是一个学习如何使用pipes
图书馆的好机会。
首先,我在文本文件中创建行的生产者。这与文档中的示例非常相似Pipes.Safe
。
接下来,我需要一个函数来解析每一行。正如我之前提到的,这可能会失败,我将用Either
.
为简单起见,作为第一步,我想将所有数据记录收集到记录列表中。最直接的方法是将所有行通过解析器传递,然后将整个内容收集到一个列表中。
不幸的是,这会创建一个记录列表。但是,如果文件包含一条错误记录,则应将整个文件视为错误。我真正想要的是记录列表中的一个。当然,我可以只sequence
用来转置要么列表。
但是,即使第一行已经格式错误,它也会读取整个文件。这些文件可能很大,而且我有很多,所以,如果在第一个错误时停止读取会更好。
问题
我的问题是如何实现这一目标。如何中止解析第一个格式错误的记录?
到目前为止我得到了什么
我的第一个想法是使用 and 的 monad 实例Either ErrMsg
而P.mapM
不是P.map
. 由于我们正在从我们已经拥有的文件中读取,IO
并且SafeT
在我们的 monad 堆栈中,所以,我想我需要ExceptT
在那个 monad 堆栈中进行错误处理。这就是我卡住的地方。我尝试了许多不同的组合,但总是被类型检查员大喊大叫。以下是我能得到的最接近它的编译器。
推断的readNumbers3
读取类型
这看起来接近我想要的:
但是,一旦我尝试实际执行该操作,我就会在 ghci 中收到以下错误消息:
如果我尝试应用以下类型签名:
然后我收到以下错误消息:
在旁边
将错误处理移动到管道的基本 monad 的另一个动机是,如果我不必在我的地图和折叠中处理任何一个问题,它将使进一步的数据处理变得更加容易。
haskell - 如何将 IO 操作的输出通过管道传输到 haskell 中的进程中
我想创建一个进程并将我的haskell程序中的一些文本定期写入进程的stdin(来自IO操作)。
以下内容在 GHCi 中正常工作,但在构建和运行时无法正常工作。在 GHCi 中,一切都运行良好,并且定期输入 IO 操作的值。但是,当构建并运行时,在写入进程的标准输入时,它似乎会暂停任意长时间。
我已经使用CreateProcess
(from System.Process
) 创建句柄并尝试过hPutStrLn
(缓冲区设置为NoBuffering
--LineBuffering
也不起作用)。
所以我正在尝试这个process-streaming
包,pipes
但似乎根本没有任何工作。
真正的问题是:我如何从 haskell 创建一个进程并定期写入它?
展示此行为的最小示例:
任何帮助将不胜感激。
haskell - 解码 JSON 流,其中一些值在其他值之前需要
假设我们有一个这样的 JSON 对象(带有 base64 编码的字节串):
现在,我们想从一个源接收,并使用标签image
中的信息将其存储在一个位置。id
因此,这意味着id
必须提前解析(以确定图像的位置),同时image
以流方式解析。这是直截了当的吗?
我打算使用pipes-aeson
, aws
(用于S3
存储)并使用作为消费者的存储桶pipes
从Websocket
生产者进行流式解码(在我们解析确定存储桶的位置S3
之前无法创建它)。看着方法,我不知道我是否真的可以按照我上面的要求去做。这是我第一次尝试在 JSON 和管道中进行流式传输。因此,我们将非常感谢您的帮助。id
S3
decoded
一个对文件系统进行读写的简单示例也可以作为 and 的替代Websocket producer
品S3 consumer
。
附录
由于 JSON 键值对根据RFC是无序的,而数组是有序的,因此对于我上面定义的数据类型,image
数据可能会出现在 之前。id
因此,将其更改为 JSON 数组(Haskell 中的一个元组,aeson
TH 派生似乎转换为有序数组)也可能会有所帮助。如果需要,请随时更改数据类型定义,以便对解码进行排序。例如,数据类型可能更改为:
haskell - haskell 管道 - 如何在字节串管道上重复执行 takeWhile 操作?
我正在尝试做的是使用 takeWhile 将字节串拆分为某个字符。
这让我获得了第一行,但我真正想要的是一次有效地将每个块产生一个换行符。我怎么做?
haskell - 将字节流式传输到网络 websocket
我有一个代码,它使用文件句柄来模拟Bytestring
来自源(AWS S3
)的流式传输的接收器。如果我们想用作接收器,将下面的代码与(带有连接句柄)Network.Websocket
交换就足够了吗?LBS.writeFile
sendBinaryData
对我来说困惑的根源是如何确定流的终止?对于文件,这由writeFile
API 处理。怎么样sendBinaryData
?它是否以类似的方式处理终止writeFile
?还是由客户端的数据解析器确定?
更新
这个问题是关于如何将数据流式传输到 websocket 句柄(假设已经提供了一个句柄),就像我们在上面的示例中处理文件句柄一样,而不是关于如何在resourceT
. conduit
似乎确实采取mapM_
了接收数据的方法。所以,这似乎确实是要走的路。
终止问题是因为我有这样的想法:如果我们有一个函数在 Websocket 句柄的另一侧监听数据,那么确定消息的结束在流上下文中似乎很重要。给定如下函数:
如果我们确实S.mapM_
将数据流式传输到 websocket 句柄,它是否会添加某种end of stream
标记,以便f
在另一端监听可以停止处理惰性字节串。否则f
将不知道消息何时完成。
haskell - Haskell Pipes——让管道消耗它产生的东西(本身)
我正在尝试使用 Pipes 编写一个 webscraper,我已经开始关注抓取的链接。我有一个process
下载网址、查找链接并生成链接的功能。
现在我想了解如何递归地跟踪这些链接,将 StateT 贯穿。我意识到可能会做一些更惯用的事情,然后将单个管道用于大部分刮板(尤其是当我开始添加更多功能时),我愿意接受建议。无论如何,当我考虑使用共享状态的多线程时,我可能不得不重新考虑设计。
haskell - 如何将“readfile”函数的输出转换为管道的源代码?
我通过以下方式打开一些 .txt 文件:
.txt 文件的格式为
我想把 xxs 变成一个源代码,这样它可能看起来像:
管道 API 是否提供了一种方法来做到这一点,而无需先进行一些字符串操作xxs
,从而使其成为表单[str_1, str_2, ..., str_m]
?
haskell - 如何使用 Conduit 保存文件?
如何使用导管的库保存文件?我浏览了导管的教程,但似乎找不到任何东西,这是我的用例:
所以这里有两个问题:
lines
使用将字符串转换为字符串列表然后将其提供给是否有意义sourceList
?我应该如何实现该
saveFile
功能,以便在xxs
完全处理字符串时将其写入磁盘?
haskell - 您将如何遍历目录并对所有文件执行某些功能并以内存有效的方式组合输出?
环境
我需要遍历一个超过 100 多个 .txt 文件的目录,打开每个文件并对每个文件执行一些功能,然后组合结果。这些文件很大,大约 10GB。伪代码中的一些常见操作可能是:
诀窍是确保所有文件永远不会同时存在于内存中,我之前的天真的解决方案在我的 Mac 上创建了各种交换文件。此外,如果其中一个 filePath 无效,我想跳过它并继续执行该程序。
我的解决方案
目前我正在使用管道,如果可能的话,我想找到一个使用管道的解决方案。但如果它不是正确的工具,我可以使用其他工具。
haskell - 管道:根据另一个文件的内容打开一个文件
我有这个代码:
但这似乎很hacky,我希望能够从管道中打开第二个文件,从我在第一个文件的第一行中读取的内容。任何的想法?