change leader locker only for crawler (#10509)

master
Harshavardhana 4 years ago committed by GitHub
parent 3168e93730
commit 1cf322b7d4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 18
      cmd/data-crawler.go
  2. 6
      cmd/global-heal.go
  3. 18
      cmd/server-main.go
  4. 4
      pkg/dsync/drwmutex.go

@ -50,10 +50,14 @@ const (
healFolderIncludeProb = 32 // Include a clean folder one in n cycles. healFolderIncludeProb = 32 // Include a clean folder one in n cycles.
healObjectSelectProb = 512 // Overall probability of a file being scanned; one in n. healObjectSelectProb = 512 // Overall probability of a file being scanned; one in n.
// sleep for an hour after a lock timeout
// before retrying to acquire lock again.
dataCrawlerLeaderLockTimeoutSleepInterval = time.Hour
) )
var ( var (
globalCrawlerConfig crawler.Config globalCrawlerConfig crawler.Config
dataCrawlerLeaderLockTimeout = newDynamicTimeout(1*time.Minute, 30*time.Second)
) )
// initDataCrawler will start the crawler unless disabled. // initDataCrawler will start the crawler unless disabled.
@ -67,6 +71,18 @@ func initDataCrawler(ctx context.Context, objAPI ObjectLayer) {
// The function will block until the context is canceled. // The function will block until the context is canceled.
// There should only ever be one crawler running per cluster. // There should only ever be one crawler running per cluster.
func runDataCrawler(ctx context.Context, objAPI ObjectLayer) { func runDataCrawler(ctx context.Context, objAPI ObjectLayer) {
// Make sure only 1 crawler is running on the cluster.
locker := objAPI.NewNSLock(ctx, minioMetaBucket, "runDataCrawler.lock")
for {
err := locker.GetLock(dataCrawlerLeaderLockTimeout)
if err != nil {
time.Sleep(dataCrawlerLeaderLockTimeoutSleepInterval)
continue
}
break
// No unlock for "leader" lock.
}
// Load current bloom cycle // Load current bloom cycle
nextBloomCycle := intDataUpdateTracker.current() + 1 nextBloomCycle := intDataUpdateTracker.current() + 1
var buf bytes.Buffer var buf bytes.Buffer

@ -26,17 +26,11 @@ import (
const ( const (
bgHealingUUID = "0000-0000-0000-0000" bgHealingUUID = "0000-0000-0000-0000"
// sleep for an hour after a lock timeout
// before retrying to acquire lock again.
leaderLockTimeoutSleepInterval = time.Hour
) )
var leaderLockTimeout = newDynamicTimeout(1*time.Minute, 30*time.Second)
// NewBgHealSequence creates a background healing sequence // NewBgHealSequence creates a background healing sequence
// operation which crawls all objects and heal them. // operation which crawls all objects and heal them.
func newBgHealSequence() *healSequence { func newBgHealSequence() *healSequence {
reqInfo := &logger.ReqInfo{API: "BackgroundHeal"} reqInfo := &logger.ReqInfo{API: "BackgroundHeal"}
ctx, cancelCtx := context.WithCancel(logger.SetReqInfo(GlobalContext, reqInfo)) ctx, cancelCtx := context.WithCancel(logger.SetReqInfo(GlobalContext, reqInfo))

@ -366,22 +366,6 @@ func initAllSubsystems(ctx context.Context, newObject ObjectLayer) (err error) {
return nil return nil
} }
func startBackgroundOps(ctx context.Context, objAPI ObjectLayer) {
// Make sure only 1 crawler is running on the cluster.
locker := objAPI.NewNSLock(ctx, minioMetaBucket, "leader")
for {
err := locker.GetLock(leaderLockTimeout)
if err != nil {
time.Sleep(leaderLockTimeoutSleepInterval)
continue
}
break
// No unlock for "leader" lock.
}
initDataCrawler(ctx, objAPI)
}
// serverMain handler called for 'minio server' command. // serverMain handler called for 'minio server' command.
func serverMain(ctx *cli.Context) { func serverMain(ctx *cli.Context) {
signal.Notify(globalOSSignalCh, os.Interrupt, syscall.SIGTERM) signal.Notify(globalOSSignalCh, os.Interrupt, syscall.SIGTERM)
@ -503,7 +487,7 @@ func serverMain(ctx *cli.Context) {
globalObjectAPI = newObject globalObjectAPI = newObject
globalObjLayerMutex.Unlock() globalObjLayerMutex.Unlock()
go startBackgroundOps(GlobalContext, newObject) go initDataCrawler(GlobalContext, newObject)
logger.FatalIf(initSafeMode(GlobalContext, newObject), "Unable to initialize server switching into safe-mode") logger.FatalIf(initSafeMode(GlobalContext, newObject), "Unable to initialize server switching into safe-mode")

@ -154,7 +154,9 @@ func (dm *DRWMutex) lockBlocking(ctx context.Context, id, source string, isReadL
} }
} }
log("lockBlocking %s/%s for %#v: lockType readLock(%t), additional opts: %#v\n", id, source, dm.Names, isReadLock, opts)
retryCtx, cancel := context.WithTimeout(ctx, opts.Timeout) retryCtx, cancel := context.WithTimeout(ctx, opts.Timeout)
defer cancel() defer cancel()
for { for {
@ -163,6 +165,8 @@ func (dm *DRWMutex) lockBlocking(ctx context.Context, id, source string, isReadL
select { select {
case <-retryCtx.Done(): case <-retryCtx.Done():
log("lockBlocking canceled %s/%s for %#v: lockType readLock(%t), additional opts: %#v\n", id, source, dm.Names, isReadLock, opts)
// Caller context canceled or we timedout, // Caller context canceled or we timedout,
// return false anyways for both situations. // return false anyways for both situations.
return false return false

Loading…
Cancel
Save