From 7a5271ad96164276e0341b164b3b9c0bcd374c44 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Tue, 28 Apr 2020 13:57:56 -0700 Subject: [PATCH] fix: re-use connections in webhook/elasticsearch (#9461) - elasticsearch client should rely on the SDK helpers instead of pure HTTP calls. - webhook shouldn't need to check for IsActive() for all notifications, failure should be delayed. - Remove DialHTTP as its never used properly Fixes #9460 --- cmd/admin-handlers.go | 31 +++--- pkg/event/target/common.go | 28 +++++ pkg/event/target/elasticsearch.go | 75 ++++++-------- pkg/event/target/httpclient.go | 165 ------------------------------ pkg/event/target/webhook.go | 39 ++++--- pkg/net/url.go | 32 ------ 6 files changed, 100 insertions(+), 270 deletions(-) create mode 100644 pkg/event/target/common.go delete mode 100644 pkg/event/target/httpclient.go diff --git a/cmd/admin-handlers.go b/cmd/admin-handlers.go index e2e05a6dd..0f024e156 100644 --- a/cmd/admin-handlers.go +++ b/cmd/admin-handlers.go @@ -21,7 +21,6 @@ import ( "crypto/subtle" "crypto/tls" "encoding/json" - "errors" "fmt" "io" "net/http" @@ -44,7 +43,6 @@ import ( "github.com/minio/minio/cmd/logger" "github.com/minio/minio/cmd/logger/message/log" "github.com/minio/minio/pkg/auth" - "github.com/minio/minio/pkg/event/target" "github.com/minio/minio/pkg/handlers" iampolicy "github.com/minio/minio/pkg/iam/policy" "github.com/minio/minio/pkg/madmin" @@ -1534,7 +1532,6 @@ func fetchVaultStatus(cfg config.Config) madmin.Vault { } if err := checkConnection(kmsInfo.Endpoint, 15*time.Second); err != nil { - vault.Status = "offline" } else { vault.Status = "online" @@ -1602,21 +1599,23 @@ func fetchLoggerInfo(cfg config.Config) ([]madmin.Logger, []madmin.Audit) { // checkConnection - ping an endpoint , return err in case of no connection func checkConnection(endpointStr string, timeout time.Duration) error { - u, pErr := xnet.ParseURL(endpointStr) - if pErr != nil { - return pErr - } - tr := newCustomHTTPTransport(&tls.Config{RootCAs: globalRootCAs}, timeout)() defer tr.CloseIdleConnections() - if dErr := u.DialHTTP(tr); dErr != nil { - if urlErr, ok := dErr.(*url.Error); ok { - // To treat "connection refused" errors as un reachable endpoint. - if target.IsConnRefusedErr(urlErr.Err) { - return errors.New("endpoint unreachable, please check your endpoint") - } - } - return dErr + + ctx, cancel := context.WithTimeout(GlobalContext, timeout) + defer cancel() + + req, err := http.NewRequest(http.MethodHead, endpointStr, nil) + if err != nil { + return err + } + + client := &http.Client{Transport: tr} + resp, err := client.Do(req.WithContext(ctx)) + if err != nil { + return err } + defer xhttp.DrainBody(resp.Body) + resp.Body.Close() return nil } diff --git a/pkg/event/target/common.go b/pkg/event/target/common.go new file mode 100644 index 000000000..967db8500 --- /dev/null +++ b/pkg/event/target/common.go @@ -0,0 +1,28 @@ +/* + * MinIO Cloud Storage, (C) 2020 MinIO, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package target + +import "github.com/google/uuid" + +func getNewUUID() (string, error) { + u, err := uuid.NewRandom() + if err != nil { + return "", err + } + + return u.String(), nil +} diff --git a/pkg/event/target/elasticsearch.go b/pkg/event/target/elasticsearch.go index 072fd52a3..59d15d976 100644 --- a/pkg/event/target/elasticsearch.go +++ b/pkg/event/target/elasticsearch.go @@ -19,10 +19,12 @@ package target import ( "context" "fmt" + "net/http" "net/url" "os" "path/filepath" "strings" + "time" "github.com/minio/minio/pkg/event" xnet "github.com/minio/minio/pkg/net" @@ -98,13 +100,17 @@ func (target *ElasticsearchTarget) HasQueueStore() bool { // IsActive - Return true if target is up and active func (target *ElasticsearchTarget) IsActive() (bool, error) { - if dErr := target.args.URL.DialHTTP(nil); dErr != nil { - if xnet.IsNetworkOrHostDown(dErr) { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + _, code, err := target.client.Ping(target.args.URL.String()).HttpHeadOnly(true).Do(ctx) + if err != nil { + if elastic.IsConnErr(err) || elastic.IsContextErr(err) || xnet.IsNetworkOrHostDown(err) { return false, errNotConnected } - return false, dErr + return false, err } - return true, nil + return !(code >= http.StatusBadRequest), nil } // Save - saves the events to the store if queuestore is configured, which will be replayed when the elasticsearch connection is active. @@ -112,11 +118,11 @@ func (target *ElasticsearchTarget) Save(eventData event.Event) error { if target.store != nil { return target.store.Put(eventData) } - _, err := target.IsActive() - if err != nil { - return err + err := target.send(eventData) + if elastic.IsConnErr(err) || elastic.IsContextErr(err) || xnet.IsNetworkOrHostDown(err) { + return errNotConnected } - return target.send(eventData) + return err } // send - sends the event to the target. @@ -170,19 +176,13 @@ func (target *ElasticsearchTarget) send(eventData event.Event) error { // Send - reads an event from store and sends it to Elasticsearch. func (target *ElasticsearchTarget) Send(eventKey string) error { - var err error - if target.client == nil { target.client, err = newClient(target.args) if err != nil { return err } } - _, err = target.IsActive() - if err != nil { - return err - } eventData, eErr := target.store.Get(eventKey) if eErr != nil { @@ -195,7 +195,7 @@ func (target *ElasticsearchTarget) Send(eventKey string) error { } if err := target.send(eventData); err != nil { - if xnet.IsNetworkOrHostDown(err) { + if elastic.IsConnErr(err) || elastic.IsContextErr(err) || xnet.IsNetworkOrHostDown(err) { return errNotConnected } return err @@ -235,26 +235,22 @@ func createIndex(client *elastic.Client, args ElasticsearchArgs) error { // newClient - creates a new elastic client with args provided. func newClient(args ElasticsearchArgs) (*elastic.Client, error) { - client, clientErr := elastic.NewClient(elastic.SetURL(args.URL.String()), elastic.SetSniff(false), elastic.SetMaxRetries(10)) - if clientErr != nil { - if !(errors.Cause(clientErr) == elastic.ErrNoClient) { - return nil, clientErr - } - } else { - if err := createIndex(client, args); err != nil { - return nil, err + client, err := elastic.NewClient(elastic.SetURL(args.URL.String()), elastic.SetMaxRetries(10)) + if err != nil { + // https://github.com/olivere/elastic/wiki/Connection-Errors + if elastic.IsConnErr(err) || elastic.IsContextErr(err) || xnet.IsNetworkOrHostDown(err) { + return nil, errNotConnected } + return nil, err + } + if err = createIndex(client, args); err != nil { + return nil, err } return client, nil } // NewElasticsearchTarget - creates new Elasticsearch target. func NewElasticsearchTarget(id string, args ElasticsearchArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{}), test bool) (*ElasticsearchTarget, error) { - var client *elastic.Client - var err error - - var store Store - target := &ElasticsearchTarget{ id: event.TargetID{ID: id, Name: "elasticsearch"}, args: args, @@ -263,27 +259,20 @@ func NewElasticsearchTarget(id string, args ElasticsearchArgs, doneCh <-chan str if args.QueueDir != "" { queueDir := filepath.Join(args.QueueDir, storePrefix+"-elasticsearch-"+id) - store = NewQueueStore(queueDir, args.QueueLimit) - if oErr := store.Open(); oErr != nil { - target.loggerOnce(context.Background(), oErr, target.ID()) - return target, oErr + target.store = NewQueueStore(queueDir, args.QueueLimit) + if err := target.store.Open(); err != nil { + target.loggerOnce(context.Background(), err, target.ID()) + return target, err } - target.store = store } - dErr := args.URL.DialHTTP(nil) - if dErr != nil { - if store == nil { - target.loggerOnce(context.Background(), dErr, target.ID()) - return target, dErr - } - } else { - client, err = newClient(args) - if err != nil { + var err error + target.client, err = newClient(args) + if err != nil { + if target.store == nil || err != errNotConnected { target.loggerOnce(context.Background(), err, target.ID()) return target, err } - target.client = client } if target.store != nil && !test { diff --git a/pkg/event/target/httpclient.go b/pkg/event/target/httpclient.go deleted file mode 100644 index 85af07077..000000000 --- a/pkg/event/target/httpclient.go +++ /dev/null @@ -1,165 +0,0 @@ -/* - * MinIO Cloud Storage, (C) 2018 MinIO, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package target - -import ( - "encoding/json" - "errors" - "net/http" - "sync/atomic" - "time" - - "github.com/google/uuid" - "github.com/minio/minio/pkg/event" - xnet "github.com/minio/minio/pkg/net" -) - -// HTTPClientTarget - HTTP client target. -type HTTPClientTarget struct { - id event.TargetID - w http.ResponseWriter - eventCh chan []byte - DoneCh chan struct{} - stopCh chan struct{} - isStopped uint32 - isRunning uint32 -} - -// ID - returns target ID. -func (target HTTPClientTarget) ID() event.TargetID { - return target.id -} - -// IsActive - does nothing and available for interface compatibility. -func (target *HTTPClientTarget) IsActive() (bool, error) { - return true, nil -} - -// HasQueueStore - No-Op. Added for interface compatibility -func (target *HTTPClientTarget) HasQueueStore() bool { - return false -} - -func (target *HTTPClientTarget) start() { - go func() { - defer func() { - atomic.AddUint32(&target.isRunning, 1) - - // Close DoneCh to indicate we are done. - close(target.DoneCh) - }() - - write := func(event []byte) error { - if _, err := target.w.Write(event); err != nil { - return err - } - - target.w.(http.Flusher).Flush() - return nil - } - - keepAliveTicker := time.NewTicker(500 * time.Millisecond) - defer keepAliveTicker.Stop() - - for { - select { - case <-target.stopCh: - // We are asked to stop. - return - case event, ok := <-target.eventCh: - if !ok { - // Got read error. Exit the goroutine. - return - } - if err := write(event); err != nil { - // Got write error to the client. Exit the goroutine. - return - } - case <-keepAliveTicker.C: - if err := write([]byte(" ")); err != nil { - // Got write error to the client. Exit the goroutine. - return - } - } - } - }() -} - -// Save - sends event to HTTP client. -func (target *HTTPClientTarget) Save(eventData event.Event) error { - return target.send(eventData) -} - -func (target *HTTPClientTarget) send(eventData event.Event) error { - if atomic.LoadUint32(&target.isRunning) != 0 { - return errors.New("closed http connection") - } - - data, err := json.Marshal(struct{ Records []event.Event }{[]event.Event{eventData}}) - if err != nil { - return err - } - data = append(data, byte('\n')) - - select { - case target.eventCh <- data: - return nil - case <-target.DoneCh: - return errors.New("error in sending event") - } -} - -// Send - interface compatible method does no-op. -func (target *HTTPClientTarget) Send(eventKey string) error { - return nil -} - -// Close - closes underneath goroutine. -func (target *HTTPClientTarget) Close() error { - atomic.AddUint32(&target.isStopped, 1) - if atomic.LoadUint32(&target.isStopped) == 1 { - close(target.stopCh) - } - - return nil -} - -func getNewUUID() (string, error) { - u, err := uuid.NewRandom() - if err != nil { - return "", err - } - - return u.String(), nil -} - -// NewHTTPClientTarget - creates new HTTP client target. -func NewHTTPClientTarget(host xnet.Host, w http.ResponseWriter) (*HTTPClientTarget, error) { - uuid, err := getNewUUID() - if err != nil { - return nil, err - } - c := &HTTPClientTarget{ - id: event.TargetID{ID: "httpclient" + "+" + uuid + "+" + host.Name, Name: host.Port.String()}, - w: w, - eventCh: make(chan []byte), - DoneCh: make(chan struct{}), - stopCh: make(chan struct{}), - } - c.start() - return c, nil -} diff --git a/pkg/event/target/webhook.go b/pkg/event/target/webhook.go index b1c43f1fa..5f6e3e672 100644 --- a/pkg/event/target/webhook.go +++ b/pkg/event/target/webhook.go @@ -28,6 +28,7 @@ import ( "net/url" "os" "path/filepath" + "time" "github.com/minio/minio/pkg/event" xnet "github.com/minio/minio/pkg/net" @@ -94,16 +95,27 @@ func (target *WebhookTarget) HasQueueStore() bool { // IsActive - Return true if target is up and active func (target *WebhookTarget) IsActive() (bool, error) { - u, pErr := xnet.ParseHTTPURL(target.args.Endpoint.String()) - if pErr != nil { - return false, pErr + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + req, err := http.NewRequest(http.MethodHead, target.args.Endpoint.String(), nil) + if err != nil { + if xnet.IsNetworkOrHostDown(err) { + return false, errNotConnected + } + return false, err } - if dErr := u.DialHTTP(nil); dErr != nil { - if xnet.IsNetworkOrHostDown(dErr) { + + resp, err := target.httpClient.Do(req.WithContext(ctx)) + if err != nil { + if xnet.IsNetworkOrHostDown(err) || err == context.DeadlineExceeded { return false, errNotConnected } - return false, dErr + return false, err } + io.Copy(ioutil.Discard, resp.Body) + resp.Body.Close() + // No network failure i.e response from the target means its up return true, nil } @@ -112,11 +124,13 @@ func (target *WebhookTarget) Save(eventData event.Event) error { if target.store != nil { return target.store.Put(eventData) } - _, err := target.IsActive() + err := target.send(eventData) if err != nil { - return err + if xnet.IsNetworkOrHostDown(err) { + return errNotConnected + } } - return target.send(eventData) + return err } // send - sends an event to the webhook. @@ -161,10 +175,6 @@ func (target *WebhookTarget) send(eventData event.Event) error { // Send - reads an event from store and sends it to webhook. func (target *WebhookTarget) Send(eventKey string) error { - _, err := target.IsActive() - if err != nil { - return err - } eventData, eErr := target.store.Get(eventKey) if eErr != nil { // The last event key in a successful batch will be sent in the channel atmost once by the replayEvents() @@ -217,7 +227,8 @@ func NewWebhookTarget(id string, args WebhookArgs, doneCh <-chan struct{}, logge target.store = store } - if _, err := target.IsActive(); err != nil { + _, err := target.IsActive() + if err != nil { if target.store == nil || err != errNotConnected { target.loggerOnce(context.Background(), err, target.ID()) return target, err diff --git a/pkg/net/url.go b/pkg/net/url.go index f3853dcc1..1979455b4 100644 --- a/pkg/net/url.go +++ b/pkg/net/url.go @@ -20,14 +20,10 @@ import ( "encoding/json" "errors" "fmt" - "io" - "io/ioutil" "net" - "net/http" "net/url" "path" "strings" - "time" ) // URL - improved JSON friendly url.URL. @@ -85,34 +81,6 @@ func (u *URL) UnmarshalJSON(data []byte) (err error) { return nil } -// DialHTTP - dials the url to check the connection. -func (u URL) DialHTTP(transport *http.Transport) error { - if transport == nil { - transport = &http.Transport{ - DialContext: (&net.Dialer{ - Timeout: 2 * time.Second, - }).DialContext, - } - - } - - var client = &http.Client{ - Transport: transport, - } - - req, err := http.NewRequest("POST", u.String(), nil) - if err != nil { - return err - } - resp, err := client.Do(req) - if err != nil { - return err - } - io.Copy(ioutil.Discard, resp.Body) - resp.Body.Close() - return nil -} - // ParseHTTPURL - parses a string into HTTP URL, string is // expected to be of form http:// or https:// func ParseHTTPURL(s string) (u *URL, err error) {