Data-Test-Executer Framework speziell zum Test von Datenverarbeitungen mit Datengenerierung, Systemvorbereitungen, Einspielungen, ganzheitlicher diversifizierender Vergleich
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

245 lines
9.2 KiB

# ---------------------------------------------------------------------------------------------------------
# Author : Ulrich Carmesin
# Source : gitea.ucarmesin.de
# ---------------------------------------------------------------------------------------------------------
print("is importing module.app")
import os
import basic.program
import basic.toolHandling
import basic.constants as B
import model.entity
import model.constants as M
import tools.data_const as D
import tools.path_const as P
import tools.config_tool
import tools.file_tool
import tools.git_tool
TABLE_NAMES = ["application", "ap_project", "ap_component"]
STORAGES = [ M.STORAGE_FILE, M.STORAGE_DB ]
""" used storage in priority sortage, so: if file then read_fcts / if db then select-fcts """
DEFAULT_SYNC = M.SYNC_FULL_GIT2DB
TABLE_NAME = B.SUBJECT_APP
""" system-name for this entity """
FIELD_ID = "apid"
FILE_EXTENSION = D.DFILE_TYPE_YML
UNIQUE_FIELDS = [D.FIELD_NAME]
""" unique business field as human identifer """
IDENTIFYER_FIELDS = [FIELD_ID]
""" unique technical field as technical identifer """
print("has imported module.app")
def searchProjects(job, appl):
"""
search all relevant projects from server-configuration
filtered by parameter --application , --project
:param job:
:return:
"""
projects = {}
if B.SUBJECT_PROJECTS in job.conf:
for k in job.conf[B.SUBJECT_PROJECTS]:
if k in B.LIST_SUBJECTS:
continue
if hasattr(job.par, B.PAR_PROJ) and k != getattr(job.par, B.PAR_PROJ):
continue
if hasattr(job.par, B.PAR_APP) \
and k not in appl[B.SUBJECT_APPS][getattr(job.par, B.PAR_APP)][B.SUBJECT_PROJECTS]:
continue
projects[k] = appl[B.SUBJECT_PROJECTS][k]
projects[k][B.SUBJECT_ENVIRONMENT] = []
else:
job.conf[B.SUBJECT_PROJECTS] = appl[B.SUBJECT_PROJECTS]
return projects
def select_applications(job, projectList):
"""
get all project which are configured for the workspace
with all environments where the application of the project are installed
:param job:
:return:
"""
appl = tools.config_tool.getConfig(job, P.KEY_BASIC, B.SUBJECT_APPS)
return searchApplications(job, projectList, appl)
def searchApplications(job, projectList, appl):
appList = {}
for proj in projectList:
if hasattr(job, "par") and hasattr(job.par, B.PAR_PROJ) and proj != getattr(job.par, B.PAR_PROJ):
continue
for app in appl[B.SUBJECT_APPS]:
if B.SUBJECT_PROJECT in appl[B.SUBJECT_APPS][app] and proj != appl[B.SUBJECT_APPS][app][B.SUBJECT_PROJECT]:
continue
appList[app] = appl[B.SUBJECT_APPS][app]
return appList
import model.entity
def syncEnitities(job):
"""
synchronize the configuration with the database
:param job:
:return:
"""
syncMethod = DEFAULT_SYNC
if syncMethod.count("-") < 2:
return
fileTime = model.entity.VAL_ZERO_TIME
dbTime = model.entity.VAL_ZERO_TIME
# get git-commit
if "git" in syncMethod:
apppath = tools.config_tool.select_config_path(job, P.KEY_BASIC, B.SUBJECT_APPS, "")
repopath = apppath[len(job.conf[B.TOPIC_PATH][B.ATTR_PATH_COMPS]) + 1:]
gitresult = tools.git_tool.gitLog(job, B.ATTR_PATH_COMPS, repopath, 1)
fileTime = gitresult[0]["date"]
print(str(gitresult))
if "db" in syncMethod:
if B.TOPIC_NODE_DB in job.conf:
dbi = basic.toolHandling.getDbTool(job, job.testserver, job.conf[B.TOPIC_NODE_DB][B.ATTR_TYPE])
else:
return "No DB in job-config"
data = dbi.selectRows(TABLE_NAMES[0], job)
print(str(data[B.DATA_NODE_DATA]))
if len(data[B.DATA_NODE_DATA]) > 0:
dbTime = data[B.DATA_NODE_DATA][0]["updtime"]
if fileTime == dbTime:
print("gleich")
elif fileTime < dbTime:
print("db vorne")
(appObjects, appDict) = selectEntities(job, dbi)
print(str(appDict))
applPath = tools.config_tool.select_config_path(job, P.KEY_BASIC, B.SUBJECT_APPS)
tools.file_tool.write_file_dict(job.m, job, applPath, appDict)
#
elif fileTime > dbTime:
print("git vorne")
applData = tools.config_tool.getConfig(job, P.KEY_BASIC, B.SUBJECT_APPS)
insertEntities(job, applData, dbTime, dbi)
def selectEntities(job, dbi):
appObjects = []
appDict = {}
appDict[B.SUBJECT_PROJECTS] = {}
appDict[B.SUBJECT_APPS] = {}
appData = dbi.selectRows(TABLE_NAMES[0], job)
projData = dbi.selectRows(TABLE_NAMES[1], job)
compData = dbi.selectRows(TABLE_NAMES[2], job)
for row in appData[B.DATA_NODE_DATA]:
ao = Application(job)
ao.setAppRow(row, "")
appDict[B.SUBJECT_APPS][ao.name] = {}
for f in job.testserver.conf[B.DATA_NODE_DDL][TABLE_NAMES[0]][B.DATA_NODE_HEADER]:
if f in model.entity.ENTITY_FIELDS:
continue
appDict[B.SUBJECT_APPS][ao.name][f] = getattr(ao, f)
apid = ao.apid
rows = [row for row in projData[B.DATA_NODE_DATA] if row["apid"] == apid]
ao.setProjRow(rows)
appDict[B.SUBJECT_APPS][ao.name][B.SUBJECT_PROJECTS] = []
for proj in getattr(ao, B.PAR_PROJ):
appDict[B.SUBJECT_APPS][ao.name][B.SUBJECT_PROJECTS].append(proj)
if proj in appDict[B.SUBJECT_PROJECTS]:
appDict[B.SUBJECT_PROJECTS][proj][B.SUBJECT_APPS].append(ao.name)
continue
appDict[B.SUBJECT_PROJECTS][proj] = {}
appDict[B.SUBJECT_PROJECTS][proj][B.SUBJECT_APPS] = []
appDict[B.SUBJECT_PROJECTS][proj][B.SUBJECT_APPS].append(ao.name)
aoproj = getattr(ao, "project")[proj]
for f in job.testserver.conf[B.DATA_NODE_DDL][TABLE_NAMES[1]][B.DATA_NODE_HEADER]:
if f in model.entity.ENTITY_FIELDS + ["approid", "apid"]:
continue
appDict[B.SUBJECT_PROJECTS][proj][f] = aoproj[f]
rows = [row for row in compData[B.DATA_NODE_DATA] if row["apid"] == apid]
ao.setCompRow(rows)
appDict[B.SUBJECT_APPS][ao.name][B.SUBJECT_COMPS] = []
for comp in getattr(ao, B.PAR_COMP):
appDict[B.SUBJECT_APPS][ao.name][B.SUBJECT_COMPS].append(comp)
appObjects.append(ao)
return appObjects, appDict
def insertEntities(job,applData, dbTime, dbi):
# insertRows
# get list of application
if dbTime != model.entity.VAL_ZERO_TIME:
for t in TABLE_NAMES:
dbi.deleteRows(job, t)
for app in applData[B.SUBJECT_APPS]:
ao = Application(job)
ao.read_entity(job, app)
ao.insertEntity(dbi)
class Application(model.entity.Entity):
""" table = "application"
job = None
name = ""
description = ""
reference = ""
components = {}
project = {}
"""
FIELD_ID = "apid"
LIST_FIELDS = [FIELD_ID, D.FIELD_NAME, B.SUBJECT_DESCRIPTION, B.SUBJECT_REFERENCE, B.SUBJECT_PROJECT]
""" list of object-attributes """
LIST_NODES = [B.NODE_ATTRIBUTES]
LIST_SUBTABLES = [B.SUBJECT_APPS, B.SUBJECT_COMPS, B.SUBJECT_USECASES, B.SUBJECT_VARIANTS]
PREFIX_SUBTABLE = "ap"
def read_unique_names(self, job, project, application, gran, args):
"""
reads the entity-names from file-storage
:param job:
:param opt. project: select-criteria if used and defined
:param opt. application: select-criteria if used and defined
:param opt. gran: granularity values testcase / testsuite / testplan
:param opt. args additional args
:return: list of entity-names
"""
config = self.getConfig(job, P.KEY_BASIC, B.SUBJECT_APPS,
tools.config_tool.get_plain_filename(job, ""), ttype=B.SUBJECT_APP)
outList = list(config[B.SUBJECT_APPS].keys())
return outList
def read_entity(self, job, name):
"""
reads the entity from the file-system
:param job:
:param name:
:return:
"""
config = self.getConfig(job, P.KEY_BASIC, B.SUBJECT_APPS,
tools.config_tool.get_plain_filename(job, name), ttype=B.SUBJECT_APP)
return self.setAttributes(job, config, name, self.LIST_FIELDS, self.LIST_NODES, self.LIST_SUBTABLES)
@staticmethod
def rebuild_data(job, data: dict) -> dict:
"""
gets the subtable-tag from filecsv and sets the subtables in order to workable entity-elements
:param job:
:param data:
:return:
"""
data = tools.file_type.popSubjectsNode(job, data)
data = tools.file_type.popNameNode(job, data)
return data
def check_data(self, job, data: dict) -> dict:
"""
it checks the data for the specific form
:param job:
:param tdata:
:param ttype:
:return:
"""
import tools.file_type
checkNodes = {}
checkNodes[tools.file_type.MUST_NODES] = []
checkNodes[tools.file_type.MUSTNT_NODES] = [B.DATA_NODE_DATA, B.DATA_NODE_HEADER, B.DATA_NODE_FIELDS, B.DATA_NODE_KEYS] + B.LIST_SUBJECTS
checkNodes[tools.file_type.OPT_NODES] = []
return tools.file_type.check_nodes(job, data, checkNodes)