我正在尝试将 1000 亿(数千列,数百万行)多维时间序列数据点从 CSV 文件加载到 InfluxDB 中。
我目前正在通过以下线路协议执行此操作(我的代码库在 Python 中):
f = open(args.file, "r")
l = []
bucket_size = 100
if rows > 10000:
bucket_size = 10
for x in tqdm(range(rows)):
s = f.readline()[:-1].split(" ")
v = {}
for y in range(columns):
v["dim" + str(y)] = float(s[y + 1])
time = (get_datetime(s[0])[0] - datetime(1970, 1, 1)).total_seconds() * 1000000000
time = int(time)
body = {"measurement": "puncte", "time": time, "fields": v }
l.append(body)
if len(l) == bucket_size:
while True:
try:
client.write_points(l)
except influxdb.exceptions.InfluxDBServerError:
continue
break
l = []
client.write_points(l)
final_time = datetime.now()
final_size = get_size()
seconds = (final_time - initial_time).total_seconds()
如上面的代码所示,我的代码正在读取数据集 CSV 文件并准备 10000 个数据点的批次,然后使用client.write_points(l)
.
但是,这种方法效率不高。事实上,我正在尝试加载 1000 亿个数据点,这比预期的要长,仅加载 300 万行,每行 100 列已经运行了 29 小时,还有 991 小时要完成!!!!
我确信有更好的方法将数据集加载到 InfluxDB 中。对更快的数据加载有什么建议吗?