/* * Minio Cloud Storage, (C) 2018 Minio, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package cmd import ( "context" "encoding/json" "encoding/xml" "fmt" "net/url" "path" "sync" "time" "github.com/minio/minio/cmd/logger" "github.com/minio/minio/pkg/event" xnet "github.com/minio/minio/pkg/net" "github.com/minio/minio/pkg/policy" ) // NotificationSys - notification system. type NotificationSys struct { sync.RWMutex targetList *event.TargetList bucketRulesMap map[string]event.RulesMap bucketRemoteTargetRulesMap map[string]map[event.TargetID]event.RulesMap peerRPCClientMap map[xnet.Host]*PeerRPCClient } // GetARNList - returns available ARNs. func (sys *NotificationSys) GetARNList() []string { arns := []string{} region := globalServerConfig.GetRegion() for _, targetID := range sys.targetList.List() { arns = append(arns, targetID.ToARN(region).String()) } return arns } // GetPeerRPCClient - returns PeerRPCClient of addr. func (sys *NotificationSys) GetPeerRPCClient(addr xnet.Host) *PeerRPCClient { return sys.peerRPCClientMap[addr] } // NotificationPeerErr returns error associated for a remote peer. type NotificationPeerErr struct { Host xnet.Host // Remote host on which the rpc call was initiated Err error // Error returned by the remote peer for an rpc call } // DeleteBucket - calls DeleteBucket RPC call on all peers. func (sys *NotificationSys) DeleteBucket(ctx context.Context, bucketName string) { go func() { var wg sync.WaitGroup for addr, client := range sys.peerRPCClientMap { wg.Add(1) go func(addr xnet.Host, client *PeerRPCClient) { defer wg.Done() if err := client.DeleteBucket(bucketName); err != nil { logger.GetReqInfo(ctx).AppendTags("remotePeer", addr.Name) logger.LogIf(ctx, err) } }(addr, client) } wg.Wait() }() } // LoadCredentials - calls LoadCredentials RPC call on all peers. func (sys *NotificationSys) LoadCredentials() map[xnet.Host]error { errors := make(map[xnet.Host]error) var wg sync.WaitGroup for addr, client := range sys.peerRPCClientMap { wg.Add(1) go func(addr xnet.Host, client *PeerRPCClient) { defer wg.Done() // Try to set credentials in three attempts. for i := 0; i < 3; i++ { err := client.LoadCredentials() if err == nil { break } errors[addr] = err // Wait for one second and no need wait after last attempt. if i < 2 { time.Sleep(1 * time.Second) } } }(addr, client) } wg.Wait() return errors } // SetBucketPolicy - calls SetBucketPolicy RPC call on all peers. func (sys *NotificationSys) SetBucketPolicy(ctx context.Context, bucketName string, bucketPolicy *policy.Policy) { go func() { var wg sync.WaitGroup for addr, client := range sys.peerRPCClientMap { wg.Add(1) go func(addr xnet.Host, client *PeerRPCClient) { defer wg.Done() if err := client.SetBucketPolicy(bucketName, bucketPolicy); err != nil { logger.GetReqInfo(ctx).AppendTags("remotePeer", addr.Name) logger.LogIf(ctx, err) } }(addr, client) } wg.Wait() }() } // RemoveBucketPolicy - calls RemoveBucketPolicy RPC call on all peers. func (sys *NotificationSys) RemoveBucketPolicy(ctx context.Context, bucketName string) { go func() { var wg sync.WaitGroup for addr, client := range sys.peerRPCClientMap { wg.Add(1) go func(addr xnet.Host, client *PeerRPCClient) { defer wg.Done() if err := client.RemoveBucketPolicy(bucketName); err != nil { logger.GetReqInfo(ctx).AppendTags("remotePeer", addr.Name) logger.LogIf(ctx, err) } }(addr, client) } wg.Wait() }() } // PutBucketNotification - calls PutBucketNotification RPC call on all peers. func (sys *NotificationSys) PutBucketNotification(ctx context.Context, bucketName string, rulesMap event.RulesMap) { go func() { var wg sync.WaitGroup for addr, client := range sys.peerRPCClientMap { wg.Add(1) go func(addr xnet.Host, client *PeerRPCClient, rulesMap event.RulesMap) { defer wg.Done() if err := client.PutBucketNotification(bucketName, rulesMap); err != nil { logger.GetReqInfo(ctx).AppendTags("remotePeer", addr.Name) logger.LogIf(ctx, err) } }(addr, client, rulesMap.Clone()) } wg.Wait() }() } // ListenBucketNotification - calls ListenBucketNotification RPC call on all peers. func (sys *NotificationSys) ListenBucketNotification(ctx context.Context, bucketName string, eventNames []event.Name, pattern string, targetID event.TargetID, localPeer xnet.Host) { go func() { var wg sync.WaitGroup for addr, client := range sys.peerRPCClientMap { wg.Add(1) go func(addr xnet.Host, client *PeerRPCClient) { defer wg.Done() if err := client.ListenBucketNotification(bucketName, eventNames, pattern, targetID, localPeer); err != nil { logger.GetReqInfo(ctx).AppendTags("remotePeer", addr.Name) logger.LogIf(ctx, err) } }(addr, client) } wg.Wait() }() } // AddRemoteTarget - adds event rules map, HTTP/PeerRPC client target to bucket name. func (sys *NotificationSys) AddRemoteTarget(bucketName string, target event.Target, rulesMap event.RulesMap) error { if err := sys.targetList.Add(target); err != nil { return err } sys.Lock() targetMap := sys.bucketRemoteTargetRulesMap[bucketName] if targetMap == nil { targetMap = make(map[event.TargetID]event.RulesMap) } targetMap[target.ID()] = rulesMap.Clone() sys.bucketRemoteTargetRulesMap[bucketName] = targetMap sys.Unlock() sys.AddRulesMap(bucketName, rulesMap) return nil } // RemoteTargetExist - checks whether given target ID is a HTTP/PeerRPC client target or not. func (sys *NotificationSys) RemoteTargetExist(bucketName string, targetID event.TargetID) bool { sys.Lock() defer sys.Unlock() targetMap, ok := sys.bucketRemoteTargetRulesMap[bucketName] if ok { _, ok = targetMap[targetID] } return ok } // initListeners - initializes PeerRPC clients available in listener.json. func (sys *NotificationSys) initListeners(ctx context.Context, objAPI ObjectLayer, bucketName string) error { // listener.json is available/applicable only in DistXL mode. if !globalIsDistXL { return nil } // Construct path to listener.json for the given bucket. configFile := path.Join(bucketConfigPrefix, bucketName, bucketListenerConfig) transactionConfigFile := configFile + ".transaction" // As object layer's GetObject() and PutObject() take respective lock on minioMetaBucket // and configFile, take a transaction lock to avoid data race between readConfig() // and saveConfig(). objLock := globalNSMutex.NewNSLock(minioMetaBucket, transactionConfigFile) if err := objLock.GetLock(globalOperationTimeout); err != nil { return err } defer objLock.Unlock() reader, e := readConfig(ctx, objAPI, configFile) if e != nil && !IsErrIgnored(e, errDiskNotFound, errConfigNotFound) { return e } listenerList := []ListenBucketNotificationArgs{} if reader != nil { err := json.NewDecoder(reader).Decode(&listenerList) if err != nil { logger.LogIf(ctx, err) return err } } if len(listenerList) == 0 { // Nothing to initialize for empty listener list. return nil } activeListenerList := []ListenBucketNotificationArgs{} for _, args := range listenerList { found, err := isLocalHost(args.Addr.Name) if err != nil { logger.GetReqInfo(ctx).AppendTags("host", args.Addr.Name) logger.LogIf(ctx, err) return err } if found { // As this function is called at startup, skip HTTP listener to this host. continue } rpcClient := sys.GetPeerRPCClient(args.Addr) if rpcClient == nil { return fmt.Errorf("unable to find PeerRPCClient by address %v in listener.json for bucket %v", args.Addr, bucketName) } exist, err := rpcClient.RemoteTargetExist(bucketName, args.TargetID) if err != nil { logger.GetReqInfo(ctx).AppendTags("targetID", args.TargetID.Name) logger.LogIf(ctx, err) return err } if !exist { // Skip previously connected HTTP listener which is not found in remote peer. continue } target := NewPeerRPCClientTarget(bucketName, args.TargetID, rpcClient) rulesMap := event.NewRulesMap(args.EventNames, args.Pattern, target.ID()) if err = sys.AddRemoteTarget(bucketName, target, rulesMap); err != nil { logger.GetReqInfo(ctx).AppendTags("targetName", target.id.Name) logger.LogIf(ctx, err) return err } activeListenerList = append(activeListenerList, args) } data, err := json.Marshal(activeListenerList) if err != nil { logger.LogIf(ctx, err) return err } return saveConfig(objAPI, configFile, data) } // Init - initializes notification system from notification.xml and listener.json of all buckets. func (sys *NotificationSys) Init(objAPI ObjectLayer) error { if objAPI == nil { return errInvalidArgument } buckets, err := objAPI.ListBuckets(context.Background()) if err != nil { return err } for _, bucket := range buckets { ctx := logger.SetReqInfo(context.Background(), &logger.ReqInfo{BucketName: bucket.Name}) config, err := readNotificationConfig(ctx, objAPI, bucket.Name) if err != nil { if !IsErrIgnored(err, errDiskNotFound, errNoSuchNotifications) { return err } } else { sys.AddRulesMap(bucket.Name, config.ToRulesMap()) } if err = sys.initListeners(ctx, objAPI, bucket.Name); err != nil { return err } } return nil } // AddRulesMap - adds rules map for bucket name. func (sys *NotificationSys) AddRulesMap(bucketName string, rulesMap event.RulesMap) { sys.Lock() defer sys.Unlock() rulesMap = rulesMap.Clone() for _, targetRulesMap := range sys.bucketRemoteTargetRulesMap[bucketName] { rulesMap.Add(targetRulesMap) } rulesMap.Add(sys.bucketRulesMap[bucketName]) sys.bucketRulesMap[bucketName] = rulesMap } // RemoveRulesMap - removes rules map for bucket name. func (sys *NotificationSys) RemoveRulesMap(bucketName string, rulesMap event.RulesMap) { sys.Lock() defer sys.Unlock() sys.bucketRulesMap[bucketName].Remove(rulesMap) if len(sys.bucketRulesMap[bucketName]) == 0 { delete(sys.bucketRulesMap, bucketName) } } // RemoveNotification - removes all notification configuration for bucket name. func (sys *NotificationSys) RemoveNotification(bucketName string) { sys.Lock() defer sys.Unlock() delete(sys.bucketRulesMap, bucketName) for targetID := range sys.bucketRemoteTargetRulesMap[bucketName] { sys.targetList.Remove(targetID) delete(sys.bucketRemoteTargetRulesMap[bucketName], targetID) } delete(sys.bucketRemoteTargetRulesMap, bucketName) } // RemoveAllRemoteTargets - closes and removes all HTTP/PeerRPC client targets. func (sys *NotificationSys) RemoveAllRemoteTargets() { for _, targetMap := range sys.bucketRemoteTargetRulesMap { for targetID := range targetMap { sys.targetList.Remove(targetID) } } } // RemoveRemoteTarget - closes and removes target by target ID. func (sys *NotificationSys) RemoveRemoteTarget(bucketName string, targetID event.TargetID) { for terr := range sys.targetList.Remove(targetID) { reqInfo := (&logger.ReqInfo{}).AppendTags("targetID", terr.ID.Name) ctx := logger.SetReqInfo(context.Background(), reqInfo) logger.LogIf(ctx, terr.Err) } sys.Lock() defer sys.Unlock() if _, ok := sys.bucketRemoteTargetRulesMap[bucketName]; ok { delete(sys.bucketRemoteTargetRulesMap[bucketName], targetID) if len(sys.bucketRemoteTargetRulesMap[bucketName]) == 0 { delete(sys.bucketRemoteTargetRulesMap, bucketName) } } } func (sys *NotificationSys) send(bucketName string, eventData event.Event, targetIDs ...event.TargetID) (errs []event.TargetIDErr) { errCh := sys.targetList.Send(eventData, targetIDs...) for terr := range errCh { errs = append(errs, terr) if sys.RemoteTargetExist(bucketName, terr.ID) { sys.RemoveRemoteTarget(bucketName, terr.ID) } } return errs } // Send - sends event data to all matching targets. func (sys *NotificationSys) Send(args eventArgs) []event.TargetIDErr { sys.RLock() targetIDSet := sys.bucketRulesMap[args.BucketName].Match(args.EventName, args.Object.Name) sys.RUnlock() if len(targetIDSet) == 0 { return nil } targetIDs := targetIDSet.ToSlice() return sys.send(args.BucketName, args.ToEvent(), targetIDs...) } // NewNotificationSys - creates new notification system object. func NewNotificationSys(config *serverConfig, endpoints EndpointList) *NotificationSys { targetList := getNotificationTargets(config) peerRPCClientMap := makeRemoteRPCClients(endpoints) // bucketRulesMap/bucketRemoteTargetRulesMap are initialized by NotificationSys.Init() return &NotificationSys{ targetList: targetList, bucketRulesMap: make(map[string]event.RulesMap), bucketRemoteTargetRulesMap: make(map[string]map[event.TargetID]event.RulesMap), peerRPCClientMap: peerRPCClientMap, } } type eventArgs struct { EventName event.Name BucketName string Object ObjectInfo ReqParams map[string]string Host string Port string UserAgent string } // ToEvent - converts to notification event. func (args eventArgs) ToEvent() event.Event { getOriginEndpoint := func() string { host := globalMinioHost if host == "" { // FIXME: Send FQDN or hostname of this machine than sending IP address. host = localIP4.ToSlice()[0] } return fmt.Sprintf("%s://%s:%s", getURLScheme(globalIsSSL), host, globalMinioPort) } creds := globalServerConfig.GetCredential() eventTime := UTCNow() uniqueID := fmt.Sprintf("%X", eventTime.UnixNano()) newEvent := event.Event{ EventVersion: "2.0", EventSource: "minio:s3", AwsRegion: globalServerConfig.GetRegion(), EventTime: eventTime.Format(event.AMZTimeFormat), EventName: args.EventName, UserIdentity: event.Identity{creds.AccessKey}, RequestParameters: args.ReqParams, ResponseElements: map[string]string{ "x-amz-request-id": uniqueID, "x-minio-origin-endpoint": getOriginEndpoint(), // Minio specific custom elements. }, S3: event.Metadata{ SchemaVersion: "1.0", ConfigurationID: "Config", Bucket: event.Bucket{ Name: args.BucketName, OwnerIdentity: event.Identity{creds.AccessKey}, ARN: policy.ResourceARNPrefix + args.BucketName, }, Object: event.Object{ Key: url.QueryEscape(args.Object.Name), VersionID: "1", Sequencer: uniqueID, }, }, Source: event.Source{ Host: args.Host, Port: args.Port, UserAgent: args.UserAgent, }, } if args.EventName != event.ObjectRemovedDelete { newEvent.S3.Object.ETag = args.Object.ETag newEvent.S3.Object.Size = args.Object.Size newEvent.S3.Object.ContentType = args.Object.ContentType newEvent.S3.Object.UserMetadata = args.Object.UserDefined } return newEvent } func sendEvent(args eventArgs) { // globalNotificationSys is not initialized in gateway mode. if globalNotificationSys == nil { return } notifyCh := globalNotificationSys.Send(args) go func() { for _, err := range notifyCh { reqInfo := &logger.ReqInfo{BucketName: args.BucketName, ObjectName: args.Object.Name} reqInfo.AppendTags("EventName", args.EventName.String()) reqInfo.AppendTags("targetID", err.ID.Name) ctx := logger.SetReqInfo(context.Background(), reqInfo) logger.LogOnceIf(ctx, err.Err, err.ID) } }() } func readNotificationConfig(ctx context.Context, objAPI ObjectLayer, bucketName string) (*event.Config, error) { // Construct path to notification.xml for the given bucket. configFile := path.Join(bucketConfigPrefix, bucketName, bucketNotificationConfig) reader, err := readConfig(ctx, objAPI, configFile) if err != nil { if err == errConfigNotFound { err = errNoSuchNotifications } return nil, err } config, err := event.ParseConfig(reader, globalServerConfig.GetRegion(), globalNotificationSys.targetList) logger.LogIf(ctx, err) return config, err } func saveNotificationConfig(objAPI ObjectLayer, bucketName string, config *event.Config) error { data, err := xml.Marshal(config) if err != nil { return err } configFile := path.Join(bucketConfigPrefix, bucketName, bucketNotificationConfig) return saveConfig(objAPI, configFile, data) } // SaveListener - saves HTTP client currently listening for events to listener.json. func SaveListener(objAPI ObjectLayer, bucketName string, eventNames []event.Name, pattern string, targetID event.TargetID, addr xnet.Host) error { // listener.json is available/applicable only in DistXL mode. if !globalIsDistXL { return nil } ctx := logger.SetReqInfo(context.Background(), &logger.ReqInfo{BucketName: bucketName}) // Construct path to listener.json for the given bucket. configFile := path.Join(bucketConfigPrefix, bucketName, bucketListenerConfig) transactionConfigFile := configFile + ".transaction" // As object layer's GetObject() and PutObject() take respective lock on minioMetaBucket // and configFile, take a transaction lock to avoid data race between readConfig() // and saveConfig(). objLock := globalNSMutex.NewNSLock(minioMetaBucket, transactionConfigFile) if err := objLock.GetLock(globalOperationTimeout); err != nil { return err } defer objLock.Unlock() reader, err := readConfig(ctx, objAPI, configFile) if err != nil && !IsErrIgnored(err, errDiskNotFound, errConfigNotFound) { return err } listenerList := []ListenBucketNotificationArgs{} if reader != nil { if err = json.NewDecoder(reader).Decode(&listenerList); err != nil { logger.LogIf(ctx, err) return err } } listenerList = append(listenerList, ListenBucketNotificationArgs{ EventNames: eventNames, Pattern: pattern, TargetID: targetID, Addr: addr, }) data, err := json.Marshal(listenerList) if err != nil { logger.LogIf(ctx, err) return err } return saveConfig(objAPI, configFile, data) } // RemoveListener - removes HTTP client currently listening for events from listener.json. func RemoveListener(objAPI ObjectLayer, bucketName string, targetID event.TargetID, addr xnet.Host) error { // listener.json is available/applicable only in DistXL mode. if !globalIsDistXL { return nil } ctx := logger.SetReqInfo(context.Background(), &logger.ReqInfo{BucketName: bucketName}) // Construct path to listener.json for the given bucket. configFile := path.Join(bucketConfigPrefix, bucketName, bucketListenerConfig) transactionConfigFile := configFile + ".transaction" // As object layer's GetObject() and PutObject() take respective lock on minioMetaBucket // and configFile, take a transaction lock to avoid data race between readConfig() // and saveConfig(). objLock := globalNSMutex.NewNSLock(minioMetaBucket, transactionConfigFile) if err := objLock.GetLock(globalOperationTimeout); err != nil { return err } defer objLock.Unlock() reader, err := readConfig(ctx, objAPI, configFile) if err != nil && !IsErrIgnored(err, errDiskNotFound, errConfigNotFound) { return err } listenerList := []ListenBucketNotificationArgs{} if reader != nil { if err = json.NewDecoder(reader).Decode(&listenerList); err != nil { logger.LogIf(ctx, err) return err } } if len(listenerList) == 0 { // Nothing to remove. return nil } activeListenerList := []ListenBucketNotificationArgs{} for _, args := range listenerList { if args.TargetID == targetID && args.Addr.Equal(addr) { // Skip if matches continue } activeListenerList = append(activeListenerList, args) } data, err := json.Marshal(activeListenerList) if err != nil { logger.LogIf(ctx, err) return err } return saveConfig(objAPI, configFile, data) }