diff --git a/fs-v1-multipart.go b/fs-v1-multipart.go
index cebe2b77c..166809d6a 100644
--- a/fs-v1-multipart.go
+++ b/fs-v1-multipart.go
@@ -262,11 +262,33 @@ func (fs fsObjects) NewMultipartUpload(bucket, object string, meta map[string]st
 	return fs.newMultipartUpload(bucket, object, meta)
 }
 
-// putObjectPart - reads incoming data until EOF for the part file on
+// PutObjectPart - reads incoming data until EOF for the part file on
 // an ongoing multipart transaction. Internally incoming data is
 // written to '.minio/tmp' location and safely renamed to
 // '.minio/multipart' for reach parts.
-func (fs fsObjects) putObjectPart(bucket string, object string, uploadID string, partID int, size int64, data io.Reader, md5Hex string) (string, error) {
+func (fs fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, size int64, data io.Reader, md5Hex string) (string, error) {
+	// Verify if bucket is valid.
+	if !IsValidBucketName(bucket) {
+		return "", BucketNameInvalid{Bucket: bucket}
+	}
+	// Verify whether the bucket exists.
+	if !fs.isBucketExist(bucket) {
+		return "", BucketNotFound{Bucket: bucket}
+	}
+	if !IsValidObjectName(object) {
+		return "", ObjectNameInvalid{Bucket: bucket, Object: object}
+	}
+
+	uploadIDPath := path.Join(mpartMetaPrefix, bucket, object, uploadID)
+
+	nsMutex.RLock(minioMetaBucket, uploadIDPath)
+	// Just check if the uploadID exists to avoid copy if it doesn't.
+	uploadIDExists := fs.isUploadIDExists(bucket, object, uploadID)
+	nsMutex.RUnlock(minioMetaBucket, uploadIDPath)
+	if !uploadIDExists {
+		return "", InvalidUploadID{UploadID: uploadID}
+	}
+
 	// Hold write lock on the part so that there is no parallel upload on the part.
 	nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID, strconv.Itoa(partID)))
 	defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID, strconv.Itoa(partID)))
@@ -304,7 +326,15 @@ func (fs fsObjects) putObjectPart(bucket string, object string, uploadID string,
 		}
 	}
 
-	uploadIDPath := path.Join(mpartMetaPrefix, bucket, object, uploadID)
+	// Hold write lock as we are updating fs.json
+	nsMutex.Lock(minioMetaBucket, uploadIDPath)
+	defer nsMutex.Unlock(minioMetaBucket, uploadIDPath)
+
+	// Just check if the uploadID exists to avoid copy if it doesn't.
+	if !fs.isUploadIDExists(bucket, object, uploadID) {
+		return "", InvalidUploadID{UploadID: uploadID}
+	}
+
 	fsMeta, err := fs.readFSMetadata(minioMetaBucket, uploadIDPath)
 	if err != nil {
 		return "", toObjectErr(err, minioMetaBucket, uploadIDPath)
@@ -334,35 +364,6 @@ func (fs fsObjects) putObjectPart(bucket string, object string, uploadID string,
 	return newMD5Hex, nil
 }
 
-// PutObjectPart - reads incoming stream and internally erasure codes
-// them. This call is similar to single put operation but it is part
-// of the multipart transcation.
-//
-// Implements S3 compatible Upload Part API.
-func (fs fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, size int64, data io.Reader, md5Hex string) (string, error) {
-	// Verify if bucket is valid.
-	if !IsValidBucketName(bucket) {
-		return "", BucketNameInvalid{Bucket: bucket}
-	}
-	// Verify whether the bucket exists.
-	if !fs.isBucketExist(bucket) {
-		return "", BucketNotFound{Bucket: bucket}
-	}
-	if !IsValidObjectName(object) {
-		return "", ObjectNameInvalid{Bucket: bucket, Object: object}
-	}
-
-	// Hold read lock on the uploadID so that no one aborts it.
-	nsMutex.RLock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID))
-	defer nsMutex.RUnlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID))
-
-	if !fs.isUploadIDExists(bucket, object, uploadID) {
-		return "", InvalidUploadID{UploadID: uploadID}
-	}
-	md5Sum, err := fs.putObjectPart(bucket, object, uploadID, partID, size, data, md5Hex)
-	return md5Sum, err
-}
-
 // listObjectParts - wrapper scanning through
 // '.minio/multipart/bucket/object/UPLOADID'. Lists all the parts
 // saved inside '.minio/multipart/bucket/object/UPLOADID'.