fix: make sure parentDirIsObject is used at set level (#11280)

parentDirIsObject is not using set level understanding
to check for parent objects, without this it can lead to
objects that can actually reside on a separate set as
objects and would conflict.
master
Harshavardhana 4 years ago committed by GitHub
parent ddb5d7043a
commit 4315f93421
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      cmd/erasure-multipart.go
  2. 2
      cmd/erasure-object.go
  3. 6
      cmd/erasure-sets.go
  4. 1
      cmd/metacache-set.go
  5. 2
      cmd/object-api-interface.go

@ -690,7 +690,7 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str
// Check if an object is present as one of the parent dir. // Check if an object is present as one of the parent dir.
// -- FIXME. (needs a new kind of lock). // -- FIXME. (needs a new kind of lock).
if er.parentDirIsObject(ctx, bucket, path.Dir(object)) { if opts.ParentIsObject != nil && opts.ParentIsObject(ctx, bucket, path.Dir(object)) {
return oi, toObjectErr(errFileParentIsFile, bucket, object) return oi, toObjectErr(errFileParentIsFile, bucket, object)
} }

@ -621,7 +621,7 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st
// Check if an object is present as one of the parent dir. // Check if an object is present as one of the parent dir.
// -- FIXME. (needs a new kind of lock). // -- FIXME. (needs a new kind of lock).
// -- FIXME (this also causes performance issue when disks are down). // -- FIXME (this also causes performance issue when disks are down).
if er.parentDirIsObject(ctx, bucket, path.Dir(object)) { if opts.ParentIsObject != nil && opts.ParentIsObject(ctx, bucket, path.Dir(object)) {
return ObjectInfo{}, toObjectErr(errFileParentIsFile, bucket, object) return ObjectInfo{}, toObjectErr(errFileParentIsFile, bucket, object)
} }

@ -773,8 +773,13 @@ func (s *erasureSets) GetObject(ctx context.Context, bucket, object string, star
return s.getHashedSet(object).GetObject(ctx, bucket, object, startOffset, length, writer, etag, opts) return s.getHashedSet(object).GetObject(ctx, bucket, object, startOffset, length, writer, etag, opts)
} }
func (s *erasureSets) parentDirIsObject(ctx context.Context, bucket, parent string) bool {
return s.getHashedSet(parent).parentDirIsObject(ctx, bucket, parent)
}
// PutObject - writes an object to hashedSet based on the object name. // PutObject - writes an object to hashedSet based on the object name.
func (s *erasureSets) PutObject(ctx context.Context, bucket string, object string, data *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error) { func (s *erasureSets) PutObject(ctx context.Context, bucket string, object string, data *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error) {
opts.ParentIsObject = s.parentDirIsObject
return s.getHashedSet(object).PutObject(ctx, bucket, object, data, opts) return s.getHashedSet(object).PutObject(ctx, bucket, object, data, opts)
} }
@ -1064,6 +1069,7 @@ func (s *erasureSets) AbortMultipartUpload(ctx context.Context, bucket, object,
// CompleteMultipartUpload - completes a pending multipart transaction, on hashedSet based on object name. // CompleteMultipartUpload - completes a pending multipart transaction, on hashedSet based on object name.
func (s *erasureSets) CompleteMultipartUpload(ctx context.Context, bucket, object, uploadID string, uploadedParts []CompletePart, opts ObjectOptions) (objInfo ObjectInfo, err error) { func (s *erasureSets) CompleteMultipartUpload(ctx context.Context, bucket, object, uploadID string, uploadedParts []CompletePart, opts ObjectOptions) (objInfo ObjectInfo, err error) {
opts.ParentIsObject = s.parentDirIsObject
return s.getHashedSet(object).CompleteMultipartUpload(ctx, bucket, object, uploadID, uploadedParts, opts) return s.getHashedSet(object).CompleteMultipartUpload(ctx, bucket, object, uploadID, uploadedParts, opts)
} }

@ -682,6 +682,7 @@ func (er *erasureObjects) listPath(ctx context.Context, o listPathOptions) (entr
_, err = er.putObject(ctx, minioMetaBucket, o.objectPath(b.n), NewPutObjReader(r, nil, nil), ObjectOptions{ _, err = er.putObject(ctx, minioMetaBucket, o.objectPath(b.n), NewPutObjReader(r, nil, nil), ObjectOptions{
UserDefined: custom, UserDefined: custom,
NoLock: true, // No need to hold namespace lock, each prefix caches uniquely. NoLock: true, // No need to hold namespace lock, each prefix caches uniquely.
ParentIsObject: nil,
}) })
if err != nil { if err != nil {
metaMu.Lock() metaMu.Lock()

@ -53,7 +53,7 @@ type ObjectOptions struct {
TransitionStatus string // status of the transition TransitionStatus string // status of the transition
NoLock bool // indicates to lower layers if the caller is expecting to hold locks. NoLock bool // indicates to lower layers if the caller is expecting to hold locks.
ProxyRequest bool // only set for GET/HEAD in active-active replication scenario ProxyRequest bool // only set for GET/HEAD in active-active replication scenario
ParentIsObject func(ctx context.Context, bucket, parent string) bool // Used to verify if parent is an object.
} }
// BucketOptions represents bucket options for ObjectLayer bucket operations // BucketOptions represents bucket options for ObjectLayer bucket operations

Loading…
Cancel
Save