@@ -125,12 +125,14 @@ func (m *cacheMeta) ToObjectInfo(bucket, object string) (o ObjectInfo) {
// represents disk cache struct
type diskCache struct {
    gcCounter uint64 // ref: https://golang.org/pkg/sync/atomic/#pkg-note-BUG
    // is set to 0 if drive is offline
    online uint32
    online uint32 // ref: https://golang.org/pkg/sync/atomic/#pkg-note-BUG
    purgeRunning int32
    dir string // caching directory
    quotaPct int // max usage in %
    triggerGC chan struct{}
    dir string // caching directory
    stats CacheDiskStats // disk cache stats for prometheus
    quotaPct int // max usage in %
    pool sync.Pool
    after int // minimum accesses before an object is cached.
    lowWatermark int
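The new stats field is of type CacheDiskStats, whose definition is outside this diff; a minimal sketch consistent with the atomic.StoreUint64/StoreInt32 calls in the hunks below could look like this (hypothetical, the real type lives elsewhere in the package):

    // Hypothetical sketch only; the real CacheDiskStats is defined elsewhere.
    type CacheDiskStats struct {
        // UsagePercent is the last measured disk usage in percent (accessed atomically).
        UsagePercent uint64
        // UsageState is 0 when usage fell below the low watermark, 1 when it crossed the high watermark (accessed atomically).
        UsageState int32
        // Dir is the cache directory these stats describe.
        Dir string
    }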
@@ -142,12 +144,14 @@ type diskCache struct {
}

// Inits the disk cache dir if it is not initialized already.
func newDiskCache(dir string, quotaPct, after, lowWatermark, highWatermark int) (*diskCache, error) {
func newDiskCache(ctx context.Context, dir string, quotaPct, after, lowWatermark, highWatermark int) (*diskCache, error) {
    if err := os.MkdirAll(dir, 0777); err != nil {
        return nil, fmt.Errorf("Unable to initialize '%s' dir, %w", dir, err)
    }
    cache := diskCache{
        dir: dir,
        triggerGC: make(chan struct{}),
        stats: CacheDiskStats{Dir: dir},
        quotaPct: quotaPct,
        after: after,
        lowWatermark: lowWatermark,
@@ -161,6 +165,8 @@ func newDiskCache(dir string, quotaPct, after, lowWatermark, highWatermark int)
        },
        nsMutex: newNSLock(false),
    }
    go cache.purgeWait(ctx)
    cache.diskUsageHigh() // update if cache usage is already high.
    cache.NewNSLockFn = func(ctx context.Context, cachePath string) RWLocker {
        return cache.nsMutex.NewNSLock(ctx, nil, cachePath, "")
    }
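For context, a hypothetical caller of the revised constructor now supplies a context that is handed to the background purge loop; the path and watermark values below are illustrative only:

    // Illustrative call; "/mnt/cache1", 80% quota, after=3 accesses and 70/90 watermarks are made-up values.
    dc, err := newDiskCache(ctx, "/mnt/cache1", 80, 3, 70, 90)
    if err != nil {
        return nil, err
    }
    _ = dc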
@@ -181,7 +187,12 @@ func (c *diskCache) diskUsageLow() bool {
        return false
    }
    usedPercent := (di.Total - di.Free) * 100 / di.Total
    return int(usedPercent) < gcStopPct
    low := int(usedPercent) < gcStopPct
    atomic.StoreUint64(&c.stats.UsagePercent, usedPercent)
    if low {
        atomic.StoreInt32(&c.stats.UsageState, 0)
    }
    return low
}

// Returns if the disk usage reaches high water mark w.r.t the configured cache quota.
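gcStopPct is computed above the lines shown here; assuming it is derived from the configured quota and low watermark as quotaPct * lowWatermark / 100, the check works out as follows for made-up numbers:

    // Worked example with illustrative values; the gcStopPct derivation is an assumption,
    // only the usedPercent formula is taken from the hunk above.
    quotaPct, lowWatermark := 80, 70
    gcStopPct := quotaPct * lowWatermark / 100   // 56
    var total, free uint64 = 100 << 30, 60 << 30 // 100 GiB drive with 60 GiB free
    usedPercent := (total - free) * 100 / total  // 40
    low := int(usedPercent) < gcStopPct          // true: GC can stop, UsageState is reset to 0
    _ = low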
@@ -196,7 +207,12 @@ func (c *diskCache) diskUsageHigh() bool {
        return false
    }
    usedPercent := (di.Total - di.Free) * 100 / di.Total
    return int(usedPercent) >= gcTriggerPct
    high := int(usedPercent) >= gcTriggerPct
    atomic.StoreUint64(&c.stats.UsagePercent, usedPercent)
    if high {
        atomic.StoreInt32(&c.stats.UsageState, 1)
    }
    return high
}

// Returns if size space can be allocated without exceeding
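These two helpers now publish usage to c.stats from the GC path while a metrics scraper may read it concurrently, hence the sync/atomic stores; a hypothetical reader on the export side would pair them with the matching loads:

    // Hypothetical snapshot helper; the actual Prometheus wiring is not part of this diff.
    func (c *diskCache) usageSnapshot() (pct uint64, state int32) {
        return atomic.LoadUint64(&c.stats.UsagePercent), atomic.LoadInt32(&c.stats.UsageState)
    }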
@@ -230,24 +246,36 @@ var (
    errDoneForNow = errors.New("done for now")
)

func (c *diskCache) purgeWait(ctx context.Context) {
    for {
        select {
        case <-ctx.Done():
        case <-c.triggerGC: // wait here until someone triggers.
            c.purge(ctx)
        }
    }
}
// Purge cache entries that were not accessed.
func (c *diskCache) purge(ctx context.Context) {
    if c.diskUsageLow() {
    if atomic.LoadInt32(&c.purgeRunning) == 1 || c.diskUsageLow() {
        return
    }
    toFree := c.toClear()
    if toFree == 0 {
        return
    }
    atomic.StoreInt32(&c.purgeRunning, 1) // do not run concurrent purge()
    defer atomic.StoreInt32(&c.purgeRunning, 0)
    // expiry for cleaning up old cache.json files that
    // need to be cleaned up.
    expiry := UTCNow().Add(-cacheExpiryDays)
    // defaulting max hits count to 100
    scorer, err := newFileScorer(toFree, time.Now().Unix(), 100)
    if err != nil {
        logger.LogIf(ctx, err)
        return
    }
    // ignore error we know what value we are passing.
    scorer, _ := newFileScorer(toFree, time.Now().Unix(), 100)

    // this function returns FileInfo for cached range files and cache data file.
    fiStatFn := func(ranges map[string]string, dataFile, pathPrefix string) map[string]os.FileInfo {
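As this patch stands, purge is only entered from the single purgeWait goroutine, so the separate Load and Store on purgeRunning are enough to skip a re-trigger while a purge is in flight. If purge could ever be called from several goroutines, a compare-and-swap would close the gap between the check and the set; a sketch of that variant, for illustration only:

    // Illustrative alternative, not part of this patch: claim the flag atomically
    // so that exactly one caller proceeds.
    if !atomic.CompareAndSwapInt32(&c.purgeRunning, 0, 1) {
        return // another purge is already running
    }
    defer atomic.StoreInt32(&c.purgeRunning, 0)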
@@ -326,27 +354,20 @@ func (c *diskCache) purge(ctx context.Context) {
        return
    }
    for _, path := range scorer.fileNames() {
        removeAll(path)
        slashIdx := strings.LastIndex(path, SlashSeparator)
        pathPrefix := path[0:slashIdx]
        fname := path[slashIdx+1:]
        if fname == cacheDataFile {
            removeAll(pathPrefix)
    scorer.purgeFunc(func(qfile queuedFile) {
        fileName := qfile.name
        removeAll(fileName)
        slashIdx := strings.LastIndex(fileName, SlashSeparator)
        if slashIdx >= 0 {
            fileNamePrefix := fileName[0:slashIdx]
            fname := fileName[slashIdx+1:]
            if fname == cacheDataFile {
                removeAll(fileNamePrefix)
            }
        }
    }
}

func (c *diskCache) incGCCounter() {
    atomic.AddUint64(&c.gcCounter, 1)
}

func (c *diskCache) resetGCCounter() {
    atomic.StoreUint64(&c.gcCounter, 0)
}
    })

func (c *diskCache) gcCount() uint64 {
    return atomic.LoadUint64(&c.gcCounter)
    scorer.reset()
}
// sets cache drive status
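The rewritten cleanup in the hunk above guards the slice with slashIdx >= 0: strings.LastIndex returns -1 when the separator is absent, and slicing with that value, as the earlier path[0:slashIdx] form would, panics at runtime. A minimal illustration with a hypothetical name:

    name := "cache.json"                                 // hypothetical entry with no SlashSeparator in it
    slashIdx := strings.LastIndex(name, SlashSeparator)  // -1
    if slashIdx >= 0 {                                   // guard used by this change
        removeAll(name[0:slashIdx])                      // name[0:-1] would panic without the guard
    }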
@@ -630,7 +651,7 @@ func newCacheEncryptMetadata(bucket, object string, metadata map[string]string)
// Caches the object to disk
func (c *diskCache) Put(ctx context.Context, bucket, object string, data io.Reader, size int64, rs *HTTPRangeSpec, opts ObjectOptions, incHitsOnly bool) error {
    if c.diskUsageHigh() {
        c.incGCCounter()
        c.triggerGC <- struct{}{}
        io.Copy(ioutil.Discard, data)
        return errDiskFull
    }
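A send on the unbuffered triggerGC channel blocks this PUT path until purgeWait is ready to receive again, i.e. until any in-flight purge finishes. A non-blocking variant, shown only as a sketch, would drop the trigger when a purge is already running:

    // Illustrative variant, not what this patch does: fire-and-forget trigger.
    select {
    case c.triggerGC <- struct{}{}:
    default: // purge already in progress; skip the extra trigger
    }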