From 0aabc1d8d9a542d5f05e64f08df65f2107be69c8 Mon Sep 17 00:00:00 2001 From: Aditya Manthramurthy Date: Thu, 13 Oct 2016 09:19:04 -0700 Subject: [PATCH] Use Peer RPC to propagate bucket policy changes (#2891) --- cmd/bucket-policy-handlers.go | 41 +++++++++++++++++----- cmd/bucket-policy-parser.go | 2 +- cmd/bucket-policy.go | 47 ++++++++++++++++--------- cmd/s3-peer-client.go | 11 ++++++ cmd/s3-peer-rpc-handlers.go | 30 ++++++++++++++-- cmd/test-utils_test.go | 30 ++++++++++++---- cmd/web-handlers.go | 7 ++-- cmd/web-handlers_test.go | 65 ++++++++++++++++++++++++++++++++--- 8 files changed, 191 insertions(+), 42 deletions(-) diff --git a/cmd/bucket-policy-handlers.go b/cmd/bucket-policy-handlers.go index 88a982a8c..b37678f5d 100644 --- a/cmd/bucket-policy-handlers.go +++ b/cmd/bucket-policy-handlers.go @@ -182,7 +182,7 @@ func (api objectAPIHandlers) PutBucketPolicyHandler(w http.ResponseWriter, r *ht } // Save bucket policy. - if err = writeBucketPolicy(bucket, objAPI, bytes.NewReader(policyBytes), int64(len(policyBytes))); err != nil { + if err = persistAndNotifyBucketPolicyChange(bucket, policyChange{false, policy}, objAPI); err != nil { switch err.(type) { case BucketNameInvalid: writeErrorResponse(w, r, ErrInvalidBucketName, r.URL.Path) @@ -192,13 +192,38 @@ func (api objectAPIHandlers) PutBucketPolicyHandler(w http.ResponseWriter, r *ht return } - // Set the bucket policy in memory. - globalBucketPolicies.SetBucketPolicy(bucket, policy) - // Success. writeSuccessNoContent(w) } +// persistAndNotifyBucketPolicyChange - takes a policyChange argument, +// persists it to storage, and notify nodes in the cluster about the +// change. In-memory state is updated in response to the notification. +func persistAndNotifyBucketPolicyChange(bucket string, pCh policyChange, objAPI ObjectLayer) error { + // FIXME: Race exists between the bucket existence check and + // then updating the bucket policy. + if err := isBucketExist(bucket, objAPI); err != nil { + return err + } + + if pCh.IsRemove { + if err := removeBucketPolicy(bucket, objAPI); err != nil { + return err + } + } else { + if pCh.BktPolicy == nil { + return errInvalidArgument + } + if err := writeBucketPolicy(bucket, objAPI, pCh.BktPolicy); err != nil { + return err + } + } + + // Notify all peers (including self) to update in-memory state + S3PeersUpdateBucketPolicy(bucket, pCh) + return nil +} + // DeleteBucketPolicyHandler - DELETE Bucket policy // ----------------- // This implementation of the DELETE operation uses the policy @@ -220,8 +245,9 @@ func (api objectAPIHandlers) DeleteBucketPolicyHandler(w http.ResponseWriter, r vars := mux.Vars(r) bucket := vars["bucket"] - // Delete bucket access policy. - if err := removeBucketPolicy(bucket, objAPI); err != nil { + // Delete bucket access policy, by passing an empty policy + // struct. + if err := persistAndNotifyBucketPolicyChange(bucket, policyChange{true, nil}, objAPI); err != nil { switch err.(type) { case BucketNameInvalid: writeErrorResponse(w, r, ErrInvalidBucketName, r.URL.Path) @@ -233,9 +259,6 @@ func (api objectAPIHandlers) DeleteBucketPolicyHandler(w http.ResponseWriter, r return } - // Remove bucket policy. - globalBucketPolicies.RemoveBucketPolicy(bucket) - // Success. writeSuccessNoContent(w) } diff --git a/cmd/bucket-policy-parser.go b/cmd/bucket-policy-parser.go index 23685656d..a3643aba7 100644 --- a/cmd/bucket-policy-parser.go +++ b/cmd/bucket-policy-parser.go @@ -69,7 +69,7 @@ type bucketPolicy struct { func (b bucketPolicy) String() string { bbytes, err := json.Marshal(&b) if err != nil { - errorIf(err, "Unable to unmarshal bucket policy into JSON %#v", b) + errorIf(err, "Unable to marshal bucket policy into JSON %#v", b) return "" } return string(bbytes) diff --git a/cmd/bucket-policy.go b/cmd/bucket-policy.go index 792768d38..c22dddac5 100644 --- a/cmd/bucket-policy.go +++ b/cmd/bucket-policy.go @@ -18,7 +18,7 @@ package cmd import ( "bytes" - "errors" + "encoding/json" "io" "path" "sync" @@ -36,6 +36,16 @@ type bucketPolicies struct { bucketPolicyConfigs map[string]*bucketPolicy } +// Represent a policy change +type policyChange struct { + // isRemove is true if the policy change is to delete the + // policy on a bucket. + IsRemove bool + + // represents the new policy for the bucket + BktPolicy *bucketPolicy +} + // Fetch bucket policy for a given bucket. func (bp bucketPolicies) GetBucketPolicy(bucket string) *bucketPolicy { bp.rwMutex.RLock() @@ -44,24 +54,22 @@ func (bp bucketPolicies) GetBucketPolicy(bucket string) *bucketPolicy { } // Set a new bucket policy for a bucket, this operation will overwrite -// any previous bucketpolicies for the bucket. -func (bp *bucketPolicies) SetBucketPolicy(bucket string, policy *bucketPolicy) error { +// any previous bucket policies for the bucket. +func (bp *bucketPolicies) SetBucketPolicy(bucket string, pCh policyChange) error { bp.rwMutex.Lock() defer bp.rwMutex.Unlock() - if policy == nil { - return errors.New("invalid argument") + + if pCh.IsRemove { + delete(bp.bucketPolicyConfigs, bucket) + } else { + if pCh.BktPolicy == nil { + return errInvalidArgument + } + bp.bucketPolicyConfigs[bucket] = pCh.BktPolicy } - bp.bucketPolicyConfigs[bucket] = policy return nil } -// Remove bucket policy for a bucket, from in-memory map. -func (bp *bucketPolicies) RemoveBucketPolicy(bucket string) { - bp.rwMutex.Lock() - defer bp.rwMutex.Unlock() - delete(bp.bucketPolicyConfigs, bucket) -} - // Loads all bucket policies from persistent layer. func loadAllBucketPolicies(objAPI ObjectLayer) (policies map[string]*bucketPolicy, err error) { // List buckets to proceed loading all notification configuration. @@ -202,16 +210,21 @@ func removeBucketPolicy(bucket string, objAPI ObjectLayer) error { return nil } -// writeBucketPolicy - save all bucket policies. -func writeBucketPolicy(bucket string, objAPI ObjectLayer, reader io.Reader, size int64) error { +// writeBucketPolicy - save a bucket policy that is assumed to be +// validated. +func writeBucketPolicy(bucket string, objAPI ObjectLayer, bpy *bucketPolicy) error { // Verify if bucket actually exists if err := isBucketExist(bucket, objAPI); err != nil { return err } + buf, err := json.Marshal(bpy) + if err != nil { + errorIf(err, "Unable to marshal bucket policy '%v' to JSON", *bpy) + return err + } policyPath := pathJoin(bucketConfigPrefix, bucket, policyJSON) - sha256sum := "" - if _, err := objAPI.PutObject(minioMetaBucket, policyPath, size, reader, nil, sha256sum); err != nil { + if _, err := objAPI.PutObject(minioMetaBucket, policyPath, int64(len(buf)), bytes.NewReader(buf), nil, ""); err != nil { errorIf(err, "Unable to set policy for the bucket %s", bucket) return errorCause(err) } diff --git a/cmd/s3-peer-client.go b/cmd/s3-peer-client.go index ad1d2d06f..7b4607040 100644 --- a/cmd/s3-peer-client.go +++ b/cmd/s3-peer-client.go @@ -209,3 +209,14 @@ func S3PeersUpdateBucketListener(bucket string, lcfg []listenerConfig) { errorIf(err, "Error sending peer update bucket listener to %s - %v", peer, err) } } + +// S3PeersUpdateBucketPolicy - Sends update bucket policy request to +// all peers. Currently we log an error and continue. +func S3PeersUpdateBucketPolicy(bucket string, pCh policyChange) { + setBPPArgs := &SetBPPArgs{Bucket: bucket, PCh: pCh} + peers := globalS3Peers.GetPeers() + errsMap := globalS3Peers.SendRPC(peers, "S3.SetBucketPolicyPeer", setBPPArgs) + for peer, err := range errsMap { + errorIf(err, "Error sending peer update bucket policy to %s - %v", peer, err) + } +} diff --git a/cmd/s3-peer-rpc-handlers.go b/cmd/s3-peer-rpc-handlers.go index 8e6c06366..580bc3a33 100644 --- a/cmd/s3-peer-rpc-handlers.go +++ b/cmd/s3-peer-rpc-handlers.go @@ -118,6 +118,32 @@ func (s3 *s3PeerAPIHandlers) Event(args *EventArgs, reply *GenericReply) error { return errServerNotInitialized } - err := globalEventNotifier.SendListenerEvent(args.Arn, args.Event) - return err + return globalEventNotifier.SendListenerEvent(args.Arn, args.Event) +} + +// SetBPPArgs - Arguments collection for SetBucketPolicyPeer RPC call +type SetBPPArgs struct { + // For Auth + GenericArgs + + Bucket string + + // policy config + PCh policyChange +} + +// tell receiving server to update a bucket policy +func (s3 *s3PeerAPIHandlers) SetBucketPolicyPeer(args SetBPPArgs, reply *GenericReply) error { + // check auth + if !isRPCTokenValid(args.Token) { + return errInvalidToken + } + + // check if object layer is available. + objAPI := s3.ObjectAPI() + if objAPI == nil { + return errServerNotInitialized + } + + return globalBucketPolicies.SetBucketPolicy(args.Bucket, args.PCh) } diff --git a/cmd/test-utils_test.go b/cmd/test-utils_test.go index f38199dd8..8c114ddba 100644 --- a/cmd/test-utils_test.go +++ b/cmd/test-utils_test.go @@ -29,6 +29,7 @@ import ( "io" "io/ioutil" "math/rand" + "net" "net/http" "net/http/httptest" "net/url" @@ -182,22 +183,39 @@ func StartTestServer(t TestErrHandler, instanceType string) TestServer { t.Fatalf("Failed obtaining Temp Backend: %s", err) } - // Run TestServer. + srvCmdCfg := serverCmdConfig{ + disks: disks, + storageDisks: storageDisks, + } httpHandler, err := configureServerHandler( - serverCmdConfig{ - disks: disks, - storageDisks: storageDisks, - }, + srvCmdCfg, ) if err != nil { t.Fatalf("Failed to configure one of the RPC services %s", err) } + + // Run TestServer. testServer.Server = httptest.NewServer(httpHandler) + srvCmdCfg.serverAddr = testServer.Server.Listener.Addr().String() + testServer.Obj = objLayer globalObjLayerMutex.Lock() globalObjectAPI = objLayer globalObjLayerMutex.Unlock() + + // initialize peer rpc + _, portStr, err := net.SplitHostPort(srvCmdCfg.serverAddr) + if err != nil { + t.Fatal("Early setup error:", err) + } + globalMinioPort, err = strconv.Atoi(portStr) + if err != nil { + t.Fatal("Early setup error:", err) + } + globalMinioAddr = getLocalAddress(srvCmdCfg) + initGlobalS3Peers(disks) + return testServer } @@ -1617,7 +1635,7 @@ func ExecObjectLayerAPIAnonTest(t *testing.T, testName, bucketName, objectName, Statements: []policyStatement{policyFunc(bucketName, "")}, } - globalBucketPolicies.SetBucketPolicy(bucketName, &policy) + globalBucketPolicies.SetBucketPolicy(bucketName, policyChange{false, &policy}) // now call the handler again with the unsigned/anonymous request, it should be accepted. rec = httptest.NewRecorder() diff --git a/cmd/web-handlers.go b/cmd/web-handlers.go index afd458a48..86ba190d7 100644 --- a/cmd/web-handlers.go +++ b/cmd/web-handlers.go @@ -646,7 +646,7 @@ func (web *webAPIHandlers) SetBucketPolicy(r *http.Request, args *SetBucketPolic } policyInfo.Statements = policy.SetPolicy(policyInfo.Statements, bucketP, args.BucketName, args.Prefix) if len(policyInfo.Statements) == 0 { - if err = removeBucketPolicy(args.BucketName, objectAPI); err != nil { + if err = persistAndNotifyBucketPolicyChange(args.BucketName, policyChange{true, nil}, objectAPI); err != nil { return &json2.Error{Message: err.Error()} } return nil @@ -669,8 +669,9 @@ func (web *webAPIHandlers) SetBucketPolicy(r *http.Request, args *SetBucketPolic return &json2.Error{Message: getAPIError(s3Error).Description} } - // TODO: update policy statements according to bucket name, prefix and policy arguments. - if err := writeBucketPolicy(args.BucketName, objectAPI, bytes.NewReader(data), int64(len(data))); err != nil { + // TODO: update policy statements according to bucket name, + // prefix and policy arguments. + if err := persistAndNotifyBucketPolicyChange(args.BucketName, policyChange{false, policy}, objectAPI); err != nil { return &json2.Error{Message: err.Error()} } diff --git a/cmd/web-handlers_test.go b/cmd/web-handlers_test.go index ce594ca0e..a9ff398ec 100644 --- a/cmd/web-handlers_test.go +++ b/cmd/web-handlers_test.go @@ -28,6 +28,7 @@ import ( "testing" "github.com/minio/minio-go/pkg/policy" + "github.com/minio/minio-go/pkg/set" ) // Authenticate and get JWT token - will be called before every webrpc handler invocation @@ -834,8 +835,26 @@ func testWebGetBucketPolicyHandler(obj ObjectLayer, instanceType string, t TestE t.Fatal("Unexpected error: ", err) } - policyDoc := "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Action\":[\"s3:GetBucketLocation\",\"s3:ListBucket\"],\"Effect\":\"Allow\",\"Principal\":{\"AWS\":[\"*\"]},\"Resource\":[\"arn:aws:s3:::" + bucketName + "\"],\"Sid\":\"\"},{\"Action\":[\"s3:GetObject\"],\"Effect\":\"Allow\",\"Principal\":{\"AWS\":[\"*\"]},\"Resource\":[\"arn:aws:s3:::" + bucketName + "/*\"],\"Sid\":\"\"}]}" - if err := writeBucketPolicy(bucketName, obj, bytes.NewReader([]byte(policyDoc)), int64(len(policyDoc))); err != nil { + policyVal := bucketPolicy{ + Version: "2012-10-17", + Statements: []policyStatement{ + { + Actions: set.CreateStringSet("s3:GetBucketLocation", "s3:ListBucket"), + Effect: "Allow", + Principal: map[string][]string{"AWS": {"*"}}, + Resources: set.CreateStringSet("arn:aws:s3:::" + bucketName), + Sid: "", + }, + { + Actions: set.CreateStringSet("s3:GetObject"), + Effect: "Allow", + Principal: map[string][]string{"AWS": {"*"}}, + Resources: set.CreateStringSet("arn:aws:s3:::" + bucketName + "/*"), + Sid: "", + }, + }, + } + if err := writeBucketPolicy(bucketName, obj, &policyVal); err != nil { t.Fatal("Unexpected error: ", err) } @@ -899,8 +918,46 @@ func testWebListAllBucketPoliciesHandler(obj ObjectLayer, instanceType string, t t.Fatal("Unexpected error: ", err) } - policyDoc := `{"Version":"2012-10-17","Statement":[{"Action":["s3:GetBucketLocation"],"Effect":"Allow","Principal":{"AWS":["*"]},"Resource":["arn:aws:s3:::` + bucketName + `"],"Sid":""},{"Action":["s3:ListBucket"],"Condition":{"StringEquals":{"s3:prefix":["hello"]}},"Effect":"Allow","Principal":{"AWS":["*"]},"Resource":["arn:aws:s3:::` + bucketName + `"],"Sid":""},{"Action":["s3:ListBucketMultipartUploads"],"Effect":"Allow","Principal":{"AWS":["*"]},"Resource":["arn:aws:s3:::` + bucketName + `"],"Sid":""},{"Action":["s3:AbortMultipartUpload","s3:DeleteObject","s3:GetObject","s3:ListMultipartUploadParts","s3:PutObject"],"Effect":"Allow","Principal":{"AWS":["*"]},"Resource":["arn:aws:s3:::` + bucketName + `/hello*"],"Sid":""}]}` - if err := writeBucketPolicy(bucketName, obj, bytes.NewReader([]byte(policyDoc)), int64(len(policyDoc))); err != nil { + policyVal := bucketPolicy{ + Version: "2012-10-17", + Statements: []policyStatement{ + { + Actions: set.CreateStringSet("s3:GetBucketLocation"), + Effect: "Allow", + Principal: map[string][]string{"AWS": {"*"}}, + Resources: set.CreateStringSet("arn:aws:s3:::" + bucketName), + Sid: "", + }, + { + Actions: set.CreateStringSet("s3:ListBucket"), + Conditions: map[string]map[string]set.StringSet{ + "StringEquals": { + "s3:prefix": set.CreateStringSet("hello"), + }, + }, + Effect: "Allow", + Principal: map[string][]string{"AWS": {"*"}}, + Resources: set.CreateStringSet("arn:aws:s3:::" + bucketName), + Sid: "", + }, + { + Actions: set.CreateStringSet("s3:ListBucketMultipartUploads"), + Effect: "Allow", + Principal: map[string][]string{"AWS": {"*"}}, + Resources: set.CreateStringSet("arn:aws:s3:::" + bucketName), + Sid: "", + }, + { + Actions: set.CreateStringSet("s3:AbortMultipartUpload", "s3:DeleteObject", + "s3:GetObject", "s3:ListMultipartUploadParts", "s3:PutObject"), + Effect: "Allow", + Principal: map[string][]string{"AWS": {"*"}}, + Resources: set.CreateStringSet("arn:aws:s3:::" + bucketName + "/hello*"), + Sid: "", + }, + }, + } + if err := writeBucketPolicy(bucketName, obj, &policyVal); err != nil { t.Fatal("Unexpected error: ", err) }