Compare commits

...

3 Commits

  1. 24
      basic/compexec.py
  2. 28
      basic/program.py
  3. 91
      basic/step.py
  4. 9
      execute_testcase.py
  5. 3
      init_testcase.py
  6. 0
      start_dialog.py
  7. 3
      test/test_job.py
  8. 28
      test/test_tdata.py
  9. 52
      test_executer.py
  10. 12
      utils/config/path.yml
  11. 0
      utils/filexml_tool.py
  12. 5
      utils/tdata_tool.py

24
basic/testexec.py → basic/compexec.py

@ -229,6 +229,30 @@ class Testexecuter():
def get_Response(self, granularity):
pass
def execute_test(self, job, step, tdata):
"""
the function executes a teststep. The exact step with partial steps are defined in the component-configuration
under teststeps and the step-attribute.
:param job:
:param step: the step object
:param tdata:
:return:
"""
if step.fct in self.conf["teststeps"]:
for s in self.conf["teststeps"][step.fct]:
stepconf = self.conf["teststeps"][step.fct][s]
if stepconf["tool"] == "file":
print("file-tool")
elif stepconf["tool"] == "api":
print("api-tool")
elif stepconf["tool"] == "cli":
print("cli-tool")
elif stepconf["tool"] == "db":
print("db-tool")
else:
print("nichts da")
def finish_Test(self, granularity):
"""
initialization-routine for finish-step

28
basic/program.py

@ -278,17 +278,41 @@ class Job:
def hasElement(self, parameter, elem):
"""
the function searches in an optional job.parameter
(a) true, if the parameter does not exist (DEFAULT)
(b) true, if the element is member of the parameter
(c) false, if parameter exists and elem is not member of the parameter
:param parameter:
:param elem:
:return:
"""
if hasattr(self.par, parameter):
print (parameter + " in Parameter")
if getattr(self.par, parameter).find(elem) >= 0:
return True
return False
return True
def hascomponente(self, komp):
return self.hasElement("componente", komp)
def hascomponente(self, comp):
"""
it searches if comp is member of the optional job.parameter (default = each is member)
:param comp:
:return:
"""
return self.hasElement("componente", comp)
def hasFunction(self, fct):
"""
it searches if fct is member of the optional job.parameter (default = each is member)
:param fct:
:return:
"""
return self.hasElement("function", fct)
def hasTool(self, tool):
"""
it searches if tool is member of the optional job.parameter (default = each is member)
:param tool:
:return:
"""
return self.hasElement("tool", tool)
def getMessageLevel(self, errtyp, elem):
if (not hasattr(self, "m")) or (self.m is None):

91
basic/step.py

@ -0,0 +1,91 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# ---------------------------------------------------------------------------------------------------------
# Author : Ulrich Carmesin
# Source : gitea.ucarmesin.de
# ---------------------------------------------------------------------------------------------------------
"""
this module implements the functionality of a test-step
which is defined in the test-specification
and is executed by any executer
there are 2 kinds of test-step
a) execute specific component in the job
b) execute specific test-entity in the test-suite-execution
"""
import basic.constants as B
import utils.data_const as D
LIST_ARGS = [
"start", # for starting the specified main-program
"fct" # for calling the specified component-function
]
class Step:
comp = ""
refLine = "" # in a: references the data-line(s) to be executed
execStep = "" # in a,b: executes only if the step is set in the job
args = {}
"""
the class contains each attribute of a test-step
"""
def __init__(self):
self.comp = ""
self.refLine = ""
self.execStep = ""
self.args = {}
def parseOldStep(job, fields):
step = {}
step[B.DATA_NODE_COMP] = fields[D.STEP_COMP_I]
step[B.ATTR_EXEC_REF] = fields[D.STEP_EXECNR_I]
step[B.ATTR_DATA_REF] = fields[D.STEP_REFNR_I]
step[B.ATTR_STEP_ARGS] = {}
if D.STEP_ARGS_I == D.STEP_LIST_I:
args = ""
for i in range(D.STEP_ARGS_I, len(fields)):
if len(fields[i]) < 1:
continue
if fields[i][0:1] == "#":
continue
args += "," + fields[i]
args = args[1:]
else:
args = fields[D.STEP_ARGS_I]
a = args.split(",")
for arg in a:
print("arg " + arg)
b = arg.split(":")
if len(b) < 2:
raise Exception(D.EXCP_MALFORMAT + "" + l)
step[B.ATTR_STEP_ARGS][b[0]] = b[1]
# data[B.DATA_NODE_STEPS].append(step)
return step
def parseStep(job, fields):
step = Step
step.comp = fields[D.STEP_COMP_I]
step.execStep = fields[D.STEP_EXECNR_I]
step.refLine = fields[D.STEP_REFNR_I]
if D.STEP_ARGS_I == D.STEP_LIST_I:
args = ""
for i in range(D.STEP_ARGS_I, len(fields)):
if len(fields[i]) < 1:
continue
if fields[i][0:1] == "#":
continue
args += "," + fields[i]
args = args[1:]
else:
args = fields[D.STEP_ARGS_I]
a = args.split(",")
for arg in a:
print("arg " + arg)
b = arg.split(":")
if len(b) < 2:
raise Exception(D.EXCP_MALFORMAT + "" + str(fields))
step.args[b[0]] = b[1]
if b[0] in LIST_ARGS:
setattr(step, b[0], b[1])
# data[B.DATA_NODE_STEPS].append(step)
return step

9
execute_testcase.py

@ -1,6 +1,7 @@
# This is a sample Python script.
import sys#
import os
import basic.step
import basic.program as program
import utils.tdata_tool
import utils.report_tool
@ -24,12 +25,12 @@ def startPyJob(job):
if not "_steps" in tdata:
raise Exception("no steps to execute in testdata")
for (step) in tdata["_steps"]:
if step["comp"] in comps:
comp = cm.getComponent(step["comp"])
comp.execute_testcase(step, tdata)
if step.comp in comps:
comp = cm.getComponent(step.comp)
comp.execute_test(job, step, tdata)
job.m.merge(comp.m)
else:
job.m.setError(step["comp"]+" kann nicht aufgerufen werden!")
job.m.setError(step.comp+" kann nicht aufgerufen werden!")
# Press the green button in the gutter to run the script.

3
init_testcase.py

@ -23,8 +23,11 @@ def startPyJob(job):
for c in comps:
comp = cm.getComponent(c)
comp.m.logInfo("------- "+comp.name+" ----------------------------------------")
if job.hasFunction("reset_TData"):
comp.reset_TData(B.PAR_TESTCASE)
if job.hasFunction("load_TData"):
comp.load_TData(B.PAR_TESTCASE, testdata)
if job.hasFunction("read_TData"):
comp.read_TData("vorher", B.PAR_TESTCASE)
comp.m.logInfo("------- "+comp.name+" ----------------------------------------")
job.m.merge(comp.m)

0
start_dialog.py

3
test/test_job.py

@ -52,6 +52,7 @@ class MyTestCase(unittest.TestCase):
programs = ["init_testcase"]
testcase = "TC0001"
timexec = "2022-06-28_21-23-34"
fct = "read_TData"
# os.system("python "+os.path.join(HOME_PATH, "check_environment.py")+" -a TEST -e ENV01")
# os.system("python "+os.path.join(HOME_PATH, "init_testsuite.py")+" -a TEST -e ENV01 "
# "-ts "+os.path.join(HOME_PATH, "test","lauf","V0.1","implement_2021-08-28_23-50-51")+" -dt csv -ds implement -dn firstunit")
@ -64,7 +65,7 @@ class MyTestCase(unittest.TestCase):
if "init_testcase" in programs:
program = "init_testcase"
job = Job("unit")
args = { B.PAR_APP: "TESTAPP", B.PAR_ENV: "ENV01", "modus": "unit",
args = { B.PAR_APP: "TESTAPP", B.PAR_ENV: "ENV01", "modus": "unit", B.PAR_FCT: fct,
B.PAR_TCDIR: os.path.join(job.conf.confs[B.SUBJECT_PATH][B.ATTR_PATH_ARCHIV], testcase, timexec),
"step": 1 }
# "usecase": "TST001", "tstime": "2022-03-17_17-28"}

28
test/test_tdata.py

@ -19,7 +19,7 @@ OS_SYSTEM = test.constants.OS_SYSTEM
# here you can select single testfunction for developping the tests
TEST_FUNCTIONS = ["test_tdata", "test_getCsvSpec_data", "test_getCsvSpec_tree", "test_getCsvSpec_key",
"test_getCsvSpec_conf", "test_extractPattern", "test_parseCsv"]
TEST_FUNCTIONS = ["test_getCsvSpec_data"]
# TEST_FUNCTIONS = ["test_getCsvSpec_data"]
class MyTestCase(unittest.TestCase):
mymsg = "--------------------------------------------------------------"
@ -119,9 +119,21 @@ class MyTestCase(unittest.TestCase):
cnttest += 3
for step in tdata[B.DATA_NODE_STEPS]:
print(step)
self.assertIn(B.DATA_NODE_COMP, step)
self.assertIn(B.ATTR_DATA_REF, step)
self.assertIn(B.ATTR_STEP_ARGS, step)
self.assertEqual(hasattr(step, B.DATA_NODE_COMP), True)
# self.assertEqual(hasattr(step, B.ATTR_DATA_REF), True)
self.assertEqual(hasattr(step, B.ATTR_STEP_ARGS), True)
cnttest += 3
specLines = [
"step:1;testa;1;1;table:_lofts;action:export;;;;;",
"#;;;;;;"
]
tdata = {}
tdata = t.parseCsvSpec(job.m, specLines, D.CSV_SPECTYPE_DATA)
print(tdata)
self.assertEqual(1, len(tdata))
self.assertIn(B.DATA_NODE_STEPS, tdata)
self.assertIsInstance(tdata[B.DATA_NODE_STEPS], list)
self.assertEqual(2, len(tdata[B.DATA_NODE_STEPS][0].args))
cnttest += 3
if B.DATA_NODE_TABLES in tests:
specLines = [
@ -216,7 +228,7 @@ class MyTestCase(unittest.TestCase):
print(str(tdata))
self.assertEqual(1, len(tdata))
self.assertEqual(1, len(tdata["_tables"]))
self.assertEqual(2, len(tdata["_tables"]["capital"]))
self.assertEqual(3, len(tdata["_tables"]["capital"]))
self.assertEqual(3, len(tdata["_tables"]["capital"]["_keys"]))
cnttest += 4
specLines = [
@ -231,9 +243,9 @@ class MyTestCase(unittest.TestCase):
#tdata = t.parseCsvSpec(job.m, specLines, D.CSV_SPECTYPE_TREE)
print(str(tdata))
self.assertEqual(1, len(tdata))
self.assertNotIn("capital", tdata["_tables"])
self.assertEqual(1, len(tdata["_tables"]))
self.assertEqual(2, len(tdata["_tables"]["country"]))
self.assertIn("capital", tdata["_tables"])
self.assertEqual(2, len(tdata["_tables"]))
self.assertEqual(3, len(tdata["_tables"]["country"]))
self.assertEqual(2, len(tdata["_tables"]["country"]["_keys"]))
cnttest += 4
MyTestCase.mymsg += "\n----- "+actfunction+" : "+str(cnttest)

52
test_executer.py

@ -1,10 +1,13 @@
from datetime import datetime
import traceback
import basic.program
import basic.constants as B
import utils.tdata_tool
import init_testcase
import init_testsuite
import execute_testcase
import collect_testcase
import compare_testcase
import finish_testsuite
PROGRAM_NAME = "test_executer"
@ -16,16 +19,18 @@ def getTime():
time = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
return time
def start(myjob):
def startPy(pjob):
myjob = pjob
myjob.m.setMsg("# # # # # start executer # # # # # ")
tdata = utils.tdata_tool.getTestdata()
job = basic.program.Job("unit")
if not hasattr(myjob.par, B.PAR_STEP):
raise Exception("Parameter " + B.PAR_STEP + " is missing")
testcases = getattr(myjob.par, B.PAR_TESTCASE)
if not hasattr(myjob.par, "step"):
raise Exception("Parameter step is missing")
for step in tdata[B.DATA_NODE_STEPS]:
if int(step["_nr"]) != int(getattr(myjob.par, "step")):
if int(step.exexStep) != int(getattr(myjob.par, "step")):
continue
for arg in step["args"]:
if arg == "start":
if "testsuite" in step["args"][arg]:
@ -72,15 +77,48 @@ def start(myjob):
# myjob.stopJob(1)
def startStepProgram(step, job, jobargs):
myjob = basic.program.Job("unit") # meaning temp
myjob.par.setParameterArgs(jobargs)
myjob.setProgram(step.start)
myjob.pushInstance(myjob)
myjob.startJob()
try:
job.m.logInfo(step.start + " starting")
if step.start == "init_testcase":
init_testcase.startPyJob(myjob)
elif step.start == "execute_testcase":
execute_testcase.startPyJob(myjob)
elif step.start == "collect_testcase":
collect_testcase.startPyJob(myjob)
elif step.start == "compare_testcase":
compare_testcase.startPyJob(myjob)
job.m.logInfo(step.start + " finished")
except Exception as e:
txt1 = traceback.format_stack()
print("==================================================================0")
print(txt1)
print("==================================================================0")
txt2 = traceback.format_exc()
print(txt2)
print("==================================================================0")
job.m.setError(step.start + " aborted")
finally:
myjob.stopJob(1)
myjob.popInstance(myjob)
if __name__ == '__main__':
print(PROGRAM_NAME)
x = basic.program.Job(PROGRAM_NAME)
#m = message.Message(3)
#m.closeMessage()
x.startJob()
try:
x.m.logDebug(str(vars(x.par)) + "\n" + str(vars(x.conf)))
if x.m.isRc("fatal"):
x.stopJob()
exit(x.m.rc * (-1) + 3)
start(x)
startPy(x)
except:
x.m.setError(PROGRAM_NAME + " aborted")
finally:
x.stopJob()

12
utils/config/path.yml

@ -14,15 +14,19 @@ pattern:
sumfile: xxx
backup: backup
reffile: Herkunft.txt
appdir: APP
tc: testfall
ts: testlauf
debugname: debug
logname: log
preexec: env-pre-exec # only for dry unit-test
postexec: env-post-exec # only for dry unit-test
debugs: "{job.conf.home}/test/log"
# environment
envbase: "{job.conf.environment}/{job.par.environment}"
envlog: "{envbase}/{log}"
envparfile: "{envbase}/{parfile}"
envappdir: "{envbase}/{appdir}/{comp.name}"
# testcase
tcbase: "{job.conf.archiv}/{job.par.testcase}/{job.par.tctime}"
tclog: "{tcbase}/{log}"
@ -33,6 +37,14 @@ pattern:
tcrundiff: "{tcresult}/{rundiff}"
tcprecond: "{tcresult}/{precond}"
tcpostcond: "{tcresult}/{postcond}"
# testdata
tdbase: "{job.conf.testdata}/{job.par.testcase}"
tdresult: "{tdbase}/{result}"
tdparfile: "{tdbase}/{parfile}"
tdprecond: "{tdresult}/{precond}"
tdpostcond: "{tdresult}/{postcond}"
tdpreexec: "{tdbase}/{preexec}/{comp.name}" # only for dry unit-test
tdpostexec: "{tdbase}/{postexec}/{comp.name}" # only for dry unit-test
# testset
tsbase: "{job.conf.archiv}/{ts}/{job.par.usecase}_{job.par.tstime}"
tslog: "{tsbase}/{log}"

0
utils/filexml_tool.py

5
utils/tdata_tool.py

@ -27,6 +27,7 @@ import utils.file_tool
import basic.constants as B
import utils.data_const as D
import utils.date_tool
import basic.step
TOOL_NAME = "tdata_tool"
""" name of the tool in order to switch debug-info on """
@ -118,6 +119,7 @@ def getCsvSpec(msg, filename, type):
def parseCsvSpec(msg, lines, type):
job = basic.program.Job.getInstance()
data = {}
header = []
h = [] # from a[]
@ -140,6 +142,8 @@ def parseCsvSpec(msg, lines, type):
if (a[0].lower() == D.CSV_BLOCK_STEP):
if (not B.DATA_NODE_STEPS in data):
data[B.DATA_NODE_STEPS] = []
step = basic.step.parseStep(job, fields)
"""
step = {}
step[B.DATA_NODE_COMP] = fields[D.STEP_COMP_I]
step[B.ATTR_EXEC_REF] = fields[D.STEP_EXECNR_I]
@ -163,6 +167,7 @@ def parseCsvSpec(msg, lines, type):
if len(b) < 2:
raise Exception(D.EXCP_MALFORMAT + "" + l)
step[B.ATTR_STEP_ARGS][b[0]] = b[1]
"""
data[B.DATA_NODE_STEPS].append(step)
continue
elif (a[0].lower() == D.CSV_BLOCK_OPTION):

Loading…
Cancel
Save