import os import time import json import base64 import traceback from _log import logger, EventType import tools import shutil import cv2 import copy def is_image(path: str): img = cv2.imread(path) if img is None: return False return True class Interface: def __init__(self, interface=None): if interface is None: interface = os.getenv('PARAMS') if interface is None: interface = '{"width":512}' self.interface = self.decode_b64(interface) self.input = self.interface.get('input', '/input') if not os.path.exists(self.input): raise FileNotFoundError('The input file does not exist.') self.result = self.interface.get('result', '/output') if self.result is None: os.makedirs(str(self.result)) self.params = tools.ProcessorParams(self.interface.get('width'), self.interface.get('height'), self.interface.get('rotate'), self.interface.get('flip'), self.interface.get('rect'), self.result, None) self.input_dic = [] @staticmethod def decode_b64(interface): try: interface = base64.b64decode(interface.encode('utf-8')).decode("utf-8") except: interface = interface.encode('utf-8') logger.info("PREPARE", extra={'event_type': EventType.PROGRESS, 'params': interface}) interface = json.loads(interface) assert isinstance(interface, dict), 'The interface must be a dictionary.' return interface @staticmethod def encode_b64(string): return base64.b64encode(string.encode('utf-8')).decode("utf-8") def scan_files(self, path: str): if not os.path.isdir(path): self.input_dic.append(path) return files = os.listdir(path) for file in files: self.scan_files(os.path.join(path, file)) def run(self): logger.info("files scanning.", extra={'event_type': EventType.STEP_COMPLETE}) self.scan_files(self.input) files_in_total = len(self.input_dic) logger.info("files-in-total", extra={'event_type': EventType.METRICS, 'desc': '处理文件总个数', 'value': files_in_total}) count = 0 for path in self.input_dic: count += 1 logger.info("PROGRESS", extra={'event_type': EventType.PROGRESS, 'total': files_in_total, 'current': count}) try: real_path = os.path.dirname(path) real_path = os.path.relpath(real_path, self.input) output = os.path.join(self.result, real_path) if not os.path.exists(output): os.makedirs(output) params = copy.deepcopy(self.params) params.set_path(path, output) if not tools.processor(params): shutil.copy(path, os.path.join(output, os.path.basename(path))) except Exception as e: logger.info(e, extra={'event_type': EventType.TASK_CRASHED, 'traceback': traceback.format_exc()}) def start(interface=None): logger.info('TASK_STARTED', extra={'event_type': EventType.TASK_STARTED}) t = time.time() try: Interface(interface=interface).run() except Exception as e: logger.info(e, extra={'event_type': EventType.TASK_CRASHED, 'traceback': traceback.format_exc()}) logger.info("TASK_FINISHED", extra={'event_type': EventType.TASK_FINISHED}) logger.info("TASK_FINISHED") elapsed = time.time() - t logger.info("time-elapsed", extra={'event_type': EventType.METRICS, 'desc': '总时长', 'value': elapsed}) if __name__ == '__main__': start()