From 002c5bf7dd5fd74d7a782d67e1631208dd5d308d Mon Sep 17 00:00:00 2001 From: Krishna Srinivas Date: Sat, 4 Jun 2016 00:03:50 +0530 Subject: [PATCH] XL: Treewalk handle all the race conditions and blocking channels. --- tree-walk-pool.go | 52 +++++++++++++++++++++++++------------- tree-walk-xl.go | 63 ++++++++++++++++++++++------------------------- 2 files changed, 65 insertions(+), 50 deletions(-) diff --git a/tree-walk-pool.go b/tree-walk-pool.go index 11e957126..461a17658 100644 --- a/tree-walk-pool.go +++ b/tree-walk-pool.go @@ -17,6 +17,7 @@ package main import ( + "errors" "sync" "time" ) @@ -26,6 +27,9 @@ const ( globalLookupTimeout = time.Minute * 30 // 30minutes. ) +// errWalkAbort - returned by the treeWalker routine, it signals the end of treeWalk. +var errWalkAbort = errors.New("treeWalk abort") + // treeWalkerPoolInfo - tree walker pool info carries temporary walker // channel stored until timeout is called. type treeWalkerPoolInfo struct { @@ -62,17 +66,19 @@ func newTreeWalkerPool(timeout time.Duration) *treeWalkerPool { func (t treeWalkerPool) Release(params listParams) (treeWalkerCh chan treeWalker, treeWalkerDoneCh chan struct{}) { t.lock.Lock() defer t.lock.Unlock() - treeWalk, ok := t.pool[params] + walks, ok := t.pool[params] // Pick the valid walks. if ok { - if len(treeWalk) > 0 { - treeWalker := treeWalk[0] - if len(treeWalk[1:]) > 0 { - t.pool[params] = treeWalk[1:] + if len(walks) > 0 { + // Pop out the first valid walk entry. + walk := walks[0] + walks = walks[1:] + if len(walks) > 0 { + t.pool[params] = walks } else { delete(t.pool, params) } - treeWalker.doneCh <- struct{}{} - return treeWalker.treeWalkerCh, treeWalker.treeWalkerDoneCh + walk.doneCh <- struct{}{} + return walk.treeWalkerCh, walk.treeWalkerDoneCh } } // Release return nil if params not found. @@ -88,13 +94,15 @@ func (t treeWalkerPool) Set(params listParams, treeWalkerCh chan treeWalker, tre t.lock.Lock() defer t.lock.Unlock() - var treeWalkerIdx = len(t.pool[params]) - var doneCh = make(chan struct{}) - t.pool[params] = append(t.pool[params], treeWalkerPoolInfo{ + // Should be a buffered channel so that Release() never blocks. + var doneCh = make(chan struct{}, 1) + walkInfo := treeWalkerPoolInfo{ treeWalkerCh: treeWalkerCh, treeWalkerDoneCh: treeWalkerDoneCh, doneCh: doneCh, - }) + } + // Append new walk info. + t.pool[params] = append(t.pool[params], walkInfo) // Safe expiry of treeWalkerCh after timeout. go func(doneCh <-chan struct{}) { @@ -102,13 +110,23 @@ func (t treeWalkerPool) Set(params listParams, treeWalkerCh chan treeWalker, tre // Wait until timeOut case <-time.After(t.timeOut): t.lock.Lock() - treeWalk := t.pool[params] - treeWalk = append(treeWalk[:treeWalkerIdx], treeWalk[treeWalkerIdx+1:]...) - if len(treeWalk) == 0 { - delete(t.pool, params) - } else { - t.pool[params] = treeWalk + walks, ok := t.pool[params] // Look for valid walks. + if ok { + // Look for walkInfo, remove it from the walks list. + for i, walk := range walks { + if walk == walkInfo { + walks = append(walks[:i], walks[i+1:]...) + } + } + // Walks is empty we have no more pending requests. + // Remove map entry. + if len(walks) == 0 { + delete(t.pool, params) + } else { // Save the updated walks. + t.pool[params] = walks + } } + // Close tree walker for the backing go-routine to die. close(treeWalkerDoneCh) t.lock.Unlock() case <-doneCh: diff --git a/tree-walk-xl.go b/tree-walk-xl.go index d983353bb..df4da9ade 100644 --- a/tree-walk-xl.go +++ b/tree-walk-xl.go @@ -36,7 +36,13 @@ type treeWalker struct { end bool } -// listDir - listDir. +// listDir - lists all the entries at a given prefix, takes additional params as filter and leaf detection. +// filter is required to filter out the listed entries usually this function is supposed to return +// true or false. +// isLeaf is required to differentiate between directories and objects, this is a special requirement for XL +// backend since objects are kept as directories, the only way to know if a directory is truly an object +// we validate if 'xl.json' exists at the leaf. isLeaf replies true/false based on the outcome of a Stat +// operation. func (xl xlObjects) listDir(bucket, prefixDir string, filter func(entry string) bool, isLeaf func(string, string) bool) (entries []string, err error) { for _, disk := range xl.getLoadBalancedQuorumDisks() { if disk == nil { @@ -74,7 +80,7 @@ func (xl xlObjects) listDir(bucket, prefixDir string, filter func(entry string) } // treeWalk walks directory tree recursively pushing fileInfo into the channel as and when it encounters files. -func (xl xlObjects) treeWalk(bucket, prefixDir, entryPrefixMatch, marker string, recursive bool, isLeaf func(string, string) bool, treeWalkCh chan treeWalker, doneCh chan struct{}, stackDepth int, isEnd bool) { +func (xl xlObjects) treeWalk(bucket, prefixDir, entryPrefixMatch, marker string, recursive bool, isLeaf func(string, string) bool, treeWalkCh chan treeWalker, doneCh chan struct{}, isEnd bool) error { // Example: // if prefixDir="one/two/three/" and marker="four/five.txt" treeWalk is recursively // called with prefixDir="one/two/three/four/" and marker="five.txt" @@ -95,18 +101,14 @@ func (xl xlObjects) treeWalk(bucket, prefixDir, entryPrefixMatch, marker string, if err != nil { select { case <-doneCh: - if stackDepth == 0 { - close(treeWalkCh) - } + return errWalkAbort case treeWalkCh <- treeWalker{err: err}: + return err } - return } + // For an empty list return right here. if len(entries) == 0 { - if stackDepth == 0 { - close(treeWalkCh) - } - return + return nil } // example: @@ -116,11 +118,9 @@ func (xl xlObjects) treeWalk(bucket, prefixDir, entryPrefixMatch, marker string, return entries[i] >= markerDir }) entries = entries[idx:] + // For an empty list after search through the entries, return right here. if len(entries) == 0 { - if stackDepth == 0 { - close(treeWalkCh) - } - return + return nil } for i, entry := range entries { if i == 0 && markerDir == entry { @@ -146,32 +146,25 @@ func (xl xlObjects) treeWalk(bucket, prefixDir, entryPrefixMatch, marker string, markerArg = markerBase } prefixMatch := "" // Valid only for first level treeWalk and empty for subdirectories. - if i == len(entries)-1 && stackDepth == 0 { - isEnd = true + // markIsEnd is passed to this entry's treeWalk() so that treeWalker.end can be marked + // true at the end of the treeWalk stream. + markIsEnd := i == len(entries)-1 && isEnd + if tErr := xl.treeWalk(bucket, pathJoin(prefixDir, entry), prefixMatch, markerArg, recursive, isLeaf, treeWalkCh, doneCh, markIsEnd); tErr != nil { + return tErr } - stackDepth++ - xl.treeWalk(bucket, pathJoin(prefixDir, entry), prefixMatch, markerArg, recursive, isLeaf, treeWalkCh, doneCh, stackDepth, isEnd) - stackDepth-- continue } - var isEOF bool - if stackDepth == 0 && i == len(entries)-1 { - isEOF = true - } else if i == len(entries)-1 && isEnd { - isEOF = true - } + // EOF is set if we are at last entry and the caller indicated we at the end. + isEOF := ((i == len(entries)-1) && isEnd) select { case <-doneCh: - if stackDepth == 0 { - close(treeWalkCh) - return - } + return errWalkAbort case treeWalkCh <- treeWalker{entry: pathJoin(prefixDir, entry), end: isEOF}: } } - if stackDepth == 0 { - close(treeWalkCh) - } + + // Everything is listed. + return nil } // Initiate a new treeWalk in a goroutine. @@ -195,6 +188,10 @@ func (xl xlObjects) startTreeWalk(bucket, prefix, marker string, recursive bool, prefixDir = prefix[:lastIndex+1] } marker = strings.TrimPrefix(marker, prefixDir) - go xl.treeWalk(bucket, prefixDir, entryPrefixMatch, marker, recursive, isLeaf, treeWalkCh, doneCh, 0, false) + go func() { + isEnd := true // Indication to start walking the tree with end as true. + xl.treeWalk(bucket, prefixDir, entryPrefixMatch, marker, recursive, isLeaf, treeWalkCh, doneCh, isEnd) + close(treeWalkCh) + }() return treeWalkCh }