Browse Source

mapping dict for xml-files

master
Ulrich Carmesin 2 years ago
parent
commit
d04d6d9ffd
  1. 8
      basic/constants.py
  2. 1
      basic/step.py
  3. 0
      basic/text_const.py
  4. 43
      test/test_06file.py
  5. 4
      test/test_07catalog.py
  6. 8
      test/test_12toolhandling.py
  7. 197
      test/test_25map.py
  8. 59
      test/test_32file.py
  9. 30
      utils/file_tool.py
  10. 375
      utils/map_tool.py

8
basic/constants.py

@ -95,7 +95,9 @@ DATA_NODE_DDL = "ddl"
The fields are defined in data_const (D) """ The fields are defined in data_const (D) """
DATA_NODE_COMP = "comp" DATA_NODE_COMP = "comp"
""" This constant defines """ """ This constant defines """
DATA_NODE_PAR = "par" DATA_NODE_PAR = "par"
DATA_NODE_CATALOG = "_catalog"
DATA_NODE_ROW = "_row"
ATTR_ARTS_TYPE = "type" ATTR_ARTS_TYPE = "type"
@ -173,6 +175,8 @@ ATTR_PATH_EXPECT = "expect"
""" This constant defines the folder in testing-filesystem for test-expectation values """ """ This constant defines the folder in testing-filesystem for test-expectation values """
ATTR_PATH_PROGRAM = "program" ATTR_PATH_PROGRAM = "program"
""" This constant defines the program-folder in the workspace """ """ This constant defines the program-folder in the workspace """
ATTR_PATH_COMPS = "components"
""" This constant defines the subfolder in the program-folder in the workspace """
ATTR_PATH_ENV = "environment" ATTR_PATH_ENV = "environment"
""" This constant defines the folder in testing-filesystem, used for configs related to environments """ """ This constant defines the folder in testing-filesystem, used for configs related to environments """
ATTR_PATH_RELEASE = "release" ATTR_PATH_RELEASE = "release"
@ -217,6 +221,8 @@ ATTR_EXEC_REF = "_exec"
ATTR_DATA_REF = "_nr" ATTR_DATA_REF = "_nr"
ATTR_DATA_COMP = "_comp" ATTR_DATA_COMP = "_comp"
SUBJECT_TOOL = "tool"
# ------------------------------------------------------------- # -------------------------------------------------------------
# exception texts # exception texts
EXP_NO_BASIS_FILE = "basis file cant be found" EXP_NO_BASIS_FILE = "basis file cant be found"

1
basic/step.py

@ -73,6 +73,7 @@ def parseStep(job, fields):
step.comp = fields[D.STEP_COMP_I] step.comp = fields[D.STEP_COMP_I]
step.execStep = fields[D.STEP_EXECNR_I] step.execStep = fields[D.STEP_EXECNR_I]
step.refLine = fields[D.STEP_REFNR_I] step.refLine = fields[D.STEP_REFNR_I]
setattr(step, B.ATTR_DATA_REF, step.refLine)
if D.STEP_ARGS_I == D.STEP_LIST_I: if D.STEP_ARGS_I == D.STEP_LIST_I:
args = "" args = ""
for i in range(D.STEP_ARGS_I, len(fields)): for i in range(D.STEP_ARGS_I, len(fields)):

0
basic/text_const.py

43
test/test_06file.py

@ -4,13 +4,16 @@ import inspect
import utils.file_tool as t import utils.file_tool as t
import utils.path_tool import utils.path_tool
import basic.program import basic.program
import test.constants import test.constants as T
import test.testtools import test.testtools
import pprint
import json
HOME_PATH = test.constants.HOME_PATH HOME_PATH = test.constants.HOME_PATH
DATA_PATH = test.constants.DATA_PATH DATA_PATH = test.constants.DATA_PATH
TEST_FUNCTIONS = ["test_getFiles", "test_pathTool", "test_encoding"] TEST_FUNCTIONS = ["test_getFiles", "test_pathTool", "test_encoding", "test_11readYml", "test_14readXml"]
TEST_FUNCTIONS = ["test_11readYml"]
verbose = False
class MyTestCase(unittest.TestCase): class MyTestCase(unittest.TestCase):
mymsg = "" mymsg = ""
@ -64,10 +67,44 @@ class MyTestCase(unittest.TestCase):
cnttest += 3 cnttest += 3
MyTestCase.mymsg += "\n----- " + actfunction + " : " + str(cnttest) MyTestCase.mymsg += "\n----- " + actfunction + " : " + str(cnttest)
def test_11readYml(self):
global mymsg
actfunction = str(inspect.currentframe().f_code.co_name)
cnttest = 0
if actfunction not in TEST_FUNCTIONS:
return
job = test.testtools.getJob()
pathname = os.path.join(T.COMP_PATH, "testrest", "mapping-rest.yml")
res = utils.file_tool.readFileDict(pathname, job.m)
print(res)
pathname = os.path.join(DATA_PATH, "tdata", "UNIT_TEST", "rest-message.xml")
utils.file_tool.writeFileDict(job.m, pathname, res)
def test_14readXml(self):
global mymsg
actfunction = str(inspect.currentframe().f_code.co_name)
cnttest = 0
if actfunction not in TEST_FUNCTIONS:
return
job = test.testtools.getJob()
pathname = os.path.join(DATA_PATH, "tdata", "UNIT_TEST", "shiporder.xml")
res = utils.file_tool.readFileDict(pathname, job.m)
res = dict(res)
print(res)
self.assertIn("shiporder", res)
self.assertIn("@orderid", res["shiporder"])
for x in res["shiporder"]:
print(x+" "+str(type(res["shiporder"][x])))
pathname = os.path.join(DATA_PATH, "tdata", "UNIT_TEST", "shiporder-res.yml")
utils.file_tool.writeFileDict(job.m, pathname, res)
MyTestCase.mymsg += "\n----- " + actfunction + " : " + str(cnttest)
def test_zzz(self): def test_zzz(self):
print(MyTestCase.mymsg) print(MyTestCase.mymsg)
if __name__ == '__main__': if __name__ == '__main__':
verbose = True
unittest.main() unittest.main()

4
test/test_07catalog.py

@ -15,7 +15,7 @@ OS_SYSTEM = test.constants.OS_SYSTEM
# here you can select single testfunction for developping the tests # here you can select single testfunction for developping the tests
TEST_FUNCTIONS = ["test_01class", "test_02read", "test_03key"] TEST_FUNCTIONS = ["test_01class", "test_02read", "test_03key"]
TEST_FUNCTIONS = [ "test_03key"] #TEST_FUNCTIONS = [ "test_03key"]
verbose = False verbose = False
class MyTestCase(unittest.TestCase): class MyTestCase(unittest.TestCase):
@ -70,6 +70,8 @@ class MyTestCase(unittest.TestCase):
self.assertEqual(res["Land"], "Tschad") self.assertEqual(res["Land"], "Tschad")
print(str(res)) print(str(res))
cnttest += 1 cnttest += 1
res = catalog.getValue("sender", "firma", job)
print(str(res))
MyTestCase.mymsg += "\n----- "+actfunction+" : "+str(cnttest) MyTestCase.mymsg += "\n----- "+actfunction+" : "+str(cnttest)

8
test/test_12toolhandling.py

@ -48,9 +48,9 @@ class MyTestCase(unittest.TestCase):
comp.conf[B.SUBJECT_CONN][B.TOPIC_NODE_CLI] = {} comp.conf[B.SUBJECT_CONN][B.TOPIC_NODE_CLI] = {}
comp.conf[B.SUBJECT_CONN][B.TOPIC_NODE_DB][B.ATTR_TYPE] = "mysql" comp.conf[B.SUBJECT_CONN][B.TOPIC_NODE_DB][B.ATTR_TYPE] = "mysql"
comp.conf[B.SUBJECT_CONN][B.TOPIC_NODE_CLI][B.ATTR_TYPE] = "ssh" comp.conf[B.SUBJECT_CONN][B.TOPIC_NODE_CLI][B.ATTR_TYPE] = "ssh"
tool = basic.toolHandling.getDbTool(comp) tool = basic.toolHandling.getDbTool(job, comp)
self.assertRegex(str(type(tool)), 'dbmysql_tool.DbFcts') self.assertRegex(str(type(tool)), 'dbmysql_tool.DbFcts')
tool = basic.toolHandling.getCliTool(comp) tool = basic.toolHandling.getCliTool(job, comp)
self.assertRegex(str(type(tool)), 'clissh_tool.CliFcts') self.assertRegex(str(type(tool)), 'clissh_tool.CliFcts')
comp.conf[B.SUBJECT_CONN][B.TOPIC_NODE_FILE] = {} comp.conf[B.SUBJECT_CONN][B.TOPIC_NODE_FILE] = {}
comp.conf[B.SUBJECT_CONN][B.TOPIC_NODE_FILE][B.ATTR_TYPE] = "xml" comp.conf[B.SUBJECT_CONN][B.TOPIC_NODE_FILE][B.ATTR_TYPE] = "xml"
@ -59,8 +59,8 @@ class MyTestCase(unittest.TestCase):
comp.conf[B.SUBJECT_CONN][B.TOPIC_NODE_DB][B.ATTR_TYPE] = "dxx" comp.conf[B.SUBJECT_CONN][B.TOPIC_NODE_DB][B.ATTR_TYPE] = "dxx"
comp.conf[B.SUBJECT_CONN][B.TOPIC_NODE_CLI][B.ATTR_TYPE] = "sxx" comp.conf[B.SUBJECT_CONN][B.TOPIC_NODE_CLI][B.ATTR_TYPE] = "sxx"
self.assertRaises(FileNotFoundError, basic.toolHandling.getDbTool, comp) self.assertRaises(FileNotFoundError, basic.toolHandling.getDbTool, job, comp)
self.assertRaises(FileNotFoundError, basic.toolHandling.getCliTool, comp) self.assertRaises(FileNotFoundError, basic.toolHandling.getCliTool, job, comp)
if __name__ == '__main__': if __name__ == '__main__':

197
test/test_25map.py

@ -0,0 +1,197 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# ---------------------------------------------------------------------------------------------------------
# Author : Ulrich Carmesin
# Source : gitea.ucarmesin.de
# ---------------------------------------------------------------------------------------------------------
import unittest
import inspect
import os
import json
import basic.program
import utils.path_tool
import utils.path_const as P
import utils.config_tool
import utils.data_const as D
import basic.toolHandling
import test.constants
import basic.component
import basic.constants as B
import utils.map_tool
import utils.file_tool
import test.testtools
import utils.tdata_tool
HOME_PATH = test.constants.HOME_PATH
conf = {}
# here you can select single testfunction for developping the tests
# "test_toolhandling", "test_parseSql" -> test of components
TEST_FUNCTIONS = ["test_02getIds", "test_03extractIds", "test_04getFieldVal", "test_05getValue",
"test_11mapTdata"]
TEST_FUNCTIONS = ["test_11mapTdata"]
verbose = False
class MyTestCase(unittest.TestCase):
mymsg = "--------------------------------------------------------------"
def test_03extractIds(self):
global mymsg
actfunction = str(inspect.currentframe().f_code.co_name)
cnttest = 0
if actfunction not in TEST_FUNCTIONS:
return
job = test.testtools.getJob()
res = utils.map_tool.extractIds(job, "1")
self.assertEqual(res, ["1"])
res = utils.map_tool.extractIds(job, "1, 2")
self.assertEqual(res, "1,2".split(","))
res = utils.map_tool.extractIds(job, "1-3")
self.assertEqual(res, "1,2,3".split(","))
res = utils.map_tool.extractIds(job, "1, 3-6, 8")
self.assertEqual(res, "1,3,4,5,6,8".split(","))
cnttest += 4
MyTestCase.mymsg += "\n----- "+actfunction+" : "+str(cnttest)
def test_02getIds(self):
global mymsg
actfunction = str(inspect.currentframe().f_code.co_name)
cnttest = 0
if actfunction not in TEST_FUNCTIONS:
return
job = test.testtools.getJob()
path = utils.config_tool.getConfigPath(P.KEY_TESTCASE, "TC0001", "", job)
tdata = utils.tdata_tool.getCsvSpec(job.m, path, D.CSV_SPECTYPE_DATA)
args = {}
args[B.DATA_NODE_DATA] = tdata
args[B.DATA_NODE_STEPS] = tdata[B.DATA_NODE_STEPS][0]
args[utils.map_tool.ACT_ID] = {}
args[utils.map_tool.ID_LIST] = {}
ids = {}
res = utils.map_tool.getIds(job, args, "msgid={_steps._nr}")
print(res)
self.assertEqual(res[0], "1")
self.assertIn("msgid", args[utils.map_tool.ID_LIST])
cnttest += 1
res = utils.map_tool.getIds(job, args, "sender={_data.person._sender(_steps._nr)}")
print(res)
self.assertEqual(res[0], "firma")
args[B.DATA_NODE_STEPS] = tdata[B.DATA_NODE_STEPS][1]
res = utils.map_tool.getIds(job, args, "msgid={_steps._nr}")
self.assertEqual(res[1], "2")
self.assertIn("msgid", args[utils.map_tool.ID_LIST])
cnttest += 1
args[utils.map_tool.ACT_ID]["msgid"] = "1"
res = utils.map_tool.getIds(job, args, "posid={_data.product._nr,_pos(msgid)}")
self.assertEqual(res[0], ("1", "4"))
self.assertEqual(res[1], ("1", "5"))
cnttest += 1
compName = "testcrmdb"
args[B.DATA_NODE_COMP] = test.testtools.getComp(compName)
comp = args[B.DATA_NODE_COMP]
conf = utils.config_tool.getConfig(D.DDL_FILENAME, compName, "person")
comp.conf[B.DATA_NODE_DDL] = {}
comp.conf[B.DATA_NODE_DDL]["person"] = conf
res = utils.map_tool.getIds(job, args, "fields={_comp.ddl.person}")
print(res)
args[utils.map_tool.ACT_ID]["posid"] = ("1", "4")
res = utils.map_tool.getConditions(job, args, "{posid}")
print(res)
self.assertEqual(res[0], ("1", "4"))
#self.assertEqual(res[1], "4")
cnttest += 1
MyTestCase.mymsg += "\n----- "+actfunction+" : "+str(cnttest)
def test_04getFieldVal(self):
global mymsg
actfunction = str(inspect.currentframe().f_code.co_name)
cnttest = 0
if actfunction not in TEST_FUNCTIONS:
return
job = test.testtools.getJob()
path = utils.config_tool.getConfigPath(P.KEY_TESTCASE, "TC0001", "", job)
tdata = utils.tdata_tool.getCsvSpec(job.m, path, D.CSV_SPECTYPE_DATA)
condIds = [["1"]]
args = {}
args[B.DATA_NODE_DATA] = tdata
res = utils.map_tool.getFieldVal(job, args, "person", "_sender", condIds)
print(res)
condIds = [["1"], ["4"]]
res = utils.map_tool.getFieldVal(job, args, "product", "descript", condIds)
print(res)
def test_05getValue(self):
global mymsg
actfunction = str(inspect.currentframe().f_code.co_name)
cnttest = 0
if actfunction not in TEST_FUNCTIONS:
return
job = test.testtools.getJob()
path = utils.config_tool.getConfigPath(P.KEY_TESTCASE, "TC0001", "", job)
tdata = utils.tdata_tool.getCsvSpec(job.m, path, D.CSV_SPECTYPE_DATA)
condIds = [["1"]]
args = {}
args[B.DATA_NODE_DATA] = tdata
args[B.DATA_NODE_STEPS] = tdata[B.DATA_NODE_STEPS][0]
args[utils.map_tool.ACT_ID] = {}
args[utils.map_tool.ID_LIST] = {}
#res = utils.map_tool.getValue(job, args, "msgid={_steps._nr}")
#-print(res)
#self.assertEqual(res, ['1'])
args[utils.map_tool.ACT_ID]["msgid"] = ['1']
#res = utils.map_tool.getValue(job, args, "sender={_data.person._sender(msgid)}")
#self.assertEqual(res, ["firma"])
#print(res)
print(args[utils.map_tool.ID_LIST])
args[utils.map_tool.ACT_ID]["sender"] = "firma"
res = utils.map_tool.getValue(job, args, "{_catalog.sender.depart(sender)}")
self.assertEqual(res, "main")
res = utils.map_tool.getValue(job, args, "{_steps.args.action}")
print(res)
res = utils.map_tool.getValue(job, args, "{_par.tctime}")
print(res)
args[utils.map_tool.ACT_ID]["msgid"] = "1"
res = utils.map_tool.getValue(job, args, "{_data.person.famname(msgid)}")
print(res)
self.assertEqual(res, "Brecht")
args[utils.map_tool.ACT_ID]["msgid"] = "1"
# select row if field is missing
res = utils.map_tool.getValue(job, args, "{_data.person(msgid)}")
print(res)
res = utils.map_tool.getValue(job, args, "hier ist ein Text")
print(res)
# it is one row [{f_1: v_2, ...}]
def test_11mapTdata(self):
global mymsg
actfunction = str(inspect.currentframe().f_code.co_name)
cnttest = 0
if actfunction not in TEST_FUNCTIONS:
return
job = test.testtools.getJob()
comp = test.testtools.getComp("testrest")
path = os.path.join(job.conf.confs[B.SUBJECT_PATH][B.ATTR_PATH_COMPS], "testrest", "mapping-rest.yml")
mapping = utils.file_tool.readFileDict(path, job.m)
path = utils.config_tool.getConfigPath(P.KEY_TESTCASE, "TC0001", "", job)
tdata = utils.tdata_tool.getCsvSpec(job.m, path, D.CSV_SPECTYPE_DATA)
res = utils.map_tool.mapTdata(job, mapping, tdata, tdata[B.DATA_NODE_STEPS][1], comp)
print(res)
for format in ["xml", "yml", "json"]:
path = os.path.join(job.conf.confs[B.SUBJECT_PATH][B.ATTR_PATH_TDATA], "temp", "result-rest."+format)
print(path)
utils.file_tool.writeFileDict(job.m, path, res)
doc = json.dumps(res, indent=0)
print(doc)
MyTestCase.mymsg += "\n----- "+actfunction+" : "+str(cnttest)
def test_zzz(self):
print(MyTestCase.mymsg)
if __name__ == '__main__':
verbose = True
unittest.main()

59
test/test_32file.py

@ -0,0 +1,59 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# ---------------------------------------------------------------------------------------------------------
# Author : Ulrich Carmesin
# Source : gitea.ucarmesin.de
# ---------------------------------------------------------------------------------------------------------
import unittest
import inspect
import os
import basic.program
import utils.path_tool
import utils.path_const as P
import utils.config_tool
import utils.data_const as D
import basic.toolHandling
import test.constants
import basic.component
import basic.constants as B
import utils.file_abstract
import utils.file_tool
import test.testtools
import utils.tdata_tool
HOME_PATH = test.constants.HOME_PATH
conf = {}
# here you can select single testfunction for developping the tests
# "test_toolhandling", "test_parseSql" -> test of components
TEST_FUNCTIONS = ["test_11mapTdata"]
TEST_FUNCTIONS = ["test_05getValue"]
verbose = False
class MyTestCase(unittest.TestCase):
mymsg = "--------------------------------------------------------------"
def test_11mapTdata(self):
global mymsg
actfunction = str(inspect.currentframe().f_code.co_name)
cnttest = 0
if actfunction not in TEST_FUNCTIONS:
return
job = test.testtools.getJob()
comp = test.testtools.getComp("testrest")
path = os.path.join(job.conf.confs[B.SUBJECT_PATH][B.ATTR_PATH_COMPS], "testrest", "mapping-rest.yml")
mapping = utils.file_tool.readFileDict(path, job.m)
path = utils.config_tool.getConfigPath(P.KEY_TESTCASE, "TC0001", "", job)
tdata = utils.tdata_tool.getCsvSpec(job.m, path, D.CSV_SPECTYPE_DATA)
res = utils.file_abstract.mapTdata(job, mapping, tdata, tdata[B.DATA_NODE_STEPS][0], comp)
MyTestCase.mymsg += "\n----- "+actfunction+" : "+str(cnttest)
def test_zzz(self):
print(MyTestCase.mymsg)
if __name__ == '__main__':
verbose = True
unittest.main()

30
utils/file_tool.py

@ -9,6 +9,7 @@ import os
import os.path import os.path
import re import re
import xmltodict
import yaml import yaml
import basic.message import basic.message
@ -222,10 +223,30 @@ def readFileDict(path, msg):
with open(path, 'r', encoding=enc) as file: with open(path, 'r', encoding=enc) as file:
doc = json.load(file) doc = json.load(file)
file.close() file.close()
elif D.DFILE_TYPE_XML in path[-4:]:
with open(path, 'r', encoding=enc) as file:
res = xmltodict.parse(file.read())
# doc = dict(res)
doc = castOrderedDict(res)
file.close()
elif D.DFILE_TYPE_CSV in path[-5:]: elif D.DFILE_TYPE_CSV in path[-5:]:
doc = utils.tdata_tool.getCsvSpec(msg, path, D.CSV_SPECTYPE_CONF) doc = utils.tdata_tool.getCsvSpec(msg, path, D.CSV_SPECTYPE_CONF)
return doc return doc
def castOrderedDict(res, job=None, key=""):
if isinstance(res, dict):
doc = dict(res)
for x in doc:
doc[x] = castOrderedDict(doc[x], job, x)
elif isinstance(res, list):
sublist = []
for i in range(0, len(res)):
sublist.append(castOrderedDict(res[i], job, ""))
doc = sublist
else:
doc = res
return doc
def writeFileText(msg, path, text, enc="utf-8"): def writeFileText(msg, path, text, enc="utf-8"):
job = basic.program.Job.getInstance() job = basic.program.Job.getInstance()
@ -245,7 +266,14 @@ def writeFileDict(msg, path, dict, enc="utf-8"):
file.close() file.close()
elif D.DFILE_TYPE_JSON in path[-5:]: elif D.DFILE_TYPE_JSON in path[-5:]:
with open(path, 'w', encoding=enc) as file: with open(path, 'w', encoding=enc) as file:
doc = json.dumps(file, indent=4) doc = json.dumps(dict, indent=4)
file.write(doc) file.write(doc)
file.close() file.close()
elif D.DFILE_TYPE_XML in path[-4:]:
with open(path, 'w', encoding=enc) as file:
text = xmltodict.unparse(dict, pretty=True)
if "<?xml version=" not in text:
text = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n" + text
file.write(text)
file.close()

375
utils/map_tool.py

@ -0,0 +1,375 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# ---------------------------------------------------------------------------------------------------------
# Author : Ulrich Carmesin
# Source : gitea.ucarmesin.de
# ---------------------------------------------------------------------------------------------------------
import os
import re
import basic.program
import basic.catalog
import utils.config_tool
import basic.constants as B
import basic.toolHandling
import utils.data_const as D
import utils.file_tool
import utils.path_tool
import basic.catalog
ACT_ID = "actid"
ID_LIST = "idlist"
MAP_ID = "_id"
MAP_FOR = "_foreach"
MAP_ROW = "_row"
MAP_FCTS = [ MAP_FOR, MAP_ID, MAP_ROW ]
MODUL_NAME = "map_tool"
def mapTdata(job, mapping, tdata, step, comp):
"""
initialize the mapping from testdata into the mapping-structure
:param job:
:param mapping:
:param tdata:
:param step:
:param comp:
:return:
"""
verify = job.getDebugLevel(MODUL_NAME)
out = {}
path = ""
job.debug(verify, mapping)
args = {}
args[B.DATA_NODE_COMP] = comp
args[B.DATA_NODE_DATA] = tdata
args[B.DATA_NODE_STEPS] = step
args[ACT_ID] = {}
args[ID_LIST] = {}
out = mapElement(job, args, mapping, path, out)
job.debug(verify, ">>>>>>>>>>> \n"+str(out))
return out
def mapElement(job, args, elem, path, out):
"""
recursive mapping with building the dict of the testdata
:param job:
:param args:
:param elem:
:param path:
:param out:
:return:
"""
verify = job.getDebugLevel(MODUL_NAME)
job.debug(verify, "mapElem "+path+" id: "+str(args[ACT_ID]))
if isinstance(elem, dict):
job.debug(verify, "##### dict ")
nodes = []
attrNodes = []
leafNodes = []
objNodes = []
for k in elem:
if MAP_ID in elem[k] or MAP_FOR in elem[k]:
objNodes.append(k)
elif k[0:1] == '@' or k[0:1] == '#':
attrNodes.append(k)
else:
leafNodes.append(k)
job.debug(verify, "nodes "+str(attrNodes)+" - "+str(leafNodes)+" - "+str(objNodes))
nodes = attrNodes + leafNodes + objNodes
for k in nodes:
# iterating this elem is declared inside of the elem
# like foreach but only one element
job.debug(verify, "# # "+k)
if MAP_ID in elem[k] or MAP_FOR in elem[k]:
job.debug(verify, "# + k in obj : val "+k)
if MAP_ID in elem[k]:
key = elem[k][MAP_ID][0:elem[k][MAP_ID].find("=")]
idlist = getIds(job, args, elem[k][MAP_ID])
if len(idlist) > 1:
uniqueKeys = {}
for x in idlist:
uniqueKeys[x] = x
if len(uniqueKeys) > 1:
raise Exception("bei keyword _id there is only one element allowed "+str(idlist))
else:
idlist = uniqueKeys.keys()
elif MAP_FOR in elem[k]:
key = elem[k][MAP_FOR][0:elem[k][MAP_FOR].find("=")]
idlist = getIds(job, args, elem[k][MAP_FOR])
sublist = []
a = path.split(",")
a.append(k)
npath = ",".join(a)
for id in idlist:
args[ACT_ID][key] = str(id)
if MAP_ROW in elem[k]:
row = getRow(job, args, elem[k][MAP_ROW])
sublist.append(mapElement(job, args, elem[k], npath, {}))
out[k] = sublist
elif k == MAP_ID or k == MAP_FOR or k == MAP_ROW:
job.debug(verify, "# + k in MAP : continue "+k)
continue
else:
job.debug(verify, "# + k in leaf : val "+k)
a = path.split(",")
a.append(k)
npath = ",".join(a)
job.debug(verify, "mapElem - dict "+k)
out[k] = mapElement(job, args, elem[k], npath, {})
elif isinstance(elem, list):
out = []
i = 0
for k in elem:
job.debug(verify, "mapElem - list "+str(k))
a = path.split(",")
a.append(str(i))
npath = ",".join(a)
out.append(mapElement(job, args, elem[i], path, {}))
i += 1
else:
job.debug(verify, "mapElem - leaf " + elem)
if elem[0:1] == "{" and elem[-1:] == "}":
elem = elem[1:-1]
out = toSimpleType(job, getValue(job, args, elem))
return out
def toSimpleType(job, value):
if isinstance(value, (list, tuple)) and len(value) == 1:
return value[0]
return value
def extractIds(job, idval):
ids = []
if isinstance(idval, list) or isinstance(idval, tuple):
a = idval
else:
a = idval.split(",")
for k in a:
if "-" in k:
b = k.split("-")
for i in range(int(b[0].strip()), int(b[1].strip())+1):
ids.append(str(i).strip())
elif isinstance(k, str):
ids.append(k.strip())
else:
ids.append(k)
return ids
def getRow(job, args, fieldkey):
a = fieldkey.split(".")
row = getValue(job, args, fieldkey)
if B.DATA_NODE_ROW not in args: args[B.DATA_NODE_ROW] = {}
a[1] = a[1][0:a[1].find("(")]
args[B.DATA_NODE_ROW][a[1]] = row[0]
return row
def getIds(job, args, fieldkey):
"""
sets the id resp list of ids into args[idlist]
the fieldkey has a formula like id={_source.table.field(condition)}
:param job:
:param args:
:param fieldky:
:return:
"""
verify = job.getDebugLevel(MODUL_NAME)
job.debug(verify, "match = "+fieldkey)
out = []
idfield = fieldkey
if re.match(r"(.+)\=(.+)", fieldkey):
res = re.search(r"(.+)\=(.+)", fieldkey)
idfield = res.group(1)
fieldkey = res.group(2)
if fieldkey[0:1] == "{" and fieldkey[-1:] == "}":
fieldkey = fieldkey[1:-1]
if "temp" not in args: args["temp"] = {}
i = 0
while "(" in fieldkey and ")" in fieldkey:
innerCond = fieldkey[fieldkey.rfind("(")+1:fieldkey.find(")")]
if "." not in innerCond:
break
innerkey = "temp_"+str(i)
args["temp"][innerkey] = {}
args["temp"][innerkey]["idfield"] = idfield
args["temp"][innerkey]["fieldkey"] = fieldkey
innerlist = getIds(job, args, innerkey+"={"+innerCond+"}")
args[ACT_ID][innerkey] = ",".join(innerlist)
fieldkey = fieldkey.replace(innerCond, innerkey)
idfield = args["temp"][innerkey]["idfield"]
i += 1
if i > 3:
raise Exception("too much searches "+str(args["temp"]))
val = getValue(job, args, fieldkey)
idlist = extractIds(job, val)
args[ID_LIST][idfield] = idlist
job.debug(verify, "idlist " + str(args[ID_LIST]))
return idlist
def getConditions(job, args, fieldkey):
"""
gets a list of condition-value
:param job:
:param args:
:param fieldkey: in formula (c_1, c_2, ..)
:return: [v_1} ..]
"""
verify = job.getDebugLevel(MODUL_NAME)
while fieldkey[0:1] == "(" and fieldkey[-1:] == ")":
fieldkey = fieldkey[1:-1]
while fieldkey[0:1] == "{" and fieldkey[-1:] == "}":
fieldkey = fieldkey[1:-1]
out = []
a = fieldkey.split(",")
if len(a) > 1:
job.m.logError("map-condition should not have more parts - use tupel "+fieldkey)
for k in a:
if "." in k:
raise Exception("Formatfehler in " + fieldkey)
job.debug(verify, "actid " + str(args[ACT_ID]))
idelem = {}
idelem[k] = args[ACT_ID][k]
out.append(args[ACT_ID][k])
return out
def getValue(job, args, fieldkey):
"""
gets the value of the formula like {_source.table.field(condition)}
:param job:
:param args:
:param fieldkey:
:return:
"""
verify = job.getDebugLevel(MODUL_NAME)
job.debug(verify, "getValue "+fieldkey)
while fieldkey[0:1] == "{" and fieldkey[-1:] == "}":
fieldkey = fieldkey[1:-1]
val = ""
idfield = ""
source = ""
table = ""
field = ""
cond = ""
condIds = []
# special cases of id-management
# args[actid][xid] = actual id with name xid
# args[idlist][xid] = list of all ids with name xid
# a) declaration of the id : id={fielddefinition}
if re.match(r".+\=.+", fieldkey):
job.debug(verify, "getValue 222 " + fieldkey)
raise Exception("getIds sollte an passender Stelle direkt aufgerufen werden "+fieldkey)
return getIds(job, args, fieldkey)
# b) set of defined ids - neither fielddefinition nor a function
#elif "." not in fieldkey and "(" not in fieldkey or re.match(r"\(.+\)", fieldkey):
#print("getValue 226 " + fieldkey)
#raise Exception("getConditions sollte an passender Stelle direkt aufgerufen werden")
#return getConditions(job, args, fieldkey)
# fielddefinition with .-separated parts
b = fieldkey.split(".")
job.debug(verify, "match field "+fieldkey)
if re.match(r"(_.+)\..+\..+\(.+\)", fieldkey):
res = re.match(r"(_.+)\.(.+)\.(.+\(.+\))", fieldkey)
job.debug(verify, "match mit ()")
source = res.group(1)
table = res.group(2)
field = res.group(3)
#cond = res.group(4)
#condIds = getValue(job, args, cond)
elif len(b) == 1:
field = b[0]
elif len(b) == 2:
source = b[0]
field = b[1]
elif len(b) == 3:
source = b[0]
table = b[1]
field = b[2]
if re.match(r".+\(.+\)", field):
res = re.match(r"(.+)\((.+)\)", field)
field = res.group(1)
cond = res.group(2)
condIds = getConditions(job, args, cond)
job.debug(verify, source + " - " + table + " - " + field + " - " + cond + " : " + str(condIds))
if source == B.DATA_NODE_ROW:
if table not in args[B.DATA_NODE_ROW]:
raise Exception("row not initialiazed for table "+table+" "+str(args[B.DATA_NODE_ROW]))
row = args[B.DATA_NODE_ROW][table]
val = row[field]
elif source == B.DATA_NODE_DATA:
job.debug(verify, "data " + b[1])
if len(b) == 3:
job.debug(verify, table + ", " + field + ", " + cond + ", " + str(condIds))
val = toSimpleType(job, getFieldVal(job, args, table, field, condIds))
elif len(b) == 2:
job.debug(verify, table + ", " + field + ", " + cond + ", " + str(condIds))
val = getTdataRow(job, args[B.DATA_NODE_DATA], field, condIds)
elif source == B.DATA_NODE_STEPS:
job.debug(verify, "steps " + table+" - "+ field)
if hasattr(args[B.DATA_NODE_STEPS], field):
val = getattr(args[B.DATA_NODE_STEPS], field)
elif hasattr(args[B.DATA_NODE_STEPS], table):
row = getattr(args[B.DATA_NODE_STEPS], table)
if field in row:
val = row[field]
elif source[1:] == B.DATA_NODE_PAR:
job.debug(verify, "par " + b[1])
if getattr(job.par, b[1]):
val = getattr(job.par, b[1])
elif source == B.DATA_NODE_CATALOG:
job.debug(verify, "catalog " + table+", "+ field)
if len(b) != 3:
job.debug(verify, "catalog-Fehler")
return "Fehler-145"
row = basic.catalog.Catalog.getInstance().getValue(table, args[ACT_ID][cond], job)
if isinstance(row, dict) and field in row:
val = row[field]
elif source[1:] == B.DATA_NODE_COMP:
job.debug(verify, "comp "+table+", "+field)
comp = args[B.DATA_NODE_COMP]
if table == B.DATA_NODE_DDL:
fields = comp.conf[B.DATA_NODE_DDL][field][B.DATA_NODE_HEADER]
val = ",".join(fields)
else:
val = fieldkey
job.debug(verify, "return val "+str(val))
return val
def getFieldVal(job, args, table, field, condIds):
out = []
fields = field.split(",")
for row in getTdataRow(job, args[B.DATA_NODE_DATA], table, condIds):
if len(fields) == 1 and field in row:
out.append( str(row[field]).strip())
else:
tup = tuple()
for f in fields:
if f in row:
t = tuple( str(row[f]).strip() )
tup = tup + t
else:
raise Exception("field is missing in row "+f+", "+str(row))
out.append(tup)
return out
def getTdataRow(job, tdata, table, condIds):
verify = job.getDebugLevel(MODUL_NAME)
out = []
idFields = {}
job.debug(verify, "getTdata "+str(condIds))
for i in range(0, len(condIds)):
idFields[tdata[B.DATA_NODE_TABLES][table][B.DATA_NODE_HEADER][i]] = condIds[i]
job.debug(verify, "idFields "+str(idFields))
for row in tdata[B.DATA_NODE_TABLES][table][B.DATA_NODE_DATA]:
i = 0
for k in idFields:
for j in range(0, len(idFields[k])):
if row[k] == idFields[k][j]:
i += 1
if i == len(idFields):
out.append(row)
return out
Loading…
Cancel
Save