From 302ebfb799066fd761a7cf315a5f713b3b98c831 Mon Sep 17 00:00:00 2001 From: Ulrich Date: Fri, 18 Aug 2023 22:25:50 +0200 Subject: [PATCH] remove old utils --- utils/__init__.py | 0 utils/api_abstract.py | 51 ---- utils/api_const.py | 4 - utils/cli_abstract.py | 46 ---- utils/cli_const.py | 9 - utils/clibash_tool.py | 21 -- utils/clicmd_tool.py | 27 -- utils/clihadoop_tool.py | 27 -- utils/clissh_tool.py | 33 --- utils/config/path.yml | 61 ----- utils/config_tool.py | 362 -------------------------- utils/conn_tool.py | 81 ------ utils/css_tool.py | 70 ----- utils/date_tool.py | 176 ------------- utils/env_tool.py | 39 --- utils/file_abstract.py | 109 -------- utils/file_tool.py | 287 --------------------- utils/filejson_tool.py | 20 -- utils/filelog_tool.py | 19 -- utils/filexml_tool.py | 24 -- utils/flask_tool.py | 5 - utils/gen_const.py | 23 -- utils/gen_tool.py | 127 --------- utils/i18n_tool.py | 96 ------- utils/job_tool.py | 97 ------- utils/map_tool.py | 375 --------------------------- utils/match_const.py | 118 --------- utils/match_tool.py | 551 ---------------------------------------- utils/path_const.py | 115 --------- utils/report_tool.py | 292 --------------------- utils/tdata_tool.py | 451 -------------------------------- utils/xml2_tool.py | 45 ---- utils/zip_tool.py | 61 ----- 33 files changed, 3822 deletions(-) delete mode 100644 utils/__init__.py delete mode 100644 utils/api_abstract.py delete mode 100644 utils/api_const.py delete mode 100644 utils/cli_abstract.py delete mode 100644 utils/cli_const.py delete mode 100644 utils/clibash_tool.py delete mode 100644 utils/clicmd_tool.py delete mode 100644 utils/clihadoop_tool.py delete mode 100644 utils/clissh_tool.py delete mode 100644 utils/config/path.yml delete mode 100644 utils/config_tool.py delete mode 100644 utils/conn_tool.py delete mode 100644 utils/css_tool.py delete mode 100644 utils/date_tool.py delete mode 100644 utils/env_tool.py delete mode 100644 utils/file_abstract.py delete mode 100644 utils/file_tool.py delete mode 100644 utils/filejson_tool.py delete mode 100644 utils/filelog_tool.py delete mode 100644 utils/filexml_tool.py delete mode 100644 utils/flask_tool.py delete mode 100644 utils/gen_const.py delete mode 100644 utils/gen_tool.py delete mode 100644 utils/i18n_tool.py delete mode 100644 utils/job_tool.py delete mode 100644 utils/map_tool.py delete mode 100644 utils/match_const.py delete mode 100644 utils/match_tool.py delete mode 100644 utils/path_const.py delete mode 100644 utils/report_tool.py delete mode 100644 utils/tdata_tool.py delete mode 100644 utils/xml2_tool.py delete mode 100644 utils/zip_tool.py diff --git a/utils/__init__.py b/utils/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/utils/api_abstract.py b/utils/api_abstract.py deleted file mode 100644 index 686f7e1..0000000 --- a/utils/api_abstract.py +++ /dev/null @@ -1,51 +0,0 @@ -#!/usr/bin/python -# -*- coding: utf-8 -*- -# --------------------------------------------------------------------------------------------------------- -# Author : Ulrich Carmesin -# Source : gitea.ucarmesin.de -# --------------------------------------------------------------------------------------------------------- -""" -This abstract class ApiFcts defines the interface for the execution of any command-line-functions. -It uses the following configuration - .a) COMP.conf->artifact->api->[system] : for structural attributes of the operating-system \n - .c) COMP.conf->conn->[api] : for connection-attributes and structural attributes, - maybe specialized for the operating-system - -The attribute type resolves which technique is used, implemented in a specific tool-class: - * nifi, -The main tasks are: \n - -""" -import basic.program -import utils.config_tool - -class ApiFcts(): - """ - This interface defines each necessary connection to any kind of database. - The specific technique how to connect to the concrete DBMS has to be implemented in the specific tool. - """ - def __init__(self): - self.comp = None - pass - - def reset_TData(self, job): - pass - - def setComp(self, job, comp): - self.job = job - self.comp = comp - - def startCommand(self, comp, args): - """ method to execute the statement - this method should only called by the class itself """ - raise Exception(B.EXCEPT_NOT_IMPLEMENT) - - def statusCommand(self, comp, args): - """ method to execute the statement - this method should only called by the class itself """ - raise Exception(B.EXCEPT_NOT_IMPLEMENT) - - def stopCommand(self, comp, args): - """ method to execute the statement - this method should only called by the class itself """ - raise Exception(B.EXCEPT_NOT_IMPLEMENT) diff --git a/utils/api_const.py b/utils/api_const.py deleted file mode 100644 index ccfd413..0000000 --- a/utils/api_const.py +++ /dev/null @@ -1,4 +0,0 @@ -#!/usr/bin/python -""" -constants for used for api-functions -""" diff --git a/utils/cli_abstract.py b/utils/cli_abstract.py deleted file mode 100644 index 2dffb89..0000000 --- a/utils/cli_abstract.py +++ /dev/null @@ -1,46 +0,0 @@ -#!/usr/bin/python -# -*- coding: utf-8 -*- -# --------------------------------------------------------------------------------------------------------- -# Author : Ulrich Carmesin -# Source : gitea.ucarmesin.de -# --------------------------------------------------------------------------------------------------------- -""" -This abstract class CliFcts defines the interface for the execution of any command-line-functions. -It uses the following configuration - .a) COMP.conf->artifact->cli->[system] : for structural attributes of the operating-system \n - .c) COMP.conf->conn->[cli] : for connection-attributes and structural attributes, - maybe specialized for the operating-system - -The attribute type resolves which technique is used, implemented in a specific tool-class: - * cmd,bash,powersh ... for specific local operating-system - * ssh,hadoop ... for specific remote operating-system -The main tasks are: \n -.1. executing the command-array - with attributes - * conn.ip, host, port, user, password, ... for synchronous db-connection - * conn.root, ... for path-definitions for file-implementations (csv, ) - -""" -import basic.program -import utils.config_tool -import basic.constants as B - -class CliFcts(): - """ - This interface defines each necessary connection to any kind of database. - The specific technique how to connect to the concrete DBMS has to be implemented in the specific tool. - """ - def __init__(self): - self.comp = None - pass - - def reset_TData(self, job): - pass - - def setComp(self, job, comp): - self.job = job - self.comp = comp - - def execCommand(self, comp, command): - """ method to execute the statement - this method should only called by the class itself """ - raise Exception(B.EXCEPT_NOT_IMPLEMENT) diff --git a/utils/cli_const.py b/utils/cli_const.py deleted file mode 100644 index 0ce6f66..0000000 --- a/utils/cli_const.py +++ /dev/null @@ -1,9 +0,0 @@ -#!/usr/bin/python -""" -constants for used for api-functions -""" - -DEFAULT_DB_PARTITION = "n" -""" attribute if table is partitioned - partitions are parametrized """ -DEFAULT_DB_CONN_JAR = "n" -""" attribute for connection-jar-file instead of connection by ip, port """ diff --git a/utils/clibash_tool.py b/utils/clibash_tool.py deleted file mode 100644 index 3322e2f..0000000 --- a/utils/clibash_tool.py +++ /dev/null @@ -1,21 +0,0 @@ -# -# ---------------------------------------------------------- -""" -This module implements the technique to interact via bash to the test-object. -The class has to be constructed by the tool-Handling because of keyword "bash" in the configuration, -then it will be called with the interface / abstract-class cli_abstract -""" -import os -import utils.cli_abstract -import basic - -class CliFcts(utils.cli_abstract.CliFcts): - def execCmd(self, cmds): - """ - - :param cmds: - :return: - """ - for cmd in cmds: - rc = os.system(cmd) - self.comp.m.logInfo("RC "+str(rc)+" zu CMD "+cmd) \ No newline at end of file diff --git a/utils/clicmd_tool.py b/utils/clicmd_tool.py deleted file mode 100644 index 5bb7294..0000000 --- a/utils/clicmd_tool.py +++ /dev/null @@ -1,27 +0,0 @@ -# -# ---------------------------------------------------------- -""" -This module implements the technique to interact via win-cmd to the test-object. -The class has to be constructed by the tool-Handling because of keyword "cmd" in the configuration, -then it will be called with the interface / abstract-class cli_abstract -""" -import os -import utils.cli_abstract -import basic - -class CliFcts(utils.cli_abstract.CliFcts): - def execCmd(self, cmds): - """ - executes an array of commands on a windows-machine - the commands will be intern translated before execution - :param cmds: written in linux-bash - :return: - """ - for cmd in cmds: - cmd = self.translate(cmd) - rc = os.system(cmd) - self.comp.m.logInfo("RC "+str(rc)+" zu CMD "+cmd) - return "ok" - - def translate(self, cmd): - """ translates a bash-cmd (default) into a windows-cmd """ - return cmd \ No newline at end of file diff --git a/utils/clihadoop_tool.py b/utils/clihadoop_tool.py deleted file mode 100644 index fcb5862..0000000 --- a/utils/clihadoop_tool.py +++ /dev/null @@ -1,27 +0,0 @@ -# -# ---------------------------------------------------------- -""" -This module implements the technique to interact via hadoop-cmd to the test-object. -The class has to be constructed by the tool-Handling because of keyword "hadoop" in the configuration, -then it will be called with the interface / abstract-class cli_abstract -""" -import os -import utils.cli_abstract -import basic - -class CliFcts(utils.cli_abstract.CliFcts): - def execCmd(self, cmds): - """ - executes an array of commands on a hadoop-machine - the commands will be intern translated before execution - :param cmds: written in linux-bash - :return: - """ - for cmd in cmds: - cmd = self.translate(cmd) - rc = os.system(cmd) - self.comp.m.logInfo("RC "+str(rc)+" zu CMD "+cmd) - return "ok" - - def translate(self, cmd): - """ translates a bash-cmd (default) into a windows-cmd """ - return cmd \ No newline at end of file diff --git a/utils/clissh_tool.py b/utils/clissh_tool.py deleted file mode 100644 index ebbc61e..0000000 --- a/utils/clissh_tool.py +++ /dev/null @@ -1,33 +0,0 @@ -# -# ---------------------------------------------------------- -""" -This module implements the technique to interact via ssh to the test-object. -The class has to be constructed by the tool-Handling because of keyword "ssh" in the configuration, -then it will be called with the interface / abstract-class cli_abstract -""" -import os -import utils.cli_abstract -import basic -import paramiko - -class CliFcts(utils.cli_abstract.CliFcts): - def execCmd(self, cmds): - """ - - :param cmds: - :return: - """ - ssh = paramiko.SSHClient() - ssh.load_system_host_keys(os.path.expanduser('~/.ssh/known_hosts')) - if self.conn["password"]: - ssh.connect(self.conn["host"], username=self.conn["user"], password=self.conn["password"]) - else: - ssh.connect(self.conn["host"], username=self.conn["user"]) - shell = ssh.invoke_shell() - for cmd in cmds: - stdin, stdout, stderr = ssh.exec_command(cmd + "\n") - self.sysout = stdout.read() - stdin.close() - stderr.close() - stdout.close() - ssh.close() \ No newline at end of file diff --git a/utils/config/path.yml b/utils/config/path.yml deleted file mode 100644 index 21e8b48..0000000 --- a/utils/config/path.yml +++ /dev/null @@ -1,61 +0,0 @@ -# -pattern: - # Keywords - log: log - parfile: PARAMETER_{job.par.application}_{job.par.environment}.yml - precond: vorher - postcond: nachher - diff: diff_fach - prediff: diff_init - rundiff: diff_ablauf - result: Ergebnisse/{comp.name} - origin: original - parts: teilergebnisse - sumfile: xxx - backup: backup - reffile: Herkunft.txt - appdir: APP - tc: testfall - ts: testlauf - debugname: debug - logname: log - preexec: env-pre-exec # only for dry unit-test - postexec: env-post-exec # only for dry unit-test - debugs: "{job.conf.home}/test/log" - # workspace - workbase: "{job.conf.data}/workspace" - worklog: "{workbase}/{log}" - workparfile: "{workbase}/PARAMETER_workspace" - workappdir: "{workbase}/{appdir}/{comp.name}" - # environment - envbase: "{job.conf.environment}/{job.par.environment}" - envlog: "{envbase}/{log}" - envparfile: "{envbase}/{parfile}" - envappdir: "{envbase}/{appdir}/{comp.name}" - # testcase - tcbase: "{job.conf.archiv}/{job.par.testcase}/{job.par.tctime}" - tclog: "{tcbase}/{log}" - tcresult: "{tcbase}/{result}" - tcparfile: "{tcbase}/{parfile}" - tcdiff: "{tcresult}/{diff}" - tcprediff: "{tcresult}/{prediff}" - tcrundiff: "{tcresult}/{rundiff}" - tcprecond: "{tcresult}/{precond}" - tcpostcond: "{tcresult}/{postcond}" - # testdata - tdbase: "{job.conf.testdata}/{job.par.testcase}" - tdresult: "{tdbase}/{result}" - tdparfile: "{tdbase}/{parfile}" - tdprecond: "{tdresult}/{precond}" - tdpostcond: "{tdresult}/{postcond}" - tdpreexec: "{tdbase}/{preexec}/{comp.name}" # only for dry unit-test - tdpostexec: "{tdbase}/{postexec}/{comp.name}" # only for dry unit-test - # testset - tsbase: "{job.conf.archiv}/{ts}/{job.par.usecase}_{job.par.tstime}" - tslog: "{tsbase}/{log}" - tsparfile: "{tsbase}/{parfile}" - tssum: "{tsbase}/Ergebnis" - # expectation-result rs - xpbase: "{job.conf.expect}/{job.par.branch}" - xpresult: "{xpbase}/{result}" - xpbackup: "{xpbase}/{result}" diff --git a/utils/config_tool.py b/utils/config_tool.py deleted file mode 100644 index baab2bc..0000000 --- a/utils/config_tool.py +++ /dev/null @@ -1,362 +0,0 @@ -#!/usr/bin/python -# -*- coding: utf-8 -*- -# --------------------------------------------------------------------------------------------------------- -# Author : Ulrich Carmesin -# Source : gitea.ucarmesin.de -# https://ucarmesin.de/index.php/it/testautomatisierung-fuer-daten-test/225-konfiguration-der-testanwendung -# --------------------------------------------------------------------------------------------------------- -import sys -import basic.constants as B - -try: - import basic.program -except ImportError: - print("ImportError: " + str(ImportError.with_traceback())) - pass -import basic.componentHandling -import utils.path_tool -import utils.file_tool -import os.path -import basic.constants as B -import utils.data_const as D -import utils.path_const as P - -COMP_FILES = [D.DDL_FILENAME] -CONFIG_FORMAT = [D.DFILE_TYPE_YML, D.DFILE_TYPE_JSON, D.DFILE_TYPE_CSV] - -def getExistgetConfigPath(job, pathnames): - for p in pathnames: - if p[-1:] == ".": - p = p[0:-1] - for format in CONFIG_FORMAT: - pathname = p+"."+format - if os.path.exists(pathname): - return pathname - return None - -def getConfigPath(job, modul, name, subname=""): - """ - gets the most specified configuration of different sources - Parameter: - * typ -- (basic, comp, tool) - * name -- the specific class - sources: - * programm << - * install << - * environ << basis-conf - * release << basis-conf - * testset << parameter/environ - * testcase << parameter - the parameter-files could be one of these file-types: - * yaml, json, csv - """ - if job is None: - verify = False # job = basic.program.Job.getInstance() - else: - verify = job.getDebugLevel("config_tool")-4 - if verify: job.debug(verify, "getConfig " + modul + ", " + name) - #TODO path rejoin, config as const - if modul == P.KEY_TOOL: - for format in CONFIG_FORMAT: - pathname = os.path.join(job.conf.getPath(P.ATTR_PATH_COMPONENTS), - P.VAL_CONFIG, P.KEY_TOOL+"_"+name+"."+format) - if verify: job.debug(verify, "1 " + pathname) - if os.path.exists(pathname): - return pathname - for format in CONFIG_FORMAT: - pathname = os.path.join(job.conf.getPath(P.ATTR_PATH_HOME), - P.VAL_CONFIG, P.KEY_TOOL+"_"+name+"."+format) - if verify: job.debug(verify, "1 " + pathname) - if os.path.exists(pathname): - return pathname - for format in CONFIG_FORMAT: - pathname = os.path.join(job.conf.getPath(P.ATTR_PATH_PROGRAM), - P.VAL_UTIL, P.VAL_CONFIG, name+"."+format) - if verify: job.debug(verify, "2 " + pathname) - if os.path.exists(pathname): - return pathname - for format in CONFIG_FORMAT: - pathname = os.path.join(job.conf.getPath(P.ATTR_PATH_ENV), - job.par.environment, P.VAL_CONFIG, P.KEY_TOOL+"_"+ name+"."+format) - if verify: job.debug(verify, "3 " + pathname) - if os.path.exists(pathname): - return pathname - job.debug(verify, "3x " + pathname) - raise Exception(P.EXP_CONFIG_MISSING, modul+", "+name) - elif modul == P.KEY_COMP: - for format in CONFIG_FORMAT: - pathname = os.path.join(job.conf.getPath(P.ATTR_PATH_HOME), - P.VAL_CONFIG, P.KEY_COMP+"_" + name + "."+format) - if verify: job.debug(verify, "4 " + pathname) - if os.path.exists(pathname): - return pathname - for format in CONFIG_FORMAT: - pathname = os.path.join(job.conf.getPath(P.ATTR_PATH_COMPONENTS), - basic.componentHandling.getComponentFolder(name), "CONFIG." + format) - if verify: job.debug(verify, "5 " + pathname) - if os.path.exists(pathname): - return pathname - if verify: job.debug(verify, "6 " + pathname) - raise Exception(P.EXP_CONFIG_MISSING, modul+", "+name) - elif modul in COMP_FILES: - # for example DATASTRUCURE or the table - pathnames = [] - pathnames.append(os.path.join(job.conf.getPath(P.ATTR_PATH_COMPONENTS), - basic.componentHandling.getComponentFolder(name), modul)) - pathnames.append(os.path.join(job.conf.getPath(P.ATTR_PATH_COMPONENTS), - basic.componentHandling.getComponentFolder(subname), modul)) - pathnames.append(os.path.join(job.conf.getPath(P.ATTR_PATH_PROGRAM), P.VAL_BASIC, modul)) - pathnames.append(os.path.join(job.conf.getPath(P.ATTR_PATH_PROGRAM), P.VAL_BASIC, subname)) - configpath = getExistgetConfigPath(job, pathnames) - if configpath is not None: - return configpath - for format in CONFIG_FORMAT: - pathname = os.path.join(job.conf.getPath(P.ATTR_PATH_COMPONENTS), - basic.componentHandling.getComponentFolder(name), modul+"."+format) - if os.path.exists(pathname): - return pathname - for format in CONFIG_FORMAT: - if len(subname) > 1: - pathname = os.path.join(job.conf.getPath(P.ATTR_PATH_COMPONENTS), - basic.componentHandling.getComponentFolder(name), subname+"."+format) - if os.path.exists(pathname): - return pathname - raise Exception(P.EXP_CONFIG_MISSING, modul+", "+name) - elif modul == P.KEY_BASIC: - for format in CONFIG_FORMAT: - pathname = os.path.join(job.conf.getPath(P.ATTR_PATH_COMPONENTS), - P.VAL_CONFIG , name + "."+format) - if verify: job.debug(verify, "4 " + pathname) - if os.path.exists(pathname): - return pathname - elif modul == P.KEY_TESTCASE: - for format in CONFIG_FORMAT: - pathname = os.path.join(job.conf.getPath(P.ATTR_PATH_TDATA), - name, D.DFILE_TESTCASE_NAME + "."+format) - if verify: job.debug(verify, "4 " + pathname) - if os.path.exists(pathname): - return pathname - elif modul == P.KEY_TESTSUITE: - for format in CONFIG_FORMAT: - pathname = os.path.join(job.conf.getPath(P.ATTR_PATH_TDATA), - name, D.DFILE_TESTSUITE_NAME + "." + format) - if verify: job.debug(verify, "4 " + pathname) - if os.path.exists(pathname): - return pathname - elif modul == P.KEY_CATALOG: - for format in CONFIG_FORMAT: - pathname = os.path.join(job.conf.getPath(P.ATTR_PATH_TDATA), - P.KEY_CATALOG, name + "." + format) - if verify: job.debug(verify, "4 " + pathname) - if os.path.exists(pathname): - return pathname - for format in CONFIG_FORMAT: - pathname = os.path.join(job.conf.getPath(P.ATTR_PATH_COMPONENTS), - P.KEY_CATALOG, name + "." + format) - if verify: job.debug(verify, "4 " + pathname) - if os.path.exists(pathname): - return pathname - for format in CONFIG_FORMAT: - pathname = os.path.join(job.conf.getPath(P.ATTR_PATH_PROGRAM), - P.KEY_CATALOG, name + "." + format) - if verify: job.debug(verify, "4 " + pathname) - if os.path.exists(pathname): - return pathname - else: - pathname = utils.path_tool.composePath(job, P.P_TCPARFILE) - if verify: job.debug(verify, "7 " + pathname) - if os.path.exists(pathname): - return pathname - pathname = utils.path_tool.composePath(job, P.P_TSPARFILE) - if verify: job.debug(verify, "8 " + pathname) - if os.path.exists(pathname): - return pathname - for format in CONFIG_FORMAT: - if len(subname) > 1: - pathname = os.path.join(job.conf.getPath(P.ATTR_PATH_RELEASE), - P.VAL_CONFIG, "basis."+format) - if verify: job.debug(verify, "9 " + pathname) - if os.path.exists(pathname): - return pathname - for format in CONFIG_FORMAT: - if len(subname) > 1: - pathname = os.path.join(job.conf.getPath(P.ATTR_PATH_ENV), - P.VAL_CONFIG, "basis."+format) - if verify: job.debug(verify, "9 " + pathname) - if os.path.exists(pathname): - return pathname - for format in CONFIG_FORMAT: - if len(subname) > 1: - pathname = os.path.join(job.conf.getPath(P.ATTR_PATH_HOME), - P.VAL_CONFIG, "basis."+format) - if verify: job.debug(verify, "9 " + pathname) - if os.path.exists(pathname): - return pathname - raise Exception(P.EXP_CONFIG_MISSING, modul+", "+name) - - -def getConfValue(attribute, comp): - if attribute == B.ATTR_CONN_DBTYPE: - if not hasAttr(comp.conf[B.SUBJECT_CONN], "dbtype"): - if hasAttr(comp.conf[B.SUBJECT_CONN], "types") and hasAttr(comp.conf[B.SUBJECT_CONN]["types"], "dbtype"): - dbtype = comp.conf[B.SUBJECT_CONN]["types"]["dbtype"] - else: - raise LookupError("dbtype is not set in comp " + comp.name) - else: - dbtype = comp.conf["conn"]["dbtype"] - return "" - return "" - - -def getAttr(o, name): - if (isinstance(o, dict)): - if (name in o.keys()): - return o[name] - elif (isinstance(o, list)): - pass - elif hasattr(o, name): - return getattr(o, name) - return False - - -def hasAttr(o, name): - if (isinstance(o, dict)): - if (name in o.keys()): - return True - elif (isinstance(o, list)): - pass - - - elif hasattr(o, name): - return True - return False - - -def getConfig(job, modul, name, subname=""): - if job is None: - verify = 24 - else: - verify = job.getDebugLevel("config_tool")-4 - msg = None - if hasattr(job, "m"): msg = job.m - pathname = getConfigPath(job, modul, name, subname) - confs = {} - job.debug(verify, "getConfig " + pathname) - if len(pathname) < 1: - return confs - doc = utils.file_tool.readFileDict(job, pathname, msg) - if modul == D.DDL_FILENAME: - # in csv the root is the subname - # from the Dict-structure of DDL_FILENAME pick the substructure of the subname - keys = list(doc.keys()) - if subname not in keys and len(keys) == 1: - doc0 = doc[keys[0]] - doc = doc0 - keys = list(doc.keys()) - if subname in keys: - doc0 = doc[subname] - doc = doc0 - for i, v in doc.items(): - confs[i] = v - return confs - - -def getAttribute(comp, path, attr, job): - attrList = getAttributeList(comp, path, job) - if attr in attrList: - return attrList[attr] - else: - return "" - - -def getAttributeList(comp, path, job): - """ - gets a concrete attribute-list for an arteifact-element from the config-attributes from the connection-attributes - https://ucarmesin.de/index.php/it/testautomatisierung-fuer-daten-test/225-konfiguration-der-testanwendung#konfigurationshierarchie - :param comp: - :param path: artifact-type.artifact-name for example: DB.person - :return: list of all attributes for the artifact-element - """ - attrList = {} - a = path.split(".") - artType = a[0] - artName = a[1] - if B.SUBJECT_CONN not in comp.conf: - raise Exception ("Environment is not configured") - if artType in comp.conf[B.SUBJECT_CONN]: - if artName in comp.conf[B.SUBJECT_CONN][artType]: - for attr, val in comp.conf[B.SUBJECT_CONN][artType][artName].items(): - if attr not in B.LIST_ATTR[artType]: - continue - attrList[attr] = val - for attr, val in comp.conf[B.SUBJECT_CONN][artType].items(): - if attr not in B.LIST_ATTR[artType]: - continue - if attr in attrList: - continue - attrList[attr] = val - if artType in comp.conf[B.SUBJECT_ARTS]: - if artName in comp.conf[B.SUBJECT_ARTS][artType]: - for attr, val in comp.conf[B.SUBJECT_ARTS][artType][artName].items(): - if attr not in B.LIST_ATTR[artType]: - continue - if attr in attrList: - continue - attrList[attr] = val - for attr, val in comp.conf[B.SUBJECT_ARTS][artType].items(): - if attr not in B.LIST_ATTR[artType]: - continue - if attr in attrList: - continue - attrList[attr] = val - return attrList - -def mergeConn(msg, conf, conn): - """ - merges the config-attributes from the connection-attributes - because the connection-attributes has to overwrite the config-attributes if the subject is configured - https://ucarmesin.de/index.php/it/testautomatisierung-fuer-daten-test/225-konfiguration-der-testanwendung#konfigurationshierarchie - :param conf: - :param conn: - :return: - """ - if B.SUBJECT_INST not in conf: - conf[B.SUBJECT_INST] = {} - for a in conn[B.SUBJECT_INST]: - conf[B.SUBJECT_INST][a] = conn[B.SUBJECT_INST][a] - for topic in [B.TOPIC_NODE_DB, B.TOPIC_NODE_CLI, B.TOPIC_NODE_API, B.TOPIC_NODE_FILE]: - if topic not in conf[B.SUBJECT_ARTS]: - continue - if topic == B.TOPIC_NODE_DB: - list = B.LIST_DB_ATTR - if topic == B.TOPIC_NODE_CLI: - list = B.LIST_CLI_ATTR - if topic == B.TOPIC_NODE_API: - list = B.LIST_API_ATTR - if topic == B.TOPIC_NODE_FILE: - list = B.LIST_FILE_ATTR - print(" --- merge-conn " + topic + " " + str(list)) - for a in conf[B.SUBJECT_ARTS][topic]: - if topic not in conn: - continue - if a in list: - if a in conn[topic]: - conf[B.SUBJECT_ARTS][topic][a] = conn[topic][a] - else: - for b in conf[B.SUBJECT_ARTS][topic][a]: - print(" --- merge-conn b " + topic + " " + a+" "+b) - if b not in list: - msg.logError("not-topic-attribute in topic-connection: "+topic+", "+b) - continue - if a not in conn[topic]: - continue - if b in conn[topic][a]: - conf[B.SUBJECT_ARTS][topic][a][b] = conn[topic][a][b] - for a in list: - if topic not in conn: - break - if topic not in conn: - continue - if a in conn[topic]: - conf[B.SUBJECT_ARTS][topic][a] = conn[topic][a] - return conf \ No newline at end of file diff --git a/utils/conn_tool.py b/utils/conn_tool.py deleted file mode 100644 index a928a2d..0000000 --- a/utils/conn_tool.py +++ /dev/null @@ -1,81 +0,0 @@ -# functions about connections to other instances -# ------------------------------------------------------------------- -""" - -""" -import basic.program -import utils.config_tool -import basic.constants as B -import utils.data_const as D - - -def getConnection(job, comp, nr): - #job = basic.program.Job.getInstance() - verify = job.getDebugLevel("conn_tool") - conn = {} - if job.conf.confs.get(B.SUBJECT_TOOL).get("connsrc") == D.DFILE_TYPE_YML: - conn = utils.config_tool.getConfig(job, "tool", B.SUBJECT_CONN) - xtypes = None - if ("types" in conn["env"][comp]): - xtypes = conn["env"][comp]["types"] - instnr = "inst" + str(nr) - if conn["env"][comp][instnr]: - if (xtypes is not None): - conn["env"][comp][instnr]["types"] = xtypes - return conn["env"][comp][instnr] - else: - job.m.setFatal("Conn-Tool: Comp not configured " + comp + " " + str(nr)) - return None - - -def getConnections(job, comp): - """ - it reads the connection-attributes for each instances of this component - general attributes are added to the connection-attributes - :param comp: - :return: - """ - #job = basic.program.Job.getInstance() - verify = job.getDebugLevel("conn_tool") - msg = None - if hasattr(comp, "m") and comp.m is not None: - msg = comp.m - elif hasattr(job, "m") and job.m is not None: - msg = job.m - else: - raise Exception("message-object is missing") - msg.debug(verify, "getConnections " + comp) - conn = {} - conns = [] - # if a datest-database exists read the connections - conndb = {} - if job.conf.confs.get("db"): - # select - pass - - conn = utils.config_tool.getConfig(job, "tool", B.SUBJECT_CONN) - if not comp in conn[B.SUBJECT_ENV]: - job.m.setFatal("Conn-Tool: Comp not configured " + comp) - - attr = {} - if B.CONF_NODE_GENERAL in conn[B.SUBJECT_ENV]: - for a in conn[B.SUBJECT_ENV][B.CONF_NODE_GENERAL]: - attr[a] = conn[B.SUBJECT_ENV][B.CONF_NODE_GENERAL] - for a in conn[B.SUBJECT_ENV][comp]: - if "inst" in a and a != B.SUBJECT_INST: - continue - attr[a] = conn["env"][comp][a] - #if ("types" in conn["env"][comp]): - # xtypes = conn["env"][comp]["types"] - for i in range(conn[B.SUBJECT_ENV][comp][B.SUBJECT_INST][B.ATTR_INST_CNT]): - #print("range " + str(i + 1)) - instnr = "inst" + str(i + 1) - #if (xtypes is not None): - # conn["env"][comp][instnr]["types"] = xtypes - for a in attr: - if a in conn["env"][comp][instnr]: - continue # dont overwrite an instance-specific value - conn["env"][comp][instnr][a] = attr[a] - conns.append(conn["env"][comp][instnr]) - - return conns diff --git a/utils/css_tool.py b/utils/css_tool.py deleted file mode 100644 index a2d7dfe..0000000 --- a/utils/css_tool.py +++ /dev/null @@ -1,70 +0,0 @@ -import basic.program -import basic.constants as B - -CSS_CLASS = { - "general": { - "table, td, th": "border: 1px solid grey;font-family:sans-serif" - }, - "diffFiles": { - "novalue": "color:grey", - "diffA": "color:green;font-weight:bold;", - "diffB": "color:crimson;font-weight:bold;", - "acceptA": "color:darkblue;", - "acceptB": "color:darkmagenta;" - }, - "resultFile": { - "result0": "background-color:white;", - "result1": "background-color:lightgreen;", - "result2": "background-color:yellow;", - "result3": "background-color:tomato;", - "result4": "background-color:tomato;" - } -} - -def getInlineStyle(job, filetype, cssclass): - #job = basic.program.Job.getInstance() - verify = int(job.getDebugLevel("css_tool")) - 1 - # job.debug(verify, "getDiffHeader ") - if job.conf.confs.get(B.SUBJECT_TOOL).get("css").get("type") == "inline": - out = "style=\""+CSS_CLASS[filetype][cssclass]+"\"" - else: - out = "class=\"" + cssclass + "\"" - return out - -def getInternalStyle(job, filetype): - #job = basic.program.Job.getInstance() - verify = int(job.getDebugLevel("match_tool")) - 1 - out = "" - if job.conf.confs.get(B.SUBJECT_TOOL).get("css").get("type") == "internal": - out = "" - elif job.conf.confs.get(B.SUBJECT_TOOL).get("css").get("type") == "external": - out = " " - else: - out = "" - return out - -def getExternalStyle(job, filetype): - #job = basic.program.Job.getInstance() - verify = int(job.getDebugLevel("match_tool")) - 1 - out = "" - if job.conf.confs.get(B.SUBJECT_TOOL).get("css").get("type") == "external": - arr = filetype.split(",") - for a in arr: - for c in CSS_CLASS[a]: - out += c+" {\n "+CSS_CLASS[a][c].replace(":", ": ").replace(";", ";\n ")+"}\n" - out.replace("\n \n}", "\n}") - return out diff --git a/utils/date_tool.py b/utils/date_tool.py deleted file mode 100644 index 1f17cfc..0000000 --- a/utils/date_tool.py +++ /dev/null @@ -1,176 +0,0 @@ -# functions related to Date-fields -# ----------------------------------------------------- -""" -additionally functions for calculating date with formulas like [DATE+2M] and for comparison of date related on two reference-dates -""" -import datetime -#from dateutil.relativedelta import relativedelta -import re -import utils.data_const as D - - -F_DIR = "%Y-%m-%d_%H-%M-%S" -F_DB_DATE = "%Y-%m-%d" -F_DB_TIME = "%Y-%m-%d %H:%M:%S" -F_DE = "%d.%m.%Y" -F_N8 = "%Y%m%d" -F_LOG = "%Y%m%d_%H%M%S" -F_DE_TSTAMP = "%d.%m.%Y %H:%M:%S" -MONTH_EN = ["jan", "feb", "mar", "apr", "may", "jun", "jul", "aug", "sep", "oct", "nov", "dec"] -MONTH_DE = ["jan", "feb", "mar", "apr", "mai", "jun", "jul", "aug", "sep", "okt", "nov", "dez"] -F_TIME_DEFAULT = F_DIR - -def getActdate(format): - return getFormatdate(datetime.datetime.now(), format) - - -def getFormatdate(date, format): - """ it return the date as string in the format """ - return date.strftime(format) - - -def getFormatDatetupel(dtupel, format): - """ it return the date as string in the format """ - if format == F_N8: - return f'{dtupel[0]:04}'+f'{dtupel[1]:02}'+f'{dtupel[2]:02}' - return getFormatdate(datetime.datetime(dtupel[0], dtupel[1], dtupel[2], - dtupel[3], dtupel[4], dtupel[5]) ,format) - -def formatParsedDate(instring, format): - dtupel = parseDate(instring) - #print ("---------------"+str(dtupel)) - return getFormatDatetupel(dtupel, format) - -def parseFormula(instring): - """ - the function parses the string as a formula. In the formula the input-date - actdate or explicite date - - will be added resp. subtracted with years, months or dates which are specified in the formula. - The structure of the formula: DATE +/- mY +/-nM +/-qD - :param instring: - :return: - """ - instring = instring.upper() - if instring[2:6] == "DATE": - refdate = datetime.datetime.today() - formula = instring[7:-2].upper() - else: - dstring = instring[2:instring.find(" ")] - res = parseDate(dstring) - refdate = datetime.datetime(res[0], res[1], res[2], res[3], res[4], res[5]) - formula = instring[2+len(dstring):-2] - formula = re.sub(r' ', '', formula) - year = refdate.year - mon = refdate.month - day = refdate.day - hour = refdate.hour - min = refdate.minute - sec = refdate.second - if re.match(r"[-+]\d+[JYMDT]", formula): - ress = re.compile(r"([-+])(\d+)([JYMDT])") - for res in ress.finditer(formula): - summand = int(res.group(2)) - if res.group(1) == "-": - summand = summand * (-1) - if res.group(3) in "JY": - year = year + summand - if res.group(3) in "M": - mon = mon + summand - while mon <= 0: - mon = mon + 12 - year = year - 1 - while mon > 12: - mon = mon - 12 - year = year + 1 - if res.group(3) in "DT": - refdate = datetime.datetime(year, mon, day, hour, min, sec) - refdate = refdate + datetime.timedelta(days=summand) - year = refdate.year - mon = refdate.month - day = refdate.day - hour = refdate.hour - min = refdate.minute - sec = refdate.second - return (year, mon, day, hour, min, sec) - else: - print("re matcht nicht") - return (year, mon, day, hour, min, sec) - -def getMonthInt(instring): - i = 0 - j = 0 - for l in [MONTH_EN, MONTH_DE]: - i = 0 - for m in l: - i += 1 - if instring.lower() == m: - j = i - break - if j > 0: - break - return j - -def parseDate(instring): - """ - the function parses the string as a date or timestamp which is formed in one of the typical formates - :param the string to be parse: - :return timestamp as tupel (y, m, d, H, M ,S): - """ - year = 0 - mon = 0 - day = 0 - hour = 0 - min = 0 - sec = 0 - #print(instring) - if instring[0:2] == "{(" and instring[-2:] == ")}": - return parseFormula(instring) - if re.match(r"\d{8}_\d{6}", instring): - year = int(instring[0:4]) - mon = int(instring[4:6]) - day = int(instring[6:8]) - hour = int(instring[9:11]) - min = int(instring[11:13]) - sec = int(instring[13:]) - return (year, mon, day, hour, min, sec) - if len(instring) > 8: - for d in ["_", " "]: - if d in instring and instring.find(d) > 8: - dstring = instring[0:instring.find(d)] - tstring = instring[instring.find(d)+1:] - dres = parseDate(dstring) - tres = parseDate(tstring) - return (dres[0], dres[1], dres[2], tres[3], tres[4], tres[5]) - if re.match(r"\d{4}[-./]\d{2}[-./]\d{2}", instring): - res = re.match(r"(\d{4})[-./](\d{2})[-./](\d{2})", instring) - year = int(res.group(1)) - mon = int(res.group(2)) - day = int(res.group(3)) - return (year, mon, day, hour, min, sec) - if re.match(r"\d{1,2}[-./]\d{1,2}[-./]\d{4}", instring): - res = re.match(r"(\d{1,2})[-./](\d{1,2})[-./](\d{4})", instring) - year = int(res.group(3)) - mon = int(res.group(2)) - day = int(res.group(1)) - return (year, mon, day, hour, min, sec) - if re.match(r"\w{3} \w{3}\s+\d{1,2} \d{1,2}[:]\d{1,2}[:]\d{2} \d{4}", instring.strip()): - res = re.search(r"\w{3} (\w{3})\s+(\d{1,2}) (\d{1,2})[:](\d{1,2})[:](\d{2}) (\d{4})", instring.strip()) - month = res.group(1) - mon = getMonthInt(month) - day = int(res.group(2)) - hour = int(res.group(3)) - min = int(res.group(4)) - sec = int(res.group(5)) - year = int(res.group(6)) - return (year, mon, day, hour, min, sec) - if re.match(r"\d{8}", instring): - year = instring[0:4] - mon = instring[4:6] - day = instring[6:8] - return (year, mon, day, hour, min, sec) - if re.match(r"\d{2}[-:]\d{2}[-:/]\d{2}", instring): - res = re.match(r"(\d{2})[-:/](\d{2})[-:/](\d{2})", instring) - hour = int(res.group(1)) - min = int(res.group(2)) - sec = int(res.group(3)) - return (year, mon, day, hour, min, sec) - return (year, mon, day, hour, min, sec) \ No newline at end of file diff --git a/utils/env_tool.py b/utils/env_tool.py deleted file mode 100644 index 5d1ab22..0000000 --- a/utils/env_tool.py +++ /dev/null @@ -1,39 +0,0 @@ -import utils.config_tool -import utils.file_tool -import basic.program - - -def importEnvProperty(job): - #job = basic.program.Job.getInstance() - path = utils.config_tool.getConfig(job, "tool", "env") - props = utils.file_tool.readFileDict(job, path, job.m) - job.conf.confs["env"] = props["prop"] - - -def exportEnvProperty(job): - # job = basic.program.Job.getInstance() - props = {} - if not hasattr(job, "conf"): return - if not hasattr(job.conf, "confs"): return - if not "env" in job.confconfs: return - props["prop"] = job.conf.confs["env"] - path = utils.config_tool.getConfig(job, "tool", "env") - utils.file_tool.writeFileDict(job.m, job, path, props) - - -def setEnvProp(job, props): - # job = basic.program.Job.getInstance() - path = utils.config_tool.getConfig(job, "tool", "env") - utils.file_tool.writeFileDict(job.m, job, path, props) - - -def getEnvProperty(job, propname): - # job = basic.program.Job.getInstance() - if "env" not in job.conf.confs: - importEnvProperty(job) - prop = job.conf.confs[propname] - if (prop["type"] == "succ"): - val = prop["value"] - val += 1 - prop["value"] = val - return val \ No newline at end of file diff --git a/utils/file_abstract.py b/utils/file_abstract.py deleted file mode 100644 index 31a918c..0000000 --- a/utils/file_abstract.py +++ /dev/null @@ -1,109 +0,0 @@ -#!/usr/bin/python -# -*- coding: utf-8 -*- -# --------------------------------------------------------------------------------------------------------- -# Author : Ulrich Carmesin -# Source : gitea.ucarmesin.de -# --------------------------------------------------------------------------------------------------------- -import os -import re -import basic.program -import basic.catalog -import utils.config_tool -import basic.constants as B -import basic.toolHandling -import utils.data_const as D -import utils.file_tool -import utils.path_tool -import xml.etree.ElementTree as ET -import basic.catalog - - -class FileFcts(): - """ - this is an abstract class - """ - def __init__(self): - pass - - def reset_TData(self, job): - pass - - def setComp(self, job, comp=None): - self.job = job - self.comp = comp - - def parseText(self, text): - """ - this function parses the text and translates it to dict - :param text: - :return: - """ - - def file2dict(self): - pass - - # Funktionen - #=== init_testcase === - def removeFiles(self): - utils.file_tool.removeFiles(self.comp.m, "envpath", "pattern", self.comp.conf["conn"]) - - def copyFiles(self): - fileList = [] - srcpath = "" - envpath = "" - pattern = "" - utils.file_tool.copyFiles(self.job, fileList, srcpath, envpath, pattern) - - def readEnvFiles(self, job): - envpath = "" - pattern = "" - fileList = utils.file_tool.getFiles(self.comp.m, job, envpath, pattern, self.comp.conf["conn"]) - - - # === execute_testcase === - def create_request(self, job, tdata, step): - mapping = "" - schema = "" - archivpath = "" - filename = step.args["filename"] - txt = "" - for o in self.comp.conf[B.SUBJECT_ARTS][B.TOPIC_NODE_FILE]: - if o["name"] != filename: - continue - mapping = o["mapping"] - schema = o["schema"] - archivpath = os.path.join(utils.path_tool.composePattern(job, "{tcresult}/request", self.comp), filename) # ergebnisse/comp/request ) - #txt = self.createDict() - utils.file_tool.writeFileText(self.comp.m, job, archivpath, txt) - - def send_request(self, job, step): - archivpath = "" - filename = step.args["filename"] - technique = step.args["technique"] - archivpath = os.path.join(utils.path_tool.composePattern(job, "{tcresult}/request", self.comp), filename) - if technique == "cli": - for o in self.comp.conf[B.SUBJECT_ARTS][B.TOPIC_NODE_FILE]: - if o["name"] != filename: - continue - envpath = o["envpath"] - envpath = utils.path_tool.composePattern(job, envpath, self.comp) - fct = basic.toolHandling.getCliTool(job, self.comp) - fct.copy(self.job, archivpath, envpath) - elif technique == "api": - txt = utils.file_tool.readFileText(job, archivpath, self.comp.m) - fct = basic.toolHandling.getApiTool(job, self.comp) - response = fct.send(self.job, self.comp, txt) - archivpath = os.path.join(utils.path_tool.composePattern(job, "{tcresult}/response", self.comp), filename) - -""" -get_response: -- channel (sync/async) - ... implement -- archivpath ( ergebnisse/comp/response ) -- envpath ( ./log) / envconn ( = in Request empfangen ) - -=== collect_testcase === -- envpath -- pattern -> readfiles -""" \ No newline at end of file diff --git a/utils/file_tool.py b/utils/file_tool.py deleted file mode 100644 index 046daf0..0000000 --- a/utils/file_tool.py +++ /dev/null @@ -1,287 +0,0 @@ -# Funktionen zum Dateizugriff mit Suchen, Lesen, Schreiben -# ------------------------------------------------------------ -""" - -""" -import codecs -import json -import os -import os.path -import re -import time - -import xmltodict -import yaml -import platform - -import basic.message -import basic.program -import utils.data_const as D -from pprint import pp -import utils.tdata_tool -import utils.date_tool - -def getDump(obj): - result="" - print (str(type(obj))) - result = vars(obj) - return str(result) -# if type(obj) == "__dict__" - - -def getFiles(msg, job, path, pattern, conn): - """ - search filenames in the directory - if conn is set search remote - :param msg: -- msg-Objekt - :param path: -- path - String - :param pattern: -- filename or pattern - :param conn: - :return: Array with filenames - """ - if conn is not None: - return getRemoteFiles(msg, path, pattern, conn) - # job = basic.program.Job.getInstance() - verify = int(job.getDebugLevel("file_tool")) - out = [] - msg.debug(verify, "getFiles " + path + " , " + pattern) - if not os.path.exists(path): - return out - for f in os.listdir(path): - msg.debug(verify, "getFiles " + f) - if re.search(pattern, f): - msg.debug(verify, "match " + f) - out.append(f) - return out - - -def removeFiles(msg, path, pattern, conn): - """ - search filenames in the directory and removes it - - if conn is set search remote - :param msg: -- msg-Objekt - :param path: -- path - String - :param pattern: -- filename as Pattern - :param conn: - :return: Array filenames - """ - pass - - -def copyFiles(job, fileList, source, target, comp): - """ - copies files from source to target - :param job: - :param fileList: - :param source: - :param target: - :param comp: - :return: - """ - pass - - -def getRemoteFiles(msg, path, pattern, conn): - """ - search filenames in the directory - if conn is set search remote - :param msg: -- msg-Objekt - :param path: -- path - String - :param pattern: -- filename as Pattern - :param conn: - :return: Array filenames - """ - pass - - -def getFilesRec(msg, job, path, pattern): - """ - Sucht Dateien im Verzeichnis rekursiv - :param msg: -- msg-Objekt - :param path: -- Pfad - String - :param pattern: -- Dateiname als Pattern - :return: Array mit gefundenen Dateien, absoluter Pfad - """ - #job = basic.program.Job.getInstance() - verify = int(job.getDebugLevel("file_tool")) - out = [] - msg.debug(verify, "getFilesRec " + path + " , " + pattern) - for (r, dirs, files) in os.walk(path): - for f in files: - msg.debug(verify, "getFilesRec " + f) - if re.search(pattern, f): - msg.debug(verify, "match " + f) - out.append(os.path.join(r, f)) - return out - - -def getTree(msg, job, pfad): - # job = basic.program.Job.getInstance() - verify = int(job.getDebugLevel("file_tool")) - msg.debug(verify, "getTree " + pfad ) - tree = {} - files = [] - for f in os.listdir(pfad): - if os.path.isDir(os.path.join(pfad, f)): - tree[f] = getTree(msg, job, os.path.join(pfad, f)) - elif os.path.isFile(os.path.join(pfad, f)): - files.append(f) - tree["_files_"] = files - return tree - - -def mkPaths(job, path, msg): - # job = basic.program.Job.getInstance() - verify = int(job.getDebugLevel("file_tool")) - #modus = job.conf.confs["paths"]["mode"] - dirname = os.path.dirname(path) - if os.path.exists(dirname): - return - os.makedirs(dirname, exist_ok=True) - - -def getFileEncoding(msg, job, path): - print("--- getFileEncoding "+path) - encodings = ['utf-8', 'iso-8859-1'] # add more - for e in encodings: - print(e) - try: - fh = codecs.open(path, 'r', encoding=e) - fh.readlines() - fh.seek(0) - except UnicodeDecodeError: - print('got unicode error with %s , trying different encoding' % e) - except: - print("except") - else: - print('opening the file with encoding: %s ' % e) - return e - return detectFileEncode(job, path, msg) - - -def detectFileEncode(job, path, msg): # return "" - #job = basic.program.Job.getInstance() - verify = int(job.getDebugLevel("file_tool")) - print(path) - cntIso = 0 - cntUtf = 0 - j = 0 - CHAR_ISO = [ 196, 228, 214, 246, 220, 252, 191 ] - with open(path, 'rb') as file: - byte = file.read(1) - while (byte): - i = int.from_bytes(byte, "little") - #byte = file.read(1) - if (i in CHAR_ISO): - cntIso += 1 - if (i == 160): - pass - elif (i > 127): - cntUtf += 1 - j += 1 - l = i - byte = file.read(1) - file.close() - if (cntIso > cntUtf): - return 'iso-8859-1' - return 'utf-8' - - -def readFileLines(job, path, msg): - lines = readFileText(job, path, msg) - if isinstance(lines, (str)): - return lines.splitlines() - return [] - - -def readFileText(job, path, msg): - #job = basic.program.Job.getInstance() - verify = int(job.getDebugLevel("file_tool")) - if not os.path.exists(path): - return "" - enc = detectFileEncode(job, path, msg) - with open(path, 'r', encoding=enc) as file: - text = file.read() - file.close() - return text - -def getModTime(job, filepath): - out = "" - mtime = os.path.getmtime(filepath) - out = utils.date_tool.formatParsedDate(time.ctime(mtime), utils.date_tool.F_TIME_DEFAULT) - return out - -def readFileDict(job, path, msg): - """ - reads and gets general a dict from any kind of filetyp - :param path: with extension of filetype - :param msg: optionally - :return: - """ - # 20220329 generalize - #job = basic.program.Job.getInstance() - verify = int(job.getDebugLevel("file_tool")) - doc = {} - if not os.path.exists(path): - return doc - enc = detectFileEncode(job, path, msg) - if D.DFILE_TYPE_YML in path[-4:]: - with open(path, 'r', encoding=enc) as file: - doc = yaml.full_load(file) - file.close() - elif D.DFILE_TYPE_JSON in path[-5:]: - with open(path, 'r', encoding=enc) as file: - doc = json.load(file) - file.close() - elif D.DFILE_TYPE_XML in path[-4:]: - with open(path, 'r', encoding=enc) as file: - res = xmltodict.parse(file.read()) - # doc = dict(res) - doc = castOrderedDict(res) - file.close() - elif D.DFILE_TYPE_CSV in path[-5:]: - doc = utils.tdata_tool.getCsvSpec(msg, job, path, D.CSV_SPECTYPE_CONF) - return doc - -def castOrderedDict(res, job=None, key=""): - if isinstance(res, dict): - doc = dict(res) - for x in doc: - doc[x] = castOrderedDict(doc[x], job, x) - elif isinstance(res, list): - sublist = [] - for i in range(0, len(res)): - sublist.append(castOrderedDict(res[i], job, "")) - doc = sublist - else: - doc = res - return doc - - -def writeFileText(msg, job, path, text, enc="utf-8"): - #job = basic.program.Job.getInstance() - verify = int(job.getDebugLevel("file_tool")) - mkPaths(job, path, msg) - with open(path, 'w', encoding=enc) as file: - file.write(text) - file.close() - - -def writeFileDict(msg, job, path, dict, enc="utf-8"): - #job = basic.program.Job.getInstance() - mkPaths(job, path, msg) - if D.DFILE_TYPE_YML in path[-5:]: - with open(path, 'w', encoding=enc) as file: - yaml.dump(dict, file) - file.close() - elif D.DFILE_TYPE_JSON in path[-5:]: - with open(path, 'w', encoding=enc) as file: - doc = json.dumps(dict, indent=4) - file.write(doc) - file.close() - elif D.DFILE_TYPE_XML in path[-4:]: - with open(path, 'w', encoding=enc) as file: - text = xmltodict.unparse(dict, pretty=True) - if "\n" + text - file.write(text) - file.close() - diff --git a/utils/filejson_tool.py b/utils/filejson_tool.py deleted file mode 100644 index bc66ceb..0000000 --- a/utils/filejson_tool.py +++ /dev/null @@ -1,20 +0,0 @@ -#!/usr/bin/python -# -*- coding: utf-8 -*- -# --------------------------------------------------------------------------------------------------------- -# Author : Ulrich Carmesin -# Source : gitea.ucarmesin.de -# --------------------------------------------------------------------------------------------------------- -import basic.program -import utils.config_tool -import utils.file_abstract -import basic.constants as B -import utils.path_tool -import utils.file_tool -import utils.tdata_tool - - -class FileFcts(utils.file_abstract.FileFcts): - - def __init__(self): - pass - diff --git a/utils/filelog_tool.py b/utils/filelog_tool.py deleted file mode 100644 index 0642ccc..0000000 --- a/utils/filelog_tool.py +++ /dev/null @@ -1,19 +0,0 @@ -#!/usr/bin/python -# -*- coding: utf-8 -*- -# --------------------------------------------------------------------------------------------------------- -# Author : Ulrich Carmesin -# Source : gitea.ucarmesin.de -# --------------------------------------------------------------------------------------------------------- -import basic.program -import utils.config_tool -import utils.file_abstract -import basic.constants as B -import utils.path_tool -import utils.file_tool -import utils.tdata_tool - - -class FileFcts(utils.file_abstract.FileFcts): - - def __init__(self): - pass diff --git a/utils/filexml_tool.py b/utils/filexml_tool.py deleted file mode 100644 index 062a6e5..0000000 --- a/utils/filexml_tool.py +++ /dev/null @@ -1,24 +0,0 @@ -#!/usr/bin/python -# -*- coding: utf-8 -*- -# --------------------------------------------------------------------------------------------------------- -# Author : Ulrich Carmesin -# Source : gitea.ucarmesin.de -# --------------------------------------------------------------------------------------------------------- -import xmltodict -import basic.program -import utils.config_tool -import utils.file_abstract -import basic.constants as B -import utils.path_tool -import utils.file_tool -import utils.tdata_tool - - -class FileFcts(utils.file_abstract.FileFcts): - - def __init__(self): - pass - - - def parseFile2Dict(self): - pass \ No newline at end of file diff --git a/utils/flask_tool.py b/utils/flask_tool.py deleted file mode 100644 index c23cedc..0000000 --- a/utils/flask_tool.py +++ /dev/null @@ -1,5 +0,0 @@ -#!/usr/bin/python -# --------------------------------------------------------------------------------------------------------- -# Author : Ulrich Carmesin -# Source : gitea.ucarmesin.de -# --------------------------------------------------------------------------------------------------------- diff --git a/utils/gen_const.py b/utils/gen_const.py deleted file mode 100644 index d507589..0000000 --- a/utils/gen_const.py +++ /dev/null @@ -1,23 +0,0 @@ -#!/usr/bin/python -# --------------------------------------------------------------------------------------------------------- -# Author : Ulrich Carmesin -# Source : gitea.ucarmesin.de -# https://ucarmesin.de/index.php/it/testautomatisierung-fuer-daten-test/232-testfallgenerierung -# --------------------------------------------------------------------------------------------------------- - -KEY_RANDOM = "random" -KEY_LIST = "list" -KEY_VARNUM = "varnum" -KEY_VARSTR = "varstr" -KEY_VARDAT = "vardat" -KEY_PREFIX_X = "x" -VAL_DELIMITER = "," -VAL_SECTOR = " .. " -VAL_CATALOG = "cat" - -CLS_MISFORMAT = "missformat" -CLS_NONE = "none" -CLS_EMPTY = "empty" -CLS_LESS = "less" -CLS_GREATER = "more" -ATTR_MIN_COUNT = "mincount" \ No newline at end of file diff --git a/utils/gen_tool.py b/utils/gen_tool.py deleted file mode 100644 index 330ec8d..0000000 --- a/utils/gen_tool.py +++ /dev/null @@ -1,127 +0,0 @@ -#!/usr/bin/python -# --------------------------------------------------------------------------------------------------------- -# Author : Ulrich Carmesin -# Source : gitea.ucarmesin.de -# https://ucarmesin.de/index.php/it/testautomatisierung-fuer-daten-test/232-testfallgenerierung -# --------------------------------------------------------------------------------------------------------- -import re -import utils.config_tool -import utils.path_const as P -import basic.constants as B -import basic.program -import utils.gen_const as G -import random -import basic.catalog - -VAL_CLASSES: { - "xvarnum": { - G.CLS_MISFORMAT: "str", - G.CLS_NONE: None, - G.CLS_EMPTY: 0, - G.CLS_LESS: True, - G.CLS_GREATER: True, - G.ATTR_MIN_COUNT: 5 - }, - "xvarnum": { - G.CLS_MISFORMAT: "str,feb", - G.CLS_NONE: None, - G.CLS_EMPTY: 0, - G.CLS_LESS: True, - G.CLS_GREATER: True, - G.ATTR_MIN_COUNT: 6 - }, - "xvarstr": { - G.CLS_MISFORMAT: "num,sym,koeln", - G.CLS_NONE: None, - G.CLS_EMPTY: 0, - G.CLS_LESS: False, - G.CLS_GREATER: False, - G.ATTR_MIN_COUNT: 7 - } -} - - -def getCntElement(values, job): - if G.VAL_SECTOR in values: - return 2 - elif G.VAL_DELIMITER in values: - a = values.split(G.VAL_DELIMITER) - return len(a) - elif G.VAL_CATALOG + ":" in values: - a = [0, 1, 2, 3, 4] - return len(a) - return 1 - - -def getValueList(values, count, job): - out = [] - for i in range(0, count): - out.append(values[i % len(values)]) - print(str(out)) - return out - - -def getMinCount(formula, values, job): - """ - :param formula: - :param values: definition of value-list - :param job: - :return: count of necessary values - """ - if G.KEY_RANDOM in formula: - return 1 - elif formula[0:1] == G.KEY_PREFIX_X: - elems = getCntElement(values, job) - factor = 1 - if VAL_CLASSES[formula][G.CLS_LESS]: - factor = factor * 2 - if VAL_CLASSES[formula][G.CLS_GREATER]: - factor = factor * 2 - return VAL_CLASSES[formula][G.ATTR_MIN_COUNT] + factor * (elems - 1) - elif formula == G.KEY_LIST: - return getCntElement(values, job) - return 1 - - -def getElemList(formula, values, count, job): - """ - - :param formula: - :param values: - :param count: - :param job: - :return: - """ - out = [] - verbose = False - if verbose: print(values+" , "+str(count)) - sector_regex = r"(.*)" + re.escape(G.VAL_SECTOR)+ r"(.*)" - delim_regex = r"(.*)" + re.escape(G.VAL_DELIMITER)+ r"(.*)" - catalog_regex = re.escape(G.VAL_CATALOG)+ r":(.*)" - if re.match(sector_regex, values): - if verbose: print("match 1") - temp = [] - res = re.search(sector_regex, values) - start = res.group(1) - target = res.group(2) - if start.isdecimal() and target.isdecimal(): - for i in range(int(start), int(target)): - temp.append(str(i)) - if target not in temp: - temp.append(target) - if verbose: print(str(start)+" - "+str(target)+" : "+str(temp)) - elif re.match(delim_regex, values): - if verbose: print("match 2") - temp = values.split(G.VAL_DELIMITER) - for i in range(0, len(temp)): temp[i] = temp[i].strip() - if verbose: print(str(temp)) - elif re.match(catalog_regex, values): - res = re.search(catalog_regex, values) - domain = res.group(1) - catalog = basic.catalog.Catalog.getInstance() - temp = catalog.getKeys(domain, job) - if not isinstance(temp, list): - temp = [] - while len(temp) > 0 and len(out) < count: - out += temp - return out[0:count] diff --git a/utils/i18n_tool.py b/utils/i18n_tool.py deleted file mode 100644 index ec4abd3..0000000 --- a/utils/i18n_tool.py +++ /dev/null @@ -1,96 +0,0 @@ -#!/usr/bin/python -# -*- coding: utf-8 -*- -# --------------------------------------------------------------------------------------------------------- -# Author : Ulrich Carmesin -# Source : gitea.ucarmesin.de -# --------------------------------------------------------------------------------------------------------- -import utils.config_tool -import utils.path_const as P -import basic.constants as B -import basic.program - -DEFAULT_LANGUAGE = "en" -EXP_KEY_MISSING = "key is missing {}" -EXP_KEY_DOESNT_EXIST = "key doesnt exist in domain {}" - -class I18n: - __instance = None - cache = {} - """ - in this class there should be managed each defined key-value-pairs - the pairs ara loaded from the path testdata/catalog: - * initially the csv-file catalog.csv - * on demand other csv-files in the path - """ - - def __init__(self, job): - self.cache = {} - self.cache = utils.config_tool.getConfig(job, P.KEY_TOOL, "i18n") - I18n.__instance = self - pass - - - @staticmethod - def getInstance(job): - if I18n.__instance == None: - return I18n(job) - return I18n.__instance - - - def getMessage(self, job, key, args=[]): - print("getMessage "+key+" "+str(args)) - out = self.getText(key, job) - out = out.format(args) - return out - - def getText(self, key, job): - """ - this function gets the text depending on language which is set in job.conf - :param key: MUST GIVEN WITH (f"{CONST=}", .. - :return: - return self.cache[language][key] - """ - if "language" in job.conf.confs: - language = job.conf.confs["language"] - else: - language = "en" - if language not in self.cache: - raise Exception(EXP_KEY_MISSING, (key)) - if "=" in key: - out = self.extractText(key) - key = self.extractKey(key) - if key in self.cache[language]: - out = self.cache[language][key] - elif key in self.cache[DEFAULT_LANGUAGE]: - out = self.cache[DEFAULT_LANGUAGE][key] - return str(out) - - def getAliasList(self, key, job): - out = [] - out.append(self.extractText(key)) - key = self.extractKey(key) - for language in self.cache: - if key not in self.cache[language]: - continue - out.append(self.cache[language][key]) - return out - - def extractKey(self, key): - if "=" in key: - i = key.find("=") - x = key[0:i] - if "." in x: - i = x.find(".") - y = x[i + 1:] - else: - y = x - key = y - else: - y = key - return key - - def extractText(self, key): - if "=" in key: - i = key.find("=") - return key[i + 2:-1] - return "" \ No newline at end of file diff --git a/utils/job_tool.py b/utils/job_tool.py deleted file mode 100644 index 8265235..0000000 --- a/utils/job_tool.py +++ /dev/null @@ -1,97 +0,0 @@ -# GrundFunktionen zur Ablaufsteuerung -# -# -------------------------------------------------------- -""" -1. Programm -- implementiert in Main-Klasse -2. Anwndung -- steuert zu pruefende System [ in basis_Config ] -3. application -- steuert zu pruefende Maschine [ in dir/applicationen ] -4. release -- steuert zu prufendes Release [ aus dir/release kann spez. release_Config geladen werden, dir/lauf/release ] -5. ~Verz -- Dokumentationsverzeichnis zu Testlauf/Testfall/Soll-Branch -6. zyklus -- optional unterscheidet echte und entwicklungsLaeufe -7. Programmspezifische Parameter -8. loglevel -- steuert Protokollierung; default debug (fatal/error/warn/msg/info/debug1/debug2/trace1/trace2) -10. Laufart -- steuert die Verarbeitung; default echt -- echt-auto Lauf aus Automatisierung (1-7) -- test Lauf ohne Ausfuehrungen am Testsystem, wohl aber in Testverzeichnissen -- echt-spez Wiederholung einer spezifischen Funktion (1-13) -- unit Ausfuehrung der Unittests -11. Modul -- schraenkt Verarbeitung auf parametriserte componenten ein -12. Funktion -- schraenkt Verarbeitung auf parametriserte Funktionen ein -13. Tool -- schraenkt Protokollierung/Verarbeitung auf parametriserte Tools ein -""" -import basic.program -import basic.constants as B -import collect_testcase -import compare_testcase -import execute_testcase -import finish_testsuite -import init_testcase -import init_testsuite -import test_executer -import utils.path_tool -import utils.file_tool -import components.utils.job_tool -def hasModul(komp): - #job = Job.getInstance() - return False -def hasFunction(fct): - #job = Job.getInstance() - return False -def hasTool(tool): - #job = Job.getInstance() - return False - -def createJob(parentJob, jobargs): - job = basic.program.Job("temp") # meaning temp - job.par.setParameterArgs(job, jobargs) - job.startJob() - return - -def startJobProcesses(job): - """ function to open processes like db-connection """ - components.utils.job_tool.startJobProcesses(job) - pass - -def stopJobProcesses(job): - """ function to close processes like db-connection """ - components.utils.job_tool.stopJobProcesses(job) - pass - -def startProcess(job, process): - print(str(process)) - path = utils.path_tool.getActualJsonPath(job) - print("------- "+path) - utils.file_tool.writeFileDict(job.m, job, path, process) - jobargs = {} - jobargs[B.PAR_APP] = process["app"] - jobargs[B.PAR_ENV] = process["env"] - if B.PAR_STEP in process: - jobargs[B.PAR_STEP] = process[B.PAR_STEP] - if B.PAR_TCDIR in process: - jobargs[B.PAR_TCDIR] = process[B.PAR_TCDIR] - jobargs[B.PAR_TESTCASE] = process["entity"] - elif B.PAR_TSDIR in process: - jobargs[B.PAR_TSDIR] = process[B.PAR_TSDIR] - jobargs[B.PAR_TESTSUITE] = process["entity"] - print("process-programm "+process["program"]) - myjob = basic.program.Job(process["program"], jobargs) - myjob.startJob() - if process["program"] == "init_testcase": - init_testcase.startPyJob(myjob) - elif process["program"] == "execute_testcase": - execute_testcase.startPyJob(myjob) - elif process["program"] == "collect_testcase": - collect_testcase.startPyJob(myjob) - elif process["program"] == "compare_testcase": - compare_testcase.startPyJob(myjob) - elif process["program"] == "init_testsuite": - init_testsuite.startPyJob(myjob) - elif process["program"] == "execute_testsuite": - print("execute_testsuite.startPyJob(myjob) not implemented") - elif process["program"] == "collect_testsuite": - print("collect_testsuite.startPyJob(myjob) not implemented") - elif process["program"] == "finish_testsuite": - finish_testsuite.startPyJob(myjob) - elif process["program"] == "test_executer": - test_executer.startPyJob(myjob) - diff --git a/utils/map_tool.py b/utils/map_tool.py deleted file mode 100644 index 81dc73c..0000000 --- a/utils/map_tool.py +++ /dev/null @@ -1,375 +0,0 @@ -#!/usr/bin/python -# -*- coding: utf-8 -*- -# --------------------------------------------------------------------------------------------------------- -# Author : Ulrich Carmesin -# Source : gitea.ucarmesin.de -# --------------------------------------------------------------------------------------------------------- -import os -import re -import basic.program -import basic.catalog -import utils.config_tool -import basic.constants as B -import basic.toolHandling -import utils.data_const as D -import utils.file_tool -import utils.path_tool -import basic.catalog - -ACT_ID = "actid" -ID_LIST = "idlist" -MAP_ID = "_id" -MAP_FOR = "_foreach" -MAP_ROW = "_row" -MAP_FCTS = [ MAP_FOR, MAP_ID, MAP_ROW ] -MODUL_NAME = "map_tool" - - -def mapTdata(job, mapping, tdata, step, comp): - """ - initialize the mapping from testdata into the mapping-structure - :param job: - :param mapping: - :param tdata: - :param step: - :param comp: - :return: - """ - verify = job.getDebugLevel(MODUL_NAME) - out = {} - path = "" - job.debug(verify, mapping) - args = {} - args[B.DATA_NODE_COMP] = comp - args[B.DATA_NODE_DATA] = tdata - args[B.DATA_NODE_STEPS] = step - args[ACT_ID] = {} - args[ID_LIST] = {} - out = mapElement(job, args, mapping, path, out) - job.debug(verify, ">>>>>>>>>>> \n"+str(out)) - return out - - -def mapElement(job, args, elem, path, out): - """ - recursive mapping with building the dict of the testdata - :param job: - :param args: - :param elem: - :param path: - :param out: - :return: - """ - verify = job.getDebugLevel(MODUL_NAME) - job.debug(verify, "mapElem "+path+" id: "+str(args[ACT_ID])) - if isinstance(elem, dict): - job.debug(verify, "##### dict ") - nodes = [] - attrNodes = [] - leafNodes = [] - objNodes = [] - for k in elem: - if MAP_ID in elem[k] or MAP_FOR in elem[k]: - objNodes.append(k) - elif k[0:1] == '@' or k[0:1] == '#': - attrNodes.append(k) - else: - leafNodes.append(k) - job.debug(verify, "nodes "+str(attrNodes)+" - "+str(leafNodes)+" - "+str(objNodes)) - nodes = attrNodes + leafNodes + objNodes - for k in nodes: - # iterating this elem is declared inside of the elem - # like foreach but only one element - job.debug(verify, "# # "+k) - if MAP_ID in elem[k] or MAP_FOR in elem[k]: - job.debug(verify, "# + k in obj : val "+k) - if MAP_ID in elem[k]: - key = elem[k][MAP_ID][0:elem[k][MAP_ID].find("=")] - idlist = getIds(job, args, elem[k][MAP_ID]) - if len(idlist) > 1: - uniqueKeys = {} - for x in idlist: - uniqueKeys[x] = x - if len(uniqueKeys) > 1: - raise Exception("bei keyword _id there is only one element allowed "+str(idlist)) - else: - idlist = uniqueKeys.keys() - elif MAP_FOR in elem[k]: - key = elem[k][MAP_FOR][0:elem[k][MAP_FOR].find("=")] - idlist = getIds(job, args, elem[k][MAP_FOR]) - sublist = [] - a = path.split(",") - a.append(k) - npath = ",".join(a) - for id in idlist: - args[ACT_ID][key] = str(id) - if MAP_ROW in elem[k]: - row = getRow(job, args, elem[k][MAP_ROW]) - sublist.append(mapElement(job, args, elem[k], npath, {})) - out[k] = sublist - elif k == MAP_ID or k == MAP_FOR or k == MAP_ROW: - job.debug(verify, "# + k in MAP : continue "+k) - continue - else: - job.debug(verify, "# + k in leaf : val "+k) - a = path.split(",") - a.append(k) - npath = ",".join(a) - job.debug(verify, "mapElem - dict "+k) - out[k] = mapElement(job, args, elem[k], npath, {}) - elif isinstance(elem, list): - out = [] - i = 0 - for k in elem: - job.debug(verify, "mapElem - list "+str(k)) - a = path.split(",") - a.append(str(i)) - npath = ",".join(a) - out.append(mapElement(job, args, elem[i], path, {})) - i += 1 - else: - job.debug(verify, "mapElem - leaf " + elem) - - if elem[0:1] == "{" and elem[-1:] == "}": - elem = elem[1:-1] - out = toSimpleType(job, getValue(job, args, elem)) - return out - - -def toSimpleType(job, value): - if isinstance(value, (list, tuple)) and len(value) == 1: - return value[0] - return value - -def extractIds(job, idval): - ids = [] - if isinstance(idval, list) or isinstance(idval, tuple): - a = idval - else: - a = idval.split(",") - for k in a: - if "-" in k: - b = k.split("-") - for i in range(int(b[0].strip()), int(b[1].strip())+1): - ids.append(str(i).strip()) - elif isinstance(k, str): - ids.append(k.strip()) - else: - ids.append(k) - return ids - - -def getRow(job, args, fieldkey): - a = fieldkey.split(".") - row = getValue(job, args, fieldkey) - if B.DATA_NODE_ROW not in args: args[B.DATA_NODE_ROW] = {} - a[1] = a[1][0:a[1].find("(")] - args[B.DATA_NODE_ROW][a[1]] = row[0] - return row - -def getIds(job, args, fieldkey): - """ - sets the id resp list of ids into args[idlist] - the fieldkey has a formula like id={_source.table.field(condition)} - :param job: - :param args: - :param fieldky: - :return: - """ - verify = job.getDebugLevel(MODUL_NAME) - job.debug(verify, "match = "+fieldkey) - out = [] - idfield = fieldkey - if re.match(r"(.+)\=(.+)", fieldkey): - res = re.search(r"(.+)\=(.+)", fieldkey) - idfield = res.group(1) - fieldkey = res.group(2) - if fieldkey[0:1] == "{" and fieldkey[-1:] == "}": - fieldkey = fieldkey[1:-1] - if "temp" not in args: args["temp"] = {} - i = 0 - while "(" in fieldkey and ")" in fieldkey: - innerCond = fieldkey[fieldkey.rfind("(")+1:fieldkey.find(")")] - if "." not in innerCond: - break - innerkey = "temp_"+str(i) - args["temp"][innerkey] = {} - args["temp"][innerkey]["idfield"] = idfield - args["temp"][innerkey]["fieldkey"] = fieldkey - innerlist = getIds(job, args, innerkey+"={"+innerCond+"}") - args[ACT_ID][innerkey] = ",".join(innerlist) - fieldkey = fieldkey.replace(innerCond, innerkey) - idfield = args["temp"][innerkey]["idfield"] - i += 1 - if i > 3: - raise Exception("too much searches "+str(args["temp"])) - val = getValue(job, args, fieldkey) - idlist = extractIds(job, val) - args[ID_LIST][idfield] = idlist - job.debug(verify, "idlist " + str(args[ID_LIST])) - return idlist - - -def getConditions(job, args, fieldkey): - """ - gets a list of condition-value - :param job: - :param args: - :param fieldkey: in formula (c_1, c_2, ..) - :return: [v_1} ..] - """ - verify = job.getDebugLevel(MODUL_NAME) - while fieldkey[0:1] == "(" and fieldkey[-1:] == ")": - fieldkey = fieldkey[1:-1] - while fieldkey[0:1] == "{" and fieldkey[-1:] == "}": - fieldkey = fieldkey[1:-1] - out = [] - a = fieldkey.split(",") - if len(a) > 1: - job.m.logError("map-condition should not have more parts - use tupel "+fieldkey) - for k in a: - if "." in k: - raise Exception("Formatfehler in " + fieldkey) - job.debug(verify, "actid " + str(args[ACT_ID])) - idelem = {} - idelem[k] = args[ACT_ID][k] - out.append(args[ACT_ID][k]) - return out - - -def getValue(job, args, fieldkey): - """ - gets the value of the formula like {_source.table.field(condition)} - :param job: - :param args: - :param fieldkey: - :return: - """ - verify = job.getDebugLevel(MODUL_NAME) - job.debug(verify, "getValue "+fieldkey) - while fieldkey[0:1] == "{" and fieldkey[-1:] == "}": - fieldkey = fieldkey[1:-1] - val = "" - idfield = "" - source = "" - table = "" - field = "" - cond = "" - condIds = [] - # special cases of id-management - # args[actid][xid] = actual id with name xid - # args[idlist][xid] = list of all ids with name xid - # a) declaration of the id : id={fielddefinition} - if re.match(r".+\=.+", fieldkey): - job.debug(verify, "getValue 222 " + fieldkey) - raise Exception("getIds sollte an passender Stelle direkt aufgerufen werden "+fieldkey) - return getIds(job, args, fieldkey) - # b) set of defined ids - neither fielddefinition nor a function - #elif "." not in fieldkey and "(" not in fieldkey or re.match(r"\(.+\)", fieldkey): - #print("getValue 226 " + fieldkey) - #raise Exception("getConditions sollte an passender Stelle direkt aufgerufen werden") - #return getConditions(job, args, fieldkey) - # fielddefinition with .-separated parts - b = fieldkey.split(".") - job.debug(verify, "match field "+fieldkey) - if re.match(r"(_.+)\..+\..+\(.+\)", fieldkey): - res = re.match(r"(_.+)\.(.+)\.(.+\(.+\))", fieldkey) - job.debug(verify, "match mit ()") - source = res.group(1) - table = res.group(2) - field = res.group(3) - #cond = res.group(4) - #condIds = getValue(job, args, cond) - elif len(b) == 1: - field = b[0] - elif len(b) == 2: - source = b[0] - field = b[1] - elif len(b) == 3: - source = b[0] - table = b[1] - field = b[2] - if re.match(r".+\(.+\)", field): - res = re.match(r"(.+)\((.+)\)", field) - field = res.group(1) - cond = res.group(2) - condIds = getConditions(job, args, cond) - job.debug(verify, source + " - " + table + " - " + field + " - " + cond + " : " + str(condIds)) - if source == B.DATA_NODE_ROW: - if table not in args[B.DATA_NODE_ROW]: - raise Exception("row not initialiazed for table "+table+" "+str(args[B.DATA_NODE_ROW])) - row = args[B.DATA_NODE_ROW][table] - val = row[field] - elif source == B.DATA_NODE_DATA: - job.debug(verify, "data " + b[1]) - if len(b) == 3: - job.debug(verify, table + ", " + field + ", " + cond + ", " + str(condIds)) - val = toSimpleType(job, getFieldVal(job, args, table, field, condIds)) - elif len(b) == 2: - job.debug(verify, table + ", " + field + ", " + cond + ", " + str(condIds)) - val = getTdataRow(job, args[B.DATA_NODE_DATA], field, condIds) - elif source == B.DATA_NODE_STEPS: - job.debug(verify, "steps " + table+" - "+ field) - if hasattr(args[B.DATA_NODE_STEPS], field): - val = getattr(args[B.DATA_NODE_STEPS], field) - elif hasattr(args[B.DATA_NODE_STEPS], table): - row = getattr(args[B.DATA_NODE_STEPS], table) - if field in row: - val = row[field] - elif source[1:] == B.DATA_NODE_PAR: - job.debug(verify, "par " + b[1]) - if getattr(job.par, b[1]): - val = getattr(job.par, b[1]) - elif source == B.DATA_NODE_CATALOG: - job.debug(verify, "catalog " + table+", "+ field) - if len(b) != 3: - job.debug(verify, "catalog-Fehler") - return "Fehler-145" - row = basic.catalog.Catalog.getInstance().getValue(table, args[ACT_ID][cond], job) - if isinstance(row, dict) and field in row: - val = row[field] - elif source[1:] == B.DATA_NODE_COMP: - job.debug(verify, "comp "+table+", "+field) - comp = args[B.DATA_NODE_COMP] - if table == B.DATA_NODE_DDL: - fields = comp.conf[B.DATA_NODE_DDL][field][B.DATA_NODE_HEADER] - val = ",".join(fields) - else: - val = fieldkey - job.debug(verify, "return val "+str(val)) - return val - -def getFieldVal(job, args, table, field, condIds): - out = [] - fields = field.split(",") - for row in getTdataRow(job, args[B.DATA_NODE_DATA], table, condIds): - if len(fields) == 1 and field in row: - out.append( str(row[field]).strip()) - else: - tup = tuple() - for f in fields: - if f in row: - t = tuple( str(row[f]).strip() ) - tup = tup + t - else: - raise Exception("field is missing in row "+f+", "+str(row)) - out.append(tup) - return out - -def getTdataRow(job, tdata, table, condIds): - verify = job.getDebugLevel(MODUL_NAME) - out = [] - idFields = {} - job.debug(verify, "getTdata "+str(condIds)) - for i in range(0, len(condIds)): - idFields[tdata[B.DATA_NODE_TABLES][table][B.DATA_NODE_HEADER][i]] = condIds[i] - job.debug(verify, "idFields "+str(idFields)) - for row in tdata[B.DATA_NODE_TABLES][table][B.DATA_NODE_DATA]: - i = 0 - for k in idFields: - for j in range(0, len(idFields[k])): - if row[k] == idFields[k][j]: - i += 1 - if i == len(idFields): - out.append(row) - return out diff --git a/utils/match_const.py b/utils/match_const.py deleted file mode 100644 index 075477d..0000000 --- a/utils/match_const.py +++ /dev/null @@ -1,118 +0,0 @@ -#!/usr/bin/python -""" -constants for used for api-functions -""" - -SIM_BUSINESS = "B" -SIM_TECHNICAL = "T" -SIM_DEFAULT = "BT" - -M_FILEPATTERN = "filepattern" -M_SIDE_LONG = "long" -M_SIDE_SHORT = "short" -M_SIDE_A = "A" -M_SIDE_B = "B" - -MATCH_SIDE_PREEXPECT = "preexpect" -""" it implies the precondition of the expectation """ -MATCH_DICT_PREEXPECT = { - M_SIDE_SHORT: "SV", - M_SIDE_LONG: "Soll-Vorher", - M_FILEPATTERN: "rsprecond" -} -MATCH_SIDE_POSTEXPECT = "postexpect" -""" it implies the postcondition of the expectation - it is the expectation""" -MATCH_DICT_POSTEXPECT = { - M_SIDE_SHORT: "SN", - M_SIDE_LONG: "Soll-Nachher", - M_FILEPATTERN: "rsprecond" -} -MATCH_SIDE_PREACTUAL = "preactual" -""" it implies the precondition of the actual execution """ -MATCH_DICT_PREACTUAL = { - M_SIDE_SHORT: "IV", - M_SIDE_LONG: "Ist-Vorher", - M_FILEPATTERN: "rsprecond" -} -MATCH_SIDE_POSTACTUAL = "postactual" -""" it implies the postondition of the actual execution - it is the result """ -MATCH_DICT_POSTACTUAL = { - M_SIDE_SHORT: "IN", - M_SIDE_LONG: "Ist-Nachher", - M_FILEPATTERN: "rsprecond" -} -MATCH_SIDE_PRESTEP = "prestep" -""" it implies the postcondition of a preceding step of the actual execution - the preceding step must be configured in the component""" -MATCH_DICT_PRESTEP = { - M_SIDE_SHORT: "VS", - M_SIDE_LONG: "Vorhergehender Schritt (Nachher)", - M_FILEPATTERN: "rsprecond" -} -MATCH_SIDE_TESTCASE = "testexample" -""" it implies the postcondition of an exemplary testcase - the exemplary testcase must be parametrized """ -MATCH_DICT_TESTCASE = { - M_SIDE_SHORT: "VT", - M_SIDE_LONG: "Vergleichstestfall (Nachher)", - M_FILEPATTERN: "rsprecond" -} -MATCH_SIDES = [MATCH_SIDE_PREEXPECT, MATCH_SIDE_POSTEXPECT, MATCH_SIDE_PREACTUAL, MATCH_SIDE_POSTACTUAL, MATCH_SIDE_PRESTEP, MATCH_SIDE_TESTCASE] - -MATCH_SUCCESS = "success" -""" matches the action between pre- and postcondition of the actual testexecution """ -MATCH_PRECOND = "preconditions" -""" matches the preconditions betwenn the required result the the actual testexecution - - just for info if the both executions have the same precondition """ -MATCH_POSTCOND = "postconditions" -""" matches the postconditions betwenn the required result the the actual testexecution - - it is the main comparison """ -MATCH_PRESTEP = "prestep" -MATCH_TESTEXAMPLE = "testeample" -MATCH_TYPES = [MATCH_PRECOND, MATCH_PRESTEP, MATCH_TESTEXAMPLE, MATCH_SUCCESS, MATCH_POSTCOND] -MATCH = { - MATCH_SIDE_PREEXPECT: MATCH_DICT_PREEXPECT, - MATCH_SIDE_POSTEXPECT: MATCH_DICT_POSTEXPECT, - MATCH_SIDE_PREACTUAL: MATCH_DICT_PREACTUAL, - MATCH_SIDE_POSTACTUAL: MATCH_DICT_POSTACTUAL, - MATCH_SIDE_PRESTEP: MATCH_DICT_PRESTEP, - MATCH_SIDE_TESTCASE: MATCH_DICT_TESTCASE, - MATCH_PRECOND: { - "A": MATCH_SIDE_PREEXPECT, - "B": MATCH_SIDE_PREACTUAL, - "simorder": SIM_BUSINESS + SIM_TECHNICAL, - "mode": "info", - "filename": "01_Vorbedingungen", - "title": "Pruefung Vorbedingung (Soll-Vorher - Ist-Vorher)" - }, - MATCH_POSTCOND: { - "A": MATCH_SIDE_POSTEXPECT, - "B": MATCH_SIDE_POSTACTUAL, - "simorder": SIM_BUSINESS + SIM_TECHNICAL, - "mode": "hard", - "filename": "00_Fachabgleich", - "title": "Fachliche Auswertung (Soll-Nachher - Ist-Nachher)" - }, - MATCH_SUCCESS: { - "A": MATCH_SIDE_PREACTUAL, - "B": MATCH_SIDE_POSTACTUAL, - "simorder": SIM_TECHNICAL + SIM_BUSINESS, - "mode": "action", - "filename": "04_Ablauf", - "title": "Ablauf-Differenz (Ist-Vorher - Ist-Nachher)" - }, - MATCH_PRESTEP: { - "A": MATCH_SIDE_PRESTEP, - "B": MATCH_SIDE_POSTACTUAL, - "simorder": SIM_TECHNICAL + SIM_BUSINESS, - "mode": "action", - "filename": "02_Vorschritt", - "title": "Schritt-Differenz (Vorschritt-Nachher - Ist-Nachher)" - }, - MATCH_TESTEXAMPLE: { - "A": MATCH_SIDE_TESTCASE, - "B": MATCH_SIDE_POSTACTUAL, - "simorder": SIM_BUSINESS + SIM_TECHNICAL, - "mode": "action", - "filename": "03_Vergleichstestfall", - "title": "Vergleichstestfall (Vergleich-Soll - Ist-Nachher)" - }, -} diff --git a/utils/match_tool.py b/utils/match_tool.py deleted file mode 100644 index a6b9293..0000000 --- a/utils/match_tool.py +++ /dev/null @@ -1,551 +0,0 @@ -#!/usr/bin/python -# -*- coding: utf-8 -*- -# --------------------------------------------------------------------------------------------------------- -# Author : Ulrich Carmesin -# Source : gitea.ucarmesin.de -# --------------------------------------------------------------------------------------------------------- -import json -import utils.css_tool -import utils.report_tool -import basic.program -import basic.constants as B -import utils.match_const as M -import utils.data_const as D - -# ------------------------------------------------------------ -""" - -""" - - -class Matching(): - def __init__(self, job, comp): - self.job = job - self.comp = comp - self.elements = {} - self.matchfiles = {} - self.assignedFiles = {} - self.linksA = {} - self.linksB = {} - self.nomatch = {} - self.matchkeys = {} - self.htmltext = "" - self.sideA = [] - self.sideB = [] - self.mode = "" - self.matchtype = "" - self.cssClass = "result0" - self.preIds = {} - - def setData(self, tdata, match): - """ - selects testdata for actual matching - :param tdata: - :param match: kind of actual match - :return: - """ - sideA = M.MATCH[match][M.M_SIDE_A] - sideB = M.MATCH[match][M.M_SIDE_B] - self.sideA = tdata[sideA]["data"] - self.sideB = tdata[sideB]["data"] - self.matchfiles[M.M_SIDE_A] = tdata[sideA]["path"] - self.matchfiles[M.M_SIDE_B] = tdata[sideB]["path"] - self.matchtype = match - self.mode = M.MATCH[match]["mode"] - self.setDiffHeader() - self.report = utils.report_tool.Report.getInstance(self.job) - self.resetHits() - if match == M.MATCH_PRECOND: - self.preIds = {} # structure db:scheme:table:... - # get ddl - # for _data: for ddl._header: if SIM_TECHNICAL in ddl[h][key]: preIds.append(_data[h]) - - def resetHits(self): - self.linksA = {} - self.linksB = {} - self.nomatch = {} - self.matchkeys = {} - self.cssClass = "result0" - - def setCssClass(self, cssClass): - if cssClass > self.cssClass: - self.cssClass = cssClass - - def isHitA(self, key): - return ((key in self.linksA) and (self.linksA[key] != B.SVAL_NULL)) - - def isHitB(self, key): - return ((key in self.linksB) and (self.linksB[key] != B.SVAL_NULL)) - - def setHit(self, keyA, keyB): - if (not self.isHitA(keyA)) and (not self.isHitB(keyB)): - if (keyA != B.SVAL_NULL): self.linksA[keyA] = keyB - if (keyB != B.SVAL_NULL): self.linksB[keyB] = keyA - return "OK" - raise Exception("one of the links are set") - - def setNohit(self, similarity, keyA, keyB): - """ The similarity must be unique. Only a mismatch will be set. """ - if similarity in self.nomatch: - raise Exception("similarity " + similarity + " exists") - if (self.isHitA(keyA) or self.isHitB(keyB)): - return - self.nomatch[similarity] = [keyA, keyB] - - def getTableDdl(self, path): - a = path.split(":") - ddl = self.comp.conf[B.DATA_NODE_DDL] - for x in a: - if (len(x) < 2): continue - if (x == B.DATA_NODE_DATA): break - if x in ddl: ddl = ddl[x] - return ddl - - def setDiffHeader(matching): - job = matching.job # basic.program.Job.getInstance() - verify = int(job.getDebugLevel("match_tool")) - 1 - job.debug(verify, "getDiffHeader ") - htmltxt = "" - htmltxt += "" - htmltxt += "" + M.MATCH[matching.matchtype]["title"] + "" - htmltxt += utils.css_tool.getInternalStyle(job, "diffFiles") - htmltxt += "" - htmltxt += "" - htmltxt += "

" + M.MATCH[matching.matchtype]["title"] + "

" - htmltxt += "

" + M.MATCH[M.MATCH[matching.matchtype][M.M_SIDE_A]][M.M_SIDE_LONG] + ": " + matching.matchfiles[ - M.M_SIDE_A] + "

" - htmltxt += "

" + M.MATCH[M.MATCH[matching.matchtype][M.M_SIDE_B]][M.M_SIDE_LONG] + ": " + matching.matchfiles[ - M.M_SIDE_B] + "


" - matching.htmltext = htmltxt - - def setDiffFooter(self): - job = self.job # basic.program.Job.getInstance() - verify = int(job.getDebugLevel("match_tool")) - 4 - job.debug(verify, "getDiffFooter ") - htmltext = self.htmltext - htmltext += "" - self.htmltext = htmltext - - -def ignorePreid(matching, key): - if matching.matchtype == M.MATCH_POSTCOND: - key_int = extractKeyI(key) - if matching.linksB[key] != B.SVAL_NULL: - return False - if matching.preIds is None or len(matching.preIds) < 1: - return False - for ids in matching.preIds: - hit = True - for k in ids: - row = matching.sideB[key_int] - if ids[k] == row[k]: - pass - else: - hit = False - if hit: - return True - return False - - -def matchFiles(matching): - """ - - post: - :param matching: - :return: - """ - - -def matchBestfit(matching, path): - """ - in this strategy the difference-score of all elements of both sides will be calculated. - the best fit will assigned together until nothing is - * the elements can be each kind of object - * the count of elements should not be high - :param matching: - :return: - """ - # initialize all potential links with null - i = 0 - if (matching.sideA is not None): - for r in matching.sideA: - k = composeKey("a", i) - matching.setHit(k, B.SVAL_NULL) - i += 1 - i = 0 - if (matching.sideB is not None): - for r in matching.sideB: - k = composeKey("b", i) - matching.setHit(B.SVAL_NULL, k) - i += 1 - ia = 0 - ix = 1 - # CASE: one side is None - if (matching.sideA is None) or (matching.sideB is None): - return - # CASE: to match - for rA in matching.sideA: - ib = 0 - for rB in matching.sideB: - if (matching.isHitA(composeKey("a", ia))): - continue - if (matching.isHitB(composeKey("b", ib))): - ib += 1 - continue - similarity = getSimilarity(matching, rA, rB, ix, M.MATCH[matching.matchtype]["simorder"]) - if (similarity == "MATCH"): - matching.setHit(composeKey("a", ia), composeKey("b", ib)) - continue - else: - matching.setNohit(similarity, composeKey("a", ia), composeKey("b", ib)) - ib += 1 - ix += 1 - ia += 1 - - -def matchRestfit(matching): - """ """ - job = matching.job # basic.program.Job.getInstance() - verify = int(job.getDebugLevel("match_tool")) - 1 - job.debug(verify, "matchRestfit ") - for x in sorted(matching.nomatch, reverse=True): - job.debug(verify, "matchRestfit " + x) - pair = matching.nomatch[x] - if (matching.isHitA(pair[0])): - job.debug(verify, "A " + pair[0] + " bereits zugeordnet mit " + matching.linksA[pair[0]]) - pass - if (matching.isHitB(pair[1])): - job.debug(verify, "B " + pair[1] + " bereits zugeordnet mit " + matching.linksB[pair[1]]) - continue - if (matching.isHitA(pair[0])): - continue - job.debug(verify, "neues Matching " + pair[0] + " " + pair[1]) - matching.setHit(pair[0], pair[1]) - - -def composeKey(side, i): - return side.lower() + str(i + 1).zfill(4) - - -def extractKeyI(key): - return int(key[1:]) - 1 - - -def setMatchkeys(matching, path): - job = matching.job # basic.program.Job.getInstance() - verify = int(job.getDebugLevel("match_tool")) - 1 - job.debug(verify, "getSimilarity " + path) - if len(matching.matchkeys) > 0: - return - if (B.DATA_NODE_DDL in matching.comp.conf): - job.debug(verify, "ddl " + path) - a = path.split(":") - ddl = matching.comp.conf[B.DATA_NODE_DDL] - for x in a: - if (len(x) < 2): - continue - if (x == B.DATA_NODE_DATA): - break - if x in ddl: - ddl = ddl[x] - job.debug(verify, "ddl " + json.dumps(ddl)) - keys = {} - for f in ddl: - job.debug(verify, "ddl " + f) - if (D.DDL_KEY in ddl[f]) and (len(ddl[f][D.DDL_KEY]) > 0): - b = ddl[f][D.DDL_KEY].split(":") - if (len(b) != 2): - raise Exception("falsch formatierter Schluessel " + ddl[f][D.DDL_KEY]) - if (not b[1].isnumeric()): - raise Exception("falsch formatierter Schluessel " + ddl[f][D.DDL_KEY]) - if (b[0] not in M.SIM_DEFAULT): - raise Exception("falsch formatierter Schluessel " + ddl[f][D.DDL_KEY]) - k = "k"+b[0]+""+b[1].zfill(2) - job.debug(verify, "ddl " + f) - keys[k] = {"ktyp": b[0], D.DDL_FNAME: ddl[f][D.DDL_FNAME], D.DDL_TYPE: ddl[f][D.DDL_TYPE], "rule": ddl[f][D.DDL_ACCEPTANCE]} - matching.matchkeys = keys - job.debug(verify, "ddl " + json.dumps(keys)) - - -def getSimilarity(matching, rA, rB, i, simorder=M.SIM_DEFAULT): - """ it calculates the similarity between both rows by: - concat each criteria with single-similarity 00..99 and i with 999..000 """ - job = matching.job # basic.program.Job.getInstance() - verify = int(job.getDebugLevel("match_tool")) - 1 - job.debug(verify, "getSimilarity ") - mBsim = "" - mTsim = "" - topBsim = "" - topTsim = "" - for k in sorted(matching.matchkeys): - if not D.DDL_FNAME in matching.matchkeys[k]: - job.m.logError("technical Attribut is missing "+k ) - continue - if M.SIM_TECHNICAL in k: - if matching.matchkeys[k][D.DDL_FNAME] in rA and matching.matchkeys[k][D.DDL_FNAME] in rB: - mTsim += getStringSimilarity(job, str(rA[matching.matchkeys[k][D.DDL_FNAME]]), - str(rB[matching.matchkeys[k][D.DDL_FNAME]])) - else: - mTsim += "00" - topTsim += "99" - if M.SIM_BUSINESS in k: - if matching.matchkeys[k][D.DDL_FNAME] in rA and matching.matchkeys[k][D.DDL_FNAME] in rB: - mBsim += getStringSimilarity(job, str(rA[matching.matchkeys[k][D.DDL_FNAME]]), - str(rB[matching.matchkeys[k][D.DDL_FNAME]])) - else: - mBsim += "55" - topBsim += "99" - - if mBsim == topBsim and mTsim == topTsim: - job.debug(verify, "Treffer ") - return "MATCH" - elif simorder[0:1] == M.SIM_BUSINESS and mBsim == topBsim: - job.debug(verify, "Treffer ") - return "MATCH" - elif simorder[0:1] == M.SIM_TECHNICAL and mTsim == topTsim: - job.debug(verify, "Treffer ") - return "MATCH" - elif simorder[0:1] == M.SIM_TECHNICAL: - return "S"+mTsim+mBsim+str(i).zfill(3) - else: # if simorder[0:1] == M.SIM_BUSINESS: - return "S" + mBsim + mTsim + str(i).zfill(3) - - -def matchTree(matching): - """ - :param matching: - :return: - """ - job = matching.job # basic.program.Job.getInstance() - verify = int(job.getDebugLevel("match_tool")) - 4 - job.debug(verify, "..>> start matching " + matching.mode) - matchElement(matching, matching.sideA, matching.sideB, "") - matching.setDiffFooter() - job.debug(verify, "..>> ende matching " + matching.htmltext) - return matching.htmltext - - -def matchElement(matching, A, B, path): - """ travers through the datatree """ - job = matching.job # basic.program.Job.getInstance() - verify = int(job.getDebugLevel("match_tool")) - 4 - job.debug(verify, "matchElem " + path + " A " + str(type(A)) + " B " + str(type(B))) - if ((A is not None) and (isinstance(A, list))) \ - or ((B is not None) and (isinstance(B, list))): - return matchArray(matching, A, B, path) - elif ((A is not None) and (isinstance(A, dict))) \ - or ((B is not None) and (isinstance(B, dict))): - return matchDict(matching, A, B, path) - else: - return matching - - -def getStringSimilarity(job, strA, strB): - #job = basic.program.Job.getInstance() - verify = int(job.getDebugLevel("match_tool")) - 1 - job.debug(verify, "getStringSimilarity " + strA + " ?= " + strB) - if (strA == strB): return "99" - if (strA.strip() == strB.strip()): return "77" - if (strA.lower() == strB.lower()): return "66" - if (strA.strip().lower() == strB.strip().lower()): return "55" - return "00" - - -def getEvaluation(matching, type, acceptance, sideA, sideB): - job = matching.job # basic.program.Job.getInstance() - verify = int(job.getDebugLevel("match_tool")) - 1 - job.debug(verify, "getEvaluation " + str(sideA) + " ?= " + str(sideB)) - match = getStringSimilarity(job, str(sideA), str(sideB)) - classA = "novalue" - classB = "novalue" - result = "test" - if match == "99": return ["MATCH", "novalue", "novalue", "novalue", "novalue"] - if acceptance == "ignore": result = "ignore" - if (matching.matchtype == M.MATCH_POSTCOND) and (result == "test"): - result = "hard" - classA = "diffA" - classB = "diffB" - else: - classA = "acceptA" - classB = "acceptB" - return [result, classA, classB] - - -def matchDict(matching, sideA, sideB, path): - """ travers through the datatree """ - job = matching.job # basic.program.Job.getInstance() - verify = int(job.getDebugLevel("match_tool")) - 4 - job.debug(verify, "matchDict " + path) - if (sideA is not None): - for k in sideA: - job.debug(verify, "matchDict 400 " + k + ".") - if k in ["_match", D.DATA_ATTR_COUNT, D.DATA_ATTR_DATE, B.DATA_NODE_HEADER]: - continue - if (sideB is not None) and (k in sideB): - if (isinstance(sideA[k], dict)): sideA[k]["_match"] = "Y" - if (isinstance(sideB[k], dict)): sideB[k]["_match"] = "Y" - job.debug(verify, "matchDict 404 " + k + "." + path) - matchElement(matching, sideA[k], sideB[k], path + ":" + k) - else: - if (isinstance(sideA[k], dict)): sideA[k]["_match"] = "N" - job.debug(verify, "matchDict 408 " + path) - matchElement(matching, sideA[k], None, path + ":" + k) - if (sideB is not None): - for k in sideB: - job.debug(verify, "matchDict 412 " + k + ".") - if k in ["_match", D.DATA_ATTR_COUNT, D.DATA_ATTR_DATE, B.DATA_NODE_HEADER]: - continue - if (sideA is not None) and (k in sideA): - continue - elif (sideA is None) or (k not in sideA): - if (sideA is not None) and (isinstance(sideA[k], dict)): sideB[k]["_match"] = "N" - job.debug(verify, "matchDict 418 " + k + "." + path) - matchElement(matching, None, sideB[k], path + ":" + k) - job.debug(verify, "matchDict 420 ...<<---") - return matching - - -def matchArray(matching, sideA, sideB, path): - """ matches the datarows of the datatree """ - job = matching.job # basic.program.Job.getInstance() - verify = int(job.getDebugLevel("match_tool")) - 4 - job.debug(verify, "matchArray " + path + "\n.." + matching.htmltext) - matching.sideA = sideA - matching.sideB = sideB - setMatchkeys(matching, path) - matchBestfit(matching, path) - matchRestfit(matching) - a = path.split(":") - for x in a: - if (x == "_data"): break - table = x - report = utils.report_tool.Report.getInstance(job) - report.setPaths(getattr(job.par, B.PAR_TESTCASE), matching.comp.name, table, matching.matchtype, - matching.matchfiles[M.M_SIDE_A], matching.matchfiles[M.M_SIDE_B]) - # report.setMatchResult("TC0001", "comp01", "arte01", m, "result" + str(i), "" + str(i) + "
") - htmltext = report.getTitle(getattr(job.par, B.PAR_TESTCASE), matching.comp.name, table, matching.matchtype) - htmltext += report.getComponentHead(getattr(job.par, B.PAR_TESTCASE), matching.comp.name, table, matching.matchtype) - htmltext += report.getArtefactBlock(getattr(job.par, B.PAR_TESTCASE), matching.comp.name, table, matching.matchtype) - htmltext += compareRows(matching, path) - matching.htmltext += htmltext - - -def compareRows(matching, path): - """ traverse through matched rows """ - job = matching.job # basic.program.Job.getInstance() - verify = int(job.getDebugLevel("match_tool")) - 1 - ddl = matching.getTableDdl(path) - report = utils.report_tool.Report.getInstance(job) - table = "" - header = [] - # "

Tabelle : "+table+"

" - htmltext = "" - for f in ddl[B.DATA_NODE_HEADER]: - job.debug(verify, "ddl " + f + " ") - header.append({D.DDL_FNAME: f, D.DDL_TYPE: ddl[f][D.DDL_TYPE], D.DDL_ACCEPTANCE: ddl[f][D.DDL_ACCEPTANCE]}) - htmltext += "" - htmltext += "" - matching.difftext = htmltext - for k in sorted(matching.linksA): - print(k) - if (matching.isHitA(k)): - htmltext += compareRow(matching, header, matching.sideA[int(extractKeyI(k))], - matching.sideB[int(extractKeyI(matching.linksA[k]))]) - else: - htmltext += markRow(matching, header, matching.sideA[int(extractKeyI(k))], M.M_SIDE_A) - matching.setCssClass("result3") - for k in sorted(matching.linksB): - if ignorePreid(matching, k): - continue - if (not matching.isHitB(k)): - htmltext += markRow(matching, header, matching.sideB[int(extractKeyI(k))], M.M_SIDE_B) - matching.setCssClass("result2") - htmltext += "
" + f + "
" - matching.difftext += "" - return htmltext - - -def markRow(matching, header, row, side): - job = matching.job # basic.program.Job.getInstance() - verify = int(job.getDebugLevel("match_tool")) - 4 - text = "" - cssClass = "" - for f in header: - if not f[D.DDL_FNAME] in row: - val = "" - cssClass = "novalue" - elif side == M.M_SIDE_A: - res = getEvaluation(matching, f[D.DDL_TYPE], f[D.DDL_ACCEPTANCE], row[f[D.DDL_FNAME]], "") - val = str(row[f[D.DDL_FNAME]]) - cssClass = res[1] - else: - res = getEvaluation(matching, f[D.DDL_TYPE], f[D.DDL_ACCEPTANCE], "", row[f[D.DDL_FNAME]]) - val = str(row[f[D.DDL_FNAME]]) - cssClass = res[2] - text += "" + val + "" - text = "" \ - + M.MATCH[M.MATCH[matching.matchtype][side]]["short"] + "" + text + "" - matching.difftext += text - return text - - -def compareRow(matching, header, rA, rB): - """ traverse through matched rows """ - job = matching.job # basic.program.Job.getInstance() - verify = int(job.getDebugLevel("match_tool")) - 4 - allident = True - textA = "" - textB = "" - text = "" - valA = "" - valB = "" - classA = "novalue" - classB = "novalue" - for f in header: - print(f) - if f[D.DDL_FNAME] in rA and f[D.DDL_FNAME] in rB: - res = getEvaluation(matching, f[D.DDL_TYPE], f[D.DDL_ACCEPTANCE], rA[f[D.DDL_FNAME]], rB[f[D.DDL_FNAME]]) - match = res[0] - classA = res[1] - classB = res[2] - valA = str(rA[f[D.DDL_FNAME]]) - valB = str(rB[f[D.DDL_FNAME]]) - elif f[D.DDL_FNAME] in rA: - valA = str(rA[f[D.DDL_FNAME]]) - match = "ddl" - classA = "acceptA" - classB = "acceptB" - elif f[D.DDL_FNAME] in rB: - valB = str(rB[f[D.DDL_FNAME]]) - match = "ddl" - classA = "acceptA" - classB = "acceptB" - else: - match = "MATCH" - classA = "acceptA" - classB = "acceptB" - if (match == "MATCH"): - textA += "" + valA + "" - textB += "" + valB + "" - matching.setCssClass("result1") - elif (match == "hard"): - allident = False - textA += "" + valA + "" - textB += "" + valB + "" - matching.setCssClass("result3") - else: - allident = False - textA += "" + valA + " (" + match + ")" - textB += "" + valB + " (" + match + ")" - matching.setCssClass("result1") - if allident: - return "" + textA + "" - text = "" + M.MATCH[M.MATCH[matching.matchtype][M.M_SIDE_A]]["short"] + "" + textA + "" - text += "" + M.MATCH[M.MATCH[matching.matchtype][M.M_SIDE_B]]["short"] + "" + textB + "" - matching.difftext += text - return text - - -# -------------------------------------------------------------------------- -def matchLines(matching): - pass diff --git a/utils/path_const.py b/utils/path_const.py deleted file mode 100644 index 5c6a0be..0000000 --- a/utils/path_const.py +++ /dev/null @@ -1,115 +0,0 @@ -import basic.constants as B -# ------------------------------------------------------------- -# values and keywords -KEY_PRECOND = "precond" -KEY_POSTCOND = "postcond" -KEY_RESULT = "result" -KEY_ORIGIN = "origin" -KEY_PARTS = "parts" -KEY_SUMFILE = "sumfile" -KEY_BACKUP = "backup" -KEY_REFFILE = "reffile" -KEY_TESTCASE = "tc" -KEY_TESTSUITE = "ts" -KEY_CATALOG = "catalog" -KEY_DEBUGNAME = "debugname" -KEY_LOGNAME = "logname" -KEY_BASIC = "basic" -""" keyword for basic config in components """ -KEY_COMP = "comp" -""" keyword for individual component """ -KEY_TOOL = "tool" -""" keyword for technical tools """ -VAL_UTIL = "utils" -""" subdir for any technical tools """ -VAL_CONFIG = "config" -""" subdir for any place of config-files """ -VAL_COMPS = "components" -""" subdir for the plugin components """ -VAL_BASIC = "basic" -""" subdir for the basic job-framework """ -VAL_BASE_DATA = "data" -""" subdir for the basis data-folder """ -VAL_TDATA = "testdata" -""" subdir for the basis data-folder """ -# ------------------------------------------------------------- -# parameter with arguments -PAR_APP = "job.par." + B.PAR_APP -PAR_ENV = "job.par." + B.PAR_ENV -PAR_REL = "job.par." + B.PAR_REL -PAR_TSDIR = "job.par." + B.PAR_TSDIR -PAR_TCDIR = "job.par." + B.PAR_TCDIR -PAR_XPDIR = "job.par." + B.PAR_XPDIR -PAR_TDTYP = "job.par." + B.PAR_TDTYP -PAR_TDSRC = "job.par." + B.PAR_TDSRC -PAR_TDNAME = "job.par." + B.PAR_TDNAME -PAR_LOG = "job.par." + B.PAR_LOG -PAR_MODUS = "job.par." + B.PAR_MODUS -PAR_COMP = "job.par." + B.PAR_COMP -PAR_FCT = "job.par." + B.PAR_FCT -PAR_TOOL = "job.par." + B.PAR_TOOL -PAR_STEP = "job.par." + B.PAR_STEP -PAR_DESCRIPT = "job.par." + B.PAR_DESCRIPT -PAR_TESTCASE = "job.par." + B.PAR_TESTCASE -PAR_TESTCASES = "job.par." + B.PAR_TESTCASES -PAR_TESTSUITE = "job.par." + B.PAR_TESTSUITE -PAR_TCTIME = "job.par." + B.PAR_TCTIME -PAR_TSTIME = "job.par." + B.PAR_TSTIME -PAR_TESTINSTANCES = "job.par." + B.PAR_TESTINSTANCES - -# ------------------------------------------------------------- -# attributes -ATTR_PATH_MODE = "mode" -""" This constant defines the home-folder in filesystem of test """ -ATTR_PATH_HOME = "home" -""" This constant defines the home-folder in testing-filesystem """ -ATTR_PATH_DEBUG = "debugs" -""" This constant defines the debug-folder in testing-filesystem """ -ATTR_PATH_ARCHIV = "archiv" -""" This constant defines the folder in testing-filesystem for results and log of execution """ -ATTR_PATH_PROGRAM = "program" -""" This constant defines the program-folder in the workspace """ -ATTR_PATH_COMPONENTS = "components" -""" This constant defines the program-folder in the workspace """ -ATTR_PATH_ENV = "environment" -""" This constant defines the folder in testing-filesystem, used for configs related to environments """ -ATTR_PATH_RELEASE = "release" -""" This constant defines the folder in testing-filesystem, used for configs related to release """ -ATTR_PATH_TDATA = "testdata" -""" This constant defines the folder in testing-filesystem with the testcase-specifications """ -ATTR_PATH_TEMP = "temp" -""" This constant defines the debug-folder in testing-filesystem """ -ATTR_PATH_PATTN = "pattern" -""" This constant defines the debug-folder in testing-filesystem """ - - - -# ------------------------------------------------------------- -# structure - nodes -P_DEBUGS = "debugs" -P_ENVBASE = "envbase" -P_ENVLOG = "envlog" -P_ENVPARFILE = "envparfile" -P_TCBASE = "tcbase" -P_TCLOG = "tclog" -P_TCRESULT = "tcresult" -P_TCPARFILE = "tcparfile" -P_TCDIFF = "tcdiff" -P_TCPREDIFF = "tcprediff" -P_TCRUNDIFF = "tcrundiff" -P_TCPRECOND = "tcprecond" -P_TCPOSTCOND = "tcpostcond" -P_TSBASE = "tsbase" -P_TSLOG = "tslog" -P_TSPARFILE = "tsparfile" -P_TSSUM = "tssum" -P_XPBASE = "xpbase" -P_XPRESULT = "xpresult" -P_XPBACKUP = "xpbackup" - -# ------------------------------------------------------------- -# exception texts -EXP_COMP_MISSING = "Component is missing for {}" -""" excetion for the case that a specific component doesnt exist, 1 parameter (context) """ -EXP_CONFIG_MISSING = "Configuration is missing for {}" -""" excetion for the case that a specific configuration is missing, 1 parameter (context) """ \ No newline at end of file diff --git a/utils/report_tool.py b/utils/report_tool.py deleted file mode 100644 index 42784a7..0000000 --- a/utils/report_tool.py +++ /dev/null @@ -1,292 +0,0 @@ -# functions in order to report the summaries -# ------------------------------------------------------- -""" -the reporting-system in bottom-up -(0) raw-data -(0.1) artificats created from test-system - raw-files up to work-files - -(0.2) in logs all contacts to test-system - raw-info - -(1) comparison-result -(1.1) comparison-result as difference for each file-comparison in html-files in testcases - diff-files - -(1.2) extraction of diff-files (only not accepted differences) as html-file in test-set - result-report - -(2) result-code -(2.1) result-code of each test-step for each component in testcases - parameter-file - -(2.2) transfer of the summary result-code of each testcase to an extern reporting-system - protocol - -(2.3) visualization of final result-code of each component and each testcase in test-set - result-report - -(2.4) visualization of statistical result-codes of each component and each test-set in test-context - result-report - - | testcase-artefact | testcase-report | testsuite-report -title | comparison | testcase | testsuite -table0 | | each component | each testcase x each component - h1 | | | testcase - h2 | | component | component - h3 | -- | main-comparison | main-comparison - h4 | | other comparison | other comparison -""" -import os -import re -import basic.program -import utils.match_const as M -import basic.constants as B -import utils.css_tool -import utils.path_tool - -REP_TITLE = "Ergebnisbericht" -REP_TC = "Testfall" -REP_TS = "Testsuite" -REP_COMP = "Komponente" -REP_ART = "Artefakt" - -TOOL_NAME = "report_tool" - - -class Report: - __instance = None - __instances = {} - - @staticmethod - def getInstance(job): - if (job.jobid in Report.__instances): - return Report.__instances[job.jobid] - else: - return Report(job) - - def __init__(self, job): - """ - :param report: matchtype - :param comp: optional on matching - structure: - report 1:k - testcase 1:l # only in testsuite-report - components 1:m - artefacts 1:n - matches - the html-side - title : subject of result for artefact/testcase/testsuite - overview : table with links, only for testcase/testsuite - testcase : h1 with anker, only testcase/testsuite - component: h2 with anker - artefact : h3 - matches : with links - paths : p with links to row-results - matching : table complete match for artefact-result resp. only diff for other matches - """ - #job = basic.program.Job.getInstance() - self.job = job - self.report = {} - Report.__instances[job.jobid] = self - self.report["testcases"] = [] - self.testcase = "" - self.component = "" - self.artefact = "" - self.matchtype = "" - #return self - - def setPaths(self, testcase, component, artefact, matchtype, pathA, pathB): - if not testcase in self.report: - self.report[testcase] = {} - self.report["testcases"].append(testcase) - if not component in self.report[testcase]: - self.report[testcase][component] = {} - if not artefact in self.report[testcase][component]: - self.report[testcase][component][artefact] = {} - self.report[testcase][component][artefact][matchtype] = {} - self.report[testcase][component][artefact][matchtype]["pathA"] = pathA - self.report[testcase][component][artefact][matchtype]["pathB"] = pathB - self.testcase = testcase - self.component = component - self.artefact = artefact - self.matchtype = matchtype - - def setMatchResult(self, testcase, component, artefact, matchtype, cssClass, diffTable): - self.report[testcase][component][artefact][matchtype]["css"] = cssClass - if matchtype == M.MATCH_POSTCOND: - self.report[testcase][component][artefact][matchtype]["diff"] = diffTable - - def getTitle(self, testcase="", component="", artefact="", matchtype=""): - job = self.job # basic.program.Job.getInstance() - if len(matchtype) > 1: - html = ""+M.MATCH[matchtype]["title"]+"

"+M.MATCH[matchtype]["title"]+"

" - elif len(testcase) > 1: - html = ""+REP_TITLE+" "+REP_TC+" "+testcase+"" - html += "

"+REP_TITLE+" "+REP_TC+" "+testcase+"

" - elif hasattr(job.par, B.PAR_TESTSUITE): - html = "" + REP_TITLE + " " + REP_TS + " " + getattr(job.par, B.PAR_TESTSUITE) + "" - html += "

" + REP_TITLE + " " + REP_TS + " " + getattr(job.par, B.PAR_TESTSUITE) + "

" - elif hasattr(job.par, B.PAR_TESTCASE): - html = "" + REP_TITLE + " " + REP_TC + " " + getattr(job.par, B.PAR_TESTCASE) + "" - html += "

" + REP_TITLE + " " + REP_TC + " " + getattr(job.par, B.PAR_TESTCASE) + "

" - if hasattr(job.par, B.PAR_DESCRIPT): - html += "

"+getattr(job.par, B.PAR_DESCRIPT)+"

" - return html - - def getCssClass(self, testcase="", component="", artefact="", match=""): - """ - the function calculates the maximum cssClass of the granularity - the cssClass means the fault-class of the matching - :param testcase: - :param component: - :param artefact: - :return: - """ - cssClass = "result0" - if len(match) > 0 and len(artefact) > 0: - if match in self.report[testcase][component][artefact]: - if "css" in self.report[testcase][component][artefact][match]: - if cssClass < self.report[testcase][component][artefact][match]["css"]: - cssClass = self.report[testcase][component][artefact][match]["css"] - return cssClass - elif len(match) > 0: - for a in self.report[testcase][component]: - val = self.getCssClass(testcase, component, a, match) - if cssClass < val: - cssClass = val - return cssClass - elif len(artefact) > 0: - for match in self.report[testcase][component][artefact]: - if "css" in self.report[testcase][component][artefact][match]: - if cssClass < self.report[testcase][component][artefact][match]["css"]: - cssClass = self.report[testcase][component][artefact][match]["css"] - return cssClass - elif len(component) > 0: - for a in self.report[testcase][component]: - val = self.getCssClass(testcase, component, a) - if cssClass < val: - cssClass = val - return cssClass - elif len(testcase) > 0: - for c in self.report[testcase]: - val = self.getCssClass(testcase, c, "") - if cssClass < val: - cssClass = val - return cssClass - return cssClass - - def getHeader(self): - job = self.job # basic.program.Job.getInstance() - verify = int(job.getDebugLevel(TOOL_NAME))-1 - htmltxt = "" - htmltxt += "" - htmltxt += utils.css_tool.getInternalStyle(job, "diffFiles") - return htmltxt - - def getOverviewHead(self, testcase=""): - if len(testcase) < 1: - return "" - htmlHead = "" - for c in self.report[testcase]: - if c in ["overview", "block"]: - continue - htmlHead += "" - return htmlHead - - def getOverview(self, testcase=""): - if len(testcase) < 1: - return "" - htmlHead = self.getOverviewHead(testcase) - htmlBody = "" - for c in self.report[testcase]: - cssClass = self.getCssClass(testcase, c, "") - htmlBody += "" - return htmlHead+""+htmlBody+"
"+REP_TC+"" + c + "
"+testcase+""+testcase+"
" - - def getTestcaseHead(self, testcase="", component="", artefact="", matchtype=""): - if len(testcase) < 1: - return "" - cssClass = self.getCssClass(testcase, "", "") - html = "

"+REP_TC+" "+testcase+"

" - return html - - def getComponentHead(self, testcase="", component="", artefact="", matchtype=""): - job = self.job # basic.program.Job.getInstance() - html = "

"+REP_COMP+" "+component+"

" - return html - - def getArtefactBlock(self, testcase, component, artefact, matchtype=""): - job = self.job # basic.program.Job.getInstance() - html = "

"+REP_ART+" "+artefact+"

" - for match in self.report[testcase][component][artefact]: - cssClass = self.getCssClass(testcase, component, artefact, match) - path = self.getFilepath(job, testcase, component, artefact, match) - path = path.replace(job.conf.confs[B.SUBJECT_PATH][B.ATTR_PATH_ARCHIV], os.path.join("..", "..")) - html += " "+M.MATCH[match]["filename"]+" " - html += "

" - if len(matchtype) < 1: - matchtype = M.MATCH_POSTCOND - html += "

\n" - for side in ["A", "B"]: - path = self.report[testcase][component][artefact][matchtype]["path"+side] - href = path.replace(job.conf.confs[B.SUBJECT_PATH][B.ATTR_PATH_ARCHIV], os.path.join("..", "..")) - html += M.MATCH[M.MATCH[matchtype][side]]["long"]+" " - html += ""+path+"\n" - if side == "A": html += "
" - html += "

" - return html - - def getFilepath(self, job, testcase, component, artefact, matchtype): - cm = basic.componentHandling.ComponentManager.getInstance(self.job, "init") - comp = cm.getComponent(component) - path = os.path.join(utils.path_tool.composePattern(self.job, "{tcresult}", comp), artefact+"_"+M.MATCH[matchtype]["filename"]+".html") - return path - - def getComparisonBlock(self, testcase, component, artefact, matchtype): - html = self.report[testcase][component][artefact][matchtype]["diff"] - return html - - def reportTestcase(self, testcase): - job = self.job # basic.program.Job.getInstance() - verify = int(job.getDebugLevel(TOOL_NAME)) - 1 - html = self.getHeader() - html += self.getTitle(testcase) - html += self.getOverview(testcase) - html += "
" - html += self.getTestcaseHead(testcase) - for component in self.report[testcase]: - html += self.getComponentHead(testcase, component) - for artefact in self.report[testcase][component]: - html += self.getArtefactBlock(testcase, component, artefact, M.MATCH_POSTCOND) - html += self.getComparisonBlock(testcase, component, artefact, M.MATCH_POSTCOND) - html += "
<-- class=\"tc-block\" -->" - html += "" - return html - - def extractTestcase(self, testcase, html): - job = self.job # basic.program.Job.getInstance() - verify = int(job.getDebugLevel(TOOL_NAME)) - 1 - if not testcase in self.report: - self.report = {} - overview = re.findall("", html) - if len(overview) == 1: - self.report[testcase]["overview"] = overview[0] - startP = html.index("
") - block = html[startP:endP] - if len(block) > 1: - self.report[testcase]["block"] = block - - def reportTestsuite(self): - job = self.job # basic.program.Job.getInstance() - verify = int(job.getDebugLevel(TOOL_NAME)) - 1 - testinstances = getattr(job.par, B.PAR_TESTCASES) - html = self.getHeader() - html += self.getTitle() - i = 0 - for testcase in testinstances: - if i == 0: html += self.getOverviewHead(testcase) - html += self.report[testcase]["overview"] - i += 1 - html += "" - for testcase in testinstances: - html += self.report[testcase]["block"] - html += "" - return html - - -def report_testsuite(job, tcpath): - """ - creates header - :param tcpath: - :return: html-code with result-codes of each component - """ - # job = basic.program.Job.getInstance() - verify = -0+job.getDebugLevel(TOOL_NAME) - job.debug(verify, "writeDataTable " + str(tcpath)) - diff --git a/utils/tdata_tool.py b/utils/tdata_tool.py deleted file mode 100644 index f12538d..0000000 --- a/utils/tdata_tool.py +++ /dev/null @@ -1,451 +0,0 @@ -#!/usr/bin/python -# -*- coding: utf-8 -*- -# --------------------------------------------------------------------------------------------------------- -# Author : Ulrich Carmesin -# Source : gitea.ucarmesin.de -# --------------------------------------------------------------------------------------------------------- -import os.path -import inspect -import basic.program -import utils.config_tool -import utils.file_tool -import basic.constants as B -import utils.data_const as D -import utils.path_const as P -import utils.path_tool -import utils.date_tool -import basic.step -import utils.i18n_tool -import re - -TOOL_NAME = "tdata_tool" -list_blocks = {} # lists of aliases - - -def getTestdata(job=None): - """ - get the testdata from one of the possible sources - for the testcase resp testsuite of the job - :return: - """ - if "testcase" in job.program: - return collectTestdata(B.PAR_TESTCASE, getattr(job.par, B.PAR_TESTCASE), job) - else: - return collectTestdata(B.PAR_TESTSUITE, getattr(job.par, B.PAR_TESTSUITE), job) - - -def collectTestdata(gran, testentity, job): - """ - collects the testdata from kind of the possible sources - for the testcase resp testsuite - :return: - """ - setBlockLists(job) - if gran == B.PAR_TESTCASE: - basispath = utils.path_tool.rejoinPath(job.conf.confs[B.SUBJECT_PATH][B.ATTR_PATH_TDATA], testentity) - pathname = utils.config_tool.getConfigPath(job, P.KEY_TESTCASE, getattr(job.par, B.PAR_TESTCASE), "") - if gran == B.PAR_TESTSUITE: - basispath = utils.path_tool.rejoinPath(job.conf.confs[B.SUBJECT_PATH][B.ATTR_PATH_TDATA], testentity) - pathname = utils.config_tool.getConfigPath(job, P.KEY_TESTSUITE, getattr(job.par, B.PAR_TESTSUITE), "") - if pathname[-3:] == D.DFILE_TYPE_CSV: - tdata = getCsvSpec(job.m, job, pathname, D.CSV_SPECTYPE_DATA) - else: - tdata = utils.file_tool.readFileDict(job, pathname, job.m) - # get explicit specdata of includes - if D.CSV_BLOCK_IMPORT in tdata: - for pathname in tdata[D.CSV_BLOCK_IMPORT]: - pathname = utils.path_tool.rejoinPath(pathname) - if job.conf.confs[B.SUBJECT_PATH][B.ATTR_PATH_TDATA] not in pathname: - pathname = utils.path_tool.rejoinPath(basispath, pathname) - if pathname[-3:] == D.DFILE_TYPE_CSV: - data = getCsvSpec(job.m, job, pathname, D.CSV_SPECTYPE_DATA) - else: - data = utils.file_tool.readFileDict(job, pathname, job.m) - for table in data[D.CSV_BLOCK_TABLES]: - if table in tdata[D.CSV_BLOCK_TABLES]: - print("Fehler") - tdata[D.CSV_BLOCK_TABLES][table] = data[D.CSV_BLOCK_TABLES][table] - # get implicit specdata of spec-library - for prefix in list_blocks[D.DFILE_TABLE_PREFIX]: - files = utils.file_tool.getFiles(job.m, job, basispath, prefix, None) - if len(files) < 0: - continue - for f in files: - if f in tdata[D.CSV_BLOCK_TABLES]: - continue - pathname = utils.path_tool.rejoinPath(basispath, f) - if pathname[-3:] == D.DFILE_TYPE_CSV: - data = getCsvSpec(job.m, job, pathname, D.CSV_SPECTYPE_DATA) - else: - data = utils.file_tool.readFileDict(job, pathname, job.m) - for table in data[D.CSV_BLOCK_TABLES]: - if table in tdata[D.CSV_BLOCK_TABLES]: - print("Fehler") - tdata[D.CSV_BLOCK_TABLES][table] = data[D.CSV_BLOCK_TABLES][table] - # fill the options into job-parameter - for p in tdata[D.CSV_BLOCK_OPTION]: - setattr(job.par, p, tdata[D.CSV_BLOCK_OPTION][p]) - return tdata - - -def setBlockLists(job): - for block in D.LIST_BLOCK_CONST + D.LIST_ATTR_CONST + D.LIST_DFNAME_CONST: - list = utils.i18n_tool.I18n.getInstance(job).getAliasList(block+"='"+eval("D."+block)+"'", job) - #list.append(eval("D."+block)) - list_blocks[eval("D." + block)] = [] - for x in list: - list_blocks[eval("D." + block)].append(x.lower()) - - -def readCsv(msg, job, filename, comp, aliasNode=""): - lines = utils.file_tool.readFileLines(job, filename, msg) - print("readCsv "+filename) - return parseCsv(msg, job, filename, lines, comp, aliasNode) - - -def parseCsv(msg, job, filename, lines, comp, aliasNode=""): - if len(list_blocks) < 1: - setBlockLists(job) - tdata = {} - if len(aliasNode) < 1: - print(str(list_blocks)) - aliasNode = extractAliasNode(filename, comp, job) - if len(aliasNode) > 3: - tdata[D.DATA_ATTR_ALIAS] = aliasNode - return parseCsvSpec(msg, lines, D.CSV_SPECTYPE_DATA, tdata, job) - - -def extractAliasNode(filename, comp, job): - basename = os.path.basename(filename)[0:-4] - for prefix in list_blocks[D.DFILE_TABLE_PREFIX]: - if basename.find(prefix) == 0: - basename = basename[len(prefix):] - if comp is None: - return "" - if B.TOPIC_NODE_DB in comp.conf[B.SUBJECT_ARTS] and basename in comp.conf[B.DATA_NODE_DDL]: - return B.DATA_NODE_TABLES+":"+basename - return "" - - -def getCsvSpec(msg, job, filename, ttype): - """ - reads the specification from a csv-file and maps it into the internal data-structure - :param msg: - :param filename: - :param type: - :param job: - :return: - """ - #if job is None: - # job = basic.program.Job.getInstance() - lines = utils.file_tool.readFileLines(job, filename, msg) - tdata = {} # the result - return parseCsvSpec(msg, lines, ttype, tdata, job) - - -def parseCsvSpec(msg, lines, ttype, tdata, job): - """ - - :param msg: - :param lines: - :param type: - :param job: - :return: - """ - if len(list_blocks) < 1: - setBlockLists(job) - status = "start" - verbose = False - tableAttr = {} # table - tableDict = {} # table - for l in lines: - if verbose: print("lines "+l) - fields = splitFields(l, D.CSV_DELIMITER, job) - # check empty line, comment - if (len(fields) < 1) or (len(l.strip().replace(D.CSV_DELIMITER,"")) < 1): - status = "start" - continue - if (fields[0][0:1] == "#"): - continue - a = fields[0].lower().split(":") - # keywords option, step, table - if verbose: print(str(a)+" -- "+str(fields)) - tableAttr = setTableAttribute(tableAttr, a[0], fields[1], job) - if (tableAttr["_hit"]): - status = "TABLE_ALIAS" - continue - if (a[0].lower() in list_blocks[D.CSV_BLOCK_HEAD]): - if verbose: print("head "+l) - setTdataLine(tdata, fields, D.CSV_BLOCK_HEAD, job) - status = "start" - continue - elif (a[0].lower() in list_blocks[D.CSV_BLOCK_OPTION]): - if verbose: print("option " + l) - setTdataLine(tdata, fields, D.CSV_BLOCK_OPTION, job) - status = "start" - continue - elif (a[0].lower() in list_blocks[D.CSV_BLOCK_STEP]): - if verbose: print("step "+l) - step = basic.step.parseStep(job, fields) - if D.CSV_BLOCK_STEP not in tdata: - tdata[D.CSV_BLOCK_STEP] = [] - tdata[D.CSV_BLOCK_STEP].append(step) - status = "start" - continue - elif (a[0].lower() in list_blocks[D.CSV_BLOCK_IMPORT]): - if verbose: print("includes " + l) - if D.CSV_BLOCK_IMPORT not in tdata: - tdata[D.CSV_BLOCK_IMPORT] = [] - tdata[D.CSV_BLOCK_IMPORT].append(fields[1]) - status = "start" - continue - elif (a[0].lower() in list_blocks[D.CSV_BLOCK_TABLES]): - if verbose: print("tables "+l) - h = a - h[0] = B.DATA_NODE_TABLES - if ttype == D.CSV_SPECTYPE_CONF: - del h[0] - tableDict = getTdataContent(msg, tdata, h) - setTableHeader(tableDict, tableAttr, fields, ttype, job) - status = D.CSV_SPECTYPE_DATA - elif (status == D.CSV_SPECTYPE_DATA): - tableDict = getTdataContent(msg, tdata, h) - if verbose: print("setTableData "+str(h)+" "+str(tableDict)) - setTableData(tableDict, fields, ttype, job) - elif (status == "TABLE_ALIAS") and D.DATA_ATTR_ALIAS in tdata: - alias = tdata[D.DATA_ATTR_ALIAS] - b = alias.split(":") - h = [B.DATA_NODE_TABLES] + b - tableDict = getTdataContent(msg, tdata, h) - tableDict[D.DATA_ATTR_ALIAS] = alias - fields = [alias] + fields - setTableHeader(tableDict, tableAttr, fields, ttype, job) - status = D.CSV_SPECTYPE_DATA - if ttype == D.CSV_SPECTYPE_CONF: - header = [] - for k in tdata: - if k in D.LIST_DATA_ATTR: - continue - if B.DATA_NODE_DATA in tdata[k]: - tdata[k].pop(B.DATA_NODE_DATA) - for f in tdata[k]: - if f in [B.DATA_NODE_HEADER, "_hit"] + D.LIST_DATA_ATTR: - continue - header.append(f) - tdata[k][B.DATA_NODE_HEADER] = header - header = [] - if B.DATA_NODE_TABLES in tdata and B.DATA_NODE_TABLES in tdata[B.DATA_NODE_TABLES]: - for k in tdata[B.DATA_NODE_TABLES][B.DATA_NODE_TABLES]: - if k in tdata[B.DATA_NODE_TABLES]: - if verbose: print("Error") - else: - tdata[B.DATA_NODE_TABLES][k] = tdata[B.DATA_NODE_TABLES][B.DATA_NODE_TABLES][k] - tdata[B.DATA_NODE_TABLES].pop(B.DATA_NODE_TABLES) - return tdata - - -def setTableHeader(tableDict, tableAttr, fields, ttype, job): - header = [] - for i in range(1, len(fields)): - header.append(fields[i].strip()) - tableDict[B.DATA_NODE_HEADER] = header - for attr in tableAttr: - tableDict[attr] = tableAttr[attr] - # preparate the sub-structure for row-data - if ttype == D.CSV_SPECTYPE_TREE: - tableDict[B.DATA_NODE_DATA] = {} - elif ttype == D.CSV_SPECTYPE_KEYS: - tableDict[D.CSV_NODETYPE_KEYS] = {} - tableDict[D.DATA_ATTR_KEY] = 1 - if D.DATA_ATTR_KEY in tableAttr: - tableDict[D.DATA_ATTR_KEY] = header.index(tableAttr[D.DATA_ATTR_KEY]) + 1 - else: - tableDict[B.DATA_NODE_DATA] = [] - return tableDict - - -def setTableData(tableDict, fields, ttype, job): - row = {} - if ttype == D.CSV_SPECTYPE_DATA and ":" not in fields[0] and D.DATA_ATTR_ALIAS in tableDict: - fields = [tableDict[D.DATA_ATTR_ALIAS]] + fields - i = 1 - for f in tableDict[B.DATA_NODE_HEADER]: - row[f] = fields[i].strip() - i += 1 - if ttype == D.CSV_SPECTYPE_DATA: - if B.ATTR_DATA_COMP in tableDict: - tcomps = tableDict[B.ATTR_DATA_COMP] - else: - tcomps = {} - row[B.ATTR_DATA_COMP] = {} - for c in fields[0].split(","): - a = c.split(":") - tcomps[a[0]] = a[1] - row[B.ATTR_DATA_COMP][a[0]] = a[1].strip() - tableDict[B.DATA_NODE_DATA].append(row) - tableDict[B.ATTR_DATA_COMP] = tcomps - elif ttype == D.CSV_SPECTYPE_KEYS: - tableDict[D.CSV_NODETYPE_KEYS][fields[tableDict[D.DATA_ATTR_KEY]].strip()] = row - elif ttype == D.CSV_SPECTYPE_CONF: - tableDict[fields[1]] = row - return tableDict - - -def setTableAttribute(tableAttr, key, val, job): - for attr in D.LIST_DATA_ATTR: - if (key.lower() in list_blocks[attr]): - tableAttr[attr] = val.strip() - tableAttr["_hit"] = True - return tableAttr - tableAttr["_hit"] = False - return tableAttr - - -def setTdataLine(tdata, fields, block, job): - """ - sets field(s) into tdata as a key-value-pair - additional fields will be concatenate to a intern separated list - :param tdata: - :param fields: - :param block: - :param job: - :return: - """ - a = fields[0].lower().split(":") - a[0] = block # normalized key - val = "" - for i in range(1, len(fields)-1): - val += D.INTERNAL_DELIMITER+fields[i] - if len(val) > len(D.INTERNAL_DELIMITER): - val = val[len(D.INTERNAL_DELIMITER):] - setTdataContent(job.m, tdata, val, a) - return tdata - - -def setTdataContent(msg, data, tabledata, path): - setTdataStructure(msg, data, path) - if len(path) == 2: - data[path[0]][path[1]] = tabledata - elif len(path) == 3: - data[path[0]][path[1]][path[2]] = tabledata - elif len(path) == 4: - data[path[0]][path[1]][path[2]][path[3]] = tabledata - - -def getTdataContent(msg, data, path): - setTdataStructure(msg, data, path) - if len(path) == 2: - return data[path[0]][path[1]] - elif len(path) == 3: - return data[path[0]][path[1]][path[2]] - elif len(path) == 4: - return data[path[0]][path[1]][path[2]][path[3]] - elif len(path) == 1: - return data[path[0]] - else: - return None - - -def setTdataStructure(msg, data, path): - if len(path) >= 1 and path[0] not in data: - data[path[0]] = {} - if len(path) >= 2 and path[1] not in data[path[0]]: - data[path[0]][path[1]] = {} - if len(path) >= 3 and path[2] not in data[path[0]][path[1]]: - data[path[0]][path[1]][path[2]] = {} - if len(path) >= 4 and path[3] not in data[path[0]][path[1]][path[2]]: - data[path[0]][path[1]][path[2]][path[3]] = {} - return data - - -def splitFields(line, delimiter, job): - out = [] - fields = line.split(delimiter) - for i in range(0, len(fields)): - if fields[i][0:1] == "#": - break - if re.match(r"^\"(.*)\"$", fields[i]): - fields[i] = fields[i][1:-1] - out.append(fields[i]) - return out - - -def writeCsvData(filename, tdata, comp, job): - text = "" - data = tdata - for p in [B.DATA_NODE_TABLES, P.KEY_PRECOND, P.KEY_POSTCOND]: - if p in data: - print("data "+p) - data = data[p] - for k in data: - text += buildCsvData(data, k, comp, job) - text += "\n" - utils.file_tool.writeFileText(comp.m, job, filename, text) - - -def buildCsvData(tdata, tableName, comp, job=None): - """ - writes the testdata into a csv-file for documentation of the test-run - :param teststatus: - :param tdata: - :param comp: if specific else None - :return: - """ - text = "" - for k in [D.DATA_ATTR_DATE, D.DATA_ATTR_COUNT]: - if k in tdata: - text += k+";"+str(tdata[k])+"\n" - if tableName in tdata: - actdata = tdata[tableName] - else: - actdata = tdata - header = str(utils.i18n_tool.I18n.getInstance(job).getText(f"{B.DATA_NODE_TABLES=}", job)) +":" + tableName - for f in actdata[B.DATA_NODE_HEADER]: - header += D.CSV_DELIMITER+f - text += header + "\n" - i = 0 - for r in actdata[B.DATA_NODE_DATA]: - row = "" - if B.ATTR_DATA_COMP in r: - for k in r[B.ATTR_DATA_COMP]: - row += ","+k+":"+r[B.ATTR_DATA_COMP][k] - row = row[1:] - i += 1 - for f in actdata[B.DATA_NODE_HEADER]: - if f in r: - row += D.CSV_DELIMITER+str(r[f]) - else: - row += D.CSV_DELIMITER - text += row - text += "\n" - return text - - -def buildCsvSpec(tdata, job=None): - text = "" - if D.CSV_BLOCK_IMPORT in tdata: - for k in tdata[D.CSV_BLOCK_HEAD]: - text += utils.i18n_tool.I18n.getInstance(job).getText(f"{D.CSV_BLOCK_HEAD=}", job) - text += ":"+k+D.CSV_DELIMITER+tdata[D.CSV_BLOCK_HEAD][k]+"\n" - text += "# option:key ;values;..;;;;\n" - if D.CSV_BLOCK_OPTION in tdata: - for k in tdata[D.CSV_BLOCK_OPTION]: - text += utils.i18n_tool.I18n.getInstance(job).getText(f"{D.CSV_BLOCK_OPTION=}", job) - text += ":" + k + D.CSV_DELIMITER + getHeadArgs(tdata[D.CSV_BLOCK_OPTION][k], job)+"\n" - text += "#;;;;;;\n" - if D.CSV_BLOCK_STEP in tdata: - text += basic.step.getStepHeader(job) - i = 1 - for step in tdata[D.CSV_BLOCK_STEP]: - text += utils.i18n_tool.I18n.getInstance(job).getText(f"{D.CSV_BLOCK_STEP=}", job) + ":" + str(i) - text += D.CSV_DELIMITER + step.getStepText(job) - i += 1 - text += "#;;;;;;\n" - if D.CSV_BLOCK_TABLES in tdata: - for k in tdata[D.CSV_BLOCK_TABLES]: - text += buildCsvData(tdata, k, None, job) - text += "#;;;;;;\n" - return text - -def getHeadArgs(value, job): - return value.replace(D.INTERNAL_DELIMITER, D.CSV_DELIMITER) diff --git a/utils/xml2_tool.py b/utils/xml2_tool.py deleted file mode 100644 index fc33e43..0000000 --- a/utils/xml2_tool.py +++ /dev/null @@ -1,45 +0,0 @@ -import os, sys, json -import xmltodict -import pprint - -class fcts: - def dict2xml(tree): - out = xmltodict.unparse(tree, pretty=True) - return out - - def xml2dict(xmlstring): - tree = {} - pp = pprint.PrettyPrinter(indent=4) - tree = xmlstring.parse(xmlstring) - return tree - -def readXml(filename): - pass - -def addNode(xpath, value): - pass - -def writeDataTable(teststatus, tdata, comp): - """ - writes the testdata into a csv-file for documentation of the test-run - :param teststatus: - :param tdata: - :param comp: if specific else None - :return: - """ - #output = xmljson.badgerfish.etree(tdata, root=xmljson.badgerfish.Element('root')) - result = bf.etree(tdata, root=Element('xml')) - # xmljson.badgerfish.tostring(output) - txt = tostring(result, pretty_print=True) - print (txt.decode('utf-8')) - - # out = tostring(result, pretty_print=True) - #print (prettify(result)) - pass - -def prettify(elem): - """Return a pretty-printed XML string for the Element. - """ - rough_string = tostring(elem, 'utf-8') - reparsed = xml.dom.minidom.parseString(rough_string ) - return reparsed.toprettyxml(indent=" ") \ No newline at end of file diff --git a/utils/zip_tool.py b/utils/zip_tool.py deleted file mode 100644 index 9da73b4..0000000 --- a/utils/zip_tool.py +++ /dev/null @@ -1,61 +0,0 @@ -import zipfile -import tarfile -import os -import basic.program - - -def untarFolder(target, targetFile): - tar_file = tarfile.open(os.path.join(target, targetFile), 'r:gz') - tar_file.extractall(path=os.path.join(target, 'tarliste')) - tar_file.close() - pass - -def openNewTarFile(job, target, targetFile): - tarfilename = os.path.join(target, targetFile) - if os.path.exists(tarfilename): - os.remove(tarfilename) - job.m.logInfo("Archiv angelegt "+tarfilename) - return tarfile.open(tarfilename, 'w:gz') - -def appendFolderIntoTarFile(job, source, sourceFolder, tarFile): - workFolder = os.path.join(source, sourceFolder) - for folderName, subfolders, filenames in os.walk(workFolder): - for filename in filenames: - folderShort = folderName[len(source)+1:] - # create complete filepath of file in directory - filePath = os.path.join(folderName, filename) - # Add file to zip - tarFile.add(filePath, os.path.join(folderShort, filename)) - - -def tarFolder(source, sourceFolder, target, targetFile): - with tarfile.open(os.path.join(target, targetFile), 'w:gz') as tar_file: - for folderName, subfolders, filenames in os.walk(os.path.join(source, sourceFolder)): - for filename in filenames: - folderShort = folderName.replace(source + '/', '') - # create complete filepath of file in directory - filePath = os.path.join(folderName, filename) - # Add file to zip - tar_file.add(filePath, os.path.join(folderShort, filename)) - tar_file.close() - - -def unzipFolder(target, targetFile): - zip_file = zipfile.ZipFile(os.path.join(target, targetFile), 'r') - zip_file.extractall(path=os.path.join(target, 'liste')) - zip_file.close() - pass - -def zipFolder(source, sourceFolder, target, targetFile): - with zipfile.ZipFile(os.path.join(target, targetFile), 'w') as zip_file: - # Iterate over all the files in directory - for folderName, subfolders, filenames in os.walk(os.path.join(source, sourceFolder)): - for filename in filenames: - folderShort = folderName.replace(source+'/', '') - # create complete filepath of file in directory - filePath = os.path.join(folderName, filename) - # Add file to zip - zip_file.write(filePath, os.path.join(folderShort, filename)) - zip_file.close() - return "" -