#!/usr/bin/python
# -*- coding: utf-8 -*-
# ---------------------------------------------------------------------------------------------------------
# Author : Ulrich Carmesin
# Source : gitea.ucarmesin.de
# ---------------------------------------------------------------------------------------------------------
"""
the issue of this tool is to transform extern data to the internal structure and the internal
structure into extern data - i.e. mostly test-results.
* * * * * * * *
the testdata have several elements
* parameter (-td --tdata) : to identify which testdata should be loaded
* source (flaskdb: dbname / dir: filename) : always structured in a table (easy to specify) with columns
* node : where the rows are
* action : what should be done - default insert
+ fields : dates in relation of a reference
"""
# NOTE(review): the original paste was whitespace-mangled and the span between the module
# docstring and the body of setTabContent() (imports, constants, the def-line) was lost.
# The import block below is reconstructed from the names actually used in this file
# (os.path.basename, basic.program.Job, utils.file_tool, B.*, D.*, TOOL_NAME) -
# verify the exact module paths and the TOOL_NAME value against the repository.
import os
import basic.program
import utils.file_tool
import basic.constants as B
import utils.data_const as D

TOOL_NAME = "tdata_tool"  # TODO(review): confirm - the value was not visible in the mangled source


def setTabContent(msg, data, tabledata, path):
    """
    Stores tabledata inside the nested dict data at the position addressed by path
    (supported path lengths: 2..4 keys); missing intermediate nodes are created.
    NOTE(review): the def-line was lost in the mangled paste; this signature is
    reconstructed from the call-site setTabContent(msg, data, tableDict, h) in parseCsv().
    :param msg: message object (not used here; kept for symmetry with getTabContent)
    :param data: nested dict that is mutated in place
    :param tabledata: table-dict to store at the addressed node
    :param path: list of keys, e.g. [B.DATA_NODE_TABLES, <table-name>, ...]
    """
    if len(path) >= 2 and path[1] not in data[path[0]]:
        data[path[0]][path[1]] = {}
    if len(path) >= 3 and path[2] not in data[path[0]][path[1]]:
        data[path[0]][path[1]][path[2]] = {}
    if len(path) >= 4 and path[3] not in data[path[0]][path[1]][path[2]]:
        data[path[0]][path[1]][path[2]][path[3]] = {}
    if len(path) == 2:
        data[path[0]][path[1]] = tabledata
    elif len(path) == 3:
        data[path[0]][path[1]][path[2]] = tabledata
    elif len(path) == 4:
        data[path[0]][path[1]][path[2]][path[3]] = tabledata


def getTabContent(msg, data, path):
    """
    Returns the table-dict stored inside the nested dict data at the position
    addressed by path (supported path lengths: 2..4 keys).
    Note: this getter MUTATES data - missing intermediate nodes are created as
    empty dicts, so parseCsv() can rely on the node existing afterwards.
    Returns None for any other path length.
    :param msg: message object (not used here)
    :param data: nested dict; missing nodes along path are created in place
    :param path: list of keys, e.g. [B.DATA_NODE_TABLES, <table-name>, ...]
    """
    if len(path) >= 2 and path[1] not in data[path[0]]:
        data[path[0]][path[1]] = {}
    if len(path) >= 3 and path[2] not in data[path[0]][path[1]]:
        data[path[0]][path[1]][path[2]] = {}
    if len(path) >= 4 and path[3] not in data[path[0]][path[1]][path[2]]:
        data[path[0]][path[1]][path[2]][path[3]] = {}
    if len(path) == 2:
        return data[path[0]][path[1]]
    elif len(path) == 3:
        return data[path[0]][path[1]][path[2]]
    elif len(path) == 4:
        return data[path[0]][path[1]][path[2]][path[3]]
    else:
        pass


def readCsv(msg, filename, comp, aliasNode=""):
    """
    Reads the csv-file and parses it into the internal data structure.
    :param msg: message object, passed through to the parser
    :param filename: path of the csv-file
    :param comp: component the data belongs to
    :param aliasNode: optional ":"-separated node path overriding the header nodes
    :return: nested dict as built by parseCsv()
    """
    lines = utils.file_tool.readFileLines(filename, msg)
    return parseCsv(msg, filename, lines, comp, aliasNode)


def parseCsv(msg, filename, lines, comp, aliasNode=""):
    """
    Parses csv-lines (";"-separated) into the internal nested data structure.

    Line-oriented state machine:
    * attribute lines - first cell D.ATTR_TABLE_DATE / D.ATTR_TABLE_CNT set table metadata
    * a header line - first cell (":"-split) in D.CSV_HEADER_START - opens a table (state 2):
      the ":"-parts (or aliasNode) give the node path, the remaining cells the column names;
      columns starting with "_" are skipped and shift the data start column
    * following non-empty lines are data rows (state 3), stored under B.DATA_NODE_DATA
    * an empty line after data closes the table (state back to 0)
    :return: dict { B.DATA_NODE_TABLES : { <node-path> ... : table-dict } }
    """
    job = basic.program.Job.getInstance()
    verify = -4+job.getDebugLevel(TOOL_NAME)
    job.debug(verify, "# # # # # # # # parseCsv " + filename + " :" + comp.name + ": " + str(lines))
    fields = []
    nodes = []
    columns = []
    state = 0
    data = {}
    tableDict = {}
    tableDate = ""
    tableCnt = 0
    cnt = 0
    startCols = 1
    # NOTE(review): removed unused locals output and basename (os.path.basename(filename)[0:-4])
    # that the original computed but never read.
    for line in lines:
        fields = line.split(';')
        testline = line.replace(";", "")  # empty-line test: only separators => no content
        a = fields[0].split(':')
        job.debug(verify, str(state) + " line " + line + " :" + str(len(fields)) + ": " + str(fields))
        if len(testline) < 2 and state < 1:
            state = 0
        elif a[0].lower() == D.ATTR_TABLE_DATE:
            tableDate = fields[1]
        elif a[0].lower() == D.ATTR_TABLE_CNT:
            tableCnt = fields[1]
        elif a[0].lower() in D.CSV_HEADER_START:
            # header line opens a new table
            state = 2
            columns = []
            h = a
            cnt = len(fields)
            job.debug(verify, str(state) + " cnt " + str(cnt))
            data[B.DATA_NODE_TABLES] = {}
            h[0] = B.DATA_NODE_TABLES
            if not aliasNode.isspace() and len(aliasNode) > 3:
                # explicit node path overrides the path from the header cell
                struct = aliasNode.split(":")
                for x in struct:
                    if len(x) > 2:
                        nodes.append(x)
                job.debug(verify, str(state) + " nodes " + str(nodes))
            elif len(h) > 1:
                for i in range(1, len(h)):
                    nodes.append(h[i])
                job.debug(verify, str(state) + " nodes " + str(nodes))
            tableDict = getTabContent(msg, data, h)
            if len(tableDate) > 6:
                tableDict[D.ATTR_TABLE_DATE] = tableDate
            if int(tableCnt) > 0:
                tableDict[D.ATTR_TABLE_CNT] = tableCnt
            # collect column names; "_"-prefixed cells are technical and shift the data start
            j = 0
            for i in range(1, cnt):
                if fields[i][0:1] == "_":
                    startCols += 1
                    continue
                job.debug(verify, str(i) + " cnt " + str(fields[i]))
                if len(fields[i]) > 0:
                    columns.append(fields[i])
                    j = j + 1
            cnt = j
            tableDict[B.DATA_NODE_HEADER] = columns
            job.debug(verify, str(state) + " " + str(cnt) + " cols " + str(columns))
        elif state >= 2 and len(testline) > 2:
            # data row for the currently open table
            # NOTE(review): h is unbound if a data row precedes any header line -
            # pre-existing behavior, kept (such input is presumably invalid).
            job.debug(verify, str(state) + " " + str(len(testline)))
            tableDict = getTabContent(msg, data, h)
            state = 3
            row = {}
            for i in range(startCols, cnt+startCols):
                if i >= len(columns)+startCols:
                    break
                row[columns[i-startCols]] = fields[i]
            job.debug(verify, str(state) + " row " + str(row))
            if B.DATA_NODE_DATA not in tableDict:
                tableDict[B.DATA_NODE_DATA] = []
            tableDict[B.DATA_NODE_DATA].append(row)
            setTabContent(msg, data, tableDict, h)
        elif state == 3:
            # empty line after data rows closes the table
            job.debug(verify, "structure " + str(state) + ": " + str(nodes))
            state = 0
    return data


def setSubnode(i, nodes, data, tree):
    """
    Recursively writes data into tree under the node path nodes[i:];
    at the end of the path the payload is stored under B.DATA_NODE_DATA.
    :param i: current index into nodes
    :param nodes: list of node names forming the path
    :param data: payload to store at the end of the path
    :param tree: (sub-)tree dict that is extended and returned
    """
    print("setSubnode " + str(i) + ": " + ": " + str(tree))
    if i >= len(nodes):
        print("setSubnode a " + str(i))
        tree[B.DATA_NODE_DATA] = data
    elif tree is not None and nodes[i] in tree.keys():
        print("setSubnode b " + str(i))
        tree[nodes[i]] = setSubnode(i+1, nodes, data, tree[nodes[i]])
    else:
        print("setSubnode c " + str(i))
        tree[nodes[i]] = setSubnode((i + 1), nodes, data, {})
    return tree


def getDataStructure(comp):
    # gets data-structure from the vml in the component-folder
    # NOTE(review): stub - only logs; no structure is read yet.
    job = basic.program.Job.getInstance()
    verify = -1+job.getDebugLevel(TOOL_NAME)
    # fix: comp is a component object, not a string - concatenation raised TypeError
    job.debug(verify, "getDataStructure " + str(comp))


def normalizeDataRow(dstruct, xpathtupel, row, referencedate):
    # normalize data of the row if necessary
    # raw-value is saved as new field with _raw as suffix
    # NOTE(review): stub - only logs; no normalization is implemented yet.
    job = basic.program.Job.getInstance()
    verify = -1+job.getDebugLevel(TOOL_NAME)
    # fix: row is a dict, not a string - concatenation raised TypeError
    job.debug(verify, "calcDataRow " + str(row))


def buildCsvData(filename, tdata, comp):
    """
    writes the testdata into a csv-file for documentation of the test-run
    :param filename: name of the target file (not used here; kept for interface symmetry)
    :param tdata: table-dict with B.DATA_NODE_HEADER and B.DATA_NODE_DATA
    :param comp: if specific else None
    :return: csv-text of the table
    """
    job = basic.program.Job.getInstance()
    verify = -1+job.getDebugLevel(TOOL_NAME)
    job.debug(verify, "writeDataTable " + str(comp))
    text = ""
    for k in [D.ATTR_TABLE_DATE, D.ATTR_TABLE_CNT]:
        if k in tdata:
            # fix: the count attribute may be numeric - str() avoids a TypeError
            text += k+";"+str(tdata[k])+"\n"
    text += "table"
    for f in tdata[B.DATA_NODE_HEADER]:
        text += ";"+f
    for r in tdata[B.DATA_NODE_DATA]:
        text += "\n"
        for f in tdata[B.DATA_NODE_HEADER]:
            if f in r:
                text += ";"+str(r[f])
            else:
                text += ";"
    text += "\n"
    return text


def writeCsvData(filename, tdata, comp):
    """
    Serializes every table under B.DATA_NODE_TABLES via buildCsvData()
    and writes the concatenated csv-text to filename.
    :param filename: path of the target file
    :param tdata: dict possibly containing B.DATA_NODE_TABLES
    :param comp: component whose message object receives the write
    """
    text = ""
    if B.DATA_NODE_TABLES in tdata:
        for k in tdata[B.DATA_NODE_TABLES]:
            text += buildCsvData(filename, tdata[B.DATA_NODE_TABLES][k], comp)
            text += "\n"
    utils.file_tool.writeFileText(comp.m, filename, text)