/* * Minio Cloud Storage, (C) 2018 Minio, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package cmd import ( "context" "fmt" "path" "github.com/gorilla/mux" "github.com/minio/minio/cmd/logger" "github.com/minio/minio/pkg/event" xnet "github.com/minio/minio/pkg/net" "github.com/minio/minio/pkg/policy" ) const s3Path = "/s3/remote" // PeerRPCReceiver - Peer RPC receiver for peer RPC server. type PeerRPCReceiver struct { AuthRPCServer } // DeleteBucketArgs - delete bucket RPC arguments. type DeleteBucketArgs struct { AuthRPCArgs BucketName string } // DeleteBucket - handles delete bucket RPC call which removes all values of given bucket in global NotificationSys object. func (receiver *PeerRPCReceiver) DeleteBucket(args *DeleteBucketArgs, reply *AuthRPCArgs) error { globalNotificationSys.RemoveNotification(args.BucketName) globalPolicySys.Remove(args.BucketName) return nil } // SetBucketPolicyArgs - set bucket policy RPC arguments. type SetBucketPolicyArgs struct { AuthRPCArgs BucketName string Policy policy.Policy } // SetBucketPolicy - handles set bucket policy RPC call which adds bucket policy to globalPolicySys. func (receiver *PeerRPCReceiver) SetBucketPolicy(args *SetBucketPolicyArgs, reply *AuthRPCArgs) error { globalPolicySys.Set(args.BucketName, args.Policy) return nil } // RemoveBucketPolicyArgs - delete bucket policy RPC arguments. type RemoveBucketPolicyArgs struct { AuthRPCArgs BucketName string } // RemoveBucketPolicy - handles delete bucket policy RPC call which removes bucket policy to globalPolicySys. func (receiver *PeerRPCReceiver) RemoveBucketPolicy(args *RemoveBucketPolicyArgs, reply *AuthRPCArgs) error { globalPolicySys.Remove(args.BucketName) return nil } // PutBucketNotificationArgs - put bucket notification RPC arguments. type PutBucketNotificationArgs struct { AuthRPCArgs BucketName string RulesMap event.RulesMap } // PutBucketNotification - handles put bucket notification RPC call which adds rules to given bucket to global NotificationSys object. func (receiver *PeerRPCReceiver) PutBucketNotification(args *PutBucketNotificationArgs, reply *AuthRPCReply) error { if err := args.IsAuthenticated(); err != nil { return err } globalNotificationSys.AddRulesMap(args.BucketName, args.RulesMap) return nil } // ListenBucketNotificationArgs - listen bucket notification RPC arguments. type ListenBucketNotificationArgs struct { AuthRPCArgs `json:"-"` BucketName string `json:"-"` EventNames []event.Name `json:"eventNames"` Pattern string `json:"pattern"` TargetID event.TargetID `json:"targetId"` Addr xnet.Host `json:"addr"` } // ListenBucketNotification - handles listen bucket notification RPC call. It creates PeerRPCClient target which pushes requested events to target in remote peer. func (receiver *PeerRPCReceiver) ListenBucketNotification(args *ListenBucketNotificationArgs, reply *AuthRPCReply) error { if err := args.IsAuthenticated(); err != nil { return err } rpcClient := globalNotificationSys.GetPeerRPCClient(args.Addr) if rpcClient == nil { return fmt.Errorf("unable to find PeerRPCClient for provided address %v. This happens only if remote and this minio run with different set of endpoints", args.Addr) } target := NewPeerRPCClientTarget(args.BucketName, args.TargetID, rpcClient) rulesMap := event.NewRulesMap(args.EventNames, args.Pattern, target.ID()) if err := globalNotificationSys.AddRemoteTarget(args.BucketName, target, rulesMap); err != nil { reqInfo := &logger.ReqInfo{BucketName: target.bucketName} reqInfo.AppendTags("targetName", target.id.Name) ctx := logger.SetReqInfo(context.Background(), reqInfo) logger.LogIf(ctx, err) return err } return nil } // RemoteTargetExistArgs - remote target ID exist RPC arguments. type RemoteTargetExistArgs struct { AuthRPCArgs BucketName string TargetID event.TargetID } // RemoteTargetExistReply - remote target ID exist RPC reply. type RemoteTargetExistReply struct { AuthRPCReply Exist bool } // RemoteTargetExist - handles target ID exist RPC call which checks whether given target ID is a HTTP client target or not. func (receiver *PeerRPCReceiver) RemoteTargetExist(args *RemoteTargetExistArgs, reply *RemoteTargetExistReply) error { reply.Exist = globalNotificationSys.RemoteTargetExist(args.BucketName, args.TargetID) return nil } // SendEventArgs - send event RPC arguments. type SendEventArgs struct { AuthRPCArgs Event event.Event TargetID event.TargetID BucketName string } // SendEventReply - send event RPC reply. type SendEventReply struct { AuthRPCReply Error error } // SendEvent - handles send event RPC call which sends given event to target by given target ID. func (receiver *PeerRPCReceiver) SendEvent(args *SendEventArgs, reply *SendEventReply) error { if err := args.IsAuthenticated(); err != nil { return err } var err error for _, terr := range globalNotificationSys.send(args.BucketName, args.Event, args.TargetID) { reqInfo := (&logger.ReqInfo{}).AppendTags("Event", args.Event.EventName.String()) reqInfo.AppendTags("targetName", args.TargetID.Name) ctx := logger.SetReqInfo(context.Background(), reqInfo) logger.LogIf(ctx, terr.Err) err = terr.Err } reply.Error = err return nil } // registerS3PeerRPCRouter - creates and registers Peer RPC server and its router. func registerS3PeerRPCRouter(router *mux.Router) error { peerRPCServer := newRPCServer() if err := peerRPCServer.RegisterName("Peer", &PeerRPCReceiver{}); err != nil { logger.LogIf(context.Background(), err) return err } subrouter := router.PathPrefix(minioReservedBucketPath).Subrouter() subrouter.Path(s3Path).Handler(peerRPCServer) return nil } // PeerRPCClient - peer RPC client talks to peer RPC server. type PeerRPCClient struct { *AuthRPCClient } // DeleteBucket - calls delete bucket RPC. func (rpcClient *PeerRPCClient) DeleteBucket(bucketName string) error { args := DeleteBucketArgs{BucketName: bucketName} reply := AuthRPCReply{} return rpcClient.Call("Peer.DeleteBucket", &args, &reply) } // SetBucketPolicy - calls set bucket policy RPC. func (rpcClient *PeerRPCClient) SetBucketPolicy(bucketName string, bucketPolicy *policy.Policy) error { args := SetBucketPolicyArgs{ BucketName: bucketName, Policy: *bucketPolicy, } reply := AuthRPCReply{} return rpcClient.Call("Peer.SetBucketPolicy", &args, &reply) } // RemoveBucketPolicy - calls remove bucket policy RPC. func (rpcClient *PeerRPCClient) RemoveBucketPolicy(bucketName string) error { args := RemoveBucketPolicyArgs{ BucketName: bucketName, } reply := AuthRPCReply{} return rpcClient.Call("Peer.RemoveBucketPolicy", &args, &reply) } // PutBucketNotification - calls put bukcet notification RPC. func (rpcClient *PeerRPCClient) PutBucketNotification(bucketName string, rulesMap event.RulesMap) error { args := PutBucketNotificationArgs{ BucketName: bucketName, RulesMap: rulesMap, } reply := AuthRPCReply{} return rpcClient.Call("Peer.PutBucketNotification", &args, &reply) } // ListenBucketNotification - calls listen bucket notification RPC. func (rpcClient *PeerRPCClient) ListenBucketNotification(bucketName string, eventNames []event.Name, pattern string, targetID event.TargetID, addr xnet.Host) error { args := ListenBucketNotificationArgs{ BucketName: bucketName, EventNames: eventNames, Pattern: pattern, TargetID: targetID, Addr: addr, } reply := AuthRPCReply{} return rpcClient.Call("Peer.ListenBucketNotification", &args, &reply) } // RemoteTargetExist - calls remote target ID exist RPC. func (rpcClient *PeerRPCClient) RemoteTargetExist(bucketName string, targetID event.TargetID) (bool, error) { args := RemoteTargetExistArgs{ BucketName: bucketName, TargetID: targetID, } reply := RemoteTargetExistReply{} if err := rpcClient.Call("Peer.RemoteTargetExist", &args, &reply); err != nil { return false, err } return reply.Exist, nil } // SendEvent - calls send event RPC. func (rpcClient *PeerRPCClient) SendEvent(bucketName string, targetID, remoteTargetID event.TargetID, eventData event.Event) error { args := SendEventArgs{ BucketName: bucketName, TargetID: remoteTargetID, Event: eventData, } reply := SendEventReply{} if err := rpcClient.Call("Peer.SendEvent", &args, &reply); err != nil { return err } if reply.Error != nil { reqInfo := &logger.ReqInfo{BucketName: bucketName} reqInfo.AppendTags("targetID", targetID.Name) reqInfo.AppendTags("event", eventData.EventName.String()) ctx := logger.SetReqInfo(context.Background(), reqInfo) logger.LogIf(ctx, reply.Error) globalNotificationSys.RemoveRemoteTarget(bucketName, targetID) } return reply.Error } // makeRemoteRPCClients - creates Peer RPCClients for given endpoint list. func makeRemoteRPCClients(endpoints EndpointList) map[xnet.Host]*PeerRPCClient { peerRPCClientMap := make(map[xnet.Host]*PeerRPCClient) cred := globalServerConfig.GetCredential() serviceEndpoint := path.Join(minioReservedBucketPath, s3Path) for _, hostStr := range GetRemotePeers(endpoints) { host, err := xnet.ParseHost(hostStr) logger.CriticalIf(context.Background(), err) peerRPCClientMap[*host] = &PeerRPCClient{newAuthRPCClient(authConfig{ accessKey: cred.AccessKey, secretKey: cred.SecretKey, serverAddr: hostStr, serviceEndpoint: serviceEndpoint, secureConn: globalIsSSL, serviceName: "Peer", })} } return peerRPCClientMap } // PeerRPCClientTarget - RPCClient is an event.Target which sends event to target of remote peer. type PeerRPCClientTarget struct { id event.TargetID remoteTargetID event.TargetID rpcClient *PeerRPCClient bucketName string } // ID - returns target ID. func (target *PeerRPCClientTarget) ID() event.TargetID { return target.id } // Send - sends event to remote peer by making RPC call. func (target *PeerRPCClientTarget) Send(eventData event.Event) error { return target.rpcClient.SendEvent(target.bucketName, target.id, target.remoteTargetID, eventData) } // Close - does nothing and available for interface compatibility. func (target *PeerRPCClientTarget) Close() error { return nil } // NewPeerRPCClientTarget - creates RPCClient target with given target ID available in remote peer. func NewPeerRPCClientTarget(bucketName string, targetID event.TargetID, rpcClient *PeerRPCClient) *PeerRPCClientTarget { return &PeerRPCClientTarget{ id: event.TargetID{targetID.ID, targetID.Name + "+" + mustGetUUID()}, remoteTargetID: targetID, bucketName: bucketName, rpcClient: rpcClient, } }