diff --git a/cmd/control-heal-main.go b/cmd/control-heal-main.go index 605ba3daa..2b8a15c70 100644 --- a/cmd/control-heal-main.go +++ b/cmd/control-heal-main.go @@ -35,7 +35,7 @@ var healCmd = cli.Command{ USAGE: minio control {{.Name}} -EAMPLES: +EXAMPLES: 1. Heal an object. $ minio control {{.Name}} http://localhost:9000/songs/classical/western/piano.mp3 diff --git a/cmd/control-lock-main.go b/cmd/control-lock-main.go new file mode 100644 index 000000000..9cd5757e4 --- /dev/null +++ b/cmd/control-lock-main.go @@ -0,0 +1,139 @@ +/* + * Minio Cloud Storage, (C) 2016 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cmd + +import ( + "encoding/json" + "fmt" + "net/url" + "path" + "time" + + "github.com/minio/cli" +) + +// SystemLockState - Structure to fill the lock state of entire object storage. +// That is the total locks held, total calls blocked on locks and state of all the locks for the entire system. +type SystemLockState struct { + TotalLocks int64 `json:"totalLocks"` + TotalBlockedLocks int64 `json:"totalBlockedLocks"` // count of operations which are blocked waiting for the lock to be released. + TotalAcquiredLocks int64 `json:"totalAcquiredLocks"` // count of operations which has successfully acquired the lock but hasn't unlocked yet( operation in progress). + LocksInfoPerObject []VolumeLockInfo `json:"locksInfoPerObject"` +} + +// VolumeLockInfo - Structure to contain the lock state info for volume, path pair. +type VolumeLockInfo struct { + Bucket string `json:"bucket"` + Object string `json:"object"` + LocksOnObject int64 `json:"locksOnObject"` // All locks blocked + running for given pair. + LocksAcquiredOnObject int64 `json:"locksAcquiredOnObject"` // count of operations which has successfully acquired the lock but hasn't unlocked yet( operation in progress). + TotalBlockedLocks int64 `json:"locksBlockedOnObject"` // count of operations which are blocked waiting for the lock to be released. + LockDetailsOnObject []OpsLockState `json:"lockDetailsOnObject"` // state information containing state of the locks for all operations on given pair. +} + +// OpsLockState - structure to fill in state information of the lock. +// structure to fill in status information for each operation with given operation ID. +type OpsLockState struct { + OperationID string `json:"opsID"` // string containing operation ID. + LockOrigin string `json:"lockOrigin"` // contant which mentions the operation type (Get Obejct, PutObject...) + LockType string `json:"lockType"` + Status string `json:"status"` // status can be running/ready/blocked. + StatusSince string `json:"statusSince"` // time info of the since how long the status holds true, value in seconds. +} + +// Read entire state of the locks in the system and return. +func generateSystemLockResponse() (SystemLockState, error) { + nsMutex.lockMapMutex.Lock() + defer nsMutex.lockMapMutex.Unlock() + + if nsMutex.debugLockMap == nil { + return SystemLockState{}, LockInfoNil{} + } + + lockState := SystemLockState{} + + lockState.TotalBlockedLocks = nsMutex.blockedCounter + lockState.TotalLocks = nsMutex.globalLockCounter + lockState.TotalAcquiredLocks = nsMutex.runningLockCounter + + for param := range nsMutex.debugLockMap { + volLockInfo := VolumeLockInfo{} + volLockInfo.Bucket = param.volume + volLockInfo.Object = param.path + volLockInfo.TotalBlockedLocks = nsMutex.debugLockMap[param].blocked + volLockInfo.LocksAcquiredOnObject = nsMutex.debugLockMap[param].running + volLockInfo.LocksOnObject = nsMutex.debugLockMap[param].ref + for opsID := range nsMutex.debugLockMap[param].lockInfo { + opsState := OpsLockState{} + opsState.OperationID = opsID + opsState.LockOrigin = nsMutex.debugLockMap[param].lockInfo[opsID].lockOrigin + opsState.LockType = nsMutex.debugLockMap[param].lockInfo[opsID].lockType + opsState.Status = nsMutex.debugLockMap[param].lockInfo[opsID].status + opsState.StatusSince = time.Now().Sub(nsMutex.debugLockMap[param].lockInfo[opsID].since).String() + + volLockInfo.LockDetailsOnObject = append(volLockInfo.LockDetailsOnObject, opsState) + } + lockState.LocksInfoPerObject = append(lockState.LocksInfoPerObject, volLockInfo) + } + + return lockState, nil +} + +var lockCmd = cli.Command{ + Name: "lock", + Usage: "info about the locks in the node.", + Action: lockControl, + CustomHelpTemplate: `NAME: + minio control {{.Name}} - {{.Usage}} + +USAGE: + minio control {{.Name}} http://localhost:9000/ + +EAMPLES: + 1. Get all the info about the blocked/held locks in the node: + $ minio control lock http://localhost:9000/ +`, +} + +// "minio control lock" entry point. +func lockControl(c *cli.Context) { + if len(c.Args()) != 1 { + cli.ShowCommandHelpAndExit(c, "lock", 1) + } + + parsedURL, err := url.Parse(c.Args()[0]) + fatalIf(err, "Unable to parse URL.") + + authCfg := &authConfig{ + accessKey: serverConfig.GetCredential().AccessKeyID, + secretKey: serverConfig.GetCredential().SecretAccessKey, + address: parsedURL.Host, + path: path.Join(reservedBucket, controlPath), + loginMethod: "Controller.LoginHandler", + } + client := newAuthClient(authCfg) + + args := &GenericArgs{} + reply := &SystemLockState{} + err = client.Call("Control.LockInfo", args, reply) + // logs the error and returns if err != nil. + fatalIf(err, "RPC Control.LockInfo call failed") + // print the lock info on the console. + b, err := json.MarshalIndent(*reply, "", " ") + fatalIf(err, "Failed to parse the RPC lock info response") + fmt.Print(string(b)) +} diff --git a/cmd/control-main.go b/cmd/control-main.go index a0494c628..aa9c17123 100644 --- a/cmd/control-main.go +++ b/cmd/control-main.go @@ -24,6 +24,7 @@ var controlCmd = cli.Command{ Usage: "Control and manage minio server.", Action: mainControl, Subcommands: []cli.Command{ + lockCmd, healCmd, shutdownCmd, }, diff --git a/cmd/control-shutdown-main.go b/cmd/control-shutdown-main.go index 9bc3905ad..a9198b4da 100644 --- a/cmd/control-shutdown-main.go +++ b/cmd/control-shutdown-main.go @@ -39,7 +39,7 @@ var shutdownCmd = cli.Command{ USAGE: minio control {{.Name}} http://localhost:9000/ -EAMPLES: +EXAMPLES: 1. Shutdown the server: $ minio control shutdown http://localhost:9000/ diff --git a/cmd/controller-handlers.go b/cmd/controller-handlers.go index 59a579689..a5b395ad3 100644 --- a/cmd/controller-handlers.go +++ b/cmd/controller-handlers.go @@ -153,3 +153,17 @@ func (c *controllerAPIHandlers) TryInitHandler(args *GenericArgs, reply *Generic return nil } + +// LockInfo - RPC control handler for `minio control lock`. +// Returns the info of the locks held in the system. +func (c *controllerAPIHandlers) LockInfo(arg *GenericArgs, reply *SystemLockState) error { + // obtain the lock state information. + lockInfo, err := generateSystemLockResponse() + // in case of error, return err to the RPC client. + if err != nil { + return err + } + // the response containing the lock info. + *reply = lockInfo + return nil +} diff --git a/cmd/controller-handlers_test.go b/cmd/controller-handlers_test.go deleted file mode 100644 index 321b0c45b..000000000 --- a/cmd/controller-handlers_test.go +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Minio Cloud Storage, (C) 2016 Minio, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cmd - -import ( - // "net/rpc" - "testing" -) - -// Wrapper for calling heal disk metadata rpc Handler -func TestControllerHandlerHealDiskMetadata(t *testing.T) { - ExecObjectLayerTest(t, testHealDiskMetadataControllerHandler) -} - -// testHealDiskMetadataControllerHandler - Test Heal Disk Metadata handler -func testHealDiskMetadataControllerHandler(obj ObjectLayer, instanceType string, t TestErrHandler) { - // Register the API end points with XL/FS object layer. - serverAddress, random, err := initTestControllerRPCEndPoint(obj) - if err != nil { - t.Fatal(err) - } - // initialize the server and obtain the credentials and root. - // credentials are necessary to sign the HTTP request. - rootPath, err := newTestConfig("us-east-1") - if err != nil { - t.Fatalf("Init Test config failed") - } - // remove the root folder after the test ends. - defer removeAll(rootPath) - - authCfg := &authConfig{ - accessKey: serverConfig.GetCredential().AccessKeyID, - secretKey: serverConfig.GetCredential().SecretAccessKey, - address: serverAddress, - path: "/controller" + random, - loginMethod: "Controller.LoginHandler", - } - client := newAuthClient(authCfg) - - args := &GenericArgs{} - reply := &GenericReply{} - err = client.Call("Controller.HealDiskMetadataHandler", args, reply) - if instanceType == "FS" && err == nil { - t.Errorf("Test should fail with FS") - } - if instanceType == "XL" && err != nil { - t.Errorf("Test should succeed with XL %s", err.Error()) - } -} diff --git a/cmd/controller_test.go b/cmd/controller_test.go new file mode 100644 index 000000000..ff3d1b2f0 --- /dev/null +++ b/cmd/controller_test.go @@ -0,0 +1,298 @@ +/* + * Minio Cloud Storage, (C) 2015, 2016 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cmd + +import ( + "path" + "strconv" + "sync" + "time" + + . "gopkg.in/check.v1" +) + +// API suite container common to both FS and XL. +type TestRPCControllerSuite struct { + serverType string + testServer TestServer + endPoint string + accessKey string + secretKey string +} + +// Init and run test on XL backend. +var _ = Suite(&TestRPCControllerSuite{serverType: "XL"}) + +// Setting up the test suite. +// Starting the Test server with temporary FS backend. +func (s *TestRPCControllerSuite) SetUpSuite(c *C) { + s.testServer = StartTestRPCServer(c, s.serverType) + s.endPoint = s.testServer.Server.Listener.Addr().String() + s.accessKey = s.testServer.AccessKey + s.secretKey = s.testServer.SecretKey +} + +// Called implicitly by "gopkg.in/check.v1" after all tests are run. +func (s *TestRPCControllerSuite) TearDownSuite(c *C) { + s.testServer.Stop() +} + +// Tests to validate the correctness of lock instrumentation control RPC end point. +func (s *TestRPCControllerSuite) TestRPCControlLock(c *C) { + // enabling lock instrumentation. + globalDebugLock = true + // initializing the locks. + initNSLock(false) + // set debug lock info to `nil` so that the next tests have to initialize them again. + defer func() { + globalDebugLock = false + nsMutex.debugLockMap = nil + }() + + expectedResult := []lockStateCase{ + // Test case - 1. + // Case where 10 read locks are held. + // Entry for any of the 10 reads locks has to be found. + // Since they held in a loop, Lock origin for first 10 read locks (opsID 0-9) should be the same. + { + + volume: "my-bucket", + path: "my-object", + opsID: "0", + readLock: true, + lockOrigin: "[lock held] in github.com/minio/minio/cmd.TestLockStats[/Users/hackintoshrao/mycode/go/src/github.com/minio/minio/cmd/namespace-lock_test.go:298]", + // expected metrics. + expectedErr: nil, + expectedLockStatus: "Running", + + expectedGlobalLockCount: 10, + expectedRunningLockCount: 10, + expectedBlockedLockCount: 0, + + expectedVolPathLockCount: 10, + expectedVolPathRunningCount: 10, + expectedVolPathBlockCount: 0, + }, + // Test case 2. + // Testing the existance of entry for the last read lock (read lock with opsID "9"). + { + + volume: "my-bucket", + path: "my-object", + opsID: "9", + readLock: true, + lockOrigin: "[lock held] in github.com/minio/minio/cmd.TestLockStats[/Users/hackintoshrao/mycode/go/src/github.com/minio/minio/cmd/namespace-lock_test.go:298]", + // expected metrics. + expectedErr: nil, + expectedLockStatus: "Running", + + expectedGlobalLockCount: 10, + expectedRunningLockCount: 10, + expectedBlockedLockCount: 0, + + expectedVolPathLockCount: 10, + expectedVolPathRunningCount: 10, + expectedVolPathBlockCount: 0, + }, + + // Test case 3. + // Hold a write lock, and it should block since 10 read locks + // on <"my-bucket", "my-object"> are still held. + { + + volume: "my-bucket", + path: "my-object", + opsID: "10", + readLock: false, + lockOrigin: "[lock held] in github.com/minio/minio/cmd.TestLockStats[/Users/hackintoshrao/mycode/go/src/github.com/minio/minio/cmd/namespace-lock_test.go:298]", + // expected metrics. + expectedErr: nil, + expectedLockStatus: "Blocked", + + expectedGlobalLockCount: 11, + expectedRunningLockCount: 10, + expectedBlockedLockCount: 1, + + expectedVolPathLockCount: 11, + expectedVolPathRunningCount: 10, + expectedVolPathBlockCount: 1, + }, + + // Test case 4. + // Expected result when all the read locks are released and the blocked write lock acquires the lock. + { + + volume: "my-bucket", + path: "my-object", + opsID: "10", + readLock: false, + lockOrigin: "[lock held] in github.com/minio/minio/cmd.TestLockStats[/Users/hackintoshrao/mycode/go/src/github.com/minio/minio/cmd/namespace-lock_test.go:298]", + // expected metrics. + expectedErr: nil, + expectedLockStatus: "Running", + + expectedGlobalLockCount: 1, + expectedRunningLockCount: 1, + expectedBlockedLockCount: 0, + + expectedVolPathLockCount: 1, + expectedVolPathRunningCount: 1, + expectedVolPathBlockCount: 0, + }, + // Test case - 5. + // At the end after locks are released, its verified whether the counters are set to 0. + { + + volume: "my-bucket", + path: "my-object", + // expected metrics. + expectedErr: nil, + expectedLockStatus: "Blocked", + + expectedGlobalLockCount: 0, + expectedRunningLockCount: 0, + expectedBlockedLockCount: 0, + }, + } + + // used to make sure that the tests don't end till locks held in other go routines are released. + var wg sync.WaitGroup + + // Hold 5 read locks. We should find the info about these in the RPC response. + + // hold 10 read locks. + // Then call the RPC control end point for obtaining lock instrumentation info. + + for i := 0; i < 10; i++ { + nsMutex.RLock("my-bucket", "my-object", strconv.Itoa(i)) + } + + authCfg := &authConfig{ + accessKey: s.accessKey, + secretKey: s.secretKey, + address: s.endPoint, + path: path.Join(reservedBucket, controlPath), + loginMethod: "Controller.LoginHandler", + } + + client := newAuthClient(authCfg) + + defer client.Close() + + args := &GenericArgs{} + reply := &SystemLockState{} + // Call the lock instrumentation RPC end point. + err := client.Call("Controller.LockInfo", args, reply) + if err != nil { + c.Errorf("Add: expected no error but got string %q", err.Error()) + } + // expected lock info. + expectedLockStats := expectedResult[0] + // verify the actual lock info with the expected one. + // verify the existance entry for first read lock (read lock with opsID "0"). + verifyRPCLockInfoResponse(expectedLockStats, *reply, c, 1) + expectedLockStats = expectedResult[1] + // verify the actual lock info with the expected one. + // verify the existance entry for last read lock (read lock with opsID "9"). + verifyRPCLockInfoResponse(expectedLockStats, *reply, c, 2) + + // now hold a write lock in a different go routine and it should block since 10 read locks are + // still held. + wg.Add(1) + go func() { + defer wg.Done() + // blocks till all read locks are released. + nsMutex.Lock("my-bucket", "my-object", strconv.Itoa(10)) + // Once the above attempt to lock is unblocked/acquired, we verify the stats and release the lock. + expectedWLockStats := expectedResult[3] + // Since the write lock acquired here, the number of blocked locks should reduce by 1 and + // count of running locks should increase by 1. + + // Call the RPC control handle to fetch the lock instrumentation info. + reply = &SystemLockState{} + // Call the lock instrumentation RPC end point. + err = client.Call("Controller.LockInfo", args, reply) + if err != nil { + c.Errorf("Add: expected no error but got string %q", err.Error()) + } + verifyRPCLockInfoResponse(expectedWLockStats, *reply, c, 4) + + // release the write lock. + nsMutex.Unlock("my-bucket", "my-object", strconv.Itoa(10)) + + }() + // waiting for a second so that the attempt to acquire the write lock in + // the above go routines gets blocked. + time.Sleep(1 * time.Second) + // The write lock should have got blocked by now, + // check whether the entry for one blocked lock exists. + expectedLockStats = expectedResult[2] + + // Call the RPC control handle to fetch the lock instrumentation info. + reply = &SystemLockState{} + // Call the lock instrumentation RPC end point. + err = client.Call("Controller.LockInfo", args, reply) + if err != nil { + c.Errorf("Add: expected no error but got string %q", err.Error()) + } + verifyRPCLockInfoResponse(expectedLockStats, *reply, c, 3) + // Release all the read locks held. + // the blocked write lock in the above go routines should get unblocked. + for i := 0; i < 10; i++ { + nsMutex.RUnlock("my-bucket", "my-object", strconv.Itoa(i)) + } + wg.Wait() + // Since all the locks are released. There shouldnt be any entry in the lock info. + // and all the counters should be set to 0. + reply = &SystemLockState{} + // Call the lock instrumentation RPC end point. + err = client.Call("Controller.LockInfo", args, reply) + if err != nil { + c.Errorf("Add: expected no error but got string %q", err.Error()) + } + + if reply.TotalAcquiredLocks != 0 && reply.TotalLocks != 0 && reply.TotalBlockedLocks != 0 { + c.Fatalf("The counters are not reset properly after all locks are released") + } + if len(reply.LocksInfoPerObject) != 0 { + c.Fatalf("Since all locks are released there shouldn't have been any lock info entry, but found %d", len(reply.LocksInfoPerObject)) + } +} + +// TestControllerHandlerHealDiskMetadata - Registers and call the `HealDiskMetadataHandler`, +// asserts to validate the success. +func (s *TestRPCControllerSuite) TestControllerHandlerHealDiskMetadata(c *C) { + // The suite has already started the test RPC server, just send RPC calls. + authCfg := &authConfig{ + accessKey: s.accessKey, + secretKey: s.secretKey, + address: s.endPoint, + path: path.Join(reservedBucket, controlPath), + loginMethod: "Controller.LoginHandler", + } + + client := newAuthClient(authCfg) + defer client.Close() + + args := &GenericArgs{} + reply := &GenericReply{} + err := client.Call("Controller.HealDiskMetadataHandler", args, reply) + + if err != nil { + c.Errorf("Heal Meta Disk Handler test failed with %s", err.Error()) + } +} diff --git a/cmd/fs-v1-multipart.go b/cmd/fs-v1-multipart.go index 0eff2f38b..eba83a0ef 100644 --- a/cmd/fs-v1-multipart.go +++ b/cmd/fs-v1-multipart.go @@ -58,9 +58,12 @@ func (fs fsObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark var err error var eof bool if uploadIDMarker != "" { - nsMutex.RLock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, keyMarker)) + // generates random string on setting MINIO_DEBUG=lock, else returns empty string. + // used for instrumentation on locks. + opsID := getOpsID() + nsMutex.RLock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, keyMarker), opsID) uploads, _, err = listMultipartUploadIDs(bucket, keyMarker, uploadIDMarker, maxUploads, fs.storage) - nsMutex.RUnlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, keyMarker)) + nsMutex.RUnlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, keyMarker), opsID) if err != nil { return ListMultipartsInfo{}, err } @@ -110,9 +113,14 @@ func (fs fsObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark var tmpUploads []uploadMetadata var end bool uploadIDMarker = "" - nsMutex.RLock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, entry)) + + // generates random string on setting MINIO_DEBUG=lock, else returns empty string. + // used for instrumentation on locks. + opsID := getOpsID() + + nsMutex.RLock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, entry), opsID) tmpUploads, end, err = listMultipartUploadIDs(bucket, entry, uploadIDMarker, maxUploads, fs.storage) - nsMutex.RUnlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, entry)) + nsMutex.RUnlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, entry), opsID) if err != nil { return ListMultipartsInfo{}, err } @@ -225,9 +233,13 @@ func (fs fsObjects) newMultipartUpload(bucket string, object string, meta map[st fsMeta.Meta = meta } + // generates random string on setting MINIO_DEBUG=lock, else returns empty string. + // used for instrumentation on locks. + opsID := getOpsID() + // This lock needs to be held for any changes to the directory contents of ".minio/multipart/object/" - nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object)) - defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object)) + nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object), opsID) + defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object), opsID) uploadID = getUUID() initiated := time.Now().UTC() @@ -290,7 +302,7 @@ func getFSAppendDataPath(uploadID string) string { } // Append parts to fsAppendDataFile. -func appendParts(disk StorageAPI, bucket, object, uploadID string) { +func appendParts(disk StorageAPI, bucket, object, uploadID, opsID string) { uploadIDPath := path.Join(mpartMetaPrefix, bucket, object, uploadID) // fs-append.json path fsAppendMetaPath := getFSAppendMetaPath(uploadID) @@ -298,16 +310,16 @@ func appendParts(disk StorageAPI, bucket, object, uploadID string) { fsMetaPath := path.Join(mpartMetaPrefix, bucket, object, uploadID, fsMetaJSONFile) // Lock the uploadID so that no one modifies fs.json - nsMutex.RLock(minioMetaBucket, uploadIDPath) + nsMutex.RLock(minioMetaBucket, uploadIDPath, opsID) fsMeta, err := readFSMetadata(disk, minioMetaBucket, fsMetaPath) - nsMutex.RUnlock(minioMetaBucket, uploadIDPath) + nsMutex.RUnlock(minioMetaBucket, uploadIDPath, opsID) if err != nil { return } // Lock fs-append.json so that there is no parallel append to the file. - nsMutex.Lock(minioMetaBucket, fsAppendMetaPath) - defer nsMutex.Unlock(minioMetaBucket, fsAppendMetaPath) + nsMutex.Lock(minioMetaBucket, fsAppendMetaPath, opsID) + defer nsMutex.Unlock(minioMetaBucket, fsAppendMetaPath, opsID) fsAppendMeta, err := readFSMetadata(disk, minioMetaBucket, fsAppendMetaPath) if err != nil { @@ -324,8 +336,9 @@ func appendParts(disk StorageAPI, bucket, object, uploadID string) { return } // Hold write lock on the part so that there is no parallel upload on the part. - nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID, strconv.Itoa(part.Number))) - defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID, strconv.Itoa(part.Number))) + partPath := pathJoin(mpartMetaPrefix, bucket, object, uploadID, strconv.Itoa(part.Number)) + nsMutex.Lock(minioMetaBucket, partPath, opsID) + defer nsMutex.Unlock(minioMetaBucket, partPath, opsID) // Proceed to append "part" fsAppendDataPath := getFSAppendDataPath(uploadID) @@ -345,7 +358,6 @@ func appendParts(disk StorageAPI, bucket, object, uploadID string) { } } // Path to the part that needs to be appended. - partPath := path.Join(mpartMetaPrefix, bucket, object, uploadID, part.Name) offset := int64(0) totalLeft := part.Size buf := make([]byte, readSizeV1) @@ -381,7 +393,7 @@ func appendParts(disk StorageAPI, bucket, object, uploadID string) { // If there are more parts that need to be appended to fsAppendDataFile _, appendNeeded = partToAppend(fsMeta, fsAppendMeta) if appendNeeded { - go appendParts(disk, bucket, object, uploadID) + go appendParts(disk, bucket, object, uploadID, opsID) } } @@ -404,10 +416,14 @@ func (fs fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, s uploadIDPath := path.Join(mpartMetaPrefix, bucket, object, uploadID) - nsMutex.RLock(minioMetaBucket, uploadIDPath) + // generates random string on setting MINIO_DEBUG=lock, else returns empty string. + // used for instrumentation on locks. + opsID := getOpsID() + + nsMutex.RLock(minioMetaBucket, uploadIDPath, opsID) // Just check if the uploadID exists to avoid copy if it doesn't. uploadIDExists := fs.isUploadIDExists(bucket, object, uploadID) - nsMutex.RUnlock(minioMetaBucket, uploadIDPath) + nsMutex.RUnlock(minioMetaBucket, uploadIDPath, opsID) if !uploadIDExists { return "", InvalidUploadID{UploadID: uploadID} } @@ -466,9 +482,13 @@ func (fs fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, s } } + // generates random string on setting MINIO_DEBUG=lock, else returns empty string. + // used for instrumentation on locks. + opsID = getOpsID() + // Hold write lock as we are updating fs.json - nsMutex.Lock(minioMetaBucket, uploadIDPath) - defer nsMutex.Unlock(minioMetaBucket, uploadIDPath) + nsMutex.Lock(minioMetaBucket, uploadIDPath, opsID) + defer nsMutex.Unlock(minioMetaBucket, uploadIDPath, opsID) // Just check if the uploadID exists to avoid copy if it doesn't. if !fs.isUploadIDExists(bucket, object, uploadID) { @@ -494,7 +514,7 @@ func (fs fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, s if err = writeFSMetadata(fs.storage, minioMetaBucket, fsMetaPath, fsMeta); err != nil { return "", toObjectErr(err, minioMetaBucket, fsMetaPath) } - go appendParts(fs.storage, bucket, object, uploadID) + go appendParts(fs.storage, bucket, object, uploadID, opsID) return newMD5Hex, nil } @@ -568,9 +588,14 @@ func (fs fsObjects) ListObjectParts(bucket, object, uploadID string, partNumberM if !IsValidObjectName(object) { return ListPartsInfo{}, ObjectNameInvalid{Bucket: bucket, Object: object} } + + // generates random string on setting MINIO_DEBUG=lock, else returns empty string. + // used for instrumentation on locks. + opsID := getOpsID() + // Hold lock so that there is no competing abort-multipart-upload or complete-multipart-upload. - nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID)) - defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID)) + nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID), opsID) + defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID), opsID) if !fs.isUploadIDExists(bucket, object, uploadID) { return ListPartsInfo{}, InvalidUploadID{UploadID: uploadID} @@ -601,12 +626,16 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload } uploadIDPath := path.Join(mpartMetaPrefix, bucket, object, uploadID) + // generates random string on setting MINIO_DEBUG=lock, else returns empty string. + // used for instrumentation on locks. + opsID := getOpsID() + // Hold lock so that // 1) no one aborts this multipart upload // 2) no one does a parallel complete-multipart-upload on this // multipart upload - nsMutex.Lock(minioMetaBucket, uploadIDPath) - defer nsMutex.Unlock(minioMetaBucket, uploadIDPath) + nsMutex.Lock(minioMetaBucket, uploadIDPath, opsID) + defer nsMutex.Unlock(minioMetaBucket, uploadIDPath, opsID) if !fs.isUploadIDExists(bucket, object, uploadID) { return "", InvalidUploadID{UploadID: uploadID} @@ -615,8 +644,8 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload // fs-append.json path fsAppendMetaPath := getFSAppendMetaPath(uploadID) // Lock fs-append.json so that no parallel appendParts() is being done. - nsMutex.Lock(minioMetaBucket, fsAppendMetaPath) - defer nsMutex.Unlock(minioMetaBucket, fsAppendMetaPath) + nsMutex.Lock(minioMetaBucket, fsAppendMetaPath, opsID) + defer nsMutex.Unlock(minioMetaBucket, fsAppendMetaPath, opsID) // Calculate s3 compatible md5sum for complete multipart. s3MD5, err := completeMultipartMD5(parts...) @@ -724,10 +753,14 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload return "", toObjectErr(err, bucket, object) } + // generates random string on setting MINIO_DEBUG=lock, else returns empty string. + // used for instrumentation on locks. + opsID = getOpsID() + // Hold the lock so that two parallel complete-multipart-uploads do not // leave a stale uploads.json behind. - nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object)) - defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object)) + nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object), opsID) + defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object), opsID) // Validate if there are other incomplete upload-id's present for // the object, if yes do not attempt to delete 'uploads.json'. @@ -816,9 +849,13 @@ func (fs fsObjects) AbortMultipartUpload(bucket, object, uploadID string) error return ObjectNameInvalid{Bucket: bucket, Object: object} } + // generates random string on setting MINIO_DEBUG=lock, else returns empty string. + // used for instrumentation on locks. + opsID := getOpsID() + // Hold lock so that there is no competing complete-multipart-upload or put-object-part. - nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID)) - defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID)) + nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID), opsID) + defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID), opsID) if !fs.isUploadIDExists(bucket, object, uploadID) { return InvalidUploadID{UploadID: uploadID} @@ -826,8 +863,8 @@ func (fs fsObjects) AbortMultipartUpload(bucket, object, uploadID string) error fsAppendMetaPath := getFSAppendMetaPath(uploadID) // Lock fs-append.json so that no parallel appendParts() is being done. - nsMutex.Lock(minioMetaBucket, fsAppendMetaPath) - defer nsMutex.Unlock(minioMetaBucket, fsAppendMetaPath) + nsMutex.Lock(minioMetaBucket, fsAppendMetaPath, opsID) + defer nsMutex.Unlock(minioMetaBucket, fsAppendMetaPath, opsID) err := fs.abortMultipartUpload(bucket, object, uploadID) return err diff --git a/cmd/globals.go b/cmd/globals.go index 154b519c9..d24354184 100644 --- a/cmd/globals.go +++ b/cmd/globals.go @@ -21,6 +21,7 @@ import ( "github.com/fatih/color" "github.com/minio/minio/pkg/objcache" + "os" ) // Global constants for Minio. @@ -42,6 +43,10 @@ const ( var ( globalQuiet = false // Quiet flag set via command line globalTrace = false // Trace flag set via environment setting. + + globalDebug = false // Debug flag set to print debug info. + globalDebugLock = false // Lock debug info set via environment variable MINIO_DEBUG=lock . + globalDebugMemory = false // Memory debug info set via environment variable MINIO_DEBUG=mem // Add new global flags here. // Maximum connections handled per @@ -70,3 +75,15 @@ var ( colorBlue = color.New(color.FgBlue).SprintfFunc() colorBold = color.New(color.Bold).SprintFunc() ) + +// fetch from environment variables and set the global values related to locks. +func setGlobalsDebugFromEnv() { + debugEnv := os.Getenv("MINIO_DEBUG") + switch debugEnv { + case "lock": + globalDebugLock = true + case "mem": + globalDebugMemory = true + } + globalDebug = globalDebugLock || globalDebugMemory +} diff --git a/cmd/lock-instrument.go b/cmd/lock-instrument.go new file mode 100644 index 000000000..474a7870d --- /dev/null +++ b/cmd/lock-instrument.go @@ -0,0 +1,283 @@ +/* + * Minio Cloud Storage, (C) 2016 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cmd + +import ( + "fmt" + "time" +) + +const ( + debugRLockStr = "RLock" + debugWLockStr = "WLock" +) + +// struct containing information of status (ready/running/blocked) of an operation with given operation ID. +type debugLockInfo struct { + lockType string // "Rlock" or "WLock". + lockOrigin string // contains the trace of the function which invoked the lock, obtained from runtime. + status string // status can be running/ready/blocked. + since time.Time // time info of the since how long the status holds true. +} + +// debugLockInfo - container for storing locking information for unique copy (volume,path) pair. +// ref variable holds the reference count for locks held for. +// `ref` values helps us understand the n locks held for given pair. +// `running` value helps us understand the total successful locks held (not blocked) for given pair and the operation is under execution. +// `blocked` value helps us understand the total number of operations blocked waiting on locks for given pair. +type debugLockInfoPerVolumePath struct { + ref int64 // running + blocked operations. + running int64 // count of successful lock acquire and running operations. + blocked int64 // count of number of operations blocked waiting on lock. + lockInfo (map[string]debugLockInfo) // map of [operationID] debugLockInfo{operation, status, since} . +} + +// returns an instance of debugLockInfo. +// need to create this for every unique pair of {volume,path}. +// total locks, number of calls blocked on locks, and number of successful locks held but not unlocked yet. +func newDebugLockInfoPerVolumePath() *debugLockInfoPerVolumePath { + return &debugLockInfoPerVolumePath{ + lockInfo: make(map[string]debugLockInfo), + ref: 0, + blocked: 0, + running: 0, + } +} + +// LockInfoNil - Returned if the lock info map is not initialized. +type LockInfoNil struct { +} + +func (l LockInfoNil) Error() string { + return fmt.Sprintf("Debug Lock Map not initialized:\n1. Enable Lock Debugging using right ENV settings \n2. Make sure initNSLock() is called.") +} + +// LockInfoOriginNotFound - While changing the state of the lock info its important that the entry for +// lock at a given origin exists, if not `LockInfoOriginNotFound` is returned. +type LockInfoOriginNotFound struct { + volume string + path string + operationID string + lockOrigin string +} + +func (l LockInfoOriginNotFound) Error() string { + return fmt.Sprintf("No lock state stored for the lock origined at \"%s\", for %s, %s, %s.", + l.lockOrigin, l.volume, l.path, l.operationID) +} + +// LockInfoVolPathMssing - Error interface. Returned when the info the +type LockInfoVolPathMssing struct { + volume string + path string +} + +func (l LockInfoVolPathMssing) Error() string { + return fmt.Sprintf("No entry in debug Lock Map for Volume: %s, path: %s.", l.volume, l.path) +} + +// LockInfoOpsIDNotFound - Returned when the lock state info exists, but the entry for +// given operation ID doesn't exist. +type LockInfoOpsIDNotFound struct { + volume string + path string + operationID string +} + +func (l LockInfoOpsIDNotFound) Error() string { + return fmt.Sprintf("No entry in lock info for %s, %s, %s.", l.operationID, l.volume, l.path) +} + +// LockInfoStateNotBlocked - When an attempt to change the state of the lock form `blocked` to `running` is done, +// its necessary that the state before the transsition is "blocked", otherwise LockInfoStateNotBlocked returned. +type LockInfoStateNotBlocked struct { + volume string + path string + operationID string +} + +func (l LockInfoStateNotBlocked) Error() string { + return fmt.Sprintf("Lock state should be \"Blocked\" for %s, %s, %s.", l.volume, l.path, l.operationID) +} + +// change the state of the lock from Blocked to Running. +func (n *nsLockMap) statusBlockedToRunning(param nsParam, lockOrigin, operationID string, readLock bool) error { + // This operation is not executed under the scope nsLockMap.mutex.Lock(), lock has to be explicitly held here. + n.lockMapMutex.Lock() + defer n.lockMapMutex.Unlock() + if n.debugLockMap == nil { + return LockInfoNil{} + } + // new state info to be set for the lock. + newLockInfo := debugLockInfo{ + lockOrigin: lockOrigin, + status: "Running", + since: time.Now().UTC(), + } + + // set lock type. + if readLock { + newLockInfo.lockType = debugRLockStr + } else { + newLockInfo.lockType = debugWLockStr + } + + // check whether the lock info entry for pair already exists and its not `nil`. + if debugLockMap, ok := n.debugLockMap[param]; ok { + // ``*debugLockInfoPerVolumePath` entry containing lock info for `param ` is `nil`. + if debugLockMap == nil { + return LockInfoNil{} + } + } else { + // The lock state info foe given pair should already exist. + // If not return `LockInfoVolPathMssing`. + return LockInfoVolPathMssing{param.volume, param.path} + } + + // Lock info the for the given operation ID shouldn't be `nil`. + if n.debugLockMap[param].lockInfo == nil { + return LockInfoOpsIDNotFound{param.volume, param.path, operationID} + } + + if lockInfo, ok := n.debugLockMap[param].lockInfo[operationID]; ok { + // The entry for the lock origined at `lockOrigin` should already exist. + // If not return `LockInfoOriginNotFound`. + if lockInfo.lockOrigin != lockOrigin { + return LockInfoOriginNotFound{param.volume, param.path, operationID, lockOrigin} + } + // Status of the lock should already be set to "Blocked". + // If not return `LockInfoStateNotBlocked`. + if lockInfo.status != "Blocked" { + return LockInfoStateNotBlocked{param.volume, param.path, operationID} + } + } else { + // The lock info entry for given `opsID` should already exist for given pair. + // If not return `LockInfoOpsIDNotFound`. + return LockInfoOpsIDNotFound{param.volume, param.path, operationID} + } + + // All checks finished. + // changing the status of the operation from blocked to running and updating the time. + n.debugLockMap[param].lockInfo[operationID] = newLockInfo + + // After locking unblocks decrease the blocked counter. + n.blockedCounter-- + // Increase the running counter. + n.runningLockCounter++ + n.debugLockMap[param].blocked-- + n.debugLockMap[param].running++ + return nil +} + +// change the state of the lock from Ready to Blocked. +func (n *nsLockMap) statusNoneToBlocked(param nsParam, lockOrigin, operationID string, readLock bool) error { + if n.debugLockMap == nil { + return LockInfoNil{} + } + + newLockInfo := debugLockInfo{ + lockOrigin: lockOrigin, + status: "Blocked", + since: time.Now().UTC(), + } + if readLock { + newLockInfo.lockType = debugRLockStr + } else { + newLockInfo.lockType = debugWLockStr + } + + if lockInfo, ok := n.debugLockMap[param]; ok { + if lockInfo == nil { + // *debugLockInfoPerVolumePath entry is nil, initialize here to avoid any case of `nil` pointer access. + n.initLockInfoForVolumePath(param) + } + } else { + // State info entry for the given doesn't exist, initializing it. + n.initLockInfoForVolumePath(param) + } + + // lockInfo is a map[string]debugLockInfo, which holds map[OperationID]{status,time, origin} of the lock. + if n.debugLockMap[param].lockInfo == nil { + n.debugLockMap[param].lockInfo = make(map[string]debugLockInfo) + } + // The status of the operation with the given operation ID is marked blocked till its gets unblocked from the lock. + n.debugLockMap[param].lockInfo[operationID] = newLockInfo + // Increment the Global lock counter. + n.globalLockCounter++ + // Increment the counter for number of blocked opertions, decrement it after the locking unblocks. + n.blockedCounter++ + // increment the reference of the lock for the given pair. + n.debugLockMap[param].ref++ + // increment the blocked counter for the given pair. + n.debugLockMap[param].blocked++ + return nil +} + +// deleteLockInfoEntry - Deletes the lock state information for given pair. Called when nsLk.ref count is 0. +func (n *nsLockMap) deleteLockInfoEntryForVolumePath(param nsParam) error { + if n.debugLockMap == nil { + return LockInfoNil{} + } + // delete the lock info for the given operation. + if _, found := n.debugLockMap[param]; found { + // Remove from the map if there are no more references for the given (volume,path) pair. + delete(n.debugLockMap, param) + } else { + return LockInfoVolPathMssing{param.volume, param.path} + } + return nil +} + +// deleteLockInfoEntry - Deletes the entry for given opsID in the lock state information of given pair. +// called when the nsLk ref count for the given pair is not 0. +func (n *nsLockMap) deleteLockInfoEntryForOps(param nsParam, operationID string) error { + if n.debugLockMap == nil { + return LockInfoNil{} + } + // delete the lock info for the given operation. + if infoMap, found := n.debugLockMap[param]; found { + // the opertion finished holding the lock on the resource, remove the entry for the given operation with the operation ID. + if _, foundInfo := infoMap.lockInfo[operationID]; foundInfo { + // decrease the global running and lock reference counter. + n.runningLockCounter-- + n.globalLockCounter-- + // decrease the lock referene counter for the lock info for given pair. + // decrease the running operation number. Its assumed that the operation is over once an attempt to release the lock is made. + infoMap.running-- + // decrease the total reference count of locks jeld on pair. + infoMap.ref-- + delete(infoMap.lockInfo, operationID) + } else { + // Unlock request with invalid opertion ID not accepted. + return LockInfoOpsIDNotFound{param.volume, param.path, operationID} + } + } else { + return LockInfoVolPathMssing{param.volume, param.path} + } + return nil +} + +// return randomly generated string ID if lock debug is enabled, +// else returns empty string +func getOpsID() (opsID string) { + // check if lock debug is enabled. + if globalDebugLock { + // generated random ID. + opsID = string(generateRequestID()) + } + return opsID +} diff --git a/cmd/lock-instrument_test.go b/cmd/lock-instrument_test.go new file mode 100644 index 000000000..d762ddfb2 --- /dev/null +++ b/cmd/lock-instrument_test.go @@ -0,0 +1,744 @@ +/* + * Minio Cloud Storage, (C) 2016 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cmd + +import ( + "testing" + "time" +) + +type lockStateCase struct { + volume string + path string + lockOrigin string + opsID string + readLock bool // lock type. + setBlocked bool // initialize the initial state to blocked. + expectedErr error + // expected global lock stats. + expectedLockStatus string // Status of the lock Blocked/Running. + + expectedGlobalLockCount int // Total number of locks held across the system, includes blocked + held locks. + expectedBlockedLockCount int // Total blocked lock across the system. + expectedRunningLockCount int // Total succesfully held locks (non-blocking). + // expected lock statu for given pair. + expectedVolPathLockCount int // Total locks held for given pair, includes blocked locks. + expectedVolPathRunningCount int // Total succcesfully held locks for given pair. + expectedVolPathBlockCount int // Total locks blocked on the given pair. +} + +// Used for validating the Lock info obtaining from contol RPC end point for obtaining lock related info. +func verifyRPCLockInfoResponse(l lockStateCase, rpcLockInfoResponse SystemLockState, t TestErrHandler, testNum int) { + // Assert the total number of locks (locked + acquired) in the system. + if rpcLockInfoResponse.TotalLocks != int64(l.expectedGlobalLockCount) { + t.Fatalf("Test %d: Expected the global lock counter to be %v, but got %v", testNum, int64(l.expectedGlobalLockCount), + rpcLockInfoResponse.TotalLocks) + } + + // verify the count for total blocked locks. + if rpcLockInfoResponse.TotalBlockedLocks != int64(l.expectedBlockedLockCount) { + t.Fatalf("Test %d: Expected the total blocked lock counter to be %v, but got %v", testNum, int64(l.expectedBlockedLockCount), + rpcLockInfoResponse.TotalBlockedLocks) + } + + // verify the count for total running locks. + if rpcLockInfoResponse.TotalAcquiredLocks != int64(l.expectedRunningLockCount) { + t.Fatalf("Test %d: Expected the total running lock counter to be %v, but got %v", testNum, int64(l.expectedRunningLockCount), + rpcLockInfoResponse.TotalAcquiredLocks) + } + + for _, locksInfoPerObject := range rpcLockInfoResponse.LocksInfoPerObject { + // See whether the entry for the exists in the RPC response. + if locksInfoPerObject.Bucket == l.volume && locksInfoPerObject.Object == l.path { + // Assert the total number of locks (blocked + acquired) for the given pair. + if locksInfoPerObject.LocksOnObject != int64(l.expectedVolPathLockCount) { + t.Errorf("Test %d: Expected the total lock count for bucket: \"%s\", object: \"%s\" to be %v, but got %v", testNum, + l.volume, l.path, int64(l.expectedVolPathLockCount), locksInfoPerObject.LocksOnObject) + } + // Assert the total number of acquired locks for the given pair. + if locksInfoPerObject.LocksAcquiredOnObject != int64(l.expectedVolPathRunningCount) { + t.Errorf("Test %d: Expected the acquired lock count for bucket: \"%s\", object: \"%s\" to be %v, but got %v", testNum, + l.volume, l.path, int64(l.expectedVolPathRunningCount), locksInfoPerObject.LocksAcquiredOnObject) + } + // Assert the total number of blocked locks for the given pair. + if locksInfoPerObject.TotalBlockedLocks != int64(l.expectedVolPathBlockCount) { + t.Errorf("Test %d: Expected the blocked lock count for bucket: \"%s\", object: \"%s\" to be %v, but got %v", testNum, + l.volume, l.path, int64(l.expectedVolPathBlockCount), locksInfoPerObject.TotalBlockedLocks) + } + // Flag to mark whether there's an entry in the RPC lock info response for given opsID. + var opsIDfound bool + for _, opsLockState := range locksInfoPerObject.LockDetailsOnObject { + // first check whether the entry for the given operation ID exists. + if opsLockState.OperationID == l.opsID { + opsIDfound = true + // asserting the type of lock (RLock/WLock) from the RPC lock info response. + if l.readLock { + if opsLockState.LockType != debugRLockStr { + t.Errorf("Test case %d: Expected the lock type to be \"%s\"", testNum, debugRLockStr) + } + } else { + if opsLockState.LockType != debugWLockStr { + t.Errorf("Test case %d: Expected the lock type to be \"%s\"", testNum, debugWLockStr) + } + } + + if opsLockState.Status != l.expectedLockStatus { + t.Errorf("Test case %d: Expected the status of the operation to be \"%s\", got \"%s\"", testNum, l.expectedLockStatus, opsLockState.Status) + } + + // if opsLockState.LockOrigin != l.lockOrigin { + // t.Fatalf("Test case %d: Expected the origin of the lock to be \"%s\", got \"%s\"", testNum, opsLockState.LockOrigin, l.lockOrigin) + // } + // all check satisfied, return here. + // Any mismatch in the earlier checks would have ended the tests due to `Fatalf`, + // control reaching here implies that all checks are satisfied. + return + } + } + // opsID not found. + // No entry for an operation with given operation ID exists. + if !opsIDfound { + t.Fatalf("Test case %d: Entry for OpsId: \"%s\" not found in : \"%s\", : \"%s\" doesn't exist in the RPC response", testNum, l.opsID, l.volume, l.path) + } + } + } + // No entry exists for given pair in the RPC response. + t.Errorf("Test case %d: Entry for : \"%s\", : \"%s\" doesn't exist in the RPC response", testNum, l.volume, l.path) +} + +// Asserts the lock counter from the global nsMutex inmemory lock with the expected one. +func verifyGlobalLockStats(l lockStateCase, t *testing.T, testNum int) { + nsMutex.lockMapMutex.Lock() + + // Verifying the lock stats. + if nsMutex.globalLockCounter != int64(l.expectedGlobalLockCount) { + t.Errorf("Test %d: Expected the global lock counter to be %v, but got %v", testNum, int64(l.expectedGlobalLockCount), + nsMutex.globalLockCounter) + } + // verify the count for total blocked locks. + if nsMutex.blockedCounter != int64(l.expectedBlockedLockCount) { + t.Errorf("Test %d: Expected the total blocked lock counter to be %v, but got %v", testNum, int64(l.expectedBlockedLockCount), + nsMutex.blockedCounter) + } + // verify the count for total running locks. + if nsMutex.runningLockCounter != int64(l.expectedRunningLockCount) { + t.Errorf("Test %d: Expected the total running lock counter to be %v, but got %v", testNum, int64(l.expectedRunningLockCount), + nsMutex.runningLockCounter) + } + nsMutex.lockMapMutex.Unlock() + // Verifying again with the JSON response of the lock info. + // Verifying the lock stats. + sysLockState, err := generateSystemLockResponse() + if err != nil { + t.Fatalf("Obtaining lock info failed with %s", err) + + } + if sysLockState.TotalLocks != int64(l.expectedGlobalLockCount) { + t.Errorf("Test %d: Expected the global lock counter to be %v, but got %v", testNum, int64(l.expectedGlobalLockCount), + sysLockState.TotalLocks) + } + // verify the count for total blocked locks. + if sysLockState.TotalBlockedLocks != int64(l.expectedBlockedLockCount) { + t.Errorf("Test %d: Expected the total blocked lock counter to be %v, but got %v", testNum, int64(l.expectedBlockedLockCount), + sysLockState.TotalBlockedLocks) + } + // verify the count for total running locks. + if sysLockState.TotalAcquiredLocks != int64(l.expectedRunningLockCount) { + t.Errorf("Test %d: Expected the total running lock counter to be %v, but got %v", testNum, int64(l.expectedRunningLockCount), + sysLockState.TotalAcquiredLocks) + } +} + +// Verify the lock counter for entries of given pair. +func verifyLockStats(l lockStateCase, t *testing.T, testNum int) { + nsMutex.lockMapMutex.Lock() + defer nsMutex.lockMapMutex.Unlock() + param := nsParam{l.volume, l.path} + + // Verify the total locks (blocked+running) for given pair. + if nsMutex.debugLockMap[param].ref != int64(l.expectedVolPathLockCount) { + t.Errorf("Test %d: Expected the total lock count for volume: \"%s\", path: \"%s\" to be %v, but got %v", testNum, + param.volume, param.path, int64(l.expectedVolPathLockCount), nsMutex.debugLockMap[param].ref) + } + // Verify the total running locks for given pair. + if nsMutex.debugLockMap[param].running != int64(l.expectedVolPathRunningCount) { + t.Errorf("Test %d: Expected the total running locks for volume: \"%s\", path: \"%s\" to be %v, but got %v", testNum, param.volume, param.path, + int64(l.expectedVolPathRunningCount), nsMutex.debugLockMap[param].running) + } + // Verify the total blocked locks for givne pair. + if nsMutex.debugLockMap[param].blocked != int64(l.expectedVolPathBlockCount) { + t.Errorf("Test %d: Expected the total blocked locks for volume: \"%s\", path: \"%s\" to be %v, but got %v", testNum, param.volume, param.path, + int64(l.expectedVolPathBlockCount), nsMutex.debugLockMap[param].blocked) + } +} + +// verifyLockState - function which asserts the expected lock info in the system with the actual values in the nsMutex. +func verifyLockState(l lockStateCase, t *testing.T, testNum int) { + param := nsParam{l.volume, l.path} + + verifyGlobalLockStats(l, t, testNum) + nsMutex.lockMapMutex.Lock() + // Verifying the lock statuS fields. + if debugLockMap, ok := nsMutex.debugLockMap[param]; ok { + if lockInfo, ok := debugLockMap.lockInfo[l.opsID]; ok { + // Validating the lock type filed in the debug lock information. + if l.readLock { + if lockInfo.lockType != debugRLockStr { + t.Errorf("Test case %d: Expected the lock type in the lock debug info to be \"%s\"", testNum, debugRLockStr) + } + } else { + if lockInfo.lockType != debugWLockStr { + t.Errorf("Test case %d: Expected the lock type in the lock debug info to be \"%s\"", testNum, debugWLockStr) + } + } + + // // validating the lock origin. + // if l.lockOrigin != lockInfo.lockOrigin { + // t.Fatalf("Test %d: Expected the lock origin info to be \"%s\", but got \"%s\"", testNum, l.lockOrigin, lockInfo.lockOrigin) + // } + // validating the status of the lock. + if lockInfo.status != l.expectedLockStatus { + t.Errorf("Test %d: Expected the status of the lock to be \"%s\", but got \"%s\"", testNum, l.expectedLockStatus, lockInfo.status) + } + } else { + // Stop the tests if lock debug entry for given pair is not found. + t.Errorf("Test case %d: Expected an debug lock entry for opsID \"%s\"", testNum, l.opsID) + } + } else { + // To change the status the entry for given should exist in the lock info struct. + t.Errorf("Test case %d: Debug lock entry for volume: %s, path: %s doesn't exist", testNum, param.volume, param.path) + } + // verifyLockStats holds its own lock. + nsMutex.lockMapMutex.Unlock() + + // verify the lock count. + verifyLockStats(l, t, testNum) +} + +// TestNewDebugLockInfoPerVolumePath - Validates the values initialized by newDebugLockInfoPerVolumePath(). +func TestNewDebugLockInfoPerVolumePath(t *testing.T) { + lockInfo := newDebugLockInfoPerVolumePath() + + if lockInfo.ref != 0 { + t.Errorf("Expected initial reference value of total locks to be 0, got %d", lockInfo.ref) + } + if lockInfo.blocked != 0 { + t.Errorf("Expected initial reference of blocked locks to be 0, got %d", lockInfo.blocked) + } + if lockInfo.running != 0 { + t.Errorf("Expected initial reference value of held locks to be 0, got %d", lockInfo.running) + } +} + +// TestNsLockMapStatusBlockedToRunning - Validates the function for changing the lock state from blocked to running. +func TestNsLockMapStatusBlockedToRunning(t *testing.T) { + + testCases := []struct { + volume string + path string + lockOrigin string + opsID string + readLock bool // lock type. + setBlocked bool // initialize the initial state to blocked. + expectedErr error + }{ + // Test case - 1. + { + + volume: "my-bucket", + path: "my-object", + lockOrigin: "/home/vadmeste/work/go/src/github.com/minio/minio/xl-v1-object.go:683 +0x2a", + opsID: "abcd1234", + readLock: true, + setBlocked: true, + // expected metrics. + expectedErr: nil, + }, + // Test case - 2. + // No entry for pair. + // So an attempt to change the state of the lock from `Blocked`->`Running` should fail. + { + + volume: "my-bucket", + path: "my-object-2", + lockOrigin: "/home/vadmeste/work/go/src/github.com/minio/minio/xl-v1-object.go:683 +0x2a", + opsID: "abcd1234", + readLock: false, + setBlocked: false, + // expected metrics. + expectedErr: LockInfoVolPathMssing{"my-bucket", "my-object-2"}, + }, + // Test case - 3. + // Entry for the given operationID doesn't exist in the lock state info. + { + volume: "my-bucket", + path: "my-object", + lockOrigin: "/home/vadmeste/work/go/src/github.com/minio/minio/xl-v1-object.go:683 +0x2a", + opsID: "ops-Id-not-registered", + readLock: true, + setBlocked: false, + // expected metrics. + expectedErr: LockInfoOpsIDNotFound{"my-bucket", "my-object", "ops-Id-not-registered"}, + }, + // Test case - 4. + // Test case with non-existent lock origin. + { + volume: "my-bucket", + path: "my-object", + lockOrigin: "Bad Origin", + opsID: "abcd1234", + readLock: true, + setBlocked: false, + // expected metrics. + expectedErr: LockInfoOriginNotFound{"my-bucket", "my-object", "abcd1234", "Bad Origin"}, + }, + // Test case - 5. + // Test case with write lock. + { + + volume: "my-bucket", + path: "my-object", + lockOrigin: "/home/vadmeste/work/go/src/github.com/minio/minio/xl-v1-object.go:683 +0x2a", + opsID: "abcd1234", + readLock: false, + setBlocked: true, + // expected metrics. + expectedErr: nil, + }, + } + + param := nsParam{testCases[0].volume, testCases[0].path} + // Testing before the initialization done. + // Since the data structures for + actualErr := nsMutex.statusBlockedToRunning(param, testCases[0].lockOrigin, + testCases[0].opsID, testCases[0].readLock) + + expectedNilErr := LockInfoNil{} + if actualErr != expectedNilErr { + t.Fatalf("Errors mismatch: Expected \"%s\", got \"%s\"", expectedNilErr, actualErr) + } + + nsMutex = &nsLockMap{ + // entries of -> stateInfo of locks, for instrumentation purpose. + debugLockMap: make(map[nsParam]*debugLockInfoPerVolumePath), + lockMap: make(map[nsParam]*nsLock), + } + // Entry for pair is set to nil. + // Should fail with `LockInfoNil{}`. + nsMutex.debugLockMap[param] = nil + actualErr = nsMutex.statusBlockedToRunning(param, testCases[0].lockOrigin, + testCases[0].opsID, testCases[0].readLock) + + expectedNilErr = LockInfoNil{} + if actualErr != expectedNilErr { + t.Fatalf("Errors mismatch: Expected \"%s\", got \"%s\"", expectedNilErr, actualErr) + } + + // Setting the lock info the be `nil`. + nsMutex.debugLockMap[param] = &debugLockInfoPerVolumePath{ + lockInfo: nil, // setting the lockinfo to nil. + ref: 0, + blocked: 0, + running: 0, + } + + actualErr = nsMutex.statusBlockedToRunning(param, testCases[0].lockOrigin, + testCases[0].opsID, testCases[0].readLock) + + expectedOpsErr := LockInfoOpsIDNotFound{testCases[0].volume, testCases[0].path, testCases[0].opsID} + if actualErr != expectedOpsErr { + t.Fatalf("Errors mismatch: Expected \"%s\", got \"%s\"", expectedOpsErr, actualErr) + } + + // Next case: ase whether an attempt to change the state of the lock to "Running" done, + // but the initial state if already "Running". Such an attempt should fail + nsMutex.debugLockMap[param] = &debugLockInfoPerVolumePath{ + lockInfo: make(map[string]debugLockInfo), + ref: 0, + blocked: 0, + running: 0, + } + + // Setting the status of the lock to be "Running". + // The initial state of the lock should set to "Blocked", otherwise its not possible to change the state from "Blocked" -> "Running". + nsMutex.debugLockMap[param].lockInfo[testCases[0].opsID] = debugLockInfo{ + lockOrigin: "/home/vadmeste/work/go/src/github.com/minio/minio/xl-v1-object.go:683 +0x2a", + status: "Running", // State set to "Running". Should fail with `LockInfoStateNotBlocked`. + since: time.Now().UTC(), + } + + actualErr = nsMutex.statusBlockedToRunning(param, testCases[0].lockOrigin, + testCases[0].opsID, testCases[0].readLock) + + expectedBlockErr := LockInfoStateNotBlocked{testCases[0].volume, testCases[0].path, testCases[0].opsID} + if actualErr != expectedBlockErr { + t.Fatalf("Errors mismatch: Expected: \"%s\", got: \"%s\"", expectedBlockErr, actualErr) + } + + // enabling lock instrumentation. + globalDebugLock = true + // initializing the locks. + initNSLock(false) + // set debug lock info to `nil` so that the next tests have to initialize them again. + defer func() { + globalDebugLock = false + nsMutex.debugLockMap = nil + }() + // Iterate over the cases and assert the result. + for i, testCase := range testCases { + param := nsParam{testCase.volume, testCase.path} + // status of the lock to be set to "Blocked", before setting Blocked->Running. + if testCase.setBlocked { + nsMutex.lockMapMutex.Lock() + err := nsMutex.statusNoneToBlocked(param, testCase.lockOrigin, testCase.opsID, testCase.readLock) + if err != nil { + t.Fatalf("Test %d: Initializing the initial state to Blocked failed %s", i+1, err) + } + nsMutex.lockMapMutex.Unlock() + } + // invoking the method under test. + actualErr = nsMutex.statusBlockedToRunning(param, testCase.lockOrigin, testCase.opsID, testCase.readLock) + if actualErr != testCase.expectedErr { + t.Fatalf("Test %d: Errors mismatch: Expected: \"%s\", got: \"%s\"", i+1, testCase.expectedErr, actualErr) + } + // In case of no error proceed with validating the lock state information. + if actualErr == nil { + // debug entry for given pair should exist. + if debugLockMap, ok := nsMutex.debugLockMap[param]; ok { + if lockInfo, ok := debugLockMap.lockInfo[testCase.opsID]; ok { + // Validating the lock type filed in the debug lock information. + if testCase.readLock { + if lockInfo.lockType != debugRLockStr { + t.Errorf("Test case %d: Expected the lock type in the lock debug info to be \"%s\"", i+1, debugRLockStr) + } + } else { + if lockInfo.lockType != debugWLockStr { + t.Errorf("Test case %d: Expected the lock type in the lock debug info to be \"%s\"", i+1, debugWLockStr) + } + } + + // validating the lock origin. + if testCase.lockOrigin != lockInfo.lockOrigin { + t.Errorf("Test %d: Expected the lock origin info to be \"%s\", but got \"%s\"", i+1, testCase.lockOrigin, lockInfo.lockOrigin) + } + // validating the status of the lock. + if lockInfo.status != "Running" { + t.Errorf("Test %d: Expected the status of the lock to be \"%s\", but got \"%s\"", i+1, "Running", lockInfo.status) + } + } else { + // Stop the tests if lock debug entry for given pair is not found. + t.Fatalf("Test case %d: Expected an debug lock entry for opsID \"%s\"", i+1, testCase.opsID) + } + } else { + // To change the status the entry for given should exist in the lock info struct. + t.Fatalf("Test case %d: Debug lock entry for volume: %s, path: %s doesn't exist", i+1, param.volume, param.path) + } + } + } + +} + +// TestNsLockMapStatusNoneToBlocked - Validates the function for changing the lock state to blocked +func TestNsLockMapStatusNoneToBlocked(t *testing.T) { + + testCases := []lockStateCase{ + // Test case - 1. + { + + volume: "my-bucket", + path: "my-object", + lockOrigin: "/home/vadmeste/work/go/src/github.com/minio/minio/xl-v1-object.go:683 +0x2a", + opsID: "abcd1234", + readLock: true, + // expected metrics. + expectedErr: nil, + expectedLockStatus: "Blocked", + + expectedGlobalLockCount: 1, + expectedRunningLockCount: 0, + expectedBlockedLockCount: 1, + + expectedVolPathLockCount: 1, + expectedVolPathRunningCount: 0, + expectedVolPathBlockCount: 1, + }, + // Test case - 2. + // No entry for pair. + // So an attempt to change the state of the lock from `Blocked`->`Running` should fail. + { + + volume: "my-bucket", + path: "my-object-2", + lockOrigin: "/home/vadmeste/work/go/src/github.com/minio/minio/xl-v1-object.go:683 +0x2a", + opsID: "abcd1234", + readLock: false, + // expected metrics. + expectedErr: nil, + expectedLockStatus: "Blocked", + + expectedGlobalLockCount: 2, + expectedRunningLockCount: 0, + expectedBlockedLockCount: 2, + + expectedVolPathLockCount: 1, + expectedVolPathRunningCount: 0, + expectedVolPathBlockCount: 1, + }, + // Test case - 3. + // Entry for the given operationID doesn't exist in the lock state info. + // The entry should be created and relevant counters should be set. + { + volume: "my-bucket", + path: "my-object", + lockOrigin: "/home/vadmeste/work/go/src/github.com/minio/minio/xl-v1-object.go:683 +0x2a", + opsID: "ops-Id-not-registered", + readLock: true, + // expected metrics. + expectedErr: nil, + expectedLockStatus: "Blocked", + + expectedGlobalLockCount: 3, + expectedRunningLockCount: 0, + expectedBlockedLockCount: 3, + + expectedVolPathLockCount: 2, + expectedVolPathRunningCount: 0, + expectedVolPathBlockCount: 2, + }, + } + + param := nsParam{testCases[0].volume, testCases[0].path} + // Testing before the initialization done. + // Since the data structures for + actualErr := nsMutex.statusBlockedToRunning(param, testCases[0].lockOrigin, + testCases[0].opsID, testCases[0].readLock) + + expectedNilErr := LockInfoNil{} + if actualErr != expectedNilErr { + t.Fatalf("Errors mismatch: Expected \"%s\", got \"%s\"", expectedNilErr, actualErr) + } + // enabling lock instrumentation. + globalDebugLock = true + // initializing the locks. + initNSLock(false) + // set debug lock info to `nil` so that the next tests have to initialize them again. + defer func() { + globalDebugLock = false + nsMutex.debugLockMap = nil + }() + // Iterate over the cases and assert the result. + for i, testCase := range testCases { + nsMutex.lockMapMutex.Lock() + param := nsParam{testCase.volume, testCase.path} + actualErr := nsMutex.statusNoneToBlocked(param, testCase.lockOrigin, testCase.opsID, testCase.readLock) + if actualErr != testCase.expectedErr { + t.Fatalf("Test %d: Errors mismatch: Expected: \"%s\", got: \"%s\"", i+1, testCase.expectedErr, actualErr) + } + nsMutex.lockMapMutex.Unlock() + if actualErr == nil { + verifyLockState(testCase, t, i+1) + } + } +} + +// TestNsLockMapDeleteLockInfoEntryForOps - Validates the removal of entry for given Operational ID from the lock info. +func TestNsLockMapDeleteLockInfoEntryForOps(t *testing.T) { + testCases := []lockStateCase{ + // Test case - 1. + { + volume: "my-bucket", + path: "my-object", + lockOrigin: "/home/vadmeste/work/go/src/github.com/minio/minio/xl-v1-object.go:683 +0x2a", + opsID: "abcd1234", + readLock: true, + // expected metrics. + }, + } + // case - 1. + // Testing the case where delete lock info is attempted even before the lock is initialized. + param := nsParam{testCases[0].volume, testCases[0].path} + // Testing before the initialization done. + + actualErr := nsMutex.deleteLockInfoEntryForOps(param, testCases[0].opsID) + + expectedNilErr := LockInfoNil{} + if actualErr != expectedNilErr { + t.Fatalf("Errors mismatch: Expected \"%s\", got \"%s\"", expectedNilErr, actualErr) + } + + // enabling lock instrumentation. + globalDebugLock = true + // initializing the locks. + initNSLock(false) + // set debug lock info to `nil` so that the next tests have to initialize them again. + defer func() { + globalDebugLock = false + nsMutex.debugLockMap = nil + }() + // case - 2. + // Case where an attempt to delete the entry for non-existent pair is done. + // Set the status of the lock to blocked and then to running. + nonExistParam := nsParam{volume: "non-exist-volume", path: "non-exist-path"} + actualErr = nsMutex.deleteLockInfoEntryForOps(nonExistParam, testCases[0].opsID) + + expectedVolPathErr := LockInfoVolPathMssing{nonExistParam.volume, nonExistParam.path} + if actualErr != expectedVolPathErr { + t.Fatalf("Errors mismatch: Expected \"%s\", got \"%s\"", expectedVolPathErr, actualErr) + } + + // Case - 3. + // Lock state is set to Running and then an attempt to delete the info for non-existant opsID done. + nsMutex.lockMapMutex.Lock() + err := nsMutex.statusNoneToBlocked(param, testCases[0].lockOrigin, testCases[0].opsID, testCases[0].readLock) + if err != nil { + t.Fatalf("Setting lock status to Blocked failed: %s", err) + } + nsMutex.lockMapMutex.Unlock() + err = nsMutex.statusBlockedToRunning(param, testCases[0].lockOrigin, testCases[0].opsID, testCases[0].readLock) + if err != nil { + t.Fatalf("Setting lock status to Running failed: %s", err) + } + actualErr = nsMutex.deleteLockInfoEntryForOps(param, "non-existant-OpsID") + + expectedOpsIDErr := LockInfoOpsIDNotFound{param.volume, param.path, "non-existant-OpsID"} + if actualErr != expectedOpsIDErr { + t.Fatalf("Errors mismatch: Expected \"%s\", got \"%s\"", expectedOpsIDErr, actualErr) + } + // case - 4. + // Attempt to delete an registered entry is done. + // All metrics should be 0 after deleting the entry. + + // Verify that the entry the opsID exists. + if debugLockMap, ok := nsMutex.debugLockMap[param]; ok { + if _, ok := debugLockMap.lockInfo[testCases[0].opsID]; !ok { + t.Fatalf("Entry for OpsID \"%s\" in %s, %s should have existed. ", testCases[0].opsID, param.volume, param.path) + } + } else { + t.Fatalf("Entry for %s, %s should have existed. ", param.volume, param.path) + } + + actualErr = nsMutex.deleteLockInfoEntryForOps(param, testCases[0].opsID) + if actualErr != nil { + t.Fatalf("Expected the error to be , but got %s", actualErr) + } + + // Verify that the entry for the opsId doesn't exists. + if debugLockMap, ok := nsMutex.debugLockMap[param]; ok { + if _, ok := debugLockMap.lockInfo[testCases[0].opsID]; ok { + t.Fatalf("The entry for opsID \"%s\" should have been deleted", testCases[0].opsID) + } + } else { + t.Fatalf("Entry for %s, %s should have existed. ", param.volume, param.path) + } + if nsMutex.runningLockCounter != int64(0) { + t.Errorf("Expected the count of total running locks to be %v, but got %v", int64(0), nsMutex.runningLockCounter) + } + if nsMutex.blockedCounter != int64(0) { + t.Errorf("Expected the count of total blocked locks to be %v, but got %v", int64(0), nsMutex.blockedCounter) + } + if nsMutex.globalLockCounter != int64(0) { + t.Errorf("Expected the count of all locks to be %v, but got %v", int64(0), nsMutex.globalLockCounter) + } +} + +// TestNsLockMapDeleteLockInfoEntryForVolumePath - Tests validate the logic for removal +// of entry for given pair from lock info. +func TestNsLockMapDeleteLockInfoEntryForVolumePath(t *testing.T) { + testCases := []lockStateCase{ + // Test case - 1. + { + volume: "my-bucket", + path: "my-object", + lockOrigin: "/home/vadmeste/work/go/src/github.com/minio/minio/xl-v1-object.go:683 +0x2a", + opsID: "abcd1234", + readLock: true, + // expected metrics. + }, + } + // case - 1. + // Testing the case where delete lock info is attempted even before the lock is initialized. + param := nsParam{testCases[0].volume, testCases[0].path} + // Testing before the initialization done. + + actualErr := nsMutex.deleteLockInfoEntryForVolumePath(param) + + expectedNilErr := LockInfoNil{} + if actualErr != expectedNilErr { + t.Fatalf("Errors mismatch: Expected \"%s\", got \"%s\"", expectedNilErr, actualErr) + } + + // enabling lock instrumentation. + globalDebugLock = true + // initializing the locks. + initNSLock(false) + // set debug lock info to `nil` so that the next tests have to initialize them again. + defer func() { + globalDebugLock = false + nsMutex.debugLockMap = nil + }() + // case - 2. + // Case where an attempt to delete the entry for non-existent pair is done. + // Set the status of the lock to blocked and then to running. + nonExistParam := nsParam{volume: "non-exist-volume", path: "non-exist-path"} + actualErr = nsMutex.deleteLockInfoEntryForVolumePath(nonExistParam) + + expectedVolPathErr := LockInfoVolPathMssing{nonExistParam.volume, nonExistParam.path} + if actualErr != expectedVolPathErr { + t.Fatalf("Errors mismatch: Expected \"%s\", got \"%s\"", expectedVolPathErr, actualErr) + } + + // case - 3. + // Attempt to delete an registered entry is done. + // All metrics should be 0 after deleting the entry. + + // Registering the entry first. + nsMutex.lockMapMutex.Lock() + err := nsMutex.statusNoneToBlocked(param, testCases[0].lockOrigin, testCases[0].opsID, testCases[0].readLock) + if err != nil { + t.Fatalf("Setting lock status to Blocked failed: %s", err) + } + nsMutex.lockMapMutex.Unlock() + err = nsMutex.statusBlockedToRunning(param, testCases[0].lockOrigin, testCases[0].opsID, testCases[0].readLock) + if err != nil { + t.Fatalf("Setting lock status to Running failed: %s", err) + } + // Verify that the entry the for given exists. + if _, ok := nsMutex.debugLockMap[param]; !ok { + t.Fatalf("Entry for %s, %s should have existed.", param.volume, param.path) + } + // first delete the entry for the operation ID. + err = nsMutex.deleteLockInfoEntryForOps(param, testCases[0].opsID) + actualErr = nsMutex.deleteLockInfoEntryForVolumePath(param) + if actualErr != nil { + t.Fatalf("Expected the error to be , but got %s", actualErr) + } + + // Verify that the entry for the opsId doesn't exists. + if _, ok := nsMutex.debugLockMap[param]; ok { + t.Fatalf("Entry for %s, %s should have been deleted. ", param.volume, param.path) + } + // The lock count values should be 0. + if nsMutex.runningLockCounter != int64(0) { + t.Errorf("Expected the count of total running locks to be %v, but got %v", int64(0), nsMutex.runningLockCounter) + } + if nsMutex.blockedCounter != int64(0) { + t.Errorf("Expected the count of total blocked locks to be %v, but got %v", int64(0), nsMutex.blockedCounter) + } + if nsMutex.globalLockCounter != int64(0) { + t.Errorf("Expected the count of all locks to be %v, but got %v", int64(0), nsMutex.globalLockCounter) + } +} diff --git a/cmd/main.go b/cmd/main.go index 578aefd70..b75440244 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -62,6 +62,9 @@ func init() { // Set global trace flag. globalTrace = os.Getenv("MINIO_TRACE") == "1" + + // Set all the debug flags from ENV if any. + setGlobalsDebugFromEnv() } func migrate() { diff --git a/cmd/namespace-lock.go b/cmd/namespace-lock.go index 3d77c1024..3eb4ce059 100644 --- a/cmd/namespace-lock.go +++ b/cmd/namespace-lock.go @@ -18,7 +18,9 @@ package cmd import ( "errors" + "fmt" pathutil "path" + "runtime" "strconv" "strings" "sync" @@ -58,6 +60,15 @@ func initNSLock(isDist bool) { isDist: isDist, lockMap: make(map[nsParam]*nsLock), } + if globalDebugLock { + // lock Debugging enabed, initialize nsLockMap with entry for debugging information. + // entries of -> stateInfo of locks, for instrumentation purpose. + nsMutex.debugLockMap = make(map[nsParam]*debugLockInfoPerVolumePath) + } +} + +func (n *nsLockMap) initLockInfoForVolumePath(param nsParam) { + n.debugLockMap[param] = newDebugLockInfoPerVolumePath() } // RWLocker - interface that any read-write locking library should implement. @@ -83,13 +94,19 @@ type nsLock struct { // nsLockMap - namespace lock map, provides primitives to Lock, // Unlock, RLock and RUnlock. type nsLockMap struct { + // lock counter used for lock debugging. + globalLockCounter int64 //total locks held. + blockedCounter int64 // total operations blocked waiting for locks. + runningLockCounter int64 // total locks held but not released yet. + debugLockMap map[nsParam]*debugLockInfoPerVolumePath // info for instrumentation on locks. + isDist bool // indicates whether the locking service is part of a distributed setup or not. lockMap map[nsParam]*nsLock lockMapMutex sync.Mutex } // Lock the namespace resource. -func (n *nsLockMap) lock(volume, path string, readLock bool) { +func (n *nsLockMap) lock(volume, path string, lockOrigin, opsID string, readLock bool) { var nsLk *nsLock n.lockMapMutex.Lock() @@ -112,6 +129,15 @@ func (n *nsLockMap) lock(volume, path string, readLock bool) { if readLock && n.isDist { rwlock = dsync.NewDRWMutex(pathutil.Join(volume, path)) } + + if globalDebugLock { + // change the state of the lock to be blocked for the given pair of and till the lock unblocks. + // The lock for accessing `nsMutex` is held inside the function itself. + err := n.statusNoneToBlocked(param, lockOrigin, opsID, readLock) + if err != nil { + errorIf(err, "Failed to set lock state to blocked.") + } + } // Unlock map before Locking NS which might block. n.lockMapMutex.Unlock() @@ -133,10 +159,20 @@ func (n *nsLockMap) lock(volume, path string, readLock bool) { } else { rwlock.Lock() } + + // check if lock debugging enabled. + if globalDebugLock { + // Changing the status of the operation from blocked to running. + // change the state of the lock to be running (from blocked) for the given pair of and . + err := n.statusBlockedToRunning(param, lockOrigin, opsID, readLock) + if err != nil { + errorIf(err, "Failed to set the lock state to running.") + } + } } // Unlock the namespace resource. -func (n *nsLockMap) unlock(volume, path string, readLock bool) { +func (n *nsLockMap) unlock(volume, path, opsID string, readLock bool) { // nsLk.Unlock() will not block, hence locking the map for the entire function is fine. n.lockMapMutex.Lock() defer n.lockMapMutex.Unlock() @@ -163,6 +199,13 @@ func (n *nsLockMap) unlock(volume, path string, readLock bool) { } if nsLk.ref != 0 { nsLk.ref-- + // locking debug enabled, delete the lock state entry for given operation ID. + if globalDebugLock { + err := n.deleteLockInfoEntryForOps(param, opsID) + if err != nil { + errorIf(err, "Failed to delete lock info entry.") + } + } } if nsLk.ref == 0 { if len(nsLk.readerArray) != 0 && n.isDist { @@ -171,31 +214,61 @@ func (n *nsLockMap) unlock(volume, path string, readLock bool) { // Remove from the map if there are no more references. delete(n.lockMap, param) + + // locking debug enabled, delete the lock state entry for given pair. + if globalDebugLock { + err := n.deleteLockInfoEntryForVolumePath(param) + if err != nil { + errorIf(err, "Failed to delete lock info entry.") + } + } } } } // Lock - locks the given resource for writes, using a previously // allocated name space lock or initializing a new one. -func (n *nsLockMap) Lock(volume, path string) { +func (n *nsLockMap) Lock(volume, path, opsID string) { + var lockOrigin string + // lock debugging enabled. The caller information of the lock held has be obtained here before calling any other function. + if globalDebugLock { + // fetching the package, function name and the line number of the caller from the runtime. + // here is an example https://play.golang.org/p/perrmNRI9_ . + pc, fn, line, success := runtime.Caller(1) + if !success { + errorIf(errors.New("Couldn't get caller info."), "Fetching caller info form runtime failed.") + } + lockOrigin = fmt.Sprintf("[lock held] in %s[%s:%d]", runtime.FuncForPC(pc).Name(), fn, line) + } readLock := false - n.lock(volume, path, readLock) + n.lock(volume, path, lockOrigin, opsID, readLock) } // Unlock - unlocks any previously acquired write locks. -func (n *nsLockMap) Unlock(volume, path string) { +func (n *nsLockMap) Unlock(volume, path, opsID string) { readLock := false - n.unlock(volume, path, readLock) + n.unlock(volume, path, opsID, readLock) } // RLock - locks any previously acquired read locks. -func (n *nsLockMap) RLock(volume, path string) { +func (n *nsLockMap) RLock(volume, path, opsID string) { + var lockOrigin string readLock := true - n.lock(volume, path, readLock) + // lock debugging enabled. The caller information of the lock held has be obtained here before calling any other function. + if globalDebugLock { + // fetching the package, function name and the line number of the caller from the runtime. + // here is an example https://play.golang.org/p/perrmNRI9_ . + pc, fn, line, success := runtime.Caller(1) + if !success { + errorIf(errors.New("Couldn't get caller info."), "Fetching caller info form runtime failed.") + } + lockOrigin = fmt.Sprintf("[lock held] in %s[%s:%d]", runtime.FuncForPC(pc).Name(), fn, line) + } + n.lock(volume, path, lockOrigin, opsID, readLock) } // RUnlock - unlocks any previously acquired read locks. -func (n *nsLockMap) RUnlock(volume, path string) { +func (n *nsLockMap) RUnlock(volume, path, opsID string) { readLock := true - n.unlock(volume, path, readLock) + n.unlock(volume, path, opsID, readLock) } diff --git a/cmd/namespace-lock_test.go b/cmd/namespace-lock_test.go index be2945df8..8e91a3140 100644 --- a/cmd/namespace-lock_test.go +++ b/cmd/namespace-lock_test.go @@ -16,16 +16,21 @@ package cmd -import "testing" +import ( + "strconv" + "sync" + "testing" + "time" +) // Tests functionality provided by namespace lock. func TestNamespaceLockTest(t *testing.T) { // List of test cases. testCases := []struct { - lk func(s1, s2 string) - unlk func(s1, s2 string) - rlk func(s1, s2 string) - runlk func(s1, s2 string) + lk func(s1, s2, s3 string) + unlk func(s1, s2, s3 string) + rlk func(s1, s2, s3 string) + runlk func(s1, s2, s3 string) lkCount int lockedRefCount uint unlockedRefCount uint @@ -58,7 +63,7 @@ func TestNamespaceLockTest(t *testing.T) { // Write lock tests. testCase := testCases[0] - testCase.lk("a", "b") // lock once. + testCase.lk("a", "b", "c") // lock once. nsLk, ok := nsMutex.lockMap[nsParam{"a", "b"}] if !ok && testCase.shouldPass { t.Errorf("Lock in map missing.") @@ -67,7 +72,7 @@ func TestNamespaceLockTest(t *testing.T) { if testCase.lockedRefCount != nsLk.ref && testCase.shouldPass { t.Errorf("Test %d fails, expected to pass. Wanted ref count is %d, got %d", 1, testCase.lockedRefCount, nsLk.ref) } - testCase.unlk("a", "b") // unlock once. + testCase.unlk("a", "b", "c") // unlock once. if testCase.unlockedRefCount != nsLk.ref && testCase.shouldPass { t.Errorf("Test %d fails, expected to pass. Wanted ref count is %d, got %d", 1, testCase.unlockedRefCount, nsLk.ref) } @@ -78,10 +83,10 @@ func TestNamespaceLockTest(t *testing.T) { // Read lock tests. testCase = testCases[1] - testCase.rlk("a", "b") // lock once. - testCase.rlk("a", "b") // lock second time. - testCase.rlk("a", "b") // lock third time. - testCase.rlk("a", "b") // lock fourth time. + testCase.rlk("a", "b", "c") // lock once. + testCase.rlk("a", "b", "c") // lock second time. + testCase.rlk("a", "b", "c") // lock third time. + testCase.rlk("a", "b", "c") // lock fourth time. nsLk, ok = nsMutex.lockMap[nsParam{"a", "b"}] if !ok && testCase.shouldPass { t.Errorf("Lock in map missing.") @@ -90,8 +95,9 @@ func TestNamespaceLockTest(t *testing.T) { if testCase.lockedRefCount != nsLk.ref && testCase.shouldPass { t.Errorf("Test %d fails, expected to pass. Wanted ref count is %d, got %d", 1, testCase.lockedRefCount, nsLk.ref) } - testCase.runlk("a", "b") // unlock once. - testCase.runlk("a", "b") // unlock second time. + + testCase.runlk("a", "b", "c") // unlock once. + testCase.runlk("a", "b", "c") // unlock second time. if testCase.unlockedRefCount != nsLk.ref && testCase.shouldPass { t.Errorf("Test %d fails, expected to pass. Wanted ref count is %d, got %d", 2, testCase.unlockedRefCount, nsLk.ref) } @@ -102,7 +108,7 @@ func TestNamespaceLockTest(t *testing.T) { // Read lock 0 ref count. testCase = testCases[2] - testCase.rlk("a", "c") // lock once. + testCase.rlk("a", "c", "d") // lock once. nsLk, ok = nsMutex.lockMap[nsParam{"a", "c"}] if !ok && testCase.shouldPass { @@ -112,7 +118,7 @@ func TestNamespaceLockTest(t *testing.T) { if testCase.lockedRefCount != nsLk.ref && testCase.shouldPass { t.Errorf("Test %d fails, expected to pass. Wanted ref count is %d, got %d", 3, testCase.lockedRefCount, nsLk.ref) } - testCase.runlk("a", "c") // unlock once. + testCase.runlk("a", "c", "d") // unlock once. if testCase.unlockedRefCount != nsLk.ref && testCase.shouldPass { t.Errorf("Test %d fails, expected to pass. Wanted ref count is %d, got %d", 3, testCase.unlockedRefCount, nsLk.ref) } @@ -121,3 +127,266 @@ func TestNamespaceLockTest(t *testing.T) { t.Errorf("Lock map not found.") } } + +func TestLockStats(t *testing.T) { + + expectedResult := []lockStateCase{ + // Test case - 1. + // Case where 10 read locks are held. + // Entry for any of the 10 reads locks has to be found. + // Since they held in a loop, Lock origin for first 10 read locks (opsID 0-9) should be the same. + { + + volume: "my-bucket", + path: "my-object", + opsID: "0", + readLock: true, + lockOrigin: "[lock held] in github.com/minio/minio/cmd.TestLockStats[/Users/hackintoshrao/mycode/go/src/github.com/minio/minio/cmd/namespace-lock_test.go:298]", + // expected metrics. + expectedErr: nil, + expectedLockStatus: "Running", + + expectedGlobalLockCount: 10, + expectedRunningLockCount: 10, + expectedBlockedLockCount: 0, + + expectedVolPathLockCount: 10, + expectedVolPathRunningCount: 10, + expectedVolPathBlockCount: 0, + }, + // Test case - 2. + // Case where the first 5 read locks are released. + // Entry for any of the 6-10th "Running" reads lock has to be found. + { + volume: "my-bucket", + path: "my-object", + opsID: "6", + readLock: true, + lockOrigin: "[lock held] in github.com/minio/minio/cmd.TestLockStats[/Users/hackintoshrao/mycode/go/src/github.com/minio/minio/cmd/namespace-lock_test.go:298]", + // expected metrics. + expectedErr: nil, + expectedLockStatus: "Running", + + expectedGlobalLockCount: 5, + expectedRunningLockCount: 5, + expectedBlockedLockCount: 0, + + expectedVolPathLockCount: 5, + expectedVolPathRunningCount: 5, + expectedVolPathBlockCount: 0, + }, + // Test case - 3. + { + + volume: "my-bucket", + path: "my-object", + opsID: "10", + readLock: false, + lockOrigin: "[lock held] in github.com/minio/minio/cmd.TestLockStats[/Users/hackintoshrao/mycode/go/src/github.com/minio/minio/cmd/namespace-lock_test.go:298]", + // expected metrics. + expectedErr: nil, + expectedLockStatus: "Running", + + expectedGlobalLockCount: 2, + expectedRunningLockCount: 1, + expectedBlockedLockCount: 1, + + expectedVolPathLockCount: 2, + expectedVolPathRunningCount: 1, + expectedVolPathBlockCount: 1, + }, + // Test case - 4. + { + + volume: "my-bucket", + path: "my-object", + // expected metrics. + expectedErr: nil, + expectedLockStatus: "Blocked", + + expectedGlobalLockCount: 1, + expectedRunningLockCount: 0, + expectedBlockedLockCount: 1, + + expectedVolPathLockCount: 1, + expectedVolPathRunningCount: 0, + expectedVolPathBlockCount: 1, + }, + // Test case - 5. + { + + volume: "my-bucket", + path: "my-object", + opsID: "11", + readLock: false, + lockOrigin: "[lock held] in github.com/minio/minio/cmd.TestLockStats[/Users/hackintoshrao/mycode/go/src/github.com/minio/minio/cmd/namespace-lock_test.go:298]", + // expected metrics. + expectedErr: nil, + expectedLockStatus: "Running", + + expectedGlobalLockCount: 1, + expectedRunningLockCount: 1, + expectedBlockedLockCount: 0, + + expectedVolPathLockCount: 1, + expectedVolPathRunningCount: 1, + expectedVolPathBlockCount: 0, + }, + // Test case - 6. + // Case where in the first 5 read locks are released, but 2 write locks are + // blocked waiting for the remaining 5 read locks locks to be released (10 read locks were held intially). + // We check the entry for the first blocked write call here. + { + + volume: "my-bucket", + path: "my-object", + opsID: "10", + readLock: false, + // write lock is held at line 318. + // this confirms that we are looking the right write lock. + lockOrigin: "[lock held] in github.com/minio/minio/cmd.TestLockStats.func2[/Users/hackintoshrao/mycode/go/src/github.com/minio/minio/cmd/namespace-lock_test.go:318]", + // expected metrics. + expectedErr: nil, + expectedLockStatus: "Blocked", + + // count of held(running) + blocked locks. + expectedGlobalLockCount: 7, + // count of acquired locks. + expectedRunningLockCount: 5, + // 2 write calls are blocked, waiting for the remaining 5 read locks. + expectedBlockedLockCount: 2, + + expectedVolPathLockCount: 7, + expectedVolPathRunningCount: 5, + expectedVolPathBlockCount: 2, + }, + // Test case - 7. + // Case where in 9 out of 10 read locks are released. + // Since there's one more pending read lock, the 2 write locks are still blocked. + // Testing the entry for the last read lock. + {volume: "my-bucket", + path: "my-object", + opsID: "9", + readLock: true, + lockOrigin: "[lock held] in github.com/minio/minio/cmd.TestLockStats.func2[/Users/hackintoshrao/mycode/go/src/github.com/minio/minio/cmd/namespace-lock_test.go:318]", + // expected metrics. + expectedErr: nil, + expectedLockStatus: "Running", + + // Total running + blocked locks. + // 2 blocked write lock. + expectedGlobalLockCount: 3, + expectedRunningLockCount: 1, + expectedBlockedLockCount: 2, + + expectedVolPathLockCount: 3, + expectedVolPathRunningCount: 1, + expectedVolPathBlockCount: 2, + }, + // Test case - 8. + { + + volume: "my-bucket", + path: "my-object", + // expected metrics. + expectedErr: nil, + expectedLockStatus: "Blocked", + + expectedGlobalLockCount: 0, + expectedRunningLockCount: 0, + expectedBlockedLockCount: 0, + }, + } + var wg sync.WaitGroup + // enabling lock instrumentation. + globalDebugLock = true + // initializing the locks. + initNSLock(false) + + // set debug lock info to `nil` so that the next tests have to initialize them again. + defer func() { + globalDebugLock = false + nsMutex.debugLockMap = nil + }() + + // hold 10 read locks. + for i := 0; i < 10; i++ { + nsMutex.RLock("my-bucket", "my-object", strconv.Itoa(i)) + } + // expected lock info. + expectedLockStats := expectedResult[0] + // verify the actual lock info with the expected one. + verifyLockState(expectedLockStats, t, 1) + // unlock 5 readlock. + for i := 0; i < 5; i++ { + nsMutex.RUnlock("my-bucket", "my-object", strconv.Itoa(i)) + } + + expectedLockStats = expectedResult[1] + // verify the actual lock info with the expected one. + verifyLockState(expectedLockStats, t, 2) + + syncChan := make(chan struct{}, 1) + wg.Add(1) + go func() { + defer wg.Done() + // blocks till all read locks are released. + nsMutex.Lock("my-bucket", "my-object", strconv.Itoa(10)) + // Once the above attempt to lock is unblocked/acquired, we verify the stats and release the lock. + expectedWLockStats := expectedResult[2] + // Since the write lock acquired here, the number of blocked locks should reduce by 1 and + // count of running locks should increase by 1. + verifyLockState(expectedWLockStats, t, 3) + // release the write lock. + nsMutex.Unlock("my-bucket", "my-object", strconv.Itoa(10)) + // The number of running locks should decrease by 1. + // expectedWLockStats = expectedResult[3] + // verifyLockState(expectedWLockStats, t, 4) + // Take the lock stats after the first write lock is unlocked. + // Only then unlock then second write lock. + syncChan <- struct{}{} + }() + // waiting so that the write locks in the above go routines are held. + // sleeping so that we can predict the order of the write locks held. + time.Sleep(100 * time.Millisecond) + + // since there are 5 more readlocks still held on <"my-bucket","my-object">, + // an attempt to hold write locks blocks. So its run in a new go routine. + wg.Add(1) + go func() { + defer wg.Done() + // blocks till all read locks are released. + nsMutex.Lock("my-bucket", "my-object", strconv.Itoa(11)) + // Once the above attempt to lock is unblocked/acquired, we release the lock. + // Unlock the second write lock only after lock stats for first write lock release is taken. + <-syncChan + // The number of running locks should decrease by 1. + expectedWLockStats := expectedResult[4] + verifyLockState(expectedWLockStats, t, 5) + nsMutex.Unlock("my-bucket", "my-object", strconv.Itoa(11)) + }() + + expectedLockStats = expectedResult[5] + + time.Sleep(1 * time.Second) + // verify the actual lock info with the expected one. + verifyLockState(expectedLockStats, t, 6) + + // unlock 4 out of remaining 5 read locks. + for i := 0; i < 4; i++ { + nsMutex.RUnlock("my-bucket", "my-object", strconv.Itoa(i+5)) + } + + // verify the entry for one remaining read lock and count of blocked write locks. + expectedLockStats = expectedResult[6] + // verify the actual lock info with the expected one. + verifyLockState(expectedLockStats, t, 7) + + // Releasing the last read lock. + nsMutex.RUnlock("my-bucket", "my-object", strconv.Itoa(9)) + wg.Wait() + expectedLockStats = expectedResult[7] + // verify the actual lock info with the expected one. + verifyGlobalLockStats(expectedLockStats, t, 8) + +} diff --git a/cmd/test-utils_test.go b/cmd/test-utils_test.go index 7e5e87d1f..b0df96b16 100644 --- a/cmd/test-utils_test.go +++ b/cmd/test-utils_test.go @@ -26,10 +26,8 @@ import ( "io" "io/ioutil" "math/rand" - "net" "net/http" "net/http/httptest" - "net/rpc" "net/url" "os" "regexp" @@ -180,6 +178,58 @@ func StartTestServer(t TestErrHandler, instanceType string) TestServer { return testServer } +// Initializes control RPC end points. +// The object Layer will be a temp back used for testing purpose. +func initTestControlRPCEndPoint(objectLayer ObjectLayer) http.Handler { + // Initialize Web. + + controllerHandlers := &controllerAPIHandlers{ + ObjectAPI: func() ObjectLayer { return objectLayer }, + } + + // Initialize router. + muxRouter := router.NewRouter() + registerControllerRPCRouter(muxRouter, controllerHandlers) + return muxRouter +} + +// StartTestRPCServer - Creates a temp XL/FS backend and initializes control RPC end points, +// then starts a test server with those control RPC end points registered. +func StartTestRPCServer(t TestErrHandler, instanceType string) TestServer { + // create temporary backend for the test server. + nDisks := 16 + disks, err := getRandomDisks(nDisks) + if err != nil { + t.Fatal("Failed to create disks for the backend") + } + // create an instance of TestServer. + testRPCServer := TestServer{} + // create temporary backend for the test server. + objLayer, err := makeTestBackend(disks, instanceType) + + if err != nil { + t.Fatalf("Failed obtaining Temp Backend: %s", err) + } + + root, err := newTestConfig("us-east-1") + if err != nil { + t.Fatalf("%s", err) + } + + // Get credential. + credentials := serverConfig.GetCredential() + + testRPCServer.Root = root + testRPCServer.Disks = disks + testRPCServer.AccessKey = credentials.AccessKeyID + testRPCServer.SecretKey = credentials.SecretAccessKey + testRPCServer.Obj = objLayer + // Run TestServer. + testRPCServer.Server = httptest.NewServer(initTestControlRPCEndPoint(objLayer)) + + return testRPCServer +} + // Configure the server for the test run. func newTestConfig(bucketLocation string) (rootPath string, err error) { // Get test root. @@ -990,37 +1040,3 @@ func initTestWebRPCEndPoint(objLayer ObjectLayer) http.Handler { registerWebRouter(muxRouter, webHandlers) return muxRouter } - -// Initialize Controller RPC Handlers for testing -func initTestControllerRPCEndPoint(objLayer ObjectLayer) (string, string, error) { - controllerHandlers := &controllerAPIHandlers{ - ObjectAPI: func() ObjectLayer { return objLayer }, - } - // Start configuring net/rpc server - server := rpc.NewServer() - server.RegisterName("Controller", controllerHandlers) - - listenTCP := func() (net.Listener, string, error) { - l, e := net.Listen("tcp", ":0") // any available address - if e != nil { - return nil, "", errors.New("net.Listen tcp :0, " + e.Error()) - } - return l, l.Addr().String(), nil - } - - l, serverAddr, err := listenTCP() - if err != nil { - return "", "", nil - } - go server.Accept(l) - - // net/rpc only accepts one registered path and doesn't help to unregister it, - // so we are registering a new rpc path each time this function is called - random := strconv.Itoa(rand.Int()) - server.HandleHTTP("/controller"+random, "/controller-debug"+random) - - testserver := httptest.NewServer(nil) - serverAddr = testserver.Listener.Addr().String() - - return serverAddr, random, nil -} diff --git a/cmd/xl-v1-bucket.go b/cmd/xl-v1-bucket.go index 9e5501561..cc2c20602 100644 --- a/cmd/xl-v1-bucket.go +++ b/cmd/xl-v1-bucket.go @@ -35,8 +35,12 @@ func (xl xlObjects) MakeBucket(bucket string) error { return toObjectErr(errVolumeExists, bucket) } - nsMutex.Lock(bucket, "") - defer nsMutex.Unlock(bucket, "") + // generates random string on setting MINIO_DEBUG=lock, else returns empty string. + // used for instrumentation on locks. + opsID := getOpsID() + + nsMutex.Lock(bucket, "", opsID) + defer nsMutex.Unlock(bucket, "", opsID) // Initialize sync waitgroup. var wg = &sync.WaitGroup{} @@ -157,8 +161,12 @@ func (xl xlObjects) getBucketInfo(bucketName string) (bucketInfo BucketInfo, err // Checks whether bucket exists. func (xl xlObjects) isBucketExist(bucket string) bool { - nsMutex.RLock(bucket, "") - defer nsMutex.RUnlock(bucket, "") + // generates random string on setting MINIO_DEBUG=lock, else returns empty string. + // used for instrumentation on locks. + opsID := getOpsID() + + nsMutex.RLock(bucket, "", opsID) + defer nsMutex.RUnlock(bucket, "", opsID) // Check whether bucket exists. _, err := xl.getBucketInfo(bucket) @@ -178,8 +186,12 @@ func (xl xlObjects) GetBucketInfo(bucket string) (BucketInfo, error) { if !IsValidBucketName(bucket) { return BucketInfo{}, BucketNameInvalid{Bucket: bucket} } - nsMutex.RLock(bucket, "") - defer nsMutex.RUnlock(bucket, "") + // generates random string on setting MINIO_DEBUG=lock, else returns empty string. + // used for instrumentation on locks. + opsID := getOpsID() + + nsMutex.RLock(bucket, "", opsID) + defer nsMutex.RUnlock(bucket, "", opsID) bucketInfo, err := xl.getBucketInfo(bucket) if err != nil { return BucketInfo{}, toObjectErr(err, bucket) @@ -254,8 +266,12 @@ func (xl xlObjects) DeleteBucket(bucket string) error { return BucketNotFound{Bucket: bucket} } - nsMutex.Lock(bucket, "") - defer nsMutex.Unlock(bucket, "") + // generates random string on setting MINIO_DEBUG=lock, else returns empty string. + // used for instrumentation on locks. + opsID := getOpsID() + + nsMutex.Lock(bucket, "", opsID) + defer nsMutex.Unlock(bucket, "", opsID) // Collect if all disks report volume not found. var wg = &sync.WaitGroup{} diff --git a/cmd/xl-v1-list-objects-heal.go b/cmd/xl-v1-list-objects-heal.go index 148ce666a..b9fe83955 100644 --- a/cmd/xl-v1-list-objects-heal.go +++ b/cmd/xl-v1-list-objects-heal.go @@ -137,8 +137,13 @@ func (xl xlObjects) listObjectsHeal(bucket, prefix, marker, delimiter string, ma result.Prefixes = append(result.Prefixes, objInfo.Name) continue } + + // generates random string on setting MINIO_DEBUG=lock, else returns empty string. + // used for instrumentation on locks. + opsID := getOpsID() + // Check if the current object needs healing - nsMutex.RLock(bucket, objInfo.Name) + nsMutex.RLock(bucket, objInfo.Name, opsID) partsMetadata, errs := readAllXLMetadata(xl.storageDisks, bucket, objInfo.Name) if xlShouldHeal(partsMetadata, errs) { result.Objects = append(result.Objects, ObjectInfo{ @@ -148,7 +153,7 @@ func (xl xlObjects) listObjectsHeal(bucket, prefix, marker, delimiter string, ma IsDir: false, }) } - nsMutex.RUnlock(bucket, objInfo.Name) + nsMutex.RUnlock(bucket, objInfo.Name, opsID) } return result, nil } diff --git a/cmd/xl-v1-multipart.go b/cmd/xl-v1-multipart.go index 7b1cdd929..2dc3a6770 100644 --- a/cmd/xl-v1-multipart.go +++ b/cmd/xl-v1-multipart.go @@ -62,7 +62,11 @@ func (xl xlObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark // List all upload ids for the keyMarker starting from // uploadIDMarker first. if uploadIDMarker != "" { - nsMutex.RLock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, keyMarker)) + // generates random string on setting MINIO_DEBUG=lock, else returns empty string. + // used for instrumentation on locks. + opsID := getOpsID() + + nsMutex.RLock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, keyMarker), opsID) for _, disk := range xl.getLoadBalancedDisks() { if disk == nil { continue @@ -76,7 +80,7 @@ func (xl xlObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark } break } - nsMutex.RUnlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, keyMarker)) + nsMutex.RUnlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, keyMarker), opsID) if err != nil { return ListMultipartsInfo{}, err } @@ -127,8 +131,13 @@ func (xl xlObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark var newUploads []uploadMetadata var end bool uploadIDMarker = "" + + // generates random string on setting MINIO_DEBUG=lock, else returns empty string. + // used for instrumentation on locks. + opsID := getOpsID() + // For the new object entry we get all its pending uploadIDs. - nsMutex.RLock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, entry)) + nsMutex.RLock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, entry), opsID) var disk StorageAPI for _, disk = range xl.getLoadBalancedDisks() { if disk == nil { @@ -143,7 +152,7 @@ func (xl xlObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark } break } - nsMutex.RUnlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, entry)) + nsMutex.RUnlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, entry), opsID) if err != nil { if isErrIgnored(err, walkResultIgnoredErrs) { continue @@ -269,9 +278,13 @@ func (xl xlObjects) newMultipartUpload(bucket string, object string, meta map[st xlMeta.Stat.ModTime = time.Now().UTC() xlMeta.Meta = meta + // generates random string on setting MINIO_DEBUG=lock, else returns empty string. + // used for instrumentation on locks. + opsID := getOpsID() + // This lock needs to be held for any changes to the directory contents of ".minio/multipart/object/" - nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object)) - defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object)) + nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object), opsID) + defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object), opsID) uploadID = getUUID() initiated := time.Now().UTC() @@ -339,20 +352,25 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, s var partsMetadata []xlMetaV1 var errs []error uploadIDPath := pathJoin(mpartMetaPrefix, bucket, object, uploadID) - nsMutex.RLock(minioMetaBucket, uploadIDPath) + + // generates random string on setting MINIO_DEBUG=lock, else returns empty string. + // used for instrumentation on locks. + opsID := getOpsID() + + nsMutex.RLock(minioMetaBucket, uploadIDPath, opsID) // Validates if upload ID exists. if !xl.isUploadIDExists(bucket, object, uploadID) { - nsMutex.RUnlock(minioMetaBucket, uploadIDPath) + nsMutex.RUnlock(minioMetaBucket, uploadIDPath, opsID) return "", InvalidUploadID{UploadID: uploadID} } // Read metadata associated with the object from all disks. partsMetadata, errs = readAllXLMetadata(xl.storageDisks, minioMetaBucket, uploadIDPath) if !isDiskQuorum(errs, xl.writeQuorum) { - nsMutex.RUnlock(minioMetaBucket, uploadIDPath) + nsMutex.RUnlock(minioMetaBucket, uploadIDPath, opsID) return "", toObjectErr(errXLWriteQuorum, bucket, object) } - nsMutex.RUnlock(minioMetaBucket, uploadIDPath) + nsMutex.RUnlock(minioMetaBucket, uploadIDPath, opsID) // List all online disks. onlineDisks, modTime := listOnlineDisks(xl.storageDisks, partsMetadata, errs) @@ -421,8 +439,12 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, s } } - nsMutex.Lock(minioMetaBucket, uploadIDPath) - defer nsMutex.Unlock(minioMetaBucket, uploadIDPath) + // generates random string on setting MINIO_DEBUG=lock, else returns empty string. + // used for instrumentation on locks. + opsID = getOpsID() + + nsMutex.Lock(minioMetaBucket, uploadIDPath, opsID) + defer nsMutex.Unlock(minioMetaBucket, uploadIDPath, opsID) // Validate again if upload ID still exists. if !xl.isUploadIDExists(bucket, object, uploadID) { @@ -565,9 +587,14 @@ func (xl xlObjects) ListObjectParts(bucket, object, uploadID string, partNumberM if !IsValidObjectName(object) { return ListPartsInfo{}, ObjectNameInvalid{Bucket: bucket, Object: object} } + + // generates random string on setting MINIO_DEBUG=lock, else returns empty string. + // used for instrumentation on locks. + opsID := getOpsID() + // Hold lock so that there is no competing abort-multipart-upload or complete-multipart-upload. - nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID)) - defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID)) + nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID), opsID) + defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID), opsID) if !xl.isUploadIDExists(bucket, object, uploadID) { return ListPartsInfo{}, InvalidUploadID{UploadID: uploadID} @@ -597,11 +624,16 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload Object: object, } } + + // generates random string on setting MINIO_DEBUG=lock, else returns empty string. + // used for instrumentation on locks. + opsID := getOpsID() + // Hold lock so that // 1) no one aborts this multipart upload // 2) no one does a parallel complete-multipart-upload on this multipart upload - nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID)) - defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID)) + nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID), opsID) + defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID), opsID) if !xl.isUploadIDExists(bucket, object, uploadID) { return "", InvalidUploadID{UploadID: uploadID} @@ -712,15 +744,20 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload if rErr != nil { return "", toObjectErr(rErr, minioMetaBucket, uploadIDPath) } + + // generates random string on setting MINIO_DEBUG=lock, else returns empty string. + // used for instrumentation on locks. + opsID = getOpsID() + // Hold write lock on the destination before rename. - nsMutex.Lock(bucket, object) + nsMutex.Lock(bucket, object, opsID) defer func() { // A new complete multipart upload invalidates any // previously cached object in memory. xl.objCache.Delete(path.Join(bucket, object)) // This lock also protects the cache namespace. - nsMutex.Unlock(bucket, object) + nsMutex.Unlock(bucket, object, opsID) // Prefetch the object from disk by triggering a fake GetObject call // Unlike a regular single PutObject, multipart PutObject is comes in @@ -761,10 +798,14 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload // Delete the previously successfully renamed object. xl.deleteObject(minioMetaBucket, path.Join(tmpMetaPrefix, uniqueID)) + // generates random string on setting MINIO_DEBUG=lock, else returns empty string. + // used for instrumentation on locks. + opsID = getOpsID() + // Hold the lock so that two parallel complete-multipart-uploads do not // leave a stale uploads.json behind. - nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object)) - defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object)) + nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object), opsID) + defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object), opsID) // Validate if there are other incomplete upload-id's present for // the object, if yes do not attempt to delete 'uploads.json'. @@ -804,8 +845,12 @@ func (xl xlObjects) abortMultipartUpload(bucket, object, uploadID string) (err e return toObjectErr(err, bucket, object) } - nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object)) - defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object)) + // generates random string on setting MINIO_DEBUG=lock, else returns empty string. + // used for instrumentation on locks. + opsID := getOpsID() + + nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object), opsID) + defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object), opsID) // Validate if there are other incomplete upload-id's present for // the object, if yes do not attempt to delete 'uploads.json'. uploadsJSON, err := xl.readUploadsJSON(bucket, object) @@ -857,9 +902,13 @@ func (xl xlObjects) AbortMultipartUpload(bucket, object, uploadID string) error return ObjectNameInvalid{Bucket: bucket, Object: object} } + // generates random string on setting MINIO_DEBUG=lock, else returns empty string. + // used for instrumentation on locks. + opsID := getOpsID() + // Hold lock so that there is no competing complete-multipart-upload or put-object-part. - nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID)) - defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID)) + nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID), opsID) + defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID), opsID) if !xl.isUploadIDExists(bucket, object, uploadID) { return InvalidUploadID{UploadID: uploadID} diff --git a/cmd/xl-v1-object.go b/cmd/xl-v1-object.go index 560407d51..d1e9bceb3 100644 --- a/cmd/xl-v1-object.go +++ b/cmd/xl-v1-object.go @@ -56,9 +56,14 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length i if writer == nil { return toObjectErr(errUnexpected, bucket, object) } + + // generates random string on setting MINIO_DEBUG=lock, else returns empty string. + // used for instrumentation on locks. + opsID := getOpsID() + // Lock the object before reading. - nsMutex.RLock(bucket, object) - defer nsMutex.RUnlock(bucket, object) + nsMutex.RLock(bucket, object, opsID) + defer nsMutex.RUnlock(bucket, object, opsID) // Read metadata associated with the object from all disks. metaArr, errs := readAllXLMetadata(xl.storageDisks, bucket, object) @@ -226,9 +231,13 @@ func (xl xlObjects) HealObject(bucket, object string) error { return ObjectNameInvalid{Bucket: bucket, Object: object} } + // generates random string on setting MINIO_DEBUG=lock, else returns empty string. + // used for instrumentation on locks. + opsID := getOpsID() + // Lock the object before healing. - nsMutex.RLock(bucket, object) - defer nsMutex.RUnlock(bucket, object) + nsMutex.RLock(bucket, object, opsID) + defer nsMutex.RUnlock(bucket, object, opsID) partsMetadata, errs := readAllXLMetadata(xl.storageDisks, bucket, object) if err := reduceErrs(errs, nil); err != nil { @@ -350,8 +359,13 @@ func (xl xlObjects) GetObjectInfo(bucket, object string) (ObjectInfo, error) { if !IsValidObjectName(object) { return ObjectInfo{}, ObjectNameInvalid{Bucket: bucket, Object: object} } - nsMutex.RLock(bucket, object) - defer nsMutex.RUnlock(bucket, object) + + // generates random string on setting MINIO_DEBUG=lock, else returns empty string. + // used for instrumentation on locks. + opsID := getOpsID() + + nsMutex.RLock(bucket, object, opsID) + defer nsMutex.RUnlock(bucket, object, opsID) info, err := xl.getObjectInfo(bucket, object) if err != nil { return ObjectInfo{}, toObjectErr(err, bucket, object) @@ -609,9 +623,13 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io. } } + // generates random string on setting MINIO_DEBUG=lock, else returns empty string. + // used for instrumentation on locks. + opsID := getOpsID() + // Lock the object. - nsMutex.Lock(bucket, object) - defer nsMutex.Unlock(bucket, object) + nsMutex.Lock(bucket, object, opsID) + defer nsMutex.Unlock(bucket, object, opsID) // Check if an object is present as one of the parent dir. // -- FIXME. (needs a new kind of lock). @@ -724,8 +742,13 @@ func (xl xlObjects) DeleteObject(bucket, object string) (err error) { if !IsValidObjectName(object) { return ObjectNameInvalid{Bucket: bucket, Object: object} } - nsMutex.Lock(bucket, object) - defer nsMutex.Unlock(bucket, object) + + // generates random string on setting MINIO_DEBUG=lock, else returns empty string. + // used for instrumentation on locks. + opsID := getOpsID() + + nsMutex.Lock(bucket, object, opsID) + defer nsMutex.Unlock(bucket, object, opsID) // Validate object exists. if !xl.isObject(bucket, object) { diff --git a/cmd/xl-v1.go b/cmd/xl-v1.go index 2e23edf2d..5a98d503f 100644 --- a/cmd/xl-v1.go +++ b/cmd/xl-v1.go @@ -178,8 +178,12 @@ func (xl xlObjects) Shutdown() error { // HealDiskMetadata function for object storage interface. func (xl xlObjects) HealDiskMetadata() error { - nsMutex.Lock(minioMetaBucket, formatConfigFile) - defer nsMutex.Unlock(minioMetaBucket, formatConfigFile) + // generates random string on setting MINIO_DEBUG=lock, else returns empty string. + // used for instrumentation on locks. + opsID := getOpsID() + + nsMutex.Lock(minioMetaBucket, formatConfigFile, opsID) + defer nsMutex.Unlock(minioMetaBucket, formatConfigFile, opsID) return repairDiskMetadata(xl.storageDisks) }