From 9a547dcbfb1f403db8acca57b5ebdd5a79096da3 Mon Sep 17 00:00:00 2001 From: poornas Date: Thu, 30 Apr 2020 15:55:54 -0700 Subject: [PATCH] Add API's for managing bucket quota (#9379) This PR allows setting a "hard" or "fifo" quota restriction at the bucket level. Buckets that have reached the FIFO quota configured, will automatically be cleaned up in FIFO manner until bucket usage drops to configured quota. If a bucket is configured with a "hard" quota ceiling, all further writes are disallowed. --- cmd/admin-handlers-quota.go | 150 ++++++++++++++++ cmd/admin-handlers_test.go | 2 +- cmd/admin-router.go | 14 +- cmd/api-errors.go | 23 +++ cmd/daily-lifecycle-ops.go | 2 +- cmd/disk-cache-backend.go | 2 +- cmd/disk-cache-utils.go | 20 ++- cmd/gateway-main.go | 3 +- cmd/globals.go | 4 + cmd/notification.go | 38 ++++ cmd/object-api-errors.go | 14 ++ cmd/object-handlers.go | 19 +- cmd/object-lock.go | 6 +- cmd/peer-rest-client.go | 31 ++++ cmd/peer-rest-common.go | 2 + cmd/peer-rest-server.go | 51 ++++++ cmd/quota.go | 267 ++++++++++++++++++++++++++++ cmd/routers.go | 4 +- cmd/server-main.go | 8 +- cmd/test-utils_test.go | 2 +- pkg/iam/policy/admin-action.go | 12 ++ pkg/madmin/examples/bucket-quota.go | 56 ++++++ pkg/madmin/quota-commands.go | 140 +++++++++++++++ 23 files changed, 848 insertions(+), 22 deletions(-) create mode 100644 cmd/admin-handlers-quota.go create mode 100644 cmd/quota.go create mode 100644 pkg/madmin/examples/bucket-quota.go create mode 100644 pkg/madmin/quota-commands.go diff --git a/cmd/admin-handlers-quota.go b/cmd/admin-handlers-quota.go new file mode 100644 index 000000000..6e28a0fa0 --- /dev/null +++ b/cmd/admin-handlers-quota.go @@ -0,0 +1,150 @@ +/* + * MinIO Cloud Storage, (C) 2020 MinIO, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cmd + +import ( + "io/ioutil" + "net/http" + "path" + + "github.com/gorilla/mux" + "github.com/minio/minio/cmd/config" + "github.com/minio/minio/pkg/env" + iampolicy "github.com/minio/minio/pkg/iam/policy" +) + +const ( + bucketQuotaConfigFile = "quota.json" +) + +// PutBucketQuotaConfigHandler - PUT Bucket quota configuration. +// ---------- +// Places a quota configuration on the specified bucket. The quota +// specified in the quota configuration will be applied by default +// to enforce total quota for the specified bucket. +func (a adminAPIHandlers) PutBucketQuotaConfigHandler(w http.ResponseWriter, r *http.Request) { + ctx := newContext(r, w, "PutBucketQuotaConfig") + objectAPI, _ := validateAdminReq(ctx, w, r, iampolicy.SetBucketQuotaAdminAction) + if objectAPI == nil { + writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL) + return + } + + vars := mux.Vars(r) + bucket := vars["bucket"] + + // Turn off quota commands if data usage info is unavailable. + if env.Get(envDataUsageCrawlConf, config.EnableOn) == config.EnableOff { + writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrAdminBucketQuotaDisabled), r.URL) + return + } + + if _, err := objectAPI.GetBucketInfo(ctx, bucket); err != nil { + writeErrorResponseJSON(ctx, w, toAPIError(ctx, err), r.URL) + return + } + defer r.Body.Close() + data, err := ioutil.ReadAll(r.Body) + if err != nil { + writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrInvalidRequest), r.URL) + return + } + quotaCfg, err := parseBucketQuota(data) + if err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) + return + } + configFile := path.Join(bucketConfigPrefix, bucket, bucketQuotaConfigFile) + if err = saveConfig(ctx, objectAPI, configFile, data); err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) + return + } + if quotaCfg.Quota > 0 { + globalBucketQuotaSys.Set(bucket, quotaCfg) + globalNotificationSys.PutBucketQuotaConfig(ctx, bucket, quotaCfg) + + } else { + globalBucketQuotaSys.Remove(bucket) + globalNotificationSys.RemoveBucketQuotaConfig(ctx, bucket) + + } + + // Write success response. + writeSuccessResponseHeadersOnly(w) +} + +// GetBucketQuotaConfigHandler - gets bucket quota configuration +func (a adminAPIHandlers) GetBucketQuotaConfigHandler(w http.ResponseWriter, r *http.Request) { + ctx := newContext(r, w, "GetBucketQuotaConfig") + + objectAPI, _ := validateAdminUsersReq(ctx, w, r, iampolicy.GetBucketQuotaAdminAction) + if objectAPI == nil { + writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL) + return + } + + vars := mux.Vars(r) + bucket := vars["bucket"] + if _, err := objectAPI.GetBucketInfo(ctx, bucket); err != nil { + writeErrorResponseJSON(ctx, w, toAPIError(ctx, err), r.URL) + return + } + configFile := path.Join(bucketConfigPrefix, bucket, bucketQuotaConfigFile) + configData, err := readConfig(ctx, objectAPI, configFile) + if err != nil { + if err != errConfigNotFound { + writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL) + return + } + writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, BucketQuotaConfigNotFound{Bucket: bucket}), r.URL) + return + } + // Write success response. + writeSuccessResponseJSON(w, configData) +} + +// RemoveBucketQuotaConfigHandler - removes Bucket quota configuration. +// ---------- +// Removes quota configuration on the specified bucket. +func (a adminAPIHandlers) RemoveBucketQuotaConfigHandler(w http.ResponseWriter, r *http.Request) { + ctx := newContext(r, w, "RemoveBucketQuotaConfig") + objectAPI, _ := validateAdminReq(ctx, w, r, iampolicy.SetBucketQuotaAdminAction) + if objectAPI == nil { + writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL) + return + } + vars := mux.Vars(r) + bucket := vars["bucket"] + + if _, err := objectAPI.GetBucketInfo(ctx, bucket); err != nil { + writeErrorResponseJSON(ctx, w, toAPIError(ctx, err), r.URL) + return + } + configFile := path.Join(bucketConfigPrefix, bucket, bucketQuotaConfigFile) + if err := deleteConfig(ctx, objectAPI, configFile); err != nil { + if err != errConfigNotFound { + writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL) + return + } + writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, BucketQuotaConfigNotFound{Bucket: bucket}), r.URL) + return + } + globalBucketQuotaSys.Remove(bucket) + globalNotificationSys.RemoveBucketQuotaConfig(ctx, bucket) + // Write success response. + writeSuccessNoContent(w) +} diff --git a/cmd/admin-handlers_test.go b/cmd/admin-handlers_test.go index 0104d1086..a0bdab362 100644 --- a/cmd/admin-handlers_test.go +++ b/cmd/admin-handlers_test.go @@ -86,7 +86,7 @@ func prepareAdminXLTestBed(ctx context.Context) (*adminXLTestBed, error) { // Setup admin mgmt REST API handlers. adminRouter := mux.NewRouter() - registerAdminRouter(adminRouter, true, true) + registerAdminRouter(adminRouter, true, true, false) return &adminXLTestBed{ xlDirs: xlDirs, diff --git a/cmd/admin-router.go b/cmd/admin-router.go index 5119300b4..9b9af6682 100644 --- a/cmd/admin-router.go +++ b/cmd/admin-router.go @@ -35,7 +35,7 @@ const ( type adminAPIHandlers struct{} // registerAdminRouter - Add handler functions for each service REST API routes. -func registerAdminRouter(router *mux.Router, enableConfigOps, enableIAMOps bool) { +func registerAdminRouter(router *mux.Router, enableConfigOps, enableIAMOps, enableBucketQuotaOps bool) { adminAPI := adminAPIHandlers{} // Admin router @@ -166,7 +166,19 @@ func registerAdminRouter(router *mux.Router, enableConfigOps, enableIAMOps bool) // Set Group Status adminRouter.Methods(http.MethodPut).Path(adminVersion+"/set-group-status").HandlerFunc(httpTraceHdrs(adminAPI.SetGroupStatus)).Queries("group", "{group:.*}").Queries("status", "{status:.*}") + } + // Quota operations + if enableConfigOps && enableBucketQuotaOps { + // GetBucketQuotaConfig + adminRouter.Methods(http.MethodGet).Path(adminVersion+"/get-bucket-quota").HandlerFunc( + httpTraceHdrs(adminAPI.GetBucketQuotaConfigHandler)).Queries("bucket", "{bucket:.*}") + // PutBucketQuotaConfig + adminRouter.Methods(http.MethodPut).Path(adminVersion+"/set-bucket-quota").HandlerFunc( + httpTraceHdrs(adminAPI.PutBucketQuotaConfigHandler)).Queries("bucket", "{bucket:.*}") + // RemoveBucketQuotaConfig + adminRouter.Methods(http.MethodDelete).Path(adminVersion+"/remove-bucket-quota").HandlerFunc( + httpTraceHdrs(adminAPI.RemoveBucketQuotaConfigHandler)).Queries("bucket", "{bucket:.*}") } // -- Top APIs -- diff --git a/cmd/api-errors.go b/cmd/api-errors.go index 204f6b90f..dcfef101e 100644 --- a/cmd/api-errors.go +++ b/cmd/api-errors.go @@ -235,6 +235,10 @@ const ( ErrAdminCredentialsMismatch ErrInsecureClientRequest ErrObjectTampered + // Bucket Quota error codes + ErrAdminBucketQuotaExceeded + ErrAdminNoSuchQuotaConfiguration + ErrAdminBucketQuotaDisabled ErrHealNotImplemented ErrHealNoSuchProcess @@ -1089,6 +1093,21 @@ var errorCodes = errorCodeMap{ Description: "Credentials in config mismatch with server environment variables", HTTPStatusCode: http.StatusServiceUnavailable, }, + ErrAdminBucketQuotaExceeded: { + Code: "XMinioAdminBucketQuotaExceeded", + Description: "Bucket quota exceeded", + HTTPStatusCode: http.StatusBadRequest, + }, + ErrAdminNoSuchQuotaConfiguration: { + Code: "XMinioAdminNoSuchQuotaConfiguration", + Description: "The quota configuration does not exist", + HTTPStatusCode: http.StatusNotFound, + }, + ErrAdminBucketQuotaDisabled: { + Code: "XMinioAdminBucketQuotaDisabled", + Description: "Quota specified but disk usage crawl is disabled on MinIO server", + HTTPStatusCode: http.StatusBadRequest, + }, ErrInsecureClientRequest: { Code: "XMinioInsecureClientRequest", Description: "Cannot respond to plain-text request from TLS-encrypted server", @@ -1783,6 +1802,10 @@ func toAPIErrorCode(ctx context.Context, err error) (apiErr APIErrorCode) { apiErr = ErrNoSuchLifecycleConfiguration case BucketSSEConfigNotFound: apiErr = ErrNoSuchBucketSSEConfig + case BucketQuotaConfigNotFound: + apiErr = ErrAdminNoSuchQuotaConfiguration + case BucketQuotaExceeded: + apiErr = ErrAdminBucketQuotaExceeded case *event.ErrInvalidEventName: apiErr = ErrEventNotification case *event.ErrInvalidARN: diff --git a/cmd/daily-lifecycle-ops.go b/cmd/daily-lifecycle-ops.go index aec4c7e56..5d44ae15a 100644 --- a/cmd/daily-lifecycle-ops.go +++ b/cmd/daily-lifecycle-ops.go @@ -89,7 +89,7 @@ func lifecycleRound(ctx context.Context, objAPI ObjectLayer) error { } // Find the action that need to be executed if l.ComputeAction(obj.Name, obj.UserTags, obj.ModTime) == lifecycle.DeleteAction { - if bucketHasLockConfig && enforceRetentionForLifecycle(ctx, obj) { + if bucketHasLockConfig && enforceRetentionForDeletion(ctx, obj) { continue } objects = append(objects, obj.Name) diff --git a/cmd/disk-cache-backend.go b/cmd/disk-cache-backend.go index 475a36bd0..441972a3d 100644 --- a/cmd/disk-cache-backend.go +++ b/cmd/disk-cache-backend.go @@ -245,7 +245,7 @@ func (c *diskCache) purge(ctx context.Context) { // need to be cleaned up. expiry := UTCNow().Add(-cacheExpiryDays) // defaulting max hits count to 100 - scorer, err := newFileScorer(int64(toFree), time.Now().Unix(), 100) + scorer, err := newFileScorer(toFree, time.Now().Unix(), 100) if err != nil { logger.LogIf(ctx, err) return diff --git a/cmd/disk-cache-utils.go b/cmd/disk-cache-utils.go index 2f18bf630..b74f1526e 100644 --- a/cmd/disk-cache-utils.go +++ b/cmd/disk-cache-utils.go @@ -285,7 +285,7 @@ func isMetadataSame(m1, m2 map[string]string) bool { } type fileScorer struct { - saveBytes int64 + saveBytes uint64 now int64 maxHits int // 1/size for consistent score. @@ -294,21 +294,21 @@ type fileScorer struct { // queue is a linked list of files we want to delete. // The list is kept sorted according to score, highest at top, lowest at bottom. queue list.List - queuedBytes int64 + queuedBytes uint64 } type queuedFile struct { name string - size int64 + size uint64 score float64 } // newFileScorer allows to collect files to save a specific number of bytes. // Each file is assigned a score based on its age, size and number of hits. // A list of files is maintained -func newFileScorer(saveBytes int64, now int64, maxHits int) (*fileScorer, error) { - if saveBytes <= 0 { - return nil, errors.New("newFileScorer: saveBytes <= 0") +func newFileScorer(saveBytes uint64, now int64, maxHits int) (*fileScorer, error) { + if saveBytes == 0 { + return nil, errors.New("newFileScorer: saveBytes = 0") } if now < 0 { return nil, errors.New("newFileScorer: now < 0") @@ -325,7 +325,7 @@ func (f *fileScorer) addFile(name string, lastAccess time.Time, size int64, hits // Calculate how much we want to delete this object. file := queuedFile{ name: name, - size: size, + size: uint64(size), } score := float64(f.now - lastAccess.Unix()) // Size as fraction of how much we want to save, 0->1. @@ -353,7 +353,11 @@ func (f *fileScorer) addFile(name string, lastAccess time.Time, size int64, hits // Returns true if there still is a need to delete files (saveBytes >0), // false if no more bytes needs to be saved. func (f *fileScorer) adjustSaveBytes(n int64) bool { - f.saveBytes += n + if n < 0 { + f.saveBytes -= ^uint64(n - 1) + } else { + f.saveBytes += uint64(n) + } if f.saveBytes <= 0 { f.queue.Init() f.saveBytes = 0 diff --git a/cmd/gateway-main.go b/cmd/gateway-main.go index ad652a42a..cd16cd63c 100644 --- a/cmd/gateway-main.go +++ b/cmd/gateway-main.go @@ -176,10 +176,11 @@ func StartGateway(ctx *cli.Context, gw Gateway) { } enableIAMOps := globalEtcdClient != nil + enableBucketQuotaOps := env.Get(envDataUsageCrawlConf, config.EnableOn) == config.EnableOn // Enable IAM admin APIs if etcd is enabled, if not just enable basic // operations such as profiling, server info etc. - registerAdminRouter(router, enableConfigOps, enableIAMOps) + registerAdminRouter(router, enableConfigOps, enableIAMOps, enableBucketQuotaOps) // Add healthcheck router registerHealthCheckRouter(router) diff --git a/cmd/globals.go b/cmd/globals.go index ded2a096e..fd97224d9 100644 --- a/cmd/globals.go +++ b/cmd/globals.go @@ -36,6 +36,7 @@ import ( xhttp "github.com/minio/minio/cmd/http" "github.com/minio/minio/pkg/auth" objectlock "github.com/minio/minio/pkg/bucket/object/lock" + "github.com/minio/minio/pkg/certs" "github.com/minio/minio/pkg/event" "github.com/minio/minio/pkg/pubsub" @@ -217,6 +218,9 @@ var ( globalBucketObjectLockConfig = objectlock.NewBucketObjectLockConfig() + globalBucketQuotaSys = NewBucketQuotaSys() + globalBucketStorageCache bucketStorageCache + // Disk cache drives globalCacheConfig cache.Config diff --git a/cmd/notification.go b/cmd/notification.go index fd177bc07..2eb726357 100644 --- a/cmd/notification.go +++ b/cmd/notification.go @@ -567,6 +567,7 @@ func (sys *NotificationSys) SetBucketPolicy(ctx context.Context, bucketName stri func (sys *NotificationSys) DeleteBucket(ctx context.Context, bucketName string) { globalNotificationSys.RemoveNotification(bucketName) globalBucketObjectLockConfig.Remove(bucketName) + globalBucketQuotaSys.Remove(bucketName) globalPolicySys.Remove(bucketName) globalLifecycleSys.Remove(bucketName) @@ -619,6 +620,23 @@ func (sys *NotificationSys) RemoveBucketObjectLockConfig(ctx context.Context, bu }() } +// RemoveBucketQuotaConfig - calls RemoveBucketQuotaConfig RPC call on all peers. +func (sys *NotificationSys) RemoveBucketQuotaConfig(ctx context.Context, bucketName string) { + go func() { + ng := WithNPeers(len(sys.peerClients)) + for idx, client := range sys.peerClients { + if client == nil { + continue + } + client := client + ng.Go(ctx, func() error { + return client.RemoveBucketQuotaConfig(bucketName) + }, idx, *client.host) + } + ng.Wait() + }() +} + // SetBucketLifecycle - calls SetBucketLifecycle on all peers. func (sys *NotificationSys) SetBucketLifecycle(ctx context.Context, bucketName string, bucketLifecycle *lifecycle.Lifecycle) { @@ -906,6 +924,26 @@ func (sys *NotificationSys) PutBucketObjectLockConfig(ctx context.Context, bucke } } +// PutBucketQuotaConfig - put bucket quota configuration to all peers. +func (sys *NotificationSys) PutBucketQuotaConfig(ctx context.Context, bucketName string, q madmin.BucketQuota) { + g := errgroup.WithNErrs(len(sys.peerClients)) + for index, client := range sys.peerClients { + if client == nil { + continue + } + index := index + g.Go(func() error { + return sys.peerClients[index].PutBucketQuotaConfig(bucketName, q) + }, index) + } + for i, err := range g.Wait() { + if err != nil { + logger.GetReqInfo(ctx).AppendTags("remotePeer", sys.peerClients[i].host.String()) + logger.LogIf(ctx, err) + } + } +} + // NetOBDInfo - Net OBD information func (sys *NotificationSys) NetOBDInfo(ctx context.Context) madmin.ServerNetOBDInfo { var sortedGlobalEndpoints []string diff --git a/cmd/object-api-errors.go b/cmd/object-api-errors.go index 915483a90..f8f9aed34 100644 --- a/cmd/object-api-errors.go +++ b/cmd/object-api-errors.go @@ -269,6 +269,20 @@ func (e BucketSSEConfigNotFound) Error() string { return "No bucket encryption found for bucket: " + e.Bucket } +// BucketQuotaConfigNotFound - no bucket quota config found. +type BucketQuotaConfigNotFound GenericError + +func (e BucketQuotaConfigNotFound) Error() string { + return "No quota config found for bucket : " + e.Bucket +} + +// BucketQuotaExceeded - bucket quota exceeded. +type BucketQuotaExceeded GenericError + +func (e BucketQuotaExceeded) Error() string { + return "Bucket quota exceeded for bucket: " + e.Bucket +} + /// Bucket related errors. // BucketNameInvalid - bucketname provided is invalid. diff --git a/cmd/object-handlers.go b/cmd/object-handlers.go index d200f57f8..2fad07d50 100644 --- a/cmd/object-handlers.go +++ b/cmd/object-handlers.go @@ -899,6 +899,10 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re } length = actualSize } + if err := enforceBucketQuota(ctx, dstBucket, actualSize); err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) + return + } var compressMetadata map[string]string // No need to compress for remote etcd calls @@ -1328,7 +1332,10 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req sha256hex = getContentSha256Cksum(r, serviceS3) } } - + if err := enforceBucketQuota(ctx, bucket, size); err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) + return + } // Check if bucket encryption is enabled _, encEnabled := globalBucketSSEConfigSys.Get(bucket) // This request header needs to be set prior to setting ObjectOptions @@ -1762,7 +1769,10 @@ func (api objectAPIHandlers) CopyObjectPartHandler(w http.ResponseWriter, r *htt return } } - + if err := enforceBucketQuota(ctx, dstBucket, actualPartSize); err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) + return + } // Special care for CopyObjectPart if partRangeErr := checkCopyPartRangeWithSize(rs, actualPartSize); partRangeErr != nil { writeCopyPartErr(ctx, w, partRangeErr, r.URL, guessIsBrowserReq(r)) @@ -2043,6 +2053,11 @@ func (api objectAPIHandlers) PutObjectPartHandler(w http.ResponseWriter, r *http } } + if err := enforceBucketQuota(ctx, bucket, size); err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) + return + } + actualSize := size // get encryption options diff --git a/cmd/object-lock.go b/cmd/object-lock.go index cd0046f2a..aa1166871 100644 --- a/cmd/object-lock.go +++ b/cmd/object-lock.go @@ -98,9 +98,9 @@ func enforceRetentionBypassForDeleteWeb(ctx context.Context, r *http.Request, bu return ErrNone } -// enforceRetentionForLifecycle checks if it is appropriate to remove an -// object according to locking configuration when this is lifecycle asking. -func enforceRetentionForLifecycle(ctx context.Context, objInfo ObjectInfo) (locked bool) { +// enforceRetentionForDeletion checks if it is appropriate to remove an +// object according to locking configuration when this is lifecycle/ bucket quota asking. +func enforceRetentionForDeletion(ctx context.Context, objInfo ObjectInfo) (locked bool) { lhold := objectlock.GetObjectLegalHoldMeta(objInfo.UserDefined) if lhold.Status.Valid() && lhold.Status == objectlock.LegalHoldOn { return true diff --git a/cmd/peer-rest-client.go b/cmd/peer-rest-client.go index 2bc9fad26..bd0b105b1 100644 --- a/cmd/peer-rest-client.go +++ b/cmd/peer-rest-client.go @@ -543,6 +543,18 @@ func (client *peerRESTClient) cycleServerBloomFilter(ctx context.Context, req bl return &resp, gob.NewDecoder(respBody).Decode(&resp) } +// RemoveBucketQuotaConfig - Remove bucket quota config on the peer node. +func (client *peerRESTClient) RemoveBucketQuotaConfig(bucket string) error { + values := make(url.Values) + values.Set(peerRESTBucket, bucket) + respBody, err := client.call(peerRESTMethodBucketQuotaConfigRemove, values, nil, -1) + if err != nil { + return err + } + defer http.DrainBody(respBody) + return nil +} + // SetBucketPolicy - Set bucket policy on the peer node. func (client *peerRESTClient) SetBucketPolicy(bucket string, bucketPolicy *policy.Policy) error { values := make(url.Values) @@ -662,6 +674,25 @@ func (client *peerRESTClient) PutBucketObjectLockConfig(bucket string, retention return nil } +// PutBucketQuotaConfig - PUT bucket quota configuration. +func (client *peerRESTClient) PutBucketQuotaConfig(bucket string, q madmin.BucketQuota) error { + values := make(url.Values) + values.Set(peerRESTBucket, bucket) + + var reader bytes.Buffer + err := gob.NewEncoder(&reader).Encode(&q) + if err != nil { + return err + } + + respBody, err := client.call(peerRESTMethodPutBucketQuotaConfig, values, &reader, -1) + if err != nil { + return err + } + defer http.DrainBody(respBody) + return nil +} + // DeletePolicy - delete a specific canned policy. func (client *peerRESTClient) DeletePolicy(policyName string) (err error) { values := make(url.Values) diff --git a/cmd/peer-rest-common.go b/cmd/peer-rest-common.go index ad5c6a659..762a7a2ca 100644 --- a/cmd/peer-rest-common.go +++ b/cmd/peer-rest-common.go @@ -63,6 +63,8 @@ const ( peerRESTMethodBucketEncryptionRemove = "/removebucketencryption" peerRESTMethodPutBucketObjectLockConfig = "/putbucketobjectlockconfig" peerRESTMethodBucketObjectLockConfigRemove = "/removebucketobjectlockconfig" + peerRESTMethodPutBucketQuotaConfig = "/putbucketquotaconfig" + peerRESTMethodBucketQuotaConfigRemove = "/removebucketquotaconfig" ) const ( diff --git a/cmd/peer-rest-server.go b/cmd/peer-rest-server.go index 0cadb4905..071915b3a 100644 --- a/cmd/peer-rest-server.go +++ b/cmd/peer-rest-server.go @@ -599,6 +599,7 @@ func (s *peerRESTServer) DeleteBucketHandler(w http.ResponseWriter, r *http.Requ globalNotificationSys.RemoveNotification(bucketName) globalPolicySys.Remove(bucketName) globalBucketObjectLockConfig.Remove(bucketName) + globalBucketQuotaSys.Remove(bucketName) globalLifecycleSys.Remove(bucketName) w.(http.Flusher).Flush() @@ -875,6 +876,54 @@ func (s *peerRESTServer) PutBucketObjectLockConfigHandler(w http.ResponseWriter, w.(http.Flusher).Flush() } +// PutBucketQuotaConfigHandler - handles PUT bucket quota configuration. +func (s *peerRESTServer) PutBucketQuotaConfigHandler(w http.ResponseWriter, r *http.Request) { + if !s.IsValid(w, r) { + s.writeErrorResponse(w, errors.New("Invalid request")) + return + } + + vars := mux.Vars(r) + bucketName := vars[peerRESTBucket] + if bucketName == "" { + s.writeErrorResponse(w, errors.New("Bucket name is missing")) + return + } + + var quota madmin.BucketQuota + if r.ContentLength < 0 { + s.writeErrorResponse(w, errInvalidArgument) + return + } + + err := gob.NewDecoder(r.Body).Decode("a) + if err != nil { + s.writeErrorResponse(w, err) + return + } + + globalBucketQuotaSys.Set(bucketName, quota) + w.(http.Flusher).Flush() +} + +// RemoveBucketQuotaConfigHandler - handles DELETE bucket quota configuration. +func (s *peerRESTServer) RemoveBucketQuotaConfigHandler(w http.ResponseWriter, r *http.Request) { + if !s.IsValid(w, r) { + s.writeErrorResponse(w, errors.New("Invalid request")) + return + } + + vars := mux.Vars(r) + bucketName := vars[peerRESTBucket] + if bucketName == "" { + s.writeErrorResponse(w, errors.New("Bucket name is missing")) + return + } + + globalBucketQuotaSys.Remove(bucketName) + w.(http.Flusher).Flush() +} + // ServerUpdateHandler - updates the current server. func (s *peerRESTServer) ServerUpdateHandler(w http.ResponseWriter, r *http.Request) { if !s.IsValid(w, r) { @@ -1181,4 +1230,6 @@ func registerPeerRESTHandlers(router *mux.Router) { subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodLog).HandlerFunc(server.ConsoleLogHandler) subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodPutBucketObjectLockConfig).HandlerFunc(httpTraceHdrs(server.PutBucketObjectLockConfigHandler)).Queries(restQueries(peerRESTBucket)...) subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodBucketObjectLockConfigRemove).HandlerFunc(httpTraceHdrs(server.RemoveBucketObjectLockConfigHandler)).Queries(restQueries(peerRESTBucket)...) + subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodPutBucketQuotaConfig).HandlerFunc(httpTraceHdrs(server.PutBucketQuotaConfigHandler)).Queries(restQueries(peerRESTBucket)...) + subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodBucketQuotaConfigRemove).HandlerFunc(httpTraceHdrs(server.RemoveBucketQuotaConfigHandler)).Queries(restQueries(peerRESTBucket)...) } diff --git a/cmd/quota.go b/cmd/quota.go new file mode 100644 index 000000000..0beebf368 --- /dev/null +++ b/cmd/quota.go @@ -0,0 +1,267 @@ +/* + * MinIO Cloud Storage, (C) 2020 MinIO, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cmd + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "path" + "sync" + "time" + + "github.com/minio/minio/cmd/config" + "github.com/minio/minio/cmd/logger" + "github.com/minio/minio/pkg/env" + "github.com/minio/minio/pkg/event" + "github.com/minio/minio/pkg/madmin" +) + +// BucketQuotaSys - map of bucket and quota configuration. +type BucketQuotaSys struct { + sync.RWMutex + quotaMap map[string]madmin.BucketQuota +} + +// Set - set quota configuration. +func (sys *BucketQuotaSys) Set(bucketName string, q madmin.BucketQuota) { + sys.Lock() + sys.quotaMap[bucketName] = q + sys.Unlock() +} + +// Get - Get quota configuration. +func (sys *BucketQuotaSys) Get(bucketName string) (q madmin.BucketQuota, ok bool) { + sys.RLock() + defer sys.RUnlock() + q, ok = sys.quotaMap[bucketName] + return +} + +// Remove - removes quota configuration. +func (sys *BucketQuotaSys) Remove(bucketName string) { + sys.Lock() + delete(sys.quotaMap, bucketName) + sys.Unlock() +} + +// Exists - bucketName has Quota config set +func (sys *BucketQuotaSys) Exists(bucketName string) bool { + sys.RLock() + _, ok := sys.quotaMap[bucketName] + sys.RUnlock() + return ok +} + +// Keys - list of buckets with quota configuration +func (sys *BucketQuotaSys) Keys() []string { + sys.RLock() + defer sys.RUnlock() + var keys []string + for k := range sys.quotaMap { + keys = append(keys, k) + } + return keys +} + +// NewBucketQuotaSys returns initialized BucketQuotaSys +func NewBucketQuotaSys() *BucketQuotaSys { + return &BucketQuotaSys{quotaMap: map[string]madmin.BucketQuota{}} +} + +// parseBucketQuota parses BucketQuota from json +func parseBucketQuota(data []byte) (quotaCfg madmin.BucketQuota, err error) { + err = json.Unmarshal(data, "aCfg) + if err != nil { + return + } + if !quotaCfg.Type.IsValid() { + return quotaCfg, fmt.Errorf("Invalid quota type %s", quotaCfg.Type) + } + return +} + +type bucketStorageCache struct { + bucketsSizes map[string]uint64 + lastUpdate time.Time + mu sync.Mutex +} + +func (b *bucketStorageCache) check(ctx context.Context, q madmin.BucketQuota, bucket string, size int64) error { + b.mu.Lock() + defer b.mu.Unlock() + if time.Since(b.lastUpdate) > 10*time.Second { + dui, err := loadDataUsageFromBackend(ctx, newObjectLayerWithoutSafeModeFn()) + if err != nil { + return err + } + b.lastUpdate = time.Now() + b.bucketsSizes = dui.BucketsSizes + } + currUsage := b.bucketsSizes[bucket] + if (currUsage + uint64(size)) > q.Quota { + return BucketQuotaExceeded{Bucket: bucket} + } + return nil +} +func enforceBucketQuota(ctx context.Context, bucket string, size int64) error { + if size < 0 { + return nil + } + q, ok := globalBucketQuotaSys.Get(bucket) + if !ok { + return nil + } + + return globalBucketStorageCache.check(ctx, q, bucket, size) +} + +func initBucketQuotaSys(buckets []BucketInfo, objAPI ObjectLayer) error { + for _, bucket := range buckets { + ctx := logger.SetReqInfo(GlobalContext, &logger.ReqInfo{BucketName: bucket.Name}) + configFile := path.Join(bucketConfigPrefix, bucket.Name, bucketQuotaConfigFile) + configData, err := readConfig(ctx, objAPI, configFile) + if err != nil { + if errors.Is(err, errConfigNotFound) { + continue + } + return err + } + quotaCfg, err := parseBucketQuota(configData) + if err != nil { + return err + } + globalBucketQuotaSys.Set(bucket.Name, quotaCfg) + } + return nil +} + +const ( + bgQuotaInterval = 1 * time.Hour +) + +// initQuotaEnforcement starts the routine that deletes objects in bucket +// that exceeds the FIFO quota +func initQuotaEnforcement(ctx context.Context, objAPI ObjectLayer) { + if env.Get(envDataUsageCrawlConf, config.EnableOn) == config.EnableOn { + go startBucketQuotaEnforcement(ctx, objAPI) + } +} + +func startBucketQuotaEnforcement(ctx context.Context, objAPI ObjectLayer) { + for { + select { + case <-ctx.Done(): + return + case <-time.NewTimer(bgQuotaInterval).C: + logger.LogIf(ctx, enforceFIFOQuota(ctx, objAPI)) + } + + } +} + +// enforceFIFOQuota deletes objects in FIFO order until sufficient objects +// have been deleted so as to bring bucket usage within quota +func enforceFIFOQuota(ctx context.Context, objectAPI ObjectLayer) error { + // Turn off quota enforcement if data usage info is unavailable. + if env.Get(envDataUsageCrawlConf, config.EnableOn) == config.EnableOff { + return nil + } + for _, bucket := range globalBucketQuotaSys.Keys() { + // Check if the current bucket has quota restrictions, if not skip it + cfg, ok := globalBucketQuotaSys.Get(bucket) + if !ok { + continue + } + if cfg.Type != madmin.FIFOQuota { + continue + } + _, bucketHasLockConfig := globalBucketObjectLockConfig.Get(bucket) + + dataUsageInfo, err := loadDataUsageFromBackend(ctx, objectAPI) + if err != nil { + return err + } + var toFree uint64 + if dataUsageInfo.BucketsSizes[bucket] > cfg.Quota { + toFree = dataUsageInfo.BucketsSizes[bucket] - cfg.Quota + } + if toFree <= 0 { + continue + } + // Allocate new results channel to receive ObjectInfo. + objInfoCh := make(chan ObjectInfo) + + // Walk through all objects + if err := objectAPI.Walk(ctx, bucket, "", objInfoCh); err != nil { + return err + } + // reuse the fileScorer used by disk cache to score entries by + // ModTime to find the oldest objects in bucket to delete. In + // the context of bucket quota enforcement - number of hits are + // irrelevant. + scorer, err := newFileScorer(toFree, time.Now().Unix(), 1) + if err != nil { + return err + } + + for obj := range objInfoCh { + // skip objects currently under retention + if bucketHasLockConfig && enforceRetentionForDeletion(ctx, obj) { + continue + } + scorer.addFile(obj.Name, obj.ModTime, obj.Size, 1) + } + var objects []string + numKeys := len(scorer.fileNames()) + for i, key := range scorer.fileNames() { + objects = append(objects, key) + if len(objects) < maxObjectList && (i < numKeys-1) { + // skip deletion until maxObjectList or end of slice + continue + } + + if len(objects) == 0 { + break + } + // Deletes a list of objects. + deleteErrs, err := objectAPI.DeleteObjects(ctx, bucket, objects) + if err != nil { + logger.LogIf(ctx, err) + } else { + for i := range deleteErrs { + if deleteErrs[i] != nil { + logger.LogIf(ctx, deleteErrs[i]) + continue + } + // Notify object deleted event. + sendEvent(eventArgs{ + EventName: event.ObjectRemovedDelete, + BucketName: bucket, + Object: ObjectInfo{ + Name: objects[i], + }, + Host: "Internal: [FIFO-QUOTA-EXPIRY]", + }) + } + objects = nil + } + } + } + return nil +} diff --git a/cmd/routers.go b/cmd/routers.go index 7e4237024..c12df2d1f 100644 --- a/cmd/routers.go +++ b/cmd/routers.go @@ -81,7 +81,7 @@ var globalHandlers = []HandlerFunc{ } // configureServer handler returns final handler for the http server. -func configureServerHandler(endpointZones EndpointZones) (http.Handler, error) { +func configureServerHandler(endpointZones EndpointZones, enableBucketQuotaOps bool) (http.Handler, error) { // Initialize router. `SkipClean(true)` stops gorilla/mux from // normalizing URL path minio/minio#3256 router := mux.NewRouter().SkipClean(true).UseEncodedPath() @@ -95,7 +95,7 @@ func configureServerHandler(endpointZones EndpointZones) (http.Handler, error) { registerSTSRouter(router) // Add Admin router, all APIs are enabled in server mode. - registerAdminRouter(router, true, true) + registerAdminRouter(router, true, true, enableBucketQuotaOps) // Add healthcheck router registerHealthCheckRouter(router) diff --git a/cmd/server-main.go b/cmd/server-main.go index 2278acf92..058b52929 100644 --- a/cmd/server-main.go +++ b/cmd/server-main.go @@ -275,6 +275,10 @@ func initAllSubsystems(buckets []BucketInfo, newObject ObjectLayer) (err error) if err = initBucketObjectLockConfig(buckets, newObject); err != nil { return fmt.Errorf("Unable to initialize object lock system: %w", err) } + // Initialize bucket quota system. + if err = initBucketQuotaSys(buckets, newObject); err != nil { + return fmt.Errorf("Unable to initialize bucket quota system: %w", err) + } // Initialize lifecycle system. if err = globalLifecycleSys.Init(buckets, newObject); err != nil { @@ -308,6 +312,7 @@ func startBackgroundOps(ctx context.Context, objAPI ObjectLayer) { initDataUsageStats(ctx, objAPI) initDailyLifecycle(ctx, objAPI) + initQuotaEnforcement(ctx, objAPI) } // serverMain handler called for 'minio server' command. @@ -384,7 +389,8 @@ func serverMain(ctx *cli.Context) { // Configure server. var handler http.Handler - handler, err = configureServerHandler(globalEndpoints) + enableBucketQuotaOps := env.Get(envDataUsageCrawlConf, config.EnableOn) == config.EnableOn + handler, err = configureServerHandler(globalEndpoints, enableBucketQuotaOps) if err != nil { logger.Fatal(config.ErrUnexpectedError(err), "Unable to configure one of server's RPC services") } diff --git a/cmd/test-utils_test.go b/cmd/test-utils_test.go index 269013bc4..d62eb2808 100644 --- a/cmd/test-utils_test.go +++ b/cmd/test-utils_test.go @@ -322,7 +322,7 @@ func UnstartedTestServer(t TestErrHandler, instanceType string) TestServer { testServer.AccessKey = credentials.AccessKey testServer.SecretKey = credentials.SecretKey - httpHandler, err := configureServerHandler(testServer.Disks) + httpHandler, err := configureServerHandler(testServer.Disks, false) if err != nil { t.Fatalf("Failed to configure one of the RPC services %s", err) } diff --git a/pkg/iam/policy/admin-action.go b/pkg/iam/policy/admin-action.go index 33c292985..04a8eda47 100644 --- a/pkg/iam/policy/admin-action.go +++ b/pkg/iam/policy/admin-action.go @@ -101,6 +101,14 @@ const ( AttachPolicyAdminAction = "admin:AttachUserOrGroupPolicy" // ListUserPoliciesAdminAction - allows listing user policies ListUserPoliciesAdminAction = "admin:ListUserPolicies" + + // Bucket quota Actions + + // SetBucketQuotaAdminAction - allow setting bucket quota + SetBucketQuotaAdminAction = "admin:SetBucketQuota" + // GetBucketQuotaAdminAction - allow getting bucket quota + GetBucketQuotaAdminAction = "admin:GetBucketQuota" + // AllAdminActions - provides all admin permissions AllAdminActions = "admin:*" ) @@ -135,6 +143,8 @@ var supportedAdminActions = map[AdminAction]struct{}{ DeletePolicyAdminAction: {}, GetPolicyAdminAction: {}, AttachPolicyAdminAction: {}, + SetBucketQuotaAdminAction: {}, + GetBucketQuotaAdminAction: {}, ListUserPoliciesAdminAction: {}, } @@ -184,4 +194,6 @@ var adminActionConditionKeyMap = map[Action]condition.KeySet{ GetPolicyAdminAction: condition.NewKeySet(condition.AllSupportedAdminKeys...), AttachPolicyAdminAction: condition.NewKeySet(condition.AllSupportedAdminKeys...), ListUserPoliciesAdminAction: condition.NewKeySet(condition.AllSupportedAdminKeys...), + SetBucketQuotaAdminAction: condition.NewKeySet(condition.AllSupportedAdminKeys...), + GetBucketQuotaAdminAction: condition.NewKeySet(condition.AllSupportedAdminKeys...), } diff --git a/pkg/madmin/examples/bucket-quota.go b/pkg/madmin/examples/bucket-quota.go new file mode 100644 index 000000000..7b75c0969 --- /dev/null +++ b/pkg/madmin/examples/bucket-quota.go @@ -0,0 +1,56 @@ +// +build ignore + +/* + * MinIO Cloud Storage, (C) 2020 MinIO, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package main + +import ( + "context" + "fmt" + "log" + + "github.com/minio/minio/pkg/madmin" +) + +func main() { + // Note: YOUR-ACCESSKEYID, YOUR-SECRETACCESSKEY and my-bucketname are + // dummy values, please replace them with original values. + + // API requests are secure (HTTPS) if secure=true and insecure (HTTP) otherwise. + // New returns an MinIO Admin client object. + madmClnt, err := madmin.New("your-minio.example.com:9000", "YOUR-ACCESSKEYID", "YOUR-SECRETACCESSKEY", true) + if err != nil { + log.Fatalln(err) + } + var kiB int64 = 1 << 10 + ctx := context.Background() + // set bucket quota config + if err := madmClnt.SetBucketQuota(ctx, "bucket-name", 64*kiB, HardQuota); err != nil { + log.Fatalln(err) + } + // gets bucket quota config + quotaCfg, err := madmClnt.GetBucketQuota(ctx, "bucket-name") + if err != nil { + log.Fatalln(err) + } + fmt.Println(quotaCfg) + // remove bucket quota config + if err := madmClnt.RemoveBucketQuota(ctx, "bucket-name"); err != nil { + log.Fatalln(err) + } +} diff --git a/pkg/madmin/quota-commands.go b/pkg/madmin/quota-commands.go new file mode 100644 index 000000000..bad640381 --- /dev/null +++ b/pkg/madmin/quota-commands.go @@ -0,0 +1,140 @@ +/* + * MinIO Cloud Storage, (C) 2018 MinIO, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package madmin + +import ( + "context" + "encoding/json" + "io/ioutil" + "net/http" + "net/url" +) + +// QuotaType represents bucket quota type +type QuotaType string + +const ( + // HardQuota specifies a hard quota of usage for bucket + HardQuota QuotaType = "hard" + // FIFOQuota specifies a quota limit beyond which older files are deleted from bucket + FIFOQuota QuotaType = "fifo" +) + +// IsValid returns true if quota type is one of FIFO or Hard +func (t QuotaType) IsValid() bool { + return t == HardQuota || t == FIFOQuota +} + +// BucketQuota holds bucket quota restrictions +type BucketQuota struct { + Quota uint64 `json:"quota"` + Type QuotaType `json:"quotatype"` +} + +// RemoveBucketQuota - removes quota config on a bucket. +func (adm *AdminClient) RemoveBucketQuota(ctx context.Context, bucket string) error { + + queryValues := url.Values{} + queryValues.Set("bucket", bucket) + + reqData := requestData{ + relPath: adminAPIPrefix + "/remove-bucket-quota", + queryValues: queryValues, + } + + // Execute DELETE on /minio/admin/v3/remove-bucket-quota to delete bucket quota. + resp, err := adm.executeMethod(ctx, http.MethodDelete, reqData) + defer closeResponse(resp) + if err != nil { + return err + } + + if resp.StatusCode != http.StatusNoContent { + return httpRespToErrorResponse(resp) + } + + return nil +} + +// GetBucketQuota - get info on a user +func (adm *AdminClient) GetBucketQuota(ctx context.Context, bucket string) (q BucketQuota, err error) { + queryValues := url.Values{} + queryValues.Set("bucket", bucket) + + reqData := requestData{ + relPath: adminAPIPrefix + "/get-bucket-quota", + queryValues: queryValues, + } + + // Execute GET on /minio/admin/v3/get-quota + resp, err := adm.executeMethod(ctx, http.MethodGet, reqData) + + defer closeResponse(resp) + if err != nil { + return q, err + } + + if resp.StatusCode != http.StatusOK { + return q, httpRespToErrorResponse(resp) + } + + b, err := ioutil.ReadAll(resp.Body) + if err != nil { + return q, err + } + if err = json.Unmarshal(b, &q); err != nil { + return q, err + } + + return q, nil +} + +// SetBucketQuota - sets a bucket's quota. +func (adm *AdminClient) SetBucketQuota(ctx context.Context, bucket string, quota uint64, quotaType QuotaType) error { + + data, err := json.Marshal(BucketQuota{ + Quota: quota, + Type: quotaType, + }) + if err != nil { + return err + } + + queryValues := url.Values{} + queryValues.Set("bucket", bucket) + + reqData := requestData{ + relPath: adminAPIPrefix + "/set-bucket-quota", + queryValues: queryValues, + content: data, + } + + // Execute PUT on /minio/admin/v3/set-bucket-quota to set quota for a bucket. + resp, err := adm.executeMethod(ctx, http.MethodPut, reqData) + + defer closeResponse(resp) + if err != nil { + return err + } + + if resp.StatusCode != http.StatusOK { + return httpRespToErrorResponse(resp) + } + + return nil +}