#!/usr/bin/python
# -*- coding: utf-8 -*-
# ---------------------------------------------------------------------------------------------------------
# Author : Ulrich Carmesin
# Source : gitea.ucarmesin.de
# ---------------------------------------------------------------------------------------------------------
"""
This module is a technical implementation of a Hive connection with Spark - typically used in a
machine-learning environment, for example on Hadoop.
"""
import json
import basic.program
import utils.config_tool
import utils.db_abstract
from pyspark.sql import SparkSession
import basic.constants as B


class DbFcts(utils.db_abstract.DbFcts):
    """
    Spark/Hive implementation of the database interface.
    The abstract interface defines each necessary connection to any kind of database;
    the specific technique of connecting to the concrete DBMS is implemented in this tool.
    """
    def __init__(self):
        pass

    def selectRows(self, table, job):
        """ method to select rows from a database
        statement written in sql """
        tdata = {}
        dry = 0
        # attr = self.getDbAttributes(table)
        verify = -1 + job.getDebugLevel("db_tool")
        pattern = "s3a://{hostname}/data/{tenant}/mt/sandboxes/{job.par.usecae}/{job.par.workspace}/{outfile}/VR_+reg+/"
        files = self.comp.composeFileClauses(job, pattern)
        data = []
        for k in files.keys():
            sql = files[k]
            if dry == 1:
                # read the parquet files and collect each row as a json-parsed dict
                spark = self.getConnector()
                df = spark.read.parquet(sql)
                dfj = df.toJSON()
                for r in dfj.collect():
                    data.append(json.loads(r))
            else:
                print("select " + sql)
        # self.comp.m.logInfo(cmd)
        # tdata[B.DATA_NODE_HEADER] = self.comp.conf[B.DATA_NODE_DDL][table][B.DATA_NODE_HEADER]
        # tdata[B.DATA_NODE_DATA] = data
        return tdata

    def deleteRows(self, table, job):
        """ method to delete rows from a database
        statement written in sql """
        dry = 0
        verify = -1 + job.getDebugLevel("db_tool")
        cmd = "DELETE FROM " + table
        print("deleteRows " + cmd)
        sqls = self.comp.composeSqlClauses(job, cmd)
        print(sqls)
        for k in sqls.keys():
            sql = sqls[k]
            if dry == 1:
                # spark = self.getConnector()
                # df = spark.sql(sql)
                pass
            else:
                print("delete " + sql)
                # self.comp.m.logInfo(cmd)

    def insertRows(self, table, rows, job):
        """ method to insert rows into a database
        the rows will be interpreted by the ddl of the component
        """
        verify = -1 + job.getDebugLevel("db_tool")
        spark = self.getConnector()
        # the rows are turned into a Spark DataFrame according to the component's ddl
        df = spark.createDataFrame(rows)
        self.comp.m.logInfo("insertRows " + table)

    def getConnector(self):
        """ add-on method to get the connector;
        this method should only be called by the class itself """
        job = self.job  # basic.program.Job.getInstance()
        attr = self.getDbAttributes(B.SVAL_NULL)
        spark = None
        if B.ATTR_DB_CONN_JAR in attr:
            # pass the configured connector jar to the Spark session
            spark = SparkSession.builder \
                .appName("datest") \
                .config("spark.jars", attr[B.ATTR_DB_CONN_JAR]) \
                .getOrCreate()
        return spark
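
# ---------------------------------------------------------------------------------------------------------
# Usage sketch (illustrative only; not part of the framework's entry points).
# In a real run the test framework creates the job and injects the component into `self.comp`
# before any of the methods above are called. The names `job`, `my_component` and the sample
# rows below are assumptions made for this example.
#
#   dbfcts = DbFcts()
#   dbfcts.comp = my_component                      # hypothetical: normally injected by the framework
#   rows = [{"id": 1, "name": "alpha"}, {"id": 2, "name": "beta"}]
#   dbfcts.insertRows("testtable", rows, job)       # writes the rows via a Spark DataFrame
#   result = dbfcts.selectRows("testtable", job)    # composes the parquet file paths and reads them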