123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114 |
- import os
- import time
- import json
- import base64
- import traceback
- from _log import logger, EventType
- import tools
- import shutil
- import cv2
- def is_image(path: str):
- img = cv2.imread(path)
- if img is None:
- return False
- return True
- class Interface:
- def __init__(self, interface=None):
- if interface is None:
- interface = os.getenv('PARAMS')
- if interface is None:
- interface = {
- }
- self.interface = self.decode_b64(interface)
- self.op_type = self.get_op_type()
- self.args = self.interface['args']
- self.input = self.interface.get('input', '/input')
- if not os.path.exists(self.input):
- raise FileNotFoundError('The input file does not exist.')
- self.result = self.interface.get('result', '/result')
- if self.result is None:
- raise FileNotFoundError('Need to give the result path.')
- self.input_dic = []
- self.IOU = self.args.get('IOU', 0.9)
- self.template = self.args['template']
- self.content = self.args.get('content')
- self.precision = self.args.get('precision', 0.9)
- self.time = self.args.get('time', 1)
- self.divisor = self.args.get('divisor', []) # {'coordinates': 0.4, 'properties': 0.3, 'imageResult': 0.3}
- self.distance = self.args.get('distance')
- @staticmethod
- def decode_b64(interface):
- interface = base64.b64decode(interface.encode('utf-8')).decode("utf-8")
- interface = json.loads(interface)
- assert isinstance(interface, dict), 'The interface must be a dictionary.'
- return interface
- @staticmethod
- def encode_b64(string):
- return base64.b64encode(string.encode('utf-8')).decode("utf-8")
- def get_op_type(self):
- op_type = self.interface.get('type', 'matting')
- op_type = op_type.upper()
- if op_type not in ['MATTING', 'RESIZE']:
- raise TypeError('The type must be matting or resize')
- return op_type
- def scan_files(self, path: str):
- if os.path.isdir(path):
- self.scan_files(path)
- else:
- self.input_dic.append(path)
- def run(self):
- logger.info("files scanning.", extra={'event_type': EventType.STEP_COMPLETE})
- self.scan_files(self.input)
- files_in_total = len(self.input_dic)
- logger.info("files-in-total",
- extra={'event_type': EventType.METRICS, 'desc': '处理文件总个数', 'value': files_in_total})
- count = 0
- for path in self.input_dic:
- count += 1
- logger.info("PROGRESS", extra={'event_type': EventType.PROGRESS, 'total': files_in_total, 'current': count})
- try:
- real_path = os.path.relpath(path, self.input)
- shutil.copy(path, os.path.join(self.result, real_path))
- if is_image(path):
- output = os.path.dirname(path)
- output = os.path.join(self.result, os.path.relpath(output, self.input))
- tools.seg(path, output)
- except Exception as e:
- logger.info(e, extra={'event_type': EventType.TASK_CRASHED, 'traceback': traceback.format_exc()})
- def start(interface=None):
- logger.info('TASK_STARTED', extra={'event_type': EventType.TASK_STARTED})
- t = time.time()
- try:
- Interface(interface=interface).run()
- except Exception as e:
- logger.info(e, extra={'event_type': EventType.TASK_CRASHED, 'traceback': traceback.format_exc()})
- logger.info("TASK_FINISHED", extra={'event_type': EventType.TASK_FINISHED})
- logger.info("TASK_FINISHED")
- elapsed = time.time() - t
- logger.info("time-elapsed", extra={'event_type': EventType.METRICS, 'desc': '总时长', 'value': elapsed})
- if __name__ == '__main__':
- start()
|