Browse Source

init_testcase.load

master
Ulrich Carmesin 2 years ago
parent
commit
5b998760a5
  1. 1
      basic/constants.py
  2. 3
      requirements.txt
  3. 21
      test/test_tdata.py
  4. 5
      utils/data_const.py
  5. 3
      utils/date_tool.py
  6. 4
      utils/db_abstract.py
  7. 7
      utils/dbcsv_tool.py
  8. 27
      utils/tdata_tool.py

1
basic/constants.py

@ -201,6 +201,7 @@ ATTR_TYPE = "type" # | x | x | | x | conn_tool, toolHandl
RULE_ACCEPTANCE = "acceptance" # | | | | x | tools_match RULE_ACCEPTANCE = "acceptance" # | | | | x | tools_match
ATTR_STEP_ARGS = "args" ATTR_STEP_ARGS = "args"
ATTR_EXEC_REF = "_exec"
ATTR_DATA_REF = "_nr" ATTR_DATA_REF = "_nr"
ATTR_DATA_COMP = "_comp" ATTR_DATA_COMP = "_comp"

3
requirements.txt

@ -1 +1,2 @@
pyyaml pyyaml
paramiko

21
test/test_tdata.py

@ -19,7 +19,7 @@ OS_SYSTEM = test.constants.OS_SYSTEM
# here you can select single testfunction for developping the tests # here you can select single testfunction for developping the tests
TEST_FUNCTIONS = ["test_tdata", "test_getCsvSpec_data", "test_getCsvSpec_tree", "test_getCsvSpec_key", TEST_FUNCTIONS = ["test_tdata", "test_getCsvSpec_data", "test_getCsvSpec_tree", "test_getCsvSpec_key",
"test_getCsvSpec_conf", "test_extractPattern", "test_parseCsv"] "test_getCsvSpec_conf", "test_extractPattern", "test_parseCsv"]
TEST_FUNCTIONS = ["test_parseCsv"] TEST_FUNCTIONS = ["test_getCsvSpec_data"]
class MyTestCase(unittest.TestCase): class MyTestCase(unittest.TestCase):
mymsg = "--------------------------------------------------------------" mymsg = "--------------------------------------------------------------"
@ -108,7 +108,7 @@ class MyTestCase(unittest.TestCase):
cnttest += 2 cnttest += 2
if D.CSV_BLOCK_STEP in tests: if D.CSV_BLOCK_STEP in tests:
specLines = [ specLines = [
"step:1;testa;1;table:_lofts,action:import;;;;;", "step:1;testa;1;1;table:_lofts,action:import;;;;;",
"#;;;;;;" "#;;;;;;"
] ]
tdata = t.parseCsvSpec(job.m, specLines, D.CSV_SPECTYPE_DATA) tdata = t.parseCsvSpec(job.m, specLines, D.CSV_SPECTYPE_DATA)
@ -140,6 +140,23 @@ class MyTestCase(unittest.TestCase):
self.assertIn(B.DATA_NODE_HEADER, table) self.assertIn(B.DATA_NODE_HEADER, table)
self.assertIn(B.DATA_NODE_DATA, table) self.assertIn(B.DATA_NODE_DATA, table)
cnttest += 2 cnttest += 2
if B.DATA_NODE_TABLES in tests:
specLines = [
"option:description;create 2 new contracts;;;;",
"# ;;;;;",
"# ;component;exec;_nr;action;args;;",
"step:1;testrest;2;1;function:xml-rest;action:new;;",
"step:2;testrest;3;1,2;function:json-rest;action:new;;",
"# ;;;;;",
"# testdate only here specified;expect:row 2 is inserted as precond;;;;",
"_date;01.07.2022;;;;",
"table:person;_nr;famname;name;birth;sex",
"testrest:person;1;Brecht;Bert;10.02.98;m",
"testrest:person,testcrmdb:person;2;Leon;Donna;28.09.42;f"
]
tdata = t.parseCsvSpec(job.m, specLines, D.CSV_SPECTYPE_DATA)
print(tdata)
MyTestCase.mymsg += "\n----- "+actfunction+" : "+str(cnttest) MyTestCase.mymsg += "\n----- "+actfunction+" : "+str(cnttest)

5
utils/data_const.py

@ -44,6 +44,11 @@ CSV_NODETYPE_KEYS = "_keys"
CSV_BLOCK_OPTION = "option" CSV_BLOCK_OPTION = "option"
CSV_BLOCK_STEP = "step" CSV_BLOCK_STEP = "step"
STEP_COMP_I = 1
STEP_EXECNR_I = 2
STEP_REFNR_I = 3
STEP_ARGS_I = 4
STEP_LIST_I = 4
EXCP_MALFORMAT = "malformated line: " EXCP_MALFORMAT = "malformated line: "
ATTR_SRC_TYPE = "tdtyp" ATTR_SRC_TYPE = "tdtyp"

3
utils/date_tool.py

@ -10,6 +10,7 @@ import utils.data_const as D
F_DIR = "%Y-%m-%d_%H-%M-%S" F_DIR = "%Y-%m-%d_%H-%M-%S"
F_DB_DATE = "%Y-%m-%d"
F_DE = "%d.%m.%Y" F_DE = "%d.%m.%Y"
F_N8 = "%Y%m%d" F_N8 = "%Y%m%d"
@ -113,7 +114,7 @@ def parseDate(instring):
mon = int(res.group(2)) mon = int(res.group(2))
day = int(res.group(3)) day = int(res.group(3))
return (year, mon, day, hour, min, sec) return (year, mon, day, hour, min, sec)
if re.match(r"\d{1,2}[-./]\d{1,2}[-./]\d{1,2}", instring): if re.match(r"\d{1,2}[-./]\d{1,2}[-./]\d{4}", instring):
res = re.match(r"(\d{1,2})[-./](\d{1,2})[-./](\d{4})", instring) res = re.match(r"(\d{1,2})[-./](\d{1,2})[-./](\d{4})", instring)
year = int(res.group(3)) year = int(res.group(3))
mon = int(res.group(2)) mon = int(res.group(2))

4
utils/db_abstract.py

@ -217,6 +217,10 @@ def formatDbVal(msg, val, dtyp):
if not isinstance(val, str): if not isinstance(val, str):
msg.logError("field must be " + dtyp + ", " + str(val)) msg.logError("field must be " + dtyp + ", " + str(val))
return str(val) return str(val)
if dtyp == D.TYPE_DATE:
if not isinstance(val, str):
msg.logError("field must be " + dtyp + ", " + str(val))
return utils.date_tool.getFormatDatetupel(utils.date_tool.parseDate(val), utils.date_tool.F_DB_DATE)
if dtyp == D.TYPE_INT: if dtyp == D.TYPE_INT:
if not (isinstance(val, int) or re.match(r"^\d+$", val)): if not (isinstance(val, int) or re.match(r"^\d+$", val)):
msg.logError("field must be " + dtyp + ", " + str(val)) msg.logError("field must be " + dtyp + ", " + str(val))

7
utils/dbcsv_tool.py

@ -91,14 +91,11 @@ class DbFcts(utils.db_abstract.DbFcts):
print("head "+h) print("head "+h)
if h in B.LIST_DB_ATTR: if h in B.LIST_DB_ATTR:
continue continue
if B.DATA_NODE_DATA not in self.comp.conf[B.DATA_NODE_DDL][table]:
rowvalues = ""
break
print("h "+h) print("h "+h)
if (h in r): if (h in r):
rowvalues += ";"+str(self.getDbValue(self.comp.conf[B.DATA_NODE_DDL][table][B.DATA_NODE_DATA][h], r[h])) rowvalues += ";"+str(self.getDbValue(self.comp.conf[B.DATA_NODE_DDL][table][h], r[h]))
else: else:
rowvalues += ";"+str(self.getDbValue(self.comp.conf[B.DATA_NODE_DDL][table][B.DATA_NODE_DATA][h], "")) rowvalues += ";"+str(self.getDbValue(self.comp.conf[B.DATA_NODE_DDL][table][h], ""))
print("rv " + rowvalues) print("rv " + rowvalues)
cmd += rowvalues+"\n" cmd += rowvalues+"\n"
utils.file_tool.writeFileText(self.comp.m, path, cmd) utils.file_tool.writeFileText(self.comp.m, path, cmd)

27
utils/tdata_tool.py

@ -141,11 +141,24 @@ def parseCsvSpec(msg, lines, type):
if (not B.DATA_NODE_STEPS in data): if (not B.DATA_NODE_STEPS in data):
data[B.DATA_NODE_STEPS] = [] data[B.DATA_NODE_STEPS] = []
step = {} step = {}
step[B.DATA_NODE_COMP] = fields[1] step[B.DATA_NODE_COMP] = fields[D.STEP_COMP_I]
step[B.ATTR_DATA_REF] = fields[2] step[B.ATTR_EXEC_REF] = fields[D.STEP_EXECNR_I]
step[B.ATTR_DATA_REF] = fields[D.STEP_REFNR_I]
step[B.ATTR_STEP_ARGS] = {} step[B.ATTR_STEP_ARGS] = {}
a = fields[3].split(",") if D.STEP_ARGS_I == D.STEP_LIST_I:
args = ""
for i in range(D.STEP_ARGS_I, len(fields)):
if len(fields[i]) < 1:
continue
if fields[i][0:1] == "#":
continue
args += "," + fields[i]
args = args[1:]
else:
args = fields[D.STEP_ARGS_I]
a = args.split(",")
for arg in a: for arg in a:
print("arg "+arg)
b = arg.split(":") b = arg.split(":")
if len(b) < 2: if len(b) < 2:
raise Exception(D.EXCP_MALFORMAT + "" + l) raise Exception(D.EXCP_MALFORMAT + "" + l)
@ -163,7 +176,9 @@ def parseCsvSpec(msg, lines, type):
# create deep structure a_0 ... a_n # create deep structure a_0 ... a_n
print("tdata 136 CSV_HEADER_START "+str(len(a))) print("tdata 136 CSV_HEADER_START "+str(len(a)))
h = a h = a
data[B.DATA_NODE_TABLES] = {} header = []
if B.DATA_NODE_TABLES not in data:
data[B.DATA_NODE_TABLES] = {}
h[0] = B.DATA_NODE_TABLES h[0] = B.DATA_NODE_TABLES
comps = {} comps = {}
tableDict = getTabContent(msg, data, h) tableDict = getTabContent(msg, data, h)
@ -195,20 +210,22 @@ def parseCsvSpec(msg, lines, type):
# fill data # fill data
tableDict = getTabContent(msg, data, h) tableDict = getTabContent(msg, data, h)
row = {} row = {}
print(fields)
i = 1 i = 1
# case-differentiation DATA or TREE # case-differentiation DATA or TREE
for f in header: for f in header:
print(str(i)+" "+str(len(fields))+" "+str(len(header)))
row[f] = fields[i] row[f] = fields[i]
if type == D.CSV_SPECTYPE_TREE: if type == D.CSV_SPECTYPE_TREE:
tableDict[B.DATA_NODE_DATA][f] = fields[i] tableDict[B.DATA_NODE_DATA][f] = fields[i]
i += 1 i += 1
if type == D.CSV_SPECTYPE_DATA: if type == D.CSV_SPECTYPE_DATA:
print("parseSpec "+ str(fields[0])) print("parseSpec "+ str(fields[0]))
row[B.ATTR_DATA_COMP] = {}
for c in fields[0].split(","): for c in fields[0].split(","):
a = c.split(":") a = c.split(":")
print("parseSpec " + str(a)) print("parseSpec " + str(a))
comps[a[0]] = a[1] comps[a[0]] = a[1]
row[B.ATTR_DATA_COMP] = {}
row[B.ATTR_DATA_COMP][a[0]] = a[1] row[B.ATTR_DATA_COMP][a[0]] = a[1]
#row[B.ATTR_DATA_COMP] = fields[0].split(",") #row[B.ATTR_DATA_COMP] = fields[0].split(",")
tableDict[B.ATTR_DATA_COMP] = comps tableDict[B.ATTR_DATA_COMP] = comps

Loading…
Cancel
Save