Python对大数据文件的读取

2021-02-01

< view all posts

Python用来做数据分析是十分方便的,不论是程序员还是金融或学术领域的研究者都很喜欢使用它。但是当Python遇到海量数据的时候,语言性能上的局限性就会逐渐浮现。首先,读取数据都会成为一个问题,因为对于庞大的数据文件,机器内存有时候都无法存下,或者可以存下但是速度十分缓慢。这篇笔记就讨论一些Python对于大文件读取的问题。

通常来说对于数据分析任务,我们有三种方式读取数据:Python原生的文件接口、numpy和pandas,下面分别进行说明。

*更新一下,再补充一个库Datatable,这是一个和pandas类似而速度更快的库。经过测试确实有明显的性能提升,适合用作大数据处理。

1 原生方式

首先通过Python原生的方式进行读取,总的来说有三种处理的思路:逐行读取、分chunk读取、使用memory map。

逐行读取

逐行读取适用于不需要考虑各行之间关联性的情况,最典型的,比如说读取一行数据,然后生成一行对应新数据,保存为一个新的文件。利用file object的generator就可以保证内存中每次只有一行的数据:

import os
for root, dir, files in os.walk('/path/to/files/'):
    for filename in files:
        with open(os.path.join(root, filename)) as f:
            for line in f:
                # process line here

这里顺便提一下,上面的代码里面展示了如何读取多个文件,因为有时候我们需要一起处理复数个文件。个人感觉用os.walk()os.listdir()更加清晰。os.walk()返回的是一个generator,当中的每一项是一个tuple 3,代表一个目录,默认情况下目录的顺序是从顶层到底层。因此上面的代码就可以遍历到所有的子目录中的文件。

如果只希望读顶层目录里的文件,只需要改成:

root, dir, files = next(os.walk('/path/'))

就算是只需要读顶层文件,相比于os.listdir(),用os.walk()也会更方便一些,因为它的返回结果很好地区分出了文件和文件夹,不需要我们再做更多的判断。

分chunk读取

对于大文件,一次需要读取一部分,但又无法一次全部读取到内存的,可以分chunk读取。只需要在.read(n)中提供需要读取的字节数即可。如果读取的是ASCII字符,那么n是多少就会读取多少字母。如果读取的是中文这种多字节的字符,那么就需要使用'rb'模式,首先读取二进制流,然后.decode()为相应的编码。

def chunk_generator(size):
    with open('test.txt') as f:
        chunk = f.read(size)
        yield chunk
        while chunk:
            chunk = f.read(size)
            yield chunk

在Python 3.8及以上的版本中,可以使用海象运算符:=的语法(将值赋给一个表达式中的变量)。

def chunk_generator(size):
    with open('test.txt') as f:
        while chunk := f.read(size):
            yield chunk

在使用时可以用for循环依次载入处理各个chunk:

for c in chunk_generator(1000000):
    process(c)

或者通过next()一次读取一个:

chunk_iter = chunk_generator(1000000)
c = next(chunk_iter)

另一个函数f.readlines(n)和f.read()很类似,n也代表读取的字节数(注意不是行数),区别只在于readlines()会保证读的都是完整的行,会返回一个字符串数组。而还有一个函数f.readline()则是每调用一次就返回一行,注意不要混淆。

Memory Map

Memory Map将一段硬盘空间映射为内存空间,典型适用场景是对大文件内容的搜索:无须将文件读入内存就能够进行查找,且支持re模块(需要用byte regex)。Memory Map也能够直接操作和修改硬盘文件,不过因为改动是字节级别的,需要格外的小心。

下面举一个具体的例子,假设现在有这样一个需求,需要快速地读取某个文件的随机指定行,而这个文件非常大,无法存到内存当中。Memory Map在这个时候就非常的好用。我们首先将硬盘上的文件映射到内存,之后用正则匹配所有的行(注意正则表达式的前缀是rb,表示字节匹配),将每行开头和结尾的位置保存到一个数组中。比起存整个文件,存这个数组需要的空间是很小的。而当要读取某行的数据时,根据位置去slice映射就可以了。

import mmap
import re
with open("test.txt", 'r+b') as f:
    # 第二个参数是要映射是大小,0表示映射整个文件
    mm = mmap.mmap(f.fileno(), 0)
    span_list = [match.span() for match in re.finditer(rb'(.+\n)', mm)]
    # 例如读取第10000行
    left, right = span_list[10000]
    print(mm[left:right])
    mm.close()

2 通过numpy

numpy提供了两个从文件读取的方法,np.loadtxt()和np.genfromtxt()。这两个方法的区别在于,genfromtxt()提供了更多的选项,例如可以指定对缺失数据的处理方式等。而loadtxt()则适用于更加简单的数据,会快一些。

这两个方法的输入参数都需要是文件(字节流或路径),是不接受字符串的。因此在分chunk读取时,不能先用f.readlines()之类的方式读成字符串再调用它们(除非再将字符串转换成StringIO,但这样就会占用两倍的内存)。我们可以用itertools.islice()直接对文件进行切分,以loadtxt()为例:

from itertools import islice
def chunk_generator(size):
    with open('test.txt') as f:
        while True:
            chunk = islice(f, size)
            ary = np.loadtxt(chunk, delimiter = ',')
            if len(ary) == 0:
                break
            else:
                yield ary

但是总的来说,numpy读取数据的性能比pandas慢不少,所以在读取大数据文件的时候通常用pandas是更好的方法。

3 通过pandas

抽样读取

pandas读大数据文件有两种思路:抽样和分chunk。首先来看抽样读取,也就是随机地读一部分数据。适用于分析数据分布等需要了解数据整体情况的场景:

import pandas as pd
import random

sample_rate = 0.1
df = pd.read_csv('test.csv', delimiter = '\t', error_bad_lines = False,
                 header = 0, skiprows = lambda i: i > 0 and random.random()>sample_rate)

在上面的例子中,我们从源文件中随机读取10%的记录到内存。这是通过skiprows参数提供一个callable实现的。其它一些设置包括:error_bad_lines设置为False使得格式异常的数据直接被跳过(而不是抛出异常);header参数用来指定列名所在的行。

还有一点需要注意的是,pd.read_csv()默认会使用速度较快的C引擎,但如果使用了某些C引擎不支持的设置,就会换成python引擎执行。为了避免fallback到python引擎,主要需要注意两点:一是不要使用"skipfooter"参数;二是"sep"参数的值不要多于一个字符,也不要用None:一个字符以上的sep会被认为是正则表达式,而None则表示由引擎猜测分隔符,都只能用python引擎执行。

分chunk读取

因为pandas内置了分chunk读取的功能,使用起来很方便:

import pandas as pd

chunksize = 1000000
with pd.read_csv('text.csv', chunksize=chunksize) as reader:
    for chunk in reader:
        process(chunk)