diff --git a/cmd/bucket-handlers.go b/cmd/bucket-handlers.go index 63517151b..5fd77048d 100644 --- a/cmd/bucket-handlers.go +++ b/cmd/bucket-handlers.go @@ -666,9 +666,6 @@ func (api objectAPIHandlers) PostPolicyBucketHandler(w http.ResponseWriter, r *h bucket := mux.Vars(r)["bucket"] - // To detect if the client has disconnected. - r.Body = &contextReader{r.Body, r.Context()} - // Require Content-Length to be set in the request size := r.ContentLength if size < 0 { diff --git a/cmd/erasure-multipart.go b/cmd/erasure-multipart.go index 6b0359173..f352857e1 100644 --- a/cmd/erasure-multipart.go +++ b/cmd/erasure-multipart.go @@ -346,7 +346,18 @@ func (er erasureObjects) CopyObjectPart(ctx context.Context, srcBucket, srcObjec // of the multipart transaction. // // Implements S3 compatible Upload Part API. -func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID string, partID int, r *PutObjReader, opts ObjectOptions) (pi PartInfo, e error) { +func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID string, partID int, r *PutObjReader, opts ObjectOptions) (pi PartInfo, err error) { + uploadIDLock := er.NewNSLock(ctx, bucket, pathJoin(object, uploadID)) + if err = uploadIDLock.GetRLock(globalOperationTimeout); err != nil { + return PartInfo{}, err + } + readLocked := true + defer func() { + if readLocked { + uploadIDLock.RUnlock() + } + }() + data := r.Reader // Validate input data size and it can never be less than zero. if data.Size() < -1 { @@ -359,7 +370,7 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo uploadIDPath := er.getUploadIDDir(bucket, object, uploadID) // Validates if upload ID exists. - if err := er.checkUploadIDExists(ctx, bucket, object, uploadID); err != nil { + if err = er.checkUploadIDExists(ctx, bucket, object, uploadID); err != nil { return pi, toObjectErr(err, bucket, object, uploadID) } @@ -446,8 +457,17 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo } } + // Unlock here before acquiring write locks all concurrent + // PutObjectParts would serialize here updating `xl.meta` + uploadIDLock.RUnlock() + readLocked = false + if err = uploadIDLock.GetLock(globalOperationTimeout); err != nil { + return PartInfo{}, err + } + defer uploadIDLock.Unlock() + // Validates if upload ID exists. - if err := er.checkUploadIDExists(ctx, bucket, object, uploadID); err != nil { + if err = er.checkUploadIDExists(ctx, bucket, object, uploadID); err != nil { return pi, toObjectErr(err, bucket, object, uploadID) } @@ -522,6 +542,12 @@ func (er erasureObjects) GetMultipartInfo(ctx context.Context, bucket, object, u UploadID: uploadID, } + uploadIDLock := er.NewNSLock(ctx, bucket, pathJoin(object, uploadID)) + if err := uploadIDLock.GetRLock(globalOperationTimeout); err != nil { + return MultipartInfo{}, err + } + defer uploadIDLock.RUnlock() + if err := er.checkUploadIDExists(ctx, bucket, object, uploadID); err != nil { return result, toObjectErr(err, bucket, object, uploadID) } @@ -564,6 +590,12 @@ func (er erasureObjects) GetMultipartInfo(ctx context.Context, bucket, object, u // ListPartsInfo structure is marshaled directly into XML and // replied back to the client. func (er erasureObjects) ListObjectParts(ctx context.Context, bucket, object, uploadID string, partNumberMarker, maxParts int, opts ObjectOptions) (result ListPartsInfo, e error) { + uploadIDLock := er.NewNSLock(ctx, bucket, pathJoin(object, uploadID)) + if err := uploadIDLock.GetRLock(globalOperationTimeout); err != nil { + return ListPartsInfo{}, err + } + defer uploadIDLock.RUnlock() + if err := er.checkUploadIDExists(ctx, bucket, object, uploadID); err != nil { return result, toObjectErr(err, bucket, object, uploadID) } @@ -648,8 +680,16 @@ func (er erasureObjects) ListObjectParts(ctx context.Context, bucket, object, up // md5sums of all the parts. // // Implements S3 compatible Complete multipart API. -func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket string, object string, uploadID string, parts []CompletePart, opts ObjectOptions) (oi ObjectInfo, e error) { - if err := er.checkUploadIDExists(ctx, bucket, object, uploadID); err != nil { +func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket string, object string, uploadID string, parts []CompletePart, opts ObjectOptions) (oi ObjectInfo, err error) { + // Hold read-locks to verify uploaded parts, also disallows + // parallel part uploads as well. + uploadIDLock := er.NewNSLock(ctx, bucket, pathJoin(object, uploadID)) + if err = uploadIDLock.GetRLock(globalOperationTimeout); err != nil { + return oi, err + } + defer uploadIDLock.RUnlock() + + if err = er.checkUploadIDExists(ctx, bucket, object, uploadID); err != nil { return oi, toObjectErr(err, bucket, object, uploadID) } @@ -797,6 +837,13 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str } } + // Hold namespace to complete the transaction + lk := er.NewNSLock(ctx, bucket, object) + if err = lk.GetLock(globalOperationTimeout); err != nil { + return oi, err + } + defer lk.Unlock() + // Rename the multipart object to final location. if onlineDisks, err = renameData(ctx, onlineDisks, minioMetaMultipartBucket, uploadIDPath, fi.DataDir, bucket, object, writeQuorum, nil); err != nil { @@ -832,11 +879,13 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str // All parts are purged from all disks and reference to the uploadID // would be removed from the system, rollback is not possible on this // operation. -// -// Implements S3 compatible Abort multipart API, slight difference is -// that this is an atomic idempotent operation. Subsequent calls have -// no affect and further requests to the same uploadID would not be honored. -func (er erasureObjects) AbortMultipartUpload(ctx context.Context, bucket, object, uploadID string) error { +func (er erasureObjects) AbortMultipartUpload(ctx context.Context, bucket, object, uploadID string, opts ObjectOptions) error { + lk := er.NewNSLock(ctx, bucket, pathJoin(object, uploadID)) + if err := lk.GetLock(globalOperationTimeout); err != nil { + return err + } + defer lk.Unlock() + // Validates if upload ID exists. if err := er.checkUploadIDExists(ctx, bucket, object, uploadID); err != nil { return toObjectErr(err, bucket, object, uploadID) diff --git a/cmd/erasure-object.go b/cmd/erasure-object.go index a240d55c3..754ed7c83 100644 --- a/cmd/erasure-object.go +++ b/cmd/erasure-object.go @@ -145,6 +145,32 @@ func (er erasureObjects) GetObjectNInfo(ctx context.Context, bucket, object stri return nil, err } + var unlockOnDefer bool + var nsUnlocker = func() {} + defer func() { + if unlockOnDefer { + nsUnlocker() + } + }() + + // Acquire lock + if lockType != noLock { + lock := er.NewNSLock(ctx, bucket, object) + switch lockType { + case writeLock: + if err = lock.GetLock(globalOperationTimeout); err != nil { + return nil, err + } + nsUnlocker = lock.Unlock + case readLock: + if err = lock.GetRLock(globalOperationTimeout); err != nil { + return nil, err + } + nsUnlocker = lock.RUnlock + } + unlockOnDefer = true + } + // Handler directory request by returning a reader that // returns no bytes. if HasSuffix(object, SlashSeparator) { @@ -152,7 +178,8 @@ func (er erasureObjects) GetObjectNInfo(ctx context.Context, bucket, object stri if objInfo, err = er.getObjectInfoDir(ctx, bucket, object); err != nil { return nil, toObjectErr(err, bucket, object) } - return NewGetObjectReaderFromReader(bytes.NewBuffer(nil), objInfo, opts) + unlockOnDefer = false + return NewGetObjectReaderFromReader(bytes.NewBuffer(nil), objInfo, opts, nsUnlocker) } fi, metaArr, onlineDisks, err := er.getObjectFileInfo(ctx, bucket, object, opts) @@ -173,7 +200,8 @@ func (er erasureObjects) GetObjectNInfo(ctx context.Context, bucket, object stri }, toObjectErr(errMethodNotAllowed, bucket, object) } - fn, off, length, nErr := NewGetObjectReader(rs, objInfo, opts) + unlockOnDefer = false + fn, off, length, nErr := NewGetObjectReader(rs, objInfo, opts, nsUnlocker) if nErr != nil { return nil, nErr } @@ -202,6 +230,13 @@ func (er erasureObjects) GetObject(ctx context.Context, bucket, object string, s return err } + // Lock the object before reading. + lk := er.NewNSLock(ctx, bucket, object) + if err := lk.GetRLock(globalOperationTimeout); err != nil { + return err + } + defer lk.RUnlock() + // Start offset cannot be negative. if startOffset < 0 { logger.LogIf(ctx, errUnexpected, logger.Application) @@ -386,6 +421,13 @@ func (er erasureObjects) GetObjectInfo(ctx context.Context, bucket, object strin return info, err } + // Lock the object before reading. + lk := er.NewNSLock(ctx, bucket, object) + if err := lk.GetRLock(globalOperationTimeout); err != nil { + return ObjectInfo{}, err + } + defer lk.RUnlock() + if HasSuffix(object, SlashSeparator) { info, err = er.getObjectInfoDir(ctx, bucket, object) if err != nil { @@ -738,6 +780,12 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st return ObjectInfo{}, toObjectErr(err, bucket, object) } + lk := er.NewNSLock(ctx, bucket, object) + if err := lk.GetLock(globalOperationTimeout); err != nil { + return ObjectInfo{}, err + } + defer lk.Unlock() + // Rename the successfully written temporary object to final location. if onlineDisks, err = renameData(ctx, onlineDisks, minioMetaTmpBucket, tempObj, fi.DataDir, bucket, object, writeQuorum, nil); err != nil { return ObjectInfo{}, toObjectErr(err, bucket, object) @@ -970,6 +1018,13 @@ func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string return objInfo, err } + // Acquire a write lock before deleting the object. + lk := er.NewNSLock(ctx, bucket, object) + if err = lk.GetLock(globalOperationTimeout); err != nil { + return ObjectInfo{}, err + } + defer lk.Unlock() + storageDisks := er.getDisks() writeQuorum := len(storageDisks)/2 + 1 diff --git a/cmd/erasure-sets.go b/cmd/erasure-sets.go index 48672435d..7df3e2296 100644 --- a/cmd/erasure-sets.go +++ b/cmd/erasure-sets.go @@ -762,9 +762,13 @@ func (s *erasureSets) CopyObject(ctx context.Context, srcBucket, srcObject, dstB // Check if this request is only metadata update. if cpSrcDstSame && srcInfo.metadataOnly { + // Version ID is set for the destination and source == destination version ID. + // perform an in-place update. if dstOpts.VersionID != "" && srcOpts.VersionID == dstOpts.VersionID { return srcSet.CopyObject(ctx, srcBucket, srcObject, dstBucket, dstObject, srcInfo, srcOpts, dstOpts) } + // Destination is not versioned and source version ID is empty + // perform an in-place update. if !dstOpts.Versioned && srcOpts.VersionID == "" { return srcSet.CopyObject(ctx, srcBucket, srcObject, dstBucket, dstObject, srcInfo, srcOpts, dstOpts) } @@ -1070,8 +1074,8 @@ func (s *erasureSets) ListObjectParts(ctx context.Context, bucket, object, uploa } // Aborts an in-progress multipart operation on hashedSet based on the object name. -func (s *erasureSets) AbortMultipartUpload(ctx context.Context, bucket, object, uploadID string) error { - return s.getHashedSet(object).AbortMultipartUpload(ctx, bucket, object, uploadID) +func (s *erasureSets) AbortMultipartUpload(ctx context.Context, bucket, object, uploadID string, opts ObjectOptions) error { + return s.getHashedSet(object).AbortMultipartUpload(ctx, bucket, object, uploadID, opts) } // CompleteMultipartUpload - completes a pending multipart transaction, on hashedSet based on object name. diff --git a/cmd/erasure-zones.go b/cmd/erasure-zones.go index 868c04c86..0a05d5dc9 100644 --- a/cmd/erasure-zones.go +++ b/cmd/erasure-zones.go @@ -459,38 +459,16 @@ func (z *erasureZones) MakeBucketWithLocation(ctx context.Context, bucket string } func (z *erasureZones) GetObjectNInfo(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, lockType LockType, opts ObjectOptions) (gr *GetObjectReader, err error) { - var nsUnlocker = func() {} - - // Acquire lock - if lockType != noLock { - lock := z.NewNSLock(ctx, bucket, object) - switch lockType { - case writeLock: - if err = lock.GetLock(globalOperationTimeout); err != nil { - return nil, err - } - nsUnlocker = lock.Unlock - case readLock: - if err = lock.GetRLock(globalOperationTimeout); err != nil { - return nil, err - } - nsUnlocker = lock.RUnlock - } - } - for _, zone := range z.zones { gr, err = zone.GetObjectNInfo(ctx, bucket, object, rs, h, lockType, opts) if err != nil { if isErrObjectNotFound(err) || isErrVersionNotFound(err) { continue } - nsUnlocker() return gr, err } - gr.cleanUpFns = append(gr.cleanUpFns, nsUnlocker) return gr, nil } - nsUnlocker() if opts.VersionID != "" { return gr, VersionNotFound{Bucket: bucket, Object: object, VersionID: opts.VersionID} } @@ -498,17 +476,6 @@ func (z *erasureZones) GetObjectNInfo(ctx context.Context, bucket, object string } func (z *erasureZones) GetObject(ctx context.Context, bucket, object string, startOffset int64, length int64, writer io.Writer, etag string, opts ObjectOptions) error { - // Lock the object before reading. - lk := z.NewNSLock(ctx, bucket, object) - if err := lk.GetRLock(globalOperationTimeout); err != nil { - return err - } - defer lk.RUnlock() - - if z.SingleZone() { - return z.zones[0].GetObject(ctx, bucket, object, startOffset, length, writer, etag, opts) - } - for _, zone := range z.zones { if err := zone.GetObject(ctx, bucket, object, startOffset, length, writer, etag, opts); err != nil { if isErrObjectNotFound(err) || isErrVersionNotFound(err) { @@ -518,20 +485,13 @@ func (z *erasureZones) GetObject(ctx context.Context, bucket, object string, sta } return nil } + if opts.VersionID != "" { + return VersionNotFound{Bucket: bucket, Object: object, VersionID: opts.VersionID} + } return ObjectNotFound{Bucket: bucket, Object: object} } func (z *erasureZones) GetObjectInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, err error) { - // Lock the object before reading. - lk := z.NewNSLock(ctx, bucket, object) - if err := lk.GetRLock(globalOperationTimeout); err != nil { - return ObjectInfo{}, err - } - defer lk.RUnlock() - - if z.SingleZone() { - return z.zones[0].GetObjectInfo(ctx, bucket, object, opts) - } for _, zone := range z.zones { objInfo, err = zone.GetObjectInfo(ctx, bucket, object, opts) if err != nil { @@ -550,13 +510,6 @@ func (z *erasureZones) GetObjectInfo(ctx context.Context, bucket, object string, // PutObject - writes an object to least used erasure zone. func (z *erasureZones) PutObject(ctx context.Context, bucket string, object string, data *PutObjReader, opts ObjectOptions) (ObjectInfo, error) { - // Lock the object. - lk := z.NewNSLock(ctx, bucket, object) - if err := lk.GetLock(globalOperationTimeout); err != nil { - return ObjectInfo{}, err - } - defer lk.Unlock() - if z.SingleZone() { return z.zones[0].PutObject(ctx, bucket, object, data, opts) } @@ -571,13 +524,6 @@ func (z *erasureZones) PutObject(ctx context.Context, bucket string, object stri } func (z *erasureZones) DeleteObject(ctx context.Context, bucket string, object string, opts ObjectOptions) (objInfo ObjectInfo, err error) { - // Acquire a write lock before deleting the object. - lk := z.NewNSLock(ctx, bucket, object) - if err = lk.GetLock(globalOperationTimeout); err != nil { - return ObjectInfo{}, err - } - defer lk.Unlock() - if z.SingleZone() { return z.zones[0].DeleteObject(ctx, bucket, object, opts) } @@ -629,15 +575,7 @@ func (z *erasureZones) DeleteObjects(ctx context.Context, bucket string, objects } func (z *erasureZones) CopyObject(ctx context.Context, srcBucket, srcObject, dstBucket, dstObject string, srcInfo ObjectInfo, srcOpts, dstOpts ObjectOptions) (objInfo ObjectInfo, err error) { - // Check if this request is only metadata update. cpSrcDstSame := isStringEqual(pathJoin(srcBucket, srcObject), pathJoin(dstBucket, dstObject)) - if !cpSrcDstSame { - lk := z.NewNSLock(ctx, dstBucket, dstObject) - if err := lk.GetLock(globalOperationTimeout); err != nil { - return objInfo, err - } - defer lk.Unlock() - } zoneIdx, err := z.getZoneIdx(ctx, dstBucket, dstObject, dstOpts, srcInfo.Size) if err != nil { @@ -645,12 +583,19 @@ func (z *erasureZones) CopyObject(ctx context.Context, srcBucket, srcObject, dst } if cpSrcDstSame && srcInfo.metadataOnly { + // Version ID is set for the destination and source == destination version ID. if dstOpts.VersionID != "" && srcOpts.VersionID == dstOpts.VersionID { return z.zones[zoneIdx].CopyObject(ctx, srcBucket, srcObject, dstBucket, dstObject, srcInfo, srcOpts, dstOpts) } + // Destination is not versioned and source version ID is empty + // perform an in-place update. if !dstOpts.Versioned && srcOpts.VersionID == "" { return z.zones[zoneIdx].CopyObject(ctx, srcBucket, srcObject, dstBucket, dstObject, srcInfo, srcOpts, dstOpts) } + // Destination is versioned, source is not destination version, + // as a special case look for if the source object is not legacy + // from older format, for older format we will rewrite them as + // newer using PutObject() - this is an optimization to save space if dstOpts.Versioned && srcOpts.VersionID != dstOpts.VersionID && !srcInfo.Legacy { // CopyObject optimization where we don't create an entire copy // of the content, instead we add a reference. @@ -1384,12 +1329,6 @@ func (z *erasureZones) PutObjectPart(ctx context.Context, bucket, object, upload return PartInfo{}, err } - uploadIDLock := z.NewNSLock(ctx, bucket, pathJoin(object, uploadID)) - if err := uploadIDLock.GetLock(globalOperationTimeout); err != nil { - return PartInfo{}, err - } - defer uploadIDLock.Unlock() - if z.SingleZone() { return z.zones[0].PutObjectPart(ctx, bucket, object, uploadID, partID, data, opts) } @@ -1420,12 +1359,6 @@ func (z *erasureZones) GetMultipartInfo(ctx context.Context, bucket, object, upl return MultipartInfo{}, err } - uploadIDLock := z.NewNSLock(ctx, bucket, pathJoin(object, uploadID)) - if err := uploadIDLock.GetRLock(globalOperationTimeout); err != nil { - return MultipartInfo{}, err - } - defer uploadIDLock.RUnlock() - if z.SingleZone() { return z.zones[0].GetMultipartInfo(ctx, bucket, object, uploadID, opts) } @@ -1456,12 +1389,6 @@ func (z *erasureZones) ListObjectParts(ctx context.Context, bucket, object, uplo return ListPartsInfo{}, err } - uploadIDLock := z.NewNSLock(ctx, bucket, pathJoin(object, uploadID)) - if err := uploadIDLock.GetRLock(globalOperationTimeout); err != nil { - return ListPartsInfo{}, err - } - defer uploadIDLock.RUnlock() - if z.SingleZone() { return z.zones[0].ListObjectParts(ctx, bucket, object, uploadID, partNumberMarker, maxParts, opts) } @@ -1484,25 +1411,19 @@ func (z *erasureZones) ListObjectParts(ctx context.Context, bucket, object, uplo } // Aborts an in-progress multipart operation on hashedSet based on the object name. -func (z *erasureZones) AbortMultipartUpload(ctx context.Context, bucket, object, uploadID string) error { +func (z *erasureZones) AbortMultipartUpload(ctx context.Context, bucket, object, uploadID string, opts ObjectOptions) error { if err := checkAbortMultipartArgs(ctx, bucket, object, z); err != nil { return err } - uploadIDLock := z.NewNSLock(ctx, bucket, pathJoin(object, uploadID)) - if err := uploadIDLock.GetLock(globalOperationTimeout); err != nil { - return err - } - defer uploadIDLock.Unlock() - if z.SingleZone() { - return z.zones[0].AbortMultipartUpload(ctx, bucket, object, uploadID) + return z.zones[0].AbortMultipartUpload(ctx, bucket, object, uploadID, opts) } for _, zone := range z.zones { - _, err := zone.GetMultipartInfo(ctx, bucket, object, uploadID, ObjectOptions{}) + _, err := zone.GetMultipartInfo(ctx, bucket, object, uploadID, opts) if err == nil { - return zone.AbortMultipartUpload(ctx, bucket, object, uploadID) + return zone.AbortMultipartUpload(ctx, bucket, object, uploadID, opts) } switch err.(type) { case InvalidUploadID: @@ -1524,22 +1445,6 @@ func (z *erasureZones) CompleteMultipartUpload(ctx context.Context, bucket, obje return objInfo, err } - // Hold read-locks to verify uploaded parts, also disallows - // parallel part uploads as well. - uploadIDLock := z.NewNSLock(ctx, bucket, pathJoin(object, uploadID)) - if err = uploadIDLock.GetRLock(globalOperationTimeout); err != nil { - return objInfo, err - } - defer uploadIDLock.RUnlock() - - // Hold namespace to complete the transaction, only hold - // if uploadID can be held exclusively. - lk := z.NewNSLock(ctx, bucket, object) - if err = lk.GetLock(globalOperationTimeout); err != nil { - return objInfo, err - } - defer lk.Unlock() - if z.SingleZone() { return z.zones[0].CompleteMultipartUpload(ctx, bucket, object, uploadID, uploadedParts, opts) } diff --git a/cmd/fs-v1-multipart.go b/cmd/fs-v1-multipart.go index 2bbc25dd2..110ce4617 100644 --- a/cmd/fs-v1-multipart.go +++ b/cmd/fs-v1-multipart.go @@ -779,7 +779,7 @@ func (fs *FSObjects) CompleteMultipartUpload(ctx context.Context, bucket string, // that this is an atomic idempotent operation. Subsequent calls have // no affect and further requests to the same uploadID would not be // honored. -func (fs *FSObjects) AbortMultipartUpload(ctx context.Context, bucket, object, uploadID string) error { +func (fs *FSObjects) AbortMultipartUpload(ctx context.Context, bucket, object, uploadID string, opts ObjectOptions) error { if err := checkAbortMultipartArgs(ctx, bucket, object, fs); err != nil { return err } diff --git a/cmd/fs-v1-multipart_test.go b/cmd/fs-v1-multipart_test.go index b3a826a07..e75bf2da9 100644 --- a/cmd/fs-v1-multipart_test.go +++ b/cmd/fs-v1-multipart_test.go @@ -61,7 +61,7 @@ func TestFSCleanupMultipartUploadsInRoutine(t *testing.T) { cleanupWg.Wait() // Check if upload id was already purged. - if err = obj.AbortMultipartUpload(GlobalContext, bucketName, objectName, uploadID); err != nil { + if err = obj.AbortMultipartUpload(GlobalContext, bucketName, objectName, uploadID, ObjectOptions{}); err != nil { if _, ok := err.(InvalidUploadID); !ok { t.Fatal("Unexpected err: ", err) } @@ -215,11 +215,12 @@ func TestAbortMultipartUpload(t *testing.T) { md5Hex := getMD5Hash(data) - if _, err := obj.PutObjectPart(GlobalContext, bucketName, objectName, uploadID, 1, mustGetPutObjReader(t, bytes.NewReader(data), 5, md5Hex, ""), ObjectOptions{}); err != nil { + opts := ObjectOptions{} + if _, err := obj.PutObjectPart(GlobalContext, bucketName, objectName, uploadID, 1, mustGetPutObjReader(t, bytes.NewReader(data), 5, md5Hex, ""), opts); err != nil { t.Fatal("Unexpected error ", err) } time.Sleep(time.Second) // Without Sleep on windows, the fs.AbortMultipartUpload() fails with "The process cannot access the file because it is being used by another process." - if err := obj.AbortMultipartUpload(GlobalContext, bucketName, objectName, uploadID); err != nil { + if err := obj.AbortMultipartUpload(GlobalContext, bucketName, objectName, uploadID, opts); err != nil { t.Fatal("Unexpected error ", err) } } diff --git a/cmd/gateway-unsupported.go b/cmd/gateway-unsupported.go index 2fb919b3d..6fc3c0973 100644 --- a/cmd/gateway-unsupported.go +++ b/cmd/gateway-unsupported.go @@ -91,7 +91,7 @@ func (a GatewayUnsupported) ListObjectParts(ctx context.Context, bucket string, } // AbortMultipartUpload aborts a ongoing multipart upload -func (a GatewayUnsupported) AbortMultipartUpload(ctx context.Context, bucket string, object string, uploadID string) error { +func (a GatewayUnsupported) AbortMultipartUpload(ctx context.Context, bucket string, object string, uploadID string, opts ObjectOptions) error { return NotImplemented{} } diff --git a/cmd/gateway/azure/gateway-azure.go b/cmd/gateway/azure/gateway-azure.go index 10f4b4fa4..a12008443 100644 --- a/cmd/gateway/azure/gateway-azure.go +++ b/cmd/gateway/azure/gateway-azure.go @@ -1238,7 +1238,7 @@ func (a *azureObjects) ListObjectParts(ctx context.Context, bucket, object, uplo // AbortMultipartUpload - Not Implemented. // There is no corresponding API in azure to abort an incomplete upload. The uncommmitted blocks // gets deleted after one week. -func (a *azureObjects) AbortMultipartUpload(ctx context.Context, bucket, object, uploadID string) (err error) { +func (a *azureObjects) AbortMultipartUpload(ctx context.Context, bucket, object, uploadID string, opts minio.ObjectOptions) (err error) { if err = a.checkUploadIDExists(ctx, bucket, object, uploadID); err != nil { return err } diff --git a/cmd/gateway/gcs/gateway-gcs.go b/cmd/gateway/gcs/gateway-gcs.go index 79d774557..1bdd94f11 100644 --- a/cmd/gateway/gcs/gateway-gcs.go +++ b/cmd/gateway/gcs/gateway-gcs.go @@ -1253,7 +1253,7 @@ func (l *gcsGateway) cleanupMultipartUpload(ctx context.Context, bucket, key, up } // AbortMultipartUpload aborts a ongoing multipart upload -func (l *gcsGateway) AbortMultipartUpload(ctx context.Context, bucket string, key string, uploadID string) error { +func (l *gcsGateway) AbortMultipartUpload(ctx context.Context, bucket string, key string, uploadID string, opts minio.ObjectOptions) error { if err := l.checkUploadIDExists(ctx, bucket, key, uploadID); err != nil { return err } diff --git a/cmd/gateway/hdfs/gateway-hdfs.go b/cmd/gateway/hdfs/gateway-hdfs.go index 39ae7cc49..fcaaacc4e 100644 --- a/cmd/gateway/hdfs/gateway-hdfs.go +++ b/cmd/gateway/hdfs/gateway-hdfs.go @@ -859,7 +859,7 @@ func (n *hdfsObjects) CompleteMultipartUpload(ctx context.Context, bucket, objec }, nil } -func (n *hdfsObjects) AbortMultipartUpload(ctx context.Context, bucket, object, uploadID string) (err error) { +func (n *hdfsObjects) AbortMultipartUpload(ctx context.Context, bucket, object, uploadID string, opts minio.ObjectOptions) (err error) { _, err = n.clnt.Stat(n.hdfsPathJoin(bucket)) if err != nil { return hdfsToObjectErr(ctx, err, bucket) diff --git a/cmd/gateway/s3/gateway-s3-sse.go b/cmd/gateway/s3/gateway-s3-sse.go index fab5eb7c3..9b373dfe4 100644 --- a/cmd/gateway/s3/gateway-s3-sse.go +++ b/cmd/gateway/s3/gateway-s3-sse.go @@ -569,12 +569,12 @@ func (l *s3EncObjects) ListObjectParts(ctx context.Context, bucket string, objec } // AbortMultipartUpload aborts a ongoing multipart upload -func (l *s3EncObjects) AbortMultipartUpload(ctx context.Context, bucket string, object string, uploadID string) error { +func (l *s3EncObjects) AbortMultipartUpload(ctx context.Context, bucket string, object string, uploadID string, opts minio.ObjectOptions) error { if _, err := l.getGWMetadata(ctx, bucket, getTmpDareMetaPath(object, uploadID)); err != nil { - return l.s3Objects.AbortMultipartUpload(ctx, bucket, object, uploadID) + return l.s3Objects.AbortMultipartUpload(ctx, bucket, object, uploadID, opts) } - if err := l.s3Objects.AbortMultipartUpload(ctx, bucket, getGWContentPath(object), uploadID); err != nil { + if err := l.s3Objects.AbortMultipartUpload(ctx, bucket, getGWContentPath(object), uploadID, opts); err != nil { return err } diff --git a/cmd/gateway/s3/gateway-s3.go b/cmd/gateway/s3/gateway-s3.go index 994617a55..86cd1a919 100644 --- a/cmd/gateway/s3/gateway-s3.go +++ b/cmd/gateway/s3/gateway-s3.go @@ -653,7 +653,7 @@ func (l *s3Objects) ListObjectParts(ctx context.Context, bucket string, object s } // AbortMultipartUpload aborts a ongoing multipart upload -func (l *s3Objects) AbortMultipartUpload(ctx context.Context, bucket string, object string, uploadID string) error { +func (l *s3Objects) AbortMultipartUpload(ctx context.Context, bucket string, object string, uploadID string, opts minio.ObjectOptions) error { err := l.Client.AbortMultipartUpload(ctx, bucket, object, uploadID) return minio.ErrorRespToObjectError(err, bucket, object) } diff --git a/cmd/object-api-interface.go b/cmd/object-api-interface.go index 7215ba092..620e1c495 100644 --- a/cmd/object-api-interface.go +++ b/cmd/object-api-interface.go @@ -110,7 +110,7 @@ type ObjectLayer interface { PutObjectPart(ctx context.Context, bucket, object, uploadID string, partID int, data *PutObjReader, opts ObjectOptions) (info PartInfo, err error) GetMultipartInfo(ctx context.Context, bucket, object, uploadID string, opts ObjectOptions) (info MultipartInfo, err error) ListObjectParts(ctx context.Context, bucket, object, uploadID string, partNumberMarker int, maxParts int, opts ObjectOptions) (result ListPartsInfo, err error) - AbortMultipartUpload(ctx context.Context, bucket, object, uploadID string) error + AbortMultipartUpload(ctx context.Context, bucket, object, uploadID string, opts ObjectOptions) error CompleteMultipartUpload(ctx context.Context, bucket, object, uploadID string, uploadedParts []CompletePart, opts ObjectOptions) (objInfo ObjectInfo, err error) // Healing operations. diff --git a/cmd/object-api-multipart_test.go b/cmd/object-api-multipart_test.go index f621a4cba..854962104 100644 --- a/cmd/object-api-multipart_test.go +++ b/cmd/object-api-multipart_test.go @@ -66,7 +66,7 @@ func testObjectNewMultipartUpload(obj ObjectLayer, instanceType string, t TestEr t.Fatalf("%s : %s", instanceType, err.Error()) } - err = obj.AbortMultipartUpload(context.Background(), bucket, "\\", uploadID) + err = obj.AbortMultipartUpload(context.Background(), bucket, "\\", uploadID, opts) if err != nil { switch err.(type) { case InvalidUploadID: @@ -114,7 +114,7 @@ func testObjectAbortMultipartUpload(obj ObjectLayer, instanceType string, t Test } // Iterating over creatPartCases to generate multipart chunks. for i, testCase := range abortTestCases { - err = obj.AbortMultipartUpload(context.Background(), testCase.bucketName, testCase.objName, testCase.uploadID) + err = obj.AbortMultipartUpload(context.Background(), testCase.bucketName, testCase.objName, testCase.uploadID, opts) if testCase.expectedErrType == nil && err != nil { t.Errorf("Test %d, unexpected err is received: %v, expected:%v\n", i+1, err, testCase.expectedErrType) } @@ -146,7 +146,8 @@ func testObjectAPIIsUploadIDExists(obj ObjectLayer, instanceType string, t TestE t.Fatalf("%s : %s", instanceType, err.Error()) } - err = obj.AbortMultipartUpload(context.Background(), bucket, object, "abc") + opts := ObjectOptions{} + err = obj.AbortMultipartUpload(context.Background(), bucket, object, "abc", opts) switch err.(type) { case InvalidUploadID: default: @@ -1565,7 +1566,7 @@ func testListObjectParts(obj ObjectLayer, instanceType string, t TestErrHandler) } for i, testCase := range testCases { - actualResult, actualErr := obj.ListObjectParts(context.Background(), testCase.bucket, testCase.object, testCase.uploadID, testCase.partNumberMarker, testCase.maxParts, ObjectOptions{}) + actualResult, actualErr := obj.ListObjectParts(context.Background(), testCase.bucket, testCase.object, testCase.uploadID, testCase.partNumberMarker, testCase.maxParts, opts) if actualErr != nil && testCase.shouldPass { t.Errorf("Test %d: %s: Expected to pass, but failed with: %s", i+1, instanceType, actualErr.Error()) } @@ -1801,6 +1802,7 @@ func testObjectCompleteMultipartUpload(obj ObjectLayer, instanceType string, t T for _, testCase := range testCases { testCase := testCase t.(*testing.T).Run("", func(t *testing.T) { + opts = ObjectOptions{} actualResult, actualErr := obj.CompleteMultipartUpload(context.Background(), testCase.bucket, testCase.object, testCase.uploadID, testCase.parts, ObjectOptions{}) if actualErr != nil && testCase.shouldPass { t.Errorf("%s: Expected to pass, but failed with: %s", instanceType, actualErr) diff --git a/cmd/object-api-utils.go b/cmd/object-api-utils.go index e55aa1b6e..fdb09c334 100644 --- a/cmd/object-api-utils.go +++ b/cmd/object-api-utils.go @@ -877,19 +877,3 @@ func newS2CompressReader(r io.Reader) io.ReadCloser { }() return pr } - -// Returns error if the context is canceled, indicating -// either client has disconnected -type contextReader struct { - io.ReadCloser - ctx context.Context -} - -func (d *contextReader) Read(p []byte) (int, error) { - select { - case <-d.ctx.Done(): - return 0, d.ctx.Err() - default: - return d.ReadCloser.Read(p) - } -} diff --git a/cmd/object-handlers-common.go b/cmd/object-handlers-common.go index a75b366b7..fbcf4aef5 100644 --- a/cmd/object-handlers-common.go +++ b/cmd/object-handlers-common.go @@ -287,31 +287,30 @@ func deleteObject(ctx context.Context, obj ObjectLayer, cache CacheObjectLayer, deleteObject = cache.DeleteObject } // Proceed to delete the object. - if objInfo, err = deleteObject(ctx, bucket, object, opts); err != nil { - return objInfo, err - } - - // Requesting only a delete marker which was successfully attempted. - if objInfo.DeleteMarker { - // Notify object deleted marker event. - sendEvent(eventArgs{ - EventName: event.ObjectRemovedDeleteMarkerCreated, - BucketName: bucket, - Object: objInfo, - ReqParams: extractReqParams(r), - UserAgent: r.UserAgent(), - Host: handlers.GetSourceIP(r), - }) - } else { - // Notify object deleted event. - sendEvent(eventArgs{ - EventName: event.ObjectRemovedDelete, - BucketName: bucket, - Object: objInfo, - ReqParams: extractReqParams(r), - UserAgent: r.UserAgent(), - Host: handlers.GetSourceIP(r), - }) + objInfo, err = deleteObject(ctx, bucket, object, opts) + if objInfo.Name != "" { + // Requesting only a delete marker which was successfully attempted. + if objInfo.DeleteMarker { + // Notify object deleted marker event. + sendEvent(eventArgs{ + EventName: event.ObjectRemovedDeleteMarkerCreated, + BucketName: bucket, + Object: objInfo, + ReqParams: extractReqParams(r), + UserAgent: r.UserAgent(), + Host: handlers.GetSourceIP(r), + }) + } else { + // Notify object deleted event. + sendEvent(eventArgs{ + EventName: event.ObjectRemovedDelete, + BucketName: bucket, + Object: objInfo, + ReqParams: extractReqParams(r), + UserAgent: r.UserAgent(), + Host: handlers.GetSourceIP(r), + }) + } } - return objInfo, nil + return objInfo, err } diff --git a/cmd/object-handlers.go b/cmd/object-handlers.go index 0c19d3baa..bf23c5e5c 100644 --- a/cmd/object-handlers.go +++ b/cmd/object-handlers.go @@ -189,7 +189,7 @@ func (api objectAPIHandlers) SelectObjectContentHandler(w http.ResponseWriter, r End: offset + length, } - return getObjectNInfo(ctx, bucket, object, rs, r.Header, readLock, ObjectOptions{}) + return getObjectNInfo(ctx, bucket, object, rs, r.Header, readLock, opts) } objInfo, err := getObjectInfo(ctx, bucket, object, opts) @@ -891,10 +891,6 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re getObjectNInfo = api.CacheAPI().GetObjectNInfo } - var lock = noLock - if !cpSrcDstSame { - lock = readLock - } checkCopyPrecondFn := func(o ObjectInfo) bool { if objectAPI.IsEncryptionSupported() { if _, err := DecryptObjectInfo(&o, r); err != nil { @@ -906,6 +902,16 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re } getOpts.CheckPrecondFn = checkCopyPrecondFn + // FIXME: a possible race exists between a parallel + // GetObject v/s CopyObject with metadata updates, ideally + // we should be holding write lock here but it is not + // possible due to other constraints such as knowing + // the type of source content etc. + lock := noLock + if !cpSrcDstSame { + lock = readLock + } + var rs *HTTPRangeSpec gr, err := getObjectNInfo(ctx, srcBucket, srcObject, rs, r.Header, lock, getOpts) if err != nil { @@ -1227,6 +1233,7 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re if api.CacheAPI() != nil { copyObjectFn = api.CacheAPI().CopyObject } + // Copy source object to destination, if source and destination // object is same then only metadata is updated. objInfo, err = copyObjectFn(ctx, srcBucket, srcObject, dstBucket, dstObject, srcInfo, srcOpts, dstOpts) @@ -1306,9 +1313,6 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req return } - // To detect if the client has disconnected. - r.Body = &contextReader{r.Body, r.Context()} - // X-Amz-Copy-Source shouldn't be set for this call. if _, ok := r.Header[xhttp.AmzCopySource]; ok { writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidCopySource), r.URL, guessIsBrowserReq(r)) @@ -1847,7 +1851,6 @@ func (api objectAPIHandlers) CopyObjectPartHandler(w http.ResponseWriter, r *htt return false } getOpts.CheckPrecondFn = checkCopyPartPrecondFn - gr, err := getObjectNInfo(ctx, srcBucket, srcObject, rs, r.Header, readLock, getOpts) if err != nil { if isErrPreconditionFailed(err) { @@ -2056,9 +2059,6 @@ func (api objectAPIHandlers) PutObjectPartHandler(w http.ResponseWriter, r *http return } - // To detect if the client has disconnected. - r.Body = &contextReader{r.Body, r.Context()} - // X-Amz-Copy-Source shouldn't be set for this call. if _, ok := r.Header[xhttp.AmzCopySource]; ok { writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidCopySource), r.URL, guessIsBrowserReq(r)) @@ -2308,7 +2308,8 @@ func (api objectAPIHandlers) AbortMultipartUploadHandler(w http.ResponseWriter, writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL, guessIsBrowserReq(r)) return } - if err := abortMultipartUpload(ctx, bucket, object, uploadID); err != nil { + opts := ObjectOptions{} + if err := abortMultipartUpload(ctx, bucket, object, uploadID, opts); err != nil { writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) return } @@ -2355,7 +2356,7 @@ func (api objectAPIHandlers) ListObjectPartsHandler(w http.ResponseWriter, r *ht return } - var opts ObjectOptions + opts := ObjectOptions{} listPartsInfo, err := objectAPI.ListObjectParts(ctx, bucket, object, uploadID, partNumberMarker, maxParts, opts) if err != nil { writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) @@ -2521,9 +2522,8 @@ func (api objectAPIHandlers) CompleteMultipartUploadHandler(w http.ResponseWrite var objectEncryptionKey []byte var isEncrypted, ssec bool - var opts ObjectOptions if objectAPI.IsEncryptionSupported() { - mi, err := objectAPI.GetMultipartInfo(ctx, bucket, object, uploadID, opts) + mi, err := objectAPI.GetMultipartInfo(ctx, bucket, object, uploadID, ObjectOptions{}) if err != nil { writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) return @@ -2546,7 +2546,7 @@ func (api objectAPIHandlers) CompleteMultipartUploadHandler(w http.ResponseWrite partsMap := make(map[string]PartInfo) if isEncrypted { maxParts := 10000 - listPartsInfo, err := objectAPI.ListObjectParts(ctx, bucket, object, uploadID, 0, maxParts, opts) + listPartsInfo, err := objectAPI.ListObjectParts(ctx, bucket, object, uploadID, 0, maxParts, ObjectOptions{}) if err != nil { writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) return @@ -2601,7 +2601,7 @@ func (api objectAPIHandlers) CompleteMultipartUploadHandler(w http.ResponseWrite w = &whiteSpaceWriter{ResponseWriter: w, Flusher: w.(http.Flusher)} completeDoneCh := sendWhiteSpace(w) - objInfo, err := completeMultiPartUpload(ctx, bucket, object, uploadID, completeParts, opts) + objInfo, err := completeMultiPartUpload(ctx, bucket, object, uploadID, completeParts, ObjectOptions{}) // Stop writing white spaces to the client. Note that close(doneCh) style is not used as it // can cause white space to be written after we send XML response in a race condition. headerWritten := <-completeDoneCh @@ -2735,16 +2735,6 @@ func (api objectAPIHandlers) DeleteObjectHandler(w http.ResponseWriter, r *http. setPutObjHeaders(w, objInfo, true) writeSuccessNoContent(w) - - sendEvent(eventArgs{ - EventName: event.ObjectRemovedDelete, - BucketName: bucket, - Object: objInfo, - ReqParams: extractReqParams(r), - RespElements: extractRespElements(w), - UserAgent: r.UserAgent(), - Host: handlers.GetSourceIP(r), - }) } // PutObjectLegalHoldHandler - set legal hold configuration to object, diff --git a/cmd/object_api_suite_test.go b/cmd/object_api_suite_test.go index 45939478a..124002cc5 100644 --- a/cmd/object_api_suite_test.go +++ b/cmd/object_api_suite_test.go @@ -166,7 +166,7 @@ func testMultipartObjectAbort(obj ObjectLayer, instanceType string, t TestErrHan } parts[i] = expectedETaghex } - err = obj.AbortMultipartUpload(context.Background(), "bucket", "key", uploadID) + err = obj.AbortMultipartUpload(context.Background(), "bucket", "key", uploadID, ObjectOptions{}) if err != nil { t.Fatalf("%s: %s", instanceType, err) } diff --git a/cmd/server-main.go b/cmd/server-main.go index fc2f201d4..5e246e6d6 100644 --- a/cmd/server-main.go +++ b/cmd/server-main.go @@ -17,12 +17,9 @@ package cmd import ( - "bufio" "context" "errors" "fmt" - "io" - "log" "net" "os" "os/signal" @@ -247,7 +244,7 @@ func initSafeMode(ctx context.Context, newObject ObjectLayer) (err error) { // version is needed, migration is needed etc. rquorum := InsufficientReadQuorum{} wquorum := InsufficientWriteQuorum{} - for range retry.NewTimer(retryCtx) { + for range retry.NewTimerWithJitter(retryCtx, 250*time.Millisecond, 500*time.Millisecond, retry.MaxJitter) { // let one of the server acquire the lock, if not let them timeout. // which shall be retried again by this loop. if err = txnLk.GetLock(configLockTimeout); err != nil { @@ -462,27 +459,7 @@ func serverMain(ctx *cli.Context) { getCert = globalTLSCerts.GetCertificate } - // Annonying hack to ensure that Go doesn't write its own logging, - // interleaved with our formatted logging, this allows us to - // honor --json and --quiet flag properly. - // - // Unfortunately we have to resort to this sort of hacky approach - // because, Go automatically initializes ErrorLog on its own - // and can log without application control. - // - // This is an implementation issue in Go and should be fixed, but - // until then this hack is okay and works for our needs. - pr, pw := io.Pipe() - go func() { - defer pr.Close() - scanner := bufio.NewScanner(&contextReader{pr, GlobalContext}) - for scanner.Scan() { - logger.LogIf(GlobalContext, errors.New(scanner.Text())) - } - }() - httpServer := xhttp.NewServer([]string{globalMinioAddr}, criticalErrorHandler{corsHandler(handler)}, getCert) - httpServer.ErrorLog = log.New(pw, "", 0) httpServer.BaseContext = func(listener net.Listener) context.Context { return GlobalContext } diff --git a/pkg/dsync/drwmutex.go b/pkg/dsync/drwmutex.go index 0ccc0bbab..0bbf008a0 100644 --- a/pkg/dsync/drwmutex.go +++ b/pkg/dsync/drwmutex.go @@ -25,7 +25,6 @@ import ( "time" "github.com/minio/minio/pkg/console" - "github.com/minio/minio/pkg/retry" ) // Indicator if logging is enabled. @@ -132,6 +131,10 @@ func (dm *DRWMutex) GetRLock(ctx context.Context, id, source string, opts Option return dm.lockBlocking(ctx, id, source, isReadLock, opts) } +const ( + lockRetryInterval = 50 * time.Millisecond +) + // lockBlocking will try to acquire either a read or a write lock // // The function will loop using a built-in timing randomized back-off @@ -140,40 +143,50 @@ func (dm *DRWMutex) GetRLock(ctx context.Context, id, source string, opts Option func (dm *DRWMutex) lockBlocking(ctx context.Context, id, source string, isReadLock bool, opts Options) (locked bool) { restClnts := dm.clnt.GetLockersFn() - retryCtx, cancel := context.WithTimeout(ctx, opts.Timeout) - - defer cancel() + r := rand.New(rand.NewSource(time.Now().UnixNano())) - // Use incremental back-off algorithm for repeated attempts to acquire the lock - for range retry.NewTimer(retryCtx) { - // Create temp array on stack. - locks := make([]string, len(restClnts)) + // Create lock array to capture the successful lockers + locks := make([]string, len(restClnts)) - // Try to acquire the lock. - locked = lock(retryCtx, dm.clnt, &locks, id, source, isReadLock, opts.Tolerance, dm.Names...) - if !locked { - continue + cleanLocks := func(locks []string) { + for i := range locks { + locks[i] = "" } + } - dm.m.Lock() + retryCtx, cancel := context.WithTimeout(ctx, opts.Timeout) + defer cancel() - // If success, copy array to object - if isReadLock { - // Append new array of strings at the end - dm.readersLocks = append(dm.readersLocks, make([]string, len(restClnts))) - // and copy stack array into last spot - copy(dm.readersLocks[len(dm.readersLocks)-1], locks[:]) - } else { - copy(dm.writeLocks, locks[:]) - } + for { + // cleanup any older state, re-use the lock slice. + cleanLocks(locks) + + select { + case <-retryCtx.Done(): + // Caller context canceled or we timedout, + // return false anyways for both situations. + return false + default: + // Try to acquire the lock. + if locked = lock(retryCtx, dm.clnt, &locks, id, source, isReadLock, opts.Tolerance, dm.Names...); locked { + dm.m.Lock() + + // If success, copy array to object + if isReadLock { + // Append new array of strings at the end + dm.readersLocks = append(dm.readersLocks, make([]string, len(restClnts))) + // and copy stack array into last spot + copy(dm.readersLocks[len(dm.readersLocks)-1], locks[:]) + } else { + copy(dm.writeLocks, locks[:]) + } - dm.m.Unlock() - return locked + dm.m.Unlock() + return locked + } + time.Sleep(time.Duration(r.Float64() * float64(lockRetryInterval))) + } } - - // Failed to acquire the lock on this attempt, incrementally wait - // for a longer back-off time and try again afterwards. - return locked } // lock tries to acquire the distributed lock, returning true or false. diff --git a/pkg/lsync/lrwmutex.go b/pkg/lsync/lrwmutex.go index 85deecb17..72719cd30 100644 --- a/pkg/lsync/lrwmutex.go +++ b/pkg/lsync/lrwmutex.go @@ -19,10 +19,9 @@ package lsync import ( "context" "math" + "math/rand" "sync" "time" - - "github.com/minio/minio/pkg/retry" ) // A LRWMutex is a mutual exclusion lock with timeouts. @@ -94,25 +93,33 @@ func (lm *LRWMutex) lock(id, source string, isWriteLock bool) (locked bool) { return locked } +const ( + lockRetryInterval = 50 * time.Millisecond +) + // lockLoop will acquire either a read or a write lock // // The call will block until the lock is granted using a built-in // timing randomized back-off algorithm to try again until successful func (lm *LRWMutex) lockLoop(ctx context.Context, id, source string, timeout time.Duration, isWriteLock bool) (locked bool) { + r := rand.New(rand.NewSource(time.Now().UnixNano())) + retryCtx, cancel := context.WithTimeout(ctx, timeout) defer cancel() - // We timed out on the previous lock, incrementally wait - // for a longer back-off time and try again afterwards. - for range retry.NewTimer(retryCtx) { - if lm.lock(id, source, isWriteLock) { - return true + for { + select { + case <-retryCtx.Done(): + // Caller context canceled or we timedout, + // return false anyways for both situations. + return false + default: + if lm.lock(id, source, isWriteLock) { + return true + } + time.Sleep(time.Duration(r.Float64() * float64(lockRetryInterval))) } } - - // We timed out on the previous lock, incrementally wait - // for a longer back-off time and try again afterwards. - return false } // Unlock unlocks the write lock.