Avoid ticker timer to simplify disk usage (#6101)

This PR simplifies the code to avoid tracking
any running usage events. This PR also brings
in an upper threshold of upto 1 minute suspend
the usage function after which the usage would
proceed without waiting any longer.
master
Harshavardhana 6 years ago committed by kannappanr
parent 805186ab97
commit de251483d1
  1. 40
      cmd/fs-v1.go
  2. 41
      cmd/posix.go

@ -45,8 +45,6 @@ var defaultEtag = "00000000000000000000000000000000-1"
type FSObjects struct { type FSObjects struct {
// Disk usage metrics // Disk usage metrics
totalUsed uint64 // ref: https://golang.org/pkg/sync/atomic/#pkg-note-BUG totalUsed uint64 // ref: https://golang.org/pkg/sync/atomic/#pkg-note-BUG
// Disk usage running routine
usageRunning int32 // ref: https://golang.org/pkg/sync/atomic/#pkg-note-BUG
// Path to be exported over S3 API. // Path to be exported over S3 API.
fsPath string fsPath string
@ -183,13 +181,14 @@ func (fs *FSObjects) Shutdown(ctx context.Context) error {
// diskUsage returns du information for the posix path, in a continuous routine. // diskUsage returns du information for the posix path, in a continuous routine.
func (fs *FSObjects) diskUsage(doneCh chan struct{}) { func (fs *FSObjects) diskUsage(doneCh chan struct{}) {
ticker := time.NewTicker(globalUsageCheckInterval)
defer ticker.Stop()
usageFn := func(ctx context.Context, entry string) error { usageFn := func(ctx context.Context, entry string) error {
if globalHTTPServer != nil { if globalHTTPServer != nil {
// Wait at max 1 minute for an inprogress request
// before proceeding to count the usage.
waitCount := 60
// Any requests in progress, delay the usage. // Any requests in progress, delay the usage.
for globalHTTPServer.GetRequestCount() > 0 { for globalHTTPServer.GetRequestCount() > 0 && waitCount > 0 {
waitCount--
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
} }
} }
@ -213,34 +212,26 @@ func (fs *FSObjects) diskUsage(doneCh chan struct{}) {
return nil return nil
} }
// Check if disk usage routine is running, if yes then return. // Return this routine upon errWalkAbort, continue for any other error on purpose
if atomic.LoadInt32(&fs.usageRunning) == 1 { // so that we can start the routine freshly in another 12 hours.
return if err := getDiskUsage(context.Background(), fs.fsPath, usageFn); err == errWalkAbort {
}
atomic.StoreInt32(&fs.usageRunning, 1)
defer atomic.StoreInt32(&fs.usageRunning, 0)
if err := getDiskUsage(context.Background(), fs.fsPath, usageFn); err != nil {
return return
} }
atomic.StoreInt32(&fs.usageRunning, 0)
for { for {
select { select {
case <-doneCh: case <-doneCh:
return return
case <-ticker.C: case <-time.After(globalUsageCheckInterval):
// Check if disk usage routine is running, if yes let it finish.
if atomic.LoadInt32(&fs.usageRunning) == 1 {
continue
}
atomic.StoreInt32(&fs.usageRunning, 1)
var usage uint64 var usage uint64
usageFn = func(ctx context.Context, entry string) error { usageFn = func(ctx context.Context, entry string) error {
if globalHTTPServer != nil { if globalHTTPServer != nil {
// Wait at max 1 minute for an inprogress request
// before proceeding to count the usage.
waitCount := 60
// Any requests in progress, delay the usage. // Any requests in progress, delay the usage.
for globalHTTPServer.GetRequestCount() > 0 { for globalHTTPServer.GetRequestCount() > 0 && waitCount > 0 {
waitCount--
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
} }
} }
@ -260,11 +251,8 @@ func (fs *FSObjects) diskUsage(doneCh chan struct{}) {
} }
if err := getDiskUsage(context.Background(), fs.fsPath, usageFn); err != nil { if err := getDiskUsage(context.Background(), fs.fsPath, usageFn); err != nil {
atomic.StoreInt32(&fs.usageRunning, 0)
continue continue
} }
atomic.StoreInt32(&fs.usageRunning, 0)
atomic.StoreUint64(&fs.totalUsed, usage) atomic.StoreUint64(&fs.totalUsed, usage)
} }
} }

@ -61,10 +61,8 @@ func isValidVolname(volname string) bool {
// posix - implements StorageAPI interface. // posix - implements StorageAPI interface.
type posix struct { type posix struct {
// Disk usage metrics // Disk usage metrics
totalUsed uint64 // ref: https://golang.org/pkg/sync/atomic/#pkg-note-BUG totalUsed uint64 // ref: https://golang.org/pkg/sync/atomic/#pkg-note-BUG
// Disk usage running routine ioErrCount int32 // ref: https://golang.org/pkg/sync/atomic/#pkg-note-BUG
usageRunning int32 // ref: https://golang.org/pkg/sync/atomic/#pkg-note-BUG
ioErrCount int32 // ref: https://golang.org/pkg/sync/atomic/#pkg-note-BUG
diskPath string diskPath string
pool sync.Pool pool sync.Pool
@ -347,8 +345,12 @@ func (s *posix) diskUsage(doneCh chan struct{}) {
usageFn := func(ctx context.Context, entry string) error { usageFn := func(ctx context.Context, entry string) error {
if globalHTTPServer != nil { if globalHTTPServer != nil {
// Wait at max 1 minute for an inprogress request
// before proceeding to count the usage.
waitCount := 60
// Any requests in progress, delay the usage. // Any requests in progress, delay the usage.
for globalHTTPServer.GetRequestCount() > 0 { for globalHTTPServer.GetRequestCount() > 0 && waitCount > 0 {
waitCount--
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
} }
} }
@ -368,17 +370,11 @@ func (s *posix) diskUsage(doneCh chan struct{}) {
} }
} }
// Check if disk usage routine is running, if yes then return. // Return this routine upon errWalkAbort, continue for any other error on purpose
if atomic.LoadInt32(&s.usageRunning) == 1 { // so that we can start the routine freshly in another 12 hours.
if err := getDiskUsage(context.Background(), s.diskPath, usageFn); err == errWalkAbort {
return return
} }
atomic.StoreInt32(&s.usageRunning, 1)
defer atomic.StoreInt32(&s.usageRunning, 0)
if err := getDiskUsage(context.Background(), s.diskPath, usageFn); err != nil {
return
}
atomic.StoreInt32(&s.usageRunning, 0)
for { for {
select { select {
@ -386,19 +382,16 @@ func (s *posix) diskUsage(doneCh chan struct{}) {
return return
case <-doneCh: case <-doneCh:
return return
case <-ticker.C: case <-time.After(globalUsageCheckInterval):
// Check if disk usage routine is running, if yes let it
// finish, before starting a new one.
if atomic.LoadInt32(&s.usageRunning) == 1 {
continue
}
atomic.StoreInt32(&s.usageRunning, 1)
var usage uint64 var usage uint64
usageFn = func(ctx context.Context, entry string) error { usageFn = func(ctx context.Context, entry string) error {
if globalHTTPServer != nil { if globalHTTPServer != nil {
// Wait at max 1 minute for an inprogress request
// before proceeding to count the usage.
waitCount := 60
// Any requests in progress, delay the usage. // Any requests in progress, delay the usage.
for globalHTTPServer.GetRequestCount() > 0 { for globalHTTPServer.GetRequestCount() > 0 && waitCount > 0 {
waitCount--
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
} }
} }
@ -417,11 +410,9 @@ func (s *posix) diskUsage(doneCh chan struct{}) {
} }
if err := getDiskUsage(context.Background(), s.diskPath, usageFn); err != nil { if err := getDiskUsage(context.Background(), s.diskPath, usageFn); err != nil {
atomic.StoreInt32(&s.usageRunning, 0)
continue continue
} }
atomic.StoreInt32(&s.usageRunning, 0)
atomic.StoreUint64(&s.totalUsed, usage) atomic.StoreUint64(&s.totalUsed, usage)
} }
} }

Loading…
Cancel
Save