fix: ignore faulty drives and continue (#10511)

drives might return different types of errors
handle them individually, and for some errors
just log an error and continue
master
Harshavardhana 4 years ago committed by GitHub
parent 1cf322b7d4
commit 7f9498f43f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      cmd/bitrot-streaming.go
  2. 4
      cmd/bitrot-whole.go
  3. 21
      cmd/storage-rest-server.go
  4. 22
      cmd/xl-storage.go
  5. 12
      cmd/xl-storage_test.go

@ -140,8 +140,8 @@ func (b *streamingBitrotReader) ReadAt(buf []byte, offset int64) (int, error) {
b.h.Write(buf) b.h.Write(buf)
if !bytes.Equal(b.h.Sum(nil), b.hashBytes) { if !bytes.Equal(b.h.Sum(nil), b.hashBytes) {
err := &errHashMismatch{fmt.Sprintf("Disk: %s - content hash does not match - expected %s, got %s", err := &errHashMismatch{fmt.Sprintf("Disk: %s -> %s/%s - content hash does not match - expected %s, got %s",
b.disk, hex.EncodeToString(b.hashBytes), hex.EncodeToString(b.h.Sum(nil)))} b.disk, b.volume, b.filePath, hex.EncodeToString(b.hashBytes), hex.EncodeToString(b.h.Sum(nil)))}
logger.LogIf(GlobalContext, err) logger.LogIf(GlobalContext, err)
return 0, err return 0, err
} }

@ -71,12 +71,12 @@ func (b *wholeBitrotReader) ReadAt(buf []byte, offset int64) (n int, err error)
if b.buf == nil { if b.buf == nil {
b.buf = make([]byte, b.tillOffset-offset) b.buf = make([]byte, b.tillOffset-offset)
if _, err := b.disk.ReadFile(context.TODO(), b.volume, b.filePath, offset, b.buf, b.verifier); err != nil { if _, err := b.disk.ReadFile(context.TODO(), b.volume, b.filePath, offset, b.buf, b.verifier); err != nil {
logger.LogIf(GlobalContext, fmt.Errorf("Disk: %s returned %w", b.disk, err)) logger.LogIf(GlobalContext, fmt.Errorf("Disk: %s -> %s/%s returned %w", b.disk, b.volume, b.filePath, err))
return 0, err return 0, err
} }
} }
if len(b.buf) < len(buf) { if len(b.buf) < len(buf) {
logger.LogIf(GlobalContext, errLessData) logger.LogIf(GlobalContext, fmt.Errorf("Disk: %s -> %s/%s returned %w", b.disk, b.volume, b.filePath, errLessData))
return 0, errLessData return 0, errLessData
} }
n = copy(buf, b.buf) n = copy(buf, b.buf)

@ -830,12 +830,16 @@ func registerStorageRESTHandlers(router *mux.Router, endpointZones EndpointZones
} }
storage, err := newXLStorage(endpoint) storage, err := newXLStorage(endpoint)
if err != nil { if err != nil {
if err == errMinDiskSize { switch err {
case errMinDiskSize:
logger.Fatal(config.ErrUnableToWriteInBackend(err).Hint(err.Error()), "Unable to initialize backend") logger.Fatal(config.ErrUnableToWriteInBackend(err).Hint(err.Error()), "Unable to initialize backend")
} else if err == errUnsupportedDisk { case errUnsupportedDisk:
hint := fmt.Sprintf("'%s' does not support O_DIRECT flags, refusing to use", endpoint.Path) hint := fmt.Sprintf("'%s' does not support O_DIRECT flags, MinIO erasure coding requires filesystems with O_DIRECT support", endpoint.Path)
logger.Fatal(config.ErrUnsupportedBackend(err).Hint(hint), "Unable to initialize backend") logger.Fatal(config.ErrUnsupportedBackend(err).Hint(hint), "Unable to initialize backend")
} case errDiskNotDir:
hint := fmt.Sprintf("'%s' MinIO erasure coding needs a directory", endpoint.Path)
logger.Fatal(config.ErrUnableToWriteInBackend(err).Hint(hint), "Unable to initialize backend")
case errFileAccessDenied:
// Show a descriptive error with a hint about how to fix it. // Show a descriptive error with a hint about how to fix it.
var username string var username string
if u, err := user.Current(); err == nil { if u, err := user.Current(); err == nil {
@ -843,8 +847,15 @@ func registerStorageRESTHandlers(router *mux.Router, endpointZones EndpointZones
} else { } else {
username = "<your-username>" username = "<your-username>"
} }
hint := fmt.Sprintf("Run the following command to add the convenient permissions: `sudo chown -R %s %s && sudo chmod u+rxw %s`", username, endpoint.Path, endpoint.Path) hint := fmt.Sprintf("Run the following command to add write permissions: `sudo chown -R %s %s && sudo chmod u+rxw %s`", username, endpoint.Path, endpoint.Path)
logger.Fatal(config.ErrUnableToWriteInBackend(err).Hint(hint), "Unable to initialize posix backend") logger.Fatal(config.ErrUnableToWriteInBackend(err).Hint(hint), "Unable to initialize posix backend")
case errFaultyDisk:
logger.LogIf(GlobalContext, fmt.Errorf("disk is faulty at %s, please replace the drive", endpoint))
case errDiskFull:
logger.LogIf(GlobalContext, fmt.Errorf("disk is already full at %s, incoming I/O will fail", endpoint))
default:
logger.LogIf(GlobalContext, fmt.Errorf("disk returned an unexpected error at %s, please investigate", endpoint))
}
} }
server := &storageRESTServer{storage: storage} server := &storageRESTServer{storage: storage}

@ -176,15 +176,6 @@ func getValidPath(path string, requireDirectIO bool) (string, error) {
return path, errDiskNotDir return path, errDiskNotDir
} }
di, err := getDiskInfo(path)
if err != nil {
return path, err
}
if err = checkDiskMinTotal(di); err != nil {
return path, err
}
// check if backend is writable. // check if backend is writable.
var rnd [8]byte var rnd [8]byte
_, _ = rand.Read(rnd[:]) _, _ = rand.Read(rnd[:])
@ -195,6 +186,7 @@ func getValidPath(path string, requireDirectIO bool) (string, error) {
var file *os.File var file *os.File
if requireDirectIO { if requireDirectIO {
// only erasure coding needs direct-io support
file, err = disk.OpenFileDirectIO(fn, os.O_CREATE|os.O_EXCL, 0666) file, err = disk.OpenFileDirectIO(fn, os.O_CREATE|os.O_EXCL, 0666)
} else { } else {
file, err = os.OpenFile(fn, os.O_CREATE|os.O_EXCL, 0666) file, err = os.OpenFile(fn, os.O_CREATE|os.O_EXCL, 0666)
@ -204,12 +196,22 @@ func getValidPath(path string, requireDirectIO bool) (string, error) {
// if direct i/o failed. // if direct i/o failed.
if err != nil { if err != nil {
if isSysErrInvalidArg(err) { if isSysErrInvalidArg(err) {
// O_DIRECT not supported
return path, errUnsupportedDisk return path, errUnsupportedDisk
} }
return path, err return path, osErrToFileErr(err)
} }
file.Close() file.Close()
di, err := getDiskInfo(path)
if err != nil {
return path, err
}
if err = checkDiskMinTotal(di); err != nil {
return path, err
}
return path, nil return path, nil
} }

@ -453,7 +453,7 @@ func TestXLStorageMakeVol(t *testing.T) {
// Initialize xlStorage storage layer for permission denied error. // Initialize xlStorage storage layer for permission denied error.
_, err = newLocalXLStorage(permDeniedDir) _, err = newLocalXLStorage(permDeniedDir)
if err != nil && !os.IsPermission(err) { if err != nil && err != errFileAccessDenied {
t.Fatalf("Unable to initialize xlStorage, %s", err) t.Fatalf("Unable to initialize xlStorage, %s", err)
} }
@ -552,7 +552,7 @@ func TestXLStorageDeleteVol(t *testing.T) {
// Initialize xlStorage storage layer for permission denied error. // Initialize xlStorage storage layer for permission denied error.
_, err = newLocalXLStorage(permDeniedDir) _, err = newLocalXLStorage(permDeniedDir)
if err != nil && !os.IsPermission(err) { if err != nil && err != errFileAccessDenied {
t.Fatalf("Unable to initialize xlStorage, %s", err) t.Fatalf("Unable to initialize xlStorage, %s", err)
} }
@ -804,7 +804,7 @@ func TestXLStorageXlStorageListDir(t *testing.T) {
// Initialize xlStorage storage layer for permission denied error. // Initialize xlStorage storage layer for permission denied error.
_, err = newLocalXLStorage(permDeniedDir) _, err = newLocalXLStorage(permDeniedDir)
if err != nil && !os.IsPermission(err) { if err != nil && err != errFileAccessDenied {
t.Fatalf("Unable to initialize xlStorage, %s", err) t.Fatalf("Unable to initialize xlStorage, %s", err)
} }
@ -928,7 +928,7 @@ func TestXLStorageDeleteFile(t *testing.T) {
// Initialize xlStorage storage layer for permission denied error. // Initialize xlStorage storage layer for permission denied error.
_, err = newLocalXLStorage(permDeniedDir) _, err = newLocalXLStorage(permDeniedDir)
if err != nil && !os.IsPermission(err) { if err != nil && err != errFileAccessDenied {
t.Fatalf("Unable to initialize xlStorage, %s", err) t.Fatalf("Unable to initialize xlStorage, %s", err)
} }
@ -1126,7 +1126,7 @@ func TestXLStorageReadFile(t *testing.T) {
// Initialize xlStorage storage layer for permission denied error. // Initialize xlStorage storage layer for permission denied error.
_, err = newLocalXLStorage(permDeniedDir) _, err = newLocalXLStorage(permDeniedDir)
if err != nil && !os.IsPermission(err) { if err != nil && err != errFileAccessDenied {
t.Fatalf("Unable to initialize xlStorage, %s", err) t.Fatalf("Unable to initialize xlStorage, %s", err)
} }
@ -1296,7 +1296,7 @@ func TestXLStorageAppendFile(t *testing.T) {
var xlStoragePermStorage StorageAPI var xlStoragePermStorage StorageAPI
// Initialize xlStorage storage layer for permission denied error. // Initialize xlStorage storage layer for permission denied error.
_, err = newLocalXLStorage(permDeniedDir) _, err = newLocalXLStorage(permDeniedDir)
if err != nil && !os.IsPermission(err) { if err != nil && err != errFileAccessDenied {
t.Fatalf("Unable to initialize xlStorage, %s", err) t.Fatalf("Unable to initialize xlStorage, %s", err)
} }

Loading…
Cancel
Save