# Funktionen zum Dateizugriff mit Suchen, Lesen, Schreiben # ------------------------------------------------------------ """ """ import codecs import json import os import os.path import re import time import xmltodict import yaml import platform import basic.toolHandling import basic.program import tools.data_const as D #import tools.tdata_tool import tools.date_tool def getDump(obj): result = vars(obj) return str(result) # if type(obj) == "__dict__" def getFiles(msg, job, path, pattern, conn): """ search filenames in the directory - if conn is set search remote :param msg: -- msg-Objekt :param path: -- path - String :param pattern: -- filename or pattern :param conn: :return: Array with filenames """ if conn is not None: return getRemoteFiles(msg, path, pattern, conn) # job = basic.program.Job.getInstance() verify = int(job.getDebugLevel("file_tool")) out = [] msg.debug(verify, "getFiles " + path + " , " + pattern) if not os.path.exists(path): return out for f in os.listdir(path): msg.debug(verify, "getFiles " + f) if re.search(pattern, f): msg.debug(verify, "match " + f) out.append(f) return out def removeFiles(msg, path, pattern, conn): """ search filenames in the directory and removes it - if conn is set search remote :param msg: -- msg-Objekt :param path: -- path - String :param pattern: -- filename as Pattern :param conn: :return: Array filenames """ pass def copyFiles(job, fileList, source, target, comp): """ copies files from source to target :param job: :param fileList: :param source: :param target: :param comp: :return: """ pass def getRemoteFiles(msg, path, pattern, conn): """ search filenames in the directory - if conn is set search remote :param msg: -- msg-Objekt :param path: -- path - String :param pattern: -- filename as Pattern :param conn: :return: Array filenames """ pass def getFilesRec(msg, job, path, pattern): """ Sucht Dateien im Verzeichnis rekursiv :param msg: -- msg-Objekt :param path: -- Pfad - String :param pattern: -- Dateiname als Pattern :return: Array mit gefundenen Dateien, absoluter Pfad """ # job = basic.program.Job.getInstance() verify = int(job.getDebugLevel("file_tool")) out = [] msg.debug(verify, "getFilesRec " + path + " , " + pattern) for (r, dirs, files) in os.walk(path): for f in files: msg.debug(verify, "getFilesRec " + f) if re.search(pattern, f): msg.debug(verify, "match " + f) out.append(os.path.join(r, f)) return out def getTree(msg, job, pfad): # job = basic.program.Job.getInstance() verify = int(job.getDebugLevel("file_tool")) msg.debug(verify, "getTree " + pfad) tree = {} files = [] for f in os.listdir(pfad): if os.path.isDir(os.path.join(pfad, f)): tree[f] = getTree(msg, job, os.path.join(pfad, f)) elif os.path.isFile(os.path.join(pfad, f)): files.append(f) tree["_files_"] = files return tree def mkPaths(job, path, msg): # job = basic.program.Job.getInstance() verify = int(job.getDebugLevel("file_tool")) # modus = job.conf.paths["mode"] dirname = os.path.dirname(path) if os.path.exists(dirname): return os.makedirs(dirname, exist_ok=True) def getFileEncoding(msg, job, path): encodings = ['utf-8', 'iso-8859-1'] # add more for e in encodings: try: fh = codecs.open(path, 'r', encoding=e) fh.readlines() fh.seek(0) except UnicodeDecodeError: print('got unicode error with %s , trying different encoding' % e) except: print("except") else: print('opening the file with encoding: %s ' % e) return e return detectFileEncode(job, path, msg) def detectFileEncode(job, path, msg): # return "" verify = int(job.getDebugLevel("file_tool")) cntIso = 0 cntUtf = 0 j = 0 CHAR_ISO = [196, 228, 214, 246, 220, 252, 191] with open(path, 'rb') as file: byte = file.read(1) while (byte): i = int.from_bytes(byte, "little") # byte = file.read(1) if (i in CHAR_ISO): cntIso += 1 if (i == 160): pass elif (i > 127): cntUtf += 1 j += 1 l = i byte = file.read(1) file.close() if (cntIso > cntUtf): return 'iso-8859-1' return 'utf-8' def read_file_lines(job, path, msg): lines = read_file_text(job, path, msg) if isinstance(lines, (str)): return lines.splitlines() return [] def read_file_text(job, path, msg): # job = basic.program.Job.getInstance() verify = int(job.getDebugLevel("file_tool")) if not os.path.exists(path): return "" enc = detectFileEncode(job, path, msg) with open(path, 'r', encoding=enc) as file: text = file.read() file.close() return text def getModTime(job, filepath): out = "" mtime = os.path.getmtime(filepath) out = tools.date_tool.formatParsedDate(time.ctime(mtime), tools.date_tool.F_TIME_DEFAULT) return out def read_file_dict(job, path, msg, ttype=D.DFILE_TYPE_CSV): """ reads and gets general a dict from any kind of filetyp :param path: with extension of filetype :param msg: optionally :return: """ # 20220329 generalize # job = basic.program.Job.getInstance() verify = int(job.getDebugLevel("file_tool")) doc = {} if not os.path.exists(path): return doc enc = detectFileEncode(job, path, msg) if D.DFILE_TYPE_YML in path[-4:]: with open(path, 'r', encoding=enc) as file: doc = yaml.full_load(file) file.close() elif D.DFILE_TYPE_JSON in path[-5:]: with open(path, 'r', encoding=enc) as file: doc = json.load(file) file.close() elif D.DFILE_TYPE_XML in path[-4:]: with open(path, 'r', encoding=enc) as file: res = xmltodict.parse(file.read()) # doc = dict(res) doc = castOrderedDict(res) file.close() elif D.DFILE_TYPE_CSV in path[-5:]: ffcts = basic.toolHandling.getFileTool(job, None, D.DFILE_TYPE_CSV) #doc = tools.tdata_tool.getCsvSpec(msg, job, path, D.CSV_SPECTYPE_CONF) doc = ffcts.load_file(path, ttype) # tools.tdata_tool.getCsvSpec(msg, job, path, D.CSV_SPECTYPE_CONF) return doc def castOrderedDict(res, job=None, key=""): if isinstance(res, dict): doc = dict(res) for x in doc: doc[x] = castOrderedDict(doc[x], job, x) elif isinstance(res, list): sublist = [] for i in range(0, len(res)): sublist.append(castOrderedDict(res[i], job, "")) doc = sublist else: doc = res return doc def write_tile_text(msg, job, path, text, enc="utf-8"): # job = basic.program.Job.getInstance() verify = int(job.getDebugLevel("file_tool")) mkPaths(job, path, msg) with open(path, 'w', encoding=enc) as file: file.write(text) file.close() def write_file_dict(msg, job, path, dict, enc="utf-8", ttype=""): # job = basic.program.Job.getInstance() mkPaths(job, path, msg) if D.DFILE_TYPE_YML in path[-5:]: with open(path, 'w', encoding=enc) as file: yaml.dump(dict, file) file.close() elif D.DFILE_TYPE_JSON in path[-5:]: with open(path, 'w', encoding=enc) as file: doc = json.dumps(dict, indent=4) file.write(doc) file.close() elif D.DFILE_TYPE_XML in path[-4:]: with open(path, 'w', encoding=enc) as file: text = xmltodict.unparse(dict, pretty=True) if "\n" + text file.write(text) file.close() elif D.DFILE_TYPE_CSV in path[-4:]: print("fileWriter fuer csv") ffcts = basic.toolHandling.getFileTool(job, None, D.DFILE_TYPE_CSV) #doc = tools.tdata_tool.getCsvSpec(msg, job, path, D.CSV_SPECTYPE_CONF) doc = ffcts.dump_file(dict, path, ttype) write_tile_text(msg, job, path, doc, enc)