/* * Minio Cloud Storage, (C) 2016 Minio, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package cmd import ( "io/ioutil" "github.com/Sirupsen/logrus" "github.com/nats-io/go-nats-streaming" "github.com/nats-io/nats" ) // natsNotifyStreaming contains specific options related to connection // to a NATS streaming server type natsNotifyStreaming struct { Enable bool `json:"enable"` ClusterID string `json:"clusterID"` ClientID string `json:"clientID"` Async bool `json:"async"` MaxPubAcksInflight int `json:"maxPubAcksInflight"` } // natsNotify - represents logrus compatible NATS hook. // All fields represent NATS configuration details. type natsNotify struct { Enable bool `json:"enable"` Address string `json:"address"` Subject string `json:"subject"` Username string `json:"username"` Password string `json:"password"` Token string `json:"token"` Secure bool `json:"secure"` PingInterval int64 `json:"pingInterval"` Streaming natsNotifyStreaming `json:"streaming"` } func (n *natsNotify) Validate() error { if !n.Enable { return nil } if _, err := checkURL(n.Address); err != nil { return err } return nil } // natsIOConn abstracts connection to any type of NATS server type natsIOConn struct { params natsNotify natsConn *nats.Conn stanConn stan.Conn } // dialNATS - dials and returns an natsIOConn instance, // for sending notifications. Returns error if nats logger // is not enabled. func dialNATS(natsL natsNotify, testDial bool) (natsIOConn, error) { if !natsL.Enable { return natsIOConn{}, errNotifyNotEnabled } // Construct natsIOConn which holds all NATS connection information conn := natsIOConn{params: natsL} if natsL.Streaming.Enable { // Construct scheme to differentiate between clear and TLS connections scheme := "nats" if natsL.Secure { scheme = "tls" } // Construct address URL addressURL := scheme + "://" + natsL.Username + ":" + natsL.Password + "@" + natsL.Address // Fetch the user-supplied client ID and provide a random one if not provided clientID := natsL.Streaming.ClientID if clientID == "" { clientID = mustGetUUID() } // Add test suffix to clientID to avoid clientID already registered error if testDial { clientID += "-test" } connOpts := []stan.Option{ stan.NatsURL(addressURL), } // Setup MaxPubAcksInflight parameter if natsL.Streaming.MaxPubAcksInflight > 0 { connOpts = append(connOpts, stan.MaxPubAcksInflight(natsL.Streaming.MaxPubAcksInflight)) } // Do the real connection to the NATS server sc, err := stan.Connect(natsL.Streaming.ClusterID, clientID, connOpts...) if err != nil { return natsIOConn{}, err } // Save the created connection conn.stanConn = sc } else { // Configure and connect to NATS server natsC := nats.DefaultOptions natsC.Url = "nats://" + natsL.Address natsC.User = natsL.Username natsC.Password = natsL.Password natsC.Token = natsL.Token natsC.Secure = natsL.Secure // Do the real connection nc, err := natsC.Connect() if err != nil { return natsIOConn{}, err } // Save the created connection conn.natsConn = nc } return conn, nil } // closeNATS - close the underlying NATS connection func closeNATS(conn natsIOConn) { if conn.params.Streaming.Enable { conn.stanConn.Close() } else { conn.natsConn.Close() } } func newNATSNotify(accountID string) (*logrus.Logger, error) { natsL := serverConfig.Notify.GetNATSByID(accountID) // Connect to nats server. natsC, err := dialNATS(natsL, false) if err != nil { return nil, err } natsLog := logrus.New() // Disable writing to console. natsLog.Out = ioutil.Discard // Add a nats hook. natsLog.Hooks.Add(natsC) // Set default JSON formatter. natsLog.Formatter = new(logrus.JSONFormatter) // Successfully enabled all NATSs. return natsLog, nil } // Fire is called when an event should be sent to the message broker func (n natsIOConn) Fire(entry *logrus.Entry) error { body, err := entry.Reader() if err != nil { return err } if n.params.Streaming.Enable { // Streaming flag is enabled, publish the log synchronously or asynchronously // depending on the user supplied parameter if n.params.Streaming.Async { _, err = n.stanConn.PublishAsync(n.params.Subject, body.Bytes(), nil) } else { err = n.stanConn.Publish(n.params.Subject, body.Bytes()) } if err != nil { return err } } else { // Publish the log err = n.natsConn.Publish(n.params.Subject, body.Bytes()) if err != nil { return err } } return nil } // Levels is available logging levels. func (n natsIOConn) Levels() []logrus.Level { return []logrus.Level{ logrus.InfoLevel, } }