Start using custom HTTP transport for webhook endpoints (#8630)

Use a more performant http transport for webhook
endpoints with proper connection pooling, appropriate
timeouts etc.
master
Harshavardhana 5 years ago committed by kannappanr
parent 555969ee42
commit c364f0af6c
  1. 2
      cmd/config-current.go
  2. 29
      cmd/config/notify/parse.go
  3. 2
      cmd/notification.go
  4. 34
      pkg/event/target/webhook.go

@ -282,7 +282,7 @@ func validateConfig(s config.Config) error {
return err
}
return notify.TestNotificationTargets(s, GlobalServiceDoneCh, globalRootCAs)
return notify.TestNotificationTargets(s, GlobalServiceDoneCh, NewCustomHTTPTransport())
}
func lookupConfigs(s config.Config) (err error) {

@ -19,6 +19,7 @@ package notify
import (
"crypto/tls"
"crypto/x509"
"net/http"
"strconv"
"strings"
"time"
@ -38,15 +39,15 @@ const (
// TestNotificationTargets is similar to GetNotificationTargets()
// avoids explicit registration.
func TestNotificationTargets(cfg config.Config, doneCh <-chan struct{}, rootCAs *x509.CertPool) error {
_, err := RegisterNotificationTargets(cfg, doneCh, rootCAs, true)
func TestNotificationTargets(cfg config.Config, doneCh <-chan struct{}, transport *http.Transport) error {
_, err := RegisterNotificationTargets(cfg, doneCh, transport, true)
return err
}
// GetNotificationTargets registers and initializes all notification
// targets, returns error if any.
func GetNotificationTargets(cfg config.Config, doneCh <-chan struct{}, rootCAs *x509.CertPool) (*event.TargetList, error) {
return RegisterNotificationTargets(cfg, doneCh, rootCAs, false)
func GetNotificationTargets(cfg config.Config, doneCh <-chan struct{}, transport *http.Transport) (*event.TargetList, error) {
return RegisterNotificationTargets(cfg, doneCh, transport, false)
}
// RegisterNotificationTargets - returns TargetList which contains enabled targets in serverConfig.
@ -54,7 +55,7 @@ func GetNotificationTargets(cfg config.Config, doneCh <-chan struct{}, rootCAs *
// * Add a new target in pkg/event/target package.
// * Add newly added target configuration to serverConfig.Notify.<TARGET_NAME>.
// * Handle the configuration in this function to create/add into TargetList.
func RegisterNotificationTargets(cfg config.Config, doneCh <-chan struct{}, rootCAs *x509.CertPool, test bool) (*event.TargetList, error) {
func RegisterNotificationTargets(cfg config.Config, doneCh <-chan struct{}, transport *http.Transport, test bool) (*event.TargetList, error) {
targetList := event.NewTargetList()
if err := checkValidNotificationKeys(cfg); err != nil {
return nil, err
@ -75,7 +76,7 @@ func RegisterNotificationTargets(cfg config.Config, doneCh <-chan struct{}, root
return nil, err
}
mqttTargets, err := GetNotifyMQTT(cfg[config.NotifyMQTTSubSys], rootCAs)
mqttTargets, err := GetNotifyMQTT(cfg[config.NotifyMQTTSubSys], transport.TLSClientConfig.RootCAs)
if err != nil {
return nil, err
}
@ -85,7 +86,7 @@ func RegisterNotificationTargets(cfg config.Config, doneCh <-chan struct{}, root
return nil, err
}
natsTargets, err := GetNotifyNATS(cfg[config.NotifyNATSSubSys], rootCAs)
natsTargets, err := GetNotifyNATS(cfg[config.NotifyNATSSubSys], transport.TLSClientConfig.RootCAs)
if err != nil {
return nil, err
}
@ -105,7 +106,7 @@ func RegisterNotificationTargets(cfg config.Config, doneCh <-chan struct{}, root
return nil, err
}
webhookTargets, err := GetNotifyWebhook(cfg[config.NotifyWebhookSubSys], rootCAs)
webhookTargets, err := GetNotifyWebhook(cfg[config.NotifyWebhookSubSys], transport)
if err != nil {
return nil, err
}
@ -149,7 +150,7 @@ func RegisterNotificationTargets(cfg config.Config, doneCh <-chan struct{}, root
if !args.Enable {
continue
}
args.TLS.RootCAs = rootCAs
args.TLS.RootCAs = transport.TLSClientConfig.RootCAs
newTarget, err := target.NewKafkaTarget(id, args, doneCh, logger.LogOnceIf)
if err != nil {
return nil, err
@ -167,7 +168,7 @@ func RegisterNotificationTargets(cfg config.Config, doneCh <-chan struct{}, root
if !args.Enable {
continue
}
args.RootCAs = rootCAs
args.RootCAs = transport.TLSClientConfig.RootCAs
newTarget, err := target.NewMQTTTarget(id, args, doneCh, logger.LogOnceIf)
if err != nil {
return nil, err
@ -270,8 +271,7 @@ func RegisterNotificationTargets(cfg config.Config, doneCh <-chan struct{}, root
if !args.Enable {
continue
}
args.RootCAs = rootCAs
newTarget, err := target.NewWebhookTarget(id, args, doneCh, logger.LogOnceIf)
newTarget, err := target.NewWebhookTarget(id, args, doneCh, logger.LogOnceIf, transport)
if err != nil {
return nil, err
}
@ -1426,7 +1426,8 @@ var (
)
// GetNotifyWebhook - returns a map of registered notification 'webhook' targets
func GetNotifyWebhook(webhookKVS map[string]config.KVS, rootCAs *x509.CertPool) (map[string]target.WebhookArgs, error) {
func GetNotifyWebhook(webhookKVS map[string]config.KVS, transport *http.Transport) (
map[string]target.WebhookArgs, error) {
webhookTargets := make(map[string]target.WebhookArgs)
for k, kv := range mergeTargets(webhookKVS, target.EnvWebhookEnable, DefaultWebhookKVS) {
enableEnv := target.EnvWebhookEnable
@ -1468,7 +1469,7 @@ func GetNotifyWebhook(webhookKVS map[string]config.KVS, rootCAs *x509.CertPool)
webhookArgs := target.WebhookArgs{
Enable: enabled,
Endpoint: *url,
RootCAs: rootCAs,
Transport: transport,
AuthToken: env.Get(authEnv, kv.Get(target.WebhookAuthToken)),
QueueDir: env.Get(queueDirEnv, kv.Get(target.WebhookQueueDir)),
QueueLimit: uint64(queueLimit),

@ -610,7 +610,7 @@ func (sys *NotificationSys) ListenBucketNotification(ctx context.Context, bucket
// AddNotificationTargetsFromConfig - adds notification targets from server config.
func (sys *NotificationSys) AddNotificationTargetsFromConfig(cfg config.Config) error {
targetList, err := notify.GetNotificationTargets(cfg, GlobalServiceDoneCh, globalRootCAs)
targetList, err := notify.GetNotificationTargets(cfg, GlobalServiceDoneCh, NewCustomHTTPTransport())
if err != nil {
return err
}

@ -19,19 +19,15 @@ package target
import (
"bytes"
"context"
"crypto/tls"
"crypto/x509"
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
"net/url"
"os"
"path/filepath"
"time"
"github.com/minio/minio/pkg/event"
xnet "github.com/minio/minio/pkg/net"
@ -53,12 +49,12 @@ const (
// WebhookArgs - Webhook target arguments.
type WebhookArgs struct {
Enable bool `json:"enable"`
Endpoint xnet.URL `json:"endpoint"`
AuthToken string `json:"authToken"`
RootCAs *x509.CertPool `json:"-"`
QueueDir string `json:"queueDir"`
QueueLimit uint64 `json:"queueLimit"`
Enable bool `json:"enable"`
Endpoint xnet.URL `json:"endpoint"`
AuthToken string `json:"authToken"`
Transport *http.Transport `json:"-"`
QueueDir string `json:"queueDir"`
QueueLimit uint64 `json:"queueLimit"`
}
// Validate WebhookArgs fields
@ -149,11 +145,12 @@ func (target *WebhookTarget) send(eventData event.Event) error {
return err
}
// FIXME: log returned error. ignore time being.
defer resp.Body.Close()
io.Copy(ioutil.Discard, resp.Body)
_ = resp.Body.Close()
if resp.StatusCode < 200 || resp.StatusCode > 299 {
// close any idle connections upon any error.
target.httpClient.CloseIdleConnections()
return fmt.Errorf("sending event failed with %v", resp.Status)
}
@ -193,7 +190,7 @@ func (target *WebhookTarget) Close() error {
}
// NewWebhookTarget - creates new Webhook target.
func NewWebhookTarget(id string, args WebhookArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{})) (*WebhookTarget, error) {
func NewWebhookTarget(id string, args WebhookArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{}), transport *http.Transport) (*WebhookTarget, error) {
var store Store
@ -209,16 +206,7 @@ func NewWebhookTarget(id string, args WebhookArgs, doneCh <-chan struct{}, logge
id: event.TargetID{ID: id, Name: "webhook"},
args: args,
httpClient: &http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{RootCAs: args.RootCAs},
DialContext: (&net.Dialer{
Timeout: 5 * time.Second,
KeepAlive: 5 * time.Second,
}).DialContext,
TLSHandshakeTimeout: 3 * time.Second,
ResponseHeaderTimeout: 3 * time.Second,
ExpectContinueTimeout: 2 * time.Second,
},
Transport: transport,
},
store: store,
}

Loading…
Cancel
Save