@ -96,6 +96,25 @@ func init() {
gob . Register ( listPathOptions { } )
}
// newMetacache constructs a new metacache from the options.
func ( o listPathOptions ) newMetacache ( ) metacache {
return metacache {
id : o . ID ,
bucket : o . Bucket ,
root : o . BaseDir ,
recursive : o . Recursive ,
status : scanStateStarted ,
error : "" ,
started : UTCNow ( ) ,
lastHandout : UTCNow ( ) ,
lastUpdate : UTCNow ( ) ,
ended : time . Time { } ,
startedCycle : o . CurrentCycle ,
endedCycle : 0 ,
dataVersion : metacacheStreamVersion ,
}
}
// gatherResults will collect all results on the input channel and filter results according to the options.
// Caller should close the channel when done.
// The returned function will return the results once there is enough or input is closed.
@ -230,23 +249,15 @@ func (o *listPathOptions) findFirstPart(fi FileInfo) (int, error) {
}
}
// newMetacache constructs a new metacache from the options.
func ( o listPathOptions ) newMetacache ( ) metacache {
return metacache {
id : o . ID ,
bucket : o . Bucket ,
root : o . BaseDir ,
recursive : o . Recursive ,
status : scanStateStarted ,
error : "" ,
started : UTCNow ( ) ,
lastHandout : UTCNow ( ) ,
lastUpdate : UTCNow ( ) ,
ended : time . Time { } ,
startedCycle : o . CurrentCycle ,
endedCycle : 0 ,
dataVersion : metacacheStreamVersion ,
// updateMetacacheListing will update the metacache listing.
func ( o * listPathOptions ) updateMetacacheListing ( m metacache , rpc * peerRESTClient ) ( metacache , error ) {
if o . Transient {
return localMetacacheMgr . getTransient ( ) . updateCacheEntry ( m )
}
if rpc == nil {
return localMetacacheMgr . getBucket ( GlobalContext , o . Bucket ) . updateCacheEntry ( m )
}
return rpc . UpdateMetacacheListing ( context . Background ( ) , m )
}
func getMetacacheBlockInfo ( fi FileInfo , block int ) ( * metacacheBlock , error ) {
@ -332,6 +343,7 @@ func (r *metacacheReader) filter(o listPathOptions) (entries metaCacheEntriesSor
func ( er * erasureObjects ) streamMetadataParts ( ctx context . Context , o listPathOptions ) ( entries metaCacheEntriesSorted , err error ) {
retries := 0
const debugPrint = false
rpc := globalNotificationSys . restClientFromHash ( o . Bucket )
for {
select {
case <- ctx . Done ( ) :
@ -339,6 +351,7 @@ func (er *erasureObjects) streamMetadataParts(ctx context.Context, o listPathOpt
default :
}
const retryDelay = 500 * time . Millisecond
// Load first part metadata...
// All operations are performed without locks, so we must be careful and allow for failures.
fi , metaArr , onlineDisks , err := er . getObjectFileInfo ( ctx , minioMetaBucket , o . objectPath ( 0 ) , ObjectOptions { } )
@ -346,18 +359,17 @@ func (er *erasureObjects) streamMetadataParts(ctx context.Context, o listPathOpt
if err == errFileNotFound || errors . Is ( err , errErasureReadQuorum ) || errors . Is ( err , InsufficientReadQuorum { } ) {
// Not ready yet...
if retries == 10 {
err := o . checkMetacacheState ( ctx )
err := o . checkMetacacheState ( ctx , rpc )
if debugPrint {
logger . Info ( "waiting for first part (%s), err: %v" , o . objectPath ( 0 ) , err )
}
if err != nil {
return entries , err
}
retries = 0
continue
retries = - 1
}
retries ++
time . Sleep ( 100 * time . Millisecond )
time . Sleep ( retryDelay )
continue
}
if debugPrint {
@ -375,22 +387,22 @@ func (er *erasureObjects) streamMetadataParts(ctx context.Context, o listPathOpt
case nil :
case io . ErrUnexpectedEOF , errErasureReadQuorum , InsufficientReadQuorum { } :
if retries == 10 {
err := o . checkMetacacheState ( ctx )
err := o . checkMetacacheState ( ctx , rpc )
if debugPrint {
logger . Info ( "waiting for metadata, err: %v" , err )
}
if err != nil {
return entries , err
}
retries = 0
continue
retries = - 1
}
retries ++
time . Sleep ( 100 * time . Millisecond )
time . Sleep ( retryDelay )
continue
case io . EOF :
return entries , io . EOF
}
// We got a stream to start at.
loadedPart := 0
var buf bytes . Buffer
@ -407,25 +419,24 @@ func (er *erasureObjects) streamMetadataParts(ctx context.Context, o listPathOpt
switch err {
case errFileNotFound , errErasureReadQuorum , InsufficientReadQuorum { } :
if retries >= 10 {
err := o . checkMetacacheState ( ctx )
err := o . checkMetacacheState ( ctx , rpc )
if debugPrint {
logger . Info ( "waiting for part data (%v), err: %v" , o . objectPath ( partN ) , err )
}
if err != nil {
return entries , err
}
retries = 0
continue
retries = - 1
}
time . Sleep ( 100 * time . Millisecond )
time . Sleep ( retryDelay )
continue
default :
time . Sleep ( 100 * time . Millisecond )
if retries >= 20 {
// We had at least 10 retries without getting a result.
logger . LogIf ( ctx , err )
return entries , err
}
time . Sleep ( retryDelay )
retries ++
continue
case nil :
@ -452,7 +463,7 @@ func (er *erasureObjects) streamMetadataParts(ctx context.Context, o listPathOpt
return entries , err
}
retries ++
time . Sleep ( 100 * time . Millisecond )
time . Sleep ( retryDelay )
continue
default :
logger . LogIf ( ctx , err )
@ -511,36 +522,34 @@ func (er *erasureObjects) listPath(ctx context.Context, o listPathOptions) (entr
return entries , err
}
rpcClient := globalNotificationSys . restClientFromHash ( o . Bucket )
meta := o . newMetacache ( )
rpc := globalNotificationSys . restClientFromHash ( o . Bucket )
var metaMu sync . Mutex
if debugPrint {
console . Println ( "listPath: scanning bucket:" , o . Bucket , "basedir:" , o . BaseDir , "prefix:" , o . Prefix , "marker:" , o . Marker )
}
// Disconnect from call above, but cancel on exit.
ctx , cancel := context . WithCancel ( GlobalContext )
// We need to ask disks.
disks := er . getOnlineDisks ( )
defer func ( ) {
if debugPrint {
console . Println ( "listPath returning:" , entries . len ( ) , "err:" , err )
}
if err != nil {
if err != nil && err != io . EOF {
metaMu . Lock ( )
if meta . status != scanStateError {
meta . error = err . Error ( )
meta . status = scanStateError
}
lm := meta
meta , _ = o . updateMetacacheListing ( meta , rpc )
metaMu . Unlock ( )
if rpcClient == nil {
localMetacacheMgr . getBucket ( GlobalContext , o . Bucket ) . updateCacheEntry ( lm )
} else {
rpcClient . UpdateMetacacheListing ( context . Background ( ) , lm )
}
cancel ( )
}
} ( )
if debugPrint {
console . Println ( "listPath: scanning bucket:" , o . Bucket , "basedir:" , o . BaseDir , "prefix:" , o . Prefix , "marker:" , o . Marker )
}
// Disconnect from call above, but cancel on exit.
ctx , cancel := context . WithCancel ( GlobalContext )
// We need to ask disks.
disks := er . getOnlineDisks ( )
askDisks := o . AskDisks
if askDisks == - 1 {
@ -571,7 +580,7 @@ func (er *erasureObjects) listPath(ctx context.Context, o listPathOptions) (entr
cancel ( )
return entries , err
}
// Send request.
// Send request to each disk .
go func ( ) {
err := d . WalkDir ( ctx , WalkDirOptions { Bucket : o . Bucket , BaseDir : o . BaseDir , Recursive : o . Recursive || o . Separator != SlashSeparator } , w )
w . CloseWithError ( err )
@ -596,6 +605,7 @@ func (er *erasureObjects) listPath(ctx context.Context, o listPathOptions) (entr
defer cancel ( )
// Save continuous updates
go func ( ) {
var err error
ticker := time . NewTicker ( 10 * time . Second )
defer ticker . Stop ( )
var exit bool
@ -607,21 +617,13 @@ func (er *erasureObjects) listPath(ctx context.Context, o listPathOptions) (entr
}
metaMu . Lock ( )
meta . endedCycle = intDataUpdateTracker . current ( )
lm := meta
metaMu . Unlock ( )
var err error
if o . Transient {
lm , err = localMetacacheMgr . getTransient ( ) . updateCacheEntry ( lm )
} else if rpcClient == nil {
lm , err = localMetacacheMgr . getBucket ( GlobalContext , o . Bucket ) . updateCacheEntry ( lm )
} else {
lm , err = rpcClient . UpdateMetacacheListing ( context . Background ( ) , lm )
}
logger . LogIf ( ctx , err )
if lm . status == scanStateError {
meta , err = o . updateMetacacheListing ( meta , rpc )
if meta . status == scanStateError {
cancel ( )
exit = true
}
metaMu . Unlock ( )
logger . LogIf ( ctx , err )
}
} ( )
@ -640,8 +642,10 @@ func (er *erasureObjects) listPath(ctx context.Context, o listPathOptions) (entr
_ , err = er . putObject ( ctx , minioMetaBucket , o . objectPath ( b . n ) , NewPutObjReader ( r , nil , nil ) , ObjectOptions { UserDefined : custom } )
if err != nil {
metaMu . Lock ( )
meta . status = scanStateError
meta . error = err . Error ( )
if meta . error != "" {
meta . status = scanStateError
meta . error = err . Error ( )
}
metaMu . Unlock ( )
cancel ( )
return err
@ -758,18 +762,24 @@ func (er *erasureObjects) listPath(ctx context.Context, o listPathOptions) (entr
}
}
}
closeChannels ( )
// Save success
metaMu . Lock ( )
if meta . error == "" {
if err := bw . Close ( ) ; err != nil {
meta . error = err . Error ( )
meta . status = scanStateError
} else {
meta . status = scanStateSuccess
meta . endedCycle = intDataUpdateTracker . current ( )
}
meta . status = scanStateSuccess
meta . endedCycle = intDataUpdateTracker . current ( )
}
meta , _ = o . updateMetacacheListing ( meta , rpc )
metaMu . Unlock ( )
closeChannels ( )
if err := bw . Close ( ) ; err != nil {
metaMu . Lock ( )
meta . error = err . Error ( )
meta . status = scanStateError
meta , err = o . updateMetacacheListing ( meta , rpc )
metaMu . Unlock ( )
}
} ( )
return filteredResults ( )