admin/heal: Fix deep healing to heal objects under more conditions (#8321)

- Heal if the part.1 is truncated from its original size
- Heal if the part.1 fails while being verified in between
- Heal if the part.1 fails while being at a certain offset

Other cleanups include make sure to flush the HTTP responses
properly from storage-rest-server, avoid using 'defer' to
improve call latency. 'defer' incurs latency avoid them
in our hot-paths such as storage-rest handlers.

Fixes #8319
master
Harshavardhana 5 years ago committed by kannappanr
parent 61927d228c
commit ff5bf51952
  1. 4
      cmd/bitrot-streaming.go
  2. 3
      cmd/disk-cache-backend.go
  3. 3
      cmd/iam.go
  4. 45
      cmd/posix.go
  5. 11
      cmd/posix_test.go
  6. 2
      cmd/server-main.go
  7. 18
      cmd/storage-errors.go
  8. 28
      cmd/storage-rest-client.go
  9. 6
      cmd/storage-rest-common.go
  10. 22
      cmd/storage-rest-server.go
  11. 11
      cmd/xl-v1-healing-common.go
  12. 15
      cmd/xl-v1-healing.go
  13. 52
      cmd/xl-v1-healing_test.go
  14. 1
      cmd/xl-v1-utils.go

@ -20,6 +20,7 @@ import (
"bytes" "bytes"
"context" "context"
"encoding/hex" "encoding/hex"
"fmt"
"hash" "hash"
"io" "io"
@ -131,7 +132,8 @@ func (b *streamingBitrotReader) ReadAt(buf []byte, offset int64) (int, error) {
b.h.Write(buf) b.h.Write(buf)
if !bytes.Equal(b.h.Sum(nil), b.hashBytes) { if !bytes.Equal(b.h.Sum(nil), b.hashBytes) {
err = HashMismatchError{hex.EncodeToString(b.hashBytes), hex.EncodeToString(b.h.Sum(nil))} err = fmt.Errorf("hashes do not match expected %s, got %s",
hex.EncodeToString(b.hashBytes), hex.EncodeToString(b.h.Sum(nil)))
logger.LogIf(context.Background(), err) logger.LogIf(context.Background(), err)
return 0, err return 0, err
} }

@ -584,7 +584,8 @@ func (c *diskCache) bitrotReadFromCache(ctx context.Context, filePath string, of
hashBytes := h.Sum(nil) hashBytes := h.Sum(nil)
if !bytes.Equal(hashBytes, checksumHash) { if !bytes.Equal(hashBytes, checksumHash) {
err = HashMismatchError{hex.EncodeToString(checksumHash), hex.EncodeToString(hashBytes)} err = fmt.Errorf("hashes do not match expected %s, got %s",
hex.EncodeToString(checksumHash), hex.EncodeToString(hashBytes))
logger.LogIf(context.Background(), err) logger.LogIf(context.Background(), err)
return err return err
} }

@ -1306,7 +1306,8 @@ func NewIAMSys() *IAMSys {
// The default users system // The default users system
var utype UsersSysType var utype UsersSysType
switch { switch {
case globalServerConfig.LDAPServerConfig.ServerAddr != "": case globalServerConfig != nil &&
globalServerConfig.LDAPServerConfig.ServerAddr != "":
utype = LDAPUsersSysType utype = LDAPUsersSysType
default: default:
utype = MinIOUsersSysType utype = MinIOUsersSysType

@ -18,7 +18,6 @@ package cmd
import ( import (
"context" "context"
"encoding/hex"
"errors" "errors"
"io" "io"
"io/ioutil" "io/ioutil"
@ -977,7 +976,7 @@ func (s *posix) ReadFile(volume, path string, offset int64, buffer []byte, verif
} }
if !bytes.Equal(h.Sum(nil), verifier.sum) { if !bytes.Equal(h.Sum(nil), verifier.sum) {
return 0, HashMismatchError{hex.EncodeToString(verifier.sum), hex.EncodeToString(h.Sum(nil))} return 0, errFileCorrupt
} }
return int64(len(buffer)), nil return int64(len(buffer)), nil
@ -1418,11 +1417,14 @@ func (s *posix) DeleteFile(volume, path string) (err error) {
if err != nil { if err != nil {
return err return err
} }
// Stat a volume entry. // Stat a volume entry.
_, err = os.Stat((volumeDir)) _, err = os.Stat((volumeDir))
if err != nil { if err != nil {
if os.IsNotExist(err) { if os.IsNotExist(err) {
return errVolumeNotFound return errVolumeNotFound
} else if os.IsPermission(err) {
return errVolumeAccessDenied
} else if isSysErrIO(err) { } else if isSysErrIO(err) {
return errFaultyDisk return errFaultyDisk
} }
@ -1564,11 +1566,16 @@ func (s *posix) VerifyFile(volume, path string, fileSize int64, algo BitrotAlgor
if err != nil { if err != nil {
return err return err
} }
// Stat a volume entry. // Stat a volume entry.
_, err = os.Stat(volumeDir) _, err = os.Stat(volumeDir)
if err != nil { if err != nil {
if os.IsNotExist(err) { if os.IsNotExist(err) {
return errVolumeNotFound return errVolumeNotFound
} else if isSysErrIO(err) {
return errFaultyDisk
} else if os.IsPermission(err) {
return errVolumeAccessDenied
} }
return err return err
} }
@ -1582,18 +1589,7 @@ func (s *posix) VerifyFile(volume, path string, fileSize int64, algo BitrotAlgor
// Open the file for reading. // Open the file for reading.
file, err := os.Open(filePath) file, err := os.Open(filePath)
if err != nil { if err != nil {
switch { return osErrToFSFileErr(err)
case os.IsNotExist(err):
return errFileNotFound
case os.IsPermission(err):
return errFileAccessDenied
case isSysErrNotDir(err):
return errFileAccessDenied
case isSysErrIO(err):
return errFaultyDisk
default:
return err
}
} }
// Close the file descriptor. // Close the file descriptor.
@ -1605,10 +1601,11 @@ func (s *posix) VerifyFile(volume, path string, fileSize int64, algo BitrotAlgor
h := algo.New() h := algo.New()
if _, err = io.CopyBuffer(h, file, *bufp); err != nil { if _, err = io.CopyBuffer(h, file, *bufp); err != nil {
return err // Premature failure in reading the object,file is corrupt.
return errFileCorrupt
} }
if !bytes.Equal(h.Sum(nil), sum) { if !bytes.Equal(h.Sum(nil), sum) {
return HashMismatchError{hex.EncodeToString(sum), hex.EncodeToString(h.Sum(nil))} return errFileCorrupt
} }
return nil return nil
} }
@ -1618,23 +1615,28 @@ func (s *posix) VerifyFile(volume, path string, fileSize int64, algo BitrotAlgor
hashBuf := make([]byte, h.Size()) hashBuf := make([]byte, h.Size())
fi, err := file.Stat() fi, err := file.Stat()
if err != nil { if err != nil {
// Unable to stat on the file, return an expected error
// for healing code to fix this file.
return err return err
} }
size := fi.Size()
// Calculate the size of the bitrot file and compare // Calculate the size of the bitrot file and compare
// it with the actual file size. // it with the actual file size.
if fi.Size() != bitrotShardFileSize(fileSize, shardSize, algo) { if size != bitrotShardFileSize(fileSize, shardSize, algo) {
return errFileUnexpectedSize return errFileCorrupt
} }
size := fi.Size() var n int
for { for {
if size == 0 { if size == 0 {
return nil return nil
} }
h.Reset() h.Reset()
n, err := file.Read(hashBuf) n, err = file.Read(hashBuf)
if err != nil { if err != nil {
// Read's failed for object with right size, file is corrupt.
return err return err
} }
size -= int64(n) size -= int64(n)
@ -1643,12 +1645,13 @@ func (s *posix) VerifyFile(volume, path string, fileSize int64, algo BitrotAlgor
} }
n, err = file.Read(buf) n, err = file.Read(buf)
if err != nil { if err != nil {
// Read's failed for object with right size, at different offsets.
return err return err
} }
size -= int64(n) size -= int64(n)
h.Write(buf) h.Write(buf)
if !bytes.Equal(h.Sum(nil), hashBuf) { if !bytes.Equal(h.Sum(nil), hashBuf) {
return HashMismatchError{hex.EncodeToString(hashBuf), hex.EncodeToString(h.Sum(nil))} return errFileCorrupt
} }
} }
} }

@ -19,7 +19,6 @@ package cmd
import ( import (
"bytes" "bytes"
"crypto/rand" "crypto/rand"
"encoding/hex"
"fmt" "fmt"
"io" "io"
"io/ioutil" "io/ioutil"
@ -1251,13 +1250,13 @@ var posixReadFileWithVerifyTests = []struct {
{file: "myobject", offset: 25, length: 74, algorithm: SHA256, expError: nil}, // 1 {file: "myobject", offset: 25, length: 74, algorithm: SHA256, expError: nil}, // 1
{file: "myobject", offset: 29, length: 70, algorithm: SHA256, expError: nil}, // 2 {file: "myobject", offset: 29, length: 70, algorithm: SHA256, expError: nil}, // 2
{file: "myobject", offset: 100, length: 0, algorithm: SHA256, expError: nil}, // 3 {file: "myobject", offset: 100, length: 0, algorithm: SHA256, expError: nil}, // 3
{file: "myobject", offset: 1, length: 120, algorithm: SHA256, expError: HashMismatchError{}}, // 4 {file: "myobject", offset: 1, length: 120, algorithm: SHA256, expError: errFileCorrupt}, // 4
{file: "myobject", offset: 3, length: 1100, algorithm: SHA256, expError: nil}, // 5 {file: "myobject", offset: 3, length: 1100, algorithm: SHA256, expError: nil}, // 5
{file: "myobject", offset: 2, length: 100, algorithm: SHA256, expError: HashMismatchError{}}, // 6 {file: "myobject", offset: 2, length: 100, algorithm: SHA256, expError: errFileCorrupt}, // 6
{file: "myobject", offset: 1000, length: 1001, algorithm: SHA256, expError: nil}, // 7 {file: "myobject", offset: 1000, length: 1001, algorithm: SHA256, expError: nil}, // 7
{file: "myobject", offset: 0, length: 100, algorithm: BLAKE2b512, expError: HashMismatchError{}}, // 8 {file: "myobject", offset: 0, length: 100, algorithm: BLAKE2b512, expError: errFileCorrupt}, // 8
{file: "myobject", offset: 25, length: 74, algorithm: BLAKE2b512, expError: nil}, // 9 {file: "myobject", offset: 25, length: 74, algorithm: BLAKE2b512, expError: nil}, // 9
{file: "myobject", offset: 29, length: 70, algorithm: BLAKE2b512, expError: HashMismatchError{}}, // 10 {file: "myobject", offset: 29, length: 70, algorithm: BLAKE2b512, expError: errFileCorrupt}, // 10
{file: "myobject", offset: 100, length: 0, algorithm: BLAKE2b512, expError: nil}, // 11 {file: "myobject", offset: 100, length: 0, algorithm: BLAKE2b512, expError: nil}, // 11
{file: "myobject", offset: 1, length: 120, algorithm: BLAKE2b512, expError: nil}, // 12 {file: "myobject", offset: 1, length: 120, algorithm: BLAKE2b512, expError: nil}, // 12
{file: "myobject", offset: 3, length: 1100, algorithm: BLAKE2b512, expError: nil}, // 13 {file: "myobject", offset: 3, length: 1100, algorithm: BLAKE2b512, expError: nil}, // 13
@ -1294,9 +1293,7 @@ func TestPosixReadFileWithVerify(t *testing.T) {
h := test.algorithm.New() h := test.algorithm.New()
h.Write(data) h.Write(data)
if test.expError != nil { if test.expError != nil {
expected := h.Sum(nil)
h.Write([]byte{0}) h.Write([]byte{0})
test.expError = HashMismatchError{hex.EncodeToString(h.Sum(nil)), hex.EncodeToString(expected)}
} }
buffer := make([]byte, test.length) buffer := make([]byte, test.length)

@ -36,7 +36,7 @@ import (
func init() { func init() {
logger.Init(GOPATH, GOROOT) logger.Init(GOPATH, GOROOT)
logger.RegisterUIError(fmtError) logger.RegisterUIError(fmtError)
gob.Register(HashMismatchError{}) gob.Register(VerifyFileError(""))
gob.Register(DeleteFileError("")) gob.Register(DeleteFileError(""))
} }

@ -18,7 +18,6 @@ package cmd
import ( import (
"errors" "errors"
"fmt"
) )
// errUnexpected - unexpected error, requires manual intervention. // errUnexpected - unexpected error, requires manual intervention.
@ -72,8 +71,8 @@ var errVolumeAccessDenied = errors.New("volume access denied")
// errFileAccessDenied - cannot access file, insufficient permissions. // errFileAccessDenied - cannot access file, insufficient permissions.
var errFileAccessDenied = errors.New("file access denied") var errFileAccessDenied = errors.New("file access denied")
// errFileUnexpectedSize - file has an unexpected size // errFileCorrupt - file has an unexpected size, or is not readable
var errFileUnexpectedSize = errors.New("file has unexpected size") var errFileCorrupt = errors.New("file is corrupted")
// errFileParentIsFile - cannot have overlapping objects, parent is already a file. // errFileParentIsFile - cannot have overlapping objects, parent is already a file.
var errFileParentIsFile = errors.New("parent is a file") var errFileParentIsFile = errors.New("parent is a file")
@ -94,17 +93,12 @@ var errLessData = errors.New("less data available than what was requested")
// errMoreData = returned when more data was sent by the caller than what it was supposed to. // errMoreData = returned when more data was sent by the caller than what it was supposed to.
var errMoreData = errors.New("more data was sent than what was advertised") var errMoreData = errors.New("more data was sent than what was advertised")
// HashMismatchError represents a bit-rot hash verification failure error. // VerifyFileError represents error generated by VerifyFile posix call.
type HashMismatchError struct { type VerifyFileError string
Expected string
Computed string
}
// Error method for the hashMismatchError // Error method for the hashMismatchError
func (h HashMismatchError) Error() string { func (h VerifyFileError) Error() string {
return fmt.Sprintf( return string(h)
"Bitrot verification mismatch - expected %v, received %v",
h.Expected, h.Computed)
} }
// Collection of basic errors. // Collection of basic errors.

@ -22,13 +22,11 @@ import (
"crypto/tls" "crypto/tls"
"encoding/gob" "encoding/gob"
"encoding/hex" "encoding/hex"
"fmt"
"io" "io"
"io/ioutil" "io/ioutil"
"net/url" "net/url"
"path" "path"
"strconv" "strconv"
"strings"
"github.com/minio/minio/cmd/http" "github.com/minio/minio/cmd/http"
"github.com/minio/minio/cmd/rest" "github.com/minio/minio/cmd/rest"
@ -61,12 +59,10 @@ func toStorageErr(err error) error {
} }
switch err.Error() { switch err.Error() {
case io.EOF.Error(): case errFaultyDisk.Error():
return io.EOF return errFaultyDisk
case io.ErrUnexpectedEOF.Error(): case errFileCorrupt.Error():
return io.ErrUnexpectedEOF return errFileCorrupt
case errFileUnexpectedSize.Error():
return errFileUnexpectedSize
case errUnexpected.Error(): case errUnexpected.Error():
return errUnexpected return errUnexpected
case errDiskFull.Error(): case errDiskFull.Error():
@ -99,15 +95,10 @@ func toStorageErr(err error) error {
return errRPCAPIVersionUnsupported return errRPCAPIVersionUnsupported
case errServerTimeMismatch.Error(): case errServerTimeMismatch.Error():
return errServerTimeMismatch return errServerTimeMismatch
} case io.EOF.Error():
if strings.Contains(err.Error(), "Bitrot verification mismatch") { return io.EOF
var expected string case io.ErrUnexpectedEOF.Error():
var received string return io.ErrUnexpectedEOF
fmt.Sscanf(err.Error(), "Bitrot verification mismatch - expected %s received %s", &expected, &received)
// Go's Sscanf %s scans "," that comes after the expected hash, hence remove it. Providing "," in the format string does not help.
expected = strings.TrimSuffix(expected, ",")
bitrotErr := HashMismatchError{expected, received}
return bitrotErr
} }
return err return err
} }
@ -461,8 +452,7 @@ func (client *storageRESTClient) VerifyFile(volume, path string, size int64, alg
} }
} }
verifyResp := &VerifyFileResp{} verifyResp := &VerifyFileResp{}
err = gob.NewDecoder(reader).Decode(verifyResp) if err = gob.NewDecoder(reader).Decode(verifyResp); err != nil {
if err != nil {
return err return err
} }
return toStorageErr(verifyResp.Err) return toStorageErr(verifyResp.Err)

@ -16,8 +16,10 @@
package cmd package cmd
const storageRESTVersion = "v9" const (
const storageRESTPath = minioReservedBucketPath + "/storage/" + storageRESTVersion + SlashSeparator storageRESTVersion = "v9"
storageRESTPath = minioReservedBucketPath + "/storage/" + storageRESTVersion + SlashSeparator
)
const ( const (
storageRESTMethodDiskInfo = "diskinfo" storageRESTMethodDiskInfo = "diskinfo"

@ -141,8 +141,8 @@ func (s *storageRESTServer) ListVolsHandler(w http.ResponseWriter, r *http.Reque
s.writeErrorResponse(w, err) s.writeErrorResponse(w, err)
return return
} }
defer w.(http.Flusher).Flush()
gob.NewEncoder(w).Encode(&infos) gob.NewEncoder(w).Encode(&infos)
w.(http.Flusher).Flush()
} }
// StatVolHandler - stat a volume. // StatVolHandler - stat a volume.
@ -157,8 +157,8 @@ func (s *storageRESTServer) StatVolHandler(w http.ResponseWriter, r *http.Reques
s.writeErrorResponse(w, err) s.writeErrorResponse(w, err)
return return
} }
defer w.(http.Flusher).Flush()
gob.NewEncoder(w).Encode(info) gob.NewEncoder(w).Encode(info)
w.(http.Flusher).Flush()
} }
// DeleteVolumeHandler - delete a volume. // DeleteVolumeHandler - delete a volume.
@ -250,8 +250,8 @@ func (s *storageRESTServer) StatFileHandler(w http.ResponseWriter, r *http.Reque
s.writeErrorResponse(w, err) s.writeErrorResponse(w, err)
return return
} }
defer w.(http.Flusher).Flush()
gob.NewEncoder(w).Encode(info) gob.NewEncoder(w).Encode(info)
w.(http.Flusher).Flush()
} }
// ReadAllHandler - read all the contents of a file. // ReadAllHandler - read all the contents of a file.
@ -393,12 +393,13 @@ func (s *storageRESTServer) WalkHandler(w http.ResponseWriter, r *http.Request)
s.writeErrorResponse(w, err) s.writeErrorResponse(w, err)
return return
} }
defer w.(http.Flusher).Flush()
w.Header().Set(xhttp.ContentType, "text/event-stream")
encoder := gob.NewEncoder(w) encoder := gob.NewEncoder(w)
for fi := range fch { for fi := range fch {
encoder.Encode(&fi) encoder.Encode(&fi)
} }
w.(http.Flusher).Flush()
} }
// ListDirHandler - list a directory. // ListDirHandler - list a directory.
@ -420,8 +421,8 @@ func (s *storageRESTServer) ListDirHandler(w http.ResponseWriter, r *http.Reques
s.writeErrorResponse(w, err) s.writeErrorResponse(w, err)
return return
} }
defer w.(http.Flusher).Flush()
gob.NewEncoder(w).Encode(&entries) gob.NewEncoder(w).Encode(&entries)
w.(http.Flusher).Flush()
} }
// DeleteFileHandler - delete a file. // DeleteFileHandler - delete a file.
@ -551,12 +552,17 @@ func (s *storageRESTServer) VerifyFile(w http.ResponseWriter, r *http.Request) {
s.writeErrorResponse(w, errInvalidArgument) s.writeErrorResponse(w, errInvalidArgument)
return return
} }
algo := BitrotAlgorithmFromString(algoStr)
w.Header().Set(xhttp.ContentType, "text/event-stream") w.Header().Set(xhttp.ContentType, "text/event-stream")
encoder := gob.NewEncoder(w)
doneCh := sendWhiteSpaceVerifyFile(w) doneCh := sendWhiteSpaceVerifyFile(w)
err = s.storage.VerifyFile(volume, filePath, size, algo, hash, int64(shardSize)) err = s.storage.VerifyFile(volume, filePath, size, BitrotAlgorithmFromString(algoStr), hash, int64(shardSize))
<-doneCh <-doneCh
gob.NewEncoder(w).Encode(VerifyFileResp{err}) vresp := &VerifyFileResp{}
if err != nil {
vresp.Err = VerifyFileError(err.Error())
}
encoder.Encode(vresp)
w.(http.Flusher).Flush()
} }
// registerStorageRPCRouter - register storage rpc router. // registerStorageRPCRouter - register storage rpc router.

@ -1,5 +1,5 @@
/* /*
* MinIO Cloud Storage, (C) 2016, 2017 MinIO, Inc. * MinIO Cloud Storage, (C) 2016-2019 MinIO, Inc.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -18,7 +18,6 @@ package cmd
import ( import (
"context" "context"
"strings"
"time" "time"
"github.com/minio/minio/cmd/logger" "github.com/minio/minio/cmd/logger"
@ -185,8 +184,12 @@ func disksWithAllParts(ctx context.Context, onlineDisks []StorageAPI, partsMetad
checksumInfo := erasureInfo.GetChecksumInfo(part.Name) checksumInfo := erasureInfo.GetChecksumInfo(part.Name)
err = onlineDisk.VerifyFile(bucket, pathJoin(object, part.Name), erasure.ShardFileSize(part.Size), checksumInfo.Algorithm, checksumInfo.Hash, erasure.ShardSize()) err = onlineDisk.VerifyFile(bucket, pathJoin(object, part.Name), erasure.ShardFileSize(part.Size), checksumInfo.Algorithm, checksumInfo.Hash, erasure.ShardSize())
if err != nil { if err != nil {
isCorrupt := strings.HasPrefix(err.Error(), "Bitrot verification mismatch - expected ") if !IsErr(err, []error{
if !isCorrupt && err != errFileNotFound && err != errVolumeNotFound && err != errFileUnexpectedSize { errFileNotFound,
errVolumeNotFound,
errFileCorrupt,
}...) {
logger.GetReqInfo(ctx).AppendTags("disk", onlineDisk.String())
logger.LogIf(ctx, err) logger.LogIf(ctx, err)
} }
dataErrs[i] = err dataErrs[i] = err

@ -191,17 +191,14 @@ func shouldHealObjectOnDisk(xlErr, dataErr error, meta xlMetaV1, quorumModTime t
return true return true
} }
if xlErr == nil { if xlErr == nil {
// If xl.json was read fine but there is some problem with the part.N files. // If xl.json was read fine but there may be problem with the part.N files.
if dataErr == errFileNotFound { if IsErr(dataErr, []error{
errFileNotFound,
errFileCorrupt,
}...) {
return true return true
} }
if dataErr == errFileUnexpectedSize { if !quorumModTime.Equal(meta.Stat.ModTime) {
return true
}
if _, ok := dataErr.(HashMismatchError); ok {
return true
}
if quorumModTime != meta.Stat.ModTime {
return true return true
} }
} }

@ -68,7 +68,7 @@ func TestHealObjectCorrupted(t *testing.T) {
defer removeRoots(fsDirs) defer removeRoots(fsDirs)
// Everything is fine, should return nil // Everything is fine, should return nil
obj, _, err := initObjectLayer(mustGetNewEndpointList(fsDirs...)) objLayer, _, err := initObjectLayer(mustGetNewEndpointList(fsDirs...))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -78,21 +78,21 @@ func TestHealObjectCorrupted(t *testing.T) {
data := bytes.Repeat([]byte("a"), 5*1024*1024) data := bytes.Repeat([]byte("a"), 5*1024*1024)
var opts ObjectOptions var opts ObjectOptions
err = obj.MakeBucketWithLocation(context.Background(), bucket, "") err = objLayer.MakeBucketWithLocation(context.Background(), bucket, "")
if err != nil { if err != nil {
t.Fatalf("Failed to make a bucket - %v", err) t.Fatalf("Failed to make a bucket - %v", err)
} }
// Create an object with multiple parts uploaded in decreasing // Create an object with multiple parts uploaded in decreasing
// part number. // part number.
uploadID, err := obj.NewMultipartUpload(context.Background(), bucket, object, opts) uploadID, err := objLayer.NewMultipartUpload(context.Background(), bucket, object, opts)
if err != nil { if err != nil {
t.Fatalf("Failed to create a multipart upload - %v", err) t.Fatalf("Failed to create a multipart upload - %v", err)
} }
var uploadedParts []CompletePart var uploadedParts []CompletePart
for _, partID := range []int{2, 1} { for _, partID := range []int{2, 1} {
pInfo, err1 := obj.PutObjectPart(context.Background(), bucket, object, uploadID, partID, mustGetPutObjReader(t, bytes.NewReader(data), int64(len(data)), "", ""), opts) pInfo, err1 := objLayer.PutObjectPart(context.Background(), bucket, object, uploadID, partID, mustGetPutObjReader(t, bytes.NewReader(data), int64(len(data)), "", ""), opts)
if err1 != nil { if err1 != nil {
t.Fatalf("Failed to upload a part - %v", err1) t.Fatalf("Failed to upload a part - %v", err1)
} }
@ -102,20 +102,20 @@ func TestHealObjectCorrupted(t *testing.T) {
}) })
} }
_, err = obj.CompleteMultipartUpload(context.Background(), bucket, object, uploadID, uploadedParts, ObjectOptions{}) _, err = objLayer.CompleteMultipartUpload(context.Background(), bucket, object, uploadID, uploadedParts, ObjectOptions{})
if err != nil { if err != nil {
t.Fatalf("Failed to complete multipart upload - %v", err) t.Fatalf("Failed to complete multipart upload - %v", err)
} }
// Test 1: Remove the object backend files from the first disk. // Test 1: Remove the object backend files from the first disk.
xl := obj.(*xlObjects) xl := objLayer.(*xlObjects)
firstDisk := xl.storageDisks[0] firstDisk := xl.storageDisks[0]
err = firstDisk.DeleteFile(bucket, filepath.Join(object, xlMetaJSONFile)) err = firstDisk.DeleteFile(bucket, filepath.Join(object, xlMetaJSONFile))
if err != nil { if err != nil {
t.Fatalf("Failed to delete a file - %v", err) t.Fatalf("Failed to delete a file - %v", err)
} }
_, err = obj.HealObject(context.Background(), bucket, object, false, false, madmin.HealNormalScan) _, err = objLayer.HealObject(context.Background(), bucket, object, false, false, madmin.HealNormalScan)
if err != nil { if err != nil {
t.Fatalf("Failed to heal object - %v", err) t.Fatalf("Failed to heal object - %v", err)
} }
@ -132,13 +132,13 @@ func TestHealObjectCorrupted(t *testing.T) {
} }
err = firstDisk.DeleteFile(bucket, filepath.Join(object, "part.1")) err = firstDisk.DeleteFile(bucket, filepath.Join(object, "part.1"))
if err != nil { if err != nil {
t.Errorf("Failure during part.1 removal - %v", err) t.Errorf("Failure during deleting part.1 - %v", err)
} }
err = firstDisk.AppendFile(bucket, filepath.Join(object, "part.1"), []byte{}) err = firstDisk.WriteAll(bucket, filepath.Join(object, "part.1"), bytes.NewReader([]byte{}))
if err != nil { if err != nil {
t.Errorf("Failure during creating part.1 - %v", err) t.Errorf("Failure during creating part.1 - %v", err)
} }
_, err = obj.HealObject(context.Background(), bucket, object, false, true, madmin.HealDeepScan) _, err = objLayer.HealObject(context.Background(), bucket, object, false, true, madmin.HealDeepScan)
if err != nil { if err != nil {
t.Errorf("Expected nil but received %v", err) t.Errorf("Expected nil but received %v", err)
} }
@ -150,7 +150,33 @@ func TestHealObjectCorrupted(t *testing.T) {
t.Errorf("part.1 file size is not the same before and after heal") t.Errorf("part.1 file size is not the same before and after heal")
} }
// Test 3: checks if HealObject returns an error when xl.json is not found // Test 3: Heal when part.1 is correct in size but corrupted
partSt1, err = firstDisk.StatFile(bucket, filepath.Join(object, "part.1"))
if err != nil {
t.Errorf("Expected part.1 file to be present but stat failed - %v", err)
}
err = firstDisk.DeleteFile(bucket, filepath.Join(object, "part.1"))
if err != nil {
t.Errorf("Failure during deleting part.1 - %v", err)
}
bdata := bytes.Repeat([]byte("b"), int(partSt1.Size))
err = firstDisk.WriteAll(bucket, filepath.Join(object, "part.1"), bytes.NewReader(bdata))
if err != nil {
t.Errorf("Failure during creating part.1 - %v", err)
}
_, err = objLayer.HealObject(context.Background(), bucket, object, false, true, madmin.HealDeepScan)
if err != nil {
t.Errorf("Expected nil but received %v", err)
}
partSt2, err = firstDisk.StatFile(bucket, filepath.Join(object, "part.1"))
if err != nil {
t.Errorf("Expected from part.1 file to be present but stat failed - %v", err)
}
if partSt1.Size != partSt2.Size {
t.Errorf("part.1 file size is not the same before and after heal")
}
// Test 4: checks if HealObject returns an error when xl.json is not found
// in more than read quorum number of disks, to create a corrupted situation. // in more than read quorum number of disks, to create a corrupted situation.
for i := 0; i <= len(xl.storageDisks)/2; i++ { for i := 0; i <= len(xl.storageDisks)/2; i++ {
@ -158,13 +184,13 @@ func TestHealObjectCorrupted(t *testing.T) {
} }
// Try healing now, expect to receive errDiskNotFound. // Try healing now, expect to receive errDiskNotFound.
_, err = obj.HealObject(context.Background(), bucket, object, false, true, madmin.HealDeepScan) _, err = objLayer.HealObject(context.Background(), bucket, object, false, true, madmin.HealDeepScan)
if err != nil { if err != nil {
t.Errorf("Expected nil but received %v", err) t.Errorf("Expected nil but received %v", err)
} }
// since majority of xl.jsons are not available, object should be successfully deleted. // since majority of xl.jsons are not available, object should be successfully deleted.
_, err = obj.GetObjectInfo(context.Background(), bucket, object, ObjectOptions{}) _, err = objLayer.GetObjectInfo(context.Background(), bucket, object, ObjectOptions{})
if _, ok := err.(ObjectNotFound); !ok { if _, ok := err.(ObjectNotFound); !ok {
t.Errorf("Expect %v but received %v", ObjectNotFound{Bucket: bucket, Object: object}, err) t.Errorf("Expect %v but received %v", ObjectNotFound{Bucket: bucket, Object: object}, err)
} }

@ -174,7 +174,6 @@ func readXLMeta(ctx context.Context, disk StorageAPI, bucket string, object stri
if len(xlMetaBuf) == 0 { if len(xlMetaBuf) == 0 {
return xlMetaV1{}, errFileNotFound return xlMetaV1{}, errFileNotFound
} }
logger.GetReqInfo(ctx).AppendTags("disk", disk.String())
return xlMetaV1UnmarshalJSON(ctx, xlMetaBuf) return xlMetaV1UnmarshalJSON(ctx, xlMetaBuf)
} }

Loading…
Cancel
Save