From c3028757746b52003bfc6fc8803545b22e37051a Mon Sep 17 00:00:00 2001
From: Krishna Srinivas
Date: Wed, 20 Apr 2016 02:53:20 +0530
Subject: [PATCH] selfheal: implement self-heal. Heals the missing parts. (#1335)

---
 xl-v1.go | 267 +++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 267 insertions(+)

diff --git a/xl-v1.go b/xl-v1.go
index fdd248ccf..2c6ac91eb 100644
--- a/xl-v1.go
+++ b/xl-v1.go
@@ -21,6 +21,7 @@ import (
 	"encoding/json"
 	"errors"
 	"fmt"
+	"io"
 	"os"
 	slashpath "path"
 	"sort"
@@ -39,6 +40,13 @@ const (
 	maxErasureBlocks = 16
 )
 
+// Self Heal data
+type selfHeal struct {
+	volume string
+	fsPath string
+	errCh  chan error
+}
+
 // XL layer structure.
 type XL struct {
 	ReedSolomon           reedsolomon.Encoder // Erasure encoder/decoder.
@@ -49,6 +57,7 @@ type XL struct {
 	nameSpaceLockMapMutex *sync.Mutex
 	readQuorum            int
 	writeQuorum           int
+	selfHealCh            chan selfHeal
 }
 
 // lockNS - locks the given resource, using a previously allocated
@@ -154,6 +163,9 @@ func newXL(disks ...string) (StorageAPI, error) {
 		xl.writeQuorum = len(xl.storageDisks)
 	}
 
+	// Start self heal go routine.
+	xl.selfHealRoutine()
+
 	// Return successfully initialized.
 	return xl, nil
 }
@@ -465,3 +477,258 @@ func (xl XL) DeleteFile(volume, path string) error {
 	}
 	return nil
 }
+
+// selfHeal - called by the healing go-routine, heals using erasure coding.
+//
+// Every disk is probed for the metadata file and the erasure part of the
+// object at volume/fsPath. A disk is marked for healing when its metadata
+// file is missing or not parsable, or when its part file is missing. The
+// marked parts are regenerated block by block from the parts that could be
+// read, and the metadata is rewritten next to every healed part.
+//
+// Returns nil when no disk needs healing or when healing succeeded, and the
+// first error encountered otherwise.
+func (xl XL) selfHeal(volume string, fsPath string) error {
+	totalShards := xl.DataBlocks + xl.ParityBlocks
+	needsSelfHeal := make([]bool, totalShards)
+	var metadata = make(map[string]string)
+	var readers = make([]io.Reader, totalShards)
+	var writers = make([]io.WriteCloser, totalShards)
+	metadataPath := slashpath.Join(fsPath, metadataFile)
+	for index, disk := range xl.storageDisks {
+		// Start from the beginning, we are not reading partial metadata files.
+		offset := int64(0)
+
+		metadataReader, err := disk.ReadFile(volume, metadataPath, offset)
+		if err != nil {
+			if err != errFileNotFound {
+				continue
+			}
+			// Needs healing if the metadata file is not found.
+			needsSelfHeal[index] = true
+			continue
+		}
+
+		// Decode into a scratch map so that an unparsable metadata file on
+		// one disk cannot clobber metadata already read from another disk.
+		diskMetadata := make(map[string]string)
+		err = json.NewDecoder(metadataReader).Decode(&diskMetadata)
+		// The metadata reader is not needed past this point.
+		metadataReader.Close()
+		if err != nil {
+			// Needs healing if the metadata file is not parsable.
+			needsSelfHeal[index] = true
+		} else {
+			metadata = diskMetadata
+		}
+
+		erasurePart := slashpath.Join(fsPath, fmt.Sprintf("part.%d", index))
+		erasuredPartReader, err := disk.ReadFile(volume, erasurePart, offset)
+		if err != nil {
+			if err != errFileNotFound {
+				continue
+			}
+			// Needs healing if the part file is not found.
+			needsSelfHeal[index] = true
+		} else {
+			readers[index] = erasuredPartReader
+			// Part readers are consumed by the block loop below, hence
+			// they stay open until the function returns.
+			defer erasuredPartReader.Close()
+		}
+	}
+	// Check if there is at least one part that needs to be healed.
+	atleastOneSelfHeal := false
+	for _, shNeeded := range needsSelfHeal {
+		if shNeeded {
+			atleastOneSelfHeal = true
+			break
+		}
+	}
+	if !atleastOneSelfHeal {
+		// Return if healing is not needed anywhere.
+		return nil
+	}
+
+	// Create writers for parts where healing is needed.
+	for i, shNeeded := range needsSelfHeal {
+		if !shNeeded {
+			continue
+		}
+		var err error
+		erasurePart := slashpath.Join(fsPath, fmt.Sprintf("part.%d", i))
+		writers[i], err = xl.storageDisks[i].CreateFile(volume, erasurePart)
+		if err != nil {
+			// Unexpected error
+			closeAndRemoveWriters(writers...)
+			return err
+		}
+	}
+	size, err := strconv.ParseInt(metadata["file.size"], 10, 64)
+	if err != nil {
+		closeAndRemoveWriters(writers...)
+		return err
+	}
+	var totalLeft = size
+	for totalLeft > 0 {
+		// Figure out the right blockSize.
+		var curBlockSize int
+		if erasureBlockSize < totalLeft {
+			curBlockSize = erasureBlockSize
+		} else {
+			curBlockSize = int(totalLeft)
+		}
+		// Calculate the current shard size.
+		curShardSize := getEncodedBlockLen(curBlockSize, xl.DataBlocks)
+		enShards := make([][]byte, totalShards)
+		// Loop through all readers and read.
+		for i, reader := range readers {
+			// Initialize shard slice and fill the data from each parts.
+			// ReedSolomon.Verify() expects that slice is not nil even if the particular
+			// part needs healing.
+			enShards[i] = make([]byte, curShardSize)
+			if needsSelfHeal[i] {
+				// Skip reading if the part needs healing.
+				continue
+			}
+			if reader == nil {
+				// The disk returned an unexpected error while it was
+				// probed, so there is no reader. Treat it like a failed
+				// read; io.ReadFull on a nil reader would panic.
+				enShards[i] = nil
+				continue
+			}
+			_, e := io.ReadFull(reader, enShards[i])
+			if e != nil && e != io.ErrUnexpectedEOF {
+				enShards[i] = nil
+			}
+		}
+
+		// Check blocks if they are all zero in length.
+		if checkBlockSize(enShards) == 0 {
+			// Do not leave the writers of the healed parts behind.
+			closeAndRemoveWriters(writers...)
+			return errors.New("Data likely corrupted, all blocks are zero in length.")
+		}
+
+		// Verify the shards.
+		ok, e := xl.ReedSolomon.Verify(enShards)
+		if e != nil {
+			closeAndRemoveWriters(writers...)
+			return e
+		}
+		// Verification failed, shards require reconstruction.
+		if !ok {
+			for i, shNeeded := range needsSelfHeal {
+				if shNeeded {
+					// Reconstruct() regenerates the parts whose slice is nil.
+					enShards[i] = nil
+				}
+			}
+			e = xl.ReedSolomon.Reconstruct(enShards)
+			if e != nil {
+				closeAndRemoveWriters(writers...)
+				return e
+			}
+			// Verify reconstructed shards again.
+			ok, e = xl.ReedSolomon.Verify(enShards)
+			if e != nil {
+				closeAndRemoveWriters(writers...)
+				return e
+			}
+			if !ok {
+				// Shards cannot be reconstructed, corrupted data.
+				e = errors.New("Verification failed after reconstruction, data likely corrupted.")
+				closeAndRemoveWriters(writers...)
+				return e
+			}
+		}
+		for i, shNeeded := range needsSelfHeal {
+			if !shNeeded {
+				continue
+			}
+			_, e := writers[i].Write(enShards[i])
+			if e != nil {
+				closeAndRemoveWriters(writers...)
+				return e
+			}
+		}
+		totalLeft = totalLeft - erasureBlockSize
+	}
+	// After successful healing Close() the writer so that the temp files are renamed.
+	for i, shNeeded := range needsSelfHeal {
+		if !shNeeded {
+			continue
+		}
+		if err = writers[i].Close(); err != nil {
+			// Clean up the writers that have not been closed successfully.
+			closeAndRemoveWriters(writers[i:]...)
+			return err
+		}
+	}
+
+	// Write the metadata file wherever healing was done.
+	var metadataWriters = make([]io.WriteCloser, len(xl.storageDisks))
+	for i, shNeeded := range needsSelfHeal {
+		if !shNeeded {
+			continue
+		}
+		metadataWriters[i], err = xl.storageDisks[i].CreateFile(volume, metadataPath)
+		if err != nil {
+			// The part writers are closed by now, only the metadata
+			// writers are left to clean up.
+			closeAndRemoveWriters(metadataWriters...)
+			return err
+		}
+	}
+	metadataBytes, err := json.Marshal(metadata)
+	if err != nil {
+		closeAndRemoveWriters(metadataWriters...)
+		return err
+	}
+	for i, shNeeded := range needsSelfHeal {
+		if !shNeeded {
+			continue
+		}
+		if _, err = metadataWriters[i].Write(metadataBytes); err != nil {
+			closeAndRemoveWriters(metadataWriters...)
+			return err
+		}
+	}
+
+	// Metadata is written for all the healed parts, hence Close() so that temp files can be renamed.
+	for index := range xl.storageDisks {
+		if !needsSelfHeal[index] {
+			continue
+		}
+		if err = metadataWriters[index].Close(); err != nil {
+			closeAndRemoveWriters(metadataWriters[index:]...)
+			return err
+		}
+	}
+	return nil
+}
+
+// selfHealRoutine - starts a go routine and listens on a channel for healing requests.
+//
+// Every request is answered with exactly one value on its errCh: the result
+// of the healing, or an error when the volume or the path is empty.
+func (xl *XL) selfHealRoutine() {
+	xl.selfHealCh = make(chan selfHeal)
+
+	// Healing request can be made like this:
+	// errCh := make(chan error)
+	// xl.selfHealCh <- selfHeal{"testbucket", "testobject", errCh}
+	// fmt.Println(<-errCh)
+
+	go func() {
+		for sh := range xl.selfHealCh {
+			if sh.volume == "" || sh.fsPath == "" {
+				sh.errCh <- errors.New("volume or path can not be empty")
+				continue
+			}
+			// Hand the outcome of the healing back to the requester.
+			sh.errCh <- xl.selfHeal(sh.volume, sh.fsPath)
+		}
+	}()
+}