You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 
minio/cmd/global-heal.go

213 lines
5.9 KiB

/*
* MinIO Cloud Storage, (C) 2019 MinIO, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import (
"context"
"errors"
"time"
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/color"
"github.com/minio/minio/pkg/console"
"github.com/minio/minio/pkg/madmin"
)
const (
bgHealingUUID = "0000-0000-0000-0000"
)
// NewBgHealSequence creates a background healing sequence
// operation which crawls all objects and heal them.
func newBgHealSequence() *healSequence {
reqInfo := &logger.ReqInfo{API: "BackgroundHeal"}
ctx, cancelCtx := context.WithCancel(logger.SetReqInfo(GlobalContext, reqInfo))
hs := madmin.HealOpts{
// Remove objects that do not have read-quorum
Remove: true,
ScanMode: madmin.HealNormalScan,
}
return &healSequence{
sourceCh: make(chan healSource),
respCh: make(chan healResult),
startTime: UTCNow(),
clientToken: bgHealingUUID,
// run-background heal with reserved bucket
bucket: minioReservedBucket,
settings: hs,
currentStatus: healSequenceStatus{
Summary: healNotStartedStatus,
HealSettings: hs,
},
cancelCtx: cancelCtx,
ctx: ctx,
reportProgress: false,
scannedItemsMap: make(map[madmin.HealItemType]int64),
healedItemsMap: make(map[madmin.HealItemType]int64),
healFailedItemsMap: make(map[string]int64),
}
}
func getLocalBackgroundHealStatus() (madmin.BgHealState, bool) {
if globalBackgroundHealState == nil {
return madmin.BgHealState{}, false
}
bgSeq, ok := globalBackgroundHealState.getHealSequenceByToken(bgHealingUUID)
if !ok {
return madmin.BgHealState{}, false
}
var healDisksMap = map[string]struct{}{}
for _, ep := range getLocalDisksToHeal() {
healDisksMap[ep.String()] = struct{}{}
}
for _, ep := range globalBackgroundHealState.getHealLocalDisks() {
if _, ok := healDisksMap[ep.String()]; !ok {
healDisksMap[ep.String()] = struct{}{}
}
}
var healDisks []string
for disk := range healDisksMap {
healDisks = append(healDisks, disk)
}
return madmin.BgHealState{
ScannedItemsCount: bgSeq.getScannedItemsCount(),
LastHealActivity: bgSeq.lastHealActivity,
HealDisks: healDisks,
NextHealRound: UTCNow(),
}, true
}
func mustGetHealSequence(ctx context.Context) *healSequence {
// Get background heal sequence to send elements to heal
for {
globalHealStateLK.RLock()
hstate := globalBackgroundHealState
globalHealStateLK.RUnlock()
if hstate == nil {
time.Sleep(time.Second)
continue
}
bgSeq, ok := hstate.getHealSequenceByToken(bgHealingUUID)
if !ok {
time.Sleep(time.Second)
continue
}
return bgSeq
}
}
// healErasureSet lists and heals all objects in a specific erasure set
func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []BucketInfo) error {
bgSeq := mustGetHealSequence(ctx)
buckets = append(buckets, BucketInfo{
Name: pathJoin(minioMetaBucket, minioConfigPrefix),
})
// Try to pro-actively heal backend-encrypted file.
if _, err := er.HealObject(ctx, minioMetaBucket, backendEncryptedFile, "", madmin.HealOpts{}); err != nil {
if !isErrObjectNotFound(err) && !isErrVersionNotFound(err) {
logger.LogIf(ctx, err)
}
}
// Heal all buckets with all objects
for _, bucket := range buckets {
// Heal current bucket
if _, err := er.HealBucket(ctx, bucket.Name, madmin.HealOpts{}); err != nil {
if !isErrObjectNotFound(err) && !isErrVersionNotFound(err) {
logger.LogIf(ctx, err)
}
}
if serverDebugLog {
console.Debugf(color.Green("healDisk:")+" healing bucket %s content on erasure set %d\n", bucket.Name, er.setNumber+1)
}
disks, _ := er.getOnlineDisksWithHealing()
if len(disks) == 0 {
return errors.New("healErasureSet: No non-healing disks found")
}
// Limit listing to 3 drives.
if len(disks) > 3 {
disks = disks[:3]
}
healEntry := func(entry metaCacheEntry) {
if entry.isDir() {
return
}
fivs, err := entry.fileInfoVersions(bucket.Name)
if err != nil {
logger.LogIf(ctx, err)
return
}
waitForLowHTTPReq(globalHealConfig.IOCount, globalHealConfig.Sleep)
for _, version := range fivs.Versions {
if _, err := er.HealObject(ctx, bucket.Name, version.Name, version.VersionID, madmin.HealOpts{ScanMode: madmin.HealNormalScan, Remove: true}); err != nil {
if !isErrObjectNotFound(err) && !isErrVersionNotFound(err) {
logger.LogIf(ctx, err)
}
}
bgSeq.logHeal(madmin.HealItemObject)
}
}
err := listPathRaw(ctx, listPathRawOptions{
disks: disks,
bucket: bucket.Name,
recursive: true,
forwardTo: "", //TODO(klauspost): Set this to last known offset when resuming.
minDisks: 1,
reportNotFound: false,
agreed: healEntry,
partial: func(entries metaCacheEntries, nAgreed int, errs []error) {
entry, _ := entries.firstFound()
if entry != nil && !entry.isDir() {
healEntry(*entry)
}
},
finished: nil,
})
logger.LogIf(ctx, err)
}
return nil
}
// healObject heals given object path in deep to fix bitrot.
func healObject(bucket, object, versionID string, scan madmin.HealScanMode) {
// Get background heal sequence to send elements to heal
bgSeq, ok := globalBackgroundHealState.getHealSequenceByToken(bgHealingUUID)
if ok {
bgSeq.sourceCh <- healSource{
bucket: bucket,
object: object,
versionID: versionID,
opts: &madmin.HealOpts{
Remove: true, // if found dangling purge it.
ScanMode: scan,
},
}
}
}