From 18063bf25cedaf072455f0ac9c5af6c32904f979 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Tue, 6 Oct 2020 12:03:57 -0700 Subject: [PATCH] fix: cleanup old directory handling code (#10633) we don't need them anymore, remove legacy code. --- cmd/api-errors.go | 8 +- cmd/erasure-object.go | 141 +++++++-------------------------- cmd/erasure-zones.go | 21 +++++ cmd/fs-v1.go | 4 +- cmd/object-api-common.go | 23 ------ cmd/object-api-input-checks.go | 3 +- 6 files changed, 55 insertions(+), 145 deletions(-) diff --git a/cmd/api-errors.go b/cmd/api-errors.go index 00762ef04..0c03c76db 100644 --- a/cmd/api-errors.go +++ b/cmd/api-errors.go @@ -820,22 +820,22 @@ var errorCodes = errorCodeMap{ HTTPStatusCode: http.StatusBadRequest, }, ErrReplicationTargetNotFoundError: { - Code: "XminioAdminReplicationTargetNotFoundError", + Code: "XMinioAdminReplicationTargetNotFoundError", Description: "The replication target does not exist", HTTPStatusCode: http.StatusNotFound, }, ErrReplicationRemoteConnectionError: { - Code: "XminioAdminReplicationRemoteConnectionError", + Code: "XMinioAdminReplicationRemoteConnectionError", Description: "Remote service endpoint or target bucket not available", HTTPStatusCode: http.StatusNotFound, }, ErrBucketRemoteIdenticalToSource: { - Code: "XminioAdminRemoteIdenticalToSource", + Code: "XMinioAdminRemoteIdenticalToSource", Description: "The remote target cannot be identical to source", HTTPStatusCode: http.StatusBadRequest, }, ErrBucketRemoteAlreadyExists: { - Code: "XminioAdminBucketRemoteAlreadyExists", + Code: "XMinioAdminBucketRemoteAlreadyExists", Description: "The remote target already exists", HTTPStatusCode: http.StatusBadRequest, }, diff --git a/cmd/erasure-object.go b/cmd/erasure-object.go index 438265a55..992892a05 100644 --- a/cmd/erasure-object.go +++ b/cmd/erasure-object.go @@ -17,7 +17,6 @@ package cmd import ( - "bytes" "context" "errors" "fmt" @@ -123,10 +122,6 @@ func (er erasureObjects) CopyObject(ctx context.Context, srcBucket, srcObject, d // GetObjectNInfo - returns object info and an object // Read(Closer). When err != nil, the returned reader is always nil. func (er erasureObjects) GetObjectNInfo(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, lockType LockType, opts ObjectOptions) (gr *GetObjectReader, err error) { - if err = checkGetObjArgs(ctx, bucket, object); err != nil { - return nil, err - } - var unlockOnDefer bool var nsUnlocker = func() {} defer func() { @@ -153,17 +148,6 @@ func (er erasureObjects) GetObjectNInfo(ctx context.Context, bucket, object stri unlockOnDefer = true } - // Handler directory request by returning a reader that - // returns no bytes. - if HasSuffix(object, SlashSeparator) { - var objInfo ObjectInfo - if objInfo, err = er.getObjectInfoDir(ctx, bucket, object); err != nil { - return nil, toObjectErr(err, bucket, object) - } - unlockOnDefer = false - return NewGetObjectReaderFromReader(bytes.NewBuffer(nil), objInfo, opts, nsUnlocker) - } - fi, metaArr, onlineDisks, err := er.getObjectFileInfo(ctx, bucket, object, opts) if err != nil { return nil, toObjectErr(err, bucket, object) @@ -208,10 +192,6 @@ func (er erasureObjects) GetObjectNInfo(ctx context.Context, bucket, object stri // startOffset indicates the starting read location of the object. // length indicates the total length of the object. func (er erasureObjects) GetObject(ctx context.Context, bucket, object string, startOffset int64, length int64, writer io.Writer, etag string, opts ObjectOptions) error { - if err := checkGetObjArgs(ctx, bucket, object); err != nil { - return err - } - // Lock the object before reading. lk := er.NewNSLock(ctx, bucket, object) if err := lk.GetRLock(globalOperationTimeout); err != nil { @@ -366,43 +346,8 @@ func (er erasureObjects) getObject(ctx context.Context, bucket, object string, s return er.getObjectWithFileInfo(ctx, bucket, object, startOffset, length, writer, etag, opts, fi, metaArr, onlineDisks) } -// getObjectInfoDir - This getObjectInfo is specific to object directory lookup. -func (er erasureObjects) getObjectInfoDir(ctx context.Context, bucket, object string) (ObjectInfo, error) { - storageDisks := er.getDisks() - - g := errgroup.WithNErrs(len(storageDisks)) - - // Prepare object creation in a all disks - for index, disk := range storageDisks { - if disk == nil { - continue - } - index := index - g.Go(func() error { - // Check if 'prefix' is an object on this 'disk'. - entries, err := storageDisks[index].ListDir(ctx, bucket, object, 1) - if err != nil { - return err - } - if len(entries) > 0 { - // Not a directory if not empty. - return errFileNotFound - } - return nil - }, index) - } - - readQuorum := getReadQuorum(len(storageDisks)) - err := reduceReadQuorumErrs(ctx, g.Wait(), objectOpIgnoredErrs, readQuorum) - return dirObjectInfo(bucket, object, 0, map[string]string{}), err -} - // GetObjectInfo - reads object metadata and replies back ObjectInfo. func (er erasureObjects) GetObjectInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (info ObjectInfo, err error) { - if err = checkGetObjArgs(ctx, bucket, object); err != nil { - return info, err - } - // Lock the object before reading. lk := er.NewNSLock(ctx, bucket, object) if err := lk.GetRLock(globalOperationTimeout); err != nil { @@ -410,14 +355,6 @@ func (er erasureObjects) GetObjectInfo(ctx context.Context, bucket, object strin } defer lk.RUnlock() - if HasSuffix(object, SlashSeparator) { - info, err = er.getObjectInfoDir(ctx, bucket, object) - if err != nil { - return info, toObjectErr(err, bucket, object) - } - return info, nil - } - return er.getObjectInfo(ctx, bucket, object, opts) } @@ -584,11 +521,6 @@ func rename(ctx context.Context, disks []StorageAPI, srcBucket, srcEntry, dstBuc // writes `xl.meta` which carries the necessary metadata for future // object operations. func (er erasureObjects) PutObject(ctx context.Context, bucket string, object string, data *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error) { - // Validate put object input args. - if err = checkPutObjectArgs(ctx, bucket, object, er, data.Size()); err != nil { - return ObjectInfo{}, err - } - return er.putObject(ctx, bucket, object, data, opts) } @@ -788,11 +720,7 @@ func (er erasureObjects) deleteObjectVersion(ctx context.Context, bucket, object if disks[index] == nil { return errDiskNotFound } - err := disks[index].DeleteVersion(ctx, bucket, object, fi) - if err != nil && err != errVolumeNotFound { - return err - } - return nil + return disks[index].DeleteVersion(ctx, bucket, object, fi) }, index) } @@ -853,16 +781,9 @@ func (er erasureObjects) DeleteObjects(ctx context.Context, bucket string, objec dobjects := make([]DeletedObject, len(objects)) writeQuorums := make([]int, len(objects)) - for i, object := range objects { - errs[i] = checkDelObjArgs(ctx, bucket, object.ObjectName) - } - storageDisks := er.getDisks() for i := range objects { - if errs[i] != nil { - continue - } // Assume (N/2 + 1) quorums for all objects // this is a theoretical assumption such that // for delete's we do not need to honor storage @@ -876,21 +797,19 @@ func (er erasureObjects) DeleteObjects(ctx context.Context, bucket string, objec for i := range objects { if objects[i].VersionID == "" { if opts.Versioned || opts.VersionSuspended { - if !HasSuffix(objects[i].ObjectName, SlashSeparator) { - fi := FileInfo{ - Name: objects[i].ObjectName, - ModTime: UTCNow(), - Deleted: true, // delete marker - } - if opts.Versioned { - fi.VersionID = mustGetUUID() - } - // versioning suspended means we add `null` - // version as delete marker - - versions[i] = fi - continue + fi := FileInfo{ + Name: objects[i].ObjectName, + ModTime: UTCNow(), + Deleted: true, // delete marker + } + if opts.Versioned { + fi.VersionID = mustGetUUID() } + // versioning suspended means we add `null` + // version as delete marker + + versions[i] = fi + continue } } versions[i] = FileInfo{ @@ -978,10 +897,6 @@ func (er erasureObjects) DeleteObjects(ctx context.Context, bucket string, objec // any error as it is not necessary for the handler to reply back a // response to the client request. func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, err error) { - if err = checkDelObjArgs(ctx, bucket, object); err != nil { - return objInfo, err - } - // Acquire a write lock before deleting the object. lk := er.NewNSLock(ctx, bucket, object) if err = lk.GetLock(globalOperationTimeout); err != nil { @@ -994,26 +909,24 @@ func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string if opts.VersionID == "" { if opts.Versioned || opts.VersionSuspended { - if !HasSuffix(object, SlashSeparator) { - fi := FileInfo{ - Name: object, - Deleted: true, - ModTime: UTCNow(), - } + fi := FileInfo{ + Name: object, + Deleted: true, + ModTime: UTCNow(), + } - if opts.Versioned { - fi.VersionID = mustGetUUID() - } + if opts.Versioned { + fi.VersionID = mustGetUUID() + } - // versioning suspended means we add `null` - // version as delete marker + // versioning suspended means we add `null` + // version as delete marker - // Add delete marker, since we don't have any version specified explicitly. - if err = er.deleteObjectVersion(ctx, bucket, object, writeQuorum, fi); err != nil { - return objInfo, toObjectErr(err, bucket, object) - } - return fi.ToObjectInfo(bucket, object), nil + // Add delete marker, since we don't have any version specified explicitly. + if err = er.deleteObjectVersion(ctx, bucket, object, writeQuorum, fi); err != nil { + return objInfo, toObjectErr(err, bucket, object) } + return fi.ToObjectInfo(bucket, object), nil } } diff --git a/cmd/erasure-zones.go b/cmd/erasure-zones.go index e41d49947..3ed667d52 100644 --- a/cmd/erasure-zones.go +++ b/cmd/erasure-zones.go @@ -460,6 +460,10 @@ func (z *erasureZones) MakeBucketWithLocation(ctx context.Context, bucket string } func (z *erasureZones) GetObjectNInfo(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, lockType LockType, opts ObjectOptions) (gr *GetObjectReader, err error) { + if err = checkGetObjArgs(ctx, bucket, object); err != nil { + return nil, err + } + object = encodeDirObject(object) for _, zone := range z.zones { @@ -479,6 +483,10 @@ func (z *erasureZones) GetObjectNInfo(ctx context.Context, bucket, object string } func (z *erasureZones) GetObject(ctx context.Context, bucket, object string, startOffset int64, length int64, writer io.Writer, etag string, opts ObjectOptions) error { + if err := checkGetObjArgs(ctx, bucket, object); err != nil { + return err + } + object = encodeDirObject(object) for _, zone := range z.zones { @@ -497,6 +505,10 @@ func (z *erasureZones) GetObject(ctx context.Context, bucket, object string, sta } func (z *erasureZones) GetObjectInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, err error) { + if err = checkGetObjArgs(ctx, bucket, object); err != nil { + return objInfo, err + } + object = encodeDirObject(object) for _, zone := range z.zones { objInfo, err = zone.GetObjectInfo(ctx, bucket, object, opts) @@ -517,6 +529,11 @@ func (z *erasureZones) GetObjectInfo(ctx context.Context, bucket, object string, // PutObject - writes an object to least used erasure zone. func (z *erasureZones) PutObject(ctx context.Context, bucket string, object string, data *PutObjReader, opts ObjectOptions) (ObjectInfo, error) { + // Validate put object input args. + if err := checkPutObjectArgs(ctx, bucket, object, z); err != nil { + return ObjectInfo{}, err + } + object = encodeDirObject(object) if z.SingleZone() { @@ -533,6 +550,10 @@ func (z *erasureZones) PutObject(ctx context.Context, bucket string, object stri } func (z *erasureZones) DeleteObject(ctx context.Context, bucket string, object string, opts ObjectOptions) (objInfo ObjectInfo, err error) { + if err = checkDelObjArgs(ctx, bucket, object); err != nil { + return objInfo, err + } + object = encodeDirObject(object) if z.SingleZone() { diff --git a/cmd/fs-v1.go b/cmd/fs-v1.go index 169a5e7da..15350811d 100644 --- a/cmd/fs-v1.go +++ b/cmd/fs-v1.go @@ -653,7 +653,7 @@ func (fs *FSObjects) CopyObject(ctx context.Context, srcBucket, srcObject, dstBu return fsMeta.ToObjectInfo(srcBucket, srcObject, fi), nil } - if err := checkPutObjectArgs(ctx, dstBucket, dstObject, fs, srcInfo.PutObjReader.Size()); err != nil { + if err := checkPutObjectArgs(ctx, dstBucket, dstObject, fs); err != nil { return ObjectInfo{}, err } @@ -1097,7 +1097,7 @@ func (fs *FSObjects) PutObject(ctx context.Context, bucket string, object string return objInfo, NotImplemented{} } - if err := checkPutObjectArgs(ctx, bucket, object, fs, r.Size()); err != nil { + if err := checkPutObjectArgs(ctx, bucket, object, fs); err != nil { return ObjectInfo{}, err } diff --git a/cmd/object-api-common.go b/cmd/object-api-common.go index 551e20ec2..601cb85d2 100644 --- a/cmd/object-api-common.go +++ b/cmd/object-api-common.go @@ -56,29 +56,6 @@ func isObjectDir(object string, size int64) bool { return HasSuffix(object, SlashSeparator) && size == 0 } -// Converts just bucket, object metadata into ObjectInfo datatype. -func dirObjectInfo(bucket, object string, size int64, metadata map[string]string) ObjectInfo { - // This is a special case with size as '0' and object ends with - // a slash separator, we treat it like a valid operation and - // return success. - etag := metadata["etag"] - delete(metadata, "etag") - if etag == "" { - etag = emptyETag - } - - return ObjectInfo{ - Bucket: bucket, - Name: object, - ModTime: UTCNow(), - ContentType: "application/octet-stream", - IsDir: true, - Size: size, - ETag: etag, - UserDefined: metadata, - } -} - // Depending on the disk type network or local, initialize storage API. func newStorageAPI(endpoint Endpoint) (storage StorageAPI, err error) { if endpoint.IsLocal { diff --git a/cmd/object-api-input-checks.go b/cmd/object-api-input-checks.go index 6415deade..465be5ed9 100644 --- a/cmd/object-api-input-checks.go +++ b/cmd/object-api-input-checks.go @@ -173,7 +173,7 @@ func checkObjectArgs(ctx context.Context, bucket, object string, obj ObjectLayer } // Checks for PutObject arguments validity, also validates if bucket exists. -func checkPutObjectArgs(ctx context.Context, bucket, object string, obj getBucketInfoI, size int64) error { +func checkPutObjectArgs(ctx context.Context, bucket, object string, obj getBucketInfoI) error { // Verify if bucket exists before validating object name. // This is done on purpose since the order of errors is // important here bucket does not exist error should @@ -187,7 +187,6 @@ func checkPutObjectArgs(ctx context.Context, bucket, object string, obj getBucke return err } if len(object) == 0 || - (HasSuffix(object, SlashSeparator) && size != 0) || !IsValidObjectPrefix(object) { return ObjectNameInvalid{ Bucket: bucket,