You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
196 lines
5.3 KiB
196 lines
5.3 KiB
/*
|
|
* Minio Cloud Storage, (C) 2016 Minio, Inc.
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*/
|
|
|
|
package cmd
|
|
|
|
import (
	"io/ioutil"
	"net"
	"net/url"

	"github.com/Sirupsen/logrus"
	"github.com/nats-io/go-nats-streaming"
	"github.com/nats-io/nats"
)
|
|
|
|
// natsNotifyStreaming groups the settings that only apply when events are
// delivered through a NATS streaming server instead of a plain NATS server.
type natsNotifyStreaming struct {
	// Enable switches the notifier to the streaming client.
	Enable bool `json:"enable"`
	// ClusterID is the streaming cluster identifier handed to the
	// streaming client when connecting.
	ClusterID string `json:"clusterID"`
	// ClientID identifies this client to the streaming server; when left
	// empty a random UUID is generated at dial time.
	ClientID string `json:"clientID"`
	// Async selects asynchronous publishing of events.
	Async bool `json:"async"`
	// MaxPubAcksInflight is forwarded to the streaming client only when it
	// is greater than zero.
	MaxPubAcksInflight int `json:"maxPubAcksInflight"`
}
|
|
|
|
// natsNotify - represents logrus compatible NATS hook.
|
|
// All fields represent NATS configuration details.
|
|
type natsNotify struct {
|
|
Enable bool `json:"enable"`
|
|
Address string `json:"address"`
|
|
Subject string `json:"subject"`
|
|
Username string `json:"username"`
|
|
Password string `json:"password"`
|
|
Token string `json:"token"`
|
|
Secure bool `json:"secure"`
|
|
PingInterval int64 `json:"pingInterval"`
|
|
Streaming natsNotifyStreaming `json:"streaming"`
|
|
}
|
|
|
|
func (n *natsNotify) Validate() error {
|
|
if !n.Enable {
|
|
return nil
|
|
}
|
|
if _, _, err := net.SplitHostPort(n.Address); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// natsIOConn abstracts connection to any type of NATS server
|
|
type natsIOConn struct {
|
|
params natsNotify
|
|
natsConn *nats.Conn
|
|
stanConn stan.Conn
|
|
}
|
|
|
|
// dialNATS - dials and returns an natsIOConn instance,
|
|
// for sending notifications. Returns error if nats logger
|
|
// is not enabled.
|
|
func dialNATS(natsL natsNotify, testDial bool) (nioc natsIOConn, e error) {
|
|
if !natsL.Enable {
|
|
return nioc, errNotifyNotEnabled
|
|
}
|
|
|
|
// Construct natsIOConn which holds all NATS connection information
|
|
conn := natsIOConn{params: natsL}
|
|
|
|
if natsL.Streaming.Enable {
|
|
// Construct scheme to differentiate between clear and TLS connections
|
|
scheme := "nats"
|
|
if natsL.Secure {
|
|
scheme = "tls"
|
|
}
|
|
// Construct address URL
|
|
addressURL := scheme + "://" + natsL.Username + ":" + natsL.Password + "@" + natsL.Address
|
|
// Fetch the user-supplied client ID and provide a random one if not provided
|
|
clientID := natsL.Streaming.ClientID
|
|
if clientID == "" {
|
|
clientID = mustGetUUID()
|
|
}
|
|
// Add test suffix to clientID to avoid clientID already registered error
|
|
if testDial {
|
|
clientID += "-test"
|
|
}
|
|
connOpts := []stan.Option{
|
|
stan.NatsURL(addressURL),
|
|
}
|
|
// Setup MaxPubAcksInflight parameter
|
|
if natsL.Streaming.MaxPubAcksInflight > 0 {
|
|
connOpts = append(connOpts,
|
|
stan.MaxPubAcksInflight(natsL.Streaming.MaxPubAcksInflight))
|
|
}
|
|
// Do the real connection to the NATS server
|
|
sc, err := stan.Connect(natsL.Streaming.ClusterID, clientID, connOpts...)
|
|
if err != nil {
|
|
return nioc, err
|
|
}
|
|
// Save the created connection
|
|
conn.stanConn = sc
|
|
} else {
|
|
// Configure and connect to NATS server
|
|
natsC := nats.DefaultOptions
|
|
natsC.Url = "nats://" + natsL.Address
|
|
natsC.User = natsL.Username
|
|
natsC.Password = natsL.Password
|
|
natsC.Token = natsL.Token
|
|
natsC.Secure = natsL.Secure
|
|
// Do the real connection
|
|
nc, err := natsC.Connect()
|
|
if err != nil {
|
|
return nioc, err
|
|
}
|
|
// Save the created connection
|
|
conn.natsConn = nc
|
|
}
|
|
return conn, nil
|
|
}
|
|
|
|
// closeNATS - close the underlying NATS connection
|
|
func closeNATS(conn natsIOConn) {
|
|
if conn.params.Streaming.Enable {
|
|
conn.stanConn.Close()
|
|
} else {
|
|
conn.natsConn.Close()
|
|
}
|
|
}
|
|
|
|
func newNATSNotify(accountID string) (*logrus.Logger, error) {
|
|
natsL := serverConfig.Notify.GetNATSByID(accountID)
|
|
|
|
// Connect to nats server.
|
|
natsC, err := dialNATS(natsL, false)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
natsLog := logrus.New()
|
|
|
|
// Disable writing to console.
|
|
natsLog.Out = ioutil.Discard
|
|
|
|
// Add a nats hook.
|
|
natsLog.Hooks.Add(natsC)
|
|
|
|
// Set default JSON formatter.
|
|
natsLog.Formatter = new(logrus.JSONFormatter)
|
|
|
|
// Successfully enabled all NATSs.
|
|
return natsLog, nil
|
|
}
|
|
|
|
// Fire is called when an event should be sent to the message broker
|
|
func (n natsIOConn) Fire(entry *logrus.Entry) error {
|
|
body, err := entry.Reader()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if n.params.Streaming.Enable {
|
|
// Streaming flag is enabled, publish the log synchronously or asynchronously
|
|
// depending on the user supplied parameter
|
|
if n.params.Streaming.Async {
|
|
_, err = n.stanConn.PublishAsync(n.params.Subject, body.Bytes(), nil)
|
|
} else {
|
|
err = n.stanConn.Publish(n.params.Subject, body.Bytes())
|
|
}
|
|
if err != nil {
|
|
return err
|
|
}
|
|
} else {
|
|
// Publish the log
|
|
err = n.natsConn.Publish(n.params.Subject, body.Bytes())
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Levels is available logging levels.
|
|
func (n natsIOConn) Levels() []logrus.Level {
|
|
return []logrus.Level{
|
|
logrus.InfoLevel,
|
|
}
|
|
}
|
|
|