@ -25,7 +25,9 @@ import (
// Global lookup timeout.
// Global lookup timeout.
const (
const (
globalLookupTimeout = time . Minute * 30 // 30minutes.
globalLookupTimeout = time . Minute * 30 // 30minutes.
treeWalkEntryLimit = 50
treeWalkSameEntryLimit = 4
)
)
// listParams - list object params used for list object map
// listParams - list object params used for list object map
@ -44,6 +46,7 @@ var errWalkAbort = errors.New("treeWalk abort")
// treeWalk - represents the go routine that does the file tree walk.
// treeWalk - represents the go routine that does the file tree walk.
type treeWalk struct {
type treeWalk struct {
added time . Time
resultCh chan TreeWalkResult
resultCh chan TreeWalkResult
endWalkCh chan struct { } // To signal when treeWalk go-routine should end.
endWalkCh chan struct { } // To signal when treeWalk go-routine should end.
endTimerCh chan <- struct { } // To signal when timer go-routine should end.
endTimerCh chan <- struct { } // To signal when timer go-routine should end.
@ -72,7 +75,7 @@ func NewTreeWalkPool(timeout time.Duration) *TreeWalkPool {
// Release - selects a treeWalk from the pool based on the input
// Release - selects a treeWalk from the pool based on the input
// listParams, removes it from the pool, and returns the TreeWalkResult
// listParams, removes it from the pool, and returns the TreeWalkResult
// channel.
// channel.
// Returns nil if listParams does not have an ascc ociated treeWalk.
// Returns nil if listParams does not have an ass ociated treeWalk.
func ( t * TreeWalkPool ) Release ( params listParams ) ( resultCh chan TreeWalkResult , endWalkCh chan struct { } ) {
func ( t * TreeWalkPool ) Release ( params listParams ) ( resultCh chan TreeWalkResult , endWalkCh chan struct { } ) {
t . Lock ( )
t . Lock ( )
defer t . Unlock ( )
defer t . Unlock ( )
@ -81,6 +84,7 @@ func (t *TreeWalkPool) Release(params listParams) (resultCh chan TreeWalkResult,
if len ( walks ) > 0 {
if len ( walks ) > 0 {
// Pop out the first valid walk entry.
// Pop out the first valid walk entry.
walk := walks [ 0 ]
walk := walks [ 0 ]
walks [ 0 ] = treeWalk { } // clear references.
walks = walks [ 1 : ]
walks = walks [ 1 : ]
if len ( walks ) > 0 {
if len ( walks ) > 0 {
t . pool [ params ] = walks
t . pool [ params ] = walks
@ -100,22 +104,59 @@ func (t *TreeWalkPool) Release(params listParams) (resultCh chan TreeWalkResult,
// 1) time.After() expires after t.timeOut seconds.
// 1) time.After() expires after t.timeOut seconds.
// The expiration is needed so that the treeWalk go-routine resources are freed after a timeout
// The expiration is needed so that the treeWalk go-routine resources are freed after a timeout
// if the S3 client does only partial listing of objects.
// if the S3 client does only partial listing of objects.
// 2) Relase() signals the timer go-routine to end on endTimerCh.
// 2) Rele ase() signals the timer go-routine to end on endTimerCh.
// During listing the timer should not timeout and end the treeWalk go-routine, hence the
// During listing the timer should not timeout and end the treeWalk go-routine, hence the
// timer go-routine should be ended.
// timer go-routine should be ended.
func ( t * TreeWalkPool ) Set ( params listParams , resultCh chan TreeWalkResult , endWalkCh chan struct { } ) {
func ( t * TreeWalkPool ) Set ( params listParams , resultCh chan TreeWalkResult , endWalkCh chan struct { } ) {
t . Lock ( )
t . Lock ( )
defer t . Unlock ( )
defer t . Unlock ( )
// If we are above the limit delete at least one entry from the pool.
if len ( t . pool ) > treeWalkEntryLimit {
age := time . Now ( )
var oldest listParams
for k , v := range t . pool {
if len ( v ) == 0 {
delete ( t . pool , k )
continue
}
// The first element is the oldest, so we only check that.
if v [ 0 ] . added . Before ( age ) {
oldest = k
}
}
// Invalidate and delete oldest.
if walks , ok := t . pool [ oldest ] ; ok {
walk := walks [ 0 ]
walks [ 0 ] = treeWalk { } // clear references.
walks = walks [ 1 : ]
if len ( walks ) > 0 {
t . pool [ params ] = walks
} else {
delete ( t . pool , params )
}
walk . endTimerCh <- struct { } { }
}
}
// Should be a buffered channel so that Release() never blocks.
// Should be a buffered channel so that Release() never blocks.
endTimerCh := make ( chan struct { } , 1 )
endTimerCh := make ( chan struct { } , 1 )
walkInfo := treeWalk {
walkInfo := treeWalk {
added : UTCNow ( ) ,
resultCh : resultCh ,
resultCh : resultCh ,
endWalkCh : endWalkCh ,
endWalkCh : endWalkCh ,
endTimerCh : endTimerCh ,
endTimerCh : endTimerCh ,
}
}
// Append new walk info.
// Append new walk info.
t . pool [ params ] = append ( t . pool [ params ] , walkInfo )
walks := t . pool [ params ]
if len ( walks ) < treeWalkSameEntryLimit {
t . pool [ params ] = append ( walks , walkInfo )
} else {
// We are at limit, invalidate oldest, move list down and add new as last.
walks [ 0 ] . endTimerCh <- struct { } { }
copy ( walks , walks [ 1 : ] )
walks [ len ( walks ) - 1 ] = walkInfo
}
// Timer go-routine which times out after t.timeOut seconds.
// Timer go-routine which times out after t.timeOut seconds.
go func ( endTimerCh <- chan struct { } ) {
go func ( endTimerCh <- chan struct { } ) {