使用熊猫的 “大数据” 工作流程

在学习熊猫的过程中,我试图迷惑了这个问题很多月。我在日常工作中使用 SAS,这非常有用,因为它提供了核心支持。但是,由于许多其他原因,SAS 作为一个软件还是很糟糕的。

有一天,我希望用 python 和 pandas 取代我对 SAS 的使用,但是我目前缺少大型数据集的核心工作流程。我不是在说需要分布式网络的 “大数据”,而是文件太大而无法容纳在内存中,但又足够小而无法容纳在硬盘上。

我的第一个想法是使用HDFStore将大型数据集保存在磁盘上,然后仅将我需要的部分放入数据帧中进行分析。其他人则提到 MongoDB 是一种更易于使用的替代方案。我的问题是这样的:

什么是实现以下目标的最佳实践工作流:

  1. 将平面文件加载到永久的磁盘数据库结构中
  2. 查询该数据库以检索要输入到熊猫数据结构中的数据
  3. 处理熊猫中的片段后更新数据库

现实世界中的示例将不胜感激,尤其是那些使用 “大数据” 中的熊猫的人。

编辑 - 我希望如何工作的示例:

  1. 迭代导入一个大型平面文件,并将其存储在永久的磁盘数据库结构中。这些文件通常太大而无法容纳在内存中。
  2. 为了使用 Pandas,我想读取此数据的子集(通常一次只读取几列),使其适合内存。
  3. 我将通过对所选列执行各种操作来创建新列。
  4. 然后,我将不得不将这些新列添加到数据库结构中。

我正在尝试找到执行这些步骤的最佳实践方法。阅读有关熊猫和 pytables 的链接,似乎添加一个新列可能是个问题。

编辑 - 专门回答杰夫的问题:

  1. 我正在建立消费者信用风险模型。数据类型包括电话,SSN 和地址特征;财产价值;犯罪记录,破产等贬义信息。我每天使用的数据集平均有近 1,000 到 2,000 个字段,这些字段是混合数据类型:数字和字符数据的连续,名义和有序变量。我很少追加行,但是我确实执行了许多创建新列的操作。
  2. 典型的操作涉及使用条件逻辑将几个列合并到一个新的复合列中。例如, if var1 > 2 then newvar = 'A' elif var2 = 4 then newvar = 'B'if var1 > 2 then newvar = 'A' elif var2 = 4 then newvar = 'B' 。这些操作的结果是数据集中每个记录的新列。
  3. 最后,我想将这些新列添加到磁盘数据结构中。我将重复步骤 2,使用交叉表和描述性统计数据探索数据,以寻找有趣的直观关系进行建模。
  4. 一个典型的项目文件通常约为 1GB。文件以这样的方式组织:行包含消费者数据记录。每条记录的每一行都有相同的列数。情况总是如此。
  5. 创建新列时,我会按行进行子集化是非常罕见的。但是,在创建报告或生成描述性统计信息时,对行进行子集化是很常见的。例如,我可能想为特定业务创建一个简单的频率,例如零售信用卡。为此,除了我要报告的任何列之外,我将只选择那些业务线 = 零售的记录。但是,在创建新列时,我将拉出所有数据行,并且仅提取操作所需的列。
  6. 建模过程要求我分析每一列,寻找与某些结果变量有关的有趣关系,并创建描述这些关系的新复合列。我探索的列通常以小集形式完成。例如,我将集中介绍一组仅涉及属性值的 20 个列,并观察它们与贷款违约的关系。一旦探索了这些列并创建了新的列,我便转到另一组列,例如大学学历,并重复该过程。我正在做的是创建候选变量,这些变量解释我的数据和某些结果之间的关系。在此过程的最后,我应用了一些学习技术,这些技术可以根据这些复合列创建一个方程式。

我很少向数据集添加行。我几乎总是会创建新列(统计 / 机器学习术语中的变量或功能)。

答案

import numpy as np
import pandas as pd

# create a store
store = pd.HDFStore('mystore.h5')

# this is the key to your storage:
#    this maps your fields to a specific group, and defines 
#    what you want to have as data_columns.
#    you might want to create a nice class wrapping this
#    (as you will want to have this map and its inversion)  
group_map = dict(
    A = dict(fields = ['field_1','field_2',.....], dc = ['field_1',....,'field_5']),
    B = dict(fields = ['field_10',......        ], dc = ['field_10']),
    .....
    REPORTING_ONLY = dict(fields = ['field_1000','field_1001',...], dc = []),

)

group_map_inverted = dict()
for g, v in group_map.items():
    group_map_inverted.update(dict([ (f,g) for f in v['fields'] ]))
for f in files:
   # read in the file, additional options hmay be necessary here
   # the chunksize is not strictly necessary, you may be able to slurp each 
   # file into memory in which case just eliminate this part of the loop 
   # (you can also change chunksize if necessary)
   for chunk in pd.read_table(f, chunksize=50000):
       # we are going to append to each table by group
       # we are not going to create indexes at this time
       # but we *ARE* going to create (some) data_columns

       # figure out the field groupings
       for g, v in group_map.items():
             # create the frame for this group
             frame = chunk.reindex(columns = v['fields'], copy = False)    

             # append it
             store.append(g, frame, index=False, data_columns = v['dc'])
frame = store.select(group_that_I_want)
# you can optionally specify:
# columns = a list of the columns IN THAT GROUP (if you wanted to
#     select only say 3 out of the 20 columns in this sub-table)
# and a where clause if you want a subset of the rows

# do calculations on this frame
new_frame = cool_function_on_frame(frame)

# to 'add columns', create a new group (you probably want to
# limit the columns in this new_group to be only NEW ones
# (e.g. so you don't overlap from the other tables)
# add this info to the group_map
store.append(new_group, new_frame.reindex(columns = new_columns_created, copy = False), data_columns = new_columns_created)
# This may be a bit tricky; and depends what you are actually doing.
# I may need to modify this function to be a bit more general:
report_data = store.select_as_multiple([groups_1,groups_2,.....], where =['field_1>0', 'field_1000=foo'], selector = group_1)
store.select(group, where = ['field_1000=foo', 'field_1001>0'])

我认为以上答案都缺少一种我发现非常有用的简单方法。

当我的文件太大而无法加载到内存中时,我将该文件分成多个较小的文件(按行或列)

示例:如果有 30 天的〜30GB 大小的交易数据值得每天将其拆分为一个〜1GB 大小的文件。随后,我分别处理每个文件,并在最后汇总结果

最大的优点之一是,它允许并行处理文件(多个线程或多个进程)

另一个优点是文件操作(如示例中的添加 / 删除日期)可以通过常规的 shell 命令完成,而在更高级 / 更复杂的文件格式中则无法实现

这种方法无法涵盖所有情况,但在许多情况下非常有用

问题提出两年后,现在出现了一个 “核心外” 的熊猫: dask 。太好了!尽管它不支持所有熊猫功能,但是您可以真正做到这一点。

如果您的数据集介于 1 到 20GB 之间,则应该获得具有 48GB RAM 的工作站。然后,熊猫可以将整个数据集保存在 RAM 中。我知道这不是您在这里寻找的答案,但是在具有 4GB RAM 的笔记本电脑上进行科学计算是不合理的。

我知道这是一个旧线程,但是我认为Blaze库值得一试。它是针对此类情况而构建的。

从文档:

Blaze 将 NumPy 和 Pandas 的可用性扩展到分布式和核外计算。 Blaze 提供了类似于 NumPy ND-Array 或 Pandas DataFrame 的接口,但是将这些熟悉的接口映射到了其他各种计算引擎上,例如 Postgres 或 Spark。

编辑:顺便说一下,它由 ContinuumIO 和 NumPy 的作者 Travis Oliphant 支持。

aCollection.insert((a[1].to_dict() for a in df.iterrows()))
pd.DataFrame(list(mongoCollection.find({'anAttribute':{'$gt':2887000, '$lt':2889000}})))
aJoinDF = pandas.DataFrame(list(mongoCollection.find({'anAttribute':{'$in':Att_Keys}})))
df = pandas.merge(df, aJoinDF, on=aKey, how='left')
collection.update({primarykey:foo},{key:change})
def transpose_table(h_in, table_path, h_out, group_name="data", group_path="/"):
    # Get a reference to the input data.
    tb = h_in.getNode(table_path)
    # Create the output group to hold the columns.
    grp = h_out.createGroup(group_path, group_name, filters=tables.Filters(complevel=1))
    for col_name in tb.colnames:
        logger.debug("Processing %s", col_name)
        # Get the data.
        col_data = tb.col(col_name)
        # Create the output array.
        arr = h_out.createCArray(grp,
                                 col_name,
                                 tables.Atom.from_dtype(col_data.dtype),
                                 col_data.shape)
        # Store the data.
        arr[:] = col_data
    h_out.flush()
def read_hdf5(hdf5_path, group_path="/data", columns=None):
    """Read a transposed data set from a HDF5 file."""
    if isinstance(hdf5_path, tables.file.File):
        hf = hdf5_path
    else:
        hf = tables.openFile(hdf5_path)

    grp = hf.getNode(group_path)
    if columns is None:
        data = [(child.name, child[:]) for child in grp]
    else:
        data = [(child.name, child[:]) for child in grp if child.name in columns]

    # Convert any float32 columns to float64 for processing.
    for i in range(len(data)):
        name, vec = data[i]
        if vec.dtype == np.float32:
            data[i] = (name, vec.astype(np.float64))

    if not isinstance(hdf5_path, tables.file.File):
        hf.close()
    return pd.DataFrame.from_items(data)
>>> df = pd.DataFrame(np.random.randn(int(1e8), 5))
>>> df.info()
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 100000000 entries, 0 to 99999999
Data columns (total 5 columns):
...
dtypes: float64(5)
memory usage: 3.7 GB

>>> df.astype(np.float32).info()
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 100000000 entries, 0 to 99999999
Data columns (total 5 columns):
...
dtypes: float32(5)
memory usage: 1.9 GB
import dask.dataframe as dd
df = dd.read_csv('2015-*-*.csv')
df.groupby(df.user_id).value.mean().compute()
import pandas as pd
df = pd.read_csv('2015-01-01.csv')
df.groupby(df.user_id).value.mean()
from dask.distributed import Client
client = Client('scheduler:port')

futures = []
for fn in filenames:
    future = client.submit(load, fn)
    futures.append(future)

summary = client.submit(summarize, futures)
summary.result()

另一种变化

在熊猫中完成的许多操作也可以作为 db 查询来完成(sql,mongo)

使用 RDBMS 或 mongodb 可以使您在数据库查询中执行某些聚合(针对大型数据进行了优化,并有效地使用了缓存和索引)

以后,您可以使用熊猫进行后期处理。

这种方法的优势在于,您可以在处理大型数据时获得数据库优化,同时仍可以使用高级声明性语法定义逻辑 - 无需处理决定在内存中做什么和做什么的细节。的核心。

尽管查询语言和熊猫语言不同,但是将部分逻辑从一个逻辑转换到另一个逻辑通常并不复杂。