Do notification in background to not block S3 client REST calls (#6005)

master
Krishna Srinivas 6 years ago committed by kannappanr
parent b0b0fb4c8d
commit e40a5e05e1
  1. 5
      cmd/bucket-handlers.go
  2. 11
      cmd/bucket-notification-handlers.go
  3. 10
      cmd/bucket-policy-handlers.go
  4. 84
      cmd/notification.go
  5. 10
      cmd/web-handlers.go

@ -753,10 +753,7 @@ func (api objectAPIHandlers) DeleteBucketHandler(w http.ResponseWriter, r *http.
globalNotificationSys.RemoveNotification(bucket) globalNotificationSys.RemoveNotification(bucket)
globalPolicySys.Remove(bucket) globalPolicySys.Remove(bucket)
for nerr := range globalNotificationSys.DeleteBucket(bucket) { globalNotificationSys.DeleteBucket(ctx, bucket)
logger.GetReqInfo(ctx).AppendTags("remotePeer", nerr.Host.Name)
logger.LogIf(ctx, nerr.Err)
}
if globalDNSConfig != nil { if globalDNSConfig != nil {
if err := globalDNSConfig.Delete(bucket); err != nil { if err := globalDNSConfig.Delete(bucket); err != nil {

@ -146,10 +146,7 @@ func (api objectAPIHandlers) PutBucketNotificationHandler(w http.ResponseWriter,
rulesMap := config.ToRulesMap() rulesMap := config.ToRulesMap()
globalNotificationSys.AddRulesMap(bucketName, rulesMap) globalNotificationSys.AddRulesMap(bucketName, rulesMap)
for nerr := range globalNotificationSys.PutBucketNotification(bucketName, rulesMap) { globalNotificationSys.PutBucketNotification(ctx, bucketName, rulesMap)
logger.GetReqInfo(ctx).AppendTags("remotePeer", nerr.Host.Name)
logger.LogIf(ctx, nerr.Err)
}
writeSuccessResponseHeadersOnly(w) writeSuccessResponseHeadersOnly(w)
} }
@ -260,11 +257,7 @@ func (api objectAPIHandlers) ListenBucketNotificationHandler(w http.ResponseWrit
return return
} }
errCh := globalNotificationSys.ListenBucketNotification(bucketName, eventNames, pattern, target.ID(), *thisAddr) globalNotificationSys.ListenBucketNotification(ctx, bucketName, eventNames, pattern, target.ID(), *thisAddr)
for nerr := range errCh {
logger.GetReqInfo(ctx).AppendTags("remotePeer", nerr.Host.Name)
logger.LogIf(ctx, nerr.Err)
}
<-target.DoneCh <-target.DoneCh

@ -91,10 +91,7 @@ func (api objectAPIHandlers) PutBucketPolicyHandler(w http.ResponseWriter, r *ht
} }
globalPolicySys.Set(bucket, *bucketPolicy) globalPolicySys.Set(bucket, *bucketPolicy)
for nerr := range globalNotificationSys.SetBucketPolicy(bucket, bucketPolicy) { globalNotificationSys.SetBucketPolicy(ctx, bucket, bucketPolicy)
logger.GetReqInfo(ctx).AppendTags("remotePeer", nerr.Host.Name)
logger.LogIf(ctx, nerr.Err)
}
// Success. // Success.
writeSuccessNoContent(w) writeSuccessNoContent(w)
@ -130,10 +127,7 @@ func (api objectAPIHandlers) DeleteBucketPolicyHandler(w http.ResponseWriter, r
} }
globalPolicySys.Remove(bucket) globalPolicySys.Remove(bucket)
for nerr := range globalNotificationSys.RemoveBucketPolicy(bucket) { globalNotificationSys.RemoveBucketPolicy(ctx, bucket)
logger.GetReqInfo(ctx).AppendTags("remotePeer", nerr.Host.Name)
logger.LogIf(ctx, nerr.Err)
}
// Success. // Success.
writeSuccessNoContent(w) writeSuccessNoContent(w)

@ -68,28 +68,21 @@ type NotificationPeerErr struct {
} }
// DeleteBucket - calls DeleteBucket RPC call on all peers. // DeleteBucket - calls DeleteBucket RPC call on all peers.
func (sys *NotificationSys) DeleteBucket(bucketName string) <-chan NotificationPeerErr { func (sys *NotificationSys) DeleteBucket(ctx context.Context, bucketName string) {
errCh := make(chan NotificationPeerErr)
go func() { go func() {
defer close(errCh)
var wg sync.WaitGroup var wg sync.WaitGroup
for addr, client := range sys.peerRPCClientMap { for addr, client := range sys.peerRPCClientMap {
wg.Add(1) wg.Add(1)
go func(addr xnet.Host, client *PeerRPCClient) { go func(addr xnet.Host, client *PeerRPCClient) {
defer wg.Done() defer wg.Done()
if err := client.DeleteBucket(bucketName); err != nil { if err := client.DeleteBucket(bucketName); err != nil {
errCh <- NotificationPeerErr{ logger.GetReqInfo(ctx).AppendTags("remotePeer", addr.Name)
Host: addr, logger.LogIf(ctx, err)
Err: err,
}
} }
}(addr, client) }(addr, client)
} }
wg.Wait() wg.Wait()
}() }()
return errCh
} }
// SetCredentials - calls SetCredentials RPC call on all peers. // SetCredentials - calls SetCredentials RPC call on all peers.
@ -120,104 +113,76 @@ func (sys *NotificationSys) SetCredentials(credentials auth.Credentials) map[xne
} }
// SetBucketPolicy - calls SetBucketPolicy RPC call on all peers. // SetBucketPolicy - calls SetBucketPolicy RPC call on all peers.
func (sys *NotificationSys) SetBucketPolicy(bucketName string, bucketPolicy *policy.Policy) <-chan NotificationPeerErr { func (sys *NotificationSys) SetBucketPolicy(ctx context.Context, bucketName string, bucketPolicy *policy.Policy) {
errCh := make(chan NotificationPeerErr)
go func() { go func() {
defer close(errCh)
var wg sync.WaitGroup var wg sync.WaitGroup
for addr, client := range sys.peerRPCClientMap { for addr, client := range sys.peerRPCClientMap {
wg.Add(1) wg.Add(1)
go func(addr xnet.Host, client *PeerRPCClient) { go func(addr xnet.Host, client *PeerRPCClient) {
defer wg.Done() defer wg.Done()
if err := client.SetBucketPolicy(bucketName, bucketPolicy); err != nil { if err := client.SetBucketPolicy(bucketName, bucketPolicy); err != nil {
errCh <- NotificationPeerErr{ logger.GetReqInfo(ctx).AppendTags("remotePeer", addr.Name)
Host: addr, logger.LogIf(ctx, err)
Err: err,
}
} }
}(addr, client) }(addr, client)
} }
wg.Wait() wg.Wait()
}() }()
return errCh
} }
// RemoveBucketPolicy - calls RemoveBucketPolicy RPC call on all peers. // RemoveBucketPolicy - calls RemoveBucketPolicy RPC call on all peers.
func (sys *NotificationSys) RemoveBucketPolicy(bucketName string) <-chan NotificationPeerErr { func (sys *NotificationSys) RemoveBucketPolicy(ctx context.Context, bucketName string) {
errCh := make(chan NotificationPeerErr)
go func() { go func() {
defer close(errCh)
var wg sync.WaitGroup var wg sync.WaitGroup
for addr, client := range sys.peerRPCClientMap { for addr, client := range sys.peerRPCClientMap {
wg.Add(1) wg.Add(1)
go func(addr xnet.Host, client *PeerRPCClient) { go func(addr xnet.Host, client *PeerRPCClient) {
defer wg.Done() defer wg.Done()
if err := client.RemoveBucketPolicy(bucketName); err != nil { if err := client.RemoveBucketPolicy(bucketName); err != nil {
errCh <- NotificationPeerErr{ logger.GetReqInfo(ctx).AppendTags("remotePeer", addr.Name)
Host: addr, logger.LogIf(ctx, err)
Err: err,
}
} }
}(addr, client) }(addr, client)
} }
wg.Wait() wg.Wait()
}() }()
return errCh
} }
// PutBucketNotification - calls PutBucketNotification RPC call on all peers. // PutBucketNotification - calls PutBucketNotification RPC call on all peers.
func (sys *NotificationSys) PutBucketNotification(bucketName string, rulesMap event.RulesMap) <-chan NotificationPeerErr { func (sys *NotificationSys) PutBucketNotification(ctx context.Context, bucketName string, rulesMap event.RulesMap) {
errCh := make(chan NotificationPeerErr)
go func() { go func() {
defer close(errCh)
var wg sync.WaitGroup var wg sync.WaitGroup
for addr, client := range sys.peerRPCClientMap { for addr, client := range sys.peerRPCClientMap {
wg.Add(1) wg.Add(1)
go func(addr xnet.Host, client *PeerRPCClient, rulesMap event.RulesMap) { go func(addr xnet.Host, client *PeerRPCClient, rulesMap event.RulesMap) {
defer wg.Done() defer wg.Done()
if err := client.PutBucketNotification(bucketName, rulesMap); err != nil { if err := client.PutBucketNotification(bucketName, rulesMap); err != nil {
errCh <- NotificationPeerErr{ logger.GetReqInfo(ctx).AppendTags("remotePeer", addr.Name)
Host: addr, logger.LogIf(ctx, err)
Err: err,
}
} }
}(addr, client, rulesMap.Clone()) }(addr, client, rulesMap.Clone())
} }
wg.Wait() wg.Wait()
}() }()
return errCh
} }
// ListenBucketNotification - calls ListenBucketNotification RPC call on all peers. // ListenBucketNotification - calls ListenBucketNotification RPC call on all peers.
func (sys *NotificationSys) ListenBucketNotification(bucketName string, eventNames []event.Name, pattern string, func (sys *NotificationSys) ListenBucketNotification(ctx context.Context, bucketName string, eventNames []event.Name, pattern string,
targetID event.TargetID, localPeer xnet.Host) <-chan NotificationPeerErr { targetID event.TargetID, localPeer xnet.Host) {
errCh := make(chan NotificationPeerErr)
go func() { go func() {
defer close(errCh)
var wg sync.WaitGroup var wg sync.WaitGroup
for addr, client := range sys.peerRPCClientMap { for addr, client := range sys.peerRPCClientMap {
wg.Add(1) wg.Add(1)
go func(addr xnet.Host, client *PeerRPCClient) { go func(addr xnet.Host, client *PeerRPCClient) {
defer wg.Done() defer wg.Done()
if err := client.ListenBucketNotification(bucketName, eventNames, pattern, targetID, localPeer); err != nil { if err := client.ListenBucketNotification(bucketName, eventNames, pattern, targetID, localPeer); err != nil {
errCh <- NotificationPeerErr{ logger.GetReqInfo(ctx).AppendTags("remotePeer", addr.Name)
Host: addr, logger.LogIf(ctx, err)
Err: err,
}
} }
}(addr, client) }(addr, client)
} }
wg.Wait() wg.Wait()
}() }()
return errCh
} }
// AddRemoteTarget - adds event rules map, HTTP/PeerRPC client target to bucket name. // AddRemoteTarget - adds event rules map, HTTP/PeerRPC client target to bucket name.
@ -556,13 +521,16 @@ func sendEvent(args eventArgs) {
return return
} }
for _, err := range globalNotificationSys.Send(args) { notifyCh := globalNotificationSys.Send(args)
reqInfo := &logger.ReqInfo{BucketName: args.BucketName, ObjectName: args.Object.Name} go func() {
reqInfo.AppendTags("EventName", args.EventName.String()) for _, err := range notifyCh {
reqInfo.AppendTags("targetID", err.ID.Name) reqInfo := &logger.ReqInfo{BucketName: args.BucketName, ObjectName: args.Object.Name}
ctx := logger.SetReqInfo(context.Background(), reqInfo) reqInfo.AppendTags("EventName", args.EventName.String())
logger.LogOnceIf(ctx, err.Err, err.ID) reqInfo.AppendTags("targetID", err.ID.Name)
} ctx := logger.SetReqInfo(context.Background(), reqInfo)
logger.LogOnceIf(ctx, err.Err, err.ID)
}
}()
} }
func saveConfig(objAPI ObjectLayer, configFile string, data []byte) error { func saveConfig(objAPI ObjectLayer, configFile string, data []byte) error {

@ -193,10 +193,7 @@ func (web *webAPIHandlers) DeleteBucket(r *http.Request, args *RemoveBucketArgs,
globalNotificationSys.RemoveNotification(args.BucketName) globalNotificationSys.RemoveNotification(args.BucketName)
globalPolicySys.Remove(args.BucketName) globalPolicySys.Remove(args.BucketName)
for nerr := range globalNotificationSys.DeleteBucket(args.BucketName) { globalNotificationSys.DeleteBucket(ctx, args.BucketName)
logger.GetReqInfo(ctx).AppendTags("remotePeer", nerr.Host.Name)
logger.LogIf(ctx, nerr.Err)
}
if globalDNSConfig != nil { if globalDNSConfig != nil {
if err := globalDNSConfig.Delete(args.BucketName); err != nil { if err := globalDNSConfig.Delete(args.BucketName); err != nil {
@ -979,10 +976,7 @@ func (web *webAPIHandlers) SetBucketPolicy(r *http.Request, args *SetBucketPolic
} }
globalPolicySys.Set(args.BucketName, *bucketPolicy) globalPolicySys.Set(args.BucketName, *bucketPolicy)
for nerr := range globalNotificationSys.SetBucketPolicy(args.BucketName, bucketPolicy) { globalNotificationSys.SetBucketPolicy(ctx, args.BucketName, bucketPolicy)
logger.GetReqInfo(ctx).AppendTags("remotePeer", nerr.Host.Name)
logger.LogIf(ctx, nerr.Err)
}
return nil return nil
} }

Loading…
Cancel
Save