我正在使用 5.1 版的 Active Pivot,但计划升级到 5.2。我想使用 CsvSource 读取数据并接收实时更新。
1 回答
介绍
本文解释了有关如何将数据从 Hadoop 读取到 Active Pivot 的一些内容。这已使用 Active Pivot 5.1 和 5.2 进行了测试。简而言之,您有两种方法可以填补空白:
使用挂载的 HDFS,这使您的 HDFS 类似于磁盘
使用 Hadoop Java API
使用挂载的 HDFS
您可以使用某些 Hadoop 发行版轻松挂载 HDFS。(例如:使用 Cloudera CDH 5 安装 HDFS 很容易。)
这样做之后,您的 Active Pivot 服务器上将有一个挂载点链接到您的 HDFS,它的行为就像一个普通磁盘。(至少对于阅读来说,写作有一些限制)
例如,如果您的 HDFS 上有 csv 文件,则可以直接使用 Active Pivot Csv Source。
使用 Hadoop Java API
另一种方法是使用 Hadoop Java API:http ://hadoop.apache.org/docs/current/api/
几个主要的类使用:
org.apache.hadoop.fs.FileSystem
- 用于 Hadoop 的常见操作。
org.apache.hadoop.conf.Configuration
- 用于配置 FileSystem 对象。
org.apache.hadoop.hdfs.client.HdfsAdmin
- 可用于观看事件(例如:新文件添加到 HDFS)
注意:观察事件适用于 Hadoop 2.6.0 及更高版本。对于以前的 Hadoop,您可以构建自己的或使用已安装的 HDFS 和现有的 FileWatcher。
依赖项
您将需要很少的 Hadoop 依赖项。
请注意 Hadoop 依赖项和 Jaxb 上的 Active Pivot 依赖项之间可能存在冲突。
在以下 pom.xml 中,解决方案是从 Hadoop 依赖项中排除 Jaxb 依赖项。
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.6.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-auth</artifactId>
<version>2.6.0</version>
</dependency>
<!-- These 2 dependencies have conflicts with ActivePivotCva on Jaxb -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.6.0</version>
<exclusions>
<exclusion>
<groupId>com.sun.xml.bind</groupId>
<artifactId>jaxb-impl</artifactId>
</exclusion>
<exclusion>
<groupId>javax.xml.bind</groupId>
<artifactId>jaxb-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<version>1.2.1</version>
<exclusions>
<exclusion>
<groupId>com.sun.xml.bind</groupId>
<artifactId>jaxb-impl</artifactId>
</exclusion>
<exclusion>
<groupId>javax.xml.bind</groupId>
<artifactId>jaxb-api</artifactId>
</exclusion>
</exclusions>
</dependency>
特性
您将需要定义至少 2 个属性:
Hadoop 地址(例如:hdfs://localhost:9000)
文件的 HDFS 路径(例如:/user/quartetfs/data/)
如果您的集群是安全的,那么您将需要弄清楚如何以安全的方式远程访问它。
从 Hadoop 读取文件的示例
// Configuring
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://localhost:9000");
FileSystem hdfs = FileSystem.get(this.conf);
Path filePath = new Path(/user/username/input/file.txt);
// Reading
BufferedReader bfr =new BufferedReader(new InputStreamReader(hdfs.open(filePath)));
String str = null;
while ((str = bfr.readLine()) != null)
{
System.out.println(str);
}
Hadoop源
当您能够从 HDFS 读取数据时,您现在可以像编写其他源一样编写 Hadoop 源。
例如,您可以创建一个实现 ISource 的 HadoopSource。
你可以在你的 SourceConfig 中启动它,你可以从你的环境中检索你的属性。
监视事件(例如:新文件)
如果您想检索存储在 HDFS 上的文件,您可以创建另一个类来监视事件。
一个示例是以下代码,您可以在其中使用自己的方法来处理某些事件。(例如在以下代码中:onCreation()、onAppend())
protected HdfsAdmin admin;
protected String threadName;
public void run()
{
DFSInotifyEventInputStream eventStream;
try
{
eventStream = admin.getInotifyEventStream();
LOGGER.info(" - Thread: " + this.threadName + "Starting catching events.");
while (true)
{
try
{
Event event = eventStream.take();
// Possible eventType: CREATE, APPEND, CLOSE, RENAME, METADATA, UNLINK
switch (event.getEventType())
{
case CREATE:
CreateEvent createEvent = (CreateEvent) event;
onCreation(createEvent.getPath());
break;
case APPEND:
AppendEvent appendEvent = (AppendEvent) event;
onAppend(appendEvent.getPath());
break;
default:
break;
}
} catch (InterruptedException e) {
e.printStackTrace();
} catch (MissingEventsException e) {
e.printStackTrace();
}
}
} catch (IOException e1) {
LOGGER.severe(" - Thread: " + this.threadName + "Failure to start the eventStream");
e1.printStackTrace();
}
}
我为我的 onCreation 方法(未显示)所做的是将新创建的文件存储到并发队列中,以便我的 HadoopSource 可以并行检索多个文件。
-
如果我在某些方面不够清楚,或者您有任何问题,请随时提出。