Continuous healing: add optional bitrot check (#10417)

Branch: master
Author: Klaus Post (committed via GitHub)
parent f355374962
commit fa01e640f5
5 changed files (lines changed):

  1. cmd/config-current.go (15)
  2. cmd/config/config.go (3)
  3. cmd/config/crawler/crawler.go (67)
  4. cmd/data-crawler.go (19)
  5. cmd/xl-storage.go (23)

cmd/config-current.go

@@ -25,6 +25,7 @@ import (
     "github.com/minio/minio/cmd/config/api"
     "github.com/minio/minio/cmd/config/cache"
     "github.com/minio/minio/cmd/config/compress"
+    "github.com/minio/minio/cmd/config/crawler"
     "github.com/minio/minio/cmd/config/dns"
     "github.com/minio/minio/cmd/config/etcd"
     xldap "github.com/minio/minio/cmd/config/identity/ldap"
@@ -55,6 +56,7 @@ func initHelp() {
        config.KmsKesSubSys:        crypto.DefaultKesKVS,
        config.LoggerWebhookSubSys: logger.DefaultKVS,
        config.AuditWebhookSubSys:  logger.DefaultAuditKVS,
+       config.CrawlerSubSys:       crawler.DefaultKVS,
    }
    for k, v := range notify.DefaultNotificationKVS {
        kvs[k] = v
@@ -106,6 +108,10 @@ func initHelp() {
            Key:         config.APISubSys,
            Description: "manage global HTTP API call specific features, such as throttling, authentication types, etc.",
        },
+       config.HelpKV{
+           Key:         config.CrawlerSubSys,
+           Description: "manage continuous disk crawling for bucket disk usage, lifecycle, quota and data integrity checks",
+       },
        config.HelpKV{
            Key:         config.LoggerWebhookSubSys,
            Description: "send server logs to webhook endpoints",
@@ -185,6 +191,7 @@ func initHelp() {
        config.EtcdSubSys:           etcd.Help,
        config.CacheSubSys:          cache.Help,
        config.CompressionSubSys:    compress.Help,
+       config.CrawlerSubSys:        crawler.Help,
        config.IdentityOpenIDSubSys: openid.Help,
        config.IdentityLDAPSubSys:   xldap.Help,
        config.PolicyOPASubSys:      opa.Help,
@@ -246,6 +253,10 @@ func validateConfig(s config.Config, setDriveCount int) error {
        return err
    }
+   if _, err := crawler.LookupConfig(s[config.CrawlerSubSys][config.Default]); err != nil {
+       return err
+   }
    {
        etcdCfg, err := etcd.LookupConfig(s[config.EtcdSubSys][config.Default], globalRootCAs)
        if err != nil {
@@ -427,6 +438,10 @@ func lookupConfigs(s config.Config, setDriveCount int) {
            }
        }
    }
+   globalCrawlerConfig, err = crawler.LookupConfig(s[config.CrawlerSubSys][config.Default])
+   if err != nil {
+       logger.LogIf(ctx, fmt.Errorf("Unable to read crawler config: %w", err))
+   }
    kmsCfg, err := crypto.LookupConfig(s, globalCertsCADir.Get(), NewGatewayHTTPTransport())
    if err != nil {

cmd/config/config.go

@@ -76,6 +76,7 @@ const (
    KmsKesSubSys        = "kms_kes"
    LoggerWebhookSubSys = "logger_webhook"
    AuditWebhookSubSys  = "audit_webhook"
+   CrawlerSubSys       = "crawler"
    // Add new constants here if you add new fields to config.
 )
@@ -112,6 +113,7 @@ var SubSystems = set.CreateStringSet([]string{
    PolicyOPASubSys,
    IdentityLDAPSubSys,
    IdentityOpenIDSubSys,
+   CrawlerSubSys,
    NotifyAMQPSubSys,
    NotifyESSubSys,
    NotifyKafkaSubSys,
@@ -138,6 +140,7 @@ var SubSystemsSingleTargets = set.CreateStringSet([]string{
    PolicyOPASubSys,
    IdentityLDAPSubSys,
    IdentityOpenIDSubSys,
+   CrawlerSubSys,
 }...)

 // Constant separators

cmd/config/crawler/crawler.go (new file)

@@ -0,0 +1,67 @@
+/*
+ * MinIO Cloud Storage, (C) 2020 MinIO, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package crawler
+
+import (
+    "errors"
+
+    "github.com/minio/minio/cmd/config"
+)
+
+// Crawler config keys
+const (
+    BitrotScan = "bitrotscan"
+)
+
+// Config represents the crawler settings.
+type Config struct {
+    // Bitrot will perform bitrot scan on local disk when checking objects.
+    Bitrot bool `json:"bitrotscan"`
+}
+
+var (
+    // DefaultKVS - default KV config for crawler settings
+    DefaultKVS = config.KVS{
+        config.KV{
+            Key:   BitrotScan,
+            Value: config.EnableOff,
+        },
+    }
+
+    // Help provides help for config values
+    Help = config.HelpKVS{
+        config.HelpKV{
+            Key:         BitrotScan,
+            Description: `perform bitrot scan on disks when checking objects during crawl`,
+            Optional:    true,
+            Type:        "on|off",
+        },
+    }
+)
+
+// LookupConfig - lookup crawler config, rejecting any value other than on/off.
+func LookupConfig(kvs config.KVS) (cfg Config, err error) {
+    if err = config.CheckValidKeys(config.CrawlerSubSys, kvs, DefaultKVS); err != nil {
+        return cfg, err
+    }
+    bitrot := kvs.Get(BitrotScan)
+    if bitrot != config.EnableOn && bitrot != config.EnableOff {
+        return cfg, errors.New(BitrotScan + ": must be 'on' or 'off'")
+    }
+    cfg.Bitrot = bitrot == config.EnableOn
+    return cfg, nil
+}
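For reference, a minimal sketch of how the new package behaves, assuming the standard config.KVS/config.KV helpers from cmd/config used in the file above (the main wrapper is illustrative only, not part of the commit):

package main

import (
    "fmt"

    "github.com/minio/minio/cmd/config"
    "github.com/minio/minio/cmd/config/crawler"
)

func main() {
    // "on" is accepted and maps to Config{Bitrot: true}.
    cfg, err := crawler.LookupConfig(config.KVS{
        config.KV{Key: crawler.BitrotScan, Value: config.EnableOn},
    })
    fmt.Println(cfg.Bitrot, err) // true <nil>

    // Anything other than "on"/"off" is rejected.
    _, err = crawler.LookupConfig(config.KVS{
        config.KV{Key: crawler.BitrotScan, Value: "maybe"},
    })
    fmt.Println(err) // bitrotscan: must be 'on' or 'off'
}

Since the default value is config.EnableOff, the bitrot scan stays disabled unless explicitly turned on.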

cmd/data-crawler.go

@@ -27,9 +27,8 @@ import (
    "strings"
    "time"

-   "github.com/minio/minio/pkg/madmin"
-
    "github.com/minio/minio/cmd/config"
+   "github.com/minio/minio/cmd/config/crawler"
    "github.com/minio/minio/cmd/logger"
    "github.com/minio/minio/pkg/bucket/lifecycle"
    "github.com/minio/minio/pkg/bucket/replication"
@@ -37,6 +36,7 @@ import (
    "github.com/minio/minio/pkg/env"
    "github.com/minio/minio/pkg/event"
    "github.com/minio/minio/pkg/hash"
+   "github.com/minio/minio/pkg/madmin"
    "github.com/willf/bloom"
 )
@@ -46,7 +46,14 @@ const (
    dataCrawlStartDelay      = 5 * time.Minute // Time to wait on startup and between cycles.
    dataUsageUpdateDirCycles = 16              // Visit all folders every n cycles.
    healDeleteDangling       = true
+   healFolderIncludeProb    = 32  // Include a clean folder one in n cycles.
+   healObjectSelectProb     = 512 // Overall probability of a file being scanned; one in n.
+)
+
+var (
+   globalCrawlerConfig crawler.Config
 )

 // initDataCrawler will start the crawler unless disabled.
@@ -174,9 +181,9 @@ func crawlDataFolder(ctx context.Context, basePath string, cache dataUsageCache,
    // Enable healing in XL mode.
    if globalIsErasure {
        // Include a clean folder one in n cycles.
-       s.healFolderInclude = 32
+       s.healFolderInclude = healFolderIncludeProb
        // Do a heal check on an object once every n cycles. Must divide into healFolderInclude
-       s.healObjectSelect = 512
+       s.healObjectSelect = healObjectSelectProb
    }
    if len(cache.Info.BloomFilter) > 0 {
        s.withFilter = &bloomFilter{BloomFilter: &bloom.BloomFilter{}}
@@ -624,7 +631,7 @@ func (i *crawlItem) applyActions(ctx context.Context, o ObjectLayer, meta action
    if isErrObjectNotFound(err) || isErrVersionNotFound(err) {
        return 0
    }
-   if !errors.Is(err, NotImplemented{}) {
+   if err != nil && !errors.Is(err, NotImplemented{}) {
        logger.LogIf(ctx, err)
        return 0
    }
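The two new constants compose: a clean folder is included one cycle in 32, and the overall object-check rate is one in 512, so within a selected folder the per-object rate must be 512/32 = 1 in 16; that is why the comment requires healFolderIncludeProb to divide healObjectSelectProb. The crawler's real selection logic lives elsewhere in this file; the following is only an arithmetic sketch of how the two rates combine, with the cycle/objHash selection functions being hypothetical stand-ins:

package main

import "fmt"

const (
    healFolderIncludeProb = 32
    healObjectSelectProb  = 512
)

// healFolderThisCycle includes a clean folder one cycle in n.
func healFolderThisCycle(cycle uint32) bool {
    return cycle%healFolderIncludeProb == 0
}

// healObjectThisCycle checks an object only in selected folders, at the
// residual per-object rate, so the overall rate works out to 1 in 512.
func healObjectThisCycle(cycle, objHash uint32) bool {
    const perObject = healObjectSelectProb / healFolderIncludeProb // 16
    return healFolderThisCycle(cycle) && objHash%perObject == 0
}

func main() {
    hits := 0
    const cycles, objects = 512, 1024
    for c := uint32(0); c < cycles; c++ {
        for o := uint32(0); o < objects; o++ {
            if healObjectThisCycle(c, o) {
                hits++
            }
        }
    }
    fmt.Printf("checked %d of %d object visits: one in %d\n",
        hits, cycles*objects, cycles*objects/hits) // one in 512
}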

cmd/xl-storage.go

@@ -38,7 +38,7 @@ import (
    "syscall"
    "time"

-   humanize "github.com/dustin/go-humanize"
+   "github.com/dustin/go-humanize"
    jsoniter "github.com/json-iterator/go"
    "github.com/klauspost/readahead"
    "github.com/minio/minio/cmd/config"
@@ -46,6 +46,7 @@ import (
    "github.com/minio/minio/pkg/disk"
    "github.com/minio/minio/pkg/env"
    xioutil "github.com/minio/minio/pkg/ioutil"
+   "github.com/minio/minio/pkg/madmin"
 )

 const (
@@ -357,6 +358,7 @@ func (s *xlStorage) CrawlAndGetDataUsage(ctx context.Context, cache dataUsageCac
    if objAPI == nil {
        return cache, errServerNotInitialized
    }
+   opts := globalCrawlerConfig

    dataUsageInfo, err := crawlDataFolder(ctx, s.diskPath, cache, s.waitForLowActiveIO, func(item crawlItem) (int64, error) {
        // Look for `xl.meta/xl.json' at the leaf.
@@ -394,6 +396,25 @@ func (s *xlStorage) CrawlAndGetDataUsage(ctx context.Context, cache dataUsageCac
            oi: oi,
        })
        if !version.Deleted {
+           // Bitrot check local data
+           if size > 0 && item.heal && opts.Bitrot {
+               s.waitForLowActiveIO()
+               err := s.VerifyFile(ctx, item.bucket, item.objectPath(), version)
+               switch err {
+               case errFileCorrupt:
+                   res, err := objAPI.HealObject(ctx, item.bucket, item.objectPath(), oi.VersionID, madmin.HealOpts{Remove: healDeleteDangling, ScanMode: madmin.HealDeepScan})
+                   if err != nil {
+                       if !errors.Is(err, NotImplemented{}) {
+                           logger.LogIf(ctx, err)
+                       }
+                       size = 0
+                   } else {
+                       size = res.ObjectSize
+                   }
+               default:
+                   // VerifyFile already logs errors
+               }
+           }
            totalSize += size
        }
        item.healReplication(ctx, objAPI, actionMeta{oi: oi})
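The hunk above wires the bitrot check into crawl-time accounting: only errFileCorrupt triggers a deep heal, a failed heal drops the object to zero size so unreadable data does not inflate usage, and every other VerifyFile outcome keeps the measured size. A minimal, hypothetical distillation of that control flow (verifyThenHeal and its function parameters are illustrative stand-ins for xlStorage.VerifyFile and ObjectLayer.HealObject, not MinIO APIs):

package main

import (
    "errors"
    "fmt"
)

var errFileCorrupt = errors.New("file is corrupted")

// verifyThenHeal keeps the measured size for healthy objects, attempts a
// heal on corruption, and counts the object as zero-sized when healing
// fails, mirroring the accounting in the hunk above.
func verifyThenHeal(verify func() error, heal func() (int64, error), size int64) int64 {
    switch verify() {
    case errFileCorrupt:
        healedSize, err := heal()
        if err != nil {
            return 0 // heal failed: drop the object from usage totals
        }
        return healedSize
    default:
        // nil and all other errors: keep the size; the verifier is
        // assumed to have logged anything noteworthy already.
        return size
    }
}

func main() {
    ok := func() error { return nil }
    bad := func() error { return errFileCorrupt }
    heal := func() (int64, error) { return 1 << 20, nil }

    fmt.Println(verifyThenHeal(ok, heal, 2048))  // 2048
    fmt.Println(verifyThenHeal(bad, heal, 2048)) // 1048576
}

The scan is off by default; assuming the usual MinIO config workflow for the subsystem this commit registers, it would be enabled with something like `mc admin config set <alias> crawler bitrotscan=on`.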
