From cbf4bb62e0196c5f73570184a693ebd2c0f4f674 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Wed, 10 Feb 2021 11:45:02 -0800 Subject: [PATCH] fix: getPoolIdx decouple from top level options (#11512) top-level options shouldn't be passed down for GetObjectInfo() while verifying the objects in different pools, this is to make sure that we always get the value from the pool where the object exists. --- cmd/bucket-replication.go | 4 +--- cmd/erasure-object.go | 1 + cmd/erasure-server-pool.go | 39 +++++++++++++++++--------------------- 3 files changed, 19 insertions(+), 25 deletions(-) diff --git a/cmd/bucket-replication.go b/cmd/bucket-replication.go index 40972f469..dd56e8abb 100644 --- a/cmd/bucket-replication.go +++ b/cmd/bucket-replication.go @@ -714,9 +714,7 @@ func replicateObject(ctx context.Context, objInfo ObjectInfo, objectAPI ObjectLa } // This lower level implementation is necessary to avoid write locks from CopyObject. - poolIdx, err := z.getPoolIdx(ctx, bucket, object, ObjectOptions{ - VersionID: objInfo.VersionID, - }, objInfo.Size) + poolIdx, err := z.getPoolIdx(ctx, bucket, object, objInfo.Size) if err != nil { logger.LogIf(ctx, fmt.Errorf("Unable to update replication metadata for %s/%s(%s): %w", bucket, objInfo.Name, objInfo.VersionID, err)) } else { diff --git a/cmd/erasure-object.go b/cmd/erasure-object.go index 1a15459fe..f4af2248b 100644 --- a/cmd/erasure-object.go +++ b/cmd/erasure-object.go @@ -460,6 +460,7 @@ func (er erasureObjects) getObjectInfo(ctx context.Context, bucket, object strin // Make sure to return object info to provide extra information. return objInfo, toObjectErr(errMethodNotAllowed, bucket, object) } + if fi.Deleted { if opts.VersionID == "" || opts.DeleteMarker { return objInfo, toObjectErr(errFileNotFound, bucket, object) diff --git a/cmd/erasure-server-pool.go b/cmd/erasure-server-pool.go index aedbf8a4c..0595de33b 100644 --- a/cmd/erasure-server-pool.go +++ b/cmd/erasure-server-pool.go @@ -246,31 +246,26 @@ func (z *erasureServerPools) getServerPoolsAvailableSpace(ctx context.Context, s // getPoolIdx returns the found previous object and its corresponding pool idx, // if none are found falls back to most available space pool. -func (z *erasureServerPools) getPoolIdx(ctx context.Context, bucket, object string, opts ObjectOptions, size int64) (idx int, err error) { +func (z *erasureServerPools) getPoolIdx(ctx context.Context, bucket, object string, size int64) (idx int, err error) { if z.SinglePool() { return 0, nil } for i, pool := range z.serverPools { - objInfo, err := pool.GetObjectInfo(ctx, bucket, object, opts) - switch err.(type) { - case VersionNotFound: - // VersionId not found, versionId was specified - case ObjectNotFound: - // VersionId was not specified but found delete marker or no versions exist. - case MethodNotAllowed: - // VersionId was specified but found delete marker - default: - // All other unhandled errors return right here. - if err != nil { - return -1, err + objInfo, err := pool.GetObjectInfo(ctx, bucket, object, ObjectOptions{}) + if err != nil && !isErrObjectNotFound(err) { + return -1, err + } + if isErrObjectNotFound(err) { + // No object exists or its a delete marker, + // check objInfo to confirm. + if objInfo.DeleteMarker && objInfo.Name != "" { + return i, nil } - } - // delete marker not specified means no versions - // exist continue to next pool. - if !objInfo.DeleteMarker && err != nil { + // objInfo is not valid, truly the object doesn't + // exist proceed to next pool. continue } - // Success case and when DeleteMarker is true return. + // object exists at this pool. return i, nil } @@ -604,7 +599,7 @@ func (z *erasureServerPools) PutObject(ctx context.Context, bucket string, objec return z.serverPools[0].PutObject(ctx, bucket, object, data, opts) } - idx, err := z.getPoolIdx(ctx, bucket, object, opts, data.Size()) + idx, err := z.getPoolIdx(ctx, bucket, object, data.Size()) if err != nil { return ObjectInfo{}, err } @@ -625,7 +620,7 @@ func (z *erasureServerPools) DeleteObject(ctx context.Context, bucket string, ob } // We don't know the size here set 1GiB atleast. - idx, err := z.getPoolIdx(ctx, bucket, object, opts, 1<<30) + idx, err := z.getPoolIdx(ctx, bucket, object, 1<<30) if err != nil { return objInfo, err } @@ -676,7 +671,7 @@ func (z *erasureServerPools) CopyObject(ctx context.Context, srcBucket, srcObjec cpSrcDstSame := isStringEqual(pathJoin(srcBucket, srcObject), pathJoin(dstBucket, dstObject)) - poolIdx, err := z.getPoolIdx(ctx, dstBucket, dstObject, dstOpts, srcInfo.Size) + poolIdx, err := z.getPoolIdx(ctx, dstBucket, dstObject, srcInfo.Size) if err != nil { return objInfo, err } @@ -860,7 +855,7 @@ func (z *erasureServerPools) NewMultipartUpload(ctx context.Context, bucket, obj } // We don't know the exact size, so we ask for at least 1GiB file. - idx, err := z.getPoolIdx(ctx, bucket, object, opts, 1<<30) + idx, err := z.getPoolIdx(ctx, bucket, object, 1<<30) if err != nil { return "", err }