You can serialize your data with Avro (it won't be fast, especially using Python as in this example) and then load it into HDFS.
Suppose you have a database foo:
postgres=# \c foo
You are now connected to database "foo" as user "user".
foo=#
foo=# \d bar
Table "public.bar"
Column | Type | Modifiers
--------+-------------------------+---------------------------------------------------
key | integer | not null default nextval('bar_key_seq'::regclass)
value | character varying(1024) | not null
You can create an Avro schema like the following (the Avro field names do not have to match the column names; here the key column is mapped to the id field):
{"namespace": "foo.avro",
"type": "record",
"name": "bar",
"fields": [
{"name": "id", "type": "int"},
{"name": "value", "type": "string"}
]
}
and then serialize your data row by row:
import psycopg2
import avro.schema
from avro.datafile import DataFileWriter
from avro.io import DatumWriter

# Load the schema defined above.
schema = avro.schema.parse(open("foo.avsc").read())

# Avro files are binary, so the output must be opened in "wb" mode.
writer = DataFileWriter(open("foo.avro", "wb"), DatumWriter(), schema)

c = psycopg2.connect(user='user', password='s3cr3t', database='foo')
cur = c.cursor()
cur.execute('SELECT * FROM bar')

# Append one Avro record per table row.
for row in cur.fetchall():
    writer.append({"id": row[0], "value": row[1]})

writer.close()
cur.close()
c.close()
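To sanity-check the file before loading it, you can read it back with the same avro library; a minimal sketch:

from avro.datafile import DataFileReader
from avro.io import DatumReader

# Iterate over the records just written; each comes back as a dict.
reader = DataFileReader(open("foo.avro", "rb"), DatumReader())
for record in reader:
    print(record)
reader.close()

Loading the result into HDFS is then a single shell command (the target path /data/foo is just a placeholder):

hdfs dfs -put foo.avro /data/foo/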
Alternatively, you can serialize your data as plain JSON.
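A minimal sketch of that approach, writing one JSON object per line (a layout most Hadoop tools can split; the output name foo.json is arbitrary):

import json
import psycopg2

c = psycopg2.connect(user='user', password='s3cr3t', database='foo')
cur = c.cursor()
cur.execute('SELECT * FROM bar')

# One JSON object per line ("JSON Lines").
with open("foo.json", "w") as f:
    for row in cur.fetchall():
        f.write(json.dumps({"id": row[0], "value": row[1]}) + "\n")

cur.close()
c.close()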