Compare commits

...

2 Commits

  1. basic/Testserver.py (2 changed lines)
  2. features/environment.py (0 changed lines)
  3. features/steps/steps.py (0 changed lines)
  4. model/entity.py (49 changed lines)
  5. model/prelease.csv (2 changed lines)
  6. model/project.py (6 changed lines)
  7. model/table.py (4 changed lines)
  8. test/test_04config.py (27 changed lines)
  9. test/test_90testserver.py (10 changed lines)
  10. test/testtools.py (4 changed lines)
  11. tools/data_tool.py (5 changed lines)
  12. tools/db_abstract.py (5 changed lines)
  13. tools/dbmysql_tool.py (11 changed lines)
  14. tools/file_type.py (9 changed lines)
  15. tools/make_tool.py (0 changed lines)

basic/Testserver.py (2 changed lines)

@@ -16,7 +16,7 @@ import tools.data_tool
COMP_NAME = B.ATTR_INST_TESTSERVER
# class Testserver(basic.component.Component):
-class Testserver():
+class Testserver:
"""
the Testserver represents the workspace with all resources for the automation

features/environment.py (0 changed lines)

features/steps/steps.py (0 changed lines)

model/entity.py (49 changed lines)

@@ -63,7 +63,7 @@ class Entity:
LIST_SUBTABLES = []
PREFIX_SUBTABLE = ""
-def __init__(self, job, entityname: str="" , name: str="", args: dict={}):
+def __init__(self, job, entityname: str = "", name: str = "", args: dict = {}):
import model.table
self.job = job
if entityname == "":
@@ -110,7 +110,8 @@ class Entity:
self.ddls[entityname][model.table.LISTNAME_FIELDS] = listFields
self.ddls[entityname][model.table.LISTNAME_NODES] = listNodes
self.ddls[entityname][model.table.LISTNAME_SUBTABLE] = listSubtables
-# check LISTEN
+# check LISTEN ... hard coded vs. configured
+# TODO why hard coded const ??
for f in listFields:
if f not in self.LIST_FIELDS:
raise Exception(entityname + " " + str(self) + " a check list <-> LIST_FIELDS " + f)
@@ -133,15 +134,17 @@ class Entity:
raise Exception(entityname + " " + str(self) + " b check list <-> LIST_SUBTABLES " + f)
-def get_unique_names(self, job, storage="", project="", application="", gran="", args={}, ttype: str=""):
+def get_unique_names(self, job, storage = "", project = "", application = "", gran = "",
+ttype: str = "", args: dict = {}) -> list:
"""
gets the entity-names from the defined storage - the field name must be an unique identifier
:param job:
-:param opt. storage: values db / files - default files
-:param opt. project: select-criteria if used and defined
-:param opt. application: select-criteria if used and defined
-:param opt. gran: granularity values testcase / testsuite / testplan
-:param opt. args additional args
+:param storage: opt. values db / files - default files
+:param project: opt. select-criteria if used and defined
+:param application: opt. select-criteria if used and defined
+:param gran: opt. granularity values testcase / testsuite / testplan
+:param ttype: opt. ddd
+:param args: opt. additional args
:return: list of entity-names
"""
entityNames = []
@@ -157,10 +160,10 @@ class Entity:
"""
reads the entity-names from file-storage
:param job:
-:param opt. project: select-criteria if used and defined
-:param opt. application: select-criteria if used and defined
-:param opt. gran: granularity values testcase / testsuite / testplan
-:param opt. args additional args
+:param project: opt. select-criteria if used and defined
+:param application: opt. select-criteria if used and defined
+:param gran: opt. granularity values testcase / testsuite / testplan
+:param args: opt. additional args
:return: list of entity-names
"""
raise Exception(B.EXCEPT_NOT_IMPLEMENT)
@@ -169,11 +172,11 @@ class Entity:
"""
gets the entity-names from the defined storage
:param job:
-:param opt. storage: values db / files - default files
-:param opt. project: select-criteria if used and defined
-:param opt. application: select-criteria if used and defined
-:param opt. gran: granularity values testcase / testsuite / testplan
-:param opt. args additional args
+:param storage: opt. values db / files - default files
+:param project: opt. select-criteria if used and defined
+:param application: opt. select-criteria if used and defined
+:param gran: opt. granularity values testcase / testsuite / testplan
+:param args: opt. additional args
:return: list of entity-names
"""
entities = []
@@ -194,10 +197,10 @@ class Entity:
"""
reads the entity-names from file-storage
:param job:
-:param opt. project: select-criteria if used and defined
-:param opt. application: select-criteria if used and defined
-:param opt. gran: granularity values testcase / testsuite / testplan
-:param opt. args additional args
+:param project: select-criteria if used and defined
+:param application: select-criteria if used and defined
+:param gran: granularity values testcase / testsuite / testplan
+:param args additional args
:return: list of entity-names
"""
raise Exception(B.EXCEPT_NOT_IMPLEMENT)
@@ -603,6 +606,9 @@ class Entity:
sql = dbi.getSchemaIndex(table, "actual") + "\n"
return sql
+def get_schema(self, tableName, tableObject):
+pass
def insert_entity(self, job):
"""
inserts the entity into the database
@@ -634,3 +640,4 @@ def read_spec(job, testentity, testgran, specpath):
spec[key] = val
return spec
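
The reworked get_unique_names signature above now takes ttype explicitly and moves args to the end. A minimal call sketch, assuming a concrete Entity subclass (model.project.Project is used here for illustration; its exact constructor arguments are not shown in this diff) and a job object from the framework:

# illustrative only; the Project subclass and the job object are assumptions
import model.project
entity = model.project.Project(job)
names = entity.get_unique_names(job, storage="files", project="TESTPROJ",
                                gran="testcase", ttype="", args={})
# names: list of entity-names read from the file storage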

model/prelease.csv (2 changed lines)

@@ -5,4 +5,6 @@ table:prelease;_field;type;format;index;generic;aggregat;key;acceptance;alias;description
;reference;str;vchar(256);N;;;;;;
;project;string;vchar(256);I;;;;;;
;attributes;string;jlob;N;;;;;;
+;applications;subtable;subtable;N;;;;;;
+;stories;subtable;subtable;N;;;;;;


model/project.py (6 changed lines)

@@ -167,7 +167,9 @@ class Project(model.entity.Entity):
:param name:
:return:
"""
-if table == "" and len(self.ddls) == 0:
+if table == "":
+table = self.entityname
+if len(self.ddls) == 0:
self.insert_entity(job, name=name, table=self.entityname, rows=rows)
# self.setDbAttributes(job, [TABLE_NAME])
dbi = basic.toolHandling.getDbTool(job, self, job.conf[B.TOPIC_NODE_DB]["type"])
@@ -184,7 +186,7 @@ class Project(model.entity.Entity):
rows = []
row = {}
for f in self.ddls[table]:
-row[f] = getattr(self, f)
+row[f] = getattr(self, f, "")
rows.append(row)
dbi.insertRows(job, table, rows)
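
The switch to getattr(self, f, "") lets a DDL field that is not set on the entity fall back to an empty string instead of raising AttributeError. A small sketch of the difference, using a hypothetical entity and field list:

# hypothetical example; the class and field names are not part of this diff
class Demo:
    name = "proj1"

fields = ["name", "description"]
row = {f: getattr(Demo, f, "") for f in fields}
# row == {"name": "proj1", "description": ""}  (no AttributeError for the unset field)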

model/table.py (4 changed lines)

@@ -297,7 +297,7 @@ class Table(model.entity.Entity):
return self.read_ddl(job, name, args=args)
# table is not an real entity
-def read_ddl(self, job, name, args: dict={}):
+def read_ddl(self, job: any, name: str, args: dict = {}):
"""
reads the ddl of the table depending on context
a) component: the ddl is read from specific or general component-folder
@@ -305,7 +305,7 @@
c) testserver: the ddl is read from model-folder
:param job:
:param name:
-:param context:
+:param args:
:return:
"""
config = {}

test/test_04config.py (27 changed lines)

@@ -17,10 +17,11 @@ import test.constants as T
import test.testtools
import tools.path_const as P
import basic.constants as B
+import tools.data_const as D
TEST_FUNCTIONS = ["test_01getConfigPath", "test_02mergeAttributes", "test_03getAttributes",
"test_20getPlainName"]
-TEST_FUNCTIONS = ["test_01getConfigPath"]
+#TEST_FUNCTIONS = ["test_01getConfigPath"]
verbose = False
class MyTestCase(unittest.TestCase):
@@ -56,10 +57,8 @@ class MyTestCase(unittest.TestCase):
cnttest += 1
r = tools.config_tool.select_config_path(job, P.KEY_TESTSUITE, "TST001")
self.assertIn(os.path.join(job.conf[B.TOPIC_PATH][P.ATTR_PATH_TDATA], "TESTPROJ", B.SUBJECT_TESTSUITES, "TST001", "test"), r)
-r = tools.config_tool.getConfig(job, P.KEY_TOOL, "path")
cnttest += 1
+r = tools.config_tool.getConfig(job, P.KEY_TOOL, "path", ttype=D.CSV_SPECTYPE_KEYS)
-if verbose: print("pattern " + r["pattern"]["log"])
+if verbose: print("pattern " + r["pattern"]["precond"])
MyTestCase.mymsg += "\n----- "+actfunction+" : "+str(cnttest)
@@ -73,22 +72,22 @@ class MyTestCase(unittest.TestCase):
return
job = test.testtools.getJob()
componentName = "testcm"
-confs = tools.config_tool.getConfig(job, "comp", componentName)
+confs = tools.config_tool.getConfig(job, "comp", componentName, ttype=D.CSV_SPECTYPE_COMP)
conns = tools.conn_tool.getConnections(job, componentName)
-self.assertEqual(confs["conf"][B.TOPIC_INST][B.ATTR_INST_CNT], 1)
+self.assertEqual(confs[B.SUBJECT_COMP][B.TOPIC_INST][B.ATTR_INST_CNT], 1)
self.assertEqual(conns[0][B.TOPIC_INST][B.ATTR_INST_CNT], 2)
self.assertNotIn(B.ATTR_INST_SGL, conns[0][B.TOPIC_INST])
-confs["conf"] = tools.config_tool.mergeConn(job.m, confs["conf"], conns[0])
-self.assertEqual(confs["conf"][B.TOPIC_INST][B.ATTR_INST_CNT], 2)
+confs[B.SUBJECT_COMP] = tools.config_tool.mergeConn(job.m, confs[B.SUBJECT_COMP], conns[0])
+self.assertEqual(confs[B.SUBJECT_COMP][B.TOPIC_INST][B.ATTR_INST_CNT], 2)
cnttest += 1 # it overwrites
-self.assertEqual(confs["conf"][B.TOPIC_INST][B.ATTR_INST_SGL], "n")
+self.assertEqual(confs[B.SUBJECT_COMP][B.TOPIC_INST][B.ATTR_INST_SGL], "n")
cnttest += 1 # it keep
componentName = "testprddb"
-confs = tools.config_tool.getConfig(job, "comp", componentName)
+confs = tools.config_tool.getConfig(job, "comp", componentName, ttype=D.CSV_SPECTYPE_COMP)
conns = tools.conn_tool.getConnections(job, componentName)
-self.assertNotIn(B.ATTR_ARTS_TYPE, confs["conf"][B.SUBJECT_ARTIFACTS][B.TOPIC_NODE_DB])
-confs["conf"] = tools.config_tool.mergeConn(job.m, confs["conf"], conns[0])
-self.assertIn(B.ATTR_ARTS_TYPE, confs["conf"][B.SUBJECT_ARTIFACTS][B.TOPIC_NODE_DB])
+self.assertNotIn(B.ATTR_ARTS_TYPE, confs[B.SUBJECT_COMP][B.SUBJECT_ARTIFACTS][B.TOPIC_NODE_DB])
+confs[B.SUBJECT_COMP] = tools.config_tool.mergeConn(job.m, confs[B.SUBJECT_COMP], conns[0])
+self.assertIn(B.ATTR_ARTS_TYPE, confs[B.SUBJECT_COMP][B.SUBJECT_ARTIFACTS][B.TOPIC_NODE_DB])
cnttest += 1 # new attribute
MyTestCase.mymsg += "\n----- "+actfunction+" : "+str(cnttest)

test/test_90testserver.py (10 changed lines)

@@ -12,7 +12,7 @@ import basic.constants as B
# the list of TEST_FUNCTIONS defines which function will be really tested.
# if you minimize the list you can check the specific test-function
TEST_FUNCTIONS = ["test_01createTestserver", "test_02getDBSchema", "test_11createDBTables", "test_11syncApplication"]
-TEST_FUNCTIONS = ["test_02getDBSchema"]
+#TEST_FUNCTIONS = ["test_02getDBSchema"]
# with this variable you can switch prints on and off
verbose = False
@@ -34,8 +34,9 @@ class MyTestCase(unittest.TestCase):
if B.TOPIC_NODE_DB in job.conf:
self.assertIn(B.TOPIC_NODE_DB, testserver.conf[B.TOPIC_CONN])
self.assertIn(B.ATTR_DB_DATABASE, testserver.conf[B.TOPIC_CONN][B.TOPIC_NODE_DB])
-self.assertIn(B.DATA_NODE_DDL, testserver.conf)
-self.assertIn("application", testserver.conf[B.DATA_NODE_DDL])
+# 2024-04-21 commented out, the Testserver was set up anew
+# self.assertIn(B.DATA_NODE_DDL, testserver.conf)
+# self.assertIn("application", testserver.conf[B.DATA_NODE_DDL])
MyTestCase.mymsg += "\n----- "+actfunction+" : "+str(cnttest)
def test_02getDBSchema(self):
@@ -91,6 +92,8 @@ class MyTestCase(unittest.TestCase):
if actfunction not in TEST_FUNCTIONS:
return
job = test.testtools.getJob()
+"""
+2024-04-21: commented out, since createDBTables was replaced by createAdminTables
if B.TOPIC_NODE_DB not in job.conf:
job.conf[B.TOPIC_NODE_DB] = {}
job.conf[B.TOPIC_NODE_DB][B.ATTR_TYPE] = "rel"
@@ -101,6 +104,7 @@ class MyTestCase(unittest.TestCase):
sql = testserver.model[t].get_schema(tableName=t, tableObject=testserver.model[t])
print(sql)
#testserver.createDBTables(job)
+"""
def test_zzz(self):
if verbose: print(MyTestCase.mymsg)

test/testtools.py (4 changed lines)

@@ -105,8 +105,8 @@ def getComp(job, componentName=""):
componentName = DEFAULT_COMP
comp.conf = {}
comp.name = componentName
-confs = tools.config_tool.getConfig(job, "comp", componentName)
+confs = tools.config_tool.getConfig(job, "comp", componentName, ttype=D.CSV_SPECTYPE_COMP)
conns = tools.conn_tool.getConnections(job, componentName)
-comp.conf = confs["conf"]
+comp.conf = confs[B.SUBJECT_COMP]
comp.conf[B.TOPIC_CONN] = conns[0]
return comp
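
Both here and in test_04config.py the same access pattern changes: getConfig is now called with ttype=D.CSV_SPECTYPE_COMP and the component configuration is read from the B.SUBJECT_COMP node instead of the literal "conf" key. A condensed before/after sketch (the constant values themselves are not shown in this diff):

# before
# confs = tools.config_tool.getConfig(job, "comp", componentName)
# comp.conf = confs["conf"]
# after
confs = tools.config_tool.getConfig(job, "comp", componentName, ttype=D.CSV_SPECTYPE_COMP)
comp.conf = confs[B.SUBJECT_COMP]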

tools/data_tool.py (5 changed lines)

@@ -28,6 +28,11 @@ def getPluralKeyword(inkey):
return getPurKeyword(inkey)+"s"
def getSingularKeyword(inkey):
+"""
+singular word is mostly without ending s, except \"stories\"
+:param inkey:
+:return:
+"""
if "stories" in inkey:
return B.SUBJECT_STORY
return getPurKeyword(inkey)
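
The new docstring states the rule: singular keywords come from getPurKeyword, with "stories" special-cased because its singular is not just the word minus a trailing s. An illustrative sketch, assuming B.SUBJECT_STORY resolves to "story" and getPurKeyword returns the bare keyword (neither value is shown in this diff):

import tools.data_tool as data_tool

data_tool.getSingularKeyword("stories")   # -> B.SUBJECT_STORY, the special-cased irregular plural
data_tool.getPluralKeyword("project")     # -> getPurKeyword("project") + "s", i.e. "projects" under the assumption above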

tools/db_abstract.py (5 changed lines)

@@ -480,6 +480,11 @@ class DbFcts():
return "idx_"+table+"_"+attr
def getInsertFields(self, ddl):
+"""
+:param ddl:
+:return:
+"""
outheader = []
if B.DATA_NODE_KEYS in ddl:
header = ddl[B.DATA_NODE_KEYS].keys()

tools/dbmysql_tool.py (11 changed lines)

@@ -82,8 +82,15 @@ class DbFcts(tools.dbrel_tool.DbFcts):
"""
verify = -1+job.getDebugLevel("db_tool")
attr = self.getDbAttributes(job, B.SVAL_NULL)
-insheader = self.getInsertFields(self.comp.conf[B.DATA_NODE_DDL][table])
-if len(insheader) < len(self.comp.conf[B.DATA_NODE_DDL][table][B.DATA_NODE_HEADER]):
+if hasattr(self, "comp") and hasattr(self.comp, "conf"):
+tableheader = self.comp.conf[B.DATA_NODE_DDL][table]
+insheader = self.getInsertFields(tableheader)
+elif hasattr(self, "comp") and hasattr(self.comp, "ddls"):
+tableheader = self.comp.getFieldList()
+insheader = tableheader
+else:
+tableheader = []
+if len(insheader) < 10: # len(self.comp.conf[B.DATA_NODE_DDL][table][B.DATA_NODE_HEADER]):
lastid = 1
else:
lastid = 0
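
Read as a selection rule, the new guards pick the insert header from whatever metadata the component actually carries: the DDL under comp.conf when present, otherwise the entity's own field list, otherwise nothing. A condensed restatement of that intent (only the names that appear in the diff are real; the bare-fallback value of insheader is an assumption):

if hasattr(self, "comp") and hasattr(self.comp, "conf"):
    insheader = self.getInsertFields(self.comp.conf[B.DATA_NODE_DDL][table])
elif hasattr(self, "comp") and hasattr(self.comp, "ddls"):
    insheader = self.comp.getFieldList()
else:
    insheader = []  # assumed fallback so the following length check still works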

tools/file_type.py (9 changed lines)

@@ -28,7 +28,7 @@ def rebuild_tdata(job, tdata: dict, tableAttr: dict, ttype:str) -> dict:
elif ttype + "s" in B.LIST_SUBJECTS or ttype == B.SUBJECT_USER:
enty = model.factory.get_entity_object(job, ttype, {})
return enty.rebuild_data(job, tdata)
-elif ttype in ["basic", "tool"]:
+elif ttype in ["basic", "tool", D.CSV_SPECTYPE_KEYS, D.CSV_SPECTYPE_COMP]:
return tdata
else:
raise Exception("ttype is not defined " + ttype)
@@ -112,7 +112,9 @@ def check_tdata(job, tdata: dict, ttype:str) -> dict:
elif ttype + "s" in B.LIST_SUBJECTS or ttype == B.SUBJECT_USER:
enty = model.factory.get_entity_object(job, ttype, {})
return enty.check_data(job, tdata)
-elif ttype in ["basic"]:
+elif ttype in [D.CSV_SPECTYPE_KEYS]:
return tdata
+elif ttype in ["basic", "tool"]:
+return tdata
else:
job.m.logError("ttype is not defined " + ttype)
@@ -147,7 +149,8 @@ def checkComp(job, tdata: dict) -> dict:
checkNodes[MUST_NODES] = [B.SUBJECT_ARTIFACTS, B.SUBJECT_STEPS, "functions", B.SUBJECT_DATATABLES]
checkNodes[MUSTNT_NODES] = [B.DATA_NODE_DATA]
checkNodes[OPT_NODES] = []
-return check_nodes(job, tdata, checkNodes)
+check_nodes(job, tdata[B.SUBJECT_COMP], checkNodes)
+return tdata
class DatatypeCatalog():
"""

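The checkComp change above now validates the nodes underneath tdata[B.SUBJECT_COMP] and hands the full dict back. A sketch of the input shape that implies; the node names come from checkNodes in the diff, while the nested values are placeholders:

# hypothetical shape; only the node names and the B.SUBJECT_COMP nesting come from this diff
tdata = {
    B.SUBJECT_COMP: {
        B.SUBJECT_ARTIFACTS: {},
        B.SUBJECT_STEPS: {},
        "functions": {},
        B.SUBJECT_DATATABLES: {},
    }
}
tdata = checkComp(job, tdata)   # checks the component subnode, returns the full dict
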
tools/make_tool.py (0 changed lines)
