diff --git a/cmd/api-datatypes.go b/cmd/api-datatypes.go index 6303ed3bd..45aafd309 100644 --- a/cmd/api-datatypes.go +++ b/cmd/api-datatypes.go @@ -27,12 +27,14 @@ type DeletedObject struct { DeleteMarkerVersionID string `xml:"DeleteMarkerVersionId,omitempty"` ObjectName string `xml:"Key,omitempty"` VersionID string `xml:"VersionId,omitempty"` + + // MinIO extensions to support delete marker replication // Replication status of DeleteMarker - DeleteMarkerReplicationStatus string + DeleteMarkerReplicationStatus string `xml:"DeleteMarkerReplicationStatus,omitempty"` // MTime of DeleteMarker on source that needs to be propagated to replica - DeleteMarkerMTime time.Time + DeleteMarkerMTime time.Time `xml:"DeleteMarkerMTime,omitempty"` // Status of versioned delete (of object or DeleteMarker) - VersionPurgeStatus VersionPurgeStatusType + VersionPurgeStatus VersionPurgeStatusType `xml:"VersionPurgeStatus,omitempty"` } // ObjectToDelete carries key name for the object to delete. @@ -40,11 +42,11 @@ type ObjectToDelete struct { ObjectName string `xml:"Key"` VersionID string `xml:"VersionId"` // Replication status of DeleteMarker - DeleteMarkerReplicationStatus string + DeleteMarkerReplicationStatus string `xml:"DeleteMarkerReplicationStatus"` // Status of versioned delete (of object or DeleteMarker) - VersionPurgeStatus VersionPurgeStatusType + VersionPurgeStatus VersionPurgeStatusType `xml:"VersionPurgeStatus"` // Version ID of delete marker - DeleteMarkerVersionID string + DeleteMarkerVersionID string `xml:"DeleteMarkerVersionId"` } // createBucketConfiguration container for bucket configuration request from client. diff --git a/cmd/background-heal-ops.go b/cmd/background-heal-ops.go index a5f511c50..2bd487f38 100644 --- a/cmd/background-heal-ops.go +++ b/cmd/background-heal-ops.go @@ -18,7 +18,6 @@ package cmd import ( "context" - "path" "time" "github.com/minio/minio/pkg/madmin" @@ -96,9 +95,6 @@ func (h *healRoutine) run(ctx context.Context, objAPI ObjectLayer) { case task.bucket != "" && task.object != "": res, err = objAPI.HealObject(ctx, task.bucket, task.object, task.versionID, task.opts) } - if task.bucket != "" && task.object != "" { - ObjectPathUpdated(path.Join(task.bucket, task.object)) - } task.responseCh <- healResult{result: res, err: err} case <-h.doneCh: diff --git a/cmd/bucket-handlers.go b/cmd/bucket-handlers.go index 330936908..ee29c1f6f 100644 --- a/cmd/bucket-handlers.go +++ b/cmd/bucket-handlers.go @@ -441,7 +441,11 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter, // Avoid duplicate objects, we use map to filter them out. if _, ok := objectsToDelete[object]; !ok { if replicateDeletes { - if delMarker, replicate := checkReplicateDelete(ctx, getObjectInfoFn, bucket, ObjectToDelete{ObjectName: object.ObjectName, VersionID: object.VersionID}); replicate { + delMarker, replicate := checkReplicateDelete(ctx, getObjectInfoFn, bucket, ObjectToDelete{ + ObjectName: object.ObjectName, + VersionID: object.VersionID, + }) + if replicate { if object.VersionID != "" { object.VersionPurgeStatus = Pending if delMarker { @@ -474,17 +478,30 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter, deletedObjects := make([]DeletedObject, len(deleteObjects.Objects)) for i := range errs { - dindex := objectsToDelete[ObjectToDelete{ - ObjectName: dObjects[i].ObjectName, - VersionID: dObjects[i].VersionID, - }] - apiErr := toAPIError(ctx, errs[i]) - if apiErr.Code == "" || apiErr.Code == "NoSuchKey" || apiErr.Code == "InvalidArgument" { - dObjects[i].DeleteMarkerReplicationStatus = deleteList[i].DeleteMarkerReplicationStatus - dObjects[i].VersionPurgeStatus = deleteList[i].VersionPurgeStatus + var dindex int + if replicateDeletes { + dindex = objectsToDelete[ObjectToDelete{ + ObjectName: dObjects[i].ObjectName, + VersionID: dObjects[i].VersionID, + DeleteMarkerVersionID: dObjects[i].DeleteMarkerVersionID, + VersionPurgeStatus: dObjects[i].VersionPurgeStatus, + DeleteMarkerReplicationStatus: dObjects[i].DeleteMarkerReplicationStatus, + }] + } else { + dindex = objectsToDelete[ObjectToDelete{ + ObjectName: dObjects[i].ObjectName, + VersionID: dObjects[i].VersionID, + }] + } + if errs[i] == nil || isErrObjectNotFound(errs[i]) || isErrVersionNotFound(errs[i]) { + if replicateDeletes { + dObjects[i].DeleteMarkerReplicationStatus = deleteList[i].DeleteMarkerReplicationStatus + dObjects[i].VersionPurgeStatus = deleteList[i].VersionPurgeStatus + } deletedObjects[dindex] = dObjects[i] continue } + apiErr := toAPIError(ctx, errs[i]) dErrs[dindex] = DeleteError{ Code: apiErr.Code, Message: apiErr.Description, @@ -507,11 +524,13 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter, // Write success response. writeSuccessResponseXML(w, encodedSuccessResponse) for _, dobj := range deletedObjects { - if dobj.DeleteMarkerReplicationStatus == string(replication.Pending) || dobj.VersionPurgeStatus == Pending { - globalReplicationState.queueReplicaDeleteTask(DeletedObjectVersionInfo{ - DeletedObject: dobj, - Bucket: bucket, - }) + if replicateDeletes { + if dobj.DeleteMarkerReplicationStatus == string(replication.Pending) || dobj.VersionPurgeStatus == Pending { + globalReplicationState.queueReplicaDeleteTask(DeletedObjectVersionInfo{ + DeletedObject: dobj, + Bucket: bucket, + }) + } } } // Notify deleted event for objects. diff --git a/cmd/erasure-healing.go b/cmd/erasure-healing.go index 522c5c499..30aca8239 100644 --- a/cmd/erasure-healing.go +++ b/cmd/erasure-healing.go @@ -21,7 +21,6 @@ import ( "errors" "fmt" "io" - "path" "sync" "time" @@ -528,7 +527,7 @@ func (er erasureObjects) healObjectDir(ctx context.Context, bucket, object strin }(index, disk) } wg.Wait() - ObjectPathUpdated(path.Join(bucket, object)) + ObjectPathUpdated(pathJoin(bucket, object)) } } diff --git a/cmd/erasure-multipart.go b/cmd/erasure-multipart.go index 9cff2fcc2..0dd43f3f8 100644 --- a/cmd/erasure-multipart.go +++ b/cmd/erasure-multipart.go @@ -707,7 +707,7 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str return oi, toObjectErr(errFileParentIsFile, bucket, object) } - defer ObjectPathUpdated(path.Join(bucket, object)) + defer ObjectPathUpdated(pathJoin(bucket, object)) // Calculate s3 compatible md5sum for complete multipart. s3MD5 := getCompleteMultipartMD5(parts) diff --git a/cmd/erasure-object.go b/cmd/erasure-object.go index d7732e989..02e621210 100644 --- a/cmd/erasure-object.go +++ b/cmd/erasure-object.go @@ -47,7 +47,7 @@ func (er erasureObjects) CopyObject(ctx context.Context, srcBucket, srcObject, d return oi, NotImplemented{} } - defer ObjectPathUpdated(path.Join(dstBucket, dstObject)) + defer ObjectPathUpdated(pathJoin(dstBucket, dstObject)) lk := er.NewNSLock(dstBucket, dstObject) if err := lk.GetLock(ctx, globalOperationTimeout); err != nil { return oi, err @@ -443,8 +443,8 @@ func undoRename(disks []StorageAPI, srcBucket, srcEntry, dstBucket, dstEntry str // Similar to rename but renames data from srcEntry to dstEntry at dataDir func renameData(ctx context.Context, disks []StorageAPI, srcBucket, srcEntry, dataDir, dstBucket, dstEntry string, writeQuorum int, ignoredErr []error) ([]StorageAPI, error) { dataDir = retainSlash(dataDir) - defer ObjectPathUpdated(path.Join(srcBucket, srcEntry)) - defer ObjectPathUpdated(path.Join(dstBucket, dstEntry)) + defer ObjectPathUpdated(pathJoin(srcBucket, srcEntry)) + defer ObjectPathUpdated(pathJoin(dstBucket, dstEntry)) g := errgroup.WithNErrs(len(disks)) @@ -497,8 +497,8 @@ func rename(ctx context.Context, disks []StorageAPI, srcBucket, srcEntry, dstBuc dstEntry = retainSlash(dstEntry) srcEntry = retainSlash(srcEntry) } - defer ObjectPathUpdated(path.Join(srcBucket, srcEntry)) - defer ObjectPathUpdated(path.Join(dstBucket, dstEntry)) + defer ObjectPathUpdated(pathJoin(srcBucket, srcEntry)) + defer ObjectPathUpdated(pathJoin(dstBucket, dstEntry)) g := errgroup.WithNErrs(len(disks)) @@ -541,7 +541,7 @@ func (er erasureObjects) PutObject(ctx context.Context, bucket string, object st // putObject wrapper for erasureObjects PutObject func (er erasureObjects) putObject(ctx context.Context, bucket string, object string, r *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error) { - defer ObjectPathUpdated(path.Join(bucket, object)) + defer ObjectPathUpdated(pathJoin(bucket, object)) data := r.Reader @@ -748,7 +748,7 @@ func (er erasureObjects) deleteObjectVersion(ctx context.Context, bucket, object func (er erasureObjects) deleteObject(ctx context.Context, bucket, object string, writeQuorum int) error { var disks []StorageAPI var err error - defer ObjectPathUpdated(path.Join(bucket, object)) + defer ObjectPathUpdated(pathJoin(bucket, object)) tmpObj := mustGetUUID() if bucket == minioMetaTmpBucket { @@ -803,17 +803,16 @@ func (er erasureObjects) DeleteObjects(ctx context.Context, bucket string, objec versions := make([]FileInfo, len(objects)) for i := range objects { - modTime := opts.MTime - if opts.MTime.IsZero() { - modTime = UTCNow() - } - uuid := opts.VersionID - if uuid == "" { - uuid = mustGetUUID() - } - if objects[i].VersionID == "" { - if (opts.Versioned || opts.VersionSuspended) && !HasSuffix(objects[i].ObjectName, SlashSeparator) { + modTime := opts.MTime + if opts.MTime.IsZero() { + modTime = UTCNow() + } + uuid := opts.VersionID + if uuid == "" { + uuid = mustGetUUID() + } + if opts.Versioned || opts.VersionSuspended { versions[i] = FileInfo{ Name: objects[i].ObjectName, ModTime: modTime, @@ -976,7 +975,7 @@ func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string modTime = UTCNow() } if markDelete { - if (opts.Versioned || opts.VersionSuspended) && !HasSuffix(object, SlashSeparator) { + if opts.Versioned || opts.VersionSuspended { fi := FileInfo{ Name: object, Deleted: deleteMarker, diff --git a/cmd/object-api-options.go b/cmd/object-api-options.go index e2b6b511a..c9fabd7c7 100644 --- a/cmd/object-api-options.go +++ b/cmd/object-api-options.go @@ -125,17 +125,20 @@ func getOpts(ctx context.Context, r *http.Request, bucket, object string) (Objec opts.VersionID = vid delMarker := strings.TrimSpace(r.Header.Get(xhttp.MinIOSourceDeleteMarker)) if delMarker != "" { - if delMarker != "true" && delMarker != "false" { + switch delMarker { + case "true": + opts.DeleteMarker = true + case "false": + default: + err = fmt.Errorf("Unable to parse %s, failed with %w", xhttp.MinIOSourceDeleteMarker, fmt.Errorf("DeleteMarker should be true or false")) logger.LogIf(ctx, err) return opts, InvalidArgument{ Bucket: bucket, Object: object, - Err: fmt.Errorf("Unable to parse %s, failed with %w", xhttp.MinIOSourceDeleteMarker, fmt.Errorf("DeleteMarker should be true or false")), + Err: err, } } - if delMarker == "true" { - opts.DeleteMarker = true - } + } return opts, nil } @@ -150,32 +153,36 @@ func delOpts(ctx context.Context, r *http.Request, bucket, object string) (opts opts.VersionSuspended = globalBucketVersioningSys.Suspended(bucket) delMarker := strings.TrimSpace(r.Header.Get(xhttp.MinIOSourceDeleteMarker)) if delMarker != "" { - if delMarker != "true" && delMarker != "false" { + switch delMarker { + case "true": + opts.DeleteMarker = true + case "false": + default: + err = fmt.Errorf("Unable to parse %s, failed with %w", xhttp.MinIOSourceDeleteMarker, fmt.Errorf("DeleteMarker should be true or false")) logger.LogIf(ctx, err) return opts, InvalidArgument{ Bucket: bucket, Object: object, - Err: fmt.Errorf("Unable to parse %s, failed with %w", xhttp.MinIOSourceDeleteMarker, fmt.Errorf("DeleteMarker should be true or false")), + Err: err, } } - if delMarker == "true" { - opts.DeleteMarker = true - } } purgeVersion := strings.TrimSpace(r.Header.Get(xhttp.MinIOSourceDeleteMarkerDelete)) if purgeVersion != "" { - if purgeVersion != "true" && purgeVersion != "false" { + switch purgeVersion { + case "true": + opts.VersionPurgeStatus = Complete + case "false": + default: + err = fmt.Errorf("Unable to parse %s, failed with %w", xhttp.MinIOSourceDeleteMarkerDelete, fmt.Errorf("DeleteMarkerPurge should be true or false")) logger.LogIf(ctx, err) return opts, InvalidArgument{ Bucket: bucket, Object: object, - Err: fmt.Errorf("Unable to parse %s, failed with %w", xhttp.MinIOSourceDeleteMarkerDelete, fmt.Errorf("DeleteMarkerPurge should be true or false")), + Err: err, } } - if purgeVersion == "true" { - opts.VersionPurgeStatus = Complete - } } mtime := strings.TrimSpace(r.Header.Get(xhttp.MinIOSourceMTime))