fix: re-use connections in webhook/elasticsearch (#9461)

- elasticsearch client should rely on the SDK helpers
  instead of pure HTTP calls.
- webhook shouldn't need to check for IsActive() for
  all notifications, failure should be delayed.
- Remove DialHTTP as its never used properly

Fixes #9460
master
Harshavardhana 5 years ago committed by GitHub
parent 1b122526aa
commit 7a5271ad96
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 31
      cmd/admin-handlers.go
  2. 28
      pkg/event/target/common.go
  3. 75
      pkg/event/target/elasticsearch.go
  4. 165
      pkg/event/target/httpclient.go
  5. 39
      pkg/event/target/webhook.go
  6. 32
      pkg/net/url.go

@ -21,7 +21,6 @@ import (
"crypto/subtle" "crypto/subtle"
"crypto/tls" "crypto/tls"
"encoding/json" "encoding/json"
"errors"
"fmt" "fmt"
"io" "io"
"net/http" "net/http"
@ -44,7 +43,6 @@ import (
"github.com/minio/minio/cmd/logger" "github.com/minio/minio/cmd/logger"
"github.com/minio/minio/cmd/logger/message/log" "github.com/minio/minio/cmd/logger/message/log"
"github.com/minio/minio/pkg/auth" "github.com/minio/minio/pkg/auth"
"github.com/minio/minio/pkg/event/target"
"github.com/minio/minio/pkg/handlers" "github.com/minio/minio/pkg/handlers"
iampolicy "github.com/minio/minio/pkg/iam/policy" iampolicy "github.com/minio/minio/pkg/iam/policy"
"github.com/minio/minio/pkg/madmin" "github.com/minio/minio/pkg/madmin"
@ -1534,7 +1532,6 @@ func fetchVaultStatus(cfg config.Config) madmin.Vault {
} }
if err := checkConnection(kmsInfo.Endpoint, 15*time.Second); err != nil { if err := checkConnection(kmsInfo.Endpoint, 15*time.Second); err != nil {
vault.Status = "offline" vault.Status = "offline"
} else { } else {
vault.Status = "online" vault.Status = "online"
@ -1602,21 +1599,23 @@ func fetchLoggerInfo(cfg config.Config) ([]madmin.Logger, []madmin.Audit) {
// checkConnection - ping an endpoint , return err in case of no connection // checkConnection - ping an endpoint , return err in case of no connection
func checkConnection(endpointStr string, timeout time.Duration) error { func checkConnection(endpointStr string, timeout time.Duration) error {
u, pErr := xnet.ParseURL(endpointStr)
if pErr != nil {
return pErr
}
tr := newCustomHTTPTransport(&tls.Config{RootCAs: globalRootCAs}, timeout)() tr := newCustomHTTPTransport(&tls.Config{RootCAs: globalRootCAs}, timeout)()
defer tr.CloseIdleConnections() defer tr.CloseIdleConnections()
if dErr := u.DialHTTP(tr); dErr != nil {
if urlErr, ok := dErr.(*url.Error); ok { ctx, cancel := context.WithTimeout(GlobalContext, timeout)
// To treat "connection refused" errors as un reachable endpoint. defer cancel()
if target.IsConnRefusedErr(urlErr.Err) {
return errors.New("endpoint unreachable, please check your endpoint") req, err := http.NewRequest(http.MethodHead, endpointStr, nil)
} if err != nil {
} return err
return dErr }
client := &http.Client{Transport: tr}
resp, err := client.Do(req.WithContext(ctx))
if err != nil {
return err
} }
defer xhttp.DrainBody(resp.Body)
resp.Body.Close()
return nil return nil
} }

@ -0,0 +1,28 @@
/*
* MinIO Cloud Storage, (C) 2020 MinIO, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package target
import "github.com/google/uuid"
func getNewUUID() (string, error) {
u, err := uuid.NewRandom()
if err != nil {
return "", err
}
return u.String(), nil
}

@ -19,10 +19,12 @@ package target
import ( import (
"context" "context"
"fmt" "fmt"
"net/http"
"net/url" "net/url"
"os" "os"
"path/filepath" "path/filepath"
"strings" "strings"
"time"
"github.com/minio/minio/pkg/event" "github.com/minio/minio/pkg/event"
xnet "github.com/minio/minio/pkg/net" xnet "github.com/minio/minio/pkg/net"
@ -98,13 +100,17 @@ func (target *ElasticsearchTarget) HasQueueStore() bool {
// IsActive - Return true if target is up and active // IsActive - Return true if target is up and active
func (target *ElasticsearchTarget) IsActive() (bool, error) { func (target *ElasticsearchTarget) IsActive() (bool, error) {
if dErr := target.args.URL.DialHTTP(nil); dErr != nil { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
if xnet.IsNetworkOrHostDown(dErr) { defer cancel()
_, code, err := target.client.Ping(target.args.URL.String()).HttpHeadOnly(true).Do(ctx)
if err != nil {
if elastic.IsConnErr(err) || elastic.IsContextErr(err) || xnet.IsNetworkOrHostDown(err) {
return false, errNotConnected return false, errNotConnected
} }
return false, dErr return false, err
} }
return true, nil return !(code >= http.StatusBadRequest), nil
} }
// Save - saves the events to the store if queuestore is configured, which will be replayed when the elasticsearch connection is active. // Save - saves the events to the store if queuestore is configured, which will be replayed when the elasticsearch connection is active.
@ -112,11 +118,11 @@ func (target *ElasticsearchTarget) Save(eventData event.Event) error {
if target.store != nil { if target.store != nil {
return target.store.Put(eventData) return target.store.Put(eventData)
} }
_, err := target.IsActive() err := target.send(eventData)
if err != nil { if elastic.IsConnErr(err) || elastic.IsContextErr(err) || xnet.IsNetworkOrHostDown(err) {
return err return errNotConnected
} }
return target.send(eventData) return err
} }
// send - sends the event to the target. // send - sends the event to the target.
@ -170,19 +176,13 @@ func (target *ElasticsearchTarget) send(eventData event.Event) error {
// Send - reads an event from store and sends it to Elasticsearch. // Send - reads an event from store and sends it to Elasticsearch.
func (target *ElasticsearchTarget) Send(eventKey string) error { func (target *ElasticsearchTarget) Send(eventKey string) error {
var err error var err error
if target.client == nil { if target.client == nil {
target.client, err = newClient(target.args) target.client, err = newClient(target.args)
if err != nil { if err != nil {
return err return err
} }
} }
_, err = target.IsActive()
if err != nil {
return err
}
eventData, eErr := target.store.Get(eventKey) eventData, eErr := target.store.Get(eventKey)
if eErr != nil { if eErr != nil {
@ -195,7 +195,7 @@ func (target *ElasticsearchTarget) Send(eventKey string) error {
} }
if err := target.send(eventData); err != nil { if err := target.send(eventData); err != nil {
if xnet.IsNetworkOrHostDown(err) { if elastic.IsConnErr(err) || elastic.IsContextErr(err) || xnet.IsNetworkOrHostDown(err) {
return errNotConnected return errNotConnected
} }
return err return err
@ -235,26 +235,22 @@ func createIndex(client *elastic.Client, args ElasticsearchArgs) error {
// newClient - creates a new elastic client with args provided. // newClient - creates a new elastic client with args provided.
func newClient(args ElasticsearchArgs) (*elastic.Client, error) { func newClient(args ElasticsearchArgs) (*elastic.Client, error) {
client, clientErr := elastic.NewClient(elastic.SetURL(args.URL.String()), elastic.SetSniff(false), elastic.SetMaxRetries(10)) client, err := elastic.NewClient(elastic.SetURL(args.URL.String()), elastic.SetMaxRetries(10))
if clientErr != nil { if err != nil {
if !(errors.Cause(clientErr) == elastic.ErrNoClient) { // https://github.com/olivere/elastic/wiki/Connection-Errors
return nil, clientErr if elastic.IsConnErr(err) || elastic.IsContextErr(err) || xnet.IsNetworkOrHostDown(err) {
} return nil, errNotConnected
} else {
if err := createIndex(client, args); err != nil {
return nil, err
} }
return nil, err
}
if err = createIndex(client, args); err != nil {
return nil, err
} }
return client, nil return client, nil
} }
// NewElasticsearchTarget - creates new Elasticsearch target. // NewElasticsearchTarget - creates new Elasticsearch target.
func NewElasticsearchTarget(id string, args ElasticsearchArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{}), test bool) (*ElasticsearchTarget, error) { func NewElasticsearchTarget(id string, args ElasticsearchArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{}), test bool) (*ElasticsearchTarget, error) {
var client *elastic.Client
var err error
var store Store
target := &ElasticsearchTarget{ target := &ElasticsearchTarget{
id: event.TargetID{ID: id, Name: "elasticsearch"}, id: event.TargetID{ID: id, Name: "elasticsearch"},
args: args, args: args,
@ -263,27 +259,20 @@ func NewElasticsearchTarget(id string, args ElasticsearchArgs, doneCh <-chan str
if args.QueueDir != "" { if args.QueueDir != "" {
queueDir := filepath.Join(args.QueueDir, storePrefix+"-elasticsearch-"+id) queueDir := filepath.Join(args.QueueDir, storePrefix+"-elasticsearch-"+id)
store = NewQueueStore(queueDir, args.QueueLimit) target.store = NewQueueStore(queueDir, args.QueueLimit)
if oErr := store.Open(); oErr != nil { if err := target.store.Open(); err != nil {
target.loggerOnce(context.Background(), oErr, target.ID()) target.loggerOnce(context.Background(), err, target.ID())
return target, oErr return target, err
} }
target.store = store
} }
dErr := args.URL.DialHTTP(nil) var err error
if dErr != nil { target.client, err = newClient(args)
if store == nil { if err != nil {
target.loggerOnce(context.Background(), dErr, target.ID()) if target.store == nil || err != errNotConnected {
return target, dErr
}
} else {
client, err = newClient(args)
if err != nil {
target.loggerOnce(context.Background(), err, target.ID()) target.loggerOnce(context.Background(), err, target.ID())
return target, err return target, err
} }
target.client = client
} }
if target.store != nil && !test { if target.store != nil && !test {

@ -1,165 +0,0 @@
/*
* MinIO Cloud Storage, (C) 2018 MinIO, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package target
import (
"encoding/json"
"errors"
"net/http"
"sync/atomic"
"time"
"github.com/google/uuid"
"github.com/minio/minio/pkg/event"
xnet "github.com/minio/minio/pkg/net"
)
// HTTPClientTarget - HTTP client target.
type HTTPClientTarget struct {
id event.TargetID
w http.ResponseWriter
eventCh chan []byte
DoneCh chan struct{}
stopCh chan struct{}
isStopped uint32
isRunning uint32
}
// ID - returns target ID.
func (target HTTPClientTarget) ID() event.TargetID {
return target.id
}
// IsActive - does nothing and available for interface compatibility.
func (target *HTTPClientTarget) IsActive() (bool, error) {
return true, nil
}
// HasQueueStore - No-Op. Added for interface compatibility
func (target *HTTPClientTarget) HasQueueStore() bool {
return false
}
func (target *HTTPClientTarget) start() {
go func() {
defer func() {
atomic.AddUint32(&target.isRunning, 1)
// Close DoneCh to indicate we are done.
close(target.DoneCh)
}()
write := func(event []byte) error {
if _, err := target.w.Write(event); err != nil {
return err
}
target.w.(http.Flusher).Flush()
return nil
}
keepAliveTicker := time.NewTicker(500 * time.Millisecond)
defer keepAliveTicker.Stop()
for {
select {
case <-target.stopCh:
// We are asked to stop.
return
case event, ok := <-target.eventCh:
if !ok {
// Got read error. Exit the goroutine.
return
}
if err := write(event); err != nil {
// Got write error to the client. Exit the goroutine.
return
}
case <-keepAliveTicker.C:
if err := write([]byte(" ")); err != nil {
// Got write error to the client. Exit the goroutine.
return
}
}
}
}()
}
// Save - sends event to HTTP client.
func (target *HTTPClientTarget) Save(eventData event.Event) error {
return target.send(eventData)
}
func (target *HTTPClientTarget) send(eventData event.Event) error {
if atomic.LoadUint32(&target.isRunning) != 0 {
return errors.New("closed http connection")
}
data, err := json.Marshal(struct{ Records []event.Event }{[]event.Event{eventData}})
if err != nil {
return err
}
data = append(data, byte('\n'))
select {
case target.eventCh <- data:
return nil
case <-target.DoneCh:
return errors.New("error in sending event")
}
}
// Send - interface compatible method does no-op.
func (target *HTTPClientTarget) Send(eventKey string) error {
return nil
}
// Close - closes underneath goroutine.
func (target *HTTPClientTarget) Close() error {
atomic.AddUint32(&target.isStopped, 1)
if atomic.LoadUint32(&target.isStopped) == 1 {
close(target.stopCh)
}
return nil
}
func getNewUUID() (string, error) {
u, err := uuid.NewRandom()
if err != nil {
return "", err
}
return u.String(), nil
}
// NewHTTPClientTarget - creates new HTTP client target.
func NewHTTPClientTarget(host xnet.Host, w http.ResponseWriter) (*HTTPClientTarget, error) {
uuid, err := getNewUUID()
if err != nil {
return nil, err
}
c := &HTTPClientTarget{
id: event.TargetID{ID: "httpclient" + "+" + uuid + "+" + host.Name, Name: host.Port.String()},
w: w,
eventCh: make(chan []byte),
DoneCh: make(chan struct{}),
stopCh: make(chan struct{}),
}
c.start()
return c, nil
}

@ -28,6 +28,7 @@ import (
"net/url" "net/url"
"os" "os"
"path/filepath" "path/filepath"
"time"
"github.com/minio/minio/pkg/event" "github.com/minio/minio/pkg/event"
xnet "github.com/minio/minio/pkg/net" xnet "github.com/minio/minio/pkg/net"
@ -94,16 +95,27 @@ func (target *WebhookTarget) HasQueueStore() bool {
// IsActive - Return true if target is up and active // IsActive - Return true if target is up and active
func (target *WebhookTarget) IsActive() (bool, error) { func (target *WebhookTarget) IsActive() (bool, error) {
u, pErr := xnet.ParseHTTPURL(target.args.Endpoint.String()) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
if pErr != nil { defer cancel()
return false, pErr
req, err := http.NewRequest(http.MethodHead, target.args.Endpoint.String(), nil)
if err != nil {
if xnet.IsNetworkOrHostDown(err) {
return false, errNotConnected
}
return false, err
} }
if dErr := u.DialHTTP(nil); dErr != nil {
if xnet.IsNetworkOrHostDown(dErr) { resp, err := target.httpClient.Do(req.WithContext(ctx))
if err != nil {
if xnet.IsNetworkOrHostDown(err) || err == context.DeadlineExceeded {
return false, errNotConnected return false, errNotConnected
} }
return false, dErr return false, err
} }
io.Copy(ioutil.Discard, resp.Body)
resp.Body.Close()
// No network failure i.e response from the target means its up
return true, nil return true, nil
} }
@ -112,11 +124,13 @@ func (target *WebhookTarget) Save(eventData event.Event) error {
if target.store != nil { if target.store != nil {
return target.store.Put(eventData) return target.store.Put(eventData)
} }
_, err := target.IsActive() err := target.send(eventData)
if err != nil { if err != nil {
return err if xnet.IsNetworkOrHostDown(err) {
return errNotConnected
}
} }
return target.send(eventData) return err
} }
// send - sends an event to the webhook. // send - sends an event to the webhook.
@ -161,10 +175,6 @@ func (target *WebhookTarget) send(eventData event.Event) error {
// Send - reads an event from store and sends it to webhook. // Send - reads an event from store and sends it to webhook.
func (target *WebhookTarget) Send(eventKey string) error { func (target *WebhookTarget) Send(eventKey string) error {
_, err := target.IsActive()
if err != nil {
return err
}
eventData, eErr := target.store.Get(eventKey) eventData, eErr := target.store.Get(eventKey)
if eErr != nil { if eErr != nil {
// The last event key in a successful batch will be sent in the channel atmost once by the replayEvents() // The last event key in a successful batch will be sent in the channel atmost once by the replayEvents()
@ -217,7 +227,8 @@ func NewWebhookTarget(id string, args WebhookArgs, doneCh <-chan struct{}, logge
target.store = store target.store = store
} }
if _, err := target.IsActive(); err != nil { _, err := target.IsActive()
if err != nil {
if target.store == nil || err != errNotConnected { if target.store == nil || err != errNotConnected {
target.loggerOnce(context.Background(), err, target.ID()) target.loggerOnce(context.Background(), err, target.ID())
return target, err return target, err

@ -20,14 +20,10 @@ import (
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
"io"
"io/ioutil"
"net" "net"
"net/http"
"net/url" "net/url"
"path" "path"
"strings" "strings"
"time"
) )
// URL - improved JSON friendly url.URL. // URL - improved JSON friendly url.URL.
@ -85,34 +81,6 @@ func (u *URL) UnmarshalJSON(data []byte) (err error) {
return nil return nil
} }
// DialHTTP - dials the url to check the connection.
func (u URL) DialHTTP(transport *http.Transport) error {
if transport == nil {
transport = &http.Transport{
DialContext: (&net.Dialer{
Timeout: 2 * time.Second,
}).DialContext,
}
}
var client = &http.Client{
Transport: transport,
}
req, err := http.NewRequest("POST", u.String(), nil)
if err != nil {
return err
}
resp, err := client.Do(req)
if err != nil {
return err
}
io.Copy(ioutil.Discard, resp.Body)
resp.Body.Close()
return nil
}
// ParseHTTPURL - parses a string into HTTP URL, string is // ParseHTTPURL - parses a string into HTTP URL, string is
// expected to be of form http:// or https:// // expected to be of form http:// or https://
func ParseHTTPURL(s string) (u *URL, err error) { func ParseHTTPURL(s string) (u *URL, err error) {

Loading…
Cancel
Save