我们需要将 MySQL 数据库中的 200 万条记录非规范化到 ElasticSearch。我们的开发人员在 AWS 上设置了 ElasticSearch。我编写了一个 Clojure 应用程序,它从 MySQL 中抓取数据,将其聚合成我们想要的格式,然后放入 ElasticSearch。我在我们的 EC2 实例上进行了设置,devops 人员正确设置了 AWS 角色,然后我开始运行应用程序。10分钟后,我这样做了:
curl --verbose -d '{"query": { "match_all": {} }}' -H 'Content-Type: application/json' -X GET "https://search-samedayes01-ntt7r7b7sfhy3wu.us-east-1.es.amazonaws.com/facts-over-time/_search"
我看到了:
{"took":1,"timed_out":false,"_shards":{"total":5,"successful":5,"failed":0},"hits":{"total":14952,"max_score":1.0,"hits": [...
惊人的!它正在工作!我看了一些文件,它们看起来不错。
又过了 15 分钟,我运行与上面相同的查询。可悲的是,我得到了相同的结果:
{"took":1,"timed_out":false,"_shards": {"total":5,"successful":5,"failed":0},"hits": {"total":14952,"max_score":1.0,"hits": [...
我当时想,什么?为什么它会接受 14,952 条记录然后停止?
如果有任何问题,我的 Clojure 函数设置为抛出错误;
(defn push-item-to-persistence
[item db]
(let [
denormalized-id (get-in item [:denormalized-id] :no-id)
item (assoc item :updated-at (temporal/current-time-as-datetime))
item (assoc item :permanent-holding-id-for-item-instances (java.util.UUID/randomUUID))
item (assoc item :instance-id-for-this-one-item (java.util.UUID/randomUUID))
item (assoc item :item-type :deduplication)
]
(if (= denormalized-id :no-id)
(slingshot/throw+ {
:type ::no-denormalized-id-in-push-item-into-database
:item item
})
(slingshot/try+
(esd/put db "facts-over-time" "deduplicaton" (str denormalized-id) item)
(println "We just put a document in ES.")
(catch Object o
(slingshot/throw+ {
:type ::push-item-to-persistence
:error o
:item item
:db db
}
))))))
如果我查看日志,没有错误,并且我一直看到打印出这一行:
We just put a document in ES.
现在已经过去了一个多小时,我们似乎仍然停留在 14,952 个文档上。
可能出了什么问题?为什么我没有看到错误?
使用 Elastisch 作为库将 Clojure 连接到 AWS ES。
更新
好的,现在至少我看到了这些异常。我不清楚他们是在哪里被抓到的。在我的代码中的任何地方,我都会重新抛出异常,因为我希望应用程序在第一个异常上死掉。这些是在某个地方被捕获的,可能在我正在使用的 Elastish 库中?或者我不小心抓到并记录在某个地方。
但这是一个有点微不足道的问题。更重要:
下一个问题是为什么我会得到这些例外。我在哪里调整 AWS ElasticSearch 以便它以合理的速度接受我们的写入。
Oct 04, 2017 6:53:44 PM org.apache.http.impl.client.DefaultHttpClient tryConnect
INFO: I/O exception (java.net.SocketException) caught when connecting to {s}->https://search-samedayes01-ntsdht7sfhy3wu.us-east-1.es.amazonaws.com:443: Broken pipe (Write failed)
Oct 04, 2017 7:09:06 PM org.apache.http.impl.client.DefaultHttpClient tryConnect
INFO: Retrying connect to {s}->https://search-samedayes01-ntsdht7sfhy3wu.us-east-1.es.amazonaws.com:443
Oct 04, 2017 6:54:13 PM org.apache.http.impl.client.DefaultHttpClient tryConnect
INFO: I/O exception (java.net.SocketException) caught when connecting to {s}->https://search-samedayes01-ntsdht7sfhy3wu.us-east-1.es.amazonaws.com:443: Broken pipe (Write failed)
Oct 04, 2017 7:09:09 PM org.apache.http.impl.client.DefaultHttpClient tryConnect
INFO: Retrying connect to {s}->https://search-samedayes01-ntsdht7sfhy3wu.us-east-1.es.amazonaws.com:443
更新 2
我又重新开始了。大约 920 个文档被成功放入 ElasticSearch。然后我得到:
:hostname "UnknownHost"
:type java.io.EOFException
:message "SSL peer shut down incorrectly"
什么?
此外,写入似乎非常缓慢。也许每秒 10 次操作。AWS 中一定有一些我可以调整的东西让我们的 ElasticSearch 节点接受更多的写入?我希望每秒至少有 1,000 次写入。
更新 3
所以现在我已经达到了这个应用程序大部分工作的地步,但它以我能想象的最奇怪的方式工作。
我收到一条“断管”消息,将我带到这里:
按照这个建议,我这样做了:
(System/setProperty "https.protocols" "TLSv1.1")
这似乎没有任何效果。
但现在我的应用程序这样做了:
- 以极快的速度移动,每秒可能向 ElasticSearch 写入 1 次。
- 引发“断管”异常。
- 像火箭一样起飞,开始每分钟向 ElasticSearch 写入大约 15,000 个请求。
我很高兴它终于起作用了,但我对我不知道它为什么起作用这一事实感到不舒服。
此外,每分钟 15,000 个请求实际上并没有那么快。移动 200 万个文档时,这需要 2 个多小时,这太可怕了。但是,Amazon 仅支持 ElasticSearch 的 REST 接口。我读过本机协议会快 8 倍。这听起来像我们需要的。