Avoid synchronizing usage writes (#11560)

If the periodic `case <-t.C:` save gets held up for a long time it will end up 
synchronize all disk writes for saving the caches.

We add jitter to per set writes so they don't sync up and don't hold a 
lock for the write, since it isn't needed anyway.

If an outage prevents writes for a long while we also add individual 
waits for each disk in case there was a queue.

Furthermore limit the number of buffers kept to 2GiB, since this could get 
huge in large clusters. This will not act as a hard limit but should be enough 
for normal operation.
master
Klaus Post 3 years ago committed by GitHub
parent 8e8a792d9d
commit 8a6b13c239
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      cmd/data-usage-cache.go
  2. 7
      cmd/erasure-sets.go
  3. 8
      cmd/erasure.go

@ -522,7 +522,7 @@ func (d *dataUsageCache) save(ctx context.Context, store objectIO, name string)
dataUsageBucket,
name,
NewPutObjReader(r),
ObjectOptions{})
ObjectOptions{NoLock: true})
if isErrBucketNotFound(err) {
return nil
}

@ -357,9 +357,14 @@ func newErasureSets(ctx context.Context, endpoints Endpoints, storageDisks []Sto
mutex := newNSLock(globalIsDistErasure)
// Number of buffers, max 2GB.
n := setCount * setDriveCount
if n > 100 {
n = 100
}
// Initialize byte pool once for all sets, bpool size is set to
// setCount * setDriveCount with each memory upto blockSizeV1.
bp := bpool.NewBytePoolCap(setCount*setDriveCount, blockSizeV1, blockSizeV1*2)
bp := bpool.NewBytePoolCap(n, blockSizeV1, blockSizeV1*2)
for i := 0; i < setCount; i++ {
s.erasureDisks[i] = make([]StorageAPI, setDriveCount)

@ -20,6 +20,7 @@ import (
"context"
"errors"
"fmt"
"math/rand"
"sort"
"sync"
"time"
@ -345,7 +346,8 @@ func (er erasureObjects) crawlAndGetDataUsage(ctx context.Context, buckets []Buc
var saverWg sync.WaitGroup
saverWg.Add(1)
go func() {
const updateTime = 30 * time.Second
// Add jitter to the update time so multiple sets don't sync up.
var updateTime = 30*time.Second + time.Duration(float64(10*time.Second)*rand.Float64())
t := time.NewTicker(updateTime)
defer t.Stop()
defer saverWg.Done()
@ -429,11 +431,15 @@ func (er erasureObjects) crawlAndGetDataUsage(ctx context.Context, buckets []Buc
if r := cache.root(); r != nil {
root = cache.flatten(*r)
}
t := time.Now()
bucketResults <- dataUsageEntryInfo{
Name: cache.Info.Name,
Parent: dataUsageRoot,
Entry: root,
}
// We want to avoid synchronizing up all writes in case
// the results are piled up.
time.Sleep(time.Duration(float64(time.Since(t)) * rand.Float64()))
// Save cache
logger.LogIf(ctx, cache.save(ctx, er, cacheName))
}

Loading…
Cancel
Save