Data-Test-Executer Framework speziell zum Test von Datenverarbeitungen mit Datengenerierung, Systemvorbereitungen, Einspielungen, ganzheitlicher diversifizierender Vergleich
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

410 lines
14 KiB

import getpass
import os
import re
import basic.toolHandling
# import model.entity
import tools.data_const as D
import tools.path_const as P
import basic.constants as B
import tools.config_tool
import tools.date_tool
import tools.file_tool
ENTITY_NAME = "name"
ENTITY_ATTRIBUTES = B.NODE_ATTRIBUTES
ENTITY_INS_COMMIT = "inscommit"
ENTITY_INS_AUTHOR = "insauthor"
ENTITY_INS_TIME = "instime"
ENTITY_UPD_COMMIT = "updcommit"
ENTITY_UPD_AUTHOR = "updauthor"
ENTITY_UPD_TIME = "updtime"
ENTITY_ACTUAL = "actual"
VAL_ACTUAL = 1
VAL_ZERO_TIME = "2000-01-01_00-00-00"
ENTITY_FIELDS = [ENTITY_INS_COMMIT, ENTITY_INS_AUTHOR, ENTITY_INS_TIME,
ENTITY_UPD_COMMIT, ENTITY_UPD_AUTHOR, ENTITY_UPD_TIME, ENTITY_ACTUAL]
SYNC_FULL_GIT2DB = "full-git-db"
SYNC_HEAD_GIT2DB = "head-git-db"
SYNC_COPY_FILE2DB = "copy-file-db"
SYNC_ONLY_GIT = "only-git"
SYNC_ONLY_DB = "only-db"
STORAGE_DB = B.TOPIC_NODE_DB
STORAGE_FILE = B.TOPIC_NODE_FILE
LIST_ENTITY_SYNC = [SYNC_ONLY_GIT, SYNC_FULL_GIT2DB, SYNC_HEAD_GIT2DB, SYNC_COPY_FILE2DB, SYNC_ONLY_DB]
print("is importing module.entity")
def getEntityValue(job, field, gitcommit):
if field == ENTITY_INS_COMMIT:
return ""
if field == ENTITY_INS_AUTHOR:
return getpass.getuser()
if field == ENTITY_INS_TIME:
return tools.date_tool.getActdate(tools.date_tool.F_DIR)
if field == ENTITY_UPD_COMMIT:
return gitcommit["commit"]
if field == ENTITY_UPD_AUTHOR:
return gitcommit["author"]
if field == ENTITY_UPD_TIME:
return gitcommit["date"]
if field == ENTITY_ACTUAL:
return VAL_ACTUAL
class Entity:
def __int__(self, job):
self.job = job
self.table = ""
self.testserver = None
def get_unique_names(self, job, storage="", project="", application="", gran="", args={}):
"""
gets the entity-names from the defined storage - the field name must be an unique identifier
:param job:
:param opt. storage: values db / files - default files
:param opt. project: select-criteria if used and defined
:param opt. application: select-criteria if used and defined
:param opt. gran: granularity values testcase / testsuite / testplan
:param opt. args additional args
:return: list of entity-names
"""
entityNames = []
if storage == STORAGE_DB:
entityNames = self.select_unique_names(job, project, application, gran, args)
elif storage == STORAGE_FILE:
entityNames = self.read_unique_names(job, project, application, gran, args)
else:
entityNames = self.read_unique_names(job, project, application, gran, args)
return entityNames
def get_entities(self, job, storage="", project="", application="", gran="", args={}):
"""
gets the entity-names from the defined storage
:param job:
:param opt. storage: values db / files - default files
:param opt. project: select-criteria if used and defined
:param opt. application: select-criteria if used and defined
:param opt. gran: granularity values testcase / testsuite / testplan
:param opt. args additional args
:return: list of entity-names
"""
entities = []
entityNames = self.get_unique_names(job, storage=storage, project=project, application=application,
gran=gran, args=args)
for k in entityNames:
if storage == STORAGE_DB:
entity = self.select_entity(job, k)
elif storage == STORAGE_FILE:
entity = self.read_entity(job, k)
else:
entity = self.read_entity(job, k)
entities.append(entity)
return entities
def read_unique_names(self, job, project, application, gran, args):
"""
reads the entity-names from file-storage
:param job:
:param opt. project: select-criteria if used and defined
:param opt. application: select-criteria if used and defined
:param opt. gran: granularity values testcase / testsuite / testplan
:param opt. args additional args
:return: list of entity-names
"""
raise Exception(B.EXCEPT_NOT_IMPLEMENT)
def select_unique_names(self, job, project, application, gran, args):
"""
reads the entity-names from file-storage
:param job:
:param opt. project: select-criteria if used and defined
:param opt. application: select-criteria if used and defined
:param opt. gran: granularity values testcase / testsuite / testplan
:param opt. args additional args
:return: list of entity-names
"""
raise Exception(B.EXCEPT_NOT_IMPLEMENT)
def setDbAttributes(self, job, tables):
"""
set the db-attributes like connection and ddl
:param job:
:param tables: list of table-names
:return:
"""
setattr(self, "m", job.m)
config = {}
config[B.TOPIC_CONN] = job.conf[B.TOPIC_NODE_DB]
config[B.DATA_NODE_DDL] = {}
for t in tables:
ddl = tools.db_abstract.get_ddl(job, B.ATTR_INST_TESTSERVER, t)
config[B.DATA_NODE_DDL][t] = ddl
setattr(self, "conf", config)
def getEntity(self, job, name):
if B.TOPIC_NODE_DB in job.conf:
self.select_entity(job, name)
#self.read_entity(job, name)
else:
self.read_entity(job, name)
def read_entity(self, job, name):
"""
reads the entity from the file-system
:param job:
:param name:
:return:
"""
raise Exception(B.EXCEPT_NOT_IMPLEMENT)
def select_entity(self, job, name):
"""
reads the entity from the database
it should get the same result like read_entity
:param job:
:param name:
:return:
"""
raise Exception(B.EXCEPT_NOT_IMPLEMENT)
def write_entity(self, job, name):
"""
writes the entity into the file-system
it similar to update_entity
:param job:
:param name:
:return:
"""
raise Exception(B.EXCEPT_NOT_IMPLEMENT)
def insert_entity(self, job, name):
"""
inserts the entity into the database
it similar to update_entity
:param job:
:param name:
:return:
"""
raise Exception(B.EXCEPT_NOT_IMPLEMENT)
def update_entity(self, job, name):
"""
writes the entity into the database
it similar to update_entity
:param job:
:param name:
:return:
"""
raise Exception(B.EXCEPT_NOT_IMPLEMENT)
def remove_entity(self, job, name):
"""
removes the entity from the file-system
it similar to delete_entity
:param job:
:param name:
:return:
"""
raise Exception(B.EXCEPT_NOT_IMPLEMENT)
def removeEntity(self, job, name, storagepath, ext):
"""
removes the entity from the file-system
it similar to delete_entity
:param job:
:param name: single substring or list of name or dict of names with the keys as
:return:
"""
nameList = []
if isinstance(name, dict):
nameList = name.keys()
elif isinstance(name, list):
nameList = name
else:
nameList.append(name)
for name in nameList:
pathname = os.path.join(storagepath, name + "." + ext)
os.remove(pathname)
def delete_entity(self, job, name, table):
"""
deletes the entity into the database
it similar to update_entity
:param job:
:param name:
:return:
"""
raise Exception(B.EXCEPT_NOT_IMPLEMENT)
""" 2023-05 """
@staticmethod
def getConfig(job, module, subject, name):
"""
reads the entity from the database
it should get the same result like read_entity
:param job:
:param name:
:return:
"""
config = tools.config_tool.getConfig(job, module, subject)
oldConfig = config
if config is not None:
if subject not in config:
newConfig = {}
newConfig[subject] = {}
for k in config:
newConfig[subject][k] = config[k]
config = newConfig
pass
if len(name) == 0:
return config
elif name in config[subject]:
outConfig = {}
outConfig[name] = config[subject][name]
return outConfig
elif B.DATA_NODE_KEYS in config[subject] \
and name in config[subject][B.DATA_NODE_KEYS]:
# if csv-data is a catalog
outConfig = {}
outConfig[name] = config[subject][B.DATA_NODE_KEYS][name]
return outConfig
elif name == subject:
return config
raise Exception("keine Config zu "+name)
@staticmethod
def getDirlist(job, path, ext):
outList = []
for k in os.listdir(path):
if k[:1] in [".", "_"]:
continue
if k in [P.KEY_CATALOG, P.KEY_TOOL, P.VAL_CONFIG, P.VAL_TEST, P.VAL_TOOLS]:
continue
if ext == "":
if not os.path.isdir(os.path.join(path, k)):
continue
outList.append(k)
continue
else:
if not os.path.isfile(os.path.join(path, k)):
continue
if len(k) < len(ext):
continue
xx = k[-len(ext):]
if ext != k[-len(ext):]:
continue
outList.append(k[:-len(ext)-1])
return outList
def setAttributes(self, config, rootname, fields, subjects):
""" 2023-05 """
for k in fields:
if k not in config[rootname]:
for l in config[rootname]:
if l[1:] == k:
setattr(self, k, config[rootname][l])
break
continue
setattr(self, k, config[rootname][k])
for k in subjects:
if k not in config[rootname]:
for l in config[rootname]:
if l[1:] == k:
setattr(self, k, config[rootname][l])
break
continue
setattr(self, k, config[rootname][k])
topics = {}
for k in B.LIST_TOPIC_NODES:
if k in config[rootname]:
topics[k] = config[rootname][k]
if len(topics) > 0:
setattr(self, "topics", topics)
return self
def getDbAttr(self, job):
out = {}
for attr in [B.ATTR_DB_HOST, B.ATTR_DB_USER, B.ATTR_DB_DATABASE, B.ATTR_DB_PASSWD]:
2 years ago
out[attr] = job.conf[B.TOPIC_NODE_DB][attr]
return out
def getDdl(self, job, ddl):
out = {}
for t in ddl:
out[t] = {}
for f in ddl[t]:
out[t][f] = {}
for a in ddl[t][f]:
print("entity-23 "+f+", "+a+" "+str(ddl))
out[t][f][a] = ddl[t][f][a]
out[t][f][D.DDL_FIELD] = f
out[t][B.DATA_NODE_HEADER] = list(ddl[t].keys())
return out
def createSchema(self, testserver):
2 years ago
if B.TOPIC_NODE_DB in self.job.conf:
dbi = basic.toolHandling.getDbTool(self.job, testserver, self.job.conf[B.TOPIC_NODE_DB][B.ATTR_TYPE])
else:
return "No DB in job-config"
sql = self.get_schema()
print(sql)
for s in sql.split(";\n"):
if len(s) < 3: continue
try:
# dbi.execStatement(s+";", self.job.conf[B.TOPIC_NODE_DB])
print("SQL executed: "+s)
except Exception as e:
raise Exception("Fehler bei createSchema "+s)
def getHistoryFields(self):
2 years ago
dbtype = self.job.conf[B.TOPIC_NODE_DB][B.ATTR_TYPE]
dbi = basic.toolHandling.getDbTool(self.job, None, dbtype)
sql = dbi.getSchemaAttribut("inscommit", D.TYPE_STR)+","
sql += dbi.getSchemaAttribut("insauthor", D.TYPE_STR)+","
sql += dbi.getSchemaAttribut("instime", D.TYPE_TIME)+","
sql += dbi.getSchemaAttribut("updcommit", D.TYPE_STR)+","
sql += dbi.getSchemaAttribut("updauthor", D.TYPE_STR)+","
sql += dbi.getSchemaAttribut("updtime", D.TYPE_TIME)+","
sql += dbi.getSchemaAttribut("actual", D.TYPE_INT)
return sql
def selectHistoryFields(self):
2 years ago
if B.TOPIC_NODE_DB in self.job.conf:
dbi = basic.toolHandling.getDbTool(self.job, self.testserver, self.job.conf[B.TOPIC_NODE_DB][B.ATTR_TYPE])
else:
return "No DB in job-config"
dbi.selectRows
def getHistoryIndex(self, table):
dbtype = self.job.conf[B.TOPIC_NODE_DB][B.ATTR_TYPE]
dbi = basic.toolHandling.getDbTool(self.job, None, dbtype)
sql = dbi.getSchemaIndex(table, "actual") + "\n"
return sql
def read_spec(job, testentity, testgran, specpath):
if not os.path.isfile(specpath):
return
text = tools.file_tool.read_file_text(job, specpath, job.m)
if re.match(r".*?depricated;[jJyY]", text):
return None
spec = {}
regex = re.compile(r".*\nhead:(.*?);(.+)")
for res in regex.finditer(text):
#res = re.search(r".*head:(.*?);(.+)\n", text)
key = res.group(1)
if key == B.SUBJECT_DESCRIPTION:
spec[B.SUBJECT_DESCRIPTION] = res.group(2).replace(";", "")
elif key in [B.SUBJECT_APPS, B.PAR_APP]:
apps = res.group(2).replace(";", ",").split(",")
spec[B.SUBJECT_APPS] = apps
else:
val = res.group(2).replace(";", "")
spec[key] = val
return spec