我知道它可以以传统方式完成,但如果我要使用 Cassandra DB,是否有一种简单/快速和敏捷的方式将 csv 作为一组键值对添加到 DB 中?
能够添加来自 CSV 文件的时间序列数据是我的主要要求。我可以切换到任何其他数据库,例如 mongodb、rike,如果它在那里可以方便地实现的话..
我知道它可以以传统方式完成,但如果我要使用 Cassandra DB,是否有一种简单/快速和敏捷的方式将 csv 作为一组键值对添加到 DB 中?
能够添加来自 CSV 文件的时间序列数据是我的主要要求。我可以切换到任何其他数据库,例如 mongodb、rike,如果它在那里可以方便地实现的话..
2017 年 12 月 2 日编辑 2
请使用端口 9042。Cassandra 访问已更改为 CQL,默认端口为 9042,9160 是 Thrift 的默认端口。
编辑 1
有一种更好的方法可以做到这一点,无需任何编码。看看这个答案https://stackoverflow.com/a/18110080/298455
但是,如果您想进行预处理或自定义某些内容,您可能需要自己进行。这是一个冗长的方法:
创建柱族。
cqlsh> create keyspace mykeyspace
with strategy_class = 'SimpleStrategy'
and strategy_options:replication_factor = 1;
cqlsh> use mykeyspace;
cqlsh:mykeyspace> create table stackoverflow_question
(id text primary key, name text, class text);
假设您的 CSV 是这样的:
$ cat data.csv
id,name,class
1,hello,10
2,world,20
编写一个简单的 Python 代码来读取文件并转储到您的 CF 中。像这样的东西:
import csv
from pycassa.pool import ConnectionPool
from pycassa.columnfamily import ColumnFamily
pool = ConnectionPool('mykeyspace', ['localhost:9160'])
cf = ColumnFamily(pool, "stackoverflow_question")
with open('data.csv', 'rb') as csvfile:
reader = csv.DictReader(csvfile)
for row in reader:
print str(row)
key = row['id']
del row['id']
cf.insert(key, row)
pool.dispose()
执行这个:
$ python loadcsv.py
{'class': '10', 'id': '1', 'name': 'hello'}
{'class': '20', 'id': '2', 'name': 'world'}
看数据:
cqlsh:mykeyspace> select * from stackoverflow_question;
id | class | name
----+-------+-------
2 | 20 | world
1 | 10 | hello
也可以看看:
一个。当心DictReader
b。看看Pycassa
c。谷歌将现有的 CSV 加载器加载到 Cassandra。我想有。
d。使用CQL驱动程序可能有更简单的方法,我不知道。
e. 使用适当的数据类型。我只是将它们全部包装成文本。不好。
高温高压
我没有看到时间序列要求。以下是您如何处理时间序列。
这是你的数据
$ cat data.csv
id,1383799600,1383799601,1383799605,1383799621,1383799714
1,sensor-on,sensor-ready,flow-out,flow-interrupt,sensor-killAll
创建传统的宽行。(CQL 建议不要使用COMPACT STORAGE,但这只是为了让您快速上手。)
cqlsh:mykeyspace> create table timeseries
(id text, timestamp text, data text, primary key (id, timestamp))
with compact storage;
这是修改后的代码:
import csv
from pycassa.pool import ConnectionPool
from pycassa.columnfamily import ColumnFamily
pool = ConnectionPool('mykeyspace', ['localhost:9160'])
cf = ColumnFamily(pool, "timeseries")
with open('data.csv', 'rb') as csvfile:
reader = csv.DictReader(csvfile)
for row in reader:
print str(row)
key = row['id']
del row['id']
for (timestamp, data) in row.iteritems():
cf.insert(key, {timestamp: data})
pool.dispose()
这是你的时间序列
cqlsh:mykeyspace> select * from timeseries;
id | timestamp | data
----+------------+----------------
1 | 1383799600 | sensor-on
1 | 1383799601 | sensor-ready
1 | 1383799605 | flow-out
1 | 1383799621 | flow-interrupt
1 | 1383799714 | sensor-killAll
假设您的 CSV 看起来像
'P38-Lightning', 'Lockheed', 1937, '.7'
cqlsh
到您的数据库
和..
CREATE TABLE airplanes (
name text PRIMARY KEY,
manufacturer ascii,
year int,
mach float
);
然后...
COPY airplanes (name, manufacturer, year, mach) FROM '/classpath/temp.csv';
做备份
./cqlsh -e"copy <keyspace>.<table> to '../data/table.csv';"
使用备份
./cqlsh -e"copy <keyspace>.<table> from '../data/table.csv';"