1

我需要从 Postgres 数据库获取数据到 Accumulo 数据库。我们希望使用序列文件来运行 map/reduce 作业来做到这一点,但不知道如何开始。出于内部技术原因,我们需要避免使用 Sqoop。

如果没有 Sqoop,这可能吗?同样,我真的不知道从哪里开始。我是否编写了一个 java 类来将所有记录(数百万)读入 JDBC 并以某种方式将其输出到 HDFS 序列文件?

感谢您的任何意见!

PS - 我应该提到使用分隔文件是我们现在遇到的问题。我们的一些是包含分隔符的长字符字段,因此无法正确解析。该字段甚至可能有一个选项卡。我们想从 Postgres 直接转到 HDFS 而无需解析。

4

3 回答 3

1

您可以使用Avro序列化您的数据,尽管它不会很快(尤其是在示例中使用 python 时),然后将其加载到 hdfs 中。

假设您有数据库 foo:

postgres=# \c foo
You are now connected to database "foo" as user "user".
foo=# 

foo=# \d bar
                              Table "public.bar"
Column |          Type           |                     Modifiers                     
--------+-------------------------+---------------------------------------------------
key    | integer                 | not null default nextval('bar_key_seq'::regclass)
value  | character varying(1024) | not null

您可以创建如下所示的 avro 架构:

{"namespace": "foo.avro",
 "type": "record",
 "name": "bar",
 "fields": [
     {"name": "id", "type": "int"},
     {"name": "value", "type": "string"}
 ]
}

然后逐行序列化您的数据:

import psycopg2
import avro.schema
from avro.datafile import DataFileReader, DataFileWriter
from avro.io import DatumReader, DatumWriter

schema = avro.schema.parse(open("foo.avsc").read())
writer = DataFileWriter(open("foo.avro", "w"), DatumWriter(), schema)

c = psycopg2.connect(user='user', password='s3cr3t', database='foo')
cur = c.cursor()
cur.execute('SELECT * FROM bar')

for row in cur.fetchall():
    writer.append({"id": row[0], "value": row[1]})

writer.close()
cur.close()
c.close()

或者,您可以使用纯 json 序列化您的数据。

于 2013-09-04T09:05:58.377 回答
0

您可以将数据库中的数据导出为 CSV 或制表符分隔、竖线分隔或 Ctrl-A (Unicode 0x0001) 分隔文件。然后您可以将这些文件复制到 HDFS 并运行一个非常简单的 MapReduce 作业,甚至可能只包含一个 Mapper 并配置为读取您使用的文件格式并输出序列文件。

这将允许在 Hadoop 集群的服务器之间分配创建序列文件的负载。

此外,这很可能不是一次性交易。您必须定期将 Postgres 数据库中的数据加载到 HDFS 中。他们将能够调整您的 MapReduce 作业以合并新数据。

于 2013-09-03T18:48:07.267 回答
0

http://sqoop.apache.org/应该可以满足您的要求。

于 2017-03-17T15:46:17.510 回答