我正在玩 flink+ElasticSearch 5 Sink,使用 x-pack 身份验证。
我首先收到此错误 Flink Xpack ElasticSearch 5 ElasticsearchSecurityException missing autentication
所以我修复了它覆盖 ES Sink 函数。
我现在的问题是当我尝试在 flink 上运行作业(使用 jar)时出现此错误。
Caused by: java.lang.NoSuchMethodError: io.netty.buffer.CompositeByteBuf.addComponents(ZLjava/lang/Iterable;)Lio/netty/buffer/CompositeByteBuf;
at org.elasticsearch.transport.netty4.Netty4Utils.toByteBuf(Netty4Utils.java:78)
at org.elasticsearch.transport.netty4.Netty4Transport.sendMessage(Netty4Transport.java:422)
at org.elasticsearch.transport.netty4.Netty4Transport.sendMessage(Netty4Transport.java:93)
at org.elasticsearch.transport.TcpTransport.internalSendMessage(TcpTransport.java:1058)
at org.elasticsearch.transport.TcpTransport.sendRequestToChannel(TcpTransport.java:1040)
at org.elasticsearch.transport.TcpTransport.executeHandshake(TcpTransport.java:1555)
at org.elasticsearch.transport.TcpTransport.openConnection(TcpTransport.java:502)
at org.elasticsearch.transport.TcpTransport.connectToNode(TcpTransport.java:460)
at org.elasticsearch.transport.TransportService.connectToNode(TransportService.java:318)
at org.elasticsearch.client.transport.TransportClientNodesService$SimpleNodeSampler.doSample(TransportClientNodesService.java:408)
at org.elasticsearch.client.transport.TransportClientNodesService$NodeSampler.sample(TransportClientNodesService.java:354)
at org.elasticsearch.client.transport.TransportClientNodesService.addTransportAddresses(TransportClientNodesService.java:195)
at org.elasticsearch.client.transport.TransportClient.addTransportAddress(TransportClient.java:312)
at com.ceptinel.flink.sink.Elasticsearch5ApiCallBridge.createClient(Elasticsearch5ApiCallBridge.java:45)
at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.open(ElasticsearchSinkBase.java:272)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:112)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:375)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:251)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:670)
at java.lang.Thread.run(Thread.java:745)
看起来 flink 和 ES 客户端之间的库 io.netty 存在冲突(不确定是 x-pack-transport 还是连接器本身)
有什么办法可以避免这种冲突?
谢谢路易斯