Browse Source

program-config via catalog

refactor
Ulrich 2 years ago
parent
commit
067e0c7aca
  1. 60
      basic/component.py
  2. 14
      basic/componentHandling.py
  3. 10
      basic/constants.py
  4. 23
      basic/message.py
  5. 70
      basic/program.py
  6. 30
      basic/toolHandling.py
  7. 21
      catalog/programs.csv
  8. 0
      objects/__init__.py
  9. 106
      objects/catalog.py
  10. 4
      test/constants.py
  11. 33
      test/test_01date.py
  12. 25
      test/test_07catalog.py
  13. 46
      test/test_10job.py
  14. 8
      test/testtools.py
  15. 0
      tools/__init__.py
  16. 349
      tools/config_tool.py
  17. 81
      tools/conn_tool.py
  18. 116
      tools/data_const.py
  19. 175
      tools/date_tool.py
  20. 113
      tools/file_abstract.py
  21. 295
      tools/file_tool.py
  22. 246
      tools/filecsv_fcts.py
  23. 102
      tools/job_tool.py
  24. 115
      tools/path_const.py
  25. 320
      tools/path_tool.py
  26. 76
      tools/value_tool.py

60
basic/component.py

@ -8,12 +8,19 @@
# ---------------------------------------------------------------------
from datetime import datetime
import basic.compexec
#from basic.compexec import Testexecuter
import basic.message
import basic.program
#import basic.entity
import basic.constants as B
import tools.path_const as P
import inspect
import tools.config_tool
#import tools.git_tool
import basic.toolHandling
TABLE_NAMES = ["component", "co_table", "co_variant"]
DEFAULT_SYNC = "" #basic.entity.SYNC_HEAD_GIT2DB
class CompData:
def __init__(self):
@ -21,13 +28,56 @@ class CompData:
self.m = None
self.conf = None
def syncEnitity(job, elem):
"""
Synchronize the component-configuration (file/git side) with the database.

Compares the last git-commit time of the component's config file with the
newest update-time stored in the db-table and prints which side is newer.
NOTE(review): uses basic.entity and tools.git_tool although their imports
are commented out at module level in this commit -- confirm they resolve
at runtime before calling this function.
:param job: current job carrying conf.confs settings
:param elem: component name used to locate the configuration
:return: None, or an error text if no DB is configured in the job
"""
syncMethod = DEFAULT_SYNC
if B.SUBJECT_ENTITY in job.conf.confs:
syncMethod = job.conf.confs["entity"][TABLE_NAMES[0]]["storage"]
# storage method is expected like "<src>-..-<trg>"; fewer than two "-" means nothing to sync
if syncMethod.count("-") < 2:
return
fileTime = basic.entity.VAL_ZERO_TIME
dbTime = basic.entity.VAL_ZERO_TIME
# get git-commit time of the component's configuration file
if "git" in syncMethod:
comppath = tools.config_tool.getConfigPath(job, P.KEY_COMP, elem)
# path relative to the components-root, as expected by gitLog
repopath = comppath[len(job.conf.confs[B.SUBJECT_PATH][B.ATTR_PATH_COMPS]) + 1:]
gitresult = tools.git_tool.gitLog(job, B.ATTR_PATH_COMPS, repopath, 1)
fileTime = gitresult[0]["date"]
print(str(gitresult))
# get newest update-time from the db-table of the entity
if "db" in syncMethod:
if B.TOPIC_NODE_DB in job.conf.confs:
dbi = basic.toolHandling.getDbTool(job, job.testserver, job.conf.confs[B.TOPIC_NODE_DB][B.ATTR_TYPE])
else:
return "No DB in job-config"
data = dbi.selectRows(TABLE_NAMES[0], job)
print(str(data[B.DATA_NODE_DATA]))
if len(data[B.DATA_NODE_DATA]) > 0:
dbTime = data[B.DATA_NODE_DATA][0]["updtime"]
# compare both timestamps; the actual synchronization is still commented out
if fileTime == dbTime:
print("gleich")
elif fileTime < dbTime:
print("db vorne")
#(appObjects, appDict) = selectEntities(job, dbi)
#print(str(appDict))
#applPath = tools.config_tool.getConfigPath(job, P.KEY_BASIC, B.SUBJECT_APPS)
#tools.file_tool.writeFileDict(job.m, job, applPath, appDict)
#
elif fileTime > dbTime:
print("git vorne")
compData = tools.config_tool.getConfig(job, P.KEY_COMP, elem)
#insertEntities(job, compData, dbTime, dbi)
#class Component(components.sysmonitor.SystemMonitor, components.testexec.Testexecuter, components.report.Report,
# components.maintain.Maintainer, components.catalog, threading.Thread):
class Component(basic.compexec.Testexecuter):
class Component():
"""
A component represents an application of the system-under-test or a data-artifact which is created from the system-under-test.
A component represents an application of the system-under-test or a data-artifact which is created from the system-under-test.
As the representation it has knowledge of the url and of which other components depend on this component.
During a test-run the component must be checked and prepared, appfiles have to be collected, etc. For these tasks some standard methods are implemented.
"""
@ -171,8 +221,8 @@ class Component(basic.compexec.Testexecuter):
testreport = ""
# if job.par.context == "tset":
# for tc in testcases:
# header = utils.report_tool.getTcHeader()
# body = utils.report_tool.getTcExtraction()
# header = tools.report_tool.getTcHeader()
# body = tools.report_tool.getTcExtraction()
# if job.par.context == "tcontext":
# for ts in testsets:
reportheader = reportheader +'<\head>'

14
basic/componentHandling.py

@ -15,15 +15,15 @@ Each componente could be created mostly once, but not everytime:
Each kind of instance has its component-class, and an object should be created for each use.
Each created component-object is documented in the parameter-file.
"""
import utils.config_tool
import utils.conn_tool
import tools.config_tool
import tools.conn_tool
import basic.program
import basic.message
import basic.component
import importlib
import copy
import basic.constants as B
import utils.data_const as D
import tools.data_const as D
comps = {}
PARAM_NOSUBNODE = [B.SUBJECT_ARTS, "components", "instance"]
@ -141,8 +141,8 @@ class ComponentManager:
verify = job.getDebugLevel("job_tool")
componentName = componentName.lower()
job.debug(verify, "createComponent " + componentName)
confs = utils.config_tool.getConfig(job, "comp", componentName)
conns = utils.conn_tool.getConnections(job, componentName)
confs = tools.config_tool.getConfig(job, "comp", componentName)
conns = tools.conn_tool.getConnections(job, componentName)
instAttr = getInstanceAttributes(confs)
job.debug(verify, "createComponent -91- " + componentName + " : " + str(confs))
if nr > 0 and int(instAttr[B.ATTR_INST_CNT]) > 1:
@ -190,7 +190,7 @@ class ComponentManager:
c.classname = compName
c.m = basic.message.Message(job, basic.message.LIMIT_DEBUG, "logTime", name)
c.conf = utils.config_tool.mergeConn(c.m, confs["conf"], conns[i])
c.conf = tools.config_tool.mergeConn(c.m, confs["conf"], conns[i])
c.conf[B.SUBJECT_CONN] = conns[i]
c.init(job)
if parContent is not None:
@ -204,7 +204,7 @@ class ComponentManager:
for table in c.conf[B.SUBJECT_ARTS][B.TOPIC_NODE_DB]:
if table in B.LIST_DB_ATTR:
continue
conf = utils.config_tool.getConfig(job, D.DDL_FILENAME, compName, table)
conf = tools.config_tool.getConfig(job, D.DDL_FILENAME, compName, table)
if B.DATA_NODE_TABLES in conf and table in conf[B.DATA_NODE_TABLES]:
c.conf[B.DATA_NODE_DDL][table] = conf[B.DATA_NODE_TABLES][table]
elif table in conf:

10
basic/constants.py

@ -7,9 +7,9 @@
"""
The constants describe the keywords of the main datastructures; these are
* the configurations of
* program-configuration for general technical variables in tools - stored in internal conf-folder
* program-configuration for general technical variables in xtools - stored in internal conf-folder
it represents the general knowledge of this test-automatism
* basic-configuration for global variables in job - stored in external conf-folder
* xbasic-configuration for global variables in job - stored in external conf-folder
it represents the knowledge of your test-topic
* comp-configuration for component-attributes - stored in internal component-folder in
+ ddl-configuration of an entity of the component as attributes - stored in internal component-folder
@ -114,7 +114,7 @@ ATTR_ARTS_TYPE = "type"
ATTR_ARTS_NAME = "name"
""" optional attribute just for information """
ATTR_ARTS_PATH = "path"
""" optional attribute for the basic folder if the artifact is stored in the filesystem """
""" optional attribute for the xbasic folder if the artifact is stored in the filesystem """
ATTR_ARTS_RESET = "reset"
""" optional attribute if the artefact should be reset in the initializing-phase """
ATTR_ARTS_PRESTEP = "prestep"
@ -172,12 +172,12 @@ ATTR_CONN_PASSWD = "password"
LIST_CONN_ATTR = [ATTR_CONN_HOST, ATTR_CONN_IP, ATTR_CONN_PORT, ATTR_CONN_DOMPATH, ATTR_CONN_USER, ATTR_CONN_PASSWD]
# the configuration of a component or tool
# entity { : variable name of the group, basic, component-name or tool-name
# entity { : variable name of the group, xbasic, component-name or tool-name
# + subject { : variable subject-name - it correspondends to a tool
# + + sub-subject { : variable subject-name - it correspondends to a tool
# + attributes : constant of the tool in which the attribute ist implemented
# the main subjects # prog basic envir tool comp testcase main implentation module
# the main subjects # prog xbasic envir tool comp testcase main implentation module
SUBJECT_PATH = "paths" # | x | | x | | path_tool, config_tool
""" This constant defines the subject in order to define paths of filesystem of any testuse """
ATTR_PATH_MODE = "mode"

23
basic/message.py

@ -20,9 +20,9 @@ import basic.program
import os
import math
from datetime import datetime
import utils.path_tool
import utils.i18n_tool
import basic.text_const as T
import tools.path_tool
#import tools.i18n_tool
#import basic.text_const as T
import basic.constants as B
LIMIT_FATAL = 0
@ -72,7 +72,7 @@ class Message:
self.debugfile = job.m.debugfile
self.debug(verify, "> > > debugfile uebernommen zu " + str(componente))
else:
debugpath = job.conf.confs["paths"]["debugs"] + "/debug_" + logTime[0:-4] + "00.txt"
debugpath = job.conf["paths"]["debugs"] + "/debug_" + logTime[0:-4] + "00.txt"
print ("debugpathx "+debugpath)
if os.path.exists(debugpath):
self.debugfile = open(debugpath, "a")
@ -80,7 +80,7 @@ class Message:
self.debugfile = open(debugpath, "w")
self.debug(verify, "> > > debugfile geoeffnet zu " + job.program + " mit " + debugpath)
# init logfile - except for components or unittest
self.logDebug("logfile " + str(componente) + ", " + str(job.par.basedir))
#self.logDebug("logfile " + str(componente) + ", " + str(job.par.basedir))
if componente is not None: #
self.logfile = self.debugfile
elif job.program == "unit":
@ -98,8 +98,8 @@ class Message:
basedir = job.par.basedir
basedir = basedir.replace("base", "log")
os.makedirs(basedir, exist_ok=True)
# basedir = utils.path_tool.composePath(basedir, None)
# basedir = utils.path_tool.composePath(job, basedir, None)
# basedir = tools.path_tool.composePath(basedir, None)
# basedir = tools.path_tool.composePath(job, basedir, None)
logpath = job.getLogpath()
basedir = os.path.dirname(logpath)
os.makedirs(basedir, exist_ok=True)
@ -205,14 +205,7 @@ class Message:
self.logInfo(text)
def getMessageText(self, job, text, args):
out = ""
constName = ""
for i in range(0, len(T.LIST_EXP_TEXT)):
if text == T.LIST_EXP_TEXT[i]:
constName = T.LIST_EXP_CONST[i]
txt = utils.i18n_tool.I18n.getInstance(job).getMessage(self.job, constName, args)
out = txt.format(args)
return out
return text
def logFatal(self, text):
self.log(LIMIT_FATAL, "FATAL: " + text)

70
basic/program.py

@ -15,13 +15,13 @@ from datetime import datetime
import basic.constants as B
import basic.message
import basic.componentHandling
import utils.date_tool
import utils.path_tool
import utils.file_tool
import utils.config_tool
import tools.date_tool
import tools.path_tool
import tools.file_tool
import tools.config_tool
import test.constants as T
import utils.path_const as P
import utils.job_tool
import tools.path_const as P
import tools.job_tool
LIMIT_INFO = 16 #basic.message.LIMIT_INFO
LIMIT_DEBUG = 12 #basic.message.LIMIT_DEBUG
@ -254,14 +254,19 @@ class Job:
self.jobid = str(Job.__jobid)
self.program = program
conf = Configuration(self, program)
self.conf = conf
self.conf = conf.confs
try:
path = utils.config_tool.getConfigPath(self, P.KEY_BASIC, B.BASIS_FILE)
path = tools.config_tool.getConfigPath(self, P.KEY_BASIC, B.BASIS_FILE)
print("comps.basispath "+path)
self.conf.setConfiguration(self, path)
except:
pass # the special path is not necessary
appl = utils.config_tool.getConfig(self, P.KEY_BASIC, B.SUBJECT_APPS)
logTime = tools.date_tool.getActdate(tools.date_tool.F_LOG)
self.start = logTime
self.m = basic.message.Message(self, jobdef[program]["loglevel"], logTime, None)
def altinit(self, program, args):
appl = tools.config_tool.getConfig(self, P.KEY_BASIC, B.SUBJECT_APPS)
if appl is not None:
self.conf.confs[B.SUBJECT_APPS] = appl[B.SUBJECT_APPS]
if B.SUBJECT_PROJECTS in self.conf.confs:
@ -273,12 +278,9 @@ class Job:
self.conf.confs[B.SUBJECT_PROJECTS] = appl[B.SUBJECT_PROJECTS]
par = Parameter(self, program, args)
self.par = par
logTime = utils.date_tool.getActdate(utils.date_tool.F_LOG)
self.start = logTime
self.m = basic.message.Message(self, jobdef[program]["loglevel"], logTime, None)
def getLogpath(self):
path = utils.path_tool.composePattern(self, jobdef[self.program]["logpath"], None)
path = tools.path_tool.composePattern(self, jobdef[self.program]["logpath"], None)
return path
def murks(self, par, program):
@ -286,7 +288,7 @@ class Job:
dirpath = self.par.getDirParameter()
setGlobal()
if dirpath is not None:
utils.path_tool.extractPath(dirpath[0], dirpath[1])
tools.path_tool.extractPath(dirpath[0], dirpath[1])
if program == "unit": # no job will be started
self.start = datetime.now()
logTime = self.start.strftime("%Y%m%d_%H%M%S")
@ -307,7 +309,7 @@ class Job:
setattr(self.par, "parstring", parstring)
if not hasattr(self.par, jobdef[program]["dirname"]):
setattr(self.par, jobdef[program]["dirname"],
utils.path_tool.composePattern(self, "{"+basedir+"}", None))
tools.path_tool.composePattern(self, "{"+basedir+"}", None))
print(parstring)
self.par.setParameterLoaded(self)
@ -318,7 +320,7 @@ class Job:
logTime = self.start.strftime("%Y%m%d_%H%M%S")
self.m = basic.message.Message(self, basic.message.LIMIT_DEBUG, logTime, None)
print("prog-68 " + str(self.m.rc))
utils.job_tool.startJobProcesses(self)
tools.job_tool.startJobProcesses(self)
self.par.setParameterLoaded(self)
self.m.logInfo("# # # Start Job " + self.start.strftime("%d.%m.%Y %H:%M:%S") + " # # # ")
self.m.debug(basic.message.LIMIT_INFO, "# # # Start Job " + self.start.strftime("%d.%m.%Y %H:%M:%S") + " # # # ")
@ -327,12 +329,12 @@ class Job:
def stopJob(self, reboot=0):
utils.job_tool.stopJobProcesses(self)
tools.job_tool.stopJobProcesses(self)
self.ende = datetime.now()
self.dumpParameter()
footer1 = "# # " + self.m.topmessage + " # # # "
footer2 = "# # # Stop Job " + utils.date_tool.formatParsedDate(str(self.start), utils.date_tool.F_DE_TSTAMP)
footer2 += " # " + utils.date_tool.formatParsedDate(str(self.ende), utils.date_tool.F_DE_TSTAMP) + " # # # "
footer2 = "# # # Stop Job " + tools.date_tool.formatParsedDate(str(self.start), tools.date_tool.F_DE_TSTAMP)
footer2 += " # " + tools.date_tool.formatParsedDate(str(self.ende), tools.date_tool.F_DE_TSTAMP) + " # # # "
footer2 += " # # # RC: " + str(self.m.getFinalRc())
self.m.logInfo(footer1)
self.m.logInfo(footer2)
@ -349,12 +351,12 @@ class Job:
def dumpParameter(self):
if len(jobdef[self.program]["pfiletarget"]) < 2:
return
parpath = utils.path_tool.composePath(self, jobdef[self.program]["pfiletarget"], None)
parpath = tools.path_tool.composePath(self, jobdef[self.program]["pfiletarget"], None)
output = {}
cconf = basic.componentHandling.getComponentDict()
output["par"] = self.par.__dict__
if len(cconf) < 1:
utils.file_tool.writeFileDict(self.m, self, parpath, output)
tools.file_tool.writeFileDict(self.m, self, parpath, output)
return
output[B.SUBJECT_COMPS] = {}
for c in cconf:
@ -365,19 +367,19 @@ class Job:
output[B.SUBJECT_COMPS][c][x] = cconf[c][x]
if x == B.SUBJECT_CONN and "passwd" in cconf[c][x]:
cconf[B.SUBJECT_COMPS][c][x]["passwd"] = "xxxxx"
utils.file_tool.writeFileDict(self.m, self, parpath, output)
tools.file_tool.writeFileDict(self.m, self, parpath, output)
def loadParameter(self):
output = {}
if len(str(jobdef[self.program]["pfilesource"])) < 2:
return None
parpath = utils.path_tool.composePath(self, jobdef[self.program]["pfilesource"], None)
parpath = tools.path_tool.composePath(self, jobdef[self.program]["pfilesource"], None)
print("parpath "+parpath)
if not os.path.join(parpath):
return None
print("parpath "+parpath)
doc = utils.file_tool.readFileDict(self, parpath, None)
doc = tools.file_tool.readFileDict(self, parpath, None)
for k in doc.keys():
output[k] = copy.deepcopy(doc[k])
return output
@ -387,7 +389,7 @@ class Job:
if hasattr(self.par, parameter):
return getattr(self.par, parameter)
elif "xxxtime" in parameter:
neu = utils.date_tool.getActdate(utils.date_tool.F_DIR)
neu = tools.date_tool.getActdate(tools.date_tool.F_DIR)
# setattr(self.par, parameter, neu)
return neu
else:
@ -436,7 +438,7 @@ class Job:
def getMessageLevel(self, errtyp, elem):
if (not hasattr(self, "m")) or (self.m is None):
return basic.message.LIMIT_DEBUG
elif elem.find("tool") > 1:
elif elem.find("tool") > 1 and hasattr(self, "par"):
if not hasattr(self.par, "tool") or getattr(self.par, "tool").find(elem) <= 0:
return int(self.m.CONST_ERRTYP[errtyp]) -1
else:
@ -479,9 +481,9 @@ class Parameter:
job.par = self
if not hasattr(self, jobdef[program]["dirname"]):
setattr(self, jobdef[program]["dirname"],
utils.path_tool.composePattern(job, "{"+self.basedir+"}", None))
tools.path_tool.composePattern(job, "{"+self.basedir+"}", None))
else:
utils.path_tool.extractPath(job, jobdef[program]["basedir"], getattr(self, jobdef[program]["dirname"]))
tools.path_tool.extractPath(job, jobdef[program]["basedir"], getattr(self, jobdef[program]["dirname"]))
self.setParameterLoaded(job)
@ -489,14 +491,14 @@ class Parameter:
if jobdef[program]:
self.basedir = jobdef[program]["basedir"]
if hasattr(self, jobdef[program]["dirname"]):
utils.path_tool.extractPath(self.basedir, getattr(self, jobdef[program]["dirname"]))
tools.path_tool.extractPath(self.basedir, getattr(self, jobdef[program]["dirname"]))
elif self.basedir == "workbase":
home = utils.path_tool.getHome()
home = tools.path_tool.getHome()
dirpath = os.path.join(home, "data", "workspace")
setattr(self, jobdef[program]["dirname"], dirpath)
elif program != "unit":
# compose after setargs
dirpath = utils.path_tool.composePattern(job, "{"+jobdef[program]["basedir"]+"}", None)
dirpath = tools.path_tool.composePattern(job, "{"+jobdef[program]["basedir"]+"}", None)
setattr(self, jobdef[program]["dirname"], dirpath)
else:
self.basedir = "debugs"
@ -575,7 +577,7 @@ class Parameter:
self.setJobAttr(k , getattr(args, k))
dirpath = self.getDirParameter()
#if dirpath is not None:
# utils.path_tool.extractPath(job, dirpath[0], dirpath[1])
# tools.path_tool.extractPath(job, dirpath[0], dirpath[1])
if hasattr(self, "application") and self.application in job.conf.confs[B.SUBJECT_APPS]:
if B.ATTR_APPS_PROJECT in job.conf.confs[B.SUBJECT_APPS][self.application]:
setattr(self, B.ATTR_APPS_PROJECT, job.conf.confs[B.SUBJECT_APPS][self.application][B.ATTR_APPS_PROJECT])
@ -609,7 +611,7 @@ class Parameter:
class Configuration:
def __init__ (self, job, program):
self.program = program
path = utils.path_tool.getBasisConfigPath()
path = tools.path_tool.getBasisConfigPath()
print ("conf initialisieren "+self.program+" > "+path)
self.setConfiguration(job, path)
return
@ -622,7 +624,7 @@ class Configuration:
if not hasattr(self, "confs"):
self.confs = {}
self.confs["configpath"] = []
doc = utils.file_tool.readFileDict(job, path, None)
doc = tools.file_tool.readFileDict(job, path, None)
self.confs["configpath"].append(path)
if "basic" in doc:
for k, v in doc["basic"].items():

30
basic/toolHandling.py

@ -9,7 +9,7 @@ import os
import basic.program
import basic.constants as B
# -------------------------------------------------
import utils.config_tool
import tools.config_tool
def hasAttr(o, name):
@ -22,6 +22,7 @@ def hasAttr(o, name):
return True
return False
def getAttr(o, name):
if (isinstance(o, dict)):
if (name in o.keys()):
@ -31,10 +32,12 @@ def getAttr(o, name):
elif hasattr(o, name):
return o.get(name)
"""
Toolmanager
"""
def getCompAttr(comp, topic, attr, table=""):
out = ""
print(topic + " " + attr + " " + str(comp))
@ -48,6 +51,7 @@ def getCompAttr(comp, topic, attr, table=""):
return getAttr(comp.conf[B.SUBJECT_ARTS][topic], attr)
raise LookupError(topic + "." + attr + " is not set in comp " + comp.name)
def getTool(technicType, comp, job):
if technicType == B.TOPIC_NODE_DB:
return getDbTool(job, comp)
@ -60,64 +64,68 @@ def getTool(technicType, comp, job):
# denn zu einer Komponente koennen unterschiedliche Dateien vorkommen
return getFileTool(job, comp, "")
# class ToolManager:
def getDbTool(job, comp, dbtype=""):
verify = int(job.getDebugLevel("db_tool"))
if len(dbtype) < 3:
dbtype = getCompAttr(comp, B.TOPIC_NODE_DB, B.ATTR_TYPE, "")
toolname = "db" + dbtype + "_tool"
filepath = os.path.join(job.conf.confs[B.SUBJECT_PATH][B.ATTR_PATH_PROGRAM], "utils", toolname+".py")
filepath = os.path.join(job.conf[B.SUBJECT_PATH][B.ATTR_PATH_PROGRAM], "tools", toolname + ".py")
# comp.m.debug(verify, "toolname "+filepath)
if not os.path.exists(filepath):
raise FileNotFoundError("file for tool " + toolname + " does not exist " + filepath)
cmodul = importlib.import_module("utils."+toolname)
cmodul = importlib.import_module("tools." + toolname)
class_ = getattr(cmodul, "DbFcts")
c = class_()
c.setComp(job, comp)
return c
def getCliTool(job, comp):
verify = int(job.getDebugLevel("db_tool"))
clitype = getCompAttr(comp, B.TOPIC_NODE_CLI, B.ATTR_TYPE, "")
toolname = "cli" + clitype + "_tool"
filepath = os.path.join(job.conf.confs[B.SUBJECT_PATH][B.ATTR_PATH_PROGRAM], "utils", toolname+".py")
filepath = os.path.join(job.conf[B.SUBJECT_PATH][B.ATTR_PATH_PROGRAM], "tools", toolname + ".py")
# comp.m.debug(verify, "toolname "+filepath)
if not os.path.exists(filepath):
raise FileNotFoundError("file for tool " + toolname + " does not exist " + filepath)
cmodul = importlib.import_module("utils."+toolname)
cmodul = importlib.import_module("tools." + toolname)
class_ = getattr(cmodul, "CliFcts")
c = class_()
c.setComp(job, comp)
return c
def getApiTool(job, comp):
verify = int(job.getDebugLevel("db_tool"))
apitype = getCompAttr(comp, B.TOPIC_NODE_API, B.ATTR_TYPE, "")
toolname = "api" + apitype + "_tool"
filepath = os.path.join(job.conf.confs[B.SUBJECT_PATH][B.ATTR_PATH_PROGRAM], "utils", toolname+".py")
filepath = os.path.join(job.conf[B.SUBJECT_PATH][B.ATTR_PATH_PROGRAM], "tools", toolname + ".py")
# comp.m.debug(verify, "toolname "+filepath)
if not os.path.exists(filepath):
raise FileNotFoundError("file for tool " + toolname + " does not exist " + filepath)
cmodul = importlib.import_module("utils."+toolname)
cmodul = importlib.import_module("tools." + toolname)
class_ = getattr(cmodul, "ApiFcts")
c = class_()
c.setComp(job, comp)
return c
def getFileTool(job, comp, filenode=""):
verify = int(job.getDebugLevel("file_tool"))
if len(filenode) > 3 and "." in filenode and filenode[-1:] != ".":
filetype = utils.config_tool.getAttribute(comp, filenode, B.ATTR_ARTS_TYPE, job)
filetype = tools.config_tool.getAttribute(comp, filenode, B.ATTR_ARTS_TYPE, job)
elif len(filenode) > 2 and len(filenode) < 5:
filetype = filenode
else:
filetype = getCompAttr(comp, B.TOPIC_NODE_FILE, B.ATTR_TYPE, "")
toolname = "file"+filetype+"_tool"
filepath = os.path.join(job.conf.confs[B.SUBJECT_PATH][B.ATTR_PATH_PROGRAM], "utils", toolname+".py")
toolname = "file" + filetype + "_fcts"
filepath = os.path.join(job.conf[B.SUBJECT_PATH][B.ATTR_PATH_PROGRAM], "tools", toolname + ".py")
# comp.m.debug(verify, "toolname "+filepath)
if not os.path.exists(filepath):
raise FileNotFoundError("file for tool " + toolname + " does not exist " + filepath)
cmodul = importlib.import_module("utils."+toolname)
cmodul = importlib.import_module("tools." + toolname)
class_ = getattr(cmodul, "FileFcts")
c = class_()
c.setComp(job, comp)

21
catalog/programs.csv

@ -0,0 +1,21 @@
_key;name;;;;;;;;;;;;
table:programs;name;objtype;objname;time;env;app;variant;pardef;pfilesource;pfiletarget;basedir;loglevel;logpath
;test_executer;tp,ts,tc;m;m;m;m;o;;;;{objtype}base;info;{job.par.wsdir}/{log}/log_{job.start}.txt
;init_testsuite;ts;m;o;m;m;o;;envparfile;tsparfile;tsbase;info;{job.par.tsdir}/{log}/{job.program}_{job.start}.txt
;execute_testsuite;ts;m;m;m;m;o;;tsparfile;tsparfile;tsbase;info;{job.par.tsdir}/{log}/{job.program}_{job.start}.txt
;collect_testsuite;ts;m;m;m;m;o;;tsparfile;tsparfile;tsbase;info;{job.par.tsdir}/{log}/{job.program}_{job.start}.txt
;finish_testsuite;ts;m;m;m;m;o;;tsparfile;tsparfile;tsbase;info;{job.par.tsdir}/{log}/{job.program}_{job.start}.txt
;unzip_testsuite;ts;m;m;m;m;o;;;;;info;{job.par.tsdir}/{log}/{job.program}_{job.start}.txt
;init_testcase;tc;m;o;m;m;o;;tsparfile;tcparfile;tcbase;info;{job.par.tcdir}/{log}/{job.program}_{job.start}.txt
;exec_testcase;tc;m;m;m;m;o;;tcparfile;tcparfile;tcbase;info;{job.par.tcdir}/{log}/{job.program}_{job.start}.txt
;collect_testcase;tc;m;m;m;m;o;;tcparfile;tcparfile;tcbase;info;{job.par.tcdir}/{log}/{job.program}_{job.start}.txt
;compare_testcase;tc;m;m;m;m;o;;tcparfile;tcparfile;tcbase;info;{job.par.tcdir}/{log}/{job.program}_{job.start}.txt
;check_environment;env;;;m;;o;;;envparfile;envbase;info;{job.par.envdir}/{log}/log_{job.start}.txt
;check_specification;tp,ts,tc;o;;;;;;;;wsbase;info;{job.par.wsdir}/{log}/log_{job.start}.txt
;check_configuration;ws;o;;;o;;;;;wsbase;info;{job.par.wsdir}/{log}/log_{job.start}.txt
;make_workspace;ws;;;_;;;;;;wsbase;info;{job.par.wsdir}/{log}/log_{job.start}.txt
;unit_tester;ws;o;;d;;;;;;wsbase;info;{job.par.wsdir}/{log}/log_{job.start}.txt
;declare_expection;ts,tc;m;m;m;m;o;;;;wsbase;info;{job.par.wsdir}/{log}/log_{job.start}.txt
;clean_workspace;ws;o ;;_;;;;;;wsbase;info;{job.par.wsdir}/{log}/log_{job.start}.txt
;start_dialog;ws;input;;input;input;input;;;;wsbase;info;{job.par.wsdir}/{log}/log_{job.start}.txt
;web_start;ws;;;;;;;;;wsbase;warn;{job.par.wsdir}/{log}/log_{job.start}.txt
1 _key name
2 table:programs name objtype objname time env app variant pardef pfilesource pfiletarget basedir loglevel logpath
3 test_executer tp,ts,tc m m m m o {objtype}base info {job.par.wsdir}/{log}/log_{job.start}.txt
4 init_testsuite ts m o m m o envparfile tsparfile tsbase info {job.par.tsdir}/{log}/{job.program}_{job.start}.txt
5 execute_testsuite ts m m m m o tsparfile tsparfile tsbase info {job.par.tsdir}/{log}/{job.program}_{job.start}.txt
6 collect_testsuite ts m m m m o tsparfile tsparfile tsbase info {job.par.tsdir}/{log}/{job.program}_{job.start}.txt
7 finish_testsuite ts m m m m o tsparfile tsparfile tsbase info {job.par.tsdir}/{log}/{job.program}_{job.start}.txt
8 unzip_testsuite ts m m m m o info {job.par.tsdir}/{log}/{job.program}_{job.start}.txt
9 init_testcase tc m o m m o tsparfile tcparfile tcbase info {job.par.tcdir}/{log}/{job.program}_{job.start}.txt
10 exec_testcase tc m m m m o tcparfile tcparfile tcbase info {job.par.tcdir}/{log}/{job.program}_{job.start}.txt
11 collect_testcase tc m m m m o tcparfile tcparfile tcbase info {job.par.tcdir}/{log}/{job.program}_{job.start}.txt
12 compare_testcase tc m m m m o tcparfile tcparfile tcbase info {job.par.tcdir}/{log}/{job.program}_{job.start}.txt
13 check_environment env m o envparfile envbase info {job.par.envdir}/{log}/log_{job.start}.txt
14 check_specification tp,ts,tc o wsbase info {job.par.wsdir}/{log}/log_{job.start}.txt
15 check_configuration ws o o wsbase info {job.par.wsdir}/{log}/log_{job.start}.txt
16 make_workspace ws _ wsbase info {job.par.wsdir}/{log}/log_{job.start}.txt
17 unit_tester ws o d wsbase info {job.par.wsdir}/{log}/log_{job.start}.txt
18 declare_expection ts,tc m m m m o wsbase info {job.par.wsdir}/{log}/log_{job.start}.txt
19 clean_workspace ws o _ wsbase info {job.par.wsdir}/{log}/log_{job.start}.txt
20 start_dialog ws input input input input wsbase info {job.par.wsdir}/{log}/log_{job.start}.txt
21 web_start ws wsbase warn {job.par.wsdir}/{log}/log_{job.start}.txt

0
objects/__init__.py

106
objects/catalog.py

@ -0,0 +1,106 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# ---------------------------------------------------------------------------------------------------------
# Author : Ulrich Carmesin
# Source : gitea.ucarmesin.de
# ---------------------------------------------------------------------------------------------------------
import os
import basic.program
import basic.constants as B
import tools.path_const as P
import tools.data_const as D
import tools.config_tool
import tools.path_tool
import tools.file_tool
# import tools.tdata_tool
EXP_KEY_MISSING = "key is missing {}"
EXP_KEY_DOESNT_EXIST = "key doesnt exist in domain {}"
class Catalog:
    """
    Manages defined key-value pairs, grouped by domain.

    The pairs are loaded lazily from the path testdata/catalog:
    * initially the csv-file catalog.csv
    * on demand other csv-files in the path
    Implemented as a lazy singleton: use getInstance() instead of the
    constructor wherever a shared catalog is wanted.
    """
    __instance = None

    def __init__(self):
        # domain-name -> {key: value}; filled on demand by readDomain()
        self.catalog = {}
        Catalog.__instance = self
        pass

    @staticmethod
    def getInstance():
        """Return the shared instance, creating it on first use."""
        # fix: identity comparison with None (was "== None")
        if Catalog.__instance is None:
            return Catalog()
        return Catalog.__instance

    def getValue(self, domain, key, job):
        """
        Return the value stored under the given domain and key.

        :param domain: non-empty domain name; raises Exception when invalid
        :param key: non-empty key; reported via job.m.setError when invalid
        :param job: job context used for error reporting and lazy loading
        :return: the stored value, or "" after a reported error
        """
        # fix: original "not (A or B)" never rejected an empty string and
        # raised TypeError via len() for most non-str inputs
        if not isinstance(domain, str) or len(domain) < 1:
            raise Exception(EXP_KEY_MISSING, (domain, key))
        if not isinstance(key, str) or len(key) < 1:
            job.m.setError(EXP_KEY_MISSING+" ("+domain+", "+key+")")
            return ""
        if domain not in self.catalog:
            self.readDomain(domain, job)
        if key not in self.catalog[domain]:
            job.m.setError(EXP_KEY_DOESNT_EXIST+" ("+domain+", "+key+")")
            return ""
        return self.catalog[domain][key]

    def getKeys(self, domain, job):
        """
        Return the list of keys of the domain, loading it on demand.

        :param domain: non-empty domain name; raises Exception when invalid
        :param job: job context used for lazy loading
        :return: list of keys, empty when the domain could not be loaded
        """
        # same validation fix as in getValue
        if not isinstance(domain, str) or len(domain) < 1:
            raise Exception(EXP_KEY_MISSING, (domain))
        if domain not in self.catalog:
            self.readDomain(domain, job)
        if domain not in self.catalog:
            return []
        return list(self.catalog[domain].keys())

    def readDomain(self, domain, job):
        """
        Read the domain-entries from its csv-file into self.catalog[domain].

        :param domain: non-empty domain name; raises Exception when invalid
            or when no config-path can be resolved for it
        :param job: job context used for path resolution and file reading
        :return: the raw file-dict, or None when the domain is already loaded
        """
        if not isinstance(domain, str) or len(domain) < 1:
            raise Exception(EXP_KEY_MISSING, (domain))
        if domain in self.catalog:
            return
        pathname = tools.config_tool.getConfigPath(job, P.KEY_CATALOG, domain)
        if pathname is None:
            raise Exception(EXP_KEY_MISSING, (domain))
        data = tools.file_tool.readFileDict(job, pathname, job.m)
        self.catalog[domain] = data[B.DATA_NODE_TABLES][domain][B.DATA_NODE_KEYS]
        return data

    def exportXSD(self, domain):
        """
        Export the domain into xsd-declaration of simple types.
        NOTE(review): not yet implemented in this commit.
        """
        pass

4
test/constants.py

@ -10,11 +10,11 @@ constants
import os
home = os.getcwd()
prgdir = ""
if home[-4:] == "test" and home[-6:] != "datest":
if home[-5:] == "xtests" and home[-9:] != "program":
home = home[0:-5]
if home[-10:] == "components":
home = home[0:-11]
if home[-6:] == "datest":
if home[-9:] == "program":
prgdir = home[-6:]
home = home[0:-7]
elif home[-7:] == "program":

33
test/test_01date.py

@ -2,16 +2,15 @@ import json
import inspect
import unittest
import datetime
import utils.date_tool
import tools.date_tool
TEST_FUNCTIONS = ["test_dateformat", "test_parseFormula", "test_parseDate"]
TEST_FUNCTIONS = ["test_parseDate"]
TEST_FUNCTIONS = ["test_01dateformat", "test_10parseFormula", "test_11parseDate"]
#TEST_FUNCTIONS = ["test_11parseDate"]
verbose = True
# class MyTestCase(unittest.TestCase):
class MyTestCase(unittest.TestCase):
TEST_FUNCTIONS = ["test_dateformat", "test_parseFormula", "test_parseDate"]
mymsg = "--------------------------------------------------------------"
"""
@ -26,7 +25,7 @@ class MyTestCase(unittest.TestCase):
if verbose: print("run own test " + str(suite))
"""
def test_dateformat(self, result=None):
def test_01dateformat(self, result=None):
if verbose: print(str(result))
actfunction = str(inspect.currentframe().f_code.co_name)
cnttest = 0
@ -42,31 +41,31 @@ class MyTestCase(unittest.TestCase):
MyTestCase.mymsg += "\n----- "+actfunction+" : "+str(cnttest)
def test_parseFormula(self, result=None):
def test_10parseFormula(self, result=None):
actfunction = str(inspect.currentframe().f_code.co_name)
cnttest = 0
if actfunction not in TEST_FUNCTIONS:
return
res = utils.date_tool.parseFormula("{(21.12.2012 +1Y)}")
res = tools.date_tool.parseFormula("{(21.12.2012 +1Y)}")
if verbose: print(str(res))
self.assertEqual(res[0], 2013)
self.assertEqual(res[3], 0)
res = utils.date_tool.parseFormula("{(21.12.2012 +1Y -1M)}")
res = tools.date_tool.parseFormula("{(21.12.2012 +1Y -1M)}")
if verbose: print(str(res))
self.assertEqual(res[0], 2013)
self.assertEqual(res[1], 11)
self.assertEqual(res[3], 0)
res = utils.date_tool.parseFormula("{(21.12.2012 +1 Jahre +20 Tage)}")
res = tools.date_tool.parseFormula("{(21.12.2012 +1 Jahre +20 Tage)}")
if verbose: print(str(res))
self.assertEqual(res[0], 2014)
self.assertEqual(res[1], 1)
self.assertEqual(res[2], 10)
self.assertEqual(res[3], 0)
res = utils.date_tool.parseFormula("{(21.12.2012_11:12:43 +1Y)}")
res = tools.date_tool.parseFormula("{(21.12.2012_11:12:43 +1Y)}")
if verbose: print(str(res))
self.assertEqual(res[0], 2013)
self.assertEqual(res[5], 43)
res = utils.date_tool.parseFormula("{(21.12.2012 -60M)}")
res = tools.date_tool.parseFormula("{(21.12.2012 -60M)}")
if verbose: print(str(res))
self.assertEqual(res[0], 2007)
self.assertEqual(res[1], 12)
@ -74,28 +73,28 @@ class MyTestCase(unittest.TestCase):
MyTestCase.mymsg += "\n----- "+actfunction+" : "+str(cnttest)
def test_parseDate(self, result=None):
def test_11parseDate(self, result=None):
actfunction = str(inspect.currentframe().f_code.co_name)
cnttest = 0
if actfunction not in TEST_FUNCTIONS:
return
res = utils.date_tool.parseDate("21.12.2012")
res = tools.date_tool.parseDate("21.12.2012")
if verbose: print(str(res))
self.assertEqual(res[0], 2012)
self.assertEqual(res[3], 0)
res = utils.date_tool.parseDate("{(21.12.2012 +1Y)}")
res = tools.date_tool.parseDate("{(21.12.2012 +1Y)}")
if verbose: print(str(res))
self.assertEqual(res[0], 2013)
self.assertEqual(res[3], 0)
res = utils.date_tool.parseDate("Wed May 18 23:32:55 2022 +0200")
res = tools.date_tool.parseDate("Wed May 18 23:32:55 2022 +0200")
if verbose: print(str(res))
self.assertEqual(res[0], 2022)
self.assertEqual(res[1], 5)
res = utils.date_tool.parseDate("Mit Dez 18 23:32:55 2021 +0200")
res = tools.date_tool.parseDate("Mit Dez 18 23:32:55 2021 +0200")
if verbose: print(str(res))
self.assertEqual(res[0], 2021)
self.assertEqual(res[1], 12)
res = utils.date_tool.parseDate("Sun Sep 4 15:12:21 2022 +0200")
res = tools.date_tool.parseDate("Sun Sep 4 15:12:21 2022 +0200")
if verbose: print(str(res))
self.assertEqual(res[0], 2022)
self.assertEqual(res[1], 9)

25
test/test_07catalog.py

@ -1,21 +1,23 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# ---------------------------------------------------------------------------------------------------------
# Author : Ulrich Carmesin
# Source : gitea.ucarmesin.de
# ---------------------------------------------------------------------------------------------------------
import unittest
import os
import inspect
import utils.path_tool
import basic.message
import basic.program
import basic.constants as B
import test.constants
import test.testtools
import utils.path_const as P
import basic.catalog
import objects.catalog
HOME_PATH = test.constants.HOME_PATH
OS_SYSTEM = test.constants.OS_SYSTEM
# here you can select single testfunction for developping the tests
TEST_FUNCTIONS = ["test_01class", "test_02read", "test_03key"]
#TEST_FUNCTIONS = [ "test_03key"]
# TEST_FUNCTIONS = [ "test_02read"]
verbose = False
class MyTestCase(unittest.TestCase):
@ -28,7 +30,7 @@ class MyTestCase(unittest.TestCase):
if actfunction not in TEST_FUNCTIONS:
return
job = test.testtools.getJob()
catalog = basic.catalog.Catalog.getInstance()
catalog = objects.catalog.Catalog.getInstance()
self.assertIsNotNone(catalog)
cnttest += 1
MyTestCase.mymsg += "\n----- "+actfunction+" : "+str(cnttest)
@ -41,11 +43,16 @@ class MyTestCase(unittest.TestCase):
if actfunction not in TEST_FUNCTIONS:
return
job = test.testtools.getJob()
catalog = basic.catalog.Catalog.getInstance()
catalog = objects.catalog.Catalog.getInstance()
self.assertRaises(Exception, catalog.readDomain, ("xxx", job))
cnttest += 1
try:
res = catalog.readDomain("countries", job)
self.assertEqual(isinstance(res, dict), True)
res = catalog.readDomain("programs", job)
self.assertEqual(isinstance(res, dict), True)
except:
print ("except")
cnttest += 1
countries = catalog.getKeys("countries", job)
self.assertEqual(len(countries), 21)
@ -60,7 +67,7 @@ class MyTestCase(unittest.TestCase):
if actfunction not in TEST_FUNCTIONS:
return
job = test.testtools.getJob()
catalog = basic.catalog.Catalog.getInstance()
catalog = objects.catalog.Catalog.getInstance()
res = catalog.getValue("countries", "key", job)
self.assertEqual(res, "")
self.assertEqual(job.m.rc, basic.message.RC_ERROR)

46
test/test_10job.py

@ -0,0 +1,46 @@
import unittest
import os
import inspect
import shutil
import tools.path_tool
import basic.program
from basic.componentHandling import ComponentManager
import test.constants
import basic.constants as B
import test.constants as T
import basic.componentHandling
import tools.file_tool
HOME_PATH = test.constants.HOME_PATH
PYTHON_CMD = "python"
TEST_FUNCTIONS = ["test_00init"]
PROGRAM_NAME = "clean_workspace"
class MyTestCase(unittest.TestCase):
    """Smoke test for the plain instantiation of basic.program.Job."""
    mymsg = "--------------------------------------------------------------"

    def test_00init(self):
        # FIX: removed the bogus "global mymsg" - mymsg is a class attribute,
        # the global statement referred to a module global that never exists.
        actfunction = str(inspect.currentframe().f_code.co_name)
        cnttest = 0
        if actfunction not in TEST_FUNCTIONS:
            return
        # simple job instantiate - without parameter and only simple messaging
        job = basic.program.Job(PROGRAM_NAME)
        print(str(job.__dict__))
        self.checkSimpleJob(job)

    def checkSimpleJob(self, job):
        """Assert the minimal attribute set a freshly created Job must carry."""
        self.assertIn("conf", job.__dict__)
        self.assertIn("jobid", job.__dict__)
        self.assertIn("program", job.__dict__)
        self.assertIn("start", job.__dict__)
        self.assertIn("configpath", job.conf)
        self.assertIn("paths", job.conf)


if __name__ == '__main__':
    unittest.main()

8
test/testtools.py

@ -1,9 +1,9 @@
import basic.program
import basic.constants as B
import basic.component
import utils.data_const as D
import tools.data_const as D
import test.constants as T
import utils.config_tool
import tools.config_tool
DEFAULT_GRAN = "tc"
DEFAULT_APP = "TESTAPP"
@ -89,8 +89,8 @@ def getComp(job, componentName=""):
componentName = DEFAULT_COMP
comp.conf = {}
comp.name = componentName
confs = utils.config_tool.getConfig(job, "comp", componentName)
conns = utils.conn_tool.getConnections(job, componentName)
confs = tools.config_tool.getConfig(job, "comp", componentName)
conns = tools.conn_tool.getConnections(job, componentName)
comp.conf = confs["conf"]
comp.conf[B.SUBJECT_CONN] = conns[0]
return comp

0
tools/__init__.py

349
tools/config_tool.py

@ -0,0 +1,349 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# ---------------------------------------------------------------------------------------------------------
# Author : Ulrich Carmesin
# Source : gitea.ucarmesin.de
# https://ucarmesin.de/index.php/it/testautomatisierung-fuer-daten-test/225-konfiguration-der-testanwendung
# ---------------------------------------------------------------------------------------------------------
import sys
import basic.constants as B
try:
import basic.program
except ImportError:
print("ImportError: " + str(ImportError.with_traceback()))
pass
import basic.componentHandling
import tools.path_tool
import tools.file_tool
import os.path
import basic.constants as B
import tools.data_const as D
import tools.path_const as P
COMP_FILES = [D.DDL_FILENAME]
CONFIG_FORMAT = [D.DFILE_TYPE_YML, D.DFILE_TYPE_JSON, D.DFILE_TYPE_CSV]
def getExistingPath(job, pathnames):
    """
    Return the first existing config file for the given base path(s).
    Each base path is tried with every suffix in CONFIG_FORMAT (yml, json, csv).
    :param job: job context (unused here, kept for the uniform signature)
    :param pathnames: one base path or a list of base paths (without suffix)
    :return: the first existing full pathname, or None when nothing exists
    """
    candidates = [pathnames] if isinstance(pathnames, str) else pathnames
    for base in candidates:
        # tolerate a trailing dot on the base name
        if base.endswith("."):
            base = base[:-1]
        for suffix in CONFIG_FORMAT:
            candidate = base + "." + suffix
            if os.path.exists(candidate):
                return candidate
    return None
def getConfigPath(job, modul, name, subname=""):
    """
    gets the most specified configuration of different sources
    Parameter:
    * modul -- the config category (tool, comp, basic, testcase, testsuite,
      catalog, a DDL filename, or anything else for parameter files)
    * name -- the specific class
    * subname -- optional sub-element (e.g. a table below a component)
    sources, in decreasing priority:
    * programm <<
    * install <<
    * environ << basis-conf
    * release << basis-conf
    * testset << parameter/environ
    * testcase << parameter
    the parameter-files could be one of these file-types:
    * yaml, json, csv
    :return: existing pathname of the config file
    :raises Exception: (P.EXP_CONFIG_MISSING, ...) when nothing is found
    """
    if job is None:
        verify = False # job = basic.program.Job.getInstance()
    else:
        verify = job.getDebugLevel("config_tool")-4
    if verify: job.debug(verify, "getConfig " + modul + ", " + name)
    if modul == P.KEY_TOOL:
        # search order: components-dir, home, program, environment
        path = getExistingPath(job, os.path.join(job.conf["paths"][P.ATTR_PATH_COMPONENTS],
                                                 P.VAL_CONFIG, P.KEY_TOOL+"_"+name))
        if path is not None:
            return path
        path = getExistingPath(job, os.path.join(job.conf["paths"][P.ATTR_PATH_HOME],
                                                 P.VAL_CONFIG, P.KEY_TOOL+"_"+name))
        if path is not None:
            return path
        path = getExistingPath(job, os.path.join(job.conf["paths"][P.ATTR_PATH_PROGRAM],
                                                 P.VAL_UTIL, P.VAL_CONFIG, name))
        if path is not None:
            return path
        # NOTE(review): requires job.par.environment to be set - confirm for all callers
        path = getExistingPath(job, os.path.join(job.conf["paths"][P.ATTR_PATH_ENV],
                                                 job.par.environment, P.VAL_CONFIG, P.KEY_TOOL+"_"+ name))
        if path is not None:
            return path
        raise Exception(P.EXP_CONFIG_MISSING, modul+", "+name)
    elif modul == P.KEY_COMP:
        # a component config: home/config/comp_<name>.* or <comp-folder>/CONFIG.*
        for format in CONFIG_FORMAT:
            pathname = os.path.join(job.conf["paths"][P.ATTR_PATH_HOME],
                                    P.VAL_CONFIG, P.KEY_COMP+"_" + name + "."+format)
            if verify: job.debug(verify, "4 " + pathname)
            if os.path.exists(pathname):
                return pathname
        for format in CONFIG_FORMAT:
            pathname = os.path.join(job.conf["paths"][P.ATTR_PATH_COMPONENTS],
                                    basic.componentHandling.getComponentFolder(name), "CONFIG." + format)
            if verify: job.debug(verify, "5 " + pathname)
            if os.path.exists(pathname):
                return pathname
        if verify: job.debug(verify, "6 " + pathname)
        raise Exception(P.EXP_CONFIG_MISSING, modul+", "+name)
    elif modul in COMP_FILES:
        # for example DATASTRUCURE or the table
        pathnames = []
        pathnames.append(os.path.join(job.conf["paths"][P.ATTR_PATH_COMPONENTS],
                                      basic.componentHandling.getComponentFolder(name), modul))
        pathnames.append(os.path.join(job.conf["paths"][P.ATTR_PATH_COMPONENTS],
                                      basic.componentHandling.getComponentFolder(subname), modul))
        pathnames.append(os.path.join(job.conf["paths"][P.ATTR_PATH_PROGRAM], P.VAL_BASIC, modul))
        pathnames.append(os.path.join(job.conf["paths"][P.ATTR_PATH_PROGRAM], P.VAL_BASIC, subname))
        configpath = getExistingPath(job, pathnames)
        if configpath is not None:
            return configpath
        # fallback lookups with explicit format suffixes
        for format in CONFIG_FORMAT:
            pathname = os.path.join(job.conf["paths"][P.ATTR_PATH_COMPONENTS],
                                    basic.componentHandling.getComponentFolder(name), modul+"."+format)
            if os.path.exists(pathname):
                return pathname
        for format in CONFIG_FORMAT:
            if len(subname) > 1:
                pathname = os.path.join(job.conf["paths"][P.ATTR_PATH_COMPONENTS],
                                        basic.componentHandling.getComponentFolder(name), subname+"."+format)
                if os.path.exists(pathname):
                    return pathname
        raise Exception(P.EXP_CONFIG_MISSING, modul+", "+name)
    elif modul == P.KEY_BASIC:
        # NOTE(review): falls through without raising when nothing exists - confirm intended
        for format in CONFIG_FORMAT:
            pathname = os.path.join(job.conf["paths"][P.ATTR_PATH_COMPONENTS],
                                    P.VAL_CONFIG , name + "."+format)
            if verify: job.debug(verify, "4 " + pathname)
            if os.path.exists(pathname):
                return pathname
    elif modul == P.KEY_TESTCASE:
        for format in CONFIG_FORMAT:
            pathname = os.path.join(job.conf["paths"][P.ATTR_PATH_TDATA],
                                    name, D.DFILE_TESTCASE_NAME + "."+format)
            if verify: job.debug(verify, "4 " + pathname)
            if os.path.exists(pathname):
                return pathname
    elif modul == P.KEY_TESTSUITE:
        for format in CONFIG_FORMAT:
            pathname = os.path.join(job.conf["paths"][P.ATTR_PATH_TDATA],
                                    name, D.DFILE_TESTSUITE_NAME + "." + format)
            if verify: job.debug(verify, "4 " + pathname)
            if os.path.exists(pathname):
                return pathname
    elif modul == P.KEY_CATALOG:
        # catalog lookup: testdata, components, program
        path = getExistingPath(job, os.path.join(job.conf["paths"][P.ATTR_PATH_TDATA],
                                                 P.KEY_CATALOG, name))
        if path is not None:
            return path
        path = getExistingPath(job, os.path.join(job.conf["paths"][P.ATTR_PATH_COMPONENTS],
                                                 P.KEY_CATALOG, name))
        if path is not None:
            return path
        path = getExistingPath(job, os.path.join(job.conf["paths"][P.ATTR_PATH_PROGRAM],
                                                 P.KEY_CATALOG, name))
        if path is not None:
            return path
        raise Exception(P.EXP_CONFIG_MISSING, name)
    else:
        # parameter files: testcase, then testsuite, then the layered basis configs
        pathname = tools.path_tool.composePath(job, P.P_TCPARFILE)
        if verify: job.debug(verify, "7 " + pathname)
        if os.path.exists(pathname):
            return pathname
        pathname = tools.path_tool.composePath(job, P.P_TSPARFILE)
        if verify: job.debug(verify, "8 " + pathname)
        if os.path.exists(pathname):
            return pathname
        # NOTE(review): the "len(subname) > 1" guard makes the basis lookups depend
        # on subname although it is unused in the path - confirm this is intended
        for format in CONFIG_FORMAT:
            if len(subname) > 1:
                pathname = os.path.join(job.conf["paths"][P.ATTR_PATH_RELEASE],
                                        P.VAL_CONFIG, "basis."+format)
                if verify: job.debug(verify, "9 " + pathname)
                if os.path.exists(pathname):
                    return pathname
        for format in CONFIG_FORMAT:
            if len(subname) > 1:
                pathname = os.path.join(job.conf["paths"][P.ATTR_PATH_ENV],
                                        P.VAL_CONFIG, "basis."+format)
                if verify: job.debug(verify, "9 " + pathname)
                if os.path.exists(pathname):
                    return pathname
        for format in CONFIG_FORMAT:
            if len(subname) > 1:
                pathname = os.path.join(job.conf["paths"][P.ATTR_PATH_HOME],
                                        P.VAL_CONFIG, "basis."+format)
                if verify: job.debug(verify, "9 " + pathname)
                if os.path.exists(pathname):
                    return pathname
        raise Exception(P.EXP_CONFIG_MISSING, modul+", "+name)
def getConfValue(attribute, comp):
    """
    Resolve a single configuration value for the given attribute of a component.
    Currently only B.ATTR_CONN_DBTYPE is supported: the dbtype is looked up in
    the component's connection config, first directly, then under "types".
    FIX: the previous version computed dbtype and then discarded it by
    returning "" unconditionally.
    :param attribute: the attribute key to resolve
    :param comp: component whose conf is inspected
    :return: the resolved value, or "" for unsupported attributes
    :raises LookupError: when dbtype is requested but not configured
    """
    if attribute == B.ATTR_CONN_DBTYPE:
        conn = comp.conf[B.SUBJECT_CONN]
        if hasAttr(conn, "dbtype"):
            dbtype = conn["dbtype"]
        elif hasAttr(conn, "types") and hasAttr(conn["types"], "dbtype"):
            dbtype = conn["types"]["dbtype"]
        else:
            raise LookupError("dbtype is not set in comp " + comp.name)
        return dbtype
    return ""
def getAttr(o, name):
    """
    Look up *name* on a dict (as key) or on an object (as attribute).
    Lists are not supported. Returns False when the name is absent.
    """
    if isinstance(o, dict):
        return o[name] if name in o else False
    if isinstance(o, list):
        return False
    if hasattr(o, name):
        return getattr(o, name)
    return False
def hasAttr(o, name):
    """
    Return True when *name* exists as a dict key or object attribute.
    Lists are not supported and always yield False.
    """
    if isinstance(o, dict):
        return name in o
    if isinstance(o, list):
        return False
    return hasattr(o, name)
def getConfig(job, modul, name, subname=""):
    """
    Read the configuration file resolved by getConfigPath into a dict.
    For DDL files (modul == D.DDL_FILENAME) the substructure of *subname*
    is extracted, unwrapping a single enclosing root node if necessary.
    :param job: job context (provides debug level and messaging)
    :param modul: config category, see getConfigPath
    :param name: the specific class/component
    :param subname: optional sub-element (e.g. table name inside a DDL)
    :return: dict with the configuration content (empty on empty path)
    """
    if job is None:
        verify = 24
    else:
        verify = job.getDebugLevel("config_tool")-4
    msg = None
    if hasattr(job, "m"): msg = job.m
    pathname = getConfigPath(job, modul, name, subname)
    confs = {}
    # NOTE(review): job.debug crashes when job is None although the verify
    # branch above anticipates that case - confirm callers never pass None
    job.debug(verify, "getConfig " + pathname)
    if len(pathname) < 1:
        return confs
    doc = tools.file_tool.readFileDict(job, pathname, msg)
    if modul == D.DDL_FILENAME:
        # in csv the root is the subname
        # from the Dict-structure of DDL_FILENAME pick the substructure of the subname
        keys = list(doc.keys())
        if subname not in keys and len(keys) == 1:
            doc0 = doc[keys[0]]
            doc = doc0
            keys = list(doc.keys())
        if subname in keys:
            doc0 = doc[subname]
            doc = doc0
    for i, v in doc.items():
        confs[i] = v
    return confs
def getAttribute(comp, path, attr, job):
    """
    Return one attribute value from the merged attribute list of an
    artifact-element (see getAttributeList), or "" if it is not present.
    """
    attributes = getAttributeList(comp, path, job)
    return attributes[attr] if attr in attributes else ""
def getAttributeList(comp, path, job):
    """
    gets a concrete attribute-list for an arteifact-element from the config-attributes from the connection-attributes
    Priority (first writer wins): connection element-specific, connection
    type-general, artifact element-specific, artifact type-general.
    https://ucarmesin.de/index.php/it/testautomatisierung-fuer-daten-test/225-konfiguration-der-testanwendung#konfigurationshierarchie
    :param comp:
    :param path: artifact-type.artifact-name for example: DB.person
    :return: list of all attributes for the artifact-element
    :raises Exception: when the component has no connection config at all
    """
    attrList = {}
    a = path.split(".")
    artType = a[0]
    artName = a[1]
    if B.SUBJECT_CONN not in comp.conf:
        raise Exception ("Environment is not configured")
    if artType in comp.conf[B.SUBJECT_CONN]:
        # connection attributes: element-specific first, then type-general
        if artName in comp.conf[B.SUBJECT_CONN][artType]:
            for attr, val in comp.conf[B.SUBJECT_CONN][artType][artName].items():
                if attr not in B.LIST_ATTR[artType]:
                    continue
                attrList[attr] = val
        for attr, val in comp.conf[B.SUBJECT_CONN][artType].items():
            if attr not in B.LIST_ATTR[artType]:
                continue
            if attr in attrList:
                continue
            attrList[attr] = val
    if artType in comp.conf[B.SUBJECT_ARTS]:
        # artifact (static config) attributes never overwrite connection ones
        if artName in comp.conf[B.SUBJECT_ARTS][artType]:
            for attr, val in comp.conf[B.SUBJECT_ARTS][artType][artName].items():
                if attr not in B.LIST_ATTR[artType]:
                    continue
                if attr in attrList:
                    continue
                attrList[attr] = val
        for attr, val in comp.conf[B.SUBJECT_ARTS][artType].items():
            if attr not in B.LIST_ATTR[artType]:
                continue
            if attr in attrList:
                continue
            attrList[attr] = val
    return attrList
def mergeConn(msg, conf, conn):
    """
    merges the config-attributes from the connection-attributes
    because the connection-attributes has to overwrite the config-attributes if the subject is configured
    https://ucarmesin.de/index.php/it/testautomatisierung-fuer-daten-test/225-konfiguration-der-testanwendung#konfigurationshierarchie
    FIX: the local variable shadowing the builtin ``list`` was renamed and a
    dead duplicate ``if topic not in conn`` check was removed.
    :param msg: message object used to report misplaced attributes
    :param conf: component configuration (conf[B.SUBJECT_ARTS] is updated)
    :param conn: connection attributes of one instance
    :return: the merged conf
    """
    if B.SUBJECT_INST not in conf:
        conf[B.SUBJECT_INST] = {}
    for a in conn[B.SUBJECT_INST]:
        conf[B.SUBJECT_INST][a] = conn[B.SUBJECT_INST][a]
    # allowed attributes per artifact topic
    topicAttrs = {
        B.TOPIC_NODE_DB: B.LIST_DB_ATTR,
        B.TOPIC_NODE_CLI: B.LIST_CLI_ATTR,
        B.TOPIC_NODE_API: B.LIST_API_ATTR,
        B.TOPIC_NODE_FILE: B.LIST_FILE_ATTR,
    }
    for topic in [B.TOPIC_NODE_DB, B.TOPIC_NODE_CLI, B.TOPIC_NODE_API, B.TOPIC_NODE_FILE]:
        if topic not in conf[B.SUBJECT_ARTS]:
            continue
        attrs = topicAttrs[topic]
        print(" --- merge-conn " + topic + " " + str(attrs))
        if topic not in conn:
            # nothing to merge for this topic (previously re-checked in every
            # loop iteration - hoisted, observable behavior is unchanged)
            continue
        for a in conf[B.SUBJECT_ARTS][topic]:
            if a in attrs:
                if a in conn[topic]:
                    conf[B.SUBJECT_ARTS][topic][a] = conn[topic][a]
            else:
                # artifact-specific sub-node: merge attribute by attribute
                for b in conf[B.SUBJECT_ARTS][topic][a]:
                    print(" --- merge-conn b " + topic + " " + a+" "+b)
                    if b not in attrs:
                        msg.logError("not-topic-attribute in topic-connection: "+topic+", "+b)
                        continue
                    if a not in conn[topic]:
                        continue
                    if b in conn[topic][a]:
                        conf[B.SUBJECT_ARTS][topic][a][b] = conn[topic][a][b]
        # topic-general connection attributes overwrite the config values
        for a in attrs:
            if a in conn[topic]:
                conf[B.SUBJECT_ARTS][topic][a] = conn[topic][a]
    return conf

81
tools/conn_tool.py

@ -0,0 +1,81 @@
# functions about connections to other instances
# -------------------------------------------------------------------
"""
"""
import basic.program
import tools.config_tool
import basic.constants as B
import tools.data_const as D
def getConnection(job, comp, nr):
    """
    Read the connection attributes of one instance of a component.
    :param job: job context carrying configuration and messaging
    :param comp: component name (key below the "env" node)
    :param nr: instance number; looked up as node "inst<nr>"
    :return: dict with the instance's connection attributes (the component's
             "types" node is copied in), or None when not configured
    """
    #job = basic.program.Job.getInstance()
    verify = job.getDebugLevel("conn_tool")
    conn = {}
    if job.conf.confs.get(B.SUBJECT_TOOL).get("connsrc") == D.DFILE_TYPE_YML:
        conn = tools.config_tool.getConfig(job, "tool", B.SUBJECT_CONN)
        xtypes = None
        if ("types" in conn["env"][comp]):
            xtypes = conn["env"][comp]["types"]
        instnr = "inst" + str(nr)
        # NOTE(review): raises KeyError when "inst<nr>" is missing instead of
        # reaching the else-branch below - confirm missing keys cannot occur
        if conn["env"][comp][instnr]:
            if (xtypes is not None):
                conn["env"][comp][instnr]["types"] = xtypes
            return conn["env"][comp][instnr]
        else:
            job.m.setFatal("Conn-Tool: Comp not configured " + comp + " " + str(nr))
    return None
def getConnections(job, comp):
    """
    it reads the connection-attributes for each instances of this component
    general attributes are added to the connection-attributes
    FIX: the loop over the "general" node now copies the individual value
    (general[a]) instead of assigning the whole general dict to every key -
    compare the parallel component-attribute loop below.
    :param job: job context carrying configuration and messaging
    :param comp: component name (string)
    :return: list of connection dicts, one per configured instance
    :raises Exception: when neither job nor comp provides a message object
    """
    verify = job.getDebugLevel("conn_tool")
    msg = None
    if hasattr(comp, "m") and comp.m is not None:
        msg = comp.m
    elif hasattr(job, "m") and job.m is not None:
        msg = job.m
    else:
        raise Exception("message-object is missing")
    msg.debug(verify, "getConnections " + comp)
    conn = {}
    conns = []
    # if a datest-database exists read the connections
    conndb = {}
    if job.conf.confs.get("db"):
        # select
        pass
    conn = tools.config_tool.getConfig(job, "tool", B.SUBJECT_CONN)
    if not comp in conn[B.SUBJECT_ENV]:
        job.m.setFatal("Conn-Tool: Comp not configured " + comp)
    # collect defaults: general attributes, then component-level attributes
    attr = {}
    if B.CONF_NODE_GENERAL in conn[B.SUBJECT_ENV]:
        for a in conn[B.SUBJECT_ENV][B.CONF_NODE_GENERAL]:
            attr[a] = conn[B.SUBJECT_ENV][B.CONF_NODE_GENERAL][a]
    for a in conn[B.SUBJECT_ENV][comp]:
        # skip the instance nodes themselves (inst1, inst2, ...)
        if "inst" in a and a != B.SUBJECT_INST:
            continue
        attr[a] = conn["env"][comp][a]
    for i in range(conn[B.SUBJECT_ENV][comp][B.SUBJECT_INST][B.ATTR_INST_CNT]):
        instnr = "inst" + str(i + 1)
        for a in attr:
            if a in conn["env"][comp][instnr]:
                continue  # dont overwrite an instance-specific value
            conn["env"][comp][instnr][a] = attr[a]
        conns.append(conn["env"][comp][instnr])
    return conns

116
tools/data_const.py

@ -0,0 +1,116 @@
#!/usr/bin/python
"""
constants for used for api-functions
"""
import basic.constants as B
DDL_FILENAME = "DATASTRUCTURE"
DATA_NODE_TYPE = "type"
TYPE_STRING = "string"
TYPE_STR = "str"
TYPE_TEXT = "text"
TYPE_INT = "int"
TYPE_FLOAT = "float"
TYPE_DOUBLE = "double"
TYPE_DATE = "date"
TYPE_TIME = "time"
TYPE_PK = "pk"
# fields in DDL
DDL_FNULLABLE = "nullable"
DDL_FNAME = "field"
DDL_ACCEPTANCE = "acceptance"
DDL_KEY = "key"
DDL_TYPE = "type"
DDL_INDEX = "index"
DFILE_TYPE_YML = "yml"
DFILE_TYPE_JSON = "json"
DFILE_TYPE_CSV = "csv"
DFILE_TYPE_XML = "xml"
DFILE_TESTCASE_NAME = "testspec"
DFILE_TESTSUITE_NAME = "testsuite"
DFILE_TABLE_PREFIX = "table_"
LIST_DFNAME_ATTR = [DFILE_TESTCASE_NAME, DFILE_TESTSUITE_NAME, DFILE_TABLE_PREFIX]
LIST_DFNAME_CONST = ["DFILE_TESTCASE_NAME", "DFILE_TESTSUITE_NAME", "DFILE_TABLE_PREFIX"]
DATA_SRC_DIR = "dir"
DATA_SRC_CSV = "csv"
DATA_ATTR_COUNT = "_count"
""" statistical information of data-count """
DATA_ATTR_DATE = "_date"
""" reference-date for computing the actual date in relation to specification or expectation """
DATA_ATTR_COMP = "_comp"
""" reference to using componente with their object """
DATA_ATTR_CHAR = "_char"
""" character of the data in order to delete it ión initialization """
DATA_ATTR_KEY = "_key"
""" key for a data-specification of a catalog-list - default: the first field is the key """
DATA_ATTR_ALIAS = "_alias"
DATA_ATTR_IDS = "_ids"
DATA_ATTR_REF = "_ref"
LIST_DATA_ATTR = [DATA_ATTR_COUNT, DATA_ATTR_DATE, DATA_ATTR_CHAR, DATA_ATTR_COMP,
DATA_ATTR_REF, DATA_ATTR_IDS, DATA_ATTR_ALIAS, DATA_ATTR_KEY]
LIST_ATTR_CONST = ["DATA_ATTR_COUNT", "DATA_ATTR_DATE", "DATA_ATTR_CHAR", "DATA_ATTR_COMP", "DATA_ATTR_ALIAS", "DATA_ATTR_KEY"]
HEAD_ATTR_DESCR = "decription"
HEAD_ATTR_TARGET = "target"
HEAD_ATTR_USECASE = "usecase"
HEAD_ATTR_UCID = "usecase-id"
HEAD_ATTR_STORY = "story"
HEAD_ATTR_STORYID = "storyid-id"
HEAD_ATTR_APPS = B.SUBJECT_APPS
HEAD_ATTR_DEPR = "deprecated"
LIST_HEAD_ATTR = [HEAD_ATTR_DESCR, HEAD_ATTR_TARGET, HEAD_ATTR_USECASE, HEAD_ATTR_UCID,
HEAD_ATTR_STORY, HEAD_ATTR_STORYID, HEAD_ATTR_APPS, HEAD_ATTR_DEPR]
LIST_HEAD_CONST = ["HEAD_ATTR_DESCR", "HEAD_ATTR_TARGET", "HEAD_ATTR_USECASE", "HEAD_ATTR_UCID",
"HEAD_ATTR_STORY", "HEAD_ATTR_STORYID", "HEAD_ATTR_APPS", "HEAD_ATTR_DEPR"]
CSV_HEADER_START = ["node", "table", "tabelle"]
CSV_DELIMITER = ";"
INTERNAL_DELIMITER = "||"
"""
internal structure of testdata
"""
CSV_SPECTYPE_DATA = "data"
CSV_SPECTYPE_TREE = "tree"
CSV_SPECTYPE_KEYS = "keys"
CSV_SPECTYPE_CONF = "conf"
CSV_NODETYPE_KEYS = "_keys"
CSV_BLOCK_HEAD = "_head"
CSV_BLOCK_OPTION = B.DATA_NODE_OPTION
CSV_BLOCK_STEP = B.DATA_NODE_STEPS
CSV_BLOCK_TABLES = B.DATA_NODE_TABLES
CSV_BLOCK_IMPORT = "_import"
LIST_CSV_BLOCKS = [CSV_BLOCK_HEAD, CSV_BLOCK_OPTION, CSV_BLOCK_STEP, CSV_BLOCK_TABLES, CSV_BLOCK_IMPORT]
LIST_BLOCK_CONST = ["CSV_BLOCK_HEAD", "CSV_BLOCK_OPTION", "CSV_BLOCK_STEP", "CSV_BLOCK_TABLES", "CSV_BLOCK_IMPORT"]
STEP_COMP_I = 1
STEP_EXECNR_I = 2
STEP_REFNR_I = 3
STEP_VARIANT_I = 4
STEP_ARGS_I = 5
STEP_LIST_I = 5
STEP_ATTR_COMP = "component"
STEP_ATTR_EXECNR = "exec-step"
STEP_ATTR_REFNR = "reference-nr"
STEP_ATTR_ARGS = "arguments"
LIST_STEP_ATTR = [STEP_ATTR_COMP, STEP_ATTR_EXECNR, STEP_ATTR_REFNR, STEP_ATTR_ARGS]
LIST_STEP_CONST = ["STEP_ATTR_COMP", "STEP_ATTR_EXECNR", "STEP_ATTR_REFNR", "STEP_ATTR_ARGS"]
EXCP_MALFORMAT = "malformated line: "
ATTR_SRC_TYPE = "tdtyp"
ATTR_SRC_DATA = "tdsrc"
ATTR_SRC_NAME = "tdname"
DEFAULT_DB_PARTITION = "n"
""" attribute if table is partitioned - partitions are parametrized """
DEFAULT_DB_CONN_JAR = "n"
""" attribute for connection-jar-file instead of connection by ip, port """

175
tools/date_tool.py

@ -0,0 +1,175 @@
# functions related to Date-fields
# -----------------------------------------------------
"""
additionally functions for calculating date with formulas like [DATE+2M] and for comparison of date related on two reference-dates
"""
import datetime
import re
import tools.data_const as D
# strftime patterns used throughout the project
F_DIR = "%Y-%m-%d_%H-%M-%S"      # directory names
F_DB_DATE = "%Y-%m-%d"           # database date
F_DB_TIME = "%Y-%m-%d %H:%M:%S"  # database timestamp
F_DE = "%d.%m.%Y"                # German date
F_N8 = "%Y%m%d"                  # compact 8-digit date
F_LOG = "%Y%m%d_%H%M%S"          # log timestamps
F_DE_TSTAMP = "%d.%m.%Y %H:%M:%S"
# month abbreviations for parsing ctime-like strings (English / German)
MONTH_EN = ["jan", "feb", "mar", "apr", "may", "jun", "jul", "aug", "sep", "oct", "nov", "dec"]
MONTH_DE = ["jan", "feb", "mar", "apr", "mai", "jun", "jul", "aug", "sep", "okt", "nov", "dez"]
F_TIME_DEFAULT = F_DIR
def getActdate(format):
    """Return the current date/time rendered with the strftime pattern *format*."""
    now = datetime.datetime.now()
    return getFormatdate(now, format)


def getFormatdate(date, format):
    """Render the datetime *date* as a string according to *format*."""
    formatted = date.strftime(format)
    return formatted
def getFormatDatetupel(dtupel, format):
    """
    Render a (year, mon, day, hour, min, sec) tuple with *format*.
    The compact F_N8 pattern is formatted directly without building a datetime.
    """
    if format == F_N8:
        return "%04d%02d%02d" % (dtupel[0], dtupel[1], dtupel[2])
    y, mo, d, h, mi, s = dtupel[0], dtupel[1], dtupel[2], dtupel[3], dtupel[4], dtupel[5]
    return getFormatdate(datetime.datetime(y, mo, d, h, mi, s), format)
def formatParsedDate(instring, format):
    """Parse *instring* into a date tuple and render it with *format*."""
    parsed = parseDate(instring)
    return getFormatDatetupel(parsed, format)
def parseFormula(instring):
    """
    the function parses the string as a formula. In the formula the input-date - actdate or explicite date -
    will be added resp. subtracted with years, months or dates which are specified in the formula.
    The structure of the formula: DATE +/- mY +/-nM +/-qD
    Unit letters accept German and English: J/Y years, M months, D/T days.
    :param instring: formula wrapped as "{( ... )}"
    :return: tuple (year, mon, day, hour, min, sec)
    """
    instring = instring.upper()
    if instring[2:6] == "DATE":
        # reference date is today
        refdate = datetime.datetime.today()
        formula = instring[7:-2].upper()
    else:
        # reference date is given explicitly before the first blank
        dstring = instring[2:instring.find(" ")]
        res = parseDate(dstring)
        refdate = datetime.datetime(res[0], res[1], res[2], res[3], res[4], res[5])
        formula = instring[2+len(dstring):-2]
    formula = re.sub(r' ', '', formula)
    year = refdate.year
    mon = refdate.month
    day = refdate.day
    hour = refdate.hour
    # NOTE: min shadows the builtin - kept for token-compatibility
    min = refdate.minute
    sec = refdate.second
    if re.match(r"[-+]\d+[JYMDT]", formula):
        ress = re.compile(r"([-+])(\d+)([JYMDT])")
        for res in ress.finditer(formula):
            summand = int(res.group(2))
            if res.group(1) == "-":
                summand = summand * (-1)
            if res.group(3) in "JY":
                year = year + summand
            if res.group(3) in "M":
                # normalize month overflow/underflow into year changes
                mon = mon + summand
                while mon <= 0:
                    mon = mon + 12
                    year = year - 1
                while mon > 12:
                    mon = mon - 12
                    year = year + 1
            if res.group(3) in "DT":
                # day arithmetic is delegated to timedelta to respect month lengths
                refdate = datetime.datetime(year, mon, day, hour, min, sec)
                refdate = refdate + datetime.timedelta(days=summand)
                year = refdate.year
                mon = refdate.month
                day = refdate.day
                hour = refdate.hour
                min = refdate.minute
                sec = refdate.second
        return (year, mon, day, hour, min, sec)
    else:
        print("re matcht nicht")
    return (year, mon, day, hour, min, sec)
def getMonthInt(instring):
    """
    Return the month number (1-12) for an English or German month
    abbreviation, or 0 when the abbreviation is unknown.
    """
    token = instring.lower()
    for months in (MONTH_EN, MONTH_DE):
        for idx, abbrev in enumerate(months, start=1):
            if token == abbrev:
                return idx
    return 0
def parseDate(instring):
    """
    the function parses the string as a date or timestamp which is formed in one of the typical formates
    Supported inputs:
      * formula strings "{(...)}" - delegated to parseFormula
      * "YYYYMMDD_HHMMSS"
      * "<date>_<time>" or "<date> <time>" (both halves parsed recursively)
      * "YYYY-MM-DD" (also "." or "/" separators) and "DD.MM.YYYY"
      * ctime-like "Wed May 18 23:32:55 2022 +0200"
      * "YYYYMMDD" and "HH:MM:SS"
    FIX: the plain "YYYYMMDD" branch returned the components as strings while
    every other branch returns ints; it now converts with int().
    :param the string to be parse:
    :return timestamp as tupel (y, m, d, H, M ,S): unmatched parts stay 0
    """
    year = 0
    mon = 0
    day = 0
    hour = 0
    min = 0
    sec = 0
    if instring[0:2] == "{(" and instring[-2:] == ")}":
        return parseFormula(instring)
    if re.match(r"\d{8}_\d{6}", instring):
        year = int(instring[0:4])
        mon = int(instring[4:6])
        day = int(instring[6:8])
        hour = int(instring[9:11])
        min = int(instring[11:13])
        sec = int(instring[13:])
        return (year, mon, day, hour, min, sec)
    if len(instring) > 8:
        # split "<date>X<time>" on the first "_" or " " after the date part
        for d in ["_", " "]:
            if d in instring and instring.find(d) > 8:
                dstring = instring[0:instring.find(d)]
                tstring = instring[instring.find(d)+1:]
                dres = parseDate(dstring)
                tres = parseDate(tstring)
                return (dres[0], dres[1], dres[2], tres[3], tres[4], tres[5])
    if re.match(r"\d{4}[-./]\d{2}[-./]\d{2}", instring):
        res = re.match(r"(\d{4})[-./](\d{2})[-./](\d{2})", instring)
        year = int(res.group(1))
        mon = int(res.group(2))
        day = int(res.group(3))
        return (year, mon, day, hour, min, sec)
    if re.match(r"\d{1,2}[-./]\d{1,2}[-./]\d{4}", instring):
        res = re.match(r"(\d{1,2})[-./](\d{1,2})[-./](\d{4})", instring)
        year = int(res.group(3))
        mon = int(res.group(2))
        day = int(res.group(1))
        return (year, mon, day, hour, min, sec)
    if re.match(r"\w{3} \w{3}\s+\d{1,2} \d{1,2}[:]\d{1,2}[:]\d{2} \d{4}", instring.strip()):
        res = re.search(r"\w{3} (\w{3})\s+(\d{1,2}) (\d{1,2})[:](\d{1,2})[:](\d{2}) (\d{4})", instring.strip())
        month = res.group(1)
        mon = getMonthInt(month)
        day = int(res.group(2))
        hour = int(res.group(3))
        min = int(res.group(4))
        sec = int(res.group(5))
        year = int(res.group(6))
        return (year, mon, day, hour, min, sec)
    if re.match(r"\d{8}", instring):
        # FIX: convert to int like all other branches (was returning strings)
        year = int(instring[0:4])
        mon = int(instring[4:6])
        day = int(instring[6:8])
        return (year, mon, day, hour, min, sec)
    if re.match(r"\d{2}[-:/]\d{2}[-:/]\d{2}", instring):
        # FIX: the guard now accepts "/" like the capturing regex below it
        res = re.match(r"(\d{2})[-:/](\d{2})[-:/](\d{2})", instring)
        hour = int(res.group(1))
        min = int(res.group(2))
        sec = int(res.group(3))
        return (year, mon, day, hour, min, sec)
    return (year, mon, day, hour, min, sec)

113
tools/file_abstract.py

@ -0,0 +1,113 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# ---------------------------------------------------------------------------------------------------------
# Author : Ulrich Carmesin
# Source : gitea.ucarmesin.de
# ---------------------------------------------------------------------------------------------------------
import os
import basic.program
import tools.config_tool
import basic.constants as B
import basic.toolHandling
import tools.file_tool
import tools.path_tool
class FileFcts():
    """
    this is an abstract class
    Concrete subclasses implement format-specific file handling; the shared
    test-phase operations (init/execute/collect) are sketched here.
    """
    def __init__(self):
        pass

    def setComp(self, job, comp=None):
        # binds the job and optionally the component this instance works for
        self.job = job
        self.comp = comp

    def getMsg(self):
        # prefer the component's message object over the job's
        # NOTE(review): assumes setComp() was called before - confirm
        if self.comp is not None:
            return self.comp.m
        if self.job is not None:
            return self.job.m
        return None

    def loadFile(self, path):
        """
        this function parses the text and translates it to dict
        :param text:
        :return:
        """
        raise Exception(B.EXCEPT_NOT_IMPLEMENT)

    def file2dict(self):
        # placeholder - to be implemented by subclasses
        pass

    def reset_TData(self, job):
        # placeholder - to be implemented by subclasses
        pass

    # functions
    #=== init_testcase ===
    def removeFiles(self):
        # NOTE(review): "envpath" and "pattern" are passed as literals - placeholders?
        tools.file_tool.removeFiles(self.comp.m, "envpath", "pattern", self.comp.conf["conn"])

    def copyFiles(self):
        # NOTE(review): all arguments are empty placeholders so far
        fileList = []
        srcpath = ""
        envpath = ""
        pattern = ""
        tools.file_tool.copyFiles(self.job, fileList, srcpath, envpath, pattern)

    def readEnvFiles(self, job):
        # NOTE(review): envpath/pattern are empty placeholders so far
        envpath = ""
        pattern = ""
        fileList = tools.file_tool.getFiles(self.comp.m, job, envpath, pattern, self.comp.conf["conn"])

    # === execute_testcase ===
    def create_request(self, job, tdata, step):
        """Create a request file for the step and write it into the archive."""
        mapping = ""
        schema = ""
        archivpath = ""
        filename = step.args["filename"]
        txt = ""
        for o in self.comp.conf[B.SUBJECT_ARTS][B.TOPIC_NODE_FILE]:
            if o["name"] != filename:
                continue
            mapping = o["mapping"]
            schema = o["schema"]
            archivpath = os.path.join(tools.path_tool.composePattern(job, "{tcresult}/request", self.comp), filename) # archive: ergebnisse/comp/request
        #txt = self.createDict()
        tools.file_tool.writeFileText(self.comp.m, job, archivpath, txt)

    def send_request(self, job, step):
        """Send the archived request file via cli copy or api call."""
        archivpath = ""
        filename = step.args["filename"]
        technique = step.args["technique"]
        archivpath = os.path.join(tools.path_tool.composePattern(job, "{tcresult}/request", self.comp), filename)
        if technique == "cli":
            # copy the file into the environment path of the matching artifact
            for o in self.comp.conf[B.SUBJECT_ARTS][B.TOPIC_NODE_FILE]:
                if o["name"] != filename:
                    continue
                envpath = o["envpath"]
                envpath = tools.path_tool.composePattern(job, envpath, self.comp)
                fct = basic.toolHandling.getCliTool(job, self.comp)
                fct.copy(self.job, archivpath, envpath)
        elif technique == "api":
            # send the file content and archive the response
            txt = tools.file_tool.readFileText(job, archivpath, self.comp.m)
            fct = basic.toolHandling.getApiTool(job, self.comp)
            response = fct.send(self.job, self.comp, txt)
            archivpath = os.path.join(tools.path_tool.composePattern(job, "{tcresult}/response", self.comp), filename)
"""
get_response:
- channel (sync/async)
... implement
- archivpath ( ergebnisse/comp/response )
- envpath ( ./log) / envconn ( = in Request empfangen )
=== collect_testcase ===
- envpath
- pattern
> readfiles
"""

295
tools/file_tool.py

@ -0,0 +1,295 @@
# Funktionen zum Dateizugriff mit Suchen, Lesen, Schreiben
# ------------------------------------------------------------
"""
"""
import codecs
import json
import os
import os.path
import re
import time
import xmltodict
import yaml
import platform
import basic.toolHandling
import basic.program
import tools.data_const as D
#import tools.tdata_tool
import tools.date_tool
def getDump(obj):
    """
    Return a string dump of the object's attribute dictionary.

    Bug fix: vars() raises TypeError for objects without a __dict__
    (ints, strings, slotted classes, ...); fall back to str(obj) there.
    The stray debug print of the type was removed.
    :param obj: any object
    :return: string representation of vars(obj) or str(obj)
    """
    try:
        return str(vars(obj))
    except TypeError:
        return str(obj)
def getFiles(msg, job, path, pattern, conn):
    """
    Search filenames in a directory; when a remote connection is given,
    delegate to getRemoteFiles.
    :param msg: msg-object with .debug()
    :param job: job-object with .getDebugLevel()
    :param path: directory to scan
    :param pattern: regex searched within each filename
    :param conn: optional remote connection
    :return: list of matching filenames (not full paths)
    """
    if conn is not None:
        return getRemoteFiles(msg, path, pattern, conn)
    verify = int(job.getDebugLevel("file_tool"))
    matches = []
    msg.debug(verify, "getFiles " + path + " , " + pattern)
    if not os.path.exists(path):
        return matches
    for name in os.listdir(path):
        msg.debug(verify, "getFiles " + name)
        if not re.search(pattern, name):
            continue
        msg.debug(verify, "match " + name)
        matches.append(name)
    return matches
def removeFiles(msg, path, pattern, conn):
    """
    search filenames in the directory and removes it
    - if conn is set search remote
    :param msg: -- msg-Objekt
    :param path: -- path - String
    :param pattern: -- filename as Pattern
    :param conn:
    :return: Array filenames
    """
    # TODO(review): not implemented yet - callers currently get None
    pass
def copyFiles(job, fileList, source, target, comp):
    """
    copies files from source to target
    :param job:
    :param fileList: filenames to copy
    :param source: source directory
    :param target: target directory
    :param comp: component the files belong to
    :return:
    """
    # TODO(review): not implemented yet - callers currently get None
    pass
def getRemoteFiles(msg, path, pattern, conn):
    """
    search filenames in a remote directory addressed by conn
    :param msg: -- msg-Objekt
    :param path: -- path - String
    :param pattern: -- filename as Pattern
    :param conn: remote connection attributes
    :return: Array filenames
    """
    # TODO(review): not implemented yet - getFiles() delegates here when conn is set
    pass
def getFilesRec(msg, job, path, pattern):
    """
    Search files below a directory recursively.
    :param msg: msg-object with .debug()
    :param job: job-object with .getDebugLevel()
    :param path: root directory
    :param pattern: regex searched within each filename
    :return: list of matching files as absolute paths
    """
    verify = int(job.getDebugLevel("file_tool"))
    hits = []
    msg.debug(verify, "getFilesRec " + path + " , " + pattern)
    for (root, subdirs, filenames) in os.walk(path):
        for name in filenames:
            msg.debug(verify, "getFilesRec " + name)
            if re.search(pattern, name):
                msg.debug(verify, "match " + name)
                hits.append(os.path.join(root, name))
    return hits
def getTree(msg, job, pfad):
    """
    Build a nested dict of the directory tree below pfad; the plain files
    of each directory are collected under the key "_files_".
    Bug fix: os.path.isDir / os.path.isFile do not exist (AttributeError);
    the correct names are os.path.isdir / os.path.isfile.
    :param msg: msg-object with .debug()
    :param job: job-object with .getDebugLevel()
    :param pfad: root directory
    :return: nested dict {subdir: {...}, "_files_": [filenames]}
    """
    verify = int(job.getDebugLevel("file_tool"))
    msg.debug(verify, "getTree " + pfad)
    tree = {}
    files = []
    for f in os.listdir(pfad):
        if os.path.isdir(os.path.join(pfad, f)):
            tree[f] = getTree(msg, job, os.path.join(pfad, f))
        elif os.path.isfile(os.path.join(pfad, f)):
            files.append(f)
    tree["_files_"] = files
    return tree
def mkPaths(job, path, msg):
    """
    Create the parent directory of path when it does not exist yet.
    :param job: job-object with .getDebugLevel()
    :param path: target file path whose directory should exist
    :param msg: msg-object (unused here)
    """
    verify = int(job.getDebugLevel("file_tool"))
    parent = os.path.dirname(path)
    if not os.path.exists(parent):
        os.makedirs(parent, exist_ok=True)
def getFileEncoding(msg, job, path):
print("--- getFileEncoding " + path)
encodings = ['utf-8', 'iso-8859-1'] # add more
for e in encodings:
print(e)
try:
fh = codecs.open(path, 'r', encoding=e)
fh.readlines()
fh.seek(0)
except UnicodeDecodeError:
print('got unicode error with %s , trying different encoding' % e)
except:
print("except")
else:
print('opening the file with encoding: %s ' % e)
return e
return detectFileEncode(job, path, msg)
def detectFileEncode(job, path, msg):
    """
    Guess a file's encoding ('iso-8859-1' or 'utf-8') from byte statistics.
    Bytes typical for latin-1 umlauts count as ISO evidence, other bytes
    above 127 as UTF-8 evidence.
    Bug fix: the ISO bytes were additionally counted as UTF-8 evidence
    (two independent if-statements), so cntIso could never exceed cntUtf
    and 'iso-8859-1' was unreachable; the checks are now exclusive.
    Also removed: debug print and the unused j/l counters.
    :param job: job-object with .getDebugLevel()
    :param path: file to inspect
    :param msg: msg-object (unused)
    :return: 'iso-8859-1' or 'utf-8'
    """
    verify = int(job.getDebugLevel("file_tool"))
    cntIso = 0
    cntUtf = 0
    CHAR_ISO = [196, 228, 214, 246, 220, 252, 191]  # latin-1 umlauts and inverted question mark
    with open(path, 'rb') as file:
        byte = file.read(1)
        while byte:
            i = int.from_bytes(byte, "little")
            if i in CHAR_ISO:
                cntIso += 1
            elif i == 160:
                pass  # non-breaking space - no evidence either way
            elif i > 127:
                cntUtf += 1
            byte = file.read(1)
    if cntIso > cntUtf:
        return 'iso-8859-1'
    return 'utf-8'
def readFileLines(job, path, msg):
    """
    Read the file and return its content as a list of lines;
    an unreadable/missing file yields an empty list.
    """
    content = readFileText(job, path, msg)
    return content.splitlines() if isinstance(content, str) else []
def readFileText(job, path, msg):
    """
    Read the whole file as text; the encoding is auto-detected.
    Returns "" when the file does not exist.
    Fix: the explicit close() inside the with-block was redundant -
    the context manager already closes the handle.
    :param job: job-object with .getDebugLevel()
    :param path: file to read
    :param msg: msg-object (passed to the encoding detection)
    :return: file content as str, "" for a missing file
    """
    verify = int(job.getDebugLevel("file_tool"))
    if not os.path.exists(path):
        return ""
    enc = detectFileEncode(job, path, msg)
    with open(path, 'r', encoding=enc) as file:
        text = file.read()
    return text
def getModTime(job, filepath):
    """
    Return the file's last-modification time, formatted with the
    project default time format (tools.date_tool.F_TIME_DEFAULT).
    :param job: job-object (unused here)
    :param filepath: file to inspect
    :return: formatted timestamp string
    """
    out = ""
    mtime = os.path.getmtime(filepath)
    out = tools.date_tool.formatParsedDate(time.ctime(mtime), tools.date_tool.F_TIME_DEFAULT)
    return out
def readFileDict(job, path, msg):
    """
    reads and gets general a dict from any kind of filetyp
    (yaml, json, xml or csv - selected by the file extension)
    :param job: job-object, used for debug-level and the csv file-tool
    :param path: with extension of filetype
    :param msg: optionally
    :return: parsed dict; empty dict when the file does not exist
    """
    # 20220329 generalize
    # job = basic.program.Job.getInstance()
    verify = int(job.getDebugLevel("file_tool"))
    doc = {}
    if not os.path.exists(path):
        return doc
    enc = detectFileEncode(job, path, msg)
    # NOTE(review): the extension checks use different slice widths
    # ([-4:] vs [-5:]) - verify they match the DFILE_TYPE_* constants
    if D.DFILE_TYPE_YML in path[-4:]:
        with open(path, 'r', encoding=enc) as file:
            doc = yaml.full_load(file)
            file.close()
    elif D.DFILE_TYPE_JSON in path[-5:]:
        with open(path, 'r', encoding=enc) as file:
            doc = json.load(file)
            file.close()
    elif D.DFILE_TYPE_XML in path[-4:]:
        with open(path, 'r', encoding=enc) as file:
            res = xmltodict.parse(file.read())
            # doc = dict(res)
            # convert the OrderedDict result into plain dicts/lists
            doc = castOrderedDict(res)
            file.close()
    elif D.DFILE_TYPE_CSV in path[-5:]:
        # csv-files are parsed by the registered file-tool
        ffcts = basic.toolHandling.getFileTool(job, None, D.DFILE_TYPE_CSV)
        #doc = tools.tdata_tool.getCsvSpec(msg, job, path, D.CSV_SPECTYPE_CONF)
        doc = ffcts.loadFile(path)
    # tools.tdata_tool.getCsvSpec(msg, job, path, D.CSV_SPECTYPE_CONF)
    return doc
def castOrderedDict(res, job=None, key=""):
    """
    Recursively convert a structure of (Ordered)dicts and lists - as
    returned e.g. by xmltodict - into plain dicts and lists.
    :param res: structure to convert
    :param job: unused, kept for the common tool signature
    :param key: name of the current node (handed down on recursion)
    :return: converted structure; scalars are returned unchanged
    """
    if isinstance(res, dict):
        converted = dict(res)
        for name in converted:
            converted[name] = castOrderedDict(converted[name], job, name)
        return converted
    if isinstance(res, list):
        return [castOrderedDict(item, job, "") for item in res]
    return res
def writeFileText(msg, job, path, text, enc="utf-8"):
    """
    Write text to path, creating missing parent directories first.
    Fix: the explicit close() inside the with-block was redundant -
    the context manager already closes the handle.
    :param msg: msg-object (passed to mkPaths)
    :param job: job-object with .getDebugLevel()
    :param path: target file
    :param text: content to write
    :param enc: file encoding, default utf-8
    """
    verify = int(job.getDebugLevel("file_tool"))
    mkPaths(job, path, msg)
    with open(path, 'w', encoding=enc) as file:
        file.write(text)
def writeFileDict(msg, job, path, dict, enc="utf-8"):
    """
    Write a dict to path as yaml, json or xml - selected by the
    file extension; missing parent directories are created first.
    NOTE(review): the yml check uses path[-5:] here while readFileDict
    uses path[-4:] - verify against the DFILE_TYPE_* constants; files
    with an unknown extension are silently not written.
    :param msg: msg-object (passed to mkPaths)
    :param job: job-object
    :param path: target file with extension
    :param dict: data to serialize (shadows the builtin name)
    :param enc: file encoding, default utf-8
    """
    # job = basic.program.Job.getInstance()
    mkPaths(job, path, msg)
    if D.DFILE_TYPE_YML in path[-5:]:
        with open(path, 'w', encoding=enc) as file:
            yaml.dump(dict, file)
            file.close()
    elif D.DFILE_TYPE_JSON in path[-5:]:
        with open(path, 'w', encoding=enc) as file:
            doc = json.dumps(dict, indent=4)
            file.write(doc)
            file.close()
    elif D.DFILE_TYPE_XML in path[-4:]:
        with open(path, 'w', encoding=enc) as file:
            text = xmltodict.unparse(dict, pretty=True)
            # ensure an xml-declaration at the top of the file
            if "<?xml version=" not in text:
                text = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n" + text
            file.write(text)
            file.close()

246
tools/filecsv_fcts.py

@ -0,0 +1,246 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# ---------------------------------------------------------------------------------------------------------
# Author : Ulrich Carmesin
# Source : gitea.ucarmesin.de
# ---------------------------------------------------------------------------------------------------------
import re
import basic.program
import tools.file_abstract
import basic.constants as B
import tools.data_const as D
import tools.file_tool
class FileFcts(tools.file_abstract.FileFcts):
    """
    file-tool implementation for csv-files: parses the datest csv-format
    (attribute-, head-, option-, step-, import- and table-blocks) into a dict.
    """

    def __init__(self):
        pass

    def loadFile(self, path):
        """
        this function parses the text and translates it to dict
        :param path: filename of the csv-file
        :return: parsed dict
        """
        lines = tools.file_tool.readFileLines(self.job, path, self.getMsg())
        return self.parseCsv(self.getMsg(), self.job, lines)

    def parseCsv(self, msg, job, lines, ttype=""):
        """
        parses csv-lines into a dict structure
        :param msg: message-object
        :param job: job-object
        :param lines: list of csv-lines
        :param ttype: spec-type (data/conf/keys/tree); may be switched to
                      "keys" by an attribute-line inside the file
        :return: dict with the parsed blocks (head, option, step, tables)
        """
        tdata = {}
        status = "start"       # parser state: start / TABLE_ALIAS / data
        verbose = False
        tableAttr = {}  # table
        tableDict = {}  # table
        for l in lines:
            if verbose: print("lines "+l)
            fields = splitFields(l, D.CSV_DELIMITER, job)
            # check empty line, comment
            if (len(fields) < 1) or (len(l.strip().replace(D.CSV_DELIMITER,"")) < 1):
                status = "start"
                continue
            if (fields[0][0:1] == "#"):
                continue
            a = fields[0].lower().split(":")
            # keywords option, step, table
            if verbose: print(str(a)+" -- "+str(fields))
            tableAttr = setTableAttribute(tableAttr, a[0], fields[1], job)
            if a[0].lower() in D.LIST_DATA_ATTR:
                status = "TABLE_ALIAS"
                # a key-attribute switches the spec-type to "keys"
                if a[0].lower() == D.DATA_ATTR_KEY:
                    ttype = D.CSV_SPECTYPE_KEYS
                continue
            if (a[0].lower() in [D.CSV_BLOCK_HEAD]):
                if verbose: print("head "+l)
                setTdataLine(tdata, fields, D.CSV_BLOCK_HEAD, job)
                status = "start"
                continue
            elif (a[0].lower() == D.CSV_BLOCK_OPTION):
                if verbose: print("option " + l)
                setTdataLine(tdata, fields, D.CSV_BLOCK_OPTION, job)
                status = "start"
                continue
            elif (a[0].lower() == D.CSV_BLOCK_STEP):
                if verbose: print("step "+l)
                # NOTE(review): basic.step is not in the visible imports - confirm it is imported
                step = basic.step.parseStep(job, fields)
                if D.CSV_BLOCK_STEP not in tdata:
                    tdata[D.CSV_BLOCK_STEP] = []
                tdata[D.CSV_BLOCK_STEP].append(step)
                status = "start"
                continue
            elif (a[0].lower() == D.CSV_BLOCK_IMPORT):
                if verbose: print("includes " + l)
                if D.CSV_BLOCK_IMPORT not in tdata:
                    tdata[D.CSV_BLOCK_IMPORT] = []
                tdata[D.CSV_BLOCK_IMPORT].append(fields[1])
                status = "start"
                continue
            elif (a[0].lower() == D.CSV_BLOCK_TABLES) or (a[0].lower() in D.CSV_HEADER_START):
                if verbose: print("tables "+l)
                h = a
                h[0] = B.DATA_NODE_TABLES
                if ttype == D.CSV_SPECTYPE_CONF:
                    del h[0]
                tableDict = getTdataContent(msg, tdata, h)
                setTableHeader(tableDict, tableAttr, fields, ttype, job)
                status = D.CSV_SPECTYPE_DATA
            elif (status == D.CSV_SPECTYPE_DATA):
                # NOTE(review): h is only set by a preceding table-header line;
                # a data-line without one would raise NameError - confirm
                tableDict = getTdataContent(msg, tdata, h)
                if verbose: print("setTableData "+str(h)+" "+str(tableDict))
                setTableData(tableDict, fields, ttype, job)
            elif (status == "TABLE_ALIAS") and D.DATA_ATTR_ALIAS in tdata:
                # table addressed by an alias-attribute instead of a header line
                alias = tdata[D.DATA_ATTR_ALIAS]
                b = alias.split(":")
                h = [B.DATA_NODE_TABLES] + b
                tableDict = getTdataContent(msg, tdata, h)
                tableDict[D.DATA_ATTR_ALIAS] = alias
                fields = [alias] + fields
                setTableHeader(tableDict, tableAttr, fields, ttype, job)
                status = D.CSV_SPECTYPE_DATA
        if ttype == D.CSV_SPECTYPE_CONF:
            # for conf-specs: drop row-data and rebuild the header from the keys
            header = []
            for k in tdata:
                if k in D.LIST_DATA_ATTR:
                    continue
                if B.DATA_NODE_DATA in tdata[k]:
                    tdata[k].pop(B.DATA_NODE_DATA)
                for f in tdata[k]:
                    if f in [B.DATA_NODE_HEADER, "_hit"] + D.LIST_DATA_ATTR:
                        continue
                    header.append(f)
                tdata[k][B.DATA_NODE_HEADER] = header
                header = []
        # flatten a doubled _tables_ level
        if B.DATA_NODE_TABLES in tdata and B.DATA_NODE_TABLES in tdata[B.DATA_NODE_TABLES]:
            for k in tdata[B.DATA_NODE_TABLES][B.DATA_NODE_TABLES]:
                if k in tdata[B.DATA_NODE_TABLES]:
                    if verbose: print("Error")
                else:
                    tdata[B.DATA_NODE_TABLES][k] = tdata[B.DATA_NODE_TABLES][B.DATA_NODE_TABLES][k]
            tdata[B.DATA_NODE_TABLES].pop(B.DATA_NODE_TABLES)
        return tdata
def splitFields(line, delimiter, job):
    """
    Split a csv-line at the delimiter; stop at the first comment field
    (starting with '#') and strip surrounding double quotes per field.
    :param line: csv-line
    :param delimiter: field separator
    :param job: unused, kept for the common tool signature
    :return: list of field values
    """
    result = []
    for raw in line.split(delimiter):
        if raw[0:1] == "#":
            break
        if re.match(r"^\"(.*)\"$", raw):
            raw = raw[1:-1]
        result.append(raw)
    return result
def setTableAttribute(tableAttr, key, val, job):
    """
    store val under key in tableAttr if key is one of the known
    data-attributes; "_hit" marks whether the key matched.
    :param tableAttr: dict of collected table attributes
    :param key: candidate attribute name
    :param val: attribute value (stripped before storing)
    :param job: unused
    :return: the (possibly updated) tableAttr dict
    """
    for attr in D.LIST_DATA_ATTR:
        if (key.lower() == attr):
            tableAttr[attr] = val.strip()
            tableAttr["_hit"] = True
            return tableAttr
    tableAttr["_hit"] = False
    return tableAttr
def setTdataLine(tdata, fields, block, job):
    """
    sets field(s) into tdata as a key-value-pair
    additional fields will be concatenate to a intern separated list
    :param tdata: target dict
    :param fields: csv-fields; fields[0] may carry sub-keys after ":"
    :param block: normalized block-keyword replacing the first sub-key
    :param job:
    :return: tdata
    """
    a = fields[0].lower().split(":")
    a[0] = block  # normalized key
    val = ""
    # NOTE(review): range stops at len(fields)-1, so the last field is
    # dropped - presumably to skip a trailing delimiter field; confirm
    for i in range(1, len(fields)-1):
        val += D.INTERNAL_DELIMITER+fields[i]
    if len(val) > len(D.INTERNAL_DELIMITER):
        # cut the leading internal delimiter
        val = val[len(D.INTERNAL_DELIMITER):]
    setTdataContent(job.m, tdata, val, a)
    return tdata
def setTdataContent(msg, data, tabledata, path):
    """
    Store tabledata in the nested dict `data` at the node addressed by
    path (2 to 4 keys); intermediate nodes are created on demand.
    :param msg: msg-object (passed to the structure helper)
    :param data: target dict
    :param tabledata: value to store
    :param path: list of 2-4 keys; other lengths are ignored
    """
    setTdataStructure(msg, data, path)
    if not 2 <= len(path) <= 4:
        return
    node = data
    for step in path[:-1]:
        node = node[step]
    node[path[-1]] = tabledata
def setTdataStructure(msg, data, path):
    """
    Ensure that nested dict nodes exist in `data` for the first (up to
    four) keys of path; existing nodes are left untouched.
    :param msg: msg-object (unused)
    :param data: dict to extend in place
    :param path: list of keys describing the nesting
    :return: data
    """
    node = data
    for step in path[0:4]:
        if step not in node:
            node[step] = {}
        node = node[step]
    return data
def getTdataContent(msg, data, path):
    """
    Return the nested node of `data` addressed by path (1 to 4 keys);
    missing intermediate nodes are created first. Longer or empty paths
    yield None.
    :param msg: msg-object (passed to the structure helper)
    :param data: dict to read from
    :param path: list of keys
    :return: the addressed node or None
    """
    setTdataStructure(msg, data, path)
    if not 1 <= len(path) <= 4:
        return None
    node = data
    for step in path:
        node = node[step]
    return node
def setTableHeader(tableDict, tableAttr, fields, ttype, job):
    """
    store the column header (fields[1:]) and the collected attributes in
    tableDict and prepare its row-data container depending on ttype.
    :param tableDict: table node to fill
    :param tableAttr: previously collected table attributes
    :param fields: csv-fields of the header line
    :param ttype: spec-type (tree/keys/other)
    :param job: unused
    :return: tableDict
    """
    header = []
    for i in range(1, len(fields)):
        header.append(fields[i].strip())
    tableDict[B.DATA_NODE_HEADER] = header
    for attr in tableAttr:
        tableDict[attr] = tableAttr[attr]
    # preparate the sub-structure for row-data
    if ttype == D.CSV_SPECTYPE_TREE:
        tableDict[B.DATA_NODE_DATA] = {}
    elif ttype == D.CSV_SPECTYPE_KEYS:
        tableDict[D.CSV_NODETYPE_KEYS] = {}
        # the key-column defaults to 1 and may be chosen by attribute
        tableDict[D.DATA_ATTR_KEY] = 1
        if D.DATA_ATTR_KEY in tableAttr:
            tableDict[D.DATA_ATTR_KEY] = header.index(tableAttr[D.DATA_ATTR_KEY]) + 1
    else:
        tableDict[B.DATA_NODE_DATA] = []
    return tableDict
def setTableData(tableDict, fields, ttype, job):
    """
    append one csv-row to the table structure; fields[0] carries the
    component-relation ("comp:step[,comp:step...]"), fields[1:] the
    column values in header order.
    NOTE(review): assumes len(fields) > len(header) and that each
    comp-entry contains a ":" - short rows raise IndexError; confirm.
    :param tableDict: table node created by setTableHeader
    :param fields: csv-fields of the data line
    :param ttype: spec-type (data/keys/conf)
    :param job: unused
    :return: tableDict
    """
    row = {}
    if ttype == D.CSV_SPECTYPE_DATA and ":" not in fields[0] and D.DATA_ATTR_ALIAS in tableDict:
        # rows without a component-prefix get the table-alias prepended
        fields = [tableDict[D.DATA_ATTR_ALIAS]] + fields
    i = 1
    for f in tableDict[B.DATA_NODE_HEADER]:
        row[f] = fields[i].strip()
        i += 1
    if ttype == D.CSV_SPECTYPE_DATA:
        if B.ATTR_DATA_COMP in tableDict:
            tcomps = tableDict[B.ATTR_DATA_COMP]
        else:
            tcomps = {}
        row[B.ATTR_DATA_COMP] = {}
        # fields[0] e.g. "compA:step1,compB:step2"
        for c in fields[0].split(","):
            a = c.split(":")
            tcomps[a[0]] = a[1]
            row[B.ATTR_DATA_COMP][a[0]] = a[1].strip()
        tableDict[B.DATA_NODE_DATA].append(row)
        tableDict[B.ATTR_DATA_COMP] = tcomps
    elif ttype == D.CSV_SPECTYPE_KEYS:
        # index the row by its key-column
        tableDict[D.CSV_NODETYPE_KEYS][fields[tableDict[D.DATA_ATTR_KEY]].strip()] = row
    elif ttype == D.CSV_SPECTYPE_CONF:
        tableDict[fields[1]] = row
    return tableDict

102
tools/job_tool.py

@ -0,0 +1,102 @@
# GrundFunktionen zur Ablaufsteuerung
#
# --------------------------------------------------------
"""
1. Programm -- implementiert in Main-Klasse
2. Anwndung -- steuert zu pruefende System [ in basis_Config ]
3. application -- steuert zu pruefende Maschine [ in dir/applicationen ]
4. release -- steuert zu prufendes Release [ aus dir/release kann spez. release_Config geladen werden, dir/lauf/release ]
5. ~Verz -- Dokumentationsverzeichnis zu Testlauf/Testfall/Soll-Branch
6. zyklus -- optional unterscheidet echte und entwicklungsLaeufe
7. Programmspezifische Parameter
8. loglevel -- steuert Protokollierung; default debug (fatal/error/warn/msg/info/debug1/debug2/trace1/trace2)
10. Laufart -- steuert die Verarbeitung; default echt
- echt-auto Lauf aus Automatisierung (1-7)
- test Lauf ohne Ausfuehrungen am Testsystem, wohl aber in Testverzeichnissen
- echt-spez Wiederholung einer spezifischen Funktion (1-13)
- unit Ausfuehrung der Unittests
11. Modul -- schraenkt Verarbeitung auf parametriserte componenten ein
12. Funktion -- schraenkt Verarbeitung auf parametriserte Funktionen ein
13. Tool -- schraenkt Protokollierung/Verarbeitung auf parametriserte Tools ein
"""
import basic.program
import basic.constants as B
try:
import collect_testcase
import compare_testcase
import execute_testcase
import finish_testsuite
import init_testcase
import init_testsuite
import test_executer
except Exception as e:
pass
import tools.path_tool
import tools.file_tool
import components.tools.job_tool
def hasModul(komp):
    """check whether processing is restricted to the given component - stub, always False"""
    #job = Job.getInstance()
    return False
def hasFunction(fct):
    """check whether processing is restricted to the given function - stub, always False"""
    #job = Job.getInstance()
    return False
def hasTool(tool):
    """check whether logging/processing is restricted to the given tool - stub, always False"""
    #job = Job.getInstance()
    return False
def createJob(parentJob, jobargs):
    """
    create and start a temporary child job with the given arguments.
    Bug fix: the job was created and started but never returned (bare
    return), so callers always received None.
    :param parentJob: the calling job - NOTE(review): currently unused, confirm
    :param jobargs: dict of parameter-arguments for the new job
    :return: the started job
    """
    job = basic.program.Job("temp")  # meaning temp
    job.par.setParameterArgs(job, jobargs)
    job.startJob()
    return job
def startJobProcesses(job):
    """ function to open processes like db-connection """
    # delegates to the component-specific implementation
    components.tools.job_tool.startJobProcesses(job)
    pass
def stopJobProcesses(job):
    """ function to close processes like db-connection """
    # delegates to the component-specific implementation
    components.tools.job_tool.stopJobProcesses(job)
    pass
def startProcess(job, process):
    """
    start the given process-description as a new job: the process dict is
    persisted to the user's actual json-file, translated into job-arguments
    and the matching program's startPyJob() is called.
    :param job: the actual (parent) job
    :param process: dict with keys app, env, entity, program and
                    optionally step and tcdir/tsdir
    """
    print(str(process))
    path = tools.path_tool.getActualJsonPath(job)
    print("------- "+path)
    tools.file_tool.writeFileDict(job.m, job, path, process)
    jobargs = {}
    jobargs[B.PAR_APP] = process["app"]
    jobargs[B.PAR_ENV] = process["env"]
    if B.PAR_STEP in process:
        jobargs[B.PAR_STEP] = process[B.PAR_STEP]
    if B.PAR_TCDIR in process:
        # testcase-execution
        jobargs[B.PAR_TCDIR] = process[B.PAR_TCDIR]
        jobargs[B.PAR_TESTCASE] = process["entity"]
    elif B.PAR_TSDIR in process:
        # testsuite-execution
        jobargs[B.PAR_TSDIR] = process[B.PAR_TSDIR]
        jobargs[B.PAR_TESTSUITE] = process["entity"]
    print("process-programm "+process["program"])
    myjob = basic.program.Job(process["program"], jobargs)
    myjob.startJob()
    # dispatch to the program module - NOTE(review): these modules are
    # imported at the top inside try/except and may be missing at runtime
    if process["program"] == "init_testcase":
        init_testcase.startPyJob(myjob)
    elif process["program"] == "execute_testcase":
        execute_testcase.startPyJob(myjob)
    elif process["program"] == "collect_testcase":
        collect_testcase.startPyJob(myjob)
    elif process["program"] == "compare_testcase":
        compare_testcase.startPyJob(myjob)
    elif process["program"] == "init_testsuite":
        init_testsuite.startPyJob(myjob)
    elif process["program"] == "execute_testsuite":
        print("execute_testsuite.startPyJob(myjob) not implemented")
    elif process["program"] == "collect_testsuite":
        print("collect_testsuite.startPyJob(myjob) not implemented")
    elif process["program"] == "finish_testsuite":
        finish_testsuite.startPyJob(myjob)
    elif process["program"] == "test_executer":
        test_executer.startPyJob(myjob)

115
tools/path_const.py

@ -0,0 +1,115 @@
import basic.constants as B
# -------------------------------------------------------------
# values and keywords
KEY_PRECOND = "precond"
KEY_POSTCOND = "postcond"
KEY_RESULT = "result"
KEY_ORIGIN = "origin"
KEY_PARTS = "parts"
KEY_SUMFILE = "sumfile"
KEY_BACKUP = "backup"
KEY_REFFILE = "reffile"
KEY_TESTCASE = "tc"
KEY_TESTSUITE = "ts"
KEY_CATALOG = "catalog"
KEY_DEBUGNAME = "debugname"
KEY_LOGNAME = "logname"
KEY_BASIC = "basic"
""" keyword for basic config in components """
KEY_COMP = "comp"
""" keyword for individual component """
KEY_TOOL = "tool"
""" keyword for technical tools """
VAL_UTIL = "utils"
""" subdir for any technical tools """
VAL_CONFIG = "config"
""" subdir for any place of config-files """
VAL_COMPS = "components"
""" subdir for the plugin components """
VAL_BASIC = "basic"
""" subdir for the basic job-framework """
VAL_BASE_DATA = "data"
""" subdir for the basis data-folder """
VAL_TDATA = "testdata"
""" subdir for the basis data-folder """
# -------------------------------------------------------------
# parameter with arguments
PAR_APP = "job.par." + B.PAR_APP
PAR_ENV = "job.par." + B.PAR_ENV
PAR_REL = "job.par." + B.PAR_REL
PAR_TSDIR = "job.par." + B.PAR_TSDIR
PAR_TCDIR = "job.par." + B.PAR_TCDIR
PAR_XPDIR = "job.par." + B.PAR_XPDIR
PAR_TDTYP = "job.par." + B.PAR_TDTYP
PAR_TDSRC = "job.par." + B.PAR_TDSRC
PAR_TDNAME = "job.par." + B.PAR_TDNAME
PAR_LOG = "job.par." + B.PAR_LOG
PAR_MODUS = "job.par." + B.PAR_MODUS
PAR_COMP = "job.par." + B.PAR_COMP
PAR_FCT = "job.par." + B.PAR_FCT
PAR_TOOL = "job.par." + B.PAR_TOOL
PAR_STEP = "job.par." + B.PAR_STEP
PAR_DESCRIPT = "job.par." + B.PAR_DESCRIPT
PAR_TESTCASE = "job.par." + B.PAR_TESTCASE
PAR_TESTCASES = "job.par." + B.PAR_TESTCASES
PAR_TESTSUITE = "job.par." + B.PAR_TESTSUITE
PAR_TCTIME = "job.par." + B.PAR_TCTIME
PAR_TSTIME = "job.par." + B.PAR_TSTIME
PAR_TESTINSTANCES = "job.par." + B.PAR_TESTINSTANCES
# -------------------------------------------------------------
# attributes
ATTR_PATH_MODE = "mode"
""" This constant defines the home-folder in filesystem of test """
ATTR_PATH_HOME = "home"
""" This constant defines the home-folder in testing-filesystem """
ATTR_PATH_DEBUG = "debugs"
""" This constant defines the debug-folder in testing-filesystem """
ATTR_PATH_ARCHIV = "archiv"
""" This constant defines the folder in testing-filesystem for results and log of execution """
ATTR_PATH_PROGRAM = "program"
""" This constant defines the program-folder in the workspace """
ATTR_PATH_COMPONENTS = "components"
""" This constant defines the program-folder in the workspace """
ATTR_PATH_ENV = "environment"
""" This constant defines the folder in testing-filesystem, used for configs related to environments """
ATTR_PATH_RELEASE = "release"
""" This constant defines the folder in testing-filesystem, used for configs related to release """
ATTR_PATH_TDATA = "testdata"
""" This constant defines the folder in testing-filesystem with the testcase-specifications """
ATTR_PATH_TEMP = "temp"
""" This constant defines the debug-folder in testing-filesystem """
ATTR_PATH_PATTN = "pattern"
""" This constant defines the debug-folder in testing-filesystem """
# -------------------------------------------------------------
# structure - nodes
P_DEBUGS = "debugs"
P_ENVBASE = "envbase"
P_ENVLOG = "envlog"
P_ENVPARFILE = "envparfile"
P_TCBASE = "tcbase"
P_TCLOG = "tclog"
P_TCRESULT = "tcresult"
P_TCPARFILE = "tcparfile"
P_TCDIFF = "tcdiff"
P_TCPREDIFF = "tcprediff"
P_TCRUNDIFF = "tcrundiff"
P_TCPRECOND = "tcprecond"
P_TCPOSTCOND = "tcpostcond"
P_TSBASE = "tsbase"
P_TSLOG = "tslog"
P_TSPARFILE = "tsparfile"
P_TSSUM = "tssum"
P_XPBASE = "xpbase"
P_XPRESULT = "xpresult"
P_XPBACKUP = "xpbackup"
# -------------------------------------------------------------
# exception texts
EXP_COMP_MISSING = "Component is missing for {}"
""" excetion for the case that a specific component doesnt exist, 1 parameter (context) """
EXP_CONFIG_MISSING = "Configuration is missing for {}"
""" excetion for the case that a specific configuration is missing, 1 parameter (context) """

320
tools/path_tool.py

@ -0,0 +1,320 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# ---------------------------------------------------------------------------------------------------------
# Author : Ulrich Carmesin
# Source : gitea.ucarmesin.de
# ---------------------------------------------------------------------------------------------------------
""" In diesem Modul werden alle Funktionen zusammengefasst zur Generierung und Ermittlung von pathsn """
import os.path
import sys
import basic.program
import tools.config_tool
import re
import basic.constants as B
import tools.path_const as P
import tools.date_tool
import getpass
TOOL_NAME = "path_tool"
def getHome():
    """
    Return the project home directory, derived from the current working
    directory by stripping known trailing folder names
    (test, components, datest, program).
    :return: home directory path
    """
    home = os.getcwd()
    # inside a test-folder (but not the datest-project itself)
    if home.endswith("test") and not home.endswith("datest"):
        home = home[:-5]
    if home.endswith("components"):
        home = home[:-11]
    if home.endswith("datest"):
        prgdir = home[-6:]
        home = home[:-7]
    elif home.endswith("program"):
        prgdir = home[-7:]
        home = home[:-8]
    return home
def getBasisConfigPath():
    """
    search the basis-configuration file by walking up from the current
    working directory; in each ancestor the file config/<BASIS_FILE> is
    probed with every supported config-format extension.
    :return: absolute filepath of the basis-configuration
    :raises Exception: when no basis-configuration is found
    """
    home = os.getcwd()
    a = home.split(os.path.sep)
    for i in range(1, len(a)):
        path = os.path.sep.join(a[0:-i])
        path = os.path.join(path, P.VAL_CONFIG, B.BASIS_FILE)
        for format in tools.config_tool.CONFIG_FORMAT:
            filepath = path+"."+format
            if os.path.isfile(filepath):
                return filepath
        # NOTE(review): this only re-checks the last probed filepath,
        # which the isfile()-loop above already covered - confirm intent
        if os.path.exists(filepath):
            return filepath
    raise Exception("no basis-configuration found")
def getActualJsonPath(job):
    """
    return the path of the json-file which stores the actual job-state
    of the current os-user (<debug-dir>/<username>Job.json).
    :param job: job with the path-configuration
    :return: filepath (the file may not exist yet)
    """
    username = getpass.getuser()
    path = os.path.join(job.conf.confs[B.SUBJECT_PATH][B.ATTR_PATH_DEBUG], username+"Job.json")
    print("------ path "+path)
    #if os.path.exists(path):
    #    return path
    return path
def getKeyValue(job, key, comp=None):
    """
    this function gets the value for the key which relates to an attribute
    in the job, in the component or in the path-pattern
    :param job: the actual job
    :param key: qualified key, e.g. "job.par.x", "job.conf.x", "job.x",
                "comp.x", "env.x" or a pattern-name
    :param comp: optional component, required for comp./env. keys
    :return: the resolved value; "xx-<key>-xx" when nothing matches
    """
    #job = basic.program.Job.getInstance()
    try:
        verify = job.getDebugLevel(TOOL_NAME)-4
    except:
        verify = False
    pt = PathConf.getInstance(job)
    if verify: job.debug(verify, "getKeyValue " + key)
    # NOTE(review): the branches test substring membership, not prefixes -
    # e.g. any key containing "comp." matches the comp-branch; confirm
    if 'job.par' in key:
        val = job.getParameter(key[8:])
        return val
    elif 'job.conf' in key:
        val = job.conf.confs[B.SUBJECT_PATH][key[9:]]
        if verify: job.debug(verify, val)
        return val
    elif 'job.' in key:
        # plain job-attribute, optionally with a format-suffix after ":"
        a = key[4:].split(":")
        val = getattr(job, a[0])
        # only date with hours
        if a[0] in ["start"]:
            print("++++++++++++++"+str(val))
            val = tools.date_tool.formatParsedDate(str(val), tools.date_tool.F_LOG)
            print("++++++++++++++"+val)
            if len(a) > 1 and a[1] == "H":
                # cut minutes/seconds, keep full hours
                val = val[0:-4]+"00"
        if verify: job.debug(verify, val)
        return val
    # return job.conf.paths[key[9:]]
    elif 'comp.' in key:
        if comp is None:
            raise Exception(P.EXP_COMP_MISSING.format(key))
        if tools.config_tool.hasAttr(comp.conf, key[5:]):
            return tools.config_tool.getAttr(comp.conf, key[5:])
        if tools.config_tool.hasAttr(comp, key[5:]):
            return tools.config_tool.getAttr(comp, key[5:])
        return ""
    elif 'env.' in key:
        if key[4:] in comp.conf["conn"]:
            return comp.conf["conn"][key[4:]]
        pass
    elif key in pt.pattern:
        # a pattern-name from config/path.yml
        return pt.pattern[key]
    elif "time" in key and hasattr(job, "start"):
        return getattr(job, "start")
    else:
        return "xx-"+key+"-xx"
def composePath(job, pathname, comp):
    """
    this function composes a concrete path by the structured pathname
    - the key of pathname is declared in path_const and the structure is
    configurated in config/path.yml.
    :param job: the actual job
    :param pathname: plain keyword of the path-pattern
    :param comp: component whose attributes may be substituted
    :return: composed path; None when the keyword is unknown
    """
    #job = basic.program.Job.getInstance()
    verify = job.getDebugLevel(TOOL_NAME)
    pt = PathConf.getInstance(job)
    job.debug(verify, "composePath " + pathname + " zu " + str(pt) + "mit ")
    job.debug(verify, str(pt.pattern))
    if pt.pattern[pathname]:
        return composePattern(job, pt.pattern[pathname], comp)
    else:
        job.debug(verify, "in Pattern nicht vorhanden: " + pathname)
def composePattern(job, pattern, comp):
    """
    the function composes the pattern to the standardarized path with the
    attributes which are stored in the job and the component
    - the key of pathname is declared in path_const and the structure is
    configurated in config/path.yml.
    :param job: the actual job
    :param pattern: string with keywords surrounded by {}
    :param comp: component whose attributes may be substituted
    :return: path with all keywords replaced (recursion depth limited)
    """
    #job = basic.program.Job.getInstance()
    try:
        verify = job.getDebugLevel(TOOL_NAME)
    except:
        verify = False
    verbose = not False
    #job.debug(verify, "composePattern " + pattern)
    max=5
    # collect all {keyword} occurrences
    l = re.findall('\{.*?\}', pattern)
    #job.debug(verify, l)
    print(l)
    for pat in l:
        if verbose: print(str(max) + ": " + pattern + ": " + pat)
        pit = getKeyValue(job, pat[1:-1], comp)
        if verbose: print(str(pit) + ": " + pattern + ": " + pat)
        #job.debug(verify, str(max) + ": " + pattern + ": " + pat + ": " + pit)
        pattern = pattern.replace(pat, pit)
        #job.debug(verify, str(max) + ": " + pattern + ": " + pat + ": " + pit)
    # substituted values may contain keywords themselves - resolve
    # recursively, at most a few levels deep
    while ("{" in pattern):
        max = max-1
        #job.debug(verify, str(max) + ": " + pattern + ": " + pat + ": " + pit)
        pattern = composePattern(job, pattern, comp)
        #job.debug(verify, str(max) + ": " + pattern + ": " + pat + ": " + pit)
        if (max < 3) :
            break
    return pattern
def rejoinPath(a, b="", c="", d="", e="", f=""):
    """
    Concatenate up to six path parts into one path in the correct format
    for the operating system; protocol-urls (e.g. http://...) keep their
    double slash, windows drive letters keep their backslash.
    :param a: first part (decides whether the result is a protocol-url)
    :param b: optional
    :param c: optional
    :param d: optional
    :param e: optional
    :param f: optional
    :return: normalized path
    """
    raw = a+"/"+b+"/"+c+"/"+d+"/"+e+"/"+f
    is_protocol = a.find("://") > 1
    # unify the separators and collapse doubled slashes
    raw = re.sub(r'\\', '/', raw)
    raw = re.sub(r'\/', '/', raw)
    raw = re.sub(r'//', '/', raw)
    while raw[-1:] == "/":
        raw = raw[0:-1]
    segments = raw.split("/")
    out = ""
    for seg in segments:
        if len(seg) < 1:
            continue
        if is_protocol:
            out = seg if len(out) < 1 else out + "/" + seg
        else:
            out = os.path.join(out, seg)
    if out[1:2] == ":" and out[2:3] != "\\":
        # windows drive letter
        out = out[0:2] + "\\" + out[2:]
    elif is_protocol:
        if "://" not in out or out.index("://") > 8 or out.index("://") < 1:
            # restore the second slash of the protocol separator
            i = out.index(":/")
            out = out[0:i+1] + "/" + out[i+1:]
    if not is_protocol and out.count("\\") < 1 and out[0:1] != "/" and out[0:2] != "..":
        out = "/" + out
    return out
def extractPattern(job, pathtyp, comp=None):
    """
    this function extracts recoursively all parts of the pathstrucure as key
    and gets the values from the job-parameter and job-configuration
    :param job: the actual job
    :param pathtyp: the name of the path-structure
    :param comp: optional component for comp./env. keys
    :return: list of tuples (prefix-text, key, resolved value)
    """
    #job = basic.program.Job.getInstance()
    verify = job.getDebugLevel(TOOL_NAME)
    out = []
    pt = PathConf.getInstance(job)
    pattern = pt.pattern[pathtyp]
    work = pattern
    # walk through the pattern, cutting one {key} per iteration
    while "{" in work:
        i = work.index("{")
        j = work.index("}")
        pre = work[0:i]
        pat = work[i+1:j]
        job.debug(verify, work + " von " + str(i) + "-" + str(j) + " pre " + pre + "pat " + pat)
        pit = getKeyValue(job, pat, comp)
        tup = (pre, pat, pit)
        out.append(tup)
        work = work[j+1:]
    return out
def extractPath(job, pathtyp, path):
    """
    this function extracts parts of a concrete structered path and stores the
    parts as attributes into the actual job. So these attributes can read from
    the concrete path instead of the related parameter-arguments.
    It stores the values into the job-parameter
    :param job: the actual job
    :param pathtyp: the structure of the concrete path
    :param path: the concrete path - it should be the directory in the
                 parameter of the job
    :return:
    """
    #job = basic.program.Job.getInstance()
    patterlist = extractPattern(job, pathtyp)
    verbose = False
    work = path
    i = 0
    if verbose: print("-- extractPatternList -- " + pathtyp + ":" + str(patterlist))
    for p in patterlist:
        if len(p) < 1 : continue
        delim = p[0]   # literal text before the key
        key = p[1]     # pattern-key, e.g. "job.par.testcase"
        val = p[2]     # value resolved from job/component
        nextdelim = ""
        if i >= len(patterlist) - 1:
            nextdelim = ""
        else:
            nextdelim = patterlist[i+1][0]
        if verbose: print("xPath delim " + delim + " " + str(len(delim)) + ", " + nextdelim + " work " + work)
        # consume the literal prefix
        work = work[len(delim):]
        if verbose: print("xPath key " + key + " i " + str(i) + " work " + work)
        if val is not None:
            if verbose: print("val not none " + val)
            if val in work:
                if verbose: print("val ok")
                work = work.replace(val, "")
            elif "time" in key and "job.par" in key:
                # time-values may differ - take the part up to the next delimiter
                prop = ""
                if i < len(patterlist) - 1:
                    prop = work[0:work.index(nextdelim)]
                else:
                    prop = work
                key = key[8:]
                if verbose: print("setprop " + key + " = " + prop)
                if hasattr(job.par, key): delattr(job.par, key)
                # NOTE(review): prop is computed here but val is stored -
                # presumably this should be setattr(job.par, key, prop); confirm
                setattr(job.par, key, val)
            else:
                if verbose: print("val not not ok " + val + " zu " + key)
        elif "job.par" in key:
            # unresolved job-parameter: read it from the concrete path
            prop = ""
            if i < len(patterlist) - 1:
                if verbose: print("job.par nextdelim " + nextdelim)
                prop = work[0:work.index(nextdelim)]
            else:
                prop = work
            key = key[8:]
            if verbose: print("setprop " + key + " = " + prop)
            if hasattr(job.par, key): delattr(job.par, key)
            setattr(job.par, key, prop)
            work = work.replace(prop, "")
        else:
            if verbose: print("val is none " + key)
        i = i +1
class PathConf:
    """
    this class contains the structure-informations of the testrelevant
    directories; the path-patterns are loaded once from the tool-config
    "path" and cached as a singleton.
    """
    __instance = None

    def __init__(self, job=None):
        #print('init pathConf')
        # load the path-pattern configuration via the config-tool
        confs = tools.config_tool.getConfig(job, "tool", "path")
        self.pattern = confs["pattern"]
        #print(self.pattern)
        PathConf.__instance = self

    @staticmethod
    def getInstance(job = None):
        """return the singleton instance, creating it on first access"""
        #print("PathConf getInstance " + str(PathConf.__instance))
        if (PathConf.__instance is None):
            PathConf(job)
        #print("PathConf getInstance " + str(PathConf.__instance))
        return PathConf.__instance

76
tools/value_tool.py

@ -0,0 +1,76 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# ---------------------------------------------------------------------------------------------------------
# Author : Ulrich Carmesin
# Source : gitea.ucarmesin.de
# ---------------------------------------------------------------------------------------------------------
""" In diesem Modul werden alle Funktionen zusammengefasst zur Generierung und Ermittlung von pathsn """
import os.path
import sys
import basic.program
import tools.config_tool
import re
import basic.constants as B
import tools.path_const as P
import tools.date_tool
import getpass
TOOL_NAME = "value_tool"
DOM_JOB = "job"
DOM_PAR = "par"
DOM_COMP = "comp"
DOM_CONF = "conf"
DOM_ENV = "env"
def getKeyValue(job, key, comp=None):
    """
    this function gets the value for the key which relates to an attribute
    in the job or in the component
    :param job: the actual job
    :param key: qualified key, addressed by the DOM_* domain prefixes
                (job.par., job.conf., job., comp., env.)
    :param comp: optional component, required for comp./env. keys
    :return: the resolved value; "xx-<key>-xx" when nothing matches
    """
    #job = basic.program.Job.getInstance()
    try:
        verify = job.getDebugLevel(TOOL_NAME)-4
    except:
        verify = False
    #pt = PathConf.getInstance(job)
    if verify: job.debug(verify, "getKeyValue " + key)
    if DOM_JOB == key[0:3]:
        if DOM_PAR in key[4:7]:
            val = job.getParameter(key[8:])
            return val
        elif DOM_CONF in key[4:8]:
            val = job.conf.confs[B.SUBJECT_PATH][key[9:]]
            if verify: job.debug(verify, val)
            return val
    # NOTE(review): this branch is unreachable for keys starting with
    # "job." because the DOM_JOB-branch above already matched - confirm
    elif 'job.' in key:
        a = key[4:].split(":")
        val = getattr(job, a[0])
        # only date with hours
        if a[0] in ["start"]:
            print("++++++++++++++"+str(val))
            val = tools.date_tool.formatParsedDate(str(val), tools.date_tool.F_LOG)
            print("++++++++++++++"+val)
            if len(a) > 1 and a[1] == "H":
                # cut minutes/seconds, keep full hours
                val = val[0:-4]+"00"
        if verify: job.debug(verify, val)
        return val
    # return job.conf.paths[key[9:]]
    elif DOM_COMP in key:
        # NOTE(review): substring match - any key containing "comp" lands here
        if comp is None:
            raise Exception(P.EXP_COMP_MISSING.format(key))
        if tools.config_tool.hasAttr(comp.conf, key[5:]):
            return tools.config_tool.getAttr(comp.conf, key[5:])
        if tools.config_tool.hasAttr(comp, key[5:]):
            return tools.config_tool.getAttr(comp, key[5:])
        return ""
    elif DOM_ENV in key:
        if key[4:] in comp.conf["conn"]:
            return comp.conf["conn"][key[4:]]
        pass
    elif "time" in key and hasattr(job, "start"):
        return getattr(job, "start")
    else:
        return "xx-"+key+"-xx"
Loading…
Cancel
Save