import getpass
import os
import re

import basic.toolHandling
import tools.data_const as D
import basic.constants as B
import tools.date_tool
import tools.file_tool


class Entity:
    """
    Base class for entities that can be read from the file-system and
    stored in a database.

    The persistence methods (get_schema, read_entity, select_entity,
    write_entity, update_entity) raise NotImplementedError here; the
    remaining methods are shared helpers built on the job configuration.
    """

    def __init__(self, job=None):
        """
        :param job: job object; its ``conf`` mapping is read by the db-helpers.
                    Optional, so that argument-less construction keeps working.
        """
        self.job = job
        self.table = ""
        self.testserver = None

    def __int__(self, job):
        # Legacy alias: the constructor used to be misspelled ``__int__`` and
        # was therefore never invoked on instantiation. Kept only so that
        # explicit ``obj.__int__(job)`` calls keep working.
        self.__init__(job)

    def get_schema(self):
        """
        gets schema/ddl-informations in order to create the database
        :return: sql-statements as one string, separated by ";\\n"
                 (this is how createSchema splits the result)
        """
        raise NotImplementedError(B.EXCEPT_NOT_IMPLEMENT)

    def read_entity(self, job, name):
        """
        reads the entity from the file-system
        :param job:
        :param name:
        :return:
        """
        raise NotImplementedError(B.EXCEPT_NOT_IMPLEMENT)

    def select_entity(self, job, name):
        """
        reads the entity from the database
        it should get the same result like read_entity
        :param job:
        :param name:
        :return:
        """
        raise NotImplementedError(B.EXCEPT_NOT_IMPLEMENT)

    def write_entity(self, job, name):
        """
        writes the entity into the database
        it is similar to update_entity
        :param job:
        :param name:
        :return:
        """
        raise NotImplementedError(B.EXCEPT_NOT_IMPLEMENT)

    def update_entity(self, job, name):
        """
        writes the entity into the database
        it is similar to write_entity
        :param job:
        :param name:
        :return:
        """
        raise NotImplementedError(B.EXCEPT_NOT_IMPLEMENT)

    def getDbAttr(self, job):
        """
        collects the connection attributes (host, user, database, password)
        from the db-node of the job configuration
        :param job: job object with a ``conf`` mapping
        :return: dict attribute-name -> configured value
        :raises KeyError: if the db-node or one of the attributes is missing
        """
        db_conf = job.conf[B.TOPIC_NODE_DB]
        attributes = (B.ATTR_DB_HOST, B.ATTR_DB_USER,
                      B.ATTR_DB_DATABASE, B.ATTR_DB_PASSWD)
        return {attr: db_conf[attr] for attr in attributes}

    def getDdl(self, job, ddl):
        """
        copies a ddl structure (table -> field -> attribute -> value) and
        enriches it: each field gets its own name under D.DDL_FIELD and each
        table gets the list of its field names under B.DATA_NODE_HEADER
        :param job: not used here
        :param ddl: nested dict table -> field -> attribute -> value
        :return: the enriched copy
        """
        out = {}
        for t in ddl:
            out[t] = {}
            for f in ddl[t]:
                out[t][f] = {}
                for a in ddl[t][f]:
                    print("entity-23 "+f+", "+a+" "+str(ddl))
                    out[t][f][a] = ddl[t][f][a]
                out[t][f][D.DDL_FIELD] = f
            # header is taken from the input, so it lists the field names only
            out[t][B.DATA_NODE_HEADER] = list(ddl[t].keys())
        return out

    def createSchema(self, testserver):
        """
        executes the statements of get_schema() one by one on the database
        which is configured in the db-node of the job configuration
        :param testserver: passed through to basic.toolHandling.getDbTool
        :return: the text "No DB in job-config" if no db-node is configured,
                 otherwise None
        :raises Exception: if a statement fails; the original error is chained
        """
        if B.TOPIC_NODE_DB not in self.job.conf:
            return "No DB in job-config"
        db_conf = self.job.conf[B.TOPIC_NODE_DB]
        dbi = basic.toolHandling.getDbTool(self.job, testserver, db_conf[B.ATTR_TYPE])
        sql = self.get_schema()
        print(sql)
        for s in sql.split(";\n"):
            # skip empty fragments, e.g. the rest after the last separator
            if len(s) < 3:
                continue
            try:
                dbi.execStatement(s+";", db_conf)
            except Exception as e:
                # keep the failing statement in the message and the cause in the chain
                raise Exception("Fehler bei createSchema \n"+s) from e
            print("SQL executed: "+s)

    def getHistoryFields(self):
        """
        builds the comma-separated schema definition of the history columns
        (insert/update commit, author and time and the actual-flag)
        :return: the column definitions as one string
        """
        dbtype = self.job.conf[B.TOPIC_NODE_DB][B.ATTR_TYPE]
        dbi = basic.toolHandling.getDbTool(self.job, None, dbtype)
        history_fields = [
            ("inscommit", D.TYPE_STR),
            ("insauthor", D.TYPE_STR),
            ("instime", D.TYPE_TIME),
            ("updcommit", D.TYPE_STR),
            ("updauthor", D.TYPE_STR),
            ("updtime", D.TYPE_TIME),
            ("actual", D.TYPE_INT),
        ]
        return ",".join(dbi.getSchemaAttribut(name, ftype)
                        for name, ftype in history_fields)

    def selectHistoryFields(self):
        """
        resolves the db-tool for the configured database
        :return: the text "No DB in job-config" if no db-node is configured,
                 otherwise None
        """
        if B.TOPIC_NODE_DB not in self.job.conf:
            return "No DB in job-config"
        dbi = basic.toolHandling.getDbTool(self.job, self.testserver,
                                           self.job.conf[B.TOPIC_NODE_DB][B.ATTR_TYPE])
        # NOTE(review): only the attribute is accessed, nothing is called or
        # returned - the select looks unfinished; confirm the intended call.
        dbi.selectRows

    def getHistoryIndex(self, table):
        """
        builds the index definition on the column "actual" of the table
        :param table: table name, passed to the db-tool
        :return: the index statement followed by a newline
        """
        dbtype = self.job.conf[B.TOPIC_NODE_DB][B.ATTR_TYPE]
        dbi = basic.toolHandling.getDbTool(self.job, None, dbtype)
        return dbi.getSchemaIndex(table, "actual") + "\n"


def read_spec(job, testentity, testgran, specpath):
    """
    reads the head-entries of a specification file
    each line "head:<key>;<value>" which follows a line-break is collected;
    semicolons are removed from the value, only the apps-entry is split
    into a list instead
    :param job: job object, ``job.m`` is passed to the file-reader
    :param testentity: not used here
    :param testgran: not used here
    :param specpath: path of the specification file
    :return: dict key -> value, or None if the file does not exist
             or is marked as deprecated
    """
    if not os.path.isfile(specpath):
        return None
    text = tools.file_tool.read_file_text(job, specpath, job.m)
    # NOTE(review): re.match without DOTALL only inspects the first line,
    # so the marker is recognized only there - confirm that this is intended.
    if re.match(r".*?depricated;[jJyY]", text):
        return None
    spec = {}
    # the pattern requires a preceding line-break, a head-entry in the very
    # first line of the text is therefore not found
    regex = re.compile(r".*\nhead:(.*?);(.+)")
    for res in regex.finditer(text):
        key = res.group(1)
        if key == B.SUBJECT_DESCRIPTION:
            spec[B.SUBJECT_DESCRIPTION] = res.group(2).replace(";", "")
        elif key in [B.SUBJECT_APPS, B.PAR_APP]:
            # both keys are stored under the apps-subject as a list
            spec[B.SUBJECT_APPS] = res.group(2).replace(";", ",").split(",")
        else:
            spec[key] = res.group(2).replace(";", "")
    return spec