1

我们需要将 MySQL 数据库中的 200 万条记录非规范化到 ElasticSearch。我们的开发人员在 AWS 上设置了 ElasticSearch。我编写了一个 Clojure 应用程序,它从 MySQL 中抓取数据,将其聚合成我们想要的格式,然后放入 ElasticSearch。我在我们的 EC2 实例上进行了设置,devops 人员正确设置了 AWS 角色,然后我开始运行应用程序。10分钟后,我这样做了:

curl --verbose  -d '{"query": { "match_all": {} }}' -H 'Content-Type: application/json' -X GET "https://search-samedayes01-ntt7r7b7sfhy3wu.us-east-1.es.amazonaws.com/facts-over-time/_search"

我看到了:

{"took":1,"timed_out":false,"_shards":{"total":5,"successful":5,"failed":0},"hits":{"total":14952,"max_score":1.0,"hits": [...

惊人的!它正在工作!我看了一些文件,它们看起来不错。

又过了 15 分钟,我运行与上面相同的查询。可悲的是,我得到了相同的结果:

{"took":1,"timed_out":false,"_shards":    {"total":5,"successful":5,"failed":0},"hits":    {"total":14952,"max_score":1.0,"hits": [...

我当时想,什么?为什么它会接受 14,952 条记录然后停止?

如果有任何问题,我的 Clojure 函数设置为抛出错误;

(defn push-item-to-persistence
  [item db]
  (let [
        denormalized-id (get-in item [:denormalized-id] :no-id)
        item (assoc item :updated-at (temporal/current-time-as-datetime))
        item (assoc item :permanent-holding-id-for-item-instances (java.util.UUID/randomUUID))
        item (assoc item :instance-id-for-this-one-item (java.util.UUID/randomUUID))
        item (assoc item :item-type :deduplication)
        ]
    (if (= denormalized-id :no-id)
      (slingshot/throw+ {
                         :type ::no-denormalized-id-in-push-item-into-database
                         :item item
                         })
      (slingshot/try+
       (esd/put db "facts-over-time" "deduplicaton" (str denormalized-id) item)
       (println "We just put a document in ES.")
       (catch Object o
         (slingshot/throw+ {
                            :type ::push-item-to-persistence
                            :error o
                            :item item
                            :db db
                            }
                           ))))))

如果我查看日志,没有错误,并且我一直看到打印出这一行:

We just put a document in ES.    

现在已经过去了一个多小时,我们似乎仍然停留在 14,952 个文档上。

可能出了什么问题?为什么我没有看到错误?

使用 Elastisch 作为库将 Clojure 连接到 AWS ES。

更新

好的,现在至少我看到了这些异常。我不清楚他们是在哪里被抓到的。在我的代码中的任何地方,我都会重新抛出异常,因为我希望应用程序在第一个异常上死掉。这些是在某个地方被捕获的,可能在我正在使用的 Elastish 库中?或者我不小心抓到并记录在某个地方。

但这是一个有点微不足道的问题。更重要:

下一个问题是为什么我会得到这些例外。我在哪里调整 AWS ElasticSearch 以便它以合理的速度接受我们的写入。

Oct 04, 2017 6:53:44 PM org.apache.http.impl.client.DefaultHttpClient tryConnect
INFO: I/O exception (java.net.SocketException) caught when connecting to {s}->https://search-samedayes01-ntsdht7sfhy3wu.us-east-1.es.amazonaws.com:443: Broken pipe (Write failed)

Oct 04, 2017 7:09:06 PM org.apache.http.impl.client.DefaultHttpClient tryConnect
INFO: Retrying connect to {s}->https://search-samedayes01-ntsdht7sfhy3wu.us-east-1.es.amazonaws.com:443

Oct 04, 2017 6:54:13 PM org.apache.http.impl.client.DefaultHttpClient tryConnect
INFO: I/O exception (java.net.SocketException) caught when connecting to {s}->https://search-samedayes01-ntsdht7sfhy3wu.us-east-1.es.amazonaws.com:443: Broken pipe (Write failed)

Oct 04, 2017 7:09:09 PM org.apache.http.impl.client.DefaultHttpClient tryConnect
INFO: Retrying connect to {s}->https://search-samedayes01-ntsdht7sfhy3wu.us-east-1.es.amazonaws.com:443

更新 2

我又重新开始了。大约 920 个文档被成功放入 ElasticSearch。然后我得到:

:hostname "UnknownHost"
:type java.io.EOFException
:message "SSL peer shut down incorrectly"

什么?

此外,写入似乎非常缓慢。也许每秒 10 次操作。AWS 中一定有一些我可以调整的东西让我们的 ElasticSearch 节点接受更多的写入?我希望每秒至少有 1,000 次写入。

更新 3

所以现在我已经达到了这个应用程序大部分工作的地步,但它以我能想象的最奇怪的方式工作。

我收到一条“断管”消息,将我带到这里:

Java 中的 SSL 对等点错误关闭

按照这个建议,我这样做了:

(System/setProperty "https.protocols" "TLSv1.1")

这似乎没有任何效果。

但现在我的应用程序这样做了:

  1. 以极快的速度移动,每秒可能向 ElasticSearch 写入 1 次。
  2. 引发“断管”异常。
  3. 像火箭一样起飞,开始每分钟向 ElasticSearch 写入大约 15,000 个请求。

我很高兴它终于起作用了,但我对我不知道它为什么起作用这一事实感到不舒服。

此外,每分钟 15,000 个请求实际上并没有那么快。移动 200 万个文档时,这需要 2 个多小时,这太可怕了。但是,Amazon 仅支持 ElasticSearch 的 REST 接口。我读过本机协议会快 8 倍。这听起来像我们需要的。

4

0 回答 0