XL: Treewalk handle all the race conditions and blocking channels.

master
Krishna Srinivas 9 years ago committed by Harshavardhana
parent 1cf1532ca3
commit 002c5bf7dd
  1. 50
      tree-walk-pool.go
  2. 63
      tree-walk-xl.go

@ -17,6 +17,7 @@
package main package main
import ( import (
"errors"
"sync" "sync"
"time" "time"
) )
@ -26,6 +27,9 @@ const (
globalLookupTimeout = time.Minute * 30 // 30minutes. globalLookupTimeout = time.Minute * 30 // 30minutes.
) )
// errWalkAbort - returned by the treeWalker routine, it signals the end of treeWalk.
var errWalkAbort = errors.New("treeWalk abort")
// treeWalkerPoolInfo - tree walker pool info carries temporary walker // treeWalkerPoolInfo - tree walker pool info carries temporary walker
// channel stored until timeout is called. // channel stored until timeout is called.
type treeWalkerPoolInfo struct { type treeWalkerPoolInfo struct {
@ -62,17 +66,19 @@ func newTreeWalkerPool(timeout time.Duration) *treeWalkerPool {
func (t treeWalkerPool) Release(params listParams) (treeWalkerCh chan treeWalker, treeWalkerDoneCh chan struct{}) { func (t treeWalkerPool) Release(params listParams) (treeWalkerCh chan treeWalker, treeWalkerDoneCh chan struct{}) {
t.lock.Lock() t.lock.Lock()
defer t.lock.Unlock() defer t.lock.Unlock()
treeWalk, ok := t.pool[params] walks, ok := t.pool[params] // Pick the valid walks.
if ok { if ok {
if len(treeWalk) > 0 { if len(walks) > 0 {
treeWalker := treeWalk[0] // Pop out the first valid walk entry.
if len(treeWalk[1:]) > 0 { walk := walks[0]
t.pool[params] = treeWalk[1:] walks = walks[1:]
if len(walks) > 0 {
t.pool[params] = walks
} else { } else {
delete(t.pool, params) delete(t.pool, params)
} }
treeWalker.doneCh <- struct{}{} walk.doneCh <- struct{}{}
return treeWalker.treeWalkerCh, treeWalker.treeWalkerDoneCh return walk.treeWalkerCh, walk.treeWalkerDoneCh
} }
} }
// Release return nil if params not found. // Release return nil if params not found.
@ -88,13 +94,15 @@ func (t treeWalkerPool) Set(params listParams, treeWalkerCh chan treeWalker, tre
t.lock.Lock() t.lock.Lock()
defer t.lock.Unlock() defer t.lock.Unlock()
var treeWalkerIdx = len(t.pool[params]) // Should be a buffered channel so that Release() never blocks.
var doneCh = make(chan struct{}) var doneCh = make(chan struct{}, 1)
t.pool[params] = append(t.pool[params], treeWalkerPoolInfo{ walkInfo := treeWalkerPoolInfo{
treeWalkerCh: treeWalkerCh, treeWalkerCh: treeWalkerCh,
treeWalkerDoneCh: treeWalkerDoneCh, treeWalkerDoneCh: treeWalkerDoneCh,
doneCh: doneCh, doneCh: doneCh,
}) }
// Append new walk info.
t.pool[params] = append(t.pool[params], walkInfo)
// Safe expiry of treeWalkerCh after timeout. // Safe expiry of treeWalkerCh after timeout.
go func(doneCh <-chan struct{}) { go func(doneCh <-chan struct{}) {
@ -102,13 +110,23 @@ func (t treeWalkerPool) Set(params listParams, treeWalkerCh chan treeWalker, tre
// Wait until timeOut // Wait until timeOut
case <-time.After(t.timeOut): case <-time.After(t.timeOut):
t.lock.Lock() t.lock.Lock()
treeWalk := t.pool[params] walks, ok := t.pool[params] // Look for valid walks.
treeWalk = append(treeWalk[:treeWalkerIdx], treeWalk[treeWalkerIdx+1:]...) if ok {
if len(treeWalk) == 0 { // Look for walkInfo, remove it from the walks list.
for i, walk := range walks {
if walk == walkInfo {
walks = append(walks[:i], walks[i+1:]...)
}
}
// Walks is empty we have no more pending requests.
// Remove map entry.
if len(walks) == 0 {
delete(t.pool, params) delete(t.pool, params)
} else { } else { // Save the updated walks.
t.pool[params] = treeWalk t.pool[params] = walks
}
} }
// Close tree walker for the backing go-routine to die.
close(treeWalkerDoneCh) close(treeWalkerDoneCh)
t.lock.Unlock() t.lock.Unlock()
case <-doneCh: case <-doneCh:

@ -36,7 +36,13 @@ type treeWalker struct {
end bool end bool
} }
// listDir - listDir. // listDir - lists all the entries at a given prefix, takes additional params as filter and leaf detection.
// filter is required to filter out the listed entries usually this function is supposed to return
// true or false.
// isLeaf is required to differentiate between directories and objects, this is a special requirement for XL
// backend since objects are kept as directories, the only way to know if a directory is truly an object
// we validate if 'xl.json' exists at the leaf. isLeaf replies true/false based on the outcome of a Stat
// operation.
func (xl xlObjects) listDir(bucket, prefixDir string, filter func(entry string) bool, isLeaf func(string, string) bool) (entries []string, err error) { func (xl xlObjects) listDir(bucket, prefixDir string, filter func(entry string) bool, isLeaf func(string, string) bool) (entries []string, err error) {
for _, disk := range xl.getLoadBalancedQuorumDisks() { for _, disk := range xl.getLoadBalancedQuorumDisks() {
if disk == nil { if disk == nil {
@ -74,7 +80,7 @@ func (xl xlObjects) listDir(bucket, prefixDir string, filter func(entry string)
} }
// treeWalk walks directory tree recursively pushing fileInfo into the channel as and when it encounters files. // treeWalk walks directory tree recursively pushing fileInfo into the channel as and when it encounters files.
func (xl xlObjects) treeWalk(bucket, prefixDir, entryPrefixMatch, marker string, recursive bool, isLeaf func(string, string) bool, treeWalkCh chan treeWalker, doneCh chan struct{}, stackDepth int, isEnd bool) { func (xl xlObjects) treeWalk(bucket, prefixDir, entryPrefixMatch, marker string, recursive bool, isLeaf func(string, string) bool, treeWalkCh chan treeWalker, doneCh chan struct{}, isEnd bool) error {
// Example: // Example:
// if prefixDir="one/two/three/" and marker="four/five.txt" treeWalk is recursively // if prefixDir="one/two/three/" and marker="four/five.txt" treeWalk is recursively
// called with prefixDir="one/two/three/four/" and marker="five.txt" // called with prefixDir="one/two/three/four/" and marker="five.txt"
@ -95,18 +101,14 @@ func (xl xlObjects) treeWalk(bucket, prefixDir, entryPrefixMatch, marker string,
if err != nil { if err != nil {
select { select {
case <-doneCh: case <-doneCh:
if stackDepth == 0 { return errWalkAbort
close(treeWalkCh)
}
case treeWalkCh <- treeWalker{err: err}: case treeWalkCh <- treeWalker{err: err}:
return err
} }
return
} }
// For an empty list return right here.
if len(entries) == 0 { if len(entries) == 0 {
if stackDepth == 0 { return nil
close(treeWalkCh)
}
return
} }
// example: // example:
@ -116,11 +118,9 @@ func (xl xlObjects) treeWalk(bucket, prefixDir, entryPrefixMatch, marker string,
return entries[i] >= markerDir return entries[i] >= markerDir
}) })
entries = entries[idx:] entries = entries[idx:]
// For an empty list after search through the entries, return right here.
if len(entries) == 0 { if len(entries) == 0 {
if stackDepth == 0 { return nil
close(treeWalkCh)
}
return
} }
for i, entry := range entries { for i, entry := range entries {
if i == 0 && markerDir == entry { if i == 0 && markerDir == entry {
@ -146,32 +146,25 @@ func (xl xlObjects) treeWalk(bucket, prefixDir, entryPrefixMatch, marker string,
markerArg = markerBase markerArg = markerBase
} }
prefixMatch := "" // Valid only for first level treeWalk and empty for subdirectories. prefixMatch := "" // Valid only for first level treeWalk and empty for subdirectories.
if i == len(entries)-1 && stackDepth == 0 { // markIsEnd is passed to this entry's treeWalk() so that treeWalker.end can be marked
isEnd = true // true at the end of the treeWalk stream.
markIsEnd := i == len(entries)-1 && isEnd
if tErr := xl.treeWalk(bucket, pathJoin(prefixDir, entry), prefixMatch, markerArg, recursive, isLeaf, treeWalkCh, doneCh, markIsEnd); tErr != nil {
return tErr
} }
stackDepth++
xl.treeWalk(bucket, pathJoin(prefixDir, entry), prefixMatch, markerArg, recursive, isLeaf, treeWalkCh, doneCh, stackDepth, isEnd)
stackDepth--
continue continue
} }
var isEOF bool // EOF is set if we are at last entry and the caller indicated we at the end.
if stackDepth == 0 && i == len(entries)-1 { isEOF := ((i == len(entries)-1) && isEnd)
isEOF = true
} else if i == len(entries)-1 && isEnd {
isEOF = true
}
select { select {
case <-doneCh: case <-doneCh:
if stackDepth == 0 { return errWalkAbort
close(treeWalkCh)
return
}
case treeWalkCh <- treeWalker{entry: pathJoin(prefixDir, entry), end: isEOF}: case treeWalkCh <- treeWalker{entry: pathJoin(prefixDir, entry), end: isEOF}:
} }
} }
if stackDepth == 0 {
close(treeWalkCh) // Everything is listed.
} return nil
} }
// Initiate a new treeWalk in a goroutine. // Initiate a new treeWalk in a goroutine.
@ -195,6 +188,10 @@ func (xl xlObjects) startTreeWalk(bucket, prefix, marker string, recursive bool,
prefixDir = prefix[:lastIndex+1] prefixDir = prefix[:lastIndex+1]
} }
marker = strings.TrimPrefix(marker, prefixDir) marker = strings.TrimPrefix(marker, prefixDir)
go xl.treeWalk(bucket, prefixDir, entryPrefixMatch, marker, recursive, isLeaf, treeWalkCh, doneCh, 0, false) go func() {
isEnd := true // Indication to start walking the tree with end as true.
xl.treeWalk(bucket, prefixDir, entryPrefixMatch, marker, recursive, isLeaf, treeWalkCh, doneCh, isEnd)
close(treeWalkCh)
}()
return treeWalkCh return treeWalkCh
} }

Loading…
Cancel
Save