Data-Test-Executer Framework speziell zum Test von Datenverarbeitungen mit Datengenerierung, Systemvorbereitungen, Einspielungen, ganzheitlicher diversifizierender Vergleich
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

340 lines
13 KiB

#!/usr/bin/python
# -*- coding: utf-8 -*-
# ---------------------------------------------------------------------------------------------------------
# Author : Ulrich Carmesin
# Source : gitea.ucarmesin.de
# ---------------------------------------------------------------------------------------------------------
3 years ago
"""
The purpose of this tool is to transform external data into the internal structure and the internal structure back into external data - i.e. mostly test results.
* * * * * * * *
The test data have several elements:
* parameter (-td --tdata) : identifies which test data should be loaded
  * source (flaskdb: dbname / dir: filename) : always structured as a table (easy to specify) with columns
    * node : where the rows are
    * action : what should be done - default: insert
    + fields : dates relative to a reference day or a formula
* interface : configured in the components and used in the comparison, with attributes for each field:
  * ignored - whether the field should be ignored on differences; this is necessary for technical ID fields
  * id-field - necessary
* * * * * * * *
The test data themselves, which are written into the different artifacts of modern applications, are mostly stored as a tree
- e.g. as XML or JSON, always with plain data in the leaves. So the internal structure should also be a tree - in Python: a dictionary.
"""
import os.path
3 years ago
import basic.program
import utils.file_tool
import basic.constants as B
# kinds of test-data sources which are compared with the attribute ATTR_SRC_TYPE
DATA_SRC_DIR = "dir"
DATA_SRC_CSV = "csv"

# keywords in the first column (compared in lower case) which start the header-row of a table
CSV_HEADER_START = ["node", "table", "tabelle"]
CSV_DELIMITER = ";"

# specification-types which control the structure built by getCsvSpec
CSV_SPECTYPE_DATA = "data"
CSV_SPECTYPE_TREE = "tree"
CSV_SPECTYPE_KEYS = "keys"
CSV_SPECTYPE_CONF = "conf"
# node below a table in which getCsvSpec stores the rows of a keys-specification
CSV_NODETYPE_KEYS = "_keys"

# names of the job-parameters and of the keys in the result of getTdataAttr
ATTR_SRC_TYPE = "tdtyp"
ATTR_SRC_DATA = "tdsrc"
ATTR_SRC_NAME = "tdname"
def getTdataAttr():
    """
    Collect the attributes which describe the source of the test data.

    The source-type defaults to DATA_SRC_DIR. The source-name is taken from the
    testcase-parameter of the job or - if that is missing - from the testsuite-parameter.
    Afterwards each of the job-parameters tdtyp, tdsrc, tdname overrides the
    corresponding entry if the job has such a parameter.
    :return: dictionary which always contains ATTR_SRC_TYPE; ATTR_SRC_NAME and
             ATTR_SRC_DATA are only contained if a value was found for them
    """
    job = basic.program.Job.getInstance()
    out = {ATTR_SRC_TYPE: DATA_SRC_DIR}
    print("---getTdataAttr")
    print(vars(job.par))
    # default for the name: testcase before testsuite
    if hasattr(job.par, B.PAR_TESTCASE):
        out[ATTR_SRC_NAME] = getattr(job.par, B.PAR_TESTCASE)
    elif hasattr(job.par, B.PAR_TESTSUITE):
        out[ATTR_SRC_NAME] = getattr(job.par, B.PAR_TESTSUITE)
    # explicitly given parameters win over the defaults
    for p in [ATTR_SRC_TYPE, ATTR_SRC_DATA, ATTR_SRC_NAME]:
        if hasattr(job.par, p):
            out[p] = getattr(job.par, p)
    return out
def getTestdata():
    """
    Get the test data from one of the possible sources.
    * dir: the file testspec.csv in the folder <tdata-path>/<tdname>
    * csv: specific file - not implemented yet, only a message is written
    * flaskdb: specific db with a testcase-catalogue - not implemented yet, only a message is written
    For the source dir each entry of the node "option" is additionally set as
    attribute of job.par.
    :return: dictionary with the source-attributes of getTdataAttr and - for dir -
             the nodes of the csv-specification
    """
    job = basic.program.Job.getInstance()
    tdata = getTdataAttr()
    print(tdata)
    srctype = tdata[ATTR_SRC_TYPE]
    if srctype == "flaskdb":
        # planned: read the data-structure with the source-name, connect to the source,
        # select all data with the data-structure
        # NOTE(review): setInfo here but logInfo in the csv-branch - confirm against the message-class
        job.m.setInfo("Test-Data readed from " + tdata[ATTR_SRC_TYPE] + " for " + tdata[ATTR_SRC_NAME])
    elif srctype == DATA_SRC_CSV:
        # planned: read the file in the testdata
        job.m.logInfo("Test-Data readed from " + tdata[ATTR_SRC_TYPE] + " for " + tdata[ATTR_SRC_NAME])
    elif srctype == DATA_SRC_DIR:
        filename = os.path.join(job.conf.getJobConf(B.SUBJECT_PATH+":"+B.ATTR_PATH_TDATA), tdata[ATTR_SRC_NAME], "testspec.csv")
        data = getCsvSpec(job.m, filename, CSV_SPECTYPE_DATA)
        for k in data:
            tdata[k] = data[k]
            if k == "option":
                # options of the specification become parameters of the job
                for p in data[k]:
                    setattr(job.par, p, data[k][p])
    else:
        job.m.setFatal("test-Data: reftyp " + tdata[ATTR_SRC_TYPE] + " is not implemented")
    return tdata
def _getCsvStep(fields):
    """
    Build the dictionary of one "step"-row.

    fields[1] is stored under B.DATA_NODE_COMP, fields[2] under B.ATTR_DATA_REF and
    fields[3] - a comma-separated list of key:value - as dictionary under B.ATTR_STEP_ARGS.
    :param fields: the cells of the row
    :return: the step-dictionary
    """
    step = {}
    step[B.DATA_NODE_COMP] = fields[1]
    step[B.ATTR_DATA_REF] = fields[2]
    step[B.ATTR_STEP_ARGS] = {}
    for arg in fields[3].split(","):
        if len(arg.strip()) < 1:
            # an empty argument-cell must not abort the import with an IndexError
            continue
        b = arg.split(":")
        # an argument without value is kept with an empty value
        step[B.ATTR_STEP_ARGS][b[0]] = b[1] if len(b) > 1 else ""
    return step


def getCsvSpec(msg, filename, type):
    """
    get data from a csv-file
    a = field[0] delimited by :
    a) data : like a table with data-array of key-value-pairs
       a_0 is keyword [option, step, CSV_HEADER_START ]
       a_0 : { a_1 : { f_1 : v_1, .... } # option, step
       a_0 : { .. a_n : { _header : [ .. ], _data : [ rows... ] # table, node
    b) tree : as a tree - the rows must be uniquely identified by the first column
       a_0 is keyword in CSV_HEADER_START
       a_0 : { .. a_n : { _header : [ fields.. ], _data : { field : value }
    c) keys : as a tree - the rows must be uniquely identified by the first column
       a_0 is keyword in CSV_HEADER_START
       a_1 ... a_n is key characterized by header-field like _fk* or _pk*
       a_0 : { .. a_n : { _keys : [ _fpk*.. ] , _header : [ fields.. ], _data : { pk_0 : { ... pk_n : { field : value }
    d) conf:
       _header : [ field_0, ... ]
       { field_0 : { attr_0 : val_0, .. }, field_1 : { ... }, ... }

    Empty lines end the current table, lines starting with # are skipped.
    :param msg: message-object which is passed to the file-reader and the node-helpers
    :param filename: path of the csv-file
    :param type: one of CSV_SPECTYPE_DATA, CSV_SPECTYPE_TREE, CSV_SPECTYPE_KEYS, CSV_SPECTYPE_CONF
    :return: dictionary with the parsed specification
    """
    data = {}
    header = []
    # defined up front so that the final block never hits an undefined name
    headerFields = []
    h = []  # node-path of the current table, from a[]
    lines = utils.file_tool.readFileLines(filename, msg)
    status = "start"
    tableDict = {}
    for l in lines:
        print("lines "+l)
        fields = l.split(CSV_DELIMITER)
        # check empty line, comment
        if len(l.strip().replace(CSV_DELIMITER, "")) < 1:
            status = "start"
            continue
        if fields[0][0:1] == "#":
            continue
        a = fields[0].lower().split(":")
        # keywords option, step, table
        # NOTE(review): this also creates a node for the first cell of plain data-rows
        if a[0] not in data:
            data[a[0]] = {}
        if a[0] == "step":
            if B.DATA_NODE_STEPS not in data:
                data[B.DATA_NODE_STEPS] = []
            data[B.DATA_NODE_STEPS].append(_getCsvStep(fields))
            continue
        if a[0] == "option":
            data[a[0]][a[1]] = fields[1]
            continue
        if a[0] in CSV_HEADER_START:
            # create deep structure a_0 ... a_n
            print("tdata 136 CSV_HEADER_START "+str(len(a)))
            h = a
            tableDict = getTabContent(msg, data, h)
            # each table gets its own header - otherwise the columns of former
            # tables would leak into this one
            header = []
            for f in fields[1:]:
                if len(f) < 1:
                    break
                header.append(f)
            tableDict[B.DATA_NODE_HEADER] = header
            if type == CSV_SPECTYPE_TREE:
                tableDict[B.DATA_NODE_DATA] = {}
            elif type == CSV_SPECTYPE_KEYS:
                tableDict[CSV_NODETYPE_KEYS] = {}
            elif type == CSV_SPECTYPE_CONF:
                tableDict = {}
                headerFields = []
            else:
                tableDict[B.DATA_NODE_DATA] = []
            setTabContent(msg, data, tableDict, h)
            status = CSV_SPECTYPE_DATA
            continue
        if status == CSV_SPECTYPE_DATA:
            # fill data
            tableDict = getTabContent(msg, data, h)
            row = {}
            # case-differentiation DATA or TREE
            for i, f in enumerate(header, start=1):
                row[f] = fields[i]
                if type == CSV_SPECTYPE_TREE:
                    tableDict[B.DATA_NODE_DATA][f] = fields[i]
            if type == CSV_SPECTYPE_DATA:
                tableDict[B.DATA_NODE_DATA].append(row)
            elif type == CSV_SPECTYPE_KEYS:
                tableDict[CSV_NODETYPE_KEYS][fields[1]] = row
            elif type == CSV_SPECTYPE_CONF:
                tableDict[fields[1]] = row
                headerFields.append(fields[1])
            setTabContent(msg, data, tableDict, h)
    if status in [CSV_SPECTYPE_DATA, CSV_SPECTYPE_KEYS]:
        tableDict = getTabContent(msg, data, h)
        # only the conf-type collects headerFields
        if type == CSV_SPECTYPE_CONF:
            tableDict[B.DATA_NODE_HEADER] = headerFields
        setTabContent(msg, data, tableDict, h)
    print("return getCsvSpec "+str(data))
    return data
def setTabContent(msg, data, tabledata, path):
    """
    Store tabledata in the nested dictionary data at the node addressed by path.

    Missing nodes on the way are created as empty dictionaries. The path may have
    any depth; an empty path changes nothing.
    :param msg: message-object - not used here
    :param data: nested dictionary which is modified in place
    :param tabledata: content which is stored at the end of the path
    :param path: list of keys a_0 ... a_n from the root to the node
    :return: None
    """
    if len(path) < 1:
        return
    node = data
    # walk down to the parent of the target-node
    for key in path[:-1]:
        if key not in node:
            node[key] = {}
        node = node[key]
    node[path[-1]] = tabledata
def getTabContent(msg, data, path):
    """
    Get the node of the nested dictionary data which is addressed by path.

    Missing nodes on the way - including the target-node - are created as empty
    dictionaries, so the result can be filled in place. The path may have any depth.
    :param msg: message-object - not used here
    :param data: nested dictionary; it is extended in place if nodes are missing
    :param path: list of keys a_0 ... a_n from the root to the node
    :return: the node at the end of the path, None for an empty path
    """
    if len(path) < 1:
        return None
    node = data
    for key in path:
        if key not in node:
            node[key] = {}
        node = node[key]
    return node
def readCsv(msg, filename, comp):
    """
    Read a csv-file with tables and return the rows in a tree of nodes.

    The lines are processed with a small state-machine:
    state 0 - outside of a table, state 2 - header-row was read,
    state 3 - inside the data-rows. The node-path is taken from the first cell of
    the first data-row (delimited by :); without any node the basename of the file
    is used. The rows are stored under "_data" at the end of the node-path.
    :param msg: message-object which is passed to the file-reader
    :param filename: path of the csv-file
    :param comp: not used here
    :return: nested dictionary built by setSubnode
    """
    job = basic.program.Job.getInstance()
    verify = -1+job.getDebugLevel("tdata_tool")
    job.debug(verify, "readCsv " + filename)
    nodes = []
    columns = []
    output = {}
    state = 0
    data = []
    cnt = 0
    lines = utils.file_tool.readFileLines(filename, msg)
    # name of the file without the last four characters (e.g. ".csv")
    basename = os.path.basename(filename)[0:-4]
    startCols = 1
    for line in lines:
        fields = line.split(';')
        testline = line.replace(";", "")
        job.debug(verify, str(state) + " line " + line + " :" + str(len(fields)) + ": " + str(fields))
        if len(testline) < 2 and state < 1:
            state = 0
        elif fields[0].lower() in CSV_HEADER_START:
            # header-row: collect the names of the columns
            state = 2
            columns = []
            cnt = len(fields)
            job.debug(verify, str(state) + " cnt " + str(cnt))
            j = 0
            for i in range(1, cnt-1):
                # NOTE(review): fields[0] is a header-keyword here, so this check never
                # matches - presumably fields[i] was meant; confirm before changing
                if fields[0][0:1] == "_":
                    startCols += 1
                    continue
                job.debug(verify, str(i) + " cnt " + str(fields[i]))
                if len(fields[i]) > 0:
                    columns.append(fields[i])
                    j = j + 1
            cnt = j
            job.debug(verify, str(state) + " " + str(cnt) + " cols " + str(columns))
        elif state >= 2 and len(testline) > 2:
            # data-row
            if state == 2 and not fields[0].isspace():
                # the first data-row after the header defines the node-path
                # NOTE(review): nodes is never reset, so a further table extends the path - confirm
                struct = fields[0].split(":")
                for x in struct:
                    if len(x) > 2:
                        nodes.append(x)
                job.debug(verify, str(state) + " nodes " + str(nodes))
                state = 3
            row = {}
            for i in range(startCols, cnt-1):
                row[columns[i-startCols]] = fields[i]
            job.debug(verify, str(state) + " row " + str(row))
            data.append(row)
        elif state == 3:
            # the table is finished: store the collected rows in the tree
            job.debug(verify, "structure " + str(state) + ": " + str(nodes))
            output = setSubnode(0, nodes, data, output)
            data = []
            state = 0
    if len(nodes) < 1:
        nodes.append(basename)
    output = setSubnode(0, nodes, data, output)
    return output
def setSubnode(i, nodes, data, tree):
    """
    Store data under the key "_data" at the end of the node-path inside of the tree.

    The function walks recursively along nodes[i:], reuses existing subtrees and
    creates missing ones as empty dictionaries.
    :param i: index of the current node in nodes
    :param nodes: list of keys from the root to the target-node
    :param data: content which is stored under "_data"
    :param tree: the (sub-)tree which is modified in place; None is treated as empty tree
    :return: the modified (sub-)tree
    """
    print("setSubnode " + str(i) + ": " + ": " + str(tree))
    if tree is None:
        # a missing subtree must not break the item-assignments below
        tree = {}
    if i >= len(nodes):
        print("setSubnode a " + str(i))
        tree["_data"] = data
    elif nodes[i] in tree:
        print("setSubnode b " + str(i))
        tree[nodes[i]] = setSubnode(i+1, nodes, data, tree[nodes[i]])
    else:
        print("setSubnode c " + str(i))
        tree[nodes[i]] = setSubnode((i + 1), nodes, data, {})
    return tree
def getDataStructure(comp):
    """
    Get the data-structure from the component-folder.

    Not implemented yet: the call is only written to the debug-log.
    :param comp: the component; it is only used as text in the debug-message
    :return: None
    """
    job = basic.program.Job.getInstance()
    verify = -1+job.getDebugLevel("tdata_tool")
    # str() like in writeCsvData, so that a component-object does not raise a TypeError
    job.debug(verify, "getDataStructure " + str(comp))
def normalizeDataRow(dstruct, xpathtupel, row, referencedate):
    """
    Normalize the data of the row if necessary;
    the raw value shall be saved as new field with _raw as suffix.

    Not implemented yet: the call is only written to the debug-log.
    :param dstruct: not used yet
    :param xpathtupel: not used yet
    :param row: the row; it is only used as text in the debug-message
    :param referencedate: not used yet
    :return: None
    """
    job = basic.program.Job.getInstance()
    verify = -1+job.getDebugLevel("tdata_tool")
    # str() because the rows built in this module are dictionaries
    job.debug(verify, "calcDataRow " + str(row))
def writeCsvData(filename, tdata, comp):
    """
    Write the test data as table into a csv-file for documentation of the test-run.

    The first line is "table" followed by the column-names; each row follows in an
    own line with an empty first cell. Columns which are missing in a row are
    written as empty cells.
    :param filename: path of the csv-file
    :param tdata: dictionary with the column-names in B.DATA_NODE_HEADER and the
                  rows (dictionaries) in B.DATA_NODE_DATA
    :param comp: component whose message-object comp.m is passed to the file-writer;
                 if None the message-object of the job is used
    :return: None
    """
    job = basic.program.Job.getInstance()
    verify = -1+job.getDebugLevel("tdata_tool")
    job.debug(verify, "writeDataTable " + str(comp))
    header = tdata[B.DATA_NODE_HEADER]
    lines = ["table" + "".join(";" + f for f in header)]
    for r in tdata[B.DATA_NODE_DATA]:
        lines.append("".join(";" + str(r[f]) if f in r else ";" for f in header))
    text = "\n".join(lines) + "\n"
    # comp is documented as optional, so fall back to the message-object of the job
    msg = comp.m if comp is not None else job.m
    utils.file_tool.writeFileText(msg, filename, text)