Browse Source

create test-db-tables

refactor
Ulrich 2 years ago
parent
commit
63dfe0a641
  1. 49
      basic/connection.py
  2. 40
      basic/entity.py
  3. 49
      basic/testcase.py
  4. 52
      basic/testexecution.py
  5. 47
      basic/testplan.py
  6. 49
      basic/testsuite.py
  7. 113
      make_workspace.py
  8. 27
      utils/db_abstract.py
  9. 36
      utils/dbmysql_tool.py
  10. 53
      utils/dbrel_tool.py

49
basic/connection.py

@ -0,0 +1,49 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# ---------------------------------------------------------------------------------------------------------
# Author : Ulrich Carmesin
# Source : gitea.ucarmesin.de
# ---------------------------------------------------------------------------------------------------------
import utils.db_abstract
import basic.toolHandling
import utils.data_const as D
import basic.constants as B
import basic.entity
class Connection(basic.entity.Entity):
name = ""
description = ""
application = ""
usecase = []
story = []
tables = {}
steps = []
def __init__(self, job):
"""
to be initialized by readSpec
:param job:
"""
self.job = job
def getSchema(self):
dbtype = self.job.conf.confs[B.TOPIC_NODE_DB][B.ATTR_TYPE]
dbi = basic.toolHandling.getDbTool(self.job, None, dbtype)
print(str(dbi))
sql = dbi.getCreateTable("connection")
sql += dbi.getSchemaAttribut("cnid", "id")+","
sql += dbi.getSchemaAttribut("environment", D.TYPE_STR)+","
sql += dbi.getSchemaAttribut("component", D.TYPE_STR)+","
sql += dbi.getSchemaAttribut("type", D.TYPE_STR)+","
sql += dbi.getSchemaAttribut("ip", D.TYPE_STR)+","
sql += dbi.getSchemaAttribut("port", D.TYPE_INT)+","
sql += dbi.getSchemaAttribut("hostname", D.TYPE_STR)+","
sql += dbi.getSchemaAttribut("dompath", D.TYPE_STR)+","
sql += dbi.getSchemaAttribut("attributes", D.TYPE_TEXT)+","
sql += self.getHistoryFields()
sql += ");\n"
sql += dbi.getSchemaIndex("connection", "environment") + "\n"
sql += self.getHistoryIndex("connection")
return sql
#dbi.execStatement(sql)

40
basic/entity.py

@ -0,0 +1,40 @@
import utils.db_abstract
import basic.toolHandling
import utils.data_const as D
import basic.constants as B
class Entity:
def __int__(self, job):
self.job = job
def createSchema(self):
if B.TOPIC_NODE_DB in self.job.conf.confs:
dbi = basic.toolHandling.getDbTool(self.job, None, self.job.conf.confs[B.TOPIC_NODE_DB][B.ATTR_TYPE])
else:
return "No DB in job-config"
sql = self.getSchema()
print(sql)
for s in sql.split(";\n"):
if len(s) < 3: continue
dbi.execStatement(self.job.conf.confs[B.TOPIC_NODE_DB], s+";")
def getSchema(self):
raise Exception(B.EXCEPT_NOT_IMPLEMENT)
def getHistoryFields(self):
dbtype = self.job.conf.confs[B.TOPIC_NODE_DB][B.ATTR_TYPE]
dbi = basic.toolHandling.getDbTool(self.job, None, dbtype)
sql = dbi.getSchemaAttribut("inscommit", D.TYPE_STR)+","
sql += dbi.getSchemaAttribut("insauthor", D.TYPE_STR)+","
sql += dbi.getSchemaAttribut("instime", D.TYPE_TIME)+","
sql += dbi.getSchemaAttribut("updcommit", D.TYPE_STR)+","
sql += dbi.getSchemaAttribut("updauthor", D.TYPE_STR)+","
sql += dbi.getSchemaAttribut("updtime", D.TYPE_TIME)+","
sql += dbi.getSchemaAttribut("actual", D.TYPE_INT)
return sql
def getHistoryIndex(self, table):
dbtype = self.job.conf.confs[B.TOPIC_NODE_DB][B.ATTR_TYPE]
dbi = basic.toolHandling.getDbTool(self.job, None, dbtype)
sql = dbi.getSchemaIndex(table, "actual") + "\n"
return sql

49
basic/testcase.py

@ -0,0 +1,49 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# ---------------------------------------------------------------------------------------------------------
# Author : Ulrich Carmesin
# Source : gitea.ucarmesin.de
# ---------------------------------------------------------------------------------------------------------
import utils.db_abstract
import basic.toolHandling
import utils.data_const as D
import basic.constants as B
import basic.entity
class Testcase(basic.entity.Entity):
name = ""
description = ""
application = ""
usecase = []
story = []
tables = {}
steps = []
def __init__(self, job):
"""
to be initialized by readSpec
:param job:
"""
self.job = job
def getSchema(self):
dbtype = self.job.conf.confs[B.TOPIC_NODE_DB][B.ATTR_TYPE]
dbi = basic.toolHandling.getDbTool(self.job, None, dbtype)
sql = dbi.getCreateTable("testcase")
sql += dbi.getSchemaAttribut("tcid", "id")+","
sql += dbi.getSchemaAttribut("name", D.TYPE_STR)+","
sql += dbi.getSchemaAttribut("description", D.TYPE_TEXT)+","
sql += dbi.getSchemaAttribut("project", D.TYPE_STR)+","
sql += dbi.getSchemaAttribut("usecase", D.TYPE_STR)+","
sql += dbi.getSchemaAttribut("attributes", D.TYPE_TEXT)+","
sql += self.getHistoryFields()
sql += ");\n"
sql += self.getHistoryIndex("testcase")
for attr in ["application", "story"]:
sql += dbi.getSchemaSubtable("tc", [{"attr":attr, "atype": D.TYPE_STR}])+"\n"
sql += dbi.getSchemaIndex(dbi.getIndexName("tc", attr),
dbi.getSubTableId(dbi.getSubTableName("tc", attr), attr))+"\n"
for attr in ["dtable", "step"]:
sql += dbi.getSchemaSubtable("tc", [{"attr":attr, "atype": D.TYPE_STR}, {"attr":"attributes", "atype": D.TYPE_TEXT}])+"\n"
sql += dbi.getSchemaIndex(dbi.getSubTableName("tc", attr),
dbi.getSubTableId(dbi.getSubTableName("tc", attr), attr))+"\n"
return sql

52
basic/testexecution.py

@ -0,0 +1,52 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# ---------------------------------------------------------------------------------------------------------
# Author : Ulrich Carmesin
# Source : gitea.ucarmesin.de
# ---------------------------------------------------------------------------------------------------------
import basic.toolHandling
import utils.data_const as D
import basic.constants as B
import basic.entity
class Testexecution(basic.entity.Entity):
name = ""
description = "" # from testplan, testsuite, testcase
release = ""
path = ""
level = "" # testplan, testsuite, testcase
entities = {}
def __init__(self, job):
"""
to be initialized by readSpec
:param job:
"""
self.job = job
def getSchema(self):
dbtype = self.job.conf.confs[B.TOPIC_NODE_DB][B.ATTR_TYPE]
dbi = basic.toolHandling.getDbTool(self.job, None, dbtype)
sql = dbi.getCreateTable("testexecution")
sql += dbi.getSchemaAttribut("teid", "id")+","
sql += dbi.getSchemaAttribut("name", D.TYPE_STR)+","
sql += dbi.getSchemaAttribut("description", D.TYPE_TEXT)+","
sql += dbi.getSchemaAttribut("prelease", D.TYPE_STR)+","
sql += dbi.getSchemaAttribut("type", D.TYPE_STR)+","
sql += dbi.getSchemaAttribut("entity", D.TYPE_STR)+","
sql += dbi.getSchemaAttribut("path", D.TYPE_STRING)+","
sql += dbi.getSchemaAttribut("starttime", D.TYPE_TIME)+","
sql += dbi.getSchemaAttribut("finishtime", D.TYPE_TIME)+","
sql += dbi.getSchemaAttribut("attributes", D.TYPE_TEXT)+","
sql += self.getHistoryFields()
sql += ");\n"
sql += dbi.getSchemaIndex("testexecution", "release") + "\n"
sql += self.getHistoryIndex("testplan")
for attr in ["entity"]:
sql += dbi.getSchemaSubtable("te", [{"attr":attr, "atype": D.TYPE_STR},
{"attr":"type", "atype": D.TYPE_STR},
{"attr":"path", "atype": D.TYPE_STRING},
{"attr":"attributes", "atype": D.TYPE_TEXT}])+"\n"
sql += dbi.getSchemaIndex(dbi.getSubTableName("te", attr),
dbi.getSubTableId(dbi.getSubTableName("te", attr), attr))+"\n"
return sql

47
basic/testplan.py

@ -0,0 +1,47 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# ---------------------------------------------------------------------------------------------------------
# Author : Ulrich Carmesin
# Source : gitea.ucarmesin.de
# ---------------------------------------------------------------------------------------------------------
import basic.toolHandling
import utils.data_const as D
import basic.constants as B
import basic.entity
class Testplan(basic.entity.Entity):
name = ""
description = ""
release = ""
testsuites = {}
steps = []
def __init__(self, job):
"""
to be initialized by readSpec
:param job:
"""
self.job = job
def getSchema(self):
dbtype = self.job.conf.confs[B.TOPIC_NODE_DB][B.ATTR_TYPE]
dbi = basic.toolHandling.getDbTool(self.job, None, dbtype)
sql = dbi.getCreateTable("testplan")
sql += dbi.getSchemaAttribut("tpid", "id")+","
sql += dbi.getSchemaAttribut("name", D.TYPE_STR)+","
sql += dbi.getSchemaAttribut("description", D.TYPE_TEXT)+","
sql += dbi.getSchemaAttribut("project", D.TYPE_STR)+","
sql += dbi.getSchemaAttribut("prelease", D.TYPE_STR)+","
sql += dbi.getSchemaAttribut("attributes", D.TYPE_TEXT)+","
sql += self.getHistoryFields()
sql += ");\n"
sql += self.getHistoryIndex("testplan")
for attr in ["testsuite"]:
sql += dbi.getSchemaSubtable("tp", [{"attr":attr, "atype": D.TYPE_STR}])+"\n"
sql += dbi.getSchemaIndex(dbi.getIndexName("tp", attr),
dbi.getSubTableId(dbi.getSubTableName("tp", attr), attr))+"\n"
for attr in ["step"]:
sql += dbi.getSchemaSubtable("tp", [{"attr":attr, "atype": D.TYPE_STR}, {"attr":"attributes", "atype": D.TYPE_TEXT}])+"\n"
sql += dbi.getSchemaIndex(dbi.getSubTableName("tp", attr),
dbi.getSubTableId(dbi.getSubTableName("tp", attr), attr))+"\n"
return sql

49
basic/testsuite.py

@ -0,0 +1,49 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# ---------------------------------------------------------------------------------------------------------
# Author : Ulrich Carmesin
# Source : gitea.ucarmesin.de
# ---------------------------------------------------------------------------------------------------------
import basic.toolHandling
import utils.data_const as D
import basic.constants as B
import basic.entity
class Testsuite(basic.entity.Entity):
name = ""
description = ""
application = ""
usecase = []
testcases = {}
tables = {}
steps = []
def __init__(self, job):
"""
to be initialized by readSpec
:param job:
"""
self.job = job
def getSchema(self):
dbtype = self.job.conf.confs[B.TOPIC_NODE_DB][B.ATTR_TYPE]
dbi = basic.toolHandling.getDbTool(self.job, None, dbtype)
sql = dbi.getCreateTable("testsuite")
sql += dbi.getSchemaAttribut("tsid", "id")+","
sql += dbi.getSchemaAttribut("name", D.TYPE_STR)+","
sql += dbi.getSchemaAttribut("description", D.TYPE_TEXT)+","
sql += dbi.getSchemaAttribut("project", D.TYPE_STR)+","
sql += dbi.getSchemaAttribut("usecase", D.TYPE_STR)+","
sql += dbi.getSchemaAttribut("attributes", D.TYPE_TEXT)+","
sql += self.getHistoryFields()
sql += ");\n"
sql += self.getHistoryIndex("testsuite")
for attr in ["application", "testcase"]:
sql += dbi.getSchemaSubtable("ts", [{"attr":attr, "atype": D.TYPE_STR}])+"\n"
sql += dbi.getSchemaIndex(dbi.getIndexName("ts", attr),
dbi.getSubTableId(dbi.getSubTableName("ts", attr), attr))+"\n"
for attr in ["dtable", "step"]:
sql += dbi.getSchemaSubtable("ts", [{"attr":attr, "atype": D.TYPE_STR}, {"attr":"attributes", "atype": D.TYPE_TEXT}])+"\n"
sql += dbi.getSchemaIndex(dbi.getSubTableName("ts", attr),
dbi.getSubTableId(dbi.getSubTableName("ts", attr), attr))+"\n"
return sql

113
make_workspace.py

@ -0,0 +1,113 @@
# program to iinitialize the workspace
# pre: 1. program cloned
# 2. components are cloned, optional with path_const in config-folder
# post: 1. all folder are made with special name like components/const/path_const.py - otherwise with defaults
# 2. basic-configuration is created if it does not exist
# 3. necessary python-packages will be installed
# 4. git repos will be always updated
# -------------------------------------------------------------------------------------------------------------
"""
"""
import os
import basic.program
import utils.path_tool
import utils.file_tool
import basic.constants as B
# look if the path-constanst are specific defined.
home = os.getcwd()
if os.path.exists(os.path.join(home, "components", "config", "path_const.py")):
import components.config.path_const as P
else:
import utils.path_const as P
# import always the keyword.
import utils.path_const as Q
print("# ----------------------------------------------------------------------------------- ")
dirs = {}
dirs[Q.ATTR_PATH_HOME] = home
dirs[Q.ATTR_PATH_PROGRAM] = home
dirs[Q.ATTR_PATH_COMPONENTS] = os.path.join(home, "components")
pval = [P.ATTR_PATH_ARCHIV, P.ATTR_PATH_ENV, P.ATTR_PATH_DEBUG, P.ATTR_PATH_TEMP]
qkey = [Q.ATTR_PATH_ARCHIV, Q.ATTR_PATH_ENV, Q.ATTR_PATH_DEBUG, Q.ATTR_PATH_TEMP]
dirkey = [Q.ATTR_PATH_PROGRAM, Q.ATTR_PATH_COMPONENTS, Q.ATTR_PATH_HOME, Q.VAL_BASE_DATA] + qkey
home = utils.path_tool.getHome()
print("mkdirs in home " + home)
dirname = os.path.join(home, P.VAL_BASE_DATA)
dirs[Q.VAL_BASE_DATA] = dirname
if not os.path.exists(dirname):
os.makedirs(dirname, exist_ok=True)
print(" - "+dirname)
else:
print(" - " + dirname + " exists ")
dirname = os.path.join(home, P.VAL_CONFIG)
if not os.path.exists(dirname):
os.makedirs(dirname, exist_ok=True)
print(" - "+dirname)
else:
print(" - " + dirname + " exists ")
for i in range(0, len(pval)):
dirname = os.path.join(home, P.VAL_BASE_DATA, pval[i])
dirs[qkey[i]] = dirname
if not os.path.exists(dirname):
os.makedirs(dirname, exist_ok=True)
print(" - "+dirname)
else:
print(" - " + dirname + " exists ")
print("\n# ----------------------------------------------------------------------------------- ")
for format in ["yml", "json", "xml"]:
filename = os.path.join(home, Q.VAL_CONFIG, B.BASIS_FILE+"."+format)
if os.path.isfile(filename):
break
filename = ""
if len(filename) < 1:
filename = os.path.join(home, Q.VAL_CONFIG, B.BASIS_FILE+".yml")
text = "basic:\n"
text += " language: en\n"
text += " paths:\n"
for d in dirkey:
text += " "+d+": "+dirs[d]+"\n"
print(text)
print(filename)
with open(filename, 'w') as file:
file.write(text)
file.close()
print("create basis-config in "+filename)
print(text)
else:
print("basis-config exists "+filename)
print("\n# ----------------------------------------------------------------------------------- ")
print("please install requirements ")
# install requirements
# sudo apt install python3.10-venv
# python3 -m venv venv
# . venv/bin/activate
# pip install Flask
job = basic.program.Job("unit")
print("\n# ----------------------------------------------------------------------------------- ")
import utils.git_tool
for repo in ["program", "components", "testdata"]:
utils.git_tool.gitPull(job, repo)
print("\n# ----------------------------------------------------------------------------------- ")
if "db" in job.conf.confs:
import basic.connection
entity = basic.connection.Connection(job)
entity.createSchema()
import basic.testcase
entity = basic.testcase.Testcase(job)
entity.createSchema()
import basic.testsuite
entity = basic.testsuite.Testsuite(job)
entity.createSchema()
import basic.testplan
entity = basic.testplan.Testplan(job)
entity.createSchema()
import basic.testexecution
entity = basic.testexecution.Testexecution(job)
entity.createSchema()

27
utils/db_abstract.py

@ -393,4 +393,29 @@ class DbFcts():
def getDbTime(self, value):
return value
def getDbNull(self):
return B.SVAL_NULL
return B.SVAL_NULL
def getSchemaAttribut(self, attr, atype):
raise Exception(B.EXCEPT_NOT_IMPLEMENT)
def getSchemaSubtable(self, parent, attr, atype):
raise Exception(B.EXCEPT_NOT_IMPLEMENT)
def getSchemaIndex(self, attr):
raise Exception(B.EXCEPT_NOT_IMPLEMENT)
def getSubTableName(self, parent, attr):
if "_"+attr in parent:
return parent
return parent+"_"+attr
def getSubTableId(self, parent, attr):
pabr = parent.split("_")[0]
if pabr == "idx":
pabr = parent.split("_")[1]
return pabr+attr[0:3]+"id"
def getIndexName(self, table, attr):
if attr in table:
return "idx_"+table
return "idx_"+table+"_"+attr

36
utils/dbmysql_tool.py

@ -6,11 +6,12 @@
# ---------------------------------------------------------------------------------------------------------
import basic.program
import utils.config_tool
import utils.db_abstract
import utils.dbrel_tool
import mysql.connector
import basic.constants as B
import utils.data_const as D
class DbFcts(utils.db_abstract.DbFcts):
class DbFcts(utils.dbrel_tool.DbFcts):
"""
This interface defines each necessary connection to any kind of database.
The specific technique how to connect to the concrete DBMS has to be implemented in the specific tool.
@ -77,23 +78,20 @@ class DbFcts(utils.db_abstract.DbFcts):
cmd = cmd[0:-1]+";"
self.comp.m.logInfo(cmd)
def getConnector(self, job):
""" add-on-method to get the connector
this method should only called by the class itself """
job = self.job # basic.program.Job.getInstance()
mydb = mysql.connector.connect(
host = "localhost",
user = "datest",
password = "Advent!2021",
database = "datest"
)
return mysql
@staticmethod
def execStatement(self, comp, conn, statement):
def execStatement(self, dbconn, statement):
""" add-on-method to execute the statement
this method should only called by the class itself """
raise Exception(B.EXCEPT_NOT_IMPLEMENT)
print("execStatement "+statement)
connector = mysql.connector.connect(
host=dbconn["host"],
user=dbconn["user"],
password=dbconn["passwd"]
)
cursor = connector.cursor()
try:
cursor.execute(statement)
except:
if "CREATE INDEX " in statement:
return
raise Exception("DB-Exception "+statement)

53
utils/dbrel_tool.py

@ -9,7 +9,7 @@ import utils.config_tool
import utils.db_abstract
import mysql.connector
import basic.constants as B
import utils.data_const as D
class DbFcts(utils.db_abstract.DbFcts):
"""
This interface defines each necessary connection to any kind of database.
@ -88,6 +88,57 @@ class DbFcts(utils.db_abstract.DbFcts):
database = "datest"
)
return mysql
def getCreateTable(self, table):
sql = "CREATE TABLE IF NOT EXISTS " + self.job.conf.confs[B.TOPIC_NODE_DB][B.ATTR_DB_DATABASE] + "."+table+" ("
return sql
def getSchemaAttribut(self, attr, atype):
if atype == "id":
return attr + " INTEGER PRIMARY KEY AUTO_INCREMENT"
elif atype == D.TYPE_STR:
return attr + " varchar(50)"
elif atype == D.TYPE_STRING:
return attr + " varchar(500)"
elif atype == D.TYPE_TEXT:
return attr + " text"
elif atype == D.TYPE_INT:
return attr + " integer"
elif atype == D.TYPE_TIME:
return attr + " datetime"
def getSchemaSubtable(self, parent, attributes):
sql = "CREATE TABLE IF NOT EXISTS "+\
self.job.conf.confs[B.TOPIC_NODE_DB][B.ATTR_DB_DATABASE]+"."+self.getSubTableName(parent, attributes[0]["attr"]) + " ("
sql += self.getSchemaAttribut(self.getSubTableId(parent, attributes[0]["attr"]), "id") + ","
sql += self.getSchemaAttribut(parent+"id", D.TYPE_INT) + ","
for a in attributes:
sql += self.getSchemaAttribut(a["attr"], a["atype"]) + ","
sql = sql[0:-1]+");"
return sql
def getSchemaIndex(self, table, attr):
if table[0:4] == "idx_":
table = table[4:]
sql = "CREATE INDEX "
sql += "idx_"+table+"_"+attr
sql += " ON " + self.job.conf.confs[B.TOPIC_NODE_DB][B.ATTR_DB_DATABASE] + "." + table
sql += " ( " + attr + " );"
return sql
def getConnector(self, job):
""" add-on-method to get the connector
this method should only called by the class itself """
job = self.job # basic.program.Job.getInstance()
mydb = mysql.connector.connect(
host = "localhost",
user = "datest",
password = "Advent!2021",
database = "datest"
)
return mysql
@staticmethod
def execStatement(self, comp, conn, statement):

Loading…
Cancel
Save