ChatGPT解决这个技术问题 Extra ChatGPT

将批量插入 Postgres 的最快方法是什么?

我需要以编程方式将数千万条记录插入 Postgres 数据库。目前,我正在一个查询中执行数千个插入语句。

有没有更好的方法来做到这一点,一些我不知道的批量插入语句?


K
Kos

PostgreSQL 有关于如何最好地最初填充数据库的 a guide,他们建议使用 COPY 命令批量加载行。该指南还有一些其他关于如何加快流程的好技巧,例如在加载数据之前删除索引和外键(然后再将它们添加回来)。


我也在 stackoverflow.com/questions/12206600/… 中写了更多细节来详细说明。
@CraigRinger 哇,“更详细一点”是我整个星期看到的最好的轻描淡写;)
尝试安装包 NpgsqlBulkCopy
- 因为索引也用于数据库记录的物理布局。不确定删除任何数据库中的索引是否是个好主意。
但是你推荐的,内存中什么都没有!!!而且,如果您的批量大小可以很小,那么它的类就非常非常糟糕:(我尝试使用 npgsql CopyIn 类,因为它就像 PG 查询语句中的 CSV 格式映射一样。您可以尝试使用 Big Table?
B
Ben Harper

除了使用 COPY,还有一种替代方法,它是 Postgres 支持的多行值语法。从 documentation

INSERT INTO films (code, title, did, date_prod, kind) VALUES
    ('B6717', 'Tampopo', 110, '1985-02-10', 'Comedy'),
    ('HG120', 'The Dinner Game', 140, DEFAULT, 'Comedy');

上面的代码插入了两行,但您可以任意扩展它,直到达到准备好的语句令牌的最大数量(可能是 999 美元,但我对此不是 100% 确定)。有时不能使用 COPY,而对于这些情况,这是一个值得替代的方法。


你知道这种方法的性能与 COPY 相比如何吗?
如果您遇到权限问题,请在尝试之前使用 COPY ... FROM STDIN
如果您使用的是行级安全性,这是您能做的最好的。从版本 12 开始,“具有行级安全性的表不支持 COPY FROM”。
COPY 比扩展 INSERT 快很多
在这种过程(原始数据摄取)中最重要的是转换,以 SQL 标准表达(不使用特殊工具)。请参阅stackoverflow.com/a/62493516/287948
D
Dana the Sane

加快速度的一种方法是在事务中显式执行多个插入或复制(例如 1000 次)。 Postgres 的默认行为是在每条语句之后提交,因此通过批量提交,可以避免一些开销。正如丹尼尔回答中的指南所说,您可能必须禁用自动提交才能使其正常工作。另请注意底部的注释建议将 wal_buffers 的大小增加到 16 MB 也可能有所帮助。


值得一提的是,您可以向同一事务添加多少插入/副本的限制可能比您尝试的任何操作都要高得多。您可以在同一个事务中添加数百万行,而不会遇到问题。
@SumeetJain 是的,我只是在每笔交易的副本/插入数量方面评论速度“最佳位置”。
这会在事务运行时锁定表吗?
n
ndpu

带有数组的 UNNEST 函数可以与多行 VALUES 语法一起使用。我认为这种方法比使用 COPY 慢,但它对我使用 psycopg 和 python 很有用(传递给 cursor.execute 的 python list 变为 pg ARRAY):

INSERT INTO tablename (fieldname1, fieldname2, fieldname3)
VALUES (
    UNNEST(ARRAY[1, 2, 3]), 
    UNNEST(ARRAY[100, 200, 300]), 
    UNNEST(ARRAY['a', 'b', 'c'])
);

没有 VALUES 使用带有额外存在检查的子选择:

INSERT INTO tablename (fieldname1, fieldname2, fieldname3)
SELECT * FROM (
    SELECT UNNEST(ARRAY[1, 2, 3]), 
           UNNEST(ARRAY[100, 200, 300]), 
           UNNEST(ARRAY['a', 'b', 'c'])
) AS temptable
WHERE NOT EXISTS (
    SELECT 1 FROM tablename tt
    WHERE tt.fieldname1=temptable.fieldname1
);

批量更新的语法相同:

UPDATE tablename
SET fieldname1=temptable.data
FROM (
    SELECT UNNEST(ARRAY[1,2]) AS id,
           UNNEST(ARRAY['a', 'b']) AS data
) AS temptable
WHERE tablename.id=temptable.id;

C
Community

您可以使用 COPY table TO ... WITH BINARY,即“somewhat faster than the text and CSV formats”。仅当您有数百万行要插入并且您对二进制数据感到满意时才这样做。

这是一个example recipe in Python, using psycopg2 with binary input


4
4 revs, 2 users 96%

((这是一个您可以编辑和增强答案的 WIKI!))

外部文件是最好和典型的批量数据

“批量数据”一词与“大量数据”相关,因此使用原始原始数据是很自然的,无需将其转换为SQL。 “批量插入”的典型原始数据文件是 CSVJSON 格式。

带有一些转换的批量插入

ETL 应用程序和摄取过程中,我们需要在插入数据之前对其进行更改。临时表消耗(大量)磁盘空间,这不是更快的方法。 PostgreSQL foreign-data wrapper (FDW) 是最佳选择。

CSV 示例。假设 SQL 上的 tablename (x, y, z) 和一个类似的 CSV 文件

fieldname1,fieldname2,fieldname3
etc,etc,etc
... million lines ...

您可以使用经典的 SQL COPY 将(原样原始数据)加载到 tmp_tablename,它们将过滤后的数据插入 tablename...但是,为了避免磁盘消耗,最好是直接摄入

INSERT INTO tablename (x, y, z)
  SELECT f1(fieldname1), f2(fieldname2), f3(fieldname3) -- the transforms 
  FROM tmp_tablename_fdw
  -- WHERE condictions
;

您需要为 FDW 准备数据库,而您可以使用静态 tmp_tablename_fdw a function that generates it

CREATE EXTENSION file_fdw;
CREATE SERVER import FOREIGN DATA WRAPPER file_fdw;
CREATE FOREIGN TABLE tmp_tablename_fdw(
  ...
) SERVER import OPTIONS ( filename '/tmp/pg_io/file.csv', format 'csv');

JSON 示例。一组两个文件 myRawData1.jsonRanger_Policies2.json 可以通过以下方式摄取:

INSERT INTO tablename (fname, metadata, content)
 SELECT fname, meta, j  -- do any data transformation here
 FROM jsonb_read_files('myRawData%.json')
 -- WHERE any_condiction_here
;

其中函数 jsonb_read_files() 读取由掩码定义的文件夹的所有文件:

CREATE or replace FUNCTION jsonb_read_files(
  p_flike text, p_fpath text DEFAULT '/tmp/pg_io/'
) RETURNS TABLE (fid int, fname text, fmeta jsonb, j jsonb) AS $f$
  WITH t AS (
     SELECT (row_number() OVER ())::int id, 
           f AS fname,
           p_fpath ||'/'|| f AS f
     FROM pg_ls_dir(p_fpath) t(f)
     WHERE f LIKE p_flike
  ) SELECT id, fname,
         to_jsonb( pg_stat_file(f) ) || jsonb_build_object('fpath', p_fpath),
         pg_read_file(f)::jsonb
    FROM t
$f$  LANGUAGE SQL IMMUTABLE;

缺乏 gzip 流

“文件摄取”(主要在大数据中)最常见的方法是在 gzip format 上保留原始文件并使用 streaming algorithm 传输它,任何可以在 unix 管道中快速运行且不消耗磁盘的东西:

 gunzip remote_or_local_file.csv.gz | convert_to_sql | psql 

所以理想(未来)是格式 .csv.gz服务器选项

@CharlieClark 评论后注意:目前(2022 年)无事可做,最好的选择似乎是 pgloader STDIN

  gunzip -c file.csv.gz | pgloader --type csv ... - pgsql:///target?foo

当我尝试将 FDW 用于非常大(> 10 GB)的 CSV 时,我遇到了 Postges 的内存问题(以及处理 MySQL 导出中的一些奇怪错误)并且找不到任何解决方法。
@CharlieClark,PostgreSQL 版本?您可以在此处或 at the PostgreSQL options 报告错误。另一种解决方案是拆分...例如测试 Unix head -n 1000 file.csv > file_test.csv 以通过 FDW 检查前 1000 行,问题可能是格式错误的 CSV。 PS:要修复格式错误的 CSV,您可以使用 csvformat
我在使用不同版本的 Postgres 时遇到过错误,但最近的是 13。我没有寻求其他选项,因为 pgloader 涵盖了其中的大部分,但我对内存问题感到惊讶。
C
Charlie Clark

它主要取决于数据库中的(其他)活动。像这样的操作有效地冻结了其他会话的整个数据库。另一个考虑因素是数据模型和约束、触发器等的存在。

我的第一种方法始终是:创建一个结构类似于目标表 (create table tmp AS select * from target where 1=0) 的 (temp) 表,然后首先将文件读入 temp 表。然后我检查可以检查的内容:重复项、目标中已经存在的键等。

然后我只做一个 do insert into target select * from tmp 或类似的。

如果失败或花费太长时间,我将中止它并考虑其他方法(暂时删除索引/约束等)


E
Elyor

我使用本机 libpq 方法实现了非常快速的 Postgresq 数据加载器。试试我的包https://www.nuget.org/packages/NpgsqlBulkCopy/


P
Peter Krauss

我刚遇到这个问题,建议使用 csvsql (releases) 批量导入 Postgres。要执行批量插入,您只需 createdb,然后使用 csvsql,它连接到您的数据库并为整个 CSV 文件夹创建单独的表。

$ createdb test 
$ csvsql --db postgresql:///test --insert examples/*.csv

对于 csvsql,为了同时清除源 csv 中任何可能的格式错误,最好遵循 these instructions,更多文档 here
A
Anish Panthi

可能我已经迟到了。但是,Bytefish 有一个名为 pgbulkinsert 的 Java 库。我和我的团队能够在 15 秒内批量插入 100 万条记录。当然,我们还执行了一些其他操作,例如从 Minio 上的文件中读取 1M+ 记录,在 1M+ 记录的顶部进行几次处理,如果重复则过滤掉记录,然后最后将 1M 记录插入 Postgres 数据库.所有这些过程都在15秒内完成。我不记得执行数据库操作需要多少时间,但我认为大约不到 5 秒。从 https://www.bytefish.de/blog/pgbulkinsert_bulkprocessor.html 查找更多详细信息


我能够在 3.7 秒内插入 1M 条记录。
C
Charlie Clark

正如其他人所指出的,在将数据导入 Postgres 时,Postgres 旨在为您执行的检查会减慢速度。此外,您经常需要以一种或另一种方式操作数据,以使其适合使用。任何可以在 Postgres 进程之外完成的操作都意味着您可以使用 COPY 协议进行导入。

为了我的使用,我经常使用 pgloaderhttparchive.org 项目导入数据。由于源文件是由 MySQL 创建的,因此您需要能够处理一些 MySQL 异常,例如将 \N 用于空值以及编码问题。这些文件也很大,至少在我的机器上,使用 FDW 会耗尽内存。 pgloader 可以很容易地创建一个管道,让您可以选择所需的字段、转换为相关的数据类型以及在进入主数据库之前进行任何其他工作,从而最大限度地减少索引更新等。