1

我正在使用 5.1 版的 Active Pivot,但计划升级到 5.2。我想使用 CsvSource 读取数据并接收实时更新。

4

1 回答 1

2

介绍

本文解释了有关如何将数据从 Hadoop 读取到 Active Pivot 的一些内容。这已使用 Active Pivot 5.1 和 5.2 进行了测试。简而言之,您有两种方法可以填补空白:

  • 使用挂载的 HDFS,这使您的 HDFS 类似于磁盘

  • 使用 Hadoop Java API

使用挂载的 HDFS

您可以使用某些 Hadoop 发行版轻松挂载 HDFS。(例如:使用 Cloudera CDH 5 安装 HDFS 很容易。)

这样做之后,您的 Active Pivot 服务器上将有一个挂载点链接到您的 HDFS,它的行为就像一个普通磁盘。(至少对于阅读来说,写作有一些限制)

例如,如果您的 HDFS 上有 csv 文件,则可以直接使用 Active Pivot Csv Source。

使用 Hadoop Java API

另一种方法是使用 Hadoop Java API:http ://hadoop.apache.org/docs/current/api/

几个主要的类使用:

org.apache.hadoop.fs.FileSystem - 用于 Hadoop 的常见操作。

org.apache.hadoop.conf.Configuration - 用于配置 FileSystem 对象。

org.apache.hadoop.hdfs.client.HdfsAdmin- 可用于观看事件(例如:新文件添加到 HDFS)

注意:观察事件适用于 Hadoop 2.6.0 及更高版本。对于以前的 Hadoop,您可以构建自己的或使用已安装的 HDFS 和现有的 FileWatcher。

依赖项

您将需要很少的 Hadoop 依赖项。

请注意 Hadoop 依赖项和 Jaxb 上的 Active Pivot 依赖项之间可能存在冲突。

在以下 pom.xml 中,解决方案是从 Hadoop 依赖项中排除 Jaxb 依赖项。

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>2.6.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-auth</artifactId>
    <version>2.6.0</version>
</dependency>
<!-- These 2 dependencies have conflicts with ActivePivotCva on Jaxb -->
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.6.0</version>
    <exclusions>
        <exclusion>
            <groupId>com.sun.xml.bind</groupId>
            <artifactId>jaxb-impl</artifactId>
        </exclusion>
        <exclusion>
        <groupId>javax.xml.bind</groupId>
        <artifactId>jaxb-api</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-core</artifactId>
    <version>1.2.1</version>
    <exclusions>
        <exclusion>
            <groupId>com.sun.xml.bind</groupId>
            <artifactId>jaxb-impl</artifactId>
        </exclusion>
        <exclusion>
            <groupId>javax.xml.bind</groupId>
            <artifactId>jaxb-api</artifactId>
        </exclusion>
    </exclusions>
</dependency>

特性

您将需要定义至少 2 个属性:

  • Hadoop 地址(例如:hdfs://localhost:9000)

  • 文件的 HDFS 路径(例如:/user/quartetfs/data/)

如果您的集群是安全的,那么您将需要弄清楚如何以安全的方式远程访问它。

从 Hadoop 读取文件的示例

// Configuring

Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://localhost:9000");

FileSystem hdfs = FileSystem.get(this.conf);
Path filePath = new Path(/user/username/input/file.txt);

// Reading
BufferedReader bfr =new BufferedReader(new InputStreamReader(hdfs.open(filePath)));
String str = null;

while ((str = bfr.readLine()) != null)
{
       System.out.println(str);
}

Hadoop源

当您能够从 HDFS 读取数据时,您现在可以像编写其他源一样编写 Hadoop 源。

例如,您可以创建一个实现 ISource 的 HadoopSource。

你可以在你的 SourceConfig 中启动它,你可以从你的环境中检索你的属性。

监视事件(例如:新文件)

如果您想检索存储在 HDFS 上的文件,您可以创建另一个类来监视事件。

一个示例是以下代码,您可以在其中使用自己的方法来处理某些事件。(例如在以下代码中:onCreation()、onAppend())

protected HdfsAdmin admin;
protected String threadName;

public void run()
{
    DFSInotifyEventInputStream eventStream;

    try
    {
      eventStream = admin.getInotifyEventStream();
      LOGGER.info(" - Thread: " + this.threadName + "Starting catching events.");

      while (true)
      {

        try
        {
          Event event = eventStream.take();

          // Possible eventType: CREATE, APPEND, CLOSE, RENAME, METADATA, UNLINK
          switch (event.getEventType())
          {
          case CREATE:
            CreateEvent createEvent = (CreateEvent) event;
            onCreation(createEvent.getPath());
            break;

          case APPEND:
            AppendEvent appendEvent = (AppendEvent) event;
            onAppend(appendEvent.getPath());
            break;

          default:
            break;
          }

        } catch (InterruptedException e) {
          e.printStackTrace();

        } catch (MissingEventsException e) {
          e.printStackTrace();
        }
      }
    } catch (IOException e1) {
      LOGGER.severe(" - Thread: " + this.threadName + "Failure to start the eventStream");
      e1.printStackTrace();
    }
  }

我为我的 onCreation 方法(未显示)所做的是将新创建的文件存储到并发队列中,以便我的 HadoopSource 可以并行检索多个文件。

-

如果我在某些方面不够清楚,或者您有任何问题,请随时提出。

于 2015-07-16T10:07:35.930 回答