From 1cf1532ca33dc1aab7d854f706868a70e8dc6413 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Sun, 29 May 2016 21:05:00 -0700 Subject: [PATCH 1/2] XL: Implement ListObjects channel and pool management. --- object-api-multipart_test.go | 1 + tree-walk-pool.go | 118 +++++++++++++++++++++++++++++++ tree-walk-xl.go | 130 +++++++++++++---------------------- xl-v1-list-objects.go | 18 ++--- xl-v1-multipart.go | 17 +++-- xl-v1.go | 15 ++-- 6 files changed, 190 insertions(+), 109 deletions(-) create mode 100644 tree-walk-pool.go diff --git a/object-api-multipart_test.go b/object-api-multipart_test.go index e6abb0095..a11ed2757 100644 --- a/object-api-multipart_test.go +++ b/object-api-multipart_test.go @@ -1056,6 +1056,7 @@ func testListMultipartUploads(obj ObjectLayer, instanceType string, t *testing.T } for i, testCase := range testCases { + // fmt.Println(testCase) // uncomment to peek into the test cases. actualResult, actualErr := obj.ListMultipartUploads(testCase.bucket, testCase.prefix, testCase.keyMarker, testCase.uploadIDMarker, testCase.delimiter, testCase.maxUploads) if actualErr != nil && testCase.shouldPass { t.Errorf("Test %d: %s: Expected to pass, but failed with: %s", i+1, instanceType, actualErr.Error()) diff --git a/tree-walk-pool.go b/tree-walk-pool.go new file mode 100644 index 000000000..11e957126 --- /dev/null +++ b/tree-walk-pool.go @@ -0,0 +1,118 @@ +/* + * Minio Cloud Storage, (C) 2016 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package main + +import ( + "sync" + "time" +) + +// Global lookup timeout. +const ( + globalLookupTimeout = time.Minute * 30 // 30minutes. +) + +// treeWalkerPoolInfo - tree walker pool info carries temporary walker +// channel stored until timeout is called. +type treeWalkerPoolInfo struct { + treeWalkerCh chan treeWalker + treeWalkerDoneCh chan struct{} + doneCh chan<- struct{} +} + +// treeWalkerPool - tree walker pool is a set of temporary tree walker +// objects. Any item stored in the pool will be removed automatically at +// a given timeOut value. This pool is safe for use by multiple +// goroutines simultaneously. pool's purpose is to cache tree walker +// channels for later reuse. +type treeWalkerPool struct { + pool map[listParams][]treeWalkerPoolInfo + timeOut time.Duration + lock *sync.Mutex +} + +// newTreeWalkerPool - initialize new tree walker pool. +func newTreeWalkerPool(timeout time.Duration) *treeWalkerPool { + tPool := &treeWalkerPool{ + pool: make(map[listParams][]treeWalkerPoolInfo), + timeOut: timeout, + lock: &sync.Mutex{}, + } + return tPool +} + +// Release - selects an item from the pool based on the input +// listParams, removes it from the pool, and returns treeWalker +// channels. Release will return nil, if listParams is not +// recognized. +func (t treeWalkerPool) Release(params listParams) (treeWalkerCh chan treeWalker, treeWalkerDoneCh chan struct{}) { + t.lock.Lock() + defer t.lock.Unlock() + treeWalk, ok := t.pool[params] + if ok { + if len(treeWalk) > 0 { + treeWalker := treeWalk[0] + if len(treeWalk[1:]) > 0 { + t.pool[params] = treeWalk[1:] + } else { + delete(t.pool, params) + } + treeWalker.doneCh <- struct{}{} + return treeWalker.treeWalkerCh, treeWalker.treeWalkerDoneCh + } + } + // Release return nil if params not found. + return nil, nil +} + +// Set - adds new list params along with treeWalker channel to the +// pool for future. Additionally this also starts a go routine which +// waits at the configured timeout. Additionally this go-routine is +// also closed pro-actively by 'Release' call when the treeWalker +// item is obtained from the pool. +func (t treeWalkerPool) Set(params listParams, treeWalkerCh chan treeWalker, treeWalkerDoneCh chan struct{}) { + t.lock.Lock() + defer t.lock.Unlock() + + var treeWalkerIdx = len(t.pool[params]) + var doneCh = make(chan struct{}) + t.pool[params] = append(t.pool[params], treeWalkerPoolInfo{ + treeWalkerCh: treeWalkerCh, + treeWalkerDoneCh: treeWalkerDoneCh, + doneCh: doneCh, + }) + + // Safe expiry of treeWalkerCh after timeout. + go func(doneCh <-chan struct{}) { + select { + // Wait until timeOut + case <-time.After(t.timeOut): + t.lock.Lock() + treeWalk := t.pool[params] + treeWalk = append(treeWalk[:treeWalkerIdx], treeWalk[treeWalkerIdx+1:]...) + if len(treeWalk) == 0 { + delete(t.pool, params) + } else { + t.pool[params] = treeWalk + } + close(treeWalkerDoneCh) + t.lock.Unlock() + case <-doneCh: + return + } + }(doneCh) +} diff --git a/tree-walk-xl.go b/tree-walk-xl.go index 3a0ddf3c8..d983353bb 100644 --- a/tree-walk-xl.go +++ b/tree-walk-xl.go @@ -19,7 +19,6 @@ package main import ( "sort" "strings" - "time" ) // listParams - list object params used for list object map @@ -31,20 +30,12 @@ type listParams struct { } // Tree walk result carries results of tree walking. -type treeWalkResult struct { +type treeWalker struct { entry string err error end bool } -// Tree walk notify carries a channel which notifies tree walk -// results, additionally it also carries information if treeWalk -// should be timedOut. -type treeWalker struct { - ch <-chan treeWalkResult - timedOut bool -} - // listDir - listDir. func (xl xlObjects) listDir(bucket, prefixDir string, filter func(entry string) bool, isLeaf func(string, string) bool) (entries []string, err error) { for _, disk := range xl.getLoadBalancedQuorumDisks() { @@ -62,7 +53,7 @@ func (xl xlObjects) listDir(bucket, prefixDir string, filter func(entry string) } // Skip the entries which do not match the filter. for i, entry := range entries { - if filter(entry) { + if !filter(entry) { entries[i] = "" continue } @@ -83,7 +74,7 @@ func (xl xlObjects) listDir(bucket, prefixDir string, filter func(entry string) } // treeWalk walks directory tree recursively pushing fileInfo into the channel as and when it encounters files. -func (xl xlObjects) treeWalk(bucket, prefixDir, entryPrefixMatch, marker string, recursive bool, send func(treeWalkResult) bool, count *int, isLeaf func(string, string) bool) bool { +func (xl xlObjects) treeWalk(bucket, prefixDir, entryPrefixMatch, marker string, recursive bool, isLeaf func(string, string) bool, treeWalkCh chan treeWalker, doneCh chan struct{}, stackDepth int, isEnd bool) { // Example: // if prefixDir="one/two/three/" and marker="four/five.txt" treeWalk is recursively // called with prefixDir="one/two/three/four/" and marker="five.txt" @@ -99,14 +90,23 @@ func (xl xlObjects) treeWalk(bucket, prefixDir, entryPrefixMatch, marker string, } } entries, err := xl.listDir(bucket, prefixDir, func(entry string) bool { - return !strings.HasPrefix(entry, entryPrefixMatch) + return strings.HasPrefix(entry, entryPrefixMatch) }, isLeaf) if err != nil { - send(treeWalkResult{err: err}) - return false + select { + case <-doneCh: + if stackDepth == 0 { + close(treeWalkCh) + } + case treeWalkCh <- treeWalker{err: err}: + } + return } if len(entries) == 0 { - return true + if stackDepth == 0 { + close(treeWalkCh) + } + return } // example: @@ -116,12 +116,16 @@ func (xl xlObjects) treeWalk(bucket, prefixDir, entryPrefixMatch, marker string, return entries[i] >= markerDir }) entries = entries[idx:] - *count += len(entries) + if len(entries) == 0 { + if stackDepth == 0 { + close(treeWalkCh) + } + return + } for i, entry := range entries { if i == 0 && markerDir == entry { if !recursive { // Skip as the marker would already be listed in the previous listing. - *count-- continue } if recursive && !strings.HasSuffix(entry, slashSeparator) { @@ -130,11 +134,9 @@ func (xl xlObjects) treeWalk(bucket, prefixDir, entryPrefixMatch, marker string, // should not be skipped, instead it will need to be treeWalk()'ed into. // Skip if it is a file though as it would be listed in previous listing. - *count-- continue } } - if recursive && strings.HasSuffix(entry, slashSeparator) { // If the entry is a directory, we will need recurse into it. markerArg := "" @@ -143,23 +145,37 @@ func (xl xlObjects) treeWalk(bucket, prefixDir, entryPrefixMatch, marker string, // recursing into "four/" markerArg = markerBase } - *count-- prefixMatch := "" // Valid only for first level treeWalk and empty for subdirectories. - if !xl.treeWalk(bucket, pathJoin(prefixDir, entry), prefixMatch, markerArg, recursive, send, count, isLeaf) { - return false + if i == len(entries)-1 && stackDepth == 0 { + isEnd = true } + stackDepth++ + xl.treeWalk(bucket, pathJoin(prefixDir, entry), prefixMatch, markerArg, recursive, isLeaf, treeWalkCh, doneCh, stackDepth, isEnd) + stackDepth-- continue } - *count-- - if !send(treeWalkResult{entry: pathJoin(prefixDir, entry)}) { - return false + var isEOF bool + if stackDepth == 0 && i == len(entries)-1 { + isEOF = true + } else if i == len(entries)-1 && isEnd { + isEOF = true } + select { + case <-doneCh: + if stackDepth == 0 { + close(treeWalkCh) + return + } + case treeWalkCh <- treeWalker{entry: pathJoin(prefixDir, entry), end: isEOF}: + } + } + if stackDepth == 0 { + close(treeWalkCh) } - return true } // Initiate a new treeWalk in a goroutine. -func (xl xlObjects) startTreeWalk(bucket, prefix, marker string, recursive bool, isLeaf func(string, string) bool) *treeWalker { +func (xl xlObjects) startTreeWalk(bucket, prefix, marker string, recursive bool, isLeaf func(string, string) bool, doneCh chan struct{}) chan treeWalker { // Example 1 // If prefix is "one/two/three/" and marker is "one/two/three/four/five.txt" // treeWalk is called with prefixDir="one/two/three/" and marker="four/five.txt" @@ -170,8 +186,7 @@ func (xl xlObjects) startTreeWalk(bucket, prefix, marker string, recursive bool, // treeWalk is called with prefixDir="one/two/" and marker="three/four/five.txt" // and entryPrefixMatch="th" - ch := make(chan treeWalkResult, maxObjectList) - walkNotify := treeWalker{ch: ch} + treeWalkCh := make(chan treeWalker, maxObjectList) entryPrefixMatch := prefix prefixDir := "" lastIndex := strings.LastIndex(prefix, slashSeparator) @@ -179,58 +194,7 @@ func (xl xlObjects) startTreeWalk(bucket, prefix, marker string, recursive bool, entryPrefixMatch = prefix[lastIndex+1:] prefixDir = prefix[:lastIndex+1] } - count := 0 marker = strings.TrimPrefix(marker, prefixDir) - go func() { - defer close(ch) - send := func(walkResult treeWalkResult) bool { - if count == 0 { - walkResult.end = true - } - timer := time.After(time.Second * 60) - select { - case ch <- walkResult: - return true - case <-timer: - walkNotify.timedOut = true - return false - } - } - xl.treeWalk(bucket, prefixDir, entryPrefixMatch, marker, recursive, send, &count, isLeaf) - }() - return &walkNotify -} - -// Save the goroutine reference in the map -func (xl xlObjects) saveTreeWalk(params listParams, walker *treeWalker) { - xl.listObjectMapMutex.Lock() - defer xl.listObjectMapMutex.Unlock() - - walkers, _ := xl.listObjectMap[params] - walkers = append(walkers, walker) - - xl.listObjectMap[params] = walkers -} - -// Lookup the goroutine reference from map -func (xl xlObjects) lookupTreeWalk(params listParams) *treeWalker { - xl.listObjectMapMutex.Lock() - defer xl.listObjectMapMutex.Unlock() - - if walkChs, ok := xl.listObjectMap[params]; ok { - for i, walkCh := range walkChs { - if !walkCh.timedOut { - newWalkChs := walkChs[i+1:] - if len(newWalkChs) > 0 { - xl.listObjectMap[params] = newWalkChs - } else { - delete(xl.listObjectMap, params) - } - return walkCh - } - } - // As all channels are timed out, delete the map entry - delete(xl.listObjectMap, params) - } - return nil + go xl.treeWalk(bucket, prefixDir, entryPrefixMatch, marker, recursive, isLeaf, treeWalkCh, doneCh, 0, false) + return treeWalkCh } diff --git a/xl-v1-list-objects.go b/xl-v1-list-objects.go index 2d5e8a71e..5708490f2 100644 --- a/xl-v1-list-objects.go +++ b/xl-v1-list-objects.go @@ -26,15 +26,17 @@ func (xl xlObjects) listObjects(bucket, prefix, marker, delimiter string, maxKey recursive = false } - walker := xl.lookupTreeWalk(listParams{bucket, recursive, marker, prefix}) - if walker == nil { - walker = xl.startTreeWalk(bucket, prefix, marker, recursive, xl.isObject) + walkerCh, walkerDoneCh := xl.listPool.Release(listParams{bucket, recursive, marker, prefix}) + if walkerCh == nil { + walkerDoneCh = make(chan struct{}) + walkerCh = xl.startTreeWalk(bucket, prefix, marker, recursive, xl.isObject, walkerDoneCh) } + var objInfos []ObjectInfo var eof bool var nextMarker string for i := 0; i < maxKeys; { - walkResult, ok := <-walker.ch + walkResult, ok := <-walkerCh if !ok { // Closed channel. eof = true @@ -63,18 +65,18 @@ func (xl xlObjects) listObjects(bucket, prefix, marker, delimiter string, maxKey return ListObjectsInfo{}, toObjectErr(err, bucket, prefix) } } - nextMarker = objInfo.Name objInfos = append(objInfos, objInfo) - if walkResult.end { + i++ + if walkResult.end == true { eof = true break } - i++ } + params := listParams{bucket, recursive, nextMarker, prefix} if !eof { - xl.saveTreeWalk(params, walker) + xl.listPool.Set(params, walkerCh, walkerDoneCh) } result := ListObjectsInfo{IsTruncated: !eof} diff --git a/xl-v1-multipart.go b/xl-v1-multipart.go index 844e3f66c..edec2d616 100644 --- a/xl-v1-multipart.go +++ b/xl-v1-multipart.go @@ -81,13 +81,14 @@ func (xl xlObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark } // Validate if we need to list further depending on maxUploads. if maxUploads > 0 { - walker := xl.lookupTreeWalk(listParams{minioMetaBucket, recursive, multipartMarkerPath, multipartPrefixPath}) - if walker == nil { - walker = xl.startTreeWalk(minioMetaBucket, multipartPrefixPath, multipartMarkerPath, recursive, xl.isMultipartUpload) + walkerCh, walkerDoneCh := xl.listPool.Release(listParams{minioMetaBucket, recursive, multipartMarkerPath, multipartPrefixPath}) + if walkerCh == nil { + walkerDoneCh = make(chan struct{}) + walkerCh = xl.startTreeWalk(minioMetaBucket, multipartPrefixPath, multipartMarkerPath, recursive, xl.isMultipartUpload, walkerDoneCh) } // Collect uploads until we have reached maxUploads count to 0. for maxUploads > 0 { - walkResult, ok := <-walker.ch + walkResult, ok := <-walkerCh if !ok { // Closed channel. eof = true @@ -110,10 +111,8 @@ func (xl xlObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark }) maxUploads-- if maxUploads == 0 { - if walkResult.end { - eof = true - break - } + eof = true + break } continue } @@ -142,7 +141,7 @@ func (xl xlObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark } uploads = append(uploads, newUploads...) maxUploads -= len(newUploads) - if walkResult.end && end { + if end && walkResult.end { eof = true break } diff --git a/xl-v1.go b/xl-v1.go index 2211bbfd5..740ee0b70 100644 --- a/xl-v1.go +++ b/xl-v1.go @@ -20,7 +20,6 @@ import ( "errors" "fmt" "sort" - "sync" "github.com/minio/minio/pkg/disk" ) @@ -45,8 +44,7 @@ type xlObjects struct { writeQuorum int // writeQuorum minimum required disks to write data. // List pool management. - listObjectMap map[listParams][]*treeWalker - listObjectMapMutex *sync.Mutex + listPool *treeWalkerPool } // errXLMaxDisks - returned for reached maximum of disks. @@ -159,12 +157,11 @@ func newXLObjects(disks []string) (ObjectLayer, error) { // Initialize xl objects. xl := xlObjects{ - physicalDisks: disks, - storageDisks: newPosixDisks, - dataBlocks: dataBlocks, - parityBlocks: parityBlocks, - listObjectMap: make(map[listParams][]*treeWalker), - listObjectMapMutex: &sync.Mutex{}, + physicalDisks: disks, + storageDisks: newPosixDisks, + dataBlocks: dataBlocks, + parityBlocks: parityBlocks, + listPool: newTreeWalkerPool(globalLookupTimeout), } // Figure out read and write quorum based on number of storage disks. From 002c5bf7dd5fd74d7a782d67e1631208dd5d308d Mon Sep 17 00:00:00 2001 From: Krishna Srinivas Date: Sat, 4 Jun 2016 00:03:50 +0530 Subject: [PATCH 2/2] XL: Treewalk handle all the race conditions and blocking channels. --- tree-walk-pool.go | 52 +++++++++++++++++++++++++------------- tree-walk-xl.go | 63 ++++++++++++++++++++++------------------------- 2 files changed, 65 insertions(+), 50 deletions(-) diff --git a/tree-walk-pool.go b/tree-walk-pool.go index 11e957126..461a17658 100644 --- a/tree-walk-pool.go +++ b/tree-walk-pool.go @@ -17,6 +17,7 @@ package main import ( + "errors" "sync" "time" ) @@ -26,6 +27,9 @@ const ( globalLookupTimeout = time.Minute * 30 // 30minutes. ) +// errWalkAbort - returned by the treeWalker routine, it signals the end of treeWalk. +var errWalkAbort = errors.New("treeWalk abort") + // treeWalkerPoolInfo - tree walker pool info carries temporary walker // channel stored until timeout is called. type treeWalkerPoolInfo struct { @@ -62,17 +66,19 @@ func newTreeWalkerPool(timeout time.Duration) *treeWalkerPool { func (t treeWalkerPool) Release(params listParams) (treeWalkerCh chan treeWalker, treeWalkerDoneCh chan struct{}) { t.lock.Lock() defer t.lock.Unlock() - treeWalk, ok := t.pool[params] + walks, ok := t.pool[params] // Pick the valid walks. if ok { - if len(treeWalk) > 0 { - treeWalker := treeWalk[0] - if len(treeWalk[1:]) > 0 { - t.pool[params] = treeWalk[1:] + if len(walks) > 0 { + // Pop out the first valid walk entry. + walk := walks[0] + walks = walks[1:] + if len(walks) > 0 { + t.pool[params] = walks } else { delete(t.pool, params) } - treeWalker.doneCh <- struct{}{} - return treeWalker.treeWalkerCh, treeWalker.treeWalkerDoneCh + walk.doneCh <- struct{}{} + return walk.treeWalkerCh, walk.treeWalkerDoneCh } } // Release return nil if params not found. @@ -88,13 +94,15 @@ func (t treeWalkerPool) Set(params listParams, treeWalkerCh chan treeWalker, tre t.lock.Lock() defer t.lock.Unlock() - var treeWalkerIdx = len(t.pool[params]) - var doneCh = make(chan struct{}) - t.pool[params] = append(t.pool[params], treeWalkerPoolInfo{ + // Should be a buffered channel so that Release() never blocks. + var doneCh = make(chan struct{}, 1) + walkInfo := treeWalkerPoolInfo{ treeWalkerCh: treeWalkerCh, treeWalkerDoneCh: treeWalkerDoneCh, doneCh: doneCh, - }) + } + // Append new walk info. + t.pool[params] = append(t.pool[params], walkInfo) // Safe expiry of treeWalkerCh after timeout. go func(doneCh <-chan struct{}) { @@ -102,13 +110,23 @@ func (t treeWalkerPool) Set(params listParams, treeWalkerCh chan treeWalker, tre // Wait until timeOut case <-time.After(t.timeOut): t.lock.Lock() - treeWalk := t.pool[params] - treeWalk = append(treeWalk[:treeWalkerIdx], treeWalk[treeWalkerIdx+1:]...) - if len(treeWalk) == 0 { - delete(t.pool, params) - } else { - t.pool[params] = treeWalk + walks, ok := t.pool[params] // Look for valid walks. + if ok { + // Look for walkInfo, remove it from the walks list. + for i, walk := range walks { + if walk == walkInfo { + walks = append(walks[:i], walks[i+1:]...) + } + } + // Walks is empty we have no more pending requests. + // Remove map entry. + if len(walks) == 0 { + delete(t.pool, params) + } else { // Save the updated walks. + t.pool[params] = walks + } } + // Close tree walker for the backing go-routine to die. close(treeWalkerDoneCh) t.lock.Unlock() case <-doneCh: diff --git a/tree-walk-xl.go b/tree-walk-xl.go index d983353bb..df4da9ade 100644 --- a/tree-walk-xl.go +++ b/tree-walk-xl.go @@ -36,7 +36,13 @@ type treeWalker struct { end bool } -// listDir - listDir. +// listDir - lists all the entries at a given prefix, takes additional params as filter and leaf detection. +// filter is required to filter out the listed entries usually this function is supposed to return +// true or false. +// isLeaf is required to differentiate between directories and objects, this is a special requirement for XL +// backend since objects are kept as directories, the only way to know if a directory is truly an object +// we validate if 'xl.json' exists at the leaf. isLeaf replies true/false based on the outcome of a Stat +// operation. func (xl xlObjects) listDir(bucket, prefixDir string, filter func(entry string) bool, isLeaf func(string, string) bool) (entries []string, err error) { for _, disk := range xl.getLoadBalancedQuorumDisks() { if disk == nil { @@ -74,7 +80,7 @@ func (xl xlObjects) listDir(bucket, prefixDir string, filter func(entry string) } // treeWalk walks directory tree recursively pushing fileInfo into the channel as and when it encounters files. -func (xl xlObjects) treeWalk(bucket, prefixDir, entryPrefixMatch, marker string, recursive bool, isLeaf func(string, string) bool, treeWalkCh chan treeWalker, doneCh chan struct{}, stackDepth int, isEnd bool) { +func (xl xlObjects) treeWalk(bucket, prefixDir, entryPrefixMatch, marker string, recursive bool, isLeaf func(string, string) bool, treeWalkCh chan treeWalker, doneCh chan struct{}, isEnd bool) error { // Example: // if prefixDir="one/two/three/" and marker="four/five.txt" treeWalk is recursively // called with prefixDir="one/two/three/four/" and marker="five.txt" @@ -95,18 +101,14 @@ func (xl xlObjects) treeWalk(bucket, prefixDir, entryPrefixMatch, marker string, if err != nil { select { case <-doneCh: - if stackDepth == 0 { - close(treeWalkCh) - } + return errWalkAbort case treeWalkCh <- treeWalker{err: err}: + return err } - return } + // For an empty list return right here. if len(entries) == 0 { - if stackDepth == 0 { - close(treeWalkCh) - } - return + return nil } // example: @@ -116,11 +118,9 @@ func (xl xlObjects) treeWalk(bucket, prefixDir, entryPrefixMatch, marker string, return entries[i] >= markerDir }) entries = entries[idx:] + // For an empty list after search through the entries, return right here. if len(entries) == 0 { - if stackDepth == 0 { - close(treeWalkCh) - } - return + return nil } for i, entry := range entries { if i == 0 && markerDir == entry { @@ -146,32 +146,25 @@ func (xl xlObjects) treeWalk(bucket, prefixDir, entryPrefixMatch, marker string, markerArg = markerBase } prefixMatch := "" // Valid only for first level treeWalk and empty for subdirectories. - if i == len(entries)-1 && stackDepth == 0 { - isEnd = true + // markIsEnd is passed to this entry's treeWalk() so that treeWalker.end can be marked + // true at the end of the treeWalk stream. + markIsEnd := i == len(entries)-1 && isEnd + if tErr := xl.treeWalk(bucket, pathJoin(prefixDir, entry), prefixMatch, markerArg, recursive, isLeaf, treeWalkCh, doneCh, markIsEnd); tErr != nil { + return tErr } - stackDepth++ - xl.treeWalk(bucket, pathJoin(prefixDir, entry), prefixMatch, markerArg, recursive, isLeaf, treeWalkCh, doneCh, stackDepth, isEnd) - stackDepth-- continue } - var isEOF bool - if stackDepth == 0 && i == len(entries)-1 { - isEOF = true - } else if i == len(entries)-1 && isEnd { - isEOF = true - } + // EOF is set if we are at last entry and the caller indicated we at the end. + isEOF := ((i == len(entries)-1) && isEnd) select { case <-doneCh: - if stackDepth == 0 { - close(treeWalkCh) - return - } + return errWalkAbort case treeWalkCh <- treeWalker{entry: pathJoin(prefixDir, entry), end: isEOF}: } } - if stackDepth == 0 { - close(treeWalkCh) - } + + // Everything is listed. + return nil } // Initiate a new treeWalk in a goroutine. @@ -195,6 +188,10 @@ func (xl xlObjects) startTreeWalk(bucket, prefix, marker string, recursive bool, prefixDir = prefix[:lastIndex+1] } marker = strings.TrimPrefix(marker, prefixDir) - go xl.treeWalk(bucket, prefixDir, entryPrefixMatch, marker, recursive, isLeaf, treeWalkCh, doneCh, 0, false) + go func() { + isEnd := true // Indication to start walking the tree with end as true. + xl.treeWalk(bucket, prefixDir, entryPrefixMatch, marker, recursive, isLeaf, treeWalkCh, doneCh, isEnd) + close(treeWalkCh) + }() return treeWalkCh }