Delete temp object/part when PutObject{,Part} fails (#3004)

master
Krishnan Parthasarathi 8 years ago committed by Harshavardhana
parent 7d50361ca9
commit 6fc81dc162
  1. 12
      cmd/fs-v1-multipart.go
  2. 11
      cmd/fs-v1.go
  3. 8
      cmd/xl-v1-multipart.go
  4. 21
      cmd/xl-v1-object.go

@ -462,11 +462,14 @@ func (fs fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
return "", traceError(IncompleteBody{}) return "", traceError(IncompleteBody{})
} }
// Delete temporary part in case of failure. If
// PutObjectPart succeeds then there would be nothing to
// delete.
defer fs.storage.DeleteFile(minioMetaBucket, tmpPartPath)
newMD5Hex := hex.EncodeToString(md5Writer.Sum(nil)) newMD5Hex := hex.EncodeToString(md5Writer.Sum(nil))
if md5Hex != "" { if md5Hex != "" {
if newMD5Hex != md5Hex { if newMD5Hex != md5Hex {
// MD5 mismatch, delete the temporary object.
fs.storage.DeleteFile(minioMetaBucket, tmpPartPath)
return "", traceError(BadDigest{md5Hex, newMD5Hex}) return "", traceError(BadDigest{md5Hex, newMD5Hex})
} }
} }
@ -474,8 +477,6 @@ func (fs fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
if sha256sum != "" { if sha256sum != "" {
newSHA256sum := hex.EncodeToString(sha256Writer.Sum(nil)) newSHA256sum := hex.EncodeToString(sha256Writer.Sum(nil))
if newSHA256sum != sha256sum { if newSHA256sum != sha256sum {
// SHA256 mismatch, delete the temporary object.
fs.storage.DeleteFile(minioMetaBucket, tmpPartPath)
return "", traceError(SHA256Mismatch{}) return "", traceError(SHA256Mismatch{})
} }
} }
@ -504,9 +505,6 @@ func (fs fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
partPath := path.Join(mpartMetaPrefix, bucket, object, uploadID, partSuffix) partPath := path.Join(mpartMetaPrefix, bucket, object, uploadID, partSuffix)
err = fs.storage.RenameFile(minioMetaBucket, tmpPartPath, minioMetaBucket, partPath) err = fs.storage.RenameFile(minioMetaBucket, tmpPartPath, minioMetaBucket, partPath)
if err != nil { if err != nil {
if dErr := fs.storage.DeleteFile(minioMetaBucket, tmpPartPath); dErr != nil {
return "", toObjectErr(traceError(dErr), minioMetaBucket, tmpPartPath)
}
return "", toObjectErr(traceError(err), minioMetaBucket, partPath) return "", toObjectErr(traceError(err), minioMetaBucket, partPath)
} }
uploadIDPath = path.Join(mpartMetaPrefix, bucket, object, uploadID) uploadIDPath = path.Join(mpartMetaPrefix, bucket, object, uploadID)

@ -384,6 +384,7 @@ func (fs fsObjects) PutObject(bucket string, object string, size int64, data io.
// For size 0 we write a 0byte file. // For size 0 we write a 0byte file.
err = fs.storage.AppendFile(minioMetaBucket, tempObj, []byte("")) err = fs.storage.AppendFile(minioMetaBucket, tempObj, []byte(""))
if err != nil { if err != nil {
fs.storage.DeleteFile(minioMetaBucket, tempObj)
return ObjectInfo{}, toObjectErr(traceError(err), bucket, object) return ObjectInfo{}, toObjectErr(traceError(err), bucket, object)
} }
} else { } else {
@ -397,8 +398,8 @@ func (fs fsObjects) PutObject(bucket string, object string, size int64, data io.
var bytesWritten int64 var bytesWritten int64
bytesWritten, err = fsCreateFile(fs.storage, teeReader, buf, minioMetaBucket, tempObj) bytesWritten, err = fsCreateFile(fs.storage, teeReader, buf, minioMetaBucket, tempObj)
if err != nil { if err != nil {
errorIf(err, "Failed to create object %s/%s", bucket, object)
fs.storage.DeleteFile(minioMetaBucket, tempObj) fs.storage.DeleteFile(minioMetaBucket, tempObj)
errorIf(err, "Failed to create object %s/%s", bucket, object)
return ObjectInfo{}, toObjectErr(err, bucket, object) return ObjectInfo{}, toObjectErr(err, bucket, object)
} }
@ -409,6 +410,10 @@ func (fs fsObjects) PutObject(bucket string, object string, size int64, data io.
return ObjectInfo{}, traceError(IncompleteBody{}) return ObjectInfo{}, traceError(IncompleteBody{})
} }
} }
// Delete the temporary object in the case of a
// failure. If PutObject succeeds, then there would be
// nothing to delete.
defer fs.storage.DeleteFile(minioMetaBucket, tempObj)
newMD5Hex := hex.EncodeToString(md5Writer.Sum(nil)) newMD5Hex := hex.EncodeToString(md5Writer.Sum(nil))
// Update the md5sum if not set with the newly calculated one. // Update the md5sum if not set with the newly calculated one.
@ -420,8 +425,6 @@ func (fs fsObjects) PutObject(bucket string, object string, size int64, data io.
md5Hex := metadata["md5Sum"] md5Hex := metadata["md5Sum"]
if md5Hex != "" { if md5Hex != "" {
if newMD5Hex != md5Hex { if newMD5Hex != md5Hex {
// MD5 mismatch, delete the temporary object.
fs.storage.DeleteFile(minioMetaBucket, tempObj)
// Returns md5 mismatch. // Returns md5 mismatch.
return ObjectInfo{}, traceError(BadDigest{md5Hex, newMD5Hex}) return ObjectInfo{}, traceError(BadDigest{md5Hex, newMD5Hex})
} }
@ -430,8 +433,6 @@ func (fs fsObjects) PutObject(bucket string, object string, size int64, data io.
if sha256sum != "" { if sha256sum != "" {
newSHA256sum := hex.EncodeToString(sha256Writer.Sum(nil)) newSHA256sum := hex.EncodeToString(sha256Writer.Sum(nil))
if newSHA256sum != sha256sum { if newSHA256sum != sha256sum {
// SHA256 mismatch, delete the temporary object.
fs.storage.DeleteFile(minioMetaBucket, tempObj)
return ObjectInfo{}, traceError(SHA256Mismatch{}) return ObjectInfo{}, traceError(SHA256Mismatch{})
} }
} }

@ -413,11 +413,15 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
// Construct a tee reader for md5sum. // Construct a tee reader for md5sum.
teeReader := io.TeeReader(lreader, mw) teeReader := io.TeeReader(lreader, mw)
// Delete the temporary object part. If PutObjectPart succeeds there would be nothing to delete.
defer xl.deleteObject(minioMetaBucket, tmpPartPath)
// Erasure code data and write across all disks. // Erasure code data and write across all disks.
sizeWritten, checkSums, err := erasureCreateFile(onlineDisks, minioMetaBucket, tmpPartPath, teeReader, xlMeta.Erasure.BlockSize, xl.dataBlocks, xl.parityBlocks, bitRotAlgo, xl.writeQuorum) sizeWritten, checkSums, err := erasureCreateFile(onlineDisks, minioMetaBucket, tmpPartPath, teeReader, xlMeta.Erasure.BlockSize, xl.dataBlocks, xl.parityBlocks, bitRotAlgo, xl.writeQuorum)
if err != nil { if err != nil {
return "", toObjectErr(err, bucket, object) return "", toObjectErr(err, bucket, object)
} }
// Should return IncompleteBody{} error when reader has fewer bytes // Should return IncompleteBody{} error when reader has fewer bytes
// than specified in request header. // than specified in request header.
if sizeWritten < size { if sizeWritten < size {
@ -434,8 +438,6 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
newMD5Hex := hex.EncodeToString(md5Writer.Sum(nil)) newMD5Hex := hex.EncodeToString(md5Writer.Sum(nil))
if md5Hex != "" { if md5Hex != "" {
if newMD5Hex != md5Hex { if newMD5Hex != md5Hex {
// MD5 mismatch, delete the temporary object.
xl.deleteObject(minioMetaBucket, tmpPartPath)
// Returns md5 mismatch. // Returns md5 mismatch.
return "", traceError(BadDigest{md5Hex, newMD5Hex}) return "", traceError(BadDigest{md5Hex, newMD5Hex})
} }
@ -444,8 +446,6 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
if sha256sum != "" { if sha256sum != "" {
newSHA256sum := hex.EncodeToString(sha256Writer.Sum(nil)) newSHA256sum := hex.EncodeToString(sha256Writer.Sum(nil))
if newSHA256sum != sha256sum { if newSHA256sum != sha256sum {
// SHA256 mismatch, delete the temporary object.
xl.deleteObject(minioMetaBucket, tmpPartPath)
return "", traceError(SHA256Mismatch{}) return "", traceError(SHA256Mismatch{})
} }
} }

@ -443,18 +443,19 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.
onlineDisks := getOrderedDisks(xlMeta.Erasure.Distribution, xl.storageDisks) onlineDisks := getOrderedDisks(xlMeta.Erasure.Distribution, xl.storageDisks)
// Delete temporary object in the event of failure. If
// PutObject succeeded there would be no temporary object to
// delete.
defer xl.deleteObject(minioMetaTmpBucket, tempObj)
// Erasure code data and write across all disks. // Erasure code data and write across all disks.
sizeWritten, checkSums, err := erasureCreateFile(onlineDisks, minioMetaBucket, tempErasureObj, teeReader, xlMeta.Erasure.BlockSize, xlMeta.Erasure.DataBlocks, xlMeta.Erasure.ParityBlocks, bitRotAlgo, xl.writeQuorum) sizeWritten, checkSums, err := erasureCreateFile(onlineDisks, minioMetaBucket, tempErasureObj, teeReader, xlMeta.Erasure.BlockSize, xlMeta.Erasure.DataBlocks, xlMeta.Erasure.ParityBlocks, bitRotAlgo, xl.writeQuorum)
if err != nil { if err != nil {
// Create file failed, delete temporary object.
xl.deleteObject(minioMetaTmpBucket, tempObj)
return ObjectInfo{}, toObjectErr(err, minioMetaBucket, tempErasureObj) return ObjectInfo{}, toObjectErr(err, minioMetaBucket, tempErasureObj)
} }
// Should return IncompleteBody{} error when reader has fewer bytes // Should return IncompleteBody{} error when reader has fewer bytes
// than specified in request header. // than specified in request header.
if sizeWritten < size { if sizeWritten < size {
// Short write, delete temporary object.
xl.deleteObject(minioMetaTmpBucket, tempObj)
return ObjectInfo{}, traceError(IncompleteBody{}) return ObjectInfo{}, traceError(IncompleteBody{})
} }
@ -486,8 +487,6 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.
md5Hex := metadata["md5Sum"] md5Hex := metadata["md5Sum"]
if md5Hex != "" { if md5Hex != "" {
if newMD5Hex != md5Hex { if newMD5Hex != md5Hex {
// MD5 mismatch, delete the temporary object.
xl.deleteObject(minioMetaTmpBucket, tempObj)
// Returns md5 mismatch. // Returns md5 mismatch.
return ObjectInfo{}, traceError(BadDigest{md5Hex, newMD5Hex}) return ObjectInfo{}, traceError(BadDigest{md5Hex, newMD5Hex})
} }
@ -496,8 +495,6 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.
if sha256sum != "" { if sha256sum != "" {
newSHA256sum := hex.EncodeToString(sha256Writer.Sum(nil)) newSHA256sum := hex.EncodeToString(sha256Writer.Sum(nil))
if newSHA256sum != sha256sum { if newSHA256sum != sha256sum {
// SHA256 mismatch, delete the temporary object.
xl.deleteObject(minioMetaBucket, tempObj)
return ObjectInfo{}, traceError(SHA256Mismatch{}) return ObjectInfo{}, traceError(SHA256Mismatch{})
} }
} }
@ -514,14 +511,15 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.
// Check if an object is present as one of the parent dir. // Check if an object is present as one of the parent dir.
// -- FIXME. (needs a new kind of lock). // -- FIXME. (needs a new kind of lock).
if xl.parentDirIsObject(bucket, path.Dir(object)) { if xl.parentDirIsObject(bucket, path.Dir(object)) {
// Parent (in the namespace) is an object, delete temporary object.
xl.deleteObject(minioMetaTmpBucket, tempObj)
return ObjectInfo{}, toObjectErr(traceError(errFileAccessDenied), bucket, object) return ObjectInfo{}, toObjectErr(traceError(errFileAccessDenied), bucket, object)
} }
// Rename if an object already exists to temporary location. // Rename if an object already exists to temporary location.
newUniqueID := getUUID() newUniqueID := getUUID()
if xl.isObject(bucket, object) { if xl.isObject(bucket, object) {
// Delete the temporary copy of the object that existed before this PutObject request.
defer xl.deleteObject(minioMetaTmpBucket, newUniqueID)
// NOTE: Do not use online disks slice here. // NOTE: Do not use online disks slice here.
// The reason is that existing object should be purged // The reason is that existing object should be purged
// regardless of `xl.json` status and rolled back in case of errors. // regardless of `xl.json` status and rolled back in case of errors.
@ -561,9 +559,6 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.
return ObjectInfo{}, toObjectErr(err, bucket, object) return ObjectInfo{}, toObjectErr(err, bucket, object)
} }
// Delete the temporary object.
xl.deleteObject(minioMetaTmpBucket, newUniqueID)
// Once we have successfully renamed the object, Close the buffer which would // Once we have successfully renamed the object, Close the buffer which would
// save the object on cache. // save the object on cache.
if size > 0 && xl.objCacheEnabled && newBuffer != nil { if size > 0 && xl.objCacheEnabled && newBuffer != nil {

Loading…
Cancel
Save