我需要以编程方式将数千万条记录插入 Postgres 数据库。目前,我正在一个查询中执行数千个插入语句。
有没有更好的方法来做到这一点,一些我不知道的批量插入语句?
PostgreSQL 有关于如何最好地最初填充数据库的 a guide,他们建议使用 COPY 命令批量加载行。该指南还有一些其他关于如何加快流程的好技巧,例如在加载数据之前删除索引和外键(然后再将它们添加回来)。
除了使用 COPY,还有一种替代方法,它是 Postgres 支持的多行值语法。从 documentation:
INSERT INTO films (code, title, did, date_prod, kind) VALUES
('B6717', 'Tampopo', 110, '1985-02-10', 'Comedy'),
('HG120', 'The Dinner Game', 140, DEFAULT, 'Comedy');
上面的代码插入了两行,但您可以任意扩展它,直到达到准备好的语句令牌的最大数量(可能是 999 美元,但我对此不是 100% 确定)。有时不能使用 COPY,而对于这些情况,这是一个值得替代的方法。
加快速度的一种方法是在事务中显式执行多个插入或复制(例如 1000 次)。 Postgres 的默认行为是在每条语句之后提交,因此通过批量提交,可以避免一些开销。正如丹尼尔回答中的指南所说,您可能必须禁用自动提交才能使其正常工作。另请注意底部的注释建议将 wal_buffers 的大小增加到 16 MB 也可能有所帮助。
带有数组的 UNNEST
函数可以与多行 VALUES 语法一起使用。我认为这种方法比使用 COPY
慢,但它对我使用 psycopg 和 python 很有用(传递给 cursor.execute
的 python list
变为 pg ARRAY
):
INSERT INTO tablename (fieldname1, fieldname2, fieldname3)
VALUES (
UNNEST(ARRAY[1, 2, 3]),
UNNEST(ARRAY[100, 200, 300]),
UNNEST(ARRAY['a', 'b', 'c'])
);
没有 VALUES
使用带有额外存在检查的子选择:
INSERT INTO tablename (fieldname1, fieldname2, fieldname3)
SELECT * FROM (
SELECT UNNEST(ARRAY[1, 2, 3]),
UNNEST(ARRAY[100, 200, 300]),
UNNEST(ARRAY['a', 'b', 'c'])
) AS temptable
WHERE NOT EXISTS (
SELECT 1 FROM tablename tt
WHERE tt.fieldname1=temptable.fieldname1
);
批量更新的语法相同:
UPDATE tablename
SET fieldname1=temptable.data
FROM (
SELECT UNNEST(ARRAY[1,2]) AS id,
UNNEST(ARRAY['a', 'b']) AS data
) AS temptable
WHERE tablename.id=temptable.id;
您可以使用 COPY table TO ... WITH BINARY
,即“somewhat faster than the text and CSV formats”。仅当您有数百万行要插入并且您对二进制数据感到满意时才这样做。
这是一个example recipe in Python, using psycopg2 with binary input。
((这是一个您可以编辑和增强答案的 WIKI!))
外部文件是最好和典型的批量数据
“批量数据”一词与“大量数据”相关,因此使用原始原始数据是很自然的,无需将其转换为SQL。 “批量插入”的典型原始数据文件是 CSV 和 JSON 格式。
带有一些转换的批量插入
在 ETL 应用程序和摄取过程中,我们需要在插入数据之前对其进行更改。临时表消耗(大量)磁盘空间,这不是更快的方法。 PostgreSQL foreign-data wrapper (FDW) 是最佳选择。
CSV 示例。假设 SQL 上的 tablename (x, y, z)
和一个类似的 CSV 文件
fieldname1,fieldname2,fieldname3
etc,etc,etc
... million lines ...
您可以使用经典的 SQL COPY
将(原样原始数据)加载到 tmp_tablename
,它们将过滤后的数据插入 tablename
...但是,为了避免磁盘消耗,最好是直接摄入
INSERT INTO tablename (x, y, z)
SELECT f1(fieldname1), f2(fieldname2), f3(fieldname3) -- the transforms
FROM tmp_tablename_fdw
-- WHERE condictions
;
您需要为 FDW 准备数据库,而您可以使用静态 tmp_tablename_fdw
a function that generates it:
CREATE EXTENSION file_fdw;
CREATE SERVER import FOREIGN DATA WRAPPER file_fdw;
CREATE FOREIGN TABLE tmp_tablename_fdw(
...
) SERVER import OPTIONS ( filename '/tmp/pg_io/file.csv', format 'csv');
JSON 示例。一组两个文件 myRawData1.json
和 Ranger_Policies2.json
可以通过以下方式摄取:
INSERT INTO tablename (fname, metadata, content)
SELECT fname, meta, j -- do any data transformation here
FROM jsonb_read_files('myRawData%.json')
-- WHERE any_condiction_here
;
其中函数 jsonb_read_files() 读取由掩码定义的文件夹的所有文件:
CREATE or replace FUNCTION jsonb_read_files(
p_flike text, p_fpath text DEFAULT '/tmp/pg_io/'
) RETURNS TABLE (fid int, fname text, fmeta jsonb, j jsonb) AS $f$
WITH t AS (
SELECT (row_number() OVER ())::int id,
f AS fname,
p_fpath ||'/'|| f AS f
FROM pg_ls_dir(p_fpath) t(f)
WHERE f LIKE p_flike
) SELECT id, fname,
to_jsonb( pg_stat_file(f) ) || jsonb_build_object('fpath', p_fpath),
pg_read_file(f)::jsonb
FROM t
$f$ LANGUAGE SQL IMMUTABLE;
缺乏 gzip 流
“文件摄取”(主要在大数据中)最常见的方法是在 gzip format 上保留原始文件并使用 streaming algorithm 传输它,任何可以在 unix 管道中快速运行且不消耗磁盘的东西:
gunzip remote_or_local_file.csv.gz | convert_to_sql | psql
所以理想(未来)是格式 .csv.gz
的 服务器选项。
@CharlieClark 评论后注意:目前(2022 年)无事可做,最好的选择似乎是 pgloader
STDIN:
gunzip -c file.csv.gz | pgloader --type csv ... - pgsql:///target?foo
head -n 1000 file.csv > file_test.csv
以通过 FDW 检查前 1000 行,问题可能是格式错误的 CSV。 PS:要修复格式错误的 CSV,您可以使用 csvformat
它主要取决于数据库中的(其他)活动。像这样的操作有效地冻结了其他会话的整个数据库。另一个考虑因素是数据模型和约束、触发器等的存在。
我的第一种方法始终是:创建一个结构类似于目标表 (create table tmp AS select * from target where 1=0
) 的 (temp) 表,然后首先将文件读入 temp 表。然后我检查可以检查的内容:重复项、目标中已经存在的键等。
然后我只做一个 do insert into target select * from tmp
或类似的。
如果失败或花费太长时间,我将中止它并考虑其他方法(暂时删除索引/约束等)
我刚遇到这个问题,建议使用 csvsql (releases) 批量导入 Postgres。要执行批量插入,您只需 createdb
,然后使用 csvsql
,它连接到您的数据库并为整个 CSV 文件夹创建单独的表。
$ createdb test
$ csvsql --db postgresql:///test --insert examples/*.csv
可能我已经迟到了。但是,Bytefish 有一个名为 pgbulkinsert
的 Java 库。我和我的团队能够在 15 秒内批量插入 100 万条记录。当然,我们还执行了一些其他操作,例如从 Minio 上的文件中读取 1M+ 记录,在 1M+ 记录的顶部进行几次处理,如果重复则过滤掉记录,然后最后将 1M 记录插入 Postgres 数据库.所有这些过程都在15秒内完成。我不记得执行数据库操作需要多少时间,但我认为大约不到 5 秒。从 https://www.bytefish.de/blog/pgbulkinsert_bulkprocessor.html 查找更多详细信息
正如其他人所指出的,在将数据导入 Postgres 时,Postgres 旨在为您执行的检查会减慢速度。此外,您经常需要以一种或另一种方式操作数据,以使其适合使用。任何可以在 Postgres 进程之外完成的操作都意味着您可以使用 COPY 协议进行导入。
为了我的使用,我经常使用 pgloader 从 httparchive.org 项目导入数据。由于源文件是由 MySQL 创建的,因此您需要能够处理一些 MySQL 异常,例如将 \N
用于空值以及编码问题。这些文件也很大,至少在我的机器上,使用 FDW 会耗尽内存。 pgloader 可以很容易地创建一个管道,让您可以选择所需的字段、转换为相关的数据类型以及在进入主数据库之前进行任何其他工作,从而最大限度地减少索引更新等。