I have installed Flume and am trying to stream Twitter data into an HDFS folder.

My flume.conf file looks like this:

TwitterAgent.sources = Twitter
TwitterAgent.channels = MemChannel
TwitterAgent.sinks = HDFS
TwitterAgent.sources.Twitter.type = com.cloudera.flume.source.TwitterSource
TwitterAgent.sources.Twitter.channels = MemChannel
TwitterAgent.sources.Twitter.consumerKey = <required>
TwitterAgent.sources.Twitter.consumerSecret = <required>
TwitterAgent.sources.Twitter.accessToken = <required>
TwitterAgent.sources.Twitter.accessTokenSecret = <required>
TwitterAgent.sources.Twitter.keywords = hadoop, big data, china, india
TwitterAgent.sinks.HDFS.channel = MemChannel
TwitterAgent.sinks.HDFS.type = hdfs
TwitterAgent.sinks.HDFS.hdfs.path = hdfs://localhost:9000/user/flume/tweets/%Y/%m/%d/%H/
TwitterAgent.sinks.HDFS.hdfs.fileType = DataStream
TwitterAgent.sinks.HDFS.hdfs.writeFormat = Text
TwitterAgent.sinks.HDFS.hdfs.batchSize = 1000
TwitterAgent.sinks.HDFS.hdfs.rollSize = 0
TwitterAgent.sinks.HDFS.hdfs.rollCount = 10000
TwitterAgent.sinks.HDFS.hdfs.rollInterval = 600
TwitterAgent.channels.MemChannel.type = memory
TwitterAgent.channels.MemChannel.capacity = 10000
TwitterAgent.channels.MemChannel.transactionCapacity = 100

I am getting the following error:

2014-11-03 02:00:49,834 (Twitter Stream consumer-1[Establishing connection]) [DEBUG -  twitter4j.internal.logging.SLF4JLogger.debug(SLF4JLogger.java:67)] User-Agent: twitter4j http://twitter4j.org/ /2.2.6
2014-11-03 02:00:49,834 (Twitter Stream consumer-1[Establishing connection]) [DEBUG - twitter4j.internal.logging.SLF4JLogger.debug(SLF4JLogger.java:67)] Connection: close
2014-11-03 02:00:49,835 (Twitter Stream consumer-1[Establishing connection]) [DEBUG - twitter4j.internal.logging.SLF4JLogger.debug(SLF4JLogger.java:67)] X-Twitter-Client-Version: 2.2.6
2014-11-03 02:00:49,835 (Twitter Stream consumer-1[Establishing connection]) [DEBUG - twitter4j.internal.logging.SLF4JLogger.debug(SLF4JLogger.java:67)] X-Twitter-Client-URL: http://twitter4j.org/en/twitter4j-2.2.6.xml
2014-11-03 02:00:49,836 (Twitter Stream consumer-1[Establishing connection]) [DEBUG - twitter4j.internal.logging.SLF4JLogger.debug(SLF4JLogger.java:67)] Accept-Encoding: gzip
2014-11-03 02:00:49,836 (Twitter Stream consumer-1[Establishing connection]) [DEBUG - twitter4j.internal.logging.SLF4JLogger.debug(SLF4JLogger.java:67)] X-Twitter-Client: Twitter4J
2014-11-03 02:00:49,837 (Twitter Stream consumer-1[Establishing connection]) [DEBUG - twitter4j.internal.logging.SLF4JLogger.debug(SLF4JLogger.java:75)] Post Params: count=0&track=hadoop%2Cbig%20data%2Canalytics%2Cbigdata%2Ccloudera%2Cdata%20science&include_entities=true
2014-11-03 02:00:49,843 (Twitter Stream consumer-1[Establishing connection]) [INFO - twitter4j.internal.logging.SLF4JLogger.info(SLF4JLogger.java:83)] Connection refused
2014-11-03 02:00:49,843 (Twitter Stream consumer-1[Establishing connection]) [INFO - twitter4j.internal.logging.SLF4JLogger.info(SLF4JLogger.java:83)] Waiting for 2000 milliseconds
2014-11-03 02:00:49,843 (Twitter Stream consumer-1[Waiting for 2000 milliseconds]) [DEBUG - twitter4j.internal.logging.SLF4JLogger.debug(SLF4JLogger.java:67)] Twitter Stream consumer-1[Waiting for 2000 milliseconds]
2014-11-03 02:00:51,843 (Twitter Stream consumer-1[Waiting for 2000 milliseconds]) [DEBUG - twitter4j.internal.logging.SLF4JLogger.debug(SLF4JLogger.java:67)] Connection refused
2014-11-03 02:00:51,844 (Twitter Stream consumer-1[Waiting for 2000 milliseconds]) [INFO - twitter4j.internal.logging.SLF4JLogger.info(SLF4JLogger.java:83)] Establishing connection.

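My university network sits behind a proxy server, and I think the proxy is what causes the refused connections.

How can I use Flume with a proxy?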

1 Answer

Build the jar from https://github.com/cloudera/cdh-twitter-example

Download and unzip it, then make the following change inside the extracted folder:

Go to /cdh-twitter-example-master/flume-sources/src/main/java/com/cloudera/flume/source/TwitterSource.java

and add these lines:

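// cb is the twitter4j ConfigurationBuilder that TwitterSource.java already creates
cb.setHttpProxyHost("your proxy");  // proxy host name or IP address
cb.setHttpProxyPort(8080);          // proxy port
cb.setHttpProxyUser("");            // proxy user name, if the proxy needs authentication
cb.setHttpProxyPassword("");        // proxy password, if the proxy needs authentication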

$ cd flume-sources

$ mvn package

Then copy the jar from the target directory into Flume's lib folder. Enjoy.
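
If you would rather not hardcode the proxy in the source, one option is to keep it in flume.conf next to the other Twitter settings. The following is only a rough sketch of the parsing side, using nothing but the JDK. The keys proxyHost and proxyPort, the class name ProxyConf, the host proxy.example.edu and the default port 8080 are all assumptions made for illustration; the stock TwitterSource does not read any such keys, so you would still have to wire the result into it yourself.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.util.Properties;

class ProxyConf {
    // Prefix built from the agent and source names used in the question's flume.conf.
    static final String PREFIX = "TwitterAgent.sources.Twitter.";

    /**
     * Reads the hypothetical proxyHost / proxyPort keys from flume.conf-style text.
     * Returns null when no proxyHost is set; the port defaults to 8080 (an assumption).
     */
    static InetSocketAddress parse(String confText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(confText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        String host = props.getProperty(PREFIX + "proxyHost");
        if (host == null || host.trim().isEmpty()) {
            return null;
        }
        String port = props.getProperty(PREFIX + "proxyPort", "8080");
        // createUnresolved avoids a DNS lookup while parsing.
        return InetSocketAddress.createUnresolved(host.trim(), Integer.parseInt(port.trim()));
    }

    public static void main(String[] args) {
        String conf =
            "TwitterAgent.sources.Twitter.keywords = hadoop, big data\n"
          + "TwitterAgent.sources.Twitter.proxyHost = proxy.example.edu\n"
          + "TwitterAgent.sources.Twitter.proxyPort = 8080\n";
        InetSocketAddress proxy = parse(conf);
        System.out.println(proxy.getHostString() + ":" + proxy.getPort());
    }
}
```

Inside TwitterSource.java the parsed values would then replace the literals, e.g. cb.setHttpProxyHost(proxy.getHostString()) and cb.setHttpProxyPort(proxy.getPort()), before rebuilding the jar with mvn package as above.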

answered 2015-02-11T09:58:45.267