diff --git a/xl-erasure-v1.go b/xl-erasure-v1.go index aecebfa78..c95f8501a 100644 --- a/xl-erasure-v1.go +++ b/xl-erasure-v1.go @@ -114,26 +114,50 @@ func (xl XL) MakeVol(volume string) error { if !isValidVolname(volume) { return errInvalidArgument } - // Collect if all disks report volume exists. - var volumeExistsMap = make(map[int]struct{}) + + // Verify if the volume already exists. + _, errs := xl.getAllVolumeInfo(volume) + + // Count errors other than errVolumeNotFound, bigger than the allowed + // readQuorum, if yes throw an error. + errCount := 0 + for _, err := range errs { + if err != nil && err != errVolumeNotFound { + errCount++ + if errCount > xl.readQuorum { + log.WithFields(logrus.Fields{ + "volume": volume, + }).Errorf("%s", err) + return err + } + } + } + + createVolErr := 0 + volumeExistsErrCnt := 0 // Make a volume entry on all underlying storage disks. - for index, disk := range xl.storageDisks { + for _, disk := range xl.storageDisks { if err := disk.MakeVol(volume); err != nil { log.WithFields(logrus.Fields{ "volume": volume, }).Errorf("MakeVol failed with %s", err) - // We ignore error if errVolumeExists and creating a volume again. + // if volume already exists, count them. if err == errVolumeExists { - volumeExistsMap[index] = struct{}{} + volumeExistsErrCnt++ + // Return err if all disks report volume exists. + if volumeExistsErrCnt == len(xl.storageDisks) { + return errVolumeExists + } continue } - return err + // Update error counter separately. + createVolErr++ + if createVolErr <= len(xl.storageDisks)-xl.writeQuorum { + continue + } + return errWriteQuorum } } - // Return err if all disks report volume exists. - if len(volumeExistsMap) == len(xl.storageDisks) { - return errVolumeExists - } return nil } @@ -144,24 +168,24 @@ func (xl XL) DeleteVol(volume string) error { } // Collect if all disks report volume not found. - var volumeNotFoundMap = make(map[int]struct{}) + var volumeNotFoundErrCnt int // Remove a volume entry on all underlying storage disks. - for index, disk := range xl.storageDisks { + for _, disk := range xl.storageDisks { if err := disk.DeleteVol(volume); err != nil { log.WithFields(logrus.Fields{ "volume": volume, }).Errorf("DeleteVol failed with %s", err) // We ignore error if errVolumeNotFound. if err == errVolumeNotFound { - volumeNotFoundMap[index] = struct{}{} + volumeNotFoundErrCnt++ continue } return err } } // Return err if all disks report volume not found. - if len(volumeNotFoundMap) == len(xl.storageDisks) { + if volumeNotFoundErrCnt == len(xl.storageDisks) { return errVolumeNotFound } return nil @@ -218,48 +242,136 @@ func (xl XL) ListVols() (volsInfo []VolInfo, err error) { return volsInfo, nil } +// getAllVolumeInfo - get bucket volume info from all disks. +// Returns error slice indicating the failed volume stat operations. +func (xl XL) getAllVolumeInfo(volume string) (volsInfo []VolInfo, errs []error) { + errs = make([]error, len(xl.storageDisks)) + volsInfo = make([]VolInfo, len(xl.storageDisks)) + for index, disk := range xl.storageDisks { + volInfo, err := disk.StatVol(volume) + if err != nil { + errs[index] = err + continue + } + volsInfo[index] = volInfo + } + return volsInfo, errs +} + +// listAllVolumeInfo - list all stat volume info from all disks. +// Returns +// - stat volume info for all online disks. +// - boolean to indicate if healing is necessary. +// - error if any. +func (xl XL) listAllVolumeInfo(volume string) ([]VolInfo, bool, error) { + volsInfo, errs := xl.getAllVolumeInfo(volume) + notFoundCount := 0 + for _, err := range errs { + if err == errVolumeNotFound { + notFoundCount++ + // If we have errors with file not found greater than allowed read + // quorum we return err as errFileNotFound. + if notFoundCount > len(xl.storageDisks)-xl.readQuorum { + return nil, false, errVolumeNotFound + } + } + } + + // Calculate online disk count. + onlineDiskCount := 0 + for index := range errs { + if errs[index] == nil { + onlineDiskCount++ + } + } + + var heal bool + // If online disks count is lesser than configured disks, most + // probably we need to heal the file, additionally verify if the + // count is lesser than readQuorum, if not we throw an error. + if onlineDiskCount < len(xl.storageDisks) { + // Online disks lesser than total storage disks, needs to be + // healed. unless we do not have readQuorum. + heal = true + // Verify if online disks count are lesser than readQuorum + // threshold, return an error if yes. + if onlineDiskCount < xl.readQuorum { + log.WithFields(logrus.Fields{ + "volume": volume, + "onlineDiskCount": onlineDiskCount, + "readQuorumCount": xl.readQuorum, + }).Errorf("%s", errReadQuorum) + return nil, false, errReadQuorum + } + } + + // Return success. + return volsInfo, heal, nil +} + +// healVolume - heals any missing volumes. +func (xl XL) healVolume(volume string) error { + // Lists volume info for all online disks. + volsInfo, heal, err := xl.listAllVolumeInfo(volume) + if err != nil { + log.WithFields(logrus.Fields{ + "volume": volume, + }).Errorf("List online disks failed with %s", err) + return err + } + if !heal { + return nil + } + // Create volume if missing on online disks. + for index, volInfo := range volsInfo { + if volInfo.Name != "" { + continue + } + // Volinfo name would be an empty string, create it. + if err = xl.storageDisks[index].MakeVol(volume); err != nil { + if err != nil { + log.WithFields(logrus.Fields{ + "volume": volume, + }).Errorf("MakeVol failed with error %s", err) + } + continue + } + } + return nil +} + // StatVol - get volume stat info. func (xl XL) StatVol(volume string) (volInfo VolInfo, err error) { if !isValidVolname(volume) { return VolInfo{}, errInvalidArgument } - var statVols []VolInfo - volumeNotFoundErrCnt := 0 - for _, disk := range xl.storageDisks { - volInfo, err = disk.StatVol(volume) - if err == nil { - // Collect all the successful attempts to verify quorum - // subsequently. - statVols = append(statVols, volInfo) - } else if err == errVolumeNotFound { - // Count total amount of volume not found errors. - volumeNotFoundErrCnt++ - } else if err != nil { - log.WithFields(logrus.Fields{ - "volume": volume, - }).Errorf("StatVol failed with %s", err) - return VolInfo{}, err - } + volsInfo, heal, err := xl.listAllVolumeInfo(volume) + if err != nil { + log.WithFields(logrus.Fields{ + "volume": volume, + }).Errorf("listOnlineVolsInfo failed with %s", err) + return VolInfo{}, err } - // If volume not found err count is same as total storage disks, we - // really don't have the bucket, report a valid error. - if volumeNotFoundErrCnt == len(xl.storageDisks) { - return VolInfo{}, errVolumeNotFound - } else if len(statVols) < xl.readQuorum { - // If one of the disks have bucket we need to validate if we - // have read quorum, if not fail. - return VolInfo{}, errReadQuorum + if heal { + go func() { + if err = xl.healVolume(volume); err != nil { + log.WithFields(logrus.Fields{ + "volume": volume, + }).Errorf("healVolume failed with %s", err) + return + } + }() } // Loop through all statVols, calculate the actual usage values. var total, free int64 - for _, statVolInfo := range statVols { - free += statVolInfo.Free - total += statVolInfo.Total + for _, volInfo := range volsInfo { + free += volInfo.Free + total += volInfo.Total } - // Filter statVols and update the volInfo. - volInfo = removeDuplicateVols(statVols)[0] + // Filter volsInfo and update the volInfo. + volInfo = removeDuplicateVols(volsInfo)[0] volInfo.Free = free volInfo.Total = total return volInfo, nil