Browse Source

unit complete execution

master
Ulrich Carmesin 3 years ago
parent
commit
6883904045
  1. 173
      test/test_compare.py
  2. 11
      test/test_config.py
  3. 5
      test/test_conn.py
  4. 7
      test/test_css.py
  5. 19
      test/test_db.py
  6. 14
      test/test_file.py
  7. 9
      test/test_job.py
  8. 16
      test/test_main.py
  9. 19
      test/test_path.py
  10. 13
      test/test_tdata.py
  11. 3
      test/test_toolhandling.py
  12. 72
      test/test_xml.py
  13. 37
      unit_run.py
  14. 33
      utils/config_tool.py
  15. 16
      utils/conn_tool.py
  16. 15
      utils/css_tool.py
  17. 30
      utils/file_tool.py
  18. 126
      utils/match_tool.py
  19. 28
      utils/path_tool.py
  20. 138
      utils/tdata_tool.py
  21. 6
      utils/xml_tool.py

173
test/test_compare.py

@ -0,0 +1,173 @@
import json
import unittest
from basic.program import Job
import utils.match_tool
import components.component
tdata = {
"postReq": {
"database": {
"scheme": {
"table": {
"_data": []
}
}
}
},
"preAct": {
"database": {
"scheme": {
"table": {
"_data": [
{"id": 1, "name": "Brecht", "year": 2014, "position": "top", "hobby": "meaning"},
{"id": 2, "name": "Meier", "year": 2015, "position": "first", "hobby": "reading" },
{"id": 3, "name": "Smith", "year": 2018, "position": "second", "hobby": "writing"},
{"id": 4, "name": "Smith", "year": 2019, "position": "third", "hobby": "executing"}
]
}
}
}
},
"postAct": {
"database": {
"scheme": {
"table": {
"_data": [
{"id": 1, "name": "Brecht", "year": 2014, "position": "top", "hobby": "meaning"},
{"id": 2, "name": "Maier", "year": 2015, "position": "first", "hobby": "reading"},
{"id": 4, "name": "Smith", "year": 2018, "position": "second", "hobby": "writing"},
{"id": 3, "name": "Smith", "year": 2019, "position": "third", "hobby": "executing"}
]
}
}
}
}
}
conf = {
"ddl": {
"database": {
"scheme": {
"table": {
"id": { "feld": "id", "type": "int", "acceptance": "ignore", "key": "T:3" },
"name": { "feld": "name", "type": "string", "acceptance": "must", "key": "F:1" },
"year": { "feld": "year", "type": "int", "acceptance": "must", "key": "F:2" },
"position": { "feld": "id", "type": "string", "acceptance": "must", "key": "" },
"hobby": { "feld": "id", "type": "string", "acceptance": "must", "key": "" },
"_header": ["id", "name", "year", "position", "hobby"]
}
}
}
}
}
class MyTestCase(unittest.TestCase):
def runTest(self):
self.test_matchstart()
self.test_hitmanage()
self.test_similarity()
self.test_bestfit()
self.test_compareRow()
self.test_compareRows()
self.test_match()
def test_matchstart(self):
job = Job("unit")
comp = components.component.Component()
comp.files = { "A": "/home/match/per.csv", "B": "/home/match/post.csv"}
comp.conf = conf
matching = utils.match_tool.Matching(comp)
matching.setData(tdata, utils.match_tool.MATCH_PREPOST)
print(matching.htmltext)
def test_hitmanage(self):
comp = components.component.Component()
comp.files = { "A": "/home/match/per.csv", "B": "/home/match/post.csv"}
comp.conf = conf
matching = utils.match_tool.Matching(comp)
matching.linksA = { "a0001": "null", "a0002": "null", "a0003": "null", "a0004": "null"}
matching.linksB = { "b0001": "null", "b0002": "null", "b0003": "null", "b0004": "null"}
self.assertEqual(matching.isHitA("a0001"), False, "with value null")
self.assertEqual(matching.isHitB("b0005"), False, "doesnt exist")
self.assertEqual(("b0005" not in matching.linksB), True, "doesnt exist")
matching.setHit("a0001", "b0005")
self.assertEqual(matching.isHitA("a0001"), True, "with value null")
self.assertEqual(matching.isHitB("b0005"), True, "doesnt exist")
self.assertEqual(("b0005" in matching.linksB), True, "doesnt exist")
def test_similarity(self):
matching = self.getMatching()
utils.match_tool.getSimilarity(matching, ":database:scheme:table:_data",
tdata["preAct"]["database"]["scheme"]["table"]["_data"][0],
tdata["postAct"]["database"]["scheme"]["table"]["_data"][0],1)
def test_bestfit(self):
job = Job("unit")
comp = components.component.Component()
comp.files = { "A": "/home/match/per.csv", "B": "/home/match/post.csv"}
comp.conf = conf
matching = utils.match_tool.Matching(comp)
matching.sideA = tdata["preAct"]["database"]["scheme"]["table"]["_data"]
matching.sideB = tdata["postAct"]["database"]["scheme"]["table"]["_data"]
utils.match_tool.matchBestfit(matching, ":database:scheme:table:_data")
print(json.dumps(matching.linksA))
print(json.dumps(matching.linksB))
print(json.dumps(matching.nomatch))
utils.match_tool.matchRestfit(matching)
print(json.dumps(matching.linksA))
print(json.dumps(matching.linksB))
print(json.dumps(matching.nomatch))
def test_compareRow(self):
job = Job("unit")
comp = components.component.Component()
comp.files = { "A": "/home/match/per.csv", "B": "/home/match/post.csv"}
comp.conf = conf
matching = self.getMatching()
matching.sideA = tdata["preAct"]["database"]["scheme"]["table"]["_data"]
matching.sideB = tdata["postAct"]["database"]["scheme"]["table"]["_data"]
ddl = conf["ddl"]["database"]["scheme"]["table"]
header = []
for f in ddl["_header"]:
header.append({"field": f, "type": ddl[f]["type"], "acceptance": ddl[f]["acceptance"]})
i = 1
text = utils.match_tool.compareRow(matching, header, tdata["preAct"]["database"]["scheme"]["table"]["_data"][i], tdata["postAct"]["database"]["scheme"]["table"]["_data"][i])
print(text)
def test_compareRows(self):
job = Job("unit")
comp = components.component.Component()
comp.files = { "A": "/home/match/per.csv", "B": "/home/match/post.csv"}
comp.conf = conf
matching = self.getMatching()
matching.sideA = tdata["preAct"]["database"]["scheme"]["table"]["_data"]
matching.sideB = tdata["postAct"]["database"]["scheme"]["table"]["_data"]
linksA = {"a0001": "b0001", "a0002": "b0002" }
matching.linksA = linksA
text = utils.match_tool.compareRows(matching, ":database:scheme:table:_data")
print(text)
def test_match(self):
job = Job("unit")
comp = components.component.Component()
comp.files = {"A": "/home/match/per.csv", "B": "/home/match/post.csv"}
# tdata["postReq"] = tdata["preAct"]
comp.conf = conf
matching = utils.match_tool.Matching(comp)
matching.setData(tdata, utils.match_tool.MATCH_POSTCOND)
text = utils.match_tool.matchTree(matching)
print("\n-------------\n")
print(text)
print("\n-------------\n")
print(matching.difftext)
def getMatching(self):
job = Job("unit")
comp = components.component.Component()
comp.files = { "A": "/home/match/per.csv", "B": "/home/match/post.csv"}
comp.conf = conf
matching = utils.match_tool.Matching(comp)
matching.setData(tdata, utils.match_tool.MATCH_PREPOST)
matching.difftext = ""
return matching
if __name__ == '__main__':
unittest.main()

11
test/test_config.py

@ -5,9 +5,13 @@ import test.constants
import os import os
HOME_PATH = test.constants.HOME_PATH HOME_PATH = test.constants.HOME_PATH
VERIFY = False
class MyTestCase(unittest.TestCase): class MyTestCase(unittest.TestCase):
def test_something(self): def runTest(self):
self.test_getConfig()
def test_getConfig(self):
job = Job("unit") job = Job("unit")
args = {"application": "TEST", "application": "ENV01", "modus": "unit", "loglevel": "debug", "tool": "config_tool", args = {"application": "TEST", "application": "ENV01", "modus": "unit", "loglevel": "debug", "tool": "config_tool",
"modus": "unit"} "modus": "unit"}
@ -18,9 +22,10 @@ class MyTestCase(unittest.TestCase):
self.assertEqual(r, None) self.assertEqual(r, None)
r = t.getConfigPath("comp", "testA2") r = t.getConfigPath("comp", "testA2")
r = t.getConfig("tool", "path") r = t.getConfig("tool", "path")
print("pattern " + r["pattern"]["log"]) if VERIFY: print("pattern " + r["pattern"]["log"])
print("pattern " + r["pattern"]["precond"]) if VERIFY: print("pattern " + r["pattern"]["precond"])
if __name__ == '__main__': if __name__ == '__main__':
VERIFY = True
unittest.main() unittest.main()

5
test/test_conn.py

@ -3,7 +3,10 @@ import basic.program
import utils.conn_tool import utils.conn_tool
class MyTestCase(unittest.TestCase): class MyTestCase(unittest.TestCase):
def test_something(self): def runTest(self):
self.test_conn()
def test_conn(self):
print("# # # # tetsComponents # # # # #") print("# # # # tetsComponents # # # # #")
job = basic.program.Job("unit") job = basic.program.Job("unit")
args = { "application" : "TEST" , "environment" : "ENV01", "modus" : "unit", "loglevel" : "debug", "tool" : "job_tool"} args = { "application" : "TEST" , "environment" : "ENV01", "modus" : "unit", "loglevel" : "debug", "tool" : "job_tool"}

7
test/test_css.py

@ -4,6 +4,9 @@ import basic.program
import json import json
class MyTestCase(unittest.TestCase): class MyTestCase(unittest.TestCase):
def runTest(self):
self.test_css()
def test_css(self): def test_css(self):
job = basic.program.Job("unit") job = basic.program.Job("unit")
args = {"application": "TEST", "application": "ENV01", "modus": "unit", "loglevel": "debug", args = {"application": "TEST", "application": "ENV01", "modus": "unit", "loglevel": "debug",
@ -19,7 +22,7 @@ class MyTestCase(unittest.TestCase):
self.assertEqual(len(text), 37) self.assertEqual(len(text), 37)
self.assertEqual(("style" in text), True) self.assertEqual(("style" in text), True)
text = utils.css_tool.getInternalStyle("diffFiles") text = utils.css_tool.getInternalStyle("diffFiles")
self.assertEqual(len(text), 0) self.assertEqual(len(text), 84)
text = utils.css_tool.getExternalStyle("diffFiles") text = utils.css_tool.getExternalStyle("diffFiles")
self.assertEqual(len(text), 0) self.assertEqual(len(text), 0)
# ------- internal --------------- # ------- internal ---------------
@ -29,7 +32,7 @@ class MyTestCase(unittest.TestCase):
self.assertEqual(("class" in text), True) self.assertEqual(("class" in text), True)
text = utils.css_tool.getInternalStyle("diffFiles") text = utils.css_tool.getInternalStyle("diffFiles")
print(text) print(text)
self.assertEqual(len(text), 173) self.assertEqual(len(text), 237)
self.assertEqual(("<style>" in text), True) self.assertEqual(("<style>" in text), True)
text = utils.css_tool.getExternalStyle("diffFiles") text = utils.css_tool.getExternalStyle("diffFiles")
self.assertEqual(len(text), 0) self.assertEqual(len(text), 0)

19
test/test_db.py

@ -1,19 +0,0 @@
import sys, os
import unittest
import basic.program
import utils.conn_tool
sys.path.append((0, os.path.join(os.path.dirname(os.path.dirname(os.path.realpath(__file__))), 'lib', 'python3.9', 'site-packages', 'mysql')))
#import mysql.connector
import db_tool
class MyTestCase(unittest.TestCase):
def test_something(self):
job = basic.program.Job("unit")
args = { "application" : "TEST" , "environment" : "ENV01", "modus" : "unit", "loglevel" : "debug", "tool" : "job_tool"}
job.par.setParameterArgs(args)
print(sys.path)
dbconn = db_tool.getConnector()
print (dbconn)
pass
if __name__ == '__main__':
unittest.main()

14
test/test_file.py

@ -7,6 +7,11 @@ import test.constants
HOME_PATH = test.constants.HOME_PATH HOME_PATH = test.constants.HOME_PATH
class MyTestCase(unittest.TestCase): class MyTestCase(unittest.TestCase):
def runTest(self):
self.test_encoding()
self.test_getFiles()
self.test_pathTool()
def test_getFiles(self): def test_getFiles(self):
job = basic.program.Job("unit") job = basic.program.Job("unit")
args = {"application": "TEST", "application": "ENV01", "modus": "unit", "loglevel": "debug", "tool": "job_tool", args = {"application": "TEST", "application": "ENV01", "modus": "unit", "loglevel": "debug", "tool": "job_tool",
@ -19,7 +24,7 @@ class MyTestCase(unittest.TestCase):
r = t.getFiles(job.m, job.conf.confs.get("paths").get("program") + "/utils", ".*_tool.py", None) r = t.getFiles(job.m, job.conf.confs.get("paths").get("program") + "/utils", ".*_tool.py", None)
self.assertEqual((len(r) > 2), True) self.assertEqual((len(r) > 2), True)
r = t.getFilesRec(job.m, job.conf.confs.get("paths").get("program"), ".*?file.*.py") r = t.getFilesRec(job.m, job.conf.confs.get("paths").get("program"), ".*?file.*.py")
print (r) # print (r)
def test_pathTool(self): def test_pathTool(self):
job = basic.program.Job("unit") job = basic.program.Job("unit")
@ -29,13 +34,14 @@ class MyTestCase(unittest.TestCase):
#self.assertEqual(utils.path_tool.generatePath("program", "komp", "testA", "CONFIG.yml"), #self.assertEqual(utils.path_tool.generatePath("program", "komp", "testA", "CONFIG.yml"),
# os.path.join(HOME_PATH, "components","testA","COFIG.yml")) # os.path.join(HOME_PATH, "components","testA","COFIG.yml"))
def test_encoding(self): def test_encoding(self):
job = basic.program.Job("unit")
print("------- test_encoding") print("------- test_encoding")
encodings = ['utf-8', 'windows-1250', 'iso-8859-1'] encodings = ['utf-8', 'windows-1250', 'iso-8859-1']
res = utils.file_tool.getFileEncoding(os.path.join(HOME_PATH,"test","tdata","encoded_iso8859.txt")) res = utils.file_tool.getFileEncoding(job.m, os.path.join(HOME_PATH,"test","conf","tdata","encoded_iso8859.txt"))
self.assertEqual(res, "iso-8859-1") self.assertEqual(res, "iso-8859-1")
res = utils.file_tool.getFileEncoding(os.path.join(HOME_PATH,"test","tdata","encoded_win1250.txt")) res = utils.file_tool.getFileEncoding(job.m, os.path.join(HOME_PATH,"test","conf","tdata","encoded_win1250.txt"))
self.assertEqual(res, "iso-8859-1") self.assertEqual(res, "iso-8859-1")
res = utils.file_tool.getFileEncoding(os.path.join(HOME_PATH,"test","tdata","encoded_utf8.txt")) res = utils.file_tool.getFileEncoding(job.m, os.path.join(HOME_PATH,"test","conf","tdata","encoded_utf8.txt"))
self.assertEqual(res, "utf-8") self.assertEqual(res, "utf-8")

9
test/test_job.py

@ -8,7 +8,12 @@ import test.constants
HOME_PATH = test.constants.HOME_PATH HOME_PATH = test.constants.HOME_PATH
PYTHON_CMD = "python" PYTHON_CMD = "python"
class MyTestCase(unittest.TestCase): class MyTestCase(unittest.TestCase):
def xtest_parameter(self): def runTest(self):
self.test_parameter()
self.test_components()
self.test_run()
def test_parameter(self):
job = Job("unit") job = Job("unit")
args = { "application" : "TEST" , "environment" : "ENV01", "modus" : "unit", "loglevel" : "debug", "tool" : "job_tool"} args = { "application" : "TEST" , "environment" : "ENV01", "modus" : "unit", "loglevel" : "debug", "tool" : "job_tool"}
job.par.setParameterArgs(args) job.par.setParameterArgs(args)
@ -20,7 +25,7 @@ class MyTestCase(unittest.TestCase):
args = { "application" : "TEST" , "environment" : "ENV01", "modus" : "unit", "loglevel" : "debug", args = { "application" : "TEST" , "environment" : "ENV01", "modus" : "unit", "loglevel" : "debug",
"tool" : "job_tool", "tsdir": os.path.join(HOME_PATH, "test", "lauf", "V0.1", "startjob", "2021-08-21_18-ß2-01")} "tool" : "job_tool", "tsdir": os.path.join(HOME_PATH, "test", "lauf", "V0.1", "startjob", "2021-08-21_18-ß2-01")}
job.par.setParameterArgs(args) job.par.setParameterArgs(args)
def xtest_components(self): def test_components(self):
print("# # # # tetsComponents # # # # #") print("# # # # tetsComponents # # # # #")
job = Job.resetInstance("unit") job = Job.resetInstance("unit")
args = { "application" : "TEST" , "environment" : "ENV01", "modus" : "unit", "loglevel" : "debug", "tool" : "job_tool"} args = { "application" : "TEST" , "environment" : "ENV01", "modus" : "unit", "loglevel" : "debug", "tool" : "job_tool"}

16
test/test_main.py

@ -1,16 +0,0 @@
import unittest, os
import utils.path_tool as path_tool
import basic.program as program
import test.constants
HOME_PATH = test.constants.HOME_PATH
class MyTestCase(unittest.TestCase):
def test_pathTool(self):
x = program.Job("test:application=TEST:application=ENV01")
self.assertEqual(path_tool.generatePath("program", "komp", "testA", "CONFIG.yml"),
os.path.join(HOME_PATH, "components","testA","CONFIG.yml"))
if __name__ == '__main__':
unittest.main()

19
test/test_path.py

@ -6,6 +6,9 @@ import test.constants
HOME_PATH = test.constants.HOME_PATH HOME_PATH = test.constants.HOME_PATH
class MyTestCase(unittest.TestCase): class MyTestCase(unittest.TestCase):
def runTest(self):
self.test_path()
def test_path(self): def test_path(self):
job = Job("unit") job = Job("unit")
args = {"application": "TEST", "environment": "ENV01", "modus": "unit", "loglevel": "debug", "tool": "job_tool", args = {"application": "TEST", "environment": "ENV01", "modus": "unit", "loglevel": "debug", "tool": "job_tool",
@ -13,23 +16,23 @@ class MyTestCase(unittest.TestCase):
job.par.setParameterArgs(args) job.par.setParameterArgs(args)
pt = utils.path_tool.PathConf() pt = utils.path_tool.PathConf()
r = utils.path_tool.getKeyValue("job.par.application") r = utils.path_tool.getKeyValue("job.par.application")
print(r) #print(r)
r = utils.path_tool.getKeyValue("job.conf.results") r = utils.path_tool.getKeyValue("job.conf.results")
print(r) #print(r)
self.assertEqual(r, os.path.join(HOME_PATH,"test","target")) self.assertEqual(r, os.path.join(HOME_PATH,"test","target"))
r = utils.path_tool.composePath("tcbase", None) r = utils.path_tool.composePath("tcbase", None)
print(r) #print(r)
args = { "application" : "TEST" , "application" : "ENV01", "modus" : "unit", "loglevel" : "debug", args = { "application" : "TEST" , "application" : "ENV01", "modus" : "unit", "loglevel" : "debug",
"tool" : "job_tool", "tsdir": os.path.join(HOME_PATH, "test","lauf","V0.1","startjob","2021-08-21_18-ß2-01")} "tool" : "job_tool", "tsdir": os.path.join(HOME_PATH, "test","conf","lauf","V0.1","startjob","2021-08-21_18-ß2-01")}
job = Job.resetInstance("unit") job = Job.resetInstance("unit")
job.par.setParameterArgs(args) job.par.setParameterArgs(args)
r = utils.path_tool.extractPattern("tsbase" ) r = utils.path_tool.extractPattern("tsbase" )
print(r) #print(r)
self.assertEqual(r[0][1], "job.conf.archiv") self.assertEqual(r[0][1], "job.conf.archiv")
self.assertEqual(r[3][0], "_") self.assertEqual(r[3][0], "_")
r = utils.path_tool.extractPath("tsbase" , os.path.join(HOME_PATH, "test","lauf","V0.1","startjob_2021-08-21_10-02-01")) r = utils.path_tool.extractPath("tsbase" , os.path.join(HOME_PATH, "test","conf","lauf","V0.1","startjob_2021-08-21_10-02-01"))
print("r " + str(r)) #print("r " + str(r))
print(vars(job.par)) #print(vars(job.par))
self.assertEqual(job.par.release, "V0.1") self.assertEqual(job.par.release, "V0.1")
self.assertEqual(job.par.usecase, "startjob") self.assertEqual(job.par.usecase, "startjob")
self.assertEqual(job.par.tltime, "2021-08-21_10-02-01") self.assertEqual(job.par.tltime, "2021-08-21_10-02-01")

13
test/test_tdata.py

@ -3,15 +3,22 @@ import utils.tdata_tool as t
import basic.program import basic.program
class MyTestCase(unittest.TestCase): class MyTestCase(unittest.TestCase):
def test_pathTool(self): def runTest(self):
self.test_tdata()
def test_tdata(self):
job = basic.program.Job("unit") job = basic.program.Job("unit")
args = {"application": "TEST", "application": "ENV01", "modus": "unit", "loglevel": "debug", args = {"application": "TEST", "application": "ENV01", "modus": "unit", "loglevel": "debug",
"tool": "job_tool", "tdtyp": "csv", "tdsrc": "implement", "tdname": "firstunit", "tool": "job_tool", "tdtyp": "csv", "tdsrc": "implement", "tdname": "firstunit",
"modus": "unit"} "modus": "unit"}
job.par.setParameterArgs(args) job.par.setParameterArgs(args)
filename = str(job.conf.confs["paths"]["testdata"]) + "/" + getattr(job.par, "tdsrc") + "/" + getattr(job.par, "tdname") + ".csv" filename = str(job.conf.confs["paths"]["testdata"]) + "/" + getattr(job.par, "tdsrc") + "/" + getattr(job.par, "tdname") + ".csv"
tdata = t.readCsv(filename) tdata = t.readCsv(job.m, filename, None)
self.assertEqual(1, 1) self.assertEqual(len(tdata["testa1"]), 3)
setattr(job.par, "tdtyp", "dir")
setattr(job.par, "tdsrc", "TST001")
tdata = t.getTestdata()
self.assertEqual(("steps" in tdata), True)
if __name__ == '__main__': if __name__ == '__main__':

3
test/test_toolhandling.py

@ -10,6 +10,9 @@ HOME_PATH = test.constants.HOME_PATH
conf = {} conf = {}
class MyTestCase(unittest.TestCase): class MyTestCase(unittest.TestCase):
def runTest(self):
self.test_toolhandling()
def test_toolhandling(self): def test_toolhandling(self):
job = basic.program.Job("unit") job = basic.program.Job("unit")
args = {"application": "TEST", "application": "ENV01", "modus": "unit", "loglevel": "debug", "tool": "config_tool", args = {"application": "TEST", "application": "ENV01", "modus": "unit", "loglevel": "debug", "tool": "config_tool",

72
test/test_xml.py

@ -1,72 +0,0 @@
import unittest
#import basic.program
import utils.xml_tool
import utils.xml_tool
class MyTestCase(unittest.TestCase):
def xtest_xmlTool(self):
#job = basic.program.Job("unit")
args = {"application": "TEST", "application": "ENV01", "modus": "unit", "loglevel": "debug",
"tool": "job_tool", "tdtyp": "csv", "tdsrc": "implement", "tdname": "firstunit",
"modus": "unit"}
beispiel_json = {'root': {'@attr': 'xyz', '$': 'inhalt', "b": "bold"}}
tree = {}
tree["root"] = args
xml = utils.xml_tool.dict2xml(tree)
print(xml)
xml = utils.xml_tool.dict2xml(beispiel_json)
print(xml)
self.assertEqual(1, 1)
f = utils.xml_tool.fcts()
def xtest_addSingle(self):
tree = {}
# tree = utils.xml_tool.fcts.addMerkmal(tree, '/root/datensegment/satz[@klasse="4711x"]/mm[@name="NAME"]/wert', 2, "abc")
tree = utils.xml_tool.fcts.addMerkmal(tree, '/root/datensegment/kratz/mm[@name="NAME"]/wert', 2, "abc")
self.assertEqual(tree["kratz"]["mm"][0]["wert"], "abc")
self.assertEqual(tree["kratz"]["mm"][0]["@name"], "NAME")
def xtest_addTwo(self):
# a-b-b
# c-a-a c-a-b
tree = {}
print("--------------------------------------------------------------------------------")
tree = utils.xml_tool.fcts.addMerkmal(tree, '/root/datensegment/kratz/mm[@name="NAME"]/wert', 2, "abc")
print("--------------------------------------------------------------------------------")
tree = utils.xml_tool.fcts.addMerkmal(tree, '/root/datensegment/kratz/mm[@name="LAND"]/wert', 2, "xyz")
baum = {}
baum["root"] = tree
print("<------"+str(baum["root"]))
self.assertEqual(tree["kratz"]["mm"][0]["wert"], "abc")
self.assertEqual(tree["kratz"]["mm"][0]["@name"], "NAME")
self.assertEqual(tree["kratz"]["mm"][1]["wert"], "xyz")
self.assertEqual(tree["kratz"]["mm"][1]["@name"], "LAND")
def xtest_addOnePaths(self):
tree = {}
print("--------------------------------------------------------------------------------")
tree = utils.xml_tool.fcts.setMerkmal(tree, '/root/datensegment/satz[@klasse="4711x"]/mm[@name="NAME"]/wert', "abc")
baum = {}
baum["root"] = tree
print("<------"+str(baum["root"]))
self.assertEqual(tree["root"]["datensegment"]["satz"][0]["mm"][0]["wert"], "abc")
self.assertEqual(tree["root"]["datensegment"]["satz"][0]["mm"][0]["@name"], "NAME")
self.assertEqual(tree["root"]["datensegment"]["satz"][0]["@klasse"], "4711x")
def test_addTwoPaths(self):
tree = {}
print("--------------------------------------------------------------------------------")
tree = utils.xml_tool.fcts.setMerkmal(tree, '/root/datensegment/satz[@klasse="4711x"]/mm[@name="NAME"]/wert', "abc")
print("--------------------------------------------------------------------------------")
tree = utils.xml_tool.fcts.setMerkmal(tree, '/root/datensegment/satz[@klasse="4711x"]/mm[@name="LAND"]/wert', "xyz")
baum = {}
baum["root"] = tree
print("<------"+str(baum["root"]))
self.assertEqual(tree["root"]["datensegment"]["satz"][0]["mm"][0]["wert"], "abc")
self.assertEqual(tree["root"]["datensegment"]["satz"][0]["mm"][0]["@name"], "NAME")
self.assertEqual(tree["root"]["datensegment"]["satz"][0]["@klasse"], "4711x")
if __name__ == '__main__':
unittest.main()

37
unit_run.py

@ -13,5 +13,42 @@ OR instead 2 and 3 on test-first-strategy:
5. at the end you get an xls-sheet like your result-csv-file but additionally with comparsion-result as detailed result and the only counts of passed and failed tests as minimum-result which can be inserted into management-tools 5. at the end you get an xls-sheet like your result-csv-file but additionally with comparsion-result as detailed result and the only counts of passed and failed tests as minimum-result which can be inserted into management-tools
""" """
# Press the green button in the gutter to run the script. # Press the green button in the gutter to run the script.
import importlib
import unittest
import test.constants
import os, glob
HOME_PATH = test.constants.HOME_PATH
VERIFY = False
class MyTestLoader(unittest.TestLoader):
def start_testcases(self):
pass
def create_test_suite():
testFileStrings = glob.glob(os.path.join(HOME_PATH, "test", 'test_*.py'))
print(testFileStrings)
suite = unittest.TestSuite()
for t in testFileStrings:
v = "test."+t[len(HOME_PATH)+6:-3]
#v = v.replace("\/", '.')
#v = v.replace("\\", '.')
cmodul = importlib.import_module(v)
class_ = getattr(cmodul, "MyTestCase")
suite.addTest(class_())
#module_strings = [str for str in test_file_strings]
#mod = __import__(os.path.join(HOME_PATH, "test", 'test_css.py'), globals(), locals(), ['suite'])
#suitefn = getattr(mod, 'MyTestCase')
#suite.addTest(suitefn())
# suites = ["test.test_css.MyTestCase()"]
# testSuite = unittest.TestSuite(suites)
return suite
if __name__ == '__main__': if __name__ == '__main__':
# loader = MyTestLoader()
print ("start") print ("start")
loader = unittest.TestLoader()
suite = create_test_suite()
#loader.discover(os.path.join(HOME_PATH, "test"), 'test_.*py')
runner = unittest.TextTestRunner(verbosity=2)
runner.run(suite)

33
utils/config_tool.py

@ -10,6 +10,7 @@ import yaml
import basic.componentHandling import basic.componentHandling
import utils.path_tool import utils.path_tool
import os.path import os.path
import basic.constants as B
COMP_FILES = ["DATASTRUCTURE"] COMP_FILES = ["DATASTRUCTURE"]
CONFIG_FORMAT = ["yml", "json", "csv"] CONFIG_FORMAT = ["yml", "json", "csv"]
@ -83,9 +84,39 @@ def getConfigPath(modul, name):
return pathname return pathname
job.debug(verify, "12 " + pathname) job.debug(verify, "12 " + pathname)
def getConfValue(attribute, comp):
if attribute == B.ATTR_CONN_DBTYPE:
if not hasAttr(comp.conf[B.SUBJECT_CONN], "dbtype"):
if hasAttr(comp.conf[B.SUBJECT_CONN], "types") and hasAttr(comp.conf[B.SUBJECT_CONN]["types"], "dbtype"):
dbtype = comp.conf[B.SUBJECT_CONN]["types"]["dbtype"]
else:
raise LookupError("dbtype is not set in comp " + comp.name)
else:
dbtype = comp.conf["conn"]["dbtype"]
return ""
return ""
def hasAttr(o, name):
#print("hasAttr " + str(type(o))+" "+name)
if (isinstance(o, dict)):
if (name in o.keys()):
#print("hasAttr dict ok " + str(type(o)))
return True
#print("hasAttr dict "+str(type(o)))
elif (isinstance(o, list)):
pass
#print("hasAttr list "+str(type(o)))
elif hasattr(o, name):
#print("hasAttr class ok "+str(type(o)))
return True
return False
def getConfig(modul, name): def getConfig(modul, name):
job = basic.program.Job.getInstance() job = basic.program.Job.getInstance()
verify = job.getDebugLevel("config_tool") verify = job.getDebugLevel("config_tool")-1
pathname = getConfigPath(modul, name) pathname = getConfigPath(modul, name)
confs = {} confs = {}
job.debug(verify, "getConfig " + pathname) job.debug(verify, "getConfig " + pathname)

16
utils/conn_tool.py

@ -5,6 +5,9 @@
""" """
import basic.program import basic.program
import utils.config_tool import utils.config_tool
import basic.constants as B
def getConnection(comp, nr): def getConnection(comp, nr):
job = basic.program.Job.getInstance() job = basic.program.Job.getInstance()
verify = job.getDebugLevel("conn_tool") verify = job.getDebugLevel("conn_tool")
@ -27,6 +30,7 @@ def getConnection(comp, nr):
pass pass
return None return None
def getConnections(comp): def getConnections(comp):
job = basic.program.Job.getInstance() job = basic.program.Job.getInstance()
verify = job.getDebugLevel("conn_tool") verify = job.getDebugLevel("conn_tool")
@ -41,15 +45,15 @@ def getConnections(comp):
pass pass
elif job.conf.confs.get("tools").get("connsrc") == "csv": elif job.conf.confs.get("tools").get("connsrc") == "csv":
pass pass
print(comp) #print(comp)
print(conn["env"].keys()) #print(conn["env"].keys())
print(conn["env"][comp]["instance"]) #print(conn["env"][comp][B.SUBJECT_INST])
xtypes = None xtypes = None
if ("types" in conn["env"][comp]): if ("types" in conn["env"][comp]):
xtypes = conn["env"][comp]["types"] xtypes = conn["env"][comp]["types"]
for i in range(conn["env"][comp]["instance"]): for i in range(conn["env"][comp][B.SUBJECT_INST]):
print("range " + str(i+1)) print("range " + str(i + 1))
instnr = "inst" + str(i+1) instnr = "inst" + str(i + 1)
if (xtypes is not None): if (xtypes is not None):
conn["env"][comp][instnr]["types"] = xtypes conn["env"][comp][instnr]["types"] = xtypes
conns.append(conn["env"][comp][instnr]) conns.append(conn["env"][comp][instnr])

15
utils/css_tool.py

@ -13,9 +13,8 @@ CSS_CLASS = {
def getInlineStyle(filetype, cssclass): def getInlineStyle(filetype, cssclass):
job = basic.program.Job.getInstance() job = basic.program.Job.getInstance()
verify = int(job.getDebugLevel("css_tool")) - 4 verify = int(job.getDebugLevel("css_tool")) - 1
job.debug(verify, "getDiffHeader ") # job.debug(verify, "getDiffHeader ")
print(job.conf.confs.get("tools").get("csstyp"))
if job.conf.confs.get("tools").get("csstyp") == "inline": if job.conf.confs.get("tools").get("csstyp") == "inline":
out = "style=\""+CSS_CLASS[filetype][cssclass]+"\"" out = "style=\""+CSS_CLASS[filetype][cssclass]+"\""
else: else:
@ -24,8 +23,7 @@ def getInlineStyle(filetype, cssclass):
def getInternalStyle(filetype): def getInternalStyle(filetype):
job = basic.program.Job.getInstance() job = basic.program.Job.getInstance()
verify = int(job.getDebugLevel("match_tool")) - 4 verify = int(job.getDebugLevel("match_tool")) - 1
job.debug(verify, "getDiffHeader ")
out = "" out = ""
if job.conf.confs.get("tools").get("csstyp") == "internal": if job.conf.confs.get("tools").get("csstyp") == "internal":
out = "<style>" out = "<style>"
@ -41,16 +39,15 @@ def getInternalStyle(filetype):
else: else:
out = "<style>" out = "<style>"
for c in CSS_CLASS["general"]: for c in CSS_CLASS["general"]:
line = "\n"+c+" { "+CSS_CLASS["general"][c]+"} " line = "\n "+c+" { "+CSS_CLASS["general"][c]+" } "
line.replace(":", ": ").replace(";", "; ").replace("_Q_", ", ") line.replace(":", ": ").replace(";", "; ").replace("_Q_", ", ")
out += line out += line
out += "\n</style>" out += " \n </style>"
return out return out
def getExternalStyle(filetype): def getExternalStyle(filetype):
job = basic.program.Job.getInstance() job = basic.program.Job.getInstance()
verify = int(job.getDebugLevel("match_tool")) - 4 verify = int(job.getDebugLevel("match_tool")) - 1
job.debug(verify, "getDiffHeader ")
out = "" out = ""
if job.conf.confs.get("tools").get("csstyp") == "external": if job.conf.confs.get("tools").get("csstyp") == "external":
for c in CSS_CLASS[filetype]: for c in CSS_CLASS[filetype]:

30
utils/file_tool.py

@ -90,8 +90,8 @@ def mkPaths(msg, pfad):
modus = job.conf.confs["paths"]["mode"] modus = job.conf.confs["paths"]["mode"]
os.makedirs(pfad, exist_ok=True) os.makedirs(pfad, exist_ok=True)
def getFileEncoding(path): def getFileEncoding(msg, path):
print(path) print("--- getFileEncoding "+path)
encodings = ['utf-8', 'iso-8859-1'] # add more encodings = ['utf-8', 'iso-8859-1'] # add more
for e in encodings: for e in encodings:
print(e) print(e)
@ -106,24 +106,40 @@ def getFileEncoding(path):
else: else:
print('opening the file with encoding: %s ' % e) print('opening the file with encoding: %s ' % e)
return e return e
break return detectFileEncode(msg, path)
def rest(path): # return "" def detectFileEncode(msg, path): # return ""
job = basic.program.Job.getInstance()
verify = int(job.getDebugLevel("file_tool"))
print(path) print(path)
cntIso = 0 cntIso = 0
cntUtf = 0 cntUtf = 0
with open(path, 'rb') as file: with open(path, 'rb') as file:
while (byte := file.read(1)): while (byte := file.read(1)):
i = int.from_bytes(byte, "little") i = int.from_bytes(byte, "little")
print(str(byte)+" = "+str(i))
#byte = file.read(1) #byte = file.read(1)
if ((i == 196) or (i == 228) or (i == 214) or (i == 246) or (i == 220) or (i == 252) or (i == 191)): if ((i == 196) or (i == 228) or (i == 214) or (i == 246) or (i == 220) or (i == 252) or (i == 191)):
cntIso += 1 cntIso += 1
print("iso") if (i == 160):
pass
elif (i > 127): elif (i > 127):
cntUtf += 1 cntUtf += 1
print("utf8")
if (cntIso > cntUtf): if (cntIso > cntUtf):
return 'iso-8859-1' return 'iso-8859-1'
return 'utf-8' return 'utf-8'
def readFileLines(msg, path):
job = basic.program.Job.getInstance()
verify = int(job.getDebugLevel("file_tool"))
enc = detectFileEncode(msg, path)
with open(path, 'r', encoding=enc) as file:
lines = file.read().splitlines()
file.close()
return lines
def writeFileText(msg, path, text, enc="utf-8"):
job = basic.program.Job.getInstance()
verify = int(job.getDebugLevel("file_tool"))
with open(path, 'w', encoding=enc) as file:
file.write(text)
file.close()

126
utils/match_tool.py

@ -94,6 +94,8 @@ class Matching():
self.matchkeys = {} self.matchkeys = {}
def isHitA(self, key): def isHitA(self, key):
print("isHitA "+str(key))
for k in self.linksA: print(k)
return ((key in self.linksA) and (self.linksA[key] != "null")) return ((key in self.linksA) and (self.linksA[key] != "null"))
def isHitB(self, key): def isHitB(self, key):
return ((key in self.linksB) and (self.linksB[key] != "null")) return ((key in self.linksB) and (self.linksB[key] != "null"))
@ -120,7 +122,7 @@ class Matching():
return ddl return ddl
def setDiffHeader(matching): def setDiffHeader(matching):
job = basic.program.Job.getInstance() job = basic.program.Job.getInstance()
verify = int(job.getDebugLevel("match_tool"))-4 verify = int(job.getDebugLevel("match_tool"))-1
job.debug(verify, "getDiffHeader ") job.debug(verify, "getDiffHeader ")
htmltxt = "<!DOCTYPE html>" htmltxt = "<!DOCTYPE html>"
htmltxt += "<html><head>" htmltxt += "<html><head>"
@ -159,17 +161,21 @@ def matchBestfit(matching, path):
:return: :return:
""" """
i=0 i=0
for r in matching.sideA: if (matching.sideA is not None):
k = composeKey("a", i) for r in matching.sideA:
matching.setHit(k, "null") k = composeKey("a", i)
i += 1 matching.setHit(k, "null")
i += 1
i = 0 i = 0
for r in matching.sideB: if (matching.sideB is not None):
k = composeKey("b", i) for r in matching.sideB:
matching.setHit("null", k) k = composeKey("b", i)
i += 1 matching.setHit("null", k)
i += 1
ia = 0 ia = 0
ix = 1 ix = 1
if (matching.sideA is None) or (matching.sideB is None):
return
for rA in matching.sideA: for rA in matching.sideA:
ib = 0 ib = 0
for rB in matching.sideB: for rB in matching.sideB:
@ -253,7 +259,6 @@ def getSimilarity(matching, path, rA, rB, i):
def matchTree(matching): def matchTree(matching):
""" """
:param matching: :param matching:
:return: :return:
""" """
@ -293,8 +298,6 @@ def getEvaluation(matching, type, acceptance, sideA, sideB):
verify = int(job.getDebugLevel("match_tool"))-1 verify = int(job.getDebugLevel("match_tool"))-1
job.debug(verify, "getEvaluation "+str(sideA)+" ?= "+str(sideB)) job.debug(verify, "getEvaluation "+str(sideA)+" ?= "+str(sideB))
match = getStringSimilarity(str(sideA), str(sideB)) match = getStringSimilarity(str(sideA), str(sideB))
colorA = "black"
colorB = "black"
classA = "black" classA = "black"
classB = "black" classB = "black"
result = "test" result = "test"
@ -302,41 +305,39 @@ def getEvaluation(matching, type, acceptance, sideA, sideB):
if acceptance == "ignore": result = "ignore" if acceptance == "ignore": result = "ignore"
if (matching.matchtype == MATCH_POSTCOND) and (result == "test"): if (matching.matchtype == MATCH_POSTCOND) and (result == "test"):
result = "hard" result = "hard"
colorA = "green"
colorB = "crimson"
classA = "diffA" classA = "diffA"
classB = "diffB" classB = "diffB"
else: else:
colorA = "darkblue"
colorB = "darkmagenta"
classA = "acceptA" classA = "acceptA"
classB = "acceptB" classB = "acceptB"
return [result, colorA, colorB, classA, classB] return [result, classA, classB]
def matchDict(matching, A, B, path): def matchDict(matching, A, B, path):
""" travers through the datatree """ """ travers through the datatree """
job = basic.program.Job.getInstance() job = basic.program.Job.getInstance()
verify = int(job.getDebugLevel("match_tool"))-4 verify = int(job.getDebugLevel("match_tool"))-4
job.debug(verify, "matchDict "+path) job.debug(verify, "matchDict "+path)
for k in A: if (A is not None):
print(k) for k in A:
if k == "_match": print(k)
continue if k == "_match":
if (B is not None) and (B[k]): continue
if (isinstance(A[k], dict)): A[k]["_match"] = "Y" if (B is not None) and (B[k]):
if (isinstance(B[k], dict)): B[k]["_match"] = "Y" if (isinstance(A[k], dict)): A[k]["_match"] = "Y"
matchElement(matching, A[k], B[k], path+":"+k) if (isinstance(B[k], dict)): B[k]["_match"] = "Y"
else: matchElement(matching, A[k], B[k], path+":"+k)
if (isinstance(A[k], dict)): A[k]["_march"] = "N" else:
matchElement(matching, A[k], None, path+":"+k) if (isinstance(A[k], dict)): A[k]["_march"] = "N"
for k in B: matchElement(matching, A[k], None, path+":"+k)
if k == "_match": if (B is not None):
continue for k in B:
if (A is not None) and (A[k]): if k == "_match":
continue continue
else: if (A is not None) and (A[k]):
if (isinstance(A[k], dict)): B[k]["_match"] = "N" continue
matchElement(matching, None, B[k], path+":"+k) else:
if (A is not None) and (isinstance(A[k], dict)): B[k]["_match"] = "N"
matchElement(matching, None, B[k], path+":"+k)
return matching return matching
def matchArray(matching, A, B, path): def matchArray(matching, A, B, path):
@ -370,12 +371,39 @@ def compareRows(matching, path):
header.append({ "field": f, "type": ddl[f]["type"], "acceptance": ddl[f]["acceptance"]}) header.append({ "field": f, "type": ddl[f]["type"], "acceptance": ddl[f]["acceptance"]})
htmltext += "<th>"+f+"</th>" htmltext += "<th>"+f+"</th>"
htmltext += "</tr>" htmltext += "</tr>"
matching.difftext = htmltext
for k in sorted(matching.linksA): for k in sorted(matching.linksA):
htmltext += compareRow(matching, header, matching.sideA[int(extractKeyI(k))], print(k)
if (matching.isHitA(k)):
htmltext += compareRow(matching, header, matching.sideA[int(extractKeyI(k))],
matching.sideB[int(extractKeyI(matching.linksA[k]))]) matching.sideB[int(extractKeyI(matching.linksA[k]))])
else:
htmltext += markRow(matching, header, matching.sideA[int(extractKeyI(k))], "A")
for k in sorted(matching.linksB):
if (not matching.isHitB(k)):
htmltext += markRow(matching, header, matching.sideB[int(extractKeyI(k))], "B")
htmltext += "</table>" htmltext += "</table>"
matching.difftext += "</table>"
return htmltext return htmltext
def markRow(matching, header, row, side):
job = basic.program.Job.getInstance()
verify = int(job.getDebugLevel("match_tool"))-4
text = ""
cssClass = ""
for f in header:
if side == "A":
res = getEvaluation(matching, f["type"], f["acceptance"], row[f["field"]], "")
cssClass = res[1]
else:
res = getEvaluation(matching, f["type"], f["acceptance"], "", row[f["field"]])
cssClass = res[2]
text += "<td "+utils.css_tool.getInlineStyle("diffFiles", cssClass)+">" + str(row[f["field"]]) + "</td>"
text = "<tr><td "+utils.css_tool.getInlineStyle("diffFiles", cssClass)+">" \
+ MATCH[matching.matchtype]["short"+side] + "</td>" + text + "</tr>"
matching.difftext += text
return text
def compareRow(matching, header, rA, rB): def compareRow(matching, header, rA, rB):
""" traverse through matched rows """ """ traverse through matched rows """
job = basic.program.Job.getInstance() job = basic.program.Job.getInstance()
@ -383,14 +411,13 @@ def compareRow(matching, header, rA, rB):
allident = True allident = True
textA = "" textA = ""
textB = "" textB = ""
text = ""
for f in header: for f in header:
print(f) print(f)
res = getEvaluation(matching, f["type"], f["acceptance"], rA[f["field"]], rB[f["field"]]) res = getEvaluation(matching, f["type"], f["acceptance"], rA[f["field"]], rB[f["field"]])
match = res[0] match = res[0]
colorA = res[1] classA = res[1]
colorB = res[2] classB = res[2]
classA = res[3]
classB = res[4]
if (match == "MATCH"): if (match == "MATCH"):
textA += "<td>"+str(rA[f["field"]])+"</td>" textA += "<td>"+str(rA[f["field"]])+"</td>"
textB += "<td>"+str(rB[f["field"]])+"</td>" textB += "<td>"+str(rB[f["field"]])+"</td>"
@ -398,28 +425,17 @@ def compareRow(matching, header, rA, rB):
allident = False allident = False
textA += "<td "+utils.css_tool.getInlineStyle("diffFiles", classA)+">" + str(rA[f["field"]]) + "</td>" textA += "<td "+utils.css_tool.getInlineStyle("diffFiles", classA)+">" + str(rA[f["field"]]) + "</td>"
textB += "<td "+utils.css_tool.getInlineStyle("diffFiles", classB)+">" + str(rB[f["field"]]) + "</td>" textB += "<td "+utils.css_tool.getInlineStyle("diffFiles", classB)+">" + str(rB[f["field"]]) + "</td>"
#textB += "<td style=\"color:" + colorB + ";\">" + str(rB[f["field"]]) + "</td>"
else: else:
allident = False allident = False
textA += "<td "+utils.css_tool.getInlineStyle("diffFiles", classA)+">"+str(rA[f["field"]])+" ("+match+")</td>" textA += "<td "+utils.css_tool.getInlineStyle("diffFiles", classA)+">"+str(rA[f["field"]])+" ("+match+")</td>"
textB += "<td "+utils.css_tool.getInlineStyle("diffFiles", classB)+">"+str(rB[f["field"]])+" ("+match+")</td>" textB += "<td "+utils.css_tool.getInlineStyle("diffFiles", classB)+">"+str(rB[f["field"]])+" ("+match+")</td>"
#textB += "<td style=\"color:"+colorB+";\">"+str(rA[f["field"]])+" ("+match+")</td>"
if allident: if allident:
return "<tr><td/>"+textA+"</tr>" return "<tr><td/>"+textA+"</tr>"
return "<tr><td>"+MATCH[matching.matchtype]["shortA"]+"</td>"+textA+"</tr><tr><td>"+MATCH[matching.matchtype]["shortB"]+"</td>"+textB+"</tr>" text = "<tr><td>"+MATCH[matching.matchtype]["shortA"]+"</td>"+textA+"</tr><tr><td>"+MATCH[matching.matchtype]["shortB"]+"</td>"+textB+"</tr>"
matching.difftext += text
return text
# -------------------------------------------------------------------------- # --------------------------------------------------------------------------
def matchLines(matching): def matchLines(matching):
pass pass
# --------------------------------------------------------------------------
def getScoreint(score):
if score.is_integer():
return score
elif "h_" in score:
return int(score[2:])
def getHitscore(score):
return "h_" + ':04d'.format(getScoreint(score))
def getNextHitscore(score):
return "h_" + ':04d'.format(getScoreint(score)+1)

28
utils/path_tool.py

@ -12,18 +12,18 @@ def getKeyValue(key):
job = basic.program.Job.getInstance() job = basic.program.Job.getInstance()
verify = job.getDebugLevel("path_tool") verify = job.getDebugLevel("path_tool")
pt = PathConf.getInstance() pt = PathConf.getInstance()
print("getKeyValue " + key) job.debug(verify, "getKeyValue " + key)
if 'job.par' in key: if 'job.par' in key:
neu = job.getParameter(key[8:]) neu = job.getParameter(key[8:])
return neu return neu
elif 'job.conf' in key: elif 'job.conf' in key:
neu = job.conf.confs["paths"][key[9:]] neu = job.conf.confs["paths"][key[9:]]
print(neu) job.debug(verify, neu)
return neu return neu
# return job.conf.confs["paths"][key[9:]] # return job.conf.confs["paths"][key[9:]]
elif (pt.pattern): elif (pt.pattern):
return pt.pattern[key] return pt.pattern[key]
print("pt exists") job.debug(verify, "pt exists")
else: else:
return "xx-"+key+"-xx" return "xx-"+key+"-xx"
@ -31,36 +31,37 @@ def composePath(pathname, comp):
job = basic.program.Job.getInstance() job = basic.program.Job.getInstance()
verify = job.getDebugLevel("path_tool") verify = job.getDebugLevel("path_tool")
pt = PathConf.getInstance() pt = PathConf.getInstance()
print("composePath " + pathname + " zu " + str(pt) + "mit ") job.debug(verify, "composePath " + pathname + " zu " + str(pt) + "mit ")
print(pt.pattern) job.debug(verify, str(pt.pattern))
if pt.pattern[pathname]: if pt.pattern[pathname]:
return composePatttern(pt.pattern[pathname], comp) return composePatttern(pt.pattern[pathname], comp)
else: else:
print("in Pattern nicht vorhanden: " + pathname) job.debug(verify, "in Pattern nicht vorhanden: " + pathname)
def composePatttern(pattern, comp): def composePatttern(pattern, comp):
job = basic.program.Job.getInstance() job = basic.program.Job.getInstance()
verify = job.getDebugLevel("path_tool") verify = job.getDebugLevel("path_tool")
print("composePattern " + pattern) job.debug(verify, "composePattern " + pattern)
max=5 max=5
l = re.findall('\{.*?\}', pattern) l = re.findall('\{.*?\}', pattern)
print(l) job.debug(verify, l)
for pat in l: for pat in l:
pit = getKeyValue(pat[1:-1]) pit = getKeyValue(pat[1:-1])
print(str(max) + ": " + pattern + ": " + pat + ": " + pit) job.debug(verify, str(max) + ": " + pattern + ": " + pat + ": " + pit)
pattern = pattern.replace(pat, pit) pattern = pattern.replace(pat, pit)
print(str(max) + ": " + pattern + ": " + pat + ": " + pit) job.debug(verify, str(max) + ": " + pattern + ": " + pat + ": " + pit)
while ("{" in pattern): while ("{" in pattern):
max = max-1 max = max-1
print(str(max) + ": " + pattern + ": " + pat + ": " + pit) job.debug(verify, str(max) + ": " + pattern + ": " + pat + ": " + pit)
pattern = composePatttern(pattern, comp) pattern = composePatttern(pattern, comp)
print(str(max) + ": " + pattern + ": " + pat + ": " + pit) job.debug(verify, str(max) + ": " + pattern + ": " + pat + ": " + pit)
if (max < 3) : if (max < 3) :
break break
return pattern return pattern
def extractPattern(pathtyp): def extractPattern(pathtyp):
job = basic.program.Job.getInstance() job = basic.program.Job.getInstance()
verify = job.getDebugLevel("path_tool")
out = [] out = []
pt = PathConf.getInstance() pt = PathConf.getInstance()
pattern = pt.pattern[pathtyp] pattern = pt.pattern[pathtyp]
@ -70,7 +71,7 @@ def extractPattern(pathtyp):
j = work.index("}") j = work.index("}")
pre = work[0:i] pre = work[0:i]
pat = work[i+1:j] pat = work[i+1:j]
print (work + " von " + str(i) + "-" + str(j) + " pre " + pre + "pat " + pat) job.debug(verify, work + " von " + str(i) + "-" + str(j) + " pre " + pre + "pat " + pat)
pit = getKeyValue(pat) pit = getKeyValue(pat)
tup = (pre, pat, pit) tup = (pre, pat, pit)
out.append(tup) out.append(tup)
@ -84,6 +85,7 @@ def extractPath(pathtyp, pathname):
i = 0 i = 0
print("-- extractPatternList -- " + pathtyp + ":" + str(patterlist)) print("-- extractPatternList -- " + pathtyp + ":" + str(patterlist))
for p in patterlist: for p in patterlist:
if len(p) < 1 : continue
delim = p[0] delim = p[0]
key = p[1] key = p[1]
val = p[2] val = p[2]

138
utils/tdata_tool.py

@ -16,13 +16,32 @@ the testdata have several elements
the testdata itself which are written in different artifacts of modern applications are mostly stored as tree the testdata itself which are written in different artifacts of modern applications are mostly stored as tree
- so as xml, json, always with plain data in the leaf. So the intern structure should be also a tree - in python: dictionary. - so as xml, json, always with plain data in the leaf. So the intern structure should be also a tree - in python: dictionary.
""" """
import os.path
import basic.program import basic.program
import csv import utils.file_tool
import basic.constants as B
DATA_SOURCE_DIR = "dir"
DATA_SOURCE_CSV = "csv"
CSV_HEADER_START = ["node", "table", "tabelle"]
CSV_DELIMITER = ";"
CSV_SPECTYPE_DATA = "data"
CSV_SPECTYPE_TREE = "tree"
def getSourcefile(filename): def getSourcefile(filename):
pass pass
def getTestdata(): def getTestdata():
"""
get the testdata from one of the possible soources
* dir: each file in the specific testarchiv
* csv: specific file
* db: specific db with a testcase-catalogue
:return:
"""
job = basic.program.Job.getInstance() job = basic.program.Job.getInstance()
reftyp = getattr(job.par, "tdtyp") reftyp = getattr(job.par, "tdtyp")
source = getattr(job.par, "tdsrc") source = getattr(job.par, "tdsrc")
@ -36,11 +55,93 @@ def getTestdata():
elif reftyp == "csv": elif reftyp == "csv":
# read file in testdata # read file in testdata
job.m.logInfo("Test-Data readed from " + reftyp + " for " + criteria) job.m.logInfo("Test-Data readed from " + reftyp + " for " + criteria)
elif reftyp == "dir":
filename = os.path.join(job.conf.getJobConf("paths:testdata"), source, "testspec.csv")
data = getCsvSpec(job.m, filename, CSV_SPECTYPE_DATA)
for k in data:
tdata[k] = data[k]
else: else:
job.m.setFatal("test-Data: reftyp " + reftyp + " is not implemented") job.m.setFatal("test-Data: reftyp " + reftyp + " is not implemented")
return tdata return tdata
def readCsv(filename): def getCsvSpec(msg, filename, type):
"""
get data from a csv-file
a) data : like a table with data-array of key-value-pairs
b) tree : as a tree - the rows must be unique identified by the first column
"""
data = {}
header = []
lines = utils.file_tool.readFileLines(msg, filename)
status = "start"
for l in lines:
fields = l.split(CSV_DELIMITER)
# check empty line, comment
if (len(l.strip().replace(CSV_DELIMITER,"")) < 1):
status = "start"
continue
if (fields[0][0:1] == "#"):
continue
a = fields[0].split(":")
# keywords option, step, table
if (a[0] not in data):
data[a[0]] = {}
if (a[0].lower() == "step"):
if (not "steps" in data):
data["steps"] = []
step = {}
step["comp"] = fields[1]
step["todo"] = fields[2]
step["args"] = fields[3]
data["steps"].append(step)
continue
elif (a[0].lower() == "option"):
data[a[0]][a[1]] = fields[1]
continue
elif (a[0].lower() in CSV_HEADER_START):
i = 0
for f in fields:
i += 1
if i == 1: continue
header.append(f)
status = CSV_SPECTYPE_DATA
continue
elif (status == CSV_SPECTYPE_DATA):
# check A-col for substructure
if (a[0] not in data):
data[a[0]] = {}
if len(a) == 1 and type == CSV_SPECTYPE_DATA:
data[a[0]]["_data"] = []
# its a component
if len(a) > 1 and a[1] not in data[a[0]]:
data[a[0]][a[1]] = {}
if len(a) == 2 and type == CSV_SPECTYPE_DATA:
data[a[0]][a[1]]["_data"] = []
if len(a) > 2 and a[2] not in data[a[0]][a[1]]:
data[a[0]][a[1]][a[2]] = {}
if len(a) == 3 and type == CSV_SPECTYPE_DATA:
data[a[0]][a[1]][a[1]]["_data"] = []
# fill data
row = {}
i = 1
for f in header:
row[f] = fields[i]
i += 1
if len(a) == 1 and type == CSV_SPECTYPE_DATA:
data[a[0]]["_data"].append(row)
elif len(a) == 1 and type == CSV_SPECTYPE_DATA:
data[a[0]] = {f: row}
elif len(a) == 2 and type == CSV_SPECTYPE_DATA:
data[a[0]][a[1]]["_data"].append(row)
elif len(a) == 1 and type == CSV_SPECTYPE_DATA:
data[a[0]][a[1]]["_data"] = {f: row}
elif len(a) == 3 and type == CSV_SPECTYPE_DATA:
data[a[0]][a[1]][a[2]] = row
elif len(a) == 1 and type == CSV_SPECTYPE_DATA:
data[a[0]][a[1]][a[2]] = {f: row}
return data
def readCsv(msg, filename, comp):
job = basic.program.Job.getInstance() job = basic.program.Job.getInstance()
verify = -1+job.getDebugLevel("tdata_tool") verify = -1+job.getDebugLevel("tdata_tool")
job.debug(verify, "readCsv " + filename) job.debug(verify, "readCsv " + filename)
@ -51,22 +152,24 @@ def readCsv(filename):
state = 0 state = 0
data = [] data = []
cnt = 0 cnt = 0
with open(filename, 'r') as file: lines = utils.file_tool.readFileLines(msg, filename)
lines = file.read().splitlines() startCols = 1
for line in lines: for line in lines:
# while (line := file.readline().rstrip()):
fields = line.split(';') fields = line.split(';')
testline = line.replace(";", "") testline = line.replace(";", "")
job.debug(verify, str(state) + " line " + line + " :" + str(len(fields)) + ": " + str(fields)) job.debug(verify, str(state) + " line " + line + " :" + str(len(fields)) + ": " + str(fields))
if len(testline) < 2 and state < 1: if len(testline) < 2 and state < 1:
state = 0 state = 0
elif fields[0].lower() == "node": elif fields[0].lower() in CSV_HEADER_START:
state = 2 state = 2
columns = [] columns = []
cnt = len(fields) cnt = len(fields)
job.debug(verify, str(state) + " cnt " + str(cnt)) job.debug(verify, str(state) + " cnt " + str(cnt))
j = 0 j = 0
for i in range(2, cnt-1): for i in range(1, cnt-1):
if fields[0][0:1] == "_":
startCols += 1
continue
job.debug(verify, str(i) + " cnt " + str(fields[i])) job.debug(verify, str(i) + " cnt " + str(fields[i]))
if len(fields[i]) > 0: if len(fields[i]) > 0:
columns.append(fields[i]) columns.append(fields[i])
@ -79,7 +182,7 @@ def readCsv(filename):
job.debug(verify, str(state) + " nodes " + str(nodes)) job.debug(verify, str(state) + " nodes " + str(nodes))
state = 3 state = 3
row = {} row = {}
for i in range(2, cnt-1): for i in range(startCols, cnt-1):
row[columns[i-2]] = fields[i] row[columns[i-2]] = fields[i]
job.debug(verify, str(state) + " row " + str(row)) job.debug(verify, str(state) + " row " + str(row))
data.append(row) data.append(row)
@ -88,11 +191,7 @@ def readCsv(filename):
output = setSubnode(0, nodes, data, output) output = setSubnode(0, nodes, data, output)
data = [] data = []
state = 0 state = 0
print(str(output))
print(str(output))
output = setSubnode(0, nodes, data, output) output = setSubnode(0, nodes, data, output)
print(str(output))
file.close()
return output return output
def setSubnode(i, nodes, data, tree): def setSubnode(i, nodes, data, tree):
@ -121,7 +220,7 @@ def normalizeDataRow(dstruct, xpathtupel, row, referencedate):
verify = -1+job.getDebugLevel("tdata_tool") verify = -1+job.getDebugLevel("tdata_tool")
job.debug(verify, "calcDataRow " + row) job.debug(verify, "calcDataRow " + row)
def writeDataTable(teststatus, tdata, comp): def writeDataTable(filename, tdata, comp):
""" """
writes the testdata into a csv-file for documentation of the test-run writes the testdata into a csv-file for documentation of the test-run
:param teststatus: :param teststatus:
@ -132,4 +231,15 @@ def writeDataTable(teststatus, tdata, comp):
job = basic.program.Job.getInstance() job = basic.program.Job.getInstance()
verify = -1+job.getDebugLevel("tdata_tool") verify = -1+job.getDebugLevel("tdata_tool")
job.debug(verify, "writeDataTable " + str(comp)) job.debug(verify, "writeDataTable " + str(comp))
text = "table"
for f in tdata[B.DATA_NODE_HEADER]:
text += ";"+f
for r in tdata[B.DATA_NODE_DATA]:
text += "\n"
for f in tdata[B.DATA_NODE_HEADER]:
if f in r:
text += ";"+r[f]
else:
text += ";"
text += "\n"
utils.file_tool.writeFileText(comp.m, filename, text)

6
utils/xml_tool.py

@ -3,7 +3,7 @@ import json
import yaml import yaml
import re import re
import datetime import datetime
#from tools.reader import fcts as reader import utils.tdata_tool
import os, sys, json import os, sys, json
import xmltodict import xmltodict
import pprint import pprint
@ -49,7 +49,7 @@ class register():
print (row) print (row)
satz = {} satz = {}
satz = self.schreibeSegmentsatz(satz, content, row["_lfdNR"]) satz = self.schreibeSegmentsatz(satz, content, row["_lfdNR"])
# print("ausgabe vorher: "+str(ausgabe)) # print("ausgabe vorher: "+str(ausgabe))*
ausgabe["nachricht"]["datensegment"].append(satz["datensegment"]) ausgabe["nachricht"]["datensegment"].append(satz["datensegment"])
return ausgabe return ausgabe
@ -125,7 +125,7 @@ class Absender():
FELDER = ("ABSENDER_KENNUNG", "ABSENDER_IDENTITÄT", "ABSENDER_TELEFON", "ABSENDER_EMAIL", "ABSENDER_FAX", "ABSENDER_URL", "BERICHTSEINHEIT_ID", "REGNAME") FELDER = ("ABSENDER_KENNUNG", "ABSENDER_IDENTITÄT", "ABSENDER_TELEFON", "ABSENDER_EMAIL", "ABSENDER_FAX", "ABSENDER_URL", "BERICHTSEINHEIT_ID", "REGNAME")
def __init__(self): def __init__(self):
content = reader.readCsv("testdaten/allgemein/absender.csv", "keys") content = utils.tdata_tool.readCsv("testdaten/allgemein/absender.csv", "keys")
self.keys = content["tabelle"]["absender"]["_keys"] self.keys = content["tabelle"]["absender"]["_keys"]
class Vrfelder(): class Vrfelder():

Loading…
Cancel
Save