Add API retry functionality in mc admin (#7602)

master
Harshavardhana 6 years ago committed by GitHub
parent 72929ec05b
commit ac3b59645e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 22
      pkg/madmin/api-error-response.go
  2. 85
      pkg/madmin/api.go
  3. 175
      pkg/madmin/retry.go

@ -77,6 +77,28 @@ func httpRespToErrorResponse(resp *http.Response) error {
return errResp return errResp
} }
// ToErrorResponse - Returns parsed ErrorResponse struct from body and
// http headers.
//
// For example:
//
// import admin "github.com/minio/minio/pkg/madmin"
// ...
// ...
// ss, err := adm.ServiceStatus(...)
// if err != nil {
// resp := admin.ToErrorResponse(err)
// }
// ...
func ToErrorResponse(err error) ErrorResponse {
switch err := err.(type) {
case ErrorResponse:
return err
default:
return ErrorResponse{}
}
}
// ErrInvalidArgument - Invalid argument response. // ErrInvalidArgument - Invalid argument response.
func ErrInvalidArgument(message string) error { func ErrInvalidArgument(message string) error {
return ErrorResponse{ return ErrorResponse{

@ -22,6 +22,7 @@ import (
"fmt" "fmt"
"io" "io"
"io/ioutil" "io/ioutil"
"math/rand"
"net/http" "net/http"
"net/http/httputil" "net/http/httputil"
"net/url" "net/url"
@ -29,6 +30,7 @@ import (
"regexp" "regexp"
"runtime" "runtime"
"strings" "strings"
"time"
"github.com/minio/minio-go/pkg/s3signer" "github.com/minio/minio-go/pkg/s3signer"
"github.com/minio/minio-go/pkg/s3utils" "github.com/minio/minio-go/pkg/s3utils"
@ -57,6 +59,8 @@ type AdminClient struct {
// Needs allocation. // Needs allocation.
httpClient *http.Client httpClient *http.Client
random *rand.Rand
// Advanced functionality. // Advanced functionality.
isTraceEnabled bool isTraceEnabled bool
traceOutput io.Writer traceOutput io.Writer
@ -107,6 +111,8 @@ func privateNew(endpoint, accessKeyID, secretAccessKey string, secure bool) (*Ad
httpClient: &http.Client{ httpClient: &http.Client{
Transport: http.DefaultTransport, Transport: http.DefaultTransport,
}, },
// Introduce a new locked random seed.
random: rand.New(&lockedRandSource{src: rand.NewSource(time.Now().UTC().UnixNano())}),
} }
// Return. // Return.
@ -315,6 +321,7 @@ var successStatus = []int{
// request upon any error up to maxRetries attempts in a binomially // request upon any error up to maxRetries attempts in a binomially
// delayed manner using a standard back off algorithm. // delayed manner using a standard back off algorithm.
func (adm AdminClient) executeMethod(method string, reqData requestData) (res *http.Response, err error) { func (adm AdminClient) executeMethod(method string, reqData requestData) (res *http.Response, err error) {
var reqRetry = MaxRetry // Indicates how many times we can retry the request
// Create a done channel to control 'ListObjects' go routine. // Create a done channel to control 'ListObjects' go routine.
doneCh := make(chan struct{}, 1) doneCh := make(chan struct{}, 1)
@ -322,39 +329,63 @@ func (adm AdminClient) executeMethod(method string, reqData requestData) (res *h
// Indicate to our routine to exit cleanly upon return. // Indicate to our routine to exit cleanly upon return.
defer close(doneCh) defer close(doneCh)
// Instantiate a new request. for range adm.newRetryTimer(reqRetry, DefaultRetryUnit, DefaultRetryCap, MaxJitter, doneCh) {
var req *http.Request // Instantiate a new request.
req, err = adm.newRequest(method, reqData) var req *http.Request
if err != nil { req, err = adm.newRequest(method, reqData)
return nil, err if err != nil {
} return nil, err
}
// Initiate the request. // Initiate the request.
res, err = adm.do(req) res, err = adm.do(req)
if err != nil { if err != nil {
return nil, err // For supported http requests errors verify.
} if isHTTPReqErrorRetryable(err) {
continue // Retry.
}
// For other errors, return here no need to retry.
return nil, err
}
// For any known successful http status, return quickly. // For any known successful http status, return quickly.
for _, httpStatus := range successStatus { for _, httpStatus := range successStatus {
if httpStatus == res.StatusCode { if httpStatus == res.StatusCode {
return res, nil return res, nil
}
} }
}
// Read the body to be saved later. // Read the body to be saved later.
errBodyBytes, err := ioutil.ReadAll(res.Body) errBodyBytes, err := ioutil.ReadAll(res.Body)
if err != nil { // res.Body should be closed
return nil, err closeResponse(res)
} if err != nil {
// Save the body. return nil, err
errBodySeeker := bytes.NewReader(errBodyBytes) }
res.Body = ioutil.NopCloser(errBodySeeker)
// Save the body back again. // Save the body.
errBodySeeker.Seek(0, 0) // Seek back to starting point. errBodySeeker := bytes.NewReader(errBodyBytes)
res.Body = ioutil.NopCloser(errBodySeeker) res.Body = ioutil.NopCloser(errBodySeeker)
// For errors verify if its retryable otherwise fail quickly.
errResponse := ToErrorResponse(httpRespToErrorResponse(res))
// Save the body back again.
errBodySeeker.Seek(0, 0) // Seek back to starting point.
res.Body = ioutil.NopCloser(errBodySeeker)
// Verify if error response code is retryable.
if isS3CodeRetryable(errResponse.Code) {
continue // Retry.
}
// Verify if http status code is retryable.
if isHTTPStatusRetryable(res.StatusCode) {
continue // Retry.
}
break
}
return res, err return res, err
} }

@ -0,0 +1,175 @@
/*
* MinIO Cloud Storage, (C) 2019 MinIO, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package madmin
import (
"math/rand"
"net"
"net/http"
"net/url"
"strings"
"sync"
"time"
)
// MaxRetry is the maximum number of retries before stopping.
var MaxRetry = 10
// MaxJitter will randomize over the full exponential backoff time
const MaxJitter = 1.0
// NoJitter disables the use of jitter for randomizing the exponential backoff time
const NoJitter = 0.0
// DefaultRetryUnit - default unit multiplicative per retry.
// defaults to 1 second.
const DefaultRetryUnit = time.Second
// DefaultRetryCap - Each retry attempt never waits no longer than
// this maximum time duration.
const DefaultRetryCap = time.Second * 30
// lockedRandSource provides protected rand source, implements rand.Source interface.
type lockedRandSource struct {
lk sync.Mutex
src rand.Source
}
// Int63 returns a non-negative pseudo-random 63-bit integer as an int64.
func (r *lockedRandSource) Int63() (n int64) {
r.lk.Lock()
n = r.src.Int63()
r.lk.Unlock()
return
}
// Seed uses the provided seed value to initialize the generator to a
// deterministic state.
func (r *lockedRandSource) Seed(seed int64) {
r.lk.Lock()
r.src.Seed(seed)
r.lk.Unlock()
}
// newRetryTimer creates a timer with exponentially increasing
// delays until the maximum retry attempts are reached.
func (adm AdminClient) newRetryTimer(maxRetry int, unit time.Duration, cap time.Duration, jitter float64, doneCh chan struct{}) <-chan int {
attemptCh := make(chan int)
// computes the exponential backoff duration according to
// https://www.awsarchitectureblog.com/2015/03/backoff.html
exponentialBackoffWait := func(attempt int) time.Duration {
// normalize jitter to the range [0, 1.0]
if jitter < NoJitter {
jitter = NoJitter
}
if jitter > MaxJitter {
jitter = MaxJitter
}
//sleep = random_between(0, min(cap, base * 2 ** attempt))
sleep := unit * time.Duration(1<<uint(attempt))
if sleep > cap {
sleep = cap
}
if jitter != NoJitter {
sleep -= time.Duration(adm.random.Float64() * float64(sleep) * jitter)
}
return sleep
}
go func() {
defer close(attemptCh)
for i := 0; i < maxRetry; i++ {
select {
// Attempts start from 1.
case attemptCh <- i + 1:
case <-doneCh:
// Stop the routine.
return
}
time.Sleep(exponentialBackoffWait(i))
}
}()
return attemptCh
}
// isHTTPReqErrorRetryable - is http requests error retryable, such
// as i/o timeout, connection broken etc..
func isHTTPReqErrorRetryable(err error) bool {
if err == nil {
return false
}
switch e := err.(type) {
case *url.Error:
switch e.Err.(type) {
case *net.DNSError, *net.OpError, net.UnknownNetworkError:
return true
}
if strings.Contains(err.Error(), "Connection closed by foreign host") {
return true
} else if strings.Contains(err.Error(), "net/http: TLS handshake timeout") {
// If error is - tlsHandshakeTimeoutError, retry.
return true
} else if strings.Contains(err.Error(), "i/o timeout") {
// If error is - tcp timeoutError, retry.
return true
} else if strings.Contains(err.Error(), "connection timed out") {
// If err is a net.Dial timeout, retry.
return true
} else if strings.Contains(err.Error(), "net/http: HTTP/1.x transport connection broken") {
// If error is transport connection broken, retry.
return true
}
}
return false
}
// List of AWS S3 error codes which are retryable.
var retryableS3Codes = map[string]struct{}{
"RequestError": {},
"RequestTimeout": {},
"Throttling": {},
"ThrottlingException": {},
"RequestLimitExceeded": {},
"RequestThrottled": {},
"InternalError": {},
"SlowDown": {},
// Add more AWS S3 codes here.
}
// isS3CodeRetryable - is s3 error code retryable.
func isS3CodeRetryable(s3Code string) (ok bool) {
_, ok = retryableS3Codes[s3Code]
return ok
}
// List of HTTP status codes which are retryable.
var retryableHTTPStatusCodes = map[int]struct{}{
http.StatusTooManyRequests: {},
http.StatusInternalServerError: {},
http.StatusBadGateway: {},
http.StatusServiceUnavailable: {},
// Add more HTTP status codes here.
}
// isHTTPStatusRetryable - is HTTP error code retryable.
func isHTTPStatusRetryable(httpStatusCode int) (ok bool) {
_, ok = retryableHTTPStatusCodes[httpStatusCode]
return ok
}
Loading…
Cancel
Save