diff --git a/basic/componentHandling.py b/basic/componentHandling.py
index 7884ec2..960721a 100644
--- a/basic/componentHandling.py
+++ b/basic/componentHandling.py
@@ -29,18 +29,6 @@ PARAM_NOSUBNODE = ["artifact", "components", "instance"]
 DEFAULT_INST_CNT = 1
 DEFAULT_INST_SGL = "y"
 
-def getComponents(mainfct):
-    job = basic.program.Job.getInstance()
-    verify = -2 + job.getDebugLevel("job_tool")
-    job.debug(verify, "getComponents " + mainfct)
-    out = []
-    for c in comps:
-        job.debug(verify, "getComponents " + c + ": " + str(comps[c].conf))
-        print("getComponents " + c + ": " + str(comps[c].conf))
-        if mainfct in comps[c].conf["function"]:
-            out.append(c)
-    return out
-
 
 def getInstanceAttributes(conf):
     """
@@ -48,9 +36,10 @@ def getInstanceAttributes(conf):
     :param conf:
     :return: a complete set of these attributes
     """
-    out = {}
-    out[B.ATTR_INST_CNT] = DEFAULT_INST_CNT
-    out[B.ATTR_INST_SGL] = DEFAULT_INST_SGL
+    out = {
+        B.ATTR_INST_CNT: DEFAULT_INST_CNT,
+        B.ATTR_INST_SGL: DEFAULT_INST_SGL
+    }
     if B.SUBJECT_INST in conf:
         for attr in [B.ATTR_INST_CNT, B.ATTR_INST_SGL]:
             if attr in conf[B.SUBJECT_INST]:
@@ -58,6 +47,19 @@ def getInstanceAttributes(conf):
     return out
 
 
+def getComponents(mainfct):
+    job = basic.program.Job.getInstance()
+    verify = -2 + job.getDebugLevel("job_tool")
+    job.debug(verify, "getComponents " + mainfct)
+    out = []
+    for c in comps:
+        job.debug(verify, "getComponents " + c + ": " + str(comps[c].conf))
+        print("getComponents " + c + ": " + str(comps[c].conf))
+        if mainfct in comps[c].conf["function"]:
+            out.append(c)
+    return out
+
+
 class ComponentManager:
     __instance = None
     """
diff --git a/basic/constants.py b/basic/constants.py
index 3126f5d..e4aefd8 100644
--- a/basic/constants.py
+++ b/basic/constants.py
@@ -67,11 +67,20 @@ DATA_NODE_PAR = "par"
 """ This constant defines """
 TOPIC_NODE_DB = "db"
 ATTR_DB_PARTITION = "partitioned"
-""" attribute if table is partitioned - partitions are parametrized """
+""" optional attribute if the table is partitioned
+    - the keyword, delimited by "+", will be replaced by the partition-names which are passed as a parameter """
 ATTR_DB_CONN_JAR = "conn_jar_name"
-""" attribute for connection-jar-file instead of connection by ip, port """
+""" optional attribute for a connection-jar-file instead of a connection by ip, port """
+ATTR_CONN_HOST = "hostname"
+""" optional attribute for the hostname of the server-connection """
+ATTR_CONN_TENANT = "tenant"
+""" optional attribute for the tenant in a multi-tenant installation """
+ATTR_DB_DATABASE = "database"
+""" attribute for the technical name of the database """
+ATTR_DB_SCHEMA = "schema"
+""" optional attribute for the technical name of the schema """
 ATTR_DB_TABNAME = "tabname"
-""" attribute in order to use a different technical name for the db-table """
+""" optional attribute in order to use a different technical name for the db-table """
 PAR_DB_WHERE = "dbwhere"
 """ optional parameter with a where-clause """
 PAR_DB_PARTITION = "dbparts"
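A minimal sketch of how the new db-attributes interact, assuming a hypothetical attribute-set as `getDbAttributes()` (see `utils/db_abstract.py` below) would return it; the values and the helper are illustrative only, the same composition happens inside `composeSqlClauses()` in `components/testexec.py`:

```python
# hypothetical attribute-set; the keys follow the new constants in basic/constants.py
attr = {
    "database": "testdb",    # ATTR_DB_DATABASE - technical name of the database
    "schema": "core",        # ATTR_DB_SCHEMA - optional technical name of the schema
    "tabname": "TBL_LOFTS",  # ATTR_DB_TABNAME - deviating technical table-name
}

def qualifyTable(attr, table):
    """ illustrative helper: composes database.schema.tabname like composeSqlClauses() """
    if attr["tabname"] != "":
        table = attr["tabname"]
    if attr["schema"] != "":
        table = attr["schema"] + "." + table
    if attr["database"] != "":
        table = attr["database"] + "." + table
    return table

print(qualifyTable(attr, "lofts"))  # -> testdb.core.TBL_LOFTS
```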
diff --git a/basic/toolHandling.py b/basic/toolHandling.py
index ea27367..cc00224 100644
--- a/basic/toolHandling.py
+++ b/basic/toolHandling.py
@@ -76,3 +76,18 @@ def getCliTool(comp):
     c = class_()
     c.setComp(comp)
     return c
+
+def getApiTool(comp):
+    job = basic.program.Job.getInstance()
+    verify = int(job.getDebugLevel("db_tool"))
+    apitype = getCompAttr(comp, B.TOPIC_NODE_API, B.ATTR_TYPE, "")
+    toolname = "api"+apitype+"_tool"
+    filepath = os.path.join(job.conf.confs[B.SUBJECT_PATH][B.ATTR_PATH_PROGRAM], "utils", toolname+".py")
+    #comp.m.debug(verify, "toolname "+filepath)
+    if not os.path.exists(filepath):
+        raise FileNotFoundError("file for tool "+toolname+" does not exist "+filepath)
+    cmodul = importlib.import_module("utils."+toolname)
+    class_ = getattr(cmodul, "ApiFcts")
+    c = class_()
+    c.setComp(comp)
+    return c
diff --git a/components/testexec.py b/components/testexec.py
index 031e465..80d9eed 100644
--- a/components/testexec.py
+++ b/components/testexec.py
@@ -120,6 +120,25 @@ class Testexecuter():
         self.m.debug(verify, "--- " + str(inspect.currentframe().f_code.co_name) + "() finished at " + datetime.now().strftime("%Y%m%d_%H%M%S") + " for " + str(self.name).upper())
 
+    def composeFileClauses(self, pattern):
+        job = basic.program.Job.getInstance()
+        out = {}
+        attr = utils.db_abstract.getDbAttributes(self, "null")
+        while "{" in pattern:
+            pre = pattern[0:pattern.index("{")]
+            pat = pattern[pattern.index("{")+1:pattern.index("}")]
+            post = pattern[pattern.index("}")+1:]
+            pattern = pre+attr[pat]+post
+        if (hasattr(job.par, B.PAR_DB_PARTITION)) and (attr[B.ATTR_DB_PARTITION] != "n"):
+            parts = getattr(job.par, B.PAR_DB_PARTITION)
+            a = parts.split(",")
+            for p in a:
+                path = pattern.replace(attr[B.ATTR_DB_PARTITION], p)
+                out[p] = path
+        else:
+            out["ALL"] = pattern
+        return out
+
     def composeSqlClauses(self, sql):
         job = basic.program.Job.getInstance()
         out = {}
@@ -130,8 +149,12 @@ class Testexecuter():
             print(sql_new)
         attr = utils.db_abstract.getDbAttributes(self, table)
         if attr[B.ATTR_DB_TABNAME] != "":
-            table = self.conf[B.SUBJECT_ARTS][table][B.ATTR_DB_TABNAME]
-        sql_new += " " + table
+            table = attr[B.ATTR_DB_TABNAME]
+        if attr[B.ATTR_DB_SCHEMA] != "":
+            table = attr[B.ATTR_DB_SCHEMA]+"."+table
+        if attr[B.ATTR_DB_DATABASE] != "":
+            table = attr[B.ATTR_DB_DATABASE]+"."+table
+        sql_new += " "+table
         print(sql_new)
         if (hasattr(job.par, B.PAR_DB_WHERE)):
             sql_new += " WHERE "+getattr(job.par, B.PAR_DB_WHERE)
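For clarity, a stand-alone sketch of the two resolution steps in `composeFileClauses()` above - placeholder substitution and partition expansion; the attribute values, the pattern and the partition list are illustrative assumptions:

```python
# illustrative attributes, as getDbAttributes() could deliver them
attr = {"hostname": "bucket-01", "tenant": "acme", "partitioned": "+reg+"}
pattern = "s3a://{hostname}/data/{tenant}/out/VR_+reg+/"

# step 1: replace every {placeholder} by the corresponding attribute
while "{" in pattern:
    pre = pattern[0:pattern.index("{")]
    pat = pattern[pattern.index("{")+1:pattern.index("}")]
    post = pattern[pattern.index("}")+1:]
    pattern = pre + attr[pat] + post

# step 2: expand the partition-keyword (here "+reg+") once per requested partition
out = {}
for p in "nord,sued".split(","):
    out[p] = pattern.replace(attr["partitioned"], p)

print(out)
# {'nord': 's3a://bucket-01/data/acme/out/VR_nord/',
#  'sued': 's3a://bucket-01/data/acme/out/VR_sued/'}
```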
diff --git a/utils/api_abstract.py b/utils/api_abstract.py
index 5337ff6..a0c59cf 100644
--- a/utils/api_abstract.py
+++ b/utils/api_abstract.py
@@ -5,25 +5,22 @@
 # Source : gitea.ucarmesin.de
 # ---------------------------------------------------------------------------------------------------------
 """
-This abstract class CliFcts defines the interface for the execution of any command-line-functions.
+This abstract class ApiFcts defines the interface for the execution of any api-functions.
 It uses the following configuration
-   .a) COMP.conf->artifact->cli->[system] : for structural attributes of the operating-system \n
-   .c) COMP.conf->conn->[cli] : for connection-attributes and structural attributes,
+   .a) COMP.conf->artifact->api->[system] : for structural attributes of the api-system \n
+   .c) COMP.conf->conn->[api] : for connection-attributes and structural attributes,
       maybe specialized for the operating-system
 The attribute type resolves which technique is used, implemented in a specific tool-class:
-   * cmd,bash,powersh ... for specific local operating-system
-   * ssh,hadoop ... for specific remote operating-system
+   * nifi
 The main tasks are: \n
-.1. executing the command-array - with attributes
-   * conn.ip, host, port, user, password, ... for synchronous db-connection
-   * conn.root, ... for path-definitions for file-implementations (csv, )
+.1. starting, stopping and requesting the status of a service
 """
 import basic.program
 import utils.config_tool
 
 
-class CliFcts():
+class ApiFcts():
     """
     This interface defines each necessary connection to any kind of database.
     The specific technique how to connect to the concrete DBMS has to be implemented in the specific tool.
@@ -35,7 +32,17 @@ class CliFcts():
     def setComp(self, comp):
         self.comp = comp
 
-    def execCommand(self, comp, command):
+    def startCommand(self, comp, args):
+        """ method to start the service
+        this method should only be called by the class itself """
+        raise Exception("method is not implemented")
+
+    def statusCommand(self, comp, args):
+        """ method to request the status of the service
+        this method should only be called by the class itself """
+        raise Exception("method is not implemented")
+
+    def stopCommand(self, comp, args):
         """ method to execute the statement
         this method should only called by the class itself """
         raise Exception("method is not implemented")
diff --git a/utils/db_abstract.py b/utils/db_abstract.py
index 631220c..c6f1a08 100644
--- a/utils/db_abstract.py
+++ b/utils/db_abstract.py
@@ -43,9 +43,9 @@ import utils.config_tool
 import basic.constants as B
 import os
 
-DFLT_DB_PARTITION = "n"
+DEFAULT_DB_PARTITION = "n"
 """ attribute if table is partitioned - partitions are parametrized """
-DFLT_DB_CONN_JAR = "n"
+DEFAULT_DB_CONN_JAR = "n"
 """ attribute for connection-jar-file instead of connection by ip, port """
 
 
@@ -57,9 +57,11 @@ def getDbAttributes(comp, table):
     * comp.artifact.[db].[table].attr
     """
     out = {
+        B.ATTR_DB_DATABASE: "",
+        B.ATTR_DB_SCHEMA: "",
         B.ATTR_DB_TABNAME: "",
-        B.ATTR_DB_PARTITION: DFLT_DB_PARTITION,
-        B.ATTR_DB_CONN_JAR: DFLT_DB_CONN_JAR
+        B.ATTR_DB_PARTITION: DEFAULT_DB_PARTITION,
+        B.ATTR_DB_CONN_JAR: DEFAULT_DB_CONN_JAR
     }
     for attr in out.keys():
         print(attr)
diff --git a/utils/dbsfile_tool.py b/utils/dbsfile_tool.py
index 9599167..2f6395c 100644
--- a/utils/dbsfile_tool.py
+++ b/utils/dbsfile_tool.py
@@ -4,6 +4,12 @@
 # Author : Ulrich Carmesin
 # Source : gitea.ucarmesin.de
 # ---------------------------------------------------------------------------------------------------------
+"""
+This class is a technical implementation to read data-files (e.g. parquet) with spark - typically used in a
+Machine Learning environment for example in hadoop
+"""
+import json
+
 import basic.program
 import utils.config_tool
 import utils.db_abstract
@@ -27,17 +33,17 @@ class DbFcts(utils.db_abstract.DbFcts):
         # attr = self.getDbAttributes(table)
         job = basic.program.Job.getInstance()
         verify = -1+job.getDebugLevel("db_tool")
-        cmd = "SELECT "+"*" #",".join(self.comp.conf[B.DATA_NODE_DDL][table][B.DATA_NODE_HEADER])
-        cmd += " FROM "+table
-        sqls = self.comp.composeSqlClauses(cmd)
+        pattern = "s3a://{hostname}/data/{tenant}/mt/sandboxes/{job.par.usecase}/{job.par.workspace}/{outfile}/VR_+reg+/"
+        files = self.comp.composeFileClauses(pattern)
         data = []
-        for k in sqls.keys():
-            sql = sqls[k]
+        for k in files.keys():
+            sql = files[k]
             if dry == 1:
                 spark = self.getConnector()
-                df = spark.sql(sql)
-                for r in df:
-                    data.append(r)
+                df = spark.read.parquet(sql)
+                dfj = df.toJSON()
+                for r in dfj.collect():
+                    data.append(json.loads(r))
             else:
                 print("select "+sql)
         #self.comp.m.logInfo(cmd)
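The read-path in `selectRows()` above boils down to the following pattern; a minimal sketch, assuming a local spark-session and an illustrative parquet-path:

```python
import json
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("datest").getOrCreate()
# read one resolved partition-path (see composeFileClauses() in components/testexec.py)
df = spark.read.parquet("s3a://bucket-01/data/acme/out/VR_nord/")
# toJSON() serializes each row, json.loads() turns it back into one plain dict per row
data = [json.loads(r) for r in df.toJSON().collect()]
```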
diff --git a/utils/dbshive_tool.py b/utils/dbshive_tool.py
index 4aaff87..8d1652d 100644
--- a/utils/dbshive_tool.py
+++ b/utils/dbshive_tool.py
@@ -4,6 +4,13 @@
 # Author : Ulrich Carmesin
 # Source : gitea.ucarmesin.de
 # ---------------------------------------------------------------------------------------------------------
+"""
+This class is a technical implementation for Hive-connection with spark - typically used in a
+Machine Learning environment for example in hadoop
+"""
+import json
+import os
+
 import basic.program
 import utils.config_tool
 import utils.db_abstract
@@ -27,7 +34,7 @@ class DbFcts(utils.db_abstract.DbFcts):
         # attr = self.getDbAttributes(table)
         job = basic.program.Job.getInstance()
         verify = -1+job.getDebugLevel("db_tool")
-        cmd = "SELECT "+"*" #",".join(self.comp.conf[B.DATA_NODE_DDL][table][B.DATA_NODE_HEADER])
+        cmd = "SELECT "+",".join(self.comp.conf[B.DATA_NODE_DDL][table][B.DATA_NODE_HEADER])
         cmd += " FROM "+table
         sqls = self.comp.composeSqlClauses(cmd)
         data = []
@@ -35,14 +42,15 @@ class DbFcts(utils.db_abstract.DbFcts):
             sql = sqls[k]
             if dry == 1:
                 spark = self.getConnector()
-                df = spark.sql(cmd)
-                for r in df:
-                    data.append(r)
+                df = spark.sql(sql)
+                dfj = df.toJSON()
+                for r in dfj.collect():
+                    data.append(json.loads(r))
             else:
                 print("select "+sql)
-        #self.comp.m.logInfo(cmd)
-        #tdata[B.DATA_NODE_HEADER] = self.comp.conf[B.DATA_NODE_DDL][table][B.DATA_NODE_HEADER]
-        #tdata[B.DATA_NODE_DATA] = data
+        self.comp.m.logInfo(sql)
+        tdata[B.DATA_NODE_HEADER] = self.comp.conf[B.DATA_NODE_DDL][table][B.DATA_NODE_HEADER]
+        tdata[B.DATA_NODE_DATA] = data
         return tdata
 
     def deleteRows(self, table):
@@ -81,10 +89,15 @@ class DbFcts(utils.db_abstract.DbFcts):
         """ add-on-method to get the connector
         this method should only called by the class itself """
         job = basic.program.Job.getInstance()
-        spark = pyspark.SparkSession\
-            .builder\
-            .appName("datest")\
-            .getOrCreate()
+        attr = self.getDbAttributes("null")
+        builder = pyspark.SparkSession\
+            .builder\
+            .appName("datest")
+        if attr[B.ATTR_DB_CONN_JAR] != "n":
+            # the attribute holds the name of the environment-variable which points to the driver-jar
+            connectorJar = os.environ.get(attr[B.ATTR_DB_CONN_JAR])
+            builder = builder.config("spark.jars", f"{connectorJar}")
+        spark = builder.getOrCreate()
         return spark
 
 
diff --git a/utils/dbspark_tool.py b/utils/dbspark_tool.py
index 9f05c58..7507373 100644
--- a/utils/dbspark_tool.py
+++ b/utils/dbspark_tool.py
@@ -17,6 +17,12 @@ class DbFcts(utils.db_abstract.DbFcts):
     """
     def __init__(self):
         pass
+
+    def getDbAttributes(self, table):
+        """ TODO: collect spark-specific db-attributes; empty placeholder for now """
+        out = {}
+        return out
+
     def selectRows(self, table):
         """ method to select rows from a database statement written in sql """
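To close the loop with `getApiTool()` in `basic/toolHandling.py`: a concrete tool would live in a module following the `api<type>_tool` naming-convention and subclass the new interface. The sketch below is hypothetical - the module name `utils/apinifi_tool.py`, the log-texts and the omitted service-calls are assumptions, not part of this patch:

```python
import utils.api_abstract

class ApiFcts(utils.api_abstract.ApiFcts):
    """ sketch of a concrete api-tool; the real calls to the service are omitted """

    def startCommand(self, comp, args):
        comp.m.logInfo("api start " + str(args))
        # here the service would be started via its api

    def statusCommand(self, comp, args):
        comp.m.logInfo("api status " + str(args))
        # here the status of the service would be requested and returned

    def stopCommand(self, comp, args):
        comp.m.logInfo("api stop " + str(args))
        # here the service would be stopped via its api
```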