1

我正在尝试将 1000 亿(数千列,数百万行)多维时间序列数据点从 CSV 文件加载到 InfluxDB 中。

我目前正在通过以下线路协议执行此操作(我的代码库在 Python 中):

f = open(args.file, "r")
l = []
bucket_size = 100
if rows > 10000:
    bucket_size = 10
for x in tqdm(range(rows)):
    s = f.readline()[:-1].split(" ")
    v = {}
    for y in range(columns):
        v["dim" + str(y)] = float(s[y + 1])
    time = (get_datetime(s[0])[0] - datetime(1970, 1, 1)).total_seconds() * 1000000000
    time = int(time)
    body = {"measurement": "puncte", "time": time, "fields": v }
    l.append(body)
    if len(l) == bucket_size:
        while True:
            try:
                client.write_points(l)
            except influxdb.exceptions.InfluxDBServerError:
                continue
            break
        l = []
client.write_points(l)

final_time = datetime.now()
final_size = get_size()

seconds = (final_time - initial_time).total_seconds()

如上面的代码所示,我的代码正在读取数据集 CSV 文件并准备 10000 个数据点的批次,然后使用client.write_points(l).

但是,这种方法效率不高。事实上,我正在尝试加载 1000 亿个数据点,这比预期的要长,仅加载 300 万行,每行 100 列已经运行了 29 小时,还有 991 小时要完成!!!!

我确信有更好的方法将数据集加载到 InfluxDB 中。对更快的数据加载有什么建议吗?

4

1 回答 1

0

尝试在多个并发线程中加载数据。这应该可以加快多 CPU 机器的速度。

另一种选择是将 CSV 文件直接提供给时间序列数据库,而无需进行额外的转换。请参阅此示例

于 2021-09-26T09:16:03.733 回答