From 72490bf8dbe363f076ac97365d1e02d60d6ef5bd Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Mon, 4 Sep 2017 17:45:30 -0700 Subject: [PATCH] Implement proper reConnect logic for amqp notification target. (#4867) Fixes #4597 --- cmd/notifiers.go | 2 +- cmd/notify-amqp.go | 71 ++++++++++++++++++++++++++++------------- cmd/notify-amqp_test.go | 57 +++++++++++++++++++++++++++++++++ 3 files changed, 107 insertions(+), 23 deletions(-) create mode 100644 cmd/notify-amqp_test.go diff --git a/cmd/notifiers.go b/cmd/notifiers.go index db140124e..f0086dd94 100644 --- a/cmd/notifiers.go +++ b/cmd/notifiers.go @@ -78,7 +78,7 @@ func isAMQPQueue(sqsArn arnSQS) bool { errorIf(err, "Unable to connect to amqp service. %#v", amqpL) return false } - defer amqpC.Close() + defer amqpC.conn.Close() return true } diff --git a/cmd/notify-amqp.go b/cmd/notify-amqp.go index d946e4bf8..3334d51f1 100644 --- a/cmd/notify-amqp.go +++ b/cmd/notify-amqp.go @@ -1,5 +1,5 @@ /* - * Minio Cloud Storage, (C) 2016 Minio, Inc. + * Minio Cloud Storage, (C) 2016, 2017 Minio, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,6 +19,7 @@ package cmd import ( "io/ioutil" "net" + "sync" "github.com/Sirupsen/logrus" "github.com/streadway/amqp" @@ -51,23 +52,29 @@ func (a *amqpNotify) Validate() error { return nil } +// amqpConn implements a reconnecting amqp conn extending *amqp.Connection, +// also provides additional protection for such a mutation. type amqpConn struct { + sync.Mutex + conn *amqp.Connection params amqpNotify - *amqp.Connection } // dialAMQP - dials and returns an amqpConnection instance, // for sending notifications. Returns error if amqp logger // is not enabled. -func dialAMQP(amqpL amqpNotify) (ac amqpConn, e error) { +func dialAMQP(amqpL amqpNotify) (*amqpConn, error) { if !amqpL.Enable { - return ac, errNotifyNotEnabled + return nil, errNotifyNotEnabled } conn, err := amqp.Dial(amqpL.URL) if err != nil { - return ac, err + return nil, err } - return amqpConn{Connection: conn, params: amqpL}, nil + return &amqpConn{ + conn: conn, + params: amqpL, + }, nil } func newAMQPNotify(accountID string) (*logrus.Logger, error) { @@ -94,31 +101,51 @@ func newAMQPNotify(accountID string) (*logrus.Logger, error) { return amqpLog, nil } -// Fire is called when an event should be sent to the message broker. -func (q amqpConn) Fire(entry *logrus.Entry) error { - ch, err := q.Connection.Channel() +// Returns true if the error represents a closed +// network error. +func isAMQPClosedNetworkErr(err error) bool { + // Any other error other than connection closed, return. + if neterr, ok := err.(*net.OpError); ok && + neterr.Err.Error() == "use of closed network connection" { + return true + } else if err == amqp.ErrClosed { + return true + } + return false +} + +// Channel is a wrapper implementation of amqp.Connection.Channel() +// which implements transparent reconnection. +func (q *amqpConn) Channel() (*amqp.Channel, error) { + q.Lock() + ch, err := q.conn.Channel() + q.Unlock() if err != nil { - // Any other error other than connection closed, return. - isClosedErr := false - if neterr, ok := err.(*net.OpError); ok && - neterr.Err.Error() == "use of closed network connection" { - isClosedErr = true - } else if err == amqp.ErrClosed { - isClosedErr = true - } - if !isClosedErr { - return err + if !isAMQPClosedNetworkErr(err) { + return nil, err } // Attempt to connect again. var conn *amqp.Connection conn, err = amqp.Dial(q.params.URL) if err != nil { - return err + return nil, err } ch, err = conn.Channel() if err != nil { - return err + return nil, err } + q.Lock() + q.conn = conn + q.Unlock() + } + return ch, nil +} + +// Fire is called when an event should be sent to the message broker. +func (q *amqpConn) Fire(entry *logrus.Entry) error { + ch, err := q.Channel() + if err != nil { + return err } defer ch.Close() @@ -158,7 +185,7 @@ func (q amqpConn) Fire(entry *logrus.Entry) error { } // Levels is available logging levels. -func (q amqpConn) Levels() []logrus.Level { +func (q *amqpConn) Levels() []logrus.Level { return []logrus.Level{ logrus.InfoLevel, } diff --git a/cmd/notify-amqp_test.go b/cmd/notify-amqp_test.go new file mode 100644 index 000000000..d056220a3 --- /dev/null +++ b/cmd/notify-amqp_test.go @@ -0,0 +1,57 @@ +/* + * Minio Cloud Storage, (C) 2017 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cmd + +import ( + "errors" + "net" + "testing" + + "github.com/streadway/amqp" +) + +// Tests for is closed network error. +func TestIsClosedNetworkErr(t *testing.T) { + testCases := []struct { + err error + success bool + }{ + { + err: amqp.ErrClosed, + success: true, + }, + { + err: &net.OpError{Err: errors.New("use of closed network connection")}, + success: true, + }, + { + err: nil, + success: false, + }, + { + err: errors.New("testing error"), + success: false, + }, + } + + for i, testCase := range testCases { + ok := isAMQPClosedNetworkErr(testCase.err) + if ok != testCase.success { + t.Errorf("Test %d: Expected %t, got %t", i+1, testCase.success, ok) + } + } +}