New Admin Info (#8497)

Branch: master
Author: Ashish Kumar Sinha, committed by kannappanr (5 years ago)
Parent: 8b803491af
Commit: 24fb1bf258
Changed files (lines changed):
1. cmd/admin-handlers.go (334)
2. cmd/admin-server-info.go (75)
3. cmd/crypto/kms.go (19)
4. cmd/crypto/vault.go (9)
5. cmd/notification.go (70)
6. cmd/peer-rest-client-target.go (5)
7. cmd/peer-rest-client.go (2)
8. cmd/peer-rest-server.go (32)
9. pkg/event/target/amqp.go (12)
10. pkg/event/target/elasticsearch.go (28)
11. pkg/event/target/httpclient.go (5)
12. pkg/event/target/kafka.go (19)
13. pkg/event/target/mqtt.go (18)
14. pkg/event/target/mysql.go (23)
15. pkg/event/target/nats.go (23)
16. pkg/event/target/nsq.go (27)
17. pkg/event/target/postgresql.go (25)
18. pkg/event/target/redis.go (23)
19. pkg/event/target/webhook.go (38)
20. pkg/event/targetlist.go (8)
21. pkg/event/targetlist_test.go (4)
22. pkg/madmin/info-commands.go (238)

cmd/admin-handlers.go
@ -20,6 +20,7 @@ import (
"context"
"crypto/subtle"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
@ -36,11 +37,13 @@ import (
"github.com/gorilla/mux"
"github.com/minio/minio/cmd/config"
"github.com/minio/minio/cmd/config/notify"
"github.com/minio/minio/cmd/crypto"
xhttp "github.com/minio/minio/cmd/http"
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/auth"
"github.com/minio/minio/pkg/cpu"
"github.com/minio/minio/pkg/event/target"
"github.com/minio/minio/pkg/handlers"
iampolicy "github.com/minio/minio/pkg/iam/policy"
"github.com/minio/minio/pkg/madmin"
@ -262,48 +265,7 @@ type ServerInfo struct {
Data *ServerInfoData `json:"data"`
}
// ServerInfoHandler - GET /minio/admin/v2/info
// ----------
// Get server information
func (a adminAPIHandlers) ServerInfoHandler(w http.ResponseWriter, r *http.Request) {
ctx := newContext(r, w, "ServerInfo")
objectAPI, _ := validateAdminReq(ctx, w, r, iampolicy.ListServerInfoAdminAction)
if objectAPI == nil {
return
}
serverInfo := globalNotificationSys.ServerInfo(ctx)
// Once we have received all the ServerInfo from peers
// add the local peer server info as well.
serverInfo = append(serverInfo, ServerInfo{
Addr: getHostName(r),
Data: &ServerInfoData{
ConnStats: globalConnStats.toServerConnStats(),
HTTPStats: globalHTTPStats.toServerHTTPStats(),
Properties: ServerProperties{
Uptime: UTCNow().Sub(globalBootTime),
Version: Version,
CommitID: CommitID,
DeploymentID: globalDeploymentID,
SQSARN: globalNotificationSys.GetARNList(),
Region: globalServerRegion,
},
},
})
// Marshal API response
jsonBytes, err := json.Marshal(serverInfo)
if err != nil {
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
return
}
// Reply with storage information (across nodes in a
// distributed setup) as json.
writeSuccessResponseJSON(w, jsonBytes)
}
// ServerInfoHandler - GET /minio/admin/v2/storageinfo
// StorageInfoHandler - GET /minio/admin/v2/storageinfo
// ----------
// Get server information
func (a adminAPIHandlers) StorageInfoHandler(w http.ResponseWriter, r *http.Request) {
@ -1303,3 +1265,291 @@ func (a adminAPIHandlers) ServerHardwareInfoHandler(w http.ResponseWriter, r *ht
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrBadRequest), r.URL)
}
}
// ServerInfoHandler - GET /minio/admin/v2/info
// ----------
// Get server information
func (a adminAPIHandlers) ServerInfoHandler(w http.ResponseWriter, r *http.Request) {
ctx := newContext(r, w, "ServerInfo")
objectAPI, _ := validateAdminReq(ctx, w, r, "")
if objectAPI == nil {
return
}
cfg, err := readServerConfig(ctx, objectAPI)
if err != nil {
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
return
}
infoMsg := madmin.InfoMessage{}
vault := madmin.Vault{}
if GlobalKMS != nil {
vault = fetchVaultStatus(cfg)
}
ldap := madmin.LDAP{}
if globalLDAPConfig.Enabled {
ldapConn, err := globalLDAPConfig.Connect()
if err != nil {
ldap.Status = "offline"
} else if ldapConn == nil {
ldap.Status = "Not Configured"
} else {
ldap.Status = "online"
// Close ldap connection to avoid leaks.
defer ldapConn.Close()
}
}
log, audit := fetchLoggerInfo(cfg)
// Get the notification target info
notifyTarget := fetchLambdaInfo(cfg)
// Fetch the storage information
storageInfo := objectAPI.StorageInfo(ctx)
var OnDisks int
var OffDisks int
var backend interface{}
if storageInfo.Backend.Type == BackendType(madmin.Erasure) {
for _, v := range storageInfo.Backend.OnlineDisks {
OnDisks += v
}
for _, v := range storageInfo.Backend.OfflineDisks {
OffDisks += v
}
backend = madmin.XlBackend{
Type: madmin.ErasureType,
OnlineDisks: OnDisks,
OfflineDisks: OffDisks,
StandardSCData: storageInfo.Backend.StandardSCData,
StandardSCParity: storageInfo.Backend.StandardSCParity,
RRSCData: storageInfo.Backend.RRSCData,
RRSCParity: storageInfo.Backend.RRSCParity,
}
} else {
backend = madmin.FsBackend{
Type: madmin.FsType,
}
}
mode := ""
if globalSafeMode {
mode = "safe"
} else {
mode = "online"
}
server := getLocalServerProperty(globalEndpoints, r)
servers := globalNotificationSys.ServerInfo()
servers = append(servers, server)
for _, sp := range servers {
for i, di := range sp.Disks {
path := ""
if globalIsXL {
path = di.DrivePath
}
if globalIsDistXL {
path = sp.Endpoint + di.DrivePath
}
// Match each disk against the backend set endpoints to fill in its state and UUID.
for a := range storageInfo.Backend.Sets {
for b := range storageInfo.Backend.Sets[a] {
ep := storageInfo.Backend.Sets[a][b].Endpoint
if globalIsDistXL {
if strings.Replace(ep, "http://", "", -1) == path || strings.Replace(ep, "https://", "", -1) == path {
sp.Disks[i].State = storageInfo.Backend.Sets[a][b].State
sp.Disks[i].UUID = storageInfo.Backend.Sets[a][b].UUID
}
}
if globalIsXL {
if ep == path {
sp.Disks[i].State = storageInfo.Backend.Sets[a][b].State
sp.Disks[i].UUID = storageInfo.Backend.Sets[a][b].UUID
}
}
}
}
}
}
domain := globalDomainNames
// Hardcoded placeholder values for buckets, objects and usage.
buckets := madmin.Buckets{Count: 20}
objects := madmin.Objects{Count: 200}
usage := madmin.Usage{Size: 1024}
services := madmin.Services{
Vault: vault,
LDAP: ldap,
Logger: log,
Audit: audit,
Notifications: notifyTarget,
}
infoMsg = madmin.InfoMessage{
Mode: mode,
Domain: domain,
Region: globalServerRegion,
SQSARN: globalNotificationSys.GetARNList(),
DeploymentID: globalDeploymentID,
Buckets: buckets,
Objects: objects,
Usage: usage,
Services: services,
Backend: backend,
Servers: servers,
}
// Marshal API response
jsonBytes, err := json.Marshal(infoMsg)
if err != nil {
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
return
}
// Reply with server information (across nodes in a
// distributed setup) as JSON.
writeSuccessResponseJSON(w, jsonBytes)
}
func fetchLambdaInfo(cfg config.Config) []map[string][]madmin.TargetIDStatus {
lambdaMap := make(map[string][]madmin.TargetIDStatus)
targetList, _ := notify.GetNotificationTargets(cfg, GlobalServiceDoneCh, globalRootCAs)
for targetID, target := range targetList.TargetMap() {
targetIDStatus := make(map[string]madmin.Status)
active, _ := target.IsActive()
if active {
targetIDStatus[targetID.ID] = madmin.Status{Status: "Online"}
} else {
targetIDStatus[targetID.ID] = madmin.Status{Status: "Offline"}
}
list := lambdaMap[targetID.Name]
list = append(list, targetIDStatus)
lambdaMap[targetID.Name] = list
}
notify := make([]map[string][]madmin.TargetIDStatus, len(lambdaMap))
counter := 0
for key, value := range lambdaMap {
v := make(map[string][]madmin.TargetIDStatus)
v[key] = value
notify[counter] = v
counter++
}
return notify
}
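The result type here, []map[string][]madmin.TargetIDStatus, is deeply nested; as a reading aid, here is a minimal hand-built sketch of the shape fetchLambdaInfo returns (target names and IDs below are illustrative, not taken from this change):

package main

import (
	"fmt"

	"github.com/minio/minio/pkg/madmin"
)

func main() {
	// Illustrative values only: one webhook target and one amqp target,
	// keyed first by target name, then by target ID.
	notifications := []map[string][]madmin.TargetIDStatus{
		{"webhook": {{"1": madmin.Status{Status: "Online"}}}},
		{"amqp": {{"1": madmin.Status{Status: "Offline"}}}},
	}
	fmt.Println(notifications)
}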
// fetchVaultStatus fetches Vault Info
func fetchVaultStatus(cfg config.Config) madmin.Vault {
vault := madmin.Vault{}
if GlobalKMS == nil {
vault.Status = "disabled"
return vault
}
keyID := GlobalKMS.KeyID()
kmsInfo := GlobalKMS.Info()
if kmsInfo.Endpoint == "" {
vault.Status = "KMS configured using master key"
return vault
}
if err := checkConnection(kmsInfo.Endpoint); err != nil {
vault.Status = "offline"
} else {
vault.Status = "online"
kmsContext := crypto.Context{"MinIO admin API": "KMSKeyStatusHandler"} // Context for a test key operation
// 1. Generate a new key using the KMS.
key, sealedKey, err := GlobalKMS.GenerateKey(keyID, kmsContext)
if err != nil {
vault.Encrypt = "Encryption failed"
} else {
vault.Encrypt = "Ok"
}
// 2. Check whether we can update / re-wrap the sealed key.
sealedKey, err = GlobalKMS.UpdateKey(keyID, sealedKey, kmsContext)
if err != nil {
vault.Update = "Re-wrap failed"
} else {
vault.Update = "Ok"
}
// 3. Verify that we can indeed decrypt the (encrypted) key
decryptedKey, decryptErr := GlobalKMS.UnsealKey(keyID, sealedKey, kmsContext)
// 4. Compare generated key with decrypted key
if subtle.ConstantTimeCompare(key[:], decryptedKey[:]) != 1 || decryptErr != nil {
vault.Decrypt = "Decryption failed"
} else {
vault.Decrypt = "Ok"
}
}
return vault
}
// fetchLoggerInfo - returns logger and audit target status
func fetchLoggerInfo(cfg config.Config) ([]madmin.Logger, []madmin.Audit) {
loggerCfg, _ := logger.LookupConfig(cfg)
var logger []madmin.Logger
var auditlogger []madmin.Audit
for log, l := range loggerCfg.HTTP {
if l.Enabled {
err := checkConnection(l.Endpoint)
if err == nil {
mapLog := make(map[string]madmin.Status)
mapLog[log] = madmin.Status{Status: "Online"}
logger = append(logger, mapLog)
} else {
mapLog := make(map[string]madmin.Status)
mapLog[log] = madmin.Status{Status: "Offline"}
logger = append(logger, mapLog)
}
}
}
for audit, l := range loggerCfg.Audit {
if l.Enabled {
err := checkConnection(l.Endpoint)
if err == nil {
mapAudit := make(map[string]madmin.Status)
mapAudit[audit] = madmin.Status{Status: "Online"}
auditlogger = append(auditlogger, mapAudit)
} else {
mapAudit := make(map[string]madmin.Status)
mapAudit[audit] = madmin.Status{Status: "Offline"}
auditlogger = append(auditlogger, mapAudit)
}
}
}
return logger, auditlogger
}
// checkConnection - pings an endpoint and returns an error if it is unreachable.
func checkConnection(endpointStr string) error {
u, pErr := xnet.ParseURL(endpointStr)
if pErr != nil {
return pErr
}
if dErr := u.DialHTTP(); dErr != nil {
if urlErr, ok := dErr.(*url.Error); ok {
// Treat "connection refused" errors as an unreachable endpoint.
if target.IsConnRefusedErr(urlErr.Err) {
return errors.New("endpoint unreachable, please check your endpoint")
}
}
return dErr
}
return nil
}

cmd/admin-server-info.go
@ -20,6 +20,7 @@ import (
"net"
"net/http"
"os"
"strings"
"github.com/minio/minio-go/pkg/set"
"github.com/minio/minio/pkg/cpu"
@ -115,7 +116,6 @@ func getLocalDrivesPerf(endpointZones EndpointZones, size int64, r *http.Request
return madmin.ServerDrivesPerfInfo{
Addr: addr,
Perf: dps,
Size: size,
}
}
@ -186,3 +186,76 @@ func getLocalNetworkInfo(endpointZones EndpointZones, r *http.Request) madmin.Se
NetworkInfo: networkHardwares,
}
}
// getLocalServerProperty - returns madmin.ServerProperties for only the
// local endpoints from given list of endpoints
func getLocalServerProperty(endpointZones EndpointZones, r *http.Request) madmin.ServerProperties {
var di madmin.Disk
var disks []madmin.Disk
addr := r.Host
if globalIsDistXL {
addr = GetLocalPeer(endpointZones)
}
network := make(map[string]string)
hosts := set.NewStringSet()
for _, ep := range endpointZones {
for _, endpoint := range ep.Endpoints {
url := strings.Replace(endpoint.URL.String(), endpoint.Path, "", -1)
if url == "" {
url = r.Host
}
hosts.Add(url)
// Only proceed for local endpoints
if endpoint.IsLocal {
url = fetchAddress(url)
network[url] = "online"
if _, err := os.Stat(endpoint.Path); err != nil {
continue
}
diInfo, _ := disk.GetInfo(endpoint.Path)
di.State = "ok"
di.DrivePath = endpoint.Path
di.TotalSpace = diInfo.Total
di.UsedSpace = diInfo.Total - diInfo.Free
// Compute utilization in floating point to avoid integer truncation.
di.Utilization = float64(diInfo.Total-diInfo.Free) / float64(diInfo.Total) * 100
disks = append(disks, di)
}
}
}
for host := range hosts {
_, present := network[host]
if !present {
err := checkConnection(host)
host = fetchAddress(host)
if err != nil {
network[host] = "offline"
} else {
network[host] = "online"
}
}
}
return madmin.ServerProperties{
State: "ok",
Endpoint: addr,
Uptime: UTCNow().Sub(globalBootTime),
Version: Version,
CommitID: CommitID,
Network: network,
Disks: disks,
}
}
// fetchAddress - strips the http:// or https:// scheme prefix from an address.
func fetchAddress(address string) string {
if strings.Contains(address, "http://") {
address = strings.Replace(address, "http://", "", -1)
} else if strings.Contains(address, "https://") {
address = strings.Replace(address, "https://", "", -1)
}
return address
}

cmd/crypto/kms.go
@ -102,6 +102,9 @@ type KMS interface {
// keys this method may behave like a NOP and just return the sealedKey
// itself.
UpdateKey(keyID string, sealedKey []byte, context Context) (rotatedKey []byte, err error)
// Info - returns details about the configured KMS.
Info() (kmsInfo KMSInfo)
}
type masterKeyKMS struct {
@ -109,6 +112,13 @@ type masterKeyKMS struct {
masterKey [32]byte
}
// KMSInfo stores the details of KMS
type KMSInfo struct {
Endpoint string
Name string
AuthType string
}
// NewMasterKey returns a basic KMS implementation from a single 256 bit master key.
//
// The KMS accepts any keyID but binds the keyID and context cryptographically
@ -135,6 +145,15 @@ func (kms *masterKeyKMS) GenerateKey(keyID string, ctx Context) (key [32]byte, s
return key, sealedKey, nil
}
// Info - a master key KMS is configured directly, so endpoint and name are empty.
func (kms *masterKeyKMS) Info() (info KMSInfo) {
return KMSInfo{
Endpoint: "",
Name: "",
AuthType: "master-key",
}
}
func (kms *masterKeyKMS) UnsealKey(keyID string, sealedKey []byte, ctx Context) (key [32]byte, err error) {
var (
buffer bytes.Buffer

cmd/crypto/vault.go
@ -195,6 +195,15 @@ func (v *vaultService) KeyID() string {
return v.config.Key.Name
}
// Info - returns Vault KMS configuration details.
func (v *vaultService) Info() (kmsInfo KMSInfo) {
return KMSInfo{
Endpoint: v.config.Endpoint,
Name: v.config.Key.Name,
AuthType: v.config.Auth.Type,
}
}
// GenerateKey returns a new plaintext key, generated by the KMS,
// and a sealed version of this plaintext key encrypted using the
// named key referenced by keyID. It also binds the generated key

cmd/notification.go
@ -423,51 +423,6 @@ func (sys *NotificationSys) SignalService(sig serviceSignal) []NotificationPeerE
return ng.Wait()
}
// ServerInfo - calls ServerInfo RPC call on all peers.
func (sys *NotificationSys) ServerInfo(ctx context.Context) []ServerInfo {
serverInfo := make([]ServerInfo, len(sys.peerClients))
g := errgroup.WithNErrs(len(sys.peerClients))
for index, client := range sys.peerClients {
if client == nil {
continue
}
index := index
g.Go(func() error {
// Try to fetch serverInfo remotely in three attempts.
for i := 0; i < 3; i++ {
serverInfo[index] = ServerInfo{
Addr: sys.peerClients[index].host.String(),
}
info, err := sys.peerClients[index].ServerInfo()
if err != nil {
serverInfo[index].Error = err.Error()
} else {
serverInfo[index].Data = &info
}
// Last iteration log the error.
if i == 2 {
return err
}
// Wait for one second and no need wait after last attempt.
if i < 2 {
time.Sleep(1 * time.Second)
}
}
return nil
}, index)
}
for index, err := range g.Wait() {
if err != nil {
addr := sys.peerClients[index].host.String()
reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", addr)
ctx := logger.SetReqInfo(ctx, reqInfo)
logger.LogIf(ctx, err)
}
}
return serverInfo
}
// GetLocks - makes GetLocks RPC call on all peers.
func (sys *NotificationSys) GetLocks(ctx context.Context) []*PeerLocks {
locksResp := make([]*PeerLocks, len(sys.peerClients))
@ -1213,6 +1168,31 @@ func (sys *NotificationSys) NetworkInfo() []madmin.ServerNetworkHardwareInfo {
return reply
}
// ServerInfo - makes a ServerInfo RPC call to all peers.
func (sys *NotificationSys) ServerInfo() []madmin.ServerProperties {
reply := make([]madmin.ServerProperties, len(sys.peerClients))
var wg sync.WaitGroup
for i, client := range sys.peerClients {
if client == nil {
continue
}
wg.Add(1)
go func(client *peerRESTClient, idx int) {
defer wg.Done()
info, err := client.ServerInfo()
if err != nil {
info.Endpoint = client.host.String()
info.State = "offline"
} else {
info.State = "ok"
}
reply[idx] = info
}(client, i)
}
wg.Wait()
return reply
}
// NewNotificationSys - creates new notification system object.
func NewNotificationSys(endpoints EndpointZones) *NotificationSys {
// bucketRulesMap/bucketRemoteTargetRulesMap are initialized by NotificationSys.Init()

cmd/peer-rest-client-target.go
@ -31,6 +31,11 @@ func (target *PeerRESTClientTarget) ID() event.TargetID {
return target.id
}
// IsActive - always returns true; provided for interface compatibility.
func (target *PeerRESTClientTarget) IsActive() (bool, error) {
return true, nil
}
// Save - Sends event directly without persisting.
func (target *PeerRESTClientTarget) Save(eventData event.Event) error {
return target.send(eventData)

cmd/peer-rest-client.go
@ -145,7 +145,7 @@ func (client *peerRESTClient) GetLocks() (locks GetLocksResp, err error) {
}
// ServerInfo - fetch server information for a remote node.
func (client *peerRESTClient) ServerInfo() (info ServerInfoData, err error) {
func (client *peerRESTClient) ServerInfo() (info madmin.ServerProperties, err error) {
respBody, err := client.call(peerRESTMethodServerInfo, nil, nil, -1)
if err != nil {
return

cmd/peer-rest-server.go
@ -374,24 +374,6 @@ func (s *peerRESTServer) StartProfilingHandler(w http.ResponseWriter, r *http.Re
w.(http.Flusher).Flush()
}
// ServerInfoHandler - returns server info.
func (s *peerRESTServer) ServerInfoHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
s.writeErrorResponse(w, errors.New("Invalid request"))
return
}
ctx := newContext(r, w, "ServerInfo")
info, err := getServerInfo()
if err != nil {
s.writeErrorResponse(w, err)
return
}
defer w.(http.Flusher).Flush()
logger.LogIf(ctx, gob.NewEncoder(w).Encode(info))
}
// DownloadProflingDataHandler - returns profiled data.
func (s *peerRESTServer) DownloadProflingDataHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
@ -452,6 +434,20 @@ func (s *peerRESTServer) NetworkInfoHandler(w http.ResponseWriter, r *http.Reque
logger.LogIf(ctx, gob.NewEncoder(w).Encode(info))
}
// ServerInfoHandler - returns Server Info
func (s *peerRESTServer) ServerInfoHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
s.writeErrorResponse(w, errors.New("Invalid request"))
return
}
ctx := newContext(r, w, "ServerInfo")
info := getLocalServerProperty(globalEndpoints, r)
defer w.(http.Flusher).Flush()
logger.LogIf(ctx, gob.NewEncoder(w).Encode(info))
}
// DrivePerfInfoHandler - returns Drive Performance info.
func (s *peerRESTServer) DrivePerfInfoHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {

pkg/event/target/amqp.go
@ -121,6 +121,18 @@ func (target *AMQPTarget) ID() event.TargetID {
return target.id
}
// IsActive - Return true if target is up and active
func (target *AMQPTarget) IsActive() (bool, error) {
ch, err := target.channel()
if err != nil {
return false, err
}
defer func() {
ch.Close()
}()
return true, nil
}
func (target *AMQPTarget) channel() (*amqp.Channel, error) {
var err error
var conn *amqp.Connection

pkg/event/target/elasticsearch.go
@ -93,16 +93,25 @@ func (target *ElasticsearchTarget) ID() event.TargetID {
return target.id
}
// IsActive - Return true if target is up and active
func (target *ElasticsearchTarget) IsActive() (bool, error) {
if dErr := target.args.URL.DialHTTP(); dErr != nil {
if xnet.IsNetworkOrHostDown(dErr) {
return false, errNotConnected
}
return false, dErr
}
return true, nil
}
// Save - saves the events to the store if queuestore is configured, which will be replayed when the elasticsearch connection is active.
func (target *ElasticsearchTarget) Save(eventData event.Event) error {
if target.store != nil {
return target.store.Put(eventData)
}
if dErr := target.args.URL.DialHTTP(); dErr != nil {
if xnet.IsNetworkOrHostDown(dErr) {
return errNotConnected
}
return dErr
_, err := target.IsActive()
if err != nil {
return err
}
return target.send(eventData)
}
@ -167,12 +176,9 @@ func (target *ElasticsearchTarget) Send(eventKey string) error {
return err
}
}
if dErr := target.args.URL.DialHTTP(); dErr != nil {
if xnet.IsNetworkOrHostDown(dErr) {
return errNotConnected
}
return dErr
_, err = target.IsActive()
if err != nil {
return err
}
eventData, eErr := target.store.Get(eventKey)

pkg/event/target/httpclient.go
@ -44,6 +44,11 @@ func (target HTTPClientTarget) ID() event.TargetID {
return target.id
}
// IsActive - always returns true; provided for interface compatibility.
func (target *HTTPClientTarget) IsActive() (bool, error) {
return true, nil
}
func (target *HTTPClientTarget) start() {
go func() {
defer func() {

pkg/event/target/kafka.go
@ -124,13 +124,22 @@ func (target *KafkaTarget) ID() event.TargetID {
return target.id
}
// IsActive - Return true if target is up and active
func (target *KafkaTarget) IsActive() (bool, error) {
if !target.args.pingBrokers() {
return false, errNotConnected
}
return true, nil
}
// Save - saves the events to the store which will be replayed when the Kafka connection is active.
func (target *KafkaTarget) Save(eventData event.Event) error {
if target.store != nil {
return target.store.Put(eventData)
}
if !target.args.pingBrokers() {
return errNotConnected
_, err := target.IsActive()
if err != nil {
return err
}
return target.send(eventData)
}
@ -162,9 +171,9 @@ func (target *KafkaTarget) send(eventData event.Event) error {
// Send - reads an event from store and sends it to Kafka.
func (target *KafkaTarget) Send(eventKey string) error {
var err error
if !target.args.pingBrokers() {
return errNotConnected
_, err = target.IsActive()
if err != nil {
return err
}
if target.producer == nil {

pkg/event/target/mqtt.go
@ -121,6 +121,14 @@ func (target *MQTTTarget) ID() event.TargetID {
return target.id
}
// IsActive - Return true if target is up and active
func (target *MQTTTarget) IsActive() (bool, error) {
if !target.client.IsConnectionOpen() {
return false, errNotConnected
}
return true, nil
}
// send - sends an event to the mqtt.
func (target *MQTTTarget) send(eventData event.Event) error {
objectName, err := url.QueryUnescape(eventData.S3.Object.Key)
@ -144,8 +152,9 @@ func (target *MQTTTarget) send(eventData event.Event) error {
// Send - reads an event from store and sends it to MQTT.
func (target *MQTTTarget) Send(eventKey string) error {
// Do not send if the connection is not active.
if !target.client.IsConnectionOpen() {
return errNotConnected
_, err := target.IsActive()
if err != nil {
return err
}
eventData, err := target.store.Get(eventKey)
@ -174,8 +183,9 @@ func (target *MQTTTarget) Save(eventData event.Event) error {
}
// Do not send if the connection is not active.
if !target.client.IsConnectionOpen() {
return errNotConnected
_, err := target.IsActive()
if err != nil {
return err
}
return target.send(eventData)

pkg/event/target/mysql.go
@ -185,15 +185,24 @@ func (target *MySQLTarget) ID() event.TargetID {
return target.id
}
// IsActive - Return true if target is up and active
func (target *MySQLTarget) IsActive() (bool, error) {
if err := target.db.Ping(); err != nil {
if IsConnErr(err) {
return false, errNotConnected
}
return false, err
}
return true, nil
}
// Save - saves the events to the store which will be replayed when the SQL connection is active.
func (target *MySQLTarget) Save(eventData event.Event) error {
if target.store != nil {
return target.store.Put(eventData)
}
if err := target.db.Ping(); err != nil {
if IsConnErr(err) {
return errNotConnected
}
_, err := target.IsActive()
if err != nil {
return err
}
return target.send(eventData)
@ -244,10 +253,8 @@ func (target *MySQLTarget) send(eventData event.Event) error {
// Send - reads an event from store and sends it to MySQL.
func (target *MySQLTarget) Send(eventKey string) error {
if err := target.db.Ping(); err != nil {
if IsConnErr(err) {
return errNotConnected
}
_, err := target.IsActive()
if err != nil {
return err
}

pkg/event/target/nats.go
@ -197,20 +197,29 @@ func (target *NATSTarget) ID() event.TargetID {
return target.id
}
// Save - saves the events to the store which will be replayed when the Nats connection is active.
func (target *NATSTarget) Save(eventData event.Event) error {
if target.store != nil {
return target.store.Put(eventData)
}
// IsActive - Return true if target is up and active
func (target *NATSTarget) IsActive() (bool, error) {
if target.args.Streaming.Enable {
if !target.stanConn.NatsConn().IsConnected() {
return errNotConnected
return false, errNotConnected
}
} else {
if !target.natsConn.IsConnected() {
return errNotConnected
return false, errNotConnected
}
}
return true, nil
}
// Save - saves the events to the store which will be replayed when the Nats connection is active.
func (target *NATSTarget) Save(eventData event.Event) error {
if target.store != nil {
return target.store.Put(eventData)
}
_, err := target.IsActive()
if err != nil {
return err
}
return target.send(eventData)
}

pkg/event/target/nsq.go
@ -100,16 +100,25 @@ func (target *NSQTarget) ID() event.TargetID {
return target.id
}
// IsActive - Return true if target is up and active
func (target *NSQTarget) IsActive() (bool, error) {
if err := target.producer.Ping(); err != nil {
// To treat "connection refused" errors as errNotConnected.
if IsConnRefusedErr(err) {
return false, errNotConnected
}
return false, err
}
return true, nil
}
// Save - saves the events to the store which will be replayed when the nsq connection is active.
func (target *NSQTarget) Save(eventData event.Event) error {
if target.store != nil {
return target.store.Put(eventData)
}
if err := target.producer.Ping(); err != nil {
// To treat "connection refused" errors as errNotConnected.
if IsConnRefusedErr(err) {
return errNotConnected
}
_, err := target.IsActive()
if err != nil {
return err
}
return target.send(eventData)
@ -133,12 +142,8 @@ func (target *NSQTarget) send(eventData event.Event) error {
// Send - reads an event from store and sends it to NSQ.
func (target *NSQTarget) Send(eventKey string) error {
if err := target.producer.Ping(); err != nil {
// To treat "connection refused" errors as errNotConnected.
if IsConnRefusedErr(err) {
return errNotConnected
}
_, err := target.IsActive()
if err != nil {
return err
}

pkg/event/target/postgresql.go
@ -183,15 +183,24 @@ func (target *PostgreSQLTarget) ID() event.TargetID {
return target.id
}
// IsActive - Return true if target is up and active
func (target *PostgreSQLTarget) IsActive() (bool, error) {
if err := target.db.Ping(); err != nil {
if IsConnErr(err) {
return false, errNotConnected
}
return false, err
}
return true, nil
}
// Save - saves the events to the store if queuestore is configured, which will be replayed when the PostgreSQL connection is active.
func (target *PostgreSQLTarget) Save(eventData event.Event) error {
if target.store != nil {
return target.store.Put(eventData)
}
if err := target.db.Ping(); err != nil {
if IsConnErr(err) {
return errNotConnected
}
_, err := target.IsActive()
if err != nil {
return err
}
return target.send(eventData)
@ -245,14 +254,10 @@ func (target *PostgreSQLTarget) send(eventData event.Event) error {
// Send - reads an event from store and sends it to PostgreSQL.
func (target *PostgreSQLTarget) Send(eventKey string) error {
if err := target.db.Ping(); err != nil {
if IsConnErr(err) {
return errNotConnected
}
_, err := target.IsActive()
if err != nil {
return err
}
if !target.firstPing {
if err := target.executeStmts(); err != nil {
if IsConnErr(err) {

pkg/event/target/redis.go
@ -125,11 +125,8 @@ func (target *RedisTarget) ID() event.TargetID {
return target.id
}
// Save - saves the events to the store if queuestore is configured, which will be replayed when the redis connection is active.
func (target *RedisTarget) Save(eventData event.Event) error {
if target.store != nil {
return target.store.Put(eventData)
}
// IsActive - Return true if target is up and active
func (target *RedisTarget) IsActive() (bool, error) {
conn := target.pool.Get()
defer func() {
cErr := conn.Close()
@ -138,9 +135,21 @@ func (target *RedisTarget) Save(eventData event.Event) error {
_, pingErr := conn.Do("PING")
if pingErr != nil {
if IsConnRefusedErr(pingErr) {
return errNotConnected
return false, errNotConnected
}
return pingErr
return false, pingErr
}
return true, nil
}
// Save - saves the events to the store if queuestore is configured, which will be replayed when the redis connection is active.
func (target *RedisTarget) Save(eventData event.Event) error {
if target.store != nil {
return target.store.Put(eventData)
}
_, err := target.IsActive()
if err != nil {
return err
}
return target.send(eventData)
}

pkg/event/target/webhook.go
@ -93,20 +93,29 @@ func (target WebhookTarget) ID() event.TargetID {
return target.id
}
// Save - saves the events to the store if queuestore is configured, which will be replayed when the webhook connection is active.
func (target *WebhookTarget) Save(eventData event.Event) error {
if target.store != nil {
return target.store.Put(eventData)
}
// IsActive - Return true if target is up and active
func (target *WebhookTarget) IsActive() (bool, error) {
u, pErr := xnet.ParseHTTPURL(target.args.Endpoint.String())
if pErr != nil {
return pErr
return false, pErr
}
if dErr := u.DialHTTP(); dErr != nil {
if xnet.IsNetworkOrHostDown(dErr) {
return errNotConnected
return false, errNotConnected
}
return dErr
return false, dErr
}
return true, nil
}
// Save - saves the events to the store if queuestore is configured, which will be replayed when the webhook connection is active.
func (target *WebhookTarget) Save(eventData event.Event) error {
if target.store != nil {
return target.store.Put(eventData)
}
_, err := target.IsActive()
if err != nil {
return err
}
return target.send(eventData)
}
@ -153,17 +162,10 @@ func (target *WebhookTarget) send(eventData event.Event) error {
// Send - reads an event from store and sends it to webhook.
func (target *WebhookTarget) Send(eventKey string) error {
u, pErr := xnet.ParseHTTPURL(target.args.Endpoint.String())
if pErr != nil {
return pErr
}
if dErr := u.DialHTTP(); dErr != nil {
if xnet.IsNetworkOrHostDown(dErr) {
return errNotConnected
}
return dErr
_, err := target.IsActive()
if err != nil {
return err
}
eventData, eErr := target.store.Get(eventKey)
if eErr != nil {
// The last event key in a successful batch will be sent in the channel at most once by the replayEvents()

pkg/event/targetlist.go
@ -24,6 +24,7 @@ import (
// Target - event target interface
type Target interface {
ID() TargetID
IsActive() (bool, error)
Save(Event) error
Send(string) error
Close() error
@ -130,6 +131,13 @@ func (list *TargetList) List() []TargetID {
return keys
}
// TargetMap - returns available targets.
func (list *TargetList) TargetMap() map[TargetID]Target {
list.RLock()
defer list.RUnlock()
return list.targets
}
// Send - sends events to targets identified by target IDs.
func (list *TargetList) Send(event Event, targetIDs ...TargetID) <-chan TargetIDErr {
errCh := make(chan TargetIDErr)

pkg/event/targetlist_test.go
@ -67,6 +67,10 @@ func (target ExampleTarget) Close() error {
return nil
}
func (target ExampleTarget) IsActive() (bool, error) {
return false, errors.New("not connected to target server/service")
}
func TestTargetListAdd(t *testing.T) {
targetListCase1 := NewTargetList()

pkg/madmin/info-commands.go
@ -111,91 +111,6 @@ func (d1 BackendDisks) Merge(d2 BackendDisks) BackendDisks {
return d2
}
// ServerProperties holds some of the server's information such as uptime,
// version, region, ..
type ServerProperties struct {
Uptime time.Duration `json:"uptime"`
Version string `json:"version"`
CommitID string `json:"commitID"`
DeploymentID string `json:"deploymentID"`
Region string `json:"region"`
SQSARN []string `json:"sqsARN"`
}
// ServerConnStats holds network information
type ServerConnStats struct {
TotalInputBytes uint64 `json:"transferred"`
TotalOutputBytes uint64 `json:"received"`
}
// ServerHTTPMethodStats holds total number of HTTP operations from/to the server,
// including the average duration the call was spent.
type ServerHTTPMethodStats struct {
Count uint64 `json:"count"`
AvgDuration string `json:"avgDuration"`
}
// ServerHTTPStats holds all type of http operations performed to/from the server
// including their average execution time.
type ServerHTTPStats struct {
TotalHEADStats ServerHTTPMethodStats `json:"totalHEADs"`
SuccessHEADStats ServerHTTPMethodStats `json:"successHEADs"`
TotalGETStats ServerHTTPMethodStats `json:"totalGETs"`
SuccessGETStats ServerHTTPMethodStats `json:"successGETs"`
TotalPUTStats ServerHTTPMethodStats `json:"totalPUTs"`
SuccessPUTStats ServerHTTPMethodStats `json:"successPUTs"`
TotalPOSTStats ServerHTTPMethodStats `json:"totalPOSTs"`
SuccessPOSTStats ServerHTTPMethodStats `json:"successPOSTs"`
TotalDELETEStats ServerHTTPMethodStats `json:"totalDELETEs"`
SuccessDELETEStats ServerHTTPMethodStats `json:"successDELETEs"`
}
// ServerInfoData holds storage, connections and other
// information of a given server
type ServerInfoData struct {
StorageInfo StorageInfo `json:"storage"`
ConnStats ServerConnStats `json:"network"`
HTTPStats ServerHTTPStats `json:"http"`
Properties ServerProperties `json:"server"`
}
// ServerInfo holds server information result of one node
type ServerInfo struct {
Error string `json:"error"`
Addr string `json:"addr"`
Data *ServerInfoData `json:"data"`
}
// ServerInfo - Connect to a minio server and call Server Info Management API
// to fetch server's information represented by ServerInfo structure
func (adm *AdminClient) ServerInfo() ([]ServerInfo, error) {
resp, err := adm.executeMethod("GET", requestData{relPath: adminAPIPrefix + "/info"})
defer closeResponse(resp)
if err != nil {
return nil, err
}
// Check response http status code
if resp.StatusCode != http.StatusOK {
return nil, httpRespToErrorResponse(resp)
}
// Unmarshal the server's json response
var serversInfo []ServerInfo
respBytes, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
}
err = json.Unmarshal(respBytes, &serversInfo)
if err != nil {
return nil, err
}
return serversInfo, nil
}
// StorageInfo - Connect to a minio server and call Storage Info Management API
// to fetch server's information represented by StorageInfo structure
func (adm *AdminClient) StorageInfo() (StorageInfo, error) {
@ -409,3 +324,156 @@ func (adm *AdminClient) NetPerfInfo(size int) (map[string][]NetPerfInfo, error)
return info, nil
}
// InfoMessage contains server admin related information.
type InfoMessage struct {
Mode string `json:"mode"`
Domain []string `json:"domain,omitempty"`
Region string `json:"region,omitempty"`
SQSARN []string `json:"sqsARN,omitempty"`
DeploymentID string `json:"deploymentID"`
Buckets Buckets `json:"buckets"`
Objects Objects `json:"objects"`
Usage Usage `json:"usage"`
Services Services `json:"services"`
Backend interface{} `json:"backend"`
Servers []ServerProperties `json:"servers"`
}
// Services contains different services information
type Services struct {
Vault Vault `json:"vault"`
LDAP LDAP `json:"ldap"`
Logger []Logger `json:"logger,omitempty"`
Audit []Audit `json:"audit,omitempty"`
Notifications []map[string][]TargetIDStatus `json:"notifications"`
}
// Buckets contains the number of buckets
type Buckets struct {
Count int `json:"count"`
}
// Objects contains the number of objects
type Objects struct {
Count int `json:"count"`
}
// Usage contains the total size used
type Usage struct {
Size uint64 `json:"size"`
}
// Vault holds the Vault KMS status
type Vault struct {
Status string `json:"status,omitempty"`
Encrypt string `json:"encrypt,omitempty"`
Decrypt string `json:"decrypt,omitempty"`
Update string `json:"update,omitempty"`
}
// LDAP contains ldap status
type LDAP struct {
Status string `json:"status,omitempty"`
}
// Status of endpoint
type Status struct {
Status string `json:"status"`
}
// Audit contains audit logger status
type Audit map[string]Status
// Logger contains logger status
type Logger map[string]Status
// TargetIDStatus contains the target ID and its status
type TargetIDStatus map[string]Status
// backendType - indicates the type of backend storage
type backendType string
const (
// FsType - Backend is FS Type
FsType = backendType("FS")
// ErasureType - Backend is Erasure type
ErasureType = backendType("Erasure")
)
// FsBackend contains specific FS storage information
type FsBackend struct {
Type backendType `json:"backendType"`
}
// XlBackend contains specific erasure storage information
type XlBackend struct {
Type backendType `json:"backendType"`
OnlineDisks int `json:"onlineDisks"`
OfflineDisks int `json:"offlineDisks"`
// Data disks for currently configured Standard storage class.
StandardSCData int `json:"standardSCData"`
// Parity disks for currently configured Standard storage class.
StandardSCParity int `json:"standardSCParity"`
// Data disks for currently configured Reduced Redundancy storage class.
RRSCData int `json:"rrSCData"`
// Parity disks for currently configured Reduced Redundancy storage class.
RRSCParity int `json:"rrSCParity"`
}
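Since InfoMessage.Backend is typed interface{}, JSON decoding on the client side yields a generic map for that field. A hedged sketch of one way a consumer might recover the concrete backend struct (decodeBackend is a hypothetical helper, not part of this change):

package main

import (
	"encoding/json"
	"fmt"

	"github.com/minio/minio/pkg/madmin"
)

// decodeBackend re-marshals the generic Backend field of a decoded
// InfoMessage and unmarshals it into the matching concrete struct,
// using the backendType value carried in the JSON.
func decodeBackend(info madmin.InfoMessage) (interface{}, error) {
	raw, err := json.Marshal(info.Backend)
	if err != nil {
		return nil, err
	}
	var probe struct {
		Type string `json:"backendType"`
	}
	if err := json.Unmarshal(raw, &probe); err != nil {
		return nil, err
	}
	switch probe.Type {
	case "Erasure":
		var xl madmin.XlBackend
		err = json.Unmarshal(raw, &xl)
		return xl, err
	case "FS":
		var fs madmin.FsBackend
		err = json.Unmarshal(raw, &fs)
		return fs, err
	}
	return nil, fmt.Errorf("unknown backend type %q", probe.Type)
}

func main() {
	// Illustrative input: an Erasure backend as it would appear after
	// decoding an InfoMessage response.
	var info madmin.InfoMessage
	_ = json.Unmarshal([]byte(`{"backend":{"backendType":"Erasure","onlineDisks":4}}`), &info)
	b, _ := decodeBackend(info)
	fmt.Printf("%#v\n", b)
}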
// ServerProperties holds server information
type ServerProperties struct {
State string `json:"state"`
Endpoint string `json:"endpoint"`
Uptime time.Duration `json:"uptime"`
Version string `json:"version"`
CommitID string `json:"commitID"`
Network map[string]string `json:"network"`
Disks []Disk `json:"disks"`
}
// Disk holds Disk information
type Disk struct {
DrivePath string `json:"path"`
State string `json:"state"`
UUID string `json:"uuid,omitempty"`
Model string `json:"model,omitempty"`
TotalSpace uint64 `json:"totalspace"`
UsedSpace uint64 `json:"usedspace"`
ReadThroughput float64 `json:"readthroughput,omitempty"`
WriteThroughPut float64 `json:"writethroughput,omitempty"`
ReadLatency float64 `json:"readlatency,omitempty"`
WriteLatency float64 `json:"writelatency,omitempty"`
Utilization float64 `json:"utilization,omitempty"`
}
// ServerInfo - connects to a MinIO server and calls the Server Admin Info Management API
// to fetch the server's information, represented by the InfoMessage structure
func (adm *AdminClient) ServerInfo() (InfoMessage, error) {
resp, err := adm.executeMethod("GET", requestData{relPath: adminAPIPrefix + "/info"})
defer closeResponse(resp)
if err != nil {
return InfoMessage{}, err
}
// Check response http status code
if resp.StatusCode != http.StatusOK {
return InfoMessage{}, httpRespToErrorResponse(resp)
}
// Unmarshal the server's json response
var message InfoMessage
respBytes, err := ioutil.ReadAll(resp.Body)
if err != nil {
return InfoMessage{}, err
}
err = json.Unmarshal(respBytes, &message)
if err != nil {
return InfoMessage{}, err
}
return message, nil
}
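A minimal client-side sketch of exercising this method end to end; the endpoint address and credentials below are placeholders, and it assumes madmin.New(endpoint, accessKey, secretKey, secure) as exposed by this package:

package main

import (
	"fmt"
	"log"

	"github.com/minio/minio/pkg/madmin"
)

func main() {
	// Placeholder endpoint and credentials.
	adm, err := madmin.New("localhost:9000", "ACCESS-KEY", "SECRET-KEY", false)
	if err != nil {
		log.Fatalln(err)
	}
	// Calls GET /minio/admin/v2/info and decodes the InfoMessage reply.
	info, err := adm.ServerInfo()
	if err != nil {
		log.Fatalln(err)
	}
	fmt.Println("mode:", info.Mode)
	fmt.Println("deployment:", info.DeploymentID)
	for _, srv := range info.Servers {
		fmt.Println(srv.Endpoint, srv.State, "uptime:", srv.Uptime)
	}
}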
