diff --git a/cmd/bucket-notification-handlers_test.go b/cmd/bucket-notification-handlers_test.go
index 246c83a59..61e53e4c2 100644
--- a/cmd/bucket-notification-handlers_test.go
+++ b/cmd/bucket-notification-handlers_test.go
@@ -1,10 +1,14 @@
package cmd
import (
+ "bufio"
"bytes"
+ "encoding/json"
"io"
"io/ioutil"
"net/http"
+ "net/http/httptest"
+ "sync"
"testing"
)
@@ -19,7 +23,7 @@ func (f *flushWriter) Write(b []byte) (n int, err error) { return f.Writer.Write
func (f *flushWriter) Header() http.Header { return http.Header{} }
func (f *flushWriter) WriteHeader(code int) {}
-func newFlushWriter(writer io.Writer) *flushWriter {
+func newFlushWriter(writer io.Writer) http.ResponseWriter {
return &flushWriter{writer}
}
@@ -35,7 +39,7 @@ func TestWriteNotification(t *testing.T) {
var buffer bytes.Buffer
// Collection of test cases for each event writer.
testCases := []struct {
- writer *flushWriter
+ writer http.ResponseWriter
event map[string][]NotificationEvent
err error
}{
@@ -88,3 +92,198 @@ func TestWriteNotification(t *testing.T) {
}
}
}
+
+func TestSendBucketNotification(t *testing.T) {
+ // Initialize a new test config.
+ root, err := newTestConfig("us-east-1")
+ if err != nil {
+ t.Fatalf("Unable to initialize test config %s", err)
+ }
+ defer removeAll(root)
+
+ eventCh := make(chan []NotificationEvent)
+
+ // Create a Pipe with FlushWriter on the write-side and bufio.Scanner
+ // on the reader-side to receive notification over the listen channel in a
+ // synchronized manner.
+ pr, pw := io.Pipe()
+ fw := newFlushWriter(pw)
+ scanner := bufio.NewScanner(pr)
+ // Start a go-routine to wait for notification events.
+ go func(listenerCh <-chan []NotificationEvent) {
+ sendBucketNotification(fw, listenerCh)
+ }(eventCh)
+
+ // Construct notification events to be passed on the events channel.
+ var events []NotificationEvent
+ evTypes := []EventName{
+ ObjectCreatedPut,
+ ObjectCreatedPost,
+ ObjectCreatedCopy,
+ ObjectCreatedCompleteMultipartUpload,
+ }
+ for _, evType := range evTypes {
+ events = append(events, newNotificationEvent(eventData{
+ Type: evType,
+ }))
+ }
+ // Send notification events to the channel on which sendBucketNotification
+ // is waiting on.
+ eventCh <- events
+
+ // Read from the pipe connected to the ResponseWriter.
+ scanner.Scan()
+ notificationBytes := scanner.Bytes()
+
+ // Close the read-end and send an empty notification event on the channel
+ // to signal sendBucketNotification to terminate.
+ pr.Close()
+ eventCh <- []NotificationEvent{}
+ close(eventCh)
+
+ // Checking if the notification are the same as those sent over the channel.
+ var notifications map[string][]NotificationEvent
+ err = json.Unmarshal(notificationBytes, ¬ifications)
+ if err != nil {
+ t.Fatal("Failed to Unmarshal notification")
+ }
+ records := notifications["Records"]
+ for i, rec := range records {
+ if rec.EventName == evTypes[i].String() {
+ continue
+ }
+ t.Errorf("Failed to receive %d event %s", i, evTypes[i].String())
+ }
+}
+
+func initMockEventNotifier(objAPI ObjectLayer) error {
+ if objAPI == nil {
+ return errInvalidArgument
+ }
+ globalEventNotifier = &eventNotifier{
+ rwMutex: &sync.RWMutex{},
+ queueTargets: nil,
+ notificationConfigs: make(map[string]*notificationConfig),
+ snsTargets: make(map[string][]chan []NotificationEvent),
+ }
+ return nil
+}
+
+func testGetBucketNotificationHandler(obj ObjectLayer, instanceType string, t TestErrHandler) {
+ // get random bucket name.
+ bucketName := getRandomBucketName()
+
+ // Create bucket to add notification config for.
+ err := obj.MakeBucket(bucketName)
+ if err != nil {
+ // failed to create newbucket, abort.
+ t.Fatalf("%s : %s", instanceType, err)
+ }
+
+ noNotificationBucket := "nonotification"
+ err = obj.MakeBucket(noNotificationBucket)
+ if err != nil {
+ // failed to create newbucket, abort.
+ t.Fatalf("%s : %s", instanceType, err)
+ }
+
+ // Register the API end points with XL/FS object layer.
+ apiRouter := initTestAPIEndPoints(obj, []string{
+ "GetBucketNotificationHandler",
+ "PutBucketNotificationHandler",
+ })
+
+ // initialize the server and obtain the credentials and root.
+ // credentials are necessary to sign the HTTP request.
+ rootPath, err := newTestConfig("us-east-1")
+ if err != nil {
+ t.Fatalf("Init Test config failed")
+ }
+ // remove the root folder after the test ends.
+ defer removeAll(rootPath)
+
+ credentials := serverConfig.GetCredential()
+
+ //Initialize global event notifier with mock queue targets.
+ err = initMockEventNotifier(obj)
+ if err != nil {
+ t.Fatalf("Test %s: Failed to initialize mock event notifier %v",
+ instanceType, err)
+ }
+ // Initialize httptest recorder.
+ rec := httptest.NewRecorder()
+
+ // Initialize sample bucket notification config.
+ sampleNotificationConfig := []byte("" +
+ "s3:ObjectCreated:*s3:ObjectRemoved:*" +
+ "arn:minio:sns:us-east-1:1474332374:listen" +
+ "")
+
+ // Prepare notification config for one of the test cases.
+ req, err := newTestSignedRequest("PUT", getPutBucketNotificationURL("", bucketName),
+ int64(len(sampleNotificationConfig)), bytes.NewReader(sampleNotificationConfig),
+ credentials.AccessKeyID, credentials.SecretAccessKey)
+ if err != nil {
+ t.Fatalf("Test %d %s: Failed to create HTTP request for PutBucketNotification: %v",
+ 1, instanceType, err)
+ }
+
+ apiRouter.ServeHTTP(rec, req)
+ // Test 1: Check if we get back sample notification.
+ req, err = newTestSignedRequest("GET", getGetBucketNotificationURL("", bucketName),
+ int64(0), nil, credentials.AccessKeyID, credentials.SecretAccessKey)
+ if err != nil {
+ t.Fatalf("Test %d: %s: Failed to create HTTP request for GetBucketNotification: %v",
+ 1, instanceType, err)
+ }
+
+ apiRouter.ServeHTTP(rec, req)
+ if rec.Code != http.StatusOK {
+ t.Errorf("Test %d: %s: GetBucketNotification request failed with %d: %v",
+ 2, instanceType, rec.Code, err)
+ }
+ rspBytes, err := ioutil.ReadAll(rec.Body)
+ if err != nil {
+ t.Errorf("Test %d: %s: Failed to read response body: %v", 1, instanceType, err)
+ }
+ if !bytes.Equal(rspBytes, sampleNotificationConfig) {
+ t.Errorf("Test %d: %s: Notification config doesn't match expected value: %v", 2, instanceType, err)
+ }
+
+ // Test 2: Try getting bucket notification on a non-existent bucket.
+ invalidBucketName := "Invalid_BucketName"
+ req, err = newTestSignedRequest("GET", getGetBucketNotificationURL("", invalidBucketName),
+ int64(0), nil, credentials.AccessKeyID, credentials.SecretAccessKey)
+ if err != nil {
+ t.Fatalf("Test %d: %s: Failed to create HTTP request for GetBucketNotification: %v",
+ 2, instanceType, err)
+ }
+ apiRouter.ServeHTTP(rec, req)
+ if rec.Code != http.StatusOK {
+ t.Errorf("Test %d: %s: GetBucketNotification request failed with %d: %v",
+ 2, instanceType, rec.Code, err)
+ }
+
+ // Test 3: Try getting bucket notification for a bucket with notification set.
+ emptyNotificationXML := []byte("" +
+ "")
+ req, err = newTestSignedRequest("GET", getGetBucketNotificationURL("", noNotificationBucket),
+ int64(0), nil, credentials.AccessKeyID, credentials.SecretAccessKey)
+ if err != nil {
+ t.Fatalf("Test %d: %s: Failed to create HTTP request for GetBucketNotification: %v",
+ 3, instanceType, err)
+ }
+ apiRouter.ServeHTTP(rec, req)
+ if rec.Code != http.StatusOK {
+ t.Errorf("Test %d: %s: GetBucketNotification request failed with %d: %v",
+ 3, instanceType, rec.Code, err)
+ }
+ if !bytes.Equal(rec.Body.Bytes(), emptyNotificationXML) {
+ t.Errorf("Test %d: %s: GetBucketNotification request received notification "+
+ "config different from empty config: %v", 3, instanceType, err)
+ }
+}
+
+func TestGetBucketNotificationHandler(t *testing.T) {
+ ExecObjectLayerTest(t, testGetBucketNotificationHandler)
+}
diff --git a/cmd/test-utils_test.go b/cmd/test-utils_test.go
index 4afd9b2f0..b6af1d35b 100644
--- a/cmd/test-utils_test.go
+++ b/cmd/test-utils_test.go
@@ -942,6 +942,7 @@ func getMultiDeleteObjectURL(endPoint, bucketName string) string {
queryValue := url.Values{}
queryValue.Set("delete", "")
return makeTestTargetURL(endPoint, bucketName, "", queryValue)
+
}
// return URL for HEAD on the object.
@@ -1095,6 +1096,18 @@ func getCompleteMultipartUploadURL(endPoint, bucketName, objectName, uploadID st
return makeTestTargetURL(endPoint, bucketName, objectName, queryValue)
}
+// return URL for put bucket notification.
+func getPutBucketNotificationURL(endPoint, bucketName string) string {
+ return getGetBucketNotificationURL(endPoint, bucketName)
+}
+
+// return URL for get bucket notification.
+func getGetBucketNotificationURL(endPoint, bucketName string) string {
+ queryValue := url.Values{}
+ queryValue.Set("notification", "")
+ return makeTestTargetURL(endPoint, bucketName, "", queryValue)
+}
+
// returns temp root directory. `
func getTestRoot() (string, error) {
return ioutil.TempDir(os.TempDir(), "api-")
@@ -1324,6 +1337,11 @@ func initTestAPIEndPoints(objLayer ObjectLayer, apiFunctions []string) http.Hand
// Register ListMultipartUploads handler.
case "ListMultipartUploads":
bucket.Methods("GET").HandlerFunc(api.ListMultipartUploadsHandler).Queries("uploads", "")
+ // Register GetBucketNotification Handler.
+ case "GetBucketNotification":
+ bucket.Methods("GET").HandlerFunc(api.GetBucketNotificationHandler).Queries("notification", "")
+ case "PutBucketNotification":
+ bucket.Methods("PUT").HandlerFunc(api.PutBucketNotificationHandler).Queries("notification", "")
// Register all api endpoints by default.
default:
registerAPIRouter(muxRouter, api)