From 1ebf6f146aed9b6ae0d2ae83d0d0a73414cc9c78 Mon Sep 17 00:00:00 2001 From: Poorna Krishnamoorthy Date: Thu, 12 Nov 2020 12:12:09 -0800 Subject: [PATCH] Add support for ILM transition (#10565) This PR adds transition support for ILM to transition data to another MinIO target represented by a storage class ARN. Subsequent GET or HEAD for that object will be streamed from the transition tier. If PostRestoreObject API is invoked, the transitioned object can be restored for duration specified to the source cluster. --- cmd/admin-bucket-handlers.go | 22 +- cmd/api-errors.go | 6 + cmd/api-router.go | 3 + cmd/bucket-lifecycle-handlers.go | 6 + cmd/bucket-lifecycle.go | 563 ++++++++++++++++++++++ cmd/bucket-metadata-sys.go | 3 - cmd/bucket-targets.go | 69 +++ cmd/data-crawler.go | 68 ++- cmd/erasure-decode.go | 2 + cmd/erasure-metadata.go | 8 + cmd/erasure-object.go | 18 +- cmd/generic-handlers.go | 1 - cmd/http/headers.go | 6 + cmd/object-api-datatypes.go | 9 + cmd/object-api-interface.go | 15 +- cmd/object-api-options.go | 1 + cmd/object-api-utils.go | 6 +- cmd/object-handlers.go | 185 +++++++ cmd/server-main.go | 3 + cmd/storage-datatypes.go | 4 + cmd/storage-datatypes_gen.go | 34 +- cmd/xl-storage-format-v1.go | 12 +- cmd/xl-storage-format-v2.go | 13 + cmd/xl-storage.go | 18 +- docs/bucket/lifecycle/README.md | 27 +- pkg/bucket/lifecycle/lifecycle.go | 95 +++- pkg/bucket/lifecycle/lifecycle_test.go | 2 +- pkg/bucket/lifecycle/noncurrentversion.go | 16 +- pkg/bucket/lifecycle/rule.go | 7 + pkg/bucket/lifecycle/rule_test.go | 33 -- pkg/bucket/lifecycle/transition.go | 144 +++++- pkg/bucket/policy/action.go | 6 + pkg/event/name.go | 19 + pkg/madmin/remote-target-commands.go | 6 +- 34 files changed, 1295 insertions(+), 135 deletions(-) diff --git a/cmd/admin-bucket-handlers.go b/cmd/admin-bucket-handlers.go index a9a4fe7be..1d7117193 100644 --- a/cmd/admin-bucket-handlers.go +++ b/cmd/admin-bucket-handlers.go @@ -129,16 +129,17 @@ func (a adminAPIHandlers) SetRemoteTargetHandler(w http.ResponseWriter, r *http. vars := mux.Vars(r) bucket := vars["bucket"] + if !globalIsErasure { + writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL) + return + } + // Get current object layer instance. objectAPI, _ := validateAdminUsersReq(ctx, w, r, iampolicy.SetBucketTargetAction) if objectAPI == nil { writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL) return } - if !globalIsErasure { - writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL) - return - } // Check if bucket exists. if _, err := objectAPI.GetBucketInfo(ctx, bucket); err != nil { @@ -218,7 +219,10 @@ func (a adminAPIHandlers) ListRemoteTargetsHandler(w http.ResponseWriter, r *htt vars := mux.Vars(r) bucket := vars["bucket"] arnType := vars["type"] - + if !globalIsErasure { + writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL) + return + } // Get current object layer instance. objectAPI, _ := validateAdminUsersReq(ctx, w, r, iampolicy.GetBucketTargetAction) if objectAPI == nil { @@ -255,16 +259,16 @@ func (a adminAPIHandlers) RemoveRemoteTargetHandler(w http.ResponseWriter, r *ht bucket := vars["bucket"] arn := vars["arn"] + if !globalIsErasure { + writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL) + return + } // Get current object layer instance. objectAPI, _ := validateAdminUsersReq(ctx, w, r, iampolicy.SetBucketTargetAction) if objectAPI == nil { writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL) return } - if !globalIsErasure { - writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL) - return - } // Check if bucket exists. if _, err := objectAPI.GetBucketInfo(ctx, bucket); err != nil { diff --git a/cmd/api-errors.go b/cmd/api-errors.go index 6839aace8..9914c6e64 100644 --- a/cmd/api-errors.go +++ b/cmd/api-errors.go @@ -121,6 +121,7 @@ const ( ErrReplicationNeedsVersioningError ErrReplicationBucketNeedsVersioningError ErrBucketReplicationDisabledError + ErrObjectRestoreAlreadyInProgress ErrNoSuchKey ErrNoSuchUpload ErrNoSuchVersion @@ -916,6 +917,11 @@ var errorCodes = errorCodeMap{ Description: "x-amz-object-lock-retain-until-date and x-amz-object-lock-mode must both be supplied", HTTPStatusCode: http.StatusBadRequest, }, + ErrObjectRestoreAlreadyInProgress: { + Code: "RestoreAlreadyInProgress", + Description: "Object restore is already in progress", + HTTPStatusCode: http.StatusConflict, + }, /// Bucket notification related errors. ErrEventNotification: { Code: "InvalidArgument", diff --git a/cmd/api-router.go b/cmd/api-router.go index 2af41df4b..f21e2789f 100644 --- a/cmd/api-router.go +++ b/cmd/api-router.go @@ -307,6 +307,9 @@ func registerAPIRouter(router *mux.Router) { // DeleteBucket bucket.Methods(http.MethodDelete).HandlerFunc( maxClients(collectAPIStats("deletebucket", httpTraceAll(api.DeleteBucketHandler)))) + // PostRestoreObject + bucket.Methods(http.MethodPost).Path("/{object:.+}").HandlerFunc( + maxClients(collectAPIStats("restoreobject", httpTraceAll(api.PostRestoreObjectHandler)))).Queries("restore", "") } /// Root operation diff --git a/cmd/bucket-lifecycle-handlers.go b/cmd/bucket-lifecycle-handlers.go index bd381b2cb..1d82143d9 100644 --- a/cmd/bucket-lifecycle-handlers.go +++ b/cmd/bucket-lifecycle-handlers.go @@ -78,6 +78,12 @@ func (api objectAPIHandlers) PutBucketLifecycleHandler(w http.ResponseWriter, r return } + // Validate the transition storage ARNs + if err = validateLifecycleTransition(ctx, bucket, bucketLifecycle); err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) + return + } + configData, err := xml.Marshal(bucketLifecycle) if err != nil { writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) diff --git a/cmd/bucket-lifecycle.go b/cmd/bucket-lifecycle.go index 520bf04d0..72924cc32 100644 --- a/cmd/bucket-lifecycle.go +++ b/cmd/bucket-lifecycle.go @@ -17,7 +17,26 @@ package cmd import ( + "context" + "encoding/xml" + "fmt" + "io" + "net/http" + "runtime" + "strings" + "time" + + miniogo "github.com/minio/minio-go/v7" + "github.com/minio/minio-go/v7/pkg/tags" + "github.com/minio/minio/cmd/crypto" + xhttp "github.com/minio/minio/cmd/http" + "github.com/minio/minio/cmd/logger" + sse "github.com/minio/minio/pkg/bucket/encryption" "github.com/minio/minio/pkg/bucket/lifecycle" + "github.com/minio/minio/pkg/event" + "github.com/minio/minio/pkg/hash" + "github.com/minio/minio/pkg/madmin" + "github.com/minio/minio/pkg/s3select" ) const ( @@ -46,3 +65,547 @@ func (sys *LifecycleSys) Get(bucketName string) (lc *lifecycle.Lifecycle, err er func NewLifecycleSys() *LifecycleSys { return &LifecycleSys{} } + +type transitionState struct { + // add future metrics here + transitionCh chan ObjectInfo +} + +func (t *transitionState) queueTransitionTask(oi ObjectInfo) { + select { + case t.transitionCh <- oi: + default: + } +} + +var ( + globalTransitionState *transitionState + globalTransitionConcurrent = runtime.GOMAXPROCS(0) / 2 +) + +func newTransitionState() *transitionState { + + // fix minimum concurrent transition to 1 for single CPU setup + if globalTransitionConcurrent == 0 { + globalTransitionConcurrent = 1 + } + ts := &transitionState{ + transitionCh: make(chan ObjectInfo, 10000), + } + go func() { + <-GlobalContext.Done() + close(ts.transitionCh) + }() + return ts +} + +// addWorker creates a new worker to process tasks +func (t *transitionState) addWorker(ctx context.Context, objectAPI ObjectLayer) { + // Add a new worker. + go func() { + for { + select { + case <-ctx.Done(): + return + case oi, ok := <-t.transitionCh: + if !ok { + return + } + transitionObject(ctx, objectAPI, oi) + } + } + }() +} + +func initBackgroundTransition(ctx context.Context, objectAPI ObjectLayer) { + if globalTransitionState == nil { + return + } + + // Start with globalTransitionConcurrent. + for i := 0; i < globalTransitionConcurrent; i++ { + globalTransitionState.addWorker(ctx, objectAPI) + } +} + +func validateLifecycleTransition(ctx context.Context, bucket string, lfc *lifecycle.Lifecycle) error { + for _, rule := range lfc.Rules { + if rule.Transition.StorageClass != "" { + sameTarget, destbucket, err := validateTransitionDestination(ctx, bucket, rule.Transition.StorageClass) + if err != nil { + return err + } + if sameTarget && destbucket == bucket { + return fmt.Errorf("Transition destination cannot be the same as the source bucket") + } + } + } + return nil +} + +// validateTransitionDestination returns error if transition destination bucket missing or not configured +// It also returns true if transition destination is same as this server. +func validateTransitionDestination(ctx context.Context, bucket string, targetLabel string) (bool, string, error) { + tgt := globalBucketTargetSys.GetRemoteTargetWithLabel(ctx, bucket, targetLabel) + if tgt == nil { + return false, "", BucketRemoteTargetNotFound{Bucket: bucket} + } + arn, err := madmin.ParseARN(tgt.Arn) + if err != nil { + return false, "", BucketRemoteTargetNotFound{Bucket: bucket} + } + clnt := globalBucketTargetSys.GetRemoteTargetClient(ctx, tgt.Arn) + if clnt == nil { + return false, "", BucketRemoteTargetNotFound{Bucket: bucket} + } + if found, _ := clnt.BucketExists(ctx, arn.Bucket); !found { + return false, "", BucketRemoteDestinationNotFound{Bucket: arn.Bucket} + } + sameTarget, _ := isLocalHost(clnt.EndpointURL().Hostname(), clnt.EndpointURL().Port(), globalMinioPort) + return sameTarget, arn.Bucket, nil +} + +// return true if ARN representing transition storage class is present in a active rule +// for the lifecycle configured on this bucket +func transitionSCInUse(ctx context.Context, lfc *lifecycle.Lifecycle, bucket, arnStr string) bool { + tgtLabel := globalBucketTargetSys.GetRemoteLabelWithArn(ctx, bucket, arnStr) + if tgtLabel == "" { + return false + } + for _, rule := range lfc.Rules { + if rule.Status == Disabled { + continue + } + if rule.Transition.StorageClass != "" && rule.Transition.StorageClass == tgtLabel { + return true + } + } + return false +} + +// set PutObjectOptions for PUT operation to transition data to target cluster +func putTransitionOpts(objInfo ObjectInfo) (putOpts miniogo.PutObjectOptions) { + meta := make(map[string]string) + + tag, err := tags.ParseObjectTags(objInfo.UserTags) + if err != nil { + return + } + putOpts = miniogo.PutObjectOptions{ + UserMetadata: meta, + UserTags: tag.ToMap(), + ContentType: objInfo.ContentType, + ContentEncoding: objInfo.ContentEncoding, + StorageClass: objInfo.StorageClass, + Internal: miniogo.AdvancedPutOptions{ + SourceVersionID: objInfo.VersionID, + SourceMTime: objInfo.ModTime, + SourceETag: objInfo.ETag, + }, + } + if mode, ok := objInfo.UserDefined[xhttp.AmzObjectLockMode]; ok { + rmode := miniogo.RetentionMode(mode) + putOpts.Mode = rmode + } + if retainDateStr, ok := objInfo.UserDefined[xhttp.AmzObjectLockRetainUntilDate]; ok { + rdate, err := time.Parse(time.RFC3339, retainDateStr) + if err != nil { + return + } + putOpts.RetainUntilDate = rdate + } + if lhold, ok := objInfo.UserDefined[xhttp.AmzObjectLockLegalHold]; ok { + putOpts.LegalHold = miniogo.LegalHoldStatus(lhold) + } + + return +} + +// handle deletes of transitioned objects or object versions when one of the following is true: +// 1. temporarily restored copies of objects (restored with the PostRestoreObject API) expired. +// 2. life cycle expiry date is met on the object. +func deleteTransitionedObject(ctx context.Context, objectAPI ObjectLayer, bucket, object string, objInfo ObjectInfo, lcOpts lifecycle.ObjectOpts, action lifecycle.Action) error { + if objInfo.TransitionStatus == "" { + return nil + } + lc, err := globalLifecycleSys.Get(bucket) + if err != nil { + return err + } + arn := getLifecycleTransitionTargetArn(ctx, lc, bucket, lcOpts) + if arn == nil { + return fmt.Errorf("remote target not configured") + } + tgt := globalBucketTargetSys.GetRemoteTargetClient(ctx, arn.String()) + if tgt == nil { + return fmt.Errorf("remote target not configured") + } + + var opts ObjectOptions + opts.Versioned = globalBucketVersioningSys.Enabled(bucket) + opts.VersionID = objInfo.VersionID + switch action { + case lifecycle.DeleteRestoredAction, lifecycle.DeleteRestoredVersionAction: + // delete locally restored copy of object or object version + // from the source, while leaving metadata behind. The data on + // transitioned tier lies untouched and still accessible + opts.TransitionStatus = objInfo.TransitionStatus + _, err = objectAPI.DeleteObject(ctx, bucket, object, opts) + return err + case lifecycle.DeleteAction, lifecycle.DeleteVersionAction: + // When an object is past expiry, delete the data from transitioned tier and + // metadata from source + if err := tgt.RemoveObject(ctx, arn.Bucket, object, miniogo.RemoveObjectOptions{VersionID: objInfo.VersionID}); err != nil { + logger.LogIf(ctx, err) + } + _, err = objectAPI.DeleteObject(ctx, bucket, object, opts) + if err != nil { + return err + } + eventName := event.ObjectRemovedDelete + if objInfo.DeleteMarker { + eventName = event.ObjectRemovedDeleteMarkerCreated + } + // Notify object deleted event. + sendEvent(eventArgs{ + EventName: eventName, + BucketName: bucket, + Object: objInfo, + Host: "Internal: [ILM-EXPIRY]", + }) + } + + // should never reach here + return nil +} + +// transition object to target specified by the transition ARN. When an object is transitioned to another +// storage specified by the transition ARN, the metadata is left behind on source cluster and original content +// is moved to the transition tier. Note that in the case of encrypted objects, entire encrypted stream is moved +// to the transition tier without decrypting or re-encrypting. +func transitionObject(ctx context.Context, objectAPI ObjectLayer, objInfo ObjectInfo) error { + lc, err := globalLifecycleSys.Get(objInfo.Bucket) + if err != nil { + return err + } + lcOpts := lifecycle.ObjectOpts{ + Name: objInfo.Name, + UserTags: objInfo.UserTags, + } + arn := getLifecycleTransitionTargetArn(ctx, lc, objInfo.Bucket, lcOpts) + if arn == nil { + return fmt.Errorf("remote target not configured") + } + tgt := globalBucketTargetSys.GetRemoteTargetClient(ctx, arn.String()) + if tgt == nil { + return fmt.Errorf("remote target not configured") + } + + gr, err := objectAPI.GetObjectNInfo(ctx, objInfo.Bucket, objInfo.Name, nil, http.Header{}, readLock, ObjectOptions{ + VersionID: objInfo.VersionID, + TransitionStatus: lifecycle.TransitionPending, + }) + if err != nil { + return err + } + oi := gr.ObjInfo + + if oi.TransitionStatus == lifecycle.TransitionComplete { + return nil + } + + putOpts := putTransitionOpts(oi) + if _, err = tgt.PutObject(ctx, arn.Bucket, oi.Name, gr, oi.Size, "", "", putOpts); err != nil { + return err + } + gr.Close() + + var opts ObjectOptions + opts.Versioned = globalBucketVersioningSys.Enabled(oi.Bucket) + opts.VersionID = oi.VersionID + opts.TransitionStatus = lifecycle.TransitionComplete + + if _, err = objectAPI.DeleteObject(ctx, oi.Bucket, oi.Name, opts); err != nil { + return err + } + return nil +} + +// getLifecycleTransitionTargetArn returns transition ARN for storage class specified in the config. +func getLifecycleTransitionTargetArn(ctx context.Context, lc *lifecycle.Lifecycle, bucket string, obj lifecycle.ObjectOpts) *madmin.ARN { + for _, rule := range lc.FilterActionableRules(obj) { + if rule.Transition.StorageClass != "" { + return globalBucketTargetSys.GetRemoteArnWithLabel(ctx, bucket, rule.Transition.StorageClass) + } + } + return nil +} + +// getTransitionedObjectReader returns a reader from the transitioned tier. +func getTransitionedObjectReader(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, oi ObjectInfo, opts ObjectOptions) (gr *GetObjectReader, err error) { + var lc *lifecycle.Lifecycle + lc, err = globalLifecycleSys.Get(bucket) + if err != nil { + return nil, err + } + + arn := getLifecycleTransitionTargetArn(ctx, lc, bucket, lifecycle.ObjectOpts{ + Name: object, + UserTags: oi.UserTags, + ModTime: oi.ModTime, + VersionID: oi.VersionID, + DeleteMarker: oi.DeleteMarker, + IsLatest: oi.IsLatest, + }) + if arn == nil { + return nil, fmt.Errorf("remote target not configured") + } + tgt := globalBucketTargetSys.GetRemoteTargetClient(ctx, arn.String()) + if tgt == nil { + return nil, fmt.Errorf("remote target not configured") + } + fn, off, length, err := NewGetObjectReader(rs, oi, opts) + if err != nil { + return nil, ErrorRespToObjectError(err, bucket, object) + } + gopts := miniogo.GetObjectOptions{VersionID: opts.VersionID} + + // get correct offsets for encrypted object + if off >= 0 && length >= 0 { + if err := gopts.SetRange(off, off+length-1); err != nil { + return nil, ErrorRespToObjectError(err, bucket, object) + } + } + + reader, _, _, err := tgt.GetObject(ctx, arn.Bucket, object, gopts) + if err != nil { + return nil, err + } + closeReader := func() { reader.Close() } + + return fn(reader, h, opts.CheckPrecondFn, closeReader) +} + +// RestoreRequestType represents type of restore. +type RestoreRequestType string + +const ( + // SelectRestoreRequest specifies select request. This is the only valid value + SelectRestoreRequest RestoreRequestType = "SELECT" +) + +// Encryption specifies encryption setting on restored bucket +type Encryption struct { + EncryptionType sse.SSEAlgorithm `xml:"EncryptionType"` + KMSContext string `xml:"KMSContext,omitempty"` + KMSKeyID string `xml:"KMSKeyId,omitempty"` +} + +// MetadataEntry denotes name and value. +type MetadataEntry struct { + Name string `xml:"Name"` + Value string `xml:"Value"` +} + +// S3Location specifies s3 location that receives result of a restore object request +type S3Location struct { + BucketName string `xml:"BucketName,omitempty"` + Encryption Encryption `xml:"Encryption,omitempty"` + Prefix string `xml:"Prefix,omitempty"` + StorageClass string `xml:"StorageClass,omitempty"` + Tagging *tags.Tags `xml:"Tagging,omitempty"` + UserMetadata []MetadataEntry `xml:"UserMetadata"` +} + +// OutputLocation specifies bucket where object needs to be restored +type OutputLocation struct { + S3 S3Location `xml:"S3,omitempty"` +} + +// IsEmpty returns true if output location not specified. +func (o *OutputLocation) IsEmpty() bool { + return o.S3.BucketName == "" +} + +// SelectParameters specifies sql select parameters +type SelectParameters struct { + s3select.S3Select +} + +// IsEmpty returns true if no select parameters set +func (sp *SelectParameters) IsEmpty() bool { + return sp == nil || sp.S3Select == s3select.S3Select{} +} + +var ( + selectParamsXMLName = "SelectParameters" +) + +// UnmarshalXML - decodes XML data. +func (sp *SelectParameters) UnmarshalXML(d *xml.Decoder, start xml.StartElement) error { + // Essentially the same as S3Select barring the xml name. + if start.Name.Local == selectParamsXMLName { + start.Name = xml.Name{Space: "", Local: "SelectRequest"} + } + return sp.S3Select.UnmarshalXML(d, start) +} + +// RestoreObjectRequest - xml to restore a transitioned object +type RestoreObjectRequest struct { + XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ RestoreRequest" json:"-"` + Days int `xml:"Days,omitempty"` + Type RestoreRequestType `xml:"Type,omitempty"` + Tier string `xml:"Tier,-"` + Description string `xml:"Description,omitempty"` + SelectParameters *SelectParameters `xml:"SelectParameters,omitempty"` + OutputLocation OutputLocation `xml:"OutputLocation,omitempty"` +} + +// Maximum 2MiB size per restore object request. +const maxRestoreObjectRequestSize = 2 << 20 + +// parseRestoreRequest parses RestoreObjectRequest from xml +func parseRestoreRequest(reader io.Reader) (*RestoreObjectRequest, error) { + req := RestoreObjectRequest{} + if err := xml.NewDecoder(io.LimitReader(reader, maxRestoreObjectRequestSize)).Decode(&req); err != nil { + return nil, err + } + return &req, nil +} + +// validate a RestoreObjectRequest as per AWS S3 spec https://docs.aws.amazon.com/AmazonS3/latest/API/API_RestoreObject.html +func (r *RestoreObjectRequest) validate(ctx context.Context, objAPI ObjectLayer) error { + if r.Type != SelectRestoreRequest && !r.SelectParameters.IsEmpty() { + return fmt.Errorf("Select parameters can only be specified with SELECT request type") + } + if r.Type == SelectRestoreRequest && r.SelectParameters.IsEmpty() { + return fmt.Errorf("SELECT restore request requires select parameters to be specified") + } + + if r.Type != SelectRestoreRequest && !r.OutputLocation.IsEmpty() { + return fmt.Errorf("OutputLocation required only for SELECT request type") + } + if r.Type == SelectRestoreRequest && r.OutputLocation.IsEmpty() { + return fmt.Errorf("OutputLocation required for SELECT requests") + } + + if r.Days != 0 && r.Type == SelectRestoreRequest { + return fmt.Errorf("Days cannot be specified with SELECT restore request") + } + if r.Days == 0 && r.Type != SelectRestoreRequest { + return fmt.Errorf("restoration days should be at least 1") + } + // Check if bucket exists. + if !r.OutputLocation.IsEmpty() { + if _, err := objAPI.GetBucketInfo(ctx, r.OutputLocation.S3.BucketName); err != nil { + return err + } + if r.OutputLocation.S3.Prefix == "" { + return fmt.Errorf("Prefix is a required parameter in OutputLocation") + } + if r.OutputLocation.S3.Encryption.EncryptionType != crypto.SSEAlgorithmAES256 { + return NotImplemented{} + } + } + return nil +} + +// set ObjectOptions for PUT call to restore temporary copy of transitioned data +func putRestoreOpts(bucket, object string, rreq *RestoreObjectRequest, objInfo ObjectInfo) (putOpts ObjectOptions) { + meta := make(map[string]string) + sc := rreq.OutputLocation.S3.StorageClass + if sc == "" { + sc = objInfo.StorageClass + } + meta[strings.ToLower(xhttp.AmzStorageClass)] = sc + + if rreq.Type == SelectRestoreRequest { + for _, v := range rreq.OutputLocation.S3.UserMetadata { + if !strings.HasPrefix("x-amz-meta", strings.ToLower(v.Name)) { + meta["x-amz-meta-"+v.Name] = v.Value + continue + } + meta[v.Name] = v.Value + } + meta[xhttp.AmzObjectTagging] = rreq.OutputLocation.S3.Tagging.String() + if rreq.OutputLocation.S3.Encryption.EncryptionType != "" { + meta[crypto.SSEHeader] = crypto.SSEAlgorithmAES256 + } + return ObjectOptions{ + Versioned: globalBucketVersioningSys.Enabled(bucket), + VersionSuspended: globalBucketVersioningSys.Suspended(bucket), + UserDefined: meta, + } + } + for k, v := range objInfo.UserDefined { + meta[k] = v + } + meta[xhttp.AmzObjectTagging] = objInfo.UserTags + + return ObjectOptions{ + Versioned: globalBucketVersioningSys.Enabled(bucket), + VersionSuspended: globalBucketVersioningSys.Suspended(bucket), + UserDefined: meta, + VersionID: objInfo.VersionID, + MTime: objInfo.ModTime, + Expires: objInfo.Expires, + } +} + +var ( + errRestoreHDRMissing = fmt.Errorf("x-amz-restore header not found") + errRestoreHDRMalformed = fmt.Errorf("x-amz-restore header malformed") +) + +// parse x-amz-restore header from user metadata to get the status of ongoing request and expiry of restoration +// if any. This header value is of format: ongoing-request=true|false, expires=time +func parseRestoreHeaderFromMeta(meta map[string]string) (ongoing bool, expiry time.Time, err error) { + restoreHdr, ok := meta[xhttp.AmzRestore] + if !ok { + return ongoing, expiry, errRestoreHDRMissing + } + rslc := strings.SplitN(restoreHdr, ",", 2) + if len(rslc) != 2 { + return ongoing, expiry, errRestoreHDRMalformed + } + rstatusSlc := strings.SplitN(rslc[0], "=", 2) + if len(rstatusSlc) != 2 { + return ongoing, expiry, errRestoreHDRMalformed + } + rExpSlc := strings.SplitN(rslc[1], "=", 2) + if len(rExpSlc) != 2 { + return ongoing, expiry, errRestoreHDRMalformed + } + + expiry, err = time.Parse(http.TimeFormat, rExpSlc[1]) + if err != nil { + return + } + return rstatusSlc[1] == "true", expiry, nil +} + +// restoreTransitionedObject is similar to PostObjectRestore from AWS GLACIER +// storage class. When PostObjectRestore API is called, a temporary copy of the object +// is restored locally to the bucket on source cluster until the restore expiry date. +// The copy that was transitioned continues to reside in the transitioned tier. +func restoreTransitionedObject(ctx context.Context, bucket, object string, objAPI ObjectLayer, objInfo ObjectInfo, rreq *RestoreObjectRequest, restoreExpiry time.Time) error { + var rs *HTTPRangeSpec + gr, err := getTransitionedObjectReader(ctx, bucket, object, rs, http.Header{}, objInfo, ObjectOptions{ + VersionID: objInfo.VersionID}) + if err != nil { + return err + } + defer gr.Close() + hashReader, err := hash.NewReader(gr, objInfo.Size, "", "", objInfo.Size, globalCLIContext.StrictS3Compat) + if err != nil { + return err + } + pReader := NewPutObjReader(hashReader, nil, nil) + opts := putRestoreOpts(bucket, object, rreq, objInfo) + opts.UserDefined[xhttp.AmzRestore] = fmt.Sprintf("ongoing-request=%t, expiry-date=%s", false, restoreExpiry.Format(http.TimeFormat)) + if _, err := objAPI.PutObject(ctx, bucket, object, pReader, opts); err != nil { + return err + } + + return nil +} diff --git a/cmd/bucket-metadata-sys.go b/cmd/bucket-metadata-sys.go index ac84423f3..e9f646f19 100644 --- a/cmd/bucket-metadata-sys.go +++ b/cmd/bucket-metadata-sys.go @@ -168,9 +168,6 @@ func (sys *BucketMetadataSys) Update(bucket string, configFile string, configDat } meta.ReplicationConfigXML = configData case bucketTargetsFile: - if !globalIsErasure && !globalIsDistErasure { - return NotImplemented{} - } meta.BucketTargetsConfigJSON = configData default: return fmt.Errorf("Unknown bucket %s metadata update requested %s", bucket, configFile) diff --git a/cmd/bucket-targets.go b/cmd/bucket-targets.go index 32aad0ebd..7d274e80a 100644 --- a/cmd/bucket-targets.go +++ b/cmd/bucket-targets.go @@ -19,6 +19,7 @@ package cmd import ( "context" "net/http" + "strings" "sync" "time" @@ -88,6 +89,9 @@ func (sys *BucketTargetSys) SetTarget(ctx context.Context, bucket string, tgt *m return BucketRemoteTargetNotFound{Bucket: tgt.TargetBucket} } if tgt.Type == madmin.ReplicationService { + if !globalIsErasure { + return NotImplemented{} + } if !globalBucketVersioningSys.Enabled(bucket) { return BucketReplicationSourceNotVersioned{Bucket: bucket} } @@ -102,6 +106,20 @@ func (sys *BucketTargetSys) SetTarget(ctx context.Context, bucket string, tgt *m return BucketRemoteTargetNotVersioned{Bucket: tgt.TargetBucket} } } + if tgt.Type == madmin.ILMService { + if globalBucketVersioningSys.Enabled(bucket) { + vcfg, err := clnt.GetBucketVersioning(ctx, tgt.TargetBucket) + if err != nil { + if minio.ToErrorResponse(err).Code == "NoSuchBucket" { + return BucketRemoteTargetNotFound{Bucket: tgt.TargetBucket} + } + return BucketRemoteConnectionErr{Bucket: tgt.TargetBucket} + } + if vcfg.Status != string(versioning.Enabled) { + return BucketRemoteTargetNotVersioned{Bucket: tgt.TargetBucket} + } + } + } sys.Lock() defer sys.Unlock() @@ -147,6 +165,9 @@ func (sys *BucketTargetSys) RemoveTarget(ctx context.Context, bucket, arnStr str return BucketRemoteArnInvalid{Bucket: bucket} } if arn.Type == madmin.ReplicationService { + if !globalIsErasure { + return NotImplemented{} + } // reject removal of remote target if replication configuration is present rcfg, err := getReplicationConfig(ctx, bucket) if err == nil && rcfg.RoleArn == arnStr { @@ -155,6 +176,16 @@ func (sys *BucketTargetSys) RemoveTarget(ctx context.Context, bucket, arnStr str } } } + if arn.Type == madmin.ILMService { + // reject removal of remote target if lifecycle transition uses this arn + config, err := globalBucketMetadataSys.GetLifecycleConfig(bucket) + if err == nil && transitionSCInUse(ctx, config, bucket, arnStr) { + if _, ok := sys.arnRemotesMap[arnStr]; ok { + return BucketRemoteRemoveDisallowed{Bucket: bucket} + } + } + } + // delete ARN type from list of matching targets sys.Lock() defer sys.Unlock() @@ -183,6 +214,44 @@ func (sys *BucketTargetSys) GetRemoteTargetClient(ctx context.Context, arn strin return sys.arnRemotesMap[arn] } +// GetRemoteTargetWithLabel returns bucket target given a target label +func (sys *BucketTargetSys) GetRemoteTargetWithLabel(ctx context.Context, bucket, targetLabel string) *madmin.BucketTarget { + sys.RLock() + defer sys.RUnlock() + for _, t := range sys.targetsMap[bucket] { + if strings.ToUpper(t.Label) == strings.ToUpper(targetLabel) { + tgt := t.Clone() + return &tgt + } + } + return nil +} + +// GetRemoteArnWithLabel returns bucket target's ARN given its target label +func (sys *BucketTargetSys) GetRemoteArnWithLabel(ctx context.Context, bucket, tgtLabel string) *madmin.ARN { + tgt := sys.GetRemoteTargetWithLabel(ctx, bucket, tgtLabel) + if tgt == nil { + return nil + } + arn, err := madmin.ParseARN(tgt.Arn) + if err != nil { + return nil + } + return arn +} + +// GetRemoteLabelWithArn returns a bucket target's label given its ARN +func (sys *BucketTargetSys) GetRemoteLabelWithArn(ctx context.Context, bucket, arnStr string) string { + sys.RLock() + defer sys.RUnlock() + for _, t := range sys.targetsMap[bucket] { + if t.Arn == arnStr { + return t.Label + } + } + return "" +} + // NewBucketTargetSys - creates new replication system. func NewBucketTargetSys() *BucketTargetSys { return &BucketTargetSys{ diff --git a/cmd/data-crawler.go b/cmd/data-crawler.go index 7c80ff50a..30f2ef2fe 100644 --- a/cmd/data-crawler.go +++ b/cmd/data-crawler.go @@ -672,12 +672,17 @@ func (i *crawlItem) applyActions(ctx context.Context, o ObjectLayer, meta action IsLatest: meta.oi.IsLatest, NumVersions: meta.numVersions, SuccessorModTime: meta.successorModTime, + RestoreOngoing: meta.oi.RestoreOngoing, + RestoreExpires: meta.oi.RestoreExpires, + TransitionStatus: meta.oi.TransitionStatus, }) if i.debug { logger.Info(color.Green("applyActions:")+" lifecycle: %q (version-id=%s), Initial scan: %v", i.objectPath(), versionID, action) } switch action { case lifecycle.DeleteAction, lifecycle.DeleteVersionAction: + case lifecycle.TransitionAction, lifecycle.TransitionVersionAction: + case lifecycle.DeleteRestoredAction, lifecycle.DeleteRestoredVersionAction: default: // No action. return size @@ -706,22 +711,28 @@ func (i *crawlItem) applyActions(ctx context.Context, o ObjectLayer, meta action size = obj.Size // Recalculate action. - action = i.lifeCycle.ComputeAction( - lifecycle.ObjectOpts{ - Name: i.objectPath(), - UserTags: obj.UserTags, - ModTime: obj.ModTime, - VersionID: obj.VersionID, - DeleteMarker: obj.DeleteMarker, - IsLatest: obj.IsLatest, - NumVersions: meta.numVersions, - SuccessorModTime: meta.successorModTime, - }) + lcOpts := lifecycle.ObjectOpts{ + Name: i.objectPath(), + UserTags: obj.UserTags, + ModTime: obj.ModTime, + VersionID: obj.VersionID, + DeleteMarker: obj.DeleteMarker, + IsLatest: obj.IsLatest, + NumVersions: meta.numVersions, + SuccessorModTime: meta.successorModTime, + RestoreOngoing: obj.RestoreOngoing, + RestoreExpires: obj.RestoreExpires, + TransitionStatus: obj.TransitionStatus, + } + action = i.lifeCycle.ComputeAction(lcOpts) + if i.debug { logger.Info(color.Green("applyActions:")+" lifecycle: Secondary scan: %v", action) } switch action { case lifecycle.DeleteAction, lifecycle.DeleteVersionAction: + case lifecycle.TransitionAction, lifecycle.TransitionVersionAction: + case lifecycle.DeleteRestoredAction, lifecycle.DeleteRestoredVersionAction: default: // No action. return size @@ -729,7 +740,7 @@ func (i *crawlItem) applyActions(ctx context.Context, o ObjectLayer, meta action opts := ObjectOptions{} switch action { - case lifecycle.DeleteVersionAction: + case lifecycle.DeleteVersionAction, lifecycle.DeleteRestoredVersionAction: // Defensive code, should never happen if obj.VersionID == "" { return size @@ -744,15 +755,34 @@ func (i *crawlItem) applyActions(ctx context.Context, o ObjectLayer, meta action } } opts.VersionID = obj.VersionID - case lifecycle.DeleteAction: + case lifecycle.DeleteAction, lifecycle.DeleteRestoredAction: opts.Versioned = globalBucketVersioningSys.Enabled(i.bucket) + case lifecycle.TransitionAction, lifecycle.TransitionVersionAction: + if obj.TransitionStatus == "" { + opts.Versioned = globalBucketVersioningSys.Enabled(obj.Bucket) + opts.VersionID = obj.VersionID + opts.TransitionStatus = lifecycle.TransitionPending + if _, err = o.DeleteObject(ctx, obj.Bucket, obj.Name, opts); err != nil { + // Assume it is still there. + logger.LogIf(ctx, err) + return size + } + } + globalTransitionState.queueTransitionTask(obj) + return 0 } - - obj, err = o.DeleteObject(ctx, i.bucket, i.objectPath(), opts) - if err != nil { - // Assume it is still there. - logger.LogIf(ctx, err) - return size + if obj.TransitionStatus != "" { + if err := deleteTransitionedObject(ctx, o, i.bucket, i.objectPath(), obj, lcOpts, action); err != nil { + logger.LogIf(ctx, err) + return size + } + } else { + obj, err = o.DeleteObject(ctx, i.bucket, i.objectPath(), opts) + if err != nil { + // Assume it is still there. + logger.LogIf(ctx, err) + return size + } } eventName := event.ObjectRemovedDelete diff --git a/cmd/erasure-decode.go b/cmd/erasure-decode.go index 68055979f..14861cea2 100644 --- a/cmd/erasure-decode.go +++ b/cmd/erasure-decode.go @@ -271,10 +271,12 @@ func (e Erasure) decode(ctx context.Context, writer io.Writer, readers []io.Read return healRequired, err } } + if err = e.DecodeDataBlocks(bufs); err != nil { logger.LogIf(ctx, err) return healRequired, err } + n, err := writeDataBlocks(ctx, writer, bufs, e.dataBlocks, blockOffset, blockLength) if err != nil { return healRequired, err diff --git a/cmd/erasure-metadata.go b/cmd/erasure-metadata.go index 1aa9526fd..aafbc80aa 100644 --- a/cmd/erasure-metadata.go +++ b/cmd/erasure-metadata.go @@ -142,6 +142,9 @@ func (fi FileInfo) ToObjectInfo(bucket, object string) ObjectInfo { if fi.Deleted { objInfo.ReplicationStatus = replication.StatusType(fi.DeleteMarkerReplicationStatus) } + + objInfo.TransitionStatus = fi.TransitionStatus + // etag/md5Sum has already been extracted. We need to // remove to avoid it from appearing as part of // response headers. e.g, X-Minio-* or X-Amz-*. @@ -158,6 +161,11 @@ func (fi FileInfo) ToObjectInfo(bucket, object string) ObjectInfo { objInfo.StorageClass = globalMinioDefaultStorageClass } objInfo.VersionPurgeStatus = fi.VersionPurgeStatus + // set restore status for transitioned object + if ongoing, exp, err := parseRestoreHeaderFromMeta(fi.Metadata); err == nil { + objInfo.RestoreOngoing = ongoing + objInfo.RestoreExpires = exp + } // Success. return objInfo } diff --git a/cmd/erasure-object.go b/cmd/erasure-object.go index 02e621210..05872d755 100644 --- a/cmd/erasure-object.go +++ b/cmd/erasure-object.go @@ -23,11 +23,13 @@ import ( "io" "net/http" "path" + "strings" "sync" "github.com/minio/minio-go/v7/pkg/tags" xhttp "github.com/minio/minio/cmd/http" "github.com/minio/minio/cmd/logger" + "github.com/minio/minio/pkg/bucket/lifecycle" "github.com/minio/minio/pkg/bucket/replication" "github.com/minio/minio/pkg/mimedb" "github.com/minio/minio/pkg/sync/errgroup" @@ -168,13 +170,18 @@ func (er erasureObjects) GetObjectNInfo(ctx context.Context, bucket, object stri ObjInfo: objInfo, }, toObjectErr(errMethodNotAllowed, bucket, object) } - + if objInfo.TransitionStatus == lifecycle.TransitionComplete { + // If transitioned, stream from transition tier unless object is restored locally or restore date is past. + restoreHdr, ok := objInfo.UserDefined[xhttp.AmzRestore] + if !ok || !strings.HasPrefix(restoreHdr, "ongoing-request=false") || (!objInfo.RestoreExpires.IsZero() && time.Now().After(objInfo.RestoreExpires)) { + return getTransitionedObjectReader(ctx, bucket, object, rs, h, objInfo, opts) + } + } unlockOnDefer = false fn, off, length, nErr := NewGetObjectReader(rs, objInfo, opts, nsUnlocker) if nErr != nil { return nil, nErr } - pr, pw := io.Pipe() go func() { err := er.getObjectWithFileInfo(ctx, bucket, object, off, length, pw, fi, metaArr, onlineDisks) @@ -256,8 +263,8 @@ func (er erasureObjects) getObjectWithFileInfo(ctx context.Context, bucket, obje if err != nil { return toObjectErr(err, bucket, object) } - var healOnce sync.Once + for ; partIndex <= lastPartIndex; partIndex++ { if length == totalBytesRead { break @@ -313,12 +320,10 @@ func (er erasureObjects) getObjectWithFileInfo(ctx context.Context, bucket, obje } // Track total bytes read from disk and written to the client. totalBytesRead += partLength - // partOffset will be valid only for the first part, hence reset it to 0 for // the remaining parts. partOffset = 0 } // End of read all parts loop. - // Return success. return nil } @@ -990,6 +995,8 @@ func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string fi.VersionID = opts.VersionID } } + fi.TransitionStatus = opts.TransitionStatus + // versioning suspended means we add `null` // version as delete marker // Add delete marker, since we don't have any version specified explicitly. @@ -1010,6 +1017,7 @@ func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string ModTime: modTime, DeleteMarkerReplicationStatus: opts.DeleteMarkerReplicationStatus, VersionPurgeStatus: opts.VersionPurgeStatus, + TransitionStatus: opts.TransitionStatus, }); err != nil { return objInfo, toObjectErr(err, bucket, object) } diff --git a/cmd/generic-handlers.go b/cmd/generic-handlers.go index 0a15a2ba8..9d2c6304e 100644 --- a/cmd/generic-handlers.go +++ b/cmd/generic-handlers.go @@ -479,7 +479,6 @@ var supportedDummyObjectAPIs = map[string][]string{ // List of not implemented object APIs var notImplementedObjectResourceNames = map[string]struct{}{ - "restore": {}, "torrent": {}, } diff --git a/cmd/http/headers.go b/cmd/http/headers.go index f6fb9592c..68a2ab52d 100644 --- a/cmd/http/headers.go +++ b/cmd/http/headers.go @@ -66,6 +66,12 @@ const ( AmzTagCount = "x-amz-tagging-count" AmzTagDirective = "X-Amz-Tagging-Directive" + // S3 transition restore + AmzRestore = "x-amz-restore" + AmzRestoreExpiryDays = "X-Amz-Restore-Expiry-Days" + AmzRestoreRequestDate = "X-Amz-Restore-Request-Date" + AmzRestoreOutputPath = "x-amz-restore-output-path" + // S3 extensions AmzCopySourceIfModifiedSince = "x-amz-copy-source-if-modified-since" AmzCopySourceIfUnmodifiedSince = "x-amz-copy-source-if-unmodified-since" diff --git a/cmd/object-api-datatypes.go b/cmd/object-api-datatypes.go index 8d3e583ba..8916ff76a 100644 --- a/cmd/object-api-datatypes.go +++ b/cmd/object-api-datatypes.go @@ -163,6 +163,15 @@ type ObjectInfo struct { // to a delete marker on an object. DeleteMarker bool + // TransitionStatus indicates if transition is complete/pending + TransitionStatus string + + // RestoreExpires indicates date a restored object expires + RestoreExpires time.Time + + // RestoreOngoing indicates if a restore is in progress + RestoreOngoing bool + // A standard MIME type describing the format of the object. ContentType string diff --git a/cmd/object-api-interface.go b/cmd/object-api-interface.go index bb0b8a203..8b9b726df 100644 --- a/cmd/object-api-interface.go +++ b/cmd/object-api-interface.go @@ -36,18 +36,21 @@ type GetObjectInfoFn func(ctx context.Context, bucket, object string, opts Objec // ObjectOptions represents object options for ObjectLayer object operations type ObjectOptions struct { - ServerSideEncryption encrypt.ServerSide - VersionSuspended bool // indicates if the bucket was previously versioned but is currently suspended. - Versioned bool // indicates if the bucket is versioned - WalkVersions bool // indicates if the we are interested in walking versions - VersionID string // Specifies the versionID which needs to be overwritten or read - MTime time.Time // Is only set in POST/PUT operations + ServerSideEncryption encrypt.ServerSide + VersionSuspended bool // indicates if the bucket was previously versioned but is currently suspended. + Versioned bool // indicates if the bucket is versioned + WalkVersions bool // indicates if the we are interested in walking versions + VersionID string // Specifies the versionID which needs to be overwritten or read + MTime time.Time // Is only set in POST/PUT operations + Expires time.Time // Is only used in POST/PUT operations + DeleteMarker bool // Is only set in DELETE operations for delete marker replication UserDefined map[string]string // only set in case of POST/PUT operations PartNumber int // only useful in case of GetObject/HeadObject CheckPrecondFn CheckPreconditionFn // only set during GetObject/HeadObject/CopyObjectPart preconditional valuation DeleteMarkerReplicationStatus string // Is only set in DELETE operations VersionPurgeStatus VersionPurgeStatusType // Is only set in DELETE operations for delete marker version to be permanently deleted. + TransitionStatus string // status of the transition } // BucketOptions represents bucket options for ObjectLayer bucket operations diff --git a/cmd/object-api-options.go b/cmd/object-api-options.go index c9fabd7c7..6a6aefbd0 100644 --- a/cmd/object-api-options.go +++ b/cmd/object-api-options.go @@ -242,6 +242,7 @@ func putOpts(ctx context.Context, r *http.Request, bucket, object string, metada } metadata["etag"] = etag } + // In the case of multipart custom format, the metadata needs to be checked in addition to header to see if it // is SSE-S3 encrypted, primarily because S3 protocol does not require SSE-S3 headers in PutObjectPart calls if GlobalGatewaySSE.SSES3() && (crypto.S3.IsRequested(r.Header) || crypto.S3.IsEncrypted(metadata)) { diff --git a/cmd/object-api-utils.go b/cmd/object-api-utils.go index 86e689d6f..c7b889aeb 100644 --- a/cmd/object-api-utils.go +++ b/cmd/object-api-utils.go @@ -43,6 +43,7 @@ import ( "github.com/minio/minio/cmd/crypto" xhttp "github.com/minio/minio/cmd/http" "github.com/minio/minio/cmd/logger" + "github.com/minio/minio/pkg/bucket/lifecycle" "github.com/minio/minio/pkg/hash" "github.com/minio/minio/pkg/ioutil" "github.com/minio/minio/pkg/trie" @@ -590,7 +591,10 @@ func NewGetObjectReader(rs *HTTPRangeSpec, oi ObjectInfo, opts ObjectOptions, cl if err != nil { return nil, 0, 0, err } - + // if object is encrypted, transition content without decrypting. + if opts.TransitionStatus == lifecycle.TransitionPending && isEncrypted { + isEncrypted = false + } var skipLen int64 // Calculate range to read (different for // e.g. encrypted/compressed objects) diff --git a/cmd/object-handlers.go b/cmd/object-handlers.go index e0ce3909b..671617881 100644 --- a/cmd/object-handlers.go +++ b/cmd/object-handlers.go @@ -21,8 +21,10 @@ import ( "context" "encoding/hex" "encoding/xml" + "fmt" "io" "net/http" + "net/http/httptest" "net/url" "sort" "strconv" @@ -42,6 +44,7 @@ import ( "github.com/minio/minio/cmd/crypto" xhttp "github.com/minio/minio/cmd/http" "github.com/minio/minio/cmd/logger" + "github.com/minio/minio/pkg/bucket/lifecycle" objectlock "github.com/minio/minio/pkg/bucket/object/lock" "github.com/minio/minio/pkg/bucket/policy" "github.com/minio/minio/pkg/bucket/replication" @@ -3245,3 +3248,185 @@ func (api objectAPIHandlers) DeleteObjectTaggingHandler(w http.ResponseWriter, r writeSuccessNoContent(w) } + +// RestoreObjectHandler - POST restore object handler. +// ---------- +func (api objectAPIHandlers) PostRestoreObjectHandler(w http.ResponseWriter, r *http.Request) { + ctx := newContext(r, w, "PostRestoreObject") + defer logger.AuditLog(w, r, "PostRestoreObject", mustGetClaimsFromToken(r)) + vars := mux.Vars(r) + bucket := vars["bucket"] + object, err := url.PathUnescape(vars["object"]) + if err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) + return + } + + // Fetch object stat info. + objectAPI := api.ObjectAPI() + if objectAPI == nil { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL, guessIsBrowserReq(r)) + return + } + + getObjectInfo := objectAPI.GetObjectInfo + if api.CacheAPI() != nil { + getObjectInfo = api.CacheAPI().GetObjectInfo + } + + // Check for auth type to return S3 compatible error. + if s3Error := checkRequestAuthType(ctx, r, policy.RestoreObjectAction, bucket, object); s3Error != ErrNone { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL, guessIsBrowserReq(r)) + return + } + + if r.ContentLength <= 0 { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrEmptyRequestBody), r.URL, guessIsBrowserReq(r)) + return + } + + objInfo, err := getObjectInfo(ctx, bucket, object, ObjectOptions{}) + if err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) + return + } + + if objInfo.TransitionStatus != lifecycle.TransitionComplete { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidObjectState), r.URL, guessIsBrowserReq(r)) + return + } + + rreq, err := parseRestoreRequest(io.LimitReader(r.Body, r.ContentLength)) + if err != nil { + apiErr := errorCodes.ToAPIErr(ErrMalformedXML) + apiErr.Description = err.Error() + writeErrorResponse(ctx, w, apiErr, r.URL, guessIsBrowserReq(r)) + return + } + // validate the request + if err := rreq.validate(ctx, objectAPI); err != nil { + apiErr := errorCodes.ToAPIErr(ErrMalformedXML) + apiErr.Description = err.Error() + writeErrorResponse(ctx, w, apiErr, r.URL, guessIsBrowserReq(r)) + return + } + statusCode := http.StatusOK + alreadyRestored := false + if err == nil { + if objInfo.RestoreOngoing && rreq.Type != SelectRestoreRequest { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrObjectRestoreAlreadyInProgress), r.URL, guessIsBrowserReq(r)) + return + } + if !objInfo.RestoreOngoing && !objInfo.RestoreExpires.IsZero() { + statusCode = http.StatusAccepted + alreadyRestored = true + } + } + // set or upgrade restore expiry + restoreExpiry := lifecycle.ExpectedExpiryTime(time.Now(), rreq.Days) + metadata := cloneMSS(objInfo.UserDefined) + + // update self with restore metadata + if rreq.Type != SelectRestoreRequest { + objInfo.metadataOnly = true // Perform only metadata updates. + ongoingReq := true + if alreadyRestored { + ongoingReq = false + } + metadata[xhttp.AmzRestoreExpiryDays] = strconv.Itoa(rreq.Days) + metadata[xhttp.AmzRestoreRequestDate] = time.Now().UTC().Format(http.TimeFormat) + if alreadyRestored { + metadata[xhttp.AmzRestore] = fmt.Sprintf("ongoing-request=%t, expiry-date=%s", ongoingReq, restoreExpiry.Format(http.TimeFormat)) + } else { + metadata[xhttp.AmzRestore] = fmt.Sprintf("ongoing-request=%t", ongoingReq) + } + objInfo.UserDefined = metadata + if _, err := objectAPI.CopyObject(GlobalContext, bucket, object, bucket, object, objInfo, ObjectOptions{ + VersionID: objInfo.VersionID, + }, ObjectOptions{ + VersionID: objInfo.VersionID, + }); err != nil { + logger.LogIf(ctx, fmt.Errorf("Unable to update replication metadata for %s: %s", objInfo.VersionID, err)) + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidObjectState), r.URL, guessIsBrowserReq(r)) + return + } + // for previously restored object, just update the restore expiry + if alreadyRestored { + return + } + } + + restoreObject := mustGetUUID() + if rreq.OutputLocation.S3.BucketName != "" { + w.Header()[xhttp.AmzRestoreOutputPath] = []string{pathJoin(rreq.OutputLocation.S3.BucketName, rreq.OutputLocation.S3.Prefix, restoreObject)} + } + w.WriteHeader(statusCode) + // Notify object restore started via a POST request. + sendEvent(eventArgs{ + EventName: event.ObjectRestorePostInitiated, + BucketName: bucket, + Object: objInfo, + ReqParams: extractReqParams(r), + UserAgent: r.UserAgent(), + Host: handlers.GetSourceIP(r), + }) + // now process the restore in background + go func() { + rctx := GlobalContext + if !rreq.SelectParameters.IsEmpty() { + getObject := func(offset, length int64) (rc io.ReadCloser, err error) { + isSuffixLength := false + if offset < 0 { + isSuffixLength = true + } + + rs := &HTTPRangeSpec{ + IsSuffixLength: isSuffixLength, + Start: offset, + End: offset + length, + } + + return getTransitionedObjectReader(rctx, bucket, object, rs, r.Header, objInfo, ObjectOptions{ + VersionID: objInfo.VersionID, + }) + } + if err = rreq.SelectParameters.Open(getObject); err != nil { + if serr, ok := err.(s3select.SelectError); ok { + encodedErrorResponse := encodeResponse(APIErrorResponse{ + Code: serr.ErrorCode(), + Message: serr.ErrorMessage(), + BucketName: bucket, + Key: object, + Resource: r.URL.Path, + RequestID: w.Header().Get(xhttp.AmzRequestID), + HostID: globalDeploymentID, + }) + writeResponse(w, serr.HTTPStatusCode(), encodedErrorResponse, mimeXML) + } else { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) + } + return + } + nr := httptest.NewRecorder() + rw := logger.NewResponseWriter(nr) + rw.LogErrBody = true + rw.LogAllBody = true + rreq.SelectParameters.Evaluate(rw) + rreq.SelectParameters.Close() + return + } + if err := restoreTransitionedObject(rctx, bucket, object, objectAPI, objInfo, rreq, restoreExpiry); err != nil { + return + } + + // Notify object restore completed via a POST request. + sendEvent(eventArgs{ + EventName: event.ObjectRestorePostCompleted, + BucketName: bucket, + Object: objInfo, + ReqParams: extractReqParams(r), + UserAgent: r.UserAgent(), + Host: handlers.GetSourceIP(r), + }) + }() +} diff --git a/cmd/server-main.go b/cmd/server-main.go index 33a73d929..1365a6db5 100644 --- a/cmd/server-main.go +++ b/cmd/server-main.go @@ -425,6 +425,8 @@ func serverMain(ctx *cli.Context) { globalAllHealState = newHealState() globalBackgroundHealState = newHealState() globalReplicationState = newReplicationState() + globalTransitionState = newTransitionState() + } // Configure server. @@ -479,6 +481,7 @@ func serverMain(ctx *cli.Context) { if globalIsErasure { initAutoHeal(GlobalContext, newObject) initBackgroundReplication(GlobalContext, newObject) + initBackgroundTransition(GlobalContext, newObject) } if err = initServer(GlobalContext, newObject); err != nil { diff --git a/cmd/storage-datatypes.go b/cmd/storage-datatypes.go index b75f0d71f..3890264cf 100644 --- a/cmd/storage-datatypes.go +++ b/cmd/storage-datatypes.go @@ -112,6 +112,10 @@ type FileInfo struct { // a deleted marker for a versioned bucket. Deleted bool + // TransitionStatus is set to Pending/Complete for transitioned + // entries based on state of transition + TransitionStatus string + // DataDir of the file DataDir string diff --git a/cmd/storage-datatypes_gen.go b/cmd/storage-datatypes_gen.go index 4dfaa7771..1f076dbbc 100644 --- a/cmd/storage-datatypes_gen.go +++ b/cmd/storage-datatypes_gen.go @@ -342,8 +342,8 @@ func (z *FileInfo) DecodeMsg(dc *msgp.Reader) (err error) { err = msgp.WrapError(err) return } - if zb0001 != 16 { - err = msgp.ArrayError{Wanted: 16, Got: zb0001} + if zb0001 != 17 { + err = msgp.ArrayError{Wanted: 17, Got: zb0001} return } z.Volume, err = dc.ReadString() @@ -371,6 +371,11 @@ func (z *FileInfo) DecodeMsg(dc *msgp.Reader) (err error) { err = msgp.WrapError(err, "Deleted") return } + z.TransitionStatus, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "TransitionStatus") + return + } z.DataDir, err = dc.ReadString() if err != nil { err = msgp.WrapError(err, "DataDir") @@ -472,8 +477,8 @@ func (z *FileInfo) DecodeMsg(dc *msgp.Reader) (err error) { // EncodeMsg implements msgp.Encodable func (z *FileInfo) EncodeMsg(en *msgp.Writer) (err error) { - // array header, size 16 - err = en.Append(0xdc, 0x0, 0x10) + // array header, size 17 + err = en.Append(0xdc, 0x0, 0x11) if err != nil { return } @@ -502,6 +507,11 @@ func (z *FileInfo) EncodeMsg(en *msgp.Writer) (err error) { err = msgp.WrapError(err, "Deleted") return } + err = en.WriteString(z.TransitionStatus) + if err != nil { + err = msgp.WrapError(err, "TransitionStatus") + return + } err = en.WriteString(z.DataDir) if err != nil { err = msgp.WrapError(err, "DataDir") @@ -582,13 +592,14 @@ func (z *FileInfo) EncodeMsg(en *msgp.Writer) (err error) { // MarshalMsg implements msgp.Marshaler func (z *FileInfo) MarshalMsg(b []byte) (o []byte, err error) { o = msgp.Require(b, z.Msgsize()) - // array header, size 16 - o = append(o, 0xdc, 0x0, 0x10) + // array header, size 17 + o = append(o, 0xdc, 0x0, 0x11) o = msgp.AppendString(o, z.Volume) o = msgp.AppendString(o, z.Name) o = msgp.AppendString(o, z.VersionID) o = msgp.AppendBool(o, z.IsLatest) o = msgp.AppendBool(o, z.Deleted) + o = msgp.AppendString(o, z.TransitionStatus) o = msgp.AppendString(o, z.DataDir) o = msgp.AppendBool(o, z.XLV1) o = msgp.AppendTime(o, z.ModTime) @@ -626,8 +637,8 @@ func (z *FileInfo) UnmarshalMsg(bts []byte) (o []byte, err error) { err = msgp.WrapError(err) return } - if zb0001 != 16 { - err = msgp.ArrayError{Wanted: 16, Got: zb0001} + if zb0001 != 17 { + err = msgp.ArrayError{Wanted: 17, Got: zb0001} return } z.Volume, bts, err = msgp.ReadStringBytes(bts) @@ -655,6 +666,11 @@ func (z *FileInfo) UnmarshalMsg(bts []byte) (o []byte, err error) { err = msgp.WrapError(err, "Deleted") return } + z.TransitionStatus, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "TransitionStatus") + return + } z.DataDir, bts, err = msgp.ReadStringBytes(bts) if err != nil { err = msgp.WrapError(err, "DataDir") @@ -757,7 +773,7 @@ func (z *FileInfo) UnmarshalMsg(bts []byte) (o []byte, err error) { // Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message func (z *FileInfo) Msgsize() (s int) { - s = 3 + msgp.StringPrefixSize + len(z.Volume) + msgp.StringPrefixSize + len(z.Name) + msgp.StringPrefixSize + len(z.VersionID) + msgp.BoolSize + msgp.BoolSize + msgp.StringPrefixSize + len(z.DataDir) + msgp.BoolSize + msgp.TimeSize + msgp.Int64Size + msgp.Uint32Size + msgp.MapHeaderSize + s = 3 + msgp.StringPrefixSize + len(z.Volume) + msgp.StringPrefixSize + len(z.Name) + msgp.StringPrefixSize + len(z.VersionID) + msgp.BoolSize + msgp.BoolSize + msgp.StringPrefixSize + len(z.TransitionStatus) + msgp.StringPrefixSize + len(z.DataDir) + msgp.BoolSize + msgp.TimeSize + msgp.Int64Size + msgp.Uint32Size + msgp.MapHeaderSize if z.Metadata != nil { for za0001, za0002 := range z.Metadata { _ = za0002 diff --git a/cmd/xl-storage-format-v1.go b/cmd/xl-storage-format-v1.go index af5ca731b..c0128c3c5 100644 --- a/cmd/xl-storage-format-v1.go +++ b/cmd/xl-storage-format-v1.go @@ -182,7 +182,11 @@ func (m *xlMetaV1Object) ToFileInfo(volume, path string) (FileInfo, error) { if !m.valid() { return FileInfo{}, errFileCorrupt } - return FileInfo{ + var transitionStatus string + if st, ok := m.Meta[ReservedMetadataPrefixLower+"transition-status"]; ok { + transitionStatus = st + } + fi := FileInfo{ Volume: volume, Name: path, ModTime: m.Stat.ModTime, @@ -192,7 +196,11 @@ func (m *xlMetaV1Object) ToFileInfo(volume, path string) (FileInfo, error) { Erasure: m.Erasure, VersionID: m.VersionID, DataDir: m.DataDir, - }, nil + } + if transitionStatus != "" { + fi.TransitionStatus = transitionStatus + } + return fi, nil } // XL metadata constants. diff --git a/cmd/xl-storage-format-v2.go b/cmd/xl-storage-format-v2.go index f832feae6..0d51ad70b 100644 --- a/cmd/xl-storage-format-v2.go +++ b/cmd/xl-storage-format-v2.go @@ -425,6 +425,10 @@ func (j xlMetaV2Object) ToFileInfo(volume, path string) (FileInfo, error) { fi.Erasure.Distribution[i] = int(j.ErasureDist[i]) } fi.DataDir = uuid.UUID(j.DataDir).String() + + if st, ok := j.MetaSys[ReservedMetadataPrefixLower+"transition-status"]; ok { + fi.TransitionStatus = string(st) + } return fi, nil } @@ -490,6 +494,11 @@ func (z *xlMetaV2) DeleteVersion(fi FileInfo) (string, bool, error) { switch version.Type { case LegacyType: if version.ObjectV1.VersionID == fi.VersionID { + if fi.TransitionStatus != "" { + z.Versions[i].ObjectV1.Meta[ReservedMetadataPrefixLower+"transition-status"] = fi.TransitionStatus + return uuid.UUID(version.ObjectV2.DataDir).String(), len(z.Versions) == 0, nil + } + z.Versions = append(z.Versions[:i], z.Versions[i+1:]...) if fi.Deleted { z.Versions = append(z.Versions, ventry) @@ -543,6 +552,10 @@ func (z *xlMetaV2) DeleteVersion(fi FileInfo) (string, bool, error) { switch version.Type { case ObjectType: if bytes.Equal(version.ObjectV2.VersionID[:], uv[:]) { + if fi.TransitionStatus != "" { + z.Versions[i].ObjectV2.MetaSys[ReservedMetadataPrefixLower+"transition-status"] = []byte(fi.TransitionStatus) + return uuid.UUID(version.ObjectV2.DataDir).String(), len(z.Versions) == 0, nil + } z.Versions = append(z.Versions[:i], z.Versions[i+1:]...) if findDataDir(version.ObjectV2.DataDir, z.Versions) > 0 { if fi.Deleted { diff --git a/cmd/xl-storage.go b/cmd/xl-storage.go index e98666f0a..5bdb317f4 100644 --- a/cmd/xl-storage.go +++ b/cmd/xl-storage.go @@ -44,6 +44,7 @@ import ( "github.com/minio/minio/cmd/config" "github.com/minio/minio/cmd/config/storageclass" "github.com/minio/minio/cmd/logger" + "github.com/minio/minio/pkg/bucket/lifecycle" "github.com/minio/minio/pkg/disk" "github.com/minio/minio/pkg/env" xioutil "github.com/minio/minio/pkg/ioutil" @@ -1008,8 +1009,10 @@ func (s *xlStorage) DeleteVersion(ctx context.Context, volume, path string, fi F return err } - // when data-dir is specified. - if dataDir != "" { + // when data-dir is specified. Transition leverages existing DeleteObject + // api call to mark object as deleted.When object is pending transition, + // just update the metadata and avoid deleting data dir. + if dataDir != "" && fi.TransitionStatus != lifecycle.TransitionPending { filePath := pathJoin(volumeDir, path, dataDir) if err = checkPathLength(filePath); err != nil { return err @@ -1019,8 +1022,9 @@ func (s *xlStorage) DeleteVersion(ctx context.Context, volume, path string, fi F return err } } - - if !lastVersion { + // transitioned objects maintains metadata on the source cluster. When transition + // status is set, update the metadata to disk. + if !lastVersion || fi.TransitionStatus != "" { return s.WriteAll(ctx, volume, pathJoin(path, xlStorageFormatFile), buf) } @@ -1164,7 +1168,11 @@ func (s *xlStorage) ReadVersion(ctx context.Context, volume, path, versionID str if err != nil { return fi, err } - + // skip checking data dir when object is transitioned - after transition, data dir will + // be empty with just the metadata left behind. + if fi.TransitionStatus != "" { + checkDataDir = false + } if fi.DataDir != "" && checkDataDir { if _, err = s.StatVol(ctx, pathJoin(volume, path, fi.DataDir, slashSeparator)); err != nil { if err == errVolumeNotFound { diff --git a/docs/bucket/lifecycle/README.md b/docs/bucket/lifecycle/README.md index 499024848..8e1894837 100644 --- a/docs/bucket/lifecycle/README.md +++ b/docs/bucket/lifecycle/README.md @@ -46,7 +46,7 @@ Lifecycle configuration imported successfully to `play/testbucket`. - List the current settings ``` -$ mc ilm list play/testbucket +$ mc ilm ls play/testbucket ID | Prefix | Enabled | Expiry | Date/Days | Transition | Date/Days | Storage-Class | Tags ------------|----------|------------|--------|--------------|--------------|------------------|------------------|------------------ OldPictures | old/ | ✓ | ✓ | 1 Jan 2020 | ✗ | | | @@ -100,7 +100,32 @@ When an object has only one version as a delete marker, the latter can be automa } ``` +## 4. Enable ILM transition feature +In Erasure mode, MinIO supports transitioning of older objects to a different cluster by setting up transition rules in the bucket lifecycle configuration. This allows applications to optimize storage costs by moving less frequently accessed data to a cheaper storage without compromising accessibility of data. + +To transition objects in a bucket to a destination bucket on a different cluster, applications need to specify a transition ARN instead of storage class while setting up the ILM lifecycle rule. + +>To create a transition ARN for transitioning objects in srcbucket to a destbucket on cluster endpoint https://transition-endpoint:9000 using `mc`: + +``` +mc admin bucket remote add myminio/srcbucket https://accessKey:secretKey@transition-endpoint:9000/destbucket --service ilm --region us-east-1 --label "HDDTier" +Role ARN = 'arn:minio:ilm:us-east-1:c5be6b16-769d-432a-9ef1-4567081f3566:destbucket' +``` +> The access credentials on the target site needs *s3:GetBucketVersioning* permission if versioning is enabled on the `destbucket` bucket. + +Using above ARN, set up a lifecycle rule with transition: +``` + mc ilm add --expiry-days 365 --transition-days 45 --storage-class "HDDTier" myminio/srcbucket +``` + +Once transitioned, GET or HEAD on the object will stream the content from the transitioned tier. In the event that the object needs to be restored temporarily to the local cluster, the AWS [RestoreObject API](https://docs.aws.amazon.com/AmazonS3/latest/API/API_RestoreObject.html) can be utilized. + +``` +aws s3api restore-object --bucket srcbucket \ +--key object \ +--restore-request Days=3 +``` ## Explore Further - [MinIO | Golang Client API Reference](https://docs.min.io/docs/golang-client-api-reference.html#SetBucketLifecycle) diff --git a/pkg/bucket/lifecycle/lifecycle.go b/pkg/bucket/lifecycle/lifecycle.go index 16a41d87b..c9fbad14f 100644 --- a/pkg/bucket/lifecycle/lifecycle.go +++ b/pkg/bucket/lifecycle/lifecycle.go @@ -30,6 +30,13 @@ var ( errXMLNotWellFormed = Errorf("The XML you provided was not well-formed or did not validate against our published schema") ) +const ( + // TransitionComplete marks completed transition + TransitionComplete = "complete" + // TransitionPending - transition is yet to be attempted + TransitionPending = "pending" +) + // Action represents a delete action or other transition // actions that will be implemented later. type Action int @@ -39,10 +46,18 @@ type Action int const ( // NoneAction means no action required after evaluting lifecycle rules NoneAction Action = iota - // DeleteAction means the object needs to be removed after evaluting lifecycle rules + // DeleteAction means the object needs to be removed after evaluating lifecycle rules DeleteAction // DeleteVersionAction deletes a particular version DeleteVersionAction + // TransitionAction transitions a particular object after evaluating lifecycle transition rules + TransitionAction + //TransitionVersionAction transitions a particular object version after evaluating lifecycle transition rules + TransitionVersionAction + // DeleteRestoredAction means the temporarily restored object needs to be removed after evaluating lifecycle rules + DeleteRestoredAction + // DeleteRestoredVersionAction deletes a particular version that was temporarily restored + DeleteRestoredVersionAction ) // Lifecycle - Configuration for bucket lifecycle. @@ -85,13 +100,18 @@ func (lc Lifecycle) HasActiveRules(prefix string, recursive bool) bool { if rule.NoncurrentVersionTransition.NoncurrentDays > 0 { return true } - if rule.Expiration.IsNull() { + if rule.Expiration.IsNull() && rule.Transition.IsNull() { continue } - if !rule.Expiration.IsDateNull() && rule.Expiration.Date.After(time.Now()) { - continue + if !rule.Expiration.IsDateNull() && rule.Expiration.Date.Before(time.Now()) { + return true + } + if !rule.Transition.IsDateNull() && rule.Transition.Date.Before(time.Now()) { + return true + } + if !rule.Expiration.IsDaysNull() || !rule.Transition.IsDaysNull() { + return true } - return true } return false } @@ -160,15 +180,26 @@ func (lc Lifecycle) FilterActionableRules(obj ObjectOpts) []Rule { continue } // The NoncurrentVersionExpiration action requests MinIO to expire - // noncurrent versions of objects 100 days after the objects become + // noncurrent versions of objects x days after the objects become // noncurrent. if !rule.NoncurrentVersionExpiration.IsDaysNull() { rules = append(rules, rule) continue } + // The NoncurrentVersionTransition action requests MinIO to transition + // noncurrent versions of objects x days after the objects become + // noncurrent. + if !rule.NoncurrentVersionTransition.IsDaysNull() { + rules = append(rules, rule) + continue + } + if rule.Filter.TestTags(strings.Split(obj.UserTags, "&")) { rules = append(rules, rule) } + if !rule.Transition.IsNull() { + rules = append(rules, rule) + } } return rules } @@ -184,6 +215,9 @@ type ObjectOpts struct { DeleteMarker bool NumVersions int SuccessorModTime time.Time + TransitionStatus string + RestoreOngoing bool + RestoreExpires time.Time } // ComputeAction returns the action to perform by evaluating all lifecycle rules @@ -207,11 +241,20 @@ func (lc Lifecycle) ComputeAction(obj ObjectOpts) Action { if obj.VersionID != "" && !obj.IsLatest && !obj.SuccessorModTime.IsZero() { // Non current versions should be deleted if their age exceeds non current days configuration // https://docs.aws.amazon.com/AmazonS3/latest/dev/intro-lifecycle-rules.html#intro-lifecycle-rules-actions - if time.Now().After(expectedExpiryTime(obj.SuccessorModTime, rule.NoncurrentVersionExpiration.NoncurrentDays)) { + if time.Now().After(ExpectedExpiryTime(obj.SuccessorModTime, int(rule.NoncurrentVersionExpiration.NoncurrentDays))) { return DeleteVersionAction } } } + if !rule.NoncurrentVersionTransition.IsDaysNull() { + if obj.VersionID != "" && !obj.IsLatest && !obj.SuccessorModTime.IsZero() && obj.TransitionStatus != TransitionComplete { + // Non current versions should be deleted if their age exceeds non current days configuration + // https://docs.aws.amazon.com/AmazonS3/latest/dev/intro-lifecycle-rules.html#intro-lifecycle-rules-actions + if time.Now().After(ExpectedExpiryTime(obj.SuccessorModTime, int(rule.NoncurrentVersionTransition.NoncurrentDays))) { + return TransitionVersionAction + } + } + } // Remove the object or simply add a delete marker (once) in a versioned bucket if obj.VersionID == "" || obj.IsLatest && !obj.DeleteMarker { @@ -221,22 +264,42 @@ func (lc Lifecycle) ComputeAction(obj ObjectOpts) Action { action = DeleteAction } case !rule.Expiration.IsDaysNull(): - if time.Now().UTC().After(expectedExpiryTime(obj.ModTime, rule.Expiration.Days)) { + if time.Now().UTC().After(ExpectedExpiryTime(obj.ModTime, int(rule.Expiration.Days))) { action = DeleteAction } } + if action == NoneAction { + if obj.TransitionStatus != TransitionComplete { + switch { + case !rule.Transition.IsDateNull(): + if time.Now().UTC().After(rule.Transition.Date.Time) { + action = TransitionAction + } + case !rule.Transition.IsDaysNull(): + if time.Now().UTC().After(ExpectedExpiryTime(obj.ModTime, int(rule.Transition.Days))) { + action = TransitionAction + } + } + } + if !obj.RestoreExpires.IsZero() && time.Now().After(obj.RestoreExpires) { + if obj.VersionID != "" { + action = DeleteRestoredVersionAction + } else { + action = DeleteRestoredAction + } + } + } } } - return action } -// expectedExpiryTime calculates the expiry date/time based on a object modtime. -// The expected expiry time is always a midnight time following the the object -// modification time plus the number of expiration days. +// ExpectedExpiryTime calculates the expiry, transition or restore date/time based on a object modtime. +// The expected transition or restore time is always a midnight time following the the object +// modification time plus the number of transition/restore days. // e.g. If the object modtime is `Thu May 21 13:42:50 GMT 2020` and the object should -// expire in 1 day, then the expected expiry time is `Fri, 23 May 2020 00:00:00 GMT` -func expectedExpiryTime(modTime time.Time, days ExpirationDays) time.Time { +// transition in 1 day, then the expected transition time is `Fri, 23 May 2020 00:00:00 GMT` +func ExpectedExpiryTime(modTime time.Time, days int) time.Time { t := modTime.UTC().Add(time.Duration(days+1) * 24 * time.Hour) return t.Truncate(24 * time.Hour) } @@ -256,7 +319,7 @@ func (lc Lifecycle) PredictExpiryTime(obj ObjectOpts) (string, time.Time) { // expiration date and its associated rule ID. for _, rule := range lc.FilterActionableRules(obj) { if !rule.NoncurrentVersionExpiration.IsDaysNull() && !obj.IsLatest && obj.VersionID != "" { - return rule.ID, expectedExpiryTime(time.Now(), ExpirationDays(rule.NoncurrentVersionExpiration.NoncurrentDays)) + return rule.ID, ExpectedExpiryTime(time.Now(), int(rule.NoncurrentVersionExpiration.NoncurrentDays)) } if !rule.Expiration.IsDateNull() { @@ -266,7 +329,7 @@ func (lc Lifecycle) PredictExpiryTime(obj ObjectOpts) (string, time.Time) { } } if !rule.Expiration.IsDaysNull() { - expectedExpiry := expectedExpiryTime(obj.ModTime, rule.Expiration.Days) + expectedExpiry := ExpectedExpiryTime(obj.ModTime, int(rule.Expiration.Days)) if finalExpiryDate.IsZero() || finalExpiryDate.After(expectedExpiry) { finalExpiryRuleID = rule.ID finalExpiryDate = expectedExpiry diff --git a/pkg/bucket/lifecycle/lifecycle_test.go b/pkg/bucket/lifecycle/lifecycle_test.go index 836252561..ad25b8050 100644 --- a/pkg/bucket/lifecycle/lifecycle_test.go +++ b/pkg/bucket/lifecycle/lifecycle_test.go @@ -240,7 +240,7 @@ func TestExpectedExpiryTime(t *testing.T) { for i, tc := range testCases { t.Run(fmt.Sprintf("Test %d", i+1), func(t *testing.T) { - got := expectedExpiryTime(tc.modTime, tc.days) + got := ExpectedExpiryTime(tc.modTime, int(tc.days)) if !got.Equal(tc.expected) { t.Fatalf("Expected %v to be equal to %v", got, tc.expected) } diff --git a/pkg/bucket/lifecycle/noncurrentversion.go b/pkg/bucket/lifecycle/noncurrentversion.go index 73af35b1f..f23c6121a 100644 --- a/pkg/bucket/lifecycle/noncurrentversion.go +++ b/pkg/bucket/lifecycle/noncurrentversion.go @@ -32,10 +32,6 @@ type NoncurrentVersionTransition struct { StorageClass string `xml:"StorageClass"` } -var ( - errNoncurrentVersionTransitionUnsupported = Errorf("Specifying is not supported") -) - // MarshalXML if non-current days not set to non zero value func (n NoncurrentVersionExpiration) MarshalXML(e *xml.Encoder, start xml.StartElement) error { if n.IsDaysNull() { @@ -50,13 +46,6 @@ func (n NoncurrentVersionExpiration) IsDaysNull() bool { return n.NoncurrentDays == ExpirationDays(0) } -// UnmarshalXML is extended to indicate lack of support for -// NoncurrentVersionTransition xml tag in object lifecycle -// configuration -func (n NoncurrentVersionTransition) UnmarshalXML(d *xml.Decoder, startElement xml.StartElement) error { - return errNoncurrentVersionTransitionUnsupported -} - // MarshalXML is extended to leave out // tags func (n NoncurrentVersionTransition) MarshalXML(e *xml.Encoder, start xml.StartElement) error { @@ -65,3 +54,8 @@ func (n NoncurrentVersionTransition) MarshalXML(e *xml.Encoder, start xml.StartE } return e.EncodeElement(&n, start) } + +// IsDaysNull returns true if days field is null +func (n NoncurrentVersionTransition) IsDaysNull() bool { + return n.NoncurrentDays == ExpirationDays(0) +} diff --git a/pkg/bucket/lifecycle/rule.go b/pkg/bucket/lifecycle/rule.go index c6c523a72..66c90e490 100644 --- a/pkg/bucket/lifecycle/rule.go +++ b/pkg/bucket/lifecycle/rule.go @@ -100,6 +100,10 @@ func (r Rule) validateFilter() error { return r.Filter.Validate() } +func (r Rule) validateTransition() error { + return r.Transition.Validate() +} + // Prefix - a rule can either have prefix under or under // . This method returns the prefix from the // location where it is available @@ -147,5 +151,8 @@ func (r Rule) Validate() error { if err := r.validateFilter(); err != nil { return err } + if err := r.validateTransition(); err != nil { + return err + } return nil } diff --git a/pkg/bucket/lifecycle/rule_test.go b/pkg/bucket/lifecycle/rule_test.go index 12328f9fa..c3e7a1e48 100644 --- a/pkg/bucket/lifecycle/rule_test.go +++ b/pkg/bucket/lifecycle/rule_test.go @@ -22,39 +22,6 @@ import ( "testing" ) -// TestUnsupportedRules checks if Rule xml with unsuported tags return -// appropriate errors on parsing -func TestUnsupportedRules(t *testing.T) { - // NoncurrentVersionTransition, and Transition tags aren't supported - unsupportedTestCases := []struct { - inputXML string - expectedErr error - }{ - { // Rule with unsupported NoncurrentVersionTransition - inputXML: ` - - `, - expectedErr: errNoncurrentVersionTransitionUnsupported, - }, - { // Rule with unsupported Transition action - inputXML: ` - - `, - expectedErr: errTransitionUnsupported, - }, - } - - for i, tc := range unsupportedTestCases { - t.Run(fmt.Sprintf("Test %d", i+1), func(t *testing.T) { - var rule Rule - err := xml.Unmarshal([]byte(tc.inputXML), &rule) - if err != tc.expectedErr { - t.Fatalf("%d: Expected %v but got %v", i+1, tc.expectedErr, err) - } - }) - } -} - // TestInvalidRules checks if Rule xml with invalid elements returns // appropriate errors on validation func TestInvalidRules(t *testing.T) { diff --git a/pkg/bucket/lifecycle/transition.go b/pkg/bucket/lifecycle/transition.go index 7937b34cf..73cc59a90 100644 --- a/pkg/bucket/lifecycle/transition.go +++ b/pkg/bucket/lifecycle/transition.go @@ -18,25 +18,147 @@ package lifecycle import ( "encoding/xml" + "time" ) +var ( + errTransitionInvalidDays = Errorf("Days must be 0 or greater when used with Transition") + errTransitionInvalidDate = Errorf("Date must be provided in ISO 8601 format") + errTransitionInvalid = Errorf("Exactly one of Days (0 or greater) or Date (positive ISO 8601 format) should be present inside Expiration.") + errTransitionDateNotMidnight = Errorf("'Date' must be at midnight GMT") +) + +// TransitionDate is a embedded type containing time.Time to unmarshal +// Date in Transition +type TransitionDate struct { + time.Time +} + +// UnmarshalXML parses date from Transition and validates date format +func (tDate *TransitionDate) UnmarshalXML(d *xml.Decoder, startElement xml.StartElement) error { + var dateStr string + err := d.DecodeElement(&dateStr, &startElement) + if err != nil { + return err + } + // While AWS documentation mentions that the date specified + // must be present in ISO 8601 format, in reality they allow + // users to provide RFC 3339 compliant dates. + trnDate, err := time.Parse(time.RFC3339, dateStr) + if err != nil { + return errTransitionInvalidDate + } + // Allow only date timestamp specifying midnight GMT + hr, min, sec := trnDate.Clock() + nsec := trnDate.Nanosecond() + loc := trnDate.Location() + if !(hr == 0 && min == 0 && sec == 0 && nsec == 0 && loc.String() == time.UTC.String()) { + return errTransitionDateNotMidnight + } + + *tDate = TransitionDate{trnDate} + return nil +} + +// MarshalXML encodes expiration date if it is non-zero and encodes +// empty string otherwise +func (tDate TransitionDate) MarshalXML(e *xml.Encoder, startElement xml.StartElement) error { + if tDate.Time.IsZero() { + return nil + } + return e.EncodeElement(tDate.Format(time.RFC3339), startElement) +} + +// TransitionDays is a type alias to unmarshal Days in Transition +type TransitionDays int + +// UnmarshalXML parses number of days from Transition and validates if +// >= 0 +func (tDays *TransitionDays) UnmarshalXML(d *xml.Decoder, startElement xml.StartElement) error { + var numDays int + err := d.DecodeElement(&numDays, &startElement) + if err != nil { + return err + } + if numDays < 0 { + return errTransitionInvalidDays + } + *tDays = TransitionDays(numDays) + return nil +} + +// MarshalXML encodes number of days to expire if it is non-zero and +// encodes empty string otherwise +func (tDays TransitionDays) MarshalXML(e *xml.Encoder, startElement xml.StartElement) error { + if tDays == 0 { + return nil + } + return e.EncodeElement(int(tDays), startElement) +} + // Transition - transition actions for a rule in lifecycle configuration. type Transition struct { - XMLName xml.Name `xml:"Transition"` - Days int `xml:"Days,omitempty"` - Date string `xml:"Date,omitempty"` - StorageClass string `xml:"StorageClass"` + XMLName xml.Name `xml:"Transition"` + Days TransitionDays `xml:"Days,omitempty"` + Date TransitionDate `xml:"Date,omitempty"` + StorageClass string `xml:"StorageClass,omitempty"` + + set bool } -var errTransitionUnsupported = Errorf("Specifying tag is not supported") +// MarshalXML encodes transition field into an XML form. +func (t Transition) MarshalXML(enc *xml.Encoder, start xml.StartElement) error { + if !t.set { + return nil + } + type transitionWrapper Transition + return enc.EncodeElement(transitionWrapper(t), start) +} -// UnmarshalXML is extended to indicate lack of support for Transition -// xml tag in object lifecycle configuration -func (t Transition) UnmarshalXML(d *xml.Decoder, start xml.StartElement) error { - return errTransitionUnsupported +// UnmarshalXML decodes transition field from the XML form. +func (t *Transition) UnmarshalXML(d *xml.Decoder, startElement xml.StartElement) error { + type transitionWrapper Transition + var trw transitionWrapper + err := d.DecodeElement(&trw, &startElement) + if err != nil { + return err + } + *t = Transition(trw) + t.set = true + return nil } -// MarshalXML is extended to leave out tags -func (t Transition) MarshalXML(e *xml.Encoder, start xml.StartElement) error { +// Validate - validates the "Expiration" element +func (t Transition) Validate() error { + if !t.set { + return nil + } + + if t.IsDaysNull() && t.IsDateNull() { + return errXMLNotWellFormed + } + + // Both transition days and date are specified + if !t.IsDaysNull() && !t.IsDateNull() { + return errTransitionInvalid + } + if t.StorageClass == "" { + return errXMLNotWellFormed + } return nil } + +// IsDaysNull returns true if days field is null +func (t Transition) IsDaysNull() bool { + return t.Days == TransitionDays(0) +} + +// IsDateNull returns true if date field is null +func (t Transition) IsDateNull() bool { + return t.Date.Time.IsZero() +} + +// IsNull returns true if both date and days fields are null +func (t Transition) IsNull() bool { + return t.IsDaysNull() && t.IsDateNull() +} diff --git a/pkg/bucket/policy/action.go b/pkg/bucket/policy/action.go index ed5eb94d7..1c8a60559 100644 --- a/pkg/bucket/policy/action.go +++ b/pkg/bucket/policy/action.go @@ -169,6 +169,9 @@ const ( // GetObjectVersionForReplicationAction - GetObjectVersionForReplication REST API action GetObjectVersionForReplicationAction = "s3:GetObjectVersionForReplication" + + // RestoreObjectAction - RestoreObject REST API action + RestoreObjectAction = "s3:RestoreObject" ) // List of all supported object actions. @@ -195,6 +198,7 @@ var supportedObjectActions = map[Action]struct{}{ ReplicateDeleteAction: {}, ReplicateTagsAction: {}, GetObjectVersionForReplicationAction: {}, + RestoreObjectAction: {}, } // isObjectAction - returns whether action is object type or not. @@ -255,6 +259,7 @@ var supportedActions = map[Action]struct{}{ ReplicateDeleteAction: {}, ReplicateTagsAction: {}, GetObjectVersionForReplicationAction: {}, + RestoreObjectAction: {}, } // IsValid - checks if action is valid or not. @@ -410,4 +415,5 @@ var actionConditionKeyMap = map[Action]condition.KeySet{ ReplicateDeleteAction: condition.NewKeySet(condition.CommonKeys...), ReplicateTagsAction: condition.NewKeySet(condition.CommonKeys...), GetObjectVersionForReplicationAction: condition.NewKeySet(condition.CommonKeys...), + RestoreObjectAction: condition.NewKeySet(condition.CommonKeys...), } diff --git a/pkg/event/name.go b/pkg/event/name.go index 20386f8f2..6f5a108f7 100644 --- a/pkg/event/name.go +++ b/pkg/event/name.go @@ -52,6 +52,9 @@ const ( ObjectReplicationMissedThreshold ObjectReplicationReplicatedAfterThreshold ObjectReplicationNotTracked + ObjectRestorePostInitiated + ObjectRestorePostCompleted + ObjectRestorePostAll ) // Expand - returns expanded values of abbreviated event type. @@ -86,6 +89,11 @@ func (name Name) Expand() []Name { ObjectReplicationMissedThreshold, ObjectReplicationReplicatedAfterThreshold, } + case ObjectRestorePostAll: + return []Name{ + ObjectRestorePostInitiated, + ObjectRestorePostCompleted, + } default: return []Name{name} } @@ -140,6 +148,10 @@ func (name Name) String() string { return "s3:Replication:OperationMissedThreshold" case ObjectReplicationReplicatedAfterThreshold: return "s3:Replication:OperationReplicatedAfterThreshold" + case ObjectRestorePostInitiated: + return "s3:ObjectRestore:Post" + case ObjectRestorePostCompleted: + return "s3:ObjectRestore:Completed" } return "" @@ -236,6 +248,13 @@ func ParseName(s string) (Name, error) { return ObjectReplicationReplicatedAfterThreshold, nil case "s3:Replication:OperationNotTracked": return ObjectReplicationNotTracked, nil + case "s3:ObjectRestore:*": + return ObjectRestorePostAll, nil + case "s3:ObjectRestore:Post": + return ObjectRestorePostInitiated, nil + case "s3:ObjectRestore:Completed": + return ObjectRestorePostCompleted, nil + default: return 0, &ErrInvalidEventName{s} } diff --git a/pkg/madmin/remote-target-commands.go b/pkg/madmin/remote-target-commands.go index e2c7abcc9..8e6adb47b 100644 --- a/pkg/madmin/remote-target-commands.go +++ b/pkg/madmin/remote-target-commands.go @@ -35,11 +35,13 @@ type ServiceType string const ( // ReplicationService specifies replication service ReplicationService ServiceType = "replication" + // ILMService specifies ilm service + ILMService ServiceType = "ilm" ) -// IsValid returns true if ARN type represents replication +// IsValid returns true if ARN type represents replication or ilm func (t ServiceType) IsValid() bool { - return t == ReplicationService + return t == ReplicationService || t == ILMService } // ARN is a struct to define arn.