From a978975eea56bad12bbcea1fecff665f038b0b83 Mon Sep 17 00:00:00 2001 From: Bala FA Date: Sat, 30 Apr 2016 07:13:18 +0530 Subject: [PATCH] xl: add quorum support for DeleteFile() (#1426) Fixes #1396 --- xl-erasure-v1-metadata.go | 1 + xl-erasure-v1.go | 87 ++++++++++++++++++++++++++++++++++----- 2 files changed, 78 insertions(+), 10 deletions(-) diff --git a/xl-erasure-v1-metadata.go b/xl-erasure-v1-metadata.go index ef4078cf1..290677f3d 100644 --- a/xl-erasure-v1-metadata.go +++ b/xl-erasure-v1-metadata.go @@ -29,6 +29,7 @@ type xlMetaV1 struct { Size int64 `json:"size"` ModTime time.Time `json:"modTime"` Version int64 `json:"version"` + Deleted bool `json:"deleted"` } `json:"stat"` Erasure struct { DataBlocks int `json:"data"` diff --git a/xl-erasure-v1.go b/xl-erasure-v1.go index 82e82f3ee..6b8dc3457 100644 --- a/xl-erasure-v1.go +++ b/xl-erasure-v1.go @@ -18,6 +18,7 @@ package main import ( "fmt" + "io" "os" slashpath "path" "sort" @@ -547,19 +548,85 @@ func (xl XL) DeleteFile(volume, path string) error { if !isValidPath(path) { return errInvalidArgument } + + // Lock right before reading from disk. + nsMutex.RLock(volume, path) + partsMetadata, errs := xl.getPartsMetadata(volume, path) + nsMutex.RUnlock(volume, path) + + // List all the file versions on existing files. + versions, err := listFileVersions(partsMetadata, errs) + // Get highest file version. + higherVersion := highestInt(versions) + // Increment to have next higher version. + higherVersion++ + + // Take last meta data to use later + var mdata xlMetaV1 + onlineDisksCount := 0 + for index, metadata := range partsMetadata { + if errs[index] == nil { + mdata = metadata + onlineDisksCount++ + } + } + + if onlineDisksCount < xl.writeQuorum { + return errWriteQuorum + } + + xlMetaV1FilePath := slashpath.Join(path, xlMetaV1File) + deleteMetaData := (onlineDisksCount == len(xl.storageDisks)) + + // Set higher version to indicate file operation + mdata.Stat.Version = higherVersion + mdata.Stat.Deleted = true + + nsMutex.Lock(volume, path) + defer nsMutex.Unlock(volume, path) + // Loop through and delete each chunks. for index, disk := range xl.storageDisks { - erasureFilePart := slashpath.Join(path, fmt.Sprintf("file.%d", index)) - err := disk.DeleteFile(volume, erasureFilePart) - if err != nil { - log.WithFields(logrus.Fields{ - "volume": volume, - "path": path, - }).Errorf("DeleteFile failed with %s", err) - return err + if errs[index] != nil { + continue } - xlMetaV1FilePath := slashpath.Join(path, xlMetaV1File) - err = disk.DeleteFile(volume, xlMetaV1FilePath) + + // TODO: errors can be allowed up to len(xl.storageDisks) - xl.writeQuorum + // TODO: Fix: meta data will be lost if deleteMetaData is true and anyone DeleteFile failure. + + if deleteMetaData { + err = disk.DeleteFile(volume, xlMetaV1FilePath) + if err != nil { + log.WithFields(logrus.Fields{ + "volume": volume, + "path": path, + }).Errorf("DeleteFile failed with %s", err) + return err + } + } else { + var metadataWriter io.WriteCloser + metadataWriter, err = disk.CreateFile(volume, xlMetaV1FilePath) + if err != nil { + log.WithFields(logrus.Fields{ + "volume": volume, + "path": path, + }).Errorf("CreateFile failed with %s", err) + return err + } + + err = mdata.Write(metadataWriter) + if err != nil { + log.WithFields(logrus.Fields{ + "volume": volume, + "path": path, + "diskIndex": index, + }).Errorf("Writing metadata failed with %s", err) + return err + } + } + + erasureFilePart := slashpath.Join(path, fmt.Sprintf("file.%d", index)) + err = disk.DeleteFile(volume, erasureFilePart) if err != nil { log.WithFields(logrus.Fields{ "volume": volume,