diff --git a/.travis.yml b/.travis.yml index b1fc8c314..e3ad81667 100644 --- a/.travis.yml +++ b/.travis.yml @@ -23,4 +23,4 @@ after_success: - bash <(curl -s https://codecov.io/bash) go: -- 1.7.1 +- 1.7.3 diff --git a/cmd/bucket-notification-handlers.go b/cmd/bucket-notification-handlers.go index 898f042b7..4fdb29bbb 100644 --- a/cmd/bucket-notification-handlers.go +++ b/cmd/bucket-notification-handlers.go @@ -210,8 +210,6 @@ var crlf = []byte("\r\n") // for each notification input, otherwise writes whitespace characters periodically // to keep the connection active. Each notification messages are terminated by CRLF // character. Upon any error received on response writer the for loop exits. -// -// TODO - do not log for all errors. func sendBucketNotification(w http.ResponseWriter, arnListenerCh <-chan []NotificationEvent) { var dummyEvents = map[string][]NotificationEvent{"Records": nil} // Continuously write to client either timely empty structures @@ -223,8 +221,9 @@ func sendBucketNotification(w http.ResponseWriter, arnListenerCh <-chan []Notifi errorIf(err, "Unable to write notification to client.") return } - case <-time.After(5 * time.Second): + case <-time.After(globalSNSConnAlive): // Wait for global conn active seconds. if err := writeNotification(w, dummyEvents); err != nil { + // FIXME - do not log for all errors. errorIf(err, "Unable to write notification to client.") return } diff --git a/cmd/bucket-notification-handlers_test.go b/cmd/bucket-notification-handlers_test.go index 2d251d6f7..639a0c156 100644 --- a/cmd/bucket-notification-handlers_test.go +++ b/cmd/bucket-notification-handlers_test.go @@ -7,15 +7,10 @@ import ( "encoding/xml" "io" "io/ioutil" - "net" "net/http" "net/http/httptest" "reflect" - "strconv" - "sync" "testing" - - "github.com/gorilla/mux" ) // Implement a dummy flush writer. @@ -235,112 +230,24 @@ func testGetBucketNotificationHandler(obj ObjectLayer, instanceType, bucketName } } -func TestListenBucketNotificationHandler(t *testing.T) { - ExecObjectLayerAPITest(t, testListenBucketNotificationHandler, []string{ +func TestListenBucketNotificationNilHandler(t *testing.T) { + ExecObjectLayerAPITest(t, testListenBucketNotificationNilHandler, []string{ "ListenBucketNotification", "PutObject", }) } -func testListenBucketNotificationHandler(obj ObjectLayer, instanceType, bucketName string, apiRouter http.Handler, +func testListenBucketNotificationNilHandler(obj ObjectLayer, instanceType, bucketName string, apiRouter http.Handler, credentials credential, t *testing.T) { - mux, ok := apiRouter.(*mux.Router) - if !ok { - t.Fatal("Invalid mux router found") - } - registerS3PeerRPCRouter(mux) - - testServer := httptest.NewServer(apiRouter) - defer testServer.Close() - - // setup port and minio addr - _, portStr, err := net.SplitHostPort(testServer.Listener.Addr().String()) - if err != nil { - t.Fatalf("Initialization error: %v", err) - } - globalMinioPort, err = strconv.Atoi(portStr) - if err != nil { - t.Fatalf("Initialization error: %v", err) - } - globalMinioAddr = testServer.Listener.Addr().String() - // initialize the peer client(s) - initGlobalS3Peers([]storageEndPoint{}) - - invalidBucket := "Invalid\\Bucket" - noNotificationBucket := "nonotificationbucket" // get random bucket name. randBucket := getRandomBucketName() - for _, bucket := range []string{randBucket, noNotificationBucket} { - err = obj.MakeBucket(bucket) - if err != nil { - // failed to create bucket, abort. - t.Fatalf("Failed to create bucket %s %s : %s", bucket, - instanceType, err) - } - } - - var testRec *httptest.ResponseRecorder - var testReq *http.Request - var tErr error - - signatureMismatchError := getAPIError(ErrContentSHA256Mismatch) - tooBigPrefix := string(bytes.Repeat([]byte("a"), 1025)) - validEvents := []string{"s3:ObjectCreated:*", "s3:ObjectRemoved:*"} - invalidEvents := []string{"invalidEvent"} - testCases := []struct { - bucketName string - prefixes []string - suffixes []string - events []string - expectedHTTPCode int - expectedAPIError string - }{ - {randBucket, []string{}, []string{}, invalidEvents, signatureMismatchError.HTTPStatusCode, "InvalidArgument"}, - {randBucket, []string{tooBigPrefix}, []string{}, validEvents, http.StatusBadRequest, "InvalidArgument"}, - {invalidBucket, []string{}, []string{}, validEvents, http.StatusBadRequest, "InvalidBucketName"}, - {randBucket, []string{}, []string{}, validEvents, signatureMismatchError.HTTPStatusCode, signatureMismatchError.Code}, - } - - for i, test := range testCases { - testRec = httptest.NewRecorder() - testReq, tErr = newTestSignedRequestV4("GET", - getListenBucketNotificationURL("", test.bucketName, test.prefixes, test.suffixes, test.events), - 0, nil, credentials.AccessKeyID, credentials.SecretAccessKey) - if tErr != nil { - t.Fatalf("%s: Failed to create HTTP testRequest for ListenBucketNotification: %v", instanceType, tErr) - } - // Set X-Amz-Content-SHA256 in header different from what was used to calculate Signature. - if test.expectedAPIError == "XAmzContentSHA256Mismatch" { - // Triggering a authentication failure. - testReq.Header.Set("x-amz-content-sha256", "somethingElse") - } - apiRouter.ServeHTTP(testRec, testReq) - rspBytes, rErr := ioutil.ReadAll(testRec.Body) - if rErr != nil { - t.Errorf("Test %d: %s: Failed to read response body: %v", i+1, instanceType, rErr) - } - var errXML APIErrorResponse - xErr := xml.Unmarshal(rspBytes, &errXML) - if xErr != nil { - t.Errorf("Test %d: %s: Failed to unmarshal error XML: %v", i+1, instanceType, xErr) - } - if errXML.Code != test.expectedAPIError { - t.Errorf("Test %d: %s: Expected error code %s but received %s: %v", i+1, - instanceType, test.expectedAPIError, errXML.Code, err) - - } - if testRec.Code != test.expectedHTTPCode { - t.Errorf("Test %d: %s: expected HTTP code %d, but received %d: %v", - i+1, instanceType, test.expectedHTTPCode, testRec.Code, err) - } - } // Nil Object layer nilAPIRouter := initTestAPIEndPoints(nil, []string{ "ListenBucketNotification", }) - testRec = httptest.NewRecorder() - testReq, tErr = newTestSignedRequestV4("GET", + testRec := httptest.NewRecorder() + testReq, tErr := newTestSignedRequestV4("GET", getListenBucketNotificationURL("", randBucket, []string{}, []string{"*.jpg"}, []string{ "s3:ObjectCreated:*", @@ -351,58 +258,8 @@ func testListenBucketNotificationHandler(obj ObjectLayer, instanceType, bucketNa } nilAPIRouter.ServeHTTP(testRec, testReq) if testRec.Code != http.StatusServiceUnavailable { - t.Errorf("Test %d: %s: expected HTTP code %d, but received %d: %v", - 1, instanceType, http.StatusServiceUnavailable, testRec.Code, err) - } - - testRec = httptest.NewRecorder() - testReq, tErr = newTestSignedRequestV4("GET", - getListenBucketNotificationURL("", randBucket, []string{}, []string{}, validEvents), - 0, nil, credentials.AccessKeyID, credentials.SecretAccessKey) - if tErr != nil { - t.Fatalf("%s: Failed to create HTTP testRequest for ListenBucketNotification: %v", instanceType, tErr) - } - - globalObjLayerMutex.Lock() - globalObjectAPI = obj - globalObjLayerMutex.Unlock() - - go apiRouter.ServeHTTP(testRec, testReq) - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - rec := httptest.NewRecorder() - buf := bytes.NewReader([]byte("hello, world")) - req, rerr := newTestSignedRequestV4("PUT", getPutObjectURL("", randBucket, "jeezus"), - int64(buf.Len()), buf, credentials.AccessKeyID, credentials.SecretAccessKey) - if rerr != nil { - t.Fatalf("%s: Failed to create HTTP testRequest for ListenBucketNotification: %v", instanceType, rerr) - } - apiRouter.ServeHTTP(rec, req) - if rec.Code != http.StatusOK { - t.Fatalf("Unexpected http reply %d should be %d", rec.Code, http.StatusOK) - } - }() - wg.Wait() - - bio := bufio.NewScanner(testRec.Body) - // Unmarshal each line, returns marshalled values. - for bio.Scan() { - var notificationInfo struct { - Records []NotificationEvent - } - if err = json.Unmarshal(bio.Bytes(), ¬ificationInfo); err != nil { - t.Fatalf("%s: Unable to marshal: %v", instanceType, err) - } - // Send notifications on channel only if there are events received. - if len(notificationInfo.Records) == 0 { - t.Fatalf("%s: Expected notification events, received none", instanceType) - } - } - // Look for any underlying errors. - if err = bio.Err(); err != nil { - t.Fatalf("%s: Server connection closed prematurely %s", instanceType, err) + t.Fatalf("Test 1: %s: expected HTTP code %d, but received %d: %v", + instanceType, http.StatusServiceUnavailable, testRec.Code, tErr) } } diff --git a/cmd/event-notifier.go b/cmd/event-notifier.go index 41658f6bc..d6636eb6f 100644 --- a/cmd/event-notifier.go +++ b/cmd/event-notifier.go @@ -225,8 +225,7 @@ func (en *eventNotifier) SetBucketListenerConfig(bucket string, lcfg []listenerC return nil } -func eventNotifyForBucketNotifications(eventType, objectName, bucketName string, - nEvent []NotificationEvent) { +func eventNotifyForBucketNotifications(eventType, objectName, bucketName string, nEvent []NotificationEvent) { nConfig := globalEventNotifier.GetBucketNotificationConfig(bucketName) if nConfig == nil { return @@ -294,12 +293,10 @@ func eventNotify(event eventData) { notificationEvent := []NotificationEvent{newNotificationEvent(event)} // Notify external targets. - eventNotifyForBucketNotifications(eventType, objectName, event.Bucket, - notificationEvent) + eventNotifyForBucketNotifications(eventType, objectName, event.Bucket, notificationEvent) // Notify internal targets. - eventNotifyForBucketListeners(eventType, objectName, event.Bucket, - notificationEvent) + eventNotifyForBucketListeners(eventType, objectName, event.Bucket, notificationEvent) } // loads notification config if any for a given bucket, returns diff --git a/cmd/globals.go b/cmd/globals.go index 46f0929dc..4fff63c48 100644 --- a/cmd/globals.go +++ b/cmd/globals.go @@ -74,6 +74,10 @@ var ( var ( // The maximum allowed difference between the request generation time and the server processing time globalMaxSkewTime = 15 * time.Minute + + // Keeps the connection active by waiting for following amount of time. + // Primarily used in ListenBucketNotification. + globalSNSConnAlive = 5 * time.Second ) // global colors. diff --git a/cmd/server_test.go b/cmd/server_test.go index d4b149fe7..67bb54aa2 100644 --- a/cmd/server_test.go +++ b/cmd/server_test.go @@ -246,6 +246,99 @@ func (s *TestSuiteCommon) TestDeleteBucketNotEmpty(c *C) { } +func (s *TestSuiteCommon) TestListenBucketNotificationHandler(c *C) { + // generate a random bucket name. + bucketName := getRandomBucketName() + // HTTP request to create the bucket. + req, err := newTestSignedRequestV4("PUT", getMakeBucketURL(s.endPoint, bucketName), + 0, nil, s.accessKey, s.secretKey) + c.Assert(err, IsNil) + + client := http.Client{} + // execute the request. + response, err := client.Do(req) + c.Assert(err, IsNil) + // assert the http response status code. + c.Assert(response.StatusCode, Equals, http.StatusOK) + + invalidBucket := "Invalid\\Bucket" + tooByte := bytes.Repeat([]byte("a"), 1025) + tooBigPrefix := string(tooByte) + validEvents := []string{"s3:ObjectCreated:*", "s3:ObjectRemoved:*"} + invalidEvents := []string{"invalidEvent"} + + req, err = newTestSignedRequestV4("GET", + getListenBucketNotificationURL(s.endPoint, invalidBucket, []string{}, []string{}, []string{}), + 0, nil, s.accessKey, s.secretKey) + c.Assert(err, IsNil) + + client = http.Client{} + // execute the request. + response, err = client.Do(req) + c.Assert(err, IsNil) + verifyError(c, response, "InvalidBucketName", "The specified bucket is not valid.", http.StatusBadRequest) + + req, err = newTestSignedRequestV4("GET", + getListenBucketNotificationURL(s.endPoint, bucketName, []string{}, []string{}, invalidEvents), + 0, nil, s.accessKey, s.secretKey) + c.Assert(err, IsNil) + + client = http.Client{} + // execute the request. + response, err = client.Do(req) + c.Assert(err, IsNil) + verifyError(c, response, "InvalidArgument", "A specified event is not supported for notifications.", http.StatusBadRequest) + + req, err = newTestSignedRequestV4("GET", + getListenBucketNotificationURL(s.endPoint, bucketName, []string{tooBigPrefix}, []string{}, validEvents), + 0, nil, s.accessKey, s.secretKey) + c.Assert(err, IsNil) + + client = http.Client{} + // execute the request. + response, err = client.Do(req) + c.Assert(err, IsNil) + verifyError(c, response, "InvalidArgument", "Size of filter rule value cannot exceed 1024 bytes in UTF-8 representation", http.StatusBadRequest) + + req, err = newTestSignedRequestV4("GET", + getListenBucketNotificationURL(s.endPoint, bucketName, []string{}, []string{}, validEvents), + 0, nil, s.accessKey, s.secretKey) + c.Assert(err, IsNil) + + req.Header.Set("x-amz-content-sha256", "somethingElse") + client = http.Client{} + // execute the request. + response, err = client.Do(req) + c.Assert(err, IsNil) + verifyError(c, response, "XAmzContentSHA256Mismatch", "The provided 'x-amz-content-sha256' header does not match what was computed.", http.StatusBadRequest) + + // Change global value from 5 second to 100millisecond. + globalSNSConnAlive = 100 * time.Millisecond + req, err = newTestSignedRequestV4("GET", + getListenBucketNotificationURL(s.endPoint, bucketName, + []string{}, []string{}, validEvents), 0, nil, s.accessKey, s.secretKey) + c.Assert(err, IsNil) + client = http.Client{} + // execute the request. + response, err = client.Do(req) + c.Assert(err, IsNil) + c.Assert(response.StatusCode, Equals, http.StatusOK) + // FIXME: uncomment this in future when we have a code to read notifications from. + // go func() { + // buf := bytes.NewReader(tooByte) + // rreq, rerr := newTestSignedRequestV4("GET", + // getPutObjectURL(s.endPoint, bucketName, "myobject/1"), + // int64(buf.Len()), buf, s.accessKey, s.secretKey) + // c.Assert(rerr, IsNil) + // client = http.Client{} + // // execute the request. + // resp, rerr := client.Do(rreq) + // c.Assert(rerr, IsNil) + // c.Assert(resp.StatusCode, Equals, http.StatusOK) + // }() + response.Body.Close() // FIXME. Find a way to read from the returned body. +} + // Test deletes multple objects and verifies server resonse. func (s *TestSuiteCommon) TestDeleteMultipleObjects(c *C) { // generate a random bucket name.