@ -20,6 +20,8 @@ import (
"bytes"
"bytes"
"context"
"context"
"encoding/json"
"encoding/json"
"os"
"path/filepath"
"time"
"time"
jsoniter "github.com/json-iterator/go"
jsoniter "github.com/json-iterator/go"
@ -48,40 +50,36 @@ func runDataUsageInfoUpdateRoutine() {
break
break
}
}
ctx := context . Background ( )
runDataUsageInfo ( context . Background ( ) , objAPI , GlobalServiceDoneCh )
switch v := objAPI . ( type ) {
case * xlZones :
runDataUsageInfoForXLZones ( ctx , v , GlobalServiceDoneCh )
case * FSObjects :
runDataUsageInfoForFS ( ctx , v , GlobalServiceDoneCh )
default :
return
}
}
}
func runDataUsageInfoForFS ( ctx context . Context , fsObj * FSObjects , endCh <- chan struct { } ) {
// timeToNextCrawl returns the duration until next crawl should occur
t := time . NewTicker ( dataUsageCrawlInterval )
// this is validated by verifying the LastUpdate time.
defer t . Stop ( )
func timeToCrawl ( ctx context . Context , objAPI ObjectLayer ) time . Duration {
for {
dataUsageInfo , err := loadDataUsageFromBackend ( ctx , objAPI )
// Get data usage info of the FS Object
if err != nil {
usageInfo := fsObj . crawlAndGetDataUsageInfo ( ctx , endCh )
// Upon an error wait for like 10
// Save the data usage in the disk
// seconds to start the crawler.
err := storeDataUsageInBackend ( ctx , fsObj , usageInfo )
return 10 * time . Second
if err != nil {
}
logger . LogIf ( ctx , err )
// File indeed doesn't exist when LastUpdate is zero
}
// so we have never crawled, start crawl right away.
select {
if dataUsageInfo . LastUpdate . IsZero ( ) {
case <- endCh :
return 1 * time . Second
return
// Wait until the next crawl interval
case <- t . C :
}
}
}
waitDuration := dataUsageInfo . LastUpdate . Sub ( UTCNow ( ) )
if waitDuration > dataUsageCrawlInterval {
// Waited long enough start crawl in a 1 second
return 1 * time . Second
}
// No crawling needed, ask the routine to wait until
// the daily interval 12hrs - delta between last update
// with current time.
return dataUsageCrawlInterval - waitDuration
}
}
func runDataUsageInfoForXLZones ( ctx context . Context , z * xlZones , endCh <- chan struct { } ) {
func runDataUsageInfo ( ctx context . Context , objAPI ObjectLayer , endCh <- chan struct { } ) {
locker := z . NewNSLock ( ctx , minioMetaBucket , "leader-data-usage-info" )
locker := objAPI . NewNSLock ( ctx , minioMetaBucket , "leader-data-usage-info" )
for {
for {
err := locker . GetLock ( newDynamicTimeout ( time . Millisecond , time . Millisecond ) )
err := locker . GetLock ( newDynamicTimeout ( time . Millisecond , time . Millisecond ) )
if err != nil {
if err != nil {
@ -93,19 +91,17 @@ func runDataUsageInfoForXLZones(ctx context.Context, z *xlZones, endCh <-chan st
break
break
}
}
t := time . NewTicker ( dataUsageCrawlInterval )
defer t . Stop ( )
for {
for {
usageInfo := z . crawlAndGetDataUsage ( ctx , endCh )
wait := timeToCrawl ( ctx , objAPI )
err := storeDataUsageInBackend ( ctx , z , usageInfo )
if err != nil {
logger . LogIf ( ctx , err )
}
select {
select {
case <- endCh :
case <- endCh :
locker . Unlock ( )
locker . Unlock ( )
return
return
case <- t . C :
case <- time . NewTimer ( wait ) . C :
// Crawl only when no previous crawl has occurred,
// or its been too long since last crawl.
err := storeDataUsageInBackend ( ctx , objAPI , objAPI . CrawlAndGetDataUsage ( ctx , endCh ) )
logger . LogIf ( ctx , err )
}
}
}
}
}
}
@ -131,7 +127,10 @@ func loadDataUsageFromBackend(ctx context.Context, objAPI ObjectLayer) (DataUsag
err := objAPI . GetObject ( ctx , minioMetaBackgroundOpsBucket , dataUsageObjName , 0 , - 1 , & dataUsageInfoJSON , "" , ObjectOptions { } )
err := objAPI . GetObject ( ctx , minioMetaBackgroundOpsBucket , dataUsageObjName , 0 , - 1 , & dataUsageInfoJSON , "" , ObjectOptions { } )
if err != nil {
if err != nil {
return DataUsageInfo { } , nil
if isErrObjectNotFound ( err ) {
return DataUsageInfo { } , nil
}
return DataUsageInfo { } , toObjectErr ( err , minioMetaBackgroundOpsBucket , dataUsageObjName )
}
}
var dataUsageInfo DataUsageInfo
var dataUsageInfo DataUsageInfo
@ -143,3 +142,85 @@ func loadDataUsageFromBackend(ctx context.Context, objAPI ObjectLayer) (DataUsag
return dataUsageInfo , nil
return dataUsageInfo , nil
}
}
// Item represents each file while walking.
type Item struct {
Path string
Typ os . FileMode
}
type getSizeFn func ( item Item ) ( int64 , error )
type activeIOFn func ( ) error
func updateUsage ( basePath string , endCh <- chan struct { } , waitForLowActiveIO activeIOFn , getSize getSizeFn ) DataUsageInfo {
var dataUsageInfo = DataUsageInfo {
BucketsSizes : make ( map [ string ] uint64 ) ,
ObjectsSizesHistogram : make ( map [ string ] uint64 ) ,
}
itemCh := make ( chan Item )
skipCh := make ( chan error )
defer close ( skipCh )
go func ( ) {
defer close ( itemCh )
fastWalk ( basePath , func ( path string , typ os . FileMode ) error {
if err := waitForLowActiveIO ( ) ; err != nil {
return filepath . SkipDir
}
select {
case <- endCh :
return filepath . SkipDir
case itemCh <- Item { path , typ } :
}
return <- skipCh
} )
} ( )
for {
select {
case <- endCh :
return dataUsageInfo
case item , ok := <- itemCh :
if ! ok {
return dataUsageInfo
}
bucket , entry := path2BucketObjectWithBasePath ( basePath , item . Path )
if bucket == "" {
skipCh <- nil
continue
}
if isReservedOrInvalidBucket ( bucket , false ) {
skipCh <- filepath . SkipDir
continue
}
if entry == "" && item . Typ & os . ModeDir != 0 {
dataUsageInfo . BucketsCount ++
dataUsageInfo . BucketsSizes [ bucket ] = 0
skipCh <- nil
continue
}
if item . Typ & os . ModeDir != 0 {
skipCh <- nil
continue
}
size , err := getSize ( item )
if err != nil {
skipCh <- errSkipFile
continue
}
dataUsageInfo . ObjectsCount ++
dataUsageInfo . ObjectsTotalSize += uint64 ( size )
dataUsageInfo . BucketsSizes [ bucket ] += uint64 ( size )
dataUsageInfo . ObjectsSizesHistogram [ objSizeToHistoInterval ( uint64 ( size ) ) ] ++
skipCh <- nil
}
}
}