0

我正在玩 flink+ElasticSearch 5 Sink,使用 x-pack 身份验证。

我首先收到此错误 Flink Xpack ElasticSearch 5 ElasticsearchSecurityException missing autentication

所以我修复了它覆盖 ES Sink 函数。

我现在的问题是当我尝试在 flink 上运行作业(使用 jar)时出现此错误。

Caused by: java.lang.NoSuchMethodError: io.netty.buffer.CompositeByteBuf.addComponents(ZLjava/lang/Iterable;)Lio/netty/buffer/CompositeByteBuf;
    at org.elasticsearch.transport.netty4.Netty4Utils.toByteBuf(Netty4Utils.java:78)
    at org.elasticsearch.transport.netty4.Netty4Transport.sendMessage(Netty4Transport.java:422)
    at org.elasticsearch.transport.netty4.Netty4Transport.sendMessage(Netty4Transport.java:93)
    at org.elasticsearch.transport.TcpTransport.internalSendMessage(TcpTransport.java:1058)
    at org.elasticsearch.transport.TcpTransport.sendRequestToChannel(TcpTransport.java:1040)
    at org.elasticsearch.transport.TcpTransport.executeHandshake(TcpTransport.java:1555)
    at org.elasticsearch.transport.TcpTransport.openConnection(TcpTransport.java:502)
    at org.elasticsearch.transport.TcpTransport.connectToNode(TcpTransport.java:460)
    at org.elasticsearch.transport.TransportService.connectToNode(TransportService.java:318)
    at org.elasticsearch.client.transport.TransportClientNodesService$SimpleNodeSampler.doSample(TransportClientNodesService.java:408)
    at org.elasticsearch.client.transport.TransportClientNodesService$NodeSampler.sample(TransportClientNodesService.java:354)
    at org.elasticsearch.client.transport.TransportClientNodesService.addTransportAddresses(TransportClientNodesService.java:195)
    at org.elasticsearch.client.transport.TransportClient.addTransportAddress(TransportClient.java:312)
    at com.ceptinel.flink.sink.Elasticsearch5ApiCallBridge.createClient(Elasticsearch5ApiCallBridge.java:45)
    at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.open(ElasticsearchSinkBase.java:272)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:112)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:375)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:251)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:670)
    at java.lang.Thread.run(Thread.java:745)

看起来 flink 和 ES 客户端之间的库 io.netty 存在冲突(不确定是 x-pack-transport 还是连接器本身)

有什么办法可以避免这种冲突?

谢谢路易斯

4

2 回答 2

1

我使用的是 gradle 而不是 maven,但过程或多或少是相同的。

万一,如果您仍然有这个问题(这极不可能),我已经尝试解决并且该解决方案似乎有效。

这是我的依赖项块:

dependencies {
    ....
    compile(group: 'org.apache.flink', name: 'flink-streaming-java_2.10', version: project.flinkStreamJavaVersion)
            {
                exclude group: 'io.netty'
            }
    compile group: 'org.apache.flink', name: 'flink-connector-kafka-0.10_2.10', version: project.flinkKafkaConnectorVersion
    compile group: 'org.apache.flink', name: 'flink-connector-elasticsearch5_2.10', version: project.flinkElasticConnectorVersion

    ....
}

然后使用以下方法将 io.netty 着色到不同的包中:

shadowJar {
    ....
    relocate 'io.netty', 'shaded.io.netty'
    ....
}

注意:问题是排除 netty 来自

flink-streaming-java_2.10

如果您尝试在不排除来自 flink-streaming 的网络的情况下进行遮蔽,它不会改变任何东西。所以排除来自 flink-streaming lib 的 netty 是非常重要的。

于 2017-06-28T07:44:57.833 回答
1

在 Apache Flink 中解决此问题之前(通过隐藏 netty 依赖项),我建议您将用户 jar 中的 netty 隐藏到不同的命名空间中。

如果您正在使用 Apache Maven 构建项目,则可以使用maven-shade-plugin来完成。另请查看有关 Flink 中阴影的文档页面:https ://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/debugging_classloading.html#resolving-dependency-conflicts-with-flink-using- maven-shade-插件

于 2017-03-23T11:21:57.517 回答