@ -4,268 +4,334 @@
# Author : Ulrich Carmesin
# Source : gitea.ucarmesin.de
# ---------------------------------------------------------------------------------------------------------
"""
the issue of this tool is to transform extern data to the internal structure and the internal structure into extern data - i . e . mostly test - results .
* * * * * * * *
the testdata have several elements
* parameter ( - td - - tdata ) : to identify which testdata should be loaded
* source ( flaskdb : dbname / dir : filename ) : always structured in a table ( easy to specify ) with columns
* node : where the rows are
* action : what should be done - default insert
+ fields : dates in relation of a reference < day or a formula
* interface : configured in components and used in comparison with attributes to each field :
* ignored - if it should be ignored on differences , it is necessary on technical ID - fields
* id - field - necessary
* * * * * * * *
the testdata itself which are written in different artifacts of modern applications are mostly stored as tree
- so as xml , json , always with plain data in the leaf . So the intern structure should be also a tree - in python : dictionary .
"""
import os . path
import inspect
import basic . program
import utils . config_tool
import utils . file_tool
import basic . constants as B
import utils . data_const as D
import utils . path_const as P
import utils . path_tool
import utils . date_tool
import basic . step
import utils . i18n_tool
import re
TOOL_NAME = " tdata_tool "
""" name of the tool in order to switch debug-info on """
TDATA_NODES = [ D . CSV_BLOCK_OPTION ]
list_blocks = { } # lists of aliases
def getTdataAttr ( ) :
def getTestdata ( job = None ) :
"""
get the testdata from one of the possible sources
for the testcase resp testsuite of the job
: return :
"""
if job is None :
job = basic . program . Job . getInstance ( )
out = { } #
out [ D . ATTR_SRC_TYPE ] = D . DATA_SRC_DIR
print ( " ---getTdataAttr " )
print ( vars ( job . par ) )
if hasattr ( job . par , B . PAR_TESTCASE ) :
out [ D . ATTR_SRC_NAME ] = getattr ( job . par , B . PAR_TESTCASE )
elif hasattr ( job . par , B . PAR_TESTSUITE ) :
out [ D . ATTR_SRC_NAME ] = getattr ( job . par , B . PAR_TESTSUITE )
for p in [ D . ATTR_SRC_TYPE , D . ATTR_SRC_DATA , D . ATTR_SRC_NAME ] :
# out[p] = ""
if hasattr ( job . par , p ) :
out [ p ] = getattr ( job . par , p )
return out
if " testcase " in job . program :
return collectTestdata ( B . PAR_TESTCASE , getattr ( job . par , B . PAR_TESTCASE ) , job )
else :
return collectTestdata ( B . PAR_TESTSUITE , getattr ( job . par , B . PAR_TESTSUITE ) , job )
def getTestdata ( ) :
def collectTestdata ( gran , testentity , job ) :
"""
get the testdata from one of the possible soources
* dir : each file in the specific testarchiv
* csv : specific file
* db : specific db with a testcase - catalogue
collects the testdata from kind of the possible sources
for the testcase resp testsuite
: return :
"""
job = basic . program . Job . getInstance ( )
#reftyp = getattr(job.par, "tdtyp")
#source = getattr(job.par, "tdsrc")
#criteria = getattr(job.par, "tdname")
tdata = getTdataAttr ( ) # {"reftyp": reftyp, "source": source, "criteria": criteria}
print ( tdata )
if tdata [ D . ATTR_SRC_TYPE ] == " flaskdb " :
# read data-structure with sourcename
# connect to source
# select with all data with datastructure
job . m . setInfo ( " Test-Data readed from " + tdata [ D . ATTR_SRC_TYPE ] + " for " + tdata [ D . ATTR_SRC_NAME ] )
elif tdata [ D . ATTR_SRC_TYPE ] == D . DATA_SRC_CSV :
# read file in testdata
job . m . logInfo ( " Test-Data readed from " + tdata [ D . ATTR_SRC_TYPE ] + " for " + tdata [ D . ATTR_SRC_NAME ] )
elif tdata [ D . ATTR_SRC_TYPE ] == D . DATA_SRC_DIR :
path = os . path . join ( job . conf . getJobConf ( B . SUBJECT_PATH + " : " + B . ATTR_PATH_TDATA ) , tdata [ D . ATTR_SRC_NAME ] )
filename = os . path . join ( path , " testspec.csv " )
data = getCsvSpec ( job . m , filename , D . CSV_SPECTYPE_DATA )
for k in data :
tdata [ k ] = data [ k ]
if ( k == D . CSV_BLOCK_OPTION ) :
for p in data [ k ] :
setattr ( job . par , p , data [ k ] [ p ] )
files = utils . file_tool . getFiles ( job . m , path , " table_ " , None )
setBlockLists ( job )
if gran == B . PAR_TESTCASE :
basispath = utils . path_tool . rejoinPath ( job . conf . confs [ B . SUBJECT_PATH ] [ B . ATTR_PATH_TDATA ] , testentity )
pathname = utils . config_tool . getConfigPath ( P . KEY_TESTCASE , getattr ( job . par , B . PAR_TESTCASE ) , " " , job )
if gran == B . PAR_TESTSUITE :
basispath = utils . path_tool . rejoinPath ( job . conf . confs [ B . SUBJECT_PATH ] [ B . ATTR_PATH_TDATA ] , testentity )
pathname = utils . config_tool . getConfigPath ( P . KEY_TESTSUITE , getattr ( job . par , B . PAR_TESTSUITE ) , " " , job )
if pathname [ - 3 : ] == D . DFILE_TYPE_CSV :
tdata = getCsvSpec ( job . m , pathname , D . CSV_SPECTYPE_DATA )
else :
tdata = utils . file_tool . readFileDict ( pathname , job . m )
# get explicit specdata of includes
if D . CSV_BLOCK_IMPORT in tdata :
for pathname in tdata [ D . CSV_BLOCK_IMPORT ] :
pathname = utils . path_tool . rejoinPath ( pathname )
if job . conf . confs [ B . SUBJECT_PATH ] [ B . ATTR_PATH_TDATA ] not in pathname :
pathname = utils . path_tool . rejoinPath ( basispath , pathname )
if pathname [ - 3 : ] == D . DFILE_TYPE_CSV :
data = getCsvSpec ( job . m , pathname , D . CSV_SPECTYPE_DATA )
else :
data = utils . file_tool . readFileDict ( pathname , job . m )
for table in data [ D . CSV_BLOCK_TABLES ] :
if table in tdata [ D . CSV_BLOCK_TABLES ] :
print ( " Fehler " )
tdata [ D . CSV_BLOCK_TABLES ] [ table ] = data [ D . CSV_BLOCK_TABLES ] [ table ]
# get implicit specdata of spec-library
for prefix in list_blocks [ D . DFILE_TABLE_PREFIX ] :
files = utils . file_tool . getFiles ( job . m , basispath , prefix , None )
if len ( files ) < 0 :
continue
for f in files :
print ( f )
filename = os . path . join ( path , f )
data = readCsv ( job . m , filename , None )
table = f [ 6 : - 4 ]
print ( filename + " " + table )
if B . DATA_NODE_TABLES not in tdata :
tdata [ B . DATA_NODE_TABLES ] = { }
tdata [ B . DATA_NODE_TABLES ] [ table ] = data [ B . DATA_NODE_TABLES ] [ table ]
if f in tdata [ D . CSV_BLOCK_TABLES ] :
continue
pathname = utils . path_tool . rejoinPath ( basispath , f )
if pathname [ - 3 : ] == D . DFILE_TYPE_CSV :
data = getCsvSpec ( job . m , pathname , D . CSV_SPECTYPE_DATA )
else :
job . m . setFatal ( " test-Data: reftyp " + tdata [ D . ATTR_SRC_TYPE ] + " is not implemented " )
data = utils . file_tool . readFileDict ( pathname , job . m )
for table in data [ D . CSV_BLOCK_TABLES ] :
if table in tdata [ D . CSV_BLOCK_TABLES ] :
print ( " Fehler " )
tdata [ D . CSV_BLOCK_TABLES ] [ table ] = data [ D . CSV_BLOCK_TABLES ] [ table ]
# fill the options into job-parameter
for p in tdata [ D . CSV_BLOCK_OPTION ] :
setattr ( job . par , p , tdata [ D . CSV_BLOCK_OPTION ] [ p ] )
return tdata
def getCsvSpec ( msg , filename , type ) :
def setBlockLists ( job ) :
for block in D . LIST_BLOCK_CONST + D . LIST_ATTR_CONST + D . LIST_DFNAME_CONST :
list = utils . i18n_tool . I18n . getInstance ( ) . getAliasList ( block + " = ' " + eval ( " D. " + block ) + " ' " )
#list.append(eval("D."+block))
list_blocks [ eval ( " D. " + block ) ] = [ ]
for x in list :
list_blocks [ eval ( " D. " + block ) ] . append ( x . lower ( ) )
def readCsv ( msg , filename , comp , aliasNode = " " , job = None ) :
if job is None :
job = basic . program . Job . getInstance ( )
lines = utils . file_tool . readFileLines ( filename , msg )
print ( " readCsv " + filename )
return parseCsv ( msg , filename , lines , comp , aliasNode , job )
def parseCsv ( msg , filename , lines , comp , aliasNode = " " , job = None ) :
if job is None :
job = basic . program . Job . getInstance ( )
if len ( list_blocks ) < 1 :
setBlockLists ( job )
tdata = { }
if len ( aliasNode ) < 1 :
print ( str ( list_blocks ) )
aliasNode = extractAliasNode ( filename , comp , job )
if len ( aliasNode ) > 3 :
tdata [ D . DATA_ATTR_ALIAS ] = aliasNode
return parseCsvSpec ( msg , lines , D . CSV_SPECTYPE_DATA , tdata , job )
def extractAliasNode ( filename , comp , job ) :
basename = os . path . basename ( filename ) [ 0 : - 4 ]
for prefix in list_blocks [ D . DFILE_TABLE_PREFIX ] :
if basename . find ( prefix ) == 0 :
basename = basename [ len ( prefix ) : ]
if comp is None :
return " "
if B . TOPIC_NODE_DB in comp . conf [ B . SUBJECT_ARTS ] and basename in comp . conf [ B . DATA_NODE_DDL ] :
return B . DATA_NODE_TABLES + " : " + basename
return " "
def getCsvSpec ( msg , filename , ttype , job = None ) :
"""
get data from a csv - file
a = field [ 0 ] delimited by :
a ) data : like a table with data - array of key - value - pairs
a_0 is keyword [ option , step , CSV_HEADER_START ]
a_0 : { a_1 : { f_1 : v_1 , . . . . } # option, step
a_0 : { . . a_n : { _header : [ . . ] , _data : [ rows . . . ] # table, node
b ) tree : as a tree - the rows must be unique identified by the first column
a_0 is keyword in CSV_HEADER_START
a_0 : { . . a_n : { _header : [ fields . . ] , _data : { field : value }
c ) keys : as a tree - the rows must be unique identified by the first column
a_0 is keyword in CSV_HEADER_START
a_1 . . . a_n is key characterized by header - field like _fk * or _pk *
a_0 : { . . a_n : { _keys : [ _fpk * . . ] , _header : [ fields . . ] , _data : { pk_0 : { . . . pk_n : { field : value }
d ) conf :
_header : [ field_0 , . . . ]
{ field_0 : { attr_0 : val_0 , . . } , field_1 : { . . . } , . . . }
reads the specification from a csv - file and maps it into the internal data - structure
: param msg :
: param filename :
: param type :
: param job :
: return :
"""
if job is None :
job = basic . program . Job . getInstance ( )
lines = utils . file_tool . readFileLines ( filename , msg )
return parseCsvSpec ( msg , lines , type )
tdata = { } # the result
return parseCsvSpec ( msg , lines , ttype , tdata , job )
def parseCsvSpec ( msg , lines , type ) :
def parseCsvSpec ( msg , lines , ttype , tdata , job = None ) :
"""
: param msg :
: param lines :
: param type :
: param job :
: return :
"""
if job is None :
job = basic . program . Job . getInstance ( )
data = { }
header = [ ]
h = [ ] # from a[]
if len ( list_blocks ) < 1 :
setBlockLists ( job )
status = " start "
tableDate = utils . date_tool . getActdate ( utils . date_tool . F_DE )
tableDict = { }
verbose = False
tableAttr = { } # table
tableDict = { } # table
for l in lines :
print ( " lines " + l )
fields = l . split ( D . CSV_DELIMITER )
if verbose : print ( " lines " + l )
fields = splitFields ( l , D . CSV_DELIMITER , job )
# check empty line, comment
if ( len ( l . strip ( ) . replace ( D . CSV_DELIMITER , " " ) ) < 1 ) :
if ( len ( fields ) < 1 ) or ( len ( l . strip ( ) . replace ( D . CSV_DELIMITER , " " ) ) < 1 ) :
status = " start "
continue
if ( fields [ 0 ] [ 0 : 1 ] == " # " ) :
continue
a = fields [ 0 ] . lower ( ) . split ( " : " )
# keywords option, step, table
if a [ 0 ] not in data and ( a [ 0 ] in TDATA_NODES ) :
data [ a [ 0 ] ] = { }
if ( a [ 0 ] . lower ( ) == D . CSV_BLOCK_STEP ) :
if ( not B . DATA_NODE_STEPS in data ) :
data [ B . DATA_NODE_STEPS ] = [ ]
step = basic . step . parseStep ( job , fields )
"""
step = { }
step [ B . DATA_NODE_COMP ] = fields [ D . STEP_COMP_I ]
step [ B . ATTR_EXEC_REF ] = fields [ D . STEP_EXECNR_I ]
step [ B . ATTR_DATA_REF ] = fields [ D . STEP_REFNR_I ]
step [ B . ATTR_STEP_ARGS ] = { }
if D . STEP_ARGS_I == D . STEP_LIST_I :
args = " "
for i in range ( D . STEP_ARGS_I , len ( fields ) ) :
if len ( fields [ i ] ) < 1 :
if verbose : print ( str ( a ) + " -- " + str ( fields ) )
tableAttr = setTableAttribute ( tableAttr , a [ 0 ] , fields [ 1 ] , job )
if ( tableAttr [ " _hit " ] ) :
status = " TABLE_ALIAS "
continue
if fields [ i ] [ 0 : 1 ] == " # " :
if ( a [ 0 ] . lower ( ) in list_blocks [ D . CSV_BLOCK_HEAD ] ) :
if verbose : print ( " head " + l )
setTdataLine ( tdata , fields , D . CSV_BLOCK_HEAD , job )
status = " start "
continue
args + = " , " + fields [ i ]
args = args [ 1 : ]
else :
args = fields [ D . STEP_ARGS_I ]
a = args . split ( " , " )
for arg in a :
print ( " arg " + arg )
b = arg . split ( " : " )
if len ( b ) < 2 :
raise Exception ( D . EXCP_MALFORMAT + " " + l )
step [ B . ATTR_STEP_ARGS ] [ b [ 0 ] ] = b [ 1 ]
"""
data [ B . DATA_NODE_STEPS ] . append ( step )
elif ( a [ 0 ] . lower ( ) in list_blocks [ D . CSV_BLOCK_OPTION ] ) :
if verbose : print ( " option " + l )
setTdataLine ( tdata , fields , D . CSV_BLOCK_OPTION , job )
status = " start "
continue
elif ( a [ 0 ] . lower ( ) == D . CSV_BLOCK_OPTION ) :
if len ( a ) < 2 :
raise Exception ( D . EXCP_MALFORMAT + " " + l )
data [ a [ 0 ] ] [ a [ 1 ] ] = fields [ 1 ]
elif ( a [ 0 ] . lower ( ) in list_blocks [ D . CSV_BLOCK_STEP ] ) :
if verbose : print ( " step " + l )
step = basic . step . parseStep ( job , fields )
if D . CSV_BLOCK_STEP not in tdata :
tdata [ D . CSV_BLOCK_STEP ] = [ ]
tdata [ D . CSV_BLOCK_STEP ] . append ( step )
status = " start "
continue
elif ( a [ 0 ] . lower ( ) in list_blocks [ D . CSV_BLOCK_IMPORT ] ) :
if verbose : print ( " includes " + l )
if D . CSV_BLOCK_IMPORT not in tdata :
tdata [ D . CSV_BLOCK_IMPORT ] = [ ]
tdata [ D . CSV_BLOCK_IMPORT ] . append ( fields [ 1 ] )
status = " start "
continue
elif a [ 0 ] . lower ( ) == D . DATA_ATTR_DATE :
tableDate = fields [ 1 ]
elif ( a [ 0 ] . lower ( ) in D . CSV_HEADER_START ) :
# create deep structure a_0 ... a_n
print ( " tdata 136 CSV_HEADER_START " + str ( len ( a ) ) )
elif ( a [ 0 ] . lower ( ) in list_blocks [ D . CSV_BLOCK_TABLES ] ) :
if verbose : print ( " tables " + l )
h = a
header = [ ]
if B . DATA_NODE_TABLES not in data :
data [ B . DATA_NODE_TABLES ] = { }
h [ 0 ] = B . DATA_NODE_TABLES
comps = { }
tableDict = getTabContent ( msg , data , h )
i = 0
for f in fields :
i + = 1
if i < = 1 :
if ttype == D . CSV_SPECTYPE_CONF :
del h [ 0 ]
tableDict = getTdataContent ( msg , tdata , h )
setTableHeader ( tableDict , tableAttr , fields , ttype , job )
status = D . CSV_SPECTYPE_DATA
elif ( status == D . CSV_SPECTYPE_DATA ) :
tableDict = getTdataContent ( msg , tdata , h )
if verbose : print ( " setTableData " + str ( h ) + " " + str ( tableDict ) )
setTableData ( tableDict , fields , ttype , job )
elif ( status == " TABLE_ALIAS " ) and D . DATA_ATTR_ALIAS in tdata :
alias = tdata [ D . DATA_ATTR_ALIAS ]
b = alias . split ( " : " )
h = [ B . DATA_NODE_TABLES ] + b
tableDict = getTdataContent ( msg , tdata , h )
tableDict [ D . DATA_ATTR_ALIAS ] = alias
fields = [ alias ] + fields
setTableHeader ( tableDict , tableAttr , fields , ttype , job )
status = D . CSV_SPECTYPE_DATA
if ttype == D . CSV_SPECTYPE_CONF :
header = [ ]
for k in tdata :
if k in D . LIST_DATA_ATTR :
continue
if B . DATA_NODE_DATA in tdata [ k ] :
tdata [ k ] . pop ( B . DATA_NODE_DATA )
for f in tdata [ k ] :
if f in [ B . DATA_NODE_HEADER , " _hit " ] + D . LIST_DATA_ATTR :
continue
if len ( f ) < 1 :
break
header . append ( f )
tdata [ k ] [ B . DATA_NODE_HEADER ] = header
header = [ ]
if B . DATA_NODE_TABLES in tdata and B . DATA_NODE_TABLES in tdata [ B . DATA_NODE_TABLES ] :
for k in tdata [ B . DATA_NODE_TABLES ] [ B . DATA_NODE_TABLES ] :
if k in tdata [ B . DATA_NODE_TABLES ] :
if verbose : print ( " Error " )
else :
tdata [ B . DATA_NODE_TABLES ] [ k ] = tdata [ B . DATA_NODE_TABLES ] [ B . DATA_NODE_TABLES ] [ k ]
tdata [ B . DATA_NODE_TABLES ] . pop ( B . DATA_NODE_TABLES )
return tdata
def setTableHeader ( tableDict , tableAttr , fields , ttype , job ) :
header = [ ]
for i in range ( 1 , len ( fields ) ) :
header . append ( fields [ i ] . strip ( ) )
tableDict [ B . DATA_NODE_HEADER ] = header
print ( " tdata 165 header " + str ( header ) )
if type == D . CSV_SPECTYPE_TREE :
for attr in tableAttr :
tableDict [ attr ] = tableAttr [ attr ]
# preparate the sub-structure for row-data
if ttype == D . CSV_SPECTYPE_TREE :
tableDict [ B . DATA_NODE_DATA ] = { }
elif type == D . CSV_SPECTYPE_KEYS :
elif t type == D . CSV_SPECTYPE_KEYS :
tableDict [ D . CSV_NODETYPE_KEYS ] = { }
elif type == D . CSV_SPECTYPE_CONF :
tableDict = { }
headerFields = [ ]
tableDict [ D . DATA_ATTR_KEY ] = 1
if D . DATA_ATTR_KEY in tableAttr :
tableDict [ D . DATA_ATTR_KEY ] = header . index ( tableAttr [ D . DATA_ATTR_KEY ] ) + 1
else :
tableDict [ B . DATA_NODE_DATA ] = [ ]
tableDict [ D . DATA_ATTR_DATE ] = tableDate
setTabContent ( msg , data , tableDict , h )
status = D . CSV_SPECTYPE_DATA
continue
elif ( status == D . CSV_SPECTYPE_DATA ) :
# check A-col for substructure
# fill data
tableDict = getTabContent ( msg , data , h )
return tableDict
def setTableData ( tableDict , fields , ttype , job ) :
row = { }
print ( fields )
if ttype == D . CSV_SPECTYPE_DATA and " : " not in fields [ 0 ] and D . DATA_ATTR_ALIAS in tableDict :
fields = [ tableDict [ D . DATA_ATTR_ALIAS ] ] + fields
i = 1
# case-differentiation DATA or TREE
for f in header :
print ( str ( i ) + " " + str ( len ( fields ) ) + " " + str ( len ( header ) ) )
row [ f ] = fields [ i ]
if type == D . CSV_SPECTYPE_TREE :
tableDict [ B . DATA_NODE_DATA ] [ f ] = fields [ i ]
for f in tableDict [ B . DATA_NODE_HEADER ] :
row [ f ] = fields [ i ] . strip ( )
i + = 1
if type == D . CSV_SPECTYPE_DATA :
print ( " parseSpec " + str ( fields [ 0 ] ) )
if ttype == D . CSV_SPECTYPE_DATA :
if B . ATTR_DATA_COMP in tableDict :
tcomps = tableDict [ B . ATTR_DATA_COMP ]
else :
tcomps = { }
row [ B . ATTR_DATA_COMP ] = { }
for c in fields [ 0 ] . split ( " , " ) :
a = c . split ( " : " )
print ( " parseSpec " + str ( a ) )
comps [ a [ 0 ] ] = a [ 1 ]
row [ B . ATTR_DATA_COMP ] [ a [ 0 ] ] = a [ 1 ]
#row[B.ATTR_DATA_COMP] = fields[0].split(",")
tableDict [ B . ATTR_DATA_COMP ] = comps
tcomps [ a [ 0 ] ] = a [ 1 ]
row [ B . ATTR_DATA_COMP ] [ a [ 0 ] ] = a [ 1 ] . strip ( )
tableDict [ B . DATA_NODE_DATA ] . append ( row )
elif type == D . CSV_SPECTYPE_KEYS :
tableDict [ D . CSV_NODETYPE_KEYS ] [ fields [ 1 ] ] = row
elif type == D . CSV_SPECTYPE_CONF :
tableDict [ B . ATTR_DATA_COMP ] = tcomps
elif ttype == D . CSV_SPECTYPE_KEYS :
tableDict [ D . CSV_NODETYPE_KEYS ] [ fields [ tableDict [ D . DATA_ATTR_KEY ] ] . strip ( ) ] = row
elif ttype == D . CSV_SPECTYPE_CONF :
tableDict [ fields [ 1 ] ] = row
headerFields . append ( fields [ 1 ] )
setTabContent ( msg , data , tableDict , h )
if ( status in [ D . CSV_SPECTYPE_DATA , D . CSV_SPECTYPE_KEYS ] ) :
tableDict = getTabContent ( msg , data , h )
if type == D . CSV_SPECTYPE_CONF :
tableDict [ B . DATA_NODE_HEADER ] = headerFields
setTabContent ( msg , data , tableDict , h )
if type == D . CSV_SPECTYPE_CONF :
data = data [ B . DATA_NODE_TABLES ]
print ( " return getCsvSpec " + str ( data ) )
return data
return tableDict
def mergeTableComponents ( comps , rowComps ) :
for c in rowComps . split ( " , " ) :
a = c . split ( " : " )
comps [ a [ 0 ] ] = a [ 1 ]
return comps
def setTableAttribute ( tableAttr , key , val , job ) :
for attr in D . LIST_DATA_ATTR :
if ( key . lower ( ) in list_blocks [ attr ] ) :
tableAttr [ attr ] = val . strip ( )
tableAttr [ " _hit " ] = True
return tableAttr
tableAttr [ " _hit " ] = False
return tableAttr
def setTabContent ( msg , data , tabledata , path ) :
if len ( path ) > = 2 and path [ 1 ] not in data [ path [ 0 ] ] :
data [ path [ 0 ] ] [ path [ 1 ] ] = { }
if len ( path ) > = 3 and path [ 2 ] not in data [ path [ 0 ] ] [ path [ 1 ] ] :
data [ path [ 0 ] ] [ path [ 1 ] ] [ path [ 2 ] ] = { }
if len ( path ) > = 4 and path [ 3 ] not in data [ path [ 0 ] ] [ path [ 1 ] ] [ path [ 2 ] ] :
data [ path [ 0 ] ] [ path [ 1 ] ] [ path [ 2 ] ] [ path [ 3 ] ] = { }
def setTdataLine ( tdata , fields , block , job ) :
"""
sets field ( s ) into tdata as a key - value - pair
additional fields will be concatenate to a intern separated list
: param tdata :
: param fields :
: param block :
: param job :
: return :
"""
a = fields [ 0 ] . lower ( ) . split ( " : " )
a [ 0 ] = block # normalized key
val = " "
for i in range ( 1 , len ( fields ) - 1 ) :
val + = D . INTERNAL_DELIMITER + fields [ i ]
if len ( val ) > len ( D . INTERNAL_DELIMITER ) :
val = val [ len ( D . INTERNAL_DELIMITER ) : ]
setTdataContent ( job . m , tdata , val , a )
return tdata
def setTdataContent ( msg , data , tabledata , path ) :
setTdataStructure ( msg , data , path )
if len ( path ) == 2 :
data [ path [ 0 ] ] [ path [ 1 ] ] = tabledata
elif len ( path ) == 3 :
@ -274,155 +340,54 @@ def setTabContent(msg, data, tabledata, path):
data [ path [ 0 ] ] [ path [ 1 ] ] [ path [ 2 ] ] [ path [ 3 ] ] = tabledata
def getTabContent ( msg , data , path ) :
if len ( path ) > = 2 and path [ 1 ] not in data [ path [ 0 ] ] :
data [ path [ 0 ] ] [ path [ 1 ] ] = { }
if len ( path ) > = 3 and path [ 2 ] not in data [ path [ 0 ] ] [ path [ 1 ] ] :
data [ path [ 0 ] ] [ path [ 1 ] ] [ path [ 2 ] ] = { }
if len ( path ) > = 4 and path [ 3 ] not in data [ path [ 0 ] ] [ path [ 1 ] ] [ path [ 2 ] ] :
data [ path [ 0 ] ] [ path [ 1 ] ] [ path [ 2 ] ] [ path [ 3 ] ] = { }
def getTdataContent ( msg , data , path ) :
setTdataStructure ( msg , data , path )
if len ( path ) == 2 :
return data [ path [ 0 ] ] [ path [ 1 ] ]
elif len ( path ) == 3 :
return data [ path [ 0 ] ] [ path [ 1 ] ] [ path [ 2 ] ]
elif len ( path ) == 4 :
return data [ path [ 0 ] ] [ path [ 1 ] ] [ path [ 2 ] ] [ path [ 3 ] ]
elif len ( path ) == 1 :
return data [ path [ 0 ] ]
else :
pass
def readCsv ( msg , filename , comp , aliasNode = " " ) :
lines = utils . file_tool . readFileLines ( filename , msg )
print ( " readCsv " + filename )
print ( lines )
return parseCsv ( msg , filename , lines , comp , aliasNode )
return None
def parseCsv ( msg , filename , lines , comp , aliasNode = " " ) :
job = basic . program . Job . getInstance ( )
verify = - 4 + job . getDebugLevel ( TOOL_NAME )
job . debug ( verify , " # # # # # # # # parseCsv " + filename + " : " + str ( lines ) )
fields = [ ]
nodes = [ ]
columns = [ ]
output = { }
state = 0
data = { }
tableDict = { }
tableDate = " "
tableCnt = 0
cnt = 0
basename = os . path . basename ( filename ) [ 0 : - 4 ]
startCols = 1
for line in lines :
fields = line . split ( ' ; ' )
testline = line . replace ( " ; " , " " )
a = fields [ 0 ] . split ( ' : ' )
job . debug ( verify , str ( state ) + " line " + line + " : " + str ( len ( fields ) ) + " : " + str ( fields ) )
if len ( testline ) < 2 and state < 1 :
state = 0
elif a [ 0 ] . lower ( ) == D . DATA_ATTR_DATE :
tableDate = fields [ 1 ]
state = 1
elif a [ 0 ] . lower ( ) == D . DATA_ATTR_COUNT :
tableCnt = fields [ 1 ]
state = 1
elif a [ 0 ] . lower ( ) in D . CSV_HEADER_START or \
( comp is not None and state == 1
and isCompTableFile ( comp , filename ) ) :
state = 2
columns = [ ]
h = a
if len ( h ) < 2 and comp is not None :
a = [ " table " , basename ]
h = a
startCols = 0
cnt = len ( fields )
job . debug ( verify , str ( state ) + " cnt " + str ( cnt ) )
data [ B . DATA_NODE_TABLES ] = { }
h [ 0 ] = B . DATA_NODE_TABLES
if not aliasNode . isspace ( ) and len ( aliasNode ) > 3 :
struct = aliasNode . split ( " : " )
for x in struct :
if len ( x ) > 2 :
nodes . append ( x )
job . debug ( verify , str ( state ) + " nodes " + str ( nodes ) )
elif len ( h ) > 1 :
for i in range ( 1 , len ( h ) ) :
nodes . append ( h [ i ] )
job . debug ( verify , str ( state ) + " nodes " + str ( nodes ) )
tableDict = getTabContent ( msg , data , h )
tableDict [ B . ATTR_DATA_COMP ] = { }
if len ( tableDate ) > 6 :
tableDict [ D . DATA_ATTR_DATE ] = tableDate
if int ( tableCnt ) > 0 :
tableDict [ D . DATA_ATTR_COUNT ] = tableCnt
j = 0
for i in range ( 1 , cnt ) :
if fields [ i ] [ 0 : 1 ] == " _ " :
startCols + = 1
continue
job . debug ( verify , str ( i ) + " cnt " + str ( fields [ i ] ) )
if len ( fields [ i ] ) > 0 :
columns . append ( fields [ i ] )
j = j + 1
cnt = j
tableDict [ B . DATA_NODE_HEADER ] = columns
job . debug ( verify , str ( state ) + " " + str ( cnt ) + " cols " + str ( columns ) )
elif state > = 2 and len ( testline ) > 2 :
job . debug ( verify , str ( state ) + " " + str ( len ( testline ) ) )
tableDict = getTabContent ( msg , data , h )
state = 3
row = { }
print ( line )
if startCols > 0 :
row [ B . ATTR_DATA_COMP ] = { }
row [ B . ATTR_DATA_COMP ] [ a [ 0 ] ] = a [ 1 ]
tableDict [ B . ATTR_DATA_COMP ] [ a [ 0 ] ] = a [ 1 ]
for i in range ( startCols , cnt + startCols ) :
print ( " for " + str ( i ) + " " + str ( len ( row ) ) + " " + str ( startCols ) + " " + str ( len ( fields ) ) )
print ( str ( fields [ i ] ) )
if i > = len ( columns ) + startCols :
break
row [ columns [ i - startCols ] ] = fields [ i ]
job . debug ( verify , str ( state ) + " row " + str ( row ) )
if B . DATA_NODE_DATA not in tableDict :
tableDict [ B . DATA_NODE_DATA ] = [ ]
tableDict [ B . DATA_NODE_DATA ] . append ( row )
setTabContent ( msg , data , tableDict , h )
elif state == 3 :
job . debug ( verify , " structure " + str ( state ) + " : " + str ( nodes ) )
state = 0
def setTdataStructure ( msg , data , path ) :
if len ( path ) > = 1 and path [ 0 ] not in data :
data [ path [ 0 ] ] = { }
if len ( path ) > = 2 and path [ 1 ] not in data [ path [ 0 ] ] :
data [ path [ 0 ] ] [ path [ 1 ] ] = { }
if len ( path ) > = 3 and path [ 2 ] not in data [ path [ 0 ] ] [ path [ 1 ] ] :
data [ path [ 0 ] ] [ path [ 1 ] ] [ path [ 2 ] ] = { }
if len ( path ) > = 4 and path [ 3 ] not in data [ path [ 0 ] ] [ path [ 1 ] ] [ path [ 2 ] ] :
data [ path [ 0 ] ] [ path [ 1 ] ] [ path [ 2 ] ] [ path [ 3 ] ] = { }
return data
def setSubnode ( i , nodes , data , tree ) :
print ( " setSubnode " + str ( i ) + " : " + " : " + str ( tree ) )
if i > = len ( nodes ) :
print ( " setSubnode a " + str ( i ) )
tree [ B . DATA_NODE_DATA ] = data
elif tree is not None and nodes [ i ] in tree . keys ( ) :
print ( " setSubnode b " + str ( i ) )
tree [ nodes [ i ] ] = setSubnode ( i + 1 , nodes , data , tree [ nodes [ i ] ] )
else :
print ( " setSubnode c " + str ( i ) )
tree [ nodes [ i ] ] = setSubnode ( ( i + 1 ) , nodes , data , { } )
return tree
def splitFields ( line , delimiter , job ) :
out = [ ]
fields = line . split ( delimiter )
for i in range ( 0 , len ( fields ) ) :
if fields [ i ] [ 0 : 1 ] == " # " :
break
if re . match ( r " ^ \" (.*) \" $ " , fields [ i ] ) :
fields [ i ] = fields [ i ] [ 1 : - 1 ]
out . append ( fields [ i ] )
return out
def getDataStructure ( comp ) :
# gets data-structure from the vml in the component-folder
job = basic . program . Job . getInstance ( )
verify = - 1 + job . getDebugLevel ( TOOL_NAME )
job . debug ( verify , " getDataStructure " + comp )
def normalizeDataRow ( dstruct , xpathtupel , row , referencedate ) :
# normalize data of the row if necessary
# raw-value is saved as new field with _raw as suffix
job = basic . program . Job . getInstance ( )
verify = - 1 + job . getDebugLevel ( TOOL_NAME )
job . debug ( verify , " calcDataRow " + row )
def writeCsvData ( filename , tdata , comp , job ) :
text = " "
if B . DATA_NODE_TABLES in tdata :
for k in tdata [ B . DATA_NODE_TABLES ] :
text + = buildCsvData ( tdata [ B . DATA_NODE_TABLES ] [ k ] , k , job )
text + = " \n "
utils . file_tool . writeFileText ( comp . m , filename , text )
def buildCsvData ( filename , tdata , comp ) :
def buildCsvData ( tdata , table , job = None ) :
"""
writes the testdata into a csv - file for documentation of the test - run
: param teststatus :
@ -430,55 +395,57 @@ def buildCsvData(filename, tdata, comp):
: param comp : if specific else None
: return :
"""
compColumn = not isCompTableFile ( comp , filename )
job = basic . program . Job . getInstance ( )
verify = - 1 + job . getDebugLevel ( TOOL_NAME )
job . debug ( verify , " writeDataTable " + str ( comp ) )
text = " "
for k in [ D . DATA_ATTR_DATE , D . DATA_ATTR_COUNT ] :
if k in tdata :
text + = k + " ; " + str ( tdata [ k ] ) + " \n "
header = " table "
header = utils . i18n_tool . I18n . getInstance ( ) . getText ( f " { B . DATA_NODE_TABLES =} " , job ) + " : " + table
for f in tdata [ B . DATA_NODE_HEADER ] :
header + = " ; " + f
if compColumn :
text + = header
else :
#text += "_nr;" + header[6:] + "\n"
text + = header [ 6 : ] + " \n "
header + = D . CSV_DELIMITER + f
text + = header + " \n "
i = 0
for r in tdata [ B . DATA_NODE_DATA ] :
row = " "
if B . ATTR_DATA_COMP in r :
for k in r [ B . ATTR_DATA_COMP ] :
row + = " , " + k + " : " + r [ B . ATTR_DATA_COMP ] [ k ]
row = row [ 1 : ]
i + = 1
for f in tdata [ B . DATA_NODE_HEADER ] :
if f in r :
row + = " ; " + str ( r [ f ] )
row + = D . CSV_DELIMITER + str ( r [ f ] )
else :
row + = " ; "
if compColumn :
row + = D . CSV_DELIMITER
text + = row
else :
text + = row [ 1 : ]
#text += str(i) + row
text + = " \n "
return text
def writeCsvData ( filename , tdata , comp ) :
def buildCsvSpec ( tdata , job = None ) :
text = " "
if B . DATA_NODE_TABLES in tdata :
for k in tdata [ B . DATA_NODE_TABLES ] :
text + = buildCsvData ( filename , tdata [ B . DATA_NODE_TABLES ] [ k ] , comp )
text + = " \n "
utils . file_tool . writeFileText ( comp . m , filename , text )
if D . CSV_BLOCK_IMPORT in tdata :
for k in tdata [ D . CSV_BLOCK_HEAD ] :
text + = utils . i18n_tool . I18n . getInstance ( ) . getText ( f " { D . CSV_BLOCK_HEAD =} " , job )
text + = " : " + k + D . CSV_DELIMITER + tdata [ D . CSV_BLOCK_HEAD ] [ k ] + " \n "
text + = " # option:key ;values;..;;;; \n "
if D . CSV_BLOCK_OPTION in tdata :
for k in tdata [ D . CSV_BLOCK_OPTION ] :
text + = utils . i18n_tool . I18n . getInstance ( ) . getText ( f " { D . CSV_BLOCK_OPTION =} " , job )
text + = " : " + k + D . CSV_DELIMITER + getHeadArgs ( tdata [ D . CSV_BLOCK_OPTION ] [ k ] , job ) + " \n "
text + = " #;;;;;; \n "
if D . CSV_BLOCK_STEP in tdata :
text + = basic . step . getStepHeader ( job )
i = 1
for step in tdata [ D . CSV_BLOCK_STEP ] :
text + = utils . i18n_tool . I18n . getInstance ( ) . getText ( f " { D . CSV_BLOCK_STEP =} " , job ) + " : " + str ( i )
text + = D . CSV_DELIMITER + step . getStepText ( job )
i + = 1
text + = " #;;;;;; \n "
if D . CSV_BLOCK_TABLES in tdata :
for k in tdata [ D . CSV_BLOCK_TABLES ] :
text + = buildCsvData ( tdata [ D . CSV_BLOCK_TABLES ] [ k ] , k , job )
text + = " #;;;;;; \n "
return text
def isCompTableFile ( comp , filename ) :
""" check if the filename belongs to the component """
basetable = os . path . basename ( filename ) [ 0 : - 4 ]
if comp is None :
return False
if B . TOPIC_NODE_DB in comp . conf [ B . SUBJECT_ARTS ] and basetable in comp . conf [ B . DATA_NODE_DDL ] \
and comp . name in filename :
return True
return False
def getHeadArgs ( value , job ) :
return value . replace ( D . INTERNAL_DELIMITER , D . CSV_DELIMITER )