From 736e58dd68a8ed5acea4db547c1a222064a92220 Mon Sep 17 00:00:00 2001
From: Harshavardhana
Date: Thu, 8 Oct 2020 12:32:32 -0700
Subject: [PATCH] fix: handle concurrent lockers with multiple optimizations (#10640)

- select lockers which are non-local and online, so that lock contention has affinity towards remote servers
- optimize lock retry interval to avoid sending too many messages during lock contention; this also reduces average CPU usage
- make sure setPutObjHeaders() honors lifecycle only if the bucket name is set, which may not be the case when deleteObject fails.
- fix top locks to always list the oldest lockers first, instead of relying on the map's unordered iteration.
---
 cmd/admin-handlers.go             | 13 +++++++------
 cmd/bucket-metadata.go            | 10 ++++++++--
 cmd/data-crawler.go               |  2 +-
 cmd/endpoint.go                   |  5 +++--
 cmd/erasure-object.go             | 15 ++++-----------
 cmd/erasure-sets.go               |  7 +++++++
 cmd/erasure-zones.go              |  1 +
 cmd/globals.go                    |  3 ++-
 cmd/local-locker.go               |  7 ++++++-
 cmd/lock-rest-client.go           |  5 +++++
 cmd/logger/audit.go               | 10 +++++-----
 cmd/notification.go               |  6 +++---
 cmd/object-api-utils.go           |  4 ++++
 cmd/object-handlers-common.go     | 26 ++++++++++++++------------
 cmd/peer-rest-server.go           |  2 +-
 cmd/utils.go                      | 24 +++++++++++++++++++++++-
 cmd/xl-storage.go                 | 16 +---------------
 pkg/dsync/drwmutex.go             |  7 +++----
 pkg/dsync/rpc-client-impl_test.go |  4 ++++
 pkg/dsync/rpc-client-interface.go |  3 +++
 20 files changed, 105 insertions(+), 65 deletions(-)

diff --git a/cmd/admin-handlers.go b/cmd/admin-handlers.go
index ec33eeeba..72d1a6717 100644
--- a/cmd/admin-handlers.go
+++ b/cmd/admin-handlers.go
@@ -368,7 +368,7 @@ func lriToLockEntry(l lockRequesterInfo, resource, server string, rquorum, wquor
 	return entry
 }
 
-func topLockEntries(peerLocks []*PeerLocks, count int, rquorum, wquorum int, stale bool) madmin.LockEntries {
+func topLockEntries(peerLocks []*PeerLocks, rquorum, wquorum int, stale bool) madmin.LockEntries {
 	entryMap := make(map[string]*madmin.LockEntry)
 	for _, peerLock := range peerLocks {
 		if peerLock == nil {
@@ -388,9 +388,6 @@ func topLockEntries(peerLocks []*PeerLocks, count int, rquorum, wquorum int, sta
 	}
 	var lockEntries madmin.LockEntries
 	for _, v := range entryMap {
-		if len(lockEntries) == count {
-			break
-		}
 		if stale {
 			lockEntries = append(lockEntries, *v)
 			continue
@@ -436,9 +433,13 @@ func (a adminAPIHandlers) TopLocksHandler(w http.ResponseWriter, r *http.Request
 	rquorum := getReadQuorum(objectAPI.SetDriveCount())
 	wquorum := getWriteQuorum(objectAPI.SetDriveCount())
 
-	topLocks := topLockEntries(peerLocks, count, rquorum, wquorum, stale)
+	topLocks := topLockEntries(peerLocks, rquorum, wquorum, stale)
+
+	// Marshal API response up to the requested count.
+	if len(topLocks) > count && count > 0 {
+		topLocks = topLocks[:count]
+	}
 
-	// Marshal API response
 	jsonBytes, err := json.Marshal(topLocks)
 	if err != nil {
 		writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
diff --git a/cmd/bucket-metadata.go b/cmd/bucket-metadata.go
index 8d632015e..667ce6bc4 100644
--- a/cmd/bucket-metadata.go
+++ b/cmd/bucket-metadata.go
@@ -107,6 +107,10 @@ func newBucketMetadata(name string) BucketMetadata {
 // Load - loads the metadata of bucket by name from ObjectLayer api.
 // If an error is returned the returned metadata will be default initialized.
func (b *BucketMetadata) Load(ctx context.Context, api ObjectLayer, name string) error { + if name == "" { + logger.LogIf(ctx, errors.New("bucket name cannot be empty")) + return errors.New("bucket name cannot be empty") + } configFile := path.Join(bucketConfigPrefix, name, bucketMetadataFile) data, err := readConfig(ctx, api, configFile) if err != nil { @@ -128,20 +132,22 @@ func (b *BucketMetadata) Load(ctx context.Context, api ObjectLayer, name string) } // OK, parse data. _, err = b.UnmarshalMsg(data[4:]) + b.Name = name // in-case parsing failed for some reason, make sure bucket name is not empty. return err } // loadBucketMetadata loads and migrates to bucket metadata. func loadBucketMetadata(ctx context.Context, objectAPI ObjectLayer, bucket string) (BucketMetadata, error) { b := newBucketMetadata(bucket) - err := b.Load(ctx, objectAPI, bucket) + err := b.Load(ctx, objectAPI, b.Name) if err == nil { return b, b.convertLegacyConfigs(ctx, objectAPI) } - if err != errConfigNotFound { + if !errors.Is(err, errConfigNotFound) { return b, err } + // Old bucket without bucket metadata. Hence we migrate existing settings. return b, b.convertLegacyConfigs(ctx, objectAPI) } diff --git a/cmd/data-crawler.go b/cmd/data-crawler.go index eea402212..2f9a21d59 100644 --- a/cmd/data-crawler.go +++ b/cmd/data-crawler.go @@ -54,7 +54,7 @@ const ( var ( globalCrawlerConfig crawler.Config - dataCrawlerLeaderLockTimeout = newDynamicTimeout(1*time.Minute, 30*time.Second) + dataCrawlerLeaderLockTimeout = newDynamicTimeout(30*time.Second, 10*time.Second) ) // initDataCrawler will start the crawler unless disabled. diff --git a/cmd/endpoint.go b/cmd/endpoint.go index 925a87c63..10ca105c7 100644 --- a/cmd/endpoint.go +++ b/cmd/endpoint.go @@ -770,10 +770,11 @@ func GetProxyEndpoints(endpointZones EndpointZones) ([]ProxyEndpoint, error) { } } - tr := newCustomHTTPTransport(tlsConfig, rest.DefaultTimeout)() + // allow transport to be HTTP/1.1 for proxying. + tr := newCustomHTTP11Transport(tlsConfig, rest.DefaultTimeout)() + // Allow more requests to be in flight with higher response header timeout. tr.ResponseHeaderTimeout = 30 * time.Minute - tr.MaxIdleConns = 64 tr.MaxIdleConnsPerHost = 64 proxyEps = append(proxyEps, ProxyEndpoint{ diff --git a/cmd/erasure-object.go b/cmd/erasure-object.go index 992892a05..c0c289a0d 100644 --- a/cmd/erasure-object.go +++ b/cmd/erasure-object.go @@ -211,13 +211,6 @@ func (er erasureObjects) GetObject(ctx context.Context, bucket, object string, s return errUnexpected } - // If its a directory request, we return an empty body. 
-	if HasSuffix(object, SlashSeparator) {
-		_, err := writer.Write([]byte(""))
-		logger.LogIf(ctx, err)
-		return toObjectErr(err, bucket, object)
-	}
-
 	return er.getObject(ctx, bucket, object, startOffset, length, writer, etag, opts)
 }
 
@@ -862,11 +855,11 @@ func (er erasureObjects) DeleteObjects(ctx context.Context, bucket string, objec
 			dobjects[objIndex] = DeletedObject{
 				DeleteMarker:          versions[objIndex].Deleted,
 				DeleteMarkerVersionID: versions[objIndex].VersionID,
-				ObjectName:            versions[objIndex].Name,
+				ObjectName:            decodeDirObject(versions[objIndex].Name),
 			}
 		} else {
 			dobjects[objIndex] = DeletedObject{
-				ObjectName: versions[objIndex].Name,
+				ObjectName: decodeDirObject(versions[objIndex].Name),
 				VersionID:  versions[objIndex].VersionID,
 			}
 		}
@@ -899,7 +892,7 @@ func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string
 func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, err error) {
 	// Acquire a write lock before deleting the object.
 	lk := er.NewNSLock(ctx, bucket, object)
-	if err = lk.GetLock(globalOperationTimeout); err != nil {
+	if err = lk.GetLock(globalDeleteOperationTimeout); err != nil {
 		return ObjectInfo{}, err
 	}
 	defer lk.Unlock()
@@ -945,7 +938,7 @@ func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string
 		}
 	}
 
-	return ObjectInfo{Bucket: bucket, Name: object, VersionID: opts.VersionID}, nil
+	return ObjectInfo{Bucket: bucket, Name: decodeDirObject(object), VersionID: opts.VersionID}, nil
 }
 
 // Send the successful but partial upload/delete, however ignore
diff --git a/cmd/erasure-sets.go b/cmd/erasure-sets.go
index 5d78bf0b7..d49654844 100644
--- a/cmd/erasure-sets.go
+++ b/cmd/erasure-sets.go
@@ -273,6 +273,13 @@ func (s *erasureSets) GetLockers(setIndex int) func() ([]dsync.NetLocker, string
 	return func() ([]dsync.NetLocker, string) {
 		lockers := make([]dsync.NetLocker, s.setDriveCount)
 		copy(lockers, s.erasureLockers[setIndex])
+		sort.Slice(lockers, func(i, j int) bool {
+			// re-order lockers with affinity so that
+			// - non-local lockers
+			// - online lockers
+			// are tried first for lock contention
+			return !lockers[i].IsLocal() && lockers[i].IsOnline()
+		})
 		return lockers, s.erasureLockOwner
 	}
 }
diff --git a/cmd/erasure-zones.go b/cmd/erasure-zones.go
index 3ed667d52..d361a336d 100644
--- a/cmd/erasure-zones.go
+++ b/cmd/erasure-zones.go
@@ -568,6 +568,7 @@ func (z *erasureZones) DeleteObject(ctx context.Context, bucket string, object s
 			break
 		}
 	}
+
 	return objInfo, err
 }
 
diff --git a/cmd/globals.go b/cmd/globals.go
index 889ca4a8e..a439f8eb4 100644
--- a/cmd/globals.go
+++ b/cmd/globals.go
@@ -210,7 +210,8 @@ var (
 	globalDomainNames []string      // Root domains for virtual host style requests
 	globalDomainIPs   set.StringSet // Root domain IP address(s) for a distributed MinIO deployment
 
-	globalOperationTimeout = newDynamicTimeout(10*time.Minute, 5*time.Minute) // default timeout for general ops
+	globalOperationTimeout       = newDynamicTimeout(10*time.Minute, 5*time.Minute) // default timeout for general ops
+	globalDeleteOperationTimeout = newDynamicTimeout(5*time.Minute, 1*time.Minute)  // default timeout for delete ops
 
 	globalBucketObjectLockSys *BucketObjectLockSys
 	globalBucketQuotaSys      *BucketQuotaSys
diff --git a/cmd/local-locker.go b/cmd/local-locker.go
index ddb41d859..c3140c3f1 100644
--- a/cmd/local-locker.go
+++ b/cmd/local-locker.go
@@ -203,11 +203,16 @@ func (l *localLocker) Close() error {
 	return nil
 }
 
-// Local locker is always online.
+// IsOnline - local locker is always online.
func (l *localLocker) IsOnline() bool { return true } +// IsLocal - local locker returns true. +func (l *localLocker) IsLocal() bool { + return true +} + func (l *localLocker) Expired(ctx context.Context, args dsync.LockArgs) (expired bool, err error) { select { case <-ctx.Done(): diff --git a/cmd/lock-rest-client.go b/cmd/lock-rest-client.go index f8c45d9db..c89eb2945 100644 --- a/cmd/lock-rest-client.go +++ b/cmd/lock-rest-client.go @@ -76,6 +76,11 @@ func (client *lockRESTClient) IsOnline() bool { return client.restClient.IsOnline() } +// Not a local locker +func (client *lockRESTClient) IsLocal() bool { + return false +} + // Close - marks the client as closed. func (client *lockRESTClient) Close() error { client.restClient.Close() diff --git a/cmd/logger/audit.go b/cmd/logger/audit.go index 8e10d020c..2d294580e 100644 --- a/cmd/logger/audit.go +++ b/cmd/logger/audit.go @@ -60,16 +60,16 @@ func NewResponseWriter(w http.ResponseWriter) *ResponseWriter { } func (lrw *ResponseWriter) Write(p []byte) (int, error) { - n, err := lrw.ResponseWriter.Write(p) - lrw.bytesWritten += n - if lrw.TimeToFirstByte == 0 { - lrw.TimeToFirstByte = time.Now().UTC().Sub(lrw.StartTime) - } if !lrw.headersLogged { // We assume the response code to be '200 OK' when WriteHeader() is not called, // that way following Golang HTTP response behavior. lrw.WriteHeader(http.StatusOK) } + n, err := lrw.ResponseWriter.Write(p) + lrw.bytesWritten += n + if lrw.TimeToFirstByte == 0 { + lrw.TimeToFirstByte = time.Now().UTC().Sub(lrw.StartTime) + } if (lrw.LogErrBody && lrw.StatusCode >= http.StatusBadRequest) || lrw.LogAllBody { // Always logging error responses. lrw.body.Write(p) diff --git a/cmd/notification.go b/cmd/notification.go index bd4dfe5c7..8aaea5669 100644 --- a/cmd/notification.go +++ b/cmd/notification.go @@ -534,13 +534,13 @@ func (sys *NotificationSys) GetLocks(ctx context.Context, r *http.Request) []*Pe } // Once we have received all the locks currently used from peers // add the local peer locks list as well. - var getRespLocks GetLocksResp + llockers := make(GetLocksResp, 0, len(globalLockServers)) for _, llocker := range globalLockServers { - getRespLocks = append(getRespLocks, llocker.DupLockMap()) + llockers = append(llockers, llocker.DupLockMap()) } locksResp = append(locksResp, &PeerLocks{ Addr: getHostName(r), - Locks: getRespLocks, + Locks: llockers, }) return locksResp } diff --git a/cmd/object-api-utils.go b/cmd/object-api-utils.go index 9f0c48d56..86e689d6f 100644 --- a/cmd/object-api-utils.go +++ b/cmd/object-api-utils.go @@ -322,6 +322,10 @@ func isStringEqual(s1 string, s2 string) bool { // Ignores all reserved bucket names or invalid bucket names. 
 func isReservedOrInvalidBucket(bucketEntry string, strict bool) bool {
+	if bucketEntry == "" {
+		return true
+	}
+
 	bucketEntry = strings.TrimSuffix(bucketEntry, SlashSeparator)
 	if strict {
 		if err := s3utils.CheckValidBucketNameStrict(bucketEntry); err != nil {
diff --git a/cmd/object-handlers-common.go b/cmd/object-handlers-common.go
index b2016d7f0..fc04cd346 100644
--- a/cmd/object-handlers-common.go
+++ b/cmd/object-handlers-common.go
@@ -261,18 +261,20 @@ func setPutObjHeaders(w http.ResponseWriter, objInfo ObjectInfo, delete bool) {
 		}
 	}
 
-	if lc, err := globalLifecycleSys.Get(objInfo.Bucket); err == nil && !delete {
-		ruleID, expiryTime := lc.PredictExpiryTime(lifecycle.ObjectOpts{
-			Name:         objInfo.Name,
-			UserTags:     objInfo.UserTags,
-			VersionID:    objInfo.VersionID,
-			ModTime:      objInfo.ModTime,
-			IsLatest:     objInfo.IsLatest,
-			DeleteMarker: objInfo.DeleteMarker,
-		})
-		if !expiryTime.IsZero() {
-			w.Header()[xhttp.AmzExpiration] = []string{
-				fmt.Sprintf(`expiry-date="%s", rule-id="%s"`, expiryTime.Format(http.TimeFormat), ruleID),
+	if objInfo.Bucket != "" {
+		if lc, err := globalLifecycleSys.Get(objInfo.Bucket); err == nil && !delete {
+			ruleID, expiryTime := lc.PredictExpiryTime(lifecycle.ObjectOpts{
+				Name:         objInfo.Name,
+				UserTags:     objInfo.UserTags,
+				VersionID:    objInfo.VersionID,
+				ModTime:      objInfo.ModTime,
+				IsLatest:     objInfo.IsLatest,
+				DeleteMarker: objInfo.DeleteMarker,
+			})
+			if !expiryTime.IsZero() {
+				w.Header()[xhttp.AmzExpiration] = []string{
+					fmt.Sprintf(`expiry-date="%s", rule-id="%s"`, expiryTime.Format(http.TimeFormat), ruleID),
+				}
 			}
 		}
 	}
diff --git a/cmd/peer-rest-server.go b/cmd/peer-rest-server.go
index e0db3ce08..6193d4957 100644
--- a/cmd/peer-rest-server.go
+++ b/cmd/peer-rest-server.go
@@ -47,7 +47,7 @@ func (s *peerRESTServer) GetLocksHandler(w http.ResponseWriter, r *http.Request)
 
 	ctx := newContext(r, w, "GetLocks")
 
-	llockers := make([]map[string][]lockRequesterInfo, 0, len(globalLockServers))
+	llockers := make(GetLocksResp, 0, len(globalLockServers))
 	for _, llocker := range globalLockServers {
 		llockers = append(llockers, llocker.DupLockMap())
 	}
diff --git a/cmd/utils.go b/cmd/utils.go
index 21cb4bc70..d713b7781 100644
--- a/cmd/utils.go
+++ b/cmd/utils.go
@@ -490,6 +490,29 @@ func newInternodeHTTPTransport(tlsConfig *tls.Config, dialTimeout time.Duration)
 	}
 }
 
+func newCustomHTTP11Transport(tlsConfig *tls.Config, dialTimeout time.Duration) func() *http.Transport {
+	// For more details about various values used here refer
+	// https://golang.org/pkg/net/http/#Transport documentation
+	tr := &http.Transport{
+		Proxy:                 http.ProxyFromEnvironment,
+		DialContext:           xhttp.NewCustomDialContext(dialTimeout),
+		MaxIdleConnsPerHost:   16,
+		IdleConnTimeout:       1 * time.Minute,
+		ResponseHeaderTimeout: 3 * time.Minute, // Set conservative timeouts for proxying.
+		TLSHandshakeTimeout:   10 * time.Second,
+		ExpectContinueTimeout: 10 * time.Second,
+		TLSClientConfig:       tlsConfig,
+		// Go net/http automatically unzips if content-type is
+		// gzip; disable this feature, as we are always interested
+		// in the raw stream.
+		DisableCompression: true,
+	}
+
+	return func() *http.Transport {
+		return tr
+	}
+}
+
 func newCustomHTTPTransport(tlsConfig *tls.Config, dialTimeout time.Duration) func() *http.Transport {
 	// For more details about various values used here refer
 	// https://golang.org/pkg/net/http/#Transport documentation
@@ -532,7 +555,6 @@ func newGatewayHTTPTransport(timeout time.Duration) *http.Transport {
 
 	// Allow more requests to be in flight.
tr.ResponseHeaderTimeout = timeout - tr.MaxIdleConns = 256 tr.MaxIdleConnsPerHost = 16 return tr } diff --git a/cmd/xl-storage.go b/cmd/xl-storage.go index ce2653a02..b4652074b 100644 --- a/cmd/xl-storage.go +++ b/cmd/xl-storage.go @@ -661,21 +661,7 @@ func (s *xlStorage) ListVols(context.Context) (volsInfo []VolInfo, err error) { atomic.AddInt32(&s.activeIOCount, -1) }() - volsInfo, err = listVols(s.diskPath) - if err != nil { - if isSysErrIO(err) { - return nil, errFaultyDisk - } - return nil, err - } - for i, vol := range volsInfo { - volInfo := VolInfo{ - Name: vol.Name, - Created: vol.Created, - } - volsInfo[i] = volInfo - } - return volsInfo, nil + return listVols(s.diskPath) } // List all the volumes from diskPath. diff --git a/pkg/dsync/drwmutex.go b/pkg/dsync/drwmutex.go index 9374f0a19..4195e2713 100644 --- a/pkg/dsync/drwmutex.go +++ b/pkg/dsync/drwmutex.go @@ -133,7 +133,7 @@ func (dm *DRWMutex) GetRLock(ctx context.Context, id, source string, opts Option } const ( - lockRetryInterval = 100 * time.Millisecond + lockRetryInterval = 1 * time.Second ) // lockBlocking will try to acquire either a read or a write lock @@ -204,6 +204,7 @@ func (dm *DRWMutex) lockBlocking(ctx context.Context, id, source string, isReadL dm.m.Unlock() return locked } + time.Sleep(time.Duration(r.Float64() * float64(lockRetryInterval))) } } @@ -268,6 +269,7 @@ func lock(ctx context.Context, ds *Dsync, locks *[]string, id, source string, is wg.Add(1) go func(isReadLock bool) { + defer wg.Done() // Wait until we have either // @@ -317,9 +319,6 @@ func lock(ctx context.Context, ds *Dsync, locks *[]string, id, source string, is // Count locks in order to determine whether we have quorum or not quorumLocked = checkQuorumLocked(locks, quorum) - // Signal that we have the quorum - wg.Done() - // Wait for the other responses and immediately release the locks // (do not add them to the locks array because the DRWMutex could // already has been unlocked again by the original calling thread) diff --git a/pkg/dsync/rpc-client-impl_test.go b/pkg/dsync/rpc-client-impl_test.go index 92c1ff529..99617c0e5 100644 --- a/pkg/dsync/rpc-client-impl_test.go +++ b/pkg/dsync/rpc-client-impl_test.go @@ -50,6 +50,10 @@ func (rpcClient *ReconnectRPCClient) IsOnline() bool { return rpcClient.rpc != nil } +func (rpcClient *ReconnectRPCClient) IsLocal() bool { + return false +} + // Close closes the underlying socket file descriptor. func (rpcClient *ReconnectRPCClient) Close() error { rpcClient.mutex.Lock() diff --git a/pkg/dsync/rpc-client-interface.go b/pkg/dsync/rpc-client-interface.go index 881b69d76..f385ad401 100644 --- a/pkg/dsync/rpc-client-interface.go +++ b/pkg/dsync/rpc-client-interface.go @@ -68,4 +68,7 @@ type NetLocker interface { // Is the underlying connection online? (is always true for any local lockers) IsOnline() bool + + // Is the underlying locker local to this server? + IsLocal() bool }
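
Note: the following is a small standalone Go sketch, not part of the patch, illustrating the two scheduling ideas above: re-ordering a locker list so that non-local, online lockers are tried first, and sleeping a randomized fraction of lockRetryInterval between lock attempts. The locker type below is a hypothetical stand-in for dsync.NetLocker, and the comparator here evaluates the preference for both elements so it forms a valid ordering for sort.Slice, which differs slightly from the one-sided check used in cmd/erasure-sets.go.

package main

import (
	"fmt"
	"math/rand"
	"sort"
	"time"
)

// locker is a stand-in exposing the two predicates the patch relies on:
// IsOnline() and the newly added IsLocal().
type locker struct {
	addr   string
	local  bool
	online bool
}

func (l locker) IsLocal() bool  { return l.local }
func (l locker) IsOnline() bool { return l.online }

// Matches the retry interval the patch sets in pkg/dsync/drwmutex.go.
const lockRetryInterval = 1 * time.Second

func main() {
	lockers := []locker{
		{addr: "local:9000", local: true, online: true},
		{addr: "remote-1:9000", local: false, online: false},
		{addr: "remote-2:9000", local: false, online: true},
	}

	// Prefer non-local, online lockers: remote-2 sorts ahead of the
	// local locker and the offline remote.
	sort.Slice(lockers, func(i, j int) bool {
		pi := !lockers[i].IsLocal() && lockers[i].IsOnline()
		pj := !lockers[j].IsLocal() && lockers[j].IsOnline()
		return pi && !pj
	})
	for _, l := range lockers {
		fmt.Println(l.addr, "local:", l.IsLocal(), "online:", l.IsOnline())
	}

	// Jittered retry: sleep a random fraction of lockRetryInterval so
	// competing clients do not retry in lock-step during contention.
	r := rand.New(rand.NewSource(time.Now().UnixNano()))
	time.Sleep(time.Duration(r.Float64() * float64(lockRetryInterval)))
	fmt.Println("retrying lock acquisition after jittered sleep")
}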