Data-Test-Executer Framework speziell zum Test von Datenverarbeitungen mit Datengenerierung, Systemvorbereitungen, Einspielungen, ganzheitlicher diversifizierender Vergleich
You can not select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

346 lines
11 KiB

# ---------------------------------------------------------------------------------------------------------
# Author : Ulrich Carmesin
# Source : gitea.ucarmesin.de
# ---------------------------------------------------------------------------------------------------------
import os
import basic.toolHandling
import basic.constants as B
import model.entity
import tools.path_const as P
import tools.data_const as D
import tools.config_tool
import tools.file_tool
import tools.git_tool
# table-types: control which extra field-groups get_schema appends to a table
TYPE_ADMIN = "admin"      # table gets the administrative audit-fields (LIST_ADMINFIELDS)
TYPE_CTLG = "catalog"     # table gets catalog-fields AND administrative audit-fields
TYPE_CONTEXT = "context"
# list-names used for (de)serializing a table-definition
LISTNAME_DDLNAMES = "fieldnames"
LISTNAME_DDLFIELDS = "fielddef"
LISTNAME_FIELDS = "fieldlist"
LISTNAME_NODES = "nodelist"
LISTNAME_SUBTABLE = "subtables"
TABLE_NAME = "table"
""" system-name for this entity """
FIELD_ID = "tbid"
# fallback value per ddl-attribute when a table-definition omits the attribute
DEFAULT_FIELD = ""
DEFAULT_TYPE = "string"
DEFAULT_FORMAT = "vchar(256)"
DEFAULT_INDEX = "N"
DEFAULT_CONSTRAINT = "nullable"
DEFAULT_AGGREGAT = ""
DEFAULT_GENERIC = ""
DEFAULT_KEY = ""
DEFAULT_ACCEPTANCE = ""
DEFAULT_ALIAS = ""
DEFAULT_DESCRIPTION = ""
# ddl-attribute (constants of tools.data_const) -> default value
DEFAULTS = {
    D.DDL_FIELD: DEFAULT_FIELD,
    D.DDL_TYPE: DEFAULT_TYPE,
    D.DDL_FORMAT: DEFAULT_FORMAT,
    D.DDL_INDEX: DEFAULT_INDEX,
    D.DDL_CONSTRAINT: DEFAULT_CONSTRAINT,
    D.DDL_AGGREGAT: DEFAULT_AGGREGAT,
    D.DDL_GENERIC: DEFAULT_GENERIC,
    D.DDL_KEY: DEFAULT_KEY,
    D.DDL_ACCEPTANCE: DEFAULT_ACCEPTANCE,
    D.DDL_ALIAS: DEFAULT_ALIAS,
    D.DDL_DESCRIPTION: DEFAULT_DESCRIPTION
}
def select_tables(job, project="", application="", component=""):
    """
    Collects the names of all table-definition files of the component-catalog.

    Scans <components-path>/catalog/tables and returns each file's base name
    (extension stripped) for files with extension .csv, .yml, .xml or .json.

    :param job: job-object holding the configuration (job.conf)
    :param project: not evaluated here, kept for the uniform select-interface
    :param application: not evaluated here, kept for the uniform select-interface
    :param component: not evaluated here, kept for the uniform select-interface
    :return: list of table-names
    """
    outList = []
    path = os.path.join(job.conf[B.TOPIC_PATH][B.ATTR_PATH_COMPS], "catalog", "tables")
    for p in os.listdir(path):
        # splitext is extension-length-safe: the former p[-4:] check matched
        # "json" (without dot) and p[:-4] then left a trailing dot on the
        # table-name of .json files
        table, ext = os.path.splitext(p)
        if ext not in (".csv", ".yml", ".xml", ".json"):
            continue
        outList.append(table)
    return outList
class Table(model.entity.Entity):
    """
    Table-object as part of a database.
    In contrast to a datatable it is neither a concrete table of the automation-model
    nor a concrete table of the system-model: it is an abstract super-class used for
    the relation to the database-management-system.
    """
    FIELD_ID = "tbid"
    LIST_FIELDS = [FIELD_ID, D.FIELD_NAME, B.SUBJECT_DESCRIPTION, B.SUBJECT_REFERENCE, B.SUBJECT_PROJECT]
    """ list of object-attributes """
    LIST_NODES = [B.NODE_ATTRIBUTES, "fielddef", "fieldnames"]
    LIST_SUBTABLES = []
    # ddl of the technical audit-fields which every admin- and catalog-table gets
    # (insert-/update-author, -commit, -time plus the actual-flag)
    LIST_ADMINFIELDS = {
        "insauthor": {
            "_field": "insauthor",
            "type": "str",
            "format": "varchar(128)",
            "index": "N",
            "generic": "",
            "aggregat": "",
            "key": "",
            "acceptance": "",
            "alias": "",
            "description": ""
        },
        "inscommit": {
            "_field": "inscommit",
            "type": "str",
            "format": "varchar(1024)",
            "index": "N",
            "generic": "",
            "aggregat": "",
            "key": "",
            "acceptance": "",
            "alias": "",
            "description": ""
        },
        "instime": {
            "_field": "instime",
            "type": "time",
            "format": "time",
            "index": "N",
            "generic": "",
            "aggregat": "",
            "key": "",
            "acceptance": "",
            "alias": "",
            "description": ""
        },
        "updauthor": {
            "_field": "updauthor",
            "type": "str",
            "format": "varchar(128)",
            "index": "N",
            "generic": "",
            "aggregat": "",
            "key": "",
            "acceptance": "",
            "alias": "",
            "description": ""
        },
        "updcommit": {
            "_field": "updcommit",
            "type": "str",
            "format": "varchar(1024)",
            "index": "N",
            "generic": "",
            "aggregat": "",
            "key": "",
            "acceptance": "",
            "alias": "",
            "description": ""
        },
        "updtime": {
            "_field": "updtime",
            "type": "time",
            "format": "time",
            "index": "N",
            "generic": "",
            "aggregat": "",
            "key": "",
            "acceptance": "",
            "alias": "",
            "description": ""
        },
        "actual": {
            "_field": "actual",
            "type": "int",
            "format": "int",
            "index": "N",
            "generic": "",
            "aggregat": "",
            "key": "",
            "acceptance": "",
            "alias": "",
            "description": ""
        }
    }
    # ddl of the classifying fields which every catalog-table additionally gets
    # (project / application / testcase / artifact / refdate)
    LIST_CATALOGFIELDS = {
        "project": {
            "_field": "project",
            "type": "str",
            "format": "varchar(128)",
            "index": "I",
            "generic": "",
            "aggregat": "",
            "key": "",
            "acceptance": "",
            "alias": "",
            "description": ""
        },
        "application": {
            "_field": "application",
            "type": "str",
            "format": "varchar(128)",
            "index": "I",
            "generic": "",
            "aggregat": "",
            "key": "",
            "acceptance": "",
            "alias": "",
            "description": ""
        },
        "testcase": {
            "_field": "testcase",
            "type": "str",
            "format": "varchar(128)",
            "index": "I",
            "generic": "",
            "aggregat": "",
            "key": "",
            "acceptance": "",
            "alias": "",
            "description": ""
        },
        "artifact": {
            "_field": "artifact",
            "type": "str",
            "format": "varchar(128)",
            "index": "I",
            "generic": "",
            "aggregat": "",
            "key": "",
            "acceptance": "",
            "alias": "",
            "description": "result "
        },
        "refdate": {
            "_field": "refdate",
            "type": "time",
            "format": "time",
            "index": "N",
            "generic": "",
            "aggregat": "",
            "key": "",
            "acceptance": "",
            "alias": "",
            "description": ""
        }
    }
    tbid = 0
    name = ""
    project = ""
    # NOTE(review): class-level mutable defaults are shared between instances;
    # get_schema therefore replaces them with instance-own copies before mutating
    fieldnames = []
    fielddef = {}

    def set_object(self, project, name):
        """Sets the identifying attributes of the table-object."""
        self.project = project
        self.name = name

    def get_schema(self, tableName="", tableType=""):
        """
        gets schema/ddl-informations in order to create the database

        :param tableName: name of the table to create
        :param tableType: one of TYPE_ADMIN / TYPE_CTLG - controls which extra
            field-groups (admin-/catalog-fields) are appended to the table
        :return: sql-string with create-table-, index- and subtable-statements
        """
        sql = ""
        sqlTable = ""
        sqlSub = ""
        dbi = basic.toolHandling.getDbTool(self.job, None, self.job.conf[B.TOPIC_NODE_DB][B.ATTR_TYPE])
        sqlTable += dbi.getCreateTable(tableName)
        tableId = ""
        # copy-on-write: fieldnames/fielddef are class-level defaults; mutating
        # them in place would leak the appended fields into every other instance
        self.fieldnames = list(self.fieldnames)
        self.fielddef = dict(self.fielddef)
        if tableType in [TYPE_CTLG]:
            for f in self.LIST_CATALOGFIELDS.keys():
                if f not in self.fieldnames:
                    self.fieldnames.append(f)
                    self.fielddef[f] = self.LIST_CATALOGFIELDS[f]
        if tableType in [TYPE_ADMIN, TYPE_CTLG]:
            for f in self.LIST_ADMINFIELDS.keys():
                if f not in self.fieldnames:
                    self.fieldnames.append(f)
                    self.fielddef[f] = self.LIST_ADMINFIELDS[f]
        for f in self.fieldnames:
            # technical keys like "_header" are no table-columns
            if f[0:1] == "_":
                continue
            fo = self.fielddef[f]
            if D.DDL_INDEX in fo and len(fo[D.DDL_INDEX]) > 0:
                # index-attribute format "I" -> plain index,
                # "S:<subtable>[:attr|:name:type ...]" -> separate subtable
                a = fo[D.DDL_INDEX].split(":")
                if a[0] == "I":
                    sqlSub += dbi.getSchemaIndex(tableName, fo[D.DDL_FIELD]) + "\n"
                elif a[0] == "S":
                    attrList = []
                    attr = {"attr": fo[D.DDL_FIELD], "atype": fo[D.DDL_TYPE]}
                    attrList.append(attr)
                    for i in range(2, len(a)):
                        if i % 2 == 1:
                            continue
                        if a[i] == "attr":
                            attr = {"attr": B.NODE_ATTRIBUTES, "atype": D.TYPE_TEXT}
                        elif i + 1 < len(a):
                            attr = {"attr": a[i], "atype": a[i + 1]}
                        attrList.append(attr)
                    sqlSub += dbi.getSchemaSubtable(a[1], attrList) + "\n"
                    sqlSub += dbi.getSchemaIndex(dbi.getSubtableName(a[1], fo[D.DDL_FIELD]), tableId) + "\n"
                    # a subtable-field becomes no column of the main table
                    continue
            if fo[D.DDL_TYPE] not in ["subtable"]:
                sqlTable += dbi.getSchemaAttribut(fo[D.DDL_FIELD], fo[D.DDL_TYPE]) + ","
            if fo[D.DDL_TYPE] == D.TYPE_PK:
                tableId = fo[D.DDL_FIELD]
        # strip the trailing comma of the column-list and close the statement
        sql = sqlTable[0:-1] + ");\n" + sqlSub
        """
        # print(sql)
        """
        return sql

    def read_unique_names(self, job, project="", application="", gran="", args={}, ttype: str = "") -> list:
        # table is not a real entity - there are no unique names to select
        return []

    # table is not an real entity
    def read_entity(self, job, name: str, args: dict = {}):
        """Reads the table-definition; delegates to read_ddl (table is no real entity)."""
        return self.read_ddl(job, name, args=args)

    # table is not an real entity
    def read_ddl(self, job, name, args: dict = {}):
        """
        reads the ddl of the table depending on context
        a) component: the ddl is read from specific or general component-folder
        b) testcase: the ddl is read from general component-folder
        c) testserver: the ddl is read from model-folder
        :param job:
        :param name:
        :param args: optional dict, evaluated key "context"
        :return: the object with the attributes set from the ddl-configuration
        """
        config = {}
        # initialized empty so an unknown context yields an empty config
        # instead of a NameError at the "_name"-check below
        ddl = {}
        if "context" in args:
            if args["context"] == "component":
                ddl = tools.config_tool.getConfig(job, D.DDL_FILENAME, self.component.name, name, ttype=D.CSV_SPECTYPE_DDL)
            elif args["context"] == "testdata":
                ddl = tools.config_tool.getConfig(job, D.DDL_FILENAME, args["context"], name, ttype=D.CSV_SPECTYPE_DDL)
            elif args["context"] == B.ATTR_INST_TESTSERVER:
                ddl = tools.config_tool.getConfig(job, D.DDL_FILENAME, B.ATTR_INST_TESTSERVER, name, ttype=D.CSV_SPECTYPE_DDL)
            if "_name" in ddl:
                config[ddl["_name"]] = ddl
            else:
                config = ddl
        return self.setAttributes(job, config, name, self.LIST_FIELDS, self.LIST_NODES, self.LIST_SUBTABLES)

    def select_entity(self, job, name):
        """
        reads the entity from the database
        it should get the same result like read_entity
        :param job:
        :param name:
        :return:
        """
        # formerly the result of read_entity was dropped (method returned None)
        return self.read_entity(job, name)
        # raise Exception(B.EXCEPT_NOT_IMPLEMENT)

    # def write_entity(self, job, name):
    # table is not an real entity
    # def update_entity(self, job, name):
    # table is not an real entity
    # def remove_entity(self, job, name):
    # table is not an real entity
    # def delete_entity(self, job, name):
    # table is not an real entity