diff --git a/cmd/xl-v1-healing-common.go b/cmd/xl-v1-healing-common.go
new file mode 100644
index 000000000..70ffe3bd3
--- /dev/null
+++ b/cmd/xl-v1-healing-common.go
@@ -0,0 +1,135 @@
+/*
+ * Minio Cloud Storage, (C) 2016 Minio, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cmd
+
+import "time"
+
+// commonTime returns the maximally occurring time from a list of times.
+func commonTime(modTimes []time.Time) (modTime time.Time) {
+	var maxima int // Counter for remembering max occurrence of elements.
+	timeOccurenceMap := make(map[time.Time]int)
+	// Ignore the time sentinel and count the rest.
+	for _, time := range modTimes {
+		if time == timeSentinel {
+			continue
+		}
+		timeOccurenceMap[time]++
+	}
+	// Find the common cardinality from previously collected
+	// occurrences of elements.
+	for time, count := range timeOccurenceMap {
+		if count > maxima {
+			maxima = count
+			modTime = time
+		}
+	}
+	// Return the collected common modTime.
+	return modTime
+}
+
+// Beginning of unix time is treated as sentinel value here.
+var timeSentinel = time.Unix(0, 0).UTC()
+
+// Boot modTimes up to disk count, setting the value to time sentinel.
+func bootModtimes(diskCount int) []time.Time {
+	modTimes := make([]time.Time, diskCount)
+	// Boots up all the modtimes.
+	for i := range modTimes {
+		modTimes[i] = timeSentinel
+	}
+	return modTimes
+}
+
+// Extracts the list of modTimes from the xlMetaV1 slice, skipping
+// slice elements which have errors. As a special case,
+// errFileNotFound is treated as an initial good condition.
+func listObjectModtimes(partsMetadata []xlMetaV1, errs []error) (modTimes []time.Time) {
+	modTimes = bootModtimes(len(partsMetadata))
+	// Set a new time value, specifically set when
+	// error == errFileNotFound (this is needed when this is a
+	// fresh PutObject).
+	timeNow := time.Now().UTC()
+	for index, metadata := range partsMetadata {
+		if errs[index] == nil {
+			// Once the file is found, save the modTime recorded on disk.
+			modTimes[index] = metadata.Stat.ModTime
+		} else if errs[index] == errFileNotFound {
+			// Once the file is not found then the epoch is current time.
+			modTimes[index] = timeNow
+		}
+	}
+	return modTimes
+}
+
+// Returns the slice of online disks needed.
+// - slice returning readable disks.
+// - modTime of the object.
+func listOnlineDisks(disks []StorageAPI, partsMetadata []xlMetaV1, errs []error) (onlineDisks []StorageAPI, modTime time.Time) {
+	onlineDisks = make([]StorageAPI, len(disks))
+
+	// List all the object modTimes from the parts metadata.
+	modTimes := listObjectModtimes(partsMetadata, errs)
+
+	// Reduce the list of modTimes to a single common value.
+	modTime = commonTime(modTimes)
+
+	// Create a new online disks slice of disks that carry the common modTime.
+	for index, t := range modTimes {
+		if t == modTime {
+			onlineDisks[index] = disks[index]
+		} else {
+			onlineDisks[index] = nil
+		}
+	}
+	return onlineDisks, modTime
+}
+
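For intuition, here is a minimal sketch (not part of the patch; the helper name is hypothetical) of how commonTime ignores the sentinel and picks the modTime shared by the largest number of disks:

```go
package cmd

import (
	"fmt"
	"time"
)

// exampleCommonTime is a hypothetical helper illustrating commonTime above.
func exampleCommonTime() {
	t1 := time.Date(2016, 10, 1, 10, 0, 0, 0, time.UTC)
	t2 := t1.Add(time.Minute)

	// Three disks agree on t1, one disk is missing (sentinel), one disk lags at t2.
	modTimes := []time.Time{t1, t1, t1, timeSentinel, t2}

	fmt.Println(commonTime(modTimes).Equal(t1)) // true: t1 is the maximally occurring time
}
```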
+// Return disks with the outdated or missing object.
+func outDatedDisks(disks []StorageAPI, partsMetadata []xlMetaV1, errs []error) (outDatedDisks []StorageAPI) {
+	outDatedDisks = make([]StorageAPI, len(disks))
+	latestDisks, _ := listOnlineDisks(disks, partsMetadata, errs)
+	for index, disk := range latestDisks {
+		if errorCause(errs[index]) == errFileNotFound {
+			outDatedDisks[index] = disks[index]
+			continue
+		}
+		if errs[index] != nil {
+			continue
+		}
+		if disk == nil {
+			outDatedDisks[index] = disks[index]
+		}
+	}
+	return outDatedDisks
+}
+
+// Returns whether the object should be healed.
+func xlShouldHeal(partsMetadata []xlMetaV1, errs []error) bool {
+	modTime := commonTime(listObjectModtimes(partsMetadata, errs))
+	for index := range partsMetadata {
+		if errs[index] == errDiskNotFound {
+			continue
+		}
+		if errs[index] != nil {
+			return true
+		}
+		if modTime != partsMetadata[index].Stat.ModTime {
+			return true
+		}
+	}
+	return false
+}
diff --git a/cmd/xl-v1-healing_test.go b/cmd/xl-v1-healing-common_test.go
similarity index 100%
rename from cmd/xl-v1-healing_test.go
rename to cmd/xl-v1-healing-common_test.go
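To make the healing trigger concrete, here is a sketch (not part of the patch; the helper name is hypothetical, and it assumes xlMetaV1's Stat.ModTime field is directly settable, as its use above suggests) of when xlShouldHeal reports true:

```go
package cmd

import "time"

// exampleShouldHeal is a hypothetical helper: an object is marked for healing
// as soon as one readable disk carries an xl.json whose modTime disagrees with
// the quorum modTime, or any disk reports an error other than errDiskNotFound.
func exampleShouldHeal() bool {
	now := time.Now().UTC()

	var fresh, stale xlMetaV1
	fresh.Stat.ModTime = now
	stale.Stat.ModTime = now.Add(-time.Hour) // this disk missed the last write

	partsMetadata := []xlMetaV1{fresh, fresh, fresh, stale}
	errs := []error{nil, nil, nil, nil}

	return xlShouldHeal(partsMetadata, errs) // true: the fourth disk is outdated
}
```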
diff --git a/cmd/xl-v1-healing.go b/cmd/xl-v1-healing.go
index 70ffe3bd3..2e6c9c1af 100644
--- a/cmd/xl-v1-healing.go
+++ b/cmd/xl-v1-healing.go
@@ -16,120 +16,211 @@
 
 package cmd
 
-import "time"
-
-// commonTime returns a maximally occurring time from a list of time.
-func commonTime(modTimes []time.Time) (modTime time.Time) {
-	var maxima int // Counter for remembering max occurrence of elements.
-	timeOccurenceMap := make(map[time.Time]int)
-	// Ignore the uuid sentinel and count the rest.
-	for _, time := range modTimes {
-		if time == timeSentinel {
-			continue
-		}
-		timeOccurenceMap[time]++
+import "sync"
+
+// Heals a bucket if it doesn't exist on one of the disks.
+func (xl xlObjects) HealBucket(bucket string) error {
+	// Verify if bucket is valid.
+	if !IsValidBucketName(bucket) {
+		return traceError(BucketNameInvalid{Bucket: bucket})
 	}
-	// Find the common cardinality from previously collected
-	// occurrences of elements.
-	for time, count := range timeOccurenceMap {
-		if count > maxima {
-			maxima = count
-			modTime = time
+
+	// Verify if bucket exists.
+	if !xl.isBucketExist(bucket) {
+		return traceError(BucketNotFound{Bucket: bucket})
+	}
+
+	// Heal bucket - create buckets on disks where it does not exist.
+
+	// get a random ID for lock instrumentation.
+	opsID := getOpsID()
+
+	nsMutex.Lock(bucket, "", opsID)
+	defer nsMutex.Unlock(bucket, "", opsID)
+
+	// Initialize sync waitgroup.
+	var wg = &sync.WaitGroup{}
+
+	// Initialize list of errors.
+	var dErrs = make([]error, len(xl.storageDisks))
+
+	// Make a volume entry on all underlying storage disks.
+	for index, disk := range xl.storageDisks {
+		if disk == nil {
+			dErrs[index] = traceError(errDiskNotFound)
+			continue
 		}
+		wg.Add(1)
+		// Make a volume inside a go-routine.
+		go func(index int, disk StorageAPI) {
+			defer wg.Done()
+			if _, err := disk.StatVol(bucket); err != nil {
+				if err != errVolumeNotFound {
+					dErrs[index] = traceError(err)
+					return
+				}
+				if err = disk.MakeVol(bucket); err != nil {
+					dErrs[index] = traceError(err)
+				}
+			}
+		}(index, disk)
 	}
-	// Return the collected common uuid.
-	return modTime
-}
-
-// Beginning of unix time is treated as sentinel value here.
-var timeSentinel = time.Unix(0, 0).UTC()
+
+	// Wait for all make vol to finish.
+	wg.Wait()
 
-// Boot modTimes up to disk count, setting the value to time sentinel.
-func bootModtimes(diskCount int) []time.Time {
-	modTimes := make([]time.Time, diskCount)
-	// Boots up all the modtimes.
-	for i := range modTimes {
-		modTimes[i] = timeSentinel
+	// Do we have write quorum?.
+	if !isDiskQuorum(dErrs, xl.writeQuorum) {
+		// Purge successfully created buckets if we don't have writeQuorum.
+		xl.undoMakeBucket(bucket)
+		return toObjectErr(traceError(errXLWriteQuorum), bucket)
 	}
-	return modTimes
-}
-
-// Extracts list of times from xlMetaV1 slice and returns, skips
-// slice elements which have errors. As a special error
-// errFileNotFound is treated as a initial good condition.
-func listObjectModtimes(partsMetadata []xlMetaV1, errs []error) (modTimes []time.Time) {
-	modTimes = bootModtimes(len(partsMetadata))
-	// Set a new time value, specifically set when
-	// error == errFileNotFound (this is needed when this is a
-	// fresh PutObject).
-	timeNow := time.Now().UTC()
-	for index, metadata := range partsMetadata {
-		if errs[index] == nil {
-			// Once the file is found, save the uuid saved on disk.
-			modTimes[index] = metadata.Stat.ModTime
-		} else if errs[index] == errFileNotFound {
-			// Once the file is not found then the epoch is current time.
-			modTimes[index] = timeNow
-		}
+
+	// Verify we have any other errors which should be returned as failure.
+	if reducedErr := reduceErrs(dErrs, []error{
+		errDiskNotFound,
+		errFaultyDisk,
+		errDiskAccessDenied,
+	}); reducedErr != nil {
+		return toObjectErr(reducedErr, bucket)
 	}
-	return modTimes
+	return nil
 }
 
-// Returns slice of online disks needed.
-// - slice returing readable disks.
-// - modTime of the Object
-func listOnlineDisks(disks []StorageAPI, partsMetadata []xlMetaV1, errs []error) (onlineDisks []StorageAPI, modTime time.Time) {
-	onlineDisks = make([]StorageAPI, len(disks))
+// HealObject heals a given object for all its missing entries.
+// FIXME: If an object was deleted and one disk was down, and later the disk comes back
+// up again, heal on the object should delete it.
+func (xl xlObjects) HealObject(bucket, object string) error {
+	// Verify if bucket is valid.
+	if !IsValidBucketName(bucket) {
+		return traceError(BucketNameInvalid{Bucket: bucket})
+	}
 
-	// List all the file commit ids from parts metadata.
-	modTimes := listObjectModtimes(partsMetadata, errs)
+	// Verify if object is valid.
+	if !IsValidObjectName(object) {
+		return traceError(ObjectNameInvalid{Bucket: bucket, Object: object})
+	}
 
-	// Reduce list of UUIDs to a single common value.
-	modTime = commonTime(modTimes)
+	// get a random ID for lock instrumentation.
+	opsID := getOpsID()
 
-	// Create a new online disks slice, which have common uuid.
-	for index, t := range modTimes {
-		if t == modTime {
-			onlineDisks[index] = disks[index]
-		} else {
-			onlineDisks[index] = nil
-		}
+	// Lock the object before healing.
+	nsMutex.RLock(bucket, object, opsID)
+	defer nsMutex.RUnlock(bucket, object, opsID)
+
+	partsMetadata, errs := readAllXLMetadata(xl.storageDisks, bucket, object)
+	if err := reduceErrs(errs, nil); err != nil {
+		return toObjectErr(err, bucket, object)
 	}
-	return onlineDisks, modTime
-}
-
-// Return disks with the outdated or missing object.
-func outDatedDisks(disks []StorageAPI, partsMetadata []xlMetaV1, errs []error) (outDatedDisks []StorageAPI) {
-	outDatedDisks = make([]StorageAPI, len(disks))
-	latestDisks, _ := listOnlineDisks(disks, partsMetadata, errs)
-	for index, disk := range latestDisks {
-		if errorCause(errs[index]) == errFileNotFound {
-			outDatedDisks[index] = disks[index]
+	if !xlShouldHeal(partsMetadata, errs) {
+		// There is nothing to heal.
+		return nil
+	}
+
+	// List of disks having latest version of the object.
+	latestDisks, modTime := listOnlineDisks(xl.storageDisks, partsMetadata, errs)
+	// List of disks having outdated version of the object or missing object.
+	outDatedDisks := outDatedDisks(xl.storageDisks, partsMetadata, errs)
+	// Latest xlMetaV1 for reference.
+	latestMeta := pickValidXLMeta(partsMetadata, modTime)
+
+	for index, disk := range outDatedDisks {
+		// Before healing outdated disks, we need to remove xl.json
+		// and part files from "bucket/object/" so that
+		// rename(".minio.sys", "tmp/tmpuuid/", "bucket", "object/") succeeds.
+		if disk == nil {
+			// Not an outdated disk.
+			continue
 		}
+		if errs[index] != nil {
+			// If there was an error (most likely errFileNotFound)
+			continue
 		}
+		// Outdated object with the same name exists that needs to be deleted.
+		outDatedMeta := partsMetadata[index]
+		// Delete all the parts.
+		for partIndex := 0; partIndex < len(outDatedMeta.Parts); partIndex++ {
+			err := disk.DeleteFile(bucket, pathJoin(object, outDatedMeta.Parts[partIndex].Name))
+			if err != nil {
+				return traceError(err)
+			}
+		}
+		// Delete xl.json file.
+		err := disk.DeleteFile(bucket, pathJoin(object, xlMetaJSONFile))
+		if err != nil {
+			return traceError(err)
+		}
+	}
+
+	// Reorder so that we have data disks first and parity disks next.
+	latestDisks = getOrderedDisks(latestMeta.Erasure.Distribution, latestDisks)
+	outDatedDisks = getOrderedDisks(latestMeta.Erasure.Distribution, outDatedDisks)
+	partsMetadata = getOrderedPartsMetadata(latestMeta.Erasure.Distribution, partsMetadata)
+
+	// We write at a temporary location and then rename to the final location.
+	tmpID := getUUID()
+
+	// Checksum of the part files. checkSumInfos[index] will contain checksums
+	// of all the part files in the outDatedDisks[index]
+	checkSumInfos := make([][]checkSumInfo, len(outDatedDisks))
+
+	// Heal each part. erasureHealFile() will write the healed part to
+	// .minio/tmp/uuid/ which needs to be renamed later to the final location.
+	for partIndex := 0; partIndex < len(latestMeta.Parts); partIndex++ {
+		partName := latestMeta.Parts[partIndex].Name
+		partSize := latestMeta.Parts[partIndex].Size
+		erasure := latestMeta.Erasure
+		sumInfo, err := latestMeta.Erasure.GetCheckSumInfo(partName)
+		if err != nil {
+			return err
+		}
+		// Heal the part file.
+		checkSums, err := erasureHealFile(latestDisks, outDatedDisks,
+			bucket, pathJoin(object, partName),
+			minioMetaBucket, pathJoin(tmpMetaPrefix, tmpID, partName),
+			partSize, erasure.BlockSize, erasure.DataBlocks, erasure.ParityBlocks, sumInfo.Algorithm)
+		if err != nil {
+			return err
+		}
+		for index, sum := range checkSums {
+			if outDatedDisks[index] == nil {
+				continue
+			}
+			checkSumInfos[index] = append(checkSumInfos[index], checkSumInfo{partName, sumInfo.Algorithm, sum})
+		}
+	}
+
+	// xl.json should be written to all the healed disks.
+	for index, disk := range outDatedDisks {
 		if disk == nil {
-			outDatedDisks[index] = disks[index]
+			continue
 		}
+		partsMetadata[index] = latestMeta
+		partsMetadata[index].Erasure.Checksum = checkSumInfos[index]
+	}
+
+	// Generate and write `xl.json` generated from other disks.
+	err := writeUniqueXLMetadata(outDatedDisks, minioMetaBucket, pathJoin(tmpMetaPrefix, tmpID), partsMetadata, diskCount(outDatedDisks))
+	if err != nil {
+		return toObjectErr(err, bucket, object)
 	}
-	return outDatedDisks
-}
-
-// Returns if the object should be healed.
-func xlShouldHeal(partsMetadata []xlMetaV1, errs []error) bool {
-	modTime := commonTime(listObjectModtimes(partsMetadata, errs))
-	for index := range partsMetadata {
-		if errs[index] == errDiskNotFound {
+	// Rename from tmp location to the actual location.
+	for _, disk := range outDatedDisks {
+		if disk == nil {
 			continue
 		}
-		if errs[index] != nil {
-			return true
+		// Remove any lingering partial data from current namespace.
+		err = disk.DeleteFile(bucket, retainSlash(object))
+		if err != nil && err != errFileNotFound {
+			return traceError(err)
 		}
-		if modTime != partsMetadata[index].Stat.ModTime {
-			return true
+		// Attempt a rename now from healed data to final location.
+		err = disk.RenameFile(minioMetaBucket, retainSlash(pathJoin(tmpMetaPrefix, tmpID)), bucket, retainSlash(object))
+		if err != nil {
+			return traceError(err)
 		}
 	}
-	return false
+	return nil
 }
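With the xl-v1-healing.go changes complete, a minimal caller-side sketch (not part of the patch; the wrapper name is hypothetical) shows how the two entry points added above fit together:

```go
package cmd

// healOne is a hypothetical convenience wrapper: ensure the bucket exists on
// all disks first, then heal a single object inside it.
func healOne(xl xlObjects, bucket, object string) error {
	// Re-create the bucket on any disk where it is missing.
	if err := xl.HealBucket(bucket); err != nil {
		return err
	}
	// Rebuild missing or outdated parts and xl.json for the object.
	return xl.HealObject(bucket, object)
}
```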
diff --git a/cmd/xl-v1-object.go b/cmd/xl-v1-object.go
index 61ac04041..2f7c7353c 100644
--- a/cmd/xl-v1-object.go
+++ b/cmd/xl-v1-object.go
@@ -218,207 +218,6 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length i
 	return nil
 }
 
-func (xl xlObjects) HealBucket(bucket string) error {
-	// Verify if bucket is valid.
-	if !IsValidBucketName(bucket) {
-		return traceError(BucketNameInvalid{Bucket: bucket})
-	}
-
-	// Heal bucket - create buckets on disks where it does not exist.
-
-	// get a random ID for lock instrumentation.
-	opsID := getOpsID()
-
-	nsMutex.Lock(bucket, "", opsID)
-	defer nsMutex.Unlock(bucket, "", opsID)
-
-	// Initialize sync waitgroup.
-	var wg = &sync.WaitGroup{}
-
-	// Initialize list of errors.
-	var dErrs = make([]error, len(xl.storageDisks))
-
-	// Make a volume entry on all underlying storage disks.
-	for index, disk := range xl.storageDisks {
-		if disk == nil {
-			dErrs[index] = traceError(errDiskNotFound)
-			continue
-		}
-		wg.Add(1)
-		// Make a volume inside a go-routine.
-		go func(index int, disk StorageAPI) {
-			defer wg.Done()
-			if _, err := disk.StatVol(bucket); err != nil {
-				if err != errVolumeNotFound {
-					dErrs[index] = traceError(err)
-					return
-				}
-				if err = disk.MakeVol(bucket); err != nil {
-					dErrs[index] = traceError(err)
-				}
-			}
-		}(index, disk)
-	}
-
-	// Wait for all make vol to finish.
-	wg.Wait()
-
-	// Do we have write quorum?.
-	if !isDiskQuorum(dErrs, xl.writeQuorum) {
-		// Purge successfully created buckets if we don't have writeQuorum.
-		xl.undoMakeBucket(bucket)
-		return toObjectErr(traceError(errXLWriteQuorum), bucket)
-	}
-
-	// Verify we have any other errors which should be returned as failure.
-	if reducedErr := reduceErrs(dErrs, []error{
-		errDiskNotFound,
-		errFaultyDisk,
-		errDiskAccessDenied,
-	}); reducedErr != nil {
-		return toObjectErr(reducedErr, bucket)
-	}
-	return nil
-}
-
-// HealObject heals a given object for all its missing entries.
-// FIXME: If an object object was deleted and one disk was down, and later the disk comes back
-// up again, heal on the object should delete it.
-func (xl xlObjects) HealObject(bucket, object string) error {
-	// Verify if bucket is valid.
-	if !IsValidBucketName(bucket) {
-		return traceError(BucketNameInvalid{Bucket: bucket})
-	}
-
-	// Verify if object is valid.
-	if !IsValidObjectName(object) {
-		return traceError(ObjectNameInvalid{Bucket: bucket, Object: object})
-	}
-
-	// get a random ID for lock instrumentation.
-	opsID := getOpsID()
-
-	// Lock the object before healing.
-	nsMutex.RLock(bucket, object, opsID)
-	defer nsMutex.RUnlock(bucket, object, opsID)
-
-	partsMetadata, errs := readAllXLMetadata(xl.storageDisks, bucket, object)
-	if err := reduceErrs(errs, nil); err != nil {
-		return toObjectErr(err, bucket, object)
-	}
-
-	if !xlShouldHeal(partsMetadata, errs) {
-		// There is nothing to heal.
-		return nil
-	}
-
-	// List of disks having latest version of the object.
-	latestDisks, modTime := listOnlineDisks(xl.storageDisks, partsMetadata, errs)
-	// List of disks having outdated version of the object or missing object.
-	outDatedDisks := outDatedDisks(xl.storageDisks, partsMetadata, errs)
-	// Latest xlMetaV1 for reference.
-	latestMeta := pickValidXLMeta(partsMetadata, modTime)
-
-	for index, disk := range outDatedDisks {
-		// Before healing outdated disks, we need to remove xl.json
-		// and part files from "bucket/object/" so that
-		// rename(".minio.sys", "tmp/tmpuuid/", "bucket", "object/") succeeds.
-		if disk == nil {
-			// Not an outdated disk.
-			continue
-		}
-		if errs[index] != nil {
-			// If there was an error (most likely errFileNotFound)
-			continue
-		}
-		// Outdated object with the same name exists that needs to be deleted.
-		outDatedMeta := partsMetadata[index]
-		// Delete all the parts.
-		for partIndex := 0; partIndex < len(outDatedMeta.Parts); partIndex++ {
-			err := disk.DeleteFile(bucket, pathJoin(object, outDatedMeta.Parts[partIndex].Name))
-			if err != nil {
-				return traceError(err)
-			}
-		}
-		// Delete xl.json file.
-		err := disk.DeleteFile(bucket, pathJoin(object, xlMetaJSONFile))
-		if err != nil {
-			return traceError(err)
-		}
-	}
-
-	// Reorder so that we have data disks first and parity disks next.
-	latestDisks = getOrderedDisks(latestMeta.Erasure.Distribution, latestDisks)
-	outDatedDisks = getOrderedDisks(latestMeta.Erasure.Distribution, outDatedDisks)
-	partsMetadata = getOrderedPartsMetadata(latestMeta.Erasure.Distribution, partsMetadata)
-
-	// We write at temporary location and then rename to fianal location.
-	tmpID := getUUID()
-
-	// Checksum of the part files. checkSumInfos[index] will contain checksums
-	// of all the part files in the outDatedDisks[index]
-	checkSumInfos := make([][]checkSumInfo, len(outDatedDisks))
-
-	// Heal each part. erasureHealFile() will write the healed part to
-	// .minio/tmp/uuid/ which needs to be renamed later to the final location.
-	for partIndex := 0; partIndex < len(latestMeta.Parts); partIndex++ {
-		partName := latestMeta.Parts[partIndex].Name
-		partSize := latestMeta.Parts[partIndex].Size
-		erasure := latestMeta.Erasure
-		sumInfo, err := latestMeta.Erasure.GetCheckSumInfo(partName)
-		if err != nil {
-			return err
-		}
-		// Heal the part file.
-		checkSums, err := erasureHealFile(latestDisks, outDatedDisks,
-			bucket, pathJoin(object, partName),
-			minioMetaBucket, pathJoin(tmpMetaPrefix, tmpID, partName),
-			partSize, erasure.BlockSize, erasure.DataBlocks, erasure.ParityBlocks, sumInfo.Algorithm)
-		if err != nil {
-			return err
-		}
-		for index, sum := range checkSums {
-			if outDatedDisks[index] == nil {
-				continue
-			}
-			checkSumInfos[index] = append(checkSumInfos[index], checkSumInfo{partName, sumInfo.Algorithm, sum})
-		}
-	}
-
-	// xl.json should be written to all the healed disks.
-	for index, disk := range outDatedDisks {
-		if disk == nil {
-			continue
-		}
-		partsMetadata[index] = latestMeta
-		partsMetadata[index].Erasure.Checksum = checkSumInfos[index]
-	}
-
-	// Generate and write `xl.json` generated from other disks.
-	err := writeUniqueXLMetadata(outDatedDisks, minioMetaBucket, pathJoin(tmpMetaPrefix, tmpID), partsMetadata, diskCount(outDatedDisks))
-	if err != nil {
-		return toObjectErr(err, bucket, object)
-	}
-
-	// Rename from tmp location to the actual location.
-	for _, disk := range outDatedDisks {
-		if disk == nil {
-			continue
-		}
-		// Remove any lingering partial data from current namespace.
-		err = disk.DeleteFile(bucket, retainSlash(object))
-		if err != nil && err != errFileNotFound {
-			return traceError(err)
-		}
-		// Attempt a rename now from healed data to final location.
-		err = disk.RenameFile(minioMetaBucket, retainSlash(pathJoin(tmpMetaPrefix, tmpID)), bucket, retainSlash(object))
-		if err != nil {
-			return traceError(err)
-		}
-	}
-	return nil
-}
-
 // GetObjectInfo - reads object metadata and replies back ObjectInfo.
 func (xl xlObjects) GetObjectInfo(bucket, object string) (ObjectInfo, error) {
 	// Verify if bucket is valid.
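Finally, a small sketch (not part of the patch; the helper name is hypothetical) of the error-wrapping convention the healing code relies on. It assumes, as the errorCause check in outDatedDisks suggests, that errorCause returns the error originally passed to traceError:

```go
package cmd

import "fmt"

// exampleErrorCause is a hypothetical helper: disk errors are wrapped with
// traceError when recorded, so callers compare against sentinel errors such as
// errFileNotFound only after unwrapping with errorCause.
func exampleErrorCause() {
	err := traceError(errFileNotFound)              // wrapped when the error is recorded
	fmt.Println(errorCause(err) == errFileNotFound) // true: unwrap before comparing
}
```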