From a3a310cde814651d5a691ee301b0e56001a51072 Mon Sep 17 00:00:00 2001 From: Krishnan Parthasarathi Date: Fri, 24 Jun 2016 16:41:57 -0700 Subject: [PATCH] Moved tree-walk-fs to use tree-walk-pool (#1978) --- fs-v1-multipart.go | 11 ++--- fs-v1.go | 30 ++++++------- tree-walk-fs.go | 109 +++++++++++---------------------------------- 3 files changed, 46 insertions(+), 104 deletions(-) diff --git a/fs-v1-multipart.go b/fs-v1-multipart.go index de8cd5f13..e182b5518 100644 --- a/fs-v1-multipart.go +++ b/fs-v1-multipart.go @@ -67,14 +67,15 @@ func (fs fsObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark maxUploads = maxUploads - len(uploads) } if maxUploads > 0 { - walker := fs.lookupTreeWalk(listParams{minioMetaBucket, recursive, multipartMarkerPath, multipartPrefixPath}) - if walker == nil { - walker = fs.startTreeWalk(minioMetaBucket, multipartPrefixPath, multipartMarkerPath, recursive, func(bucket, object string) bool { + walkResultCh, endWalkCh := fs.listPool.Release(listParams{minioMetaBucket, recursive, multipartMarkerPath, multipartPrefixPath}) + if walkResultCh == nil { + endWalkCh = make(chan struct{}) + walkResultCh = fs.startTreeWalk(minioMetaBucket, multipartPrefixPath, multipartMarkerPath, recursive, func(bucket, object string) bool { return fs.isMultipartUpload(bucket, object) - }) + }, endWalkCh) } for maxUploads > 0 { - walkResult, ok := <-walker.ch + walkResult, ok := <-walkResultCh if !ok { // Closed channel. eof = true diff --git a/fs-v1.go b/fs-v1.go index 24c8f7c6e..b507c166e 100644 --- a/fs-v1.go +++ b/fs-v1.go @@ -27,7 +27,6 @@ import ( "path/filepath" "sort" "strings" - "sync" "github.com/minio/minio/pkg/disk" "github.com/minio/minio/pkg/mimedb" @@ -35,10 +34,11 @@ import ( // fsObjects - Implements fs object layer. type fsObjects struct { - storage StorageAPI - physicalDisk string - listObjectMap map[listParams][]*treeWalkerFS - listObjectMapMutex *sync.Mutex + storage StorageAPI + physicalDisk string + + // List pool management. + listPool *treeWalkPool } // creates format.json, the FS format info in minioMetaBucket. @@ -117,10 +117,9 @@ func newFSObjects(disk string) (ObjectLayer, error) { // Return successfully initialized object layer. return fsObjects{ - storage: storage, - physicalDisk: disk, - listObjectMap: make(map[listParams][]*treeWalkerFS), - listObjectMapMutex: &sync.Mutex{}, + storage: storage, + physicalDisk: disk, + listPool: newTreeWalkPool(globalLookupTimeout), }, nil } @@ -443,17 +442,18 @@ func (fs fsObjects) listObjects(bucket, prefix, marker, delimiter string, maxKey recursive = false } - walker := fs.lookupTreeWalk(listParams{bucket, recursive, marker, prefix}) - if walker == nil { - walker = fs.startTreeWalk(bucket, prefix, marker, recursive, func(bucket, object string) bool { + walkResultCh, endWalkCh := fs.listPool.Release(listParams{bucket, recursive, marker, prefix}) + if walkResultCh == nil { + endWalkCh = make(chan struct{}) + walkResultCh = fs.startTreeWalk(bucket, prefix, marker, recursive, func(bucket, object string) bool { return !strings.HasSuffix(object, slashSeparator) - }) + }, endWalkCh) } var fileInfos []FileInfo var eof bool var nextMarker string for i := 0; i < maxKeys; { - walkResult, ok := <-walker.ch + walkResult, ok := <-walkResultCh if !ok { // Closed channel. eof = true @@ -481,7 +481,7 @@ func (fs fsObjects) listObjects(bucket, prefix, marker, delimiter string, maxKey } params := listParams{bucket, recursive, nextMarker, prefix} if !eof { - fs.saveTreeWalk(params, walker) + fs.listPool.Set(params, walkResultCh, endWalkCh) } result := ListObjectsInfo{IsTruncated: !eof} diff --git a/tree-walk-fs.go b/tree-walk-fs.go index 25db24ee3..b5ea4c656 100644 --- a/tree-walk-fs.go +++ b/tree-walk-fs.go @@ -20,26 +20,10 @@ import ( "path" "sort" "strings" - "time" ) -// Tree walk notify carries a channel which notifies tree walk -// results, additionally it also carries information if treeWalk -// should be timedOut. -type treeWalkerFS struct { - ch <-chan treeWalkResultFS - timedOut bool -} - -// Tree walk result carries results of tree walking. -type treeWalkResultFS struct { - entry string - err error - end bool -} - // treeWalk walks FS directory tree recursively pushing fileInfo into the channel as and when it encounters files. -func (fs fsObjects) treeWalk(bucket, prefixDir, entryPrefixMatch, marker string, recursive bool, send func(treeWalkResultFS) bool, count *int, isLeaf func(string, string) bool) bool { +func (fs fsObjects) treeWalk(bucket, prefixDir, entryPrefixMatch, marker string, recursive bool, isLeaf func(string, string) bool, resultCh chan treeWalkResult, endWalkCh chan struct{}, isEnd bool) error { // Example: // if prefixDir="one/two/three/" and marker="four/five.txt" treeWalk is recursively // called with prefixDir="one/two/three/four/" and marker="five.txt" @@ -56,8 +40,12 @@ func (fs fsObjects) treeWalk(bucket, prefixDir, entryPrefixMatch, marker string, } entries, err := fs.storage.ListDir(bucket, prefixDir) if err != nil { - send(treeWalkResultFS{err: err}) - return false + select { + case <-endWalkCh: + return errWalkAbort + case resultCh <- treeWalkResult{err: err}: + return err + } } for i, entry := range entries { @@ -77,7 +65,7 @@ func (fs fsObjects) treeWalk(bucket, prefixDir, entryPrefixMatch, marker string, entries = entries[1:] } if len(entries) == 0 { - return true + return nil } // example: // If markerDir="four/" Search() returns the index of "four/" in the sorted @@ -86,12 +74,10 @@ func (fs fsObjects) treeWalk(bucket, prefixDir, entryPrefixMatch, marker string, return entries[i] >= markerDir }) entries = entries[idx:] - *count += len(entries) for i, entry := range entries { if i == 0 && markerDir == entry { if !recursive { // Skip as the marker would already be listed in the previous listing. - *count-- continue } if recursive && !strings.HasSuffix(entry, slashSeparator) { @@ -100,7 +86,6 @@ func (fs fsObjects) treeWalk(bucket, prefixDir, entryPrefixMatch, marker string, // should not be skipped, instead it will need to be treeWalk()'ed into. // Skip if it is a file though as it would be listed in previous listing. - *count-- continue } } @@ -113,23 +98,27 @@ func (fs fsObjects) treeWalk(bucket, prefixDir, entryPrefixMatch, marker string, // recursing into "four/" markerArg = markerBase } - *count-- prefixMatch := "" // Valid only for first level treeWalk and empty for subdirectories. - if !fs.treeWalk(bucket, path.Join(prefixDir, entry), prefixMatch, markerArg, recursive, send, count, isLeaf) { - return false + markIsEnd := i == len(entries)-1 && isEnd + if tErr := fs.treeWalk(bucket, path.Join(prefixDir, entry), prefixMatch, markerArg, recursive, isLeaf, resultCh, endWalkCh, markIsEnd); tErr != nil { + return tErr } continue } - *count-- - if !send(treeWalkResultFS{entry: pathJoin(prefixDir, entry)}) { - return false + // EOF is set if we are at last entry and the caller indicated we at the end. + isEOF := ((i == len(entries)-1) && isEnd) + select { + case <-endWalkCh: + return errWalkAbort + case resultCh <- treeWalkResult{entry: pathJoin(prefixDir, entry), end: isEOF}: } } - return true + // Everything is listed + return nil } // Initiate a new treeWalk in a goroutine. -func (fs fsObjects) startTreeWalk(bucket, prefix, marker string, recursive bool, isLeaf func(string, string) bool) *treeWalkerFS { +func (fs fsObjects) startTreeWalk(bucket, prefix, marker string, recursive bool, isLeaf func(string, string) bool, endWalkCh chan struct{}) chan treeWalkResult { // Example 1 // If prefix is "one/two/three/" and marker is "one/two/three/four/five.txt" // treeWalk is called with prefixDir="one/two/three/" and marker="four/five.txt" @@ -140,8 +129,7 @@ func (fs fsObjects) startTreeWalk(bucket, prefix, marker string, recursive bool, // treeWalk is called with prefixDir="one/two/" and marker="three/four/five.txt" // and entryPrefixMatch="th" - ch := make(chan treeWalkResultFS, maxObjectList) - walkNotify := treeWalkerFS{ch: ch} + resultCh := make(chan treeWalkResult, maxObjectList) entryPrefixMatch := prefix prefixDir := "" lastIndex := strings.LastIndex(prefix, slashSeparator) @@ -149,58 +137,11 @@ func (fs fsObjects) startTreeWalk(bucket, prefix, marker string, recursive bool, entryPrefixMatch = prefix[lastIndex+1:] prefixDir = prefix[:lastIndex+1] } - count := 0 marker = strings.TrimPrefix(marker, prefixDir) go func() { - defer close(ch) - send := func(walkResult treeWalkResultFS) bool { - if count == 0 { - walkResult.end = true - } - timer := time.After(time.Second * 60) - select { - case ch <- walkResult: - return true - case <-timer: - walkNotify.timedOut = true - return false - } - } - fs.treeWalk(bucket, prefixDir, entryPrefixMatch, marker, recursive, send, &count, isLeaf) + isEnd := true // Indication to start walking the tree with end as true. + fs.treeWalk(bucket, prefixDir, entryPrefixMatch, marker, recursive, isLeaf, resultCh, endWalkCh, isEnd) + close(resultCh) }() - return &walkNotify -} - -// Save the goroutine reference in the map -func (fs fsObjects) saveTreeWalk(params listParams, walker *treeWalkerFS) { - fs.listObjectMapMutex.Lock() - defer fs.listObjectMapMutex.Unlock() - - walkers, _ := fs.listObjectMap[params] - walkers = append(walkers, walker) - - fs.listObjectMap[params] = walkers -} - -// Lookup the goroutine reference from map -func (fs fsObjects) lookupTreeWalk(params listParams) *treeWalkerFS { - fs.listObjectMapMutex.Lock() - defer fs.listObjectMapMutex.Unlock() - - if walkChs, ok := fs.listObjectMap[params]; ok { - for i, walkCh := range walkChs { - if !walkCh.timedOut { - newWalkChs := walkChs[i+1:] - if len(newWalkChs) > 0 { - fs.listObjectMap[params] = newWalkChs - } else { - delete(fs.listObjectMap, params) - } - return walkCh - } - } - // As all channels are timed out, delete the map entry - delete(fs.listObjectMap, params) - } - return nil + return resultCh }