This is my first time posting here, so I apologize if my post is poorly written, and sorry for my English.

I'm trying to configure Apache Flume with the Elasticsearch sink. Everything seems to run fine, but I get 2 warnings when I start the agent; here they are:

2015-11-16 09:11:22,122 (lifecycleSupervisor-1-3) [ERROR - org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:253)] Unable to start SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@ce359aa counterGroup:{ name:null counters:{} } } - Exception follows.
java.lang.NoSuchMethodError: org.elasticsearch.common.transport.InetSocketTransportAddress.<init>(Ljava/lang/String;I)V
    at org.apache.flume.sink.elasticsearch.client.ElasticSearchTransportClient.configureHostnames(ElasticSearchTransportClient.java:143)
    at org.apache.flume.sink.elasticsearch.client.ElasticSearchTransportClient.<init>(ElasticSearchTransportClient.java:77)
    at org.apache.flume.sink.elasticsearch.client.ElasticSearchClientFactory.getClient(ElasticSearchClientFactory.java:48)
    at org.apache.flume.sink.elasticsearch.ElasticSearchSink.start(ElasticSearchSink.java:357)
    at org.apache.flume.sink.DefaultSinkProcessor.start(DefaultSinkProcessor.java:46)
    at org.apache.flume.SinkRunner.start(SinkRunner.java:79)
    at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:251)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
2015-11-16 09:11:22,137 (lifecycleSupervisor-1-3) [WARN - org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:260)] Component SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@ce359aa counterGroup:{ name:null counters:{} } } stopped, since it could not besuccessfully started due to missing dependencies

My agent configuration:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink ES
a1.sinks = k1
a1.sinks.k1.type = elasticsearch
a1.sinks.k1.hostNames = 127.0.0.1:9200,127.0.0.2:9300
a1.sinks.k1.indexName = items
a1.sinks.k1.indexType = item
a1.sinks.k1.clusterName = elasticsearch
a1.sinks.k1.batchSize = 500
a1.sinks.k1.ttl = 5d
a1.sinks.k1.serializer=org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer
a1.sinks.k1.channel = c1

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

It starts netcat and everything looks fine, but I'm worried about these warnings, which I don't understand.

3 Answers

I found the cause: Apache Flume 1.6.0 and Elasticsearch 2.0 do not appear to be compatible with each other.
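The stack trace in the question points the same way: the sink calls an `InetSocketTransportAddress(String, int)` constructor, and the `NoSuchMethodError` means the Elasticsearch jar on Flume's classpath does not provide it. As a rough check, assuming a JDK with `javap` is installed, something like the following could show whether a given jar still offers that constructor. The function name is made up for illustration, and the jar path in the usage comment is only an example.

```shell
# Hypothetical helper: succeeds if the Elasticsearch jar passed as $1 declares
# a public InetSocketTransportAddress(String, int) constructor, the one the
# Flume sink in the stack trace tries to call.
has_string_int_ctor() {
  javap -cp "$1" org.elasticsearch.common.transport.InetSocketTransportAddress 2>/dev/null \
    | grep -qF '(java.lang.String, int)'
}

# Example usage (the path is an assumption, adjust it to your install):
# has_string_int_ctor lib/elasticsearch-2.0.0.jar || echo "constructor not found"
```

If the check fails for the jar in Flume's lib directory, the bundled sink cannot start against that client version, whatever the agent configuration says.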

I found a good third-party sink, which I modified.

Link here

This is my final configuration:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink ES
a1.sinks = k1
a1.sinks.k1.type = com.frontier45.flume.sink.elasticsearch2.ElasticSearchSink
a1.sinks.k1.hostNames = 127.0.0.1:9300
a1.sinks.k1.indexName = items
a1.sinks.k1.indexType = item
a1.sinks.k1.clusterName = elasticsearch
a1.sinks.k1.batchSize = 500
a1.sinks.k1.ttl = 5d
a1.sinks.k1.serializer = com.frontier45.flume.sink.elasticsearch2.ElasticSearchDynamicSerializer
a1.sinks.k1.indexNameBuilder = com.frontier45.flume.sink.elasticsearch2.TimeBasedIndexNameBuilder
a1.sinks.k1.channel = c1

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

This works for me.

Thanks for your answers.

P.S. Yes, I had to move the libraries.

Answered 2015-11-17T15:45:30.673

Judging by the log, the problem is a missing dependency.

If you look at the ElasticSearchSink documentation, you will see the following:

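The elasticsearch and lucene-core jars required for your environment must be placed in the lib directory of the Apache Flume installation. Elasticsearch requires that the major version of the client JAR match that of the server and that both are running the same minor version of the JVM. SerializationExceptions will appear if this is incorrect. To select the required version, first determine the version of elasticsearch and the JVM version the target cluster is running. Then select an elasticsearch client library which matches the major version. A 0.19.x client can talk to a 0.19.x cluster; 0.20.x can talk to 0.20.x and 0.90.x can talk to 0.90.x. Once the elasticsearch version has been determined, read the pom.xml file to determine the correct lucene-core JAR version to use. The Flume agent which is running the ElasticSearchSink should also match the JVM the target cluster is running down to the minor version.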

Most likely you have not put the required Java jars in place, or their versions are not the right ones.
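To compare the client jar against the cluster version, a small shell sketch along these lines might help. It assumes jars follow the usual `name-<version>.jar` naming, and both function names are invented for this example. It compares only the first version number, which is too coarse for the 0.x releases named in the documentation, where 0.19, 0.20 and 0.90 are treated as distinct.

```shell
# Hypothetical helper: print the leading version number encoded in a jar
# file name such as elasticsearch-1.7.1.jar (assumes name-<version>.jar).
jar_major_version() {
  local name version
  name=$(basename "$1" .jar)
  version=${name##*-}      # drop everything up to the last '-'
  echo "${version%%.*}"    # keep the part before the first '.'
}

# Hypothetical helper: succeeds only when the client jar ($1) and the server
# version string ($2, e.g. 1.7.3) start with the same number.
same_major() {
  [ "$(jar_major_version "$1")" = "${2%%.*}" ]
}

# Example usage (server version typed in by hand):
# same_major lib/elasticsearch-1.7.1.jar 2.0.0 || echo "client/server mismatch"
```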

Answered 2015-11-17T14:28:18.903

I added only the following 2 JARs to the flume/lib directory and it worked; there was no need to add all the other Lucene JARs:

elasticsearch-1.7.1.jar

lucene-core-4.10.4.jar
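A quick way to confirm that both jar families are present before starting the agent could look like the sketch below. The function name is hypothetical, and it only checks file names in the directory you pass in, not whether the versions suit your cluster.

```shell
# Hypothetical helper: report which of the two jar families are absent from
# the lib directory given as $1. Returns non-zero if anything is missing.
missing_es_jars() {
  local lib_dir=$1 prefix missing=0
  for prefix in elasticsearch lucene-core; do
    # ls fails when the glob matches no file
    if ! ls "$lib_dir"/"$prefix"-*.jar >/dev/null 2>&1; then
      echo "missing: $prefix-*.jar"
      missing=1
    fi
  done
  return "$missing"
}

# Example usage (directory is an assumption):
# missing_es_jars /usr/flume/flume1.6/apache-flume-1.6.0-bin/lib
```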

Command to start Flume:

bin/flume-ng agent --conf conf --conf-file conf/flume-aggregator.conf --name agent2 -Dflume.root.logger=INFO,console

Make sure to add the following to flume-env.sh:

export JAVA_HOME=/usr/java/default

export JAVA_OPTS="-Xms3072m -Xmx3072m -XX:MaxPermSize=48m -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=5445 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false"

FLUME_CLASSPATH="/usr/flume/flume1.6/apache-flume-1.6.0-bin/:/usr/flume/flume1.6/apache-flume-1.6.0-bin/lib"

Flume aggregator configuration for loading data into ES: flume-aggregator.conf

agent2.sources = source1
agent2.sinks = sink1
agent2.channels = channel1

################################################
# Describe Source
################################################

# Source Avro
agent2.sources.source1.type = avro
agent2.sources.source1.bind = 0.0.0.0 
agent2.sources.source1.port = 9997

################################################
# Describe Interceptors
################################################
# an example of nginx access log regex match
# agent2.sources.source1.interceptors = interceptor1
# agent2.sources.source1.interceptors.interceptor1.type = regex_extractor
# 
# agent2.sources.source1.interceptors.interceptor1.regex = "^(\\S+) \\[(.*?)\\] \"(.*?)\" (\\S+) (\\S+)( \"(.*?)\" \"(.*?)\")?"
# 
# # agent2.sources.source1.interceptors.interceptor1.regex = ^(.*) ([a-zA-Z\\.\\@\\-\\+_%]+) ([a-zA-Z\\.\\@\\-\\+_%]+) \\[(.*)\\] \\"(POST|GET) ([A-Za-z0-9\\$\\.\\+\\@#%_\\/\\-]*)\\??(.*) (.*)\\" ([a-zA-Z0-9\\.\\/\\s\-]*) (.*) ([0-9]+) ([0-9]+) ([0-9\\.]+)
# # agent2.sources.source1.interceptors.interceptor1.serializers = s1 s2 s3 s4 s5 s6 s7 s8 s9 s10 s11 s12 s13
# 
# agent2.sources.source1.interceptors.interceptor1.serializers = s1 s2 s3 s4 s5 s6 s7 s8
# agent2.sources.source1.interceptors.interceptor1.serializers.s1.name = clientip
# agent2.sources.source1.interceptors.interceptor1.serializers.s2.name = datetime
# agent2.sources.source1.interceptors.interceptor1.serializers.s3.name = method
# agent2.sources.source1.interceptors.interceptor1.serializers.s4.name = request
# agent2.sources.source1.interceptors.interceptor1.serializers.s5.name = response
# agent2.sources.source1.interceptors.interceptor1.serializers.s6.name = status
# agent2.sources.source1.interceptors.interceptor1.serializers.s7.name = bytes
# agent2.sources.source1.interceptors.interceptor1.serializers.s8.name = requesttime
#  

################################################
# Describe Sink
################################################

# Sink ElasticSearch
# Elasticsearch lib ---> flume/lib
# elasticsearch/config/elasticsearch.yml cluster.name clusterName. data/clustername data.
agent2.sinks.sink1.type = org.apache.flume.sink.elasticsearch.ElasticSearchSink
agent2.sinks.sink1.hostNames = 10.20.156.16:9300,10.20.176.20:9300
agent2.sinks.sink1.indexName = pdi
agent2.sinks.sink1.indexType = pdi_metrics
agent2.sinks.sink1.clusterName = My-ES-CLUSTER
agent2.sinks.sink1.batchSize = 1000
agent2.sinks.sink1.ttl = 2
#this serializer is crucial in order to use kibana
agent2.sinks.sink1.serializer = org.apache.flume.sink.elasticsearch.ElasticSearchLogStashEventSerializer



################################################
# Describe Channel
################################################

# Channel Memory
agent2.channels.channel1.type = memory
agent2.channels.channel1.capacity = 10000000
agent2.channels.channel1.transactionCapacity = 1000

################################################
# Bind the source and sink to the channel
################################################

agent2.sources.source1.channels = channel1
agent2.sinks.sink1.channel = channel1
Answered 2017-03-31T22:12:03.617