Browse Source

2024-05-08 pep-warnings

refactor
Ulrich 7 months ago
parent
commit
2f49df27d4
  1. 6
      test/test_04config.py
  2. 45
      test/test_92workspace.py
  3. 195
      tools/config_tool.py
  4. 36
      tools/make_tool.py
  5. 3
      tools/path_tool.py

6
test/test_04config.py

@ -61,6 +61,9 @@ class MyTestCase(unittest.TestCase):
r = tools.config_tool.getConfig(job, P.KEY_TOOL, "path", ttype=D.CSV_SPECTYPE_KEYS) r = tools.config_tool.getConfig(job, P.KEY_TOOL, "path", ttype=D.CSV_SPECTYPE_KEYS)
if verbose: print("pattern " + r["pattern"]["log"]) if verbose: print("pattern " + r["pattern"]["log"])
if verbose: print("pattern " + r["pattern"]["precond"]) if verbose: print("pattern " + r["pattern"]["precond"])
r = tools.config_tool.getConfig(job, P.KEY_BASIC, "servers", ttype=D.CSV_SPECTYPE_KEYS)
if verbose: print("pattern " + r["pattern"]["log"])
if verbose: print("pattern " + r["pattern"]["precond"])
MyTestCase.mymsg += "\n----- "+actfunction+" : "+str(cnttest) MyTestCase.mymsg += "\n----- "+actfunction+" : "+str(cnttest)
@ -127,6 +130,9 @@ class MyTestCase(unittest.TestCase):
plain = tools.config_tool.get_plain_filename(job, os.path.join(job.conf[B.TOPIC_PATH][B.ATTR_PATH_PROGRAM], plain = tools.config_tool.get_plain_filename(job, os.path.join(job.conf[B.TOPIC_PATH][B.ATTR_PATH_PROGRAM],
P.ATTR_PATH_TDATA, "testfall." + ext)) P.ATTR_PATH_TDATA, "testfall." + ext))
self.assertEqual(plain, "testfall") self.assertEqual(plain, "testfall")
MyTestCase.mymsg += "\n----- " + actfunction + " : " + str(cnttest)
def test_zzz(self): def test_zzz(self):
print(MyTestCase.mymsg) print(MyTestCase.mymsg)

45
test/test_92workspace.py

@ -0,0 +1,45 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# ---------------------------------------------------------------------------------------------------------
# Author : Ulrich Carmesin
# Source : gitea.ucarmesin.de
# https://ucarmesin.de/index.php/it/testautomatisierung-fuer-daten-test/225-konfiguration-der-testanwendung
# ---------------------------------------------------------------------------------------------------------
import sys
import basic.constants as B
import unittest
import os
import inspect
import tools.make_tool
import tools.conn_tool
import test.constants as T
import test.testtools
import tools.path_const as P
import basic.constants as B
import tools.data_const as D
TEST_FUNCTIONS = ["test_01createWorkspaces"]
#TEST_FUNCTIONS = ["test_01getConfigPath"]
verbose = False
class MyTestCase(unittest.TestCase):
mymsg = "--------------------------------------------------------------"
def test_01createWorkspaces(self):
global mymsg
actfunction = str(inspect.currentframe().f_code.co_name)
cnttest = 0
if actfunction not in TEST_FUNCTIONS:
return
job = test.testtools.getJob()
tools.make_tool.createWorkspaces(job)
MyTestCase.mymsg += "\n----- "+actfunction+" : "+str(cnttest)
def test_zzz(self):
print(MyTestCase.mymsg)
if __name__ == '__main__':
verbose = True
unittest.main()

195
tools/config_tool.py

@ -26,24 +26,28 @@ import tools.path_const as P
TABLE_FILES = [D.DDL_FILENAME] TABLE_FILES = [D.DDL_FILENAME]
CONFIG_FORMAT = [D.DFILE_TYPE_YML, D.DFILE_TYPE_JSON, D.DFILE_TYPE_CSV] CONFIG_FORMAT = [D.DFILE_TYPE_YML, D.DFILE_TYPE_JSON, D.DFILE_TYPE_CSV]
def getExistingPath(job, pathnames): def getExistingPath(job, pathnames):
if isinstance(pathnames, str): if isinstance(pathnames, str):
pathnames = [pathnames] pathnames = [pathnames]
for p in pathnames: for p in pathnames:
if p[-1:] == ".": if p[-1:] == ".":
p = p[0:-1] p = p[0:-1]
for format in CONFIG_FORMAT: for f in CONFIG_FORMAT:
pathname = p+"."+format pathname = p + "." + f
if os.path.exists(pathname): if os.path.exists(pathname):
return pathname return pathname
return None return None
def select_config_path(job, modul, name, subname=""):
def select_config_path(job: any, modul: str, name: str, subname="") -> str:
""" """
gets the most specified configuration of different sources gets the most specified configuration of different sources
Parameter: :param job:
* typ -- (basic, comp, tool) :param modul:
* name -- the specific class :param name:
:param subname:
:return:
sources: sources:
* programm << * programm <<
* install << * install <<
@ -58,23 +62,19 @@ def select_config_path(job, modul, name, subname=""):
verify = False # job = basic.program.Job.getInstance() verify = False # job = basic.program.Job.getInstance()
else: else:
verify = job.getDebugLevel("config_tool") verify = job.getDebugLevel("config_tool")
if verify: job.debug(verify, "getConfig " + modul + ", " + name)
if modul == P.KEY_TOOL: if modul == P.KEY_TOOL:
return getToolPath(job, name, subname) return getToolPath(job, name, subname)
elif modul == P.KEY_COMP: elif modul == P.KEY_COMP:
for format in CONFIG_FORMAT: for f in CONFIG_FORMAT:
pathname = os.path.join(job.conf[B.TOPIC_PATH][P.ATTR_PATH_HOME], pathname = os.path.join(job.conf[B.TOPIC_PATH][P.ATTR_PATH_HOME],
P.VAL_CONFIG, P.KEY_COMP +"_" + name + "." + format) P.VAL_CONFIG, P.KEY_COMP + "_" + name + "." + f)
if verify: job.debug(verify, "4 " + pathname)
if os.path.exists(pathname): if os.path.exists(pathname):
return pathname return str(pathname)
for format in CONFIG_FORMAT: for f in CONFIG_FORMAT:
pathname = os.path.join(job.conf[B.TOPIC_PATH][P.ATTR_PATH_COMPONENTS], pathname = os.path.join(job.conf[B.TOPIC_PATH][P.ATTR_PATH_COMPONENTS],
basic.componentHandling.getComponentFolder(name), "CONFIG." + format) basic.componentHandling.getComponentFolder(name), "CONFIG." + f)
if verify: job.debug(verify, "5 " + pathname)
if os.path.exists(pathname): if os.path.exists(pathname):
return pathname return str(pathname)
if verify: job.debug(verify, "6 " + pathname)
raise Exception(P.EXP_CONFIG_MISSING, modul + ", " + name) raise Exception(P.EXP_CONFIG_MISSING, modul + ", " + name)
elif modul in TABLE_FILES: elif modul in TABLE_FILES:
return getTablePath(job, name, subname, modul) return getTablePath(job, name, subname, modul)
@ -94,36 +94,32 @@ def select_config_path(job, modul, name, subname=""):
return getModelPath(job, name) return getModelPath(job, name)
else: else:
pathname = tools.path_tool.compose_path(job, P.P_TCPARFILE) pathname = tools.path_tool.compose_path(job, P.P_TCPARFILE)
if verify: job.debug(verify, "7 " + pathname)
if os.path.exists(pathname): if os.path.exists(pathname):
return pathname return pathname
pathname = tools.path_tool.compose_path(job, P.P_TSPARFILE) pathname = tools.path_tool.compose_path(job, P.P_TSPARFILE)
if verify: job.debug(verify, "8 " + pathname)
if os.path.exists(pathname): if os.path.exists(pathname):
return pathname return pathname
for format in CONFIG_FORMAT: for f in CONFIG_FORMAT:
if len(subname) > 1: if len(subname) > 1:
pathname = os.path.join(job.conf[B.TOPIC_PATH][P.ATTR_PATH_RELEASE], pathname = os.path.join(job.conf[B.TOPIC_PATH][P.ATTR_PATH_RELEASE],
P.VAL_CONFIG, "basis." + format) P.VAL_CONFIG, "basis." + f)
if verify: job.debug(verify, "9 " + pathname)
if os.path.exists(pathname): if os.path.exists(pathname):
return pathname return str(pathname)
for format in CONFIG_FORMAT: for f in CONFIG_FORMAT:
if len(subname) > 1: if len(subname) > 1:
pathname = os.path.join(job.conf[B.TOPIC_PATH][P.ATTR_PATH_ENV], pathname = os.path.join(job.conf[B.TOPIC_PATH][P.ATTR_PATH_ENV],
P.VAL_CONFIG, "basis." + format) P.VAL_CONFIG, "basis." + f)
if verify: job.debug(verify, "9 " + pathname)
if os.path.exists(pathname): if os.path.exists(pathname):
return pathname return str(pathname)
for format in CONFIG_FORMAT: for f in CONFIG_FORMAT:
if len(subname) > 1: if len(subname) > 1:
pathname = os.path.join(job.conf[B.TOPIC_PATH][P.ATTR_PATH_HOME], pathname = os.path.join(job.conf[B.TOPIC_PATH][P.ATTR_PATH_HOME],
P.VAL_CONFIG, "basis." + format) P.VAL_CONFIG, "basis." + f)
if verify: job.debug(verify, "9 " + pathname)
if os.path.exists(pathname): if os.path.exists(pathname):
return pathname return str(pathname)
raise Exception(P.EXP_CONFIG_MISSING, modul + ", " + name) raise Exception(P.EXP_CONFIG_MISSING, modul + ", " + name)
def getToolPath(job, name, subname): def getToolPath(job, name, subname):
if subname != "": if subname != "":
envdir = subname envdir = subname
@ -149,7 +145,8 @@ def getToolPath(job, name, subname):
return path return path
raise Exception(P.EXP_CONFIG_MISSING, envdir + ", " + name) raise Exception(P.EXP_CONFIG_MISSING, envdir + ", " + name)
def getTablePath(job, name, subname, filename):
def getTablePath(job: any, name: str, subname: str, filename: str) -> str:
""" """
reads the ddl of the table depending on context (=name) reads the ddl of the table depending on context (=name)
a) component: the ddl is read from specific or general component-folder a) component: the ddl is read from specific or general component-folder
@ -184,51 +181,73 @@ def getTablePath(job, name, subname, filename):
return configpath return configpath
if name == B.ATTR_INST_TESTSERVER: if name == B.ATTR_INST_TESTSERVER:
# print(name) # print(name)
for format in CONFIG_FORMAT: for f in CONFIG_FORMAT:
pathname = os.path.join(job.conf[B.TOPIC_PATH][P.ATTR_PATH_PROGRAM], pathname = os.path.join(job.conf[B.TOPIC_PATH][P.ATTR_PATH_PROGRAM],
P.ATTR_PATH_MODEL, subname + "." + format) P.ATTR_PATH_MODEL, subname + "." + f)
#print(pathname)
if os.path.exists(pathname): if os.path.exists(pathname):
return pathname return str(pathname)
for f in CONFIG_FORMAT:
for format in CONFIG_FORMAT:
pathname = os.path.join(job.conf[B.TOPIC_PATH][P.ATTR_PATH_COMPONENTS], pathname = os.path.join(job.conf[B.TOPIC_PATH][P.ATTR_PATH_COMPONENTS],
basic.componentHandling.getComponentFolder(name), filename + "." + format) basic.componentHandling.getComponentFolder(name), filename + "." + f)
#print(pathname)
if os.path.exists(pathname): if os.path.exists(pathname):
return pathname return str(pathname)
for format in CONFIG_FORMAT: for f in CONFIG_FORMAT:
if len(subname) > 1: if len(subname) > 1:
pathname = os.path.join(job.conf[B.TOPIC_PATH][P.ATTR_PATH_COMPONENTS], pathname = os.path.join(job.conf[B.TOPIC_PATH][P.ATTR_PATH_COMPONENTS],
basic.componentHandling.getComponentFolder(name), subname + "." + format) basic.componentHandling.getComponentFolder(name), subname + "." + f)
if os.path.exists(pathname): if os.path.exists(pathname):
return pathname return str(pathname)
raise Exception(P.EXP_CONFIG_MISSING, filename + ", " + name) raise Exception(P.EXP_CONFIG_MISSING, filename + ", " + name)
def getBasicPath(job, name):
path = getExistingPath(job, os.path.join(job.conf[B.TOPIC_PATH][P.ATTR_PATH_COMPONENTS], def getBasicPath(job: any, name: str) -> str:
"""
the function searchs the config-file in the basic-context.
:param job:
:param name:
:return:
"""
path = getExistingPath(job, os.path.join(job.conf[B.TOPIC_PATH][P.ATTR_PATH_HOME],
P.VAL_CONFIG, name)) P.VAL_CONFIG, name))
if path is not None: if path is not None:
return path return path
path = getExistingPath(job, os.path.join(job.conf[B.TOPIC_PATH][P.ATTR_PATH_HOME], path = getExistingPath(job, os.path.join(job.conf[B.TOPIC_PATH][P.ATTR_PATH_COMPONENTS],
P.VAL_CONFIG, name)) P.VAL_CONFIG, name))
if path is not None: if path is not None:
return path return path
raise Exception(P.EXP_CONFIG_MISSING, name)
def getEnvironmentPath(job, name):
def getEnvironmentPath(job: any, name: str) -> str:
"""
the function searchs the config-file in the environment-context.
:param job:
:param name:
:return:
"""
path = getExistingPath(job, os.path.join(job.conf[B.TOPIC_PATH][P.ATTR_PATH_ENV], name, path = getExistingPath(job, os.path.join(job.conf[B.TOPIC_PATH][P.ATTR_PATH_ENV], name,
P.VAL_CONFIG, B.SUBJECT_ENVIRONMENT)) P.VAL_CONFIG, B.SUBJECT_ENVIRONMENT))
if path is not None: if path is not None:
return path return path
raise Exception(P.EXP_CONFIG_MISSING, name)
def getTestPath(job, name, subdir, filename): def getTestPath(job: any, name: str, subdir: str, filename: str) -> str:
for format in CONFIG_FORMAT: for f in CONFIG_FORMAT:
pathname = os.path.join(job.conf[B.TOPIC_PATH][P.ATTR_PATH_TDATA], job.par.project, pathname = os.path.join(job.conf[B.TOPIC_PATH][P.ATTR_PATH_TDATA], job.par.project,
subdir, name, filename + "." + format) subdir, name, filename + "." + f)
if os.path.exists(pathname): if os.path.exists(pathname):
return pathname return str(pathname)
raise Exception(P.EXP_CONFIG_MISSING, name)
def getCatalogPath(job, name): def getCatalogPath(job: any, name: str) -> str:
"""
the function searchs the config-file in the catalog-context.
:param job:
:param name:
:return:
"""
if hasattr(job, "par") and hasattr(job.par, B.SUBJECT_PROJECT): if hasattr(job, "par") and hasattr(job.par, B.SUBJECT_PROJECT):
path = getExistingPath(job, os.path.join(job.conf[B.TOPIC_PATH][P.ATTR_PATH_TDATA], path = getExistingPath(job, os.path.join(job.conf[B.TOPIC_PATH][P.ATTR_PATH_TDATA],
getattr(job.par, B.SUBJECT_PROJECT), P.KEY_CATALOG, name)) getattr(job.par, B.SUBJECT_PROJECT), P.KEY_CATALOG, name))
@ -253,20 +272,34 @@ def getCatalogPath(job, name):
return path return path
raise Exception(P.EXP_CONFIG_MISSING, name) raise Exception(P.EXP_CONFIG_MISSING, name)
def getUserPath(job, name):
def getUserPath(job: any, name: str) -> str:
"""
the function searchs the config-file in the user-context.
:param job:
:param name:
:return:
"""
for ext in CONFIG_FORMAT: for ext in CONFIG_FORMAT:
pathname = os.path.join(job.conf[B.TOPIC_PATH][P.ATTR_PATH_HOME], P.VAL_CONFIG, pathname = os.path.join(job.conf[B.TOPIC_PATH][P.ATTR_PATH_HOME], P.VAL_CONFIG,
P.VAL_USER, name + "." + ext) P.VAL_USER, name + "." + ext)
if os.path.exists(pathname): if os.path.exists(pathname):
return pathname return str(pathname)
return None raise Exception(P.EXP_CONFIG_MISSING, name)
def getModelPath(job, name): def getModelPath(job: any, name: str) -> str:
"""
the function searchs the config-file in the model-context.
:param job:
:param name:
:return:
"""
for ext in CONFIG_FORMAT: for ext in CONFIG_FORMAT:
pathname = os.path.join(job.conf[B.TOPIC_PATH][P.ATTR_PATH_PROGRAM], P.ATTR_PATH_MODEL, name + "." + ext) pathname = os.path.join(job.conf[B.TOPIC_PATH][P.ATTR_PATH_PROGRAM], P.ATTR_PATH_MODEL, name + "." + ext)
if os.path.exists(pathname): if os.path.exists(pathname):
return pathname return str(pathname)
return None raise Exception(P.EXP_CONFIG_MISSING, name)
def get_plain_filename(job, path): def get_plain_filename(job, path):
@ -284,8 +317,9 @@ def get_plain_filename(job, path):
return filename return filename
return filename return filename
def getConfValue(attribute, comp):
if attribute == B.ATTR_CONN_DBTYPE: def getConfValue(attribute: str, comp: any) -> str:
if attribute == B.ATTR_TYPE:
if not hasAttr(comp.conf[B.TOPIC_CONN], "dbtype"): if not hasAttr(comp.conf[B.TOPIC_CONN], "dbtype"):
if hasAttr(comp.conf[B.TOPIC_CONN], "types") and hasAttr(comp.conf[B.TOPIC_CONN]["types"], "dbtype"): if hasAttr(comp.conf[B.TOPIC_CONN], "types") and hasAttr(comp.conf[B.TOPIC_CONN]["types"], "dbtype"):
dbtype = comp.conf[B.TOPIC_CONN]["types"]["dbtype"] dbtype = comp.conf[B.TOPIC_CONN]["types"]["dbtype"]
@ -298,10 +332,10 @@ def getConfValue(attribute, comp):
def getAttr(o, name): def getAttr(o, name):
if (isinstance(o, dict)): if isinstance(o, dict):
if (name in o.keys()): if name in o.keys():
return o[name] return o[name]
elif (isinstance(o, list)): elif isinstance(o, list):
pass pass
elif hasattr(o, name): elif hasattr(o, name):
return getattr(o, name) return getattr(o, name)
@ -309,13 +343,11 @@ def getAttr(o, name):
def hasAttr(o, name): def hasAttr(o, name):
if (isinstance(o, dict)): if isinstance(o, dict):
if (name in o.keys()): if name in o.keys():
return True return True
elif (isinstance(o, list)): elif isinstance(o, list):
pass pass
elif hasattr(o, name): elif hasattr(o, name):
return True return True
return False return False
@ -338,11 +370,9 @@ def getConfig(job, modul: str, name: str, subname: str = "", ttype: str = D.CSV_
msg = None msg = None
if hasattr(job, "m"): msg = job.m if hasattr(job, "m"): msg = job.m
pathname = select_config_path(job, modul, name, subname) pathname = select_config_path(job, modul, name, subname)
# print("+++ " + str(pathname))
confs = {} confs = {}
if pathname is None: if pathname is None:
return None return {}
job.debug(verify, "getConfig " + pathname)
if len(pathname) < 1: if len(pathname) < 1:
return confs return confs
if ttype == "" and modul in ["tool", "comp"]: if ttype == "" and modul in ["tool", "comp"]:
@ -404,6 +434,7 @@ def getAttributeList(comp, path, job):
attrList[attr] = val attrList[attr] = val
return attrList return attrList
def mergeConn(msg, conf, conn): def mergeConn(msg, conf, conn):
""" """
merges the config-attributes from the connection-attributes merges the config-attributes from the connection-attributes
@ -422,29 +453,31 @@ def mergeConn(msg, conf, conn):
if topic not in conf[B.SUBJECT_ARTIFACTS]: if topic not in conf[B.SUBJECT_ARTIFACTS]:
continue continue
if topic == B.TOPIC_NODE_DB: if topic == B.TOPIC_NODE_DB:
list = B.LIST_DB_ATTR attrlist = B.LIST_DB_ATTR
if topic == B.TOPIC_NODE_CLI: elif topic == B.TOPIC_NODE_CLI:
list = B.LIST_CLI_ATTR attrlist = B.LIST_CLI_ATTR
if topic == B.TOPIC_NODE_API: elif topic == B.TOPIC_NODE_API:
list = B.LIST_API_ATTR attrlist = B.LIST_API_ATTR
if topic == B.TOPIC_NODE_FILE: elif topic == B.TOPIC_NODE_FILE:
list = B.LIST_FILE_ATTR attrlist = B.LIST_FILE_ATTR
else:
attrlist = []
for a in conf[B.SUBJECT_ARTIFACTS][topic]: for a in conf[B.SUBJECT_ARTIFACTS][topic]:
if topic not in conn: if topic not in conn:
continue continue
if a in list: if a in attrlist:
if a in conn[topic]: if a in conn[topic]:
conf[B.SUBJECT_ARTIFACTS][topic][a] = conn[topic][a] conf[B.SUBJECT_ARTIFACTS][topic][a] = conn[topic][a]
else: else:
for b in conf[B.SUBJECT_ARTIFACTS][topic][a]: for b in conf[B.SUBJECT_ARTIFACTS][topic][a]:
if b not in list: if b not in attrlist:
msg.logError("not-topic-attribute in topic-connection: " + topic + ", " + b) msg.logError("not-topic-attribute in topic-connection: " + topic + ", " + b)
continue continue
if a not in conn[topic]: if a not in conn[topic]:
continue continue
if b in conn[topic][a]: if b in conn[topic][a]:
conf[B.SUBJECT_ARTIFACTS][topic][a][b] = conn[topic][a][b] conf[B.SUBJECT_ARTIFACTS][topic][a][b] = conn[topic][a][b]
for a in list: for a in attrlist:
if topic not in conn: if topic not in conn:
break break
if topic not in conn: if topic not in conn:

36
tools/make_tool.py

@ -0,0 +1,36 @@
# ---------------------------------------
# functions for managing diverse repositories
# namescheme:
# server/project_element.git
# project_element_server/.git
import os
import tools.config_tool
import basic.constants as B
import tools.path_const as P
import tools.data_const as D
def createWorkspaces(job: any, project=""):
projects = tools.config_tool.getConfig(job, P.KEY_BASIC, B.SUBJECT_PROJECTS, ttype=D.CSV_SPECTYPE_KEYS)
servers = tools.config_tool.getConfig(job, P.KEY_BASIC, "servers", ttype=D.CSV_SPECTYPE_KEYS)
repositories = tools.config_tool.getConfig(job, P.KEY_BASIC, "repositories", ttype=D.CSV_SPECTYPE_KEYS)
home = job.conf[B.TOPIC_PATH][B.ATTR_PATH_HOME]
pp = []
if project == "":
for p in projects[B.SUBJECT_PROJECTS]:
pp.append(projects[B.SUBJECT_PROJECTS][p]["short"])
else:
pp.append(project)
sp = str(os.path.sep)
a = home.split(sp)
while a[-1] != "workspace":
a.pop()
print(str(a))
workroot = sp.join(a)
print(str(servers))
for s in servers["servers"]:
print("s: "+str(servers["servers"][s]))
for p in pp:
workpath = os.path.join(workroot, p+"_workspace_"+servers["servers"][s]["name"])
print(workpath)

3
tools/path_tool.py

@ -54,8 +54,7 @@ def getActualJsonPath(job):
return path return path
def compose_path(job: any, pathname: str, comp=None) -> str:
def compose_path(job, pathname, comp):
""" """
this function composes a concrete path by the structured pathname this function composes a concrete path by the structured pathname
- the key of pathname is declared in path_const and the structure is configurated in config/value.yml. - the key of pathname is declared in path_const and the structure is configurated in config/value.yml.

Loading…
Cancel
Save