From 7abadfccc2abf1e60d8c47a94ea8d20240b38b0b Mon Sep 17 00:00:00 2001 From: Anis Elleuch Date: Sun, 9 Jun 2019 06:14:07 +0100 Subject: [PATCH] Add self-healing feature (#7604) - Background Heal routine receives heal requests from a channel, either to heal format, buckets or objects - Daily sweeper lists all objects in all buckets, these objects don't necessarly have read quorum so they can be removed if these objects are unhealable - Heal daily ops receives objects from the daily sweeper and send them to the heal routine. --- cmd/admin-handlers_test.go | 4 +- cmd/admin-heal-ops.go | 216 ++++++++++++++++++--------------- cmd/background-heal-ops.go | 160 ++++++++++++++++++++++++ cmd/daily-heal-ops.go | 89 ++++++++++++++ cmd/daily-sweeper.go | 145 ++++++++++++++++++++++ cmd/disk-cache.go | 4 +- cmd/fs-v1.go | 6 + cmd/gateway-unsupported.go | 5 + cmd/globals.go | 5 + cmd/object-api-common.go | 4 +- cmd/object-api-interface.go | 4 +- cmd/server-main.go | 13 +- cmd/test-utils_test.go | 3 + cmd/tree-walk-pool.go | 1 + cmd/xl-sets.go | 28 ++++- cmd/xl-v1-list-objects-heal.go | 5 + cmd/xl-v1-list-objects.go | 4 +- 17 files changed, 585 insertions(+), 111 deletions(-) create mode 100644 cmd/background-heal-ops.go create mode 100644 cmd/daily-heal-ops.go create mode 100644 cmd/daily-sweeper.go diff --git a/cmd/admin-handlers_test.go b/cmd/admin-handlers_test.go index 6f00e900c..a594769df 100644 --- a/cmd/admin-handlers_test.go +++ b/cmd/admin-handlers_test.go @@ -268,7 +268,9 @@ func prepareAdminXLTestBed() (*adminXLTestBed, error) { initNSLock(isDistXL) // Init global heal state - initAllHealState(globalIsXL) + if globalIsXL { + globalAllHealState = initHealState() + } globalConfigSys = NewConfigSys() diff --git a/cmd/admin-heal-ops.go b/cmd/admin-heal-ops.go index 9dff798a7..9e869fb1a 100644 --- a/cmd/admin-heal-ops.go +++ b/cmd/admin-heal-ops.go @@ -96,22 +96,15 @@ type allHealState struct { healSeqMap map[string]*healSequence } -var ( - // global server heal state - globalAllHealState allHealState -) - -// initAllHealState - initialize healing apparatus -func initAllHealState(isErasureMode bool) { - if !isErasureMode { - return - } - - globalAllHealState = allHealState{ +// initHealState - initialize healing apparatus +func initHealState() *allHealState { + healState := &allHealState{ healSeqMap: make(map[string]*healSequence), } - go globalAllHealState.periodicHealSeqsClean() + go healState.periodicHealSeqsClean() + + return healState } func (ahs *allHealState) periodicHealSeqsClean() { @@ -305,6 +298,14 @@ type healSequence struct { // path is just pathJoin(bucket, objPrefix) path string + // List of entities (format, buckets, objects) to heal + sourceCh chan string + + // Report healing progress, false if this is a background + // healing since currently there is no entity which will + // receive realtime healing status + reportProgress bool + // time at which heal sequence was started startTime time.Time @@ -348,14 +349,15 @@ func newHealSequence(bucket, objPrefix, clientAddr string, ctx := logger.SetReqInfo(context.Background(), reqInfo) return &healSequence{ - bucket: bucket, - objPrefix: objPrefix, - path: pathJoin(bucket, objPrefix), - startTime: UTCNow(), - clientToken: mustGetUUID(), - clientAddress: clientAddr, - forceStarted: forceStart, - settings: hs, + bucket: bucket, + objPrefix: objPrefix, + path: pathJoin(bucket, objPrefix), + reportProgress: true, + startTime: UTCNow(), + clientToken: mustGetUUID(), + clientAddress: clientAddr, + forceStarted: forceStart, + settings: hs, currentStatus: healSequenceStatus{ Summary: healNotStartedStatus, HealSettings: hs, @@ -484,7 +486,11 @@ func (h *healSequence) healSequenceStart() { h.currentStatus.StartTime = UTCNow() h.currentStatus.updateLock.Unlock() - go h.traverseAndHeal() + if h.sourceCh == nil { + go h.traverseAndHeal() + } else { + go h.healFromSourceCh() + } select { case err, ok := <-h.traverseAndHealDoneCh: @@ -519,39 +525,101 @@ func (h *healSequence) healSequenceStart() { } } -// traverseAndHeal - traverses on-disk data and performs healing -// according to settings. At each "safe" point it also checks if an -// external quit signal has been received and quits if so. Since the -// healing traversal may be mutating on-disk data when an external -// quit signal is received, this routine cannot quit immediately and -// has to wait until a safe point is reached, such as between scanning -// two objects. -func (h *healSequence) traverseAndHeal() { - var err error - checkErr := func(f func() error) { +func (h *healSequence) queueHealTask(path string, healType madmin.HealItemType) error { + var respCh = make(chan healResult) + defer close(respCh) + // Send heal request + globalBackgroundHealing.queueHealTask(healTask{path: path, responseCh: respCh, opts: h.settings}) + // Wait for answer and push result to the client + res := <-respCh + if !h.reportProgress { + return nil + } + res.result.Type = healType + if res.err != nil { + // Object might have been deleted, by the time heal + // was attempted, we should ignore this object and return success. + if isErrObjectNotFound(res.err) { + return nil + } + // Only report object error + if healType != madmin.HealItemObject { + return res.err + } + res.result.Detail = res.err.Error() + } + return h.pushHealResultItem(res.result) +} + +func (h *healSequence) healItemsFromSourceCh() error { + // Start healing the config prefix. + if err := h.healMinioSysMeta(minioConfigPrefix)(); err != nil { + return err + } + + // Start healing the bucket config prefix. + if err := h.healMinioSysMeta(bucketConfigPrefix)(); err != nil { + return err + } + + for path := range h.sourceCh { + var itemType madmin.HealItemType switch { - case err != nil: - return - case h.isQuitting(): - err = errHealStopSignalled - return + case path == "/": + itemType = madmin.HealItemMetadata + case !strings.Contains(path, "/"): + itemType = madmin.HealItemBucket + default: + itemType = madmin.HealItemObject + } + + if err := h.queueHealTask(path, itemType); err != nil { + return err } - err = f() } + return nil +} + +func (h *healSequence) healFromSourceCh() { + if err := h.healItemsFromSourceCh(); err != nil { + h.traverseAndHealDoneCh <- err + } + close(h.traverseAndHealDoneCh) +} + +func (h *healSequence) healItems() error { // Start with format healing - checkErr(h.healDiskFormat) + if err := h.healDiskFormat(); err != nil { + return err + } // Start healing the config prefix. - checkErr(h.healMinioSysMeta(minioConfigPrefix)) + if err := h.healMinioSysMeta(minioConfigPrefix)(); err != nil { + return err + } // Start healing the bucket config prefix. - checkErr(h.healMinioSysMeta(bucketConfigPrefix)) + if err := h.healMinioSysMeta(bucketConfigPrefix)(); err != nil { + return err + } // Heal buckets and objects - checkErr(h.healBuckets) + return h.healBuckets() +} - if err != nil { +// traverseAndHeal - traverses on-disk data and performs healing +// according to settings. At each "safe" point it also checks if an +// external quit signal has been received and quits if so. Since the +// healing traversal may be mutating on-disk data when an external +// quit signal is received, this routine cannot quit immediately and +// has to wait until a safe point is reached, such as between scanning +// two objects. +func (h *healSequence) traverseAndHeal() { + if err := h.healItems(); err != nil { + if h.isQuitting() { + err = errHealStopSignalled + } h.traverseAndHealDoneCh <- err } @@ -575,17 +643,14 @@ func (h *healSequence) healMinioSysMeta(metaPrefix string) func() error { if h.isQuitting() { return errHealStopSignalled } - res, herr := objectAPI.HealObject(h.ctx, bucket, object, h.settings.DryRun, h.settings.Remove, h.settings.ScanMode) + + herr := h.queueHealTask(pathJoin(bucket, object), madmin.HealItemBucketMetadata) // Object might have been deleted, by the time heal // was attempted we ignore this object an move on. if isErrObjectNotFound(herr) { return nil } - if herr != nil { - return herr - } - res.Type = madmin.HealItemBucketMetadata - return h.pushHealResultItem(res) + return herr }) } } @@ -603,26 +668,7 @@ func (h *healSequence) healDiskFormat() error { return errServerNotInitialized } - res, err := objectAPI.HealFormat(h.ctx, h.settings.DryRun) - // return any error, ignore error returned when disks have - // already healed. - if err != nil && err != errNoHealRequired { - return errFnHealFromAPIErr(h.ctx, err) - } - - // Healing succeeded notify the peers to reload format and re-initialize disks. - // We will not notify peers only if healing succeeded. - if err == nil { - for _, nerr := range globalNotificationSys.ReloadFormat(h.settings.DryRun) { - if nerr.Err != nil { - logger.GetReqInfo(h.ctx).SetTags("peerAddress", nerr.Host.String()) - logger.LogIf(h.ctx, nerr.Err) - } - } - } - - // Push format heal result - return h.pushHealResultItem(res) + return h.queueHealTask("/", madmin.HealItemMetadata) } // healBuckets - check for all buckets heal or just particular bucket. @@ -664,13 +710,7 @@ func (h *healSequence) healBucket(bucket string) error { return errServerNotInitialized } - result, err := objectAPI.HealBucket(h.ctx, bucket, h.settings.DryRun, h.settings.Remove) - // handle heal-bucket error - if err != nil { - return err - } - - if err = h.pushHealResultItem(result); err != nil { + if err := h.queueHealTask(bucket, madmin.HealItemBucket); err != nil { return err } @@ -678,7 +718,7 @@ func (h *healSequence) healBucket(bucket string) error { if h.objPrefix != "" { // Check if an object named as the objPrefix exists, // and if so heal it. - _, err = objectAPI.GetObjectInfo(h.ctx, bucket, h.objPrefix, ObjectOptions{}) + _, err := objectAPI.GetObjectInfo(h.ctx, bucket, h.objPrefix, ObjectOptions{}) if err == nil { if err = h.healObject(bucket, h.objPrefix); err != nil { return err @@ -689,8 +729,7 @@ func (h *healSequence) healBucket(bucket string) error { return nil } - if err = objectAPI.HealObjects(h.ctx, bucket, - h.objPrefix, h.healObject); err != nil { + if err := objectAPI.HealObjects(h.ctx, bucket, h.objPrefix, h.healObject); err != nil { return errFnHealFromAPIErr(h.ctx, err) } return nil @@ -702,28 +741,11 @@ func (h *healSequence) healObject(bucket, object string) error { return errHealStopSignalled } - if globalHTTPServer != nil { - // Wait at max 1 minute for an inprogress request - // before proceeding to heal - waitCount := 60 - // Any requests in progress, delay the heal. - for globalHTTPServer.GetRequestCount() > 2 && waitCount > 0 { - waitCount-- - time.Sleep(1 * time.Second) - } - } // Get current object layer instance. objectAPI := newObjectLayerFn() if objectAPI == nil { return errServerNotInitialized } - hri, err := objectAPI.HealObject(h.ctx, bucket, object, h.settings.DryRun, h.settings.Remove, h.settings.ScanMode) - if isErrObjectNotFound(err) { - return nil - } - if err != nil { - hri.Detail = err.Error() - } - return h.pushHealResultItem(hri) + return h.queueHealTask(pathJoin(bucket, object), madmin.HealItemObject) } diff --git a/cmd/background-heal-ops.go b/cmd/background-heal-ops.go new file mode 100644 index 000000000..3624284b2 --- /dev/null +++ b/cmd/background-heal-ops.go @@ -0,0 +1,160 @@ +/* + * MinIO Cloud Storage, (C) 2019 MinIO, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cmd + +import ( + "context" + "time" + + "github.com/minio/minio/cmd/logger" + "github.com/minio/minio/pkg/madmin" +) + +// healTask represents what to heal along with options +// path: '/' => Heal disk formats along with metadata +// path: 'bucket/' or '/bucket/' => Heal bucket +// path: 'bucket/object' => Heal object +type healTask struct { + path string + opts madmin.HealOpts + // Healing response will be sent here + responseCh chan healResult +} + +// healResult represents a healing result with a possible error +type healResult struct { + result madmin.HealResultItem + err error +} + +// healRoutine receives heal tasks, to heal buckets, objects and format.json +type healRoutine struct { + tasks chan healTask + doneCh chan struct{} +} + +// Add a new task in the tasks queue +func (h *healRoutine) queueHealTask(task healTask) { + h.tasks <- task +} + +// Wait for heal requests and process them +func (h *healRoutine) run() { + ctx := context.Background() + for { + select { + case task, ok := <-h.tasks: + if !ok { + break + } + if globalHTTPServer != nil { + // Wait at max 1 minute for an inprogress request + // before proceeding to heal + waitCount := 60 + // Any requests in progress, delay the heal. + for globalHTTPServer.GetRequestCount() > 2 && waitCount > 0 { + waitCount-- + time.Sleep(1 * time.Second) + } + } + + var res madmin.HealResultItem + var err error + bucket, object := urlPath2BucketObjectName(task.path) + switch { + case bucket == "" && object == "": + res, err = bgHealDiskFormat(ctx, task.opts) + case bucket != "" && object == "": + res, err = bgHealBucket(ctx, bucket, task.opts) + case bucket != "" && object != "": + res, err = bgHealObject(ctx, bucket, object, task.opts) + } + task.responseCh <- healResult{result: res, err: err} + case <-h.doneCh: + return + case <-GlobalServiceDoneCh: + return + } + } +} + +func initHealRoutine() *healRoutine { + return &healRoutine{ + tasks: make(chan healTask), + doneCh: make(chan struct{}), + } + +} + +func initBackgroundHealing() { + healBg := initHealRoutine() + go healBg.run() + + globalBackgroundHealing = healBg +} + +// bgHealDiskFormat - heals format.json, return value indicates if a +// failure error occurred. +func bgHealDiskFormat(ctx context.Context, opts madmin.HealOpts) (madmin.HealResultItem, error) { + // Get current object layer instance. + objectAPI := newObjectLayerFn() + if objectAPI == nil { + return madmin.HealResultItem{}, errServerNotInitialized + } + + res, err := objectAPI.HealFormat(ctx, opts.DryRun) + + // return any error, ignore error returned when disks have + // already healed. + if err != nil && err != errNoHealRequired { + return madmin.HealResultItem{}, err + } + + // Healing succeeded notify the peers to reload format and re-initialize disks. + // We will not notify peers if healing is not required. + if err == nil { + for _, nerr := range globalNotificationSys.ReloadFormat(opts.DryRun) { + if nerr.Err != nil { + logger.GetReqInfo(ctx).SetTags("peerAddress", nerr.Host.String()) + logger.LogIf(ctx, nerr.Err) + } + } + } + + return res, nil +} + +// bghealBucket - traverses and heals given bucket +func bgHealBucket(ctx context.Context, bucket string, opts madmin.HealOpts) (madmin.HealResultItem, error) { + // Get current object layer instance. + objectAPI := newObjectLayerFn() + if objectAPI == nil { + return madmin.HealResultItem{}, errServerNotInitialized + } + + return objectAPI.HealBucket(ctx, bucket, opts.DryRun, opts.Remove) +} + +// bgHealObject - heal the given object and record result +func bgHealObject(ctx context.Context, bucket, object string, opts madmin.HealOpts) (madmin.HealResultItem, error) { + // Get current object layer instance. + objectAPI := newObjectLayerFn() + if objectAPI == nil { + return madmin.HealResultItem{}, errServerNotInitialized + } + return objectAPI.HealObject(ctx, bucket, object, opts.DryRun, opts.Remove, opts.ScanMode) +} diff --git a/cmd/daily-heal-ops.go b/cmd/daily-heal-ops.go new file mode 100644 index 000000000..100084d5a --- /dev/null +++ b/cmd/daily-heal-ops.go @@ -0,0 +1,89 @@ +/* + * MinIO Cloud Storage, (C) 2019 MinIO, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cmd + +import ( + "context" + "sync" + "time" + + "github.com/minio/minio/cmd/logger" + "github.com/minio/minio/pkg/madmin" +) + +const ( + bgHealingUUID = "0000-0000-0000-0000" +) + +// NewBgHealSequence creates a background healing sequence +// operation which crawls all objects and heal them. +func newBgHealSequence(numDisks int) *healSequence { + + reqInfo := &logger.ReqInfo{API: "BackgroundHeal"} + ctx := logger.SetReqInfo(context.Background(), reqInfo) + + hs := madmin.HealOpts{ + // Remove objects that do not have read-quorum + Remove: true, + ScanMode: madmin.HealDeepScan, + } + + return &healSequence{ + sourceCh: make(chan string), + startTime: UTCNow(), + clientToken: bgHealingUUID, + settings: hs, + currentStatus: healSequenceStatus{ + Summary: healNotStartedStatus, + HealSettings: hs, + NumDisks: numDisks, + updateLock: &sync.RWMutex{}, + }, + traverseAndHealDoneCh: make(chan error), + stopSignalCh: make(chan struct{}), + ctx: ctx, + reportProgress: false, + } +} + +func initDailyHeal() { + go startDailyHeal() +} + +func startDailyHeal() { + var objAPI ObjectLayer + var ctx = context.Background() + + // Wait until the object API is ready + for { + objAPI = newObjectLayerFn() + if objAPI == nil { + time.Sleep(time.Second) + continue + } + break + } + + // Find number of disks in the setup + info := objAPI.StorageInfo(ctx) + numDisks := info.Backend.OnlineDisks + info.Backend.OfflineDisks + + nh := newBgHealSequence(numDisks) + globalSweepHealState.LaunchNewHealSequence(nh) + + registerDailySweepListener(nh.sourceCh) +} diff --git a/cmd/daily-sweeper.go b/cmd/daily-sweeper.go new file mode 100644 index 000000000..9ca790a78 --- /dev/null +++ b/cmd/daily-sweeper.go @@ -0,0 +1,145 @@ +/* + * MinIO Cloud Storage, (C) 2019 MinIO, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cmd + +import ( + "context" + "sync" + "time" + + "github.com/minio/minio/cmd/logger" +) + +// The list of modules listening for the daily listing of all objects +// such as the daily heal ops, disk usage and bucket lifecycle management. +var globalDailySweepListeners = make([]chan string, 0) +var globalDailySweepListenersMu = sync.Mutex{} + +// Add a new listener to the daily objects listing +func registerDailySweepListener(ch chan string) { + globalDailySweepListenersMu.Lock() + defer globalDailySweepListenersMu.Unlock() + + globalDailySweepListeners = append(globalDailySweepListeners, ch) +} + +// Safe copy of globalDailySweepListeners content +func copyDailySweepListeners() []chan string { + globalDailySweepListenersMu.Lock() + defer globalDailySweepListenersMu.Unlock() + + var listenersCopy = make([]chan string, len(globalDailySweepListeners)) + copy(listenersCopy, globalDailySweepListeners) + + return listenersCopy +} + +// sweepRound will list all objects, having read quorum or not and +// feeds to all listeners, such as the background healing +func sweepRound(ctx context.Context, objAPI ObjectLayer) error { + zeroDuration := time.Millisecond + zeroDynamicTimeout := newDynamicTimeout(zeroDuration, zeroDuration) + + // General lock so we avoid parallel daily sweep by different instances. + sweepLock := globalNSMutex.NewNSLock("system", "daily-sweep") + if err := sweepLock.GetLock(zeroDynamicTimeout); err != nil { + return err + } + defer sweepLock.Unlock() + + buckets, err := objAPI.ListBuckets(ctx) + if err != nil { + return err + } + + // List all objects, having read quorum or not in all buckets + // and send them to all the registered sweep listeners + for _, bucket := range buckets { + // Send bucket names to all listeners + for _, l := range copyDailySweepListeners() { + l <- bucket.Name + } + + marker := "" + for { + res, err := objAPI.ListObjectsHeal(ctx, bucket.Name, "", marker, "", 1000) + if err != nil { + continue + } + for _, obj := range res.Objects { + for _, l := range copyDailySweepListeners() { + l <- pathJoin(bucket.Name, obj.Name) + } + } + if !res.IsTruncated { + break + } else { + marker = res.NextMarker + } + } + } + + return nil +} + +// initDailySweeper creates a go-routine which will list all +// objects in all buckets in a daily basis +func initDailySweeper() { + go dailySweeper() +} + +// List all objects in all buckets in a daily basis +func dailySweeper() { + var lastSweepTime time.Time + var objAPI ObjectLayer + + var ctx = context.Background() + + // Wait until the object layer is ready + for { + objAPI = newObjectLayerFn() + if objAPI == nil { + time.Sleep(time.Second) + continue + } + break + } + + // Perform a sweep round each 24 hours + for { + if time.Since(lastSweepTime) < 24*time.Hour { + time.Sleep(time.Hour) + continue + } + + err := sweepRound(ctx, objAPI) + if err != nil { + switch err.(type) { + // Unable to hold a lock means there is another + // instance doing the sweep round + case OperationTimedOut: + lastSweepTime = time.Now() + default: + logger.LogIf(ctx, err) + time.Sleep(time.Minute) + continue + } + } else { + lastSweepTime = time.Now() + } + } +} diff --git a/cmd/disk-cache.go b/cmd/disk-cache.go index ed84bf6cb..ef76560e9 100644 --- a/cmd/disk-cache.go +++ b/cmd/disk-cache.go @@ -435,7 +435,7 @@ func (c cacheObjects) listCacheObjects(ctx context.Context, bucket, prefix, mark if delimiter == slashSeparator { recursive = false } - walkResultCh, endWalkCh := c.listPool.Release(listParams{bucket, recursive, marker, prefix}) + walkResultCh, endWalkCh := c.listPool.Release(listParams{bucket, recursive, marker, prefix, false}) if walkResultCh == nil { endWalkCh = make(chan struct{}) @@ -494,7 +494,7 @@ func (c cacheObjects) listCacheObjects(ctx context.Context, bucket, prefix, mark } } - params := listParams{bucket, recursive, nextMarker, prefix} + params := listParams{bucket, recursive, nextMarker, prefix, false} if !eof { c.listPool.Set(params, walkResultCh, endWalkCh) } diff --git a/cmd/fs-v1.go b/cmd/fs-v1.go index 266a957b4..60101b6f7 100644 --- a/cmd/fs-v1.go +++ b/cmd/fs-v1.go @@ -1151,6 +1151,12 @@ func (fs *FSObjects) ListBucketsHeal(ctx context.Context) ([]BucketInfo, error) return []BucketInfo{}, NotImplemented{} } +// ListObjectsHeal - list all objects to be healed. Valid only for XL +func (fs *FSObjects) ListObjectsHeal(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (result ListObjectsInfo, err error) { + logger.LogIf(ctx, NotImplemented{}) + return ListObjectsInfo{}, NotImplemented{} +} + // SetBucketPolicy sets policy on bucket func (fs *FSObjects) SetBucketPolicy(ctx context.Context, bucket string, policy *policy.Policy) error { return savePolicyConfig(ctx, fs, bucket, policy) diff --git a/cmd/gateway-unsupported.go b/cmd/gateway-unsupported.go index 844cdd1ef..ec4090373 100644 --- a/cmd/gateway-unsupported.go +++ b/cmd/gateway-unsupported.go @@ -101,6 +101,11 @@ func (a GatewayUnsupported) ListBucketsHeal(ctx context.Context) (buckets []Buck return nil, NotImplemented{} } +// ListObjectsHeal - Not implemented stub +func (a GatewayUnsupported) ListObjectsHeal(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (result ListObjectsInfo, err error) { + return ListObjectsInfo{}, NotImplemented{} +} + // HealObject - Not implemented stub func (a GatewayUnsupported) HealObject(ctx context.Context, bucket, object string, dryRun, remove bool, scanMode madmin.HealScanMode) (h madmin.HealResultItem, e error) { return h, NotImplemented{} diff --git a/cmd/globals.go b/cmd/globals.go index e45d34847..f186d5820 100644 --- a/cmd/globals.go +++ b/cmd/globals.go @@ -261,6 +261,11 @@ var ( // GlobalGatewaySSE sse options GlobalGatewaySSE gatewaySSE + // The always present healing routine ready to heal objects + globalBackgroundHealing *healRoutine + globalAllHealState *allHealState + globalSweepHealState *allHealState + // Add new variable global values here. ) diff --git a/cmd/object-api-common.go b/cmd/object-api-common.go index ceb80bb7b..a083f21a7 100644 --- a/cmd/object-api-common.go +++ b/cmd/object-api-common.go @@ -272,7 +272,7 @@ func listObjects(ctx context.Context, obj ObjectLayer, bucket, prefix, marker, d recursive = false } - walkResultCh, endWalkCh := tpool.Release(listParams{bucket, recursive, marker, prefix}) + walkResultCh, endWalkCh := tpool.Release(listParams{bucket, recursive, marker, prefix, false}) if walkResultCh == nil { endWalkCh = make(chan struct{}) walkResultCh = startTreeWalk(ctx, bucket, prefix, marker, recursive, listDir, endWalkCh) @@ -333,7 +333,7 @@ func listObjects(ctx context.Context, obj ObjectLayer, bucket, prefix, marker, d } // Save list routine for the next marker if we haven't reached EOF. - params := listParams{bucket, recursive, nextMarker, prefix} + params := listParams{bucket, recursive, nextMarker, prefix, false} if !eof { tpool.Set(params, walkResultCh, endWalkCh) } diff --git a/cmd/object-api-interface.go b/cmd/object-api-interface.go index 4e9e0896e..7f6c838c3 100644 --- a/cmd/object-api-interface.go +++ b/cmd/object-api-interface.go @@ -90,9 +90,11 @@ type ObjectLayer interface { HealFormat(ctx context.Context, dryRun bool) (madmin.HealResultItem, error) HealBucket(ctx context.Context, bucket string, dryRun, remove bool) (madmin.HealResultItem, error) HealObject(ctx context.Context, bucket, object string, dryRun, remove bool, scanMode madmin.HealScanMode) (madmin.HealResultItem, error) - ListBucketsHeal(ctx context.Context) (buckets []BucketInfo, err error) HealObjects(ctx context.Context, bucket, prefix string, healObjectFn func(string, string) error) error + ListBucketsHeal(ctx context.Context) (buckets []BucketInfo, err error) + ListObjectsHeal(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (result ListObjectsInfo, err error) + // Policy operations SetBucketPolicy(context.Context, string, *policy.Policy) error GetBucketPolicy(context.Context, string) (*policy.Policy, error) diff --git a/cmd/server-main.go b/cmd/server-main.go index af7b6775f..6cf953e50 100644 --- a/cmd/server-main.go +++ b/cmd/server-main.go @@ -287,8 +287,11 @@ func serverMain(ctx *cli.Context) { // Initialize name space lock. initNSLock(globalIsDistXL) - // Init global heal state - initAllHealState(globalIsXL) + if globalIsXL { + // Init global heal state + globalAllHealState = initHealState() + globalSweepHealState = initHealState() + } // initialize globalTrace system globalTrace = NewTraceSys(context.Background(), globalEndpoints) @@ -372,6 +375,12 @@ func serverMain(ctx *cli.Context) { logger.Fatal(errors.New("Invalid KMS configuration"), "auto-encryption is enabled but server does not support encryption") } + if globalIsXL { + initBackgroundHealing() + initDailyHeal() + initDailySweeper() + } + globalObjLayerMutex.Lock() globalObjectAPI = newObject globalObjLayerMutex.Unlock() diff --git a/cmd/test-utils_test.go b/cmd/test-utils_test.go index 7c9c38f9a..89c0de8e0 100644 --- a/cmd/test-utils_test.go +++ b/cmd/test-utils_test.go @@ -470,6 +470,9 @@ func resetGlobalStorageEnvs() { // reset global heal state func resetGlobalHealState() { + if globalAllHealState == nil { + return + } globalAllHealState.Lock() defer globalAllHealState.Unlock() for _, v := range globalAllHealState.healSeqMap { diff --git a/cmd/tree-walk-pool.go b/cmd/tree-walk-pool.go index df09a5dec..d3e50b94f 100644 --- a/cmd/tree-walk-pool.go +++ b/cmd/tree-walk-pool.go @@ -34,6 +34,7 @@ type listParams struct { recursive bool marker string prefix string + heal bool } // errWalkAbort - returned by doTreeWalk() if it returns prematurely. diff --git a/cmd/xl-sets.go b/cmd/xl-sets.go index e3b7ed203..b5071d839 100644 --- a/cmd/xl-sets.go +++ b/cmd/xl-sets.go @@ -844,6 +844,10 @@ func leastEntry(entriesCh []FileInfoCh, readQuorum int) (FileInfo, bool) { entriesCh[i].Push(entries[i]) } + if readQuorum < 0 { + return lentry, isTruncated + } + quorum := lentry.Quorum if quorum == 0 { quorum = readQuorum @@ -906,7 +910,7 @@ func (s *xlSets) startMergeWalks(ctx context.Context, bucket, prefix, marker str // ListObjects - implements listing of objects across disks, each disk is indepenently // walked and merged at this layer. Resulting value through the merge process sends // the data in lexically sorted order. -func (s *xlSets) ListObjects(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (loi ListObjectsInfo, err error) { +func (s *xlSets) listObjects(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int, heal bool) (loi ListObjectsInfo, err error) { if err = checkListObjsArgs(ctx, bucket, prefix, marker, delimiter, s); err != nil { return loi, err } @@ -944,13 +948,18 @@ func (s *xlSets) ListObjects(ctx context.Context, bucket, prefix, marker, delimi recursive = false } - entryChs, endWalkCh := s.pool.Release(listParams{bucket, recursive, marker, prefix}) + entryChs, endWalkCh := s.pool.Release(listParams{bucket, recursive, marker, prefix, heal}) if entryChs == nil { endWalkCh = make(chan struct{}) entryChs = s.startMergeWalks(context.Background(), bucket, prefix, marker, recursive, endWalkCh) } - entries := mergeEntriesCh(entryChs, maxKeys, s.drivesPerSet/2) + readQuorum := s.drivesPerSet / 2 + if heal { + readQuorum = -1 + } + + entries := mergeEntriesCh(entryChs, maxKeys, readQuorum) if len(entries.Files) == 0 { return loi, nil } @@ -1004,11 +1013,18 @@ func (s *xlSets) ListObjects(ctx context.Context, bucket, prefix, marker, delimi loi.Objects = append(loi.Objects, objInfo) } if loi.IsTruncated { - s.pool.Set(listParams{bucket, recursive, loi.NextMarker, prefix}, entryChs, endWalkCh) + s.pool.Set(listParams{bucket, recursive, loi.NextMarker, prefix, heal}, entryChs, endWalkCh) } return loi, nil } +// ListObjects - implements listing of objects across disks, each disk is indepenently +// walked and merged at this layer. Resulting value through the merge process sends +// the data in lexically sorted order. +func (s *xlSets) ListObjects(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (loi ListObjectsInfo, err error) { + return s.listObjects(ctx, bucket, prefix, marker, delimiter, maxKeys, false) +} + func (s *xlSets) ListMultipartUploads(ctx context.Context, bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (result ListMultipartsInfo, err error) { // In list multipart uploads we are going to treat input prefix as the object, // this means that we are not supporting directory navigation. @@ -1536,3 +1552,7 @@ func (s *xlSets) HealObjects(ctx context.Context, bucket, prefix string, healObj return nil } + +func (s *xlSets) ListObjectsHeal(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (loi ListObjectsInfo, err error) { + return s.listObjects(ctx, bucket, prefix, marker, delimiter, maxKeys, true) +} diff --git a/cmd/xl-v1-list-objects-heal.go b/cmd/xl-v1-list-objects-heal.go index 45316275e..c43f329d7 100644 --- a/cmd/xl-v1-list-objects-heal.go +++ b/cmd/xl-v1-list-objects-heal.go @@ -23,6 +23,11 @@ func (xl xlObjects) ListBucketsHeal(ctx context.Context) ([]BucketInfo, error) { return nil, nil } +// This is not implemented, look for xl-sets.ListObjectsHeal() +func (xl xlObjects) ListObjectsHeal(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (loi ListObjectsInfo, err error) { + return ListObjectsInfo{}, nil +} + // This is not implemented/needed anymore, look for xl-sets.HealObjects() func (xl xlObjects) HealObjects(ctx context.Context, bucket, prefix string, healFn func(string, string) error) (e error) { return nil diff --git a/cmd/xl-v1-list-objects.go b/cmd/xl-v1-list-objects.go index 6083dda02..c6d0d34ae 100644 --- a/cmd/xl-v1-list-objects.go +++ b/cmd/xl-v1-list-objects.go @@ -67,7 +67,7 @@ func (xl xlObjects) listObjects(ctx context.Context, bucket, prefix, marker, del recursive = false } - walkResultCh, endWalkCh := xl.listPool.Release(listParams{bucket, recursive, marker, prefix}) + walkResultCh, endWalkCh := xl.listPool.Release(listParams{bucket, recursive, marker, prefix, false}) if walkResultCh == nil { endWalkCh = make(chan struct{}) listDir := listDirFactory(ctx, xl.getLoadBalancedDisks()...) @@ -118,7 +118,7 @@ func (xl xlObjects) listObjects(ctx context.Context, bucket, prefix, marker, del } } - params := listParams{bucket, recursive, nextMarker, prefix} + params := listParams{bucket, recursive, nextMarker, prefix, false} if !eof { xl.listPool.Set(params, walkResultCh, endWalkCh) }