You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
minio/cmd/notify-nats.go

196 lines
5.3 KiB

/*
* Minio Cloud Storage, (C) 2016 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import (
"io/ioutil"
"github.com/Sirupsen/logrus"
"github.com/nats-io/go-nats-streaming"
"github.com/nats-io/nats"
)
// natsNotifyStreaming contains specific options related to connection
// to a NATS streaming server
type natsNotifyStreaming struct {
Enable bool `json:"enable"`
ClusterID string `json:"clusterID"`
ClientID string `json:"clientID"`
Async bool `json:"async"`
MaxPubAcksInflight int `json:"maxPubAcksInflight"`
}
// natsNotify - represents logrus compatible NATS hook.
// All fields represent NATS configuration details.
type natsNotify struct {
Enable bool `json:"enable"`
Address string `json:"address"`
Subject string `json:"subject"`
Username string `json:"username"`
Password string `json:"password"`
Token string `json:"token"`
Secure bool `json:"secure"`
PingInterval int64 `json:"pingInterval"`
Streaming natsNotifyStreaming `json:"streaming"`
}
func (n *natsNotify) Validate() error {
if !n.Enable {
return nil
}
if _, err := checkURL(n.Address); err != nil {
return err
}
return nil
}
// natsIOConn abstracts connection to any type of NATS server
type natsIOConn struct {
params natsNotify
natsConn *nats.Conn
stanConn stan.Conn
}
// dialNATS - dials and returns an natsIOConn instance,
// for sending notifications. Returns error if nats logger
// is not enabled.
func dialNATS(natsL natsNotify, testDial bool) (natsIOConn, error) {
if !natsL.Enable {
return natsIOConn{}, errNotifyNotEnabled
}
// Construct natsIOConn which holds all NATS connection information
conn := natsIOConn{params: natsL}
if natsL.Streaming.Enable {
// Construct scheme to differentiate between clear and TLS connections
scheme := "nats"
if natsL.Secure {
scheme = "tls"
}
// Construct address URL
addressURL := scheme + "://" + natsL.Username + ":" + natsL.Password + "@" + natsL.Address
// Fetch the user-supplied client ID and provide a random one if not provided
clientID := natsL.Streaming.ClientID
if clientID == "" {
clientID = mustGetUUID()
}
// Add test suffix to clientID to avoid clientID already registered error
if testDial {
clientID += "-test"
}
connOpts := []stan.Option{
stan.NatsURL(addressURL),
}
// Setup MaxPubAcksInflight parameter
if natsL.Streaming.MaxPubAcksInflight > 0 {
connOpts = append(connOpts,
stan.MaxPubAcksInflight(natsL.Streaming.MaxPubAcksInflight))
}
// Do the real connection to the NATS server
sc, err := stan.Connect(natsL.Streaming.ClusterID, clientID, connOpts...)
if err != nil {
return natsIOConn{}, err
}
// Save the created connection
conn.stanConn = sc
} else {
// Configure and connect to NATS server
natsC := nats.DefaultOptions
natsC.Url = "nats://" + natsL.Address
natsC.User = natsL.Username
natsC.Password = natsL.Password
natsC.Token = natsL.Token
natsC.Secure = natsL.Secure
// Do the real connection
nc, err := natsC.Connect()
if err != nil {
return natsIOConn{}, err
}
// Save the created connection
conn.natsConn = nc
}
return conn, nil
}
// closeNATS - close the underlying NATS connection
func closeNATS(conn natsIOConn) {
if conn.params.Streaming.Enable {
conn.stanConn.Close()
} else {
conn.natsConn.Close()
}
}
func newNATSNotify(accountID string) (*logrus.Logger, error) {
natsL := serverConfig.Notify.GetNATSByID(accountID)
// Connect to nats server.
natsC, err := dialNATS(natsL, false)
if err != nil {
return nil, err
}
natsLog := logrus.New()
// Disable writing to console.
natsLog.Out = ioutil.Discard
// Add a nats hook.
natsLog.Hooks.Add(natsC)
// Set default JSON formatter.
natsLog.Formatter = new(logrus.JSONFormatter)
// Successfully enabled all NATSs.
return natsLog, nil
}
// Fire is called when an event should be sent to the message broker
func (n natsIOConn) Fire(entry *logrus.Entry) error {
body, err := entry.Reader()
if err != nil {
return err
}
if n.params.Streaming.Enable {
// Streaming flag is enabled, publish the log synchronously or asynchronously
// depending on the user supplied parameter
if n.params.Streaming.Async {
_, err = n.stanConn.PublishAsync(n.params.Subject, body.Bytes(), nil)
} else {
err = n.stanConn.Publish(n.params.Subject, body.Bytes())
}
if err != nil {
return err
}
} else {
// Publish the log
err = n.natsConn.Publish(n.params.Subject, body.Bytes())
if err != nil {
return err
}
}
return nil
}
// Levels is available logging levels.
func (n natsIOConn) Levels() []logrus.Level {
return []logrus.Level{
logrus.InfoLevel,
}
}