From 989d7af9ac914451da5795b1e148f21babddc0fd Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Fri, 19 Oct 2018 11:00:09 -0700 Subject: [PATCH] Handle corrupted files correctly by fixing quorum (#6670) This PR completes the missing functionality from #6592 --- cmd/xl-v1-object.go | 108 +++++++++++++++++++++++++++++++++++--------- 1 file changed, 86 insertions(+), 22 deletions(-) diff --git a/cmd/xl-v1-object.go b/cmd/xl-v1-object.go index 970fefca5..a6c4bf277 100644 --- a/cmd/xl-v1-object.go +++ b/cmd/xl-v1-object.go @@ -448,38 +448,102 @@ func (xl xlObjects) GetObjectInfo(ctx context.Context, bucket, object string, op return info, nil } -// getObjectInfo - wrapper for reading object metadata and constructs ObjectInfo. -func (xl xlObjects) getObjectInfo(ctx context.Context, bucket, object string) (objInfo ObjectInfo, err error) { - // Read metadata associated with the object from all disks. - metaArr, errs := readAllXLMetadata(ctx, xl.getDisks(), bucket, object) +func (xl xlObjects) isObjectCorrupted(metaArr []xlMetaV1, errs []error) (validMeta xlMetaV1, ok bool) { + // We can consider an object data not reliable + // when xl.json is not found in read quorum disks. + var notFoundXLJSON int + for _, readErr := range errs { + if readErr == errFileNotFound { + notFoundXLJSON++ + } + } - // Get quorum for this object - readQuorum, _, err := objectQuorumFromMeta(ctx, xl, metaArr, errs) - if err != nil { - return objInfo, err + for _, m := range metaArr { + if !m.IsValid() { + continue + } + validMeta = m + break + } + + // Return if the object is indeed corrupted. + return validMeta, len(xl.getDisks())-notFoundXLJSON < validMeta.Erasure.DataBlocks +} + +const xlCorruptedSuffix = ".CORRUPTED" + +// Renames the corrupted object and makes it visible. +func renameCorruptedObject(ctx context.Context, bucket, object string, validMeta xlMetaV1, disks []StorageAPI, errs []error) { + writeQuorum := validMeta.Erasure.DataBlocks + 1 + + // Move all existing objects into corrupted suffix. + rename(ctx, disks, bucket, object, bucket, object+xlCorruptedSuffix, true, writeQuorum, []error{errFileNotFound}) + + tempObj := mustGetUUID() + + // Get all the disks which do not have the file. + var cdisks = make([]StorageAPI, len(disks)) + for i, merr := range errs { + if merr == errFileNotFound { + cdisks[i] = disks[i] + } + } + + for _, disk := range cdisks { + if disk == nil { + continue + } + + // Write empty part file on missing disks. + disk.AppendFile(minioMetaTmpBucket, pathJoin(tempObj, "part.1"), []byte{}) + + // Write algorithm hash for empty part file. + alg := validMeta.Erasure.Checksums[0].Algorithm.New() + alg.Write([]byte{}) + + // Update the checksums and part info. + validMeta.Erasure.Checksums[0] = ChecksumInfo{ + Name: validMeta.Erasure.Checksums[0].Name, + Algorithm: validMeta.Erasure.Checksums[0].Algorithm, + Hash: alg.Sum(nil), + } + validMeta.Parts[0] = objectPartInfo{ + Number: 1, + Name: "part.1", + } + + // Write the `xl.json` with the newly calculated metadata. + writeXLMetadata(ctx, disk, minioMetaTmpBucket, tempObj, validMeta) } - const xlCorruptedSuffix = ".CORRUPTED" + // Finally rename all the parts into their respective locations. + rename(ctx, cdisks, minioMetaTmpBucket, tempObj, bucket, object+xlCorruptedSuffix, true, writeQuorum, []error{errFileNotFound}) +} + +// getObjectInfo - wrapper for reading object metadata and constructs ObjectInfo. +func (xl xlObjects) getObjectInfo(ctx context.Context, bucket, object string) (objInfo ObjectInfo, err error) { + disks := xl.getDisks() + + // Read metadata associated with the object from all disks. + metaArr, errs := readAllXLMetadata(ctx, disks, bucket, object) + var readQuorum int // Having read quorum means we have xl.json in at least N/2 disks. if !strings.HasSuffix(object, xlCorruptedSuffix) { - // We can consider an object data not reliable - // when xl.json is not found in read quorum disks. - var notFoundXLJSON int - for _, readErr := range errs { - if readErr == errFileNotFound { - notFoundXLJSON++ - } - } - // If xl.json is not present in read quorum disks, - // add .CORRUPTED prefix to the current object. - if len(xl.getDisks())-notFoundXLJSON < readQuorum { - writeQuorum := readQuorum + 1 - rename(ctx, xl.getDisks(), bucket, object, bucket, object+xlCorruptedSuffix, true, writeQuorum, []error{errFileNotFound}) + if validMeta, ok := xl.isObjectCorrupted(metaArr, errs); ok { + renameCorruptedObject(ctx, bucket, object, validMeta, disks, errs) // Return err file not found since we renamed now the corrupted object return objInfo, errFileNotFound } + + // Not a corrupted object, attempt to get readquorum properly. + readQuorum, _, err = objectQuorumFromMeta(ctx, xl, metaArr, errs) + if err != nil { + return objInfo, err + } + } else { + // If this is a corrupted object, change read quorum to N/2 disks // so it will be visible to users, so they can delete it. readQuorum = len(xl.getDisks()) / 2