You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
323 lines
10 KiB
323 lines
10 KiB
/*
|
|
* Minio Cloud Storage, (C) 2018 Minio, Inc.
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*/
|
|
|
|
package cmd
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"path"
|
|
|
|
"github.com/gorilla/mux"
|
|
"github.com/minio/minio/cmd/logger"
|
|
"github.com/minio/minio/pkg/event"
|
|
xnet "github.com/minio/minio/pkg/net"
|
|
)
|
|
|
|
const s3Path = "/s3/remote"
|
|
|
|
// PeerRPCReceiver - Peer RPC receiver for peer RPC server.
|
|
type PeerRPCReceiver struct {
|
|
AuthRPCServer
|
|
}
|
|
|
|
// DeleteBucketArgs - delete bucket RPC arguments.
|
|
type DeleteBucketArgs struct {
|
|
AuthRPCArgs
|
|
BucketName string
|
|
}
|
|
|
|
// DeleteBucket - handles delete bucket RPC call which removes all values of given bucket in global NotificationSys object.
|
|
func (receiver *PeerRPCReceiver) DeleteBucket(args *DeleteBucketArgs, reply *AuthRPCArgs) error {
|
|
globalNotificationSys.RemoveNotification(args.BucketName)
|
|
return nil
|
|
}
|
|
|
|
// UpdateBucketPolicyArgs - update bucket policy RPC arguments.
|
|
type UpdateBucketPolicyArgs struct {
|
|
AuthRPCArgs
|
|
BucketName string
|
|
}
|
|
|
|
// UpdateBucketPolicy - handles update bucket policy RPC call which sets bucket policies to given bucket in global BucketPolicies object.
|
|
func (receiver *PeerRPCReceiver) UpdateBucketPolicy(args *UpdateBucketPolicyArgs, reply *AuthRPCArgs) error {
|
|
objectAPI := newObjectLayerFn()
|
|
if objectAPI == nil {
|
|
// If the object layer is just coming up then it will load the policy from the disk.
|
|
return nil
|
|
}
|
|
return objectAPI.RefreshBucketPolicy(context.Background(), args.BucketName)
|
|
}
|
|
|
|
// PutBucketNotificationArgs - put bucket notification RPC arguments.
|
|
type PutBucketNotificationArgs struct {
|
|
AuthRPCArgs
|
|
BucketName string
|
|
RulesMap event.RulesMap
|
|
}
|
|
|
|
// PutBucketNotification - handles put bucket notification RPC call which adds rules to given bucket to global NotificationSys object.
|
|
func (receiver *PeerRPCReceiver) PutBucketNotification(args *PutBucketNotificationArgs, reply *AuthRPCReply) error {
|
|
if err := args.IsAuthenticated(); err != nil {
|
|
return err
|
|
}
|
|
|
|
globalNotificationSys.AddRulesMap(args.BucketName, args.RulesMap)
|
|
return nil
|
|
}
|
|
|
|
// ListenBucketNotificationArgs - listen bucket notification RPC arguments.
|
|
type ListenBucketNotificationArgs struct {
|
|
AuthRPCArgs `json:"-"`
|
|
BucketName string `json:"-"`
|
|
EventNames []event.Name `json:"eventNames"`
|
|
Pattern string `json:"pattern"`
|
|
TargetID event.TargetID `json:"targetId"`
|
|
Addr xnet.Host `json:"addr"`
|
|
}
|
|
|
|
// ListenBucketNotification - handles listen bucket notification RPC call. It creates PeerRPCClient target which pushes requested events to target in remote peer.
|
|
func (receiver *PeerRPCReceiver) ListenBucketNotification(args *ListenBucketNotificationArgs, reply *AuthRPCReply) error {
|
|
if err := args.IsAuthenticated(); err != nil {
|
|
return err
|
|
}
|
|
|
|
rpcClient := globalNotificationSys.GetPeerRPCClient(args.Addr)
|
|
if rpcClient == nil {
|
|
return fmt.Errorf("unable to find PeerRPCClient for provided address %v. This happens only if remote and this minio run with different set of endpoints", args.Addr)
|
|
}
|
|
|
|
target := NewPeerRPCClientTarget(args.BucketName, args.TargetID, rpcClient)
|
|
rulesMap := event.NewRulesMap(args.EventNames, args.Pattern, target.ID())
|
|
if err := globalNotificationSys.AddRemoteTarget(args.BucketName, target, rulesMap); err != nil {
|
|
reqInfo := &logger.ReqInfo{BucketName: target.bucketName}
|
|
reqInfo.AppendTags("targetName", target.id.Name)
|
|
ctx := logger.SetReqInfo(context.Background(), reqInfo)
|
|
logger.LogIf(ctx, err)
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// RemoteTargetExistArgs - remote target ID exist RPC arguments.
|
|
type RemoteTargetExistArgs struct {
|
|
AuthRPCArgs
|
|
BucketName string
|
|
TargetID event.TargetID
|
|
}
|
|
|
|
// RemoteTargetExistReply - remote target ID exist RPC reply.
|
|
type RemoteTargetExistReply struct {
|
|
AuthRPCReply
|
|
Exist bool
|
|
}
|
|
|
|
// RemoteTargetExist - handles target ID exist RPC call which checks whether given target ID is a HTTP client target or not.
|
|
func (receiver *PeerRPCReceiver) RemoteTargetExist(args *RemoteTargetExistArgs, reply *RemoteTargetExistReply) error {
|
|
reply.Exist = globalNotificationSys.RemoteTargetExist(args.BucketName, args.TargetID)
|
|
return nil
|
|
}
|
|
|
|
// SendEventArgs - send event RPC arguments.
|
|
type SendEventArgs struct {
|
|
AuthRPCArgs
|
|
Event event.Event
|
|
TargetID event.TargetID
|
|
BucketName string
|
|
}
|
|
|
|
// SendEventReply - send event RPC reply.
|
|
type SendEventReply struct {
|
|
AuthRPCReply
|
|
Error error
|
|
}
|
|
|
|
// SendEvent - handles send event RPC call which sends given event to target by given target ID.
|
|
func (receiver *PeerRPCReceiver) SendEvent(args *SendEventArgs, reply *SendEventReply) error {
|
|
if err := args.IsAuthenticated(); err != nil {
|
|
return err
|
|
}
|
|
|
|
var err error
|
|
if errMap := globalNotificationSys.send(args.BucketName, args.Event, args.TargetID); len(errMap) != 0 {
|
|
var found bool
|
|
if err, found = errMap[args.TargetID]; !found {
|
|
return fmt.Errorf("error for target %v not found in error map %+v", args.TargetID, errMap)
|
|
}
|
|
}
|
|
|
|
if err != nil {
|
|
reqInfo := (&logger.ReqInfo{}).AppendTags("Event", args.Event.EventName.String())
|
|
reqInfo.AppendTags("targetName", args.TargetID.Name)
|
|
ctx := logger.SetReqInfo(context.Background(), reqInfo)
|
|
logger.LogIf(ctx, err)
|
|
}
|
|
|
|
reply.Error = err
|
|
return nil
|
|
}
|
|
|
|
// registerS3PeerRPCRouter - creates and registers Peer RPC server and its router.
|
|
func registerS3PeerRPCRouter(router *mux.Router) error {
|
|
peerRPCServer := newRPCServer()
|
|
if err := peerRPCServer.RegisterName("Peer", &PeerRPCReceiver{}); err != nil {
|
|
logger.LogIf(context.Background(), err)
|
|
return err
|
|
}
|
|
|
|
subrouter := router.PathPrefix(minioReservedBucketPath).Subrouter()
|
|
subrouter.Path(s3Path).Handler(peerRPCServer)
|
|
return nil
|
|
}
|
|
|
|
// PeerRPCClient - peer RPC client talks to peer RPC server.
|
|
type PeerRPCClient struct {
|
|
*AuthRPCClient
|
|
}
|
|
|
|
// DeleteBucket - calls delete bucket RPC.
|
|
func (rpcClient *PeerRPCClient) DeleteBucket(bucketName string) error {
|
|
args := DeleteBucketArgs{BucketName: bucketName}
|
|
reply := AuthRPCReply{}
|
|
return rpcClient.Call("Peer.DeleteBucket", &args, &reply)
|
|
}
|
|
|
|
// UpdateBucketPolicy - calls update bucket policy RPC.
|
|
func (rpcClient *PeerRPCClient) UpdateBucketPolicy(bucketName string) error {
|
|
args := UpdateBucketPolicyArgs{
|
|
BucketName: bucketName,
|
|
}
|
|
reply := AuthRPCReply{}
|
|
return rpcClient.Call("Peer.UpdateBucketPolicy", &args, &reply)
|
|
}
|
|
|
|
// PutBucketNotification - calls put bukcet notification RPC.
|
|
func (rpcClient *PeerRPCClient) PutBucketNotification(bucketName string, rulesMap event.RulesMap) error {
|
|
args := PutBucketNotificationArgs{
|
|
BucketName: bucketName,
|
|
RulesMap: rulesMap,
|
|
}
|
|
reply := AuthRPCReply{}
|
|
return rpcClient.Call("Peer.PutBucketNotification", &args, &reply)
|
|
}
|
|
|
|
// ListenBucketNotification - calls listen bucket notification RPC.
|
|
func (rpcClient *PeerRPCClient) ListenBucketNotification(bucketName string, eventNames []event.Name,
|
|
pattern string, targetID event.TargetID, addr xnet.Host) error {
|
|
args := ListenBucketNotificationArgs{
|
|
BucketName: bucketName,
|
|
EventNames: eventNames,
|
|
Pattern: pattern,
|
|
TargetID: targetID,
|
|
Addr: addr,
|
|
}
|
|
reply := AuthRPCReply{}
|
|
return rpcClient.Call("Peer.ListenBucketNotification", &args, &reply)
|
|
}
|
|
|
|
// RemoteTargetExist - calls remote target ID exist RPC.
|
|
func (rpcClient *PeerRPCClient) RemoteTargetExist(bucketName string, targetID event.TargetID) (bool, error) {
|
|
args := RemoteTargetExistArgs{
|
|
BucketName: bucketName,
|
|
TargetID: targetID,
|
|
}
|
|
|
|
reply := RemoteTargetExistReply{}
|
|
if err := rpcClient.Call("Peer.RemoteTargetExist", &args, &reply); err != nil {
|
|
return false, err
|
|
}
|
|
|
|
return reply.Exist, nil
|
|
}
|
|
|
|
// SendEvent - calls send event RPC.
|
|
func (rpcClient *PeerRPCClient) SendEvent(bucketName string, targetID, remoteTargetID event.TargetID, eventData event.Event) error {
|
|
args := SendEventArgs{
|
|
BucketName: bucketName,
|
|
TargetID: remoteTargetID,
|
|
Event: eventData,
|
|
}
|
|
reply := SendEventReply{}
|
|
if err := rpcClient.Call("Peer.SendEvent", &args, &reply); err != nil {
|
|
return err
|
|
}
|
|
|
|
if reply.Error != nil {
|
|
reqInfo := &logger.ReqInfo{BucketName: bucketName}
|
|
reqInfo.AppendTags("targetID", targetID.Name)
|
|
reqInfo.AppendTags("event", eventData.EventName.String())
|
|
ctx := logger.SetReqInfo(context.Background(), reqInfo)
|
|
logger.LogIf(ctx, reply.Error)
|
|
globalNotificationSys.RemoveRemoteTarget(bucketName, targetID)
|
|
}
|
|
|
|
return reply.Error
|
|
}
|
|
|
|
// makeRemoteRPCClients - creates Peer RPCClients for given endpoint list.
|
|
func makeRemoteRPCClients(endpoints EndpointList) map[xnet.Host]*PeerRPCClient {
|
|
peerRPCClientMap := make(map[xnet.Host]*PeerRPCClient)
|
|
|
|
cred := globalServerConfig.GetCredential()
|
|
serviceEndpoint := path.Join(minioReservedBucketPath, s3Path)
|
|
for _, hostStr := range GetRemotePeers(endpoints) {
|
|
host, err := xnet.ParseHost(hostStr)
|
|
logger.CriticalIf(context.Background(), err)
|
|
peerRPCClientMap[*host] = &PeerRPCClient{newAuthRPCClient(authConfig{
|
|
accessKey: cred.AccessKey,
|
|
secretKey: cred.SecretKey,
|
|
serverAddr: hostStr,
|
|
serviceEndpoint: serviceEndpoint,
|
|
secureConn: globalIsSSL,
|
|
serviceName: "Peer",
|
|
})}
|
|
}
|
|
|
|
return peerRPCClientMap
|
|
}
|
|
|
|
// PeerRPCClientTarget - RPCClient is an event.Target which sends event to target of remote peer.
|
|
type PeerRPCClientTarget struct {
|
|
id event.TargetID
|
|
remoteTargetID event.TargetID
|
|
rpcClient *PeerRPCClient
|
|
bucketName string
|
|
}
|
|
|
|
// ID - returns target ID.
|
|
func (target *PeerRPCClientTarget) ID() event.TargetID {
|
|
return target.id
|
|
}
|
|
|
|
// Send - sends event to remote peer by making RPC call.
|
|
func (target *PeerRPCClientTarget) Send(eventData event.Event) error {
|
|
return target.rpcClient.SendEvent(target.bucketName, target.id, target.remoteTargetID, eventData)
|
|
}
|
|
|
|
// Close - does nothing and available for interface compatibility.
|
|
func (target *PeerRPCClientTarget) Close() error {
|
|
return nil
|
|
}
|
|
|
|
// NewPeerRPCClientTarget - creates RPCClient target with given target ID available in remote peer.
|
|
func NewPeerRPCClientTarget(bucketName string, targetID event.TargetID, rpcClient *PeerRPCClient) *PeerRPCClientTarget {
|
|
return &PeerRPCClientTarget{
|
|
id: event.TargetID{targetID.ID, targetID.Name + "+" + mustGetUUID()},
|
|
remoteTargetID: targetID,
|
|
bucketName: bucketName,
|
|
rpcClient: rpcClient,
|
|
}
|
|
}
|
|
|