3

我正在使用 ElasticSearch 的 JDBC 插件来更新我的 MySQL 数据库。它拾取新的和更改的记录,但不删除已从 MySQL 中删除的记录。它们保留在索引中。

这是我用来创建河流的代码:

curl -XPUT 'localhost:9200/_river/account_river/_meta' -d '{
    "type" : "jdbc",
    "jdbc" : {
        "driver" : "com.mysql.jdbc.Driver",
        "url" : "jdbc:mysql://localhost:3306/test",
        "user" : "test_user",
        "password" : "test_pass",
        "sql" : "SELECT `account`.`id` as `_id`, `account`.`id`, `account`.`reference`, `account`.`company_name`, `account`.`also_known_as` from `account` WHERE NOT `account`.`deleted`",
        "strategy" : "simple",
        "poll" : "5s",
        "versioning" : true,
        "digesting" : false,
        "autocommit" : true,
        "index" : "headphones",
        "type" : "Account"
    }
}'

通过 OSX Mountain Lion 上的自制软件安装 ElasticSearch,没有错误或问题,一切都按预期响应。权限正常,日志中没有错误。

我已经删除并包含(并设置为真和假)autocommit,versioningdigesting我能想到的每个组合。这是一个开发数据库,​​所以我确信记录被完全删除,没有缓存也没有软删除。如果我删除所有记录(即保持河流完好无损,只删除在 ES 上索引的内容),下次河流更新时它不会重新添加记录,这让我相信我错过了有关版本控制和删除的一些内容.

注意我还尝试了各种方法来指定_id列,并且我通过 JSON on call 检查了它是否具有值。

干杯。

4

2 回答 2

3

自从问了这个问题后,参数发生了很大的变化,版本控制和摘要已被弃用,并且 poll 已被 schedule 取代,这将采取 cron 表达式关于多久重新运行一次河(下面是计划每 5 分钟运行一次) )

    curl -XPUT 'localhost:9200/_river/account_river/_meta' -d '{
        "type" : "jdbc",
        "jdbc" : {
            "driver" : "com.mysql.jdbc.Driver",
            "url" : "jdbc:mysql://localhost:3306/test",
            "user" : "test_user",
            "password" : "test_pass",
            "sql" : "SELECT `account`.`id` as `_id`, `account`.`id`, `account`.`reference`, `account`.`company_name`, `account`.`also_known_as` from `account` WHERE NOT `account`.`deleted`",
            "strategy" : "simple",
            "schedule": "0 0/5 * * * ?" ,
            "autocommit" : true,
            "index" : "headphones",
            "type" : "Account"
        }
    }'

但是对于主要问题,我从开发人员那里得到的答案是 https://github.com/jprante/elasticsearch-river-jdbc/issues/213

不再检测到行的删除。

我尝试使用版本控制进行内务处理,但这与增量更新和添加行不能很好地配合使用。

一个好的方法是窗口索引。每个时间范围(可能每天或每周一次)为河流创建一个新索引,并添加到别名中。一段时间后将删除旧索引。这种维护类似于logstash索引,但它不在河流的范围内。

我目前用作研究别名的方法是每晚重新创建索引和河流,并安排河流每隔几个小时运行一次。它确保将在当天为输入的新数据编制索引,并且每 24 小时反映一次删除

于 2014-04-02T14:43:42.337 回答
0

我对弹性仍然相对较新,并且一直在我的项目中使用 jdbc River。如果我理解正确,不一定是这种情况,这就是它的工作原理:

  1. 从数据库中获取所有行(由 River 中的 SQL 语句指定)。
  2. 从所有获取的行的(id、type 和 index)计算摘要(如果添加了新行或删除了行,这应该改变)。
  3. 为所有行重新索引文档。这将自动增加每个文档的版本。
  4. 存储在 _river 索引中的河流的增量版本(自定义)
  5. 如果 #3 中计算的摘要与存储在 _river 索引中的摘要不同,则:
    • 存储它
    • 运行管家功能(删除所有版本号较低的文档)。

因此,考虑到您希望运行内务管理,您需要将版本控制设置为true,随后这意味着也digesting应该设置true为。

话虽如此,你的河流应该是这样的:

curl -XPUT 'localhost:9200/_river/account_river/_meta' -d '{
    "type" : "jdbc",
    "jdbc" : {
        "driver" : "com.mysql.jdbc.Driver",
        "url" : "jdbc:mysql://localhost:3306/test",
        "user" : "test_user",
        "password" : "test_pass",
        "sql" : "SELECT `account`.`id` as `_id`, `account`.`id`, `account`.`reference`, `account`.`company_name`, `account`.`also_known_as` from `account` WHERE NOT `account`.`deleted`",
        "strategy" : "simple",
        "poll" : "5s",
        "autocommit" : true,
        "index": {
          "index" : "headphones",
          "type" : "Account",
          "versioning" : true,
          "digesting" : true
        }
    }
}'

注意versioninganddigesting应该是index定义的一部分而不是jdbc定义

于 2014-01-21T17:55:51.920 回答