From a020e009fdc876bcd37ac1d8095d1efe7adb4179 Mon Sep 17 00:00:00 2001 From: Ulrich Carmesin Date: Sat, 20 Aug 2022 21:14:07 +0200 Subject: [PATCH] refactor tdata --- utils/config_tool.py | 19 +- utils/tdata_tool.py | 683 +++++++++++++++++++------------------------ 2 files changed, 310 insertions(+), 392 deletions(-) diff --git a/utils/config_tool.py b/utils/config_tool.py index 7ddc343..2171d20 100644 --- a/utils/config_tool.py +++ b/utils/config_tool.py @@ -23,7 +23,7 @@ import utils.path_const as P COMP_FILES = [D.DDL_FILENAME] CONFIG_FORMAT = [D.DFILE_TYPE_YML, D.DFILE_TYPE_JSON, D.DFILE_TYPE_CSV] -def getConfigPath(modul, name, subname=""): +def getConfigPath(modul, name, subname="", job=None): """ gets the most specified configuration of different sources Parameter: @@ -39,7 +39,8 @@ def getConfigPath(modul, name, subname=""): the parameter-files could be one of these file-types: * yaml, json, csv """ - job = basic.program.Job.getInstance() + if job is None: + job = basic.program.Job.getInstance() verify = job.getDebugLevel("config_tool")-4 job.debug(verify, "getConfig " + modul + ", " + name) #TODO path rejoin, config as const @@ -106,6 +107,20 @@ def getConfigPath(modul, name, subname=""): job.debug(verify, "4 " + pathname) if os.path.exists(pathname): return pathname + elif modul == P.KEY_TESTCASE: + for format in CONFIG_FORMAT: + pathname = os.path.join(job.conf.getPath(P.ATTR_PATH_TDATA), + name, D.DFILE_TESTCASE_NAME + "."+format) + job.debug(verify, "4 " + pathname) + if os.path.exists(pathname): + return pathname + elif modul == P.KEY_TESTSUITE: + for format in CONFIG_FORMAT: + pathname = os.path.join(job.conf.getPath(P.ATTR_PATH_TDATA), + name, D.DFILE_TESTSUITE_NAME + "." + format) + job.debug(verify, "4 " + pathname) + if os.path.exists(pathname): + return pathname else: pathname = utils.path_tool.composePath(P.P_TCPARFILE) job.debug(verify, "7 " + pathname) diff --git a/utils/tdata_tool.py b/utils/tdata_tool.py index f935f65..22d96d4 100644 --- a/utils/tdata_tool.py +++ b/utils/tdata_tool.py @@ -4,268 +4,313 @@ # Author : Ulrich Carmesin # Source : gitea.ucarmesin.de # --------------------------------------------------------------------------------------------------------- -""" -the issue of this tool is to transform extern data to the internal structure and the internal structure into extern data - i.e. mostly test-results. -* * * * * * * * -the testdata have several elements -* parameter (-td --tdata) : to identify which testdata should be loaded -* source (flaskdb: dbname / dir: filename) : always structured in a table (easy to specify) with columns - * node : where the rows are - * action : what should be done - default insert - + fields : dates in relation of a reference 3: + tdata[D.DATA_ATTR_ALIAS] = aliasNode + return parseCsvSpec(msg, lines, D.CSV_SPECTYPE_DATA, tdata, job) + + +def extractAliasNode(filename, comp, job): + basename = os.path.basename(filename)[0:-4] + for prefix in list_blocks[D.DFILE_TABLE_PREFIX]: + if basename.find(prefix) == 0: + basename = basename[len(prefix):] + if comp is None: + return "" + if B.TOPIC_NODE_DB in comp.conf[B.SUBJECT_ARTS] and basename in comp.conf[B.DATA_NODE_DDL]: + return B.DATA_NODE_TABLES+":"+basename + return "" + + +def getCsvSpec(msg, filename, ttype, job=None): """ - get data from a csv-file - a = field[0] delimited by : - a) data : like a table with data-array of key-value-pairs - a_0 is keyword [option, step, CSV_HEADER_START ] - a_0 : { a_1 : { f_1 : v_1, .... } # option, step - a_0 : { .. 
a_n : { _header : [ .. ], _data : [ rows... ] # table, node - b) tree : as a tree - the rows must be unique identified by the first column - a_0 is keyword in CSV_HEADER_START - a_0 : { .. a_n : { _header : [ fields.. ], _data : { field : value } - c) keys : as a tree - the rows must be unique identified by the first column - a_0 is keyword in CSV_HEADER_START - a_1 ... a_n is key characterized by header-field like _fk* or _pk* - a_0 : { .. a_n : { _keys : [ _fpk*.. ] , _header : [ fields.. ], _data : { pk_0 : { ... pk_n : { field : value } - d) conf: - _header : [ field_0, ... ] - { field_0 : { attr_0 : val_0, .. }, field_1 : { ... }, ... } + reads the specification from a csv-file and maps it into the internal data-structure + :param msg: + :param filename: + :param type: + :param job: + :return: """ + if job is None: + job = basic.program.Job.getInstance() lines = utils.file_tool.readFileLines(filename, msg) - return parseCsvSpec(msg, lines, type) + tdata = {} # the result + return parseCsvSpec(msg, lines, ttype, tdata, job) -def parseCsvSpec(msg, lines, type): - job = basic.program.Job.getInstance() - data = {} - header = [] - h = [] # from a[] +def parseCsvSpec(msg, lines, ttype, tdata, job=None): + """ + + :param msg: + :param lines: + :param type: + :param job: + :return: + """ + if job is None: + job = basic.program.Job.getInstance() + if len(list_blocks) < 1: + setBlockLists(job) status = "start" - tableDate = utils.date_tool.getActdate(utils.date_tool.F_DE) - tableDict = {} + tableAttr = {} # table + tableDict = {} # table for l in lines: print("lines "+l) - fields = l.split(D.CSV_DELIMITER) + fields = splitFields(l, D.CSV_DELIMITER, job) # check empty line, comment - if (len(l.strip().replace(D.CSV_DELIMITER,"")) < 1): + if (len(fields) < 1) or (len(l.strip().replace(D.CSV_DELIMITER,"")) < 1): status = "start" continue if (fields[0][0:1] == "#"): continue a = fields[0].lower().split(":") # keywords option, step, table - if a[0] not in data and (a[0] in TDATA_NODES): - data[a[0]] = {} - if (a[0].lower() == D.CSV_BLOCK_STEP): - if (not B.DATA_NODE_STEPS in data): - data[B.DATA_NODE_STEPS] = [] + print(str(a)+" -- "+str(fields)) + tableAttr = setTableAttribute(tableAttr, a[0], fields[1], job) + if (tableAttr["hit"]): + status = "TABLE_ALIAS" + continue + if (a[0].lower() in list_blocks[D.CSV_BLOCK_HEAD]): + print("head "+l) + setTdataLine(tdata, fields, D.CSV_BLOCK_HEAD, job) + status = "start" + continue + elif (a[0].lower() in list_blocks[D.CSV_BLOCK_OPTION]): + print("option " + l) + setTdataLine(tdata, fields, D.CSV_BLOCK_OPTION, job) + status = "start" + continue + elif (a[0].lower() in list_blocks[D.CSV_BLOCK_STEP]): + print("step "+l) step = basic.step.parseStep(job, fields) - """ - step = {} - step[B.DATA_NODE_COMP] = fields[D.STEP_COMP_I] - step[B.ATTR_EXEC_REF] = fields[D.STEP_EXECNR_I] - step[B.ATTR_DATA_REF] = fields[D.STEP_REFNR_I] - step[B.ATTR_STEP_ARGS] = {} - if D.STEP_ARGS_I == D.STEP_LIST_I: - args = "" - for i in range(D.STEP_ARGS_I, len(fields)): - if len(fields[i]) < 1: - continue - if fields[i][0:1] == "#": - continue - args += "," + fields[i] - args = args[1:] - else: - args = fields[D.STEP_ARGS_I] - a = args.split(",") - for arg in a: - print("arg "+arg) - b = arg.split(":") - if len(b) < 2: - raise Exception(D.EXCP_MALFORMAT + "" + l) - step[B.ATTR_STEP_ARGS][b[0]] = b[1] - """ - data[B.DATA_NODE_STEPS].append(step) + if D.CSV_BLOCK_STEP not in tdata: + tdata[D.CSV_BLOCK_STEP] = [] + tdata[D.CSV_BLOCK_STEP].append(step) + status = "start" continue - elif 
(a[0].lower() == D.CSV_BLOCK_OPTION): - if len(a) < 2: - raise Exception(D.EXCP_MALFORMAT+""+l) - data[a[0]][a[1]] = fields[1] + elif (a[0].lower() in list_blocks[D.CSV_BLOCK_IMPORT]): + print("includes " + l) + if D.CSV_BLOCK_IMPORT not in tdata: + tdata[D.CSV_BLOCK_IMPORT] = [] + tdata[D.CSV_BLOCK_IMPORT].append(fields[1]) + status = "start" continue - elif a[0].lower() == D.DATA_ATTR_DATE: - tableDate = fields[1] - elif (a[0].lower() in D.CSV_HEADER_START): - # create deep structure a_0 ... a_n - print("tdata 136 CSV_HEADER_START "+str(len(a))) + elif (a[0].lower() in list_blocks[D.CSV_BLOCK_TABLES]): + print("tables "+l) h = a - header = [] - if B.DATA_NODE_TABLES not in data: - data[B.DATA_NODE_TABLES] = {} h[0] = B.DATA_NODE_TABLES - comps = {} - tableDict = getTabContent(msg, data, h) - i = 0 - for f in fields: - i += 1 - if i <= 1: - continue - if len(f) < 1: - break - header.append(f) - tableDict[B.DATA_NODE_HEADER] = header - print("tdata 165 header "+str(header)) - if type == D.CSV_SPECTYPE_TREE: - tableDict[B.DATA_NODE_DATA] = {} - elif type == D.CSV_SPECTYPE_KEYS: - tableDict[D.CSV_NODETYPE_KEYS] = {} - elif type == D.CSV_SPECTYPE_CONF: - tableDict = {} - headerFields = [] - else: - tableDict[B.DATA_NODE_DATA] = [] - tableDict[D.DATA_ATTR_DATE] = tableDate - setTabContent(msg, data, tableDict, h) + if ttype == D.CSV_SPECTYPE_CONF: + del h[0] + tableDict = getTdataContent(msg, tdata, h) + setTableHeader(tableDict, tableAttr, fields, ttype, job) status = D.CSV_SPECTYPE_DATA - continue elif (status == D.CSV_SPECTYPE_DATA): - # check A-col for substructure - # fill data - tableDict = getTabContent(msg, data, h) - row = {} - print(fields) - i = 1 - # case-differentiation DATA or TREE - for f in header: - print(str(i)+" "+str(len(fields))+" "+str(len(header))) - row[f] = fields[i] - if type == D.CSV_SPECTYPE_TREE: - tableDict[B.DATA_NODE_DATA][f] = fields[i] - i += 1 - if type == D.CSV_SPECTYPE_DATA: - print("parseSpec "+ str(fields[0])) - row[B.ATTR_DATA_COMP] = {} - for c in fields[0].split(","): - a = c.split(":") - print("parseSpec " + str(a)) - comps[a[0]] = a[1] - row[B.ATTR_DATA_COMP][a[0]] = a[1] - #row[B.ATTR_DATA_COMP] = fields[0].split(",") - tableDict[B.ATTR_DATA_COMP] = comps - tableDict[B.DATA_NODE_DATA].append(row) - elif type == D.CSV_SPECTYPE_KEYS: - tableDict[D.CSV_NODETYPE_KEYS][fields[1]] = row - elif type == D.CSV_SPECTYPE_CONF: - tableDict[fields[1]] = row - headerFields.append(fields[1]) - setTabContent(msg, data, tableDict, h) - if (status in [D.CSV_SPECTYPE_DATA, D.CSV_SPECTYPE_KEYS]): - tableDict = getTabContent(msg, data, h) - if type == D.CSV_SPECTYPE_CONF: - tableDict[B.DATA_NODE_HEADER] = headerFields - setTabContent(msg, data, tableDict, h) - if type == D.CSV_SPECTYPE_CONF: - data = data[B.DATA_NODE_TABLES] - print("return getCsvSpec "+str(data)) - return data + tableDict = getTdataContent(msg, tdata, h) + print("setTableData "+str(h)+" "+str(tableDict)) + setTableData(tableDict, fields, ttype, job) + elif (status == "TABLE_ALIAS") and D.DATA_ATTR_ALIAS in tdata: + alias = tdata[D.DATA_ATTR_ALIAS] + b = alias.split(":") + h = [B.DATA_NODE_TABLES] + b + tableDict = getTdataContent(msg, tdata, h) + tableDict[D.DATA_ATTR_ALIAS] = alias + fields = [alias] + fields + setTableHeader(tableDict, tableAttr, fields, ttype, job) + status = D.CSV_SPECTYPE_DATA + if ttype == D.CSV_SPECTYPE_CONF: + for k in tdata: + if B.DATA_NODE_DATA in tdata[k]: + tdata[k].pop(B.DATA_NODE_DATA) + if B.DATA_NODE_TABLES in tdata[B.DATA_NODE_TABLES]: + for k in 
tdata[B.DATA_NODE_TABLES][B.DATA_NODE_TABLES]: + if k in tdata[B.DATA_NODE_TABLES]: + print("Error") + else: + tdata[B.DATA_NODE_TABLES][k] = tdata[B.DATA_NODE_TABLES][B.DATA_NODE_TABLES][k] + tdata[B.DATA_NODE_TABLES].pop(B.DATA_NODE_TABLES) + return tdata + + +def setTableHeader(tableDict, tableAttr, fields, ttype, job): + header = [] + for i in range(1, len(fields)): + header.append(fields[i]) + tableDict[B.DATA_NODE_HEADER] = header + for attr in tableAttr: + tableDict[attr] = tableAttr[attr] + # preparate the sub-structure for row-data + if ttype == D.CSV_SPECTYPE_TREE: + tableDict[B.DATA_NODE_DATA] = {} + elif ttype == D.CSV_SPECTYPE_KEYS: + tableDict[D.CSV_NODETYPE_KEYS] = {} + else: + tableDict[B.DATA_NODE_DATA] = [] + return tableDict -def mergeTableComponents(comps, rowComps): - for c in rowComps.split(","): - a = c.split(":") - comps[a[0]] = a[1] - return comps +def setTableData(tableDict, fields, ttype, job): + row = {} + if ttype == D.CSV_SPECTYPE_DATA and ":" not in fields[0] and D.DATA_ATTR_ALIAS in tableDict: + fields = [tableDict[D.DATA_ATTR_ALIAS]] + fields + i = 1 + for f in tableDict[B.DATA_NODE_HEADER]: + row[f] = fields[i] + i += 1 + if ttype == D.CSV_SPECTYPE_DATA: + row[B.ATTR_DATA_COMP] = {} + for c in fields[0].split(","): + a = c.split(":") + row[B.ATTR_DATA_COMP][a[0]] = a[1] + tableDict[B.DATA_NODE_DATA].append(row) + elif ttype == D.CSV_SPECTYPE_KEYS: + tableDict[D.CSV_NODETYPE_KEYS][fields[1]] = row + elif ttype == D.CSV_SPECTYPE_CONF: + tableDict[fields[1]] = row + return tableDict -def setTabContent(msg, data, tabledata, path): - if len(path) >= 2 and path[1] not in data[path[0]]: - data[path[0]][path[1]] = {} - if len(path) >= 3 and path[2] not in data[path[0]][path[1]]: - data[path[0]][path[1]][path[2]] = {} - if len(path) >= 4 and path[3] not in data[path[0]][path[1]][path[2]]: - data[path[0]][path[1]][path[2]][path[3]] = {} +def setTableAttribute(tableAttr, key, val, job): + for attr in D.LIST_DATA_ATTR: + if (key.lower() in list_blocks[attr]): + tableAttr[attr] = val + tableAttr["hit"] = True + return tableAttr + tableAttr["hit"] = False + return tableAttr + + +def setTdataLine(tdata, fields, block, job): + """ + sets field(s) into tdata as a key-value-pair + additional fields will be concatenate to a intern separated list + :param tdata: + :param fields: + :param block: + :param job: + :return: + """ + a = fields[0].lower().split(":") + a[0] = block # normalized key + val = "" + for i in range(1, len(fields)-1): + val += D.INTERNAL_DELIMITER+fields[i] + if len(val) > len(D.INTERNAL_DELIMITER): + val = val[len(D.INTERNAL_DELIMITER):] + setTdataContent(job.m, tdata, val, a) + return tdata + + +def setTdataContent(msg, data, tabledata, path): + setTdataStructure(msg, data, path) if len(path) == 2: data[path[0]][path[1]] = tabledata elif len(path) == 3: @@ -274,155 +319,45 @@ def setTabContent(msg, data, tabledata, path): data[path[0]][path[1]][path[2]][path[3]] = tabledata -def getTabContent(msg, data, path): - if len(path) >= 2 and path[1] not in data[path[0]]: - data[path[0]][path[1]] = {} - if len(path) >= 3 and path[2] not in data[path[0]][path[1]]: - data[path[0]][path[1]][path[2]] = {} - if len(path) >= 4 and path[3] not in data[path[0]][path[1]][path[2]]: - data[path[0]][path[1]][path[2]][path[3]] = {} +def getTdataContent(msg, data, path): + setTdataStructure(msg, data, path) if len(path) == 2: return data[path[0]][path[1]] elif len(path) == 3: return data[path[0]][path[1]][path[2]] elif len(path) == 4: return 
data[path[0]][path[1]][path[2]][path[3]] + elif len(path) == 1: + return data[path[0]] else: - pass + return None -def readCsv(msg, filename, comp, aliasNode=""): - lines = utils.file_tool.readFileLines(filename, msg) - print("readCsv "+filename) - print(lines) - return parseCsv(msg, filename, lines, comp, aliasNode) - - -def parseCsv(msg, filename, lines, comp, aliasNode=""): - job = basic.program.Job.getInstance() - verify = -4+job.getDebugLevel(TOOL_NAME) - job.debug(verify, "# # # # # # # # parseCsv " + filename + " :" + str(lines)) - fields = [] - nodes = [] - columns = [] - output = {} - state = 0 - data = {} - tableDict = {} - tableDate = "" - tableCnt = 0 - cnt = 0 - basename = os.path.basename(filename)[0:-4] - startCols = 1 - for line in lines: - fields = line.split(';') - testline = line.replace(";", "") - a = fields[0].split(':') - job.debug(verify, str(state) + " line " + line + " :" + str(len(fields)) + ": " + str(fields)) - if len(testline) < 2 and state < 1: - state = 0 - elif a[0].lower() == D.DATA_ATTR_DATE: - tableDate = fields[1] - state = 1 - elif a[0].lower() == D.DATA_ATTR_COUNT: - tableCnt = fields[1] - state = 1 - elif a[0].lower() in D.CSV_HEADER_START or \ - (comp is not None and state == 1 - and isCompTableFile(comp, filename)): - state = 2 - columns = [] - h = a - if len(h) < 2 and comp is not None: - a = ["table", basename] - h = a - startCols = 0 - cnt = len(fields) - job.debug(verify, str(state) + " cnt " + str(cnt)) - data[B.DATA_NODE_TABLES] = {} - h[0] = B.DATA_NODE_TABLES - if not aliasNode.isspace() and len(aliasNode) > 3: - struct = aliasNode.split(":") - for x in struct: - if len(x) > 2: - nodes.append(x) - job.debug(verify, str(state) + " nodes " + str(nodes)) - elif len(h) > 1: - for i in range(1, len(h)): - nodes.append(h[i]) - job.debug(verify, str(state) + " nodes " + str(nodes)) - tableDict = getTabContent(msg, data, h) - tableDict[B.ATTR_DATA_COMP] = {} - if len(tableDate) > 6: - tableDict[D.DATA_ATTR_DATE] = tableDate - if int(tableCnt) > 0: - tableDict[D.DATA_ATTR_COUNT] = tableCnt - j = 0 - for i in range(1, cnt): - if fields[i][0:1] == "_": - startCols += 1 - continue - job.debug(verify, str(i) + " cnt " + str(fields[i])) - if len(fields[i]) > 0: - columns.append(fields[i]) - j = j + 1 - cnt = j - tableDict[B.DATA_NODE_HEADER] = columns - job.debug(verify, str(state) + " " + str(cnt) + " cols " + str(columns)) - elif state >= 2 and len(testline) > 2: - job.debug(verify, str(state) + " " + str(len(testline))) - tableDict = getTabContent(msg, data, h) - state = 3 - row = {} - print(line) - if startCols > 0: - row[B.ATTR_DATA_COMP] = {} - row[B.ATTR_DATA_COMP][a[0]] = a[1] - tableDict[B.ATTR_DATA_COMP][a[0]] = a[1] - for i in range(startCols, cnt+startCols): - print("for "+str(i)+" "+str(len(row))+" "+str(startCols)+" "+str(len(fields))) - print(str(fields[i])) - if i >= len(columns)+startCols: - break - row[columns[i-startCols]] = fields[i] - job.debug(verify, str(state) + " row " + str(row)) - if B.DATA_NODE_DATA not in tableDict: - tableDict[B.DATA_NODE_DATA] = [] - tableDict[B.DATA_NODE_DATA].append(row) - setTabContent(msg, data, tableDict, h) - elif state == 3: - job.debug(verify, "structure " + str(state) + ": " + str(nodes)) - state = 0 - return data +def setTdataStructure(msg, data, path): + if len(path) >= 1 and path[0] not in data: + data[path[0]] = {} + if len(path) >= 2 and path[1] not in data[path[0]]: + data[path[0]][path[1]] = {} + if len(path) >= 3 and path[2] not in data[path[0]][path[1]]: + data[path[0]][path[1]][path[2]] = 
{} + if len(path) >= 4 and path[3] not in data[path[0]][path[1]][path[2]]: + data[path[0]][path[1]][path[2]][path[3]] = {} + return data -def setSubnode(i, nodes, data, tree): - print("setSubnode " + str(i) + ": " + ": " + str(tree)) - if i >= len(nodes): - print("setSubnode a " + str(i)) - tree[B.DATA_NODE_DATA] = data - elif tree is not None and nodes[i] in tree.keys(): - print("setSubnode b " + str(i)) - tree[nodes[i]] = setSubnode(i+1, nodes, data, tree[nodes[i]]) - else: - print("setSubnode c " + str(i)) - tree[nodes[i]] = setSubnode((i + 1), nodes, data, {}) - return tree - -def getDataStructure(comp): - # gets data-structure from the vml in the component-folder - job = basic.program.Job.getInstance() - verify = -1+job.getDebugLevel(TOOL_NAME) - job.debug(verify, "getDataStructure " + comp) -def normalizeDataRow(dstruct, xpathtupel, row, referencedate): - # normalize data of the row if necessary - # raw-value is saved as new field with _raw as suffix - job = basic.program.Job.getInstance() - verify = -1+job.getDebugLevel(TOOL_NAME) - job.debug(verify, "calcDataRow " + row) +def splitFields(line, delimiter, job): + out = [] + fields = line.split(delimiter) + for i in range(0, len(fields)): + if fields[i][0:1] == "#": + break + if re.match(r"^\"(.*)\"$", fields[i]): + fields[i] = fields[i][1:-1] + out.append(fields[i]) + return out -def buildCsvData(filename, tdata, comp): +def buildCsvData(tdata, table, job=None): """ writes the testdata into a csv-file for documentation of the test-run :param teststatus: @@ -430,22 +365,14 @@ def buildCsvData(filename, tdata, comp): :param comp: if specific else None :return: """ - compColumn = not isCompTableFile(comp, filename) - job = basic.program.Job.getInstance() - verify = -1+job.getDebugLevel(TOOL_NAME) - job.debug(verify, "writeDataTable " + str(comp)) text = "" for k in [D.DATA_ATTR_DATE, D.DATA_ATTR_COUNT]: if k in tdata: text += k+";"+str(tdata[k])+"\n" - header = "table" + header = utils.i18n_tool.I18n.getInstance().getText(f"{B.DATA_NODE_TABLES=}", job)+":"+table for f in tdata[B.DATA_NODE_HEADER]: header += ";"+f - if compColumn: - text += header - else: - #text += "_nr;" + header[6:] + "\n" - text += header[6:] + "\n" + text += header + "\n" i = 0 for r in tdata[B.DATA_NODE_DATA]: row = "" @@ -455,30 +382,6 @@ def buildCsvData(filename, tdata, comp): row += ";"+str(r[f]) else: row += ";" - if compColumn: - text += row - else: - text += row[1:] - #text += str(i) + row + text += row text += "\n" return text - - -def writeCsvData(filename, tdata, comp): - text = "" - if B.DATA_NODE_TABLES in tdata: - for k in tdata[B.DATA_NODE_TABLES]: - text += buildCsvData(filename, tdata[B.DATA_NODE_TABLES][k], comp) - text += "\n" - utils.file_tool.writeFileText(comp.m, filename, text) - - -def isCompTableFile(comp, filename): - """ check if the filename belongs to the component """ - basetable = os.path.basename(filename)[0:-4] - if comp is None: - return False - if B.TOPIC_NODE_DB in comp.conf[B.SUBJECT_ARTS] and basetable in comp.conf[B.DATA_NODE_DDL] \ - and comp.name in filename: - return True - return False \ No newline at end of file
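
Usage sketch for the reworked parseCsvSpec() (illustrative only, not part of the patch): the function now
collects everything into a single tdata dict whose sub-trees are addressed via getTdataContent() /
setTdataStructure(). The keyword spellings "option" and "table" below are assumptions, since the real block
names come from the i18n lists built in setBlockLists(), and the Job instance is assumed to have been
initialised by the calling program.

    import basic.program
    import utils.data_const as D
    import utils.tdata_tool as t

    job = basic.program.Job.getInstance()   # assumes the calling program created the Job
    lines = [
        "option:description;login smoke test",
        "table:testa:person;famname;name;sex",
        "testa:person;smith;alice;f",
        "testa:person;miller;bob;m",
        "# comment lines and empty lines are skipped",
    ]
    tdata = t.parseCsvSpec(job.m, lines, D.CSV_SPECTYPE_DATA, {}, job)
    # expected shape of the table sub-tree (keys shown by their constant names
    # from basic.constants; option/head lines land under their own top-level keys):
    # tdata[B.DATA_NODE_TABLES]["testa"]["person"] == {
    #     B.DATA_NODE_HEADER: ["famname", "name", "sex"],
    #     B.DATA_NODE_DATA: [
    #         {"famname": "smith", "name": "alice", "sex": "f",
    #          B.ATTR_DATA_COMP: {"testa": "person"}},
    #         {"famname": "miller", "name": "bob", "sex": "m",
    #          B.ATTR_DATA_COMP: {"testa": "person"}},
    #     ],
    # }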
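
The new splitFields() helper is a pure function, so its behaviour can be shown in isolation (a small sketch;
the delimiter constant is D.CSV_DELIMITER, normally ";"):

    from utils.tdata_tool import splitFields

    print(splitFields('testa:person;"smith";alice;# rest is comment;ignored', ";", None))
    # -> ['testa:person', 'smith', 'alice']
    # surrounding quotes are stripped, and everything from the first "#"-field onwards is dropped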
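
For the reworked buildCsvData(tdata, table, job), a rough sketch of the expected output; the leading "table"
keyword stands in for the i18n text of B.DATA_NODE_TABLES and the table name and cell values are invented:

    import basic.constants as B

    tabledata = {
        B.DATA_NODE_HEADER: ["famname", "name"],
        B.DATA_NODE_DATA: [
            {"famname": "smith", "name": "alice"},
            {"famname": "miller"},          # a missing field is written as an empty cell
        ],
    }
    # buildCsvData(tabledata, "person", job) should return roughly:
    #   table:person;famname;name
    #   ;smith;alice
    #   ;miller;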