Data-Test-Executer Framework, specifically for testing data processing, with data generation, system preparation, data loading, and a holistic, diversifying comparison

#!/usr/bin/python
# -*- coding: utf-8 -*-
# ---------------------------------------------------------------------------------------------------------
# Author : Ulrich Carmesin
# Source : gitea.ucarmesin.de
# ---------------------------------------------------------------------------------------------------------
import os.path
import inspect
import basic.program
import utils.config_tool
import utils.file_tool
import basic.constants as B
import utils.data_const as D
import utils.path_const as P
import utils.path_tool
import utils.date_tool
import basic.step
import utils.i18n_tool
import re

TOOL_NAME = "tdata_tool"
list_blocks = {}  # lists of aliases


def getTestdata(job=None):
"""
2 years ago
get the testdata from one of the possible sources
for the testcase resp testsuite of the job
:return:
"""
2 years ago
if job is None:
job = basic.program.Job.getInstance()
if "testcase" in job.program:
return collectTestdata(B.PAR_TESTCASE, getattr(job.par, B.PAR_TESTCASE), job)
2 years ago
else:
return collectTestdata(B.PAR_TESTSUITE, getattr(job.par, B.PAR_TESTSUITE), job)
2 years ago
def collectTestdata(gran, testentity, job):
"""
collects the testdata from kind of the possible sources
for the testcase resp testsuite
:return:
"""
setBlockLists(job)
if gran == B.PAR_TESTCASE:
basispath = utils.path_tool.rejoinPath(job.conf.confs[B.SUBJECT_PATH][B.ATTR_PATH_TDATA], testentity)
pathname = utils.config_tool.getConfigPath(P.KEY_TESTCASE, getattr(job.par, B.PAR_TESTCASE), "", job)
2 years ago
if gran == B.PAR_TESTSUITE:
basispath = utils.path_tool.rejoinPath(job.conf.confs[B.SUBJECT_PATH][B.ATTR_PATH_TDATA], testentity)
pathname = utils.config_tool.getConfigPath(P.KEY_TESTSUITE, getattr(job.par, B.PAR_TESTSUITE), "", job)
2 years ago
if pathname[-3:] == D.DFILE_TYPE_CSV:
tdata = getCsvSpec(job.m, pathname, D.CSV_SPECTYPE_DATA)
3 years ago
else:
2 years ago
tdata = utils.file_tool.readFileDict(pathname, job.m)
# get explicit specdata of includes
if D.CSV_BLOCK_IMPORT in tdata:
for pathname in tdata[D.CSV_BLOCK_IMPORT]:
pathname = utils.path_tool.rejoinPath(pathname)
if job.conf.confs[B.SUBJECT_PATH][B.ATTR_PATH_TDATA] not in pathname:
pathname = utils.path_tool.rejoinPath(basispath, pathname)
if pathname[-3:] == D.DFILE_TYPE_CSV:
data = getCsvSpec(job.m, pathname, D.CSV_SPECTYPE_DATA)
else:
data = utils.file_tool.readFileDict(pathname, job.m)
for table in data[D.CSV_BLOCK_TABLES]:
if table in tdata[D.CSV_BLOCK_TABLES]:
print("Fehler")
tdata[D.CSV_BLOCK_TABLES][table] = data[D.CSV_BLOCK_TABLES][table]
2 years ago
# get implicit specdata of spec-library
for prefix in list_blocks[D.DFILE_TABLE_PREFIX]:
files = utils.file_tool.getFiles(job.m, basispath, prefix, None)
if len(files) < 0:
continue
for f in files:
if f in tdata[D.CSV_BLOCK_TABLES]:
continue
pathname = utils.path_tool.rejoinPath(basispath, f)
if pathname[-3:] == D.DFILE_TYPE_CSV:
data = getCsvSpec(job.m, pathname, D.CSV_SPECTYPE_DATA)
else:
data = utils.file_tool.readFileDict(pathname, job.m)
for table in data[D.CSV_BLOCK_TABLES]:
if table in tdata[D.CSV_BLOCK_TABLES]:
print("Fehler")
tdata[D.CSV_BLOCK_TABLES][table] = data[D.CSV_BLOCK_TABLES][table]
# fill the options into job-parameter
for p in tdata[D.CSV_BLOCK_OPTION]:
setattr(job.par, p, tdata[D.CSV_BLOCK_OPTION][p])
3 years ago
return tdata
2 years ago
def setBlockLists(job):
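    """
    fills the module-variable list_blocks with the lower-case aliases of each
    block-, attribute- and dfname-keyword, so that csv-keywords can be matched
    language-independently
    :param job: job whose i18n-configuration provides the aliases
    """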
    for block in D.LIST_BLOCK_CONST + D.LIST_ATTR_CONST + D.LIST_DFNAME_CONST:
        blockname = getattr(D, block)
        aliases = utils.i18n_tool.I18n.getInstance().getAliasList(block+"='"+blockname+"'")
        list_blocks[blockname] = []
        for x in aliases:
            list_blocks[blockname].append(x.lower())


def readCsv(msg, filename, comp, aliasNode="", job=None):
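    """
    reads a csv-file and delegates its lines to parseCsv
    :param msg: the message-object for logging
    :param filename: full path of the csv-file
    :param comp: the component the data belongs to, may be None
    :param aliasNode: optional node-path under which the table is stored
    :return: the parsed testdata as a dictionary
    """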
    if job is None:
        job = basic.program.Job.getInstance()
    lines = utils.file_tool.readFileLines(filename, msg)
    print("readCsv "+filename)
    return parseCsv(msg, filename, lines, comp, aliasNode, job)


def parseCsv(msg, filename, lines, comp, aliasNode="", job=None):
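    """
    parses the csv-lines of a file into the internal data-structure
    :param msg: the message-object for logging
    :param filename: full path of the csv-file, used to derive the alias-node
    :param lines: the lines of the csv-file
    :param comp: the component the data belongs to, may be None
    :param aliasNode: optional node-path under which the table is stored
    :return: the parsed testdata as a dictionary
    """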
    if job is None:
        job = basic.program.Job.getInstance()
    if len(list_blocks) < 1:
        setBlockLists(job)
    tdata = {}
    if len(aliasNode) < 1:
        aliasNode = extractAliasNode(filename, comp, job)
    if len(aliasNode) > 3:
        tdata[D.DATA_ATTR_ALIAS] = aliasNode
    return parseCsvSpec(msg, lines, D.CSV_SPECTYPE_DATA, tdata, job)


def extractAliasNode(filename, comp, job):
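    """
    derives the alias-node of the table from the filename:
    a known table-prefix is stripped and the remaining basename is looked up
    in the ddl-configuration of the db-component
    :return: the node-path "tables:<basename>" resp. "" if it cannot be derived
    """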
    basename = os.path.basename(filename)[0:-4]
    for prefix in list_blocks[D.DFILE_TABLE_PREFIX]:
        if basename.find(prefix) == 0:
            basename = basename[len(prefix):]
    if comp is None:
        return ""
    if B.TOPIC_NODE_DB in comp.conf[B.SUBJECT_ARTS] and basename in comp.conf[B.DATA_NODE_DDL]:
        return B.DATA_NODE_TABLES+":"+basename
    return ""


def getCsvSpec(msg, filename, ttype, job=None):
"""
2 years ago
reads the specification from a csv-file and maps it into the internal data-structure
:param msg:
:param filename:
:param type:
:param job:
:return:
"""
2 years ago
if job is None:
job = basic.program.Job.getInstance()
2 years ago
lines = utils.file_tool.readFileLines(filename, msg)
2 years ago
tdata = {} # the result
return parseCsvSpec(msg, lines, ttype, tdata, job)
2 years ago
2 years ago
def parseCsvSpec(msg, lines, ttype, tdata, job=None):
"""
:param msg:
:param lines:
:param type:
:param job:
:return:
"""
if job is None:
job = basic.program.Job.getInstance()
if len(list_blocks) < 1:
setBlockLists(job)
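    # The loop below is a line-based state machine: each line is classified by
    # its first field (table-attribute, head-, option-, step-, import- or
    # table-keyword); lines that follow a table-header are collected as
    # row-data until an empty line or the next keyword resets the status.
    # An illustrative spec-file might look like this (sketch only; the accepted
    # keywords depend on the configured i18n-aliases, the step-columns on basic.step):
    #   option:application;TESTAPP;;;
    #   step:1;...;;;
    #   table:mytable;id;name
    #   comp01:db;1;Alice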
    status = "start"
    verbose = False
    tableAttr = {}  # table-attributes
    tableDict = {}  # current table
    for l in lines:
        if verbose: print("lines "+l)
        fields = splitFields(l, D.CSV_DELIMITER, job)
        # check for empty line resp. comment
        if (len(fields) < 1) or (len(l.strip().replace(D.CSV_DELIMITER, "")) < 1):
            status = "start"
            continue
        if (fields[0][0:1] == "#"):
            continue
        a = fields[0].lower().split(":")
        # keywords option, step, table
        if verbose: print(str(a)+" -- "+str(fields))
        tableAttr = setTableAttribute(tableAttr, a[0], fields[1], job)
        if (tableAttr["_hit"]):
            status = "TABLE_ALIAS"
            continue
        if (a[0].lower() in list_blocks[D.CSV_BLOCK_HEAD]):
            if verbose: print("head "+l)
            setTdataLine(tdata, fields, D.CSV_BLOCK_HEAD, job)
            status = "start"
            continue
        elif (a[0].lower() in list_blocks[D.CSV_BLOCK_OPTION]):
            if verbose: print("option " + l)
            setTdataLine(tdata, fields, D.CSV_BLOCK_OPTION, job)
            status = "start"
            continue
        elif (a[0].lower() in list_blocks[D.CSV_BLOCK_STEP]):
            if verbose: print("step "+l)
            step = basic.step.parseStep(job, fields)
            if D.CSV_BLOCK_STEP not in tdata:
                tdata[D.CSV_BLOCK_STEP] = []
            tdata[D.CSV_BLOCK_STEP].append(step)
            status = "start"
            continue
        elif (a[0].lower() in list_blocks[D.CSV_BLOCK_IMPORT]):
            if verbose: print("includes " + l)
            if D.CSV_BLOCK_IMPORT not in tdata:
                tdata[D.CSV_BLOCK_IMPORT] = []
            tdata[D.CSV_BLOCK_IMPORT].append(fields[1])
            status = "start"
            continue
        elif (a[0].lower() in list_blocks[D.CSV_BLOCK_TABLES]):
            if verbose: print("tables "+l)
            h = a
            h[0] = B.DATA_NODE_TABLES
            if ttype == D.CSV_SPECTYPE_CONF:
                del h[0]
            tableDict = getTdataContent(msg, tdata, h)
            setTableHeader(tableDict, tableAttr, fields, ttype, job)
            status = D.CSV_SPECTYPE_DATA
        elif (status == D.CSV_SPECTYPE_DATA):
            tableDict = getTdataContent(msg, tdata, h)
            if verbose: print("setTableData "+str(h)+" "+str(tableDict))
            setTableData(tableDict, fields, ttype, job)
        elif (status == "TABLE_ALIAS") and D.DATA_ATTR_ALIAS in tdata:
            alias = tdata[D.DATA_ATTR_ALIAS]
            b = alias.split(":")
            h = [B.DATA_NODE_TABLES] + b
            tableDict = getTdataContent(msg, tdata, h)
            tableDict[D.DATA_ATTR_ALIAS] = alias
            fields = [alias] + fields
            setTableHeader(tableDict, tableAttr, fields, ttype, job)
            status = D.CSV_SPECTYPE_DATA
    if ttype == D.CSV_SPECTYPE_CONF:
        header = []
        for k in tdata:
            if k in D.LIST_DATA_ATTR:
                continue
            if B.DATA_NODE_DATA in tdata[k]:
                tdata[k].pop(B.DATA_NODE_DATA)
            for f in tdata[k]:
                if f in [B.DATA_NODE_HEADER, "_hit"] + D.LIST_DATA_ATTR:
                    continue
                header.append(f)
            tdata[k][B.DATA_NODE_HEADER] = header
            header = []
    if B.DATA_NODE_TABLES in tdata and B.DATA_NODE_TABLES in tdata[B.DATA_NODE_TABLES]:
        for k in tdata[B.DATA_NODE_TABLES][B.DATA_NODE_TABLES]:
            if k in tdata[B.DATA_NODE_TABLES]:
                if verbose: print("Error: table "+k+" is already defined")
            else:
                tdata[B.DATA_NODE_TABLES][k] = tdata[B.DATA_NODE_TABLES][B.DATA_NODE_TABLES][k]
        tdata[B.DATA_NODE_TABLES].pop(B.DATA_NODE_TABLES)
    return tdata


def setTableHeader(tableDict, tableAttr, fields, ttype, job):
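    """
    sets the header-fields and the collected table-attributes into the
    table-dictionary and prepares the sub-structure for the row-data
    depending on the spec-type
    :return: the filled table-dictionary
    """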
    header = []
    for i in range(1, len(fields)):
        header.append(fields[i].strip())
    tableDict[B.DATA_NODE_HEADER] = header
    for attr in tableAttr:
        tableDict[attr] = tableAttr[attr]
    # prepare the sub-structure for the row-data
    if ttype == D.CSV_SPECTYPE_TREE:
        tableDict[B.DATA_NODE_DATA] = {}
    elif ttype == D.CSV_SPECTYPE_KEYS:
        tableDict[D.CSV_NODETYPE_KEYS] = {}
        tableDict[D.DATA_ATTR_KEY] = 1
        if D.DATA_ATTR_KEY in tableAttr:
            tableDict[D.DATA_ATTR_KEY] = header.index(tableAttr[D.DATA_ATTR_KEY]) + 1
    else:
        tableDict[B.DATA_NODE_DATA] = []
    return tableDict


def setTableData(tableDict, fields, ttype, job):
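    """
    maps one csv-line into a row-dictionary and stores it in the
    table-dictionary: as list-entry for data, under its key-column value
    for keys resp. under its first value-field for conf
    :return: the filled table-dictionary
    """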
    row = {}
    if ttype == D.CSV_SPECTYPE_DATA and ":" not in fields[0] and D.DATA_ATTR_ALIAS in tableDict:
        fields = [tableDict[D.DATA_ATTR_ALIAS]] + fields
    i = 1
    for f in tableDict[B.DATA_NODE_HEADER]:
        row[f] = fields[i].strip()
        i += 1
    if ttype == D.CSV_SPECTYPE_DATA:
        if B.ATTR_DATA_COMP in tableDict:
            tcomps = tableDict[B.ATTR_DATA_COMP]
        else:
            tcomps = {}
        row[B.ATTR_DATA_COMP] = {}
        for c in fields[0].split(","):
            a = c.split(":")
            tcomps[a[0]] = a[1]
            row[B.ATTR_DATA_COMP][a[0]] = a[1].strip()
        tableDict[B.DATA_NODE_DATA].append(row)
        tableDict[B.ATTR_DATA_COMP] = tcomps
    elif ttype == D.CSV_SPECTYPE_KEYS:
        tableDict[D.CSV_NODETYPE_KEYS][fields[tableDict[D.DATA_ATTR_KEY]].strip()] = row
    elif ttype == D.CSV_SPECTYPE_CONF:
        tableDict[fields[1]] = row
    return tableDict


def setTableAttribute(tableAttr, key, val, job):
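    """
    checks if the key is one of the configured attribute-aliases;
    if so, the value is stored and "_hit" is set as marker for the caller
    :return: the table-attributes with the "_hit"-marker
    """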
    for attr in D.LIST_DATA_ATTR:
        if (key.lower() in list_blocks[attr]):
            tableAttr[attr] = val.strip()
            tableAttr["_hit"] = True
            return tableAttr
    tableAttr["_hit"] = False
    return tableAttr


def setTdataLine(tdata, fields, block, job):
"""
sets field(s) into tdata as a key-value-pair
additional fields will be concatenate to a intern separated list
:param tdata:
:param fields:
:param block:
:param job:
:return:
"""
a = fields[0].lower().split(":")
a[0] = block # normalized key
val = ""
for i in range(1, len(fields)-1):
val += D.INTERNAL_DELIMITER+fields[i]
if len(val) > len(D.INTERNAL_DELIMITER):
val = val[len(D.INTERNAL_DELIMITER):]
setTdataContent(job.m, tdata, val, a)
return tdata
def setTdataContent(msg, data, tabledata, path):
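    """
    stores the tabledata in the data-dictionary under the node-path,
    creating missing intermediate nodes first
    """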
    setTdataStructure(msg, data, path)
    if len(path) == 2:
        data[path[0]][path[1]] = tabledata
    elif len(path) == 3:
        data[path[0]][path[1]][path[2]] = tabledata
    elif len(path) == 4:
        data[path[0]][path[1]][path[2]][path[3]] = tabledata


def getTdataContent(msg, data, path):
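    """
    returns the sub-dictionary under the node-path,
    creating missing intermediate nodes first
    :return: the sub-dictionary resp. None for an unsupported path-length
    """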
    setTdataStructure(msg, data, path)
    if len(path) == 2:
        return data[path[0]][path[1]]
    elif len(path) == 3:
        return data[path[0]][path[1]][path[2]]
    elif len(path) == 4:
        return data[path[0]][path[1]][path[2]][path[3]]
    elif len(path) == 1:
        return data[path[0]]
    else:
        return None


def setTdataStructure(msg, data, path):
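    """
    creates the nested dictionaries along the node-path if they do not exist yet
    :return: the data-dictionary with the ensured structure
    """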
    if len(path) >= 1 and path[0] not in data:
        data[path[0]] = {}
    if len(path) >= 2 and path[1] not in data[path[0]]:
        data[path[0]][path[1]] = {}
    if len(path) >= 3 and path[2] not in data[path[0]][path[1]]:
        data[path[0]][path[1]][path[2]] = {}
    if len(path) >= 4 and path[3] not in data[path[0]][path[1]][path[2]]:
        data[path[0]][path[1]][path[2]][path[3]] = {}
    return data


def splitFields(line, delimiter, job):
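    """
    splits the csv-line into fields; a field starting with "#" ends the line
    as comment, surrounding double-quotes are stripped
    :return: the list of fields
    """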
    out = []
    fields = line.split(delimiter)
    for i in range(0, len(fields)):
        if fields[i][0:1] == "#":
            break
        if re.match(r"^\"(.*)\"$", fields[i]):
            fields[i] = fields[i][1:-1]
        out.append(fields[i])
    return out


def writeCsvData(filename, tdata, comp, job):
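    """
    writes the tables of the testdata as csv-text into the file
    :param comp: the component whose message-object is used for logging
    """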
    text = ""
    if B.DATA_NODE_TABLES in tdata:
        for k in tdata[B.DATA_NODE_TABLES]:
            text += buildCsvData(tdata[B.DATA_NODE_TABLES][k], k, job)
            text += "\n"
    utils.file_tool.writeFileText(comp.m, filename, text)


def buildCsvData(tdata, table, job=None):
    """
    builds the csv-text of the testdata for documentation of the test-run
    :param tdata: the table-data with header and rows
    :param table: the name of the table
    :param job:
    :return: the csv-text
    """
    text = ""
    for k in [D.DATA_ATTR_DATE, D.DATA_ATTR_COUNT]:
        if k in tdata:
            text += k+";"+str(tdata[k])+"\n"
    header = utils.i18n_tool.I18n.getInstance().getText(f"{B.DATA_NODE_TABLES=}", job)+":"+table
    for f in tdata[B.DATA_NODE_HEADER]:
        header += D.CSV_DELIMITER+f
    text += header + "\n"
    i = 0
    for r in tdata[B.DATA_NODE_DATA]:
        row = ""
        if B.ATTR_DATA_COMP in r:
            for k in r[B.ATTR_DATA_COMP]:
                row += ","+k+":"+r[B.ATTR_DATA_COMP][k]
            row = row[1:]
        i += 1
        for f in tdata[B.DATA_NODE_HEADER]:
            if f in r:
                row += D.CSV_DELIMITER+str(r[f])
            else:
                row += D.CSV_DELIMITER
        text += row
        text += "\n"
    return text


def buildCsvSpec(tdata, job=None):
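    """
    builds the csv-text of a complete specification (head, options, steps,
    tables) for documentation resp. re-export of the testcase
    :return: the csv-text
    """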
    text = ""
    if D.CSV_BLOCK_HEAD in tdata:
        for k in tdata[D.CSV_BLOCK_HEAD]:
            text += utils.i18n_tool.I18n.getInstance().getText(f"{D.CSV_BLOCK_HEAD=}", job)
            text += ":"+k+D.CSV_DELIMITER+tdata[D.CSV_BLOCK_HEAD][k]+"\n"
    text += "# option:key ;values;..;;;;\n"
    if D.CSV_BLOCK_OPTION in tdata:
        for k in tdata[D.CSV_BLOCK_OPTION]:
            text += utils.i18n_tool.I18n.getInstance().getText(f"{D.CSV_BLOCK_OPTION=}", job)
            text += ":" + k + D.CSV_DELIMITER + getHeadArgs(tdata[D.CSV_BLOCK_OPTION][k], job)+"\n"
        text += "#;;;;;;\n"
    if D.CSV_BLOCK_STEP in tdata:
        text += basic.step.getStepHeader(job)
        i = 1
        for step in tdata[D.CSV_BLOCK_STEP]:
            text += utils.i18n_tool.I18n.getInstance().getText(f"{D.CSV_BLOCK_STEP=}", job) + ":" + str(i)
            text += D.CSV_DELIMITER + step.getStepText(job)
            i += 1
        text += "#;;;;;;\n"
    if D.CSV_BLOCK_TABLES in tdata:
        for k in tdata[D.CSV_BLOCK_TABLES]:
            text += buildCsvData(tdata[D.CSV_BLOCK_TABLES][k], k, job)
            text += "#;;;;;;\n"
    return text


def getHeadArgs(value, job):
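    """
    converts an internally delimited value-list back into csv-delimited fields
    """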
    return value.replace(D.INTERNAL_DELIMITER, D.CSV_DELIMITER)