xl/fs: Make i/o operations atomic. (#1496)

master
Harshavardhana 9 years ago committed by Anand Babu (AB) Periasamy
parent 17868ccd7f
commit 5133ea50bd
  1. 29
      fs-objects-multipart.go
  2. 30
      object-common-multipart.go
  3. 3
      object-common.go
  4. 5
      pkg/safe/safe.go
  5. 1
      xl-erasure-v1-metadata.go
  6. 118
      xl-erasure-v1.go
  7. 22
      xl-objects-multipart.go

@ -72,6 +72,9 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload
var fileReader io.ReadCloser
fileReader, err = fs.storage.ReadFile(minioMetaBucket, multipartPartFile, 0)
if err != nil {
if clErr := safeCloseAndRemove(fileWriter); clErr != nil {
return "", clErr
}
if err == errFileNotFound {
return "", InvalidPart{}
}
@ -79,10 +82,16 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload
}
_, err = io.Copy(fileWriter, fileReader)
if err != nil {
if clErr := safeCloseAndRemove(fileWriter); clErr != nil {
return "", clErr
}
return "", err
}
err = fileReader.Close()
if err != nil {
if clErr := safeCloseAndRemove(fileWriter); clErr != nil {
return "", clErr
}
return "", err
}
md5Sums = append(md5Sums, part.ETag)
@ -90,25 +99,33 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload
err = fileWriter.Close()
if err != nil {
if clErr := safeCloseAndRemove(fileWriter); clErr != nil {
return "", clErr
}
return "", err
}
// Rename the file back to original location, if not delete the
// temporary object.
err = fs.storage.RenameFile(minioMetaBucket, tempObj, bucket, object)
if err != nil {
if derr := fs.storage.DeleteFile(minioMetaBucket, tempObj); derr != nil {
return "", toObjectErr(derr, minioMetaBucket, tempObj)
}
return "", toObjectErr(err, bucket, object)
}
// Save the s3 md5.
s3MD5, err := makeS3MD5(md5Sums...)
if err != nil {
return "", err
}
// Cleanup all the parts.
// Cleanup all the parts if everything else has been safely committed.
if err = cleanupUploadedParts(fs.storage, mpartMetaPrefix, bucket, object, uploadID); err != nil {
return "", err
}
err = fs.storage.RenameFile(minioMetaBucket, tempObj, bucket, object)
if err != nil {
return "", toObjectErr(err, bucket, object)
}
// Return md5sum.
return s3MD5, nil
}

@ -72,10 +72,16 @@ func newMultipartUploadCommon(storage StorageAPI, bucket string, object string)
}
// Close the writer.
if err = w.Close(); err != nil {
if clErr := safeCloseAndRemove(w); clErr != nil {
return "", toObjectErr(clErr, minioMetaBucket, tempUploadIDPath)
}
return "", toObjectErr(err, minioMetaBucket, tempUploadIDPath)
}
err = storage.RenameFile(minioMetaBucket, tempUploadIDPath, minioMetaBucket, uploadIDPath)
if err != nil {
if derr := storage.DeleteFile(minioMetaBucket, tempUploadIDPath); derr != nil {
return "", toObjectErr(derr, minioMetaBucket, tempUploadIDPath)
}
return "", toObjectErr(err, minioMetaBucket, uploadIDPath)
}
return uploadID, nil
@ -124,19 +130,25 @@ func putObjectPartCommon(storage StorageAPI, bucket string, object string, uploa
// Instantiate checksum hashers and create a multiwriter.
if size > 0 {
if _, err = io.CopyN(multiWriter, data, size); err != nil {
safeCloseAndRemove(fileWriter)
if clErr := safeCloseAndRemove(fileWriter); clErr != nil {
return "", toObjectErr(clErr, bucket, object)
}
return "", toObjectErr(err, bucket, object)
}
// Reader shouldn't have more data what mentioned in size argument.
// reading one more byte from the reader to validate it.
// expected to fail, success validates existence of more data in the reader.
if _, err = io.CopyN(ioutil.Discard, data, 1); err == nil {
safeCloseAndRemove(fileWriter)
if clErr := safeCloseAndRemove(fileWriter); clErr != nil {
return "", toObjectErr(clErr, bucket, object)
}
return "", UnExpectedDataSize{Size: int(size)}
}
} else {
if _, err = io.Copy(multiWriter, data); err != nil {
safeCloseAndRemove(fileWriter)
if clErr := safeCloseAndRemove(fileWriter); clErr != nil {
return "", toObjectErr(clErr, bucket, object)
}
return "", toObjectErr(err, bucket, object)
}
}
@ -144,12 +156,17 @@ func putObjectPartCommon(storage StorageAPI, bucket string, object string, uploa
newMD5Hex := hex.EncodeToString(md5Writer.Sum(nil))
if md5Hex != "" {
if newMD5Hex != md5Hex {
safeCloseAndRemove(fileWriter)
if clErr := safeCloseAndRemove(fileWriter); clErr != nil {
return "", toObjectErr(clErr, bucket, object)
}
return "", BadDigest{md5Hex, newMD5Hex}
}
}
err = fileWriter.Close()
if err != nil {
if clErr := safeCloseAndRemove(fileWriter); clErr != nil {
return "", toObjectErr(clErr, bucket, object)
}
return "", err
}
@ -157,7 +174,10 @@ func putObjectPartCommon(storage StorageAPI, bucket string, object string, uploa
partSuffixMD5Path := path.Join(mpartMetaPrefix, bucket, object, partSuffixMD5)
err = storage.RenameFile(minioMetaBucket, partSuffixPath, minioMetaBucket, partSuffixMD5Path)
if err != nil {
return "", err
if derr := storage.DeleteFile(minioMetaBucket, partSuffixPath); derr != nil {
return "", toObjectErr(derr, minioMetaBucket, partSuffixPath)
}
return "", toObjectErr(err, minioMetaBucket, partSuffixMD5Path)
}
return newMD5Hex, nil
}

@ -174,6 +174,9 @@ func putObjectCommon(storage StorageAPI, bucket string, object string, size int6
}
err = fileWriter.Close()
if err != nil {
if clErr := safeCloseAndRemove(fileWriter); clErr != nil {
return "", clErr
}
return "", err
}
err = storage.RenameFile(minioMetaBucket, tempObj, bucket, object)

@ -70,9 +70,8 @@ func (f *File) Close() error {
// error if any.
func (f *File) CloseAndRemove() error {
// close the embedded fd
if err := f.File.Close(); err != nil {
return err
}
f.File.Close()
// Remove the temp file.
if err := os.Remove(f.Name()); err != nil {
return err

@ -29,7 +29,6 @@ type xlMetaV1 struct {
Size int64 `json:"size"`
ModTime time.Time `json:"modTime"`
Version int64 `json:"version"`
Deleted bool `json:"deleted"`
} `json:"stat"`
Erasure struct {
DataBlocks int `json:"data"`

@ -18,7 +18,6 @@ package main
import (
"fmt"
"io"
"os"
slashpath "path"
"strings"
@ -464,74 +463,23 @@ func (xl XL) DeleteFile(volume, path string) error {
return errInvalidArgument
}
// Lock right before reading from disk.
nsMutex.RLock(volume, path)
partsMetadata, errs := xl.getPartsMetadata(volume, path)
nsMutex.RUnlock(volume, path)
var err error
// List all the file versions on existing files.
versions := listFileVersions(partsMetadata, errs)
// Get highest file version.
higherVersion := highestInt(versions)
// find online disks and meta data of higherVersion
var mdata *xlMetaV1
onlineDiskCount := 0
errFileNotFoundErr := 0
for index, metadata := range partsMetadata {
if errs[index] == nil {
onlineDiskCount++
if metadata.Stat.Version == higherVersion && mdata == nil {
mdata = &metadata
}
} else if errs[index] == errFileNotFound {
errFileNotFoundErr++
}
}
if errFileNotFoundErr == len(xl.storageDisks) {
return errFileNotFound
} else if mdata == nil || onlineDiskCount < xl.writeQuorum {
// return error if mdata is empty or onlineDiskCount doesn't meet write quorum
return errWriteQuorum
}
// Increment to have next higher version.
higherVersion++
xlMetaV1FilePath := slashpath.Join(path, xlMetaV1File)
deleteMetaData := (onlineDiskCount == len(xl.storageDisks))
// Set higher version to indicate file operation
mdata.Stat.Version = higherVersion
mdata.Stat.Deleted = true
nsMutex.Lock(volume, path)
defer nsMutex.Unlock(volume, path)
errCount := 0
// Update meta data file and remove part file
for index, disk := range xl.storageDisks {
// no need to operate on failed disks
if errs[index] != nil {
errCount++
continue
}
// update meta data about delete operation
var metadataWriter io.WriteCloser
metadataWriter, err = disk.CreateFile(volume, xlMetaV1FilePath)
erasureFilePart := slashpath.Join(path, fmt.Sprintf("file.%d", index))
err := disk.DeleteFile(volume, erasureFilePart)
if err != nil {
log.WithFields(logrus.Fields{
"volume": volume,
"path": path,
}).Errorf("CreateFile failed with %s", err)
}).Errorf("DeleteFile failed with %s", err)
errCount++
// We can safely allow CreateFile errors up to len(xl.storageDisks) - xl.writeQuorum
// We can safely allow DeleteFile errors up to len(xl.storageDisks) - xl.writeQuorum
// otherwise return failure.
if errCount <= len(xl.storageDisks)-xl.writeQuorum {
continue
@ -540,42 +488,8 @@ func (xl XL) DeleteFile(volume, path string) error {
return err
}
err = mdata.Write(metadataWriter)
if err != nil {
log.WithFields(logrus.Fields{
"volume": volume,
"path": path,
"diskIndex": index,
}).Errorf("Writing metadata failed with %s", err)
errCount++
// We can safely allow CreateFile errors up to len(xl.storageDisks) - xl.writeQuorum
// otherwise return failure.
if errCount <= len(xl.storageDisks)-xl.writeQuorum {
continue
}
// Safely close and remove.
if err = safeCloseAndRemove(metadataWriter); err != nil {
return err
}
return err
}
// Safely wrote, now rename to its actual location.
if err = metadataWriter.Close(); err != nil {
log.WithFields(logrus.Fields{
"volume": volume,
"path": path,
"diskIndex": index,
}).Errorf("Metadata commit failed with %s", err)
if err = safeCloseAndRemove(metadataWriter); err != nil {
return err
}
return err
}
erasureFilePart := slashpath.Join(path, fmt.Sprintf("file.%d", index))
err = disk.DeleteFile(volume, erasureFilePart)
xlMetaV1FilePath := slashpath.Join(path, "file.json")
err = disk.DeleteFile(volume, xlMetaV1FilePath)
if err != nil {
log.WithFields(logrus.Fields{
"volume": volume,
@ -584,7 +498,7 @@ func (xl XL) DeleteFile(volume, path string) error {
errCount++
// We can safely allow CreateFile errors up to len(xl.storageDisks) - xl.writeQuorum
// We can safely allow DeleteFile errors up to len(xl.storageDisks) - xl.writeQuorum
// otherwise return failure.
if errCount <= len(xl.storageDisks)-xl.writeQuorum {
continue
@ -594,24 +508,6 @@ func (xl XL) DeleteFile(volume, path string) error {
}
}
// Remove meta data file only if deleteMetaData is true.
if deleteMetaData {
for index, disk := range xl.storageDisks {
// no need to operate on failed disks
if errs[index] != nil {
continue
}
err = disk.DeleteFile(volume, xlMetaV1FilePath)
if err != nil {
// No need to return the error as we updated the meta data file previously
log.WithFields(logrus.Fields{
"volume": volume,
"path": path,
}).Errorf("DeleteFile failed with %s", err)
}
}
}
// Return success.
return nil
}

@ -119,7 +119,6 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
multipartObjSuffix := path.Join(object, partNumToPartFileName(part.PartNumber))
err = xl.storage.RenameFile(minioMetaBucket, multipartPartFile, bucket, multipartObjSuffix)
// TODO: We need a way to roll back if of the renames failed.
if err != nil {
return "", err
}
@ -153,14 +152,35 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
}
// Close the writer.
if err = w.Close(); err != nil {
if err = safeCloseAndRemove(w); err != nil {
return "", err
}
return "", err
}
multipartObjFile := path.Join(object, multipartMetaFile)
err = xl.storage.RenameFile(minioMetaBucket, tempMultipartMetaFile, bucket, multipartObjFile)
if err != nil {
if derr := xl.storage.DeleteFile(minioMetaBucket, tempMultipartMetaFile); derr != nil {
return "", toObjectErr(err, minioMetaBucket, tempMultipartMetaFile)
}
return "", toObjectErr(err, bucket, multipartObjFile)
}
// Attempt a rename of the upload id to temporary location, if
// successful then delete it.
uploadIDPath := path.Join(mpartMetaPrefix, bucket, object, uploadID)
tempUploadIDPath := path.Join(tmpMetaPrefix, bucket, object, uploadID)
if err = xl.storage.RenameFile(minioMetaBucket, uploadIDPath, minioMetaBucket, tempUploadIDPath); err == nil {
if err = xl.storage.DeleteFile(minioMetaBucket, tempUploadIDPath); err != nil {
return "", toObjectErr(err, minioMetaBucket, tempUploadIDPath)
}
return s3MD5, nil
}
// Rename if failed attempt to delete the original file.
err = xl.storage.DeleteFile(minioMetaBucket, uploadIDPath)
if err != nil {
return "", toObjectErr(err, minioMetaBucket, uploadIDPath)
}
// Return md5sum.
return s3MD5, nil
}

Loading…
Cancel
Save