Use defer style to stop tickers to avoid current/possible misuse (#5883)

This commit ensures that all tickers are stopped using defer ticker.Stop()
style. This will also fix one bug seen when a client starts to listen to
event notifications and that case will result a leak in tickers.
master
Anis Elleuch 7 years ago committed by Dee Koder
parent 0f746a14a3
commit 9439dfef64
  1. 2
      cmd/admin-handlers.go
  2. 4
      cmd/disk-cache-fs.go
  3. 4
      cmd/fs-v1-multipart.go
  4. 4
      cmd/lock-rpc-server.go
  5. 7
      cmd/xl-sets.go
  6. 2
      cmd/xl-v1-multipart.go
  7. 4
      pkg/event/target/httpclient.go

@ -500,6 +500,7 @@ func (a adminAPIHandlers) HealHandler(w http.ResponseWriter, r *http.Request) {
// after 10s unless a response item comes in // after 10s unless a response item comes in
keepConnLive := func(w http.ResponseWriter, respCh chan healResp) { keepConnLive := func(w http.ResponseWriter, respCh chan healResp) {
ticker := time.NewTicker(time.Second * 10) ticker := time.NewTicker(time.Second * 10)
defer ticker.Stop()
started := false started := false
forLoop: forLoop:
for { for {
@ -528,7 +529,6 @@ func (a adminAPIHandlers) HealHandler(w http.ResponseWriter, r *http.Request) {
break forLoop break forLoop
} }
} }
ticker.Stop()
} }
// find number of disks in the setup // find number of disks in the setup

@ -155,11 +155,11 @@ func (cfs *cacheFSObjects) diskAvailable(size int64) bool {
// purges all content marked trash from the cache. // purges all content marked trash from the cache.
func (cfs *cacheFSObjects) purgeTrash() { func (cfs *cacheFSObjects) purgeTrash() {
ticker := time.NewTicker(time.Minute * cacheCleanupInterval) ticker := time.NewTicker(time.Minute * cacheCleanupInterval)
defer ticker.Stop()
for { for {
select { select {
case <-globalServiceDoneCh: case <-globalServiceDoneCh:
// Stop the timer.
ticker.Stop()
return return
case <-ticker.C: case <-ticker.C:
trashPath := path.Join(cfs.fsPath, minioMetaBucket, cacheTrashDir) trashPath := path.Join(cfs.fsPath, minioMetaBucket, cacheTrashDir)

@ -714,11 +714,11 @@ func (fs *FSObjects) AbortMultipartUpload(ctx context.Context, bucket, object, u
// blocking and should be run in a go-routine. // blocking and should be run in a go-routine.
func (fs *FSObjects) cleanupStaleMultipartUploads(ctx context.Context, cleanupInterval, expiry time.Duration, doneCh chan struct{}) { func (fs *FSObjects) cleanupStaleMultipartUploads(ctx context.Context, cleanupInterval, expiry time.Duration, doneCh chan struct{}) {
ticker := time.NewTicker(cleanupInterval) ticker := time.NewTicker(cleanupInterval)
defer ticker.Stop()
for { for {
select { select {
case <-doneCh: case <-doneCh:
// Stop the timer.
ticker.Stop()
return return
case <-ticker.C: case <-ticker.C:
now := time.Now() now := time.Now()

@ -69,6 +69,8 @@ func startLockMaintenance(lkSrv *lockServer) {
go func(lk *lockServer) { go func(lk *lockServer) {
// Initialize a new ticker with a minute between each ticks. // Initialize a new ticker with a minute between each ticks.
ticker := time.NewTicker(lockMaintenanceInterval) ticker := time.NewTicker(lockMaintenanceInterval)
// Stop the timer upon service closure and cleanup the go-routine.
defer ticker.Stop()
// Start with random sleep time, so as to avoid "synchronous checks" between servers // Start with random sleep time, so as to avoid "synchronous checks" between servers
time.Sleep(time.Duration(rand.Float64() * float64(lockMaintenanceInterval))) time.Sleep(time.Duration(rand.Float64() * float64(lockMaintenanceInterval)))
@ -76,8 +78,6 @@ func startLockMaintenance(lkSrv *lockServer) {
// Verifies every minute for locks held more than 2minutes. // Verifies every minute for locks held more than 2minutes.
select { select {
case <-globalServiceDoneCh: case <-globalServiceDoneCh:
// Stop the timer upon service closure and cleanup the go-routine.
ticker.Stop()
return return
case <-ticker.C: case <-ticker.C:
lk.lockMaintenance(lockValidityCheckInterval) lk.lockMaintenance(lockValidityCheckInterval)

@ -198,15 +198,14 @@ func (s *xlSets) connectDisks() {
// the set topology, this monitoring happens at a given monitoring interval. // the set topology, this monitoring happens at a given monitoring interval.
func (s *xlSets) monitorAndConnectEndpoints(monitorInterval time.Duration) { func (s *xlSets) monitorAndConnectEndpoints(monitorInterval time.Duration) {
ticker := time.NewTicker(monitorInterval) ticker := time.NewTicker(monitorInterval)
// Stop the timer.
defer ticker.Stop()
for { for {
select { select {
case <-globalServiceDoneCh: case <-globalServiceDoneCh:
// Stop the timer.
ticker.Stop()
return return
case <-s.disksConnectDoneCh: case <-s.disksConnectDoneCh:
// Stop the timer.
ticker.Stop()
return return
case <-ticker.C: case <-ticker.C:
s.connectDisks() s.connectDisks()

@ -837,11 +837,11 @@ func (xl xlObjects) AbortMultipartUpload(ctx context.Context, bucket, object, up
// Clean-up the old multipart uploads. Should be run in a Go routine. // Clean-up the old multipart uploads. Should be run in a Go routine.
func (xl xlObjects) cleanupStaleMultipartUploads(ctx context.Context, cleanupInterval, expiry time.Duration, doneCh chan struct{}) { func (xl xlObjects) cleanupStaleMultipartUploads(ctx context.Context, cleanupInterval, expiry time.Duration, doneCh chan struct{}) {
ticker := time.NewTicker(cleanupInterval) ticker := time.NewTicker(cleanupInterval)
defer ticker.Stop()
for { for {
select { select {
case <-doneCh: case <-doneCh:
ticker.Stop()
return return
case <-ticker.C: case <-ticker.C:
var disk StorageAPI var disk StorageAPI

@ -62,8 +62,10 @@ func (target *HTTPClientTarget) start() {
return nil return nil
} }
for {
keepAliveTicker := time.NewTicker(500 * time.Millisecond) keepAliveTicker := time.NewTicker(500 * time.Millisecond)
defer keepAliveTicker.Stop()
for {
select { select {
case <-target.stopCh: case <-target.stopCh:
// We are asked to stop. // We are asked to stop.

Loading…
Cancel
Save