@ -1,5 +1,5 @@
/ *
* MinIO Cloud Storage , ( C ) 2019 MinIO , Inc .
* MinIO Cloud Storage , ( C ) 2019 , 2020 MinIO , Inc .
*
* Licensed under the Apache License , Version 2.0 ( the "License" ) ;
* you may not use this file except in compliance with the License .
@ -28,6 +28,7 @@ const (
// mergeWalkVersions - represents the go routine that does the merge walk versions.
type mergeWalkVersions struct {
added time . Time
entryChs [ ] FileInfoVersionsCh
endWalkCh chan struct { } // To signal when mergeWalk go-routine should end.
endTimerCh chan <- struct { } // To signal when timer go-routine should end.
@ -88,17 +89,70 @@ func (t *MergeWalkVersionsPool) Set(params listParams, resultChs []FileInfoVersi
t . Lock ( )
defer t . Unlock ( )
// If we are above the limit delete at least one entry from the pool.
if len ( t . pool ) > treeWalkEntryLimit {
age := time . Now ( )
var oldest listParams
for k , v := range t . pool {
if len ( v ) == 0 {
delete ( t . pool , k )
continue
}
// The first element is the oldest, so we only check that.
e := v [ 0 ]
if e . added . Before ( age ) {
oldest = k
age = e . added
}
}
// Invalidate and delete oldest.
if walks , ok := t . pool [ oldest ] ; ok && len ( walks ) > 0 {
endCh := walks [ 0 ] . endTimerCh
endWalkCh := walks [ 0 ] . endWalkCh
if len ( walks ) > 1 {
// Move walks forward
copy ( walks , walks [ 1 : ] )
walks = walks [ : len ( walks ) - 1 ]
t . pool [ oldest ] = walks
} else {
// Only entry, just delete.
delete ( t . pool , oldest )
}
select {
case endCh <- struct { } { } :
close ( endWalkCh )
default :
}
} else {
// Shouldn't happen, but just in case.
delete ( t . pool , oldest )
}
}
// Should be a buffered channel so that Release() never blocks.
endTimerCh := make ( chan struct { } , 1 )
walkInfo := mergeWalkVersions {
added : UTCNow ( ) ,
entryChs : resultChs ,
endWalkCh : endWalkCh ,
endTimerCh : endTimerCh ,
}
// Append new walk info.
t . pool [ params ] = append ( t . pool [ params ] , walkInfo )
walks := t . pool [ params ]
if len ( walks ) < treeWalkSameEntryLimit {
t . pool [ params ] = append ( walks , walkInfo )
} else {
// We are at limit, invalidate oldest, move list down and add new as last.
select {
case walks [ 0 ] . endTimerCh <- struct { } { } :
close ( walks [ 0 ] . endWalkCh )
default :
}
copy ( walks , walks [ 1 : ] )
walks [ len ( walks ) - 1 ] = walkInfo
}
// Timer go-routine which times out after t.timeOut seconds.
go func ( endTimerCh <- chan struct { } , walkInfo mergeWalkVersions ) {
@ -108,6 +162,7 @@ func (t *MergeWalkVersionsPool) Set(params listParams, resultChs []FileInfoVersi
// Timeout has expired. Remove the mergeWalk from mergeWalkPool and
// end the mergeWalk go-routine.
t . Lock ( )
defer t . Unlock ( )
walks , ok := t . pool [ params ]
if ok {
// Trick of filtering without allocating
@ -131,7 +186,6 @@ func (t *MergeWalkVersionsPool) Set(params listParams, resultChs []FileInfoVersi
}
// Signal the mergeWalk go-routine to die.
close ( endWalkCh )
t . Unlock ( )
case <- endTimerCh :
return
}
@ -166,8 +220,11 @@ func (t *MergeWalkPool) Release(params listParams) ([]FileInfoCh, chan struct{})
t . Lock ( )
defer t . Unlock ( )
walks , ok := t . pool [ params ] // Pick the valid walks.
if ok {
if len ( walks ) > 0 {
if ! ok || len ( walks ) == 0 {
// Release return nil if params not found.
return nil , nil
}
// Pop out the first valid walk entry.
walk := walks [ 0 ]
walks [ 0 ] = mergeWalk { } // clear references.
@ -180,10 +237,6 @@ func (t *MergeWalkPool) Release(params listParams) ([]FileInfoCh, chan struct{})
walk . endTimerCh <- struct { } { }
return walk . entryChs , walk . endWalkCh
}
}
// Release return nil if params not found.
return nil , nil
}
// Set - adds a mergeWalk to the mergeWalkPool.
// Also starts a timer go-routine that ends when:
@ -207,27 +260,38 @@ func (t *MergeWalkPool) Set(params listParams, resultChs []FileInfoCh, endWalkCh
continue
}
// The first element is the oldest, so we only check that.
if v [ 0 ] . added . Before ( age ) {
e := v [ 0 ]
if e . added . Before ( age ) {
oldest = k
age = e . added
}
}
// Invalidate and delete oldest.
if walks , ok := t . pool [ oldest ] ; ok {
walk := walks [ 0 ]
walks [ 0 ] = mergeWalk { } // clear references.
walks = walks [ 1 : ]
if len ( walks ) > 0 {
t . pool [ params ] = walks
if walks , ok := t . pool [ oldest ] ; ok && len ( walks ) > 0 {
endCh := walks [ 0 ] . endTimerCh
endWalkCh := walks [ 0 ] . endWalkCh
if len ( walks ) > 1 {
// Move walks forward
copy ( walks , walks [ 1 : ] )
walks = walks [ : len ( walks ) - 1 ]
t . pool [ oldest ] = walks
} else {
delete ( t . pool , params )
// Only entry, just delete.
delete ( t . pool , oldest )
}
walk . endTimerCh <- struct { } { }
select {
case endCh <- struct { } { } :
close ( endWalkCh )
default :
}
} else {
// Shouldn't happen, but just in case.
delete ( t . pool , oldest )
}
}
// Should be a buffered channel so that Release() never blocks.
endTimerCh := make ( chan struct { } , 1 )
walkInfo := mergeWalk {
added : UTCNow ( ) ,
entryChs : resultChs ,
@ -241,7 +305,11 @@ func (t *MergeWalkPool) Set(params listParams, resultChs []FileInfoCh, endWalkCh
t . pool [ params ] = append ( walks , walkInfo )
} else {
// We are at limit, invalidate oldest, move list down and add new as last.
walks [ 0 ] . endTimerCh <- struct { } { }
select {
case walks [ 0 ] . endTimerCh <- struct { } { } :
close ( walks [ 0 ] . endWalkCh )
default :
}
copy ( walks , walks [ 1 : ] )
walks [ len ( walks ) - 1 ] = walkInfo
}
@ -254,6 +322,7 @@ func (t *MergeWalkPool) Set(params listParams, resultChs []FileInfoCh, endWalkCh
// Timeout has expired. Remove the mergeWalk from mergeWalkPool and
// end the mergeWalk go-routine.
t . Lock ( )
defer t . Unlock ( )
walks , ok := t . pool [ params ]
if ok {
// Trick of filtering without allocating
@ -277,7 +346,6 @@ func (t *MergeWalkPool) Set(params listParams, resultChs []FileInfoCh, endWalkCh
}
// Signal the mergeWalk go-routine to die.
close ( endWalkCh )
t . Unlock ( )
case <- endTimerCh :
return
}