Browse Source

feat: 增加一次性任务

tuon 1 year ago
parent
commit
146478f00b
6 changed files with 334 additions and 3 deletions
  1. 1 2
      Dockerfile
  2. 7 0
      Makefile
  3. 202 0
      _log.py
  4. 7 0
      bin/task/Dockerfile
  5. 3 1
      requirements.txt
  6. 114 0
      task.py

+ 1 - 2
Dockerfile

@@ -14,5 +14,4 @@ RUN apt update
 RUN apt install -y libgl1-mesa-glx
 
 WORKDIR /app
-ENTRYPOINT ["python"]
-CMD ["main.py", "--production"]
+CMD ["python","main.py", "--production"]

+ 7 - 0
Makefile

@@ -0,0 +1,7 @@
+VERSION=latest
+srv:
+	docker build -t registry.cn-hangzhou.aliyuncs.com/tuon-pub/matting-human:$(VERSION) -f ./Dockerfile .
+	docker push registry.cn-hangzhou.aliyuncs.com/tuon-pub/matting-human:$(VERSION)
+task:
+	docker build -t registry.cn-hangzhou.aliyuncs.com/tuon-pub/matting-human-task:$(VERSION) -f ./bin/task/Dockerfile .
+	docker push registry.cn-hangzhou.aliyuncs.com/tuon-pub/matting-human-task:$(VERSION)

+ 202 - 0
_log.py

@@ -0,0 +1,202 @@
+# coding: utf-8
+
+import logging
+import types
+import datetime
+import os
+from collections import namedtuple
+from enum import Enum
+import simplejson
+from pythonjsonlogger import jsonlogger
+
+
+###############################################################################
+
# Role of the process emitting structured log events.
ServiceType = Enum('ServiceType', [('AGENT', 1), ('TASK', 2), ('EXPORT', 3)])
+
+
# Event taxonomy carried in the `event_type` extra field of log records.
EventType = Enum('EventType', [
    ('LOG', 1),
    ('TASK_STARTED', 2),
    ('TASK_FINISHED', 3),
    ('TASK_STOPPED', 4),
    ('TASK_CRASHED', 5),
    ('STEP_COMPLETE', 6),
    ('PROGRESS', 7),
    ('METRICS', 8),
    ('AGENT_READY', 9),
    ('TASK_VERIFIED', 10),
    ('TASK_REJECTED', 11),
    ('TASK_SUBMITTED', 12),
    ('TASK_SCHEDULED', 13),
    ('AGENT_EXITED', 14),
    ('LOGA', 15),
    ('STEP_STARTED', 16),
    ('FILES_UPLOADED', 17),
])
+
+###############################################################################
+# predefined levels
+
+
+# level name: level, default exc_info, description
# Per-level spec: numeric level, whether exc_info is attached by default,
# and a short human-readable description.
LogLevelSpec = namedtuple('LogLevelSpec', ['int', 'add_exc_info', 'descr'])

# Level table.  Only FATAL/ERROR capture exception info by default; the
# ERROR/WARN/INFO messages may be shown to the end user.
LOGGING_LEVELS = {
    name: LogLevelSpec(value, with_exc, descr)
    for name, value, with_exc, descr in (
        ('FATAL', 50, True, 'Critical error'),
        ('ERROR', 40, True, 'Error'),
        ('WARN', 30, False, 'Warning'),
        ('INFO', 20, False, 'Info'),
        ('DEBUG', 10, False, 'Debug'),
        ('TRACE', 5, False, 'Trace'),
    )
}
+
+
+def _set_logging_levels(levels, the_logger):
+    for lvl_name, (lvl, def_exc_info, _) in levels.items():
+        logging.addLevelName(lvl, lvl_name.upper())  # two mappings
+
+        def construct_logger_member(lvl_val, default_exc_info):
+            return lambda self, msg, *args, exc_info=default_exc_info, **kwargs: \
+                self.log(lvl_val,
+                         msg,
+                         *args,
+                         exc_info=exc_info,
+                         **kwargs)
+
+        func = construct_logger_member(lvl, def_exc_info)
+        bound_method = types.MethodType(func, the_logger)
+        setattr(the_logger, lvl_name.lower(), bound_method)
+
+
+###############################################################################
+
+
+def _get_default_logging_fields():
+    supported_keys = [
+        'asctime',
+        # 'created',
+        # 'filename',
+        # 'funcName',
+        'levelname',
+        # 'levelno',
+        # 'lineno',
+        # 'module',
+        # 'msecs',
+        'message',
+        # 'name',
+        # 'pathname',
+        # 'process',
+        # 'processName',
+        # 'relativeCreated',
+        # 'thread',
+        # 'threadName'
+    ]
+    return ' '.join(['%({0:s})'.format(k) for k in supported_keys])
+
+
def dumps_ignore_nan(obj, *args, **kwargs):
    """JSON-encode *obj* with NaN/Infinity serialised as ``null`` and
    non-ASCII text emitted verbatim (not escaped)."""
    return simplejson.dumps(obj, *args, ignore_nan=True, ensure_ascii=False, **kwargs)
+
+
class CustomJsonFormatter(jsonlogger.JsonFormatter):
    """JSON log formatter: renames standard fields and injects shared extras.

    NOTE(review): ``additional_fields`` is a *class* attribute, so values set
    via ``change_formatters_default_values`` are shared by every formatter
    instance in the process — confirm the cross-logger sharing is intended.
    """
    additional_fields = {}

    def __init__(self, format_string):
        # Serialize with simplejson so NaN/Infinity become null.
        super().__init__(format_string, json_serializer=dumps_ignore_nan)

    def process_log_record(self, log_record):
        # Rename 'asctime' -> 'timestamp' for the downstream consumer.
        log_record['timestamp'] = log_record.pop('asctime', None)

        # Lower-cased level name under the shorter key 'level'.
        levelname = log_record.pop('levelname', None)
        if levelname is not None:
            log_record['level'] = levelname.lower()

        # Expose the formatted traceback as a list of lines under 'stack'.
        e_info = log_record.pop('exc_info', None)
        if e_info is not None:
            if e_info == 'NoneType: None':  # python logger is not ok here
                pass
            else:
                log_record['stack'] = e_info.split('\n')

        return jsonlogger.JsonFormatter.process_log_record(self, log_record)

    def add_fields(self, log_record, record, message_dict):
        # Merge process-wide extras without overriding per-record values.
        super(CustomJsonFormatter, self).add_fields(log_record, record, message_dict)

        for field, val in CustomJsonFormatter.additional_fields.items():
            if (val is not None) and (field not in log_record):
                log_record[field] = val

    def formatTime(self, record, datefmt=None):
        # ISO-8601-style timestamp with millisecond precision.
        # NOTE(review): the trailing 'Z' implies UTC, but fromtimestamp()
        # yields *local* time — confirm whether UTC was intended here.
        ct = datetime.datetime.fromtimestamp(record.created)
        t = ct.strftime('%Y-%m-%dT%H:%M:%S')
        s = '%s.%03dZ' % (t, record.msecs)
        return s
+
+
def _construct_logger(the_logger, loglevel_text):
    """(Re)initialise *the_logger*: custom levels, one JSON stream handler,
    no propagation to the root logger.

    Bug fix: the original removed handlers while iterating
    ``the_logger.handlers`` directly; mutating the list being iterated skips
    every other handler, so stale handlers could survive.  Iterating over a
    copy removes them all.
    """
    for handler in list(the_logger.handlers):
        the_logger.removeHandler(handler)

    # Register TRACE/FATAL etc. and bind level-named convenience methods.
    _set_logging_levels(LOGGING_LEVELS, the_logger)

    the_logger.setLevel(loglevel_text.upper())

    # Single stream handler carrying the JSON formatter.
    log_handler = logging.StreamHandler()
    add_logger_handler(the_logger, log_handler)

    # Prevent records from bubbling to the root logger (avoids duplicates).
    the_logger.propagate = False
+
+
+###############################################################################
+
+
def add_logger_handler(the_logger, log_handler):
    """Attach *log_handler* to *the_logger* with the default JSON format."""
    json_formatter = CustomJsonFormatter(_get_default_logging_fields())
    log_handler.setFormatter(json_formatter)
    the_logger.addHandler(log_handler)
+
+
def add_default_logging_into_file(the_logger, log_dir):
    """Add a JSON-formatted file handler writing into *log_dir*.

    Bug fix: the original timestamped file name used ``%H:%M:%S`` — the
    ``:`` character is invalid in file names on Windows and awkward on most
    filesystems.  Dashes are used instead.
    """
    fname = 'log_{}.txt'.format(
        datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S"))
    ofpath = os.path.join(log_dir, fname)

    log_handler_file = logging.FileHandler(filename=ofpath)
    add_logger_handler(the_logger, log_handler_file)
+
+
# Applies to the formatter of every handler attached to the logger.
def change_formatters_default_values(the_logger, field_name, value):
    """Set (*value* not None) or remove (*value* is None) a shared extra
    field on each handler formatter's ``additional_fields`` mapping."""
    for handler in the_logger.handlers:
        extras = handler.formatter.additional_fields
        if value is None:
            extras.pop(field_name, None)
        else:
            extras[field_name] = value
+
+
def set_global_logger():
    """Build and return the process-wide logger named 'logger'.

    Verbosity is taken from the LOG_LEVEL environment variable
    (default: TRACE).
    """
    the_logger = logging.getLogger('logger')
    _construct_logger(the_logger, os.getenv('LOG_LEVEL', 'TRACE'))
    return the_logger
+
+
def get_task_logger(task_id):
    """Build and return a per-task logger named ``task_<task_id>``.

    Verbosity is taken from the LOG_LEVEL environment variable
    (default: TRACE).
    """
    the_logger = logging.getLogger('task_{}'.format(task_id))
    _construct_logger(the_logger, os.getenv('LOG_LEVEL', 'TRACE'))
    return the_logger
+
+
+logger = set_global_logger()

+ 7 - 0
bin/task/Dockerfile

@@ -0,0 +1,7 @@
+FROM registry.cn-hangzhou.aliyuncs.com/tuon-pub/matting-human:latest
+
+WORKDIR /app
+
+
+
+CMD ["python", "task.py"]

+ 3 - 1
requirements.txt

@@ -12,4 +12,6 @@ six
 scipy
 pillow
 werkzeug==2.0.2
-wtforms
+wtforms
+simplejson
+python-json-logger==0.1.8

+ 114 - 0
task.py

@@ -0,0 +1,114 @@
+import os
+import time
+import json
+import base64
+import traceback
+from _log import logger, EventType
+import tools
+import shutil
+import cv2
+
+
def is_image(path: str) -> bool:
    """Return True if OpenCV can decode the file at *path* as an image."""
    return cv2.imread(path) is not None
+
+
class Interface:
    """One-shot image task: mirror input files into the result tree and run
    matting on every file that decodes as an image.

    The task spec arrives as a base64-encoded JSON object — from the PARAMS
    environment variable or the *interface* argument; an already-decoded
    dict is also accepted.

    Bug fixes vs. the original:
      * the ``{}`` fallback used to be fed to ``decode_b64`` unconditionally,
        which calls ``.encode()`` on it and crashed with AttributeError;
      * ``scan_files`` recursed on the *same* directory path instead of its
        children — infinite recursion for any directory input;
      * ``shutil.copy`` failed when the mirrored sub-directory did not yet
        exist under the result root.
    """

    def __init__(self, interface=None):
        if interface is None:
            interface = os.getenv('PARAMS')
        if interface is None:
            interface = {}

        # Decode only when we actually received a base64 string; a dict is
        # taken as the already-decoded spec.
        if isinstance(interface, str):
            interface = self.decode_b64(interface)
        assert isinstance(interface, dict), 'The interface must be a dictionary.'
        self.interface = interface

        self.op_type = self.get_op_type()
        self.args = self.interface['args']

        # Source directory (or single file) to process.
        self.input = self.interface.get('input', '/input')
        if not os.path.exists(self.input):
            raise FileNotFoundError('The input file does not exist.')

        # Destination root; an explicit null in the spec is rejected.
        self.result = self.interface.get('result', '/result')
        if self.result is None:
            raise FileNotFoundError('Need to give the result path.')

        # Flat list of every file found under self.input (see scan_files).
        self.input_dic = []

        # Algorithm parameters; semantics defined by the matting tooling —
        # TODO confirm against tools.seg.
        self.IOU = self.args.get('IOU', 0.9)
        self.template = self.args['template']
        self.content = self.args.get('content')
        self.precision = self.args.get('precision', 0.9)
        self.time = self.args.get('time', 1)
        # e.g. {'coordinates': 0.4, 'properties': 0.3, 'imageResult': 0.3}
        self.divisor = self.args.get('divisor', [])
        self.distance = self.args.get('distance')

    @staticmethod
    def decode_b64(interface):
        """Decode a base64 string into the JSON dict it wraps."""
        interface = base64.b64decode(interface.encode('utf-8')).decode("utf-8")
        interface = json.loads(interface)

        assert isinstance(interface, dict), 'The interface must be a dictionary.'

        return interface

    @staticmethod
    def encode_b64(string):
        """Base64-encode a plain string payload (inverse of decode_b64)."""
        return base64.b64encode(string.encode('utf-8')).decode("utf-8")

    def get_op_type(self):
        """Return the validated, upper-cased operation type (default MATTING)."""
        op_type = self.interface.get('type', 'matting')

        op_type = op_type.upper()
        if op_type not in ['MATTING', 'RESIZE']:
            raise TypeError('The type must be matting or resize')

        return op_type

    def scan_files(self, path: str):
        """Recursively collect every file under *path* into self.input_dic.

        Fixed: recurse into each directory *entry* rather than the directory
        itself (the original self-recursion never terminated).
        """
        if os.path.isdir(path):
            for entry in os.listdir(path):
                self.scan_files(os.path.join(path, entry))
        else:
            self.input_dic.append(path)

    def run(self):
        """Copy every input file into the result tree, then run matting on images.

        Per-file failures are logged as TASK_CRASHED events but do not abort
        the remaining files.
        """
        logger.info("files scanning.", extra={'event_type': EventType.STEP_COMPLETE})
        self.scan_files(self.input)
        files_in_total = len(self.input_dic)
        logger.info("files-in-total",
                    extra={'event_type': EventType.METRICS, 'desc': '处理文件总个数', 'value': files_in_total})
        count = 0
        for path in self.input_dic:
            count += 1
            logger.info("PROGRESS", extra={'event_type': EventType.PROGRESS, 'total': files_in_total, 'current': count})
            try:
                real_path = os.path.relpath(path, self.input)
                dest = os.path.join(self.result, real_path)
                # Ensure the mirrored sub-directory exists before copying.
                os.makedirs(os.path.dirname(dest), exist_ok=True)
                shutil.copy(path, dest)
                if is_image(path):
                    output = os.path.dirname(path)
                    output = os.path.join(self.result, os.path.relpath(output, self.input))
                    tools.seg(path, output)

            except Exception as e:
                logger.info(e, extra={'event_type': EventType.TASK_CRASHED, 'traceback': traceback.format_exc()})
+
+
def start(interface=None):
    """Entry point: run the task, logging lifecycle events and total duration.

    Bug fix: TASK_FINISHED was logged twice (the second call carried no
    event_type and duplicated the message); the redundant call is removed.
    """
    logger.info('TASK_STARTED', extra={'event_type': EventType.TASK_STARTED})
    t = time.time()
    try:
        Interface(interface=interface).run()
    except Exception as e:
        # Surface the crash in the event stream; the finish/metrics events
        # below are still emitted so the scheduler sees the task terminate.
        logger.info(e, extra={'event_type': EventType.TASK_CRASHED, 'traceback': traceback.format_exc()})
    logger.info("TASK_FINISHED", extra={'event_type': EventType.TASK_FINISHED})
    elapsed = time.time() - t
    logger.info("time-elapsed", extra={'event_type': EventType.METRICS, 'desc': '总时长', 'value': elapsed})


if __name__ == '__main__':
    start()