From 6199aa0707ee9ca0c8e781f1198ee6aa802c3abb Mon Sep 17 00:00:00 2001 From: Aditya Manthramurthy Date: Wed, 12 Oct 2016 01:03:50 -0700 Subject: [PATCH] Peer RPCs for bucket notifications (#2877) * Implements a Peer RPC router that sends info to all Minio servers in the cluster. * Bucket notifications are propagated to all nodes via this RPC router. * Bucket listener configuration is persisted to separate object layer file (`listener.json`) and peer RPCs are used to communicate changes throughout the cluster. * When events are generated, RPC calls to send them to other servers where bucket listeners may be connected is implemented. * Some bucket notification tests are now disabled as they cannot work in the new design. * Minor fix in `funcFromPC` to use `path.Join` --- cmd/bucket-notification-datatypes.go | 32 +- cmd/bucket-notification-handlers.go | 145 ++++++-- cmd/bucket-notification-handlers_test.go | 351 ++---------------- cmd/bucket-notification-utils.go | 30 -- cmd/bucket-notification-utils_test.go | 36 -- cmd/control-router.go | 18 - cmd/control-router_test.go | 65 +--- cmd/event-notifier.go | 431 ++++++++++++++++------ cmd/event-notifier_test.go | 451 +++++++++++++---------- cmd/globals.go | 5 + cmd/logger.go | 5 +- cmd/notify-listener.go | 82 +++++ cmd/routers.go | 3 + cmd/s3-peer-client.go | 176 +++++++++ cmd/s3-peer-router.go | 43 +++ cmd/s3-peer-rpc-handlers.go | 123 +++++++ cmd/server-main.go | 6 + cmd/server-startup-msg.go | 4 +- cmd/server_test.go | 119 ------ cmd/server_v2_test.go | 119 ------ cmd/signature-v4.go | 6 +- cmd/test-utils_test.go | 68 +++- cmd/utils.go | 29 +- cmd/utils_test.go | 63 ++++ 24 files changed, 1331 insertions(+), 1079 deletions(-) create mode 100644 cmd/notify-listener.go create mode 100644 cmd/s3-peer-client.go create mode 100644 cmd/s3-peer-router.go create mode 100644 cmd/s3-peer-rpc-handlers.go diff --git a/cmd/bucket-notification-datatypes.go b/cmd/bucket-notification-datatypes.go index 79532dd22..db5cd1e76 100644 --- a/cmd/bucket-notification-datatypes.go +++ b/cmd/bucket-notification-datatypes.go @@ -32,30 +32,32 @@ type keyFilter struct { FilterRules []filterRule `xml:"FilterRule,omitempty"` } -// Common elements of service notification. -type serviceConfig struct { - Events []string `xml:"Event"` - Filter struct { - Key keyFilter `xml:"S3Key,omitempty"` - } - ID string `xml:"Id"` +type filterStruct struct { + Key keyFilter `xml:"S3Key,omitempty" json:"S3Key,omitempty"` +} + +// ServiceConfig - Common elements of service notification. +type ServiceConfig struct { + Events []string `xml:"Event" json:"Event"` + Filter filterStruct `xml:"Filter" json:"Filter"` + ID string `xml:"Id" json:"Id"` } // Queue SQS configuration. type queueConfig struct { - serviceConfig + ServiceConfig QueueARN string `xml:"Queue"` } // Topic SNS configuration, this is a compliance field not used by minio yet. type topicConfig struct { - serviceConfig - TopicARN string `xml:"Topic"` + ServiceConfig + TopicARN string `xml:"Topic" json:"Topic"` } // Lambda function configuration, this is a compliance field not used by minio yet. type lambdaConfig struct { - serviceConfig + ServiceConfig LambdaARN string `xml:"CloudFunction"` } @@ -64,10 +66,16 @@ type lambdaConfig struct { type notificationConfig struct { XMLName xml.Name `xml:"NotificationConfiguration"` QueueConfigs []queueConfig `xml:"QueueConfiguration"` - TopicConfigs []topicConfig `xml:"TopicConfiguration"` LambdaConfigs []lambdaConfig `xml:"CloudFunctionConfiguration"` } +// listenerConfig structure represents run-time notification +// configuration for live listeners +type listenerConfig struct { + TopicConfig topicConfig `json:"TopicConfiguration"` + TargetServer string `json:"TargetServer"` +} + // Internal error used to signal notifications not set. var errNoSuchNotifications = errors.New("The specified bucket does not have bucket notifications") diff --git a/cmd/bucket-notification-handlers.go b/cmd/bucket-notification-handlers.go index e34c5efad..af5a7f77a 100644 --- a/cmd/bucket-notification-handlers.go +++ b/cmd/bucket-notification-handlers.go @@ -32,6 +32,7 @@ import ( const ( bucketConfigPrefix = "buckets" bucketNotificationConfig = "notification.xml" + bucketListenerConfig = "listener.json" ) // GetBucketNotificationHandler - This implementation of the GET @@ -117,11 +118,10 @@ func (api objectAPIHandlers) PutBucketNotificationHandler(w http.ResponseWriter, // Reads the incoming notification configuration. var buffer bytes.Buffer - var bufferSize int64 if r.ContentLength >= 0 { - bufferSize, err = io.CopyN(&buffer, r.Body, r.ContentLength) + _, err = io.CopyN(&buffer, r.Body, r.ContentLength) } else { - bufferSize, err = io.Copy(&buffer, r.Body) + _, err = io.Copy(&buffer, r.Body) } if err != nil { errorIf(err, "Unable to read incoming body.") @@ -144,24 +144,39 @@ func (api objectAPIHandlers) PutBucketNotificationHandler(w http.ResponseWriter, return } - // Proceed to save notification configuration. - notificationConfigPath := path.Join(bucketConfigPrefix, bucket, bucketNotificationConfig) - sha256sum := "" - var metadata map[string]string - _, err = objectAPI.PutObject(minioMetaBucket, notificationConfigPath, bufferSize, bytes.NewReader(buffer.Bytes()), metadata, sha256sum) + // Put bucket notification config. + err = PutBucketNotificationConfig(bucket, ¬ificationCfg, objectAPI) if err != nil { - errorIf(err, "Unable to write bucket notification configuration.") writeErrorResponse(w, r, toAPIErrorCode(err), r.URL.Path) return } - // Set bucket notification config. - globalEventNotifier.SetBucketNotificationConfig(bucket, ¬ificationCfg) - // Success. writeSuccessResponse(w, nil) } +// PutBucketNotificationConfig - Put a new notification config for a +// bucket (overwrites any previous config) persistently, updates +// global in-memory state, and notify other nodes in the cluster (if +// any) +func PutBucketNotificationConfig(bucket string, ncfg *notificationConfig, objAPI ObjectLayer) error { + if ncfg == nil { + return errInvalidArgument + } + + // persist config to disk + err := persistNotificationConfig(bucket, ncfg, objAPI) + if err != nil { + return fmt.Errorf("Unable to persist Bucket notification config to object layer - config=%v errMsg=%v", *ncfg, err) + } + + // All servers (including local) are told to update in-memory + // config + S3PeersUpdateBucketNotification(bucket, ncfg) + + return nil +} + // writeNotification marshals notification message before writing to client. func writeNotification(w http.ResponseWriter, notification map[string][]NotificationEvent) error { // Invalid response writer. @@ -172,7 +187,7 @@ func writeNotification(w http.ResponseWriter, notification map[string][]Notifica if notification == nil { return errInvalidArgument } - // Marshal notification data into XML and write to client. + // Marshal notification data into JSON and write to client. notificationBytes, err := json.Marshal(¬ification) if err != nil { return err @@ -251,13 +266,18 @@ func (api objectAPIHandlers) ListenBucketNotificationHandler(w http.ResponseWrit _, err := objAPI.GetBucketInfo(bucket) if err != nil { - errorIf(err, "Unable to bucket info.") + errorIf(err, "Unable to get bucket info.") writeErrorResponse(w, r, toAPIErrorCode(err), r.URL.Path) return } accountID := fmt.Sprintf("%d", time.Now().UTC().UnixNano()) - accountARN := "arn:minio:sns:" + serverConfig.GetRegion() + accountID + ":listen" + accountARN := fmt.Sprintf( + "arn:minio:sqs:%s:%s:listen-%s", + serverConfig.GetRegion(), + accountID, + globalMinioAddr, + ) var filterRules []filterRule if prefix != "" { filterRules = append(filterRules, filterRule{ @@ -272,13 +292,14 @@ func (api objectAPIHandlers) ListenBucketNotificationHandler(w http.ResponseWrit }) } - // Make topic configuration corresponding to this ListenBucketNotification request. + // Make topic configuration corresponding to this + // ListenBucketNotification request. topicCfg := &topicConfig{ TopicARN: accountARN, - serviceConfig: serviceConfig{ + ServiceConfig: ServiceConfig{ Events: events, Filter: struct { - Key keyFilter `xml:"S3Key,omitempty"` + Key keyFilter `xml:"S3Key,omitempty" json:"S3Key,omitempty"` }{ Key: keyFilter{ FilterRules: filterRules, @@ -288,29 +309,93 @@ func (api objectAPIHandlers) ListenBucketNotificationHandler(w http.ResponseWrit }, } - // Add topic config to bucket notification config. - if err = globalEventNotifier.AddTopicConfig(bucket, topicCfg); err != nil { + // Setup a listening channel that will receive notifications + // from the RPC handler. + nEventCh := make(chan []NotificationEvent) + defer close(nEventCh) + // Add channel for listener events + globalEventNotifier.AddListenerChan(accountARN, nEventCh) + // Remove listener channel after the writer has closed or the + // client disconnected. + defer globalEventNotifier.RemoveListenerChan(accountARN) + + // Update topic config to bucket config and persist - as soon + // as this call compelets, events may start appearing in + // nEventCh + lc := listenerConfig{ + TopicConfig: *topicCfg, + TargetServer: globalMinioAddr, + } + err = AddBucketListenerConfig(bucket, &lc, objAPI) + if err != nil { writeErrorResponse(w, r, toAPIErrorCode(err), r.URL.Path) return } + defer RemoveBucketListenerConfig(bucket, &lc, objAPI) // Add all common headers. setCommonHeaders(w) - // Create a new notification event channel. - nEventCh := make(chan []NotificationEvent) - // Close the listener channel. - defer close(nEventCh) - - // Set sns target. - globalEventNotifier.SetSNSTarget(accountARN, nEventCh) - // Remove sns listener after the writer has closed or the client disconnected. - defer globalEventNotifier.RemoveSNSTarget(accountARN, nEventCh) - // Start sending bucket notifications. sendBucketNotification(w, nEventCh) } +// AddBucketListenerConfig - Updates on disk state of listeners, and +// updates all peers with the change in listener config. +func AddBucketListenerConfig(bucket string, lcfg *listenerConfig, objAPI ObjectLayer) error { + if lcfg == nil { + return errInvalidArgument + } + listenerCfgs := globalEventNotifier.GetBucketListenerConfig(bucket) + + // add new lid to listeners and persist to object layer. + listenerCfgs = append(listenerCfgs, *lcfg) + + // update persistent config + err := persistListenerConfig(bucket, listenerCfgs, objAPI) + if err != nil { + errorIf(err, "Error persisting listener config when adding a listener.") + return err + } + + // persistence success - now update in-memory globals on all + // peers (including local) + S3PeersUpdateBucketListener(bucket, listenerCfgs) + return nil +} + +// RemoveBucketListenerConfig - removes a given bucket notification config +func RemoveBucketListenerConfig(bucket string, lcfg *listenerConfig, objAPI ObjectLayer) { + listenerCfgs := globalEventNotifier.GetBucketListenerConfig(bucket) + + // remove listener with matching ARN - if not found ignore and + // exit. + var updatedLcfgs []listenerConfig + found := false + for k, configuredLcfg := range listenerCfgs { + if configuredLcfg.TopicConfig.TopicARN == lcfg.TopicConfig.TopicARN { + updatedLcfgs = append(listenerCfgs[:k], + listenerCfgs[k+1:]...) + found = true + break + } + } + if !found { + return + } + + // update persistent config + err := persistListenerConfig(bucket, updatedLcfgs, objAPI) + if err != nil { + errorIf(err, "Error persisting listener config when removing a listener.") + return + } + + // persistence success - now update in-memory globals on all + // peers (including local) + S3PeersUpdateBucketListener(bucket, updatedLcfgs) +} + // Removes notification.xml for a given bucket, only used during DeleteBucket. func removeNotificationConfig(bucket string, objAPI ObjectLayer) error { // Verify bucket is valid. diff --git a/cmd/bucket-notification-handlers_test.go b/cmd/bucket-notification-handlers_test.go index 497488b2c..39257304e 100644 --- a/cmd/bucket-notification-handlers_test.go +++ b/cmd/bucket-notification-handlers_test.go @@ -5,11 +5,16 @@ import ( "bytes" "encoding/json" "encoding/xml" + "fmt" "io" "io/ioutil" + "net" "net/http" "net/http/httptest" + "strconv" "testing" + + "github.com/gorilla/mux" ) // Implement a dummy flush writer. @@ -156,183 +161,19 @@ func TestSendBucketNotification(t *testing.T) { } } -func testGetBucketNotificationHandler(obj ObjectLayer, instanceType string, t TestErrHandler) { - // get random bucket name. - randBucket := getRandomBucketName() - noNotificationBucket := "nonotification" - invalidBucket := "Invalid\\Bucket" - - // Create buckets for the following test cases. - for _, bucket := range []string{randBucket, noNotificationBucket} { - err := obj.MakeBucket(bucket) - if err != nil { - // failed to create newbucket, abort. - t.Fatalf("Failed to create bucket %s %s : %s", bucket, - instanceType, err) - } - } - - // Initialize sample bucket notification config. - sampleNotificationBytes := []byte("" + - "s3:ObjectCreated:*s3:ObjectRemoved:*" + - "arn:minio:sns:us-east-1:1474332374:listen" + - "") - - emptyNotificationBytes := []byte("") - +func testListenBucketNotificationHandler(obj ObjectLayer, instanceType string, t TestErrHandler) { // Register the API end points with XL/FS object layer. apiRouter := initTestAPIEndPoints(obj, []string{ - "GetBucketNotification", - "PutBucketNotification", - }) - - // initialize the server and obtain the credentials and root. - // credentials are necessary to sign the HTTP request. - rootPath, err := newTestConfig("us-east-1") - if err != nil { - t.Fatalf("Init Test config failed") - } - // remove the root folder after the test ends. - defer removeAll(rootPath) - - credentials := serverConfig.GetCredential() - - //Initialize global event notifier with mock queue targets. - err = initEventNotifier(obj) - if err != nil { - t.Fatalf("Test %s: Failed to initialize mock event notifier %v", - instanceType, err) - } - // Initialize httptest recorder. - rec := httptest.NewRecorder() - - // Prepare notification config for one of the test cases. - req, err := newTestSignedRequestV4("PUT", getPutBucketNotificationURL("", randBucket), - int64(len(sampleNotificationBytes)), bytes.NewReader(sampleNotificationBytes), - credentials.AccessKeyID, credentials.SecretAccessKey) - if err != nil { - t.Fatalf("Test %d %s: Failed to create HTTP request for PutBucketNotification: %v", - 1, instanceType, err) - } - - apiRouter.ServeHTTP(rec, req) - - type testKind int - const ( - CompareBytes testKind = iota - CheckStatus - InvalidAuth - ) - testCases := []struct { - bucketName string - kind testKind - expectedNotificationBytes []byte - expectedHTTPCode int - }{ - {randBucket, CompareBytes, sampleNotificationBytes, http.StatusOK}, - {randBucket, InvalidAuth, nil, http.StatusBadRequest}, - {noNotificationBucket, CompareBytes, emptyNotificationBytes, http.StatusOK}, - {invalidBucket, CheckStatus, nil, http.StatusBadRequest}, - } - signatureMismatchCode := getAPIError(ErrContentSHA256Mismatch).Code - for i, test := range testCases { - testRec := httptest.NewRecorder() - testReq, tErr := newTestSignedRequestV4("GET", getGetBucketNotificationURL("", test.bucketName), - int64(0), nil, credentials.AccessKeyID, credentials.SecretAccessKey) - if tErr != nil { - t.Fatalf("Test %d: %s: Failed to create HTTP testRequest for GetBucketNotification: %v", - i+1, instanceType, tErr) - } - - // Set X-Amz-Content-SHA256 in header different from what was used to calculate Signature. - if test.kind == InvalidAuth { - // Triggering a authentication type check failure. - testReq.Header.Set("x-amz-content-sha256", "somethingElse") - } - - apiRouter.ServeHTTP(testRec, testReq) - - switch test.kind { - case CompareBytes: - rspBytes, rErr := ioutil.ReadAll(testRec.Body) - if rErr != nil { - t.Errorf("Test %d: %s: Failed to read response body: %v", i+1, instanceType, rErr) - } - if !bytes.Equal(rspBytes, test.expectedNotificationBytes) { - t.Errorf("Test %d: %s: Notification config doesn't match expected value %s: %v", - i+1, instanceType, string(test.expectedNotificationBytes), err) - } - case InvalidAuth: - rspBytes, rErr := ioutil.ReadAll(testRec.Body) - if rErr != nil { - t.Errorf("Test %d: %s: Failed to read response body: %v", i+1, instanceType, rErr) - } - var errCode APIError - xErr := xml.Unmarshal(rspBytes, &errCode) - if xErr != nil { - t.Errorf("Test %d: %s: Failed to unmarshal error XML: %v", i+1, instanceType, xErr) - - } - - if errCode.Code != signatureMismatchCode { - t.Errorf("Test %d: %s: Expected error code %s but received %s: %v", i+1, - instanceType, signatureMismatchCode, errCode.Code, err) - - } - fallthrough - case CheckStatus: - if testRec.Code != test.expectedHTTPCode { - t.Errorf("Test %d: %s: expected HTTP code %d, but received %d: %v", - i+1, instanceType, test.expectedHTTPCode, testRec.Code, err) - } - } - } - - // Nil Object layer - nilAPIRouter := initTestAPIEndPoints(nil, []string{ - "GetBucketNotification", - "PutBucketNotification", + "ListenBucketNotification", }) - testRec := httptest.NewRecorder() - testReq, tErr := newTestSignedRequestV4("GET", getGetBucketNotificationURL("", randBucket), - int64(0), nil, credentials.AccessKeyID, credentials.SecretAccessKey) - if tErr != nil { - t.Fatalf("Test %d: %s: Failed to create HTTP testRequest for GetBucketNotification: %v", - len(testCases)+1, instanceType, tErr) - } - nilAPIRouter.ServeHTTP(testRec, testReq) - if testRec.Code != http.StatusServiceUnavailable { - t.Errorf("Test %d: %s: expected HTTP code %d, but received %d: %v", - len(testCases)+1, instanceType, http.StatusServiceUnavailable, testRec.Code, err) - } -} - -func TestGetBucketNotificationHandler(t *testing.T) { - ExecObjectLayerTest(t, testGetBucketNotificationHandler) -} - -func testPutBucketNotificationHandler(obj ObjectLayer, instanceType string, t TestErrHandler) { - invalidBucket := "Invalid\\Bucket" - // get random bucket name. - randBucket := getRandomBucketName() - - err := obj.MakeBucket(randBucket) - if err != nil { - // failed to create randBucket, abort. - t.Fatalf("Failed to create bucket %s %s : %s", randBucket, - instanceType, err) + mux, ok := apiRouter.(*mux.Router) + if !ok { + t.Fatal("Unable to setup test") } + registerS3PeerRPCRouter(mux) - sampleNotificationBytes := []byte("" + - "s3:ObjectCreated:*s3:ObjectRemoved:*" + - "arn:minio:sns:us-east-1:1474332374:listen" + - "") - - // Register the API end points with XL/FS object layer. - apiRouter := initTestAPIEndPoints(obj, []string{ - "GetBucketNotification", - "PutBucketNotification", - }) + testServer := httptest.NewServer(apiRouter) + defer testServer.Close() // initialize the server and obtain the credentials and root. // credentials are necessary to sign the HTTP request. @@ -345,136 +186,25 @@ func testPutBucketNotificationHandler(obj ObjectLayer, instanceType string, t Te credentials := serverConfig.GetCredential() - //Initialize global event notifier with mock queue targets. - err = initEventNotifier(obj) + // setup port and minio addr + _, portStr, err := net.SplitHostPort(testServer.Listener.Addr().String()) if err != nil { - t.Fatalf("Test %s: Failed to initialize mock event notifier %v", - instanceType, err) - } - - signatureMismatchError := getAPIError(ErrContentSHA256Mismatch) - missingContentLengthError := getAPIError(ErrMissingContentLength) - type testKind int - const ( - CompareBytes testKind = iota - CheckStatus - InvalidAuth - MissingContentLength - ChunkedEncoding - ) - testCases := []struct { - bucketName string - kind testKind - expectedNotificationBytes []byte - expectedHTTPCode int - expectedAPIError string - }{ - {randBucket, CompareBytes, sampleNotificationBytes, http.StatusOK, ""}, - {randBucket, ChunkedEncoding, sampleNotificationBytes, http.StatusOK, ""}, - {randBucket, InvalidAuth, nil, signatureMismatchError.HTTPStatusCode, signatureMismatchError.Code}, - {randBucket, MissingContentLength, nil, missingContentLengthError.HTTPStatusCode, missingContentLengthError.Code}, - {invalidBucket, CheckStatus, nil, http.StatusBadRequest, ""}, - } - for i, test := range testCases { - testRec := httptest.NewRecorder() - testReq, tErr := newTestSignedRequestV4("PUT", getPutBucketNotificationURL("", test.bucketName), - int64(len(test.expectedNotificationBytes)), bytes.NewReader(test.expectedNotificationBytes), - credentials.AccessKeyID, credentials.SecretAccessKey) - if tErr != nil { - t.Fatalf("Test %d: %s: Failed to create HTTP testRequest for PutBucketNotification: %v", - i+1, instanceType, tErr) - } - - // Set X-Amz-Content-SHA256 in header different from what was used to calculate Signature. - switch test.kind { - case InvalidAuth: - // Triggering a authentication type check failure. - testReq.Header.Set("x-amz-content-sha256", "somethingElse") - case MissingContentLength: - testReq.ContentLength = -1 - case ChunkedEncoding: - testReq.ContentLength = -1 - testReq.TransferEncoding = append(testReq.TransferEncoding, "chunked") - } - - apiRouter.ServeHTTP(testRec, testReq) - - switch test.kind { - case CompareBytes: - - testReq, tErr = newTestSignedRequestV4("GET", getGetBucketNotificationURL("", test.bucketName), - int64(0), nil, credentials.AccessKeyID, credentials.SecretAccessKey) - if tErr != nil { - t.Fatalf("Test %d: %s: Failed to create HTTP testRequest for GetBucketNotification: %v", - i+1, instanceType, tErr) - } - apiRouter.ServeHTTP(testRec, testReq) - - rspBytes, rErr := ioutil.ReadAll(testRec.Body) - if rErr != nil { - t.Errorf("Test %d: %s: Failed to read response body: %v", i+1, instanceType, rErr) - } - if !bytes.Equal(rspBytes, test.expectedNotificationBytes) { - t.Errorf("Test %d: %s: Notification config doesn't match expected value %s: %v", - i+1, instanceType, string(test.expectedNotificationBytes), err) - } - case MissingContentLength, InvalidAuth: - rspBytes, rErr := ioutil.ReadAll(testRec.Body) - if rErr != nil { - t.Errorf("Test %d: %s: Failed to read response body: %v", i+1, instanceType, rErr) - } - var errCode APIError - xErr := xml.Unmarshal(rspBytes, &errCode) - if xErr != nil { - t.Errorf("Test %d: %s: Failed to unmarshal error XML: %v", i+1, instanceType, xErr) - - } - - if errCode.Code != test.expectedAPIError { - t.Errorf("Test %d: %s: Expected error code %s but received %s: %v", i+1, - instanceType, test.expectedAPIError, errCode.Code, err) - - } - fallthrough - case CheckStatus: - if testRec.Code != test.expectedHTTPCode { - t.Errorf("Test %d: %s: expected HTTP code %d, but received %d: %v", - i+1, instanceType, test.expectedHTTPCode, testRec.Code, err) - } - } - } - - // Nil Object layer - nilAPIRouter := initTestAPIEndPoints(nil, []string{ - "GetBucketNotification", - "PutBucketNotification", - }) - testRec := httptest.NewRecorder() - testReq, tErr := newTestSignedRequestV4("PUT", getPutBucketNotificationURL("", randBucket), - int64(len(sampleNotificationBytes)), bytes.NewReader(sampleNotificationBytes), - credentials.AccessKeyID, credentials.SecretAccessKey) - if tErr != nil { - t.Fatalf("Test %d: %s: Failed to create HTTP testRequest for PutBucketNotification: %v", - len(testCases)+1, instanceType, tErr) + t.Fatalf("Initialisation error: %v", err) } - nilAPIRouter.ServeHTTP(testRec, testReq) - if testRec.Code != http.StatusServiceUnavailable { - t.Errorf("Test %d: %s: expected HTTP code %d, but received %d: %v", - len(testCases)+1, instanceType, http.StatusServiceUnavailable, testRec.Code, err) + globalMinioPort, err = strconv.Atoi(portStr) + if err != nil { + t.Fatalf("Initialisation error: %v", err) } -} - -func TestPutBucketNotificationHandler(t *testing.T) { - ExecObjectLayerTest(t, testPutBucketNotificationHandler) -} + globalMinioAddr = fmt.Sprintf(":%d", globalMinioPort) + // initialize the peer client(s) + initGlobalS3Peers([]string{}) -func testListenBucketNotificationHandler(obj ObjectLayer, instanceType string, t TestErrHandler) { invalidBucket := "Invalid\\Bucket" noNotificationBucket := "nonotificationbucket" // get random bucket name. randBucket := getRandomBucketName() for _, bucket := range []string{randBucket, noNotificationBucket} { - err := obj.MakeBucket(bucket) + err = obj.MakeBucket(bucket) if err != nil { // failed to create bucket, abort. t.Fatalf("Failed to create bucket %s %s : %s", bucket, @@ -482,43 +212,16 @@ func testListenBucketNotificationHandler(obj ObjectLayer, instanceType string, t } } - sampleNotificationBytes := []byte("" + - "s3:ObjectCreated:*s3:ObjectRemoved:*" + - "arn:minio:sns:us-east-1:1474332374:listen" + - "") - - // Register the API end points with XL/FS object layer. - apiRouter := initTestAPIEndPoints(obj, []string{ - "PutBucketNotification", - "ListenBucketNotification", - "PutObject", - }) - - // initialize the server and obtain the credentials and root. - // credentials are necessary to sign the HTTP request. - rootPath, err := newTestConfig("us-east-1") - if err != nil { - t.Fatalf("Init Test config failed") - } - // remove the root folder after the test ends. - defer removeAll(rootPath) - - credentials := serverConfig.GetCredential() - // Initialize global event notifier with mock queue targets. err = initEventNotifier(obj) if err != nil { t.Fatalf("Test %s: Failed to initialize mock event notifier %v", instanceType, err) } - testRec := httptest.NewRecorder() - testReq, tErr := newTestSignedRequestV4("PUT", getPutBucketNotificationURL("", randBucket), - int64(len(sampleNotificationBytes)), bytes.NewReader(sampleNotificationBytes), - credentials.AccessKeyID, credentials.SecretAccessKey) - if tErr != nil { - t.Fatalf("%s: Failed to create HTTP testRequest for PutBucketNotification: %v", instanceType, tErr) - } - apiRouter.ServeHTTP(testRec, testReq) + + var testRec *httptest.ResponseRecorder + var testReq *http.Request + var tErr error signatureMismatchError := getAPIError(ErrContentSHA256Mismatch) type testKind int diff --git a/cmd/bucket-notification-utils.go b/cmd/bucket-notification-utils.go index 46a26cbd8..fa2e6341c 100644 --- a/cmd/bucket-notification-utils.go +++ b/cmd/bucket-notification-utils.go @@ -265,25 +265,6 @@ func checkDuplicateQueueConfigs(configs []queueConfig) APIErrorCode { return ErrNone } -// Check all the topic configs for any duplicates. -func checkDuplicateTopicConfigs(configs []topicConfig) APIErrorCode { - var topicConfigARNS []string - - // Navigate through each configs and count the entries. - for _, config := range configs { - topicConfigARNS = append(topicConfigARNS, config.TopicARN) - } - - // Check if there are any duplicate counts. - if err := checkDuplicates(topicConfigARNS); err != nil { - errorIf(err, "Invalid topic configs found.") - return ErrOverlappingConfigs - } - - // Success. - return ErrNone -} - // Validates all the bucket notification configuration for their validity, // if one of the config is malformed or has invalid data it is rejected. // Configuration is never applied partially. @@ -292,10 +273,6 @@ func validateNotificationConfig(nConfig notificationConfig) APIErrorCode { if s3Error := validateQueueConfigs(nConfig.QueueConfigs); s3Error != ErrNone { return s3Error } - // Validate all topic configs. - if s3Error := validateTopicConfigs(nConfig.TopicConfigs); s3Error != ErrNone { - return s3Error - } // Check for duplicate queue configs. if len(nConfig.QueueConfigs) > 1 { @@ -304,13 +281,6 @@ func validateNotificationConfig(nConfig notificationConfig) APIErrorCode { } } - // Check for duplicate topic configs. - if len(nConfig.TopicConfigs) > 1 { - if s3Error := checkDuplicateTopicConfigs(nConfig.TopicConfigs); s3Error != ErrNone { - return s3Error - } - } - // Add validation for other configurations. return ErrNone } diff --git a/cmd/bucket-notification-utils_test.go b/cmd/bucket-notification-utils_test.go index 0d313e086..c32ee8377 100644 --- a/cmd/bucket-notification-utils_test.go +++ b/cmd/bucket-notification-utils_test.go @@ -57,42 +57,6 @@ func TestCheckDuplicateConfigs(t *testing.T) { t.Errorf("Test %d: Expected %d, got %d", i+1, testCase.expectedErrCode, errCode) } } - - // Test cases for SNS topic config. - topicTestCases := []struct { - tConfigs []topicConfig - expectedErrCode APIErrorCode - }{ - // Error out for duplicate configs. - { - tConfigs: []topicConfig{ - { - TopicARN: "arn:minio:sns:us-east-1:1:listen", - }, - { - TopicARN: "arn:minio:sns:us-east-1:1:listen", - }, - }, - expectedErrCode: ErrOverlappingConfigs, - }, - // Valid config. - { - tConfigs: []topicConfig{ - { - TopicARN: "arn:minio:sns:us-east-1:1:listen", - }, - }, - expectedErrCode: ErrNone, - }, - } - - // ... validate for duplicate topic configs. - for i, testCase := range topicTestCases { - errCode := checkDuplicateTopicConfigs(testCase.tConfigs) - if errCode != testCase.expectedErrCode { - t.Errorf("Test %d: Expected %d, got %d", i+1, testCase.expectedErrCode, errCode) - } - } } // Tests for validating filter rules. diff --git a/cmd/control-router.go b/cmd/control-router.go index 8514b1c7d..8c50b5a51 100644 --- a/cmd/control-router.go +++ b/cmd/control-router.go @@ -31,24 +31,6 @@ const ( controlPath = "/control" ) -// Find local node through the command line arguments. -func getLocalAddress(srvCmdConfig serverCmdConfig) string { - if !srvCmdConfig.isDistXL { - return fmt.Sprintf(":%d", globalMinioPort) - } - for _, export := range srvCmdConfig.disks { - // Validates if remote disk is local. - if isLocalStorage(export) { - var host string - if idx := strings.LastIndex(export, ":"); idx != -1 { - host = export[:idx] - } - return fmt.Sprintf("%s:%d", host, globalMinioPort) - } - } - return "" -} - // Initializes remote control clients for making remote requests. func initRemoteControlClients(srvCmdConfig serverCmdConfig) []*AuthRPCClient { if !srvCmdConfig.isDistXL { diff --git a/cmd/control-router_test.go b/cmd/control-router_test.go index 4e3d4c293..8520acdc4 100644 --- a/cmd/control-router_test.go +++ b/cmd/control-router_test.go @@ -16,70 +16,7 @@ package cmd -import ( - "runtime" - "testing" -) - -// Tests fetch local address. -func TestLocalAddress(t *testing.T) { - if runtime.GOOS == "windows" { - return - } - testCases := []struct { - srvCmdConfig serverCmdConfig - localAddr string - }{ - // Test 1 - local address is found. - { - srvCmdConfig: serverCmdConfig{ - isDistXL: true, - disks: []string{ - "localhost:/mnt/disk1", - "1.1.1.2:/mnt/disk2", - "1.1.2.1:/mnt/disk3", - "1.1.2.2:/mnt/disk4", - }, - }, - localAddr: "localhost:9000", - }, - // Test 2 - local address is everything. - { - srvCmdConfig: serverCmdConfig{ - isDistXL: false, - disks: []string{ - "/mnt/disk1", - "/mnt/disk2", - "/mnt/disk3", - "/mnt/disk4", - }, - }, - localAddr: ":9000", - }, - // Test 3 - local address is not found. - { - srvCmdConfig: serverCmdConfig{ - isDistXL: true, - disks: []string{ - "1.1.1.1:/mnt/disk1", - "1.1.1.2:/mnt/disk2", - "1.1.2.1:/mnt/disk3", - "1.1.2.2:/mnt/disk4", - }, - }, - localAddr: "", - }, - } - - // Validates fetching local address. - for i, testCase := range testCases { - localAddr := getLocalAddress(testCase.srvCmdConfig) - if localAddr != testCase.localAddr { - t.Fatalf("Test %d: Expected %s, got %s", i+1, testCase.localAddr, localAddr) - } - } - -} +import "testing" // Tests initialization of remote controller clients. func TestInitRemoteControlClients(t *testing.T) { diff --git a/cmd/event-notifier.go b/cmd/event-notifier.go index c8b07889c..30249dbf6 100644 --- a/cmd/event-notifier.go +++ b/cmd/event-notifier.go @@ -18,6 +18,7 @@ package cmd import ( "bytes" + "encoding/json" "encoding/xml" "fmt" "net" @@ -29,14 +30,51 @@ import ( "github.com/Sirupsen/logrus" ) -// Global event notification queue. This is the queue that would be used to send all notifications. -type eventNotifier struct { +type externalNotifier struct { + // Per-bucket notification config. This is updated via + // PutBucketNotification API. + notificationConfigs map[string]*notificationConfig + + // An external target keeps a connection to an external + // service to which events are to be sent. It is a mapping + // from an ARN to a log object + targets map[string]*logrus.Logger + rwMutex *sync.RWMutex +} - // Collection of 'bucket' and notification config. - notificationConfigs map[string]*notificationConfig - snsTargets map[string][]chan []NotificationEvent - queueTargets map[string]*logrus.Logger +type internalNotifier struct { + // per-bucket listener configuration. This is updated + // when listeners connect or disconnect. + listenerConfigs map[string][]listenerConfig + + // An internal target is a peer Minio server, that is + // connected to a listening client. Here, targets is a map of + // listener ARN to log object. + targets map[string]*listenerLogger + + // Connected listeners is a map of listener ARNs to channels + // on which the ListenBucket API handler go routine is waiting + // for events to send to a client. + connectedListeners map[string]chan []NotificationEvent + + rwMutex *sync.RWMutex +} + +// Global event notification configuration. This structure has state +// about configured external notifications, and run-time configuration +// for listener notifications. +type eventNotifier struct { + + // `external` here refers to notification configuration to + // send events to supported external systems + external externalNotifier + + // `internal` refers to notification configuration for live + // listening clients. Events for a client are send from all + // servers, internally to a particular server that is + // connected to the client. + internal internalNotifier } // Represents data to be sent with notification event. @@ -54,7 +92,8 @@ func newNotificationEvent(event eventData) NotificationEvent { region := serverConfig.GetRegion() tnow := time.Now().UTC() sequencer := fmt.Sprintf("%X", tnow.UnixNano()) - // Following blocks fills in all the necessary details of s3 event message structure. + // Following blocks fills in all the necessary details of s3 + // event message structure. // http://docs.aws.amazon.com/AmazonS3/latest/dev/notification-content-structure.html nEvent := NotificationEvent{ EventVersion: "2.0", @@ -96,149 +135,185 @@ func newNotificationEvent(event eventData) NotificationEvent { return nEvent } -// Fetch the saved queue target. -func (en eventNotifier) GetQueueTarget(queueARN string) *logrus.Logger { - return en.queueTargets[queueARN] +// Fetch the external target. No locking needed here since this map is +// never written after initial startup. +func (en eventNotifier) GetExternalTarget(queueARN string) *logrus.Logger { + return en.external.targets[queueARN] } -func (en eventNotifier) GetSNSTarget(snsARN string) []chan []NotificationEvent { - en.rwMutex.RLock() - defer en.rwMutex.RUnlock() - return en.snsTargets[snsARN] +func (en eventNotifier) GetInternalTarget(arn string) *listenerLogger { + en.internal.rwMutex.RLock() + defer en.internal.rwMutex.RUnlock() + return en.internal.targets[arn] } // Set a new sns target for an input sns ARN. -func (en *eventNotifier) SetSNSTarget(snsARN string, listenerCh chan []NotificationEvent) error { - en.rwMutex.Lock() - defer en.rwMutex.Unlock() +func (en *eventNotifier) AddListenerChan(snsARN string, listenerCh chan []NotificationEvent) error { if listenerCh == nil { return errInvalidArgument } - en.snsTargets[snsARN] = append(en.snsTargets[snsARN], listenerCh) + en.internal.rwMutex.Lock() + defer en.internal.rwMutex.Unlock() + en.internal.connectedListeners[snsARN] = listenerCh return nil } // Remove sns target for an input sns ARN. -func (en *eventNotifier) RemoveSNSTarget(snsARN string, listenerCh chan []NotificationEvent) { - en.rwMutex.Lock() - defer en.rwMutex.Unlock() - snsTarget, ok := en.snsTargets[snsARN] +func (en *eventNotifier) RemoveListenerChan(snsARN string) { + en.internal.rwMutex.Lock() + defer en.internal.rwMutex.Unlock() + if en.internal.connectedListeners != nil { + delete(en.internal.connectedListeners, snsARN) + } +} + +func (en *eventNotifier) SendListenerEvent(arn string, event []NotificationEvent) error { + en.internal.rwMutex.Lock() + defer en.internal.rwMutex.Unlock() + + ch, ok := en.internal.connectedListeners[arn] if ok { - for i, savedListenerCh := range snsTarget { - if listenerCh == savedListenerCh { - snsTarget = append(snsTarget[:i], snsTarget[i+1:]...) - if len(snsTarget) == 0 { - delete(en.snsTargets, snsARN) - break - } - en.snsTargets[snsARN] = snsTarget - } - } + ch <- event } + // If the channel is not present we ignore the event. + return nil } // Fetch bucket notification config for an input bucket. func (en eventNotifier) GetBucketNotificationConfig(bucket string) *notificationConfig { - en.rwMutex.RLock() - defer en.rwMutex.RUnlock() - return en.notificationConfigs[bucket] + en.external.rwMutex.RLock() + defer en.external.rwMutex.RUnlock() + return en.external.notificationConfigs[bucket] } -// Set a new notification config for a bucket, this operation will overwrite any previous -// notification configs for the bucket. -func (en *eventNotifier) SetBucketNotificationConfig(bucket string, notificationCfg *notificationConfig) error { - en.rwMutex.Lock() - defer en.rwMutex.Unlock() - if notificationCfg == nil { - return errInvalidArgument +func (en *eventNotifier) SetBucketNotificationConfig(bucket string, ncfg *notificationConfig) { + en.external.rwMutex.Lock() + if ncfg == nil { + delete(en.external.notificationConfigs, bucket) + } else { + en.external.notificationConfigs[bucket] = ncfg } - en.notificationConfigs[bucket] = notificationCfg - return nil + en.external.rwMutex.Unlock() } -func (en *eventNotifier) AddTopicConfig(bucket string, topicCfg *topicConfig) error { - en.rwMutex.Lock() - defer en.rwMutex.Unlock() - if topicCfg == nil { - return errInvalidArgument +func (en *eventNotifier) GetBucketListenerConfig(bucket string) []listenerConfig { + en.internal.rwMutex.RLock() + defer en.internal.rwMutex.RUnlock() + return en.internal.listenerConfigs[bucket] +} + +func (en *eventNotifier) SetBucketListenerConfig(bucket string, lcfg []listenerConfig) error { + en.internal.rwMutex.Lock() + defer en.internal.rwMutex.Unlock() + if lcfg == nil { + delete(en.internal.listenerConfigs, bucket) + } else { + en.internal.listenerConfigs[bucket] = lcfg + } + // close all existing loggers and initialize again. + for _, v := range en.internal.targets { + v.lconn.Close() } - notificationCfg := en.notificationConfigs[bucket] - if notificationCfg == nil { - en.notificationConfigs[bucket] = ¬ificationConfig{ - TopicConfigs: []topicConfig{*topicCfg}, + en.internal.targets = make(map[string]*listenerLogger) + for _, lc := range lcfg { + logger, err := newListenerLogger(lc.TopicConfig.TopicARN, + lc.TargetServer) + if err != nil { + return err } - return nil + en.internal.targets[lc.TopicConfig.TopicARN] = logger } - notificationCfg.TopicConfigs = append(notificationCfg.TopicConfigs, *topicCfg) return nil } -// eventNotify notifies an event to relevant targets based on their -// bucket notification configs. -func eventNotify(event eventData) { - // Notifies a new event. - // List of events reported through this function are - // - s3:ObjectCreated:Put - // - s3:ObjectCreated:Post - // - s3:ObjectCreated:Copy - // - s3:ObjectCreated:CompleteMultipartUpload - // - s3:ObjectRemoved:Delete - - nConfig := globalEventNotifier.GetBucketNotificationConfig(event.Bucket) - // No bucket notifications enabled, drop the event notification. +func eventNotifyForBucketNotifications(eventType, objectName, bucketName string, + nEvent []NotificationEvent) { + nConfig := globalEventNotifier.GetBucketNotificationConfig(bucketName) if nConfig == nil { return } - if len(nConfig.QueueConfigs) == 0 && len(nConfig.TopicConfigs) == 0 && len(nConfig.LambdaConfigs) == 0 { - return - } - - // Event type. - eventType := event.Type.String() - - // Object name. - objectName := event.ObjInfo.Name - - // Save the notification event to be sent. - notificationEvent := []NotificationEvent{newNotificationEvent(event)} - // Validate if the event and object match the queue configs. for _, qConfig := range nConfig.QueueConfigs { eventMatch := eventMatch(eventType, qConfig.Events) ruleMatch := filterRuleMatch(objectName, qConfig.Filter.Key.FilterRules) if eventMatch && ruleMatch { - targetLog := globalEventNotifier.GetQueueTarget(qConfig.QueueARN) + targetLog := globalEventNotifier.GetExternalTarget(qConfig.QueueARN) if targetLog != nil { targetLog.WithFields(logrus.Fields{ - "Key": path.Join(event.Bucket, objectName), + "Key": path.Join(bucketName, objectName), "EventType": eventType, - "Records": notificationEvent, + "Records": nEvent, }).Info() } } } - // Validate if the event and object match the sns configs. - for _, topicConfig := range nConfig.TopicConfigs { - ruleMatch := filterRuleMatch(objectName, topicConfig.Filter.Key.FilterRules) - eventMatch := eventMatch(eventType, topicConfig.Events) +} + +func eventNotifyForBucketListeners(eventType, objectName, bucketName string, + nEvent []NotificationEvent) { + lCfgs := globalEventNotifier.GetBucketListenerConfig(bucketName) + if lCfgs == nil { + return + } + // Validate if the event and object match listener configs + for _, lcfg := range lCfgs { + ruleMatch := filterRuleMatch(objectName, lcfg.TopicConfig.Filter.Key.FilterRules) + eventMatch := eventMatch(eventType, lcfg.TopicConfig.Events) if eventMatch && ruleMatch { - targetListeners := globalEventNotifier.GetSNSTarget(topicConfig.TopicARN) - for _, listener := range targetListeners { - listener <- notificationEvent + targetLog := globalEventNotifier.GetInternalTarget( + lcfg.TopicConfig.TopicARN) + if targetLog != nil && targetLog.log != nil { + targetLog.log.WithFields(logrus.Fields{ + "Key": path.Join(bucketName, objectName), + "EventType": eventType, + "Records": nEvent, + }).Info() } } } + +} + +// eventNotify notifies an event to relevant targets based on their +// bucket configuration (notifications and listeners). +func eventNotify(event eventData) { + // Notifies a new event. + // List of events reported through this function are + // - s3:ObjectCreated:Put + // - s3:ObjectCreated:Post + // - s3:ObjectCreated:Copy + // - s3:ObjectCreated:CompleteMultipartUpload + // - s3:ObjectRemoved:Delete + + // Event type. + eventType := event.Type.String() + + // Object name. + objectName := event.ObjInfo.Name + + // Save the notification event to be sent. + notificationEvent := []NotificationEvent{newNotificationEvent(event)} + + // Notify external targets. + eventNotifyForBucketNotifications(eventType, objectName, event.Bucket, + notificationEvent) + + // Notify internal targets. + eventNotifyForBucketListeners(eventType, objectName, event.Bucket, + notificationEvent) } -// loads notifcation config if any for a given bucket, returns back structured notification config. +// loads notification config if any for a given bucket, returns +// structured notification config. func loadNotificationConfig(bucket string, objAPI ObjectLayer) (*notificationConfig, error) { // Construct the notification config path. notificationConfigPath := path.Join(bucketConfigPrefix, bucket, bucketNotificationConfig) objInfo, err := objAPI.GetObjectInfo(minioMetaBucket, notificationConfigPath) err = errorCause(err) if err != nil { - // 'notification.xml' not found return 'errNoSuchNotifications'. - // This is default when no bucket notifications are found on the bucket. + // 'notification.xml' not found return + // 'errNoSuchNotifications'. This is default when no + // bucket notifications are found on the bucket. switch err.(type) { case ObjectNotFound: return nil, errNoSuchNotifications @@ -251,8 +326,9 @@ func loadNotificationConfig(bucket string, objAPI ObjectLayer) (*notificationCon err = objAPI.GetObject(minioMetaBucket, notificationConfigPath, 0, objInfo.Size, &buffer) err = errorCause(err) if err != nil { - // 'notification.xml' not found return 'errNoSuchNotifications'. - // This is default when no bucket notifications are found on the bucket. + // 'notification.xml' not found return + // 'errNoSuchNotifications'. This is default when no + // bucket notifications are found on the bucket. switch err.(type) { case ObjectNotFound: return nil, errNoSuchNotifications @@ -267,36 +343,144 @@ func loadNotificationConfig(bucket string, objAPI ObjectLayer) (*notificationCon notificationCfg := ¬ificationConfig{} if err = xml.Unmarshal(notificationConfigBytes, ¬ificationCfg); err != nil { return nil, err - } // Successfully marshalled notification configuration. + } // Return success. return notificationCfg, nil } +// loads notification config if any for a given bucket, returns +// structured notification config. +func loadListenerConfig(bucket string, objAPI ObjectLayer) ([]listenerConfig, error) { + // Construct the notification config path. + listenerConfigPath := path.Join(bucketConfigPrefix, bucket, bucketListenerConfig) + objInfo, err := objAPI.GetObjectInfo(minioMetaBucket, listenerConfigPath) + err = errorCause(err) + if err != nil { + // 'listener.json' not found return + // 'errNoSuchNotifications'. This is default when no + // bucket notifications are found on the bucket. + switch err.(type) { + case ObjectNotFound: + return nil, errNoSuchNotifications + } + errorIf(err, "Unable to load bucket-listeners for bucket %s", bucket) + // Returns error for other errors. + return nil, err + } + var buffer bytes.Buffer + err = objAPI.GetObject(minioMetaBucket, listenerConfigPath, 0, objInfo.Size, &buffer) + err = errorCause(err) + if err != nil { + // 'notification.xml' not found return + // 'errNoSuchNotifications'. This is default when no + // bucket listners are found on the bucket. + switch err.(type) { + case ObjectNotFound: + return nil, errNoSuchNotifications + } + errorIf(err, "Unable to load bucket-listeners for bucket %s", bucket) + // Returns error for other errors. + return nil, err + } + + // Unmarshal notification bytes. + var lCfg []listenerConfig + lConfigBytes := buffer.Bytes() + if err = json.Unmarshal(lConfigBytes, &lCfg); err != nil { + errorIf(err, "Unable to unmarshal listener config from JSON.") + return nil, err + } + + // Return success. + return lCfg, nil +} + +func persistNotificationConfig(bucket string, ncfg *notificationConfig, obj ObjectLayer) error { + // marshal to xml + buf, err := xml.Marshal(ncfg) + if err != nil { + errorIf(err, "Unable to marshal notification configuration into XML") + return err + } + + // verify bucket exists + // FIXME: There is a race between this check and PutObject + if err = isBucketExist(bucket, obj); err != nil { + return err + } + + // build path + ncPath := path.Join(bucketConfigPrefix, bucket, bucketNotificationConfig) + // write object to path + _, err = obj.PutObject(minioMetaBucket, ncPath, int64(len(buf)), + bytes.NewReader(buf), nil, "") + if err != nil { + errorIf(err, "Unable to write bucket notification configuration.") + return err + } + return nil +} + +// Persists validated listener config to object layer. +func persistListenerConfig(bucket string, lcfg []listenerConfig, obj ObjectLayer) error { + buf, err := json.Marshal(lcfg) + if err != nil { + errorIf(err, "Unable to marshal listener config to JSON.") + return err + } + + // verify bucket exists + // FIXME: There is a race between this check and PutObject + if err = isBucketExist(bucket, obj); err != nil { + return err + } + + // build path + lcPath := path.Join(bucketConfigPrefix, bucket, bucketListenerConfig) + // write object to path + _, err = obj.PutObject(minioMetaBucket, lcPath, int64(len(buf)), + bytes.NewReader(buf), nil, "") + if err != nil { + errorIf(err, "Unable to write bucket listener configuration to object layer.") + } + return err +} + // loads all bucket notifications if present. -func loadAllBucketNotifications(objAPI ObjectLayer) (map[string]*notificationConfig, error) { +func loadAllBucketNotifications(objAPI ObjectLayer) (map[string]*notificationConfig, map[string][]listenerConfig, error) { // List buckets to proceed loading all notification configuration. buckets, err := objAPI.ListBuckets() if err != nil { - return nil, err + return nil, nil, err } - configs := make(map[string]*notificationConfig) + nConfigs := make(map[string]*notificationConfig) + lConfigs := make(map[string][]listenerConfig) // Loads all bucket notifications. for _, bucket := range buckets { nCfg, nErr := loadNotificationConfig(bucket.Name, objAPI) if nErr != nil { - if nErr == errNoSuchNotifications { - continue + if nErr != errNoSuchNotifications { + return nil, nil, nErr } - return nil, nErr + } else { + nConfigs[bucket.Name] = nCfg + } + + lCfg, lErr := loadListenerConfig(bucket.Name, objAPI) + if lErr != nil { + if lErr != errNoSuchNotifications { + return nil, nil, lErr + } + } else { + lConfigs[bucket.Name] = lCfg } - configs[bucket.Name] = nCfg } // Success. - return configs, nil + return nConfigs, lConfigs, nil } // Loads all queue targets, initializes each queueARNs depending on their config. @@ -452,8 +636,9 @@ func initEventNotifier(objAPI ObjectLayer) error { } // Read all saved bucket notifications. - configs, err := loadAllBucketNotifications(objAPI) + nConfigs, lConfigs, err := loadAllBucketNotifications(objAPI) if err != nil { + errorIf(err, "Error loading bucket notifications - %v", err) return err } @@ -463,12 +648,36 @@ func initEventNotifier(objAPI ObjectLayer) error { return err } - // Inititalize event notifier queue. + // Initialize internal listener targets + listenTargets := make(map[string]*listenerLogger) + for _, listeners := range lConfigs { + for _, listener := range listeners { + ln, err := newListenerLogger( + listener.TopicConfig.TopicARN, + listener.TargetServer, + ) + if err != nil { + errorIf(err, "Unable to initialize listener target logger.") + //TODO: improve error + return fmt.Errorf("Error initializing listner target logger - %v", err) + } + listenTargets[listener.TopicConfig.TopicARN] = ln + } + } + + // Initialize event notifier queue. globalEventNotifier = &eventNotifier{ - rwMutex: &sync.RWMutex{}, - notificationConfigs: configs, - queueTargets: queueTargets, - snsTargets: make(map[string][]chan []NotificationEvent), + external: externalNotifier{ + notificationConfigs: nConfigs, + targets: queueTargets, + rwMutex: &sync.RWMutex{}, + }, + internal: internalNotifier{ + rwMutex: &sync.RWMutex{}, + targets: listenTargets, + listenerConfigs: lConfigs, + connectedListeners: make(map[string]chan []NotificationEvent), + }, } return nil diff --git a/cmd/event-notifier_test.go b/cmd/event-notifier_test.go index 31322db9c..7f17d7d6b 100644 --- a/cmd/event-notifier_test.go +++ b/cmd/event-notifier_test.go @@ -18,130 +18,13 @@ package cmd import ( "fmt" + "net" "reflect" + "strconv" "testing" "time" ) -// Tests event notify. -func TestEventNotify(t *testing.T) { - ExecObjectLayerTest(t, testEventNotify) -} - -func testEventNotify(obj ObjectLayer, instanceType string, t TestErrHandler) { - bucketName := getRandomBucketName() - - // initialize the server and obtain the credentials and root. - // credentials are necessary to sign the HTTP request. - rootPath, err := newTestConfig("us-east-1") - if err != nil { - t.Fatalf("Init Test config failed") - } - // remove the root folder after the test ends. - defer removeAll(rootPath) - - if err := initEventNotifier(obj); err != nil { - t.Fatal("Unexpected error:", err) - } - - // Notify object created event. - eventNotify(eventData{ - Type: ObjectCreatedPost, - Bucket: bucketName, - ObjInfo: ObjectInfo{ - Bucket: bucketName, - Name: "object1", - }, - ReqParams: map[string]string{ - "sourceIPAddress": "localhost:1337", - }, - }) - - if err := globalEventNotifier.SetBucketNotificationConfig(bucketName, nil); err != errInvalidArgument { - t.Errorf("Expected error %s, got %s", errInvalidArgument, err) - } - - if err := globalEventNotifier.SetBucketNotificationConfig(bucketName, ¬ificationConfig{}); err != nil { - t.Errorf("Expected error to be nil, got %s", err) - } - - nConfig := globalEventNotifier.GetBucketNotificationConfig(bucketName) - if nConfig == nil { - t.Errorf("Notification expected to be set, but notification not set.") - } - - if !reflect.DeepEqual(nConfig, ¬ificationConfig{}) { - t.Errorf("Mismatching notification configs.") - } - - // Notify object created event. - eventNotify(eventData{ - Type: ObjectRemovedDelete, - Bucket: bucketName, - ObjInfo: ObjectInfo{ - Bucket: bucketName, - Name: "object1", - }, - ReqParams: map[string]string{ - "sourceIPAddress": "localhost:1337", - }, - }) -} - -// Tests various forms of inititalization of event notifier. -func TestInitEventNotifier(t *testing.T) { - disks, err := getRandomDisks(1) - if err != nil { - t.Fatal("Unable to create directories for FS backend. ", err) - } - defer removeRoots(disks) - fs, _, err := initObjectLayer(disks, nil) - if err != nil { - t.Fatal("Unable to initialize FS backend.", err) - } - nDisks := 16 - disks, err = getRandomDisks(nDisks) - if err != nil { - t.Fatal("Unable to create directories for XL backend. ", err) - } - defer removeRoots(disks) - xl, _, err := initObjectLayer(disks, nil) - if err != nil { - t.Fatal("Unable to initialize XL backend.", err) - } - - // Collection of test cases for inititalizing event notifier. - testCases := []struct { - objAPI ObjectLayer - configs map[string]*notificationConfig - err error - }{ - // Test 1 - invalid arguments. - { - objAPI: nil, - err: errInvalidArgument, - }, - // Test 2 - valid FS object layer but no bucket notifications. - { - objAPI: fs, - err: nil, - }, - // Test 3 - valid XL object layer but no bucket notifications. - { - objAPI: xl, - err: nil, - }, - } - - // Validate if event notifier is properly initialized. - for i, testCase := range testCases { - err = initEventNotifier(testCase.objAPI) - if err != testCase.err { - t.Errorf("Test %d: Expected %s, but got: %s", i+1, testCase.err, err) - } - } -} - // Test InitEventNotifier with faulty disks func TestInitEventNotifierFaultyDisks(t *testing.T) { // Prepare for tests @@ -272,78 +155,231 @@ func TestInitEventNotifierWithRedis(t *testing.T) { } } -// TestListenBucketNotification - test Listen Bucket Notification process -func TestListenBucketNotification(t *testing.T) { +type TestPeerRPCServerData struct { + serverType string + testServer TestServer +} - bucketName := "bucket" - objectName := "object" +func (s *TestPeerRPCServerData) Setup(t *testing.T) { + s.testServer = StartTestPeersRPCServer(t, s.serverType) - // Prepare for tests - // Create fs backend - rootPath, err := newTestConfig("us-east-1") + // setup port and minio addr + _, portStr, err := net.SplitHostPort(s.testServer.Server.Listener.Addr().String()) if err != nil { - t.Fatalf("Init Test config failed") + t.Fatalf("Initialisation error: %v", err) } - // remove the root folder after the test ends. - defer removeAll(rootPath) - - disk, err := getRandomDisks(1) - defer removeAll(disk[0]) + globalMinioPort, err = strconv.Atoi(portStr) if err != nil { - t.Fatal("Unable to create directories for FS backend. ", err) + t.Fatalf("Initialisation error: %v", err) } - obj, _, err := initObjectLayer(disk, nil) - if err != nil { - t.Fatal("Unable to initialize FS backend.", err) + globalMinioAddr = getLocalAddress( + s.testServer.SrvCmdCfg, + ) + + // initialize the peer client(s) + initGlobalS3Peers(s.testServer.Disks) +} + +func (s *TestPeerRPCServerData) TearDown() { + s.testServer.Stop() + _ = removeAll(s.testServer.Root) + for _, d := range s.testServer.Disks { + _ = removeAll(d) } +} - // Create the bucket to listen on +func TestSetNGetBucketNotification(t *testing.T) { + s := TestPeerRPCServerData{serverType: "XL"} + + // setup and teardown + s.Setup(t) + defer s.TearDown() + + bucketName := getRandomBucketName() + + obj := s.testServer.Obj + if err := initEventNotifier(obj); err != nil { + t.Fatal("Unexpected error:", err) + } + + globalEventNotifier.SetBucketNotificationConfig(bucketName, ¬ificationConfig{}) + nConfig := globalEventNotifier.GetBucketNotificationConfig(bucketName) + if nConfig == nil { + t.Errorf("Notification expected to be set, but notification not set.") + } + + if !reflect.DeepEqual(nConfig, ¬ificationConfig{}) { + t.Errorf("Mismatching notification configs.") + } +} + +func TestInitEventNotifier(t *testing.T) { + s := TestPeerRPCServerData{serverType: "XL"} + + // setup and teardown + s.Setup(t) + defer s.TearDown() + + // test if empty object layer arg. returns expected error. + if err := initEventNotifier(nil); err == nil || err != errInvalidArgument { + t.Fatalf("initEventNotifier returned unexpected error value - %v", err) + } + + obj := s.testServer.Obj + bucketName := getRandomBucketName() + // declare sample configs + filterRules := []filterRule{ + { + Name: "prefix", + Value: "minio", + }, + { + Name: "suffix", + Value: "*.jpg", + }, + } + sampleSvcCfg := ServiceConfig{ + []string{"s3:ObjectRemoved:*", "s3:ObjectCreated:*"}, + filterStruct{ + keyFilter{filterRules}, + }, + "1", + } + sampleNotifCfg := notificationConfig{ + QueueConfigs: []queueConfig{ + { + ServiceConfig: sampleSvcCfg, + QueueARN: "testqARN", + }, + }, + } + sampleListenCfg := []listenerConfig{ + { + TopicConfig: topicConfig{ServiceConfig: sampleSvcCfg, + TopicARN: "testlARN"}, + TargetServer: globalMinioAddr, + }, + } + + // write without an existing bucket and check + if err := persistNotificationConfig(bucketName, ¬ificationConfig{}, obj); err == nil { + t.Fatalf("Did not get an error though bucket does not exist!") + } + // no bucket write check for listener + if err := persistListenerConfig(bucketName, []listenerConfig{}, obj); err == nil { + t.Fatalf("Did not get an error though bucket does not exist!") + } + + // create bucket if err := obj.MakeBucket(bucketName); err != nil { t.Fatal("Unexpected error:", err) } - listenARN := "arn:minio:sns:us-east-1:1:listen" - queueARN := "arn:minio:sqs:us-east-1:1:redis" + // bucket is created, now writing should not give errors. + if err := persistNotificationConfig(bucketName, &sampleNotifCfg, obj); err != nil { + t.Fatal("Unexpected error:", err) + } - fs := obj.(fsObjects) - storage := fs.storage.(*posix) + if err := persistListenerConfig(bucketName, sampleListenCfg, obj); err != nil { + t.Fatal("Unexpected error:", err) + } - // Create and store notification.xml with listen and queue notification configured - notificationXML := "" - notificationXML += "s3:ObjectRemoved:*s3:ObjectRemoved:*" + listenARN + "" - notificationXML += "s3:ObjectRemoved:*s3:ObjectRemoved:*" + queueARN + "" - notificationXML += "" - if err := storage.AppendFile(minioMetaBucket, bucketConfigPrefix+"/"+bucketName+"/"+bucketNotificationConfig, []byte(notificationXML)); err != nil { + // test event notifier init + if err := initEventNotifier(obj); err != nil { + t.Fatal("Unexpected error:", err) + } + + // fetch bucket configs and verify + ncfg := globalEventNotifier.GetBucketNotificationConfig(bucketName) + if ncfg == nil { + t.Error("Bucket notification was not present for ", bucketName) + } + if len(ncfg.QueueConfigs) != 1 || ncfg.QueueConfigs[0].QueueARN != "testqARN" { + t.Error("Unexpected bucket notification found - ", *ncfg) + } + if globalEventNotifier.GetExternalTarget("testqARN") != nil { + t.Error("A logger was not expected to be found as it was not enabled in the config.") + } + + lcfg := globalEventNotifier.GetBucketListenerConfig(bucketName) + if lcfg == nil { + t.Error("Bucket listener was not present for ", bucketName) + } + if len(lcfg) != 1 || lcfg[0].TargetServer != globalMinioAddr || lcfg[0].TopicConfig.TopicARN != "testlARN" { + t.Error("Unexpected listener config found - ", lcfg[0]) + } + if globalEventNotifier.GetInternalTarget("testlARN") == nil { + t.Error("A listen logger was not found.") + } +} + +func TestListenBucketNotification(t *testing.T) { + s := TestPeerRPCServerData{serverType: "XL"} + + // setup and teardown + s.Setup(t) + defer s.TearDown() + + // test initialisation + obj := s.testServer.Obj + + bucketName := "bucket" + objectName := "object" + + // Create the bucket to listen on + if err := obj.MakeBucket(bucketName); err != nil { t.Fatal("Unexpected error:", err) } + listenARN := "arn:minio:sns:us-east-1:1:listen-" + globalMinioAddr + lcfg := listenerConfig{ + topicConfig{ + ServiceConfig{ + []string{"s3:ObjectRemoved:*", "s3:ObjectCreated:*"}, + filterStruct{}, + "0", + }, + listenARN, + }, + globalMinioAddr, + } + + // write listener config to storage layer + lcfgs := []listenerConfig{lcfg} + if err := persistListenerConfig(bucketName, lcfgs, obj); err != nil { + t.Fatalf("Test Setup error: %v", err) + } + // Init event notifier - if err := initEventNotifier(fs); err != nil { + if err := initEventNotifier(obj); err != nil { t.Fatal("Unexpected error:", err) } // Check if the config is loaded - notificationCfg := globalEventNotifier.GetBucketNotificationConfig(bucketName) - if notificationCfg == nil { - t.Fatal("Cannot load bucket notification config") + listenerCfg := globalEventNotifier.GetBucketListenerConfig(bucketName) + if listenerCfg == nil { + t.Fatal("Cannot load bucket listener config") } - if len(notificationCfg.TopicConfigs) != 1 || len(notificationCfg.QueueConfigs) != 1 { - t.Fatal("Notification config is not correctly loaded. Exactly one topic and one queue config are expected") + if len(listenerCfg) != 1 { + t.Fatal("Listener config is not correctly loaded. Exactly one listener config is expected") } - // Check if topic ARN is enabled - if notificationCfg.TopicConfigs[0].TopicARN != listenARN { - t.Fatal("SNS listen is not configured.") + // Check if topic ARN is correct + if listenerCfg[0].TopicConfig.TopicARN != listenARN { + t.Fatal("Configured topic ARN is incorrect.") } // Create a new notification event channel. nEventCh := make(chan []NotificationEvent) // Close the listener channel. defer close(nEventCh) - // Set sns target. - globalEventNotifier.SetSNSTarget(listenARN, nEventCh) - // Remove sns listener after the writer has closed or the client disconnected. - defer globalEventNotifier.RemoveSNSTarget(listenARN, nEventCh) + // Add events channel for listener. + if err := globalEventNotifier.AddListenerChan(listenARN, nEventCh); err != nil { + t.Fatalf("Test Setup error: %v", err) + } + // Remove listen channel after the writer has closed or the + // client disconnected. + defer globalEventNotifier.RemoveListenerChan(listenARN) // Fire an event notification go eventNotify(eventData{ @@ -370,73 +406,90 @@ func TestListenBucketNotification(t *testing.T) { t.Fatalf("Received wrong object name in notification, expected %s, received %s", n[0].S3.Object.Key, objectName) } break - case <-time.After(30 * time.Second): + case <-time.After(3 * time.Second): break } + } -func testAddTopicConfig(obj ObjectLayer, instanceType string, t TestErrHandler) { - root, cErr := newTestConfig("us-east-1") - if cErr != nil { - t.Fatalf("[%s] Failed to initialize test config: %v", instanceType, cErr) - } - defer removeAll(root) +func TestAddRemoveBucketListenerConfig(t *testing.T) { + s := TestPeerRPCServerData{serverType: "XL"} + // setup and teardown + s.Setup(t) + defer s.TearDown() + + // test code + obj := s.testServer.Obj if err := initEventNotifier(obj); err != nil { - t.Fatalf("[%s] : Failed to initialize event notifier: %v", instanceType, err) + t.Fatalf("Failed to initialize event notifier: %v", err) } // Make a bucket to store topicConfigs. randBucket := getRandomBucketName() if err := obj.MakeBucket(randBucket); err != nil { - t.Fatalf("[%s] : Failed to make bucket %s", instanceType, randBucket) + t.Fatalf("Failed to make bucket %s", randBucket) } // Add a topicConfig to an empty notificationConfig. accountID := fmt.Sprintf("%d", time.Now().UTC().UnixNano()) - accountARN := "arn:minio:sns:" + serverConfig.GetRegion() + accountID + ":listen" - var filterRules []filterRule - filterRules = append(filterRules, filterRule{ - Name: "prefix", - Value: "minio", - }) - filterRules = append(filterRules, filterRule{ - Name: "suffix", - Value: "*.jpg", - }) - - // Make topic configuration corresponding to this ListenBucketNotification request. - sampleTopicCfg := &topicConfig{ + accountARN := fmt.Sprintf( + "arn:minio:sqs:%s:%s:listen-%s", + serverConfig.GetRegion(), + accountID, + globalMinioAddr, + ) + + // Make topic configuration + filterRules := []filterRule{ + { + Name: "prefix", + Value: "minio", + }, + { + Name: "suffix", + Value: "*.jpg", + }, + } + sampleTopicCfg := topicConfig{ TopicARN: accountARN, - serviceConfig: serviceConfig{ - Filter: struct { - Key keyFilter `xml:"S3Key,omitempty"` - }{ - Key: keyFilter{ - FilterRules: filterRules, - }, + ServiceConfig: ServiceConfig{ + []string{"s3:ObjectRemoved:*", "s3:ObjectCreated:*"}, + filterStruct{ + keyFilter{filterRules}, }, - ID: "sns-" + accountID, + "sns-" + accountID, }, } + sampleListenerCfg := &listenerConfig{ + TopicConfig: sampleTopicCfg, + TargetServer: globalMinioAddr, + } testCases := []struct { - topicCfg *topicConfig + lCfg *listenerConfig expectedErr error }{ - {sampleTopicCfg, nil}, + {sampleListenerCfg, nil}, {nil, errInvalidArgument}, - {sampleTopicCfg, nil}, } for i, test := range testCases { - err := globalEventNotifier.AddTopicConfig(randBucket, test.topicCfg) + err := AddBucketListenerConfig(randBucket, test.lCfg, obj) if err != test.expectedErr { - t.Errorf("Test %d: %s failed with error %v, expected to fail with %v", - i+1, instanceType, err, test.expectedErr) + t.Errorf( + "Test %d: Failed with error %v, expected to fail with %v", + i+1, err, test.expectedErr, + ) } } -} -func TestAddTopicConfig(t *testing.T) { - ExecObjectLayerTest(t, testAddTopicConfig) + // test remove listener actually removes a listener + RemoveBucketListenerConfig(randBucket, sampleListenerCfg, obj) + // since it does not return errors we fetch the config and + // check + lcSlice := globalEventNotifier.GetBucketListenerConfig(randBucket) + if len(lcSlice) != 0 { + t.Errorf("Remove Listener Config Test: did not remove listener config - %v", + lcSlice) + } } diff --git a/cmd/globals.go b/cmd/globals.go index 0ac2a299e..5668e4ba5 100644 --- a/cmd/globals.go +++ b/cmd/globals.go @@ -52,8 +52,13 @@ var ( globalMaxCacheSize = uint64(maxCacheSize) // Cache expiry. globalCacheExpiry = objcache.DefaultExpiry + // Minio local server address (in `host:port` format) + globalMinioAddr = "" // Minio default port, can be changed through command line. globalMinioPort = 9000 + // Peer communication struct + globalS3Peers = s3Peers{} + // Add new variable global values here. ) diff --git a/cmd/logger.go b/cmd/logger.go index f1418b3d5..ddc91d08b 100644 --- a/cmd/logger.go +++ b/cmd/logger.go @@ -20,6 +20,7 @@ import ( "bufio" "bytes" "fmt" + "path" "path/filepath" "runtime" "runtime/debug" @@ -50,10 +51,10 @@ type logger struct { func funcFromPC(pc uintptr, file string, line int, shortFile bool) string { var fn, name string if shortFile { - fn = strings.Replace(file, filepath.ToSlash(GOPATH)+"/src/github.com/minio/minio/cmd/", "", -1) + fn = strings.Replace(file, path.Join(filepath.ToSlash(GOPATH)+"/src/github.com/minio/minio/cmd/")+"/", "", -1) name = strings.Replace(runtime.FuncForPC(pc).Name(), "github.com/minio/minio/cmd.", "", -1) } else { - fn = strings.Replace(file, filepath.ToSlash(GOPATH)+"/src/", "", -1) + fn = strings.Replace(file, path.Join(filepath.ToSlash(GOPATH)+"/src/")+"/", "", -1) name = strings.Replace(runtime.FuncForPC(pc).Name(), "github.com/minio/minio/cmd.", "", -1) } return fmt.Sprintf("%s [%s:%d]", name, fn, line) diff --git a/cmd/notify-listener.go b/cmd/notify-listener.go new file mode 100644 index 000000000..e94d16aa1 --- /dev/null +++ b/cmd/notify-listener.go @@ -0,0 +1,82 @@ +/* + * Minio Cloud Storage, (C) 2016 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cmd + +import ( + "fmt" + "io/ioutil" + + "github.com/Sirupsen/logrus" +) + +type listenerConn struct { + Client *AuthRPCClient + ListenerARN string +} + +type listenerLogger struct { + log *logrus.Logger + lconn listenerConn +} + +func newListenerLogger(listenerArn, targetAddr string) (*listenerLogger, error) { + client := globalS3Peers.GetPeerClient(targetAddr) + if client == nil { + return nil, fmt.Errorf("Peer %s was not initialized - bug!", + targetAddr) + } + lc := listenerConn{ + Client: client, + ListenerARN: listenerArn, + } + + lcLog := logrus.New() + + lcLog.Out = ioutil.Discard + + lcLog.Formatter = new(logrus.JSONFormatter) + + lcLog.Hooks.Add(lc) + + return &listenerLogger{lcLog, lc}, nil +} + +func (lc listenerConn) Close() { + // ignore closing errors + _ = lc.Client.Close() +} + +// send event to target server via rpc client calls. +func (lc listenerConn) Fire(entry *logrus.Entry) error { + notificationEvent, ok := entry.Data["Records"].([]NotificationEvent) + if !ok { + // If the record is not of the expected type, silently + // discard. + return nil + } + + evArgs := EventArgs{Event: notificationEvent, Arn: lc.ListenerARN} + reply := GenericReply{} + err := lc.Client.Call("S3.Event", &evArgs, &reply) + return err +} + +func (lc listenerConn) Levels() []logrus.Level { + return []logrus.Level{ + logrus.InfoLevel, + } +} diff --git a/cmd/routers.go b/cmd/routers.go index 33c0be5ce..bf4ea1fad 100644 --- a/cmd/routers.go +++ b/cmd/routers.go @@ -80,6 +80,9 @@ func configureServerHandler(srvCmdConfig serverCmdConfig) http.Handler { registerDistNSLockRouter(mux, srvCmdConfig) } + // Register S3 peer communication router. + registerS3PeerRPCRouter(mux) + // Register controller rpc router. registerControlRPCRouter(mux, srvCmdConfig) diff --git a/cmd/s3-peer-client.go b/cmd/s3-peer-client.go new file mode 100644 index 000000000..458ced6f1 --- /dev/null +++ b/cmd/s3-peer-client.go @@ -0,0 +1,176 @@ +/* + * Minio Cloud Storage, (C) 2014-2016 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cmd + +import ( + "fmt" + "path" + "time" + + "github.com/minio/minio-go/pkg/set" +) + +type s3Peers struct { + // A map of peer server address (in `host:port` format) to RPC + // client connections + rpcClient map[string]*AuthRPCClient + + // slice of all peer addresses (in `host:port` format) + peers []string +} + +func initGlobalS3Peers(disks []string) { + // get list of de-duplicated peers + peers := getAllPeers(disks) + globalS3Peers = s3Peers{make(map[string]*AuthRPCClient), nil} + for _, peer := range peers { + globalS3Peers.InitS3PeerClient(peer) + } + + // Additionally setup a local peer if one does not exist + if globalS3Peers.GetPeerClient(globalMinioAddr) == nil { + globalS3Peers.InitS3PeerClient(globalMinioAddr) + peers = append(peers, globalMinioAddr) + } + + globalS3Peers.peers = peers +} + +func (s3p *s3Peers) GetPeers() []string { + return s3p.peers +} + +func (s3p *s3Peers) GetPeerClient(peer string) *AuthRPCClient { + return s3p.rpcClient[peer] +} + +// Initializes a new RPC connection (or closes and re-opens if it +// already exists) to a peer. Note that peer address is in `host:port` +// format. +func (s3p *s3Peers) InitS3PeerClient(peer string) { + if s3p.rpcClient[peer] != nil { + s3p.rpcClient[peer].Close() + delete(s3p.rpcClient, peer) + } + authCfg := &authConfig{ + accessKey: serverConfig.GetCredential().AccessKeyID, + secretKey: serverConfig.GetCredential().SecretAccessKey, + address: peer, + path: path.Join(reservedBucket, s3Path), + loginMethod: "S3.LoginHandler", + } + s3p.rpcClient[peer] = newAuthClient(authCfg) +} + +func (s3p *s3Peers) Close() error { + for _, v := range s3p.rpcClient { + if err := v.Close(); err != nil { + return err + } + } + s3p.rpcClient = nil + s3p.peers = nil + return nil +} + +// returns the network addresses of all Minio servers in the cluster +// in `host:port` format. +func getAllPeers(disks []string) []string { + res := []string{} + // use set to de-duplicate + sset := set.NewStringSet() + for _, disk := range disks { + netAddr, _, err := splitNetPath(disk) + if err != nil || netAddr == "" { + errorIf(err, "Unexpected error - most likely a bug.") + continue + } + if !sset.Contains(netAddr) { + res = append( + res, + fmt.Sprintf("%s:%d", netAddr, globalMinioPort), + ) + sset.Add(netAddr) + } + } + return res +} + +// Make RPC calls with the given method and arguments to all the given +// peers (in parallel), and collects the results. Since the methods +// intended for use here, have only a success or failure response, we +// do not return/inspect the `reply` parameter in the RPC call. The +// function attempts to connect to a peer only once, and returns a map +// of peer address to error response. If the error is nil, it means +// the RPC succeeded. +func (s3p *s3Peers) SendRPC(peers []string, method string, args interface { + SetToken(token string) + SetTimestamp(tstamp time.Time) +}) map[string]error { + // result type + type callResult struct { + target string + err error + } + // channel to collect results from goroutines + resChan := make(chan callResult) + // closure to make a single request. + callTarget := func(target string) { + reply := &GenericReply{} + err := s3p.rpcClient[target].Call(method, args, reply) + resChan <- callResult{target, err} + } + // map of errors + errsMap := make(map[string]error) + // make network calls in parallel + for _, target := range peers { + go callTarget(target) + } + // wait on channel and collect all results + for range peers { + res := <-resChan + if res.err != nil { + errsMap[res.target] = res.err + } + } + // return errors map + return errsMap +} + +// S3PeersUpdateBucketNotification - Sends Update Bucket notification +// request to all peers. Currently we log an error and continue. +func S3PeersUpdateBucketNotification(bucket string, ncfg *notificationConfig) { + setBNPArgs := &SetBNPArgs{Bucket: bucket, NCfg: ncfg} + peers := globalS3Peers.GetPeers() + errsMap := globalS3Peers.SendRPC(peers, "S3.SetBucketNotificationPeer", + setBNPArgs) + for peer, err := range errsMap { + errorIf(err, "Error sending peer update bucket notification to %s - %v", peer, err) + } +} + +// S3PeersUpdateBucketListener - Sends Update Bucket listeners request +// to all peers. Currently we log an error and continue. +func S3PeersUpdateBucketListener(bucket string, lcfg []listenerConfig) { + setBLPArgs := &SetBLPArgs{Bucket: bucket, LCfg: lcfg} + peers := globalS3Peers.GetPeers() + errsMap := globalS3Peers.SendRPC(peers, "S3.SetBucketListenerPeer", + setBLPArgs) + for peer, err := range errsMap { + errorIf(err, "Error sending peer update bucket listener to %s - %v", peer, err) + } +} diff --git a/cmd/s3-peer-router.go b/cmd/s3-peer-router.go new file mode 100644 index 000000000..7e72ce306 --- /dev/null +++ b/cmd/s3-peer-router.go @@ -0,0 +1,43 @@ +/* + * Minio Cloud Storage, (C) 2014-2016 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cmd + +import ( + "net/rpc" + + router "github.com/gorilla/mux" +) + +const ( + s3Path = "/s3/remote" +) + +type s3PeerAPIHandlers struct { + ObjectAPI func() ObjectLayer +} + +func registerS3PeerRPCRouter(mux *router.Router) { + s3PeerHandlers := &s3PeerAPIHandlers{ + ObjectAPI: newObjectLayerFn, + } + + s3PeerRPCServer := rpc.NewServer() + s3PeerRPCServer.RegisterName("S3", s3PeerHandlers) + + s3PeerRouter := mux.NewRoute().PathPrefix(reservedBucket).Subrouter() + s3PeerRouter.Path(s3Path).Handler(s3PeerRPCServer) +} diff --git a/cmd/s3-peer-rpc-handlers.go b/cmd/s3-peer-rpc-handlers.go new file mode 100644 index 000000000..8e6c06366 --- /dev/null +++ b/cmd/s3-peer-rpc-handlers.go @@ -0,0 +1,123 @@ +/* + * Minio Cloud Storage, (C) 2014-2016 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cmd + +import "time" + +func (s3 *s3PeerAPIHandlers) LoginHandler(args *RPCLoginArgs, reply *RPCLoginReply) error { + jwt, err := newJWT(defaultInterNodeJWTExpiry) + if err != nil { + return err + } + if err = jwt.Authenticate(args.Username, args.Password); err != nil { + return err + } + token, err := jwt.GenerateToken(args.Username) + if err != nil { + return err + } + reply.Token = token + reply.ServerVersion = Version + reply.Timestamp = time.Now().UTC() + return nil +} + +// SetBNPArgs - Arguments collection to SetBucketNotificationPeer RPC +// call +type SetBNPArgs struct { + // For Auth + GenericArgs + + Bucket string + + // Notification config for the given bucket. + NCfg *notificationConfig +} + +func (s3 *s3PeerAPIHandlers) SetBucketNotificationPeer(args *SetBNPArgs, reply *GenericReply) error { + // check auth + if !isRPCTokenValid(args.Token) { + return errInvalidToken + } + + // check if object layer is available. + objAPI := s3.ObjectAPI() + if objAPI == nil { + return errServerNotInitialized + } + + // Update in-memory notification config. + globalEventNotifier.SetBucketNotificationConfig(args.Bucket, args.NCfg) + + return nil +} + +// SetBLPArgs - Arguments collection to SetBucketListenerPeer RPC call +type SetBLPArgs struct { + // For Auth + GenericArgs + + Bucket string + + // Listener config for a given bucket. + LCfg []listenerConfig +} + +func (s3 *s3PeerAPIHandlers) SetBucketListenerPeer(args SetBLPArgs, reply *GenericReply) error { + // check auth + if !isRPCTokenValid(args.Token) { + return errInvalidToken + } + + // check if object layer is available. + objAPI := s3.ObjectAPI() + if objAPI == nil { + return errServerNotInitialized + } + + // Update in-memory notification config. + return globalEventNotifier.SetBucketListenerConfig(args.Bucket, args.LCfg) +} + +// EventArgs - Arguments collection for Event RPC call +type EventArgs struct { + // For Auth + GenericArgs + + // event being sent + Event []NotificationEvent + + // client that it is meant for + Arn string +} + +// submit an event to the receiving server. +func (s3 *s3PeerAPIHandlers) Event(args *EventArgs, reply *GenericReply) error { + // check auth + if !isRPCTokenValid(args.Token) { + return errInvalidToken + } + + // check if object layer is available. + objAPI := s3.ObjectAPI() + if objAPI == nil { + return errServerNotInitialized + } + + err := globalEventNotifier.SendListenerEvent(args.Arn, args.Event) + return err +} diff --git a/cmd/server-main.go b/cmd/server-main.go index e7de50e3b..4a4656214 100644 --- a/cmd/server-main.go +++ b/cmd/server-main.go @@ -386,6 +386,12 @@ func serverMain(c *cli.Context) { globalObjectAPI = newObject globalObjLayerMutex.Unlock() + // Initialize local server address + globalMinioAddr = getLocalAddress(srvConfig) + + // Initialize S3 Peers inter-node communication + initGlobalS3Peers(disks) + // Initialize a new event notifier. err = initEventNotifier(newObjectLayerFn()) fatalIf(err, "Unable to initialize event notification.") diff --git a/cmd/server-startup-msg.go b/cmd/server-startup-msg.go index da9250891..f4adb8ce2 100644 --- a/cmd/server-startup-msg.go +++ b/cmd/server-startup-msg.go @@ -83,10 +83,10 @@ func printEventNotifiers() { return } arnMsg := colorBlue("SQS ARNs: ") - if len(globalEventNotifier.queueTargets) == 0 { + if len(globalEventNotifier.external.targets) == 0 { arnMsg += colorBold(fmt.Sprintf(getFormatStr(len(""), 1), "")) } - for queueArn := range globalEventNotifier.queueTargets { + for queueArn := range globalEventNotifier.external.targets { arnMsg += colorBold(fmt.Sprintf(getFormatStr(len(queueArn), 1), queueArn)) } console.Println(arnMsg) diff --git a/cmd/server_test.go b/cmd/server_test.go index 67c59317e..d4b149fe7 100644 --- a/cmd/server_test.go +++ b/cmd/server_test.go @@ -105,125 +105,6 @@ func (s *TestSuiteCommon) TestBucketSQSNotification(c *C) { verifyError(c, response, "InvalidArgument", "A specified destination ARN does not exist or is not well-formed. Verify the destination ARN.", http.StatusBadRequest) } -// TestBucketNotification - Inserts the bucket notification and verifies it by fetching the notification back. -func (s *TestSuiteCommon) TestBucketSNSNotification(c *C) { - // Sample bucket notification. - bucketNotificationBuf := `s3:ObjectCreated:Putprefiximages/1arn:minio:sns:us-east-1:444455556666:listen` - - // generate a random bucket Name. - bucketName := getRandomBucketName() - // HTTP request to create the bucket. - request, err := newTestSignedRequestV4("PUT", getMakeBucketURL(s.endPoint, bucketName), - 0, nil, s.accessKey, s.secretKey) - c.Assert(err, IsNil) - - client := http.Client{} - // execute the request. - response, err := client.Do(request) - c.Assert(err, IsNil) - // assert the http response status code. - c.Assert(response.StatusCode, Equals, http.StatusOK) - - request, err = newTestSignedRequestV4("PUT", getPutNotificationURL(s.endPoint, bucketName), - int64(len(bucketNotificationBuf)), bytes.NewReader([]byte(bucketNotificationBuf)), s.accessKey, s.secretKey) - c.Assert(err, IsNil) - - client = http.Client{} - // execute the HTTP request. - response, err = client.Do(request) - - c.Assert(err, IsNil) - c.Assert(response.StatusCode, Equals, http.StatusOK) - - // Fetch the uploaded policy. - request, err = newTestSignedRequestV4("GET", getGetNotificationURL(s.endPoint, bucketName), 0, nil, - s.accessKey, s.secretKey) - c.Assert(err, IsNil) - - client = http.Client{} - response, err = client.Do(request) - c.Assert(err, IsNil) - c.Assert(response.StatusCode, Equals, http.StatusOK) - - bucketNotificationReadBuf, err := ioutil.ReadAll(response.Body) - c.Assert(err, IsNil) - // Verify if downloaded policy matches with previousy uploaded. - c.Assert(bytes.Equal([]byte(bucketNotificationBuf), bucketNotificationReadBuf), Equals, true) - - invalidBucketNotificationBuf := `s3:ObjectCreated:Putinvalidimages/1arn:minio:sns:us-east-1:444455556666:minio` - - request, err = newTestSignedRequestV4("PUT", getPutNotificationURL(s.endPoint, bucketName), - int64(len(invalidBucketNotificationBuf)), bytes.NewReader([]byte(invalidBucketNotificationBuf)), s.accessKey, s.secretKey) - c.Assert(err, IsNil) - - client = http.Client{} - // execute the HTTP request. - response, err = client.Do(request) - c.Assert(err, IsNil) - - verifyError(c, response, "InvalidArgument", "A specified destination ARN does not exist or is not well-formed. Verify the destination ARN.", http.StatusBadRequest) - - invalidBucketNotificationBuf = `s3:ObjectCreated:Putinvalidimages/1arn:minio:sns:us-east-1:1:listen` - - request, err = newTestSignedRequestV4("PUT", getPutNotificationURL(s.endPoint, bucketName), - int64(len(invalidBucketNotificationBuf)), bytes.NewReader([]byte(invalidBucketNotificationBuf)), s.accessKey, s.secretKey) - c.Assert(err, IsNil) - - client = http.Client{} - // execute the HTTP request. - response, err = client.Do(request) - c.Assert(err, IsNil) - - verifyError(c, response, "InvalidArgument", "filter rule name must be either prefix or suffix", http.StatusBadRequest) - - invalidBucketNotificationBuf = `s3:ObjectCreated:Putprefixhello\1arn:minio:sns:us-east-1:1:listen` - - request, err = newTestSignedRequestV4("PUT", getPutNotificationURL(s.endPoint, bucketName), - int64(len(invalidBucketNotificationBuf)), bytes.NewReader([]byte(invalidBucketNotificationBuf)), s.accessKey, s.secretKey) - c.Assert(err, IsNil) - - client = http.Client{} - // execute the HTTP request. - response, err = client.Do(request) - c.Assert(err, IsNil) - - verifyError(c, response, "InvalidArgument", "Size of filter rule value cannot exceed 1024 bytes in UTF-8 representation", http.StatusBadRequest) - - invalidBucketNotificationBuf = `s3:ObjectCreated:Putprefiximages/1arn:minio:sns:us-west-1:444455556666:listen` - request, err = newTestSignedRequestV4("PUT", getPutNotificationURL(s.endPoint, bucketName), - int64(len(invalidBucketNotificationBuf)), bytes.NewReader([]byte(invalidBucketNotificationBuf)), s.accessKey, s.secretKey) - c.Assert(err, IsNil) - - client = http.Client{} - // execute the HTTP request. - response, err = client.Do(request) - c.Assert(err, IsNil) - - verifyError(c, response, "InvalidArgument", "A specified destination is in a different region than the bucket. You must use a destination that resides in the same region as the bucket.", http.StatusBadRequest) - - invalidBucketNotificationBuf = `s3:ObjectCreated:Invalidprefiximages/1arn:minio:sns:us-east-1:444455556666:listen` - request, err = newTestSignedRequestV4("PUT", getPutNotificationURL(s.endPoint, bucketName), - int64(len(invalidBucketNotificationBuf)), bytes.NewReader([]byte(invalidBucketNotificationBuf)), s.accessKey, s.secretKey) - c.Assert(err, IsNil) - - client = http.Client{} - // execute the HTTP request. - response, err = client.Do(request) - c.Assert(err, IsNil) - verifyError(c, response, "InvalidArgument", "A specified event is not supported for notifications.", http.StatusBadRequest) - - bucketNotificationDuplicates := `s3:ObjectCreated:Putprefiximages/1arn:minio:sns:us-east-1:444455556666:listens3:ObjectCreated:Putprefiximages/1arn:minio:sns:us-east-1:444455556666:listen` - request, err = newTestSignedRequestV4("PUT", getPutNotificationURL(s.endPoint, bucketName), - int64(len(bucketNotificationDuplicates)), bytes.NewReader([]byte(bucketNotificationDuplicates)), s.accessKey, s.secretKey) - c.Assert(err, IsNil) - - client = http.Client{} - // execute the HTTP request. - response, err = client.Do(request) - c.Assert(err, IsNil) - verifyError(c, response, "InvalidArgument", "Configurations overlap. Configurations on the same bucket cannot share a common event type.", http.StatusBadRequest) -} - // TestBucketPolicy - Inserts the bucket policy and verifies it by fetching the policy back. // Deletes the policy and verifies the deletion by fetching it back. func (s *TestSuiteCommon) TestBucketPolicy(c *C) { diff --git a/cmd/server_v2_test.go b/cmd/server_v2_test.go index 5996c4624..2f7858aa0 100644 --- a/cmd/server_v2_test.go +++ b/cmd/server_v2_test.go @@ -102,125 +102,6 @@ func (s *TestSuiteCommonV2) TestBucketSQSNotification(c *C) { verifyError(c, response, "InvalidArgument", "A specified destination ARN does not exist or is not well-formed. Verify the destination ARN.", http.StatusBadRequest) } -// TestBucketNotification - Inserts the bucket notification and verifies it by fetching the notification back. -func (s *TestSuiteCommonV2) TestBucketSNSNotification(c *C) { - // Sample bucket notification. - bucketNotificationBuf := `s3:ObjectCreated:Putprefiximages/1arn:minio:sns:us-east-1:444455556666:listen` - - // generate a random bucket Name. - bucketName := getRandomBucketName() - // HTTP request to create the bucket. - request, err := newTestSignedRequestV2("PUT", getMakeBucketURL(s.endPoint, bucketName), - 0, nil, s.accessKey, s.secretKey) - c.Assert(err, IsNil) - - client := http.Client{} - // execute the request. - response, err := client.Do(request) - c.Assert(err, IsNil) - // assert the http response status code. - c.Assert(response.StatusCode, Equals, http.StatusOK) - - request, err = newTestSignedRequestV2("PUT", getPutNotificationURL(s.endPoint, bucketName), - int64(len(bucketNotificationBuf)), bytes.NewReader([]byte(bucketNotificationBuf)), s.accessKey, s.secretKey) - c.Assert(err, IsNil) - - client = http.Client{} - // execute the HTTP request. - response, err = client.Do(request) - - c.Assert(err, IsNil) - c.Assert(response.StatusCode, Equals, http.StatusOK) - - // Fetch the uploaded policy. - request, err = newTestSignedRequestV2("GET", getGetNotificationURL(s.endPoint, bucketName), 0, nil, - s.accessKey, s.secretKey) - c.Assert(err, IsNil) - - client = http.Client{} - response, err = client.Do(request) - c.Assert(err, IsNil) - c.Assert(response.StatusCode, Equals, http.StatusOK) - - bucketNotificationReadBuf, err := ioutil.ReadAll(response.Body) - c.Assert(err, IsNil) - // Verify if downloaded policy matches with previousy uploaded. - c.Assert(bytes.Equal([]byte(bucketNotificationBuf), bucketNotificationReadBuf), Equals, true) - - invalidBucketNotificationBuf := `s3:ObjectCreated:Putinvalidimages/1arn:minio:sns:us-east-1:444455556666:minio` - - request, err = newTestSignedRequestV2("PUT", getPutNotificationURL(s.endPoint, bucketName), - int64(len(invalidBucketNotificationBuf)), bytes.NewReader([]byte(invalidBucketNotificationBuf)), s.accessKey, s.secretKey) - c.Assert(err, IsNil) - - client = http.Client{} - // execute the HTTP request. - response, err = client.Do(request) - c.Assert(err, IsNil) - - verifyError(c, response, "InvalidArgument", "A specified destination ARN does not exist or is not well-formed. Verify the destination ARN.", http.StatusBadRequest) - - invalidBucketNotificationBuf = `s3:ObjectCreated:Putinvalidimages/1arn:minio:sns:us-east-1:1:listen` - - request, err = newTestSignedRequestV2("PUT", getPutNotificationURL(s.endPoint, bucketName), - int64(len(invalidBucketNotificationBuf)), bytes.NewReader([]byte(invalidBucketNotificationBuf)), s.accessKey, s.secretKey) - c.Assert(err, IsNil) - - client = http.Client{} - // execute the HTTP request. - response, err = client.Do(request) - c.Assert(err, IsNil) - - verifyError(c, response, "InvalidArgument", "filter rule name must be either prefix or suffix", http.StatusBadRequest) - - invalidBucketNotificationBuf = `s3:ObjectCreated:Putprefixhello\1arn:minio:sns:us-east-1:1:listen` - - request, err = newTestSignedRequestV2("PUT", getPutNotificationURL(s.endPoint, bucketName), - int64(len(invalidBucketNotificationBuf)), bytes.NewReader([]byte(invalidBucketNotificationBuf)), s.accessKey, s.secretKey) - c.Assert(err, IsNil) - - client = http.Client{} - // execute the HTTP request. - response, err = client.Do(request) - c.Assert(err, IsNil) - - verifyError(c, response, "InvalidArgument", "Size of filter rule value cannot exceed 1024 bytes in UTF-8 representation", http.StatusBadRequest) - - invalidBucketNotificationBuf = `s3:ObjectCreated:Putprefiximages/1arn:minio:sns:us-west-1:444455556666:listen` - request, err = newTestSignedRequestV2("PUT", getPutNotificationURL(s.endPoint, bucketName), - int64(len(invalidBucketNotificationBuf)), bytes.NewReader([]byte(invalidBucketNotificationBuf)), s.accessKey, s.secretKey) - c.Assert(err, IsNil) - - client = http.Client{} - // execute the HTTP request. - response, err = client.Do(request) - c.Assert(err, IsNil) - - verifyError(c, response, "InvalidArgument", "A specified destination is in a different region than the bucket. You must use a destination that resides in the same region as the bucket.", http.StatusBadRequest) - - invalidBucketNotificationBuf = `s3:ObjectCreated:Invalidprefiximages/1arn:minio:sns:us-east-1:444455556666:listen` - request, err = newTestSignedRequestV2("PUT", getPutNotificationURL(s.endPoint, bucketName), - int64(len(invalidBucketNotificationBuf)), bytes.NewReader([]byte(invalidBucketNotificationBuf)), s.accessKey, s.secretKey) - c.Assert(err, IsNil) - - client = http.Client{} - // execute the HTTP request. - response, err = client.Do(request) - c.Assert(err, IsNil) - verifyError(c, response, "InvalidArgument", "A specified event is not supported for notifications.", http.StatusBadRequest) - - bucketNotificationDuplicates := `s3:ObjectCreated:Putprefiximages/1arn:minio:sns:us-east-1:444455556666:listens3:ObjectCreated:Putprefiximages/1arn:minio:sns:us-east-1:444455556666:listen` - request, err = newTestSignedRequestV2("PUT", getPutNotificationURL(s.endPoint, bucketName), - int64(len(bucketNotificationDuplicates)), bytes.NewReader([]byte(bucketNotificationDuplicates)), s.accessKey, s.secretKey) - c.Assert(err, IsNil) - - client = http.Client{} - // execute the HTTP request. - response, err = client.Do(request) - c.Assert(err, IsNil) - verifyError(c, response, "InvalidArgument", "Configurations overlap. Configurations on the same bucket cannot share a common event type.", http.StatusBadRequest) -} - // TestBucketPolicy - Inserts the bucket policy and verifies it by fetching the policy back. // Deletes the policy and verifies the deletion by fetching it back. func (s *TestSuiteCommonV2) TestBucketPolicy(c *C) { diff --git a/cmd/signature-v4.go b/cmd/signature-v4.go index 6b29579a3..1c44c53ae 100644 --- a/cmd/signature-v4.go +++ b/cmd/signature-v4.go @@ -147,7 +147,7 @@ func getSignature(signingKey []byte, stringToSign string) string { // doesPolicySignatureMatch - Verify query headers with post policy // - http://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-HTTPPOSTConstructPolicy.html -// returns true if matches, false otherwise. if error is not nil then it is always false +// returns ErrNone if the signature matches. func doesPolicySignatureMatch(formValues map[string]string) APIErrorCode { // Access credentials. cred := serverConfig.GetCredential() @@ -193,7 +193,7 @@ func doesPolicySignatureMatch(formValues map[string]string) APIErrorCode { // doesPresignedSignatureMatch - Verify query headers with presigned signature // - http://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-query-string-auth.html -// returns true if matches, false otherwise. if error is not nil then it is always false +// returns ErrNone if the signature matches. func doesPresignedSignatureMatch(hashedPayload string, r *http.Request, region string) APIErrorCode { // Access credentials. cred := serverConfig.GetCredential() @@ -316,7 +316,7 @@ func doesPresignedSignatureMatch(hashedPayload string, r *http.Request, region s // doesSignatureMatch - Verify authorization header with calculated header in accordance with // - http://docs.aws.amazon.com/AmazonS3/latest/API/sig-v4-authenticating-requests.html -// returns true if matches, false otherwise. if error is not nil then it is always false +// returns ErrNone if signature matches. func doesSignatureMatch(hashedPayload string, r *http.Request, region string) APIErrorCode { // Access credentials. cred := serverConfig.GetCredential() diff --git a/cmd/test-utils_test.go b/cmd/test-utils_test.go index f659e254a..f6d729ce9 100644 --- a/cmd/test-utils_test.go +++ b/cmd/test-utils_test.go @@ -149,6 +149,7 @@ type TestServer struct { SecretKey string Server *httptest.Server Obj ObjectLayer + SrvCmdCfg serverCmdConfig } // Starts the test server and returns the TestServer instance. @@ -236,6 +237,64 @@ func StartTestStorageRPCServer(t TestErrHandler, instanceType string, diskN int) return testRPCServer } +// Sets up a Peers RPC test server. +func StartTestPeersRPCServer(t TestErrHandler, instanceType string) TestServer { + // create temporary backend for the test server. + nDisks := 16 + disks, err := getRandomDisks(nDisks) + if err != nil { + t.Fatal("Failed to create disks for the backend") + } + + root, err := newTestConfig("us-east-1") + if err != nil { + t.Fatalf("%s", err) + } + + // create an instance of TestServer. + testRPCServer := TestServer{} + // Get credential. + credentials := serverConfig.GetCredential() + + testRPCServer.Root = root + testRPCServer.Disks = disks + testRPCServer.AccessKey = credentials.AccessKeyID + testRPCServer.SecretKey = credentials.SecretAccessKey + + // create temporary backend for the test server. + objLayer, storageDisks, err := initObjectLayer(disks, nil) + if err != nil { + t.Fatalf("Failed obtaining Temp Backend: %s", err) + } + + globalObjLayerMutex.Lock() + globalObjectAPI = objLayer + testRPCServer.Obj = objLayer + globalObjLayerMutex.Unlock() + + srvCfg := serverCmdConfig{ + disks: disks, + storageDisks: storageDisks, + } + + mux := router.NewRouter() + // need storage layer for bucket config storage. + registerStorageRPCRouters(mux, srvCfg) + // need API layer to send requests, etc. + registerAPIRouter(mux) + // module being tested is Peer RPCs router. + registerS3PeerRPCRouter(mux) + + // Run TestServer. + testRPCServer.Server = httptest.NewServer(mux) + + // initialize remainder of serverCmdConfig + srvCfg.isDistXL = false + testRPCServer.SrvCmdCfg = srvCfg + + return testRPCServer +} + // Initializes control RPC endpoints. // The object Layer will be a temp back used for testing purpose. func initTestControlRPCEndPoint(srvCmdConfig serverCmdConfig) http.Handler { @@ -595,7 +654,6 @@ func newTestStreamingSignedBadChunkDateRequest(method, urlStr string, contentLen } currTime := time.Now().UTC() - fmt.Println("now: ", currTime) signature, err := signStreamingRequest(req, accessKey, secretKey, currTime) if err != nil { return nil, err @@ -603,7 +661,6 @@ func newTestStreamingSignedBadChunkDateRequest(method, urlStr string, contentLen // skew the time between the chunk signature calculation and seed signature. currTime = currTime.Add(1 * time.Second) - fmt.Println("later: ", currTime) req, err = assembleStreamingChunks(req, body, chunkSize, secretKey, signature, currTime) return req, nil } @@ -625,14 +682,15 @@ func newTestStreamingSignedRequest(method, urlStr string, contentLength, chunkSi return req, nil } -// Replaces any occurring '/' in string, into its encoded representation. +// Replaces any occurring '/' in string, into its encoded +// representation. func percentEncodeSlash(s string) string { return strings.Replace(s, "/", "%2F", -1) } // queryEncode - encodes query values in their URL encoded form. In -// addition to the percent encoding performed by getURLEncodedName() used -// here, it also percent encodes '/' (forward slash) +// addition to the percent encoding performed by getURLEncodedName() +// used here, it also percent encodes '/' (forward slash) func queryEncode(v url.Values) string { if v == nil { return "" diff --git a/cmd/utils.go b/cmd/utils.go index cae58fbaf..e5ca67cc2 100644 --- a/cmd/utils.go +++ b/cmd/utils.go @@ -80,19 +80,38 @@ func splitNetPath(networkPath string) (netAddr, netPath string, err error) { } } networkParts := strings.SplitN(networkPath, ":", 2) - if len(networkParts) == 1 { + switch { + case len(networkParts) == 1: return "", networkPath, nil - } - if networkParts[1] == "" { + case networkParts[1] == "": return "", "", &net.AddrError{Err: "Missing path in network path", Addr: networkPath} - } else if networkParts[0] == "" { + case networkParts[0] == "": return "", "", &net.AddrError{Err: "Missing address in network path", Addr: networkPath} - } else if !filepath.IsAbs(networkParts[1]) { + case !filepath.IsAbs(networkParts[1]): return "", "", &net.AddrError{Err: "Network path should be absolute", Addr: networkPath} } return networkParts[0], networkParts[1], nil } +// Find local node through the command line arguments. Returns in +// `host:port` format. +func getLocalAddress(srvCmdConfig serverCmdConfig) string { + if !srvCmdConfig.isDistXL { + return fmt.Sprintf(":%d", globalMinioPort) + } + for _, export := range srvCmdConfig.disks { + // Validates if remote disk is local. + if isLocalStorage(export) { + var host string + if idx := strings.LastIndex(export, ":"); idx != -1 { + host = export[:idx] + } + return fmt.Sprintf("%s:%d", host, globalMinioPort) + } + } + return "" +} + // xmlDecoder provide decoded value in xml. func xmlDecoder(body io.Reader, v interface{}, size int64) error { var lbody io.Reader diff --git a/cmd/utils_test.go b/cmd/utils_test.go index 800bb73af..3a9ecd45c 100644 --- a/cmd/utils_test.go +++ b/cmd/utils_test.go @@ -20,6 +20,7 @@ import ( "fmt" "net/http" "reflect" + "runtime" "testing" ) @@ -174,3 +175,65 @@ func TestMaxPartID(t *testing.T) { } } } + +// Tests fetch local address. +func TestLocalAddress(t *testing.T) { + if runtime.GOOS == "windows" { + return + } + // need to set this to avoid stale values from other tests. + globalMinioPort = 9000 + testCases := []struct { + srvCmdConfig serverCmdConfig + localAddr string + }{ + // Test 1 - local address is found. + { + srvCmdConfig: serverCmdConfig{ + isDistXL: true, + disks: []string{ + "localhost:/mnt/disk1", + "1.1.1.2:/mnt/disk2", + "1.1.2.1:/mnt/disk3", + "1.1.2.2:/mnt/disk4", + }, + }, + localAddr: fmt.Sprintf("localhost:%d", globalMinioPort), + }, + // Test 2 - local address is everything. + { + srvCmdConfig: serverCmdConfig{ + isDistXL: false, + disks: []string{ + "/mnt/disk1", + "/mnt/disk2", + "/mnt/disk3", + "/mnt/disk4", + }, + }, + localAddr: fmt.Sprintf(":%d", globalMinioPort), + }, + // Test 3 - local address is not found. + { + srvCmdConfig: serverCmdConfig{ + isDistXL: true, + disks: []string{ + "1.1.1.1:/mnt/disk1", + "1.1.1.2:/mnt/disk2", + "1.1.2.1:/mnt/disk3", + "1.1.2.2:/mnt/disk4", + }, + }, + localAddr: "", + }, + } + + // Validates fetching local address. + for i, testCase := range testCases { + localAddr := getLocalAddress(testCase.srvCmdConfig) + if localAddr != testCase.localAddr { + t.Fatalf("Test %d: Expected %s, got %s", i+1, testCase.localAddr, localAddr) + } + } + +}