diff --git a/cmd/fs-v1-multipart-common.go b/cmd/fs-v1-multipart-common.go deleted file mode 100644 index 47b63690e..000000000 --- a/cmd/fs-v1-multipart-common.go +++ /dev/null @@ -1,142 +0,0 @@ -/* - * Minio Cloud Storage, (C) 2016, 2017 Minio, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cmd - -import ( - "fmt" - "io" - "runtime" - "time" - - pathutil "path" - - "github.com/minio/minio/pkg/lock" -) - -// Returns if the prefix is a multipart upload. -func (fs fsObjects) isMultipartUpload(bucket, prefix string) bool { - uploadsIDPath := pathJoin(fs.fsPath, bucket, prefix, uploadsJSONFile) - _, err := fsStatFile(uploadsIDPath) - if err != nil { - if err == errFileNotFound { - return false - } - errorIf(err, "Unable to access uploads.json "+uploadsIDPath) - return false - } - return true -} - -// Delete uploads.json file wrapper handling a tricky case on windows. -func (fs fsObjects) deleteUploadsJSON(bucket, object, uploadID string) error { - timeID := fmt.Sprintf("%X", time.Now().UTC().UnixNano()) - tmpPath := pathJoin(fs.fsPath, minioMetaTmpBucket, fs.fsUUID, uploadID+"+"+timeID) - - multipartBucketPath := pathJoin(fs.fsPath, minioMetaMultipartBucket) - uploadPath := pathJoin(multipartBucketPath, bucket, object) - uploadsMetaPath := pathJoin(uploadPath, uploadsJSONFile) - - // Special case for windows please read through. - if runtime.GOOS == globalWindowsOSName { - // Ordinarily windows does not permit deletion or renaming of files still - // in use, but if all open handles to that file were opened with FILE_SHARE_DELETE - // then it can permit renames and deletions of open files. - // - // There are however some gotchas with this, and it is worth listing them here. - // Firstly, Windows never allows you to really delete an open file, rather it is - // flagged as delete pending and its entry in its directory remains visible - // (though no new file handles may be opened to it) and when the very last - // open handle to the file in the system is closed, only then is it truly - // deleted. Well, actually only sort of truly deleted, because Windows only - // appears to remove the file entry from the directory, but in fact that - // entry is merely hidden and actually still exists and attempting to create - // a file with the same name will return an access denied error. How long it - // silently exists for depends on a range of factors, but put it this way: - // if your code loops creating and deleting the same file name as you might - // when operating a lock file, you're going to see lots of random spurious - // access denied errors and truly dismal lock file performance compared to POSIX. - // - // We work-around these un-POSIX file semantics by taking a dual step to - // deleting files. Firstly, it renames the file to tmp location into multipartTmpBucket - // We always open files with FILE_SHARE_DELETE permission enabled, with that - // flag Windows permits renaming and deletion, and because the name was changed - // to a very random name somewhere not in its origin directory before deletion, - // you don't see those unexpected random errors when creating files with the - // same name as a recently deleted file as you do anywhere else on Windows. - // Because the file is probably not in its original containing directory any more, - // deletions of that directory will not fail with “directory not empty” as they - // otherwise normally would either. - fsRenameFile(uploadsMetaPath, tmpPath) - - // Proceed to deleting the directory. - if err := fsDeleteFile(multipartBucketPath, uploadPath); err != nil { - return err - } - - // Finally delete the renamed file. - return fsDeleteFile(pathutil.Dir(tmpPath), tmpPath) - } - return fsDeleteFile(multipartBucketPath, uploadsMetaPath) -} - -// Removes the uploadID, called either by CompleteMultipart of AbortMultipart. If the resuling uploads -// slice is empty then we remove/purge the file. -func (fs fsObjects) removeUploadID(bucket, object, uploadID string, rwlk *lock.LockedFile) error { - uploadIDs := uploadsV1{} - _, err := uploadIDs.ReadFrom(rwlk) - if err != nil { - return err - } - - // Removes upload id from the uploads list. - uploadIDs.RemoveUploadID(uploadID) - - // Check this is the last entry. - if uploadIDs.IsEmpty() { - // No more uploads left, so we delete `uploads.json` file. - return fs.deleteUploadsJSON(bucket, object, uploadID) - } // else not empty - - // Write update `uploads.json`. - _, err = uploadIDs.WriteTo(rwlk) - return err -} - -// Adds a new uploadID if no previous `uploads.json` is -// found we initialize a new one. -func (fs fsObjects) addUploadID(bucket, object, uploadID string, initiated time.Time, rwlk *lock.LockedFile) error { - uploadIDs := uploadsV1{} - - _, err := uploadIDs.ReadFrom(rwlk) - // For all unexpected errors, we return. - if err != nil && errorCause(err) != io.EOF { - return err - } - - // If we couldn't read anything, we assume a default - // (empty) upload info. - if errorCause(err) == io.EOF { - uploadIDs = newUploadsV1("fs") - } - - // Adds new upload id to the list. - uploadIDs.AddUploadID(uploadID, initiated) - - // Write update `uploads.json`. - _, err = uploadIDs.WriteTo(rwlk) - return err -} diff --git a/cmd/fs-v1-multipart-common_test.go b/cmd/fs-v1-multipart-common_test.go deleted file mode 100644 index 026799580..000000000 --- a/cmd/fs-v1-multipart-common_test.go +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Minio Cloud Storage, (C) 2016 Minio, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cmd - -import ( - "path/filepath" - "testing" -) - -// TestFSWriteUploadJSON - tests for writeUploadJSON for FS -func TestFSWriteUploadJSON(t *testing.T) { - // Prepare for tests - disk := filepath.Join(globalTestTmpDir, "minio-"+nextSuffix()) - defer removeAll(disk) - - obj := initFSObjects(disk, t) - - bucketName := "bucket" - objectName := "object" - - obj.MakeBucket(bucketName) - _, err := obj.NewMultipartUpload(bucketName, objectName, nil) - if err != nil { - t.Fatal("Unexpected err: ", err) - } - - // newMultipartUpload will fail. - removeAll(disk) // Remove disk. - _, err = obj.NewMultipartUpload(bucketName, objectName, nil) - if err != nil { - if _, ok := errorCause(err).(BucketNotFound); !ok { - t.Fatal("Unexpected err: ", err) - } - } -} diff --git a/cmd/fs-v1-multipart.go b/cmd/fs-v1-multipart.go index 800cb2bb9..a9f55a5ea 100644 --- a/cmd/fs-v1-multipart.go +++ b/cmd/fs-v1-multipart.go @@ -24,12 +24,128 @@ import ( "io" "os" pathutil "path" + "runtime" "strings" "time" + "github.com/minio/minio/pkg/lock" "github.com/minio/sha256-simd" ) +// Returns if the prefix is a multipart upload. +func (fs fsObjects) isMultipartUpload(bucket, prefix string) bool { + uploadsIDPath := pathJoin(fs.fsPath, bucket, prefix, uploadsJSONFile) + _, err := fsStatFile(uploadsIDPath) + if err != nil { + if err == errFileNotFound { + return false + } + errorIf(err, "Unable to access uploads.json "+uploadsIDPath) + return false + } + return true +} + +// Delete uploads.json file wrapper handling a tricky case on windows. +func (fs fsObjects) deleteUploadsJSON(bucket, object, uploadID string) error { + timeID := fmt.Sprintf("%X", time.Now().UTC().UnixNano()) + tmpPath := pathJoin(fs.fsPath, minioMetaTmpBucket, fs.fsUUID, uploadID+"+"+timeID) + + multipartBucketPath := pathJoin(fs.fsPath, minioMetaMultipartBucket) + uploadPath := pathJoin(multipartBucketPath, bucket, object) + uploadsMetaPath := pathJoin(uploadPath, uploadsJSONFile) + + // Special case for windows please read through. + if runtime.GOOS == globalWindowsOSName { + // Ordinarily windows does not permit deletion or renaming of files still + // in use, but if all open handles to that file were opened with FILE_SHARE_DELETE + // then it can permit renames and deletions of open files. + // + // There are however some gotchas with this, and it is worth listing them here. + // Firstly, Windows never allows you to really delete an open file, rather it is + // flagged as delete pending and its entry in its directory remains visible + // (though no new file handles may be opened to it) and when the very last + // open handle to the file in the system is closed, only then is it truly + // deleted. Well, actually only sort of truly deleted, because Windows only + // appears to remove the file entry from the directory, but in fact that + // entry is merely hidden and actually still exists and attempting to create + // a file with the same name will return an access denied error. How long it + // silently exists for depends on a range of factors, but put it this way: + // if your code loops creating and deleting the same file name as you might + // when operating a lock file, you're going to see lots of random spurious + // access denied errors and truly dismal lock file performance compared to POSIX. + // + // We work-around these un-POSIX file semantics by taking a dual step to + // deleting files. Firstly, it renames the file to tmp location into multipartTmpBucket + // We always open files with FILE_SHARE_DELETE permission enabled, with that + // flag Windows permits renaming and deletion, and because the name was changed + // to a very random name somewhere not in its origin directory before deletion, + // you don't see those unexpected random errors when creating files with the + // same name as a recently deleted file as you do anywhere else on Windows. + // Because the file is probably not in its original containing directory any more, + // deletions of that directory will not fail with "directory not empty" as they + // otherwise normally would either. + fsRenameFile(uploadsMetaPath, tmpPath) + + // Proceed to deleting the directory. + if err := fsDeleteFile(multipartBucketPath, uploadPath); err != nil { + return err + } + + // Finally delete the renamed file. + return fsDeleteFile(pathutil.Dir(tmpPath), tmpPath) + } + return fsDeleteFile(multipartBucketPath, uploadsMetaPath) +} + +// Removes the uploadID, called either by CompleteMultipart of AbortMultipart. If the resuling uploads +// slice is empty then we remove/purge the file. +func (fs fsObjects) removeUploadID(bucket, object, uploadID string, rwlk *lock.LockedFile) error { + uploadIDs := uploadsV1{} + _, err := uploadIDs.ReadFrom(rwlk) + if err != nil { + return err + } + + // Removes upload id from the uploads list. + uploadIDs.RemoveUploadID(uploadID) + + // Check this is the last entry. + if uploadIDs.IsEmpty() { + // No more uploads left, so we delete `uploads.json` file. + return fs.deleteUploadsJSON(bucket, object, uploadID) + } // else not empty + + // Write update `uploads.json`. + _, err = uploadIDs.WriteTo(rwlk) + return err +} + +// Adds a new uploadID if no previous `uploads.json` is +// found we initialize a new one. +func (fs fsObjects) addUploadID(bucket, object, uploadID string, initiated time.Time, rwlk *lock.LockedFile) error { + uploadIDs := uploadsV1{} + + _, err := uploadIDs.ReadFrom(rwlk) + // For all unexpected errors, we return. + if err != nil && errorCause(err) != io.EOF { + return err + } + + // If we couldn't read anything, we assume a default + // (empty) upload info. + if errorCause(err) == io.EOF { + uploadIDs = newUploadsV1("fs") + } + + // Adds new upload id to the list. + uploadIDs.AddUploadID(uploadID, initiated) + + // Write update `uploads.json`. + _, err = uploadIDs.WriteTo(rwlk) + return err +} + // listMultipartUploadIDs - list all the upload ids from a marker up to 'count'. func (fs fsObjects) listMultipartUploadIDs(bucketName, objectName, uploadIDMarker string, count int) ([]uploadMetadata, bool, error) { var uploads []uploadMetadata diff --git a/cmd/fs-v1-multipart_test.go b/cmd/fs-v1-multipart_test.go index ee6126fed..75ef537a5 100644 --- a/cmd/fs-v1-multipart_test.go +++ b/cmd/fs-v1-multipart_test.go @@ -22,6 +22,33 @@ import ( "testing" ) +// TestFSWriteUploadJSON - tests for writeUploadJSON for FS +func TestFSWriteUploadJSON(t *testing.T) { + // Prepare for tests + disk := filepath.Join(globalTestTmpDir, "minio-"+nextSuffix()) + defer removeAll(disk) + + obj := initFSObjects(disk, t) + + bucketName := "bucket" + objectName := "object" + + obj.MakeBucket(bucketName) + _, err := obj.NewMultipartUpload(bucketName, objectName, nil) + if err != nil { + t.Fatal("Unexpected err: ", err) + } + + // newMultipartUpload will fail. + removeAll(disk) // Remove disk. + _, err = obj.NewMultipartUpload(bucketName, objectName, nil) + if err != nil { + if _, ok := errorCause(err).(BucketNotFound); !ok { + t.Fatal("Unexpected err: ", err) + } + } +} + // TestNewMultipartUploadFaultyDisk - test NewMultipartUpload with faulty disks func TestNewMultipartUploadFaultyDisk(t *testing.T) { // Prepare for tests diff --git a/cmd/xl-v1-multipart-common.go b/cmd/xl-v1-multipart-common.go deleted file mode 100644 index 1734c88b0..000000000 --- a/cmd/xl-v1-multipart-common.go +++ /dev/null @@ -1,270 +0,0 @@ -/* - * Minio Cloud Storage, (C) 2016 Minio, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cmd - -import ( - "path" - "sync" - "time" -) - -// updateUploadJSON - add or remove upload ID info in all `uploads.json`. -func (xl xlObjects) updateUploadJSON(bucket, object, uploadID string, initiated time.Time, isRemove bool) error { - uploadsPath := path.Join(bucket, object, uploadsJSONFile) - tmpUploadsPath := mustGetUUID() - - // slice to store errors from disks - errs := make([]error, len(xl.storageDisks)) - // slice to store if it is a delete operation on a disk - isDelete := make([]bool, len(xl.storageDisks)) - - wg := sync.WaitGroup{} - for index, disk := range xl.storageDisks { - if disk == nil { - errs[index] = traceError(errDiskNotFound) - continue - } - // Update `uploads.json` in a go routine. - wg.Add(1) - go func(index int, disk StorageAPI) { - defer wg.Done() - - // read and parse uploads.json on this disk - uploadsJSON, err := readUploadsJSON(bucket, object, disk) - if errorCause(err) == errFileNotFound { - // If file is not found, we assume an - // default (empty) upload info. - uploadsJSON, err = newUploadsV1("xl"), nil - } - // If we have a read error, we store error and - // exit. - if err != nil { - errs[index] = err - return - } - - if !isRemove { - // Add the uploadID - uploadsJSON.AddUploadID(uploadID, initiated) - } else { - // Remove the upload ID - uploadsJSON.RemoveUploadID(uploadID) - if len(uploadsJSON.Uploads) == 0 { - isDelete[index] = true - } - } - - // For delete, rename to tmp, for the - // possibility of recovery in case of quorum - // failure. - if !isDelete[index] { - errs[index] = writeUploadJSON(&uploadsJSON, uploadsPath, tmpUploadsPath, disk) - } else { - wErr := disk.RenameFile(minioMetaMultipartBucket, uploadsPath, minioMetaTmpBucket, tmpUploadsPath) - if wErr != nil { - errs[index] = traceError(wErr) - } - - } - }(index, disk) - } - - // Wait for all the writes to finish. - wg.Wait() - - // Do we have write quorum? - if !isDiskQuorum(errs, xl.writeQuorum) { - // No quorum. Perform cleanup on the minority of disks - // on which the operation succeeded. - - // There are two cases: - // - // 1. uploads.json file was updated -> we delete the - // file that we successfully overwrote on the - // minority of disks, so that the failed quorum - // operation is not partially visible. - // - // 2. uploads.json was deleted -> in this case since - // the delete failed, we restore from tmp. - for index, disk := range xl.storageDisks { - if disk == nil || errs[index] != nil { - continue - } - wg.Add(1) - go func(index int, disk StorageAPI) { - defer wg.Done() - if !isDelete[index] { - _ = disk.DeleteFile( - minioMetaMultipartBucket, - uploadsPath, - ) - } else { - _ = disk.RenameFile( - minioMetaTmpBucket, tmpUploadsPath, - minioMetaMultipartBucket, uploadsPath, - ) - } - }(index, disk) - } - wg.Wait() - return traceError(errXLWriteQuorum) - } - - // we do have quorum, so in case of delete upload.json file - // operation, we purge from tmp. - for index, disk := range xl.storageDisks { - if disk == nil || !isDelete[index] { - continue - } - wg.Add(1) - go func(index int, disk StorageAPI) { - defer wg.Done() - // isDelete[index] = true at this point. - _ = disk.DeleteFile(minioMetaTmpBucket, tmpUploadsPath) - }(index, disk) - } - wg.Wait() - - if reducedErr := reduceWriteQuorumErrs(errs, objectOpIgnoredErrs, xl.writeQuorum); reducedErr != nil { - return reducedErr - } - return nil -} - -// addUploadID - add upload ID and its initiated time to 'uploads.json'. -func (xl xlObjects) addUploadID(bucket, object string, uploadID string, initiated time.Time) error { - return xl.updateUploadJSON(bucket, object, uploadID, initiated, false) -} - -// removeUploadID - remove upload ID in 'uploads.json'. -func (xl xlObjects) removeUploadID(bucket, object string, uploadID string) error { - return xl.updateUploadJSON(bucket, object, uploadID, time.Time{}, true) -} - -// Returns if the prefix is a multipart upload. -func (xl xlObjects) isMultipartUpload(bucket, prefix string) bool { - for _, disk := range xl.getLoadBalancedDisks() { - if disk == nil { - continue - } - _, err := disk.StatFile(bucket, pathJoin(prefix, uploadsJSONFile)) - if err == nil { - return true - } - // For any reason disk was deleted or goes offline, continue - if isErrIgnored(err, objMetadataOpIgnoredErrs...) { - continue - } - break - } - return false -} - -// isUploadIDExists - verify if a given uploadID exists and is valid. -func (xl xlObjects) isUploadIDExists(bucket, object, uploadID string) bool { - uploadIDPath := path.Join(bucket, object, uploadID) - return xl.isObject(minioMetaMultipartBucket, uploadIDPath) -} - -// Removes part given by partName belonging to a mulitpart upload from minioMetaBucket -func (xl xlObjects) removeObjectPart(bucket, object, uploadID, partName string) { - curpartPath := path.Join(bucket, object, uploadID, partName) - wg := sync.WaitGroup{} - for i, disk := range xl.storageDisks { - if disk == nil { - continue - } - wg.Add(1) - go func(index int, disk StorageAPI) { - defer wg.Done() - // Ignoring failure to remove parts that weren't present in CompleteMultipartUpload - // requests. xl.json is the authoritative source of truth on which parts constitute - // the object. The presence of parts that don't belong in the object doesn't affect correctness. - _ = disk.DeleteFile(minioMetaMultipartBucket, curpartPath) - }(i, disk) - } - wg.Wait() -} - -// statPart - returns fileInfo structure for a successful stat on part file. -func (xl xlObjects) statPart(bucket, object, uploadID, partName string) (fileInfo FileInfo, err error) { - partNamePath := path.Join(bucket, object, uploadID, partName) - for _, disk := range xl.getLoadBalancedDisks() { - if disk == nil { - continue - } - fileInfo, err = disk.StatFile(minioMetaMultipartBucket, partNamePath) - if err == nil { - return fileInfo, nil - } - err = traceError(err) - // For any reason disk was deleted or goes offline we continue to next disk. - if isErrIgnored(err, objMetadataOpIgnoredErrs...) { - continue - } - - // Catastrophic error, we return. - break - } - return FileInfo{}, err -} - -// commitXLMetadata - commit `xl.json` from source prefix to destination prefix in the given slice of disks. -func commitXLMetadata(disks []StorageAPI, srcBucket, srcPrefix, dstBucket, dstPrefix string, quorum int) error { - var wg = &sync.WaitGroup{} - var mErrs = make([]error, len(disks)) - - srcJSONFile := path.Join(srcPrefix, xlMetaJSONFile) - dstJSONFile := path.Join(dstPrefix, xlMetaJSONFile) - - // Rename `xl.json` to all disks in parallel. - for index, disk := range disks { - if disk == nil { - mErrs[index] = traceError(errDiskNotFound) - continue - } - wg.Add(1) - // Rename `xl.json` in a routine. - go func(index int, disk StorageAPI) { - defer wg.Done() - // Delete any dangling directories. - defer disk.DeleteFile(srcBucket, srcPrefix) - - // Renames `xl.json` from source prefix to destination prefix. - rErr := disk.RenameFile(srcBucket, srcJSONFile, dstBucket, dstJSONFile) - if rErr != nil { - mErrs[index] = traceError(rErr) - return - } - mErrs[index] = nil - }(index, disk) - } - // Wait for all the routines. - wg.Wait() - - // Do we have write Quorum?. - if !isDiskQuorum(mErrs, quorum) { - // Delete all `xl.json` successfully renamed. - deleteAllXLMetadata(disks, dstBucket, dstPrefix, mErrs) - return traceError(errXLWriteQuorum) - } - - if reducedErr := reduceWriteQuorumErrs(mErrs, objectOpIgnoredErrs, quorum); reducedErr != nil { - return reducedErr - } - return nil -} diff --git a/cmd/xl-v1-multipart.go b/cmd/xl-v1-multipart.go index 58f421121..92fafc35e 100644 --- a/cmd/xl-v1-multipart.go +++ b/cmd/xl-v1-multipart.go @@ -25,12 +25,260 @@ import ( "io/ioutil" "path" "strings" + "sync" "time" "github.com/minio/minio/pkg/mimedb" "github.com/minio/sha256-simd" ) +// updateUploadJSON - add or remove upload ID info in all `uploads.json`. +func (xl xlObjects) updateUploadJSON(bucket, object, uploadID string, initiated time.Time, isRemove bool) error { + uploadsPath := path.Join(bucket, object, uploadsJSONFile) + tmpUploadsPath := mustGetUUID() + + // slice to store errors from disks + errs := make([]error, len(xl.storageDisks)) + // slice to store if it is a delete operation on a disk + isDelete := make([]bool, len(xl.storageDisks)) + + wg := sync.WaitGroup{} + for index, disk := range xl.storageDisks { + if disk == nil { + errs[index] = traceError(errDiskNotFound) + continue + } + // Update `uploads.json` in a go routine. + wg.Add(1) + go func(index int, disk StorageAPI) { + defer wg.Done() + + // read and parse uploads.json on this disk + uploadsJSON, err := readUploadsJSON(bucket, object, disk) + if errorCause(err) == errFileNotFound { + // If file is not found, we assume an + // default (empty) upload info. + uploadsJSON, err = newUploadsV1("xl"), nil + } + // If we have a read error, we store error and + // exit. + if err != nil { + errs[index] = err + return + } + + if !isRemove { + // Add the uploadID + uploadsJSON.AddUploadID(uploadID, initiated) + } else { + // Remove the upload ID + uploadsJSON.RemoveUploadID(uploadID) + if len(uploadsJSON.Uploads) == 0 { + isDelete[index] = true + } + } + + // For delete, rename to tmp, for the + // possibility of recovery in case of quorum + // failure. + if !isDelete[index] { + errs[index] = writeUploadJSON(&uploadsJSON, uploadsPath, tmpUploadsPath, disk) + } else { + wErr := disk.RenameFile(minioMetaMultipartBucket, uploadsPath, minioMetaTmpBucket, tmpUploadsPath) + if wErr != nil { + errs[index] = traceError(wErr) + } + + } + }(index, disk) + } + + // Wait for all the writes to finish. + wg.Wait() + + // Do we have write quorum? + if !isDiskQuorum(errs, xl.writeQuorum) { + // No quorum. Perform cleanup on the minority of disks + // on which the operation succeeded. + + // There are two cases: + // + // 1. uploads.json file was updated -> we delete the + // file that we successfully overwrote on the + // minority of disks, so that the failed quorum + // operation is not partially visible. + // + // 2. uploads.json was deleted -> in this case since + // the delete failed, we restore from tmp. + for index, disk := range xl.storageDisks { + if disk == nil || errs[index] != nil { + continue + } + wg.Add(1) + go func(index int, disk StorageAPI) { + defer wg.Done() + if !isDelete[index] { + _ = disk.DeleteFile( + minioMetaMultipartBucket, + uploadsPath, + ) + } else { + _ = disk.RenameFile( + minioMetaTmpBucket, tmpUploadsPath, + minioMetaMultipartBucket, uploadsPath, + ) + } + }(index, disk) + } + wg.Wait() + return traceError(errXLWriteQuorum) + } + + // we do have quorum, so in case of delete upload.json file + // operation, we purge from tmp. + for index, disk := range xl.storageDisks { + if disk == nil || !isDelete[index] { + continue + } + wg.Add(1) + go func(index int, disk StorageAPI) { + defer wg.Done() + // isDelete[index] = true at this point. + _ = disk.DeleteFile(minioMetaTmpBucket, tmpUploadsPath) + }(index, disk) + } + wg.Wait() + + if reducedErr := reduceWriteQuorumErrs(errs, objectOpIgnoredErrs, xl.writeQuorum); reducedErr != nil { + return reducedErr + } + return nil +} + +// addUploadID - add upload ID and its initiated time to 'uploads.json'. +func (xl xlObjects) addUploadID(bucket, object string, uploadID string, initiated time.Time) error { + return xl.updateUploadJSON(bucket, object, uploadID, initiated, false) +} + +// removeUploadID - remove upload ID in 'uploads.json'. +func (xl xlObjects) removeUploadID(bucket, object string, uploadID string) error { + return xl.updateUploadJSON(bucket, object, uploadID, time.Time{}, true) +} + +// Returns if the prefix is a multipart upload. +func (xl xlObjects) isMultipartUpload(bucket, prefix string) bool { + for _, disk := range xl.getLoadBalancedDisks() { + if disk == nil { + continue + } + _, err := disk.StatFile(bucket, pathJoin(prefix, uploadsJSONFile)) + if err == nil { + return true + } + // For any reason disk was deleted or goes offline, continue + if isErrIgnored(err, objMetadataOpIgnoredErrs...) { + continue + } + break + } + return false +} + +// isUploadIDExists - verify if a given uploadID exists and is valid. +func (xl xlObjects) isUploadIDExists(bucket, object, uploadID string) bool { + uploadIDPath := path.Join(bucket, object, uploadID) + return xl.isObject(minioMetaMultipartBucket, uploadIDPath) +} + +// Removes part given by partName belonging to a mulitpart upload from minioMetaBucket +func (xl xlObjects) removeObjectPart(bucket, object, uploadID, partName string) { + curpartPath := path.Join(bucket, object, uploadID, partName) + wg := sync.WaitGroup{} + for i, disk := range xl.storageDisks { + if disk == nil { + continue + } + wg.Add(1) + go func(index int, disk StorageAPI) { + defer wg.Done() + // Ignoring failure to remove parts that weren't present in CompleteMultipartUpload + // requests. xl.json is the authoritative source of truth on which parts constitute + // the object. The presence of parts that don't belong in the object doesn't affect correctness. + _ = disk.DeleteFile(minioMetaMultipartBucket, curpartPath) + }(i, disk) + } + wg.Wait() +} + +// statPart - returns fileInfo structure for a successful stat on part file. +func (xl xlObjects) statPart(bucket, object, uploadID, partName string) (fileInfo FileInfo, err error) { + partNamePath := path.Join(bucket, object, uploadID, partName) + for _, disk := range xl.getLoadBalancedDisks() { + if disk == nil { + continue + } + fileInfo, err = disk.StatFile(minioMetaMultipartBucket, partNamePath) + if err == nil { + return fileInfo, nil + } + err = traceError(err) + // For any reason disk was deleted or goes offline we continue to next disk. + if isErrIgnored(err, objMetadataOpIgnoredErrs...) { + continue + } + + // Catastrophic error, we return. + break + } + return FileInfo{}, err +} + +// commitXLMetadata - commit `xl.json` from source prefix to destination prefix in the given slice of disks. +func commitXLMetadata(disks []StorageAPI, srcBucket, srcPrefix, dstBucket, dstPrefix string, quorum int) error { + var wg = &sync.WaitGroup{} + var mErrs = make([]error, len(disks)) + + srcJSONFile := path.Join(srcPrefix, xlMetaJSONFile) + dstJSONFile := path.Join(dstPrefix, xlMetaJSONFile) + + // Rename `xl.json` to all disks in parallel. + for index, disk := range disks { + if disk == nil { + mErrs[index] = traceError(errDiskNotFound) + continue + } + wg.Add(1) + // Rename `xl.json` in a routine. + go func(index int, disk StorageAPI) { + defer wg.Done() + // Delete any dangling directories. + defer disk.DeleteFile(srcBucket, srcPrefix) + + // Renames `xl.json` from source prefix to destination prefix. + rErr := disk.RenameFile(srcBucket, srcJSONFile, dstBucket, dstJSONFile) + if rErr != nil { + mErrs[index] = traceError(rErr) + return + } + mErrs[index] = nil + }(index, disk) + } + // Wait for all the routines. + wg.Wait() + + // Do we have write Quorum?. + if !isDiskQuorum(mErrs, quorum) { + // Delete all `xl.json` successfully renamed. + deleteAllXLMetadata(disks, dstBucket, dstPrefix, mErrs) + return traceError(errXLWriteQuorum) + } + + if reducedErr := reduceWriteQuorumErrs(mErrs, objectOpIgnoredErrs, quorum); reducedErr != nil { + return reducedErr + } + return nil +} + // listMultipartUploads - lists all multipart uploads. func (xl xlObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (ListMultipartsInfo, error) { result := ListMultipartsInfo{ diff --git a/cmd/xl-v1-multipart-common_test.go b/cmd/xl-v1-multipart_test.go similarity index 100% rename from cmd/xl-v1-multipart-common_test.go rename to cmd/xl-v1-multipart_test.go