Implement BucketUpdater interface to call BucketMetaState methods. (#3375)

master
Krishna Srinivas 8 years ago committed by Harshavardhana
parent bc9509bc8a
commit 8021061bd8
  1. 5
      cmd/bucket-metadata.go
  2. 24
      cmd/s3-peer-client.go
  3. 12
      cmd/s3-peer-rpc-handlers.go

@ -37,6 +37,11 @@ type BucketMetaState interface {
SendEvent(args *EventArgs) error SendEvent(args *EventArgs) error
} }
// BucketUpdater - Interface implementer calls one of BucketMetaState's methods.
type BucketUpdater interface {
BucketUpdate(client BucketMetaState) error
}
// Type that implements BucketMetaState for local node. // Type that implements BucketMetaState for local node.
type localBucketMetaState struct { type localBucketMetaState struct {
ObjectAPI func() ObjectLayer ObjectAPI func() ObjectLayer

@ -108,7 +108,7 @@ func (s3p s3Peers) GetPeerClient(peer string) BucketMetaState {
// The updates are sent via a type implementing the BucketMetaState // The updates are sent via a type implementing the BucketMetaState
// interface. This makes sure that the local node is directly updated, // interface. This makes sure that the local node is directly updated,
// and remote nodes are updated via RPC calls. // and remote nodes are updated via RPC calls.
func (s3p s3Peers) SendUpdate(peerIndex []int, args interface{}) []error { func (s3p s3Peers) SendUpdate(peerIndex []int, args BucketUpdater) []error {
// peer error array // peer error array
errs := make([]error, len(s3p)) errs := make([]error, len(s3p))
@ -119,27 +119,7 @@ func (s3p s3Peers) SendUpdate(peerIndex []int, args interface{}) []error {
// Function that sends update to peer at `index` // Function that sends update to peer at `index`
sendUpdateToPeer := func(index int) { sendUpdateToPeer := func(index int) {
defer wg.Done() defer wg.Done()
var err error errs[index] = args.BucketUpdate(s3p[index].bmsClient)
// Get BMS client for peer at `index`. The index is
// already checked for being within array bounds.
client := s3p[index].bmsClient
// Make the appropriate bucket metadata update
// according to the argument type
switch v := args.(type) {
case *SetBucketNotificationPeerArgs:
err = client.UpdateBucketNotification(v)
case *SetBucketListenerPeerArgs:
err = client.UpdateBucketListener(v)
case *SetBucketPolicyPeerArgs:
err = client.UpdateBucketPolicy(v)
default:
err = fmt.Errorf("Unknown arg in BucketMetaState updater - %v", args)
}
errs[index] = err
} }
// Special (but common) case of peerIndex == nil, implies send // Special (but common) case of peerIndex == nil, implies send

@ -48,6 +48,10 @@ type SetBucketNotificationPeerArgs struct {
NCfg *notificationConfig NCfg *notificationConfig
} }
func (s *SetBucketNotificationPeerArgs) BucketUpdate(client BucketMetaState) error {
return client.UpdateBucketNotification(s)
}
func (s3 *s3PeerAPIHandlers) SetBucketNotificationPeer(args *SetBucketNotificationPeerArgs, reply *GenericReply) error { func (s3 *s3PeerAPIHandlers) SetBucketNotificationPeer(args *SetBucketNotificationPeerArgs, reply *GenericReply) error {
// check auth // check auth
if !isRPCTokenValid(args.Token) { if !isRPCTokenValid(args.Token) {
@ -68,6 +72,10 @@ type SetBucketListenerPeerArgs struct {
LCfg []listenerConfig LCfg []listenerConfig
} }
func (s *SetBucketListenerPeerArgs) BucketUpdate(client BucketMetaState) error {
return client.UpdateBucketListener(s)
}
func (s3 *s3PeerAPIHandlers) SetBucketListenerPeer(args *SetBucketListenerPeerArgs, reply *GenericReply) error { func (s3 *s3PeerAPIHandlers) SetBucketListenerPeer(args *SetBucketListenerPeerArgs, reply *GenericReply) error {
// check auth // check auth
if !isRPCTokenValid(args.Token) { if !isRPCTokenValid(args.Token) {
@ -110,6 +118,10 @@ type SetBucketPolicyPeerArgs struct {
PChBytes []byte PChBytes []byte
} }
func (s *SetBucketPolicyPeerArgs) BucketUpdate(client BucketMetaState) error {
return client.UpdateBucketPolicy(s)
}
// tell receiving server to update a bucket policy // tell receiving server to update a bucket policy
func (s3 *s3PeerAPIHandlers) SetBucketPolicyPeer(args *SetBucketPolicyPeerArgs, reply *GenericReply) error { func (s3 *s3PeerAPIHandlers) SetBucketPolicyPeer(args *SetBucketPolicyPeerArgs, reply *GenericReply) error {
// check auth // check auth

Loading…
Cancel
Save