Compare commits
74 Commits
211 changed files with 14193 additions and 5318 deletions
@ -1,6 +0,0 @@ |
|||
<?xml version="1.0" encoding="UTF-8"?> |
|||
<project version="4"> |
|||
<component name="VcsDirectoryMappings"> |
|||
<mapping directory="$PROJECT_DIR$" vcs="Git" /> |
|||
</component> |
|||
</project> |
|||
@ -0,0 +1,133 @@ |
|||
import os.path |
|||
import re |
|||
|
|||
import basic.component |
|||
import basic.constants as B |
|||
import basic.toolHandling |
|||
import tools.config_tool |
|||
import tools.data_const as D |
|||
import tools.file_tool |
|||
import tools.filecsv_fcts |
|||
import model.table |
|||
import model.factory |
|||
import tools.value_tool |
|||
import tools.data_tool |
|||
|
|||
COMP_NAME = B.ATTR_INST_TESTSERVER |
|||
|
|||
# class Testserver(basic.component.Component): |
|||
class Testserver:
    """
    the Testserver represents the workspace with all resources for the automation

    It is held as a singleton (see getInstance). It copies the db-connection
    attributes from the job-configuration and creates one entity-object per
    subject of B.LIST_SUBJECTS.
    """
    tables = {}        # cache of table-definitions, shared by all instances
    __instance = None  # singleton instance
    __writeDB = True   # if False, createDBTable only logs the SQL instead of executing it

    def __init__(self, job):
        """
        collect all resources into this object
        :param job:
        """
        print('init '+COMP_NAME)
        self.m = job.m
        self.conf = {}
        if B.TOPIC_NODE_DB in job.conf:
            # copy only the known db-attributes from the job-configuration
            self.conf[B.TOPIC_CONN] = {}
            self.conf[B.TOPIC_CONN][B.TOPIC_NODE_DB] = {}
            for attr in B.LIST_DB_ATTR:
                if attr in job.conf[B.TOPIC_NODE_DB]:
                    self.conf[B.TOPIC_CONN][B.TOPIC_NODE_DB][attr] = job.conf[B.TOPIC_NODE_DB][attr]
        # TODO was muss auf dem Testserver initial geladen werden?
        self.model = {}
        Testserver.__instance = self
        for s in B.LIST_SUBJECTS:
            self.model[tools.data_tool.getSingularKeyword(s)] = model.factory.get_entity_object(job, s, {})

    @staticmethod
    def getInstance(job):
        """
        return the singleton Testserver, creating it on first use
        :param job:
        :return: the Testserver instance
        """
        if Testserver.__instance is None:
            return Testserver(job)
        # FIX: the original fell through here and returned None although an
        # instance already existed
        return Testserver.__instance

    def createAdminDBTables(self, job):
        """
        creates the complete data-model in the database. it contains:
        * the model for administration
        * the model of each project:
        * * root-tables - defined in testcases TODO wie allgemein deklariert, special components/config
        * * comp-artifacts - it could contain build-rules for building from testcase-spec
        :param job:
        :return: error-text if no database is configured, otherwise None
        """
        if B.TOPIC_NODE_DB in job.conf:
            self.dbi = basic.toolHandling.getDbTool(job, self, job.conf[B.TOPIC_NODE_DB][B.ATTR_TYPE])
        else:
            return "No DB in job-config"
        # the model for administration
        for m in self.model.keys():
            print("\n==== model " + m)
            self.createDBTable(job, B.ATTR_INST_TESTSERVER, B.ATTR_INST_TESTSERVER, m)
            enty = self.model[m]
            for t in enty.getSubtableNames():
                print("subtable "+t)
                self.createDBTable(job, B.ATTR_INST_TESTSERVER, B.ATTR_INST_TESTSERVER, t)

    def createProjectDBTables(self, job):
        """
        creates the complete data-model in the database. it contains:
        * the model for administration
        * the model of each project:
        * * root-tables - defined in testcases TODO wie allgemein deklariert, special components/config
        * * comp-artifacts - it could contain build-rules for building from testcase-spec
        :param job:
        :return: error-text if no database is configured, otherwise None
        """
        # FIX: guard the db-configuration like createAdminDBTables does; the
        # original used self.dbi without ensuring it was ever created and
        # would fail with an AttributeError when called first
        if not hasattr(self, "dbi"):
            if B.TOPIC_NODE_DB in job.conf:
                self.dbi = basic.toolHandling.getDbTool(job, self, job.conf[B.TOPIC_NODE_DB][B.ATTR_TYPE])
            else:
                return "No DB in job-config"
        tables = {}
        path = os.path.join(job.conf[B.TOPIC_PATH][B.ATTR_PATH_PROGRAM], "model")
        fct = basic.toolHandling.getFileTool(job, None, "csv")
        for m in sorted(os.listdir(path)):
            if not re.match(r".*?\.csv", m):
                print("sonstig "+m)
                continue
            print("model "+m)
            modelPath = os.path.join(path, m)
            modelDoc = fct.load_file(modelPath, D.CSV_SPECTYPE_DATA)
            # m[:-4] strips the ".csv" suffix to get the table-name
            table = model.table.Table(job, project="", application="", component=COMP_NAME, name=m[:-4])
            sql = table.get_schema(tableName=m[:-4], tableObject=table)  # [B.DATA_NODE_TABLES][m[:-4]]
            job.m.logInfo(sql)
            tables[m[:-4]] = modelDoc
            for s in sql.split(";\n"):
                if len(s) < 3:
                    continue
                try:
                    self.dbi.execStatement(s+";", job.conf[B.TOPIC_NODE_DB])
                    print("SQL executed: "+s)
                except Exception as e:
                    # FIX: chain the original cause instead of discarding it
                    raise Exception("Fehler bei createSchema "+s) from e

    def createDBTable(self, job, project, context, tablename):
        """
        creates a table in the database
        :param job:
        :param project: project the table belongs to
        :param context: context of the table (e.g. the testserver instance)
        :param tablename: name of the table to create
        :return:
        """
        args = {}
        args["context"] = context
        table = model.table.Table(job)
        table = table.read_entity(job, tablename, args=args)
        sql = table.get_schema(tablename, model.table.TYPE_ADMIN)  # [B.DATA_NODE_TABLES][m[:-4]]
        job.m.logInfo(sql)
        for s in sql.split(";\n"):
            if len(s) < 3:
                continue
            try:
                if self.__writeDB:
                    self.dbi.execStatement(s + ";", job.conf[B.TOPIC_NODE_DB])
                    print("SQL executed: " + s)
            except Exception as e:
                # FIX: chain the original cause instead of discarding it
                raise Exception("Fehler bei createSchema " + s) from e
|||
@ -0,0 +1,48 @@ |
|||
#!/usr/bin/python |
|||
# -*- coding: utf-8 -*- |
|||
# --------------------------------------------------------------------------------------------------------- |
|||
# Author : Ulrich Carmesin |
|||
# Source : gitea.ucarmesin.de |
|||
# --------------------------------------------------------------------------------------------------------- |
|||
import basic.toolHandling |
|||
import tools.data_const as D |
|||
import basic.constants as B |
|||
import model.entity |
|||
|
|||
class Connection(model.entity.Entity):
    # default attribute values of the entity
    name = ""
    description = ""
    application = ""
    usecase = []
    story = []
    tables = {}
    steps = []

    def __init__(self, job):
        """
        to be initialized by readSpec
        :param job: the job-context this entity belongs to
        """
        self.job = job

    def get_schema(self, table=""):
        """
        build and return the DDL (create-table plus indexes) for the
        connection-table of the configured database-type.
        :param table: unused, kept for interface-compatibility
        :return: the DDL as one string
        """
        dbtype = self.job.conf[B.TOPIC_NODE_DB][B.ATTR_TYPE]
        dbi = basic.toolHandling.getDbTool(self.job, None, dbtype)
        print(str(dbi))
        # column-name / column-type pairs of the main table
        columns = [
            ("environment", D.TYPE_STR),
            ("component", D.TYPE_STR),
            ("type", D.TYPE_STR),
            ("ip", D.TYPE_STR),
            ("port", D.TYPE_INT),
            ("hostname", D.TYPE_STR),
            ("dompath", D.TYPE_STR),
            (B.NODE_ATTRIBUTES, D.TYPE_TEXT),
        ]
        sql = dbi.getCreateTable("connection")
        sql += dbi.getSchemaAttribut("cnid", "id") + ","
        for colname, coltype in columns:
            sql += dbi.getSchemaAttribut(colname, coltype) + ","
        sql += self.getHistoryFields()
        sql += ");\n"
        sql += dbi.getSchemaIndex("connection", "environment") + "\n"
        sql += self.getHistoryIndex("connection")
        return sql
        #dbi.execStatement(sql)
|||
@ -1,109 +0,0 @@ |
|||
#!/usr/bin/python |
|||
# -*- coding: utf-8 -*- |
|||
# --------------------------------------------------------------------------------------------------------- |
|||
# Author : Ulrich Carmesin |
|||
# Source : gitea.ucarmesin.de |
|||
# --------------------------------------------------------------------------------------------------------- |
|||
""" |
|||
this module implements the functionality of a test-step |
|||
which is defined in the test-specification |
|||
and is executed by any executer |
|||
there are 2 kinds of test-step |
|||
a) execute specific component in the job |
|||
b) execute specific test-entity in the test-suite-execution |
|||
""" |
|||
import basic.constants as B |
|||
import utils.data_const as D |
|||
import utils.i18n_tool |
|||
|
|||
LIST_ARGS = [ |
|||
"start", # for starting the specified main-program |
|||
"fct" # for calling the specified component-function |
|||
] |
|||
|
|||
class Step:
    """
    the class contains each attribute of a test-step
    """
    comp = ""
    refLine = ""   # in a: references the data-line(s) to be executed
    execStep = ""  # in a,b: executes only if the step is set in the job
    args = {}

    def __init__(self):
        # instance attributes shadow the class-level defaults so that
        # each Step owns its own args-dictionary
        self.comp = ""
        self.refLine = ""
        self.execStep = ""
        self.args = {}

    def getStepText(self, job):
        """serialize this step into one csv-line (terminated by newline)"""
        text = D.CSV_DELIMITER.join([self.comp, str(self.execStep), self.refLine])
        for key, val in self.args.items():
            text += D.CSV_DELIMITER + key + ":" + val
        return text + "\n"
|||
|
|||
def parseOldStep(job, fields):
    """
    parse one csv-line into a step-dictionary (old dict-based format)
    :param job:
    :param fields: list of csv-fields of the step-line
    :return: dict with component, exec-ref, data-ref and the parsed args
    :raises Exception: if an argument-field is not in key:value form
    """
    step = {}
    step[B.DATA_NODE_COMP] = fields[D.STEP_COMP_I]
    step[B.ATTR_EXEC_REF] = fields[D.STEP_EXECNR_I]
    step[B.ATTR_DATA_REF] = fields[D.STEP_REFNR_I]
    step[B.ATTR_STEP_ARGS] = {}
    if D.STEP_ARGS_I == D.STEP_LIST_I:
        # args are spread over the remaining fields; skip empty and
        # comment ("#"-prefixed) fields
        args = ""
        for i in range(D.STEP_ARGS_I, len(fields)):
            if len(fields[i]) < 1:
                continue
            if fields[i][0:1] == "#":
                continue
            args += "," + fields[i]
        args = args[1:]
    else:
        args = fields[D.STEP_ARGS_I]
    a = args.split(",")
    for arg in a:
        print("arg " + arg)
        b = arg.split(":")
        if len(b) < 2:
            # FIX: the original referenced the undefined name "l" here
            # (NameError); report the whole field-list like parseStep does
            raise Exception(D.EXCP_MALFORMAT + "" + str(fields))
        step[B.ATTR_STEP_ARGS][b[0]] = b[1]
    # data[B.DATA_NODE_STEPS].append(step)
    return step
|||
|
|||
def parseStep(job, fields):
    """
    parse one csv-line into a Step object
    :param job:
    :param fields: list of csv-fields of the step-line
    :return: the filled Step object
    :raises Exception: if an argument-field is not in key:value form
    """
    step = Step()
    step.comp = fields[D.STEP_COMP_I]
    step.execStep = fields[D.STEP_EXECNR_I]
    step.refLine = fields[D.STEP_REFNR_I]
    setattr(step, B.ATTR_DATA_REF, step.refLine)
    if D.STEP_ARGS_I == D.STEP_LIST_I:
        # args are spread over the remaining fields; skip empty and
        # comment ("#"-prefixed) fields
        relevant = [f for f in fields[D.STEP_ARGS_I:] if len(f) >= 1 and f[0:1] != "#"]
        args = ",".join(relevant)
    else:
        args = fields[D.STEP_ARGS_I]
    for arg in args.split(","):
        print("arg " + arg)
        pair = arg.split(":")
        if len(pair) < 2:
            raise Exception(D.EXCP_MALFORMAT + "" + str(fields))
        step.args[pair[0]] = pair[1]
        # well-known args are additionally promoted to attributes
        if pair[0] in LIST_ARGS:
            setattr(step, pair[0], pair[1])
    # data[B.DATA_NODE_STEPS].append(step)
    return step
|||
|
|||
def getStepHeader(job):
    """compose the translated csv-header-line for the step-block"""
    i18n = utils.i18n_tool.I18n.getInstance(job)
    text = "# "
    text += i18n.getText(f"{D.CSV_BLOCK_STEP=}", job)
    for key in [f"{D.STEP_ATTR_COMP=}", f"{D.STEP_ATTR_EXECNR=}",
                f"{D.STEP_ATTR_REFNR=}", f"{D.STEP_ATTR_ARGS=}"]:
        text += ";" + i18n.getText(key, job)
    return text + ";..;;;\n"
|||
|
|||
|
|||
@ -0,0 +1,52 @@ |
|||
#!/usr/bin/python |
|||
# -*- coding: utf-8 -*- |
|||
# --------------------------------------------------------------------------------------------------------- |
|||
# Author : Ulrich Carmesin |
|||
# Source : gitea.ucarmesin.de |
|||
# --------------------------------------------------------------------------------------------------------- |
|||
import basic.toolHandling |
|||
import utils.data_const as D |
|||
import basic.constants as B |
|||
import model.entity |
|||
|
|||
class Testexecution(model.entity.Entity):
    # default attribute values of the entity
    name = ""
    description = ""  # from testplan, testsuite, testcase
    release = ""
    path = ""
    level = ""  # testplan, testsuite, testcase
    entities = {}

    def __init__(self, job):
        """
        to be initialized by readSpec
        :param job: the job-context this entity belongs to
        """
        self.job = job

    def get_schema(self):
        """
        build and return the DDL for the testexecution-table, its indexes
        and its subtables for the configured database-type.
        :return: the DDL as one string
        """
        dbtype = self.job.conf[B.TOPIC_NODE_DB][B.ATTR_TYPE]
        dbi = basic.toolHandling.getDbTool(self.job, None, dbtype)
        # column-name / column-type pairs of the main table
        columns = [
            ("name", D.TYPE_STR),
            (B.SUBJECT_REFERENCE, D.TYPE_TEXT),
            ("prelease", D.TYPE_STR),
            ("type", D.TYPE_STR),
            ("entity", D.TYPE_STR),
            ("path", D.TYPE_STRING),
            ("starttime", D.TYPE_TIME),
            ("finishtime", D.TYPE_TIME),
            (B.NODE_ATTRIBUTES, D.TYPE_TEXT),
        ]
        sql = dbi.getCreateTable("testexecution")
        sql += dbi.getSchemaAttribut("teid", "id") + ","
        for colname, coltype in columns:
            sql += dbi.getSchemaAttribut(colname, coltype) + ","
        sql += self.getHistoryFields()
        sql += ");\n"
        sql += dbi.getSchemaIndex("testexecution", "release") + "\n"
        # NOTE(review): "testplan" looks like a copy-paste from the testplan
        # entity -- verify whether this should be "testexecution"
        sql += self.getHistoryIndex("testplan")
        for attr in ["entity"]:
            sql += dbi.getSchemaSubtable("te", [{"attr": attr, "atype": D.TYPE_STR},
                                                {"attr": "type", "atype": D.TYPE_STR},
                                                {"attr": "path", "atype": D.TYPE_STRING},
                                                {"attr": B.NODE_ATTRIBUTES, "atype": D.TYPE_TEXT}]) + "\n"
            sql += dbi.getSchemaIndex(dbi.getSubTableName("te", attr),
                                      dbi.getSubTableId(dbi.getSubTableName("te", attr), attr)) + "\n"
        return sql
|||
@ -0,0 +1,69 @@ |
|||
Durch den Test soll die Qualität der Anwendung systematisch (Testabdeckung) nachgewiesen |
|||
und (mittels Regressionen) langfristig sichergestellt werden. |
|||
|
|||
|
|||
Rollen - sind jetzt beschrieben, ok |
|||
|
|||
Testobjekte |
|||
|
|||
logischer Testfall |
|||
Testfall in Managementsicht, aus den User-Story-Akzeptanzkriterien abgeleitet |
|||
Diese werden auf allen Ebenen reportet, insb. deren Status. |
|||
|
|||
physischer Testfall |
|||
Testfall in Spezifikations- und Ausführungssicht |
|||
konkreter auszuführender Testfall, die einzelnen Testschritte müssen spezifiziert/konfiguriert und protokolliert werden. |
|||
Arten physischer Testfälle: |
|||
* automatisierter Testfall |
|||
Alle Einzelschritte werden entsprechend des Automatisierungsframeworks im git-Repo spezifiziert. Entsprechend der Spezifikation wird der Testfall ausgeführt. |
|||
* manueller Testfall |
|||
Alle Einzelschritte werden detailliert (in Jira-..) spezifiziert. Entsprechend der Spezifikation wird der Testfall ausgeführt. |
|||
* explorativer Testfall |
|||
Die wesentlichen Schritt-Sequenzen werden detailliert (in Jira-..) spezifiziert. Von der Spezifikation kann und soll bei der Durchführung variiert werden. Die wesentlichen Schritte werden protokolliert. |
|||
|
|||
|
|||
|
|||
|
|||
Test im Software-Prozess |
|||
|
|||
@pre: Komponenten-/Unittests durch Entwickler:innen |
|||
|
|||
Q1: Lieferung entgegennehmen |
|||
* Entschlüsseln |
|||
* Programm: Artefakte verifizieren mittels md5-Hash |
|||
* Lieferung in git-Repo pushen |
|||
<< Exit wenn Lieferung nicht gelesen werden kann |
|||
|
|||
Q2: Statischer Test |
|||
--> NFA Wartbarkeit |
|||
* Vollständigkeit prüfen >>> gitVerteiler |
|||
* Code-Analyse >>> SonarQube |
|||
<< Exit bei Fehlen wesentlicher Programme (auch Fehlbenennungen gelten als Fehlen!) |
|||
<< Warnung bei Unvollständigkeit |
|||
|
|||
Q3: Installierbarkeit |
|||
--> NFA Installierbarkeit, Portierbarkeit |
|||
* Kubernetes-Container >>> JEE-Plattform? |
|||
* DB-Scripte auf Hive ausfuehren ? |
|||
* Cloudera-1-Rechner-Maschine >>> Linux-Maschine |
|||
* DevOps-Installation ** Testfälle hierzu beschreiben! |
|||
<< Exit bei Nicht-Installierbarkeit |
|||
|
|||
Q4: System-/Smoketest |
|||
* Bedienelemente |
|||
* dynamischer Smoketest (minimale Testfälle, v.a. Gutfälle) |
|||
* minimaler GUI-Test >>> |
|||
<< Exit bei technischen Blockern |
|||
|
|||
Q5: Regressions-/Progressionstest |
|||
--> Funktionaler Test, Sicherheitstest |
|||
* funktionale Regression (umfassende Testfälle, vollständige Äquivalenzklassen) |
|||
* erweiterte Testfälle zu neuen Funktionen |
|||
* Sichtbarkeit, Sperrkonstellationen >>> |
|||
<< Exit bei groben Fehlfunktionen |
|||
|
|||
Q6: Nutzbarkeit |
|||
--> NFA Usability, Performance, Last |
|||
* manuelle Benutzbarkeit, edu@ETU |
|||
<< Exit wenn die Nutzung unbrauchbar ist |
|||
<< Warnungen |
|||
@ -0,0 +1,321 @@ |
|||
application: |
|||
_header: |
|||
- _field |
|||
- type |
|||
- format |
|||
- index |
|||
_fields: |
|||
- apid |
|||
- name |
|||
- description |
|||
- reference |
|||
- attributes |
|||
- inscommit |
|||
- insauthor |
|||
- instime |
|||
- updcommit |
|||
- updauthor |
|||
- updtime |
|||
- actual |
|||
apid: |
|||
_field: apid |
|||
type: pk |
|||
name: |
|||
_field: name |
|||
type: str |
|||
index: I |
|||
description: |
|||
_field: description |
|||
type: string |
|||
reference: |
|||
_field: reference |
|||
type: str |
|||
attributes: |
|||
_field: attributes |
|||
type: string |
|||
insauthor: |
|||
_field: insauthor |
|||
type: str |
|||
inscommit: |
|||
_field: inscommit |
|||
type: str |
|||
instime: |
|||
_field: instime |
|||
type: time |
|||
updauthor: |
|||
_field: updauthor |
|||
type: str |
|||
updcommit: |
|||
_field: updcommit |
|||
type: str |
|||
updtime: |
|||
_field: updtime |
|||
type: time |
|||
actual: |
|||
_field: actual |
|||
type: int |
|||
index: I |
|||
ap_component: |
|||
_header: |
|||
- _field |
|||
- type |
|||
- format |
|||
- index |
|||
_fields: |
|||
- apcomid |
|||
- apid |
|||
- component |
|||
apcomid: |
|||
_field: apcomid |
|||
type: pk |
|||
apid: |
|||
_field: apid |
|||
type: int |
|||
index: I |
|||
component: |
|||
_field: component |
|||
type: str |
|||
index: I |
|||
ap_application: |
|||
_header: |
|||
- _field |
|||
- type |
|||
- format |
|||
- index |
|||
_fields: |
|||
- apappid |
|||
- apid |
|||
- application |
|||
apappid: |
|||
_field: apappid |
|||
type: pk |
|||
apid: |
|||
_field: apid |
|||
type: int |
|||
index: I |
|||
application: |
|||
_field: application |
|||
type: str |
|||
index: I |
|||
ap_project: |
|||
_header: |
|||
- _field |
|||
- type |
|||
- format |
|||
- index |
|||
_fields: |
|||
- approid |
|||
- apid |
|||
- project |
|||
- description |
|||
- reference |
|||
approid: |
|||
_field: approid |
|||
type: pk |
|||
apid: |
|||
_field: apid |
|||
type: int |
|||
index: I |
|||
project: |
|||
_field: project |
|||
type: str |
|||
index: I |
|||
description: |
|||
_field: description |
|||
type: string |
|||
reference: |
|||
_field: reference |
|||
type: str |
|||
environment: |
|||
_header: |
|||
- _field |
|||
- type |
|||
- format |
|||
- index |
|||
_fields: |
|||
- enid |
|||
- name |
|||
- description |
|||
- reference |
|||
- attributes |
|||
- inscommit |
|||
- insauthor |
|||
- instime |
|||
- updcommit |
|||
- updauthor |
|||
- updtime |
|||
- actual |
|||
enid: |
|||
_field: enid |
|||
type: pk |
|||
name: |
|||
_field: name |
|||
type: str |
|||
index: I |
|||
description: |
|||
_field: description |
|||
type: string |
|||
reference: |
|||
_field: reference |
|||
type: str |
|||
attributes: |
|||
_field: attributes |
|||
type: string |
|||
insauthor: |
|||
_field: insauthor |
|||
type: str |
|||
inscommit: |
|||
_field: inscommit |
|||
type: str |
|||
instime: |
|||
_field: instime |
|||
type: time |
|||
updauthor: |
|||
_field: updauthor |
|||
type: str |
|||
updcommit: |
|||
_field: updcommit |
|||
type: str |
|||
updtime: |
|||
_field: updtime |
|||
type: time |
|||
actual: |
|||
_field: actual |
|||
type: int |
|||
index: I |
|||
en_project: |
|||
_header: |
|||
- _field |
|||
- type |
|||
- format |
|||
- index |
|||
_fields: |
|||
- enproid |
|||
- enid |
|||
- project |
|||
enproid: |
|||
_field: enproid |
|||
type: pk |
|||
enid: |
|||
_field: enid |
|||
type: int |
|||
index: I |
|||
project: |
|||
_field: project |
|||
type: str |
|||
index: I |
|||
en_component: |
|||
_header: |
|||
- _field |
|||
- type |
|||
- format |
|||
- index |
|||
_fields: |
|||
- encomid |
|||
- enid |
|||
- component |
|||
- instance |
|||
- type |
|||
- ip |
|||
- port |
|||
- hostname |
|||
- dompath |
|||
- user |
|||
- password |
|||
- attributes |
|||
encomid: |
|||
_field: encomid |
|||
type: pk |
|||
enid: |
|||
_field: enid |
|||
index: I |
|||
type: int |
|||
component: |
|||
_field: component |
|||
index: I |
|||
type: str |
|||
instance: |
|||
_field: instance |
|||
type: int |
|||
type: |
|||
_field: type |
|||
type: str |
|||
ip: |
|||
_field: ip |
|||
type: str |
|||
port: |
|||
_field: port |
|||
type: str |
|||
hostname: |
|||
_field: hostname |
|||
type: str |
|||
dompath: |
|||
_field: dompath |
|||
type: str |
|||
user: |
|||
_field: user |
|||
type: str |
|||
password: |
|||
_field: password |
|||
type: str |
|||
attributes: |
|||
_field: attributes |
|||
type: string |
|||
component: |
|||
_header: |
|||
- _field |
|||
- type |
|||
- format |
|||
- index |
|||
_fields: |
|||
- coid |
|||
- name |
|||
- description |
|||
- reference |
|||
- attributes |
|||
- inscommit |
|||
- insauthor |
|||
- instime |
|||
- updcommit |
|||
- updauthor |
|||
- updtime |
|||
- actual |
|||
coid: |
|||
_field: coid |
|||
type: pk |
|||
name: |
|||
_field: name |
|||
type: str |
|||
index: I |
|||
description: |
|||
_field: description |
|||
type: string |
|||
reference: |
|||
_field: reference |
|||
type: str |
|||
attributes: |
|||
_field: attributes |
|||
type: string |
|||
insauthor: |
|||
_field: insauthor |
|||
type: str |
|||
inscommit: |
|||
_field: inscommit |
|||
type: str |
|||
instime: |
|||
_field: instime |
|||
type: time |
|||
updauthor: |
|||
_field: updauthor |
|||
type: str |
|||
updcommit: |
|||
_field: updcommit |
|||
type: str |
|||
updtime: |
|||
_field: updtime |
|||
type: time |
|||
actual: |
|||
_field: actual |
|||
type: int |
|||
index: I |
|||
|
|||
|
@ -0,0 +1,94 @@ |
|||
# This is a sample Python script. |
|||
import os |
|||
import traceback |
|||
import sys |
|||
import yaml |
|||
import basic.program |
|||
import basic.constants as B |
|||
import basic.message |
|||
import tools.path_const as P |
|||
import tools.config_tool as config_tool |
|||
import tools.file_tool as file_tool |
|||
import model.entity |
|||
import model.factory |
|||
#import model.table |
|||
|
|||
PROGRAM_NAME = "check_configuration" |
|||
|
|||
def startPyJob(job):
    """run the configuration-check for every component named in job.par.component"""
    try:
        job.m.logDebug("--- start " + PROGRAM_NAME + " ------>>>>")
        for comp in job.par.component.split(","):
            job.m.logInfo("------------------------------------------\ncheck component " + comp)
            checkComponent(job, comp)
        job.m.setMsg("Job " + PROGRAM_NAME + " fertig")
        job.m.logDebug("<<<<<<<<----- " + PROGRAM_NAME + " ------")
    except Exception as e:
        # any error is recorded as fatal together with the traceback
        sep = "+++++++++++++++++++++++++++++++++++++++++++++"
        job.m.logDebug(sep)
        job.m.setFatal(str(e))
        job.m.logDebug(sep)
        job.m.logDebug("execpt " + traceback.format_exc())
        job.m.logDebug(sep)
|||
|
|||
def checkComponent(job, componentName):
    """
    checks the configurations of the component
    :param job:
    :param componentName:
    :return:
    """
    import model.component
    configPath = config_tool.getExistingPath(job, [os.path.join(job.conf[B.TOPIC_PATH][B.ATTR_PATH_COMPS], componentName, "CONFIG")])
    configTree = file_tool.read_file_dict(job, configPath, job.m)
    # 1. the root "conf" node and each subject must exist
    for x in model.component.LIST_CP_SUBJECTS:
        if "conf" not in configTree:
            job.m.setError(componentName + ": root conf is not set: ")
            break
        if x not in configTree["conf"]:
            job.m.setError(componentName + ": subject is not set: " + x)
        else:
            for c in configTree["conf"][x]:
                if c == "none":
                    # "none" is a placeholder and must stand alone
                    if len(configTree["conf"][x]) != 1:
                        job.m.setWarn("none is not the only subject in " + x)
                    continue
    comps = model.component.select_components(job, None, None)
    job.m.logInfo("Komponenten pruefen")
    for c in configTree["conf"][model.component.CP_SUBJECT_COMPS]:
        if c in ["none"]:
            continue
        if c not in comps:
            job.m.setError(componentName + ": component " + c + " does not exist")
        job.m.logInfo("- " + componentName + " uses component " + c)
    job.m.logInfo("Steps pruefen")
    for v in configTree["conf"][model.component.CP_SUBJECT_STEPS]:
        if v == "none":
            continue
        job.m.logInfo("- " + componentName + " uses variant " + v)
    job.m.logInfo("Tabellen pruefen")
    # NOTE(review): only model.component is imported in this function and the
    # module-level import of model.table is commented out -- verify that
    # model.table is resolvable here
    tables = model.table.select_tables(job, None, None)
    for t in configTree["conf"][model.component.CP_SUBJECT_TABLES]:
        if t == "none":
            continue
        if t in tables:
            job.m.logInfo("- " + componentName + " uses table " + t)
        else:
            job.m.setError(componentName + ": table " + t + " ist not defined.")
    job.m.logInfo("Artefakte pruefen")
    for a in configTree["conf"][model.component.CP_SUBJECT_ARTS]:
        # FIX: the original tested the stale loop-variable "t" instead of "a",
        # so artifact "none" entries were never skipped
        if a == "none":
            continue
        job.m.logInfo("- " + componentName + " uses artifact " + a)
|||
|
|||
|
|||
if __name__ == '__main__':
    # build and start the job-context for this program
    job = basic.program.Job(PROGRAM_NAME)
    print ("job "+str(job.__dict__))
    job.startJob()
    # NOTE(review): unlike check_specification, a fatal return-code only stops
    # the job here but does not exit the process -- verify this is intended
    if job.m.isRc("fatal"):
        job.stopJob()
    # now in theory the program is runnable
    startPyJob(job)
    job.stopJob()
# See PyCharm help at https://www.jetbrains.com/help/pycharm/
|||
@ -0,0 +1,60 @@ |
|||
# This is a sample Python script. |
|||
import sys# |
|||
# import jsonpickle # pip install jsonpickle |
|||
import yaml # pip install pyyaml |
|||
import basic.program |
|||
import basic.componentHandling |
|||
import basic.message |
|||
#import utils.tdata_tool |
|||
import traceback |
|||
|
|||
PROGRAM_NAME = "check_specification" |
|||
|
|||
def startPyJob(job):
    """check the specification given by job.par (testcase, testsuite or testplan)"""
    try:
        job.m.logDebug("--- start " + PROGRAM_NAME + " ------>>>>")
        job.m.setMsg("Job " + PROGRAM_NAME + " fertig")
        # exactly one specification-level is checked, in this priority order
        for level in ["testcase", "testsuite", "testplan"]:
            if hasattr(job.par, level):
                print("Check " + level + " " + getattr(job.par, level))
                break
        job.m.logDebug("<<<<<<<<----- " + PROGRAM_NAME + " ------")
    except Exception as e:
        # any error is recorded as fatal together with the traceback
        sep = "+++++++++++++++++++++++++++++++++++++++++++++"
        job.m.logDebug(sep)
        job.m.setFatal(str(e))
        job.m.logDebug(sep)
        job.m.logDebug("execpt " + traceback.format_exc())
        job.m.logDebug(sep)
|||
|
|||
def checkHead(job):
    """placeholder for checking the specification head -- not implemented yet"""
    pass
|||
|
|||
if __name__ == '__main__':
    # build and start the job-context for this program
    x = basic.program.Job(PROGRAM_NAME)
    print ("x "+str(x))
    x.startJob()
    x.m.logDebug(str(vars(x.par)) + "\n" + str(vars(x.conf)))
    # a fatal return-code aborts the program before any check runs
    if x.m.isRc("fatal"):
        x.stopJob()
        exit(x.m.rc * (-1) + 3)
    # now in theory the program is runnable
    x.m.setMsg("# job initialized")
    # check the instance of every component relevant for this program
    cm = basic.componentHandling.ComponentManager.getInstance(x)
    print("cm "+str(cm))
    cm.initComponents()
    comps = cm.getComponents(x, PROGRAM_NAME)
    x.m.setMsg("# Components initialized with these relevant components " + str(comps))
    for c in comps:
        comp = cm.getComponent(c)
        print(str(comp))
        comp.check_Instance()
        # merge the component's messages into the job-message
        x.m.merge(comp.m)
        comp.confs["function"][PROGRAM_NAME] = comp.m.topmessage

    x.stopJob()
# See PyCharm help at https://www.jetbrains.com/help/pycharm/
|||
@ -0,0 +1,115 @@ |
|||
#!/usr/bin/python |
|||
# -*- coding: utf-8 -*- |
|||
# --------------------------------------------------------------------------------------------------------- |
|||
# Author : Ulrich Carmesin |
|||
# Source : gitea.ucarmesin.de |
|||
# --------------------------------------------------------------------------------------------------------- |
|||
""" |
|||
program to clean the workspace : |
|||
* remove old debug-files |
|||
""" |
|||
import os |
|||
import re |
|||
import shutil |
|||
import sys |
|||
import traceback |
|||
import basic.program |
|||
import basic.constants as B |
|||
import tools.date_tool as date_tool |
|||
import tools.path_tool as path_tool |
|||
import tools.job_tool as job_tool |
|||
|
|||
LIMIT_DEBUG_FILES = -7 |
|||
PROGRAM_NAME = "clean_workspace" |
|||
|
|||
def startPyJob(job):
    """clean the workspace: remove old debug-files, then clean and archive log-files"""
    try:
        job.m.logDebug("--- start " + PROGRAM_NAME + " ------>>>>")
        removeDebugFiles(job)   # remove debug-files
        cleanLogFiles(job)      # clean and archive log-files
        job.m.setMsg("Job " + PROGRAM_NAME + " fertig")
        job.m.logDebug("<<<<<<<<----- " + PROGRAM_NAME + " ------")
    except Exception as e:
        # any error is recorded as fatal together with the traceback
        sep = "+++++++++++++++++++++++++++++++++++++++++++++"
        job.m.logDebug(sep)
        job.m.setFatal(str(e))
        job.m.logDebug(sep)
        job.m.logDebug("execpt " + traceback.format_exc())
        job.m.logDebug(sep)
|||
|
|||
def removeDebugFiles(job):
    """
    to remove debug-files in any relevant folder
    :param job:
    :return:
    """
    job.m.logInfo("# # remove log-files # # #")
    # files older than LIMIT_DEBUG_FILES days (date-part YYYYMMDD) are removed
    limit = date_tool.getActdate(date_tool.F_LOG, LIMIT_DEBUG_FILES)[0:8]
    for folder in [job.conf[B.TOPIC_PATH][B.ATTR_PATH_DEBUG],
                   os.path.join(B.HOME_PATH, "temp")]:
        cleanFolder(job, folder, limit)
|||
|
|||
def cleanFolder(job, path, limit):
    """
    remove all files in the folder with a log-date older than limit;
    other dated files of past years are moved into a year-subfolder archive
    :param job:
    :param path: folder to clean
    :param limit: date-string YYYYMMDD; dated files older than this are affected
    :return:
    """
    # FIX: compile the pattern once and match a single time; the original ran
    # re.match and then re.search on every file for the same pattern
    date_pattern = re.compile(r"(.*)_(\d{8})_\d{6}")
    cntRm = 0
    cntMv = 0
    cntAll = 0
    for f in os.listdir(path):
        cntAll += 1
        res = date_pattern.match(f)
        if res is None:
            continue
        fileType = str(res.group(1))
        fileDate = str(res.group(2))
        if fileType in ["debug", "log", "start_dialog"]:
            # transient files: delete everything older than the limit
            if fileDate >= limit:
                continue
            job.m.logInfo("remove " + os.path.join(path, f))
            os.remove(os.path.join(path, f))
            cntRm += 1
        else:
            # other dated files: archive files of past years into a year-folder
            fileYear = fileDate[0:4]
            actYear = date_tool.getActdate(date_tool.F_LOG)[0:4]
            archivPath = os.path.join(path, fileYear)
            if fileYear < actYear:
                if not os.path.exists(archivPath):
                    os.mkdir(archivPath)
                if not os.path.isdir(archivPath):
                    raise Exception("archiv-folder is not a directory: " + archivPath)
                shutil.move(os.path.join(path, f), os.path.join(archivPath, f))
                cntMv += 1
    job.m.setMsg(str(cntRm) + " / " + str(cntAll) + " files removed in " + path)
    job.m.setMsg(str(cntMv) + " / " + str(cntAll) + " files moved from " + path)
|||
|
|||
def cleanLogFiles(job):
    """
    searches all log-folder in test-documents and remove the oldest log-files except the newest
    :param job:
    :return:
    """
    job.m.logInfo("# # clean log-files # # #")
    limit = date_tool.getActdate(date_tool.F_LOG, LIMIT_DEBUG_FILES)[0:8]
    cleanFolder(job, path_tool.compose_path(job, "{job.par.wsdir}/{log}", None), limit)
    # clean the log-folder of every environment; job.par.environment is set
    # temporarily for compose_path and restored afterwards
    savedEnv = ""
    if hasattr(job.par, "environment"):
        savedEnv = getattr(job.par, "environment")
    for env in job_tool.select_environment(job, "", "ALL"):
        setattr(job.par, "environment", env)
        cleanFolder(job, path_tool.compose_path(job, "{envlog}", None), limit)
    setattr(job.par, "environment", savedEnv)
|||
|
|||
if __name__ == '__main__':
    # build the job-context and run the workspace-cleaning
    job = basic.program.Job(PROGRAM_NAME)
    startPyJob(job)
|||
@ -0,0 +1,56 @@ |
|||
# program to copy dummy-file as testcase-results |
|||
# ------------------------------------------------------------------------------------------------------------- |
|||
""" |
|||
|
|||
""" |
|||
import os |
|||
import shutil |
|||
import basic.program |
|||
import utils.path_tool |
|||
import utils.file_tool |
|||
import basic.constants as B |
|||
import utils.tdata_tool |
|||
import basic.componentHandling |
|||
import utils.path_const as P |
|||
import basic.message as message |
|||
|
|||
|
|||
PROGRAM_NAME = "copy_appdummy" |
|||
PROGRAM_DUMMY = "collect_testcase" |
|||
|
|||
def startPyJob(job):
    """
    Copy prepared dummy testdata files into the environment's application
    folders so that subsequent collect/compare steps find "results".

    For each component relevant to the collect program, the csv files of the
    pre/post testdata-exec folders are copied into the matching pre/post
    condition folders of the environment.

    :param job: current job context
    :return: None
    """
    cm = basic.componentHandling.ComponentManager.getInstance(job)
    cm.initComponents()
    comps = cm.getComponents(PROGRAM_DUMMY)
    job.m.setMsg("# Components initialized with these relevant components " + str(comps))
    # NOTE(review): the manager is fetched and initialized a second time with
    # mode "init"; looks redundant with the block above -- confirm before removing.
    cm = basic.componentHandling.ComponentManager.getInstance(job, "init")
    print("cm " + str(cm))
    cm.initComponents()
    comps = cm.getComponents(PROGRAM_DUMMY)
    for c in comps:
        comp = cm.getComponent(c)
        for cond in ["pre", "post"]:
            tdatapath = utils.path_tool.composePattern(job, "{td"+cond+"exec}", comp)
            envapppath = utils.path_tool.composePattern(job, "{tc"+cond+"cond}", comp)
            if os.path.exists(tdatapath):
                # raw string so "\." reaches the regex engine unchanged
                files = utils.file_tool.getFiles(job.m, job, tdatapath, r".+\.csv", None)
                for f in files:
                    print("cp " + os.path.join(tdatapath, f) + " " + os.path.join(envapppath, f))
                    utils.file_tool.mkPaths(job, os.path.join(envapppath, f), job.m)
                    shutil.copy(os.path.join(tdatapath, f), os.path.join(envapppath, f))
            print(tdatapath)
|||
|
|||
# Press the green button in the gutter to run the script.
if __name__ == '__main__':
    print(PROGRAM_NAME)
    # create and start the job, abort early on fatal return codes
    job = basic.program.Job(PROGRAM_NAME)
    job.startJob()
    job.m.logDebug(str(vars(job.par)) + "\n" + str(vars(job.conf)))
    if job.m.isRc("fatal"):
        job.stopJob()
        exit(job.m.rc * (-1) + 3)
    startPyJob(job)
    job.stopJob()
# See PyCharm help at https://www.jetbrains.com/help/pycharm/
|||
|
|||
@ -0,0 +1,319 @@ |
|||
#!/usr/bin/python |
|||
# -*- coding: utf-8 -*- |
|||
# --------------------------------------------------------------------------------------------------------- |
|||
# Author : Ulrich Carmesin |
|||
# Source : gitea.ucarmesin.de |
|||
# --------------------------------------------------------------------------------------------------------- |
|||
import json |
|||
import os |
|||
import datetime |
|||
import re |
|||
import subprocess |
|||
import traceback |
|||
|
|||
import yaml |
|||
|
|||
INSTALLED = False
try:
    # probe whether the framework package is importable; this installer must
    # also run standalone, before anything is installed
    import basic.program
    INSTALLED = True
except ImportError:
    # only a missing module counts as "not installed"; other errors propagate
    INSTALLED = False
|||
|
|||
PROGRAM_NAME = "install_workspace"
# format of the workspace configuration file that is read
CONFIG_FORMAT = "yml"
# format in which the generated basis configuration is written
BASIS_FORMAT = "json"

# attribute keys of a repository entry inside the configuration
REPO_NAME = "_name"
REPO_URL = "url"
REPO_BRANCH = "_branch"


# module-level job handle, assigned in the __main__ block
job = None
|||
# ----------------------------------------------------------------------------------------- |
|||
# Miniimplementierung des Programmeahmens |
|||
class Logger:
    """
    Minimal stand-in for the framework's message/logging object with the
    standard functions:
    * openLog() / closeLog()
    * logInfo() / logWarn() / setMsg()
    * logError()
    """
    def __init__(self, job, level, logTime, comp):
        # level and comp are accepted for interface compatibility but unused
        self.openLog(job, logTime)

    def openLog(self, job, logTime):
        """create the log folder below home if necessary and open the logfile"""
        # job, level, logTime, componente
        home = getHome()
        path = os.path.join(home, "log")
        if not os.path.exists(path):
            os.mkdir(path)
        logpath = os.path.join(home, "log", job.program+"_"+logTime+".txt")
        print("logpath "+logpath)
        # explicit encoding so the log is utf-8 on every platform
        self.logfile = open(logpath, "w", encoding="utf-8")

    def logInfo(self, text):
        self.logfile.write(text + "\n")

    def logWarn(self, text):
        self.logfile.write("WARN: "+text + "\n")

    def setMsg(self, text):
        self.logfile.write(text + "\n")

    def logError(self, text):
        self.logfile.write("ERROR:" + text + "\n")
        print("ERROR:" + text)

    def closeLog(self):
        self.logfile.close()
|||
|
|||
class ActJob:
    """
    Lightweight replacement for the framework job object, offering the
    standard functions:
    * startJob()     -- opens the job including its logging
    * setParameter() -- copies parameters from a dict onto the job
    * stopJob()      -- closes the job including its logging
    """
    def __init__(self, program):
        self.program = program
        self.start = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        self.jobid = 100000
        self.conf = {}
        self.par = {}

    def startJob(self):
        # job, level, logTime, componente
        self.m = Logger(self, "info", self.start, None)
        banner = "# # # Start Job " + self.start + " # # # "
        self.m.logInfo(banner)
        print(banner)

    def stopJob(self):
        self.ende = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        banner = "# # # Stop Job " + self.start + " - " + self.ende + " # # # "
        self.m.logInfo(banner)
        self.m.closeLog()
        print(banner)

    def getDebugLevel(self, tool):
        # debug output is always fully enabled in this mini implementation
        return 0

    def debug(self, verify, text):
        self.m.logInfo(text)

    def setParameter(self, args):
        for name, value in args.items():
            setattr(self, name, value)
|||
|
|||
|
|||
# ----------------------------------------------------------------------------------------- |
|||
# Standardsteuerung Hauptverarbeitung |
|||
def startPyJob(job):
    """
    Drives the main processing; callable from this program itself or from
    job_tool. Any exception is caught and written to the log together with
    its traceback.

    :param job: current job context
    :return: None
    """
    job.m.logInfo("startPyJob gestertet ")
    try:
        setParameter(job)
        readConfig(job)
        createFolders(job)
        createGit(job)
        createBasisConfig(job)
        createDb(job)
    except Exception as e:
        separator = "+++++++++++++++++++++++++++++++++++++++++++++"
        job.m.logError(separator)
        job.m.logError(str(e))
        job.m.logError(separator)
        job.m.logError("execpt "+traceback.format_exc())
        job.m.logError(separator)
|||
|
|||
# ----------------------------------------------------------------------------------------- |
|||
# konkrete Verarbeitungsroutinen |
|||
def setParameter(job):
    # placeholder step: parameters could be taken over here; currently only logged
    job.m.logInfo("--- setze Parameter ")
|||
|
|||
def readConfig(job):
    """
    Locate the workspace configuration file in the home folder, load it and
    take its settings over into job.conf and the job parameters. Relative
    entries under "paths" are made absolute against the home folder.

    :param job: current job context
    :raises Exception: if no file whose name contains "workspace" exists in home
    """
    job.m.logInfo("--- suche config-Datei ")
    args = {}
    args["home"] = getHome()
    configPath = ""
    for p in os.listdir(args["home"]):
        print(p)
        path = os.path.join(args["home"], p)
        if os.path.isfile(path) and "workspace" in p:
            configPath = path
            break
    if len(configPath) < 1:
        raise Exception("Keine Konfiguration gefunden in "+args["home"])
    # the with-statement closes the file; the explicit close() was redundant
    with open(configPath, 'r') as file:
        doc = yaml.full_load(file)
    for k in doc:
        args[k] = doc[k]
        job.conf[k] = doc[k]
    home = getHome()
    for k in job.conf["paths"]:
        job.conf["paths"][k] = os.path.join(home, job.conf["paths"][k])
    job.setParameter(args)
|||
|
|||
def createFolders(job):
    """create every folder configured under job.paths below the home folder"""
    job.m.logInfo("--- erstelle Verzeichnisse ")
    for key in job.paths:
        createFolder(job, os.path.join(job.home, job.paths[key]))
|||
|
|||
def createFolder(job, path):
    """create the folder if missing; complain when the name is taken by a non-directory"""
    if os.path.isdir(path):
        job.m.logInfo("Verzeichnis existiert: " + path)
    elif os.path.exists(path):
        job.m.logError("Verzeichnisname existiert und ist kein Verzeichnis "+ path)
    else:
        os.mkdir(path)
        job.m.logInfo("Verzeichnis angelegt: "+ path)
|||
|
|||
# -------------------------------------------------------------------------------------- |
|||
# git_tool |
|||
# -------------------------------------------------------------------------------------- |
|||
|
|||
def createGit(job):
    """
    Create and update the configured git repositories.

    job.repos holds per-repo dicts plus top-level default attributes
    (REPO_NAME, REPO_BRANCH) that are merged into every repo entry which does
    not define them itself. For each repo whose target folder exists, the
    local clone is initialized if no .git folder is present, then updated.
    """
    job.m.logInfo("--- erstelle und aktualisiere git-Repos ")
    repos = {}
    local = {}
    attr = {
        REPO_NAME: "",
        REPO_BRANCH: ""
    }
    # build the repo list with the attributes: name, branch, url
    for r in job.repos:
        if r in attr:
            # top-level entry: a default attribute, not a repository
            attr[r] = job.repos[r]
        else:
            repo = {}
            for a in job.repos[r]:
                repo[a] = job.repos[r][a]
            repos[r] = repo
    # fill missing attributes of every repo from the collected defaults
    for k in attr:
        a = k
        for r in repos:
            if a not in repos[r]:
                repos[r][a] = attr[k]
    for r in repos:
        repo = repos[r]
        path = os.path.join(job.home, job.paths[r])
        if os.path.exists(path):
            # "local" describes the working copy on disk for this repo
            local[REPO_URL] = os.path.join(job.home, job.paths[r])
            local[REPO_BRANCH] = repo[REPO_BRANCH]
            local[REPO_NAME] = repo[REPO_NAME]
            rpath = os.path.join(local[REPO_URL], ".git")
            if os.path.exists(rpath):
                job.m.logInfo("Repo existiert bereits "+r)
            else:
                job.m.logInfo("Repo erzeugen "+r)
                initGit(job, local, repo)
            updateLocal(job, local, repo)
        else:
            job.m.logError("Verzeichnis existiert nicht: " + path)
|||
|
|||
def initGit(job, local, repo, bare=False):
    """initialize a local git repository, check out the branch and register the remote"""
    job.m.logInfo("--- initialisiere git-Repo "+str(repo)+","+str(local))
    os.chdir(local[REPO_URL])
    initCmd = "git init "
    if bare:
        initCmd += " --bare"
    commands = [
        initCmd,
        "git checkout " + local[REPO_BRANCH],
        "git remote add " + repo[REPO_NAME] + " " + repo[REPO_URL],
    ]
    for command in commands:
        execCmd(job, command)
    os.chdir(job.home)
|||
|
|||
def execCmd(job, cmd):
    """
    Run a shell command, log the command and its stdout, and return the output.

    :param job: current job context (used for logging)
    :param cmd: command line to execute
    :return: decoded stdout of the command
    """
    job.m.logInfo(cmd)
    # NOTE(review): shell=True executes through the shell -- cmd must never
    # contain untrusted input
    process = subprocess.Popen(cmd, stdout=subprocess.PIPE, shell=True)
    btext = process.communicate()[0]
    text = btext.decode('utf-8')
    job.m.logInfo(text)
    return text
|||
|
|||
def checkoutLocal(job, local):
    """switch into the local working copy and check out its configured branch"""
    os.chdir(local[REPO_URL])
    return execCmd(job, "git checkout " + local[REPO_BRANCH])
|||
|
|||
def updateLocal(job, local, repo):
    """
    Pull the remote branch into the local working copy -- but only when
    `git checkout` reports no modified/added/deleted files, i.e. the copy
    has no uncommitted changes.
    """
    job.m.logInfo("--- aktualisiere git-Repo "+str(repo)+","+str(local))
    text = checkoutLocal(job, local)
    # if len(text) > 0 and re.match(r"[MA]\s\w+", text):
    # a D/M/A status line in the checkout output signals uncommitted changes
    match = re.search(r"([DMA])\s(\S+)", text)
    if match is not None:
        os.chdir(job.home)
        job.m.logError("ERROR: lokales Repo " + local[REPO_URL] + ", " + local[REPO_BRANCH] + " hat uncommited Aenderungen")
        print("regex gefunden")
        return
    cmd = "git pull " + repo[REPO_NAME] + " " + repo[REPO_BRANCH]
    text = execCmd(job, cmd)
    job.m.logInfo(text)
    os.chdir(job.home)
|||
|
|||
def updateRemote(job, local, repo):
    """push the checked-out local branch to the configured remote repository"""
    job.m.logInfo("--- aktualisiere git-Repo "+str(repo)+","+str(local))
    checkoutLocal(job, local)
    execCmd(job, "git push " + repo[REPO_NAME] + " " + repo[REPO_BRANCH])
    os.chdir(job.home)
|||
|
|||
def createBasisConfig(job):
    """
    Collect the resolved path configuration and write it as the basis
    configuration file (format selected by BASIS_FORMAT) into <home>/config.

    :param job: current job context with home and paths attributes
    :return: None
    """
    job.m.logInfo("--- erstelle Basis-Koniguration ")
    config = {}
    config["basic"] = {}
    config["basic"]["paths"] = {}
    config["basic"]["paths"]["home"] = job.home
    for p in job.paths:
        path = os.path.join(job.home, job.paths[p])
        config["basic"]["paths"][p] = path
    # temp and config folders are created on the fly and recorded as well
    for p in ["temp", "config"]:
        path = os.path.join(job.home, p)
        createFolder(job, path)
        config["basic"]["paths"][p] = path
    if BASIS_FORMAT == "yml":
        # fixed: write a .yml file (was "basis.json") and dump to a string --
        # yaml.dump(config, file) returns None, so file.write(doc) raised
        path = os.path.join(job.home, "config", "basis.yml")
        with open(path, 'w', encoding="utf-8") as file:
            doc = yaml.dump(config)
            file.write(doc)
    elif BASIS_FORMAT == "json":
        path = os.path.join(job.home, "config", "basis.json")
        with open(path, 'w', encoding="utf-8") as file:
            doc = json.dumps(config, indent=4)
            file.write(doc)
|||
|
|||
def createDb(job):
    """
    create the administration tables in the database, but only when a "db"
    section is present in the job configuration
    """
    if "db" in job.conf:
        # imported lazily: these framework modules exist only after installation
        import basic.connection
        import basic.Testserver

        testserver = basic.Testserver.Testserver(job)
        testserver.createAdminDBTables(job)
|||
|
|||
def getHome():
    """
    Return the workspace home folder: the current working directory, or its
    parent when the program is started from inside the "program" subfolder.
    """
    home = os.getcwd()
    # use path functions instead of fixed character offsets
    if os.path.basename(home) == "program":
        home = os.path.dirname(home)
    return home
|||
|
|||
|
|||
# -----------------------------------------------------------------------------------------
# standard Python program invocation
# create the job object, run, and shut down
if __name__ == '__main__':
    if INSTALLED:
        #job = basic.program.Job(PROGRAM_NAME)
        # the mini job is used in both cases until the framework job is wired in
        job = ActJob(PROGRAM_NAME)
    else:
        job = ActJob(PROGRAM_NAME)
    job.startJob()
    startPyJob(job)
    job.stopJob()
|||
@ -0,0 +1,280 @@ |
|||
#!/usr/bin/python |
|||
# program to execute programs for a testcases or for a testsuite |
|||
# PARAM from INPUT: --granularity --application --environment --testcase/testsuite |
|||
# main functions |
|||
# + input_param() : cache-actjob --> user-input --> local-param |
|||
# + start_job() : local-param --> cache-actjob --> start-param |
|||
# --------------------------------------------------- |
|||
|
|||
""" |
|||
|
|||
""" |
|||
import os.path |
|||
import json |
|||
import re |
|||
|
|||
import basic.program |
|||
import basic.constants as B |
|||
import tools.job_tool |
|||
import tools.file_tool |
|||
import tools.data_const as D |
|||
import tools.date_tool |
|||
import tools.path_tool |
|||
import tools.path_const as P |
|||
|
|||
tempJob = {}

PROGRAM_NAME = "service"

# dialog labels for the selectable granularities and actions
DLG_TESTCASE = "Testfall"
DLG_TESTSUITE = "Testsuite"
DLG_COMPLETE = "Komplettausfuehrung"
LIST_DLG_GRAN = [DLG_TESTCASE, DLG_TESTSUITE]
DLG_START_QUESTION = "was soll getestet werden"
# DLG_TESTPLAN = "Testplan"
DLG_ENVIRONMENT = "Umgebung"
DLG_APPLICATION = "Anwendung"
DLG_REDO = "wiederholen"
DLG_CONTINUE = "fortsetzen"
DLG_DUMMY_STEP = "Dummy-Schritt"
DLG_NEWJOB = "neuer Job"
|||
# mapping of each program to its position in JOB_LIST, grouped by granularity;
# "start" names the first program of that granularity's pipeline
JOB_NR = {
    DLG_TESTSUITE : {
        "start": "init_testsuite",
        "init_testsuite": {
            "jobnr": "0" },
        "execute_testsuite": {
            "jobnr": "1"},
        "collect_testsuite": {
            "jobnr": "2"},
        "compare_testsuite": {
            "jobnr": "3"},
        "finish_testsuite": {
            "jobnr": "4"}
    },
    DLG_TESTCASE: {
        "start": "init_testcase",
        "init_testcase": {
            "jobnr": "5" },
        "execute_testcase": {
            "jobnr": "6" },
        "collect_testcase": {
            "jobnr": "7" },
        "copy_appdummy": {
            "jobnr": "8" },
        "compare_testcase": {
            "jobnr": "9" },
    },
    "check_environment": {
        "jobnr": "10" },
    "test_executer": {
        "jobnr": "11"},
}
|||
|
|||
# ordered list of executable programs; the list index equals the jobnr in JOB_NR
JOB_LIST = [
    "init_testsuite", # 0
    "execute_testsuite", # 1
    "collect_testsuite", # 2
    "compare_testsuite", # 3
    "finish_testsuite", # 4
    "init_testcase", # 5
    "execute_testcase", # 6
    "collect_testcase", # 7
    "copy_appdummy", # 8
    "compare_testcase", # 9
    "check_environment", # 10
    "test_executer" # 11
]

# context caches filled by readContext()
appList = []
envList = []
entities = {}
entities[DLG_TESTCASE] = {}
entities[DLG_TESTSUITE] = {}
|||
|
|||
def readContext(job):
    """
    Fill the module-level caches: configured applications into appList,
    environment folders into envList, and the testcase/testsuite
    specifications (via readSpec) into entities.
    """
    for k in job.conf[B.SUBJECT_APPS]:
        appList.append(k)
    path = job.conf[B.TOPIC_PATH][B.ATTR_PATH_ENV]
    if os.path.exists(path):
        for d in os.listdir(path):
            print ("-- "+d)
            # only real folders count; names starting with "_" are internal
            if not os.path.isdir(os.path.join(path, d)):
                continue
            if d[0:1] == "_":
                continue
            envList.append(d)
    path = job.conf[B.TOPIC_PATH][B.ATTR_PATH_TDATA]
    if os.path.exists(path):
        for d in os.listdir(path):
            print("tdata path "+d)
            if not os.path.isdir(os.path.join(path, d)):
                print("continue a")
                continue
            if d[0:1] == "_":
                print("continue b")
                continue
            # each testdata folder may carry a testcase and/or testsuite spec
            specpath = os.path.join(path, d, D.DFILE_TESTCASE_NAME + ".csv")
            readSpec(job, d, DLG_TESTCASE, specpath)
            specpath = os.path.join(path, d, D.DFILE_TESTSUITE_NAME + ".csv")
            readSpec(job, d, DLG_TESTSUITE, specpath)
|||
|
|||
def readSpec(job, testentity, testgran, specpath):
    """
    Read one specification csv file and register the test entity under every
    application listed in its "head:application;" line.

    :param job: current job context
    :param testentity: name of the testcase/testsuite folder
    :param testgran: granularity key (DLG_TESTCASE or DLG_TESTSUITE)
    :param specpath: path of the specification file; silently skipped if absent
    """
    print("spec "+specpath)
    if not os.path.isfile(specpath):
        print("continue c")
        return
    text = tools.file_tool.read_file_text(job, specpath, job.m)
    print("-----------\n"+text+"\n------------------")
    # fixed: without re.S the dot cannot cross line breaks, so both checks
    # only ever inspected the first line(s) of the file
    if re.match(r".*?depricated;[jJyY]", text, re.S):
        return
    if re.match(r".*\nhead:application;", text, re.S):
        print("## app gematcht")
        res = re.search(r".*head:application;(.+)\n", text)
        apps = res.group(1).replace(";", ",").split(",")
        print("# "+str(apps))
        for a in apps:
            if len(a) < 1:
                break
            if a not in entities[testgran]:
                entities[testgran][a] = []
            print(a+" in "+testentity+" "+testgran+" -- "+str(entities))
            entities[testgran][a].append(testentity)
|||
|
|||
def printProc(job, process):
    """pretty-print the key/value pairs of a process dict to stdout"""
    separator = "--------------------------------------------------"
    print(separator)
    for key, value in process.items():
        print("| {0:15s} : {1}".format(key, value))
    print(separator)
|||
|
|||
|
|||
def restartActualProcess(job):
    """
    Check whether an actual (persisted) process exists and offer the user
    to redo, continue or replace it; otherwise start the new-process dialog.
    :return:
    """
    path = tools.path_tool.getActualJsonPath(job)
    if os.path.exists(path):
        actProc = tools.file_tool.read_file_dict(job, path, job.m)
        print("restartActJob "+str(actProc))
        printProc(job, actProc)
        step = int(actProc["step"])
        if actProc["program"] == "test_executer":
            if step > 5:
                dialogProcess(job)
            else:
                actProc["step"] = str(step+1)
                tools.job_tool.start_child_process(job, actProc)
                restartActualProcess(job)
            # NOTE(review): control appears to fall through to the selection
            # dialog below even for test_executer -- confirm this is intended,
            # JOB_NR[gran][program] would not resolve for "test_executer"
        selection = [DLG_NEWJOB, DLG_REDO]

        nr = int(JOB_NR[actProc["gran"]][actProc["program"]]["jobnr"])
        # offer "continue" only while the pipeline has further steps
        if (actProc["gran"] == DLG_TESTSUITE and nr < 4) or (actProc["gran"] == DLG_TESTCASE and nr < 9):
            selection.append(DLG_CONTINUE)
        # after collect_testcase (7) the dummy-copy step can be interposed
        if nr == 7:
            selection.append(DLG_DUMMY_STEP)
        choice = getChoice(job, selection, DLG_ENVIRONMENT)
        print(choice)
        if choice == DLG_REDO:
            tools.job_tool.start_child_process(job, actProc)
            restartActualProcess(job)
        elif choice == DLG_DUMMY_STEP:
            actProc["program"] = JOB_LIST[nr+1]
            tools.job_tool.start_child_process(job, actProc)
            restartActualProcess(job)
        elif choice == DLG_CONTINUE:
            # continuing from collect_testcase skips the dummy-copy step
            if nr == 7:
                nr = 9
            else:
                nr += 1
            print (" act nr "+str(nr))
            actProc["step"] = str(step + 1)
            actProc["program"] = JOB_LIST[nr]
            tools.job_tool.start_child_process(job, actProc)
        elif choice == DLG_NEWJOB:
            dialogProcess(job)
    else:
        dialogProcess(job)
|||
|
|||
|
|||
def dialogProcess(job):
    """
    dialog for selection and starting a process: asks for granularity,
    application, environment and entity, stores them as job parameters and
    starts the selected child program
    :param job:
    :return:
    """
    process = {}
    index = 0
    print("create new process")
    selection = []
    # NOTE(review): this aliases the module constant, so the appends below
    # mutate LIST_DLG_GRAN on first call -- confirm whether a copy was meant
    selection = LIST_DLG_GRAN
    if DLG_TESTCASE + " - " + DLG_COMPLETE not in selection:
        selection.append(DLG_TESTCASE + " - " + DLG_COMPLETE)
    if DLG_TESTSUITE + " - " + DLG_COMPLETE not in selection:
        selection.append(DLG_TESTSUITE + " - " + DLG_COMPLETE)
    choice = getChoice(job, LIST_DLG_GRAN, DLG_START_QUESTION)
    if DLG_COMPLETE in choice:
        # "<gran> - Komplettausfuehrung" -> strip the suffix to get the granularity
        process["gran"] = choice[0:-3-len(DLG_COMPLETE)]
        process["program"] = "test_executer"
        process["step"] = 1
    else:
        process["gran"] = choice
        process["program"] = JOB_NR[process["gran"]]["start"]
        process["step"] = 1
    # single-element lists are chosen automatically, otherwise ask the user
    if len(appList) == 1:
        process["app"] = appList[0]
    else:
        process["app"] = getChoice(job, appList, DLG_ENVIRONMENT)
    #
    if len(envList) == 1:
        process["env"] = envList[0]
    else:
        process["env"] = getChoice(job, envList, DLG_ENVIRONMENT)
    #
    if len(entities[process["gran"]][process["app"]]) == 1:
        process["entity"] = entities[process["gran"]][process["app"]][0]
    else:
        process["entity"] = getChoice(job, entities[process["gran"]][process["app"]], process["gran"])
    print(str(process))
    setattr(job.par, B.PAR_ENV, process["env"])
    setattr(job.par, B.PAR_APP, process["app"])
    if process["gran"] == DLG_TESTCASE:
        setattr(job.par, B.PAR_TESTCASE, process["entity"])
        setattr(job.par, B.PAR_TCTIME, tools.date_tool.getActdate(tools.date_tool.F_DIR))
        path = tools.path_tool.composePattern(job, "{"+P.P_TCBASE+"}", None)
        process[B.PAR_TCDIR] = path
    elif process["gran"] == DLG_TESTSUITE:
        setattr(job.par, B.PAR_TESTSUITE, process["entity"])
        setattr(job.par, B.PAR_TSTIME, tools.date_tool.getActdate(tools.date_tool.F_DIR))
        path = tools.path_tool.composePattern(job, "{"+P.P_TSBASE+"}", None)
        process[B.PAR_TSDIR] = path
    tools.job_tool.start_child_process(job, process)
    restartActualProcess(job)
|||
|
|||
def getChoice(job, choiselist, description):
    """
    Present a numbered selection menu on stdout and return the chosen element.
    Entering 0 exits the program; an invalid entry re-prompts.

    :param job: current job context (unused, kept for call compatibility)
    :param choiselist: list of selectable values
    :param description: heading printed above the menu
    :return: the selected element of choiselist
    """
    index = 0
    print("+------------- "+description+" ----------")
    print('| | {:2d} : {:60s}'.format(0, "exit"))
    for k in choiselist:
        index += 1
        print('| | {:2d} : {:60s}'.format(index, k))
    print("+-----------------------------------------------")
    choice = input("Auswahl 1-" + str(index) + ": ")
    if not choice.isnumeric():
        print("FEHLER Fehleingabe "+choice)
        # fixed: the re-prompt's result was dropped, so a typo returned None
        return getChoice(job, choiselist, description)
    elif int(choice) < 1:
        exit(0)
    elif int(choice) > index:
        print("FEHLER Fehleingabe "+choice)
        return getChoice(job, choiselist, description)
    else:
        return choiselist[int(choice) - 1]
|||
|
|||
|
|||
|
|||
if __name__ == '__main__':
    # interactive entry point: load the workspace context, then resume an
    # open process or start the new-process dialog
    job = basic.program.Job(PROGRAM_NAME, "", {})
    readContext(job)
    restartActualProcess(job)
|||
@ -0,0 +1,140 @@ |
|||
""" |
|||
Dieses Programm durchlaeuft das angegebene Programmverzeichnis und ermittelt zu jeder Datei den md5-Hash. |
|||
Wenn neben diesem Programm eine Datei *md5Hash.txt liegt, werden die Werte gegen diese Datei verglichen. |
|||
weitere Feature: |
|||
* in Anwendung ueberfuehren, z.B. eine jar |
|||
* aufrubar ueber cli und Dialog |
|||
* config zu Standardeingaben --path, --work; |
|||
* --name mit Aufbauregel Release + Name |
|||
* Namensliste hinterlegen mit: unterverzeichnis, repo-name und repo-branch |
|||
* Methoden zum Annehmen einer Lieferung (unzip Subzips, pruefen, git-push nach korrekter Pruefung |
|||
* Methoden zum Erzeugen einer Lieferung |
|||
Definition *_md5protokoll.txt: datei \t md5checksum \n |
|||
""" |
|||
import argparse |
|||
import datetime |
|||
import hashlib |
|||
import os |
|||
|
|||
def openLog(args):
    """open a timestamped protocol file in the work folder and write its header"""
    stamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    logname = getattr(args, "name") + "_" + stamp + ".txt"
    logfile = open(os.path.join(getattr(args, "work"), logname), 'w', encoding="utf-8")
    header = [
        "* * * * * * * * * * PROTOKOLLL MD5-Checksum-Pruefung * * * * * * * * * * * * * * *\n",
        "Name: " + getattr(args, "name") + "\n",
        "Path: " + getattr(args, "path") + "\n",
        "Dir : " + getattr(args, "dir") + "\n",
    ]
    logfile.writelines(header)
    return logfile
|||
|
|||
def openResult(args, mode, suffix):
    """open <work>/<name>_<suffix>.txt in the given mode; None when reading a missing file"""
    filename = getattr(args, "name") + "_" + suffix + ".txt"
    path = os.path.join(getattr(args, "work"), filename)
    if mode == "r" and not os.path.exists(path):
        return None
    return open(path, mode, encoding="utf-8")
|||
|
|||
def traverseDir(logfile, resultfile, path, rootpath):
    """recursively hash every regular file below path, skipping dot- and dunder-names"""
    logfile.write("traverse " + path + "\n")
    for entry in sorted(os.listdir(path)):
        if entry[:1] == "." or entry[:2] == "__":
            continue
        full = os.path.join(path, entry)
        if os.path.isfile(full):
            # record the path relative to the scanned root
            relative = full.replace(rootpath, "")
            logfile.write(". " + relative + "\n")
            resultfile.write(relative + "\t" + getMD5Hash(full) + "\n")
        elif os.path.isdir(full):
            traverseDir(logfile, resultfile, full, rootpath)
|||
|
|||
def getMD5Hash(path):
    """return the hex md5 digest of the file at path, read in 4 KiB chunks"""
    digest = hashlib.md5()
    with open(path, "rb") as stream:
        block = stream.read(4096)
        while block:
            digest.update(block)
            block = stream.read(4096)
    return digest.hexdigest()
|||
|
|||
def compareLists(logfile, args):
    """
    Compare the delivered protocol file against the generated result file
    line by line (both sorted by filename) and log every difference: files
    present on only one side and files whose md5 checksums differ.

    :param logfile: open protocol file receiving the comparison report
    :param args: parsed command-line arguments (work folder, name)
    """
    protokollfile = openResult(args, "r", "md5protokoll")
    if protokollfile is None:
        logfile.write("Kein Vergleich, da Protokolldatei fehlt! \n")
        return
    resultfile = openResult(args, "r", "md5result")
    protLines = protokollfile.readlines()
    protokollfile.close()
    resultLines = resultfile.readlines()
    resultfile.close()
    p = 0
    r = 0
    error = False
    # merge-walk over both lists, which are sorted by filename
    while (True):
        if len(protLines) > p:
            protRow = protLines[p].replace("\r","").split("\t")
        else:
            protRow = None
        if len(resultLines) > r:
            resRow = resultLines[r].replace("\r","").split("\t")
        else:
            resRow = None
        if protRow is None and resRow is None:
            break
        elif protRow is None and resRow is not None:
            error = True
            logfile.write("ERROR Result " + resRow[0] + ": ist ueberzaehlig\n")
            r += 1
        elif protRow is not None and resRow is not None and protRow[0] > resRow[0]:
            # fixed: the condition tested resRow twice instead of protRow and resRow
            error = True
            logfile.write("ERROR Result " + resRow[0] + ": ist ueberzaehlig\n")
            r += 1
        elif resRow is None and protRow is not None:
            error = True
            logfile.write("ERROR Protokoll " + protRow[0] + ": ist ueberzaehlig\n")
            p += 1
        elif protRow is not None and resRow is not None and protRow[0] < resRow[0]:
            error = True
            logfile.write("ERROR Protokoll " + protRow[0] + ": ist ueberzaehlig\n")
            p += 1
        elif protRow is not None and resRow is not None and protRow[0] == resRow[0]:
            if protRow[1] != resRow[1]:
                error = True
                logfile.write("ERROR "+protRow[0]+": md5Hash unterscheiden sich (" + protRow[1] + "!=" + resRow[1].strip() + ")\n")
            r += 1
            p += 1
    if error:
        logfile.write("\n+--------------------------------------------------------+\n")
        logfile.write("| Fehler aufgetreten, die Dateien unterscheiden sich |\n")
        logfile.write("+--------------------------------------------------------+\n")
    else:
        logfile.write("\nDateien unterscheiden sich nicht\n")
|||
|
|||
|
|||
def readParameter():
    """
    --dir    the folder to examine
    --name   name suffix of the program package under examination
    --work   working folder containing:
             <name>_md5result.txt      generated result file
             <name>_md5protokoll.txt   delivered comparison file
             <name>_JJJJMMTT_hhmmss.txt  protocol file
    """
    parser = argparse.ArgumentParser()
    for short, long in (("-p", "--path"), ("-d", "--dir"), ("-n", "--name"), ("-w", "--work")):
        parser.add_argument(short, long, required=True, action='store')
    return parser.parse_args()
|||
|
|||
if __name__ == '__main__':
    # parse arguments, hash the folder tree into the result file, then
    # compare the result against the delivered protocol file
    args = readParameter()
    logfile = openLog(args)
    logfile.write("\n")
    resultfile = openResult(args, "w", "md5result")
    path = os.path.join(getattr(args, "path"))
    traverseDir(logfile, resultfile, path, path)
    resultfile.close()
    logfile.write("\n")
    compareLists(logfile, args)
    logfile.close()
|||
|
|
|
|
|
|
@ -0,0 +1,246 @@ |
|||
# --------------------------------------------------------------------------------------------------------- |
|||
# Author : Ulrich Carmesin |
|||
# Source : gitea.ucarmesin.de |
|||
# --------------------------------------------------------------------------------------------------------- |
|||
import os |
|||
import basic.program |
|||
import basic.toolHandling |
|||
import basic.constants as B |
|||
import model.entity |
|||
import model.constants as M |
|||
import tools.data_const as D |
|||
import tools.path_const as P |
|||
import tools.config_tool |
|||
import tools.file_tool |
|||
import tools.git_tool |
|||
import tools.file_type |
|||
|
|||
# database tables administered for this entity
TABLE_NAMES = ["application", "ap_project", "ap_component"]
# default direction/extent of the configuration<->database synchronization
DEFAULT_SYNC = M.SYNC_FULL_GIT2DB

TABLE_NAME = B.SUBJECT_APP
""" system-name for this entity """
FIELD_ID = "apid"

# file format of the entity's configuration file
FILE_EXTENSION = D.DFILE_TYPE_YML
UNIQUE_FIELDS = [D.FIELD_NAME]
""" unique business field as human identifer """
IDENTIFYER_FIELDS = [FIELD_ID]
""" unique technical field as technical identifer """
|||
|
|||
def searchProjects(job, appl):
    """
    search all relevant projects from server-configuration
    filtered by parameter --application , --project
    :param job: current job context
    :param appl: application configuration with projects and apps sections
    :return: dict of the selected projects, each with an empty environment list
    """
    projects = {}
    if B.SUBJECT_PROJECTS in job.conf:
        for k in job.conf[B.SUBJECT_PROJECTS]:
            # skip structural keys and entries filtered out by the parameters
            if k in B.LIST_SUBJECTS:
                continue
            if hasattr(job.par, B.PAR_PROJ) and k != getattr(job.par, B.PAR_PROJ):
                continue
            if hasattr(job.par, B.PAR_APP) \
                    and k not in appl[B.SUBJECT_APPS][getattr(job.par, B.PAR_APP)][B.SUBJECT_PROJECTS]:
                continue
            projects[k] = appl[B.SUBJECT_PROJECTS][k]
            projects[k][B.SUBJECT_ENVIRONMENT] = []
    else:
        # no project section in the job yet: take it over from the configuration
        job.conf[B.SUBJECT_PROJECTS] = appl[B.SUBJECT_PROJECTS]
    return projects
|||
|
|||
def select_applications(job, projectList):
    """
    get all project which are configured for the workspace
    with all environments where the application of the project are installed
    :param job: current job context
    :param projectList: projects to which the applications are restricted
    :return: dict of matching application configurations
    """
    appl = tools.config_tool.getConfig(job, P.KEY_BASIC, B.SUBJECT_APPS)
    return searchApplications(job, projectList, appl)
|||
|
|||
def searchApplications(job, projectList, appl):
    """filter the configured applications down to those belonging to the given projects"""
    appList = {}
    for proj in projectList:
        # an explicit --project parameter restricts the selection further
        if hasattr(job, "par") and hasattr(job.par, B.PAR_PROJ) and proj != getattr(job.par, B.PAR_PROJ):
            continue
        for app in appl[B.SUBJECT_APPS]:
            entry = appl[B.SUBJECT_APPS][app]
            if B.SUBJECT_PROJECT in entry and proj != entry[B.SUBJECT_PROJECT]:
                continue
            appList[app] = entry
    return appList
|||
|
|||
|
|||
import model.entity
# NOTE(review): duplicate of the top-of-file import of model.entity
def syncEnitities(job):
    """
    synchronize the configuration with the database: compare the last
    git-commit time of the configuration file with the newest update time in
    the database and copy in the direction of the older side
    :param job:
    :return: error text when the job has no DB configured, else None
    """
    syncMethod = DEFAULT_SYNC
    # sync method encodes source-direction-target, e.g. full-git2db
    if syncMethod.count("-") < 2:
        return
    fileTime = model.entity.VAL_ZERO_TIME
    dbTime = model.entity.VAL_ZERO_TIME
    # get git-commit
    if "git" in syncMethod:
        apppath = tools.config_tool.select_config_path(job, P.KEY_BASIC, B.SUBJECT_APPS, "")
        repopath = apppath[len(job.conf[B.TOPIC_PATH][B.ATTR_PATH_COMPS]) + 1:]
        gitresult = tools.git_tool.gitLog(job, B.ATTR_PATH_COMPS, repopath, 1)
        fileTime = gitresult[0]["date"]
        print(str(gitresult))
    if "db" in syncMethod:
        if B.TOPIC_NODE_DB in job.conf:
            dbi = basic.toolHandling.getDbTool(job, job.testserver, job.conf[B.TOPIC_NODE_DB][B.ATTR_TYPE])
        else:
            return "No DB in job-config"
        data = dbi.selectRows(TABLE_NAMES[0], job)
        print(str(data[B.DATA_NODE_DATA]))
        if len(data[B.DATA_NODE_DATA]) > 0:
            dbTime = data[B.DATA_NODE_DATA][0]["updtime"]

    if fileTime == dbTime:
        print("gleich")
    elif fileTime < dbTime:
        # database is newer: export the db content into the configuration file
        print("db vorne")
        (appObjects, appDict) = selectEntities(job, dbi)
        print(str(appDict))
        applPath = tools.config_tool.select_config_path(job, P.KEY_BASIC, B.SUBJECT_APPS)
        tools.file_tool.write_file_dict(job.m, job, applPath, appDict)
        #
    elif fileTime > dbTime:
        # configuration is newer: load it and write it into the database
        print("git vorne")
        applData = tools.config_tool.getConfig(job, P.KEY_BASIC, B.SUBJECT_APPS)
        insertEntities(job, applData, dbTime, dbi)
|||
|
|||
def selectEntities(job, dbi):
    """
    read all application rows (with their project- and component-subtables) from
    the database and build both the entity objects and the nested config dict.
    :param job:
    :param dbi: database interface (selectRows)
    :return: tuple (list of Application objects, dict in file-config layout)
    """
    appObjects = []
    appDict = {}
    appDict[B.SUBJECT_PROJECTS] = {}
    appDict[B.SUBJECT_APPS] = {}
    # main table plus the two subtables, joined in python via "apid"
    appData = dbi.selectRows(TABLE_NAMES[0], job)
    projData = dbi.selectRows(TABLE_NAMES[1], job)
    compData = dbi.selectRows(TABLE_NAMES[2], job)
    for row in appData[B.DATA_NODE_DATA]:
        ao = Application(job)
        ao.setAppRow(row, "")
        appDict[B.SUBJECT_APPS][ao.name] = {}
        # copy the plain columns, skipping the administrative entity fields
        for f in job.testserver.conf[B.DATA_NODE_DDL][TABLE_NAMES[0]][B.DATA_NODE_HEADER]:
            if f in model.entity.ENTITY_FIELDS:
                continue
            appDict[B.SUBJECT_APPS][ao.name][f] = getattr(ao, f)
        apid = ao.apid
        # project-subtable rows belonging to this application
        rows = [row for row in projData[B.DATA_NODE_DATA] if row["apid"] == apid]
        ao.setProjRow(rows)
        appDict[B.SUBJECT_APPS][ao.name][B.SUBJECT_PROJECTS] = []
        for proj in getattr(ao, B.PAR_PROJ):
            appDict[B.SUBJECT_APPS][ao.name][B.SUBJECT_PROJECTS].append(proj)
            if proj in appDict[B.SUBJECT_PROJECTS]:
                # project already collected via another application: just link it
                appDict[B.SUBJECT_PROJECTS][proj][B.SUBJECT_APPS].append(ao.name)
                continue
            # first occurrence of the project: create its node and copy its columns
            appDict[B.SUBJECT_PROJECTS][proj] = {}
            appDict[B.SUBJECT_PROJECTS][proj][B.SUBJECT_APPS] = []
            appDict[B.SUBJECT_PROJECTS][proj][B.SUBJECT_APPS].append(ao.name)
            aoproj = getattr(ao, "project")[proj]
            for f in job.testserver.conf[B.DATA_NODE_DDL][TABLE_NAMES[1]][B.DATA_NODE_HEADER]:
                # skip administrative fields and the technical key columns
                if f in model.entity.ENTITY_FIELDS + ["approid", "apid"]:
                    continue
                appDict[B.SUBJECT_PROJECTS][proj][f] = aoproj[f]
        # component-subtable rows belonging to this application
        rows = [row for row in compData[B.DATA_NODE_DATA] if row["apid"] == apid]
        ao.setCompRow(rows)
        appDict[B.SUBJECT_APPS][ao.name][B.SUBJECT_COMPS] = []
        for comp in getattr(ao, B.PAR_COMP):
            appDict[B.SUBJECT_APPS][ao.name][B.SUBJECT_COMPS].append(comp)
        appObjects.append(ao)
    return appObjects, appDict
|||
|
|||
def insertEntities(job, applData, dbTime, dbi):
    """
    load the file-based application configuration into the database.

    When the database already holds data (dbTime is not the zero sentinel)
    all application tables are cleared first, then every application from
    the config is read as an entity and inserted.
    :param job:
    :param applData: application configuration dict (file layout)
    :param dbTime: last update time found in the database
    :param dbi: database interface (deleteRows / insert via entity)
    """
    # wipe stale rows before the re-import
    if dbTime != model.entity.VAL_ZERO_TIME:
        for tableName in TABLE_NAMES:
            dbi.deleteRows(job, tableName)
    for appName in applData[B.SUBJECT_APPS]:
        entity = Application(job)
        entity.read_entity(job, appName)
        entity.insertEntity(dbi)
|||
|
|||
|
|||
|
|||
class Application(model.entity.Entity):
    """
    entity of an application under test, persisted in table "application".
    Plain columns: name, description, reference, project (see LIST_FIELDS);
    subtables: apps, components, usecases, variants (see LIST_SUBTABLES).
    """
    FIELD_ID = "apid"
    LIST_FIELDS = [FIELD_ID, D.FIELD_NAME, B.SUBJECT_DESCRIPTION, B.SUBJECT_REFERENCE, B.SUBJECT_PROJECT]
    """ list of object-attributes """
    LIST_NODES = [B.NODE_ATTRIBUTES]
    LIST_SUBTABLES = [B.SUBJECT_APPS, B.SUBJECT_COMPS, B.SUBJECT_USECASES, B.SUBJECT_VARIANTS]
    # prefix of the subtable-names in the database ("ap_...")
    PREFIX_SUBTABLE = "ap"

    def read_unique_names(self, job, project, application, gran, args, ttype: str=""):
        """
        reads the entity-names from file-storage
        :param job:
        :param opt. project: select-criteria if used and defined
        :param opt. application: select-criteria if used and defined
        :param opt. gran: granularity values testcase / testsuite / testplan
        :param opt. args additional args
        :return: list of entity-names
        """
        config = self.getConfig(job, P.KEY_BASIC, B.SUBJECT_APPS,
                                tools.config_tool.get_plain_filename(job, ""), ttype=B.SUBJECT_APP)
        conf = list(config[B.SUBJECT_APPS].keys())
        outList = []
        # keys starting with "_" are technical nodes, not application names
        for k in conf:
            if k[:1] != "_":
                outList.append(k)
        return outList

    def read_entity(self, job, name):
        """
        reads the entity from the file-system
        :param job:
        :param name:
        :return:
        """
        config = self.getConfig(job, P.KEY_BASIC, B.SUBJECT_APPS,
                                tools.config_tool.get_plain_filename(job, name), ttype=B.SUBJECT_APP)
        return self.setAttributes(job, config, name, self.LIST_FIELDS, self.LIST_NODES, self.LIST_SUBTABLES)

    @staticmethod
    def rebuild_data(job, data: dict) -> dict:
        """
        gets the subtable-tag from filecsv and sets the subtables in order to workable entity-elements
        :param job:
        :param data:
        :return:
        """
        data = tools.file_type.popSubjectsNode(job, data)
        # data = tools.file_type.popNameNode(job, data)
        return data

    def check_data(self, job, data: dict) -> dict:
        """
        it checks the data for the specific form: components are mandatory,
        raw data-nodes must not appear, apps/variants/usecases are optional.
        :param job:
        :param data: configuration dict, one top-level entry per config
        :return: the (unchanged) data
        """
        checkNodes = {}
        checkNodes[tools.file_type.MUST_NODES] = [B.SUBJECT_COMPS]
        checkNodes[tools.file_type.MUSTNT_NODES] = [B.DATA_NODE_DATA, B.DATA_NODE_HEADER, B.DATA_NODE_FIELDS, B.DATA_NODE_KEYS]
        checkNodes[tools.file_type.OPT_NODES] = [B.SUBJECT_APPS, B.SUBJECT_VARIANTS, B.SUBJECT_USECASES]
        for conf in data:
            tools.file_type.check_nodes(job, data[conf], checkNodes)
        return data
|||
|
|||
|
@ -0,0 +1,46 @@ |
|||
#!/usr/bin/python |
|||
# -*- coding: utf-8 -*- |
|||
# --------------------------------------------------------------------------------------------------------- |
|||
# Author : Ulrich Carmesin |
|||
# Source : gitea.ucarmesin.de |
|||
# --------------------------------------------------------------------------------------------------------- |
|||
import basic.toolHandling |
|||
import tools.data_const as D |
|||
import basic.constants as B |
|||
import model.entity |
|||
import tools.config_tool |
|||
|
|||
# module-level default layout of the artifact entity (mirrored by the class below)
FIELD_ID = "arid"
LIST_FIELDS = [FIELD_ID, D.FIELD_NAME, B.SUBJECT_DESCRIPTION, B.SUBJECT_REFERENCE,
               B.SUBJECT_PROJECT, B.SUBJECT_COMP, B.SUBJECT_TESTCASE]
""" list of object-attributes """
LIST_NODES = [B.NODE_ATTRIBUTES]

# artifacts have no subtables
LIST_SUBTABLES = []
|||
|
|||
|
|||
class Artifact(model.entity.Entity):
    """
    entity of a test artifact; has plain fields plus a generic attributes node,
    no subtables.
    """
    FIELD_ID = "arid"
    LIST_FIELDS = [FIELD_ID, D.FIELD_NAME, "artype", B.SUBJECT_DESCRIPTION, B.SUBJECT_REFERENCE,
                   B.SUBJECT_PROJECT, B.SUBJECT_COMP, B.SUBJECT_TESTCASE]
    """ list of object-attributes """
    LIST_NODES = [B.NODE_ATTRIBUTES]

    LIST_SUBTABLES = []
    # default attribute values of an instance
    name = ""
    description = ""
    prelease = ""
    testsuites = {}
    steps = []


    def read_entity(self, job, name):
        """
        reads the entity from the file-system
        :param job:
        :param name:
        :return:
        """
        # there is no file-backed configuration for artifacts yet,
        # so only the attribute skeleton is initialized
        cfg = {}
        return self.setAttributes(job, cfg, name, self.LIST_FIELDS, self.LIST_NODES, self.LIST_SUBTABLES)
|||
|
|||
@ -0,0 +1,117 @@ |
|||
#!/usr/bin/python |
|||
# -*- coding: utf-8 -*- |
|||
# --------------------------------------------------------------------------------------------------------- |
|||
# Author : Ulrich Carmesin |
|||
# Source : gitea.ucarmesin.de |
|||
# --------------------------------------------------------------------------------------------------------- |
|||
import os |
|||
import basic.program |
|||
import basic.constants as B |
|||
import tools.path_const as P |
|||
import tools.data_const as D |
|||
import tools.config_tool |
|||
import tools.path_tool |
|||
import tools.file_tool |
|||
# import tools.tdata_tool |
|||
|
|||
# error-message templates for catalog lookups
EXP_KEY_MISSING = "key is missing {}"
EXP_KEY_DOESNT_EXIST = "key doesnt exist in domain {}"
|||
|
|||
class Catalog:
    """
    in this class there should be managed each defined key-value-pairs
    the pairs ara loaded from the path testdata/catalog:
    * initially the csv-file catalog.csv
    * on demand other csv-files in the path
    """
    __instance = None

    def __init__(self):
        # domain-name -> {key -> value(s)}, filled lazily by readDomain()
        self.catalog = {}
        Catalog.__instance = self
        pass


    @staticmethod
    def getInstance():
        """return the singleton instance, creating it on first use"""
        # bugfix: identity comparison instead of "== None"
        if Catalog.__instance is None:
            return Catalog()
        return Catalog.__instance


    def getValue(self, job, domain, key, subkey=""):
        """
        this function gets the value of the domain an key
        :param job: job object carrying the message handler job.m
        :param domain: non-empty domain name
        :param key: non-empty key inside the domain
        :param subkey: optional key inside the value-dict of the key
        :return: the value; "" with an error set on job.m if the lookup fails
        :raises Exception: if the domain argument is not a non-empty string
        """
        # bugfix: the original guard "not (isinstance(x, str) or len(x) < 1)"
        # never rejected empty strings; reject non-string or empty names now
        if not isinstance(domain, str) or len(domain) < 1:
            raise Exception(EXP_KEY_MISSING, (domain, key))
        if not isinstance(key, str) or len(key) < 1:
            job.m.setError(EXP_KEY_MISSING+" ("+domain+", "+key+")")
            return ""

        if domain not in self.catalog:
            self.readDomain(domain, job)
        if key not in self.catalog[domain]:
            job.m.setError(EXP_KEY_DOESNT_EXIST+" ("+domain+", "+key+")")
            return ""
        if len(subkey) > 0:
            if subkey not in self.catalog[domain][key]:
                job.m.setError(EXP_KEY_DOESNT_EXIST + " (" + domain + ", " + key + ", " + subkey + ")")
                return ""
            return self.catalog[domain][key][subkey].strip()
        return self.catalog[domain][key]


    def getKeys(self, domain, job):
        """
        this function gets the keys of the domain
        :param domain: non-empty domain name
        :param job:
        :return: list of keys; [] if the domain could not be loaded
        :raises Exception: if the domain argument is not a non-empty string
        """
        # bugfix: see getValue -- reject non-string or empty domain
        if not isinstance(domain, str) or len(domain) < 1:
            raise Exception(EXP_KEY_MISSING, (domain))

        if domain not in self.catalog:
            self.readDomain(domain, job)
        if domain not in self.catalog:
            return []
        return list(self.catalog[domain].keys())


    def readDomain(self, domain, job):
        """
        this function reads the domain-entries from the csv-file of the domain
        :param domain: non-empty domain name
        :param job:
        :return: the raw file-data dict (the keys-node is cached in self.catalog)
        :raises Exception: if the domain argument is invalid or no config-path exists
        """
        # bugfix: see getValue -- reject non-string or empty domain
        if not isinstance(domain, str) or len(domain) < 1:
            raise Exception(EXP_KEY_MISSING, (domain))
        if domain in self.catalog:
            return self.catalog[domain]
        pathname = tools.config_tool.select_config_path(job, P.KEY_CATALOG, domain)
        if pathname is None:
            raise Exception(EXP_KEY_MISSING, (domain))
        # the message handler is optional on the job
        msg = job.m if hasattr(job, "m") else None
        data = tools.file_tool.read_file_dict(job, pathname, msg, D.CSV_SPECTYPE_CTLG)
        if hasattr(job, "m"):
            job.m.debug(12, "domain " + domain + " readed from " + pathname)
        self.catalog[domain] = data[B.DATA_NODE_KEYS]
        return data


    def exportXSD(self, domain):
        """
        this function exports the domain into xsd-declaration of simple types
        :return:
        """
        # not implemented yet
        pass
|||
|
|
|
|
|
@ -0,0 +1,146 @@ |
|||
# --------------------------------------------------------------------------------------------------------- |
|||
# Author : Ulrich Carmesin |
|||
# Source : gitea.ucarmesin.de |
|||
# --------------------------------------------------------------------------------------------------------- |
|||
import os |
|||
import basic.toolHandling |
|||
import basic.constants as B |
|||
import model.entity |
|||
import model.factory |
|||
import tools.data_const as D |
|||
import tools.path_const as P |
|||
import tools.config_tool as config_tool |
|||
import tools.file_tool as file_tool |
|||
import tools.git_tool |
|||
import tools.file_type |
|||
|
|||
# database tables of the component entity: main table plus its subtables
TABLE_NAMES = ["component", "co_step", "co_table", "co_artifact", "co_comps"]
# default synchronisation strategy between git-config and database
DEFAULT_SYNC = model.entity.SYNC_FULL_GIT2DB

TABLE_NAME = "component"
""" system-name for this entity """
FIELD_ID = "coid"

# subject-keys of the component subtables
CP_SUBJECT_COMPS = "components"
CP_SUBJECT_STEPS = "steps"
CP_SUBJECT_TABLES = "tables"
CP_SUBJECT_ARTS = B.SUBJECT_ARTIFACTS
LIST_CP_SUBJECTS = [CP_SUBJECT_COMPS, CP_SUBJECT_STEPS, CP_SUBJECT_TABLES, CP_SUBJECT_ARTS]

# attribute-keys describing how a component relates to its environment
REL_ATTR_TYPE = "relationtyp"
REL_ATTR_FILE = "conffile"
REL_ATTR_FTYPE = "filetyp"
REL_ATTR_IP_PATTERN = "ippattern"
REL_ATTR_HOST_PATTERN = "hostpattern"
REL_ATTR_PORT_PATTERN = "portpattern"
REL_ATTR_URL_PATTERN = "urlpattern"
LIST_REL_ATTR = [REL_ATTR_TYPE, REL_ATTR_FILE, REL_ATTR_FTYPE,
                 REL_ATTR_IP_PATTERN, REL_ATTR_HOST_PATTERN, REL_ATTR_PORT_PATTERN, REL_ATTR_URL_PATTERN]
|||
|
|||
|
|||
def select_components(job, project, application):
    """
    collect the names of all component folders configured in the workspace.
    Technical folders (catalog/config/test/tools), hidden entries and plain
    files are skipped.
    :param job:
    :param project: currently unused select-criteria
    :param application: currently unused select-criteria
    :return: list of component names
    """
    result = []
    appl = tools.config_tool.getConfig(job, P.KEY_BASIC, B.SUBJECT_APPS)
    basePath = job.conf[B.TOPIC_PATH][B.ATTR_PATH_COMPS]
    for entry in os.listdir(basePath):
        # skip the well-known non-component folders
        if entry in ["catalog", "config", "test", "tools"]:
            continue
        # skip hidden / internal entries
        if entry[0:1] in [".", "_"]:
            continue
        # only directories can be components
        if not os.path.isdir(os.path.join(basePath, entry)):
            continue
        result.append(entry)
    return result
|||
|
|||
class Component(model.entity.Entity):
    """
    entity of a software component of the application under test;
    configured per component-folder in the components path.
    """
    FIELD_ID = "coid"
    LIST_FIELDS = [FIELD_ID, D.FIELD_NAME, B.SUBJECT_DESCRIPTION, B.SUBJECT_REFERENCE, B.SUBJECT_PROJECT]
    """ list of object-attributes """
    LIST_NODES = [B.NODE_ATTRIBUTES, B.DATA_NODE_TOPICS]
    LIST_SUBTABLES = [B.SUBJECT_ARTIFACTS, B.SUBJECT_COMPS, B.SUBJECT_STEPS, B.SUBJECT_DATATABLES]
    # prefix of the subtable-names in the database ("co_...")
    PREFIX_SUBTABLE = "co"
    # default attribute values of an instance
    coid = 0
    name = ""
    description = ""
    reference = ""
    project = ""
    application = ""
    attributes = ""

    def read_unique_names(self, job, project, application, gran, args, ttype: str=""):
        """
        reads the entity-names from file-storage
        :param job:
        :param opt. project: select-criteria if used and defined
        :param opt. application: select-criteria if used and defined
        :param opt. gran: granularity values testcase / testsuite / testplan
        :param opt. args additional args
        :return: list of entity-names
        """
        # forward the search to the application if one is given
        if application != "":
            # NOTE(review): getApplication() is called without arguments here --
            # confirm the factory signature, other callers may pass job/name
            app = model.factory.getApplication()
            return list(app.components.keys())
        path = os.path.join(job.conf[B.TOPIC_PATH][B.ATTR_PATH_COMPS])
        outList = self.getDirlist(job, path, "csv")
        return outList

    def read_entity(self, job, name):
        """
        reads the entity from the file-system
        :param job:
        :param name:
        :return:
        """
        config = self.getConfig(job, P.KEY_COMP, tools.config_tool.get_plain_filename(job, name), "", ttype=B.SUBJECT_COMP)
        return self.setAttributes(job, config, name, self.LIST_FIELDS, self.LIST_NODES, self.LIST_SUBTABLES)

    @staticmethod
    def rebuild_data(job, data: dict) -> dict:
        """
        gets the subtable-tag from filecsv and sets the subtables in order to workable entity-elements
        :param job:
        :param data:
        :return:
        """
        data = tools.file_type.popSubjectsNode(job, data)
        # data = tools.file_type.popNameNode(job, data)
        return data

    def check_data(self, job, data: dict) -> dict:
        """
        it checks the data for the specific form.
        NOTE(review): the node lists are identical to Application.check_data --
        confirm that COMPS is really the mandatory node for component configs.
        :param job:
        :param data: configuration dict, one top-level entry per config
        :return: the (unchanged) data
        """
        checkNodes = {}
        checkNodes[tools.file_type.MUST_NODES] = [B.SUBJECT_COMPS]
        checkNodes[tools.file_type.MUSTNT_NODES] = [B.DATA_NODE_DATA, B.DATA_NODE_HEADER, B.DATA_NODE_FIELDS, B.DATA_NODE_KEYS]
        checkNodes[tools.file_type.OPT_NODES] = [B.SUBJECT_APPS, B.SUBJECT_VARIANTS, B.SUBJECT_USECASES]
        for conf in data:
            tools.file_type.check_nodes(job, data[conf], checkNodes)
        return data


    # the persistence operations below are intentionally no-ops --
    # presumably components are file-managed only; TODO confirm
    def write_entity(self, job, name):
        return

    def remove_entity(self, job, name):
        return
    def select_entity(self, job, name):
        return

    def update_entity(self, job, name):
        return

    def delete_entity(self, job, name):
        return
|||
|
|||
@ -0,0 +1,11 @@ |
|||
import basic.constants as B

# synchronisation strategies between the git-managed file-config and the database
SYNC_FULL_GIT2DB = "full-git-db"
SYNC_HEAD_GIT2DB = "head-git-db"
SYNC_COPY_FILE2DB = "copy-file-db"
SYNC_ONLY_GIT = "only-git"
SYNC_ONLY_DB = "only-db"

# the two storage backends an entity can live in
STORAGE_DB = B.TOPIC_NODE_DB
STORAGE_FILE = B.TOPIC_NODE_FILE

LIST_ENTITY_SYNC = [SYNC_ONLY_GIT, SYNC_FULL_GIT2DB, SYNC_HEAD_GIT2DB, SYNC_COPY_FILE2DB, SYNC_ONLY_DB]
|||
|
@ -0,0 +1,58 @@ |
|||
# --------------------------------------------------------------------------------------------------------- |
|||
# Author : Ulrich Carmesin |
|||
# Source : gitea.ucarmesin.de |
|||
# --------------------------------------------------------------------------------------------------------- |
|||
import os |
|||
import basic.constants as B |
|||
import model.entity |
|||
import tools.path_const as P |
|||
import tools.data_const as D |
|||
import tools.config_tool |
|||
import tools.file_tool |
|||
import tools.git_tool |
|||
|
|||
# module-level default layout of the datatable entity
FIELD_ID = "dtid"
FIELD_NAME = "name"
FIELD_DESCRIPTION = B.SUBJECT_DESCRIPTION
FIELD_REFERENCE = B.SUBJECT_REFERENCE
FIELD_COMPONENT = B.SUBJECT_COMP
FIELD_ATTRIBUTES = B.NODE_ATTRIBUTES
FIELD_HEADER = ""
LIST_FIELDS = [FIELD_ID, FIELD_NAME, FIELD_DESCRIPTION, FIELD_REFERENCE, FIELD_COMPONENT]
LIST_NODES = [B.DATA_NODE_HEADER, B.DATA_NODE_DATA, B.DATA_NODE_FIELDS]

# NOTE(review): an empty dict, while the class below uses an empty list --
# iteration behaves the same, but presumably a list was intended; confirm
LIST_SUBTABLES = {}
|||
|
|||
class Datatable(model.entity.Entity):
    """
    entity of a data-table definition, read from the catalog/tables path.
    """
    FIELD_ID = "dtid"
    LIST_FIELDS = [FIELD_ID, D.FIELD_NAME, "dtdatabase", "dtschema",
                   B.SUBJECT_DESCRIPTION, B.SUBJECT_REFERENCE, B.SUBJECT_COMP]
    LIST_NODES = [B.DATA_NODE_HEADER, B.DATA_NODE_DATA, B.DATA_NODE_FIELDS, B.NODE_ATTRIBUTES, "fieldnames"]

    LIST_SUBTABLES = []
    # default attribute values of an instance
    dcid = 0
    document = ""
    description = ""
    project = ""
    reference = ""


    def read_unique_names(self, job, project, application, gran, args, ttype: str=""):
        """
        reads the entity-names from file-storage
        :param job:
        :param opt. project: select-criteria if used and defined
        :param opt. application: select-criteria if used and defined
        :param opt. gran: granularity values testcase / testsuite / testplan
        :param opt. args additional args
        :return: list of entity-names
        """
        path = os.path.join(job.conf[B.TOPIC_PATH][B.ATTR_PATH_COMPS], P.KEY_CATALOG, P.VAL_TABLES)
        outList = self.getDirlist(job, path, "csv")
        return outList

    def read_entity(self, job, name):
        # NOTE(review): passes the MODULE-level LIST_FIELDS/LIST_NODES/LIST_SUBTABLES,
        # not the class-level self.* lists defined above -- confirm this is intended
        # (other entities, e.g. Application, use the self.* lists here).
        config = self.getConfig(job, P.KEY_CATALOG, name, tools.config_tool.get_plain_filename(job, name))
        return self.setAttributes(job, config, name, LIST_FIELDS, LIST_NODES, LIST_SUBTABLES)
|||
|
|||
|
|||
|
|
@ -0,0 +1,643 @@ |
|||
import getpass
import os
import re

import basic.constants as B
import basic.toolHandling
#import model.factory
# import model.entity
import tools.config_tool
import tools.data_const as D
import tools.data_tool
import tools.date_tool
import tools.db_abstract
import tools.file_tool
import tools.path_const as P
|||
|
|||
# administrative column names every entity table carries
ENTITY_NAME = "name"
ENTITY_ATTRIBUTES = B.NODE_ATTRIBUTES
ENTITY_INS_COMMIT = "inscommit"
ENTITY_INS_AUTHOR = "insauthor"
ENTITY_INS_TIME = "instime"
ENTITY_UPD_COMMIT = "updcommit"
ENTITY_UPD_AUTHOR = "updauthor"
ENTITY_UPD_TIME = "updtime"
ENTITY_ACTUAL = "actual"
# marker value of the currently valid row
VAL_ACTUAL = 1
# sentinel timestamp meaning "never stored"
VAL_ZERO_TIME = "2000-01-01_00-00-00"
ENTITY_FIELDS = [ENTITY_INS_COMMIT, ENTITY_INS_AUTHOR, ENTITY_INS_TIME,
                 ENTITY_UPD_COMMIT, ENTITY_UPD_AUTHOR, ENTITY_UPD_TIME, ENTITY_ACTUAL]
# synchronisation strategies between git-managed file-config and database
SYNC_FULL_GIT2DB = "full-git-db"
SYNC_HEAD_GIT2DB = "head-git-db"
SYNC_COPY_FILE2DB = "copy-file-db"
SYNC_ONLY_GIT = "only-git"
SYNC_ONLY_DB = "only-db"

# the two storage backends an entity can live in
STORAGE_DB = B.TOPIC_NODE_DB
STORAGE_FILE = B.TOPIC_NODE_FILE

LIST_ENTITY_SYNC = [SYNC_ONLY_GIT, SYNC_FULL_GIT2DB, SYNC_HEAD_GIT2DB, SYNC_COPY_FILE2DB, SYNC_ONLY_DB]

# removed leftover debug output at import time:
# print("is importing module.entity")
|||
def getEntityValue(job, field, gitcommit):
    """
    resolve the value for one administrative entity column.
    :param job: unused here, kept for a uniform call signature
    :param field: one of the ENTITY_* column names
    :param gitcommit: dict with "commit", "author" and "date" of the last commit
    :return: the column value, or None for an unknown field name
    """
    # lazy dispatch table: only the matched resolver is evaluated
    resolvers = {
        ENTITY_INS_COMMIT: lambda: "",
        ENTITY_INS_AUTHOR: lambda: getpass.getuser(),
        ENTITY_INS_TIME: lambda: tools.date_tool.getActdate(tools.date_tool.F_DIR),
        ENTITY_UPD_COMMIT: lambda: gitcommit["commit"],
        ENTITY_UPD_AUTHOR: lambda: gitcommit["author"],
        ENTITY_UPD_TIME: lambda: gitcommit["date"],
        ENTITY_ACTUAL: lambda: VAL_ACTUAL,
    }
    resolver = resolvers.get(field)
    if resolver is not None:
        return resolver()
    # unknown field: same fall-through result as the original if-chain
    return None
|||
|
|||
|
|||
class Entity: |
|||
""" system-name for this entity """ |
|||
FIELD_ID = "" |
|||
LIST_FIELDS = [] |
|||
""" list of object-attributes """ |
|||
LIST_NODES = [] |
|||
LIST_SUBTABLES = [] |
|||
PREFIX_SUBTABLE = "" |
|||
|
|||
def __init__(self, job, entityname: str = "", name: str = "", args: dict = {}): |
|||
import model.table |
|||
self.job = job |
|||
if entityname == "": |
|||
classname = str(self) |
|||
a = classname.split(".") |
|||
entityname = a[1] |
|||
entityname = tools.data_tool.getSingularKeyword(entityname) |
|||
self.entityname = entityname |
|||
if entityname not in ["", "table"]: |
|||
self.setDdlAttributes(job, entityname) |
|||
for f in self.ddls[entityname][model.table.LISTNAME_SUBTABLE]: |
|||
self.setDdlAttributes(job, self.PREFIX_SUBTABLE + "_" + tools.data_tool.getSingularKeyword(f)) |
|||
if len(name) > 1: |
|||
self.getEntity(job, name, args) |
|||
|
|||
|
|||
    def setDdlAttributes(self, job, entityname: str=""):
        """
        load the ddl (table-definition) of the entity and derive the lists of
        plain fields, node fields and subtables; each derived list is verified
        against the hard-coded class constants LIST_FIELDS / LIST_NODES /
        LIST_SUBTABLES and a mismatch raises an Exception.
        :param job:
        :param entityname: table-name; skipped for "" and the generic datatable
        :return:
        """
        import model.table
        self.ddls = {}
        ddlargs = {model.table.TYPE_CONTEXT: B.ATTR_INST_TESTSERVER}
        if entityname not in ["", B.SUBJECT_DATATABLES]:
            table = model.table.Table(job)
            table = table.read_entity(job, self.entityname, args=ddlargs)
            self.ddls[entityname] = {}
            self.ddls[entityname][model.table.LISTNAME_DDLNAMES] = getattr(table, model.table.LISTNAME_DDLNAMES)
            self.ddls[entityname][model.table.LISTNAME_DDLFIELDS] = getattr(table, model.table.LISTNAME_DDLFIELDS)
            listFields = []
            listNodes = []
            listSubtables = []
            # classify each ddl-field: subject -> subtable, "jlob"-format -> node,
            # administrative column -> skipped, anything else -> plain field
            for f in self.ddls[entityname][model.table.LISTNAME_DDLNAMES]:
                if self.ddls[entityname][model.table.LISTNAME_DDLFIELDS][f][D.DDL_FIELD] in B.LIST_SUBJECTS:
                    listSubtables.append(f)
                elif self.ddls[entityname][model.table.LISTNAME_DDLFIELDS][f][D.DDL_FORMAT] in ["jlob"]:
                    listNodes.append(f)
                elif self.ddls[entityname][model.table.LISTNAME_DDLFIELDS][f][D.DDL_FIELD] in table.LIST_ADMINFIELDS:
                    pass
                else:
                    listFields.append(f)
            self.ddls[entityname][model.table.LISTNAME_FIELDS] = listFields
            self.ddls[entityname][model.table.LISTNAME_NODES] = listNodes
            self.ddls[entityname][model.table.LISTNAME_SUBTABLE] = listSubtables
            # check LISTEN ... hard coded vs. configuered
            # TODO why hard coded const ??
            # a-checks: every configured field must appear in the class constants
            for f in listFields:
                if f not in self.LIST_FIELDS:
                    raise Exception(entityname + " " + str(self) + " a check list <-> LIST_FIELDS " + f)
            for f in listNodes:
                if f not in self.LIST_NODES:
                    raise Exception(entityname + " " + str(self) + " a check list <-> LIST_NODES " + f)
            for f in listSubtables:
                if f not in self.LIST_SUBTABLES:
                    raise Exception(entityname + " " + str(self) + " a check list <-> LIST_SUBTABLES " + f)
            # b-checks: every class constant must appear in the configuration
            for f in self.LIST_FIELDS:
                if f not in listFields:
                    raise Exception(entityname + " " + str(self) + " b check list <-> LIST_FIELDS " + f)
            for f in self.LIST_NODES:
                # generic data-nodes and private "_"-nodes are not part of the ddl
                if f in B.LIST_DATA_NODE or f[:1] == "_":
                    continue
                if f not in listNodes:
                    raise Exception(entityname + " " + str(self) + " b check list <-> LIST_NODES " + f)
            for f in self.LIST_SUBTABLES:
                if f not in listSubtables:
                    raise Exception(entityname + " " + str(self) + " b check list <-> LIST_SUBTABLES " + f)
|||
|
|||
|
|||
def get_unique_names(self, job, storage = "", project = "", application = "", gran = "", |
|||
ttype: str = "", args: dict = {}) -> list: |
|||
""" |
|||
gets the entity-names from the defined storage - the field name must be an unique identifier |
|||
:param job: |
|||
:param storage: opt. values db / files - default files |
|||
:param project: opt. select-criteria if used and defined |
|||
:param application: opt. select-criteria if used and defined |
|||
:param gran: opt. granularity values testcase / testsuite / testplan |
|||
:param ttype: opt. ddd |
|||
:param args: opt. additional args |
|||
:return: list of entity-names |
|||
""" |
|||
entityNames = [] |
|||
if storage == STORAGE_DB: |
|||
entityNames = self.select_unique_names(job, project, application, gran, args) |
|||
elif storage == STORAGE_FILE: |
|||
entityNames = self.read_unique_names(job, project, application, gran, args) |
|||
else: |
|||
entityNames = self.read_unique_names(job, project, application, gran, args) |
|||
return [item for item in entityNames if item not in B.LIST_DATA_NODE] |
|||
|
|||
def select_unique_names(self, job, project, application, gran, args): |
|||
""" |
|||
reads the entity-names from file-storage |
|||
:param job: |
|||
:param project: opt. select-criteria if used and defined |
|||
:param application: opt. select-criteria if used and defined |
|||
:param gran: opt. granularity values testcase / testsuite / testplan |
|||
:param args: opt. additional args |
|||
:return: list of entity-names |
|||
""" |
|||
raise Exception(B.EXCEPT_NOT_IMPLEMENT) |
|||
|
|||
def get_entities(self, job, storage="", project="", application="", gran="", ttype="", args={}): |
|||
""" |
|||
gets the entity-names from the defined storage |
|||
:param job: |
|||
:param storage: opt. values db / files - default files |
|||
:param project: opt. select-criteria if used and defined |
|||
:param application: opt. select-criteria if used and defined |
|||
:param gran: opt. granularity values testcase / testsuite / testplan |
|||
:param args: opt. additional args |
|||
:return: list of entity-names |
|||
""" |
|||
entities = [] |
|||
entityNames = self.get_unique_names(job, storage=storage, project=project, application=application, |
|||
gran=gran, args=args, ttype=ttype) |
|||
for k in entityNames: |
|||
if storage == STORAGE_DB: |
|||
entity = self.select_entity(job, k) |
|||
elif storage == STORAGE_FILE: |
|||
print(" entity.read_e "+ k) |
|||
entity = self.read_entity(job, k) |
|||
else: |
|||
entity = self.read_entity(job, k) |
|||
entities.append(entity) |
|||
return entities |
|||
|
|||
def read_unique_names(self, job, project, application, gran, args, ttype: str=""): |
|||
""" |
|||
reads the entity-names from file-storage |
|||
:param job: |
|||
:param project: select-criteria if used and defined |
|||
:param application: select-criteria if used and defined |
|||
:param gran: granularity values testcase / testsuite / testplan |
|||
:param args additional args |
|||
:return: list of entity-names |
|||
""" |
|||
raise Exception(B.EXCEPT_NOT_IMPLEMENT) |
|||
|
|||
    def setDbAttributes(self, job, tables):
        """
        set the db-attributes like connection and ddl
        :param job:
        :param tables: list of table-names
        :return:
        """
        # message handler of the job is reused by the entity
        setattr(self, "m", job.m)
        config = {}
        config[B.TOPIC_CONN] = job.conf[B.TOPIC_NODE_DB]
        config[B.DATA_NODE_DDL] = {}
        for t in tables:
            # requires tools.db_abstract to be imported at the module top
            ddl = tools.db_abstract.get_ddl(job, B.ATTR_INST_TESTSERVER, t)
            config[B.DATA_NODE_DDL][t] = ddl
        setattr(self, "conf", config)
|||
|
|||
def getEntity(self, job, name: str, args: dict={}): |
|||
if len(args) > 0: |
|||
self.set_entity(job, name, args) |
|||
elif B.TOPIC_NODE_DB in job.conf: |
|||
self.select_entity(job, name) |
|||
#self.read_entity(job, name) |
|||
else: |
|||
self.read_entity(job, name) |
|||
|
|||
def set_entity(self, job, name: str, args: dict): |
|||
setattr(self, D.FIELD_NAME, name) |
|||
for k in self.LIST_FIELDS: |
|||
if k in args: |
|||
setattr(self, k, args[k]) |
|||
for k in self.LIST_SUBTABLES: |
|||
if k in args: |
|||
setattr(self, k, args[k]) |
|||
for k in self.LIST_NODES: |
|||
if k in args: |
|||
setattr(self, k, args[k]) |
|||
|
|||
|
|||
def read_entity(self, job, name): |
|||
""" |
|||
reads the entity from the file-system |
|||
:param job: |
|||
:param name: |
|||
:return: |
|||
""" |
|||
raise Exception(B.EXCEPT_NOT_IMPLEMENT) |
|||
|
|||
@staticmethod |
|||
def rebuild_data(job, tdata: dict) -> dict: |
|||
""" |
|||
gets the subtable-tag from filecsv and sets the subtables in order to workable entity-elements |
|||
:param job: |
|||
:param tdata: |
|||
:return: |
|||
""" |
|||
raise Exception(B.EXCEPT_NOT_IMPLEMENT) |
|||
|
|||
def check_data(self, job, tdata: dict) -> dict: |
|||
""" |
|||
it checks the data for the specific form |
|||
:param job: |
|||
:param tdata: |
|||
:param ttype: |
|||
:return: |
|||
""" |
|||
raise Exception(B.EXCEPT_NOT_IMPLEMENT) |
|||
|
|||
def select_entity(self, job, name): |
|||
""" |
|||
reads the entity from the database |
|||
it should get the same result like read_entity |
|||
:param job: |
|||
:param name: |
|||
:return: |
|||
""" |
|||
raise Exception(B.EXCEPT_NOT_IMPLEMENT) |
|||
|
|||
def write_entity(self, job, name): |
|||
""" |
|||
writes the entity into the file-system |
|||
it similar to update_entity |
|||
:param job: |
|||
:param name: |
|||
:return: |
|||
""" |
|||
raise Exception(B.EXCEPT_NOT_IMPLEMENT) |
|||
|
|||
def insert_entity(self, job, name): |
|||
""" |
|||
inserts the entity into the database |
|||
it similar to update_entity |
|||
:param job: |
|||
:param name: |
|||
:return: |
|||
""" |
|||
raise Exception(B.EXCEPT_NOT_IMPLEMENT) |
|||
|
|||
def update_entity(self, job, name): |
|||
""" |
|||
writes the entity into the database |
|||
it similar to update_entity |
|||
:param job: |
|||
:param name: |
|||
:return: |
|||
""" |
|||
raise Exception(B.EXCEPT_NOT_IMPLEMENT) |
|||
|
|||
def remove_entity(self, job, name): |
|||
""" |
|||
removes the entity from the file-system |
|||
it similar to delete_entity |
|||
:param job: |
|||
:param name: |
|||
:return: |
|||
""" |
|||
raise Exception(B.EXCEPT_NOT_IMPLEMENT) |
|||
|
|||
def removeEntity(self, job, name, storagepath, ext): |
|||
""" |
|||
removes the entity from the file-system |
|||
it similar to delete_entity |
|||
:param job: |
|||
:param name: single substring or list of name or dict of names with the keys as |
|||
:return: |
|||
""" |
|||
nameList = [] |
|||
if isinstance(name, dict): |
|||
nameList = name.keys() |
|||
elif isinstance(name, list): |
|||
nameList = name |
|||
else: |
|||
nameList.append(name) |
|||
for name in nameList: |
|||
pathname = os.path.join(storagepath, name + "." + ext) |
|||
os.remove(pathname) |
|||
|
|||
def delete_entity(self, job, name, table): |
|||
""" |
|||
deletes the entity into the database |
|||
it similar to update_entity |
|||
:param job: |
|||
:param name: |
|||
:return: |
|||
""" |
|||
raise Exception(B.EXCEPT_NOT_IMPLEMENT) |
|||
|
|||
""" 2023-05 """ |
|||
    @staticmethod
    def getConfig(job, module: str, subject: str, name: str, ttype: str = D.CSV_SPECTYPE_DDL) -> dict:
        """
        reads the configuration of the subject and extracts the part belonging
        to "name"; with an empty name the whole (normalized) config is returned.
        :param job:
        :param module: config-module key, e.g. P.KEY_BASIC
        :param subject: subject-key the config is normalized under
        :param name: opt. entry to extract; "" returns the whole config
        :param ttype: opt. config-type tag, default ddl
        :return: dict with the requested config part
        :raises Exception: if no config exists for the name
        """
        config = tools.config_tool.getConfig(job, module, subject, ttype=ttype)
        oldConfig = config
        if config is not None:
            # normalize: ensure the subject is the single top-level node
            if subject not in config:
                newConfig = {}
                newConfig[subject] = {}
                for k in config:
                    newConfig[subject][k] = config[k]
                config = newConfig
            pass
            if len(name) == 0:
                return config
            elif name in config[subject]:
                outConfig = {}
                outConfig[name] = config[subject][name]
                return outConfig
            elif B.DATA_NODE_KEYS in config[subject] \
                    and name in config[subject][B.DATA_NODE_KEYS]:
                # if csv-data is a catalog
                outConfig = {}
                outConfig[name] = config[subject][B.DATA_NODE_KEYS][name]
                return outConfig
            elif name == subject:
                return config
        raise Exception("keine Config zu "+name)
|||
|
|||
@staticmethod |
|||
def set_subtables(job, tdata: dict) -> dict: |
|||
""" |
|||
gets the subtable-tag from filecsv and sets the subtables in order to workable entity-elements |
|||
:param job: |
|||
:param tdata: |
|||
:return: |
|||
""" |
|||
raise Exception("not implemented ") |
|||
|
|||
|
|||
@staticmethod
def getDirlist(job, path, ext) -> list:
    """
    Collect entry names under *path*.

    With ext == "" it returns sub-directory names; otherwise it returns the
    basenames of files ending in ".<ext>" (extension and the separating dot
    are stripped). Hidden entries ("." / "_" prefix) and the well-known tool
    folders are always skipped.

    :param job: current job context (unused here)
    :param path: directory to scan
    :param ext: file extension without the leading dot, or "" for directories
    :return: list of names
    """
    # fix: removed unused local `xx`; flattened the redundant else-branch
    # (the ext == "" branch always continues, so the else was superfluous)
    outList = []
    for k in os.listdir(path):
        if k[:1] in [".", "_"]:
            continue
        if k in [P.KEY_CATALOG, P.KEY_TOOL, P.VAL_CONFIG, P.VAL_TEST, P.VAL_TOOLS]:
            continue
        if ext == "":
            if not os.path.isdir(os.path.join(path, k)):
                continue
            outList.append(k)
            continue
        if not os.path.isfile(os.path.join(path, k)):
            continue
        if len(k) < len(ext):
            continue
        if ext != k[-len(ext):]:
            continue
        # strip ".<ext>" — the extra -1 removes the dot before the extension
        outList.append(k[:-len(ext)-1])
    return outList
|||
|
|||
def setAttributes(self, job, config, rootname, fields, nodes, subjects):
    """
    it sets the attributes of config into the entity-object
    :param job: current job context; stored on self when not None
    :param config: dictionary of readed specification resp. configuration
    :param rootname: rootname of config (top-level node to read from)
    :param fields: list of field-names, the model-const LIST_FIELDS
    :param nodes: list of node-names, the model-const LIST_NODES
    :param subjects: list of subtables-names, the model-const LIST_SUBTABLES
    :return: self (also returned unchanged when rootname is absent in config)
    """
    """ 2023-05 """
    # local import to avoid a circular dependency between entity and factory
    import model.factory
    verify = False
    if not job is None:
        self.job = job
    if rootname not in config:
        # nothing to apply — leave the object untouched
        return self
    # --- scalar fields and plain nodes ---------------------------------
    for k in fields + nodes:
        key = tools.data_tool.getExistKeyword(k, config[rootname])
        if verify: print("setFields " + k + " / " + key)
        if key in ["", D.FIELD_PROJECT]:
            # unknown keyword or project-field: skip
            continue
        if verify: print("setFields " + str(k) + " = " + str(config[rootname][key]))
        if k in fields:
            # scalar field: store the stringified value under the singular name
            setattr(self, tools.data_tool.getSingularKeyword(k), tools.data_tool.getValueStr(config[rootname][key]))
        elif k == "fieldnames":
            # special case: fieldnames are stored under the plural name
            setattr(self, tools.data_tool.getPluralKeyword(k), config[rootname][key])
        else:
            # node: store the raw value under the singular name
            setattr(self, tools.data_tool.getSingularKeyword(k), config[rootname][key])
    setattr(self, D.FIELD_NAME, rootname)
    # --- subtables: build child entity-objects via the factory ---------
    for k in subjects:
        # tables: { person: { _header: [] , _data: {} } }
        #
        if k in [B.DATA_NODE_DATA, B.DATA_NODE_HEADER, B.DATA_NODE_FIELDS, B.DATA_NODE_ROW]:
            continue
        objects = {}
        key = tools.data_tool.getExistKeyword(k, config[rootname])
        if key == "":
            continue
        if not isinstance(config[rootname][key], dict):
            continue
        for o in config[rootname][key]:
            if o in [B.DATA_NODE_DATA, B.DATA_NODE_HEADER, B.DATA_NODE_FIELDS, B.DATA_NODE_ROW, B.DATA_NODE_PATH]:
                continue
            args = {}
            # NOTE(review): unconditional debug print — presumably leftover; confirm
            print("### " + k + " " + o + " " + str(config[rootname][key][o]))
            if not isinstance(config[rootname][key][o], dict):
                # scalar entry: record the name only, no child object
                objects[k] = o
                continue
            for x in config[rootname][key][o]:
                args[x] = config[rootname][key][o][x]
            # args[k] = config[rootname][key][o]
            # NOTE(review): args[k] would raise KeyError here (args holds the
            # sub-entry's own keys, not k) — only reachable when verify is True
            if verify: print("setSubObject " + o + " = " + str(args[k]))
            # NOTE: local name `object` shadows the builtin — harmless here
            object = model.factory.get_entity_object(self.job, entityname=k, name=o, args=args)
            objects[object.getIDName()] = object
        if verify: print("setSubtables " + k + " = " + str(objects))
        setattr(self, k, objects)
    # --- topics node ---------------------------------------------------
    topics = {}
    key = tools.data_tool.getExistKeyword(B.DATA_NODE_TOPICS, config[rootname])
    if key != "":
        for k in B.LIST_TOPIC_NODES:
            if k in config[rootname][key]:
                topics[k] = config[rootname][key][k]
    setattr(self, tools.data_tool.getPluralKeyword(B.DATA_NODE_TOPICS), topics)
    return self
|||
|
|||
def getFieldList(self) -> list:
    """Expose the scalar attribute names of this entity (model-const LIST_FIELDS)."""
    return self.LIST_FIELDS
|||
|
|||
def getNodeList(self) -> list:
    """Expose the sub-node names (persistable in a clob-field): LIST_NODES."""
    return self.LIST_NODES
|||
|
|||
def getSubtableList(self) -> list:
    """Expose the sub-table names of this entity: LIST_SUBTABLES."""
    return self.LIST_SUBTABLES
|||
|
|||
def getPrefixSubtable(self) -> str:
    """Expose the table-name prefix used for this entity's sub-tables."""
    return self.PREFIX_SUBTABLE
|||
|
|||
def getSubtableNames(self) -> list:
    """Build the full sub-table names: '<PREFIX_SUBTABLE>_<subtable>' per entry."""
    return [self.PREFIX_SUBTABLE + "_" + t for t in self.LIST_SUBTABLES]
|||
|
|||
def getName(self) -> str:
    """Return this entity's name (may be derived from other attributes in subclasses)."""
    return self.name
|||
|
|||
def getIDName(self) -> str:
    """Return the unique id-name (here simply the name; subclasses may combine attributes)."""
    return self.name
|||
|
|||
def setSubtable(self, job, subtable, sublist):
    """
    Placeholder: intended to populate *subtable* from *sublist*.
    Currently a no-op — the loop body is empty and nothing is returned.
    """
    collected = {}
    for entry in sublist:
        pass
|||
|
|||
def getDbAttr(self, job):
    """Collect the DB connection attributes (host, user, database, passwd) from job.conf."""
    dbconf = job.conf[B.TOPIC_NODE_DB]
    wanted = [B.ATTR_DB_HOST, B.ATTR_DB_USER, B.ATTR_DB_DATABASE, B.ATTR_DB_PASSWD]
    return {attr: dbconf[attr] for attr in wanted}
|||
|
|||
def getDdl(self, job, ddl):
    """
    Deep-copy the ddl dictionary, enriching each field with its own name
    (under D.DDL_FIELD) and each table with a _header list of its field names.

    :param job: current job context (unused here)
    :param ddl: {table: {field: {attr: value}}}
    :return: enriched copy of *ddl*
    """
    # fix: removed the unconditional debug print that fired on every
    # field-attribute iteration ("entity-23 ...") — leftover debug output
    out = {}
    for t in ddl:
        out[t] = {}
        for f in ddl[t]:
            out[t][f] = {}
            for a in ddl[t][f]:
                out[t][f][a] = ddl[t][f][a]
            # each field carries its own name for downstream lookups
            out[t][f][D.DDL_FIELD] = f
        out[t][B.DATA_NODE_HEADER] = list(ddl[t].keys())
    return out
|||
|
|||
def createSchema(self, testserver):
    """
    creates the db-schema of this entity via the configured db-tool.
    NOTE(review): the actual execStatement call is commented out below, so the
    statements are only printed, never executed — confirm whether this is a
    deliberate dry-run.
    NOTE(review): self.get_schema() is called without arguments although the
    base signature is get_schema(tableName, tableObject) — confirm.
    :param testserver: passed through to the db-tool factory
    :return: an error-string when no DB is configured in job.conf, else None
    """
    if B.TOPIC_NODE_DB in self.job.conf:
        dbi = basic.toolHandling.getDbTool(self.job, testserver, self.job.conf[B.TOPIC_NODE_DB][B.ATTR_TYPE])
    else:
        return "No DB in job-config"
    sql = self.get_schema()
    print(sql)
    # statements are separated by ";\n"; fragments shorter than 3 chars are noise
    for s in sql.split(";\n"):
        if len(s) < 3: continue
        try:
            # dbi.execStatement(s+";", self.job.conf[B.TOPIC_NODE_DB])
            print("SQL executed: "+s)
        except Exception as e:
            raise Exception("Fehler bei createSchema "+s)
|||
|
|||
|
|||
def getHistoryFields(self):
    """
    Build the comma-separated schema fragment for the standard history
    columns (insert/update commit, author, time, plus the 'actual' flag).

    :return: schema string produced by the configured db-tool
    """
    dbtype = self.job.conf[B.TOPIC_NODE_DB][B.ATTR_TYPE]
    dbi = basic.toolHandling.getDbTool(self.job, None, dbtype)
    columns = [
        ("inscommit", D.TYPE_STR),
        ("insauthor", D.TYPE_STR),
        ("instime", D.TYPE_TIME),
        ("updcommit", D.TYPE_STR),
        ("updauthor", D.TYPE_STR),
        ("updtime", D.TYPE_TIME),
        ("actual", D.TYPE_INT),
    ]
    return ",".join(dbi.getSchemaAttribut(c, t) for c, t in columns)
|||
|
|||
def selectHistoryFields(self):
    """
    NOTE(review): the last line is a bare attribute access (`dbi.selectRows`
    without parentheses or arguments), not a call — this method currently does
    nothing beyond creating the db-tool. Presumably a select over the history
    fields was intended; confirm and complete.
    :return: an error-string when no DB is configured in job.conf, else None
    """
    if B.TOPIC_NODE_DB in self.job.conf:
        dbi = basic.toolHandling.getDbTool(self.job, self.testserver, self.job.conf[B.TOPIC_NODE_DB][B.ATTR_TYPE])
    else:
        return "No DB in job-config"
    # no-op: attribute access only — see NOTE in the docstring
    dbi.selectRows
|||
|
|||
def getHistoryIndex(self, table) -> str:
    """Return the schema statement (newline-terminated) for an index on the table's 'actual' column."""
    dbtype = self.job.conf[B.TOPIC_NODE_DB][B.ATTR_TYPE]
    dbi = basic.toolHandling.getDbTool(self.job, None, dbtype)
    return dbi.getSchemaIndex(table, "actual") + "\n"
|||
|
|||
def get_schema(self, tableName, tableObject):
    """
    Abstract placeholder: subclasses build the CREATE-TABLE statement(s) here.
    NOTE(review): createSchema() calls self.get_schema() without arguments —
    one of the two signatures needs adjusting.
    """
    pass
|||
|
|||
def insert_entity(self, job):
    """
    Insert the entity into the database (analogous to update_entity).
    No-op placeholder in this base class; subclasses override it.

    :param job: current job context
    :return: None
    """
    return None
|||
|
|||
def read_spec(job, testentity, testgran, specpath):
    """
    parses a specification file into a dict of head-attributes.
    NOTE(review): defined without *self* although it sits among instance
    methods — confirm whether it is meant to be module-level.
    :param job: current job context
    :param testentity: (unused here)
    :param testgran: (unused here)
    :param specpath: path of the specification file
    :return: dict of head-values; None when the file is marked deprecated;
             implicitly None when the file does not exist
    """
    if not os.path.isfile(specpath):
        return
    text = tools.file_tool.read_file_text(job, specpath, job.m)
    # "depricated" is the (misspelled) marker used in the spec files themselves;
    # [jJyY] accepts German ja / English yes flags
    if re.match(r".*?depricated;[jJyY]", text):
        return None
    spec = {}
    # each head-line has the form "head:<key>;<value...>"
    regex = re.compile(r".*\nhead:(.*?);(.+)")
    for res in regex.finditer(text):
        #res = re.search(r".*head:(.*?);(.+)\n", text)
        key = res.group(1)
        if key == B.SUBJECT_DESCRIPTION:
            spec[B.SUBJECT_DESCRIPTION] = res.group(2).replace(";", "")
        elif key in [B.SUBJECT_APPS, B.PAR_APP]:
            # application list: semicolons double as separators
            apps = res.group(2).replace(";", ",").split(",")
            spec[B.SUBJECT_APPS] = apps
        else:
            val = res.group(2).replace(";", "")
            spec[key] = val
    return spec
|||
|
|||
|
|||
|
@ -0,0 +1,122 @@ |
|||
# --------------------------------------------------------------------------------------------------------- |
|||
# Author : Ulrich Carmesin |
|||
# Source : gitea.ucarmesin.de |
|||
# --------------------------------------------------------------------------------------------------------- |
|||
import os |
|||
import basic.constants as B |
|||
import model.entity |
|||
import tools.path_const as P |
|||
import tools.config_tool |
|||
import tools.file_tool |
|||
import tools.git_tool |
|||
import tools.data_const as D |
|||
import tools.file_type |
|||
|
|||
TABLE_NAME = "environment"
""" system-name for this entity """
FIELD_ID = "enid"
# technical primary-key column of the environment table

FILE_EXTENSION = D.DFILE_TYPE_YML
# environments are stored as YAML files
UNIQUE_FIELDS = [D.FIELD_NAME]
""" unique business field as human identifer """
IDENTIFYER_FIELDS = [FIELD_ID]
""" unique technical field as technical identifer """

TABLE_NAMES = ["environment", "en_project", "en_component"]
# main table plus its subtables (prefix "en")
DEFAULT_SYNC = model.entity.SYNC_FULL_GIT2DB
# sync strategy: presumably git is the master and the db is rebuilt from it — TODO confirm
|||
|
|||
def select_environments(job, projectList):
    """
    searches and gets environments in which the applications of the project
    are declared as installed, filtered by parameter --environment.

    An environment directory is selected when one of its declared projects is
    in *projectList*, or when projectList is exactly ["ALL"].

    :param job: current job context (job.conf must contain the env path)
    :param projectList: list of project names, or ["ALL"] for no filter
    :return: {envdir: general-config-node}
    :raises Exception: when the environment directory does not exist
    """
    environments = {}
    path = job.conf[B.TOPIC_PATH][B.ATTR_PATH_ENV]
    if not os.path.exists(path):
        raise Exception("Umgebungsverzeichnis existiert nicht "+path)
    for envdir in os.listdir(path):
        if not os.path.isdir(os.path.join(path, envdir)):
            continue
        if envdir[0:1] == "_":
            # internal folders are skipped
            continue
        try:
            pathname = tools.config_tool.select_config_path(job, P.KEY_TOOL, "conn", envdir)
            doc = tools.file_tool.read_file_dict(job, pathname, job.m)
            for proj in doc[B.SUBJECT_ENVIRONMENT][B.CONF_NODE_GENERAL][B.SUBJECT_PROJECTS]:
                if proj in projectList:
                    environments[envdir] = doc[B.SUBJECT_ENVIRONMENT][B.CONF_NODE_GENERAL]
                elif len(projectList) == 1 and projectList[0] == "ALL":
                    environments[envdir] = doc[B.SUBJECT_ENVIRONMENT][B.CONF_NODE_GENERAL]
        except Exception:
            # fix: was a bare `except:` which also swallowed SystemExit /
            # KeyboardInterrupt; keep the best-effort skip of broken env-configs
            continue
    return environments
|||
|
|||
|
|||
class Environment(model.entity.Entity):
    """Entity describing a test-environment, stored as a YAML file under the env path."""
    FIELD_ID = "enid"
    LIST_FIELDS = [FIELD_ID, D.FIELD_NAME, B.SUBJECT_DESCRIPTION, B.SUBJECT_REFERENCE]
    """ list of object-attributes """
    LIST_SUBTABLES = [B.SUBJECT_COMPS, B.SUBJECT_PROJECTS]
    LIST_NODES = [B.NODE_ATTRIBUTES]
    PREFIX_SUBTABLE = "en"

    # default values of the object-attributes
    name = ""
    description = ""
    reference = ""
    attributes = ""
    project = ""
    component = ""

    def read_unique_names(self, job, project, application, gran, args, ttype: str=""):
        """
        reads the entity-names from file-storage: one directory per environment
        :param job:
        :param project: select-criteria if used and defined (optional)
        :param application: select-criteria if used and defined (optional)
        :param gran: granularity values testcase / testsuite / testplan (optional)
        :param args: additional args (optional)
        :param ttype: unused here
        :return: list of entity-names
        """
        path = os.path.join(job.conf[B.TOPIC_PATH][B.ATTR_PATH_ENV])
        outList = self.getDirlist(job, path, "")
        return outList

    def read_entity(self, job, name):
        """
        reads the entity from the file-system and fills this object's attributes
        :param job:
        :param name: environment name
        :return: self with attributes set from the configuration
        """
        # NOTE(review): *name* is passed both as the subject and (plain-filename)
        # name argument of getConfig — confirm against the getConfig signature
        config = self.getConfig(job, P.KEY_ENV, name,
                                tools.config_tool.get_plain_filename(job, name), ttype=B.SUBJECT_ENVIRONMENT)
        return self.setAttributes(job, config, name, self.LIST_FIELDS, self.LIST_NODES, self.LIST_SUBTABLES)

    @staticmethod
    def rebuild_data(job, data: dict) -> dict:
        """
        gets the subtable-tag from filecsv and sets the subtables in order to
        workable entity-elements: strips the subjects- and name-wrapper nodes
        :param job:
        :param data:
        :return: unwrapped data
        """
        data = tools.file_type.popSubjectsNode(job, data)
        data = tools.file_type.popNameNode(job, data)
        return data

    def check_data(self, job, data: dict) -> dict:
        """
        it checks the data for the specific form: comps are mandatory,
        projects optional, raw table nodes must not appear
        :param job:
        :param data: data-dict to check
        :return: result of tools.file_type.check_nodes
        """
        checkNodes = {}
        checkNodes[tools.file_type.MUST_NODES] = [B.SUBJECT_COMPS]
        checkNodes[tools.file_type.MUSTNT_NODES] = [B.DATA_NODE_DATA, B.DATA_NODE_HEADER, B.DATA_NODE_FIELDS, B.DATA_NODE_KEYS]
        checkNodes[tools.file_type.OPT_NODES] = [B.SUBJECT_PROJECTS]
        return tools.file_type.check_nodes(job, data, checkNodes)
|||
@ -0,0 +1,128 @@ |
|||
import model.entity |
|||
import basic.constants as B |
|||
import basic.Testserver |
|||
|
|||
class Magazin():
    """
    Registry (singleton) caching one shared entity-object per entity name.
    """
    __instance = None
    __entities = {}

    @staticmethod
    def getInstance():
        """
        Return the singleton instance, creating it on first use.
        fix: the original created a new Magazin without storing it in
        __instance (and returned None on every later call) — the singleton
        was never actually cached.
        """
        if Magazin.__instance is None:
            Magazin.__instance = Magazin()
        return Magazin.__instance

    @staticmethod
    def setEntity(name, object):
        """
        Register *object* under *name* unless the name is already taken;
        returns the entity stored under *name* (existing one wins).
        NOTE: parameter name `object` shadows the builtin — kept for
        interface compatibility with keyword callers.
        """
        if name not in Magazin.__entities:
            Magazin.__entities[name] = object
        return Magazin.__entities[name]

    @staticmethod
    def getEntity(name):
        """Return the entity registered under *name*, or None if absent."""
        if name in Magazin.__entities:
            return Magazin.__entities[name]

    @staticmethod
    def hasEntity(name):
        """Return True when an entity is registered under *name*."""
        if name in Magazin.__entities:
            return True
        return False
|||
|
|||
def get_entity_object(job, entityname: str, name: str = "", args: dict = None):
    """
    Factory: resolve *entityname* (singular or plural keyword) to the matching
    entity class and construct it. Parameterless requests (no name, no args)
    are served from / stored into the Magazin singleton cache.

    :param job: current job context
    :param entityname: subject keyword (singular or plural)
    :param name: optional entity name
    :param args: optional constructor arguments
    :return: the entity object, or None for an unknown entityname
    """
    # fix: `args: dict={}` was a shared mutable default — any constructor that
    # mutates args would leak state between calls; use the None sentinel
    if args is None:
        args = {}
    if name == "" and len(args) == 0 and Magazin.hasEntity(entityname):
        return Magazin.getEntity(entityname)
    if entityname in [B.SUBJECT_STEPS, B.SUBJECT_STEP]:
        entity = getStep(job, entityname, name, args)
    elif entityname in [B.SUBJECT_STORIES, B.SUBJECT_STORY, "storys"]:
        entity = getStory(job, entityname, name, args)
    elif entityname in [B.SUBJECT_VARIANTS, B.SUBJECT_VARIANT]:
        entity = getVariant(job, entityname, name, args)
    elif entityname in [B.SUBJECT_DATATABLES, B.SUBJECT_DATATABLE]:
        entity = getDatatable(job, entityname, name, args)
    elif entityname in [B.SUBJECT_USECASES, B.SUBJECT_USECASE]:
        entity = getUsecase(job, entityname, name, args)
    elif entityname in [B.SUBJECT_PROJECTS, B.SUBJECT_PROJECT]:
        entity = getProject(job, entityname, name, args)
    elif entityname in [B.SUBJECT_APPS, B.SUBJECT_APP]:
        entity = getApplication(job, entityname, name, args)
    elif entityname in [B.SUBJECT_COMPS, B.SUBJECT_COMP]:
        entity = getComponent(job, entityname, name, args)
    elif entityname in [B.SUBJECT_ARTIFACTS, B.SUBJECT_ARTIFACT]:
        entity = getArtifact(job, entityname, name, args)
    elif entityname in [B.SUBJECT_TESTCASES, B.SUBJECT_TESTCASE]:
        entity = getTestcase(job, entityname, name, args)
    elif entityname in [B.SUBJECT_TESTSUITES, B.SUBJECT_TESTSUITE]:
        entity = getTestsuite(job, entityname, name, args)
    elif entityname in [B.SUBJECT_TESTPLANS, B.SUBJECT_TESTPLAN]:
        entity = getTestplan(job, entityname, name, args)
    elif entityname in [B.SUBJECT_USERS, B.SUBJECT_USER]:
        entity = getUser(job, entityname, name, args)
    elif entityname in [B.SUBJECT_REL, B.SUBJECT_RELS]:
        entity = getRelease(job, entityname, name, args)
    elif entityname in [B.SUBJECT_ENVIRONMENT, B.SUBJECT_ENVIRONMENTS]:
        entity = getEnvironment(job, entityname, name, args)
    else:
        return None
    # cache parameterless singletons for later lookups
    if name == "" and len(args) == 0 and not Magazin.hasEntity(entityname):
        return Magazin.setEntity(entityname, entity)
    return entity
|||
|
|||
# Per-entity constructor helpers. Each uses a function-local import to avoid
# circular dependencies between the factory and the entity modules.
# fix: every helper had `args: dict={}` — a shared mutable default; replaced
# with the None sentinel, converted back to {} before the constructor call so
# downstream behavior is unchanged.

def getRelease(job=None, entityname: str = "", name: str = "", args: dict = None):
    """Construct a Release entity."""
    import model.prelease
    return model.prelease.Release(job, entityname, name, args if args is not None else {})

def getEnvironment(job=None, entityname: str = "", name: str = "", args: dict = None):
    """Construct an Environment entity."""
    import model.environment
    return model.environment.Environment(job, entityname, name, args if args is not None else {})

def getArtifact(job=None, entityname: str = "", name: str = "", args: dict = None):
    """Construct an Artifact entity."""
    import model.artifact
    return model.artifact.Artifact(job, entityname, name, args if args is not None else {})

def getApplication(job=None, entityname: str = "", name: str = "", args: dict = None):
    """Construct an Application entity."""
    import model.application
    return model.application.Application(job, entityname, name, args if args is not None else {})

def getProject(job=None, entityname: str = "", name: str = "", args: dict = None):
    """Construct a Project entity."""
    import model.project
    return model.project.Project(job, entityname, name, args if args is not None else {})

def getComponent(job=None, entityname: str = "", name: str = "", args: dict = None):
    """Construct a Component entity."""
    import model.component
    return model.component.Component(job, entityname, name, args if args is not None else {})

def getTestplan(job=None, entityname: str = "", name: str = "", args: dict = None):
    """Construct a Testplan entity."""
    import model.testplan
    return model.testplan.Testplan(job, entityname, name, args if args is not None else {})

def getTestsuite(job=None, entityname: str = "", name: str = "", args: dict = None):
    """Construct a Testsuite entity."""
    import model.testsuite
    return model.testsuite.Testsuite(job, entityname, name, args if args is not None else {})

def getTestcase(job=None, entityname: str = "", name: str = "", args: dict = None):
    """Construct a Testcase entity."""
    import model.testcase
    return model.testcase.Testcase(job, entityname, name, args if args is not None else {})

def getStep(job=None, entityname: str = "", name: str = "", args: dict = None):
    """Construct a Step entity."""
    import model.step
    return model.step.Step(job, entityname, name, args if args is not None else {})

def getStory(job=None, entityname: str = "", name: str = "", args: dict = None):
    """Construct a Story entity."""
    import model.story
    return model.story.Story(job, entityname, name, args if args is not None else {})

def getUsecase(job=None, entityname: str = "", name: str = "", args: dict = None):
    """Construct a Usecase entity."""
    import model.usecase
    return model.usecase.Usecase(job, entityname, name, args if args is not None else {})

def getUser(job=None, entityname: str = "", name: str = "", args: dict = None):
    """Construct a User entity."""
    import model.user
    return model.user.User(job, entityname, name, args if args is not None else {})

def getVariant(job=None, entityname: str = "", name: str = "", args: dict = None):
    """Construct a Variant entity."""
    import model.variant
    return model.variant.Variant(job, entityname, name, args if args is not None else {})

def getDatatable(job=None, entityname: str = "", name: str = "", args: dict = None):
    """Construct a Datatable entity."""
    import model.datatable
    return model.datatable.Datatable(job, entityname, name, args if args is not None else {})
|||
|
|||
|
|
@ -0,0 +1,121 @@ |
|||
# --------------------------------------------------------------------------------------------------------- |
|||
# Author : Ulrich Carmesin |
|||
# Source : gitea.ucarmesin.de |
|||
# --------------------------------------------------------------------------------------------------------- |
|||
import basic.toolHandling |
|||
import basic.componentHandling |
|||
import basic.constants as B |
|||
import model.entity |
|||
import tools.path_const as P |
|||
import tools.data_const as D |
|||
import tools.config_tool |
|||
import tools.file_tool |
|||
import tools.git_tool |
|||
import tools.file_type |
|||
|
|||
TABLE_NAME = "prelease"
""" system-name for this entity """
FIELD_ID = "rlid"
FIELD_PRELEASE = "prelease"
""" project-release"""
FIELD_APPRELEASE = "apprelease"
# release identifier of a single application inside the project-release
FILE_EXTENSION = D.DFILE_TYPE_CSV
# releases are stored as CSV catalogs
UNIQUE_FIELDS = [D.FIELD_NAME]
""" unique business field as human identifer """
IDENTIFYER_FIELDS = [FIELD_ID]
""" unique technical field as technical identifer """
|||
|
|||
class Release(model.entity.Entity):
    """Entity describing a project-release with its application-releases and stories."""
    FIELD_ID = "rlid"
    LIST_FIELDS = [FIELD_ID, D.FIELD_NAME, B.SUBJECT_DESCRIPTION, B.SUBJECT_REFERENCE, B.SUBJECT_PROJECT]
    """ list of object-attributes """
    LIST_NODES = [B.NODE_ATTRIBUTES]
    LIST_SUBTABLES = [B.SUBJECT_APPS, B.SUBJECT_STORIES]
    PREFIX_SUBTABLE = "rl"

    # default values of the object-attributes
    rlid = 0
    name = ""
    project = ""
    application = ""
    description = ""
    attributes = ""
    reference = ""

    def read_unique_names(self, job, project, application, gran, args, ttype: str=""):
        """
        reads the entity-names from file-storage (keys of the release catalog)
        :param job:
        :param project: select-criteria if used and defined (optional)
        :param application: select-criteria if used and defined (optional)
        :param gran: granularity values testcase / testsuite / testplan (optional)
        :param args: additional args (optional)
        :param ttype: unused here
        :return: list of entity-names
        """
        config = self.getConfig(job, P.KEY_CATALOG, B.SUBJECT_RELS, tools.config_tool.get_plain_filename(job, ""), D.CSV_SPECTYPE_CTLG)
        outList = list(config[B.SUBJECT_RELS][B.DATA_NODE_KEYS].keys())
        return outList

    def read_entity(self, job, name):
        """
        reads the entity from the file-system and fills this object's attributes
        :param job:
        :param name: release name
        :return: self with attributes set from the configuration
        """
        config = self.getConfig(job, P.KEY_CATALOG, B.SUBJECT_RELS,
                                tools.config_tool.get_plain_filename(job, name), ttype=B.SUBJECT_REL)
        return self.setAttributes(job, config, name, self.LIST_FIELDS, self.LIST_NODES, self.LIST_SUBTABLES)

    def rebuild_data(self, job, data: dict) -> dict:
        """
        gets the subtable-tag from filecsv and sets the subtables in order to
        workable entity-elements
        :param job:
        :param data:
        :return: data regrouped per project-release
        """
        data = tools.file_type.popTablesNode(job, data)
        data = tools.file_type.popSubjectsNode(job, data)
        data = self.rebuildReleases(job, data)
        return data

    def rebuildReleases(self, job, data: dict) -> dict:
        """
        Regroup the flat csv-rows into one node per project-release.
        General fields are taken from the row whose apprelease is empty or
        equal to the prelease; application-releases are collected under apps.

        :param job:
        :param data: dict with a B.DATA_NODE_DATA row-list
        :return: {prelease: {field..., apps: {appname: {...}}}}
        """
        outdata = {}
        for row in data[B.DATA_NODE_DATA]:
            if FIELD_PRELEASE not in row:
                continue
            if row[FIELD_PRELEASE] in outdata:
                general = outdata[row[FIELD_PRELEASE]]
            else:
                general = {}
                general[B.SUBJECT_APPS] = {}
            # fix: the original tested len(FIELD_APPRELEASE) — the length of
            # the constant string "apprelease", which is never 0 — instead of
            # the row's value; rows with an empty apprelease were skipped
            if (FIELD_APPRELEASE not in row
                    or len(str(row[FIELD_APPRELEASE])) == 0
                    or row[FIELD_APPRELEASE] == row[FIELD_PRELEASE]):
                # this row carries the general fields of the project-release
                for f in self.LIST_FIELDS:
                    if f in row:
                        general[f] = row[f]
            if B.SUBJECT_APPS in row and len(row[B.SUBJECT_APPS]) > 0:
                # comma-separated application list of this row
                a = str(row[B.SUBJECT_APPS]).split(",")
                for app in a:
                    o = {}
                    o["appname"] = app
                    o["apprelease"] = row[FIELD_APPRELEASE]
                    o["prelease"] = row[FIELD_PRELEASE]
                    general[B.SUBJECT_APPS][app] = o
            outdata[row[FIELD_PRELEASE]] = general
        return outdata

    def check_data(self, job, data: dict) -> dict:
        """
        it checks the data for the specific form
        :param job:
        :param data: data-dict to check
        :return: result of tools.file_type.check_nodes
        """
        checkNodes = {}
        checkNodes[tools.file_type.MUST_NODES] = [] #[B.SUBJECT_APPS]
        checkNodes[tools.file_type.MUSTNT_NODES] = [] # [B.DATA_NODE_DATA, B.DATA_NODE_HEADER, B.DATA_NODE_FIELDS, B.DATA_NODE_KEYS]
        checkNodes[tools.file_type.OPT_NODES] = [B.SUBJECT_PROJECTS]
        return tools.file_type.check_nodes(job, data, checkNodes)
|||
|
@ -0,0 +1,272 @@ |
|||
# --------------------------------------------------------------------------------------------------------- |
|||
# Author : Ulrich Carmesin |
|||
# Source : gitea.ucarmesin.de |
|||
# --------------------------------------------------------------------------------------------------------- |
|||
import os |
|||
import basic.toolHandling |
|||
import basic.constants as B |
|||
import model.entity |
|||
import tools.path_const as P |
|||
import tools.data_const as D |
|||
import tools.config_tool |
|||
import tools.file_tool |
|||
import tools.db_abstract |
|||
import tools.git_tool |
|||
import tools.file_type |
|||
|
|||
TABLE_NAME = "project"
""" system-name for this entity """
FIELD_ID = "prid"
FIELD_NAME = "name"
FIELD_DESCRIPTION = B.SUBJECT_DESCRIPTION
FIELD_REFERENCE = B.SUBJECT_REFERENCE
LIST_FIELDS = [FIELD_ID, FIELD_NAME, FIELD_DESCRIPTION, FIELD_REFERENCE]
""" list of object-attributes """
LIST_NODES = []
LIST_SUBTABLES = {}
# NOTE(review): sibling modules declare LIST_SUBTABLES as a list — {} vs []
# makes no difference for iteration/len but confirm the intended type
FILE_EXTENSION = D.DFILE_TYPE_YML
# projects are stored as YAML files
UNIQUE_FIELDS = [FIELD_NAME]
""" unique business field as human identifer """
IDENTIFYER_FIELDS = [FIELD_ID]
""" unique technical field as technical identifer """
|||
|
|||
class Project(model.entity.Entity):
    """Entity describing a project, persisted as YAML config and/or db-table."""
    FIELD_ID = "prid"
    LIST_FIELDS = [FIELD_ID, D.FIELD_NAME, B.SUBJECT_DESCRIPTION, B.SUBJECT_REFERENCE]
    """ list of object-attributes """
    LIST_NODES = []
    LIST_SUBTABLES = []
    # default values of the object-attributes
    prid = 0
    name = ""
    description = ""
    reference = ""

    def read_unique_names(self, job, project, application, gran, args, ttype: str=""):
        """
        reads the entity-names from file-storage
        :param job:
        :param project: select-criteria if used and defined (optional)
        :param application: select-criteria if used and defined (optional)
        :param gran: granularity values testcase / testsuite / testplan (optional)
        :param args: additional args (optional)
        :param ttype: unused here
        :return: list of entity-names (entries starting with "_" are skipped)
        """
        config = self.getConfig(job, B.SUBJECT_PROJECTS, "")
        if B.SUBJECT_PROJECTS in config:
            conf = list(config[B.SUBJECT_PROJECTS].keys())
        else:
            conf = config.keys()
        outList = []
        for k in conf:
            if k[:1] != "_":
                outList.append(k)
        return outList

    def select_unique_names(self, job, project, application, gran, args):
        """
        reads the entity-names from the database (DB counterpart of
        read_unique_names); duplicates over UNIQUE_FIELDS are collapsed
        :param job:
        :param project: select-criteria if used and defined (optional)
        :param application: select-criteria if used and defined (optional)
        :param gran: granularity values testcase / testsuite / testplan (optional)
        :param args: additional args (optional)
        :return: list of unique-field value-lists
        """
        outList = []
        self.setDbAttributes(job, [TABLE_NAME])
        dbi = basic.toolHandling.getDbTool(job, self, job.conf[B.TOPIC_NODE_DB]["type"])
        data = dbi.selectRows(TABLE_NAME, job)
        checkList = {}
        for row in data[B.DATA_NODE_DATA]:
            key = ""
            for f in UNIQUE_FIELDS:
                key += "_" + row[f]
            if key in checkList:
                # duplicate combination of unique fields: skip
                continue
            else:
                checkList[key] = key
            fields = []
            for f in UNIQUE_FIELDS:
                fields.append(row[f])
            outList.append(fields)
        return outList

    def read_entity(self, job, name):
        """
        reads the entity from the file-system and copies the known fields
        onto this object
        :param job:
        :param name: project name
        :return: self
        """
        print("name "+name)
        config = self.getConfig(job, B.SUBJECT_PROJECTS, tools.config_tool.get_plain_filename(job, name))
        for k in LIST_FIELDS:
            if k not in config:
                continue
            setattr(self, k, config[k])
        return self


    def select_entity(self, job, name, row={}):
        """
        reads the entity from the database
        it should get the same result like read_entity
        :param job:
        :param name: unique field as string, unique fields as list
                     the unique-fields are defined in the class
        :param row: optional pre-selected row; when given, the db-query is skipped
        :return: itself with filled object-attributes
        """
        # NOTE(review): `row={}` is a mutable default argument — safe only
        # because the dict is never mutated here; confirm and prefer None
        if row is None or len(row) == 0:
            self.setDbAttributes(job, [TABLE_NAME])
            dbi = basic.toolHandling.getDbTool(job, self, job.conf[B.TOPIC_NODE_DB]["type"])
            if type(name) is list:
                names = name
            elif type(name) is str:
                names = [name]
            # NOTE(review): `condition` is built but never used — dead code
            condition = "where "
            for v in names:
                condition += " and " + ""
            # NOTE(review): the query filters on `username`, which is not a
            # project field (looks copy-pasted from the User entity), and the
            # value is string-concatenated into the SQL — confirm and
            # parameterize
            data = dbi.selectRows(TABLE_NAME, job, "where username = \'" + names[0] + "\'")
            if len(data[B.DATA_NODE_DATA]) > 1:
                raise Exception("single selection with more than one result: "+names[0])
            elif len(data[B.DATA_NODE_DATA]) == 1:
                row = data[B.DATA_NODE_DATA][0]
            else:
                raise Exception("no result for: "+names[0])
        for k in LIST_FIELDS:
            if k not in row:
                continue
            setattr(self, k, row[k])
        return self

    def write_entity(self, job, name):
        """
        writes the entity into the file-system
        it similar to update_entity
        :param job:
        :param name: project name (used as filename)
        :return: self
        """
        config = {}
        config[model.project.TABLE_NAME] = {}
        # NOTE(review): the file is written below the P.VAL_USER directory —
        # looks copy-pasted from the User entity; confirm the intended path
        pathname = os.path.join(job.conf[B.TOPIC_PATH][P.ATTR_PATH_HOME], P.VAL_CONFIG,
                                P.VAL_USER, name + ".yml")
        for k in LIST_FIELDS:
            # empty values and the technical id are not persisted
            if getattr(self, k, "") == "" \
                    or k == FIELD_ID:
                continue
            config[model.project.TABLE_NAME][k] = getattr(self, k, "")
        tools.file_tool.write_file_dict(job.m, job, pathname, config)
        return self

    def insert_entity(self, job, name="", table="", rows={}):
        """
        inserts the entity into the database
        it similar to update_entity
        :param job:
        :param name: entity name
        :param table: target table; defaults to self.entityname
        :param rows: optional pre-built rows; built from the object when empty
        :return: None
        """
        if table == "":
            table = self.entityname
        # NOTE(review): when self.ddls stays empty this recursion never
        # terminates — confirm that the recursive call populates ddls
        if len(self.ddls) == 0:
            self.insert_entity(job, name=name, table=self.entityname, rows=rows)
        # self.setDbAttributes(job, [TABLE_NAME])
        dbi = basic.toolHandling.getDbTool(job, self, job.conf[B.TOPIC_NODE_DB]["type"])
        condition = "where"
        for f in UNIQUE_FIELDS:
            # TODO other db-formats than string has to be implemented
            condition += " and " + f + " = \'" + getattr(self, f, "") + "\'"
        condition = condition.replace("where and", "where ")
        data = dbi.selectRows(TABLE_NAME, job, condition)
        if len(data[B.DATA_NODE_DATA]) > 0:
            # entity already exists: an update would be required instead
            print("update statt insert")
            return
        if rows is None or len(rows) == 0:
            rows = []
            row = {}
            for f in self.ddls[table]:
                row[f] = getattr(self, f, "")
            rows.append(row)
        dbi.insertRows(job, table, rows)

    def update_entity(self, job, name):
        """
        writes the entity into the database
        it similar to update_entity
        :param job:
        :param name:
        :raises Exception: always — not implemented
        """
        raise Exception(B.EXCEPT_NOT_IMPLEMENT)

    def remove_entity(self, job, name):
        """
        removes the entity from the file-system
        it similar to delete_entity
        :param job:
        :param name: single substring or list of name or dict of names with the keys as
        :return: None
        """
        # NOTE(review): removes from the P.VAL_USER directory — same
        # copy-paste suspicion as in write_entity; confirm
        self.removeEntity(job, name, os.path.join(job.conf[B.TOPIC_PATH][P.ATTR_PATH_HOME], P.VAL_CONFIG, P.VAL_USER), "yml")

    def delete_entity(self, job, name, table):
        """
        deletes the entity into the database
        it similar to update_entity
        :param job:
        :param name:
        :param table: target table
        :return: None
        """
        self.setDbAttributes(job, [TABLE_NAME])
        dbi = basic.toolHandling.getDbTool(job, self, job.conf[B.TOPIC_NODE_DB]["type"])
        condition = "where"
        for f in IDENTIFYER_FIELDS:
            # TODO other db-formats than string has to be implemented
            val = dbi.getDbValue(self.conf[B.DATA_NODE_DDL][table][f], getattr(self, f, ""))
            condition += " and " + f + " = " + val + ""
        condition = condition.replace("where and", "where ")
        dbi.deleteRows(job, table, condition)

    @staticmethod
    def getConfig(job, subject, name):
        """
        reads the entity from the configuration
        it should get the same result like read_entity
        :param job:
        :param subject: top-level node expected in the configuration
        :param name: entry to select; empty string returns the whole config
        :return: the selected config node
        :raises Exception: when *name* cannot be resolved
        """
        config = tools.config_tool.getConfig(job, P.KEY_BASIC, subject, ttype=B.SUBJECT_PROJECT)
        if config is not None:
            if len(name) == 0:
                return config
            elif subject in config and name in config[subject]:
                return config[subject][name]
            elif name in config:
                return config[name]
        raise Exception("keine Config zu "+name)

    @staticmethod
    def getCurrentUser(job):
        # OS login name of the current user (Windows-style env variable)
        return os.environ.get("USERNAME")

    @staticmethod
    def rebuild_data(job, data: dict) -> dict:
        """
        gets the subtable-tag from filecsv and sets the subtables in order to workable entity-elements
        :param job:
        :param data:
        :return: data without the subjects- and name-wrapper nodes
        """
        data = tools.file_type.popSubjectsNode(job, data)
        data = tools.file_type.popNameNode(job, data)
        return data

    @staticmethod
    def check_data(job, data: dict) -> dict:
        """
        it checks the data for the specific form
        :param job:
        :param data: data-dict to check
        :return: result of tools.file_type.check_nodes
        """
        checkNodes = {}
        checkNodes[tools.file_type.MUST_NODES] = []
        checkNodes[tools.file_type.MUSTNT_NODES] = [B.DATA_NODE_OPTION, B.DATA_NODE_DATA, B.DATA_NODE_FIELDS, B.DATA_NODE_HEADER]
        checkNodes[tools.file_type.OPT_NODES] = [B.SUBJECT_PROJECTS, B.NODE_ATTRIBUTES]
        return tools.file_type.check_nodes(job, data, checkNodes)
|||
|