From c8f57133a4384ea98206560b691f50e7520007b9 Mon Sep 17 00:00:00 2001 From: Krishnan Parthasarathi Date: Wed, 4 Jan 2017 13:09:22 +0530 Subject: [PATCH] Implement list, clear locks REST API w/ pkg/madmin support (#3491) * Filter lock info based on bucket, prefix and time since lock was held * Implement list and clear locks REST API * madmin: Add list and clear locks API * locks: Clear locks matching bucket, prefix, relTime. * Gather lock information across nodes for both list and clear locks admin REST API. * docs: Add lock API to management APIs --- cmd/admin-handlers.go | 145 ++++++++++++++- cmd/admin-handlers_test.go | 241 ++++++++++++++++++++++++- cmd/admin-router.go | 10 +- cmd/admin-rpc-client.go | 98 ++++++++-- cmd/admin-rpc-server.go | 38 +++- cmd/admin-rpc-server_test.go | 2 +- cmd/api-errors.go | 6 + cmd/lockinfo-handlers.go | 68 ++++++- cmd/lockinfo-handlers_test.go | 81 +++++++++ cmd/object-api-utils.go | 2 +- docs/admin-api/management-api.md | 82 +++++++++ docs/admin-api/service.md | 33 ---- pkg/madmin/examples/lock-clear.go | 47 +++++ pkg/madmin/examples/lock-list.go | 46 +++++ pkg/madmin/examples/service-restart.go | 4 +- pkg/madmin/examples/service-status.go | 4 +- pkg/madmin/examples/service-stop.go | 4 +- pkg/madmin/lock-commands.go | 151 ++++++++++++++++ pkg/madmin/lock-commands_test.go | 61 +++++++ pkg/madmin/service.go | 6 +- 20 files changed, 1039 insertions(+), 90 deletions(-) create mode 100644 cmd/lockinfo-handlers_test.go create mode 100644 docs/admin-api/management-api.md delete mode 100644 docs/admin-api/service.md create mode 100644 pkg/madmin/examples/lock-clear.go create mode 100644 pkg/madmin/examples/lock-list.go create mode 100644 pkg/madmin/lock-commands.go create mode 100644 pkg/madmin/lock-commands_test.go diff --git a/cmd/admin-handlers.go b/cmd/admin-handlers.go index a08b1a772..f80ec826f 100644 --- a/cmd/admin-handlers.go +++ b/cmd/admin-handlers.go @@ -19,6 +19,8 @@ package cmd import ( "encoding/json" "net/http" + "net/url" + "time" ) const ( @@ -28,9 +30,8 @@ const ( // ServiceStatusHandler - GET /?service // HTTP header x-minio-operation: status // ---------- -// This implementation of the GET operation fetches server status information. -// provides total disk space available to use, online disks, offline disks and -// quorum threshold. +// Fetches server status information like total disk space available +// to use, online disks, offline disks and quorum threshold. func (adminAPI adminAPIHandlers) ServiceStatusHandler(w http.ResponseWriter, r *http.Request) { adminAPIErr := checkRequestAuthType(r, "", "", "") if adminAPIErr != ErrNone { @@ -44,15 +45,16 @@ func (adminAPI adminAPIHandlers) ServiceStatusHandler(w http.ResponseWriter, r * errorIf(err, "Failed to marshal storage info into json.") return } + // Reply with storage information (across nodes in a + // distributed setup) as json. writeSuccessResponse(w, jsonBytes) } // ServiceStopHandler - POST /?service // HTTP header x-minio-operation: stop // ---------- -// This implementation of the POST operation stops minio server gracefully, -// in a distributed setup stops all the servers in the cluster. Body sent -// if any on client request is ignored. +// Stops minio server gracefully. In a distributed setup, stops all the +// servers in the cluster. func (adminAPI adminAPIHandlers) ServiceStopHandler(w http.ResponseWriter, r *http.Request) { adminAPIErr := checkRequestAuthType(r, "", "", "") if adminAPIErr != ErrNone { @@ -67,9 +69,8 @@ func (adminAPI adminAPIHandlers) ServiceStopHandler(w http.ResponseWriter, r *ht // ServiceRestartHandler - POST /?service // HTTP header x-minio-operation: restart // ---------- -// This implementation of the POST operation restarts minio server gracefully, -// in a distributed setup restarts all the servers in the cluster. Body sent -// if any on client request is ignored. +// Restarts minio server gracefully. In a distributed setup, restarts +// all the servers in the cluster. func (adminAPI adminAPIHandlers) ServiceRestartHandler(w http.ResponseWriter, r *http.Request) { adminAPIErr := checkRequestAuthType(r, "", "", "") if adminAPIErr != ErrNone { @@ -80,3 +81,129 @@ func (adminAPI adminAPIHandlers) ServiceRestartHandler(w http.ResponseWriter, r w.WriteHeader(http.StatusOK) sendServiceCmd(globalAdminPeers, serviceRestart) } + +// Type-safe lock query params. +type lockQueryKey string + +// Only valid query params for list/clear locks management APIs. +const ( + lockBucket lockQueryKey = "bucket" + lockPrefix lockQueryKey = "prefix" + lockOlderThan lockQueryKey = "older-than" +) + +// validateLockQueryParams - Validates query params for list/clear locks management APIs. +func validateLockQueryParams(vars url.Values) (string, string, time.Duration, APIErrorCode) { + bucket := vars.Get(string(lockBucket)) + prefix := vars.Get(string(lockPrefix)) + relTimeStr := vars.Get(string(lockOlderThan)) + + // N B empty bucket name is invalid + if !IsValidBucketName(bucket) { + return "", "", time.Duration(0), ErrInvalidBucketName + } + + // empty prefix is valid. + if !IsValidObjectPrefix(prefix) { + return "", "", time.Duration(0), ErrInvalidObjectName + } + + // If older-than parameter was empty then set it to 0s to list + // all locks older than now. + if relTimeStr == "" { + relTimeStr = "0s" + } + relTime, err := time.ParseDuration(relTimeStr) + if err != nil { + errorIf(err, "Failed to parse duration passed as query value.") + return "", "", time.Duration(0), ErrInvalidDuration + } + + return bucket, prefix, relTime, ErrNone +} + +// ListLocksHandler - GET /?lock&bucket=mybucket&prefix=myprefix&older-than=rel_time +// - bucket is a mandatory query parameter +// - prefix and older-than are optional query parameters +// HTTP header x-minio-operation: list +// --------- +// Lists locks held on a given bucket, prefix and relative time. +func (adminAPI adminAPIHandlers) ListLocksHandler(w http.ResponseWriter, r *http.Request) { + adminAPIErr := checkRequestAuthType(r, "", "", "") + if adminAPIErr != ErrNone { + writeErrorResponse(w, r, adminAPIErr, r.URL.Path) + return + } + + vars := r.URL.Query() + bucket, prefix, relTime, adminAPIErr := validateLockQueryParams(vars) + if adminAPIErr != ErrNone { + writeErrorResponse(w, r, adminAPIErr, r.URL.Path) + return + } + + // Fetch lock information of locks matching bucket/prefix that + // are available since relTime. + volLocks, err := listPeerLocksInfo(globalAdminPeers, bucket, prefix, relTime) + if err != nil { + writeErrorResponse(w, r, ErrInternalError, r.URL.Path) + errorIf(err, "Failed to fetch lock information from remote nodes.") + return + } + + // Marshal list of locks as json. + jsonBytes, err := json.Marshal(volLocks) + if err != nil { + writeErrorResponseNoHeader(w, r, ErrInternalError, r.URL.Path) + errorIf(err, "Failed to marshal lock information into json.") + return + } + + // Reply with list of locks held on bucket, matching prefix + // older than relTime supplied, as json. + writeSuccessResponse(w, jsonBytes) +} + +// ClearLocksHandler - POST /?lock&bucket=mybucket&prefix=myprefix&older-than=relTime +// - bucket is a mandatory query parameter +// - prefix and older-than are optional query parameters +// HTTP header x-minio-operation: clear +// --------- +// Clear locks held on a given bucket, prefix and relative time. +func (adminAPI adminAPIHandlers) ClearLocksHandler(w http.ResponseWriter, r *http.Request) { + adminAPIErr := checkRequestAuthType(r, "", "", "") + if adminAPIErr != ErrNone { + writeErrorResponse(w, r, adminAPIErr, r.URL.Path) + return + } + + vars := r.URL.Query() + bucket, prefix, relTime, adminAPIErr := validateLockQueryParams(vars) + if adminAPIErr != ErrNone { + writeErrorResponse(w, r, adminAPIErr, r.URL.Path) + return + } + + // Fetch lock information of locks matching bucket/prefix that + // are available since relTime. + volLocks, err := listPeerLocksInfo(globalAdminPeers, bucket, prefix, relTime) + if err != nil { + writeErrorResponseNoHeader(w, r, ErrInternalError, r.URL.Path) + errorIf(err, "Failed to fetch lock information from remote nodes.") + return + } + + // Marshal list of locks as json. + jsonBytes, err := json.Marshal(volLocks) + if err != nil { + writeErrorResponseNoHeader(w, r, ErrInternalError, r.URL.Path) + errorIf(err, "Failed to marshal lock information into json.") + return + } + // Remove lock matching bucket/prefix older than relTime. + for _, volLock := range volLocks { + globalNSMutex.ForceUnlock(volLock.Bucket, volLock.Object) + } + // Reply with list of locks cleared, as json. + writeSuccessResponse(w, jsonBytes) +} diff --git a/cmd/admin-handlers_test.go b/cmd/admin-handlers_test.go index 95f5b853e..91c7252da 100644 --- a/cmd/admin-handlers_test.go +++ b/cmd/admin-handlers_test.go @@ -18,13 +18,17 @@ package cmd import ( "encoding/json" + "fmt" "net/http" "net/http/httptest" + "net/url" "testing" router "github.com/gorilla/mux" ) +// cmdType - Represents different service subcomands like status, stop +// and restart. type cmdType int const ( @@ -33,6 +37,7 @@ const ( restartCmd ) +// String - String representation for cmdType func (c cmdType) String() string { switch c { case statusCmd: @@ -45,6 +50,8 @@ func (c cmdType) String() string { return "" } +// apiMethod - Returns the HTTP method corresponding to the admin REST +// API for a given cmdType value. func (c cmdType) apiMethod() string { switch c { case statusCmd: @@ -57,6 +64,8 @@ func (c cmdType) apiMethod() string { return "GET" } +// toServiceSignal - Helper function that translates a given cmdType +// value to its corresponding serviceSignal value. func (c cmdType) toServiceSignal() serviceSignal { switch c { case statusCmd: @@ -69,6 +78,8 @@ func (c cmdType) toServiceSignal() serviceSignal { return serviceStatus } +// testServiceSignalReceiver - Helper function that simulates a +// go-routine waiting on service signal. func testServiceSignalReceiver(cmd cmdType, t *testing.T) { expectedCmd := cmd.toServiceSignal() serviceCmd := <-globalServiceSignalCh @@ -77,12 +88,19 @@ func testServiceSignalReceiver(cmd cmdType, t *testing.T) { } } -func getAdminCmdRequest(cmd cmdType, cred credential) (*http.Request, error) { +// getServiceCmdRequest - Constructs a management REST API request for service +// subcommands for a given cmdType value. +func getServiceCmdRequest(cmd cmdType, cred credential) (*http.Request, error) { req, err := newTestRequest(cmd.apiMethod(), "/?service", 0, nil) if err != nil { return nil, err } + + // minioAdminOpHeader is to identify the request as a + // management REST API request. req.Header.Set(minioAdminOpHeader, cmd.String()) + + // management REST API uses signature V4 for authentication. err = signRequestV4(req, cred.AccessKey, cred.SecretKey) if err != nil { return nil, err @@ -90,18 +108,26 @@ func getAdminCmdRequest(cmd cmdType, cred credential) (*http.Request, error) { return req, nil } +// testServicesCmdHandler - parametrizes service subcommand tests on +// cmdType value. func testServicesCmdHandler(cmd cmdType, t *testing.T) { + // Initialize configuration for access/secret credentials. rootPath, err := newTestConfig("us-east-1") if err != nil { t.Fatalf("Unable to initialize server config. %s", err) } defer removeAll(rootPath) - // Initialize admin peers to make admin RPC calls. + // Initialize admin peers to make admin RPC calls. Note: In a + // single node setup, this degenerates to a simple function + // call under the hood. eps, err := parseStorageEndpoints([]string{"http://localhost"}) if err != nil { t.Fatalf("Failed to parse storage end point - %v", err) } + + // Set globalMinioAddr to be able to distinguish local endpoints from remote. + globalMinioAddr = eps[0].Host initGlobalAdminPeers(eps) if cmd == statusCmd { @@ -128,7 +154,7 @@ func testServicesCmdHandler(cmd cmdType, t *testing.T) { registerAdminRouter(adminRouter) rec := httptest.NewRecorder() - req, err := getAdminCmdRequest(cmd, credentials) + req, err := getServiceCmdRequest(cmd, credentials) if err != nil { t.Fatalf("Failed to build service status request %v", err) } @@ -151,14 +177,223 @@ func testServicesCmdHandler(cmd cmdType, t *testing.T) { } } +// Test for service status management REST API. func TestServiceStatusHandler(t *testing.T) { testServicesCmdHandler(statusCmd, t) } +// Test for service stop management REST API. func TestServiceStopHandler(t *testing.T) { testServicesCmdHandler(stopCmd, t) } +// Test for service restart management REST API. func TestServiceRestartHandler(t *testing.T) { testServicesCmdHandler(restartCmd, t) } + +// Test for locks list management REST API. +func TestListLocksHandler(t *testing.T) { + rootPath, err := newTestConfig("us-east-1") + if err != nil { + t.Fatalf("Unable to initialize server config. %s", err) + } + defer removeAll(rootPath) + + // Initialize admin peers to make admin RPC calls. + eps, err := parseStorageEndpoints([]string{"http://localhost"}) + if err != nil { + t.Fatalf("Failed to parse storage end point - %v", err) + } + + // Set globalMinioAddr to be able to distinguish local endpoints from remote. + globalMinioAddr = eps[0].Host + initGlobalAdminPeers(eps) + + testCases := []struct { + bucket string + prefix string + relTime string + expectedStatus int + }{ + // Test 1 - valid testcase + { + bucket: "mybucket", + prefix: "myobject", + relTime: "1s", + expectedStatus: 200, + }, + // Test 2 - invalid duration + { + bucket: "mybucket", + prefix: "myprefix", + relTime: "invalidDuration", + expectedStatus: 400, + }, + // Test 3 - invalid bucket name + { + bucket: `invalid\\Bucket`, + prefix: "myprefix", + relTime: "1h", + expectedStatus: 400, + }, + // Test 4 - invalid prefix + { + bucket: "mybucket", + prefix: `invalid\\Prefix`, + relTime: "1h", + expectedStatus: 400, + }, + } + + adminRouter := router.NewRouter() + registerAdminRouter(adminRouter) + + for i, test := range testCases { + queryStr := fmt.Sprintf("&bucket=%s&prefix=%s&older-than=%s", test.bucket, test.prefix, test.relTime) + req, err := newTestRequest("GET", "/?lock"+queryStr, 0, nil) + if err != nil { + t.Fatalf("Test %d - Failed to construct list locks request - %v", i+1, err) + } + req.Header.Set(minioAdminOpHeader, "list") + + cred := serverConfig.GetCredential() + err = signRequestV4(req, cred.AccessKey, cred.SecretKey) + if err != nil { + t.Fatalf("Test %d - Failed to sign list locks request - %v", i+1, err) + } + rec := httptest.NewRecorder() + adminRouter.ServeHTTP(rec, req) + if test.expectedStatus != rec.Code { + t.Errorf("Test %d - Expected HTTP status code %d but received %d", i+1, test.expectedStatus, rec.Code) + } + } +} + +// Test for locks clear management REST API. +func TestClearLocksHandler(t *testing.T) { + rootPath, err := newTestConfig("us-east-1") + if err != nil { + t.Fatalf("Unable to initialize server config. %s", err) + } + defer removeAll(rootPath) + + // Initialize admin peers to make admin RPC calls. + eps, err := parseStorageEndpoints([]string{"http://localhost"}) + if err != nil { + t.Fatalf("Failed to parse storage end point - %v", err) + } + initGlobalAdminPeers(eps) + + testCases := []struct { + bucket string + prefix string + relTime string + expectedStatus int + }{ + // Test 1 - valid testcase + { + bucket: "mybucket", + prefix: "myobject", + relTime: "1s", + expectedStatus: 200, + }, + // Test 2 - invalid duration + { + bucket: "mybucket", + prefix: "myprefix", + relTime: "invalidDuration", + expectedStatus: 400, + }, + // Test 3 - invalid bucket name + { + bucket: `invalid\\Bucket`, + prefix: "myprefix", + relTime: "1h", + expectedStatus: 400, + }, + // Test 4 - invalid prefix + { + bucket: "mybucket", + prefix: `invalid\\Prefix`, + relTime: "1h", + expectedStatus: 400, + }, + } + + adminRouter := router.NewRouter() + registerAdminRouter(adminRouter) + + for i, test := range testCases { + queryStr := fmt.Sprintf("&bucket=%s&prefix=%s&older-than=%s", test.bucket, test.prefix, test.relTime) + req, err := newTestRequest("POST", "/?lock"+queryStr, 0, nil) + if err != nil { + t.Fatalf("Test %d - Failed to construct clear locks request - %v", i+1, err) + } + req.Header.Set(minioAdminOpHeader, "clear") + + cred := serverConfig.GetCredential() + err = signRequestV4(req, cred.AccessKey, cred.SecretKey) + if err != nil { + t.Fatalf("Test %d - Failed to sign clear locks request - %v", i+1, err) + } + rec := httptest.NewRecorder() + adminRouter.ServeHTTP(rec, req) + if test.expectedStatus != rec.Code { + t.Errorf("Test %d - Expected HTTP status code %d but received %d", i+1, test.expectedStatus, rec.Code) + } + } +} + +// Test for lock query param validation helper function. +func TestValidateLockQueryParams(t *testing.T) { + // Sample query values for test cases. + allValidVal := url.Values{} + allValidVal.Set(string(lockBucket), "bucket") + allValidVal.Set(string(lockPrefix), "prefix") + allValidVal.Set(string(lockOlderThan), "1s") + + invalidBucketVal := url.Values{} + invalidBucketVal.Set(string(lockBucket), `invalid\\Bucket`) + invalidBucketVal.Set(string(lockPrefix), "prefix") + invalidBucketVal.Set(string(lockOlderThan), "invalidDuration") + + invalidPrefixVal := url.Values{} + invalidPrefixVal.Set(string(lockBucket), "bucket") + invalidPrefixVal.Set(string(lockPrefix), `invalid\\PRefix`) + invalidPrefixVal.Set(string(lockOlderThan), "invalidDuration") + + invalidOlderThanVal := url.Values{} + invalidOlderThanVal.Set(string(lockBucket), "bucket") + invalidOlderThanVal.Set(string(lockPrefix), "prefix") + invalidOlderThanVal.Set(string(lockOlderThan), "invalidDuration") + + testCases := []struct { + qVals url.Values + apiErr APIErrorCode + }{ + { + qVals: invalidBucketVal, + apiErr: ErrInvalidBucketName, + }, + { + qVals: invalidPrefixVal, + apiErr: ErrInvalidObjectName, + }, + { + qVals: invalidOlderThanVal, + apiErr: ErrInvalidDuration, + }, + { + qVals: allValidVal, + apiErr: ErrNone, + }, + } + + for i, test := range testCases { + _, _, _, apiErr := validateLockQueryParams(test.qVals) + if apiErr != test.apiErr { + t.Errorf("Test %d - Expected error %v but received %v", i+1, test.apiErr, apiErr) + } + } +} diff --git a/cmd/admin-router.go b/cmd/admin-router.go index 60b5c3aeb..e74d2edc0 100644 --- a/cmd/admin-router.go +++ b/cmd/admin-router.go @@ -29,7 +29,7 @@ func registerAdminRouter(mux *router.Router) { // Admin router adminRouter := mux.NewRoute().PathPrefix("/").Subrouter() - /// Admin operations + /// Service operations // Service status adminRouter.Methods("GET").Queries("service", "").Headers(minioAdminOpHeader, "status").HandlerFunc(adminAPI.ServiceStatusHandler) @@ -37,4 +37,12 @@ func registerAdminRouter(mux *router.Router) { adminRouter.Methods("POST").Queries("service", "").Headers(minioAdminOpHeader, "stop").HandlerFunc(adminAPI.ServiceStopHandler) // Service restart adminRouter.Methods("POST").Queries("service", "").Headers(minioAdminOpHeader, "restart").HandlerFunc(adminAPI.ServiceRestartHandler) + + /// Lock operations + + // List Locks + adminRouter.Methods("GET").Queries("lock", "").Headers(minioAdminOpHeader, "list").HandlerFunc(adminAPI.ListLocksHandler) + + // Clear locks + adminRouter.Methods("POST").Queries("lock", "").Headers(minioAdminOpHeader, "clear").HandlerFunc(adminAPI.ClearLocksHandler) } diff --git a/cmd/admin-rpc-client.go b/cmd/admin-rpc-client.go index 14f65626d..3598b1151 100644 --- a/cmd/admin-rpc-client.go +++ b/cmd/admin-rpc-client.go @@ -20,6 +20,7 @@ import ( "net/url" "path" "sync" + "time" ) // localAdminClient - represents admin operation to be executed locally. @@ -32,11 +33,12 @@ type remoteAdminClient struct { *AuthRPCClient } -// stopRestarter - abstracts stop and restart operations for both -// local and remote execution. -type stopRestarter interface { +// adminCmdRunner - abstracts local and remote execution of admin +// commands like service stop and service restart. +type adminCmdRunner interface { Stop() error Restart() error + ListLocks(bucket, prefix string, relTime time.Duration) ([]VolumeLockInfo, error) } // Stop - Sends a message over channel to the go-routine responsible @@ -53,24 +55,43 @@ func (lc localAdminClient) Restart() error { return nil } +// ListLocks - Fetches lock information from local lock instrumentation. +func (lc localAdminClient) ListLocks(bucket, prefix string, relTime time.Duration) ([]VolumeLockInfo, error) { + return listLocksInfo(bucket, prefix, relTime), nil +} + // Stop - Sends stop command to remote server via RPC. func (rc remoteAdminClient) Stop() error { args := AuthRPCArgs{} reply := AuthRPCReply{} - return rc.Call("Service.Shutdown", &args, &reply) + return rc.Call("Admin.Shutdown", &args, &reply) } // Restart - Sends restart command to remote server via RPC. func (rc remoteAdminClient) Restart() error { args := AuthRPCArgs{} reply := AuthRPCReply{} - return rc.Call("Service.Restart", &args, &reply) + return rc.Call("Admin.Restart", &args, &reply) +} + +// ListLocks - Sends list locks command to remote server via RPC. +func (rc remoteAdminClient) ListLocks(bucket, prefix string, relTime time.Duration) ([]VolumeLockInfo, error) { + listArgs := ListLocksQuery{ + bucket: bucket, + prefix: prefix, + relTime: relTime, + } + var reply ListLocksReply + if err := rc.Call("Admin.ListLocks", &listArgs, &reply); err != nil { + return nil, err + } + return reply.volLocks, nil } // adminPeer - represents an entity that implements Stop and Restart methods. type adminPeer struct { - addr string - svcClnt stopRestarter + addr string + cmdRunner adminCmdRunner } // type alias for a collection of adminPeer. @@ -105,13 +126,13 @@ func makeAdminPeers(eps []*url.URL) adminPeers { secretKey: serverCred.SecretKey, serverAddr: ep.Host, secureConn: isSSL(), - serviceEndpoint: path.Join(reservedBucket, servicePath), - serviceName: "Service", + serviceEndpoint: path.Join(reservedBucket, adminPath), + serviceName: "Admin", } servicePeers = append(servicePeers, adminPeer{ - addr: ep.Host, - svcClnt: &remoteAdminClient{newAuthRPCClient(cfg)}, + addr: ep.Host, + cmdRunner: &remoteAdminClient{newAuthRPCClient(cfg)}, }) seenAddr[ep.Host] = true } @@ -129,9 +150,9 @@ func initGlobalAdminPeers(eps []*url.URL) { func invokeServiceCmd(cp adminPeer, cmd serviceSignal) (err error) { switch cmd { case serviceStop: - err = cp.svcClnt.Stop() + err = cp.cmdRunner.Stop() case serviceRestart: - err = cp.svcClnt.Restart() + err = cp.cmdRunner.Restart() } return err } @@ -147,9 +168,58 @@ func sendServiceCmd(cps adminPeers, cmd serviceSignal) { wg.Add(1) go func(idx int) { defer wg.Done() - errs[idx] = invokeServiceCmd(remotePeers[idx], cmd) + // we use idx+1 because remotePeers slice is 1 position shifted w.r.t cps + errs[idx+1] = invokeServiceCmd(remotePeers[idx], cmd) }(i) } wg.Wait() errs[0] = invokeServiceCmd(cps[0], cmd) } + +func listPeerLocksInfo(peers adminPeers, bucket, prefix string, relTime time.Duration) ([]VolumeLockInfo, error) { + // Used to aggregate volume lock information from all nodes. + allLocks := make([][]VolumeLockInfo, len(peers)) + errs := make([]error, len(peers)) + var wg sync.WaitGroup + localPeer := peers[0] + remotePeers := peers[1:] + for i, remotePeer := range remotePeers { + wg.Add(1) + go func(idx int, remotePeer adminPeer) { + defer wg.Done() + // `remotePeers` is right-shifted by one position relative to `peers` + allLocks[idx], errs[idx] = remotePeer.cmdRunner.ListLocks(bucket, prefix, relTime) + }(i+1, remotePeer) + } + wg.Wait() + allLocks[0], errs[0] = localPeer.cmdRunner.ListLocks(bucket, prefix, relTime) + + // Summarizing errors received for ListLocks RPC across all + // nodes. N B the possible unavailability of quorum in errors + // applies only to distributed setup. + errCount, err := reduceErrs(errs, []error{}) + if err != nil { + if errCount >= (len(peers)/2 + 1) { + return nil, err + } + return nil, InsufficientReadQuorum{} + } + + // Group lock information across nodes by (bucket, object) + // pair. For readability only. + paramLockMap := make(map[nsParam][]VolumeLockInfo) + for _, nodeLocks := range allLocks { + for _, lockInfo := range nodeLocks { + param := nsParam{ + volume: lockInfo.Bucket, + path: lockInfo.Object, + } + paramLockMap[param] = append(paramLockMap[param], lockInfo) + } + } + groupedLockInfos := []VolumeLockInfo{} + for _, volLocks := range paramLockMap { + groupedLockInfos = append(groupedLockInfos, volLocks...) + } + return groupedLockInfos, nil +} diff --git a/cmd/admin-rpc-server.go b/cmd/admin-rpc-server.go index 62751d114..0980cef11 100644 --- a/cmd/admin-rpc-server.go +++ b/cmd/admin-rpc-server.go @@ -18,20 +18,35 @@ package cmd import ( "net/rpc" + "time" router "github.com/gorilla/mux" ) -const servicePath = "/admin/service" +const adminPath = "/admin" -// serviceCmd - exports RPC methods for service status, stop and +// adminCmd - exports RPC methods for service status, stop and // restart commands. -type serviceCmd struct { +type adminCmd struct { AuthRPCServer } +// ListLocksQuery - wraps ListLocks API's query values to send over RPC. +type ListLocksQuery struct { + AuthRPCArgs + bucket string + prefix string + relTime time.Duration +} + +// ListLocksReply - wraps ListLocks response over RPC. +type ListLocksReply struct { + AuthRPCReply + volLocks []VolumeLockInfo +} + // Shutdown - Shutdown this instance of minio server. -func (s *serviceCmd) Shutdown(args *AuthRPCArgs, reply *AuthRPCReply) error { +func (s *adminCmd) Shutdown(args *AuthRPCArgs, reply *AuthRPCReply) error { if err := args.IsAuthenticated(); err != nil { return err } @@ -41,7 +56,7 @@ func (s *serviceCmd) Shutdown(args *AuthRPCArgs, reply *AuthRPCReply) error { } // Restart - Restart this instance of minio server. -func (s *serviceCmd) Restart(args *AuthRPCArgs, reply *AuthRPCReply) error { +func (s *adminCmd) Restart(args *AuthRPCArgs, reply *AuthRPCReply) error { if err := args.IsAuthenticated(); err != nil { return err } @@ -50,16 +65,23 @@ func (s *serviceCmd) Restart(args *AuthRPCArgs, reply *AuthRPCReply) error { return nil } +// ListLocks - lists locks held by requests handled by this server instance. +func (s *adminCmd) ListLocks(query *ListLocksQuery, reply *ListLocksReply) error { + volLocks := listLocksInfo(query.bucket, query.prefix, query.relTime) + *reply = ListLocksReply{volLocks: volLocks} + return nil +} + // registerAdminRPCRouter - registers RPC methods for service status, // stop and restart commands. func registerAdminRPCRouter(mux *router.Router) error { - adminRPCHandler := &serviceCmd{} + adminRPCHandler := &adminCmd{} adminRPCServer := rpc.NewServer() - err := adminRPCServer.RegisterName("Service", adminRPCHandler) + err := adminRPCServer.RegisterName("Admin", adminRPCHandler) if err != nil { return traceError(err) } adminRouter := mux.NewRoute().PathPrefix(reservedBucket).Subrouter() - adminRouter.Path(servicePath).Handler(adminRPCServer) + adminRouter.Path(adminPath).Handler(adminRPCServer) return nil } diff --git a/cmd/admin-rpc-server_test.go b/cmd/admin-rpc-server_test.go index 1ebc24568..6a4a36603 100644 --- a/cmd/admin-rpc-server_test.go +++ b/cmd/admin-rpc-server_test.go @@ -28,7 +28,7 @@ func testAdminCmd(cmd cmdType, t *testing.T) { } defer removeAll(rootPath) - adminServer := serviceCmd{} + adminServer := adminCmd{} creds := serverConfig.GetCredential() args := LoginRPCArgs{ Username: creds.AccessKey, diff --git a/cmd/api-errors.go b/cmd/api-errors.go index a3958d250..a23a7e7d6 100644 --- a/cmd/api-errors.go +++ b/cmd/api-errors.go @@ -110,6 +110,7 @@ const ( ErrInvalidQuerySignatureAlgo ErrInvalidQueryParams ErrBucketAlreadyOwnedByYou + ErrInvalidDuration // Add new error codes here. // Bucket notification related errors. @@ -477,6 +478,11 @@ var errorCodeResponse = map[APIErrorCode]APIError{ Description: "Your previous request to create the named bucket succeeded and you already own it.", HTTPStatusCode: http.StatusConflict, }, + ErrInvalidDuration: { + Code: "InvalidDuration", + Description: "Relative duration provided in the request is invalid.", + HTTPStatusCode: http.StatusBadRequest, + }, /// Bucket notification related errors. ErrEventNotification: { diff --git a/cmd/lockinfo-handlers.go b/cmd/lockinfo-handlers.go index 50f7429b6..70dd4e76f 100644 --- a/cmd/lockinfo-handlers.go +++ b/cmd/lockinfo-handlers.go @@ -16,7 +16,10 @@ package cmd -import "time" +import ( + "strings" + "time" +) // SystemLockState - Structure to fill the lock state of entire object storage. // That is the total locks held, total calls blocked on locks and state of all the locks for the entire system. @@ -26,7 +29,7 @@ type SystemLockState struct { // be released. TotalBlockedLocks int64 `json:"totalBlockedLocks"` // Count of operations which has successfully acquired the lock but - // hasn't unlocked yet( operation in progress). + // hasn't unlocked yet (operation in progress). TotalAcquiredLocks int64 `json:"totalAcquiredLocks"` LocksInfoPerObject []VolumeLockInfo `json:"locksInfoPerObject"` } @@ -64,11 +67,13 @@ func getSystemLockState() (SystemLockState, error) { globalNSMutex.lockMapMutex.Lock() defer globalNSMutex.lockMapMutex.Unlock() - lockState := SystemLockState{} - - lockState.TotalBlockedLocks = globalNSMutex.counters.blocked - lockState.TotalLocks = globalNSMutex.counters.total - lockState.TotalAcquiredLocks = globalNSMutex.counters.granted + // Fetch current time once instead of fetching system time for every lock. + timeNow := time.Now().UTC() + lockState := SystemLockState{ + TotalAcquiredLocks: globalNSMutex.counters.granted, + TotalLocks: globalNSMutex.counters.total, + TotalBlockedLocks: globalNSMutex.counters.blocked, + } for param, debugLock := range globalNSMutex.debugLockMap { volLockInfo := VolumeLockInfo{} @@ -84,10 +89,57 @@ func getSystemLockState() (SystemLockState, error) { LockType: lockInfo.lType, Status: lockInfo.status, Since: lockInfo.since, - Duration: time.Now().UTC().Sub(lockInfo.since), + Duration: timeNow.Sub(lockInfo.since), }) } lockState.LocksInfoPerObject = append(lockState.LocksInfoPerObject, volLockInfo) } return lockState, nil } + +// listLocksInfo - Fetches locks held on bucket, matching prefix older than relTime. +func listLocksInfo(bucket, prefix string, relTime time.Duration) []VolumeLockInfo { + globalNSMutex.lockMapMutex.Lock() + defer globalNSMutex.lockMapMutex.Unlock() + + // Fetch current time once instead of fetching system time for every lock. + timeNow := time.Now().UTC() + volumeLocks := []VolumeLockInfo{} + + for param, debugLock := range globalNSMutex.debugLockMap { + if param.volume != bucket { + continue + } + // N B empty prefix matches all param.path. + if !strings.HasPrefix(param.path, prefix) { + continue + } + + volLockInfo := VolumeLockInfo{ + Bucket: param.volume, + Object: param.path, + LocksOnObject: debugLock.counters.total, + TotalBlockedLocks: debugLock.counters.blocked, + LocksAcquiredOnObject: debugLock.counters.granted, + } + // Filter locks that are held on bucket, prefix. + for opsID, lockInfo := range debugLock.lockInfo { + elapsed := timeNow.Sub(lockInfo.since) + if elapsed < relTime { + continue + } + // Add locks that are older than relTime. + volLockInfo.LockDetailsOnObject = append(volLockInfo.LockDetailsOnObject, + OpsLockState{ + OperationID: opsID, + LockSource: lockInfo.lockSource, + LockType: lockInfo.lType, + Status: lockInfo.status, + Since: lockInfo.since, + Duration: elapsed, + }) + volumeLocks = append(volumeLocks, volLockInfo) + } + } + return volumeLocks +} diff --git a/cmd/lockinfo-handlers_test.go b/cmd/lockinfo-handlers_test.go new file mode 100644 index 000000000..78c969151 --- /dev/null +++ b/cmd/lockinfo-handlers_test.go @@ -0,0 +1,81 @@ +/* + * Minio Cloud Storage, (C) 2016 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cmd + +import ( + "fmt" + "testing" + "time" +) + +// TestListLocksInfo - Test for listLocksInfo. +func TestListLocksInfo(t *testing.T) { + // Initialize globalNSMutex to validate listing of lock + // instrumentation information. + isDistXL := false + initNSLock(isDistXL) + + // Acquire a few locks to populate lock instrumentation. + // Take 10 read locks on bucket1/prefix1/obj1 + for i := 0; i < 10; i++ { + readLk := globalNSMutex.NewNSLock("bucket1", "prefix1/obj1") + readLk.RLock() + } + + // Take write locks on bucket1/prefix/obj{11..19} + for i := 0; i < 10; i++ { + wrLk := globalNSMutex.NewNSLock("bucket1", fmt.Sprintf("prefix1/obj%d", 10+i)) + wrLk.Lock() + } + + testCases := []struct { + bucket string + prefix string + relTime time.Duration + numLocks int + }{ + // Test 1 - Matches all the locks acquired above. + { + bucket: "bucket1", + prefix: "prefix1", + relTime: time.Duration(0 * time.Second), + numLocks: 20, + }, + // Test 2 - Bucket doesn't match. + { + bucket: "bucket", + prefix: "prefix1", + relTime: time.Duration(0 * time.Second), + numLocks: 0, + }, + // Test 3 - Prefix doesn't match. + { + bucket: "bucket1", + prefix: "prefix11", + relTime: time.Duration(0 * time.Second), + numLocks: 0, + }, + } + + for i, test := range testCases { + actual := listLocksInfo(test.bucket, test.prefix, test.relTime) + if len(actual) != test.numLocks { + t.Errorf("Test %d - Expected %d locks but observed %d locks", + i+1, test.numLocks, len(actual)) + } + } +} diff --git a/cmd/object-api-utils.go b/cmd/object-api-utils.go index f953a73b9..d07a37c6c 100644 --- a/cmd/object-api-utils.go +++ b/cmd/object-api-utils.go @@ -49,7 +49,7 @@ var isIPAddress = regexp.MustCompile(`^(\d+\.){3}\d+$`) // See: http://docs.aws.amazon.com/AmazonS3/latest/dev/BucketRestrictions.html func IsValidBucketName(bucket string) bool { // Special case when bucket is equal to 'metaBucket'. - if bucket == minioMetaBucket { + if bucket == minioMetaBucket || bucket == minioMetaMultipartBucket { return true } if len(bucket) < 3 || len(bucket) > 63 { diff --git a/docs/admin-api/management-api.md b/docs/admin-api/management-api.md new file mode 100644 index 000000000..2655c1698 --- /dev/null +++ b/docs/admin-api/management-api.md @@ -0,0 +1,82 @@ +# Management REST API + +## Authentication +- AWS signatureV4 +- We use "minio" as region. Here region is set only for signature calculation. + +## List of management APIs +- Service + - Stop + - Restart + - Status + +- Locks + - List + - Clear + +- Healing + +### Service Management APIs +* Stop + - POST /?service + - x-minio-operation: stop + - Response: On success 200 + +* Restart + - POST /?service + - x-minio-operation: restart + - Response: On success 200 + +* Status + - GET /?service + - x-minio-operation: status + - Response: On success 200, return json formatted StorageInfo object. + +### Lock Management APIs +* ListLocks + - GET /?lock&bucket=mybucket&prefix=myprefix&older-than=rel_time + - x-minio-operation: list + - Response: On success 200, json encoded response containing all locks held, older than rel_time. e.g, older than 3 hours. + - Possible error responses + - ErrInvalidBucketName + + InvalidBucketName + The specified bucket is not valid. + + + / + 3L137 + 3L137 + + + - ErrInvalidObjectName + + XMinioInvalidObjectName + Object name contains unsupported characters. Unsupported characters are `^*|\" + + + / + 3L137 + 3L137 + + + - ErrInvalidDuration + + InvalidDuration + Relative duration provided in the request is invalid. + + + / + 3L137 + 3L137 + + + +* ClearLocks + - POST /?lock&bucket=mybucket&prefix=myprefix&older-than=rel_time + - x-minio-operation: clear + - Response: On success 200, json encoded response containing all locks cleared, older than rel_time. e.g, older than 3 hours. + - Possible error responses, similar to errors listed in ListLocks. + - ErrInvalidBucketName + - ErrInvalidObjectName + - ErrInvalidDuration diff --git a/docs/admin-api/service.md b/docs/admin-api/service.md deleted file mode 100644 index bc1e580ab..000000000 --- a/docs/admin-api/service.md +++ /dev/null @@ -1,33 +0,0 @@ -# Service REST API - -## Authentication -- AWS signatureV4 -- We use "minio" as region. Here region is set only for signature calculation. - -## List of management APIs -- Service - - Stop - - Restart - - Status - -- Locks - - List - - Clear - -- Healing - -### Service Management APIs -* Stop - - POST /?service - - x-minio-operation: stop - - Response: On success 200 - -* Restart - - POST /?service - - x-minio-operation: restart - - Response: On success 200 - -* Status - - GET /?service - - x-minio-operation: status - - Response: On success 200, return json formatted StorageInfo object. diff --git a/pkg/madmin/examples/lock-clear.go b/pkg/madmin/examples/lock-clear.go new file mode 100644 index 000000000..2f7a30de4 --- /dev/null +++ b/pkg/madmin/examples/lock-clear.go @@ -0,0 +1,47 @@ +// +build ignore + +/* + * Minio Cloud Storage, (C) 2016 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package main + +import ( + "log" + "time" + + "github.com/minio/minio/pkg/madmin" +) + +func main() { + // Note: YOUR-ACCESSKEYID, YOUR-SECRETACCESSKEY are + // dummy values, please replace them with original values. + + // API requests are secure (HTTPS) if secure=true and insecure (HTTPS) otherwise. + // New returns an Minio Admin client object. + madmClnt, err := madmin.New("your-minio.example.com:9000", "YOUR-ACCESSKEYID", "YOUR-SECRETACCESSKEY", true) + if err != nil { + log.Fatalln(err) + } + + // Clear locks held on mybucket/myprefix older than olderThan seconds. + olderThan := time.Duration(30 * time.Second) + locksCleared, err := madmClnt.ClearLocks("mybucket", "myprefix", olderThan) + if err != nil { + log.Fatalln(err) + } + log.Println(locksCleared) +} diff --git a/pkg/madmin/examples/lock-list.go b/pkg/madmin/examples/lock-list.go new file mode 100644 index 000000000..bc1e9ff08 --- /dev/null +++ b/pkg/madmin/examples/lock-list.go @@ -0,0 +1,46 @@ +// +build ignore + +/* + * Minio Cloud Storage, (C) 2016 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package main + +import ( + "log" + "time" + + "github.com/minio/minio/pkg/madmin" +) + +func main() { + // Note: YOUR-ACCESSKEYID, YOUR-SECRETACCESSKEY are + // dummy values, please replace them with original values. + + // API requests are secure (HTTPS) if secure=true and insecure (HTTPS) otherwise. + // New returns an Minio Admin client object. + madmClnt, err := madmin.New("your-minio.example.com:9000", "YOUR-ACCESSKEYID", "YOUR-SECRETACCESSKEY", true) + if err != nil { + log.Fatalln(err) + } + + // List locks held on mybucket/myprefix older than 30s. + locksHeld, err := madmClnt.ListLocks("mybucket", "myprefix", time.Duration(30*time.Second)) + if err != nil { + log.Fatalln(err) + } + log.Println(locksHeld) +} diff --git a/pkg/madmin/examples/service-restart.go b/pkg/madmin/examples/service-restart.go index 26d991488..ba76e1321 100644 --- a/pkg/madmin/examples/service-restart.go +++ b/pkg/madmin/examples/service-restart.go @@ -29,9 +29,7 @@ func main() { // Note: YOUR-ACCESSKEYID, YOUR-SECRETACCESSKEY are // dummy values, please replace them with original values. - // Requests are always secure (HTTPS) by default. Set secure=false to enable insecure (HTTP) access. - // This boolean value is the last argument for New(). - + // API requests are secure (HTTPS) if secure=true and insecure (HTTPS) otherwise. // New returns an Minio Admin client object. madmClnt, err := madmin.New("your-minio.example.com:9000", "YOUR-ACCESSKEYID", "YOUR-SECRETACCESSKEY", true) if err != nil { diff --git a/pkg/madmin/examples/service-status.go b/pkg/madmin/examples/service-status.go index fbee6fb22..dd8f27faa 100644 --- a/pkg/madmin/examples/service-status.go +++ b/pkg/madmin/examples/service-status.go @@ -29,9 +29,7 @@ func main() { // Note: YOUR-ACCESSKEYID, YOUR-SECRETACCESSKEY and my-bucketname are // dummy values, please replace them with original values. - // Requests are always secure (HTTPS) by default. Set secure=false to enable insecure (HTTP) access. - // This boolean value is the last argument for New(). - + // API requests are secure (HTTPS) if secure=true and insecure (HTTPS) otherwise. // New returns an Minio Admin client object. madmClnt, err := madmin.New("your-minio.example.com:9000", "YOUR-ACCESSKEYID", "YOUR-SECRETACCESSKEY", true) if err != nil { diff --git a/pkg/madmin/examples/service-stop.go b/pkg/madmin/examples/service-stop.go index 056140774..05b446eb2 100644 --- a/pkg/madmin/examples/service-stop.go +++ b/pkg/madmin/examples/service-stop.go @@ -29,9 +29,7 @@ func main() { // Note: YOUR-ACCESSKEYID, YOUR-SECRETACCESSKEY and my-bucketname are // dummy values, please replace them with original values. - // Requests are always secure (HTTPS) by default. Set secure=false to enable insecure (HTTP) access. - // This boolean value is the last argument for New(). - + // API requests are secure (HTTPS) if secure=true and insecure (HTTPS) otherwise. // New returns an Minio Admin client object. madmClnt, err := madmin.New("your-minio.example.com:9000", "YOUR-ACCESSKEYID", "YOUR-SECRETACCESSKEY", true) if err != nil { diff --git a/pkg/madmin/lock-commands.go b/pkg/madmin/lock-commands.go new file mode 100644 index 000000000..1bae3a8df --- /dev/null +++ b/pkg/madmin/lock-commands.go @@ -0,0 +1,151 @@ +/* + * Minio Cloud Storage, (C) 2016 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package madmin + +import ( + "encoding/json" + "errors" + "io" + "io/ioutil" + "net/http" + "net/url" + "time" +) + +type statusType string + +const ( + runningStatus statusType = "Running" + blockedStatus statusType = "Blocked" +) + +type lockType string + +const ( + debugRLockStr lockType = "RLock" + debugWLockStr lockType = "WLock" +) + +// OpsLockState - represents lock specific details. +type OpsLockState struct { + OperationID string `json:"opsID"` // String containing operation ID. + LockSource string `json:"lockSource"` // Operation type (GetObject, PutObject...) + LockType lockType `json:"lockType"` // Lock type (RLock, WLock) + Status statusType `json:"status"` // Status can be Running/Ready/Blocked. + Since time.Time `json:"statusSince"` // Time when the lock was initially held. + Duration time.Duration `json:"statusDuration"` // Duration since the lock was held. +} + +// VolumeLockInfo - represents summary and individual lock details of all +// locks held on a given bucket, object. +type VolumeLockInfo struct { + Bucket string `json:"bucket"` + Object string `json:"object"` + // All locks blocked + running for given pair. + LocksOnObject int64 `json:"locksOnObject"` + // Count of operations which has successfully acquired the lock + // but hasn't unlocked yet( operation in progress). + LocksAcquiredOnObject int64 `json:"locksAcquiredOnObject"` + // Count of operations which are blocked waiting for the lock + // to be released. + TotalBlockedLocks int64 `json:"locksBlockedOnObject"` + // State information containing state of the locks for all operations + // on given pair. + LockDetailsOnObject []OpsLockState `json:"lockDetailsOnObject"` +} + +// getLockInfos - unmarshal []VolumeLockInfo from a reader. +func getLockInfos(body io.Reader) ([]VolumeLockInfo, error) { + respBytes, err := ioutil.ReadAll(body) + if err != nil { + return nil, err + } + + var lockInfos []VolumeLockInfo + + err = json.Unmarshal(respBytes, &lockInfos) + if err != nil { + return nil, err + } + + return lockInfos, nil +} + +// ListLocks - Calls List Locks Management API to fetch locks matching +// bucket, prefix and held before the duration supplied. +func (adm *AdminClient) ListLocks(bucket, prefix string, olderThan time.Duration) ([]VolumeLockInfo, error) { + queryVal := make(url.Values) + queryVal.Set("lock", "") + queryVal.Set("bucket", bucket) + queryVal.Set("prefix", prefix) + queryVal.Set("older-than", olderThan.String()) + + hdrs := make(http.Header) + hdrs.Set(minioAdminOpHeader, "list") + + reqData := requestData{ + queryValues: queryVal, + customHeaders: hdrs, + } + + // Execute GET on /?lock to list locks. + resp, err := adm.executeMethod("GET", reqData) + + defer closeResponse(resp) + if err != nil { + return nil, err + } + + if resp.StatusCode != http.StatusOK { + return nil, errors.New("Got HTTP Status: " + resp.Status) + } + + return getLockInfos(resp.Body) +} + +// ClearLocks - Calls Clear Locks Management API to clear locks held +// on bucket, matching prefix older than duration supplied. +func (adm *AdminClient) ClearLocks(bucket, prefix string, olderThan time.Duration) ([]VolumeLockInfo, error) { + queryVal := make(url.Values) + queryVal.Set("lock", "") + queryVal.Set("bucket", bucket) + queryVal.Set("prefix", prefix) + queryVal.Set("older-than", olderThan.String()) + + hdrs := make(http.Header) + hdrs.Set(minioAdminOpHeader, "clear") + + reqData := requestData{ + queryValues: queryVal, + customHeaders: hdrs, + } + + // Execute POST on /?lock to clear locks. + resp, err := adm.executeMethod("POST", reqData) + + defer closeResponse(resp) + if err != nil { + return nil, err + } + + if resp.StatusCode != http.StatusOK { + return nil, errors.New("Got HTTP Status: " + resp.Status) + } + + return getLockInfos(resp.Body) +} diff --git a/pkg/madmin/lock-commands_test.go b/pkg/madmin/lock-commands_test.go new file mode 100644 index 000000000..447f64cea --- /dev/null +++ b/pkg/madmin/lock-commands_test.go @@ -0,0 +1,61 @@ +/* + * Minio Cloud Storage, (C) 2016 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package madmin + +import ( + "bytes" + "encoding/json" + "reflect" + "testing" +) + +// Test for getLockInfos helper function. +func TestGetLockInfos(t *testing.T) { + testCases := []struct { + // Used to construct a io.Reader holding xml serialized lock information + inputLocks []VolumeLockInfo + }{ + // To build a reader with _no_ lock information. + { + inputLocks: []VolumeLockInfo{}, + }, + // To build a reader with _one_ lock information. + { + inputLocks: []VolumeLockInfo{{Bucket: "bucket", Object: "object"}}, + }, + } + for i, test := range testCases { + jsonBytes, err := json.Marshal(test.inputLocks) + if err != nil { + t.Fatalf("Test %d - Failed to marshal input lockInfos - %v", i+1, err) + } + actualLocks, err := getLockInfos(bytes.NewReader(jsonBytes)) + if err != nil { + t.Fatalf("Test %d - Failed to get lock information - %v", i+1, err) + } + if !reflect.DeepEqual(actualLocks, test.inputLocks) { + t.Errorf("Test %d - Expected %v but received %v", i+1, test.inputLocks, actualLocks) + } + } + + // Invalid json representation of []VolumeLockInfo + _, err := getLockInfos(bytes.NewReader([]byte("invalidBytes"))) + if err == nil { + t.Errorf("Test expected to fail, but passed") + } +} diff --git a/pkg/madmin/service.go b/pkg/madmin/service.go index 27027bc04..6b25a018b 100644 --- a/pkg/madmin/service.go +++ b/pkg/madmin/service.go @@ -77,7 +77,7 @@ func (adm *AdminClient) ServiceStatus() (ServiceStatusMetadata, error) { } if resp.StatusCode != http.StatusOK { - return ServiceStatusMetadata{}, errors.New("Got " + resp.Status) + return ServiceStatusMetadata{}, errors.New("Got HTTP Status: " + resp.Status) } respBytes, err := ioutil.ReadAll(resp.Body) @@ -113,7 +113,7 @@ func (adm *AdminClient) ServiceStop() error { } if resp.StatusCode != http.StatusOK { - return errors.New("Got " + resp.Status) + return errors.New("Got HTTP Status: " + resp.Status) } return nil @@ -137,7 +137,7 @@ func (adm *AdminClient) ServiceRestart() error { } if resp.StatusCode != http.StatusOK { - return errors.New("Got " + resp.Status) + return errors.New("Got HTTP Status: " + resp.Status) } return nil }