#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Finds CSV settings and Excel sheets in multiple files. Often needed as input for stacking
"""
import collections
import csv
import ntpath
import numpy as np
import pandas as pd
import openpyxl
import xlrd
from .helpers import check_valid_xls
#******************************************************************
# csv
#******************************************************************
[docs]def csv_count_rows(fname):
def blocks(files, size=65536):
while True:
b = files.read(size)
if not b: break
yield b
with open(fname) as f:
nrows = sum(bl.count("\n") for bl in blocks(f))
return nrows
[docs]class CSVSniffer(object):
"""
Automatically detects settings needed to read csv files. SINGLE file only, for MULTI file use CSVSnifferList
Args:
fname (string): file path
nlines (int): number of lines to sample from each file
delims (string): possible delimiters, default ',;\t|'
"""
def __init__(self, fname, nlines = 10, delims=',;\t|'):
self.cfg_fname = fname
self.nrows = csv_count_rows(fname) # todo: check for file size, if large don't run this
self.cfg_nlines = min(nlines,self.nrows) # read_lines() doesn't check EOF # todo: check 1% of file up to a max
self.cfg_delims_pool = delims
self.delim = None # delim used for the file
self.csv_lines = None # top n lines read from file
self.csv_lines_delim = None # detected delim for each line in file
self.csv_rows = None # top n lines split usingn delim
[docs] def read_nlines(self):
# read top lines
fhandle = open(self.cfg_fname)
self.csv_lines = [fhandle.readline().rstrip() for _ in range(self.cfg_nlines)]
fhandle.close()
[docs] def scan_delim(self):
if not self.csv_lines:
self.read_nlines()
# get delimiter for each line in file
delims = []
for line in self.csv_lines:
try:
csv_sniff = csv.Sniffer().sniff(line, self.cfg_delims_pool)
delims.append(csv_sniff.delimiter)
except:
delims.append(None) # todo: able to catch exception more specifically?
self.csv_lines_delim = delims
[docs] def get_delim(self):
if not self.csv_lines_delim:
self.scan_delim()
# all delimiters the same?
if len(set(self.csv_lines_delim))>1:
self.delim_is_consistent = False
csv_delim_count = collections.Counter(self.csv_lines_delim)
csv_delim = csv_delim_count.most_common(1)[0][0] # use the most common used delimeter
# todo: rerun on cfg_csv_scan_topline**2 files in case there is a large # of header rows
else:
self.delim_is_consistent = True
csv_delim = self.csv_lines_delim[0]
if csv_delim==None:
raise IOError('Could not determine a valid delimiter, pleaes check your files are .csv or .txt using one delimiter of %s' %(self.cfg_delims_pool))
else:
self.delim = csv_delim
self.csv_rows = [s.split(self.delim) for s in self.csv_lines][self.count_skiprows():]
if self.check_column_length_consistent():
self.certainty = 'high'
else:
self.certainty = 'probable'
return self.delim
[docs] def check_column_length_consistent(self):
# check if all rows have the same length. NB: this is just on the sample!
if not self.csv_rows:
self.get_delim()
return len(set([len(row) for row in self.csv_rows]))==1
[docs] def count_skiprows(self):
# finds the number of rows to skip by finding the last line which doesn't use the selected delimiter
if not self.delim:
self.get_delim()
if self.delim_is_consistent: # all delims the same so nothing to skip
return 0
l = [d != self.delim for d in self.csv_lines_delim]
l = list(reversed(l))
return len(l) - l.index(True)
[docs]class CSVSnifferList(object):
"""
Automatically detects settings needed to read csv files. MULTI file use
Args:
fname_list (list): file names, eg ['a.csv','b.csv']
nlines (int): number of lines to sample from each file
delims (string): possible delimiters, default ',;\t|'
"""
def __init__(self, fname_list, nlines = 10, delims=',;\t|'):
self.cfg_fname_list = fname_list
self.sniffers = [CSVSniffer(fname, nlines, delims) for fname in fname_list]
[docs] def get_all(self, fun_name, msg_error):
val = []
for sniffer in self.sniffers:
func = getattr(sniffer, fun_name)
val.append(func())
if len(set(val))>1:
raise NotImplementedError(msg_error+' Make sure all files have the same format')
# todo: want to raise an exception here...? or just use whatever got detected for each file?
else:
return val[0]
[docs] def get_delim(self):
return self.get_all('get_delim','Inconsistent delimiters detected!')
[docs] def count_skiprows(self):
return self.get_all('count_skiprows','Inconsistent skiprows detected!')
# todo: propagate status of individual sniffers. instead of raising exception pass back status to get user input
#******************************************************************
# xls
#******************************************************************
[docs]class XLSSniffer(object):
"""
Extracts available sheets from MULTIPLE Excel files and runs diagnostics
Args:
fname_list (list): file paths, eg ['dir/a.csv','dir/b.csv']
logger (object): logger object with send_log(), optional
"""
def __init__(self, fname_list, logger=None):
self.fname_list = fname_list
self.logger = logger
check_valid_xls(self.fname_list)
self.sniff()
[docs] def sniff(self):
"""
Executes sniffer
Returns:
boolean: True if everything ok. Results are accessible in ``.df_xls_sheets``
"""
xls_sheets = {}
for fname in self.fname_list:
if self.logger:
self.logger.send_log('sniffing sheets in '+ntpath.basename(fname),'ok')
xls_fname = {}
xls_fname['file_name'] = ntpath.basename(fname)
if fname[-5:]=='.xlsx':
fh = openpyxl.load_workbook(fname,read_only=True)
xls_fname['sheets_names'] = fh.sheetnames
fh.close()
# todo: need to close file?
elif fname[-4:]=='.xls':
fh = xlrd.open_workbook(fname, on_demand=True)
xls_fname['sheets_names'] = fh.sheet_names()
fh.release_resources()
else:
raise IOError('Only .xls or .xlsx files can be combined')
xls_fname['sheets_count'] = len(xls_fname['sheets_names'])
xls_fname['sheets_idx'] = np.arange(xls_fname['sheets_count']).tolist()
xls_sheets[fname] = xls_fname
self.xls_sheets = xls_sheets
df_xls_sheets = pd.DataFrame(xls_sheets).T
df_xls_sheets.index.names = ['file_path']
self.dict_xls_sheets = xls_sheets
self.df_xls_sheets = df_xls_sheets
return True
[docs] def all_contain_sheetname(self,sheet_name):
"""
Check if all files contain a certain sheet
Args:
sheet_name (string): sheetname to check
Returns:
boolean: If true
"""
return np.all([sheet_name in self.dict_xls_sheets[fname]['sheets_names'] for fname in self.fname_list])
[docs] def all_have_idx(self,sheet_idx):
"""
Check if all files contain a certain index
Args:
sheet_idx (string): index to check
Returns:
boolean: If true
"""
return np.all([sheet_idx<=(d['sheets_count']-1) for k,d in self.dict_xls_sheets.items()])
[docs] def all_same_count(self):
"""
Check if all files contain the same number of sheets
Args:
sheet_idx (string): index to check
Returns:
boolean: If true
"""
first_elem = next(iter(self.dict_xls_sheets.values()))
return np.all([first_elem['sheets_count']==d['sheets_count'] for k,d in self.dict_xls_sheets.items()])
[docs] def all_same_names(self):
first_elem = next(iter(self.dict_xls_sheets.values()))
return np.all([first_elem['sheets_names']==d['sheets_names'] for k,d in self.dict_xls_sheets.items()])