diff --git a/cmd/admin-handlers.go b/cmd/admin-handlers.go index c5231ba68..d549d81b9 100644 --- a/cmd/admin-handlers.go +++ b/cmd/admin-handlers.go @@ -36,16 +36,19 @@ const ( // Type-safe query params. type mgmtQueryKey string -// Only valid query params for list/clear locks management APIs. +// Only valid query params for mgmt admin APIs. const ( - mgmtBucket mgmtQueryKey = "bucket" - mgmtObject mgmtQueryKey = "object" - mgmtPrefix mgmtQueryKey = "prefix" - mgmtLockDuration mgmtQueryKey = "duration" - mgmtDelimiter mgmtQueryKey = "delimiter" - mgmtMarker mgmtQueryKey = "marker" - mgmtMaxKey mgmtQueryKey = "max-key" - mgmtDryRun mgmtQueryKey = "dry-run" + mgmtBucket mgmtQueryKey = "bucket" + mgmtObject mgmtQueryKey = "object" + mgmtPrefix mgmtQueryKey = "prefix" + mgmtLockDuration mgmtQueryKey = "duration" + mgmtDelimiter mgmtQueryKey = "delimiter" + mgmtMarker mgmtQueryKey = "marker" + mgmtKeyMarker mgmtQueryKey = "key-marker" + mgmtMaxKey mgmtQueryKey = "max-key" + mgmtDryRun mgmtQueryKey = "dry-run" + mgmtUploadIDMarker mgmtQueryKey = "upload-id-marker" + mgmtMaxUploads mgmtQueryKey = "max-uploads" ) // ServerVersion - server version @@ -400,8 +403,57 @@ func (adminAPI adminAPIHandlers) ClearLocksHandler(w http.ResponseWriter, r *htt writeSuccessResponseJSON(w, jsonBytes) } -// validateHealQueryParams - Validates query params for heal list management API. -func validateHealQueryParams(vars url.Values) (string, string, string, string, int, APIErrorCode) { +// ListUploadsHealHandler - similar to listObjectsHealHandler +// GET +// /?heal&bucket=mybucket&prefix=myprefix&key-marker=mymarker&upload-id-marker=myuploadid&delimiter=mydelimiter&max-uploads=1000 +// - bucket is mandatory query parameter +// - rest are optional query parameters List upto maxKey objects that +// need healing in a given bucket matching the given prefix. +func (adminAPI adminAPIHandlers) ListUploadsHealHandler(w http.ResponseWriter, r *http.Request) { + // Get object layer instance. + objLayer := newObjectLayerFn() + if objLayer == nil { + writeErrorResponse(w, ErrServerNotInitialized, r.URL) + return + } + + // Validate request signature. + adminAPIErr := checkRequestAuthType(r, "", "", "") + if adminAPIErr != ErrNone { + writeErrorResponse(w, adminAPIErr, r.URL) + return + } + + // Validate query params. + vars := r.URL.Query() + bucket := vars.Get(string(mgmtBucket)) + prefix, keyMarker, uploadIDMarker, delimiter, maxUploads, _ := getBucketMultipartResources(r.URL.Query()) + + if err := checkListMultipartArgs(bucket, prefix, keyMarker, uploadIDMarker, delimiter, objLayer); err != nil { + writeErrorResponse(w, toAPIErrorCode(err), r.URL) + return + } + + if maxUploads <= 0 || maxUploads > maxUploadsList { + writeErrorResponse(w, ErrInvalidMaxUploads, r.URL) + return + } + + // Get the list objects to be healed. + listMultipartInfos, err := objLayer.ListUploadsHeal(bucket, prefix, + keyMarker, uploadIDMarker, delimiter, maxUploads) + if err != nil { + writeErrorResponse(w, toAPIErrorCode(err), r.URL) + return + } + + listResponse := generateListMultipartUploadsResponse(bucket, listMultipartInfos) + // Write success response. + writeSuccessResponseXML(w, encodeResponse(listResponse)) +} + +// extractListObjectsHealQuery - Validates query params for heal objects list management API. +func extractListObjectsHealQuery(vars url.Values) (string, string, string, string, int, APIErrorCode) { bucket := vars.Get(string(mgmtBucket)) prefix := vars.Get(string(mgmtPrefix)) marker := vars.Get(string(mgmtMarker)) @@ -418,10 +470,13 @@ func validateHealQueryParams(vars url.Values) (string, string, string, string, i return "", "", "", "", 0, ErrInvalidObjectName } - // check if maxKey is a valid integer. - maxKey, err := strconv.Atoi(maxKeyStr) - if err != nil { - return "", "", "", "", 0, ErrInvalidMaxKeys + // check if maxKey is a valid integer, if present. + var maxKey int + var err error + if maxKeyStr != "" { + if maxKey, err = strconv.Atoi(maxKeyStr); err != nil { + return "", "", "", "", 0, ErrInvalidMaxKeys + } } // Validate prefix, marker, delimiter and maxKey. @@ -454,7 +509,7 @@ func (adminAPI adminAPIHandlers) ListObjectsHealHandler(w http.ResponseWriter, r // Validate query params. vars := r.URL.Query() - bucket, prefix, marker, delimiter, maxKey, adminAPIErr := validateHealQueryParams(vars) + bucket, prefix, marker, delimiter, maxKey, adminAPIErr := extractListObjectsHealQuery(vars) if adminAPIErr != ErrNone { writeErrorResponse(w, adminAPIErr, r.URL) return diff --git a/cmd/admin-handlers_test.go b/cmd/admin-handlers_test.go index b3128cfc8..187c7c0a6 100644 --- a/cmd/admin-handlers_test.go +++ b/cmd/admin-handlers_test.go @@ -740,7 +740,7 @@ func TestValidateHealQueryParams(t *testing.T) { } for i, test := range testCases { vars := mkListObjectsQueryVal(test.bucket, test.prefix, test.marker, test.delimiter, test.maxKeys) - _, _, _, _, _, actualErr := validateHealQueryParams(vars) + _, _, _, _, _, actualErr := extractListObjectsHealQuery(vars) if actualErr != test.apiErr { t.Errorf("Test %d - Expected %v but received %v", i+1, getAPIError(test.apiErr), getAPIError(actualErr)) @@ -856,9 +856,6 @@ func TestListObjectsHealHandler(t *testing.T) { } for i, test := range testCases { - if i != 0 { - continue - } queryVal := mkListObjectsQueryVal(test.bucket, test.prefix, test.marker, test.delimiter, test.maxKeys) req, err := newTestRequest("GET", "/?"+queryVal.Encode(), 0, nil) if err != nil { @@ -1290,3 +1287,137 @@ func TestWriteSetConfigResponse(t *testing.T) { } } } + +// mkUploadsHealQuery - helper function to construct query values for +// listUploadsHeal. +func mkUploadsHealQuery(bucket, prefix, keyMarker, uploadIDMarker, delimiter, maxUploadsStr string) url.Values { + + queryVal := make(url.Values) + queryVal.Set("heal", "") + queryVal.Set(string(mgmtBucket), bucket) + queryVal.Set(string(mgmtPrefix), prefix) + queryVal.Set(string(mgmtKeyMarker), keyMarker) + queryVal.Set(string(mgmtUploadIDMarker), uploadIDMarker) + queryVal.Set(string(mgmtDelimiter), delimiter) + queryVal.Set(string(mgmtMaxUploads), maxUploadsStr) + return queryVal +} + +func TestListHealUploadsHandler(t *testing.T) { + adminTestBed, err := prepareAdminXLTestBed() + if err != nil { + t.Fatal("Failed to initialize a single node XL backend for admin handler tests.") + } + defer adminTestBed.TearDown() + + err = adminTestBed.objLayer.MakeBucket("mybucket") + if err != nil { + t.Fatalf("Failed to make bucket - %v", err) + } + + // Delete bucket after running all test cases. + defer adminTestBed.objLayer.DeleteBucket("mybucket") + + testCases := []struct { + bucket string + prefix string + keyMarker string + delimiter string + maxKeys string + statusCode int + }{ + // 1. Valid params. + { + bucket: "mybucket", + prefix: "prefix", + keyMarker: "prefix11", + delimiter: "/", + maxKeys: "10", + statusCode: http.StatusOK, + }, + // 2. Valid params with empty prefix. + { + bucket: "mybucket", + prefix: "", + keyMarker: "", + delimiter: "/", + maxKeys: "10", + statusCode: http.StatusOK, + }, + // 3. Invalid params with invalid bucket. + { + bucket: `invalid\\Bucket`, + prefix: "prefix", + keyMarker: "prefix11", + delimiter: "/", + maxKeys: "10", + statusCode: getAPIError(ErrInvalidBucketName).HTTPStatusCode, + }, + // 4. Invalid params with invalid prefix. + { + bucket: "mybucket", + prefix: `invalid\\Prefix`, + keyMarker: "prefix11", + delimiter: "/", + maxKeys: "10", + statusCode: getAPIError(ErrInvalidObjectName).HTTPStatusCode, + }, + // 5. Invalid params with invalid maxKeys. + { + bucket: "mybucket", + prefix: "prefix", + keyMarker: "prefix11", + delimiter: "/", + maxKeys: "-1", + statusCode: getAPIError(ErrInvalidMaxUploads).HTTPStatusCode, + }, + // 6. Invalid params with unsupported prefix marker combination. + { + bucket: "mybucket", + prefix: "prefix", + keyMarker: "notmatchingmarker", + delimiter: "/", + maxKeys: "10", + statusCode: getAPIError(ErrNotImplemented).HTTPStatusCode, + }, + // 7. Invalid params with unsupported delimiter. + { + bucket: "mybucket", + prefix: "prefix", + keyMarker: "notmatchingmarker", + delimiter: "unsupported", + maxKeys: "10", + statusCode: getAPIError(ErrNotImplemented).HTTPStatusCode, + }, + // 8. Invalid params with invalid max Keys + { + bucket: "mybucket", + prefix: "prefix", + keyMarker: "prefix11", + delimiter: "/", + maxKeys: "999999999999999999999999999", + statusCode: getAPIError(ErrInvalidMaxUploads).HTTPStatusCode, + }, + } + + for i, test := range testCases { + queryVal := mkUploadsHealQuery(test.bucket, test.prefix, test.keyMarker, "", test.delimiter, test.maxKeys) + + req, err := newTestRequest("GET", "/?"+queryVal.Encode(), 0, nil) + if err != nil { + t.Fatalf("Test %d - Failed to construct list uploads needing heal request - %v", i+1, err) + } + req.Header.Set(minioAdminOpHeader, "list-uploads") + + cred := serverConfig.GetCredential() + err = signRequestV4(req, cred.AccessKey, cred.SecretKey) + if err != nil { + t.Fatalf("Test %d - Failed to sign list uploads needing heal request - %v", i+1, err) + } + rec := httptest.NewRecorder() + adminTestBed.mux.ServeHTTP(rec, req) + if test.statusCode != rec.Code { + t.Errorf("Test %d - Expected HTTP status code %d but received %d", i+1, test.statusCode, rec.Code) + } + } +} diff --git a/cmd/admin-router.go b/cmd/admin-router.go index fa0b805b2..b65c4eda2 100644 --- a/cmd/admin-router.go +++ b/cmd/admin-router.go @@ -53,6 +53,8 @@ func registerAdminRouter(mux *router.Router) { // List Objects needing heal. adminRouter.Methods("GET").Queries("heal", "").Headers(minioAdminOpHeader, "list-objects").HandlerFunc(adminAPI.ListObjectsHealHandler) + // List Uploads needing heal. + adminRouter.Methods("GET").Queries("heal", "").Headers(minioAdminOpHeader, "list-uploads").HandlerFunc(adminAPI.ListUploadsHealHandler) // List Buckets needing heal. adminRouter.Methods("GET").Queries("heal", "").Headers(minioAdminOpHeader, "list-buckets").HandlerFunc(adminAPI.ListBucketsHealHandler) diff --git a/cmd/api-response.go b/cmd/api-response.go index 7f0641bb2..bc95cab60 100644 --- a/cmd/api-response.go +++ b/cmd/api-response.go @@ -166,12 +166,13 @@ type ListBucketsResponse struct { // Upload container for in progress multipart upload type Upload struct { - Key string - UploadID string `xml:"UploadId"` - Initiator Initiator - Owner Owner - StorageClass string - Initiated string + Key string + UploadID string `xml:"UploadId"` + Initiator Initiator + Owner Owner + StorageClass string + Initiated string + HealUploadInfo *HealObjectInfo `xml:"HealObjectInfo,omitempty"` } // CommonPrefix container for prefix response in ListObjectsResponse @@ -488,6 +489,7 @@ func generateListMultipartUploadsResponse(bucket string, multipartsInfo ListMult newUpload.UploadID = upload.UploadID newUpload.Key = upload.Object newUpload.Initiated = upload.Initiated.UTC().Format(timeFormatAMZLong) + newUpload.HealUploadInfo = upload.HealUploadInfo listMultipartUploadsResponse.Uploads[index] = newUpload } return listMultipartUploadsResponse diff --git a/cmd/fs-v1.go b/cmd/fs-v1.go index fc1cd2d1b..b9d59a1ac 100644 --- a/cmd/fs-v1.go +++ b/cmd/fs-v1.go @@ -827,3 +827,8 @@ func (fs fsObjects) ListObjectsHeal(bucket, prefix, marker, delimiter string, ma func (fs fsObjects) ListBucketsHeal() ([]BucketInfo, error) { return []BucketInfo{}, traceError(NotImplemented{}) } + +func (fs fsObjects) ListUploadsHeal(bucket, prefix, marker, uploadIDMarker, + delimiter string, maxUploads int) (ListMultipartsInfo, error) { + return ListMultipartsInfo{}, traceError(NotImplemented{}) +} diff --git a/cmd/object-api-datatypes.go b/cmd/object-api-datatypes.go index 307220f4d..17d8d9366 100644 --- a/cmd/object-api-datatypes.go +++ b/cmd/object-api-datatypes.go @@ -248,6 +248,8 @@ type uploadMetadata struct { Initiated time.Time StorageClass string // Not supported yet. + + HealUploadInfo *HealObjectInfo `xml:"HealUploadInfo,omitempty"` } // completePart - completed part container. diff --git a/cmd/object-api-interface.go b/cmd/object-api-interface.go index 4938af274..ca60e728c 100644 --- a/cmd/object-api-interface.go +++ b/cmd/object-api-interface.go @@ -52,4 +52,6 @@ type ObjectLayer interface { ListBucketsHeal() (buckets []BucketInfo, err error) HealObject(bucket, object string) error ListObjectsHeal(bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, error) + ListUploadsHeal(bucket, prefix, marker, uploadIDMarker, + delimiter string, maxUploads int) (ListMultipartsInfo, error) } diff --git a/cmd/xl-v1-list-objects-heal.go b/cmd/xl-v1-list-objects-heal.go index d48470974..94362d75c 100644 --- a/cmd/xl-v1-list-objects-heal.go +++ b/cmd/xl-v1-list-objects-heal.go @@ -17,6 +17,7 @@ package cmd import ( + "path/filepath" "sort" "strings" ) @@ -205,3 +206,219 @@ func (xl xlObjects) ListObjectsHeal(bucket, prefix, marker, delimiter string, ma // Return error at the end. return ListObjectsInfo{}, toObjectErr(err, bucket, prefix) } + +// ListUploadsHeal - lists ongoing multipart uploads that require +// healing in one or more disks. +func (xl xlObjects) ListUploadsHeal(bucket, prefix, marker, uploadIDMarker, + delimiter string, maxUploads int) (ListMultipartsInfo, error) { + + // For delimiter and prefix as '/' we do not list anything at all + // since according to s3 spec we stop at the 'delimiter' along + // with the prefix. On a flat namespace with 'prefix' as '/' + // we don't have any entries, since all the keys are of form 'keyName/...' + if delimiter == slashSeparator && prefix == slashSeparator { + return ListMultipartsInfo{}, nil + } + + // Initiate a list operation. + listMultipartInfo, err := xl.listMultipartUploadsHeal(bucket, prefix, + marker, uploadIDMarker, delimiter, maxUploads) + if err != nil { + return ListMultipartsInfo{}, toObjectErr(err, bucket, prefix) + } + + // We got the entries successfully return. + return listMultipartInfo, nil +} + +// Fetches list of multipart uploadIDs given bucket, keyMarker, uploadIDMarker. +func fetchMultipartUploadIDs(bucket, keyMarker, uploadIDMarker string, + maxUploads int, disks []StorageAPI) (uploads []uploadMetadata, end bool, + err error) { + + // Hold a read lock on keyMarker path. + keyMarkerLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket, + pathJoin(bucket, keyMarker)) + keyMarkerLock.RLock() + for _, disk := range disks { + if disk == nil { + continue + } + uploads, end, err = listMultipartUploadIDs(bucket, keyMarker, + uploadIDMarker, maxUploads, disk) + if err == nil || + !isErrIgnored(err, objMetadataOpIgnoredErrs...) { + break + } + } + keyMarkerLock.RUnlock() + return uploads, end, err +} + +// listMultipartUploadsHeal - Returns a list of incomplete multipart +// uploads that need to be healed. +func (xl xlObjects) listMultipartUploadsHeal(bucket, prefix, keyMarker, + uploadIDMarker, delimiter string, maxUploads int) (ListMultipartsInfo, error) { + + result := ListMultipartsInfo{ + IsTruncated: true, + MaxUploads: maxUploads, + KeyMarker: keyMarker, + Prefix: prefix, + Delimiter: delimiter, + } + + recursive := delimiter != slashSeparator + + var uploads []uploadMetadata + var err error + // List all upload ids for the given keyMarker, starting from + // uploadIDMarker. + if uploadIDMarker != "" { + uploads, _, err = fetchMultipartUploadIDs(bucket, keyMarker, + uploadIDMarker, maxUploads, xl.getLoadBalancedDisks()) + if err != nil { + return ListMultipartsInfo{}, err + } + maxUploads = maxUploads - len(uploads) + } + + // We can't use path.Join() as it strips off the trailing '/'. + multipartPrefixPath := pathJoin(bucket, prefix) + // multipartPrefixPath should have a trailing '/' when prefix = "". + if prefix == "" { + multipartPrefixPath += slashSeparator + } + + multipartMarkerPath := "" + if keyMarker != "" { + multipartMarkerPath = pathJoin(bucket, keyMarker) + } + + // `heal bool` is used to differentiate listing of incomplete + // uploads (and parts) from a regular listing of incomplete + // parts by client SDKs or mc-like commands, within a treewalk + // pool. + heal := true + // The listing is truncated if we have maxUploads entries and + // there are more entries to be listed. + truncated := true + var walkerCh chan treeWalkResult + var walkerDoneCh chan struct{} + // Check if we have room left to send more uploads. + if maxUploads > 0 { + walkerCh, walkerDoneCh = xl.listPool.Release(listParams{ + bucket: minioMetaMultipartBucket, + recursive: recursive, + marker: multipartMarkerPath, + prefix: multipartPrefixPath, + heal: heal, + }) + if walkerCh == nil { + walkerDoneCh = make(chan struct{}) + isLeaf := xl.isMultipartUpload + listDir := listDirFactory(isLeaf, xlTreeWalkIgnoredErrs, + xl.getLoadBalancedDisks()...) + walkerCh = startTreeWalk(minioMetaMultipartBucket, + multipartPrefixPath, multipartMarkerPath, + recursive, listDir, isLeaf, walkerDoneCh) + } + // Collect uploads until maxUploads limit is reached. + for walkResult := range walkerCh { + // Ignore errors like errDiskNotFound + // and errFileNotFound. + if isErrIgnored(walkResult.err, + xlTreeWalkIgnoredErrs...) { + continue + } + // For any error during tree walk we should + // return right away. + if walkResult.err != nil { + return ListMultipartsInfo{}, walkResult.err + } + + entry := strings.TrimPrefix(walkResult.entry, + retainSlash(bucket)) + // Skip entries that are not object directory. + if hasSuffix(walkResult.entry, slashSeparator) { + uploads = append(uploads, uploadMetadata{ + Object: entry, + }) + maxUploads-- + if maxUploads == 0 { + break + } + continue + } + + // For an object entry we get all its pending + // uploadIDs. + var newUploads []uploadMetadata + var end bool + uploadIDMarker = "" + newUploads, end, err = fetchMultipartUploadIDs(bucket, entry, uploadIDMarker, + maxUploads, xl.getLoadBalancedDisks()) + if err != nil { + return ListMultipartsInfo{}, err + } + uploads = append(uploads, newUploads...) + maxUploads -= len(newUploads) + if end && walkResult.end { + truncated = false + break + } + if maxUploads == 0 { + break + } + } + } + + // For all received uploads fill in the multiparts result. + for _, upload := range uploads { + var objectName string + var uploadID string + if hasSuffix(upload.Object, slashSeparator) { + // All directory entries are common + // prefixes. For common prefixes, upload ids + // are empty. + uploadID = "" + objectName = upload.Object + result.CommonPrefixes = append(result.CommonPrefixes, objectName) + } else { + // Check if upload needs healing. + uploadIDPath := filepath.Join(bucket, upload.Object, upload.UploadID) + partsMetadata, errs := readAllXLMetadata(xl.storageDisks, + minioMetaMultipartBucket, uploadIDPath) + if xlShouldHeal(partsMetadata, errs) { + healUploadInfo := xlHealStat(xl, partsMetadata, errs) + upload.HealUploadInfo = &healUploadInfo + result.Uploads = append(result.Uploads, upload) + } + uploadID = upload.UploadID + objectName = upload.Object + } + + result.NextKeyMarker = objectName + result.NextUploadIDMarker = uploadID + } + + if truncated { + // Put back the tree walk go-routine into the pool for + // subsequent use. + xl.listPool.Set(listParams{ + bucket: bucket, + recursive: recursive, + marker: result.NextKeyMarker, + prefix: prefix, + heal: heal, + }, walkerCh, walkerDoneCh) + } + + result.IsTruncated = truncated + // Result is not truncated, reset the markers. + if !result.IsTruncated { + result.NextKeyMarker = "" + result.NextUploadIDMarker = "" + } + return result, nil +} diff --git a/cmd/xl-v1-list-objects-heal_test.go b/cmd/xl-v1-list-objects-heal_test.go index fb515e325..af12060b2 100644 --- a/cmd/xl-v1-list-objects-heal_test.go +++ b/cmd/xl-v1-list-objects-heal_test.go @@ -18,6 +18,8 @@ package cmd import ( "bytes" + "path" + "path/filepath" "strconv" "testing" ) @@ -139,3 +141,78 @@ func TestListObjectsHeal(t *testing.T) { } } + +// Test for ListUploadsHeal API for XL. +func TestListUploadsHeal(t *testing.T) { + initNSLock(false) + + rootPath, err := newTestConfig(globalMinioDefaultRegion) + if err != nil { + t.Fatalf("Init Test config failed") + } + // Remove config directory after the test ends. + defer removeAll(rootPath) + + // Create an instance of XL backend. + xl, fsDirs, err := prepareXL() + if err != nil { + t.Fatal(err) + } + // Cleanup backend directories on function return. + defer removeRoots(fsDirs) + + bucketName := "bucket" + prefix := "prefix" + objName := path.Join(prefix, "obj") + + // Create test bucket. + err = xl.MakeBucket(bucketName) + if err != nil { + t.Fatal(err) + } + + // Create a new multipart upload. + uploadID, err := xl.NewMultipartUpload(bucketName, objName, nil) + if err != nil { + t.Fatal(err) + } + + // Upload a part. + data := bytes.Repeat([]byte("a"), 1024) + _, err = xl.PutObjectPart(bucketName, objName, uploadID, 1, + int64(len(data)), bytes.NewReader(data), "", "") + if err != nil { + t.Fatal(err) + } + + // Check if list uploads heal returns any uploads to be healed + // incorrectly. + listUploadsInfo, err := xl.ListUploadsHeal(bucketName, prefix, "", "", "", 1000) + if err != nil { + t.Fatal(err) + } + + // All uploads intact nothing to heal. + if len(listUploadsInfo.Uploads) != 0 { + t.Errorf("Expected no uploads but received %d", len(listUploadsInfo.Uploads)) + } + + // Delete the part from the first disk to make the upload (and + // its part) to appear in upload heal listing. + firstDisk := xl.(*xlObjects).storageDisks[0] + err = firstDisk.DeleteFile(minioMetaMultipartBucket, + filepath.Join(bucketName, objName, uploadID, xlMetaJSONFile)) + if err != nil { + t.Fatal(err) + } + + listUploadsInfo, err = xl.ListUploadsHeal(bucketName, prefix, "", "", "", 1000) + if err != nil { + t.Fatal(err) + } + + // One upload with missing xl.json on first disk. + if len(listUploadsInfo.Uploads) != 1 { + t.Errorf("Expected 1 upload but received %d", len(listUploadsInfo.Uploads)) + } +} diff --git a/pkg/madmin/API.md b/pkg/madmin/API.md index 8a6222bce..7d5ba2a5e 100644 --- a/pkg/madmin/API.md +++ b/pkg/madmin/API.md @@ -43,6 +43,7 @@ func main() { | | |[`HealBucket`](#HealBucket) ||| | | |[`HealObject`](#HealObject)||| | | |[`HealFormat`](#HealFormat)||| +| | |[`ListUploadsHeal`](#ListUploadsHeal)||| ## 1. Constructor @@ -353,3 +354,46 @@ __Example__ } log.Println("SetConfig: ", string(buf.Bytes())) ``` + + +### ListUploadsHeal(bucket, prefix string, recursive bool, doneCh <-chan struct{}) (<-chan UploadInfo, error) +List ongoing multipart uploads that need healing. + +| Param | Type | Description | +|---|---|---| +|`ui.Key` | _string_ | Name of the object being uploaded | +|`ui.UploadID` | _string_ | UploadID of the ongoing multipart upload | +|`ui.HealUploadInfo.Status` | _healStatus_| One of `Healthy`, `CanHeal`, `Corrupted`, `QuorumUnavailable`| +|`ui.Err`| _error_ | non-nil if fetching fetching healing information failed | + +__Example__ + +``` go + + // Set true if recursive listing is needed. + isRecursive := true + // List objects that need healing for a given bucket and + // prefix. + healUploadsCh, err := madmClnt.ListUploadsHeal(bucket, prefix, isRecursive, doneCh) + if err != nil { + log.Fatalln("Failed to get list of uploads to be healed: ", err) + } + + for upload := range healUploadsCh { + if upload.Err != nil { + log.Println("upload listing error: ", upload.Err) + } + + if upload.HealUploadInfo != nil { + switch healInfo := *upload.HealUploadInfo; healInfo.Status { + case madmin.CanHeal: + fmt.Println(upload.Key, " can be healed.") + case madmin.QuorumUnavailable: + fmt.Println(upload.Key, " can't be healed until quorum is available.") + case madmin.Corrupted: + fmt.Println(upload.Key, " can't be healed, not enough information.") + } + } + } + +``` diff --git a/pkg/madmin/examples/heal-uploads-list.go b/pkg/madmin/examples/heal-uploads-list.go new file mode 100644 index 000000000..08da11c02 --- /dev/null +++ b/pkg/madmin/examples/heal-uploads-list.go @@ -0,0 +1,74 @@ +// +build ignore + +package main + +/* + * Minio Cloud Storage, (C) 2017 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import ( + "fmt" + "log" + + "github.com/minio/minio/pkg/madmin" +) + +func main() { + + // Note: YOUR-ACCESSKEYID, YOUR-SECRETACCESSKEY are + // dummy values, please replace them with original values. + + // API requests are secure (HTTPS) if secure=true and insecure (HTTPS) otherwise. + // New returns an Minio Admin client object. + madmClnt, err := madmin.New("your-minio.example.com:9000", "YOUR-ACCESSKEYID", "YOUR-SECRETACCESSKEY", true) + if err != nil { + log.Fatalln(err) + } + + bucket := "mybucket" + prefix := "myprefix" + + // Create a done channel to control 'ListUploadsHeal' go routine. + doneCh := make(chan struct{}) + // Indicate to our routine to exit cleanly upon return. + defer close(doneCh) + + // Set true if recursive listing is needed. + isRecursive := true + // List objects that need healing for a given bucket and + // prefix. + healUploadsCh, err := madmClnt.ListUploadsHeal(bucket, prefix, isRecursive, doneCh) + if err != nil { + log.Fatalln("Failed to get list of uploads to be healed: ", err) + } + + for upload := range healUploadsCh { + if upload.Err != nil { + log.Println("upload listing error: ", upload.Err) + } + + if upload.HealUploadInfo != nil { + switch healInfo := *upload.HealUploadInfo; healInfo.Status { + case madmin.CanHeal: + fmt.Println(upload.Key, " can be healed.") + case madmin.QuorumUnavailable: + fmt.Println(upload.Key, " can't be healed until quorum is available.") + case madmin.Corrupted: + fmt.Println(upload.Key, " can't be healed, not enough information.") + } + } + } +} diff --git a/pkg/madmin/heal-commands.go b/pkg/madmin/heal-commands.go index 3dea72ccf..86042157c 100644 --- a/pkg/madmin/heal-commands.go +++ b/pkg/madmin/heal-commands.go @@ -3,6 +3,8 @@ * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. + + * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 @@ -25,6 +27,10 @@ import ( "time" ) +const ( + maxUploadsList = 1000 +) + // listBucketHealResult container for listObjects response. type listBucketHealResult struct { // A response can contain CommonPrefixes only if you have @@ -156,16 +162,75 @@ type ObjectInfo struct { HealObjectInfo *HealObjectInfo `json:"healObjectInfo,omitempty"` } +// UploadInfo - represents an ongoing upload that needs to be healed. +type UploadInfo struct { + Key string `json:"name"` // Name of the object being uploaded. + + UploadID string `json:"uploadId"` // UploadID + // Owner name. + Owner struct { + DisplayName string `json:"name"` + ID string `json:"id"` + } `json:"owner"` + + // The class of storage used to store the object. + StorageClass string `json:"storageClass"` + + Initiated time.Time `json:"initiated"` // Time at which upload was initiated. + + // Error + Err error `json:"-"` + HealUploadInfo *HealObjectInfo `json:"healObjectInfo,omitempty"` +} + +// Initiator - has same properties as Owner. +type Initiator Owner + +// upload - represents an ongoing multipart upload. +type upload struct { + Key string + UploadID string `xml:"UploadId"` + Initiator Initiator + Owner Owner + StorageClass string + Initiated time.Time + HealUploadInfo *HealObjectInfo `xml:"HealObjectInfo,omitempty"` +} + +// listUploadsHealResponse - represents ListUploadsHeal response. +type listUploadsHealResponse struct { + XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ ListMultipartUploadsResult" json:"-"` + + Bucket string + KeyMarker string + UploadIDMarker string `xml:"UploadIdMarker"` + NextKeyMarker string + NextUploadIDMarker string `xml:"NextUploadIdMarker"` + Delimiter string + Prefix string + EncodingType string `xml:"EncodingType,omitempty"` + MaxUploads int + IsTruncated bool + + // List of pending uploads. + Uploads []upload `xml:"Upload"` + + // Delimed common prefixes. + CommonPrefixes []commonPrefix +} + type healQueryKey string const ( - healBucket healQueryKey = "bucket" - healObject healQueryKey = "object" - healPrefix healQueryKey = "prefix" - healMarker healQueryKey = "marker" - healDelimiter healQueryKey = "delimiter" - healMaxKey healQueryKey = "max-key" - healDryRun healQueryKey = "dry-run" + healBucket healQueryKey = "bucket" + healObject healQueryKey = "object" + healPrefix healQueryKey = "prefix" + healMarker healQueryKey = "marker" + healDelimiter healQueryKey = "delimiter" + healMaxKey healQueryKey = "max-key" + healDryRun healQueryKey = "dry-run" + healUploadIDMarker healQueryKey = "upload-id-marker" + healMaxUpload healQueryKey = "max-uploads" ) // mkHealQueryVal - helper function to construct heal REST API query params. @@ -432,3 +497,139 @@ func (adm *AdminClient) HealFormat(dryrun bool) error { return nil } + +// mkUploadsHealQuery - helper function to construct query params for +// ListUploadsHeal API. +func mkUploadsHealQuery(bucket, prefix, marker, uploadIDMarker, delimiter, maxUploadsStr string) url.Values { + + queryVal := make(url.Values) + queryVal.Set("heal", "") + queryVal.Set(string(healBucket), bucket) + queryVal.Set(string(healPrefix), prefix) + queryVal.Set(string(healMarker), marker) + queryVal.Set(string(healUploadIDMarker), uploadIDMarker) + queryVal.Set(string(healDelimiter), delimiter) + queryVal.Set(string(healMaxUpload), maxUploadsStr) + return queryVal +} + +func (adm *AdminClient) listUploadsHeal(bucket, prefix, marker, uploadIDMarker, delimiter string, maxUploads int) (listUploadsHealResponse, error) { + // Construct query params. + maxUploadsStr := fmt.Sprintf("%d", maxUploads) + queryVal := mkUploadsHealQuery(bucket, prefix, marker, uploadIDMarker, delimiter, maxUploadsStr) + + hdrs := make(http.Header) + hdrs.Set(minioAdminOpHeader, "list-uploads") + + reqData := requestData{ + queryValues: queryVal, + customHeaders: hdrs, + } + + // Empty 'list' of objects to be healed. + toBeHealedUploads := listUploadsHealResponse{} + + // Execute GET on /?heal to list objects needing heal. + resp, err := adm.executeMethod("GET", reqData) + + defer closeResponse(resp) + if err != nil { + return listUploadsHealResponse{}, err + } + + if resp.StatusCode != http.StatusOK { + return toBeHealedUploads, httpRespToErrorResponse(resp) + + } + + err = xml.NewDecoder(resp.Body).Decode(&toBeHealedUploads) + if err != nil { + return listUploadsHealResponse{}, err + } + + return toBeHealedUploads, nil +} + +// ListUploadsHeal - issues list heal uploads API request +func (adm *AdminClient) ListUploadsHeal(bucket, prefix string, recursive bool, + doneCh <-chan struct{}) (<-chan UploadInfo, error) { + + // Default listing is delimited at "/" + delimiter := "/" + if recursive { + // If recursive we do not delimit. + delimiter = "" + } + + uploadIDMarker := "" + + // Allocate new list objects channel. + uploadStatCh := make(chan UploadInfo, maxUploadsList) + + // Initiate list objects goroutine here. + go func(uploadStatCh chan<- UploadInfo) { + defer close(uploadStatCh) + // Save marker for next request. + var marker string + for { + // Get list of objects a maximum of 1000 per request. + result, err := adm.listUploadsHeal(bucket, prefix, marker, + uploadIDMarker, delimiter, maxUploadsList) + if err != nil { + uploadStatCh <- UploadInfo{ + Err: err, + } + return + } + + // If contents are available loop through and + // send over channel. + for _, upload := range result.Uploads { + select { + // Send upload info. + case uploadStatCh <- UploadInfo{ + Key: upload.Key, + Initiated: upload.Initiated, + HealUploadInfo: upload.HealUploadInfo, + }: + // If receives done from the caller, return here. + case <-doneCh: + return + } + } + + // Send all common prefixes if any. NOTE: + // prefixes are only present if the request is + // delimited. + for _, prefix := range result.CommonPrefixes { + upload := UploadInfo{} + upload.Key = prefix.Prefix + select { + // Send object prefixes. + case uploadStatCh <- upload: + // If receives done from the caller, return here. + case <-doneCh: + return + } + } + + // If next uploadID marker is present, set it + // for the next request. + if result.NextUploadIDMarker != "" { + uploadIDMarker = result.NextUploadIDMarker + } + + // If next marker present, save it for next request. + if result.KeyMarker != "" { + marker = result.KeyMarker + } + + // Listing ends result is not truncated, + // return right here. + if !result.IsTruncated { + return + } + } + }(uploadStatCh) + return uploadStatCh, nil +}