diff --git a/cmd/admin-handlers_test.go b/cmd/admin-handlers_test.go index a3de1dfb6..9181ffab3 100644 --- a/cmd/admin-handlers_test.go +++ b/cmd/admin-handlers_test.go @@ -288,23 +288,14 @@ func TestAdminServerInfo(t *testing.T) { t.Errorf("Expected to succeed but failed with %d", rec.Code) } - results := []ServerInfo{} + results := madmin.InfoMessage{} err = json.NewDecoder(rec.Body).Decode(&results) if err != nil { t.Fatalf("Failed to decode set config result json %v", err) } - if len(results) == 0 { - t.Error("Expected at least one server info result") - } - - for _, serverInfo := range results { - if serverInfo.Error != "" { - t.Errorf("Unexpected error = %v\n", serverInfo.Error) - } - if serverInfo.Data.Properties.Region != globalMinioDefaultRegion { - t.Errorf("Expected %s, got %s", globalMinioDefaultRegion, serverInfo.Data.Properties.Region) - } + if results.Region != globalMinioDefaultRegion { + t.Errorf("Expected %s, got %s", globalMinioDefaultRegion, results.Region) } } diff --git a/cmd/api-router.go b/cmd/api-router.go index 875794396..b1d5eb50b 100644 --- a/cmd/api-router.go +++ b/cmd/api-router.go @@ -166,8 +166,6 @@ func registerAPIRouter(router *mux.Router, encryptionEnabled, allowSSEKMS bool) bucket.Methods(http.MethodGet).HandlerFunc(collectAPIStats("getbucketnotification", httpTraceAll(api.GetBucketNotificationHandler))).Queries("notification", "") // ListenBucketNotification bucket.Methods(http.MethodGet).HandlerFunc(collectAPIStats("listenbucketnotification", httpTraceAll(api.ListenBucketNotificationHandler))).Queries("events", "{events:.*}") - // ListenBucketNotificationV2 - bucket.Methods(http.MethodGet).HandlerFunc(collectAPIStats("listenbucketnotificationv2", httpTraceAll(api.ListenBucketNotificationHandlerV2))).Queries("type", "2", "events", "{events:.*}") // ListMultipartUploads bucket.Methods(http.MethodGet).HandlerFunc(collectAPIStats("listmultipartuploads", httpTraceAll(api.ListMultipartUploadsHandler))).Queries("uploads", "") // ListObjectsV2M diff --git a/cmd/bucket-notification-handlers.go b/cmd/bucket-notification-handlers.go index 10ddcdc53..c94fcae3f 100644 --- a/cmd/bucket-notification-handlers.go +++ b/cmd/bucket-notification-handlers.go @@ -31,7 +31,6 @@ import ( xhttp "github.com/minio/minio/cmd/http" "github.com/minio/minio/cmd/logger" "github.com/minio/minio/pkg/event" - "github.com/minio/minio/pkg/event/target" xnet "github.com/minio/minio/pkg/net" "github.com/minio/minio/pkg/policy" ) @@ -39,7 +38,6 @@ import ( const ( bucketConfigPrefix = "buckets" bucketNotificationConfig = "notification.xml" - bucketListenerConfig = "listener.json" ) var errNoSuchNotifications = errors.New("The specified bucket does not have bucket notifications") @@ -174,10 +172,10 @@ func (api objectAPIHandlers) PutBucketNotificationHandler(w http.ResponseWriter, writeSuccessResponseHeadersOnly(w) } -func (api objectAPIHandlers) ListenBucketNotificationHandlerV2(w http.ResponseWriter, r *http.Request) { - ctx := newContext(r, w, "ListenBucketNotificationV2") +func (api objectAPIHandlers) ListenBucketNotificationHandler(w http.ResponseWriter, r *http.Request) { + ctx := newContext(r, w, "ListenBucketNotification") - defer logger.AuditLog(w, r, "ListenBucketNotificationV2", mustGetClaimsFromToken(r)) + defer logger.AuditLog(w, r, "ListenBucketNotification", mustGetClaimsFromToken(r)) // Validate if bucket exists. objAPI := api.ObjectAPI() @@ -311,131 +309,3 @@ func (api objectAPIHandlers) ListenBucketNotificationHandlerV2(w http.ResponseWr } } - -// ListenBucketNotificationHandler - This HTTP handler sends events to the connected HTTP client. -// Client should send prefix/suffix object name to match and events to watch as query parameters. -func (api objectAPIHandlers) ListenBucketNotificationHandler(w http.ResponseWriter, r *http.Request) { - ctx := newContext(r, w, "ListenBucketNotification") - - defer logger.AuditLog(w, r, "ListenBucketNotification", mustGetClaimsFromToken(r)) - - // Validate if bucket exists. - objAPI := api.ObjectAPI() - if objAPI == nil { - writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL, guessIsBrowserReq(r)) - return - } - - if !objAPI.IsNotificationSupported() { - writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL, guessIsBrowserReq(r)) - return - } - - if !objAPI.IsListenBucketSupported() { - writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL, guessIsBrowserReq(r)) - return - } - vars := mux.Vars(r) - bucketName := vars["bucket"] - - if s3Error := checkRequestAuthType(ctx, r, policy.ListenBucketNotificationAction, bucketName, ""); s3Error != ErrNone { - writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL, guessIsBrowserReq(r)) - return - } - - values := r.URL.Query() - - var prefix string - if len(values["prefix"]) > 1 { - writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrFilterNamePrefix), r.URL, guessIsBrowserReq(r)) - return - } - - if len(values["prefix"]) == 1 { - if err := event.ValidateFilterRuleValue(values["prefix"][0]); err != nil { - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) - return - } - - prefix = values["prefix"][0] - } - - var suffix string - if len(values["suffix"]) > 1 { - writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrFilterNameSuffix), r.URL, guessIsBrowserReq(r)) - return - } - - if len(values["suffix"]) == 1 { - if err := event.ValidateFilterRuleValue(values["suffix"][0]); err != nil { - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) - return - } - - suffix = values["suffix"][0] - } - - pattern := event.NewPattern(prefix, suffix) - - eventNames := []event.Name{} - for _, s := range values["events"] { - eventName, err := event.ParseName(s) - if err != nil { - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) - return - } - - eventNames = append(eventNames, eventName) - } - - if _, err := objAPI.GetBucketInfo(ctx, bucketName); err != nil { - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) - return - } - - host, err := xnet.ParseHost(r.RemoteAddr) - if err != nil { - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) - return - } - - w.Header().Set(xhttp.ContentType, "text/event-stream") - - target, err := target.NewHTTPClientTarget(*host, w) - if err != nil { - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) - return - } - - rulesMap := event.NewRulesMap(eventNames, pattern, target.ID()) - - if err = globalNotificationSys.AddRemoteTarget(bucketName, target, rulesMap); err != nil { - logger.GetReqInfo(ctx).AppendTags("target", target.ID().Name) - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) - return - } - defer globalNotificationSys.RemoveRemoteTarget(bucketName, target.ID()) - defer globalNotificationSys.RemoveRulesMap(bucketName, rulesMap) - - thisAddr, err := xnet.ParseHost(GetLocalPeer(globalEndpoints)) - if err != nil { - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) - return - } - - if err = SaveListener(objAPI, bucketName, eventNames, pattern, target.ID(), *thisAddr); err != nil { - logger.GetReqInfo(ctx).AppendTags("target", target.ID().Name) - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) - return - } - - globalNotificationSys.ListenBucketNotification(ctx, bucketName, eventNames, pattern, target.ID(), *thisAddr) - - <-target.DoneCh - - if err = RemoveListener(objAPI, bucketName, target.ID(), *thisAddr); err != nil { - logger.GetReqInfo(ctx).AppendTags("target", target.ID().Name) - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) - return - } -} diff --git a/cmd/notification.go b/cmd/notification.go index 96b7f9987..a83f5ed43 100644 --- a/cmd/notification.go +++ b/cmd/notification.go @@ -19,7 +19,6 @@ package cmd import ( "bytes" "context" - "encoding/json" "encoding/xml" "fmt" "io" @@ -590,24 +589,6 @@ func (sys *NotificationSys) PutBucketNotification(ctx context.Context, bucketNam }() } -// ListenBucketNotification - calls ListenBucketNotification RPC call on all peers. -func (sys *NotificationSys) ListenBucketNotification(ctx context.Context, bucketName string, - eventNames []event.Name, pattern string, targetID event.TargetID, localPeer xnet.Host) { - go func() { - ng := WithNPeers(len(sys.peerClients)) - for idx, client := range sys.peerClients { - if client == nil { - continue - } - client := client - ng.Go(ctx, func() error { - return client.ListenBucketNotification(bucketName, eventNames, pattern, targetID, localPeer) - }, idx, *client.host) - } - ng.Wait() - }() -} - // AddNotificationTargetsFromConfig - adds notification targets from server config. func (sys *NotificationSys) AddNotificationTargetsFromConfig(cfg config.Config) error { targetList, err := notify.GetNotificationTargets(cfg, GlobalServiceDoneCh, NewCustomHTTPTransport()) @@ -660,82 +641,6 @@ func (sys *NotificationSys) RemoteTargetExist(bucketName string, targetID event. return ok } -// ListenBucketNotificationArgs - listen bucket notification RPC arguments. -type ListenBucketNotificationArgs struct { - BucketName string `json:"-"` - EventNames []event.Name `json:"eventNames"` - Pattern string `json:"pattern"` - TargetID event.TargetID `json:"targetId"` - Addr xnet.Host `json:"addr"` -} - -// initListeners - initializes PeerREST clients available in listener.json. -func (sys *NotificationSys) initListeners(ctx context.Context, objAPI ObjectLayer, bucketName string) error { - // listener.json is available/applicable only in DistXL mode. - if !globalIsDistXL { - return nil - } - - // Construct path to listener.json for the given bucket. - configFile := path.Join(bucketConfigPrefix, bucketName, bucketListenerConfig) - configData, e := readConfig(ctx, objAPI, configFile) - if e != nil && !IsErrIgnored(e, errDiskNotFound, errConfigNotFound) { - return e - } - - listenerList := []ListenBucketNotificationArgs{} - if configData != nil { - if err := json.Unmarshal(configData, &listenerList); err != nil { - logger.LogIf(ctx, err) - return err - } - } - - if len(listenerList) == 0 { - // Nothing to initialize for empty listener list. - return nil - } - - for _, args := range listenerList { - found, err := isLocalHost(args.Addr.Name, args.Addr.Port.String(), args.Addr.Port.String()) - if err != nil { - logger.GetReqInfo(ctx).AppendTags("host", args.Addr.String()) - logger.LogIf(ctx, err) - return err - } - if found { - // As this function is called at startup, skip HTTP listener to this host. - continue - } - - client, err := newPeerRESTClient(&args.Addr) - if err != nil { - return fmt.Errorf("unable to find PeerHost by address %v in listener.json for bucket %v", args.Addr, bucketName) - } - - exist, err := client.RemoteTargetExist(bucketName, args.TargetID) - if err != nil { - logger.GetReqInfo(ctx).AppendTags("targetID", args.TargetID.Name) - logger.LogIf(ctx, err) - return err - } - if !exist { - // Skip previously connected HTTP listener which is not found in remote peer. - continue - } - - target := NewPeerRESTClientTarget(bucketName, args.TargetID, client) - rulesMap := event.NewRulesMap(args.EventNames, args.Pattern, target.ID()) - if err = sys.AddRemoteTarget(bucketName, target, rulesMap); err != nil { - logger.GetReqInfo(ctx).AppendTags("targetName", target.id.Name) - logger.LogIf(ctx, err) - return err - } - } - - return nil -} - // Loads notification policies for all buckets into NotificationSys. func (sys *NotificationSys) load(buckets []BucketInfo, objAPI ObjectLayer) error { for _, bucket := range buckets { @@ -751,9 +656,6 @@ func (sys *NotificationSys) load(buckets []BucketInfo, objAPI ObjectLayer) error continue } sys.AddRulesMap(bucket.Name, config.ToRulesMap()) - if err = sys.initListeners(ctx, objAPI, bucket.Name); err != nil { - return err - } } return nil } @@ -1283,7 +1185,6 @@ func (args eventArgs) ToEvent() event.Event { } func sendEvent(args eventArgs) { - // remove sensitive encryption entries in metadata. switch { case crypto.IsEncrypted(args.Object.UserDefined): @@ -1297,16 +1198,15 @@ func sendEvent(args eventArgs) { crypto.RemoveSensitiveEntries(args.Object.UserDefined) crypto.RemoveInternalEntries(args.Object.UserDefined) - if globalHTTPListen.HasSubscribers() { - globalHTTPListen.Publish(args.ToEvent()) - return - } - // globalNotificationSys is not initialized in gateway mode. if globalNotificationSys == nil { return } + if globalHTTPListen.HasSubscribers() { + globalHTTPListen.Publish(args.ToEvent()) + } + notifyCh := globalNotificationSys.Send(args) go func() { for _, err := range notifyCh { @@ -1345,92 +1245,3 @@ func saveNotificationConfig(ctx context.Context, objAPI ObjectLayer, bucketName configFile := path.Join(bucketConfigPrefix, bucketName, bucketNotificationConfig) return saveConfig(ctx, objAPI, configFile, data) } - -// SaveListener - saves HTTP client currently listening for events to listener.json. -func SaveListener(objAPI ObjectLayer, bucketName string, eventNames []event.Name, pattern string, targetID event.TargetID, addr xnet.Host) error { - // listener.json is available/applicable only in DistXL mode. - if !globalIsDistXL { - return nil - } - - ctx := logger.SetReqInfo(context.Background(), &logger.ReqInfo{BucketName: bucketName}) - - // Construct path to listener.json for the given bucket. - configFile := path.Join(bucketConfigPrefix, bucketName, bucketListenerConfig) - - configData, err := readConfig(ctx, objAPI, configFile) - if err != nil && !IsErrIgnored(err, errDiskNotFound, errConfigNotFound) { - return err - } - - listenerList := []ListenBucketNotificationArgs{} - if configData != nil { - if err = json.Unmarshal(configData, &listenerList); err != nil { - logger.LogIf(ctx, err) - return err - } - } - - listenerList = append(listenerList, ListenBucketNotificationArgs{ - EventNames: eventNames, - Pattern: pattern, - TargetID: targetID, - Addr: addr, - }) - - data, err := json.Marshal(listenerList) - if err != nil { - logger.LogIf(ctx, err) - return err - } - - return saveConfig(ctx, objAPI, configFile, data) -} - -// RemoveListener - removes HTTP client currently listening for events from listener.json. -func RemoveListener(objAPI ObjectLayer, bucketName string, targetID event.TargetID, addr xnet.Host) error { - // listener.json is available/applicable only in DistXL mode. - if !globalIsDistXL { - return nil - } - - ctx := logger.SetReqInfo(context.Background(), &logger.ReqInfo{BucketName: bucketName}) - - // Construct path to listener.json for the given bucket. - configFile := path.Join(bucketConfigPrefix, bucketName, bucketListenerConfig) - configData, err := readConfig(ctx, objAPI, configFile) - if err != nil && !IsErrIgnored(err, errDiskNotFound, errConfigNotFound) { - return err - } - - listenerList := []ListenBucketNotificationArgs{} - if configData != nil { - if err = json.Unmarshal(configData, &listenerList); err != nil { - logger.LogIf(ctx, err) - return err - } - } - - if len(listenerList) == 0 { - // Nothing to remove. - return nil - } - - activeListenerList := []ListenBucketNotificationArgs{} - for _, args := range listenerList { - if args.TargetID == targetID && args.Addr.Equal(addr) { - // Skip if matches - continue - } - - activeListenerList = append(activeListenerList, args) - } - - data, err := json.Marshal(activeListenerList) - if err != nil { - logger.LogIf(ctx, err) - return err - } - - return saveConfig(ctx, objAPI, configFile, data) -} diff --git a/cmd/object-api-common.go b/cmd/object-api-common.go index b602169a4..ad0e9a7fa 100644 --- a/cmd/object-api-common.go +++ b/cmd/object-api-common.go @@ -91,9 +91,6 @@ func deleteBucketMetadata(ctx context.Context, bucket string, objAPI ObjectLayer // Delete notification config, if present - ignore any errors. removeNotificationConfig(ctx, objAPI, bucket) - - // Delete listener config, if present - ignore any errors. - removeListenerConfig(ctx, objAPI, bucket) } // Depending on the disk type network or local, initialize storage API. @@ -233,13 +230,6 @@ func removeNotificationConfig(ctx context.Context, objAPI ObjectLayer, bucket st return objAPI.DeleteObject(ctx, minioMetaBucket, ncPath) } -// Remove listener configuration from storage layer. Used when a bucket is deleted. -func removeListenerConfig(ctx context.Context, objAPI ObjectLayer, bucket string) error { - // make the path - lcPath := path.Join(bucketConfigPrefix, bucket, bucketListenerConfig) - return objAPI.DeleteObject(ctx, minioMetaBucket, lcPath) -} - func listObjectsNonSlash(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int, tpool *TreeWalkPool, listDir ListDirFunc, getObjInfo func(context.Context, string, string) (ObjectInfo, error), getObjectInfoDirs ...func(context.Context, string, string) (ObjectInfo, error)) (loi ListObjectsInfo, err error) { endWalkCh := make(chan struct{}) defer close(endWalkCh) diff --git a/cmd/peer-rest-client.go b/cmd/peer-rest-client.go index aee0d1e6d..03657555c 100644 --- a/cmd/peer-rest-client.go +++ b/cmd/peer-rest-client.go @@ -264,34 +264,6 @@ func (client *peerRESTClient) ReloadFormat(dryRun bool) error { return nil } -// ListenBucketNotification - send listen bucket notification to peer nodes. -func (client *peerRESTClient) ListenBucketNotification(bucket string, eventNames []event.Name, - pattern string, targetID event.TargetID, addr xnet.Host) error { - args := listenBucketNotificationReq{ - EventNames: eventNames, - Pattern: pattern, - TargetID: targetID, - Addr: addr, - } - - values := make(url.Values) - values.Set(peerRESTBucket, bucket) - - var reader bytes.Buffer - err := gob.NewEncoder(&reader).Encode(args) - if err != nil { - return err - } - - respBody, err := client.call(peerRESTMethodBucketNotificationListen, values, &reader, -1) - if err != nil { - return err - } - - defer http.DrainBody(respBody) - return nil -} - // SendEvent - calls send event RPC. func (client *peerRESTClient) SendEvent(bucket string, targetID, remoteTargetID event.TargetID, eventData event.Event) error { numTries := 10 diff --git a/cmd/peer-rest-server.go b/cmd/peer-rest-server.go index c84a33469..9fced5d78 100644 --- a/cmd/peer-rest-server.go +++ b/cmd/peer-rest-server.go @@ -32,7 +32,6 @@ import ( "github.com/minio/minio/cmd/logger" "github.com/minio/minio/pkg/event" "github.com/minio/minio/pkg/lifecycle" - xnet "github.com/minio/minio/pkg/net" "github.com/minio/minio/pkg/policy" trace "github.com/minio/minio/pkg/trace" ) @@ -847,58 +846,6 @@ func (s *peerRESTServer) PutBucketObjectLockConfigHandler(w http.ResponseWriter, w.(http.Flusher).Flush() } -type listenBucketNotificationReq struct { - EventNames []event.Name `json:"eventNames"` - Pattern string `json:"pattern"` - TargetID event.TargetID `json:"targetId"` - Addr xnet.Host `json:"addr"` -} - -// ListenBucketNotificationHandler - Listen bucket notification handler. -func (s *peerRESTServer) ListenBucketNotificationHandler(w http.ResponseWriter, r *http.Request) { - if !s.IsValid(w, r) { - s.writeErrorResponse(w, errors.New("Invalid request")) - return - } - - vars := mux.Vars(r) - bucketName := vars[peerRESTBucket] - if bucketName == "" { - s.writeErrorResponse(w, errors.New("Bucket name is missing")) - return - } - - var args listenBucketNotificationReq - if r.ContentLength <= 0 { - s.writeErrorResponse(w, errInvalidArgument) - return - } - - err := gob.NewDecoder(r.Body).Decode(&args) - if err != nil { - s.writeErrorResponse(w, err) - return - } - - restClient, err := newPeerRESTClient(&args.Addr) - if err != nil { - s.writeErrorResponse(w, fmt.Errorf("unable to find PeerRESTClient for provided address %v. This happens only if remote and this minio run with different set of endpoints", args.Addr)) - return - } - - target := NewPeerRESTClientTarget(bucketName, args.TargetID, restClient) - rulesMap := event.NewRulesMap(args.EventNames, args.Pattern, target.ID()) - if err := globalNotificationSys.AddRemoteTarget(bucketName, target, rulesMap); err != nil { - reqInfo := &logger.ReqInfo{BucketName: target.bucketName} - reqInfo.AppendTags("target", target.id.Name) - ctx := logger.SetReqInfo(context.Background(), reqInfo) - logger.LogIf(ctx, err) - s.writeErrorResponse(w, err) - return - } - w.(http.Flusher).Flush() -} - // ServerUpdateHandler - updates the current server. func (s *peerRESTServer) ServerUpdateHandler(w http.ResponseWriter, r *http.Request) { if !s.IsValid(w, r) { @@ -1154,7 +1101,6 @@ func registerPeerRESTHandlers(router *mux.Router) { subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodTargetExists).HandlerFunc(httpTraceHdrs(server.TargetExistsHandler)).Queries(restQueries(peerRESTBucket)...) subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodSendEvent).HandlerFunc(httpTraceHdrs(server.SendEventHandler)).Queries(restQueries(peerRESTBucket)...) subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodBucketNotificationPut).HandlerFunc(httpTraceHdrs(server.PutBucketNotificationHandler)).Queries(restQueries(peerRESTBucket)...) - subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodBucketNotificationListen).HandlerFunc(httpTraceHdrs(server.ListenBucketNotificationHandler)).Queries(restQueries(peerRESTBucket)...) subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodReloadFormat).HandlerFunc(httpTraceHdrs(server.ReloadFormatHandler)).Queries(restQueries(peerRESTDryRun)...) subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodBucketLifecycleSet).HandlerFunc(httpTraceHdrs(server.SetBucketLifecycleHandler)).Queries(restQueries(peerRESTBucket)...)