proactive deep heal object when a bitrot is detected (#9192)

master
Bala FA 5 years ago committed by GitHub
parent 886ae15464
commit 95e89f1712
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 38
      cmd/admin-heal-ops.go
  2. 4
      cmd/background-newdisks-heal-ops.go
  3. 12
      cmd/bitrot-streaming.go
  4. 55
      cmd/erasure-decode.go
  5. 21
      cmd/global-heal.go
  6. 2
      cmd/xl-sets.go
  7. 6
      cmd/xl-v1-object.go

@ -306,6 +306,12 @@ func (ahs *allHealState) PopHealStatusJSON(path string,
return jbytes, ErrNone
}
// healSource denotes single entity and heal option.
type healSource struct {
path string // entity path (format, buckets, objects) to heal
opts *madmin.HealOpts // optional heal option overrides default setting
}
// healSequence - state for each heal sequence initiated on the
// server.
type healSequence struct {
@ -316,7 +322,7 @@ type healSequence struct {
path string
// List of entities (format, buckets, objects) to heal
sourceCh chan string
sourceCh chan healSource
// Report healing progress
reportProgress bool
@ -629,11 +635,19 @@ func (h *healSequence) healSequenceStart() {
}
}
func (h *healSequence) queueHealTask(path string, healType madmin.HealItemType) error {
func (h *healSequence) queueHealTask(source healSource, healType madmin.HealItemType) error {
var respCh = make(chan healResult)
defer close(respCh)
// Send heal request
globalBackgroundHealRoutine.queueHealTask(healTask{path: path, responseCh: respCh, opts: h.settings})
task := healTask{
path: source.path,
responseCh: respCh,
opts: h.settings,
}
if source.opts != nil {
task.opts = *source.opts
}
globalBackgroundHealRoutine.queueHealTask(task)
// Wait for answer and push result to the client
res := <-respCh
if !h.reportProgress {
@ -679,20 +693,20 @@ func (h *healSequence) healItemsFromSourceCh() error {
for {
select {
case path := <-h.sourceCh:
case source := <-h.sourceCh:
var itemType madmin.HealItemType
switch {
case path == nopHeal:
case source.path == nopHeal:
continue
case path == SlashSeparator:
case source.path == SlashSeparator:
itemType = madmin.HealItemMetadata
case !strings.Contains(path, SlashSeparator):
case !strings.Contains(source.path, SlashSeparator):
itemType = madmin.HealItemBucket
default:
itemType = madmin.HealItemObject
}
if err := h.queueHealTask(path, itemType); err != nil {
if err := h.queueHealTask(source, itemType); err != nil {
logger.LogIf(h.ctx, err)
}
@ -768,7 +782,7 @@ func (h *healSequence) healMinioSysMeta(metaPrefix string) func() error {
return errHealStopSignalled
}
herr := h.queueHealTask(pathJoin(bucket, object), madmin.HealItemBucketMetadata)
herr := h.queueHealTask(healSource{path: pathJoin(bucket, object)}, madmin.HealItemBucketMetadata)
// Object might have been deleted, by the time heal
// was attempted we ignore this object an move on.
if isErrObjectNotFound(herr) {
@ -792,7 +806,7 @@ func (h *healSequence) healDiskFormat() error {
return errServerNotInitialized
}
return h.queueHealTask(SlashSeparator, madmin.HealItemMetadata)
return h.queueHealTask(healSource{path: SlashSeparator}, madmin.HealItemMetadata)
}
// healBuckets - check for all buckets heal or just particular bucket.
@ -834,7 +848,7 @@ func (h *healSequence) healBucket(bucket string, bucketsOnly bool) error {
return errServerNotInitialized
}
if err := h.queueHealTask(bucket, madmin.HealItemBucket); err != nil {
if err := h.queueHealTask(healSource{path: bucket}, madmin.HealItemBucket); err != nil {
return err
}
@ -875,5 +889,5 @@ func (h *healSequence) healObject(bucket, object string) error {
return errHealStopSignalled
}
return h.queueHealTask(pathJoin(bucket, object), madmin.HealItemObject)
return h.queueHealTask(healSource{path: pathJoin(bucket, object)}, madmin.HealItemObject)
}

@ -77,10 +77,10 @@ func monitorLocalDisksAndHeal(ctx context.Context, objAPI ObjectLayer) {
}
// Reformat disks
bgSeq.sourceCh <- SlashSeparator
bgSeq.sourceCh <- healSource{path: SlashSeparator}
// Ensure that reformatting disks is finished
bgSeq.sourceCh <- nopHeal
bgSeq.sourceCh <- healSource{path: nopHeal}
var erasureSetInZoneToHeal = make([][]int, len(localDisksInZoneHeal))
// Compute the list of erasure set to heal

@ -27,6 +27,14 @@ import (
"github.com/minio/minio/cmd/logger"
)
type errHashMismatch struct {
message string
}
func (err *errHashMismatch) Error() string {
return err.message
}
// Calculates bitrot in chunks and writes the hash into the stream.
type streamingBitrotWriter struct {
iow *io.PipeWriter
@ -132,8 +140,8 @@ func (b *streamingBitrotReader) ReadAt(buf []byte, offset int64) (int, error) {
b.h.Write(buf)
if !bytes.Equal(b.h.Sum(nil), b.hashBytes) {
err = fmt.Errorf("hashes do not match expected %s, got %s",
hex.EncodeToString(b.hashBytes), hex.EncodeToString(b.h.Sum(nil)))
err := &errHashMismatch{fmt.Sprintf("hashes do not match expected %s, got %s",
hex.EncodeToString(b.hashBytes), hex.EncodeToString(b.h.Sum(nil)))}
logger.LogIf(context.Background(), err)
return 0, err
}

@ -18,12 +18,16 @@ package cmd
import (
"context"
"errors"
"io"
"sync"
"sync/atomic"
"github.com/minio/minio/cmd/logger"
)
var errHealRequired = errors.New("heal required")
// Reads in parallel from readers.
type parallelReader struct {
readers []io.ReaderAt
@ -72,6 +76,7 @@ func (p *parallelReader) Read() ([][]byte, error) {
readTriggerCh <- true
}
healRequired := int32(0) // Atomic bool flag.
readerIndex := 0
var wg sync.WaitGroup
// if readTrigger is true, it implies next disk.ReadAt() should be tried
@ -109,6 +114,9 @@ func (p *parallelReader) Read() ([][]byte, error) {
p.buf[i] = p.buf[i][:p.shardSize]
_, err := disk.ReadAt(p.buf[i], p.offset)
if err != nil {
if _, ok := err.(*errHashMismatch); ok {
atomic.StoreInt32(&healRequired, 1)
}
p.readers[i] = nil
// Since ReadAt returned error, trigger another read.
readTriggerCh <- true
@ -126,24 +134,49 @@ func (p *parallelReader) Read() ([][]byte, error) {
if p.canDecode(newBuf) {
p.offset += p.shardSize
if healRequired != 0 {
return newBuf, errHealRequired
}
return newBuf, nil
}
return nil, errXLReadQuorum
}
type errDecodeHealRequired struct {
err error
}
func (err *errDecodeHealRequired) Error() string {
return err.err.Error()
}
func (err *errDecodeHealRequired) Unwrap() error {
return err.err
}
// Decode reads from readers, reconstructs data if needed and writes the data to the writer.
func (e Erasure) Decode(ctx context.Context, writer io.Writer, readers []io.ReaderAt, offset, length, totalLength int64) error {
healRequired, err := e.decode(ctx, writer, readers, offset, length, totalLength)
if healRequired {
return &errDecodeHealRequired{err}
}
return err
}
// Decode reads from readers, reconstructs data if needed and writes the data to the writer.
func (e Erasure) decode(ctx context.Context, writer io.Writer, readers []io.ReaderAt, offset, length, totalLength int64) (bool, error) {
if offset < 0 || length < 0 {
logger.LogIf(ctx, errInvalidArgument)
return errInvalidArgument
return false, errInvalidArgument
}
if offset+length > totalLength {
logger.LogIf(ctx, errInvalidArgument)
return errInvalidArgument
return false, errInvalidArgument
}
if length == 0 {
return nil
return false, nil
}
reader := newParallelReader(readers, e, offset, totalLength)
@ -151,6 +184,7 @@ func (e Erasure) Decode(ctx context.Context, writer io.Writer, readers []io.Read
startBlock := offset / e.blockSize
endBlock := (offset + length) / e.blockSize
var healRequired bool
var bytesWritten int64
for block := startBlock; block <= endBlock; block++ {
var blockOffset, blockLength int64
@ -173,21 +207,26 @@ func (e Erasure) Decode(ctx context.Context, writer io.Writer, readers []io.Read
}
bufs, err := reader.Read()
if err != nil {
return err
if errors.Is(err, errHealRequired) {
healRequired = true
} else {
return healRequired, err
}
}
if err = e.DecodeDataBlocks(bufs); err != nil {
logger.LogIf(ctx, err)
return err
return healRequired, err
}
n, err := writeDataBlocks(ctx, writer, bufs, e.dataBlocks, blockOffset, blockLength)
if err != nil {
return err
return healRequired, err
}
bytesWritten += n
}
if bytesWritten != length {
logger.LogIf(ctx, errLessData)
return errLessData
return healRequired, errLessData
}
return nil
return healRequired, nil
}

@ -47,7 +47,7 @@ func newBgHealSequence(numDisks int) *healSequence {
}
return &healSequence{
sourceCh: make(chan string),
sourceCh: make(chan healSource),
startTime: UTCNow(),
clientToken: bgHealingUUID,
settings: hs,
@ -101,19 +101,34 @@ func healErasureSet(ctx context.Context, setIndex int, xlObj *xlObjects) error {
// Heal all buckets with all objects
for _, bucket := range buckets {
// Heal current bucket
bgSeq.sourceCh <- bucket.Name
bgSeq.sourceCh <- healSource{
path: bucket.Name,
}
// List all objects in the current bucket and heal them
listDir := listDirFactory(ctx, xlObj.getLoadBalancedDisks()...)
walkResultCh := startTreeWalk(ctx, bucket.Name, "", "", true, listDir, nil)
for walkEntry := range walkResultCh {
bgSeq.sourceCh <- pathJoin(bucket.Name, walkEntry.entry)
bgSeq.sourceCh <- healSource{
path: pathJoin(bucket.Name, walkEntry.entry),
}
}
}
return nil
}
// deepHealObject heals given object path in deep to fix bitrot.
func deepHealObject(objectPath string) {
// Get background heal sequence to send elements to heal
bgSeq, _ := globalBackgroundHealState.getHealSequenceByToken(bgHealingUUID)
bgSeq.sourceCh <- healSource{
path: objectPath,
opts: &madmin.HealOpts{ScanMode: madmin.HealDeepScan},
}
}
// Returns the duration to the next background healing round
func durationToNextHealRound(lastHeal time.Time) time.Duration {
if lastHeal.IsZero() {

@ -1863,7 +1863,7 @@ func (s *xlSets) healMRFRoutine() {
for _, u := range mrfUploads {
// Send an object to be healed with a timeout
select {
case bgSeq.sourceCh <- u:
case bgSeq.sourceCh <- healSource{path: u}:
case <-time.After(100 * time.Millisecond):
}

@ -294,9 +294,15 @@ func (xl xlObjects) getObject(ctx context.Context, bucket, object string, startO
// Note: we should not be defer'ing the following closeBitrotReaders() call as we are inside a for loop i.e if we use defer, we would accumulate a lot of open files by the time
// we return from this function.
closeBitrotReaders(readers)
if err != nil {
if decodeHealErr, ok := err.(*errDecodeHealRequired); ok {
go deepHealObject(pathJoin(bucket, object))
err = decodeHealErr.err
}
if err != nil {
return toObjectErr(err, bucket, object)
}
}
for i, r := range readers {
if r == nil {
onlineDisks[i] = OfflineDisk

Loading…
Cancel
Save