fix: misc fixes for bandwidth reporting and monitoring (#10683)

* Set the peer correctly when fetching bandwidth reports.
* Fix the bandwidth limit that is reported.
* Reduce CPU burn from bandwidth management.
Branch: master
Author: Ritesh H Shukla, committed 4 years ago via GitHub
Parent: ad726b49b4
Commit: 8a16a1a1a9
Changed files (lines changed):
  1. cmd/bucket-replication.go (2)
  2. cmd/notification.go (9)
  3. pkg/bucket/bandwidth/monitor.go (11)
  4. pkg/bucket/bandwidth/monitor_test.go (9)
  5. pkg/bucket/bandwidth/reader.go (4)
  6. pkg/bucket/bandwidth/throttle.go (37)

@@ -241,7 +241,7 @@ func replicateObject(ctx context.Context, objInfo ObjectInfo, objectAPI ObjectLa
 	for k, v := range putOpts.Header() {
 		headerSize += len(k) + len(v)
 	}
-	r := bandwidth.NewMonitoredReader(ctx, globalBucketMonitor, objInfo.Bucket, objInfo.Name, gr, headerSize, b)
+	r := bandwidth.NewMonitoredReader(ctx, globalBucketMonitor, objInfo.Bucket, objInfo.Name, gr, headerSize, b, target.BandwidthLimit)
 	_, err = tgt.PutObject(ctx, dest.Bucket, object, r, size, "", "", putOpts)
 	r.Close()

@@ -1290,14 +1290,14 @@ func sendEvent(args eventArgs) {
 func (sys *NotificationSys) GetBandwidthReports(ctx context.Context, buckets ...string) bandwidth.Report {
 	reports := make([]*bandwidth.Report, len(sys.peerClients))
 	g := errgroup.WithNErrs(len(sys.peerClients))
-	for index, peer := range sys.peerClients {
-		if peer == nil {
+	for index := range sys.peerClients {
+		if sys.peerClients[index] == nil {
 			continue
 		}
 		index := index
 		g.Go(func() error {
 			var err error
-			reports[index], err = peer.MonitorBandwidth(ctx, buckets)
+			reports[index], err = sys.peerClients[index].MonitorBandwidth(ctx, buckets)
 			return err
 		}, index)
 	}
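
The loop rewrite above avoids capturing the range variable `peer` inside the closure passed to `g.Go`; the goroutine now indexes `sys.peerClients[index]` directly. A minimal, self-contained sketch of the same capture pattern (hypothetical names, not MinIO code):

```go
package main

import (
	"fmt"
	"sync"
)

// Sketch only: index into the slice inside the goroutine instead of
// capturing the range element variable.
func main() {
	peers := []string{"peer-0", "peer-1", "peer-2"}
	reports := make([]string, len(peers))

	var wg sync.WaitGroup
	for index := range peers {
		index := index // pin the index for this iteration (pre-Go 1.22 loop semantics)
		wg.Add(1)
		go func() {
			defer wg.Done()
			// peers[index] always refers to the element this iteration intended,
			// mirroring sys.peerClients[index] in the diff above.
			reports[index] = "report from " + peers[index]
		}()
	}
	wg.Wait()
	fmt.Println(reports)
}
```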
@@ -1323,6 +1323,9 @@ func (sys *NotificationSys) GetBandwidthReports(ctx context.Context, buckets ...
 			d = consolidatedReport.BucketStats[bucket]
 			d.LimitInBytesPerSecond = report.BucketStats[bucket].LimitInBytesPerSecond
 		}
+		if d.LimitInBytesPerSecond < report.BucketStats[bucket].LimitInBytesPerSecond {
+			d.LimitInBytesPerSecond = report.BucketStats[bucket].LimitInBytesPerSecond
+		}
 		d.CurrentBandwidthInBytesPerSecond += report.BucketStats[bucket].CurrentBandwidthInBytesPerSecond
 		consolidatedReport.BucketStats[bucket] = d
 	}

@@ -26,16 +26,16 @@ import (
 )
 // throttleBandwidth gets the throttle for bucket with the configured value
-func (m *Monitor) throttleBandwidth(ctx context.Context, bucket string, bandwidthBytesPerSecond int64) *throttle {
+func (m *Monitor) throttleBandwidth(ctx context.Context, bucket string, bandwidthBytesPerSecond int64, clusterBandwidth int64) *throttle {
 	m.lock.Lock()
 	defer m.lock.Unlock()
 	throttle, ok := m.bucketThrottle[bucket]
 	if !ok {
-		throttle = newThrottle(ctx, bandwidthBytesPerSecond)
+		throttle = newThrottle(ctx, bandwidthBytesPerSecond, clusterBandwidth)
 		m.bucketThrottle[bucket] = throttle
 		return throttle
 	}
-	throttle.SetBandwidth(bandwidthBytesPerSecond)
+	throttle.SetBandwidth(bandwidthBytesPerSecond, clusterBandwidth)
 	return throttle
 }
@@ -80,7 +80,7 @@ type Monitor struct {
 func NewMonitor(doneCh <-chan struct{}) *Monitor {
 	m := &Monitor{
 		activeBuckets:         make(map[string]*bucketMeasurement),
-		bucketMovingAvgTicker: time.NewTicker(1 * time.Second),
+		bucketMovingAvgTicker: time.NewTicker(2 * time.Second),
 		pubsub:                pubsub.New(),
 		bucketThrottle:        make(map[string]*throttle),
 		doneCh:                doneCh,
@@ -124,7 +124,7 @@ func (m *Monitor) getReport(selectBucket SelectionFunction) *bandwidth.Report {
 			continue
 		}
 		report.BucketStats[bucket] = bandwidth.Details{
-			LimitInBytesPerSecond:            m.bucketThrottle[bucket].bytesPerSecond,
+			LimitInBytesPerSecond:            m.bucketThrottle[bucket].clusterBandwidth,
 			CurrentBandwidthInBytesPerSecond: bucketMeasurement.getExpMovingAvgBytesPerSecond(),
 		}
 	}
@@ -138,7 +138,6 @@ func (m *Monitor) process(doneCh <-chan struct{}) {
 			m.processAvg()
 		case <-doneCh:
 			return
-		default:
 		}
 	}
 }
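
Removing the `default:` branch is part of the CPU-burn fix: with a `default`, the `select` never blocks, so the loop spins between ticks. A standalone, illustrative sketch of the blocking form is below; adding a `default:` case to this `select` would turn the sleep into a busy loop:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	ticker := time.NewTicker(500 * time.Millisecond)
	defer ticker.Stop()
	done := make(chan struct{})
	time.AfterFunc(2*time.Second, func() { close(done) })

	for {
		// No default clause: the select blocks until the ticker fires or the
		// done channel closes, so the loop does not spin between events.
		select {
		case t := <-ticker.C:
			fmt.Println("tick at", t.Format(time.StampMilli))
		case <-done:
			fmt.Println("done")
			return
		}
	}
}
```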

@@ -35,8 +35,8 @@ func TestMonitor_GetThrottle(t *testing.T) {
 		bucket string
 		bpi    int64
 	}
-	t1 := newThrottle(context.Background(), 100)
-	t2 := newThrottle(context.Background(), 200)
+	t1 := newThrottle(context.Background(), 100, 1024*1024)
+	t2 := newThrottle(context.Background(), 200, 1024*1024)
 	tests := []struct {
 		name   string
 		fields fields
@@ -68,7 +68,7 @@ func TestMonitor_GetThrottle(t *testing.T) {
 			m := &Monitor{
 				bucketThrottle: tt.fields.bucketThrottles,
 			}
-			if got := m.throttleBandwidth(context.Background(), tt.fields.bucket, tt.fields.bpi); got.bytesPerInterval != tt.want.bytesPerInterval {
+			if got := m.throttleBandwidth(context.Background(), tt.fields.bucket, tt.fields.bpi, 1024*1024); got.bytesPerInterval != tt.want.bytesPerInterval {
 				t.Errorf("throttleBandwidth() = %v, want %v", got, tt.want)
 			}
 		})
@@ -135,7 +135,8 @@ func TestMonitor_GetReport(t *testing.T) {
 		t.Run(tt.name, func(t *testing.T) {
 			t.Parallel()
 			thr := throttle{
-				bytesPerSecond: 1024 * 1024,
+				bytesPerSecond:   1024 * 1024,
+				clusterBandwidth: 1024 * 1024,
 			}
 			m := &Monitor{
 				activeBuckets: tt.fields.activeBuckets,

@@ -37,7 +37,7 @@ type MonitoredReader struct {
 }
 // NewMonitoredReader returns a io.ReadCloser that reports bandwidth details
-func NewMonitoredReader(ctx context.Context, monitor *Monitor, bucket string, object string, reader io.Reader, headerSize int, bandwidthBytesPerSecond int64) *MonitoredReader {
+func NewMonitoredReader(ctx context.Context, monitor *Monitor, bucket string, object string, reader io.Reader, headerSize int, bandwidthBytesPerSecond int64, clusterBandwidth int64) *MonitoredReader {
 	timeNow := time.Now()
 	b := monitor.track(bucket, object, timeNow)
 	return &MonitoredReader{
@@ -47,7 +47,7 @@ func NewMonitoredReader(ctx context.Context, monitor *Monitor, bucket string, ob
 		reader:     reader,
 		lastStop:   timeNow,
 		headerSize: headerSize,
-		throttle:   monitor.throttleBandwidth(ctx, bucket, bandwidthBytesPerSecond),
+		throttle:   monitor.throttleBandwidth(ctx, bucket, bandwidthBytesPerSecond, clusterBandwidth),
 		monitor:    monitor,
 	}
 }
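
For context, a `MonitoredReader`-style wrapper caps how many bytes each `Read` may return against a bandwidth budget. The sketch below shows that general pattern with a simple one-second window; it is an assumption-laden illustration, not the `pkg/bucket/bandwidth` implementation, which uses the ticker-based throttle changed below:

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"time"
)

// throttledReader is a rough sketch of a bandwidth-capped io.Reader.
type throttledReader struct {
	r              io.Reader
	bytesPerSecond int64
	windowStart    time.Time
	usedInWindow   int64
}

func (t *throttledReader) Read(p []byte) (int, error) {
	now := time.Now()
	if now.Sub(t.windowStart) >= time.Second {
		t.windowStart, t.usedInWindow = now, 0
	}
	remaining := t.bytesPerSecond - t.usedInWindow
	if remaining <= 0 {
		// Budget exhausted: sleep until the current one-second window ends.
		time.Sleep(t.windowStart.Add(time.Second).Sub(now))
		t.windowStart, t.usedInWindow, remaining = time.Now(), 0, t.bytesPerSecond
	}
	if int64(len(p)) > remaining {
		p = p[:remaining]
	}
	n, err := t.r.Read(p)
	t.usedInWindow += int64(n)
	return n, err
}

func main() {
	src := bytes.NewReader(make([]byte, 64<<10))
	tr := &throttledReader{r: src, bytesPerSecond: 16 << 10, windowStart: time.Now()}
	start := time.Now()
	n, _ := io.Copy(io.Discard, tr)
	fmt.Printf("copied %d bytes in %v\n", n, time.Since(start).Round(time.Millisecond))
}
```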

@@ -29,27 +29,31 @@ const (
 // throttle implements the throttling for bandwidth
 type throttle struct {
-	generateTicker   *time.Ticker // Ticker to generate available bandwidth
-	freeBytes        int64        // unused bytes in the interval
-	bytesPerSecond   int64        // max limit for bandwidth
-	bytesPerInterval int64        // bytes allocated for the interval
-	cond             *sync.Cond   // Used to notify waiting threads for bandwidth availability
+	generateTicker   *time.Ticker    // Ticker to generate available bandwidth
+	freeBytes        int64           // unused bytes in the interval
+	bytesPerSecond   int64           // max limit for bandwidth
+	bytesPerInterval int64           // bytes allocated for the interval
+	clusterBandwidth int64           // Cluster wide bandwidth needed for reporting
+	cond             *sync.Cond      // Used to notify waiting threads for bandwidth availability
+	goGenerate       int64           // Flag to track if generate routine should be running. 0 == stopped
+	ctx              context.Context // Context for generate
 }
 // newThrottle returns a new bandwidth throttle. Set bytesPerSecond to 0 for no limit
-func newThrottle(ctx context.Context, bytesPerSecond int64) *throttle {
+func newThrottle(ctx context.Context, bytesPerSecond int64, clusterBandwidth int64) *throttle {
 	if bytesPerSecond == 0 {
 		return &throttle{}
 	}
 	t := &throttle{
-		bytesPerSecond: bytesPerSecond,
-		generateTicker: time.NewTicker(throttleInternal),
+		bytesPerSecond:   bytesPerSecond,
+		generateTicker:   time.NewTicker(throttleInternal),
+		clusterBandwidth: clusterBandwidth,
+		ctx:              ctx,
 	}
 	t.cond = sync.NewCond(&sync.Mutex{})
-	t.SetBandwidth(bytesPerSecond)
+	t.SetBandwidth(bytesPerSecond, clusterBandwidth)
 	t.freeBytes = t.bytesPerInterval
-	go t.generateBandwidth(ctx)
 	return t
 }
@@ -74,12 +78,17 @@ func (t *throttle) GetLimitForBytes(want int64) int64 {
 			}
 		}
 		atomic.AddInt64(&t.freeBytes, -send)
+		// Bandwidth was consumed, start generate routine to allocate bandwidth
+		if atomic.CompareAndSwapInt64(&t.goGenerate, 0, 1) {
+			go t.generateBandwidth(t.ctx)
+		}
 		return send
 	}
 }
 // SetBandwidth sets a new bandwidth limit in bytes per second.
-func (t *throttle) SetBandwidth(bandwidthBiPS int64) {
+func (t *throttle) SetBandwidth(bandwidthBiPS int64, clusterBandwidth int64) {
 	bpi := int64(throttleInternal) * bandwidthBiPS / int64(time.Second)
 	atomic.StoreInt64(&t.bytesPerInterval, bpi)
 }
@@ -94,6 +103,11 @@ func (t *throttle) generateBandwidth(ctx context.Context) {
 	for {
 		select {
 		case <-t.generateTicker.C:
+			if atomic.LoadInt64(&t.freeBytes) == atomic.LoadInt64(&t.bytesPerInterval) {
+				// No bandwidth consumption stop the routine.
+				atomic.StoreInt64(&t.goGenerate, 0)
+				return
+			}
 			// A new window is available
 			t.cond.L.Lock()
 			atomic.StoreInt64(&t.freeBytes, atomic.LoadInt64(&t.bytesPerInterval))
@@ -101,7 +115,6 @@ func (t *throttle) generateBandwidth(ctx context.Context) {
 			t.cond.L.Unlock()
 		case <-ctx.Done():
 			return
-		default:
 		}
 	}
 }
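
Taken together, the throttle changes start the refill goroutine only when bandwidth is actually consumed (the `CompareAndSwap` on `goGenerate`) and stop it again after an interval with no consumption. A self-contained sketch of that lazy start/stop pattern, with hypothetical types simplified from the diff:

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// bucketTokens is a stand-in for the throttle: a per-interval byte budget
// refilled by a goroutine that only runs while there is consumption.
type bucketTokens struct {
	free        int64 // unused bytes in the current interval
	perInterval int64 // bytes allocated per interval
	running     int64 // 1 while the refill goroutine is alive
}

func (b *bucketTokens) consume(n int64) {
	atomic.AddInt64(&b.free, -n)
	// First consumer after an idle period restarts the refill goroutine.
	if atomic.CompareAndSwapInt64(&b.running, 0, 1) {
		go b.refill()
	}
}

func (b *bucketTokens) refill() {
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()
	for range ticker.C {
		if atomic.LoadInt64(&b.free) == atomic.LoadInt64(&b.perInterval) {
			// Nothing was consumed during the last interval: park the goroutine.
			atomic.StoreInt64(&b.running, 0)
			return
		}
		atomic.StoreInt64(&b.free, atomic.LoadInt64(&b.perInterval))
	}
}

func main() {
	b := &bucketTokens{perInterval: 1 << 20}
	atomic.StoreInt64(&b.free, b.perInterval)
	b.consume(64 << 10)
	time.Sleep(300 * time.Millisecond)
	fmt.Println("refill goroutine running:", atomic.LoadInt64(&b.running) == 1)
}
```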
