Browse Source

remove old utils

refactor
Ulrich 1 year ago
parent
commit
302ebfb799
  1. 0
      utils/__init__.py
  2. 51
      utils/api_abstract.py
  3. 4
      utils/api_const.py
  4. 46
      utils/cli_abstract.py
  5. 9
      utils/cli_const.py
  6. 21
      utils/clibash_tool.py
  7. 27
      utils/clicmd_tool.py
  8. 27
      utils/clihadoop_tool.py
  9. 33
      utils/clissh_tool.py
  10. 61
      utils/config/path.yml
  11. 362
      utils/config_tool.py
  12. 81
      utils/conn_tool.py
  13. 70
      utils/css_tool.py
  14. 176
      utils/date_tool.py
  15. 39
      utils/env_tool.py
  16. 109
      utils/file_abstract.py
  17. 287
      utils/file_tool.py
  18. 20
      utils/filejson_tool.py
  19. 19
      utils/filelog_tool.py
  20. 24
      utils/filexml_tool.py
  21. 5
      utils/flask_tool.py
  22. 23
      utils/gen_const.py
  23. 127
      utils/gen_tool.py
  24. 96
      utils/i18n_tool.py
  25. 97
      utils/job_tool.py
  26. 375
      utils/map_tool.py
  27. 118
      utils/match_const.py
  28. 551
      utils/match_tool.py
  29. 115
      utils/path_const.py
  30. 292
      utils/report_tool.py
  31. 451
      utils/tdata_tool.py
  32. 45
      utils/xml2_tool.py
  33. 61
      utils/zip_tool.py

0
utils/__init__.py

51
utils/api_abstract.py

@ -1,51 +0,0 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# ---------------------------------------------------------------------------------------------------------
# Author : Ulrich Carmesin
# Source : gitea.ucarmesin.de
# ---------------------------------------------------------------------------------------------------------
"""
This abstract class ApiFcts defines the interface for the execution of any command-line-functions.
It uses the following configuration
.a) COMP.conf->artifact->api->[system] : for structural attributes of the operating-system \n
.c) COMP.conf->conn->[api] : for connection-attributes and structural attributes,
maybe specialized for the operating-system
The attribute type resolves which technique is used, implemented in a specific tool-class:
* nifi,
The main tasks are: \n
"""
import basic.program
import utils.config_tool
class ApiFcts():
"""
This interface defines each necessary connection to any kind of database.
The specific technique how to connect to the concrete DBMS has to be implemented in the specific tool.
"""
def __init__(self):
self.comp = None
pass
def reset_TData(self, job):
pass
def setComp(self, job, comp):
self.job = job
self.comp = comp
def startCommand(self, comp, args):
""" method to execute the statement
this method should only called by the class itself """
raise Exception(B.EXCEPT_NOT_IMPLEMENT)
def statusCommand(self, comp, args):
""" method to execute the statement
this method should only called by the class itself """
raise Exception(B.EXCEPT_NOT_IMPLEMENT)
def stopCommand(self, comp, args):
""" method to execute the statement
this method should only called by the class itself """
raise Exception(B.EXCEPT_NOT_IMPLEMENT)

4
utils/api_const.py

@ -1,4 +0,0 @@
#!/usr/bin/python
"""
constants for used for api-functions
"""

46
utils/cli_abstract.py

@ -1,46 +0,0 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# ---------------------------------------------------------------------------------------------------------
# Author : Ulrich Carmesin
# Source : gitea.ucarmesin.de
# ---------------------------------------------------------------------------------------------------------
"""
This abstract class CliFcts defines the interface for the execution of any command-line-functions.
It uses the following configuration
.a) COMP.conf->artifact->cli->[system] : for structural attributes of the operating-system \n
.c) COMP.conf->conn->[cli] : for connection-attributes and structural attributes,
maybe specialized for the operating-system
The attribute type resolves which technique is used, implemented in a specific tool-class:
* cmd,bash,powersh ... for specific local operating-system
* ssh,hadoop ... for specific remote operating-system
The main tasks are: \n
.1. executing the command-array - with attributes
* conn.ip, host, port, user, password, ... for synchronous db-connection
* conn.root, ... for path-definitions for file-implementations (csv, )
"""
import basic.program
import utils.config_tool
import basic.constants as B
class CliFcts():
"""
This interface defines each necessary connection to any kind of database.
The specific technique how to connect to the concrete DBMS has to be implemented in the specific tool.
"""
def __init__(self):
self.comp = None
pass
def reset_TData(self, job):
pass
def setComp(self, job, comp):
self.job = job
self.comp = comp
def execCommand(self, comp, command):
""" method to execute the statement
this method should only called by the class itself """
raise Exception(B.EXCEPT_NOT_IMPLEMENT)

9
utils/cli_const.py

@ -1,9 +0,0 @@
#!/usr/bin/python
"""
constants for used for api-functions
"""
DEFAULT_DB_PARTITION = "n"
""" attribute if table is partitioned - partitions are parametrized """
DEFAULT_DB_CONN_JAR = "n"
""" attribute for connection-jar-file instead of connection by ip, port """

21
utils/clibash_tool.py

@ -1,21 +0,0 @@
#
# ----------------------------------------------------------
"""
This module implements the technique to interact via bash to the test-object.
The class has to be constructed by the tool-Handling because of keyword "bash" in the configuration,
then it will be called with the interface / abstract-class cli_abstract
"""
import os
import utils.cli_abstract
import basic
class CliFcts(utils.cli_abstract.CliFcts):
def execCmd(self, cmds):
"""
:param cmds:
:return:
"""
for cmd in cmds:
rc = os.system(cmd)
self.comp.m.logInfo("RC "+str(rc)+" zu CMD "+cmd)

27
utils/clicmd_tool.py

@ -1,27 +0,0 @@
#
# ----------------------------------------------------------
"""
This module implements the technique to interact via win-cmd to the test-object.
The class has to be constructed by the tool-Handling because of keyword "cmd" in the configuration,
then it will be called with the interface / abstract-class cli_abstract
"""
import os
import utils.cli_abstract
import basic
class CliFcts(utils.cli_abstract.CliFcts):
def execCmd(self, cmds):
"""
executes an array of commands on a windows-machine - the commands will be intern translated before execution
:param cmds: written in linux-bash
:return:
"""
for cmd in cmds:
cmd = self.translate(cmd)
rc = os.system(cmd)
self.comp.m.logInfo("RC "+str(rc)+" zu CMD "+cmd)
return "ok"
def translate(self, cmd):
""" translates a bash-cmd (default) into a windows-cmd """
return cmd

27
utils/clihadoop_tool.py

@ -1,27 +0,0 @@
#
# ----------------------------------------------------------
"""
This module implements the technique to interact via hadoop-cmd to the test-object.
The class has to be constructed by the tool-Handling because of keyword "hadoop" in the configuration,
then it will be called with the interface / abstract-class cli_abstract
"""
import os
import utils.cli_abstract
import basic
class CliFcts(utils.cli_abstract.CliFcts):
def execCmd(self, cmds):
"""
executes an array of commands on a hadoop-machine - the commands will be intern translated before execution
:param cmds: written in linux-bash
:return:
"""
for cmd in cmds:
cmd = self.translate(cmd)
rc = os.system(cmd)
self.comp.m.logInfo("RC "+str(rc)+" zu CMD "+cmd)
return "ok"
def translate(self, cmd):
""" translates a bash-cmd (default) into a windows-cmd """
return cmd

33
utils/clissh_tool.py

@ -1,33 +0,0 @@
#
# ----------------------------------------------------------
"""
This module implements the technique to interact via ssh to the test-object.
The class has to be constructed by the tool-Handling because of keyword "ssh" in the configuration,
then it will be called with the interface / abstract-class cli_abstract
"""
import os
import utils.cli_abstract
import basic
import paramiko
class CliFcts(utils.cli_abstract.CliFcts):
def execCmd(self, cmds):
"""
:param cmds:
:return:
"""
ssh = paramiko.SSHClient()
ssh.load_system_host_keys(os.path.expanduser('~/.ssh/known_hosts'))
if self.conn["password"]:
ssh.connect(self.conn["host"], username=self.conn["user"], password=self.conn["password"])
else:
ssh.connect(self.conn["host"], username=self.conn["user"])
shell = ssh.invoke_shell()
for cmd in cmds:
stdin, stdout, stderr = ssh.exec_command(cmd + "\n")
self.sysout = stdout.read()
stdin.close()
stderr.close()
stdout.close()
ssh.close()

61
utils/config/path.yml

@ -1,61 +0,0 @@
#
pattern:
# Keywords
log: log
parfile: PARAMETER_{job.par.application}_{job.par.environment}.yml
precond: vorher
postcond: nachher
diff: diff_fach
prediff: diff_init
rundiff: diff_ablauf
result: Ergebnisse/{comp.name}
origin: original
parts: teilergebnisse
sumfile: xxx
backup: backup
reffile: Herkunft.txt
appdir: APP
tc: testfall
ts: testlauf
debugname: debug
logname: log
preexec: env-pre-exec # only for dry unit-test
postexec: env-post-exec # only for dry unit-test
debugs: "{job.conf.home}/test/log"
# workspace
workbase: "{job.conf.data}/workspace"
worklog: "{workbase}/{log}"
workparfile: "{workbase}/PARAMETER_workspace"
workappdir: "{workbase}/{appdir}/{comp.name}"
# environment
envbase: "{job.conf.environment}/{job.par.environment}"
envlog: "{envbase}/{log}"
envparfile: "{envbase}/{parfile}"
envappdir: "{envbase}/{appdir}/{comp.name}"
# testcase
tcbase: "{job.conf.archiv}/{job.par.testcase}/{job.par.tctime}"
tclog: "{tcbase}/{log}"
tcresult: "{tcbase}/{result}"
tcparfile: "{tcbase}/{parfile}"
tcdiff: "{tcresult}/{diff}"
tcprediff: "{tcresult}/{prediff}"
tcrundiff: "{tcresult}/{rundiff}"
tcprecond: "{tcresult}/{precond}"
tcpostcond: "{tcresult}/{postcond}"
# testdata
tdbase: "{job.conf.testdata}/{job.par.testcase}"
tdresult: "{tdbase}/{result}"
tdparfile: "{tdbase}/{parfile}"
tdprecond: "{tdresult}/{precond}"
tdpostcond: "{tdresult}/{postcond}"
tdpreexec: "{tdbase}/{preexec}/{comp.name}" # only for dry unit-test
tdpostexec: "{tdbase}/{postexec}/{comp.name}" # only for dry unit-test
# testset
tsbase: "{job.conf.archiv}/{ts}/{job.par.usecase}_{job.par.tstime}"
tslog: "{tsbase}/{log}"
tsparfile: "{tsbase}/{parfile}"
tssum: "{tsbase}/Ergebnis"
# expectation-result rs
xpbase: "{job.conf.expect}/{job.par.branch}"
xpresult: "{xpbase}/{result}"
xpbackup: "{xpbase}/{result}"

362
utils/config_tool.py

@ -1,362 +0,0 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# ---------------------------------------------------------------------------------------------------------
# Author : Ulrich Carmesin
# Source : gitea.ucarmesin.de
# https://ucarmesin.de/index.php/it/testautomatisierung-fuer-daten-test/225-konfiguration-der-testanwendung
# ---------------------------------------------------------------------------------------------------------
import sys
import basic.constants as B
try:
import basic.program
except ImportError:
print("ImportError: " + str(ImportError.with_traceback()))
pass
import basic.componentHandling
import utils.path_tool
import utils.file_tool
import os.path
import basic.constants as B
import utils.data_const as D
import utils.path_const as P
COMP_FILES = [D.DDL_FILENAME]
CONFIG_FORMAT = [D.DFILE_TYPE_YML, D.DFILE_TYPE_JSON, D.DFILE_TYPE_CSV]
def getExistgetConfigPath(job, pathnames):
for p in pathnames:
if p[-1:] == ".":
p = p[0:-1]
for format in CONFIG_FORMAT:
pathname = p+"."+format
if os.path.exists(pathname):
return pathname
return None
def getConfigPath(job, modul, name, subname=""):
"""
gets the most specified configuration of different sources
Parameter:
* typ -- (basic, comp, tool)
* name -- the specific class
sources:
* programm <<
* install <<
* environ << basis-conf
* release << basis-conf
* testset << parameter/environ
* testcase << parameter
the parameter-files could be one of these file-types:
* yaml, json, csv
"""
if job is None:
verify = False # job = basic.program.Job.getInstance()
else:
verify = job.getDebugLevel("config_tool")-4
if verify: job.debug(verify, "getConfig " + modul + ", " + name)
#TODO path rejoin, config as const
if modul == P.KEY_TOOL:
for format in CONFIG_FORMAT:
pathname = os.path.join(job.conf.getPath(P.ATTR_PATH_COMPONENTS),
P.VAL_CONFIG, P.KEY_TOOL+"_"+name+"."+format)
if verify: job.debug(verify, "1 " + pathname)
if os.path.exists(pathname):
return pathname
for format in CONFIG_FORMAT:
pathname = os.path.join(job.conf.getPath(P.ATTR_PATH_HOME),
P.VAL_CONFIG, P.KEY_TOOL+"_"+name+"."+format)
if verify: job.debug(verify, "1 " + pathname)
if os.path.exists(pathname):
return pathname
for format in CONFIG_FORMAT:
pathname = os.path.join(job.conf.getPath(P.ATTR_PATH_PROGRAM),
P.VAL_UTIL, P.VAL_CONFIG, name+"."+format)
if verify: job.debug(verify, "2 " + pathname)
if os.path.exists(pathname):
return pathname
for format in CONFIG_FORMAT:
pathname = os.path.join(job.conf.getPath(P.ATTR_PATH_ENV),
job.par.environment, P.VAL_CONFIG, P.KEY_TOOL+"_"+ name+"."+format)
if verify: job.debug(verify, "3 " + pathname)
if os.path.exists(pathname):
return pathname
job.debug(verify, "3x " + pathname)
raise Exception(P.EXP_CONFIG_MISSING, modul+", "+name)
elif modul == P.KEY_COMP:
for format in CONFIG_FORMAT:
pathname = os.path.join(job.conf.getPath(P.ATTR_PATH_HOME),
P.VAL_CONFIG, P.KEY_COMP+"_" + name + "."+format)
if verify: job.debug(verify, "4 " + pathname)
if os.path.exists(pathname):
return pathname
for format in CONFIG_FORMAT:
pathname = os.path.join(job.conf.getPath(P.ATTR_PATH_COMPONENTS),
basic.componentHandling.getComponentFolder(name), "CONFIG." + format)
if verify: job.debug(verify, "5 " + pathname)
if os.path.exists(pathname):
return pathname
if verify: job.debug(verify, "6 " + pathname)
raise Exception(P.EXP_CONFIG_MISSING, modul+", "+name)
elif modul in COMP_FILES:
# for example DATASTRUCURE or the table
pathnames = []
pathnames.append(os.path.join(job.conf.getPath(P.ATTR_PATH_COMPONENTS),
basic.componentHandling.getComponentFolder(name), modul))
pathnames.append(os.path.join(job.conf.getPath(P.ATTR_PATH_COMPONENTS),
basic.componentHandling.getComponentFolder(subname), modul))
pathnames.append(os.path.join(job.conf.getPath(P.ATTR_PATH_PROGRAM), P.VAL_BASIC, modul))
pathnames.append(os.path.join(job.conf.getPath(P.ATTR_PATH_PROGRAM), P.VAL_BASIC, subname))
configpath = getExistgetConfigPath(job, pathnames)
if configpath is not None:
return configpath
for format in CONFIG_FORMAT:
pathname = os.path.join(job.conf.getPath(P.ATTR_PATH_COMPONENTS),
basic.componentHandling.getComponentFolder(name), modul+"."+format)
if os.path.exists(pathname):
return pathname
for format in CONFIG_FORMAT:
if len(subname) > 1:
pathname = os.path.join(job.conf.getPath(P.ATTR_PATH_COMPONENTS),
basic.componentHandling.getComponentFolder(name), subname+"."+format)
if os.path.exists(pathname):
return pathname
raise Exception(P.EXP_CONFIG_MISSING, modul+", "+name)
elif modul == P.KEY_BASIC:
for format in CONFIG_FORMAT:
pathname = os.path.join(job.conf.getPath(P.ATTR_PATH_COMPONENTS),
P.VAL_CONFIG , name + "."+format)
if verify: job.debug(verify, "4 " + pathname)
if os.path.exists(pathname):
return pathname
elif modul == P.KEY_TESTCASE:
for format in CONFIG_FORMAT:
pathname = os.path.join(job.conf.getPath(P.ATTR_PATH_TDATA),
name, D.DFILE_TESTCASE_NAME + "."+format)
if verify: job.debug(verify, "4 " + pathname)
if os.path.exists(pathname):
return pathname
elif modul == P.KEY_TESTSUITE:
for format in CONFIG_FORMAT:
pathname = os.path.join(job.conf.getPath(P.ATTR_PATH_TDATA),
name, D.DFILE_TESTSUITE_NAME + "." + format)
if verify: job.debug(verify, "4 " + pathname)
if os.path.exists(pathname):
return pathname
elif modul == P.KEY_CATALOG:
for format in CONFIG_FORMAT:
pathname = os.path.join(job.conf.getPath(P.ATTR_PATH_TDATA),
P.KEY_CATALOG, name + "." + format)
if verify: job.debug(verify, "4 " + pathname)
if os.path.exists(pathname):
return pathname
for format in CONFIG_FORMAT:
pathname = os.path.join(job.conf.getPath(P.ATTR_PATH_COMPONENTS),
P.KEY_CATALOG, name + "." + format)
if verify: job.debug(verify, "4 " + pathname)
if os.path.exists(pathname):
return pathname
for format in CONFIG_FORMAT:
pathname = os.path.join(job.conf.getPath(P.ATTR_PATH_PROGRAM),
P.KEY_CATALOG, name + "." + format)
if verify: job.debug(verify, "4 " + pathname)
if os.path.exists(pathname):
return pathname
else:
pathname = utils.path_tool.composePath(job, P.P_TCPARFILE)
if verify: job.debug(verify, "7 " + pathname)
if os.path.exists(pathname):
return pathname
pathname = utils.path_tool.composePath(job, P.P_TSPARFILE)
if verify: job.debug(verify, "8 " + pathname)
if os.path.exists(pathname):
return pathname
for format in CONFIG_FORMAT:
if len(subname) > 1:
pathname = os.path.join(job.conf.getPath(P.ATTR_PATH_RELEASE),
P.VAL_CONFIG, "basis."+format)
if verify: job.debug(verify, "9 " + pathname)
if os.path.exists(pathname):
return pathname
for format in CONFIG_FORMAT:
if len(subname) > 1:
pathname = os.path.join(job.conf.getPath(P.ATTR_PATH_ENV),
P.VAL_CONFIG, "basis."+format)
if verify: job.debug(verify, "9 " + pathname)
if os.path.exists(pathname):
return pathname
for format in CONFIG_FORMAT:
if len(subname) > 1:
pathname = os.path.join(job.conf.getPath(P.ATTR_PATH_HOME),
P.VAL_CONFIG, "basis."+format)
if verify: job.debug(verify, "9 " + pathname)
if os.path.exists(pathname):
return pathname
raise Exception(P.EXP_CONFIG_MISSING, modul+", "+name)
def getConfValue(attribute, comp):
if attribute == B.ATTR_CONN_DBTYPE:
if not hasAttr(comp.conf[B.SUBJECT_CONN], "dbtype"):
if hasAttr(comp.conf[B.SUBJECT_CONN], "types") and hasAttr(comp.conf[B.SUBJECT_CONN]["types"], "dbtype"):
dbtype = comp.conf[B.SUBJECT_CONN]["types"]["dbtype"]
else:
raise LookupError("dbtype is not set in comp " + comp.name)
else:
dbtype = comp.conf["conn"]["dbtype"]
return ""
return ""
def getAttr(o, name):
if (isinstance(o, dict)):
if (name in o.keys()):
return o[name]
elif (isinstance(o, list)):
pass
elif hasattr(o, name):
return getattr(o, name)
return False
def hasAttr(o, name):
if (isinstance(o, dict)):
if (name in o.keys()):
return True
elif (isinstance(o, list)):
pass
elif hasattr(o, name):
return True
return False
def getConfig(job, modul, name, subname=""):
if job is None:
verify = 24
else:
verify = job.getDebugLevel("config_tool")-4
msg = None
if hasattr(job, "m"): msg = job.m
pathname = getConfigPath(job, modul, name, subname)
confs = {}
job.debug(verify, "getConfig " + pathname)
if len(pathname) < 1:
return confs
doc = utils.file_tool.readFileDict(job, pathname, msg)
if modul == D.DDL_FILENAME:
# in csv the root is the subname
# from the Dict-structure of DDL_FILENAME pick the substructure of the subname
keys = list(doc.keys())
if subname not in keys and len(keys) == 1:
doc0 = doc[keys[0]]
doc = doc0
keys = list(doc.keys())
if subname in keys:
doc0 = doc[subname]
doc = doc0
for i, v in doc.items():
confs[i] = v
return confs
def getAttribute(comp, path, attr, job):
attrList = getAttributeList(comp, path, job)
if attr in attrList:
return attrList[attr]
else:
return ""
def getAttributeList(comp, path, job):
"""
gets a concrete attribute-list for an arteifact-element from the config-attributes from the connection-attributes
https://ucarmesin.de/index.php/it/testautomatisierung-fuer-daten-test/225-konfiguration-der-testanwendung#konfigurationshierarchie
:param comp:
:param path: artifact-type.artifact-name for example: DB.person
:return: list of all attributes for the artifact-element
"""
attrList = {}
a = path.split(".")
artType = a[0]
artName = a[1]
if B.SUBJECT_CONN not in comp.conf:
raise Exception ("Environment is not configured")
if artType in comp.conf[B.SUBJECT_CONN]:
if artName in comp.conf[B.SUBJECT_CONN][artType]:
for attr, val in comp.conf[B.SUBJECT_CONN][artType][artName].items():
if attr not in B.LIST_ATTR[artType]:
continue
attrList[attr] = val
for attr, val in comp.conf[B.SUBJECT_CONN][artType].items():
if attr not in B.LIST_ATTR[artType]:
continue
if attr in attrList:
continue
attrList[attr] = val
if artType in comp.conf[B.SUBJECT_ARTS]:
if artName in comp.conf[B.SUBJECT_ARTS][artType]:
for attr, val in comp.conf[B.SUBJECT_ARTS][artType][artName].items():
if attr not in B.LIST_ATTR[artType]:
continue
if attr in attrList:
continue
attrList[attr] = val
for attr, val in comp.conf[B.SUBJECT_ARTS][artType].items():
if attr not in B.LIST_ATTR[artType]:
continue
if attr in attrList:
continue
attrList[attr] = val
return attrList
def mergeConn(msg, conf, conn):
"""
merges the config-attributes from the connection-attributes
because the connection-attributes has to overwrite the config-attributes if the subject is configured
https://ucarmesin.de/index.php/it/testautomatisierung-fuer-daten-test/225-konfiguration-der-testanwendung#konfigurationshierarchie
:param conf:
:param conn:
:return:
"""
if B.SUBJECT_INST not in conf:
conf[B.SUBJECT_INST] = {}
for a in conn[B.SUBJECT_INST]:
conf[B.SUBJECT_INST][a] = conn[B.SUBJECT_INST][a]
for topic in [B.TOPIC_NODE_DB, B.TOPIC_NODE_CLI, B.TOPIC_NODE_API, B.TOPIC_NODE_FILE]:
if topic not in conf[B.SUBJECT_ARTS]:
continue
if topic == B.TOPIC_NODE_DB:
list = B.LIST_DB_ATTR
if topic == B.TOPIC_NODE_CLI:
list = B.LIST_CLI_ATTR
if topic == B.TOPIC_NODE_API:
list = B.LIST_API_ATTR
if topic == B.TOPIC_NODE_FILE:
list = B.LIST_FILE_ATTR
print(" --- merge-conn " + topic + " " + str(list))
for a in conf[B.SUBJECT_ARTS][topic]:
if topic not in conn:
continue
if a in list:
if a in conn[topic]:
conf[B.SUBJECT_ARTS][topic][a] = conn[topic][a]
else:
for b in conf[B.SUBJECT_ARTS][topic][a]:
print(" --- merge-conn b " + topic + " " + a+" "+b)
if b not in list:
msg.logError("not-topic-attribute in topic-connection: "+topic+", "+b)
continue
if a not in conn[topic]:
continue
if b in conn[topic][a]:
conf[B.SUBJECT_ARTS][topic][a][b] = conn[topic][a][b]
for a in list:
if topic not in conn:
break
if topic not in conn:
continue
if a in conn[topic]:
conf[B.SUBJECT_ARTS][topic][a] = conn[topic][a]
return conf

81
utils/conn_tool.py

@ -1,81 +0,0 @@
# functions about connections to other instances
# -------------------------------------------------------------------
"""
"""
import basic.program
import utils.config_tool
import basic.constants as B
import utils.data_const as D
def getConnection(job, comp, nr):
#job = basic.program.Job.getInstance()
verify = job.getDebugLevel("conn_tool")
conn = {}
if job.conf.confs.get(B.SUBJECT_TOOL).get("connsrc") == D.DFILE_TYPE_YML:
conn = utils.config_tool.getConfig(job, "tool", B.SUBJECT_CONN)
xtypes = None
if ("types" in conn["env"][comp]):
xtypes = conn["env"][comp]["types"]
instnr = "inst" + str(nr)
if conn["env"][comp][instnr]:
if (xtypes is not None):
conn["env"][comp][instnr]["types"] = xtypes
return conn["env"][comp][instnr]
else:
job.m.setFatal("Conn-Tool: Comp not configured " + comp + " " + str(nr))
return None
def getConnections(job, comp):
"""
it reads the connection-attributes for each instances of this component
general attributes are added to the connection-attributes
:param comp:
:return:
"""
#job = basic.program.Job.getInstance()
verify = job.getDebugLevel("conn_tool")
msg = None
if hasattr(comp, "m") and comp.m is not None:
msg = comp.m
elif hasattr(job, "m") and job.m is not None:
msg = job.m
else:
raise Exception("message-object is missing")
msg.debug(verify, "getConnections " + comp)
conn = {}
conns = []
# if a datest-database exists read the connections
conndb = {}
if job.conf.confs.get("db"):
# select
pass
conn = utils.config_tool.getConfig(job, "tool", B.SUBJECT_CONN)
if not comp in conn[B.SUBJECT_ENV]:
job.m.setFatal("Conn-Tool: Comp not configured " + comp)
attr = {}
if B.CONF_NODE_GENERAL in conn[B.SUBJECT_ENV]:
for a in conn[B.SUBJECT_ENV][B.CONF_NODE_GENERAL]:
attr[a] = conn[B.SUBJECT_ENV][B.CONF_NODE_GENERAL]
for a in conn[B.SUBJECT_ENV][comp]:
if "inst" in a and a != B.SUBJECT_INST:
continue
attr[a] = conn["env"][comp][a]
#if ("types" in conn["env"][comp]):
# xtypes = conn["env"][comp]["types"]
for i in range(conn[B.SUBJECT_ENV][comp][B.SUBJECT_INST][B.ATTR_INST_CNT]):
#print("range " + str(i + 1))
instnr = "inst" + str(i + 1)
#if (xtypes is not None):
# conn["env"][comp][instnr]["types"] = xtypes
for a in attr:
if a in conn["env"][comp][instnr]:
continue # dont overwrite an instance-specific value
conn["env"][comp][instnr][a] = attr[a]
conns.append(conn["env"][comp][instnr])
return conns

70
utils/css_tool.py

@ -1,70 +0,0 @@
import basic.program
import basic.constants as B
CSS_CLASS = {
"general": {
"table, td, th": "border: 1px solid grey;font-family:sans-serif"
},
"diffFiles": {
"novalue": "color:grey",
"diffA": "color:green;font-weight:bold;",
"diffB": "color:crimson;font-weight:bold;",
"acceptA": "color:darkblue;",
"acceptB": "color:darkmagenta;"
},
"resultFile": {
"result0": "background-color:white;",
"result1": "background-color:lightgreen;",
"result2": "background-color:yellow;",
"result3": "background-color:tomato;",
"result4": "background-color:tomato;"
}
}
def getInlineStyle(job, filetype, cssclass):
#job = basic.program.Job.getInstance()
verify = int(job.getDebugLevel("css_tool")) - 1
# job.debug(verify, "getDiffHeader ")
if job.conf.confs.get(B.SUBJECT_TOOL).get("css").get("type") == "inline":
out = "style=\""+CSS_CLASS[filetype][cssclass]+"\""
else:
out = "class=\"" + cssclass + "\""
return out
def getInternalStyle(job, filetype):
#job = basic.program.Job.getInstance()
verify = int(job.getDebugLevel("match_tool")) - 1
out = ""
if job.conf.confs.get(B.SUBJECT_TOOL).get("css").get("type") == "internal":
out = "<style>"
for c in CSS_CLASS["general"]:
line = "\n"+c+" { "+CSS_CLASS["general"][c]+"} "
line.replace(":", ": ").replace(";", "; ").replace("_Q_", ", ")
out += line
arr = filetype.split(",")
for a in arr:
for c in CSS_CLASS[a]:
out += "\n."+c+" { "+CSS_CLASS[a][c].replace(":", ": ").replace(";", "; ")+"} "
out += "\n</style>"
elif job.conf.confs.get(B.SUBJECT_TOOL).get("css").get("type") == "external":
out = " <link rel=\"stylesheet\" href=\""+job.conf.confs.get(B.SUBJECT_TOOL).get("css").get("file")+"\"> "
else:
out = "<style>"
for c in CSS_CLASS["general"]:
line = "\n "+c+" { "+CSS_CLASS["general"][c]+" } "
line.replace(":", ": ").replace(";", "; ").replace("_Q_", ", ")
out += line
out += " \n </style>"
return out
def getExternalStyle(job, filetype):
#job = basic.program.Job.getInstance()
verify = int(job.getDebugLevel("match_tool")) - 1
out = ""
if job.conf.confs.get(B.SUBJECT_TOOL).get("css").get("type") == "external":
arr = filetype.split(",")
for a in arr:
for c in CSS_CLASS[a]:
out += c+" {\n "+CSS_CLASS[a][c].replace(":", ": ").replace(";", ";\n ")+"}\n"
out.replace("\n \n}", "\n}")
return out

176
utils/date_tool.py

@ -1,176 +0,0 @@
# functions related to Date-fields
# -----------------------------------------------------
"""
additionally functions for calculating date with formulas like [DATE+2M] and for comparison of date related on two reference-dates
"""
import datetime
#from dateutil.relativedelta import relativedelta
import re
import utils.data_const as D
F_DIR = "%Y-%m-%d_%H-%M-%S"
F_DB_DATE = "%Y-%m-%d"
F_DB_TIME = "%Y-%m-%d %H:%M:%S"
F_DE = "%d.%m.%Y"
F_N8 = "%Y%m%d"
F_LOG = "%Y%m%d_%H%M%S"
F_DE_TSTAMP = "%d.%m.%Y %H:%M:%S"
MONTH_EN = ["jan", "feb", "mar", "apr", "may", "jun", "jul", "aug", "sep", "oct", "nov", "dec"]
MONTH_DE = ["jan", "feb", "mar", "apr", "mai", "jun", "jul", "aug", "sep", "okt", "nov", "dez"]
F_TIME_DEFAULT = F_DIR
def getActdate(format):
return getFormatdate(datetime.datetime.now(), format)
def getFormatdate(date, format):
""" it return the date as string in the format """
return date.strftime(format)
def getFormatDatetupel(dtupel, format):
""" it return the date as string in the format """
if format == F_N8:
return f'{dtupel[0]:04}'+f'{dtupel[1]:02}'+f'{dtupel[2]:02}'
return getFormatdate(datetime.datetime(dtupel[0], dtupel[1], dtupel[2],
dtupel[3], dtupel[4], dtupel[5]) ,format)
def formatParsedDate(instring, format):
dtupel = parseDate(instring)
#print ("---------------"+str(dtupel))
return getFormatDatetupel(dtupel, format)
def parseFormula(instring):
"""
the function parses the string as a formula. In the formula the input-date - actdate or explicite date -
will be added resp. subtracted with years, months or dates which are specified in the formula.
The structure of the formula: DATE +/- mY +/-nM +/-qD
:param instring:
:return:
"""
instring = instring.upper()
if instring[2:6] == "DATE":
refdate = datetime.datetime.today()
formula = instring[7:-2].upper()
else:
dstring = instring[2:instring.find(" ")]
res = parseDate(dstring)
refdate = datetime.datetime(res[0], res[1], res[2], res[3], res[4], res[5])
formula = instring[2+len(dstring):-2]
formula = re.sub(r' ', '', formula)
year = refdate.year
mon = refdate.month
day = refdate.day
hour = refdate.hour
min = refdate.minute
sec = refdate.second
if re.match(r"[-+]\d+[JYMDT]", formula):
ress = re.compile(r"([-+])(\d+)([JYMDT])")
for res in ress.finditer(formula):
summand = int(res.group(2))
if res.group(1) == "-":
summand = summand * (-1)
if res.group(3) in "JY":
year = year + summand
if res.group(3) in "M":
mon = mon + summand
while mon <= 0:
mon = mon + 12
year = year - 1
while mon > 12:
mon = mon - 12
year = year + 1
if res.group(3) in "DT":
refdate = datetime.datetime(year, mon, day, hour, min, sec)
refdate = refdate + datetime.timedelta(days=summand)
year = refdate.year
mon = refdate.month
day = refdate.day
hour = refdate.hour
min = refdate.minute
sec = refdate.second
return (year, mon, day, hour, min, sec)
else:
print("re matcht nicht")
return (year, mon, day, hour, min, sec)
def getMonthInt(instring):
i = 0
j = 0
for l in [MONTH_EN, MONTH_DE]:
i = 0
for m in l:
i += 1
if instring.lower() == m:
j = i
break
if j > 0:
break
return j
def parseDate(instring):
"""
the function parses the string as a date or timestamp which is formed in one of the typical formates
:param the string to be parse:
:return timestamp as tupel (y, m, d, H, M ,S):
"""
year = 0
mon = 0
day = 0
hour = 0
min = 0
sec = 0
#print(instring)
if instring[0:2] == "{(" and instring[-2:] == ")}":
return parseFormula(instring)
if re.match(r"\d{8}_\d{6}", instring):
year = int(instring[0:4])
mon = int(instring[4:6])
day = int(instring[6:8])
hour = int(instring[9:11])
min = int(instring[11:13])
sec = int(instring[13:])
return (year, mon, day, hour, min, sec)
if len(instring) > 8:
for d in ["_", " "]:
if d in instring and instring.find(d) > 8:
dstring = instring[0:instring.find(d)]
tstring = instring[instring.find(d)+1:]
dres = parseDate(dstring)
tres = parseDate(tstring)
return (dres[0], dres[1], dres[2], tres[3], tres[4], tres[5])
if re.match(r"\d{4}[-./]\d{2}[-./]\d{2}", instring):
res = re.match(r"(\d{4})[-./](\d{2})[-./](\d{2})", instring)
year = int(res.group(1))
mon = int(res.group(2))
day = int(res.group(3))
return (year, mon, day, hour, min, sec)
if re.match(r"\d{1,2}[-./]\d{1,2}[-./]\d{4}", instring):
res = re.match(r"(\d{1,2})[-./](\d{1,2})[-./](\d{4})", instring)
year = int(res.group(3))
mon = int(res.group(2))
day = int(res.group(1))
return (year, mon, day, hour, min, sec)
if re.match(r"\w{3} \w{3}\s+\d{1,2} \d{1,2}[:]\d{1,2}[:]\d{2} \d{4}", instring.strip()):
res = re.search(r"\w{3} (\w{3})\s+(\d{1,2}) (\d{1,2})[:](\d{1,2})[:](\d{2}) (\d{4})", instring.strip())
month = res.group(1)
mon = getMonthInt(month)
day = int(res.group(2))
hour = int(res.group(3))
min = int(res.group(4))
sec = int(res.group(5))
year = int(res.group(6))
return (year, mon, day, hour, min, sec)
if re.match(r"\d{8}", instring):
year = instring[0:4]
mon = instring[4:6]
day = instring[6:8]
return (year, mon, day, hour, min, sec)
if re.match(r"\d{2}[-:]\d{2}[-:/]\d{2}", instring):
res = re.match(r"(\d{2})[-:/](\d{2})[-:/](\d{2})", instring)
hour = int(res.group(1))
min = int(res.group(2))
sec = int(res.group(3))
return (year, mon, day, hour, min, sec)
return (year, mon, day, hour, min, sec)

39
utils/env_tool.py

@ -1,39 +0,0 @@
import utils.config_tool
import utils.file_tool
import basic.program
def importEnvProperty(job):
#job = basic.program.Job.getInstance()
path = utils.config_tool.getConfig(job, "tool", "env")
props = utils.file_tool.readFileDict(job, path, job.m)
job.conf.confs["env"] = props["prop"]
def exportEnvProperty(job):
# job = basic.program.Job.getInstance()
props = {}
if not hasattr(job, "conf"): return
if not hasattr(job.conf, "confs"): return
if not "env" in job.confconfs: return
props["prop"] = job.conf.confs["env"]
path = utils.config_tool.getConfig(job, "tool", "env")
utils.file_tool.writeFileDict(job.m, job, path, props)
def setEnvProp(job, props):
# job = basic.program.Job.getInstance()
path = utils.config_tool.getConfig(job, "tool", "env")
utils.file_tool.writeFileDict(job.m, job, path, props)
def getEnvProperty(job, propname):
# job = basic.program.Job.getInstance()
if "env" not in job.conf.confs:
importEnvProperty(job)
prop = job.conf.confs[propname]
if (prop["type"] == "succ"):
val = prop["value"]
val += 1
prop["value"] = val
return val

109
utils/file_abstract.py

@ -1,109 +0,0 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# ---------------------------------------------------------------------------------------------------------
# Author : Ulrich Carmesin
# Source : gitea.ucarmesin.de
# ---------------------------------------------------------------------------------------------------------
import os
import re
import basic.program
import basic.catalog
import utils.config_tool
import basic.constants as B
import basic.toolHandling
import utils.data_const as D
import utils.file_tool
import utils.path_tool
import xml.etree.ElementTree as ET
import basic.catalog
class FileFcts():
"""
this is an abstract class
"""
def __init__(self):
pass
def reset_TData(self, job):
pass
def setComp(self, job, comp=None):
self.job = job
self.comp = comp
def parseText(self, text):
"""
this function parses the text and translates it to dict
:param text:
:return:
"""
def file2dict(self):
pass
# Funktionen
#=== init_testcase ===
def removeFiles(self):
utils.file_tool.removeFiles(self.comp.m, "envpath", "pattern", self.comp.conf["conn"])
def copyFiles(self):
fileList = []
srcpath = ""
envpath = ""
pattern = ""
utils.file_tool.copyFiles(self.job, fileList, srcpath, envpath, pattern)
def readEnvFiles(self, job):
envpath = ""
pattern = ""
fileList = utils.file_tool.getFiles(self.comp.m, job, envpath, pattern, self.comp.conf["conn"])
# === execute_testcase ===
def create_request(self, job, tdata, step):
mapping = ""
schema = ""
archivpath = ""
filename = step.args["filename"]
txt = ""
for o in self.comp.conf[B.SUBJECT_ARTS][B.TOPIC_NODE_FILE]:
if o["name"] != filename:
continue
mapping = o["mapping"]
schema = o["schema"]
archivpath = os.path.join(utils.path_tool.composePattern(job, "{tcresult}/request", self.comp), filename) # ergebnisse/comp/request )
#txt = self.createDict()
utils.file_tool.writeFileText(self.comp.m, job, archivpath, txt)
def send_request(self, job, step):
archivpath = ""
filename = step.args["filename"]
technique = step.args["technique"]
archivpath = os.path.join(utils.path_tool.composePattern(job, "{tcresult}/request", self.comp), filename)
if technique == "cli":
for o in self.comp.conf[B.SUBJECT_ARTS][B.TOPIC_NODE_FILE]:
if o["name"] != filename:
continue
envpath = o["envpath"]
envpath = utils.path_tool.composePattern(job, envpath, self.comp)
fct = basic.toolHandling.getCliTool(job, self.comp)
fct.copy(self.job, archivpath, envpath)
elif technique == "api":
txt = utils.file_tool.readFileText(job, archivpath, self.comp.m)
fct = basic.toolHandling.getApiTool(job, self.comp)
response = fct.send(self.job, self.comp, txt)
archivpath = os.path.join(utils.path_tool.composePattern(job, "{tcresult}/response", self.comp), filename)
"""
get_response:
- channel (sync/async)
... implement
- archivpath ( ergebnisse/comp/response )
- envpath ( ./log) / envconn ( = in Request empfangen )
=== collect_testcase ===
- envpath
- pattern
> readfiles
"""

287
utils/file_tool.py

@ -1,287 +0,0 @@
# Funktionen zum Dateizugriff mit Suchen, Lesen, Schreiben
# ------------------------------------------------------------
"""
"""
import codecs
import json
import os
import os.path
import re
import time
import xmltodict
import yaml
import platform
import basic.message
import basic.program
import utils.data_const as D
from pprint import pp
import utils.tdata_tool
import utils.date_tool
def getDump(obj):
result=""
print (str(type(obj)))
result = vars(obj)
return str(result)
# if type(obj) == "__dict__"
def getFiles(msg, job, path, pattern, conn):
"""
search filenames in the directory - if conn is set search remote
:param msg: -- msg-Objekt
:param path: -- path - String
:param pattern: -- filename or pattern
:param conn:
:return: Array with filenames
"""
if conn is not None:
return getRemoteFiles(msg, path, pattern, conn)
# job = basic.program.Job.getInstance()
verify = int(job.getDebugLevel("file_tool"))
out = []
msg.debug(verify, "getFiles " + path + " , " + pattern)
if not os.path.exists(path):
return out
for f in os.listdir(path):
msg.debug(verify, "getFiles " + f)
if re.search(pattern, f):
msg.debug(verify, "match " + f)
out.append(f)
return out
def removeFiles(msg, path, pattern, conn):
"""
search filenames in the directory and removes it
- if conn is set search remote
:param msg: -- msg-Objekt
:param path: -- path - String
:param pattern: -- filename as Pattern
:param conn:
:return: Array filenames
"""
pass
def copyFiles(job, fileList, source, target, comp):
"""
copies files from source to target
:param job:
:param fileList:
:param source:
:param target:
:param comp:
:return:
"""
pass
def getRemoteFiles(msg, path, pattern, conn):
"""
search filenames in the directory - if conn is set search remote
:param msg: -- msg-Objekt
:param path: -- path - String
:param pattern: -- filename as Pattern
:param conn:
:return: Array filenames
"""
pass
def getFilesRec(msg, job, path, pattern):
"""
Sucht Dateien im Verzeichnis rekursiv
:param msg: -- msg-Objekt
:param path: -- Pfad - String
:param pattern: -- Dateiname als Pattern
:return: Array mit gefundenen Dateien, absoluter Pfad
"""
#job = basic.program.Job.getInstance()
verify = int(job.getDebugLevel("file_tool"))
out = []
msg.debug(verify, "getFilesRec " + path + " , " + pattern)
for (r, dirs, files) in os.walk(path):
for f in files:
msg.debug(verify, "getFilesRec " + f)
if re.search(pattern, f):
msg.debug(verify, "match " + f)
out.append(os.path.join(r, f))
return out
def getTree(msg, job, pfad):
# job = basic.program.Job.getInstance()
verify = int(job.getDebugLevel("file_tool"))
msg.debug(verify, "getTree " + pfad )
tree = {}
files = []
for f in os.listdir(pfad):
if os.path.isDir(os.path.join(pfad, f)):
tree[f] = getTree(msg, job, os.path.join(pfad, f))
elif os.path.isFile(os.path.join(pfad, f)):
files.append(f)
tree["_files_"] = files
return tree
def mkPaths(job, path, msg):
# job = basic.program.Job.getInstance()
verify = int(job.getDebugLevel("file_tool"))
#modus = job.conf.confs["paths"]["mode"]
dirname = os.path.dirname(path)
if os.path.exists(dirname):
return
os.makedirs(dirname, exist_ok=True)
def getFileEncoding(msg, job, path):
print("--- getFileEncoding "+path)
encodings = ['utf-8', 'iso-8859-1'] # add more
for e in encodings:
print(e)
try:
fh = codecs.open(path, 'r', encoding=e)
fh.readlines()
fh.seek(0)
except UnicodeDecodeError:
print('got unicode error with %s , trying different encoding' % e)
except:
print("except")
else:
print('opening the file with encoding: %s ' % e)
return e
return detectFileEncode(job, path, msg)
def detectFileEncode(job, path, msg): # return ""
#job = basic.program.Job.getInstance()
verify = int(job.getDebugLevel("file_tool"))
print(path)
cntIso = 0
cntUtf = 0
j = 0
CHAR_ISO = [ 196, 228, 214, 246, 220, 252, 191 ]
with open(path, 'rb') as file:
byte = file.read(1)
while (byte):
i = int.from_bytes(byte, "little")
#byte = file.read(1)
if (i in CHAR_ISO):
cntIso += 1
if (i == 160):
pass
elif (i > 127):
cntUtf += 1
j += 1
l = i
byte = file.read(1)
file.close()
if (cntIso > cntUtf):
return 'iso-8859-1'
return 'utf-8'
def readFileLines(job, path, msg):
lines = readFileText(job, path, msg)
if isinstance(lines, (str)):
return lines.splitlines()
return []
def readFileText(job, path, msg):
#job = basic.program.Job.getInstance()
verify = int(job.getDebugLevel("file_tool"))
if not os.path.exists(path):
return ""
enc = detectFileEncode(job, path, msg)
with open(path, 'r', encoding=enc) as file:
text = file.read()
file.close()
return text
def getModTime(job, filepath):
out = ""
mtime = os.path.getmtime(filepath)
out = utils.date_tool.formatParsedDate(time.ctime(mtime), utils.date_tool.F_TIME_DEFAULT)
return out
def readFileDict(job, path, msg):
"""
reads and gets general a dict from any kind of filetyp
:param path: with extension of filetype
:param msg: optionally
:return:
"""
# 20220329 generalize
#job = basic.program.Job.getInstance()
verify = int(job.getDebugLevel("file_tool"))
doc = {}
if not os.path.exists(path):
return doc
enc = detectFileEncode(job, path, msg)
if D.DFILE_TYPE_YML in path[-4:]:
with open(path, 'r', encoding=enc) as file:
doc = yaml.full_load(file)
file.close()
elif D.DFILE_TYPE_JSON in path[-5:]:
with open(path, 'r', encoding=enc) as file:
doc = json.load(file)
file.close()
elif D.DFILE_TYPE_XML in path[-4:]:
with open(path, 'r', encoding=enc) as file:
res = xmltodict.parse(file.read())
# doc = dict(res)
doc = castOrderedDict(res)
file.close()
elif D.DFILE_TYPE_CSV in path[-5:]:
doc = utils.tdata_tool.getCsvSpec(msg, job, path, D.CSV_SPECTYPE_CONF)
return doc
def castOrderedDict(res, job=None, key=""):
if isinstance(res, dict):
doc = dict(res)
for x in doc:
doc[x] = castOrderedDict(doc[x], job, x)
elif isinstance(res, list):
sublist = []
for i in range(0, len(res)):
sublist.append(castOrderedDict(res[i], job, ""))
doc = sublist
else:
doc = res
return doc
def writeFileText(msg, job, path, text, enc="utf-8"):
#job = basic.program.Job.getInstance()
verify = int(job.getDebugLevel("file_tool"))
mkPaths(job, path, msg)
with open(path, 'w', encoding=enc) as file:
file.write(text)
file.close()
def writeFileDict(msg, job, path, dict, enc="utf-8"):
#job = basic.program.Job.getInstance()
mkPaths(job, path, msg)
if D.DFILE_TYPE_YML in path[-5:]:
with open(path, 'w', encoding=enc) as file:
yaml.dump(dict, file)
file.close()
elif D.DFILE_TYPE_JSON in path[-5:]:
with open(path, 'w', encoding=enc) as file:
doc = json.dumps(dict, indent=4)
file.write(doc)
file.close()
elif D.DFILE_TYPE_XML in path[-4:]:
with open(path, 'w', encoding=enc) as file:
text = xmltodict.unparse(dict, pretty=True)
if "<?xml version=" not in text:
text = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n" + text
file.write(text)
file.close()

20
utils/filejson_tool.py

@ -1,20 +0,0 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# ---------------------------------------------------------------------------------------------------------
# Author : Ulrich Carmesin
# Source : gitea.ucarmesin.de
# ---------------------------------------------------------------------------------------------------------
import basic.program
import utils.config_tool
import utils.file_abstract
import basic.constants as B
import utils.path_tool
import utils.file_tool
import utils.tdata_tool
class FileFcts(utils.file_abstract.FileFcts):
def __init__(self):
pass

19
utils/filelog_tool.py

@ -1,19 +0,0 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# ---------------------------------------------------------------------------------------------------------
# Author : Ulrich Carmesin
# Source : gitea.ucarmesin.de
# ---------------------------------------------------------------------------------------------------------
import basic.program
import utils.config_tool
import utils.file_abstract
import basic.constants as B
import utils.path_tool
import utils.file_tool
import utils.tdata_tool
class FileFcts(utils.file_abstract.FileFcts):
def __init__(self):
pass

24
utils/filexml_tool.py

@ -1,24 +0,0 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# ---------------------------------------------------------------------------------------------------------
# Author : Ulrich Carmesin
# Source : gitea.ucarmesin.de
# ---------------------------------------------------------------------------------------------------------
import xmltodict
import basic.program
import utils.config_tool
import utils.file_abstract
import basic.constants as B
import utils.path_tool
import utils.file_tool
import utils.tdata_tool
class FileFcts(utils.file_abstract.FileFcts):
def __init__(self):
pass
def parseFile2Dict(self):
pass

5
utils/flask_tool.py

@ -1,5 +0,0 @@
#!/usr/bin/python
# ---------------------------------------------------------------------------------------------------------
# Author : Ulrich Carmesin
# Source : gitea.ucarmesin.de
# ---------------------------------------------------------------------------------------------------------

23
utils/gen_const.py

@ -1,23 +0,0 @@
#!/usr/bin/python
# ---------------------------------------------------------------------------------------------------------
# Author : Ulrich Carmesin
# Source : gitea.ucarmesin.de
# https://ucarmesin.de/index.php/it/testautomatisierung-fuer-daten-test/232-testfallgenerierung
# ---------------------------------------------------------------------------------------------------------
KEY_RANDOM = "random"
KEY_LIST = "list"
KEY_VARNUM = "varnum"
KEY_VARSTR = "varstr"
KEY_VARDAT = "vardat"
KEY_PREFIX_X = "x"
VAL_DELIMITER = ","
VAL_SECTOR = " .. "
VAL_CATALOG = "cat"
CLS_MISFORMAT = "missformat"
CLS_NONE = "none"
CLS_EMPTY = "empty"
CLS_LESS = "less"
CLS_GREATER = "more"
ATTR_MIN_COUNT = "mincount"

127
utils/gen_tool.py

@ -1,127 +0,0 @@
#!/usr/bin/python
# ---------------------------------------------------------------------------------------------------------
# Author : Ulrich Carmesin
# Source : gitea.ucarmesin.de
# https://ucarmesin.de/index.php/it/testautomatisierung-fuer-daten-test/232-testfallgenerierung
# ---------------------------------------------------------------------------------------------------------
import re
import utils.config_tool
import utils.path_const as P
import basic.constants as B
import basic.program
import utils.gen_const as G
import random
import basic.catalog
VAL_CLASSES: {
"xvarnum": {
G.CLS_MISFORMAT: "str",
G.CLS_NONE: None,
G.CLS_EMPTY: 0,
G.CLS_LESS: True,
G.CLS_GREATER: True,
G.ATTR_MIN_COUNT: 5
},
"xvarnum": {
G.CLS_MISFORMAT: "str,feb",
G.CLS_NONE: None,
G.CLS_EMPTY: 0,
G.CLS_LESS: True,
G.CLS_GREATER: True,
G.ATTR_MIN_COUNT: 6
},
"xvarstr": {
G.CLS_MISFORMAT: "num,sym,koeln",
G.CLS_NONE: None,
G.CLS_EMPTY: 0,
G.CLS_LESS: False,
G.CLS_GREATER: False,
G.ATTR_MIN_COUNT: 7
}
}
def getCntElement(values, job):
if G.VAL_SECTOR in values:
return 2
elif G.VAL_DELIMITER in values:
a = values.split(G.VAL_DELIMITER)
return len(a)
elif G.VAL_CATALOG + ":" in values:
a = [0, 1, 2, 3, 4]
return len(a)
return 1
def getValueList(values, count, job):
out = []
for i in range(0, count):
out.append(values[i % len(values)])
print(str(out))
return out
def getMinCount(formula, values, job):
"""
:param formula:
:param values: definition of value-list
:param job:
:return: count of necessary values
"""
if G.KEY_RANDOM in formula:
return 1
elif formula[0:1] == G.KEY_PREFIX_X:
elems = getCntElement(values, job)
factor = 1
if VAL_CLASSES[formula][G.CLS_LESS]:
factor = factor * 2
if VAL_CLASSES[formula][G.CLS_GREATER]:
factor = factor * 2
return VAL_CLASSES[formula][G.ATTR_MIN_COUNT] + factor * (elems - 1)
elif formula == G.KEY_LIST:
return getCntElement(values, job)
return 1
def getElemList(formula, values, count, job):
"""
:param formula:
:param values:
:param count:
:param job:
:return:
"""
out = []
verbose = False
if verbose: print(values+" , "+str(count))
sector_regex = r"(.*)" + re.escape(G.VAL_SECTOR)+ r"(.*)"
delim_regex = r"(.*)" + re.escape(G.VAL_DELIMITER)+ r"(.*)"
catalog_regex = re.escape(G.VAL_CATALOG)+ r":(.*)"
if re.match(sector_regex, values):
if verbose: print("match 1")
temp = []
res = re.search(sector_regex, values)
start = res.group(1)
target = res.group(2)
if start.isdecimal() and target.isdecimal():
for i in range(int(start), int(target)):
temp.append(str(i))
if target not in temp:
temp.append(target)
if verbose: print(str(start)+" - "+str(target)+" : "+str(temp))
elif re.match(delim_regex, values):
if verbose: print("match 2")
temp = values.split(G.VAL_DELIMITER)
for i in range(0, len(temp)): temp[i] = temp[i].strip()
if verbose: print(str(temp))
elif re.match(catalog_regex, values):
res = re.search(catalog_regex, values)
domain = res.group(1)
catalog = basic.catalog.Catalog.getInstance()
temp = catalog.getKeys(domain, job)
if not isinstance(temp, list):
temp = []
while len(temp) > 0 and len(out) < count:
out += temp
return out[0:count]

96
utils/i18n_tool.py

@ -1,96 +0,0 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# ---------------------------------------------------------------------------------------------------------
# Author : Ulrich Carmesin
# Source : gitea.ucarmesin.de
# ---------------------------------------------------------------------------------------------------------
import utils.config_tool
import utils.path_const as P
import basic.constants as B
import basic.program
DEFAULT_LANGUAGE = "en"
EXP_KEY_MISSING = "key is missing {}"
EXP_KEY_DOESNT_EXIST = "key doesnt exist in domain {}"
class I18n:
__instance = None
cache = {}
"""
in this class there should be managed each defined key-value-pairs
the pairs ara loaded from the path testdata/catalog:
* initially the csv-file catalog.csv
* on demand other csv-files in the path
"""
def __init__(self, job):
self.cache = {}
self.cache = utils.config_tool.getConfig(job, P.KEY_TOOL, "i18n")
I18n.__instance = self
pass
@staticmethod
def getInstance(job):
if I18n.__instance == None:
return I18n(job)
return I18n.__instance
def getMessage(self, job, key, args=[]):
print("getMessage "+key+" "+str(args))
out = self.getText(key, job)
out = out.format(args)
return out
def getText(self, key, job):
"""
this function gets the text depending on language which is set in job.conf
:param key: MUST GIVEN WITH (f"{CONST=}", ..
:return:
return self.cache[language][key]
"""
if "language" in job.conf.confs:
language = job.conf.confs["language"]
else:
language = "en"
if language not in self.cache:
raise Exception(EXP_KEY_MISSING, (key))
if "=" in key:
out = self.extractText(key)
key = self.extractKey(key)
if key in self.cache[language]:
out = self.cache[language][key]
elif key in self.cache[DEFAULT_LANGUAGE]:
out = self.cache[DEFAULT_LANGUAGE][key]
return str(out)
def getAliasList(self, key, job):
out = []
out.append(self.extractText(key))
key = self.extractKey(key)
for language in self.cache:
if key not in self.cache[language]:
continue
out.append(self.cache[language][key])
return out
def extractKey(self, key):
if "=" in key:
i = key.find("=")
x = key[0:i]
if "." in x:
i = x.find(".")
y = x[i + 1:]
else:
y = x
key = y
else:
y = key
return key
def extractText(self, key):
if "=" in key:
i = key.find("=")
return key[i + 2:-1]
return ""

97
utils/job_tool.py

@ -1,97 +0,0 @@
# GrundFunktionen zur Ablaufsteuerung
#
# --------------------------------------------------------
"""
1. Programm -- implementiert in Main-Klasse
2. Anwndung -- steuert zu pruefende System [ in basis_Config ]
3. application -- steuert zu pruefende Maschine [ in dir/applicationen ]
4. release -- steuert zu prufendes Release [ aus dir/release kann spez. release_Config geladen werden, dir/lauf/release ]
5. ~Verz -- Dokumentationsverzeichnis zu Testlauf/Testfall/Soll-Branch
6. zyklus -- optional unterscheidet echte und entwicklungsLaeufe
7. Programmspezifische Parameter
8. loglevel -- steuert Protokollierung; default debug (fatal/error/warn/msg/info/debug1/debug2/trace1/trace2)
10. Laufart -- steuert die Verarbeitung; default echt
- echt-auto Lauf aus Automatisierung (1-7)
- test Lauf ohne Ausfuehrungen am Testsystem, wohl aber in Testverzeichnissen
- echt-spez Wiederholung einer spezifischen Funktion (1-13)
- unit Ausfuehrung der Unittests
11. Modul -- schraenkt Verarbeitung auf parametriserte componenten ein
12. Funktion -- schraenkt Verarbeitung auf parametriserte Funktionen ein
13. Tool -- schraenkt Protokollierung/Verarbeitung auf parametriserte Tools ein
"""
import basic.program
import basic.constants as B
import collect_testcase
import compare_testcase
import execute_testcase
import finish_testsuite
import init_testcase
import init_testsuite
import test_executer
import utils.path_tool
import utils.file_tool
import components.utils.job_tool
def hasModul(komp):
#job = Job.getInstance()
return False
def hasFunction(fct):
#job = Job.getInstance()
return False
def hasTool(tool):
#job = Job.getInstance()
return False
def createJob(parentJob, jobargs):
job = basic.program.Job("temp") # meaning temp
job.par.setParameterArgs(job, jobargs)
job.startJob()
return
def startJobProcesses(job):
""" function to open processes like db-connection """
components.utils.job_tool.startJobProcesses(job)
pass
def stopJobProcesses(job):
""" function to close processes like db-connection """
components.utils.job_tool.stopJobProcesses(job)
pass
def startProcess(job, process):
print(str(process))
path = utils.path_tool.getActualJsonPath(job)
print("------- "+path)
utils.file_tool.writeFileDict(job.m, job, path, process)
jobargs = {}
jobargs[B.PAR_APP] = process["app"]
jobargs[B.PAR_ENV] = process["env"]
if B.PAR_STEP in process:
jobargs[B.PAR_STEP] = process[B.PAR_STEP]
if B.PAR_TCDIR in process:
jobargs[B.PAR_TCDIR] = process[B.PAR_TCDIR]
jobargs[B.PAR_TESTCASE] = process["entity"]
elif B.PAR_TSDIR in process:
jobargs[B.PAR_TSDIR] = process[B.PAR_TSDIR]
jobargs[B.PAR_TESTSUITE] = process["entity"]
print("process-programm "+process["program"])
myjob = basic.program.Job(process["program"], jobargs)
myjob.startJob()
if process["program"] == "init_testcase":
init_testcase.startPyJob(myjob)
elif process["program"] == "execute_testcase":
execute_testcase.startPyJob(myjob)
elif process["program"] == "collect_testcase":
collect_testcase.startPyJob(myjob)
elif process["program"] == "compare_testcase":
compare_testcase.startPyJob(myjob)
elif process["program"] == "init_testsuite":
init_testsuite.startPyJob(myjob)
elif process["program"] == "execute_testsuite":
print("execute_testsuite.startPyJob(myjob) not implemented")
elif process["program"] == "collect_testsuite":
print("collect_testsuite.startPyJob(myjob) not implemented")
elif process["program"] == "finish_testsuite":
finish_testsuite.startPyJob(myjob)
elif process["program"] == "test_executer":
test_executer.startPyJob(myjob)

375
utils/map_tool.py

@ -1,375 +0,0 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# ---------------------------------------------------------------------------------------------------------
# Author : Ulrich Carmesin
# Source : gitea.ucarmesin.de
# ---------------------------------------------------------------------------------------------------------
import os
import re
import basic.program
import basic.catalog
import utils.config_tool
import basic.constants as B
import basic.toolHandling
import utils.data_const as D
import utils.file_tool
import utils.path_tool
import basic.catalog
ACT_ID = "actid"
ID_LIST = "idlist"
MAP_ID = "_id"
MAP_FOR = "_foreach"
MAP_ROW = "_row"
MAP_FCTS = [ MAP_FOR, MAP_ID, MAP_ROW ]
MODUL_NAME = "map_tool"
def mapTdata(job, mapping, tdata, step, comp):
"""
initialize the mapping from testdata into the mapping-structure
:param job:
:param mapping:
:param tdata:
:param step:
:param comp:
:return:
"""
verify = job.getDebugLevel(MODUL_NAME)
out = {}
path = ""
job.debug(verify, mapping)
args = {}
args[B.DATA_NODE_COMP] = comp
args[B.DATA_NODE_DATA] = tdata
args[B.DATA_NODE_STEPS] = step
args[ACT_ID] = {}
args[ID_LIST] = {}
out = mapElement(job, args, mapping, path, out)
job.debug(verify, ">>>>>>>>>>> \n"+str(out))
return out
def mapElement(job, args, elem, path, out):
"""
recursive mapping with building the dict of the testdata
:param job:
:param args:
:param elem:
:param path:
:param out:
:return:
"""
verify = job.getDebugLevel(MODUL_NAME)
job.debug(verify, "mapElem "+path+" id: "+str(args[ACT_ID]))
if isinstance(elem, dict):
job.debug(verify, "##### dict ")
nodes = []
attrNodes = []
leafNodes = []
objNodes = []
for k in elem:
if MAP_ID in elem[k] or MAP_FOR in elem[k]:
objNodes.append(k)
elif k[0:1] == '@' or k[0:1] == '#':
attrNodes.append(k)
else:
leafNodes.append(k)
job.debug(verify, "nodes "+str(attrNodes)+" - "+str(leafNodes)+" - "+str(objNodes))
nodes = attrNodes + leafNodes + objNodes
for k in nodes:
# iterating this elem is declared inside of the elem
# like foreach but only one element
job.debug(verify, "# # "+k)
if MAP_ID in elem[k] or MAP_FOR in elem[k]:
job.debug(verify, "# + k in obj : val "+k)
if MAP_ID in elem[k]:
key = elem[k][MAP_ID][0:elem[k][MAP_ID].find("=")]
idlist = getIds(job, args, elem[k][MAP_ID])
if len(idlist) > 1:
uniqueKeys = {}
for x in idlist:
uniqueKeys[x] = x
if len(uniqueKeys) > 1:
raise Exception("bei keyword _id there is only one element allowed "+str(idlist))
else:
idlist = uniqueKeys.keys()
elif MAP_FOR in elem[k]:
key = elem[k][MAP_FOR][0:elem[k][MAP_FOR].find("=")]
idlist = getIds(job, args, elem[k][MAP_FOR])
sublist = []
a = path.split(",")
a.append(k)
npath = ",".join(a)
for id in idlist:
args[ACT_ID][key] = str(id)
if MAP_ROW in elem[k]:
row = getRow(job, args, elem[k][MAP_ROW])
sublist.append(mapElement(job, args, elem[k], npath, {}))
out[k] = sublist
elif k == MAP_ID or k == MAP_FOR or k == MAP_ROW:
job.debug(verify, "# + k in MAP : continue "+k)
continue
else:
job.debug(verify, "# + k in leaf : val "+k)
a = path.split(",")
a.append(k)
npath = ",".join(a)
job.debug(verify, "mapElem - dict "+k)
out[k] = mapElement(job, args, elem[k], npath, {})
elif isinstance(elem, list):
out = []
i = 0
for k in elem:
job.debug(verify, "mapElem - list "+str(k))
a = path.split(",")
a.append(str(i))
npath = ",".join(a)
out.append(mapElement(job, args, elem[i], path, {}))
i += 1
else:
job.debug(verify, "mapElem - leaf " + elem)
if elem[0:1] == "{" and elem[-1:] == "}":
elem = elem[1:-1]
out = toSimpleType(job, getValue(job, args, elem))
return out
def toSimpleType(job, value):
if isinstance(value, (list, tuple)) and len(value) == 1:
return value[0]
return value
def extractIds(job, idval):
ids = []
if isinstance(idval, list) or isinstance(idval, tuple):
a = idval
else:
a = idval.split(",")
for k in a:
if "-" in k:
b = k.split("-")
for i in range(int(b[0].strip()), int(b[1].strip())+1):
ids.append(str(i).strip())
elif isinstance(k, str):
ids.append(k.strip())
else:
ids.append(k)
return ids
def getRow(job, args, fieldkey):
a = fieldkey.split(".")
row = getValue(job, args, fieldkey)
if B.DATA_NODE_ROW not in args: args[B.DATA_NODE_ROW] = {}
a[1] = a[1][0:a[1].find("(")]
args[B.DATA_NODE_ROW][a[1]] = row[0]
return row
def getIds(job, args, fieldkey):
"""
sets the id resp list of ids into args[idlist]
the fieldkey has a formula like id={_source.table.field(condition)}
:param job:
:param args:
:param fieldky:
:return:
"""
verify = job.getDebugLevel(MODUL_NAME)
job.debug(verify, "match = "+fieldkey)
out = []
idfield = fieldkey
if re.match(r"(.+)\=(.+)", fieldkey):
res = re.search(r"(.+)\=(.+)", fieldkey)
idfield = res.group(1)
fieldkey = res.group(2)
if fieldkey[0:1] == "{" and fieldkey[-1:] == "}":
fieldkey = fieldkey[1:-1]
if "temp" not in args: args["temp"] = {}
i = 0
while "(" in fieldkey and ")" in fieldkey:
innerCond = fieldkey[fieldkey.rfind("(")+1:fieldkey.find(")")]
if "." not in innerCond:
break
innerkey = "temp_"+str(i)
args["temp"][innerkey] = {}
args["temp"][innerkey]["idfield"] = idfield
args["temp"][innerkey]["fieldkey"] = fieldkey
innerlist = getIds(job, args, innerkey+"={"+innerCond+"}")
args[ACT_ID][innerkey] = ",".join(innerlist)
fieldkey = fieldkey.replace(innerCond, innerkey)
idfield = args["temp"][innerkey]["idfield"]
i += 1
if i > 3:
raise Exception("too much searches "+str(args["temp"]))
val = getValue(job, args, fieldkey)
idlist = extractIds(job, val)
args[ID_LIST][idfield] = idlist
job.debug(verify, "idlist " + str(args[ID_LIST]))
return idlist
def getConditions(job, args, fieldkey):
"""
gets a list of condition-value
:param job:
:param args:
:param fieldkey: in formula (c_1, c_2, ..)
:return: [v_1} ..]
"""
verify = job.getDebugLevel(MODUL_NAME)
while fieldkey[0:1] == "(" and fieldkey[-1:] == ")":
fieldkey = fieldkey[1:-1]
while fieldkey[0:1] == "{" and fieldkey[-1:] == "}":
fieldkey = fieldkey[1:-1]
out = []
a = fieldkey.split(",")
if len(a) > 1:
job.m.logError("map-condition should not have more parts - use tupel "+fieldkey)
for k in a:
if "." in k:
raise Exception("Formatfehler in " + fieldkey)
job.debug(verify, "actid " + str(args[ACT_ID]))
idelem = {}
idelem[k] = args[ACT_ID][k]
out.append(args[ACT_ID][k])
return out
def getValue(job, args, fieldkey):
"""
gets the value of the formula like {_source.table.field(condition)}
:param job:
:param args:
:param fieldkey:
:return:
"""
verify = job.getDebugLevel(MODUL_NAME)
job.debug(verify, "getValue "+fieldkey)
while fieldkey[0:1] == "{" and fieldkey[-1:] == "}":
fieldkey = fieldkey[1:-1]
val = ""
idfield = ""
source = ""
table = ""
field = ""
cond = ""
condIds = []
# special cases of id-management
# args[actid][xid] = actual id with name xid
# args[idlist][xid] = list of all ids with name xid
# a) declaration of the id : id={fielddefinition}
if re.match(r".+\=.+", fieldkey):
job.debug(verify, "getValue 222 " + fieldkey)
raise Exception("getIds sollte an passender Stelle direkt aufgerufen werden "+fieldkey)
return getIds(job, args, fieldkey)
# b) set of defined ids - neither fielddefinition nor a function
#elif "." not in fieldkey and "(" not in fieldkey or re.match(r"\(.+\)", fieldkey):
#print("getValue 226 " + fieldkey)
#raise Exception("getConditions sollte an passender Stelle direkt aufgerufen werden")
#return getConditions(job, args, fieldkey)
# fielddefinition with .-separated parts
b = fieldkey.split(".")
job.debug(verify, "match field "+fieldkey)
if re.match(r"(_.+)\..+\..+\(.+\)", fieldkey):
res = re.match(r"(_.+)\.(.+)\.(.+\(.+\))", fieldkey)
job.debug(verify, "match mit ()")
source = res.group(1)
table = res.group(2)
field = res.group(3)
#cond = res.group(4)
#condIds = getValue(job, args, cond)
elif len(b) == 1:
field = b[0]
elif len(b) == 2:
source = b[0]
field = b[1]
elif len(b) == 3:
source = b[0]
table = b[1]
field = b[2]
if re.match(r".+\(.+\)", field):
res = re.match(r"(.+)\((.+)\)", field)
field = res.group(1)
cond = res.group(2)
condIds = getConditions(job, args, cond)
job.debug(verify, source + " - " + table + " - " + field + " - " + cond + " : " + str(condIds))
if source == B.DATA_NODE_ROW:
if table not in args[B.DATA_NODE_ROW]:
raise Exception("row not initialiazed for table "+table+" "+str(args[B.DATA_NODE_ROW]))
row = args[B.DATA_NODE_ROW][table]
val = row[field]
elif source == B.DATA_NODE_DATA:
job.debug(verify, "data " + b[1])
if len(b) == 3:
job.debug(verify, table + ", " + field + ", " + cond + ", " + str(condIds))
val = toSimpleType(job, getFieldVal(job, args, table, field, condIds))
elif len(b) == 2:
job.debug(verify, table + ", " + field + ", " + cond + ", " + str(condIds))
val = getTdataRow(job, args[B.DATA_NODE_DATA], field, condIds)
elif source == B.DATA_NODE_STEPS:
job.debug(verify, "steps " + table+" - "+ field)
if hasattr(args[B.DATA_NODE_STEPS], field):
val = getattr(args[B.DATA_NODE_STEPS], field)
elif hasattr(args[B.DATA_NODE_STEPS], table):
row = getattr(args[B.DATA_NODE_STEPS], table)
if field in row:
val = row[field]
elif source[1:] == B.DATA_NODE_PAR:
job.debug(verify, "par " + b[1])
if getattr(job.par, b[1]):
val = getattr(job.par, b[1])
elif source == B.DATA_NODE_CATALOG:
job.debug(verify, "catalog " + table+", "+ field)
if len(b) != 3:
job.debug(verify, "catalog-Fehler")
return "Fehler-145"
row = basic.catalog.Catalog.getInstance().getValue(table, args[ACT_ID][cond], job)
if isinstance(row, dict) and field in row:
val = row[field]
elif source[1:] == B.DATA_NODE_COMP:
job.debug(verify, "comp "+table+", "+field)
comp = args[B.DATA_NODE_COMP]
if table == B.DATA_NODE_DDL:
fields = comp.conf[B.DATA_NODE_DDL][field][B.DATA_NODE_HEADER]
val = ",".join(fields)
else:
val = fieldkey
job.debug(verify, "return val "+str(val))
return val
def getFieldVal(job, args, table, field, condIds):
out = []
fields = field.split(",")
for row in getTdataRow(job, args[B.DATA_NODE_DATA], table, condIds):
if len(fields) == 1 and field in row:
out.append( str(row[field]).strip())
else:
tup = tuple()
for f in fields:
if f in row:
t = tuple( str(row[f]).strip() )
tup = tup + t
else:
raise Exception("field is missing in row "+f+", "+str(row))
out.append(tup)
return out
def getTdataRow(job, tdata, table, condIds):
verify = job.getDebugLevel(MODUL_NAME)
out = []
idFields = {}
job.debug(verify, "getTdata "+str(condIds))
for i in range(0, len(condIds)):
idFields[tdata[B.DATA_NODE_TABLES][table][B.DATA_NODE_HEADER][i]] = condIds[i]
job.debug(verify, "idFields "+str(idFields))
for row in tdata[B.DATA_NODE_TABLES][table][B.DATA_NODE_DATA]:
i = 0
for k in idFields:
for j in range(0, len(idFields[k])):
if row[k] == idFields[k][j]:
i += 1
if i == len(idFields):
out.append(row)
return out

118
utils/match_const.py

@ -1,118 +0,0 @@
#!/usr/bin/python
"""
constants for used for api-functions
"""
SIM_BUSINESS = "B"
SIM_TECHNICAL = "T"
SIM_DEFAULT = "BT"
M_FILEPATTERN = "filepattern"
M_SIDE_LONG = "long"
M_SIDE_SHORT = "short"
M_SIDE_A = "A"
M_SIDE_B = "B"
MATCH_SIDE_PREEXPECT = "preexpect"
""" it implies the precondition of the expectation """
MATCH_DICT_PREEXPECT = {
M_SIDE_SHORT: "SV",
M_SIDE_LONG: "Soll-Vorher",
M_FILEPATTERN: "rsprecond"
}
MATCH_SIDE_POSTEXPECT = "postexpect"
""" it implies the postcondition of the expectation - it is the expectation"""
MATCH_DICT_POSTEXPECT = {
M_SIDE_SHORT: "SN",
M_SIDE_LONG: "Soll-Nachher",
M_FILEPATTERN: "rsprecond"
}
MATCH_SIDE_PREACTUAL = "preactual"
""" it implies the precondition of the actual execution """
MATCH_DICT_PREACTUAL = {
M_SIDE_SHORT: "IV",
M_SIDE_LONG: "Ist-Vorher",
M_FILEPATTERN: "rsprecond"
}
MATCH_SIDE_POSTACTUAL = "postactual"
""" it implies the postondition of the actual execution - it is the result """
MATCH_DICT_POSTACTUAL = {
M_SIDE_SHORT: "IN",
M_SIDE_LONG: "Ist-Nachher",
M_FILEPATTERN: "rsprecond"
}
MATCH_SIDE_PRESTEP = "prestep"
""" it implies the postcondition of a preceding step of the actual execution - the preceding step must be configured in the component"""
MATCH_DICT_PRESTEP = {
M_SIDE_SHORT: "VS",
M_SIDE_LONG: "Vorhergehender Schritt (Nachher)",
M_FILEPATTERN: "rsprecond"
}
MATCH_SIDE_TESTCASE = "testexample"
""" it implies the postcondition of an exemplary testcase - the exemplary testcase must be parametrized """
MATCH_DICT_TESTCASE = {
M_SIDE_SHORT: "VT",
M_SIDE_LONG: "Vergleichstestfall (Nachher)",
M_FILEPATTERN: "rsprecond"
}
MATCH_SIDES = [MATCH_SIDE_PREEXPECT, MATCH_SIDE_POSTEXPECT, MATCH_SIDE_PREACTUAL, MATCH_SIDE_POSTACTUAL, MATCH_SIDE_PRESTEP, MATCH_SIDE_TESTCASE]
MATCH_SUCCESS = "success"
""" matches the action between pre- and postcondition of the actual testexecution """
MATCH_PRECOND = "preconditions"
""" matches the preconditions betwenn the required result the the actual testexecution
- just for info if the both executions have the same precondition """
MATCH_POSTCOND = "postconditions"
""" matches the postconditions betwenn the required result the the actual testexecution
- it is the main comparison """
MATCH_PRESTEP = "prestep"
MATCH_TESTEXAMPLE = "testeample"
MATCH_TYPES = [MATCH_PRECOND, MATCH_PRESTEP, MATCH_TESTEXAMPLE, MATCH_SUCCESS, MATCH_POSTCOND]
MATCH = {
MATCH_SIDE_PREEXPECT: MATCH_DICT_PREEXPECT,
MATCH_SIDE_POSTEXPECT: MATCH_DICT_POSTEXPECT,
MATCH_SIDE_PREACTUAL: MATCH_DICT_PREACTUAL,
MATCH_SIDE_POSTACTUAL: MATCH_DICT_POSTACTUAL,
MATCH_SIDE_PRESTEP: MATCH_DICT_PRESTEP,
MATCH_SIDE_TESTCASE: MATCH_DICT_TESTCASE,
MATCH_PRECOND: {
"A": MATCH_SIDE_PREEXPECT,
"B": MATCH_SIDE_PREACTUAL,
"simorder": SIM_BUSINESS + SIM_TECHNICAL,
"mode": "info",
"filename": "01_Vorbedingungen",
"title": "Pruefung Vorbedingung (Soll-Vorher - Ist-Vorher)"
},
MATCH_POSTCOND: {
"A": MATCH_SIDE_POSTEXPECT,
"B": MATCH_SIDE_POSTACTUAL,
"simorder": SIM_BUSINESS + SIM_TECHNICAL,
"mode": "hard",
"filename": "00_Fachabgleich",
"title": "Fachliche Auswertung (Soll-Nachher - Ist-Nachher)"
},
MATCH_SUCCESS: {
"A": MATCH_SIDE_PREACTUAL,
"B": MATCH_SIDE_POSTACTUAL,
"simorder": SIM_TECHNICAL + SIM_BUSINESS,
"mode": "action",
"filename": "04_Ablauf",
"title": "Ablauf-Differenz (Ist-Vorher - Ist-Nachher)"
},
MATCH_PRESTEP: {
"A": MATCH_SIDE_PRESTEP,
"B": MATCH_SIDE_POSTACTUAL,
"simorder": SIM_TECHNICAL + SIM_BUSINESS,
"mode": "action",
"filename": "02_Vorschritt",
"title": "Schritt-Differenz (Vorschritt-Nachher - Ist-Nachher)"
},
MATCH_TESTEXAMPLE: {
"A": MATCH_SIDE_TESTCASE,
"B": MATCH_SIDE_POSTACTUAL,
"simorder": SIM_BUSINESS + SIM_TECHNICAL,
"mode": "action",
"filename": "03_Vergleichstestfall",
"title": "Vergleichstestfall (Vergleich-Soll - Ist-Nachher)"
},
}

551
utils/match_tool.py

@ -1,551 +0,0 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# ---------------------------------------------------------------------------------------------------------
# Author : Ulrich Carmesin
# Source : gitea.ucarmesin.de
# ---------------------------------------------------------------------------------------------------------
import json
import utils.css_tool
import utils.report_tool
import basic.program
import basic.constants as B
import utils.match_const as M
import utils.data_const as D
# ------------------------------------------------------------
"""
"""
class Matching():
def __init__(self, job, comp):
self.job = job
self.comp = comp
self.elements = {}
self.matchfiles = {}
self.assignedFiles = {}
self.linksA = {}
self.linksB = {}
self.nomatch = {}
self.matchkeys = {}
self.htmltext = ""
self.sideA = []
self.sideB = []
self.mode = ""
self.matchtype = ""
self.cssClass = "result0"
self.preIds = {}
def setData(self, tdata, match):
"""
selects testdata for actual matching
:param tdata:
:param match: kind of actual match
:return:
"""
sideA = M.MATCH[match][M.M_SIDE_A]
sideB = M.MATCH[match][M.M_SIDE_B]
self.sideA = tdata[sideA]["data"]
self.sideB = tdata[sideB]["data"]
self.matchfiles[M.M_SIDE_A] = tdata[sideA]["path"]
self.matchfiles[M.M_SIDE_B] = tdata[sideB]["path"]
self.matchtype = match
self.mode = M.MATCH[match]["mode"]
self.setDiffHeader()
self.report = utils.report_tool.Report.getInstance(self.job)
self.resetHits()
if match == M.MATCH_PRECOND:
self.preIds = {} # structure db:scheme:table:...
# get ddl
# for _data: for ddl._header: if SIM_TECHNICAL in ddl[h][key]: preIds.append(_data[h])
def resetHits(self):
self.linksA = {}
self.linksB = {}
self.nomatch = {}
self.matchkeys = {}
self.cssClass = "result0"
def setCssClass(self, cssClass):
if cssClass > self.cssClass:
self.cssClass = cssClass
def isHitA(self, key):
return ((key in self.linksA) and (self.linksA[key] != B.SVAL_NULL))
def isHitB(self, key):
return ((key in self.linksB) and (self.linksB[key] != B.SVAL_NULL))
def setHit(self, keyA, keyB):
if (not self.isHitA(keyA)) and (not self.isHitB(keyB)):
if (keyA != B.SVAL_NULL): self.linksA[keyA] = keyB
if (keyB != B.SVAL_NULL): self.linksB[keyB] = keyA
return "OK"
raise Exception("one of the links are set")
def setNohit(self, similarity, keyA, keyB):
""" The similarity must be unique. Only a mismatch will be set. """
if similarity in self.nomatch:
raise Exception("similarity " + similarity + " exists")
if (self.isHitA(keyA) or self.isHitB(keyB)):
return
self.nomatch[similarity] = [keyA, keyB]
def getTableDdl(self, path):
a = path.split(":")
ddl = self.comp.conf[B.DATA_NODE_DDL]
for x in a:
if (len(x) < 2): continue
if (x == B.DATA_NODE_DATA): break
if x in ddl: ddl = ddl[x]
return ddl
def setDiffHeader(matching):
job = matching.job # basic.program.Job.getInstance()
verify = int(job.getDebugLevel("match_tool")) - 1
job.debug(verify, "getDiffHeader ")
htmltxt = "<!DOCTYPE html>"
htmltxt += "<html><head>"
htmltxt += "<title>" + M.MATCH[matching.matchtype]["title"] + "</title>"
htmltxt += utils.css_tool.getInternalStyle(job, "diffFiles")
htmltxt += "</head>"
htmltxt += "<body>"
htmltxt += "<h1>" + M.MATCH[matching.matchtype]["title"] + "</h1>"
htmltxt += "<h4>" + M.MATCH[M.MATCH[matching.matchtype][M.M_SIDE_A]][M.M_SIDE_LONG] + ": " + matching.matchfiles[
M.M_SIDE_A] + "</h4>"
htmltxt += "<h4>" + M.MATCH[M.MATCH[matching.matchtype][M.M_SIDE_B]][M.M_SIDE_LONG] + ": " + matching.matchfiles[
M.M_SIDE_B] + "</h4><br>"
matching.htmltext = htmltxt
def setDiffFooter(self):
job = self.job # basic.program.Job.getInstance()
verify = int(job.getDebugLevel("match_tool")) - 4
job.debug(verify, "getDiffFooter ")
htmltext = self.htmltext
htmltext += "</body></html>"
self.htmltext = htmltext
def ignorePreid(matching, key):
if matching.matchtype == M.MATCH_POSTCOND:
key_int = extractKeyI(key)
if matching.linksB[key] != B.SVAL_NULL:
return False
if matching.preIds is None or len(matching.preIds) < 1:
return False
for ids in matching.preIds:
hit = True
for k in ids:
row = matching.sideB[key_int]
if ids[k] == row[k]:
pass
else:
hit = False
if hit:
return True
return False
def matchFiles(matching):
"""
post:
:param matching:
:return:
"""
def matchBestfit(matching, path):
"""
in this strategy the difference-score of all elements of both sides will be calculated.
the best fit will assigned together until nothing is
* the elements can be each kind of object
* the count of elements should not be high
:param matching:
:return:
"""
# initialize all potential links with null
i = 0
if (matching.sideA is not None):
for r in matching.sideA:
k = composeKey("a", i)
matching.setHit(k, B.SVAL_NULL)
i += 1
i = 0
if (matching.sideB is not None):
for r in matching.sideB:
k = composeKey("b", i)
matching.setHit(B.SVAL_NULL, k)
i += 1
ia = 0
ix = 1
# CASE: one side is None
if (matching.sideA is None) or (matching.sideB is None):
return
# CASE: to match
for rA in matching.sideA:
ib = 0
for rB in matching.sideB:
if (matching.isHitA(composeKey("a", ia))):
continue
if (matching.isHitB(composeKey("b", ib))):
ib += 1
continue
similarity = getSimilarity(matching, rA, rB, ix, M.MATCH[matching.matchtype]["simorder"])
if (similarity == "MATCH"):
matching.setHit(composeKey("a", ia), composeKey("b", ib))
continue
else:
matching.setNohit(similarity, composeKey("a", ia), composeKey("b", ib))
ib += 1
ix += 1
ia += 1
def matchRestfit(matching):
""" """
job = matching.job # basic.program.Job.getInstance()
verify = int(job.getDebugLevel("match_tool")) - 1
job.debug(verify, "matchRestfit ")
for x in sorted(matching.nomatch, reverse=True):
job.debug(verify, "matchRestfit " + x)
pair = matching.nomatch[x]
if (matching.isHitA(pair[0])):
job.debug(verify, "A " + pair[0] + " bereits zugeordnet mit " + matching.linksA[pair[0]])
pass
if (matching.isHitB(pair[1])):
job.debug(verify, "B " + pair[1] + " bereits zugeordnet mit " + matching.linksB[pair[1]])
continue
if (matching.isHitA(pair[0])):
continue
job.debug(verify, "neues Matching " + pair[0] + " " + pair[1])
matching.setHit(pair[0], pair[1])
def composeKey(side, i):
return side.lower() + str(i + 1).zfill(4)
def extractKeyI(key):
return int(key[1:]) - 1
def setMatchkeys(matching, path):
job = matching.job # basic.program.Job.getInstance()
verify = int(job.getDebugLevel("match_tool")) - 1
job.debug(verify, "getSimilarity " + path)
if len(matching.matchkeys) > 0:
return
if (B.DATA_NODE_DDL in matching.comp.conf):
job.debug(verify, "ddl " + path)
a = path.split(":")
ddl = matching.comp.conf[B.DATA_NODE_DDL]
for x in a:
if (len(x) < 2):
continue
if (x == B.DATA_NODE_DATA):
break
if x in ddl:
ddl = ddl[x]
job.debug(verify, "ddl " + json.dumps(ddl))
keys = {}
for f in ddl:
job.debug(verify, "ddl " + f)
if (D.DDL_KEY in ddl[f]) and (len(ddl[f][D.DDL_KEY]) > 0):
b = ddl[f][D.DDL_KEY].split(":")
if (len(b) != 2):
raise Exception("falsch formatierter Schluessel " + ddl[f][D.DDL_KEY])
if (not b[1].isnumeric()):
raise Exception("falsch formatierter Schluessel " + ddl[f][D.DDL_KEY])
if (b[0] not in M.SIM_DEFAULT):
raise Exception("falsch formatierter Schluessel " + ddl[f][D.DDL_KEY])
k = "k"+b[0]+""+b[1].zfill(2)
job.debug(verify, "ddl " + f)
keys[k] = {"ktyp": b[0], D.DDL_FNAME: ddl[f][D.DDL_FNAME], D.DDL_TYPE: ddl[f][D.DDL_TYPE], "rule": ddl[f][D.DDL_ACCEPTANCE]}
matching.matchkeys = keys
job.debug(verify, "ddl " + json.dumps(keys))
def getSimilarity(matching, rA, rB, i, simorder=M.SIM_DEFAULT):
""" it calculates the similarity between both rows by:
concat each criteria with single-similarity 00..99 and i with 999..000 """
job = matching.job # basic.program.Job.getInstance()
verify = int(job.getDebugLevel("match_tool")) - 1
job.debug(verify, "getSimilarity ")
mBsim = ""
mTsim = ""
topBsim = ""
topTsim = ""
for k in sorted(matching.matchkeys):
if not D.DDL_FNAME in matching.matchkeys[k]:
job.m.logError("technical Attribut is missing "+k )
continue
if M.SIM_TECHNICAL in k:
if matching.matchkeys[k][D.DDL_FNAME] in rA and matching.matchkeys[k][D.DDL_FNAME] in rB:
mTsim += getStringSimilarity(job, str(rA[matching.matchkeys[k][D.DDL_FNAME]]),
str(rB[matching.matchkeys[k][D.DDL_FNAME]]))
else:
mTsim += "00"
topTsim += "99"
if M.SIM_BUSINESS in k:
if matching.matchkeys[k][D.DDL_FNAME] in rA and matching.matchkeys[k][D.DDL_FNAME] in rB:
mBsim += getStringSimilarity(job, str(rA[matching.matchkeys[k][D.DDL_FNAME]]),
str(rB[matching.matchkeys[k][D.DDL_FNAME]]))
else:
mBsim += "55"
topBsim += "99"
if mBsim == topBsim and mTsim == topTsim:
job.debug(verify, "Treffer ")
return "MATCH"
elif simorder[0:1] == M.SIM_BUSINESS and mBsim == topBsim:
job.debug(verify, "Treffer ")
return "MATCH"
elif simorder[0:1] == M.SIM_TECHNICAL and mTsim == topTsim:
job.debug(verify, "Treffer ")
return "MATCH"
elif simorder[0:1] == M.SIM_TECHNICAL:
return "S"+mTsim+mBsim+str(i).zfill(3)
else: # if simorder[0:1] == M.SIM_BUSINESS:
return "S" + mBsim + mTsim + str(i).zfill(3)
def matchTree(matching):
"""
:param matching:
:return:
"""
job = matching.job # basic.program.Job.getInstance()
verify = int(job.getDebugLevel("match_tool")) - 4
job.debug(verify, "..>> start matching " + matching.mode)
matchElement(matching, matching.sideA, matching.sideB, "")
matching.setDiffFooter()
job.debug(verify, "..>> ende matching " + matching.htmltext)
return matching.htmltext
def matchElement(matching, A, B, path):
""" travers through the datatree """
job = matching.job # basic.program.Job.getInstance()
verify = int(job.getDebugLevel("match_tool")) - 4
job.debug(verify, "matchElem " + path + " A " + str(type(A)) + " B " + str(type(B)))
if ((A is not None) and (isinstance(A, list))) \
or ((B is not None) and (isinstance(B, list))):
return matchArray(matching, A, B, path)
elif ((A is not None) and (isinstance(A, dict))) \
or ((B is not None) and (isinstance(B, dict))):
return matchDict(matching, A, B, path)
else:
return matching
def getStringSimilarity(job, strA, strB):
#job = basic.program.Job.getInstance()
verify = int(job.getDebugLevel("match_tool")) - 1
job.debug(verify, "getStringSimilarity " + strA + " ?= " + strB)
if (strA == strB): return "99"
if (strA.strip() == strB.strip()): return "77"
if (strA.lower() == strB.lower()): return "66"
if (strA.strip().lower() == strB.strip().lower()): return "55"
return "00"
def getEvaluation(matching, type, acceptance, sideA, sideB):
job = matching.job # basic.program.Job.getInstance()
verify = int(job.getDebugLevel("match_tool")) - 1
job.debug(verify, "getEvaluation " + str(sideA) + " ?= " + str(sideB))
match = getStringSimilarity(job, str(sideA), str(sideB))
classA = "novalue"
classB = "novalue"
result = "test"
if match == "99": return ["MATCH", "novalue", "novalue", "novalue", "novalue"]
if acceptance == "ignore": result = "ignore"
if (matching.matchtype == M.MATCH_POSTCOND) and (result == "test"):
result = "hard"
classA = "diffA"
classB = "diffB"
else:
classA = "acceptA"
classB = "acceptB"
return [result, classA, classB]
def matchDict(matching, sideA, sideB, path):
""" travers through the datatree """
job = matching.job # basic.program.Job.getInstance()
verify = int(job.getDebugLevel("match_tool")) - 4
job.debug(verify, "matchDict " + path)
if (sideA is not None):
for k in sideA:
job.debug(verify, "matchDict 400 " + k + ".")
if k in ["_match", D.DATA_ATTR_COUNT, D.DATA_ATTR_DATE, B.DATA_NODE_HEADER]:
continue
if (sideB is not None) and (k in sideB):
if (isinstance(sideA[k], dict)): sideA[k]["_match"] = "Y"
if (isinstance(sideB[k], dict)): sideB[k]["_match"] = "Y"
job.debug(verify, "matchDict 404 " + k + "." + path)
matchElement(matching, sideA[k], sideB[k], path + ":" + k)
else:
if (isinstance(sideA[k], dict)): sideA[k]["_match"] = "N"
job.debug(verify, "matchDict 408 " + path)
matchElement(matching, sideA[k], None, path + ":" + k)
if (sideB is not None):
for k in sideB:
job.debug(verify, "matchDict 412 " + k + ".")
if k in ["_match", D.DATA_ATTR_COUNT, D.DATA_ATTR_DATE, B.DATA_NODE_HEADER]:
continue
if (sideA is not None) and (k in sideA):
continue
elif (sideA is None) or (k not in sideA):
if (sideA is not None) and (isinstance(sideA[k], dict)): sideB[k]["_match"] = "N"
job.debug(verify, "matchDict 418 " + k + "." + path)
matchElement(matching, None, sideB[k], path + ":" + k)
job.debug(verify, "matchDict 420 ...<<---")
return matching
def matchArray(matching, sideA, sideB, path):
""" matches the datarows of the datatree """
job = matching.job # basic.program.Job.getInstance()
verify = int(job.getDebugLevel("match_tool")) - 4
job.debug(verify, "matchArray " + path + "\n.." + matching.htmltext)
matching.sideA = sideA
matching.sideB = sideB
setMatchkeys(matching, path)
matchBestfit(matching, path)
matchRestfit(matching)
a = path.split(":")
for x in a:
if (x == "_data"): break
table = x
report = utils.report_tool.Report.getInstance(job)
report.setPaths(getattr(job.par, B.PAR_TESTCASE), matching.comp.name, table, matching.matchtype,
matching.matchfiles[M.M_SIDE_A], matching.matchfiles[M.M_SIDE_B])
# report.setMatchResult("TC0001", "comp01", "arte01", m, "result" + str(i), "<table>" + str(i) + "</table>")
htmltext = report.getTitle(getattr(job.par, B.PAR_TESTCASE), matching.comp.name, table, matching.matchtype)
htmltext += report.getComponentHead(getattr(job.par, B.PAR_TESTCASE), matching.comp.name, table, matching.matchtype)
htmltext += report.getArtefactBlock(getattr(job.par, B.PAR_TESTCASE), matching.comp.name, table, matching.matchtype)
htmltext += compareRows(matching, path)
matching.htmltext += htmltext
def compareRows(matching, path):
""" traverse through matched rows """
job = matching.job # basic.program.Job.getInstance()
verify = int(job.getDebugLevel("match_tool")) - 1
ddl = matching.getTableDdl(path)
report = utils.report_tool.Report.getInstance(job)
table = ""
header = []
# "<p>Tabelle : "+table+"</p>"
htmltext = "<table><tr><th></th>"
for f in ddl[B.DATA_NODE_HEADER]:
job.debug(verify, "ddl " + f + " ")
header.append({D.DDL_FNAME: f, D.DDL_TYPE: ddl[f][D.DDL_TYPE], D.DDL_ACCEPTANCE: ddl[f][D.DDL_ACCEPTANCE]})
htmltext += "<th>" + f + "</th>"
htmltext += "</tr>"
matching.difftext = htmltext
for k in sorted(matching.linksA):
print(k)
if (matching.isHitA(k)):
htmltext += compareRow(matching, header, matching.sideA[int(extractKeyI(k))],
matching.sideB[int(extractKeyI(matching.linksA[k]))])
else:
htmltext += markRow(matching, header, matching.sideA[int(extractKeyI(k))], M.M_SIDE_A)
matching.setCssClass("result3")
for k in sorted(matching.linksB):
if ignorePreid(matching, k):
continue
if (not matching.isHitB(k)):
htmltext += markRow(matching, header, matching.sideB[int(extractKeyI(k))], M.M_SIDE_B)
matching.setCssClass("result2")
htmltext += "</table>"
matching.difftext += "</table>"
return htmltext
def markRow(matching, header, row, side):
job = matching.job # basic.program.Job.getInstance()
verify = int(job.getDebugLevel("match_tool")) - 4
text = ""
cssClass = ""
for f in header:
if not f[D.DDL_FNAME] in row:
val = ""
cssClass = "novalue"
elif side == M.M_SIDE_A:
res = getEvaluation(matching, f[D.DDL_TYPE], f[D.DDL_ACCEPTANCE], row[f[D.DDL_FNAME]], "")
val = str(row[f[D.DDL_FNAME]])
cssClass = res[1]
else:
res = getEvaluation(matching, f[D.DDL_TYPE], f[D.DDL_ACCEPTANCE], "", row[f[D.DDL_FNAME]])
val = str(row[f[D.DDL_FNAME]])
cssClass = res[2]
text += "<td " + utils.css_tool.getInlineStyle(job, "diffFiles", cssClass) + ">" + val + "</td>"
text = "<tr><td " + utils.css_tool.getInlineStyle(job, "diffFiles", cssClass) + ">" \
+ M.MATCH[M.MATCH[matching.matchtype][side]]["short"] + "</td>" + text + "</tr>"
matching.difftext += text
return text
def compareRow(matching, header, rA, rB):
""" traverse through matched rows """
job = matching.job # basic.program.Job.getInstance()
verify = int(job.getDebugLevel("match_tool")) - 4
allident = True
textA = ""
textB = ""
text = ""
valA = ""
valB = ""
classA = "novalue"
classB = "novalue"
for f in header:
print(f)
if f[D.DDL_FNAME] in rA and f[D.DDL_FNAME] in rB:
res = getEvaluation(matching, f[D.DDL_TYPE], f[D.DDL_ACCEPTANCE], rA[f[D.DDL_FNAME]], rB[f[D.DDL_FNAME]])
match = res[0]
classA = res[1]
classB = res[2]
valA = str(rA[f[D.DDL_FNAME]])
valB = str(rB[f[D.DDL_FNAME]])
elif f[D.DDL_FNAME] in rA:
valA = str(rA[f[D.DDL_FNAME]])
match = "ddl"
classA = "acceptA"
classB = "acceptB"
elif f[D.DDL_FNAME] in rB:
valB = str(rB[f[D.DDL_FNAME]])
match = "ddl"
classA = "acceptA"
classB = "acceptB"
else:
match = "MATCH"
classA = "acceptA"
classB = "acceptB"
if (match == "MATCH"):
textA += "<td>" + valA + "</td>"
textB += "<td>" + valB + "</td>"
matching.setCssClass("result1")
elif (match == "hard"):
allident = False
textA += "<td " + utils.css_tool.getInlineStyle(job, "diffFiles", classA) + ">" + valA + "</td>"
textB += "<td " + utils.css_tool.getInlineStyle(job, "diffFiles", classB) + ">" + valB + "</td>"
matching.setCssClass("result3")
else:
allident = False
textA += "<td " + utils.css_tool.getInlineStyle(job, "diffFiles", classA) + ">" + valA + " (" + match + ")</td>"
textB += "<td " + utils.css_tool.getInlineStyle(job, "diffFiles", classB) + ">" + valB + " (" + match + ")</td>"
matching.setCssClass("result1")
if allident:
return "<tr><td/>" + textA + "</tr>"
text = "<tr><td>" + M.MATCH[M.MATCH[matching.matchtype][M.M_SIDE_A]]["short"] + "</td>" + textA + "</tr>"
text += "<tr><td>" + M.MATCH[M.MATCH[matching.matchtype][M.M_SIDE_B]]["short"] + "</td>" + textB + "</tr>"
matching.difftext += text
return text
# --------------------------------------------------------------------------
def matchLines(matching):
pass

115
utils/path_const.py

@ -1,115 +0,0 @@
import basic.constants as B
# -------------------------------------------------------------
# values and keywords
KEY_PRECOND = "precond"
KEY_POSTCOND = "postcond"
KEY_RESULT = "result"
KEY_ORIGIN = "origin"
KEY_PARTS = "parts"
KEY_SUMFILE = "sumfile"
KEY_BACKUP = "backup"
KEY_REFFILE = "reffile"
KEY_TESTCASE = "tc"
KEY_TESTSUITE = "ts"
KEY_CATALOG = "catalog"
KEY_DEBUGNAME = "debugname"
KEY_LOGNAME = "logname"
KEY_BASIC = "basic"
""" keyword for basic config in components """
KEY_COMP = "comp"
""" keyword for individual component """
KEY_TOOL = "tool"
""" keyword for technical tools """
VAL_UTIL = "utils"
""" subdir for any technical tools """
VAL_CONFIG = "config"
""" subdir for any place of config-files """
VAL_COMPS = "components"
""" subdir for the plugin components """
VAL_BASIC = "basic"
""" subdir for the basic job-framework """
VAL_BASE_DATA = "data"
""" subdir for the basis data-folder """
VAL_TDATA = "testdata"
""" subdir for the basis data-folder """
# -------------------------------------------------------------
# parameter with arguments
PAR_APP = "job.par." + B.PAR_APP
PAR_ENV = "job.par." + B.PAR_ENV
PAR_REL = "job.par." + B.PAR_REL
PAR_TSDIR = "job.par." + B.PAR_TSDIR
PAR_TCDIR = "job.par." + B.PAR_TCDIR
PAR_XPDIR = "job.par." + B.PAR_XPDIR
PAR_TDTYP = "job.par." + B.PAR_TDTYP
PAR_TDSRC = "job.par." + B.PAR_TDSRC
PAR_TDNAME = "job.par." + B.PAR_TDNAME
PAR_LOG = "job.par." + B.PAR_LOG
PAR_MODUS = "job.par." + B.PAR_MODUS
PAR_COMP = "job.par." + B.PAR_COMP
PAR_FCT = "job.par." + B.PAR_FCT
PAR_TOOL = "job.par." + B.PAR_TOOL
PAR_STEP = "job.par." + B.PAR_STEP
PAR_DESCRIPT = "job.par." + B.PAR_DESCRIPT
PAR_TESTCASE = "job.par." + B.PAR_TESTCASE
PAR_TESTCASES = "job.par." + B.PAR_TESTCASES
PAR_TESTSUITE = "job.par." + B.PAR_TESTSUITE
PAR_TCTIME = "job.par." + B.PAR_TCTIME
PAR_TSTIME = "job.par." + B.PAR_TSTIME
PAR_TESTINSTANCES = "job.par." + B.PAR_TESTINSTANCES
# -------------------------------------------------------------
# attributes
ATTR_PATH_MODE = "mode"
""" This constant defines the home-folder in filesystem of test """
ATTR_PATH_HOME = "home"
""" This constant defines the home-folder in testing-filesystem """
ATTR_PATH_DEBUG = "debugs"
""" This constant defines the debug-folder in testing-filesystem """
ATTR_PATH_ARCHIV = "archiv"
""" This constant defines the folder in testing-filesystem for results and log of execution """
ATTR_PATH_PROGRAM = "program"
""" This constant defines the program-folder in the workspace """
ATTR_PATH_COMPONENTS = "components"
""" This constant defines the program-folder in the workspace """
ATTR_PATH_ENV = "environment"
""" This constant defines the folder in testing-filesystem, used for configs related to environments """
ATTR_PATH_RELEASE = "release"
""" This constant defines the folder in testing-filesystem, used for configs related to release """
ATTR_PATH_TDATA = "testdata"
""" This constant defines the folder in testing-filesystem with the testcase-specifications """
ATTR_PATH_TEMP = "temp"
""" This constant defines the debug-folder in testing-filesystem """
ATTR_PATH_PATTN = "pattern"
""" This constant defines the debug-folder in testing-filesystem """
# -------------------------------------------------------------
# structure - nodes
P_DEBUGS = "debugs"
P_ENVBASE = "envbase"
P_ENVLOG = "envlog"
P_ENVPARFILE = "envparfile"
P_TCBASE = "tcbase"
P_TCLOG = "tclog"
P_TCRESULT = "tcresult"
P_TCPARFILE = "tcparfile"
P_TCDIFF = "tcdiff"
P_TCPREDIFF = "tcprediff"
P_TCRUNDIFF = "tcrundiff"
P_TCPRECOND = "tcprecond"
P_TCPOSTCOND = "tcpostcond"
P_TSBASE = "tsbase"
P_TSLOG = "tslog"
P_TSPARFILE = "tsparfile"
P_TSSUM = "tssum"
P_XPBASE = "xpbase"
P_XPRESULT = "xpresult"
P_XPBACKUP = "xpbackup"
# -------------------------------------------------------------
# exception texts
EXP_COMP_MISSING = "Component is missing for {}"
""" excetion for the case that a specific component doesnt exist, 1 parameter (context) """
EXP_CONFIG_MISSING = "Configuration is missing for {}"
""" excetion for the case that a specific configuration is missing, 1 parameter (context) """

292
utils/report_tool.py

@ -1,292 +0,0 @@
# functions in order to report the summaries
# -------------------------------------------------------
"""
the reporting-system in bottom-up
(0) raw-data
(0.1) artificats created from test-system - raw-files up to work-files -
(0.2) in logs all contacts to test-system - raw-info -
(1) comparison-result
(1.1) comparison-result as difference for each file-comparison in html-files in testcases - diff-files -
(1.2) extraction of diff-files (only not accepted differences) as html-file in test-set - result-report -
(2) result-code
(2.1) result-code of each test-step for each component in testcases - parameter-file -
(2.2) transfer of the summary result-code of each testcase to an extern reporting-system - protocol -
(2.3) visualization of final result-code of each component and each testcase in test-set - result-report -
(2.4) visualization of statistical result-codes of each component and each test-set in test-context - result-report -
| testcase-artefact | testcase-report | testsuite-report
title | comparison | testcase | testsuite
table0 | | each component | each testcase x each component
h1 | | | testcase
h2 | | component | component
h3 | -- | main-comparison | main-comparison
h4 | | other comparison | other comparison
"""
import os
import re
import basic.program
import utils.match_const as M
import basic.constants as B
import utils.css_tool
import utils.path_tool
REP_TITLE = "Ergebnisbericht"
REP_TC = "Testfall"
REP_TS = "Testsuite"
REP_COMP = "Komponente"
REP_ART = "Artefakt"
TOOL_NAME = "report_tool"
class Report:
__instance = None
__instances = {}
@staticmethod
def getInstance(job):
if (job.jobid in Report.__instances):
return Report.__instances[job.jobid]
else:
return Report(job)
def __init__(self, job):
"""
:param report: matchtype
:param comp: optional on matching
structure:
report 1:k
testcase 1:l # only in testsuite-report
components 1:m
artefacts 1:n
matches
the html-side
title : subject of result for artefact/testcase/testsuite
overview : table with links, only for testcase/testsuite
testcase : h1 with anker, only testcase/testsuite
component: h2 with anker
artefact : h3
matches : with links
paths : p with links to row-results
matching : table complete match for artefact-result resp. only diff for other matches
"""
#job = basic.program.Job.getInstance()
self.job = job
self.report = {}
Report.__instances[job.jobid] = self
self.report["testcases"] = []
self.testcase = ""
self.component = ""
self.artefact = ""
self.matchtype = ""
#return self
def setPaths(self, testcase, component, artefact, matchtype, pathA, pathB):
if not testcase in self.report:
self.report[testcase] = {}
self.report["testcases"].append(testcase)
if not component in self.report[testcase]:
self.report[testcase][component] = {}
if not artefact in self.report[testcase][component]:
self.report[testcase][component][artefact] = {}
self.report[testcase][component][artefact][matchtype] = {}
self.report[testcase][component][artefact][matchtype]["pathA"] = pathA
self.report[testcase][component][artefact][matchtype]["pathB"] = pathB
self.testcase = testcase
self.component = component
self.artefact = artefact
self.matchtype = matchtype
def setMatchResult(self, testcase, component, artefact, matchtype, cssClass, diffTable):
self.report[testcase][component][artefact][matchtype]["css"] = cssClass
if matchtype == M.MATCH_POSTCOND:
self.report[testcase][component][artefact][matchtype]["diff"] = diffTable
def getTitle(self, testcase="", component="", artefact="", matchtype=""):
job = self.job # basic.program.Job.getInstance()
if len(matchtype) > 1:
html = "<title>"+M.MATCH[matchtype]["title"]+"</title></head><body><h1>"+M.MATCH[matchtype]["title"]+"</h1>"
elif len(testcase) > 1:
html = "<title>"+REP_TITLE+" "+REP_TC+" "+testcase+"</title></head>"
html += "<body><h1>"+REP_TITLE+" "+REP_TC+" "+testcase+"</h1>"
elif hasattr(job.par, B.PAR_TESTSUITE):
html = "<title>" + REP_TITLE + " " + REP_TS + " " + getattr(job.par, B.PAR_TESTSUITE) + "</title>"
html += "</head><body><h1>" + REP_TITLE + " " + REP_TS + " " + getattr(job.par, B.PAR_TESTSUITE) + "</h1>"
elif hasattr(job.par, B.PAR_TESTCASE):
html = "<title>" + REP_TITLE + " " + REP_TC + " " + getattr(job.par, B.PAR_TESTCASE) + "</title></head>"
html += "<body><h1>" + REP_TITLE + " " + REP_TC + " " + getattr(job.par, B.PAR_TESTCASE) + "</h1>"
if hasattr(job.par, B.PAR_DESCRIPT):
html += "<p>"+getattr(job.par, B.PAR_DESCRIPT)+"</p>"
return html
def getCssClass(self, testcase="", component="", artefact="", match=""):
"""
the function calculates the maximum cssClass of the granularity
the cssClass means the fault-class of the matching
:param testcase:
:param component:
:param artefact:
:return:
"""
cssClass = "result0"
if len(match) > 0 and len(artefact) > 0:
if match in self.report[testcase][component][artefact]:
if "css" in self.report[testcase][component][artefact][match]:
if cssClass < self.report[testcase][component][artefact][match]["css"]:
cssClass = self.report[testcase][component][artefact][match]["css"]
return cssClass
elif len(match) > 0:
for a in self.report[testcase][component]:
val = self.getCssClass(testcase, component, a, match)
if cssClass < val:
cssClass = val
return cssClass
elif len(artefact) > 0:
for match in self.report[testcase][component][artefact]:
if "css" in self.report[testcase][component][artefact][match]:
if cssClass < self.report[testcase][component][artefact][match]["css"]:
cssClass = self.report[testcase][component][artefact][match]["css"]
return cssClass
elif len(component) > 0:
for a in self.report[testcase][component]:
val = self.getCssClass(testcase, component, a)
if cssClass < val:
cssClass = val
return cssClass
elif len(testcase) > 0:
for c in self.report[testcase]:
val = self.getCssClass(testcase, c, "")
if cssClass < val:
cssClass = val
return cssClass
return cssClass
def getHeader(self):
job = self.job # basic.program.Job.getInstance()
verify = int(job.getDebugLevel(TOOL_NAME))-1
htmltxt = "<!DOCTYPE html>"
htmltxt += "<html><head>"
htmltxt += utils.css_tool.getInternalStyle(job, "diffFiles")
return htmltxt
def getOverviewHead(self, testcase=""):
if len(testcase) < 1:
return ""
htmlHead = "<table><tr><th>"+REP_TC+"</th>"
for c in self.report[testcase]:
if c in ["overview", "block"]:
continue
htmlHead += "<th>" + c + "</th>"
return htmlHead
def getOverview(self, testcase=""):
if len(testcase) < 1:
return ""
htmlHead = self.getOverviewHead(testcase)
htmlBody = "<tr class=\"tc-overview\"><td><a href=\"#"+testcase+"_testcase\">"+testcase+"</td>"
for c in self.report[testcase]:
cssClass = self.getCssClass(testcase, c, "")
htmlBody += "<td "+utils.css_tool.getInlineStyle("resultFile", cssClass)+"><a href=\"#"+testcase+"_"+c+"\">"+testcase+"</td>"
return htmlHead+"</tr>"+htmlBody+"</tr></table>"
def getTestcaseHead(self, testcase="", component="", artefact="", matchtype=""):
if len(testcase) < 1:
return ""
cssClass = self.getCssClass(testcase, "", "")
html = "<h1><a id=\""+testcase+"_testcase\">"+REP_TC+" "+testcase+"</h1>"
return html
def getComponentHead(self, testcase="", component="", artefact="", matchtype=""):
job = self.job # basic.program.Job.getInstance()
html = "<h2 id=\""+testcase+"_"+component+"\">"+REP_COMP+" "+component+"</h2>"
return html
def getArtefactBlock(self, testcase, component, artefact, matchtype=""):
job = self.job # basic.program.Job.getInstance()
html = "<h3>"+REP_ART+" "+artefact+"</h3><p>"
for match in self.report[testcase][component][artefact]:
cssClass = self.getCssClass(testcase, component, artefact, match)
path = self.getFilepath(job, testcase, component, artefact, match)
path = path.replace(job.conf.confs[B.SUBJECT_PATH][B.ATTR_PATH_ARCHIV], os.path.join("..", ".."))
html += " <a href=\""+path+"\" "+utils.css_tool.getInlineStyle(job, "resultFile", cssClass)+">"+M.MATCH[match]["filename"]+"<a> "
html += "</p>"
if len(matchtype) < 1:
matchtype = M.MATCH_POSTCOND
html += "<p>\n"
for side in ["A", "B"]:
path = self.report[testcase][component][artefact][matchtype]["path"+side]
href = path.replace(job.conf.confs[B.SUBJECT_PATH][B.ATTR_PATH_ARCHIV], os.path.join("..", ".."))
html += M.MATCH[M.MATCH[matchtype][side]]["long"]+" "
html += "<a href=\""+href+"\">"+path+"<a>\n"
if side == "A": html += "<br>"
html += "</p>"
return html
def getFilepath(self, job, testcase, component, artefact, matchtype):
cm = basic.componentHandling.ComponentManager.getInstance(self.job, "init")
comp = cm.getComponent(component)
path = os.path.join(utils.path_tool.composePattern(self.job, "{tcresult}", comp), artefact+"_"+M.MATCH[matchtype]["filename"]+".html")
return path
def getComparisonBlock(self, testcase, component, artefact, matchtype):
html = self.report[testcase][component][artefact][matchtype]["diff"]
return html
def reportTestcase(self, testcase):
job = self.job # basic.program.Job.getInstance()
verify = int(job.getDebugLevel(TOOL_NAME)) - 1
html = self.getHeader()
html += self.getTitle(testcase)
html += self.getOverview(testcase)
html += "<div class=\"tc-block\">"
html += self.getTestcaseHead(testcase)
for component in self.report[testcase]:
html += self.getComponentHead(testcase, component)
for artefact in self.report[testcase][component]:
html += self.getArtefactBlock(testcase, component, artefact, M.MATCH_POSTCOND)
html += self.getComparisonBlock(testcase, component, artefact, M.MATCH_POSTCOND)
html += "</div><-- class=\"tc-block\" -->"
html += "</body></html>"
return html
def extractTestcase(self, testcase, html):
job = self.job # basic.program.Job.getInstance()
verify = int(job.getDebugLevel(TOOL_NAME)) - 1
if not testcase in self.report:
self.report = {}
overview = re.findall("<tr class.+tc-overview.*</tr>", html)
if len(overview) == 1:
self.report[testcase]["overview"] = overview[0]
startP = html.index("<div class=\"tc-block\"")
endP = html.index("</div>")
block = html[startP:endP]
if len(block) > 1:
self.report[testcase]["block"] = block
def reportTestsuite(self):
job = self.job # basic.program.Job.getInstance()
verify = int(job.getDebugLevel(TOOL_NAME)) - 1
testinstances = getattr(job.par, B.PAR_TESTCASES)
html = self.getHeader()
html += self.getTitle()
i = 0
for testcase in testinstances:
if i == 0: html += self.getOverviewHead(testcase)
html += self.report[testcase]["overview"]
i += 1
html += "</table>"
for testcase in testinstances:
html += self.report[testcase]["block"]
html += "</body></html>"
return html
def report_testsuite(job, tcpath):
"""
creates header
:param tcpath:
:return: html-code with result-codes of each component
"""
# job = basic.program.Job.getInstance()
verify = -0+job.getDebugLevel(TOOL_NAME)
job.debug(verify, "writeDataTable " + str(tcpath))

451
utils/tdata_tool.py

@ -1,451 +0,0 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# ---------------------------------------------------------------------------------------------------------
# Author : Ulrich Carmesin
# Source : gitea.ucarmesin.de
# ---------------------------------------------------------------------------------------------------------
import os.path
import inspect
import basic.program
import utils.config_tool
import utils.file_tool
import basic.constants as B
import utils.data_const as D
import utils.path_const as P
import utils.path_tool
import utils.date_tool
import basic.step
import utils.i18n_tool
import re
TOOL_NAME = "tdata_tool"
list_blocks = {} # lists of aliases
def getTestdata(job=None):
"""
get the testdata from one of the possible sources
for the testcase resp testsuite of the job
:return:
"""
if "testcase" in job.program:
return collectTestdata(B.PAR_TESTCASE, getattr(job.par, B.PAR_TESTCASE), job)
else:
return collectTestdata(B.PAR_TESTSUITE, getattr(job.par, B.PAR_TESTSUITE), job)
def collectTestdata(gran, testentity, job):
"""
collects the testdata from kind of the possible sources
for the testcase resp testsuite
:return:
"""
setBlockLists(job)
if gran == B.PAR_TESTCASE:
basispath = utils.path_tool.rejoinPath(job.conf.confs[B.SUBJECT_PATH][B.ATTR_PATH_TDATA], testentity)
pathname = utils.config_tool.getConfigPath(job, P.KEY_TESTCASE, getattr(job.par, B.PAR_TESTCASE), "")
if gran == B.PAR_TESTSUITE:
basispath = utils.path_tool.rejoinPath(job.conf.confs[B.SUBJECT_PATH][B.ATTR_PATH_TDATA], testentity)
pathname = utils.config_tool.getConfigPath(job, P.KEY_TESTSUITE, getattr(job.par, B.PAR_TESTSUITE), "")
if pathname[-3:] == D.DFILE_TYPE_CSV:
tdata = getCsvSpec(job.m, job, pathname, D.CSV_SPECTYPE_DATA)
else:
tdata = utils.file_tool.readFileDict(job, pathname, job.m)
# get explicit specdata of includes
if D.CSV_BLOCK_IMPORT in tdata:
for pathname in tdata[D.CSV_BLOCK_IMPORT]:
pathname = utils.path_tool.rejoinPath(pathname)
if job.conf.confs[B.SUBJECT_PATH][B.ATTR_PATH_TDATA] not in pathname:
pathname = utils.path_tool.rejoinPath(basispath, pathname)
if pathname[-3:] == D.DFILE_TYPE_CSV:
data = getCsvSpec(job.m, job, pathname, D.CSV_SPECTYPE_DATA)
else:
data = utils.file_tool.readFileDict(job, pathname, job.m)
for table in data[D.CSV_BLOCK_TABLES]:
if table in tdata[D.CSV_BLOCK_TABLES]:
print("Fehler")
tdata[D.CSV_BLOCK_TABLES][table] = data[D.CSV_BLOCK_TABLES][table]
# get implicit specdata of spec-library
for prefix in list_blocks[D.DFILE_TABLE_PREFIX]:
files = utils.file_tool.getFiles(job.m, job, basispath, prefix, None)
if len(files) < 0:
continue
for f in files:
if f in tdata[D.CSV_BLOCK_TABLES]:
continue
pathname = utils.path_tool.rejoinPath(basispath, f)
if pathname[-3:] == D.DFILE_TYPE_CSV:
data = getCsvSpec(job.m, job, pathname, D.CSV_SPECTYPE_DATA)
else:
data = utils.file_tool.readFileDict(job, pathname, job.m)
for table in data[D.CSV_BLOCK_TABLES]:
if table in tdata[D.CSV_BLOCK_TABLES]:
print("Fehler")
tdata[D.CSV_BLOCK_TABLES][table] = data[D.CSV_BLOCK_TABLES][table]
# fill the options into job-parameter
for p in tdata[D.CSV_BLOCK_OPTION]:
setattr(job.par, p, tdata[D.CSV_BLOCK_OPTION][p])
return tdata
def setBlockLists(job):
for block in D.LIST_BLOCK_CONST + D.LIST_ATTR_CONST + D.LIST_DFNAME_CONST:
list = utils.i18n_tool.I18n.getInstance(job).getAliasList(block+"='"+eval("D."+block)+"'", job)
#list.append(eval("D."+block))
list_blocks[eval("D." + block)] = []
for x in list:
list_blocks[eval("D." + block)].append(x.lower())
def readCsv(msg, job, filename, comp, aliasNode=""):
lines = utils.file_tool.readFileLines(job, filename, msg)
print("readCsv "+filename)
return parseCsv(msg, job, filename, lines, comp, aliasNode)
def parseCsv(msg, job, filename, lines, comp, aliasNode=""):
if len(list_blocks) < 1:
setBlockLists(job)
tdata = {}
if len(aliasNode) < 1:
print(str(list_blocks))
aliasNode = extractAliasNode(filename, comp, job)
if len(aliasNode) > 3:
tdata[D.DATA_ATTR_ALIAS] = aliasNode
return parseCsvSpec(msg, lines, D.CSV_SPECTYPE_DATA, tdata, job)
def extractAliasNode(filename, comp, job):
basename = os.path.basename(filename)[0:-4]
for prefix in list_blocks[D.DFILE_TABLE_PREFIX]:
if basename.find(prefix) == 0:
basename = basename[len(prefix):]
if comp is None:
return ""
if B.TOPIC_NODE_DB in comp.conf[B.SUBJECT_ARTS] and basename in comp.conf[B.DATA_NODE_DDL]:
return B.DATA_NODE_TABLES+":"+basename
return ""
def getCsvSpec(msg, job, filename, ttype):
"""
reads the specification from a csv-file and maps it into the internal data-structure
:param msg:
:param filename:
:param type:
:param job:
:return:
"""
#if job is None:
# job = basic.program.Job.getInstance()
lines = utils.file_tool.readFileLines(job, filename, msg)
tdata = {} # the result
return parseCsvSpec(msg, lines, ttype, tdata, job)
def parseCsvSpec(msg, lines, ttype, tdata, job):
"""
:param msg:
:param lines:
:param type:
:param job:
:return:
"""
if len(list_blocks) < 1:
setBlockLists(job)
status = "start"
verbose = False
tableAttr = {} # table
tableDict = {} # table
for l in lines:
if verbose: print("lines "+l)
fields = splitFields(l, D.CSV_DELIMITER, job)
# check empty line, comment
if (len(fields) < 1) or (len(l.strip().replace(D.CSV_DELIMITER,"")) < 1):
status = "start"
continue
if (fields[0][0:1] == "#"):
continue
a = fields[0].lower().split(":")
# keywords option, step, table
if verbose: print(str(a)+" -- "+str(fields))
tableAttr = setTableAttribute(tableAttr, a[0], fields[1], job)
if (tableAttr["_hit"]):
status = "TABLE_ALIAS"
continue
if (a[0].lower() in list_blocks[D.CSV_BLOCK_HEAD]):
if verbose: print("head "+l)
setTdataLine(tdata, fields, D.CSV_BLOCK_HEAD, job)
status = "start"
continue
elif (a[0].lower() in list_blocks[D.CSV_BLOCK_OPTION]):
if verbose: print("option " + l)
setTdataLine(tdata, fields, D.CSV_BLOCK_OPTION, job)
status = "start"
continue
elif (a[0].lower() in list_blocks[D.CSV_BLOCK_STEP]):
if verbose: print("step "+l)
step = basic.step.parseStep(job, fields)
if D.CSV_BLOCK_STEP not in tdata:
tdata[D.CSV_BLOCK_STEP] = []
tdata[D.CSV_BLOCK_STEP].append(step)
status = "start"
continue
elif (a[0].lower() in list_blocks[D.CSV_BLOCK_IMPORT]):
if verbose: print("includes " + l)
if D.CSV_BLOCK_IMPORT not in tdata:
tdata[D.CSV_BLOCK_IMPORT] = []
tdata[D.CSV_BLOCK_IMPORT].append(fields[1])
status = "start"
continue
elif (a[0].lower() in list_blocks[D.CSV_BLOCK_TABLES]):
if verbose: print("tables "+l)
h = a
h[0] = B.DATA_NODE_TABLES
if ttype == D.CSV_SPECTYPE_CONF:
del h[0]
tableDict = getTdataContent(msg, tdata, h)
setTableHeader(tableDict, tableAttr, fields, ttype, job)
status = D.CSV_SPECTYPE_DATA
elif (status == D.CSV_SPECTYPE_DATA):
tableDict = getTdataContent(msg, tdata, h)
if verbose: print("setTableData "+str(h)+" "+str(tableDict))
setTableData(tableDict, fields, ttype, job)
elif (status == "TABLE_ALIAS") and D.DATA_ATTR_ALIAS in tdata:
alias = tdata[D.DATA_ATTR_ALIAS]
b = alias.split(":")
h = [B.DATA_NODE_TABLES] + b
tableDict = getTdataContent(msg, tdata, h)
tableDict[D.DATA_ATTR_ALIAS] = alias
fields = [alias] + fields
setTableHeader(tableDict, tableAttr, fields, ttype, job)
status = D.CSV_SPECTYPE_DATA
if ttype == D.CSV_SPECTYPE_CONF:
header = []
for k in tdata:
if k in D.LIST_DATA_ATTR:
continue
if B.DATA_NODE_DATA in tdata[k]:
tdata[k].pop(B.DATA_NODE_DATA)
for f in tdata[k]:
if f in [B.DATA_NODE_HEADER, "_hit"] + D.LIST_DATA_ATTR:
continue
header.append(f)
tdata[k][B.DATA_NODE_HEADER] = header
header = []
if B.DATA_NODE_TABLES in tdata and B.DATA_NODE_TABLES in tdata[B.DATA_NODE_TABLES]:
for k in tdata[B.DATA_NODE_TABLES][B.DATA_NODE_TABLES]:
if k in tdata[B.DATA_NODE_TABLES]:
if verbose: print("Error")
else:
tdata[B.DATA_NODE_TABLES][k] = tdata[B.DATA_NODE_TABLES][B.DATA_NODE_TABLES][k]
tdata[B.DATA_NODE_TABLES].pop(B.DATA_NODE_TABLES)
return tdata
def setTableHeader(tableDict, tableAttr, fields, ttype, job):
header = []
for i in range(1, len(fields)):
header.append(fields[i].strip())
tableDict[B.DATA_NODE_HEADER] = header
for attr in tableAttr:
tableDict[attr] = tableAttr[attr]
# preparate the sub-structure for row-data
if ttype == D.CSV_SPECTYPE_TREE:
tableDict[B.DATA_NODE_DATA] = {}
elif ttype == D.CSV_SPECTYPE_KEYS:
tableDict[D.CSV_NODETYPE_KEYS] = {}
tableDict[D.DATA_ATTR_KEY] = 1
if D.DATA_ATTR_KEY in tableAttr:
tableDict[D.DATA_ATTR_KEY] = header.index(tableAttr[D.DATA_ATTR_KEY]) + 1
else:
tableDict[B.DATA_NODE_DATA] = []
return tableDict
def setTableData(tableDict, fields, ttype, job):
row = {}
if ttype == D.CSV_SPECTYPE_DATA and ":" not in fields[0] and D.DATA_ATTR_ALIAS in tableDict:
fields = [tableDict[D.DATA_ATTR_ALIAS]] + fields
i = 1
for f in tableDict[B.DATA_NODE_HEADER]:
row[f] = fields[i].strip()
i += 1
if ttype == D.CSV_SPECTYPE_DATA:
if B.ATTR_DATA_COMP in tableDict:
tcomps = tableDict[B.ATTR_DATA_COMP]
else:
tcomps = {}
row[B.ATTR_DATA_COMP] = {}
for c in fields[0].split(","):
a = c.split(":")
tcomps[a[0]] = a[1]
row[B.ATTR_DATA_COMP][a[0]] = a[1].strip()
tableDict[B.DATA_NODE_DATA].append(row)
tableDict[B.ATTR_DATA_COMP] = tcomps
elif ttype == D.CSV_SPECTYPE_KEYS:
tableDict[D.CSV_NODETYPE_KEYS][fields[tableDict[D.DATA_ATTR_KEY]].strip()] = row
elif ttype == D.CSV_SPECTYPE_CONF:
tableDict[fields[1]] = row
return tableDict
def setTableAttribute(tableAttr, key, val, job):
for attr in D.LIST_DATA_ATTR:
if (key.lower() in list_blocks[attr]):
tableAttr[attr] = val.strip()
tableAttr["_hit"] = True
return tableAttr
tableAttr["_hit"] = False
return tableAttr
def setTdataLine(tdata, fields, block, job):
"""
sets field(s) into tdata as a key-value-pair
additional fields will be concatenate to a intern separated list
:param tdata:
:param fields:
:param block:
:param job:
:return:
"""
a = fields[0].lower().split(":")
a[0] = block # normalized key
val = ""
for i in range(1, len(fields)-1):
val += D.INTERNAL_DELIMITER+fields[i]
if len(val) > len(D.INTERNAL_DELIMITER):
val = val[len(D.INTERNAL_DELIMITER):]
setTdataContent(job.m, tdata, val, a)
return tdata
def setTdataContent(msg, data, tabledata, path):
setTdataStructure(msg, data, path)
if len(path) == 2:
data[path[0]][path[1]] = tabledata
elif len(path) == 3:
data[path[0]][path[1]][path[2]] = tabledata
elif len(path) == 4:
data[path[0]][path[1]][path[2]][path[3]] = tabledata
def getTdataContent(msg, data, path):
setTdataStructure(msg, data, path)
if len(path) == 2:
return data[path[0]][path[1]]
elif len(path) == 3:
return data[path[0]][path[1]][path[2]]
elif len(path) == 4:
return data[path[0]][path[1]][path[2]][path[3]]
elif len(path) == 1:
return data[path[0]]
else:
return None
def setTdataStructure(msg, data, path):
if len(path) >= 1 and path[0] not in data:
data[path[0]] = {}
if len(path) >= 2 and path[1] not in data[path[0]]:
data[path[0]][path[1]] = {}
if len(path) >= 3 and path[2] not in data[path[0]][path[1]]:
data[path[0]][path[1]][path[2]] = {}
if len(path) >= 4 and path[3] not in data[path[0]][path[1]][path[2]]:
data[path[0]][path[1]][path[2]][path[3]] = {}
return data
def splitFields(line, delimiter, job):
out = []
fields = line.split(delimiter)
for i in range(0, len(fields)):
if fields[i][0:1] == "#":
break
if re.match(r"^\"(.*)\"$", fields[i]):
fields[i] = fields[i][1:-1]
out.append(fields[i])
return out
def writeCsvData(filename, tdata, comp, job):
text = ""
data = tdata
for p in [B.DATA_NODE_TABLES, P.KEY_PRECOND, P.KEY_POSTCOND]:
if p in data:
print("data "+p)
data = data[p]
for k in data:
text += buildCsvData(data, k, comp, job)
text += "\n"
utils.file_tool.writeFileText(comp.m, job, filename, text)
def buildCsvData(tdata, tableName, comp, job=None):
"""
writes the testdata into a csv-file for documentation of the test-run
:param teststatus:
:param tdata:
:param comp: if specific else None
:return:
"""
text = ""
for k in [D.DATA_ATTR_DATE, D.DATA_ATTR_COUNT]:
if k in tdata:
text += k+";"+str(tdata[k])+"\n"
if tableName in tdata:
actdata = tdata[tableName]
else:
actdata = tdata
header = str(utils.i18n_tool.I18n.getInstance(job).getText(f"{B.DATA_NODE_TABLES=}", job)) +":" + tableName
for f in actdata[B.DATA_NODE_HEADER]:
header += D.CSV_DELIMITER+f
text += header + "\n"
i = 0
for r in actdata[B.DATA_NODE_DATA]:
row = ""
if B.ATTR_DATA_COMP in r:
for k in r[B.ATTR_DATA_COMP]:
row += ","+k+":"+r[B.ATTR_DATA_COMP][k]
row = row[1:]
i += 1
for f in actdata[B.DATA_NODE_HEADER]:
if f in r:
row += D.CSV_DELIMITER+str(r[f])
else:
row += D.CSV_DELIMITER
text += row
text += "\n"
return text
def buildCsvSpec(tdata, job=None):
text = ""
if D.CSV_BLOCK_IMPORT in tdata:
for k in tdata[D.CSV_BLOCK_HEAD]:
text += utils.i18n_tool.I18n.getInstance(job).getText(f"{D.CSV_BLOCK_HEAD=}", job)
text += ":"+k+D.CSV_DELIMITER+tdata[D.CSV_BLOCK_HEAD][k]+"\n"
text += "# option:key ;values;..;;;;\n"
if D.CSV_BLOCK_OPTION in tdata:
for k in tdata[D.CSV_BLOCK_OPTION]:
text += utils.i18n_tool.I18n.getInstance(job).getText(f"{D.CSV_BLOCK_OPTION=}", job)
text += ":" + k + D.CSV_DELIMITER + getHeadArgs(tdata[D.CSV_BLOCK_OPTION][k], job)+"\n"
text += "#;;;;;;\n"
if D.CSV_BLOCK_STEP in tdata:
text += basic.step.getStepHeader(job)
i = 1
for step in tdata[D.CSV_BLOCK_STEP]:
text += utils.i18n_tool.I18n.getInstance(job).getText(f"{D.CSV_BLOCK_STEP=}", job) + ":" + str(i)
text += D.CSV_DELIMITER + step.getStepText(job)
i += 1
text += "#;;;;;;\n"
if D.CSV_BLOCK_TABLES in tdata:
for k in tdata[D.CSV_BLOCK_TABLES]:
text += buildCsvData(tdata, k, None, job)
text += "#;;;;;;\n"
return text
def getHeadArgs(value, job):
return value.replace(D.INTERNAL_DELIMITER, D.CSV_DELIMITER)

45
utils/xml2_tool.py

@ -1,45 +0,0 @@
import os, sys, json
import xmltodict
import pprint
class fcts:
def dict2xml(tree):
out = xmltodict.unparse(tree, pretty=True)
return out
def xml2dict(xmlstring):
tree = {}
pp = pprint.PrettyPrinter(indent=4)
tree = xmlstring.parse(xmlstring)
return tree
def readXml(filename):
pass
def addNode(xpath, value):
pass
def writeDataTable(teststatus, tdata, comp):
"""
writes the testdata into a csv-file for documentation of the test-run
:param teststatus:
:param tdata:
:param comp: if specific else None
:return:
"""
#output = xmljson.badgerfish.etree(tdata, root=xmljson.badgerfish.Element('root'))
result = bf.etree(tdata, root=Element('xml'))
# xmljson.badgerfish.tostring(output)
txt = tostring(result, pretty_print=True)
print (txt.decode('utf-8'))
# out = tostring(result, pretty_print=True)
#print (prettify(result))
pass
def prettify(elem):
"""Return a pretty-printed XML string for the Element.
"""
rough_string = tostring(elem, 'utf-8')
reparsed = xml.dom.minidom.parseString(rough_string )
return reparsed.toprettyxml(indent=" ")

61
utils/zip_tool.py

@ -1,61 +0,0 @@
import zipfile
import tarfile
import os
import basic.program
def untarFolder(target, targetFile):
tar_file = tarfile.open(os.path.join(target, targetFile), 'r:gz')
tar_file.extractall(path=os.path.join(target, 'tarliste'))
tar_file.close()
pass
def openNewTarFile(job, target, targetFile):
tarfilename = os.path.join(target, targetFile)
if os.path.exists(tarfilename):
os.remove(tarfilename)
job.m.logInfo("Archiv angelegt "+tarfilename)
return tarfile.open(tarfilename, 'w:gz')
def appendFolderIntoTarFile(job, source, sourceFolder, tarFile):
workFolder = os.path.join(source, sourceFolder)
for folderName, subfolders, filenames in os.walk(workFolder):
for filename in filenames:
folderShort = folderName[len(source)+1:]
# create complete filepath of file in directory
filePath = os.path.join(folderName, filename)
# Add file to zip
tarFile.add(filePath, os.path.join(folderShort, filename))
def tarFolder(source, sourceFolder, target, targetFile):
with tarfile.open(os.path.join(target, targetFile), 'w:gz') as tar_file:
for folderName, subfolders, filenames in os.walk(os.path.join(source, sourceFolder)):
for filename in filenames:
folderShort = folderName.replace(source + '/', '')
# create complete filepath of file in directory
filePath = os.path.join(folderName, filename)
# Add file to zip
tar_file.add(filePath, os.path.join(folderShort, filename))
tar_file.close()
def unzipFolder(target, targetFile):
zip_file = zipfile.ZipFile(os.path.join(target, targetFile), 'r')
zip_file.extractall(path=os.path.join(target, 'liste'))
zip_file.close()
pass
def zipFolder(source, sourceFolder, target, targetFile):
with zipfile.ZipFile(os.path.join(target, targetFile), 'w') as zip_file:
# Iterate over all the files in directory
for folderName, subfolders, filenames in os.walk(os.path.join(source, sourceFolder)):
for filename in filenames:
folderShort = folderName.replace(source+'/', '')
# create complete filepath of file in directory
filePath = os.path.join(folderName, filename)
# Add file to zip
zip_file.write(filePath, os.path.join(folderShort, filename))
zip_file.close()
return ""
Loading…
Cancel
Save