Data-Test-Executer Framework speziell zum Test von Datenverarbeitungen mit Datengenerierung, Systemvorbereitungen, Einspielungen, ganzheitlicher diversifizierender Vergleich
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

173 lines
6.7 KiB

#!/usr/bin/python
# -*- coding: utf-8 -*-
# ---------------------------------------------------------------------------------------------------------
# Author : Ulrich Carmesin
# Source : gitea.ucarmesin.de
# ---------------------------------------------------------------------------------------------------------
import os
import basic.toolHandling
import tools.job_const as J
import tools.data_const as D
import basic.constants as B
import model.entity
import tools.config_tool
import tools.job_tool
import tools.path_tool
import tools.path_const as P
import model.entity
TABLE_NAMES = ["application", "ap_project", "ap_component"]
STORAGES = [model.entity.STORAGE_FILE, model.entity.STORAGE_DB]
""" used storage in priority sortage, so: if file then read_fcts / if db then select-fcts """
DEFAULT_SYNC = model.entity.SYNC_FULL_GIT2DB
FIELD_ID = "tsid"
FIELD_NAME = D.FIELD_NAME
FIELD_DESCRIPTION = B.SUBJECT_DESCRIPTION
FIELD_REFERENCE = B.SUBJECT_REFERENCE
FIELD_PROJECT = B.SUBJECT_PROJECT
LIST_FIELDS = [FIELD_ID, FIELD_NAME, FIELD_DESCRIPTION, FIELD_REFERENCE, FIELD_PROJECT]
""" list of object-attributes """
SUB_USECASE = B.SUBJECT_USECASES
SUB_TESTCASES = B.SUBJECT_TESTCASES
SUB_STEPS = "steps"
LIST_SUBTABLES = [SUB_USECASE, SUB_STEPS, SUB_TESTCASES]
LIST_SUB_DESCRIPT = [D.DATA_ATTR_USECASE_DESCR]
FILE_EXTENSION = D.DFILE_TYPE_YML
UNIQUE_FIELDS = [FIELD_NAME]
""" unique business field as human identifer """
IDENTIFYER_FIELDS = [FIELD_ID]
""" unique technical field as technical identifer """
class Testsuite(model.entity.Entity):
name = ""
description = ""
application = ""
usecase = []
testcases = {}
tables = {}
steps = []
def __init__(self, job, project, name=""):
"""
to be initialized by readSpec
:param job:
"""
self.job = job
self.project = project
if len(name) > 1:
self.name = name
def read_unique_names(self, job, project, application, gran, args):
"""
reads the entity-names from file-storage
:param job:
:param opt. project: select-criteria if used and defined
:param opt. application: select-criteria if used and defined
:param opt. gran: granularity values testcase / testsuite / testplan
:param opt. args additional args
:return: list of entity-names
"""
path = os.path.join(job.conf[B.TOPIC_PATH][B.ATTR_PATH_TDATA], getattr(job.par, B.SUBJECT_PROJECT),
B.SUBJECT_TESTSUITES)
outList = self.getDirlist(job, path, "")
return outList
def read_entity(self, job, name):
"""
reads the entity from the file-system
:param job:
:param name:
:return:
"""
# r = tools.config_tool.select_config_path(job, P.KEY_TESTCASE, "TC0001")
config = self.getConfig(job, P.KEY_TESTSUITE, name, tools.config_tool.get_plain_filename(job, name))
self.setAttributes(config, name, LIST_FIELDS, LIST_SUBTABLES)
for k in LIST_SUBTABLES:
if not hasattr(self, k):
continue
if "_"+k in config[name] and "_"+k+"-description" in LIST_SUB_DESCRIPT:
values = {}
if "_"+k+"-description" in config[name]:
for l in config[name]["_"+k]:
if l in config[name]["_"+k+"-description"]:
values[config[name]["_"+k][l]] = config[name]["_" + k + "-description"][l]
else:
values[config[name]["_"+k][l]] = ""
else:
for l in config[name]["_"+k]:
values[config[name]["_" + k][l]] = ""
setattr(self, k, values)
return self
def get_schema(self, tableName="", tableObject=None):
#TODO veraltet
2 years ago
dbtype = self.job.conf[B.TOPIC_NODE_DB][B.ATTR_TYPE]
dbi = basic.toolHandling.getDbTool(self.job, None, dbtype)
sql = dbi.getCreateTable("testsuite")
sql += dbi.getSchemaAttribut("tsid", "id")+","
sql += dbi.getSchemaAttribut("name", D.TYPE_STR)+","
sql += dbi.getSchemaAttribut(B.SUBJECT_REFERENCE, D.TYPE_TEXT)+","
sql += dbi.getSchemaAttribut("project", D.TYPE_STR)+","
sql += dbi.getSchemaAttribut("usecase", D.TYPE_STR)+","
sql += dbi.getSchemaAttribut(B.NODE_ATTRIBUTES, D.TYPE_TEXT)+","
sql += self.getHistoryFields()
sql += ");\n"
sql += self.getHistoryIndex("testsuite")
for attr in ["application", "testcase"]:
sql += dbi.getSchemaSubtable("ts", [{"attr":attr, "atype": D.TYPE_STR}])+"\n"
sql += dbi.getSchemaIndex(dbi.getIndexName("ts", attr),
dbi.getSubTableId(dbi.getSubTableName("ts", attr), attr))+"\n"
for attr in ["dtable", "step"]:
sql += dbi.getSchemaSubtable("ts", [{"attr":attr, "atype": D.TYPE_STR}, {"attr":B.NODE_ATTRIBUTES, "atype": D.TYPE_TEXT}])+"\n"
sql += dbi.getSchemaIndex(dbi.getSubTableName("ts", attr),
dbi.getSubTableId(dbi.getSubTableName("ts", attr), attr))+"\n"
return sql
def select_testsuite(job, project, testsuite):
jobProj = None
print("testsuite select: "+str(project)+" "+str(testsuite))
if hasattr(job.par, B.PAR_PROJ):
jobProj = getattr(job.par, B.PAR_PROJ)
setattr(job.par, B.PAR_PROJ, project)
path = tools.path_tool.compose_path(job, P.P_TDROOT, None)
specpath = os.path.join(path, testsuite, D.DFILE_TESTSUITE_NAME + ".csv")
spec = model.entity.read_spec(job, testsuite, J.GRAN_TS, specpath)
if jobProj is None:
delattr(job.par, B.PAR_PROJ)
else:
setattr(job.par, B.PAR_PROJ, jobProj)
return spec
def select_testsuites(job, projList, appList):
out = {}
jobProj = None
print("testsuite select: "+str(projList)+" "+str(appList))
if hasattr(job.par, B.PAR_PROJ):
jobProj = getattr(job.par, B.PAR_PROJ)
for proj in projList:
setattr(job.par, B.PAR_PROJ, proj)
path = tools.path_tool.compose_path(job, P.P_TDROOT, None)
if os.path.exists(path):
for d in os.listdir(path):
if not os.path.isdir(os.path.join(path, d)):
continue
if d[0:1] == "_":
continue
print(d)
specpath = os.path.join(path, d, D.DFILE_TESTSUITE_NAME + ".csv")
spec = model.entity.read_spec(job, d, J.GRAN_TS, specpath)
if spec is None:
continue
out[d] = spec
out[d][B.SUBJECT_PROJECTS] = [proj]
if jobProj is None:
delattr(job.par, B.PAR_PROJ)
else:
setattr(job.par, B.PAR_PROJ, jobProj)
return out