Ulrich Carmesin
3 years ago
3 changed files with 163 additions and 13 deletions
@ -0,0 +1,41 @@ |
|||
#!/usr/bin/python |
|||
# -*- coding: utf-8 -*- |
|||
# --------------------------------------------------------------------------------------------------------- |
|||
# Author : Ulrich Carmesin |
|||
# Source : gitea.ucarmesin.de |
|||
# --------------------------------------------------------------------------------------------------------- |
|||
""" |
|||
This abstract class CliFcts defines the interface for the execution of any command-line-functions. |
|||
It uses the following configuration |
|||
.a) COMP.conf->artifact->cli->[system] : for structural attributes of the operating-system \n |
|||
.c) COMP.conf->conn->[cli] : for connection-attributes and structural attributes, |
|||
maybe specialized for the operating-system |
|||
|
|||
The attribute type resolves which technique is used, implemented in a specific tool-class: |
|||
* cmd,bash,powersh ... for specific local operating-system |
|||
* ssh,hadoop ... for specific remote operating-system |
|||
The main tasks are: \n |
|||
.1. executing the command-array - with attributes |
|||
* conn.ip, host, port, user, password, ... for synchronous db-connection |
|||
* conn.root, ... for path-definitions for file-implementations (csv, ) |
|||
|
|||
""" |
|||
import basic.program |
|||
import utils.config_tool |
|||
|
|||
class CliFcts(): |
|||
""" |
|||
This interface defines each necessary connection to any kind of database. |
|||
The specific technique how to connect to the concrete DBMS has to be implemented in the specific tool. |
|||
""" |
|||
def __init__(self): |
|||
self.comp = None |
|||
pass |
|||
|
|||
def setComp(self, comp): |
|||
self.comp = comp |
|||
|
|||
def execCommand(self, comp, command): |
|||
""" method to execute the statement |
|||
this method should only called by the class itself """ |
|||
raise Exception("method is not implemented") |
@ -0,0 +1,96 @@ |
|||
#!/usr/bin/python |
|||
# -*- coding: utf-8 -*- |
|||
# --------------------------------------------------------------------------------------------------------- |
|||
# Author : Ulrich Carmesin |
|||
# Source : gitea.ucarmesin.de |
|||
# --------------------------------------------------------------------------------------------------------- |
|||
import basic.program |
|||
import utils.config_tool |
|||
import utils.db_abstract |
|||
import pyspark |
|||
import basic.constants as B |
|||
|
|||
class DbFcts(utils.db_abstract.DbFcts): |
|||
""" |
|||
This interface defines each necessary connection to any kind of database. |
|||
The specific technique how to connect to the concrete DBMS has to be implemented in the specific tool. |
|||
""" |
|||
def __init__(self): |
|||
pass |
|||
|
|||
|
|||
def selectRows(self, table): |
|||
""" method to select rows from a database |
|||
statement written in sql """ |
|||
tdata = {} |
|||
dry = 0 |
|||
# attr = self.getDbAttributes(table) |
|||
job = basic.program.Job.getInstance() |
|||
verify = -1+job.getDebugLevel("db_tool") |
|||
cmd = "SELECT "+"*" #",".join(self.comp.conf[B.DATA_NODE_DDL][table][B.DATA_NODE_HEADER]) |
|||
cmd += " FROM "+table |
|||
sqls = self.comp.composeSqlClauses(cmd) |
|||
data = [] |
|||
for k in sqls.keys(): |
|||
sql = sqls[k] |
|||
if dry == 1: |
|||
spark = self.getConnector() |
|||
df = spark.sql(sql) |
|||
for r in df: |
|||
data.append(r) |
|||
else: |
|||
print("select "+sql) |
|||
#self.comp.m.logInfo(cmd) |
|||
#tdata[B.DATA_NODE_HEADER] = self.comp.conf[B.DATA_NODE_DDL][table][B.DATA_NODE_HEADER] |
|||
#tdata[B.DATA_NODE_DATA] = data |
|||
return tdata |
|||
|
|||
def deleteRows(self, table): |
|||
""" method to delete rows from a database |
|||
statement written in sql """ |
|||
job = basic.program.Job.getInstance() |
|||
dry = 0 |
|||
verify = -1+job.getDebugLevel("db_tool") |
|||
cmd = "DELETE FROM "+table |
|||
print("deleteRows "+cmd) |
|||
sqls = self.comp.composeSqlClauses(cmd) |
|||
print("deleteRows "+cmd) |
|||
print(sqls) |
|||
for k in sqls.keys(): |
|||
sql = sqls[k] |
|||
if dry == 1: |
|||
#spark = self.getConnector() |
|||
#df = spark.sql(cmd) |
|||
pass |
|||
else: |
|||
print("select "+sql) |
|||
#self.comp.m.logInfo(cmd) |
|||
|
|||
def insertRows(self, table, rows): |
|||
""" method to insert rows into a database |
|||
the rows will be interpreted by the ddl of the component |
|||
""" |
|||
job = basic.program.Job.getInstance() |
|||
verify = -1+job.getDebugLevel("db_tool") |
|||
spark = self.getConnector() |
|||
df = spark.createDataFrame(rows) |
|||
|
|||
self.comp.m.logInfo("cmd") |
|||
|
|||
def getConnector(self): |
|||
""" add-on-method to get the connector |
|||
this method should only called by the class itself """ |
|||
job = basic.program.Job.getInstance() |
|||
attr = self.getDbAttributes("null") |
|||
spark = None |
|||
if B.ATTR_DB_CONN_JAR in attr: |
|||
spark = pyspark.SparkSession\ |
|||
.builder\ |
|||
.appName("datest")\ |
|||
.config("sparkjar", "")\ |
|||
.getOrCreate() |
|||
return spark |
|||
|
|||
|
|||
|
|||
|
Loading…
Reference in new issue