1

我在这里用 http4s & fs2 编写了一个顺序 REST API 爬虫:

https://gist.github.com/NicolasRouquette/656ed7a2d6984ce0995fd78a3aec2566

这是查询 REST API 服务以获取一组起始 ID,获取一批 ID 的元素,并根据在这些元素中找到的交叉引用 ID 继续,直到没有新 ID 可获取并返回所有 ID 的映射提取的元素。

这行得通;但是,性能不足——太慢了!

由于我无法访问服务器,因此我尝试尝试不同的批量大小,从 10、50、100、200、500 甚至在单个查询中批量处理所有 ID。查询时间随着批量大小的增加而显着增加。在大尺寸(500 和所有尺寸)下,我什至从服务器收到了 HTTP 500 响应。

我想尝试使用线程池以负载平衡方式批处理并行查询;但是,我不清楚如何根据 fs2 文档执行此操作。

有人可以提供如何实现这一目标的建议吗?

关于使用 http4s 和 fs2:嗯,我发现这个库很容易用于简单的客户端编程。鉴于对支持任务、流等的强调,我认为批处理并行查询应该以某种方式可行。

4

1 回答 1

1

fs2.concurrent.join将允许您同时运行多个流。指南中的特定部分可在https://github.com/functional-streams-for-scala/fs2/blob/v0.9.7/docs/guide.md#concurrency

对于您的用例,您可以获取 id 队列,将它们分块,创建一个 http 任务,然后将其包装在一个流中。然后,您将同时运行此流流join并合并结果。

def createHttpRequest(ids: Seq[ID]): Task[(ElementMap, Set[ID])] = ???

def fetch(queue: Set[ID]): Task[(ElementMap, Set[ID])] = {
  val resultStreams = Stream.emits(queue.toSeq)
    .vectorChunkN(batchSize)
    .map(createHttpRequest)
    .map(Stream.eval)

  val resultStream = fs2.concurrent.join(maxOpen)(resultStreams)
  resultStream.runFold((Map.empty[ID, Element], Set.empty[ID])) {
    case ((a, b), (_a, _b)) => (a ++ _a, b ++ _b)
  }
}
于 2017-09-15T11:56:38.563 回答