Data-Test-Executer Framework speziell zum Test von Datenverarbeitungen mit Datengenerierung, Systemvorbereitungen, Einspielungen, ganzheitlicher diversifizierender Vergleich
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 

367 lines
11 KiB

# Funktionen zum Dateizugriff mit Suchen, Lesen, Schreiben
# ------------------------------------------------------------
"""
"""
import codecs
import json
import os
import os.path
import re
import time
import xmltodict
import yaml
import platform
import basic.toolHandling
import basic.program
import tools.data_const as D
#import tools.tdata_tool
import tools.date_tool
import basic.constants as B
import tools.file_type
def getDump(obj):
    """Return the attribute dictionary (``vars``) of *obj* rendered as a string."""
    return str(vars(obj))
def getFiles(msg, job, path, pattern, conn):
    """
    Collect filenames in the directory *path* that match *pattern*;
    delegates to the remote variant when a connection object is given.
    :param msg: -- msg object used for debug output
    :param job: -- job object supplying the debug level
    :param path: -- directory to scan
    :param pattern: -- filename regex pattern
    :param conn: -- optional remote connection
    :return: list of matching filenames (empty if *path* does not exist)
    """
    if conn is not None:
        return getRemoteFiles(msg, path, pattern, conn)
    verify = int(job.getDebugLevel("file_tool"))
    msg.debug(verify, "getFiles " + path + " , " + pattern)
    matches = []
    if not os.path.exists(path):
        return matches
    for name in os.listdir(path):
        msg.debug(verify, "getFiles " + name)
        if re.search(pattern, name):
            msg.debug(verify, "match " + name)
            matches.append(name)
    return matches
def removeFiles(msg, path, pattern, conn):
    """
    search filenames in the directory and removes them
    - if conn is set search remote
    NOTE(review): not implemented yet - currently a no-op stub.
    :param msg: -- msg-object
    :param path: -- path - String
    :param pattern: -- filename as Pattern
    :param conn: -- optional remote connection
    :return: Array filenames
    """
    pass
def copyFiles(job, fileList, source, target, comp):
    """
    copies files from source to target
    NOTE(review): not implemented yet - currently a no-op stub.
    :param job:
    :param fileList:
    :param source:
    :param target:
    :param comp:
    :return:
    """
    pass
def getRemoteFiles(msg, path, pattern, conn):
    """
    search filenames in the directory - if conn is set search remote
    NOTE(review): not implemented yet - currently a no-op stub, although
    getFiles() already delegates here whenever conn is not None.
    :param msg: -- msg-object
    :param path: -- path - String
    :param pattern: -- filename as Pattern
    :param conn:
    :return: Array filenames
    """
    pass
def getFilesRec(msg, job, path, pattern):
    """
    Recursively search the directory *path* for files matching *pattern*.
    :param msg: -- msg object used for debug output
    :param job: -- job object supplying the debug level
    :param path: -- root directory - String
    :param pattern: -- filename regex pattern
    :return: list of matching files with their absolute paths
    """
    verify = int(job.getDebugLevel("file_tool"))
    msg.debug(verify, "getFilesRec " + path + " , " + pattern)
    found = []
    for root, _dirs, filenames in os.walk(path):
        for name in filenames:
            msg.debug(verify, "getFilesRec " + name)
            if re.search(pattern, name):
                msg.debug(verify, "match " + name)
                found.append(os.path.join(root, name))
    return found
def getTree(msg, job, pfad):
    """
    Build a nested dict describing the directory tree below *pfad*:
    each subdirectory becomes a key with its own sub-tree, the plain
    files of each directory are collected under the key "_files_".
    :param msg: -- msg object used for debug output
    :param job: -- job object supplying the debug level
    :param pfad: -- root directory
    :return: nested dict of the directory tree
    """
    verify = int(job.getDebugLevel("file_tool"))
    msg.debug(verify, "getTree " + pfad)
    tree = {}
    files = []
    for f in os.listdir(pfad):
        full = os.path.join(pfad, f)
        # BUGFIX: os.path.isDir / os.path.isFile do not exist (AttributeError);
        # the correct names are os.path.isdir / os.path.isfile
        if os.path.isdir(full):
            tree[f] = getTree(msg, job, full)
        elif os.path.isfile(full):
            files.append(f)
    tree["_files_"] = files
    return tree
def mkPaths(job, path, msg):
    """
    Create the parent directory of *path* if it does not yet exist.
    :param job: -- job object supplying the debug level
    :param path: -- full file path whose directory part is created
    :param msg: -- msg object (unused, kept for signature compatibility)
    """
    verify = int(job.getDebugLevel("file_tool"))
    dirname = os.path.dirname(path)
    # ROBUSTNESS: a bare filename has no directory part - nothing to create
    # (os.makedirs("") would raise FileNotFoundError)
    if not dirname:
        return
    # exist_ok=True already covers the "directory exists" case, so the
    # former check-then-create os.path.exists() guard (a race) is dropped
    os.makedirs(dirname, exist_ok=True)
def getFileEncoding(msg, job, path):
    """
    Try to open *path* with a list of candidate encodings and return the
    first one that decodes the whole file without error; fall back to
    byte-level detection if none fits.
    :param msg: -- msg object (passed through to the fallback)
    :param job: -- job object (passed through to the fallback)
    :param path: -- file to inspect
    :return: encoding name, e.g. 'utf-8' or 'iso-8859-1'
    """
    encodings = ['utf-8', 'iso-8859-1']  # add more
    for e in encodings:
        try:
            # context manager guarantees the handle is closed
            # (the former open/readlines left the file handle leaked)
            with codecs.open(path, 'r', encoding=e) as fh:
                fh.readlines()
        except UnicodeDecodeError:
            print('got unicode error with %s , trying different encoding' % e)
        except OSError:
            # narrowed from a bare 'except:'; only I/O errors are expected here
            print("except")
        else:
            print('opening the file with encoding: %s ' % e)
            return e
    return detectFileEncode(job, path, msg)
def detectFileEncode(job, path, msg):
    """
    Detect the encoding of *path* by counting bytes that are typical
    single-byte iso-8859-1 umlauts versus generic non-ASCII bytes
    (which indicate utf-8 multi-byte sequences).
    :param job: -- job object supplying the debug level
    :param path: -- file to inspect
    :param msg: -- msg object (unused, kept for signature compatibility)
    :return: 'iso-8859-1' if typical umlaut bytes dominate, else 'utf-8'
    """
    verify = int(job.getDebugLevel("file_tool"))
    cntIso = 0
    cntUtf = 0
    # iso-8859-1 codes of the German umlauts and the inverted question mark
    CHAR_ISO = [196, 228, 214, 246, 220, 252, 191]
    with open(path, 'rb') as file:
        byte = file.read(1)
        while byte:
            i = int.from_bytes(byte, "little")
            # BUGFIX: the branches must be exclusive - previously an umlaut
            # byte (>127) incremented cntUtf as well, so cntIso could never
            # exceed cntUtf and iso-8859-1 was never detected
            if i in CHAR_ISO:
                cntIso += 1
            elif i == 160:
                pass  # non-breaking space: ambiguous, counted for neither
            elif i > 127:
                cntUtf += 1
            byte = file.read(1)
    if cntIso > cntUtf:
        return 'iso-8859-1'
    return 'utf-8'
def read_file_lines(job, path, msg):
    """
    Read the file at *path* and return its content as a list of lines;
    anything other than a text result yields an empty list.
    :param job: -- job object
    :param path: -- file path
    :param msg: -- msg object
    :return: list of lines (without line endings)
    """
    content = read_file_text(job, path, msg)
    return content.splitlines() if isinstance(content, str) else []
def read_file_text(job, path, msg):
    """
    Return the full text of *path*, decoded with the detected encoding;
    a missing file yields an empty string.
    :param job: -- job object supplying the debug level
    :param path: -- file path
    :param msg: -- msg object (forwarded to the encoding detection)
    :return: file content as string
    """
    verify = int(job.getDebugLevel("file_tool"))
    if not os.path.exists(path):
        return ""
    enc = detectFileEncode(job, path, msg)
    with open(path, 'r', encoding=enc) as fh:
        return fh.read()
def getModTime(job, filepath):
    """
    Return the last-modification time of *filepath*, formatted with the
    project's default time format.
    :param job: -- job object (unused, kept for signature compatibility)
    :param filepath: -- file path
    :return: formatted modification timestamp as string
    """
    mtime = os.path.getmtime(filepath)
    return tools.date_tool.formatParsedDate(time.ctime(mtime), tools.date_tool.F_TIME_DEFAULT)
def read_file_dict(job, path: str, msg, ttype: str = D.DFILE_TYPE_CSV) -> dict:
    """
    reads and gets general a dict from any kind of filetyp
    (yaml, json, xml or csv - selected by the file extension)
    :param job: job object supplying the debug level
    :param path: with extension of filetype
    :param msg: optionally
    :param ttype: test-data type, forwarded to the rebuild/check routines
    :return: parsed content as dict (empty dict if the file does not exist)
    """
    # 20220329 generalize
    # job = basic.program.Job.getInstance()
    verify = int(job.getDebugLevel("file_tool"))
    doc = {}
    if not os.path.exists(path):
        return doc
    # decode with the detected encoding so umlauts survive the parse
    enc = detectFileEncode(job, path, msg)
    if D.DFILE_TYPE_YML in path[-4:]:
        with open(path, 'r', encoding=enc) as file:
            doc = yaml.full_load(file)
            file.close()
        doc = tools.file_type.rebuild_tdata(job, doc, {}, ttype)
    elif D.DFILE_TYPE_JSON in path[-5:]:
        with open(path, 'r', encoding=enc) as file:
            doc = json.load(file)
            file.close()
        doc = tools.file_type.rebuild_tdata(job, doc, {}, ttype)
    elif D.DFILE_TYPE_XML in path[-4:]:
        with open(path, 'r', encoding=enc) as file:
            res = xmltodict.parse(file.read())
            # doc = dict(res)
            # convert the OrderedDict tree from xmltodict into plain dicts
            doc = castOrderedDict(res)
            file.close()
        doc = tools.file_type.rebuild_tdata(job, doc, {}, ttype)
    elif D.DFILE_TYPE_CSV in path[-5:]:
        # csv is delegated to the project's file tool
        ffcts = basic.toolHandling.getFileTool(job, None, D.DFILE_TYPE_CSV)
        #doc = tools.tdata_tool.getCsvSpec(msg, job, path, D.CSV_SPECTYPE_CONF)
        doc = ffcts.load_file(path, ttype)
        # tools.tdata_tool.getCsvSpec(msg, job, path, D.CSV_SPECTYPE_CONF)
    # remember the origin of the data for later checks (see checkNodes)
    doc["_path"] = path
    # TODO !! refactor to file_type
    return tools.file_type.check_tdata(job, doc, ttype)
    #check_file_dict(job, doc, msg, ttype)
    #return doc
def check_file_dict(job, config: dict, msg, ttype: str):
    """
    check-routine for different kind of dictionary-types:
    selects per *ttype* which nodes must exist and which must not,
    then delegates the actual check to checkNodes().
    :param job:
    :param config: parsed file content, must contain "_path"
    :param msg:
    :param ttype: dictionary type (csv spec type, subject, "basic", "tool", ...)
    :return:
    """
    MUST_NODES = []
    MUSTNT_NODES = []
    if ttype in [D.CSV_SPECTYPE_CTLG]:
        MUST_NODES = [B.DATA_NODE_HEADER, B.DATA_NODE_KEYS]
        MUSTNT_NODES = [B.DATA_NODE_DATA]
    elif ttype in [D.CSV_SPECTYPE_DDL]:
        MUST_NODES = [B.DATA_NODE_HEADER]
        MUSTNT_NODES = [B.DATA_NODE_DATA, B.DATA_NODE_FIELDS, B.DATA_NODE_KEYS]
    elif ttype in [D.CSV_SPECTYPE_DATA]:
        MUST_NODES = [B.DATA_NODE_HEADER, B.DATA_NODE_DATA]
        MUSTNT_NODES = [B.DATA_NODE_FIELDS, B.DATA_NODE_KEYS]
    elif ttype in [D.CSV_SPECTYPE_CONF]:
        MUST_NODES = [B.DATA_NODE_HEADER, B.DATA_NODE_DATA]
        MUSTNT_NODES = [B.DATA_NODE_FIELDS, B.DATA_NODE_KEYS]
    elif ttype in [D.CSV_SPECTYPE_TREE]:
        MUST_NODES = [B.DATA_NODE_HEADER, B.DATA_NODE_FIELDS, B.DATA_NODE_KEYS]
        MUSTNT_NODES = [B.DATA_NODE_DATA]
    elif ttype in [D.CSV_SPECTYPE_COMP]:
        MUST_NODES = [B.SUBJECT_ARTIFACTS, B.SUBJECT_STEPS, "functions", B.SUBJECT_DATATABLES]
        MUSTNT_NODES = [B.DATA_NODE_DATA]
    elif ttype+"s" in B.LIST_SUBJECTS:
        # subject-typed files have no generic node rules (yet): just trace
        print("subject-typ "+ttype+" "+config["_path"])
    elif ttype in ["basic", "tool"]:
        # tool : tool-specific nodes
        #print("anderer bekannter Ttyp " + ttype + " " + config["_path"])
        return
    else:
        # unknown type: trace it, then fall through with empty node lists
        print("anderer Ttyp "+ttype+" "+config["_path"])
    checkNodes(job, config, MUST_NODES, MUSTNT_NODES)
def checkNodes(job, config, mustNodes, mustntNodes):
    """
    Verify that *config* (or the relevant sub-dict selected from it by the
    file name) contains every node of *mustNodes* and no filled node of
    *mustntNodes*; an empty forbidden node only produces a warning.
    :param job: -- job object, used for warnings (job.m.logWarn)
    :param config: -- dict read from a file, must contain "_path"
    :param mustNodes: -- node names that have to be present
    :param mustntNodes: -- node names that must be absent or empty
    :raises Exception: when a must-node is missing or a mustnt-node is filled
    """
    a = str(config["_path"]).split(os.path.sep)
    b = a[-1].split(".")
    path = config["_path"]
    # step into the sub-dict named like the file (without extension), if present
    if b[0] in config:
        config = config[b[0]]
    try:
        if len(config) == 2:
            for x in B.LIST_SUBJECTS:
                if x[:-1] in config:
                    config = config[x[:-1]]
                    break
    except Exception:
        # best-effort unwrap only; keep the current dict on any problem
        pass
    for n in mustNodes:
        if n not in config:
            raise Exception("must-node doesnt exist "+n+" "+path)
    for n in mustntNodes:
        if n not in config:
            continue
        if len(config[n]) == 0:
            job.m.logWarn("empty mustnt-node "+n+" "+path)
        else:
            # BUGFIX: former message wrongly read "must-node doesnt exist"
            # although this branch fires when a forbidden node IS filled
            raise Exception("mustnt-node is filled "+n+" "+path)
def castOrderedDict(res, job=None, key=""):
    """
    Recursively convert *res* (e.g. the OrderedDict tree produced by
    xmltodict) into plain dicts and lists; scalar values pass through
    unchanged.
    :param res: value to convert (dict, list or scalar)
    :param job: optional job object (unused, forwarded for compatibility)
    :param key: the key under which *res* is stored in its parent
    :return: the converted structure
    """
    if isinstance(res, dict):
        return {k: castOrderedDict(v, job, k) for k, v in dict(res).items()}
    if isinstance(res, list):
        return [castOrderedDict(item, job, "") for item in res]
    return res
def write_tile_text(msg, job, path, text, enc="utf-8"):
    """
    Write *text* to *path* using encoding *enc*, creating the parent
    directory first if necessary.
    NOTE(review): function name looks like a typo of write_file_text,
    but it is kept because callers (e.g. write_file_dict) use it.
    :param msg: -- msg object (forwarded to mkPaths)
    :param job: -- job object supplying the debug level
    :param path: -- target file path
    :param text: -- content to write
    :param enc: -- file encoding, default utf-8
    """
    verify = int(job.getDebugLevel("file_tool"))
    mkPaths(job, path, msg)
    with open(path, 'w', encoding=enc) as fh:
        fh.write(text)
def write_file_dict(msg, job, path, dict, enc="utf-8", ttype=""):
    """
    Serialize *dict* to *path*; the writer (yaml, json, xml or csv) is
    selected by the file extension.
    :param msg: -- msg object
    :param job: -- job object
    :param path: -- target file path, extension decides the format
    :param dict: -- data to serialize (NOTE(review): parameter name shadows the builtin 'dict')
    :param enc: -- file encoding, default utf-8
    :param ttype: -- test-data type, forwarded to the csv writer
    """
    # job = basic.program.Job.getInstance()
    mkPaths(job, path, msg)
    # NOTE(review): the yml branch slices [-5:] here while read_file_dict
    # uses [-4:]; both match ".yml", but the asymmetry looks accidental
    if D.DFILE_TYPE_YML in path[-5:]:
        with open(path, 'w', encoding=enc) as file:
            yaml.dump(dict, file)
            file.close()
    elif D.DFILE_TYPE_JSON in path[-5:]:
        with open(path, 'w', encoding=enc) as file:
            doc = json.dumps(dict, indent=4)
            file.write(doc)
            file.close()
    elif D.DFILE_TYPE_XML in path[-4:]:
        with open(path, 'w', encoding=enc) as file:
            text = xmltodict.unparse(dict, pretty=True)
            # ensure the output carries an xml declaration header
            if "<?xml version=" not in text:
                text = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n" + text
            file.write(text)
            file.close()
    elif D.DFILE_TYPE_CSV in path[-4:]:
        print("fileWriter fuer csv")
        # csv is delegated to the project's file tool, then written as text
        ffcts = basic.toolHandling.getFileTool(job, None, D.DFILE_TYPE_CSV)
        #doc = tools.tdata_tool.getCsvSpec(msg, job, path, D.CSV_SPECTYPE_CONF)
        doc = ffcts.dump_data_file(job, dict, path, ttype)
        write_tile_text(msg, job, path, doc, enc)