Make sure to re-load reference format after HealFormat (#5772)

This PR introduces ReloadFormat API call at objectlayer
to facilitate this. Previously we repurposed HealFormat
but we never ended up updating our reference format on
peers.

Fixes #5700
master
Harshavardhana 7 years ago committed by Nitish Tiwari
parent ae8e863ff4
commit 1d31ad499f
  1. 10
      cmd/admin-heal-ops.go
  2. 3
      cmd/admin-rpc-client.go
  3. 3
      cmd/admin-rpc-server.go
  4. 6
      cmd/auth-rpc-client.go
  5. 5
      cmd/fs-v1.go
  6. 5
      cmd/gateway-unsupported.go
  7. 1
      cmd/object-api-interface.go
  8. 157
      cmd/xl-sets.go
  9. 5
      cmd/xl-v1-errors.go
  10. 4
      cmd/xl-v1-healing.go

@ -539,11 +539,17 @@ func (h *healSequence) healDiskFormat() error {
} }
res, err := objectAPI.HealFormat(h.ctx, h.settings.DryRun) res, err := objectAPI.HealFormat(h.ctx, h.settings.DryRun)
if err != nil { // return any error, ignore error returned when disks have
// already healed.
if err != nil && err != errNoHealRequired {
return errFnHealFromAPIErr(err) return errFnHealFromAPIErr(err)
} }
peersReInitFormat(globalAdminPeers, h.settings.DryRun) // Healing succeeded notify the peers to reload format and re-initialize disks.
// We will not notify peers only if healing succeeded.
if err == nil {
peersReInitFormat(globalAdminPeers, h.settings.DryRun)
}
// Push format heal result // Push format heal result
return h.pushHealResultItem(res) return h.pushHealResultItem(res)

@ -84,8 +84,7 @@ func (lc localAdminClient) ReInitFormat(dryRun bool) error {
if objectAPI == nil { if objectAPI == nil {
return errServerNotInitialized return errServerNotInitialized
} }
_, err := objectAPI.HealFormat(context.Background(), dryRun) return objectAPI.ReloadFormat(context.Background(), dryRun)
return err
} }
// ListLocks - Fetches lock information from local lock instrumentation. // ListLocks - Fetches lock information from local lock instrumentation.

@ -94,8 +94,7 @@ func (s *adminCmd) ReInitFormat(args *ReInitFormatArgs, reply *AuthRPCReply) err
if objectAPI == nil { if objectAPI == nil {
return errServerNotInitialized return errServerNotInitialized
} }
_, err := objectAPI.HealFormat(context.Background(), args.DryRun) return objectAPI.ReloadFormat(context.Background(), args.DryRun)
return err
} }
// ListLocks - lists locks held by requests handled by this server instance. // ListLocks - lists locks held by requests handled by this server instance.

@ -136,6 +136,9 @@ func (authClient *AuthRPCClient) Login() (err error) {
} }
if err = rpcClient.Call(loginMethod, &loginArgs, &LoginRPCReply{}); err != nil { if err = rpcClient.Call(loginMethod, &loginArgs, &LoginRPCReply{}); err != nil {
// Closing the connection here.
rpcClient.Close()
// gob doesn't provide any typed errors for us to reflect // gob doesn't provide any typed errors for us to reflect
// upon, this is the only way to return proper error. // upon, this is the only way to return proper error.
if strings.Contains(err.Error(), "gob: wrong type") { if strings.Contains(err.Error(), "gob: wrong type") {
@ -198,6 +201,9 @@ func (authClient *AuthRPCClient) Call(serviceMethod string, args interface {
// gob doesn't provide any typed errors for us to reflect // gob doesn't provide any typed errors for us to reflect
// upon, this is the only way to return proper error. // upon, this is the only way to return proper error.
if err != nil && strings.Contains(err.Error(), "gob: wrong type") { if err != nil && strings.Contains(err.Error(), "gob: wrong type") {
// Close the rpc client also when the servers have mismatching rpc versions.
authClient.Close()
err = errRPCAPIVersionUnsupported err = errRPCAPIVersionUnsupported
} }
break break

@ -1046,6 +1046,11 @@ func (fs *FSObjects) ListObjects(ctx context.Context, bucket, prefix, marker, de
return result, nil return result, nil
} }
// ReloadFormat - no-op for fs, Valid only for XL.
func (fs *FSObjects) ReloadFormat(ctx context.Context, dryRun bool) error {
return errors.Trace(NotImplemented{})
}
// HealFormat - no-op for fs, Valid only for XL. // HealFormat - no-op for fs, Valid only for XL.
func (fs *FSObjects) HealFormat(ctx context.Context, dryRun bool) (madmin.HealResultItem, error) { func (fs *FSObjects) HealFormat(ctx context.Context, dryRun bool) (madmin.HealResultItem, error) {
logger.LogIf(ctx, NotImplemented{}) logger.LogIf(ctx, NotImplemented{})

@ -89,6 +89,11 @@ func (a GatewayUnsupported) DeleteBucketPolicy(ctx context.Context, bucket strin
return NotImplemented{} return NotImplemented{}
} }
// ReloadFormat - Not implemented stub.
func (a GatewayUnsupported) ReloadFormat(ctx context.Context, dryRun bool) error {
return NotImplemented{}
}
// HealFormat - Not implemented stub // HealFormat - Not implemented stub
func (a GatewayUnsupported) HealFormat(ctx context.Context, dryRun bool) (madmin.HealResultItem, error) { func (a GatewayUnsupported) HealFormat(ctx context.Context, dryRun bool) (madmin.HealResultItem, error) {
logger.LogIf(ctx, NotImplemented{}) logger.LogIf(ctx, NotImplemented{})

@ -58,6 +58,7 @@ type ObjectLayer interface {
CompleteMultipartUpload(ctx context.Context, bucket, object, uploadID string, uploadedParts []CompletePart) (objInfo ObjectInfo, err error) CompleteMultipartUpload(ctx context.Context, bucket, object, uploadID string, uploadedParts []CompletePart) (objInfo ObjectInfo, err error)
// Healing operations. // Healing operations.
ReloadFormat(ctx context.Context, dryRun bool) error
HealFormat(ctx context.Context, dryRun bool) (madmin.HealResultItem, error) HealFormat(ctx context.Context, dryRun bool) (madmin.HealResultItem, error)
HealBucket(ctx context.Context, bucket string, dryRun bool) ([]madmin.HealResultItem, error) HealBucket(ctx context.Context, bucket string, dryRun bool) ([]madmin.HealResultItem, error)
HealObject(ctx context.Context, bucket, object string, dryRun bool) (madmin.HealResultItem, error) HealObject(ctx context.Context, bucket, object string, dryRun bool) (madmin.HealResultItem, error)

@ -36,15 +36,27 @@ import (
"github.com/minio/minio/pkg/sync/errgroup" "github.com/minio/minio/pkg/sync/errgroup"
) )
// setsStorageAPI is encapsulated type for Close()
type setsStorageAPI [][]StorageAPI
func (s setsStorageAPI) Close() error {
for i := 0; i < len(s); i++ {
for _, disk := range s[i] {
if disk == nil {
continue
}
disk.Close()
}
}
return nil
}
// xlSets implements ObjectLayer combining a static list of erasure coded // xlSets implements ObjectLayer combining a static list of erasure coded
// object sets. NOTE: There is no dynamic scaling allowed or intended in // object sets. NOTE: There is no dynamic scaling allowed or intended in
// current design. // current design.
type xlSets struct { type xlSets struct {
sets []*xlObjects sets []*xlObjects
// Format mutex to lock format.
formatMu sync.RWMutex
// Reference format. // Reference format.
format *formatXLV3 format *formatXLV3
@ -52,7 +64,7 @@ type xlSets struct {
xlDisksMu sync.RWMutex xlDisksMu sync.RWMutex
// Re-ordered list of disks per set. // Re-ordered list of disks per set.
xlDisks [][]StorageAPI xlDisks setsStorageAPI
// List of endpoints provided on the command line. // List of endpoints provided on the command line.
endpoints EndpointList endpoints EndpointList
@ -138,6 +150,29 @@ func findDiskIndex(refFormat, format *formatXLV3) (int, int, error) {
return -1, -1, fmt.Errorf("diskID: %s not found", format.XL.This) return -1, -1, fmt.Errorf("diskID: %s not found", format.XL.This)
} }
// Re initializes all disks based on the reference format, this function is
// only used by HealFormat and ReloadFormat calls.
func (s *xlSets) reInitDisks(refFormat *formatXLV3, storageDisks []StorageAPI, formats []*formatXLV3) [][]StorageAPI {
xlDisks := make([][]StorageAPI, s.setCount)
for i := 0; i < len(refFormat.XL.Sets); i++ {
xlDisks[i] = make([]StorageAPI, s.drivesPerSet)
}
for k := range storageDisks {
if storageDisks[k] == nil || formats[k] == nil {
continue
}
i, j, err := findDiskIndex(refFormat, formats[k])
if err != nil {
reqInfo := (&logger.ReqInfo{}).AppendTags("storageDisk", storageDisks[i].String())
ctx := logger.SetReqInfo(context.Background(), reqInfo)
logger.LogIf(ctx, err)
continue
}
xlDisks[i][j] = storageDisks[k]
}
return xlDisks
}
// connectDisks - attempt to connect all the endpoints, loads format // connectDisks - attempt to connect all the endpoints, loads format
// and re-arranges the disks in proper position. // and re-arranges the disks in proper position.
func (s *xlSets) connectDisks() { func (s *xlSets) connectDisks() {
@ -150,9 +185,7 @@ func (s *xlSets) connectDisks() {
printEndpointError(endpoint, err) printEndpointError(endpoint, err)
continue continue
} }
s.formatMu.RLock()
i, j, err := findDiskIndex(s.format, format) i, j, err := findDiskIndex(s.format, format)
s.formatMu.RUnlock()
if err != nil { if err != nil {
// Close the internal connection to avoid connection leaks. // Close the internal connection to avoid connection leaks.
disk.Close() disk.Close()
@ -168,11 +201,15 @@ func (s *xlSets) connectDisks() {
// monitorAndConnectEndpoints this is a monitoring loop to keep track of disconnected // monitorAndConnectEndpoints this is a monitoring loop to keep track of disconnected
// endpoints by reconnecting them and making sure to place them into right position in // endpoints by reconnecting them and making sure to place them into right position in
// the set topology, this monitoring happens at a given monitoring interval. // the set topology, this monitoring happens at a given monitoring interval.
func (s *xlSets) monitorAndConnectEndpoints(doneCh chan struct{}, monitorInterval time.Duration) { func (s *xlSets) monitorAndConnectEndpoints(monitorInterval time.Duration) {
ticker := time.NewTicker(monitorInterval) ticker := time.NewTicker(monitorInterval)
for { for {
select { select {
case <-doneCh: case <-globalServiceDoneCh:
// Stop the timer.
ticker.Stop()
return
case <-s.disksConnectDoneCh:
// Stop the timer. // Stop the timer.
ticker.Stop() ticker.Stop()
return return
@ -240,7 +277,7 @@ func newXLSets(endpoints EndpointList, format *formatXLV3, setCount int, drivesP
} }
// Start the disk monitoring and connect routine. // Start the disk monitoring and connect routine.
go s.monitorAndConnectEndpoints(globalServiceDoneCh, defaultMonitorConnectEndpointInterval) go s.monitorAndConnectEndpoints(defaultMonitorConnectEndpointInterval)
return s, nil return s, nil
} }
@ -917,12 +954,76 @@ func formatsToDrivesInfo(endpoints EndpointList, formats []*formatXLV3, sErrs []
return beforeDrives return beforeDrives
} }
// Reloads the format from the disk, usually called by a remote peer notifier while
// healing in a distributed setup.
func (s *xlSets) ReloadFormat(ctx context.Context, dryRun bool) (err error) {
// Acquire lock on format.json
formatLock := s.getHashedSet(formatConfigFile).nsMutex.NewNSLock(minioMetaBucket, formatConfigFile)
if err = formatLock.GetRLock(globalHealingTimeout); err != nil {
return err
}
defer formatLock.RUnlock()
storageDisks, err := initStorageDisks(s.endpoints)
if err != nil {
return err
}
defer func(storageDisks []StorageAPI) {
if err != nil {
closeStorageDisks(storageDisks)
}
}(storageDisks)
formats, sErrs := loadFormatXLAll(storageDisks)
if err = checkFormatXLValues(formats); err != nil {
return err
}
for index, sErr := range sErrs {
if sErr != nil {
// Look for acceptable heal errors, for any other
// errors we should simply quit and return.
if _, ok := formatHealErrors[sErr]; !ok {
return fmt.Errorf("Disk %s: %s", s.endpoints[index], sErr)
}
}
}
refFormat, err := getFormatXLInQuorum(formats)
if err != nil {
return err
}
// kill the monitoring loop such that we stop writing
// to indicate that we will re-initialize everything
// with new format.
s.disksConnectDoneCh <- struct{}{}
// Replace the new format.
s.format = refFormat
s.xlDisksMu.Lock()
{
// Close all existing disks.
s.xlDisks.Close()
// Re initialize disks, after saving the new reference format.
s.xlDisks = s.reInitDisks(refFormat, storageDisks, formats)
}
s.xlDisksMu.Unlock()
// Restart monitoring loop to monitor reformatted disks again.
go s.monitorAndConnectEndpoints(defaultMonitorConnectEndpointInterval)
return nil
}
// HealFormat - heals missing `format.json` on freshly or corrupted // HealFormat - heals missing `format.json` on freshly or corrupted
// disks (missing format.json but does have erasure coded data in it). // disks (missing format.json but does have erasure coded data in it).
func (s *xlSets) HealFormat(ctx context.Context, dryRun bool) (madmin.HealResultItem, error) { func (s *xlSets) HealFormat(ctx context.Context, dryRun bool) (res madmin.HealResultItem, err error) {
// Acquire lock on format.json // Acquire lock on format.json
formatLock := s.getHashedSet(formatConfigFile).nsMutex.NewNSLock(minioMetaBucket, formatConfigFile) formatLock := s.getHashedSet(formatConfigFile).nsMutex.NewNSLock(minioMetaBucket, formatConfigFile)
if err := formatLock.GetLock(globalHealingTimeout); err != nil { if err = formatLock.GetLock(globalHealingTimeout); err != nil {
return madmin.HealResultItem{}, err return madmin.HealResultItem{}, err
} }
defer formatLock.Unlock() defer formatLock.Unlock()
@ -931,7 +1032,12 @@ func (s *xlSets) HealFormat(ctx context.Context, dryRun bool) (madmin.HealResult
if err != nil { if err != nil {
return madmin.HealResultItem{}, err return madmin.HealResultItem{}, err
} }
defer closeStorageDisks(storageDisks)
defer func(storageDisks []StorageAPI) {
if err != nil {
closeStorageDisks(storageDisks)
}
}(storageDisks)
formats, sErrs := loadFormatXLAll(storageDisks) formats, sErrs := loadFormatXLAll(storageDisks)
if err = checkFormatXLValues(formats); err != nil { if err = checkFormatXLValues(formats); err != nil {
@ -939,7 +1045,7 @@ func (s *xlSets) HealFormat(ctx context.Context, dryRun bool) (madmin.HealResult
} }
// Prepare heal-result // Prepare heal-result
res := madmin.HealResultItem{ res = madmin.HealResultItem{
Type: madmin.HealItemMetadata, Type: madmin.HealItemMetadata,
Detail: "disk-format", Detail: "disk-format",
DiskCount: s.setCount * s.drivesPerSet, DiskCount: s.setCount * s.drivesPerSet,
@ -965,8 +1071,9 @@ func (s *xlSets) HealFormat(ctx context.Context, dryRun bool) (madmin.HealResult
} }
} }
// no errors found, no healing is required.
if !hasAnyErrors(sErrs) { if !hasAnyErrors(sErrs) {
return res, nil return res, errNoHealRequired
} }
for index, sErr := range sErrs { for index, sErr := range sErrs {
@ -1046,12 +1153,26 @@ func (s *xlSets) HealFormat(ctx context.Context, dryRun bool) (madmin.HealResult
return madmin.HealResultItem{}, err return madmin.HealResultItem{}, err
} }
s.formatMu.Lock() // kill the monitoring loop such that we stop writing
// to indicate that we will re-initialize everything
// with new format.
s.disksConnectDoneCh <- struct{}{}
// Replace with new reference format.
s.format = refFormat s.format = refFormat
s.formatMu.Unlock()
// Connect disks, after saving the new reference format. s.xlDisksMu.Lock()
s.connectDisks() {
// Disconnect/relinquish all existing disks.
s.xlDisks.Close()
// Re initialize disks, after saving the new reference format.
s.xlDisks = s.reInitDisks(refFormat, storageDisks, tmpNewFormats)
}
s.xlDisksMu.Unlock()
// Restart our monitoring loop to start monitoring newly formatted disks.
go s.monitorAndConnectEndpoints(defaultMonitorConnectEndpointInterval)
} }
return res, nil return res, nil

@ -1,5 +1,5 @@
/* /*
* Minio Cloud Storage, (C) 2016 Minio, Inc. * Minio Cloud Storage, (C) 2016, 2018 Minio, Inc.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -23,3 +23,6 @@ var errXLReadQuorum = errors.New("Read failed. Insufficient number of disks onli
// errXLWriteQuorum - did not meet write quorum. // errXLWriteQuorum - did not meet write quorum.
var errXLWriteQuorum = errors.New("Write failed. Insufficient number of disks online") var errXLWriteQuorum = errors.New("Write failed. Insufficient number of disks online")
// errNoHealRequired - returned when healing is attempted on a previously healed disks.
var errNoHealRequired = errors.New("No healing is required")

@ -27,6 +27,10 @@ import (
"github.com/minio/minio/pkg/madmin" "github.com/minio/minio/pkg/madmin"
) )
func (xl xlObjects) ReloadFormat(ctx context.Context, dryRun bool) error {
return errors.Trace(NotImplemented{})
}
func (xl xlObjects) HealFormat(ctx context.Context, dryRun bool) (madmin.HealResultItem, error) { func (xl xlObjects) HealFormat(ctx context.Context, dryRun bool) (madmin.HealResultItem, error) {
logger.LogIf(ctx, NotImplemented{}) logger.LogIf(ctx, NotImplemented{})
return madmin.HealResultItem{}, NotImplemented{} return madmin.HealResultItem{}, NotImplemented{}

Loading…
Cancel
Save