
toolHandling db, api

master
Ulrich Carmesin 3 years ago
commit 5af95c1cec
  1. basic/componentHandling.py (32)
  2. basic/constants.py (15)
  3. basic/toolHandling.py (15)
  4. components/testexec.py (25)
  5. utils/api_abstract.py (26)
  6. utils/db_abstract.py (10)
  7. utils/dbsfile_tool.py (22)
  8. utils/dbshive_tool.py (35)
  9. utils/dbspark_tool.py (6)

basic/componentHandling.py (32)

@@ -29,18 +29,6 @@ PARAM_NOSUBNODE = ["artifact", "components", "instance"]
DEFAULT_INST_CNT = 1
DEFAULT_INST_SGL = "y"
def getComponents(mainfct):
job = basic.program.Job.getInstance()
verify = -2 + job.getDebugLevel("job_tool")
job.debug(verify, "getComponents " + mainfct)
out = []
for c in comps:
job.debug(verify, "getComponents " + c + ": " + str(comps[c].conf))
print("getComponents " + c + ": " + str(comps[c].conf))
if mainfct in comps[c].conf["function"]:
out.append(c)
return out
def getInstanceAttributes(conf):
"""
@@ -48,9 +36,10 @@ def getInstanceAttributes(conf):
:param conf:
:return: a complete set of these attributes
"""
out = {}
out[B.ATTR_INST_CNT] = DEFAULT_INST_CNT
out[B.ATTR_INST_SGL] = DEFAULT_INST_SGL
out = {
B.ATTR_INST_CNT: DEFAULT_INST_CNT,
B.ATTR_INST_SGL: DEFAULT_INST_SGL
}
if B.SUBJECT_INST in conf:
for attr in [B.ATTR_INST_CNT, B.ATTR_INST_SGL]:
if attr in conf[B.SUBJECT_INST]:
@@ -58,6 +47,19 @@ def getInstanceAttributes(conf):
return out
def getComponents(mainfct):
job = basic.program.Job.getInstance()
verify = -2 + job.getDebugLevel("job_tool")
job.debug(verify, "getComponents " + mainfct)
out = []
for c in comps:
job.debug(verify, "getComponents " + c + ": " + str(comps[c].conf))
print("getComponents " + c + ": " + str(comps[c].conf))
if mainfct in comps[c].conf["function"]:
out.append(c)
return out
class ComponentManager:
__instance = None
"""

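For orientation, a minimal sketch of the defaulting behaviour that the rewritten getInstanceAttributes implements; the literal keys "instance", "count" and "single" only stand in for B.SUBJECT_INST, B.ATTR_INST_CNT and B.ATTR_INST_SGL, and the sample conf is invented.

# Minimal sketch (invented keys and values): configured instance attributes win,
# the defaults DEFAULT_INST_CNT / DEFAULT_INST_SGL fill the gaps.
conf = {"instance": {"count": 2}}                 # stands for conf[B.SUBJECT_INST]
out = {"count": 1, "single": "y"}                 # DEFAULT_INST_CNT, DEFAULT_INST_SGL
for attr in ["count", "single"]:                  # B.ATTR_INST_CNT, B.ATTR_INST_SGL
    if attr in conf.get("instance", {}):
        out[attr] = conf["instance"][attr]
# out == {"count": 2, "single": "y"}
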
basic/constants.py (15)

@@ -67,11 +67,20 @@ DATA_NODE_PAR = "par"
""" This constant defines """
TOPIC_NODE_DB = "db"
ATTR_DB_PARTITION = "partitioned"
""" attribute if table is partitioned - partitions are parametrized """
""" optional attribute if table is partitioned
- this keyword delimited by "+" will be replaced by partition-names which are parametrized """
ATTR_DB_CONN_JAR = "conn_jar_name"
""" attribute for connection-jar-file instead of connection by ip, port """
""" optional attribute for connection-jar-file instead of connection by ip, port """
ATTR_CONN_HOST = "hostname"
""" optional attribute for connection-jar-file instead of connection by ip, port """
ATTR_CONN_TENANT = "tenant"
""" optional attribute for connection-jar-file instead of connection by ip, port """
ATTR_DB_DATABASE = "database"
""" attribute for technical name of the database """
ATTR_DB_SCHEMA = "schema"
""" optional attribute for technical name of the schema """
ATTR_DB_TABNAME = "tabname"
""" attribute in order to use a different technical name for the db-table """
""" optional attribute in order to use a different technical name for the db-table """
PAR_DB_WHERE = "dbwhere"
""" optional parameter with a where-clause """
PAR_DB_PARTITION = "dbparts"

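To make the new db attributes concrete, a hypothetical configuration fragment, shown as a Python dict; the nesting follows the comp.artifact.[db].[table].attr convention quoted in db_abstract, and all names and values are invented (including whether an attribute sits on the db level or the table level).

# Hypothetical component configuration using the new attributes (invented values):
conf = {
    "artifact": {
        "db": {
            "database": "testdb",          # ATTR_DB_DATABASE: technical database name
            "schema": "core",              # ATTR_DB_SCHEMA: optional schema name
            "customer": {                  # a table node (invented)
                "tabname": "t_customer",   # ATTR_DB_TABNAME: deviating technical table name
                "partitioned": "+reg+"     # ATTR_DB_PARTITION: keyword replaced per partition
            }
        }
    }
}
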
basic/toolHandling.py (15)

@@ -76,3 +76,18 @@ def getCliTool(comp):
c = class_()
c.setComp(comp)
return c
def getApiTool(comp):
job = basic.program.Job.getInstance()
verify = int(job.getDebugLevel("db_tool"))
apitype = getCompAttr(comp, B.TOPIC_NODE_API, B.ATTR_TYPE, "")
toolname = "api"+apitype+"_tool"
filepath = os.path.join(job.conf.confs[B.SUBJECT_PATH][B.ATTR_PATH_PROGRAM], "utils", toolname+".py")
#comp.m.debug(verify, "toolname "+filepath)
if not os.path.exists(filepath):
raise FileNotFoundError("file for tool "+toolname+" does not exist "+filepath)
cmodul = importlib.import_module("utils."+toolname)
class_ = getattr(cmodul, "ApiFcts")
c = class_()
c.setComp(comp)
return c

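getApiTool follows the same lookup convention as getCliTool and getDbTool: the api type attribute selects a module utils/api<type>_tool.py and its ApiFcts class is instantiated. A hedged sketch of that convention; the type value "rest" and the resulting module name are invented examples, not modules that exist in the repository.

# Sketch of the lookup convention (type value and module name are invented examples):
import importlib

def load_api_tool(comp, apitype="rest"):
    toolname = "api" + apitype + "_tool"                    # -> "apirest_tool"
    cmodul = importlib.import_module("utils." + toolname)   # expects utils/apirest_tool.py
    tool = getattr(cmodul, "ApiFcts")()                     # each tool module provides ApiFcts
    tool.setComp(comp)                                      # bind the component, as in getCliTool
    return tool
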
components/testexec.py (25)

@@ -120,6 +120,25 @@ class Testexecuter():
self.m.debug(verify, "--- " + str(inspect.currentframe().f_code.co_name) + "() finished at " + datetime.now().strftime("%Y%m%d_%H%M%S") + " for " + str(self.name).upper())
def composeFileClauses(self, pattern):
job = basic.program.Job.getInstance()
out = {}
attr = utils.db_abstract.getDbAttributes(self, "null")
while "{" in pattern:
pre = pattern[0:pattern.index("{")]
pat = pattern[pattern.index("{"):pattern.index("}")]
post = pattern[pattern.index("}"):]
pattern = pre+attr[pat]+post
if (hasattr(job.par, B.PAR_DB_PARTITION)) and (attr[B.ATTR_DB_PARTITION] != "n"):
parts = getattr(job.par, B.PAR_DB_PARTITION)
a = parts.split(",")
for p in a:
pattern = pattern.replace(attr[B.ATTR_DB_PARTITION], p)
out[p] = pattern
else:
out["ALL"] = pattern
return out
def composeSqlClauses(self, sql):
job = basic.program.Job.getInstance()
out = {}
@@ -130,8 +149,10 @@ class Testexecuter():
print(sql_new)
attr = utils.db_abstract.getDbAttributes(self, table)
if attr[B.ATTR_DB_TABNAME] != "":
table = self.conf[B.SUBJECT_ARTS][table][B.ATTR_DB_TABNAME]
sql_new += " " + table
table = attr[B.ATTR_DB_TABNAME]
if attr[B.ATTR_DB_SCHEMA] != "":
table = attr[B.ATTR_DB_SCHEMA]+"."+table
sql_new += " "+attr[B.ATTR_DB_DATABASE]+"."+table
print(sql_new)
if (hasattr(job.par, B.PAR_DB_WHERE)):
sql_new += " WHERE "+getattr(job.par, B.PAR_DB_WHERE)

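A simplified sketch of what composeFileClauses is intended to produce: placeholders in curly braces are resolved from the db attributes, and if the table is partitioned and job.par.dbparts is set, one pattern per partition is returned, keyed by partition name. The attribute values, the partition list and the pattern below are invented.

# Simplified sketch of the intended expansion (all values invented):
attr = {"hostname": "myhost", "tenant": "mt01", "partitioned": "+reg+"}
pattern = "s3a://{hostname}/data/{tenant}/out/VR_+reg+/"
while "{" in pattern:                                   # resolve {placeholder} from attr
    key = pattern[pattern.index("{") + 1:pattern.index("}")]
    pattern = pattern.replace("{" + key + "}", attr[key], 1)
out = {}
for p in "reg1,reg2".split(","):                        # would come from job.par.dbparts
    out[p] = pattern.replace(attr["partitioned"], p)    # one pattern per partition
# out == {"reg1": "s3a://myhost/data/mt01/out/VR_reg1/",
#         "reg2": "s3a://myhost/data/mt01/out/VR_reg2/"}
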
utils/api_abstract.py (26)

@@ -5,25 +5,21 @@
# Source : gitea.ucarmesin.de
# ---------------------------------------------------------------------------------------------------------
"""
This abstract class CliFcts defines the interface for the execution of any command-line-functions.
This abstract class ApiFcts defines the interface for the execution of any command-line-functions.
It uses the following configuration
.a) COMP.conf->artifact->cli->[system] : for structural attributes of the operating-system \n
.c) COMP.conf->conn->[cli] : for connection-attributes and structural attributes,
.a) COMP.conf->artifact->api->[system] : for structural attributes of the operating-system \n
.c) COMP.conf->conn->[api] : for connection-attributes and structural attributes,
maybe specialized for the operating-system
The attribute type resolves which technique is used, implemented in a specific tool-class:
* cmd,bash,powersh ... for specific local operating-system
* ssh,hadoop ... for specific remote operating-system
* nifi,
The main tasks are: \n
.1. executing the command-array - with attributes
* conn.ip, host, port, user, password, ... for synchronous db-connection
* conn.root, ... for path-definitions for file-implementations (csv, )
"""
import basic.program
import utils.config_tool
class CliFcts():
class ApiFcts():
"""
This interface defines each necessary connection to any kind of database.
The specific technique how to connect to the concrete DBMS has to be implemented in the specific tool.
@@ -35,7 +31,17 @@ class CliFcts():
def setComp(self, comp):
self.comp = comp
def execCommand(self, comp, command):
def startCommand(self, comp, args):
""" method to execute the statement
this method should only called by the class itself """
raise Exception("method is not implemented")
def statusCommand(self, comp, args):
""" method to execute the statement
this method should only called by the class itself """
raise Exception("method is not implemented")
def stopCommand(self, comp, args):
""" method to execute the statement
this method should only called by the class itself """
raise Exception("method is not implemented")

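A concrete tool would subclass ApiFcts and implement the three hooks; the skeleton below only illustrates the expected shape (the method bodies and return values are invented, and no such module exists in this commit).

# Hypothetical skeleton of a concrete api tool (bodies and values are invented):
import utils.api_abstract

class ApiFcts(utils.api_abstract.ApiFcts):
    def startCommand(self, comp, args):
        # start the process/endpoint described by comp.conf["conn"]["api"]
        return {"state": "started"}
    def statusCommand(self, comp, args):
        # report the status of the previously started process
        return {"state": "running"}
    def stopCommand(self, comp, args):
        # stop the process again
        return {"state": "stopped"}
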
utils/db_abstract.py (10)

@@ -43,9 +43,9 @@ import utils.config_tool
import basic.constants as B
import os
DFLT_DB_PARTITION = "n"
DEFAULT_DB_PARTITION = "n"
""" attribute if table is partitioned - partitions are parametrized """
DFLT_DB_CONN_JAR = "n"
DEFAULT_DB_CONN_JAR = "n"
""" attribute for connection-jar-file instead of connection by ip, port """
@@ -57,9 +57,11 @@ def getDbAttributes(comp, table):
* comp.artifact.[db].[table].attr
"""
out = {
B.ATTR_DB_DATABASE: "",
B.ATTR_DB_SCHEMA: "",
B.ATTR_DB_TABNAME: "",
B.ATTR_DB_PARTITION: DFLT_DB_PARTITION,
B.ATTR_DB_CONN_JAR: DFLT_DB_CONN_JAR
B.ATTR_DB_PARTITION: DEFAULT_DB_PARTITION,
B.ATTR_DB_CONN_JAR: DEFAULT_DB_CONN_JAR
}
for attr in out.keys():
print(attr)

utils/dbsfile_tool.py (22)

@@ -4,6 +4,12 @@
# Author : Ulrich Carmesin
# Source : gitea.ucarmesin.de
# ---------------------------------------------------------------------------------------------------------
"""
This class is a technical implementation of a Hive connection with Spark, typically used in a
machine-learning environment, for example on Hadoop
"""
import json
import basic.program
import utils.config_tool
import utils.db_abstract
@@ -27,17 +33,17 @@ class DbFcts(utils.db_abstract.DbFcts):
# attr = self.getDbAttributes(table)
job = basic.program.Job.getInstance()
verify = -1+job.getDebugLevel("db_tool")
cmd = "SELECT "+"*" #",".join(self.comp.conf[B.DATA_NODE_DDL][table][B.DATA_NODE_HEADER])
cmd += " FROM "+table
sqls = self.comp.composeSqlClauses(cmd)
pattern = "s3a://{hostname}/data/{tenant}/mt/sandboxes/{job.par.usecae}/{job.par.workspace}/{outfile}/VR_+reg+/"
files = self.comp.composeFileClauses(pattern)
data = []
for k in sqls.keys():
sql = sqls[k]
for k in files.keys():
sql = files[k]
if dry == 1:
spark = self.getConnector()
df = spark.sql(sql)
for r in df:
data.append(r)
df = spark.read.parquet(sql)
dfj = df.toJSON()
for r in dfj.collect():
data.append(json.loads(r))
else:
print("select "+sql)
#self.comp.m.logInfo(cmd)

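The new read path loads a parquet location with spark and converts every row into a plain dict via JSON; a standalone sketch of that pattern, with an invented path and a locally created session:

# Standalone sketch of the parquet-to-dict pattern (path and app name are examples):
import json
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("datest").getOrCreate()
df = spark.read.parquet("s3a://myhost/data/mt01/out/VR_reg1/")   # invented location
data = [json.loads(r) for r in df.toJSON().collect()]            # one dict per row
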
utils/dbshive_tool.py (35)

@@ -4,6 +4,13 @@
# Author : Ulrich Carmesin
# Source : gitea.ucarmesin.de
# ---------------------------------------------------------------------------------------------------------
"""
This class is a technical implementation of a Hive connection with Spark, typically used in a
machine-learning environment, for example on Hadoop
"""
import json
import os
import basic.program
import utils.config_tool
import utils.db_abstract
@@ -27,7 +34,7 @@ class DbFcts(utils.db_abstract.DbFcts):
# attr = self.getDbAttributes(table)
job = basic.program.Job.getInstance()
verify = -1+job.getDebugLevel("db_tool")
cmd = "SELECT "+"*" #",".join(self.comp.conf[B.DATA_NODE_DDL][table][B.DATA_NODE_HEADER])
cmd = "SELECT "+",".join(self.comp.conf[B.DATA_NODE_DDL][table][B.DATA_NODE_HEADER])
cmd += " FROM "+table
sqls = self.comp.composeSqlClauses(cmd)
data = []
@@ -35,14 +42,15 @@ class DbFcts(utils.db_abstract.DbFcts):
sql = sqls[k]
if dry == 1:
spark = self.getConnector()
df = spark.sql(cmd)
for r in df:
data.append(r)
df = spark.sql(sql)
dfj = df.toJSON()
for r in dfj.collect():
data.append(json.loads(r))
else:
print("select "+sql)
#self.comp.m.logInfo(cmd)
#tdata[B.DATA_NODE_HEADER] = self.comp.conf[B.DATA_NODE_DDL][table][B.DATA_NODE_HEADER]
#tdata[B.DATA_NODE_DATA] = data
self.comp.m.logInfo(sql)
tdata[B.DATA_NODE_HEADER] = self.comp.conf[B.DATA_NODE_DDL][table][B.DATA_NODE_HEADER]
tdata[B.DATA_NODE_DATA] = data
return tdata
def deleteRows(self, table):
@@ -81,10 +89,15 @@ class DbFcts(utils.db_abstract.DbFcts):
""" add-on-method to get the connector
this method should only be called by the class itself """
job = basic.program.Job.getInstance()
spark = pyspark.SparkSession\
.builder\
.appName("datest")\
.getOrCreate()
attr = self.getDbAttributes("null")
spark = None
if B.ATTR_DB_CONN_JAR in attr:
connectorJar = os.environ.get(attr[B.ATTR_DB_CONN_JAR])
spark = pyspark.SparkSession\
.builder\
.appName("datest")\
.config("spark.jars", f"{connectorJar}")\
.getOrCreate()
return spark

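getConnector now reads the connector jar from an environment variable whose name is taken from the conn_jar_name attribute and passes it to the session builder; a minimal sketch of that call, with an invented variable name:

# Minimal sketch of configuring the session with a connector jar from the environment
# (the environment-variable name is an invented example):
import os
from pyspark.sql import SparkSession

connector_jar = os.environ.get("DB_CONNECTOR_JAR", "")   # e.g. a hive-jdbc standalone jar
spark = (SparkSession.builder
         .appName("datest")
         .config("spark.jars", connector_jar)
         .getOrCreate())
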
utils/dbspark_tool.py (6)

@@ -17,6 +17,12 @@ class DbFcts(utils.db_abstract.DbFcts):
"""
def __init__(self):
pass
def getDbAttributes(self):
out = {}
return out
def selectRows(self, table):
""" method to select rows from a database
statement written in sql """
