fix: handle concurrent lockers with multiple optimizations (#10640)

- select lockers which are non-local and online, giving
  lock requests an affinity towards remote servers
  during contention

- optimize the lock retry interval to avoid sending too
  many messages during lock contention; this also reduces
  average CPU usage

- make sure setPutObjHeaders() honors lifecycle expiry
  headers only when the bucket name is set, e.g. when a
  failed deleteObject returns an ObjectInfo without one

- fix top locks to always list the oldest lockers first,
  instead of being subject to the map's unordered iteration
Harshavardhana committed 736e58dd68 (parent 907a171edd) via GitHub
  1. cmd/admin-handlers.go (13)
  2. cmd/bucket-metadata.go (10)
  3. cmd/data-crawler.go (2)
  4. cmd/endpoint.go (5)
  5. cmd/erasure-object.go (15)
  6. cmd/erasure-sets.go (7)
  7. cmd/erasure-zones.go (1)
  8. cmd/globals.go (3)
  9. cmd/local-locker.go (7)
  10. cmd/lock-rest-client.go (5)
  11. cmd/logger/audit.go (10)
  12. cmd/notification.go (6)
  13. cmd/object-api-utils.go (4)
  14. cmd/object-handlers-common.go (26)
  15. cmd/peer-rest-server.go (2)
  16. cmd/utils.go (24)
  17. cmd/xl-storage.go (16)
  18. pkg/dsync/drwmutex.go (7)
  19. pkg/dsync/rpc-client-impl_test.go (4)
  20. pkg/dsync/rpc-client-interface.go (3)

@@ -368,7 +368,7 @@ func lriToLockEntry(l lockRequesterInfo, resource, server string, rquorum, wquor
return entry
}
-func topLockEntries(peerLocks []*PeerLocks, count int, rquorum, wquorum int, stale bool) madmin.LockEntries {
+func topLockEntries(peerLocks []*PeerLocks, rquorum, wquorum int, stale bool) madmin.LockEntries {
entryMap := make(map[string]*madmin.LockEntry)
for _, peerLock := range peerLocks {
if peerLock == nil {
@@ -388,9 +388,6 @@ func topLockEntries(peerLocks []*PeerLocks, count int, rquorum, wquorum int, sta
}
var lockEntries madmin.LockEntries
for _, v := range entryMap {
-if len(lockEntries) == count {
-break
-}
if stale {
lockEntries = append(lockEntries, *v)
continue
@@ -436,9 +433,13 @@ func (a adminAPIHandlers) TopLocksHandler(w http.ResponseWriter, r *http.Request
rquorum := getReadQuorum(objectAPI.SetDriveCount())
wquorum := getWriteQuorum(objectAPI.SetDriveCount())
-topLocks := topLockEntries(peerLocks, count, rquorum, wquorum, stale)
+topLocks := topLockEntries(peerLocks, rquorum, wquorum, stale)
+// Marshal API response up to requested count.
+if len(topLocks) > count && count > 0 {
+topLocks = topLocks[:count]
+}
// Marshal API response
jsonBytes, err := json.Marshal(topLocks)
if err != nil {
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
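Truncating inside the map-iteration loop meant an arbitrary subset of entries survived; truncating only after the full list is collected lets a sort decide which entries are kept. A minimal sketch of that order-then-truncate shape, with a hypothetical lockEntry standing in for madmin.LockEntry:

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

// lockEntry is a stand-in for madmin.LockEntry; only the
// Timestamp field matters for the ordering shown here.
type lockEntry struct {
	Resource  string
	Timestamp time.Time
}

// oldestN sorts entries oldest-first, then truncates to count,
// mirroring the handler's post-collection truncation above.
func oldestN(entries []lockEntry, count int) []lockEntry {
	sort.Slice(entries, func(i, j int) bool {
		return entries[i].Timestamp.Before(entries[j].Timestamp)
	})
	if len(entries) > count && count > 0 {
		entries = entries[:count]
	}
	return entries
}

func main() {
	now := time.Now()
	entries := []lockEntry{
		{"bucket/c", now},
		{"bucket/a", now.Add(-2 * time.Minute)},
		{"bucket/b", now.Add(-1 * time.Minute)},
	}
	// Oldest two locks survive, regardless of map iteration order.
	fmt.Println(oldestN(entries, 2))
}
```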

@@ -107,6 +107,10 @@ func newBucketMetadata(name string) BucketMetadata {
// Load - loads the metadata of bucket by name from ObjectLayer api.
// If an error is returned the returned metadata will be default initialized.
func (b *BucketMetadata) Load(ctx context.Context, api ObjectLayer, name string) error {
+if name == "" {
+logger.LogIf(ctx, errors.New("bucket name cannot be empty"))
+return errors.New("bucket name cannot be empty")
+}
configFile := path.Join(bucketConfigPrefix, name, bucketMetadataFile)
data, err := readConfig(ctx, api, configFile)
if err != nil {
@@ -128,20 +132,22 @@ func (b *BucketMetadata) Load(ctx context.Context, api ObjectLayer, name string)
}
// OK, parse data.
_, err = b.UnmarshalMsg(data[4:])
+b.Name = name // in case parsing failed for some reason, make sure bucket name is not empty.
return err
}
// loadBucketMetadata loads and migrates to bucket metadata.
func loadBucketMetadata(ctx context.Context, objectAPI ObjectLayer, bucket string) (BucketMetadata, error) {
b := newBucketMetadata(bucket)
-err := b.Load(ctx, objectAPI, bucket)
+err := b.Load(ctx, objectAPI, b.Name)
if err == nil {
return b, b.convertLegacyConfigs(ctx, objectAPI)
}
-if err != errConfigNotFound {
+if !errors.Is(err, errConfigNotFound) {
return b, err
}
// Old bucket without bucket metadata. Hence we migrate existing settings.
return b, b.convertLegacyConfigs(ctx, objectAPI)
}
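The switch from a direct `err != errConfigNotFound` comparison to errors.Is matters once sentinel errors travel through wrapping. A small self-contained illustration (the sentinel here is a local stand-in for MinIO's errConfigNotFound):

```go
package main

import (
	"errors"
	"fmt"
)

// Stand-in for the errConfigNotFound sentinel.
var errConfigNotFound = errors.New("config file not found")

func main() {
	// A caller somewhere wraps the sentinel with context.
	err := fmt.Errorf("loading bucket metadata: %w", errConfigNotFound)

	fmt.Println(err == errConfigNotFound)          // false: direct comparison misses the wrap
	fmt.Println(errors.Is(err, errConfigNotFound)) // true: errors.Is unwraps
}
```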

@@ -54,7 +54,7 @@ const (
var (
globalCrawlerConfig crawler.Config
-dataCrawlerLeaderLockTimeout = newDynamicTimeout(1*time.Minute, 30*time.Second)
+dataCrawlerLeaderLockTimeout = newDynamicTimeout(30*time.Second, 10*time.Second)
)
// initDataCrawler will start the crawler unless disabled.

@@ -770,10 +770,11 @@ func GetProxyEndpoints(endpointZones EndpointZones) ([]ProxyEndpoint, error) {
}
}
-tr := newCustomHTTPTransport(tlsConfig, rest.DefaultTimeout)()
+// allow transport to be HTTP/1.1 for proxying.
+tr := newCustomHTTP11Transport(tlsConfig, rest.DefaultTimeout)()
// Allow more requests to be in flight with higher response header timeout.
tr.ResponseHeaderTimeout = 30 * time.Minute
tr.MaxIdleConns = 64
tr.MaxIdleConnsPerHost = 64
proxyEps = append(proxyEps, ProxyEndpoint{

@@ -211,13 +211,6 @@ func (er erasureObjects) GetObject(ctx context.Context, bucket, object string, s
return errUnexpected
}
-// If its a directory request, we return an empty body.
-if HasSuffix(object, SlashSeparator) {
-_, err := writer.Write([]byte(""))
-logger.LogIf(ctx, err)
-return toObjectErr(err, bucket, object)
-}
return er.getObject(ctx, bucket, object, startOffset, length, writer, etag, opts)
}
@@ -862,11 +855,11 @@ func (er erasureObjects) DeleteObjects(ctx context.Context, bucket string, objec
dobjects[objIndex] = DeletedObject{
DeleteMarker: versions[objIndex].Deleted,
DeleteMarkerVersionID: versions[objIndex].VersionID,
-ObjectName: versions[objIndex].Name,
+ObjectName: decodeDirObject(versions[objIndex].Name),
}
} else {
dobjects[objIndex] = DeletedObject{
-ObjectName: versions[objIndex].Name,
+ObjectName: decodeDirObject(versions[objIndex].Name),
VersionID: versions[objIndex].VersionID,
}
}
@@ -899,7 +892,7 @@ func (er erasureObjects) DeleteObjects(ctx context.Context, bucket string, objec
func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, err error) {
// Acquire a write lock before deleting the object.
lk := er.NewNSLock(ctx, bucket, object)
-if err = lk.GetLock(globalOperationTimeout); err != nil {
+if err = lk.GetLock(globalDeleteOperationTimeout); err != nil {
return ObjectInfo{}, err
}
defer lk.Unlock()
@@ -945,7 +938,7 @@ func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string
}
}
-return ObjectInfo{Bucket: bucket, Name: object, VersionID: opts.VersionID}, nil
+return ObjectInfo{Bucket: bucket, Name: decodeDirObject(object), VersionID: opts.VersionID}, nil
}
// Send the successful but partial upload/delete, however ignore
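decodeDirObject maps the internal directory-object encoding back to the user-visible name, so deleted directory objects are reported as "prefix/" rather than in their storage form. A sketch of such an encode/decode pair; the reserved suffix is an internal MinIO detail, and "__XLDIR__" is used here purely for illustration:

```go
package main

import (
	"fmt"
	"strings"
)

// Assumption: directory objects ("prefix/") are stored internally with a
// reserved suffix so they can live alongside regular entries; the exact
// marker is an internal detail, "__XLDIR__" here is illustrative.
const dirSuffix = "__XLDIR__"

func encodeDirObject(object string) string {
	if strings.HasSuffix(object, "/") {
		return strings.TrimSuffix(object, "/") + dirSuffix
	}
	return object
}

func decodeDirObject(object string) string {
	if strings.HasSuffix(object, dirSuffix) {
		return strings.TrimSuffix(object, dirSuffix) + "/"
	}
	return object
}

func main() {
	enc := encodeDirObject("photos/2020/")
	fmt.Println(enc)                  // photos/2020__XLDIR__
	fmt.Println(decodeDirObject(enc)) // photos/2020/
}
```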

@@ -273,6 +273,13 @@ func (s *erasureSets) GetLockers(setIndex int) func() ([]dsync.NetLocker, string
return func() ([]dsync.NetLocker, string) {
lockers := make([]dsync.NetLocker, s.setDriveCount)
copy(lockers, s.erasureLockers[setIndex])
+sort.Slice(lockers, func(i, j int) bool {
+// re-order lockers so that non-local and
+// online lockers are used first
+return !lockers[i].IsLocal() && lockers[i].IsOnline()
+})
return lockers, s.erasureLockOwner
}
}
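The less-function above returns true whenever the left locker is both remote and online, which bubbles those lockers to the front of the slice. A sketch of the same preference expressed as a rank comparison (stubLocker is a hypothetical stand-in for dsync.NetLocker, and the rank form is an editorial variant, not the committed code):

```go
package main

import (
	"fmt"
	"sort"
)

// stubLocker is a hypothetical stand-in for dsync.NetLocker,
// reduced to the two methods the ordering consults.
type stubLocker struct {
	name   string
	local  bool
	online bool
}

func (s stubLocker) IsLocal() bool  { return s.local }
func (s stubLocker) IsOnline() bool { return s.online }

// rank returns 0 for preferred lockers (remote and online) and 1 for
// everything else; lower ranks sort first. This expresses the hunk's
// preference as a strict weak ordering.
func rank(l stubLocker) int {
	if !l.IsLocal() && l.IsOnline() {
		return 0
	}
	return 1
}

func main() {
	lockers := []stubLocker{
		{"local-1", true, true},
		{"remote-1", false, true},
		{"remote-down", false, false},
		{"remote-2", false, true},
	}
	sort.SliceStable(lockers, func(i, j int) bool {
		return rank(lockers[i]) < rank(lockers[j])
	})
	fmt.Println(lockers) // remote, online lockers first
}
```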

@@ -568,6 +568,7 @@ func (z *erasureZones) DeleteObject(ctx context.Context, bucket string, object s
break
}
}
+return objInfo, err
}

@@ -210,7 +210,8 @@ var (
globalDomainNames []string // Root domains for virtual host style requests
globalDomainIPs set.StringSet // Root domain IP address(s) for a distributed MinIO deployment
-globalOperationTimeout = newDynamicTimeout(10*time.Minute, 5*time.Minute) // default timeout for general ops
+globalOperationTimeout = newDynamicTimeout(10*time.Minute, 5*time.Minute) // default timeout for general ops
+globalDeleteOperationTimeout = newDynamicTimeout(5*time.Minute, 1*time.Minute) // default time for delete ops
globalBucketObjectLockSys *BucketObjectLockSys
globalBucketQuotaSys *BucketQuotaSys

@@ -203,11 +203,16 @@ func (l *localLocker) Close() error {
return nil
}
-// Local locker is always online.
+// IsOnline - local locker is always online.
func (l *localLocker) IsOnline() bool {
return true
}
+// IsLocal - local locker returns true.
+func (l *localLocker) IsLocal() bool {
+return true
+}
func (l *localLocker) Expired(ctx context.Context, args dsync.LockArgs) (expired bool, err error) {
select {
case <-ctx.Done():

@@ -76,6 +76,11 @@ func (client *lockRESTClient) IsOnline() bool {
return client.restClient.IsOnline()
}
+// Not a local locker
+func (client *lockRESTClient) IsLocal() bool {
+return false
+}
// Close - marks the client as closed.
func (client *lockRESTClient) Close() error {
client.restClient.Close()

@@ -60,16 +60,16 @@ func NewResponseWriter(w http.ResponseWriter) *ResponseWriter {
}
func (lrw *ResponseWriter) Write(p []byte) (int, error) {
-n, err := lrw.ResponseWriter.Write(p)
-lrw.bytesWritten += n
-if lrw.TimeToFirstByte == 0 {
-lrw.TimeToFirstByte = time.Now().UTC().Sub(lrw.StartTime)
-}
if !lrw.headersLogged {
// We assume the response code to be '200 OK' when WriteHeader() is not called,
// that way following Golang HTTP response behavior.
lrw.WriteHeader(http.StatusOK)
}
+n, err := lrw.ResponseWriter.Write(p)
+lrw.bytesWritten += n
+if lrw.TimeToFirstByte == 0 {
+lrw.TimeToFirstByte = time.Now().UTC().Sub(lrw.StartTime)
+}
if (lrw.LogErrBody && lrw.StatusCode >= http.StatusBadRequest) || lrw.LogAllBody {
// Always logging error responses.
lrw.body.Write(p)
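The reorder fixes what the wrapper records: once the underlying Write runs, Go commits an implicit 200 and later WriteHeader calls cannot change it, so the wrapper's status bookkeeping must settle first. A reduced, self-contained sketch of the pattern (the field names mirror the ones above but the type is hypothetical):

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"time"
)

// loggingWriter is a reduced stand-in for the audit ResponseWriter.
type loggingWriter struct {
	http.ResponseWriter
	StatusCode      int
	StartTime       time.Time
	TimeToFirstByte time.Duration
	headersLogged   bool
}

func (lrw *loggingWriter) WriteHeader(code int) {
	if !lrw.headersLogged {
		lrw.StatusCode = code
		lrw.headersLogged = true
		lrw.ResponseWriter.WriteHeader(code)
	}
}

func (lrw *loggingWriter) Write(p []byte) (int, error) {
	// Record the implicit 200 before the underlying Write commits the
	// response; afterwards WriteHeader would be a no-op and the logged
	// status would stay zero.
	if !lrw.headersLogged {
		lrw.WriteHeader(http.StatusOK)
	}
	n, err := lrw.ResponseWriter.Write(p)
	if lrw.TimeToFirstByte == 0 {
		lrw.TimeToFirstByte = time.Since(lrw.StartTime)
	}
	return n, err
}

func main() {
	rec := httptest.NewRecorder()
	lrw := &loggingWriter{ResponseWriter: rec, StartTime: time.Now()}
	lrw.Write([]byte("hello"))
	fmt.Println(lrw.StatusCode, lrw.TimeToFirstByte > 0) // 200 true
}
```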

@@ -534,13 +534,13 @@ func (sys *NotificationSys) GetLocks(ctx context.Context, r *http.Request) []*Pe
}
// Once we have received all the locks currently used from peers
// add the local peer locks list as well.
-var getRespLocks GetLocksResp
+llockers := make(GetLocksResp, 0, len(globalLockServers))
for _, llocker := range globalLockServers {
-getRespLocks = append(getRespLocks, llocker.DupLockMap())
+llockers = append(llockers, llocker.DupLockMap())
}
locksResp = append(locksResp, &PeerLocks{
Addr: getHostName(r),
-Locks: getRespLocks,
+Locks: llockers,
})
return locksResp
}

@@ -322,6 +322,10 @@ func isStringEqual(s1 string, s2 string) bool {
// Ignores all reserved bucket names or invalid bucket names.
func isReservedOrInvalidBucket(bucketEntry string, strict bool) bool {
+if bucketEntry == "" {
+return true
+}
bucketEntry = strings.TrimSuffix(bucketEntry, SlashSeparator)
if strict {
if err := s3utils.CheckValidBucketNameStrict(bucketEntry); err != nil {

@@ -261,18 +261,20 @@ func setPutObjHeaders(w http.ResponseWriter, objInfo ObjectInfo, delete bool) {
}
}
-if lc, err := globalLifecycleSys.Get(objInfo.Bucket); err == nil && !delete {
-ruleID, expiryTime := lc.PredictExpiryTime(lifecycle.ObjectOpts{
-Name: objInfo.Name,
-UserTags: objInfo.UserTags,
-VersionID: objInfo.VersionID,
-ModTime: objInfo.ModTime,
-IsLatest: objInfo.IsLatest,
-DeleteMarker: objInfo.DeleteMarker,
-})
-if !expiryTime.IsZero() {
-w.Header()[xhttp.AmzExpiration] = []string{
-fmt.Sprintf(`expiry-date="%s", rule-id="%s"`, expiryTime.Format(http.TimeFormat), ruleID),
+if objInfo.Bucket != "" {
+if lc, err := globalLifecycleSys.Get(objInfo.Bucket); err == nil && !delete {
+ruleID, expiryTime := lc.PredictExpiryTime(lifecycle.ObjectOpts{
+Name: objInfo.Name,
+UserTags: objInfo.UserTags,
+VersionID: objInfo.VersionID,
+ModTime: objInfo.ModTime,
+IsLatest: objInfo.IsLatest,
+DeleteMarker: objInfo.DeleteMarker,
+})
+if !expiryTime.IsZero() {
+w.Header()[xhttp.AmzExpiration] = []string{
+fmt.Sprintf(`expiry-date="%s", rule-id="%s"`, expiryTime.Format(http.TimeFormat), ruleID),
+}
+}
+}
}
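A failed DeleteObject can hand setPutObjHeaders an ObjectInfo zero value, and looking up lifecycle configuration for an empty bucket name is meaningless at best. A reduced sketch of the guard; the types and the lookup are simplified stand-ins, not MinIO's actual code:

```go
package main

import "fmt"

// Trimmed-down stand-in for the handler's ObjectInfo.
type ObjectInfo struct {
	Bucket string
	Name   string
}

// predictExpiry is hypothetical; the point is only that it must
// never run for a zero-value ObjectInfo.
func predictExpiry(oi ObjectInfo) string {
	return fmt.Sprintf("expiry for %s/%s", oi.Bucket, oi.Name)
}

func setExpirationHeader(oi ObjectInfo) {
	// A failed delete can return ObjectInfo{} with no bucket set;
	// the guard skips the lifecycle lookup in that case.
	if oi.Bucket == "" {
		return
	}
	fmt.Println(predictExpiry(oi))
}

func main() {
	setExpirationHeader(ObjectInfo{})                                // guarded: prints nothing
	setExpirationHeader(ObjectInfo{Bucket: "photos", Name: "a.jpg"}) // expiry for photos/a.jpg
}
```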

@@ -47,7 +47,7 @@ func (s *peerRESTServer) GetLocksHandler(w http.ResponseWriter, r *http.Request)
ctx := newContext(r, w, "GetLocks")
-llockers := make([]map[string][]lockRequesterInfo, 0, len(globalLockServers))
+llockers := make(GetLocksResp, 0, len(globalLockServers))
for _, llocker := range globalLockServers {
llockers = append(llockers, llocker.DupLockMap())
}

@@ -490,6 +490,29 @@ func newInternodeHTTPTransport(tlsConfig *tls.Config, dialTimeout time.Duration)
}
}
+func newCustomHTTP11Transport(tlsConfig *tls.Config, dialTimeout time.Duration) func() *http.Transport {
+// For more details about various values used here refer
+// https://golang.org/pkg/net/http/#Transport documentation
+tr := &http.Transport{
+Proxy: http.ProxyFromEnvironment,
+DialContext: xhttp.NewCustomDialContext(dialTimeout),
+MaxIdleConnsPerHost: 16,
+IdleConnTimeout: 1 * time.Minute,
+ResponseHeaderTimeout: 3 * time.Minute, // Set conservative timeouts for MinIO internode.
+TLSHandshakeTimeout: 10 * time.Second,
+ExpectContinueTimeout: 10 * time.Second,
+TLSClientConfig: tlsConfig,
+// Go net/http automatically unzips if content-type is
+// gzip; disable this feature, as we are always interested
+// in the raw stream.
+DisableCompression: true,
+}
+return func() *http.Transport {
+return tr
+}
+}
func newCustomHTTPTransport(tlsConfig *tls.Config, dialTimeout time.Duration) func() *http.Transport {
// For more details about various values used here refer
// https://golang.org/pkg/net/http/#Transport documentation
@@ -532,7 +555,6 @@ func newGatewayHTTPTransport(timeout time.Duration) *http.Transport {
// Allow more requests to be in flight.
tr.ResponseHeaderTimeout = timeout
tr.MaxIdleConns = 256
-tr.MaxIdleConnsPerHost = 16
return tr
}
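The new constructor is what GetProxyEndpoints now uses (see the endpoint.go hunk above). A plausible reading of the HTTP/1.1 naming: in Go's net/http, a custom *http.Transport only attempts HTTP/2 when ForceAttemptHTTP2 is set, and this constructor never sets it, so proxied connections stay on HTTP/1.1. A sketch of wiring such a transport into a reverse proxy (the address and handler shape are illustrative, not MinIO's actual proxy code):

```go
package main

import (
	"log"
	"net/http"
	"net/http/httputil"
	"net/url"
	"time"
)

func main() {
	// Hypothetical peer address, purely for illustration.
	target, err := url.Parse("http://minio-peer:9000")
	if err != nil {
		log.Fatal(err)
	}

	proxy := httputil.NewSingleHostReverseProxy(target)
	// Shaped like newCustomHTTP11Transport above: a custom *http.Transport
	// that never sets ForceAttemptHTTP2 will not negotiate HTTP/2, so the
	// proxied connections stay on HTTP/1.1.
	proxy.Transport = &http.Transport{
		MaxIdleConnsPerHost:   16,
		IdleConnTimeout:       time.Minute,
		ResponseHeaderTimeout: 30 * time.Minute, // long: proxied responses may be slow
		DisableCompression:    true,             // pass the raw stream through
	}

	log.Fatal(http.ListenAndServe(":8080", proxy))
}
```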

@@ -661,21 +661,7 @@ func (s *xlStorage) ListVols(context.Context) (volsInfo []VolInfo, err error) {
atomic.AddInt32(&s.activeIOCount, -1)
}()
-volsInfo, err = listVols(s.diskPath)
-if err != nil {
-if isSysErrIO(err) {
-return nil, errFaultyDisk
-}
-return nil, err
-}
-for i, vol := range volsInfo {
-volInfo := VolInfo{
-Name: vol.Name,
-Created: vol.Created,
-}
-volsInfo[i] = volInfo
-}
-return volsInfo, nil
+return listVols(s.diskPath)
}
// List all the volumes from diskPath.

@@ -133,7 +133,7 @@ func (dm *DRWMutex) GetRLock(ctx context.Context, id, source string, opts Option
}
const (
-lockRetryInterval = 100 * time.Millisecond
+lockRetryInterval = 1 * time.Second
)
// lockBlocking will try to acquire either a read or a write lock
@@ -204,6 +204,7 @@ func (dm *DRWMutex) lockBlocking(ctx context.Context, id, source string, isReadL
dm.m.Unlock()
return locked
}
+time.Sleep(time.Duration(r.Float64() * float64(lockRetryInterval)))
}
}
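Two changes combine here: the base interval grows from 100ms to 1s, and each retry sleeps for a uniformly random fraction of it, de-synchronizing competing lockers so they stop hammering the lock servers in lockstep. A compact sketch of that jittered retry loop, with a stub tryAcquire standing in for the quorum attempt:

```go
package main

import (
	"context"
	"fmt"
	"math/rand"
	"time"
)

const lockRetryInterval = 1 * time.Second

// tryAcquire is a hypothetical stand-in for one quorum lock attempt.
func tryAcquire() bool { return rand.Intn(4) == 0 }

// lockBlocking retries with a random 0..lockRetryInterval sleep so
// that competing lockers do not retry in lockstep.
func lockBlocking(ctx context.Context, r *rand.Rand) bool {
	for {
		if tryAcquire() {
			return true
		}
		select {
		case <-ctx.Done():
			return false
		case <-time.After(time.Duration(r.Float64() * float64(lockRetryInterval))):
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	r := rand.New(rand.NewSource(time.Now().UnixNano()))
	fmt.Println("acquired:", lockBlocking(ctx, r))
}
```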
@@ -268,6 +269,7 @@ func lock(ctx context.Context, ds *Dsync, locks *[]string, id, source string, is
wg.Add(1)
go func(isReadLock bool) {
+defer wg.Done()
// Wait until we have either
//
@@ -317,9 +319,6 @@ func lock(ctx context.Context, ds *Dsync, locks *[]string, id, source string, is
// Count locks in order to determine whether we have quorum or not
quorumLocked = checkQuorumLocked(locks, quorum)
-// Signal that we have the quorum
-wg.Done()
// Wait for the other responses and immediately release the locks
// (do not add them to the locks array because the DRWMutex could
// already has been unlocked again by the original calling thread)

@@ -50,6 +50,10 @@ func (rpcClient *ReconnectRPCClient) IsOnline() bool {
return rpcClient.rpc != nil
}
+func (rpcClient *ReconnectRPCClient) IsLocal() bool {
+return false
+}
// Close closes the underlying socket file descriptor.
func (rpcClient *ReconnectRPCClient) Close() error {
rpcClient.mutex.Lock()

@@ -68,4 +68,7 @@ type NetLocker interface {
// Is the underlying connection online? (is always true for any local lockers)
IsOnline() bool
+// Is the underlying locker local to this server?
+IsLocal() bool
}
