I am using Logstash to migrate data from MySQL to Elasticsearch. My MySQL database has a main table named product with many relations; the SELECT that fetches everything contains about 46 left outer joins and returns a very large result set per record (up to 50k rows). So I plan to split that query into multiple smaller SELECTs, using Logstash's jdbc_streaming filter plugin. However, I would like to know whether my solution is logical and correct.
Here is a simple configuration file describing my implementation (not all relations are included):
input {
  jdbc {
    jdbc_driver_library => "mysql-connector-java-5.1.47-bin.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/my-conn?useSSL=false&allowPublicKeyRetrieval=true"
    jdbc_user => "root"
    jdbc_password => "root"
    schedule => "* * * * *"
    #jdbc_paging_enabled => true
    #jdbc_page_size => 10000
    #jdbc_fetch_size => 5
    # parent query: fetch only the ids; relations are resolved in the filter stage
    statement => "select product_id from product"
    clean_run => false
  }
}
filter {
  jdbc_streaming {
    jdbc_driver_library => "mysql-connector-java-5.1.47-bin.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/my-conn?useSSL=false&allowPublicKeyRetrieval=true"
    jdbc_user => "root"
    jdbc_password => "root"
    # one small lookup per relation instead of one huge join
    statement => "select * from product_translation where product_id = :id"
    parameters => { "id" => "product_id" }
    target => "productTranslations"
  }
  aggregate {
    task_id => "%{product_id}"
    # forward slashes avoid escape issues in the quoted Ruby string
    code => "
      require 'C:/elasticsearch/replication/product-replication/demo.rb'
      ProductMapping.startAggregate(event, map)
    "
    push_previous_map_as_event => true
    timeout => 5
    timeout_tags => ["aggregate"]
  }
  # keep only the aggregated events; drop the raw per-id events
  if "aggregate" not in [tags] {
    drop {}
  }
}
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    document_id => "%{productId}"
    document_type => "product"
    index => "test-products"
  }
  stdout { codec => rubydebug }
}
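
The demo.rb required in the aggregate block is not shown above. As a rough, simplified sketch (only the product_id key and the productTranslations relation from this config are covered; the real file handles all relations), ProductMapping.startAggregate accumulates the streamed rows into the aggregate map along these lines:

# demo.rb - simplified illustration of the aggregation helper
class ProductMapping
  def self.startAggregate(event, map)
    # remember the parent key once per task; the elasticsearch output
    # later uses it as document_id (%{productId})
    map['productId'] ||= event.get('product_id')

    # append the rows that jdbc_streaming attached to this event
    map['productTranslations'] ||= []
    map['productTranslations'] |= (event.get('productTranslations') || [])
  end
end

With push_previous_map_as_event => true, the aggregate filter emits the accumulated map as a new event (tagged with "aggregate" via timeout_tags) as soon as the next product_id arrives or the timeout expires; the raw per-row events never carry that tag, which is why the conditional drop above keeps only the aggregated documents.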