diff --git a/cmd/bucket-handlers.go b/cmd/bucket-handlers.go index a1b00605b..7ca900fc2 100644 --- a/cmd/bucket-handlers.go +++ b/cmd/bucket-handlers.go @@ -591,15 +591,11 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter, } if hasLifecycleConfig && dobj.PurgeTransitioned == lifecycle.TransitionComplete { // clean up transitioned tier - action := lifecycle.DeleteAction - if dobj.VersionID != "" { - action = lifecycle.DeleteVersionAction - } deleteTransitionedObject(ctx, newObjectLayerFn(), bucket, dobj.ObjectName, lifecycle.ObjectOpts{ Name: dobj.ObjectName, VersionID: dobj.VersionID, DeleteMarker: dobj.DeleteMarker, - }, action, true) + }, false, true) } } diff --git a/cmd/bucket-lifecycle.go b/cmd/bucket-lifecycle.go index db040d283..48c233545 100644 --- a/cmd/bucket-lifecycle.go +++ b/cmd/bucket-lifecycle.go @@ -65,6 +65,41 @@ func NewLifecycleSys() *LifecycleSys { return &LifecycleSys{} } +type expiryState struct { + expiryCh chan ObjectInfo +} + +func (es *expiryState) queueExpiryTask(oi ObjectInfo) { + select { + case es.expiryCh <- oi: + default: + } +} + +var ( + globalExpiryState *expiryState +) + +func newExpiryState() *expiryState { + es := &expiryState{ + expiryCh: make(chan ObjectInfo, 10000), + } + go func() { + <-GlobalContext.Done() + close(es.expiryCh) + }() + return es +} + +func initBackgroundExpiry(ctx context.Context, objectAPI ObjectLayer) { + globalExpiryState = newExpiryState() + go func() { + for oi := range globalExpiryState.expiryCh { + applyExpiryRule(ctx, objectAPI, oi, false) + } + }() +} + type transitionState struct { // add future metrics here transitionCh chan ObjectInfo @@ -246,7 +281,7 @@ func putTransitionOpts(objInfo ObjectInfo) (putOpts miniogo.PutObjectOptions) { // 1. temporarily restored copies of objects (restored with the PostRestoreObject API) expired. // 2. life cycle expiry date is met on the object. // 3. Object is removed through DELETE api call -func deleteTransitionedObject(ctx context.Context, objectAPI ObjectLayer, bucket, object string, lcOpts lifecycle.ObjectOpts, action lifecycle.Action, isDeleteTierOnly bool) error { +func deleteTransitionedObject(ctx context.Context, objectAPI ObjectLayer, bucket, object string, lcOpts lifecycle.ObjectOpts, restoredObject, isDeleteTierOnly bool) error { if lcOpts.TransitionStatus == "" && !isDeleteTierOnly { return nil } @@ -266,43 +301,40 @@ func deleteTransitionedObject(ctx context.Context, objectAPI ObjectLayer, bucket var opts ObjectOptions opts.Versioned = globalBucketVersioningSys.Enabled(bucket) opts.VersionID = lcOpts.VersionID - switch action { - case lifecycle.DeleteRestoredAction, lifecycle.DeleteRestoredVersionAction: + if restoredObject { // delete locally restored copy of object or object version // from the source, while leaving metadata behind. The data on // transitioned tier lies untouched and still accessible opts.TransitionStatus = lcOpts.TransitionStatus _, err = objectAPI.DeleteObject(ctx, bucket, object, opts) return err - case lifecycle.DeleteAction, lifecycle.DeleteVersionAction: - // When an object is past expiry, delete the data from transitioned tier and - // metadata from source - if err := tgt.RemoveObject(context.Background(), arn.Bucket, object, miniogo.RemoveObjectOptions{VersionID: lcOpts.VersionID}); err != nil { - logger.LogIf(ctx, err) - } - - if isDeleteTierOnly { - return nil - } + } - objInfo, err := objectAPI.DeleteObject(ctx, bucket, object, opts) - if err != nil { - return err - } + // When an object is past expiry, delete the data from transitioned tier and + // metadata from source + if err := tgt.RemoveObject(context.Background(), arn.Bucket, object, miniogo.RemoveObjectOptions{VersionID: lcOpts.VersionID}); err != nil { + logger.LogIf(ctx, err) + } - eventName := event.ObjectRemovedDelete - if lcOpts.DeleteMarker { - eventName = event.ObjectRemovedDeleteMarkerCreated - } + if isDeleteTierOnly { + return nil + } - // Notify object deleted event. - sendEvent(eventArgs{ - EventName: eventName, - BucketName: bucket, - Object: objInfo, - Host: "Internal: [ILM-EXPIRY]", - }) + objInfo, err := objectAPI.DeleteObject(ctx, bucket, object, opts) + if err != nil { + return err + } + eventName := event.ObjectRemovedDelete + if lcOpts.DeleteMarker { + eventName = event.ObjectRemovedDeleteMarkerCreated } + // Notify object deleted event. + sendEvent(eventArgs{ + EventName: eventName, + BucketName: bucket, + Object: objInfo, + Host: "Internal: [ILM-EXPIRY]", + }) // should never reach here return nil diff --git a/cmd/data-crawler.go b/cmd/data-crawler.go index 532ddf497..f4666bafc 100644 --- a/cmd/data-crawler.go +++ b/cmd/data-crawler.go @@ -785,12 +785,12 @@ func (i *crawlItem) transformMetaDir() { // actionMeta contains information used to apply actions. type actionMeta struct { - oi ObjectInfo - successorModTime time.Time // The modtime of the successor version - numVersions int // The number of versions of this object - bitRotScan bool // indicates if bitrot check was requested. + oi ObjectInfo + bitRotScan bool // indicates if bitrot check was requested. } +var applyActionsLogPrefix = color.Green("applyActions:") + // applyActions will apply lifecycle checks on to a scanned item. // The resulting size on disk will always be returned. // The metadata will be compared to consensus on the object layer before any changes are applied. @@ -800,7 +800,6 @@ func (i *crawlItem) applyActions(ctx context.Context, o ObjectLayer, meta action if i.debug { logger.LogIf(ctx, err) } - applyActionsLogPrefix := color.Green("applyActions:") if i.heal { if i.debug { if meta.oi.VersionID != "" { @@ -839,8 +838,8 @@ func (i *crawlItem) applyActions(ctx context.Context, o ObjectLayer, meta action VersionID: meta.oi.VersionID, DeleteMarker: meta.oi.DeleteMarker, IsLatest: meta.oi.IsLatest, - NumVersions: meta.numVersions, - SuccessorModTime: meta.successorModTime, + NumVersions: meta.oi.NumVersions, + SuccessorModTime: meta.oi.SuccessorModTime, RestoreOngoing: meta.oi.RestoreOngoing, RestoreExpires: meta.oi.RestoreExpires, TransitionStatus: meta.oi.TransitionStatus, @@ -884,96 +883,129 @@ func (i *crawlItem) applyActions(ctx context.Context, o ObjectLayer, meta action return size } } - size = obj.Size - // Recalculate action. + var applied bool + action = evalActionFromLifecycle(ctx, *i.lifeCycle, obj, i.debug) + if action != lifecycle.NoneAction { + applied = applyLifecycleAction(ctx, action, o, obj) + } + + if applied { + return 0 + } + return size +} + +func evalActionFromLifecycle(ctx context.Context, lc lifecycle.Lifecycle, obj ObjectInfo, debug bool) (action lifecycle.Action) { lcOpts := lifecycle.ObjectOpts{ - Name: i.objectPath(), + Name: obj.Name, UserTags: obj.UserTags, ModTime: obj.ModTime, VersionID: obj.VersionID, DeleteMarker: obj.DeleteMarker, IsLatest: obj.IsLatest, - NumVersions: meta.numVersions, - SuccessorModTime: meta.successorModTime, + NumVersions: obj.NumVersions, + SuccessorModTime: obj.SuccessorModTime, RestoreOngoing: obj.RestoreOngoing, RestoreExpires: obj.RestoreExpires, TransitionStatus: obj.TransitionStatus, } - action = i.lifeCycle.ComputeAction(lcOpts) - if i.debug { + + action = lc.ComputeAction(lcOpts) + if debug { console.Debugf(applyActionsLogPrefix+" lifecycle: Secondary scan: %v\n", action) } - switch action { - case lifecycle.DeleteAction, lifecycle.DeleteVersionAction: - case lifecycle.TransitionAction, lifecycle.TransitionVersionAction: - case lifecycle.DeleteRestoredAction, lifecycle.DeleteRestoredVersionAction: - default: - // No action. - return size + + if action == lifecycle.NoneAction { + return action } - opts := ObjectOptions{} switch action { case lifecycle.DeleteVersionAction, lifecycle.DeleteRestoredVersionAction: // Defensive code, should never happen if obj.VersionID == "" { - return size + return lifecycle.NoneAction } - if rcfg, _ := globalBucketObjectLockSys.Get(i.bucket); rcfg.LockEnabled { + if rcfg, _ := globalBucketObjectLockSys.Get(obj.Bucket); rcfg.LockEnabled { locked := enforceRetentionForDeletion(ctx, obj) if locked { - if i.debug { + if debug { if obj.VersionID != "" { - console.Debugf(applyActionsLogPrefix+" lifecycle: %s v(%s) is locked, not deleting\n", i.objectPath(), obj.VersionID) + console.Debugf(applyActionsLogPrefix+" lifecycle: %s v(%s) is locked, not deleting\n", obj.Name, obj.VersionID) } else { - console.Debugf(applyActionsLogPrefix+" lifecycle: %s is locked, not deleting\n", i.objectPath()) + console.Debugf(applyActionsLogPrefix+" lifecycle: %s is locked, not deleting\n", obj.Name) } } - return size - } - } - opts.VersionID = obj.VersionID - case lifecycle.DeleteAction, lifecycle.DeleteRestoredAction: - opts.Versioned = globalBucketVersioningSys.Enabled(i.bucket) - case lifecycle.TransitionAction, lifecycle.TransitionVersionAction: - if obj.TransitionStatus == "" { - opts.Versioned = globalBucketVersioningSys.Enabled(obj.Bucket) - opts.VersionID = obj.VersionID - opts.TransitionStatus = lifecycle.TransitionPending - if _, err = o.DeleteObject(ctx, obj.Bucket, obj.Name, opts); err != nil { - if isErrObjectNotFound(err) || isErrVersionNotFound(err) { - return 0 - } - // Assume it is still there. - logger.LogIf(ctx, err) - return size + return lifecycle.NoneAction } } - globalTransitionState.queueTransitionTask(obj) - return 0 } - if obj.TransitionStatus != "" { - if err := deleteTransitionedObject(ctx, o, i.bucket, i.objectPath(), lcOpts, action, false); err != nil { + return action +} + +func applyTransitionAction(ctx context.Context, action lifecycle.Action, objLayer ObjectLayer, obj ObjectInfo) bool { + opts := ObjectOptions{} + if obj.TransitionStatus == "" { + opts.Versioned = globalBucketVersioningSys.Enabled(obj.Bucket) + opts.VersionID = obj.VersionID + opts.TransitionStatus = lifecycle.TransitionPending + if _, err := objLayer.DeleteObject(ctx, obj.Bucket, obj.Name, opts); err != nil { if isErrObjectNotFound(err) || isErrVersionNotFound(err) { - return 0 + return false } + // Assume it is still there. logger.LogIf(ctx, err) - return size + return false } - // Notification already sent at *deleteTransitionedObject*, return '0' here. - return 0 } + globalTransitionState.queueTransitionTask(obj) + return true - obj, err = o.DeleteObject(ctx, i.bucket, i.objectPath(), opts) +} + +func applyExpiryOnTransitionedObject(ctx context.Context, objLayer ObjectLayer, obj ObjectInfo, restoredObject bool) bool { + lcOpts := lifecycle.ObjectOpts{ + Name: obj.Name, + UserTags: obj.UserTags, + ModTime: obj.ModTime, + VersionID: obj.VersionID, + DeleteMarker: obj.DeleteMarker, + IsLatest: obj.IsLatest, + NumVersions: obj.NumVersions, + SuccessorModTime: obj.SuccessorModTime, + RestoreOngoing: obj.RestoreOngoing, + RestoreExpires: obj.RestoreExpires, + TransitionStatus: obj.TransitionStatus, + } + + if err := deleteTransitionedObject(ctx, objLayer, obj.Bucket, obj.Name, lcOpts, restoredObject, false); err != nil { + if isErrObjectNotFound(err) || isErrVersionNotFound(err) { + return false + } + logger.LogIf(ctx, err) + return false + } + // Notification already sent at *deleteTransitionedObject*, just return 'true' here. + return true +} + +func applyExpiryOnNonTransitionedObjects(ctx context.Context, objLayer ObjectLayer, obj ObjectInfo) bool { + opts := ObjectOptions{} + + opts.VersionID = obj.VersionID + if opts.VersionID == "" { + opts.Versioned = globalBucketVersioningSys.Enabled(obj.Bucket) + } + + obj, err := objLayer.DeleteObject(ctx, obj.Bucket, obj.Name, opts) if err != nil { if isErrObjectNotFound(err) || isErrVersionNotFound(err) { - return 0 + return false } // Assume it is still there. logger.LogIf(ctx, err) - return size + return false } eventName := event.ObjectRemovedDelete @@ -984,11 +1016,33 @@ func (i *crawlItem) applyActions(ctx context.Context, o ObjectLayer, meta action // Notify object deleted event. sendEvent(eventArgs{ EventName: eventName, - BucketName: i.bucket, + BucketName: obj.Bucket, Object: obj, Host: "Internal: [ILM-EXPIRY]", }) - return 0 + + return true +} + +// Apply object, object version, restored object or restored object version action on the given object +func applyExpiryRule(ctx context.Context, objLayer ObjectLayer, obj ObjectInfo, restoredObject bool) bool { + if obj.TransitionStatus != "" { + return applyExpiryOnTransitionedObject(ctx, objLayer, obj, restoredObject) + } + return applyExpiryOnNonTransitionedObjects(ctx, objLayer, obj) +} + +// Perform actions (removal of transitioning of objects), return true the action is successfully performed +func applyLifecycleAction(ctx context.Context, action lifecycle.Action, objLayer ObjectLayer, obj ObjectInfo) (success bool) { + switch action { + case lifecycle.DeleteVersionAction, lifecycle.DeleteAction: + success = applyExpiryRule(ctx, objLayer, obj, false) + case lifecycle.DeleteRestoredAction, lifecycle.DeleteRestoredVersionAction: + success = applyExpiryRule(ctx, objLayer, obj, true) + case lifecycle.TransitionAction, lifecycle.TransitionVersionAction: + success = applyTransitionAction(ctx, action, objLayer, obj) + } + return } // objectPath returns the prefix and object name. diff --git a/cmd/erasure-metadata.go b/cmd/erasure-metadata.go index 587daeb37..a65c6f7d4 100644 --- a/cmd/erasure-metadata.go +++ b/cmd/erasure-metadata.go @@ -107,18 +107,21 @@ func (fi FileInfo) ToObjectInfo(bucket, object string) ObjectInfo { } objInfo := ObjectInfo{ - IsDir: HasSuffix(object, SlashSeparator), - Bucket: bucket, - Name: object, - VersionID: versionID, - IsLatest: fi.IsLatest, - DeleteMarker: fi.Deleted, - Size: fi.Size, - ModTime: fi.ModTime, - Legacy: fi.XLV1, - ContentType: fi.Metadata["content-type"], - ContentEncoding: fi.Metadata["content-encoding"], + IsDir: HasSuffix(object, SlashSeparator), + Bucket: bucket, + Name: object, + VersionID: versionID, + IsLatest: fi.IsLatest, + DeleteMarker: fi.Deleted, + Size: fi.Size, + ModTime: fi.ModTime, + Legacy: fi.XLV1, + ContentType: fi.Metadata["content-type"], + ContentEncoding: fi.Metadata["content-encoding"], + NumVersions: fi.NumVersions, + SuccessorModTime: fi.SuccessorModTime, } + // Update expires var ( t time.Time diff --git a/cmd/object-api-datatypes.go b/cmd/object-api-datatypes.go index ff3a75650..86c091d11 100644 --- a/cmd/object-api-datatypes.go +++ b/cmd/object-api-datatypes.go @@ -237,6 +237,11 @@ type ObjectInfo struct { backendType BackendType VersionPurgeStatus VersionPurgeStatusType + + // The total count of all versions of this object + NumVersions int + // The modtime of the successor object version if any + SuccessorModTime time.Time } // MultipartInfo captures metadata information about the uploadId diff --git a/cmd/object-handlers.go b/cmd/object-handlers.go index 97d50473a..911bebf60 100644 --- a/cmd/object-handlers.go +++ b/cmd/object-handlers.go @@ -431,6 +431,16 @@ func (api objectAPIHandlers) GetObjectHandler(w http.ResponseWriter, r *http.Req objInfo := gr.ObjInfo + // Automatically remove the object/version is an expiry lifecycle rule can be applied + if lc, err := globalLifecycleSys.Get(bucket); err == nil { + action := evalActionFromLifecycle(ctx, *lc, objInfo, false) + if action == lifecycle.DeleteAction || action == lifecycle.DeleteVersionAction { + globalExpiryState.queueExpiryTask(objInfo) + writeErrorResponseHeadersOnly(w, errorCodes.ToAPIErr(ErrNoSuchKey)) + return + } + } + // filter object lock metadata if permission does not permit getRetPerms := checkRequestAuthType(ctx, r, policy.GetObjectRetentionAction, bucket, object) legalHoldPerms := checkRequestAuthType(ctx, r, policy.GetObjectLegalHoldAction, bucket, object) @@ -590,6 +600,16 @@ func (api objectAPIHandlers) HeadObjectHandler(w http.ResponseWriter, r *http.Re return } + // Automatically remove the object/version is an expiry lifecycle rule can be applied + if lc, err := globalLifecycleSys.Get(bucket); err == nil { + action := evalActionFromLifecycle(ctx, *lc, objInfo, false) + if action == lifecycle.DeleteAction || action == lifecycle.DeleteVersionAction { + globalExpiryState.queueExpiryTask(objInfo) + writeErrorResponseHeadersOnly(w, errorCodes.ToAPIErr(ErrNoSuchKey)) + return + } + } + // filter object lock metadata if permission does not permit getRetPerms := checkRequestAuthType(ctx, r, policy.GetObjectRetentionAction, bucket, object) legalHoldPerms := checkRequestAuthType(ctx, r, policy.GetObjectLegalHoldAction, bucket, object) @@ -2820,10 +2840,6 @@ func (api objectAPIHandlers) DeleteObjectHandler(w http.ResponseWriter, r *http. } if goi.TransitionStatus == lifecycle.TransitionComplete { // clean up transitioned tier - action := lifecycle.DeleteAction - if goi.VersionID != "" { - action = lifecycle.DeleteVersionAction - } deleteTransitionedObject(ctx, newObjectLayerFn(), bucket, object, lifecycle.ObjectOpts{ Name: object, UserTags: goi.UserTags, @@ -2831,7 +2847,7 @@ func (api objectAPIHandlers) DeleteObjectHandler(w http.ResponseWriter, r *http. DeleteMarker: goi.DeleteMarker, TransitionStatus: goi.TransitionStatus, IsLatest: goi.IsLatest, - }, action, true) + }, false, true) } setPutObjHeaders(w, objInfo, true) diff --git a/cmd/server-main.go b/cmd/server-main.go index 1e681f03b..655c95cb2 100644 --- a/cmd/server-main.go +++ b/cmd/server-main.go @@ -494,6 +494,7 @@ func serverMain(ctx *cli.Context) { initAutoHeal(GlobalContext, newObject) initBackgroundReplication(GlobalContext, newObject) initBackgroundTransition(GlobalContext, newObject) + initBackgroundExpiry(GlobalContext, newObject) } initDataCrawler(GlobalContext, newObject) diff --git a/cmd/storage-datatypes.go b/cmd/storage-datatypes.go index f723ec1cc..6eda4710e 100644 --- a/cmd/storage-datatypes.go +++ b/cmd/storage-datatypes.go @@ -155,6 +155,9 @@ type FileInfo struct { VersionPurgeStatus VersionPurgeStatusType Data []byte // optionally carries object data + + NumVersions int + SuccessorModTime time.Time } // VersionPurgeStatusKey denotes purge status in metadata diff --git a/cmd/storage-datatypes_gen.go b/cmd/storage-datatypes_gen.go index 6795e2a50..972dbf755 100644 --- a/cmd/storage-datatypes_gen.go +++ b/cmd/storage-datatypes_gen.go @@ -245,8 +245,8 @@ func (z *FileInfo) DecodeMsg(dc *msgp.Reader) (err error) { err = msgp.WrapError(err) return } - if zb0001 != 18 { - err = msgp.ArrayError{Wanted: 18, Got: zb0001} + if zb0001 != 20 { + err = msgp.ArrayError{Wanted: 20, Got: zb0001} return } z.Volume, err = dc.ReadString() @@ -380,13 +380,23 @@ func (z *FileInfo) DecodeMsg(dc *msgp.Reader) (err error) { err = msgp.WrapError(err, "Data") return } + z.NumVersions, err = dc.ReadInt() + if err != nil { + err = msgp.WrapError(err, "NumVersions") + return + } + z.SuccessorModTime, err = dc.ReadTime() + if err != nil { + err = msgp.WrapError(err, "SuccessorModTime") + return + } return } // EncodeMsg implements msgp.Encodable func (z *FileInfo) EncodeMsg(en *msgp.Writer) (err error) { - // array header, size 18 - err = en.Append(0xdc, 0x0, 0x12) + // array header, size 20 + err = en.Append(0xdc, 0x0, 0x14) if err != nil { return } @@ -499,14 +509,24 @@ func (z *FileInfo) EncodeMsg(en *msgp.Writer) (err error) { err = msgp.WrapError(err, "Data") return } + err = en.WriteInt(z.NumVersions) + if err != nil { + err = msgp.WrapError(err, "NumVersions") + return + } + err = en.WriteTime(z.SuccessorModTime) + if err != nil { + err = msgp.WrapError(err, "SuccessorModTime") + return + } return } // MarshalMsg implements msgp.Marshaler func (z *FileInfo) MarshalMsg(b []byte) (o []byte, err error) { o = msgp.Require(b, z.Msgsize()) - // array header, size 18 - o = append(o, 0xdc, 0x0, 0x12) + // array header, size 20 + o = append(o, 0xdc, 0x0, 0x14) o = msgp.AppendString(o, z.Volume) o = msgp.AppendString(o, z.Name) o = msgp.AppendString(o, z.VersionID) @@ -540,6 +560,8 @@ func (z *FileInfo) MarshalMsg(b []byte) (o []byte, err error) { o = msgp.AppendString(o, z.DeleteMarkerReplicationStatus) o = msgp.AppendString(o, string(z.VersionPurgeStatus)) o = msgp.AppendBytes(o, z.Data) + o = msgp.AppendInt(o, z.NumVersions) + o = msgp.AppendTime(o, z.SuccessorModTime) return } @@ -551,8 +573,8 @@ func (z *FileInfo) UnmarshalMsg(bts []byte) (o []byte, err error) { err = msgp.WrapError(err) return } - if zb0001 != 18 { - err = msgp.ArrayError{Wanted: 18, Got: zb0001} + if zb0001 != 20 { + err = msgp.ArrayError{Wanted: 20, Got: zb0001} return } z.Volume, bts, err = msgp.ReadStringBytes(bts) @@ -686,6 +708,16 @@ func (z *FileInfo) UnmarshalMsg(bts []byte) (o []byte, err error) { err = msgp.WrapError(err, "Data") return } + z.NumVersions, bts, err = msgp.ReadIntBytes(bts) + if err != nil { + err = msgp.WrapError(err, "NumVersions") + return + } + z.SuccessorModTime, bts, err = msgp.ReadTimeBytes(bts) + if err != nil { + err = msgp.WrapError(err, "SuccessorModTime") + return + } o = bts return } @@ -703,7 +735,7 @@ func (z *FileInfo) Msgsize() (s int) { for za0003 := range z.Parts { s += z.Parts[za0003].Msgsize() } - s += z.Erasure.Msgsize() + msgp.BoolSize + msgp.StringPrefixSize + len(z.DeleteMarkerReplicationStatus) + msgp.StringPrefixSize + len(string(z.VersionPurgeStatus)) + msgp.BytesPrefixSize + len(z.Data) + s += z.Erasure.Msgsize() + msgp.BoolSize + msgp.StringPrefixSize + len(z.DeleteMarkerReplicationStatus) + msgp.StringPrefixSize + len(string(z.VersionPurgeStatus)) + msgp.BytesPrefixSize + len(z.Data) + msgp.IntSize + msgp.TimeSize return } diff --git a/cmd/storage-rest-common.go b/cmd/storage-rest-common.go index ab653d0ca..d6f8aaf1e 100644 --- a/cmd/storage-rest-common.go +++ b/cmd/storage-rest-common.go @@ -17,7 +17,7 @@ package cmd const ( - storageRESTVersion = "v25" // Add more small file optimization + storageRESTVersion = "v26" // Add NumVersions/SuccessorModTime fields in FileInfo storageRESTVersionPrefix = SlashSeparator + storageRESTVersion storageRESTPrefix = minioReservedBucketPath + "/storage" ) diff --git a/cmd/web-handlers.go b/cmd/web-handlers.go index 10793821e..ab408075c 100644 --- a/cmd/web-handlers.go +++ b/cmd/web-handlers.go @@ -777,17 +777,13 @@ next: scheduleReplicationDelete(ctx, dobj, objectAPI, replicateSync) } if goi.TransitionStatus == lifecycle.TransitionComplete && err == nil && goi.VersionID == "" { - action := lifecycle.DeleteAction - if goi.VersionID != "" { - action = lifecycle.DeleteVersionAction - } deleteTransitionedObject(ctx, newObjectLayerFn(), args.BucketName, objectName, lifecycle.ObjectOpts{ Name: objectName, UserTags: goi.UserTags, VersionID: goi.VersionID, DeleteMarker: goi.DeleteMarker, IsLatest: goi.IsLatest, - }, action, true) + }, false, true) } logger.LogIf(ctx, err) diff --git a/cmd/xl-storage-format-v2.go b/cmd/xl-storage-format-v2.go index dc95a2feb..0026d7c3c 100644 --- a/cmd/xl-storage-format-v2.go +++ b/cmd/xl-storage-format-v2.go @@ -652,6 +652,18 @@ func (z xlMetaV2) ListVersions(volume, path string) (versions []FileInfo, modTim return versions, latestModTime, nil } +func getModTimeFromVersion(v xlMetaV2Version) time.Time { + switch v.Type { + case ObjectType: + return time.Unix(0, v.ObjectV2.ModTime) + case DeleteType: + return time.Unix(0, v.DeleteMarker.ModTime) + case LegacyType: + return v.ObjectV1.Stat.ModTime + } + return time.Time{} +} + // ToFileInfo converts xlMetaV2 into a common FileInfo datastructure // for consumption across callers. func (z xlMetaV2) ToFileInfo(volume, path, versionID string) (fi FileInfo, err error) { @@ -663,82 +675,82 @@ func (z xlMetaV2) ToFileInfo(volume, path, versionID string) (fi FileInfo, err e } } - var latestModTime time.Time - var latestIndex int - for i, version := range z.Versions { + for _, version := range z.Versions { if !version.Valid() { logger.LogIf(GlobalContext, fmt.Errorf("invalid version detected %#v", version)) - return FileInfo{}, errFileNotFound - } - var modTime time.Time - switch version.Type { - case ObjectType: - modTime = time.Unix(0, version.ObjectV2.ModTime) - case DeleteType: - modTime = time.Unix(0, version.DeleteMarker.ModTime) - case LegacyType: - modTime = version.ObjectV1.Stat.ModTime - } - if modTime.After(latestModTime) { - latestModTime = modTime - latestIndex = i + if versionID == "" { + return FileInfo{}, errFileNotFound + } + return FileInfo{}, errFileVersionNotFound + } } + orderedVersions := make([]xlMetaV2Version, len(z.Versions)) + copy(orderedVersions, z.Versions) + + sort.Slice(orderedVersions, func(i, j int) bool { + mtime1 := getModTimeFromVersion(orderedVersions[i]) + mtime2 := getModTimeFromVersion(orderedVersions[j]) + return mtime1.After(mtime2) + }) + if versionID == "" { - if len(z.Versions) >= 1 { - if !z.Versions[latestIndex].Valid() { - logger.LogIf(GlobalContext, fmt.Errorf("invalid version detected %#v", z.Versions[latestIndex])) - return FileInfo{}, errFileNotFound - } - switch z.Versions[latestIndex].Type { + if len(orderedVersions) >= 1 { + switch orderedVersions[0].Type { case ObjectType: - fi, err = z.Versions[latestIndex].ObjectV2.ToFileInfo(volume, path) - fi.IsLatest = true - return fi, err + fi, err = orderedVersions[0].ObjectV2.ToFileInfo(volume, path) case DeleteType: - fi, err = z.Versions[latestIndex].DeleteMarker.ToFileInfo(volume, path) - fi.IsLatest = true - return fi, err + fi, err = orderedVersions[0].DeleteMarker.ToFileInfo(volume, path) case LegacyType: - fi, err = z.Versions[latestIndex].ObjectV1.ToFileInfo(volume, path) - fi.IsLatest = true - return fi, err + fi, err = orderedVersions[0].ObjectV1.ToFileInfo(volume, path) } + fi.IsLatest = true + fi.NumVersions = len(orderedVersions) + return fi, err + } return FileInfo{}, errFileNotFound } - for _, version := range z.Versions { - if !version.Valid() { - logger.LogIf(GlobalContext, fmt.Errorf("invalid version detected %#v", version)) - if versionID == "" { - return FileInfo{}, errFileNotFound - } - return FileInfo{}, errFileVersionNotFound - } + var i = -1 + var version xlMetaV2Version + +findVersion: + for i, version = range orderedVersions { switch version.Type { case ObjectType: if bytes.Equal(version.ObjectV2.VersionID[:], uv[:]) { fi, err = version.ObjectV2.ToFileInfo(volume, path) - fi.IsLatest = latestModTime.Equal(fi.ModTime) - return fi, err + break findVersion } case LegacyType: if version.ObjectV1.VersionID == versionID { fi, err = version.ObjectV1.ToFileInfo(volume, path) - fi.IsLatest = latestModTime.Equal(fi.ModTime) - return fi, err + break findVersion } case DeleteType: if bytes.Equal(version.DeleteMarker.VersionID[:], uv[:]) { fi, err = version.DeleteMarker.ToFileInfo(volume, path) - fi.IsLatest = latestModTime.Equal(fi.ModTime) - return fi, err + break findVersion } } } + if err != nil { + return fi, err + } + + if i >= 0 { + // A version is found, fill dynamic fields + fi.IsLatest = i == 0 + fi.NumVersions = len(z.Versions) + if i < len(orderedVersions)-1 { + fi.SuccessorModTime = getModTimeFromVersion(orderedVersions[i+1]) + } + return fi, nil + } + if versionID == "" { return FileInfo{}, errFileNotFound } diff --git a/cmd/xl-storage.go b/cmd/xl-storage.go index 2e6de27ba..fe399fdba 100644 --- a/cmd/xl-storage.go +++ b/cmd/xl-storage.go @@ -377,21 +377,14 @@ func (s *xlStorage) CrawlAndGetDataUsage(ctx context.Context, cache dataUsageCac } var totalSize int64 - var numVersions = len(fivs.Versions) sizeS := sizeSummary{} - for i, version := range fivs.Versions { - var successorModTime time.Time - if i > 0 { - successorModTime = fivs.Versions[i-1].ModTime - } + for _, version := range fivs.Versions { oi := version.ToObjectInfo(item.bucket, item.objectPath()) if objAPI != nil { totalSize += item.applyActions(ctx, objAPI, actionMeta{ - numVersions: numVersions, - successorModTime: successorModTime, - oi: oi, - bitRotScan: healOpts.Bitrot, + oi: oi, + bitRotScan: healOpts.Bitrot, }) item.healReplication(ctx, objAPI, oi, &sizeS) }