123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114 |
- import os
- import time
- import json
- import base64
- import traceback
- from _log import logger, EventType
- import tools
- import shutil
- import cv2
- import copy
- def is_image(path: str):
- img = cv2.imread(path)
- if img is None:
- return False
- return True
- class Interface:
- def __init__(self, interface=None):
- if interface is None:
- interface = os.getenv('PARAMS')
- if interface is None:
- interface = '{"width":512}'
- self.interface = self.decode_b64(interface)
- self.input = self.interface.get('input', '/input')
- if not os.path.exists(self.input):
- raise FileNotFoundError('The input file does not exist.')
- self.result = self.interface.get('result', '/output')
- if self.result is None:
- os.makedirs(str(self.result))
- self.params = tools.ProcessorParams(self.interface.get('width'),
- self.interface.get('height'),
- self.interface.get('rotate'),
- self.interface.get('flip'),
- self.interface.get('rect'),
- self.result,
- None)
- self.input_dic = []
- @staticmethod
- def decode_b64(interface):
- try:
- interface = base64.b64decode(interface.encode('utf-8')).decode("utf-8")
- except:
- interface = interface.encode('utf-8')
- logger.info("PREPARE", extra={'event_type': EventType.PROGRESS, 'params': interface})
- interface = json.loads(interface)
- assert isinstance(interface, dict), 'The interface must be a dictionary.'
- return interface
- @staticmethod
- def encode_b64(string):
- return base64.b64encode(string.encode('utf-8')).decode("utf-8")
- def scan_files(self, path: str):
- if not os.path.isdir(path):
- self.input_dic.append(path)
- return
- files = os.listdir(path)
- for file in files:
- self.scan_files(os.path.join(path, file))
- def run(self):
- logger.info("files scanning.", extra={'event_type': EventType.STEP_COMPLETE})
- self.scan_files(self.input)
- files_in_total = len(self.input_dic)
- logger.info("files-in-total",
- extra={'event_type': EventType.METRICS, 'desc': '处理文件总个数', 'value': files_in_total})
- count = 0
- for path in self.input_dic:
- count += 1
- logger.info("PROGRESS", extra={'event_type': EventType.PROGRESS, 'total': files_in_total, 'current': count})
- try:
- real_path = os.path.dirname(path)
- real_path = os.path.relpath(real_path, self.input)
- output = os.path.join(self.result, real_path)
- if not os.path.exists(output):
- os.makedirs(output)
- params = copy.deepcopy(self.params)
- params.set_path(path, output)
- if not tools.processor(params):
- shutil.copy(path, os.path.join(output, os.path.basename(path)))
- except Exception as e:
- logger.info(e, extra={'event_type': EventType.TASK_CRASHED, 'traceback': traceback.format_exc()})
- def start(interface=None):
- logger.info('TASK_STARTED', extra={'event_type': EventType.TASK_STARTED})
- t = time.time()
- try:
- Interface(interface=interface).run()
- except Exception as e:
- logger.info(e, extra={'event_type': EventType.TASK_CRASHED, 'traceback': traceback.format_exc()})
- logger.info("TASK_FINISHED", extra={'event_type': EventType.TASK_FINISHED})
- logger.info("TASK_FINISHED")
- elapsed = time.time() - t
- logger.info("time-elapsed", extra={'event_type': EventType.METRICS, 'desc': '总时长', 'value': elapsed})
- if __name__ == '__main__':
- start()
|