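I am currently using jdbc-river to read data from a SQL Server database. So far I have created a separate type for each table in the database. As the next step, I would like to use parent/child types so that I can carry over the relationships between my SQL tables and store them.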
Table1
Col_id | name | prop1 | prop2 | prop3
child_table1
col_id | table_id | child_prop1 | child_prop2 | child_prop3
curl -XPUT 'localhost:9200/_river/parent/_meta' -d '{
"type" : "jdbc",
"jdbc" : {
"driver" : "com.mysql.jdbc.Driver",
"url" : "jdbc:mysql://localhost:3306/test",
"user" : "",
"password" : "",
"sql" : "select * from table1",
"index" : "index1",
"type" : "parent"
}
}'
curl -XPUT 'localhost:9200/_river/child/_meta' -d '{
"type" : "jdbc",
"jdbc" : {
"driver" : "com.mysql.jdbc.Driver",
"url" : "jdbc:mysql://localhost:3306/test",
"user" : "",
"password" : "",
"sql" : "select * from child_table1",
"index" : "index1",
"type" : "child"
}
}'
curl -XPOST 'localhost:9200/_river/child/_mapping' -d '{
"child":{
"_parent": {"type": "parent"}
}
}'
I want to store my data in the following format:
{
"id": "1",
"name": "A leading wordsmith",
"prop1": "data",
"prop2": "data",
"prop3": "data",
"child": [
{
"child_prop1": "data",
"child_prop2": "data",
"child_prop3": "data"
},
{
"child_prop1": "data1",
"child_prop2": "data1",
"child_prop3": "data1"
}
]
}
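Can anyone comment on how I can use jdbc-river to store my data as parent/child types for the scenario above?
Update: Based on the feedback below, here are the updated mapping and river metadata.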
curl -XPOST 'http://localhost:9200/library' -d '{
"settings": {
"number_of_shards": 1,
"number_of_replicas": 0
},
"mappings": {
"person": {
"properties": {
"person_id": {
"type": "integer"
},
"name": {
"type": "string"
}
}
},
"work": {
"_parent": {
"type": "person"
},
"properties": {
"person_id": {
"type": "integer",
"index": "not_analyzed"
},
"name": {
"type": "string"
},
"genre": {
"type": "string"
},
"publisher": {
"type": "string"
}
}
}
}
}'
curl -XPUT localhost:9200/_river/person/_meta -d '{
"type": "jdbc",
"jdbc": {
"driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver",
"url": "jdbc:sqlserver://127.0.0.1:1433;databaseName=blogcontext",
"user": "sa",
"password": "password",
"sql": "select person_id as _id, name from person",
"poll": "30s"
},
"index": {
"index": "library",
"type": "person",
"bulk_size": 500,
"autocommit": true
}
}'
curl -XPUT localhost:9200/_river/work/_meta -d '{
"type": "jdbc",
"jdbc": {
"driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver",
"url": "jdbc:sqlserver://127.0.0.1:1433;databaseName=blogcontext",
"user": "sa",
"password": "password",
"sql": "select person_id as _parent,name,genre,publisher from work",
"poll": "30s"
},
"index": {
"index": "library",
"type": "work",
"bulk_size": 500,
"autocommit": true
}
}'
Log file:
[2014-01-14 07:10:35,488][ERROR][OneShotRiverMouth ] bulk [1] error
org.elasticsearch.ElasticSearchIllegalArgumentException: Can't specify parent if no parent field has been configured
at org.elasticsearch.action.index.IndexRequest.process(IndexRequest.java:597)
at org.elasticsearch.action.bulk.TransportBulkAction.executeBulk(TransportBulkAction.java:165)
at org.elasticsearch.action.bulk.TransportBulkAction.doExecute(TransportBulkAction.java:140)
at org.elasticsearch.action.bulk.TransportBulkAction.doExecute(TransportBulkAction.java:63)
at org.elasticsearch.action.support.TransportAction.execute(TransportAction.java:63)
at org.elasticsearch.client.node.NodeClient.execute(NodeClient.java:92)
at org.elasticsearch.client.support.AbstractClient.bulk(AbstractClient.java:149)
at org.elasticsearch.action.bulk.BulkProcessor.execute(BulkProcessor.java:283)
at org.elasticsearch.action.bulk.BulkProcessor.access$400(BulkProcessor.java:46)
at org.elasticsearch.action.bulk.BulkProcessor$Flush.run(BulkProcessor.java:336)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:351)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:178)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:724)
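Since the error complains that no parent field has been configured, one sanity check is to look at the mapping that is actually in effect for the child type. The snippet below is only a sketch: it assumes the `library` index and `work` type from the update above and a node on `localhost:9200`, and the variable names are just for illustration. If `_parent` does not show up in the output, the mapping from the index-creation request was presumably not applied to the type the river writes to.

```shell
# Assumed values, taken from the configuration shown above.
ES_HOST='localhost:9200'
INDEX='library'
TYPE='work'

# Build the URL of the get-mapping endpoint for the child type.
MAPPING_URL="http://${ES_HOST}/${INDEX}/${TYPE}/_mapping?pretty"

# Print the mapping; the output should contain a "_parent" entry
# pointing at the "person" type if the mapping was applied.
curl -s -XGET "$MAPPING_URL" || echo "Elasticsearch not reachable at ${ES_HOST}"
```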
Thanks