From 86e5d715193f02ba10cb7c2cd447f83c2a11eb3c Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Wed, 11 May 2016 12:54:21 -0700 Subject: [PATCH] erasure: MakeVol, DeleteVol and StatVol should hold locks. (#1597) Since there is a good amount of overlap, each code path has to lock properly for the operation it is going to perform. - MakeVol creates vols in goroutines on all disks and holds locks. - DeleteVol deletes vols in goroutines on all disks and holds locks. - StatVol stats vols in goroutines on all disks and holds locks. Fixes #1588 --- posix-list-dir-nix.go | 21 ++++++---- posix.go | 11 +++-- xl-erasure-v1.go | 97 ++++++++++++++++++++++++++++++++++++------- 3 files changed, 104 insertions(+), 25 deletions(-) diff --git a/posix-list-dir-nix.go b/posix-list-dir-nix.go index 37321ceb0..baec7db34 100644 --- a/posix-list-dir-nix.go +++ b/posix-list-dir-nix.go @@ -83,15 +83,20 @@ func parseDirents(dirPath string, buf []byte) (entries []string, err error) { // On Linux XFS does not implement d_type for on disk // format << v5. Fall back to Stat(). var fi os.FileInfo - if fi, err = os.Stat(path.Join(dirPath, name)); err == nil { - if fi.IsDir() { - entries = append(entries, fi.Name()+slashSeparator) - } else if fi.Mode().IsRegular() { - entries = append(entries, fi.Name()) + fi, err = os.Stat(path.Join(dirPath, name)) + if err != nil { + // If file does not exist, we continue and skip it. + // Could happen if it was deleted in the middle while + // this list was being performed. + if os.IsNotExist(err) { + continue } - } else { - // This is unexpected. - return + return nil, err + } + if fi.IsDir() { + entries = append(entries, fi.Name()+slashSeparator) + } else if fi.Mode().IsRegular() { + entries = append(entries, fi.Name()) } default: // Skip entries which are not file or directory. 
diff --git a/posix.go b/posix.go index 1bb45e4a1..5d4dc1484 100644 --- a/posix.go +++ b/posix.go @@ -143,7 +143,7 @@ func getAllUniqueVols(dirPath string) ([]VolInfo, error) { log.WithFields(logrus.Fields{ "dirPath": dirPath, }).Debugf("readDir failed with error %s", err) - return nil, err + return nil, errDiskNotFound } var volsInfo []VolInfo for _, entry := range entries { @@ -151,11 +151,16 @@ func getAllUniqueVols(dirPath string) ([]VolInfo, error) { // Skip if entry is neither a directory not a valid volume name. continue } - fi, err := os.Stat(pathJoin(dirPath, entry)) + var fi os.FileInfo + fi, err = os.Stat(pathJoin(dirPath, entry)) if err != nil { log.WithFields(logrus.Fields{ "path": pathJoin(dirPath, entry), }).Debugf("Stat failed with error %s", err) + // If the file does not exist, skip the entry. + if os.IsNotExist(err) { + continue + } return nil, err } volsInfo = append(volsInfo, VolInfo{ @@ -186,7 +191,7 @@ func (s fsStorage) getVolumeDir(volume string) (string, error) { volsInfo, err = getAllUniqueVols(s.diskPath) if err != nil { // For any errors, treat it as disk not found. - return volumeDir, errDiskNotFound + return volumeDir, err } for _, vol := range volsInfo { // Verify if lowercase version of the volume is equal to diff --git a/xl-erasure-v1.go b/xl-erasure-v1.go index 1ac9ccf14..92e02fb88 100644 --- a/xl-erasure-v1.go +++ b/xl-erasure-v1.go @@ -23,6 +23,7 @@ import ( "strings" "path" + "sync" "github.com/Sirupsen/logrus" "github.com/klauspost/reedsolomon" @@ -115,8 +116,11 @@ func (xl XL) MakeVol(volume string) error { return errInvalidArgument } + // Hold read lock. + nsMutex.RLock(volume, "") // Verify if the volume already exists. _, errs := xl.getAllVolumeInfo(volume) + nsMutex.RUnlock(volume, "") // Count errors other than errVolumeNotFound, bigger than the allowed // readQuorum, if yes throw an error. 
@@ -133,11 +137,36 @@ func (xl XL) MakeVol(volume string) error { } } - createVolErr := 0 - volumeExistsErrCnt := 0 + // Hold a write lock before creating a volume. + nsMutex.Lock(volume, "") + defer nsMutex.Unlock(volume, "") + + // Err counters. + createVolErr := 0 // Count generic create vol errs. + volumeExistsErrCnt := 0 // Count all errVolumeExists errs. + + // Initialize sync waitgroup. + var wg = &sync.WaitGroup{} + + // Initialize list of errors. + var dErrs = make([]error, len(xl.storageDisks)) + // Make a volume entry on all underlying storage disks. - for _, disk := range xl.storageDisks { - if err := disk.MakeVol(volume); err != nil { + for index, disk := range xl.storageDisks { + wg.Add(1) + // Make a volume inside a go-routine. + go func(index int, disk StorageAPI) { + defer wg.Done() + dErrs[index] = disk.MakeVol(volume) + }(index, disk) + } + + // Wait for all make vol to finish. + wg.Wait() + + // Loop through all the collected errors. + for _, err := range dErrs { + if err != nil { log.WithFields(logrus.Fields{ "volume": volume, }).Errorf("MakeVol failed with %s", err) @@ -150,11 +179,15 @@ func (xl XL) MakeVol(volume string) error { } continue } + // Update error counter separately. createVolErr++ if createVolErr <= len(xl.storageDisks)-xl.writeQuorum { continue } + + // Return errWriteQuorum if errors were more than + // allowed write quorum. return errWriteQuorum } } @@ -167,12 +200,32 @@ func (xl XL) DeleteVol(volume string) error { return errInvalidArgument } + // Hold a write lock for Delete volume. + nsMutex.Lock(volume, "") + defer nsMutex.Unlock(volume, "") + // Collect if all disks report volume not found. var volumeNotFoundErrCnt int + var wg = &sync.WaitGroup{} + var dErrs = make([]error, len(xl.storageDisks)) + // Remove a volume entry on all underlying storage disks. 
- for _, disk := range xl.storageDisks { - if err := disk.DeleteVol(volume); err != nil { + for index, disk := range xl.storageDisks { + wg.Add(1) + // Delete volume inside a go-routine. + go func(index int, disk StorageAPI) { + defer wg.Done() + dErrs[index] = disk.DeleteVol(volume) + }(index, disk) + } + + // Wait for all the delete vols to finish. + wg.Wait() + + // Loop through collected errors and return anything unusual. + for _, err := range dErrs { + if err != nil { log.WithFields(logrus.Fields{ "volume": volume, }).Errorf("DeleteVol failed with %s", err) @@ -245,16 +298,30 @@ func (xl XL) ListVols() (volsInfo []VolInfo, err error) { // getAllVolumeInfo - get bucket volume info from all disks. // Returns error slice indicating the failed volume stat operations. func (xl XL) getAllVolumeInfo(volume string) (volsInfo []VolInfo, errs []error) { + // Create errs and volInfo slices of storageDisks size. errs = make([]error, len(xl.storageDisks)) volsInfo = make([]VolInfo, len(xl.storageDisks)) + + // Allocate a new waitgroup. + var wg = &sync.WaitGroup{} for index, disk := range xl.storageDisks { - volInfo, err := disk.StatVol(volume) - if err != nil { - errs[index] = err - continue - } - volsInfo[index] = volInfo + wg.Add(1) + // Stat volume on all the disks in a routine. + go func(index int, disk StorageAPI) { + defer wg.Done() + volInfo, err := disk.StatVol(volume) + if err != nil { + errs[index] = err + return + } + volsInfo[index] = volInfo + }(index, disk) } + + // Wait for all the Stat operations to finish. + wg.Wait() + + // Return the collected values. return volsInfo, errs } @@ -349,6 +416,8 @@ func (xl XL) StatVol(volume string) (volInfo VolInfo, err error) { if !isValidVolname(volume) { return VolInfo{}, errInvalidArgument } + + // Acquire a read lock before reading. 
nsMutex.RLock(volume, "") volsInfo, heal, err := xl.listAllVolumeInfo(volume) nsMutex.RUnlock(volume, "") @@ -361,10 +430,10 @@ func (xl XL) StatVol(volume string) (volInfo VolInfo, err error) { if heal { go func() { - if herr := xl.healVolume(volume); herr != nil { + if hErr := xl.healVolume(volume); hErr != nil { log.WithFields(logrus.Fields{ "volume": volume, - }).Errorf("healVolume failed with %s", herr) + }).Errorf("healVolume failed with %s", hErr) return } }()