0

我正在将 csv 导入到 postgresql 9.5.7 数据库中。但问题是,csv 部分格式错误(某些行缺少逗号,因此整个列,或者有些可能有太多,或者某些值无效)。

因此,我要么在导入之前使用外部工具清理 csv,要么让过滤由数据库本身完成。

我更喜欢第二种方法,因为在我看来它不太依赖外部 csv 清理脚本,而且所有数据验证都直接在持久性级别进行。

虽然在进行 csv 导入时通常无法处理形状错误的行,但我还是通过以下方式找到了解决此问题的方法:

  1. 将 csv 作为外部表包含到数据库中,但只有文本,并且只有一个文本列,其中包括包括逗号在内的整行。

  2. 通过拆分单个文本列各自的逗号,将该外部表插入到一个干净的目标表中。

但是在我的测试机器上导入一个包含 3300 万行的 200 MB csv 文件大约需要 6 个小时。那么肯定可以进一步优化 Insert 语句吗?我对 postgres 很陌生,所以这完全有可能。请纠正我在哪里做出了可以做得更好的决定。

现在,简要解释要建模的领域:它是关于处理传感器,这些传感器的位置正在通过其信号强度以特定时间间隔记录到各个站点。通过以毫秒精度记录这些间隔非常精确。

因此,为使其正常工作而发出的所有命令如下。

创建 fdw 服务器:

CREATE EXTENSION file_fdw;
CREATE SERVER csv_import_server FOREIGN DATA WRAPPER file_fdw;

接下来,创建外部 csv 表,其中只有一个文本列包含所有数据。一个干净的行看起来像这样:

'1465721143588,-83,55,1361'

其中第一个值是具有毫秒精度的unix 时间戳,然后是rssi信号强度值,然后是接收信号的站点 ID,然后是传感器 ID

CREATE FOREIGN TABLE signals_csv ( 
    value TEXT )
SERVER csv_import_server OPTIONS( 
filename '<path_to_file>/signals.csv', format 'text');

目标表:

CREATE TABLE signals (
    timestamp TIMESTAMP NOT NULL,
    rssi INTEGER NOT NULL,
    stations_id INTEGER NOT NULL,
    distributed_tags_id INTEGER NOT NULL,
    PRIMARY KEY(timestamp, stations_id, distributed_tags_id),
    FOREIGN KEY(stations_id) REFERENCES stations(stations_id),
    FOREIGN KEY(distributed_tags_id) REFERENCES tags(id) );

现在插入:

INSERT INTO signals (timestamp, rssi, stations_id, distributed_tags_id) SELECT
TO_TIMESTAMP( tmp.timestamp::double precision / 1000),
tmp.rssi::INTEGER,
tmp.stations_id::INTEGER,
tmp.distributed_tags_id::INTEGER
FROM ( SELECT 
    SPLIT_PART ( value, ',', 1) AS timestamp, 
    SPLIT_PART ( value, ',', 2) AS rssi, 
    SPLIT_PART ( value, ',', 3) AS stations_id,
    SPLIT_PART ( value, ',', 4) AS distributed_tags_id
        FROM signals_csv ) AS tmp WHERE (
        tmp.timestamp ~ '^[0-9]+$' AND
        tmp.rssi ~ '^-[0-9]+$' AND
        tmp.stations_id ~ '^[0-9]+$' AND
        tmp.distributed_tags_id ~ '^[0-9]+$' AND
        EXISTS ( SELECT 1 FROM tags t WHERE t.id::TEXT = tmp.distributed_tags_id ) AND
        EXISTS ( SELECT 1 FROM stations s WHERE s.stations_id::TEXT = tmp.stations_id )
        )
ON CONFLICT (timestamp, stations_id, distributed_tags_id ) DO NOTHING;

我想批量性能命中是:

  • 将 unix 时间戳转换为双精度,然后进行除法,
  • 拆分字符串的正则表达式分析。
  • 外键查找检查

但正如我所看到的,如果我想以一致的方式保持数据建模,同时又保持毫秒精度以人类可读的方式存储,那么就无法绕过这些限制。

导入的数据是干净且一致的,对于这个维度,我对我的方法感到满意;唯一的缺点是它的糟糕表现。因此,如果有人能给我指点如何改进这一点,我将非常感激。

干杯!

4

2 回答 2

0

我以不同的方式解决了它,并且可以将导入时间从 7 小时减少到只有 1 小时。

因此,我没有在 INSERT之前验证数据(在我最初的帖子的 WHERE 类中),而是让 INSERT 操作本身验证数据(因为我已经在 CREATE TABLE 中定义了列的类型)。

虽然由于 INSERT 在遇到意外数据类型时会引发异常,但我在循环中对每行执行 INSERT,以便异常仅中止当前迭代,而不是整个事务。

工作代码如下所示:

CREATE OR REPLACE FUNCTION import_tags_csv( path_to_csv TEXT ) RETURNS VOID AS $$

DECLARE

    cursor SCROLL CURSOR FOR SELECT 
        SPLIT_PART ( value, ',', 1) AS id, 
        SPLIT_PART ( value, ',', 2) AS active_from,
        SPLIT_PART ( value, ',', 3) AS active_to
        FROM csv_table;
    i BIGINT := 0;

BEGIN 

    -- create the whole foreign data wrapper for integrating the csv:
    CREATE EXTENSION file_fdw;
    CREATE SERVER csv_import_server FOREIGN DATA WRAPPER file_fdw;
    EXECUTE '
        CREATE FOREIGN TABLE csv_table ( value TEXT )
        SERVER csv_import_server OPTIONS( filename ''' || path_to_csv || ''', format ''text'')';

    -- Iterating through the rows, converting the text data and inserting it into table tags
    FOR csv_row IN cursor LOOP
    BEGIN

        i := i +1;
        INSERT INTO tags ( 
            id, 
            active_from, 
            active_to) 
            VALUES (
            csv_row.id::INTEGER,
            TO_TIMESTAMP( csv_row.active_from::double precision / 1000),
            TO_TIMESTAMP( csv_row.active_to::double precision / 1000) );

        --If invalid data is read, the table constraints throw an exception. The faulty line is dismissed
        EXCEPTION WHEN OTHERS THEN 
            RAISE NOTICE E'% \n\t line %: %\n', SQLERRM, i, csv_row;

    END;
    END LOOP;

    -- Dropping the foreign table which had the csv integrated
    DROP FOREIGN TABLE csv_table;
    DROP SERVER csv_import_server;
    DROP EXTENSION file_fdw;

END;
$$ LANGUAGE plpgsql;
于 2017-08-09T16:05:23.000 回答
0

如果要插入很多行,则可以使用COPY而不是 INSERT。

它的性能比 INSERT 好得多。

于 2017-08-04T09:58:15.197 回答