diff --git a/cmd/bucket-replication.go b/cmd/bucket-replication.go index f53bfa252..653595527 100644 --- a/cmd/bucket-replication.go +++ b/cmd/bucket-replication.go @@ -241,7 +241,7 @@ func replicateObject(ctx context.Context, objInfo ObjectInfo, objectAPI ObjectLa for k, v := range putOpts.Header() { headerSize += len(k) + len(v) } - r := bandwidth.NewMonitoredReader(ctx, globalBucketMonitor, objInfo.Bucket, objInfo.Name, gr, headerSize, b) + r := bandwidth.NewMonitoredReader(ctx, globalBucketMonitor, objInfo.Bucket, objInfo.Name, gr, headerSize, b, target.BandwidthLimit) _, err = tgt.PutObject(ctx, dest.Bucket, object, r, size, "", "", putOpts) r.Close() diff --git a/cmd/notification.go b/cmd/notification.go index 631627c26..4bbd966db 100644 --- a/cmd/notification.go +++ b/cmd/notification.go @@ -1290,14 +1290,14 @@ func sendEvent(args eventArgs) { func (sys *NotificationSys) GetBandwidthReports(ctx context.Context, buckets ...string) bandwidth.Report { reports := make([]*bandwidth.Report, len(sys.peerClients)) g := errgroup.WithNErrs(len(sys.peerClients)) - for index, peer := range sys.peerClients { - if peer == nil { + for index := range sys.peerClients { + if sys.peerClients[index] == nil { continue } index := index g.Go(func() error { var err error - reports[index], err = peer.MonitorBandwidth(ctx, buckets) + reports[index], err = sys.peerClients[index].MonitorBandwidth(ctx, buckets) return err }, index) } @@ -1323,6 +1323,9 @@ func (sys *NotificationSys) GetBandwidthReports(ctx context.Context, buckets ... d = consolidatedReport.BucketStats[bucket] d.LimitInBytesPerSecond = report.BucketStats[bucket].LimitInBytesPerSecond } + if d.LimitInBytesPerSecond < report.BucketStats[bucket].LimitInBytesPerSecond { + d.LimitInBytesPerSecond = report.BucketStats[bucket].LimitInBytesPerSecond + } d.CurrentBandwidthInBytesPerSecond += report.BucketStats[bucket].CurrentBandwidthInBytesPerSecond consolidatedReport.BucketStats[bucket] = d } diff --git a/pkg/bucket/bandwidth/monitor.go b/pkg/bucket/bandwidth/monitor.go index 027542ae8..2ec5d5376 100644 --- a/pkg/bucket/bandwidth/monitor.go +++ b/pkg/bucket/bandwidth/monitor.go @@ -26,16 +26,16 @@ import ( ) // throttleBandwidth gets the throttle for bucket with the configured value -func (m *Monitor) throttleBandwidth(ctx context.Context, bucket string, bandwidthBytesPerSecond int64) *throttle { +func (m *Monitor) throttleBandwidth(ctx context.Context, bucket string, bandwidthBytesPerSecond int64, clusterBandwidth int64) *throttle { m.lock.Lock() defer m.lock.Unlock() throttle, ok := m.bucketThrottle[bucket] if !ok { - throttle = newThrottle(ctx, bandwidthBytesPerSecond) + throttle = newThrottle(ctx, bandwidthBytesPerSecond, clusterBandwidth) m.bucketThrottle[bucket] = throttle return throttle } - throttle.SetBandwidth(bandwidthBytesPerSecond) + throttle.SetBandwidth(bandwidthBytesPerSecond, clusterBandwidth) return throttle } @@ -80,7 +80,7 @@ type Monitor struct { func NewMonitor(doneCh <-chan struct{}) *Monitor { m := &Monitor{ activeBuckets: make(map[string]*bucketMeasurement), - bucketMovingAvgTicker: time.NewTicker(1 * time.Second), + bucketMovingAvgTicker: time.NewTicker(2 * time.Second), pubsub: pubsub.New(), bucketThrottle: make(map[string]*throttle), doneCh: doneCh, @@ -124,7 +124,7 @@ func (m *Monitor) getReport(selectBucket SelectionFunction) *bandwidth.Report { continue } report.BucketStats[bucket] = bandwidth.Details{ - LimitInBytesPerSecond: m.bucketThrottle[bucket].bytesPerSecond, + LimitInBytesPerSecond: m.bucketThrottle[bucket].clusterBandwidth, CurrentBandwidthInBytesPerSecond: bucketMeasurement.getExpMovingAvgBytesPerSecond(), } } @@ -138,7 +138,6 @@ func (m *Monitor) process(doneCh <-chan struct{}) { m.processAvg() case <-doneCh: return - default: } } } diff --git a/pkg/bucket/bandwidth/monitor_test.go b/pkg/bucket/bandwidth/monitor_test.go index c8c2116c0..02ed7162c 100644 --- a/pkg/bucket/bandwidth/monitor_test.go +++ b/pkg/bucket/bandwidth/monitor_test.go @@ -35,8 +35,8 @@ func TestMonitor_GetThrottle(t *testing.T) { bucket string bpi int64 } - t1 := newThrottle(context.Background(), 100) - t2 := newThrottle(context.Background(), 200) + t1 := newThrottle(context.Background(), 100, 1024*1024) + t2 := newThrottle(context.Background(), 200, 1024*1024) tests := []struct { name string fields fields @@ -68,7 +68,7 @@ func TestMonitor_GetThrottle(t *testing.T) { m := &Monitor{ bucketThrottle: tt.fields.bucketThrottles, } - if got := m.throttleBandwidth(context.Background(), tt.fields.bucket, tt.fields.bpi); got.bytesPerInterval != tt.want.bytesPerInterval { + if got := m.throttleBandwidth(context.Background(), tt.fields.bucket, tt.fields.bpi, 1024*1024); got.bytesPerInterval != tt.want.bytesPerInterval { t.Errorf("throttleBandwidth() = %v, want %v", got, tt.want) } }) @@ -135,7 +135,8 @@ func TestMonitor_GetReport(t *testing.T) { t.Run(tt.name, func(t *testing.T) { t.Parallel() thr := throttle{ - bytesPerSecond: 1024 * 1024, + bytesPerSecond: 1024 * 1024, + clusterBandwidth: 1024 * 1024, } m := &Monitor{ activeBuckets: tt.fields.activeBuckets, diff --git a/pkg/bucket/bandwidth/reader.go b/pkg/bucket/bandwidth/reader.go index 5f32d4ca5..9c85f0406 100644 --- a/pkg/bucket/bandwidth/reader.go +++ b/pkg/bucket/bandwidth/reader.go @@ -37,7 +37,7 @@ type MonitoredReader struct { } // NewMonitoredReader returns a io.ReadCloser that reports bandwidth details -func NewMonitoredReader(ctx context.Context, monitor *Monitor, bucket string, object string, reader io.Reader, headerSize int, bandwidthBytesPerSecond int64) *MonitoredReader { +func NewMonitoredReader(ctx context.Context, monitor *Monitor, bucket string, object string, reader io.Reader, headerSize int, bandwidthBytesPerSecond int64, clusterBandwidth int64) *MonitoredReader { timeNow := time.Now() b := monitor.track(bucket, object, timeNow) return &MonitoredReader{ @@ -47,7 +47,7 @@ func NewMonitoredReader(ctx context.Context, monitor *Monitor, bucket string, ob reader: reader, lastStop: timeNow, headerSize: headerSize, - throttle: monitor.throttleBandwidth(ctx, bucket, bandwidthBytesPerSecond), + throttle: monitor.throttleBandwidth(ctx, bucket, bandwidthBytesPerSecond, clusterBandwidth), monitor: monitor, } } diff --git a/pkg/bucket/bandwidth/throttle.go b/pkg/bucket/bandwidth/throttle.go index 7ba9cc3d7..7fc00fd0a 100644 --- a/pkg/bucket/bandwidth/throttle.go +++ b/pkg/bucket/bandwidth/throttle.go @@ -29,27 +29,31 @@ const ( // throttle implements the throttling for bandwidth type throttle struct { - generateTicker *time.Ticker // Ticker to generate available bandwidth - freeBytes int64 // unused bytes in the interval - bytesPerSecond int64 // max limit for bandwidth - bytesPerInterval int64 // bytes allocated for the interval - cond *sync.Cond // Used to notify waiting threads for bandwidth availability + generateTicker *time.Ticker // Ticker to generate available bandwidth + freeBytes int64 // unused bytes in the interval + bytesPerSecond int64 // max limit for bandwidth + bytesPerInterval int64 // bytes allocated for the interval + clusterBandwidth int64 // Cluster wide bandwidth needed for reporting + cond *sync.Cond // Used to notify waiting threads for bandwidth availability + goGenerate int64 // Flag to track if generate routine should be running. 0 == stopped + ctx context.Context // Context for generate } // newThrottle returns a new bandwidth throttle. Set bytesPerSecond to 0 for no limit -func newThrottle(ctx context.Context, bytesPerSecond int64) *throttle { +func newThrottle(ctx context.Context, bytesPerSecond int64, clusterBandwidth int64) *throttle { if bytesPerSecond == 0 { return &throttle{} } t := &throttle{ - bytesPerSecond: bytesPerSecond, - generateTicker: time.NewTicker(throttleInternal), + bytesPerSecond: bytesPerSecond, + generateTicker: time.NewTicker(throttleInternal), + clusterBandwidth: clusterBandwidth, + ctx: ctx, } t.cond = sync.NewCond(&sync.Mutex{}) - t.SetBandwidth(bytesPerSecond) + t.SetBandwidth(bytesPerSecond, clusterBandwidth) t.freeBytes = t.bytesPerInterval - go t.generateBandwidth(ctx) return t } @@ -74,12 +78,17 @@ func (t *throttle) GetLimitForBytes(want int64) int64 { } } atomic.AddInt64(&t.freeBytes, -send) + + // Bandwidth was consumed, start generate routine to allocate bandwidth + if atomic.CompareAndSwapInt64(&t.goGenerate, 0, 1) { + go t.generateBandwidth(t.ctx) + } return send } } // SetBandwidth sets a new bandwidth limit in bytes per second. -func (t *throttle) SetBandwidth(bandwidthBiPS int64) { +func (t *throttle) SetBandwidth(bandwidthBiPS int64, clusterBandwidth int64) { bpi := int64(throttleInternal) * bandwidthBiPS / int64(time.Second) atomic.StoreInt64(&t.bytesPerInterval, bpi) } @@ -94,6 +103,11 @@ func (t *throttle) generateBandwidth(ctx context.Context) { for { select { case <-t.generateTicker.C: + if atomic.LoadInt64(&t.freeBytes) == atomic.LoadInt64(&t.bytesPerInterval) { + // No bandwidth consumption stop the routine. + atomic.StoreInt64(&t.goGenerate, 0) + return + } // A new window is available t.cond.L.Lock() atomic.StoreInt64(&t.freeBytes, atomic.LoadInt64(&t.bytesPerInterval)) @@ -101,7 +115,6 @@ func (t *throttle) generateBandwidth(ctx context.Context) { t.cond.L.Unlock() case <-ctx.Done(): return - default: } } }