diff --git a/cmd/erasure-object.go b/cmd/erasure-object.go index f4af2248b..fb9aee6c7 100644 --- a/cmd/erasure-object.go +++ b/cmd/erasure-object.go @@ -370,12 +370,14 @@ func (er erasureObjects) getObject(ctx context.Context, bucket, object string, s // GetObjectInfo - reads object metadata and replies back ObjectInfo. func (er erasureObjects) GetObjectInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (info ObjectInfo, err error) { - // Lock the object before reading. - lk := er.NewNSLock(bucket, object) - if err := lk.GetRLock(ctx, globalOperationTimeout); err != nil { - return ObjectInfo{}, err + if !opts.NoLock { + // Lock the object before reading. + lk := er.NewNSLock(bucket, object) + if err := lk.GetRLock(ctx, globalOperationTimeout); err != nil { + return ObjectInfo{}, err + } + defer lk.RUnlock() } - defer lk.RUnlock() return er.getObjectInfo(ctx, bucket, object, opts) } diff --git a/cmd/erasure-server-pool.go b/cmd/erasure-server-pool.go index 13eab0336..cb95e4dc2 100644 --- a/cmd/erasure-server-pool.go +++ b/cmd/erasure-server-pool.go @@ -511,16 +511,71 @@ func (z *erasureServerPools) GetObjectNInfo(ctx context.Context, bucket, object object = encodeDirObject(object) - for _, pool := range z.serverPools { - gr, err = pool.GetObjectNInfo(ctx, bucket, object, rs, h, lockType, opts) - if err != nil { - if isErrObjectNotFound(err) || isErrVersionNotFound(err) { - continue + if z.SinglePool() { + return z.serverPools[0].GetObjectNInfo(ctx, bucket, object, rs, h, lockType, opts) + } + + var unlockOnDefer bool + var nsUnlocker = func() {} + defer func() { + if unlockOnDefer { + nsUnlocker() + } + }() + + // Acquire lock + if lockType != noLock { + lock := z.NewNSLock(bucket, object) + switch lockType { + case writeLock: + if err = lock.GetLock(ctx, globalOperationTimeout); err != nil { + return nil, err + } + nsUnlocker = lock.Unlock + case readLock: + if err = lock.GetRLock(ctx, globalOperationTimeout); err != nil { + return nil, err + } + nsUnlocker = lock.RUnlock + } + unlockOnDefer = true + } + + errs := make([]error, len(z.serverPools)) + grs := make([]*GetObjectReader, len(z.serverPools)) + + lockType = noLock // do not take locks at lower levels + var wg sync.WaitGroup + for i, pool := range z.serverPools { + wg.Add(1) + go func(i int, pool *erasureSets) { + defer wg.Done() + grs[i], errs[i] = pool.GetObjectNInfo(ctx, bucket, object, rs, h, lockType, opts) + }(i, pool) + } + wg.Wait() + + var found int = -1 + for i, err := range errs { + if err == nil { + found = i + break + } + if !isErrObjectNotFound(err) && !isErrVersionNotFound(err) { + for _, grr := range grs { + if grr != nil { + grr.Close() + } } return gr, err } - return gr, nil } + + if found >= 0 { + return grs[found], nil + } + + object = decodeDirObject(object) if opts.VersionID != "" { return gr, VersionNotFound{Bucket: bucket, Object: object, VersionID: opts.VersionID} } @@ -533,7 +588,6 @@ func (z *erasureServerPools) GetObject(ctx context.Context, bucket, object strin } object = encodeDirObject(object) - for _, pool := range z.serverPools { if err := pool.GetObject(ctx, bucket, object, startOffset, length, writer, etag, opts); err != nil { if isErrObjectNotFound(err) || isErrVersionNotFound(err) { @@ -555,16 +609,49 @@ func (z *erasureServerPools) GetObjectInfo(ctx context.Context, bucket, object s } object = encodeDirObject(object) - for _, pool := range z.serverPools { - objInfo, err = pool.GetObjectInfo(ctx, bucket, object, opts) - if err != nil { - if isErrObjectNotFound(err) || isErrVersionNotFound(err) { - continue - } - return objInfo, err + + if z.SinglePool() { + return z.serverPools[0].GetObjectInfo(ctx, bucket, object, opts) + } + + // Lock the object before reading. + lk := z.NewNSLock(bucket, object) + if err := lk.GetRLock(ctx, globalOperationTimeout); err != nil { + return ObjectInfo{}, err + } + defer lk.RUnlock() + + errs := make([]error, len(z.serverPools)) + objInfos := make([]ObjectInfo, len(z.serverPools)) + + opts.NoLock = true // avoid taking locks at lower levels for multi-pool setups. + var wg sync.WaitGroup + for i, pool := range z.serverPools { + wg.Add(1) + go func(i int, pool *erasureSets) { + defer wg.Done() + objInfos[i], errs[i] = pool.GetObjectInfo(ctx, bucket, object, opts) + }(i, pool) + } + wg.Wait() + + var found int = -1 + for i, err := range errs { + if err == nil { + found = i + break + } + if !isErrObjectNotFound(err) && !isErrVersionNotFound(err) { + // some errors such as MethodNotAllowed for delete marker + // should be returned upwards. + return objInfos[i], err } - return objInfo, nil } + + if found >= 0 { + return objInfos[found], nil + } + object = decodeDirObject(object) if opts.VersionID != "" { return objInfo, VersionNotFound{Bucket: bucket, Object: object, VersionID: opts.VersionID} @@ -600,7 +687,6 @@ func (z *erasureServerPools) DeleteObject(ctx context.Context, bucket string, ob } object = encodeDirObject(object) - if z.SinglePool() { return z.serverPools[0].DeleteObject(ctx, bucket, object, opts) } @@ -1590,27 +1676,13 @@ func (z *erasureServerPools) PutObjectTags(ctx context.Context, bucket, object s return z.serverPools[0].PutObjectTags(ctx, bucket, object, tags, opts) } - for _, pool := range z.serverPools { - objInfo, err := pool.PutObjectTags(ctx, bucket, object, tags, opts) - if err != nil { - if isErrObjectNotFound(err) || isErrVersionNotFound(err) { - continue - } - return ObjectInfo{}, err - } - return objInfo, nil - } - if opts.VersionID != "" { - return ObjectInfo{}, VersionNotFound{ - Bucket: bucket, - Object: object, - VersionID: opts.VersionID, - } - } - return ObjectInfo{}, ObjectNotFound{ - Bucket: bucket, - Object: object, + // We don't know the size here set 1GiB atleast. + idx, err := z.getPoolIdx(ctx, bucket, object, 1<<30) + if err != nil { + return ObjectInfo{}, err } + + return z.serverPools[idx].PutObjectTags(ctx, bucket, object, tags, opts) } // DeleteObjectTags - delete object tags from an existing object @@ -1619,27 +1691,14 @@ func (z *erasureServerPools) DeleteObjectTags(ctx context.Context, bucket, objec if z.SinglePool() { return z.serverPools[0].DeleteObjectTags(ctx, bucket, object, opts) } - for _, pool := range z.serverPools { - objInfo, err := pool.DeleteObjectTags(ctx, bucket, object, opts) - if err != nil { - if isErrObjectNotFound(err) || isErrVersionNotFound(err) { - continue - } - return ObjectInfo{}, err - } - return objInfo, nil - } - if opts.VersionID != "" { - return ObjectInfo{}, VersionNotFound{ - Bucket: bucket, - Object: object, - VersionID: opts.VersionID, - } - } - return ObjectInfo{}, ObjectNotFound{ - Bucket: bucket, - Object: object, + + // We don't know the size here set 1GiB atleast. + idx, err := z.getPoolIdx(ctx, bucket, object, 1<<30) + if err != nil { + return ObjectInfo{}, err } + + return z.serverPools[idx].DeleteObjectTags(ctx, bucket, object, opts) } // GetObjectTags - get object tags from an existing object @@ -1648,25 +1707,12 @@ func (z *erasureServerPools) GetObjectTags(ctx context.Context, bucket, object s if z.SinglePool() { return z.serverPools[0].GetObjectTags(ctx, bucket, object, opts) } - for _, pool := range z.serverPools { - tags, err := pool.GetObjectTags(ctx, bucket, object, opts) - if err != nil { - if isErrObjectNotFound(err) || isErrVersionNotFound(err) { - continue - } - return tags, err - } - return tags, nil - } - if opts.VersionID != "" { - return nil, VersionNotFound{ - Bucket: bucket, - Object: object, - VersionID: opts.VersionID, - } - } - return nil, ObjectNotFound{ - Bucket: bucket, - Object: object, + + // We don't know the size here set 1GiB atleast. + idx, err := z.getPoolIdx(ctx, bucket, object, 1<<30) + if err != nil { + return nil, err } + + return z.serverPools[idx].GetObjectTags(ctx, bucket, object, opts) }