2

我正在使用 scalikejdbc 访问一个巨大的表。我的理解是 - 它在我可以映射或迭代它们之前将所有行提取到内存中。

目前我有一个使用 rxscala Observable 的实现,非常简单。但是接收器比读取sql慢,然后我因为缓冲而得到OutOfMemory。这是我目前的制片人,可以观察到:

  def fetchProductsAsObservable(
    sql: SQL[Nothing,NoExtractor],
    extractor: (WrappedResultSet) => ProductItem)
  ) =
    Observable[ProductItem](o =>
      try {
          sql.foreach(row => o.onNext(extractor(row)))
          o.onCompleted()
      } catch {
        case e: Throwable => o.onError(e)
      }
    )

我知道 SQL.foreach 方法,但它获取一个回调方法并返回 Unit。我的背景是.NET。无法弄清楚自己如何使用 scalikejdbc 在 scala 中正确实现一个简单的迭代器,我可以将其用于并行处理?

4

1 回答 1

-1

您的问题与 ScalikeJDBC 行为无关,而是与 RxJava 相关。由于您的 Observable 由热源提供营养,因此您需要使用背压策略。

于 2017-03-29T14:38:48.657 回答