task.py 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114
  1. import os
  2. import time
  3. import json
  4. import base64
  5. import traceback
  6. from _log import logger, EventType
  7. import tools
  8. import shutil
  9. import cv2
  10. import copy
  11. def is_image(path: str):
  12. img = cv2.imread(path)
  13. if img is None:
  14. return False
  15. return True
  16. class Interface:
  17. def __init__(self, interface=None):
  18. if interface is None:
  19. interface = os.getenv('PARAMS')
  20. if interface is None:
  21. interface = '{"width":512}'
  22. self.interface = self.decode_b64(interface)
  23. self.input = self.interface.get('input', '/input')
  24. if not os.path.exists(self.input):
  25. raise FileNotFoundError('The input file does not exist.')
  26. self.result = self.interface.get('result', '/output')
  27. if self.result is None:
  28. os.makedirs(str(self.result))
  29. self.params = tools.ProcessorParams(self.interface.get('width'),
  30. self.interface.get('height'),
  31. self.interface.get('rotate'),
  32. self.interface.get('flip'),
  33. self.interface.get('rect'),
  34. self.result,
  35. None)
  36. self.input_dic = []
  37. @staticmethod
  38. def decode_b64(interface):
  39. try:
  40. interface = base64.b64decode(interface.encode('utf-8')).decode("utf-8")
  41. except:
  42. interface = interface.encode('utf-8')
  43. logger.info("PREPARE", extra={'event_type': EventType.PROGRESS, 'params': interface})
  44. interface = json.loads(interface)
  45. assert isinstance(interface, dict), 'The interface must be a dictionary.'
  46. return interface
  47. @staticmethod
  48. def encode_b64(string):
  49. return base64.b64encode(string.encode('utf-8')).decode("utf-8")
  50. def scan_files(self, path: str):
  51. if not os.path.isdir(path):
  52. self.input_dic.append(path)
  53. return
  54. files = os.listdir(path)
  55. for file in files:
  56. self.scan_files(os.path.join(path, file))
  57. def run(self):
  58. logger.info("files scanning.", extra={'event_type': EventType.STEP_COMPLETE})
  59. self.scan_files(self.input)
  60. files_in_total = len(self.input_dic)
  61. logger.info("files-in-total",
  62. extra={'event_type': EventType.METRICS, 'desc': '处理文件总个数', 'value': files_in_total})
  63. count = 0
  64. for path in self.input_dic:
  65. count += 1
  66. logger.info("PROGRESS", extra={'event_type': EventType.PROGRESS, 'total': files_in_total, 'current': count})
  67. try:
  68. real_path = os.path.dirname(path)
  69. real_path = os.path.relpath(real_path, self.input)
  70. output = os.path.join(self.result, real_path)
  71. if not os.path.exists(output):
  72. os.makedirs(output)
  73. params = copy.deepcopy(self.params)
  74. params.set_path(path, output)
  75. if not tools.processor(params):
  76. shutil.copy(path, os.path.join(output, os.path.basename(path)))
  77. except Exception as e:
  78. logger.info(e, extra={'event_type': EventType.TASK_CRASHED, 'traceback': traceback.format_exc()})
  79. def start(interface=None):
  80. logger.info('TASK_STARTED', extra={'event_type': EventType.TASK_STARTED})
  81. t = time.time()
  82. try:
  83. Interface(interface=interface).run()
  84. except Exception as e:
  85. logger.info(e, extra={'event_type': EventType.TASK_CRASHED, 'traceback': traceback.format_exc()})
  86. logger.info("TASK_FINISHED", extra={'event_type': EventType.TASK_FINISHED})
  87. logger.info("TASK_FINISHED")
  88. elapsed = time.time() - t
  89. logger.info("time-elapsed", extra={'event_type': EventType.METRICS, 'desc': '总时长', 'value': elapsed})
  90. if __name__ == '__main__':
  91. start()