fix: various optimizations, idiomatic changes (#9179)

- acquire since leader lock for all background operations
  - healing, crawling and applying lifecycle policies.

- simplify lifecyle to avoid network calls, which was a
  bug in implementation - we should hold a leader and
  do everything from there, we have access to entire
  name space.

- make listing, walking not interfere by slowing itself
  down like the crawler.

- effectively use global context everywhere to ensure
  proper shutdown, in cache, lifecycle, healing

- don't read `format.json` for prometheus metrics in
  StorageInfo() call.
master
Harshavardhana 5 years ago committed by GitHub
parent ea18e51f4d
commit cfc9cfd84a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 6
      cmd/auth-handler.go
  2. 64
      cmd/background-heal-ops.go
  3. 124
      cmd/background-newdisks-heal-ops.go
  4. 85
      cmd/daily-lifecycle-ops.go
  5. 37
      cmd/data-usage.go
  6. 2
      cmd/disk-cache-backend.go
  7. 8
      cmd/disk-cache.go
  8. 4
      cmd/gateway-main.go
  9. 30
      cmd/generic-handlers.go
  10. 66
      cmd/global-heal.go
  11. 8
      cmd/lock-rest-server.go
  12. 18
      cmd/notification.go
  13. 26
      cmd/peer-rest-client.go
  14. 1
      cmd/peer-rest-common.go
  15. 18
      cmd/peer-rest-server.go
  16. 9
      cmd/posix.go
  17. 34
      cmd/server-main.go
  18. 5
      cmd/xl-sets.go

@ -226,7 +226,7 @@ func getClaimsFromToken(r *http.Request) (map[string]interface{}, error) {
if err != nil {
// Base64 decoding fails, we should log to indicate
// something is malforming the request sent by client.
logger.LogIf(context.Background(), err, logger.Application)
logger.LogIf(r.Context(), err, logger.Application)
return nil, errAuthentication
}
claims.MapClaims[iampolicy.SessionPolicyName] = string(spBytes)
@ -246,7 +246,7 @@ func checkClaimsFromToken(r *http.Request, cred auth.Credentials) (map[string]in
}
claims, err := getClaimsFromToken(r)
if err != nil {
return nil, toAPIErrorCode(context.Background(), err)
return nil, toAPIErrorCode(r.Context(), err)
}
return claims, ErrNone
}
@ -460,7 +460,7 @@ func (a authHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
a.handler.ServeHTTP(w, r)
return
}
writeErrorResponse(context.Background(), w, errorCodes.ToAPIErr(ErrSignatureVersionNotSupported), r.URL, guessIsBrowserReq(r))
writeErrorResponse(r.Context(), w, errorCodes.ToAPIErr(ErrSignatureVersionNotSupported), r.URL, guessIsBrowserReq(r))
}
// isPutActionAllowed - check if PUT operation is allowed on the resource, this

@ -66,8 +66,7 @@ func waitForLowHTTPReq(tolerance int32) {
}
// Wait for heal requests and process them
func (h *healRoutine) run() {
ctx := context.Background()
func (h *healRoutine) run(ctx context.Context, objAPI ObjectLayer) {
for {
select {
case task, ok := <-h.tasks:
@ -83,18 +82,18 @@ func (h *healRoutine) run() {
bucket, object := path2BucketObject(task.path)
switch {
case bucket == "" && object == "":
res, err = bgHealDiskFormat(ctx, task.opts)
res, err = healDiskFormat(ctx, objAPI, task.opts)
case bucket != "" && object == "":
res, err = bgHealBucket(ctx, bucket, task.opts)
res, err = objAPI.HealBucket(ctx, bucket, task.opts.DryRun, task.opts.Remove)
case bucket != "" && object != "":
res, err = bgHealObject(ctx, bucket, object, task.opts)
res, err = objAPI.HealObject(ctx, bucket, object, task.opts)
}
if task.responseCh != nil {
task.responseCh <- healResult{result: res, err: err}
}
case <-h.doneCh:
return
case <-GlobalServiceDoneCh:
case <-ctx.Done():
return
}
}
@ -108,22 +107,10 @@ func initHealRoutine() *healRoutine {
}
func startBackgroundHealing() {
ctx := context.Background()
var objAPI ObjectLayer
for {
objAPI = newObjectLayerWithoutSafeModeFn()
if objAPI == nil {
time.Sleep(time.Second)
continue
}
break
}
func startBackgroundHealing(ctx context.Context, objAPI ObjectLayer) {
// Run the background healer
globalBackgroundHealRoutine = initHealRoutine()
go globalBackgroundHealRoutine.run()
go globalBackgroundHealRoutine.run(ctx, objAPI)
// Launch the background healer sequence to track
// background healing operations
@ -133,20 +120,14 @@ func startBackgroundHealing() {
globalBackgroundHealState.LaunchNewHealSequence(nh)
}
func initBackgroundHealing() {
go startBackgroundHealing()
func initBackgroundHealing(ctx context.Context, objAPI ObjectLayer) {
go startBackgroundHealing(ctx, objAPI)
}
// bgHealDiskFormat - heals format.json, return value indicates if a
// healDiskFormat - heals format.json, return value indicates if a
// failure error occurred.
func bgHealDiskFormat(ctx context.Context, opts madmin.HealOpts) (madmin.HealResultItem, error) {
// Get current object layer instance.
objectAPI := newObjectLayerWithoutSafeModeFn()
if objectAPI == nil {
return madmin.HealResultItem{}, errServerNotInitialized
}
res, err := objectAPI.HealFormat(ctx, opts.DryRun)
func healDiskFormat(ctx context.Context, objAPI ObjectLayer, opts madmin.HealOpts) (madmin.HealResultItem, error) {
res, err := objAPI.HealFormat(ctx, opts.DryRun)
// return any error, ignore error returned when disks have
// already healed.
@ -167,24 +148,3 @@ func bgHealDiskFormat(ctx context.Context, opts madmin.HealOpts) (madmin.HealRes
return res, nil
}
// bghealBucket - traverses and heals given bucket
func bgHealBucket(ctx context.Context, bucket string, opts madmin.HealOpts) (madmin.HealResultItem, error) {
// Get current object layer instance.
objectAPI := newObjectLayerWithoutSafeModeFn()
if objectAPI == nil {
return madmin.HealResultItem{}, errServerNotInitialized
}
return objectAPI.HealBucket(ctx, bucket, opts.DryRun, opts.Remove)
}
// bgHealObject - heal the given object and record result
func bgHealObject(ctx context.Context, bucket, object string, opts madmin.HealOpts) (madmin.HealResultItem, error) {
// Get current object layer instance.
objectAPI := newObjectLayerWithoutSafeModeFn()
if objectAPI == nil {
return madmin.HealResultItem{}, errServerNotInitialized
}
return objectAPI.HealObject(ctx, bucket, object, opts)
}

@ -25,32 +25,19 @@ import (
const defaultMonitorNewDiskInterval = time.Minute * 10
func initLocalDisksAutoHeal() {
go monitorLocalDisksAndHeal()
func initLocalDisksAutoHeal(ctx context.Context, objAPI ObjectLayer) {
go monitorLocalDisksAndHeal(ctx, objAPI)
}
// monitorLocalDisksAndHeal - ensures that detected new disks are healed
// 1. Only the concerned erasure set will be listed and healed
// 2. Only the node hosting the disk is responsible to perform the heal
func monitorLocalDisksAndHeal() {
// Wait until the object layer is ready
var objAPI ObjectLayer
for {
objAPI = newObjectLayerWithoutSafeModeFn()
if objAPI == nil {
time.Sleep(time.Second)
continue
}
break
}
func monitorLocalDisksAndHeal(ctx context.Context, objAPI ObjectLayer) {
z, ok := objAPI.(*xlZones)
if !ok {
return
}
ctx := context.Background()
var bgSeq *healSequence
var found bool
@ -64,64 +51,67 @@ func monitorLocalDisksAndHeal() {
// Perform automatic disk healing when a disk is replaced locally.
for {
time.Sleep(defaultMonitorNewDiskInterval)
// Attempt a heal as the server starts-up first.
localDisksInZoneHeal := make([]Endpoints, len(z.zones))
for i, ep := range globalEndpoints {
localDisksToHeal := Endpoints{}
for _, endpoint := range ep.Endpoints {
if !endpoint.IsLocal {
continue
select {
case <-ctx.Done():
return
case <-time.After(defaultMonitorNewDiskInterval):
// Attempt a heal as the server starts-up first.
localDisksInZoneHeal := make([]Endpoints, len(z.zones))
for i, ep := range globalEndpoints {
localDisksToHeal := Endpoints{}
for _, endpoint := range ep.Endpoints {
if !endpoint.IsLocal {
continue
}
// Try to connect to the current endpoint
// and reformat if the current disk is not formatted
_, _, err := connectEndpoint(endpoint)
if err == errUnformattedDisk {
localDisksToHeal = append(localDisksToHeal, endpoint)
}
}
// Try to connect to the current endpoint
// and reformat if the current disk is not formatted
_, _, err := connectEndpoint(endpoint)
if err == errUnformattedDisk {
localDisksToHeal = append(localDisksToHeal, endpoint)
if len(localDisksToHeal) == 0 {
continue
}
localDisksInZoneHeal[i] = localDisksToHeal
}
if len(localDisksToHeal) == 0 {
continue
}
localDisksInZoneHeal[i] = localDisksToHeal
}
// Reformat disks
bgSeq.sourceCh <- SlashSeparator
// Ensure that reformatting disks is finished
bgSeq.sourceCh <- nopHeal
var erasureSetInZoneToHeal = make([][]int, len(localDisksInZoneHeal))
// Compute the list of erasure set to heal
for i, localDisksToHeal := range localDisksInZoneHeal {
var erasureSetToHeal []int
for _, endpoint := range localDisksToHeal {
// Load the new format of this passed endpoint
_, format, err := connectEndpoint(endpoint)
if err != nil {
logger.LogIf(ctx, err)
continue
// Reformat disks
bgSeq.sourceCh <- SlashSeparator
// Ensure that reformatting disks is finished
bgSeq.sourceCh <- nopHeal
var erasureSetInZoneToHeal = make([][]int, len(localDisksInZoneHeal))
// Compute the list of erasure set to heal
for i, localDisksToHeal := range localDisksInZoneHeal {
var erasureSetToHeal []int
for _, endpoint := range localDisksToHeal {
// Load the new format of this passed endpoint
_, format, err := connectEndpoint(endpoint)
if err != nil {
logger.LogIf(ctx, err)
continue
}
// Calculate the set index where the current endpoint belongs
setIndex, _, err := findDiskIndex(z.zones[i].format, format)
if err != nil {
logger.LogIf(ctx, err)
continue
}
erasureSetToHeal = append(erasureSetToHeal, setIndex)
}
// Calculate the set index where the current endpoint belongs
setIndex, _, err := findDiskIndex(z.zones[i].format, format)
if err != nil {
logger.LogIf(ctx, err)
continue
}
erasureSetToHeal = append(erasureSetToHeal, setIndex)
erasureSetInZoneToHeal[i] = erasureSetToHeal
}
erasureSetInZoneToHeal[i] = erasureSetToHeal
}
// Heal all erasure sets that need
for i, erasureSetToHeal := range erasureSetInZoneToHeal {
for _, setIndex := range erasureSetToHeal {
err := healErasureSet(ctx, setIndex, z.zones[i].sets[setIndex])
if err != nil {
logger.LogIf(ctx, err)
// Heal all erasure sets that need
for i, erasureSetToHeal := range erasureSetInZoneToHeal {
for _, setIndex := range erasureSetToHeal {
err := healErasureSet(ctx, setIndex, z.zones[i].sets[setIndex])
if err != nil {
logger.LogIf(ctx, err)
}
}
}
}

@ -30,76 +30,30 @@ const (
bgLifecycleTick = time.Hour
)
type lifecycleOps struct {
LastActivity time.Time
}
// Register to the daily objects listing
var globalLifecycleOps = &lifecycleOps{}
func getLocalBgLifecycleOpsStatus() BgLifecycleOpsStatus {
return BgLifecycleOpsStatus{
LastActivity: globalLifecycleOps.LastActivity,
}
}
// initDailyLifecycle starts the routine that receives the daily
// listing of all objects and applies any matching bucket lifecycle
// rules.
func initDailyLifecycle() {
go startDailyLifecycle()
func initDailyLifecycle(ctx context.Context, objAPI ObjectLayer) {
go startDailyLifecycle(ctx, objAPI)
}
func startDailyLifecycle() {
var objAPI ObjectLayer
var ctx = context.Background()
// Wait until the object API is ready
func startDailyLifecycle(ctx context.Context, objAPI ObjectLayer) {
for {
objAPI = newObjectLayerWithoutSafeModeFn()
if objAPI == nil {
time.Sleep(time.Second)
continue
}
break
}
// Calculate the time of the last lifecycle operation in all peers node of the cluster
computeLastLifecycleActivity := func(status []BgOpsStatus) time.Time {
var lastAct time.Time
for _, st := range status {
if st.LifecycleOps.LastActivity.After(lastAct) {
lastAct = st.LifecycleOps.LastActivity
select {
case <-ctx.Done():
return
case <-time.NewTimer(bgLifecycleInterval).C:
// Perform one lifecycle operation
err := lifecycleRound(ctx, objAPI)
switch err.(type) {
case OperationTimedOut:
// Unable to hold a lock means there is another
// caller holding write lock, ignore and try next round.
continue
default:
logger.LogIf(ctx, err)
}
}
return lastAct
}
for {
// Check if we should perform lifecycle ops based on the last lifecycle activity, sleep one hour otherwise
allLifecycleStatus := []BgOpsStatus{
{LifecycleOps: getLocalBgLifecycleOpsStatus()},
}
if globalIsDistXL {
allLifecycleStatus = append(allLifecycleStatus, globalNotificationSys.BackgroundOpsStatus()...)
}
lastAct := computeLastLifecycleActivity(allLifecycleStatus)
if !lastAct.IsZero() && time.Since(lastAct) < bgLifecycleInterval {
time.Sleep(bgLifecycleTick)
}
// Perform one lifecycle operation
err := lifecycleRound(ctx, objAPI)
switch err.(type) {
// Unable to hold a lock means there is another
// instance doing the lifecycle round round
case OperationTimedOut:
time.Sleep(bgLifecycleTick)
default:
logger.LogIf(ctx, err)
time.Sleep(time.Minute)
continue
}
}
}
@ -107,13 +61,6 @@ func startDailyLifecycle() {
var lifecycleLockTimeout = newDynamicTimeout(60*time.Second, time.Second)
func lifecycleRound(ctx context.Context, objAPI ObjectLayer) error {
// Lock to avoid concurrent lifecycle ops from other nodes
sweepLock := objAPI.NewNSLock(ctx, "system", "daily-lifecycle-ops")
if err := sweepLock.GetLock(lifecycleLockTimeout); err != nil {
return err
}
defer sweepLock.Unlock()
buckets, err := objAPI.ListBuckets(ctx)
if err != nil {
return err

@ -49,50 +49,19 @@ const (
)
// initDataUsageStats will start the crawler unless disabled.
func initDataUsageStats() {
func initDataUsageStats(ctx context.Context, objAPI ObjectLayer) {
if env.Get(envDataUsageCrawlConf, config.EnableOn) == config.EnableOn {
go runDataUsageInfoUpdateRoutine()
go runDataUsageInfo(ctx, objAPI)
}
}
// runDataUsageInfoUpdateRoutine will contain the main crawler.
func runDataUsageInfoUpdateRoutine() {
// Wait until the object layer is ready
var objAPI ObjectLayer
for {
objAPI = newObjectLayerWithoutSafeModeFn()
if objAPI == nil {
time.Sleep(time.Second)
continue
}
break
}
runDataUsageInfo(GlobalContext, objAPI)
}
var dataUsageLockTimeout = lifecycleLockTimeout
func runDataUsageInfo(ctx context.Context, objAPI ObjectLayer) {
// Make sure only 1 crawler is running on the cluster.
locker := objAPI.NewNSLock(ctx, minioMetaBucket, "leader-data-usage-info")
for {
err := locker.GetLock(dataUsageLockTimeout)
if err != nil {
time.Sleep(5 * time.Minute)
continue
}
// Break without unlocking, this node will acquire
// data usage calculator role for its lifetime.
break
}
for {
select {
case <-ctx.Done():
locker.Unlock()
return
// Wait before starting next cycle and wait on startup.
case <-time.NewTimer(dataUsageStartDelay).C:
// Wait before starting next cycle and wait on startup.
results := make(chan DataUsageInfo, 1)
go storeDataUsageInBackend(ctx, objAPI, results)
err := objAPI.CrawlAndGetDataUsage(ctx, results)

@ -229,7 +229,7 @@ func (c *diskCache) toClear() uint64 {
}
// Purge cache entries that were not accessed.
func (c *diskCache) purge(ctx context.Context, doneCh <-chan struct{}) {
func (c *diskCache) purge(ctx context.Context) {
if c.diskUsageLow() {
return
}

@ -689,17 +689,17 @@ func newServerCacheObjects(ctx context.Context, config cache.Config) (CacheObjec
if migrateSw {
go c.migrateCacheFromV1toV2(ctx)
}
go c.gc(ctx, GlobalServiceDoneCh)
go c.gc(ctx)
return c, nil
}
func (c *cacheObjects) gc(ctx context.Context, doneCh <-chan struct{}) {
func (c *cacheObjects) gc(ctx context.Context) {
ticker := time.NewTicker(cacheGCInterval)
defer ticker.Stop()
for {
select {
case <-doneCh:
case <-ctx.Done():
return
case <-ticker.C:
if c.migrating {
@ -714,7 +714,7 @@ func (c *cacheObjects) gc(ctx context.Context, doneCh <-chan struct{}) {
go func(d *diskCache) {
defer wg.Done()
d.resetGCCounter()
d.purge(ctx, doneCh)
d.purge(ctx)
}(dcache)
}
wg.Wait()

@ -267,7 +267,7 @@ func StartGateway(ctx *cli.Context, gw Gateway) {
if globalCacheConfig.Enabled {
// initialize the new disk cache objects.
var cacheAPI CacheObjectLayer
cacheAPI, err = newServerCacheObjects(context.Background(), globalCacheConfig)
cacheAPI, err = newServerCacheObjects(GlobalContext, globalCacheConfig)
logger.FatalIf(err, "Unable to initialize disk caching")
globalObjLayerMutex.Lock()
@ -277,7 +277,7 @@ func StartGateway(ctx *cli.Context, gw Gateway) {
// Populate existing buckets to the etcd backend
if globalDNSConfig != nil {
buckets, err := newObject.ListBuckets(context.Background())
buckets, err := newObject.ListBuckets(GlobalContext)
if err != nil {
logger.Fatal(err, "Unable to list buckets")
}

@ -87,7 +87,7 @@ func setRequestHeaderSizeLimitHandler(h http.Handler) http.Handler {
// of the user-defined metadata to 2 KB.
func (h requestHeaderSizeLimitHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if isHTTPHeaderSizeTooLarge(r.Header) {
writeErrorResponse(context.Background(), w, errorCodes.ToAPIErr(ErrMetadataTooLarge), r.URL, guessIsBrowserReq(r))
writeErrorResponse(r.Context(), w, errorCodes.ToAPIErr(ErrMetadataTooLarge), r.URL, guessIsBrowserReq(r))
return
}
h.Handler.ServeHTTP(w, r)
@ -130,7 +130,7 @@ func filterReservedMetadata(h http.Handler) http.Handler {
// would be treated as metadata.
func (h reservedMetadataHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if containsReservedMetadata(r.Header) {
writeErrorResponse(context.Background(), w, errorCodes.ToAPIErr(ErrUnsupportedMetadata), r.URL, guessIsBrowserReq(r))
writeErrorResponse(r.Context(), w, errorCodes.ToAPIErr(ErrUnsupportedMetadata), r.URL, guessIsBrowserReq(r))
return
}
h.Handler.ServeHTTP(w, r)
@ -371,14 +371,14 @@ func (h timeValidityHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// All our internal APIs are sensitive towards Date
// header, for all requests where Date header is not
// present we will reject such clients.
writeErrorResponse(context.Background(), w, errorCodes.ToAPIErr(errCode), r.URL, guessIsBrowserReq(r))
writeErrorResponse(r.Context(), w, errorCodes.ToAPIErr(errCode), r.URL, guessIsBrowserReq(r))
return
}
// Verify if the request date header is shifted by less than globalMaxSkewTime parameter in the past
// or in the future, reject request otherwise.
curTime := UTCNow()
if curTime.Sub(amzDate) > globalMaxSkewTime || amzDate.Sub(curTime) > globalMaxSkewTime {
writeErrorResponse(context.Background(), w, errorCodes.ToAPIErr(ErrRequestTimeTooSkewed), r.URL, guessIsBrowserReq(r))
writeErrorResponse(r.Context(), w, errorCodes.ToAPIErr(ErrRequestTimeTooSkewed), r.URL, guessIsBrowserReq(r))
return
}
}
@ -509,14 +509,14 @@ func (h resourceHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// If bucketName is present and not objectName check for bucket level resource queries.
if bucketName != "" && objectName == "" {
if ignoreNotImplementedBucketResources(r) {
writeErrorResponse(context.Background(), w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL, guessIsBrowserReq(r))
writeErrorResponse(r.Context(), w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL, guessIsBrowserReq(r))
return
}
}
// If bucketName and objectName are present check for its resource queries.
if bucketName != "" && objectName != "" {
if ignoreNotImplementedObjectResources(r) {
writeErrorResponse(context.Background(), w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL, guessIsBrowserReq(r))
writeErrorResponse(r.Context(), w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL, guessIsBrowserReq(r))
return
}
}
@ -589,20 +589,20 @@ func hasMultipleAuth(r *http.Request) bool {
func (h requestValidityHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// Check for bad components in URL path.
if hasBadPathComponent(r.URL.Path) {
writeErrorResponse(context.Background(), w, errorCodes.ToAPIErr(ErrInvalidResourceName), r.URL, guessIsBrowserReq(r))
writeErrorResponse(r.Context(), w, errorCodes.ToAPIErr(ErrInvalidResourceName), r.URL, guessIsBrowserReq(r))
return
}
// Check for bad components in URL query values.
for _, vv := range r.URL.Query() {
for _, v := range vv {
if hasBadPathComponent(v) {
writeErrorResponse(context.Background(), w, errorCodes.ToAPIErr(ErrInvalidResourceName), r.URL, guessIsBrowserReq(r))
writeErrorResponse(r.Context(), w, errorCodes.ToAPIErr(ErrInvalidResourceName), r.URL, guessIsBrowserReq(r))
return
}
}
}
if hasMultipleAuth(r) {
writeErrorResponse(context.Background(), w, errorCodes.ToAPIErr(ErrInvalidRequest), r.URL, guessIsBrowserReq(r))
writeErrorResponse(r.Context(), w, errorCodes.ToAPIErr(ErrInvalidRequest), r.URL, guessIsBrowserReq(r))
return
}
h.handler.ServeHTTP(w, r)
@ -648,10 +648,10 @@ func (f bucketForwardingHandler) ServeHTTP(w http.ResponseWriter, r *http.Reques
sr, err := globalDNSConfig.Get(bucket)
if err != nil {
if err == dns.ErrNoEntriesFound {
writeErrorResponse(context.Background(), w, errorCodes.ToAPIErr(ErrNoSuchBucket),
writeErrorResponse(r.Context(), w, errorCodes.ToAPIErr(ErrNoSuchBucket),
r.URL, guessIsBrowserReq(r))
} else {
writeErrorResponse(context.Background(), w, toAPIError(context.Background(), err),
writeErrorResponse(r.Context(), w, toAPIError(r.Context(), err),
r.URL, guessIsBrowserReq(r))
}
return
@ -697,9 +697,9 @@ func (f bucketForwardingHandler) ServeHTTP(w http.ResponseWriter, r *http.Reques
sr, err := globalDNSConfig.Get(bucket)
if err != nil {
if err == dns.ErrNoEntriesFound {
writeErrorResponse(context.Background(), w, errorCodes.ToAPIErr(ErrNoSuchBucket), r.URL, guessIsBrowserReq(r))
writeErrorResponse(r.Context(), w, errorCodes.ToAPIErr(ErrNoSuchBucket), r.URL, guessIsBrowserReq(r))
} else {
writeErrorResponse(context.Background(), w, toAPIError(context.Background(), err), r.URL, guessIsBrowserReq(r))
writeErrorResponse(r.Context(), w, toAPIError(r.Context(), err), r.URL, guessIsBrowserReq(r))
}
return
}
@ -772,7 +772,7 @@ type criticalErrorHandler struct{ handler http.Handler }
func (h criticalErrorHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
defer func() {
if err := recover(); err == logger.ErrCritical { // handle
writeErrorResponse(context.Background(), w, errorCodes.ToAPIErr(ErrInternalError), r.URL, guessIsBrowserReq(r))
writeErrorResponse(r.Context(), w, errorCodes.ToAPIErr(ErrInternalError), r.URL, guessIsBrowserReq(r))
} else if err != nil {
panic(err) // forward other panic calls
}
@ -791,7 +791,7 @@ func (h sseTLSHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if r.Method == http.MethodHead {
writeErrorResponseHeadersOnly(w, errorCodes.ToAPIErr(ErrInsecureSSECustomerRequest))
} else {
writeErrorResponse(context.Background(), w, errorCodes.ToAPIErr(ErrInsecureSSECustomerRequest), r.URL, guessIsBrowserReq(r))
writeErrorResponse(r.Context(), w, errorCodes.ToAPIErr(ErrInsecureSSECustomerRequest), r.URL, guessIsBrowserReq(r))
}
return
}

@ -18,7 +18,6 @@ package cmd
import (
"context"
"fmt"
"sync"
"time"
@ -126,65 +125,36 @@ func durationToNextHealRound(lastHeal time.Time) time.Duration {
}
// Healing leader will take the charge of healing all erasure sets
func execLeaderTasks(z *xlZones) {
ctx := context.Background()
// Hold a lock so only one server performs auto-healing
leaderLock := z.NewNSLock(ctx, minioMetaBucket, "leader")
for {
err := leaderLock.GetLock(leaderLockTimeout)
if err == nil {
break
}
time.Sleep(leaderTick)
}
// Hold a lock for healing the erasure set
zeroDuration := time.Millisecond
zeroDynamicTimeout := newDynamicTimeout(zeroDuration, zeroDuration)
lastScanTime := time.Now() // So that we don't heal immediately, but after one month.
func execLeaderTasks(ctx context.Context, z *xlZones) {
lastScanTime := UTCNow() // So that we don't heal immediately, but after one month.
for {
time.Sleep(durationToNextHealRound(lastScanTime))
for _, zone := range z.zones {
// Heal set by set
for i, set := range zone.sets {
setLock := z.zones[0].NewNSLock(ctx, "system", fmt.Sprintf("erasure-set-heal-%d", i))
if err := setLock.GetLock(zeroDynamicTimeout); err != nil {
logger.LogIf(ctx, err)
continue
}
if err := healErasureSet(ctx, i, set); err != nil {
setLock.Unlock()
logger.LogIf(ctx, err)
continue
select {
case <-ctx.Done():
return
case <-time.NewTimer(durationToNextHealRound(lastScanTime)).C:
for _, zone := range z.zones {
// Heal set by set
for i, set := range zone.sets {
if err := healErasureSet(ctx, i, set); err != nil {
logger.LogIf(ctx, err)
continue
}
}
setLock.Unlock()
}
lastScanTime = UTCNow()
}
lastScanTime = time.Now()
}
}
func startGlobalHeal() {
var objAPI ObjectLayer
for {
objAPI = newObjectLayerWithoutSafeModeFn()
if objAPI == nil {
time.Sleep(time.Second)
continue
}
break
}
func startGlobalHeal(ctx context.Context, objAPI ObjectLayer) {
zones, ok := objAPI.(*xlZones)
if !ok {
return
}
execLeaderTasks(zones)
execLeaderTasks(ctx, zones)
}
func initGlobalHeal() {
go startGlobalHeal()
func initGlobalHeal(ctx context.Context, objAPI ObjectLayer) {
go startGlobalHeal(ctx, objAPI)
}

@ -293,9 +293,7 @@ func lockMaintenance(ctx context.Context, interval time.Duration) error {
}
// Start lock maintenance from all lock servers.
func startLockMaintenance() {
var ctx = context.Background()
func startLockMaintenance(ctx context.Context) {
// Wait until the object API is ready
// no need to start the lock maintenance
// if ObjectAPI is not initialized.
@ -317,7 +315,7 @@ func startLockMaintenance() {
for {
// Verifies every minute for locks held more than 2 minutes.
select {
case <-GlobalServiceDoneCh:
case <-ctx.Done():
return
case <-ticker.C:
// Start with random sleep time, so as to avoid
@ -357,5 +355,5 @@ func registerLockRESTHandlers(router *mux.Router, endpointZones EndpointZones) {
}
}
go startLockMaintenance()
go startLockMaintenance(GlobalContext)
}

@ -267,24 +267,6 @@ func (sys *NotificationSys) BackgroundHealStatus() []madmin.BgHealState {
return states
}
// BackgroundOpsStatus - returns the status of all background operations of all peers
func (sys *NotificationSys) BackgroundOpsStatus() []BgOpsStatus {
states := make([]BgOpsStatus, len(sys.peerClients))
for idx, client := range sys.peerClients {
if client == nil {
continue
}
st, err := client.BackgroundOpsStatus()
if err != nil {
logger.LogIf(context.Background(), err)
} else {
states[idx] = st
}
}
return states
}
// StartProfiling - start profiling on remote peers, by initiating a remote RPC.
func (sys *NotificationSys) StartProfiling(profiler string) []NotificationPeerErr {
ng := WithNPeers(len(sys.peerClients))

@ -614,32 +614,6 @@ func (client *peerRESTClient) BackgroundHealStatus() (madmin.BgHealState, error)
return state, err
}
// BgLifecycleOpsStatus describes the status
// of the background lifecycle operations
type BgLifecycleOpsStatus struct {
LastActivity time.Time
}
// BgOpsStatus describes the status of all operations performed
// in background such as auto-healing and lifecycle.
// Notice: We need to increase peer REST API version when adding
// new fields to this struct.
type BgOpsStatus struct {
LifecycleOps BgLifecycleOpsStatus
}
func (client *peerRESTClient) BackgroundOpsStatus() (BgOpsStatus, error) {
respBody, err := client.call(peerRESTMethodBackgroundOpsStatus, nil, nil, -1)
if err != nil {
return BgOpsStatus{}, err
}
defer http.DrainBody(respBody)
state := BgOpsStatus{}
err = gob.NewDecoder(respBody).Decode(&state)
return state, err
}
func (client *peerRESTClient) doTrace(traceCh chan interface{}, doneCh chan struct{}, trcAll, trcErr bool) {
values := make(url.Values)
values.Set(peerRESTTraceAll, strconv.FormatBool(trcAll))

@ -34,7 +34,6 @@ const (
peerRESTMethodServerUpdate = "/serverupdate"
peerRESTMethodSignalService = "/signalservice"
peerRESTMethodBackgroundHealStatus = "/backgroundhealstatus"
peerRESTMethodBackgroundOpsStatus = "/backgroundopsstatus"
peerRESTMethodGetLocks = "/getlocks"
peerRESTMethodBucketPolicyRemove = "/removebucketpolicy"
peerRESTMethodLoadUser = "/loaduser"

@ -1129,22 +1129,6 @@ func (s *peerRESTServer) BackgroundHealStatusHandler(w http.ResponseWriter, r *h
logger.LogIf(ctx, gob.NewEncoder(w).Encode(state))
}
func (s *peerRESTServer) BackgroundOpsStatusHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
s.writeErrorResponse(w, errors.New("invalid request"))
return
}
ctx := newContext(r, w, "BackgroundOpsStatus")
state := BgOpsStatus{
LifecycleOps: getLocalBgLifecycleOpsStatus(),
}
defer w.(http.Flusher).Flush()
logger.LogIf(ctx, gob.NewEncoder(w).Encode(state))
}
// ConsoleLogHandler sends console logs of this node back to peer rest client
func (s *peerRESTServer) ConsoleLogHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
@ -1230,8 +1214,6 @@ func registerPeerRESTHandlers(router *mux.Router) {
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodBucketLifecycleRemove).HandlerFunc(httpTraceHdrs(server.RemoveBucketLifecycleHandler)).Queries(restQueries(peerRESTBucket)...)
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodBucketEncryptionSet).HandlerFunc(httpTraceHdrs(server.SetBucketSSEConfigHandler)).Queries(restQueries(peerRESTBucket)...)
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodBucketEncryptionRemove).HandlerFunc(httpTraceHdrs(server.RemoveBucketSSEConfigHandler)).Queries(restQueries(peerRESTBucket)...)
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodBackgroundOpsStatus).HandlerFunc(server.BackgroundOpsStatusHandler)
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodTrace).HandlerFunc(server.TraceHandler)
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodListen).HandlerFunc(httpTraceHdrs(server.ListenHandler))
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodBackgroundHealStatus).HandlerFunc(server.BackgroundHealStatusHandler)

@ -672,11 +672,15 @@ func (s *posix) Walk(volume, dirPath, marker string, recursive bool, leafFile st
return nil, err
}
ch = make(chan FileInfo)
// buffer channel matches the S3 ListObjects implementation
ch = make(chan FileInfo, maxObjectList)
go func() {
defer close(ch)
listDir := func(volume, dirPath, dirEntry string) (emptyDir bool, entries []string) {
// Dynamic time delay.
t := UTCNow()
entries, err := s.ListDir(volume, dirPath, -1, leafFile)
sleepDuration(time.Since(t), 10.0)
if err != nil {
return
}
@ -701,7 +705,10 @@ func (s *posix) Walk(volume, dirPath, marker string, recursive bool, leafFile st
Mode: os.ModeDir,
}
} else {
// Dynamic time delay.
t := UTCNow()
buf, err := s.ReadAll(volume, pathJoin(walkResult.entry, leafFile))
sleepDuration(time.Since(t), 10.0)
if err != nil {
continue
}

@ -26,6 +26,7 @@ import (
"os/signal"
"strings"
"syscall"
"time"
"github.com/minio/cli"
"github.com/minio/minio/cmd/config"
@ -285,6 +286,27 @@ func initAllSubsystems(buckets []BucketInfo, newObject ObjectLayer) (err error)
return nil
}
func startBackgroundOps(ctx context.Context, objAPI ObjectLayer) {
// Make sure only 1 crawler is running on the cluster.
locker := objAPI.NewNSLock(ctx, minioMetaBucket, "leader")
for {
err := locker.GetLock(leaderLockTimeout)
if err != nil {
time.Sleep(leaderTick)
continue
}
break
// No unlock for "leader" lock.
}
if globalIsXL {
initGlobalHeal(ctx, objAPI)
}
initDataUsageStats(ctx, objAPI)
initDailyLifecycle(ctx, objAPI)
}
// serverMain handler called for 'minio server' command.
func serverMain(ctx *cli.Context) {
if ctx.Args().First() == "help" || !endpointsPresent(ctx) {
@ -401,12 +423,11 @@ func serverMain(ctx *cli.Context) {
// Enable healing to heal drives if possible
if globalIsXL {
initBackgroundHealing()
initLocalDisksAutoHeal()
initGlobalHeal()
initBackgroundHealing(GlobalContext, newObject)
initLocalDisksAutoHeal(GlobalContext, newObject)
}
buckets, err := newObject.ListBuckets(context.Background())
buckets, err := newObject.ListBuckets(GlobalContext)
if err != nil {
logger.Fatal(err, "Unable to list buckets")
}
@ -416,7 +437,7 @@ func serverMain(ctx *cli.Context) {
if globalCacheConfig.Enabled {
// initialize the new disk cache objects.
var cacheAPI CacheObjectLayer
cacheAPI, err = newServerCacheObjects(context.Background(), globalCacheConfig)
cacheAPI, err = newServerCacheObjects(GlobalContext, globalCacheConfig)
logger.FatalIf(err, "Unable to initialize disk caching")
globalObjLayerMutex.Lock()
@ -429,8 +450,7 @@ func serverMain(ctx *cli.Context) {
initFederatorBackend(buckets, newObject)
}
initDataUsageStats()
initDailyLifecycle()
go startBackgroundOps(GlobalContext, newObject)
// Disable safe mode operation, after all initialization is over.
globalObjLayerMutex.Lock()

@ -392,6 +392,11 @@ func (s *xlSets) StorageInfo(ctx context.Context, local bool) StorageInfo {
storageInfo.Backend.Sets[i] = make([]madmin.DriveInfo, s.drivesPerSet)
}
if local {
// if local is true, we don't need to read format.json
return storageInfo
}
storageDisks, dErrs := initStorageDisksWithErrors(s.endpoints)
defer closeStorageDisks(storageDisks)

Loading…
Cancel
Save