From 5133ea50bdf1c84618ccc8191d9a26a2d647621c Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Thu, 5 May 2016 20:28:22 -0700 Subject: [PATCH] xl/fs: Make i/o operations atomic. (#1496) --- fs-objects-multipart.go | 29 +++++++-- object-common-multipart.go | 30 ++++++++-- object-common.go | 3 + pkg/safe/safe.go | 5 +- xl-erasure-v1-metadata.go | 1 - xl-erasure-v1.go | 118 +++---------------------------------- xl-objects-multipart.go | 22 ++++++- 7 files changed, 81 insertions(+), 127 deletions(-) diff --git a/fs-objects-multipart.go b/fs-objects-multipart.go index f5387a948..6593bfbaf 100644 --- a/fs-objects-multipart.go +++ b/fs-objects-multipart.go @@ -72,6 +72,9 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload var fileReader io.ReadCloser fileReader, err = fs.storage.ReadFile(minioMetaBucket, multipartPartFile, 0) if err != nil { + if clErr := safeCloseAndRemove(fileWriter); clErr != nil { + return "", clErr + } if err == errFileNotFound { return "", InvalidPart{} } @@ -79,10 +82,16 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload } _, err = io.Copy(fileWriter, fileReader) if err != nil { + if clErr := safeCloseAndRemove(fileWriter); clErr != nil { + return "", clErr + } return "", err } err = fileReader.Close() if err != nil { + if clErr := safeCloseAndRemove(fileWriter); clErr != nil { + return "", clErr + } return "", err } md5Sums = append(md5Sums, part.ETag) @@ -90,25 +99,33 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload err = fileWriter.Close() if err != nil { + if clErr := safeCloseAndRemove(fileWriter); clErr != nil { + return "", clErr + } return "", err } + // Rename the file back to original location, if not delete the + // temporary object. 
+ err = fs.storage.RenameFile(minioMetaBucket, tempObj, bucket, object) + if err != nil { + if derr := fs.storage.DeleteFile(minioMetaBucket, tempObj); derr != nil { + return "", toObjectErr(derr, minioMetaBucket, tempObj) + } + return "", toObjectErr(err, bucket, object) + } + // Save the s3 md5. s3MD5, err := makeS3MD5(md5Sums...) if err != nil { return "", err } - // Cleanup all the parts. + // Cleanup all the parts if everything else has been safely committed. if err = cleanupUploadedParts(fs.storage, mpartMetaPrefix, bucket, object, uploadID); err != nil { return "", err } - err = fs.storage.RenameFile(minioMetaBucket, tempObj, bucket, object) - if err != nil { - return "", toObjectErr(err, bucket, object) - } - // Return md5sum. return s3MD5, nil } diff --git a/object-common-multipart.go b/object-common-multipart.go index ba9ab0a4e..63e1c8e56 100644 --- a/object-common-multipart.go +++ b/object-common-multipart.go @@ -72,10 +72,16 @@ func newMultipartUploadCommon(storage StorageAPI, bucket string, object string) } // Close the writer. if err = w.Close(); err != nil { + if clErr := safeCloseAndRemove(w); clErr != nil { + return "", toObjectErr(clErr, minioMetaBucket, tempUploadIDPath) + } return "", toObjectErr(err, minioMetaBucket, tempUploadIDPath) } err = storage.RenameFile(minioMetaBucket, tempUploadIDPath, minioMetaBucket, uploadIDPath) if err != nil { + if derr := storage.DeleteFile(minioMetaBucket, tempUploadIDPath); derr != nil { + return "", toObjectErr(derr, minioMetaBucket, tempUploadIDPath) + } return "", toObjectErr(err, minioMetaBucket, uploadIDPath) } return uploadID, nil @@ -124,19 +130,25 @@ func putObjectPartCommon(storage StorageAPI, bucket string, object string, uploa // Instantiate checksum hashers and create a multiwriter. 
if size > 0 { if _, err = io.CopyN(multiWriter, data, size); err != nil { - safeCloseAndRemove(fileWriter) + if clErr := safeCloseAndRemove(fileWriter); clErr != nil { + return "", toObjectErr(clErr, bucket, object) + } return "", toObjectErr(err, bucket, object) } // Reader shouldn't have more data what mentioned in size argument. // reading one more byte from the reader to validate it. // expected to fail, success validates existence of more data in the reader. if _, err = io.CopyN(ioutil.Discard, data, 1); err == nil { - safeCloseAndRemove(fileWriter) + if clErr := safeCloseAndRemove(fileWriter); clErr != nil { + return "", toObjectErr(clErr, bucket, object) + } return "", UnExpectedDataSize{Size: int(size)} } } else { if _, err = io.Copy(multiWriter, data); err != nil { - safeCloseAndRemove(fileWriter) + if clErr := safeCloseAndRemove(fileWriter); clErr != nil { + return "", toObjectErr(clErr, bucket, object) + } return "", toObjectErr(err, bucket, object) } } @@ -144,12 +156,17 @@ func putObjectPartCommon(storage StorageAPI, bucket string, object string, uploa newMD5Hex := hex.EncodeToString(md5Writer.Sum(nil)) if md5Hex != "" { if newMD5Hex != md5Hex { - safeCloseAndRemove(fileWriter) + if clErr := safeCloseAndRemove(fileWriter); clErr != nil { + return "", toObjectErr(clErr, bucket, object) + } return "", BadDigest{md5Hex, newMD5Hex} } } err = fileWriter.Close() if err != nil { + if clErr := safeCloseAndRemove(fileWriter); clErr != nil { + return "", toObjectErr(clErr, bucket, object) + } return "", err } @@ -157,7 +174,10 @@ func putObjectPartCommon(storage StorageAPI, bucket string, object string, uploa partSuffixMD5Path := path.Join(mpartMetaPrefix, bucket, object, partSuffixMD5) err = storage.RenameFile(minioMetaBucket, partSuffixPath, minioMetaBucket, partSuffixMD5Path) if err != nil { - return "", err + if derr := storage.DeleteFile(minioMetaBucket, partSuffixPath); derr != nil { + return "", toObjectErr(derr, minioMetaBucket, partSuffixPath) + } + 
return "", toObjectErr(err, minioMetaBucket, partSuffixMD5Path) } return newMD5Hex, nil } diff --git a/object-common.go b/object-common.go index 0fdddbbd5..d41136dc1 100644 --- a/object-common.go +++ b/object-common.go @@ -174,6 +174,9 @@ func putObjectCommon(storage StorageAPI, bucket string, object string, size int6 } err = fileWriter.Close() if err != nil { + if clErr := safeCloseAndRemove(fileWriter); clErr != nil { + return "", clErr + } return "", err } err = storage.RenameFile(minioMetaBucket, tempObj, bucket, object) diff --git a/pkg/safe/safe.go b/pkg/safe/safe.go index dcfc5dc49..fca649aec 100644 --- a/pkg/safe/safe.go +++ b/pkg/safe/safe.go @@ -70,9 +70,8 @@ func (f *File) Close() error { // error if any. func (f *File) CloseAndRemove() error { // close the embedded fd - if err := f.File.Close(); err != nil { - return err - } + f.File.Close() + // Remove the temp file. if err := os.Remove(f.Name()); err != nil { return err diff --git a/xl-erasure-v1-metadata.go b/xl-erasure-v1-metadata.go index 36320e713..e5c29ff45 100644 --- a/xl-erasure-v1-metadata.go +++ b/xl-erasure-v1-metadata.go @@ -29,7 +29,6 @@ type xlMetaV1 struct { Size int64 `json:"size"` ModTime time.Time `json:"modTime"` Version int64 `json:"version"` - Deleted bool `json:"deleted"` } `json:"stat"` Erasure struct { DataBlocks int `json:"data"` diff --git a/xl-erasure-v1.go b/xl-erasure-v1.go index 96fe7678f..8b8e47138 100644 --- a/xl-erasure-v1.go +++ b/xl-erasure-v1.go @@ -18,7 +18,6 @@ package main import ( "fmt" - "io" "os" slashpath "path" "strings" @@ -464,74 +463,23 @@ func (xl XL) DeleteFile(volume, path string) error { return errInvalidArgument } - // Lock right before reading from disk. - nsMutex.RLock(volume, path) - partsMetadata, errs := xl.getPartsMetadata(volume, path) - nsMutex.RUnlock(volume, path) - - var err error - - // List all the file versions on existing files. - versions := listFileVersions(partsMetadata, errs) - // Get highest file version. 
- higherVersion := highestInt(versions) - - // find online disks and meta data of higherVersion - var mdata *xlMetaV1 - onlineDiskCount := 0 - errFileNotFoundErr := 0 - for index, metadata := range partsMetadata { - if errs[index] == nil { - onlineDiskCount++ - if metadata.Stat.Version == higherVersion && mdata == nil { - mdata = &metadata - } - } else if errs[index] == errFileNotFound { - errFileNotFoundErr++ - } - } - - if errFileNotFoundErr == len(xl.storageDisks) { - return errFileNotFound - } else if mdata == nil || onlineDiskCount < xl.writeQuorum { - // return error if mdata is empty or onlineDiskCount doesn't meet write quorum - return errWriteQuorum - } - - // Increment to have next higher version. - higherVersion++ - - xlMetaV1FilePath := slashpath.Join(path, xlMetaV1File) - deleteMetaData := (onlineDiskCount == len(xl.storageDisks)) - - // Set higher version to indicate file operation - mdata.Stat.Version = higherVersion - mdata.Stat.Deleted = true - nsMutex.Lock(volume, path) defer nsMutex.Unlock(volume, path) errCount := 0 // Update meta data file and remove part file for index, disk := range xl.storageDisks { - // no need to operate on failed disks - if errs[index] != nil { - errCount++ - continue - } - - // update meta data about delete operation - var metadataWriter io.WriteCloser - metadataWriter, err = disk.CreateFile(volume, xlMetaV1FilePath) + erasureFilePart := slashpath.Join(path, fmt.Sprintf("file.%d", index)) + err := disk.DeleteFile(volume, erasureFilePart) if err != nil { log.WithFields(logrus.Fields{ "volume": volume, "path": path, - }).Errorf("CreateFile failed with %s", err) + }).Errorf("DeleteFile failed with %s", err) errCount++ - // We can safely allow CreateFile errors up to len(xl.storageDisks) - xl.writeQuorum + // We can safely allow DeleteFile errors up to len(xl.storageDisks) - xl.writeQuorum // otherwise return failure. 
if errCount <= len(xl.storageDisks)-xl.writeQuorum { continue @@ -540,42 +488,8 @@ func (xl XL) DeleteFile(volume, path string) error { return err } - err = mdata.Write(metadataWriter) - if err != nil { - log.WithFields(logrus.Fields{ - "volume": volume, - "path": path, - "diskIndex": index, - }).Errorf("Writing metadata failed with %s", err) - - errCount++ - - // We can safely allow CreateFile errors up to len(xl.storageDisks) - xl.writeQuorum - // otherwise return failure. - if errCount <= len(xl.storageDisks)-xl.writeQuorum { - continue - } - // Safely close and remove. - if err = safeCloseAndRemove(metadataWriter); err != nil { - return err - } - return err - } - // Safely wrote, now rename to its actual location. - if err = metadataWriter.Close(); err != nil { - log.WithFields(logrus.Fields{ - "volume": volume, - "path": path, - "diskIndex": index, - }).Errorf("Metadata commit failed with %s", err) - if err = safeCloseAndRemove(metadataWriter); err != nil { - return err - } - return err - } - - erasureFilePart := slashpath.Join(path, fmt.Sprintf("file.%d", index)) - err = disk.DeleteFile(volume, erasureFilePart) + xlMetaV1FilePath := slashpath.Join(path, "file.json") + err = disk.DeleteFile(volume, xlMetaV1FilePath) if err != nil { log.WithFields(logrus.Fields{ "volume": volume, @@ -584,7 +498,7 @@ func (xl XL) DeleteFile(volume, path string) error { errCount++ - // We can safely allow CreateFile errors up to len(xl.storageDisks) - xl.writeQuorum + // We can safely allow DeleteFile errors up to len(xl.storageDisks) - xl.writeQuorum // otherwise return failure. if errCount <= len(xl.storageDisks)-xl.writeQuorum { continue @@ -594,24 +508,6 @@ func (xl XL) DeleteFile(volume, path string) error { } } - // Remove meta data file only if deleteMetaData is true. 
- if deleteMetaData {
- for index, disk := range xl.storageDisks {
- // no need to operate on failed disks
- if errs[index] != nil {
- continue
- }
-
- err = disk.DeleteFile(volume, xlMetaV1FilePath)
- if err != nil {
- // No need to return the error as we updated the meta data file previously
- log.WithFields(logrus.Fields{
- "volume": volume,
- "path": path,
- }).Errorf("DeleteFile failed with %s", err)
- }
- }
- }
// Return success.
return nil
}
diff --git a/xl-objects-multipart.go b/xl-objects-multipart.go
index b4bd3c8ae..5f580096a 100644
--- a/xl-objects-multipart.go
+++ b/xl-objects-multipart.go
@@ -119,7 +119,6 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
multipartObjSuffix := path.Join(object, partNumToPartFileName(part.PartNumber))
err = xl.storage.RenameFile(minioMetaBucket, multipartPartFile, bucket, multipartObjSuffix)
- // TODO: We need a way to roll back if of the renames failed.
if err != nil {
return "", err
}
@@ -153,14 +152,35 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
}
// Close the writer.
if err = w.Close(); err != nil {
+ if clErr := safeCloseAndRemove(w); clErr != nil {
+ return "", clErr
+ }
return "", err
}
multipartObjFile := path.Join(object, multipartMetaFile)
err = xl.storage.RenameFile(minioMetaBucket, tempMultipartMetaFile, bucket, multipartObjFile)
if err != nil {
+ if derr := xl.storage.DeleteFile(minioMetaBucket, tempMultipartMetaFile); derr != nil {
+ return "", toObjectErr(derr, minioMetaBucket, tempMultipartMetaFile)
+ }
return "", toObjectErr(err, bucket, multipartObjFile)
}
+ // Attempt to rename the upload ID to a temporary location; if
+ // successful, then delete it.
+ uploadIDPath := path.Join(mpartMetaPrefix, bucket, object, uploadID)
+ tempUploadIDPath := path.Join(tmpMetaPrefix, bucket, object, uploadID)
+ if err = xl.storage.RenameFile(minioMetaBucket, uploadIDPath, minioMetaBucket, tempUploadIDPath); err == nil {
+ if err = xl.storage.DeleteFile(minioMetaBucket, tempUploadIDPath); err != nil {
+ return "", toObjectErr(err, minioMetaBucket, tempUploadIDPath)
+ }
+ return s3MD5, nil
+ }
+ // If the rename failed, attempt to delete the original upload ID file.
+ err = xl.storage.DeleteFile(minioMetaBucket, uploadIDPath)
+ if err != nil {
+ return "", toObjectErr(err, minioMetaBucket, uploadIDPath)
+ }
// Return md5sum.
return s3MD5, nil
}