Browse Source

refactoring: basic-model - testcase

refactor
Ulrich 1 year ago
parent
commit
ddd4ebb4da
  1. 2
      model/application.py
  2. 2
      model/component.py
  3. 2
      model/datatable.py
  4. 33
      model/entity.py
  5. 2
      model/environment.py
  6. 6
      model/factory.py
  7. 2
      model/project.py
  8. 2
      model/release.py
  9. 2
      model/step.py
  10. 2
      model/story.py
  11. 2
      model/table.py
  12. 88
      model/testcase.py
  13. 114
      model/testsuite.py
  14. 2
      model/usecase.py
  15. 2
      model/user.py
  16. 2
      model/variant.py
  17. 4
      test/test_26testsuite.py
  18. 19
      test/test_27testcase.py
  19. 1
      tools/config_tool.py
  20. 4
      tools/data_const.py
  21. 55
      tools/filecsv_fcts.py

2
model/application.py

@ -186,7 +186,7 @@ class Application(model.entity.Entity):
LIST_SUBTABLES = [B.SUBJECT_APPS, B.SUBJECT_COMPS, B.SUBJECT_USECASES, B.SUBJECT_VARIANTS]
PREFIX_SUBTABLE = "ap"
def read_unique_names(self, job, project, application, gran, args):
def read_unique_names(self, job, project, application, gran, args, ttype: str=""):
"""
reads the entity-names from file-storage
:param job:

2
model/component.py

@ -73,7 +73,7 @@ class Component(model.entity.Entity):
application = ""
attributes = ""
def read_unique_names(self, job, project, application, gran, args):
def read_unique_names(self, job, project, application, gran, args, ttype: str=""):
"""
reads the entity-names from file-storage
:param job:

2
model/datatable.py

@ -39,7 +39,7 @@ class Datatable(model.entity.Entity):
if project != "":
self.project = project
def read_unique_names(self, job, project, application, gran, args):
def read_unique_names(self, job, project, application, gran, args, ttype: str=""):
"""
reads the entity-names from file-storage
:param job:

33
model/entity.py

@ -68,7 +68,7 @@ class Entity:
if len(name) > 1:
self.getEntity(job, name)
def get_unique_names(self, job, storage="", project="", application="", gran="", args={}):
def get_unique_names(self, job, storage="", project="", application="", gran="", args={}, ttype: str=""):
"""
gets the entity-names from the defined storage - the field name must be an unique identifier
:param job:
@ -88,7 +88,19 @@ class Entity:
entityNames = self.read_unique_names(job, project, application, gran, args)
return [item for item in entityNames if item not in B.LIST_DATA_NODE]
def get_entities(self, job, storage="", project="", application="", gran="", args={}):
def select_unique_names(self, job, project, application, gran, args):
"""
reads the entity-names from file-storage
:param job:
:param opt. project: select-criteria if used and defined
:param opt. application: select-criteria if used and defined
:param opt. gran: granularity values testcase / testsuite / testplan
:param opt. args additional args
:return: list of entity-names
"""
raise Exception(B.EXCEPT_NOT_IMPLEMENT)
def get_entities(self, job, storage="", project="", application="", gran="", ttype="", args={}):
"""
gets the entity-names from the defined storage
:param job:
@ -101,30 +113,19 @@ class Entity:
"""
entities = []
entityNames = self.get_unique_names(job, storage=storage, project=project, application=application,
gran=gran, args=args)
gran=gran, args=args, ttype=ttype)
for k in entityNames:
if storage == STORAGE_DB:
entity = self.select_entity(job, k)
elif storage == STORAGE_FILE:
print(" entity.read_e "+ k)
entity = self.read_entity(job, k)
else:
entity = self.read_entity(job, k)
entities.append(entity)
return entities
def read_unique_names(self, job, project, application, gran, args):
"""
reads the entity-names from file-storage
:param job:
:param opt. project: select-criteria if used and defined
:param opt. application: select-criteria if used and defined
:param opt. gran: granularity values testcase / testsuite / testplan
:param opt. args additional args
:return: list of entity-names
"""
raise Exception(B.EXCEPT_NOT_IMPLEMENT)
def select_unique_names(self, job, project, application, gran, args):
def read_unique_names(self, job, project, application, gran, args, ttype: str=""):
"""
reads the entity-names from file-storage
:param job:

2
model/environment.py

@ -81,7 +81,7 @@ class Environment(model.entity.Entity):
if project != "":
self.project = project
def read_unique_names(self, job, project, application, gran, args):
def read_unique_names(self, job, project, application, gran, args, ttype: str=""):
"""
reads the entity-names from file-storage
:param job:

6
model/factory.py

@ -69,13 +69,13 @@ def getTestplan(job=None, project="", application="", name=""):
import model.testplan
return model.testplan.Testplan(job, project)
def getTestsuite(job=None, project="", application="", name=""):
def getTestsuite(job=None, name="", project="", application=""):
import model.testsuite
return model.testsuite.Testsuite(job, project)
return model.testsuite.Testsuite(job, name)
def getTestcase(job=None, project="", application="", name=""):
import model.testcase
return model.testcase.Testcase(job, project, name=name)
return model.testcase.Testcase(job, name)
def getStep(job=None, project="", name=""):
import model.step

2
model/project.py

@ -41,7 +41,7 @@ class Project(model.entity.Entity):
description = ""
reference = ""
def read_unique_names(self, job, project, application, gran, args):
def read_unique_names(self, job, project, application, gran, args, ttype: str=""):
"""
reads the entity-names from file-storage
:param job:

2
model/release.py

@ -38,7 +38,7 @@ class Release(model.entity.Entity):
attributes = ""
reference = ""
def read_unique_names(self, job, project, application, gran, args):
def read_unique_names(self, job, project, application, gran, args, ttype: str=""):
"""
reads the entity-names from file-storage
:param job:

2
model/step.py

@ -75,7 +75,7 @@ class Step(model.entity.Entity):
if len(name) > 1:
self.name = name
def read_unique_names(self, job, project, application, gran, args):
def read_unique_names(self, job, project, application, gran, args, ttype: str=""):
"""
reads the entity-names from file-storage
:param job:

2
model/story.py

@ -35,7 +35,7 @@ class Story(model.entity.Entity):
description = ""
reference = ""
def read_unique_names(self, job, project, application, gran, args):
def read_unique_names(self, job, project, application, gran, args, ttype: str=""):
"""
reads the entity-names from file-storage
:param job:

2
model/table.py

@ -116,7 +116,7 @@ class Table(model.entity.Entity):
"""
return sql
def read_unique_names(self, job, project="", application="", gran= "", args={}) -> list:
def read_unique_names(self, job, project="", application="", gran= "", args={}, ttype: str="") -> list:
return []
# table is not an real entity

88
model/testcase.py

@ -29,31 +29,10 @@ DEFAULT_SYNC = model.entity.SYNC_FULL_GIT2DB
TABLE_NAME = B.SUBJECT_APP
""" system-name for this entity """
FIELD_ID = "tcid"
FIELD_NAME = D.FIELD_NAME
FIELD_DESCRIPTION = B.SUBJECT_DESCRIPTION
FIELD_REFERENCE = B.SUBJECT_REFERENCE
FIELD_PROJECT = B.SUBJECT_PROJECT
FIELD_APPLICATION = B.SUBJECT_APP
LIST_FIELDS = [FIELD_ID, FIELD_NAME,
FIELD_DESCRIPTION, FIELD_REFERENCE, FIELD_PROJECT]
""" list of object-attributes """
LIST_NODES = [B.NODE_ATTRIBUTES]
SUB_USECASE = B.SUBJECT_USECASES
SUB_STORIES = B.SUBJECT_STORIES
SUB_STEPS = "steps"
SUB_TABLES = "tables"
LIST_SUBTABLES = { # with additional attributes for the subtable
B.SUBJECT_APPS: [],
SUB_TABLES: [D.DATA_ATTR_DATE],
SUB_STEPS: [],
SUB_USECASE: [B.SUBJECT_DESCRIPTION, B.SUBJECT_REFERENCE],
SUB_STORIES: [B.SUBJECT_DESCRIPTION, B.SUBJECT_REFERENCE]
}
LIST_SUB_DESCRIPT = [D.DATA_ATTR_USECASE_DESCR, D.DATA_ATTR_STORY_DESCR]
FILE_EXTENSION = D.DFILE_TYPE_YML
UNIQUE_FIELDS = [FIELD_NAME]
UNIQUE_FIELDS = [D.FIELD_NAME]
""" unique business field as human identifer """
IDENTIFYER_FIELDS = [FIELD_ID]
""" unique technical field as technical identifer """
@ -74,6 +53,8 @@ class Testcase(model.entity.Entity):
LIST_FIELDS = [FIELD_ID, D.FIELD_NAME, B.SUBJECT_APPS,
B.SUBJECT_DESCRIPTION, B.SUBJECT_REFERENCE, B.SUBJECT_PROJECT]
LIST_NODES = [B.NODE_ATTRIBUTES]
LIST_SUBTABLES = [B.SUBJECT_USECASES, B.SUBJECT_STEPS, B.SUBJECT_DATATABLES, B.SUBJECT_STEPS]
tcid = ""
name = ""
description = ""
@ -84,19 +65,8 @@ class Testcase(model.entity.Entity):
tables = {}
steps = {}
def __init__(self, job, project, name=""):
"""
to be initialized by readSpec
:param job:
"""
self.job = job
if len(project) > 1:
self.project = project
if len(name) > 1:
self.name = name
self.read_entity(job, name)
def read_unique_names(self, job, project, application, gran, args):
def read_unique_names(self, job, project, application, gran, args, ttype: str=""):
"""
reads the entity-names from file-storage
:param job:
@ -123,11 +93,23 @@ class Testcase(model.entity.Entity):
# r = tools.config_tool.select_config_path(job, P.KEY_TESTCASE, "TC0001")
# ttype=testcase => Aufteilung der Testspec in Bloecke und separater Aufruf zur Pruefung der Bloecke
config = self.getConfig(job, P.KEY_TESTCASE, name, tools.config_tool.get_plain_filename(job, name), B.SUBJECT_TESTCASE)
self.setAttributes(job, config, name, LIST_FIELDS, LIST_NODES, LIST_SUBTABLES)
self.setAttributes(job, config, name, self.LIST_FIELDS, self.LIST_NODES, self.LIST_SUBTABLES)
return self
@staticmethod
def rebuild_data(job, tdata: dict) -> dict:
def rebuild_data(job, data: dict) -> dict:
"""
gets the subtable-tag from filecsv and sets the subtables in order to workable entity-elements
:param job:
:param data:
:return:
"""
data = tools.file_type.popSubjectsNode(job, data)
# data = tools.file_type.popNameNode(job, data)
return data
@staticmethod
def oldrebuild_data(job, tdata: dict) -> dict:
"""
gets the subtable-tag from filecsv and sets the subtables in order to workable entity-elements
:param job:
@ -206,40 +188,6 @@ class Testcase(model.entity.Entity):
checkNodes[tools.file_type.OPT_NODES] = [B.SUBJECT_USECASES, B.SUBJECT_STORIES]
return tools.file_type.check_nodes(job, data, checkNodes)
def getFieldList(self):
"""
returns a list of scalar attributes
:return: LIST_FIELDS
"""
return LIST_FIELDS
def getNodeList(self):
"""
returns a list of sub-nodes - which can be persisted in a clob-field
:return: LIST_NODES
"""
return LIST_NODES
def getSubtableList(self):
"""
returns a list of sub-tables
:return: LIST_SUBTABLES
"""
return LIST_SUBTABLES
def getName(self):
"""
returns the name - maybe build from other attributes
:return:
"""
return self.name
def getIDName(self):
"""
it returns the name as unique-id - maybe build from few attributes
:return:
"""
return self.name
def xxread_entity(self, job, name):
"""

114
model/testsuite.py

@ -15,6 +15,7 @@ import tools.job_tool
import tools.path_tool
import tools.path_const as P
import model.entity
import tools.file_type
TABLE_NAMES = ["application", "ap_project", "ap_component"]
STORAGES = [model.entity.STORAGE_FILE, model.entity.STORAGE_DB]
@ -44,6 +45,12 @@ IDENTIFYER_FIELDS = [FIELD_ID]
class Testsuite(model.entity.Entity):
FIELD_ID = "tsid"
LIST_FIELDS = [FIELD_ID, D.FIELD_NAME, B.SUBJECT_DESCRIPTION, B.SUBJECT_REFERENCE, B.SUBJECT_PROJECT]
""" list of object-attributes """
LIST_NODES = [B.NODE_ATTRIBUTES]
LIST_SUBTABLES = [B.SUBJECT_USECASES, B.SUBJECT_STEPS, B.SUBJECT_TESTCASES]
name = ""
description = ""
application = ""
@ -52,17 +59,8 @@ class Testsuite(model.entity.Entity):
tables = {}
steps = []
def __init__(self, job, project, name=""):
"""
to be initialized by readSpec
:param job:
"""
self.job = job
self.project = project
if len(name) > 1:
self.name = name
def read_unique_names(self, job, project, application, gran, args):
def read_unique_names(self, job, project, application, gran, args, ttype: str=""):
"""
reads the entity-names from file-storage
:param job:
@ -78,6 +76,44 @@ class Testsuite(model.entity.Entity):
return outList
def read_entity(self, job, name):
"""
reads the entity from the file-system
:param job:
:param name:
:return:
"""
config = self.getConfig(job, P.KEY_TESTSUITE, tools.config_tool.get_plain_filename(job, name), "", ttype=B.SUBJECT_TESTSUITE)
return self.setAttributes(job, config, name, self.LIST_FIELDS, self.LIST_NODES, self.LIST_SUBTABLES)
@staticmethod
def rebuild_data(job, data: dict) -> dict:
"""
gets the subtable-tag from filecsv and sets the subtables in order to workable entity-elements
:param job:
:param data:
:return:
"""
data = tools.file_type.popSubjectsNode(job, data)
# data = tools.file_type.popNameNode(job, data)
return data
def check_data(self, job, data: dict) -> dict:
"""
it checks the data for the specific form
:param job:
:param tdata:
:param ttype:
:return:
"""
checkNodes = {}
checkNodes[tools.file_type.MUST_NODES] = [B.SUBJECT_TESTCASES]
checkNodes[tools.file_type.MUSTNT_NODES] = [B.DATA_NODE_DATA, B.DATA_NODE_HEADER, B.DATA_NODE_FIELDS, B.DATA_NODE_KEYS]
checkNodes[tools.file_type.OPT_NODES] = [B.SUBJECT_APPS, B.SUBJECT_USECASES]
tools.file_type.check_nodes(job, data, checkNodes)
return data
def old_read_entity(self, job, name):
"""
reads the entity from the file-system
:param job:
@ -105,64 +141,6 @@ class Testsuite(model.entity.Entity):
setattr(self, k, values)
return self
def getFieldList(self):
"""
returns a list of scalar attributes
:return: LIST_FIELDS
"""
return LIST_FIELDS
def getNodeList(self):
"""
returns a list of sub-nodes - which can be persisted in a clob-field
:return: LIST_NODES
"""
return LIST_NODES
def getSubtableList(self):
"""
returns a list of sub-tables
:return: LIST_SUBTABLES
"""
return LIST_SUBTABLES
def getName(self):
"""
returns the name - maybe build from other attributes
:return:
"""
return self.name
def getIDName(self):
"""
it returns the name as unique-id - maybe build from few attributes
:return:
"""
return self.name
def get_schema(self, tableName="", tableObject=None):
#TODO veraltet
dbtype = self.job.conf[B.TOPIC_NODE_DB][B.ATTR_TYPE]
dbi = basic.toolHandling.getDbTool(self.job, None, dbtype)
sql = dbi.getCreateTable("testsuite")
sql += dbi.getSchemaAttribut("tsid", "id")+","
sql += dbi.getSchemaAttribut("name", D.TYPE_STR)+","
sql += dbi.getSchemaAttribut(B.SUBJECT_REFERENCE, D.TYPE_TEXT)+","
sql += dbi.getSchemaAttribut("project", D.TYPE_STR)+","
sql += dbi.getSchemaAttribut("usecase", D.TYPE_STR)+","
sql += dbi.getSchemaAttribut(B.NODE_ATTRIBUTES, D.TYPE_TEXT)+","
sql += self.getHistoryFields()
sql += ");\n"
sql += self.getHistoryIndex("testsuite")
for attr in ["application", "testcase"]:
sql += dbi.getSchemaSubtable("ts", [{"attr":attr, "atype": D.TYPE_STR}])+"\n"
sql += dbi.getSchemaIndex(dbi.getIndexName("ts", attr),
dbi.getSubTableId(dbi.getSubTableName("ts", attr), attr))+"\n"
for attr in ["dtable", "step"]:
sql += dbi.getSchemaSubtable("ts", [{"attr":attr, "atype": D.TYPE_STR}, {"attr":B.NODE_ATTRIBUTES, "atype": D.TYPE_TEXT}])+"\n"
sql += dbi.getSchemaIndex(dbi.getSubTableName("ts", attr),
dbi.getSubTableId(dbi.getSubTableName("ts", attr), attr))+"\n"
return sql
def select_testsuite(job, project, testsuite):
jobProj = None

2
model/usecase.py

@ -35,7 +35,7 @@ class Usecase(model.entity.Entity):
reference = ""
attributes = ""
def read_unique_names(self, job, project, application, gran, args):
def read_unique_names(self, job, project, application, gran, args, ttype: str=""):
"""
reads the entity-names from file-storage
:param job:

2
model/user.py

@ -49,7 +49,7 @@ class User(model.entity.Entity):
role = ""
attributes = ""
def read_unique_names(self, job, project, application, gran, args):
def read_unique_names(self, job, project, application, gran, args, ttype: str=""):
"""
reads the entity-names from file-storage
:param job:

2
model/variant.py

@ -39,7 +39,7 @@ class Variant(model.entity.Entity):
component = ""
def read_unique_names(self, job, project, application, gran, args):
def read_unique_names(self, job, project, application, gran, args, ttype: str=""):
"""
reads the entity-names from file-storage
:param job:

4
test/test_26testsuite.py

@ -47,7 +47,7 @@ class MyTestCase(unittest.TestCase):
if actfunction not in TEST_FUNCTIONS:
return
job = test.testtools.getJob()
testsuite = model.testsuite.Testsuite(job, "TESTPROJ")
testsuite = model.testsuite.Testsuite(job)
entityNames = testsuite.read_unique_names(job, "", "", "", {})
self.assertEqual(type(entityNames), list)
#entityNames = project.select_unique_names(job, "", "", "", {})
@ -76,7 +76,7 @@ class MyTestCase(unittest.TestCase):
if actfunction not in TEST_FUNCTIONS:
return
job = test.testtools.getJob()
testsuite = model.testsuite.Testsuite(job, "TESTPROJ")
testsuite = model.testsuite.Testsuite(job)
name = "TST001"
acttestsuite = testsuite.read_entity(job, name)
self.assertEqual(getattr(acttestsuite, model.testsuite.FIELD_NAME), name)

19
test/test_27testcase.py

@ -11,12 +11,13 @@ import basic.constants as B
import test.constants as T
import model.testcase
import model.entity
import tools.data_const as D
HOME_PATH = test.constants.HOME_PATH
PYTHON_CMD = "python"
TEST_FUNCTIONS = ["test_10getEntityNames", "test_11getEntities", "test_12getEntity",
"test_13writeEntity", "test_14insertEntity"]
TEST_FUNCTIONS = ["test_20setSubtable"]
TEST_FUNCTIONS = ["test_10getEntityNames", "test_11getEntities", "test_12getEntity","test_20setSubtable"]
PROGRAM_NAME = "clean_workspace"
class MyTestCase(unittest.TestCase):
@ -31,7 +32,7 @@ class MyTestCase(unittest.TestCase):
if actfunction not in TEST_FUNCTIONS:
return
job = test.testtools.getJob()
testcase = model.testcase.Testcase(job, "TESTPROJ")
testcase = model.testcase.Testcase(job)
entityNames = testcase.read_unique_names(job, "", "", "", {})
self.assertEqual(type(entityNames), list)
#entityNames = project.select_unique_names(job, "", "", "", {})
@ -45,9 +46,9 @@ class MyTestCase(unittest.TestCase):
if actfunction not in TEST_FUNCTIONS:
return
job = test.testtools.getJob()
testcase = model.testcase.Testcase(job, "TESTPROJ")
testcase = model.testcase.Testcase(job)
entityNames = []
entityNames = testcase.get_entities(job, storage=model.entity.STORAGE_FILE)
entityNames = testcase.get_entities(job, storage=model.entity.STORAGE_FILE, ttype=B.SUBJECT_TESTCASE)
self.assertEqual(type(entityNames), list)
#entityNames = testcase.get_entities(job, storage=model.entity.STORAGE_DB)
#self.assertEqual(type(entityNames), list)
@ -60,11 +61,11 @@ class MyTestCase(unittest.TestCase):
if actfunction not in TEST_FUNCTIONS:
return
job = test.testtools.getJob()
testcase = model.testcase.Testcase(job, "TESTPROJ")
testcase = model.testcase.Testcase(job)
name = "TC0001"
acttestcase = testcase.read_entity(job, name)
self.assertEqual(getattr(acttestcase, model.testcase.FIELD_NAME), name)
self.assertRaises(Exception, testcase.read_entity, job, "xyzxyz")
self.assertEqual(getattr(acttestcase, D.FIELD_NAME), name)
#self.assertRaises(Exception, testcase.read_entity, job, "xyzxyz")
def test_20setSubtable(self):
global mymsg
@ -74,7 +75,7 @@ class MyTestCase(unittest.TestCase):
if actfunction not in TEST_FUNCTIONS:
return
job = test.testtools.getJob()
testcase = model.testcase.Testcase(job, "TESTPROJ")
testcase = model.testcase.Testcase(job)
#testcase.set_subtables(job, {})
tdata = {
"_name": "TC0001",
@ -88,6 +89,7 @@ class MyTestCase(unittest.TestCase):
"story-id": "US-1234||||||"
}
}
"""
result = testcase.set_subtables(job, tdata)
print(str(result))
self.assertIn(B.SUBJECT_APP, result)
@ -99,6 +101,7 @@ class MyTestCase(unittest.TestCase):
self.assertIn(B.SUBJECT_USECASE, result)
print(str(result))
self.assertEqual(2, len(result[B.SUBJECT_USECASE][B.DATA_NODE_DATA]))
"""
if __name__ == '__main__':
unittest.main()

1
tools/config_tool.py

@ -335,6 +335,7 @@ def getConfig(job, modul: str, name: str, subname: str = "", ttype: str = D.CSV_
msg = None
if hasattr(job, "m"): msg = job.m
pathname = select_config_path(job, modul, name, subname)
print("+++ " + pathname)
confs = {}
if pathname is None:
return None

4
tools/data_const.py

@ -109,7 +109,9 @@ LIST_SUBTABLES = {
SUB_USECASE: [B.SUBJECT_DESCRIPTION, B.SUBJECT_REFERENCE],
SUB_STORIES: [B.SUBJECT_DESCRIPTION, B.SUBJECT_REFERENCE],
SUB_APPLICATIONS: [],
SUB_VARIANTS: []
SUB_VARIANTS: [],
B.SUBJECT_TESTCASES: [],
B.SUBJECT_TESTSUITES: []
}
LIST_SUBTABLES_ATTR = [B.SUBJECT_DESCRIPTION, B.SUBJECT_REFERENCE]

55
tools/filecsv_fcts.py

@ -194,61 +194,8 @@ class FileFcts(tools.file_abstract.FileFcts):
if verify: print("block else :: "+l)
print("unbekannter Block "+status+": "+l)
# end for
# TODO !! refactor to file_type
tfdata = tools.file_type.rebuild_tdata(job, tdata, tableAttr, ttype)
tgdata = self.restParse(job, tableAttr, tdata, ttype)
if ttype in [D.CSV_SPECTYPE_CTLG, D.CSV_SPECTYPE_DDL]:
return tfdata
return tgdata
def restParse(self, job, tableAttr, tdata, ttype):
if D.DATA_ATTR_TYPE not in tableAttr:
tableAttr[D.DATA_ATTR_TYPE] = ttype
if ttype+"s" in B.LIST_SUBJECTS:
print("csvfcts 198 "+ttype)
enty = model.factory.get_entity_object(job, ttype, {})
print(str(tdata))
tdata = enty.rebuild_data(job, tdata)
elif ttype in [D.CSV_SPECTYPE_DDL, D.CSV_SPECTYPE_CTLG, D.CSV_SPECTYPE_MDL]:
if len(tdata[B.DATA_NODE_TABLES]) > 1:
job.m.setError("Mehr als eine Tabelle in "+ttype)
elif len(tdata[B.DATA_NODE_TABLES]) == 0:
job.m.setError("Keine Tabelle in "+ttype)
tdata = {}
else:
data = {}
for k in tdata[B.DATA_NODE_TABLES]:
data[k] = tdata[B.DATA_NODE_TABLES][k]
tdata = data
for k in tableAttr:
tdata[k] = tableAttr[k]
if ttype in [D.CSV_SPECTYPE_CONF]:
fields = []
for k in tdata:
if k in ["_hit"] + D.LIST_DATA_ATTR:
continue
if B.DATA_NODE_DATA in tdata[k]:
tdata[k].pop(B.DATA_NODE_DATA)
for f in tdata[k]:
if f in [B.DATA_NODE_HEADER, "_hit"] + D.LIST_DATA_ATTR:
continue
fields.append(f)
tdata[k][B.DATA_NODE_FIELDS] = fields
header = []
elif ttype in [D.CSV_SPECTYPE_DDL]:
data = {}
for k in tdata:
pass
if B.DATA_NODE_TABLES in tdata and B.DATA_NODE_TABLES in tdata[B.DATA_NODE_TABLES]:
for k in tdata[B.DATA_NODE_TABLES][B.DATA_NODE_TABLES]:
if k in tdata[B.DATA_NODE_TABLES]:
print("Error")
else:
tdata[B.DATA_NODE_TABLES][k] = tdata[B.DATA_NODE_TABLES][B.DATA_NODE_TABLES][k]
tdata[B.DATA_NODE_TABLES].pop(B.DATA_NODE_TABLES)
if "_hit" in tdata:
tdata.pop("_hit")
return tdata
return tfdata
def buildCsv(self, msg, job, data, ttype=""):

Loading…
Cancel
Save