# Funktionen zum Dateizugriff mit Suchen, Lesen, Schreiben
# ------------------------------------------------------------
"""
File-access helpers: searching, reading and writing files in several
formats (plain text, yaml, json, xml, csv) plus encoding detection.
"""
import codecs
import json
import os
import os.path
import re
import time
import xmltodict
import yaml
import platform
import basic.toolHandling
import basic.program
import tools.data_const as D
#import tools.tdata_tool
import tools.date_tool
import basic.constants as B


def getDump(obj):
    """Return the attribute dictionary of an object as a string (debug helper)."""
    result = vars(obj)
    return str(result)
    # if type(obj) == "__dict__"


def getFiles(msg, job, path, pattern, conn):
    """
    search filenames in the directory - if conn is set search remote
    :param msg: -- msg-Objekt
    :param path: -- path - String
    :param pattern: -- filename or pattern (regex, matched with re.search)
    :param conn: -- remote connection; if not None the search is delegated
    :return: Array with filenames
    """
    if conn is not None:
        return getRemoteFiles(msg, path, pattern, conn)
    # job = basic.program.Job.getInstance()
    verify = int(job.getDebugLevel("file_tool"))
    out = []
    msg.debug(verify, "getFiles " + path + " , " + pattern)
    # a missing directory yields an empty result instead of an OSError
    if not os.path.exists(path):
        return out
    for f in os.listdir(path):
        msg.debug(verify, "getFiles " + f)
        if re.search(pattern, f):
            msg.debug(verify, "match " + f)
            out.append(f)
    return out


def removeFiles(msg, path, pattern, conn):
    """
    search filenames in the directory and removes it - if conn is set search remote
    :param msg: -- msg-Objekt
    :param path: -- path - String
    :param pattern: -- filename as Pattern
    :param conn:
    :return: Array filenames
    """
    # TODO not implemented yet
    pass


def copyFiles(job, fileList, source, target, comp):
    """
    copies files from source to target
    :param job:
    :param fileList:
    :param source:
    :param target:
    :param comp:
    :return:
    """
    # TODO not implemented yet
    pass


def getRemoteFiles(msg, path, pattern, conn):
    """
    search filenames in the directory - if conn is set search remote
    :param msg: -- msg-Objekt
    :param path: -- path - String
    :param pattern: -- filename as Pattern
    :param conn:
    :return: Array filenames
    """
    # TODO not implemented yet
    pass


def getFilesRec(msg, job, path, pattern):
    """
    Searches files in the directory recursively.
    :param msg: -- msg-Objekt
    :param path: -- path - String
    :param pattern: -- filename as pattern (regex, matched with re.search)
    :return: Array of found files with absolute path
    """
    # job = basic.program.Job.getInstance()
    verify = int(job.getDebugLevel("file_tool"))
    out = []
    msg.debug(verify, "getFilesRec " + path + " , " + pattern)
    for (r, dirs, files) in os.walk(path):
        for f in files:
            msg.debug(verify, "getFilesRec " + f)
            if re.search(pattern, f):
                msg.debug(verify, "match " + f)
                out.append(os.path.join(r, f))
    return out


def getTree(msg, job, pfad):
    """
    Builds a nested dict mirroring the directory tree below pfad;
    plain files of each directory are collected under the key "_files_".
    """
    # job = basic.program.Job.getInstance()
    verify = int(job.getDebugLevel("file_tool"))
    msg.debug(verify, "getTree " + pfad)
    tree = {}
    files = []
    for f in os.listdir(pfad):
        # fix: os.path.isDir / os.path.isFile do not exist in the stdlib
        # (AttributeError at runtime) - correct names are isdir / isfile
        if os.path.isdir(os.path.join(pfad, f)):
            tree[f] = getTree(msg, job, os.path.join(pfad, f))
        elif os.path.isfile(os.path.join(pfad, f)):
            files.append(f)
    tree["_files_"] = files
    return tree


def mkPaths(job, path, msg):
    """Creates the parent directory of path if it does not exist yet."""
    # job = basic.program.Job.getInstance()
    verify = int(job.getDebugLevel("file_tool"))
    # modus = job.conf.paths["mode"]
    dirname = os.path.dirname(path)
    if os.path.exists(dirname):
        return
    os.makedirs(dirname, exist_ok=True)


def getFileEncoding(msg, job, path):
    """
    Determines a usable text encoding for the file by trying to read it
    with each candidate; falls back to byte-statistics detection.
    """
    encodings = ['utf-8', 'iso-8859-1']  # add more
    for e in encodings:
        try:
            # fix: use a context manager - the original leaked the file handle
            with codecs.open(path, 'r', encoding=e) as fh:
                fh.readlines()
        except UnicodeDecodeError:
            print('got unicode error with %s , trying different encoding' % e)
        except Exception:
            print("except")
        else:
            print('opening the file with encoding: %s ' % e)
            return e
    return detectFileEncode(job, path, msg)


def detectFileEncode(job, path, msg):
    """
    Guesses the encoding by counting bytes typical for iso-8859-1 umlauts
    against the count of all other high (>127) bytes.
    """
    # return ""
    verify = int(job.getDebugLevel("file_tool"))
    cntIso = 0
    cntUtf = 0
    j = 0
    # byte values of German umlauts etc. in iso-8859-1
    CHAR_ISO = [196, 228, 214, 246, 220, 252, 191]
    with open(path, 'rb') as file:
        byte = file.read(1)
        while byte:
            i = int.from_bytes(byte, "little")
            if i in CHAR_ISO:
                cntIso += 1
            if i == 160:
                pass  # non-breaking space is ignored for the statistics
            elif i > 127:
                cntUtf += 1
            j += 1
            byte = file.read(1)
    if cntIso > cntUtf:
        return 'iso-8859-1'
    return 'utf-8'


def read_file_lines(job, path, msg):
    """Reads the file and returns its content as a list of lines (empty list if unreadable)."""
    lines = read_file_text(job, path, msg)
    if isinstance(lines, (str)):
        return lines.splitlines()
    return []


def read_file_text(job, path, msg):
    """Reads the whole file as text with auto-detected encoding; "" if the path does not exist."""
    # job = basic.program.Job.getInstance()
    verify = int(job.getDebugLevel("file_tool"))
    if not os.path.exists(path):
        return ""
    enc = detectFileEncode(job, path, msg)
    with open(path, 'r', encoding=enc) as file:
        text = file.read()
    return text


def getModTime(job, filepath):
    """Returns the modification time of the file formatted via tools.date_tool."""
    mtime = os.path.getmtime(filepath)
    out = tools.date_tool.formatParsedDate(time.ctime(mtime), tools.date_tool.F_TIME_DEFAULT)
    return out


def read_file_dict(job, path: str, msg, ttype: str = D.DFILE_TYPE_CSV) -> dict:
    """
    reads and gets general a dict from any kind of filetyp
    :param path: with extension of filetype
    :param msg: optionally
    :return: parsed content with the source path stored under "_path"
    """
    # 20220329 generalize
    # job = basic.program.Job.getInstance()
    verify = int(job.getDebugLevel("file_tool"))
    doc = {}
    if not os.path.exists(path):
        return doc
    enc = detectFileEncode(job, path, msg)
    # dispatch on the file extension suffix
    if D.DFILE_TYPE_YML in path[-4:]:
        with open(path, 'r', encoding=enc) as file:
            doc = yaml.full_load(file)
    elif D.DFILE_TYPE_JSON in path[-5:]:
        with open(path, 'r', encoding=enc) as file:
            doc = json.load(file)
    elif D.DFILE_TYPE_XML in path[-4:]:
        with open(path, 'r', encoding=enc) as file:
            res = xmltodict.parse(file.read())
            # doc = dict(res)
            doc = castOrderedDict(res)
    elif D.DFILE_TYPE_CSV in path[-5:]:
        ffcts = basic.toolHandling.getFileTool(job, None, D.DFILE_TYPE_CSV)
        #doc = tools.tdata_tool.getCsvSpec(msg, job, path, D.CSV_SPECTYPE_CONF)
        doc = ffcts.load_file(path, ttype)
        # tools.tdata_tool.getCsvSpec(msg, job, path, D.CSV_SPECTYPE_CONF)
    doc["_path"] = path
    check_file_dict(job, doc, msg, ttype)
    return doc


def check_file_dict(job, config: dict, msg, ttype: str):
    """
    check-routine for different kind of dictionary-types
    :param job:
    :param config: parsed file content (must contain "_path")
    :param msg:
    :param ttype: spec-type deciding which nodes are required / forbidden
    :return:
    """
    MUST_NODES = []
    MUSTNT_NODES = []
    if ttype in [D.CSV_SPECTYPE_CTLG]:
        MUST_NODES = [B.DATA_NODE_HEADER, B.DATA_NODE_KEYS]
        MUSTNT_NODES = [B.DATA_NODE_DATA]
    elif ttype in [D.CSV_SPECTYPE_DDL]:
        MUST_NODES = [B.DATA_NODE_HEADER]
        MUSTNT_NODES = [B.DATA_NODE_DATA, B.DATA_NODE_FIELDS, B.DATA_NODE_KEYS]
    elif ttype in [D.CSV_SPECTYPE_DATA]:
        MUST_NODES = [B.DATA_NODE_HEADER, B.DATA_NODE_DATA]
        MUSTNT_NODES = [B.DATA_NODE_FIELDS, B.DATA_NODE_KEYS]
    elif ttype in [D.CSV_SPECTYPE_CONF]:
        MUST_NODES = [B.DATA_NODE_HEADER, B.DATA_NODE_DATA]
        MUSTNT_NODES = [B.DATA_NODE_FIELDS, B.DATA_NODE_KEYS]
    elif ttype in [D.CSV_SPECTYPE_TREE]:
        MUST_NODES = [B.DATA_NODE_HEADER, B.DATA_NODE_FIELDS, B.DATA_NODE_KEYS]
        MUSTNT_NODES = [B.DATA_NODE_DATA]
    elif ttype in [D.CSV_SPECTYPE_COMP]:
        MUST_NODES = [B.SUBJECT_ARTIFACTS, B.SUBJECT_STEPS, "functions", B.SUBJECT_DATATABLES]
        MUSTNT_NODES = [B.DATA_NODE_DATA]
    elif ttype in ["basic", "tool"]:
        # tool : tool-specific nodes
        print("anderer bekannter Ttyp " + ttype + " " + config["_path"])
        return
    else:
        print("anderer Ttyp "+ttype+" "+config["_path"])
    checkNodes(job, config, MUST_NODES, MUSTNT_NODES)


def checkNodes(job, config, mustNodes, mustntNodes):
    """
    Verifies that config contains every node of mustNodes and no non-empty
    node of mustntNodes; empty forbidden nodes are only warned about.
    Raises Exception on violation.
    """
    a = str(config["_path"]).split(os.path.sep)
    b = a[-1].split(".")
    path = config["_path"]
    # descend into the node named like the file itself, if present
    if b[0] in config:
        config = config[b[0]]
    try:
        if len(config) == 2:
            for x in B.LIST_SUBJECTS:
                if x[:-1] in config:
                    config = config[x[:-1]]
                    break
    except Exception:
        # best-effort descent - keep the current config on any failure
        pass
    for n in mustNodes:
        if n not in config:
            raise Exception("must-node doesnt exist "+n+" "+path)
    for n in mustntNodes:
        if n not in config:
            continue
        if len(config[n]) == 0:
            job.m.logWarn("empty mustnt-node "+n+" "+path)
        else:
            # fix: the original raised with the misleading message
            # "must-node doesnt exist" although the forbidden node IS present
            raise Exception("mustnt-node exists "+n+" "+path)


def castOrderedDict(res, job=None, key=""):
    """Recursively converts OrderedDict structures (e.g. from xmltodict) into plain dicts/lists."""
    if isinstance(res, dict):
        doc = dict(res)
        for x in doc:
            doc[x] = castOrderedDict(doc[x], job, x)
    elif isinstance(res, list):
        sublist = []
        for i in range(0, len(res)):
            sublist.append(castOrderedDict(res[i], job, ""))
        doc = sublist
    else:
        doc = res
    return doc


def write_tile_text(msg, job, path, text, enc="utf-8"):
    """
    Writes text to path, creating parent directories as needed.
    NOTE(review): name typo ("tile" instead of "file") kept - callers use it.
    """
    # job = basic.program.Job.getInstance()
    verify = int(job.getDebugLevel("file_tool"))
    mkPaths(job, path, msg)
    with open(path, 'w', encoding=enc) as file:
        file.write(text)


def write_file_dict(msg, job, path, dict, enc="utf-8", ttype=""):
    """
    Writes a dict to path; the serialisation format is chosen by extension
    (yaml / json / xml / csv).
    NOTE(review): parameter name "dict" shadows the builtin but is kept for
    caller compatibility.
    """
    # job = basic.program.Job.getInstance()
    mkPaths(job, path, msg)
    if D.DFILE_TYPE_YML in path[-5:]:
        with open(path, 'w', encoding=enc) as file:
            yaml.dump(dict, file)
    elif D.DFILE_TYPE_JSON in path[-5:]:
        with open(path, 'w', encoding=enc) as file:
            doc = json.dumps(dict, indent=4)
            file.write(doc)
    elif D.DFILE_TYPE_XML in path[-4:]:
        with open(path, 'w', encoding=enc) as file:
            text = xmltodict.unparse(dict, pretty=True)
            # fix: the original line was garbled ('... if "\n" + text') and not
            # valid Python; writing the pretty-printed xml unchanged -
            # TODO confirm the originally intended newline handling
            file.write(text)
    elif D.DFILE_TYPE_CSV in path[-4:]:
        print("fileWriter fuer csv")
        ffcts = basic.toolHandling.getFileTool(job, None, D.DFILE_TYPE_CSV)
        #doc = tools.tdata_tool.getCsvSpec(msg, job, path, D.CSV_SPECTYPE_CONF)
        doc = ffcts.dump_data_file(job, dict, path, ttype)
        write_tile_text(msg, job, path, doc, enc)