|
@@ -0,0 +1,308 @@
|
|
|
+package com.gxzc.zen.common.util.upload
|
|
|
+
|
|
|
+import com.github.tobato.fastdfs.domain.StorePath
|
|
|
+import com.github.tobato.fastdfs.exception.FdfsServerException
|
|
|
+import com.github.tobato.fastdfs.service.AppendFileStorageClient
|
|
|
+import com.github.tobato.fastdfs.service.FastFileStorageClient
|
|
|
+import com.gxzc.zen.common.exception.ZenException
|
|
|
+import com.gxzc.zen.common.exception.ZenExceptionEnum
|
|
|
+import com.gxzc.zen.common.properties.UploadProperties
|
|
|
+import com.gxzc.zen.common.util.KeyLock
|
|
|
+import com.gxzc.zen.common.util.SpringContextHolder
|
|
|
+import com.gxzc.zen.common.util.upload.constants.CheckStatus
|
|
|
+import com.gxzc.zen.common.util.upload.constants.UploadStatus
|
|
|
+import com.gxzc.zen.common.util.upload.result.CheckResult
|
|
|
+import com.gxzc.zen.common.util.upload.result.UploadResult
|
|
|
+import org.apache.commons.io.FilenameUtils
|
|
|
+import org.slf4j.LoggerFactory
|
|
|
+import org.springframework.web.multipart.MultipartFile
|
|
|
+import java.io.BufferedInputStream
|
|
|
+import java.io.File
|
|
|
+import java.nio.file.Files
|
|
|
+import java.nio.file.Paths
|
|
|
+
|
|
|
+/**
|
|
|
+ *
|
|
|
+ * @author NorthLan
|
|
|
+ * @date 2018/8/29
|
|
|
+ * @url https://noahlan.com
|
|
|
+ */
|
|
|
object FdfsUploadUtil {
    private val logger = LoggerFactory.getLogger(FdfsUploadUtil::class.java)

    // Lazily (re-)resolved: the Spring context may not be fully initialized when this
    // object is first loaded, so the getter retries the bean lookup until it succeeds.
    private var uploadProperties = SpringContextHolder.getBean(UploadProperties::class.java)
        get() {
            if (field == null) {
                field = SpringContextHolder.getBean(UploadProperties::class.java)
            }
            return field
        }

    private val appendFileStorageClient: AppendFileStorageClient by lazy { SpringContextHolder.getBean(AppendFileStorageClient::class.java)!! }
    private val fastFileStorageClient: FastFileStorageClient by lazy { SpringContextHolder.getBean(FastFileStorageClient::class.java)!! }

    // Per-file locks keyed by md5.hashCode(): serialize chunk merging / uploading for
    // the same file so concurrent requests cannot corrupt the FastDFS appender file.
    private val checkMergeLock: KeyLock<Int> = KeyLock()
    private val mergeLock: KeyLock<Int> = KeyLock()
    private val uploadLock: KeyLock<Int> = KeyLock()
    // private val mergeLock = ReentrantLock(true)

    /**
     * Checks whether the file already exists, enabling "instant upload" (秒传).
     *
     * The MD5 is looked up in the database by the caller, who passes the record (or null)
     * as [fdfsFile]. Possible outcomes:
     *  1. DB record exists and the real file exists in FastDFS  -> [CheckStatus.FDFS_DB_FILE]
     *     (all chunks reported as uploaded, client skips the upload entirely)
     *  2. DB record exists but the real file is gone            -> [CheckStatus.FDFS_DB_NO_FILE]
     *  3. No DB record, local chunk set incomplete              -> [CheckStatus.FDFS_NO_DB_FRAG_CHUNK]
     *  4. No DB record, all chunks present on local disk        -> [CheckStatus.FDFS_NO_DB_FULL_CHUNK]
     *     (chunks are merged and uploaded to FastDFS right here, under [checkMergeLock])
     *
     * @param fdfsFile     existing DB record for this MD5, or null when none
     * @param fileMetadata client-supplied metadata (md5, totalChunks, sizes, filename)
     * @return check status plus the list of already-uploaded chunk numbers
     * @throws ZenException when the metadata fails [validateRequest]
     */
    fun check(fdfsFile: FdfsFile?, fileMetadata: ZenFileMetadata): CheckResult {
        val ret = CheckResult()
        if (validateRequest(fileMetadata, null)) {
            // fdfsFile != null means the DB has a record; verify the file still exists in FastDFS.
            if (fdfsFile != null) {
                // DB record present
                val fileInfo = try {
                    fastFileStorageClient.queryFileInfo(fdfsFile.group, fdfsFile.path)
                } catch (e: FdfsServerException) {
                    // Treat any server-side lookup failure as "file missing"
                    null
                }
                if (fileInfo != null) {
                    // Real file exists: instant upload, report every chunk as done.
                    ret.checkStatus = CheckStatus.FDFS_DB_FILE
                    ret.uploadedChunks = mutableListOf()
                    for (i in 1..fileMetadata.totalChunks!!) {
                        ret.uploadedChunks!!.add(i)
                    }
                } else {
                    // Real file is missing although the DB says otherwise.
                    ret.checkStatus = CheckStatus.FDFS_DB_NO_FILE
                }
            } else {
                // No DB record: inspect which chunks are already on local tmp storage.
                ret.checkStatus = CheckStatus.FDFS_NO_DB_FRAG_CHUNK
                for (i in 1..fileMetadata.totalChunks!!) {
                    if (chunkExists(i, fileMetadata.md5!!)) {
                        if (ret.uploadedChunks == null) {
                            ret.uploadedChunks = mutableListOf()
                        }
                        ret.uploadedChunks!!.add(i)
                    }
                }
                if (ret.uploadedChunks != null && ret.uploadedChunks!!.size == fileMetadata.totalChunks) {
                    ret.checkStatus = CheckStatus.FDFS_NO_DB_FULL_CHUNK

                    val hashedMd5 = fileMetadata.md5!!.hashCode()
                    val tmpPath = uploadProperties!!.tmpPath!!

                    checkMergeLock.lock(hashedMd5)
                    try {
                        // Upload every chunk to FastDFS sequentially (synchronous merge).
                        val storePath = appendFileChunks(tmpPath, fileMetadata)
                        ret.fdfsFile = FdfsFile(storePath.group, storePath.path)
                    } finally {
                        checkMergeLock.unlock(hashedMd5)
                    }
                }
            }
        } else {
            throw ZenException(ZenExceptionEnum.FILE_METADATA_VALIDATE_ERROR)
        }
        return ret
    }

    /**
     * Regular (chunked) upload entry point.
     *
     * Single-chunk files are streamed straight to FastDFS as an appender file; multi-chunk
     * files are first persisted to the tmp directory and merged once all chunks arrived
     * (the merge runs under [mergeLock] keyed by the file MD5).
     *
     * @param fileMetadata client-supplied metadata for the current chunk
     * @param file         the chunk payload
     * @return [UploadStatus.UPLOADED] with the FastDFS store path when the whole file is
     *         complete, otherwise [UploadStatus.UPLOADING]
     * @throws ZenException when the metadata/chunk fails [validateRequest]
     */
    fun upload(fileMetadata: ZenFileMetadata, file: MultipartFile): UploadResult {
        val tmpPath = uploadProperties!!.tmpPath!!
        val chunkSize = uploadProperties!!.chunkSize!!
        var storePath: StorePath? = null
        if (validateRequest(fileMetadata, file)) {
            val filename = fileMetadata.filename!!
            val fileExtName = FilenameUtils.getExtension(filename)
            val hashedMd5 = fileMetadata.md5!!.hashCode()

            // chunkSize within limit && totalChunks == 1 means the file fits in a single chunk.
            if (fileMetadata.totalChunks == 1 && fileMetadata.chunkSize!! <= chunkSize) {
                // Still lock per-MD5 to avoid concurrent duplicate uploads failing.
                uploadLock.lock(hashedMd5)
                try {
                    file.inputStream.use {
                        storePath = appendFileStorageClient.uploadAppenderFile(null, it, file.size, fileExtName)
                    }
                } finally {
                    uploadLock.unlock(hashedMd5)
                }
            } else {
                // Persist this chunk to the local tmp directory.
                val chunkFilename = getChunkFilename(tmpPath, fileMetadata.chunkNumber, fileMetadata.md5)
                val directory = Paths.get(tmpPath)
                if (Files.notExists(directory)) {
                    Files.createDirectories(directory)
                }
                file.transferTo(File(chunkFilename))

                mergeLock.lock(hashedMd5)
                try {
                    if (checkChunks(tmpPath, fileMetadata)) {
                        // All chunks present: upload them to FastDFS sequentially (synchronous merge).
                        storePath = appendFileChunks(tmpPath, fileMetadata)
                    }
                } finally {
                    mergeLock.unlock(hashedMd5)
                }

            }
        } else {
            throw ZenException(ZenExceptionEnum.FILE_METADATA_VALIDATE_ERROR)
        }
        var status: String = UploadStatus.UPLOADING
        if (storePath != null) {
            status = UploadStatus.UPLOADED
        }
        return UploadResult().apply {
            this.status = status
            this.storePath = storePath
        }
    }

    /**
     * Uploads all chunks of a single file to FastDFS in order.
     *
     * Chunk 1 creates an appender file; chunks 2..n are appended to it. Each chunk file
     * is deleted from local tmp storage afterwards (even on failure — NOTE(review): a
     * failure mid-merge therefore leaves a partial appender file orphaned in FastDFS
     * and forces the client to re-upload the deleted chunks; confirm this is intended).
     *
     * Callers must hold the per-MD5 merge lock.
     *
     * @return the FastDFS store path of the assembled file
     * @throws ZenException  with code 403 for non-retryable FastDFS errors, or
     *                       [ZenExceptionEnum.FILE_FRAG_UPLOAD_FAILURE] when no store path was produced
     * @throws FdfsServerException for retryable errors (codes 2 and 16)
     */
    private fun appendFileChunks(sourceRootPath: String, fileMetadata: ZenFileMetadata): StorePath {
        val filename = fileMetadata.filename!!
        val fileExtName = FilenameUtils.getExtension(filename)
        val totalChunks = fileMetadata.totalChunks!!
        val md5 = fileMetadata.md5!!
        var storePath: StorePath? = null
        for (i in 1..totalChunks) {
            val sourceFile = File(getChunkFilename(sourceRootPath, i, md5))
            val inputStream = BufferedInputStream(sourceFile.inputStream())
            try {
                if (i == 1) {
                    // First chunk: create the appender file.
                    try {
                        storePath = appendFileStorageClient.uploadAppenderFile(null, inputStream, sourceFile.length(), fileExtName)
                    } catch (e: FdfsServerException) {
                        if (e.errorCode != 16 && e.errorCode != 2) {
                            throw ZenException(403, e.message!!)
                        } else {
                            throw e // 2: node/file not found, 16: server busy — retryable; anything else is a hard failure
                        }
                    }
                    // BUGFIX: this null check used to live in a `finally` block, whose throw
                    // replaced (masked) the exception propagating from the catch above, so the
                    // retryable error codes never reached the caller. Checked after the try instead.
                    if (storePath == null) {
                        throw ZenException(ZenExceptionEnum.FILE_FRAG_UPLOAD_FAILURE)
                    }
                } else {
                    if (storePath == null) {
                        throw ZenException(ZenExceptionEnum.FILE_FRAG_UPLOAD_FAILURE)
                    } else {
                        appendFileStorageClient.appendFile(storePath.group, storePath.path, inputStream, sourceFile.length())
                    }
                }
            } finally {
                inputStream.close()
                // Remove the local chunk file.
                sourceFile.delete()
            }
        }
        return storePath!!
    }

    /**
     * Returns whether the given chunk already exists in the tmp directory.
     */
    private fun chunkExists(chunkNumber: Int?, md5: String): Boolean {
        val tmpPath = uploadProperties!!.tmpPath!!
        return Files.exists(Paths.get(getChunkFilename(tmpPath, chunkNumber, md5)))
    }

    /**
     * Returns whether every chunk of the file is present on local tmp storage.
     */
    private fun checkChunks(tmpPath: String, fileMetadata: ZenFileMetadata): Boolean {
        val totalChunks = fileMetadata.totalChunks!!
        val md5 = fileMetadata.md5!!
        for (i in 1..totalChunks) {
            if (Files.notExists(Paths.get(getChunkFilename(tmpPath, i, md5)))) {
                return false
            }
        }
        // Cannot simply compare chunkNumber == totalChunks: chunks arrive out of order.
        return true
    }

    /**
     * Builds the tmp-storage filename for one chunk of a file.
     */
    private fun getChunkFilename(path: String, chunkNumber: Int?, identifier: String?): String {
        return "$path/upload-$identifier.$chunkNumber"
    }

    /**
     * Validates the multipart chunk request.
     *
     * Checks that all required metadata fields are present and positive, that the chunk
     * number fits the computed chunk count, that the total size respects the configured
     * maximum, and — when [file] is supplied — that the payload size matches what the
     * metadata promises (intermediate chunks are exactly chunkSize; the last chunk
     * carries the remainder).
     *
     * @return true when the request is consistent
     */
    private fun validateRequest(fileMetadata: ZenFileMetadata, file: MultipartFile?): Boolean {
        val md5 = fileMetadata.md5
        val chunkNumber = fileMetadata.chunkNumber
        val chunkSize = fileMetadata.chunkSize
        val totalSize = fileMetadata.totalSize
        val filename = fileMetadata.filename

        if (chunkNumber == null || chunkNumber <= 0 ||
                chunkSize == null || chunkSize <= 0 ||
                totalSize == null || totalSize <= 0 ||
                md5 == null || md5.isEmpty() ||
                filename == null || filename.isEmpty()) {
            return false
        }

        val numberOfChunks = Math.max(Math.floor(totalSize / (chunkSize * 1.0)), 1.0).toInt()
        if (chunkNumber > numberOfChunks) {
            return false
        }

        // is the file too large?
        uploadProperties?.let {
            val maxFileSize = it.maxFileSize
            if (maxFileSize != null && maxFileSize > 0) {
                if (totalSize > maxFileSize) {
                    logger.error("filesize limit: [${maxFileSize / 1024 / 1024} MB], now [${totalSize / 1024 / 1024} MB]")
                    return false
                }
            }
        }

        if (file != null) {
            // The chunk in the POST request isn't the correct size
            if (chunkNumber < numberOfChunks && file.size != chunkSize) {
                logger.error("The chunk in the POST request isn't the correct size")
                return false
            }
            // The chunks in the POST is the last one, and the fil is not the correct size
            if (numberOfChunks > 1 && chunkNumber == numberOfChunks && file.size != (totalSize % chunkSize) + chunkSize) {
                logger.error("The chunks in the POST is the last one, and the fil is not the correct size")
                return false
            }
            // The file is only a single chunk, and the data size does not fit
            if (numberOfChunks == 1 && file.size != totalSize) {
                logger.error("The file is only a single chunk, and the data size does not fit")
                return false
            }
        }
        return true
    }
}
|