4

我正在尝试使用 clojurepantomime库从大量tif文档(以及其他文档)中提取/ocr 文本。

我的计划是使用 pmap 对一系列输入数据(来自 postgres 数据库)应用映射,然后使用 tika/tesseract OCR 输出更新同一个 postgres 数据库。这一直工作正常,但是我在 htop 中注意到许多内核有时处于空闲状态。

无论如何要调和这一点,我可以采取哪些步骤来确定为什么这可能会在某处阻塞?所有处理都发生在一个 tif 文件上,并且每个线程都是完全互斥的。

附加信息:

  1. 一些 tika/tesseract 过程需要 3 秒,其他需要 90 秒。一般来说,tika 受 CPU 限制很大。根据 ,我有足够的可用内存htop
  2. postgres 在会话管理中没有锁定问题,所以我认为这不会阻碍我。
  3. 也许某个地方future正在等待deref?怎么知道在哪里?

任何提示表示赞赏,谢谢。下面添加的代码。

(defn parse-a-path [{:keys [row_id, file_path]}]
      (try
        (let [
              start        (System/currentTimeMillis)
              mime_type    (pm/mime-type-of file_path)
              file_content (-> file_path (extract/parse) :text)
              language     (pl/detect-language file_content)
              ]
          {:mime_type   mime_type
          :file_content file_content
          :language     language
          :row_id       row_id
          :parse_time_in_seconds   (float (/ ( - (System/currentTimeMillis) start) 100))
          :record_status "doc parsed"})))


(defn fetch-all-batch []
      (t/info (str "Fetching lazy seq. all rows for batch.") )
      (jdbc/query (db-connection)
                  ["select
                   row_id,
                   file_path ,
                   file_extension
                   from the_table" ]))


(defn update-a-row [{:keys [row_id, file_path, file_extension] :as all-keys}]
      (let [parse-out (parse-a-path all-keys )]
        (try
          (doall
            (jdbc/execute!
              (db-connection)
              ["update the_table
               set
               record_last_updated        = current_timestamp ,
               file_content          = ?                 ,
               mime_type             = ?                 ,
               language              = ?                 ,
               parse_time_in_seconds = ?                 ,
               record_status         = ?
               where row_id = ? "
               (:file_content          parse-out) ,
               (:mime_type             parse-out) ,
               (:language              parse-out) ,
               (:parse_time_in_seconds parse-out) ,
               (:record_status         parse-out) ,
               row_id ])
            (t/debug (str "updated row_id " (:row_id parse-out) " (" file_extension ") "
                          " in " (:parse_time_in_seconds parse-out) " seconds." )))
          (catch  Exception _ ))))

(dorun
  (pmap
    #(try
       (update-a-row %)
       (catch Exception e (t/error (.getNextException e)))
       )
    fetch-all-batch )
  )
4

2 回答 2

3

pmap在批量(+ 2 个内核)上并行运行 map 函数,但保留顺序。这意味着如果您有 8 个核心,将处理 10 个项目的批次,但只有在所有 10 个都完成后才会开始新批次。

您可以创建自己的代码,使用和的组合future,这将是一个很好的学术练习。在那之后,你可以扔掉你的代码,开始使用claypoole库,它有一组抽象,涵盖了.delaydereffuture

对于这种特定情况,请使用它们的无序pmappfor实现(upmapupfor),它们的作用完全相同,pmap但没有排序;一旦批次中的任何一个项目完成,新项目就会被拾取。

在 IO 是主要瓶颈的情况下,或者工作项之间的处理时间可能有很大差异的情况下,这是并行化mapfor操作的最佳方式。

当然,您应该注意不要依赖任何类型的返回值排序。

  (require '[com.climate.claypoole :as cp])

  (cp/upmap (cp/ncpus)
    #(try
       (update-a-row %)
       (catch Exception e (t/error (.getNextException e)))
       )
    fetch-all-batch )
于 2016-04-25T15:19:44.923 回答
1

前段时间我遇到了类似的问题。我猜你的假设和我一样:

  • pmap 并行调用 f 。但这并不意味着工作是平等分享的。正如你所说,有些需要 3 秒,而另一些需要 90 秒。在 3 秒内完成的线程不会要求其他线程共享一些剩余的工作。所以完成的线程只是等待中途直到最后一个完成。

  • 您没有准确描述您的数据如何,但我会假设您正在使用某种惰性序列,这不利于并行处理。如果您的进程是 CPU 受限的并且您可以将整个输入保存在内存中,那么更喜欢使用clojure.core.reducers('map'、'filter' 和特别是 'fold')而不是使用惰性映射、过滤器和其他.

就我而言,这些技巧将处理时间从 34 秒缩短到 8 秒。希望能帮助到你

于 2016-04-25T11:33:35.410 回答