2

我正在使用 Logstash 将数据从 mysql 迁移到 elasticsearch。我的 mysql 数据库有一个名为 product 的主表,它有很多关系,要选择的查询包含大约 46 个左外连接,并且返回的结果对于一条记录来说是非常大的(50k)行。所以我打算把查询分成多个选择。我使用了 Logstash 的jdbc_streaming插件。但是,我想知道我的解决方案是否合乎逻辑且正确?

这是描述我的实现的简单配置文件(不是所有关系):

input {
    jdbc {
       jdbc_driver_library => "mysql-connector-java-5.1.47-bin.jar"
       jdbc_driver_class => "com.mysql.jdbc.Driver"
       jdbc_connection_string => "jdbc:mysql://localhost:3306/my-conn?useSSL=false&allowPublicKeyRetrieval=true"
       jdbc_user => "root"
       jdbc_password => "root"
       schedule => "* * * * *"
       #jdbc_paging_enabled => true
       #jdbc_page_size => 10000
       #jdbc_fetch_size => 5
       statement => "select product_id from product"
       clean_run => false
    }
 }
filter {

   jdbc_streaming {
      jdbc_driver_library => "mysql-connector-java-5.1.47-bin.jar"
      jdbc_driver_class => "com.mysql.jdbc.Driver"
      jdbc_connection_string => "jdbc:mysql://localhost:3306/my-conn?useSSL=false&allowPublicKeyRetrieval=true"
      jdbc_user => "root"
      jdbc_password => "root"
      statement => "select * from product_translation where product_id = :id"
      parameters => { "id" =>"product_id"}
      target => "productTranslations"
}
aggregate {
     task_id => "%{product_id}"
     code => "
        require 'C:\elasticsearch\replication\product-replication\demo.rb' ; 
        ProductMapping.startAggregate(event, map)
     "
     push_previous_map_as_event => true
     timeout => 5
     timeout_tags => ["aggregate"]
 }

 if "aggregate" not in [tags] {
    drop{}
 }
}
output {
elasticsearch { 
    hosts => ["localhost:9200"]
    document_id => "%{productId}"
    document_type => "product"
    index => "test-products"
}
stdout { codec => rubydebug }
}
4

0 回答 0