Add self-healing feature (#7604)

- Background Heal routine receives heal requests from a channel, either to
heal format, buckets or objects
- Daily sweeper lists all objects in all buckets, these objects
don't necessarly have read quorum so they can be removed if
these objects are unhealable
- Heal daily ops receives objects from the daily sweeper
and send them to the heal routine.
master
Anis Elleuch 5 years ago committed by kannappanr
parent 97090aa16c
commit 7abadfccc2
  1. 4
      cmd/admin-handlers_test.go
  2. 216
      cmd/admin-heal-ops.go
  3. 160
      cmd/background-heal-ops.go
  4. 89
      cmd/daily-heal-ops.go
  5. 145
      cmd/daily-sweeper.go
  6. 4
      cmd/disk-cache.go
  7. 6
      cmd/fs-v1.go
  8. 5
      cmd/gateway-unsupported.go
  9. 5
      cmd/globals.go
  10. 4
      cmd/object-api-common.go
  11. 4
      cmd/object-api-interface.go
  12. 13
      cmd/server-main.go
  13. 3
      cmd/test-utils_test.go
  14. 1
      cmd/tree-walk-pool.go
  15. 28
      cmd/xl-sets.go
  16. 5
      cmd/xl-v1-list-objects-heal.go
  17. 4
      cmd/xl-v1-list-objects.go

@ -268,7 +268,9 @@ func prepareAdminXLTestBed() (*adminXLTestBed, error) {
initNSLock(isDistXL)
// Init global heal state
initAllHealState(globalIsXL)
if globalIsXL {
globalAllHealState = initHealState()
}
globalConfigSys = NewConfigSys()

@ -96,22 +96,15 @@ type allHealState struct {
healSeqMap map[string]*healSequence
}
var (
// global server heal state
globalAllHealState allHealState
)
// initAllHealState - initialize healing apparatus
func initAllHealState(isErasureMode bool) {
if !isErasureMode {
return
}
globalAllHealState = allHealState{
// initHealState - initialize healing apparatus
func initHealState() *allHealState {
healState := &allHealState{
healSeqMap: make(map[string]*healSequence),
}
go globalAllHealState.periodicHealSeqsClean()
go healState.periodicHealSeqsClean()
return healState
}
func (ahs *allHealState) periodicHealSeqsClean() {
@ -305,6 +298,14 @@ type healSequence struct {
// path is just pathJoin(bucket, objPrefix)
path string
// List of entities (format, buckets, objects) to heal
sourceCh chan string
// Report healing progress, false if this is a background
// healing since currently there is no entity which will
// receive realtime healing status
reportProgress bool
// time at which heal sequence was started
startTime time.Time
@ -348,14 +349,15 @@ func newHealSequence(bucket, objPrefix, clientAddr string,
ctx := logger.SetReqInfo(context.Background(), reqInfo)
return &healSequence{
bucket: bucket,
objPrefix: objPrefix,
path: pathJoin(bucket, objPrefix),
startTime: UTCNow(),
clientToken: mustGetUUID(),
clientAddress: clientAddr,
forceStarted: forceStart,
settings: hs,
bucket: bucket,
objPrefix: objPrefix,
path: pathJoin(bucket, objPrefix),
reportProgress: true,
startTime: UTCNow(),
clientToken: mustGetUUID(),
clientAddress: clientAddr,
forceStarted: forceStart,
settings: hs,
currentStatus: healSequenceStatus{
Summary: healNotStartedStatus,
HealSettings: hs,
@ -484,7 +486,11 @@ func (h *healSequence) healSequenceStart() {
h.currentStatus.StartTime = UTCNow()
h.currentStatus.updateLock.Unlock()
go h.traverseAndHeal()
if h.sourceCh == nil {
go h.traverseAndHeal()
} else {
go h.healFromSourceCh()
}
select {
case err, ok := <-h.traverseAndHealDoneCh:
@ -519,39 +525,101 @@ func (h *healSequence) healSequenceStart() {
}
}
// traverseAndHeal - traverses on-disk data and performs healing
// according to settings. At each "safe" point it also checks if an
// external quit signal has been received and quits if so. Since the
// healing traversal may be mutating on-disk data when an external
// quit signal is received, this routine cannot quit immediately and
// has to wait until a safe point is reached, such as between scanning
// two objects.
func (h *healSequence) traverseAndHeal() {
var err error
checkErr := func(f func() error) {
func (h *healSequence) queueHealTask(path string, healType madmin.HealItemType) error {
var respCh = make(chan healResult)
defer close(respCh)
// Send heal request
globalBackgroundHealing.queueHealTask(healTask{path: path, responseCh: respCh, opts: h.settings})
// Wait for answer and push result to the client
res := <-respCh
if !h.reportProgress {
return nil
}
res.result.Type = healType
if res.err != nil {
// Object might have been deleted, by the time heal
// was attempted, we should ignore this object and return success.
if isErrObjectNotFound(res.err) {
return nil
}
// Only report object error
if healType != madmin.HealItemObject {
return res.err
}
res.result.Detail = res.err.Error()
}
return h.pushHealResultItem(res.result)
}
func (h *healSequence) healItemsFromSourceCh() error {
// Start healing the config prefix.
if err := h.healMinioSysMeta(minioConfigPrefix)(); err != nil {
return err
}
// Start healing the bucket config prefix.
if err := h.healMinioSysMeta(bucketConfigPrefix)(); err != nil {
return err
}
for path := range h.sourceCh {
var itemType madmin.HealItemType
switch {
case err != nil:
return
case h.isQuitting():
err = errHealStopSignalled
return
case path == "/":
itemType = madmin.HealItemMetadata
case !strings.Contains(path, "/"):
itemType = madmin.HealItemBucket
default:
itemType = madmin.HealItemObject
}
if err := h.queueHealTask(path, itemType); err != nil {
return err
}
err = f()
}
return nil
}
func (h *healSequence) healFromSourceCh() {
if err := h.healItemsFromSourceCh(); err != nil {
h.traverseAndHealDoneCh <- err
}
close(h.traverseAndHealDoneCh)
}
func (h *healSequence) healItems() error {
// Start with format healing
checkErr(h.healDiskFormat)
if err := h.healDiskFormat(); err != nil {
return err
}
// Start healing the config prefix.
checkErr(h.healMinioSysMeta(minioConfigPrefix))
if err := h.healMinioSysMeta(minioConfigPrefix)(); err != nil {
return err
}
// Start healing the bucket config prefix.
checkErr(h.healMinioSysMeta(bucketConfigPrefix))
if err := h.healMinioSysMeta(bucketConfigPrefix)(); err != nil {
return err
}
// Heal buckets and objects
checkErr(h.healBuckets)
return h.healBuckets()
}
if err != nil {
// traverseAndHeal - traverses on-disk data and performs healing
// according to settings. At each "safe" point it also checks if an
// external quit signal has been received and quits if so. Since the
// healing traversal may be mutating on-disk data when an external
// quit signal is received, this routine cannot quit immediately and
// has to wait until a safe point is reached, such as between scanning
// two objects.
func (h *healSequence) traverseAndHeal() {
if err := h.healItems(); err != nil {
if h.isQuitting() {
err = errHealStopSignalled
}
h.traverseAndHealDoneCh <- err
}
@ -575,17 +643,14 @@ func (h *healSequence) healMinioSysMeta(metaPrefix string) func() error {
if h.isQuitting() {
return errHealStopSignalled
}
res, herr := objectAPI.HealObject(h.ctx, bucket, object, h.settings.DryRun, h.settings.Remove, h.settings.ScanMode)
herr := h.queueHealTask(pathJoin(bucket, object), madmin.HealItemBucketMetadata)
// Object might have been deleted, by the time heal
// was attempted we ignore this object an move on.
if isErrObjectNotFound(herr) {
return nil
}
if herr != nil {
return herr
}
res.Type = madmin.HealItemBucketMetadata
return h.pushHealResultItem(res)
return herr
})
}
}
@ -603,26 +668,7 @@ func (h *healSequence) healDiskFormat() error {
return errServerNotInitialized
}
res, err := objectAPI.HealFormat(h.ctx, h.settings.DryRun)
// return any error, ignore error returned when disks have
// already healed.
if err != nil && err != errNoHealRequired {
return errFnHealFromAPIErr(h.ctx, err)
}
// Healing succeeded notify the peers to reload format and re-initialize disks.
// We will not notify peers only if healing succeeded.
if err == nil {
for _, nerr := range globalNotificationSys.ReloadFormat(h.settings.DryRun) {
if nerr.Err != nil {
logger.GetReqInfo(h.ctx).SetTags("peerAddress", nerr.Host.String())
logger.LogIf(h.ctx, nerr.Err)
}
}
}
// Push format heal result
return h.pushHealResultItem(res)
return h.queueHealTask("/", madmin.HealItemMetadata)
}
// healBuckets - check for all buckets heal or just particular bucket.
@ -664,13 +710,7 @@ func (h *healSequence) healBucket(bucket string) error {
return errServerNotInitialized
}
result, err := objectAPI.HealBucket(h.ctx, bucket, h.settings.DryRun, h.settings.Remove)
// handle heal-bucket error
if err != nil {
return err
}
if err = h.pushHealResultItem(result); err != nil {
if err := h.queueHealTask(bucket, madmin.HealItemBucket); err != nil {
return err
}
@ -678,7 +718,7 @@ func (h *healSequence) healBucket(bucket string) error {
if h.objPrefix != "" {
// Check if an object named as the objPrefix exists,
// and if so heal it.
_, err = objectAPI.GetObjectInfo(h.ctx, bucket, h.objPrefix, ObjectOptions{})
_, err := objectAPI.GetObjectInfo(h.ctx, bucket, h.objPrefix, ObjectOptions{})
if err == nil {
if err = h.healObject(bucket, h.objPrefix); err != nil {
return err
@ -689,8 +729,7 @@ func (h *healSequence) healBucket(bucket string) error {
return nil
}
if err = objectAPI.HealObjects(h.ctx, bucket,
h.objPrefix, h.healObject); err != nil {
if err := objectAPI.HealObjects(h.ctx, bucket, h.objPrefix, h.healObject); err != nil {
return errFnHealFromAPIErr(h.ctx, err)
}
return nil
@ -702,28 +741,11 @@ func (h *healSequence) healObject(bucket, object string) error {
return errHealStopSignalled
}
if globalHTTPServer != nil {
// Wait at max 1 minute for an inprogress request
// before proceeding to heal
waitCount := 60
// Any requests in progress, delay the heal.
for globalHTTPServer.GetRequestCount() > 2 && waitCount > 0 {
waitCount--
time.Sleep(1 * time.Second)
}
}
// Get current object layer instance.
objectAPI := newObjectLayerFn()
if objectAPI == nil {
return errServerNotInitialized
}
hri, err := objectAPI.HealObject(h.ctx, bucket, object, h.settings.DryRun, h.settings.Remove, h.settings.ScanMode)
if isErrObjectNotFound(err) {
return nil
}
if err != nil {
hri.Detail = err.Error()
}
return h.pushHealResultItem(hri)
return h.queueHealTask(pathJoin(bucket, object), madmin.HealItemObject)
}

@ -0,0 +1,160 @@
/*
* MinIO Cloud Storage, (C) 2019 MinIO, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import (
"context"
"time"
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/madmin"
)
// healTask represents what to heal along with options
// path: '/' => Heal disk formats along with metadata
// path: 'bucket/' or '/bucket/' => Heal bucket
// path: 'bucket/object' => Heal object
type healTask struct {
path string
opts madmin.HealOpts
// Healing response will be sent here
responseCh chan healResult
}
// healResult represents a healing result with a possible error
type healResult struct {
result madmin.HealResultItem
err error
}
// healRoutine receives heal tasks, to heal buckets, objects and format.json
type healRoutine struct {
tasks chan healTask
doneCh chan struct{}
}
// Add a new task in the tasks queue
func (h *healRoutine) queueHealTask(task healTask) {
h.tasks <- task
}
// Wait for heal requests and process them
func (h *healRoutine) run() {
ctx := context.Background()
for {
select {
case task, ok := <-h.tasks:
if !ok {
break
}
if globalHTTPServer != nil {
// Wait at max 1 minute for an inprogress request
// before proceeding to heal
waitCount := 60
// Any requests in progress, delay the heal.
for globalHTTPServer.GetRequestCount() > 2 && waitCount > 0 {
waitCount--
time.Sleep(1 * time.Second)
}
}
var res madmin.HealResultItem
var err error
bucket, object := urlPath2BucketObjectName(task.path)
switch {
case bucket == "" && object == "":
res, err = bgHealDiskFormat(ctx, task.opts)
case bucket != "" && object == "":
res, err = bgHealBucket(ctx, bucket, task.opts)
case bucket != "" && object != "":
res, err = bgHealObject(ctx, bucket, object, task.opts)
}
task.responseCh <- healResult{result: res, err: err}
case <-h.doneCh:
return
case <-GlobalServiceDoneCh:
return
}
}
}
func initHealRoutine() *healRoutine {
return &healRoutine{
tasks: make(chan healTask),
doneCh: make(chan struct{}),
}
}
func initBackgroundHealing() {
healBg := initHealRoutine()
go healBg.run()
globalBackgroundHealing = healBg
}
// bgHealDiskFormat - heals format.json, return value indicates if a
// failure error occurred.
func bgHealDiskFormat(ctx context.Context, opts madmin.HealOpts) (madmin.HealResultItem, error) {
// Get current object layer instance.
objectAPI := newObjectLayerFn()
if objectAPI == nil {
return madmin.HealResultItem{}, errServerNotInitialized
}
res, err := objectAPI.HealFormat(ctx, opts.DryRun)
// return any error, ignore error returned when disks have
// already healed.
if err != nil && err != errNoHealRequired {
return madmin.HealResultItem{}, err
}
// Healing succeeded notify the peers to reload format and re-initialize disks.
// We will not notify peers if healing is not required.
if err == nil {
for _, nerr := range globalNotificationSys.ReloadFormat(opts.DryRun) {
if nerr.Err != nil {
logger.GetReqInfo(ctx).SetTags("peerAddress", nerr.Host.String())
logger.LogIf(ctx, nerr.Err)
}
}
}
return res, nil
}
// bghealBucket - traverses and heals given bucket
func bgHealBucket(ctx context.Context, bucket string, opts madmin.HealOpts) (madmin.HealResultItem, error) {
// Get current object layer instance.
objectAPI := newObjectLayerFn()
if objectAPI == nil {
return madmin.HealResultItem{}, errServerNotInitialized
}
return objectAPI.HealBucket(ctx, bucket, opts.DryRun, opts.Remove)
}
// bgHealObject - heal the given object and record result
func bgHealObject(ctx context.Context, bucket, object string, opts madmin.HealOpts) (madmin.HealResultItem, error) {
// Get current object layer instance.
objectAPI := newObjectLayerFn()
if objectAPI == nil {
return madmin.HealResultItem{}, errServerNotInitialized
}
return objectAPI.HealObject(ctx, bucket, object, opts.DryRun, opts.Remove, opts.ScanMode)
}

@ -0,0 +1,89 @@
/*
* MinIO Cloud Storage, (C) 2019 MinIO, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import (
"context"
"sync"
"time"
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/madmin"
)
const (
bgHealingUUID = "0000-0000-0000-0000"
)
// NewBgHealSequence creates a background healing sequence
// operation which crawls all objects and heal them.
func newBgHealSequence(numDisks int) *healSequence {
reqInfo := &logger.ReqInfo{API: "BackgroundHeal"}
ctx := logger.SetReqInfo(context.Background(), reqInfo)
hs := madmin.HealOpts{
// Remove objects that do not have read-quorum
Remove: true,
ScanMode: madmin.HealDeepScan,
}
return &healSequence{
sourceCh: make(chan string),
startTime: UTCNow(),
clientToken: bgHealingUUID,
settings: hs,
currentStatus: healSequenceStatus{
Summary: healNotStartedStatus,
HealSettings: hs,
NumDisks: numDisks,
updateLock: &sync.RWMutex{},
},
traverseAndHealDoneCh: make(chan error),
stopSignalCh: make(chan struct{}),
ctx: ctx,
reportProgress: false,
}
}
func initDailyHeal() {
go startDailyHeal()
}
func startDailyHeal() {
var objAPI ObjectLayer
var ctx = context.Background()
// Wait until the object API is ready
for {
objAPI = newObjectLayerFn()
if objAPI == nil {
time.Sleep(time.Second)
continue
}
break
}
// Find number of disks in the setup
info := objAPI.StorageInfo(ctx)
numDisks := info.Backend.OnlineDisks + info.Backend.OfflineDisks
nh := newBgHealSequence(numDisks)
globalSweepHealState.LaunchNewHealSequence(nh)
registerDailySweepListener(nh.sourceCh)
}

@ -0,0 +1,145 @@
/*
* MinIO Cloud Storage, (C) 2019 MinIO, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import (
"context"
"sync"
"time"
"github.com/minio/minio/cmd/logger"
)
// The list of modules listening for the daily listing of all objects
// such as the daily heal ops, disk usage and bucket lifecycle management.
var globalDailySweepListeners = make([]chan string, 0)
var globalDailySweepListenersMu = sync.Mutex{}
// Add a new listener to the daily objects listing
func registerDailySweepListener(ch chan string) {
globalDailySweepListenersMu.Lock()
defer globalDailySweepListenersMu.Unlock()
globalDailySweepListeners = append(globalDailySweepListeners, ch)
}
// Safe copy of globalDailySweepListeners content
func copyDailySweepListeners() []chan string {
globalDailySweepListenersMu.Lock()
defer globalDailySweepListenersMu.Unlock()
var listenersCopy = make([]chan string, len(globalDailySweepListeners))
copy(listenersCopy, globalDailySweepListeners)
return listenersCopy
}
// sweepRound will list all objects, having read quorum or not and
// feeds to all listeners, such as the background healing
func sweepRound(ctx context.Context, objAPI ObjectLayer) error {
zeroDuration := time.Millisecond
zeroDynamicTimeout := newDynamicTimeout(zeroDuration, zeroDuration)
// General lock so we avoid parallel daily sweep by different instances.
sweepLock := globalNSMutex.NewNSLock("system", "daily-sweep")
if err := sweepLock.GetLock(zeroDynamicTimeout); err != nil {
return err
}
defer sweepLock.Unlock()
buckets, err := objAPI.ListBuckets(ctx)
if err != nil {
return err
}
// List all objects, having read quorum or not in all buckets
// and send them to all the registered sweep listeners
for _, bucket := range buckets {
// Send bucket names to all listeners
for _, l := range copyDailySweepListeners() {
l <- bucket.Name
}
marker := ""
for {
res, err := objAPI.ListObjectsHeal(ctx, bucket.Name, "", marker, "", 1000)
if err != nil {
continue
}
for _, obj := range res.Objects {
for _, l := range copyDailySweepListeners() {
l <- pathJoin(bucket.Name, obj.Name)
}
}
if !res.IsTruncated {
break
} else {
marker = res.NextMarker
}
}
}
return nil
}
// initDailySweeper creates a go-routine which will list all
// objects in all buckets in a daily basis
func initDailySweeper() {
go dailySweeper()
}
// List all objects in all buckets in a daily basis
func dailySweeper() {
var lastSweepTime time.Time
var objAPI ObjectLayer
var ctx = context.Background()
// Wait until the object layer is ready
for {
objAPI = newObjectLayerFn()
if objAPI == nil {
time.Sleep(time.Second)
continue
}
break
}
// Perform a sweep round each 24 hours
for {
if time.Since(lastSweepTime) < 24*time.Hour {
time.Sleep(time.Hour)
continue
}
err := sweepRound(ctx, objAPI)
if err != nil {
switch err.(type) {
// Unable to hold a lock means there is another
// instance doing the sweep round
case OperationTimedOut:
lastSweepTime = time.Now()
default:
logger.LogIf(ctx, err)
time.Sleep(time.Minute)
continue
}
} else {
lastSweepTime = time.Now()
}
}
}

@ -435,7 +435,7 @@ func (c cacheObjects) listCacheObjects(ctx context.Context, bucket, prefix, mark
if delimiter == slashSeparator {
recursive = false
}
walkResultCh, endWalkCh := c.listPool.Release(listParams{bucket, recursive, marker, prefix})
walkResultCh, endWalkCh := c.listPool.Release(listParams{bucket, recursive, marker, prefix, false})
if walkResultCh == nil {
endWalkCh = make(chan struct{})
@ -494,7 +494,7 @@ func (c cacheObjects) listCacheObjects(ctx context.Context, bucket, prefix, mark
}
}
params := listParams{bucket, recursive, nextMarker, prefix}
params := listParams{bucket, recursive, nextMarker, prefix, false}
if !eof {
c.listPool.Set(params, walkResultCh, endWalkCh)
}

@ -1151,6 +1151,12 @@ func (fs *FSObjects) ListBucketsHeal(ctx context.Context) ([]BucketInfo, error)
return []BucketInfo{}, NotImplemented{}
}
// ListObjectsHeal - list all objects to be healed. Valid only for XL
func (fs *FSObjects) ListObjectsHeal(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (result ListObjectsInfo, err error) {
logger.LogIf(ctx, NotImplemented{})
return ListObjectsInfo{}, NotImplemented{}
}
// SetBucketPolicy sets policy on bucket
func (fs *FSObjects) SetBucketPolicy(ctx context.Context, bucket string, policy *policy.Policy) error {
return savePolicyConfig(ctx, fs, bucket, policy)

@ -101,6 +101,11 @@ func (a GatewayUnsupported) ListBucketsHeal(ctx context.Context) (buckets []Buck
return nil, NotImplemented{}
}
// ListObjectsHeal - Not implemented stub
func (a GatewayUnsupported) ListObjectsHeal(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (result ListObjectsInfo, err error) {
return ListObjectsInfo{}, NotImplemented{}
}
// HealObject - Not implemented stub
func (a GatewayUnsupported) HealObject(ctx context.Context, bucket, object string, dryRun, remove bool, scanMode madmin.HealScanMode) (h madmin.HealResultItem, e error) {
return h, NotImplemented{}

@ -261,6 +261,11 @@ var (
// GlobalGatewaySSE sse options
GlobalGatewaySSE gatewaySSE
// The always present healing routine ready to heal objects
globalBackgroundHealing *healRoutine
globalAllHealState *allHealState
globalSweepHealState *allHealState
// Add new variable global values here.
)

@ -272,7 +272,7 @@ func listObjects(ctx context.Context, obj ObjectLayer, bucket, prefix, marker, d
recursive = false
}
walkResultCh, endWalkCh := tpool.Release(listParams{bucket, recursive, marker, prefix})
walkResultCh, endWalkCh := tpool.Release(listParams{bucket, recursive, marker, prefix, false})
if walkResultCh == nil {
endWalkCh = make(chan struct{})
walkResultCh = startTreeWalk(ctx, bucket, prefix, marker, recursive, listDir, endWalkCh)
@ -333,7 +333,7 @@ func listObjects(ctx context.Context, obj ObjectLayer, bucket, prefix, marker, d
}
// Save list routine for the next marker if we haven't reached EOF.
params := listParams{bucket, recursive, nextMarker, prefix}
params := listParams{bucket, recursive, nextMarker, prefix, false}
if !eof {
tpool.Set(params, walkResultCh, endWalkCh)
}

@ -90,9 +90,11 @@ type ObjectLayer interface {
HealFormat(ctx context.Context, dryRun bool) (madmin.HealResultItem, error)
HealBucket(ctx context.Context, bucket string, dryRun, remove bool) (madmin.HealResultItem, error)
HealObject(ctx context.Context, bucket, object string, dryRun, remove bool, scanMode madmin.HealScanMode) (madmin.HealResultItem, error)
ListBucketsHeal(ctx context.Context) (buckets []BucketInfo, err error)
HealObjects(ctx context.Context, bucket, prefix string, healObjectFn func(string, string) error) error
ListBucketsHeal(ctx context.Context) (buckets []BucketInfo, err error)
ListObjectsHeal(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (result ListObjectsInfo, err error)
// Policy operations
SetBucketPolicy(context.Context, string, *policy.Policy) error
GetBucketPolicy(context.Context, string) (*policy.Policy, error)

@ -287,8 +287,11 @@ func serverMain(ctx *cli.Context) {
// Initialize name space lock.
initNSLock(globalIsDistXL)
// Init global heal state
initAllHealState(globalIsXL)
if globalIsXL {
// Init global heal state
globalAllHealState = initHealState()
globalSweepHealState = initHealState()
}
// initialize globalTrace system
globalTrace = NewTraceSys(context.Background(), globalEndpoints)
@ -372,6 +375,12 @@ func serverMain(ctx *cli.Context) {
logger.Fatal(errors.New("Invalid KMS configuration"), "auto-encryption is enabled but server does not support encryption")
}
if globalIsXL {
initBackgroundHealing()
initDailyHeal()
initDailySweeper()
}
globalObjLayerMutex.Lock()
globalObjectAPI = newObject
globalObjLayerMutex.Unlock()

@ -470,6 +470,9 @@ func resetGlobalStorageEnvs() {
// reset global heal state
func resetGlobalHealState() {
if globalAllHealState == nil {
return
}
globalAllHealState.Lock()
defer globalAllHealState.Unlock()
for _, v := range globalAllHealState.healSeqMap {

@ -34,6 +34,7 @@ type listParams struct {
recursive bool
marker string
prefix string
heal bool
}
// errWalkAbort - returned by doTreeWalk() if it returns prematurely.

@ -844,6 +844,10 @@ func leastEntry(entriesCh []FileInfoCh, readQuorum int) (FileInfo, bool) {
entriesCh[i].Push(entries[i])
}
if readQuorum < 0 {
return lentry, isTruncated
}
quorum := lentry.Quorum
if quorum == 0 {
quorum = readQuorum
@ -906,7 +910,7 @@ func (s *xlSets) startMergeWalks(ctx context.Context, bucket, prefix, marker str
// ListObjects - implements listing of objects across disks, each disk is indepenently
// walked and merged at this layer. Resulting value through the merge process sends
// the data in lexically sorted order.
func (s *xlSets) ListObjects(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (loi ListObjectsInfo, err error) {
func (s *xlSets) listObjects(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int, heal bool) (loi ListObjectsInfo, err error) {
if err = checkListObjsArgs(ctx, bucket, prefix, marker, delimiter, s); err != nil {
return loi, err
}
@ -944,13 +948,18 @@ func (s *xlSets) ListObjects(ctx context.Context, bucket, prefix, marker, delimi
recursive = false
}
entryChs, endWalkCh := s.pool.Release(listParams{bucket, recursive, marker, prefix})
entryChs, endWalkCh := s.pool.Release(listParams{bucket, recursive, marker, prefix, heal})
if entryChs == nil {
endWalkCh = make(chan struct{})
entryChs = s.startMergeWalks(context.Background(), bucket, prefix, marker, recursive, endWalkCh)
}
entries := mergeEntriesCh(entryChs, maxKeys, s.drivesPerSet/2)
readQuorum := s.drivesPerSet / 2
if heal {
readQuorum = -1
}
entries := mergeEntriesCh(entryChs, maxKeys, readQuorum)
if len(entries.Files) == 0 {
return loi, nil
}
@ -1004,11 +1013,18 @@ func (s *xlSets) ListObjects(ctx context.Context, bucket, prefix, marker, delimi
loi.Objects = append(loi.Objects, objInfo)
}
if loi.IsTruncated {
s.pool.Set(listParams{bucket, recursive, loi.NextMarker, prefix}, entryChs, endWalkCh)
s.pool.Set(listParams{bucket, recursive, loi.NextMarker, prefix, heal}, entryChs, endWalkCh)
}
return loi, nil
}
// ListObjects - implements listing of objects across disks, each disk is indepenently
// walked and merged at this layer. Resulting value through the merge process sends
// the data in lexically sorted order.
func (s *xlSets) ListObjects(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (loi ListObjectsInfo, err error) {
return s.listObjects(ctx, bucket, prefix, marker, delimiter, maxKeys, false)
}
func (s *xlSets) ListMultipartUploads(ctx context.Context, bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (result ListMultipartsInfo, err error) {
// In list multipart uploads we are going to treat input prefix as the object,
// this means that we are not supporting directory navigation.
@ -1536,3 +1552,7 @@ func (s *xlSets) HealObjects(ctx context.Context, bucket, prefix string, healObj
return nil
}
func (s *xlSets) ListObjectsHeal(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (loi ListObjectsInfo, err error) {
return s.listObjects(ctx, bucket, prefix, marker, delimiter, maxKeys, true)
}

@ -23,6 +23,11 @@ func (xl xlObjects) ListBucketsHeal(ctx context.Context) ([]BucketInfo, error) {
return nil, nil
}
// This is not implemented, look for xl-sets.ListObjectsHeal()
func (xl xlObjects) ListObjectsHeal(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (loi ListObjectsInfo, err error) {
return ListObjectsInfo{}, nil
}
// This is not implemented/needed anymore, look for xl-sets.HealObjects()
func (xl xlObjects) HealObjects(ctx context.Context, bucket, prefix string, healFn func(string, string) error) (e error) {
return nil

@ -67,7 +67,7 @@ func (xl xlObjects) listObjects(ctx context.Context, bucket, prefix, marker, del
recursive = false
}
walkResultCh, endWalkCh := xl.listPool.Release(listParams{bucket, recursive, marker, prefix})
walkResultCh, endWalkCh := xl.listPool.Release(listParams{bucket, recursive, marker, prefix, false})
if walkResultCh == nil {
endWalkCh = make(chan struct{})
listDir := listDirFactory(ctx, xl.getLoadBalancedDisks()...)
@ -118,7 +118,7 @@ func (xl xlObjects) listObjects(ctx context.Context, bucket, prefix, marker, del
}
}
params := listParams{bucket, recursive, nextMarker, prefix}
params := listParams{bucket, recursive, nextMarker, prefix, false}
if !eof {
xl.listPool.Set(params, walkResultCh, endWalkCh)
}

Loading…
Cancel
Save