diff --git a/test/test_db.py b/test/test_db.py index bf9a587..3684eb6 100644 --- a/test/test_db.py +++ b/test/test_db.py @@ -6,21 +6,50 @@ import basic.toolHandling import test.constants import components.component import basic.constants as B +import utils.db_abstract +import utils.data_const as D HOME_PATH = test.constants.HOME_PATH conf = {} class MyTestCase(unittest.TestCase): + def test_parseSql(self): + job = basic.program.Job("unit") + args = {"application": "TEST", "application": "ENV01", "modus": "unit", "loglevel": "debug", "tool": "config_tool", + "modus": "unit"} + job.par.setParameterArgs(args) + # "SELECT * FROM lofts" + obj = utils.db_abstract.parseConditions("family like !%utz%! and state = !+reg+!") + self.assertEqual(obj[0][3], "and") + self.assertEqual(obj[0][1], "family") + self.assertEqual(obj[1][3], "end") + self.assertEqual(obj[1][1], "state") + obj = utils.db_abstract.parseConditions("family like !%utz%! or state = !+reg+!") + self.assertEqual(obj[0][3], "or") + self.assertEqual(obj[0][1], "family") + self.assertEqual(obj[1][3], "end") + self.assertEqual(obj[1][1], "state") + ddl = utils.config_tool.getConfig("DATASTRUCTURE", "testb1", "lofts") + dbwhere = utils.db_abstract.parseSQLwhere("family like !%utz%! and state = !+reg+!", ddl["testb1"]["lofts"]) + self.assertIn("state", dbwhere) + self.assertNotIn("family", dbwhere) + dbwhere = utils.db_abstract.parseSQLwhere("street like !%utz%! and state = !+reg+!", ddl["testb1"]["lofts"]) + self.assertIn("state", dbwhere) + self.assertIn("street", dbwhere) + pass - def test_toolhandling(self): + def xtest_toolhandling(self): job = basic.program.Job("unit") args = {"application": "TEST", "application": "ENV01", "modus": "unit", "loglevel": "debug", "tool": "config_tool", "modus": "unit"} job.par.setParameterArgs(args) #t = basic.toolHandling.ToolManager() comp = components.component.Component() - comp.name = "testb" + comp.name = "testb1" + table = "lofts" comp.conf = {} + comp.conf[B.DATA_NODE_DDL] = {} + comp.conf[B.DATA_NODE_DDL][table] = utils.config_tool.getConfig("DATASTRUCTURE", comp.name, table) comp.conf[B.SUBJECT_ARTS] = {} comp.conf[B.SUBJECT_ARTS][B.TOPIC_NODE_DB] = {} comp.conf[B.SUBJECT_CONN] = {} @@ -39,15 +68,24 @@ class MyTestCase(unittest.TestCase): # sqls = comp.composeSqlClauses("SELECT * FROM lofts") print(sqls) - setattr(job.par, B.PAR_DB_WHERE, "name like !%utz%! and regname = !+reg+!") + self.assertEqual(sqls["ALL"], "SELECT * FROM .lofts ORDER BY id") + setattr(job.par, B.PAR_DB_WHERE, "street like !%utz%! and state = !+reg+!") comp.conf[B.SUBJECT_ARTS][B.TOPIC_NODE_DB][B.ATTR_DB_PARTITION] = "+reg+" sqls = comp.composeSqlClauses("SELECT * FROM lofts") print(sqls) - setattr(job.par, B.PAR_DB_PARTITION, "BA,FV") + self.assertIn("street", sqls["ALL"]) #assertEqual(("street" in sqls), True) + self.assertIn("state", sqls["ALL"]) + setattr(job.par, B.PAR_DB_WHERE, "family like !%utz%! and state = !+reg+!") + comp.conf[B.SUBJECT_ARTS][B.TOPIC_NODE_DB][B.ATTR_DB_PARTITION] = "+reg+" sqls = comp.composeSqlClauses("SELECT * FROM lofts") print(sqls) - tool.deleteRows("deltable") - tool.selectRows("deltable") + self.assertIn("family", sqls["ALL"]) + self.assertIn("state", sqls["ALL"]) + setattr(job.par, B.PAR_DB_PARTITION, "BA,FV") + #sqls = comp.composeSqlClauses("SELECT * FROM lofts") + print(sqls) + #tool.deleteRows("deltable") + #tool.selectRows("deltable") if __name__ == '__main__': diff --git a/utils/db_abstract.py b/utils/db_abstract.py index 26aa494..8ba32c7 100644 --- a/utils/db_abstract.py +++ b/utils/db_abstract.py @@ -85,6 +85,69 @@ def getDbAttributes(comp, table): print("f " + attr + " " + out[attr]) return out +def getStringIndex(text, intern): + if intern in text: + return text.index(intern) + return 999 + +def parseCondition(condition): + out = [] + for operator in [">=", "<=", ">", "<", "=", " like ", " in "]: + if operator in condition: + i = condition.lower().index(operator) + field = condition[0:i] + attr = condition[i+len(operator):] + out = [operator.strip(), field.strip(), attr.strip()] + return out + return out + +def parseConditions(condition): + """ the functions parses the condition into their syntactical parts: + parts = [ [ conjunctor , [ operator , field, attribute ], ... ] + the condition should be a simple normalform either with "and" or with "or" + """ + rest = condition + dbwhere = [] + if (" and " in rest and " or " in rest): + raise Exception("the sql-condition must contain only \"ands\" or \"ors\" "+rest) + + while (" and " in rest or " or " in rest): + iand = getStringIndex(rest, " and ") + ior = getStringIndex(rest, " or ") + print("conjunctors " + str(iand) + " " + str(ior)) + if iand == 999 and ior == 999: + print("fertig") + elif iand < ior: + conjunctor = "and" + condition = rest[0:iand] + rest = rest[iand + 5:] + cond = parseCondition(condition) + cond.append(conjunctor) + dbwhere.append(cond) + elif iand > ior: + conjunctor = "or" + condition = rest[0:ior] + rest = rest[ior + 4:] + cond = parseCondition(condition) + cond.append(conjunctor) + dbwhere.append(cond) + cond = parseCondition(rest) + cond.append("end") + dbwhere.append(cond) + return dbwhere + +def parseSQLwhere(condition, ddl=None): + parts = parseConditions(condition) + conjunctor = "" + dbwhere = "" + for cond in parts: + if cond[1] in ddl[B.DATA_NODE_HEADER]: + dbwhere += " "+conjunctor+" "+cond[1]+" "+cond[0]+" "+cond[2] + conjunctor = cond[3] + return "WHERE "+dbwhere.strip() + + +# --------------------------------------------------------------------------------------------------------------- class DbFcts(): """