# Source code for d6t.stack.stack_csv

import os
import ntpath

import numpy as np
import pandas as pd

from .helpers import *
from .helpers_ui import *

#******************************************************************
# combiner
#******************************************************************

def sniff_settings_csv(fname_list):
    """
    Sniff CSV read settings from a list of files using `CSVSnifferList`.

    Args:
        fname_list (list): file names, eg ['a.csv','b.csv']

    Returns:
        csv_sniff (dict): sniffed settings with keys

            delim: value of `CSVSnifferList.get_delim()`
            skiprows: value of `CSVSnifferList.count_skiprows()`
            has_header: value of `CSVSnifferList.has_header()`
            header: 0 if `has_header()` is truthy, else None
                (presumably meant for the `header` argument of pandas.read_csv() - confirm with callers)

    """
    sniffer = CSVSnifferList(fname_list)
    return {
        'delim': sniffer.get_delim(),
        'skiprows': sniffer.count_skiprows(),
        'has_header': sniffer.has_header(),
        'header': 0 if sniffer.has_header() else None,
    }
class CombinerCSV(object):
    """
    Core combiner class. Checks columns, generates preview, combines.

    Args:
        fname_list (list): file names, eg ['a.csv','b.csv']
        sep (string): CSV delimiter, see pandas.read_csv()
        all_strings (boolean): read all values as strings (faster)
        header_row (int): header row, see pandas.read_csv()
        skiprows (int): rows to skip at top of file, see pandas.read_csv()
        nrows_preview (int): number of rows in preview
        logger (object): logger object with send_log()

    """

    def __init__(self, fname_list, sep=',', all_strings = False, header_row = 0, skiprows=0, nrows_preview=5, logger=None):
        self.fname_list = fname_list
        self.sep = sep
        self.all_strings = all_strings
        self.header_row = header_row
        self.skiprows = skiprows
        self.nrows_preview = nrows_preview
        self.logger = logger

    def read_csv(self, fname, is_preview=False, chunksize=None):
        """
        Read a single file with the settings stored on the combiner.

        Args:
            fname (string): file name
            is_preview (boolean): read only the top `self.nrows_preview` rows
            chunksize (int): rows per chunk, see pandas.read_csv()

        Returns:
            result of pandas.read_csv(): a dataframe, or an iterator over dataframe chunks if `chunksize` is set

        """
        cfg_dype = str if self.all_strings else None
        cfg_nrows = self.nrows_preview if is_preview else None
        return pd.read_csv(fname, dtype=cfg_dype, sep=self.sep, header=self.header_row,
                           skiprows=self.skiprows, nrows=cfg_nrows, chunksize=chunksize)

    def read_csv_all(self, msg=None, is_preview=False, chunksize=None, cfg_col_sel=None, cfg_col_rename=None):
        """
        Read every file in `self.fname_list` into its own dataframe.

        Each dataframe gets a `filename` column holding the base name of the file it came from.

        Args:
            msg (string): log message prefix, sent to `self.logger` together with the file base name
            is_preview (boolean): read only the top `self.nrows_preview` rows of each file
            chunksize (int): read each file in chunks of this many rows; chunks are concatenated per file
            cfg_col_sel (list): columns to keep; missing columns are added filled with nans
            cfg_col_rename (dict): mapping old name -> new name, applied after column selection

        Returns:
            dfl_all (list): one dataframe per file, in the order of `self.fname_list`

        """
        # avoid a shared mutable default argument
        if cfg_col_rename is None:
            cfg_col_rename = {}

        dfl_all = []
        for fname in self.fname_list:
            if self.logger and msg:
                self.logger.send_log(msg+' '+ntpath.basename(fname),'ok')
            df = self.read_csv(fname, is_preview=is_preview, chunksize=chunksize)
            if chunksize is not None:
                # with chunksize read_csv() returns an iterator of chunks, not a dataframe;
                # materialize it so the column operations below work
                df = pd.concat(list(df))
            df['filename'] = ntpath.basename(fname)
            if cfg_col_sel:
                df = df.reindex(columns=['filename']+cfg_col_sel)
            df = df.rename(columns=cfg_col_rename)
            dfl_all.append(df)

        return dfl_all

    def preview_columns(self):
        """
        Checks column consistency in list of files. It checks both presence and order of columns in all files

        Returns:
            col_preview (dict): results dictionary with
                files_columns (dict): dictionary with information, keys = filename, value = list of columns in file
                columns_all (list): all columns in files
                columns_common (list): only columns present in every file
                is_all_equal (boolean): are the columns equal in all files?
                df_columns_present (dataframe): which columns are present in which file?
                df_columns_order (dataframe): where in the file is the column?

        """
        dfl_all = self.read_csv_all(msg='scanning colums of', is_preview=True)
        dfl_all_col = [df.columns.tolist() for df in dfl_all]
        for col_list in dfl_all_col:
            # 'filename' is added by read_csv_all(), drop it from the column listing
            col_list.remove('filename')
        col_files = dict(zip(self.fname_list, dfl_all_col))
        col_common = list_common(list(col_files.values()))
        col_all = list_unique(list(col_files.values()))

        index_preview = ['filename'] + col_all
        df_col_present = {}
        df_col_order = {}
        for iFileName, iFileCol in col_files.items():
            fname_base = ntpath.basename(iFileName)
            # is the column present in this file?
            df_col_present[iFileName] = [fname_base] + [iCol in iFileCol for iCol in col_all]
            # find index in column list so can check order is correct
            df_col_order[iFileName] = [fname_base] + [iFileCol.index(iCol) if iCol in iFileCol else np.nan for iCol in col_all]

        # one row per file, one column per entry of index_preview
        df_col_present = pd.DataFrame(df_col_present, index=index_preview).T
        df_col_present.index.names = ['file_path']
        df_col_order = pd.DataFrame(df_col_order, index=index_preview).T

        col_preview = {'files_columns': col_files,
                       'columns_all': col_all,
                       'columns_common': col_common,
                       'is_all_equal': columns_all_equal(dfl_all_col),
                       'df_columns_present': df_col_present,
                       'df_columns_order': df_col_order}
        self.col_preview = col_preview

        return col_preview

    def combine_preview(self, is_col_common = False):
        """
        Preview of combining all files

        Note:
            Unlike `CombinerCSVAdvanced.combine()` this function only supports simple combine operations

        Args:
            is_col_common (bool): keep only common columns? If `false` returns all columns filled with nans

        Returns:
            df_all (dataframe): pandas dataframe with combined data from all files, only self.nrows_preview top rows of each file

        """
        return self.combine(is_col_common, is_preview=True)

    def combine(self, is_col_common = False, is_preview=False):
        """
        Combines all files

        Note:
            Unlike `CombinerCSVAdvanced.combine()` this function only supports simple combine operations

        Args:
            is_col_common (bool): keep only common columns? If `false` returns all columns filled with nans
            is_preview (bool): read only self.nrows_preview top rows

        Returns:
            df_all (dataframe): pandas dataframe with combined data from all files

        """
        dfl_all = self.read_csv_all('reading full file', is_preview=is_preview)

        if self.logger:
            self.logger.send_log('combining files','ok')

        if is_col_common:
            # inner join keeps only the columns present in every file
            df_all = pd.concat(dfl_all, join='inner')
        else:
            df_all = pd.concat(dfl_all)

        self.df_all = df_all

        return df_all
class CombinerCSVAdvanced(object):
    """
    Combiner with column selection and renaming, built on top of a combiner object.

    Args:
        combiner (object): combiner providing `fname_list`, `logger`, `read_csv()` and `read_csv_all()`, eg `CombinerCSV`
        cfg_col_sel (list): columns to keep; a `filename` column is always prepended
        cfg_col_rename (dict): mapping old name -> new name, applied after column selection

    """

    def __init__(self, combiner, cfg_col_sel, cfg_col_rename=None):
        self.combiner = combiner
        self.cfg_col_sel = cfg_col_sel
        # fresh dict per instance instead of a shared mutable default argument
        self.cfg_col_rename = {} if cfg_col_rename is None else cfg_col_rename

    def combine_preview(self):
        """
        Preview of combining all files with column selection and renaming applied.

        Returns:
            df_all (dataframe): combined preview rows of all files

        """
        df_all = self.combiner.read_csv_all(msg='reading preview file', is_preview=True,
                                            cfg_col_sel=self.cfg_col_sel, cfg_col_rename=self.cfg_col_rename)
        df_all = pd.concat(df_all)
        return df_all

    def combine_preview_save(self, fname_out):
        """
        Save the result of `combine_preview()` to a CSV file, without the index.

        Args:
            fname_out (string): output file name

        Returns:
            True

        """
        df_all_preview = self.combine_preview()
        df_all_preview.to_csv(fname_out,index=False)
        return True

    def combine(self):
        """
        Combines all files with column selection and renaming applied.

        Returns:
            df_all (dataframe): combined data from all files

        """
        df_all = self.combiner.read_csv_all(msg='reading full file',
                                            cfg_col_sel=self.cfg_col_sel, cfg_col_rename=self.cfg_col_rename)
        df_all = pd.concat(df_all)
        return df_all

    def combine_save(self, fname_out):
        """
        Combines all files chunk by chunk and writes the result to a CSV file.

        The output directory is created if it does not exist. The header row uses the
        renamed column names so the file matches the columns returned by `combine()`.

        Args:
            fname_out (string): output file name

        Returns:
            True

        """
        cfg_col_sel = ['filename']+self.cfg_col_sel

        # dirname is '' for a bare file name; os.makedirs('') would raise
        dir_out = os.path.dirname(fname_out)
        if dir_out and not os.path.exists(dir_out):
            os.makedirs(dir_out)

        # context manager so the file is flushed and closed even if a read fails
        with open(fname_out,'w') as fhandle:
            # write header, with renames applied because chunks are written without header
            df_all_header = pd.DataFrame(columns=cfg_col_sel).rename(columns=self.cfg_col_rename)
            df_all_header.to_csv(fhandle,header=True,index=False)
            # todo: what if file hasn't header

            for fname in self.combiner.fname_list:
                if self.combiner.logger:
                    self.combiner.logger.send_log('processing '+ntpath.basename(fname),'ok')
                for df_chunk in self.combiner.read_csv(fname,chunksize=100000):
                    df_chunk['filename'] = ntpath.basename(fname)
                    df_chunk = df_chunk.reindex(columns=cfg_col_sel)  # todo: only reindex if need be
                    df_chunk.to_csv(fhandle,header=False,index=False)

        return True