import unittest import utils.tdata_tool as t import basic.constants as B import utils.data_const as D import basic.program import os class MyTestCase(unittest.TestCase): def runTest(self): self.test_tdata() def xtest_tdata(self): job = basic.program.Job("unit") args = {"application": "TEST", "application": "ENV01", "modus": "unit", "loglevel": "debug", "tool": "job_tool", "tdtyp": "csv", "tdsrc": "implement", "tdname": "firstunit", "modus": "unit"} job.par.setParameterArgs(args) filename = str(job.conf.confs["paths"]["testdata"]) + "/" + getattr(job.par, "tdsrc") + "/" + getattr(job.par, "tdname") + ".csv" tdata = t.readCsv(job.m, filename, None) self.assertEqual(len(tdata["testa1"]), 3) setattr(job.par, "tdtyp", "dir") setattr(job.par, "tdsrc", "TST001") tdata = t.getTestdata() self.assertEqual(("steps" in tdata), True) def test_getCsvSpec_data(self): job = basic.program.Job("unit") tdata = {} args = {"application": "TEST", "application": "ENV01", "modus": "unit", "loglevel": "debug", "tdtyp": "csv", "tdsrc": "TC0001", "tdname": "testspec", "modus": "unit"} job.par.setParameterArgs(args) filename = os.path.join(job.conf.confs["paths"]["testdata"], getattr(job.par, "tdsrc"), getattr(job.par, "tdname") + ".csv") """ a) data : like a table with data-array of key-value-pairs a_0 is keyword [option, step, CSV_HEADER_START ] a_0 : { a_1 : { f_1 : v_1, .... } # option, step a_0 : { .. a_n : { _header : [ .. ], _data : [ rows... ] # table, node """ tests = ["malformated", "comments", D.CSV_BLOCK_OPTION, D.CSV_BLOCK_STEP, B.DATA_NODE_TABLES] if "comments" in tests: specLines = [ ";;;;;;", "#;;;;;;" ] tdata = t.parseCsvSpec(job.m, specLines, D.CSV_SPECTYPE_DATA) self.assertEqual(0, len(tdata)) if "malformated" in tests: malformat = "option;arg;;;;;" specLines = [ "option:par;arg;;;;;", malformat, "#option:nopar;arg;;;;;", "#;;;;;;" ] self.assertRaises(Exception, t.parseCsvSpec, (job.m, specLines, D.CSV_SPECTYPE_DATA)) malformat = "step;component;1;arg:val;;;;;" specLines = [ "step:1;component;1;arg:val;;;", malformat ] # TODO sortierung nicht ausgwertet # self.assertRaises(D.EXCP_MALFORMAT+malformat, t.parseCsvSpec, (job.m, specLines, D.CSV_SPECTYPE_DATA)) malformat = "step:2;component;1;arg;;;;;" specLines = [ "step:1;component;1;arg:val;;;", malformat ] self.assertRaises(Exception, t.parseCsvSpec, (job.m, specLines, D.CSV_SPECTYPE_DATA)) specLines = [ "option:par;arg;;;;;", "#option:nopar;arg;;;;;", "#;;;;;;" ] if D.CSV_BLOCK_OPTION in tests: specLines = [ "option:description;something;;;;;", "#;;;;;;" ] tdata = t.parseCsvSpec(job.m, specLines, D.CSV_SPECTYPE_DATA) self.assertEqual(1, len(tdata)) print(tdata) self.assertIn(D.CSV_BLOCK_OPTION, tdata) if D.CSV_BLOCK_STEP in tests: specLines = [ "step:1;testa;1;table:_lofts,action:import;;;;;", "#;;;;;;" ] tdata = t.parseCsvSpec(job.m, specLines, D.CSV_SPECTYPE_DATA) print(tdata) self.assertEqual(1, len(tdata)) self.assertIn(B.DATA_NODE_STEPS, tdata) self.assertIsInstance(tdata[B.DATA_NODE_STEPS], list) for step in tdata[B.DATA_NODE_STEPS]: print(step) self.assertIn(B.DATA_NODE_COMP, step) self.assertIn(B.ATTR_DATA_REF, step) self.assertIn(B.ATTR_STEP_ARGS, step) if B.DATA_NODE_TABLES in tests: specLines = [ "table:testa:lofts;_nr;street;city;zip;state;beds;baths;sqft;type;price;latitude;longitude", "testa:lofts;1;stra;town;12345;usa;4;1;50;house;111;45;8", "#;;;;;;" ] tdata = t.parseCsvSpec(job.m, specLines, B.DATA_NODE_TABLES) print(tdata) self.assertEqual(1, len(tdata)) self.assertIn(B.DATA_NODE_TABLES, tdata) self.assertIsInstance(tdata[B.DATA_NODE_TABLES], dict) for k in tdata[B.DATA_NODE_TABLES]["testa"]: table = tdata[B.DATA_NODE_TABLES]["testa"][k] self.assertIn(B.DATA_NODE_HEADER, table) self.assertIn(B.DATA_NODE_DATA, table) def xtest_getCsvSpec_tree(self): job = basic.program.Job("unit") tdata = {} args = {"application": "TEST", "application": "ENV01", "modus": "unit", "loglevel": "debug", "tdtyp": "csv", "tdsrc": "TC0001", "tdname": "testspec", "modus": "unit"} job.par.setParameterArgs(args) """" b) tree : as a tree - the rows must be unique identified by the first column a_0 is keyword in CSV_HEADER_START a_0 : { .. a_n : { _header : [ fields.. ], _data : { field : value } """ def xtest_getCsvSpec_key(self): job = basic.program.Job("unit") tdata = {} args = {"application": "TEST", "application": "ENV01", "modus": "unit", "loglevel": "debug", "tdtyp": "csv", "tdsrc": "TC0001", "tdname": "testspec", "modus": "unit"} job.par.setParameterArgs(args) """" c) keys : as a tree - the rows must be unique identified by the first column a_0 is keyword in CSV_HEADER_START a_1 ... a_n is key characterized by header-field like _fk* or _pk* a_0 : { .. a_n : { _keys : [ _fpk*.. ] , _header : [ fields.. ], _data : { pk_0 : { ... pk_n : { field : value } """ def xtest_getCsvSpec_conf(self): job = basic.program.Job("unit") tdata = {} args = {"application": "TEST", "application": "ENV01", "modus": "unit", "loglevel": "debug", "tdtyp": "csv", "tdsrc": "TC0001", "tdname": "testspec", "modus": "unit"} job.par.setParameterArgs(args) """" d) conf: _header : [ field_0, ... ] { field_0 : { attr_0 : val_0, .. }, field_1 : { ... }, ... } """ specLines = [ "table:lofts;_field;field;type;acceptance;key", "lofts;street;a;str;;T:1", ";city;b;str;;F:1", "#;;;;;;" ] tdata = t.parseCsvSpec(job.m, specLines, D.CSV_SPECTYPE_CONF) print(tdata) self.assertEqual(1, len(tdata)) self.assertNotIn(B.DATA_NODE_TABLES, tdata) self.assertIn("lofts", tdata) table = tdata["lofts"] self.assertIn(B.DATA_NODE_HEADER, table) self.assertIn(B.DATA_NODE_DATA, table) if __name__ == '__main__': unittest.main()