Compare commits

...

7 Commits

  1. 10
      .gitignore
  2. 54
      basic/catalog.py
  3. 41
      basic/compexec.py
  4. 2
      basic/componentHandling.py
  5. 47
      basic/constants.py
  6. 19
      basic/step.py
  7. 27
      basic/toolHandling.py
  8. 23
      init_testcase.py
  9. 2
      test/test_02css.py
  10. 42
      test/test_04config.py
  11. 40
      test/test_07catalog.py
  12. 7
      test/test_08i18n.py
  13. 94
      test/test_24gen.py
  14. 12
      test/testtools.py
  15. 3
      utils/api_abstract.py
  16. 4
      utils/cli_abstract.py
  17. 88
      utils/config_tool.py
  18. 50
      utils/data_const.py
  19. 37
      utils/db_abstract.py
  20. 10
      utils/dbcsv_tool.py
  21. 11
      utils/dbmysql_tool.py
  22. 11
      utils/dbrel_tool.py
  23. 8
      utils/dbsfile_tool.py
  24. 9
      utils/dbshive_tool.py
  25. 3
      utils/dbspark_tool.py
  26. 3
      utils/file_abstract.py
  27. 19
      utils/filejson_tool.py
  28. 19
      utils/filelog_tool.py
  29. 19
      utils/filexml_tool.py
  30. 23
      utils/gen_const.py
  31. 130
      utils/gen_tool.py
  32. 22
      utils/i18n_tool.py
  33. 1
      utils/path_const.py
  34. 24
      utils/path_tool.py
  35. 703
      utils/tdata_tool.py

10
.gitignore

@ -0,0 +1,10 @@
**/__pycache__
components
test/lauf
test/log
test/tdata
test/environment
test/conf
.idea
config

54
basic/catalog.py

@ -9,7 +9,9 @@ import basic.program
import basic.constants as B
import utils.path_const as P
import utils.data_const as D
import utils.config_tool
import utils.path_tool
import utils.file_tool
import utils.tdata_tool
EXP_KEY_MISSING = "key is missing {}"
@ -37,40 +39,66 @@ class Catalog:
return Catalog.__instance
def getValue(self, domain, key):
def getValue(self, domain, key, job):
"""
this function gets the value of the domain and key
:param domain:
:param key:
:return:
"""
job = basic.program.Job.getInstance()
if not (isinstance(domain, str) and len(domain)):
raise Exception(EXP_KEY_MISSING, (domain, key))
if not (isinstance(key, str) and len(key)):
if not (isinstance(domain, str) or len(domain) < 1):
raise Exception(EXP_KEY_MISSING, (domain, key))
if not (isinstance(key, str) or len(key) < 1):
job.m.setError(EXP_KEY_MISSING+" ("+domain+", "+key+")")
return ""
if domain not in self.catalog:
self.readDomain(domain)
if key not in self.catalog[domain][key]:
raise Exception(EXP_KEY_DOESNT_EXIST, (domain, key))
self.readDomain(domain, job)
if key not in self.catalog[domain]:
job.m.setError(EXP_KEY_DOESNT_EXIST+" ("+domain+", "+key+")")
return ""
return self.catalog[domain][key]
def readDomain(self, domain):
def getKeys(self, domain, job):
"""
this function gets the value of the domain and key
:param domain:
:param key:
:return:
"""
if not (isinstance(domain, str) or len(domain) < 1):
raise Exception(EXP_KEY_MISSING, (domain))
if domain not in self.catalog:
self.readDomain(domain, job)
if domain not in self.catalog:
return []
out = []
for x in self.catalog[domain].keys():
out.append(x)
return out
def readDomain(self, domain, job):
"""
this function reads the domain-entries
:param domain:
:return:
"""
job = basic.program.Job.getInstance()
if not (isinstance(domain, str) and len(domain)):
if not (isinstance(domain, str) or len(domain) < 1):
raise Exception(EXP_KEY_MISSING, (domain))
if domain in self.catalog:
return
filename = utils.path_tool.rejoinPath(job.conf.getPath(P.ATTR_PATH_TDATA), "catalog", domain+".csv")
data = utils.tdata_tool.getCsvSpec(job.m, filename, D.CSV_SPECTYPE_KEYS)
pathname = utils.config_tool.getConfigPath(P.KEY_CATALOG, domain, job)
if pathname is None:
raise Exception(EXP_KEY_MISSING, (domain))
if pathname[-4:] == ".csv":
data = utils.tdata_tool.getCsvSpec(job.m, pathname, D.CSV_SPECTYPE_KEYS)
else:
data = utils.file_tool.readFileDict(pathname, job.m)
self.catalog[domain] = data[B.DATA_NODE_TABLES][domain][B.DATA_NODE_KEYS]
return data
def exportXSD(self, domain):

41
basic/compexec.py

@ -79,16 +79,18 @@ class Testexecuter():
verify = -1+job.getDebugLevel(self.name)
self.m.debug(verify, "--- "+str(inspect.currentframe().f_code.co_name)+"() started at "
+ datetime.now().strftime("%Y%m%d_%H%M%S")+" for " + str(self.name).upper())
if B.ATTR_ARTS_LOG in self.conf[B.SUBJECT_ARTS]:
self.m.logInfo("log rotate in "+ self.name)
if B.TOPIC_NODE_DB in self.conf[B.SUBJECT_ARTS]:
self.m.logInfo("delete content "+ self.name)
dbi = basic.toolHandling.getDbTool(self)
dbi.deleteTables()
if B.ATTR_ARTS_LOB in self.conf[B.SUBJECT_ARTS]:
self.m.logInfo("lob is deleted with flaskdb "+ self.name)
if B.ATTR_ARTS_FILE in self.conf[B.SUBJECT_ARTS]:
self.m.logInfo("rm files in "+ self.name)
for node in [B.TOPIC_NODE_DB, B.TOPIC_NODE_CLI, B.TOPIC_NODE_API]:
if node not in self.conf[B.SUBJECT_ARTS]:
continue
tool = basic.toolHandling.getTool(node, self, job)
tool.reset_TData(job)
if B.TOPIC_NODE_FILE in self.conf[B.SUBJECT_ARTS]:
for file in self.conf[B.SUBJECT_ARTS][B.TOPIC_NODE_FILE]:
if file in B.LIST_FILE_ATTR:
continue
print("91: "+self.classname+" "+file)
tool = basic.toolHandling.getFileTool(job, self, B.TOPIC_NODE_FILE+"."+file)
tool.reset_TData(job)
self.m.setMsg("resetInstance for " + self.name + " is OK")
self.m.debug(verify, "--- " + str(inspect.currentframe().f_code.co_name) + "() finished at " + datetime.now().strftime("%Y%m%d_%H%M%S") + " for " + str(self.name).upper())
@ -103,13 +105,15 @@ class Testexecuter():
job = basic.program.Job.getInstance()
verify = -1+job.getDebugLevel(self.name)
self.m.debug(verify, "--- " + str(inspect.currentframe().f_code.co_name) + "() started at " + datetime.now().strftime("%Y%m%d_%H%M%S") + " for " + str(self.name).upper())
for node in [B.TOPIC_NODE_DB, B.TOPIC_NODE_CLI, B.TOPIC_NODE_FILE, B.TOPIC_NODE_API]:
print(node)
if B.TOPIC_NODE_DB in self.conf[B.SUBJECT_ARTS] and B.DATA_NODE_TABLES in tdata:
for t in tdata[B.DATA_NODE_TABLES]:
print (t)
if utils.db_abstract.isCompTable(self, tdata, t):
self.m.logInfo("insert content "+ self.name)
dbi = basic.toolHandling.getDbTool(self)
dbi.insertTables(tdata)
dbi = basic.toolHandling.getDbTool(self, job)
dbi.insertTables(tdata, job)
break
self.m.setMsg("data loaded for " + self.name + " is OK")
self.m.debug(verify, "--- " + str(inspect.currentframe().f_code.co_name) + "() finished at " + datetime.now().strftime("%Y%m%d_%H%M%S") + " for " + str(self.name).upper())
@ -135,15 +139,16 @@ class Testexecuter():
self.m.debug(verify, "--- " + str(inspect.currentframe().f_code.co_name) + "() started at " + datetime.now().strftime("%Y%m%d_%H%M%S") + " for " + str(self.name).upper())
if B.TOPIC_NODE_DB in self.conf[B.SUBJECT_ARTS]:
self.m.logInfo("select db-content "+ self.name)
dbi = basic.toolHandling.getDbTool(self)
data = dbi.selectTables(subdir)
dbi = basic.toolHandling.getDbTool(self, job)
data = dbi.selectTables(subdir, job)
print("ppp")
#data = {}
for t in data[subdir]:
data[B.DATA_NODE_TABLES] = {}
data[B.DATA_NODE_TABLES][t] = data[subdir][t]
utils.tdata_tool.writeCsvData(utils.path_tool.rejoinPath(
utils.path_tool.composePattern("{tcresult}", self), subdir, t+".csv"), data, self)
utils.tdata_tool.writeCsvData(
utils.path_tool.rejoinPath(utils.path_tool.composePattern("{tcresult}", self), subdir, t+".csv"),
data, self, job)
if B.ATTR_ARTS_LOB in self.conf[B.SUBJECT_ARTS]:
self.m.logInfo("check lob if is deleted with flaskdb "+ self.name)
self.m.setMsg("readInstance for " + self.name + " is OK")
@ -352,8 +357,8 @@ class Testexecuter():
# fill each data into matching-object
for side in M.MATCH_SIDES:
if side == M.MATCH_SIDE_PRESTEP:
if B.ATTR_DB_PRESTEP in self.conf[B.SUBJECT_ARTS][B.TOPIC_NODE_DB][t]:
a = self.conf[B.SUBJECT_ARTS][B.TOPIC_NODE_DB][t][B.ATTR_DB_PRESTEP].split(":")
if B.ATTR_ARTS_PRESTEP in self.conf[B.SUBJECT_ARTS][B.TOPIC_NODE_DB][t]:
a = self.conf[B.SUBJECT_ARTS][B.TOPIC_NODE_DB][t][B.ATTR_ARTS_PRESTEP].split(":")
if a[0] != self.name:
comp = cm.getComponent(a[0])
else:

2
basic/componentHandling.py

@ -192,6 +192,8 @@ class ComponentManager:
name = compName
i = 0
c.name = name
c.classname = compName
c.m = basic.message.Message(job, basic.message.LIMIT_DEBUG, "logTime", name)
c.conf = utils.config_tool.mergeConn(c.m, confs["conf"], conns[i])
c.conf[B.SUBJECT_CONN] = conns[i]

47
basic/constants.py

@ -98,13 +98,20 @@ DATA_NODE_COMP = "comp"
DATA_NODE_PAR = "par"
""" This constant defines """
ATTR_ARTS_TYPE = "type"
""" must attribute for the type of the technique """
ATTR_ARTS_NAME = "name"
""" optional attribute just for information """
ATTR_ARTS_PATH = "path"
""" optional attribute for the basic folder if the artifact is stored in the filesystem """
ATTR_ARTS_RESET = "reset"
""" optional attribute if the artefact should be reset in the initializing-phase """
ATTR_ARTS_PRESTEP = "prestep"
""" optional attribute to define a source-table for this table """
LIST_ARTS_ATTR = [ATTR_ARTS_TYPE, ATTR_ARTS_PATH, ATTR_ARTS_RESET, ATTR_ARTS_PRESTEP, ATTR_ARTS_NAME]
TOPIC_NODE_DB = "db"
# testexec, db_abstr
ATTR_DB_TYPE = "type"
""" must attribute for the type of the database """
ATTR_DB_RESET = "reset"
""" optional attribute if the artefact should be reset in the initializing-phase """
ATTR_DB_PARTITION = "partitioned"
""" optional attribute if table is partitioned
- this keyword delimited by "+" will be replaced by partition-names which are parametrized """
@ -114,10 +121,23 @@ ATTR_DB_SCHEMA = "schema"
""" optional attribute for technical name of the schema """
ATTR_DB_TABNAME = "tabname"
""" optional attribute in order to use a different technical name for the db-table """
ATTR_DB_PRESTEP = "prestep"
""" optional attribute to define a source-table for this table """
LIST_DB_ATTR = [ATTR_DB_TYPE, ATTR_DB_RESET, ATTR_DB_PARTITION,
ATTR_DB_DATABASE, ATTR_DB_SCHEMA, ATTR_DB_TABNAME, ATTR_DB_PRESTEP]
LIST_DB_ATTR = [ATTR_DB_PARTITION, ATTR_DB_DATABASE, ATTR_DB_SCHEMA, ATTR_DB_TABNAME, ATTR_ARTS_PRESTEP] + LIST_ARTS_ATTR
TOPIC_NODE_CLI = "cli"
LIST_CLI_ATTR = [] + LIST_ARTS_ATTR
TOPIC_NODE_API = "api"
LIST_API_ATTR = [] + LIST_ARTS_ATTR
TOPIC_NODE_FILE = "file"
ATTR_FILE_OLD = "oldfile"
ATTR_FILE_ROTATE = "rotate"
LIST_FILE_ATTR = [ATTR_FILE_OLD, ATTR_FILE_ROTATE] + LIST_ARTS_ATTR
LIST_ATTR = {
TOPIC_NODE_DB: LIST_DB_ATTR,
TOPIC_NODE_API: LIST_API_ATTR,
TOPIC_NODE_CLI: LIST_CLI_ATTR,
TOPIC_NODE_FILE: LIST_FILE_ATTR
}
ATTR_DB_CONN_JAR = "conn_jar_name"
""" optional attribute for connection-jar-file instead of connection by ip, port """
ATTR_CONN_HOST = "hostname"
@ -131,15 +151,6 @@ ATTR_CONN_DOMPATH = "dompath"
ATTR_CONN_USER = "user"
ATTR_CONN_PASSWD = "password"
LIST_CONN_ATTR = [ATTR_DB_CONN_JAR, ATTR_CONN_HOST, ATTR_CONN_IP, ATTR_CONN_PORT, ATTR_CONN_DOMPATH, ATTR_CONN_USER, ATTR_CONN_PASSWD]
TOPIC_NODE_CLI = "cli"
ATTR_CLI_TYPE = "type"
LIST_CLI_ATTR = [ATTR_CLI_TYPE]
TOPIC_NODE_API = "api"
ATTR_API_TYPE = "type"
LIST_API_ATTR = [ATTR_API_TYPE]
TOPIC_NODE_FILE = "file"
ATTR_FILE_TYPE = "type"
LIST_FILE_ATTR = [ATTR_FILE_TYPE]
# the configuration of a component or tool
# entity { : variable name of the group, basic, component-name or tool-name

19
basic/step.py

@ -14,6 +14,7 @@ b) execute specific test-entity in the test-suite-execution
"""
import basic.constants as B
import utils.data_const as D
import utils.i18n_tool
LIST_ARGS = [
"start", # for starting the specified main-program
@ -34,6 +35,11 @@ class Step:
self.execStep = ""
self.args = {}
def getStepText(self, job):
text = self.comp+D.CSV_DELIMITER+str(self.execStep)+D.CSV_DELIMITER+self.refLine
for k in self.args:
text += D.CSV_DELIMITER+k+":"+self.args[k]
return text+"\n"
def parseOldStep(job, fields):
step = {}
@ -63,7 +69,7 @@ def parseOldStep(job, fields):
return step
def parseStep(job, fields):
step = Step
step = Step()
step.comp = fields[D.STEP_COMP_I]
step.execStep = fields[D.STEP_EXECNR_I]
step.refLine = fields[D.STEP_REFNR_I]
@ -89,3 +95,14 @@ def parseStep(job, fields):
setattr(step, b[0], b[1])
# data[B.DATA_NODE_STEPS].append(step)
return step
def getStepHeader(job):
text = "# "
text += utils.i18n_tool.I18n.getInstance().getText(f"{D.CSV_BLOCK_STEP=}", job)
text += ";"+utils.i18n_tool.I18n.getInstance().getText(f"{D.STEP_ATTR_COMP=}", job)
text += ";"+utils.i18n_tool.I18n.getInstance().getText(f"{D.STEP_ATTR_EXECNR=}", job)
text += ";"+utils.i18n_tool.I18n.getInstance().getText(f"{D.STEP_ATTR_REFNR=}", job)
text += ";"+utils.i18n_tool.I18n.getInstance().getText(f"{D.STEP_ATTR_ARGS=}", job)
return text + ";..;;;\n"

27
basic/toolHandling.py

@ -9,6 +9,8 @@ import os
import basic.program
import basic.constants as B
# -------------------------------------------------
import utils.config_tool
def hasAttr(o, name):
if (isinstance(o, dict)):
@ -46,10 +48,20 @@ def getCompAttr(comp, topic, attr, table=""):
return getAttr(comp.conf[B.SUBJECT_ARTS][topic], attr)
raise LookupError(topic+"."+attr+" is not set in comp " + comp.name)
def getTool(technicType, comp, job):
if technicType == B.TOPIC_NODE_DB:
return getDbTool(comp, job)
if technicType == B.TOPIC_NODE_CLI:
return getCliTool(comp, job)
if technicType == B.TOPIC_NODE_API:
return getApiTool(comp, job)
if technicType == B.TOPIC_NODE_FILE:
# TODO im Allgemeinen keine konrete Implementierung aufrufen,
# denn zu einer Komponente koennen unterschiedliche Dateien vorkommen
return getFileTool(job, comp, "")
# class ToolManager:
def getDbTool(comp):
job = basic.program.Job.getInstance()
def getDbTool(comp, job):
verify = int(job.getDebugLevel("db_tool"))
dbtype = getCompAttr(comp, B.TOPIC_NODE_DB, B.ATTR_TYPE, "")
toolname = "db"+dbtype+"_tool"
@ -63,8 +75,7 @@ def getDbTool(comp):
c.setComp(comp)
return c
def getCliTool(comp):
job = basic.program.Job.getInstance()
def getCliTool(comp, job):
verify = int(job.getDebugLevel("db_tool"))
clitype = getCompAttr(comp, B.TOPIC_NODE_CLI, B.ATTR_TYPE, "")
toolname = "cli"+clitype+"_tool"
@ -78,8 +89,7 @@ def getCliTool(comp):
c.setComp(comp)
return c
def getApiTool(comp):
job = basic.program.Job.getInstance()
def getApiTool(comp, job):
verify = int(job.getDebugLevel("db_tool"))
apitype = getCompAttr(comp, B.TOPIC_NODE_API, B.ATTR_TYPE, "")
toolname = "api"+apitype+"_tool"
@ -93,8 +103,11 @@ def getApiTool(comp):
c.setComp(comp)
return c
def getFileTool(job, comp=None):
def getFileTool(job, comp, filenode=""):
verify = int(job.getDebugLevel("db_tool"))
if len(filenode) > 3 and filenode[-1:] != ".":
filetype = utils.config_tool.getAttribute(comp, filenode, B.ATTR_ARTS_TYPE, job)
else:
filetype = getCompAttr(comp, B.TOPIC_NODE_FILE, B.ATTR_TYPE, "")
toolname = "file"+filetype+"_tool"
filepath = os.path.join(job.conf.confs[B.SUBJECT_PATH][B.ATTR_PATH_PROGRAM], "utils", toolname+".py")

23
init_testcase.py

@ -1,25 +1,30 @@
# This is a sample Python script.
import os
# import jsonpickle # pip install jsonpickle
import yaml # pip install pyyaml
#!/usr/bin/python
# program to execute steps of a testcase
# PARAM: --environment --application --tcdir [ testcase, tctime ]
# main functions
# + reset_testcase() : comp-config --> system
# + load_testcase() : testspec --> tdata --> system.data
# + select_testcase() : system.data --> data --> archiv.result
# ---------------------------------------------------import os
import basic.program as program
import utils.tdata_tool
import basic.componentHandling
import basic.constants as B
import utils.file_tool
import utils.path_tool
import utils.path_const as P
import basic.message as message
# Press Umschalt+F10 to execute it or replace it with your code.
# Press Double Shift to search everywhere for classes, files, tool windows, actions, and settings.
PROGRAM_NAME = "init_testcase"
def startPyJob(job):
cm = basic.componentHandling.ComponentManager.getInstance("init")
print("cm "+str(cm))
cm.initComponents()
comps = cm.getComponents(PROGRAM_NAME)
job.m.setMsg("# Components initialized with these relevant components " + str(comps))
testdata = utils.tdata_tool.getTestdata()
print("------------------------------------------------------------")
for c in comps:
comp = cm.getComponent(c)
comp.m.logInfo("------- "+comp.name+" ----------------------------------------")
@ -28,7 +33,7 @@ def startPyJob(job):
if job.hasFunction("load_TData"):
comp.load_TData(B.PAR_TESTCASE, testdata)
if job.hasFunction("read_TData"):
comp.read_TData("vorher", B.PAR_TESTCASE)
comp.read_TData(utils.path_tool.getKeyValue(P.KEY_PRECOND), B.PAR_TESTCASE)
comp.m.logInfo("------- "+comp.name+" ----------------------------------------")
job.m.merge(comp.m)
print(str(comp))
@ -39,8 +44,6 @@ if __name__ == '__main__':
print(PROGRAM_NAME)
x = program.Job(PROGRAM_NAME)
x.startJob()
x.m.logInfo("hier eine LogInfo")
x.m.logDebug("hier eine DbugMeldung")
x.m.logDebug(str(vars(x.par)) + "\n" + str(vars(x.conf)))
if x.m.isRc("fatal"):
x.stopJob()

2
test/test_02css.py

@ -23,8 +23,6 @@ class MyTestCase(unittest.TestCase):
"tool": "job_tool", "tdtyp": "csv", "tdsrc": "implement", "tdname": "firstunit",
"modus": "unit"}
job.par.setParameterArgs(args)
if verbose: print("eeeeeeeee")
if verbose: print(json.dumps(job.conf.confs))
# ------- inline ---------------
job.conf.setConfig("tools.csstyp", "inline")
job.conf.confs.get("tools")["csstyp"] == "inline"

42
test/test_04config.py

@ -1,3 +1,12 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# ---------------------------------------------------------------------------------------------------------
# Author : Ulrich Carmesin
# Source : gitea.ucarmesin.de
# https://ucarmesin.de/index.php/it/testautomatisierung-fuer-daten-test/225-konfiguration-der-testanwendung
# ---------------------------------------------------------------------------------------------------------
import sys
import basic.constants as B
import unittest
import os
import inspect
@ -8,13 +17,14 @@ import test.testtools
import utils.path_const as P
import basic.constants as B
TEST_FUNCTIONS = ["test_getConfig", "test_mergeAttributes"]
VERIFY = False
TEST_FUNCTIONS = ["test_01getConfig", "test_02mergeAttributes", "test_03getAttributes"]
TEST_FUNCTIONS = ["test_03getAttributes"]
verbose = False
class MyTestCase(unittest.TestCase):
mymsg = "--------------------------------------------------------------"
def test_getConfig(self):
def test_01getConfig(self):
global mymsg
actfunction = str(inspect.currentframe().f_code.co_name)
cnttest = 0
@ -39,12 +49,12 @@ class MyTestCase(unittest.TestCase):
r = utils.config_tool.getConfigPath(P.KEY_COMP, "testcrm")
self.assertIn(os.path.join(job.conf.getPath(P.ATTR_PATH_COMPONENTS), "testcrm", "CONFIG"), r)
r = utils.config_tool.getConfig(P.KEY_TOOL, "path")
if VERIFY: print("pattern " + r["pattern"]["log"])
if VERIFY: print("pattern " + r["pattern"]["precond"])
if verbose: print("pattern " + r["pattern"]["log"])
if verbose: print("pattern " + r["pattern"]["precond"])
MyTestCase.mymsg += "\n----- "+actfunction+" : "+str(cnttest)
def test_mergeAttributes(self):
def test_02mergeAttributes(self):
global mymsg
actfunction = str(inspect.currentframe().f_code.co_name)
cnttest = 0
@ -65,20 +75,32 @@ class MyTestCase(unittest.TestCase):
componentName = "testprddb"
confs = utils.config_tool.getConfig("comp", componentName)
conns = utils.conn_tool.getConnections(componentName)
self.assertNotIn(B.ATTR_DB_TYPE, confs["conf"][B.SUBJECT_ARTS][B.TOPIC_NODE_DB])
self.assertNotIn(B.ATTR_ARTS_TYPE, confs["conf"][B.SUBJECT_ARTS][B.TOPIC_NODE_DB])
confs["conf"] = utils.config_tool.mergeConn(job.m, confs["conf"], conns[0])
self.assertIn(B.ATTR_DB_TYPE, confs["conf"][B.SUBJECT_ARTS][B.TOPIC_NODE_DB])
self.assertIn(B.ATTR_ARTS_TYPE, confs["conf"][B.SUBJECT_ARTS][B.TOPIC_NODE_DB])
cnttest += 1 # new attribute
MyTestCase.mymsg += "\n----- "+actfunction+" : "+str(cnttest)
def test_getAttributes(self):
def test_03getAttributes(self):
global mymsg
actfunction = str(inspect.currentframe().f_code.co_name)
cnttest = 0
if actfunction not in TEST_FUNCTIONS:
return
job = test.testtools.getJob()
comp = test.testtools.getComp("testprddb")
path = "db.product"
attrList = utils.config_tool.getAttributeList(comp, path, job)
self.assertIn(B.ATTR_ARTS_PATH, attrList)
self.assertIn(B.ATTR_ARTS_RESET, attrList)
cnttest += 2 # new attribute
comp = test.testtools.getComp("testrest")
path = "file.xmlrest"
attrList = utils.config_tool.getAttributeList(comp, path, job)
print(str(comp.conf["conn"]))
print(str(comp.conf[B.SUBJECT_ARTS]))
print(str(attrList))
MyTestCase.mymsg += "\n----- "+actfunction+" : "+str(cnttest)
@ -87,5 +109,5 @@ class MyTestCase(unittest.TestCase):
if __name__ == '__main__':
VERIFY = True
verbose = True
unittest.main()

40
test/test_07catalog.py

@ -2,6 +2,7 @@ import unittest
import os
import inspect
import utils.path_tool
import basic.message
import basic.program
import basic.constants as B
import test.constants
@ -13,9 +14,9 @@ HOME_PATH = test.constants.HOME_PATH
OS_SYSTEM = test.constants.OS_SYSTEM
# here you can select single testfunction for developping the tests
TEST_FUNCTIONS = ["test_01class", "test_02key"]
#TEST_FUNCTIONS = [ "test_02key"]
verbose = True
TEST_FUNCTIONS = ["test_01class", "test_02read", "test_03key"]
TEST_FUNCTIONS = [ "test_03key"]
verbose = False
class MyTestCase(unittest.TestCase):
mymsg = "--------------------------------------------------------------"
@ -29,16 +30,46 @@ class MyTestCase(unittest.TestCase):
job = test.testtools.getJob()
catalog = basic.catalog.Catalog.getInstance()
self.assertIsNotNone(catalog)
cnttest += 1
MyTestCase.mymsg += "\n----- "+actfunction+" : "+str(cnttest)
def test_02key(self):
def test_02read(self):
global mymsg
actfunction = str(inspect.currentframe().f_code.co_name)
cnttest = 0
if actfunction not in TEST_FUNCTIONS:
return
job = test.testtools.getJob()
catalog = basic.catalog.Catalog.getInstance()
self.assertRaises(Exception, catalog.readDomain, ("xxx", job))
cnttest += 1
res = catalog.readDomain("countries", job)
self.assertEqual(isinstance(res, dict), True)
cnttest += 1
countries = catalog.getKeys("countries", job)
self.assertEqual(len(countries), 21)
cnttest += 1
MyTestCase.mymsg += "\n----- "+actfunction+" : "+str(cnttest)
def test_03key(self):
global mymsg
actfunction = str(inspect.currentframe().f_code.co_name)
cnttest = 0
if actfunction not in TEST_FUNCTIONS:
return
job = test.testtools.getJob()
catalog = basic.catalog.Catalog.getInstance()
res = catalog.getValue("countries", "key", job)
self.assertEqual(res, "")
self.assertEqual(job.m.rc, basic.message.RC_ERROR)
cnttest += 1
res = catalog.getValue("countries", "TD", job)
print(str(res))
self.assertEqual(res["Land"], "Tschad")
print(str(res))
cnttest += 1
MyTestCase.mymsg += "\n----- "+actfunction+" : "+str(cnttest)
@ -47,4 +78,5 @@ class MyTestCase(unittest.TestCase):
if __name__ == '__main__':
verbose = True
unittest.main()

7
test/test_08i18n.py

@ -15,6 +15,7 @@ import test.testtools
import utils.path_const as P
import utils.i18n_tool
import basic.constants as B
import utils.data_const as D
HOME_PATH = test.constants.HOME_PATH
OS_SYSTEM = test.constants.OS_SYSTEM
@ -83,7 +84,11 @@ class MyTestCase(unittest.TestCase):
# i18n.getText("EXP_KEY_MISSING", EXP_KEY_MISSING, job)
res = i18n.getAliasList(f"{EXP_KEY_MISSING=}", job)
if verbose: print("RESULT "+str(res))
self.assertEqual(res, ['key is missing {}', 'Schluesselwort fehlt {}'])
self.assertEqual(res, ['key is missing {}', 'key is missing {}', 'Schluesselwort fehlt {}'])
cnttest += 1
res = i18n.getAliasList(f"{D.DATA_ATTR_ALIAS=}", job)
if verbose: print("RESULT "+str(res))
self.assertEqual(res, ["_alias"])
cnttest += 1
MyTestCase.mymsg += "\n----- "+actfunction+" : "+str(cnttest)

94
test/test_24gen.py

@ -0,0 +1,94 @@
"""
unit-test
"""
import unittest
import inspect
import utils.gen_tool
import utils.gen_const as G
import basic.program
import test.testtools
# the list of TEST_FUNCTIONS defines which function will be really tested.
# if you minimize the list you can check the specific test-function
TEST_FUNCTIONS = ["test_11getValueList", "test_12getCntElement", "test_13getMinCount", "test_14getElemList"]
TEST_FUNCTIONS = ["test_01getElemList"]
# with this variable you can switch prints on and off
verbose = False
class MyTestCase(unittest.TestCase):
mymsg = "--------------------------------------------------------------"
def test_01getElemList(self):
global mymsg
actfunction = str(inspect.currentframe().f_code.co_name)
cnttest = 0
if actfunction not in TEST_FUNCTIONS:
return
job = test.testtools.getJob()
res = utils.gen_tool.getElemList(G.KEY_LIST, "0 .. 5", 6, job)
print((str(res)))
res = utils.gen_tool.getElemList(G.KEY_LIST, "a, b, c, d", 6, job)
print((str(res)))
res = utils.gen_tool.getElemList(G.KEY_LIST, "cat:countries", 6, job)
print((str(res)))
def test_11getValueList(self):
global mymsg
actfunction = str(inspect.currentframe().f_code.co_name)
cnttest = 0
if actfunction not in TEST_FUNCTIONS:
return
job = test.testtools.getJob()
for cnt in [3, 4, 6, 10]:
res = utils.gen_tool.getValueList(["A", "B", "C", "D"], cnt, job)
self.assertEqual(len(res), cnt)
cnttest += 1
for cnt in [3, 4, 6, 10]:
res = utils.gen_tool.getValueList("[A, B, C, D]", cnt, job)
self.assertEqual(len(res), cnt)
cnttest += 1
for cnt in [3, 4, 6, 10]:
res = utils.gen_tool.getValueList("0 .. 4", cnt, job)
self.assertEqual(len(res), cnt)
cnttest += 1
MyTestCase.mymsg += "\n----- "+actfunction+" : "+str(cnttest)
def test_12getCntElement(self):
global mymsg
actfunction = str(inspect.currentframe().f_code.co_name)
cnttest = 0
if actfunction not in TEST_FUNCTIONS:
return
job = test.testtools.getJob()
def test_13getMinCount(self):
global mymsg
actfunction = str(inspect.currentframe().f_code.co_name)
cnttest = 0
if actfunction not in TEST_FUNCTIONS:
return
job = test.testtools.getJob()
def test_14getElemList(self):
global mymsg
actfunction = str(inspect.currentframe().f_code.co_name)
cnttest = 0
if actfunction not in TEST_FUNCTIONS:
return
job = test.testtools.getJob()
def test_zzz(self):
if verbose: print(MyTestCase.mymsg)
if __name__ == '__main__':
verbose = True
unittest.main()

12
test/testtools.py

@ -3,6 +3,7 @@ import basic.constants as B
import basic.component
import utils.data_const as D
import test.constants as T
import utils.config_tool
DEFAULT_GRAN = "tc"
DEFAULT_APP = "TESTAPP"
@ -82,9 +83,12 @@ def getJob(pgran="", papp="", penv="", ptstamp="", pmode=""):
return job
def getComp():
def getComp(componentName):
comp = basic.component.Component()
comp.conf = {}
comp.name = "person"
pass
comp.name = componentName
confs = utils.config_tool.getConfig("comp", componentName)
conns = utils.conn_tool.getConnections(componentName)
comp.conf = confs["conf"]
comp.conf[B.SUBJECT_CONN] = conns[0]
return comp

3
utils/api_abstract.py

@ -28,6 +28,9 @@ class ApiFcts():
self.comp = None
pass
def reset_TData(self, job):
pass
def setComp(self, comp):
self.comp = comp

4
utils/cli_abstract.py

@ -22,6 +22,7 @@ The main tasks are: \n
"""
import basic.program
import utils.config_tool
import basic.constants as B
class CliFcts():
"""
@ -32,6 +33,9 @@ class CliFcts():
self.comp = None
pass
def reset_TData(self, job):
pass
def setComp(self, comp):
self.comp = comp

88
utils/config_tool.py

@ -3,6 +3,7 @@
# ---------------------------------------------------------------------------------------------------------
# Author : Ulrich Carmesin
# Source : gitea.ucarmesin.de
# https://ucarmesin.de/index.php/it/testautomatisierung-fuer-daten-test/225-konfiguration-der-testanwendung
# ---------------------------------------------------------------------------------------------------------
import sys
import basic.constants as B
@ -23,7 +24,7 @@ import utils.path_const as P
COMP_FILES = [D.DDL_FILENAME]
CONFIG_FORMAT = [D.DFILE_TYPE_YML, D.DFILE_TYPE_JSON, D.DFILE_TYPE_CSV]
def getConfigPath(modul, name, subname=""):
def getConfigPath(modul, name, subname="", job=None):
"""
gets the most specified configuration of different sources
Parameter:
@ -39,6 +40,7 @@ def getConfigPath(modul, name, subname=""):
the parameter-files could be one of these file-types:
* yaml, json, csv
"""
if job is None:
job = basic.program.Job.getInstance()
verify = job.getDebugLevel("config_tool")-4
job.debug(verify, "getConfig " + modul + ", " + name)
@ -106,6 +108,39 @@ def getConfigPath(modul, name, subname=""):
job.debug(verify, "4 " + pathname)
if os.path.exists(pathname):
return pathname
elif modul == P.KEY_TESTCASE:
for format in CONFIG_FORMAT:
pathname = os.path.join(job.conf.getPath(P.ATTR_PATH_TDATA),
name, D.DFILE_TESTCASE_NAME + "."+format)
job.debug(verify, "4 " + pathname)
if os.path.exists(pathname):
return pathname
elif modul == P.KEY_TESTSUITE:
for format in CONFIG_FORMAT:
pathname = os.path.join(job.conf.getPath(P.ATTR_PATH_TDATA),
name, D.DFILE_TESTSUITE_NAME + "." + format)
job.debug(verify, "4 " + pathname)
if os.path.exists(pathname):
return pathname
elif modul == P.KEY_CATALOG:
for format in CONFIG_FORMAT:
pathname = os.path.join(job.conf.getPath(P.ATTR_PATH_TDATA),
P.KEY_CATALOG, name + "." + format)
job.debug(verify, "4 " + pathname)
if os.path.exists(pathname):
return pathname
for format in CONFIG_FORMAT:
pathname = os.path.join(job.conf.getPath(P.ATTR_PATH_COMPONENTS),
P.KEY_CATALOG, name + "." + format)
job.debug(verify, "4 " + pathname)
if os.path.exists(pathname):
return pathname
for format in CONFIG_FORMAT:
pathname = os.path.join(job.conf.getPath(P.ATTR_PATH_PROGRAM),
P.KEY_CATALOG, name + "." + format)
job.debug(verify, "4 " + pathname)
if os.path.exists(pathname):
return pathname
else:
pathname = utils.path_tool.composePath(P.P_TCPARFILE)
job.debug(verify, "7 " + pathname)
@ -213,10 +248,61 @@ def getConfig(modul, name, subname=""):
return confs
def getAttribute(comp, path, attr, job):
attrList = getAttributeList(comp, path, job)
if attr in attrList:
return attrList[attr]
else:
return ""
def getAttributeList(comp, path, job):
"""
gets a concrete attribute-list for an artifact-element from the config-attributes and the connection-attributes
https://ucarmesin.de/index.php/it/testautomatisierung-fuer-daten-test/225-konfiguration-der-testanwendung#konfigurationshierarchie
:param comp:
:param path: artifact-type.artifact-name for example: DB.person
:return: list of all attributes for the artifact-element
"""
attrList = {}
a = path.split(".")
artType = a[0]
artName = a[1]
if B.SUBJECT_CONN not in comp.conf:
raise Exception ("Environment is not configured")
if artType in comp.conf[B.SUBJECT_CONN]:
if artName in comp.conf[B.SUBJECT_CONN][artType]:
for attr, val in comp.conf[B.SUBJECT_CONN][artType][artName].items():
if attr not in B.LIST_ATTR[artType]:
continue
attrList[attr] = val
for attr, val in comp.conf[B.SUBJECT_CONN][artType].items():
if attr not in B.LIST_ATTR[artType]:
continue
if attr in attrList:
continue
attrList[attr] = val
if artType in comp.conf[B.SUBJECT_ARTS]:
if artName in comp.conf[B.SUBJECT_ARTS][artType]:
for attr, val in comp.conf[B.SUBJECT_ARTS][artType][artName].items():
if attr not in B.LIST_ATTR[artType]:
continue
if attr in attrList:
continue
attrList[attr] = val
for attr, val in comp.conf[B.SUBJECT_ARTS][artType].items():
if attr not in B.LIST_ATTR[artType]:
continue
if attr in attrList:
continue
attrList[attr] = val
return attrList
def mergeConn(msg, conf, conn):
"""
merges the config-attributes from the connection-attributes
because the connection-attributes has to overwrite the config-attributes if the subject is configured
https://ucarmesin.de/index.php/it/testautomatisierung-fuer-daten-test/225-konfiguration-der-testanwendung#konfigurationshierarchie
:param conf:
:param conn:
:return:

50
utils/data_const.py

@ -2,11 +2,13 @@
"""
constants for used for api-functions
"""
import basic.constants as B
DDL_FILENAME = "DATASTRUCTURE"
DATA_NODE_TYPE = "type"
TYPE_STRING = "string"
TYPE_STR = "str"
TYPE_INT = "int"
TYPE_FLOAT = "float"
TYPE_DOUBLE = "double"
@ -23,15 +25,46 @@ DDL_TYPE = "type"
DFILE_TYPE_YML = "yml"
DFILE_TYPE_JSON = "json"
DFILE_TYPE_CSV = "csv"
DFILE_TESTCASE_NAME = "testspec"
DFILE_TESTSUITE_NAME = "testsuite"
DFILE_TABLE_PREFIX = "table_"
LIST_DFNAME_ATTR = [DFILE_TESTCASE_NAME, DFILE_TESTSUITE_NAME, DFILE_TABLE_PREFIX]
LIST_DFNAME_CONST = ["DFILE_TESTCASE_NAME", "DFILE_TESTSUITE_NAME", "DFILE_TABLE_PREFIX"]
DATA_SRC_DIR = "dir"
DATA_SRC_CSV = "csv"
DATA_ATTR_COUNT = "_count"
""" statistical information of data-count """
DATA_ATTR_DATE = "_date"
""" reference-date for computing the actual date in relation to specification or expectation """
DATA_ATTR_COMP = "_comp"
""" reference to using componente with their object """
DATA_ATTR_CHAR = "_char"
""" character of the data in order to delete it ión initialization """
DATA_ATTR_KEY = "_key"
""" key for a data-specification of a catalog-list - default: the first field is the key """
DATA_ATTR_ALIAS = "_alias"
LIST_DATA_ATTR = [DATA_ATTR_COUNT, DATA_ATTR_DATE, DATA_ATTR_CHAR, DATA_ATTR_COMP, DATA_ATTR_ALIAS,DATA_ATTR_KEY]
LIST_ATTR_CONST = ["DATA_ATTR_COUNT", "DATA_ATTR_DATE", "DATA_ATTR_CHAR", "DATA_ATTR_COMP", "DATA_ATTR_ALIAS", "DATA_ATTR_KEY"]
HEAD_ATTR_DESCR = "decription"
HEAD_ATTR_TARGET = "target"
HEAD_ATTR_USECASE = "usecase"
HEAD_ATTR_UCID = "usecase-id"
HEAD_ATTR_STORY = "story"
HEAD_ATTR_STORYID = "storyid-id"
HEAD_ATTR_APPS = B.SUBJECT_APPS
HEAD_ATTR_DEPR = "deprecated"
LIST_HEAD_ATTR = [HEAD_ATTR_DESCR, HEAD_ATTR_TARGET, HEAD_ATTR_USECASE, HEAD_ATTR_UCID,
HEAD_ATTR_STORY, HEAD_ATTR_STORYID, HEAD_ATTR_APPS, HEAD_ATTR_DEPR]
LIST_HEAD_CONST = ["HEAD_ATTR_DESCR", "HEAD_ATTR_TARGET", "HEAD_ATTR_USECASE", "HEAD_ATTR_UCID",
"HEAD_ATTR_STORY", "HEAD_ATTR_STORYID", "HEAD_ATTR_APPS", "HEAD_ATTR_DEPR"]
CSV_HEADER_START = ["node", "table", "tabelle"]
CSV_DELIMITER = ";"
INTERNAL_DELIMITER = "||"
"""
internal structure of testdata
@ -42,13 +75,26 @@ CSV_SPECTYPE_KEYS = "keys"
CSV_SPECTYPE_CONF = "conf"
CSV_NODETYPE_KEYS = "_keys"
CSV_BLOCK_OPTION = "option"
CSV_BLOCK_STEP = "step"
CSV_BLOCK_HEAD = "_head"
CSV_BLOCK_OPTION = B.DATA_NODE_OPTION
CSV_BLOCK_STEP = B.DATA_NODE_STEPS
CSV_BLOCK_TABLES = B.DATA_NODE_TABLES
CSV_BLOCK_IMPORT = "_import"
LIST_CSV_BLOCKS = [CSV_BLOCK_HEAD, CSV_BLOCK_OPTION, CSV_BLOCK_STEP, CSV_BLOCK_TABLES, CSV_BLOCK_IMPORT]
LIST_BLOCK_CONST = ["CSV_BLOCK_HEAD", "CSV_BLOCK_OPTION", "CSV_BLOCK_STEP", "CSV_BLOCK_TABLES", "CSV_BLOCK_IMPORT"]
STEP_COMP_I = 1
STEP_EXECNR_I = 2
STEP_REFNR_I = 3
STEP_ARGS_I = 4
STEP_LIST_I = 4
STEP_ATTR_COMP = "component"
STEP_ATTR_EXECNR = "exec-step"
STEP_ATTR_REFNR = "reference-nr"
STEP_ATTR_ARGS = "arguments"
LIST_STEP_ATTR = [STEP_ATTR_COMP, STEP_ATTR_EXECNR, STEP_ATTR_REFNR, STEP_ATTR_ARGS]
LIST_STEP_CONST = ["STEP_ATTR_COMP", "STEP_ATTR_EXECNR", "STEP_ATTR_REFNR", "STEP_ATTR_ARGS"]
EXCP_MALFORMAT = "malformated line: "
ATTR_SRC_TYPE = "tdtyp"

37
utils/db_abstract.py

@ -219,7 +219,7 @@ def formatDbField(comp, val, field):
def formatDbVal(msg, val, dtyp):
ctlg = basic.catalog.Catalog.getInstance()
if dtyp == D.TYPE_STRING:
if dtyp == D.TYPE_STRING or dtyp == D.TYPE_STR:
if not isinstance(val, str):
msg.logError("field must be " + dtyp + ", " + str(val))
return str(val)
@ -278,14 +278,14 @@ class DbFcts():
return getDbAttributes(self.comp, table)
def selectTables(self, subdir):
def selectTables(self, subdir, job):
""" method to delete rows from a database
statement written in sql """
self.loadDdl()
self.loadDdl(job)
tdata = {}
tdata[subdir] = {}
for t in self.comp.conf[B.DATA_NODE_DDL]:
tdata[subdir][t] = self.selectRows(t)
tdata[subdir][t] = self.selectRows(t, job)
if B.DATA_NODE_DATA not in tdata[subdir][t]:
raise Exception("missing data node in table")
tdata[subdir][t][D.DATA_ATTR_COUNT] = len(tdata[subdir][t][B.DATA_NODE_DATA])
@ -293,25 +293,28 @@ class DbFcts():
self.comp.m.logMsg("Tabelle {} mit {} Zeilen gelesen".format(t, len(tdata[subdir][t][B.DATA_NODE_DATA])))
return tdata
def selectRows(self, statement):
def selectRows(self, table, job):
""" method to select rows from a database
statement written in sql """
raise Exception(B.EXCEPT_NOT_IMPLEMENT)
def deleteTables(self):
def reset_TData(self, job):
self.deleteTables(job)
def deleteTables(self, job):
""" method to delete rows from a database
statement written in sql """
self.loadDdl()
self.loadDdl(job)
for t in self.comp.conf[B.DATA_NODE_DDL]:
print("zu loeschende Tabelle "+t)
self.deleteRows(t)
self.deleteRows(t, job)
def deleteRows(self, table):
def deleteRows(self, table, job):
""" method to delete rows from a database
statement written in sql """
raise Exception(B.EXCEPT_NOT_IMPLEMENT)
def updateRows(self, statement):
def updateRows(self, statement, job):
""" method to delete rows from a database
statement written in sql """
raise Exception(B.EXCEPT_NOT_IMPLEMENT)
@ -321,17 +324,17 @@ class DbFcts():
this method should only called by the class itself """
raise Exception(B.EXCEPT_NOT_IMPLEMENT)
def insertTables(self, tdata):
def insertTables(self, tdata, job):
"""
method to insert rows into the database of the component
"""
# TODO wird der Tabellenname/DB/Schema unter tdata gespeichert?
plainname = basic.componentHandling.getPlainCompname(self.comp.name)
self.loadDdl()
self.loadDdl(job)
for t in tdata[B.DATA_NODE_TABLES]:
print("einzufuegende Tabelle "+self.comp.name+" "+t)
if isCompTable(self.comp, tdata, t):
self.insertRows(t, tdata[B.DATA_NODE_TABLES][t][B.DATA_NODE_DATA])
self.insertRows(t, tdata[B.DATA_NODE_TABLES][t][B.DATA_NODE_DATA], job)
self.comp.m.logMsg("in Tabelle {} {} Zeilen eingefuegt".format(
t, len(tdata[B.DATA_NODE_TABLES][t][B.DATA_NODE_DATA])))
@ -347,10 +350,9 @@ class DbFcts():
this method should only called by the class itself """
raise Exception(B.EXCEPT_NOT_IMPLEMENT)
def loadDdl(self):
def loadDdl(self, job):
"""" load the DDL for each database-table
the ddl are mostly stored as csv in the component-folder """
job = basic.program.Job.getInstance()
if (B.DATA_NODE_DDL in self.comp.conf):
return
conf = utils.config_tool.getConfig(D.DDL_FILENAME, self.comp.name)
@ -365,11 +367,10 @@ class DbFcts():
return ""
def getDbValue(self, fo, value):
# TODO Untersceidung csv und echte DB
return formatDbField(self.comp, value, fo)
value = str(formatDbField(self.comp, value, fo))
if len(value.strip()) == 0 and fo[D.DDL_FNULLABLE] == B.SVAL_YES:
return self.getDbNull()
if fo[D.DATA_NODE_TYPE] == D.TYPE_STRING:
if fo[D.DATA_NODE_TYPE] == D.TYPE_STRING or fo[D.DATA_NODE_TYPE] == D.TYPE_STR:
return "'"+value.strip()+"'"
elif fo[D.DATA_NODE_TYPE] == D.TYPE_INT:
return value.strip()

10
utils/dbcsv_tool.py

@ -21,7 +21,7 @@ class DbFcts(utils.db_abstract.DbFcts):
def __init__(self):
pass
def selectRows(self, table):
def selectRows(self, table, job):
""" method to select rows from a database
statement written in sql """
sqlTable = utils.db_abstract.getSqlTable(self.comp, table)
@ -40,10 +40,9 @@ class DbFcts(utils.db_abstract.DbFcts):
return tdata
def deleteRows(self, table):
def deleteRows(self, table, job):
""" method to delete rows from a database
statement written in sql """
job = basic.program.Job.getInstance()
verify = -1+job.getDebugLevel("db_tool")
sqlTable = utils.db_abstract.getSqlTable(self.comp, table)
header = ""
@ -62,11 +61,10 @@ class DbFcts(utils.db_abstract.DbFcts):
raise Exception(B.EXCEPT_NOT_IMPLEMENT)
def insertRows(self, table, rows):
def insertRows(self, table, rows, job):
""" method to insert rows into a database
the rows will be interpreted by the ddl of the component
"""
job = basic.program.Job.getInstance()
verify = -1+job.getDebugLevel("db_tool")
sqlTable = utils.db_abstract.getSqlTable(self.comp, table)
header = ""
@ -89,7 +87,7 @@ class DbFcts(utils.db_abstract.DbFcts):
rowvalues = self.comp.name+":"+table
for h in self.comp.conf[B.DATA_NODE_DDL][table][B.DATA_NODE_HEADER]:
print("head "+h)
if h in B.LIST_DB_ATTR:
if h in [B.DATA_NODE_HEADER, B.DATA_NODE_DATA]:
continue
print("h "+h)
if (h in r):

11
utils/dbmysql_tool.py

@ -19,11 +19,10 @@ class DbFcts(utils.db_abstract.DbFcts):
def __init__(self):
pass
def selectRows(self, table):
def selectRows(self, table, job):
""" method to select rows from a database
statement written in sql """
tdata = {}
job = basic.program.Job.getInstance()
verify = -1+job.getDebugLevel("db_tool")
cmd = "SELECT * FROM "+table+";"
#mycursor = self.getConnector()
@ -38,24 +37,22 @@ class DbFcts(utils.db_abstract.DbFcts):
self.comp.m.logInfo(cmd)
return tdata
def deleteRows(self, table):
def deleteRows(self, table, job):
""" method to delete rows from a database
statement written in sql """
job = basic.program.Job.getInstance()
verify = -1+job.getDebugLevel("db_tool")
cmd = "DELETE FROM "+table+";"
self.comp.m.logInfo(cmd)
def updateRows(self, statement):
def updateRows(self, statement, job):
""" method to delete rows from a database
statement written in sql """
raise Exception(B.EXCEPT_NOT_IMPLEMENT)
def insertRows(self, table, rows):
def insertRows(self, table, rows, job):
""" method to insert rows into a database
the rows will be interpreted by the ddl of the component
"""
job = basic.program.Job.getInstance()
verify = -1+job.getDebugLevel("db_tool")
cmd = "INSERT INTO "+table+";"
header = ""

11
utils/dbrel_tool.py

@ -19,11 +19,10 @@ class DbFcts(utils.db_abstract.DbFcts):
def __init__(self):
pass
def selectRows(self, table):
def selectRows(self, table, job):
""" method to select rows from a database
statement written in sql """
tdata = {}
job = basic.program.Job.getInstance()
verify = -1+job.getDebugLevel("db_tool")
cmd = "SELECT * FROM "+table+";"
#mycursor = self.getConnector()
@ -38,24 +37,22 @@ class DbFcts(utils.db_abstract.DbFcts):
self.comp.m.logInfo(cmd)
return tdata
def deleteRows(self, table):
def deleteRows(self, table, job):
""" method to delete rows from a database
statement written in sql """
job = basic.program.Job.getInstance()
verify = -1+job.getDebugLevel("db_tool")
cmd = "DELETE FROM "+table+";"
self.comp.m.logInfo(cmd)
def updateRows(self, statement):
def updateRows(self, statement, job):
""" method to delete rows from a database
statement written in sql """
raise Exception(B.EXCEPT_NOT_IMPLEMENT)
def insertRows(self, table, rows):
def insertRows(self, table, rows, job):
""" method to insert rows into a database
the rows will be interpreted by the ddl of the component
"""
job = basic.program.Job.getInstance()
verify = -1+job.getDebugLevel("db_tool")
cmd = "INSERT INTO "+table+";"
header = ""

8
utils/dbsfile_tool.py

@ -25,13 +25,12 @@ class DbFcts(utils.db_abstract.DbFcts):
pass
def selectRows(self, table):
def selectRows(self, table, job):
""" method to select rows from a database
statement written in sql """
tdata = {}
dry = 0
# attr = self.getDbAttributes(table)
job = basic.program.Job.getInstance()
verify = -1+job.getDebugLevel("db_tool")
pattern = "s3a://{hostname}/data/{tenant}/mt/sandboxes/{job.par.usecae}/{job.par.workspace}/{outfile}/VR_+reg+/"
files = self.comp.composeFileClauses(pattern)
@ -51,10 +50,9 @@ class DbFcts(utils.db_abstract.DbFcts):
#tdata[B.DATA_NODE_DATA] = data
return tdata
def deleteRows(self, table):
def deleteRows(self, table, job):
""" method to delete rows from a database
statement written in sql """
job = basic.program.Job.getInstance()
dry = 0
verify = -1+job.getDebugLevel("db_tool")
cmd = "DELETE FROM "+table
@ -72,7 +70,7 @@ class DbFcts(utils.db_abstract.DbFcts):
print("select "+sql)
#self.comp.m.logInfo(cmd)
def insertRows(self, table, rows):
def insertRows(self, table, rows, job):
""" method to insert rows into a database
the rows will be interpreted by the ddl of the component
"""

9
utils/dbshive_tool.py

@ -26,13 +26,11 @@ class DbFcts(utils.db_abstract.DbFcts):
pass
def selectRows(self, table):
def selectRows(self, table, job):
""" method to select rows from a database
statement written in sql """
tdata = {}
dry = 0
# attr = self.getDbAttributes(table)
job = basic.program.Job.getInstance()
verify = -1+job.getDebugLevel("db_tool")
cmd = "SELECT "+",".join(self.comp.conf[B.DATA_NODE_DDL][table][B.DATA_NODE_HEADER])
cmd += " FROM "+table
@ -56,10 +54,9 @@ class DbFcts(utils.db_abstract.DbFcts):
tdata[B.DATA_NODE_DATA] = data
return tdata
def deleteRows(self, table):
def deleteRows(self, table, job):
""" method to delete rows from a database
statement written in sql """
job = basic.program.Job.getInstance()
dry = 0
verify = -1+job.getDebugLevel("db_tool")
cmd = "DELETE FROM "+table
@ -77,7 +74,7 @@ class DbFcts(utils.db_abstract.DbFcts):
print("select "+sql)
#self.comp.m.logInfo(cmd)
def insertRows(self, table, rows):
def insertRows(self, table, rows, job):
""" method to insert rows into a database
the rows will be interpreted by the ddl of the component
"""

3
utils/dbspark_tool.py

@ -23,11 +23,10 @@ class DbFcts(utils.db_abstract.DbFcts):
return out
def selectRows(self, table):
def selectRows(self, table, job):
""" method to select rows from a database
statement written in sql """
tdata = {}
job = basic.program.Job.getInstance()
verify = -1+job.getDebugLevel("db_tool")
cmd = "SELECT "+",".join(self.comp.conf[B.DATA_NODE_DDL][table][B.DATA_NODE_HEADER])
cmd += " FROM "+table+""+self.getWhere()+""+self.getOrder()

3
utils/file_abstract.py

@ -32,6 +32,9 @@ class FileFcts():
def __init__(self):
pass
def reset_TData(self, job):
pass
def setComp(self, job, comp=None):
self.job = job
self.comp = comp

19
utils/filejson_tool.py

@ -0,0 +1,19 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# ---------------------------------------------------------------------------------------------------------
# Author : Ulrich Carmesin
# Source : gitea.ucarmesin.de
# ---------------------------------------------------------------------------------------------------------
import basic.program
import utils.config_tool
import utils.file_abstract
import basic.constants as B
import utils.path_tool
import utils.file_tool
import utils.tdata_tool
class FileFcts(utils.file_abstract.FileFcts):
    """
    file-functions for json-artifacts - skeleton, no specific behaviour implemented yet;
    inherits everything (incl. reset_TData) from utils.file_abstract.FileFcts
    """
    def __init__(self):
        pass

19
utils/filelog_tool.py

@ -0,0 +1,19 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# ---------------------------------------------------------------------------------------------------------
# Author : Ulrich Carmesin
# Source : gitea.ucarmesin.de
# ---------------------------------------------------------------------------------------------------------
import basic.program
import utils.config_tool
import utils.file_abstract
import basic.constants as B
import utils.path_tool
import utils.file_tool
import utils.tdata_tool
class FileFcts(utils.file_abstract.FileFcts):
    """
    file-functions for log-artifacts - skeleton, no specific behaviour implemented yet;
    inherits everything (incl. reset_TData) from utils.file_abstract.FileFcts
    """
    def __init__(self):
        pass

19
utils/filexml_tool.py

@ -0,0 +1,19 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# ---------------------------------------------------------------------------------------------------------
# Author : Ulrich Carmesin
# Source : gitea.ucarmesin.de
# ---------------------------------------------------------------------------------------------------------
import basic.program
import utils.config_tool
import utils.file_abstract
import basic.constants as B
import utils.path_tool
import utils.file_tool
import utils.tdata_tool
class FileFcts(utils.file_abstract.FileFcts):
    """
    file-functions for xml-artifacts - skeleton, no specific behaviour implemented yet;
    inherits everything (incl. reset_TData) from utils.file_abstract.FileFcts
    """
    def __init__(self):
        pass

23
utils/gen_const.py

@ -0,0 +1,23 @@
#!/usr/bin/python
# ---------------------------------------------------------------------------------------------------------
# Author : Ulrich Carmesin
# Source : gitea.ucarmesin.de
# https://ucarmesin.de/index.php/it/testautomatisierung-fuer-daten-test/232-testfallgenerierung
# ---------------------------------------------------------------------------------------------------------
# keys identifying the generation formula of a test-value (used in utils/gen_tool.py)
KEY_RANDOM = "random"
KEY_LIST = "list"
KEY_VARNUM = "varnum"
KEY_VARSTR = "varstr"
KEY_VARDAT = "vardat"
# prefix marking the extended ("x") variants of the var-formulas
KEY_PREFIX_X = "x"
# separators inside a raw value-definition:
# "a, b, c" enumeration / "a .. b" numeric sector / "cat:domain" catalog-reference
VAL_DELIMITER = ","
VAL_SECTOR = " .. "
VAL_CATALOG = "cat"
# classes of generated boundary-/error-values (attributes of VAL_CLASSES in gen_tool)
CLS_MISFORMAT = "missformat"
CLS_NONE = "none"
CLS_EMPTY = "empty"
CLS_LESS = "less"
CLS_GREATER = "more"
# minimal count of necessary values for a formula-class
ATTR_MIN_COUNT = "mincount"

130
utils/gen_tool.py

@ -0,0 +1,130 @@
#!/usr/bin/python
# ---------------------------------------------------------------------------------------------------------
# Author : Ulrich Carmesin
# Source : gitea.ucarmesin.de
# https://ucarmesin.de/index.php/it/testautomatisierung-fuer-daten-test/232-testfallgenerierung
# ---------------------------------------------------------------------------------------------------------
import re
import utils.config_tool
import utils.path_const as P
import basic.constants as B
import basic.program
import utils.gen_const as G
import random
import basic.catalog
# classes of the x-variant formulas: which malformed/boundary variants are
# generated and the minimal count of necessary values per formula.
# NOTE: was "VAL_CLASSES: {" - a bare annotation that never bound the name,
# so every VAL_CLASSES lookup (e.g. in getMinCount) raised a NameError.
VAL_CLASSES = {
    "xvarnum": {
        G.CLS_MISFORMAT: "str",
        G.CLS_NONE: None,
        G.CLS_EMPTY: 0,
        G.CLS_LESS: True,
        G.CLS_GREATER: True,
        G.ATTR_MIN_COUNT: 5
    },
    # was a duplicate "xvarnum" key which silently replaced the entry above;
    # the "feb" misformat indicates this is the date-variant - TODO confirm
    "xvardat": {
        G.CLS_MISFORMAT: "str,feb",
        G.CLS_NONE: None,
        G.CLS_EMPTY: 0,
        G.CLS_LESS: True,
        G.CLS_GREATER: True,
        G.ATTR_MIN_COUNT: 6
    },
    "xvarstr": {
        G.CLS_MISFORMAT: "num,sym,koeln",
        G.CLS_NONE: None,
        G.CLS_EMPTY: 0,
        G.CLS_LESS: False,
        G.CLS_GREATER: False,
        G.ATTR_MIN_COUNT: 7
    }
}
def getCntElement(values, job):
    """
    counts how many elements the raw value-definition contains
    :param values: raw value-definition string
    :param job: actual job (unused, kept for signature-compatibility)
    :return: number of elements described by the definition
    """
    # a sector "a .. b" always describes its two boundaries
    if G.VAL_SECTOR in values:
        return 2
    # a delimited enumeration has one element per delimiter-separated part
    if G.VAL_DELIMITER in values:
        parts = values.split(G.VAL_DELIMITER)
        return len(parts)
    # a catalog-reference: fixed placeholder count of 5 elements - TODO confirm
    if G.VAL_CATALOG + ":" in values:
        placeholder = [0, 1, 2, 3, 4]
        return len(placeholder)
    # a single plain value
    return 1
def getValueList(values, count, job):
    """
    builds a list of exactly count values by cycling over the given values
    :param values: list of defined values (may be shorter or longer than count)
    :param count: number of values to produce
    :param job: actual job (unused, kept for signature-compatibility)
    :return: list of count values; empty list if no values are defined
    """
    if not values:
        # guard: the modulo below would raise ZeroDivisionError on an empty list
        return []
    # repeat the defined values cyclically until count elements are reached
    # (the former debug print of the result has been removed)
    return [values[i % len(values)] for i in range(count)]
def getMinCount(formula, values, job):
    """
    determines how many values are at least necessary to satisfy the formula
    :param formula: generation formula (random / list / x-variant)
    :param values: definition of value-list
    :param job: actual job
    :return: count of necessary values
    """
    if G.KEY_RANDOM in formula:
        # a random pick needs just a single value
        return 1
    if formula[0:1] == G.KEY_PREFIX_X:
        # x-variants: the configured minimum plus extra values per element,
        # doubled for each enabled boundary class (less/greater)
        elems = getCntElement(values, job)
        cls = VAL_CLASSES[formula]
        factor = 1
        if cls[G.CLS_LESS]:
            factor *= 2
        if cls[G.CLS_GREATER]:
            factor *= 2
        return cls[G.ATTR_MIN_COUNT] + factor * (elems - 1)
    if formula == G.KEY_LIST:
        # the plain list needs each defined element once
        return getCntElement(values, job)
    return 1
def getElemList(formula, values, count, job):
    """
    expands a raw value-definition into a list of up to count concrete values
    :param formula: generation formula (currently not used inside this function)
    :param values: raw value-definition - a numeric sector "a .. b", a delimited
        enumeration "a, b, c" or a catalog-reference "cat:domain"
    :param count: number of values to produce
    :param job: actual job, passed to the catalog-lookup
    :return: list of at most count values; empty if nothing could be expanded
    """
    out = []
    # expansion of the definition; stays empty if no pattern matches
    # (formerly unbound -> NameError in the while-loop below)
    temp = []
    verbose = False
    if verbose: print(str(values) + " , " + str(count))
    sector_regex = r"(.*)" + re.escape(G.VAL_SECTOR) + r"(.*)"
    delim_regex = r"(.*)" + re.escape(G.VAL_DELIMITER) + r"(.*)"
    catalog_regex = re.escape(G.VAL_CATALOG) + r":(.*)"
    if re.match(sector_regex, values):
        # numeric sector: enumerate start .. target inclusively (as strings)
        if verbose: print("match 1")
        res = re.search(sector_regex, values)
        start = res.group(1)
        target = res.group(2)
        if start.isdecimal() and target.isdecimal():
            for i in range(int(start), int(target)):
                temp.append(str(i))
        if target not in temp:
            temp.append(target)
        if verbose: print(str(start) + " - " + str(target) + " : " + str(temp))
    elif re.match(delim_regex, values):
        # delimited enumeration: split and strip each element
        if verbose: print("match 2")
        temp = values.split(G.VAL_DELIMITER)
        for i in range(0, len(temp)):
            temp[i] = temp[i].strip()
        if verbose: print(str(temp))
    elif re.match(catalog_regex, values):
        # catalog-reference: take the keys of the referenced domain
        res = re.search(catalog_regex, values)
        domain = res.group(1)
        catalog = basic.catalog.Catalog.getInstance()
        temp = catalog.getKeys(domain, job)
        if not isinstance(temp, list):
            temp = []
    # repeat the expansion until count values are collected, but at most a few
    # rounds as a safety-net against an endless loop
    retries = 3  # renamed from "max", which shadowed the builtin
    while len(temp) > 0 and len(out) < count:
        out += temp
        retries -= 1
        if retries < 0:
            break
    return out[0:count]

22
utils/i18n_tool.py

@ -6,6 +6,7 @@
# ---------------------------------------------------------------------------------------------------------
import utils.config_tool
import utils.path_const as P
import basic.constants as B
import basic.program
DEFAULT_LANGUAGE = "en"
@ -39,22 +40,18 @@ class I18n:
def getText(self, key, job=None):
"""
this function gets the text depending on language which is set in job.conf
:param domain:
:param key:
:param key: MUST GIVEN WITH (f"{CONST=}", ..
:return:
if not (isinstance(langue, str) and len(langue)):
raise Exception(EXP_KEY_MISSING, (domain, key))
if not (isinstance(key, str) and len(key)):
raise Exception(EXP_KEY_MISSING, (langue, key))
return self.cache[langue][key]
return self.cache[language][key]
"""
if job is None:
jon = basic.program.Job.getInstance()
job = basic.program.Job.getInstance()
if "language" in job.conf.confs:
language = job.conf.confs["language"]
else:
language = "en"
if language not in self.cache:
raise Exception(EXP_KEY_MISSING, (key))
print(key)
out = self.extractText(key)
key = self.extractKey(key)
if key in self.cache[language]:
@ -67,8 +64,11 @@ class I18n:
if job is None:
jon = basic.program.Job.getInstance()
out = []
out.append(self.extractText(key))
key = self.extractKey(key)
for language in self.cache:
if key not in self.cache[language]:
continue
out.append(self.cache[language][key])
return out
@ -82,9 +82,7 @@ class I18n:
else:
y = x
key = y
print("= in key " + y)
else:
print("!= in key")
y = key
return key

1
utils/path_const.py

@ -11,6 +11,7 @@ KEY_BACKUP = "backup"
KEY_REFFILE = "reffile"
KEY_TESTCASE = "tc"
KEY_TESTSUITE = "ts"
KEY_CATALOG = "catalog"
KEY_DEBUGNAME = "debugname"
KEY_LOGNAME = "logname"
KEY_BASIC = "basic"

24
utils/path_tool.py

@ -83,12 +83,13 @@ def composePattern(pattern, comp):
"""
job = basic.program.Job.getInstance()
verify = job.getDebugLevel(TOOL_NAME)
verbose = False
job.debug(verify, "composePattern " + pattern)
max=5
l = re.findall('\{.*?\}', pattern)
job.debug(verify, l)
for pat in l:
print(str(max) + ": " + pattern + ": " + pat)
if verbose: print(str(max) + ": " + pattern + ": " + pat)
pit = getKeyValue(pat[1:-1], comp)
job.debug(verify, str(max) + ": " + pattern + ": " + pat + ": " + pit)
pattern = pattern.replace(pat, pit)
@ -186,9 +187,10 @@ def extractPath(pathtyp, path):
"""
job = basic.program.Job.getInstance()
patterlist = extractPattern(pathtyp)
verbose = False
work = path
i = 0
print("-- extractPatternList -- " + pathtyp + ":" + str(patterlist))
if verbose: print("-- extractPatternList -- " + pathtyp + ":" + str(patterlist))
for p in patterlist:
if len(p) < 1 : continue
delim = p[0]
@ -199,13 +201,13 @@ def extractPath(pathtyp, path):
nextdelim = ""
else:
nextdelim = patterlist[i+1][0]
print("xPath delim " + delim + " " + str(len(delim)) + ", " + nextdelim + " work " + work)
if verbose: print("xPath delim " + delim + " " + str(len(delim)) + ", " + nextdelim + " work " + work)
work = work[len(delim):]
print("xPath key " + key + " i " + str(i) + " work " + work)
if verbose: print("xPath key " + key + " i " + str(i) + " work " + work)
if val is not None:
print("val not none " + val)
if verbose: print("val not none " + val)
if val in work:
print("val ok")
if verbose: print("val ok")
work = work.replace(val, "")
elif "time" in key and "job.par" in key:
prop = ""
@ -214,25 +216,25 @@ def extractPath(pathtyp, path):
else:
prop = work
key = key[8:]
print("setprop " + key + " = " + prop)
if verbose: print("setprop " + key + " = " + prop)
if hasattr(job.par, key): delattr(job.par, key)
setattr(job.par, key, val)
else:
print("val not not ok " + val + " zu " + key)
if verbose: print("val not not ok " + val + " zu " + key)
elif "job.par" in key:
prop = ""
if i < len(patterlist) - 1:
print("job.par nextdelim " + nextdelim)
if verbose: print("job.par nextdelim " + nextdelim)
prop = work[0:work.index(nextdelim)]
else:
prop = work
key = key[8:]
print("setprop " + key + " = " + prop)
if verbose: print("setprop " + key + " = " + prop)
if hasattr(job.par, key): delattr(job.par, key)
setattr(job.par, key, prop)
work = work.replace(prop, "")
else:
print("val is none " + key)
if verbose: print("val is none " + key)
i = i +1

703
utils/tdata_tool.py

@ -4,268 +4,334 @@
# Author : Ulrich Carmesin
# Source : gitea.ucarmesin.de
# ---------------------------------------------------------------------------------------------------------
"""
the issue of this tool is to transform extern data to the internal structure and the internal structure into extern data - i.e. mostly test-results.
* * * * * * * *
the testdata have several elements
* parameter (-td --tdata) : to identify which testdata should be loaded
* source (flaskdb: dbname / dir: filename) : always structured in a table (easy to specify) with columns
* node : where the rows are
* action : what should be done - default insert
+ fields : dates in relation of a reference<day or a formula
* interface : configured in components and used in comparison with attributes to each field:
* ignored - if it should be ignored on differences, it is necessary on technical ID-fields
* id-field - necessary
* * * * * * * *
the testdata itself which are written in different artifacts of modern applications are mostly stored as tree
- so as xml, json, always with plain data in the leaf. So the intern structure should be also a tree - in python: dictionary.
"""
import os.path
import inspect
import basic.program
import utils.config_tool
import utils.file_tool
import basic.constants as B
import utils.data_const as D
import utils.path_const as P
import utils.path_tool
import utils.date_tool
import basic.step
import utils.i18n_tool
import re
TOOL_NAME = "tdata_tool"
""" name of the tool in order to switch debug-info on """
TDATA_NODES = [ D.CSV_BLOCK_OPTION ]
list_blocks = {} # lists of aliases
def getTdataAttr():
def getTestdata(job=None):
"""
get the testdata from one of the possible sources
for the testcase resp testsuite of the job
:return:
"""
if job is None:
job = basic.program.Job.getInstance()
out = {} #
out[D.ATTR_SRC_TYPE] = D.DATA_SRC_DIR
print("---getTdataAttr")
print(vars(job.par))
if hasattr(job.par, B.PAR_TESTCASE):
out[D.ATTR_SRC_NAME] = getattr(job.par, B.PAR_TESTCASE)
elif hasattr(job.par, B.PAR_TESTSUITE):
out[D.ATTR_SRC_NAME] = getattr(job.par, B.PAR_TESTSUITE)
for p in [D.ATTR_SRC_TYPE, D.ATTR_SRC_DATA, D.ATTR_SRC_NAME]:
# out[p] = ""
if hasattr(job.par, p):
out[p] = getattr(job.par, p)
return out
if "testcase" in job.program:
return collectTestdata(B.PAR_TESTCASE, getattr(job.par, B.PAR_TESTCASE), job)
else:
return collectTestdata(B.PAR_TESTSUITE, getattr(job.par, B.PAR_TESTSUITE), job)
def getTestdata():
def collectTestdata(gran, testentity, job):
"""
get the testdata from one of the possible soources
* dir: each file in the specific testarchiv
* csv: specific file
* db: specific db with a testcase-catalogue
collects the testdata from kind of the possible sources
for the testcase resp testsuite
:return:
"""
job = basic.program.Job.getInstance()
#reftyp = getattr(job.par, "tdtyp")
#source = getattr(job.par, "tdsrc")
#criteria = getattr(job.par, "tdname")
tdata = getTdataAttr() # {"reftyp": reftyp, "source": source, "criteria": criteria}
print(tdata)
if tdata[D.ATTR_SRC_TYPE] == "flaskdb":
# read data-structure with sourcename
# connect to source
# select with all data with datastructure
job.m.setInfo("Test-Data readed from " + tdata[D.ATTR_SRC_TYPE] + " for " + tdata[D.ATTR_SRC_NAME])
elif tdata[D.ATTR_SRC_TYPE] == D.DATA_SRC_CSV:
# read file in testdata
job.m.logInfo("Test-Data readed from " + tdata[D.ATTR_SRC_TYPE] + " for " + tdata[D.ATTR_SRC_NAME])
elif tdata[D.ATTR_SRC_TYPE] == D.DATA_SRC_DIR:
path = os.path.join(job.conf.getJobConf(B.SUBJECT_PATH+":"+B.ATTR_PATH_TDATA), tdata[D.ATTR_SRC_NAME])
filename = os.path.join(path , "testspec.csv")
data = getCsvSpec(job.m, filename, D.CSV_SPECTYPE_DATA)
for k in data:
tdata[k] = data[k]
if (k == D.CSV_BLOCK_OPTION):
for p in data[k]:
setattr(job.par, p, data[k][p])
files = utils.file_tool.getFiles(job.m, path, "table_", None)
setBlockLists(job)
if gran == B.PAR_TESTCASE:
basispath = utils.path_tool.rejoinPath(job.conf.confs[B.SUBJECT_PATH][B.ATTR_PATH_TDATA], testentity)
pathname = utils.config_tool.getConfigPath(P.KEY_TESTCASE, getattr(job.par, B.PAR_TESTCASE), "", job)
if gran == B.PAR_TESTSUITE:
basispath = utils.path_tool.rejoinPath(job.conf.confs[B.SUBJECT_PATH][B.ATTR_PATH_TDATA], testentity)
pathname = utils.config_tool.getConfigPath(P.KEY_TESTSUITE, getattr(job.par, B.PAR_TESTSUITE), "", job)
if pathname[-3:] == D.DFILE_TYPE_CSV:
tdata = getCsvSpec(job.m, pathname, D.CSV_SPECTYPE_DATA)
else:
tdata = utils.file_tool.readFileDict(pathname, job.m)
# get explicit specdata of includes
if D.CSV_BLOCK_IMPORT in tdata:
for pathname in tdata[D.CSV_BLOCK_IMPORT]:
pathname = utils.path_tool.rejoinPath(pathname)
if job.conf.confs[B.SUBJECT_PATH][B.ATTR_PATH_TDATA] not in pathname:
pathname = utils.path_tool.rejoinPath(basispath, pathname)
if pathname[-3:] == D.DFILE_TYPE_CSV:
data = getCsvSpec(job.m, pathname, D.CSV_SPECTYPE_DATA)
else:
data = utils.file_tool.readFileDict(pathname, job.m)
for table in data[D.CSV_BLOCK_TABLES]:
if table in tdata[D.CSV_BLOCK_TABLES]:
print("Fehler")
tdata[D.CSV_BLOCK_TABLES][table] = data[D.CSV_BLOCK_TABLES][table]
# get implicit specdata of spec-library
for prefix in list_blocks[D.DFILE_TABLE_PREFIX]:
files = utils.file_tool.getFiles(job.m, basispath, prefix, None)
if len(files) < 0:
continue
for f in files:
print(f)
filename = os.path.join(path, f)
data = readCsv(job.m, filename, None)
table = f[6:-4]
print(filename+" "+table)
if B.DATA_NODE_TABLES not in tdata:
tdata[B.DATA_NODE_TABLES] = {}
tdata[B.DATA_NODE_TABLES][table] = data[B.DATA_NODE_TABLES][table]
if f in tdata[D.CSV_BLOCK_TABLES]:
continue
pathname = utils.path_tool.rejoinPath(basispath, f)
if pathname[-3:] == D.DFILE_TYPE_CSV:
data = getCsvSpec(job.m, pathname, D.CSV_SPECTYPE_DATA)
else:
job.m.setFatal("test-Data: reftyp " + tdata[D.ATTR_SRC_TYPE] + " is not implemented")
data = utils.file_tool.readFileDict(pathname, job.m)
for table in data[D.CSV_BLOCK_TABLES]:
if table in tdata[D.CSV_BLOCK_TABLES]:
print("Fehler")
tdata[D.CSV_BLOCK_TABLES][table] = data[D.CSV_BLOCK_TABLES][table]
# fill the options into job-parameter
for p in tdata[D.CSV_BLOCK_OPTION]:
setattr(job.par, p, tdata[D.CSV_BLOCK_OPTION][p])
return tdata
def getCsvSpec(msg, filename, type):
def setBlockLists(job):
for block in D.LIST_BLOCK_CONST + D.LIST_ATTR_CONST + D.LIST_DFNAME_CONST:
list = utils.i18n_tool.I18n.getInstance().getAliasList(block+"='"+eval("D."+block)+"'")
#list.append(eval("D."+block))
list_blocks[eval("D." + block)] = []
for x in list:
list_blocks[eval("D." + block)].append(x.lower())
def readCsv(msg, filename, comp, aliasNode="", job=None):
if job is None:
job = basic.program.Job.getInstance()
lines = utils.file_tool.readFileLines(filename, msg)
print("readCsv "+filename)
return parseCsv(msg, filename, lines, comp, aliasNode, job)
def parseCsv(msg, filename, lines, comp, aliasNode="", job=None):
if job is None:
job = basic.program.Job.getInstance()
if len(list_blocks) < 1:
setBlockLists(job)
tdata = {}
if len(aliasNode) < 1:
print(str(list_blocks))
aliasNode = extractAliasNode(filename, comp, job)
if len(aliasNode) > 3:
tdata[D.DATA_ATTR_ALIAS] = aliasNode
return parseCsvSpec(msg, lines, D.CSV_SPECTYPE_DATA, tdata, job)
def extractAliasNode(filename, comp, job):
basename = os.path.basename(filename)[0:-4]
for prefix in list_blocks[D.DFILE_TABLE_PREFIX]:
if basename.find(prefix) == 0:
basename = basename[len(prefix):]
if comp is None:
return ""
if B.TOPIC_NODE_DB in comp.conf[B.SUBJECT_ARTS] and basename in comp.conf[B.DATA_NODE_DDL]:
return B.DATA_NODE_TABLES+":"+basename
return ""
def getCsvSpec(msg, filename, ttype, job=None):
"""
get data from a csv-file
a = field[0] delimited by :
a) data : like a table with data-array of key-value-pairs
a_0 is keyword [option, step, CSV_HEADER_START ]
a_0 : { a_1 : { f_1 : v_1, .... } # option, step
a_0 : { .. a_n : { _header : [ .. ], _data : [ rows... ] # table, node
b) tree : as a tree - the rows must be unique identified by the first column
a_0 is keyword in CSV_HEADER_START
a_0 : { .. a_n : { _header : [ fields.. ], _data : { field : value }
c) keys : as a tree - the rows must be unique identified by the first column
a_0 is keyword in CSV_HEADER_START
a_1 ... a_n is key characterized by header-field like _fk* or _pk*
a_0 : { .. a_n : { _keys : [ _fpk*.. ] , _header : [ fields.. ], _data : { pk_0 : { ... pk_n : { field : value }
d) conf:
_header : [ field_0, ... ]
{ field_0 : { attr_0 : val_0, .. }, field_1 : { ... }, ... }
reads the specification from a csv-file and maps it into the internal data-structure
:param msg:
:param filename:
:param type:
:param job:
:return:
"""
if job is None:
job = basic.program.Job.getInstance()
lines = utils.file_tool.readFileLines(filename, msg)
return parseCsvSpec(msg, lines, type)
tdata = {} # the result
return parseCsvSpec(msg, lines, ttype, tdata, job)
def parseCsvSpec(msg, lines, ttype, tdata, job=None):
    """
    Parse csv lines into the internal testdata structure.
    The first field of each line selects the block: head, option, step,
    import or table; table header lines are followed by data rows.
    This body removes the stale pre-refactoring lines that were interleaved
    with the current implementation (references to undefined `data`,
    `comps`, `headerFields`, duplicated branches, dead string-quoted code).
    :param msg: message object for logging
    :param lines: raw csv lines
    :param ttype: one of the D.CSV_SPECTYPE_* constants
    :param tdata: dict the parsed content is merged into (also returned)
    :param job: optional job, defaults to the singleton instance
    :return: tdata
    """
    if job is None:
        job = basic.program.Job.getInstance()
    if len(list_blocks) < 1:
        setBlockLists(job)
    status = "start"
    verbose = False
    tableAttr = {}   # attributes (date, count, alias, key ...) for the next table
    tableDict = {}   # the current table node inside tdata
    h = []           # the current path of the table inside tdata
    for l in lines:
        if verbose: print("lines "+l)
        fields = splitFields(l, D.CSV_DELIMITER, job)
        # an empty line (or pure comment) finishes a running block
        if (len(fields) < 1) or (len(l.strip().replace(D.CSV_DELIMITER, "")) < 1):
            status = "start"
            continue
        if (fields[0][0:1] == "#"):
            continue
        a = fields[0].lower().split(":")
        if verbose: print(str(a)+" -- "+str(fields))
        # table attributes relate to the table that follows them
        tableAttr = setTableAttribute(tableAttr, a[0], fields[1] if len(fields) > 1 else "", job)
        if (tableAttr["_hit"]):
            status = "TABLE_ALIAS"
            continue
        if (a[0].lower() in list_blocks[D.CSV_BLOCK_HEAD]):
            if verbose: print("head "+l)
            setTdataLine(tdata, fields, D.CSV_BLOCK_HEAD, job)
            status = "start"
            continue
        elif (a[0].lower() in list_blocks[D.CSV_BLOCK_OPTION]):
            if verbose: print("option " + l)
            setTdataLine(tdata, fields, D.CSV_BLOCK_OPTION, job)
            status = "start"
            continue
        elif (a[0].lower() in list_blocks[D.CSV_BLOCK_STEP]):
            if verbose: print("step "+l)
            step = basic.step.parseStep(job, fields)
            if D.CSV_BLOCK_STEP not in tdata:
                tdata[D.CSV_BLOCK_STEP] = []
            tdata[D.CSV_BLOCK_STEP].append(step)
            status = "start"
            continue
        elif (a[0].lower() in list_blocks[D.CSV_BLOCK_IMPORT]):
            if verbose: print("includes " + l)
            if D.CSV_BLOCK_IMPORT not in tdata:
                tdata[D.CSV_BLOCK_IMPORT] = []
            tdata[D.CSV_BLOCK_IMPORT].append(fields[1])
            status = "start"
            continue
        elif (a[0].lower() in list_blocks[D.CSV_BLOCK_TABLES]):
            if verbose: print("tables "+l)
            h = a
            h[0] = B.DATA_NODE_TABLES
            if ttype == D.CSV_SPECTYPE_CONF:
                del h[0]
            tableDict = getTdataContent(msg, tdata, h)
            setTableHeader(tableDict, tableAttr, fields, ttype, job)
            status = D.CSV_SPECTYPE_DATA
        elif (status == D.CSV_SPECTYPE_DATA):
            tableDict = getTdataContent(msg, tdata, h)
            if verbose: print("setTableData "+str(h)+" "+str(tableDict))
            setTableData(tableDict, fields, ttype, job)
        elif (status == "TABLE_ALIAS") and D.DATA_ATTR_ALIAS in tdata:
            # data row directly after an alias attribute: open the aliased table
            alias = tdata[D.DATA_ATTR_ALIAS]
            b = alias.split(":")
            h = [B.DATA_NODE_TABLES] + b
            tableDict = getTdataContent(msg, tdata, h)
            tableDict[D.DATA_ATTR_ALIAS] = alias
            fields = [alias] + fields
            setTableHeader(tableDict, tableAttr, fields, ttype, job)
            status = D.CSV_SPECTYPE_DATA
    if ttype == D.CSV_SPECTYPE_CONF:
        # for configurations: collect the remaining field names as header
        header = []
        for k in tdata:
            if k in D.LIST_DATA_ATTR:
                continue
            if B.DATA_NODE_DATA in tdata[k]:
                tdata[k].pop(B.DATA_NODE_DATA)
            for f in tdata[k]:
                if f in [B.DATA_NODE_HEADER, "_hit"] + D.LIST_DATA_ATTR:
                    continue
                if len(f) < 1:
                    break
                header.append(f)
            tdata[k][B.DATA_NODE_HEADER] = header
            header = []
    if B.DATA_NODE_TABLES in tdata and B.DATA_NODE_TABLES in tdata[B.DATA_NODE_TABLES]:
        # flatten an accidentally doubled _tables level
        for k in tdata[B.DATA_NODE_TABLES][B.DATA_NODE_TABLES]:
            if k in tdata[B.DATA_NODE_TABLES]:
                if verbose: print("Error")
            else:
                tdata[B.DATA_NODE_TABLES][k] = tdata[B.DATA_NODE_TABLES][B.DATA_NODE_TABLES][k]
        tdata[B.DATA_NODE_TABLES].pop(B.DATA_NODE_TABLES)
    return tdata
def setTableHeader(tableDict, tableAttr, fields, ttype, job):
    """
    Store the column names of a header line and the collected table
    attributes in tableDict, and prepare its data-node depending on ttype.
    This body removes the stale old-version lines (`continue`/`elif status`
    outside any loop, references to undefined `tableDate`/`data`/`h`,
    comparisons against the builtin `type`) that made the block invalid.
    :param tableDict: table node inside the testdata structure
    :param tableAttr: attributes collected before the header line
    :param fields: csv fields; fields[1:] are the column names
    :param ttype: one of the D.CSV_SPECTYPE_* constants
    :param job:
    :return: tableDict
    """
    header = []
    for i in range(1, len(fields)):
        header.append(fields[i].strip())
    tableDict[B.DATA_NODE_HEADER] = header
    for attr in tableAttr:
        tableDict[attr] = tableAttr[attr]
    # prepare the sub-structure for the row data
    if ttype == D.CSV_SPECTYPE_TREE:
        tableDict[B.DATA_NODE_DATA] = {}
    elif ttype == D.CSV_SPECTYPE_KEYS:
        tableDict[D.CSV_NODETYPE_KEYS] = {}
        # the key column defaults to the first one unless an attribute names it
        tableDict[D.DATA_ATTR_KEY] = 1
        if D.DATA_ATTR_KEY in tableAttr:
            tableDict[D.DATA_ATTR_KEY] = header.index(tableAttr[D.DATA_ATTR_KEY]) + 1
    else:
        tableDict[B.DATA_NODE_DATA] = []
    return tableDict
def setTableData(tableDict, fields, ttype, job):
    """
    Map one csv data line onto the columns of tableDict and store the row.
    fields[0] may carry a component list "comp:artifact,..."; for DATA tables
    with an alias the alias is prepended when the line has no component column.
    This body removes the stale old-version lines (undefined `header`,
    `comps`, `headerFields`, duplicated type-branches) and the debug prints.
    :param tableDict: table node (header already set by setTableHeader)
    :param fields: csv fields of the data line
    :param ttype: one of the D.CSV_SPECTYPE_* constants
    :param job:
    :return: tableDict
    """
    row = {}
    if ttype == D.CSV_SPECTYPE_DATA and ":" not in fields[0] and D.DATA_ATTR_ALIAS in tableDict:
        fields = [tableDict[D.DATA_ATTR_ALIAS]] + fields
    i = 1
    for f in tableDict[B.DATA_NODE_HEADER]:
        row[f] = fields[i].strip()
        i += 1
    if ttype == D.CSV_SPECTYPE_DATA:
        # collect the component references of the row and of the whole table
        if B.ATTR_DATA_COMP in tableDict:
            tcomps = tableDict[B.ATTR_DATA_COMP]
        else:
            tcomps = {}
        row[B.ATTR_DATA_COMP] = {}
        for c in fields[0].split(","):
            a = c.split(":")
            tcomps[a[0]] = a[1]
            row[B.ATTR_DATA_COMP][a[0]] = a[1].strip()
        tableDict[B.DATA_NODE_DATA].append(row)
        tableDict[B.ATTR_DATA_COMP] = tcomps
    elif ttype == D.CSV_SPECTYPE_KEYS:
        tableDict[D.CSV_NODETYPE_KEYS][fields[tableDict[D.DATA_ATTR_KEY]].strip()] = row
    elif ttype == D.CSV_SPECTYPE_CONF:
        tableDict[fields[1]] = row
    return tableDict
def mergeTableComponents(comps, rowComps):
    """
    Merge the "key:value,key2:value2" pairs of rowComps into comps.
    :param comps: dict of component references, modified in place
    :param rowComps: comma-separated "comp:artifact" pairs
    :return: comps
    """
    for pair in rowComps.split(","):
        parts = pair.split(":")
        comps[parts[0]] = parts[1]
    return comps
def setTableAttribute(tableAttr, key, val, job):
    """
    Record val under the matching data-attribute when key is one of the
    configured (possibly translated) aliases of that attribute.
    The marker "_hit" tells the caller whether the line was an attribute line.
    :return: tableAttr
    """
    lowered = key.lower()
    for attr in D.LIST_DATA_ATTR:
        if lowered in list_blocks[attr]:
            tableAttr[attr] = val.strip()
            tableAttr["_hit"] = True
            return tableAttr
    tableAttr["_hit"] = False
    return tableAttr
def setTabContent(msg, data, tabledata, path):
    """
    Ensure that the nested dict levels 2..4 of path exist under data.
    NOTE(review): tabledata is accepted but never stored here — presumably
    legacy; verify against setTdataContent, which does store it.
    """
    if len(path) < 2:
        return
    node = data[path[0]]
    for key in path[1:4]:
        if key not in node:
            node[key] = {}
        node = node[key]
def setTdataLine(tdata, fields, block, job):
    """
    sets field(s) into tdata as a key-value-pair
    additional fields are concatenated to an internally separated list
    :param tdata: testdata structure, modified in place
    :param fields: csv fields; fields[0] is "block:key", the last field is the
        line's trailing rest and is excluded
    :param block: normalized block keyword replacing the first path element
    :param job:
    :return: tdata
    """
    path = fields[0].lower().split(":")
    path[0] = block  # normalized key
    joined = "".join(D.INTERNAL_DELIMITER + f for f in fields[1:len(fields) - 1])
    if len(joined) > len(D.INTERNAL_DELIMITER):
        joined = joined[len(D.INTERNAL_DELIMITER):]
    setTdataContent(job.m, tdata, joined, path)
    return tdata
def setTdataContent(msg, data, tabledata, path):
    """
    Store tabledata at the nested position of data addressed by path.
    This body repairs the block: a git diff hunk header ("@ -274,155 ...")
    was embedded mid-function and the 3-level branch body was missing; the
    branches are reconstructed to mirror the parallel getTdataContent.
    :param msg: message object (unused here, kept for interface symmetry)
    :param data: testdata structure, modified in place
    :param tabledata: value to store
    :param path: list of 1..4 keys
    """
    setTdataStructure(msg, data, path)
    if len(path) == 1:
        data[path[0]] = tabledata
    elif len(path) == 2:
        data[path[0]][path[1]] = tabledata
    elif len(path) == 3:
        data[path[0]][path[1]][path[2]] = tabledata
    elif len(path) == 4:
        data[path[0]][path[1]][path[2]][path[3]] = tabledata
def getTabContent(msg, data, path):
    # NOTE(review): despite the "get" name this function only initializes the
    # nested dict levels and falls through, returning None implicitly —
    # presumably the returning lookup lines were lost (compare the parallel
    # getTdataContent below, which does return the node). TODO confirm.
    if len(path) >= 2 and path[1] not in data[path[0]]:
        data[path[0]][path[1]] = {}
    if len(path) >= 3 and path[2] not in data[path[0]][path[1]]:
        data[path[0]][path[1]][path[2]] = {}
    if len(path) >= 4 and path[3] not in data[path[0]][path[1]][path[2]]:
        data[path[0]][path[1]][path[2]][path[3]] = {}
def getTdataContent(msg, data, path):
    """
    Return the node of data addressed by path (1..4 keys), creating the
    intermediate dict levels if necessary; None for other path lengths.
    """
    setTdataStructure(msg, data, path)
    if len(path) < 1 or len(path) > 4:
        return None
    node = data
    for key in path:
        node = node[key]
    return node
def readCsv(msg, filename, comp, aliasNode=""):
    """
    Read a csv-file and parse it into the internal testdata structure.
    :param msg: message object for logging
    :param filename: path of the csv-file
    :param comp: component the file belongs to, may be None
    :param aliasNode: optional "_tables:<name>" path of the table
    :return: parsed testdata structure
    """
    lines = utils.file_tool.readFileLines(filename, msg)
    print("readCsv "+filename)
    print(lines)
    # fixed: an unreachable "return None" followed this return
    return parseCsv(msg, filename, lines, comp, aliasNode)
def parseCsv(msg, filename, lines, comp, aliasNode=""):
    # NOTE(review): this is the second definition of parseCsv in the module;
    # being defined later, it overrides the newer job-aware variant above.
    # It looks like stale pre-refactoring code left over from a merge —
    # TODO confirm and remove.
    job = basic.program.Job.getInstance()
    verify = -4+job.getDebugLevel(TOOL_NAME)
    job.debug(verify, "# # # # # # # # parseCsv " + filename + " :" + str(lines))
    # parser state: 0 = outside a table, 1 = attribute read, 2 = header read, 3 = data rows
    fields = []
    nodes = []
    columns = []
    output = {}
    state = 0
    data = {}
    tableDict = {}
    tableDate = ""
    tableCnt = 0
    cnt = 0
    basename = os.path.basename(filename)[0:-4]  # filename without ".csv"
    startCols = 1
    for line in lines:
        fields = line.split(';')
        testline = line.replace(";", "")
        a = fields[0].split(':')
        job.debug(verify, str(state) + " line " + line + " :" + str(len(fields)) + ": " + str(fields))
        if len(testline) < 2 and state < 1:
            state = 0
        elif a[0].lower() == D.DATA_ATTR_DATE:
            # table attribute: reference date of the following table
            tableDate = fields[1]
            state = 1
        elif a[0].lower() == D.DATA_ATTR_COUNT:
            # table attribute: expected row count of the following table
            tableCnt = fields[1]
            state = 1
        elif a[0].lower() in D.CSV_HEADER_START or \
                (comp is not None and state == 1
                 and isCompTableFile(comp, filename)):
            # header line: initialize the table node and collect the column names
            state = 2
            columns = []
            h = a
            if len(h) < 2 and comp is not None:
                a = ["table", basename]
                h = a
                startCols = 0
            cnt = len(fields)
            job.debug(verify, str(state) + " cnt " + str(cnt))
            data[B.DATA_NODE_TABLES] = {}
            h[0] = B.DATA_NODE_TABLES
            if not aliasNode.isspace() and len(aliasNode) > 3:
                struct = aliasNode.split(":")
                for x in struct:
                    if len(x) > 2:
                        nodes.append(x)
                job.debug(verify, str(state) + " nodes " + str(nodes))
            elif len(h) > 1:
                for i in range(1, len(h)):
                    nodes.append(h[i])
                job.debug(verify, str(state) + " nodes " + str(nodes))
            tableDict = getTabContent(msg, data, h)
            # NOTE(review): getTabContent (above) returns None as written —
            # this path presumes a returning variant; verify before reuse
            tableDict[B.ATTR_DATA_COMP] = {}
            if len(tableDate) > 6:
                tableDict[D.DATA_ATTR_DATE] = tableDate
            if int(tableCnt) > 0:
                tableDict[D.DATA_ATTR_COUNT] = tableCnt
            j = 0
            for i in range(1, cnt):
                if fields[i][0:1] == "_":
                    # leading "_"-columns are technical and shift the data columns
                    startCols += 1
                    continue
                job.debug(verify, str(i) + " cnt " + str(fields[i]))
                if len(fields[i]) > 0:
                    columns.append(fields[i])
                    j = j + 1
            cnt = j
            tableDict[B.DATA_NODE_HEADER] = columns
            job.debug(verify, str(state) + " " + str(cnt) + " cols " + str(columns))
        elif state >= 2 and len(testline) > 2:
            # data line: map the fields onto the collected columns
            job.debug(verify, str(state) + " " + str(len(testline)))
            tableDict = getTabContent(msg, data, h)
            state = 3
            row = {}
            print(line)
            if startCols > 0:
                row[B.ATTR_DATA_COMP] = {}
                row[B.ATTR_DATA_COMP][a[0]] = a[1]
                tableDict[B.ATTR_DATA_COMP][a[0]] = a[1]
            for i in range(startCols, cnt+startCols):
                print("for "+str(i)+" "+str(len(row))+" "+str(startCols)+" "+str(len(fields)))
                print(str(fields[i]))
                if i >= len(columns)+startCols:
                    break
                row[columns[i-startCols]] = fields[i]
            job.debug(verify, str(state) + " row " + str(row))
            if B.DATA_NODE_DATA not in tableDict:
                tableDict[B.DATA_NODE_DATA] = []
            tableDict[B.DATA_NODE_DATA].append(row)
            setTabContent(msg, data, tableDict, h)
        elif state == 3:
            # blank line after data rows closes the table
            job.debug(verify, "structure " + str(state) + ": " + str(nodes))
            state = 0
    # NOTE(review): no return statement — the collected "data" dict is
    # discarded and callers receive None; presumably truncated. TODO confirm.
def setTdataStructure(msg, data, path):
    """
    Ensure that nested dicts exist in data for the first four path segments.
    :param msg: message object (unused here, kept for interface symmetry)
    :param data: testdata structure, modified in place
    :param path: list of keys
    :return: data
    """
    node = data
    for key in path[:4]:
        if key not in node:
            node[key] = {}
        node = node[key]
    return data
def setSubnode(i, nodes, data, tree):
    """
    Recursively hang data below tree along nodes[i:], creating intermediate
    dicts as needed; the payload ends up under the _data node.
    :return: tree
    """
    print("setSubnode " + str(i) + ": " + ": " + str(tree))
    if i >= len(nodes):
        print("setSubnode a " + str(i))
        tree[B.DATA_NODE_DATA] = data
        return tree
    key = nodes[i]
    if tree is not None and key in tree.keys():
        print("setSubnode b " + str(i))
        tree[key] = setSubnode(i + 1, nodes, data, tree[key])
    else:
        print("setSubnode c " + str(i))
        tree[key] = setSubnode(i + 1, nodes, data, {})
    return tree
def splitFields(line, delimiter, job):
    """
    Split a csv line into fields, stopping at an inline comment field
    (starting with "#") and stripping surrounding double quotes.
    :return: list of field strings
    """
    result = []
    for raw in line.split(delimiter):
        if raw[0:1] == "#":
            break
        if re.match(r"^\"(.*)\"$", raw):
            raw = raw[1:-1]
        result.append(raw)
    return result
def getDataStructure(comp):
    """Get the data-structure from the vml in the component-folder (stub: only logs)."""
    job = basic.program.Job.getInstance()
    level = job.getDebugLevel(TOOL_NAME) - 1
    job.debug(level, "getDataStructure " + comp)
def normalizeDataRow(dstruct, xpathtupel, row, referencedate):
    """
    Normalize the data of the row if necessary; the raw value is to be kept
    in a new field with the _raw suffix (stub: only logs).
    """
    job = basic.program.Job.getInstance()
    level = job.getDebugLevel(TOOL_NAME) - 1
    # NOTE(review): the concatenation assumes row is a str — TODO confirm callers
    job.debug(level, "calcDataRow " + row)
def writeCsvData(filename, tdata, comp, job):
    """
    Serialize all tables of tdata into one csv-file for the documentation
    of the test-run.
    :param filename: target path
    :param tdata: testdata structure with a _tables node
    :param comp: component whose message object is used for writing
    :param job:
    """
    chunks = []
    if B.DATA_NODE_TABLES in tdata:
        for table in tdata[B.DATA_NODE_TABLES]:
            chunks.append(buildCsvData(tdata[B.DATA_NODE_TABLES][table], table, job))
            chunks.append("\n")
    utils.file_tool.writeFileText(comp.m, filename, "".join(chunks))
def buildCsvData(tdata, table, job=None):
    """
    writes the testdata into a csv-text for documentation of the test-run
    This body repairs the block: two interleaved def lines (old 3-arg and new
    3-arg+job signatures), a diff hunk header inside the docstring and the
    stale compColumn/row[1:] logic of the old version are removed.
    :param tdata: table node with _header, _data and optional attributes
    :param table: name of the table (written into the header line)
    :param job:
    :return: csv text of the table
    """
    text = ""
    # table attributes precede the header line
    for k in [D.DATA_ATTR_DATE, D.DATA_ATTR_COUNT]:
        if k in tdata:
            text += k+";"+str(tdata[k])+"\n"
    header = utils.i18n_tool.I18n.getInstance().getText(f"{B.DATA_NODE_TABLES=}", job)+":"+table
    for f in tdata[B.DATA_NODE_HEADER]:
        header += D.CSV_DELIMITER+f
    text += header + "\n"
    for r in tdata[B.DATA_NODE_DATA]:
        row = ""
        # first column: the component references of the row, comma-separated
        if B.ATTR_DATA_COMP in r:
            for k in r[B.ATTR_DATA_COMP]:
                row += ","+k+":"+r[B.ATTR_DATA_COMP][k]
            row = row[1:]
        for f in tdata[B.DATA_NODE_HEADER]:
            if f in r:
                row += D.CSV_DELIMITER+str(r[f])
            else:
                row += D.CSV_DELIMITER
        text += row
        text += "\n"
    return text
def buildCsvSpec(tdata, job=None):
    """
    Serialize a specification testdata structure (head, option, step and
    table blocks) back into csv text.
    This body removes the stale old writeCsvData definition that was
    interleaved with this function by a merge.
    :param tdata: internal testdata structure
    :param job:
    :return: csv text
    """
    text = ""
    # fixed: the guard checked D.CSV_BLOCK_IMPORT although the body reads
    # D.CSV_BLOCK_HEAD, raising KeyError when no head entries exist
    if D.CSV_BLOCK_HEAD in tdata:
        for k in tdata[D.CSV_BLOCK_HEAD]:
            text += utils.i18n_tool.I18n.getInstance().getText(f"{D.CSV_BLOCK_HEAD=}", job)
            text += ":"+k+D.CSV_DELIMITER+tdata[D.CSV_BLOCK_HEAD][k]+"\n"
    text += "# option:key ;values;..;;;;\n"
    if D.CSV_BLOCK_OPTION in tdata:
        for k in tdata[D.CSV_BLOCK_OPTION]:
            text += utils.i18n_tool.I18n.getInstance().getText(f"{D.CSV_BLOCK_OPTION=}", job)
            text += ":" + k + D.CSV_DELIMITER + getHeadArgs(tdata[D.CSV_BLOCK_OPTION][k], job)+"\n"
    text += "#;;;;;;\n"
    if D.CSV_BLOCK_STEP in tdata:
        text += basic.step.getStepHeader(job)
        i = 1
        for step in tdata[D.CSV_BLOCK_STEP]:
            text += utils.i18n_tool.I18n.getInstance().getText(f"{D.CSV_BLOCK_STEP=}", job) + ":" + str(i)
            text += D.CSV_DELIMITER + step.getStepText(job)
            i += 1
        text += "#;;;;;;\n"
    if D.CSV_BLOCK_TABLES in tdata:
        for k in tdata[D.CSV_BLOCK_TABLES]:
            text += buildCsvData(tdata[D.CSV_BLOCK_TABLES][k], k, job)
            text += "#;;;;;;\n"
    return text
def isCompTableFile(comp, filename):
    """ check if the filename belongs to the component """
    if comp is None:
        return False
    table = os.path.basename(filename)[0:-4]
    return (B.TOPIC_NODE_DB in comp.conf[B.SUBJECT_ARTS]
            and table in comp.conf[B.DATA_NODE_DDL]
            and comp.name in filename)
def getHeadArgs(value, job):
    """Translate the internal list delimiter back to the csv delimiter for output."""
    internal, csv = D.INTERNAL_DELIMITER, D.CSV_DELIMITER
    return value.replace(internal, csv)

Loading…
Cancel
Save