2

我目前正在从事一个大数据项目,用于对 Twitter 的热门话题进行情绪分析。我跟着 cloudera 的教程,了解了如何通过 flume 将推文发送到 Hadoop。

http://blog.cloudera.com/blog/2012/09/analyzing-twitter-data-with-hadoop/

水槽.conf:

# 获得 Apache 软件基金会 (ASF) 的一项许可

# 或更多贡献者许可协议。请参阅通知文件

# 随本作品分发以获取更多信息

# 关于版权所有权。ASF 许可此文件

# 根据 Apache 许可证,版本 2.0(

# “执照”); 除非合规,否则您不得使用此文件

# 使用许可证。您可以在以下网址获取许可证的副本

#

# http://www.apache.org/licenses/LICENSE-2.0

#

# 除非适用法律要求或书面同意,

# 根据许可分发的软件分布在

#“原样”基础,不提供任何保证或条件

# 种类,明示或暗示。请参阅许可证

# 特定语言管理权限和限制

# 根据许可证。



# 配置文件需要定义源,

# 通道和接收器。

# 每个代理都定义了源、通道和接收器,

# 在这种情况下称为“TwitterAgent”


TwitterAgent.sources = 推特

TwitterAgent.channels = MemChannel

TwitterAgent.sinks = HDFS


TwitterAgent.sources.Twitter.type = com.cloudera.flume.source.TwitterSource

TwitterAgent.sources.Twitter.channels = MemChannel

TwitterAgent.sources.Twitter.consumerKey =

TwitterAgent.sources.Twitter.consumerSecret =

TwitterAgent.sources.Twitter.accessToken =  

TwitterAgent.sources.Twitter.accessTokenSecret =  

TwitterAgent.sources.Twitter.keywords = hadoop、大数据、分析、大数据、cloudera、数据科学、数据科学家、商业智能、mapreduce、数据仓库、数据仓库、mahout、hbase、nosql、newsql、商业智能、云计算


TwitterAgent.sinks.HDFS.channel = MemChannel

TwitterAgent.sinks.HDFS.type = hdfs

TwitterAgent.sinks.HDFS.hdfs.path = hdfs://hadoop1:8020/user/flume/tweets/%Y/%m/%d/%H/

TwitterAgent.sinks.HDFS.hdfs.fileType = 数据流

TwitterAgent.sinks.HDFS.hdfs.writeFormat = 文本

TwitterAgent.sinks.HDFS.hdfs.batchSize = 1000

TwitterAgent.sinks.HDFS.hdfs.rollSize = 0

TwitterAgent.sinks.HDFS.hdfs.rollCount = 10000


TwitterAgent.channels.MemChannel.type = 内存

TwitterAgent.channels.MemChannel.capacity = 10000

TwitterAgent.channels.MemChannel.transactionCapacity = 100

现在要将其扩展到我的应用程序,我需要水槽配置文件中的关键字部分来获得热门主题,我想出了 Java 代码来获取热门主题,但现在我有一个问题,我不知道如何将此代码连接到水槽配置文件或如何在关键字部分添加实时趋势主题的新文件。我在网上搜索了很多,因为我是这个领域的初学者,如果你提供一些信息或至少有一些其他的选择,这将有很大的帮助。

4

1 回答 1

1

一个非常有趣的问题..!

我同意@cricket_007 的评论 - 无法在不重新启动 Flume 代理的情况下编辑配置。

我不能说太多,因为我还没有看到你的 Java 代码来获取热门话题的关键字。但是,根据您提供的信息,我可以想到一种替代方法(或者我应该说一种解决方法)-但我自己还没有尝试过。

您可以像这样修改TwitterSource.java类:

public void configure(Context context) {
consumerKey = context.getString(TwitterSourceConstants.CONSUMER_KEY_KEY);
consumerSecret = context.getString(TwitterSourceConstants.CONSUMER_SECRET_KEY);
accessToken = context.getString(TwitterSourceConstants.ACCESS_TOKEN_KEY);
accessTokenSecret = context.getString(TwitterSourceConstants.ACCESS_TOKEN_SECRET_KEY);

//MODIFY THE FOLLOWING PORTION
String keywordString = context.getString(TwitterSourceConstants.KEYWORDS_KEY, "");
if (keywordString.trim().length() == 0) {
    keywords = new String[0];
} else {
  keywords = keywordString.split(",");
  for (int i = 0; i < keywords.length; i++) {
    keywords[i] = keywords[i].trim();
  }
}
//UNTIL THIS POINT

ConfigurationBuilder cb = new ConfigurationBuilder();
cb.setOAuthConsumerKey(consumerKey);
cb.setOAuthConsumerSecret(consumerSecret);
cb.setOAuthAccessToken(accessToken);
cb.setOAuthAccessTokenSecret(accessTokenSecret);
cb.setJSONStoreEnabled(true);
cb.setIncludeEntitiesEnabled(true);

twitterStream = new TwitterStreamFactory(cb.build()).getInstance(); 
}

我已经在上面的注释中添加了初始化关键字字符串变量的地方——你可以调用你的java代码(我假设它是一个可以返回逗号分隔的关键字字符串的方法)而不是从flume.conf 中可用的上下文(只需删除 context.getString() 部分)。

除此之外,只需从 flume.conf 中删除以下语句:

TwitterAgent.sources.Twitter.keywords = hadoop, big data, analytics, bigdata, cloudera, data science, data scientiest, business intelligence, mapreduce, data warehouse, data warehousing, mahout, hbase, nosql, newsql, businessintelligence, cloudcomputing

我希望这有帮助。

于 2018-04-04T15:31:41.240 回答