From 7e2b79984e74cbc35121c6dc6c09f4d9fb652cb8 Mon Sep 17 00:00:00 2001 From: Ritesh H Shukla Date: Thu, 3 Dec 2020 11:34:42 -0800 Subject: [PATCH] Stream bucket bandwidth measurements (#11014) --- cmd/admin-handlers.go | 41 +++++++++++++++++--- pkg/madmin/bandwidth.go | 51 +++++++++++++++++-------- pkg/madmin/examples/bucket-bandwidth.go | 19 +++++---- 3 files changed, 80 insertions(+), 31 deletions(-) diff --git a/cmd/admin-handlers.go b/cmd/admin-handlers.go index 2766848ff..64f45e966 100644 --- a/cmd/admin-handlers.go +++ b/cmd/admin-handlers.go @@ -41,6 +41,7 @@ import ( "github.com/minio/minio/cmd/logger" "github.com/minio/minio/cmd/logger/message/log" "github.com/minio/minio/pkg/auth" + "github.com/minio/minio/pkg/bandwidth" "github.com/minio/minio/pkg/handlers" iampolicy "github.com/minio/minio/pkg/iam/policy" "github.com/minio/minio/pkg/madmin" @@ -1428,6 +1429,8 @@ func (a adminAPIHandlers) HealthInfoHandler(w http.ResponseWriter, r *http.Reque func (a adminAPIHandlers) BandwidthMonitorHandler(w http.ResponseWriter, r *http.Request) { ctx := newContext(r, w, "BandwidthMonitor") + defer logger.AuditLog(w, r, "BandwidthMonitor", mustGetClaimsFromToken(r)) + // Validate request signature. _, adminAPIErr := checkAdminRequestAuthType(ctx, r, iampolicy.BandwidthMonitorAction, "") if adminAPIErr != ErrNone { @@ -1436,15 +1439,41 @@ func (a adminAPIHandlers) BandwidthMonitorHandler(w http.ResponseWriter, r *http } setEventStreamHeaders(w) + reportCh := make(chan bandwidth.Report, 1) + keepAliveTicker := time.NewTicker(500 * time.Millisecond) + defer keepAliveTicker.Stop() bucketsRequestedString := r.URL.Query().Get("buckets") bucketsRequested := strings.Split(bucketsRequestedString, ",") - consolidatedReport := globalNotificationSys.GetBandwidthReports(ctx, bucketsRequested...) - enc := json.NewEncoder(w) - err := enc.Encode(consolidatedReport) - if err != nil { - writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrInternalError), r.URL) + go func() { + for { + reportCh <- globalNotificationSys.GetBandwidthReports(ctx, bucketsRequested...) + select { + case <-ctx.Done(): + return + default: + time.Sleep(2 * time.Second) + } + } + }() + for { + select { + case report := <-reportCh: + enc := json.NewEncoder(w) + err := enc.Encode(report) + if err != nil { + writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrInternalError), r.URL) + return + } + w.(http.Flusher).Flush() + case <-keepAliveTicker.C: + if _, err := w.Write([]byte(" ")); err != nil { + return + } + w.(http.Flusher).Flush() + case <-ctx.Done(): + return + } } - w.(http.Flusher).Flush() } // ServerInfoHandler - GET /minio/admin/v3/info diff --git a/pkg/madmin/bandwidth.go b/pkg/madmin/bandwidth.go index 19221da62..3039cc94b 100644 --- a/pkg/madmin/bandwidth.go +++ b/pkg/madmin/bandwidth.go @@ -20,7 +20,6 @@ package madmin import ( "context" "encoding/json" - "io" "net/http" "net/url" "strings" @@ -28,10 +27,17 @@ import ( "github.com/minio/minio/pkg/bandwidth" ) -// GetBucketBandwidth - Get a snapshot of the bandwidth measurements for replication buckets. If no buckets -// generate replication traffic an empty map is returned. -func (adm *AdminClient) GetBucketBandwidth(ctx context.Context, buckets ...string) (bandwidth.Report, error) { +// Report includes the bandwidth report or the error encountered. +type Report struct { + Report bandwidth.Report + Err error +} + +// GetBucketBandwidth - Gets a channel reporting bandwidth measurements for replication buckets. If no buckets +// generate replication traffic an empty map is returned in the report until traffic is seen. +func (adm *AdminClient) GetBucketBandwidth(ctx context.Context, buckets ...string) <-chan Report { queryValues := url.Values{} + ch := make(chan Report) if len(buckets) > 0 { queryValues.Set("buckets", strings.Join(buckets, ",")) } @@ -40,22 +46,37 @@ func (adm *AdminClient) GetBucketBandwidth(ctx context.Context, buckets ...strin relPath: adminAPIPrefix + "/bandwidth", queryValues: queryValues, } - resp, err := adm.executeMethod(ctx, http.MethodGet, reqData) - defer closeResponse(resp) if err != nil { - return bandwidth.Report{}, err + defer closeResponse(resp) + ch <- Report{bandwidth.Report{}, err} + return ch } if resp.StatusCode != http.StatusOK { - return bandwidth.Report{}, httpRespToErrorResponse(resp) + ch <- Report{bandwidth.Report{}, httpRespToErrorResponse(resp)} + return ch } + dec := json.NewDecoder(resp.Body) - for { - var report bandwidth.Report - err = dec.Decode(&report) - if err != nil && err != io.EOF { - return bandwidth.Report{}, err + + go func(ctx context.Context, ch chan<- Report, resp *http.Response) { + defer func() { + closeResponse(resp) + close(ch) + }() + for { + var report bandwidth.Report + + if err = dec.Decode(&report); err != nil { + ch <- Report{bandwidth.Report{}, err} + return + } + select { + case <-ctx.Done(): + return + case ch <- Report{Report: report, Err: err}: + } } - return report, nil - } + }(ctx, ch, resp) + return ch } diff --git a/pkg/madmin/examples/bucket-bandwidth.go b/pkg/madmin/examples/bucket-bandwidth.go index a2e50a5f0..33fc76144 100644 --- a/pkg/madmin/examples/bucket-bandwidth.go +++ b/pkg/madmin/examples/bucket-bandwidth.go @@ -35,16 +35,15 @@ func main() { log.Fatalln(err) } ctx := context.Background() - report, err := madminClient.GetBucketBandwidth(ctx) - if err != nil { - log.Fatalln(err) - return + reportCh := madminClient.GetBucketBandwidth(ctx) + + for i := 0; i < 10; i++ { + report := <-reportCh + fmt.Printf("Report: %+v\n", report) } - fmt.Printf("Report: %+v\n", report) - report, err = madminClient.GetBucketBandwidth(ctx, "sourceBucket", "sourceBucket2") - if err != nil { - log.Fatalln(err) - return + reportCh = madminClient.GetBucketBandwidth(ctx, "sourceBucket", "sourceBucket2") + for i := 0; i < 10; i++ { + report := <-reportCh + fmt.Printf("Report: %+v\n", report) } - fmt.Printf("Report: %+v\n", report) }