Add forceStop flag to provide facility to stop healing (#6718)

This PR also makes sure that we deal with HTTP request
count by ignoring the on-going heal operation, i.e
do not wait on itself.
master
Harshavardhana 6 years ago committed by kannappanr
parent bef0318c36
commit a9cda850ca
  1. 112
      cmd/admin-handlers.go
  2. 48
      cmd/admin-heal-ops.go
  3. 2
      cmd/api-response.go
  4. 5
      pkg/madmin/API.md
  5. 34
      pkg/madmin/heal-commands.go

@ -53,9 +53,10 @@ type mgmtQueryKey string
// Only valid query params for mgmt admin APIs.
const (
mgmtBucket mgmtQueryKey = "bucket"
mgmtPrefix mgmtQueryKey = "prefix"
mgmtClientToken mgmtQueryKey = "clientToken"
mgmtForceStart mgmtQueryKey = "forceStart"
mgmtPrefix = "prefix"
mgmtClientToken = "clientToken"
mgmtForceStart = "forceStart"
mgmtForceStop = "forceStop"
)
var (
@ -402,7 +403,7 @@ func (a adminAPIHandlers) DownloadProfilingHandler(w http.ResponseWriter, r *htt
// extractHealInitParams - Validates params for heal init API.
func extractHealInitParams(r *http.Request) (bucket, objPrefix string,
hs madmin.HealOpts, clientToken string, forceStart bool,
hs madmin.HealOpts, clientToken string, forceStart bool, forceStop bool,
err APIErrorCode) {
vars := mux.Vars(r)
@ -433,7 +434,9 @@ func extractHealInitParams(r *http.Request) (bucket, objPrefix string,
if _, ok := qParms[string(mgmtForceStart)]; ok {
forceStart = true
}
if _, ok := qParms[string(mgmtForceStop)]; ok {
forceStop = true
}
// ignore body if clientToken is provided
if clientToken == "" {
jerr := json.NewDecoder(r.Body).Decode(&hs)
@ -484,7 +487,7 @@ func (a adminAPIHandlers) HealHandler(w http.ResponseWriter, r *http.Request) {
return
}
bucket, objPrefix, hs, clientToken, forceStart, apiErr := extractHealInitParams(r)
bucket, objPrefix, hs, clientToken, forceStart, forceStop, apiErr := extractHealInitParams(r)
if apiErr != ErrNone {
writeErrorResponseJSON(w, apiErr, r.URL)
return
@ -518,13 +521,35 @@ func (a adminAPIHandlers) HealHandler(w http.ResponseWriter, r *http.Request) {
w.Write([]byte("\n\r"))
w.(http.Flusher).Flush()
case hr := <-respCh:
switch {
case hr.errCode == ErrNone:
writeSuccessResponseJSON(w, hr.respBytes)
case hr.errBody == "":
writeErrorResponseJSON(w, hr.errCode, r.URL)
switch hr.errCode {
case ErrNone:
if started {
w.Write(hr.respBytes)
w.(http.Flusher).Flush()
} else {
writeSuccessResponseJSON(w, hr.respBytes)
}
default:
writeCustomErrorResponseJSON(w, hr.errCode, hr.errBody, r.URL)
apiError := getAPIError(hr.errCode)
var errorRespJSON []byte
if hr.errBody == "" {
errorRespJSON = encodeResponseJSON(getAPIErrorResponse(apiError, r.URL.Path, w.Header().Get(responseRequestIDKey)))
} else {
errorRespJSON = encodeResponseJSON(APIErrorResponse{
Code: apiError.Code,
Message: hr.errBody,
Resource: r.URL.Path,
RequestID: w.Header().Get(responseRequestIDKey),
HostID: "3L137",
})
}
if !started {
setCommonHeaders(w)
w.Header().Set("Content-Type", string(mimeJSON))
w.WriteHeader(apiError.HTTPStatusCode)
}
w.Write(errorRespJSON)
w.(http.Flusher).Flush()
}
break forLoop
}
@ -535,34 +560,61 @@ func (a adminAPIHandlers) HealHandler(w http.ResponseWriter, r *http.Request) {
info := objLayer.StorageInfo(ctx)
numDisks := info.Backend.OfflineDisks + info.Backend.OnlineDisks
if clientToken == "" {
// Not a status request
nh := newHealSequence(bucket, objPrefix, handlers.GetSourceIP(r),
numDisks, hs, forceStart)
respCh := make(chan healResp)
go func() {
respBytes, errCode, errMsg := globalAllHealState.LaunchNewHealSequence(nh)
hr := healResp{respBytes, errCode, errMsg}
respCh <- hr
}()
healPath := pathJoin(bucket, objPrefix)
if clientToken == "" && !forceStart && !forceStop {
nh, exists := globalAllHealState.getHealSequence(healPath)
if exists && !nh.hasEnded() && len(nh.currentStatus.Items) > 0 {
b, err := json.Marshal(madmin.HealStartSuccess{
ClientToken: nh.clientToken,
ClientAddress: nh.clientAddress,
StartTime: nh.startTime,
})
if err != nil {
logger.LogIf(context.Background(), err)
writeErrorResponse(w, toAPIErrorCode(err), r.URL)
return
}
// Client token not specified but a heal sequence exists on a path,
// Send the token back to client.
writeSuccessResponseJSON(w, b)
return
}
}
// Due to the force-starting functionality, the Launch
// call above can take a long time - to keep the
// connection alive, we start sending whitespace
keepConnLive(w, respCh)
} else {
if clientToken != "" && !forceStart && !forceStop {
// Since clientToken is given, fetch heal status from running
// heal sequence.
path := bucket + "/" + objPrefix
respBytes, errCode := globalAllHealState.PopHealStatusJSON(
path, clientToken)
healPath, clientToken)
if errCode != ErrNone {
writeErrorResponseJSON(w, errCode, r.URL)
} else {
writeSuccessResponseJSON(w, respBytes)
}
return
}
respCh := make(chan healResp)
switch {
case forceStop:
go func() {
respBytes, errCode := globalAllHealState.stopHealSequence(healPath)
hr := healResp{respBytes: respBytes, errCode: errCode}
respCh <- hr
}()
case clientToken == "":
nh := newHealSequence(bucket, objPrefix, handlers.GetSourceIP(r), numDisks, hs, forceStart)
go func() {
respBytes, errCode, errMsg := globalAllHealState.LaunchNewHealSequence(nh)
hr := healResp{respBytes, errCode, errMsg}
respCh <- hr
}()
}
// Due to the force-starting functionality, the Launch
// call above can take a long time - to keep the
// connection alive, we start sending whitespace
keepConnLive(w, respCh)
}
// GetConfigHandler - GET /minio/admin/v1/config

@ -20,6 +20,7 @@ import (
"context"
"encoding/json"
"fmt"
"net/http"
"runtime"
"strings"
"sync"
@ -123,6 +124,35 @@ func (ahs *allHealState) getHealSequence(path string) (h *healSequence, exists b
return h, exists
}
func (ahs *allHealState) stopHealSequence(path string) ([]byte, APIErrorCode) {
var hsp madmin.HealStopSuccess
he, exists := ahs.getHealSequence(path)
if !exists {
hsp = madmin.HealStopSuccess{
ClientToken: "invalid",
StartTime: UTCNow(),
}
} else {
hsp = madmin.HealStopSuccess{
ClientToken: he.clientToken,
ClientAddress: he.clientAddress,
StartTime: he.startTime,
}
he.stop()
for !he.hasEnded() {
time.Sleep(1 * time.Second)
}
ahs.Lock()
defer ahs.Unlock()
// Heal sequence explicitly stopped, remove it.
delete(ahs.healSeqMap, path)
}
b, err := json.Marshal(&hsp)
return b, toAdminAPIErrCode(err)
}
// LaunchNewHealSequence - launches a background routine that performs
// healing according to the healSequence argument. For each heal
// sequence, state is stored in the `globalAllHealState`, which is a
@ -143,20 +173,20 @@ func (ahs *allHealState) LaunchNewHealSequence(h *healSequence) (
existsAndLive = true
}
}
if existsAndLive {
// A heal sequence exists on the given path.
if h.forceStarted {
// stop the running heal sequence - wait for
// it to finish.
// stop the running heal sequence - wait for it to finish.
he.stop()
for !he.hasEnded() {
time.Sleep(10 * time.Second)
time.Sleep(1 * time.Second)
}
} else {
errMsg = "Heal is already running on the given path " +
"(use force-start option to stop and start afresh). " +
fmt.Sprintf("The heal was started by IP %s at %s",
h.clientAddress, h.startTime)
fmt.Sprintf("The heal was started by IP %s at %s, token is %s",
h.clientAddress, h.startTime.Format(http.TimeFormat), h.clientToken)
return nil, ErrHealAlreadyRunning, errMsg
}
@ -285,7 +315,7 @@ type healSequence struct {
// bucket, and prefix on which heal seq. was initiated
bucket, objPrefix string
// path is just bucket + "/" + objPrefix
// path is just pathJoin(bucket, objPrefix)
path string
// time at which heal sequence was started
@ -330,7 +360,7 @@ func newHealSequence(bucket, objPrefix, clientAddr string,
return &healSequence{
bucket: bucket,
objPrefix: objPrefix,
path: bucket + "/" + objPrefix,
path: pathJoin(bucket, objPrefix),
startTime: UTCNow(),
clientToken: mustGetUUID(),
clientAddress: clientAddr,
@ -552,7 +582,7 @@ func (h *healSequence) healConfig() error {
// before proceeding to heal
waitCount := 60
// Any requests in progress, delay the heal.
for globalHTTPServer.GetRequestCount() > 0 && waitCount > 0 {
for globalHTTPServer.GetRequestCount() > 2 && waitCount > 0 {
waitCount--
time.Sleep(1 * time.Second)
}
@ -698,7 +728,7 @@ func (h *healSequence) healBucket(bucket string) error {
// before proceeding to heal
waitCount := 60
// Any requests in progress, delay the heal.
for globalHTTPServer.GetRequestCount() > 0 && waitCount > 0 {
for globalHTTPServer.GetRequestCount() > 2 && waitCount > 0 {
waitCount--
time.Sleep(1 * time.Second)
}

@ -608,7 +608,7 @@ func writeCustomErrorResponseJSON(w http.ResponseWriter, errorCode APIErrorCode,
Code: apiError.Code,
Message: errBody,
Resource: reqURL.Path,
RequestID: "3L137",
RequestID: w.Header().Get(responseRequestIDKey),
HostID: "3L137",
}
encodedErrorResponse := encodeResponseJSON(errorResponse)

@ -208,7 +208,7 @@ Fetches information for all cluster nodes, such as server properties, storage in
## 6. Heal operations
<a name="Heal"></a>
### Heal(bucket, prefix string, healOpts HealOpts, clientToken string, forceStart bool) (start HealStartSuccess, status HealTaskStatus, err error)
### Heal(bucket, prefix string, healOpts HealOpts, clientToken string, forceStart bool, forceStop bool) (start HealStartSuccess, status HealTaskStatus, err error)
Start a heal sequence that scans data under given (possible empty)
`bucket` and `prefix`. The `recursive` bool turns on recursive
@ -232,7 +232,8 @@ __Example__
DryRun: false,
}
forceStart := false
healPath, err := madmClnt.Heal("", "", opts, "", forceStart)
forceStop := false
healPath, err := madmClnt.Heal("", "", opts, "", forceStart, forceStop)
if err != nil {
log.Fatalln(err)
}

@ -40,6 +40,10 @@ type HealStartSuccess struct {
StartTime time.Time `json:"startTime"`
}
// HealStopSuccess - holds information about a successfully stopped
// heal operation.
type HealStopSuccess HealStartSuccess
// HealTaskStatus - status struct for a heal task
type HealTaskStatus struct {
Summary string `json:"summary"`
@ -176,10 +180,17 @@ func (hri *HealResultItem) GetOnlineCounts() (b, a int) {
}
// Heal - API endpoint to start heal and to fetch status
// forceStart and forceStop are mutually exclusive, you can either
// set one of them to 'true'. If both are set 'forceStart' will be
// honored.
func (adm *AdminClient) Heal(bucket, prefix string, healOpts HealOpts,
clientToken string, forceStart bool) (
clientToken string, forceStart, forceStop bool) (
healStart HealStartSuccess, healTaskStatus HealTaskStatus, err error) {
if forceStart && forceStop {
return healStart, healTaskStatus, ErrInvalidArgument("forceStart and forceStop set to true is not allowed")
}
body, err := json.Marshal(healOpts)
if err != nil {
return healStart, healTaskStatus, err
@ -196,8 +207,12 @@ func (adm *AdminClient) Heal(bucket, prefix string, healOpts HealOpts,
queryVals.Set("clientToken", clientToken)
body = []byte{}
}
// Anyone can be set, either force start or forceStop.
if forceStart {
queryVals.Set("forceStart", "true")
} else if forceStop {
queryVals.Set("forceStop", "true")
}
resp, err := adm.executeMethod("POST", requestData{
@ -221,9 +236,24 @@ func (adm *AdminClient) Heal(bucket, prefix string, healOpts HealOpts,
// Was it a status request?
if clientToken == "" {
// As a special operation forceStop would return a
// similar struct as healStart will have the
// heal sequence information about the heal which
// was stopped.
err = json.Unmarshal(respBytes, &healStart)
} else {
err = json.Unmarshal(respBytes, &healTaskStatus)
}
return healStart, healTaskStatus, err
if err != nil {
// May be the server responded with error after success
// message, handle it separately here.
var errResp ErrorResponse
err = json.Unmarshal(respBytes, &errResp)
if err != nil {
// Unknown structure return error anyways.
return healStart, healTaskStatus, err
}
return healStart, healTaskStatus, errResp
}
return healStart, healTaskStatus, nil
}

Loading…
Cancel
Save