diff --git a/cmd/admin-handlers.go b/cmd/admin-handlers.go index 0dc9100c2..e929e3aec 100644 --- a/cmd/admin-handlers.go +++ b/cmd/admin-handlers.go @@ -35,17 +35,19 @@ import ( "time" "github.com/gorilla/mux" - "github.com/minio/minio/cmd/config" "github.com/minio/minio/cmd/crypto" xhttp "github.com/minio/minio/cmd/http" "github.com/minio/minio/cmd/logger" "github.com/minio/minio/cmd/logger/message/log" "github.com/minio/minio/pkg/auth" + bandwidth "github.com/minio/minio/pkg/bandwidth" + bucketBandwidth "github.com/minio/minio/pkg/bucket/bandwidth" "github.com/minio/minio/pkg/handlers" iampolicy "github.com/minio/minio/pkg/iam/policy" "github.com/minio/minio/pkg/madmin" xnet "github.com/minio/minio/pkg/net" + "github.com/minio/minio/pkg/sync/errgroup" trace "github.com/minio/minio/pkg/trace" ) @@ -1425,6 +1427,66 @@ func (a adminAPIHandlers) OBDInfoHandler(w http.ResponseWriter, r *http.Request) } +// BandwidthMonitorHandler - GET /minio/admin/v3/bandwidth +// ---------- +// Get bandwidth consumption information +func (a adminAPIHandlers) BandwidthMonitorHandler(w http.ResponseWriter, r *http.Request) { + ctx := newContext(r, w, "BandwidthMonitor") + + // Validate request signature. + _, adminAPIErr := checkAdminRequestAuthType(ctx, r, iampolicy.BandwidthMonitorAction, "") + if adminAPIErr != ErrNone { + writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(adminAPIErr), r.URL) + return + } + + setEventStreamHeaders(w) + peers := newPeerRestClients(globalEndpoints) + bucketsRequestedString := r.URL.Query().Get("buckets") + var bucketsRequested []string + reports := make([]*bandwidth.Report, len(peers)) + selectBuckets := bucketBandwidth.SelectAllBuckets() + if bucketsRequestedString != "" { + bucketsRequested = strings.Split(bucketsRequestedString, ",") + selectBuckets = bucketBandwidth.SelectBuckets(bucketsRequested...) + } + reports = append(reports, globalBucketMonitor.GetReport(selectBuckets)) + g := errgroup.WithNErrs(len(peers)) + for index, peer := range peers { + if peer == nil { + continue + } + index := index + g.Go(func() error { + var err error + reports[index], err = peer.MonitorBandwidth(ctx, bucketsRequested) + return err + }, index) + } + consolidatedReport := bandwidth.Report{ + BucketStats: make(map[string]bandwidth.Details), + } + + for _, report := range reports { + for bucket := range report.BucketStats { + d, ok := consolidatedReport.BucketStats[bucket] + if !ok { + consolidatedReport.BucketStats[bucket] = bandwidth.Details{} + d = consolidatedReport.BucketStats[bucket] + d.LimitInBytesPerSecond = report.BucketStats[bucket].LimitInBytesPerSecond + } + d.CurrentBandwidthInBytesPerSecond += report.BucketStats[bucket].CurrentBandwidthInBytesPerSecond + consolidatedReport.BucketStats[bucket] = d + } + } + enc := json.NewEncoder(w) + err := enc.Encode(consolidatedReport) + if err != nil { + writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrInternalError), r.URL) + } + w.(http.Flusher).Flush() +} + // ServerInfoHandler - GET /minio/admin/v3/info // ---------- // Get server information diff --git a/cmd/admin-router.go b/cmd/admin-router.go index d5e4b9185..3c52c1ce5 100644 --- a/cmd/admin-router.go +++ b/cmd/admin-router.go @@ -214,6 +214,8 @@ func registerAdminRouter(router *mux.Router, enableConfigOps, enableIAMOps bool) // -- OBD API -- adminRouter.Methods(http.MethodGet).Path(adminVersion + "/obdinfo"). HandlerFunc(httpTraceHdrs(adminAPI.OBDInfoHandler)) + adminRouter.Methods(http.MethodGet).Path(adminVersion + "/bandwidth"). + HandlerFunc(httpTraceHdrs(adminAPI.BandwidthMonitorHandler)) } } diff --git a/cmd/bucket-metadata-sys.go b/cmd/bucket-metadata-sys.go index 2e1011d00..d1c951ef7 100644 --- a/cmd/bucket-metadata-sys.go +++ b/cmd/bucket-metadata-sys.go @@ -373,6 +373,20 @@ func (sys *BucketMetadataSys) GetBucketTargetsConfig(bucket string) (*madmin.Buc return meta.bucketTargetConfig, nil } +// GetBucketTarget returns the target for the bucket and arn. +func (sys *BucketMetadataSys) GetBucketTarget(bucket string, arn string) (madmin.BucketTarget, error) { + targets, err := sys.GetBucketTargetsConfig(bucket) + if err != nil { + return madmin.BucketTarget{}, err + } + for _, t := range targets.Targets { + if t.Arn == arn { + return t, nil + } + } + return madmin.BucketTarget{}, errConfigNotFound +} + // GetConfig returns a specific configuration from the bucket metadata. // The returned object may not be modified. func (sys *BucketMetadataSys) GetConfig(bucket string) (BucketMetadata, error) { diff --git a/cmd/bucket-replication.go b/cmd/bucket-replication.go index 571c168d1..0d0c7d981 100644 --- a/cmd/bucket-replication.go +++ b/cmd/bucket-replication.go @@ -30,6 +30,7 @@ import ( "github.com/minio/minio/cmd/crypto" xhttp "github.com/minio/minio/cmd/http" "github.com/minio/minio/cmd/logger" + "github.com/minio/minio/pkg/bucket/bandwidth" "github.com/minio/minio/pkg/bucket/replication" "github.com/minio/minio/pkg/event" iampolicy "github.com/minio/minio/pkg/iam/policy" @@ -119,7 +120,7 @@ func mustReplicater(ctx context.Context, r *http.Request, bucket, object string, return cfg.Replicate(opts) } -func putReplicationOpts(dest replication.Destination, objInfo ObjectInfo) (putOpts miniogo.PutObjectOptions) { +func putReplicationOpts(ctx context.Context, dest replication.Destination, objInfo ObjectInfo) (putOpts miniogo.PutObjectOptions) { meta := make(map[string]string) for k, v := range objInfo.UserDefined { if k == xhttp.AmzBucketReplicationStatus { @@ -168,6 +169,7 @@ func putReplicationOpts(dest replication.Destination, objInfo ObjectInfo) (putOp if crypto.S3.IsEncrypted(objInfo.UserDefined) { putOpts.ServerSideEncryption = encrypt.NewSSE() } + return } @@ -184,16 +186,15 @@ func replicateObject(ctx context.Context, objInfo ObjectInfo, objectAPI ObjectLa } tgt := globalBucketTargetSys.GetRemoteTargetClient(ctx, cfg.RoleArn) if tgt == nil { + logger.LogIf(ctx, fmt.Errorf("failed to get target for bucket:%s arn:%s", bucket, cfg.RoleArn)) return } - gr, err := objectAPI.GetObjectNInfo(ctx, bucket, object, nil, http.Header{}, readLock, ObjectOptions{ VersionID: objInfo.VersionID, }) if err != nil { return } - objInfo = gr.ObjInfo size, err := objInfo.GetActualSize() if err != nil { @@ -224,11 +225,26 @@ func replicateObject(ctx context.Context, objInfo ObjectInfo, objectAPI ObjectLa return } } - putOpts := putReplicationOpts(dest, objInfo) + target, err := globalBucketMetadataSys.GetBucketTarget(bucket, cfg.RoleArn) + if err != nil { + logger.LogIf(ctx, fmt.Errorf("failed to get target for replication bucket:%s cfg:%s err:%s", bucket, cfg.RoleArn, err)) + return + } + putOpts := putReplicationOpts(ctx, dest, objInfo) replicationStatus := replication.Complete - _, err = tgt.PutObject(ctx, dest.Bucket, object, gr, size, "", "", putOpts) - gr.Close() + + // Setup bandwidth throttling + peerCount := len(globalEndpoints) + b := target.BandwidthLimit / int64(peerCount) + var headerSize int + for k, v := range putOpts.Header() { + headerSize += len(k) + len(v) + } + r := bandwidth.NewMonitoredReader(ctx, globalBucketMonitor, objInfo.Bucket, objInfo.Name, gr, headerSize, b) + + _, err = tgt.PutObject(ctx, dest.Bucket, object, r, size, "", "", putOpts) + r.Close() if err != nil { replicationStatus = replication.Failed } diff --git a/cmd/bucket-targets.go b/cmd/bucket-targets.go index 023b645b6..7ea65669f 100644 --- a/cmd/bucket-targets.go +++ b/cmd/bucket-targets.go @@ -207,14 +207,14 @@ func (sys *BucketTargetSys) Init(ctx context.Context, buckets []BucketInfo, objA return nil } -// UpdateTarget updates target to reflect metadata updates -func (sys *BucketTargetSys) UpdateTarget(bucket string, cfg *madmin.BucketTargets) { +// UpdateAllTargets updates target to reflect metadata updates +func (sys *BucketTargetSys) UpdateAllTargets(bucket string, tgts *madmin.BucketTargets) { if sys == nil { return } sys.Lock() defer sys.Unlock() - if cfg == nil || cfg.Empty() { + if tgts == nil || tgts.Empty() { // remove target and arn association if tgts, ok := sys.targetsMap[bucket]; ok { for _, t := range tgts { @@ -225,10 +225,10 @@ func (sys *BucketTargetSys) UpdateTarget(bucket string, cfg *madmin.BucketTarget return } - if len(cfg.Targets) > 0 { - sys.targetsMap[bucket] = cfg.Targets + if len(tgts.Targets) > 0 { + sys.targetsMap[bucket] = tgts.Targets } - for _, tgt := range cfg.Targets { + for _, tgt := range tgts.Targets { tgtClient, err := sys.getRemoteTargetClient(&tgt) if err != nil { continue @@ -238,7 +238,7 @@ func (sys *BucketTargetSys) UpdateTarget(bucket string, cfg *madmin.BucketTarget sys.clientsCache[tgtClient.EndpointURL().String()] = tgtClient } } - sys.targetsMap[bucket] = cfg.Targets + sys.targetsMap[bucket] = tgts.Targets } // create minio-go clients for buckets having remote targets diff --git a/cmd/globals.go b/cmd/globals.go index cedd01ff8..b88add2a4 100644 --- a/cmd/globals.go +++ b/cmd/globals.go @@ -22,6 +22,7 @@ import ( "time" "github.com/minio/minio-go/v7/pkg/set" + "github.com/minio/minio/pkg/bucket/bandwidth" humanize "github.com/dustin/go-humanize" "github.com/minio/minio/cmd/config/cache" @@ -149,6 +150,7 @@ var ( globalEnvTargetList *event.TargetList globalBucketMetadataSys *BucketMetadataSys + globalBucketMonitor *bandwidth.Monitor globalPolicySys *PolicySys globalIAMSys *IAMSys diff --git a/cmd/peer-rest-client.go b/cmd/peer-rest-client.go index 47b8d8269..4d8e73499 100644 --- a/cmd/peer-rest-client.go +++ b/cmd/peer-rest-client.go @@ -26,6 +26,7 @@ import ( "math" "net/url" "strconv" + "strings" "sync" "sync/atomic" "time" @@ -35,6 +36,7 @@ import ( xhttp "github.com/minio/minio/cmd/http" "github.com/minio/minio/cmd/logger" "github.com/minio/minio/cmd/rest" + "github.com/minio/minio/pkg/bandwidth" "github.com/minio/minio/pkg/event" "github.com/minio/minio/pkg/madmin" xnet "github.com/minio/minio/pkg/net" @@ -884,3 +886,20 @@ func newPeerRESTClient(peer *xnet.Host) *peerRESTClient { return &peerRESTClient{host: peer, restClient: restClient} } + +// MonitorBandwidth - send http trace request to peer nodes +func (client *peerRESTClient) MonitorBandwidth(ctx context.Context, buckets []string) (*bandwidth.Report, error) { + values := make(url.Values) + values.Set(peerRESTBuckets, strings.Join(buckets, ",")) + + respBody, err := client.callWithContext(ctx, peerRESTMethodGetBandwidth, values, nil, -1) + if err != nil { + return nil, err + } + defer http.DrainBody(respBody) + + dec := gob.NewDecoder(respBody) + var bandwidthReport bandwidth.Report + err = dec.Decode(&bandwidthReport) + return &bandwidthReport, err +} diff --git a/cmd/peer-rest-common.go b/cmd/peer-rest-common.go index 14c9d288a..ef7578117 100644 --- a/cmd/peer-rest-common.go +++ b/cmd/peer-rest-common.go @@ -57,10 +57,12 @@ const ( peerRESTMethodListen = "/listen" peerRESTMethodLog = "/log" peerRESTMethodGetLocalDiskIDs = "/getlocaldiskids" + peerRESTMethodGetBandwidth = "/bandwidth" ) const ( peerRESTBucket = "bucket" + peerRESTBuckets = "buckets" peerRESTUser = "user" peerRESTGroup = "group" peerRESTUserTemp = "user-temp" diff --git a/cmd/peer-rest-server.go b/cmd/peer-rest-server.go index e4bca01a8..6fe971395 100644 --- a/cmd/peer-rest-server.go +++ b/cmd/peer-rest-server.go @@ -30,6 +30,8 @@ import ( "github.com/gorilla/mux" "github.com/minio/minio/cmd/logger" + "github.com/minio/minio/pkg/bandwidth" + b "github.com/minio/minio/pkg/bucket/bandwidth" "github.com/minio/minio/pkg/event" "github.com/minio/minio/pkg/madmin" trace "github.com/minio/minio/pkg/trace" @@ -628,7 +630,7 @@ func (s *peerRESTServer) LoadBucketMetadataHandler(w http.ResponseWriter, r *htt } if meta.bucketTargetConfig != nil { - globalBucketTargetSys.UpdateTarget(bucketName, meta.bucketTargetConfig) + globalBucketTargetSys.UpdateAllTargets(bucketName, meta.bucketTargetConfig) } } @@ -1047,6 +1049,34 @@ func (s *peerRESTServer) IsValid(w http.ResponseWriter, r *http.Request) bool { return true } +// GetBandwidth gets the bandwidth for the buckets requested. +func (s *peerRESTServer) GetBandwidth(w http.ResponseWriter, r *http.Request) { + if !s.IsValid(w, r) { + s.writeErrorResponse(w, errors.New("Invalid request")) + return + } + bucketsString := r.URL.Query().Get("buckets") + w.WriteHeader(http.StatusOK) + w.(http.Flusher).Flush() + + doneCh := make(chan struct{}) + defer close(doneCh) + + var report *bandwidth.Report + selectBuckets := b.SelectAllBuckets() + if bucketsString != "" { + selectBuckets = b.SelectBuckets(strings.Split(bucketsString, ",")...) + } + report = globalBucketMonitor.GetReport(selectBuckets) + + enc := gob.NewEncoder(w) + if err := enc.Encode(report); err != nil { + s.writeErrorResponse(w, errors.New("Encoding report failed: "+err.Error())) + return + } + w.(http.Flusher).Flush() +} + // registerPeerRESTHandlers - register peer rest router. func registerPeerRESTHandlers(router *mux.Router) { server := &peerRESTServer{} @@ -1085,4 +1115,5 @@ func registerPeerRESTHandlers(router *mux.Router) { subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodBackgroundHealStatus).HandlerFunc(server.BackgroundHealStatusHandler) subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodLog).HandlerFunc(server.ConsoleLogHandler) subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodGetLocalDiskIDs).HandlerFunc(httpTraceHdrs(server.GetLocalDiskIDs)) + subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodGetBandwidth).HandlerFunc(httpTraceHdrs(server.GetBandwidth)) } diff --git a/cmd/server-main.go b/cmd/server-main.go index 797bf96dd..78cff470e 100644 --- a/cmd/server-main.go +++ b/cmd/server-main.go @@ -32,6 +32,7 @@ import ( xhttp "github.com/minio/minio/cmd/http" "github.com/minio/minio/cmd/logger" "github.com/minio/minio/pkg/auth" + "github.com/minio/minio/pkg/bucket/bandwidth" "github.com/minio/minio/pkg/certs" "github.com/minio/minio/pkg/color" "github.com/minio/minio/pkg/env" @@ -158,6 +159,9 @@ func newAllSubsystems() { // Create new bucket metadata system. globalBucketMetadataSys = NewBucketMetadataSys() + // Create the bucket bandwidth monitor + globalBucketMonitor = bandwidth.NewMonitor(GlobalServiceDoneCh) + // Create a new config system. globalConfigSys = NewConfigSys() diff --git a/pkg/bandwidth/bandwidth.go b/pkg/bandwidth/bandwidth.go new file mode 100644 index 000000000..29af5b8c0 --- /dev/null +++ b/pkg/bandwidth/bandwidth.go @@ -0,0 +1,28 @@ +/* + * MinIO Cloud Storage, (C) 2020 MinIO, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package bandwidth + +// Details for the measured bandwidth +type Details struct { + LimitInBytesPerSecond int64 `json:"limitInBits"` + CurrentBandwidthInBytesPerSecond float64 `json:"currentBandwidth"` +} + +// Report captures the details for all buckets. +type Report struct { + BucketStats map[string]Details `json:"bucketStats,omitempty"` +} diff --git a/pkg/bucket/bandwidth/measurement.go b/pkg/bucket/bandwidth/measurement.go new file mode 100644 index 000000000..4ff0e2229 --- /dev/null +++ b/pkg/bucket/bandwidth/measurement.go @@ -0,0 +1,87 @@ +/* + * MinIO Cloud Storage, (C) 2020 MinIO, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package bandwidth + +import ( + "sync" + "sync/atomic" + "time" +) + +const ( + // betaBucket is the weight used to calculate exponential moving average + betaBucket = 0.1 // Number of averages considered = 1/(1-betaObject) +) + +// bucketMeasurement captures the bandwidth details for one bucket +type bucketMeasurement struct { + lock sync.Mutex + bytesSinceLastWindow uint64 // Total bytes since last window was processed + startTime time.Time // Start time for window + expMovingAvg float64 // Previously calculate sliding window +} + +// newBucketMeasurement creates a new instance of the measurement with the initial start time. +func newBucketMeasurement(initTime time.Time) *bucketMeasurement { + return &bucketMeasurement{ + startTime: initTime, + } +} + +// incrementBytes add bytes reported for a bucket. +func (m *bucketMeasurement) incrementBytes(bytes uint64) { + atomic.AddUint64(&m.bytesSinceLastWindow, bytes) +} + +// updateExponentialMovingAverage processes the measurements captured so far. +func (m *bucketMeasurement) updateExponentialMovingAverage(endTime time.Time) { + // Calculate aggregate avg bandwidth and exp window avg + m.lock.Lock() + defer func() { + m.startTime = endTime + m.lock.Unlock() + }() + + if endTime.Before(m.startTime) { + return + } + + duration := endTime.Sub(m.startTime) + + bytesSinceLastWindow := atomic.SwapUint64(&m.bytesSinceLastWindow, 0) + + if m.expMovingAvg == 0 { + // Should address initial calculation and should be fine for resuming from 0 + m.expMovingAvg = float64(bytesSinceLastWindow) / duration.Seconds() + return + } + + increment := float64(bytesSinceLastWindow) / duration.Seconds() + m.expMovingAvg = exponentialMovingAverage(betaBucket, m.expMovingAvg, increment) +} + +// exponentialMovingAverage calculates the exponential moving average +func exponentialMovingAverage(beta, previousAvg, incrementAvg float64) float64 { + return (1-beta)*incrementAvg + beta*previousAvg +} + +// getExpMovingAvgBytesPerSecond returns the exponential moving average for the bucket in bytes +func (m *bucketMeasurement) getExpMovingAvgBytesPerSecond() float64 { + m.lock.Lock() + defer m.lock.Unlock() + return m.expMovingAvg +} diff --git a/pkg/bucket/bandwidth/monitor.go b/pkg/bucket/bandwidth/monitor.go new file mode 100644 index 000000000..c86ee0106 --- /dev/null +++ b/pkg/bucket/bandwidth/monitor.go @@ -0,0 +1,175 @@ +/* + * MinIO Cloud Storage, (C) 2020 MinIO, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package bandwidth + +import ( + "context" + "sync" + "time" + + "github.com/minio/minio/pkg/bandwidth" + "github.com/minio/minio/pkg/pubsub" +) + +// throttleBandwidth gets the throttle for bucket with the configured value +func (m *Monitor) throttleBandwidth(ctx context.Context, bucket string, bandwidthBytesPerSecond int64) *throttle { + m.lock.Lock() + defer m.lock.Unlock() + throttle, ok := m.bucketThrottle[bucket] + if !ok { + throttle = newThrottle(ctx, bandwidthBytesPerSecond) + m.bucketThrottle[bucket] = throttle + return throttle + } + throttle.SetBandwidth(bandwidthBytesPerSecond) + return throttle +} + +// SubscribeToBuckets subscribes to buckets. Empty array for monitoring all buckets. +func (m *Monitor) SubscribeToBuckets(subCh chan interface{}, doneCh <-chan struct{}, buckets []string) { + m.pubsub.Subscribe(subCh, doneCh, func(f interface{}) bool { + if buckets != nil || len(buckets) == 0 { + return true + } + report, ok := f.(*bandwidth.Report) + if !ok { + return false + } + for _, b := range buckets { + _, ok := report.BucketStats[b] + if ok { + return true + } + } + return false + }) +} + +// Monitor implements the monitoring for bandwidth measurements. +type Monitor struct { + lock sync.Mutex // lock for all updates + + activeBuckets map[string]*bucketMeasurement // Buckets with objects in flight + + bucketMovingAvgTicker *time.Ticker // Ticker for calculating moving averages + + pubsub *pubsub.PubSub // PubSub for reporting bandwidths. + + bucketThrottle map[string]*throttle + + startProcessing sync.Once + + doneCh <-chan struct{} +} + +// NewMonitor returns a monitor with defaults. +func NewMonitor(doneCh <-chan struct{}) *Monitor { + m := &Monitor{ + activeBuckets: make(map[string]*bucketMeasurement), + bucketMovingAvgTicker: time.NewTicker(1 * time.Second), + pubsub: pubsub.New(), + bucketThrottle: make(map[string]*throttle), + doneCh: doneCh, + } + return m +} + +// SelectionFunction for buckets +type SelectionFunction func(bucket string) bool + +// SelectAllBuckets will select all buckets +func SelectAllBuckets() SelectionFunction { + return func(bucket string) bool { + return true + } +} + +// SelectBuckets will select all the buckets passed in. +func SelectBuckets(buckets ...string) SelectionFunction { + return func(bucket string) bool { + for _, b := range buckets { + if b != "" && b == bucket { + return true + } + } + return false + } +} + +// GetReport gets the report for all bucket bandwidth details. +func (m *Monitor) GetReport(selectBucket SelectionFunction) *bandwidth.Report { + m.lock.Lock() + defer m.lock.Unlock() + return m.getReport(selectBucket) +} + +func (m *Monitor) getReport(selectBucket SelectionFunction) *bandwidth.Report { + report := &bandwidth.Report{ + BucketStats: make(map[string]bandwidth.Details), + } + for bucket, bucketMeasurement := range m.activeBuckets { + if !selectBucket(bucket) { + continue + } + report.BucketStats[bucket] = bandwidth.Details{ + LimitInBytesPerSecond: m.bucketThrottle[bucket].bytesPerSecond, + CurrentBandwidthInBytesPerSecond: bucketMeasurement.getExpMovingAvgBytesPerSecond(), + } + } + return report +} + +func (m *Monitor) process(doneCh <-chan struct{}) { + for { + select { + case <-m.bucketMovingAvgTicker.C: + m.processAvg() + case <-doneCh: + return + default: + } + } +} + +func (m *Monitor) getBucketMeasurement(bucket string, initTime time.Time) *bucketMeasurement { + bucketTracker, ok := m.activeBuckets[bucket] + if !ok { + bucketTracker = newBucketMeasurement(initTime) + m.activeBuckets[bucket] = bucketTracker + } + return bucketTracker +} + +func (m *Monitor) processAvg() { + m.lock.Lock() + defer m.lock.Unlock() + for _, bucketMeasurement := range m.activeBuckets { + bucketMeasurement.updateExponentialMovingAverage(time.Now()) + } + m.pubsub.Publish(m.getReport(SelectAllBuckets())) +} + +// track returns the measurement object for bucket and object +func (m *Monitor) track(bucket string, object string, timeNow time.Time) *bucketMeasurement { + m.lock.Lock() + defer m.lock.Unlock() + m.startProcessing.Do(func() { + go m.process(m.doneCh) + }) + b := m.getBucketMeasurement(bucket, timeNow) + return b +} diff --git a/pkg/bucket/bandwidth/monitor_test.go b/pkg/bucket/bandwidth/monitor_test.go new file mode 100644 index 000000000..53a08753f --- /dev/null +++ b/pkg/bucket/bandwidth/monitor_test.go @@ -0,0 +1,157 @@ +/* + * MinIO Cloud Storage, (C) 2020 MinIO, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package bandwidth + +import ( + "context" + "reflect" + "testing" + "time" + + "github.com/minio/minio/pkg/bandwidth" +) + +const ( + oneMiB uint64 = 1024 * 1024 +) + +func TestMonitor_GetThrottle(t *testing.T) { + type fields struct { + bucketThrottles map[string]*throttle + bucket string + bpi int64 + } + t1 := newThrottle(context.Background(), 100) + t2 := newThrottle(context.Background(), 200) + tests := []struct { + name string + fields fields + want *throttle + }{ + { + name: "Existing", + fields: fields{ + bucketThrottles: map[string]*throttle{"bucket": t1}, + bucket: "bucket", + bpi: 100, + }, + want: t1, + }, + { + name: "new", + fields: fields{ + bucketThrottles: map[string]*throttle{"bucket": t1}, + bucket: "bucket2", + bpi: 200, + }, + want: t2, + }, + } + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + m := &Monitor{ + bucketThrottle: tt.fields.bucketThrottles, + } + if got := m.throttleBandwidth(context.Background(), tt.fields.bucket, tt.fields.bpi); got.bytesPerInterval != tt.want.bytesPerInterval { + t.Errorf("throttleBandwidth() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestMonitor_GetReport(t *testing.T) { + type fields struct { + activeBuckets map[string]*bucketMeasurement + endTime time.Time + update2 uint64 + endTime2 time.Time + } + start := time.Now() + m0 := newBucketMeasurement(start) + m0.incrementBytes(0) + m1MiBPS := newBucketMeasurement(start) + m1MiBPS.incrementBytes(oneMiB) + tests := []struct { + name string + fields fields + want *bandwidth.Report + want2 *bandwidth.Report + }{ + { + name: "ZeroToOne", + fields: fields{ + activeBuckets: map[string]*bucketMeasurement{ + "bucket": m0, + }, + endTime: start.Add(1 * time.Second), + update2: oneMiB, + endTime2: start.Add(2 * time.Second), + }, + want: &bandwidth.Report{ + BucketStats: map[string]bandwidth.Details{"bucket": {LimitInBytesPerSecond: 1024 * 1024, CurrentBandwidthInBytesPerSecond: 0}}, + }, + want2: &bandwidth.Report{ + BucketStats: map[string]bandwidth.Details{"bucket": {LimitInBytesPerSecond: 1024 * 1024, CurrentBandwidthInBytesPerSecond: (1024 * 1024) / start.Add(2*time.Second).Sub(start.Add(1*time.Second)).Seconds()}}, + }, + }, + { + name: "OneToTwo", + fields: fields{ + activeBuckets: map[string]*bucketMeasurement{ + "bucket": m1MiBPS, + }, + endTime: start.Add(1 * time.Second), + update2: 2 * oneMiB, + endTime2: start.Add(2 * time.Second), + }, + want: &bandwidth.Report{ + BucketStats: map[string]bandwidth.Details{"bucket": {LimitInBytesPerSecond: 1024 * 1024, CurrentBandwidthInBytesPerSecond: float64(oneMiB)}}, + }, + want2: &bandwidth.Report{ + BucketStats: map[string]bandwidth.Details{"bucket": { + LimitInBytesPerSecond: 1024 * 1024, + CurrentBandwidthInBytesPerSecond: exponentialMovingAverage(betaBucket, float64(oneMiB), 2*float64(oneMiB))}}, + }, + }, + } + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + thr := throttle{ + bytesPerSecond: 1024 * 1024, + } + m := &Monitor{ + activeBuckets: tt.fields.activeBuckets, + bucketThrottle: map[string]*throttle{"bucket": &thr}, + } + m.activeBuckets["bucket"].updateExponentialMovingAverage(tt.fields.endTime) + got := m.GetReport(SelectAllBuckets()) + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("GetReport() = %v, want %v", got, tt.want) + } + m.activeBuckets["bucket"].incrementBytes(tt.fields.update2) + m.activeBuckets["bucket"].updateExponentialMovingAverage(tt.fields.endTime2) + got = m.GetReport(SelectAllBuckets()) + if !reflect.DeepEqual(got, tt.want2) { + t.Errorf("GetReport() = %v, want %v", got, tt.want2) + } + }) + } +} diff --git a/pkg/bucket/bandwidth/reader.go b/pkg/bucket/bandwidth/reader.go new file mode 100644 index 000000000..5f32d4ca5 --- /dev/null +++ b/pkg/bucket/bandwidth/reader.go @@ -0,0 +1,86 @@ +/* + * MinIO Cloud Storage, (C) 2020 MinIO, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package bandwidth + +import ( + "context" + "io" + "time" +) + +// MonitoredReader monitors the bandwidth +type MonitoredReader struct { + bucket string // Token to track bucket + bucketMeasurement *bucketMeasurement // bucket measurement object + object string // Token to track object + reader io.Reader // Reader to wrap + lastStop time.Time // Last timestamp for a measurement + headerSize int // Size of the header not captured by reader + throttle *throttle // throttle the rate at which replication occur + monitor *Monitor // Monitor reference + closed bool // Reader is closed +} + +// NewMonitoredReader returns a io.ReadCloser that reports bandwidth details +func NewMonitoredReader(ctx context.Context, monitor *Monitor, bucket string, object string, reader io.Reader, headerSize int, bandwidthBytesPerSecond int64) *MonitoredReader { + timeNow := time.Now() + b := monitor.track(bucket, object, timeNow) + return &MonitoredReader{ + bucket: bucket, + object: object, + bucketMeasurement: b, + reader: reader, + lastStop: timeNow, + headerSize: headerSize, + throttle: monitor.throttleBandwidth(ctx, bucket, bandwidthBytesPerSecond), + monitor: monitor, + } +} + +// Read wraps the read reader +func (m *MonitoredReader) Read(p []byte) (n int, err error) { + if m.closed { + err = io.ErrClosedPipe + return + } + p = p[:m.throttle.GetLimitForBytes(int64(len(p)))] + + n, err = m.reader.Read(p) + stop := time.Now() + update := uint64(n + m.headerSize) + + m.bucketMeasurement.incrementBytes(update) + m.lastStop = stop + unused := len(p) - (n + m.headerSize) + m.headerSize = 0 // Set to 0 post first read + + if unused > 0 { + m.throttle.ReleaseUnusedBandwidth(int64(unused)) + } + return +} + +// Close stops tracking the io +func (m *MonitoredReader) Close() error { + rc, ok := m.reader.(io.ReadCloser) + m.closed = true + if ok { + return rc.Close() + } + return nil +} diff --git a/pkg/bucket/bandwidth/throttle.go b/pkg/bucket/bandwidth/throttle.go new file mode 100644 index 000000000..7ba9cc3d7 --- /dev/null +++ b/pkg/bucket/bandwidth/throttle.go @@ -0,0 +1,107 @@ +/* + * MinIO Cloud Storage, (C) 2020 MinIO, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package bandwidth + +import ( + "context" + "sync" + "sync/atomic" + "time" +) + +const ( + throttleInternal = 250 * time.Millisecond +) + +// throttle implements the throttling for bandwidth +type throttle struct { + generateTicker *time.Ticker // Ticker to generate available bandwidth + freeBytes int64 // unused bytes in the interval + bytesPerSecond int64 // max limit for bandwidth + bytesPerInterval int64 // bytes allocated for the interval + cond *sync.Cond // Used to notify waiting threads for bandwidth availability +} + +// newThrottle returns a new bandwidth throttle. Set bytesPerSecond to 0 for no limit +func newThrottle(ctx context.Context, bytesPerSecond int64) *throttle { + if bytesPerSecond == 0 { + return &throttle{} + } + t := &throttle{ + bytesPerSecond: bytesPerSecond, + generateTicker: time.NewTicker(throttleInternal), + } + + t.cond = sync.NewCond(&sync.Mutex{}) + t.SetBandwidth(bytesPerSecond) + t.freeBytes = t.bytesPerInterval + go t.generateBandwidth(ctx) + return t +} + +// GetLimitForBytes gets the bytes that are possible to send within the limit +// if want is <= 0 or no bandwidth limit set, returns want. +// Otherwise a value > 0 will always be returned. +func (t *throttle) GetLimitForBytes(want int64) int64 { + if want <= 0 || atomic.LoadInt64(&t.bytesPerInterval) == 0 { + return want + } + t.cond.L.Lock() + defer t.cond.L.Unlock() + for { + var send int64 + freeBytes := atomic.LoadInt64(&t.freeBytes) + send = want + if freeBytes < want { + send = freeBytes + if send <= 0 { + t.cond.Wait() + continue + } + } + atomic.AddInt64(&t.freeBytes, -send) + return send + } +} + +// SetBandwidth sets a new bandwidth limit in bytes per second. +func (t *throttle) SetBandwidth(bandwidthBiPS int64) { + bpi := int64(throttleInternal) * bandwidthBiPS / int64(time.Second) + atomic.StoreInt64(&t.bytesPerInterval, bpi) +} + +// ReleaseUnusedBandwidth releases bandwidth that was allocated for a user +func (t *throttle) ReleaseUnusedBandwidth(bytes int64) { + atomic.AddInt64(&t.freeBytes, bytes) +} + +// generateBandwidth periodically allocates new bandwidth to use +func (t *throttle) generateBandwidth(ctx context.Context) { + for { + select { + case <-t.generateTicker.C: + // A new window is available + t.cond.L.Lock() + atomic.StoreInt64(&t.freeBytes, atomic.LoadInt64(&t.bytesPerInterval)) + t.cond.Broadcast() + t.cond.L.Unlock() + case <-ctx.Done(): + return + default: + } + } +} diff --git a/pkg/iam/policy/admin-action.go b/pkg/iam/policy/admin-action.go index 23ff5503b..59dceec24 100644 --- a/pkg/iam/policy/admin-action.go +++ b/pkg/iam/policy/admin-action.go @@ -49,6 +49,8 @@ const ( ServerInfoAdminAction = "admin:ServerInfo" // OBDInfoAdminAction - allow obtaining cluster on-board diagnostics OBDInfoAdminAction = "admin:OBDInfo" + // BandwidthMonitorAction - allow monitoring bandwidth usage + BandwidthMonitorAction = "admin:BandwidthMonitor" // ServerUpdateAdminAction - allow MinIO binary update ServerUpdateAdminAction = "admin:ServerUpdate" @@ -131,6 +133,7 @@ var supportedAdminActions = map[AdminAction]struct{}{ KMSKeyStatusAdminAction: {}, ServerInfoAdminAction: {}, OBDInfoAdminAction: {}, + BandwidthMonitorAction: {}, ServerUpdateAdminAction: {}, ServiceRestartAdminAction: {}, ServiceStopAdminAction: {}, @@ -173,6 +176,7 @@ var adminActionConditionKeyMap = map[Action]condition.KeySet{ ServerInfoAdminAction: condition.NewKeySet(condition.AllSupportedAdminKeys...), DataUsageInfoAdminAction: condition.NewKeySet(condition.AllSupportedAdminKeys...), OBDInfoAdminAction: condition.NewKeySet(condition.AllSupportedAdminKeys...), + BandwidthMonitorAction: condition.NewKeySet(condition.AllSupportedAdminKeys...), TopLocksAdminAction: condition.NewKeySet(condition.AllSupportedAdminKeys...), ProfilingAdminAction: condition.NewKeySet(condition.AllSupportedAdminKeys...), TraceAdminAction: condition.NewKeySet(condition.AllSupportedAdminKeys...), diff --git a/pkg/iam/policy/constants.go b/pkg/iam/policy/constants.go index 846b6622b..c6c075708 100644 --- a/pkg/iam/policy/constants.go +++ b/pkg/iam/policy/constants.go @@ -75,7 +75,7 @@ var AdminDiagnostics = Policy{ Actions: NewActionSet(ProfilingAdminAction, TraceAdminAction, ConsoleLogAdminAction, ServerInfoAdminAction, TopLocksAdminAction, - OBDInfoAdminAction), + OBDInfoAdminAction, BandwidthMonitorAction), Resources: NewResourceSet(NewResource("*", "")), }, }, diff --git a/pkg/madmin/bandwidth.go b/pkg/madmin/bandwidth.go new file mode 100644 index 000000000..4452b1824 --- /dev/null +++ b/pkg/madmin/bandwidth.go @@ -0,0 +1,61 @@ +/* + * MinIO Cloud Storage, (C) 2020 MinIO, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package madmin + +import ( + "context" + "encoding/json" + "io" + "net/http" + "net/url" + "strings" + + "github.com/minio/minio/pkg/bandwidth" +) + +// GetBucketBandwidth - Get a snapshot of the bandwidth measurements for replication buckets. If no buckets +// generate replication traffic an empty map is returned. +func (adm *AdminClient) GetBucketBandwidth(ctx context.Context, buckets ...string) (bandwidth.Report, error) { + queryValues := url.Values{} + if len(buckets) > 0 { + queryValues.Set("buckets", strings.Join(buckets, ",")) + } + + reqData := requestData{ + relPath: adminAPIPrefix + "/bandwidth", + queryValues: queryValues, + } + + resp, err := adm.executeMethod(ctx, http.MethodGet, reqData) + if err != nil { + closeResponse(resp) + return bandwidth.Report{}, err + } + if resp.StatusCode != http.StatusOK { + return bandwidth.Report{}, httpRespToErrorResponse(resp) + } + dec := json.NewDecoder(resp.Body) + for { + var report bandwidth.Report + err = dec.Decode(&report) + if err != nil && err != io.EOF { + return bandwidth.Report{}, err + } + return report, nil + } +} diff --git a/pkg/madmin/examples/bucket-bandwidth.go b/pkg/madmin/examples/bucket-bandwidth.go new file mode 100644 index 000000000..a2e50a5f0 --- /dev/null +++ b/pkg/madmin/examples/bucket-bandwidth.go @@ -0,0 +1,50 @@ +/* + * MinIO Cloud Storage, (C) 2020 MinIO, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package main + +import ( + "context" + "fmt" + "log" + + "github.com/minio/minio/pkg/madmin" +) + +func main() { + // Note: YOUR-ACCESSKEYID, YOUR-SECRETACCESSKEY and my-bucketname are + // dummy values, please replace them with original values. + + // API requests are secure (HTTPS) if secure=true and insecure (HTTP) otherwise. + // New returns an MinIO Admin client object. + madminClient, err := madmin.New("your-minio.example.com:9000", "YOUR-ACCESSKEYID", "YOUR-SECRETACCESSKEY", true) + if err != nil { + log.Fatalln(err) + } + ctx := context.Background() + report, err := madminClient.GetBucketBandwidth(ctx) + if err != nil { + log.Fatalln(err) + return + } + fmt.Printf("Report: %+v\n", report) + report, err = madminClient.GetBucketBandwidth(ctx, "sourceBucket", "sourceBucket2") + if err != nil { + log.Fatalln(err) + return + } + fmt.Printf("Report: %+v\n", report) +} diff --git a/pkg/madmin/examples/bucket-target.go b/pkg/madmin/examples/bucket-target.go index 234706b91..eac3602be 100644 --- a/pkg/madmin/examples/bucket-target.go +++ b/pkg/madmin/examples/bucket-target.go @@ -42,7 +42,7 @@ func main() { if err != nil { log.Fatalln(err) } - target := madmin.BucketTarget{Endpoint: "site2:9000", Credentials: creds, TargetBucket: "destbucket", IsSSL: false, Type: madmin.ReplicationArn} + target := madmin.BucketTarget{Endpoint: "site2:9000", Credentials: creds, TargetBucket: "destbucket", IsSSL: false, Type: madmin.ReplicationArn, BandwidthLimit: 2 * 1024 * 1024} // Set bucket target if err := madmClnt.SetBucketTarget(ctx, "srcbucket", &target); err != nil { log.Fatalln(err) diff --git a/pkg/madmin/remote-target-commands.go b/pkg/madmin/remote-target-commands.go index 1f514df7f..e2c7abcc9 100644 --- a/pkg/madmin/remote-target-commands.go +++ b/pkg/madmin/remote-target-commands.go @@ -84,17 +84,18 @@ func ParseARN(s string) (*ARN, error) { // BucketTarget represents the target bucket and site association. type BucketTarget struct { - SourceBucket string `json:"sourcebucket"` - Endpoint string `json:"endpoint"` - Credentials *auth.Credentials `json:"credentials"` - TargetBucket string `json:"targetbucket"` - Secure bool `json:"secure"` - Path string `json:"path,omitempty"` - API string `json:"api,omitempty"` - Arn string `json:"arn,omitempty"` - Type ServiceType `json:"type"` - Region string `json:"omitempty"` - Label string `json:"label,omitempty"` + SourceBucket string `json:"sourcebucket"` + Endpoint string `json:"endpoint"` + Credentials *auth.Credentials `json:"credentials"` + TargetBucket string `json:"targetbucket"` + Secure bool `json:"secure"` + Path string `json:"path,omitempty"` + API string `json:"api,omitempty"` + Arn string `json:"arn,omitempty"` + Type ServiceType `json:"type"` + Region string `json:"omitempty"` + Label string `json:"label,omitempty"` + BandwidthLimit int64 `json:"bandwidthlimit,omitempty"` } // Clone returns shallow clone of BucketTarget without secret key in credentials