Reroute requests based token heal/listing (#9939)

When manual healing is triggered, one node in a cluster will 
become the authority to heal. mc regularly sends new requests 
to fetch the status of the ongoing healing process, but a load 
balancer could land the healing request to a node that is not 
doing the healing request.

This PR will redirect a request to the node based on the node 
index found described as part of the client token. A similar
technique is also used to proxy ListObjectsV2 requests
by encoding this information in continuation-token
master
Anis Elleuch 4 years ago committed by GitHub
parent e59ee14f40
commit 2be20588bf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 14
      cmd/admin-handlers.go
  2. 7
      cmd/admin-heal-ops.go
  3. 95
      cmd/bucket-listobjects-handlers.go
  4. 53
      cmd/endpoint.go
  5. 2
      cmd/globals.go
  6. 28
      cmd/handler-utils.go
  7. 2
      cmd/server-main.go
  8. 6
      pkg/handlers/forwarder.go

@ -652,6 +652,20 @@ func (a adminAPIHandlers) HealHandler(w http.ResponseWriter, r *http.Request) {
return return
} }
if globalIsDistErasure {
// Analyze the heal token and route the request accordingly
_, nodeIndex, parsed := parseRequestToken(hip.clientToken)
if parsed {
if proxyRequestByNodeIndex(ctx, w, r, nodeIndex) {
return
}
} else {
apiErr := errorCodes.ToAPIErr(ErrHealInvalidClientToken)
writeErrorResponseJSON(ctx, w, apiErr, r.URL)
return
}
}
type healResp struct { type healResp struct {
respBytes []byte respBytes []byte
apiErr APIError apiErr APIError

@ -370,13 +370,18 @@ func newHealSequence(ctx context.Context, bucket, objPrefix, clientAddr string,
reqInfo.AppendTags("prefix", objPrefix) reqInfo.AppendTags("prefix", objPrefix)
ctx, cancel := context.WithCancel(logger.SetReqInfo(ctx, reqInfo)) ctx, cancel := context.WithCancel(logger.SetReqInfo(ctx, reqInfo))
clientToken := mustGetUUID()
if globalIsDistErasure {
clientToken = fmt.Sprintf("%s@%d", clientToken, GetProxyEndpointLocalIndex(globalProxyEndpoints))
}
return &healSequence{ return &healSequence{
respCh: make(chan healResult), respCh: make(chan healResult),
bucket: bucket, bucket: bucket,
object: objPrefix, object: objPrefix,
reportProgress: true, reportProgress: true,
startTime: UTCNow(), startTime: UTCNow(),
clientToken: mustGetUUID(), clientToken: clientToken,
clientAddress: clientAddr, clientAddress: clientAddr,
forceStarted: forceStart, forceStarted: forceStart,
settings: hs, settings: hs,

@ -18,8 +18,9 @@ package cmd
import ( import (
"context" "context"
"io" "fmt"
"net/http" "net/http"
"strconv"
"strings" "strings"
"github.com/gorilla/mux" "github.com/gorilla/mux"
@ -159,8 +160,13 @@ func (api objectAPIHandlers) ListObjectsV2MHandler(w http.ResponseWriter, r *htt
return return
} }
if proxyListRequest(ctx, w, r, bucket) { // Analyze continuation token and route the request accordingly
return subToken, nodeIndex, parsed := parseRequestToken(token)
if parsed {
if proxyRequestByNodeIndex(ctx, w, r, nodeIndex) {
return
}
token = subToken
} }
listObjectsV2 := objectAPI.ListObjectsV2 listObjectsV2 := objectAPI.ListObjectsV2
@ -185,8 +191,10 @@ func (api objectAPIHandlers) ListObjectsV2MHandler(w http.ResponseWriter, r *htt
} }
} }
response := generateListObjectsV2Response(bucket, prefix, token, // The next continuation token has id@node_index format to optimize paginated listing
listObjectsV2Info.NextContinuationToken, startAfter, nextContinuationToken := fmt.Sprintf("%s@%d", listObjectsV2Info.NextContinuationToken, getLocalNodeIndex())
response := generateListObjectsV2Response(bucket, prefix, token, nextContinuationToken, startAfter,
delimiter, encodingType, fetchOwner, listObjectsV2Info.IsTruncated, delimiter, encodingType, fetchOwner, listObjectsV2Info.IsTruncated,
maxKeys, listObjectsV2Info.Objects, listObjectsV2Info.Prefixes, true) maxKeys, listObjectsV2Info.Objects, listObjectsV2Info.Prefixes, true)
@ -237,8 +245,13 @@ func (api objectAPIHandlers) ListObjectsV2Handler(w http.ResponseWriter, r *http
return return
} }
if proxyListRequest(ctx, w, r, bucket) { // Analyze continuation token and route the request accordingly
return subToken, nodeIndex, parsed := parseRequestToken(token)
if parsed {
if proxyRequestByNodeIndex(ctx, w, r, nodeIndex) {
return
}
token = subToken
} }
listObjectsV2 := objectAPI.ListObjectsV2 listObjectsV2 := objectAPI.ListObjectsV2
@ -263,8 +276,10 @@ func (api objectAPIHandlers) ListObjectsV2Handler(w http.ResponseWriter, r *http
} }
} }
response := generateListObjectsV2Response(bucket, prefix, token, // The next continuation token has id@node_index format to optimize paginated listing
listObjectsV2Info.NextContinuationToken, startAfter, nextContinuationToken := fmt.Sprintf("%s@%d", listObjectsV2Info.NextContinuationToken, getLocalNodeIndex())
response := generateListObjectsV2Response(bucket, prefix, token, nextContinuationToken, startAfter,
delimiter, encodingType, fetchOwner, listObjectsV2Info.IsTruncated, delimiter, encodingType, fetchOwner, listObjectsV2Info.IsTruncated,
maxKeys, listObjectsV2Info.Objects, listObjectsV2Info.Prefixes, false) maxKeys, listObjectsV2Info.Objects, listObjectsV2Info.Prefixes, false)
@ -272,41 +287,47 @@ func (api objectAPIHandlers) ListObjectsV2Handler(w http.ResponseWriter, r *http
writeSuccessResponseXML(w, encodeResponse(response)) writeSuccessResponseXML(w, encodeResponse(response))
} }
func getListEndpoint(bucket string) ListEndpoint { func getLocalNodeIndex() int {
return globalListEndpoints[crcHashMod(bucket, len(globalListEndpoints))] if len(globalProxyEndpoints) == 0 {
return -1
}
for i, ep := range globalProxyEndpoints {
if ep.IsLocal {
return i
}
}
return -1
} }
// Proxy the list request to the right server. func parseRequestToken(token string) (subToken string, nodeIndex int, success bool) {
func proxyListRequest(ctx context.Context, w http.ResponseWriter, r *http.Request, bucket string) (success bool) { i := strings.Index(token, "@")
if len(globalListEndpoints) == 0 { if i < 0 {
return false return "", -1, false
}
nodeIndex, err := strconv.Atoi(token[i+1:])
if err != nil {
return "", -1, false
} }
ep := getListEndpoint(bucket) subToken = token[:i]
if ep.isLocal { return subToken, nodeIndex, true
}
func proxyRequestByNodeIndex(ctx context.Context, w http.ResponseWriter, r *http.Request, index int) (success bool) {
if len(globalProxyEndpoints) == 0 {
return false return false
} }
ctx = r.Context() if index < 0 || index >= len(globalProxyEndpoints) {
outreq := r.Clone(ctx)
outreq.URL.Scheme = "http"
outreq.URL.Host = ep.host
outreq.URL.Path = r.URL.Path
outreq.Header.Add("Host", r.Host)
if globalIsSSL {
outreq.URL.Scheme = "https"
}
outreq.Host = r.Host
res, err := ep.t.RoundTrip(outreq)
if err != nil {
return false return false
} }
for k, vv := range res.Header { ep := globalProxyEndpoints[index]
for _, v := range vv { if ep.IsLocal {
w.Header().Set(k, v) return false
}
} }
w.WriteHeader(res.StatusCode) return proxyRequest(ctx, w, r, ep)
io.Copy(w, res.Body) }
return true
func proxyRequestByBucket(ctx context.Context, w http.ResponseWriter, r *http.Request, bucket string) (success bool) {
return proxyRequestByNodeIndex(ctx, w, r, crcHashMod(bucket, len(globalProxyEndpoints)))
} }
// ListObjectsV1Handler - GET Bucket (List Objects) Version 1. // ListObjectsV1Handler - GET Bucket (List Objects) Version 1.
@ -347,7 +368,7 @@ func (api objectAPIHandlers) ListObjectsV1Handler(w http.ResponseWriter, r *http
return return
} }
if proxyListRequest(ctx, w, r, bucket) { if proxyRequestByBucket(ctx, w, r, bucket) {
return return
} }

@ -49,12 +49,11 @@ const (
URLEndpointType URLEndpointType
) )
// ListEndpoint - endpoint used for list redirects // ProxyEndpoint - endpoint used for proxy redirects
// See proxyListRequest() for details. // See proxyRequest() for details.
type ListEndpoint struct { type ProxyEndpoint struct {
host string Endpoint
t *http.Transport Transport *http.Transport
isLocal bool
} }
// Endpoint - any type of endpoint. // Endpoint - any type of endpoint.
@ -719,18 +718,21 @@ func GetRemotePeers(endpointZones EndpointZones) []string {
return peerSet.ToSlice() return peerSet.ToSlice()
} }
// GetListEndpoints - get all endpoints that can be used to proxy list request. // GetProxyEndpointLocalIndex returns index of the local proxy endpoint
func GetListEndpoints(endpointZones EndpointZones) ([]ListEndpoint, error) { func GetProxyEndpointLocalIndex(proxyEps []ProxyEndpoint) int {
var listeps []ListEndpoint for i, pep := range proxyEps {
if pep.IsLocal {
listepExists := func(host string) bool { return i
for _, listep := range listeps {
if listep.host == host {
return true
}
} }
return false
} }
return -1
}
// GetProxyEndpoints - get all endpoints that can be used to proxy list request.
func GetProxyEndpoints(endpointZones EndpointZones) ([]ProxyEndpoint, error) {
var proxyEps []ProxyEndpoint
proxyEpSet := set.NewStringSet()
for _, ep := range endpointZones { for _, ep := range endpointZones {
for _, endpoint := range ep.Endpoints { for _, endpoint := range ep.Endpoints {
@ -739,28 +741,25 @@ func GetListEndpoints(endpointZones EndpointZones) ([]ListEndpoint, error) {
} }
host := endpoint.Host host := endpoint.Host
if listepExists(host) { if proxyEpSet.Contains(host) {
continue continue
} }
hostName, _, err := net.SplitHostPort(host) proxyEpSet.Add(host)
if err != nil {
return nil, err
}
var tlsConfig *tls.Config var tlsConfig *tls.Config
if globalIsSSL { if globalIsSSL {
tlsConfig = &tls.Config{ tlsConfig = &tls.Config{
ServerName: hostName, ServerName: endpoint.Hostname(),
RootCAs: globalRootCAs, RootCAs: globalRootCAs,
} }
} }
listeps = append(listeps, ListEndpoint{ proxyEps = append(proxyEps, ProxyEndpoint{
host, Endpoint: endpoint,
newCustomHTTPTransport(tlsConfig, rest.DefaultRESTTimeout)(), Transport: newCustomHTTPTransport(tlsConfig, rest.DefaultRESTTimeout)(),
endpoint.IsLocal,
}) })
} }
} }
return listeps, nil return proxyEps, nil
} }
func updateDomainIPs(endPoints set.StringSet) { func updateDomainIPs(endPoints set.StringSet) {

@ -279,7 +279,7 @@ var (
// If writes to FS backend should be O_SYNC. // If writes to FS backend should be O_SYNC.
globalFSOSync bool globalFSOSync bool
globalListEndpoints []ListEndpoint globalProxyEndpoints []ProxyEndpoint
// Add new variable global values here. // Add new variable global values here.
) )

@ -1,5 +1,5 @@
/* /*
* MinIO Cloud Storage, (C) 2015, 2016, 2017 MinIO, Inc. * MinIO Cloud Storage, (C) 2015-2020 MinIO, Inc.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -452,3 +452,29 @@ func getHostName(r *http.Request) (hostName string) {
} }
return return
} }
// Proxy any request to an endpoint.
func proxyRequest(ctx context.Context, w http.ResponseWriter, r *http.Request, ep ProxyEndpoint) (success bool) {
success = true
f := handlers.NewForwarder(&handlers.Forwarder{
PassHost: true,
RoundTripper: ep.Transport,
ErrorHandler: func(w http.ResponseWriter, r *http.Request, err error) {
success = false
w.WriteHeader(http.StatusBadGateway)
},
Logger: func(err error) {
logger.LogIf(GlobalContext, err)
},
})
r.URL.Scheme = "http"
if globalIsSSL {
r.URL.Scheme = "https"
}
r.URL.Host = ep.Host
f.ServeHTTP(w, r)
return
}

@ -396,7 +396,7 @@ func serverMain(ctx *cli.Context) {
globalRootCAs, err = config.GetRootCAs(globalCertsCADir.Get()) globalRootCAs, err = config.GetRootCAs(globalCertsCADir.Get())
logger.FatalIf(err, "Failed to read root CAs (%v)", err) logger.FatalIf(err, "Failed to read root CAs (%v)", err)
globalListEndpoints, err = GetListEndpoints(globalEndpoints) globalProxyEndpoints, err = GetProxyEndpoints(globalEndpoints)
logger.FatalIf(err, "Invalid command line arguments") logger.FatalIf(err, "Invalid command line arguments")
globalMinioEndpoint = func() string { globalMinioEndpoint = func() string {

@ -33,6 +33,7 @@ type Forwarder struct {
RoundTripper http.RoundTripper RoundTripper http.RoundTripper
PassHost bool PassHost bool
Logger func(error) Logger func(error)
ErrorHandler func(http.ResponseWriter, *http.Request, error)
// internal variables // internal variables
rewriter *headerRewriter rewriter *headerRewriter
@ -61,6 +62,11 @@ func (f *Forwarder) ServeHTTP(w http.ResponseWriter, inReq *http.Request) {
FlushInterval: defaultFlushInterval, FlushInterval: defaultFlushInterval,
ErrorHandler: f.customErrHandler, ErrorHandler: f.customErrHandler,
} }
if f.ErrorHandler != nil {
revproxy.ErrorHandler = f.ErrorHandler
}
revproxy.ServeHTTP(w, outReq) revproxy.ServeHTTP(w, outReq)
} }

Loading…
Cancel
Save