diff --git a/cmd/bucket-notification-handlers.go b/cmd/bucket-notification-handlers.go index a3ff048f2..ababa8127 100644 --- a/cmd/bucket-notification-handlers.go +++ b/cmd/bucket-notification-handlers.go @@ -22,7 +22,6 @@ import ( "errors" "io" "net/http" - "net/url" "path" "reflect" "time" @@ -296,11 +295,7 @@ func (api objectAPIHandlers) ListenBucketNotificationHandler(w http.ResponseWrit if ev.S3.Bucket.Name != values.Get(peerRESTListenBucket) { return false } - objectName, uerr := url.QueryUnescape(ev.S3.Object.Key) - if uerr != nil { - objectName = ev.S3.Object.Key - } - return len(rulesMap.Match(ev.EventName, objectName).ToSlice()) != 0 + return rulesMap.MatchSimple(ev.EventName, ev.S3.Object.Key) }) for _, peer := range peers { diff --git a/cmd/gateway-main.go b/cmd/gateway-main.go index 50d3b144d..ad652a42a 100644 --- a/cmd/gateway-main.go +++ b/cmd/gateway-main.go @@ -131,6 +131,14 @@ func StartGateway(ctx *cli.Context, gw Gateway) { globalRootCAs, err = config.GetRootCAs(globalCertsCADir.Get()) logger.FatalIf(err, "Failed to read root CAs (%v)", err) + globalMinioEndpoint = func() string { + host := globalMinioHost + if host == "" { + host = sortIPs(localIP4.ToSlice())[0] + } + return fmt.Sprintf("%s://%s", getURLScheme(globalIsSSL), net.JoinHostPort(host, globalMinioPort)) + }() + // Handle gateway specific env gatewayHandleEnvVars() diff --git a/cmd/globals.go b/cmd/globals.go index 914128b85..ded2a096e 100644 --- a/cmd/globals.go +++ b/cmd/globals.go @@ -138,6 +138,8 @@ var ( globalMinioPort = globalMinioDefaultPort // Holds the host that was passed using --address globalMinioHost = "" + // Holds the possible host endpoint. + globalMinioEndpoint = "" // globalConfigSys server config system. globalConfigSys *ConfigSys diff --git a/cmd/notification.go b/cmd/notification.go index b288ec82d..d99884931 100644 --- a/cmd/notification.go +++ b/cmd/notification.go @@ -22,7 +22,6 @@ import ( "encoding/xml" "fmt" "io" - "net" "net/url" "path" "sort" @@ -50,6 +49,7 @@ import ( type NotificationSys struct { sync.RWMutex targetList *event.TargetList + targetResCh chan event.TargetIDResult bucketRulesMap map[string]event.RulesMap bucketRemoteTargetRulesMap map[string]map[event.TargetID]event.RulesMap peerClients []*peerRESTClient @@ -662,19 +662,6 @@ func (sys *NotificationSys) AddRemoteTarget(bucketName string, target event.Targ return nil } -// RemoteTargetExist - checks whether given target ID is a HTTP/PeerRPC client target or not. -func (sys *NotificationSys) RemoteTargetExist(bucketName string, targetID event.TargetID) bool { - sys.Lock() - defer sys.Unlock() - - targetMap, ok := sys.bucketRemoteTargetRulesMap[bucketName] - if ok { - _, ok = targetMap[targetID] - } - - return ok -} - // Loads notification policies for all buckets into NotificationSys. func (sys *NotificationSys) load(buckets []BucketInfo, objAPI ObjectLayer) error { for _, bucket := range buckets { @@ -713,6 +700,17 @@ func (sys *NotificationSys) Init(buckets []BucketInfo, objAPI ObjectLayer) error } } + go func() { + for res := range sys.targetResCh { + if res.Err != nil { + reqInfo := &logger.ReqInfo{} + reqInfo.AppendTags("targetID", res.ID.Name) + ctx := logger.SetReqInfo(GlobalContext, reqInfo) + logger.LogOnceIf(ctx, res.Err, res.ID) + } + } + }() + return sys.load(buckets, objAPI) } @@ -759,7 +757,9 @@ func (sys *NotificationSys) ConfiguredTargetIDs() []event.TargetID { for _, rmap := range sys.bucketRulesMap { for _, rules := range rmap { for _, targetSet := range rules { - targetIDs = append(targetIDs, targetSet.ToSlice()...) + for id := range targetSet { + targetIDs = append(targetIDs, id) + } } } } @@ -780,69 +780,41 @@ func (sys *NotificationSys) RemoveNotification(bucketName string) { delete(sys.bucketRulesMap, bucketName) + targetIDSet := event.NewTargetIDSet() for targetID := range sys.bucketRemoteTargetRulesMap[bucketName] { - sys.targetList.Remove(targetID) + targetIDSet[targetID] = struct{}{} delete(sys.bucketRemoteTargetRulesMap[bucketName], targetID) } + sys.targetList.Remove(targetIDSet) delete(sys.bucketRemoteTargetRulesMap, bucketName) } -// RemoveAllRemoteTargets - closes and removes all HTTP/PeerRPC client targets. +// RemoveAllRemoteTargets - closes and removes all notification targets. func (sys *NotificationSys) RemoveAllRemoteTargets() { sys.Lock() defer sys.Unlock() for _, targetMap := range sys.bucketRemoteTargetRulesMap { - for targetID := range targetMap { - sys.targetList.Remove(targetID) - } - } -} - -// RemoveRemoteTarget - closes and removes target by target ID. -func (sys *NotificationSys) RemoveRemoteTarget(bucketName string, targetID event.TargetID) { - for terr := range sys.targetList.Remove(targetID) { - reqInfo := (&logger.ReqInfo{}).AppendTags("targetID", terr.ID.Name) - ctx := logger.SetReqInfo(GlobalContext, reqInfo) - logger.LogIf(ctx, terr.Err) - } - - sys.Lock() - defer sys.Unlock() - - if _, ok := sys.bucketRemoteTargetRulesMap[bucketName]; ok { - delete(sys.bucketRemoteTargetRulesMap[bucketName], targetID) - if len(sys.bucketRemoteTargetRulesMap[bucketName]) == 0 { - delete(sys.bucketRemoteTargetRulesMap, bucketName) - } - } -} - -func (sys *NotificationSys) send(bucketName string, eventData event.Event, targetIDs ...event.TargetID) (errs []event.TargetIDErr) { - errCh := sys.targetList.Send(eventData, targetIDs...) - for terr := range errCh { - errs = append(errs, terr) - if sys.RemoteTargetExist(bucketName, terr.ID) { - sys.RemoveRemoteTarget(bucketName, terr.ID) + targetIDSet := event.NewTargetIDSet() + for k := range targetMap { + targetIDSet[k] = struct{}{} } + sys.targetList.Remove(targetIDSet) } - - return errs } // Send - sends event data to all matching targets. -func (sys *NotificationSys) Send(args eventArgs) []event.TargetIDErr { +func (sys *NotificationSys) Send(args eventArgs) { sys.RLock() targetIDSet := sys.bucketRulesMap[args.BucketName].Match(args.EventName, args.Object.Name) sys.RUnlock() if len(targetIDSet) == 0 { - return nil + return } - targetIDs := targetIDSet.ToSlice() - return sys.send(args.BucketName, args.ToEvent(), targetIDs...) + sys.targetList.Send(args.ToEvent(true), targetIDSet, sys.targetResCh) } // PutBucketObjectLockConfig - put bucket object lock configuration to all peers. @@ -1204,6 +1176,7 @@ func NewNotificationSys(endpoints EndpointZones) *NotificationSys { // bucketRulesMap/bucketRemoteTargetRulesMap are initialized by NotificationSys.Init() return &NotificationSys{ targetList: event.NewTargetList(), + targetResCh: make(chan event.TargetIDResult), bucketRulesMap: make(map[string]event.RulesMap), bucketRemoteTargetRulesMap: make(map[string]map[event.TargetID]event.RulesMap), peerClients: newPeerRestClients(endpoints), @@ -1221,23 +1194,13 @@ type eventArgs struct { } // ToEvent - converts to notification event. -func (args eventArgs) ToEvent() event.Event { - getOriginEndpoint := func() string { - host := globalMinioHost - if host == "" { - // FIXME: Send FQDN or hostname of this machine than sending IP address. - host = sortIPs(localIP4.ToSlice())[0] - } - - return fmt.Sprintf("%s://%s", getURLScheme(globalIsSSL), net.JoinHostPort(host, globalMinioPort)) - } - +func (args eventArgs) ToEvent(escape bool) event.Event { eventTime := UTCNow() uniqueID := fmt.Sprintf("%X", eventTime.UnixNano()) respElements := map[string]string{ "x-amz-request-id": args.RespElements["requestId"], - "x-minio-origin-endpoint": getOriginEndpoint(), // MinIO specific custom elements. + "x-minio-origin-endpoint": globalMinioEndpoint, // MinIO specific custom elements. } // Add deployment as part of if globalDeploymentID != "" { @@ -1246,6 +1209,10 @@ func (args eventArgs) ToEvent() event.Event { if args.RespElements["content-length"] != "" { respElements["content-length"] = args.RespElements["content-length"] } + keyName := args.Object.Name + if escape { + keyName = url.QueryEscape(args.Object.Name) + } newEvent := event.Event{ EventVersion: "2.0", EventSource: "minio:s3", @@ -1264,7 +1231,7 @@ func (args eventArgs) ToEvent() event.Event { ARN: policy.ResourceARNPrefix + args.BucketName, }, Object: event.Object{ - Key: url.QueryEscape(args.Object.Name), + Key: keyName, VersionID: "1", Sequencer: uniqueID, }, @@ -1308,19 +1275,10 @@ func sendEvent(args eventArgs) { } if globalHTTPListen.HasSubscribers() { - globalHTTPListen.Publish(args.ToEvent()) + globalHTTPListen.Publish(args.ToEvent(false)) } - notifyCh := globalNotificationSys.Send(args) - go func() { - for _, err := range notifyCh { - reqInfo := &logger.ReqInfo{BucketName: args.BucketName, ObjectName: args.Object.Name} - reqInfo.AppendTags("EventName", args.EventName.String()) - reqInfo.AppendTags("targetID", err.ID.Name) - ctx := logger.SetReqInfo(GlobalContext, reqInfo) - logger.LogOnceIf(ctx, err.Err, err.ID) - } - }() + globalNotificationSys.Send(args) } func readNotificationConfig(ctx context.Context, objAPI ObjectLayer, bucketName string) (*event.Config, error) { diff --git a/cmd/peer-rest-client-target.go b/cmd/peer-rest-client-target.go deleted file mode 100644 index 05a5780b3..000000000 --- a/cmd/peer-rest-client-target.go +++ /dev/null @@ -1,72 +0,0 @@ -/* - * MinIO Cloud Storage, (C) 2018, 2019 MinIO, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cmd - -import "github.com/minio/minio/pkg/event" - -// PeerRESTClientTarget - RPCClient is an event.Target which sends event to target of remote peer. -type PeerRESTClientTarget struct { - id event.TargetID - remoteTargetID event.TargetID - restClient *peerRESTClient - bucketName string -} - -// ID - returns target ID. -func (target *PeerRESTClientTarget) ID() event.TargetID { - return target.id -} - -// IsActive - does nothing and available for interface compatibility. -func (target *PeerRESTClientTarget) IsActive() (bool, error) { - return true, nil -} - -// HasQueueStore - No-Op. Added for interface compatibility -func (target PeerRESTClientTarget) HasQueueStore() bool { - return false -} - -// Save - Sends event directly without persisting. -func (target *PeerRESTClientTarget) Save(eventData event.Event) error { - return target.send(eventData) -} - -// Send - interface compatible method does no-op. -func (target *PeerRESTClientTarget) Send(eventKey string) error { - return nil -} - -// sends event to remote peer by making RPC call. -func (target *PeerRESTClientTarget) send(eventData event.Event) error { - return target.restClient.SendEvent(target.bucketName, target.id, target.remoteTargetID, eventData) -} - -// Close - does nothing and available for interface compatibility. -func (target *PeerRESTClientTarget) Close() error { - return nil -} - -// NewPeerRESTClientTarget - creates RPCClient target with given target ID available in remote peer. -func NewPeerRESTClientTarget(bucketName string, targetID event.TargetID, restClient *peerRESTClient) *PeerRESTClientTarget { - return &PeerRESTClientTarget{ - id: event.TargetID{ID: targetID.ID, Name: targetID.Name + "+" + mustGetUUID()}, - remoteTargetID: targetID, - bucketName: bucketName, - restClient: restClient, - } -} diff --git a/cmd/peer-rest-client.go b/cmd/peer-rest-client.go index 65578569a..18275222c 100644 --- a/cmd/peer-rest-client.go +++ b/cmd/peer-rest-client.go @@ -500,78 +500,6 @@ func (client *peerRESTClient) ReloadFormat(dryRun bool) error { return nil } -// SendEvent - calls send event RPC. -func (client *peerRESTClient) SendEvent(bucket string, targetID, remoteTargetID event.TargetID, eventData event.Event) error { - numTries := 10 - for { - err := client.sendEvent(bucket, targetID, remoteTargetID, eventData) - if err == nil { - return nil - } - if numTries == 0 { - return err - } - numTries-- - time.Sleep(5 * time.Second) - } -} - -func (client *peerRESTClient) sendEvent(bucket string, targetID, remoteTargetID event.TargetID, eventData event.Event) error { - args := sendEventRequest{ - TargetID: remoteTargetID, - Event: eventData, - } - - values := make(url.Values) - values.Set(peerRESTBucket, bucket) - - var reader bytes.Buffer - err := gob.NewEncoder(&reader).Encode(args) - if err != nil { - return err - } - respBody, err := client.call(peerRESTMethodSendEvent, values, &reader, -1) - if err != nil { - return err - } - - var eventResp sendEventResp - defer http.DrainBody(respBody) - err = gob.NewDecoder(respBody).Decode(&eventResp) - - if err != nil || !eventResp.Success { - reqInfo := &logger.ReqInfo{BucketName: bucket} - reqInfo.AppendTags("targetID", targetID.Name) - reqInfo.AppendTags("event", eventData.EventName.String()) - ctx := logger.SetReqInfo(GlobalContext, reqInfo) - logger.LogIf(ctx, err) - globalNotificationSys.RemoveRemoteTarget(bucket, targetID) - } - - return err -} - -// RemoteTargetExist - calls remote target ID exist REST API. -func (client *peerRESTClient) RemoteTargetExist(bucket string, targetID event.TargetID) (bool, error) { - values := make(url.Values) - values.Set(peerRESTBucket, bucket) - - var reader bytes.Buffer - err := gob.NewEncoder(&reader).Encode(targetID) - if err != nil { - return false, err - } - - respBody, err := client.call(peerRESTMethodTargetExists, values, &reader, -1) - if err != nil { - return false, err - } - defer http.DrainBody(respBody) - var targetExists remoteTargetExistsResp - err = gob.NewDecoder(respBody).Decode(&targetExists) - return targetExists.Exists, err -} - // RemoveBucketPolicy - Remove bucket policy on the peer node. func (client *peerRESTClient) RemoveBucketPolicy(bucket string) error { values := make(url.Values) diff --git a/cmd/peer-rest-common.go b/cmd/peer-rest-common.go index 5e0ba17e9..e1a66c538 100644 --- a/cmd/peer-rest-common.go +++ b/cmd/peer-rest-common.go @@ -17,7 +17,7 @@ package cmd const ( - peerRESTVersion = "v7" + peerRESTVersion = "v8" peerRESTVersionPrefix = SlashSeparator + peerRESTVersion peerRESTPrefix = minioReservedBucketPath + "/peer" peerRESTPath = peerRESTPrefix + peerRESTVersionPrefix @@ -52,17 +52,14 @@ const ( peerRESTMethodDownloadProfilingData = "/downloadprofilingdata" peerRESTMethodBucketPolicySet = "/setbucketpolicy" peerRESTMethodBucketNotificationPut = "/putbucketnotification" - peerRESTMethodBucketNotificationListen = "/listenbucketnotification" peerRESTMethodReloadFormat = "/reloadformat" - peerRESTMethodTargetExists = "/targetexists" - peerRESTMethodSendEvent = "/sendevent" peerRESTMethodTrace = "/trace" peerRESTMethodListen = "/listen" + peerRESTMethodLog = "/log" peerRESTMethodBucketLifecycleSet = "/setbucketlifecycle" peerRESTMethodBucketLifecycleRemove = "/removebucketlifecycle" peerRESTMethodBucketEncryptionSet = "/setbucketencryption" peerRESTMethodBucketEncryptionRemove = "/removebucketencryption" - peerRESTMethodLog = "/log" peerRESTMethodPutBucketObjectLockConfig = "/putbucketobjectlockconfig" peerRESTMethodBucketObjectLockConfigRemove = "/removebucketobjectlockconfig" ) diff --git a/cmd/peer-rest-server.go b/cmd/peer-rest-server.go index db89f087a..3d06afa61 100644 --- a/cmd/peer-rest-server.go +++ b/cmd/peer-rest-server.go @@ -24,7 +24,6 @@ import ( "io" "io/ioutil" "net/http" - "net/url" "strconv" "strings" "time" @@ -774,97 +773,6 @@ func (s *peerRESTServer) SetBucketSSEConfigHandler(w http.ResponseWriter, r *htt w.(http.Flusher).Flush() } -type remoteTargetExistsResp struct { - Exists bool -} - -// TargetExistsHandler - Check if Target exists. -func (s *peerRESTServer) TargetExistsHandler(w http.ResponseWriter, r *http.Request) { - ctx := newContext(r, w, "TargetExists") - if !s.IsValid(w, r) { - s.writeErrorResponse(w, errors.New("Invalid request")) - return - } - - vars := mux.Vars(r) - bucketName := vars[peerRESTBucket] - if bucketName == "" { - s.writeErrorResponse(w, errors.New("Bucket name is missing")) - return - } - var targetID event.TargetID - if r.ContentLength <= 0 { - s.writeErrorResponse(w, errInvalidArgument) - return - } - - err := gob.NewDecoder(r.Body).Decode(&targetID) - if err != nil { - s.writeErrorResponse(w, err) - return - } - - var targetExists remoteTargetExistsResp - targetExists.Exists = globalNotificationSys.RemoteTargetExist(bucketName, targetID) - - defer w.(http.Flusher).Flush() - logger.LogIf(ctx, gob.NewEncoder(w).Encode(&targetExists)) -} - -type sendEventRequest struct { - Event event.Event - TargetID event.TargetID -} - -type sendEventResp struct { - Success bool -} - -// SendEventHandler - Send Event. -func (s *peerRESTServer) SendEventHandler(w http.ResponseWriter, r *http.Request) { - if !s.IsValid(w, r) { - s.writeErrorResponse(w, errors.New("Invalid request")) - return - } - - ctx := newContext(r, w, "SendEvent") - - vars := mux.Vars(r) - bucketName := vars[peerRESTBucket] - if bucketName == "" { - s.writeErrorResponse(w, errors.New("Bucket name is missing")) - return - } - var eventReq sendEventRequest - if r.ContentLength <= 0 { - s.writeErrorResponse(w, errInvalidArgument) - return - } - - err := gob.NewDecoder(r.Body).Decode(&eventReq) - if err != nil { - s.writeErrorResponse(w, err) - return - } - - var eventResp sendEventResp - eventResp.Success = true - errs := globalNotificationSys.send(bucketName, eventReq.Event, eventReq.TargetID) - - for i := range errs { - reqInfo := (&logger.ReqInfo{}).AppendTags("Event", eventReq.Event.EventName.String()) - reqInfo.AppendTags("targetName", eventReq.TargetID.Name) - ctx := logger.SetReqInfo(GlobalContext, reqInfo) - logger.LogIf(ctx, errs[i].Err) - - eventResp.Success = false - s.writeErrorResponse(w, errs[i].Err) - return - } - logger.LogIf(ctx, gob.NewEncoder(w).Encode(&eventResp)) - w.(http.Flusher).Flush() -} - // PutBucketNotificationHandler - Set bucket policy. func (s *peerRESTServer) PutBucketNotificationHandler(w http.ResponseWriter, r *http.Request) { if !s.IsValid(w, r) { @@ -1077,11 +985,7 @@ func (s *peerRESTServer) ListenHandler(w http.ResponseWriter, r *http.Request) { if ev.S3.Bucket.Name != values.Get(peerRESTListenBucket) { return false } - objectName, uerr := url.QueryUnescape(ev.S3.Object.Key) - if uerr != nil { - objectName = ev.S3.Object.Key - } - return len(rulesMap.Match(ev.EventName, objectName).ToSlice()) != 0 + return rulesMap.MatchSimple(ev.EventName, ev.S3.Object.Key) }) keepAliveTicker := time.NewTicker(500 * time.Millisecond) @@ -1239,8 +1143,6 @@ func registerPeerRESTHandlers(router *mux.Router) { subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodStartProfiling).HandlerFunc(httpTraceAll(server.StartProfilingHandler)).Queries(restQueries(peerRESTProfiler)...) subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodDownloadProfilingData).HandlerFunc(httpTraceHdrs(server.DownloadProfilingDataHandler)) - subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodTargetExists).HandlerFunc(httpTraceHdrs(server.TargetExistsHandler)).Queries(restQueries(peerRESTBucket)...) - subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodSendEvent).HandlerFunc(httpTraceHdrs(server.SendEventHandler)).Queries(restQueries(peerRESTBucket)...) subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodBucketNotificationPut).HandlerFunc(httpTraceHdrs(server.PutBucketNotificationHandler)).Queries(restQueries(peerRESTBucket)...) subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodReloadFormat).HandlerFunc(httpTraceHdrs(server.ReloadFormatHandler)).Queries(restQueries(peerRESTDryRun)...) diff --git a/cmd/server-main.go b/cmd/server-main.go index c4dfcc1da..c19adb0e4 100644 --- a/cmd/server-main.go +++ b/cmd/server-main.go @@ -111,7 +111,6 @@ func serverHandleCmdArgs(ctx *cli.Context) { globalMinioAddr = globalCLIContext.Addr globalMinioHost, globalMinioPort = mustSplitHostPort(globalMinioAddr) - endpoints := strings.Fields(env.Get(config.EnvEndpoints, "")) if len(endpoints) > 0 { globalEndpoints, globalXLSetDriveCount, setupType, err = createServerEndpoints(globalCLIContext.Addr, endpoints...) @@ -344,6 +343,14 @@ func serverMain(ctx *cli.Context) { globalRootCAs, err = config.GetRootCAs(globalCertsCADir.Get()) logger.FatalIf(err, "Failed to read root CAs (%v)", err) + globalMinioEndpoint = func() string { + host := globalMinioHost + if host == "" { + host = sortIPs(localIP4.ToSlice())[0] + } + return fmt.Sprintf("%s://%s", getURLScheme(globalIsSSL), net.JoinHostPort(host, globalMinioPort)) + }() + // Is distributed setup, error out if no certificates are found for HTTPS endpoints. if globalIsDistXL { if globalEndpoints.HTTPS() && !globalIsSSL { diff --git a/go.mod b/go.mod index e29429f42..270dda83f 100644 --- a/go.mod +++ b/go.mod @@ -67,7 +67,7 @@ require ( github.com/minio/hdfs/v3 v3.0.1 github.com/minio/highwayhash v1.0.0 github.com/minio/lsync v1.0.1 - github.com/minio/minio-go/v6 v6.0.53 + github.com/minio/minio-go/v6 v6.0.55-0.20200424204115-7506d2996b22 github.com/minio/parquet-go v0.0.0-20200414234858-838cfa8aae61 github.com/minio/sha256-simd v0.1.1 github.com/minio/simdjson-go v0.1.5-0.20200303142138-b17fe061ea37 diff --git a/go.sum b/go.sum index 05df1e6c1..c15d82c95 100644 --- a/go.sum +++ b/go.sum @@ -276,6 +276,8 @@ github.com/minio/lsync v1.0.1 h1:AVvILxA976xc27hstd1oR+X9PQG0sPSom1MNb1ImfUs= github.com/minio/lsync v1.0.1/go.mod h1:tCFzfo0dlvdGl70IT4IAK/5Wtgb0/BrTmo/jE8pArKA= github.com/minio/minio-go/v6 v6.0.53 h1:8jzpwiOzZ5Iz7/goFWqNZRICbyWYShbb5rARjrnSCNI= github.com/minio/minio-go/v6 v6.0.53/go.mod h1:DIvC/IApeHX8q1BAMVCXSXwpmrmM+I+iBvhvztQorfI= +github.com/minio/minio-go/v6 v6.0.55-0.20200424204115-7506d2996b22 h1:nZEve4vdUhwHBoV18zRvPDgjL6NYyDJE5QJvz3l9bRs= +github.com/minio/minio-go/v6 v6.0.55-0.20200424204115-7506d2996b22/go.mod h1:KQMM+/44DSlSGSQWSfRrAZ12FVMmpWNuX37i2AX0jfI= github.com/minio/parquet-go v0.0.0-20200414234858-838cfa8aae61 h1:pUSI/WKPdd77gcuoJkSzhJ4wdS8OMDOsOu99MtpXEQA= github.com/minio/parquet-go v0.0.0-20200414234858-838cfa8aae61/go.mod h1:4trzEJ7N1nBTd5Tt7OCZT5SEin+WiAXpdJ/WgPkESA8= github.com/minio/sha256-simd v0.1.1 h1:5QHSlgo3nt5yKOJrC7W8w7X+NFl8cMPZm96iu8kKUJU= diff --git a/pkg/event/rules.go b/pkg/event/rules.go index 2ee7ff5f1..c68d392b8 100644 --- a/pkg/event/rules.go +++ b/pkg/event/rules.go @@ -53,6 +53,16 @@ func (rules Rules) Add(pattern string, targetID TargetID) { rules[pattern] = NewTargetIDSet(targetID).Union(rules[pattern]) } +// MatchSimple - returns true one of the matching object name in rules. +func (rules Rules) MatchSimple(objectName string) bool { + for pattern := range rules { + if wildcard.MatchSimple(pattern, objectName) { + return true + } + } + return false +} + // Match - returns TargetIDSet matching object name in rules. func (rules Rules) Match(objectName string) TargetIDSet { targetIDs := NewTargetIDSet() diff --git a/pkg/event/rulesmap.go b/pkg/event/rulesmap.go index a9c86b36d..8f6e32fa5 100644 --- a/pkg/event/rulesmap.go +++ b/pkg/event/rulesmap.go @@ -60,6 +60,11 @@ func (rulesMap RulesMap) Remove(rulesMap2 RulesMap) { } } +// MatchSimple - returns true if matching object name and event name in rules map. +func (rulesMap RulesMap) MatchSimple(eventName Name, objectName string) bool { + return rulesMap[eventName].MatchSimple(objectName) +} + // Match - returns TargetIDSet matching object name and event name in rules map. func (rulesMap RulesMap) Match(eventName Name, objectName string) TargetIDSet { return rulesMap[eventName].Match(objectName) diff --git a/pkg/event/targetidset.go b/pkg/event/targetidset.go index 97cf24950..f728107af 100644 --- a/pkg/event/targetidset.go +++ b/pkg/event/targetidset.go @@ -16,23 +16,12 @@ package event -import "fmt" - // TargetIDSet - Set representation of TargetIDs. type TargetIDSet map[TargetID]struct{} -// ToSlice - returns TargetID slice from TargetIDSet. -func (set TargetIDSet) ToSlice() []TargetID { - keys := make([]TargetID, 0, len(set)) - for k := range set { - keys = append(keys, k) - } - return keys -} - -// String - returns string representation. -func (set TargetIDSet) String() string { - return fmt.Sprintf("%v", set.ToSlice()) +// IsEmpty returns true if the set is empty. +func (set TargetIDSet) IsEmpty() bool { + return len(set) != 0 } // Clone - returns copy of this set. diff --git a/pkg/event/targetidset_test.go b/pkg/event/targetidset_test.go index a015867f0..b4eb03f45 100644 --- a/pkg/event/targetidset_test.go +++ b/pkg/event/targetidset_test.go @@ -21,56 +21,6 @@ import ( "testing" ) -func TestTargetIDSetToSlice(t *testing.T) { - testCases := []struct { - set TargetIDSet - expectedResult []TargetID - }{ - {NewTargetIDSet(), []TargetID{}}, - {NewTargetIDSet(TargetID{"1", "webhook"}), []TargetID{{"1", "webhook"}}}, - {NewTargetIDSet(TargetID{"1", "webhook"}, TargetID{"2", "amqp"}), []TargetID{{"1", "webhook"}, {"2", "amqp"}}}, - } - - for i, testCase := range testCases { - result := testCase.set.ToSlice() - - if len(result) != len(testCase.expectedResult) { - t.Fatalf("test %v: result: expected: %v, got: %v", i+1, testCase.expectedResult, result) - } - - for _, targetID1 := range result { - var found bool - for _, targetID2 := range testCase.expectedResult { - if reflect.DeepEqual(targetID1, targetID2) { - found = true - break - } - } - if !found { - t.Fatalf("test %v: data: expected: %v, got: %v", i+1, testCase.expectedResult, result) - } - } - } -} - -func TestTargetIDSetString(t *testing.T) { - testCases := []struct { - set TargetIDSet - expectedResult string - }{ - {NewTargetIDSet(), "[]"}, - {NewTargetIDSet(TargetID{"1", "webhook"}), "[1:webhook]"}, - } - - for i, testCase := range testCases { - result := testCase.set.String() - - if result != testCase.expectedResult { - t.Fatalf("test %v: result: expected: %v, got: %v", i+1, testCase.expectedResult, result) - } - } -} - func TestTargetIDSetClone(t *testing.T) { testCases := []struct { set TargetIDSet diff --git a/pkg/event/targetlist.go b/pkg/event/targetlist.go index 333b3ec6d..4b97e4729 100644 --- a/pkg/event/targetlist.go +++ b/pkg/event/targetlist.go @@ -61,8 +61,9 @@ func (list *TargetList) Exists(id TargetID) bool { return found } -// TargetIDErr returns error associated for a targetID -type TargetIDErr struct { +// TargetIDResult returns result of Remove/Send operation, sets err if +// any for the associated TargetID +type TargetIDResult struct { // ID where the remove or send were initiated. ID TargetID // Stores any error while removing a target or while sending an event. @@ -70,40 +71,17 @@ type TargetIDErr struct { } // Remove - closes and removes targets by given target IDs. -func (list *TargetList) Remove(targetids ...TargetID) <-chan TargetIDErr { - errCh := make(chan TargetIDErr) - - go func() { - defer close(errCh) - - var wg sync.WaitGroup - for _, id := range targetids { - list.RLock() - target, ok := list.targets[id] - list.RUnlock() - if ok { - wg.Add(1) - go func(id TargetID, target Target) { - defer wg.Done() - if err := target.Close(); err != nil { - errCh <- TargetIDErr{ - ID: id, - Err: err, - } - } - }(id, target) - } - } - wg.Wait() +func (list *TargetList) Remove(targetIDSet TargetIDSet) { + list.Lock() + defer list.Unlock() - list.Lock() - for _, id := range targetids { + for id := range targetIDSet { + target, ok := list.targets[id] + if ok { + target.Close() delete(list.targets, id) } - list.Unlock() - }() - - return errCh + } } // Targets - list all targets @@ -140,14 +118,10 @@ func (list *TargetList) TargetMap() map[TargetID]Target { } // Send - sends events to targets identified by target IDs. -func (list *TargetList) Send(event Event, targetIDs ...TargetID) <-chan TargetIDErr { - errCh := make(chan TargetIDErr) - +func (list *TargetList) Send(event Event, targetIDset TargetIDSet, resCh chan<- TargetIDResult) { go func() { - defer close(errCh) - var wg sync.WaitGroup - for _, id := range targetIDs { + for id := range targetIDset { list.RLock() target, ok := list.targets[id] list.RUnlock() @@ -155,19 +129,18 @@ func (list *TargetList) Send(event Event, targetIDs ...TargetID) <-chan TargetID wg.Add(1) go func(id TargetID, target Target) { defer wg.Done() + tgtRes := TargetIDResult{ID: id} if err := target.Save(event); err != nil { - errCh <- TargetIDErr{ - ID: id, - Err: err, - } + tgtRes.Err = err } + resCh <- tgtRes }(id, target) + } else { + resCh <- TargetIDResult{ID: id} } } wg.Wait() }() - - return errCh } // NewTargetList - creates TargetList. diff --git a/pkg/event/targetlist_test.go b/pkg/event/targetlist_test.go index 13a88cd9e..905b442e4 100644 --- a/pkg/event/targetlist_test.go +++ b/pkg/event/targetlist_test.go @@ -163,40 +163,6 @@ func TestTargetListExists(t *testing.T) { } } -func TestTargetListRemove(t *testing.T) { - targetListCase1 := NewTargetList() - - targetListCase2 := NewTargetList() - if err := targetListCase2.Add(&ExampleTarget{TargetID{"2", "testcase"}, false, false}); err != nil { - panic(err) - } - - targetListCase3 := NewTargetList() - if err := targetListCase3.Add(&ExampleTarget{TargetID{"3", "testcase"}, false, true}); err != nil { - panic(err) - } - - testCases := []struct { - targetList *TargetList - targetID TargetID - expectErr bool - }{ - {targetListCase1, TargetID{"1", "webhook"}, false}, - {targetListCase2, TargetID{"1", "webhook"}, false}, - {targetListCase3, TargetID{"3", "testcase"}, true}, - } - - for i, testCase := range testCases { - errCh := testCase.targetList.Remove(testCase.targetID) - err := <-errCh - expectErr := (err.Err != nil) - - if expectErr != testCase.expectErr { - t.Fatalf("test %v: error: expected: %v, got: %v", i+1, testCase.expectErr, expectErr) - } - } -} - func TestTargetListList(t *testing.T) { targetListCase1 := NewTargetList() @@ -273,10 +239,13 @@ func TestTargetListSend(t *testing.T) { {targetListCase4, TargetID{"4", "testcase"}, true}, } + resCh := make(chan TargetIDResult) for i, testCase := range testCases { - errCh := testCase.targetList.Send(Event{}, testCase.targetID) - err := <-errCh - expectErr := (err.Err != nil) + testCase.targetList.Send(Event{}, map[TargetID]struct{}{ + testCase.targetID: {}, + }, resCh) + res := <-resCh + expectErr := (res.Err != nil) if expectErr != testCase.expectErr { t.Fatalf("test %v: error: expected: %v, got: %v", i+1, testCase.expectErr, expectErr)