Use Peer RPC to propagate bucket policy changes (#2891)

master
Aditya Manthramurthy 8 years ago committed by Harshavardhana
parent 55f6828750
commit 0aabc1d8d9
  1. 41
      cmd/bucket-policy-handlers.go
  2. 2
      cmd/bucket-policy-parser.go
  3. 47
      cmd/bucket-policy.go
  4. 11
      cmd/s3-peer-client.go
  5. 30
      cmd/s3-peer-rpc-handlers.go
  6. 30
      cmd/test-utils_test.go
  7. 7
      cmd/web-handlers.go
  8. 65
      cmd/web-handlers_test.go

@ -182,7 +182,7 @@ func (api objectAPIHandlers) PutBucketPolicyHandler(w http.ResponseWriter, r *ht
}
// Save bucket policy.
if err = writeBucketPolicy(bucket, objAPI, bytes.NewReader(policyBytes), int64(len(policyBytes))); err != nil {
if err = persistAndNotifyBucketPolicyChange(bucket, policyChange{false, policy}, objAPI); err != nil {
switch err.(type) {
case BucketNameInvalid:
writeErrorResponse(w, r, ErrInvalidBucketName, r.URL.Path)
@ -192,13 +192,38 @@ func (api objectAPIHandlers) PutBucketPolicyHandler(w http.ResponseWriter, r *ht
return
}
// Set the bucket policy in memory.
globalBucketPolicies.SetBucketPolicy(bucket, policy)
// Success.
writeSuccessNoContent(w)
}
// persistAndNotifyBucketPolicyChange - takes a policyChange argument,
// persists it to storage, and notify nodes in the cluster about the
// change. In-memory state is updated in response to the notification.
func persistAndNotifyBucketPolicyChange(bucket string, pCh policyChange, objAPI ObjectLayer) error {
// FIXME: Race exists between the bucket existence check and
// then updating the bucket policy.
if err := isBucketExist(bucket, objAPI); err != nil {
return err
}
if pCh.IsRemove {
if err := removeBucketPolicy(bucket, objAPI); err != nil {
return err
}
} else {
if pCh.BktPolicy == nil {
return errInvalidArgument
}
if err := writeBucketPolicy(bucket, objAPI, pCh.BktPolicy); err != nil {
return err
}
}
// Notify all peers (including self) to update in-memory state
S3PeersUpdateBucketPolicy(bucket, pCh)
return nil
}
// DeleteBucketPolicyHandler - DELETE Bucket policy
// -----------------
// This implementation of the DELETE operation uses the policy
@ -220,8 +245,9 @@ func (api objectAPIHandlers) DeleteBucketPolicyHandler(w http.ResponseWriter, r
vars := mux.Vars(r)
bucket := vars["bucket"]
// Delete bucket access policy.
if err := removeBucketPolicy(bucket, objAPI); err != nil {
// Delete bucket access policy, by passing an empty policy
// struct.
if err := persistAndNotifyBucketPolicyChange(bucket, policyChange{true, nil}, objAPI); err != nil {
switch err.(type) {
case BucketNameInvalid:
writeErrorResponse(w, r, ErrInvalidBucketName, r.URL.Path)
@ -233,9 +259,6 @@ func (api objectAPIHandlers) DeleteBucketPolicyHandler(w http.ResponseWriter, r
return
}
// Remove bucket policy.
globalBucketPolicies.RemoveBucketPolicy(bucket)
// Success.
writeSuccessNoContent(w)
}

@ -69,7 +69,7 @@ type bucketPolicy struct {
func (b bucketPolicy) String() string {
bbytes, err := json.Marshal(&b)
if err != nil {
errorIf(err, "Unable to unmarshal bucket policy into JSON %#v", b)
errorIf(err, "Unable to marshal bucket policy into JSON %#v", b)
return ""
}
return string(bbytes)

@ -18,7 +18,7 @@ package cmd
import (
"bytes"
"errors"
"encoding/json"
"io"
"path"
"sync"
@ -36,6 +36,16 @@ type bucketPolicies struct {
bucketPolicyConfigs map[string]*bucketPolicy
}
// Represent a policy change
type policyChange struct {
// isRemove is true if the policy change is to delete the
// policy on a bucket.
IsRemove bool
// represents the new policy for the bucket
BktPolicy *bucketPolicy
}
// Fetch bucket policy for a given bucket.
func (bp bucketPolicies) GetBucketPolicy(bucket string) *bucketPolicy {
bp.rwMutex.RLock()
@ -44,24 +54,22 @@ func (bp bucketPolicies) GetBucketPolicy(bucket string) *bucketPolicy {
}
// Set a new bucket policy for a bucket, this operation will overwrite
// any previous bucketpolicies for the bucket.
func (bp *bucketPolicies) SetBucketPolicy(bucket string, policy *bucketPolicy) error {
// any previous bucket policies for the bucket.
func (bp *bucketPolicies) SetBucketPolicy(bucket string, pCh policyChange) error {
bp.rwMutex.Lock()
defer bp.rwMutex.Unlock()
if policy == nil {
return errors.New("invalid argument")
if pCh.IsRemove {
delete(bp.bucketPolicyConfigs, bucket)
} else {
if pCh.BktPolicy == nil {
return errInvalidArgument
}
bp.bucketPolicyConfigs[bucket] = pCh.BktPolicy
}
bp.bucketPolicyConfigs[bucket] = policy
return nil
}
// Remove bucket policy for a bucket, from in-memory map.
func (bp *bucketPolicies) RemoveBucketPolicy(bucket string) {
bp.rwMutex.Lock()
defer bp.rwMutex.Unlock()
delete(bp.bucketPolicyConfigs, bucket)
}
// Loads all bucket policies from persistent layer.
func loadAllBucketPolicies(objAPI ObjectLayer) (policies map[string]*bucketPolicy, err error) {
// List buckets to proceed loading all notification configuration.
@ -202,16 +210,21 @@ func removeBucketPolicy(bucket string, objAPI ObjectLayer) error {
return nil
}
// writeBucketPolicy - save all bucket policies.
func writeBucketPolicy(bucket string, objAPI ObjectLayer, reader io.Reader, size int64) error {
// writeBucketPolicy - save a bucket policy that is assumed to be
// validated.
func writeBucketPolicy(bucket string, objAPI ObjectLayer, bpy *bucketPolicy) error {
// Verify if bucket actually exists
if err := isBucketExist(bucket, objAPI); err != nil {
return err
}
buf, err := json.Marshal(bpy)
if err != nil {
errorIf(err, "Unable to marshal bucket policy '%v' to JSON", *bpy)
return err
}
policyPath := pathJoin(bucketConfigPrefix, bucket, policyJSON)
sha256sum := ""
if _, err := objAPI.PutObject(minioMetaBucket, policyPath, size, reader, nil, sha256sum); err != nil {
if _, err := objAPI.PutObject(minioMetaBucket, policyPath, int64(len(buf)), bytes.NewReader(buf), nil, ""); err != nil {
errorIf(err, "Unable to set policy for the bucket %s", bucket)
return errorCause(err)
}

@ -209,3 +209,14 @@ func S3PeersUpdateBucketListener(bucket string, lcfg []listenerConfig) {
errorIf(err, "Error sending peer update bucket listener to %s - %v", peer, err)
}
}
// S3PeersUpdateBucketPolicy - Sends update bucket policy request to
// all peers. Currently we log an error and continue.
func S3PeersUpdateBucketPolicy(bucket string, pCh policyChange) {
setBPPArgs := &SetBPPArgs{Bucket: bucket, PCh: pCh}
peers := globalS3Peers.GetPeers()
errsMap := globalS3Peers.SendRPC(peers, "S3.SetBucketPolicyPeer", setBPPArgs)
for peer, err := range errsMap {
errorIf(err, "Error sending peer update bucket policy to %s - %v", peer, err)
}
}

@ -118,6 +118,32 @@ func (s3 *s3PeerAPIHandlers) Event(args *EventArgs, reply *GenericReply) error {
return errServerNotInitialized
}
err := globalEventNotifier.SendListenerEvent(args.Arn, args.Event)
return err
return globalEventNotifier.SendListenerEvent(args.Arn, args.Event)
}
// SetBPPArgs - Arguments collection for SetBucketPolicyPeer RPC call
type SetBPPArgs struct {
// For Auth
GenericArgs
Bucket string
// policy config
PCh policyChange
}
// tell receiving server to update a bucket policy
func (s3 *s3PeerAPIHandlers) SetBucketPolicyPeer(args SetBPPArgs, reply *GenericReply) error {
// check auth
if !isRPCTokenValid(args.Token) {
return errInvalidToken
}
// check if object layer is available.
objAPI := s3.ObjectAPI()
if objAPI == nil {
return errServerNotInitialized
}
return globalBucketPolicies.SetBucketPolicy(args.Bucket, args.PCh)
}

@ -29,6 +29,7 @@ import (
"io"
"io/ioutil"
"math/rand"
"net"
"net/http"
"net/http/httptest"
"net/url"
@ -182,22 +183,39 @@ func StartTestServer(t TestErrHandler, instanceType string) TestServer {
t.Fatalf("Failed obtaining Temp Backend: <ERROR> %s", err)
}
// Run TestServer.
srvCmdCfg := serverCmdConfig{
disks: disks,
storageDisks: storageDisks,
}
httpHandler, err := configureServerHandler(
serverCmdConfig{
disks: disks,
storageDisks: storageDisks,
},
srvCmdCfg,
)
if err != nil {
t.Fatalf("Failed to configure one of the RPC services <ERROR> %s", err)
}
// Run TestServer.
testServer.Server = httptest.NewServer(httpHandler)
srvCmdCfg.serverAddr = testServer.Server.Listener.Addr().String()
testServer.Obj = objLayer
globalObjLayerMutex.Lock()
globalObjectAPI = objLayer
globalObjLayerMutex.Unlock()
// initialize peer rpc
_, portStr, err := net.SplitHostPort(srvCmdCfg.serverAddr)
if err != nil {
t.Fatal("Early setup error:", err)
}
globalMinioPort, err = strconv.Atoi(portStr)
if err != nil {
t.Fatal("Early setup error:", err)
}
globalMinioAddr = getLocalAddress(srvCmdCfg)
initGlobalS3Peers(disks)
return testServer
}
@ -1617,7 +1635,7 @@ func ExecObjectLayerAPIAnonTest(t *testing.T, testName, bucketName, objectName,
Statements: []policyStatement{policyFunc(bucketName, "")},
}
globalBucketPolicies.SetBucketPolicy(bucketName, &policy)
globalBucketPolicies.SetBucketPolicy(bucketName, policyChange{false, &policy})
// now call the handler again with the unsigned/anonymous request, it should be accepted.
rec = httptest.NewRecorder()

@ -646,7 +646,7 @@ func (web *webAPIHandlers) SetBucketPolicy(r *http.Request, args *SetBucketPolic
}
policyInfo.Statements = policy.SetPolicy(policyInfo.Statements, bucketP, args.BucketName, args.Prefix)
if len(policyInfo.Statements) == 0 {
if err = removeBucketPolicy(args.BucketName, objectAPI); err != nil {
if err = persistAndNotifyBucketPolicyChange(args.BucketName, policyChange{true, nil}, objectAPI); err != nil {
return &json2.Error{Message: err.Error()}
}
return nil
@ -669,8 +669,9 @@ func (web *webAPIHandlers) SetBucketPolicy(r *http.Request, args *SetBucketPolic
return &json2.Error{Message: getAPIError(s3Error).Description}
}
// TODO: update policy statements according to bucket name, prefix and policy arguments.
if err := writeBucketPolicy(args.BucketName, objectAPI, bytes.NewReader(data), int64(len(data))); err != nil {
// TODO: update policy statements according to bucket name,
// prefix and policy arguments.
if err := persistAndNotifyBucketPolicyChange(args.BucketName, policyChange{false, policy}, objectAPI); err != nil {
return &json2.Error{Message: err.Error()}
}

@ -28,6 +28,7 @@ import (
"testing"
"github.com/minio/minio-go/pkg/policy"
"github.com/minio/minio-go/pkg/set"
)
// Authenticate and get JWT token - will be called before every webrpc handler invocation
@ -834,8 +835,26 @@ func testWebGetBucketPolicyHandler(obj ObjectLayer, instanceType string, t TestE
t.Fatal("Unexpected error: ", err)
}
policyDoc := "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Action\":[\"s3:GetBucketLocation\",\"s3:ListBucket\"],\"Effect\":\"Allow\",\"Principal\":{\"AWS\":[\"*\"]},\"Resource\":[\"arn:aws:s3:::" + bucketName + "\"],\"Sid\":\"\"},{\"Action\":[\"s3:GetObject\"],\"Effect\":\"Allow\",\"Principal\":{\"AWS\":[\"*\"]},\"Resource\":[\"arn:aws:s3:::" + bucketName + "/*\"],\"Sid\":\"\"}]}"
if err := writeBucketPolicy(bucketName, obj, bytes.NewReader([]byte(policyDoc)), int64(len(policyDoc))); err != nil {
policyVal := bucketPolicy{
Version: "2012-10-17",
Statements: []policyStatement{
{
Actions: set.CreateStringSet("s3:GetBucketLocation", "s3:ListBucket"),
Effect: "Allow",
Principal: map[string][]string{"AWS": {"*"}},
Resources: set.CreateStringSet("arn:aws:s3:::" + bucketName),
Sid: "",
},
{
Actions: set.CreateStringSet("s3:GetObject"),
Effect: "Allow",
Principal: map[string][]string{"AWS": {"*"}},
Resources: set.CreateStringSet("arn:aws:s3:::" + bucketName + "/*"),
Sid: "",
},
},
}
if err := writeBucketPolicy(bucketName, obj, &policyVal); err != nil {
t.Fatal("Unexpected error: ", err)
}
@ -899,8 +918,46 @@ func testWebListAllBucketPoliciesHandler(obj ObjectLayer, instanceType string, t
t.Fatal("Unexpected error: ", err)
}
policyDoc := `{"Version":"2012-10-17","Statement":[{"Action":["s3:GetBucketLocation"],"Effect":"Allow","Principal":{"AWS":["*"]},"Resource":["arn:aws:s3:::` + bucketName + `"],"Sid":""},{"Action":["s3:ListBucket"],"Condition":{"StringEquals":{"s3:prefix":["hello"]}},"Effect":"Allow","Principal":{"AWS":["*"]},"Resource":["arn:aws:s3:::` + bucketName + `"],"Sid":""},{"Action":["s3:ListBucketMultipartUploads"],"Effect":"Allow","Principal":{"AWS":["*"]},"Resource":["arn:aws:s3:::` + bucketName + `"],"Sid":""},{"Action":["s3:AbortMultipartUpload","s3:DeleteObject","s3:GetObject","s3:ListMultipartUploadParts","s3:PutObject"],"Effect":"Allow","Principal":{"AWS":["*"]},"Resource":["arn:aws:s3:::` + bucketName + `/hello*"],"Sid":""}]}`
if err := writeBucketPolicy(bucketName, obj, bytes.NewReader([]byte(policyDoc)), int64(len(policyDoc))); err != nil {
policyVal := bucketPolicy{
Version: "2012-10-17",
Statements: []policyStatement{
{
Actions: set.CreateStringSet("s3:GetBucketLocation"),
Effect: "Allow",
Principal: map[string][]string{"AWS": {"*"}},
Resources: set.CreateStringSet("arn:aws:s3:::" + bucketName),
Sid: "",
},
{
Actions: set.CreateStringSet("s3:ListBucket"),
Conditions: map[string]map[string]set.StringSet{
"StringEquals": {
"s3:prefix": set.CreateStringSet("hello"),
},
},
Effect: "Allow",
Principal: map[string][]string{"AWS": {"*"}},
Resources: set.CreateStringSet("arn:aws:s3:::" + bucketName),
Sid: "",
},
{
Actions: set.CreateStringSet("s3:ListBucketMultipartUploads"),
Effect: "Allow",
Principal: map[string][]string{"AWS": {"*"}},
Resources: set.CreateStringSet("arn:aws:s3:::" + bucketName),
Sid: "",
},
{
Actions: set.CreateStringSet("s3:AbortMultipartUpload", "s3:DeleteObject",
"s3:GetObject", "s3:ListMultipartUploadParts", "s3:PutObject"),
Effect: "Allow",
Principal: map[string][]string{"AWS": {"*"}},
Resources: set.CreateStringSet("arn:aws:s3:::" + bucketName + "/hello*"),
Sid: "",
},
},
}
if err := writeBucketPolicy(bucketName, obj, &policyVal); err != nil {
t.Fatal("Unexpected error: ", err)
}

Loading…
Cancel
Save