heal buckets during init and make sure to wait on quorum (#9526)

heal buckets properly during expansion, and wait for quorum
so that healing can be retried on quorum failures.
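
In effect, the change makes bucket healing part of server initialization and treats quorum errors as a signal to retry rather than to fail. Below is a minimal stand-alone sketch of that contract; all names are illustrative stand-ins (initOnce is hypothetical), and only the %w-wrapping plus errors.As pattern mirrors the actual diff:

    package main

    import (
        "errors"
        "fmt"
        "time"
    )

    // InsufficientWriteQuorum mimics the sentinel error type used by the server.
    type InsufficientWriteQuorum struct{}

    func (InsufficientWriteQuorum) Error() string { return "insufficient write quorum" }

    // initOnce stands in for one initialization attempt. Quorum errors are
    // wrapped with %w so errors.As can still match them at the top level.
    func initOnce(attempt int) error {
        if attempt < 2 { // pretend quorum forms on the third attempt
            return fmt.Errorf("Unable to heal bucket: %w", InsufficientWriteQuorum{})
        }
        return nil
    }

    func main() {
        var wquorum InsufficientWriteQuorum
        for attempt := 0; ; attempt++ {
            err := initOnce(attempt)
            if err == nil {
                fmt.Println("all sub-systems initialized")
                return
            }
            if errors.As(err, &wquorum) {
                // Quorum not yet available: back off and retry the whole init.
                time.Sleep(10 * time.Millisecond)
                continue
            }
            panic(err) // non-retriable failure
        }
    }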
Harshavardhana committed 5 years ago (via GitHub)
parent a2ccba69e5
commit 4c9de098b0
Files changed:
  1. buildscripts/verify-build.sh (28 lines changed)
  2. cmd/server-main.go (75 lines changed)
  3. cmd/xl-sets.go (17 lines changed)
  4. cmd/xl-v1-healing.go (19 lines changed)
  5. cmd/xl-zones.go (11 lines changed)

buildscripts/verify-build.sh

@@ -154,34 +154,6 @@ function run_test_erasure_sets()
     return "$rv"
 }
 
-function run_test_dist_erasure_sets_ipv6()
-{
-    minio_pids=( $(start_minio_dist_erasure_sets_ipv6) )
-    export SERVER_ENDPOINT="[::1]:9000"
-
-    (cd "$WORK_DIR" && "$FUNCTIONAL_TESTS")
-    rv=$?
-
-    for pid in "${minio_pids[@]}"; do
-        kill "$pid"
-    done
-    sleep 3
-
-    if [ "$rv" -ne 0 ]; then
-        for i in $(seq 0 9); do
-            echo "server$i log:"
-            cat "$WORK_DIR/dist-minio-v6-900$i.log"
-        done
-    fi
-
-    for i in $(seq 0 9); do
-        rm -f "$WORK_DIR/dist-minio-v6-900$i.log"
-    done
-
-    return "$rv"
-}
-
 function run_test_zone_erasure_sets()
 {
     minio_pids=( $(start_minio_zone_erasure_sets) )

cmd/server-main.go

@@ -208,20 +208,24 @@ func initSafeMode() (err error) {
 	// version is needed, migration is needed etc.
 	rquorum := InsufficientReadQuorum{}
 	wquorum := InsufficientWriteQuorum{}
-	optimeout := OperationTimedOut{}
-	for n := range newRetryTimerSimple(retryCtx) {
+	for range newRetryTimerSimple(retryCtx) {
 		// let one of the server acquire the lock, if not let them timeout.
 		// which shall be retried again by this loop.
-		if err = txnLk.GetLock(leaderLockTimeout); err == nil {
-			// Migrate all backend configs to encrypted backend configs, optionally
-			// handles rotating keys for encryption, if there is any retriable failure
-			// that shall be retried if there is an error.
-			if err = handleEncryptedConfigBackend(newObject, true); err == nil {
-				// Upon success migrating the config, initialize all sub-systems
-				// if all sub-systems initialized successfully return right away
-				if err = initAllSubsystems(newObject); err == nil {
-					return nil
-				}
+		if err = txnLk.GetLock(leaderLockTimeout); err != nil {
+			logger.Info("Waiting for all MinIO sub-systems to be initialized.. trying to acquire lock")
+			continue
+		}
+		logger.Info("Waiting for all MinIO sub-systems to be initialized.. lock acquired")
+
+		// Migrate all backend configs to encrypted backend configs, optionally
+		// handles rotating keys for encryption, if there is any retriable failure
+		// that shall be retried if there is an error.
+		if err = handleEncryptedConfigBackend(newObject, true); err == nil {
+			// Upon success migrating the config, initialize all sub-systems
+			// if all sub-systems initialized successfully return right away
+			if err = initAllSubsystems(newObject); err == nil {
+				// All successful return.
+				return nil
 			}
 		}
@@ -230,15 +234,11 @@ func initSafeMode() (err error) {
 			errors.Is(err, errConfigNotFound) ||
 			errors.Is(err, context.Canceled) ||
 			errors.Is(err, context.DeadlineExceeded) ||
-			errors.As(err, &optimeout) ||
 			errors.As(err, &rquorum) ||
 			errors.As(err, &wquorum) ||
 			isErrBucketNotFound(err) {
-			if n < 5 {
-				logger.Info("Waiting for all MinIO sub-systems to be initialized..")
-			} else {
-				logger.Info("Waiting for all MinIO sub-systems to be initialized.. possible cause (%v)", err)
-			}
+			logger.Info("Waiting for all MinIO sub-systems to be initialized.. possible cause (%v)", err)
+			txnLk.Unlock() // Unlock the transaction lock and allow other nodes to acquire the lock if possible.
 			continue
 		}
@@ -256,10 +256,41 @@ func initSafeMode() (err error) {
 }
 
 func initAllSubsystems(newObject ObjectLayer) (err error) {
-	// List buckets to be re-used for loading configs.
-	buckets, err := newObject.ListBuckets(GlobalContext)
-	if err != nil {
-		return fmt.Errorf("Unable to list buckets: %w", err)
+	// %w is used by all error returns here to make sure
+	// we wrap the underlying error, make sure when you
+	// are modifying this code that you do so, if and when
+	// you want to add extra context to your error. This
+	// ensures top level retry works accordingly.
+	var buckets []BucketInfo
+	if globalIsDistXL || globalIsXL {
+		// List buckets to heal, and be re-used for loading configs.
+		buckets, err = newObject.ListBucketsHeal(GlobalContext)
+		if err != nil {
+			return fmt.Errorf("Unable to list buckets to heal: %w", err)
+		}
+		// Attempt a heal if possible and re-use the bucket names
+		// to reload their config.
+		wquorum := &InsufficientWriteQuorum{}
+		rquorum := &InsufficientReadQuorum{}
+		for _, bucket := range buckets {
+			if err = newObject.MakeBucketWithLocation(GlobalContext, bucket.Name, ""); err != nil {
+				if errors.As(err, &wquorum) || errors.As(err, &rquorum) {
+					// Return the error upwards for the caller to retry.
+					return fmt.Errorf("Unable to heal bucket: %w", err)
+				}
+				if _, ok := err.(BucketExists); !ok {
+					// ignore any other error and log for investigation.
+					logger.LogIf(GlobalContext, err)
+					continue
+				}
+				// Bucket already exists, nothing that needs to be done.
+			}
+		}
+	} else {
+		buckets, err = newObject.ListBuckets(GlobalContext)
+		if err != nil {
+			return fmt.Errorf("Unable to list buckets: %w", err)
+		}
 	}
 
 	// Initialize config system.

cmd/xl-sets.go

@@ -22,6 +22,7 @@ import (
 	"hash/crc32"
 	"io"
 	"net/http"
+	"sort"
 	"strings"
 	"sync"
 	"time"
@@ -1689,20 +1690,18 @@ func (s *xlSets) HealObject(ctx context.Context, bucket, object string, opts mad
 
 // Lists all buckets which need healing.
 func (s *xlSets) ListBucketsHeal(ctx context.Context) ([]BucketInfo, error) {
-	listBuckets := []BucketInfo{}
-	var healBuckets = map[string]BucketInfo{}
+	var listBuckets []BucketInfo
+	var healBuckets = make(map[string]VolInfo)
 	for _, set := range s.sets {
-		buckets, _, err := listAllBuckets(set.getDisks())
-		if err != nil {
+		// lists all unique buckets across drives.
+		if err := listAllBuckets(set.getDisks(), healBuckets); err != nil {
 			return nil, err
 		}
-		for _, currBucket := range buckets {
-			healBuckets[currBucket.Name] = BucketInfo(currBucket)
-		}
 	}
-	for _, bucketInfo := range healBuckets {
-		listBuckets = append(listBuckets, bucketInfo)
+	for _, v := range healBuckets {
+		listBuckets = append(listBuckets, BucketInfo(v))
 	}
+	sort.Sort(byBucketName(listBuckets))
 	return listBuckets, nil
 }
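
The new sort.Sort(byBucketName(listBuckets)) call relies on a byBucketName sort.Interface defined elsewhere in the package (hence the added "sort" import). A self-contained sketch of what such a type looks like, using a hypothetical BucketInfo stub in place of the real cmd type:

    package main

    import (
        "fmt"
        "sort"
    )

    // BucketInfo is a stand-in for the real cmd.BucketInfo type.
    type BucketInfo struct{ Name string }

    // byBucketName sorts a slice of BucketInfo lexically by bucket name.
    type byBucketName []BucketInfo

    func (d byBucketName) Len() int           { return len(d) }
    func (d byBucketName) Swap(i, j int)      { d[i], d[j] = d[j], d[i] }
    func (d byBucketName) Less(i, j int) bool { return d[i].Name < d[j].Name }

    func main() {
        buckets := []BucketInfo{{Name: "zebra"}, {Name: "alpha"}}
        sort.Sort(byBucketName(buckets))
        fmt.Println(buckets[0].Name) // prints: alpha
    }

Sorting gives callers a deterministic bucket order regardless of map iteration order.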

cmd/xl-v1-healing.go

@@ -173,11 +173,7 @@ func healBucket(ctx context.Context, storageDisks []StorageAPI, bucket string, w
 
 // listAllBuckets lists all buckets from all disks. It also
 // returns the occurrence of each buckets in all disks
-func listAllBuckets(storageDisks []StorageAPI) (buckets map[string]VolInfo,
-	bucketsOcc map[string]int, err error) {
-	buckets = make(map[string]VolInfo)
-	bucketsOcc = make(map[string]int)
+func listAllBuckets(storageDisks []StorageAPI, healBuckets map[string]VolInfo) (err error) {
 	for _, disk := range storageDisks {
 		if disk == nil {
 			continue
@@ -188,7 +184,7 @@ func listAllBuckets(storageDisks []StorageAPI) (buckets map[string]VolInfo,
 			if IsErrIgnored(err, bucketMetadataOpIgnoredErrs...) {
 				continue
 			}
-			return nil, nil, err
+			return err
 		}
 		for _, volInfo := range volsInfo {
 			// StorageAPI can send volume names which are
@@ -197,13 +193,14 @@ func listAllBuckets(storageDisks []StorageAPI) (buckets map[string]VolInfo,
 			if isReservedOrInvalidBucket(volInfo.Name, false) {
 				continue
 			}
-			// Increase counter per bucket name
-			bucketsOcc[volInfo.Name]++
-			// Save volume info under bucket name
-			buckets[volInfo.Name] = volInfo
+			// always save unique buckets across drives.
+			if _, ok := healBuckets[volInfo.Name]; !ok {
+				healBuckets[volInfo.Name] = volInfo
+			}
 		}
 	}
-	return buckets, bucketsOcc, nil
+	return nil
 }
 
 // Only heal on disks where we are sure that healing is needed. We can expand
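
The refactored listAllBuckets fills a caller-owned map instead of returning fresh maps, so calling it once per erasure set naturally deduplicates bucket names across drives. A stand-alone sketch of that merge pattern, with a hypothetical VolInfo stub:

    package main

    import "fmt"

    // VolInfo is a stand-in for the on-disk volume metadata type.
    type VolInfo struct{ Name string }

    // mergeUnique copies volumes into the caller-owned map, keeping
    // only the first occurrence of each name seen across drives.
    func mergeUnique(dst map[string]VolInfo, vols []VolInfo) {
        for _, v := range vols {
            if _, ok := dst[v.Name]; !ok {
                dst[v.Name] = v
            }
        }
    }

    func main() {
        seen := make(map[string]VolInfo)
        mergeUnique(seen, []VolInfo{{"a"}, {"b"}}) // first erasure set
        mergeUnique(seen, []VolInfo{{"b"}, {"c"}}) // second erasure set
        fmt.Println(len(seen)) // prints: 3
    }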

cmd/xl-zones.go

@@ -44,16 +44,6 @@ func (z *xlZones) SingleZone() bool {
 	return len(z.zones) == 1
 }
 
-func (z *xlZones) quickHealBuckets(ctx context.Context) {
-	bucketsInfo, err := z.ListBucketsHeal(ctx)
-	if err != nil {
-		return
-	}
-	for _, bucket := range bucketsInfo {
-		z.MakeBucketWithLocation(ctx, bucket.Name, "")
-	}
-}
-
 // Initialize new zone of erasure sets.
 func newXLZones(ctx context.Context, endpointZones EndpointZones) (ObjectLayer, error) {
 	var (
@@ -88,7 +78,6 @@ func newXLZones(ctx context.Context, endpointZones EndpointZones) (ObjectLayer,
 		}
 	}
 
-	z.quickHealBuckets(ctx)
 	go intDataUpdateTracker.start(GlobalContext, localDrives...)
 	return z, nil
 }
