diff --git a/buildscripts/verify-healing.sh b/buildscripts/verify-healing.sh index b584ff74f..4bd477a04 100755 --- a/buildscripts/verify-healing.sh +++ b/buildscripts/verify-healing.sh @@ -74,7 +74,7 @@ function __init__() echo '{"version": "3", "credential": {"accessKey": "minio", "secretKey": "minio123"}, "region": "us-east-1"}' > "$MINIO_CONFIG_DIR/config.json" } -function perform_test_1() { +function perform_test() { minio_pids=( $(start_minio_3_node 60) ) for pid in "${minio_pids[@]}"; do if ! kill "$pid"; then @@ -86,106 +86,14 @@ function perform_test_1() { purge "$WORK_DIR" exit 1 fi + # forcibly killing, to proceed further properly. + kill -9 "$pid" done - echo "Testing in Distributed Erasure setup healing test case 1" - echo "Remove the contents of the disks belonging to '2' erasure set" + echo "Testing Distributed Erasure setup healing of drives" + echo "Remove the contents of the disks belonging to '${1}' erasure set" - rm -rf ${WORK_DIR}/2/*/ - - minio_pids=( $(start_minio_3_node 60) ) - for pid in "${minio_pids[@]}"; do - if ! kill "$pid"; then - for i in $(seq 1 3); do - echo "server$i log:" - cat "${WORK_DIR}/dist-minio-$[8000+$i].log" - done - echo "FAILED" - purge "$WORK_DIR" - exit 1 - fi - done - - rv=$(check_online) - if [ "$rv" == "1" ]; then - for pid in "${minio_pids[@]}"; do - kill -9 "$pid" - done - for i in $(seq 1 3); do - echo "server$i log:" - cat "${WORK_DIR}/dist-minio-$[8000+$i].log" - done - echo "FAILED" - purge "$WORK_DIR" - exit 1 - fi -} - -function perform_test_2() { - minio_pids=( $(start_minio_3_node 60) ) - for pid in "${minio_pids[@]}"; do - if ! kill "$pid"; then - for i in $(seq 1 3); do - echo "server$i log:" - cat "${WORK_DIR}/dist-minio-$[8000+$i].log" - done - echo "FAILED" - purge "$WORK_DIR" - exit 1 - fi - done - - echo "Testing in Distributed Erasure setup healing test case 2" - echo "Remove the contents of the disks belonging to '1' erasure set" - - rm -rf ${WORK_DIR}/1/*/ - - minio_pids=( $(start_minio_3_node 60) ) - for pid in "${minio_pids[@]}"; do - if ! kill "$pid"; then - for i in $(seq 1 3); do - echo "server$i log:" - cat "${WORK_DIR}/dist-minio-$[8000+$i].log" - done - echo "FAILED" - purge "$WORK_DIR" - exit 1 - fi - done - - rv=$(check_online) - if [ "$rv" == "1" ]; then - for pid in "${minio_pids[@]}"; do - kill -9 "$pid" - done - for i in $(seq 1 3); do - echo "server$i log:" - cat "${WORK_DIR}/dist-minio-$[8000+$i].log" - done - echo "FAILED" - purge "$WORK_DIR" - exit 1 - fi -} - -function perform_test_3() { - minio_pids=( $(start_minio_3_node 60) ) - for pid in "${minio_pids[@]}"; do - if ! kill "$pid"; then - for i in $(seq 1 3); do - echo "server$i log:" - cat "${WORK_DIR}/dist-minio-$[8000+$i].log" - done - echo "FAILED" - purge "$WORK_DIR" - exit 1 - fi - done - - echo "Testing in Distributed Erasure setup healing test case 3" - echo "Remove the contents of the disks belonging to '3' erasure set" - - rm -rf ${WORK_DIR}/3/*/ + rm -rf ${WORK_DIR}/${1}/*/ minio_pids=( $(start_minio_3_node 60) ) for pid in "${minio_pids[@]}"; do @@ -198,6 +106,9 @@ function perform_test_3() { purge "$WORK_DIR" exit 1 fi + # forcibly killing, to proceed further properly. + # if the previous kill is taking time. 
+ kill -9 "$pid" done rv=$(check_online) @@ -217,9 +128,9 @@ function perform_test_3() { function main() { - perform_test_1 - perform_test_2 - perform_test_3 + perform_test "2" + perform_test "1" + perform_test "3" } ( __init__ "$@" && main "$@" ) diff --git a/cmd/bucket-handlers.go b/cmd/bucket-handlers.go index 10bb62c54..53c507726 100644 --- a/cmd/bucket-handlers.go +++ b/cmd/bucket-handlers.go @@ -356,25 +356,12 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter, return } - // Allocate incoming content length bytes. - var deleteXMLBytes []byte - const maxBodySize = 2 * 1000 * 1024 // The max. XML contains 1000 object names (each at most 1024 bytes long) + XML overhead - if r.ContentLength > maxBodySize { // Only allocated memory for at most 1000 objects - deleteXMLBytes = make([]byte, maxBodySize) - } else { - deleteXMLBytes = make([]byte, r.ContentLength) - } - - // Read incoming body XML bytes. - if _, err := io.ReadFull(r.Body, deleteXMLBytes); err != nil { - logger.LogIf(ctx, err, logger.Application) - writeErrorResponse(ctx, w, toAdminAPIErr(ctx, err), r.URL, guessIsBrowserReq(r)) - return - } + // The max. XML contains 100000 object names (each at most 1024 bytes long) + XML overhead + const maxBodySize = 2 * 100000 * 1024 // Unmarshal list of keys to be deleted. deleteObjects := &DeleteObjectsRequest{} - if err := xml.Unmarshal(deleteXMLBytes, deleteObjects); err != nil { + if err := xmlDecoder(r.Body, deleteObjects, maxBodySize); err != nil { logger.LogIf(ctx, err, logger.Application) writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrMalformedXML), r.URL, guessIsBrowserReq(r)) return diff --git a/cmd/daily-lifecycle-ops.go b/cmd/daily-lifecycle-ops.go index 78d3df397..3202a46f0 100644 --- a/cmd/daily-lifecycle-ops.go +++ b/cmd/daily-lifecycle-ops.go @@ -104,12 +104,12 @@ func startDailyLifecycle() { } } -var lifecycleTimeout = newDynamicTimeout(60*time.Second, time.Second) +var lifecycleLockTimeout = newDynamicTimeout(60*time.Second, time.Second) func lifecycleRound(ctx context.Context, objAPI ObjectLayer) error { // Lock to avoid concurrent lifecycle ops from other nodes sweepLock := objAPI.NewNSLock(ctx, "system", "daily-lifecycle-ops") - if err := sweepLock.GetLock(lifecycleTimeout); err != nil { + if err := sweepLock.GetLock(lifecycleLockTimeout); err != nil { return err } defer sweepLock.Unlock() diff --git a/cmd/data-usage.go b/cmd/data-usage.go index 3232adbbd..be1b622d2 100644 --- a/cmd/data-usage.go +++ b/cmd/data-usage.go @@ -79,10 +79,12 @@ func timeToCrawl(ctx context.Context, objAPI ObjectLayer) time.Duration { return dataUsageCrawlInterval - waitDuration } +var dataUsageLockTimeout = lifecycleLockTimeout + func runDataUsageInfo(ctx context.Context, objAPI ObjectLayer, endCh <-chan struct{}) { locker := objAPI.NewNSLock(ctx, minioMetaBucket, "leader-data-usage-info") for { - err := locker.GetLock(newDynamicTimeout(time.Millisecond, time.Millisecond)) + err := locker.GetLock(dataUsageLockTimeout) if err != nil { time.Sleep(5 * time.Minute) continue diff --git a/cmd/fs-v1.go b/cmd/fs-v1.go index e24617691..3774090e4 100644 --- a/cmd/fs-v1.go +++ b/cmd/fs-v1.go @@ -188,9 +188,9 @@ func NewFSObjectLayer(fsPath string) (ObjectLayer, error) { } // NewNSLock - initialize a new namespace RWLocker instance. 
-func (fs *FSObjects) NewNSLock(ctx context.Context, bucket string, object string) RWLocker { +func (fs *FSObjects) NewNSLock(ctx context.Context, bucket string, objects ...string) RWLocker { // lockers are explicitly 'nil' for FS mode since there are only local lockers - return fs.nsMutex.NewNSLock(ctx, nil, bucket, object) + return fs.nsMutex.NewNSLock(ctx, nil, bucket, objects...) } // Shutdown - should be called when process shuts down. @@ -490,7 +490,6 @@ func (fs *FSObjects) CopyObject(ctx context.Context, srcBucket, srcObject, dstBu // GetObjectNInfo - returns object info and a reader for object // content. func (fs *FSObjects) GetObjectNInfo(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, lockType LockType, opts ObjectOptions) (gr *GetObjectReader, err error) { - if err = checkGetObjArgs(ctx, bucket, object); err != nil { return nil, err } diff --git a/cmd/gateway-unsupported.go b/cmd/gateway-unsupported.go index 000f32cf5..9ee5ec23f 100644 --- a/cmd/gateway-unsupported.go +++ b/cmd/gateway-unsupported.go @@ -37,8 +37,8 @@ type GatewayLocker struct { } // NewNSLock - implements gateway level locker -func (l *GatewayLocker) NewNSLock(ctx context.Context, bucket string, object string) RWLocker { - return l.nsMutex.NewNSLock(ctx, nil, bucket, object) +func (l *GatewayLocker) NewNSLock(ctx context.Context, bucket string, objects ...string) RWLocker { + return l.nsMutex.NewNSLock(ctx, nil, bucket, objects...) } // NewGatewayLayerWithLocker - initialize gateway with locker. @@ -56,7 +56,7 @@ func (a GatewayUnsupported) CrawlAndGetDataUsage(ctx context.Context, endCh <-ch } // NewNSLock is a dummy stub for gateway. -func (a GatewayUnsupported) NewNSLock(ctx context.Context, bucket string, object string) RWLocker { +func (a GatewayUnsupported) NewNSLock(ctx context.Context, bucket string, objects ...string) RWLocker { logger.CriticalIf(ctx, errors.New("not implemented")) return nil } diff --git a/cmd/iam.go b/cmd/iam.go index c86b8ba6e..47b50eb41 100644 --- a/cmd/iam.go +++ b/cmd/iam.go @@ -337,14 +337,6 @@ func (sys *IAMSys) Load() error { // Perform IAM configuration migration. 
func (sys *IAMSys) doIAMConfigMigration(objAPI ObjectLayer) error { - // Take IAM configuration migration lock - lockPath := iamConfigPrefix + "/migration.lock" - objLock := objAPI.NewNSLock(context.Background(), minioMetaBucket, lockPath) - if err := objLock.GetLock(globalOperationTimeout); err != nil { - return err - } - defer objLock.Unlock() - return sys.store.migrateBackendFormat(objAPI) } diff --git a/cmd/local-locker.go b/cmd/local-locker.go index fbdbb67fd..707613e26 100644 --- a/cmd/local-locker.go +++ b/cmd/local-locker.go @@ -49,12 +49,42 @@ func (l *localLocker) String() string { return l.endpoint.String() } +func (l *localLocker) canTakeUnlock(resources ...string) bool { + var lkCnt int + for _, resource := range resources { + isWriteLockTaken := isWriteLock(l.lockMap[resource]) + if isWriteLockTaken { + lkCnt++ + } + } + return lkCnt == len(resources) +} + +func (l *localLocker) canTakeLock(resources ...string) bool { + var noLkCnt int + for _, resource := range resources { + _, lockTaken := l.lockMap[resource] + if !lockTaken { + noLkCnt++ + } + } + return noLkCnt == len(resources) +} + func (l *localLocker) Lock(args dsync.LockArgs) (reply bool, err error) { l.mutex.Lock() defer l.mutex.Unlock() - _, isLockTaken := l.lockMap[args.Resource] - if !isLockTaken { // No locks held on the given name, so claim write lock - l.lockMap[args.Resource] = []lockRequesterInfo{ + + if !l.canTakeLock(args.Resources...) { + // Not all locks can be taken on resources, + // reject it completely. + return false, nil + } + + // No locks held on the all resources, so claim write + // lock on all resources at once. + for _, resource := range args.Resources { + l.lockMap[resource] = []lockRequesterInfo{ { Writer: true, Source: args.Source, @@ -64,24 +94,22 @@ func (l *localLocker) Lock(args dsync.LockArgs) (reply bool, err error) { }, } } - // return reply=true if lock was granted. - return !isLockTaken, nil + return true, nil } func (l *localLocker) Unlock(args dsync.LockArgs) (reply bool, err error) { l.mutex.Lock() defer l.mutex.Unlock() - var lri []lockRequesterInfo - if lri, reply = l.lockMap[args.Resource]; !reply { - // No lock is held on the given name - return reply, fmt.Errorf("Unlock attempted on an unlocked entity: %s", args.Resource) - } - if reply = isWriteLock(lri); !reply { - // Unless it is a write lock - return reply, fmt.Errorf("Unlock attempted on a read locked entity: %s (%d read locks active)", args.Resource, len(lri)) + + if !l.canTakeUnlock(args.Resources...) { + // Unless it is a write lock reject it. 
+ return reply, fmt.Errorf("Unlock attempted on a read locked entity: %s", args.Resources) } - if !l.removeEntry(args.Resource, args.UID, &lri) { - return false, fmt.Errorf("Unlock unable to find corresponding lock for uid: %s", args.UID) + for _, resource := range args.Resources { + lri := l.lockMap[resource] + if !l.removeEntry(resource, args.UID, &lri) { + return false, fmt.Errorf("Unlock unable to find corresponding lock for uid: %s on resource %s", args.UID, resource) + } } return true, nil @@ -120,14 +148,15 @@ func (l *localLocker) RLock(args dsync.LockArgs) (reply bool, err error) { Timestamp: UTCNow(), TimeLastCheck: UTCNow(), } - if lri, ok := l.lockMap[args.Resource]; ok { + resource := args.Resources[0] + if lri, ok := l.lockMap[resource]; ok { if reply = !isWriteLock(lri); reply { // Unless there is a write lock - l.lockMap[args.Resource] = append(l.lockMap[args.Resource], lrInfo) + l.lockMap[resource] = append(l.lockMap[resource], lrInfo) } } else { // No locks held on the given name, so claim (first) read lock - l.lockMap[args.Resource] = []lockRequesterInfo{lrInfo} + l.lockMap[resource] = []lockRequesterInfo{lrInfo} reply = true } return reply, nil @@ -137,15 +166,17 @@ func (l *localLocker) RUnlock(args dsync.LockArgs) (reply bool, err error) { l.mutex.Lock() defer l.mutex.Unlock() var lri []lockRequesterInfo - if lri, reply = l.lockMap[args.Resource]; !reply { + + resource := args.Resources[0] + if lri, reply = l.lockMap[resource]; !reply { // No lock is held on the given name - return reply, fmt.Errorf("RUnlock attempted on an unlocked entity: %s", args.Resource) + return reply, fmt.Errorf("RUnlock attempted on an unlocked entity: %s", resource) } if reply = !isWriteLock(lri); !reply { // A write-lock is held, cannot release a read lock - return reply, fmt.Errorf("RUnlock attempted on a write locked entity: %s", args.Resource) + return reply, fmt.Errorf("RUnlock attempted on a write locked entity: %s", resource) } - if !l.removeEntry(args.Resource, args.UID, &lri) { + if !l.removeEntry(resource, args.UID, &lri) { return false, fmt.Errorf("RUnlock unable to find corresponding read lock for uid: %s", args.UID) } return reply, nil @@ -176,11 +207,13 @@ func (l *localLocker) Expired(args dsync.LockArgs) (expired bool, err error) { defer l.mutex.Unlock() // Lock found, proceed to verify if belongs to given uid. 
- if lri, ok := l.lockMap[args.Resource]; ok { - // Check whether uid is still active - for _, entry := range lri { - if entry.UID == args.UID { - return false, nil + for _, resource := range args.Resources { + if lri, ok := l.lockMap[resource]; ok { + // Check whether uid is still active + for _, entry := range lri { + if entry.UID == args.UID { + return false, nil + } } } } diff --git a/cmd/lock-rest-client.go b/cmd/lock-rest-client.go index 66e46f761..ffe8f9930 100644 --- a/cmd/lock-rest-client.go +++ b/cmd/lock-rest-client.go @@ -17,6 +17,7 @@ package cmd import ( + "bytes" "context" "crypto/tls" "errors" @@ -104,9 +105,12 @@ func (client *lockRESTClient) restCall(call string, args dsync.LockArgs) (reply values := url.Values{} values.Set(lockRESTUID, args.UID) values.Set(lockRESTSource, args.Source) - values.Set(lockRESTResource, args.Resource) - - respBody, err := client.call(call, values, nil, -1) + var buffer bytes.Buffer + for _, resource := range args.Resources { + buffer.WriteString(resource) + buffer.WriteString("\n") + } + respBody, err := client.call(call, values, &buffer, -1) defer http.DrainBody(respBody) switch err { case nil: diff --git a/cmd/lock-rest-server-common.go b/cmd/lock-rest-server-common.go index f11ae2fa3..414b2914d 100644 --- a/cmd/lock-rest-server-common.go +++ b/cmd/lock-rest-server-common.go @@ -21,8 +21,8 @@ import ( ) const ( - lockRESTVersion = "v2" - lockRESTVersionPrefix = SlashSeparator + "v2" + lockRESTVersion = "v3" + lockRESTVersionPrefix = SlashSeparator + lockRESTVersion lockRESTPrefix = minioReservedBucketPath + "/lock" ) @@ -38,8 +38,6 @@ const ( // Source contains the line number, function and file name of the code // on the client node that requested the lock. lockRESTSource = "source" - // Resource contains a entity to be locked/unlocked. - lockRESTResource = "resource" ) var ( diff --git a/cmd/lock-rest-server.go b/cmd/lock-rest-server.go index f473cd2e6..4ece6c401 100644 --- a/cmd/lock-rest-server.go +++ b/cmd/lock-rest-server.go @@ -17,11 +17,13 @@ package cmd import ( + "bufio" "context" "errors" "math/rand" "net/http" "path" + "sort" "time" "github.com/gorilla/mux" @@ -55,12 +57,25 @@ func (l *lockRESTServer) IsValid(w http.ResponseWriter, r *http.Request) bool { return true } -func getLockArgs(r *http.Request) dsync.LockArgs { - return dsync.LockArgs{ - UID: r.URL.Query().Get(lockRESTUID), - Source: r.URL.Query().Get(lockRESTSource), - Resource: r.URL.Query().Get(lockRESTResource), +func getLockArgs(r *http.Request) (args dsync.LockArgs, err error) { + args = dsync.LockArgs{ + UID: r.URL.Query().Get(lockRESTUID), + Source: r.URL.Query().Get(lockRESTSource), } + + var resources []string + bio := bufio.NewScanner(r.Body) + for bio.Scan() { + resources = append(resources, bio.Text()) + } + + if err := bio.Err(); err != nil { + return args, err + } + + sort.Strings(resources) + args.Resources = resources + return args, nil } // LockHandler - Acquires a lock. 
@@ -70,7 +85,13 @@ func (l *lockRESTServer) LockHandler(w http.ResponseWriter, r *http.Request) { return } - success, err := l.ll.Lock(getLockArgs(r)) + args, err := getLockArgs(r) + if err != nil { + l.writeErrorResponse(w, err) + return + } + + success, err := l.ll.Lock(args) if err == nil && !success { err = errLockConflict } @@ -87,7 +108,13 @@ func (l *lockRESTServer) UnlockHandler(w http.ResponseWriter, r *http.Request) { return } - _, err := l.ll.Unlock(getLockArgs(r)) + args, err := getLockArgs(r) + if err != nil { + l.writeErrorResponse(w, err) + return + } + + _, err = l.ll.Unlock(args) // Ignore the Unlock() "reply" return value because if err == nil, "reply" is always true // Consequently, if err != nil, reply is always false if err != nil { @@ -103,7 +130,13 @@ func (l *lockRESTServer) RLockHandler(w http.ResponseWriter, r *http.Request) { return } - success, err := l.ll.RLock(getLockArgs(r)) + args, err := getLockArgs(r) + if err != nil { + l.writeErrorResponse(w, err) + return + } + + success, err := l.ll.RLock(args) if err == nil && !success { err = errLockConflict } @@ -120,10 +153,15 @@ func (l *lockRESTServer) RUnlockHandler(w http.ResponseWriter, r *http.Request) return } + args, err := getLockArgs(r) + if err != nil { + l.writeErrorResponse(w, err) + return + } + // Ignore the RUnlock() "reply" return value because if err == nil, "reply" is always true. // Consequently, if err != nil, reply is always false - _, err := l.ll.RUnlock(getLockArgs(r)) - if err != nil { + if _, err = l.ll.RUnlock(args); err != nil { l.writeErrorResponse(w, err) return } @@ -136,17 +174,24 @@ func (l *lockRESTServer) ExpiredHandler(w http.ResponseWriter, r *http.Request) return } - lockArgs := getLockArgs(r) + args, err := getLockArgs(r) + if err != nil { + l.writeErrorResponse(w, err) + return + } l.ll.mutex.Lock() defer l.ll.mutex.Unlock() + // Lock found, proceed to verify if belongs to given uid. - if lri, ok := l.ll.lockMap[lockArgs.Resource]; ok { - // Check whether uid is still active - for _, entry := range lri { - if entry.UID == lockArgs.UID { - l.writeErrorResponse(w, errLockNotExpired) - return + for _, resource := range args.Resources { + if lri, ok := l.ll.lockMap[resource]; ok { + // Check whether uid is still active + for _, entry := range lri { + if entry.UID == args.UID { + l.writeErrorResponse(w, errLockNotExpired) + return + } } } } @@ -216,8 +261,8 @@ func lockMaintenance(ctx context.Context, interval time.Duration, objAPI ObjectL // Call back to original server verify whether the lock is // still active (based on name & uid) expired, err := c.Expired(dsync.LockArgs{ - UID: nlrip.lri.UID, - Resource: nlrip.name, + UID: nlrip.lri.UID, + Resources: []string{nlrip.name}, }) if err != nil { @@ -287,7 +332,7 @@ func startLockMaintenance() { // registerLockRESTHandlers - register lock rest router. 
func registerLockRESTHandlers(router *mux.Router, endpointZones EndpointZones) { - queries := restQueries(lockRESTUID, lockRESTSource, lockRESTResource) + queries := restQueries(lockRESTUID, lockRESTSource) for _, ep := range endpointZones { for _, endpoint := range ep.Endpoints { if !endpoint.IsLocal { diff --git a/cmd/namespace-lock.go b/cmd/namespace-lock.go index 501ecbfbe..d85123eef 100644 --- a/cmd/namespace-lock.go +++ b/cmd/namespace-lock.go @@ -21,6 +21,7 @@ import ( "errors" pathutil "path" "runtime" + "sort" "strings" "sync" @@ -51,16 +52,10 @@ func newNSLock(isDistXL bool) *nsLockMap { if isDistXL { return &nsMutex } - nsMutex.lockMap = make(map[nsParam]*nsLock) + nsMutex.lockMap = make(map[string]*nsLock) return &nsMutex } -// nsParam - carries name space resource. -type nsParam struct { - volume string - path string -} - // nsLock - provides primitives for locking critical namespace regions. type nsLock struct { *lsync.LRWMutex @@ -72,23 +67,24 @@ type nsLock struct { type nsLockMap struct { // Indicates if namespace is part of a distributed setup. isDistXL bool - lockMap map[nsParam]*nsLock + lockMap map[string]*nsLock lockMapMutex sync.RWMutex } // Lock the namespace resource. -func (n *nsLockMap) lock(ctx context.Context, volume, path string, lockSource, opsID string, readLock bool, timeout time.Duration) (locked bool) { +func (n *nsLockMap) lock(ctx context.Context, volume string, path string, lockSource, opsID string, readLock bool, timeout time.Duration) (locked bool) { var nsLk *nsLock + resource := pathJoin(volume, path) + n.lockMapMutex.Lock() - param := nsParam{volume, path} - nsLk, found := n.lockMap[param] + nsLk, found := n.lockMap[resource] if !found { - n.lockMap[param] = &nsLock{ + nsLk = &nsLock{ LRWMutex: lsync.NewLRWMutex(ctx), ref: 1, } - nsLk = n.lockMap[param] + n.lockMap[resource] = nsLk } else { // Update ref count here to avoid multiple races. nsLk.ref++ @@ -109,7 +105,7 @@ func (n *nsLockMap) lock(ctx context.Context, volume, path string, lockSource, o nsLk.ref-- if nsLk.ref == 0 { // Remove from the map if there are no more references. - delete(n.lockMap, param) + delete(n.lockMap, resource) } n.lockMapMutex.Unlock() } @@ -117,10 +113,10 @@ func (n *nsLockMap) lock(ctx context.Context, volume, path string, lockSource, o } // Unlock the namespace resource. -func (n *nsLockMap) unlock(volume, path string, readLock bool) { - param := nsParam{volume, path} +func (n *nsLockMap) unlock(volume string, path string, readLock bool) { + resource := pathJoin(volume, path) n.lockMapMutex.RLock() - nsLk, found := n.lockMap[param] + nsLk, found := n.lockMap[resource] n.lockMapMutex.RUnlock() if !found { return @@ -137,45 +133,16 @@ func (n *nsLockMap) unlock(volume, path string, readLock bool) { nsLk.ref-- if nsLk.ref == 0 { // Remove from the map if there are no more references. - delete(n.lockMap, param) + delete(n.lockMap, resource) } } n.lockMapMutex.Unlock() } -// Lock - locks the given resource for writes, using a previously -// allocated name space lock or initializing a new one. -func (n *nsLockMap) Lock(volume, path, opsID string, timeout time.Duration) (locked bool) { - readLock := false // This is a write lock. - - lockSource := getSource() // Useful for debugging - return n.lock(context.Background(), volume, path, lockSource, opsID, readLock, timeout) -} - -// Unlock - unlocks any previously acquired write locks. 
-func (n *nsLockMap) Unlock(volume, path, opsID string) { - readLock := false - n.unlock(volume, path, readLock) -} - -// RLock - locks any previously acquired read locks. -func (n *nsLockMap) RLock(volume, path, opsID string, timeout time.Duration) (locked bool) { - readLock := true - - lockSource := getSource() // Useful for debugging - return n.lock(context.Background(), volume, path, lockSource, opsID, readLock, timeout) -} - -// RUnlock - unlocks any previously acquired read locks. -func (n *nsLockMap) RUnlock(volume, path, opsID string) { - readLock := true - n.unlock(volume, path, readLock) -} - // dsync's distributed lock instance. type distLockInstance struct { - rwMutex *dsync.DRWMutex - volume, path, opsID string + rwMutex *dsync.DRWMutex + opsID string } // Lock - block until write lock is taken or timeout has occurred. @@ -185,7 +152,7 @@ func (di *distLockInstance) GetLock(timeout *dynamicTimeout) (timedOutErr error) if !di.rwMutex.GetLock(di.opsID, lockSource, timeout.Timeout()) { timeout.LogFailure() - return OperationTimedOut{Path: di.path} + return OperationTimedOut{} } timeout.LogSuccess(UTCNow().Sub(start)) return nil @@ -202,7 +169,7 @@ func (di *distLockInstance) GetRLock(timeout *dynamicTimeout) (timedOutErr error start := UTCNow() if !di.rwMutex.GetRLock(di.opsID, lockSource, timeout.Timeout()) { timeout.LogFailure() - return OperationTimedOut{Path: di.path} + return OperationTimedOut{} } timeout.LogSuccess(UTCNow().Sub(start)) return nil @@ -215,22 +182,26 @@ func (di *distLockInstance) RUnlock() { // localLockInstance - frontend/top-level interface for namespace locks. type localLockInstance struct { - ctx context.Context - ns *nsLockMap - volume, path, opsID string + ctx context.Context + ns *nsLockMap + volume string + paths []string + opsID string } // NewNSLock - returns a lock instance for a given volume and // path. The returned lockInstance object encapsulates the nsLockMap, // volume, path and operation ID. -func (n *nsLockMap) NewNSLock(ctx context.Context, lockersFn func() []dsync.NetLocker, volume, path string) RWLocker { +func (n *nsLockMap) NewNSLock(ctx context.Context, lockersFn func() []dsync.NetLocker, volume string, paths ...string) RWLocker { opsID := mustGetUUID() if n.isDistXL { - return &distLockInstance{dsync.NewDRWMutex(ctx, pathJoin(volume, path), &dsync.Dsync{ + drwmutex := dsync.NewDRWMutex(ctx, &dsync.Dsync{ GetLockersFn: lockersFn, - }), volume, path, opsID} + }, pathsJoinPrefix(volume, paths...)...) + return &distLockInstance{drwmutex, opsID} } - return &localLockInstance{ctx, n, volume, path, opsID} + sort.Strings(paths) + return &localLockInstance{ctx, n, volume, paths, opsID} } // Lock - block until write lock is taken or timeout has occurred. 
@@ -238,9 +209,16 @@ func (li *localLockInstance) GetLock(timeout *dynamicTimeout) (timedOutErr error lockSource := getSource() start := UTCNow() readLock := false - if !li.ns.lock(li.ctx, li.volume, li.path, lockSource, li.opsID, readLock, timeout.Timeout()) { - timeout.LogFailure() - return OperationTimedOut{Path: li.path} + var success []int + for i, path := range li.paths { + if !li.ns.lock(li.ctx, li.volume, path, lockSource, li.opsID, readLock, timeout.Timeout()) { + timeout.LogFailure() + for _, sint := range success { + li.ns.unlock(li.volume, li.paths[sint], readLock) + } + return OperationTimedOut{} + } + success = append(success, i) } timeout.LogSuccess(UTCNow().Sub(start)) return @@ -249,7 +227,9 @@ func (li *localLockInstance) GetLock(timeout *dynamicTimeout) (timedOutErr error // Unlock - block until write lock is released. func (li *localLockInstance) Unlock() { readLock := false - li.ns.unlock(li.volume, li.path, readLock) + for _, path := range li.paths { + li.ns.unlock(li.volume, path, readLock) + } } // RLock - block until read lock is taken or timeout has occurred. @@ -257,9 +237,16 @@ func (li *localLockInstance) GetRLock(timeout *dynamicTimeout) (timedOutErr erro lockSource := getSource() start := UTCNow() readLock := true - if !li.ns.lock(li.ctx, li.volume, li.path, lockSource, li.opsID, readLock, timeout.Timeout()) { - timeout.LogFailure() - return OperationTimedOut{Path: li.path} + var success []int + for i, path := range li.paths { + if !li.ns.lock(li.ctx, li.volume, path, lockSource, li.opsID, readLock, timeout.Timeout()) { + timeout.LogFailure() + for _, sint := range success { + li.ns.unlock(li.volume, li.paths[sint], readLock) + } + return OperationTimedOut{} + } + success = append(success, i) } timeout.LogSuccess(UTCNow().Sub(start)) return @@ -268,7 +255,9 @@ func (li *localLockInstance) GetRLock(timeout *dynamicTimeout) (timedOutErr erro // RUnlock - block until read lock is released. func (li *localLockInstance) RUnlock() { readLock := true - li.ns.unlock(li.volume, li.path, readLock) + for _, path := range li.paths { + li.ns.unlock(li.volume, path, readLock) + } } func getSource() string { diff --git a/cmd/namespace-lock_test.go b/cmd/namespace-lock_test.go index 939b56dc9..2c4f34b02 100644 --- a/cmd/namespace-lock_test.go +++ b/cmd/namespace-lock_test.go @@ -18,184 +18,20 @@ package cmd import ( "testing" - "time" ) // WARNING: // -// Expected source line number is hard coded, 32, in the +// Expected source line number is hard coded, 31, in the // following test. Adding new code before this test or changing its // position will cause the line number to change and the test to FAIL // Tests getSource(). func TestGetSource(t *testing.T) { currentSource := func() string { return getSource() } gotSource := currentSource() - // Hard coded line number, 32, in the "expectedSource" value - expectedSource := "[namespace-lock_test.go:32:TestGetSource()]" + // Hard coded line number, 31, in the "expectedSource" value + expectedSource := "[namespace-lock_test.go:31:TestGetSource()]" if gotSource != expectedSource { t.Errorf("expected : %s, got : %s", expectedSource, gotSource) } } - -// Tests functionality provided by namespace lock. -func TestNamespaceLockTest(t *testing.T) { - isDistXL := false - nsMutex := newNSLock(isDistXL) - - // List of test cases. 
- testCases := []struct { - lk func(s1, s2, s3 string, t time.Duration) bool - unlk func(s1, s2, s3 string) - rlk func(s1, s2, s3 string, t time.Duration) bool - runlk func(s1, s2, s3 string) - lockedRefCount uint - unlockedRefCount uint - shouldPass bool - }{ - { - lk: nsMutex.Lock, - unlk: nsMutex.Unlock, - lockedRefCount: 1, - unlockedRefCount: 0, - shouldPass: true, - }, - { - rlk: nsMutex.RLock, - runlk: nsMutex.RUnlock, - lockedRefCount: 4, - unlockedRefCount: 2, - shouldPass: true, - }, - { - rlk: nsMutex.RLock, - runlk: nsMutex.RUnlock, - lockedRefCount: 1, - unlockedRefCount: 0, - shouldPass: true, - }, - } - - // Run all test cases. - - // Write lock tests. - testCase := testCases[0] - if !testCase.lk("a", "b", "c", 60*time.Second) { // lock once. - t.Fatalf("Failed to acquire lock") - } - nsLk, ok := nsMutex.lockMap[nsParam{"a", "b"}] - if !ok && testCase.shouldPass { - t.Errorf("Lock in map missing.") - } - // Validate locked ref count. - if testCase.lockedRefCount != nsLk.ref && testCase.shouldPass { - t.Errorf("Test %d fails, expected to pass. Wanted ref count is %d, got %d", 1, testCase.lockedRefCount, nsLk.ref) - } - testCase.unlk("a", "b", "c") // unlock once. - if testCase.unlockedRefCount != nsLk.ref && testCase.shouldPass { - t.Errorf("Test %d fails, expected to pass. Wanted ref count is %d, got %d", 1, testCase.unlockedRefCount, nsLk.ref) - } - _, ok = nsMutex.lockMap[nsParam{"a", "b"}] - if ok && !testCase.shouldPass { - t.Errorf("Lock map found after unlock.") - } - - // Read lock tests. - testCase = testCases[1] - if !testCase.rlk("a", "b", "c", 60*time.Second) { // lock once. - t.Fatalf("Failed to acquire first read lock") - } - if !testCase.rlk("a", "b", "c", 60*time.Second) { // lock second time. - t.Fatalf("Failed to acquire second read lock") - } - if !testCase.rlk("a", "b", "c", 60*time.Second) { // lock third time. - t.Fatalf("Failed to acquire third read lock") - } - if !testCase.rlk("a", "b", "c", 60*time.Second) { // lock fourth time. - t.Fatalf("Failed to acquire fourth read lock") - } - nsLk, ok = nsMutex.lockMap[nsParam{"a", "b"}] - if !ok && testCase.shouldPass { - t.Errorf("Lock in map missing.") - } - // Validate locked ref count. - if testCase.lockedRefCount != nsLk.ref && testCase.shouldPass { - t.Errorf("Test %d fails, expected to pass. Wanted ref count is %d, got %d", 1, testCase.lockedRefCount, nsLk.ref) - } - - testCase.runlk("a", "b", "c") // unlock once. - testCase.runlk("a", "b", "c") // unlock second time. - if testCase.unlockedRefCount != nsLk.ref && testCase.shouldPass { - t.Errorf("Test %d fails, expected to pass. Wanted ref count is %d, got %d", 2, testCase.unlockedRefCount, nsLk.ref) - } - _, ok = nsMutex.lockMap[nsParam{"a", "b"}] - if !ok && testCase.shouldPass { - t.Errorf("Lock map not found.") - } - - // Read lock 0 ref count. - testCase = testCases[2] - if !testCase.rlk("a", "c", "d", 60*time.Second) { // lock once. - t.Fatalf("Failed to acquire read lock") - } - - nsLk, ok = nsMutex.lockMap[nsParam{"a", "c"}] - if !ok && testCase.shouldPass { - t.Errorf("Lock in map missing.") - } - // Validate locked ref count. - if testCase.lockedRefCount != nsLk.ref && testCase.shouldPass { - t.Errorf("Test %d fails, expected to pass. Wanted ref count is %d, got %d", 3, testCase.lockedRefCount, nsLk.ref) - } - testCase.runlk("a", "c", "d") // unlock once. - if testCase.unlockedRefCount != nsLk.ref && testCase.shouldPass { - t.Errorf("Test %d fails, expected to pass. 
Wanted ref count is %d, got %d", 3, testCase.unlockedRefCount, nsLk.ref) - } - _, ok = nsMutex.lockMap[nsParam{"a", "c"}] - if ok && !testCase.shouldPass { - t.Errorf("Lock map not found.") - } -} - -func TestNamespaceLockTimedOut(t *testing.T) { - isDistXL := false - nsMutex := newNSLock(isDistXL) - // Get write lock - if !nsMutex.Lock("my-bucket", "my-object", "abc", 60*time.Second) { - t.Fatalf("Failed to acquire lock") - } - - // Second attempt for write lock on same resource should time out - locked := nsMutex.Lock("my-bucket", "my-object", "def", 1*time.Second) - if locked { - t.Fatalf("Should not have acquired lock") - } - - // Read lock on same resource should also time out - locked = nsMutex.RLock("my-bucket", "my-object", "def", 1*time.Second) - if locked { - t.Fatalf("Should not have acquired read lock while write lock is active") - } - - // Release write lock - nsMutex.Unlock("my-bucket", "my-object", "abc") - - // Get read lock - if !nsMutex.RLock("my-bucket", "my-object", "ghi", 60*time.Second) { - t.Fatalf("Failed to acquire read lock") - } - - // Write lock on same resource should time out - locked = nsMutex.Lock("my-bucket", "my-object", "klm", 1*time.Second) - if locked { - t.Fatalf("Should not have acquired lock") - } - - // 2nd read lock should be just fine - if !nsMutex.RLock("my-bucket", "my-object", "nop", 60*time.Second) { - t.Fatalf("Failed to acquire second read lock") - } - - // Release both read locks - nsMutex.RUnlock("my-bucket", "my-object", "ghi") - nsMutex.RUnlock("my-bucket", "my-object", "nop") -} diff --git a/cmd/object-api-errors.go b/cmd/object-api-errors.go index bd9a99855..6c92c4190 100644 --- a/cmd/object-api-errors.go +++ b/cmd/object-api-errors.go @@ -348,11 +348,10 @@ func (e ObjectTooSmall) Error() string { // OperationTimedOut - a timeout occurred. type OperationTimedOut struct { - Path string } func (e OperationTimedOut) Error() string { - return "Operation timed out: " + e.Path + return "Operation timed out" } /// Multipart related errors. diff --git a/cmd/object-api-interface.go b/cmd/object-api-interface.go index 7aae5e9a9..b7e7ed9bb 100644 --- a/cmd/object-api-interface.go +++ b/cmd/object-api-interface.go @@ -55,7 +55,7 @@ const ( // ObjectLayer implements primitives for object API layer. type ObjectLayer interface { // Locking operations on object. - NewNSLock(ctx context.Context, bucket string, object string) RWLocker + NewNSLock(ctx context.Context, bucket string, objects ...string) RWLocker // Storage operations. Shutdown(context.Context) error diff --git a/cmd/object-api-utils.go b/cmd/object-api-utils.go index f9225d8fa..95264a85c 100644 --- a/cmd/object-api-utils.go +++ b/cmd/object-api-utils.go @@ -198,6 +198,16 @@ func retainSlash(s string) string { return strings.TrimSuffix(s, SlashSeparator) + SlashSeparator } +// pathsJoinPrefix - like pathJoin retains trailing SlashSeparator +// for all elements, prepends them with 'prefix' respectively. 
+func pathsJoinPrefix(prefix string, elem ...string) (paths []string) { + paths = make([]string, len(elem)) + for i, e := range elem { + paths[i] = pathJoin(prefix, e) + } + return paths +} + // pathJoin - like path.Join() but retains trailing SlashSeparator of the last element func pathJoin(elem ...string) string { trailingSlash := "" diff --git a/cmd/xl-sets.go b/cmd/xl-sets.go index 9b10a1b6a..7f57d4346 100644 --- a/cmd/xl-sets.go +++ b/cmd/xl-sets.go @@ -335,8 +335,11 @@ func newXLSets(endpoints Endpoints, format *formatXLV3, setCount int, drivesPerS } // NewNSLock - initialize a new namespace RWLocker instance. -func (s *xlSets) NewNSLock(ctx context.Context, bucket string, object string) RWLocker { - return s.getHashedSet(object).NewNSLock(ctx, bucket, object) +func (s *xlSets) NewNSLock(ctx context.Context, bucket string, objects ...string) RWLocker { + if len(objects) == 1 { + return s.getHashedSet(objects[0]).NewNSLock(ctx, bucket, objects...) + } + return s.getHashedSet("").NewNSLock(ctx, bucket, objects...) } // StorageInfo - combines output of StorageInfo across all erasure coded object sets. diff --git a/cmd/xl-v1.go b/cmd/xl-v1.go index 72405d1e6..1c6cb31c2 100644 --- a/cmd/xl-v1.go +++ b/cmd/xl-v1.go @@ -66,8 +66,8 @@ type xlObjects struct { } // NewNSLock - initialize a new namespace RWLocker instance. -func (xl xlObjects) NewNSLock(ctx context.Context, bucket string, object string) RWLocker { - return xl.nsMutex.NewNSLock(ctx, xl.getLockers, bucket, object) +func (xl xlObjects) NewNSLock(ctx context.Context, bucket string, objects ...string) RWLocker { + return xl.nsMutex.NewNSLock(ctx, xl.getLockers, bucket, objects...) } // Shutdown function for object storage interface. diff --git a/cmd/xl-zones.go b/cmd/xl-zones.go index bf88406de..320cd080f 100644 --- a/cmd/xl-zones.go +++ b/cmd/xl-zones.go @@ -83,8 +83,8 @@ func newXLZones(endpointZones EndpointZones) (ObjectLayer, error) { return z, nil } -func (z *xlZones) NewNSLock(ctx context.Context, bucket string, object string) RWLocker { - return z.zones[0].NewNSLock(ctx, bucket, object) +func (z *xlZones) NewNSLock(ctx context.Context, bucket string, objects ...string) RWLocker { + return z.zones[0].NewNSLock(ctx, bucket, objects...) } type zonesAvailableSpace []zoneAvailableSpace @@ -445,20 +445,12 @@ func (z *xlZones) DeleteObjects(ctx context.Context, bucket string, objects []st derrs[i] = checkDelObjArgs(ctx, bucket, objects[i]) } - var objectLocks = make([]RWLocker, len(objects)) - for i := range objects { - if derrs[i] != nil { - continue - } - - // Acquire a write lock before deleting the object. - objectLocks[i] = z.NewNSLock(ctx, bucket, objects[i]) - if derrs[i] = objectLocks[i].GetLock(globalOperationTimeout); derrs[i] != nil { - continue - } - - defer objectLocks[i].Unlock() + // Acquire a bulk write lock across 'objects' + multiDeleteLock := z.NewNSLock(ctx, bucket, objects...) + if err := multiDeleteLock.GetLock(globalOperationTimeout); err != nil { + return nil, err } + defer multiDeleteLock.Unlock() for _, zone := range z.zones { errs, err := zone.DeleteObjects(ctx, bucket, objects) diff --git a/pkg/dsync/drwmutex.go b/pkg/dsync/drwmutex.go index 7e1cb6a64..c6f988ebb 100644 --- a/pkg/dsync/drwmutex.go +++ b/pkg/dsync/drwmutex.go @@ -19,13 +19,10 @@ package dsync import ( "context" "errors" - "fmt" golog "log" "math" "math/rand" "os" - "path" - "runtime" "sync" "time" ) @@ -51,7 +48,7 @@ const drwMutexInfinite = time.Duration(1<<63 - 1) // A DRWMutex is a distributed mutual exclusion lock. 
type DRWMutex struct { - Name string + Names []string writeLocks []string // Array of nodes that granted a write lock readersLocks [][]string // Array of array of nodes that granted reader locks m sync.Mutex // Mutex to prevent multiple simultaneous locks from this node @@ -74,10 +71,10 @@ func isLocked(uid string) bool { } // NewDRWMutex - initializes a new dsync RW mutex. -func NewDRWMutex(ctx context.Context, name string, clnt *Dsync) *DRWMutex { +func NewDRWMutex(ctx context.Context, clnt *Dsync, names ...string) *DRWMutex { return &DRWMutex{ - Name: name, writeLocks: make([]string, len(clnt.GetLockersFn())), + Names: names, clnt: clnt, ctx: ctx, } @@ -149,7 +146,7 @@ func (dm *DRWMutex) lockBlocking(timeout time.Duration, id, source string, isRea locks := make([]string, len(restClnts)) // Try to acquire the lock. - success := lock(dm.clnt, &locks, dm.Name, id, source, isReadLock) + success := lock(dm.clnt, &locks, id, source, isReadLock, dm.Names...) if success { dm.m.Lock() @@ -176,7 +173,7 @@ func (dm *DRWMutex) lockBlocking(timeout time.Duration, id, source string, isRea } // lock tries to acquire the distributed lock, returning true or false. -func lock(ds *Dsync, locks *[]string, lockName, id, source string, isReadLock bool) bool { +func lock(ds *Dsync, locks *[]string, id, source string, isReadLock bool, lockNames ...string) bool { restClnts := ds.GetLockersFn() @@ -199,9 +196,9 @@ func lock(ds *Dsync, locks *[]string, lockName, id, source string, isReadLock bo } args := LockArgs{ - UID: id, - Resource: lockName, - Source: source, + UID: id, + Resources: lockNames, + Source: source, } var locked bool @@ -259,7 +256,7 @@ func lock(ds *Dsync, locks *[]string, lockName, id, source string, isReadLock bo done = true // Increment the number of grants received from the buffered channel. i++ - releaseAll(ds, locks, lockName, isReadLock, restClnts) + releaseAll(ds, locks, isReadLock, restClnts, lockNames...) } } case <-timeout: @@ -267,7 +264,7 @@ func lock(ds *Dsync, locks *[]string, lockName, id, source string, isReadLock bo // timeout happened, maybe one of the nodes is slow, count // number of locks to check whether we have quorum or not if !quorumMet(locks, isReadLock, dquorum, dquorumReads) { - releaseAll(ds, locks, lockName, isReadLock, restClnts) + releaseAll(ds, locks, isReadLock, restClnts, lockNames...) } } @@ -289,8 +286,8 @@ func lock(ds *Dsync, locks *[]string, lockName, id, source string, isReadLock bo grantToBeReleased := <-ch if grantToBeReleased.isLocked() { // release lock - sendRelease(ds, restClnts[grantToBeReleased.index], lockName, - grantToBeReleased.lockUID, isReadLock) + sendRelease(ds, restClnts[grantToBeReleased.index], + grantToBeReleased.lockUID, isReadLock, lockNames...) } } }(isReadLock) @@ -321,10 +318,10 @@ func quorumMet(locks *[]string, isReadLock bool, quorum, quorumReads int) bool { } // releaseAll releases all locks that are marked as locked -func releaseAll(ds *Dsync, locks *[]string, lockName string, isReadLock bool, restClnts []NetLocker) { - for lock := 0; lock < len(restClnts); lock++ { +func releaseAll(ds *Dsync, locks *[]string, isReadLock bool, restClnts []NetLocker, lockNames ...string) { + for lock := range restClnts { if isLocked((*locks)[lock]) { - sendRelease(ds, restClnts[lock], lockName, (*locks)[lock], isReadLock) + sendRelease(ds, restClnts[lock], (*locks)[lock], isReadLock, lockNames...) 
(*locks)[lock] = "" } } @@ -362,7 +359,7 @@ func (dm *DRWMutex) Unlock() { } isReadLock := false - unlock(dm.clnt, locks, dm.Name, isReadLock, restClnts) + unlock(dm.clnt, locks, isReadLock, restClnts, dm.Names...) } // RUnlock releases a read lock held on dm. @@ -387,10 +384,10 @@ func (dm *DRWMutex) RUnlock() { } isReadLock := true - unlock(dm.clnt, locks, dm.Name, isReadLock, restClnts) + unlock(dm.clnt, locks, isReadLock, restClnts, dm.Names...) } -func unlock(ds *Dsync, locks []string, name string, isReadLock bool, restClnts []NetLocker) { +func unlock(ds *Dsync, locks []string, isReadLock bool, restClnts []NetLocker, names ...string) { // We don't need to synchronously wait until we have released all the locks (or the quorum) // (a subsequent lock will retry automatically in case it would fail to get quorum) @@ -399,21 +396,21 @@ func unlock(ds *Dsync, locks []string, name string, isReadLock bool, restClnts [ if isLocked(locks[index]) { // broadcast lock release to all nodes that granted the lock - sendRelease(ds, c, name, locks[index], isReadLock) + sendRelease(ds, c, locks[index], isReadLock, names...) } } } // sendRelease sends a release message to a node that previously granted a lock -func sendRelease(ds *Dsync, c NetLocker, name, uid string, isReadLock bool) { +func sendRelease(ds *Dsync, c NetLocker, uid string, isReadLock bool, names ...string) { if c == nil { log("Unable to call RUnlock", errors.New("netLocker is offline")) return } args := LockArgs{ - UID: uid, - Resource: name, + UID: uid, + Resources: names, } if isReadLock { if _, err := c.RUnlock(args); err != nil { @@ -425,38 +422,3 @@ func sendRelease(ds *Dsync, c NetLocker, name, uid string, isReadLock bool) { } } } - -// DRLocker returns a sync.Locker interface that implements -// the Lock and Unlock methods by calling drw.RLock and drw.RUnlock. 
-func (dm *DRWMutex) DRLocker() sync.Locker { - return (*drlocker)(dm) -} - -type drlocker DRWMutex - -var letterRunes = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ") - -func randString(n int) string { - b := make([]rune, n) - for i := range b { - b[i] = letterRunes[rand.Intn(len(letterRunes))] - } - return string(b) -} - -func getSource() string { - var funcName string - pc, filename, lineNum, ok := runtime.Caller(2) - if ok { - filename = path.Base(filename) - funcName = runtime.FuncForPC(pc).Name() - } else { - filename = "" - lineNum = 0 - } - - return fmt.Sprintf("[%s:%d:%s()]", filename, lineNum, funcName) -} - -func (dr *drlocker) Lock() { (*DRWMutex)(dr).RLock(randString(16), getSource()) } -func (dr *drlocker) Unlock() { (*DRWMutex)(dr).RUnlock() } diff --git a/pkg/dsync/drwmutex_test.go b/pkg/dsync/drwmutex_test.go index c4246f0af..065deccff 100644 --- a/pkg/dsync/drwmutex_test.go +++ b/pkg/dsync/drwmutex_test.go @@ -22,7 +22,6 @@ import ( "context" "fmt" "runtime" - "sync" "sync/atomic" "testing" "time" @@ -37,7 +36,7 @@ const ( func testSimpleWriteLock(t *testing.T, duration time.Duration) (locked bool) { - drwm := NewDRWMutex(context.Background(), "simplelock", ds) + drwm := NewDRWMutex(context.Background(), ds, "simplelock") if !drwm.GetRLock(id, source, time.Second) { panic("Failed to acquire read lock") @@ -93,7 +92,7 @@ func TestSimpleWriteLockTimedOut(t *testing.T) { func testDualWriteLock(t *testing.T, duration time.Duration) (locked bool) { - drwm := NewDRWMutex(context.Background(), "duallock", ds) + drwm := NewDRWMutex(context.Background(), ds, "duallock") // fmt.Println("Getting initial write lock") if !drwm.GetLock(id, source, time.Second) { @@ -153,7 +152,7 @@ func parallelReader(m *DRWMutex, clocked, cunlock, cdone chan bool) { // Borrowed from rwmutex_test.go func doTestParallelReaders(numReaders, gomaxprocs int) { runtime.GOMAXPROCS(gomaxprocs) - m := NewDRWMutex(context.Background(), "test-parallel", ds) + m := NewDRWMutex(context.Background(), ds, "test-parallel") clocked := make(chan bool) cunlock := make(chan bool) @@ -221,7 +220,7 @@ func HammerRWMutex(gomaxprocs, numReaders, numIterations int) { runtime.GOMAXPROCS(gomaxprocs) // Number of active readers + 10000 * number of active writers. 
var activity int32 - rwm := NewDRWMutex(context.Background(), "test", ds) + rwm := NewDRWMutex(context.Background(), ds, "test") cdone := make(chan bool) go writer(rwm, numIterations, &activity, cdone) var i int @@ -257,42 +256,6 @@ func TestRWMutex(t *testing.T) { HammerRWMutex(10, 5, n) } -// Borrowed from rwmutex_test.go -func TestDRLocker(t *testing.T) { - wl := NewDRWMutex(context.Background(), "test", ds) - var rl sync.Locker - wlocked := make(chan bool, 1) - rlocked := make(chan bool, 1) - rl = wl.DRLocker() - n := 10 - go func() { - for i := 0; i < n; i++ { - rl.Lock() - rl.Lock() - rlocked <- true - wl.Lock(id, source) - wlocked <- true - } - }() - for i := 0; i < n; i++ { - <-rlocked - rl.Unlock() - select { - case <-wlocked: - t.Fatal("RLocker() didn't read-lock it") - default: - } - rl.Unlock() - <-wlocked - select { - case <-rlocked: - t.Fatal("RLocker() didn't respect the write lock") - default: - } - wl.Unlock() - } -} - // Borrowed from rwmutex_test.go func TestUnlockPanic(t *testing.T) { defer func() { @@ -300,7 +263,7 @@ func TestUnlockPanic(t *testing.T) { t.Fatalf("unlock of unlocked RWMutex did not panic") } }() - mu := NewDRWMutex(context.Background(), "test", ds) + mu := NewDRWMutex(context.Background(), ds, "test") mu.Unlock() } @@ -311,7 +274,7 @@ func TestUnlockPanic2(t *testing.T) { t.Fatalf("unlock of unlocked RWMutex did not panic") } }() - mu := NewDRWMutex(context.Background(), "test-unlock-panic-2", ds) + mu := NewDRWMutex(context.Background(), ds, "test-unlock-panic-2") mu.RLock(id, source) mu.Unlock() } @@ -323,7 +286,7 @@ func TestRUnlockPanic(t *testing.T) { t.Fatalf("read unlock of unlocked RWMutex did not panic") } }() - mu := NewDRWMutex(context.Background(), "test", ds) + mu := NewDRWMutex(context.Background(), ds, "test") mu.RUnlock() } @@ -334,14 +297,14 @@ func TestRUnlockPanic2(t *testing.T) { t.Fatalf("read unlock of unlocked RWMutex did not panic") } }() - mu := NewDRWMutex(context.Background(), "test-runlock-panic-2", ds) + mu := NewDRWMutex(context.Background(), ds, "test-runlock-panic-2") mu.Lock(id, source) mu.RUnlock() } // Borrowed from rwmutex_test.go func benchmarkRWMutex(b *testing.B, localWork, writeRatio int) { - rwm := NewDRWMutex(context.Background(), "test", ds) + rwm := NewDRWMutex(context.Background(), ds, "test") b.RunParallel(func(pb *testing.PB) { foo := 0 for pb.Next() { diff --git a/pkg/dsync/dsync-server_test.go b/pkg/dsync/dsync-server_test.go index a964acc2a..fad0fb7be 100644 --- a/pkg/dsync/dsync-server_test.go +++ b/pkg/dsync/dsync-server_test.go @@ -35,8 +35,8 @@ type lockServer struct { func (l *lockServer) Lock(args *LockArgs, reply *bool) error { l.mutex.Lock() defer l.mutex.Unlock() - if _, *reply = l.lockMap[args.Resource]; !*reply { - l.lockMap[args.Resource] = WriteLock // No locks held on the given name, so claim write lock + if _, *reply = l.lockMap[args.Resources[0]]; !*reply { + l.lockMap[args.Resources[0]] = WriteLock // No locks held on the given name, so claim write lock } *reply = !*reply // Negate *reply to return true when lock is granted or false otherwise return nil @@ -46,13 +46,13 @@ func (l *lockServer) Unlock(args *LockArgs, reply *bool) error { l.mutex.Lock() defer l.mutex.Unlock() var locksHeld int64 - if locksHeld, *reply = l.lockMap[args.Resource]; !*reply { // No lock is held on the given name - return fmt.Errorf("Unlock attempted on an unlocked entity: %s", args.Resource) + if locksHeld, *reply = l.lockMap[args.Resources[0]]; !*reply { // No lock is held on the given name + return 
fmt.Errorf("Unlock attempted on an unlocked entity: %s", args.Resources[0]) } if *reply = locksHeld == WriteLock; !*reply { // Unless it is a write lock - return fmt.Errorf("Unlock attempted on a read locked entity: %s (%d read locks active)", args.Resource, locksHeld) + return fmt.Errorf("Unlock attempted on a read locked entity: %s (%d read locks active)", args.Resources[0], locksHeld) } - delete(l.lockMap, args.Resource) // Remove the write lock + delete(l.lockMap, args.Resources[0]) // Remove the write lock return nil } @@ -62,12 +62,12 @@ func (l *lockServer) RLock(args *LockArgs, reply *bool) error { l.mutex.Lock() defer l.mutex.Unlock() var locksHeld int64 - if locksHeld, *reply = l.lockMap[args.Resource]; !*reply { - l.lockMap[args.Resource] = ReadLock // No locks held on the given name, so claim (first) read lock + if locksHeld, *reply = l.lockMap[args.Resources[0]]; !*reply { + l.lockMap[args.Resources[0]] = ReadLock // No locks held on the given name, so claim (first) read lock *reply = true } else { if *reply = locksHeld != WriteLock; *reply { // Unless there is a write lock - l.lockMap[args.Resource] = locksHeld + ReadLock // Grant another read lock + l.lockMap[args.Resources[0]] = locksHeld + ReadLock // Grant another read lock } } return nil @@ -77,16 +77,16 @@ func (l *lockServer) RUnlock(args *LockArgs, reply *bool) error { l.mutex.Lock() defer l.mutex.Unlock() var locksHeld int64 - if locksHeld, *reply = l.lockMap[args.Resource]; !*reply { // No lock is held on the given name - return fmt.Errorf("RUnlock attempted on an unlocked entity: %s", args.Resource) + if locksHeld, *reply = l.lockMap[args.Resources[0]]; !*reply { // No lock is held on the given name + return fmt.Errorf("RUnlock attempted on an unlocked entity: %s", args.Resources[0]) } if *reply = locksHeld != WriteLock; !*reply { // A write-lock is held, cannot release a read lock - return fmt.Errorf("RUnlock attempted on a write locked entity: %s", args.Resource) + return fmt.Errorf("RUnlock attempted on a write locked entity: %s", args.Resources[0]) } if locksHeld > ReadLock { - l.lockMap[args.Resource] = locksHeld - ReadLock // Remove one of the read locks held + l.lockMap[args.Resources[0]] = locksHeld - ReadLock // Remove one of the read locks held } else { - delete(l.lockMap, args.Resource) // Remove the (last) read lock + delete(l.lockMap, args.Resources[0]) // Remove the (last) read lock } return nil } @@ -97,7 +97,7 @@ func (l *lockServer) ForceUnlock(args *LockArgs, reply *bool) error { if len(args.UID) != 0 { return fmt.Errorf("ForceUnlock called with non-empty UID: %s", args.UID) } - delete(l.lockMap, args.Resource) // Remove the lock (irrespective of write or read lock) + delete(l.lockMap, args.Resources[0]) // Remove the lock (irrespective of write or read lock) *reply = true return nil } diff --git a/pkg/dsync/dsync_test.go b/pkg/dsync/dsync_test.go index d3162f61b..95b53c462 100644 --- a/pkg/dsync/dsync_test.go +++ b/pkg/dsync/dsync_test.go @@ -89,7 +89,7 @@ func TestMain(m *testing.M) { func TestSimpleLock(t *testing.T) { - dm := NewDRWMutex(context.Background(), "test", ds) + dm := NewDRWMutex(context.Background(), ds, "test") dm.Lock(id, source) @@ -101,7 +101,7 @@ func TestSimpleLock(t *testing.T) { func TestSimpleLockUnlockMultipleTimes(t *testing.T) { - dm := NewDRWMutex(context.Background(), "test", ds) + dm := NewDRWMutex(context.Background(), ds, "test") dm.Lock(id, source) time.Sleep(time.Duration(10+(rand.Float32()*50)) * time.Millisecond) @@ -127,8 +127,8 @@ func 
TestSimpleLockUnlockMultipleTimes(t *testing.T) { // Test two locks for same resource, one succeeds, one fails (after timeout) func TestTwoSimultaneousLocksForSameResource(t *testing.T) { - dm1st := NewDRWMutex(context.Background(), "aap", ds) - dm2nd := NewDRWMutex(context.Background(), "aap", ds) + dm1st := NewDRWMutex(context.Background(), ds, "aap") + dm2nd := NewDRWMutex(context.Background(), ds, "aap") dm1st.Lock(id, source) @@ -151,9 +151,9 @@ func TestTwoSimultaneousLocksForSameResource(t *testing.T) { // Test three locks for same resource, one succeeds, one fails (after timeout) func TestThreeSimultaneousLocksForSameResource(t *testing.T) { - dm1st := NewDRWMutex(context.Background(), "aap", ds) - dm2nd := NewDRWMutex(context.Background(), "aap", ds) - dm3rd := NewDRWMutex(context.Background(), "aap", ds) + dm1st := NewDRWMutex(context.Background(), ds, "aap") + dm2nd := NewDRWMutex(context.Background(), ds, "aap") + dm3rd := NewDRWMutex(context.Background(), ds, "aap") dm1st.Lock(id, source) @@ -216,8 +216,8 @@ func TestThreeSimultaneousLocksForSameResource(t *testing.T) { // Test two locks for different resources, both succeed func TestTwoSimultaneousLocksForDifferentResources(t *testing.T) { - dm1 := NewDRWMutex(context.Background(), "aap", ds) - dm2 := NewDRWMutex(context.Background(), "noot", ds) + dm1 := NewDRWMutex(context.Background(), ds, "aap") + dm2 := NewDRWMutex(context.Background(), ds, "noot") dm1.Lock(id, source) dm2.Lock(id, source) @@ -243,7 +243,7 @@ func HammerMutex(m *DRWMutex, loops int, cdone chan bool) { // Borrowed from mutex_test.go func TestMutex(t *testing.T) { c := make(chan bool) - m := NewDRWMutex(context.Background(), "test", ds) + m := NewDRWMutex(context.Background(), ds, "test") for i := 0; i < 10; i++ { go HammerMutex(m, 1000, c) } @@ -257,7 +257,7 @@ func BenchmarkMutexUncontended(b *testing.B) { *DRWMutex } b.RunParallel(func(pb *testing.PB) { - var mu = PaddedMutex{NewDRWMutex(context.Background(), "", ds)} + var mu = PaddedMutex{NewDRWMutex(context.Background(), ds, "")} for pb.Next() { mu.Lock(id, source) mu.Unlock() @@ -266,7 +266,7 @@ func BenchmarkMutexUncontended(b *testing.B) { } func benchmarkMutex(b *testing.B, slack, work bool) { - mu := NewDRWMutex(context.Background(), "", ds) + mu := NewDRWMutex(context.Background(), ds, "") if slack { b.SetParallelism(10) } @@ -309,7 +309,7 @@ func BenchmarkMutexNoSpin(b *testing.B) { // These goroutines yield during local work, so that switching from // a blocked goroutine to other goroutines is profitable. // As a matter of fact, this benchmark still triggers some spinning in the mutex. - m := NewDRWMutex(context.Background(), "", ds) + m := NewDRWMutex(context.Background(), ds, "") var acc0, acc1 uint64 b.SetParallelism(4) b.RunParallel(func(pb *testing.PB) { @@ -341,7 +341,7 @@ func BenchmarkMutexSpin(b *testing.B) { // profitable. To achieve this we create a goroutine per-proc. // These goroutines access considerable amount of local data so that // unnecessary rescheduling is penalized by cache misses. - m := NewDRWMutex(context.Background(), "", ds) + m := NewDRWMutex(context.Background(), ds, "") var acc0, acc1 uint64 b.RunParallel(func(pb *testing.PB) { var data [16 << 10]uint64 diff --git a/pkg/dsync/rpc-client-interface.go b/pkg/dsync/rpc-client-interface.go index 086787dd9..aec5187e9 100644 --- a/pkg/dsync/rpc-client-interface.go +++ b/pkg/dsync/rpc-client-interface.go @@ -21,8 +21,8 @@ type LockArgs struct { // Unique ID of lock/unlock request. 
 	UID string

-	// Resource contains a entity to be locked/unlocked.
-	Resource string
+	// Resources contains single or multiple entries to be locked/unlocked.
+	Resources []string

 	// Source contains the line number, function and file name of the code
 	// on the client node that requested the lock.
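
A note on the wire-level change introduced above: a single lock request can now name several resources at once. LockArgs carries Resources []string, lockRESTClient.restCall writes the names newline-delimited into the request body, and getLockArgs on the lock REST server scans them back and sorts them, presumably so every node sees the same canonical acquisition order. Below is a minimal, self-contained sketch of that convention only, not the MinIO code itself; the helper names encodeResources and decodeResources are illustrative.

package main

import (
	"bufio"
	"bytes"
	"fmt"
	"io"
	"sort"
)

// LockArgs mirrors the updated dsync.LockArgs: one request, many resources.
type LockArgs struct {
	UID       string
	Source    string
	Resources []string
}

// encodeResources writes one resource per line, matching the newline-delimited
// body built with bytes.Buffer in lockRESTClient.restCall.
func encodeResources(resources []string) io.Reader {
	var buf bytes.Buffer
	for _, r := range resources {
		buf.WriteString(r)
		buf.WriteString("\n")
	}
	return &buf
}

// decodeResources reads the body back line by line and sorts the result,
// as getLockArgs does on the receiving side.
func decodeResources(body io.Reader) ([]string, error) {
	var resources []string
	scanner := bufio.NewScanner(body)
	for scanner.Scan() {
		resources = append(resources, scanner.Text())
	}
	if err := scanner.Err(); err != nil {
		return nil, err
	}
	sort.Strings(resources)
	return resources, nil
}

func main() {
	// e.g. a bulk delete taking one write lock across several objects.
	body := encodeResources([]string{"bucket/obj-2", "bucket/obj-1", "bucket/obj-3"})
	resources, err := decodeResources(body)
	if err != nil {
		panic(err)
	}
	args := LockArgs{UID: "uid-1234", Source: "caller.go:42", Resources: resources}
	fmt.Println(args.Resources) // [bucket/obj-1 bucket/obj-2 bucket/obj-3]
}

On the caller side, the same idea surfaces as the variadic NewNSLock(ctx, bucket, objects...) signature: DeleteObjects in xl-zones now takes one bulk write lock over all object names instead of acquiring and deferring a per-object lock, and xlSets routes any multi-object request to a single hashed set, presumably so that one set of lockers handles every name in the request.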