xl: fix DeleteFile() removing meta data files without updating it (#1433)

Fixes #1428 #1427
master
Bala FA 9 years ago committed by Harshavardhana
parent 27c50a70cc
commit 84afec9ae0
  1. 89
      xl-erasure-v1.go

@ -558,25 +558,29 @@ func (xl XL) DeleteFile(volume, path string) error {
versions, err := listFileVersions(partsMetadata, errs)
// Get highest file version.
higherVersion := highestInt(versions)
// Increment to have next higher version.
higherVersion++
// Take last meta data to use later
var mdata xlMetaV1
onlineDisksCount := 0
// find online disks and meta data of higherVersion
var mdata *xlMetaV1
onlineDiskCount := 0
for index, metadata := range partsMetadata {
if errs[index] == nil {
mdata = metadata
onlineDisksCount++
onlineDiskCount++
if metadata.Stat.Version == higherVersion && mdata == nil {
mdata = &metadata
}
}
}
if onlineDisksCount < xl.writeQuorum {
// return error if mdata is empty or onlineDiskCount doesn't meet write quorum
if mdata == nil || onlineDiskCount < xl.writeQuorum {
return errWriteQuorum
}
// Increment to have next higher version.
higherVersion++
xlMetaV1FilePath := slashpath.Join(path, xlMetaV1File)
deleteMetaData := (onlineDisksCount == len(xl.storageDisks))
deleteMetaData := (onlineDiskCount == len(xl.storageDisks))
// Set higher version to indicate file operation
mdata.Stat.Version = higherVersion
@ -585,25 +589,16 @@ func (xl XL) DeleteFile(volume, path string) error {
nsMutex.Lock(volume, path)
defer nsMutex.Unlock(volume, path)
// Loop through and delete each chunks.
errCount := 0
// Update meta data file and remove part file
for index, disk := range xl.storageDisks {
// no need to operate on failed disks
if errs[index] != nil {
errCount++
continue
}
// TODO: errors can be allowed up to len(xl.storageDisks) - xl.writeQuorum
// TODO: Fix: meta data will be lost if deleteMetaData is true and anyone DeleteFile failure.
if deleteMetaData {
err = disk.DeleteFile(volume, xlMetaV1FilePath)
if err != nil {
log.WithFields(logrus.Fields{
"volume": volume,
"path": path,
}).Errorf("DeleteFile failed with %s", err)
return err
}
} else {
// update meta data about delete operation
var metadataWriter io.WriteCloser
metadataWriter, err = disk.CreateFile(volume, xlMetaV1FilePath)
if err != nil {
@ -611,6 +606,15 @@ func (xl XL) DeleteFile(volume, path string) error {
"volume": volume,
"path": path,
}).Errorf("CreateFile failed with %s", err)
errCount++
// We can safely allow CreateFile errors up to len(xl.storageDisks) - xl.writeQuorum
// otherwise return failure.
if errCount <= len(xl.storageDisks)-xl.writeQuorum {
continue
}
return err
}
@ -621,8 +625,16 @@ func (xl XL) DeleteFile(volume, path string) error {
"path": path,
"diskIndex": index,
}).Errorf("Writing metadata failed with %s", err)
return err
errCount++
// We can safely allow CreateFile errors up to len(xl.storageDisks) - xl.writeQuorum
// otherwise return failure.
if errCount <= len(xl.storageDisks)-xl.writeQuorum {
continue
}
return err
}
erasureFilePart := slashpath.Join(path, fmt.Sprintf("file.%d", index))
@ -632,9 +644,38 @@ func (xl XL) DeleteFile(volume, path string) error {
"volume": volume,
"path": path,
}).Errorf("DeleteFile failed with %s", err)
errCount++
// We can safely allow CreateFile errors up to len(xl.storageDisks) - xl.writeQuorum
// otherwise return failure.
if errCount <= len(xl.storageDisks)-xl.writeQuorum {
continue
}
return err
}
}
// Remove meta data file only if deleteMetaData is true.
if deleteMetaData {
for index, disk := range xl.storageDisks {
// no need to operate on failed disks
if errs[index] != nil {
continue
}
err = disk.DeleteFile(volume, xlMetaV1FilePath)
if err != nil {
// No need to return the error as we updated the meta data file previously
log.WithFields(logrus.Fields{
"volume": volume,
"path": path,
}).Errorf("DeleteFile failed with %s", err)
}
}
}
return nil
}

Loading…
Cancel
Save