diff --git a/tree-walk-pool.go b/tree-walk-pool.go index 461a17658..4fd7ffbcb 100644 --- a/tree-walk-pool.go +++ b/tree-walk-pool.go @@ -27,43 +27,53 @@ const ( globalLookupTimeout = time.Minute * 30 // 30minutes. ) -// errWalkAbort - returned by the treeWalker routine, it signals the end of treeWalk. +// listParams - list object params used for list object map +type listParams struct { + bucket string + recursive bool + marker string + prefix string +} + +// errWalkAbort - returned by doTreeWalk() if it returns prematurely. +// doTreeWalk() can return prematurely if +// 1) treeWalk is timed out by the timer go-routine. +// 2) there is an error during tree walk. var errWalkAbort = errors.New("treeWalk abort") -// treeWalkerPoolInfo - tree walker pool info carries temporary walker -// channel stored until timeout is called. -type treeWalkerPoolInfo struct { - treeWalkerCh chan treeWalker - treeWalkerDoneCh chan struct{} - doneCh chan<- struct{} +// treeWalk - represents the go routine that does the file tree walk. +type treeWalk struct { + resultCh chan treeWalkResult + endWalkCh chan struct{} // To signal when treeWalk go-routine should end. + endTimerCh chan<- struct{} // To signal when timer go-routine should end. } -// treeWalkerPool - tree walker pool is a set of temporary tree walker -// objects. Any item stored in the pool will be removed automatically at -// a given timeOut value. This pool is safe for use by multiple -// goroutines simultaneously. pool's purpose is to cache tree walker -// channels for later reuse. -type treeWalkerPool struct { - pool map[listParams][]treeWalkerPoolInfo +// treeWalkPool - pool of treeWalk go routines. +// A treeWalk is added to the pool by Set() and removed either by +// doing a Release() or if the concerned timer goes off. +// treeWalkPool's purpose is to maintain active treeWalk go-routines in a map so that +// it can be looked up across related list calls. +type treeWalkPool struct { + pool map[listParams][]treeWalk timeOut time.Duration lock *sync.Mutex } -// newTreeWalkerPool - initialize new tree walker pool. -func newTreeWalkerPool(timeout time.Duration) *treeWalkerPool { - tPool := &treeWalkerPool{ - pool: make(map[listParams][]treeWalkerPoolInfo), +// newTreeWalkPool - initialize new tree walk pool. +func newTreeWalkPool(timeout time.Duration) *treeWalkPool { + tPool := &treeWalkPool{ + pool: make(map[listParams][]treeWalk), timeOut: timeout, lock: &sync.Mutex{}, } return tPool } -// Release - selects an item from the pool based on the input -// listParams, removes it from the pool, and returns treeWalker -// channels. Release will return nil, if listParams is not -// recognized. -func (t treeWalkerPool) Release(params listParams) (treeWalkerCh chan treeWalker, treeWalkerDoneCh chan struct{}) { +// Release - selects a treeWalk from the pool based on the input +// listParams, removes it from the pool, and returns the treeWalkResult +// channel. +// Returns nil if listParams does not have an asccociated treeWalk. +func (t treeWalkPool) Release(params listParams) (resultCh chan treeWalkResult, endWalkCh chan struct{}) { t.lock.Lock() defer t.lock.Unlock() walks, ok := t.pool[params] // Pick the valid walks. @@ -77,40 +87,45 @@ func (t treeWalkerPool) Release(params listParams) (treeWalkerCh chan treeWalker } else { delete(t.pool, params) } - walk.doneCh <- struct{}{} - return walk.treeWalkerCh, walk.treeWalkerDoneCh + walk.endTimerCh <- struct{}{} + return walk.resultCh, walk.endWalkCh } } // Release return nil if params not found. return nil, nil } -// Set - adds new list params along with treeWalker channel to the -// pool for future. Additionally this also starts a go routine which -// waits at the configured timeout. Additionally this go-routine is -// also closed pro-actively by 'Release' call when the treeWalker -// item is obtained from the pool. -func (t treeWalkerPool) Set(params listParams, treeWalkerCh chan treeWalker, treeWalkerDoneCh chan struct{}) { +// Set - adds a treeWalk to the treeWalkPool. +// Also starts a timer go-routine that ends when: +// 1) time.After() expires after t.timeOut seconds. +// The expiration is needed so that the treeWalk go-routine resources are freed after a timeout +// if the S3 client does only partial listing of objects. +// 2) Relase() signals the timer go-routine to end on endTimerCh. +// During listing the timer should not timeout and end the treeWalk go-routine, hence the +// timer go-routine should be ended. +func (t treeWalkPool) Set(params listParams, resultCh chan treeWalkResult, endWalkCh chan struct{}) { t.lock.Lock() defer t.lock.Unlock() // Should be a buffered channel so that Release() never blocks. - var doneCh = make(chan struct{}, 1) - walkInfo := treeWalkerPoolInfo{ - treeWalkerCh: treeWalkerCh, - treeWalkerDoneCh: treeWalkerDoneCh, - doneCh: doneCh, + endTimerCh := make(chan struct{}, 1) + walkInfo := treeWalk{ + resultCh: resultCh, + endWalkCh: endWalkCh, + endTimerCh: endTimerCh, } // Append new walk info. t.pool[params] = append(t.pool[params], walkInfo) - // Safe expiry of treeWalkerCh after timeout. - go func(doneCh <-chan struct{}) { + // Timer go-routine which times out after t.timeOut seconds. + go func(endTimerCh <-chan struct{}) { select { // Wait until timeOut case <-time.After(t.timeOut): + // Timeout has expired. Remove the treeWalk from treeWalkPool and + // end the treeWalk go-routine. t.lock.Lock() - walks, ok := t.pool[params] // Look for valid walks. + walks, ok := t.pool[params] if ok { // Look for walkInfo, remove it from the walks list. for i, walk := range walks { @@ -118,19 +133,21 @@ func (t treeWalkerPool) Set(params listParams, treeWalkerCh chan treeWalker, tre walks = append(walks[:i], walks[i+1:]...) } } - // Walks is empty we have no more pending requests. - // Remove map entry. if len(walks) == 0 { + // No more treeWalk go-routines associated with listParams + // hence remove map entry. delete(t.pool, params) - } else { // Save the updated walks. + } else { + // There are more treeWalk go-routines associated with listParams + // hence save the list in the map. t.pool[params] = walks } } - // Close tree walker for the backing go-routine to die. - close(treeWalkerDoneCh) + // Signal the treeWalk go-routine to die. + close(endWalkCh) t.lock.Unlock() - case <-doneCh: + case <-endTimerCh: return } - }(doneCh) + }(endTimerCh) } diff --git a/tree-walk-xl.go b/tree-walk-xl.go index df4da9ade..14e9061bd 100644 --- a/tree-walk-xl.go +++ b/tree-walk-xl.go @@ -21,16 +21,8 @@ import ( "strings" ) -// listParams - list object params used for list object map -type listParams struct { - bucket string - recursive bool - marker string - prefix string -} - // Tree walk result carries results of tree walking. -type treeWalker struct { +type treeWalkResult struct { entry string err error end bool @@ -80,7 +72,7 @@ func (xl xlObjects) listDir(bucket, prefixDir string, filter func(entry string) } // treeWalk walks directory tree recursively pushing fileInfo into the channel as and when it encounters files. -func (xl xlObjects) treeWalk(bucket, prefixDir, entryPrefixMatch, marker string, recursive bool, isLeaf func(string, string) bool, treeWalkCh chan treeWalker, doneCh chan struct{}, isEnd bool) error { +func (xl xlObjects) doTreeWalk(bucket, prefixDir, entryPrefixMatch, marker string, recursive bool, isLeaf func(string, string) bool, resultCh chan treeWalkResult, endWalkCh chan struct{}, isEnd bool) error { // Example: // if prefixDir="one/two/three/" and marker="four/five.txt" treeWalk is recursively // called with prefixDir="one/two/three/four/" and marker="five.txt" @@ -100,9 +92,9 @@ func (xl xlObjects) treeWalk(bucket, prefixDir, entryPrefixMatch, marker string, }, isLeaf) if err != nil { select { - case <-doneCh: + case <-endWalkCh: return errWalkAbort - case treeWalkCh <- treeWalker{err: err}: + case resultCh <- treeWalkResult{err: err}: return err } } @@ -149,7 +141,7 @@ func (xl xlObjects) treeWalk(bucket, prefixDir, entryPrefixMatch, marker string, // markIsEnd is passed to this entry's treeWalk() so that treeWalker.end can be marked // true at the end of the treeWalk stream. markIsEnd := i == len(entries)-1 && isEnd - if tErr := xl.treeWalk(bucket, pathJoin(prefixDir, entry), prefixMatch, markerArg, recursive, isLeaf, treeWalkCh, doneCh, markIsEnd); tErr != nil { + if tErr := xl.doTreeWalk(bucket, pathJoin(prefixDir, entry), prefixMatch, markerArg, recursive, isLeaf, resultCh, endWalkCh, markIsEnd); tErr != nil { return tErr } continue @@ -157,9 +149,9 @@ func (xl xlObjects) treeWalk(bucket, prefixDir, entryPrefixMatch, marker string, // EOF is set if we are at last entry and the caller indicated we at the end. isEOF := ((i == len(entries)-1) && isEnd) select { - case <-doneCh: + case <-endWalkCh: return errWalkAbort - case treeWalkCh <- treeWalker{entry: pathJoin(prefixDir, entry), end: isEOF}: + case resultCh <- treeWalkResult{entry: pathJoin(prefixDir, entry), end: isEOF}: } } @@ -168,7 +160,7 @@ func (xl xlObjects) treeWalk(bucket, prefixDir, entryPrefixMatch, marker string, } // Initiate a new treeWalk in a goroutine. -func (xl xlObjects) startTreeWalk(bucket, prefix, marker string, recursive bool, isLeaf func(string, string) bool, doneCh chan struct{}) chan treeWalker { +func (xl xlObjects) startTreeWalk(bucket, prefix, marker string, recursive bool, isLeaf func(string, string) bool, endWalkCh chan struct{}) chan treeWalkResult { // Example 1 // If prefix is "one/two/three/" and marker is "one/two/three/four/five.txt" // treeWalk is called with prefixDir="one/two/three/" and marker="four/five.txt" @@ -179,7 +171,7 @@ func (xl xlObjects) startTreeWalk(bucket, prefix, marker string, recursive bool, // treeWalk is called with prefixDir="one/two/" and marker="three/four/five.txt" // and entryPrefixMatch="th" - treeWalkCh := make(chan treeWalker, maxObjectList) + resultCh := make(chan treeWalkResult, maxObjectList) entryPrefixMatch := prefix prefixDir := "" lastIndex := strings.LastIndex(prefix, slashSeparator) @@ -190,8 +182,8 @@ func (xl xlObjects) startTreeWalk(bucket, prefix, marker string, recursive bool, marker = strings.TrimPrefix(marker, prefixDir) go func() { isEnd := true // Indication to start walking the tree with end as true. - xl.treeWalk(bucket, prefixDir, entryPrefixMatch, marker, recursive, isLeaf, treeWalkCh, doneCh, isEnd) - close(treeWalkCh) + xl.doTreeWalk(bucket, prefixDir, entryPrefixMatch, marker, recursive, isLeaf, resultCh, endWalkCh, isEnd) + close(resultCh) }() - return treeWalkCh + return resultCh } diff --git a/xl-v1-list-objects.go b/xl-v1-list-objects.go index 5708490f2..48661df87 100644 --- a/xl-v1-list-objects.go +++ b/xl-v1-list-objects.go @@ -26,17 +26,17 @@ func (xl xlObjects) listObjects(bucket, prefix, marker, delimiter string, maxKey recursive = false } - walkerCh, walkerDoneCh := xl.listPool.Release(listParams{bucket, recursive, marker, prefix}) - if walkerCh == nil { - walkerDoneCh = make(chan struct{}) - walkerCh = xl.startTreeWalk(bucket, prefix, marker, recursive, xl.isObject, walkerDoneCh) + walkResultCh, endWalkCh := xl.listPool.Release(listParams{bucket, recursive, marker, prefix}) + if walkResultCh == nil { + endWalkCh = make(chan struct{}) + walkResultCh = xl.startTreeWalk(bucket, prefix, marker, recursive, xl.isObject, endWalkCh) } var objInfos []ObjectInfo var eof bool var nextMarker string for i := 0; i < maxKeys; { - walkResult, ok := <-walkerCh + walkResult, ok := <-walkResultCh if !ok { // Closed channel. eof = true @@ -76,7 +76,7 @@ func (xl xlObjects) listObjects(bucket, prefix, marker, delimiter string, maxKey params := listParams{bucket, recursive, nextMarker, prefix} if !eof { - xl.listPool.Set(params, walkerCh, walkerDoneCh) + xl.listPool.Set(params, walkResultCh, endWalkCh) } result := ListObjectsInfo{IsTruncated: !eof} diff --git a/xl-v1.go b/xl-v1.go index 740ee0b70..e53196ea6 100644 --- a/xl-v1.go +++ b/xl-v1.go @@ -44,7 +44,7 @@ type xlObjects struct { writeQuorum int // writeQuorum minimum required disks to write data. // List pool management. - listPool *treeWalkerPool + listPool *treeWalkPool } // errXLMaxDisks - returned for reached maximum of disks. @@ -161,7 +161,7 @@ func newXLObjects(disks []string) (ObjectLayer, error) { storageDisks: newPosixDisks, dataBlocks: dataBlocks, parityBlocks: parityBlocks, - listPool: newTreeWalkerPool(globalLookupTimeout), + listPool: newTreeWalkPool(globalLookupTimeout), } // Figure out read and write quorum based on number of storage disks.