"""
Methods for feature extraction and preprocessing
util_feature: input/output is pandas
"""

import copy
import math
import os
from collections import Counter, OrderedDict

import numpy as np
import pandas as pd
import scipy as sci
from sklearn.cluster import KMeans

########### LOCAL ##################################################################################
class dict2:
    """Attribute-style view over a dictionary.

    The instance namespace is bound to the dictionary passed in (no copy is
    made), so keys are readable as attributes and changes on either side are
    visible on the other.
    """

    def __init__(self, d):
        # Bind, do not copy: the instance shares state with ``d``.
        self.__dict__ = d

def pd_col_to_onehot(dfref, colname=None, colonehot=None, return_val="dataframe,column"):
    """One-hot encode columns of a dataframe, working on a deep copy.

    Columns with more than two distinct values are replaced by their
    ``pd.get_dummies`` columns (prefixed with the column name); the other
    columns are factorized in place into integer codes.

    :param dfref: input dataframe (not modified)
    :param colname: columns to encode; all columns when None
    :param colonehot: one-hot column names from a previous run; any that are
        missing are added and filled with 0, and exactly these are returned
    :param return_val: "dataframe,param" returns ``(dataframe, columns)``;
        any other value returns the dataframe only
    :return: selected columns of the encoded frame (optionally with names)

    Note: when ``colonehot`` is None the selection is every column whose name
    is not in ``colname``; columns factorized in place keep their name and
    are therefore not part of the returned frame.
    """
    frame = copy.deepcopy(dfref)
    colname = list(frame.columns) if colname is None else colname

    # Encode each column into OneHot
    for col in colname:
        try:
            n_levels = len(frame[col].unique())
            print(col, n_levels, frame.shape, flush=True)
            if n_levels <= 2:
                frame[col] = frame[col].factorize()[0]  # put into 0,1 format
                continue
            dummies = pd.get_dummies(frame[col], prefix=col)
            frame = pd.concat([frame, dummies], axis=1).drop([col], axis=1)
        except Exception as e:
            print(col, e)

    # Add missing category columns
    if colonehot is not None:
        for col in colonehot:
            if col not in frame.columns:
                frame[col] = 0
                print(col, "added")
        selected = colonehot
    else:
        selected = [c for c in frame.columns if c not in colname]

    encoded = frame[selected]
    if return_val == "dataframe,param":
        return encoded, selected
    return encoded
def pd_colcat_mergecol(df, col_list, x0, colid="easy_id"):
    """Merge one-hot category columns back into a single integer column.

    Each name in ``col_list`` must end with ``_<int>``; that integer is the
    value contributed by rows where the column is > 0. Contributions are
    summed per row into column ``x0``.

    :param df: dataframe holding ``colid`` and the one-hot columns
    :param col_list: one-hot column names, each ending in ``_<int>``
    :param x0: name of the merged output column
    :param colid: identifier column used as index of the result
    :return: dataframe indexed by ``colid`` with the single column ``x0``
    """
    merged = pd.DataFrame({colid: df[colid].values})
    for name in col_list:
        pos = name.rfind("_")
        code = int(name[pos + 1:])
        print(pos, name[pos + 1:])
        merged[name] = df[name].apply(lambda v: code if v > 0 else 0)

    merged = merged.set_index(colid)
    merged[x0] = merged.iloc[:, :].sum(1)

    # Keep only the merged column.
    leftovers = [c for c in merged.columns if c != x0]
    for c in leftovers:
        del merged[c]
    return merged
def pd_colcat_tonum(df, colcat="all", drop_single_label=False, drop_fact_dict=True):
    """Encode a mixed (numerical + categorical) data-set as numerical only.

    Encoding rules for the nominal columns:

    * a single distinct value: column set to 0, or dropped when
      ``drop_single_label`` is True
    * two distinct values: replaced by the codes of ``pd.factorize``
    * more than two distinct values: replaced by ``pd.get_dummies`` columns
    * columns not listed in ``colcat`` are copied unchanged

    Parameters
    ----------
    df : NumPy ndarray / Pandas DataFrame
        The data-set to encode.
    colcat : sequence / string
        The nominal (categorical) columns. The string 'all' marks every
        column as nominal; any other string is treated as a single column
        name. If None, the data is returned unencoded. Default: 'all'
    drop_single_label : Boolean, default = False
        If True, nominal columns with only a single value are dropped.
    drop_fact_dict : Boolean, default = True
        If True, return the encoded DataFrame alone; otherwise return
        ``(DataFrame, dict)`` where the dict maps each two-value column to
        its original labels as returned by ``pd.factorize``.

    Returns
    -------
    DataFrame or (DataFrame, dict)
    """
    if not isinstance(df, pd.DataFrame):
        df = convert(df, "dataframe")
    if colcat is None:
        return df
    if isinstance(colcat, str):
        # Compare as a string only: ``array == "all"`` would be element-wise.
        colcat = list(df.columns) if colcat == "all" else [colcat]
    else:
        colcat = list(colcat)

    pieces = []
    binary_columns_dict = dict()
    for col in df.columns:
        if col not in colcat:
            pieces.append(df[col])
            continue

        unique_values = pd.unique(df[col])
        if len(unique_values) == 1:
            if not drop_single_label:
                pieces.append(pd.Series(0, index=df.index, name=col))
            # else: single-valued column is dropped as requested
        elif len(unique_values) == 2:
            codes, binary_columns_dict[col] = pd.factorize(df[col])
            pieces.append(pd.Series(codes, index=df.index, name=col))
        else:
            pieces.append(pd.get_dummies(df[col], prefix=col))

    # Assemble once so every piece is aligned on the input index.
    df_out = pd.concat(pieces, axis=1) if pieces else pd.DataFrame(index=df.index)

    if drop_fact_dict:
        return df_out
    return df_out, binary_columns_dict
def pd_colcat_mapping(df, colname):
    """Build category <-> integer code mappings for the given columns.

    Codes follow the order of the pandas categories of each column. Typical
    use of the result::

        for col in colcat:
            df[col] = df[col].apply(lambda x: colcat_map["cat_map"][col].get(x))

    :param df: source dataframe
    :param colname: list of column names to map
    :return: ``{"cat_map": {col: {category: code}},
        "cat_map_inverse": {col: {code: category}}}``
    """
    forward = {}
    backward = {}
    for col in df[colname]:
        categories = df[col].astype("category").cat.categories
        forward[col] = {}
        backward[col] = {}
        for code, cat in enumerate(categories):
            forward[col][cat] = code
            backward[col][code] = cat
    return {"cat_map": forward, "cat_map_inverse": backward}
def pd_colcat_toint(dfref, colname, colcat_map=None, suffix=None):
    """Encode categorical columns as integer codes.

    Without ``colcat_map`` the codes are learned with ``Series.factorize``
    and the mapping is returned. With ``colcat_map`` (as returned by a
    previous call) the stored "encode" mapping is applied; values absent
    from the mapping become missing.

    :param dfref: input dataframe (not modified)
    :param colname: list of columns to encode
    :param colcat_map: ``{col: {"encode": {label: code},
        "decode": {code: label}}}`` from a previous call, or None to learn it
    :param suffix: appended to each column name to form the output name
    :return: ``(dataframe of encoded columns, colcat_map)``
    """
    # Work on a copy so the encoded columns never leak into the caller's frame.
    df = dfref[colname].copy()
    suffix = "" if suffix is None else suffix
    colname_new = [col + suffix for col in colname]

    if colcat_map is not None:
        for col, col_new in zip(colname, colname_new):
            encode = colcat_map[col]["encode"]
            df[col_new] = df[col].apply(encode.get)
        return df[colname_new], colcat_map

    colcat_map = {}
    for col, col_new in zip(colname, colname_new):
        codes, labels = df[col].factorize()
        df[col_new] = codes
        labels = list(labels)
        colcat_map[col] = {
            "decode": {i: t for i, t in enumerate(labels)},
            "encode": {t: i for i, t in enumerate(labels)},
        }
    return df[colname_new], colcat_map
def pd_colnum_tocat(
    df,
    colname=None,
    colexclude=None,
    colbinmap=None,
    bins=5,
    suffix="_bin",
    method="uniform",
    na_value=-1,
    return_val="dataframe,param",
    params={
        "KMeans_n_clusters": 8,
        "KMeans_init": 'k-means++',
        "KMeans_n_init": 10,
        "KMeans_max_iter": 300,
        "KMeans_tol": 0.0001,
        "KMeans_precompute_distances": 'auto',
        "KMeans_verbose": 0,
        "KMeans_random_state": None,
        "KMeans_copy_x": True,
        "KMeans_n_jobs": None,
        "KMeans_algorithm": 'auto',
    },
):
    """Bin numerical columns into integer categories.

    For every selected column ``c`` a new column ``c + suffix`` is written
    into ``df`` itself (the column ``c`` is also cast to float32 in place).
    Missing values, and values falling outside the bins, get ``na_value``.

    :param df: dataframe to bin (modified in place, see above)
    :param colname: columns to bin; all columns when None
    :param colexclude: columns to skip
    :param colbinmap: ``{column: bin edges}`` from a previous call; when
        given, ``method`` and ``bins`` are not used to compute edges
    :param bins: number of bins for "uniform" / "quantile"
    :param suffix: suffix of the created bin columns
    :param method: "uniform" (equal width, default), "quantile" or "cluster"
        (KMeans labels of the non-missing values)
    :param na_value: integer code used for missing values
    :param return_val: "dataframe", "param", or anything else for both
    :param params: KMeans keyword arguments, keys prefixed with "KMeans_";
        keys the installed KMeans does not accept are ignored. This dict is
        only read, never modified.
    :return: bin columns, ``{column: bin edges}``, or both. For "cluster"
        the stored value is the array of per-row labels, not bin edges.
    :raises ValueError: if ``colbinmap`` lacks a column or is combined with
        ``method="cluster"``
    """
    colexclude = [] if colexclude is None else colexclude
    colname = colname if colname is not None else list(df.columns)
    colnew = []
    colmap = OrderedDict()

    def bin_create(dfc, bins):
        # Equal-width edges; the first edge is nudged down to include the min.
        mi, ma = dfc.min(), dfc.max()
        space = (ma - mi) / bins
        lbins = [mi + i * space for i in range(bins + 1)]
        lbins[0] -= 0.0001
        return lbins

    def bin_create_quantile(dfc, bins):
        # Quantile edges; the first edge is nudged down to include the min.
        qt_list_ref = np.arange(0, 1.00001, 1.0 / bins)
        lbins = list(dfc.quantile(qt_list_ref).values)
        lbins[0] -= 0.01
        return lbins

    def bin_create_cluster(values):
        # Forward only the options the installed KMeans still accepts
        # (some of the defaults were removed in later scikit-learn releases).
        import inspect

        prefix = "KMeans_"
        accepted = inspect.signature(KMeans).parameters
        kwargs = {
            key[len(prefix):]: val
            for key, val in params.items()
            if key.startswith(prefix) and key[len(prefix):] in accepted
        }
        return KMeans(**kwargs).fit(values).predict(values)

    # Loop on all columns
    for c in colname:
        if c in colexclude:
            continue
        print(c)
        df[c] = df[c].astype(np.float32)
        cbin = c + suffix

        if colbinmap is not None:
            if method == "cluster":
                raise ValueError("colbinmap cannot be combined with method='cluster'")
            lbins = colbinmap.get(c)
            if lbins is None:
                raise ValueError("colbinmap has no bins for column {}".format(c))
            clustered = False
        elif method == "cluster":
            not_nan = df[c].notna().values
            lbins = bin_create_cluster(
                df.loc[not_nan, c].values.reshape((-1, 1))
            ).reshape((-1,))
            clustered = True
        elif method == "quantile":
            lbins = bin_create_quantile(df[c], bins)
            clustered = False
        else:
            lbins = bin_create(df[c], bins)
            clustered = False

        if clustered:
            # Label-free boolean mask: works for any index, leaves NaN rows NaN.
            df[cbin] = np.nan
            df.loc[not_nan, cbin] = lbins
        else:
            labels = np.arange(0, len(lbins) - 1)
            df[cbin] = pd.cut(df[c], bins=lbins, labels=labels)

        # NA processing: anything that is not a valid bin code becomes na_value
        df[cbin] = df[cbin].astype("float")
        df[cbin] = df[cbin].apply(lambda x: x if x >= 0.0 else na_value)
        df[cbin] = df[cbin].astype("int")

        col_stat = df.groupby(cbin).agg({c: ["size", "min", "mean", "max"]})
        colmap[c] = lbins
        colnew.append(cbin)
        print(col_stat)

    if return_val == "dataframe":
        return df[colnew]
    elif return_val == "param":
        return colmap
    else:
        return df[colnew], colmap
def pd_colnum_normalize(df, colnum_log, colproba):
    """Normalize numerical and probability columns in place.

    Columns in ``colnum_log`` are replaced by ``log(x + 1.1)``, with ``-inf``
    and missing values set to 0, then divided by the column maximum.
    Columns in ``colproba`` get ``-1`` and missing values replaced by 0.5.

    A column of ``colnum_log`` that cannot be processed (absent, not
    convertible to float, ...) is reported and left unchanged.

    :param df: dataframe, modified in place
    :param colnum_log: columns to log-scale
    :param colproba: probability columns
    :return: the same dataframe
    """
    for x in colnum_log:
        try:
            logged = pd.Series(
                np.log(df[x].values.astype(np.float64) + 1.1), index=df.index
            )
            logged = logged.replace(-np.inf, 0).fillna(0)
            print(x, logged.min(), logged.max())
            df[x] = logged / logged.max()
        except Exception as e:
            # Best effort per column, but never hide why a column was skipped.
            print(x, "not normalized:", e)

    for x in colproba:
        print(x)
        df[x] = df[x].replace(-1, 0.5)
        df[x] = df[x].fillna(0.5)

    return df
def pd_col_remove(df, cols):
    """Delete the given columns from ``df`` in place, ignoring absent ones.

    :param df: dataframe, modified in place
    :param cols: iterable of column labels to delete
    :return: the same dataframe
    """
    for x in cols:
        try:
            del df[x]
        except (KeyError, TypeError):
            # Absent (or unhashable) labels are skipped; anything else, such
            # as KeyboardInterrupt, is no longer swallowed.
            pass
    return df
def pd_col_intersection(df1, df2, colid):
    """Return the values of column ``colid`` present in both dataframes.

    :param df1: first dataframe
    :param df2: second dataframe
    :param colid: column compared in both frames
    :return: list of the common values (unordered, without duplicates)
    """
    left_ids = set(df1[colid].values)
    common = list(left_ids.intersection(df2[colid]))
    print("total matchin", len(common), len(df1), len(df2))
    return common
def pd_col_merge_onehot(df, colname):
    """Group one-hot columns by the original column they come from.

    A column ``t`` belongs to the base name ``x`` when it starts with
    ``x + "_"``. (Matching on the prefix rather than on "x occurs anywhere
    in t" avoids attributing e.g. ``b_a`` to ``a``.)

    :param df: dataframe with one-hot columns
    :param colname: base column names
    :return: ``{base name: [matching one-hot column names]}``
    """
    merged = {}
    for x in colname:
        prefix = x + "_"
        merged[x] = [
            t for t in df.columns if isinstance(t, str) and t.startswith(prefix)
        ]
    return merged
def pd_col_to_num(df, colname=None, default=np.nan):
    """Convert columns to float in place, element by element.

    :param df: dataframe, modified in place
    :param colname: columns to convert; all columns when None
    :param default: value used for elements that cannot be converted
    :return: the same dataframe
    """

    def to_float(x):
        try:
            return float(x)
        except (TypeError, ValueError, OverflowError):
            # Only conversion failures map to the default.
            return default

    colname = list(df.columns) if colname is None else colname
    for c in colname:
        df[c] = df[c].apply(to_float)
    return df
def pd_col_filter(df, filter_val=None, iscol=1):
    """Drop the columns (or rows) whose label is listed in ``filter_val``.

    Example filter: ``filter1 = X_client['client_id'].values``

    The labels inspected are those of the axis being dropped: column labels
    when ``iscol == 1``, index labels otherwise.

    NOTE(review): the previous description said labels *not* in the filter
    are removed, while the code removes labels that *are* in it; the code's
    behaviour is kept here - confirm which one callers expect.

    :param df: source dataframe (not modified)
    :param filter_val: container of labels to drop; None drops nothing
    :param iscol: 1 to drop columns, anything else to drop rows
    :return: new dataframe without the listed labels
    """
    axis = 1 if iscol == 1 else 0
    if filter_val is None:
        return df.copy()

    labels = df.columns if axis == 1 else df.index
    to_drop = [label for label in labels.values if label in filter_val]
    return df.drop(to_drop, axis=axis, inplace=False)
def pd_col_fillna(
    dfref,
    colname=None,
    method="frequent",
    value=None,
    colgroupby=None,
    return_val="dataframe,param",
):
    """Fill missing values column by column.

    The fill value is computed independently for every column, unless an
    explicit ``value`` is given, in which case it is used for all columns.

    Arguments:
        dfref:      input dataframe (not modified)
        colname:    list of columns to fill; all columns when None
        method:     "frequent" (most frequent value), "mode", "median" or
                    "median_conditional" (median within each ``colgroupby``
                    group)
        value:      explicit replacement value; overrides ``method``
        colgroupby: grouping column(s) for "median_conditional"
        return_val: "dataframe,param" returns ``(df, params)``, anything else
                    returns the dataframe only
    Returns:
        df:     new dataframe restricted to ``colname`` with filled values
        params: ``{"method": method, "na_value": {column: fill value}}``
    Raises:
        ValueError: unknown ``method`` while ``value`` is None
    """
    colname = list(dfref.columns) if colname is None else colname
    df = dfref[colname].copy()
    params = {"method": method, "na_value": {}}

    for col in colname:
        nb_nans = df[col].isna().sum()

        if value is not None:
            fill = value
        elif method == "frequent":
            fill = df[col].value_counts().idxmax()
        elif method == "mode":
            # mode() returns a Series (ties possible); use its first entry.
            fill = df[col].mode().iloc[0]
        elif method == "median":
            fill = df[col].median()
        elif method == "median_conditional":
            # Per-row fill values, aligned on the index.
            fill = dfref.groupby(colgroupby)[col].transform("median")
        else:
            raise ValueError("Unknown fillna method: {}".format(method))

        print(col, nb_nans, "replaceBY", fill)
        params["na_value"][col] = fill
        df[col] = df[col].fillna(fill)

    if return_val == "dataframe,param":
        return df, params
    else:
        return df
def pd_col_fillna_advanced(
    dfref, colname=None, method="median", colname_na=None, return_val="dataframe,param"
):
    """Impute missing values with a model-based method from ``impyute``.

    Arguments:
        dfref:      input dataframe
        colname:    list of columns passed to the imputer; all when None
        method:     "mice" or "knn" (k=5). The signature default "median"
                    is not implemented by this function.
        colname_na: unused
        return_val: "dataframe,param" returns ``(df, params)``, anything else
                    returns the dataframe only
    Returns:
        dfout:  new dataframe built from the imputed values, with columns
                ``colname`` and a fresh default index
        params: ``{"method": method, "na_value": {}}``
    Raises:
        ValueError: ``method`` is neither "mice" nor "knn"
    """
    # Fail early and clearly instead of hitting an undefined result later.
    if method not in ("mice", "knn"):
        raise ValueError(
            "Unsupported method: {} (expected 'mice' or 'knn')".format(method)
        )

    colname = list(dfref.columns) if colname is None else colname
    df = dfref[colname]
    params = {"method": method, "na_value": {}}

    for col in colname:
        print(df[col].isna().sum())

    if method == "mice":
        from impyute.imputation.cs import mice

        imputed = mice(df.values)
    else:
        from impyute.imputation.cs import fast_knn

        imputed = fast_knn(df.values, k=5)

    dfout = pd.DataFrame(data=imputed, columns=colname)

    if return_val == "dataframe,param":
        return dfout, params
    else:
        return dfout
def pd_col_fillna_datawig(
    dfref, colname=None, method="median", colname_na=None, return_val="dataframe,param"
):
    """Impute missing values with ``datawig.SimpleImputer``.

    Arguments:
        dfref:      input dataframe
        colname:    list of input columns for the imputer; all when None
        method:     must be "datawig". The signature default "median" is not
                    implemented by this function.
        colname_na: target columns to impute; when None, the columns of
                    ``colname`` that contain missing values
        return_val: "dataframe,param" returns ``(df, params)``, anything else
                    returns the dataframe only
    Returns:
        dfout:  output of the imputer's ``predict`` for the last target
                column (the input columns unchanged when there is no target)
        params: ``{"method": method, "na_value": {}}``
    Raises:
        ValueError: ``method`` is not "datawig"
    """
    # Fail early and clearly instead of hitting an undefined result later.
    if method != "datawig":
        raise ValueError("Unsupported method: {} (expected 'datawig')".format(method))

    import datawig

    colname = list(dfref.columns) if colname is None else colname
    df = dfref[colname]
    if colname_na is None:
        colname_na = [c for c in colname if df[c].isna().any()]
    params = {"method": method, "na_value": {}}

    for colna in colname_na:
        print(df[colna].isna().sum())

    dfout = df
    for colna in colname_na:
        imputer = datawig.SimpleImputer(
            input_columns=colname,
            output_column=colna,  # the column we'd like to impute values for
            output_path="preprocess_fillna/",  # stores model data and metrics
        )
        # NOTE(review): no fit/load call is made before predict, and each
        # iteration predicts from the original frame so only the last
        # target's output is kept - confirm against the datawig API.
        dfout = imputer.predict(df)

    if return_val == "dataframe,param":
        return dfout, params
    else:
        return dfout
def pd_row_drop_above_thresh(df, colnumlist, thresh):
    """Remove the rows whose value exceeds a threshold in any given column.

    Arguments:
        df:         dataframe (not modified)
        colnumlist: list of columns checked against the threshold
        thresh:     rows with a value strictly above it are removed
    Returns:
        df: dataframe without the outlier rows; rows with a missing value in
            a checked column are kept
    """
    for col in colnumlist:
        # Boolean mask instead of drop(): independent of index labels.
        df = df[~(df[col] > thresh)]
    return df
def pd_pipeline_apply(df, pipeline):
    """Run a list of ``(function, kwargs)`` steps on a deep copy of ``df``.

    Each step is called as ``function(current_frame, **kwargs)`` and its
    result is passed to the next step. Example::

        pipe_preprocess_colnum = [
            (pd_col_to_num, {"val": "?", }),
            (pd_colnum_tocat, {"colname": None, "colbinmap": colnum_binmap,
                               "bins": 5, "method": "uniform",
                               "suffix": "_bin", "return_val": "dataframe"}),
            (pd_col_to_onehot, {"colname": None, "colonehot": colnum_onehot,
                                "return_val": "dataframe"}),
        ]

    :param df: input dataframe (not modified)
    :param pipeline: sequence of ``(callable, dict of keyword arguments)``
    :return: result of the last step
    """
    current = copy.deepcopy(df)
    for i, step in enumerate(pipeline):
        func = step[0]
        kwargs = step[1]
        print(
            "############## Pipeline ", i, "Start", current.shape, str(func.__name__), flush=True
        )
        current = func(current, **kwargs)
        print("############## Pipeline ", i, "Finished", current.shape, flush=True)
    return current
def pd_df_sampling(df, coltarget="y", n1max=10000, n2max=-1, isconcat=1):
    """Down-sample a binary-class dataframe.

    :param df: input dataframe
    :param coltarget: binary class column (values 0 and 1)
    :param n1max: maximum number of class-0 rows to keep
    :param n2max: maximum number of class-1 rows to keep; -1 keeps them all
    :param isconcat: truthy to return one shuffled dataframe, falsy to
        return the two samples separately
    :return: shuffled concatenation of both samples, or
        ``(class-1 sample, class-0 sample)``

    Requested sizes are capped at the number of available rows, and the
    final shuffle is a permutation (no row is duplicated or lost).
    """
    class0 = df[df[coltarget] == 0]
    class1 = df[df[coltarget] == 1]

    n_class0 = min(n1max, len(class0))
    n_class1 = len(class1) if n2max == -1 else min(n2max, len(class1))

    df1 = class0.sample(n=n_class0)
    df0 = class1.sample(n=n_class1)

    if isconcat:
        df2 = pd.concat((df1, df0))
        return df2.sample(frac=1.0, replace=False)

    print("y=1", n_class1, "y=0", len(df1))
    return df0, df1
def pd_df_stack(df_list, ignore_index=True):
    """Concatenate dataframes vertically, skipping the ones that fail.

    Uses ``pd.concat`` (``DataFrame.append`` no longer exists in recent
    pandas, which made every append fail and be skipped).

    :param df_list: iterable of dataframes
    :param ignore_index: passed to ``pd.concat``; True renumbers the rows
    :return: stacked dataframe, the first frame itself when there is only
        one, or None for an empty input
    """
    df0 = None
    for i, dfi in enumerate(df_list):
        if df0 is None:
            df0 = dfi
            continue
        try:
            df0 = pd.concat([df0, dfi], ignore_index=ignore_index)
        except Exception as e:
            # Best effort: report the failing frame and keep going.
            print("Error appending: ", i, e)
    return df0
def pd_stat_correl_pair(df, coltarget=None, colname=None):
    """Compute the Pearson correlation of each column with a target column.

    :param df: dataframe holding the columns and the target
    :param coltarget: target column name
    :param colname: list of columns to correlate; all columns when None
    :return: dataframe with columns "colx" (empty strings), "coly" (column
        names), "correl" (Pearson coefficient) and one extra column named
        after ``coltarget`` that again holds the column names
    """
    from scipy.stats import pearsonr

    columns = list(df.columns) if colname is None else colname
    coefficients = [
        pearsonr(df[col].values, df[coltarget].values)[0] for col in columns
    ]

    result = pd.DataFrame(
        {"colx": [""] * len(columns), "coly": columns, "correl": coefficients}
    )
    result[coltarget] = columns
    return result
def pd_stat_colcheck(df):
    """Print name, distinct count, min and max of the non-object columns
    that have more than two distinct values.

    :param df: dataframe to inspect
    :return: None
    """
    for name in df.columns:
        series = df[name]
        n_distinct = len(series.unique())
        if n_distinct <= 2 or series.dtype == np.dtype("O"):
            continue
        print(name, len(series.unique()), series.min(), series.max())
def pd_stat_jupyter_profile(df, savefile="report.html", title="Pandas Profile"):
    """Write a profiling report of ``df`` to a file.

    Written against Pandas-Profiling 2.0.0 (``df.profile_report()``).

    :param df: dataframe to describe
    :param savefile: path of the report file
    :param title: title of the report
    :return: result of ``get_rejected_variables(threshold=0.98)`` on the report
    """
    # Presumably this import is what makes ``df.profile_report`` available;
    # the module alias itself is not used.
    import pandas_profiling as pp

    print("start profiling")
    report = df.profile_report(title=title)
    report.to_file(output_file=savefile)
    return report.get_rejected_variables(threshold=0.98)
def pd_stat_distribution_colnum(df):
    """Describe the numerical columns of a dataframe, one row per column.

    Only columns whose dtype name starts with "int" or "flo" are described.
    Statistics come from ``Series.describe()`` plus the count and ratio of
    missing values; "median" is the 50% quantile.

    NOTE(review): "coltype" and "outlier" are part of the output layout but
    nothing in this function computes them, so they are left empty.

    :param df: dataframe to describe
    :return: dataframe with columns col, coltype, dtype, count, min, max,
        nb_na, pct_na, median, mean, std, 25%, 75%, outlier
    """
    coldes = [
        "col",
        "coltype",
        "dtype",
        "count",
        "min",
        "max",
        "nb_na",
        "pct_na",
        "median",
        "mean",
        "std",
        "25%",
        "75%",
        "outlier",
    ]

    def getstat(col):
        # One record per column; keys follow the output layout.
        desc = df[col].describe()
        nb_na = df[col].isnull().sum()
        ntot = len(df)
        return {
            "col": col,
            "dtype": str(df[col].dtype),
            "count": desc["count"],
            "min": desc["min"],
            "max": desc["max"],
            "nb_na": nb_na,
            "pct_na": nb_na / (ntot + 0.0),
            "median": desc["50%"],
            "mean": desc["mean"],
            "std": desc["std"],
            "25%": desc["25%"],
            "75%": desc["75%"],
        }

    rows = [
        getstat(col)
        for col in df.columns
        if str(df[col].dtype)[0:3] in ["int", "flo"]
    ]
    # Keys absent from the records (coltype, outlier) come out as missing.
    return pd.DataFrame(rows, columns=coldes)
def pd_stat_histogram(df, bins=50, coltarget="diff"):
    """Compute the histogram of one column.

    :param df: source dataframe
    :param bins: number of bins (or bin edges) passed to ``np.histogram``
    :param coltarget: column to histogram; missing values are ignored
    :return: dataframe with "bins" (left edge of each bin), "freq" (count)
        and "density" (count divided by the total count)
    """
    values = df[coltarget].dropna().values
    freq, edges = np.histogram(values, bins=bins)

    hh2 = pd.DataFrame({"bins": edges[:-1], "freq": freq})
    total = hh2["freq"].sum()
    # Guard the empty case so the density is 0 rather than 0/0.
    hh2["density"] = hh2["freq"] / total if total > 0 else 0.0
    return hh2
def pd_stat_histogram_groupby(df, bins=50, coltarget="diff", colgroupby="y"):
    """Stack the histogram of a column for the whole frame and per group.

    The first block of rows is the histogram of ``coltarget`` over all rows,
    followed by one block per distinct value of ``colgroupby``, each
    computed with ``pd_stat_histogram``.

    todo: the blocks are not labelled with the group they belong to.

    :param df: source dataframe
    :param bins: number of bins for every histogram
    :param coltarget: column to histogram
    :param colgroupby: column whose distinct values define the groups
    :return: concatenation of all histogram frames
    """
    # Delegate to the single-column histogram (calling this function itself
    # here would never terminate).
    parts = [pd_stat_histogram(df, bins, coltarget)]
    for x in df[colgroupby].unique():
        parts.append(pd_stat_histogram(df[df[colgroupby] == x], bins, coltarget))
    return pd.concat(parts)
def pd_stat_na_perow(df, n=10 ** 6):
    """Count, for each of the first ``n`` rows, the missing values.

    A cell counts as missing when it is NA or equal to -1.

    :param df: dataframe to inspect
    :param n: maximum number of rows inspected (the argument is honoured;
        it is no longer overwritten with 10**6)
    :return: dataframe with "" (row index label), "n_na" (missing cells in
        the row) and "n_ok" (remaining cells in the row)
    """
    head = df.iloc[:n, :]
    is_missing = head.isna() | (head == -1)
    n_na = is_missing.sum(axis=1).values

    return pd.DataFrame(
        {"": head.index.values, "n_na": n_na, "n_ok": len(df.columns) - n_na}
    )
def pd_stat_distribution(df, subsample_ratio=1.0):
    """Univariate distribution summary, one row per describable column.

    For each column: row count, missing/non-missing counts, distinct count,
    and the min / median / max together with the number and share of rows
    near each of them (min: below ``min + 0.01``, median: strictly within
    ``0.1``, max: above ``max - 0.01``).

    Columns for which any statistic fails (e.g. strings) are reported and
    skipped as a whole.

    :param df: dataframe to describe
    :param subsample_ratio: fraction of rows randomly sampled first when < 1.0
    :return: dataframe of statistics
    """
    print("Univariate distribution")
    stat_names = [
        "col",
        "n",
        "n_na",
        "n_notna",
        "n_na_pct",
        "nunique",
        "nunique_pct",
        "xmin",
        "xmin_freq",
        "xmin_pct",
        "xmax",
        "xmax_freq",
        "xmax_pct",
        "xmed",
        "xmed_freq",
        "xmed_pct",
    ]

    if subsample_ratio < 1.0:
        df = df.sample(frac=subsample_ratio)

    nn = len(df) + 0.0
    rows = []
    for x in df.columns:
        try:
            xmin = df[x].min()
            xmin_freq = len(df[df[x] < xmin + 0.01])  # Can fail if string

            xmed = df[x].median()
            xmed_freq = len(df[(df[x] > xmed - 0.1) & (df[x] < xmed + 0.1)])

            xmax = df[x].max()
            xmax_freq = len(df[df[x] > xmax - 0.01])

            n_notna = df[x].count()
            nunique = df[x].nunique()

            # The row is recorded only once every statistic succeeded, so a
            # failure can never leave a half-filled row behind.
            rows.append(
                {
                    "col": x,
                    "n": nn,
                    "n_na": nn - n_notna,
                    "n_notna": n_notna,
                    "n_na_pct": (nn - n_notna) / nn * 1.0,
                    "nunique": nunique,
                    "nunique_pct": nunique / nn,
                    "xmin": xmin,
                    "xmin_freq": xmin_freq,
                    "xmin_pct": xmin_freq / nn,
                    "xmax": xmax,
                    "xmax_freq": xmax_freq,
                    "xmax_pct": xmax_freq / nn,
                    "xmed": xmed,
                    "xmed_freq": xmed_freq,
                    "xmed_pct": xmed_freq / nn,
                }
            )
        except Exception as e:
            print(x, e)

    return pd.DataFrame(rows, columns=stat_names)
def convert(data, to):
    """Convert between list, NumPy array, pandas Series and DataFrame.

    Supported conversions:

    * to "array": from ndarray, Series, list or DataFrame
    * to "list": from list, Series or ndarray
    * to "dataframe": from DataFrame or ndarray

    :param data: object to convert
    :param to: "array", "list" or "dataframe"
    :return: the converted object (the input itself when it already has the
        requested type)
    :raises ValueError: ``to`` is not one of the supported targets
    :raises TypeError: the type of ``data`` cannot be converted to ``to``
    """
    converted = None
    if to == "array":
        if isinstance(data, np.ndarray):
            converted = data
        elif isinstance(data, pd.Series):
            converted = data.values
        elif isinstance(data, list):
            converted = np.array(data)
        elif isinstance(data, pd.DataFrame):
            # ``as_matrix()`` no longer exists in pandas; ``values`` is the
            # equivalent array.
            converted = data.values
    elif to == "list":
        if isinstance(data, list):
            converted = data
        elif isinstance(data, pd.Series):
            converted = data.values.tolist()
        elif isinstance(data, np.ndarray):
            converted = data.tolist()
    elif to == "dataframe":
        if isinstance(data, pd.DataFrame):
            converted = data
        elif isinstance(data, np.ndarray):
            converted = pd.DataFrame(data)
    else:
        raise ValueError("Unknown data conversion: {}".format(to))

    if converted is None:
        raise TypeError("cannot handle data conversion of type: {} to {}".format(type(data), to))
    return converted
def col_stat_getcategorydict_freq(catedict):
    """Build a frequency table for each category dictionary.

    Every value of ``catedict`` is turned into a dataframe (it must provide
    a "freq" column) that is sorted by decreasing frequency and extended
    with: "freq_pct" (share in %), "freq_zscore" (frequency divided by the
    standard deviation of the frequencies), "freq_cumpct" (cumulated share
    in %) and "rank" (0-based position after sorting).

    :param catedict: ``{key: data accepted by pd.DataFrame with a "freq" column}``
    :return: list of ``(key, dataframe)`` tuples
    """

    def build_table(values):
        table = pd.DataFrame(values)
        table["freq_pct"] = 100.0 * table["freq"] / table["freq"].sum()
        table["freq_zscore"] = table["freq"] / table["freq"].std()
        table = table.sort_values(by=["freq"], ascending=False)
        table["freq_cumpct"] = 100.0 * table["freq_pct"].cumsum() / table["freq_pct"].sum()
        table["rank"] = np.arange(0, len(table.index.values))
        return table

    return [(key, build_table(values)) for key, values in catedict.items()]
def col_extractname_colbin(cols2):
    """Map one-hot column names back to their generic column names.

    A name whose part after the last "_" is shorter than 3 characters
    (e.g. ``age_1``, ``age_-1``) is reduced to the part before that "_";
    any other name, including names without "_", is kept unchanged.

    :param cols2: one-hot column names
    :return: result of ``np_drop_duplicates`` applied to the generic names
        (that helper is not defined in this part of the file)
    """
    coln = []
    for ss in cols2:
        head, sep, tail = ss.rpartition("_")
        # ``sep`` is empty when there is no "_": such names are never cut.
        if sep and len(tail) < 3:  # -1 or 1
            coln.append(head)
        else:
            coln.append(ss)

    coln = np_drop_duplicates(coln)
    return coln
def col_getnumpy_indice(colall, colcat):
    """Return, for each name in ``colcat``, its first position in ``colall``.

    :param colall: sequence of all column names
    :param colcat: names to locate
    :return: list of positions, -1 for names not found
    """

    def first_position(name):
        return next((i for i, candidate in enumerate(colall) if candidate == name), -1)

    return [first_position(name) for name in colcat]
def col_extractname(col_onehot):
    """Extract the distinct base column names from one-hot column names.

    For names longer than 2 characters:

    * ``<name>_<c>`` (one-character suffix) gives ``<name>``
    * ``<name>_-<c>`` (e.g. ``col_-1``) gives ``<name>``

    Every other name, short names included, is kept as is, so no input
    column is silently lost.

    :param col_onehot: one-hot column names
    :return: base names, without duplicates, in order of first appearance
    """
    colnew = []
    for x in col_onehot:
        if len(x) > 2 and x[-2] == "_":
            base = x[:-2]
        elif len(x) > 2 and x[-2] == "-":
            base = x[:-3]
        else:
            base = x
        if base not in colnew:
            colnew.append(base)
    return colnew
def col_remove(cols, colsremove, mode="exact"):
    """Remove column names from a list of names.

    Parameters
    ----------
    cols : list
        Column names. Modified in place in "exact" mode.
    colsremove : iterable
        Names (or, in "fuzzy" mode, substrings) to remove.
    mode : str, optional
        "exact" (default) removes the first occurrence of each listed name;
        "fuzzy" removes every name containing one of the given strings.

    Returns
    -------
    cols : list
        The same list in "exact" mode, a new list in "fuzzy" mode.

    Raises
    ------
    ValueError
        If ``mode`` is neither "exact" nor "fuzzy".
    """
    if mode == "exact":
        for x in colsremove:
            try:
                cols.remove(x)
            except ValueError:
                # Name not present: nothing to remove.
                pass
        return cols

    if mode == "fuzzy":
        return [t for t in cols if not any(x in t for x in colsremove)]

    raise ValueError("Unknown mode: {} (expected 'exact' or 'fuzzy')".format(mode))
from matplotlib import pyplot as plt
def pd_colnum_tocat_stat(input_data, feature, target_col, bins, cuts=0):
    """Bin a continuous feature and return the target mean in each bucket.

    Buckets are built from percentiles of the feature (equal sample size);
    rows where the feature is null go to a separate "Nulls" bucket.

    :param input_data: dataframe containing the feature and target columns
    :param feature: feature column name
    :param target_col: target column name
    :param bins: number of bins required
    :param cuts: bin edges to reuse (e.g. train cuts applied to test data);
        0 computes them from ``input_data``
    :return: ``(cuts, grouped)`` when the cuts were computed here, otherwise
        ``grouped`` only. ``grouped`` has the columns ``feature`` (bin
        label), "Samples_in_bin", ``target_col + "_mean"`` and
        ``feature + "_mean"``.
    """
    null_mask = pd.isnull(input_data[feature])
    has_null = bool(null_mask.sum() > 0)
    if has_null:
        data_null = input_data[null_mask]
        # New frame instead of an in-place reset on a slice of the input.
        input_data = input_data[~null_mask].reset_index(drop=True)

    # Explicit scalar test: ``cuts == 0`` is ambiguous for array-like cuts.
    is_train = isinstance(cuts, (int, float)) and cuts == 0
    if is_train:
        prev_cut = min(input_data[feature]) - 1
        cuts = [prev_cut]
        for i in range(1, bins + 1):
            next_cut = np.percentile(input_data[feature], i * 100 / bins)
            # Float numbers should be compared with some threshold; cuts that
            # do not move are skipped, which reduces the number of bins.
            if next_cut > prev_cut + .000001:
                cuts.append(next_cut)
            prev_cut = next_cut

    cut_series = pd.cut(input_data[feature], cuts)

    # observed=False keeps empty bins, so train and test tables line up.
    grouped = input_data.groupby([cut_series], as_index=True, observed=False).agg(
        {target_col: ["size", "mean"], feature: ["mean"]})
    grouped.columns = ['_'.join(cols).strip() for cols in grouped.columns.values]
    grouped[feature] = grouped.index
    grouped.reset_index(inplace=True, drop=True)
    grouped = grouped[[feature] + list(grouped.columns[0:3])]
    grouped = grouped.rename(index=str, columns={target_col + '_size': 'Samples_in_bin'})
    grouped = grouped.reset_index(drop=True)

    # The first bin is relabelled so that it starts at the observed minimum.
    corrected_bin_name = '[' + str(min(input_data[feature])) + ', ' + str(grouped.loc[0, feature]).split(',')[1]
    grouped[feature] = grouped[feature].astype('category')
    grouped[feature] = grouped[feature].cat.add_categories(corrected_bin_name)
    grouped.loc[0, feature] = corrected_bin_name

    if has_null:
        # Reuse the layout of the first row for the separate "Nulls" bucket.
        grouped_null = grouped.loc[0:0, :].copy()
        grouped_null[feature] = grouped_null[feature].astype('category')
        grouped_null[feature] = grouped_null[feature].cat.add_categories('Nulls')
        grouped_null.loc[0, feature] = 'Nulls'
        grouped_null.loc[0, 'Samples_in_bin'] = len(data_null)
        grouped_null.loc[0, target_col + '_mean'] = data_null[target_col].mean()
        grouped_null.loc[0, feature + '_mean'] = np.nan
        grouped[feature] = grouped[feature].astype('str')
        grouped = pd.concat([grouped_null, grouped], axis=0)
        grouped.reset_index(inplace=True, drop=True)

    grouped[feature] = grouped[feature].astype('str').astype('category')
    if is_train:
        return (cuts, grouped)
    return grouped
def draw_plots(input_data, feature, target_col, trend_correlation=None):
    """Draw the univariate dependence plots of a feature.

    Left panel: mean of the target in each bin, annotated with the number of
    trend changes (and the train/test trend correlation when given).
    Right panel: number of samples in each bin.

    :param input_data: grouped data containing the bins of the feature, the
        target mean and "Samples_in_bin"
    :param feature: feature column name
    :param target_col: target column name
    :param trend_correlation: correlation between train and test trends of
        the feature wrt the target; 0 is displayed as "NA", None is omitted
    :return: None; the figure is drawn on the current matplotlib state
    """
    trend_changes = get_trend_changes(grouped_data=input_data, feature=feature, target_col=target_col)
    bin_positions = np.arange(len(input_data))
    bin_labels = (input_data[feature]).astype('str')

    plt.figure(figsize=(12, 5))

    # Left panel: target mean per bin.
    ax1 = plt.subplot(1, 2, 1)
    ax1.plot(input_data[target_col + '_mean'], marker='o')
    ax1.set_xticks(bin_positions)
    ax1.set_xticklabels(bin_labels)
    plt.xticks(rotation=45)
    ax1.set_xlabel('Bins of ' + feature)
    ax1.set_ylabel('Average of ' + target_col)

    comment = "Trend changed " + str(trend_changes) + " times"
    if trend_correlation == 0:
        comment = comment + '\n' + 'Correlation with train trend: NA'
    elif trend_correlation is not None:
        comment = comment + '\n' + 'Correlation with train trend: ' + str(int(trend_correlation * 100)) + '%'

    props = dict(boxstyle='round', facecolor='wheat', alpha=0.3)
    ax1.text(0.05, 0.95, comment, fontsize=12, verticalalignment='top', bbox=props, transform=ax1.transAxes)
    plt.title('Average of ' + target_col + ' wrt ' + feature)

    # Right panel: sample size per bin.
    ax2 = plt.subplot(1, 2, 2)
    ax2.bar(bin_positions, input_data['Samples_in_bin'], alpha=0.5)
    ax2.set_xticks(bin_positions)
    ax2.set_xticklabels(bin_labels)
    plt.xticks(rotation=45)
    ax2.set_xlabel('Bins of ' + feature)
    ax2.set_ylabel('Bin-wise sample size')
    plt.title('Samples in bins of ' + feature)
    plt.tight_layout()
def get_trend_changes(grouped_data, feature, target_col, threshold=0.03):
    """Count how often the trend of the target mean changes direction.

    The "Nulls" bin is ignored. Steps between consecutive bins smaller than
    ``threshold`` times the overall range of the target mean are treated as
    flat and do not count as a change of direction.

    :param grouped_data: grouped dataset with the ``feature`` bins and the
        ``target_col + "_mean"`` column
    :param feature: feature column name
    :param target_col: target column name
    :param threshold: minimum share of the range required to count a step
    :return: number of trend changes for the feature
    """
    mean_col = target_col + '_mean'
    is_real_bin = grouped_data[feature] != 'Nulls'
    bins_only = grouped_data.loc[is_real_bin, :].reset_index(drop=True)

    steps = bins_only[mean_col].diff()
    steps = steps[~np.isnan(steps)].reset_index(drop=True)
    value_range = bins_only[mean_col].max() - bins_only[mean_col].min()

    step_sizes = steps.fillna(0).abs()
    too_small = step_sizes < threshold * value_range

    # +1 / -1 per step; negligible steps are zeroed and then removed.
    directions = steps.divide(step_sizes)
    directions[too_small] = 0
    directions = directions[directions != 0]

    # A flip between +1 and -1 has |diff| == 2, hence the division by 2.
    flips = directions.diff().fillna(0).abs() / 2
    total = flips.sum()
    if np.isnan(total):
        return 0
    return int(total)
def get_trend_correlation(grouped, grouped_test, feature, target_col):
    """Correlate the train and test trends of a feature wrt the target.

    The "Nulls" bins are ignored. The first test bin is relabelled with the
    first train bin label when they differ, then the target means are
    matched on the bin label and correlated.

    :param grouped: train grouped data
    :param grouped_test: test grouped data (``feature`` must be categorical
        when its first bin label has to be replaced)
    :param feature: feature column name
    :param target_col: target column name
    :return: correlation between the train and test target means, or 0 when
        fewer than two bins can be matched
    """
    mean_col = target_col + '_mean'
    mean_col_test = target_col + '_mean_test'

    train = grouped[grouped[feature] != 'Nulls'].reset_index(drop=True)
    test = grouped_test[grouped_test[feature] != 'Nulls'].reset_index(drop=True)

    # Align the label of the first bin, which embeds the observed minimum.
    if test.loc[0, feature] != train.loc[0, feature]:
        test[feature] = test[feature].cat.add_categories(train.loc[0, feature])
        test.loc[0, feature] = train.loc[0, feature]

    paired = train.merge(test[[feature, mean_col]], on=feature, how='left', suffixes=('', '_test'))
    incomplete = pd.isnull(paired[mean_col]) | pd.isnull(paired[mean_col_test])
    paired = paired.loc[~incomplete, :]

    if len(paired) <= 1:
        print("Only one bin created for " + feature + ". Correlation can't be calculated")
        return 0
    return np.corrcoef(paired[mean_col], paired[mean_col_test])[0, 1]
def univariate_plotter(feature, data, target_col, bins=10, data_test=0):
    """Bin one feature and draw its dependence plots (train and test).

    :param feature: feature column name
    :param data: dataframe containing the feature and target columns
    :param target_col: target column name
    :param bins: number of bins to create from the continuous feature
    :param data_test: test dataframe to compare with ``data``; any
        non-dataframe value (default 0) means "no test data"
    :return: grouped train data, or ``(grouped train, grouped test)`` when
        test data is passed; None for object-dtype features, which are not
        supported
    """
    print(' {:^100} '.format('Plots for ' + feature))
    if data[feature].dtype == 'O':
        print('Categorical feature not supported')
        return None

    cuts, grouped = pd_colnum_tocat_stat(input_data=data, feature=feature, target_col=target_col, bins=bins)

    # isinstance also accepts DataFrame subclasses as test data.
    has_test = isinstance(data_test, pd.DataFrame)
    if has_test:
        # Test data is binned with the cuts computed on the train data.
        grouped_test = pd_colnum_tocat_stat(input_data=data_test.reset_index(drop=True), feature=feature,
                                            target_col=target_col, bins=bins, cuts=cuts)
        trend_corr = get_trend_correlation(grouped, grouped_test, feature, target_col)
        print(' {:^100} '.format('Train data plots'))
        draw_plots(input_data=grouped, feature=feature, target_col=target_col)
        print(' {:^100} '.format('Test data plots'))
        draw_plots(input_data=grouped_test, feature=feature, target_col=target_col, trend_correlation=trend_corr)
    else:
        draw_plots(input_data=grouped, feature=feature, target_col=target_col)

    print(
        '--------------------------------------------------------------------------------------------------------------')
    print('\n')

    if has_test:
        return (grouped, grouped_test)
    return grouped
def get_univariate_plots(data, target_col, features_list=0, bins=10, data_test=0):
    """Create univariate dependence plots for the features of a dataset.

    :param data: dataframe containing the feature and target columns
    :param target_col: target column name
    :param features_list: features to plot; an integer (default 0) means all
        columns except the target
    :param bins: number of bins to create from each continuous feature
    :param data_test: test data to compare with ``data``
    :return: None; draws the plots of every non-object feature and prints a
        message for the object-dtype ones
    """
    if isinstance(features_list, int):
        # All columns but the target; no error if the target is absent.
        features_list = [col for col in data.columns if col != target_col]

    for cols in features_list:
        if cols == target_col:
            continue
        if data[cols].dtype == 'O':
            print(cols + ' is categorical. Categorical features not supported yet.')
        else:
            univariate_plotter(feature=cols, data=data, target_col=target_col, bins=bins, data_test=data_test)
def get_trend_stats(data, target_col, features_list=0, bins=10, data_test=0):
    """Compute trend changes and train/test trend correlation per feature.

    :param data: dataframe containing the feature and target columns
    :param target_col: target column name
    :param features_list: features to analyse; an integer (default 0) means
        all columns except the target
    :param bins: number of bins to create from each continuous feature
    :param data_test: test dataframe to compare with ``data``; any
        non-dataframe value (default 0) means "no test data"
    :return: dataframe with "Feature" and "Trend_changes", plus
        "Trend_changes_test" and "Trend_correlation" when test data is
        passed; empty (with these columns) when no feature is numeric
    """
    if isinstance(features_list, int):
        features_list = list(data.columns)
        features_list.remove(target_col)

    has_test = isinstance(data_test, pd.DataFrame)
    stat_columns = ['Feature', 'Trend_changes']
    if has_test:
        stat_columns = stat_columns + ['Trend_changes_test', 'Trend_correlation']

    stats_all = []
    ignored = []
    for feature in features_list:
        if data[feature].dtype == 'O' or feature == target_col:
            ignored.append(feature)
            continue

        cuts, grouped = pd_colnum_tocat_stat(input_data=data, feature=feature, target_col=target_col, bins=bins)
        trend_changes = get_trend_changes(grouped_data=grouped, feature=feature, target_col=target_col)
        if has_test:
            # Test data is binned with the cuts computed on the train data.
            grouped_test = pd_colnum_tocat_stat(input_data=data_test.reset_index(drop=True), feature=feature,
                                                target_col=target_col, bins=bins, cuts=cuts)
            trend_corr = get_trend_correlation(grouped, grouped_test, feature, target_col)
            trend_changes_test = get_trend_changes(grouped_data=grouped_test, feature=feature,
                                                   target_col=target_col)
            stats = [feature, trend_changes, trend_changes_test, trend_corr]
        else:
            stats = [feature, trend_changes]
        stats_all.append(stats)

    # Naming the columns at construction also works when there are no rows.
    stats_all_df = pd.DataFrame(stats_all, columns=stat_columns)

    if len(ignored) > 0:
        print('Categorical features ' + str(ignored) + ' ignored. Categorical features not supported yet.')

    print('Returning stats for all numeric features')
    return stats_all_df