From fb48ca50201df26b019b27a4886b871d00b9ee2a Mon Sep 17 00:00:00 2001 From: Bala FA Date: Wed, 13 Nov 2019 04:20:18 +0530 Subject: [PATCH] Add Get/Put Bucket Lock Configuration API support (#8120) This feature implements [PUT Bucket object lock configuration][1] and [GET Bucket object lock configuration][2]. After object lock configuration is set, existing and new objects are set to WORM for specified duration. Currently Governance mode works exactly like Compliance mode. Fixes #8101 [1] https://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketPUTObjectLockConfiguration.html [2] https://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketGETObjectLockConfiguration.html --- cmd/api-router.go | 18 ++- cmd/bucket-handlers.go | 215 ++++++++++++++++++++++++++++++++++- cmd/dummy-handlers.go | 6 - cmd/fs-v1-multipart.go | 4 +- cmd/fs-v1.go | 4 +- cmd/globals.go | 2 + cmd/notification.go | 71 +++++++++++- cmd/object-handlers.go | 197 ++++++++++++++++++++++++++++---- cmd/object-lock.go | 191 +++++++++++++++++++++++++++++++ cmd/peer-rest-client.go | 19 ++++ cmd/peer-rest-common.go | 69 +++++------ cmd/peer-rest-server.go | 32 ++++++ cmd/utils.go | 8 ++ cmd/web-handlers.go | 8 +- cmd/xl-v1-multipart.go | 6 +- cmd/xl-v1-object.go | 6 +- docs/retention/README.md | 31 +++++ mint/run/core/awscli/test.sh | 53 ++++++++- 18 files changed, 863 insertions(+), 77 deletions(-) create mode 100644 cmd/object-lock.go create mode 100644 docs/retention/README.md diff --git a/cmd/api-router.go b/cmd/api-router.go index a566ac899..cf2a18648 100644 --- a/cmd/api-router.go +++ b/cmd/api-router.go @@ -111,6 +111,14 @@ func registerAPIRouter(router *mux.Router, encryptionEnabled, allowSSEKMS bool) bucket.Methods(http.MethodPut).Path("/{object:.+}").HandlerFunc(collectAPIStats("putobject", httpTraceHdrs(api.PutObjectHandler))) // DeleteObject bucket.Methods(http.MethodDelete).Path("/{object:.+}").HandlerFunc(collectAPIStats("deleteobject", httpTraceAll(api.DeleteObjectHandler))) + // PutObjectLegalHold + bucket.Methods(http.MethodPut).Path("/{object:.+}").HandlerFunc(httpTraceHdrs(api.PutObjectLegalHoldHandler)).Queries("legal-hold", "").Queries("versionId", "") + // GetObjectLegalHold + bucket.Methods(http.MethodGet).Path("/{object:.+}").HandlerFunc(httpTraceHdrs(api.GetObjectLegalHoldHandler)).Queries("legal-hold", "").Queries("versionId", "") + // PutObjectRetention + bucket.Methods(http.MethodPut).Path("/{object:.+}").HandlerFunc(httpTraceHdrs(api.PutObjectRetentionHandler)).Queries("retention", "").Queries("versionId", "") + // GetObjectRetention + bucket.Methods(http.MethodGet).Path("/{object:.+}").HandlerFunc(httpTraceHdrs(api.GetObjectRetentionHandler)).Queries("retention", "").Queries("versionId", "") /// Bucket operations // GetBucketLocation @@ -127,8 +135,6 @@ func registerAPIRouter(router *mux.Router, encryptionEnabled, allowSSEKMS bool) bucket.Methods(http.MethodGet).HandlerFunc(collectAPIStats("getbucketcors", httpTraceAll(api.GetBucketCorsHandler))).Queries("cors", "") // GetBucketWebsiteHandler - this is a dummy call. bucket.Methods(http.MethodGet).HandlerFunc(collectAPIStats("getbucketwebsite", httpTraceAll(api.GetBucketWebsiteHandler))).Queries("website", "") - // GetBucketVersioningHandler - this is a dummy call. - bucket.Methods(http.MethodGet).HandlerFunc(collectAPIStats("getbucketversion", httpTraceAll(api.GetBucketVersioningHandler))).Queries("versioning", "") // GetBucketAccelerateHandler - this is a dummy call. bucket.Methods(http.MethodGet).HandlerFunc(collectAPIStats("getbucketaccelerate", httpTraceAll(api.GetBucketAccelerateHandler))).Queries("accelerate", "") // GetBucketRequestPaymentHandler - this is a dummy call. @@ -146,6 +152,10 @@ func registerAPIRouter(router *mux.Router, encryptionEnabled, allowSSEKMS bool) // DeleteBucketTaggingHandler bucket.Methods(http.MethodDelete).HandlerFunc(collectAPIStats("deletebuckettagging", httpTraceAll(api.DeleteBucketTaggingHandler))).Queries("tagging", "") + // GetBucketObjectLockConfig + bucket.Methods(http.MethodGet).HandlerFunc(httpTraceAll(api.GetBucketObjectLockConfigHandler)).Queries("object-lock", "") + // GetBucketVersioning + bucket.Methods(http.MethodGet).HandlerFunc(httpTraceAll(api.GetBucketVersioningHandler)).Queries("versioning", "") // GetBucketNotification bucket.Methods(http.MethodGet).HandlerFunc(collectAPIStats("getbucketnotification", httpTraceAll(api.GetBucketNotificationHandler))).Queries("notification", "") // ListenBucketNotification @@ -163,6 +173,10 @@ func registerAPIRouter(router *mux.Router, encryptionEnabled, allowSSEKMS bool) // PutBucketPolicy bucket.Methods("PUT").HandlerFunc(collectAPIStats("putbucketpolicy", httpTraceAll(api.PutBucketPolicyHandler))).Queries("policy", "") + // PutBucketObjectLockConfig + bucket.Methods(http.MethodPut).HandlerFunc(httpTraceAll(api.PutBucketObjectLockConfigHandler)).Queries("object-lock", "") + // PutBucketVersioning + bucket.Methods(http.MethodPut).HandlerFunc(httpTraceAll(api.PutBucketVersioningHandler)).Queries("versioning", "") // PutBucketNotification bucket.Methods(http.MethodPut).HandlerFunc(collectAPIStats("putbucketnotification", httpTraceAll(api.PutBucketNotificationHandler))).Queries("notification", "") // PutBucket diff --git a/cmd/bucket-handlers.go b/cmd/bucket-handlers.go index d2391f2f9..fb8bb2c90 100644 --- a/cmd/bucket-handlers.go +++ b/cmd/bucket-handlers.go @@ -44,6 +44,13 @@ import ( "github.com/minio/minio/pkg/sync/errgroup" ) +const ( + getBucketVersioningResponse = `` + objectLockConfig = "object-lock.xml" + objectLockEnabledConfigFile = "object-lock-enabled.json" + objectLockEnabledConfig = `{"x-amz-bucket-object-lock-enabled":true}` +) + // Check if there are buckets on server without corresponding entry in etcd backend and // make entries. Here is the general flow // - Range over all the available buckets @@ -364,7 +371,7 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter, } // Deny if WORM is enabled - if globalWORMEnabled { + if _, isWORMBucket := isWORMEnabled(bucket); isWORMBucket { // Not required to check whether given objects exist or not, because // DeleteMultipleObject is always successful irrespective of object existence. writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrMethodNotAllowed), r.URL, guessIsBrowserReq(r)) @@ -473,6 +480,16 @@ func (api objectAPIHandlers) PutBucketHandler(w http.ResponseWriter, r *http.Req vars := mux.Vars(r) bucket := vars["bucket"] + objectLockEnabled := false + if _, found := r.Header[http.CanonicalHeaderKey("x-amz-bucket-object-lock-enabled")]; found { + if r.Header.Get("x-amz-bucket-object-lock-enabled") != "true" { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidRequest), r.URL, guessIsBrowserReq(r)) + return + } + + objectLockEnabled = true + } + if s3Error := checkRequestAuthType(ctx, r, policy.CreateBucketAction, bucket, ""); s3Error != ErrNone { writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL, guessIsBrowserReq(r)) return @@ -500,6 +517,15 @@ func (api objectAPIHandlers) PutBucketHandler(w http.ResponseWriter, r *http.Req writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) return } + + if objectLockEnabled { + configFile := path.Join(bucketConfigPrefix, bucket, objectLockEnabledConfigFile) + if err = saveConfig(ctx, objectAPI, configFile, []byte(objectLockEnabledConfig)); err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) + return + } + } + if err = globalDNSConfig.Put(bucket); err != nil { objectAPI.DeleteBucket(ctx, bucket) writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) @@ -528,6 +554,14 @@ func (api objectAPIHandlers) PutBucketHandler(w http.ResponseWriter, r *http.Req return } + if objectLockEnabled { + configFile := path.Join(bucketConfigPrefix, bucket, objectLockEnabledConfigFile) + if err = saveConfig(ctx, objectAPI, configFile, []byte(objectLockEnabledConfig)); err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) + return + } + } + // Make sure to add Location information here only for bucket w.Header().Set(xhttp.Location, path.Clean(r.URL.Path)) // Clean any trailing slashes. @@ -858,6 +892,7 @@ func (api objectAPIHandlers) DeleteBucketHandler(w http.ResponseWriter, r *http. } } + globalBucketRetentionConfig.Delete(bucket) globalNotificationSys.RemoveNotification(bucket) globalPolicySys.Remove(bucket) globalNotificationSys.DeleteBucket(ctx, bucket) @@ -867,3 +902,181 @@ func (api objectAPIHandlers) DeleteBucketHandler(w http.ResponseWriter, r *http. // Write success response. writeSuccessNoContent(w) } + +// PutBucketVersioningHandler - PUT Bucket Versioning. +// ---------- +// No-op. Available for API compatibility. +func (api objectAPIHandlers) PutBucketVersioningHandler(w http.ResponseWriter, r *http.Request) { + ctx := newContext(r, w, "PutBucketVersioning") + + defer logger.AuditLog(w, r, "PutBucketVersioning", mustGetClaimsFromToken(r)) + + vars := mux.Vars(r) + bucket := vars["bucket"] + + objectAPI := api.ObjectAPI() + if objectAPI == nil { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL, guessIsBrowserReq(r)) + return + } + + getBucketInfo := objectAPI.GetBucketInfo + if _, err := getBucketInfo(ctx, bucket); err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) + return + } + + // Write success response. + writeSuccessResponseHeadersOnly(w) +} + +// GetBucketVersioningHandler - GET Bucket Versioning. +// ---------- +// No-op. Available for API compatibility. +func (api objectAPIHandlers) GetBucketVersioningHandler(w http.ResponseWriter, r *http.Request) { + ctx := newContext(r, w, "GetBucketVersioning") + + defer logger.AuditLog(w, r, "GetBucketVersioning", mustGetClaimsFromToken(r)) + + vars := mux.Vars(r) + bucket := vars["bucket"] + + objectAPI := api.ObjectAPI() + if objectAPI == nil { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL, guessIsBrowserReq(r)) + return + } + + getBucketInfo := objectAPI.GetBucketInfo + if _, err := getBucketInfo(ctx, bucket); err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) + return + } + + // Write success response. + writeSuccessResponseXML(w, []byte(getBucketVersioningResponse)) +} + +// PutBucketObjectLockConfigHandler - PUT Bucket object lock configuration. +// ---------- +// Places an Object Lock configuration on the specified bucket. The rule +// specified in the Object Lock configuration will be applied by default +// to every new object placed in the specified bucket. +func (api objectAPIHandlers) PutBucketObjectLockConfigHandler(w http.ResponseWriter, r *http.Request) { + ctx := newContext(r, w, "PutBucketObjectLockConfig") + + defer logger.AuditLog(w, r, "PutBucketObjectLockConfig", mustGetClaimsFromToken(r)) + + vars := mux.Vars(r) + bucket := vars["bucket"] + + objectAPI := api.ObjectAPI() + if objectAPI == nil { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL, guessIsBrowserReq(r)) + return + } + + // Deny if WORM is enabled + if globalWORMEnabled { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrMethodNotAllowed), r.URL, guessIsBrowserReq(r)) + return + } + + config, err := parseObjectLockConfig(r.Body) + if err != nil { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrMalformedXML), r.URL, guessIsBrowserReq(r)) + return + } + + configFile := path.Join(bucketConfigPrefix, bucket, objectLockEnabledConfigFile) + configData, err := readConfig(ctx, objectAPI, configFile) + if err != nil { + aerr := toAPIError(ctx, err) + if err == errConfigNotFound { + aerr = errorCodes.ToAPIErr(ErrMethodNotAllowed) + } + writeErrorResponse(ctx, w, aerr, r.URL, guessIsBrowserReq(r)) + return + } + + if string(configData) != objectLockEnabledConfig { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInternalError), r.URL, guessIsBrowserReq(r)) + return + } + + if config.Rule != nil { + data, err := xml.Marshal(config) + if err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) + return + } + + configFile := path.Join(bucketConfigPrefix, bucket, objectLockConfig) + if err = saveConfig(ctx, objectAPI, configFile, data); err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) + return + } + + retention := config.ToRetention() + globalBucketRetentionConfig.Set(bucket, retention) + globalNotificationSys.PutBucketObjectLockConfig(ctx, bucket, retention) + } else { + globalBucketRetentionConfig.Delete(bucket) + } + + // Write success response. + writeSuccessResponseHeadersOnly(w) +} + +// GetBucketObjectLockConfigHandler - GET Bucket object lock configuration. +// ---------- +// Gets the Object Lock configuration for a bucket. The rule specified in +// the Object Lock configuration will be applied by default to every new +// object placed in the specified bucket. +func (api objectAPIHandlers) GetBucketObjectLockConfigHandler(w http.ResponseWriter, r *http.Request) { + ctx := newContext(r, w, "GetBucketObjectLockConfig") + + defer logger.AuditLog(w, r, "GetBucketObjectLockConfig", mustGetClaimsFromToken(r)) + + vars := mux.Vars(r) + bucket := vars["bucket"] + + objectAPI := api.ObjectAPI() + if objectAPI == nil { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL, guessIsBrowserReq(r)) + return + } + + configFile := path.Join(bucketConfigPrefix, bucket, objectLockEnabledConfigFile) + configData, err := readConfig(ctx, objectAPI, configFile) + if err != nil { + aerr := toAPIError(ctx, err) + if err == errConfigNotFound { + aerr = errorCodes.ToAPIErr(ErrMethodNotAllowed) + } + writeErrorResponse(ctx, w, aerr, r.URL, guessIsBrowserReq(r)) + return + } + + if string(configData) != objectLockEnabledConfig { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInternalError), r.URL, guessIsBrowserReq(r)) + return + } + + configFile = path.Join(bucketConfigPrefix, bucket, objectLockConfig) + configData, err = readConfig(ctx, objectAPI, configFile) + if err != nil { + if err != errConfigNotFound { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) + return + } + + if configData, err = xml.Marshal(newObjectLockConfig()); err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) + return + } + } + + // Write success response. + writeSuccessResponseXML(w, configData) +} diff --git a/cmd/dummy-handlers.go b/cmd/dummy-handlers.go index de23fd5ca..383d8d41d 100644 --- a/cmd/dummy-handlers.go +++ b/cmd/dummy-handlers.go @@ -48,12 +48,6 @@ func (api objectAPIHandlers) GetBucketWebsiteHandler(w http.ResponseWriter, r *h w.(http.Flusher).Flush() } -// GetBucketVersioning - GET bucket versioning, a dummy api -func (api objectAPIHandlers) GetBucketVersioningHandler(w http.ResponseWriter, r *http.Request) { - writeSuccessResponseHeadersOnly(w) - w.(http.Flusher).Flush() -} - // GetBucketAccelerate - GET bucket accelerate, a dummy api func (api objectAPIHandlers) GetBucketAccelerateHandler(w http.ResponseWriter, r *http.Request) { writeSuccessResponseHeadersOnly(w) diff --git a/cmd/fs-v1-multipart.go b/cmd/fs-v1-multipart.go index 96148bc28..5015b847c 100644 --- a/cmd/fs-v1-multipart.go +++ b/cmd/fs-v1-multipart.go @@ -683,8 +683,8 @@ func (fs *FSObjects) CompleteMultipartUpload(ctx context.Context, bucket string, } // Deny if WORM is enabled - if globalWORMEnabled { - if _, err = fsStatFile(ctx, pathJoin(fs.fsPath, bucket, object)); err == nil { + if retention, isWORMBucket := isWORMEnabled(bucket); isWORMBucket { + if fi, err := fsStatFile(ctx, pathJoin(fs.fsPath, bucket, object)); err == nil && retention.Retain(fi.ModTime()) { return ObjectInfo{}, ObjectAlreadyExists{Bucket: bucket, Object: object} } } diff --git a/cmd/fs-v1.go b/cmd/fs-v1.go index 4f4d1a2f0..530f4e9e7 100644 --- a/cmd/fs-v1.go +++ b/cmd/fs-v1.go @@ -925,8 +925,8 @@ func (fs *FSObjects) putObject(ctx context.Context, bucket string, object string // Entire object was written to the temp location, now it's safe to rename it to the actual location. fsNSObjPath := pathJoin(fs.fsPath, bucket, object) // Deny if WORM is enabled - if globalWORMEnabled { - if _, err = fsStatFile(ctx, fsNSObjPath); err == nil { + if retention, isWORMBucket := isWORMEnabled(bucket); isWORMBucket { + if fi, err := fsStatFile(ctx, fsNSObjPath); err == nil && retention.Retain(fi.ModTime()) { return ObjectInfo{}, ObjectAlreadyExists{Bucket: bucket, Object: object} } } diff --git a/cmd/globals.go b/cmd/globals.go index 18bfa72b1..59f5612aa 100644 --- a/cmd/globals.go +++ b/cmd/globals.go @@ -202,6 +202,8 @@ var ( // Is worm enabled globalWORMEnabled bool + globalBucketRetentionConfig = newBucketRetentionConfig() + // Disk cache drives globalCacheConfig cache.Config diff --git a/cmd/notification.go b/cmd/notification.go index 3754fb14c..2b0ba5ea5 100644 --- a/cmd/notification.go +++ b/cmd/notification.go @@ -789,6 +789,35 @@ func (sys *NotificationSys) load(buckets []BucketInfo, objAPI ObjectLayer) error return nil } +func (sys *NotificationSys) initBucketRetentionConfig(objAPI ObjectLayer) error { + buckets, err := objAPI.ListBuckets(context.Background()) + if err != nil { + return err + } + for _, bucket := range buckets { + ctx := logger.SetReqInfo(context.Background(), &logger.ReqInfo{BucketName: bucket.Name}) + configFile := path.Join(bucketConfigPrefix, bucket.Name, objectLockConfig) + configData, err := readConfig(ctx, objAPI, configFile) + if err != nil { + if err == errConfigNotFound { + continue + } + + return err + } + + config, err := parseObjectLockConfig(bytes.NewReader(configData)) + if err != nil { + return err + } + + if config.Rule != nil { + globalBucketRetentionConfig.Set(bucket.Name, config.ToRetention()) + } + } + return nil +} + // Init - initializes notification system from notification.xml and listener.json of all buckets. func (sys *NotificationSys) Init(buckets []BucketInfo, objAPI ObjectLayer) error { if objAPI == nil { @@ -808,7 +837,8 @@ func (sys *NotificationSys) Init(buckets []BucketInfo, objAPI ObjectLayer) error // - Read quorum is lost just after the initialization // of the object layer. retryTimerCh := newRetryTimerSimple(doneCh) - for { + stop := false + for !stop { select { case <-retryTimerCh: if err := sys.load(buckets, objAPI); err != nil { @@ -820,6 +850,26 @@ func (sys *NotificationSys) Init(buckets []BucketInfo, objAPI ObjectLayer) error } return err } + stop = true + case <-globalOSSignalCh: + return fmt.Errorf("Initializing Notification sub-system gracefully stopped") + } + } + + // Initializing bucket retention config needs a retry mechanism if + // read quorum is lost just after the initialization of the object layer. + for { + select { + case <-retryTimerCh: + if err := sys.initBucketRetentionConfig(objAPI); err != nil { + if err == errDiskNotFound || + strings.Contains(err.Error(), InsufficientReadQuorum{}.Error()) || + strings.Contains(err.Error(), InsufficientWriteQuorum{}.Error()) { + logger.Info("Waiting for bucket retention configuration to be initialized..") + continue + } + return err + } return nil case <-globalOSSignalCh: return fmt.Errorf("Initializing Notification sub-system gracefully stopped") @@ -926,6 +976,25 @@ func (sys *NotificationSys) Send(args eventArgs) []event.TargetIDErr { return sys.send(args.BucketName, args.ToEvent(), targetIDs...) } +// PutBucketObjectLockConfig - put bucket object lock configuration to all peers. +func (sys *NotificationSys) PutBucketObjectLockConfig(ctx context.Context, bucketName string, retention Retention) { + var wg sync.WaitGroup + for _, client := range sys.peerClients { + if client == nil { + continue + } + wg.Add(1) + go func(client *peerRESTClient) { + defer wg.Done() + if err := client.PutBucketObjectLockConfig(bucketName, retention); err != nil { + logger.GetReqInfo(ctx).AppendTags("remotePeer", client.host.Name) + logger.LogIf(ctx, err) + } + }(client) + } + wg.Wait() +} + // NetReadPerfInfo - Network read performance information. func (sys *NotificationSys) NetReadPerfInfo(size int64) []ServerNetReadPerfInfo { reply := make([]ServerNetReadPerfInfo, len(sys.peerClients)) diff --git a/cmd/object-handlers.go b/cmd/object-handlers.go index 97f86e074..92dc67fef 100644 --- a/cmd/object-handlers.go +++ b/cmd/object-handlers.go @@ -736,8 +736,10 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re } cpSrcDstSame := isStringEqual(pathJoin(srcBucket, srcObject), pathJoin(dstBucket, dstObject)) - if globalWORMEnabled { // Deny if WORM is enabled. - if _, err := objectAPI.GetObjectInfo(ctx, dstBucket, dstObject, dstOpts); err == nil { + + // Deny if WORM is enabled. + if retention, isWORMBucket := isWORMEnabled(dstBucket); isWORMBucket { + if oi, err := objectAPI.GetObjectInfo(ctx, dstBucket, dstObject, dstOpts); err == nil && retention.Retain(oi.ModTime) { writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrMethodNotAllowed), r.URL, guessIsBrowserReq(r)) return } @@ -1202,11 +1204,6 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) return } - getObjectInfo := objectAPI.GetObjectInfo - if api.CacheAPI() != nil { - getObjectInfo = api.CacheAPI().GetObjectInfo - putObject = api.CacheAPI().PutObject - } rawReader := hashReader pReader := NewPutObjReader(rawReader, nil, nil) @@ -1220,8 +1217,8 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req } // Deny if WORM is enabled - if globalWORMEnabled { - if _, err = getObjectInfo(ctx, bucket, object, opts); err == nil { + if retention, isWORMBucket := isWORMEnabled(bucket); isWORMBucket { + if oi, err := objectAPI.GetObjectInfo(ctx, bucket, object, opts); err == nil && retention.Retain(oi.ModTime) { writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrMethodNotAllowed), r.URL, guessIsBrowserReq(r)) return } @@ -1347,8 +1344,8 @@ func (api objectAPIHandlers) NewMultipartUploadHandler(w http.ResponseWriter, r } // Deny if WORM is enabled - if globalWORMEnabled { - if _, err = objectAPI.GetObjectInfo(ctx, bucket, object, opts); err == nil { + if retention, isWORMBucket := isWORMEnabled(bucket); isWORMBucket { + if oi, err := objectAPI.GetObjectInfo(ctx, bucket, object, opts); err == nil && retention.Retain(oi.ModTime) { writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrMethodNotAllowed), r.URL, guessIsBrowserReq(r)) return } @@ -1520,8 +1517,8 @@ func (api objectAPIHandlers) CopyObjectPartHandler(w http.ResponseWriter, r *htt } // Deny if WORM is enabled - if globalWORMEnabled { - if _, err = objectAPI.GetObjectInfo(ctx, dstBucket, dstObject, dstOpts); err == nil { + if retention, isWORMBucket := isWORMEnabled(dstBucket); isWORMBucket { + if oi, err := objectAPI.GetObjectInfo(ctx, dstBucket, dstObject, dstOpts); err == nil && retention.Retain(oi.ModTime) { writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrMethodNotAllowed), r.URL, guessIsBrowserReq(r)) return } @@ -1897,8 +1894,8 @@ func (api objectAPIHandlers) PutObjectPartHandler(w http.ResponseWriter, r *http pReader := NewPutObjReader(rawReader, nil, nil) // Deny if WORM is enabled - if globalWORMEnabled { - if _, err = objectAPI.GetObjectInfo(ctx, bucket, object, opts); err == nil { + if retention, isWORMBucket := isWORMEnabled(bucket); isWORMBucket { + if oi, err := objectAPI.GetObjectInfo(ctx, bucket, object, opts); err == nil && retention.Retain(oi.ModTime) { writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrMethodNotAllowed), r.URL, guessIsBrowserReq(r)) return } @@ -2006,8 +2003,8 @@ func (api objectAPIHandlers) AbortMultipartUploadHandler(w http.ResponseWriter, } // Deny if WORM is enabled - if globalWORMEnabled { - if _, err := objectAPI.GetObjectInfo(ctx, bucket, object, ObjectOptions{}); err == nil { + if retention, isWORMBucket := isWORMEnabled(bucket); isWORMBucket { + if oi, err := objectAPI.GetObjectInfo(ctx, bucket, object, ObjectOptions{}); err == nil && retention.Retain(oi.ModTime) { writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrMethodNotAllowed), r.URL, guessIsBrowserReq(r)) return } @@ -2182,8 +2179,8 @@ func (api objectAPIHandlers) CompleteMultipartUploadHandler(w http.ResponseWrite } // Deny if WORM is enabled - if globalWORMEnabled { - if _, err := objectAPI.GetObjectInfo(ctx, bucket, object, ObjectOptions{}); err == nil { + if retention, isWORMBucket := isWORMEnabled(bucket); isWORMBucket { + if oi, err := objectAPI.GetObjectInfo(ctx, bucket, object, ObjectOptions{}); err == nil && retention.Retain(oi.ModTime) { writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrMethodNotAllowed), r.URL, guessIsBrowserReq(r)) return } @@ -2377,7 +2374,7 @@ func (api objectAPIHandlers) DeleteObjectHandler(w http.ResponseWriter, r *http. } // Deny if WORM is enabled - if globalWORMEnabled { + if _, isWORMBucket := isWORMEnabled(bucket); isWORMBucket { // Not required to check whether given object exists or not, because // DeleteObject is always successful irrespective of object existence. writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrMethodNotAllowed), r.URL, guessIsBrowserReq(r)) @@ -2404,3 +2401,163 @@ func (api objectAPIHandlers) DeleteObjectHandler(w http.ResponseWriter, r *http. } writeSuccessNoContent(w) } + +// PutObjectLegalHoldHandler - set legal hold configuration to object, +func (api objectAPIHandlers) PutObjectLegalHoldHandler(w http.ResponseWriter, r *http.Request) { + ctx := newContext(r, w, "PutObjectLegalHold") + + defer logger.AuditLog(w, r, "PutObjectLegalHold", mustGetClaimsFromToken(r)) + + vars := mux.Vars(r) + bucket := vars["bucket"] + object := vars["object"] + + if vid := r.URL.Query().Get("versionId"); vid != "" && vid != "null" { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrNoSuchVersion), r.URL, guessIsBrowserReq(r)) + return + } + + objectAPI := api.ObjectAPI() + if objectAPI == nil { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL, guessIsBrowserReq(r)) + return + } + + getObjectInfo := objectAPI.GetObjectInfo + if api.CacheAPI() != nil { + getObjectInfo = api.CacheAPI().GetObjectInfo + } + + opts, err := getOpts(ctx, r, bucket, object) + if err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) + return + } + + if _, err = getObjectInfo(ctx, bucket, object, opts); err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) + return + } + + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL, guessIsBrowserReq(r)) +} + +// GetObjectLegalHoldHandler - get legal hold configuration to object, +func (api objectAPIHandlers) GetObjectLegalHoldHandler(w http.ResponseWriter, r *http.Request) { + ctx := newContext(r, w, "GetObjectLegalHold") + + defer logger.AuditLog(w, r, "GetObjectLegalHold", mustGetClaimsFromToken(r)) + + vars := mux.Vars(r) + bucket := vars["bucket"] + object := vars["object"] + + if vid := r.URL.Query().Get("versionId"); vid != "" && vid != "null" { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrNoSuchVersion), r.URL, guessIsBrowserReq(r)) + return + } + + objectAPI := api.ObjectAPI() + if objectAPI == nil { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL, guessIsBrowserReq(r)) + return + } + + getObjectInfo := objectAPI.GetObjectInfo + if api.CacheAPI() != nil { + getObjectInfo = api.CacheAPI().GetObjectInfo + } + + opts, err := getOpts(ctx, r, bucket, object) + if err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) + return + } + + if _, err = getObjectInfo(ctx, bucket, object, opts); err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) + return + } + + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL, guessIsBrowserReq(r)) +} + +// PutObjectRetentionHandler - set legal hold configuration to object, +func (api objectAPIHandlers) PutObjectRetentionHandler(w http.ResponseWriter, r *http.Request) { + ctx := newContext(r, w, "PutObjectRetention") + + defer logger.AuditLog(w, r, "PutObjectRetention", mustGetClaimsFromToken(r)) + + vars := mux.Vars(r) + bucket := vars["bucket"] + object := vars["object"] + + if vid := r.URL.Query().Get("versionId"); vid != "" && vid != "null" { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrNoSuchVersion), r.URL, guessIsBrowserReq(r)) + return + } + + objectAPI := api.ObjectAPI() + if objectAPI == nil { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL, guessIsBrowserReq(r)) + return + } + + getObjectInfo := objectAPI.GetObjectInfo + if api.CacheAPI() != nil { + getObjectInfo = api.CacheAPI().GetObjectInfo + } + + opts, err := getOpts(ctx, r, bucket, object) + if err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) + return + } + + if _, err = getObjectInfo(ctx, bucket, object, opts); err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) + return + } + + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL, guessIsBrowserReq(r)) +} + +// GetObjectRetentionHandler - get legal hold configuration to object, +func (api objectAPIHandlers) GetObjectRetentionHandler(w http.ResponseWriter, r *http.Request) { + ctx := newContext(r, w, "GetObjectRetention") + + defer logger.AuditLog(w, r, "GetObjectRetention", mustGetClaimsFromToken(r)) + + vars := mux.Vars(r) + bucket := vars["bucket"] + object := vars["object"] + + if vid := r.URL.Query().Get("versionId"); vid != "" && vid != "null" { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrNoSuchVersion), r.URL, guessIsBrowserReq(r)) + return + } + + objectAPI := api.ObjectAPI() + if objectAPI == nil { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL, guessIsBrowserReq(r)) + return + } + + getObjectInfo := objectAPI.GetObjectInfo + if api.CacheAPI() != nil { + getObjectInfo = api.CacheAPI().GetObjectInfo + } + + opts, err := getOpts(ctx, r, bucket, object) + if err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) + return + } + + if _, err = getObjectInfo(ctx, bucket, object, opts); err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) + return + } + + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL, guessIsBrowserReq(r)) +} diff --git a/cmd/object-lock.go b/cmd/object-lock.go new file mode 100644 index 000000000..c64e483b1 --- /dev/null +++ b/cmd/object-lock.go @@ -0,0 +1,191 @@ +/* + * MinIO Cloud Storage, (C) 2016, 2017 MinIO, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cmd + +import ( + "encoding/xml" + "fmt" + "io" + "sync" + "time" +) + +// RetentionMode - object retention mode. +type RetentionMode string + +const ( + // Governance - governance mode. + Governance RetentionMode = "GOVERNANCE" + + // Compliance - compliance mode. + Compliance RetentionMode = "COMPLIANCE" +) + +// Retention - bucket level retention configuration. +type Retention struct { + Mode RetentionMode + Validity time.Duration +} + +// IsEmpty - returns whether retention is empty or not. +func (r Retention) IsEmpty() bool { + return r.Mode == "" || r.Validity == 0 +} + +// Retain - check whether given date is retainable by validity time. +func (r Retention) Retain(created time.Time) bool { + return globalWORMEnabled || created.Add(r.Validity).After(time.Now()) +} + +// BucketRetentionConfig - map of bucket and retention configuration. +type BucketRetentionConfig struct { + sync.RWMutex + retentionMap map[string]Retention +} + +// Set - set retention configuration. +func (config *BucketRetentionConfig) Set(bucketName string, retention Retention) { + config.Lock() + config.retentionMap[bucketName] = retention + config.Unlock() +} + +// Get - Get retention configuration. +func (config *BucketRetentionConfig) Get(bucketName string) (r Retention, ok bool) { + config.RLock() + defer config.RUnlock() + + r, ok = config.retentionMap[bucketName] + return r, ok +} + +// Delete - delete retention configuration. +func (config *BucketRetentionConfig) Delete(bucketName string) { + config.Lock() + delete(config.retentionMap, bucketName) + config.Unlock() +} + +func newBucketRetentionConfig() *BucketRetentionConfig { + return &BucketRetentionConfig{ + retentionMap: map[string]Retention{}, + } +} + +// DefaultRetention - default retention configuration. +type DefaultRetention struct { + XMLName xml.Name `xml:"DefaultRetention"` + Mode RetentionMode `xml:"Mode"` + Days *uint64 `xml:"Days"` + Years *uint64 `xml:"Years"` +} + +// UnmarshalXML - decodes XML data. +func (dr *DefaultRetention) UnmarshalXML(d *xml.Decoder, start xml.StartElement) error { + // Make subtype to avoid recursive UnmarshalXML(). + type defaultRetention DefaultRetention + retention := defaultRetention{} + + if err := d.DecodeElement(&retention, &start); err != nil { + return err + } + + switch string(retention.Mode) { + case "GOVERNANCE", "COMPLIANCE": + default: + return fmt.Errorf("unknown retention mode %v", retention.Mode) + } + + if retention.Days == nil && retention.Years == nil { + return fmt.Errorf("either Days or Years must be specified") + } + + if retention.Days != nil && retention.Years != nil { + return fmt.Errorf("either Days or Years must be specified, not both") + } + + if retention.Days != nil { + if *retention.Days == 0 { + return fmt.Errorf("Days should not be zero") + } + } else if *retention.Years == 0 { + return fmt.Errorf("Years should not be zero") + } + + *dr = DefaultRetention(retention) + + return nil +} + +// ObjectLockConfig - object lock configuration specified in +// https://docs.aws.amazon.com/AmazonS3/latest/API/Type_API_ObjectLockConfiguration.html +type ObjectLockConfig struct { + XMLNS string `xml:"xmlns,attr,omitempty"` + XMLName xml.Name `xml:"ObjectLockConfiguration"` + ObjectLockEnabled string `xml:"ObjectLockEnabled"` + Rule *struct { + DefaultRetention DefaultRetention `xml:"DefaultRetention"` + } `xml:"Rule,omitempty"` +} + +// UnmarshalXML - decodes XML data. +func (config *ObjectLockConfig) UnmarshalXML(d *xml.Decoder, start xml.StartElement) error { + // Make subtype to avoid recursive UnmarshalXML(). + type objectLockConfig ObjectLockConfig + parsedConfig := objectLockConfig{} + + if err := d.DecodeElement(&parsedConfig, &start); err != nil { + return err + } + + if parsedConfig.ObjectLockEnabled != "Enabled" { + return fmt.Errorf("only 'Enabled' value is allowd to ObjectLockEnabled element") + } + + *config = ObjectLockConfig(parsedConfig) + return nil +} + +// ToRetention - convert to Retention type. +func (config *ObjectLockConfig) ToRetention() (r Retention) { + if config.Rule != nil { + r.Mode = config.Rule.DefaultRetention.Mode + utcNow := time.Now().UTC() + if config.Rule.DefaultRetention.Days != nil { + r.Validity = utcNow.AddDate(0, 0, int(*config.Rule.DefaultRetention.Days)).Sub(utcNow) + } else { + r.Validity = utcNow.AddDate(int(*config.Rule.DefaultRetention.Years), 0, 0).Sub(utcNow) + } + } + + return r +} + +func parseObjectLockConfig(reader io.Reader) (*ObjectLockConfig, error) { + config := ObjectLockConfig{} + if err := xml.NewDecoder(reader).Decode(&config); err != nil { + return nil, err + } + + return &config, nil +} + +func newObjectLockConfig() *ObjectLockConfig { + return &ObjectLockConfig{ + ObjectLockEnabled: "Enabled", + } +} diff --git a/cmd/peer-rest-client.go b/cmd/peer-rest-client.go index 3ad9b528d..5ab2f03e4 100644 --- a/cmd/peer-rest-client.go +++ b/cmd/peer-rest-client.go @@ -430,6 +430,25 @@ func (client *peerRESTClient) PutBucketNotification(bucket string, rulesMap even return nil } +// PutBucketObjectLockConfig - PUT bucket object lock configuration. +func (client *peerRESTClient) PutBucketObjectLockConfig(bucket string, retention Retention) error { + values := make(url.Values) + values.Set(peerRESTBucket, bucket) + + var reader bytes.Buffer + err := gob.NewEncoder(&reader).Encode(&retention) + if err != nil { + return err + } + + respBody, err := client.call(peerRESTMethodPutBucketObjectLockConfig, values, &reader, -1) + if err != nil { + return err + } + defer http.DrainBody(respBody) + return nil +} + // DeletePolicy - delete a specific canned policy. func (client *peerRESTClient) DeletePolicy(policyName string) (err error) { values := make(url.Values) diff --git a/cmd/peer-rest-common.go b/cmd/peer-rest-common.go index ac4a8fe87..483541dfd 100644 --- a/cmd/peer-rest-common.go +++ b/cmd/peer-rest-common.go @@ -24,40 +24,41 @@ const ( ) const ( - peerRESTMethodNetReadPerfInfo = "/netreadperfinfo" - peerRESTMethodCollectNetPerfInfo = "/collectnetperfinfo" - peerRESTMethodServerInfo = "/serverinfo" - peerRESTMethodCPULoadInfo = "/cpuloadinfo" - peerRESTMethodMemUsageInfo = "/memusageinfo" - peerRESTMethodDrivePerfInfo = "/driveperfinfo" - peerRESTMethodDeleteBucket = "/deletebucket" - peerRESTMethodServerUpdate = "/serverupdate" - peerRESTMethodSignalService = "/signalservice" - peerRESTMethodBackgroundHealStatus = "/backgroundhealstatus" - peerRESTMethodBackgroundOpsStatus = "/backgroundopsstatus" - peerRESTMethodGetLocks = "/getlocks" - peerRESTMethodBucketPolicyRemove = "/removebucketpolicy" - peerRESTMethodLoadUser = "/loaduser" - peerRESTMethodDeleteUser = "/deleteuser" - peerRESTMethodLoadPolicy = "/loadpolicy" - peerRESTMethodLoadPolicyMapping = "/loadpolicymapping" - peerRESTMethodDeletePolicy = "/deletepolicy" - peerRESTMethodLoadUsers = "/loadusers" - peerRESTMethodLoadGroup = "/loadgroup" - peerRESTMethodStartProfiling = "/startprofiling" - peerRESTMethodDownloadProfilingData = "/downloadprofilingdata" - peerRESTMethodBucketPolicySet = "/setbucketpolicy" - peerRESTMethodBucketNotificationPut = "/putbucketnotification" - peerRESTMethodBucketNotificationListen = "/listenbucketnotification" - peerRESTMethodReloadFormat = "/reloadformat" - peerRESTMethodTargetExists = "/targetexists" - peerRESTMethodSendEvent = "/sendevent" - peerRESTMethodTrace = "/trace" - peerRESTMethodBucketLifecycleSet = "/setbucketlifecycle" - peerRESTMethodBucketLifecycleRemove = "/removebucketlifecycle" - peerRESTMethodLog = "/log" - peerRESTMethodHardwareCPUInfo = "/cpuhardwareinfo" - peerRESTMethodHardwareNetworkInfo = "/networkhardwareinfo" + peerRESTMethodNetReadPerfInfo = "/netreadperfinfo" + peerRESTMethodCollectNetPerfInfo = "/collectnetperfinfo" + peerRESTMethodServerInfo = "/serverinfo" + peerRESTMethodCPULoadInfo = "/cpuloadinfo" + peerRESTMethodMemUsageInfo = "/memusageinfo" + peerRESTMethodDrivePerfInfo = "/driveperfinfo" + peerRESTMethodDeleteBucket = "/deletebucket" + peerRESTMethodServerUpdate = "/serverupdate" + peerRESTMethodSignalService = "/signalservice" + peerRESTMethodBackgroundHealStatus = "/backgroundhealstatus" + peerRESTMethodBackgroundOpsStatus = "/backgroundopsstatus" + peerRESTMethodGetLocks = "/getlocks" + peerRESTMethodBucketPolicyRemove = "/removebucketpolicy" + peerRESTMethodLoadUser = "/loaduser" + peerRESTMethodDeleteUser = "/deleteuser" + peerRESTMethodLoadPolicy = "/loadpolicy" + peerRESTMethodLoadPolicyMapping = "/loadpolicymapping" + peerRESTMethodDeletePolicy = "/deletepolicy" + peerRESTMethodLoadUsers = "/loadusers" + peerRESTMethodLoadGroup = "/loadgroup" + peerRESTMethodStartProfiling = "/startprofiling" + peerRESTMethodDownloadProfilingData = "/downloadprofilingdata" + peerRESTMethodBucketPolicySet = "/setbucketpolicy" + peerRESTMethodBucketNotificationPut = "/putbucketnotification" + peerRESTMethodBucketNotificationListen = "/listenbucketnotification" + peerRESTMethodReloadFormat = "/reloadformat" + peerRESTMethodTargetExists = "/targetexists" + peerRESTMethodSendEvent = "/sendevent" + peerRESTMethodTrace = "/trace" + peerRESTMethodBucketLifecycleSet = "/setbucketlifecycle" + peerRESTMethodBucketLifecycleRemove = "/removebucketlifecycle" + peerRESTMethodLog = "/log" + peerRESTMethodHardwareCPUInfo = "/cpuhardwareinfo" + peerRESTMethodHardwareNetworkInfo = "/networkhardwareinfo" + peerRESTMethodPutBucketObjectLockConfig = "putbucketobjectlockconfig" ) const ( diff --git a/cmd/peer-rest-server.go b/cmd/peer-rest-server.go index 560b1218c..aeb6b14e0 100644 --- a/cmd/peer-rest-server.go +++ b/cmd/peer-rest-server.go @@ -506,6 +506,7 @@ func (s *peerRESTServer) DeleteBucketHandler(w http.ResponseWriter, r *http.Requ globalNotificationSys.RemoveNotification(bucketName) globalPolicySys.Remove(bucketName) + globalBucketRetentionConfig.Delete(bucketName) w.(http.Flusher).Flush() } @@ -756,6 +757,36 @@ func (s *peerRESTServer) PutBucketNotificationHandler(w http.ResponseWriter, r * w.(http.Flusher).Flush() } +// PutBucketObjectLockConfigHandler - handles PUT bucket object lock configuration. +func (s *peerRESTServer) PutBucketObjectLockConfigHandler(w http.ResponseWriter, r *http.Request) { + if !s.IsValid(w, r) { + s.writeErrorResponse(w, errors.New("Invalid request")) + return + } + + vars := mux.Vars(r) + bucketName := vars[peerRESTBucket] + if bucketName == "" { + s.writeErrorResponse(w, errors.New("Bucket name is missing")) + return + } + + var retention Retention + if r.ContentLength < 0 { + s.writeErrorResponse(w, errInvalidArgument) + return + } + + err := gob.NewDecoder(r.Body).Decode(&retention) + if err != nil { + s.writeErrorResponse(w, err) + return + } + + globalBucketRetentionConfig.Set(bucketName, retention) + w.(http.Flusher).Flush() +} + type listenBucketNotificationReq struct { EventNames []event.Name `json:"eventNames"` Pattern string `json:"pattern"` @@ -992,6 +1023,7 @@ func (s *peerRESTServer) IsValid(w http.ResponseWriter, r *http.Request) bool { func registerPeerRESTHandlers(router *mux.Router) { server := &peerRESTServer{} subrouter := router.PathPrefix(peerRESTPrefix).Subrouter() + subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodPutBucketObjectLockConfig).HandlerFunc(httpTraceHdrs(server.PutBucketObjectLockConfigHandler)).Queries(restQueries(peerRESTBucket)...) subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodNetReadPerfInfo).HandlerFunc(httpTraceHdrs(server.NetReadPerfInfoHandler)).Queries(restQueries(peerRESTNetPerfSize)...) subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodCollectNetPerfInfo).HandlerFunc(httpTraceHdrs(server.CollectNetPerfInfoHandler)).Queries(restQueries(peerRESTNetPerfSize)...) subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodGetLocks).HandlerFunc(httpTraceHdrs(server.GetLocksHandler)) diff --git a/cmd/utils.go b/cmd/utils.go index e39e69afd..b6c339cbe 100644 --- a/cmd/utils.go +++ b/cmd/utils.go @@ -492,3 +492,11 @@ func getMinioMode() string { func iamPolicyName() string { return globalOpenIDConfig.ClaimPrefix + iampolicy.PolicyName } + +func isWORMEnabled(bucket string) (Retention, bool) { + if globalWORMEnabled { + return Retention{}, true + } + + return globalBucketRetentionConfig.Get(bucket) +} diff --git a/cmd/web-handlers.go b/cmd/web-handlers.go index c36e21446..604607930 100644 --- a/cmd/web-handlers.go +++ b/cmd/web-handlers.go @@ -668,8 +668,8 @@ next: // If not a directory, remove the object. if !hasSuffix(objectName, SlashSeparator) && objectName != "" { // Deny if WORM is enabled - if globalWORMEnabled { - if _, err = objectAPI.GetObjectInfo(ctx, args.BucketName, objectName, ObjectOptions{}); err == nil { + if retention, isWORMBucket := isWORMEnabled(args.BucketName); isWORMBucket { + if oi, err := objectAPI.GetObjectInfo(ctx, args.BucketName, objectName, ObjectOptions{}); err == nil && retention.Retain(oi.ModTime) { return toJSONError(ctx, errMethodNotAllowed) } } @@ -1029,8 +1029,8 @@ func (web *webAPIHandlers) Upload(w http.ResponseWriter, r *http.Request) { crypto.RemoveSensitiveEntries(metadata) // Deny if WORM is enabled - if globalWORMEnabled { - if _, err = objectAPI.GetObjectInfo(ctx, bucket, object, opts); err == nil { + if retention, isWORMBucket := isWORMEnabled(bucket); isWORMBucket { + if oi, err := objectAPI.GetObjectInfo(ctx, bucket, object, opts); err == nil && retention.Retain(oi.ModTime) { writeWebErrorResponse(w, errMethodNotAllowed) return } diff --git a/cmd/xl-v1-multipart.go b/cmd/xl-v1-multipart.go index 2ba3e8b14..a2a6b11fa 100644 --- a/cmd/xl-v1-multipart.go +++ b/cmd/xl-v1-multipart.go @@ -761,8 +761,10 @@ func (xl xlObjects) CompleteMultipartUpload(ctx context.Context, bucket string, if xl.isObject(bucket, object) { // Deny if WORM is enabled - if globalWORMEnabled { - return ObjectInfo{}, ObjectAlreadyExists{Bucket: bucket, Object: object} + if retention, isWORMBucket := isWORMEnabled(bucket); isWORMBucket { + if oi, err := xl.getObjectInfo(ctx, bucket, object); err == nil && retention.Retain(oi.ModTime) { + return ObjectInfo{}, ObjectAlreadyExists{Bucket: bucket, Object: object} + } } // Rename if an object already exists to temporary location. diff --git a/cmd/xl-v1-object.go b/cmd/xl-v1-object.go index c3d1810d3..6caa09b58 100644 --- a/cmd/xl-v1-object.go +++ b/cmd/xl-v1-object.go @@ -653,8 +653,10 @@ func (xl xlObjects) putObject(ctx context.Context, bucket string, object string, if xl.isObject(bucket, object) { // Deny if WORM is enabled - if globalWORMEnabled { - return ObjectInfo{}, ObjectAlreadyExists{Bucket: bucket, Object: object} + if retention, isWORMBucket := isWORMEnabled(bucket); isWORMBucket { + if oi, err := xl.getObjectInfo(ctx, bucket, object); err == nil && retention.Retain(oi.ModTime) { + return ObjectInfo{}, ObjectAlreadyExists{Bucket: bucket, Object: object} + } } // Rename if an object already exists to temporary location. diff --git a/docs/retention/README.md b/docs/retention/README.md new file mode 100644 index 000000000..7c187c2d4 --- /dev/null +++ b/docs/retention/README.md @@ -0,0 +1,31 @@ +# Object Lock and Immutablity [![Slack](https://slack.min.io/slack?type=svg)](https://slack.min.io) + +MinIO server allows to set bucket level WORM which makes objects in the bucket immutable i.e. delete and overwrite are not allowed till stipulated time specified in the bucket's object lock configuration. + +## Get Started + +### 1. Prerequisites + +Install MinIO - [MinIO Quickstart Guide](https://docs.min.io/docs/minio-quickstart-guide). + +### 2. Set per bucket WORM + +WORM on a bucket is enabled by setting object lock configuration. This configuration is applied to existing and new objects in the bucket. Below is an example sets `Governance` mode and one day retention time from object creation time of all objects in `mybucket`. + +```sh +$ awscli s3api put-object-lock-configuration --bucket mybucket --object-lock-configuration 'ObjectLockEnabled=\"Enabled\",Rule={DefaultRetention={Mode=\"GOVERNANCE\",Days=1}}' +``` + +### 3. Note + +- When global WORM is enabled by `MINIO_WORM` environment variable or `worm` field in configuration file supersedes bucket level WORM and `PUT object lock configuration` REST API is disabled. +- Currently Governance mode is treated as Compliance mode. +- Once object lock configuration is set to a bucket, existing and new objects are put in WORM mode. + +## Explore Further + +- [Use `mc` with MinIO Server](https://docs.min.io/docs/minio-client-quickstart-guide) +- [Use `aws-cli` with MinIO Server](https://docs.min.io/docs/aws-cli-with-minio) +- [Use `s3cmd` with MinIO Server](https://docs.min.io/docs/s3cmd-with-minio) +- [Use `minio-go` SDK with MinIO Server](https://docs.min.io/docs/golang-client-quickstart-guide) +- [The MinIO documentation website](https://docs.min.io) diff --git a/mint/run/core/awscli/test.sh b/mint/run/core/awscli/test.sh index dd4674f56..884f9377f 100755 --- a/mint/run/core/awscli/test.sh +++ b/mint/run/core/awscli/test.sh @@ -1380,6 +1380,56 @@ function test_serverside_encryption_error() { return $rv } +# WORM bucket tests. +function test_worm_bucket() { + # log start time + start_time=$(get_time) + + function="make_bucket" + bucket_name=$(make_bucket) + rv=$? + + # if make bucket succeeds set object lock configuration + if [ $rv -eq 0 ]; then + args=( s3api put-object-lock-configuration --bucket "${bucket_name}" --object-lock-configuration 'ObjectLockEnabled="Enabled",Rule={DefaultRetention={Mode="GOVERNANCE",Days=1}}' ) + out=$("${AWS}" "${args[@]}" 2>&1) + rv=$? + else + # if make bucket fails, $bucket_name has the error output + out="${bucket_name}" + fi + + # if setting object lock configuration succeeds, upload a file first time + if [ $rv -eq 0 ]; then + function="${AWS} s3api put-object --body ${MINT_DATA_DIR}/datafile-1-kB --bucket ${bucket_name} --key datafile-1-kB" + out=$($function 2>&1) + rv=$? + else + # if make bucket fails, $bucket_name has the error output + out="${bucket_name}" + fi + + # second time upload of same file should fail due to WORM setting + if [ $rv -eq 0 ]; then + function="${AWS} s3api put-object --body ${MINT_DATA_DIR}/datafile-1-kB --bucket ${bucket_name} --key datafile-1-kB" + out=$($function 2>&1) + rv=$? + else + out="First time object upload failed" + fi + + if [ $rv -ne 0 ]; then + log_success "$(get_duration "$start_time")" "${test_function}" + rv=0 + else + # cleanup is not possible due to one day validity of object lock configurataion + log_failure "$(get_duration "$start_time")" "${function}" "${out}" + rv=-1 + fi + + return $rv +} + # main handler for all the tests. main() { # Success tests @@ -1404,7 +1454,8 @@ main() { # Error tests test_list_objects_error && \ test_put_object_error && \ - test_serverside_encryption_error + test_serverside_encryption_error && \ + test_worm_bucket return $? }