Data-Test-Executer Framework speziell zum Test von Datenverarbeitungen mit Datengenerierung, Systemvorbereitungen, Einspielungen, ganzheitlicher diversifizierender Vergleich
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

59 lines
2.2 KiB

import utils.db_abstract
import basic.toolHandling
import utils.data_const as D
import basic.constants as B
class Entity:
def __int__(self, job):
self.job = job
def getDbAttr(self, job):
out = {}
for attr in [B.ATTR_DB_HOST, B.ATTR_DB_USER, B.ATTR_DB_DATABASE, B.ATTR_DB_PASSWD]:
out[attr] = job.conf.confs[B.TOPIC_NODE_DB][attr]
return out
def getDdl(self, job, ddl):
out = {}
for t in ddl:
out[t] = {}
for f in ddl[t]:
out[t][f] = {}
for a in ddl[t][f]:
print("entity-23 "+f+", "+a+" "+str(ddl))
out[t][f][a] = ddl[t][f][a]
out[t][f][D.DDL_FNAME] = f
out[t][B.DATA_NODE_HEADER] = list(ddl[t].keys())
return out
def createSchema(self):
if B.TOPIC_NODE_DB in self.job.conf.confs:
dbi = basic.toolHandling.getDbTool(self.job, None, self.job.conf.confs[B.TOPIC_NODE_DB][B.ATTR_TYPE])
else:
return "No DB in job-config"
sql = self.getSchema()
print(sql)
for s in sql.split(";\n"):
if len(s) < 3: continue
dbi.execStatement(self.job.conf.confs[B.TOPIC_NODE_DB], s+";")
def getSchema(self):
raise Exception(B.EXCEPT_NOT_IMPLEMENT)
def getHistoryFields(self):
dbtype = self.job.conf.confs[B.TOPIC_NODE_DB][B.ATTR_TYPE]
dbi = basic.toolHandling.getDbTool(self.job, None, dbtype)
sql = dbi.getSchemaAttribut("inscommit", D.TYPE_STR)+","
sql += dbi.getSchemaAttribut("insauthor", D.TYPE_STR)+","
sql += dbi.getSchemaAttribut("instime", D.TYPE_TIME)+","
sql += dbi.getSchemaAttribut("updcommit", D.TYPE_STR)+","
sql += dbi.getSchemaAttribut("updauthor", D.TYPE_STR)+","
sql += dbi.getSchemaAttribut("updtime", D.TYPE_TIME)+","
sql += dbi.getSchemaAttribut("actual", D.TYPE_INT)
return sql
def getHistoryIndex(self, table):
dbtype = self.job.conf.confs[B.TOPIC_NODE_DB][B.ATTR_TYPE]
dbi = basic.toolHandling.getDbTool(self.job, None, dbtype)
sql = dbi.getSchemaIndex(table, "actual") + "\n"
return sql