diff --git a/cmd/admin-heal-ops.go b/cmd/admin-heal-ops.go index c7fe28054..ba99084f9 100644 --- a/cmd/admin-heal-ops.go +++ b/cmd/admin-heal-ops.go @@ -306,6 +306,12 @@ func (ahs *allHealState) PopHealStatusJSON(path string, return jbytes, ErrNone } +// healSource denotes single entity and heal option. +type healSource struct { + path string // entity path (format, buckets, objects) to heal + opts *madmin.HealOpts // optional heal option overrides default setting +} + // healSequence - state for each heal sequence initiated on the // server. type healSequence struct { @@ -316,7 +322,7 @@ type healSequence struct { path string // List of entities (format, buckets, objects) to heal - sourceCh chan string + sourceCh chan healSource // Report healing progress reportProgress bool @@ -629,11 +635,19 @@ func (h *healSequence) healSequenceStart() { } } -func (h *healSequence) queueHealTask(path string, healType madmin.HealItemType) error { +func (h *healSequence) queueHealTask(source healSource, healType madmin.HealItemType) error { var respCh = make(chan healResult) defer close(respCh) // Send heal request - globalBackgroundHealRoutine.queueHealTask(healTask{path: path, responseCh: respCh, opts: h.settings}) + task := healTask{ + path: source.path, + responseCh: respCh, + opts: h.settings, + } + if source.opts != nil { + task.opts = *source.opts + } + globalBackgroundHealRoutine.queueHealTask(task) // Wait for answer and push result to the client res := <-respCh if !h.reportProgress { @@ -679,20 +693,20 @@ func (h *healSequence) healItemsFromSourceCh() error { for { select { - case path := <-h.sourceCh: + case source := <-h.sourceCh: var itemType madmin.HealItemType switch { - case path == nopHeal: + case source.path == nopHeal: continue - case path == SlashSeparator: + case source.path == SlashSeparator: itemType = madmin.HealItemMetadata - case !strings.Contains(path, SlashSeparator): + case !strings.Contains(source.path, SlashSeparator): itemType = madmin.HealItemBucket default: itemType = madmin.HealItemObject } - if err := h.queueHealTask(path, itemType); err != nil { + if err := h.queueHealTask(source, itemType); err != nil { logger.LogIf(h.ctx, err) } @@ -768,7 +782,7 @@ func (h *healSequence) healMinioSysMeta(metaPrefix string) func() error { return errHealStopSignalled } - herr := h.queueHealTask(pathJoin(bucket, object), madmin.HealItemBucketMetadata) + herr := h.queueHealTask(healSource{path: pathJoin(bucket, object)}, madmin.HealItemBucketMetadata) // Object might have been deleted, by the time heal // was attempted we ignore this object an move on. if isErrObjectNotFound(herr) { @@ -792,7 +806,7 @@ func (h *healSequence) healDiskFormat() error { return errServerNotInitialized } - return h.queueHealTask(SlashSeparator, madmin.HealItemMetadata) + return h.queueHealTask(healSource{path: SlashSeparator}, madmin.HealItemMetadata) } // healBuckets - check for all buckets heal or just particular bucket. @@ -834,7 +848,7 @@ func (h *healSequence) healBucket(bucket string, bucketsOnly bool) error { return errServerNotInitialized } - if err := h.queueHealTask(bucket, madmin.HealItemBucket); err != nil { + if err := h.queueHealTask(healSource{path: bucket}, madmin.HealItemBucket); err != nil { return err } @@ -875,5 +889,5 @@ func (h *healSequence) healObject(bucket, object string) error { return errHealStopSignalled } - return h.queueHealTask(pathJoin(bucket, object), madmin.HealItemObject) + return h.queueHealTask(healSource{path: pathJoin(bucket, object)}, madmin.HealItemObject) } diff --git a/cmd/background-newdisks-heal-ops.go b/cmd/background-newdisks-heal-ops.go index aa8fb45d1..0f9a5f43a 100644 --- a/cmd/background-newdisks-heal-ops.go +++ b/cmd/background-newdisks-heal-ops.go @@ -77,10 +77,10 @@ func monitorLocalDisksAndHeal(ctx context.Context, objAPI ObjectLayer) { } // Reformat disks - bgSeq.sourceCh <- SlashSeparator + bgSeq.sourceCh <- healSource{path: SlashSeparator} // Ensure that reformatting disks is finished - bgSeq.sourceCh <- nopHeal + bgSeq.sourceCh <- healSource{path: nopHeal} var erasureSetInZoneToHeal = make([][]int, len(localDisksInZoneHeal)) // Compute the list of erasure set to heal diff --git a/cmd/bitrot-streaming.go b/cmd/bitrot-streaming.go index abb6037d0..f6d55705b 100644 --- a/cmd/bitrot-streaming.go +++ b/cmd/bitrot-streaming.go @@ -27,6 +27,14 @@ import ( "github.com/minio/minio/cmd/logger" ) +type errHashMismatch struct { + message string +} + +func (err *errHashMismatch) Error() string { + return err.message +} + // Calculates bitrot in chunks and writes the hash into the stream. type streamingBitrotWriter struct { iow *io.PipeWriter @@ -132,8 +140,8 @@ func (b *streamingBitrotReader) ReadAt(buf []byte, offset int64) (int, error) { b.h.Write(buf) if !bytes.Equal(b.h.Sum(nil), b.hashBytes) { - err = fmt.Errorf("hashes do not match expected %s, got %s", - hex.EncodeToString(b.hashBytes), hex.EncodeToString(b.h.Sum(nil))) + err := &errHashMismatch{fmt.Sprintf("hashes do not match expected %s, got %s", + hex.EncodeToString(b.hashBytes), hex.EncodeToString(b.h.Sum(nil)))} logger.LogIf(context.Background(), err) return 0, err } diff --git a/cmd/erasure-decode.go b/cmd/erasure-decode.go index 4bd2ca5ee..1e6afbee5 100644 --- a/cmd/erasure-decode.go +++ b/cmd/erasure-decode.go @@ -18,12 +18,16 @@ package cmd import ( "context" + "errors" "io" "sync" + "sync/atomic" "github.com/minio/minio/cmd/logger" ) +var errHealRequired = errors.New("heal required") + // Reads in parallel from readers. type parallelReader struct { readers []io.ReaderAt @@ -72,6 +76,7 @@ func (p *parallelReader) Read() ([][]byte, error) { readTriggerCh <- true } + healRequired := int32(0) // Atomic bool flag. readerIndex := 0 var wg sync.WaitGroup // if readTrigger is true, it implies next disk.ReadAt() should be tried @@ -109,6 +114,9 @@ func (p *parallelReader) Read() ([][]byte, error) { p.buf[i] = p.buf[i][:p.shardSize] _, err := disk.ReadAt(p.buf[i], p.offset) if err != nil { + if _, ok := err.(*errHashMismatch); ok { + atomic.StoreInt32(&healRequired, 1) + } p.readers[i] = nil // Since ReadAt returned error, trigger another read. readTriggerCh <- true @@ -126,24 +134,49 @@ func (p *parallelReader) Read() ([][]byte, error) { if p.canDecode(newBuf) { p.offset += p.shardSize + if healRequired != 0 { + return newBuf, errHealRequired + } return newBuf, nil } return nil, errXLReadQuorum } +type errDecodeHealRequired struct { + err error +} + +func (err *errDecodeHealRequired) Error() string { + return err.err.Error() +} + +func (err *errDecodeHealRequired) Unwrap() error { + return err.err +} + // Decode reads from readers, reconstructs data if needed and writes the data to the writer. func (e Erasure) Decode(ctx context.Context, writer io.Writer, readers []io.ReaderAt, offset, length, totalLength int64) error { + healRequired, err := e.decode(ctx, writer, readers, offset, length, totalLength) + if healRequired { + return &errDecodeHealRequired{err} + } + + return err +} + +// Decode reads from readers, reconstructs data if needed and writes the data to the writer. +func (e Erasure) decode(ctx context.Context, writer io.Writer, readers []io.ReaderAt, offset, length, totalLength int64) (bool, error) { if offset < 0 || length < 0 { logger.LogIf(ctx, errInvalidArgument) - return errInvalidArgument + return false, errInvalidArgument } if offset+length > totalLength { logger.LogIf(ctx, errInvalidArgument) - return errInvalidArgument + return false, errInvalidArgument } if length == 0 { - return nil + return false, nil } reader := newParallelReader(readers, e, offset, totalLength) @@ -151,6 +184,7 @@ func (e Erasure) Decode(ctx context.Context, writer io.Writer, readers []io.Read startBlock := offset / e.blockSize endBlock := (offset + length) / e.blockSize + var healRequired bool var bytesWritten int64 for block := startBlock; block <= endBlock; block++ { var blockOffset, blockLength int64 @@ -173,21 +207,26 @@ func (e Erasure) Decode(ctx context.Context, writer io.Writer, readers []io.Read } bufs, err := reader.Read() if err != nil { - return err + if errors.Is(err, errHealRequired) { + healRequired = true + } else { + return healRequired, err + } } if err = e.DecodeDataBlocks(bufs); err != nil { logger.LogIf(ctx, err) - return err + return healRequired, err } n, err := writeDataBlocks(ctx, writer, bufs, e.dataBlocks, blockOffset, blockLength) if err != nil { - return err + return healRequired, err } bytesWritten += n } if bytesWritten != length { logger.LogIf(ctx, errLessData) - return errLessData + return healRequired, errLessData } - return nil + + return healRequired, nil } diff --git a/cmd/global-heal.go b/cmd/global-heal.go index c33464767..fb4ef571b 100644 --- a/cmd/global-heal.go +++ b/cmd/global-heal.go @@ -47,7 +47,7 @@ func newBgHealSequence(numDisks int) *healSequence { } return &healSequence{ - sourceCh: make(chan string), + sourceCh: make(chan healSource), startTime: UTCNow(), clientToken: bgHealingUUID, settings: hs, @@ -101,19 +101,34 @@ func healErasureSet(ctx context.Context, setIndex int, xlObj *xlObjects) error { // Heal all buckets with all objects for _, bucket := range buckets { // Heal current bucket - bgSeq.sourceCh <- bucket.Name + bgSeq.sourceCh <- healSource{ + path: bucket.Name, + } // List all objects in the current bucket and heal them listDir := listDirFactory(ctx, xlObj.getLoadBalancedDisks()...) walkResultCh := startTreeWalk(ctx, bucket.Name, "", "", true, listDir, nil) for walkEntry := range walkResultCh { - bgSeq.sourceCh <- pathJoin(bucket.Name, walkEntry.entry) + bgSeq.sourceCh <- healSource{ + path: pathJoin(bucket.Name, walkEntry.entry), + } } } return nil } +// deepHealObject heals given object path in deep to fix bitrot. +func deepHealObject(objectPath string) { + // Get background heal sequence to send elements to heal + bgSeq, _ := globalBackgroundHealState.getHealSequenceByToken(bgHealingUUID) + + bgSeq.sourceCh <- healSource{ + path: objectPath, + opts: &madmin.HealOpts{ScanMode: madmin.HealDeepScan}, + } +} + // Returns the duration to the next background healing round func durationToNextHealRound(lastHeal time.Time) time.Duration { if lastHeal.IsZero() { diff --git a/cmd/xl-sets.go b/cmd/xl-sets.go index 93ebf90df..775f7b0e7 100644 --- a/cmd/xl-sets.go +++ b/cmd/xl-sets.go @@ -1863,7 +1863,7 @@ func (s *xlSets) healMRFRoutine() { for _, u := range mrfUploads { // Send an object to be healed with a timeout select { - case bgSeq.sourceCh <- u: + case bgSeq.sourceCh <- healSource{path: u}: case <-time.After(100 * time.Millisecond): } diff --git a/cmd/xl-v1-object.go b/cmd/xl-v1-object.go index ad85c9e57..4c8d7357e 100644 --- a/cmd/xl-v1-object.go +++ b/cmd/xl-v1-object.go @@ -295,7 +295,13 @@ func (xl xlObjects) getObject(ctx context.Context, bucket, object string, startO // we return from this function. closeBitrotReaders(readers) if err != nil { - return toObjectErr(err, bucket, object) + if decodeHealErr, ok := err.(*errDecodeHealRequired); ok { + go deepHealObject(pathJoin(bucket, object)) + err = decodeHealErr.err + } + if err != nil { + return toObjectErr(err, bucket, object) + } } for i, r := range readers { if r == nil {