
refactor tdata

master
Ulrich Carmesin 2 years ago
parent commit a020e009fd
  1. 19 utils/config_tool.py
  2. 683 utils/tdata_tool.py

19
utils/config_tool.py

@@ -23,7 +23,7 @@ import utils.path_const as P
COMP_FILES = [D.DDL_FILENAME]
CONFIG_FORMAT = [D.DFILE_TYPE_YML, D.DFILE_TYPE_JSON, D.DFILE_TYPE_CSV]
def getConfigPath(modul, name, subname=""):
def getConfigPath(modul, name, subname="", job=None):
"""
gets the most specific configuration from the different sources
Parameter:
@@ -39,7 +39,8 @@ def getConfigPath(modul, name, subname=""):
the parameter-files could be one of these file-types:
* yaml, json, csv
"""
job = basic.program.Job.getInstance()
if job is None:
job = basic.program.Job.getInstance()
verify = job.getDebugLevel("config_tool")-4
job.debug(verify, "getConfig " + modul + ", " + name)
#TODO path rejoin, config as const
@@ -106,6 +107,20 @@ def getConfigPath(modul, name, subname=""):
job.debug(verify, "4 " + pathname)
if os.path.exists(pathname):
return pathname
elif modul == P.KEY_TESTCASE:
for format in CONFIG_FORMAT:
pathname = os.path.join(job.conf.getPath(P.ATTR_PATH_TDATA),
name, D.DFILE_TESTCASE_NAME + "."+format)
job.debug(verify, "4 " + pathname)
if os.path.exists(pathname):
return pathname
elif modul == P.KEY_TESTSUITE:
for format in CONFIG_FORMAT:
pathname = os.path.join(job.conf.getPath(P.ATTR_PATH_TDATA),
name, D.DFILE_TESTSUITE_NAME + "." + format)
job.debug(verify, "4 " + pathname)
if os.path.exists(pathname):
return pathname
else:
pathname = utils.path_tool.composePath(P.P_TCPARFILE)
job.debug(verify, "7 " + pathname)

683
utils/tdata_tool.py

@@ -4,268 +4,313 @@
# Author : Ulrich Carmesin
# Source : gitea.ucarmesin.de
# ---------------------------------------------------------------------------------------------------------
"""
the purpose of this tool is to transform external data into the internal structure and the internal structure into external data - i.e. mostly test-results.
* * * * * * * *
the testdata has several elements
* parameter (-td --tdata) : to identify which testdata should be loaded
* source (flaskdb: dbname / dir: filename) : always structured in a table (easy to specify) with columns
* node : where the rows are
* action : what should be done - default insert
+ fields : dates in relation to a reference day or a formula
* interface : configured in components and used in the comparison, with attributes for each field:
* ignored - whether differences in the field should be ignored; necessary for technical ID-fields
* id-field - necessary
* * * * * * * *
the testdata itself, which is written in different artifacts of modern applications, is mostly stored as a tree
- e.g. as xml or json, always with plain data in the leaves. So the internal structure should also be a tree - in python: a dictionary.
"""
import os.path
import basic.program
import utils.config_tool
import utils.file_tool
import basic.constants as B
import utils.data_const as D
import utils.path_const as P
import utils.path_tool
import utils.date_tool
import basic.step
import utils.i18n_tool
import re
TOOL_NAME = "tdata_tool"
""" name of the tool in order to switch debug-info on """
TDATA_NODES = [ D.CSV_BLOCK_OPTION ]
def getTdataAttr():
job = basic.program.Job.getInstance()
out = {} #
out[D.ATTR_SRC_TYPE] = D.DATA_SRC_DIR
print("---getTdataAttr")
print(vars(job.par))
if hasattr(job.par, B.PAR_TESTCASE):
out[D.ATTR_SRC_NAME] = getattr(job.par, B.PAR_TESTCASE)
elif hasattr(job.par, B.PAR_TESTSUITE):
out[D.ATTR_SRC_NAME] = getattr(job.par, B.PAR_TESTSUITE)
for p in [D.ATTR_SRC_TYPE, D.ATTR_SRC_DATA, D.ATTR_SRC_NAME]:
# out[p] = ""
if hasattr(job.par, p):
out[p] = getattr(job.par, p)
return out
list_blocks = {} # lists of aliases
def getTestdata():
def getTestdata(job=None):
"""
get the testdata from one of the possible sources
* dir: each file in the specific test archive
* csv: specific file
* db: specific db with a testcase-catalogue
get the testdata from one of the possible sources
for the testcase resp testsuite of the job
:return:
"""
job = basic.program.Job.getInstance()
#reftyp = getattr(job.par, "tdtyp")
#source = getattr(job.par, "tdsrc")
#criteria = getattr(job.par, "tdname")
tdata = getTdataAttr() # {"reftyp": reftyp, "source": source, "criteria": criteria}
print(tdata)
if tdata[D.ATTR_SRC_TYPE] == "flaskdb":
# read data-structure with sourcename
# connect to source
# select with all data with datastructure
job.m.setInfo("Test-Data readed from " + tdata[D.ATTR_SRC_TYPE] + " for " + tdata[D.ATTR_SRC_NAME])
elif tdata[D.ATTR_SRC_TYPE] == D.DATA_SRC_CSV:
# read file in testdata
job.m.logInfo("Test-Data readed from " + tdata[D.ATTR_SRC_TYPE] + " for " + tdata[D.ATTR_SRC_NAME])
elif tdata[D.ATTR_SRC_TYPE] == D.DATA_SRC_DIR:
path = os.path.join(job.conf.getJobConf(B.SUBJECT_PATH+":"+B.ATTR_PATH_TDATA), tdata[D.ATTR_SRC_NAME])
filename = os.path.join(path , "testspec.csv")
data = getCsvSpec(job.m, filename, D.CSV_SPECTYPE_DATA)
for k in data:
tdata[k] = data[k]
if (k == D.CSV_BLOCK_OPTION):
for p in data[k]:
setattr(job.par, p, data[k][p])
files = utils.file_tool.getFiles(job.m, path, "table_", None)
for f in files:
print(f)
filename = os.path.join(path, f)
data = readCsv(job.m, filename, None)
table = f[6:-4]
print(filename+" "+table)
if B.DATA_NODE_TABLES not in tdata:
tdata[B.DATA_NODE_TABLES] = {}
tdata[B.DATA_NODE_TABLES][table] = data[B.DATA_NODE_TABLES][table]
if job is None:
job = basic.program.Job.getInstance()
if "testcase" in job.program:
return collectTestdata(B.PAR_TESTCASE, job.par[B.PAR_TESTCASE], job)
else:
return collectTestdata(B.PAR_TESTSUITE, job.par[B.PAR_TESTSUITE], job)
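# Usage sketch, assuming a job whose parameters include B.PAR_TESTCASE:
#   tdata = getTestdata(job)
#   tdata[D.CSV_BLOCK_TABLES]  # merged table data from the main spec, imports and spec-library
#   tdata[D.CSV_BLOCK_OPTION]  # option block, also copied into job.par by collectTestdata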
def collectTestdata(gran, testentity, job):
"""
collects the testdata from one of the possible sources
for the testcase resp testsuite
:return:
"""
setBlockLists(job)
if gran == B.PAR_TESTCASE:
basispath = utils.path_tool.rejoinPath(job.conf.confs[B.SUBJECT_PATH][B.ATTR_PATH_TDATA], testentity)
pathname = utils.config_tool.getConfigPath(P.KEY_TESTCASE, job.par[B.PAR_TESTCASE], "", job)
if gran == B.PAR_TESTSUITE:
basispath = utils.path_tool.rejoinPath(job.conf.confs[B.SUBJECT_PATH][B.ATTR_PATH_TDATA], testentity)
pathname = utils.config_tool.getConfigPath(P.KEY_TESTSUITE, job.par[B.PAR_TESTSUITE], "", job)
if pathname[-3:] == D.DFILE_TYPE_CSV:
tdata = getCsvSpec(job.m, pathname, D.CSV_SPECTYPE_DATA)
else:
job.m.setFatal("test-Data: reftyp " + tdata[D.ATTR_SRC_TYPE] + " is not implemented")
tdata = utils.file_tool.readFileDict(pathname, job.m)
# get explicit specdata of includes
for pathname in tdata[D.CSV_BLOCK_IMPORT]:
pathname = utils.path_tool.rejoinPath(pathname)
if job.conf.confs[B.SUBJECT_PATH][B.ATTR_PATH_TDATA] not in pathname:
pathname = utils.path_tool.rejoinPath(basispath, pathname)
if pathname[-3:] == D.DFILE_TYPE_CSV:
data = getCsvSpec(job.m, pathname, D.CSV_SPECTYPE_DATA)
else:
data = utils.file_tool.readFileDict(pathname, job.m)
for table in data[D.CSV_BLOCK_TABLES]:
if table in tdata[D.CSV_BLOCK_TABLES]:
print("Fehler")
tdata[D.CSV_BLOCK_TABLES][table] = data[D.CSV_BLOCK_TABLES][table]
# get implicit specdata of spec-library
for prefix in list_blocks[D.DFILE_TABLE_PREFIX]:
files = utils.file_tool.getFiles(job.m, basispath, prefix, None)
if len(files) < 1:
continue
for f in files:
if f in tdata[D.CSV_BLOCK_TABLES]:
continue
pathname = utils.path_tool.rejoinPath(basispath, f)
if pathname[-3:] == D.DFILE_TYPE_CSV:
data = getCsvSpec(job.m, pathname, D.CSV_SPECTYPE_DATA)
else:
data = utils.file_tool.readFileDict(pathname, job.m)
for table in data[D.CSV_BLOCK_TABLES]:
if table in tdata[D.CSV_BLOCK_TABLES]:
print("Fehler")
tdata[D.CSV_BLOCK_TABLES][table] = data[D.CSV_BLOCK_TABLES][table]
# fill the options into job-parameter
for p in tdata[D.CSV_BLOCK_OPTION]:
setattr(job.par, p, tdata[D.CSV_BLOCK_OPTION][p])
return tdata
def getCsvSpec(msg, filename, type):
def setBlockLists(job):
for block in D.LIST_BLOCK_CONST + D.LIST_ATTR_CONST + D.LIST_DFNAME_CONST:
list = utils.i18n_tool.I18n.getInstance().getAliasList(block+"='"+eval("D."+block)+"'")
#list.append(eval("D."+block))
list_blocks[eval("D." + block)] = []
for x in list:
list_blocks[eval("D." + block)].append(x.lower())
def readCsv(msg, filename, comp, aliasNode="", job=None):
if job is None:
job = basic.program.Job.getInstance()
lines = utils.file_tool.readFileLines(filename, msg)
print("readCsv "+filename)
return parseCsv(msg, filename, lines, comp, aliasNode, job)
def parseCsv(msg, filename, lines, comp, aliasNode="", job=None):
if job is None:
job = basic.program.Job.getInstance()
if len(list_blocks) < 1:
setBlockLists(job)
tdata = {}
if len(aliasNode) < 1:
print(str(list_blocks))
aliasNode = extractAliasNode(filename, comp, job)
if len(aliasNode) > 3:
tdata[D.DATA_ATTR_ALIAS] = aliasNode
return parseCsvSpec(msg, lines, D.CSV_SPECTYPE_DATA, tdata, job)
def extractAliasNode(filename, comp, job):
basename = os.path.basename(filename)[0:-4]
for prefix in list_blocks[D.DFILE_TABLE_PREFIX]:
if basename.find(prefix) == 0:
basename = basename[len(prefix):]
if comp is None:
return ""
if B.TOPIC_NODE_DB in comp.conf[B.SUBJECT_ARTS] and basename in comp.conf[B.DATA_NODE_DDL]:
return B.DATA_NODE_TABLES+":"+basename
return ""
def getCsvSpec(msg, filename, ttype, job=None):
"""
get data from a csv-file
a = field[0] delimited by :
a) data : like a table with data-array of key-value-pairs
a_0 is keyword [option, step, CSV_HEADER_START ]
a_0 : { a_1 : { f_1 : v_1, .... } # option, step
a_0 : { .. a_n : { _header : [ .. ], _data : [ rows... ] # table, node
b) tree : as a tree - the rows must be uniquely identified by the first column
a_0 is keyword in CSV_HEADER_START
a_0 : { .. a_n : { _header : [ fields.. ], _data : { field : value }
c) keys : as a tree - the rows must be uniquely identified by the first column
a_0 is keyword in CSV_HEADER_START
a_1 ... a_n is key characterized by header-field like _fk* or _pk*
a_0 : { .. a_n : { _keys : [ _fpk*.. ] , _header : [ fields.. ], _data : { pk_0 : { ... pk_n : { field : value }
d) conf:
_header : [ field_0, ... ]
{ field_0 : { attr_0 : val_0, .. }, field_1 : { ... }, ... }
reads the specification from a csv-file and maps it into the internal data-structure
:param msg:
:param filename:
:param ttype:
:param job:
:return:
"""
if job is None:
job = basic.program.Job.getInstance()
lines = utils.file_tool.readFileLines(filename, msg)
return parseCsvSpec(msg, lines, type)
tdata = {} # the result
return parseCsvSpec(msg, lines, ttype, tdata, job)
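# Example sketch for ttype == D.CSV_SPECTYPE_DATA; the ";" delimiter, the keyword
# spellings and the node keys shown are assumptions for the D./B. constants resp.
# their i18n aliases:
#   option:application;TESTAPP
#   table:person;id;name
#   testdb:person;1;Alice
#   testdb:person;2;Bob
# Each data row becomes a dict keyed by the header fields, e.g. {"id": "1", "name": "Alice"},
# extended by a component mapping {"testdb": "person"} taken from column 0, and appended
# to the table's _data list below its _header list.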
def parseCsvSpec(msg, lines, type):
job = basic.program.Job.getInstance()
data = {}
header = []
h = [] # from a[]
def parseCsvSpec(msg, lines, ttype, tdata, job=None):
"""
:param msg:
:param lines:
:param ttype:
:param job:
:return:
"""
if job is None:
job = basic.program.Job.getInstance()
if len(list_blocks) < 1:
setBlockLists(job)
status = "start"
tableDate = utils.date_tool.getActdate(utils.date_tool.F_DE)
tableDict = {}
tableAttr = {} # table
tableDict = {} # table
for l in lines:
print("lines "+l)
fields = l.split(D.CSV_DELIMITER)
fields = splitFields(l, D.CSV_DELIMITER, job)
# check empty line, comment
if (len(l.strip().replace(D.CSV_DELIMITER,"")) < 1):
if (len(fields) < 1) or (len(l.strip().replace(D.CSV_DELIMITER,"")) < 1):
status = "start"
continue
if (fields[0][0:1] == "#"):
continue
a = fields[0].lower().split(":")
# keywords option, step, table
if a[0] not in data and (a[0] in TDATA_NODES):
data[a[0]] = {}
if (a[0].lower() == D.CSV_BLOCK_STEP):
if (not B.DATA_NODE_STEPS in data):
data[B.DATA_NODE_STEPS] = []
print(str(a)+" -- "+str(fields))
tableAttr = setTableAttribute(tableAttr, a[0], fields[1], job)
if (tableAttr["hit"]):
status = "TABLE_ALIAS"
continue
if (a[0].lower() in list_blocks[D.CSV_BLOCK_HEAD]):
print("head "+l)
setTdataLine(tdata, fields, D.CSV_BLOCK_HEAD, job)
status = "start"
continue
elif (a[0].lower() in list_blocks[D.CSV_BLOCK_OPTION]):
print("option " + l)
setTdataLine(tdata, fields, D.CSV_BLOCK_OPTION, job)
status = "start"
continue
elif (a[0].lower() in list_blocks[D.CSV_BLOCK_STEP]):
print("step "+l)
step = basic.step.parseStep(job, fields)
"""
step = {}
step[B.DATA_NODE_COMP] = fields[D.STEP_COMP_I]
step[B.ATTR_EXEC_REF] = fields[D.STEP_EXECNR_I]
step[B.ATTR_DATA_REF] = fields[D.STEP_REFNR_I]
step[B.ATTR_STEP_ARGS] = {}
if D.STEP_ARGS_I == D.STEP_LIST_I:
args = ""
for i in range(D.STEP_ARGS_I, len(fields)):
if len(fields[i]) < 1:
continue
if fields[i][0:1] == "#":
continue
args += "," + fields[i]
args = args[1:]
else:
args = fields[D.STEP_ARGS_I]
a = args.split(",")
for arg in a:
print("arg "+arg)
b = arg.split(":")
if len(b) < 2:
raise Exception(D.EXCP_MALFORMAT + "" + l)
step[B.ATTR_STEP_ARGS][b[0]] = b[1]
"""
data[B.DATA_NODE_STEPS].append(step)
if D.CSV_BLOCK_STEP not in tdata:
tdata[D.CSV_BLOCK_STEP] = []
tdata[D.CSV_BLOCK_STEP].append(step)
status = "start"
continue
elif (a[0].lower() == D.CSV_BLOCK_OPTION):
if len(a) < 2:
raise Exception(D.EXCP_MALFORMAT+""+l)
data[a[0]][a[1]] = fields[1]
elif (a[0].lower() in list_blocks[D.CSV_BLOCK_IMPORT]):
print("includes " + l)
if D.CSV_BLOCK_IMPORT not in tdata:
tdata[D.CSV_BLOCK_IMPORT] = []
tdata[D.CSV_BLOCK_IMPORT].append(fields[1])
status = "start"
continue
elif a[0].lower() == D.DATA_ATTR_DATE:
tableDate = fields[1]
elif (a[0].lower() in D.CSV_HEADER_START):
# create deep structure a_0 ... a_n
print("tdata 136 CSV_HEADER_START "+str(len(a)))
elif (a[0].lower() in list_blocks[D.CSV_BLOCK_TABLES]):
print("tables "+l)
h = a
header = []
if B.DATA_NODE_TABLES not in data:
data[B.DATA_NODE_TABLES] = {}
h[0] = B.DATA_NODE_TABLES
comps = {}
tableDict = getTabContent(msg, data, h)
i = 0
for f in fields:
i += 1
if i <= 1:
continue
if len(f) < 1:
break
header.append(f)
tableDict[B.DATA_NODE_HEADER] = header
print("tdata 165 header "+str(header))
if type == D.CSV_SPECTYPE_TREE:
tableDict[B.DATA_NODE_DATA] = {}
elif type == D.CSV_SPECTYPE_KEYS:
tableDict[D.CSV_NODETYPE_KEYS] = {}
elif type == D.CSV_SPECTYPE_CONF:
tableDict = {}
headerFields = []
else:
tableDict[B.DATA_NODE_DATA] = []
tableDict[D.DATA_ATTR_DATE] = tableDate
setTabContent(msg, data, tableDict, h)
if ttype == D.CSV_SPECTYPE_CONF:
del h[0]
tableDict = getTdataContent(msg, tdata, h)
setTableHeader(tableDict, tableAttr, fields, ttype, job)
status = D.CSV_SPECTYPE_DATA
continue
elif (status == D.CSV_SPECTYPE_DATA):
# check A-col for substructure
# fill data
tableDict = getTabContent(msg, data, h)
row = {}
print(fields)
i = 1
# case-differentiation DATA or TREE
for f in header:
print(str(i)+" "+str(len(fields))+" "+str(len(header)))
row[f] = fields[i]
if type == D.CSV_SPECTYPE_TREE:
tableDict[B.DATA_NODE_DATA][f] = fields[i]
i += 1
if type == D.CSV_SPECTYPE_DATA:
print("parseSpec "+ str(fields[0]))
row[B.ATTR_DATA_COMP] = {}
for c in fields[0].split(","):
a = c.split(":")
print("parseSpec " + str(a))
comps[a[0]] = a[1]
row[B.ATTR_DATA_COMP][a[0]] = a[1]
#row[B.ATTR_DATA_COMP] = fields[0].split(",")
tableDict[B.ATTR_DATA_COMP] = comps
tableDict[B.DATA_NODE_DATA].append(row)
elif type == D.CSV_SPECTYPE_KEYS:
tableDict[D.CSV_NODETYPE_KEYS][fields[1]] = row
elif type == D.CSV_SPECTYPE_CONF:
tableDict[fields[1]] = row
headerFields.append(fields[1])
setTabContent(msg, data, tableDict, h)
if (status in [D.CSV_SPECTYPE_DATA, D.CSV_SPECTYPE_KEYS]):
tableDict = getTabContent(msg, data, h)
if type == D.CSV_SPECTYPE_CONF:
tableDict[B.DATA_NODE_HEADER] = headerFields
setTabContent(msg, data, tableDict, h)
if type == D.CSV_SPECTYPE_CONF:
data = data[B.DATA_NODE_TABLES]
print("return getCsvSpec "+str(data))
return data
tableDict = getTdataContent(msg, tdata, h)
print("setTableData "+str(h)+" "+str(tableDict))
setTableData(tableDict, fields, ttype, job)
elif (status == "TABLE_ALIAS") and D.DATA_ATTR_ALIAS in tdata:
alias = tdata[D.DATA_ATTR_ALIAS]
b = alias.split(":")
h = [B.DATA_NODE_TABLES] + b
tableDict = getTdataContent(msg, tdata, h)
tableDict[D.DATA_ATTR_ALIAS] = alias
fields = [alias] + fields
setTableHeader(tableDict, tableAttr, fields, ttype, job)
status = D.CSV_SPECTYPE_DATA
if ttype == D.CSV_SPECTYPE_CONF:
for k in tdata:
if B.DATA_NODE_DATA in tdata[k]:
tdata[k].pop(B.DATA_NODE_DATA)
if B.DATA_NODE_TABLES in tdata[B.DATA_NODE_TABLES]:
for k in tdata[B.DATA_NODE_TABLES][B.DATA_NODE_TABLES]:
if k in tdata[B.DATA_NODE_TABLES]:
print("Error")
else:
tdata[B.DATA_NODE_TABLES][k] = tdata[B.DATA_NODE_TABLES][B.DATA_NODE_TABLES][k]
tdata[B.DATA_NODE_TABLES].pop(B.DATA_NODE_TABLES)
return tdata
def setTableHeader(tableDict, tableAttr, fields, ttype, job):
header = []
for i in range(1, len(fields)):
header.append(fields[i])
tableDict[B.DATA_NODE_HEADER] = header
for attr in tableAttr:
tableDict[attr] = tableAttr[attr]
# prepare the sub-structure for row-data
if ttype == D.CSV_SPECTYPE_TREE:
tableDict[B.DATA_NODE_DATA] = {}
elif ttype == D.CSV_SPECTYPE_KEYS:
tableDict[D.CSV_NODETYPE_KEYS] = {}
else:
tableDict[B.DATA_NODE_DATA] = []
return tableDict
def mergeTableComponents(comps, rowComps):
for c in rowComps.split(","):
a = c.split(":")
comps[a[0]] = a[1]
return comps
def setTableData(tableDict, fields, ttype, job):
row = {}
if ttype == D.CSV_SPECTYPE_DATA and ":" not in fields[0] and D.DATA_ATTR_ALIAS in tableDict:
fields = [tableDict[D.DATA_ATTR_ALIAS]] + fields
i = 1
for f in tableDict[B.DATA_NODE_HEADER]:
row[f] = fields[i]
i += 1
if ttype == D.CSV_SPECTYPE_DATA:
row[B.ATTR_DATA_COMP] = {}
for c in fields[0].split(","):
a = c.split(":")
row[B.ATTR_DATA_COMP][a[0]] = a[1]
tableDict[B.DATA_NODE_DATA].append(row)
elif ttype == D.CSV_SPECTYPE_KEYS:
tableDict[D.CSV_NODETYPE_KEYS][fields[1]] = row
elif ttype == D.CSV_SPECTYPE_CONF:
tableDict[fields[1]] = row
return tableDict
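# Sketch of one data line for ttype == D.CSV_SPECTYPE_DATA ("_header"/"_data" stand in
# for the B.DATA_NODE_* constants here):
#   tableDict = {"_header": ["id", "name"], "_data": []}
#   setTableData(tableDict, ["testdb:person", "1", "Alice"], D.CSV_SPECTYPE_DATA, job)
#   # appends {"id": "1", "name": "Alice", <B.ATTR_DATA_COMP>: {"testdb": "person"}}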
def setTabContent(msg, data, tabledata, path):
if len(path) >= 2 and path[1] not in data[path[0]]:
data[path[0]][path[1]] = {}
if len(path) >= 3 and path[2] not in data[path[0]][path[1]]:
data[path[0]][path[1]][path[2]] = {}
if len(path) >= 4 and path[3] not in data[path[0]][path[1]][path[2]]:
data[path[0]][path[1]][path[2]][path[3]] = {}
def setTableAttribute(tableAttr, key, val, job):
for attr in D.LIST_DATA_ATTR:
if (key.lower() in list_blocks[attr]):
tableAttr[attr] = val
tableAttr["hit"] = True
return tableAttr
tableAttr["hit"] = False
return tableAttr
def setTdataLine(tdata, fields, block, job):
"""
sets field(s) into tdata as a key-value-pair
additional fields will be concatenated into an internally separated list
:param tdata:
:param fields:
:param block:
:param job:
:return:
"""
a = fields[0].lower().split(":")
a[0] = block # normalized key
val = ""
for i in range(1, len(fields)-1):
val += D.INTERNAL_DELIMITER+fields[i]
if len(val) > len(D.INTERNAL_DELIMITER):
val = val[len(D.INTERNAL_DELIMITER):]
setTdataContent(job.m, tdata, val, a)
return tdata
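# Sketch: the already split line "option:environment;ENV01;" i.e.
#   setTdataLine(tdata, ["option:environment", "ENV01", ""], D.CSV_BLOCK_OPTION, job)
# stores tdata[D.CSV_BLOCK_OPTION]["environment"] = "ENV01"; several value fields
# would be joined with D.INTERNAL_DELIMITER.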
def setTdataContent(msg, data, tabledata, path):
setTdataStructure(msg, data, path)
if len(path) == 2:
data[path[0]][path[1]] = tabledata
elif len(path) == 3:
@@ -274,155 +319,45 @@ def setTabContent(msg, data, tabledata, path):
data[path[0]][path[1]][path[2]][path[3]] = tabledata
def getTabContent(msg, data, path):
if len(path) >= 2 and path[1] not in data[path[0]]:
data[path[0]][path[1]] = {}
if len(path) >= 3 and path[2] not in data[path[0]][path[1]]:
data[path[0]][path[1]][path[2]] = {}
if len(path) >= 4 and path[3] not in data[path[0]][path[1]][path[2]]:
data[path[0]][path[1]][path[2]][path[3]] = {}
def getTdataContent(msg, data, path):
setTdataStructure(msg, data, path)
if len(path) == 2:
return data[path[0]][path[1]]
elif len(path) == 3:
return data[path[0]][path[1]][path[2]]
elif len(path) == 4:
return data[path[0]][path[1]][path[2]][path[3]]
elif len(path) == 1:
return data[path[0]]
else:
pass
return None
def readCsv(msg, filename, comp, aliasNode=""):
lines = utils.file_tool.readFileLines(filename, msg)
print("readCsv "+filename)
print(lines)
return parseCsv(msg, filename, lines, comp, aliasNode)
def parseCsv(msg, filename, lines, comp, aliasNode=""):
job = basic.program.Job.getInstance()
verify = -4+job.getDebugLevel(TOOL_NAME)
job.debug(verify, "# # # # # # # # parseCsv " + filename + " :" + str(lines))
fields = []
nodes = []
columns = []
output = {}
state = 0
data = {}
tableDict = {}
tableDate = ""
tableCnt = 0
cnt = 0
basename = os.path.basename(filename)[0:-4]
startCols = 1
for line in lines:
fields = line.split(';')
testline = line.replace(";", "")
a = fields[0].split(':')
job.debug(verify, str(state) + " line " + line + " :" + str(len(fields)) + ": " + str(fields))
if len(testline) < 2 and state < 1:
state = 0
elif a[0].lower() == D.DATA_ATTR_DATE:
tableDate = fields[1]
state = 1
elif a[0].lower() == D.DATA_ATTR_COUNT:
tableCnt = fields[1]
state = 1
elif a[0].lower() in D.CSV_HEADER_START or \
(comp is not None and state == 1
and isCompTableFile(comp, filename)):
state = 2
columns = []
h = a
if len(h) < 2 and comp is not None:
a = ["table", basename]
h = a
startCols = 0
cnt = len(fields)
job.debug(verify, str(state) + " cnt " + str(cnt))
data[B.DATA_NODE_TABLES] = {}
h[0] = B.DATA_NODE_TABLES
if not aliasNode.isspace() and len(aliasNode) > 3:
struct = aliasNode.split(":")
for x in struct:
if len(x) > 2:
nodes.append(x)
job.debug(verify, str(state) + " nodes " + str(nodes))
elif len(h) > 1:
for i in range(1, len(h)):
nodes.append(h[i])
job.debug(verify, str(state) + " nodes " + str(nodes))
tableDict = getTabContent(msg, data, h)
tableDict[B.ATTR_DATA_COMP] = {}
if len(tableDate) > 6:
tableDict[D.DATA_ATTR_DATE] = tableDate
if int(tableCnt) > 0:
tableDict[D.DATA_ATTR_COUNT] = tableCnt
j = 0
for i in range(1, cnt):
if fields[i][0:1] == "_":
startCols += 1
continue
job.debug(verify, str(i) + " cnt " + str(fields[i]))
if len(fields[i]) > 0:
columns.append(fields[i])
j = j + 1
cnt = j
tableDict[B.DATA_NODE_HEADER] = columns
job.debug(verify, str(state) + " " + str(cnt) + " cols " + str(columns))
elif state >= 2 and len(testline) > 2:
job.debug(verify, str(state) + " " + str(len(testline)))
tableDict = getTabContent(msg, data, h)
state = 3
row = {}
print(line)
if startCols > 0:
row[B.ATTR_DATA_COMP] = {}
row[B.ATTR_DATA_COMP][a[0]] = a[1]
tableDict[B.ATTR_DATA_COMP][a[0]] = a[1]
for i in range(startCols, cnt+startCols):
print("for "+str(i)+" "+str(len(row))+" "+str(startCols)+" "+str(len(fields)))
print(str(fields[i]))
if i >= len(columns)+startCols:
break
row[columns[i-startCols]] = fields[i]
job.debug(verify, str(state) + " row " + str(row))
if B.DATA_NODE_DATA not in tableDict:
tableDict[B.DATA_NODE_DATA] = []
tableDict[B.DATA_NODE_DATA].append(row)
setTabContent(msg, data, tableDict, h)
elif state == 3:
job.debug(verify, "structure " + str(state) + ": " + str(nodes))
state = 0
return data
def setTdataStructure(msg, data, path):
if len(path) >= 1 and path[0] not in data:
data[path[0]] = {}
if len(path) >= 2 and path[1] not in data[path[0]]:
data[path[0]][path[1]] = {}
if len(path) >= 3 and path[2] not in data[path[0]][path[1]]:
data[path[0]][path[1]][path[2]] = {}
if len(path) >= 4 and path[3] not in data[path[0]][path[1]][path[2]]:
data[path[0]][path[1]][path[2]][path[3]] = {}
return data
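# Sketch of the path-addressed helpers ("_tables" stands in for B.DATA_NODE_TABLES):
#   data = {}
#   setTdataContent(msg, data, {"_header": ["id"]}, ["_tables", "person"])
#   getTdataContent(msg, data, ["_tables", "person"])  # -> {"_header": ["id"]}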
def setSubnode(i, nodes, data, tree):
print("setSubnode " + str(i) + ": " + ": " + str(tree))
if i >= len(nodes):
print("setSubnode a " + str(i))
tree[B.DATA_NODE_DATA] = data
elif tree is not None and nodes[i] in tree.keys():
print("setSubnode b " + str(i))
tree[nodes[i]] = setSubnode(i+1, nodes, data, tree[nodes[i]])
else:
print("setSubnode c " + str(i))
tree[nodes[i]] = setSubnode((i + 1), nodes, data, {})
return tree
def getDataStructure(comp):
# gets data-structure from the yml in the component-folder
job = basic.program.Job.getInstance()
verify = -1+job.getDebugLevel(TOOL_NAME)
job.debug(verify, "getDataStructure " + comp)
def normalizeDataRow(dstruct, xpathtupel, row, referencedate):
# normalize data of the row if necessary
# raw-value is saved as new field with _raw as suffix
job = basic.program.Job.getInstance()
verify = -1+job.getDebugLevel(TOOL_NAME)
job.debug(verify, "calcDataRow " + row)
def splitFields(line, delimiter, job):
out = []
fields = line.split(delimiter)
for i in range(0, len(fields)):
if fields[i][0:1] == "#":
break
if re.match(r"^\"(.*)\"$", fields[i]):
fields[i] = fields[i][1:-1]
out.append(fields[i])
return out
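# Sketch: splitFields('persid;"Smith, Anna";# comment', ";", job)
# returns ["persid", "Smith, Anna"] - surrounding quotes are stripped and
# everything from the first "#"-field on is cut off.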
def buildCsvData(filename, tdata, comp):
def buildCsvData(tdata, table, job=None):
"""
writes the testdata into a csv-file for documentation of the test-run
:param teststatus:
@@ -430,22 +365,14 @@ def buildCsvData(filename, tdata, comp):
:param comp: if specific else None
:return:
"""
compColumn = not isCompTableFile(comp, filename)
job = basic.program.Job.getInstance()
verify = -1+job.getDebugLevel(TOOL_NAME)
job.debug(verify, "writeDataTable " + str(comp))
text = ""
for k in [D.DATA_ATTR_DATE, D.DATA_ATTR_COUNT]:
if k in tdata:
text += k+";"+str(tdata[k])+"\n"
header = "table"
header = utils.i18n_tool.I18n.getInstance().getText(f"{B.DATA_NODE_TABLES=}", job)+":"+table
for f in tdata[B.DATA_NODE_HEADER]:
header += ";"+f
if compColumn:
text += header
else:
#text += "_nr;" + header[6:] + "\n"
text += header[6:] + "\n"
text += header + "\n"
i = 0
for r in tdata[B.DATA_NODE_DATA]:
row = ""
@@ -455,30 +382,6 @@ def buildCsvData(filename, tdata, comp):
row += ";"+str(r[f])
else:
row += ";"
if compColumn:
text += row
else:
text += row[1:]
#text += str(i) + row
text += row
text += "\n"
return text
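# Sketch of the generated text for one table; the attribute key "_date" and the
# localized keyword "table" are assumptions for D.DATA_ATTR_DATE resp. the i18n text
# of B.DATA_NODE_TABLES:
#   _date;01.01.2022
#   table:person;id;name
#   ;1;Alice
#   ;2;Bob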
def writeCsvData(filename, tdata, comp):
text = ""
if B.DATA_NODE_TABLES in tdata:
for k in tdata[B.DATA_NODE_TABLES]:
text += buildCsvData(filename, tdata[B.DATA_NODE_TABLES][k], comp)
text += "\n"
utils.file_tool.writeFileText(comp.m, filename, text)
def isCompTableFile(comp, filename):
""" check if the filename belongs to the component """
basetable = os.path.basename(filename)[0:-4]
if comp is None:
return False
if B.TOPIC_NODE_DB in comp.conf[B.SUBJECT_ARTS] and basetable in comp.conf[B.DATA_NODE_DDL] \
and comp.name in filename:
return True
return False