diff --git a/cmd/admin-handlers.go b/cmd/admin-handlers.go index 9269fdc9e..fb827c335 100644 --- a/cmd/admin-handlers.go +++ b/cmd/admin-handlers.go @@ -290,6 +290,31 @@ func (a adminAPIHandlers) StorageInfoHandler(w http.ResponseWriter, r *http.Requ } +// DataUsageInfoHandler - GET /minio/admin/v2/datausageinfo
+// ----------
+// Get server/cluster data usage info
+func (a adminAPIHandlers) DataUsageInfoHandler(w http.ResponseWriter, r *http.Request) {
+	ctx := newContext(r, w, "DataUsageInfo")
+	objectAPI, _ := validateAdminReq(ctx, w, r, iampolicy.ListServerInfoAdminAction)
+	if objectAPI == nil {
+		return
+	}
+
+	dataUsageInfo, err := loadDataUsageFromBackend(ctx, objectAPI)
+	if err != nil {
+		writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrInternalError), r.URL)
+		return
+	}
+
+	dataUsageInfoJSON, err := json.Marshal(dataUsageInfo)
+	if err != nil {
+		writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrInternalError), r.URL)
+		return
+	}
+
+	writeSuccessResponseJSON(w, dataUsageInfoJSON)
+}
+
 // ServerCPULoadInfo holds information about cpu utilization
 // of one minio node. It also reports any errors if encountered
 // while trying to reach this server.
diff --git a/cmd/admin-heal-ops.go b/cmd/admin-heal-ops.go index 74d58be3d..0e1d7dc36 100644 --- a/cmd/admin-heal-ops.go +++ b/cmd/admin-heal-ops.go @@ -587,6 +587,11 @@ func (h *healSequence) healItemsFromSourceCh() error { logger.LogIf(h.ctx, err) } +	// Start healing the background ops prefix.
+	if err := h.healMinioSysMeta(backgroundOpsMetaPrefix)(); err != nil {
+		logger.LogIf(h.ctx, err)
+	}
+
 for path := range h.sourceCh { var itemType madmin.HealItemType @@ -633,6 +638,11 @@ func (h *healSequence) healItems() error { return err } +	// Start healing the background ops prefix.
+	if err := h.healMinioSysMeta(backgroundOpsMetaPrefix)(); err != nil {
+		logger.LogIf(h.ctx, err)
+	}
+
 // Heal buckets and objects return h.healBuckets() } diff --git a/cmd/admin-router.go b/cmd/admin-router.go index 470d4f369..1c3313d13 100644 --- a/cmd/admin-router.go +++ b/cmd/admin-router.go @@ -53,6 +53,8 @@ func registerAdminRouter(router *mux.Router, enableConfigOps, enableIAMOps bool) // StorageInfo operations adminRouter.Methods(http.MethodGet).Path(adminAPIVersionPrefix + "/storageinfo").HandlerFunc(httpTraceAll(adminAPI.StorageInfoHandler)) +	// DataUsageInfo operations
+	adminRouter.Methods(http.MethodGet).Path(adminAPIVersionPrefix + "/datausageinfo").HandlerFunc(httpTraceAll(adminAPI.DataUsageInfoHandler))
 if globalIsDistXL || globalIsXL { /// Heal operations diff --git a/cmd/data-usage.go b/cmd/data-usage.go new file mode 100644 index 000000000..1921afe48 --- /dev/null +++ b/cmd/data-usage.go @@ -0,0 +1,144 @@ +/*
+ * MinIO Cloud Storage, (C) 2019 MinIO, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cmd
+
+import (
+	"bytes"
+	"context"
+	"encoding/json"
+	"time"
+
+	"github.com/minio/minio/cmd/logger"
+	"github.com/minio/minio/pkg/hash"
+)
+
+const (
+	dataUsageObjName       = "data-usage"
+	dataUsageCrawlInterval = 12 * time.Hour
+)
+
+func initDataUsageStats() {
+	go runDataUsageInfoUpdateRoutine()
+}
+
+func runDataUsageInfoUpdateRoutine() {
+	// Wait until the object layer is ready
+	var objAPI ObjectLayer
+	for {
+		objAPI = newObjectLayerWithoutSafeModeFn()
+		if objAPI == nil {
+			time.Sleep(time.Second)
+			continue
+		}
+		break
+	}
+
+	ctx := context.Background()
+
+	switch v := objAPI.(type) {
+	case *xlZones:
+		runDataUsageInfoForXLZones(ctx, v, GlobalServiceDoneCh)
+	case *FSObjects:
+		runDataUsageInfoForFS(ctx, v, GlobalServiceDoneCh)
+	default:
+		return
+	}
+}
+
+func runDataUsageInfoForFS(ctx context.Context, fsObj *FSObjects, endCh <-chan struct{}) {
+	t := time.NewTicker(dataUsageCrawlInterval)
+	defer t.Stop()
+	for {
+		// Get data usage info of the FS Object
+		usageInfo := fsObj.crawlAndGetDataUsageInfo(ctx, endCh)
+		// Save the data usage to the backend
+		err := storeDataUsageInBackend(ctx, fsObj, usageInfo)
+		if err != nil {
+			logger.LogIf(ctx, err)
+		}
+		select {
+		case <-endCh:
+			return
+		// Wait until the next crawl interval
+		case <-t.C:
+		}
+	}
+}
+
+func runDataUsageInfoForXLZones(ctx context.Context, z *xlZones, endCh <-chan struct{}) {
+	locker := z.NewNSLock(ctx, minioMetaBucket, "leader-data-usage-info")
+	for {
+		err := locker.GetLock(newDynamicTimeout(time.Millisecond, time.Millisecond))
+		if err != nil {
+			time.Sleep(5 * time.Minute)
+			continue
+		}
+		// Lock acquired: break out and keep holding it so only this node crawls; it is released on shutdown below.
+		break
+	}
+
+	t := time.NewTicker(dataUsageCrawlInterval)
+	defer t.Stop()
+	for {
+		usageInfo := z.crawlAndGetDataUsage(ctx, endCh)
+		err := storeDataUsageInBackend(ctx, z, usageInfo)
+		if err != nil {
+			logger.LogIf(ctx, err)
+		}
+		select {
+		case <-endCh:
+			locker.Unlock()
+			return
+		case <-t.C:
+		}
+	}
+}
+
+func storeDataUsageInBackend(ctx context.Context, objAPI ObjectLayer, dataUsageInfo DataUsageInfo) error {
+	dataUsageJSON, err := json.Marshal(dataUsageInfo)
+	if err != nil {
+		return err
+	}
+
+	size := int64(len(dataUsageJSON))
+	r, err := hash.NewReader(bytes.NewReader(dataUsageJSON), size, "", "", size, false)
+	if err != nil {
+		return err
+	}
+
+	_, err = objAPI.PutObject(ctx, minioMetaBackgroundOpsBucket, dataUsageObjName, NewPutObjReader(r, nil, nil), ObjectOptions{})
+	return err
+}
+
+func loadDataUsageFromBackend(ctx context.Context, objAPI ObjectLayer) (DataUsageInfo, error) {
+	var dataUsageInfoJSON bytes.Buffer
+
+	err := objAPI.GetObject(ctx, minioMetaBackgroundOpsBucket, dataUsageObjName, 0, -1, &dataUsageInfoJSON, "", ObjectOptions{})
+	if err != nil {
+		// Return empty stats rather than the error: the data usage object
+		// may not exist yet, e.g. before the first crawl has completed.
+		return DataUsageInfo{}, nil
+	}
+
+	var dataUsageInfo DataUsageInfo
+	err = json.Unmarshal(dataUsageInfoJSON.Bytes(), &dataUsageInfo)
+	if err != nil {
+		return DataUsageInfo{}, err
+	}
+
+	return dataUsageInfo, nil
+}
diff --git a/cmd/fastwalk.go b/cmd/fastwalk.go new file mode 100644 index 000000000..8470b4e12 --- /dev/null +++ b/cmd/fastwalk.go @@ -0,0 +1,226 @@ +// Copyright 2016 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// This code is imported from "golang.org/x/tools/internal/fastwalk";
+// only fastwalk.go is imported, with some small tweaks, since we
+// already implement readDir().
+
+package cmd
+
+import (
+	"errors"
+	"os"
+	"path/filepath"
+	"runtime"
+	"strings"
+	"sync"
+)
+
+// ErrTraverseLink is used as a return value from WalkFuncs to indicate that the
+// symlink named in the call may be traversed.
+var ErrTraverseLink = errors.New("fastwalk: traverse symlink, assuming target is a directory")
+
+// ErrSkipFiles is used as a return value from WalkFuncs to indicate that the
+// callback should not be called for any other files in the current directory.
+// Child directories will still be traversed.
+var ErrSkipFiles = errors.New("fastwalk: skip remaining files in directory")
+
+// fastWalk is a faster implementation of filepath.Walk.
+//
+// filepath.Walk's design necessarily calls os.Lstat on each file,
+// even if the caller needs less info.
+// Many tools need only the type of each file.
+// On some platforms, this information is provided directly by the readdir
+// system call, avoiding the need to stat each file individually.
+// fastwalk_unix.go contains a fork of the syscall routines.
+//
+// See golang.org/issue/16399
+//
+// fastWalk walks the file tree rooted at root, calling walkFn for
+// each file or directory in the tree, including root.
+//
+// If walkFn returns filepath.SkipDir, the directory is skipped.
+//
+// Unlike filepath.Walk:
+//   * file stat calls must be done by the user.
+//     The only provided metadata is the file type, which does not include
+//     any permission bits.
+//   * multiple goroutines stat the filesystem concurrently. The provided
+//     walkFn must be safe for concurrent use.
+//   * fastWalk can follow symlinks if walkFn returns the ErrTraverseLink
+//     sentinel error. It is the walkFn's responsibility to prevent
+//     fastWalk from going into symlink cycles.
+func fastWalk(root string, walkFn func(path string, typ os.FileMode) error) error {
+	// TODO(bradfitz): make numWorkers configurable? We used a
+	// minimum of 4 to give the kernel more info about multiple
+	// things we want, in hopes its I/O scheduling can take
+	// advantage of that. Hopefully most are in cache. Maybe 4 is
+	// even too low of a minimum. Profile more.
+	numWorkers := 4
+	if n := runtime.NumCPU(); n > numWorkers {
+		numWorkers = n
+	}
+
+	// Make sure to wait for all workers to finish, otherwise
+	// walkFn could still be called after returning. This Wait call
+	// runs after close(e.donec) below.
+	var wg sync.WaitGroup
+	defer wg.Wait()
+
+	w := &walker{
+		fn:       walkFn,
+		enqueuec: make(chan walkItem, numWorkers), // buffered for performance
+		workc:    make(chan walkItem, numWorkers), // buffered for performance
+		donec:    make(chan struct{}),
+
+		// buffered for correctness & not leaking goroutines:
+		resc: make(chan error, numWorkers),
+	}
+	defer close(w.donec)
+
+	for i := 0; i < numWorkers; i++ {
+		wg.Add(1)
+		go w.doWork(&wg)
+	}
+	todo := []walkItem{{dir: root}}
+	out := 0
+	for {
+		workc := w.workc
+		var workItem walkItem
+		if len(todo) == 0 {
+			workc = nil
+		} else {
+			workItem = todo[len(todo)-1]
+		}
+		select {
+		case workc <- workItem:
+			todo = todo[:len(todo)-1]
+			out++
+		case it := <-w.enqueuec:
+			todo = append(todo, it)
+		case err := <-w.resc:
+			out--
+			if err != nil {
+				return err
+			}
+			if out == 0 && len(todo) == 0 {
+				// It's safe to quit here, as long as the buffered
+				// enqueue channel isn't also readable, which might
+				// happen if the worker sends both another unit of
+				// work and its result before the other select was
+				// scheduled and both w.resc and w.enqueuec were
+				// readable.
+ select { + case it := <-w.enqueuec: + todo = append(todo, it) + default: + return nil + } + } + } + } +} + +// doWork reads directories as instructed (via workc) and runs the +// user's callback function. +func (w *walker) doWork(wg *sync.WaitGroup) { + defer wg.Done() + for { + select { + case <-w.donec: + return + case it := <-w.workc: + select { + case <-w.donec: + return + case w.resc <- w.walk(it.dir, !it.callbackDone): + } + } + } +} + +type walker struct { + fn func(path string, typ os.FileMode) error + + donec chan struct{} // closed on fastWalk's return + workc chan walkItem // to workers + enqueuec chan walkItem // from workers + resc chan error // from workers +} + +type walkItem struct { + dir string + callbackDone bool // callback already called; don't do it again +} + +func (w *walker) enqueue(it walkItem) { + select { + case w.enqueuec <- it: + case <-w.donec: + } +} + +func (w *walker) onDirEnt(dirName, baseName string, typ os.FileMode) error { + joined := dirName + string(os.PathSeparator) + baseName + if typ == os.ModeDir { + w.enqueue(walkItem{dir: joined}) + return nil + } + + err := w.fn(joined, typ) + if typ == os.ModeSymlink { + if err == ErrTraverseLink { + // Set callbackDone so we don't call it twice for both the + // symlink-as-symlink and the symlink-as-directory later: + w.enqueue(walkItem{dir: joined, callbackDone: true}) + return nil + } + if err == filepath.SkipDir { + // Permit SkipDir on symlinks too. + return nil + } + } + return err +} + +func readDirFn(dirName string, fn func(dirName, entName string, typ os.FileMode) error) error { + fis, err := readDir(dirName) + if err != nil { + return err + } + skipFiles := false + for _, fi := range fis { + var mode os.FileMode + if strings.HasSuffix(fi, SlashSeparator) { + mode |= os.ModeDir + } + + if mode == 0 && skipFiles { + continue + } + + if err := fn(dirName, fi, mode); err != nil { + if err == ErrSkipFiles { + skipFiles = true + continue + } + return err + } + } + return nil +} + +func (w *walker) walk(root string, runUserCallback bool) error { + if runUserCallback { + err := w.fn(root, os.ModeDir) + if err == filepath.SkipDir { + return nil + } + if err != nil { + return err + } + } + + return readDirFn(root, w.onDirEnt) +} diff --git a/cmd/format-xl.go b/cmd/format-xl.go index c18aaf0f3..4688092d7 100644 --- a/cmd/format-xl.go +++ b/cmd/format-xl.go @@ -771,6 +771,11 @@ func makeFormatXLMetaVolumes(disk StorageAPI) error { return err } } + if err := disk.MakeVol(minioMetaBackgroundOpsBucket); err != nil { + if !IsErrIgnored(err, initMetaVolIgnoredErrs...) { + return err + } + } if err := disk.MakeVol(minioMetaMultipartBucket); err != nil { if !IsErrIgnored(err, initMetaVolIgnoredErrs...) { return err diff --git a/cmd/fs-v1.go b/cmd/fs-v1.go index 1aebf315c..f8169fbd5 100644 --- a/cmd/fs-v1.go +++ b/cmd/fs-v1.go @@ -19,11 +19,13 @@ package cmd import ( "bytes" "context" + "errors" "io" "io/ioutil" "net/http" "os" "path" + "path/filepath" "sort" "strings" "sync" @@ -50,6 +52,11 @@ type FSObjects struct { // Disk usage metrics totalUsed uint64 // ref: https://golang.org/pkg/sync/atomic/#pkg-note-BUG + // The count of concurrent calls on FSObjects API + activeIOCount int64 + // The active IO count ceiling for crawling to work + maxActiveIOCount int64 + // Path to be exported over S3 API. fsPath string // meta json filename, varies by fs / cache backend. 
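The imported fastwalk.go above reports only each entry's type, never a full os.FileInfo, so callers that need sizes must stat paths themselves, and walkFn must be safe for concurrent use. A minimal usage sketch — countFiles is a hypothetical helper, assumed to sit in the same cmd package as fastwalk.go (it relies on fastWalk, minioMetaBucket, and the stdlib imports os, path/filepath, and sync/atomic):

```go
// countFiles tallies regular files under root with fastWalk.
// Hypothetical helper for illustration only; not part of this change.
func countFiles(root string) (uint64, error) {
	var count uint64 // walkFn runs on several goroutines, so update atomically
	err := fastWalk(root, func(path string, typ os.FileMode) error {
		switch {
		case typ&os.ModeSymlink != 0:
			// Returning ErrTraverseLink here would descend into the target;
			// this sketch simply skips symlinks.
			return nil
		case typ.IsDir():
			if filepath.Base(path) == minioMetaBucket {
				return filepath.SkipDir // prune the reserved .minio.sys tree
			}
			return nil
		default:
			atomic.AddUint64(&count, 1)
			return nil
		}
	})
	return atomic.LoadUint64(&count), err
}
```

This is the same contract the crawl functions below rely on: prune reserved buckets with filepath.SkipDir and stat only the entries whose sizes are actually needed.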
@@ -100,6 +107,11 @@ func initMetaVolumeFS(fsPath, fsUUID string) error { return err } + metaStatsPath := pathJoin(fsPath, minioMetaBackgroundOpsBucket, fsUUID) + if err := os.MkdirAll(metaStatsPath, 0777); err != nil { + return err + } + metaMultipartPath := pathJoin(fsPath, minioMetaMultipartBucket) return os.MkdirAll(metaMultipartPath, 0777) @@ -147,6 +159,8 @@ func NewFSObjectLayer(fsPath string) (ObjectLayer, error) { listPool: NewTreeWalkPool(globalLookupTimeout), appendFileMap: make(map[string]*fsAppendFile), diskMount: mountinfo.IsLikelyMountPoint(fsPath), + + maxActiveIOCount: 10, } // Once the filesystem has initialized hold the read lock for @@ -155,10 +169,6 @@ func NewFSObjectLayer(fsPath string) (ObjectLayer, error) { // or cause changes on backend format. fs.fsFormatRlk = rlk - if !fs.diskMount { - go fs.diskUsage(GlobalServiceDoneCh) - } - go fs.cleanupStaleMultipartUploads(ctx, GlobalMultipartCleanupInterval, GlobalMultipartExpiry, GlobalServiceDoneCh) // Return successfully initialized object layer. @@ -179,77 +189,14 @@ func (fs *FSObjects) Shutdown(ctx context.Context) error { return fsRemoveAll(ctx, pathJoin(fs.fsPath, minioMetaTmpBucket, fs.fsUUID)) } -// diskUsage returns du information for the posix path, in a continuous routine. -func (fs *FSObjects) diskUsage(doneCh chan struct{}) { - usageFn := func(ctx context.Context, entry string) error { - if httpServer := newHTTPServerFn(); httpServer != nil { - // Wait at max 1 minute for an inprogress request - // before proceeding to count the usage. - waitCount := 60 - // Any requests in progress, delay the usage. - for httpServer.GetRequestCount() > 0 && waitCount > 0 { - waitCount-- - time.Sleep(1 * time.Second) - } - } - - select { - case <-doneCh: - return errWalkAbort - default: - fi, err := os.Stat(entry) - if err != nil { - err = osErrToFSFileErr(err) - return err - } - atomic.AddUint64(&fs.totalUsed, uint64(fi.Size())) - } - return nil - } - - // Return this routine upon errWalkAbort, continue for any other error on purpose - // so that we can start the routine freshly in another 12 hours. - if err := getDiskUsage(context.Background(), fs.fsPath, usageFn); err == errWalkAbort { - return - } - - for { - select { - case <-doneCh: - return - case <-time.After(globalUsageCheckInterval): - var usage uint64 - usageFn = func(ctx context.Context, entry string) error { - if httpServer := newHTTPServerFn(); httpServer != nil { - // Wait at max 1 minute for an inprogress request - // before proceeding to count the usage. - waitCount := 60 - // Any requests in progress, delay the usage. - for httpServer.GetRequestCount() > 0 && waitCount > 0 { - waitCount-- - time.Sleep(1 * time.Second) - } - } - - fi, err := os.Stat(entry) - if err != nil { - err = osErrToFSFileErr(err) - return err - } - usage = usage + uint64(fi.Size()) - return nil - } - - if err := getDiskUsage(context.Background(), fs.fsPath, usageFn); err != nil { - continue - } - atomic.StoreUint64(&fs.totalUsed, usage) - } - } -} - // StorageInfo - returns underlying storage statistics. 
func (fs *FSObjects) StorageInfo(ctx context.Context) StorageInfo { + + atomic.AddInt64(&fs.activeIOCount, 1) + defer func() { + atomic.AddInt64(&fs.activeIOCount, -1) + }() + di, err := getDiskInfo(fs.fsPath) if err != nil { return StorageInfo{} @@ -258,6 +205,7 @@ func (fs *FSObjects) StorageInfo(ctx context.Context) StorageInfo { if !fs.diskMount { used = atomic.LoadUint64(&fs.totalUsed) } + storageInfo := StorageInfo{ Used: []uint64{used}, Total: []uint64{di.Total}, @@ -268,6 +216,98 @@ func (fs *FSObjects) StorageInfo(ctx context.Context) StorageInfo { return storageInfo } +func (fs *FSObjects) waitForLowActiveIO() error { + t := time.NewTicker(lowActiveIOWaitTick) + defer t.Stop() + for { + if atomic.LoadInt64(&fs.activeIOCount) >= fs.maxActiveIOCount { + select { + case <-GlobalServiceDoneCh: + return errors.New("forced exit") + case <-t.C: + continue + } + } + break + } + + return nil + +} + +// crawlAndGetDataUsageInfo returns data usage stats of the current FS deployment +func (fs *FSObjects) crawlAndGetDataUsageInfo(ctx context.Context, endCh <-chan struct{}) DataUsageInfo { + + var dataUsageInfoMu sync.Mutex + var dataUsageInfo = DataUsageInfo{ + BucketsSizes: make(map[string]uint64), + ObjectsSizesHistogram: make(map[string]uint64), + } + + walkFn := func(origPath string, typ os.FileMode) error { + + select { + case <-GlobalServiceDoneCh: + return filepath.SkipDir + default: + } + + if err := fs.waitForLowActiveIO(); err != nil { + return filepath.SkipDir + } + + path := strings.TrimPrefix(origPath, fs.fsPath) + path = strings.TrimPrefix(path, SlashSeparator) + + splits := splitN(path, SlashSeparator, 2) + bucket := splits[0] + prefix := splits[1] + + if bucket == "" { + return nil + } + + if isReservedOrInvalidBucket(bucket, false) { + return filepath.SkipDir + } + + if prefix == "" { + dataUsageInfoMu.Lock() + dataUsageInfo.BucketsCount++ + dataUsageInfo.BucketsSizes[bucket] = 0 + dataUsageInfoMu.Unlock() + return nil + } + + if typ&os.ModeDir != 0 { + return nil + } + + // Get file size + fi, err := os.Stat(origPath) + if err != nil { + return nil + } + size := fi.Size() + + dataUsageInfoMu.Lock() + dataUsageInfo.ObjectsCount++ + dataUsageInfo.ObjectsTotalSize += uint64(size) + dataUsageInfo.BucketsSizes[bucket] += uint64(size) + dataUsageInfo.ObjectsSizesHistogram[objSizeToHistoInterval(uint64(size))]++ + dataUsageInfoMu.Unlock() + + return nil + } + + fastWalk(fs.fsPath, walkFn) + + dataUsageInfo.LastUpdate = UTCNow() + atomic.StoreUint64(&fs.totalUsed, dataUsageInfo.ObjectsTotalSize) + + return dataUsageInfo +} + /// Bucket operations // getBucketDir - will convert incoming bucket names to @@ -305,6 +345,12 @@ func (fs *FSObjects) MakeBucketWithLocation(ctx context.Context, bucket, locatio if s3utils.CheckValidBucketNameStrict(bucket) != nil { return BucketNameInvalid{Bucket: bucket} } + + atomic.AddInt64(&fs.activeIOCount, 1) + defer func() { + atomic.AddInt64(&fs.activeIOCount, -1) + }() + bucketDir, err := fs.getBucketDir(ctx, bucket) if err != nil { return toObjectErr(err, bucket) @@ -324,6 +370,12 @@ func (fs *FSObjects) GetBucketInfo(ctx context.Context, bucket string) (bi Bucke return bi, e } defer bucketLock.RUnlock() + + atomic.AddInt64(&fs.activeIOCount, 1) + defer func() { + atomic.AddInt64(&fs.activeIOCount, -1) + }() + st, err := fs.statBucketDir(ctx, bucket) if err != nil { return bi, toObjectErr(err, bucket) @@ -343,6 +395,12 @@ func (fs *FSObjects) ListBuckets(ctx context.Context) ([]BucketInfo, error) { logger.LogIf(ctx, err) return nil, err } + + 
atomic.AddInt64(&fs.activeIOCount, 1) + defer func() { + atomic.AddInt64(&fs.activeIOCount, -1) + }() + var bucketInfos []BucketInfo entries, err := readDir((fs.fsPath)) if err != nil { @@ -388,6 +446,12 @@ func (fs *FSObjects) DeleteBucket(ctx context.Context, bucket string) error { return err } defer bucketLock.Unlock() + + atomic.AddInt64(&fs.activeIOCount, 1) + defer func() { + atomic.AddInt64(&fs.activeIOCount, -1) + }() + bucketDir, err := fs.getBucketDir(ctx, bucket) if err != nil { return toObjectErr(err, bucket) @@ -425,6 +489,11 @@ func (fs *FSObjects) CopyObject(ctx context.Context, srcBucket, srcObject, dstBu defer objectDWLock.Unlock() } + atomic.AddInt64(&fs.activeIOCount, 1) + defer func() { + atomic.AddInt64(&fs.activeIOCount, -1) + }() + if _, err := fs.statBucketDir(ctx, srcBucket); err != nil { return oi, toObjectErr(err, srcBucket) } @@ -482,6 +551,11 @@ func (fs *FSObjects) GetObjectNInfo(ctx context.Context, bucket, object string, return nil, err } + atomic.AddInt64(&fs.activeIOCount, 1) + defer func() { + atomic.AddInt64(&fs.activeIOCount, -1) + }() + if _, err = fs.statBucketDir(ctx, bucket); err != nil { return nil, toObjectErr(err, bucket) } @@ -583,6 +657,12 @@ func (fs *FSObjects) GetObject(ctx context.Context, bucket, object string, offse return err } defer objectLock.RUnlock() + + atomic.AddInt64(&fs.activeIOCount, 1) + defer func() { + atomic.AddInt64(&fs.activeIOCount, -1) + }() + return fs.getObject(ctx, bucket, object, offset, length, writer, etag, true) } @@ -768,6 +848,12 @@ func (fs *FSObjects) getObjectInfoWithLock(ctx context.Context, bucket, object s // GetObjectInfo - reads object metadata and replies back ObjectInfo. func (fs *FSObjects) GetObjectInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (oi ObjectInfo, e error) { + + atomic.AddInt64(&fs.activeIOCount, 1) + defer func() { + atomic.AddInt64(&fs.activeIOCount, -1) + }() + oi, err := fs.getObjectInfoWithLock(ctx, bucket, object) if err == errCorruptedFormat || err == io.EOF { objectLock := fs.NewNSLock(ctx, bucket, object) @@ -823,6 +909,11 @@ func (fs *FSObjects) PutObject(ctx context.Context, bucket string, object string } defer objectLock.Unlock() + atomic.AddInt64(&fs.activeIOCount, 1) + defer func() { + atomic.AddInt64(&fs.activeIOCount, -1) + }() + return fs.putObject(ctx, bucket, object, r, opts) } @@ -981,6 +1072,11 @@ func (fs *FSObjects) DeleteObject(ctx context.Context, bucket, object string) er return err } + atomic.AddInt64(&fs.activeIOCount, 1) + defer func() { + atomic.AddInt64(&fs.activeIOCount, -1) + }() + if _, err := fs.statBucketDir(ctx, bucket); err != nil { return toObjectErr(err, bucket) } @@ -1119,6 +1215,12 @@ func (fs *FSObjects) getObjectETag(ctx context.Context, bucket, entry string, lo // ListObjects - list all objects at prefix upto maxKeys., optionally delimited by '/'. Maintains the list pool // state for future re-entrant list requests. 
func (fs *FSObjects) ListObjects(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (loi ListObjectsInfo, e error) { + + atomic.AddInt64(&fs.activeIOCount, 1) + defer func() { + atomic.AddInt64(&fs.activeIOCount, -1) + }() + return listObjects(ctx, fs, bucket, prefix, marker, delimiter, maxKeys, fs.listPool, fs.listDirFactory(), fs.getObjectInfo, fs.getObjectInfo) } diff --git a/cmd/naughty-disk_test.go b/cmd/naughty-disk_test.go index cbc575b55..e1a201d70 100644 --- a/cmd/naughty-disk_test.go +++ b/cmd/naughty-disk_test.go @@ -80,6 +80,10 @@ func (d *naughtyDisk) calcError() (err error) { func (d *naughtyDisk) SetDiskID(id string) { } +func (d *naughtyDisk) CrawlAndGetDataUsage(endCh <-chan struct{}) (info DataUsageInfo, err error) { + return d.disk.CrawlAndGetDataUsage(endCh) +} + func (d *naughtyDisk) DiskInfo() (info DiskInfo, err error) { if err := d.calcError(); err != nil { return info, err diff --git a/cmd/object-api-common.go b/cmd/object-api-common.go index bb1ee8b37..b602169a4 100644 --- a/cmd/object-api-common.go +++ b/cmd/object-api-common.go @@ -450,3 +450,26 @@ func listObjects(ctx context.Context, obj ObjectLayer, bucket, prefix, marker, d // Success. return result, nil } + +// Fetch the histogram interval corresponding +// to the passed object size. +func objSizeToHistoInterval(usize uint64) string { + size := int64(usize) + + var interval objectHistogramInterval + for _, interval = range ObjectsHistogramIntervals { + var cond1, cond2 bool + if size >= interval.start || interval.start == -1 { + cond1 = true + } + if size <= interval.end || interval.end == -1 { + cond2 = true + } + if cond1 && cond2 { + return interval.name + } + } + + // This would be the last element of histogram intervals + return interval.name +} diff --git a/cmd/object-api-datatypes.go b/cmd/object-api-datatypes.go index bc09b244c..b38367e2a 100644 --- a/cmd/object-api-datatypes.go +++ b/cmd/object-api-datatypes.go @@ -70,6 +70,39 @@ type StorageInfo struct { } } +// objectHistogramInterval is an interval that will be +// used to report the histogram of objects data sizes +type objectHistogramInterval struct { + name string + start, end int64 +} + +// ObjectsHistogramIntervals is the list of all intervals +// of object sizes to be included in objects histogram. +var ObjectsHistogramIntervals = []objectHistogramInterval{ + {"LESS_THAN_1024_B", -1, 1024 - 1}, + {"BETWEEN_1024_B_AND_1_MB", 1024, 1024*1024 - 1}, + {"BETWEEN_1_MB_AND_10_MB", 1024 * 1024, 1024*1024*10 - 1}, + {"BETWEEN_10_MB_AND_64_MB", 1024 * 1024 * 10, 1024*1024*64 - 1}, + {"BETWEEN_64_MB_AND_128_MB", 1024 * 1024 * 64, 1024*1024*128 - 1}, + {"BETWEEN_128_MB_AND_512_MB", 1024 * 1024 * 128, 1024*1024*512 - 1}, + {"GREATER_THAN_512_MB", 1024 * 1024 * 512, -1}, +} + +// DataUsageInfo represents data usage stats of the underlying Object API +type DataUsageInfo struct { + // The timestamp of when the data usage info is generated + LastUpdate time.Time `json:"lastUpdate"` + + ObjectsCount uint64 `json:"objectsCount"` + // Objects total size + ObjectsTotalSize uint64 `json:"objectsTotalSize"` + ObjectsSizesHistogram map[string]uint64 `json:"objectsSizesHistogram"` + + BucketsCount uint64 `json:"bucketsCount"` + BucketsSizes map[string]uint64 `json:"bucketsSizes"` +} + // BucketInfo - represents bucket metadata. type BucketInfo struct { // Name of the bucket. 
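Since objSizeToHistoInterval and the interval table both appear above, a quick worked mapping may help. Note the boundary semantics: LESS_THAN_1024_B covers sizes up to 1023 bytes, so an exactly-1024-byte object lands in the next bucket, and the -1 sentinels make the first and last intervals open-ended. A test-style sketch, assuming package cmd and an fmt import:

```go
// Illustrative mapping of object sizes (in bytes) to histogram bucket names.
func Example_objSizeToHistoInterval() {
	fmt.Println(objSizeToHistoInterval(100))     // LESS_THAN_1024_B
	fmt.Println(objSizeToHistoInterval(1024))    // BETWEEN_1024_B_AND_1_MB
	fmt.Println(objSizeToHistoInterval(5 << 20)) // BETWEEN_1_MB_AND_10_MB (5 MB)
	fmt.Println(objSizeToHistoInterval(1 << 30)) // GREATER_THAN_512_MB (1 GB)
}
```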
diff --git a/cmd/object-api-utils.go b/cmd/object-api-utils.go index 254b34049..048673ccc 100644 --- a/cmd/object-api-utils.go +++ b/cmd/object-api-utils.go @@ -51,6 +51,10 @@ import ( const ( // MinIO meta bucket. minioMetaBucket = ".minio.sys" + // Background ops meta prefix + backgroundOpsMetaPrefix = "background-ops" + // MinIO Stats meta prefix. + minioMetaBackgroundOpsBucket = minioMetaBucket + SlashSeparator + backgroundOpsMetaPrefix // Multipart meta prefix. mpartMetaPrefix = "multipart" // MinIO Multipart meta prefix. @@ -72,7 +76,8 @@ const ( func isMinioMetaBucketName(bucket string) bool { return bucket == minioMetaBucket || bucket == minioMetaMultipartBucket || - bucket == minioMetaTmpBucket + bucket == minioMetaTmpBucket || + bucket == minioMetaBackgroundOpsBucket } // IsValidBucketName verifies that a bucket name is in accordance with diff --git a/cmd/posix-diskid-check.go b/cmd/posix-diskid-check.go index 18393dab7..611089456 100644 --- a/cmd/posix-diskid-check.go +++ b/cmd/posix-diskid-check.go @@ -38,6 +38,10 @@ func (p *posixDiskIDCheck) IsOnline() bool { return storedDiskID == p.diskID } +func (p *posixDiskIDCheck) CrawlAndGetDataUsage(endCh <-chan struct{}) (DataUsageInfo, error) { + return p.storage.CrawlAndGetDataUsage(endCh) +} + func (p *posixDiskIDCheck) LastError() error { return p.storage.LastError() } diff --git a/cmd/posix.go b/cmd/posix.go index f094e7b15..52ae73524 100644 --- a/cmd/posix.go +++ b/cmd/posix.go @@ -59,6 +59,10 @@ const ( readAheadBuffers = 4 // Size of each buffer. readAheadBufSize = 1 << 20 + + // Wait interval to check if active IO count is low + // to proceed crawling to compute data usage + lowActiveIOWaitTick = 100 * time.Millisecond ) // isValidVolname verifies a volname name in accordance with object @@ -82,6 +86,9 @@ type posix struct { totalUsed uint64 // ref: https://golang.org/pkg/sync/atomic/#pkg-note-BUG ioErrCount int32 // ref: https://golang.org/pkg/sync/atomic/#pkg-note-BUG + activeIOCount int32 + maxActiveIOCount int32 + diskPath string pool sync.Pool @@ -212,12 +219,9 @@ func newPosix(path string) (*posix, error) { return &b }, }, - stopUsageCh: make(chan struct{}), - diskMount: mountinfo.IsLikelyMountPoint(path), - } - - if !p.diskMount { - go p.diskUsage(GlobalServiceDoneCh) + stopUsageCh: make(chan struct{}), + diskMount: mountinfo.IsLikelyMountPoint(path), + maxActiveIOCount: 10, } // Success. 
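The posix hunks below add the same throttle that fs-v1.go gained above: every storage operation increments activeIOCount, and the crawler's waitForLowActiveIO spins on a ticker until fewer than maxActiveIOCount operations are in flight. Here is the pattern in isolation — a self-contained sketch, not MinIO's exact code:

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
	"time"
)

// ioGate mimics the activeIOCount/maxActiveIOCount pair: foreground
// operations bracket themselves with enter/leave, and a background job
// calls waitIdle before touching the disk.
type ioGate struct {
	active int32
	max    int32
}

func (g *ioGate) enter() { atomic.AddInt32(&g.active, 1) }
func (g *ioGate) leave() { atomic.AddInt32(&g.active, -1) }

// waitIdle polls until in-flight operations drop below the ceiling,
// aborting when done is closed.
func (g *ioGate) waitIdle(done <-chan struct{}, tick time.Duration) error {
	t := time.NewTicker(tick)
	defer t.Stop()
	for atomic.LoadInt32(&g.active) >= g.max {
		select {
		case <-done:
			return errors.New("shutting down")
		case <-t.C:
		}
	}
	return nil
}

func main() {
	g := &ioGate{max: 1}
	done := make(chan struct{})
	g.enter() // one request in flight: the gate is saturated
	go func() {
		time.Sleep(50 * time.Millisecond)
		g.leave() // request finishes; the crawler may proceed
	}()
	if err := g.waitIdle(done, 10*time.Millisecond); err == nil {
		fmt.Println("low activity, crawling can start")
	}
}
```

The counter is purely advisory — a burst of requests can start the moment the check passes — so it makes the crawl yield under load rather than guaranteeing exclusion, which is why it is cheap enough to wrap every posix call.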
@@ -321,6 +325,105 @@ func (s *posix) IsOnline() bool { return true } +func isQuitting(endCh chan struct{}) bool { + select { + case <-endCh: + return true + default: + return false + } +} + +func (s *posix) waitForLowActiveIO() error { + t := time.NewTicker(lowActiveIOWaitTick) + defer t.Stop() + for { + if atomic.LoadInt32(&s.activeIOCount) >= s.maxActiveIOCount { + select { + case <-GlobalServiceDoneCh: + return errors.New("forced exit") + case <-t.C: + continue + } + } + break + } + return nil +} + +func (s *posix) CrawlAndGetDataUsage(endCh <-chan struct{}) (DataUsageInfo, error) { + + var dataUsageInfoMu sync.Mutex + var dataUsageInfo = DataUsageInfo{ + BucketsSizes: make(map[string]uint64), + ObjectsSizesHistogram: make(map[string]uint64), + } + + walkFn := func(origPath string, typ os.FileMode) error { + + select { + case <-GlobalServiceDoneCh: + return filepath.SkipDir + default: + } + + if err := s.waitForLowActiveIO(); err != nil { + return filepath.SkipDir + } + + path := strings.TrimPrefix(origPath, s.diskPath) + path = strings.TrimPrefix(path, SlashSeparator) + + splits := splitN(path, SlashSeparator, 2) + + bucket := splits[0] + prefix := splits[1] + + if bucket == "" { + return nil + } + + if isReservedOrInvalidBucket(bucket, false) { + return nil + } + + if prefix == "" { + dataUsageInfoMu.Lock() + dataUsageInfo.BucketsCount++ + dataUsageInfo.BucketsSizes[bucket] = 0 + dataUsageInfoMu.Unlock() + return nil + } + + if strings.HasSuffix(prefix, "/xl.json") { + xlMetaBuf, err := ioutil.ReadFile(origPath) + if err != nil { + return nil + } + meta, err := xlMetaV1UnmarshalJSON(context.Background(), xlMetaBuf) + if err != nil { + return nil + } + + dataUsageInfoMu.Lock() + dataUsageInfo.ObjectsCount++ + dataUsageInfo.ObjectsTotalSize += uint64(meta.Stat.Size) + dataUsageInfo.BucketsSizes[bucket] += uint64(meta.Stat.Size) + dataUsageInfo.ObjectsSizesHistogram[objSizeToHistoInterval(uint64(meta.Stat.Size))]++ + dataUsageInfoMu.Unlock() + } + + return nil + } + + fastWalk(s.diskPath, walkFn) + + dataUsageInfo.LastUpdate = UTCNow() + + atomic.StoreUint64(&s.totalUsed, dataUsageInfo.ObjectsTotalSize) + return dataUsageInfo, nil +} + // DiskInfo is an extended type which returns current // disk usage per path. type DiskInfo struct { @@ -348,6 +451,11 @@ func (s *posix) DiskInfo() (info DiskInfo, err error) { return info, errFaultyDisk } + atomic.AddInt32(&s.activeIOCount, 1) + defer func() { + atomic.AddInt32(&s.activeIOCount, -1) + }() + di, err := getDiskInfo(s.diskPath) if err != nil { return info, err @@ -437,88 +545,6 @@ func (s *posix) getDiskID() (string, error) { return s.diskID, nil } -// diskUsage returns du information for the posix path, in a continuous routine. -func (s *posix) diskUsage(doneCh chan struct{}) { - ticker := time.NewTicker(globalUsageCheckInterval) - defer ticker.Stop() - - usageFn := func(ctx context.Context, entry string) error { - if httpServer := newHTTPServerFn(); httpServer != nil { - // Wait at max 1 minute for an inprogress request - // before proceeding to count the usage. - waitCount := 60 - // Any requests in progress, delay the usage. 
- for httpServer.GetRequestCount() > 0 && waitCount > 0 { - waitCount-- - time.Sleep(1 * time.Second) - } - } - - select { - case <-doneCh: - return errWalkAbort - case <-s.stopUsageCh: - return errWalkAbort - default: - fi, err := os.Stat(entry) - if err != nil { - err = osErrToFSFileErr(err) - return err - } - atomic.AddUint64(&s.totalUsed, uint64(fi.Size())) - return nil - } - } - - // Return this routine upon errWalkAbort, continue for any other error on purpose - // so that we can start the routine freshly in another 12 hours. - if err := getDiskUsage(context.Background(), s.diskPath, usageFn); err == errWalkAbort { - return - } - - for { - select { - case <-s.stopUsageCh: - return - case <-doneCh: - return - case <-time.After(globalUsageCheckInterval): - var usage uint64 - usageFn = func(ctx context.Context, entry string) error { - if httpServer := newHTTPServerFn(); httpServer != nil { - // Wait at max 1 minute for an inprogress request - // before proceeding to count the usage. - waitCount := 60 - // Any requests in progress, delay the usage. - for httpServer.GetRequestCount() > 0 && waitCount > 0 { - waitCount-- - time.Sleep(1 * time.Second) - } - } - - select { - case <-s.stopUsageCh: - return errWalkAbort - default: - fi, err := os.Stat(entry) - if err != nil { - err = osErrToFSFileErr(err) - return err - } - usage = usage + uint64(fi.Size()) - return nil - } - } - - if err := getDiskUsage(context.Background(), s.diskPath, usageFn); err != nil { - continue - } - - atomic.StoreUint64(&s.totalUsed, usage) - } - } -} - // Make a volume entry. func (s *posix) SetDiskID(id string) { // NO-OP for posix as it is handled either by posixDiskIDCheck{} for local disks or @@ -541,6 +567,11 @@ func (s *posix) MakeVol(volume string) (err error) { return errInvalidArgument } + atomic.AddInt32(&s.activeIOCount, 1) + defer func() { + atomic.AddInt32(&s.activeIOCount, -1) + }() + volumeDir, err := s.getVolDir(volume) if err != nil { return err @@ -576,6 +607,11 @@ func (s *posix) ListVols() (volsInfo []VolInfo, err error) { return nil, errFaultyDisk } + atomic.AddInt32(&s.activeIOCount, 1) + defer func() { + atomic.AddInt32(&s.activeIOCount, -1) + }() + volsInfo, err = listVols(s.diskPath) if err != nil { if isSysErrIO(err) { @@ -641,6 +677,11 @@ func (s *posix) StatVol(volume string) (volInfo VolInfo, err error) { return VolInfo{}, errFaultyDisk } + atomic.AddInt32(&s.activeIOCount, 1) + defer func() { + atomic.AddInt32(&s.activeIOCount, -1) + }() + // Verify if volume is valid and it exists. volumeDir, err := s.getVolDir(volume) if err != nil { @@ -678,6 +719,11 @@ func (s *posix) DeleteVol(volume string) (err error) { return errFaultyDisk } + atomic.AddInt32(&s.activeIOCount, 1) + defer func() { + atomic.AddInt32(&s.activeIOCount, -1) + }() + // Verify if volume is valid and it exists. volumeDir, err := s.getVolDir(volume) if err != nil { @@ -716,6 +762,11 @@ func (s *posix) Walk(volume, dirPath, marker string, recursive bool, leafFile st return nil, errFaultyDisk } + atomic.AddInt32(&s.activeIOCount, 1) + defer func() { + atomic.AddInt32(&s.activeIOCount, -1) + }() + // Verify if volume is valid and it exists. volumeDir, err := s.getVolDir(volume) if err != nil { @@ -789,6 +840,11 @@ func (s *posix) ListDir(volume, dirPath string, count int, leafFile string) (ent return nil, errFaultyDisk } + atomic.AddInt32(&s.activeIOCount, 1) + defer func() { + atomic.AddInt32(&s.activeIOCount, -1) + }() + // Verify if volume is valid and it exists. 
volumeDir, err := s.getVolDir(volume) if err != nil { @@ -841,6 +897,11 @@ func (s *posix) ReadAll(volume, path string) (buf []byte, err error) { return nil, errFaultyDisk } + atomic.AddInt32(&s.activeIOCount, 1) + defer func() { + atomic.AddInt32(&s.activeIOCount, -1) + }() + volumeDir, err := s.getVolDir(volume) if err != nil { return nil, err @@ -914,6 +975,11 @@ func (s *posix) ReadFile(volume, path string, offset int64, buffer []byte, verif return 0, errFaultyDisk } + atomic.AddInt32(&s.activeIOCount, 1) + defer func() { + atomic.AddInt32(&s.activeIOCount, -1) + }() + volumeDir, err := s.getVolDir(volume) if err != nil { return 0, err @@ -1082,6 +1148,11 @@ func (s *posix) ReadFileStream(volume, path string, offset, length int64) (io.Re return nil, errFaultyDisk } + atomic.AddInt32(&s.activeIOCount, 1) + defer func() { + atomic.AddInt32(&s.activeIOCount, -1) + }() + volumeDir, err := s.getVolDir(volume) if err != nil { return nil, err @@ -1165,6 +1236,11 @@ func (s *posix) CreateFile(volume, path string, fileSize int64, r io.Reader) (er return errFaultyDisk } + atomic.AddInt32(&s.activeIOCount, 1) + defer func() { + atomic.AddInt32(&s.activeIOCount, -1) + }() + // Validate if disk is indeed free. if err = checkDiskFree(s.diskPath, fileSize); err != nil { if isSysErrIO(err) { @@ -1264,6 +1340,11 @@ func (s *posix) WriteAll(volume, path string, reader io.Reader) (err error) { return errFaultyDisk } + atomic.AddInt32(&s.activeIOCount, 1) + defer func() { + atomic.AddInt32(&s.activeIOCount, -1) + }() + // Create file if not found. Note that it is created with os.O_EXCL flag as the file // always is supposed to be created in the tmp directory with a unique file name. w, err := s.openFile(volume, path, os.O_CREATE|os.O_SYNC|os.O_WRONLY|os.O_EXCL) @@ -1293,6 +1374,11 @@ func (s *posix) AppendFile(volume, path string, buf []byte) (err error) { return errFaultyDisk } + atomic.AddInt32(&s.activeIOCount, 1) + defer func() { + atomic.AddInt32(&s.activeIOCount, -1) + }() + var w *os.File // Create file if not found. Not doing O_DIRECT here to avoid the code that does buffer aligned writes. // AppendFile() is only used by healing code to heal objects written in old format. 
@@ -1320,6 +1406,11 @@ func (s *posix) StatFile(volume, path string) (file FileInfo, err error) { return FileInfo{}, errFaultyDisk } + atomic.AddInt32(&s.activeIOCount, 1) + defer func() { + atomic.AddInt32(&s.activeIOCount, -1) + }() + volumeDir, err := s.getVolDir(volume) if err != nil { return FileInfo{}, err @@ -1416,6 +1507,11 @@ func (s *posix) DeleteFile(volume, path string) (err error) { return errFaultyDisk } + atomic.AddInt32(&s.activeIOCount, 1) + defer func() { + atomic.AddInt32(&s.activeIOCount, -1) + }() + volumeDir, err := s.getVolDir(volume) if err != nil { return err @@ -1465,6 +1561,11 @@ func (s *posix) RenameFile(srcVolume, srcPath, dstVolume, dstPath string) (err e return errFaultyDisk } + atomic.AddInt32(&s.activeIOCount, 1) + defer func() { + atomic.AddInt32(&s.activeIOCount, -1) + }() + srcVolumeDir, err := s.getVolDir(srcVolume) if err != nil { return err @@ -1557,6 +1658,11 @@ func (s *posix) VerifyFile(volume, path string, fileSize int64, algo BitrotAlgor return errFaultyDisk } + atomic.AddInt32(&s.activeIOCount, 1) + defer func() { + atomic.AddInt32(&s.activeIOCount, -1) + }() + volumeDir, err := s.getVolDir(volume) if err != nil { return err diff --git a/cmd/prepare-storage.go b/cmd/prepare-storage.go index 9609289c1..fc3537f24 100644 --- a/cmd/prepare-storage.go +++ b/cmd/prepare-storage.go @@ -223,6 +223,12 @@ func connectLoadInitFormats(retryCount int, firstDisk bool, endpoints Endpoints, // Assign globalDeploymentID on first run for the // minio server managing the first disk globalDeploymentID = format.ID + } else { + // The first will always recreate some directories inside .minio.sys + // such as, tmp, multipart and background-ops + if firstDisk { + initFormatXLMetaVolume(storageDisks, formatConfigs) + } } // Return error when quorum unformatted disks - indicating we are diff --git a/cmd/server-main.go b/cmd/server-main.go index dab20acda..ea32be719 100644 --- a/cmd/server-main.go +++ b/cmd/server-main.go @@ -437,6 +437,7 @@ func serverMain(ctx *cli.Context) { globalObjLayerMutex.Unlock() } + initDataUsageStats() initDailyLifecycle() if globalIsXL { diff --git a/cmd/storage-interface.go b/cmd/storage-interface.go index fdb3bf2d5..4348814a0 100644 --- a/cmd/storage-interface.go +++ b/cmd/storage-interface.go @@ -32,6 +32,7 @@ type StorageAPI interface { SetDiskID(id string) DiskInfo() (info DiskInfo, err error) + CrawlAndGetDataUsage(endCh <-chan struct{}) (DataUsageInfo, error) // Volume operations. MakeVol(volume string) (err error) diff --git a/cmd/storage-rest-client.go b/cmd/storage-rest-client.go index 8ecc7208e..571aeae0f 100644 --- a/cmd/storage-rest-client.go +++ b/cmd/storage-rest-client.go @@ -147,6 +147,28 @@ func (client *storageRESTClient) IsOnline() bool { return atomic.LoadInt32(&client.connected) == 1 } +func (client *storageRESTClient) CrawlAndGetDataUsage(endCh <-chan struct{}) (DataUsageInfo, error) { + respBody, err := client.call(storageRESTMethodCrawlAndGetDataUsage, nil, nil, -1) + defer http.DrainBody(respBody) + if err != nil { + return DataUsageInfo{}, err + } + reader := bufio.NewReader(respBody) + for { + b, err := reader.ReadByte() + if err != nil { + return DataUsageInfo{}, err + } + if b != ' ' { + reader.UnreadByte() + break + } + } + var usageInfo DataUsageInfo + err = gob.NewDecoder(reader).Decode(&usageInfo) + return usageInfo, err +} + // LastError - returns the network error if any. 
func (client *storageRESTClient) LastError() error {
 	return client.lastError
 }
diff --git a/cmd/storage-rest-common.go b/cmd/storage-rest-common.go index 9df169570..46fb58811 100644 --- a/cmd/storage-rest-common.go +++ b/cmd/storage-rest-common.go @@ -17,17 +17,18 @@ package cmd const ( -	storageRESTVersion       = "v10"
-	storageRESTVersionPrefix = SlashSeparator + "v10"
+	storageRESTVersion       = "v11"
+	storageRESTVersionPrefix = SlashSeparator + storageRESTVersion
 storageRESTPrefix = minioReservedBucketPath + "/storage" ) const ( -	storageRESTMethodDiskInfo   = "/diskinfo"
-	storageRESTMethodMakeVol    = "/makevol"
-	storageRESTMethodStatVol    = "/statvol"
-	storageRESTMethodDeleteVol  = "/deletevol"
-	storageRESTMethodListVols   = "/listvols"
+	storageRESTMethodDiskInfo             = "/diskinfo"
+	storageRESTMethodCrawlAndGetDataUsage = "/crawlandgetdatausage"
+	storageRESTMethodMakeVol              = "/makevol"
+	storageRESTMethodStatVol              = "/statvol"
+	storageRESTMethodDeleteVol            = "/deletevol"
+	storageRESTMethodListVols             = "/listvols"
 storageRESTMethodAppendFile = "/appendfile" storageRESTMethodCreateFile = "/createfile" diff --git a/cmd/storage-rest-server.go b/cmd/storage-rest-server.go index 80a4a5881..9a6dc258c 100644 --- a/cmd/storage-rest-server.go +++ b/cmd/storage-rest-server.go @@ -112,6 +112,25 @@ func (s *storageRESTServer) DiskInfoHandler(w http.ResponseWriter, r *http.Reque gob.NewEncoder(w).Encode(info) } +func (s *storageRESTServer) CrawlAndGetDataUsageHandler(w http.ResponseWriter, r *http.Request) {
+	if !s.IsValid(w, r) {
+		return
+	}
+
+	w.Header().Set(xhttp.ContentType, "text/event-stream")
+	doneCh := sendWhiteSpaceToHTTPResponse(w)
+	usageInfo, err := s.storage.CrawlAndGetDataUsage(GlobalServiceDoneCh)
+	<-doneCh
+
+	if err != nil {
+		s.writeErrorResponse(w, err)
+		return
+	}
+
+	gob.NewEncoder(w).Encode(usageInfo)
+	w.(http.Flusher).Flush()
+}
+
 // MakeVolHandler - make a volume. func (s *storageRESTServer) MakeVolHandler(w http.ResponseWriter, r *http.Request) { if !s.IsValid(w, r) { @@ -489,8 +508,9 @@ func (s *storageRESTServer) RenameFileHandler(w http.ResponseWriter, r *http.Req } } -// Send whitespace to the client to avoid timeouts as bitrot verification can take time on spinning/slow disks.
-func sendWhiteSpaceVerifyFile(w http.ResponseWriter) <-chan struct{} {
+// Send whitespace to the client to avoid timeouts with long storage
+// operations, such as bitrot verification or data usage crawling.
+func sendWhiteSpaceToHTTPResponse(w http.ResponseWriter) <-chan struct{} { doneCh := make(chan struct{}) go func() { ticker := time.NewTicker(time.Second * 10) @@ -548,7 +574,7 @@ func (s *storageRESTServer) VerifyFile(w http.ResponseWriter, r *http.Request) { } w.Header().Set(xhttp.ContentType, "text/event-stream") encoder := gob.NewEncoder(w) - doneCh := sendWhiteSpaceVerifyFile(w) + doneCh := sendWhiteSpaceToHTTPResponse(w) err = s.storage.VerifyFile(volume, filePath, size, BitrotAlgorithmFromString(algoStr), hash, int64(shardSize)) <-doneCh vresp := &VerifyFileResp{} @@ -577,6 +603,7 @@ func registerStorageRESTHandlers(router *mux.Router, endpointZones EndpointZones subrouter := router.PathPrefix(path.Join(storageRESTPrefix, endpoint.Path)).Subrouter() subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodDiskInfo).HandlerFunc(httpTraceHdrs(server.DiskInfoHandler)) + subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodCrawlAndGetDataUsage).HandlerFunc(httpTraceHdrs(server.CrawlAndGetDataUsageHandler)) subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodMakeVol).HandlerFunc(httpTraceHdrs(server.MakeVolHandler)).Queries(restQueries(storageRESTVolume)...) subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodStatVol).HandlerFunc(httpTraceHdrs(server.StatVolHandler)).Queries(restQueries(storageRESTVolume)...) subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodDeleteVol).HandlerFunc(httpTraceHdrs(server.DeleteVolHandler)).Queries(restQueries(storageRESTVolume)...) diff --git a/cmd/utils.go b/cmd/utils.go index 358ed6a7a..9414bf9e9 100644 --- a/cmd/utils.go +++ b/cmd/utils.go @@ -525,6 +525,16 @@ func getMinioMode() string { return mode } +func splitN(str, delim string, num int) []string { + stdSplit := strings.SplitN(str, delim, num) + retSplit := make([]string, num) + for i := 0; i < len(stdSplit); i++ { + retSplit[i] = stdSplit[i] + } + + return retSplit +} + func iamPolicyName() string { return globalOpenIDConfig.ClaimPrefix + iampolicy.PolicyName } diff --git a/cmd/xl-v1.go b/cmd/xl-v1.go index 66b80ee5e..e70c3915b 100644 --- a/cmd/xl-v1.go +++ b/cmd/xl-v1.go @@ -20,6 +20,7 @@ import ( "context" "sort" "strings" + "sync" "github.com/minio/minio/cmd/logger" "github.com/minio/minio/pkg/bpool" @@ -206,3 +207,41 @@ func (xl xlObjects) GetMetrics(ctx context.Context) (*Metrics, error) { logger.LogIf(ctx, NotImplemented{}) return &Metrics{}, NotImplemented{} } + +func (xl xlObjects) crawlAndGetDataUsage(ctx context.Context, endCh <-chan struct{}) DataUsageInfo { + var randomDisks []StorageAPI + for _, d := range xl.getLoadBalancedDisks() { + if d == nil || !d.IsOnline() { + continue + } + if len(randomDisks) > 3 { + break + } + randomDisks = append(randomDisks, d) + } + + var dataUsageResults = make([]DataUsageInfo, len(randomDisks)) + + var wg sync.WaitGroup + for i := 0; i < len(randomDisks); i++ { + wg.Add(1) + go func(index int, disk StorageAPI) { + defer wg.Done() + var err error + dataUsageResults[index], err = disk.CrawlAndGetDataUsage(endCh) + if err != nil { + logger.LogIf(ctx, err) + } + }(i, randomDisks[i]) + } + wg.Wait() + + var dataUsageInfo DataUsageInfo + for i := 0; i < len(dataUsageResults); i++ { + if dataUsageResults[i].ObjectsCount > dataUsageInfo.ObjectsCount { + dataUsageInfo = dataUsageResults[i] + } + } + + return dataUsageInfo +} diff --git a/cmd/xl-zones.go b/cmd/xl-zones.go index 99add9a63..3134ed8c0 100644 
--- a/cmd/xl-zones.go +++ b/cmd/xl-zones.go @@ -23,6 +23,7 @@ import ( "math/rand" "net/http" "strings" +	"sync"
 
 xhttp "github.com/minio/minio/cmd/http" "github.com/minio/minio/cmd/logger" @@ -211,6 +212,46 @@ func (z *xlZones) StorageInfo(ctx context.Context) StorageInfo { return storageInfo } +func (z *xlZones) crawlAndGetDataUsage(ctx context.Context, endCh <-chan struct{}) DataUsageInfo {
+	var aggDataUsageInfo = struct {
+		sync.Mutex
+		DataUsageInfo
+	}{}
+
+	aggDataUsageInfo.ObjectsSizesHistogram = make(map[string]uint64)
+	aggDataUsageInfo.BucketsSizes = make(map[string]uint64)
+
+	var wg sync.WaitGroup
+	for _, z := range z.zones {
+		for _, xlObj := range z.sets {
+			wg.Add(1)
+			go func(xl *xlObjects) {
+				defer wg.Done()
+				info := xl.crawlAndGetDataUsage(ctx, endCh)
+
+				aggDataUsageInfo.Lock()
+				aggDataUsageInfo.ObjectsCount += info.ObjectsCount
+				aggDataUsageInfo.ObjectsTotalSize += info.ObjectsTotalSize
+				if aggDataUsageInfo.BucketsCount < info.BucketsCount {
+					aggDataUsageInfo.BucketsCount = info.BucketsCount
+				}
+				for k, v := range info.ObjectsSizesHistogram {
+					aggDataUsageInfo.ObjectsSizesHistogram[k] += v
+				}
+				for k, v := range info.BucketsSizes {
+					aggDataUsageInfo.BucketsSizes[k] += v
+				}
+				aggDataUsageInfo.Unlock()
+
+			}(xlObj)
+		}
+	}
+	wg.Wait()
+
+	aggDataUsageInfo.LastUpdate = UTCNow()
+	return aggDataUsageInfo.DataUsageInfo
+}
+
 // This function is used to undo a successful MakeBucket operation. func undoMakeBucketZones(bucket string, zones []*xlSets, errs []error) { g := errgroup.WithNErrs(len(zones)) diff --git a/pkg/madmin/examples/data-usage-info.go b/pkg/madmin/examples/data-usage-info.go new file mode 100644 index 000000000..0f3b1d33a --- /dev/null +++ b/pkg/madmin/examples/data-usage-info.go @@ -0,0 +1,44 @@ +// +build ignore
+
+/*
+ * MinIO Cloud Storage, (C) 2019 MinIO, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package main
+
+import (
+	"log"
+
+	"github.com/minio/minio/pkg/madmin"
+)
+
+func main() {
+	// Note: YOUR-ACCESSKEYID and YOUR-SECRETACCESSKEY are dummy values;
+	// please replace them with your own credentials.
+
+	// API requests are secure (HTTPS) if secure=true and insecure (HTTP) otherwise.
+	// New returns a MinIO Admin client object.
madmClnt, err := madmin.New("your-minio.example.com:9000", "YOUR-ACCESSKEYID", "YOUR-SECRETACCESSKEY", true)
+	if err != nil {
+		log.Fatalln(err)
+	}
+
+	dataUsageInfo, err := madmClnt.DataUsageInfo()
+	if err != nil {
+		log.Fatalln(err)
+	}
+	log.Println(dataUsageInfo)
+}
diff --git a/pkg/madmin/info-commands.go b/pkg/madmin/info-commands.go index c1054b473..1c66d5e1d 100644 --- a/pkg/madmin/info-commands.go +++ b/pkg/madmin/info-commands.go @@ -24,6 +24,7 @@ import ( "net/http" "net/url" "strconv" +	"time"
 
 humanize "github.com/dustin/go-humanize" "github.com/minio/minio/pkg/cpu" @@ -140,6 +141,63 @@ func (adm *AdminClient) StorageInfo() (StorageInfo, error) { return storageInfo, nil } +type objectHistogramInterval struct {
+	name       string
+	start, end int64
+}
+
+// ObjectsHistogramIntervals contains the list of intervals
+// of a histogram analysis of object sizes.
+var ObjectsHistogramIntervals = []objectHistogramInterval{
+	{"LESS_THAN_1024_B", -1, 1024 - 1},
+	{"BETWEEN_1024_B_AND_1_MB", 1024, 1024*1024 - 1},
+	{"BETWEEN_1_MB_AND_10_MB", 1024 * 1024, 1024*1024*10 - 1},
+	{"BETWEEN_10_MB_AND_64_MB", 1024 * 1024 * 10, 1024*1024*64 - 1},
+	{"BETWEEN_64_MB_AND_128_MB", 1024 * 1024 * 64, 1024*1024*128 - 1},
+	{"BETWEEN_128_MB_AND_512_MB", 1024 * 1024 * 128, 1024*1024*512 - 1},
+	{"GREATER_THAN_512_MB", 1024 * 1024 * 512, -1},
+}
+
+// DataUsageInfo represents data usage of an Object API
+type DataUsageInfo struct {
+	LastUpdate            time.Time         `json:"lastUpdate"`
+	ObjectsCount          uint64            `json:"objectsCount"`
+	ObjectsTotalSize      uint64            `json:"objectsTotalSize"`
+	ObjectsSizesHistogram map[string]uint64 `json:"objectsSizesHistogram"`
+
+	BucketsCount uint64            `json:"bucketsCount"`
+	BucketsSizes map[string]uint64 `json:"bucketsSizes"`
+}
+
+// DataUsageInfo - returns data usage of the current object API
+func (adm *AdminClient) DataUsageInfo() (DataUsageInfo, error) {
+	resp, err := adm.executeMethod("GET", requestData{relPath: adminAPIPrefix + "/datausageinfo"})
+	defer closeResponse(resp)
+	if err != nil {
+		return DataUsageInfo{}, err
+	}
+
+	// Check response http status code
+	if resp.StatusCode != http.StatusOK {
+		return DataUsageInfo{}, httpRespToErrorResponse(resp)
+	}
+
+	// Unmarshal the server's json response
+	var dataUsageInfo DataUsageInfo
+
+	respBytes, err := ioutil.ReadAll(resp.Body)
+	if err != nil {
+		return DataUsageInfo{}, err
+	}
+
+	err = json.Unmarshal(respBytes, &dataUsageInfo)
+	if err != nil {
+		return DataUsageInfo{}, err
+	}
+
+	return dataUsageInfo, nil
+}
+
 // ServerDrivesPerfInfo holds information about address and write speed of // all drives in a single server node type ServerDrivesPerfInfo struct {
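One consumption note on the new madmin call: ObjectsSizesHistogram is a plain map keyed by interval name, Go randomizes map iteration order, and objectHistogramInterval's fields are unexported, so external clients wanting stable output should sort the keys themselves. A sketch extending the example program above (fmt and sort imports assumed):

```go
// printDataUsage renders a DataUsageInfo with a deterministic histogram
// order. Illustrative only; obtain info from madmClnt.DataUsageInfo() as
// in the example program above.
func printDataUsage(info madmin.DataUsageInfo) {
	fmt.Printf("last update: %s\n", info.LastUpdate)
	fmt.Printf("%d objects (%d bytes) in %d buckets\n",
		info.ObjectsCount, info.ObjectsTotalSize, info.BucketsCount)

	names := make([]string, 0, len(info.ObjectsSizesHistogram))
	for name := range info.ObjectsSizesHistogram {
		names = append(names, name)
	}
	sort.Strings(names)
	for _, name := range names {
		fmt.Printf("  %-28s %d\n", name, info.ObjectsSizesHistogram[name])
	}
}
```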