Ulrich Carmesin
3 years ago
5 changed files with 282 additions and 0 deletions
@ -0,0 +1,54 @@ |
|||
import unittest, os |
|||
|
|||
import basic.program |
|||
import utils.path_tool |
|||
import basic.toolHandling |
|||
import test.constants |
|||
import components.component |
|||
import basic.constants as B |
|||
HOME_PATH = test.constants.HOME_PATH |
|||
|
|||
conf = {} |
|||
|
|||
class MyTestCase(unittest.TestCase): |
|||
|
|||
def test_toolhandling(self): |
|||
job = basic.program.Job("unit") |
|||
args = {"application": "TEST", "application": "ENV01", "modus": "unit", "loglevel": "debug", "tool": "config_tool", |
|||
"modus": "unit"} |
|||
job.par.setParameterArgs(args) |
|||
#t = basic.toolHandling.ToolManager() |
|||
comp = components.component.Component() |
|||
comp.name = "testb" |
|||
comp.conf = {} |
|||
comp.conf[B.SUBJECT_ARTS] = {} |
|||
comp.conf[B.SUBJECT_ARTS][B.TOPIC_NODE_DB] = {} |
|||
comp.conf[B.SUBJECT_CONN] = {} |
|||
comp.conf[B.SUBJECT_CONN][B.TOPIC_NODE_DB] = {} |
|||
comp.conf[B.SUBJECT_CONN][B.TOPIC_NODE_DB][B.ATTR_TYPE] = "shive" |
|||
tool = basic.toolHandling.getDbTool(comp) |
|||
self.assertRegex(str(type(tool)), 'dbshive_tool.DbFcts') |
|||
attr = tool.getDbAttributes("xx") |
|||
self.assertRegex(attr[B.ATTR_DB_PARTITION], 'n') |
|||
comp.conf[B.SUBJECT_CONN][B.TOPIC_NODE_DB][B.ATTR_DB_PARTITION] = "y" |
|||
attr = tool.getDbAttributes("xx") |
|||
self.assertRegex(attr[B.ATTR_DB_PARTITION], 'y') |
|||
comp.conf[B.SUBJECT_ARTS][B.TOPIC_NODE_DB][B.ATTR_DB_PARTITION] = "z" |
|||
attr = tool.getDbAttributes("xx") |
|||
self.assertRegex(attr[B.ATTR_DB_PARTITION], 'z') |
|||
# |
|||
sqls = comp.composeSqlClauses("SELECT * FROM lofts") |
|||
print(sqls) |
|||
setattr(job.par, B.PAR_DB_WHERE, "name like !%utz%! and regname = !+reg+!") |
|||
comp.conf[B.SUBJECT_ARTS][B.TOPIC_NODE_DB][B.ATTR_DB_PARTITION] = "+reg+" |
|||
sqls = comp.composeSqlClauses("SELECT * FROM lofts") |
|||
print(sqls) |
|||
setattr(job.par, B.PAR_DB_PARTITION, "BA,FV") |
|||
sqls = comp.composeSqlClauses("SELECT * FROM lofts") |
|||
print(sqls) |
|||
tool.deleteRows("deltable") |
|||
tool.selectRows("deltable") |
|||
|
|||
|
|||
if __name__ == '__main__': |
|||
unittest.main() |
@ -0,0 +1,92 @@ |
|||
#!/usr/bin/python |
|||
# -*- coding: utf-8 -*- |
|||
# --------------------------------------------------------------------------------------------------------- |
|||
# Author : Ulrich Carmesin |
|||
# Source : gitea.ucarmesin.de |
|||
# --------------------------------------------------------------------------------------------------------- |
|||
import basic.program |
|||
import utils.config_tool |
|||
import utils.db_abstract |
|||
import pyspark |
|||
import basic.constants as B |
|||
|
|||
class DbFcts(utils.db_abstract.DbFcts): |
|||
""" |
|||
This interface defines each necessary connection to any kind of database. |
|||
The specific technique how to connect to the concrete DBMS has to be implemented in the specific tool. |
|||
""" |
|||
def __init__(self): |
|||
pass |
|||
|
|||
|
|||
def selectRows(self, table): |
|||
""" method to select rows from a database |
|||
statement written in sql """ |
|||
tdata = {} |
|||
dry = 0 |
|||
# attr = self.getDbAttributes(table) |
|||
job = basic.program.Job.getInstance() |
|||
verify = -1+job.getDebugLevel("db_tool") |
|||
cmd = "SELECT "+"*" #",".join(self.comp.conf[B.DATA_NODE_DDL][table][B.DATA_NODE_HEADER]) |
|||
cmd += " FROM "+table |
|||
sqls = self.comp.composeSqlClauses(cmd) |
|||
data = [] |
|||
for k in sqls.keys(): |
|||
sql = sqls[k] |
|||
if dry == 1: |
|||
spark = self.getConnector() |
|||
df = spark.sql(cmd) |
|||
for r in df: |
|||
data.append(r) |
|||
else: |
|||
print("select "+sql) |
|||
#self.comp.m.logInfo(cmd) |
|||
#tdata[B.DATA_NODE_HEADER] = self.comp.conf[B.DATA_NODE_DDL][table][B.DATA_NODE_HEADER] |
|||
#tdata[B.DATA_NODE_DATA] = data |
|||
return tdata |
|||
|
|||
def deleteRows(self, table): |
|||
""" method to delete rows from a database |
|||
statement written in sql """ |
|||
job = basic.program.Job.getInstance() |
|||
dry = 0 |
|||
verify = -1+job.getDebugLevel("db_tool") |
|||
cmd = "DELETE FROM "+table |
|||
print("deleteRows "+cmd) |
|||
sqls = self.comp.composeSqlClauses(cmd) |
|||
print("deleteRows "+cmd) |
|||
print(sqls) |
|||
for k in sqls.keys(): |
|||
sql = sqls[k] |
|||
if dry == 1: |
|||
#spark = self.getConnector() |
|||
#df = spark.sql(cmd) |
|||
pass |
|||
else: |
|||
print("select "+sql) |
|||
#self.comp.m.logInfo(cmd) |
|||
|
|||
def insertRows(self, table, rows): |
|||
""" method to insert rows into a database |
|||
the rows will be interpreted by the ddl of the component |
|||
""" |
|||
job = basic.program.Job.getInstance() |
|||
verify = -1+job.getDebugLevel("db_tool") |
|||
spark = self.getConnector() |
|||
df = spark.createDataFrame(rows) |
|||
|
|||
self.comp.m.logInfo("cmd") |
|||
|
|||
def getConnector(self): |
|||
""" add-on-method to get the connector |
|||
this method should only called by the class itself """ |
|||
job = basic.program.Job.getInstance() |
|||
spark = pyspark.SparkSession\ |
|||
.builder\ |
|||
.appName("datest")\ |
|||
.getOrCreate() |
|||
return spark |
|||
|
|||
|
|||
|
|||
|
Loading…
Reference in new issue