#!/usr/bin/python # -*- coding: utf-8 -*- # --------------------------------------------------------------------------------------------------------- # Author : Ulrich Carmesin # Source : gitea.ucarmesin.de # --------------------------------------------------------------------------------------------------------- import os.path import inspect import basic.program import utils.config_tool import utils.file_tool import basic.constants as B import utils.data_const as D import utils.path_const as P import utils.path_tool import utils.date_tool import basic.step import utils.i18n_tool import re TOOL_NAME = "tdata_tool" list_blocks = {} # lists of aliases def getTestdata(job=None): """ get the testdata from one of the possible sources for the testcase resp testsuite of the job :return: """ if "testcase" in job.program: return collectTestdata(B.PAR_TESTCASE, getattr(job.par, B.PAR_TESTCASE), job) else: return collectTestdata(B.PAR_TESTSUITE, getattr(job.par, B.PAR_TESTSUITE), job) def collectTestdata(gran, testentity, job): """ collects the testdata from kind of the possible sources for the testcase resp testsuite :return: """ setBlockLists(job) if gran == B.PAR_TESTCASE: basispath = utils.path_tool.rejoinPath(job.conf.confs[B.SUBJECT_PATH][B.ATTR_PATH_TDATA], testentity) pathname = utils.config_tool.getConfigPath(job, P.KEY_TESTCASE, getattr(job.par, B.PAR_TESTCASE), "") if gran == B.PAR_TESTSUITE: basispath = utils.path_tool.rejoinPath(job.conf.confs[B.SUBJECT_PATH][B.ATTR_PATH_TDATA], testentity) pathname = utils.config_tool.getConfigPath(job, P.KEY_TESTSUITE, getattr(job.par, B.PAR_TESTSUITE), "") if pathname[-3:] == D.DFILE_TYPE_CSV: tdata = getCsvSpec(job.m, job, pathname, D.CSV_SPECTYPE_DATA) else: tdata = utils.file_tool.readFileDict(job, pathname, job.m) # get explicit specdata of includes if D.CSV_BLOCK_IMPORT in tdata: for pathname in tdata[D.CSV_BLOCK_IMPORT]: pathname = utils.path_tool.rejoinPath(pathname) if job.conf.confs[B.SUBJECT_PATH][B.ATTR_PATH_TDATA] not in pathname: pathname = utils.path_tool.rejoinPath(basispath, pathname) if pathname[-3:] == D.DFILE_TYPE_CSV: data = getCsvSpec(job.m, job, pathname, D.CSV_SPECTYPE_DATA) else: data = utils.file_tool.readFileDict(job, pathname, job.m) for table in data[D.CSV_BLOCK_TABLES]: if table in tdata[D.CSV_BLOCK_TABLES]: print("Fehler") tdata[D.CSV_BLOCK_TABLES][table] = data[D.CSV_BLOCK_TABLES][table] # get implicit specdata of spec-library for prefix in list_blocks[D.DFILE_TABLE_PREFIX]: files = utils.file_tool.getFiles(job.m, job, basispath, prefix, None) if len(files) < 0: continue for f in files: if f in tdata[D.CSV_BLOCK_TABLES]: continue pathname = utils.path_tool.rejoinPath(basispath, f) if pathname[-3:] == D.DFILE_TYPE_CSV: data = getCsvSpec(job.m, job, pathname, D.CSV_SPECTYPE_DATA) else: data = utils.file_tool.readFileDict(job, pathname, job.m) for table in data[D.CSV_BLOCK_TABLES]: if table in tdata[D.CSV_BLOCK_TABLES]: print("Fehler") tdata[D.CSV_BLOCK_TABLES][table] = data[D.CSV_BLOCK_TABLES][table] # fill the options into job-parameter for p in tdata[D.CSV_BLOCK_OPTION]: setattr(job.par, p, tdata[D.CSV_BLOCK_OPTION][p]) return tdata def setBlockLists(job): for block in D.LIST_BLOCK_CONST + D.LIST_ATTR_CONST + D.LIST_DFNAME_CONST: list = utils.i18n_tool.I18n.getInstance(job).getAliasList(block+"='"+eval("D."+block)+"'", job) #list.append(eval("D."+block)) list_blocks[eval("D." + block)] = [] for x in list: list_blocks[eval("D." + block)].append(x.lower()) def readCsv(msg, job, filename, comp, aliasNode=""): lines = utils.file_tool.readFileLines(job, filename, msg) print("readCsv "+filename) return parseCsv(msg, job, filename, lines, comp, aliasNode) def parseCsv(msg, job, filename, lines, comp, aliasNode=""): if len(list_blocks) < 1: setBlockLists(job) tdata = {} if len(aliasNode) < 1: print(str(list_blocks)) aliasNode = extractAliasNode(filename, comp, job) if len(aliasNode) > 3: tdata[D.DATA_ATTR_ALIAS] = aliasNode return parseCsvSpec(msg, lines, D.CSV_SPECTYPE_DATA, tdata, job) def extractAliasNode(filename, comp, job): basename = os.path.basename(filename)[0:-4] for prefix in list_blocks[D.DFILE_TABLE_PREFIX]: if basename.find(prefix) == 0: basename = basename[len(prefix):] if comp is None: return "" if B.TOPIC_NODE_DB in comp.conf[B.SUBJECT_ARTS] and basename in comp.conf[B.DATA_NODE_DDL]: return B.DATA_NODE_TABLES+":"+basename return "" def getCsvSpec(msg, job, filename, ttype): """ reads the specification from a csv-file and maps it into the internal data-structure :param msg: :param filename: :param type: :param job: :return: """ #if job is None: # job = basic.program.Job.getInstance() lines = utils.file_tool.readFileLines(job, filename, msg) tdata = {} # the result return parseCsvSpec(msg, lines, ttype, tdata, job) def parseCsvSpec(msg, lines, ttype, tdata, job): """ :param msg: :param lines: :param type: :param job: :return: """ if len(list_blocks) < 1: setBlockLists(job) status = "start" verbose = False tableAttr = {} # table tableDict = {} # table for l in lines: if verbose: print("lines "+l) fields = splitFields(l, D.CSV_DELIMITER, job) # check empty line, comment if (len(fields) < 1) or (len(l.strip().replace(D.CSV_DELIMITER,"")) < 1): status = "start" continue if (fields[0][0:1] == "#"): continue a = fields[0].lower().split(":") # keywords option, step, table if verbose: print(str(a)+" -- "+str(fields)) tableAttr = setTableAttribute(tableAttr, a[0], fields[1], job) if (tableAttr["_hit"]): status = "TABLE_ALIAS" continue if (a[0].lower() in list_blocks[D.CSV_BLOCK_HEAD]): if verbose: print("head "+l) setTdataLine(tdata, fields, D.CSV_BLOCK_HEAD, job) status = "start" continue elif (a[0].lower() in list_blocks[D.CSV_BLOCK_OPTION]): if verbose: print("option " + l) setTdataLine(tdata, fields, D.CSV_BLOCK_OPTION, job) status = "start" continue elif (a[0].lower() in list_blocks[D.CSV_BLOCK_STEP]): if verbose: print("step "+l) step = basic.step.parseStep(job, fields) if D.CSV_BLOCK_STEP not in tdata: tdata[D.CSV_BLOCK_STEP] = [] tdata[D.CSV_BLOCK_STEP].append(step) status = "start" continue elif (a[0].lower() in list_blocks[D.CSV_BLOCK_IMPORT]): if verbose: print("includes " + l) if D.CSV_BLOCK_IMPORT not in tdata: tdata[D.CSV_BLOCK_IMPORT] = [] tdata[D.CSV_BLOCK_IMPORT].append(fields[1]) status = "start" continue elif (a[0].lower() in list_blocks[D.CSV_BLOCK_TABLES]): if verbose: print("tables "+l) h = a h[0] = B.DATA_NODE_TABLES if ttype == D.CSV_SPECTYPE_CONF: del h[0] tableDict = getTdataContent(msg, tdata, h) setTableHeader(tableDict, tableAttr, fields, ttype, job) status = D.CSV_SPECTYPE_DATA elif (status == D.CSV_SPECTYPE_DATA): tableDict = getTdataContent(msg, tdata, h) if verbose: print("setTableData "+str(h)+" "+str(tableDict)) setTableData(tableDict, fields, ttype, job) elif (status == "TABLE_ALIAS") and D.DATA_ATTR_ALIAS in tdata: alias = tdata[D.DATA_ATTR_ALIAS] b = alias.split(":") h = [B.DATA_NODE_TABLES] + b tableDict = getTdataContent(msg, tdata, h) tableDict[D.DATA_ATTR_ALIAS] = alias fields = [alias] + fields setTableHeader(tableDict, tableAttr, fields, ttype, job) status = D.CSV_SPECTYPE_DATA if ttype == D.CSV_SPECTYPE_CONF: header = [] for k in tdata: if k in D.LIST_DATA_ATTR: continue if B.DATA_NODE_DATA in tdata[k]: tdata[k].pop(B.DATA_NODE_DATA) for f in tdata[k]: if f in [B.DATA_NODE_HEADER, "_hit"] + D.LIST_DATA_ATTR: continue header.append(f) tdata[k][B.DATA_NODE_HEADER] = header header = [] if B.DATA_NODE_TABLES in tdata and B.DATA_NODE_TABLES in tdata[B.DATA_NODE_TABLES]: for k in tdata[B.DATA_NODE_TABLES][B.DATA_NODE_TABLES]: if k in tdata[B.DATA_NODE_TABLES]: if verbose: print("Error") else: tdata[B.DATA_NODE_TABLES][k] = tdata[B.DATA_NODE_TABLES][B.DATA_NODE_TABLES][k] tdata[B.DATA_NODE_TABLES].pop(B.DATA_NODE_TABLES) return tdata def setTableHeader(tableDict, tableAttr, fields, ttype, job): header = [] for i in range(1, len(fields)): header.append(fields[i].strip()) tableDict[B.DATA_NODE_HEADER] = header for attr in tableAttr: tableDict[attr] = tableAttr[attr] # preparate the sub-structure for row-data if ttype == D.CSV_SPECTYPE_TREE: tableDict[B.DATA_NODE_DATA] = {} elif ttype == D.CSV_SPECTYPE_KEYS: tableDict[D.CSV_NODETYPE_KEYS] = {} tableDict[D.DATA_ATTR_KEY] = 1 if D.DATA_ATTR_KEY in tableAttr: tableDict[D.DATA_ATTR_KEY] = header.index(tableAttr[D.DATA_ATTR_KEY]) + 1 else: tableDict[B.DATA_NODE_DATA] = [] return tableDict def setTableData(tableDict, fields, ttype, job): row = {} if ttype == D.CSV_SPECTYPE_DATA and ":" not in fields[0] and D.DATA_ATTR_ALIAS in tableDict: fields = [tableDict[D.DATA_ATTR_ALIAS]] + fields i = 1 for f in tableDict[B.DATA_NODE_HEADER]: row[f] = fields[i].strip() i += 1 if ttype == D.CSV_SPECTYPE_DATA: if B.ATTR_DATA_COMP in tableDict: tcomps = tableDict[B.ATTR_DATA_COMP] else: tcomps = {} row[B.ATTR_DATA_COMP] = {} for c in fields[0].split(","): a = c.split(":") tcomps[a[0]] = a[1] row[B.ATTR_DATA_COMP][a[0]] = a[1].strip() tableDict[B.DATA_NODE_DATA].append(row) tableDict[B.ATTR_DATA_COMP] = tcomps elif ttype == D.CSV_SPECTYPE_KEYS: tableDict[D.CSV_NODETYPE_KEYS][fields[tableDict[D.DATA_ATTR_KEY]].strip()] = row elif ttype == D.CSV_SPECTYPE_CONF: tableDict[fields[1]] = row return tableDict def setTableAttribute(tableAttr, key, val, job): for attr in D.LIST_DATA_ATTR: if (key.lower() in list_blocks[attr]): tableAttr[attr] = val.strip() tableAttr["_hit"] = True return tableAttr tableAttr["_hit"] = False return tableAttr def setTdataLine(tdata, fields, block, job): """ sets field(s) into tdata as a key-value-pair additional fields will be concatenate to a intern separated list :param tdata: :param fields: :param block: :param job: :return: """ a = fields[0].lower().split(":") a[0] = block # normalized key val = "" for i in range(1, len(fields)-1): val += D.INTERNAL_DELIMITER+fields[i] if len(val) > len(D.INTERNAL_DELIMITER): val = val[len(D.INTERNAL_DELIMITER):] setTdataContent(job.m, tdata, val, a) return tdata def setTdataContent(msg, data, tabledata, path): setTdataStructure(msg, data, path) if len(path) == 2: data[path[0]][path[1]] = tabledata elif len(path) == 3: data[path[0]][path[1]][path[2]] = tabledata elif len(path) == 4: data[path[0]][path[1]][path[2]][path[3]] = tabledata def getTdataContent(msg, data, path): setTdataStructure(msg, data, path) if len(path) == 2: return data[path[0]][path[1]] elif len(path) == 3: return data[path[0]][path[1]][path[2]] elif len(path) == 4: return data[path[0]][path[1]][path[2]][path[3]] elif len(path) == 1: return data[path[0]] else: return None def setTdataStructure(msg, data, path): if len(path) >= 1 and path[0] not in data: data[path[0]] = {} if len(path) >= 2 and path[1] not in data[path[0]]: data[path[0]][path[1]] = {} if len(path) >= 3 and path[2] not in data[path[0]][path[1]]: data[path[0]][path[1]][path[2]] = {} if len(path) >= 4 and path[3] not in data[path[0]][path[1]][path[2]]: data[path[0]][path[1]][path[2]][path[3]] = {} return data def splitFields(line, delimiter, job): out = [] fields = line.split(delimiter) for i in range(0, len(fields)): if fields[i][0:1] == "#": break if re.match(r"^\"(.*)\"$", fields[i]): fields[i] = fields[i][1:-1] out.append(fields[i]) return out def writeCsvData(filename, tdata, comp, job): text = "" if B.DATA_NODE_TABLES in tdata: for k in tdata[B.DATA_NODE_TABLES]: text += buildCsvData(tdata, k, comp, job) text += "\n" utils.file_tool.writeFileText(comp.m, job, filename, text) def buildCsvData(tdata, tableName, comp, job=None): """ writes the testdata into a csv-file for documentation of the test-run :param teststatus: :param tdata: :param comp: if specific else None :return: """ text = "" for k in [D.DATA_ATTR_DATE, D.DATA_ATTR_COUNT]: if k in tdata: text += k+";"+str(tdata[k])+"\n" x0 = "-------"+str(f"{B.DATA_NODE_TABLES=}") x1 = "-------"+str(tableName) x2 = str(utils.i18n_tool.I18n.getInstance(job).getText(f"{B.DATA_NODE_TABLES=}", job)) print(x0+" "+x1+" "+x2) if tableName in tdata: actdata = tdata[tableName] else: actdata = tdata header = str(utils.i18n_tool.I18n.getInstance(job).getText(f"{B.DATA_NODE_TABLES=}", job)) +":" + tableName for f in actdata[B.DATA_NODE_HEADER]: header += D.CSV_DELIMITER+f text += header + "\n" i = 0 for r in actdata[B.DATA_NODE_DATA]: row = "" if B.ATTR_DATA_COMP in r: for k in r[B.ATTR_DATA_COMP]: row += ","+k+":"+r[B.ATTR_DATA_COMP][k] row = row[1:] i += 1 for f in actdata[B.DATA_NODE_HEADER]: if f in r: row += D.CSV_DELIMITER+str(r[f]) else: row += D.CSV_DELIMITER text += row text += "\n" return text def buildCsvSpec(tdata, job=None): text = "" if D.CSV_BLOCK_IMPORT in tdata: for k in tdata[D.CSV_BLOCK_HEAD]: text += utils.i18n_tool.I18n.getInstance(job).getText(f"{D.CSV_BLOCK_HEAD=}", job) text += ":"+k+D.CSV_DELIMITER+tdata[D.CSV_BLOCK_HEAD][k]+"\n" text += "# option:key ;values;..;;;;\n" if D.CSV_BLOCK_OPTION in tdata: for k in tdata[D.CSV_BLOCK_OPTION]: text += utils.i18n_tool.I18n.getInstance(job).getText(f"{D.CSV_BLOCK_OPTION=}", job) text += ":" + k + D.CSV_DELIMITER + getHeadArgs(tdata[D.CSV_BLOCK_OPTION][k], job)+"\n" text += "#;;;;;;\n" if D.CSV_BLOCK_STEP in tdata: text += basic.step.getStepHeader(job) i = 1 for step in tdata[D.CSV_BLOCK_STEP]: text += utils.i18n_tool.I18n.getInstance(job).getText(f"{D.CSV_BLOCK_STEP=}", job) + ":" + str(i) text += D.CSV_DELIMITER + step.getStepText(job) i += 1 text += "#;;;;;;\n" if D.CSV_BLOCK_TABLES in tdata: for k in tdata[D.CSV_BLOCK_TABLES]: text += buildCsvData(tdata, k, None, job) text += "#;;;;;;\n" return text def getHeadArgs(value, job): return value.replace(D.INTERNAL_DELIMITER, D.CSV_DELIMITER)