You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
573 lines
15 KiB
573 lines
15 KiB
/*
|
|
* MinIO Cloud Storage, (C) 2019 MinIO, Inc.
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*/
|
|
|
|
package cmd
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/hex"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"io/ioutil"
|
|
"log"
|
|
"net/http"
|
|
"os"
|
|
"path"
|
|
"reflect"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/djherbis/atime"
|
|
"github.com/minio/minio/cmd/logger"
|
|
"github.com/minio/minio/pkg/disk"
|
|
"github.com/ncw/directio"
|
|
)
|
|
|
|
const (
|
|
// cache.json object metadata for cached objects.
|
|
cacheMetaJSONFile = "cache.json"
|
|
cacheDataFile = "part.1"
|
|
cacheMetaVersion = "1.0.0"
|
|
|
|
cacheEnvDelimiter = ";"
|
|
)
|
|
|
|
// CacheChecksumInfoV1 - carries checksums of individual blocks on disk.
|
|
type CacheChecksumInfoV1 struct {
|
|
Algorithm string `json:"algorithm"`
|
|
Blocksize int64 `json:"blocksize"`
|
|
}
|
|
|
|
// Represents the cache metadata struct
|
|
type cacheMeta struct {
|
|
Version string `json:"version"`
|
|
Stat statInfo `json:"stat"` // Stat of the current object `cache.json`.
|
|
|
|
// checksums of blocks on disk.
|
|
Checksum CacheChecksumInfoV1 `json:"checksum,omitempty"`
|
|
// Metadata map for current object.
|
|
Meta map[string]string `json:"meta,omitempty"`
|
|
}
|
|
|
|
func (m *cacheMeta) ToObjectInfo(bucket, object string) (o ObjectInfo) {
|
|
if len(m.Meta) == 0 {
|
|
m.Meta = make(map[string]string)
|
|
m.Stat.ModTime = timeSentinel
|
|
}
|
|
|
|
o = ObjectInfo{
|
|
Bucket: bucket,
|
|
Name: object,
|
|
}
|
|
|
|
// We set file info only if its valid.
|
|
o.ModTime = m.Stat.ModTime
|
|
o.Size = m.Stat.Size
|
|
o.ETag = extractETag(m.Meta)
|
|
o.ContentType = m.Meta["content-type"]
|
|
o.ContentEncoding = m.Meta["content-encoding"]
|
|
if storageClass, ok := m.Meta[amzStorageClass]; ok {
|
|
o.StorageClass = storageClass
|
|
} else {
|
|
o.StorageClass = globalMinioDefaultStorageClass
|
|
}
|
|
var (
|
|
t time.Time
|
|
e error
|
|
)
|
|
if exp, ok := m.Meta["expires"]; ok {
|
|
if t, e = time.Parse(http.TimeFormat, exp); e == nil {
|
|
o.Expires = t.UTC()
|
|
}
|
|
}
|
|
// etag/md5Sum has already been extracted. We need to
|
|
// remove to avoid it from appearing as part of user-defined metadata
|
|
o.UserDefined = cleanMetadata(m.Meta)
|
|
return o
|
|
}
|
|
|
|
// represents disk cache struct
|
|
type diskCache struct {
|
|
dir string // caching directory
|
|
maxDiskUsagePct int // max usage in %
|
|
expiry int // cache expiry in days
|
|
// mark false if drive is offline
|
|
online bool
|
|
// mutex to protect updates to online variable
|
|
onlineMutex *sync.RWMutex
|
|
// purge() listens on this channel to start the cache-purge process
|
|
purgeChan chan struct{}
|
|
pool sync.Pool
|
|
}
|
|
|
|
// Inits the disk cache dir if it is not initialized already.
|
|
func newdiskCache(dir string, expiry int, maxDiskUsagePct int) (*diskCache, error) {
|
|
if err := os.MkdirAll(dir, 0777); err != nil {
|
|
return nil, fmt.Errorf("Unable to initialize '%s' dir, %s", dir, err)
|
|
}
|
|
|
|
if expiry == 0 {
|
|
expiry = globalCacheExpiry
|
|
}
|
|
cache := diskCache{
|
|
dir: dir,
|
|
expiry: expiry,
|
|
maxDiskUsagePct: maxDiskUsagePct,
|
|
purgeChan: make(chan struct{}),
|
|
online: true,
|
|
onlineMutex: &sync.RWMutex{},
|
|
pool: sync.Pool{
|
|
New: func() interface{} {
|
|
b := directio.AlignedBlock(int(cacheBlkSize))
|
|
return &b
|
|
},
|
|
},
|
|
}
|
|
return &cache, nil
|
|
}
|
|
|
|
// Returns if the disk usage is low.
|
|
// Disk usage is low if usage is < 80% of cacheMaxDiskUsagePct
|
|
// Ex. for a 100GB disk, if maxUsage is configured as 70% then cacheMaxDiskUsagePct is 70G
|
|
// hence disk usage is low if the disk usage is less than 56G (because 80% of 70G is 56G)
|
|
func (c *diskCache) diskUsageLow() bool {
|
|
minUsage := c.maxDiskUsagePct * 80 / 100
|
|
di, err := disk.GetInfo(c.dir)
|
|
if err != nil {
|
|
reqInfo := (&logger.ReqInfo{}).AppendTags("cachePath", c.dir)
|
|
ctx := logger.SetReqInfo(context.Background(), reqInfo)
|
|
logger.LogIf(ctx, err)
|
|
return false
|
|
}
|
|
usedPercent := (di.Total - di.Free) * 100 / di.Total
|
|
return int(usedPercent) < minUsage
|
|
}
|
|
|
|
// Return if the disk usage is high.
|
|
// Disk usage is high if disk used is > cacheMaxDiskUsagePct
|
|
func (c *diskCache) diskUsageHigh() bool {
|
|
di, err := disk.GetInfo(c.dir)
|
|
if err != nil {
|
|
reqInfo := (&logger.ReqInfo{}).AppendTags("cachePath", c.dir)
|
|
ctx := logger.SetReqInfo(context.Background(), reqInfo)
|
|
logger.LogIf(ctx, err)
|
|
return true
|
|
}
|
|
usedPercent := (di.Total - di.Free) * 100 / di.Total
|
|
return int(usedPercent) > c.maxDiskUsagePct
|
|
}
|
|
|
|
// Returns if size space can be allocated without exceeding
|
|
// max disk usable for caching
|
|
func (c *diskCache) diskAvailable(size int64) bool {
|
|
di, err := disk.GetInfo(c.dir)
|
|
if err != nil {
|
|
reqInfo := (&logger.ReqInfo{}).AppendTags("cachePath", c.dir)
|
|
ctx := logger.SetReqInfo(context.Background(), reqInfo)
|
|
logger.LogIf(ctx, err)
|
|
return false
|
|
}
|
|
usedPercent := (di.Total - (di.Free - uint64(size))) * 100 / di.Total
|
|
return int(usedPercent) < c.maxDiskUsagePct
|
|
}
|
|
|
|
// Purge cache entries that were not accessed.
|
|
func (c *diskCache) purge() {
|
|
ctx := context.Background()
|
|
for {
|
|
olderThan := c.expiry
|
|
for !c.diskUsageLow() {
|
|
// delete unaccessed objects older than expiry duration
|
|
expiry := UTCNow().AddDate(0, 0, -1*olderThan)
|
|
olderThan /= 2
|
|
if olderThan < 1 {
|
|
break
|
|
}
|
|
deletedCount := 0
|
|
|
|
objDirs, err := ioutil.ReadDir(c.dir)
|
|
if err != nil {
|
|
log.Fatal(err)
|
|
}
|
|
|
|
for _, obj := range objDirs {
|
|
if obj.Name() == minioMetaBucket {
|
|
continue
|
|
}
|
|
// stat entry to get atime
|
|
var fi os.FileInfo
|
|
fi, err := os.Stat(pathJoin(c.dir, obj.Name(), cacheDataFile))
|
|
if err != nil {
|
|
continue
|
|
}
|
|
|
|
objInfo, err := c.statCache(ctx, pathJoin(c.dir, obj.Name()))
|
|
if err != nil {
|
|
// delete any partially filled cache entry left behind.
|
|
removeAll(pathJoin(c.dir, obj.Name()))
|
|
continue
|
|
}
|
|
cc := cacheControlOpts(objInfo)
|
|
if atime.Get(fi).Before(expiry) ||
|
|
cc.isStale(objInfo.ModTime) {
|
|
if err = removeAll(pathJoin(c.dir, obj.Name())); err != nil {
|
|
logger.LogIf(ctx, err)
|
|
}
|
|
deletedCount++
|
|
// break early if sufficient disk space reclaimed.
|
|
if !c.diskUsageLow() {
|
|
break
|
|
}
|
|
}
|
|
}
|
|
if deletedCount == 0 {
|
|
break
|
|
}
|
|
}
|
|
lastRunTime := time.Now()
|
|
for {
|
|
<-c.purgeChan
|
|
timeElapsed := time.Since(lastRunTime)
|
|
if timeElapsed > time.Hour {
|
|
break
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// sets cache drive status
|
|
func (c *diskCache) setOnline(status bool) {
|
|
c.onlineMutex.Lock()
|
|
c.online = status
|
|
c.onlineMutex.Unlock()
|
|
}
|
|
|
|
// returns true if cache drive is online
|
|
func (c *diskCache) IsOnline() bool {
|
|
c.onlineMutex.RLock()
|
|
defer c.onlineMutex.RUnlock()
|
|
return c.online
|
|
}
|
|
|
|
// Stat returns ObjectInfo from disk cache
|
|
func (c *diskCache) Stat(ctx context.Context, bucket, object string) (oi ObjectInfo, err error) {
|
|
cacheObjPath := getCacheSHADir(c.dir, bucket, object)
|
|
oi, err = c.statCache(ctx, cacheObjPath)
|
|
if err != nil {
|
|
return
|
|
}
|
|
oi.Bucket = bucket
|
|
oi.Name = object
|
|
return
|
|
}
|
|
|
|
// statCache is a convenience function for purge() to get ObjectInfo for cached object
|
|
func (c *diskCache) statCache(ctx context.Context, cacheObjPath string) (oi ObjectInfo, e error) {
|
|
// Stat the file to get file size.
|
|
metaPath := path.Join(cacheObjPath, cacheMetaJSONFile)
|
|
f, err := os.Open(metaPath)
|
|
if err != nil {
|
|
return oi, err
|
|
}
|
|
defer f.Close()
|
|
|
|
meta := &cacheMeta{Version: cacheMetaVersion}
|
|
if err := jsonLoad(f, meta); err != nil {
|
|
return oi, err
|
|
}
|
|
fi, err := os.Stat(pathJoin(cacheObjPath, cacheDataFile))
|
|
if err != nil {
|
|
return oi, err
|
|
}
|
|
meta.Stat.ModTime = atime.Get(fi)
|
|
return meta.ToObjectInfo("", ""), nil
|
|
}
|
|
|
|
// saves object metadata to disk cache
|
|
func (c *diskCache) saveMetadata(ctx context.Context, bucket, object string, meta map[string]string, actualSize int64) error {
|
|
fileName := getCacheSHADir(c.dir, bucket, object)
|
|
metaPath := pathJoin(fileName, cacheMetaJSONFile)
|
|
|
|
f, err := os.Create(metaPath)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer f.Close()
|
|
|
|
m := cacheMeta{Meta: meta, Version: cacheMetaVersion}
|
|
m.Stat.Size = actualSize
|
|
m.Stat.ModTime = UTCNow()
|
|
m.Checksum = CacheChecksumInfoV1{Algorithm: HighwayHash256S.String(), Blocksize: cacheBlkSize}
|
|
jsonData, err := json.Marshal(m)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
_, err = f.Write(jsonData)
|
|
return err
|
|
}
|
|
|
|
// Backend metadata could have changed through server side copy - reset cache metadata if that is the case
|
|
func (c *diskCache) updateMetadataIfChanged(ctx context.Context, bucket, object string, bkObjectInfo, cacheObjInfo ObjectInfo) error {
|
|
if !reflect.DeepEqual(bkObjectInfo.UserDefined, cacheObjInfo.UserDefined) ||
|
|
bkObjectInfo.ETag != cacheObjInfo.ETag ||
|
|
bkObjectInfo.ContentType != cacheObjInfo.ContentType ||
|
|
bkObjectInfo.Expires != cacheObjInfo.Expires {
|
|
return c.saveMetadata(ctx, bucket, object, getMetadata(bkObjectInfo), bkObjectInfo.Size)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func getCacheSHADir(dir, bucket, object string) string {
|
|
return path.Join(dir, getSHA256Hash([]byte(path.Join(bucket, object))))
|
|
}
|
|
|
|
// Cache data to disk with bitrot checksum added for each block of 1MB
|
|
func (c *diskCache) bitrotWriteToCache(ctx context.Context, cachePath string, reader io.Reader, size int64) (int64, error) {
|
|
if err := os.MkdirAll(cachePath, 0777); err != nil {
|
|
return 0, err
|
|
}
|
|
bufSize := int64(readSizeV1)
|
|
if size > 0 && bufSize > size {
|
|
bufSize = size
|
|
}
|
|
filePath := path.Join(cachePath, cacheDataFile)
|
|
|
|
if filePath == "" || reader == nil {
|
|
return 0, errInvalidArgument
|
|
}
|
|
|
|
if err := checkPathLength(filePath); err != nil {
|
|
return 0, err
|
|
}
|
|
f, err := os.Create(filePath)
|
|
if err != nil {
|
|
return 0, osErrToFSFileErr(err)
|
|
}
|
|
defer f.Close()
|
|
|
|
var bytesWritten int64
|
|
|
|
h := HighwayHash256S.New()
|
|
|
|
bufp := c.pool.Get().(*[]byte)
|
|
defer c.pool.Put(bufp)
|
|
|
|
for {
|
|
n, err := io.ReadFull(reader, *bufp)
|
|
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF && err != io.ErrClosedPipe {
|
|
return 0, err
|
|
}
|
|
eof := err == io.EOF || err == io.ErrUnexpectedEOF || err == io.ErrClosedPipe
|
|
if n == 0 && size != 0 {
|
|
// Reached EOF, nothing more to be done.
|
|
break
|
|
}
|
|
h.Reset()
|
|
if _, err := h.Write((*bufp)[:n]); err != nil {
|
|
return 0, err
|
|
}
|
|
hashBytes := h.Sum(nil)
|
|
if _, err = f.Write(hashBytes); err != nil {
|
|
return 0, err
|
|
}
|
|
if _, err = f.Write((*bufp)[:n]); err != nil {
|
|
return 0, err
|
|
}
|
|
bytesWritten += int64(n)
|
|
if eof {
|
|
break
|
|
}
|
|
}
|
|
return bytesWritten, nil
|
|
}
|
|
|
|
// Caches the object to disk
|
|
func (c *diskCache) Put(ctx context.Context, bucket, object string, data io.Reader, size int64, opts ObjectOptions) error {
|
|
if c.diskUsageHigh() {
|
|
select {
|
|
case c.purgeChan <- struct{}{}:
|
|
default:
|
|
}
|
|
return errDiskFull
|
|
}
|
|
if !c.diskAvailable(size) {
|
|
return errDiskFull
|
|
}
|
|
cachePath := getCacheSHADir(c.dir, bucket, object)
|
|
if err := os.MkdirAll(cachePath, 0777); err != nil {
|
|
return err
|
|
}
|
|
bufSize := int64(readSizeV1)
|
|
if size > 0 && bufSize > size {
|
|
bufSize = size
|
|
}
|
|
|
|
n, err := c.bitrotWriteToCache(ctx, cachePath, data, size)
|
|
if IsErr(err, baseErrs...) {
|
|
c.setOnline(false)
|
|
}
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return c.saveMetadata(ctx, bucket, object, opts.UserDefined, n)
|
|
}
|
|
|
|
// checks streaming bitrot checksum of cached object before returning data
|
|
func (c *diskCache) bitrotReadFromCache(ctx context.Context, filePath string, offset, length int64, writer io.Writer) error {
|
|
h := HighwayHash256S.New()
|
|
|
|
checksumHash := make([]byte, h.Size())
|
|
|
|
startBlock := offset / cacheBlkSize
|
|
endBlock := (offset + length) / cacheBlkSize
|
|
|
|
// get block start offset
|
|
var blockStartOffset int64
|
|
if startBlock > 0 {
|
|
blockStartOffset = (cacheBlkSize + int64(h.Size())) * startBlock
|
|
}
|
|
|
|
tillLength := (cacheBlkSize + int64(h.Size())) * (endBlock - startBlock + 1)
|
|
|
|
// Start offset cannot be negative.
|
|
if offset < 0 {
|
|
logger.LogIf(ctx, errUnexpected)
|
|
return errUnexpected
|
|
}
|
|
|
|
// Writer cannot be nil.
|
|
if writer == nil {
|
|
logger.LogIf(ctx, errUnexpected)
|
|
return errUnexpected
|
|
}
|
|
var blockOffset, blockLength int64
|
|
rc, err := readCacheFileStream(filePath, blockStartOffset, tillLength)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
bufp := c.pool.Get().(*[]byte)
|
|
defer c.pool.Put(bufp)
|
|
|
|
for block := startBlock; block <= endBlock; block++ {
|
|
switch {
|
|
case startBlock == endBlock:
|
|
blockOffset = offset % cacheBlkSize
|
|
blockLength = length
|
|
case block == startBlock:
|
|
blockOffset = offset % cacheBlkSize
|
|
blockLength = cacheBlkSize - blockOffset
|
|
case block == endBlock:
|
|
blockOffset = 0
|
|
blockLength = (offset + length) % cacheBlkSize
|
|
default:
|
|
blockOffset = 0
|
|
blockLength = cacheBlkSize
|
|
}
|
|
if blockLength == 0 {
|
|
break
|
|
}
|
|
if _, err := io.ReadFull(rc, checksumHash); err != nil {
|
|
return err
|
|
}
|
|
|
|
h.Reset()
|
|
n, err := io.ReadFull(rc, *bufp)
|
|
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
|
|
logger.LogIf(ctx, err)
|
|
return err
|
|
}
|
|
eof := err == io.EOF || err == io.ErrUnexpectedEOF
|
|
if n == 0 && length != 0 {
|
|
// Reached EOF, nothing more to be done.
|
|
break
|
|
}
|
|
|
|
if _, e := h.Write((*bufp)[:n]); e != nil {
|
|
return e
|
|
}
|
|
hashBytes := h.Sum(nil)
|
|
|
|
if !bytes.Equal(hashBytes, checksumHash) {
|
|
err = HashMismatchError{hex.EncodeToString(checksumHash), hex.EncodeToString(hashBytes)}
|
|
logger.LogIf(context.Background(), err)
|
|
return err
|
|
}
|
|
|
|
if _, err := io.Copy(writer, bytes.NewReader((*bufp)[blockOffset:blockOffset+blockLength])); err != nil {
|
|
if err != io.ErrClosedPipe {
|
|
logger.LogIf(ctx, err)
|
|
}
|
|
return err
|
|
}
|
|
if eof {
|
|
break
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Get returns ObjectInfo and reader for object from disk cache
|
|
func (c *diskCache) Get(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, opts ObjectOptions) (gr *GetObjectReader, err error) {
|
|
var objInfo ObjectInfo
|
|
cacheObjPath := getCacheSHADir(c.dir, bucket, object)
|
|
|
|
if objInfo, err = c.statCache(ctx, cacheObjPath); err != nil {
|
|
return nil, toObjectErr(err, bucket, object)
|
|
}
|
|
|
|
var nsUnlocker = func() {}
|
|
// For a directory, we need to send an reader that returns no bytes.
|
|
if hasSuffix(object, SlashSeparator) {
|
|
// The lock taken above is released when
|
|
// objReader.Close() is called by the caller.
|
|
return NewGetObjectReaderFromReader(bytes.NewBuffer(nil), objInfo, opts.CheckCopyPrecondFn, nsUnlocker)
|
|
}
|
|
|
|
fn, off, length, nErr := NewGetObjectReader(rs, objInfo, opts.CheckCopyPrecondFn, nsUnlocker)
|
|
if nErr != nil {
|
|
return nil, nErr
|
|
}
|
|
|
|
filePath := path.Join(cacheObjPath, cacheDataFile)
|
|
pr, pw := io.Pipe()
|
|
go func() {
|
|
pw.CloseWithError(c.bitrotReadFromCache(ctx, filePath, off, length, pw))
|
|
}()
|
|
// Cleanup function to cause the go routine above to exit, in
|
|
// case of incomplete read.
|
|
pipeCloser := func() { pr.Close() }
|
|
|
|
return fn(pr, h, opts.CheckCopyPrecondFn, pipeCloser)
|
|
|
|
}
|
|
|
|
// Deletes the cached object
|
|
func (c *diskCache) Delete(ctx context.Context, bucket, object string) (err error) {
|
|
cachePath := getCacheSHADir(c.dir, bucket, object)
|
|
return removeAll(cachePath)
|
|
|
|
}
|
|
|
|
// convenience function to check if object is cached on this diskCache
|
|
func (c *diskCache) Exists(ctx context.Context, bucket, object string) bool {
|
|
if _, err := os.Stat(getCacheSHADir(c.dir, bucket, object)); err != nil {
|
|
return false
|
|
}
|
|
return true
|
|
}
|
|
|