Remove healing of incomplete multipart uploads (#5390)

Since the server performs automatic clean-up of multipart uploads that
have not been resumed for more than a couple of weeks, it was decided
to remove functionality to heal multipart uploads.
master
Aditya Manthramurthy 7 years ago committed by kannappanr
parent 20584dc08f
commit cd22feecf8
  1. 141
      cmd/admin-handlers.go
  2. 302
      cmd/admin-handlers_test.go
  3. 4
      cmd/admin-router.go
  4. 70
      pkg/madmin/API.md
  5. 254
      pkg/madmin/heal-commands.go

@ -25,7 +25,6 @@ import (
"io/ioutil"
"net/http"
"net/url"
"path"
"strconv"
"sync"
"time"
@ -51,12 +50,8 @@ const (
mgmtLockDuration mgmtQueryKey = "duration"
mgmtDelimiter mgmtQueryKey = "delimiter"
mgmtMarker mgmtQueryKey = "marker"
mgmtKeyMarker mgmtQueryKey = "key-marker"
mgmtMaxKey mgmtQueryKey = "max-key"
mgmtDryRun mgmtQueryKey = "dry-run"
mgmtUploadIDMarker mgmtQueryKey = "upload-id-marker"
mgmtMaxUploads mgmtQueryKey = "max-uploads"
mgmtUploadID mgmtQueryKey = "upload-id"
)
// ServerVersion - server version
@ -427,61 +422,6 @@ func (adminAPI adminAPIHandlers) ClearLocksHandler(w http.ResponseWriter, r *htt
writeSuccessResponseJSON(w, jsonBytes)
}
// ListUploadsHealHandler - similar to listObjectsHealHandler
// GET
// /?heal&bucket=mybucket&prefix=myprefix&key-marker=mymarker&upload-id-marker=myuploadid&delimiter=mydelimiter&max-uploads=1000
// - bucket is mandatory query parameter
// - rest are optional query parameters List upto maxKey objects that
// need healing in a given bucket matching the given prefix.
func (adminAPI adminAPIHandlers) ListUploadsHealHandler(w http.ResponseWriter, r *http.Request) {
// Get object layer instance.
objLayer := newObjectLayerFn()
if objLayer == nil {
writeErrorResponse(w, ErrServerNotInitialized, r.URL)
return
}
// Validate request signature.
adminAPIErr := checkRequestAuthType(r, "", "", "")
if adminAPIErr != ErrNone {
writeErrorResponse(w, adminAPIErr, r.URL)
return
}
// Check if this setup has an erasure coded backend.
if !globalIsXL {
writeErrorResponse(w, ErrHealNotImplemented, r.URL)
return
}
// Validate query params.
vars := r.URL.Query()
bucket := vars.Get(string(mgmtBucket))
prefix, keyMarker, uploadIDMarker, delimiter, maxUploads, _ := getBucketMultipartResources(r.URL.Query())
if err := checkListMultipartArgs(bucket, prefix, keyMarker, uploadIDMarker, delimiter, objLayer); err != nil {
writeErrorResponse(w, toAPIErrorCode(err), r.URL)
return
}
if maxUploads <= 0 || maxUploads > maxUploadsList {
writeErrorResponse(w, ErrInvalidMaxUploads, r.URL)
return
}
// Get the list objects to be healed.
listMultipartInfos, err := objLayer.ListUploadsHeal(bucket, prefix,
keyMarker, uploadIDMarker, delimiter, maxUploads)
if err != nil {
writeErrorResponse(w, toAPIErrorCode(err), r.URL)
return
}
listResponse := generateListMultipartUploadsResponse(bucket, listMultipartInfos)
// Write success response.
writeSuccessResponseXML(w, encodeResponse(listResponse))
}
// extractListObjectsHealQuery - Validates query params for heal objects list management API.
func extractListObjectsHealQuery(vars url.Values) (string, string, string, string, int, APIErrorCode) {
bucket := vars.Get(string(mgmtBucket))
@ -756,87 +696,6 @@ func (adminAPI adminAPIHandlers) HealObjectHandler(w http.ResponseWriter, r *htt
writeSuccessResponseJSON(w, jsonBytes)
}
// HealUploadHandler - POST /?heal&bucket=mybucket&object=myobject&upload-id=myuploadID&dry-run
// - x-minio-operation = upload
// - bucket, object and upload-id are mandatory query parameters
// Heal a given upload, if present.
func (adminAPI adminAPIHandlers) HealUploadHandler(w http.ResponseWriter, r *http.Request) {
// Get object layer instance.
objLayer := newObjectLayerFn()
if objLayer == nil {
writeErrorResponse(w, ErrServerNotInitialized, r.URL)
return
}
// Validate request signature.
adminAPIErr := checkRequestAuthType(r, "", "", "")
if adminAPIErr != ErrNone {
writeErrorResponse(w, adminAPIErr, r.URL)
return
}
// Check if this setup has an erasure coded backend.
if !globalIsXL {
writeErrorResponse(w, ErrHealNotImplemented, r.URL)
return
}
vars := r.URL.Query()
bucket := vars.Get(string(mgmtBucket))
object := vars.Get(string(mgmtObject))
uploadID := vars.Get(string(mgmtUploadID))
uploadObj := path.Join(bucket, object, uploadID)
// Validate bucket and object names as supplied via query
// parameters.
if err := checkBucketAndObjectNames(bucket, object); err != nil {
writeErrorResponse(w, toAPIErrorCode(err), r.URL)
return
}
// Validate the bucket and object w.r.t backend representation
// of an upload.
if err := checkBucketAndObjectNames(minioMetaMultipartBucket,
uploadObj); err != nil {
writeErrorResponse(w, toAPIErrorCode(err), r.URL)
return
}
// Check if upload exists.
if _, err := objLayer.GetObjectInfo(minioMetaMultipartBucket,
uploadObj); err != nil {
writeErrorResponse(w, toAPIErrorCode(err), r.URL)
return
}
// if dry-run is set in query params then perform validations
// and return success.
if isDryRun(vars) {
writeSuccessResponseHeadersOnly(w)
return
}
//We are able to use HealObject for healing an upload since an
//ongoing upload has the same backend representation as an
//object. The 'object' corresponding to a given bucket,
//object and uploadID is
//.minio.sys/multipart/bucket/object/uploadID.
numOfflineDisks, numHealedDisks, err := objLayer.HealObject(minioMetaMultipartBucket, uploadObj)
if err != nil {
writeErrorResponse(w, toAPIErrorCode(err), r.URL)
return
}
jsonBytes, err := json.Marshal(newHealResult(numHealedDisks, numOfflineDisks))
if err != nil {
writeErrorResponse(w, toAPIErrorCode(err), r.URL)
return
}
// Return 200 on success.
writeSuccessResponseJSON(w, jsonBytes)
}
// HealFormatHandler - POST /?heal&dry-run
// - x-minio-operation = format
// - bucket and object are both mandatory query parameters

@ -27,10 +27,8 @@ import (
"net/http/httptest"
"net/url"
"os"
"reflect"
"strings"
"testing"
"time"
router "github.com/gorilla/mux"
"github.com/minio/minio/pkg/auth"
@ -1051,140 +1049,6 @@ func buildAdminRequest(queryVal url.Values, opHdr, method string,
return req, nil
}
// mkHealUploadQuery - helper to build HealUploadHandler query string.
func mkHealUploadQuery(bucket, object, uploadID, dryRun string) url.Values {
queryVal := url.Values{}
queryVal.Set(string(mgmtBucket), bucket)
queryVal.Set(string(mgmtObject), object)
queryVal.Set(string(mgmtUploadID), uploadID)
queryVal.Set("heal", "")
queryVal.Set(string(mgmtDryRun), dryRun)
return queryVal
}
// TestHealUploadHandler - test for HealUploadHandler.
func TestHealUploadHandler(t *testing.T) {
adminTestBed, err := prepareAdminXLTestBed()
if err != nil {
t.Fatal("Failed to initialize a single node XL backend for admin handler tests.")
}
defer adminTestBed.TearDown()
// Create an object myobject under bucket mybucket.
bucketName := "mybucket"
objName := "myobject"
err = adminTestBed.objLayer.MakeBucketWithLocation(bucketName, "")
if err != nil {
t.Fatalf("Failed to make bucket %s - %v", bucketName, err)
}
// Create a new multipart upload.
uploadID, err := adminTestBed.objLayer.NewMultipartUpload(bucketName, objName, nil)
if err != nil {
t.Fatalf("Failed to create a new multipart upload %s/%s - %v",
bucketName, objName, err)
}
// Upload a part.
partID := 1
_, err = adminTestBed.objLayer.PutObjectPart(bucketName, objName, uploadID,
partID, mustGetHashReader(t, bytes.NewReader([]byte("hello")), int64(len("hello")), "", ""))
if err != nil {
t.Fatalf("Failed to upload part %d of %s/%s - %v", partID,
bucketName, objName, err)
}
testCases := []struct {
bucket string
object string
dryrun string
statusCode int
}{
// 1. Valid test case.
{
bucket: bucketName,
object: objName,
statusCode: http.StatusOK,
},
// 2. Invalid bucket name.
{
bucket: `invalid\\Bucket`,
object: "myobject",
statusCode: http.StatusBadRequest,
},
// 3. Bucket not found.
{
bucket: "bucketnotfound",
object: "myobject",
statusCode: http.StatusNotFound,
},
// 4. Invalid object name.
{
bucket: bucketName,
object: `invalid\\Object`,
statusCode: http.StatusBadRequest,
},
// 5. Object not found.
{
bucket: bucketName,
object: "objectnotfound",
statusCode: http.StatusNotFound,
},
// 6. Valid test case with dry-run.
{
bucket: bucketName,
object: objName,
dryrun: "yes",
statusCode: http.StatusOK,
},
}
for i, test := range testCases {
// Prepare query params.
queryVal := mkHealUploadQuery(test.bucket, test.object, uploadID, test.dryrun)
req, err1 := buildAdminRequest(queryVal, "upload", http.MethodPost, 0, nil)
if err1 != nil {
t.Fatalf("Test %d - Failed to construct heal object request - %v", i+1, err1)
}
rec := httptest.NewRecorder()
adminTestBed.mux.ServeHTTP(rec, req)
if test.statusCode != rec.Code {
t.Errorf("Test %d - Expected HTTP status code %d but received %d", i+1, test.statusCode, rec.Code)
}
}
sample := testCases[0]
// Modify authorization header after signing to test signature
// mismatch handling.
queryVal := mkHealUploadQuery(sample.bucket, sample.object, uploadID, sample.dryrun)
req, err := buildAdminRequest(queryVal, "upload", "POST", 0, nil)
if err != nil {
t.Fatalf("Failed to construct heal object request - %v", err)
}
// Set x-amz-date to a date different than time of signing.
req.Header.Set("x-amz-date", time.Time{}.Format(iso8601Format))
rec := httptest.NewRecorder()
adminTestBed.mux.ServeHTTP(rec, req)
if rec.Code != http.StatusForbidden {
t.Errorf("Expected %d but received %d", http.StatusBadRequest, rec.Code)
}
// Set objectAPI to nil to test Server not initialized case.
resetGlobalObjectAPI()
queryVal = mkHealUploadQuery(sample.bucket, sample.object, uploadID, sample.dryrun)
req, err = buildAdminRequest(queryVal, "upload", "POST", 0, nil)
if err != nil {
t.Fatalf("Failed to construct heal object request - %v", err)
}
rec = httptest.NewRecorder()
adminTestBed.mux.ServeHTTP(rec, req)
if rec.Code != http.StatusServiceUnavailable {
t.Errorf("Expected %d but received %d", http.StatusServiceUnavailable, rec.Code)
}
}
// TestHealFormatHandler - test for HealFormatHandler.
func TestHealFormatHandler(t *testing.T) {
adminTestBed, err := prepareAdminXLTestBed()
@ -1477,172 +1341,6 @@ func TestWriteSetConfigResponse(t *testing.T) {
}
}
// mkListUploadsHealQuery - helper function to construct query values for
// listUploadsHeal.
func mkListUploadsHealQuery(bucket, prefix, keyMarker, uploadIDMarker, delimiter, maxUploadsStr string) url.Values {
queryVal := make(url.Values)
queryVal.Set("heal", "")
queryVal.Set(string(mgmtBucket), bucket)
queryVal.Set(string(mgmtPrefix), prefix)
queryVal.Set(string(mgmtKeyMarker), keyMarker)
queryVal.Set(string(mgmtUploadIDMarker), uploadIDMarker)
queryVal.Set(string(mgmtDelimiter), delimiter)
queryVal.Set(string(mgmtMaxUploads), maxUploadsStr)
return queryVal
}
func TestListHealUploadsHandler(t *testing.T) {
adminTestBed, err := prepareAdminXLTestBed()
if err != nil {
t.Fatal("Failed to initialize a single node XL backend for admin handler tests.")
}
defer adminTestBed.TearDown()
err = adminTestBed.objLayer.MakeBucketWithLocation("mybucket", "")
if err != nil {
t.Fatalf("Failed to make bucket - %v", err)
}
// Delete bucket after running all test cases.
defer adminTestBed.objLayer.DeleteBucket("mybucket")
testCases := []struct {
bucket string
prefix string
keyMarker string
delimiter string
maxKeys string
statusCode int
expectedResp ListMultipartUploadsResponse
}{
// 1. Valid params.
{
bucket: "mybucket",
prefix: "prefix",
keyMarker: "prefix11",
delimiter: "/",
maxKeys: "10",
statusCode: http.StatusOK,
expectedResp: ListMultipartUploadsResponse{
XMLName: xml.Name{Space: "http://s3.amazonaws.com/doc/2006-03-01/", Local: "ListMultipartUploadsResult"},
Bucket: "mybucket",
KeyMarker: "prefix11",
Delimiter: "/",
Prefix: "prefix",
MaxUploads: 10,
},
},
// 2. Valid params with empty prefix.
{
bucket: "mybucket",
prefix: "",
keyMarker: "",
delimiter: "/",
maxKeys: "10",
statusCode: http.StatusOK,
expectedResp: ListMultipartUploadsResponse{
XMLName: xml.Name{Space: "http://s3.amazonaws.com/doc/2006-03-01/", Local: "ListMultipartUploadsResult"},
Bucket: "mybucket",
KeyMarker: "",
Delimiter: "/",
Prefix: "",
MaxUploads: 10,
},
},
// 3. Invalid params with invalid bucket.
{
bucket: `invalid\\Bucket`,
prefix: "prefix",
keyMarker: "prefix11",
delimiter: "/",
maxKeys: "10",
statusCode: getAPIError(ErrInvalidBucketName).HTTPStatusCode,
},
// 4. Invalid params with invalid prefix.
{
bucket: "mybucket",
prefix: `invalid\\Prefix`,
keyMarker: "prefix11",
delimiter: "/",
maxKeys: "10",
statusCode: getAPIError(ErrInvalidObjectName).HTTPStatusCode,
},
// 5. Invalid params with invalid maxKeys.
{
bucket: "mybucket",
prefix: "prefix",
keyMarker: "prefix11",
delimiter: "/",
maxKeys: "-1",
statusCode: getAPIError(ErrInvalidMaxUploads).HTTPStatusCode,
},
// 6. Invalid params with unsupported prefix marker combination.
{
bucket: "mybucket",
prefix: "prefix",
keyMarker: "notmatchingmarker",
delimiter: "/",
maxKeys: "10",
statusCode: getAPIError(ErrNotImplemented).HTTPStatusCode,
},
// 7. Invalid params with unsupported delimiter.
{
bucket: "mybucket",
prefix: "prefix",
keyMarker: "notmatchingmarker",
delimiter: "unsupported",
maxKeys: "10",
statusCode: getAPIError(ErrNotImplemented).HTTPStatusCode,
},
// 8. Invalid params with invalid max Keys
{
bucket: "mybucket",
prefix: "prefix",
keyMarker: "prefix11",
delimiter: "/",
maxKeys: "999999999999999999999999999",
statusCode: getAPIError(ErrInvalidMaxUploads).HTTPStatusCode,
},
}
for i, test := range testCases {
queryVal := mkListUploadsHealQuery(test.bucket, test.prefix, test.keyMarker, "", test.delimiter, test.maxKeys)
req, err := buildAdminRequest(queryVal, "list-uploads", http.MethodGet, 0, nil)
if err != nil {
t.Fatalf("Test %d - Failed to construct list uploads needing heal request - %v", i+1, err)
}
rec := httptest.NewRecorder()
adminTestBed.mux.ServeHTTP(rec, req)
if test.statusCode != rec.Code {
t.Errorf("Test %d - Expected HTTP status code %d but received %d", i+1, test.statusCode, rec.Code)
}
// Compare result with the expected one only when we receive 200 OK
if rec.Code == http.StatusOK {
resp := rec.Result()
xmlBytes, err := ioutil.ReadAll(resp.Body)
if err != nil {
t.Errorf("Test %d: Failed to read response %v", i+1, err)
}
var actualResult ListMultipartUploadsResponse
err = xml.Unmarshal(xmlBytes, &actualResult)
if err != nil {
t.Errorf("Test %d: Failed to unmarshal xml %v", i+1, err)
}
if !reflect.DeepEqual(test.expectedResp, actualResult) {
t.Fatalf("Test %d: Unexpected response `%+v`, expected: `%+v`", i+1, test.expectedResp, actualResult)
}
}
}
}
// Test for newHealResult helper function.
func TestNewHealResult(t *testing.T) {
testCases := []struct {

@ -53,8 +53,6 @@ func registerAdminRouter(mux *router.Router) {
// List Objects needing heal.
adminRouter.Methods("GET").Queries("heal", "").Headers(minioAdminOpHeader, "list-objects").HandlerFunc(adminAPI.ListObjectsHealHandler)
// List Uploads needing heal.
adminRouter.Methods("GET").Queries("heal", "").Headers(minioAdminOpHeader, "list-uploads").HandlerFunc(adminAPI.ListUploadsHealHandler)
// List Buckets needing heal.
adminRouter.Methods("GET").Queries("heal", "").Headers(minioAdminOpHeader, "list-buckets").HandlerFunc(adminAPI.ListBucketsHealHandler)
@ -64,8 +62,6 @@ func registerAdminRouter(mux *router.Router) {
adminRouter.Methods("POST").Queries("heal", "").Headers(minioAdminOpHeader, "object").HandlerFunc(adminAPI.HealObjectHandler)
// Heal Format.
adminRouter.Methods("POST").Queries("heal", "").Headers(minioAdminOpHeader, "format").HandlerFunc(adminAPI.HealFormatHandler)
// Heal Uploads.
adminRouter.Methods("POST").Queries("heal", "").Headers(minioAdminOpHeader, "upload").HandlerFunc(adminAPI.HealUploadHandler)
/// Config operations

@ -43,8 +43,6 @@ func main() {
| | |[`HealBucket`](#HealBucket) |||
| | |[`HealObject`](#HealObject)|||
| | |[`HealFormat`](#HealFormat)|||
| | |[`ListUploadsHeal`](#ListUploadsHeal)|||
| | |[`HealUpload`](#HealUpload)|||
## 1. Constructor
<a name="Minio"></a>
@ -315,73 +313,6 @@ __Example__
log.Println("successfully healed storage format on available disks.")
```
<a name="ListUploadsHeal"> </a>
### ListUploadsHeal(bucket, prefix string, recursive bool, doneCh <-chan struct{}) (<-chan UploadInfo, error)
List ongoing multipart uploads that need healing.
| Param | Type | Description |
|---|---|---|
|`ui.Key` | _string_ | Name of the object being uploaded |
|`ui.UploadID` | _string_ | UploadID of the ongoing multipart upload |
|`ui.HealUploadInfo.Status` | _HealStatus_| One of `Healthy`, `CanHeal`, `Corrupted`, `QuorumUnavailable`|
|`ui.Err`| _error_ | non-nil if fetching fetching healing information failed |
__Example__
``` go
// Set true if recursive listing is needed.
isRecursive := true
// List objects that need healing for a given bucket and
// prefix.
healUploadsCh, err := madmClnt.ListUploadsHeal(bucket, prefix, isRecursive, doneCh)
if err != nil {
log.Fatalln("Failed to get list of uploads to be healed: ", err)
}
for upload := range healUploadsCh {
if upload.Err != nil {
log.Println("upload listing error: ", upload.Err)
}
if upload.HealUploadInfo != nil {
switch healInfo := *upload.HealUploadInfo; healInfo.Status {
case madmin.CanHeal:
fmt.Println(upload.Key, " can be healed.")
case madmin.QuorumUnavailable:
fmt.Println(upload.Key, " can't be healed until quorum is available.")
case madmin.Corrupted:
fmt.Println(upload.Key, " can't be healed, not enough information.")
}
}
}
```
<a name="HealUpload"></a>
### HealUpload(bucket, object, uploadID string, isDryRun bool) (HealResult, error)
If upload is successfully healed returns nil, otherwise returns error indicating the reason for failure. If isDryRun is true, then the upload is not healed, but heal upload request is validated by the server. e.g, if the upload exists, if upload name is valid etc.
| Param | Type | Description |
|---|---|---|
|`h.State` | _HealState_ | Represents the result of heal operation. It could be one of `HealNone`, `HealPartial` or `HealOK`. |
| Value | Description |
|---|---|
| `HealNone` | Object/Upload wasn't healed on any of the disks |
| `HealPartial` | Object/Upload was healed on some of the disks needing heal |
| `HealOK` | Object/Upload was healed on all the disks needing heal |
``` go
isDryRun = false
healResult, err := madmClnt.HealUpload("mybucket", "myobject", "myUploadID", isDryRun)
if err != nil {
log.Fatalln(err)
}
log.Println("Heal-upload result: ", healResult)
```
## 6. Config operations
@ -459,4 +390,3 @@ __Example__
log.Println("New credentials successfully set.")
```

@ -29,10 +29,6 @@ import (
"time"
)
const (
maxUploadsList = 1000
)
// listBucketHealResult container for listObjects response.
type listBucketHealResult struct {
// A response can contain CommonPrefixes only if you have
@ -168,63 +164,6 @@ type ObjectInfo struct {
HealObjectInfo *HealObjectInfo `json:"healObjectInfo,omitempty"`
}
// UploadInfo - represents an ongoing upload that needs to be healed.
type UploadInfo struct {
Key string `json:"name"` // Name of the object being uploaded.
UploadID string `json:"uploadId"` // UploadID
// Owner name.
Owner struct {
DisplayName string `json:"name"`
ID string `json:"id"`
} `json:"owner"`
// The class of storage used to store the object.
StorageClass string `json:"storageClass"`
Initiated time.Time `json:"initiated"` // Time at which upload was initiated.
// Error
Err error `json:"-"`
HealUploadInfo *HealObjectInfo `json:"healObjectInfo,omitempty"`
}
// Initiator - has same properties as Owner.
type Initiator Owner
// upload - represents an ongoing multipart upload.
type upload struct {
Key string
UploadID string `xml:"UploadId"`
Initiator Initiator
Owner Owner
StorageClass string
Initiated time.Time
HealUploadInfo *HealObjectInfo `xml:"HealObjectInfo,omitempty"`
}
// listUploadsHealResponse - represents ListUploadsHeal response.
type listUploadsHealResponse struct {
XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ ListMultipartUploadsResult" json:"-"`
Bucket string
KeyMarker string
UploadIDMarker string `xml:"UploadIdMarker"`
NextKeyMarker string
NextUploadIDMarker string `xml:"NextUploadIdMarker"`
Delimiter string
Prefix string
EncodingType string `xml:"EncodingType,omitempty"`
MaxUploads int
IsTruncated bool
// List of pending uploads.
Uploads []upload `xml:"Upload"`
// Delimed common prefixes.
CommonPrefixes []commonPrefix
}
type healQueryKey string
const (
@ -235,9 +174,6 @@ const (
healDelimiter healQueryKey = "delimiter"
healMaxKey healQueryKey = "max-key"
healDryRun healQueryKey = "dry-run"
healUploadIDMarker healQueryKey = "upload-id-marker"
healMaxUpload healQueryKey = "max-uploads"
healUploadID healQueryKey = "upload-id"
)
// mkHealQueryVal - helper function to construct heal REST API query params.
@ -439,59 +375,6 @@ func (adm *AdminClient) HealBucket(bucket string, dryrun bool) error {
return nil
}
// HealUpload - Heal the given upload.
func (adm *AdminClient) HealUpload(bucket, object, uploadID string, dryrun bool) (HealResult, error) {
// Construct query params.
queryVal := url.Values{}
queryVal.Set("heal", "")
queryVal.Set(string(healBucket), bucket)
queryVal.Set(string(healObject), object)
queryVal.Set(string(healUploadID), uploadID)
if dryrun {
queryVal.Set(string(healDryRun), "")
}
hdrs := make(http.Header)
hdrs.Set(minioAdminOpHeader, "upload")
reqData := requestData{
queryValues: queryVal,
customHeaders: hdrs,
}
// Execute POST on
// /?heal&bucket=mybucket&object=myobject&upload-id=uploadID
// to heal an upload.
resp, err := adm.executeMethod("POST", reqData)
defer closeResponse(resp)
if err != nil {
return HealResult{}, err
}
if resp.StatusCode != http.StatusOK {
return HealResult{}, httpRespToErrorResponse(resp)
}
// Healing is not performed so heal object result is empty.
if dryrun {
return HealResult{}, nil
}
jsonBytes, err := ioutil.ReadAll(resp.Body)
if err != nil {
return HealResult{}, err
}
healResult := HealResult{}
err = json.Unmarshal(jsonBytes, &healResult)
if err != nil {
return HealResult{}, err
}
return healResult, nil
}
// HealResult - represents result of heal-object admin API.
type HealResult struct {
State HealState `json:"state"`
@ -590,140 +473,3 @@ func (adm *AdminClient) HealFormat(dryrun bool) error {
return nil
}
// mkUploadsHealQuery - helper function to construct query params for
// ListUploadsHeal API.
func mkUploadsHealQuery(bucket, prefix, marker, uploadIDMarker, delimiter, maxUploadsStr string) url.Values {
queryVal := make(url.Values)
queryVal.Set("heal", "")
queryVal.Set(string(healBucket), bucket)
queryVal.Set(string(healPrefix), prefix)
queryVal.Set(string(healMarker), marker)
queryVal.Set(string(healUploadIDMarker), uploadIDMarker)
queryVal.Set(string(healDelimiter), delimiter)
queryVal.Set(string(healMaxUpload), maxUploadsStr)
return queryVal
}
func (adm *AdminClient) listUploadsHeal(bucket, prefix, marker, uploadIDMarker, delimiter string, maxUploads int) (listUploadsHealResponse, error) {
// Construct query params.
maxUploadsStr := fmt.Sprintf("%d", maxUploads)
queryVal := mkUploadsHealQuery(bucket, prefix, marker, uploadIDMarker, delimiter, maxUploadsStr)
hdrs := make(http.Header)
hdrs.Set(minioAdminOpHeader, "list-uploads")
reqData := requestData{
queryValues: queryVal,
customHeaders: hdrs,
}
// Empty 'list' of objects to be healed.
toBeHealedUploads := listUploadsHealResponse{}
// Execute GET on /?heal to list objects needing heal.
resp, err := adm.executeMethod("GET", reqData)
defer closeResponse(resp)
if err != nil {
return listUploadsHealResponse{}, err
}
if resp.StatusCode != http.StatusOK {
return toBeHealedUploads, httpRespToErrorResponse(resp)
}
err = xml.NewDecoder(resp.Body).Decode(&toBeHealedUploads)
if err != nil {
return listUploadsHealResponse{}, err
}
return toBeHealedUploads, nil
}
// ListUploadsHeal - issues list heal uploads API request
func (adm *AdminClient) ListUploadsHeal(bucket, prefix string, recursive bool,
doneCh <-chan struct{}) (<-chan UploadInfo, error) {
// Default listing is delimited at "/"
delimiter := "/"
if recursive {
// If recursive we do not delimit.
delimiter = ""
}
uploadIDMarker := ""
// Allocate new list objects channel.
uploadStatCh := make(chan UploadInfo, maxUploadsList)
// Initiate list objects goroutine here.
go func(uploadStatCh chan<- UploadInfo) {
defer close(uploadStatCh)
// Save marker for next request.
var marker string
for {
// Get list of objects a maximum of 1000 per request.
result, err := adm.listUploadsHeal(bucket, prefix, marker,
uploadIDMarker, delimiter, maxUploadsList)
if err != nil {
uploadStatCh <- UploadInfo{
Err: err,
}
return
}
// If contents are available loop through and
// send over channel.
for _, upload := range result.Uploads {
select {
// Send upload info.
case uploadStatCh <- UploadInfo{
Key: upload.Key,
UploadID: upload.UploadID,
Initiated: upload.Initiated,
HealUploadInfo: upload.HealUploadInfo,
}:
// If receives done from the caller, return here.
case <-doneCh:
return
}
}
// Send all common prefixes if any. NOTE:
// prefixes are only present if the request is
// delimited.
for _, prefix := range result.CommonPrefixes {
upload := UploadInfo{}
upload.Key = prefix.Prefix
select {
// Send object prefixes.
case uploadStatCh <- upload:
// If receives done from the caller, return here.
case <-doneCh:
return
}
}
// If next uploadID marker is present, set it
// for the next request.
if result.NextUploadIDMarker != "" {
uploadIDMarker = result.NextUploadIDMarker
}
// If next marker present, save it for next request.
if result.KeyMarker != "" {
marker = result.KeyMarker
}
// Listing ends result is not truncated,
// return right here.
if !result.IsTruncated {
return
}
}
}(uploadStatCh)
return uploadStatCh, nil
}

Loading…
Cancel
Save