123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144 |
- import os
- import cv2
- from flask import request, jsonify, send_file
- from werkzeug.utils import secure_filename
- from werkzeug.datastructures import FileStorage
- import uuid
- from app import app
- from .resp import success_resp, error_resp
- import time
- from .req import is_empty
# Directory holding the original files uploaded by users.
# Empty until set_options() is called.
UPLOAD_PATH: str = ""
# Directory holding processed output files.
# Empty until set_options() is called.
OUTPUT_DIR: str = ""
class MimeTypes:
    """MIME type strings used by the file-serving routes in this module."""

    # Type used by the /outputs and /uploads routes when no download is requested.
    png = "image/png"
    # Generic binary type used when the request asks for a download.
    stream = "application/octet-stream"
def get_upload_dir() -> str:
    """Return the currently configured upload directory (``UPLOAD_PATH``)."""
    return UPLOAD_PATH
def get_output_dir() -> str:
    """Return the currently configured output directory (``OUTPUT_DIR``)."""
    return OUTPUT_DIR
def set_options(upload_path, save_dir):
    """Configure the module-wide upload and output directories.

    Args:
        upload_path: Directory to store in ``UPLOAD_PATH``.
        save_dir: Directory to store in ``OUTPUT_DIR``.
    """
    global UPLOAD_PATH, OUTPUT_DIR
    UPLOAD_PATH, OUTPUT_DIR = upload_path, save_dir
    # Echo the configured directories to stdout.
    print("dirs", UPLOAD_PATH, OUTPUT_DIR)
@app.route("/outputs/<path:filename>")
def get_outputs_files(filename):
    """Serve a processed file from the output directory.

    Args:
        filename: Path of the file relative to ``OUTPUT_DIR``, taken from
            the URL.

    Returns:
        The file response. It is sent as ``application/octet-stream`` when
        ``is_empty`` reports the ``download`` request value as non-empty,
        otherwise as ``image/png``.

    Raises:
        werkzeug.exceptions.NotFound: If the file does not exist or the
            requested path escapes ``OUTPUT_DIR``.
    """
    # Imported locally because the module header only imports ``send_file``.
    from flask import send_from_directory

    download = is_empty(request.values.get('download')) is False
    mimetype = MimeTypes.stream if download else MimeTypes.png
    # ``filename`` is client-controlled. send_from_directory joins it safely
    # and rejects ``..`` segments and absolute paths, which a plain
    # os.path.join + send_file would happily follow.
    return send_from_directory(OUTPUT_DIR, filename, mimetype=mimetype)
@app.route("/uploads/<path:filename>")
def get_upload_files(filename):
    """Serve an original user-uploaded file from the upload directory.

    Args:
        filename: Path of the file relative to ``UPLOAD_PATH``, taken from
            the URL.

    Returns:
        The file response. It is sent as ``application/octet-stream`` when
        ``is_empty`` reports the ``download`` request value as non-empty,
        otherwise as ``image/png``.

    Raises:
        werkzeug.exceptions.NotFound: If the file does not exist or the
            requested path escapes ``UPLOAD_PATH``.
    """
    # Imported locally because the module header only imports ``send_file``.
    from flask import send_from_directory

    download = is_empty(request.values.get('download')) is False
    mimetype = MimeTypes.stream if download else MimeTypes.png
    # ``filename`` is client-controlled. send_from_directory joins it safely
    # and rejects ``..`` segments and absolute paths, which a plain
    # os.path.join + send_file would happily follow.
    return send_from_directory(UPLOAD_PATH, filename, mimetype=mimetype)
@app.route("/upload", methods=['POST'])
def upload_file():
    """Accept an uploaded image, store it and report its URL and dimensions.

    Reads the ``file`` part and the optional ``fileMd5`` form field from the
    request.

    Returns:
        A JSON response. On success it wraps ``fileId``, ``url``, ``height``
        and ``width``. If no file part was sent, or the stored file cannot be
        decoded as an image, it wraps an error message instead.
    """
    if 'file' not in request.files:
        return jsonify(error_resp("请选择图片!"))

    file = request.files['file']
    file_md5 = request.form.get('fileMd5')
    file_path, file_id = save_upload_file(file, file_md5)

    # cv2.imread signals an unreadable or non-image file by returning None
    # rather than raising. Without this check the ``.shape`` access below
    # raises AttributeError and the request fails with a 500.
    img = cv2.imread(file_path)
    if img is None:
        return jsonify(error_resp("图片无法解析,请重新上传!"))

    height, width = img.shape[:2]
    return jsonify(success_resp({
        "fileId": file_id,
        "url": file_url(file_path),
        "height": height,
        "width": width
    }))
@app.route("/download", methods = ['GET'])
def download_file():
    """Send the file identified by the ``fileId`` request value as a download.

    Returns:
        The file as an ``application/octet-stream`` attachment, or a
        ``("文件不存在", 404)`` response when ``fileId`` is missing or does
        not resolve to a file.
    """
    file_id = request.values.get("fileId")
    # A missing or empty id can never resolve to a file; answer 404 directly
    # instead of passing None on to the lookup helper.
    if not file_id:
        return "文件不存在", 404

    file_path = get_file_id_path(file_id)
    if file_path is None:
        return "文件不存在", 404

    # ``attachment_filename`` only reaches the Content-Disposition header
    # when ``as_attachment`` is set, so enable it. Only the final path
    # component is used as the suggested name.
    return send_file(
        file_path,
        mimetype="application/octet-stream",
        as_attachment=True,
        attachment_filename=os.path.basename(file_id),
    )
def file_url(path, output=False):
    """Build the URL path under which a stored file is served.

    Args:
        path: Filesystem path of the file.
        output: Truthy when ``path`` lies under ``OUTPUT_DIR`` (served by the
            ``/outputs/`` route); otherwise it is taken relative to
            ``UPLOAD_PATH`` (served by ``/uploads/``).

    Returns:
        ``"/outputs/<relative path>"`` or ``"/uploads/<relative path>"``,
        always using forward slashes.
    """
    # Truthiness test, matching get_file_id(); ``output is True`` silently
    # treated truthy non-bool flags as "upload".
    if output:
        route, base_dir = "outputs", OUTPUT_DIR
    else:
        route, base_dir = "uploads", UPLOAD_PATH

    # relpath uses the OS separator (backslash on Windows); URLs need "/".
    relative = os.path.relpath(path, base_dir).replace(os.sep, "/")
    return "/{}/{}".format(route, relative)
def save_upload_file(file: FileStorage, md5 = None):
    """Store an uploaded file in the upload directory.

    The stored name is ``<stem>_<md5><ext>`` when a usable ``md5`` is given,
    otherwise ``<stem>_<timestamp>_<uuid hex><ext>``.

    Args:
        file: The uploaded file.
        md5: Optional client-supplied digest used to build the stored name.

    Returns:
        A ``(file_path, filename)`` tuple: the full path of the stored file
        and its name inside the upload directory.
    """
    # To separate users, add a per-user file prefix or folder name here.

    # secure_filename can reduce a name to "" (for example a name made only
    # of non-ASCII characters); fall back to the default in that case too.
    filename = secure_filename(file.filename or "") or "unknown.png"
    stem, ext = os.path.splitext(filename)

    # ``md5`` comes straight from the request form. Sanitise it so a value
    # containing path separators or ".." cannot move the file outside the
    # upload directory.
    digest = secure_filename(md5) if md5 else ""
    if digest:
        filename = "{}_{}{}".format(stem, digest, ext)
    else:
        timestamp = time.strftime("%Y%m%d%H%M%S", time.localtime())
        filename = "{}_{}_{}{}".format(stem, timestamp, uuid.uuid4().hex, ext)

    file_path = get_upload_file_path(filename)
    # An existing file with the same name is left untouched and reused.
    if not os.path.exists(file_path):
        file.save(file_path)
    return file_path, filename
def get_output_file_path(filename: str, suffix: str = None, fun: str = None):
    """Build the path of an output file derived from ``filename``.

    The result is ``<OUTPUT_DIR>/<stem>_[<fun>_]<suffix><ext>``.

    Args:
        filename: Source file name. Empty, ``None`` or a name that
            sanitises to nothing falls back to ``"unknown.png"``.
        suffix: Text appended to the stem. Defaults to the current local
            time formatted as ``YYYYmmddHHMMSS`` when empty or ``None``.
        fun: Optional tag placed in front of the suffix.

    Returns:
        The output file path. The file itself is not created.
    """
    # secure_filename may strip a name down to ""; treat that like an
    # empty input instead of producing a name with no stem or extension.
    safe_name = secure_filename(filename) if filename else ""
    if not safe_name:
        safe_name = "unknown.png"
    stem, ext = os.path.splitext(safe_name)

    if not suffix:
        suffix = time.strftime("%Y%m%d%H%M%S", time.localtime())
    if fun is not None:
        suffix = "{}_{}".format(fun, suffix)

    return os.path.join(OUTPUT_DIR, "{}_{}{}".format(stem, suffix, ext))
def get_file_id(path, output=False):
    """Derive the file id for a stored file.

    Args:
        path: Filesystem path of the file.
        output: Truthy when the file lives under ``OUTPUT_DIR``.

    Returns:
        The path relative to ``UPLOAD_PATH``, or, for output files, the
        path relative to ``OUTPUT_DIR`` prefixed with ``"o_"``.
    """
    if not output:
        return os.path.relpath(path, UPLOAD_PATH)
    return "o_{}".format(os.path.relpath(path, OUTPUT_DIR))
def _existing_file_within(base_dir, name):
    """Return ``base_dir/name`` if it is an existing file inside ``base_dir``, else None."""
    candidate = os.path.join(base_dir, name)
    base_real = os.path.realpath(base_dir)
    candidate_real = os.path.realpath(candidate)
    try:
        inside = os.path.commonpath([base_real, candidate_real]) == base_real
    except ValueError:
        # Raised when the paths cannot be compared, e.g. different drives
        # on Windows; such a path is certainly not inside the base.
        return None
    if inside and os.path.isfile(candidate):
        return candidate
    return None


def get_file_id_path(filename: str):
    """Resolve a file id to the path of the stored file.

    Ids starting with ``"o_"`` are looked up in ``OUTPUT_DIR`` first (with
    the prefix removed). Every id is then looked up in ``UPLOAD_PATH``.

    Args:
        filename: The file id, as produced by ``get_file_id``.

    Returns:
        The file path, or ``None`` when the id is empty or ``None``, the
        file does not exist, or the id points outside both directories.
    """
    # The original test was ``str is None``, which checks the builtin type
    # and is never true, so a None id crashed in len().
    if not filename:
        return None

    # The id is client-controlled. An absolute path or ".." segments must
    # not be allowed to escape the configured directories.
    if filename.startswith("o_"):
        path = _existing_file_within(OUTPUT_DIR, filename[2:])
        if path is not None:
            return path
    return _existing_file_within(UPLOAD_PATH, filename)
def get_upload_file_path(name):
    """Return the path of ``name`` inside the upload directory (``UPLOAD_PATH``)."""
    target = os.path.join(UPLOAD_PATH, name)
    return target
|