Source code for dsa.da.util

# pylint: disable=C0321,C0103,C0301,E1305,E1121,C0302,C0330,C0111,W0613,W0611,R1705
# -*- coding: utf-8 -*-
"""
Various utilities


"""
import copy
import datetime
import logging
import math
import os
import pickle
import random
import socket
import sys
from collections import Counter, OrderedDict
from logging.handlers import TimedRotatingFileHandler

import toml

print("os.getcwd", os.getcwd())


################### Global VAR Logs ################################################################
APP_ID = __file__ + "," + str(os.getpid()) + "," + str(socket.gethostname())
APP_ID2 = str(os.getpid()) + "_" + str(socket.gethostname())

LOG_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), "logfile.log")
FORMATTER_1 = logging.Formatter("%(asctime)s,  %(name)s, %(levelname)s, %(message)s")
FORMATTER_2 = logging.Formatter("%(asctime)s.%(msecs)03dZ %(levelname)s %(message)s")
FORMATTER_3 = logging.Formatter("%(asctime)s  %(levelname)s %(message)s")
FORMATTER_4 = logging.Formatter("%(asctime)s, %(process)d, %(filename)s,    %(message)s")

FORMATTER_5 = logging.Formatter(
    "%(asctime)s, %(process)d, %(pathname)s%(filename)s, %(funcName)s, %(lineno)s,  %(message)s"
)


def os_make_dirs(filename):
    if isinstance(filename, str):
        filename = [os.path.dirname(filename)]

    if isinstance(filename, list):
        folder_list = filename
        for f in folder_list:
            try:
                if not os.path.exists(f):
                    os.makedirs(f)
            except Exception as e:
                print(e)
        return folder_list


####################################################################################################
[docs]def save_all(variable_list, folder, globals_main=None): """ Pickle saving batch :param variable_list: :param folder: :param globals_main: :return: """ for x in variable_list: try: filename = save(globals_main[x], "{a}/{b}.pkl".format(a=folder, b=x)) print(filename) except Exception as e: print("error", e)
[docs]def save(obj, filename="/folder1/keyname", isabsolutpath=0): """ Pickle saving :param obj: :param filename: :param isabsolutpath: :return: """ try: folder = os_make_dirs(filename) with open(filename, "wb") as f: pickle.dump(obj, f, pickle.HIGHEST_PROTOCOL) return filename except Exception as e: print("error", e)
[docs]def load(filename="/folder1/keyname", isabsolutpath=0, encoding1="utf-8"): """ pickle load :param filename: :param isabsolutpath: :param encoding1: :return: """ try: folder = os_make_dirs(filename) with open(filename, "rb") as f: return pickle.load(f) except Exception as e: print("error", e)
#################################################################################################### def create_appid(filename): # appid = filename + ',' + str(os.getpid()) + ',' + str( socket.gethostname() ) appid = filename + "," + str(os.getpid()) return appid def create_logfilename(filename): return filename.split("/")[-1].split(".")[0] + ".log" def create_uniqueid(): return datetime.datetime.now().strftime("_%Y%m%d%H%M%S_") + str(random.randint(1000, 9999)) #################################################################################################### ################### Logger #########################################################################
[docs]def logger_setup( logger_name=None, log_file=None, formatter=FORMATTER_1, isrotate=False, isconsole_output=True, logging_level=logging.DEBUG, ): """ my_logger = util_log.logger_setup("my module name", log_file="") APP_ID = util_log.create_appid(__file__ ) def log(*argv): my_logger.info(",".join([str(x) for x in argv])) """ if logger_name is None: logger = logging.getLogger() # Gets the root logger else: logger = logging.getLogger(logger_name) logger.setLevel(logging_level) # better to have too much log than not enough if isconsole_output: logger.addHandler(logger_handler_console(formatter)) if log_file is not None: logger.addHandler( logger_handler_file(formatter=formatter, log_file_used=log_file, isrotate=isrotate) ) # with this pattern, it's rarely necessary to propagate the error up to parent logger.propagate = False return logger
def logger_handler_console(formatter=None): formatter = FORMATTER_1 if formatter is None else formatter console_handler = logging.StreamHandler(sys.stdout) console_handler.setFormatter(formatter) return console_handler def logger_handler_file(isrotate=False, rotate_time="midnight", formatter=None, log_file_used=None): formatter = FORMATTER_1 if formatter is None else formatter log_file_used = LOG_FILE if log_file_used is None else log_file_used if isrotate: print("Rotate log", rotate_time) fh = TimedRotatingFileHandler(log_file_used, when=rotate_time) fh.setFormatter(formatter) return fh else: fh = logging.FileHandler(log_file_used) fh.setFormatter(formatter) return fh def logger_setup2(name=__name__, level=None): _ = level # logger defines logger = logging.getLogger(name) logger.setLevel(logging.DEBUG) ch = logging.StreamHandler() ch.setLevel(logging.DEBUG) formatter = logging.Formatter() ch.setFormatter(formatter) logger.addHandler(ch) return logger #################################################################################################### ################### Print ########################################################################## def printlog( s="", s1="", s2="", s3="", s4="", s5="", s6="", s7="", s8="", s9="", s10="", app_id="", logfile=None, iswritelog=True, ): try: if app_id != "": prefix = app_id + "," + datetime.datetime.now().strftime("_%Y%m%d%H%M%S_") else: prefix = APP_ID + "," + datetime.datetime.now().strftime("_%Y%m%d%H%M%S_") s = ",".join( [ prefix, str(s), str(s1), str(s2), str(s3), str(s4), str(s5), str(s6), str(s7), str(s8), str(s9), str(s10), ] ) print(s) if writelog: writelog(s, logfile) except Exception as e: print(e) if iswritelog: writelog(str(e), logfile) def writelog(m="", f=None): f = LOG_FILE if f is None else f with open(f, "a") as _log: _log.write(m + "\n") ####################################################################################################
[docs]def load_arguments(config_file=None, arg_list=None): """ Load CLI input, load config.toml , overwrite config.toml by CLI Input [{}, {}] """ import toml import argparse if config_file is not None: cur_path = os.path.dirname(os.path.realpath(__file__)) config_file = os.path.join(cur_path, "config.toml") p = argparse.ArgumentParser() p.add_argument("--config_file", default=config_file, help="Params File") p.add_argument("--config_mode", default="test", help=" test/ prod /uat") for x in arg_list: p.add_argument(x["--"], default=x.get("default"), help=x.get("help")) arg = p.parse_args() # Load file params as dict namespace class to_name(object): def __init__(self, adict): self.__dict__.update(adict) print(arg.config_file) pars = toml.load(arg.config_file) pars = pars[arg.config_mode] # test / prod print(arg.config_file, pars) ### Overwrite params by CLI input and merge with toml file for key, x in vars(arg).items(): if x is not None: # only values NOT set by CLI pars[key] = x # print(pars) pars = to_name(pars) # like object/namespace pars.instance return pars
[docs]def sk_tree_get_ifthen(tree, feature_names, target_names, spacer_base=" "): """Produce psuedo-code for decision tree. tree -- scikit-leant DescisionTree. feature_names -- list of feature names. target_names -- list of target (output) names. spacer_base -- used for spacing code (default: " "). """ left = tree.tree_.children_left right = tree.tree_.children_right threshold = tree.tree_.threshold features = [feature_names[i] for i in tree.tree_.feature] value = tree.tree_.value def recurse(left, right, threshold, features, node, depth): spacer = spacer_base * depth if threshold[node] != -2: print((spacer + "if " + features[node] + " <= " + str(threshold[node]) + " :")) # print(spacer + "if ( " + features[node] + " <= " + str(threshold[node]) + " ) :") if left[node] != -1: recurse(left, right, threshold, features, left[node], depth + 1) print(("" + spacer + "else :")) if right[node] != -1: recurse(left, right, threshold, features, right[node], depth + 1) # print(spacer + "") else: target = value[node] for i, v in zip(np.nonzero(target)[1], target[np.nonzero(target)]): target_name = target_names[i] target_count = int(v) print( ( spacer + "return " + str(target_name) + " ( " + str(target_count) + ' examples )"' ) ) recurse(left, right, threshold, features, 0, 0)