@@ -17,6 +17,7 @@
 package cmd
 
 import (
+	"context"
 	"errors"
 	"math/rand"
 	"net/http"
@@ -29,7 +30,7 @@ import (
 
 const (
 	// Lock maintenance interval.
-	lockMaintenanceInterval = 30 * time.Second
+	lockMaintenanceInterval = 1 * time.Minute
 
 	// Lock validity check interval.
 	lockValidityCheckInterval = 2 * time.Minute
@@ -179,6 +180,8 @@ func getLongLivedLocks(interval time.Duration) map[Endpoint][]nameLockRequesterI
 	return nlripMap
 }
 
+var lockMaintenanceTimeout = newDynamicTimeout(60*time.Second, time.Second)
+
 // lockMaintenance loops over locks that have been active for some time and checks back
 // with the original server whether it is still alive or not
 //
@@ -187,7 +190,14 @@ func getLongLivedLocks(interval time.Duration) map[Endpoint][]nameLockRequesterI
 // - some network error (and server is up normally)
 //
 // We will ignore the error, and we will retry later to get a resolve on this lock
-func lockMaintenance(interval time.Duration) {
+func lockMaintenance(ctx context.Context, interval time.Duration, objAPI ObjectLayer) error {
+	// Lock to avoid concurrent lock maintenance loops
+	maintenanceLock := objAPI.NewNSLock(ctx, "system", "lock-maintenance-ops")
+	if err := maintenanceLock.GetLock(lockMaintenanceTimeout); err != nil {
+		return err
+	}
+	defer maintenanceLock.Unlock()
+
 	// Validate if long lived locks are indeed clean.
 	// Get list of long lived locks to check for staleness.
 	for lendpoint, nlrips := range getLongLivedLocks(interval) {
@@ -203,7 +213,8 @@ func lockMaintenance(interval time.Duration) {
 				continue
 			}
 
-			// Call back to original server verify whether the lock is still active (based on name & uid)
+			// Call back to original server verify whether the lock is
+			// still active (based on name & uid)
 			expired, err := c.Expired(dsync.LockArgs{
 				UID:      nlrip.lri.UID,
 				Resource: nlrip.name,
@@ -230,15 +241,31 @@ func lockMaintenance(interval time.Duration) {
 			}
 		}
 	}
+
+	return nil
 }
 
 // Start lock maintenance from all lock servers.
 func startLockMaintenance() {
+	var objAPI ObjectLayer
+	var ctx = context.Background()
+
+	// Wait until the object API is ready
+	for {
+		objAPI = newObjectLayerWithoutSafeModeFn()
+		if objAPI == nil {
+			time.Sleep(time.Second)
+			continue
+		}
+		break
+	}
+
 	// Initialize a new ticker with a minute between each ticks.
 	ticker := time.NewTicker(lockMaintenanceInterval)
 	// Stop the timer upon service closure and cleanup the go-routine.
 	defer ticker.Stop()
 
+	r := rand.New(rand.NewSource(UTCNow().UnixNano()))
 	for {
 		// Verifies every minute for locks held more than 2 minutes.
 		select {
@@ -247,10 +274,13 @@ func startLockMaintenance() {
 		case <-ticker.C:
 			// Start with random sleep time, so as to avoid
 			// "synchronous checks" between servers
-			r := rand.New(rand.NewSource(UTCNow().UnixNano()))
 			duration := time.Duration(r.Float64() * float64(lockMaintenanceInterval))
 			time.Sleep(duration)
-			lockMaintenance(lockValidityCheckInterval)
+			if err := lockMaintenance(ctx, lockValidityCheckInterval, objAPI); err != nil {
+				// Sleep right after an error.
+				duration := time.Duration(r.Float64() * float64(lockMaintenanceInterval))
+				time.Sleep(duration)
+			}
 		}
 	}
 }
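
A minimal standalone sketch of the jitter pattern the ticker loop above relies on (not part of the diff; the package name, the fixed one-minute interval, and the printed output are illustrative only): seed math/rand once outside the loop, as the patch now does with UTCNow().UnixNano(), then sleep a random fraction of the maintenance interval so that servers do not all sweep their locks at the same instant.

package main

import (
	"fmt"
	"math/rand"
	"time"
)

func main() {
	// One generator for the whole loop, seeded from the wall clock
	// (the diff uses UTCNow().UnixNano() for the same purpose).
	r := rand.New(rand.NewSource(time.Now().UTC().UnixNano()))

	interval := time.Minute // stands in for lockMaintenanceInterval
	for i := 0; i < 3; i++ {
		// Random delay in [0, interval) before each maintenance pass.
		jitter := time.Duration(r.Float64() * float64(interval))
		fmt.Printf("pass %d: would sleep %v before calling lockMaintenance\n", i, jitter)
	}
}

Hoisting the generator out of the ticker loop avoids re-seeding on every tick, and reusing the same r for the error back-off keeps the retry delay in the same [0, interval) range as the initial sleep.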