Add basic bandwidth monitoring for replication. (#10501)

This change tracks bandwidth for a bucket and object

- [x] Add Admin API
- [x] Add Peer API
- [x] Add BW throttling
- [x] Admin APIs to set replication limit
- [x] Admin APIs for fetch bandwidth
master
Ritesh H Shukla 4 years ago committed by GitHub
parent 071c004f8b
commit c2f16ee846
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 64
      cmd/admin-handlers.go
  2. 2
      cmd/admin-router.go
  3. 14
      cmd/bucket-metadata-sys.go
  4. 28
      cmd/bucket-replication.go
  5. 14
      cmd/bucket-targets.go
  6. 2
      cmd/globals.go
  7. 19
      cmd/peer-rest-client.go
  8. 2
      cmd/peer-rest-common.go
  9. 33
      cmd/peer-rest-server.go
  10. 4
      cmd/server-main.go
  11. 28
      pkg/bandwidth/bandwidth.go
  12. 87
      pkg/bucket/bandwidth/measurement.go
  13. 175
      pkg/bucket/bandwidth/monitor.go
  14. 157
      pkg/bucket/bandwidth/monitor_test.go
  15. 86
      pkg/bucket/bandwidth/reader.go
  16. 107
      pkg/bucket/bandwidth/throttle.go
  17. 4
      pkg/iam/policy/admin-action.go
  18. 2
      pkg/iam/policy/constants.go
  19. 61
      pkg/madmin/bandwidth.go
  20. 50
      pkg/madmin/examples/bucket-bandwidth.go
  21. 2
      pkg/madmin/examples/bucket-target.go
  22. 23
      pkg/madmin/remote-target-commands.go

@ -35,17 +35,19 @@ import (
"time"
"github.com/gorilla/mux"
"github.com/minio/minio/cmd/config"
"github.com/minio/minio/cmd/crypto"
xhttp "github.com/minio/minio/cmd/http"
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/cmd/logger/message/log"
"github.com/minio/minio/pkg/auth"
bandwidth "github.com/minio/minio/pkg/bandwidth"
bucketBandwidth "github.com/minio/minio/pkg/bucket/bandwidth"
"github.com/minio/minio/pkg/handlers"
iampolicy "github.com/minio/minio/pkg/iam/policy"
"github.com/minio/minio/pkg/madmin"
xnet "github.com/minio/minio/pkg/net"
"github.com/minio/minio/pkg/sync/errgroup"
trace "github.com/minio/minio/pkg/trace"
)
@ -1425,6 +1427,66 @@ func (a adminAPIHandlers) OBDInfoHandler(w http.ResponseWriter, r *http.Request)
}
// BandwidthMonitorHandler - GET /minio/admin/v3/bandwidth
// ----------
// Get bandwidth consumption information
func (a adminAPIHandlers) BandwidthMonitorHandler(w http.ResponseWriter, r *http.Request) {
ctx := newContext(r, w, "BandwidthMonitor")
// Validate request signature.
_, adminAPIErr := checkAdminRequestAuthType(ctx, r, iampolicy.BandwidthMonitorAction, "")
if adminAPIErr != ErrNone {
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(adminAPIErr), r.URL)
return
}
setEventStreamHeaders(w)
peers := newPeerRestClients(globalEndpoints)
bucketsRequestedString := r.URL.Query().Get("buckets")
var bucketsRequested []string
reports := make([]*bandwidth.Report, len(peers))
selectBuckets := bucketBandwidth.SelectAllBuckets()
if bucketsRequestedString != "" {
bucketsRequested = strings.Split(bucketsRequestedString, ",")
selectBuckets = bucketBandwidth.SelectBuckets(bucketsRequested...)
}
reports = append(reports, globalBucketMonitor.GetReport(selectBuckets))
g := errgroup.WithNErrs(len(peers))
for index, peer := range peers {
if peer == nil {
continue
}
index := index
g.Go(func() error {
var err error
reports[index], err = peer.MonitorBandwidth(ctx, bucketsRequested)
return err
}, index)
}
consolidatedReport := bandwidth.Report{
BucketStats: make(map[string]bandwidth.Details),
}
for _, report := range reports {
for bucket := range report.BucketStats {
d, ok := consolidatedReport.BucketStats[bucket]
if !ok {
consolidatedReport.BucketStats[bucket] = bandwidth.Details{}
d = consolidatedReport.BucketStats[bucket]
d.LimitInBytesPerSecond = report.BucketStats[bucket].LimitInBytesPerSecond
}
d.CurrentBandwidthInBytesPerSecond += report.BucketStats[bucket].CurrentBandwidthInBytesPerSecond
consolidatedReport.BucketStats[bucket] = d
}
}
enc := json.NewEncoder(w)
err := enc.Encode(consolidatedReport)
if err != nil {
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrInternalError), r.URL)
}
w.(http.Flusher).Flush()
}
// ServerInfoHandler - GET /minio/admin/v3/info
// ----------
// Get server information

@ -214,6 +214,8 @@ func registerAdminRouter(router *mux.Router, enableConfigOps, enableIAMOps bool)
// -- OBD API --
adminRouter.Methods(http.MethodGet).Path(adminVersion + "/obdinfo").
HandlerFunc(httpTraceHdrs(adminAPI.OBDInfoHandler))
adminRouter.Methods(http.MethodGet).Path(adminVersion + "/bandwidth").
HandlerFunc(httpTraceHdrs(adminAPI.BandwidthMonitorHandler))
}
}

@ -373,6 +373,20 @@ func (sys *BucketMetadataSys) GetBucketTargetsConfig(bucket string) (*madmin.Buc
return meta.bucketTargetConfig, nil
}
// GetBucketTarget returns the target for the bucket and arn.
func (sys *BucketMetadataSys) GetBucketTarget(bucket string, arn string) (madmin.BucketTarget, error) {
targets, err := sys.GetBucketTargetsConfig(bucket)
if err != nil {
return madmin.BucketTarget{}, err
}
for _, t := range targets.Targets {
if t.Arn == arn {
return t, nil
}
}
return madmin.BucketTarget{}, errConfigNotFound
}
// GetConfig returns a specific configuration from the bucket metadata.
// The returned object may not be modified.
func (sys *BucketMetadataSys) GetConfig(bucket string) (BucketMetadata, error) {

@ -30,6 +30,7 @@ import (
"github.com/minio/minio/cmd/crypto"
xhttp "github.com/minio/minio/cmd/http"
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/bucket/bandwidth"
"github.com/minio/minio/pkg/bucket/replication"
"github.com/minio/minio/pkg/event"
iampolicy "github.com/minio/minio/pkg/iam/policy"
@ -119,7 +120,7 @@ func mustReplicater(ctx context.Context, r *http.Request, bucket, object string,
return cfg.Replicate(opts)
}
func putReplicationOpts(dest replication.Destination, objInfo ObjectInfo) (putOpts miniogo.PutObjectOptions) {
func putReplicationOpts(ctx context.Context, dest replication.Destination, objInfo ObjectInfo) (putOpts miniogo.PutObjectOptions) {
meta := make(map[string]string)
for k, v := range objInfo.UserDefined {
if k == xhttp.AmzBucketReplicationStatus {
@ -168,6 +169,7 @@ func putReplicationOpts(dest replication.Destination, objInfo ObjectInfo) (putOp
if crypto.S3.IsEncrypted(objInfo.UserDefined) {
putOpts.ServerSideEncryption = encrypt.NewSSE()
}
return
}
@ -184,16 +186,15 @@ func replicateObject(ctx context.Context, objInfo ObjectInfo, objectAPI ObjectLa
}
tgt := globalBucketTargetSys.GetRemoteTargetClient(ctx, cfg.RoleArn)
if tgt == nil {
logger.LogIf(ctx, fmt.Errorf("failed to get target for bucket:%s arn:%s", bucket, cfg.RoleArn))
return
}
gr, err := objectAPI.GetObjectNInfo(ctx, bucket, object, nil, http.Header{}, readLock, ObjectOptions{
VersionID: objInfo.VersionID,
})
if err != nil {
return
}
objInfo = gr.ObjInfo
size, err := objInfo.GetActualSize()
if err != nil {
@ -224,11 +225,26 @@ func replicateObject(ctx context.Context, objInfo ObjectInfo, objectAPI ObjectLa
return
}
}
putOpts := putReplicationOpts(dest, objInfo)
target, err := globalBucketMetadataSys.GetBucketTarget(bucket, cfg.RoleArn)
if err != nil {
logger.LogIf(ctx, fmt.Errorf("failed to get target for replication bucket:%s cfg:%s err:%s", bucket, cfg.RoleArn, err))
return
}
putOpts := putReplicationOpts(ctx, dest, objInfo)
replicationStatus := replication.Complete
_, err = tgt.PutObject(ctx, dest.Bucket, object, gr, size, "", "", putOpts)
gr.Close()
// Setup bandwidth throttling
peerCount := len(globalEndpoints)
b := target.BandwidthLimit / int64(peerCount)
var headerSize int
for k, v := range putOpts.Header() {
headerSize += len(k) + len(v)
}
r := bandwidth.NewMonitoredReader(ctx, globalBucketMonitor, objInfo.Bucket, objInfo.Name, gr, headerSize, b)
_, err = tgt.PutObject(ctx, dest.Bucket, object, r, size, "", "", putOpts)
r.Close()
if err != nil {
replicationStatus = replication.Failed
}

@ -207,14 +207,14 @@ func (sys *BucketTargetSys) Init(ctx context.Context, buckets []BucketInfo, objA
return nil
}
// UpdateTarget updates target to reflect metadata updates
func (sys *BucketTargetSys) UpdateTarget(bucket string, cfg *madmin.BucketTargets) {
// UpdateAllTargets updates target to reflect metadata updates
func (sys *BucketTargetSys) UpdateAllTargets(bucket string, tgts *madmin.BucketTargets) {
if sys == nil {
return
}
sys.Lock()
defer sys.Unlock()
if cfg == nil || cfg.Empty() {
if tgts == nil || tgts.Empty() {
// remove target and arn association
if tgts, ok := sys.targetsMap[bucket]; ok {
for _, t := range tgts {
@ -225,10 +225,10 @@ func (sys *BucketTargetSys) UpdateTarget(bucket string, cfg *madmin.BucketTarget
return
}
if len(cfg.Targets) > 0 {
sys.targetsMap[bucket] = cfg.Targets
if len(tgts.Targets) > 0 {
sys.targetsMap[bucket] = tgts.Targets
}
for _, tgt := range cfg.Targets {
for _, tgt := range tgts.Targets {
tgtClient, err := sys.getRemoteTargetClient(&tgt)
if err != nil {
continue
@ -238,7 +238,7 @@ func (sys *BucketTargetSys) UpdateTarget(bucket string, cfg *madmin.BucketTarget
sys.clientsCache[tgtClient.EndpointURL().String()] = tgtClient
}
}
sys.targetsMap[bucket] = cfg.Targets
sys.targetsMap[bucket] = tgts.Targets
}
// create minio-go clients for buckets having remote targets

@ -22,6 +22,7 @@ import (
"time"
"github.com/minio/minio-go/v7/pkg/set"
"github.com/minio/minio/pkg/bucket/bandwidth"
humanize "github.com/dustin/go-humanize"
"github.com/minio/minio/cmd/config/cache"
@ -149,6 +150,7 @@ var (
globalEnvTargetList *event.TargetList
globalBucketMetadataSys *BucketMetadataSys
globalBucketMonitor *bandwidth.Monitor
globalPolicySys *PolicySys
globalIAMSys *IAMSys

@ -26,6 +26,7 @@ import (
"math"
"net/url"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
@ -35,6 +36,7 @@ import (
xhttp "github.com/minio/minio/cmd/http"
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/cmd/rest"
"github.com/minio/minio/pkg/bandwidth"
"github.com/minio/minio/pkg/event"
"github.com/minio/minio/pkg/madmin"
xnet "github.com/minio/minio/pkg/net"
@ -884,3 +886,20 @@ func newPeerRESTClient(peer *xnet.Host) *peerRESTClient {
return &peerRESTClient{host: peer, restClient: restClient}
}
// MonitorBandwidth - send http trace request to peer nodes
func (client *peerRESTClient) MonitorBandwidth(ctx context.Context, buckets []string) (*bandwidth.Report, error) {
values := make(url.Values)
values.Set(peerRESTBuckets, strings.Join(buckets, ","))
respBody, err := client.callWithContext(ctx, peerRESTMethodGetBandwidth, values, nil, -1)
if err != nil {
return nil, err
}
defer http.DrainBody(respBody)
dec := gob.NewDecoder(respBody)
var bandwidthReport bandwidth.Report
err = dec.Decode(&bandwidthReport)
return &bandwidthReport, err
}

@ -57,10 +57,12 @@ const (
peerRESTMethodListen = "/listen"
peerRESTMethodLog = "/log"
peerRESTMethodGetLocalDiskIDs = "/getlocaldiskids"
peerRESTMethodGetBandwidth = "/bandwidth"
)
const (
peerRESTBucket = "bucket"
peerRESTBuckets = "buckets"
peerRESTUser = "user"
peerRESTGroup = "group"
peerRESTUserTemp = "user-temp"

@ -30,6 +30,8 @@ import (
"github.com/gorilla/mux"
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/bandwidth"
b "github.com/minio/minio/pkg/bucket/bandwidth"
"github.com/minio/minio/pkg/event"
"github.com/minio/minio/pkg/madmin"
trace "github.com/minio/minio/pkg/trace"
@ -628,7 +630,7 @@ func (s *peerRESTServer) LoadBucketMetadataHandler(w http.ResponseWriter, r *htt
}
if meta.bucketTargetConfig != nil {
globalBucketTargetSys.UpdateTarget(bucketName, meta.bucketTargetConfig)
globalBucketTargetSys.UpdateAllTargets(bucketName, meta.bucketTargetConfig)
}
}
@ -1047,6 +1049,34 @@ func (s *peerRESTServer) IsValid(w http.ResponseWriter, r *http.Request) bool {
return true
}
// GetBandwidth gets the bandwidth for the buckets requested.
func (s *peerRESTServer) GetBandwidth(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
s.writeErrorResponse(w, errors.New("Invalid request"))
return
}
bucketsString := r.URL.Query().Get("buckets")
w.WriteHeader(http.StatusOK)
w.(http.Flusher).Flush()
doneCh := make(chan struct{})
defer close(doneCh)
var report *bandwidth.Report
selectBuckets := b.SelectAllBuckets()
if bucketsString != "" {
selectBuckets = b.SelectBuckets(strings.Split(bucketsString, ",")...)
}
report = globalBucketMonitor.GetReport(selectBuckets)
enc := gob.NewEncoder(w)
if err := enc.Encode(report); err != nil {
s.writeErrorResponse(w, errors.New("Encoding report failed: "+err.Error()))
return
}
w.(http.Flusher).Flush()
}
// registerPeerRESTHandlers - register peer rest router.
func registerPeerRESTHandlers(router *mux.Router) {
server := &peerRESTServer{}
@ -1085,4 +1115,5 @@ func registerPeerRESTHandlers(router *mux.Router) {
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodBackgroundHealStatus).HandlerFunc(server.BackgroundHealStatusHandler)
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodLog).HandlerFunc(server.ConsoleLogHandler)
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodGetLocalDiskIDs).HandlerFunc(httpTraceHdrs(server.GetLocalDiskIDs))
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodGetBandwidth).HandlerFunc(httpTraceHdrs(server.GetBandwidth))
}

@ -32,6 +32,7 @@ import (
xhttp "github.com/minio/minio/cmd/http"
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/auth"
"github.com/minio/minio/pkg/bucket/bandwidth"
"github.com/minio/minio/pkg/certs"
"github.com/minio/minio/pkg/color"
"github.com/minio/minio/pkg/env"
@ -158,6 +159,9 @@ func newAllSubsystems() {
// Create new bucket metadata system.
globalBucketMetadataSys = NewBucketMetadataSys()
// Create the bucket bandwidth monitor
globalBucketMonitor = bandwidth.NewMonitor(GlobalServiceDoneCh)
// Create a new config system.
globalConfigSys = NewConfigSys()

@ -0,0 +1,28 @@
/*
* MinIO Cloud Storage, (C) 2020 MinIO, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package bandwidth
// Details for the measured bandwidth
type Details struct {
LimitInBytesPerSecond int64 `json:"limitInBits"`
CurrentBandwidthInBytesPerSecond float64 `json:"currentBandwidth"`
}
// Report captures the details for all buckets.
type Report struct {
BucketStats map[string]Details `json:"bucketStats,omitempty"`
}

@ -0,0 +1,87 @@
/*
* MinIO Cloud Storage, (C) 2020 MinIO, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package bandwidth
import (
"sync"
"sync/atomic"
"time"
)
const (
// betaBucket is the weight used to calculate exponential moving average
betaBucket = 0.1 // Number of averages considered = 1/(1-betaObject)
)
// bucketMeasurement captures the bandwidth details for one bucket
type bucketMeasurement struct {
lock sync.Mutex
bytesSinceLastWindow uint64 // Total bytes since last window was processed
startTime time.Time // Start time for window
expMovingAvg float64 // Previously calculate sliding window
}
// newBucketMeasurement creates a new instance of the measurement with the initial start time.
func newBucketMeasurement(initTime time.Time) *bucketMeasurement {
return &bucketMeasurement{
startTime: initTime,
}
}
// incrementBytes add bytes reported for a bucket.
func (m *bucketMeasurement) incrementBytes(bytes uint64) {
atomic.AddUint64(&m.bytesSinceLastWindow, bytes)
}
// updateExponentialMovingAverage processes the measurements captured so far.
func (m *bucketMeasurement) updateExponentialMovingAverage(endTime time.Time) {
// Calculate aggregate avg bandwidth and exp window avg
m.lock.Lock()
defer func() {
m.startTime = endTime
m.lock.Unlock()
}()
if endTime.Before(m.startTime) {
return
}
duration := endTime.Sub(m.startTime)
bytesSinceLastWindow := atomic.SwapUint64(&m.bytesSinceLastWindow, 0)
if m.expMovingAvg == 0 {
// Should address initial calculation and should be fine for resuming from 0
m.expMovingAvg = float64(bytesSinceLastWindow) / duration.Seconds()
return
}
increment := float64(bytesSinceLastWindow) / duration.Seconds()
m.expMovingAvg = exponentialMovingAverage(betaBucket, m.expMovingAvg, increment)
}
// exponentialMovingAverage calculates the exponential moving average
func exponentialMovingAverage(beta, previousAvg, incrementAvg float64) float64 {
return (1-beta)*incrementAvg + beta*previousAvg
}
// getExpMovingAvgBytesPerSecond returns the exponential moving average for the bucket in bytes
func (m *bucketMeasurement) getExpMovingAvgBytesPerSecond() float64 {
m.lock.Lock()
defer m.lock.Unlock()
return m.expMovingAvg
}

@ -0,0 +1,175 @@
/*
* MinIO Cloud Storage, (C) 2020 MinIO, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package bandwidth
import (
"context"
"sync"
"time"
"github.com/minio/minio/pkg/bandwidth"
"github.com/minio/minio/pkg/pubsub"
)
// throttleBandwidth gets the throttle for bucket with the configured value
func (m *Monitor) throttleBandwidth(ctx context.Context, bucket string, bandwidthBytesPerSecond int64) *throttle {
m.lock.Lock()
defer m.lock.Unlock()
throttle, ok := m.bucketThrottle[bucket]
if !ok {
throttle = newThrottle(ctx, bandwidthBytesPerSecond)
m.bucketThrottle[bucket] = throttle
return throttle
}
throttle.SetBandwidth(bandwidthBytesPerSecond)
return throttle
}
// SubscribeToBuckets subscribes to buckets. Empty array for monitoring all buckets.
func (m *Monitor) SubscribeToBuckets(subCh chan interface{}, doneCh <-chan struct{}, buckets []string) {
m.pubsub.Subscribe(subCh, doneCh, func(f interface{}) bool {
if buckets != nil || len(buckets) == 0 {
return true
}
report, ok := f.(*bandwidth.Report)
if !ok {
return false
}
for _, b := range buckets {
_, ok := report.BucketStats[b]
if ok {
return true
}
}
return false
})
}
// Monitor implements the monitoring for bandwidth measurements.
type Monitor struct {
lock sync.Mutex // lock for all updates
activeBuckets map[string]*bucketMeasurement // Buckets with objects in flight
bucketMovingAvgTicker *time.Ticker // Ticker for calculating moving averages
pubsub *pubsub.PubSub // PubSub for reporting bandwidths.
bucketThrottle map[string]*throttle
startProcessing sync.Once
doneCh <-chan struct{}
}
// NewMonitor returns a monitor with defaults.
func NewMonitor(doneCh <-chan struct{}) *Monitor {
m := &Monitor{
activeBuckets: make(map[string]*bucketMeasurement),
bucketMovingAvgTicker: time.NewTicker(1 * time.Second),
pubsub: pubsub.New(),
bucketThrottle: make(map[string]*throttle),
doneCh: doneCh,
}
return m
}
// SelectionFunction for buckets
type SelectionFunction func(bucket string) bool
// SelectAllBuckets will select all buckets
func SelectAllBuckets() SelectionFunction {
return func(bucket string) bool {
return true
}
}
// SelectBuckets will select all the buckets passed in.
func SelectBuckets(buckets ...string) SelectionFunction {
return func(bucket string) bool {
for _, b := range buckets {
if b != "" && b == bucket {
return true
}
}
return false
}
}
// GetReport gets the report for all bucket bandwidth details.
func (m *Monitor) GetReport(selectBucket SelectionFunction) *bandwidth.Report {
m.lock.Lock()
defer m.lock.Unlock()
return m.getReport(selectBucket)
}
func (m *Monitor) getReport(selectBucket SelectionFunction) *bandwidth.Report {
report := &bandwidth.Report{
BucketStats: make(map[string]bandwidth.Details),
}
for bucket, bucketMeasurement := range m.activeBuckets {
if !selectBucket(bucket) {
continue
}
report.BucketStats[bucket] = bandwidth.Details{
LimitInBytesPerSecond: m.bucketThrottle[bucket].bytesPerSecond,
CurrentBandwidthInBytesPerSecond: bucketMeasurement.getExpMovingAvgBytesPerSecond(),
}
}
return report
}
func (m *Monitor) process(doneCh <-chan struct{}) {
for {
select {
case <-m.bucketMovingAvgTicker.C:
m.processAvg()
case <-doneCh:
return
default:
}
}
}
func (m *Monitor) getBucketMeasurement(bucket string, initTime time.Time) *bucketMeasurement {
bucketTracker, ok := m.activeBuckets[bucket]
if !ok {
bucketTracker = newBucketMeasurement(initTime)
m.activeBuckets[bucket] = bucketTracker
}
return bucketTracker
}
func (m *Monitor) processAvg() {
m.lock.Lock()
defer m.lock.Unlock()
for _, bucketMeasurement := range m.activeBuckets {
bucketMeasurement.updateExponentialMovingAverage(time.Now())
}
m.pubsub.Publish(m.getReport(SelectAllBuckets()))
}
// track returns the measurement object for bucket and object
func (m *Monitor) track(bucket string, object string, timeNow time.Time) *bucketMeasurement {
m.lock.Lock()
defer m.lock.Unlock()
m.startProcessing.Do(func() {
go m.process(m.doneCh)
})
b := m.getBucketMeasurement(bucket, timeNow)
return b
}

@ -0,0 +1,157 @@
/*
* MinIO Cloud Storage, (C) 2020 MinIO, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package bandwidth
import (
"context"
"reflect"
"testing"
"time"
"github.com/minio/minio/pkg/bandwidth"
)
const (
oneMiB uint64 = 1024 * 1024
)
func TestMonitor_GetThrottle(t *testing.T) {
type fields struct {
bucketThrottles map[string]*throttle
bucket string
bpi int64
}
t1 := newThrottle(context.Background(), 100)
t2 := newThrottle(context.Background(), 200)
tests := []struct {
name string
fields fields
want *throttle
}{
{
name: "Existing",
fields: fields{
bucketThrottles: map[string]*throttle{"bucket": t1},
bucket: "bucket",
bpi: 100,
},
want: t1,
},
{
name: "new",
fields: fields{
bucketThrottles: map[string]*throttle{"bucket": t1},
bucket: "bucket2",
bpi: 200,
},
want: t2,
},
}
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
m := &Monitor{
bucketThrottle: tt.fields.bucketThrottles,
}
if got := m.throttleBandwidth(context.Background(), tt.fields.bucket, tt.fields.bpi); got.bytesPerInterval != tt.want.bytesPerInterval {
t.Errorf("throttleBandwidth() = %v, want %v", got, tt.want)
}
})
}
}
func TestMonitor_GetReport(t *testing.T) {
type fields struct {
activeBuckets map[string]*bucketMeasurement
endTime time.Time
update2 uint64
endTime2 time.Time
}
start := time.Now()
m0 := newBucketMeasurement(start)
m0.incrementBytes(0)
m1MiBPS := newBucketMeasurement(start)
m1MiBPS.incrementBytes(oneMiB)
tests := []struct {
name string
fields fields
want *bandwidth.Report
want2 *bandwidth.Report
}{
{
name: "ZeroToOne",
fields: fields{
activeBuckets: map[string]*bucketMeasurement{
"bucket": m0,
},
endTime: start.Add(1 * time.Second),
update2: oneMiB,
endTime2: start.Add(2 * time.Second),
},
want: &bandwidth.Report{
BucketStats: map[string]bandwidth.Details{"bucket": {LimitInBytesPerSecond: 1024 * 1024, CurrentBandwidthInBytesPerSecond: 0}},
},
want2: &bandwidth.Report{
BucketStats: map[string]bandwidth.Details{"bucket": {LimitInBytesPerSecond: 1024 * 1024, CurrentBandwidthInBytesPerSecond: (1024 * 1024) / start.Add(2*time.Second).Sub(start.Add(1*time.Second)).Seconds()}},
},
},
{
name: "OneToTwo",
fields: fields{
activeBuckets: map[string]*bucketMeasurement{
"bucket": m1MiBPS,
},
endTime: start.Add(1 * time.Second),
update2: 2 * oneMiB,
endTime2: start.Add(2 * time.Second),
},
want: &bandwidth.Report{
BucketStats: map[string]bandwidth.Details{"bucket": {LimitInBytesPerSecond: 1024 * 1024, CurrentBandwidthInBytesPerSecond: float64(oneMiB)}},
},
want2: &bandwidth.Report{
BucketStats: map[string]bandwidth.Details{"bucket": {
LimitInBytesPerSecond: 1024 * 1024,
CurrentBandwidthInBytesPerSecond: exponentialMovingAverage(betaBucket, float64(oneMiB), 2*float64(oneMiB))}},
},
},
}
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
thr := throttle{
bytesPerSecond: 1024 * 1024,
}
m := &Monitor{
activeBuckets: tt.fields.activeBuckets,
bucketThrottle: map[string]*throttle{"bucket": &thr},
}
m.activeBuckets["bucket"].updateExponentialMovingAverage(tt.fields.endTime)
got := m.GetReport(SelectAllBuckets())
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("GetReport() = %v, want %v", got, tt.want)
}
m.activeBuckets["bucket"].incrementBytes(tt.fields.update2)
m.activeBuckets["bucket"].updateExponentialMovingAverage(tt.fields.endTime2)
got = m.GetReport(SelectAllBuckets())
if !reflect.DeepEqual(got, tt.want2) {
t.Errorf("GetReport() = %v, want %v", got, tt.want2)
}
})
}
}

@ -0,0 +1,86 @@
/*
* MinIO Cloud Storage, (C) 2020 MinIO, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package bandwidth
import (
"context"
"io"
"time"
)
// MonitoredReader monitors the bandwidth
type MonitoredReader struct {
bucket string // Token to track bucket
bucketMeasurement *bucketMeasurement // bucket measurement object
object string // Token to track object
reader io.Reader // Reader to wrap
lastStop time.Time // Last timestamp for a measurement
headerSize int // Size of the header not captured by reader
throttle *throttle // throttle the rate at which replication occur
monitor *Monitor // Monitor reference
closed bool // Reader is closed
}
// NewMonitoredReader returns a io.ReadCloser that reports bandwidth details
func NewMonitoredReader(ctx context.Context, monitor *Monitor, bucket string, object string, reader io.Reader, headerSize int, bandwidthBytesPerSecond int64) *MonitoredReader {
timeNow := time.Now()
b := monitor.track(bucket, object, timeNow)
return &MonitoredReader{
bucket: bucket,
object: object,
bucketMeasurement: b,
reader: reader,
lastStop: timeNow,
headerSize: headerSize,
throttle: monitor.throttleBandwidth(ctx, bucket, bandwidthBytesPerSecond),
monitor: monitor,
}
}
// Read wraps the read reader
func (m *MonitoredReader) Read(p []byte) (n int, err error) {
if m.closed {
err = io.ErrClosedPipe
return
}
p = p[:m.throttle.GetLimitForBytes(int64(len(p)))]
n, err = m.reader.Read(p)
stop := time.Now()
update := uint64(n + m.headerSize)
m.bucketMeasurement.incrementBytes(update)
m.lastStop = stop
unused := len(p) - (n + m.headerSize)
m.headerSize = 0 // Set to 0 post first read
if unused > 0 {
m.throttle.ReleaseUnusedBandwidth(int64(unused))
}
return
}
// Close stops tracking the io
func (m *MonitoredReader) Close() error {
rc, ok := m.reader.(io.ReadCloser)
m.closed = true
if ok {
return rc.Close()
}
return nil
}

@ -0,0 +1,107 @@
/*
* MinIO Cloud Storage, (C) 2020 MinIO, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package bandwidth
import (
"context"
"sync"
"sync/atomic"
"time"
)
const (
throttleInternal = 250 * time.Millisecond
)
// throttle implements the throttling for bandwidth
type throttle struct {
generateTicker *time.Ticker // Ticker to generate available bandwidth
freeBytes int64 // unused bytes in the interval
bytesPerSecond int64 // max limit for bandwidth
bytesPerInterval int64 // bytes allocated for the interval
cond *sync.Cond // Used to notify waiting threads for bandwidth availability
}
// newThrottle returns a new bandwidth throttle. Set bytesPerSecond to 0 for no limit
func newThrottle(ctx context.Context, bytesPerSecond int64) *throttle {
if bytesPerSecond == 0 {
return &throttle{}
}
t := &throttle{
bytesPerSecond: bytesPerSecond,
generateTicker: time.NewTicker(throttleInternal),
}
t.cond = sync.NewCond(&sync.Mutex{})
t.SetBandwidth(bytesPerSecond)
t.freeBytes = t.bytesPerInterval
go t.generateBandwidth(ctx)
return t
}
// GetLimitForBytes gets the bytes that are possible to send within the limit
// if want is <= 0 or no bandwidth limit set, returns want.
// Otherwise a value > 0 will always be returned.
func (t *throttle) GetLimitForBytes(want int64) int64 {
if want <= 0 || atomic.LoadInt64(&t.bytesPerInterval) == 0 {
return want
}
t.cond.L.Lock()
defer t.cond.L.Unlock()
for {
var send int64
freeBytes := atomic.LoadInt64(&t.freeBytes)
send = want
if freeBytes < want {
send = freeBytes
if send <= 0 {
t.cond.Wait()
continue
}
}
atomic.AddInt64(&t.freeBytes, -send)
return send
}
}
// SetBandwidth sets a new bandwidth limit in bytes per second.
func (t *throttle) SetBandwidth(bandwidthBiPS int64) {
bpi := int64(throttleInternal) * bandwidthBiPS / int64(time.Second)
atomic.StoreInt64(&t.bytesPerInterval, bpi)
}
// ReleaseUnusedBandwidth releases bandwidth that was allocated for a user
func (t *throttle) ReleaseUnusedBandwidth(bytes int64) {
atomic.AddInt64(&t.freeBytes, bytes)
}
// generateBandwidth periodically allocates new bandwidth to use
func (t *throttle) generateBandwidth(ctx context.Context) {
for {
select {
case <-t.generateTicker.C:
// A new window is available
t.cond.L.Lock()
atomic.StoreInt64(&t.freeBytes, atomic.LoadInt64(&t.bytesPerInterval))
t.cond.Broadcast()
t.cond.L.Unlock()
case <-ctx.Done():
return
default:
}
}
}

@ -49,6 +49,8 @@ const (
ServerInfoAdminAction = "admin:ServerInfo"
// OBDInfoAdminAction - allow obtaining cluster on-board diagnostics
OBDInfoAdminAction = "admin:OBDInfo"
// BandwidthMonitorAction - allow monitoring bandwidth usage
BandwidthMonitorAction = "admin:BandwidthMonitor"
// ServerUpdateAdminAction - allow MinIO binary update
ServerUpdateAdminAction = "admin:ServerUpdate"
@ -131,6 +133,7 @@ var supportedAdminActions = map[AdminAction]struct{}{
KMSKeyStatusAdminAction: {},
ServerInfoAdminAction: {},
OBDInfoAdminAction: {},
BandwidthMonitorAction: {},
ServerUpdateAdminAction: {},
ServiceRestartAdminAction: {},
ServiceStopAdminAction: {},
@ -173,6 +176,7 @@ var adminActionConditionKeyMap = map[Action]condition.KeySet{
ServerInfoAdminAction: condition.NewKeySet(condition.AllSupportedAdminKeys...),
DataUsageInfoAdminAction: condition.NewKeySet(condition.AllSupportedAdminKeys...),
OBDInfoAdminAction: condition.NewKeySet(condition.AllSupportedAdminKeys...),
BandwidthMonitorAction: condition.NewKeySet(condition.AllSupportedAdminKeys...),
TopLocksAdminAction: condition.NewKeySet(condition.AllSupportedAdminKeys...),
ProfilingAdminAction: condition.NewKeySet(condition.AllSupportedAdminKeys...),
TraceAdminAction: condition.NewKeySet(condition.AllSupportedAdminKeys...),

@ -75,7 +75,7 @@ var AdminDiagnostics = Policy{
Actions: NewActionSet(ProfilingAdminAction,
TraceAdminAction, ConsoleLogAdminAction,
ServerInfoAdminAction, TopLocksAdminAction,
OBDInfoAdminAction),
OBDInfoAdminAction, BandwidthMonitorAction),
Resources: NewResourceSet(NewResource("*", "")),
},
},

@ -0,0 +1,61 @@
/*
* MinIO Cloud Storage, (C) 2020 MinIO, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package madmin
import (
"context"
"encoding/json"
"io"
"net/http"
"net/url"
"strings"
"github.com/minio/minio/pkg/bandwidth"
)
// GetBucketBandwidth - Get a snapshot of the bandwidth measurements for replication buckets. If no buckets
// generate replication traffic an empty map is returned.
func (adm *AdminClient) GetBucketBandwidth(ctx context.Context, buckets ...string) (bandwidth.Report, error) {
queryValues := url.Values{}
if len(buckets) > 0 {
queryValues.Set("buckets", strings.Join(buckets, ","))
}
reqData := requestData{
relPath: adminAPIPrefix + "/bandwidth",
queryValues: queryValues,
}
resp, err := adm.executeMethod(ctx, http.MethodGet, reqData)
if err != nil {
closeResponse(resp)
return bandwidth.Report{}, err
}
if resp.StatusCode != http.StatusOK {
return bandwidth.Report{}, httpRespToErrorResponse(resp)
}
dec := json.NewDecoder(resp.Body)
for {
var report bandwidth.Report
err = dec.Decode(&report)
if err != nil && err != io.EOF {
return bandwidth.Report{}, err
}
return report, nil
}
}

@ -0,0 +1,50 @@
/*
* MinIO Cloud Storage, (C) 2020 MinIO, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package main
import (
"context"
"fmt"
"log"
"github.com/minio/minio/pkg/madmin"
)
func main() {
// Note: YOUR-ACCESSKEYID, YOUR-SECRETACCESSKEY and my-bucketname are
// dummy values, please replace them with original values.
// API requests are secure (HTTPS) if secure=true and insecure (HTTP) otherwise.
// New returns an MinIO Admin client object.
madminClient, err := madmin.New("your-minio.example.com:9000", "YOUR-ACCESSKEYID", "YOUR-SECRETACCESSKEY", true)
if err != nil {
log.Fatalln(err)
}
ctx := context.Background()
report, err := madminClient.GetBucketBandwidth(ctx)
if err != nil {
log.Fatalln(err)
return
}
fmt.Printf("Report: %+v\n", report)
report, err = madminClient.GetBucketBandwidth(ctx, "sourceBucket", "sourceBucket2")
if err != nil {
log.Fatalln(err)
return
}
fmt.Printf("Report: %+v\n", report)
}

@ -42,7 +42,7 @@ func main() {
if err != nil {
log.Fatalln(err)
}
target := madmin.BucketTarget{Endpoint: "site2:9000", Credentials: creds, TargetBucket: "destbucket", IsSSL: false, Type: madmin.ReplicationArn}
target := madmin.BucketTarget{Endpoint: "site2:9000", Credentials: creds, TargetBucket: "destbucket", IsSSL: false, Type: madmin.ReplicationArn, BandwidthLimit: 2 * 1024 * 1024}
// Set bucket target
if err := madmClnt.SetBucketTarget(ctx, "srcbucket", &target); err != nil {
log.Fatalln(err)

@ -84,17 +84,18 @@ func ParseARN(s string) (*ARN, error) {
// BucketTarget represents the target bucket and site association.
type BucketTarget struct {
SourceBucket string `json:"sourcebucket"`
Endpoint string `json:"endpoint"`
Credentials *auth.Credentials `json:"credentials"`
TargetBucket string `json:"targetbucket"`
Secure bool `json:"secure"`
Path string `json:"path,omitempty"`
API string `json:"api,omitempty"`
Arn string `json:"arn,omitempty"`
Type ServiceType `json:"type"`
Region string `json:"omitempty"`
Label string `json:"label,omitempty"`
SourceBucket string `json:"sourcebucket"`
Endpoint string `json:"endpoint"`
Credentials *auth.Credentials `json:"credentials"`
TargetBucket string `json:"targetbucket"`
Secure bool `json:"secure"`
Path string `json:"path,omitempty"`
API string `json:"api,omitempty"`
Arn string `json:"arn,omitempty"`
Type ServiceType `json:"type"`
Region string `json:"omitempty"`
Label string `json:"label,omitempty"`
BandwidthLimit int64 `json:"bandwidthlimit,omitempty"`
}
// Clone returns shallow clone of BucketTarget without secret key in credentials

Loading…
Cancel
Save