diff --git a/cmd/bucket-handlers.go b/cmd/bucket-handlers.go index 7d4e3d0c4..2234a44e1 100644 --- a/cmd/bucket-handlers.go +++ b/cmd/bucket-handlers.go @@ -449,14 +449,15 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter, deleteList := toNames(objectsToDelete) dObjects, errs := deleteObjectsFn(ctx, bucket, deleteList, ObjectOptions{ - Versioned: globalBucketVersioningSys.Enabled(bucket), + Versioned: globalBucketVersioningSys.Enabled(bucket), + VersionSuspended: globalBucketVersioningSys.Suspended(bucket), }) deletedObjects := make([]DeletedObject, len(deleteObjects.Objects)) for i := range errs { dindex := objectsToDelete[deleteList[i]] apiErr := toAPIError(ctx, errs[i]) - if apiErr.Code == "" || apiErr.Code == "NoSuchKey" { + if apiErr.Code == "" || apiErr.Code == "NoSuchKey" || apiErr.Code == "InvalidArgument" { deletedObjects[dindex] = dObjects[i] continue } diff --git a/cmd/erasure-object.go b/cmd/erasure-object.go index b4b73ae79..75d248040 100644 --- a/cmd/erasure-object.go +++ b/cmd/erasure-object.go @@ -19,6 +19,7 @@ package cmd import ( "bytes" "context" + "errors" "fmt" "io" "net/http" @@ -860,14 +861,22 @@ func (er erasureObjects) DeleteObjects(ctx context.Context, bucket string, objec versions := make([]FileInfo, len(objects)) for i := range objects { if objects[i].VersionID == "" { - if opts.Versioned && !HasSuffix(objects[i].ObjectName, SlashSeparator) { - versions[i] = FileInfo{ - Name: objects[i].ObjectName, - VersionID: mustGetUUID(), - ModTime: UTCNow(), - Deleted: true, // delete marker + if opts.Versioned || opts.VersionSuspended { + if !HasSuffix(objects[i].ObjectName, SlashSeparator) { + fi := FileInfo{ + Name: objects[i].ObjectName, + ModTime: UTCNow(), + Deleted: true, // delete marker + } + if opts.Versioned { + fi.VersionID = mustGetUUID() + } + // versioning suspended means we add `null` + // version as delete marker + + versions[i] = fi + continue } - continue } } versions[i] = FileInfo{ @@ -907,9 +916,11 @@ func (er erasureObjects) DeleteObjects(ctx context.Context, bucket string, objec for i := range delObjErrs { // delObjErrs[i] is not nil when disks[i] is also not nil if delObjErrs[i] != nil { - if delObjErrs[i][objIndex] != errFileNotFound { - diskErrs[i] = delObjErrs[i][objIndex] + if errors.Is(delObjErrs[i][objIndex], errFileNotFound) || + errors.Is(delObjErrs[i][objIndex], errFileVersionNotFound) { + continue } + diskErrs[i] = delObjErrs[i][objIndex] } } errs[objIndex] = reduceWriteQuorumErrs(ctx, diskErrs, objectOpIgnoredErrs, writeQuorums[objIndex]) @@ -961,18 +972,27 @@ func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string writeQuorum := len(storageDisks)/2 + 1 if opts.VersionID == "" { - if opts.Versioned && !HasSuffix(object, SlashSeparator) { - fi := FileInfo{ - Name: object, - VersionID: mustGetUUID(), - Deleted: true, - ModTime: UTCNow(), - } - // Add delete marker, since we don't have any version specified explicitly. - if err = er.deleteObjectVersion(ctx, bucket, object, writeQuorum, fi); err != nil { - return objInfo, toObjectErr(err, bucket, object) + if opts.Versioned || opts.VersionSuspended { + if !HasSuffix(object, SlashSeparator) { + fi := FileInfo{ + Name: object, + Deleted: true, + ModTime: UTCNow(), + } + + if opts.Versioned { + fi.VersionID = mustGetUUID() + } + + // versioning suspended means we add `null` + // version as delete marker + + // Add delete marker, since we don't have any version specified explicitly. + if err = er.deleteObjectVersion(ctx, bucket, object, writeQuorum, fi); err != nil { + return objInfo, toObjectErr(err, bucket, object) + } + return fi.ToObjectInfo(bucket, object), nil } - return fi.ToObjectInfo(bucket, object), nil } } diff --git a/cmd/object-api-interface.go b/cmd/object-api-interface.go index c68d1cb00..7215ba092 100644 --- a/cmd/object-api-interface.go +++ b/cmd/object-api-interface.go @@ -37,6 +37,7 @@ type GetObjectInfoFn func(ctx context.Context, bucket, object string, opts Objec // ObjectOptions represents object options for ObjectLayer object operations type ObjectOptions struct { ServerSideEncryption encrypt.ServerSide + VersionSuspended bool // indicates if the bucket was previously versioned but is currently suspended. Versioned bool // indicates if the bucket is versioned WalkVersions bool // indicates if the we are interested in walking versions VersionID string // Specifies the versionID which needs to be overwritten or read diff --git a/cmd/object-api-options.go b/cmd/object-api-options.go index 61912ccd1..0adb87eed 100644 --- a/cmd/object-api-options.go +++ b/cmd/object-api-options.go @@ -133,6 +133,7 @@ func delOpts(ctx context.Context, r *http.Request, bucket, object string) (opts return opts, err } opts.Versioned = versioned + opts.VersionSuspended = globalBucketVersioningSys.Suspended(bucket) return opts, nil } diff --git a/cmd/web-handlers.go b/cmd/web-handlers.go index 44354fc86..bcb344ae5 100644 --- a/cmd/web-handlers.go +++ b/cmd/web-handlers.go @@ -679,7 +679,8 @@ func (web *webAPIHandlers) RemoveObject(r *http.Request, args *RemoveObjectArgs, } opts := ObjectOptions{ - Versioned: globalBucketVersioningSys.Enabled(args.BucketName), + Versioned: globalBucketVersioningSys.Enabled(args.BucketName), + VersionSuspended: globalBucketVersioningSys.Suspended(args.BucketName), } var err error next: diff --git a/cmd/xl-storage-format-v2.go b/cmd/xl-storage-format-v2.go index 61b8d8afe..59d54504e 100644 --- a/cmd/xl-storage-format-v2.go +++ b/cmd/xl-storage-format-v2.go @@ -227,29 +227,10 @@ func (z *xlMetaV2) Load(buf []byte) error { // AddVersion adds a new version func (z *xlMetaV2) AddVersion(fi FileInfo) error { - if fi.Deleted { - uv, err := uuid.Parse(fi.VersionID) - if err != nil { - return err - } - v := xlMetaV2Version{ - Type: DeleteType, - DeleteMarker: &xlMetaV2DeleteMarker{ - VersionID: uv, - ModTime: fi.ModTime.UnixNano(), - }, - } - if !v.Valid() { - return errors.New("internal error: invalid version entry generated") - } - z.Versions = append(z.Versions, v) - return nil - } - if fi.VersionID == "" { // this means versioning is not yet - // enabled i.e all versions are basically - // default value i.e "null" + // enabled or suspend i.e all versions + // are basically default value i.e "null" fi.VersionID = nullVersionID } @@ -354,11 +335,17 @@ func newXLMetaV2(fi FileInfo) (xlMetaV2, error) { } func (j xlMetaV2DeleteMarker) ToFileInfo(volume, path string) (FileInfo, error) { + versionID := "" + var uv uuid.UUID + // check if the version is not "null" + if !bytes.Equal(j.VersionID[:], uv[:]) { + versionID = uuid.UUID(j.VersionID).String() + } fi := FileInfo{ Volume: volume, Name: path, ModTime: time.Unix(0, j.ModTime).UTC(), - VersionID: uuid.UUID(j.VersionID).String(), + VersionID: versionID, Deleted: true, } return fi, nil @@ -434,10 +421,26 @@ func (z *xlMetaV2) DeleteVersion(fi FileInfo) (string, bool, error) { if fi.VersionID == nullVersionID { fi.VersionID = "" } + var uv uuid.UUID if fi.VersionID != "" { uv, _ = uuid.Parse(fi.VersionID) } + + var ventry xlMetaV2Version + if fi.Deleted { + ventry = xlMetaV2Version{ + Type: DeleteType, + DeleteMarker: &xlMetaV2DeleteMarker{ + VersionID: uv, + ModTime: fi.ModTime.UnixNano(), + }, + } + if !ventry.Valid() { + return "", false, errors.New("internal error: invalid version entry generated") + } + } + for i, version := range z.Versions { if !version.Valid() { return "", false, errFileCorrupt @@ -446,11 +449,17 @@ func (z *xlMetaV2) DeleteVersion(fi FileInfo) (string, bool, error) { case LegacyType: if version.ObjectV1.VersionID == fi.VersionID { z.Versions = append(z.Versions[:i], z.Versions[i+1:]...) + if fi.Deleted { + z.Versions = append(z.Versions, ventry) + } return version.ObjectV1.DataDir, len(z.Versions) == 0, nil } case DeleteType: if bytes.Equal(version.DeleteMarker.VersionID[:], uv[:]) { z.Versions = append(z.Versions[:i], z.Versions[i+1:]...) + if fi.Deleted { + z.Versions = append(z.Versions, ventry) + } return "", len(z.Versions) == 0, nil } } @@ -478,15 +487,26 @@ func (z *xlMetaV2) DeleteVersion(fi FileInfo) (string, bool, error) { if bytes.Equal(version.ObjectV2.VersionID[:], uv[:]) { z.Versions = append(z.Versions[:i], z.Versions[i+1:]...) if findDataDir(version.ObjectV2.DataDir, z.Versions) > 0 { + if fi.Deleted { + z.Versions = append(z.Versions, ventry) + } // Found that another version references the same dataDir // we shouldn't remove it, and only remove the version instead return "", len(z.Versions) == 0, nil } + if fi.Deleted { + z.Versions = append(z.Versions, ventry) + } return uuid.UUID(version.ObjectV2.DataDir).String(), len(z.Versions) == 0, nil } } } + if fi.Deleted { + z.Versions = append(z.Versions, ventry) + return "", false, nil + } + return "", false, errFileVersionNotFound } diff --git a/cmd/xl-storage.go b/cmd/xl-storage.go index 31ea6c0cd..ac729cbfe 100644 --- a/cmd/xl-storage.go +++ b/cmd/xl-storage.go @@ -381,17 +381,15 @@ func (s *xlStorage) CrawlAndGetDataUsage(ctx context.Context, cache dataUsageCac var totalSize int64 for _, version := range fivs.Versions { + oi := version.ToObjectInfo(item.bucket, item.objectPath()) size := item.applyActions(ctx, objAPI, actionMeta{ numVersions: len(fivs.Versions), - oi: version.ToObjectInfo(item.bucket, item.objectPath()), + oi: oi, }) if !version.Deleted { totalSize += size } - } - - for _, version := range fivs.Versions { - item.healReplication(ctx, objAPI, actionMeta{oi: version.ToObjectInfo(item.bucket, item.objectPath())}) + item.healReplication(ctx, objAPI, actionMeta{oi: oi}) } return totalSize, nil }) @@ -1131,17 +1129,6 @@ func (s *xlStorage) DeleteVersion(volume, path string, fi FileInfo) error { return err } - if fi.Deleted { - if err = xlMeta.AddVersion(fi); err != nil { - return err - } - buf, err = xlMeta.MarshalMsg(append(xlHeader[:], xlVersionV1[:]...)) - if err != nil { - return err - } - return s.WriteAll(volume, pathJoin(path, xlStorageFormatFile), bytes.NewReader(buf)) - } - dataDir, lastVersion, err := xlMeta.DeleteVersion(fi) if err != nil { return err @@ -2167,7 +2154,7 @@ func (s *xlStorage) RenameData(srcVolume, srcPath, dataDir, dstVolume, dstPath s if fi.VersionID == "" { // return the latest "null" versionId info ofi, err := xlMeta.ToFileInfo(dstVolume, dstPath, nullVersionID) - if err == nil { + if err == nil && !ofi.Deleted { // Purge the destination path as we are not preserving anything // versioned object was not requested. oldDstDataPath = pathJoin(dstVolumeDir, dstPath, ofi.DataDir)