import basic.program import basic.constants as B import basic.component import tools.data_const as D import test.constants as T import tools.config_tool DEFAULT_GRAN = "tc" DEFAULT_APP = "TESTAPP" DEFAULT_ENV = "ENV01" DEFAULT_DATA_DIR = T.DATA_PATH + "/tdata" DEFAULT_ARCHIV_DIR = T.DATA_PATH + "/lauf" DEFAULT_TIME = "2022-03-19_12-09-09" DEFAULT_MODE = "unit" DEFAULT_COMP = "testcrmdb" gran = "" app = "" env = "" tstamp = "" mode = "" path = "" # xample-DDL conf = { B.SUBJECT_ARTIFACT: { B.TOPIC_NODE_DB: { B.ATTR_TYPE: "csv", "person": { "tabname": "tbl_person" } }, B.TOPIC_NODE_CLI: { B.ATTR_TYPE: "cmd" } }, B.DATA_NODE_DDL: { "person": { "id": {D.DDL_FIELD: "id", D.DDL_TYPE: "int", D.DDL_ACCEPTANCE: "ignore", D.DDL_KEY: "T:3"}, "name": {D.DDL_FIELD: "name", D.DDL_TYPE: "string", D.DDL_ACCEPTANCE: "must", D.DDL_KEY: "F:1"}, "birth": {D.DDL_FIELD: "id", D.DDL_TYPE: "date", D.DDL_ACCEPTANCE: "must", D.DDL_KEY: ""}, "year": {D.DDL_FIELD: "year", D.DDL_TYPE: "int", D.DDL_ACCEPTANCE: "must", D.DDL_KEY: "F:2"}, "position": {D.DDL_FIELD: "id", D.DDL_TYPE: "string", D.DDL_ACCEPTANCE: "must", D.DDL_KEY: ""}, "hobby": {D.DDL_FIELD: "id", D.DDL_TYPE: "string", D.DDL_ACCEPTANCE: "must", D.DDL_KEY: ""}, "_header": ["id", "name", "birth", "year", "position", "hobby"] } } } def getWorkspaceJob(program): args = {"application": "service", "environment": "Testserver", "project": "TESTPROJ", "step": 2} # "usecase": "TST001", "tstime": "2022-03-17_17-28"} job = basic.program.Job(program, "", args) return job def getJob(pgran="", papp="", penv="", ptstamp="", pmode=""): #job = basic.program.Job.popInstance() #if not job is None: # job.stopJob(1) if len(pgran) < 1: gran = DEFAULT_GRAN else: gran = pgran if len(papp) < 1: app = DEFAULT_APP else: app = papp if len(penv) < 1: env = DEFAULT_ENV else: env = penv if len(ptstamp) < 1: tstamp = DEFAULT_TIME else: tstamp = ptstamp if len(pmode) < 1: mode = DEFAULT_MODE else: mode = pmode if gran == "tc": path = DEFAULT_ARCHIV_DIR + "/TC0001/" + tstamp elif gran == "ts": path = DEFAULT_ARCHIV_DIR + "/testlauf/TST001_" + tstamp #job.conf[B.SUBJECT_PATH]["components"] = T.COMP_PATH args = {"application": app, "environment": env, "modus": mode, gran+"time": tstamp, gran+"dir": path, "project": "TESTPROJ", "step": 2} # "usecase": "TST001", "tstime": "2022-03-17_17-28"} job = basic.program.Job("unit_tester", "", args) return job def getTestJob(): args = {} args[B.PAR_PROGRAM] = "unit_tester" args[B.PAR_APP] = "service" args[B.PAR_PROJ] = "TESTPROJ" args[B.PAR_ENV] = "Testserver" job = basic.program.Job(args[B.PAR_PROGRAM], "", args) return job def getComp(job, componentName=""): comp = basic.component.Component() if len(componentName) < 1: componentName = DEFAULT_COMP comp.conf = {} comp.name = componentName confs = tools.config_tool.getConfig(job, "comp", componentName) conns = tools.conn_tool.getConnections(job, componentName) comp.conf = confs["conf"] comp.conf[B.TOPIC_CONN] = conns[0] return comp