0

在通过 Kafka 和 Spark 处理 Avro 消息流时,我将处理后的数据保存为 ElasticSearch 索引中的文档。这是代码(简化):

    directKafkaStream.foreachRDD(rdd ->{

        rdd.foreach(avroRecord -> {
            byte[] encodedAvroData = avroRecord._2;
            MyType t = deserialize(encodedAvroData);

    // Creating the ElasticSearch Transport client
    Settings settings = Settings.builder()
            .put("client.transport.ping_timeout", 5, TimeUnit.SECONDS).build();
    TransportClient client = new PreBuiltTransportClient(settings)
            .addTransportAddress(new TransportAddress(InetAddress.getByName("localhost"), 9300));

    IndexRequest indexRequest = new IndexRequest("index", "item", id)
            .source(jsonBuilder()
                    .startObject()
                    .field("name", name)
                    .field("timestamp", new Timestamp(System.currentTimeMillis()))
                    .endObject());

    UpdateRequest updateRequest = new UpdateRequest("index", "item", id)
            .doc(jsonBuilder()
                    .startObject()
                    .field("name", name)
                    .field("timestamp", new Timestamp(System.currentTimeMillis()))
                    .endObject())
            .upsert(indexRequest);

    client.update(updateRequest).get();

    client.close();

一切都按预期工作;唯一的问题是性能:保存到 ES 需要一些时间,我想这是因为我为每个 RDD 打开/关闭了一个 ES 传输客户端。Spark文档表明这种方法是非常正确的:据我了解,唯一可能的优化是使用 rdd.foreachPartition,但我只有一个分区,所以我不确定这是否有益。还有其他解决方案可以实现更好的性能吗?

4

2 回答 2

0

我会将处理后的消息流式传输回一个单独的 Kafka 主题,然后使用 Kafka Connect 将它们发送到Elasticsearch。这将您的 Spark 特定处理与将数据导入 Elasticsearch 分离。

实际操作示例:https ://www.confluent.io/blog/blogthe-simplest-useful-kafka-connect-data-pipeline-in-the-world-or-thereabouts-part-2/

于 2017-11-28T11:31:38.827 回答
0

因为每当处理 RDD 记录时,您都会创建新的连接。所以,我认为foreachPartition不管只有一个分区, use 都会有更好的性能,因为它可以帮助你把你的 ES 连接实例带到外面,在循环中重用它。

于 2017-11-28T10:55:38.823 回答