2021-02-01
< view all postsPython用来做数据分析是十分方便的,不论是程序员还是金融或学术领域的研究者都很喜欢使用它。但是当Python遇到海量数据的时候,语言性能上的局限性就会逐渐浮现。首先,读取数据都会成为一个问题,因为对于庞大的数据文件,机器内存有时候都无法存下,或者可以存下但是速度十分缓慢。这篇笔记就讨论一些Python对于大文件读取的问题。
通常来说对于数据分析任务,我们有三种方式读取数据:Python原生的文件接口、numpy和pandas,下面分别进行说明。
*更新一下,再补充一个库Datatable,这是一个和pandas类似而速度更快的库。经过测试确实有明显的性能提升,适合用作大数据处理。
首先通过Python原生的方式进行读取,总的来说有三种处理的思路:逐行读取、分chunk读取、使用memory map。
逐行读取适用于不需要考虑各行之间关联性的情况,最典型的,比如说读取一行数据,然后生成一行对应新数据,保存为一个新的文件。利用file object的generator就可以保证内存中每次只有一行的数据:
import os for root, dir, files in os.walk('/path/to/files/'): for filename in files: with open(os.path.join(root, filename)) as f: for line in f: # process line here
这里顺便提一下,上面的代码里面展示了如何读取多个文件,因为有时候我们需要一起处理复数个文件。个人感觉用os.walk()比os.listdir()更加清晰。os.walk()返回的是一个generator,当中的每一项是一个tuple 3,代表一个目录,默认情况下目录的顺序是从顶层到底层。因此上面的代码就可以遍历到所有的子目录中的文件。
如果只希望读顶层目录里的文件,只需要改成:
root, dir, files = next(os.walk('/path/'))
就算是只需要读顶层文件,相比于os.listdir(),用os.walk()也会更方便一些,因为它的返回结果很好地区分出了文件和文件夹,不需要我们再做更多的判断。
对于大文件,一次需要读取一部分,但又无法一次全部读取到内存的,可以分chunk读取。只需要在.read(n)中提供需要读取的字节数即可。如果读取的是ASCII字符,那么n是多少就会读取多少字母。如果读取的是中文这种多字节的字符,那么就需要使用'rb'模式,首先读取二进制流,然后.decode()为相应的编码。
def chunk_generator(size): with open('test.txt') as f: chunk = f.read(size) yield chunk while chunk: chunk = f.read(size) yield chunk
在Python 3.8及以上的版本中,可以使用海象运算符:=的语法(将值赋给一个表达式中的变量)。
def chunk_generator(size): with open('test.txt') as f: while chunk := f.read(size): yield chunk
在使用时可以用for循环依次载入处理各个chunk:
for c in chunk_generator(1000000): process(c)
或者通过next()一次读取一个:
chunk_iter = chunk_generator(1000000) c = next(chunk_iter)
另一个函数f.readlines(n)和f.read()很类似,n也代表读取的字节数(注意不是行数),区别只在于readlines()会保证读的都是完整的行,会返回一个字符串数组。而还有一个函数f.readline()则是每调用一次就返回一行,注意不要混淆。
Memory Map将一段硬盘空间映射为内存空间,典型适用场景是对大文件内容的搜索:无须将文件读入内存就能够进行查找,且支持re模块(需要用byte regex)。Memory Map也能够直接操作和修改硬盘文件,不过因为改动是字节级别的,需要格外的小心。
下面举一个具体的例子,假设现在有这样一个需求,需要快速地读取某个文件的随机指定行,而这个文件非常大,无法存到内存当中。Memory Map在这个时候就非常的好用。我们首先将硬盘上的文件映射到内存,之后用正则匹配所有的行(注意正则表达式的前缀是rb,表示字节匹配),将每行开头和结尾的位置保存到一个数组中。比起存整个文件,存这个数组需要的空间是很小的。而当要读取某行的数据时,根据位置去slice映射就可以了。
import mmap import re with open("test.txt", 'r+b') as f: # 第二个参数是要映射是大小,0表示映射整个文件 mm = mmap.mmap(f.fileno(), 0) span_list = [match.span() for match in re.finditer(rb'(.+\n)', mm)] # 例如读取第10000行 left, right = span_list[10000] print(mm[left:right]) mm.close()
numpy提供了两个从文件读取的方法,np.loadtxt()和np.genfromtxt()。这两个方法的区别在于,genfromtxt()提供了更多的选项,例如可以指定对缺失数据的处理方式等。而loadtxt()则适用于更加简单的数据,会快一些。
这两个方法的输入参数都需要是文件(字节流或路径),是不接受字符串的。因此在分chunk读取时,不能先用f.readlines()之类的方式读成字符串再调用它们(除非再将字符串转换成StringIO,但这样就会占用两倍的内存)。我们可以用itertools.islice()直接对文件进行切分,以loadtxt()为例:
from itertools import islice def chunk_generator(size): with open('test.txt') as f: while True: chunk = islice(f, size) ary = np.loadtxt(chunk, delimiter = ',') if len(ary) == 0: break else: yield ary
但是总的来说,numpy读取数据的性能比pandas慢不少,所以在读取大数据文件的时候通常用pandas是更好的方法。
pandas读大数据文件有两种思路:抽样和分chunk。首先来看抽样读取,也就是随机地读一部分数据。适用于分析数据分布等需要了解数据整体情况的场景:
import pandas as pd import random sample_rate = 0.1 df = pd.read_csv('test.csv', delimiter = '\t', error_bad_lines = False, header = 0, skiprows = lambda i: i > 0 and random.random()>sample_rate)
在上面的例子中,我们从源文件中随机读取10%的记录到内存。这是通过skiprows参数提供一个callable实现的。其它一些设置包括:error_bad_lines设置为False使得格式异常的数据直接被跳过(而不是抛出异常);header参数用来指定列名所在的行。
还有一点需要注意的是,pd.read_csv()默认会使用速度较快的C引擎,但如果使用了某些C引擎不支持的设置,就会换成python引擎执行。为了避免fallback到python引擎,主要需要注意两点:一是不要使用"skipfooter"参数;二是"sep"参数的值不要多于一个字符,也不要用None:一个字符以上的sep会被认为是正则表达式,而None则表示由引擎猜测分隔符,都只能用python引擎执行。
因为pandas内置了分chunk读取的功能,使用起来很方便:
import pandas as pd chunksize = 1000000 with pd.read_csv('text.csv', chunksize=chunksize) as reader: for chunk in reader: process(chunk)