After healing re-load disks with the new format (#5718)

This PR also fixes correct calculation of drive states
before and after healing of objects.

Fixes #5700
Fixes #5708
master
Harshavardhana 7 years ago committed by Nitish Tiwari
parent 76d1e8bbcd
commit 6e9c853312
  1. 49
      cmd/xl-sets.go
  2. 21
      cmd/xl-v1-healing.go
  3. 6
      cmd/xl-v1.go

@ -131,6 +131,31 @@ func findDiskIndex(refFormat, format *formatXLV3) (int, int, error) {
return -1, -1, fmt.Errorf("diskID: %s not found", format.XL.This) return -1, -1, fmt.Errorf("diskID: %s not found", format.XL.This)
} }
// connectDisks - attempt to connect all the endpoints, loads format
// and re-arranges the disks in proper position.
func (s *xlSets) connectDisks() {
for _, endpoint := range s.endpoints {
if s.isConnected(endpoint) {
continue
}
disk, format, err := connectEndpoint(endpoint)
if err != nil {
printEndpointError(endpoint, err)
continue
}
s.formatMu.RLock()
i, j, err := findDiskIndex(s.format, format)
s.formatMu.RUnlock()
if err != nil {
printEndpointError(endpoint, err)
continue
}
s.xlDisksMu.Lock()
s.xlDisks[i][j] = disk
s.xlDisksMu.Unlock()
}
}
// monitorAndConnectEndpoints this is a monitoring loop to keep track of disconnected // monitorAndConnectEndpoints this is a monitoring loop to keep track of disconnected
// endpoints by reconnecting them and making sure to place them into right position in // endpoints by reconnecting them and making sure to place them into right position in
// the set topology, this monitoring happens at a given monitoring interval. // the set topology, this monitoring happens at a given monitoring interval.
@ -143,26 +168,7 @@ func (s *xlSets) monitorAndConnectEndpoints(doneCh chan struct{}, monitorInterva
ticker.Stop() ticker.Stop()
return return
case <-ticker.C: case <-ticker.C:
for _, endpoint := range s.endpoints { s.connectDisks()
if s.isConnected(endpoint) {
continue
}
disk, format, err := connectEndpoint(endpoint)
if err != nil {
printEndpointError(endpoint, err)
continue
}
s.formatMu.RLock()
i, j, err := findDiskIndex(s.format, format)
s.formatMu.RUnlock()
if err != nil {
printEndpointError(endpoint, err)
continue
}
s.xlDisksMu.Lock()
s.xlDisks[i][j] = disk
s.xlDisksMu.Unlock()
}
} }
} }
} }
@ -1036,6 +1042,9 @@ func (s *xlSets) HealFormat(ctx context.Context, dryRun bool) (madmin.HealResult
s.formatMu.Lock() s.formatMu.Lock()
s.format = refFormat s.format = refFormat
s.formatMu.Unlock() s.formatMu.Unlock()
// Connect disks, after saving the new reference format.
s.connectDisks()
} }
return res, nil return res, nil

@ -338,20 +338,24 @@ func healObject(storageDisks []StorageAPI, bucket string, object string,
outDatedDisks[i] = storageDisks[i] outDatedDisks[i] = storageDisks[i]
disksToHealCount++ disksToHealCount++
} }
var drive string
if v == nil { if v == nil {
if errs[i] != errDiskNotFound {
drive = outDatedDisks[i].String()
}
result.Before.Drives = append(result.Before.Drives, madmin.HealDriveInfo{ result.Before.Drives = append(result.Before.Drives, madmin.HealDriveInfo{
UUID: "", UUID: "",
Endpoint: "", Endpoint: drive,
State: driveState, State: driveState,
}) })
result.After.Drives = append(result.After.Drives, madmin.HealDriveInfo{ result.After.Drives = append(result.After.Drives, madmin.HealDriveInfo{
UUID: "", UUID: "",
Endpoint: "", Endpoint: drive,
State: driveState, State: driveState,
}) })
continue continue
} }
drive := v.String() drive = v.String()
result.Before.Drives = append(result.Before.Drives, madmin.HealDriveInfo{ result.Before.Drives = append(result.Before.Drives, madmin.HealDriveInfo{
UUID: "", UUID: "",
Endpoint: drive, Endpoint: drive,
@ -492,7 +496,7 @@ func healObject(storageDisks []StorageAPI, bucket string, object string,
} }
// Rename from tmp location to the actual location. // Rename from tmp location to the actual location.
for diskIndex, disk := range outDatedDisks { for _, disk := range outDatedDisks {
if disk == nil { if disk == nil {
continue continue
} }
@ -504,12 +508,9 @@ func healObject(storageDisks []StorageAPI, bucket string, object string,
return result, toObjectErr(errors.Trace(aErr), bucket, object) return result, toObjectErr(errors.Trace(aErr), bucket, object)
} }
realDiskIdx := unshuffleIndex(diskIndex, latestMeta.Erasure.Distribution) for i, v := range result.Before.Drives {
if outDatedDisks[realDiskIdx] != nil { if v.Endpoint == disk.String() {
for i, v := range result.After.Drives { result.After.Drives[i].State = madmin.DriveStateOk
if v.Endpoint == outDatedDisks[realDiskIdx].String() {
result.After.Drives[i].State = madmin.DriveStateOk
}
} }
} }
} }

@ -197,6 +197,12 @@ func getStorageInfo(disks []StorageAPI) StorageInfo {
// This is the number of drives we report free and total space for // This is the number of drives we report free and total space for
availableDataDisks := uint64(onlineDisks - sscParity) availableDataDisks := uint64(onlineDisks - sscParity)
// Available data disks can be zero when onlineDisks is equal to parity,
// at that point we simply choose online disks to calculate the size.
if availableDataDisks == 0 {
availableDataDisks = uint64(onlineDisks)
}
// Return calculated storage info, choose the lowest Total and // Return calculated storage info, choose the lowest Total and
// Free as the total aggregated values. Total capacity is always // Free as the total aggregated values. Total capacity is always
// the multiple of smallest disk among the disk list. // the multiple of smallest disk among the disk list.

Loading…
Cancel
Save