Data-Test-Executer Framework speziell zum Test von Datenverarbeitungen mit Datengenerierung, Systemvorbereitungen, Einspielungen, ganzheitlicher diversifizierender Vergleich
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

462 lines
17 KiB

#!/usr/bin/python
# -*- coding: utf-8 -*-
# ---------------------------------------------------------------------------------------------------------
# Author : Ulrich Carmesin
# Source : gitea.ucarmesin.de
# ---------------------------------------------------------------------------------------------------------
import json
import re
import basic.program
import tools.file_abstract
import basic.constants as B
import tools.data_const as D
import tools.file_tool
2 years ago
from basic import toolHandling
class FileFcts(tools.file_abstract.FileFcts):
    """
    csv-flavour of the file-functions: it parses the lines of a csv-file into
    a nested dict and builds csv-text from such a dict.
    The methods read self.job and self.getMsg(); both are not defined in this
    class - presumably they are provided by the base class (TODO confirm).
    """

    def __init__(self):
        pass

    def load_file(self, path, ttype=""):
        """
        reads the lines of the file and translates them into a dict
        :param path: path of the csv-file
        :param ttype: type of the content, see parseCsv
        :return: the parsed dict
        """
        lines = tools.file_tool.read_file_lines(self.job, path, self.getMsg())
        return self.parseCsv(self.getMsg(), self.job, lines, ttype)

    def dump_file(self, data, path, ttype=""):
        """
        builds the csv-text of the data
        :param data: dict in the structure which parseCsv returns
        :param path: not used here - the text is returned, nothing is written
        :param ttype: type of the content, see parseCsv
        :return: the csv-text
        """
        text = self.buildCsv(self.getMsg(), self.job, data, ttype)
        return text

    def isEmptyLine(self, msg, job, line, fields):
        """
        detects lines without content: no fields, only delimiters/blanks
        or a first field which starts with the comment-sign '#'
        :param msg: message-object (not used)
        :param job: job-object (not used)
        :param line: the raw line
        :param fields: the splitted fields of the line
        :return: True if the line can be skipped
        """
        if len(fields) < 1:
            return True
        if len(line.strip().replace(D.CSV_DELIMITER, "")) < 1:
            return True
        return fields[0][0:1] == "#"

    def isBlock(self, msg, job, field, block, status):
        """
        detects the block either on keywords in the field which opens a block
        or on status if there is no keyword in the field
        :param msg: message-object maybe from component
        :param job: job-object with parameter and configuration
        :param field: field in the csv-file
        :param block: block-constant which is checked, e.g. D.CSV_BLOCK_STEP
        :param status: the current parser-status
        :return: True if the field belongs to the block, otherwise False
        """
        try:
            blockPur = block.replace("_", "")
            a = field.split(":")
        except AttributeError:
            # field or block is not a string, so it can not name a block
            print("isBlock " + str(field) + "=?" + str(block))
            return False
        if a[0] == blockPur:
            return True
        if "_" + a[0] in [D.CSV_BLOCK_OPTION, D.CSV_BLOCK_HEAD, D.CSV_BLOCK_STEP, D.CSV_BLOCK_TABLES]:
            # the field opens explicitly another block
            return False
        if blockPur == status:
            return True
        if block == D.CSV_BLOCK_ATTR and len(a) == 1 and field[0:1] == "_":
            # attribute-lines have a key with leading underscore without colon
            return True
        return False

    def parseCsv(self, msg, job, lines, ttype=""):
        """
        parses the csv-lines into a nested dict
        :param msg: message-object, used for warnings
        :param job: job-object, job.m is used for errors
        :param lines: list of the csv-lines
        :param ttype: content
            a) catalog: key(s) - values      # meta-spec, meta-auto
            b) head: key - value             # spec-info
            c) option: key - value           # spec -> job.par
            d) step: key=function - values   # spec (tp, ts) -> comp.function
            e) step: key=usecase - values    # spec (tc) -> comp.steps
            f) ddl-table: key=field - values=attributes # meta-spec, comp
            g) data-table: array: field - values   # spec.data, comp.artifacts
            a type-attribute inside the file overrules this argument
        :return: the parsed dict
        """
        tdata = {}
        status = "start"
        verbose = False
        tableAttr = {}   # attributes which are read before the table-header
        tableDict = {}   # node of the table which is currently filled
        h = []           # path of the current table inside tdata
        # parse the lines
        for l in lines:
            fields = splitFields(l, D.CSV_DELIMITER, job)
            if self.isEmptyLine(msg, job, l, fields):
                continue
            a = fields[0].lower().split(":")
            # keywords option, step, table
            if self.isBlock(msg, job, fields[0], D.CSV_BLOCK_ATTR, status):
                tableAttr = setTableAttribute(job, tableAttr, a[0], fields)
                if ttype == "" and D.DATA_ATTR_TYPE in tableAttr:
                    ttype = tableAttr[D.DATA_ATTR_TYPE]
                elif D.DATA_ATTR_TYPE in tableAttr and ttype != tableAttr[D.DATA_ATTR_TYPE]:
                    msg.logWarn("System-Type " + ttype + " be overwrite by file-Type " + tableAttr[D.DATA_ATTR_TYPE])
                    ttype = tableAttr[D.DATA_ATTR_TYPE]
                continue
            elif self.isBlock(msg, job, fields[0], D.CSV_BLOCK_HEAD, status):
                setTdataLine(tdata, fields, D.CSV_BLOCK_HEAD, job)
                status = "start"
                continue
            elif self.isBlock(msg, job, fields[0], D.CSV_BLOCK_OPTION, status):
                setTdataLine(tdata, fields, D.CSV_BLOCK_OPTION, job)
                status = "start"
                continue
            elif (status != D.CSV_BLOCK_STEP) \
                    and self.isBlock(msg, job, fields[0], D.CSV_BLOCK_STEP, status):
                # header-line of the steps
                h = [B.DATA_NODE_STEPS]
                if verbose: print(">> step "+l)
                tableDict = getTdataContent(msg, tdata, h)
                setTableHeader(tableDict, tableAttr, fields, ttype, job)
                status = D.CSV_BLOCK_STEP
                continue
            elif self.isBlock(msg, job, fields[0], D.CSV_BLOCK_TABLES, status):
                # header-line of a table, the key behind the colon is the table-path
                if verbose: print(">> tables " + l)
                h = a
                h[0] = B.DATA_NODE_TABLES
                if ttype == D.CSV_SPECTYPE_CONF:
                    # conf-tables are stored directly on the top-level
                    del h[0]
                tableDict = getTdataContent(msg, tdata, h)
                setTableHeader(tableDict, tableAttr, fields, ttype, job)
                status = D.CSV_SPECTYPE_DATA
            elif (status == D.CSV_SPECTYPE_DATA):
                # row of the current table
                tableDict = getTdataContent(msg, tdata, h)
                if verbose: print(">> setTableData " + str(h) + " " + str(tableDict))
                setTableData(tableDict, fields, ttype, job)
            elif (status == D.CSV_BLOCK_STEP):
                # row of the steps
                # NOTE(review): this trace is printed unconditionally - maybe it should depend on verbose
                print("step-line "+status+": "+l)
                h = [B.DATA_NODE_STEPS]
                tableDict = getTdataContent(msg, tdata, h)
                if verbose: print(">> setTableData " + str(h) + " " + str(tableDict))
                setTableData(tableDict, fields, ttype, job)
        if D.DATA_ATTR_TYPE not in tableAttr:
            tableAttr[D.DATA_ATTR_TYPE] = ttype
        if ttype in [D.CSV_SPECTYPE_DDL, D.CSV_SPECTYPE_CTLG]:
            # these types contain exactly one table which is lifted to the top-level
            # a file without any table-line has no tables-node at all
            tables = tdata.get(B.DATA_NODE_TABLES, {})
            if len(tables) > 1:
                job.m.setError("Mehr als einr Tabelle in "+ttype)
            elif len(tables) == 0:
                job.m.setError("Keine Tabelle in "+ttype)
                tdata = {}
            else:
                tdata = dict(tables)
            for k in tableAttr:
                tdata[k] = tableAttr[k]
        if ttype == D.CSV_SPECTYPE_CONF:
            for k in tdata:
                if k in ["_hit"] + D.LIST_DATA_ATTR:
                    continue
                if B.DATA_NODE_DATA in tdata[k]:
                    tdata[k].pop(B.DATA_NODE_DATA)
                # each table gets its own list, so the tables do not share their field-names
                fields = []
                for f in tdata[k]:
                    if f in [B.DATA_NODE_HEADER, "_hit"] + D.LIST_DATA_ATTR:
                        continue
                    fields.append(f)
                tdata[k][B.DATA_NODE_FIELDS] = fields
        if B.DATA_NODE_TABLES in tdata and B.DATA_NODE_TABLES in tdata[B.DATA_NODE_TABLES]:
            # lift a nested tables-node one level up without overwriting existing tables
            for k in tdata[B.DATA_NODE_TABLES][B.DATA_NODE_TABLES]:
                if k in tdata[B.DATA_NODE_TABLES]:
                    if verbose: print("Error")
                else:
                    tdata[B.DATA_NODE_TABLES][k] = tdata[B.DATA_NODE_TABLES][B.DATA_NODE_TABLES][k]
            tdata[B.DATA_NODE_TABLES].pop(B.DATA_NODE_TABLES)
        if "_hit" in tdata:
            tdata.pop("_hit")
        return tdata

    def buildCsv(self, msg, job, data, ttype=""):
        """
        builds the csv-text of the data
        d) conf:
        _type : conf
        _header : [ field_0, ... ]
        { field_0 : { attr_0 : val_0, .. },
          field_1 : { ... }, ... }
        -->
        "_type;conf;;;;;;",
        "table:lofts;_field;field;type;acceptance;key",
        "lofts;street;a;str;;T:1",
        ";city;b;str;;F:1",
        "#;;;;;;"
        :param msg: message-object (not used)
        :param job: job-object, passed through to the build-functions
        :param data: dict of the content; the type-attribute is written into it
                     if it is missing and ttype is set
        :param ttype: type of the content, only used if data has no type-attribute
        :return: the csv-text
        """
        out = ""
        lines = []
        delimiter = D.CSV_DELIMITER
        if D.DATA_ATTR_DLIM in data:
            delimiter = data[D.DATA_ATTR_DLIM]
        if D.DATA_ATTR_TYPE not in data and ttype != "":
            data[D.DATA_ATTR_TYPE] = ttype
        # the type can be unknown, then only the steps and the plain tables are written
        dtype = data.get(D.DATA_ATTR_TYPE, "")
        for f in D.LIST_DATA_ATTR:
            if f in data and f == D.DATA_ATTR_TBL:
                line = f + delimiter + data[f] + D.CSV_DELIMITER
                lines.append(line)
            elif ttype != "" and dtype in [D.CSV_SPECTYPE_DDL]:
                continue
            elif f in data:
                value = data[f]
                if isinstance(value, dict):
                    # multi-value attributes are stored as dict, see setTableAttribute
                    value = D.CSV_DELIMITER.join(value.values())
                out += f + D.CSV_DELIMITER + value + "\n"
        if dtype == D.CSV_SPECTYPE_CTLG:
            for k in data:
                if k in D.LIST_DATA_ATTR:
                    continue
                if k in [B.DATA_NODE_TABLES, B.DATA_NODE_HEADER, "_hit"]:
                    continue
                out += buildHeader(job, data[k][B.DATA_NODE_HEADER], k)
                out += buildCtlg(job, data[k][B.DATA_NODE_HEADER], data[k][B.DATA_NODE_KEYS])
        elif dtype in [D.CSV_SPECTYPE_DDL, D.CSV_SPECTYPE_CONF]:
            for k in data:
                if k in D.LIST_DATA_ATTR or k == "_hit":
                    continue
                out += buildHeader(job, data[k][B.DATA_NODE_HEADER], k)
                out += buildCtlg(job, data[k][B.DATA_NODE_HEADER], data[k])
        if B.DATA_NODE_STEPS in data:
            steps = data[B.DATA_NODE_STEPS]
            out += "step:header"
            for h in steps[B.DATA_NODE_HEADER]:
                out += delimiter + h
            out += "\n"
            for row in steps[B.DATA_NODE_DATA]:
                for h in steps[B.DATA_NODE_HEADER]:
                    if h in [B.DATA_NODE_ARGS, "args"]:
                        for arg in row[B.DATA_NODE_ARGS]:
                            out += delimiter + arg + ":" + row[B.DATA_NODE_ARGS][arg]
                    else:
                        out += delimiter + row[h]
                # each step is written into its own line
                out += "\n"
        if len(out) > 0:
            return out
        # fallback: nothing was written above, so the plain tables are written
        if B.DATA_NODE_TABLES in data:
            print("_tables in data")
            tableData = dict(data[B.DATA_NODE_TABLES])
        else:
            tableData = {}
            for k in data:
                if k in D.LIST_DATA_ATTR:
                    continue
                tableData[k] = data[k]
        for k in tableData:
            table = tableData[k]
            header = []
            fields = []
            if B.DATA_NODE_FIELDS in table:
                fields = table[B.DATA_NODE_FIELDS]
            if B.DATA_NODE_HEADER in table:
                header = table[B.DATA_NODE_HEADER]
                lines.append("table:" + k + D.CSV_DELIMITER + D.CSV_DELIMITER.join(header))
            if B.DATA_NODE_DATA in table:
                rows = table[B.DATA_NODE_DATA]
            else:
                rows = [table[f] for f in fields]
            for row in rows:
                # the rows start with an empty first column
                # because setTableData reads the values from the second column on
                line = ""
                for h in header:
                    line += D.CSV_DELIMITER + row[h]
                lines.append(line)
        out = "\n".join(lines)
        return out
def buildHeader(job, header, tableName):
    """
    builds the header-line of a table
    :param job: job-object (not used)
    :param header: list of the column-names
    :param tableName: name of the table
    :return: the line "table:<name>;<col>;<col>..." with closing linebreak
    """
    columns = ";".join(header)
    return "".join(("table:", tableName, ";", columns, "\n"))
def buildCtlg(job, header, table):
    """
    builds the csv-lines of the rows of a table, one line for each row
    :param job: job-object (not used)
    :param header: list of the column-names which are written
    :param table: dict of the rows: row-key - dict of the values;
                  attribute-keys and the structure-nodes are skipped
    :return: the csv-text; each cell has a leading delimiter, so the first
             column of each line is empty
    """
    ignored = [B.DATA_NODE_HEADER, B.DATA_NODE_FIELDS, B.DATA_NODE_DATA, "_hit"]
    out = ""
    for k in table:
        if k in D.LIST_DATA_ATTR or k in ignored:
            continue
        row = table[k]
        for h in header:
            # each cell starts with the delimiter - also the json-cells,
            # otherwise the following columns would be shifted
            out += D.CSV_DELIMITER
            if h not in row:
                continue
            if isinstance(row[h], dict):
                # structured values are written as quoted json-text
                out += "\"" + json.dumps(row[h]) + "\""
            else:
                out += row[h]
        out += "\n"
    return out
def buildDdl(job, header, table):
    """
    builds the csv-lines of the fields of a ddl-table, one line for each field
    :param job: job-object (not used)
    :param header: list of the column-names which are written
    :param table: dict of the fields: field-name - dict of the values
    :return: the csv-text, each cell with a leading delimiter
    """
    textLines = []
    for name in table:
        if name in D.LIST_DATA_ATTR:
            continue
        if name in [B.DATA_NODE_HEADER, B.DATA_NODE_DATA, "_hit"]:
            continue
        cells = "".join(D.CSV_DELIMITER + table[name][column] for column in header)
        textLines.append(cells + "\n")
    return "".join(textLines)
def convertRows2Text(job, header, tableData):
    """
    prints the column-names once for each entry of the table-data which is
    not an attribute; nothing is returned
    :param job: job-object (not used)
    :param header: list of the column-names
    :param tableData: dict of the table-data
    :return: None
    """
    for name in tableData:
        if name not in D.LIST_DATA_ATTR:
            for column in header:
                print(column)
def splitFields(line, delimiter, job):
    """
    splits the line into its cells
    - the first cell which starts with '#' ends the line (comment)
    - surrounding double-quotes of a cell are removed
    - a cell which starts with {" is parsed as json, doubled quotes are
      reduced before; if the parsing fails the text is kept
    :param line: the csv-line
    :param delimiter: the delimiter between the cells
    :param job: job-object (not used)
    :return: list of the cells, either text or the parsed json-value
    """
    cells = []
    for cell in line.split(delimiter):
        if cell.startswith("#"):
            break
        if re.match(r"^\"(.*)\"$", cell):
            cell = cell[1:-1]
        if cell.startswith("{\""):
            if cell.startswith("{\"\""):
                cell = cell.replace("\"\"", "\"")
            try:
                cell = json.loads(cell)
            except Exception:
                # no valid json, the cell is kept as text
                pass
        cells.append(cell)
    return cells
def setTableAttribute(job, tableAttr, key, val):
    """
    sets the attribute of an attribute-line into the table-attributes
    the entry "_hit" marks if the key is a known attribute
    :param job: job-object (not used)
    :param tableAttr: dict of the attributes, it is changed in place
    :param key: key of the attribute, case-insensitive
    :param val: list of the cells of the line, val[0] is the cell of the key
    :return: tableAttr
    """
    key = key.lower()
    if key not in D.LIST_DATA_ATTR:
        tableAttr["_hit"] = False
        return tableAttr
    if key in D.LIST_ATTR_MULTI:
        # multi-value attribute: each filled cell is stored under its position
        values = {}
        for i, cell in enumerate(val[1:], start=1):
            text = cell.strip()
            if len(text) < 1:
                continue
            values["{:03d}".format(i)] = text
        tableAttr[key] = values
    elif len(val) > 1:
        tableAttr[key] = val[1].strip()
    else:
        # a line which contains only the key has an empty value
        tableAttr[key] = ""
    tableAttr["_hit"] = True
    return tableAttr
def setTdataLine(tdata, fields, block, job):
    """
    sets field(s) into tdata as a key-value-pair
    the cells between the first and the last cell are concatenated to a list
    which is separated by the internal delimiter; the last cell is not used
    :param tdata: dict of the content, it is changed in place
    :param fields: cells of the line, the first cell contains block:key
    :param block: name of the block, it replaces the block-part of the first cell
    :param job: job-object, job.m is passed as message-object
    :return: tdata
    """
    path = fields[0].lower().split(":")
    path[0] = block  # normalized key
    separator = D.INTERNAL_DELIMITER
    middleCells = fields[1:len(fields) - 1]
    value = "".join(separator + cell for cell in middleCells)
    if len(value) > len(separator):
        # remove the leading separator
        value = value[len(separator):]
    setTdataContent(job.m, tdata, value, path)
    return tdata
def setTdataContent(msg, data, tabledata, path):
    """
    sets the content at the end of the path inside the data
    missing nodes of the path are created; the content is only set for
    paths with two, three or four keys
    :param msg: message-object, passed through
    :param data: dict of the content, it is changed in place
    :param tabledata: the content which is set
    :param path: list of the keys
    :return: None
    """
    setTdataStructure(msg, data, path)
    depth = len(path)
    if depth < 2 or depth > 4:
        return
    parent = data
    for key in path[:-1]:
        parent = parent[key]
    parent[path[-1]] = tabledata
def setTdataStructure(msg, data, path):
    """
    creates the missing nodes of the path as empty dicts inside the data
    only the first four keys of the path are considered, existing nodes are kept
    :param msg: message-object (not used)
    :param data: dict of the content, it is changed in place
    :param path: list of the keys
    :return: data
    """
    node = data
    for key in path[:4]:
        if key not in node:
            node[key] = {}
        node = node[key]
    return data
def getTdataContent(msg, data, path):
    """
    gets the node at the end of the path inside the data
    missing nodes of the path are created before
    :param msg: message-object, passed through
    :param data: dict of the content, it is changed in place
    :param path: list of the keys
    :return: the node for paths with one up to four keys, otherwise None
    """
    setTdataStructure(msg, data, path)
    if not 1 <= len(path) <= 4:
        return None
    node = data
    for key in path:
        node = node[key]
    return node
def setTableHeader(tableDict, tableAttr, fields, ttype, job):
    """
    sets the header and the attributes into the table-node and prepares the
    sub-structure for the rows
    :param tableDict: node of the table, it is changed in place
    :param tableAttr: dict of the attributes which are copied into the node
    :param fields: cells of the header-line, the first cell is not part of the header
    :param ttype: type of the content, it controls the sub-structure
    :param job: job-object (not used)
    :return: tableDict
    """
    stripped = [cell.strip() for cell in fields[1:]]
    # empty cells do not belong to the header
    header = [name for name in stripped if len(name) >= 1]
    tableDict[B.DATA_NODE_HEADER] = header
    tableDict.update(tableAttr)
    # prepare the sub-structure for row-data
    if ttype == D.CSV_SPECTYPE_TREE:
        tableDict[B.DATA_NODE_DATA] = {}
        return tableDict
    if ttype not in [D.CSV_SPECTYPE_KEYS, D.CSV_SPECTYPE_CTLG]:
        tableDict[B.DATA_NODE_DATA] = []
        return tableDict
    tableDict[D.CSV_NODETYPE_KEYS] = {}
    # the key-column is the first column if no key-attribute names another one
    tableDict[D.DATA_ATTR_KEY] = 1
    if D.DATA_ATTR_KEY in tableAttr:
        tableDict[D.DATA_ATTR_KEY] = header.index(tableAttr[D.DATA_ATTR_KEY]) + 1
    return tableDict
def _addArgument(arguments, cell):
    """
    sets the argument of a cell "key:value" into the arguments
    blank cells are skipped, a cell without colon gets an empty value and
    further colons belong to the value
    :param arguments: dict of the arguments, it is changed in place
    :param cell: text of the cell
    :return: None
    """
    if len(cell.strip()) < 1:
        return
    key, _, value = cell.partition(":")
    arguments[key] = value


def setTableData(tableDict, fields, ttype, job):
    """
    builds the row of a data-line and sets it into the table-node
    - the values are read from the second cell on in the order of the header,
      missing cells at the end of the line are set as empty text
    - from an args-column on all cells are read as arguments "key:value"
    - the row is appended to the data (type data), or set under its key-cell
      (types keys, catalog) or under its second cell (types conf, ddl)
    :param tableDict: node of the table with header, it is changed in place
    :param fields: cells of the line
    :param ttype: type of the content
    :param job: job-object (not used)
    :return: tableDict
    """
    header = tableDict[B.DATA_NODE_HEADER]
    if ttype == D.CSV_SPECTYPE_DATA and ":" not in fields[0] and D.DATA_ATTR_ALIAS in tableDict:
        fields = [tableDict[D.DATA_ATTR_ALIAS]] + fields
    row = {}
    for i, f in enumerate(header, start=1):
        cell = fields[i] if i < len(fields) else ""
        if f in [B.DATA_NODE_ARGS, "args"]:
            row[B.DATA_NODE_ARGS] = {}
        if B.DATA_NODE_ARGS in row:
            _addArgument(row[B.DATA_NODE_ARGS], cell)
        else:
            row[f] = cell
    # cells behind the header are further arguments
    for arg in fields[len(header) + 1:]:
        text = arg.strip()
        if len(text) == 0 or text[0:1] == "#":
            continue
        print("arg "+arg)
        # the arguments-node is created if the header has no args-column
        _addArgument(row.setdefault(B.DATA_NODE_ARGS, {}), arg)
    if ttype == D.CSV_SPECTYPE_DATA:
        tableDict[B.DATA_NODE_DATA].append(row)
    elif ttype in [D.CSV_SPECTYPE_KEYS, D.CSV_SPECTYPE_CTLG]:
        tableDict[D.CSV_NODETYPE_KEYS][fields[tableDict[D.DATA_ATTR_KEY]].strip()] = row
    elif ttype in [D.CSV_SPECTYPE_CONF, D.CSV_SPECTYPE_DDL]:
        tableDict[fields[1]] = row
    return tableDict
2 years ago