ChatGPT解决这个技术问题 Extra ChatGPT

在 Python 中读取大文件的惰性方法?

我有一个非常大的 4GB 文件,当我尝试读取它时,我的计算机挂起。所以我想一块一块地读取它,在处理完每一块后将处理后的块存储到另一个文件中并读取下一块。

有什么方法可以yield这些作品吗?

我很想有一个懒惰的方法。


B
Boštjan Mejak

要编写惰性函数,只需使用 yield

def read_in_chunks(file_object, chunk_size=1024):
    """Lazy function (generator) to read a file piece by piece.
    Default chunk size: 1k."""
    while True:
        data = file_object.read(chunk_size)
        if not data:
            break
        yield data


with open('really_big_file.dat') as f:
    for piece in read_in_chunks(f):
        process_data(piece)

另一种选择是使用 iter 和辅助函数:

f = open('really_big_file.dat')
def read1k():
    return f.read(1024)

for piece in iter(read1k, ''):
    process_data(piece)

如果文件是基于行的,则文件对象已经是行的惰性生成器:

for line in open('really_big_file.dat'):
    process_data(line)

使用 open('really_big_file.dat', 'rb') 与同事使用我们的 Posix 挑战 Windows 兼容的良好做法。
@Tal Weiss 提到的缺少 rb;并且缺少 file.close() 语句(可以使用 with open('really_big_file.dat', 'rb') as f: 来完成相同的操作;请参阅 here for another concise implementation
@cod3monk3y:文本和二进制文件是不同的东西。这两种类型都很有用,但在不同的情况下。默认(文本)模式在这里可能很有用,即'rb'没有缺失。
@jf-sebastian:是的,OP 没有说明他是在阅读文本数据还是二进制数据。但是,如果他在 Windows 上使用 python 2.7 并且 正在 读取二进制数据,那么值得注意的是,如果他忘记了 'b',他的数据将很可能是损坏From the docs - Python on Windows makes a distinction between text and binary files; [...] it’ll corrupt binary data like that in JPEG or EXE files. Be very careful to use binary mode when reading and writing such files.
这是一个返回 1k 块的生成器:buf_iter = (x for x in iter(lambda: buf.read(1024), ''))。然后 for chunk in buf_iter: 循环遍历这些块。
n
nbro

file.readlines() 接受一个可选的大小参数,该参数近似于在返回的行中读取的行数。

bigfile = open('bigfilename','r')
tmp_lines = bigfile.readlines(BUF_SIZE)
while tmp_lines:
    process([line for line in tmp_lines])
    tmp_lines = bigfile.readlines(BUF_SIZE)

这是一个非常好的主意,尤其是当它与 defaultdict 结合将大数据拆分成更小的数据时。
我建议使用 .read() 而不是 .readlines()。如果文件是二进制文件,则不会有换行符。
如果文件是一个巨大的字符串怎么办?
这个解决方案是错误的。如果其中一行大于您的 BUF_SIZE,则您将处理一条不完整的行。 @MattSom 是正确的。
@MyersCarpenter 那行会重复两次吗? tmp_lines = bigfile.readlines(BUF_SIZE)
u
user48678

已经有很多好的答案,但是如果您的整个文件都在一行上并且您仍然想处理“行”(而不是固定大小的块),那么这些答案对您没有帮助。

99% 的时间,可以逐行处理文件。然后,如此 answer 中所建议的,您可以将文件对象本身用作惰性生成器:

with open('big.csv') as f:
    for line in f:
        process(line)

但是,可能会遇到行分隔符不是 '\n' 的非常大的文件(常见情况是 '|')。

转换“|” '\n' 在处理之前可能不是一个选项,因为它可能会弄乱可能合法包含 '\n' 的字段(例如自由文本用户输入)。

使用 csv 库也被排除在外,因为至少在 lib 的早期版本中,它是硬编码的,可以逐行读取输入。

对于这种情况,我创建了以下代码段 [2021 年 5 月更新,适用于 Python 3.8+]:

def rows(f, chunksize=1024, sep='|'):
    """
    Read a file where the row separator is '|' lazily.

    Usage:

    >>> with open('big.csv') as f:
    >>>     for r in rows(f):
    >>>         process(r)
    """
    row = ''
    while (chunk := f.read(chunksize)) != '':   # End of file
        while (i := chunk.find(sep)) != -1:     # No separator found
            yield row + chunk[:i]
            chunk = chunk[i+1:]
            row = ''
        row += chunk
    yield row

[对于旧版本的python]:

def rows(f, chunksize=1024, sep='|'):
    """
    Read a file where the row separator is '|' lazily.

    Usage:

    >>> with open('big.csv') as f:
    >>>     for r in rows(f):
    >>>         process(r)
    """
    curr_row = ''
    while True:
        chunk = f.read(chunksize)
        if chunk == '': # End of file
            yield curr_row
            break
        while True:
            i = chunk.find(sep)
            if i == -1:
                break
            yield curr_row + chunk[:i]
            curr_row = ''
            chunk = chunk[i+1:]
        curr_row += chunk

我能够成功地使用它来解决各种问题。它已经过广泛的测试,具有各种块大小。这是我正在使用的测试套件,供那些需要说服自己的人使用:

test_file = 'test_file'

def cleanup(func):
    def wrapper(*args, **kwargs):
        func(*args, **kwargs)
        os.unlink(test_file)
    return wrapper

@cleanup
def test_empty(chunksize=1024):
    with open(test_file, 'w') as f:
        f.write('')
    with open(test_file) as f:
        assert len(list(rows(f, chunksize=chunksize))) == 1

@cleanup
def test_1_char_2_rows(chunksize=1024):
    with open(test_file, 'w') as f:
        f.write('|')
    with open(test_file) as f:
        assert len(list(rows(f, chunksize=chunksize))) == 2

@cleanup
def test_1_char(chunksize=1024):
    with open(test_file, 'w') as f:
        f.write('a')
    with open(test_file) as f:
        assert len(list(rows(f, chunksize=chunksize))) == 1

@cleanup
def test_1025_chars_1_row(chunksize=1024):
    with open(test_file, 'w') as f:
        for i in range(1025):
            f.write('a')
    with open(test_file) as f:
        assert len(list(rows(f, chunksize=chunksize))) == 1

@cleanup
def test_1024_chars_2_rows(chunksize=1024):
    with open(test_file, 'w') as f:
        for i in range(1023):
            f.write('a')
        f.write('|')
    with open(test_file) as f:
        assert len(list(rows(f, chunksize=chunksize))) == 2

@cleanup
def test_1025_chars_1026_rows(chunksize=1024):
    with open(test_file, 'w') as f:
        for i in range(1025):
            f.write('|')
    with open(test_file) as f:
        assert len(list(rows(f, chunksize=chunksize))) == 1026

@cleanup
def test_2048_chars_2_rows(chunksize=1024):
    with open(test_file, 'w') as f:
        for i in range(1022):
            f.write('a')
        f.write('|')
        f.write('a')
        # -- end of 1st chunk --
        for i in range(1024):
            f.write('a')
        # -- end of 2nd chunk
    with open(test_file) as f:
        assert len(list(rows(f, chunksize=chunksize))) == 2

@cleanup
def test_2049_chars_2_rows(chunksize=1024):
    with open(test_file, 'w') as f:
        for i in range(1022):
            f.write('a')
        f.write('|')
        f.write('a')
        # -- end of 1st chunk --
        for i in range(1024):
            f.write('a')
        # -- end of 2nd chunk
        f.write('a')
    with open(test_file) as f:
        assert len(list(rows(f, chunksize=chunksize))) == 2

if __name__ == '__main__':
    for chunksize in [1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024]:
        test_empty(chunksize)
        test_1_char_2_rows(chunksize)
        test_1_char(chunksize)
        test_1025_chars_1_row(chunksize)
        test_1024_chars_2_rows(chunksize)
        test_1025_chars_1026_rows(chunksize)
        test_2048_chars_2_rows(chunksize)
        test_2049_chars_2_rows(chunksize)

C
Community

如果您的计算机、操作系统和 python 是 64 位的,那么您可以使用 mmap module 将文件的内容映射到内存并使用索引和切片访问它。这是文档中的一个示例:

import mmap
with open("hello.txt", "r+") as f:
    # memory-map the file, size 0 means whole file
    map = mmap.mmap(f.fileno(), 0)
    # read content via standard file methods
    print map.readline()  # prints "Hello Python!"
    # read content via slice notation
    print map[:5]  # prints "Hello"
    # update content using slice notation;
    # note that new content must have same size
    map[6:] = " world!\n"
    # ... and read again using standard file methods
    map.seek(0)
    print map.readline()  # prints "Hello  world!"
    # close the map
    map.close()

如果您的计算机、操作系统或 python 是 32 位的,则映射大文件可以保留大部分地址空间和starve您的内存程序。


这应该如何工作?如果我有一个 32GB 的文件怎么办?如果我在具有 256MB RAM 的 VM 上怎么办?映射这么大的文件,真的不是什么好事。
这个答案值得 -12 票。这将杀死任何将其用于大文件的人。
即使对于大文件,这也可以在 64 位 Python 上工作。即使文件是内存映射的,它也不会被读取到内存中,因此物理内存量可能比文件大小小得多。
@SavinoSguera 物理内存的大小与映射文件有关吗?
@V3ss0n:我尝试在 64 位 Python 上映射 32GB 文件。它可以工作(我的 RAM 小于 32GB):我可以使用序列和文件接口访问文件的开头、中间和结尾。
C
Community
f = ... # file-like object, i.e. supporting read(size) function and 
        # returning empty string '' when there is nothing to read

def chunked(file, chunk_size):
    return iter(lambda: file.read(chunk_size), '')

for data in chunked(f, 65536):
    # process the data

更新:该方法在 https://stackoverflow.com/a/4566523/38592 中得到了最好的解释


这适用于 blob,但可能不适用于行分隔的内容(如 CSV、HTML 等需要逐行处理的处理)
打扰一下。 f 的值是多少?
@user1, 可以打开('filename')
B
Boris Verkhovskiy

在 Python 3.8+ 中,您可以在 while 循环中使用 .read()

with open("somefile.txt") as f:
    while chunk := f.read(8192):
        do_something(chunk)

当然,您可以使用任何您想要的块大小,您不必使用 8192 (2**13) 个字节。除非您的文件大小恰好是您的块大小的倍数,否则最后一个块将小于您的块大小。


b
bruce

参考python的官方文档https://docs.python.org/3/library/functions.html#iter

也许这种方法更pythonic:

"""A file object returned by open() is a iterator with
read method which could specify current read's block size
"""
with open('mydata.db', 'r') as f_in:
    block_read = partial(f_in.read, 1024 * 1024)
    block_iterator = iter(block_read, '')

    for index, block in enumerate(block_iterator, start=1):
        block = process_block(block)  # process your block data

        with open(f'{index}.txt', 'w') as f_out:
            f_out.write(block)

布鲁斯是正确的。我使用 functools.partial 来解析视频流。使用 py;py3,我可以每秒解析超过 1GB。 ` for pkt in iter(partial(vid.read, PACKET_SIZE), b""):`
T
TonyCoolZhu

我想我们可以这样写:

def read_file(path, block_size=1024): 
    with open(path, 'rb') as f: 
        while True: 
            piece = f.read(block_size) 
            if piece: 
                yield piece 
            else: 
                return

for piece in read_file(path):
    process_piece(piece)

s
sinzi

由于我的声誉低,我不允许发表评论,但是使用 file.readlines([sizehint]) SilentGhosts 解决方案应该更容易

python file methods

编辑: SilentGhost 是对的,但这应该比:

s = "" 
for i in xrange(100): 
   s += file.next()

好吧,对不起,你是绝对正确的。但也许这个解决方案会让你更快乐 ;) : s = "" for i in xrange(100): s += file.next()
-1:糟糕的解决方案,这意味着每行在内存中创建一个新字符串,并将读取的整个文件数据复制到新字符串中。性能和内存最差。
为什么它将整个文件数据复制到一个新字符串中?来自 python 文档:为了使 for 循环成为循环文件行的最有效方式(一种非常常见的操作),next() 方法使用隐藏的预读缓冲区。
@sinzi:“s +=”或连接字符串每次都会创建一个新的字符串副本,因为字符串是不可变的,所以您正在创建一个新字符串。
@nosklo:这些是实现的细节,可以使用列表理解代替它
J
Jason Plank

我的情况有点类似。目前尚不清楚您是否知道块大小(以字节为单位);我通常不知道,但所需的记录(行)数是已知的:

def get_line():
     with open('4gb_file') as file:
         for i in file:
             yield i

lines_required = 100
gen = get_line()
chunk = [i for i, j in zip(gen, range(lines_required))]

更新:感谢 nosklo。这就是我的意思。它几乎可以工作,只是它在“块之间”丢失了一条线。

chunk = [next(gen) for i in range(lines_required)]

这个技巧不会丢失任何线条,但看起来不太好。


这是伪代码吗?它行不通。这也是不必要的混淆,您应该将行数作为 get_line 函数的可选参数。
S
Shrikant

您可以使用以下代码。

file_obj = open('big_file') 

open() 返回一个文件对象

然后使用 os.stat 获取大小

file_size = os.stat('big_file').st_size

for i in range( file_size/1024):
    print file_obj.read(1024)

如果大小不是 1024 的倍数,则不会读取整个文件