在通过 Kafka 和 Spark 处理 Avro 消息流时,我将处理后的数据保存为 ElasticSearch 索引中的文档。这是代码(简化):
directKafkaStream.foreachRDD(rdd ->{
rdd.foreach(avroRecord -> {
byte[] encodedAvroData = avroRecord._2;
MyType t = deserialize(encodedAvroData);
// Creating the ElasticSearch Transport client
Settings settings = Settings.builder()
.put("client.transport.ping_timeout", 5, TimeUnit.SECONDS).build();
TransportClient client = new PreBuiltTransportClient(settings)
.addTransportAddress(new TransportAddress(InetAddress.getByName("localhost"), 9300));
IndexRequest indexRequest = new IndexRequest("index", "item", id)
.source(jsonBuilder()
.startObject()
.field("name", name)
.field("timestamp", new Timestamp(System.currentTimeMillis()))
.endObject());
UpdateRequest updateRequest = new UpdateRequest("index", "item", id)
.doc(jsonBuilder()
.startObject()
.field("name", name)
.field("timestamp", new Timestamp(System.currentTimeMillis()))
.endObject())
.upsert(indexRequest);
client.update(updateRequest).get();
client.close();
一切都按预期工作;唯一的问题是性能:保存到 ES 需要一些时间,我想这是因为我为每个 RDD 打开/关闭了一个 ES 传输客户端。Spark文档表明这种方法是非常正确的:据我了解,唯一可能的优化是使用 rdd.foreachPartition,但我只有一个分区,所以我不确定这是否有益。还有其他解决方案可以实现更好的性能吗?