@ -45,7 +45,7 @@ import (
const (
const (
dataCrawlSleepPerFolder = time . Millisecond // Time to wait between folders.
dataCrawlSleepPerFolder = time . Millisecond // Time to wait between folders.
dataCrawlStartDelay = 5 * time . Minute // Time to wait on startup and between cycles.
dataCrawlStartDelay = 1 * time . Minute // Time to wait on startup and between cycles.
dataUsageUpdateDirCycles = 16 // Visit all folders every n cycles.
dataUsageUpdateDirCycles = 16 // Visit all folders every n cycles.
healDeleteDangling = true
healDeleteDangling = true
@ -426,8 +426,11 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo
cache . addChildString ( entName )
cache . addChildString ( entName )
this := cachedFolder { name : entName , parent : & thisHash , objectHealProbDiv : folder . objectHealProbDiv }
this := cachedFolder { name : entName , parent : & thisHash , objectHealProbDiv : folder . objectHealProbDiv }
delete ( existing , h . Key ( ) )
delete ( existing , h . Key ( ) ) // h.Key() already accounted for.
cache . addChild ( h )
cache . addChild ( h )
if final {
if final {
if exists {
if exists {
f . existingFolders = append ( f . existingFolders , this )
f . existingFolders = append ( f . existingFolders , this )
@ -454,17 +457,30 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo
lifeCycle : activeLifeCycle ,
lifeCycle : activeLifeCycle ,
heal : thisHash . mod ( f . oldCache . Info . NextCycle , f . healObjectSelect / folder . objectHealProbDiv ) && globalIsErasure ,
heal : thisHash . mod ( f . oldCache . Info . NextCycle , f . healObjectSelect / folder . objectHealProbDiv ) && globalIsErasure ,
}
}
sizeSummary , err := f . getSize ( item )
wait ( )
sizeSummary , err := f . getSize ( item )
if err == errSkipFile {
if err == errSkipFile {
wait ( ) // wait to proceed to next entry.
return nil
return nil
}
}
logger . LogIf ( ctx , err )
// successfully read means we have a valid object.
// Remove filename i.e is the meta file to construct object name
item . transformMetaDir ( )
// Object already accounted for, remove from heal map,
// simply because getSize() function already heals the
// object.
delete ( existing , path . Join ( item . bucket , item . objectPath ( ) ) )
cache . addSizes ( sizeSummary )
cache . addSizes ( sizeSummary )
cache . Objects ++
cache . Objects ++
cache . ObjSizes . add ( sizeSummary . totalSize )
cache . ObjSizes . add ( sizeSummary . totalSize )
wait ( ) // wait to proceed to next entry.
return nil
return nil
} )
} )
if err != nil {
if err != nil {
@ -516,7 +532,7 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo
resolver . bucket = bucket
resolver . bucket = bucket
foundObjs := false
foundObjs := false
dangling := tru e
dangling := fals e
ctx , cancel := context . WithCancel ( ctx )
ctx , cancel := context . WithCancel ( ctx )
err := listPathRaw ( ctx , listPathRawOptions {
err := listPathRaw ( ctx , listPathRawOptions {
disks : f . disks ,
disks : f . disks ,
@ -530,15 +546,16 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo
if f . dataUsageCrawlDebug {
if f . dataUsageCrawlDebug {
logger . Info ( color . Green ( "healObjects:" ) + " got agreement: %v" , entry . name )
logger . Info ( color . Green ( "healObjects:" ) + " got agreement: %v" , entry . name )
}
}
if entry . isObject ( ) {
dangling = false
}
} ,
} ,
// Some disks have data for this.
// Some disks have data for this.
partial : func ( entries metaCacheEntries , nAgreed int , errs [ ] error ) {
partial : func ( entries metaCacheEntries , nAgreed int , errs [ ] error ) {
if f . dataUsageCrawlDebug {
if f . dataUsageCrawlDebug {
logger . Info ( color . Green ( "healObjects:" ) + " got partial, %d agreed, errs: %v" , nAgreed , errs )
logger . Info ( color . Green ( "healObjects:" ) + " got partial, %d agreed, errs: %v" , nAgreed , errs )
}
}
// agreed value less than expected quorum
dangling = nAgreed < resolver . objQuorum || nAgreed < resolver . dirQuorum
// Sleep and reset.
// Sleep and reset.
wait ( )
wait ( )
wait = crawlerSleeper . Timer ( ctx )
wait = crawlerSleeper . Timer ( ctx )
@ -546,8 +563,6 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo
if ! ok {
if ! ok {
for _ , err := range errs {
for _ , err := range errs {
if err != nil {
if err != nil {
// Not all disks are ready, do nothing for now.
dangling = false
return
return
}
}
}
}
@ -559,10 +574,10 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo
if f . dataUsageCrawlDebug {
if f . dataUsageCrawlDebug {
logger . Info ( color . Green ( "healObjects:" ) + " resolved to: %v, dir: %v" , entry . name , entry . isDir ( ) )
logger . Info ( color . Green ( "healObjects:" ) + " resolved to: %v, dir: %v" , entry . name , entry . isDir ( ) )
}
}
if entry . isDir ( ) {
if entry . isDir ( ) {
return
return
}
}
dangling = false
// We got an entry which we should be able to heal.
// We got an entry which we should be able to heal.
fiv , err := entry . fileInfoVersions ( bucket )
fiv , err := entry . fileInfoVersions ( bucket )
if err != nil {
if err != nil {
@ -597,7 +612,6 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo
if f . dataUsageCrawlDebug {
if f . dataUsageCrawlDebug {
logger . Info ( color . Green ( "healObjects:" ) + " too many errors: %v" , errs )
logger . Info ( color . Green ( "healObjects:" ) + " too many errors: %v" , errs )
}
}
dangling = false
cancel ( )
cancel ( )
} ,
} ,
} )
} )
@ -612,8 +626,20 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo
logger . Info ( color . Green ( "healObjects:" ) + " deleting dangling directory %s" , prefix )
logger . Info ( color . Green ( "healObjects:" ) + " deleting dangling directory %s" , prefix )
}
}
// If we have quorum, found directories, but no objects, issue heal to delete the dangling.
objAPI . HealObjects ( ctx , bucket , prefix , madmin . HealOpts {
objAPI . HealObject ( ctx , bucket , prefix , "" , madmin . HealOpts { Recursive : true , Remove : true } )
Recursive : true ,
Remove : true ,
} ,
func ( bucket , object , versionID string ) error {
// Wait for each heal as per crawler frequency.
wait ( )
wait = crawlerSleeper . Timer ( ctx )
return bgSeq . queueHealTask ( healSource {
bucket : bucket ,
object : object ,
versionID : versionID ,
} , madmin . HealItemObject )
} )
}
}
wait ( )
wait ( )
@ -746,6 +772,7 @@ type actionMeta struct {
oi ObjectInfo
oi ObjectInfo
successorModTime time . Time // The modtime of the successor version
successorModTime time . Time // The modtime of the successor version
numVersions int // The number of versions of this object
numVersions int // The number of versions of this object
bitRotScan bool // indicates if bitrot check was requested.
}
}
// applyActions will apply lifecycle checks on to a scanned item.
// applyActions will apply lifecycle checks on to a scanned item.
@ -765,7 +792,11 @@ func (i *crawlItem) applyActions(ctx context.Context, o ObjectLayer, meta action
logger . Info ( color . Green ( "applyActions:" ) + " heal checking: %v/%v" , i . bucket , i . objectPath ( ) )
logger . Info ( color . Green ( "applyActions:" ) + " heal checking: %v/%v" , i . bucket , i . objectPath ( ) )
}
}
}
}
res , err := o . HealObject ( ctx , i . bucket , i . objectPath ( ) , meta . oi . VersionID , madmin . HealOpts { Remove : healDeleteDangling } )
healOpts := madmin . HealOpts { Remove : healDeleteDangling }
if meta . bitRotScan {
healOpts . ScanMode = madmin . HealDeepScan
}
res , err := o . HealObject ( ctx , i . bucket , i . objectPath ( ) , meta . oi . VersionID , healOpts )
if isErrObjectNotFound ( err ) || isErrVersionNotFound ( err ) {
if isErrObjectNotFound ( err ) || isErrVersionNotFound ( err ) {
return 0
return 0
}
}
@ -940,52 +971,52 @@ func (i *crawlItem) objectPath() string {
}
}
// healReplication will heal a scanned item that has failed replication.
// healReplication will heal a scanned item that has failed replication.
func ( i * crawlItem ) healReplication ( ctx context . Context , o ObjectLayer , meta actionMeta , sizeS * sizeSummary ) {
func ( i * crawlItem ) healReplication ( ctx context . Context , o ObjectLayer , oi ObjectInfo , sizeS * sizeSummary ) {
if meta . oi . DeleteMarker || ! meta . oi . VersionPurgeStatus . Empty ( ) {
if oi . DeleteMarker || ! oi . VersionPurgeStatus . Empty ( ) {
// heal delete marker replication failure or versioned delete replication failure
// heal delete marker replication failure or versioned delete replication failure
if meta . oi . ReplicationStatus == replication . Pending ||
if oi . ReplicationStatus == replication . Pending ||
meta . oi . ReplicationStatus == replication . Failed ||
oi . ReplicationStatus == replication . Failed ||
meta . oi . VersionPurgeStatus == Failed || meta . oi . VersionPurgeStatus == Pending {
oi . VersionPurgeStatus == Failed || oi . VersionPurgeStatus == Pending {
i . healReplicationDeletes ( ctx , o , meta )
i . healReplicationDeletes ( ctx , o , oi )
return
return
}
}
}
}
switch meta . oi . ReplicationStatus {
switch oi . ReplicationStatus {
case replication . Pending :
case replication . Pending :
sizeS . pendingSize += meta . oi . Size
sizeS . pendingSize += oi . Size
globalReplicationState . queueReplicaTask ( meta . oi )
globalReplicationState . queueReplicaTask ( oi )
case replication . Failed :
case replication . Failed :
sizeS . failedSize += meta . oi . Size
sizeS . failedSize += oi . Size
globalReplicationState . queueReplicaTask ( meta . oi )
globalReplicationState . queueReplicaTask ( oi )
case replication . Complete :
case replication . Complete :
sizeS . replicatedSize += meta . oi . Size
sizeS . replicatedSize += oi . Size
case replication . Replica :
case replication . Replica :
sizeS . replicaSize += meta . oi . Size
sizeS . replicaSize += oi . Size
}
}
}
}
// healReplicationDeletes will heal a scanned deleted item that failed to replicate deletes.
// healReplicationDeletes will heal a scanned deleted item that failed to replicate deletes.
func ( i * crawlItem ) healReplicationDeletes ( ctx context . Context , o ObjectLayer , meta actionMeta ) {
func ( i * crawlItem ) healReplicationDeletes ( ctx context . Context , o ObjectLayer , oi ObjectInfo ) {
// handle soft delete and permanent delete failures here.
// handle soft delete and permanent delete failures here.
if meta . oi . DeleteMarker || ! meta . oi . VersionPurgeStatus . Empty ( ) {
if oi . DeleteMarker || ! oi . VersionPurgeStatus . Empty ( ) {
versionID := ""
versionID := ""
dmVersionID := ""
dmVersionID := ""
if meta . oi . VersionPurgeStatus . Empty ( ) {
if oi . VersionPurgeStatus . Empty ( ) {
dmVersionID = meta . oi . VersionID
dmVersionID = oi . VersionID
} else {
} else {
versionID = meta . oi . VersionID
versionID = oi . VersionID
}
}
globalReplicationState . queueReplicaDeleteTask ( DeletedObjectVersionInfo {
globalReplicationState . queueReplicaDeleteTask ( DeletedObjectVersionInfo {
DeletedObject : DeletedObject {
DeletedObject : DeletedObject {
ObjectName : meta . oi . Name ,
ObjectName : oi . Name ,
DeleteMarkerVersionID : dmVersionID ,
DeleteMarkerVersionID : dmVersionID ,
VersionID : versionID ,
VersionID : versionID ,
DeleteMarkerReplicationStatus : string ( meta . oi . ReplicationStatus ) ,
DeleteMarkerReplicationStatus : string ( oi . ReplicationStatus ) ,
DeleteMarkerMTime : DeleteMarkerMTime { meta . oi . ModTime } ,
DeleteMarkerMTime : DeleteMarkerMTime { oi . ModTime } ,
DeleteMarker : meta . oi . DeleteMarker ,
DeleteMarker : oi . DeleteMarker ,
VersionPurgeStatus : meta . oi . VersionPurgeStatus ,
VersionPurgeStatus : oi . VersionPurgeStatus ,
} ,
} ,
Bucket : meta . oi . Bucket ,
Bucket : oi . Bucket ,
} )
} )
}
}
}
}