diff --git a/cmd/admin-handlers.go b/cmd/admin-handlers.go index e77a787d1..6988a4850 100644 --- a/cmd/admin-handlers.go +++ b/cmd/admin-handlers.go @@ -53,9 +53,10 @@ type mgmtQueryKey string // Only valid query params for mgmt admin APIs. const ( mgmtBucket mgmtQueryKey = "bucket" - mgmtPrefix mgmtQueryKey = "prefix" - mgmtClientToken mgmtQueryKey = "clientToken" - mgmtForceStart mgmtQueryKey = "forceStart" + mgmtPrefix = "prefix" + mgmtClientToken = "clientToken" + mgmtForceStart = "forceStart" + mgmtForceStop = "forceStop" ) var ( @@ -402,7 +403,7 @@ func (a adminAPIHandlers) DownloadProfilingHandler(w http.ResponseWriter, r *htt // extractHealInitParams - Validates params for heal init API. func extractHealInitParams(r *http.Request) (bucket, objPrefix string, - hs madmin.HealOpts, clientToken string, forceStart bool, + hs madmin.HealOpts, clientToken string, forceStart bool, forceStop bool, err APIErrorCode) { vars := mux.Vars(r) @@ -433,7 +434,9 @@ func extractHealInitParams(r *http.Request) (bucket, objPrefix string, if _, ok := qParms[string(mgmtForceStart)]; ok { forceStart = true } - + if _, ok := qParms[string(mgmtForceStop)]; ok { + forceStop = true + } // ignore body if clientToken is provided if clientToken == "" { jerr := json.NewDecoder(r.Body).Decode(&hs) @@ -484,7 +487,7 @@ func (a adminAPIHandlers) HealHandler(w http.ResponseWriter, r *http.Request) { return } - bucket, objPrefix, hs, clientToken, forceStart, apiErr := extractHealInitParams(r) + bucket, objPrefix, hs, clientToken, forceStart, forceStop, apiErr := extractHealInitParams(r) if apiErr != ErrNone { writeErrorResponseJSON(w, apiErr, r.URL) return @@ -518,13 +521,35 @@ func (a adminAPIHandlers) HealHandler(w http.ResponseWriter, r *http.Request) { w.Write([]byte("\n\r")) w.(http.Flusher).Flush() case hr := <-respCh: - switch { - case hr.errCode == ErrNone: - writeSuccessResponseJSON(w, hr.respBytes) - case hr.errBody == "": - writeErrorResponseJSON(w, hr.errCode, r.URL) + switch hr.errCode { + case ErrNone: + if started { + w.Write(hr.respBytes) + w.(http.Flusher).Flush() + } else { + writeSuccessResponseJSON(w, hr.respBytes) + } default: - writeCustomErrorResponseJSON(w, hr.errCode, hr.errBody, r.URL) + apiError := getAPIError(hr.errCode) + var errorRespJSON []byte + if hr.errBody == "" { + errorRespJSON = encodeResponseJSON(getAPIErrorResponse(apiError, r.URL.Path, w.Header().Get(responseRequestIDKey))) + } else { + errorRespJSON = encodeResponseJSON(APIErrorResponse{ + Code: apiError.Code, + Message: hr.errBody, + Resource: r.URL.Path, + RequestID: w.Header().Get(responseRequestIDKey), + HostID: "3L137", + }) + } + if !started { + setCommonHeaders(w) + w.Header().Set("Content-Type", string(mimeJSON)) + w.WriteHeader(apiError.HTTPStatusCode) + } + w.Write(errorRespJSON) + w.(http.Flusher).Flush() } break forLoop } @@ -535,34 +560,61 @@ func (a adminAPIHandlers) HealHandler(w http.ResponseWriter, r *http.Request) { info := objLayer.StorageInfo(ctx) numDisks := info.Backend.OfflineDisks + info.Backend.OnlineDisks - if clientToken == "" { - // Not a status request - nh := newHealSequence(bucket, objPrefix, handlers.GetSourceIP(r), - numDisks, hs, forceStart) - - respCh := make(chan healResp) - go func() { - respBytes, errCode, errMsg := globalAllHealState.LaunchNewHealSequence(nh) - hr := healResp{respBytes, errCode, errMsg} - respCh <- hr - }() + healPath := pathJoin(bucket, objPrefix) + if clientToken == "" && !forceStart && !forceStop { + nh, exists := globalAllHealState.getHealSequence(healPath) + if exists && !nh.hasEnded() && len(nh.currentStatus.Items) > 0 { + b, err := json.Marshal(madmin.HealStartSuccess{ + ClientToken: nh.clientToken, + ClientAddress: nh.clientAddress, + StartTime: nh.startTime, + }) + if err != nil { + logger.LogIf(context.Background(), err) + writeErrorResponse(w, toAPIErrorCode(err), r.URL) + return + } + // Client token not specified but a heal sequence exists on a path, + // Send the token back to client. + writeSuccessResponseJSON(w, b) + return + } + } - // Due to the force-starting functionality, the Launch - // call above can take a long time - to keep the - // connection alive, we start sending whitespace - keepConnLive(w, respCh) - } else { + if clientToken != "" && !forceStart && !forceStop { // Since clientToken is given, fetch heal status from running // heal sequence. - path := bucket + "/" + objPrefix respBytes, errCode := globalAllHealState.PopHealStatusJSON( - path, clientToken) + healPath, clientToken) if errCode != ErrNone { writeErrorResponseJSON(w, errCode, r.URL) } else { writeSuccessResponseJSON(w, respBytes) } + return } + + respCh := make(chan healResp) + switch { + case forceStop: + go func() { + respBytes, errCode := globalAllHealState.stopHealSequence(healPath) + hr := healResp{respBytes: respBytes, errCode: errCode} + respCh <- hr + }() + case clientToken == "": + nh := newHealSequence(bucket, objPrefix, handlers.GetSourceIP(r), numDisks, hs, forceStart) + go func() { + respBytes, errCode, errMsg := globalAllHealState.LaunchNewHealSequence(nh) + hr := healResp{respBytes, errCode, errMsg} + respCh <- hr + }() + } + + // Due to the force-starting functionality, the Launch + // call above can take a long time - to keep the + // connection alive, we start sending whitespace + keepConnLive(w, respCh) } // GetConfigHandler - GET /minio/admin/v1/config diff --git a/cmd/admin-heal-ops.go b/cmd/admin-heal-ops.go index b513a4eeb..e1f3c68ff 100644 --- a/cmd/admin-heal-ops.go +++ b/cmd/admin-heal-ops.go @@ -20,6 +20,7 @@ import ( "context" "encoding/json" "fmt" + "net/http" "runtime" "strings" "sync" @@ -123,6 +124,35 @@ func (ahs *allHealState) getHealSequence(path string) (h *healSequence, exists b return h, exists } +func (ahs *allHealState) stopHealSequence(path string) ([]byte, APIErrorCode) { + var hsp madmin.HealStopSuccess + he, exists := ahs.getHealSequence(path) + if !exists { + hsp = madmin.HealStopSuccess{ + ClientToken: "invalid", + StartTime: UTCNow(), + } + } else { + hsp = madmin.HealStopSuccess{ + ClientToken: he.clientToken, + ClientAddress: he.clientAddress, + StartTime: he.startTime, + } + + he.stop() + for !he.hasEnded() { + time.Sleep(1 * time.Second) + } + ahs.Lock() + defer ahs.Unlock() + // Heal sequence explicitly stopped, remove it. + delete(ahs.healSeqMap, path) + } + + b, err := json.Marshal(&hsp) + return b, toAdminAPIErrCode(err) +} + // LaunchNewHealSequence - launches a background routine that performs // healing according to the healSequence argument. For each heal // sequence, state is stored in the `globalAllHealState`, which is a @@ -143,20 +173,20 @@ func (ahs *allHealState) LaunchNewHealSequence(h *healSequence) ( existsAndLive = true } } + if existsAndLive { // A heal sequence exists on the given path. if h.forceStarted { - // stop the running heal sequence - wait for - // it to finish. + // stop the running heal sequence - wait for it to finish. he.stop() for !he.hasEnded() { - time.Sleep(10 * time.Second) + time.Sleep(1 * time.Second) } } else { errMsg = "Heal is already running on the given path " + "(use force-start option to stop and start afresh). " + - fmt.Sprintf("The heal was started by IP %s at %s", - h.clientAddress, h.startTime) + fmt.Sprintf("The heal was started by IP %s at %s, token is %s", + h.clientAddress, h.startTime.Format(http.TimeFormat), h.clientToken) return nil, ErrHealAlreadyRunning, errMsg } @@ -285,7 +315,7 @@ type healSequence struct { // bucket, and prefix on which heal seq. was initiated bucket, objPrefix string - // path is just bucket + "/" + objPrefix + // path is just pathJoin(bucket, objPrefix) path string // time at which heal sequence was started @@ -330,7 +360,7 @@ func newHealSequence(bucket, objPrefix, clientAddr string, return &healSequence{ bucket: bucket, objPrefix: objPrefix, - path: bucket + "/" + objPrefix, + path: pathJoin(bucket, objPrefix), startTime: UTCNow(), clientToken: mustGetUUID(), clientAddress: clientAddr, @@ -552,7 +582,7 @@ func (h *healSequence) healConfig() error { // before proceeding to heal waitCount := 60 // Any requests in progress, delay the heal. - for globalHTTPServer.GetRequestCount() > 0 && waitCount > 0 { + for globalHTTPServer.GetRequestCount() > 2 && waitCount > 0 { waitCount-- time.Sleep(1 * time.Second) } @@ -698,7 +728,7 @@ func (h *healSequence) healBucket(bucket string) error { // before proceeding to heal waitCount := 60 // Any requests in progress, delay the heal. - for globalHTTPServer.GetRequestCount() > 0 && waitCount > 0 { + for globalHTTPServer.GetRequestCount() > 2 && waitCount > 0 { waitCount-- time.Sleep(1 * time.Second) } diff --git a/cmd/api-response.go b/cmd/api-response.go index 9d7348f13..7efc7ec86 100644 --- a/cmd/api-response.go +++ b/cmd/api-response.go @@ -608,7 +608,7 @@ func writeCustomErrorResponseJSON(w http.ResponseWriter, errorCode APIErrorCode, Code: apiError.Code, Message: errBody, Resource: reqURL.Path, - RequestID: "3L137", + RequestID: w.Header().Get(responseRequestIDKey), HostID: "3L137", } encodedErrorResponse := encodeResponseJSON(errorResponse) diff --git a/pkg/madmin/API.md b/pkg/madmin/API.md index 75469df5c..04e8642fc 100644 --- a/pkg/madmin/API.md +++ b/pkg/madmin/API.md @@ -208,7 +208,7 @@ Fetches information for all cluster nodes, such as server properties, storage in ## 6. Heal operations -### Heal(bucket, prefix string, healOpts HealOpts, clientToken string, forceStart bool) (start HealStartSuccess, status HealTaskStatus, err error) +### Heal(bucket, prefix string, healOpts HealOpts, clientToken string, forceStart bool, forceStop bool) (start HealStartSuccess, status HealTaskStatus, err error) Start a heal sequence that scans data under given (possible empty) `bucket` and `prefix`. The `recursive` bool turns on recursive @@ -232,7 +232,8 @@ __Example__ DryRun: false, } forceStart := false - healPath, err := madmClnt.Heal("", "", opts, "", forceStart) + forceStop := false + healPath, err := madmClnt.Heal("", "", opts, "", forceStart, forceStop) if err != nil { log.Fatalln(err) } diff --git a/pkg/madmin/heal-commands.go b/pkg/madmin/heal-commands.go index 3ec5f27a5..da18c17f6 100644 --- a/pkg/madmin/heal-commands.go +++ b/pkg/madmin/heal-commands.go @@ -40,6 +40,10 @@ type HealStartSuccess struct { StartTime time.Time `json:"startTime"` } +// HealStopSuccess - holds information about a successfully stopped +// heal operation. +type HealStopSuccess HealStartSuccess + // HealTaskStatus - status struct for a heal task type HealTaskStatus struct { Summary string `json:"summary"` @@ -176,10 +180,17 @@ func (hri *HealResultItem) GetOnlineCounts() (b, a int) { } // Heal - API endpoint to start heal and to fetch status +// forceStart and forceStop are mutually exclusive, you can either +// set one of them to 'true'. If both are set 'forceStart' will be +// honored. func (adm *AdminClient) Heal(bucket, prefix string, healOpts HealOpts, - clientToken string, forceStart bool) ( + clientToken string, forceStart, forceStop bool) ( healStart HealStartSuccess, healTaskStatus HealTaskStatus, err error) { + if forceStart && forceStop { + return healStart, healTaskStatus, ErrInvalidArgument("forceStart and forceStop set to true is not allowed") + } + body, err := json.Marshal(healOpts) if err != nil { return healStart, healTaskStatus, err @@ -196,8 +207,12 @@ func (adm *AdminClient) Heal(bucket, prefix string, healOpts HealOpts, queryVals.Set("clientToken", clientToken) body = []byte{} } + + // Anyone can be set, either force start or forceStop. if forceStart { queryVals.Set("forceStart", "true") + } else if forceStop { + queryVals.Set("forceStop", "true") } resp, err := adm.executeMethod("POST", requestData{ @@ -221,9 +236,24 @@ func (adm *AdminClient) Heal(bucket, prefix string, healOpts HealOpts, // Was it a status request? if clientToken == "" { + // As a special operation forceStop would return a + // similar struct as healStart will have the + // heal sequence information about the heal which + // was stopped. err = json.Unmarshal(respBytes, &healStart) } else { err = json.Unmarshal(respBytes, &healTaskStatus) } - return healStart, healTaskStatus, err + if err != nil { + // May be the server responded with error after success + // message, handle it separately here. + var errResp ErrorResponse + err = json.Unmarshal(respBytes, &errResp) + if err != nil { + // Unknown structure return error anyways. + return healStart, healTaskStatus, err + } + return healStart, healTaskStatus, errResp + } + return healStart, healTaskStatus, nil }