From 62b1da3e2ce5a569c83096fd579fb4dcf9142660 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Wed, 10 Jun 2020 17:10:31 -0700 Subject: [PATCH] fix offline disk calculation (#9801) Current code was relying on globalEndpoints as the source of secondary truth to obtain the missing endpoints list when the disk is offline, this is problematic - there is no way to know if the getDisks() returned endpoints total is same as the ones list of globalEndpoints and it belongs to a particular set. - there is no order guarantee as getDisks() is ordered as per format.json, globalEndpoints may not be, so potentially end up including incorrect endpoints. To fix this bring getEndpoints() just like getDisks() to ensure that consistently ordered endpoints are always available for us to ensure that returned values are consistent with what each erasure set would observe. --- cmd/xl-sets.go | 81 +++++++++++++++++--------------------------- cmd/xl-v1-healing.go | 76 +++++++++++++++++++---------------------- cmd/xl-v1.go | 58 ++++++++++++------------------- 3 files changed, 87 insertions(+), 128 deletions(-) diff --git a/cmd/xl-sets.go b/cmd/xl-sets.go index a08127057..eb1afa348 100644 --- a/cmd/xl-sets.go +++ b/cmd/xl-sets.go @@ -189,8 +189,8 @@ func findDiskIndex(refFormat, format *formatXLV3) (int, int, error) { func (s *xlSets) connectDisks() { var wg sync.WaitGroup diskMap := s.getDiskMap() - for i, endpoint := range s.endpoints { - if isEndpointConnected(diskMap, s.endpointStrings[i]) { + for _, endpoint := range s.endpoints { + if isEndpointConnected(diskMap, endpoint.String()) { continue } wg.Add(1) @@ -214,6 +214,7 @@ func (s *xlSets) connectDisks() { s.xlDisks[setIndex][diskIndex].Close() } s.xlDisks[setIndex][diskIndex] = disk + s.endpointStrings[setIndex*s.drivesPerSet+diskIndex] = disk.String() s.xlDisksMu.Unlock() go func(setIndex int) { // Send a new disk connect event with a timeout @@ -251,6 +252,19 @@ func (s *xlSets) GetLockers(setIndex int) func() []dsync.NetLocker { } } +func (s *xlSets) GetEndpoints(setIndex int) func() []string { + return func() []string { + s.xlDisksMu.RLock() + defer s.xlDisksMu.RUnlock() + + eps := make([]string, s.drivesPerSet) + for i := 0; i < s.drivesPerSet; i++ { + eps[i] = s.endpointStrings[setIndex*s.drivesPerSet+i] + } + return eps + } +} + // GetDisks returns a closure for a given set, which provides list of disks per set. func (s *xlSets) GetDisks(setIndex int) func() []StorageAPI { return func() []StorageAPI { @@ -266,28 +280,20 @@ const defaultMonitorConnectEndpointInterval = time.Second * 10 // Set to 10 secs // Initialize new set of erasure coded sets. func newXLSets(ctx context.Context, endpoints Endpoints, storageDisks []StorageAPI, format *formatXLV3) (*xlSets, error) { - endpointStrings := make([]string, len(endpoints)) - for i, endpoint := range endpoints { - if endpoint.IsLocal { - endpointStrings[i] = endpoint.Path - } else { - endpointStrings[i] = endpoint.String() - } - } - setCount := len(format.XL.Sets) drivesPerSet := len(format.XL.Sets[0]) + endpointStrings := make([]string, len(endpoints)) // Initialize the XL sets instance. s := &xlSets{ sets: make([]*xlObjects, setCount), xlDisks: make([][]StorageAPI, setCount), xlLockers: make([][]dsync.NetLocker, setCount), - endpoints: endpoints, - endpointStrings: endpointStrings, setCount: setCount, drivesPerSet: drivesPerSet, format: format, + endpoints: endpoints, + endpointStrings: endpointStrings, disksConnectEvent: make(chan diskConnectInfo), disksConnectDoneCh: make(chan struct{}), distributionAlgo: format.XL.DistributionAlgo, @@ -309,8 +315,9 @@ func newXLSets(ctx context.Context, endpoints Endpoints, storageDisks []StorageA for i := 0; i < setCount; i++ { for j := 0; j < drivesPerSet; j++ { + endpoint := endpoints[i*drivesPerSet+j] // Rely on endpoints list to initialize, init lockers and available disks. - s.xlLockers[i][j] = newLockAPI(s.endpoints[i*drivesPerSet+j]) + s.xlLockers[i][j] = newLockAPI(endpoint) disk := storageDisks[i*drivesPerSet+j] if disk == nil { @@ -326,16 +333,18 @@ func newXLSets(ctx context.Context, endpoints Endpoints, storageDisks []StorageA disk.Close() continue } + s.endpointStrings[m*drivesPerSet+n] = disk.String() s.xlDisks[m][n] = disk } // Initialize xl objects for a given set. s.sets[i] = &xlObjects{ - getDisks: s.GetDisks(i), - getLockers: s.GetLockers(i), - nsMutex: mutex, - bp: bp, - mrfUploadCh: make(chan partialUpload, 10000), + getDisks: s.GetDisks(i), + getLockers: s.GetLockers(i), + getEndpoints: s.GetEndpoints(i), + nsMutex: mutex, + bp: bp, + mrfUploadCh: make(chan partialUpload, 10000), } go s.sets[i].cleanupStaleMultipartUploads(ctx, @@ -457,11 +466,12 @@ func (s *xlSets) StorageInfo(ctx context.Context, local bool) (StorageInfo, []er for i, set := range s.sets { storageDisks := set.getDisks() + endpointStrings := set.getEndpoints() for j, storageErr := range storageInfoErrs[i] { if storageDisks[j] == OfflineDisk { storageInfo.Backend.Sets[i][j] = madmin.DriveInfo{ State: madmin.DriveStateOffline, - Endpoint: s.endpointStrings[i*s.drivesPerSet+j], + Endpoint: endpointStrings[j], } continue } @@ -1366,6 +1376,7 @@ func (s *xlSets) ReloadFormat(ctx context.Context, dryRun bool) (err error) { s.xlDisks[m][n].Close() } + s.endpointStrings[m*s.drivesPerSet+n] = disk.String() s.xlDisks[m][n] = disk } s.xlDisksMu.Unlock() @@ -1578,6 +1589,7 @@ func (s *xlSets) HealFormat(ctx context.Context, dryRun bool) (res madmin.HealRe s.xlDisks[m][n].Close() } + s.endpointStrings[m*s.drivesPerSet+n] = disk.String() s.xlDisks[m][n] = disk } s.xlDisksMu.Unlock() @@ -1609,35 +1621,6 @@ func (s *xlSets) HealBucket(ctx context.Context, bucket string, dryRun, remove b result.After.Drives = append(result.After.Drives, healResult.After.Drives...) } - for i := range s.endpoints { - var foundBefore bool - for _, v := range result.Before.Drives { - if s.endpointStrings[i] == v.Endpoint { - foundBefore = true - } - } - if !foundBefore { - result.Before.Drives = append(result.Before.Drives, madmin.HealDriveInfo{ - UUID: "", - Endpoint: s.endpointStrings[i], - State: madmin.DriveStateOffline, - }) - } - var foundAfter bool - for _, v := range result.After.Drives { - if s.endpointStrings[i] == v.Endpoint { - foundAfter = true - } - } - if !foundAfter { - result.After.Drives = append(result.After.Drives, madmin.HealDriveInfo{ - UUID: "", - Endpoint: s.endpointStrings[i], - State: madmin.DriveStateOffline, - }) - } - } - // Check if we had quorum to write, if not return an appropriate error. _, afterDriveOnline := result.GetOnlineCounts() if afterDriveOnline < ((s.setCount*s.drivesPerSet)/2)+1 { diff --git a/cmd/xl-v1-healing.go b/cmd/xl-v1-healing.go index ef3fb1373..20d9c57a3 100644 --- a/cmd/xl-v1-healing.go +++ b/cmd/xl-v1-healing.go @@ -47,16 +47,17 @@ func (xl xlObjects) HealBucket(ctx context.Context, bucket string, dryRun, remov } storageDisks := xl.getDisks() + storageEndpoints := xl.getEndpoints() // get write quorum for an object writeQuorum := getWriteQuorum(len(storageDisks)) // Heal bucket. - return healBucket(ctx, storageDisks, bucket, writeQuorum, dryRun) + return healBucket(ctx, storageDisks, storageEndpoints, bucket, writeQuorum, dryRun) } // Heal bucket - create buckets on disks where it does not exist. -func healBucket(ctx context.Context, storageDisks []StorageAPI, bucket string, writeQuorum int, +func healBucket(ctx context.Context, storageDisks []StorageAPI, storageEndpoints []string, bucket string, writeQuorum int, dryRun bool) (res madmin.HealResultItem, err error) { // Initialize sync waitgroup. @@ -118,14 +119,11 @@ func healBucket(ctx context.Context, storageDisks []StorageAPI, bucket string, w } for i := range beforeState { - if storageDisks[i] != nil { - drive := storageDisks[i].String() - res.Before.Drives = append(res.Before.Drives, madmin.HealDriveInfo{ - UUID: "", - Endpoint: drive, - State: beforeState[i], - }) - } + res.Before.Drives = append(res.Before.Drives, madmin.HealDriveInfo{ + UUID: "", + Endpoint: storageEndpoints[i], + State: beforeState[i], + }) } // Initialize sync waitgroup. @@ -154,14 +152,11 @@ func healBucket(ctx context.Context, storageDisks []StorageAPI, bucket string, w } for i := range afterState { - if storageDisks[i] != nil { - drive := storageDisks[i].String() - res.After.Drives = append(res.After.Drives, madmin.HealDriveInfo{ - UUID: "", - Endpoint: drive, - State: afterState[i], - }) - } + res.After.Drives = append(res.After.Drives, madmin.HealDriveInfo{ + UUID: "", + Endpoint: storageEndpoints[i], + State: afterState[i], + }) } return res, nil @@ -231,6 +226,7 @@ func (xl xlObjects) healObject(ctx context.Context, bucket string, object string dataBlocks := latestXLMeta.Erasure.DataBlocks storageDisks := xl.getDisks() + storageEndpoints := xl.getEndpoints() // List of disks having latest version of the object xl.json // (by modtime). @@ -281,33 +277,29 @@ func (xl xlObjects) healObject(ctx context.Context, bucket string, object string driveState = madmin.DriveStateCorrupt } - var drive string - if storageDisks[i] != nil { - drive = storageDisks[i].String() - } if shouldHealObjectOnDisk(errs[i], dataErrs[i], partsMetadata[i], modTime) { outDatedDisks[i] = storageDisks[i] disksToHealCount++ result.Before.Drives = append(result.Before.Drives, madmin.HealDriveInfo{ UUID: "", - Endpoint: drive, + Endpoint: storageEndpoints[i], State: driveState, }) result.After.Drives = append(result.After.Drives, madmin.HealDriveInfo{ UUID: "", - Endpoint: drive, + Endpoint: storageEndpoints[i], State: driveState, }) continue } result.Before.Drives = append(result.Before.Drives, madmin.HealDriveInfo{ UUID: "", - Endpoint: drive, + Endpoint: storageEndpoints[i], State: driveState, }) result.After.Drives = append(result.After.Drives, madmin.HealDriveInfo{ UUID: "", - Endpoint: drive, + Endpoint: storageEndpoints[i], State: driveState, }) } @@ -324,7 +316,7 @@ func (xl xlObjects) healObject(ctx context.Context, bucket string, object string if !dryRun && remove { err = xl.deleteObject(ctx, bucket, object, writeQuorum, false) } - return defaultHealResult(latestXLMeta, storageDisks, errs, bucket, object), err + return defaultHealResult(latestXLMeta, storageDisks, storageEndpoints, errs, bucket, object), err } return result, toObjectErr(errXLReadQuorum, bucket, object) } @@ -490,6 +482,7 @@ func (xl xlObjects) healObject(ctx context.Context, bucket string, object string // is needed since we do not have a special backend format for directories. func (xl xlObjects) healObjectDir(ctx context.Context, bucket, object string, dryRun bool, remove bool) (hr madmin.HealResultItem, err error) { storageDisks := xl.getDisks() + storageEndpoints := xl.getEndpoints() // Initialize heal result object hr = madmin.HealResultItem{ @@ -515,10 +508,7 @@ func (xl xlObjects) healObjectDir(ctx context.Context, bucket, object string, dr // Prepare object creation in all disks for i, err := range errs { - var drive string - if storageDisks[i] != nil { - drive = storageDisks[i].String() - } + drive := storageEndpoints[i] switch err { case nil: hr.Before.Drives[i] = madmin.HealDriveInfo{Endpoint: drive, State: madmin.DriveStateOk} @@ -558,7 +548,7 @@ func (xl xlObjects) healObjectDir(ctx context.Context, bucket, object string, dr // Populates default heal result item entries with possible values when we are returning prematurely. // This is to ensure that in any circumstance we are not returning empty arrays with wrong values. -func defaultHealResult(latestXLMeta xlMetaV1, storageDisks []StorageAPI, errs []error, bucket, object string) madmin.HealResultItem { +func defaultHealResult(latestXLMeta xlMetaV1, storageDisks []StorageAPI, storageEndpoints []string, errs []error, bucket, object string) madmin.HealResultItem { // Initialize heal result object result := madmin.HealResultItem{ Type: madmin.HealItemObject, @@ -577,16 +567,17 @@ func defaultHealResult(latestXLMeta xlMetaV1, storageDisks []StorageAPI, errs [] for index, disk := range storageDisks { if disk == nil { result.Before.Drives = append(result.Before.Drives, madmin.HealDriveInfo{ - UUID: "", - State: madmin.DriveStateOffline, + UUID: "", + Endpoint: storageEndpoints[index], + State: madmin.DriveStateOffline, }) result.After.Drives = append(result.After.Drives, madmin.HealDriveInfo{ - UUID: "", - State: madmin.DriveStateOffline, + UUID: "", + Endpoint: storageEndpoints[index], + State: madmin.DriveStateOffline, }) continue } - drive := disk.String() driveState := madmin.DriveStateCorrupt switch errs[index] { case errFileNotFound, errVolumeNotFound: @@ -594,12 +585,12 @@ func defaultHealResult(latestXLMeta xlMetaV1, storageDisks []StorageAPI, errs [] } result.Before.Drives = append(result.Before.Drives, madmin.HealDriveInfo{ UUID: "", - Endpoint: drive, + Endpoint: storageEndpoints[index], State: driveState, }) result.After.Drives = append(result.After.Drives, madmin.HealDriveInfo{ UUID: "", - Endpoint: drive, + Endpoint: storageEndpoints[index], State: driveState, }) } @@ -725,6 +716,7 @@ func (xl xlObjects) HealObject(ctx context.Context, bucket, object string, opts } storageDisks := xl.getDisks() + storageEndpoints := xl.getEndpoints() // Read metadata files from all the disks partsMetadata, errs := readAllXLMetadata(healCtx, storageDisks, bucket, object) @@ -740,12 +732,12 @@ func (xl xlObjects) HealObject(ctx context.Context, bucket, object string, opts xl.deleteObject(healCtx, bucket, object, writeQuorum, false) } err = reduceReadQuorumErrs(ctx, errs, nil, writeQuorum-1) - return defaultHealResult(xlMetaV1{}, storageDisks, errs, bucket, object), toObjectErr(err, bucket, object) + return defaultHealResult(xlMetaV1{}, storageDisks, storageEndpoints, errs, bucket, object), toObjectErr(err, bucket, object) } latestXLMeta, err := getLatestXLMeta(healCtx, partsMetadata, errs) if err != nil { - return defaultHealResult(xlMetaV1{}, storageDisks, errs, bucket, object), toObjectErr(err, bucket, object) + return defaultHealResult(xlMetaV1{}, storageDisks, storageEndpoints, errs, bucket, object), toObjectErr(err, bucket, object) } errCount := 0 @@ -769,7 +761,7 @@ func (xl xlObjects) HealObject(ctx context.Context, bucket, object string, opts xl.deleteObject(ctx, bucket, object, writeQuorum, false) } } - return defaultHealResult(latestXLMeta, storageDisks, errs, bucket, object), toObjectErr(err, bucket, object) + return defaultHealResult(latestXLMeta, storageDisks, storageEndpoints, errs, bucket, object), toObjectErr(err, bucket, object) } } diff --git a/cmd/xl-v1.go b/cmd/xl-v1.go index c128faa0c..02dca619a 100644 --- a/cmd/xl-v1.go +++ b/cmd/xl-v1.go @@ -59,6 +59,10 @@ type xlObjects struct { // getLockers returns list of remote and local lockers. getLockers func() []dsync.NetLocker + // getEndpoints returns list of endpoint strings belonging this set. + // some may be local and some remote. + getEndpoints func() []string + // Locker mutex map. nsMutex *nsLockMap @@ -90,21 +94,17 @@ func (d byDiskTotal) Less(i, j int) bool { } // getDisksInfo - fetch disks info across all other storage API. -func getDisksInfo(disks []StorageAPI, local bool) (disksInfo []DiskInfo, errs []error, onlineDisks, offlineDisks madmin.BackendDisks) { +func getDisksInfo(disks []StorageAPI, endpoints []string) (disksInfo []DiskInfo, errs []error, onlineDisks, offlineDisks madmin.BackendDisks) { disksInfo = make([]DiskInfo, len(disks)) onlineDisks = make(madmin.BackendDisks) offlineDisks = make(madmin.BackendDisks) - for _, disk := range disks { - if disk == OfflineDisk { - continue - } - peerAddr := disk.Hostname() - if _, ok := offlineDisks[peerAddr]; !ok { - offlineDisks[peerAddr] = 0 + for _, ep := range endpoints { + if _, ok := offlineDisks[ep]; !ok { + offlineDisks[ep] = 0 } - if _, ok := onlineDisks[peerAddr]; !ok { - onlineDisks[peerAddr] = 0 + if _, ok := onlineDisks[ep]; !ok { + onlineDisks[ep] = 0 } } @@ -136,32 +136,12 @@ func getDisksInfo(disks []StorageAPI, local bool) (disksInfo []DiskInfo, errs [] if disks[i] == OfflineDisk { continue } + ep := endpoints[i] if diskInfoErr != nil { - offlineDisks[disks[i].Hostname()]++ + offlineDisks[ep]++ continue } - onlineDisks[disks[i].Hostname()]++ - } - - // Iterate over the passed endpoints arguments and check - // if there are still disks missing from the offline/online lists - // and update them accordingly. - missingOfflineDisks := make(map[string]int) - for _, zone := range globalEndpoints { - for _, endpoint := range zone.Endpoints { - // if local is set and endpoint is not local - // we are not interested in remote disks. - if local && !endpoint.IsLocal { - continue - } - if _, ok := offlineDisks[endpoint.Host]; !ok { - missingOfflineDisks[endpoint.Host]++ - } - } - } - for missingDisk, n := range missingOfflineDisks { - onlineDisks[missingDisk] = 0 - offlineDisks[missingDisk] = n + onlineDisks[ep]++ } // Success. @@ -169,8 +149,8 @@ func getDisksInfo(disks []StorageAPI, local bool) (disksInfo []DiskInfo, errs [] } // Get an aggregated storage info across all disks. -func getStorageInfo(disks []StorageAPI, local bool) (StorageInfo, []error) { - disksInfo, errs, onlineDisks, offlineDisks := getDisksInfo(disks, local) +func getStorageInfo(disks []StorageAPI, endpoints []string) (StorageInfo, []error) { + disksInfo, errs, onlineDisks, offlineDisks := getDisksInfo(disks, endpoints) // Sort so that the first element is the smallest. sort.Sort(byDiskTotal(disksInfo)) @@ -206,19 +186,23 @@ func getStorageInfo(disks []StorageAPI, local bool) (StorageInfo, []error) { func (xl xlObjects) StorageInfo(ctx context.Context, local bool) (StorageInfo, []error) { disks := xl.getDisks() + endpoints := xl.getEndpoints() if local { var localDisks []StorageAPI - for _, disk := range disks { + var localEndpoints []string + for i, disk := range disks { if disk != nil { if disk.IsLocal() { // Append this local disk since local flag is true localDisks = append(localDisks, disk) + localEndpoints = append(localEndpoints, endpoints[i]) } } } disks = localDisks + endpoints = localEndpoints } - return getStorageInfo(disks, local) + return getStorageInfo(disks, endpoints) } // GetMetrics - is not implemented and shouldn't be called.