Data-Test-Executer Framework speziell zum Test von Datenverarbeitungen mit Datengenerierung, Systemvorbereitungen, Einspielungen, ganzheitlicher diversifizierender Vergleich
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

144 lines
4.5 KiB

import getpass
import os
import re
import basic.toolHandling
import tools.data_const as D
import basic.constants as B
import tools.date_tool
import tools.file_tool
class Entity:
    """
    Common base of the entities.

    It provides helpers around the db-configuration of the job (connection
    attributes, ddl-copy, schema creation, history columns) and declares
    placeholder methods which raise B.EXCEPT_NOT_IMPLEMENT until they are
    overridden.
    """

    def __init__(self, job=None):
        """
        initializes the common attributes of an entity
        :param job: job-object; the db-helpers read job.conf.
                    It is optional so that an argument-less construction keeps working.
        """
        self.job = job
        self.table = ""
        self.testserver = None

    def __int__(self, job):
        """
        legacy entry point: the initializer used to be misspelled as __int__.
        It is kept only for code which called it explicitly.
        :param job:
        :return:
        """
        self.__init__(job)

    def get_schema(self):
        """
        gets schema/ddl-informations in order to create the database
        :return: createSchema expects a string of sql-statements separated by ";\\n"
        """
        raise Exception(B.EXCEPT_NOT_IMPLEMENT)

    def read_entity(self, job, name):
        """
        reads the entity from the file-system
        :param job:
        :param name:
        :return:
        """
        raise Exception(B.EXCEPT_NOT_IMPLEMENT)

    def select_entity(self, job, name):
        """
        reads the entity from the database
        it should get the same result like read_entity
        :param job:
        :param name:
        :return:
        """
        raise Exception(B.EXCEPT_NOT_IMPLEMENT)

    def write_entity(self, job, name):
        """
        writes the entity into the database
        it is similar to update_entity
        :param job:
        :param name:
        :return:
        """
        raise Exception(B.EXCEPT_NOT_IMPLEMENT)

    def update_entity(self, job, name):
        """
        writes the entity into the database
        it is similar to write_entity
        :param job:
        :param name:
        :return:
        """
        raise Exception(B.EXCEPT_NOT_IMPLEMENT)

    def getDbAttr(self, job):
        """
        collects the connection attributes of the db-node of the job-configuration
        :param job: job-object with conf[B.TOPIC_NODE_DB]
        :return: dict with the entries host, user, database and passwd
        :raises KeyError: if the db-node or one of the attributes is missing
        """
        return {
            attr: job.conf[B.TOPIC_NODE_DB][attr]
            for attr in [B.ATTR_DB_HOST, B.ATTR_DB_USER, B.ATTR_DB_DATABASE, B.ATTR_DB_PASSWD]
        }

    def getDdl(self, job, ddl):
        """
        copies a ddl-structure table -> field -> attribute and enriches it:
        each field additionally gets its own name under D.DDL_FNAME and
        each table gets the list of its field-names under B.DATA_NODE_HEADER
        :param job: not used here
        :param ddl: nested dict {table: {field: {attribute: value}}}
        :return: the enriched copy; the attribute values themselves are not copied
        """
        out = {}
        for table, fields in ddl.items():
            out[table] = {}
            for fname, attributes in fields.items():
                # copy of the attribute-dict, so that the input is not modified
                out[table][fname] = dict(attributes)
                out[table][fname][D.DDL_FNAME] = fname
            # the header is taken from the original field-names only
            out[table][B.DATA_NODE_HEADER] = list(fields.keys())
        return out

    def createSchema(self, testserver):
        """
        executes the statements of get_schema() one by one with the db-tool
        :param testserver: handed over to basic.toolHandling.getDbTool
        :return: the text "No DB in job-config" if the job has no db-node, otherwise None
        :raises Exception: if a statement fails; the original error is chained as cause
        """
        if B.TOPIC_NODE_DB not in self.job.conf:
            return "No DB in job-config"
        db_conf = self.job.conf[B.TOPIC_NODE_DB]
        dbi = basic.toolHandling.getDbTool(self.job, testserver, db_conf[B.ATTR_TYPE])
        sql = self.get_schema()
        print(sql)
        for s in sql.split(";\n"):
            # fragments shorter than 3 characters cannot be a statement, e.g. the rest after the last ";"
            if len(s) < 3:
                continue
            try:
                # the separator was removed by the split, so it is appended again
                dbi.execStatement(s+";", db_conf)
                print("SQL executed: "+s)
            except Exception as e:
                # keep the failing statement in the message and the db-error as cause
                raise Exception("Fehler bei createSchema "+s) from e

    def getHistoryFields(self):
        """
        builds the column definitions of the history fields
        (inscommit, insauthor, instime, updcommit, updauthor, updtime, actual)
        :return: the results of dbi.getSchemaAttribut joined by ","
        """
        dbtype = self.job.conf[B.TOPIC_NODE_DB][B.ATTR_TYPE]
        dbi = basic.toolHandling.getDbTool(self.job, None, dbtype)
        history_fields = [
            ("inscommit", D.TYPE_STR),
            ("insauthor", D.TYPE_STR),
            ("instime", D.TYPE_TIME),
            ("updcommit", D.TYPE_STR),
            ("updauthor", D.TYPE_STR),
            ("updtime", D.TYPE_TIME),
            ("actual", D.TYPE_INT),
        ]
        return ",".join(dbi.getSchemaAttribut(fname, ftype) for fname, ftype in history_fields)

    def selectHistoryFields(self):
        """
        gets the db-tool for the testserver of this entity
        :return: the text "No DB in job-config" if the job has no db-node, otherwise None
        """
        if B.TOPIC_NODE_DB not in self.job.conf:
            return "No DB in job-config"
        dbi = basic.toolHandling.getDbTool(self.job, self.testserver, self.job.conf[B.TOPIC_NODE_DB][B.ATTR_TYPE])
        # NOTE(review): the attribute is only referenced, not called - no rows are selected.
        # The arguments of the intended call are not visible here - TODO complete
        dbi.selectRows

    def getHistoryIndex(self, table):
        """
        builds the index definition on the history field "actual"
        :param table: handed over to dbi.getSchemaIndex
        :return: result of dbi.getSchemaIndex with a trailing newline
        """
        dbtype = self.job.conf[B.TOPIC_NODE_DB][B.ATTR_TYPE]
        dbi = basic.toolHandling.getDbTool(self.job, None, dbtype)
        return dbi.getSchemaIndex(table, "actual") + "\n"
def read_spec(job, testentity, testgran, specpath):
    """
    reads the head-entries ("head:<key>;<value>") of a specification-file
    :param job: job-object; job.m is handed over to the file-reader
    :param testentity: not used here
    :param testgran: not used here
    :param specpath: path of the specification-file
    :return: dict of the head-entries; the apps-entry is a list, each other entry a string.
             None if the file does not exist or if it is marked as depricated
    """
    if not os.path.isfile(specpath):
        return None
    text = tools.file_tool.read_file_text(job, specpath, job.m)
    # NOTE(review): re.match is anchored at the start of the text and "." stops at a newline,
    # so the marker is only recognized in the first line - confirm that this is intended
    if re.match(r".*?depricated;[jJyY]", text):
        return None
    spec = {}
    # "^" with MULTILINE anchors at each line start, so a head-entry in the very first line is found too
    head_line = re.compile(r"^head:(.*?);(.+)", re.MULTILINE)
    for res in head_line.finditer(text):
        key = res.group(1)
        if key in [B.SUBJECT_APPS, B.PAR_APP]:
            # apps may be separated by ";" or by ",", both spellings end up under the apps-key
            spec[B.SUBJECT_APPS] = res.group(2).replace(";", ",").split(",")
        else:
            # each other entry is a single value, further separators are removed
            spec[key] = res.group(2).replace(";", "")
    return spec