diff --git a/cmd/config-current.go b/cmd/config-current.go index 12a58ad57..bbea8753b 100644 --- a/cmd/config-current.go +++ b/cmd/config-current.go @@ -282,7 +282,7 @@ func validateConfig(s config.Config) error { return err } - return notify.TestNotificationTargets(s, GlobalServiceDoneCh, globalRootCAs) + return notify.TestNotificationTargets(s, GlobalServiceDoneCh, NewCustomHTTPTransport()) } func lookupConfigs(s config.Config) (err error) { diff --git a/cmd/config/notify/parse.go b/cmd/config/notify/parse.go index 906e76abf..db5d2f2fd 100644 --- a/cmd/config/notify/parse.go +++ b/cmd/config/notify/parse.go @@ -19,6 +19,7 @@ package notify import ( "crypto/tls" "crypto/x509" + "net/http" "strconv" "strings" "time" @@ -38,15 +39,15 @@ const ( // TestNotificationTargets is similar to GetNotificationTargets() // avoids explicit registration. -func TestNotificationTargets(cfg config.Config, doneCh <-chan struct{}, rootCAs *x509.CertPool) error { - _, err := RegisterNotificationTargets(cfg, doneCh, rootCAs, true) +func TestNotificationTargets(cfg config.Config, doneCh <-chan struct{}, transport *http.Transport) error { + _, err := RegisterNotificationTargets(cfg, doneCh, transport, true) return err } // GetNotificationTargets registers and initializes all notification // targets, returns error if any. -func GetNotificationTargets(cfg config.Config, doneCh <-chan struct{}, rootCAs *x509.CertPool) (*event.TargetList, error) { - return RegisterNotificationTargets(cfg, doneCh, rootCAs, false) +func GetNotificationTargets(cfg config.Config, doneCh <-chan struct{}, transport *http.Transport) (*event.TargetList, error) { + return RegisterNotificationTargets(cfg, doneCh, transport, false) } // RegisterNotificationTargets - returns TargetList which contains enabled targets in serverConfig. @@ -54,7 +55,7 @@ func GetNotificationTargets(cfg config.Config, doneCh <-chan struct{}, rootCAs * // * Add a new target in pkg/event/target package. // * Add newly added target configuration to serverConfig.Notify.. // * Handle the configuration in this function to create/add into TargetList. -func RegisterNotificationTargets(cfg config.Config, doneCh <-chan struct{}, rootCAs *x509.CertPool, test bool) (*event.TargetList, error) { +func RegisterNotificationTargets(cfg config.Config, doneCh <-chan struct{}, transport *http.Transport, test bool) (*event.TargetList, error) { targetList := event.NewTargetList() if err := checkValidNotificationKeys(cfg); err != nil { return nil, err @@ -75,7 +76,7 @@ func RegisterNotificationTargets(cfg config.Config, doneCh <-chan struct{}, root return nil, err } - mqttTargets, err := GetNotifyMQTT(cfg[config.NotifyMQTTSubSys], rootCAs) + mqttTargets, err := GetNotifyMQTT(cfg[config.NotifyMQTTSubSys], transport.TLSClientConfig.RootCAs) if err != nil { return nil, err } @@ -85,7 +86,7 @@ func RegisterNotificationTargets(cfg config.Config, doneCh <-chan struct{}, root return nil, err } - natsTargets, err := GetNotifyNATS(cfg[config.NotifyNATSSubSys], rootCAs) + natsTargets, err := GetNotifyNATS(cfg[config.NotifyNATSSubSys], transport.TLSClientConfig.RootCAs) if err != nil { return nil, err } @@ -105,7 +106,7 @@ func RegisterNotificationTargets(cfg config.Config, doneCh <-chan struct{}, root return nil, err } - webhookTargets, err := GetNotifyWebhook(cfg[config.NotifyWebhookSubSys], rootCAs) + webhookTargets, err := GetNotifyWebhook(cfg[config.NotifyWebhookSubSys], transport) if err != nil { return nil, err } @@ -149,7 +150,7 @@ func RegisterNotificationTargets(cfg config.Config, doneCh <-chan struct{}, root if !args.Enable { continue } - args.TLS.RootCAs = rootCAs + args.TLS.RootCAs = transport.TLSClientConfig.RootCAs newTarget, err := target.NewKafkaTarget(id, args, doneCh, logger.LogOnceIf) if err != nil { return nil, err @@ -167,7 +168,7 @@ func RegisterNotificationTargets(cfg config.Config, doneCh <-chan struct{}, root if !args.Enable { continue } - args.RootCAs = rootCAs + args.RootCAs = transport.TLSClientConfig.RootCAs newTarget, err := target.NewMQTTTarget(id, args, doneCh, logger.LogOnceIf) if err != nil { return nil, err @@ -270,8 +271,7 @@ func RegisterNotificationTargets(cfg config.Config, doneCh <-chan struct{}, root if !args.Enable { continue } - args.RootCAs = rootCAs - newTarget, err := target.NewWebhookTarget(id, args, doneCh, logger.LogOnceIf) + newTarget, err := target.NewWebhookTarget(id, args, doneCh, logger.LogOnceIf, transport) if err != nil { return nil, err } @@ -1426,7 +1426,8 @@ var ( ) // GetNotifyWebhook - returns a map of registered notification 'webhook' targets -func GetNotifyWebhook(webhookKVS map[string]config.KVS, rootCAs *x509.CertPool) (map[string]target.WebhookArgs, error) { +func GetNotifyWebhook(webhookKVS map[string]config.KVS, transport *http.Transport) ( + map[string]target.WebhookArgs, error) { webhookTargets := make(map[string]target.WebhookArgs) for k, kv := range mergeTargets(webhookKVS, target.EnvWebhookEnable, DefaultWebhookKVS) { enableEnv := target.EnvWebhookEnable @@ -1468,7 +1469,7 @@ func GetNotifyWebhook(webhookKVS map[string]config.KVS, rootCAs *x509.CertPool) webhookArgs := target.WebhookArgs{ Enable: enabled, Endpoint: *url, - RootCAs: rootCAs, + Transport: transport, AuthToken: env.Get(authEnv, kv.Get(target.WebhookAuthToken)), QueueDir: env.Get(queueDirEnv, kv.Get(target.WebhookQueueDir)), QueueLimit: uint64(queueLimit), diff --git a/cmd/notification.go b/cmd/notification.go index 8efc90c63..1a71beb63 100644 --- a/cmd/notification.go +++ b/cmd/notification.go @@ -610,7 +610,7 @@ func (sys *NotificationSys) ListenBucketNotification(ctx context.Context, bucket // AddNotificationTargetsFromConfig - adds notification targets from server config. func (sys *NotificationSys) AddNotificationTargetsFromConfig(cfg config.Config) error { - targetList, err := notify.GetNotificationTargets(cfg, GlobalServiceDoneCh, globalRootCAs) + targetList, err := notify.GetNotificationTargets(cfg, GlobalServiceDoneCh, NewCustomHTTPTransport()) if err != nil { return err } diff --git a/pkg/event/target/webhook.go b/pkg/event/target/webhook.go index 292799957..27bb72fd5 100644 --- a/pkg/event/target/webhook.go +++ b/pkg/event/target/webhook.go @@ -19,19 +19,15 @@ package target import ( "bytes" "context" - "crypto/tls" - "crypto/x509" "encoding/json" "errors" "fmt" "io" "io/ioutil" - "net" "net/http" "net/url" "os" "path/filepath" - "time" "github.com/minio/minio/pkg/event" xnet "github.com/minio/minio/pkg/net" @@ -53,12 +49,12 @@ const ( // WebhookArgs - Webhook target arguments. type WebhookArgs struct { - Enable bool `json:"enable"` - Endpoint xnet.URL `json:"endpoint"` - AuthToken string `json:"authToken"` - RootCAs *x509.CertPool `json:"-"` - QueueDir string `json:"queueDir"` - QueueLimit uint64 `json:"queueLimit"` + Enable bool `json:"enable"` + Endpoint xnet.URL `json:"endpoint"` + AuthToken string `json:"authToken"` + Transport *http.Transport `json:"-"` + QueueDir string `json:"queueDir"` + QueueLimit uint64 `json:"queueLimit"` } // Validate WebhookArgs fields @@ -149,11 +145,12 @@ func (target *WebhookTarget) send(eventData event.Event) error { return err } - // FIXME: log returned error. ignore time being. + defer resp.Body.Close() io.Copy(ioutil.Discard, resp.Body) - _ = resp.Body.Close() if resp.StatusCode < 200 || resp.StatusCode > 299 { + // close any idle connections upon any error. + target.httpClient.CloseIdleConnections() return fmt.Errorf("sending event failed with %v", resp.Status) } @@ -193,7 +190,7 @@ func (target *WebhookTarget) Close() error { } // NewWebhookTarget - creates new Webhook target. -func NewWebhookTarget(id string, args WebhookArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{})) (*WebhookTarget, error) { +func NewWebhookTarget(id string, args WebhookArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{}), transport *http.Transport) (*WebhookTarget, error) { var store Store @@ -209,16 +206,7 @@ func NewWebhookTarget(id string, args WebhookArgs, doneCh <-chan struct{}, logge id: event.TargetID{ID: id, Name: "webhook"}, args: args, httpClient: &http.Client{ - Transport: &http.Transport{ - TLSClientConfig: &tls.Config{RootCAs: args.RootCAs}, - DialContext: (&net.Dialer{ - Timeout: 5 * time.Second, - KeepAlive: 5 * time.Second, - }).DialContext, - TLSHandshakeTimeout: 3 * time.Second, - ResponseHeaderTimeout: 3 * time.Second, - ExpectContinueTimeout: 2 * time.Second, - }, + Transport: transport, }, store: store, }