Fix ListenBucketNotification deadlock (#5028)

Previously ListenBucketNotificationHandler could deadlock with
PutObjectHandler's eventNotify call when a client closes its
connection. This change removes the cyclic dependency between the
channel and map of ARN to channels by using a separate done channel to
signal that the client has quit.
master
Krishnan Parthasarathi 7 years ago committed by Dee Koder
parent 5a2bdf6959
commit 67f66c40c1
  1. 99
      cmd/bucket-notification-handlers.go
  2. 135
      cmd/bucket-notification-handlers_test.go
  3. 10
      cmd/event-notifier.go
  4. 10
      cmd/event-notifier_test.go
  5. 26
      cmd/server_test.go

@ -22,7 +22,9 @@ import (
"encoding/xml"
"fmt"
"io"
"net"
"net/http"
"syscall"
"time"
"github.com/gorilla/mux"
@ -213,14 +215,6 @@ func writeNotification(w http.ResponseWriter, notification map[string][]Notifica
return err
}
// https://github.com/containous/traefik/issues/560
// https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events
//
// Proxies might buffer the connection to avoid this we
// need the proper MIME type before writing to client.
// This MIME header tells the proxies to avoid buffering
w.Header().Set("Content-Type", "text/event-stream")
// Add additional CRLF characters for client to
// differentiate the individual events properly.
_, err = w.Write(append(notificationBytes, crlf...))
@ -232,25 +226,61 @@ func writeNotification(w http.ResponseWriter, notification map[string][]Notifica
// CRLF character used for chunked transfer in accordance with HTTP standards.
var crlf = []byte("\r\n")
// sendBucketNotification - writes notification back to client on the response writer
// for each notification input, otherwise writes whitespace characters periodically
// to keep the connection active. Each notification messages are terminated by CRLF
// character. Upon any error received on response writer the for loop exits.
func sendBucketNotification(w http.ResponseWriter, arnListenerCh <-chan []NotificationEvent) {
var dummyEvents = map[string][]NotificationEvent{"Records": nil}
// Continuously write to client either timely empty structures
// every 5 seconds, or return back the notifications.
// listenChan A `listenChan` provides a data channel to send event
// notifications on and `doneCh` to signal that events are no longer
// being received. It also sends empty events (whitespace) to keep the
// underlying connection alive.
type listenChan struct {
doneCh chan struct{}
dataCh chan []NotificationEvent
}
// newListenChan returns a listenChan with properly initialized
// unbuffered channels.
func newListenChan() *listenChan {
return &listenChan{
doneCh: make(chan struct{}),
dataCh: make(chan []NotificationEvent),
}
}
// sendNotificationEvent sends notification events on the data channel
// unless doneCh is not closed
func (l *listenChan) sendNotificationEvent(events []NotificationEvent) {
select {
// Returns immediately if receiver has quit.
case <-l.doneCh:
// Blocks until receiver is available.
case l.dataCh <- events:
}
}
// waitForListener writes event notification OR whitespaces on
// ResponseWriter until client closes connection
func (l *listenChan) waitForListener(w http.ResponseWriter) {
// Logs errors other than EPIPE and ECONNRESET.
// EPIPE and ECONNRESET indicate that the client stopped
// listening to notification events.
logClientError := func(err error, msg string) {
if oe, ok := err.(*net.OpError); ok && (oe.Err == syscall.EPIPE || oe.Err ==
syscall.ECONNRESET) {
errorIf(err, msg)
}
}
emptyEvent := map[string][]NotificationEvent{"Records": nil}
defer close(l.doneCh)
for {
select {
case events := <-arnListenerCh:
case events := <-l.dataCh:
if err := writeNotification(w, map[string][]NotificationEvent{"Records": events}); err != nil {
errorIf(err, "Unable to write notification to client.")
logClientError(err, "Unable to write notification")
return
}
case <-time.After(globalSNSConnAlive): // Wait for global conn active seconds.
if err := writeNotification(w, dummyEvents); err != nil {
// FIXME - do not log for all errors.
errorIf(err, "Unable to write notification to client.")
case <-time.After(globalSNSConnAlive):
if err := writeNotification(w, emptyEvent); err != nil {
logClientError(err, "Unable to write empty notification")
return
}
}
@ -346,12 +376,11 @@ func (api objectAPIHandlers) ListenBucketNotificationHandler(w http.ResponseWrit
},
}
// Setup a listening channel that will receive notifications
// from the RPC handler.
nEventCh := make(chan []NotificationEvent)
defer close(nEventCh)
// Setup a listen channel to receive notifications like
// s3:ObjectCreated, s3:ObjectDeleted etc.
nListenCh := newListenChan()
// Add channel for listener events
if err = globalEventNotifier.AddListenerChan(accountARN, nEventCh); err != nil {
if err = globalEventNotifier.AddListenerChan(accountARN, nListenCh); err != nil {
errorIf(err, "Error adding a listener!")
writeErrorResponse(w, toAPIErrorCode(err), r.URL)
return
@ -361,8 +390,8 @@ func (api objectAPIHandlers) ListenBucketNotificationHandler(w http.ResponseWrit
defer globalEventNotifier.RemoveListenerChan(accountARN)
// Update topic config to bucket config and persist - as soon
// as this call compelets, events may start appearing in
// nEventCh
// as this call completes, events may start appearing in
// nListenCh
lc := listenerConfig{
TopicConfig: *topicCfg,
TargetServer: targetServer,
@ -378,8 +407,16 @@ func (api objectAPIHandlers) ListenBucketNotificationHandler(w http.ResponseWrit
// Add all common headers.
setCommonHeaders(w)
// Start sending bucket notifications.
sendBucketNotification(w, nEventCh)
// https://github.com/containous/traefik/issues/560
// https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events
//
// Proxies might buffer the connection to avoid this we
// need the proper MIME type before writing to client.
// This MIME header tells the proxies to avoid buffering
w.Header().Set("Content-Type", "text/event-stream")
// Start writing bucket notifications to ResponseWriter.
nListenCh.waitForListener(w)
}
// AddBucketListenerConfig - Updates on disk state of listeners, and

@ -21,14 +21,18 @@ import (
"bytes"
"encoding/json"
"encoding/xml"
"errors"
"io"
"io/ioutil"
"net/http"
"net/http/httptest"
"os"
"reflect"
"sync"
"testing"
"time"
"github.com/minio/minio/pkg/auth"
)
@ -113,7 +117,47 @@ func TestWriteNotification(t *testing.T) {
}
}
func TestSendBucketNotification(t *testing.T) {
// testResponseWriter implements `http.ResponseWriter` that buffers
// response body in a `bytes.Buffer` and returns error after `failCount`
// calls to `Write` method
type testResponseWriter struct {
mu sync.Mutex
failCount int
buf *bytes.Buffer
m http.Header
}
func newTestResponseWriter(failAt int) *testResponseWriter {
return &testResponseWriter{
buf: new(bytes.Buffer),
m: make(http.Header),
failCount: failAt,
}
}
func (trw *testResponseWriter) Flush() {
}
func (trw *testResponseWriter) Write(p []byte) (int, error) {
trw.mu.Lock()
defer trw.mu.Unlock()
if trw.failCount == 0 {
return 0, errors.New("Custom error")
}
trw.failCount--
return trw.buf.Write(p)
}
func (trw *testResponseWriter) Header() http.Header {
return trw.m
}
func (trw *testResponseWriter) WriteHeader(i int) {
}
func TestListenChan(t *testing.T) {
// Initialize a new test config.
root, err := newTestConfig(globalMinioDefaultRegion)
if err != nil {
@ -121,18 +165,8 @@ func TestSendBucketNotification(t *testing.T) {
}
defer os.RemoveAll(root)
eventCh := make(chan []NotificationEvent)
// Create a Pipe with FlushWriter on the write-side and bufio.Scanner
// on the reader-side to receive notification over the listen channel in a
// synchronized manner.
pr, pw := io.Pipe()
fw := newFlushWriter(pw)
scanner := bufio.NewScanner(pr)
// Start a go-routine to wait for notification events.
go func(listenerCh <-chan []NotificationEvent) {
sendBucketNotification(fw, listenerCh)
}(eventCh)
// Create a listen channel to manage notifications
nListenCh := newListenChan()
// Construct notification events to be passed on the events channel.
var events []NotificationEvent
@ -142,37 +176,68 @@ func TestSendBucketNotification(t *testing.T) {
ObjectCreatedCopy,
ObjectCreatedCompleteMultipartUpload,
}
for _, evType := range evTypes {
events = append(events, newNotificationEvent(eventData{
Type: evType,
}))
}
// Send notification events to the channel on which sendBucketNotification
// is waiting on.
eventCh <- events
// Read from the pipe connected to the ResponseWriter.
scanner.Scan()
notificationBytes := scanner.Bytes()
// Send notification events one-by-one
go func() {
for _, event := range events {
nListenCh.sendNotificationEvent([]NotificationEvent{event})
}
}()
// Close the read-end and send an empty notification event on the channel
// to signal sendBucketNotification to terminate.
pr.Close()
eventCh <- []NotificationEvent{}
close(eventCh)
// Create a http.ResponseWriter that fails after len(events)
// number of times
trw := newTestResponseWriter(len(events))
// Checking if the notification are the same as those sent over the channel.
var notifications map[string][]NotificationEvent
err = json.Unmarshal(notificationBytes, &notifications)
if err != nil {
t.Fatal("Failed to Unmarshal notification")
}
records := notifications["Records"]
for i, rec := range records {
if rec.EventName == evTypes[i].String() {
continue
// Wait for all (4) notification events to be received
nListenCh.waitForListener(trw)
// Used to read JSON-formatted event stream line-by-line
scanner := bufio.NewScanner(trw.buf)
var records map[string][]NotificationEvent
for i := 0; scanner.Scan(); i++ {
err = json.Unmarshal(scanner.Bytes(), &records)
if err != nil {
t.Fatalf("Failed to unmarshal json %v", err)
}
nEvent := records["Records"][0]
if nEvent.EventName != evTypes[i].String() {
t.Errorf("notification event name mismatch, expected %s but got %s", evTypes[i], nEvent.EventName)
}
t.Errorf("Failed to receive %d event %s", i, evTypes[i].String())
}
}
func TestSendNotificationEvent(t *testing.T) {
// This test verifies that sendNotificationEvent function
// returns once listenChan.doneCh is closed
l := newListenChan()
testCh := make(chan struct{})
timeout := 5 * time.Second
go func() {
// Send one empty notification event on listenChan
events := []NotificationEvent{NotificationEvent{}}
l.sendNotificationEvent(events)
testCh <- struct{}{}
}()
// close l.doneCh to signal client exiting from
// ListenBucketNotification API call
close(l.doneCh)
select {
case <-time.After(timeout):
t.Fatalf("sendNotificationEvent didn't return after %v seconds", timeout)
case <-testCh:
// If we reach this case, sendNotificationEvent
// returned on closing l.doneCh
}
}

@ -60,7 +60,7 @@ type internalNotifier struct {
// Connected listeners is a map of listener ARNs to channels
// on which the ListenBucket API handler go routine is waiting
// for events to send to a client.
connectedListeners map[string]chan []NotificationEvent
connectedListeners map[string]*listenChan
rwMutex *sync.RWMutex
}
@ -206,7 +206,7 @@ func (en eventNotifier) GetInternalTarget(arn string) *listenerLogger {
}
// Set a new sns target for an input sns ARN.
func (en *eventNotifier) AddListenerChan(snsARN string, listenerCh chan []NotificationEvent) error {
func (en *eventNotifier) AddListenerChan(snsARN string, listenerCh *listenChan) error {
if listenerCh == nil {
return errInvalidArgument
}
@ -229,9 +229,9 @@ func (en *eventNotifier) SendListenerEvent(arn string, event []NotificationEvent
en.internal.rwMutex.Lock()
defer en.internal.rwMutex.Unlock()
ch, ok := en.internal.connectedListeners[arn]
listenChan, ok := en.internal.connectedListeners[arn]
if ok {
ch <- event
listenChan.sendNotificationEvent(event)
}
// If the channel is not present we ignore the event.
return nil
@ -833,7 +833,7 @@ func initEventNotifier(objAPI ObjectLayer) error {
rwMutex: &sync.RWMutex{},
targets: listenTargets,
listenerConfigs: lConfigs,
connectedListeners: make(map[string]chan []NotificationEvent),
connectedListeners: make(map[string]*listenChan),
},
}

@ -462,11 +462,9 @@ func TestListenBucketNotification(t *testing.T) {
}
// Create a new notification event channel.
nEventCh := make(chan []NotificationEvent)
// Close the listener channel.
defer close(nEventCh)
nListenCh := newListenChan()
// Add events channel for listener.
if err := globalEventNotifier.AddListenerChan(listenARN, nEventCh); err != nil {
if err := globalEventNotifier.AddListenerChan(listenARN, nListenCh); err != nil {
t.Fatalf("Test Setup error: %v", err)
}
// Remove listen channel after the writer has closed or the
@ -489,7 +487,7 @@ func TestListenBucketNotification(t *testing.T) {
// Wait for the event notification here, if nothing is received within 30 seconds,
// test error will be fired
select {
case n := <-nEventCh:
case n := <-nListenCh.dataCh:
// Check that received event
if len(n) == 0 {
t.Fatal("Unexpected error occurred")
@ -497,9 +495,7 @@ func TestListenBucketNotification(t *testing.T) {
if n[0].S3.Object.Key != objectName {
t.Fatalf("Received wrong object name in notification, expected %s, received %s", n[0].S3.Object.Key, objectName)
}
break
case <-time.After(3 * time.Second):
break
}
}

@ -522,32 +522,6 @@ func (s *TestSuiteCommon) TestListenBucketNotificationHandler(c *check) {
if s.signer == signerV4 {
verifyError(c, response, "XAmzContentSHA256Mismatch", "The provided 'x-amz-content-sha256' header does not match what was computed.", http.StatusBadRequest)
}
// Change global value from 5 second to 100millisecond.
globalSNSConnAlive = 100 * time.Millisecond
req, err = newTestSignedRequest("GET",
getListenBucketNotificationURL(s.endPoint, bucketName,
[]string{}, []string{}, validEvents), 0, nil, s.accessKey, s.secretKey, s.signer)
c.Assert(err, nil)
client = http.Client{Transport: s.transport}
// execute the request.
response, err = client.Do(req)
c.Assert(err, nil)
c.Assert(response.StatusCode, http.StatusOK)
// FIXME: uncomment this in future when we have a code to read notifications from.
// go func() {
// buf := bytes.NewReader(tooByte)
// rreq, rerr := newTestSignedRequest("GET",
// getPutObjectURL(s.endPoint, bucketName, "myobject/1"),
// int64(buf.Len()), buf, s.accessKey, s.secretKey, s.signer)
// c.Assert(rerr, IsNil)
// client = http.Client{Transport: s.transport}
// // execute the request.
// resp, rerr := client.Do(rreq)
// c.Assert(rerr, IsNil)
// c.Assert(resp.StatusCode, http.StatusOK)
// }()
response.Body.Close() // FIXME. Find a way to read from the returned body.
}
// Test deletes multple objects and verifies server resonse.

Loading…
Cancel
Save