Avoid data-transfer in distributed locking (#8004)

master
Harshavardhana 5 years ago committed by kannappanr
parent 843f481eb3
commit b52b90412b
  1. 25
      cmd/iam.go
  2. 36
      cmd/lock-rest-client.go
  3. 18
      cmd/lock-rest-server-common.go
  4. 144
      cmd/lock-rest-server.go

@ -891,9 +891,25 @@ func (sys *IAMSys) Init(objAPI ObjectLayer) error {
return errInvalidArgument return errInvalidArgument
} }
// Migrate IAM configuration doneCh := make(chan struct{})
if err := doIAMConfigMigration(objAPI); err != nil { defer close(doneCh)
return err
// Migrating IAM needs a retry mechanism for
// the following reasons:
// - Read quorum is lost just after the initialization
// of the object layer.
for range newRetryTimerSimple(doneCh) {
// Migrate IAM configuration
if err := doIAMConfigMigration(objAPI); err != nil {
if err == errDiskNotFound ||
strings.Contains(err.Error(), InsufficientReadQuorum{}.Error()) ||
strings.Contains(err.Error(), InsufficientWriteQuorum{}.Error()) {
logger.Info("Waiting for IAM subsystem to be initialized..")
continue
}
return err
}
break
} }
if globalEtcdClient != nil { if globalEtcdClient != nil {
@ -902,9 +918,6 @@ func (sys *IAMSys) Init(objAPI ObjectLayer) error {
defer sys.watchIAMDisk(objAPI) defer sys.watchIAMDisk(objAPI)
} }
doneCh := make(chan struct{})
defer close(doneCh)
// Initializing IAM needs a retry mechanism for // Initializing IAM needs a retry mechanism for
// the following reasons: // the following reasons:
// - Read quorum is lost just after the initialization // - Read quorum is lost just after the initialization

@ -17,10 +17,8 @@
package cmd package cmd
import ( import (
"bytes"
"context" "context"
"crypto/tls" "crypto/tls"
"encoding/gob"
"errors" "errors"
"io" "io"
"sync" "sync"
@ -131,32 +129,16 @@ func (client *lockRESTClient) Close() error {
// restCall makes a call to the lock REST server. // restCall makes a call to the lock REST server.
func (client *lockRESTClient) restCall(call string, args dsync.LockArgs) (reply bool, err error) { func (client *lockRESTClient) restCall(call string, args dsync.LockArgs) (reply bool, err error) {
values := url.Values{}
reader := bytes.NewBuffer(make([]byte, 0, 2048)) values.Set(lockRESTUID, args.UID)
err = gob.NewEncoder(reader).Encode(args) values.Set(lockRESTSource, args.Source)
if err != nil { values.Set(lockRESTResource, args.Resource)
return false, err values.Set(lockRESTServerAddr, args.ServerAddr)
} values.Set(lockRESTServerEndpoint, args.ServiceEndpoint)
respBody, err := client.call(call, nil, reader, -1)
if err != nil { respBody, err := client.call(call, values, nil, -1)
return false, err
}
var resp lockResponse
defer http.DrainBody(respBody) defer http.DrainBody(respBody)
err = gob.NewDecoder(respBody).Decode(&resp) return err == nil, err
if err != nil || !resp.Success {
reqInfo := &logger.ReqInfo{}
reqInfo.AppendTags("resource", args.Resource)
reqInfo.AppendTags("serveraddress", args.ServerAddr)
reqInfo.AppendTags("serviceendpoint", args.ServiceEndpoint)
reqInfo.AppendTags("source", args.Source)
reqInfo.AppendTags("uid", args.UID)
ctx := logger.SetReqInfo(context.Background(), reqInfo)
logger.LogIf(ctx, err)
}
return resp.Success, err
} }
// RLock calls read lock REST API. // RLock calls read lock REST API.

@ -21,7 +21,7 @@ import (
"time" "time"
) )
const lockRESTVersion = "v1" const lockRESTVersion = "v2"
const lockRESTPath = minioReservedBucketPath + "/lock/" + lockRESTVersion const lockRESTPath = minioReservedBucketPath + "/lock/" + lockRESTVersion
var lockServicePath = path.Join(minioReservedBucketPath, lockServiceSubPath) var lockServicePath = path.Join(minioReservedBucketPath, lockServiceSubPath)
@ -33,6 +33,18 @@ const (
lockRESTMethodRUnlock = "runlock" lockRESTMethodRUnlock = "runlock"
lockRESTMethodForceUnlock = "forceunlock" lockRESTMethodForceUnlock = "forceunlock"
lockRESTMethodExpired = "expired" lockRESTMethodExpired = "expired"
// Unique ID of lock/unlock request.
lockRESTUID = "uid"
// Source contains the line number, function and file name of the code
// on the client node that requested the lock.
lockRESTSource = "source"
// Resource contains a entity to be locked/unlocked.
lockRESTResource = "resource"
// ServerAddr contains the address of the server who requested lock/unlock of the above resource.
lockRESTServerAddr = "serverAddr"
// ServiceEndpoint contains the network path of above server to do lock/unlock.
lockRESTServerEndpoint = "serverEndpoint"
) )
// nameLockRequesterInfoPair is a helper type for lock maintenance // nameLockRequesterInfoPair is a helper type for lock maintenance
@ -41,10 +53,6 @@ type nameLockRequesterInfoPair struct {
lri lockRequesterInfo lri lockRequesterInfo
} }
type lockResponse struct {
Success bool
}
// Similar to removeEntry but only removes an entry only if the lock entry exists in map. // Similar to removeEntry but only removes an entry only if the lock entry exists in map.
func (l *localLocker) removeEntryIfExists(nlrip nameLockRequesterInfoPair) { func (l *localLocker) removeEntryIfExists(nlrip nameLockRequesterInfoPair) {
// Check if entry is still in map (could have been removed altogether by 'concurrent' (R)Unlock of last entry) // Check if entry is still in map (could have been removed altogether by 'concurrent' (R)Unlock of last entry)

@ -18,7 +18,6 @@ package cmd
import ( import (
"context" "context"
"encoding/gob"
"errors" "errors"
"math/rand" "math/rand"
"net/http" "net/http"
@ -60,6 +59,15 @@ func (l *lockRESTServer) IsValid(w http.ResponseWriter, r *http.Request) bool {
return true return true
} }
func getLockArgs(r *http.Request) dsync.LockArgs {
return dsync.LockArgs{
UID: r.URL.Query().Get(lockRESTUID),
Resource: r.URL.Query().Get(lockRESTResource),
ServerAddr: r.URL.Query().Get(lockRESTServerAddr),
ServiceEndpoint: r.URL.Query().Get(lockRESTServerEndpoint),
}
}
// LockHandler - Acquires a lock. // LockHandler - Acquires a lock.
func (l *lockRESTServer) LockHandler(w http.ResponseWriter, r *http.Request) { func (l *lockRESTServer) LockHandler(w http.ResponseWriter, r *http.Request) {
if !l.IsValid(w, r) { if !l.IsValid(w, r) {
@ -67,28 +75,10 @@ func (l *lockRESTServer) LockHandler(w http.ResponseWriter, r *http.Request) {
return return
} }
ctx := newContext(r, w, "Lock") if _, err := l.ll.Lock(getLockArgs(r)); err != nil {
var lockArgs dsync.LockArgs
if r.ContentLength < 0 {
l.writeErrorResponse(w, errInvalidArgument)
return
}
err := gob.NewDecoder(r.Body).Decode(&lockArgs)
if err != nil {
l.writeErrorResponse(w, err) l.writeErrorResponse(w, err)
return return
} }
success, err := l.ll.Lock(lockArgs)
if err != nil {
l.writeErrorResponse(w, err)
return
}
resp := lockResponse{Success: success}
logger.LogIf(ctx, gob.NewEncoder(w).Encode(resp))
w.(http.Flusher).Flush()
} }
// UnlockHandler - releases the acquired lock. // UnlockHandler - releases the acquired lock.
@ -98,28 +88,10 @@ func (l *lockRESTServer) UnlockHandler(w http.ResponseWriter, r *http.Request) {
return return
} }
ctx := newContext(r, w, "Unlock") if _, err := l.ll.Unlock(getLockArgs(r)); err != nil {
var lockArgs dsync.LockArgs
if r.ContentLength < 0 {
l.writeErrorResponse(w, errInvalidArgument)
return
}
err := gob.NewDecoder(r.Body).Decode(&lockArgs)
if err != nil {
l.writeErrorResponse(w, err)
return
}
success, err := l.ll.Unlock(lockArgs)
if err != nil {
l.writeErrorResponse(w, err) l.writeErrorResponse(w, err)
return return
} }
resp := lockResponse{Success: success}
logger.LogIf(ctx, gob.NewEncoder(w).Encode(resp))
w.(http.Flusher).Flush()
} }
// LockHandler - Acquires an RLock. // LockHandler - Acquires an RLock.
@ -129,27 +101,10 @@ func (l *lockRESTServer) RLockHandler(w http.ResponseWriter, r *http.Request) {
return return
} }
ctx := newContext(r, w, "RLock") if _, err := l.ll.RLock(getLockArgs(r)); err != nil {
var lockArgs dsync.LockArgs
if r.ContentLength < 0 {
l.writeErrorResponse(w, errInvalidArgument)
return
}
err := gob.NewDecoder(r.Body).Decode(&lockArgs)
if err != nil {
l.writeErrorResponse(w, err)
return
}
success, err := l.ll.RLock(lockArgs)
if err != nil {
l.writeErrorResponse(w, err) l.writeErrorResponse(w, err)
return return
} }
resp := lockResponse{Success: success}
logger.LogIf(ctx, gob.NewEncoder(w).Encode(resp))
w.(http.Flusher).Flush()
} }
// RUnlockHandler - releases the acquired read lock. // RUnlockHandler - releases the acquired read lock.
@ -159,27 +114,10 @@ func (l *lockRESTServer) RUnlockHandler(w http.ResponseWriter, r *http.Request)
return return
} }
ctx := newContext(r, w, "RUnlock") if _, err := l.ll.RUnlock(getLockArgs(r)); err != nil {
var lockArgs dsync.LockArgs
if r.ContentLength < 0 {
l.writeErrorResponse(w, errInvalidArgument)
return
}
err := gob.NewDecoder(r.Body).Decode(&lockArgs)
if err != nil {
l.writeErrorResponse(w, err) l.writeErrorResponse(w, err)
return return
} }
success, err := l.ll.RUnlock(lockArgs)
if err != nil {
l.writeErrorResponse(w, err)
return
}
resp := lockResponse{Success: success}
logger.LogIf(ctx, gob.NewEncoder(w).Encode(resp))
w.(http.Flusher).Flush()
} }
// ForceUnlockHandler - force releases the acquired lock. // ForceUnlockHandler - force releases the acquired lock.
@ -189,28 +127,10 @@ func (l *lockRESTServer) ForceUnlockHandler(w http.ResponseWriter, r *http.Reque
return return
} }
ctx := newContext(r, w, "ForceUnlock") if _, err := l.ll.ForceUnlock(getLockArgs(r)); err != nil {
var lockArgs dsync.LockArgs
if r.ContentLength < 0 {
l.writeErrorResponse(w, errInvalidArgument)
return
}
err := gob.NewDecoder(r.Body).Decode(&lockArgs)
if err != nil {
l.writeErrorResponse(w, err) l.writeErrorResponse(w, err)
return return
} }
success, err := l.ll.ForceUnlock(lockArgs)
if err != nil {
l.writeErrorResponse(w, err)
return
}
resp := lockResponse{Success: success}
logger.LogIf(ctx, gob.NewEncoder(w).Encode(resp))
w.(http.Flusher).Flush()
} }
// ExpiredHandler - query expired lock status. // ExpiredHandler - query expired lock status.
@ -220,19 +140,8 @@ func (l *lockRESTServer) ExpiredHandler(w http.ResponseWriter, r *http.Request)
return return
} }
ctx := newContext(r, w, "Expired") lockArgs := getLockArgs(r)
var lockArgs dsync.LockArgs
if r.ContentLength < 0 {
l.writeErrorResponse(w, errInvalidArgument)
return
}
err := gob.NewDecoder(r.Body).Decode(&lockArgs)
if err != nil {
l.writeErrorResponse(w, err)
return
}
success := true success := true
l.ll.mutex.Lock() l.ll.mutex.Lock()
defer l.ll.mutex.Unlock() defer l.ll.mutex.Unlock()
@ -246,11 +155,10 @@ func (l *lockRESTServer) ExpiredHandler(w http.ResponseWriter, r *http.Request)
} }
} }
} }
// When we get here lock is no longer active due to either dsync.LockArgs.Resource if !success {
// being absent from map or uid not found for given dsync.LockArgs.Resource l.writeErrorResponse(w, errors.New("lock already expired"))
resp := lockResponse{Success: success} return
logger.LogIf(ctx, gob.NewEncoder(w).Encode(resp)) }
w.(http.Flusher).Flush()
} }
// lockMaintenance loops over locks that have been active for some time and checks back // lockMaintenance loops over locks that have been active for some time and checks back
@ -323,12 +231,14 @@ func startLockMaintenance(lkSrv *lockRESTServer) {
// registerLockRESTHandlers - register lock rest router. // registerLockRESTHandlers - register lock rest router.
func registerLockRESTHandlers(router *mux.Router) { func registerLockRESTHandlers(router *mux.Router) {
subrouter := router.PathPrefix(lockRESTPath).Subrouter() subrouter := router.PathPrefix(lockRESTPath).Subrouter()
subrouter.Methods(http.MethodPost).Path("/" + lockRESTMethodLock).HandlerFunc(httpTraceHdrs(globalLockServer.LockHandler)) queries := restQueries(lockRESTUID, lockRESTSource, lockRESTResource, lockRESTServerAddr, lockRESTServerEndpoint)
subrouter.Methods(http.MethodPost).Path("/" + lockRESTMethodRLock).HandlerFunc(httpTraceHdrs(globalLockServer.RLockHandler)) subrouter.Methods(http.MethodPost).Path("/" + lockRESTMethodLock).HandlerFunc(httpTraceHdrs(globalLockServer.LockHandler)).Queries(queries...)
subrouter.Methods(http.MethodPost).Path("/" + lockRESTMethodUnlock).HandlerFunc(httpTraceHdrs(globalLockServer.UnlockHandler)) subrouter.Methods(http.MethodPost).Path("/" + lockRESTMethodRLock).HandlerFunc(httpTraceHdrs(globalLockServer.RLockHandler)).Queries(queries...)
subrouter.Methods(http.MethodPost).Path("/" + lockRESTMethodRUnlock).HandlerFunc(httpTraceHdrs(globalLockServer.RUnlockHandler)) subrouter.Methods(http.MethodPost).Path("/" + lockRESTMethodUnlock).HandlerFunc(httpTraceHdrs(globalLockServer.UnlockHandler)).Queries(queries...)
subrouter.Methods(http.MethodPost).Path("/" + lockRESTMethodForceUnlock).HandlerFunc(httpTraceHdrs(globalLockServer.ForceUnlockHandler)) subrouter.Methods(http.MethodPost).Path("/" + lockRESTMethodRUnlock).HandlerFunc(httpTraceHdrs(globalLockServer.RUnlockHandler)).Queries(queries...)
subrouter.Methods(http.MethodPost).Path("/" + lockRESTMethodExpired).HandlerFunc(httpTraceAll(globalLockServer.ExpiredHandler)) subrouter.Methods(http.MethodPost).Path("/" + lockRESTMethodForceUnlock).HandlerFunc(httpTraceHdrs(globalLockServer.ForceUnlockHandler)).Queries(queries...)
subrouter.Methods(http.MethodPost).Path("/" + lockRESTMethodExpired).HandlerFunc(httpTraceAll(globalLockServer.ExpiredHandler)).Queries(queries...)
router.NotFoundHandler = http.HandlerFunc(httpTraceAll(notFoundHandler)) router.NotFoundHandler = http.HandlerFunc(httpTraceAll(notFoundHandler))
// Start lock maintenance from all lock servers. // Start lock maintenance from all lock servers.

Loading…
Cancel
Save