rest/storage: Remove racy LastError usage (#8817)

instead perform a liveness check call to
verify if server is online and print relevant
errors.

Also introduce a StorageErr string error type
instead of errors.New() deprecate usage of
VerifyFileError, DeleteFileError for gob,
change in datastructure also requires bump in
storage REST version to v13.

Fixes #8811
master
Harshavardhana 5 years ago committed by kannappanr
parent 9be7066715
commit 0879a4f743
  1. 5
      cmd/healthcheck-handler.go
  2. 4
      cmd/naughty-disk_test.go
  3. 4
      cmd/posix-diskid-check.go
  4. 7
      cmd/posix.go
  5. 66
      cmd/prepare-storage.go
  6. 3
      cmd/server-main.go
  7. 59
      cmd/storage-errors.go
  8. 1
      cmd/storage-interface.go
  9. 23
      cmd/storage-rest-client.go
  10. 2
      cmd/storage-rest-common.go
  11. 11
      cmd/storage-rest-server.go

@ -30,7 +30,7 @@ import (
func ReadinessCheckHandler(w http.ResponseWriter, r *http.Request) {
ctx := newContext(r, w, "ReadinessCheckHandler")
objLayer := newObjectLayerFn()
objLayer := newObjectLayerWithoutSafeModeFn()
// Service not initialized yet
if objLayer == nil || !objLayer.IsReady(ctx) {
writeResponse(w, http.StatusServiceUnavailable, nil, mimeNone)
@ -47,7 +47,7 @@ func ReadinessCheckHandler(w http.ResponseWriter, r *http.Request) {
func LivenessCheckHandler(w http.ResponseWriter, r *http.Request) {
ctx := newContext(r, w, "LivenessCheckHandler")
objLayer := newObjectLayerFn()
objLayer := newObjectLayerWithoutSafeModeFn()
// Service not initialized yet
if objLayer == nil {
// Respond with 200 OK while server initializes to ensure a distributed cluster
@ -95,5 +95,6 @@ func LivenessCheckHandler(w http.ResponseWriter, r *http.Request) {
writeResponse(w, http.StatusServiceUnavailable, nil, mimeNone)
return
}
writeResponse(w, http.StatusOK, nil, mimeNone)
}

@ -57,10 +57,6 @@ func (*naughtyDisk) Hostname() string {
return ""
}
func (d *naughtyDisk) LastError() (err error) {
return nil
}
func (d *naughtyDisk) Close() (err error) {
if err = d.calcError(); err != nil {
return err

@ -46,10 +46,6 @@ func (p *posixDiskIDCheck) Hostname() string {
return p.storage.Hostname()
}
func (p *posixDiskIDCheck) LastError() error {
return p.storage.LastError()
}
func (p *posixDiskIDCheck) Close() error {
return p.storage.Close()
}

@ -313,13 +313,6 @@ func (*posix) Hostname() string {
return ""
}
func (s *posix) LastError() error {
if atomic.LoadInt32(&s.ioErrCount) > maxAllowedIOError {
return errFaultyDisk
}
return nil
}
func (s *posix) Close() error {
close(s.stopUsageCh)
return nil

@ -18,12 +18,18 @@ package cmd
import (
"context"
"crypto/tls"
"fmt"
"net/http"
"net/url"
"os"
"path"
"sync"
"time"
xhttp "github.com/minio/minio/cmd/http"
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/cmd/rest"
"github.com/minio/minio/pkg/sync/errgroup"
)
@ -172,6 +178,45 @@ func validateXLFormats(format *formatXLV3, formats []*formatXLV3, endpoints Endp
// https://github.com/minio/minio/issues/5667
var errXLV3ThisEmpty = fmt.Errorf("XL format version 3 has This field empty")
// IsServerResolvable - checks if the endpoint is resolvable
// by sending a naked HTTP request with liveness checks.
func IsServerResolvable(endpoint Endpoint) error {
serverURL := &url.URL{
Scheme: endpoint.Scheme,
Host: endpoint.Host,
Path: path.Join(healthCheckPathPrefix, healthCheckLivenessPath),
}
var tlsConfig *tls.Config
if globalIsSSL {
tlsConfig = &tls.Config{
ServerName: endpoint.Hostname(),
RootCAs: globalRootCAs,
NextProtos: []string{"http/1.1"}, // Force http1.1
}
}
req, err := http.NewRequest(http.MethodGet, serverURL.String(), nil)
if err != nil {
return err
}
httpClient := &http.Client{
Transport: newCustomHTTPTransport(tlsConfig, rest.DefaultRESTTimeout, rest.DefaultRESTTimeout)(),
}
resp, err := httpClient.Do(req)
if err != nil {
return err
}
defer xhttp.DrainBody(resp.Body)
if resp.StatusCode != http.StatusOK {
return StorageErr(resp.Status)
}
return nil
}
// connect to list of endpoints and load all XL disk formats, validate the formats are correct
// and are in quorum, if no formats are found attempt to initialize all of them for the first
// time. additionally make sure to close all the disks used in this attempt.
@ -179,9 +224,15 @@ func connectLoadInitFormats(retryCount int, firstDisk bool, endpoints Endpoints,
// Initialize all storage disks
storageDisks, errs := initStorageDisksWithErrors(endpoints)
defer closeStorageDisks(storageDisks)
for i, err := range errs {
if err != nil && err != errDiskNotFound {
return nil, fmt.Errorf("Disk %s: %w", endpoints[i], err)
if err != nil {
if err != errDiskNotFound {
return nil, fmt.Errorf("Disk %s: %w", endpoints[i], err)
}
if retryCount >= 5 {
logger.Info("Unable to connect to %s: %v\n", endpoints[i], IsServerResolvable(endpoints[i]))
}
}
}
@ -194,17 +245,6 @@ func connectLoadInitFormats(retryCount int, firstDisk bool, endpoints Endpoints,
}
}
// Connect to all storage disks, a connection failure will be
// only logged after some retries.
for _, disk := range storageDisks {
if disk != nil {
connectErr := disk.LastError()
if connectErr != nil && retryCount >= 5 {
logger.Info("Unable to connect to %s: %v\n", disk.String(), connectErr.Error())
}
}
}
// Pre-emptively check if one of the formatted disks
// is invalid. This function returns success for the
// most part unless one of the formats is not consistent

@ -45,8 +45,7 @@ func init() {
globalConsoleSys = NewConsoleLogger(context.Background())
logger.AddTarget(globalConsoleSys)
gob.Register(VerifyFileError(""))
gob.Register(DeleteFileError(""))
gob.Register(StorageErr(""))
}
// ServerFlags - server command specific flags

@ -16,88 +16,83 @@
package cmd
import (
"errors"
)
// errUnexpected - unexpected error, requires manual intervention.
var errUnexpected = errors.New("Unexpected error, please report this issue at https://github.com/minio/minio/issues")
var errUnexpected = StorageErr("Unexpected error, please report this issue at https://github.com/minio/minio/issues")
// errCorruptedFormat - corrupted backend format.
var errCorruptedFormat = errors.New("corrupted backend format, please join https://slack.min.io for assistance")
var errCorruptedFormat = StorageErr("corrupted backend format, please join https://slack.min.io for assistance")
// errUnformattedDisk - unformatted disk found.
var errUnformattedDisk = errors.New("unformatted disk found")
var errUnformattedDisk = StorageErr("unformatted disk found")
// errDiskFull - cannot create volume or files when disk is full.
var errDiskFull = errors.New("disk path full")
var errDiskFull = StorageErr("disk path full")
// errDiskNotFound - cannot find the underlying configured disk anymore.
var errDiskNotFound = errors.New("disk not found")
var errDiskNotFound = StorageErr("disk not found")
// errFaultyRemoteDisk - remote disk is faulty.
var errFaultyRemoteDisk = errors.New("remote disk is faulty")
var errFaultyRemoteDisk = StorageErr("remote disk is faulty")
// errFaultyDisk - disk is faulty.
var errFaultyDisk = errors.New("disk is faulty")
var errFaultyDisk = StorageErr("disk is faulty")
// errDiskAccessDenied - we don't have write permissions on disk.
var errDiskAccessDenied = errors.New("disk access denied")
var errDiskAccessDenied = StorageErr("disk access denied")
// errFileNotFound - cannot find the file.
var errFileNotFound = errors.New("file not found")
var errFileNotFound = StorageErr("file not found")
// errTooManyOpenFiles - too many open files.
var errTooManyOpenFiles = errors.New("too many open files")
var errTooManyOpenFiles = StorageErr("too many open files")
// errFileNameTooLong - given file name is too long than supported length.
var errFileNameTooLong = errors.New("file name too long")
var errFileNameTooLong = StorageErr("file name too long")
// errVolumeExists - cannot create same volume again.
var errVolumeExists = errors.New("volume already exists")
var errVolumeExists = StorageErr("volume already exists")
// errIsNotRegular - not of regular file type.
var errIsNotRegular = errors.New("not of regular file type")
var errIsNotRegular = StorageErr("not of regular file type")
// errVolumeNotFound - cannot find the volume.
var errVolumeNotFound = errors.New("volume not found")
var errVolumeNotFound = StorageErr("volume not found")
// errVolumeNotEmpty - volume not empty.
var errVolumeNotEmpty = errors.New("volume is not empty")
var errVolumeNotEmpty = StorageErr("volume is not empty")
// errVolumeAccessDenied - cannot access volume, insufficient permissions.
var errVolumeAccessDenied = errors.New("volume access denied")
var errVolumeAccessDenied = StorageErr("volume access denied")
// errFileAccessDenied - cannot access file, insufficient permissions.
var errFileAccessDenied = errors.New("file access denied")
var errFileAccessDenied = StorageErr("file access denied")
// errFileCorrupt - file has an unexpected size, or is not readable
var errFileCorrupt = errors.New("file is corrupted")
var errFileCorrupt = StorageErr("file is corrupted")
// errFileParentIsFile - cannot have overlapping objects, parent is already a file.
var errFileParentIsFile = errors.New("parent is a file")
var errFileParentIsFile = StorageErr("parent is a file")
// errBitrotHashAlgoInvalid - the algo for bit-rot hash
// verification is empty or invalid.
var errBitrotHashAlgoInvalid = errors.New("bit-rot hash algorithm is invalid")
var errBitrotHashAlgoInvalid = StorageErr("bit-rot hash algorithm is invalid")
// errCrossDeviceLink - rename across devices not allowed.
var errCrossDeviceLink = errors.New("Rename across devices not allowed, please fix your backend configuration")
var errCrossDeviceLink = StorageErr("Rename across devices not allowed, please fix your backend configuration")
// errMinDiskSize - cannot create volume or files when disk size is less than threshold.
var errMinDiskSize = errors.New("The disk size is less than the minimum threshold")
var errMinDiskSize = StorageErr("The disk size is less than the minimum threshold")
// errLessData - returned when less data available than what was requested.
var errLessData = errors.New("less data available than what was requested")
var errLessData = StorageErr("less data available than what was requested")
// errMoreData = returned when more data was sent by the caller than what it was supposed to.
var errMoreData = errors.New("more data was sent than what was advertised")
var errMoreData = StorageErr("more data was sent than what was advertised")
// VerifyFileError represents error generated by VerifyFile posix call.
type VerifyFileError string
// StorageErr represents error generated by posix call.
type StorageErr string
// Error method for the hashMismatchError
func (h VerifyFileError) Error() string {
func (h StorageErr) Error() string {
return string(h)
}

@ -28,7 +28,6 @@ type StorageAPI interface {
// Storage operations.
IsOnline() bool // Returns true if disk is online.
Hostname() string // Returns host name if remote host.
LastError() error
Close() error
SetDiskID(id string)

@ -47,9 +47,9 @@ func isNetworkError(err error) bool {
return false
}
// Converts rpc.ServerError to underlying error. This function is
// written so that the storageAPI errors are consistent across network
// disks as well.
// Converts network error to storageErr. This function is
// written so that the storageAPI errors are consistent
// across network disks.
func toStorageErr(err error) error {
if err == nil {
return nil
@ -111,14 +111,13 @@ type storageRESTClient struct {
endpoint Endpoint
restClient *rest.Client
connected int32
lastError error
diskID string
}
// Wrapper to restClient.Call to handle network errors, in case of network error the connection is makred disconnected
// permanently. The only way to restore the storage connection is at the xl-sets layer by xlsets.monitorAndConnectEndpoints()
// after verifying format.json
func (client *storageRESTClient) call(method string, values url.Values, body io.Reader, length int64) (respBody io.ReadCloser, err error) {
func (client *storageRESTClient) call(method string, values url.Values, body io.Reader, length int64) (io.ReadCloser, error) {
if !client.IsOnline() {
return nil, errDiskNotFound
}
@ -126,16 +125,17 @@ func (client *storageRESTClient) call(method string, values url.Values, body io.
values = make(url.Values)
}
values.Set(storageRESTDiskID, client.diskID)
respBody, err = client.restClient.Call(method, values, body, length)
respBody, err := client.restClient.Call(method, values, body, length)
if err == nil {
return respBody, nil
}
client.lastError = err
if isNetworkError(err) || err.Error() == errDiskStale.Error() {
err = toStorageErr(err)
if err == errDiskNotFound {
atomic.StoreInt32(&client.connected, 0)
}
return nil, toStorageErr(err)
return nil, err
}
// Stringer provides a canonicalized representation of network device.
@ -174,11 +174,6 @@ func (client *storageRESTClient) CrawlAndGetDataUsage(endCh <-chan struct{}) (Da
return usageInfo, err
}
// LastError - returns the network error if any.
func (client *storageRESTClient) LastError() error {
return client.lastError
}
func (client *storageRESTClient) SetDiskID(id string) {
client.diskID = id
}

@ -17,7 +17,7 @@
package cmd
const (
storageRESTVersion = "v12"
storageRESTVersion = "v13" // Introduced StorageErr error type.
storageRESTVersionPrefix = SlashSeparator + storageRESTVersion
storageRESTPrefix = minioReservedBucketPath + "/storage"
)

@ -480,13 +480,6 @@ type DeleteFileBulkErrsResp struct {
Errs []error
}
// DeleteFileError - error captured per delete operation
type DeleteFileError string
func (d DeleteFileError) Error() string {
return string(d)
}
// DeleteFileBulkHandler - delete a file.
func (s *storageRESTServer) DeleteFileBulkHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
@ -505,7 +498,7 @@ func (s *storageRESTServer) DeleteFileBulkHandler(w http.ResponseWriter, r *http
derrsResp := &DeleteFileBulkErrsResp{Errs: make([]error, len(errs))}
for idx, err := range errs {
if err != nil {
derrsResp.Errs[idx] = DeleteFileError(err.Error())
derrsResp.Errs[idx] = StorageErr(err.Error())
}
}
@ -594,7 +587,7 @@ func (s *storageRESTServer) VerifyFile(w http.ResponseWriter, r *http.Request) {
<-doneCh
vresp := &VerifyFileResp{}
if err != nil {
vresp.Err = VerifyFileError(err.Error())
vresp.Err = StorageErr(err.Error())
}
encoder.Encode(vresp)
w.(http.Flusher).Flush()

Loading…
Cancel
Save