From bea6f33b083dd657ed3083781c72925128250d3f Mon Sep 17 00:00:00 2001 From: Bala FA Date: Tue, 12 Apr 2016 07:30:08 +0530 Subject: [PATCH] backend/fs: remove timer channel from scanMultipartDir() (#1310) Previously, scanMultipartDir() returned an object info channel and a timer channel, where the timer channel was used to check whether the object info channel was still alive. This caused a race condition in which a timeout could occur while the object info channel was in use. This patch fixes the issue by removing the timer channel and using the object info channel directly; each object info now has an End bool field that indicates whether the received object info is the last one. --- fs-multipart-dir.go | 158 +++++++++++++++++--------------------------- fs-multipart.go | 108 +++++++++++++++++++----------- fs.go | 4 +- 3 files changed, 135 insertions(+), 135 deletions(-) diff --git a/fs-multipart-dir.go b/fs-multipart-dir.go index b023cbb6d..84268a46a 100644 --- a/fs-multipart-dir.go +++ b/fs-multipart-dir.go @@ -24,9 +24,8 @@ import ( "time" ) -func scanMultipartDir(bucketDir, prefixPath, markerPath, uploadIDMarker string, recursive bool) multipartObjectInfoChannel { +func scanMultipartDir(bucketDir, prefixPath, markerPath, uploadIDMarker string, recursive bool) <-chan multipartObjectInfo { objectInfoCh := make(chan multipartObjectInfo, listObjectsLimit) - timeoutCh := make(chan struct{}, 1) // TODO: check if bucketDir is absolute path scanDir := bucketDir @@ -96,7 +95,6 @@ func scanMultipartDir(bucketDir, prefixPath, markerPath, uploadIDMarker string, // goroutine - retrieves directory entries, makes ObjectInfo and sends into the channel. go func() { defer close(objectInfoCh) - defer close(timeoutCh) // send function - returns true if ObjectInfo is sent // within (time.Second * 15) else false on timeout. 
@@ -106,27 +104,45 @@ func scanMultipartDir(bucketDir, prefixPath, markerPath, uploadIDMarker string, case objectInfoCh <- oi: return true case <-timer: - timeoutCh <- struct{}{} return false } } - for { - // Filters scandir entries. This filter function is - // specific for multipart listing. - multipartFilterFn := func(dirent fsDirent) bool { - // Verify if dirent is a directory a regular file - // with match uploadID suffix. - if dirent.IsDir() || (dirent.IsRegular() && strings.HasSuffix(dirent.name, multipartUploadIDSuffix)) { - // Return if dirent matches prefix and - // lexically higher than marker. - return strings.HasPrefix(dirent.name, prefixPath) && dirent.name > markerPath + // filter function - filters directory entries matching multipart uploadids, prefix and marker + direntFilterFn := func(dirent fsDirent) bool { + // check if dirent is a directory (or) dirent is a regular file and it's name ends with Upload ID suffix string + if dirent.IsDir() || (dirent.IsRegular() && strings.HasSuffix(dirent.name, multipartUploadIDSuffix)) { + // return if dirent's name starts with prefixPath and lexically higher than markerPath + return strings.HasPrefix(dirent.name, prefixPath) && dirent.name > markerPath + } + return false + } + + // filter function - filters directory entries matching multipart uploadids + subDirentFilterFn := func(dirent fsDirent) bool { + // check if dirent is a directory (or) dirent is a regular file and it's name ends with Upload ID suffix string + return dirent.IsDir() || (dirent.IsRegular() && strings.HasSuffix(dirent.name, multipartUploadIDSuffix)) + } + + // lastObjInfo is used to save last object info which is sent at last with End=true + var lastObjInfo *multipartObjectInfo + + sendError := func(err error) { + if lastObjInfo != nil { + if !send(*lastObjInfo) { + // as we got error sending lastObjInfo, we can't send the error + return } - return false } - dirents, err := scandir(scanDir, multipartFilterFn, false) + + 
send(multipartObjectInfo{Err: err, End: true}) + return + } + + for { + dirents, err := scandir(scanDir, direntFilterFn, false) if err != nil { - send(multipartObjectInfo{Err: err}) + sendError(err) return } @@ -138,19 +154,19 @@ func scanMultipartDir(bucketDir, prefixPath, markerPath, uploadIDMarker string, name := strings.Replace(filepath.Dir(dirent.name), bucketDir, "", 1) if name == "" { // This should not happen ie uploadid file should not be in bucket directory - send(multipartObjectInfo{Err: errors.New("Corrupted metadata")}) + sendError(errors.New("Corrupted metadata")) return } uploadID := strings.Split(filepath.Base(dirent.name), multipartUploadIDSuffix)[0] // Solaris and older unixes have modTime to be - // empty, fall back to os.Stat() to fill missing values. + // empty, fallback to os.Stat() to fill missing values. if dirent.modTime.IsZero() { if fi, e := os.Stat(dirent.name); e == nil { dirent.modTime = fi.ModTime() } else { - send(multipartObjectInfo{Err: e}) + sendError(e) return } } @@ -161,20 +177,21 @@ func scanMultipartDir(bucketDir, prefixPath, markerPath, uploadIDMarker string, ModifiedTime: dirent.modTime, } - if !send(objInfo) { - return + // as we got new object info, send last object info and keep new object info as last object info + if lastObjInfo != nil { + if !send(*lastObjInfo) { + return + } } + lastObjInfo = &objInfo continue } - multipartSubDirentFilterFn := func(dirent fsDirent) bool { - return dirent.IsDir() || (dirent.IsRegular() && strings.HasSuffix(dirent.name, multipartUploadIDSuffix)) - } // Fetch sub dirents. 
- subDirents, err := scandir(dirent.name, multipartSubDirentFilterFn, false) + subDirents, err := scandir(dirent.name, subDirentFilterFn, false) if err != nil { - send(multipartObjectInfo{Err: err}) + sendError(err) return } @@ -198,12 +215,12 @@ func scanMultipartDir(bucketDir, prefixPath, markerPath, uploadIDMarker string, // Send directory only for non-recursive listing if !recursive && (subDirFound || len(subDirents) == 0) { // Solaris and older unixes have modTime to be - // empty, fall back to os.Stat() to fill missing values. + // empty, fallback to os.Stat() to fill missing values. if dirent.modTime.IsZero() { if fi, e := os.Stat(dirent.name); e == nil { dirent.modTime = fi.ModTime() } else { - send(multipartObjectInfo{Err: e}) + sendError(e) return } } @@ -214,9 +231,13 @@ func scanMultipartDir(bucketDir, prefixPath, markerPath, uploadIDMarker string, IsDir: true, } - if !send(objInfo) { - return + // as we got new object info, send last object info and keep new object info as last object info + if lastObjInfo != nil { + if !send(*lastObjInfo) { + return + } } + lastObjInfo = &objInfo } if recursive { @@ -235,10 +256,17 @@ func scanMultipartDir(bucketDir, prefixPath, markerPath, uploadIDMarker string, break } } + + if lastObjInfo != nil { + // we got last object + lastObjInfo.End = true + if !send(*lastObjInfo) { + return + } + } }() - // Return multipart info. 
- return multipartObjectInfoChannel{ch: objectInfoCh, timeoutCh: timeoutCh} + return objectInfoCh } // multipartObjectInfo - Multipart object info @@ -248,67 +276,5 @@ type multipartObjectInfo struct { ModifiedTime time.Time IsDir bool Err error -} - -// multipartObjectInfoChannel - multipart object info channel -type multipartObjectInfoChannel struct { - ch <-chan multipartObjectInfo - objInfo *multipartObjectInfo - closed bool - timeoutCh <-chan struct{} - timedOut bool -} - -func (oic *multipartObjectInfoChannel) Read() (multipartObjectInfo, bool) { - if oic.closed { - return multipartObjectInfo{}, false - } - if oic.objInfo == nil { - // First read. - if oi, ok := <-oic.ch; ok { - oic.objInfo = &oi - } else { - oic.closed = true - return multipartObjectInfo{}, false - } - } - - retObjInfo := *oic.objInfo - status := true - oic.objInfo = nil - - // Read once more to know whether it was last read. - if oi, ok := <-oic.ch; ok { - oic.objInfo = &oi - } else { - oic.closed = true - } - - return retObjInfo, status -} - -// IsClosed - return whether channel is closed or not. -func (oic multipartObjectInfoChannel) IsClosed() bool { - if oic.objInfo != nil { - return false - } - return oic.closed -} - -// IsTimedOut - return whether channel is closed due to timeout. -func (oic multipartObjectInfoChannel) IsTimedOut() bool { - if oic.timedOut { - return true - } - - select { - case _, ok := <-oic.timeoutCh: - if ok { - oic.timedOut = true - return true - } - return false - default: - return false - } + End bool } diff --git a/fs-multipart.go b/fs-multipart.go index c6c8ea726..4ceec5af7 100644 --- a/fs-multipart.go +++ b/fs-multipart.go @@ -1,5 +1,5 @@ /* - * Minio Cloud Storage, (C) 2015 Minio, Inc. + * Minio Cloud Storage, (C) 2015,2016 Minio, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -394,11 +394,11 @@ func (fs Filesystem) CompleteMultipartUpload(bucket, object, uploadID string, pa return newObject, nil } -func (fs *Filesystem) saveListMultipartObjectCh(params listMultipartObjectParams, ch multipartObjectInfoChannel) { +func (fs *Filesystem) saveListMultipartObjectCh(params listMultipartObjectParams, ch <-chan multipartObjectInfo) { fs.listMultipartObjectMapMutex.Lock() defer fs.listMultipartObjectMapMutex.Unlock() - channels := []multipartObjectInfoChannel{ch} + channels := []<-chan multipartObjectInfo{ch} if _, ok := fs.listMultipartObjectMap[params]; ok { channels = append(fs.listMultipartObjectMap[params], ch) } @@ -406,27 +406,23 @@ func (fs *Filesystem) saveListMultipartObjectCh(params listMultipartObjectParams fs.listMultipartObjectMap[params] = channels } -func (fs *Filesystem) lookupListMultipartObjectCh(params listMultipartObjectParams) *multipartObjectInfoChannel { +func (fs *Filesystem) lookupListMultipartObjectCh(params listMultipartObjectParams) <-chan multipartObjectInfo { fs.listMultipartObjectMapMutex.Lock() defer fs.listMultipartObjectMapMutex.Unlock() if channels, ok := fs.listMultipartObjectMap[params]; ok { - for i, channel := range channels { - if !channel.IsTimedOut() { - chs := channels[i+1:] - if len(chs) > 0 { - fs.listMultipartObjectMap[params] = chs - } else { - delete(fs.listMultipartObjectMap, params) - } - - return &channel - } + var channel <-chan multipartObjectInfo + channel, channels = channels[0], channels[1:] + if len(channels) > 0 { + fs.listMultipartObjectMap[params] = channels + } else { + // do not store empty channel list + delete(fs.listMultipartObjectMap, params) } - // As all channels are timed out, delete the map entry - delete(fs.listMultipartObjectMap, params) + return channel } + return nil } @@ -484,8 +480,10 @@ func (fs Filesystem) ListMultipartUploads(bucket, objectPrefix, keyMarker, uploa } metaBucketDir := filepath.Join(fs.diskPath, minioMetaDir, bucket) + // Lookup of if 
listMultipartObjectChannel is available for given // parameters, else create a new one. + savedChannel := true multipartObjectInfoCh := fs.lookupListMultipartObjectCh(listMultipartObjectParams{ bucket: bucket, delimiter: delimiter, @@ -493,58 +491,94 @@ func (fs Filesystem) ListMultipartUploads(bucket, objectPrefix, keyMarker, uploa prefix: prefixPath, uploadIDMarker: uploadIDMarker, }) + if multipartObjectInfoCh == nil { - ch := scanMultipartDir(metaBucketDir, objectPrefix, keyMarker, uploadIDMarker, recursive) - multipartObjectInfoCh = &ch + multipartObjectInfoCh = scanMultipartDir(metaBucketDir, objectPrefix, keyMarker, uploadIDMarker, recursive) + savedChannel = false } + var objInfo *multipartObjectInfo nextKeyMarker := "" nextUploadIDMarker := "" for i := 0; i < maxUploads; { - multipartObjInfo, ok := multipartObjectInfoCh.Read() - if !ok { - // Closed channel. - return result, nil + // read the channel + if oi, ok := <-multipartObjectInfoCh; ok { + objInfo = &oi + } else { + // closed channel + if i == 0 { + // first read + if !savedChannel { + // its valid to have a closed new channel for first read + multipartObjectInfoCh = nil + break + } + + // invalid saved channel amd create new channel + multipartObjectInfoCh = scanMultipartDir(metaBucketDir, objectPrefix, keyMarker, + uploadIDMarker, recursive) + } else { + // TODO: FIX: there is a chance of infinite loop if we get closed channel always + // the channel got closed due to timeout + // create a new channel + multipartObjectInfoCh = scanMultipartDir(metaBucketDir, objectPrefix, nextKeyMarker, + nextUploadIDMarker, recursive) + } + + // make it as new channel + savedChannel = false + continue } - if multipartObjInfo.Err != nil { - if os.IsNotExist(multipartObjInfo.Err) { + if objInfo.Err != nil { + if os.IsNotExist(objInfo.Err) { return ListMultipartsInfo{}, nil } - return ListMultipartsInfo{}, probe.NewError(multipartObjInfo.Err) + + return ListMultipartsInfo{}, probe.NewError(objInfo.Err) } - if 
strings.Contains(multipartObjInfo.Name, "$multiparts") || - strings.Contains(multipartObjInfo.Name, "$tmpobject") { + // backward compatibility check + if strings.Contains(objInfo.Name, "$multiparts") || strings.Contains(objInfo.Name, "$tmpobject") { continue } // Directories are listed only if recursive is false - if multipartObjInfo.IsDir { - result.CommonPrefixes = append(result.CommonPrefixes, multipartObjInfo.Name) + if objInfo.IsDir { + result.CommonPrefixes = append(result.CommonPrefixes, objInfo.Name) } else { result.Uploads = append(result.Uploads, uploadMetadata{ - Object: multipartObjInfo.Name, - UploadID: multipartObjInfo.UploadID, - Initiated: multipartObjInfo.ModifiedTime, + Object: objInfo.Name, + UploadID: objInfo.UploadID, + Initiated: objInfo.ModifiedTime, }) } - nextKeyMarker = multipartObjInfo.Name - nextUploadIDMarker = multipartObjInfo.UploadID + + nextKeyMarker = objInfo.Name + nextUploadIDMarker = objInfo.UploadID i++ + + if objInfo.End { + // as we received last object, do not save this channel for later use + multipartObjectInfoCh = nil + break + } } - if !multipartObjectInfoCh.IsClosed() { + if multipartObjectInfoCh != nil { + // we haven't received last object result.IsTruncated = true result.NextKeyMarker = nextKeyMarker result.NextUploadIDMarker = nextUploadIDMarker + + // save this channel for later use fs.saveListMultipartObjectCh(listMultipartObjectParams{ bucket: bucket, delimiter: delimiter, keyMarker: nextKeyMarker, prefix: objectPrefix, uploadIDMarker: nextUploadIDMarker, - }, *multipartObjectInfoCh) + }, multipartObjectInfoCh) } return result, nil diff --git a/fs.go b/fs.go index dbf927f65..66d832e3c 100644 --- a/fs.go +++ b/fs.go @@ -49,7 +49,7 @@ type Filesystem struct { rwLock *sync.RWMutex listObjectMap map[listObjectParams][]*treeWalker listObjectMapMutex *sync.Mutex - listMultipartObjectMap map[listMultipartObjectParams][]multipartObjectInfoChannel + listMultipartObjectMap map[listMultipartObjectParams][]<-chan 
multipartObjectInfo listMultipartObjectMapMutex *sync.Mutex } @@ -69,7 +69,7 @@ func newFS(diskPath string) (ObjectAPI, *probe.Error) { fs.listObjectMapMutex = &sync.Mutex{} // Initialize list multipart map. - fs.listMultipartObjectMap = make(map[listMultipartObjectParams][]multipartObjectInfoChannel) + fs.listMultipartObjectMap = make(map[listMultipartObjectParams][]<-chan multipartObjectInfo) fs.listMultipartObjectMapMutex = &sync.Mutex{} // Return here.