diff --git a/cmd/bucket-notification-handlers.go b/cmd/bucket-notification-handlers.go index 22cb6f8c7..d5cb435aa 100644 --- a/cmd/bucket-notification-handlers.go +++ b/cmd/bucket-notification-handlers.go @@ -22,7 +22,9 @@ import ( "encoding/xml" "fmt" "io" + "net" "net/http" + "syscall" "time" "github.com/gorilla/mux" @@ -213,14 +215,6 @@ func writeNotification(w http.ResponseWriter, notification map[string][]Notifica return err } - // https://github.com/containous/traefik/issues/560 - // https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events - // - // Proxies might buffer the connection to avoid this we - // need the proper MIME type before writing to client. - // This MIME header tells the proxies to avoid buffering - w.Header().Set("Content-Type", "text/event-stream") - // Add additional CRLF characters for client to // differentiate the individual events properly. _, err = w.Write(append(notificationBytes, crlf...)) @@ -232,25 +226,61 @@ func writeNotification(w http.ResponseWriter, notification map[string][]Notifica // CRLF character used for chunked transfer in accordance with HTTP standards. var crlf = []byte("\r\n") -// sendBucketNotification - writes notification back to client on the response writer -// for each notification input, otherwise writes whitespace characters periodically -// to keep the connection active. Each notification messages are terminated by CRLF -// character. Upon any error received on response writer the for loop exits. -func sendBucketNotification(w http.ResponseWriter, arnListenerCh <-chan []NotificationEvent) { - var dummyEvents = map[string][]NotificationEvent{"Records": nil} - // Continuously write to client either timely empty structures - // every 5 seconds, or return back the notifications. +// listenChan A `listenChan` provides a data channel to send event +// notifications on and `doneCh` to signal that events are no longer +// being received. It also sends empty events (whitespace) to keep the +// underlying connection alive. +type listenChan struct { + doneCh chan struct{} + dataCh chan []NotificationEvent +} + +// newListenChan returns a listenChan with properly initialized +// unbuffered channels. +func newListenChan() *listenChan { + return &listenChan{ + doneCh: make(chan struct{}), + dataCh: make(chan []NotificationEvent), + } +} + +// sendNotificationEvent sends notification events on the data channel +// unless doneCh is not closed +func (l *listenChan) sendNotificationEvent(events []NotificationEvent) { + select { + // Returns immediately if receiver has quit. + case <-l.doneCh: + // Blocks until receiver is available. + case l.dataCh <- events: + } +} + +// waitForListener writes event notification OR whitespaces on +// ResponseWriter until client closes connection +func (l *listenChan) waitForListener(w http.ResponseWriter) { + + // Logs errors other than EPIPE and ECONNRESET. + // EPIPE and ECONNRESET indicate that the client stopped + // listening to notification events. + logClientError := func(err error, msg string) { + if oe, ok := err.(*net.OpError); ok && (oe.Err == syscall.EPIPE || oe.Err == + syscall.ECONNRESET) { + errorIf(err, msg) + } + } + + emptyEvent := map[string][]NotificationEvent{"Records": nil} + defer close(l.doneCh) for { select { - case events := <-arnListenerCh: + case events := <-l.dataCh: if err := writeNotification(w, map[string][]NotificationEvent{"Records": events}); err != nil { - errorIf(err, "Unable to write notification to client.") + logClientError(err, "Unable to write notification") return } - case <-time.After(globalSNSConnAlive): // Wait for global conn active seconds. - if err := writeNotification(w, dummyEvents); err != nil { - // FIXME - do not log for all errors. - errorIf(err, "Unable to write notification to client.") + case <-time.After(globalSNSConnAlive): + if err := writeNotification(w, emptyEvent); err != nil { + logClientError(err, "Unable to write empty notification") return } } @@ -346,12 +376,11 @@ func (api objectAPIHandlers) ListenBucketNotificationHandler(w http.ResponseWrit }, } - // Setup a listening channel that will receive notifications - // from the RPC handler. - nEventCh := make(chan []NotificationEvent) - defer close(nEventCh) + // Setup a listen channel to receive notifications like + // s3:ObjectCreated, s3:ObjectDeleted etc. + nListenCh := newListenChan() // Add channel for listener events - if err = globalEventNotifier.AddListenerChan(accountARN, nEventCh); err != nil { + if err = globalEventNotifier.AddListenerChan(accountARN, nListenCh); err != nil { errorIf(err, "Error adding a listener!") writeErrorResponse(w, toAPIErrorCode(err), r.URL) return @@ -361,8 +390,8 @@ func (api objectAPIHandlers) ListenBucketNotificationHandler(w http.ResponseWrit defer globalEventNotifier.RemoveListenerChan(accountARN) // Update topic config to bucket config and persist - as soon - // as this call compelets, events may start appearing in - // nEventCh + // as this call completes, events may start appearing in + // nListenCh lc := listenerConfig{ TopicConfig: *topicCfg, TargetServer: targetServer, @@ -378,8 +407,16 @@ func (api objectAPIHandlers) ListenBucketNotificationHandler(w http.ResponseWrit // Add all common headers. setCommonHeaders(w) - // Start sending bucket notifications. - sendBucketNotification(w, nEventCh) + // https://github.com/containous/traefik/issues/560 + // https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events + // + // Proxies might buffer the connection to avoid this we + // need the proper MIME type before writing to client. + // This MIME header tells the proxies to avoid buffering + w.Header().Set("Content-Type", "text/event-stream") + + // Start writing bucket notifications to ResponseWriter. + nListenCh.waitForListener(w) } // AddBucketListenerConfig - Updates on disk state of listeners, and diff --git a/cmd/bucket-notification-handlers_test.go b/cmd/bucket-notification-handlers_test.go index 87cfd2205..be6381f48 100644 --- a/cmd/bucket-notification-handlers_test.go +++ b/cmd/bucket-notification-handlers_test.go @@ -21,14 +21,18 @@ import ( "bytes" "encoding/json" "encoding/xml" + "errors" "io" "io/ioutil" "net/http" "net/http/httptest" "os" "reflect" + "sync" "testing" + "time" + "github.com/minio/minio/pkg/auth" ) @@ -113,7 +117,47 @@ func TestWriteNotification(t *testing.T) { } } -func TestSendBucketNotification(t *testing.T) { +// testResponseWriter implements `http.ResponseWriter` that buffers +// response body in a `bytes.Buffer` and returns error after `failCount` +// calls to `Write` method +type testResponseWriter struct { + mu sync.Mutex + failCount int + buf *bytes.Buffer + m http.Header +} + +func newTestResponseWriter(failAt int) *testResponseWriter { + return &testResponseWriter{ + buf: new(bytes.Buffer), + m: make(http.Header), + failCount: failAt, + } +} + +func (trw *testResponseWriter) Flush() { +} + +func (trw *testResponseWriter) Write(p []byte) (int, error) { + trw.mu.Lock() + defer trw.mu.Unlock() + + if trw.failCount == 0 { + return 0, errors.New("Custom error") + } + trw.failCount-- + + return trw.buf.Write(p) +} + +func (trw *testResponseWriter) Header() http.Header { + return trw.m +} + +func (trw *testResponseWriter) WriteHeader(i int) { +} + +func TestListenChan(t *testing.T) { // Initialize a new test config. root, err := newTestConfig(globalMinioDefaultRegion) if err != nil { @@ -121,18 +165,8 @@ func TestSendBucketNotification(t *testing.T) { } defer os.RemoveAll(root) - eventCh := make(chan []NotificationEvent) - - // Create a Pipe with FlushWriter on the write-side and bufio.Scanner - // on the reader-side to receive notification over the listen channel in a - // synchronized manner. - pr, pw := io.Pipe() - fw := newFlushWriter(pw) - scanner := bufio.NewScanner(pr) - // Start a go-routine to wait for notification events. - go func(listenerCh <-chan []NotificationEvent) { - sendBucketNotification(fw, listenerCh) - }(eventCh) + // Create a listen channel to manage notifications + nListenCh := newListenChan() // Construct notification events to be passed on the events channel. var events []NotificationEvent @@ -142,37 +176,68 @@ func TestSendBucketNotification(t *testing.T) { ObjectCreatedCopy, ObjectCreatedCompleteMultipartUpload, } + for _, evType := range evTypes { events = append(events, newNotificationEvent(eventData{ Type: evType, })) } - // Send notification events to the channel on which sendBucketNotification - // is waiting on. - eventCh <- events - // Read from the pipe connected to the ResponseWriter. - scanner.Scan() - notificationBytes := scanner.Bytes() + // Send notification events one-by-one + go func() { + for _, event := range events { + nListenCh.sendNotificationEvent([]NotificationEvent{event}) + } + }() - // Close the read-end and send an empty notification event on the channel - // to signal sendBucketNotification to terminate. - pr.Close() - eventCh <- []NotificationEvent{} - close(eventCh) + // Create a http.ResponseWriter that fails after len(events) + // number of times + trw := newTestResponseWriter(len(events)) - // Checking if the notification are the same as those sent over the channel. - var notifications map[string][]NotificationEvent - err = json.Unmarshal(notificationBytes, ¬ifications) - if err != nil { - t.Fatal("Failed to Unmarshal notification") - } - records := notifications["Records"] - for i, rec := range records { - if rec.EventName == evTypes[i].String() { - continue + // Wait for all (4) notification events to be received + nListenCh.waitForListener(trw) + + // Used to read JSON-formatted event stream line-by-line + scanner := bufio.NewScanner(trw.buf) + var records map[string][]NotificationEvent + for i := 0; scanner.Scan(); i++ { + err = json.Unmarshal(scanner.Bytes(), &records) + if err != nil { + t.Fatalf("Failed to unmarshal json %v", err) + } + + nEvent := records["Records"][0] + if nEvent.EventName != evTypes[i].String() { + t.Errorf("notification event name mismatch, expected %s but got %s", evTypes[i], nEvent.EventName) } - t.Errorf("Failed to receive %d event %s", i, evTypes[i].String()) + } +} + +func TestSendNotificationEvent(t *testing.T) { + // This test verifies that sendNotificationEvent function + // returns once listenChan.doneCh is closed + + l := newListenChan() + testCh := make(chan struct{}) + timeout := 5 * time.Second + + go func() { + // Send one empty notification event on listenChan + events := []NotificationEvent{NotificationEvent{}} + l.sendNotificationEvent(events) + testCh <- struct{}{} + }() + + // close l.doneCh to signal client exiting from + // ListenBucketNotification API call + close(l.doneCh) + + select { + case <-time.After(timeout): + t.Fatalf("sendNotificationEvent didn't return after %v seconds", timeout) + case <-testCh: + // If we reach this case, sendNotificationEvent + // returned on closing l.doneCh } } diff --git a/cmd/event-notifier.go b/cmd/event-notifier.go index 8ef43fe87..0bf9d57ce 100644 --- a/cmd/event-notifier.go +++ b/cmd/event-notifier.go @@ -60,7 +60,7 @@ type internalNotifier struct { // Connected listeners is a map of listener ARNs to channels // on which the ListenBucket API handler go routine is waiting // for events to send to a client. - connectedListeners map[string]chan []NotificationEvent + connectedListeners map[string]*listenChan rwMutex *sync.RWMutex } @@ -206,7 +206,7 @@ func (en eventNotifier) GetInternalTarget(arn string) *listenerLogger { } // Set a new sns target for an input sns ARN. -func (en *eventNotifier) AddListenerChan(snsARN string, listenerCh chan []NotificationEvent) error { +func (en *eventNotifier) AddListenerChan(snsARN string, listenerCh *listenChan) error { if listenerCh == nil { return errInvalidArgument } @@ -229,9 +229,9 @@ func (en *eventNotifier) SendListenerEvent(arn string, event []NotificationEvent en.internal.rwMutex.Lock() defer en.internal.rwMutex.Unlock() - ch, ok := en.internal.connectedListeners[arn] + listenChan, ok := en.internal.connectedListeners[arn] if ok { - ch <- event + listenChan.sendNotificationEvent(event) } // If the channel is not present we ignore the event. return nil @@ -833,7 +833,7 @@ func initEventNotifier(objAPI ObjectLayer) error { rwMutex: &sync.RWMutex{}, targets: listenTargets, listenerConfigs: lConfigs, - connectedListeners: make(map[string]chan []NotificationEvent), + connectedListeners: make(map[string]*listenChan), }, } diff --git a/cmd/event-notifier_test.go b/cmd/event-notifier_test.go index 49d075410..205e3629a 100644 --- a/cmd/event-notifier_test.go +++ b/cmd/event-notifier_test.go @@ -462,11 +462,9 @@ func TestListenBucketNotification(t *testing.T) { } // Create a new notification event channel. - nEventCh := make(chan []NotificationEvent) - // Close the listener channel. - defer close(nEventCh) + nListenCh := newListenChan() // Add events channel for listener. - if err := globalEventNotifier.AddListenerChan(listenARN, nEventCh); err != nil { + if err := globalEventNotifier.AddListenerChan(listenARN, nListenCh); err != nil { t.Fatalf("Test Setup error: %v", err) } // Remove listen channel after the writer has closed or the @@ -489,7 +487,7 @@ func TestListenBucketNotification(t *testing.T) { // Wait for the event notification here, if nothing is received within 30 seconds, // test error will be fired select { - case n := <-nEventCh: + case n := <-nListenCh.dataCh: // Check that received event if len(n) == 0 { t.Fatal("Unexpected error occurred") @@ -497,9 +495,7 @@ func TestListenBucketNotification(t *testing.T) { if n[0].S3.Object.Key != objectName { t.Fatalf("Received wrong object name in notification, expected %s, received %s", n[0].S3.Object.Key, objectName) } - break case <-time.After(3 * time.Second): - break } } diff --git a/cmd/server_test.go b/cmd/server_test.go index e679f2b0a..533429600 100644 --- a/cmd/server_test.go +++ b/cmd/server_test.go @@ -522,32 +522,6 @@ func (s *TestSuiteCommon) TestListenBucketNotificationHandler(c *check) { if s.signer == signerV4 { verifyError(c, response, "XAmzContentSHA256Mismatch", "The provided 'x-amz-content-sha256' header does not match what was computed.", http.StatusBadRequest) } - - // Change global value from 5 second to 100millisecond. - globalSNSConnAlive = 100 * time.Millisecond - req, err = newTestSignedRequest("GET", - getListenBucketNotificationURL(s.endPoint, bucketName, - []string{}, []string{}, validEvents), 0, nil, s.accessKey, s.secretKey, s.signer) - c.Assert(err, nil) - client = http.Client{Transport: s.transport} - // execute the request. - response, err = client.Do(req) - c.Assert(err, nil) - c.Assert(response.StatusCode, http.StatusOK) - // FIXME: uncomment this in future when we have a code to read notifications from. - // go func() { - // buf := bytes.NewReader(tooByte) - // rreq, rerr := newTestSignedRequest("GET", - // getPutObjectURL(s.endPoint, bucketName, "myobject/1"), - // int64(buf.Len()), buf, s.accessKey, s.secretKey, s.signer) - // c.Assert(rerr, IsNil) - // client = http.Client{Transport: s.transport} - // // execute the request. - // resp, rerr := client.Do(rreq) - // c.Assert(rerr, IsNil) - // c.Assert(resp.StatusCode, http.StatusOK) - // }() - response.Body.Close() // FIXME. Find a way to read from the returned body. } // Test deletes multple objects and verifies server resonse.