0

我正在尝试使用 Java Bulk API 在 ElasticSearch 中创建文档。我使用 JSON 文件作为批量输入。当我执行时,我得到以下异常:

Exception in thread "main" NoNodeAvailableException[None of the configured nodes are available: []]
at org.elasticsearch.client.transport.TransportClientNodesService.ensureNodesAreAvailable(TransportClientNodesService.java:280)
at org.elasticsearch.client.transport.TransportClientNodesService.execute(TransportClientNodesService.java:197)
at org.elasticsearch.client.transport.support.TransportProxyClient.execute(TransportProxyClient.java:55)
at org.elasticsearch.client.transport.TransportClient.doExecute(TransportClient.java:272)
at org.elasticsearch.client.support.AbstractClient.execute(AbstractClient.java:347)
at org.elasticsearch.action.ActionRequestBuilder.execute(ActionRequestBuilder.java:85)
at org.elasticsearch.action.ActionRequestBuilder.execute(ActionRequestBuilder.java:59)
at bulkApi.test.App.main(App.java:92)

以下是 Java 代码:

package bulkApi.test;

import java.io.BufferedReader;
import java.io.DataInputStream;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;

import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;


/**
 * Loads documents from a bulk-format JSON file into Elasticsearch using the
 * transport client's Bulk API.
 *
 * <p>Each input line that is not a bulk {@code index} action line is parsed as one
 * JSON document and queued for indexing into {@code testindex}/{@code testtype}.
 * A bulk request is sent every {@link #BATCH_SIZE} documents, and once more at the
 * end for any remainder.
 */
public class App
{
    /** Number of documents queued before a bulk request is sent. */
    private static final int BATCH_SIZE = 100;

    /**
     * Reads the input file line by line and indexes its documents in batches.
     *
     * @param args unused
     * @throws IOException if the input file cannot be opened or read
     * @throws ParseException if a document line is not valid JSON
     */
    public static void main( String[] args ) throws IOException, ParseException
    {
        System.out.println( "Hello World!" );

        // The cluster name configured here has to match the name the target cluster
        // was started with (cluster.name in elasticsearch.yml).
        Settings settings = Settings.settingsBuilder()
                .put("cluster.name", "test-cluster")
                .build();

        String hostname = "localhost";
        int port = 9300;
        String index = "testindex";
        String type = "testtype";
        String inputPath = "/home/nilesh/Shiney/Learn/elasticSearch/ES/test/parseAccount.json";

        TransportClient client = TransportClient.builder().settings(settings).build();
        try {
            client.addTransportAddress(
                    new InetSocketTransportAddress(InetAddress.getByName(hostname), port));

            BulkRequestBuilder bulkBuilder = client.prepareBulk();
            long bulkBuilderLength = 0;
            JSONParser parser = new JSONParser();

            // try-with-resources closes the reader even when reading or parsing fails.
            // The charset is given explicitly so the result does not depend on the
            // platform default encoding.
            try (BufferedReader br = new BufferedReader(new InputStreamReader(
                    new FileInputStream(inputPath), java.nio.charset.StandardCharsets.UTF_8))) {
                String readLine;
                while ((readLine = br.readLine()) != null) {
                    // Skip blank lines and the action/metadata lines of the bulk format;
                    // only the document lines are parsed.
                    if (readLine.trim().isEmpty() || readLine.startsWith("{\"index")) {
                        continue;
                    }

                    JSONObject jsonObject = (JSONObject) parser.parse(readLine);

                    // The id is derived per document. A document without
                    // "account_number" gets an id generated by Elasticsearch instead of
                    // silently reusing (and overwriting) the previous document's id.
                    Object accountNumber = jsonObject.get("account_number");
                    if (accountNumber != null) {
                        String id = String.valueOf(accountNumber);
                        System.out.println(id);
                        bulkBuilder.add(client.prepareIndex(index, type, id).setSource(jsonObject));
                    } else {
                        bulkBuilder.add(client.prepareIndex(index, type).setSource(jsonObject));
                    }
                    bulkBuilderLength++;

                    if (bulkBuilderLength % BATCH_SIZE == 0) {
                        try {
                            flush(bulkBuilder, bulkBuilderLength);
                            bulkBuilder = client.prepareBulk();
                        } catch (RuntimeException e) {
                            // Best effort: report the failure and keep the queued
                            // actions, so they are sent again with the next batch.
                            System.err.println("##### Bulk request could not be executed: " + e);
                            e.printStackTrace();
                        }
                    }
                }
            }

            // Send whatever is left over after the last full batch.
            if (bulkBuilder.numberOfActions() > 0) {
                flush(bulkBuilder, bulkBuilderLength);
            }
        } finally {
            // Always release the transport client, even when the run fails.
            client.close();
        }
    }

    /**
     * Executes the given bulk request, then prints the running document count and
     * any failure message reported in the response.
     *
     * @param bulkBuilder the bulk request holding the queued index actions
     * @param totalQueued number of documents queued so far, used in the progress message
     */
    private static void flush(BulkRequestBuilder bulkBuilder, long totalQueued)
    {
        BulkResponse bulkRes = bulkBuilder.execute().actionGet();
        System.out.println("##### " + totalQueued + " data indexed.");
        if (bulkRes.hasFailures()) {
            System.out.println("##### Bulk Request failure with error: " + bulkRes.buildFailureMessage());
        }
    }
}
4

1 回答 1

0

几件事要检查:

  1. 检查代码中设置的集群名称(cluster.name)是否与 elasticsearch.yml 中定义的名称一致
  2. 使用任意 REST 客户端(例如 Advanced REST Client)向 http://localhost:9200/testindex 发送 PUT 请求,以该索引名称(在您的例子中为“testindex”)创建索引(请求必须成功,即状态码为 200)

通过上述 PUT 请求成功创建索引之后,再运行您的程序。

于 2016-09-05T10:44:21.033 回答