# Funktionen zum Dateizugriff mit Suchen, Lesen, Schreiben # ------------------------------------------------------------ """ """ import codecs import json import os import os.path import re import yaml import basic.message import basic.program import utils.data_const as D from pprint import pp import utils.tdata_tool def getDump(obj): result="" print (str(type(obj))) result = vars(obj) return str(result) # if type(obj) == "__dict__" def getFiles(msg, path, pattern, conn): """ search filenames in the directory - if conn is set search remote :param msg: -- msg-Objekt :param path: -- Pfad - String :param pattern: -- Dateiname als Pattern :param conn: :return: Array mit gefundenen Dateien, nur Dateiname """ if conn is not None: return getRemoteFiles(msg, path, pattern, conn) job = basic.program.Job.getInstance() verify = int(job.getDebugLevel("file_tool")) out = [] msg.debug(verify, "getFiles " + path + " , " + pattern) if not os.path.exists(path): return out for f in os.listdir(path): msg.debug(verify, "getFiles " + f) if re.search(pattern, f): msg.debug(verify, "match " + f) out.append(f) return out def getRemoteFiles(msg, path, pattern, conn): """ search filenames in the directory - if conn is set search remote :param msg: -- msg-Objekt :param path: -- Pfad - String :param pattern: -- Dateiname als Pattern :param conn: :return: Array mit gefundenen Dateien, nur Dateiname """ def getFilesRec(msg, path, pattern): """ Sucht Dateien im Verzeichnis rekursiv :param msg: -- msg-Objekt :param path: -- Pfad - String :param pattern: -- Dateiname als Pattern :return: Array mit gefundenen Dateien, absoluter Pfad """ job = basic.program.Job.getInstance() verify = int(job.getDebugLevel("file_tool")) out = [] msg.debug(verify, "getFilesRec " + path + " , " + pattern) for (r, dirs, files) in os.walk(path): for f in files: msg.debug(verify, "getFilesRec " + f) if re.search(pattern, f): msg.debug(verify, "match " + f) out.append(os.path.join(r, f)) return out def getTree(msg, pfad): job = basic.program.Job.getInstance() verify = int(job.getDebugLevel("file_tool")) msg.debug(verify, "getTree " + pfad ) tree = {} files = [] for f in os.listdir(pfad): if os.path.isDir(os.path.join(pfad, f)): tree[f] = getTree(msg, os.path.join(pfad, f)) elif os.path.isFile(os.path.join(pfad, f)): files.append(f) tree["_files_"] = files return tree def mkPaths(path, msg): job = basic.program.Job.getInstance() verify = int(job.getDebugLevel("file_tool")) modus = job.conf.confs["paths"]["mode"] dirname = os.path.dirname(path) if os.path.exists(dirname): return os.makedirs(dirname, exist_ok=True) def getFileEncoding(msg, path): print("--- getFileEncoding "+path) encodings = ['utf-8', 'iso-8859-1'] # add more for e in encodings: print(e) try: fh = codecs.open(path, 'r', encoding=e) fh.readlines() fh.seek(0) except UnicodeDecodeError: print('got unicode error with %s , trying different encoding' % e) except: print("except") else: print('opening the file with encoding: %s ' % e) return e return detectFileEncode(path, msg) def detectFileEncode(path, msg): # return "" job = basic.program.Job.getInstance() verify = int(job.getDebugLevel("file_tool")) print(path) cntIso = 0 cntUtf = 0 with open(path, 'rb') as file: while (byte := file.read(1)): i = int.from_bytes(byte, "little") #byte = file.read(1) if ((i == 196) or (i == 228) or (i == 214) or (i == 246) or (i == 220) or (i == 252) or (i == 191)): cntIso += 1 if (i == 160): pass elif (i > 127): cntUtf += 1 if (cntIso > cntUtf): return 'iso-8859-1' return 'utf-8' def readFileLines(path, msg): lines = readFileText(path, msg) if isinstance(lines, (str)): return lines.splitlines() return [] def readFileText(path, msg): job = basic.program.Job.getInstance() verify = int(job.getDebugLevel("file_tool")) if not os.path.exists(path): return "" enc = detectFileEncode(path, msg) with open(path, 'r', encoding=enc) as file: text = file.read() file.close() return text def readFileDict(path, msg): """ reads and gets general a dict from any kind of filetyp :param path: with extension of filetype :param msg: optionally :return: """ # 20220329 generalize job = basic.program.Job.getInstance() verify = int(job.getDebugLevel("file_tool")) doc = {} if not os.path.exists(path): return doc enc = detectFileEncode(path, msg) if D.DFILE_TYPE_YML in path[-4:]: with open(path, 'r', encoding=enc) as file: doc = yaml.full_load(file) file.close() elif D.DFILE_TYPE_JSON in path[-5:]: with open(path, 'r', encoding=enc) as file: doc = json.load(file) file.close() elif D.DFILE_TYPE_CSV in path[-5:]: doc = utils.tdata_tool.getCsvSpec(msg, path, "conf") return doc def writeFileText(msg, path, text, enc="utf-8"): job = basic.program.Job.getInstance() verify = int(job.getDebugLevel("file_tool")) mkPaths(path, msg) with open(path, 'w', encoding=enc) as file: file.write(text) file.close() def writeFileDict(msg, path, dict, enc="utf-8"): job = basic.program.Job.getInstance() mkPaths(path, msg) if D.DFILE_TYPE_YML in path[-5:]: with open(path, 'r', encoding=enc) as file: doc = yaml.dump(dict, file) file.close() elif D.DFILE_TYPE_JSON in path[-5:]: with open(path, 'w', encoding=enc) as file: doc = json.dumps(file, indent=4) file.write(doc) file.close()