delayed locks until we have started reading the body (#10474)

This is to ensure that Go contexts work properly, after some
interesting experiments I found that Go net/http doesn't
cancel the context when Body is non-zero and hasn't been
read till EOF.

The following gist explains this, this can lead to pile up
of go-routines on the server which will never be canceled
and will die at a really later point in time, which can
simply overwhelm the server.

https://gist.github.com/harshavardhana/c51dcfd055780eaeb71db54f9c589150

To avoid this refactor the locking such that we take locks after we
have started reading from the body and only take locks when needed.

Also, remove contextReader as it's not useful, doesn't work as expected
context is not canceled until the body reaches EOF so there is no point
in wrapping it with context and putting a `select {` on it which
can unnecessarily increase the CPU overhead.

We will still use the context to cancel the lockers etc.
Additional simplification in the locker code to avoid timers
as re-using them is a complicated ordeal avoid them in
the hot path, since locking is very common this may avoid
lots of allocations.
master
Harshavardhana 4 years ago committed by GitHub
parent 224daee391
commit 0104af6bcc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 3
      cmd/bucket-handlers.go
  2. 69
      cmd/erasure-multipart.go
  3. 59
      cmd/erasure-object.go
  4. 8
      cmd/erasure-sets.go
  5. 123
      cmd/erasure-zones.go
  6. 2
      cmd/fs-v1-multipart.go
  7. 7
      cmd/fs-v1-multipart_test.go
  8. 2
      cmd/gateway-unsupported.go
  9. 2
      cmd/gateway/azure/gateway-azure.go
  10. 2
      cmd/gateway/gcs/gateway-gcs.go
  11. 2
      cmd/gateway/hdfs/gateway-hdfs.go
  12. 6
      cmd/gateway/s3/gateway-s3-sse.go
  13. 2
      cmd/gateway/s3/gateway-s3.go
  14. 2
      cmd/object-api-interface.go
  15. 10
      cmd/object-api-multipart_test.go
  16. 16
      cmd/object-api-utils.go
  17. 51
      cmd/object-handlers-common.go
  18. 46
      cmd/object-handlers.go
  19. 2
      cmd/object_api_suite_test.go
  20. 25
      cmd/server-main.go
  21. 69
      pkg/dsync/drwmutex.go
  22. 29
      pkg/lsync/lrwmutex.go

@ -666,9 +666,6 @@ func (api objectAPIHandlers) PostPolicyBucketHandler(w http.ResponseWriter, r *h
bucket := mux.Vars(r)["bucket"]
// To detect if the client has disconnected.
r.Body = &contextReader{r.Body, r.Context()}
// Require Content-Length to be set in the request
size := r.ContentLength
if size < 0 {

@ -346,7 +346,18 @@ func (er erasureObjects) CopyObjectPart(ctx context.Context, srcBucket, srcObjec
// of the multipart transaction.
//
// Implements S3 compatible Upload Part API.
func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID string, partID int, r *PutObjReader, opts ObjectOptions) (pi PartInfo, e error) {
func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID string, partID int, r *PutObjReader, opts ObjectOptions) (pi PartInfo, err error) {
uploadIDLock := er.NewNSLock(ctx, bucket, pathJoin(object, uploadID))
if err = uploadIDLock.GetRLock(globalOperationTimeout); err != nil {
return PartInfo{}, err
}
readLocked := true
defer func() {
if readLocked {
uploadIDLock.RUnlock()
}
}()
data := r.Reader
// Validate input data size and it can never be less than zero.
if data.Size() < -1 {
@ -359,7 +370,7 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
uploadIDPath := er.getUploadIDDir(bucket, object, uploadID)
// Validates if upload ID exists.
if err := er.checkUploadIDExists(ctx, bucket, object, uploadID); err != nil {
if err = er.checkUploadIDExists(ctx, bucket, object, uploadID); err != nil {
return pi, toObjectErr(err, bucket, object, uploadID)
}
@ -446,8 +457,17 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
}
}
// Unlock here before acquiring write locks all concurrent
// PutObjectParts would serialize here updating `xl.meta`
uploadIDLock.RUnlock()
readLocked = false
if err = uploadIDLock.GetLock(globalOperationTimeout); err != nil {
return PartInfo{}, err
}
defer uploadIDLock.Unlock()
// Validates if upload ID exists.
if err := er.checkUploadIDExists(ctx, bucket, object, uploadID); err != nil {
if err = er.checkUploadIDExists(ctx, bucket, object, uploadID); err != nil {
return pi, toObjectErr(err, bucket, object, uploadID)
}
@ -522,6 +542,12 @@ func (er erasureObjects) GetMultipartInfo(ctx context.Context, bucket, object, u
UploadID: uploadID,
}
uploadIDLock := er.NewNSLock(ctx, bucket, pathJoin(object, uploadID))
if err := uploadIDLock.GetRLock(globalOperationTimeout); err != nil {
return MultipartInfo{}, err
}
defer uploadIDLock.RUnlock()
if err := er.checkUploadIDExists(ctx, bucket, object, uploadID); err != nil {
return result, toObjectErr(err, bucket, object, uploadID)
}
@ -564,6 +590,12 @@ func (er erasureObjects) GetMultipartInfo(ctx context.Context, bucket, object, u
// ListPartsInfo structure is marshaled directly into XML and
// replied back to the client.
func (er erasureObjects) ListObjectParts(ctx context.Context, bucket, object, uploadID string, partNumberMarker, maxParts int, opts ObjectOptions) (result ListPartsInfo, e error) {
uploadIDLock := er.NewNSLock(ctx, bucket, pathJoin(object, uploadID))
if err := uploadIDLock.GetRLock(globalOperationTimeout); err != nil {
return ListPartsInfo{}, err
}
defer uploadIDLock.RUnlock()
if err := er.checkUploadIDExists(ctx, bucket, object, uploadID); err != nil {
return result, toObjectErr(err, bucket, object, uploadID)
}
@ -648,8 +680,16 @@ func (er erasureObjects) ListObjectParts(ctx context.Context, bucket, object, up
// md5sums of all the parts.
//
// Implements S3 compatible Complete multipart API.
func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket string, object string, uploadID string, parts []CompletePart, opts ObjectOptions) (oi ObjectInfo, e error) {
if err := er.checkUploadIDExists(ctx, bucket, object, uploadID); err != nil {
func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket string, object string, uploadID string, parts []CompletePart, opts ObjectOptions) (oi ObjectInfo, err error) {
// Hold read-locks to verify uploaded parts, also disallows
// parallel part uploads as well.
uploadIDLock := er.NewNSLock(ctx, bucket, pathJoin(object, uploadID))
if err = uploadIDLock.GetRLock(globalOperationTimeout); err != nil {
return oi, err
}
defer uploadIDLock.RUnlock()
if err = er.checkUploadIDExists(ctx, bucket, object, uploadID); err != nil {
return oi, toObjectErr(err, bucket, object, uploadID)
}
@ -797,6 +837,13 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str
}
}
// Hold namespace to complete the transaction
lk := er.NewNSLock(ctx, bucket, object)
if err = lk.GetLock(globalOperationTimeout); err != nil {
return oi, err
}
defer lk.Unlock()
// Rename the multipart object to final location.
if onlineDisks, err = renameData(ctx, onlineDisks, minioMetaMultipartBucket, uploadIDPath,
fi.DataDir, bucket, object, writeQuorum, nil); err != nil {
@ -832,11 +879,13 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str
// All parts are purged from all disks and reference to the uploadID
// would be removed from the system, rollback is not possible on this
// operation.
//
// Implements S3 compatible Abort multipart API, slight difference is
// that this is an atomic idempotent operation. Subsequent calls have
// no affect and further requests to the same uploadID would not be honored.
func (er erasureObjects) AbortMultipartUpload(ctx context.Context, bucket, object, uploadID string) error {
func (er erasureObjects) AbortMultipartUpload(ctx context.Context, bucket, object, uploadID string, opts ObjectOptions) error {
lk := er.NewNSLock(ctx, bucket, pathJoin(object, uploadID))
if err := lk.GetLock(globalOperationTimeout); err != nil {
return err
}
defer lk.Unlock()
// Validates if upload ID exists.
if err := er.checkUploadIDExists(ctx, bucket, object, uploadID); err != nil {
return toObjectErr(err, bucket, object, uploadID)

@ -145,6 +145,32 @@ func (er erasureObjects) GetObjectNInfo(ctx context.Context, bucket, object stri
return nil, err
}
var unlockOnDefer bool
var nsUnlocker = func() {}
defer func() {
if unlockOnDefer {
nsUnlocker()
}
}()
// Acquire lock
if lockType != noLock {
lock := er.NewNSLock(ctx, bucket, object)
switch lockType {
case writeLock:
if err = lock.GetLock(globalOperationTimeout); err != nil {
return nil, err
}
nsUnlocker = lock.Unlock
case readLock:
if err = lock.GetRLock(globalOperationTimeout); err != nil {
return nil, err
}
nsUnlocker = lock.RUnlock
}
unlockOnDefer = true
}
// Handler directory request by returning a reader that
// returns no bytes.
if HasSuffix(object, SlashSeparator) {
@ -152,7 +178,8 @@ func (er erasureObjects) GetObjectNInfo(ctx context.Context, bucket, object stri
if objInfo, err = er.getObjectInfoDir(ctx, bucket, object); err != nil {
return nil, toObjectErr(err, bucket, object)
}
return NewGetObjectReaderFromReader(bytes.NewBuffer(nil), objInfo, opts)
unlockOnDefer = false
return NewGetObjectReaderFromReader(bytes.NewBuffer(nil), objInfo, opts, nsUnlocker)
}
fi, metaArr, onlineDisks, err := er.getObjectFileInfo(ctx, bucket, object, opts)
@ -173,7 +200,8 @@ func (er erasureObjects) GetObjectNInfo(ctx context.Context, bucket, object stri
}, toObjectErr(errMethodNotAllowed, bucket, object)
}
fn, off, length, nErr := NewGetObjectReader(rs, objInfo, opts)
unlockOnDefer = false
fn, off, length, nErr := NewGetObjectReader(rs, objInfo, opts, nsUnlocker)
if nErr != nil {
return nil, nErr
}
@ -202,6 +230,13 @@ func (er erasureObjects) GetObject(ctx context.Context, bucket, object string, s
return err
}
// Lock the object before reading.
lk := er.NewNSLock(ctx, bucket, object)
if err := lk.GetRLock(globalOperationTimeout); err != nil {
return err
}
defer lk.RUnlock()
// Start offset cannot be negative.
if startOffset < 0 {
logger.LogIf(ctx, errUnexpected, logger.Application)
@ -386,6 +421,13 @@ func (er erasureObjects) GetObjectInfo(ctx context.Context, bucket, object strin
return info, err
}
// Lock the object before reading.
lk := er.NewNSLock(ctx, bucket, object)
if err := lk.GetRLock(globalOperationTimeout); err != nil {
return ObjectInfo{}, err
}
defer lk.RUnlock()
if HasSuffix(object, SlashSeparator) {
info, err = er.getObjectInfoDir(ctx, bucket, object)
if err != nil {
@ -738,6 +780,12 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st
return ObjectInfo{}, toObjectErr(err, bucket, object)
}
lk := er.NewNSLock(ctx, bucket, object)
if err := lk.GetLock(globalOperationTimeout); err != nil {
return ObjectInfo{}, err
}
defer lk.Unlock()
// Rename the successfully written temporary object to final location.
if onlineDisks, err = renameData(ctx, onlineDisks, minioMetaTmpBucket, tempObj, fi.DataDir, bucket, object, writeQuorum, nil); err != nil {
return ObjectInfo{}, toObjectErr(err, bucket, object)
@ -970,6 +1018,13 @@ func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string
return objInfo, err
}
// Acquire a write lock before deleting the object.
lk := er.NewNSLock(ctx, bucket, object)
if err = lk.GetLock(globalOperationTimeout); err != nil {
return ObjectInfo{}, err
}
defer lk.Unlock()
storageDisks := er.getDisks()
writeQuorum := len(storageDisks)/2 + 1

@ -762,9 +762,13 @@ func (s *erasureSets) CopyObject(ctx context.Context, srcBucket, srcObject, dstB
// Check if this request is only metadata update.
if cpSrcDstSame && srcInfo.metadataOnly {
// Version ID is set for the destination and source == destination version ID.
// perform an in-place update.
if dstOpts.VersionID != "" && srcOpts.VersionID == dstOpts.VersionID {
return srcSet.CopyObject(ctx, srcBucket, srcObject, dstBucket, dstObject, srcInfo, srcOpts, dstOpts)
}
// Destination is not versioned and source version ID is empty
// perform an in-place update.
if !dstOpts.Versioned && srcOpts.VersionID == "" {
return srcSet.CopyObject(ctx, srcBucket, srcObject, dstBucket, dstObject, srcInfo, srcOpts, dstOpts)
}
@ -1070,8 +1074,8 @@ func (s *erasureSets) ListObjectParts(ctx context.Context, bucket, object, uploa
}
// Aborts an in-progress multipart operation on hashedSet based on the object name.
func (s *erasureSets) AbortMultipartUpload(ctx context.Context, bucket, object, uploadID string) error {
return s.getHashedSet(object).AbortMultipartUpload(ctx, bucket, object, uploadID)
func (s *erasureSets) AbortMultipartUpload(ctx context.Context, bucket, object, uploadID string, opts ObjectOptions) error {
return s.getHashedSet(object).AbortMultipartUpload(ctx, bucket, object, uploadID, opts)
}
// CompleteMultipartUpload - completes a pending multipart transaction, on hashedSet based on object name.

@ -459,38 +459,16 @@ func (z *erasureZones) MakeBucketWithLocation(ctx context.Context, bucket string
}
func (z *erasureZones) GetObjectNInfo(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, lockType LockType, opts ObjectOptions) (gr *GetObjectReader, err error) {
var nsUnlocker = func() {}
// Acquire lock
if lockType != noLock {
lock := z.NewNSLock(ctx, bucket, object)
switch lockType {
case writeLock:
if err = lock.GetLock(globalOperationTimeout); err != nil {
return nil, err
}
nsUnlocker = lock.Unlock
case readLock:
if err = lock.GetRLock(globalOperationTimeout); err != nil {
return nil, err
}
nsUnlocker = lock.RUnlock
}
}
for _, zone := range z.zones {
gr, err = zone.GetObjectNInfo(ctx, bucket, object, rs, h, lockType, opts)
if err != nil {
if isErrObjectNotFound(err) || isErrVersionNotFound(err) {
continue
}
nsUnlocker()
return gr, err
}
gr.cleanUpFns = append(gr.cleanUpFns, nsUnlocker)
return gr, nil
}
nsUnlocker()
if opts.VersionID != "" {
return gr, VersionNotFound{Bucket: bucket, Object: object, VersionID: opts.VersionID}
}
@ -498,17 +476,6 @@ func (z *erasureZones) GetObjectNInfo(ctx context.Context, bucket, object string
}
func (z *erasureZones) GetObject(ctx context.Context, bucket, object string, startOffset int64, length int64, writer io.Writer, etag string, opts ObjectOptions) error {
// Lock the object before reading.
lk := z.NewNSLock(ctx, bucket, object)
if err := lk.GetRLock(globalOperationTimeout); err != nil {
return err
}
defer lk.RUnlock()
if z.SingleZone() {
return z.zones[0].GetObject(ctx, bucket, object, startOffset, length, writer, etag, opts)
}
for _, zone := range z.zones {
if err := zone.GetObject(ctx, bucket, object, startOffset, length, writer, etag, opts); err != nil {
if isErrObjectNotFound(err) || isErrVersionNotFound(err) {
@ -518,20 +485,13 @@ func (z *erasureZones) GetObject(ctx context.Context, bucket, object string, sta
}
return nil
}
if opts.VersionID != "" {
return VersionNotFound{Bucket: bucket, Object: object, VersionID: opts.VersionID}
}
return ObjectNotFound{Bucket: bucket, Object: object}
}
func (z *erasureZones) GetObjectInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, err error) {
// Lock the object before reading.
lk := z.NewNSLock(ctx, bucket, object)
if err := lk.GetRLock(globalOperationTimeout); err != nil {
return ObjectInfo{}, err
}
defer lk.RUnlock()
if z.SingleZone() {
return z.zones[0].GetObjectInfo(ctx, bucket, object, opts)
}
for _, zone := range z.zones {
objInfo, err = zone.GetObjectInfo(ctx, bucket, object, opts)
if err != nil {
@ -550,13 +510,6 @@ func (z *erasureZones) GetObjectInfo(ctx context.Context, bucket, object string,
// PutObject - writes an object to least used erasure zone.
func (z *erasureZones) PutObject(ctx context.Context, bucket string, object string, data *PutObjReader, opts ObjectOptions) (ObjectInfo, error) {
// Lock the object.
lk := z.NewNSLock(ctx, bucket, object)
if err := lk.GetLock(globalOperationTimeout); err != nil {
return ObjectInfo{}, err
}
defer lk.Unlock()
if z.SingleZone() {
return z.zones[0].PutObject(ctx, bucket, object, data, opts)
}
@ -571,13 +524,6 @@ func (z *erasureZones) PutObject(ctx context.Context, bucket string, object stri
}
func (z *erasureZones) DeleteObject(ctx context.Context, bucket string, object string, opts ObjectOptions) (objInfo ObjectInfo, err error) {
// Acquire a write lock before deleting the object.
lk := z.NewNSLock(ctx, bucket, object)
if err = lk.GetLock(globalOperationTimeout); err != nil {
return ObjectInfo{}, err
}
defer lk.Unlock()
if z.SingleZone() {
return z.zones[0].DeleteObject(ctx, bucket, object, opts)
}
@ -629,15 +575,7 @@ func (z *erasureZones) DeleteObjects(ctx context.Context, bucket string, objects
}
func (z *erasureZones) CopyObject(ctx context.Context, srcBucket, srcObject, dstBucket, dstObject string, srcInfo ObjectInfo, srcOpts, dstOpts ObjectOptions) (objInfo ObjectInfo, err error) {
// Check if this request is only metadata update.
cpSrcDstSame := isStringEqual(pathJoin(srcBucket, srcObject), pathJoin(dstBucket, dstObject))
if !cpSrcDstSame {
lk := z.NewNSLock(ctx, dstBucket, dstObject)
if err := lk.GetLock(globalOperationTimeout); err != nil {
return objInfo, err
}
defer lk.Unlock()
}
zoneIdx, err := z.getZoneIdx(ctx, dstBucket, dstObject, dstOpts, srcInfo.Size)
if err != nil {
@ -645,12 +583,19 @@ func (z *erasureZones) CopyObject(ctx context.Context, srcBucket, srcObject, dst
}
if cpSrcDstSame && srcInfo.metadataOnly {
// Version ID is set for the destination and source == destination version ID.
if dstOpts.VersionID != "" && srcOpts.VersionID == dstOpts.VersionID {
return z.zones[zoneIdx].CopyObject(ctx, srcBucket, srcObject, dstBucket, dstObject, srcInfo, srcOpts, dstOpts)
}
// Destination is not versioned and source version ID is empty
// perform an in-place update.
if !dstOpts.Versioned && srcOpts.VersionID == "" {
return z.zones[zoneIdx].CopyObject(ctx, srcBucket, srcObject, dstBucket, dstObject, srcInfo, srcOpts, dstOpts)
}
// Destination is versioned, source is not destination version,
// as a special case look for if the source object is not legacy
// from older format, for older format we will rewrite them as
// newer using PutObject() - this is an optimization to save space
if dstOpts.Versioned && srcOpts.VersionID != dstOpts.VersionID && !srcInfo.Legacy {
// CopyObject optimization where we don't create an entire copy
// of the content, instead we add a reference.
@ -1384,12 +1329,6 @@ func (z *erasureZones) PutObjectPart(ctx context.Context, bucket, object, upload
return PartInfo{}, err
}
uploadIDLock := z.NewNSLock(ctx, bucket, pathJoin(object, uploadID))
if err := uploadIDLock.GetLock(globalOperationTimeout); err != nil {
return PartInfo{}, err
}
defer uploadIDLock.Unlock()
if z.SingleZone() {
return z.zones[0].PutObjectPart(ctx, bucket, object, uploadID, partID, data, opts)
}
@ -1420,12 +1359,6 @@ func (z *erasureZones) GetMultipartInfo(ctx context.Context, bucket, object, upl
return MultipartInfo{}, err
}
uploadIDLock := z.NewNSLock(ctx, bucket, pathJoin(object, uploadID))
if err := uploadIDLock.GetRLock(globalOperationTimeout); err != nil {
return MultipartInfo{}, err
}
defer uploadIDLock.RUnlock()
if z.SingleZone() {
return z.zones[0].GetMultipartInfo(ctx, bucket, object, uploadID, opts)
}
@ -1456,12 +1389,6 @@ func (z *erasureZones) ListObjectParts(ctx context.Context, bucket, object, uplo
return ListPartsInfo{}, err
}
uploadIDLock := z.NewNSLock(ctx, bucket, pathJoin(object, uploadID))
if err := uploadIDLock.GetRLock(globalOperationTimeout); err != nil {
return ListPartsInfo{}, err
}
defer uploadIDLock.RUnlock()
if z.SingleZone() {
return z.zones[0].ListObjectParts(ctx, bucket, object, uploadID, partNumberMarker, maxParts, opts)
}
@ -1484,25 +1411,19 @@ func (z *erasureZones) ListObjectParts(ctx context.Context, bucket, object, uplo
}
// Aborts an in-progress multipart operation on hashedSet based on the object name.
func (z *erasureZones) AbortMultipartUpload(ctx context.Context, bucket, object, uploadID string) error {
func (z *erasureZones) AbortMultipartUpload(ctx context.Context, bucket, object, uploadID string, opts ObjectOptions) error {
if err := checkAbortMultipartArgs(ctx, bucket, object, z); err != nil {
return err
}
uploadIDLock := z.NewNSLock(ctx, bucket, pathJoin(object, uploadID))
if err := uploadIDLock.GetLock(globalOperationTimeout); err != nil {
return err
}
defer uploadIDLock.Unlock()
if z.SingleZone() {
return z.zones[0].AbortMultipartUpload(ctx, bucket, object, uploadID)
return z.zones[0].AbortMultipartUpload(ctx, bucket, object, uploadID, opts)
}
for _, zone := range z.zones {
_, err := zone.GetMultipartInfo(ctx, bucket, object, uploadID, ObjectOptions{})
_, err := zone.GetMultipartInfo(ctx, bucket, object, uploadID, opts)
if err == nil {
return zone.AbortMultipartUpload(ctx, bucket, object, uploadID)
return zone.AbortMultipartUpload(ctx, bucket, object, uploadID, opts)
}
switch err.(type) {
case InvalidUploadID:
@ -1524,22 +1445,6 @@ func (z *erasureZones) CompleteMultipartUpload(ctx context.Context, bucket, obje
return objInfo, err
}
// Hold read-locks to verify uploaded parts, also disallows
// parallel part uploads as well.
uploadIDLock := z.NewNSLock(ctx, bucket, pathJoin(object, uploadID))
if err = uploadIDLock.GetRLock(globalOperationTimeout); err != nil {
return objInfo, err
}
defer uploadIDLock.RUnlock()
// Hold namespace to complete the transaction, only hold
// if uploadID can be held exclusively.
lk := z.NewNSLock(ctx, bucket, object)
if err = lk.GetLock(globalOperationTimeout); err != nil {
return objInfo, err
}
defer lk.Unlock()
if z.SingleZone() {
return z.zones[0].CompleteMultipartUpload(ctx, bucket, object, uploadID, uploadedParts, opts)
}

@ -779,7 +779,7 @@ func (fs *FSObjects) CompleteMultipartUpload(ctx context.Context, bucket string,
// that this is an atomic idempotent operation. Subsequent calls have
// no affect and further requests to the same uploadID would not be
// honored.
func (fs *FSObjects) AbortMultipartUpload(ctx context.Context, bucket, object, uploadID string) error {
func (fs *FSObjects) AbortMultipartUpload(ctx context.Context, bucket, object, uploadID string, opts ObjectOptions) error {
if err := checkAbortMultipartArgs(ctx, bucket, object, fs); err != nil {
return err
}

@ -61,7 +61,7 @@ func TestFSCleanupMultipartUploadsInRoutine(t *testing.T) {
cleanupWg.Wait()
// Check if upload id was already purged.
if err = obj.AbortMultipartUpload(GlobalContext, bucketName, objectName, uploadID); err != nil {
if err = obj.AbortMultipartUpload(GlobalContext, bucketName, objectName, uploadID, ObjectOptions{}); err != nil {
if _, ok := err.(InvalidUploadID); !ok {
t.Fatal("Unexpected err: ", err)
}
@ -215,11 +215,12 @@ func TestAbortMultipartUpload(t *testing.T) {
md5Hex := getMD5Hash(data)
if _, err := obj.PutObjectPart(GlobalContext, bucketName, objectName, uploadID, 1, mustGetPutObjReader(t, bytes.NewReader(data), 5, md5Hex, ""), ObjectOptions{}); err != nil {
opts := ObjectOptions{}
if _, err := obj.PutObjectPart(GlobalContext, bucketName, objectName, uploadID, 1, mustGetPutObjReader(t, bytes.NewReader(data), 5, md5Hex, ""), opts); err != nil {
t.Fatal("Unexpected error ", err)
}
time.Sleep(time.Second) // Without Sleep on windows, the fs.AbortMultipartUpload() fails with "The process cannot access the file because it is being used by another process."
if err := obj.AbortMultipartUpload(GlobalContext, bucketName, objectName, uploadID); err != nil {
if err := obj.AbortMultipartUpload(GlobalContext, bucketName, objectName, uploadID, opts); err != nil {
t.Fatal("Unexpected error ", err)
}
}

@ -91,7 +91,7 @@ func (a GatewayUnsupported) ListObjectParts(ctx context.Context, bucket string,
}
// AbortMultipartUpload aborts a ongoing multipart upload
func (a GatewayUnsupported) AbortMultipartUpload(ctx context.Context, bucket string, object string, uploadID string) error {
func (a GatewayUnsupported) AbortMultipartUpload(ctx context.Context, bucket string, object string, uploadID string, opts ObjectOptions) error {
return NotImplemented{}
}

@ -1238,7 +1238,7 @@ func (a *azureObjects) ListObjectParts(ctx context.Context, bucket, object, uplo
// AbortMultipartUpload - Not Implemented.
// There is no corresponding API in azure to abort an incomplete upload. The uncommmitted blocks
// gets deleted after one week.
func (a *azureObjects) AbortMultipartUpload(ctx context.Context, bucket, object, uploadID string) (err error) {
func (a *azureObjects) AbortMultipartUpload(ctx context.Context, bucket, object, uploadID string, opts minio.ObjectOptions) (err error) {
if err = a.checkUploadIDExists(ctx, bucket, object, uploadID); err != nil {
return err
}

@ -1253,7 +1253,7 @@ func (l *gcsGateway) cleanupMultipartUpload(ctx context.Context, bucket, key, up
}
// AbortMultipartUpload aborts a ongoing multipart upload
func (l *gcsGateway) AbortMultipartUpload(ctx context.Context, bucket string, key string, uploadID string) error {
func (l *gcsGateway) AbortMultipartUpload(ctx context.Context, bucket string, key string, uploadID string, opts minio.ObjectOptions) error {
if err := l.checkUploadIDExists(ctx, bucket, key, uploadID); err != nil {
return err
}

@ -859,7 +859,7 @@ func (n *hdfsObjects) CompleteMultipartUpload(ctx context.Context, bucket, objec
}, nil
}
func (n *hdfsObjects) AbortMultipartUpload(ctx context.Context, bucket, object, uploadID string) (err error) {
func (n *hdfsObjects) AbortMultipartUpload(ctx context.Context, bucket, object, uploadID string, opts minio.ObjectOptions) (err error) {
_, err = n.clnt.Stat(n.hdfsPathJoin(bucket))
if err != nil {
return hdfsToObjectErr(ctx, err, bucket)

@ -569,12 +569,12 @@ func (l *s3EncObjects) ListObjectParts(ctx context.Context, bucket string, objec
}
// AbortMultipartUpload aborts a ongoing multipart upload
func (l *s3EncObjects) AbortMultipartUpload(ctx context.Context, bucket string, object string, uploadID string) error {
func (l *s3EncObjects) AbortMultipartUpload(ctx context.Context, bucket string, object string, uploadID string, opts minio.ObjectOptions) error {
if _, err := l.getGWMetadata(ctx, bucket, getTmpDareMetaPath(object, uploadID)); err != nil {
return l.s3Objects.AbortMultipartUpload(ctx, bucket, object, uploadID)
return l.s3Objects.AbortMultipartUpload(ctx, bucket, object, uploadID, opts)
}
if err := l.s3Objects.AbortMultipartUpload(ctx, bucket, getGWContentPath(object), uploadID); err != nil {
if err := l.s3Objects.AbortMultipartUpload(ctx, bucket, getGWContentPath(object), uploadID, opts); err != nil {
return err
}

@ -653,7 +653,7 @@ func (l *s3Objects) ListObjectParts(ctx context.Context, bucket string, object s
}
// AbortMultipartUpload aborts a ongoing multipart upload
func (l *s3Objects) AbortMultipartUpload(ctx context.Context, bucket string, object string, uploadID string) error {
func (l *s3Objects) AbortMultipartUpload(ctx context.Context, bucket string, object string, uploadID string, opts minio.ObjectOptions) error {
err := l.Client.AbortMultipartUpload(ctx, bucket, object, uploadID)
return minio.ErrorRespToObjectError(err, bucket, object)
}

@ -110,7 +110,7 @@ type ObjectLayer interface {
PutObjectPart(ctx context.Context, bucket, object, uploadID string, partID int, data *PutObjReader, opts ObjectOptions) (info PartInfo, err error)
GetMultipartInfo(ctx context.Context, bucket, object, uploadID string, opts ObjectOptions) (info MultipartInfo, err error)
ListObjectParts(ctx context.Context, bucket, object, uploadID string, partNumberMarker int, maxParts int, opts ObjectOptions) (result ListPartsInfo, err error)
AbortMultipartUpload(ctx context.Context, bucket, object, uploadID string) error
AbortMultipartUpload(ctx context.Context, bucket, object, uploadID string, opts ObjectOptions) error
CompleteMultipartUpload(ctx context.Context, bucket, object, uploadID string, uploadedParts []CompletePart, opts ObjectOptions) (objInfo ObjectInfo, err error)
// Healing operations.

@ -66,7 +66,7 @@ func testObjectNewMultipartUpload(obj ObjectLayer, instanceType string, t TestEr
t.Fatalf("%s : %s", instanceType, err.Error())
}
err = obj.AbortMultipartUpload(context.Background(), bucket, "\\", uploadID)
err = obj.AbortMultipartUpload(context.Background(), bucket, "\\", uploadID, opts)
if err != nil {
switch err.(type) {
case InvalidUploadID:
@ -114,7 +114,7 @@ func testObjectAbortMultipartUpload(obj ObjectLayer, instanceType string, t Test
}
// Iterating over creatPartCases to generate multipart chunks.
for i, testCase := range abortTestCases {
err = obj.AbortMultipartUpload(context.Background(), testCase.bucketName, testCase.objName, testCase.uploadID)
err = obj.AbortMultipartUpload(context.Background(), testCase.bucketName, testCase.objName, testCase.uploadID, opts)
if testCase.expectedErrType == nil && err != nil {
t.Errorf("Test %d, unexpected err is received: %v, expected:%v\n", i+1, err, testCase.expectedErrType)
}
@ -146,7 +146,8 @@ func testObjectAPIIsUploadIDExists(obj ObjectLayer, instanceType string, t TestE
t.Fatalf("%s : %s", instanceType, err.Error())
}
err = obj.AbortMultipartUpload(context.Background(), bucket, object, "abc")
opts := ObjectOptions{}
err = obj.AbortMultipartUpload(context.Background(), bucket, object, "abc", opts)
switch err.(type) {
case InvalidUploadID:
default:
@ -1565,7 +1566,7 @@ func testListObjectParts(obj ObjectLayer, instanceType string, t TestErrHandler)
}
for i, testCase := range testCases {
actualResult, actualErr := obj.ListObjectParts(context.Background(), testCase.bucket, testCase.object, testCase.uploadID, testCase.partNumberMarker, testCase.maxParts, ObjectOptions{})
actualResult, actualErr := obj.ListObjectParts(context.Background(), testCase.bucket, testCase.object, testCase.uploadID, testCase.partNumberMarker, testCase.maxParts, opts)
if actualErr != nil && testCase.shouldPass {
t.Errorf("Test %d: %s: Expected to pass, but failed with: <ERROR> %s", i+1, instanceType, actualErr.Error())
}
@ -1801,6 +1802,7 @@ func testObjectCompleteMultipartUpload(obj ObjectLayer, instanceType string, t T
for _, testCase := range testCases {
testCase := testCase
t.(*testing.T).Run("", func(t *testing.T) {
opts = ObjectOptions{}
actualResult, actualErr := obj.CompleteMultipartUpload(context.Background(), testCase.bucket, testCase.object, testCase.uploadID, testCase.parts, ObjectOptions{})
if actualErr != nil && testCase.shouldPass {
t.Errorf("%s: Expected to pass, but failed with: <ERROR> %s", instanceType, actualErr)

@ -877,19 +877,3 @@ func newS2CompressReader(r io.Reader) io.ReadCloser {
}()
return pr
}
// Returns error if the context is canceled, indicating
// either client has disconnected
type contextReader struct {
io.ReadCloser
ctx context.Context
}
func (d *contextReader) Read(p []byte) (int, error) {
select {
case <-d.ctx.Done():
return 0, d.ctx.Err()
default:
return d.ReadCloser.Read(p)
}
}

@ -287,31 +287,30 @@ func deleteObject(ctx context.Context, obj ObjectLayer, cache CacheObjectLayer,
deleteObject = cache.DeleteObject
}
// Proceed to delete the object.
if objInfo, err = deleteObject(ctx, bucket, object, opts); err != nil {
return objInfo, err
}
// Requesting only a delete marker which was successfully attempted.
if objInfo.DeleteMarker {
// Notify object deleted marker event.
sendEvent(eventArgs{
EventName: event.ObjectRemovedDeleteMarkerCreated,
BucketName: bucket,
Object: objInfo,
ReqParams: extractReqParams(r),
UserAgent: r.UserAgent(),
Host: handlers.GetSourceIP(r),
})
} else {
// Notify object deleted event.
sendEvent(eventArgs{
EventName: event.ObjectRemovedDelete,
BucketName: bucket,
Object: objInfo,
ReqParams: extractReqParams(r),
UserAgent: r.UserAgent(),
Host: handlers.GetSourceIP(r),
})
objInfo, err = deleteObject(ctx, bucket, object, opts)
if objInfo.Name != "" {
// Requesting only a delete marker which was successfully attempted.
if objInfo.DeleteMarker {
// Notify object deleted marker event.
sendEvent(eventArgs{
EventName: event.ObjectRemovedDeleteMarkerCreated,
BucketName: bucket,
Object: objInfo,
ReqParams: extractReqParams(r),
UserAgent: r.UserAgent(),
Host: handlers.GetSourceIP(r),
})
} else {
// Notify object deleted event.
sendEvent(eventArgs{
EventName: event.ObjectRemovedDelete,
BucketName: bucket,
Object: objInfo,
ReqParams: extractReqParams(r),
UserAgent: r.UserAgent(),
Host: handlers.GetSourceIP(r),
})
}
}
return objInfo, nil
return objInfo, err
}

@ -189,7 +189,7 @@ func (api objectAPIHandlers) SelectObjectContentHandler(w http.ResponseWriter, r
End: offset + length,
}
return getObjectNInfo(ctx, bucket, object, rs, r.Header, readLock, ObjectOptions{})
return getObjectNInfo(ctx, bucket, object, rs, r.Header, readLock, opts)
}
objInfo, err := getObjectInfo(ctx, bucket, object, opts)
@ -891,10 +891,6 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
getObjectNInfo = api.CacheAPI().GetObjectNInfo
}
var lock = noLock
if !cpSrcDstSame {
lock = readLock
}
checkCopyPrecondFn := func(o ObjectInfo) bool {
if objectAPI.IsEncryptionSupported() {
if _, err := DecryptObjectInfo(&o, r); err != nil {
@ -906,6 +902,16 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
}
getOpts.CheckPrecondFn = checkCopyPrecondFn
// FIXME: a possible race exists between a parallel
// GetObject v/s CopyObject with metadata updates, ideally
// we should be holding write lock here but it is not
// possible due to other constraints such as knowing
// the type of source content etc.
lock := noLock
if !cpSrcDstSame {
lock = readLock
}
var rs *HTTPRangeSpec
gr, err := getObjectNInfo(ctx, srcBucket, srcObject, rs, r.Header, lock, getOpts)
if err != nil {
@ -1227,6 +1233,7 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
if api.CacheAPI() != nil {
copyObjectFn = api.CacheAPI().CopyObject
}
// Copy source object to destination, if source and destination
// object is same then only metadata is updated.
objInfo, err = copyObjectFn(ctx, srcBucket, srcObject, dstBucket, dstObject, srcInfo, srcOpts, dstOpts)
@ -1306,9 +1313,6 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req
return
}
// To detect if the client has disconnected.
r.Body = &contextReader{r.Body, r.Context()}
// X-Amz-Copy-Source shouldn't be set for this call.
if _, ok := r.Header[xhttp.AmzCopySource]; ok {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidCopySource), r.URL, guessIsBrowserReq(r))
@ -1847,7 +1851,6 @@ func (api objectAPIHandlers) CopyObjectPartHandler(w http.ResponseWriter, r *htt
return false
}
getOpts.CheckPrecondFn = checkCopyPartPrecondFn
gr, err := getObjectNInfo(ctx, srcBucket, srcObject, rs, r.Header, readLock, getOpts)
if err != nil {
if isErrPreconditionFailed(err) {
@ -2056,9 +2059,6 @@ func (api objectAPIHandlers) PutObjectPartHandler(w http.ResponseWriter, r *http
return
}
// To detect if the client has disconnected.
r.Body = &contextReader{r.Body, r.Context()}
// X-Amz-Copy-Source shouldn't be set for this call.
if _, ok := r.Header[xhttp.AmzCopySource]; ok {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidCopySource), r.URL, guessIsBrowserReq(r))
@ -2308,7 +2308,8 @@ func (api objectAPIHandlers) AbortMultipartUploadHandler(w http.ResponseWriter,
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL, guessIsBrowserReq(r))
return
}
if err := abortMultipartUpload(ctx, bucket, object, uploadID); err != nil {
opts := ObjectOptions{}
if err := abortMultipartUpload(ctx, bucket, object, uploadID, opts); err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}
@ -2355,7 +2356,7 @@ func (api objectAPIHandlers) ListObjectPartsHandler(w http.ResponseWriter, r *ht
return
}
var opts ObjectOptions
opts := ObjectOptions{}
listPartsInfo, err := objectAPI.ListObjectParts(ctx, bucket, object, uploadID, partNumberMarker, maxParts, opts)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
@ -2521,9 +2522,8 @@ func (api objectAPIHandlers) CompleteMultipartUploadHandler(w http.ResponseWrite
var objectEncryptionKey []byte
var isEncrypted, ssec bool
var opts ObjectOptions
if objectAPI.IsEncryptionSupported() {
mi, err := objectAPI.GetMultipartInfo(ctx, bucket, object, uploadID, opts)
mi, err := objectAPI.GetMultipartInfo(ctx, bucket, object, uploadID, ObjectOptions{})
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
@ -2546,7 +2546,7 @@ func (api objectAPIHandlers) CompleteMultipartUploadHandler(w http.ResponseWrite
partsMap := make(map[string]PartInfo)
if isEncrypted {
maxParts := 10000
listPartsInfo, err := objectAPI.ListObjectParts(ctx, bucket, object, uploadID, 0, maxParts, opts)
listPartsInfo, err := objectAPI.ListObjectParts(ctx, bucket, object, uploadID, 0, maxParts, ObjectOptions{})
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
@ -2601,7 +2601,7 @@ func (api objectAPIHandlers) CompleteMultipartUploadHandler(w http.ResponseWrite
w = &whiteSpaceWriter{ResponseWriter: w, Flusher: w.(http.Flusher)}
completeDoneCh := sendWhiteSpace(w)
objInfo, err := completeMultiPartUpload(ctx, bucket, object, uploadID, completeParts, opts)
objInfo, err := completeMultiPartUpload(ctx, bucket, object, uploadID, completeParts, ObjectOptions{})
// Stop writing white spaces to the client. Note that close(doneCh) style is not used as it
// can cause white space to be written after we send XML response in a race condition.
headerWritten := <-completeDoneCh
@ -2735,16 +2735,6 @@ func (api objectAPIHandlers) DeleteObjectHandler(w http.ResponseWriter, r *http.
setPutObjHeaders(w, objInfo, true)
writeSuccessNoContent(w)
sendEvent(eventArgs{
EventName: event.ObjectRemovedDelete,
BucketName: bucket,
Object: objInfo,
ReqParams: extractReqParams(r),
RespElements: extractRespElements(w),
UserAgent: r.UserAgent(),
Host: handlers.GetSourceIP(r),
})
}
// PutObjectLegalHoldHandler - set legal hold configuration to object,

@ -166,7 +166,7 @@ func testMultipartObjectAbort(obj ObjectLayer, instanceType string, t TestErrHan
}
parts[i] = expectedETaghex
}
err = obj.AbortMultipartUpload(context.Background(), "bucket", "key", uploadID)
err = obj.AbortMultipartUpload(context.Background(), "bucket", "key", uploadID, ObjectOptions{})
if err != nil {
t.Fatalf("%s: <ERROR> %s", instanceType, err)
}

@ -17,12 +17,9 @@
package cmd
import (
"bufio"
"context"
"errors"
"fmt"
"io"
"log"
"net"
"os"
"os/signal"
@ -247,7 +244,7 @@ func initSafeMode(ctx context.Context, newObject ObjectLayer) (err error) {
// version is needed, migration is needed etc.
rquorum := InsufficientReadQuorum{}
wquorum := InsufficientWriteQuorum{}
for range retry.NewTimer(retryCtx) {
for range retry.NewTimerWithJitter(retryCtx, 250*time.Millisecond, 500*time.Millisecond, retry.MaxJitter) {
// let one of the server acquire the lock, if not let them timeout.
// which shall be retried again by this loop.
if err = txnLk.GetLock(configLockTimeout); err != nil {
@ -462,27 +459,7 @@ func serverMain(ctx *cli.Context) {
getCert = globalTLSCerts.GetCertificate
}
// Annonying hack to ensure that Go doesn't write its own logging,
// interleaved with our formatted logging, this allows us to
// honor --json and --quiet flag properly.
//
// Unfortunately we have to resort to this sort of hacky approach
// because, Go automatically initializes ErrorLog on its own
// and can log without application control.
//
// This is an implementation issue in Go and should be fixed, but
// until then this hack is okay and works for our needs.
pr, pw := io.Pipe()
go func() {
defer pr.Close()
scanner := bufio.NewScanner(&contextReader{pr, GlobalContext})
for scanner.Scan() {
logger.LogIf(GlobalContext, errors.New(scanner.Text()))
}
}()
httpServer := xhttp.NewServer([]string{globalMinioAddr}, criticalErrorHandler{corsHandler(handler)}, getCert)
httpServer.ErrorLog = log.New(pw, "", 0)
httpServer.BaseContext = func(listener net.Listener) context.Context {
return GlobalContext
}

@ -25,7 +25,6 @@ import (
"time"
"github.com/minio/minio/pkg/console"
"github.com/minio/minio/pkg/retry"
)
// Indicator if logging is enabled.
@ -132,6 +131,10 @@ func (dm *DRWMutex) GetRLock(ctx context.Context, id, source string, opts Option
return dm.lockBlocking(ctx, id, source, isReadLock, opts)
}
const (
lockRetryInterval = 50 * time.Millisecond
)
// lockBlocking will try to acquire either a read or a write lock
//
// The function will loop using a built-in timing randomized back-off
@ -140,40 +143,50 @@ func (dm *DRWMutex) GetRLock(ctx context.Context, id, source string, opts Option
func (dm *DRWMutex) lockBlocking(ctx context.Context, id, source string, isReadLock bool, opts Options) (locked bool) {
restClnts := dm.clnt.GetLockersFn()
retryCtx, cancel := context.WithTimeout(ctx, opts.Timeout)
defer cancel()
r := rand.New(rand.NewSource(time.Now().UnixNano()))
// Use incremental back-off algorithm for repeated attempts to acquire the lock
for range retry.NewTimer(retryCtx) {
// Create temp array on stack.
locks := make([]string, len(restClnts))
// Create lock array to capture the successful lockers
locks := make([]string, len(restClnts))
// Try to acquire the lock.
locked = lock(retryCtx, dm.clnt, &locks, id, source, isReadLock, opts.Tolerance, dm.Names...)
if !locked {
continue
cleanLocks := func(locks []string) {
for i := range locks {
locks[i] = ""
}
}
dm.m.Lock()
retryCtx, cancel := context.WithTimeout(ctx, opts.Timeout)
defer cancel()
// If success, copy array to object
if isReadLock {
// Append new array of strings at the end
dm.readersLocks = append(dm.readersLocks, make([]string, len(restClnts)))
// and copy stack array into last spot
copy(dm.readersLocks[len(dm.readersLocks)-1], locks[:])
} else {
copy(dm.writeLocks, locks[:])
}
for {
// cleanup any older state, re-use the lock slice.
cleanLocks(locks)
select {
case <-retryCtx.Done():
// Caller context canceled or we timedout,
// return false anyways for both situations.
return false
default:
// Try to acquire the lock.
if locked = lock(retryCtx, dm.clnt, &locks, id, source, isReadLock, opts.Tolerance, dm.Names...); locked {
dm.m.Lock()
// If success, copy array to object
if isReadLock {
// Append new array of strings at the end
dm.readersLocks = append(dm.readersLocks, make([]string, len(restClnts)))
// and copy stack array into last spot
copy(dm.readersLocks[len(dm.readersLocks)-1], locks[:])
} else {
copy(dm.writeLocks, locks[:])
}
dm.m.Unlock()
return locked
dm.m.Unlock()
return locked
}
time.Sleep(time.Duration(r.Float64() * float64(lockRetryInterval)))
}
}
// Failed to acquire the lock on this attempt, incrementally wait
// for a longer back-off time and try again afterwards.
return locked
}
// lock tries to acquire the distributed lock, returning true or false.

@ -19,10 +19,9 @@ package lsync
import (
"context"
"math"
"math/rand"
"sync"
"time"
"github.com/minio/minio/pkg/retry"
)
// A LRWMutex is a mutual exclusion lock with timeouts.
@ -94,25 +93,33 @@ func (lm *LRWMutex) lock(id, source string, isWriteLock bool) (locked bool) {
return locked
}
const (
lockRetryInterval = 50 * time.Millisecond
)
// lockLoop will acquire either a read or a write lock
//
// The call will block until the lock is granted using a built-in
// timing randomized back-off algorithm to try again until successful
func (lm *LRWMutex) lockLoop(ctx context.Context, id, source string, timeout time.Duration, isWriteLock bool) (locked bool) {
r := rand.New(rand.NewSource(time.Now().UnixNano()))
retryCtx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
// We timed out on the previous lock, incrementally wait
// for a longer back-off time and try again afterwards.
for range retry.NewTimer(retryCtx) {
if lm.lock(id, source, isWriteLock) {
return true
for {
select {
case <-retryCtx.Done():
// Caller context canceled or we timedout,
// return false anyways for both situations.
return false
default:
if lm.lock(id, source, isWriteLock) {
return true
}
time.Sleep(time.Duration(r.Float64() * float64(lockRetryInterval)))
}
}
// We timed out on the previous lock, incrementally wait
// for a longer back-off time and try again afterwards.
return false
}
// Unlock unlocks the write lock.

Loading…
Cancel
Save