diff --git a/cmd/admin-handlers.go b/cmd/admin-handlers.go index e929e3aec..3ab0b6cd6 100644 --- a/cmd/admin-handlers.go +++ b/cmd/admin-handlers.go @@ -41,13 +41,10 @@ import ( "github.com/minio/minio/cmd/logger" "github.com/minio/minio/cmd/logger/message/log" "github.com/minio/minio/pkg/auth" - bandwidth "github.com/minio/minio/pkg/bandwidth" - bucketBandwidth "github.com/minio/minio/pkg/bucket/bandwidth" "github.com/minio/minio/pkg/handlers" iampolicy "github.com/minio/minio/pkg/iam/policy" "github.com/minio/minio/pkg/madmin" xnet "github.com/minio/minio/pkg/net" - "github.com/minio/minio/pkg/sync/errgroup" trace "github.com/minio/minio/pkg/trace" ) @@ -1441,44 +1438,9 @@ func (a adminAPIHandlers) BandwidthMonitorHandler(w http.ResponseWriter, r *http } setEventStreamHeaders(w) - peers := newPeerRestClients(globalEndpoints) bucketsRequestedString := r.URL.Query().Get("buckets") - var bucketsRequested []string - reports := make([]*bandwidth.Report, len(peers)) - selectBuckets := bucketBandwidth.SelectAllBuckets() - if bucketsRequestedString != "" { - bucketsRequested = strings.Split(bucketsRequestedString, ",") - selectBuckets = bucketBandwidth.SelectBuckets(bucketsRequested...) - } - reports = append(reports, globalBucketMonitor.GetReport(selectBuckets)) - g := errgroup.WithNErrs(len(peers)) - for index, peer := range peers { - if peer == nil { - continue - } - index := index - g.Go(func() error { - var err error - reports[index], err = peer.MonitorBandwidth(ctx, bucketsRequested) - return err - }, index) - } - consolidatedReport := bandwidth.Report{ - BucketStats: make(map[string]bandwidth.Details), - } - - for _, report := range reports { - for bucket := range report.BucketStats { - d, ok := consolidatedReport.BucketStats[bucket] - if !ok { - consolidatedReport.BucketStats[bucket] = bandwidth.Details{} - d = consolidatedReport.BucketStats[bucket] - d.LimitInBytesPerSecond = report.BucketStats[bucket].LimitInBytesPerSecond - } - d.CurrentBandwidthInBytesPerSecond += report.BucketStats[bucket].CurrentBandwidthInBytesPerSecond - consolidatedReport.BucketStats[bucket] = d - } - } + bucketsRequested := strings.Split(bucketsRequestedString, ",") + consolidatedReport := globalNotificationSys.GetBandwidthReports(ctx, bucketsRequested...) enc := json.NewEncoder(w) err := enc.Encode(consolidatedReport) if err != nil { diff --git a/cmd/bucket-replication.go b/cmd/bucket-replication.go index 0d0c7d981..f53bfa252 100644 --- a/cmd/bucket-replication.go +++ b/cmd/bucket-replication.go @@ -235,8 +235,8 @@ func replicateObject(ctx context.Context, objInfo ObjectInfo, objectAPI ObjectLa replicationStatus := replication.Complete // Setup bandwidth throttling - peerCount := len(globalEndpoints) - b := target.BandwidthLimit / int64(peerCount) + totalNodesCount := len(GetRemotePeers(globalEndpoints)) + 1 + b := target.BandwidthLimit / int64(totalNodesCount) var headerSize int for k, v := range putOpts.Header() { headerSize += len(k) + len(v) diff --git a/cmd/notification.go b/cmd/notification.go index f5cfe44d9..caf7c2d00 100644 --- a/cmd/notification.go +++ b/cmd/notification.go @@ -33,6 +33,8 @@ import ( "github.com/minio/minio-go/v7/pkg/set" "github.com/minio/minio/cmd/crypto" "github.com/minio/minio/cmd/logger" + bandwidth "github.com/minio/minio/pkg/bandwidth" + bucketBandwidth "github.com/minio/minio/pkg/bucket/bandwidth" "github.com/minio/minio/pkg/bucket/policy" "github.com/minio/minio/pkg/event" "github.com/minio/minio/pkg/madmin" @@ -1283,3 +1285,47 @@ func sendEvent(args eventArgs) { globalNotificationSys.Send(args) } + +// GetBandwidthReports - gets the bandwidth report from all nodes including self. +func (sys *NotificationSys) GetBandwidthReports(ctx context.Context, buckets ...string) bandwidth.Report { + reports := make([]*bandwidth.Report, len(sys.peerClients)) + g := errgroup.WithNErrs(len(sys.peerClients)) + for index, peer := range sys.peerClients { + if peer == nil { + continue + } + index := index + g.Go(func() error { + var err error + reports[index], err = peer.MonitorBandwidth(ctx, buckets) + return err + }, index) + } + + for index, err := range g.Wait() { + reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", + sys.peerClients[index].host.String()) + ctx := logger.SetReqInfo(ctx, reqInfo) + logger.LogOnceIf(ctx, err, sys.peerClients[index].host.String()) + } + reports = append(reports, globalBucketMonitor.GetReport(bucketBandwidth.SelectBuckets(buckets...))) + consolidatedReport := bandwidth.Report{ + BucketStats: make(map[string]bandwidth.Details), + } + for _, report := range reports { + if report == nil || report.BucketStats == nil { + continue + } + for bucket := range report.BucketStats { + d, ok := consolidatedReport.BucketStats[bucket] + if !ok { + consolidatedReport.BucketStats[bucket] = bandwidth.Details{} + d = consolidatedReport.BucketStats[bucket] + d.LimitInBytesPerSecond = report.BucketStats[bucket].LimitInBytesPerSecond + } + d.CurrentBandwidthInBytesPerSecond += report.BucketStats[bucket].CurrentBandwidthInBytesPerSecond + consolidatedReport.BucketStats[bucket] = d + } + } + return consolidatedReport +} diff --git a/cmd/peer-rest-client.go b/cmd/peer-rest-client.go index 4d8e73499..2056eda47 100644 --- a/cmd/peer-rest-client.go +++ b/cmd/peer-rest-client.go @@ -891,7 +891,6 @@ func newPeerRESTClient(peer *xnet.Host) *peerRESTClient { func (client *peerRESTClient) MonitorBandwidth(ctx context.Context, buckets []string) (*bandwidth.Report, error) { values := make(url.Values) values.Set(peerRESTBuckets, strings.Join(buckets, ",")) - respBody, err := client.callWithContext(ctx, peerRESTMethodGetBandwidth, values, nil, -1) if err != nil { return nil, err diff --git a/cmd/peer-rest-server.go b/cmd/peer-rest-server.go index 6fe971395..96bb11698 100644 --- a/cmd/peer-rest-server.go +++ b/cmd/peer-rest-server.go @@ -30,7 +30,6 @@ import ( "github.com/gorilla/mux" "github.com/minio/minio/cmd/logger" - "github.com/minio/minio/pkg/bandwidth" b "github.com/minio/minio/pkg/bucket/bandwidth" "github.com/minio/minio/pkg/event" "github.com/minio/minio/pkg/madmin" @@ -1062,12 +1061,8 @@ func (s *peerRESTServer) GetBandwidth(w http.ResponseWriter, r *http.Request) { doneCh := make(chan struct{}) defer close(doneCh) - var report *bandwidth.Report - selectBuckets := b.SelectAllBuckets() - if bucketsString != "" { - selectBuckets = b.SelectBuckets(strings.Split(bucketsString, ",")...) - } - report = globalBucketMonitor.GetReport(selectBuckets) + selectBuckets := b.SelectBuckets(strings.Split(bucketsString, ",")...) + report := globalBucketMonitor.GetReport(selectBuckets) enc := gob.NewEncoder(w) if err := enc.Encode(report); err != nil { diff --git a/pkg/bucket/bandwidth/monitor.go b/pkg/bucket/bandwidth/monitor.go index c86ee0106..027542ae8 100644 --- a/pkg/bucket/bandwidth/monitor.go +++ b/pkg/bucket/bandwidth/monitor.go @@ -91,18 +91,16 @@ func NewMonitor(doneCh <-chan struct{}) *Monitor { // SelectionFunction for buckets type SelectionFunction func(bucket string) bool -// SelectAllBuckets will select all buckets -func SelectAllBuckets() SelectionFunction { - return func(bucket string) bool { - return true - } -} - // SelectBuckets will select all the buckets passed in. func SelectBuckets(buckets ...string) SelectionFunction { + if len(buckets) == 0 { + return func(bucket string) bool { + return true + } + } return func(bucket string) bool { for _, b := range buckets { - if b != "" && b == bucket { + if b == "" || b == bucket { return true } } @@ -160,7 +158,7 @@ func (m *Monitor) processAvg() { for _, bucketMeasurement := range m.activeBuckets { bucketMeasurement.updateExponentialMovingAverage(time.Now()) } - m.pubsub.Publish(m.getReport(SelectAllBuckets())) + m.pubsub.Publish(m.getReport(SelectBuckets())) } // track returns the measurement object for bucket and object diff --git a/pkg/bucket/bandwidth/monitor_test.go b/pkg/bucket/bandwidth/monitor_test.go index 53a08753f..c8c2116c0 100644 --- a/pkg/bucket/bandwidth/monitor_test.go +++ b/pkg/bucket/bandwidth/monitor_test.go @@ -142,13 +142,13 @@ func TestMonitor_GetReport(t *testing.T) { bucketThrottle: map[string]*throttle{"bucket": &thr}, } m.activeBuckets["bucket"].updateExponentialMovingAverage(tt.fields.endTime) - got := m.GetReport(SelectAllBuckets()) + got := m.GetReport(SelectBuckets()) if !reflect.DeepEqual(got, tt.want) { t.Errorf("GetReport() = %v, want %v", got, tt.want) } m.activeBuckets["bucket"].incrementBytes(tt.fields.update2) m.activeBuckets["bucket"].updateExponentialMovingAverage(tt.fields.endTime2) - got = m.GetReport(SelectAllBuckets()) + got = m.GetReport(SelectBuckets()) if !reflect.DeepEqual(got, tt.want2) { t.Errorf("GetReport() = %v, want %v", got, tt.want2) }