Source code for dsa.da.util_feature

# pylint: disable=C0321,C0103,E1221,C0301,E1305,E1121,C0302,C0330
# -*- coding: utf-8 -*-
"""
Methods for feature extraction and preprocessing
util_feature: input/output is pandas



"""
import copy
import math
import os
from collections import Counter, OrderedDict

import numpy as np
import pandas as pd
import scipy as sci
from sklearn.cluster import KMeans



########### LOCAL ##################################################################################
print("os.getcwd", os.getcwd())


class dict2(object):
    def __init__(self, d):
        self.__dict__ = d



def ztest():
    import sklearn as sk
    print(sk)


####################################################################################################
def pd_col_to_onehot(dfref, colname=None, colonehot=None, return_val="dataframe,param"):
    """
    One-hot encode the given columns.
    :param dfref: input dataframe (left unmodified; a deep copy is encoded)
    :param colname: columns to encode (default: all columns)
    :param colonehot: previous one-hot columns, added as zero columns if missing
    :param return_val: "dataframe,param" to also return the list of generated columns
    :return: encoded dataframe, and optionally the list of one-hot columns
    """
    df = copy.deepcopy(dfref)
    coladded = []
    colname = list(df.columns) if colname is None else colname

    # Encode each column into one-hot
    for x in colname:
        try:
            nunique = len(df[x].unique())
            print(x, nunique, df.shape, flush=True)
            if nunique > 2:
                df = pd.concat([df, pd.get_dummies(df[x], prefix=x)], axis=1).drop([x], axis=1)
            else:
                df[x] = df[x].factorize()[0]  # put into 0,1 format
            coladded.append(x)
        except Exception as e:
            print(x, e)

    # Add missing category columns
    if colonehot is not None:
        for x in colonehot:
            if x not in df.columns:
                df[x] = 0
                print(x, "added")
                coladded.append(x)

    colnew = colonehot if colonehot is not None else [c for c in df.columns if c not in colname]
    if return_val == "dataframe,param":
        return df[colnew], colnew
    else:
        return df[colnew]

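# Usage sketch (illustrative, not part of the original module; df_demo, df_hot and
# col_hot are hypothetical names): one-hot encode a small synthetic dataframe where
# "color" has three categories and "flag" is binary.
def _example_pd_col_to_onehot():
    df_demo = pd.DataFrame({"color": ["red", "blue", "green", "red"],
                            "flag":  ["y", "n", "y", "y"]})
    df_hot, col_hot = pd_col_to_onehot(df_demo, colname=["color", "flag"],
                                       return_val="dataframe,param")
    # df_hot holds the generated one-hot columns (color_blue, color_green, color_red)
    print(df_hot.head())
    return df_hot, col_hot
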
def pd_colcat_mergecol(df, col_list, x0, colid="easy_id"):
    """
    Merge one-hot category columns back into a single column.
    :param df: input dataframe
    :param col_list: one-hot columns to merge
    :param x0: name of the merged output column
    :param colid: id column used as index
    :return: dataframe indexed by colid with the merged column x0
    """
    dfz = pd.DataFrame({colid: df[colid].values})
    for t in col_list:
        ix = t.rfind("_")
        val = int(t[ix + 1:])
        print(ix, t[ix + 1:])
        dfz[t] = df[t].apply(lambda x: val if x > 0 else 0)

    # print(dfz)
    dfz = dfz.set_index(colid)
    dfz[x0] = dfz.iloc[:, :].sum(1)
    for t in dfz.columns:
        if t != x0:
            del dfz[t]
    return dfz

def pd_colcat_tonum(df, colcat="all", drop_single_label=False, drop_fact_dict=True):
    """
    Encoding a data-set with mixed data (numerical and categorical) to a numerical-only data-set,
    using the following logic:
    * categorical with only a single value will be marked as zero (or dropped, if requested)
    * categorical with two values will be replaced with the result of Pandas `factorize`
    * categorical with more than two values will be replaced with the result of Pandas `get_dummies`
    * numerical columns will not be modified

    **Returns:** DataFrame or (DataFrame, dict). If `drop_fact_dict` is True, returns the encoded
    DataFrame. Otherwise, returns a tuple of the encoded DataFrame and a dictionary where each key
    is a two-value column and the value is the original labels, as supplied by Pandas `factorize`.
    The dictionary is empty if no two-value columns are present in the data-set.

    Parameters
    ----------
    df : NumPy ndarray / Pandas DataFrame
        The data-set to encode
    colcat : sequence / string
        A sequence of the nominal (categorical) columns in the dataset. If string, must be 'all'
        to state that all columns are nominal. If None, nothing happens. Default: 'all'
    drop_single_label : Boolean, default = False
        If True, nominal columns with only a single value will be dropped.
    drop_fact_dict : Boolean, default = True
        If True, the return value will be the encoded DataFrame alone. If False, it will be a tuple
        of the DataFrame and the dictionary of the binary factorization (originating from pd.factorize)
    """
    df = convert(df, "dataframe")
    if colcat is None:
        return df
    elif colcat == "all":
        colcat = df.columns

    df_out = pd.DataFrame()
    binary_columns_dict = dict()
    for col in df.columns:
        if col not in colcat:
            df_out.loc[:, col] = df[col]
        else:
            unique_values = pd.unique(df[col])
            if len(unique_values) == 1 and not drop_single_label:
                df_out.loc[:, col] = 0
            elif len(unique_values) == 2:
                df_out.loc[:, col], binary_columns_dict[col] = pd.factorize(df[col])
            else:
                dummies = pd.get_dummies(df[col], prefix=col)
                df_out = pd.concat([df_out, dummies], axis=1)

    if drop_fact_dict:
        return df_out
    else:
        return df_out, binary_columns_dict

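# Usage sketch (illustrative, names hypothetical): encode a dataframe that mixes a
# three-value category, a binary category and a numerical column.
def _example_pd_colcat_tonum():
    df_demo = pd.DataFrame({
        "city": ["tokyo", "osaka", "tokyo", "nagoya"],   # 3 values -> get_dummies
        "sex":  ["m", "f", "f", "m"],                    # 2 values -> factorize
        "age":  [21, 35, 42, 28],                        # numerical -> unchanged
    })
    df_enc, fact_dict = pd_colcat_tonum(df_demo, colcat=["city", "sex"], drop_fact_dict=False)
    print(df_enc.dtypes)
    print(fact_dict)   # e.g. {'sex': <original labels from pd.factorize>}
    return df_enc
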
def pd_colcat_mapping(df, colname):
    """
    Build forward and inverse mappings between category labels and integer codes.
    Usage:
        for col in colcat:
            df[col] = df[col].apply(lambda x: colcat_map["cat_map"][col].get(x))
    :param df: input dataframe
    :param colname: list of categorical columns
    :return: dict with "cat_map" (label -> code) and "cat_map_inverse" (code -> label)
    """
    mapping_rev = {
        col: {n: cat for n, cat in enumerate(df[col].astype("category").cat.categories)}
        for col in df[colname]
    }

    mapping = {
        col: {cat: n for n, cat in enumerate(df[col].astype("category").cat.categories)}
        for col in df[colname]
    }

    return {"cat_map": mapping, "cat_map_inverse": mapping_rev}

def pd_colcat_toint(dfref, colname, colcat_map=None, suffix=None):
    df = dfref[colname]
    suffix = "" if suffix is None else suffix
    colname_new = []

    # Re-apply an existing encoding
    if colcat_map is not None:
        for col in colname:
            ddict = colcat_map[col]["encode"]
            df[col + suffix] = df[col].apply(lambda x: ddict.get(x))
            colname_new.append(col + suffix)
        return df[colname_new], colcat_map

    # Build a new encoding
    colcat_map = {}
    for col in colname:
        colcat_map[col] = {}
        df[col + suffix], label = df[col].factorize()
        colcat_map[col]["decode"] = {i: t for i, t in enumerate(list(label))}
        colcat_map[col]["encode"] = {t: i for i, t in enumerate(list(label))}
        colname_new.append(col + suffix)

    return df[colname_new], colcat_map

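# Usage sketch (illustrative, names hypothetical): build an integer encoding on train
# data, then re-apply the same mapping to new data through colcat_map.
def _example_pd_colcat_toint():
    df_train = pd.DataFrame({"shop": ["a", "b", "a", "c"]})
    df_new = pd.DataFrame({"shop": ["c", "a", "b", "b"]})
    df_enc, colcat_map = pd_colcat_toint(df_train, colname=["shop"])
    df_new_enc, _ = pd_colcat_toint(df_new, colname=["shop"], colcat_map=colcat_map)
    print(colcat_map["shop"]["encode"])   # e.g. {'a': 0, 'b': 1, 'c': 2}
    return df_enc, df_new_enc
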
def pd_colnum_tocat(
    df, colname=None, colexclude=None, colbinmap=None, bins=5, suffix="_bin",
    method="uniform", na_value=-1, return_val="dataframe,param",
    params={"KMeans_n_clusters": 8, "KMeans_init": 'k-means++', "KMeans_n_init": 10,
            "KMeans_max_iter": 300, "KMeans_tol": 0.0001, "KMeans_precompute_distances": 'auto',
            "KMeans_verbose": 0, "KMeans_random_state": None, "KMeans_copy_x": True,
            "KMeans_n_jobs": None, "KMeans_algorithm": 'auto'}
):
    """
    Discretize numerical columns into bins.
    colbinmap: for each column, the pre-computed bin edges.
    https://scikit-learn.org/stable/modules/classes.html#module-sklearn.preprocessing
    :param df: input dataframe
    :param method: "uniform", "quantile" or "cluster" (KMeans)
    :return: binned columns and/or the bin map, depending on return_val
    """
    colexclude = [] if colexclude is None else colexclude
    colname = colname if colname is not None else list(df.columns)
    colnew = []
    col_stat = OrderedDict()
    colmap = OrderedDict()

    # Bin algorithm parameters
    p = dict2(params)  # Bin model params

    def bin_create(dfc, bins):
        mi, ma = dfc.min(), dfc.max()
        space = (ma - mi) / bins
        lbins = [mi + i * space for i in range(bins + 1)]
        lbins[0] -= 0.0001
        return lbins

    def bin_create_quantile(dfc, bins):
        qt_list_ref = np.arange(0, 1.00001, 1.0 / bins)
        # print(qt_list_ref)
        qt_list = dfc.quantile(qt_list_ref)
        # print(qt_list)
        lbins = list(qt_list.values)
        lbins[0] -= 0.01
        return lbins

    def bin_create_cluster(dfc):
        kmeans = KMeans(n_clusters=p.KMeans_n_clusters, init=p.KMeans_init, n_init=p.KMeans_n_init,
                        max_iter=p.KMeans_max_iter, tol=p.KMeans_tol,
                        precompute_distances=p.KMeans_precompute_distances,
                        verbose=p.KMeans_verbose, random_state=p.KMeans_random_state,
                        copy_x=p.KMeans_copy_x, n_jobs=p.KMeans_n_jobs,
                        algorithm=p.KMeans_algorithm).fit(dfc)
        return kmeans.predict(dfc)

    # Loop on all columns
    for c in colname:
        if c in colexclude:
            continue
        print(c)
        df[c] = df[c].astype(np.float32)

        # Use pre-computed bin map if provided
        if colbinmap is not None:
            lbins = colbinmap.get(c)
        else:
            if method == "quantile":
                lbins = bin_create_quantile(df[c], bins)
            elif method == "cluster":
                non_nan_index = np.where(~np.isnan(df[c]))[0]
                lbins = bin_create_cluster(df.loc[non_nan_index][c].values.reshape((-1, 1))).reshape((-1,))
            else:
                lbins = bin_create(df[c], bins)

        cbin = c + suffix
        if method == 'cluster':
            df.loc[non_nan_index, cbin] = lbins
        else:
            labels = np.arange(0, len(lbins) - 1)
            df[cbin] = pd.cut(df[c], bins=lbins, labels=labels)

        # NA processing
        df[cbin] = df[cbin].astype("float")
        df[cbin] = df[cbin].apply(lambda x: x if x >= 0.0 else na_value)  # NA values
        df[cbin] = df[cbin].astype("int")
        col_stat = df.groupby(cbin).agg({c: {"size", "min", "mean", "max"}})
        colmap[c] = lbins
        colnew.append(cbin)
        print(col_stat)

    if return_val == "dataframe":
        return df[colnew]
    elif return_val == "param":
        return colmap
    else:
        return df[colnew], colmap

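# Usage sketch (illustrative, names hypothetical): bin a numerical column into 4
# uniform bins on train data, then reuse the returned bin edges (colbinmap) on test data.
def _example_pd_colnum_tocat():
    df_train = pd.DataFrame({"price": [10.0, 12.5, 30.0, 55.0, 80.0, 95.0]})
    df_test = pd.DataFrame({"price": [15.0, 60.0, 90.0]})
    df_bins, bin_map = pd_colnum_tocat(df_train, colname=["price"], bins=4,
                                       method="uniform", return_val="dataframe,param")
    df_test_bins = pd_colnum_tocat(df_test, colname=["price"], colbinmap=bin_map,
                                   bins=4, return_val="dataframe")
    print(df_bins["price_bin"].values, df_test_bins["price_bin"].values)
    return bin_map
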
def pd_colnum_normalize(df, colnum_log, colproba):
    """
    Apply a log transform and max-normalization to numerical columns, and clean up
    probability columns.
    :param df: input dataframe
    :param colnum_log: columns to log-transform and normalize by their max
    :param colproba: probability columns where -1 and NaN are replaced by 0.5
    :return: transformed dataframe
    """
    for x in colnum_log:
        try:
            df[x] = np.log(df[x].values.astype(np.float64) + 1.1)
            df[x] = df[x].replace(-np.inf, 0)
            df[x] = df[x].fillna(0)
            print(x, df[x].min(), df[x].max())
            df[x] = df[x] / df[x].max()
        except BaseException:
            pass

    for x in colproba:
        print(x)
        df[x] = df[x].replace(-1, 0.5)
        df[x] = df[x].fillna(0.5)

    return df

def pd_col_remove(df, cols):
    for x in cols:
        try:
            del df[x]
        except BaseException:
            pass
    return df

def pd_col_intersection(df1, df2, colid):
    """
    Return the ids present in both dataframes.
    :param df1: first dataframe
    :param df2: second dataframe
    :param colid: id column name
    :return: list of matching ids
    """
    n2 = list(set(df1[colid].values).intersection(df2[colid]))
    print("total matching", len(n2), len(df1), len(df2))
    return n2

def pd_col_merge_onehot(df, colname):
    """
    Map each base column name to its one-hot columns (columns named "<base>_<value>").
    :param df: input dataframe
    :param colname: list of base column names
    :return: dict base column -> list of matching one-hot columns
    """
    dd = {}
    for x in colname:
        merge_array = []
        for t in df.columns:
            if x in t and t[len(x): len(x) + 1] == "_":
                merge_array.append(t)
        dd[x] = merge_array
    return dd

def pd_col_to_num(df, colname=None, default=np.nan):
    def to_float(x):
        try:
            return float(x)
        except BaseException:
            return default

    colname = list(df.columns) if colname is None else colname
    for c in colname:
        df[c] = df[c].apply(lambda x: to_float(x))
    return df

def pd_col_filter(df, filter_val=None, iscol=1):
    """
    Remove the columns (or rows) whose index value is in filter_val.
    # filter1 = X_client['client_id'].values
    :param df: input dataframe
    :param filter_val: values to remove
    :param iscol: 1 to drop columns, 0 to drop rows
    :return: filtered dataframe
    """
    axis = 1 if iscol == 1 else 0
    col_delete = []
    for colname in df.index.values:  # !!!! row delete
        if colname in filter_val:
            col_delete.append(colname)

    df2 = df.drop(col_delete, axis=axis, inplace=False)
    return df2

def pd_col_fillna(
    dfref, colname=None, method="frequent", value=None, colgroupby=None,
    return_val="dataframe,param",
):
    """
    Fill NaNs with a specific value in certain columns.
    Arguments:
        dfref:   dataframe
        colname: list of columns to fill
        method:  "frequent", "mode", "median" or "median_conditional"
        value:   value to replace NaNs with (overrides method if given)
    Returns:
        df: new dataframe with filled values
    """
    colname = list(dfref.columns) if colname is None else colname
    df = dfref[colname]
    params = {"method": method, "na_value": {}}
    for col in colname:
        nb_nans = df[col].isna().sum()

        if method == "frequent":
            x = df[col].value_counts().idxmax()
        if method == "mode":
            x = df[col].mode()
        if method == "median":
            x = df[col].median()
        if method == "median_conditional":
            x = df.groupby(colgroupby)[col].transform("median")  # conditional median

        value_col = x if value is None else value
        print(col, nb_nans, "replaceBY", value_col)
        params["na_value"][col] = value_col
        df[col] = df[col].fillna(value_col)

    if return_val == "dataframe,param":
        return df, params
    else:
        return df

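# Usage sketch (illustrative, names hypothetical): fill NaNs with the most frequent
# value per column and keep the per-column fill values for later reuse.
def _example_pd_col_fillna():
    df_demo = pd.DataFrame({"size": ["S", "M", None, "M"], "qty": [1.0, np.nan, 3.0, 4.0]})
    df_filled, fill_params = pd_col_fillna(df_demo, colname=["size", "qty"],
                                           method="frequent", return_val="dataframe,param")
    print(fill_params["na_value"])   # e.g. {'size': 'M', 'qty': 1.0}
    return df_filled
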
def pd_col_fillna_advanced(
    dfref, colname=None, method="median", colname_na=None, return_val="dataframe,param"
):
    """
    Fill NaNs using model-based imputation from impyute.
    https://impyute.readthedocs.io/en/master/
    Arguments:
        dfref:      dataframe
        colname:    list of columns used for imputation
        colname_na: target NA columns
        method:     "mice" or "knn"
    Returns:
        df: new dataframe with filled values
    https://impyute.readthedocs.io/en/master/user_guide/overview.html
    """
    import impyute as impy

    colname = list(dfref.columns) if colname is None else colname
    df = dfref[colname]
    params = {"method": method, "na_value": {}}
    for col in colname:
        nb_nans = df[col].isna().sum()
        print(nb_nans)

    if method == "mice":
        from impyute.imputation.cs import mice
        imputed_df = mice(df.values)
        dfout = pd.DataFrame(data=imputed_df, columns=colname)

    elif method == "knn":
        from impyute.imputation.cs import fast_knn
        imputed_df = fast_knn(df.values, k=5)
        dfout = pd.DataFrame(data=imputed_df, columns=colname)

    if return_val == "dataframe,param":
        return dfout, params
    else:
        return dfout

def pd_col_fillna_datawig(
    dfref, colname=None, method="datawig", colname_na=None, return_val="dataframe,param"
):
    """
    Fill NaNs using a datawig deep-learning imputer.
    Arguments:
        dfref:      dataframe
        colname:    input columns used to predict the missing values
        colname_na: target NA columns
        method:     must be "datawig"
    Returns:
        df: new dataframe with filled values
    """
    colname = list(dfref.columns) if colname is None else colname
    df = dfref[colname]
    params = {"method": method, "na_value": {}}
    for colna in colname_na:
        nb_nans = df[colna].isna().sum()
        print(nb_nans)

    if method == "datawig":
        import datawig
        for colna in colname_na:
            imputer = datawig.SimpleImputer(
                input_columns=colname,
                output_column=colna,               # the column we'd like to impute values for
                output_path="preprocess_fillna/",  # stores model data and metrics
            )
            # Fit an imputer model on the train data
            imputer.fit(train_df=df)
            # Impute missing values and return original dataframe with predictions
            dfout = imputer.predict(df)

    if return_val == "dataframe,param":
        return dfout, params
    else:
        return dfout

def pd_row_drop_above_thresh(df, colnumlist, thresh):
    """
    Remove rows where a column value is above a certain threshold.
    Arguments:
        df:         dataframe
        colnumlist: list of columns to check
        thresh:     value above which to remove the row
    Returns:
        df: dataframe with outliers removed
    """
    for col in colnumlist:
        df = df.drop(df[df[col] > thresh].index, axis=0)
    return df

def pd_pipeline_apply(df, pipeline):
    """
    Apply a list of (function, kwargs) transformations sequentially to a dataframe.
    Example pipeline:
      pipe_preprocess_colnum = [
        (pd_col_to_num,    {"val": "?"}),
        (pd_colnum_tocat,  {"colname": None, "colbinmap": colnum_binmap, "bins": 5,
                            "method": "uniform", "suffix": "_bin", "return_val": "dataframe"}),
        (pd_col_to_onehot, {"colname": None, "colonehot": colnum_onehot, "return_val": "dataframe"}),
      ]
    :param df: input dataframe
    :param pipeline: list of (function, kwargs) tuples
    :return: transformed dataframe
    """
    dfi = copy.deepcopy(df)
    for i, function in enumerate(pipeline):
        print("############## Pipeline ", i, "Start", dfi.shape, str(function[0].__name__), flush=True)
        dfi = function[0](dfi, **function[1])
        print("############## Pipeline ", i, "Finished", dfi.shape, flush=True)
    return dfi

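# Usage sketch (illustrative, names hypothetical): chain coercion to numeric, uniform
# binning and one-hot encoding on a toy dataframe.
def _example_pd_pipeline_apply():
    df_demo = pd.DataFrame({"amount": ["1", "2", "?", "4", "5"]})
    pipe = [
        (pd_col_to_num,    {"colname": ["amount"], "default": 0.0}),
        (pd_colnum_tocat,  {"colname": ["amount"], "bins": 3, "method": "uniform",
                            "return_val": "dataframe"}),
        (pd_col_to_onehot, {"colname": None, "return_val": "dataframe"}),
    ]
    return pd_pipeline_apply(df_demo, pipe)
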
def pd_df_sampling(df, coltarget="y", n1max=10000, n2max=-1, isconcat=1):
    """
    Down-sample a binary-class dataframe.
    :param df: input dataframe
    :param coltarget: binary class column
    :param n1max: number of samples to keep for class 0
    :param n2max: number of samples to keep for class 1 (-1 keeps them all)
    :param isconcat: if 1, return a single shuffled dataframe; else return (df_class1, df_class0)
    :return: down-sampled dataframe(s)
    """
    df1 = df[df[coltarget] == 0].sample(n=n1max)

    n2max = len(df[df[coltarget] == 1]) if n2max == -1 else n2max
    df0 = df[df[coltarget] == 1].sample(n=n2max)

    if isconcat:
        df2 = pd.concat((df1, df0))
        df2 = df2.sample(frac=1.0, replace=True)
        return df2
    else:
        print("y=1", n2max, "y=0", len(df1))
        return df0, df1

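# Usage sketch (illustrative, names hypothetical): down-sample the majority class
# (y == 0) of an imbalanced binary dataset to 100 rows, keeping all positives.
def _example_pd_df_sampling():
    df_demo = pd.DataFrame({"y": [0] * 900 + [1] * 100, "x": np.arange(1000)})
    df_balanced = pd_df_sampling(df_demo, coltarget="y", n1max=100, n2max=-1, isconcat=1)
    print(df_balanced["y"].value_counts())
    return df_balanced
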
def pd_df_stack(df_list, ignore_index=True):
    """
    Concatenate dataframes vertically.
    :param df_list: list of dataframes
    :return: stacked dataframe
    """
    df0 = None
    for i, dfi in enumerate(df_list):
        if df0 is None:
            df0 = dfi
        else:
            try:
                df0 = df0.append(dfi, ignore_index=ignore_index)
            except Exception as e:
                print("Error appending: ", i, e)
    return df0

def pd_stat_correl_pair(df, coltarget=None, colname=None):
    """
    Generate the Pearson correlation between each column and the target column.
    :param df: dataframe containing the columns and the target column
    :param colname: list of columns
    :param coltarget: target column
    :return: dataframe with one row per column and its correlation with the target
    """
    from scipy.stats import pearsonr

    colname = colname if colname is not None else list(df.columns)
    target_corr = []
    for col in colname:
        target_corr.append(pearsonr(df[col].values, df[coltarget].values)[0])

    df_correl = pd.DataFrame({"colx": [""] * len(colname), "coly": colname, "correl": target_corr})
    df_correl[coltarget] = colname
    return df_correl

def pd_stat_colcheck(df):
    """
    Print basic statistics for numerical columns with more than two unique values.
    :param df: input dataframe
    """
    for x in df.columns:
        if len(df[x].unique()) > 2 and df[x].dtype != np.dtype("O"):
            print(x, len(df[x].unique()), df[x].min(), df[x].max())

def pd_stat_jupyter_profile(df, savefile="report.html", title="Pandas Profile"):
    """ Describe the table with pandas-profiling (>= 2.0.0):
        df.profile_report()
    """
    import pandas_profiling as pp

    print("start profiling")
    profile = df.profile_report(title=title)
    profile.to_file(output_file=savefile)
    colexclude = profile.get_rejected_variables(threshold=0.98)
    return colexclude

def pd_stat_distribution_colnum(df):
    """ Describe the numerical columns of the table """
    coldes = ["col", "coltype", "dtype", "count", "min", "max", "nb_na", "pct_na",
              "median", "mean", "std", "25%", "75%", "outlier"]

    def getstat(col):
        """
        Per-column stats from describe() plus NA counts:
        dtype, count, mean, std, min, 25%, 50%, 75%, max, nb_na, pct_na
        """
        ss = list(df[col].describe().values)
        ss = [str(df[col].dtype)] + ss
        nb_na = df[col].isnull().sum()
        ntot = len(df)
        ss = ss + [nb_na, nb_na / (ntot + 0.0)]

        return pd.Series(
            ss,
            ["dtype", "count", "mean", "std", "min", "25%", "50%", "75%", "max", "nb_na", "pct_na"],
        )

    dfdes = pd.DataFrame([], columns=coldes)
    cols = df.columns
    for col in cols:
        dtype1 = str(df[col].dtype)
        if dtype1[0:3] in ["int", "flo"]:
            row1 = getstat(col)
            dfdes = pd.concat((dfdes, row1.to_frame().T))

        if dtype1 == "object":
            pass

    return dfdes

def pd_stat_histogram(df, bins=50, coltarget="diff"):
    """
    Histogram of a numerical column, returned as a dataframe of bins, frequencies and density.
    :param df: input dataframe
    :param bins: number of bins
    :param coltarget: column to histogram
    :return: dataframe with columns "bins", "freq" and "density"
    """
    hh = np.histogram(df[coltarget].values, bins=bins, range=None, weights=None, density=None)
    hh2 = pd.DataFrame({"bins": hh[1][:-1], "freq": hh[0]})
    hh2["density"] = hh2["freq"] / hh2["freq"].sum()
    return hh2

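# Usage sketch (illustrative, names hypothetical): histogram of a numerical column
# as a dataframe of bin edges, counts and relative frequencies.
def _example_pd_stat_histogram():
    df_demo = pd.DataFrame({"diff": np.random.randn(1000)})
    hh = pd_stat_histogram(df_demo, bins=20, coltarget="diff")
    print(hh.head())   # columns: bins, freq, density
    return hh
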
def pd_stat_histogram_groupby(df, bins=50, coltarget="diff", colgroupby="y"):
    """
    Histogram of coltarget computed on the full dataframe and on each group of colgroupby.
    :param df: input dataframe
    :param bins: number of bins
    :param coltarget: column to histogram
    :param colgroupby: grouping column
    :return: stacked histogram dataframe
    """
    dfhisto = pd_stat_histogram(df, bins, coltarget)
    xunique = list(df[colgroupby].unique())

    # TODO: issues with naming
    for x in xunique:
        dfhisto1 = pd_stat_histogram(df[df[colgroupby] == x], bins, coltarget)
        dfhisto = pd.concat((dfhisto, dfhisto1))

    return dfhisto

def pd_stat_na_perow(df, n=10 ** 6):
    """
    Count missing values (NaN or -1) per row, for the first n rows.
    :param df: input dataframe
    :param n: maximum number of rows to scan
    :return: dataframe with per-row NA counts
    """
    ll = []
    for ii, x in df.iloc[:n, :].iterrows():
        ii = 0
        for t in x:
            if pd.isna(t) or t == -1:
                ii = ii + 1
        ll.append(ii)

    dfna_user = pd.DataFrame(
        {"": df.index.values[:n], "n_na": ll, "n_ok": len(df.columns) - np.array(ll)}
    )
    return dfna_user

def pd_stat_distribution(df, subsample_ratio=1.0):
    """
    Univariate distribution statistics for each column.
    :param df: input dataframe
    :param subsample_ratio: fraction of rows to sample before computing statistics
    :return: dataframe with one row per column
    """
    print("Univariate distribution")
    ll = {
        x: []
        for x in [
            "col", "n", "n_na", "n_notna", "n_na_pct",
            "nunique", "nunique_pct",
            "xmin", "xmin_freq", "xmin_pct",
            "xmax", "xmax_freq", "xmax_pct",
            "xmed", "xmed_freq", "xmed_pct",
        ]
    }

    if subsample_ratio < 1.0:
        df = df.sample(frac=subsample_ratio)

    nn = len(df) + 0.0
    for x in df.columns:
        try:
            xmin = df[x].min()
            nx = len(df[df[x] < xmin + 0.01])  # can fail on string columns
            ll["xmin_freq"].append(nx)
            ll["xmin"].append(xmin)
            ll["xmin_pct"].append(nx / nn)

            xmed = df[x].median()
            nx = len(df[(df[x] > xmed - 0.1) & (df[x] < xmed + 0.1)])
            ll["xmed_freq"].append(nx)
            ll["xmed"].append(xmed)
            ll["xmed_pct"].append(nx / nn)

            xmax = df[x].max()
            nx = len(df[df[x] > xmax - 0.01])
            ll["xmax_freq"].append(nx)
            ll["xmax"].append(xmax)
            ll["xmax_pct"].append(nx / nn)

            n_notna = df[x].count()
            ll["n_notna"].append(n_notna)
            ll["n_na"].append(nn - n_notna)
            ll["n"].append(nn)
            ll["n_na_pct"].append((nn - n_notna) / nn * 1.0)

            nx = df[x].nunique()
            ll["nunique"].append(nx)           # should be last
            ll["nunique_pct"].append(nx / nn)  # should be last
            ll["col"].append(x)                # should be last
        except Exception as e:
            print(x, e)

    # for k, x in ll.items():
    #     print(k, len(x))
    ll = pd.DataFrame(ll)
    return ll

def convert(data, to):
    """
    Convert between ndarray, list and DataFrame representations.
    :param data: input data (ndarray, list, Series or DataFrame)
    :param to: "array", "list" or "dataframe"
    :return: converted data
    """
    converted = None
    if to == "array":
        if isinstance(data, np.ndarray):
            converted = data
        elif isinstance(data, pd.Series):
            converted = data.values
        elif isinstance(data, list):
            converted = np.array(data)
        elif isinstance(data, pd.DataFrame):
            converted = data.values
    elif to == "list":
        if isinstance(data, list):
            converted = data
        elif isinstance(data, pd.Series):
            converted = data.values.tolist()
        elif isinstance(data, np.ndarray):
            converted = data.tolist()
    elif to == "dataframe":
        if isinstance(data, pd.DataFrame):
            converted = data
        elif isinstance(data, np.ndarray):
            converted = pd.DataFrame(data)
    else:
        raise ValueError("Unknown data conversion: {}".format(to))

    if converted is None:
        raise TypeError("cannot handle data conversion of type: {} to {}".format(type(data), to))
    else:
        return converted

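# Usage sketch (illustrative): round-trip between list, ndarray and DataFrame.
def _example_convert():
    arr = convert([1, 2, 3], "array")               # list   -> np.ndarray
    dfx = convert(arr.reshape(-1, 1), "dataframe")  # ndarray -> DataFrame
    lst = convert(dfx.iloc[:, 0], "list")           # Series  -> list
    return arr, dfx, lst
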
def col_stat_getcategorydict_freq(catedict):
    """ Generate frequency statistics per category: Id, Freq, Freq in %, CumSum %, ZScore,
        given a dictionary of categories parsed previously.
    """
    catlist = []
    for key, v in list(catedict.items()):
        df = pd.DataFrame(v)  # , ["category", "freq"])
        df["freq_pct"] = 100.0 * df["freq"] / df["freq"].sum()
        df["freq_zscore"] = df["freq"] / df["freq"].std()
        df = df.sort_values(by=["freq"], ascending=0)
        df["freq_cumpct"] = 100.0 * df["freq_pct"].cumsum() / df["freq_pct"].sum()
        df["rank"] = np.arange(0, len(df.index.values))
        catlist.append((key, df))
    return catlist

def col_extractname_colbin(cols2):
    """
    Convert one-hot column names back to generic column names.
    :param cols2: list of one-hot column names
    :return: list of base column names (duplicates removed)
    """
    coln = []
    for ss in cols2:
        xr = ss[ss.rfind("_") + 1:]
        xl = ss[: ss.rfind("_")]
        if len(xr) < 3:  # -1 or 1
            coln.append(xl)
        else:
            coln.append(ss)

    coln = np_drop_duplicates(coln)
    return coln

def col_getnumpy_indice(colall, colcat):
    def np_find_indice(v, x):
        for i, j in enumerate(v):
            if j == x:
                return i
        return -1

    return [np_find_indice(colall, x) for x in colcat]

def col_extractname(col_onehot):
    """
    Extract base column names from one-hot column names.
    :param col_onehot: list of one-hot column names
    :return: list of base column names
    """
    colnew = []
    for x in col_onehot:
        if len(x) > 2:
            if x[-2] == "_":
                if x[:-2] not in colnew:
                    colnew.append(x[:-2])
            elif x[-2] == "-":
                if x[:-3] not in colnew:
                    colnew.append(x[:-3])
            else:
                if x not in colnew:
                    colnew.append(x)
    return colnew

def col_remove(cols, colsremove, mode="exact"):
    """
    Remove column names from a list.

    Parameters
    ----------
    cols : list
        Column names to filter.
    colsremove : list
        Column names to remove.
    mode : str, optional
        "exact" removes exact matches; "fuzzy" removes any column whose name contains
        one of the colsremove strings. The default is "exact".

    Returns
    -------
    cols : list
        Filtered column names.
    """
    if mode == "exact":
        for x in colsremove:
            try:
                cols.remove(x)
            except BaseException:
                pass
        return cols

    if mode == "fuzzy":
        cols3 = []
        for t in cols:
            flag = 0
            for x in colsremove:
                if x in t:
                    flag = 1
                    break
            if flag == 0:
                cols3.append(t)
        return cols3

from matplotlib import pyplot as plt
def pd_colnum_tocat_stat(input_data, feature, target_col, bins, cuts=0):
    """
    Bins continuous features into equal sample size buckets and returns the target mean
    in each bucket. Separates out nulls into another bucket.
    :param input_data: dataframe containing features and target column
    :param feature: feature column name
    :param target_col: target column
    :param bins: number of bins required
    :param cuts: if buckets of certain specific cuts are required. Used on test data to reuse cuts from train.
    :return: if cuts are passed, only the grouped data is returned; else cuts and grouped data are returned
    """
    has_null = pd.isnull(input_data[feature]).sum() > 0
    if has_null == 1:
        data_null = input_data[pd.isnull(input_data[feature])]
        input_data = input_data[~pd.isnull(input_data[feature])]
        input_data.reset_index(inplace=True, drop=True)

    is_train = 0
    if cuts == 0:
        is_train = 1
        prev_cut = min(input_data[feature]) - 1
        cuts = [prev_cut]
        reduced_cuts = 0
        for i in range(1, bins + 1):
            next_cut = np.percentile(input_data[feature], i * 100 / bins)
            if next_cut > prev_cut + .000001:  # float numbers should be compared with some threshold!
                cuts.append(next_cut)
            else:
                reduced_cuts = reduced_cuts + 1
            prev_cut = next_cut

        # if reduced_cuts > 0:
        #     print('Reduced the number of bins due to less variation in feature')
        cut_series = pd.cut(input_data[feature], cuts)
    else:
        cut_series = pd.cut(input_data[feature], cuts)

    grouped = input_data.groupby([cut_series], as_index=True).agg(
        {target_col: [np.size, np.mean], feature: [np.mean]})
    grouped.columns = ['_'.join(cols).strip() for cols in grouped.columns.values]
    grouped[grouped.index.name] = grouped.index
    grouped.reset_index(inplace=True, drop=True)
    grouped = grouped[[feature] + list(grouped.columns[0:3])]
    grouped = grouped.rename(index=str, columns={target_col + '_size': 'Samples_in_bin'})
    grouped = grouped.reset_index(drop=True)
    corrected_bin_name = '[' + str(min(input_data[feature])) + ', ' + str(grouped.loc[0, feature]).split(',')[1]
    grouped[feature] = grouped[feature].astype('category')
    grouped[feature] = grouped[feature].cat.add_categories(corrected_bin_name)
    grouped.loc[0, feature] = corrected_bin_name

    if has_null == 1:
        grouped_null = grouped.loc[0:0, :].copy()
        grouped_null[feature] = grouped_null[feature].astype('category')
        grouped_null[feature] = grouped_null[feature].cat.add_categories('Nulls')
        grouped_null.loc[0, feature] = 'Nulls'
        grouped_null.loc[0, 'Samples_in_bin'] = len(data_null)
        grouped_null.loc[0, target_col + '_mean'] = data_null[target_col].mean()
        grouped_null.loc[0, feature + '_mean'] = np.nan
        grouped[feature] = grouped[feature].astype('str')
        grouped = pd.concat([grouped_null, grouped], axis=0)
        grouped.reset_index(inplace=True, drop=True)

    grouped[feature] = grouped[feature].astype('str').astype('category')
    if is_train == 1:
        return (cuts, grouped)
    else:
        return (grouped)

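# Usage sketch (illustrative, names hypothetical): bin a continuous feature against a
# binary target, then count trend direction changes across the bins.
def _example_pd_colnum_tocat_stat():
    rng = np.random.RandomState(0)
    df_demo = pd.DataFrame({"score": rng.rand(500)})
    df_demo["y"] = (df_demo["score"] + 0.2 * rng.rand(500) > 0.6).astype(int)
    cuts, grouped = pd_colnum_tocat_stat(input_data=df_demo, feature="score", target_col="y", bins=10)
    print(grouped[["score", "Samples_in_bin", "y_mean"]])
    print("trend changes:", get_trend_changes(grouped_data=grouped, feature="score", target_col="y"))
    return cuts, grouped
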
def draw_plots(input_data, feature, target_col, trend_correlation=None):
    """
    Draws univariate dependence plots for a feature.
    :param input_data: grouped data containing bins of the feature and target mean
    :param feature: feature column name
    :param target_col: target column
    :param trend_correlation: correlation between train and test trends of feature wrt target
    :return: draws trend plots for the feature
    """
    trend_changes = get_trend_changes(grouped_data=input_data, feature=feature, target_col=target_col)
    plt.figure(figsize=(12, 5))
    ax1 = plt.subplot(1, 2, 1)
    ax1.plot(input_data[target_col + '_mean'], marker='o')
    ax1.set_xticks(np.arange(len(input_data)))
    ax1.set_xticklabels((input_data[feature]).astype('str'))
    plt.xticks(rotation=45)
    ax1.set_xlabel('Bins of ' + feature)
    ax1.set_ylabel('Average of ' + target_col)
    comment = "Trend changed " + str(trend_changes) + " times"
    if trend_correlation == 0:
        comment = comment + '\n' + 'Correlation with train trend: NA'
    elif trend_correlation is not None:
        comment = comment + '\n' + 'Correlation with train trend: ' + str(int(trend_correlation * 100)) + '%'

    props = dict(boxstyle='round', facecolor='wheat', alpha=0.3)
    ax1.text(0.05, 0.95, comment, fontsize=12, verticalalignment='top', bbox=props, transform=ax1.transAxes)
    plt.title('Average of ' + target_col + ' wrt ' + feature)

    ax2 = plt.subplot(1, 2, 2)
    ax2.bar(np.arange(len(input_data)), input_data['Samples_in_bin'], alpha=0.5)
    ax2.set_xticks(np.arange(len(input_data)))
    ax2.set_xticklabels((input_data[feature]).astype('str'))
    plt.xticks(rotation=45)
    ax2.set_xlabel('Bins of ' + feature)
    ax2.set_ylabel('Bin-wise sample size')
    plt.title('Samples in bins of ' + feature)
    plt.tight_layout()
    plt.show()

def get_trend_changes(grouped_data, feature, target_col, threshold=0.03):
    """
    Calculates the number of times the trend of feature wrt target changed direction.
    :param grouped_data: grouped dataset
    :param feature: feature column name
    :param target_col: target column
    :param threshold: minimum % difference required to count as trend change
    :return: number of trend changes for the feature
    """
    grouped_data = grouped_data.loc[grouped_data[feature] != 'Nulls', :].reset_index(drop=True)
    target_diffs = grouped_data[target_col + '_mean'].diff()
    target_diffs = target_diffs[~np.isnan(target_diffs)].reset_index(drop=True)
    max_diff = grouped_data[target_col + '_mean'].max() - grouped_data[target_col + '_mean'].min()
    target_diffs_mod = target_diffs.fillna(0).abs()
    low_change = target_diffs_mod < threshold * max_diff
    target_diffs_norm = target_diffs.divide(target_diffs_mod)
    target_diffs_norm[low_change] = 0
    target_diffs_norm = target_diffs_norm[target_diffs_norm != 0]
    target_diffs_lvl2 = target_diffs_norm.diff()
    changes = target_diffs_lvl2.fillna(0).abs() / 2
    tot_trend_changes = int(changes.sum()) if ~np.isnan(changes.sum()) else 0
    return tot_trend_changes

def get_trend_correlation(grouped, grouped_test, feature, target_col):
    """
    Calculates correlation between train and test trend of feature wrt target.
    :param grouped: train grouped data
    :param grouped_test: test grouped data
    :param feature: feature column name
    :param target_col: target column name
    :return: trend correlation between train and test
    """
    grouped = grouped[grouped[feature] != 'Nulls'].reset_index(drop=True)
    grouped_test = grouped_test[grouped_test[feature] != 'Nulls'].reset_index(drop=True)

    if grouped_test.loc[0, feature] != grouped.loc[0, feature]:
        grouped_test[feature] = grouped_test[feature].cat.add_categories(grouped.loc[0, feature])
        grouped_test.loc[0, feature] = grouped.loc[0, feature]

    grouped_test_train = grouped.merge(grouped_test[[feature, target_col + '_mean']], on=feature,
                                       how='left', suffixes=('', '_test'))
    nan_rows = pd.isnull(grouped_test_train[target_col + '_mean']) | pd.isnull(
        grouped_test_train[target_col + '_mean_test'])
    grouped_test_train = grouped_test_train.loc[~nan_rows, :]
    if len(grouped_test_train) > 1:
        trend_correlation = np.corrcoef(grouped_test_train[target_col + '_mean'],
                                        grouped_test_train[target_col + '_mean_test'])[0, 1]
    else:
        trend_correlation = 0
        print("Only one bin created for " + feature + ". Correlation can't be calculated")

    return trend_correlation

def univariate_plotter(feature, data, target_col, bins=10, data_test=0):
    """
    Calls the draw plot function and handles the layout around the plots.
    :param feature: feature column name
    :param data: dataframe containing features and target columns
    :param target_col: target column name
    :param bins: number of bins to be created from continuous feature
    :param data_test: test data which has to be compared with input data for correlation
    :return: grouped data if only train is passed, else (grouped train data, grouped test data)
    """
    print(' {:^100} '.format('Plots for ' + feature))
    if data[feature].dtype == 'O':
        print('Categorical feature not supported')
    else:
        cuts, grouped = pd_colnum_tocat_stat(input_data=data, feature=feature, target_col=target_col, bins=bins)
        has_test = type(data_test) == pd.core.frame.DataFrame
        if has_test:
            grouped_test = pd_colnum_tocat_stat(input_data=data_test.reset_index(drop=True), feature=feature,
                                                target_col=target_col, bins=bins, cuts=cuts)
            trend_corr = get_trend_correlation(grouped, grouped_test, feature, target_col)
            print(' {:^100} '.format('Train data plots'))
            draw_plots(input_data=grouped, feature=feature, target_col=target_col)

            print(' {:^100} '.format('Test data plots'))
            draw_plots(input_data=grouped_test, feature=feature, target_col=target_col,
                       trend_correlation=trend_corr)
        else:
            draw_plots(input_data=grouped, feature=feature, target_col=target_col)
        print('--------------------------------------------------------------------------------------------------------------')
        print('\n')
        if has_test:
            return (grouped, grouped_test)
        else:
            return (grouped)

def get_univariate_plots(data, target_col, features_list=0, bins=10, data_test=0):
    """
    Creates univariate dependence plots for features in the dataset.
    :param data: dataframe containing features and target columns
    :param target_col: target column name
    :param features_list: by default creates plots for all features. If a list is passed, creates plots for only those features.
    :param bins: number of bins to be created from continuous feature
    :param data_test: test data which has to be compared with input data for correlation
    :return: draws univariate plots for all columns in data
    """
    if type(features_list) == int:
        features_list = list(data.columns)
        features_list.remove(target_col)

    for cols in features_list:
        if cols != target_col and data[cols].dtype == 'O':
            print(cols + ' is categorical. Categorical features not supported yet.')
        elif cols != target_col and data[cols].dtype != 'O':
            univariate_plotter(feature=cols, data=data, target_col=target_col, bins=bins, data_test=data_test)

def get_trend_stats(data, target_col, features_list=0, bins=10, data_test=0):
    """
    Calculates trend changes and correlation between train/test for a list of features.
    :param data: dataframe containing features and target columns
    :param target_col: target column name
    :param features_list: by default computes stats for all features. If a list is passed, computes stats for only those features.
    :param bins: number of bins to be created from continuous feature
    :param data_test: test data which has to be compared with input data for correlation
    :return: dataframe with trend changes and trend correlation (if test data is passed)
    """
    if type(features_list) == int:
        features_list = list(data.columns)
        features_list.remove(target_col)

    stats_all = []
    has_test = type(data_test) == pd.core.frame.DataFrame
    ignored = []
    for feature in features_list:
        if data[feature].dtype == 'O' or feature == target_col:
            ignored.append(feature)
        else:
            cuts, grouped = pd_colnum_tocat_stat(input_data=data, feature=feature, target_col=target_col, bins=bins)
            trend_changes = get_trend_changes(grouped_data=grouped, feature=feature, target_col=target_col)
            if has_test:
                grouped_test = pd_colnum_tocat_stat(input_data=data_test.reset_index(drop=True), feature=feature,
                                                    target_col=target_col, bins=bins, cuts=cuts)
                trend_corr = get_trend_correlation(grouped, grouped_test, feature, target_col)
                trend_changes_test = get_trend_changes(grouped_data=grouped_test, feature=feature,
                                                       target_col=target_col)
                stats = [feature, trend_changes, trend_changes_test, trend_corr]
            else:
                stats = [feature, trend_changes]
            stats_all.append(stats)

    stats_all_df = pd.DataFrame(stats_all)
    stats_all_df.columns = ['Feature', 'Trend_changes'] if has_test == False else \
        ['Feature', 'Trend_changes', 'Trend_changes_test', 'Trend_correlation']

    if len(ignored) > 0:
        print('Categorical features ' + str(ignored) + ' ignored. Categorical features not supported yet.')

    print('Returning stats for all numeric features')
    return stats_all_df

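# Usage sketch (illustrative, names hypothetical): trend-change and train/test
# trend-correlation summary for the numeric features of a toy train/test split.
def _example_get_trend_stats():
    rng = np.random.RandomState(1)
    df_all = pd.DataFrame({"f1": rng.rand(600), "f2": rng.randn(600)})
    df_all["y"] = (df_all["f1"] > 0.5).astype(int)
    df_train, df_test = df_all.iloc[:400], df_all.iloc[400:]
    stats = get_trend_stats(data=df_train, target_col="y", bins=5, data_test=df_test)
    print(stats)   # columns: Feature, Trend_changes, Trend_changes_test, Trend_correlation
    return stats
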