From 175b07d6e418fdf5b4d7c62178d7ab3a4d35984d Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Fri, 11 Oct 2019 17:46:03 -0700 Subject: [PATCH] Fix queueStore stops working with concurrent PUT/DELETE requests (#8381) - This PR fixes situation to avoid underflow, this is possible because of disconnected operations in replay/sendEvents - Hold right locks if Del() operation is performed in Get() - Remove panic in the code and use loggerOnce - Remove Timer and instead use Ticker instead for proper ticks --- cmd/config-current.go | 30 +++---- pkg/event/target/amqp.go | 5 +- pkg/event/target/elasticsearch.go | 6 +- pkg/event/target/kafka.go | 7 +- pkg/event/target/mqtt.go | 96 +++++++++++----------- pkg/event/target/mysql.go | 7 +- pkg/event/target/nats.go | 7 +- pkg/event/target/nsq.go | 7 +- pkg/event/target/postgresql.go | 7 +- pkg/event/target/queuestore.go | 118 ++++++++++++++++------------ pkg/event/target/queuestore_test.go | 44 +++++++++-- pkg/event/target/redis.go | 4 +- pkg/event/target/store.go | 69 +++++++--------- pkg/event/target/webhook.go | 7 +- 14 files changed, 230 insertions(+), 184 deletions(-) diff --git a/cmd/config-current.go b/cmd/config-current.go index 065086d93..03f1314ad 100644 --- a/cmd/config-current.go +++ b/cmd/config-current.go @@ -373,7 +373,7 @@ func (s *serverConfig) TestNotificationTargets() error { if !v.Enable { continue } - t, err := target.NewElasticsearchTarget(k, v, GlobalServiceDoneCh) + t, err := target.NewElasticsearchTarget(k, v, GlobalServiceDoneCh, logger.LogOnceIf) if err != nil { return fmt.Errorf("elasticsearch(%s): %s", k, err.Error()) } @@ -387,7 +387,7 @@ func (s *serverConfig) TestNotificationTargets() error { if v.TLS.Enable { v.TLS.RootCAs = globalRootCAs } - t, err := target.NewKafkaTarget(k, v, GlobalServiceDoneCh) + t, err := target.NewKafkaTarget(k, v, GlobalServiceDoneCh, logger.LogOnceIf) if err != nil { return fmt.Errorf("kafka(%s): %s", k, err.Error()) } @@ -399,7 +399,7 @@ func (s *serverConfig) TestNotificationTargets() error { continue } v.RootCAs = globalRootCAs - t, err := target.NewMQTTTarget(k, v, GlobalServiceDoneCh) + t, err := target.NewMQTTTarget(k, v, GlobalServiceDoneCh, logger.LogOnceIf) if err != nil { return fmt.Errorf("mqtt(%s): %s", k, err.Error()) } @@ -410,7 +410,7 @@ func (s *serverConfig) TestNotificationTargets() error { if !v.Enable { continue } - t, err := target.NewMySQLTarget(k, v, GlobalServiceDoneCh) + t, err := target.NewMySQLTarget(k, v, GlobalServiceDoneCh, logger.LogOnceIf) if err != nil { return fmt.Errorf("mysql(%s): %s", k, err.Error()) } @@ -421,7 +421,7 @@ func (s *serverConfig) TestNotificationTargets() error { if !v.Enable { continue } - t, err := target.NewNATSTarget(k, v, GlobalServiceDoneCh) + t, err := target.NewNATSTarget(k, v, GlobalServiceDoneCh, logger.LogOnceIf) if err != nil { return fmt.Errorf("nats(%s): %s", k, err.Error()) } @@ -432,7 +432,7 @@ func (s *serverConfig) TestNotificationTargets() error { if !v.Enable { continue } - t, err := target.NewNSQTarget(k, v, GlobalServiceDoneCh) + t, err := target.NewNSQTarget(k, v, GlobalServiceDoneCh, logger.LogOnceIf) if err != nil { return fmt.Errorf("nsq(%s): %s", k, err.Error()) } @@ -443,7 +443,7 @@ func (s *serverConfig) TestNotificationTargets() error { if !v.Enable { continue } - t, err := target.NewPostgreSQLTarget(k, v, GlobalServiceDoneCh) + t, err := target.NewPostgreSQLTarget(k, v, GlobalServiceDoneCh, logger.LogOnceIf) if err != nil { return fmt.Errorf("postgreSQL(%s): %s", k, err.Error()) } @@ -583,7 +583,7 @@ func getNotificationTargets(config *serverConfig) *event.TargetList { for id, args := range config.Notify.Elasticsearch { if args.Enable { - newTarget, err := target.NewElasticsearchTarget(id, args, GlobalServiceDoneCh) + newTarget, err := target.NewElasticsearchTarget(id, args, GlobalServiceDoneCh, logger.LogOnceIf) if err != nil { logger.LogIf(context.Background(), err) continue @@ -602,7 +602,7 @@ func getNotificationTargets(config *serverConfig) *event.TargetList { if args.TLS.Enable { args.TLS.RootCAs = globalRootCAs } - newTarget, err := target.NewKafkaTarget(id, args, GlobalServiceDoneCh) + newTarget, err := target.NewKafkaTarget(id, args, GlobalServiceDoneCh, logger.LogOnceIf) if err != nil { logger.LogIf(context.Background(), err) continue @@ -617,7 +617,7 @@ func getNotificationTargets(config *serverConfig) *event.TargetList { for id, args := range config.Notify.MQTT { if args.Enable { args.RootCAs = globalRootCAs - newTarget, err := target.NewMQTTTarget(id, args, GlobalServiceDoneCh) + newTarget, err := target.NewMQTTTarget(id, args, GlobalServiceDoneCh, logger.LogOnceIf) if err != nil { logger.LogIf(context.Background(), err) continue @@ -631,7 +631,7 @@ func getNotificationTargets(config *serverConfig) *event.TargetList { for id, args := range config.Notify.MySQL { if args.Enable { - newTarget, err := target.NewMySQLTarget(id, args, GlobalServiceDoneCh) + newTarget, err := target.NewMySQLTarget(id, args, GlobalServiceDoneCh, logger.LogOnceIf) if err != nil { logger.LogIf(context.Background(), err) continue @@ -645,7 +645,7 @@ func getNotificationTargets(config *serverConfig) *event.TargetList { for id, args := range config.Notify.NATS { if args.Enable { - newTarget, err := target.NewNATSTarget(id, args, GlobalServiceDoneCh) + newTarget, err := target.NewNATSTarget(id, args, GlobalServiceDoneCh, logger.LogOnceIf) if err != nil { logger.LogIf(context.Background(), err) continue @@ -659,7 +659,7 @@ func getNotificationTargets(config *serverConfig) *event.TargetList { for id, args := range config.Notify.NSQ { if args.Enable { - newTarget, err := target.NewNSQTarget(id, args, GlobalServiceDoneCh) + newTarget, err := target.NewNSQTarget(id, args, GlobalServiceDoneCh, logger.LogOnceIf) if err != nil { logger.LogIf(context.Background(), err) continue @@ -673,7 +673,7 @@ func getNotificationTargets(config *serverConfig) *event.TargetList { for id, args := range config.Notify.PostgreSQL { if args.Enable { - newTarget, err := target.NewPostgreSQLTarget(id, args, GlobalServiceDoneCh) + newTarget, err := target.NewPostgreSQLTarget(id, args, GlobalServiceDoneCh, logger.LogOnceIf) if err != nil { logger.LogIf(context.Background(), err) continue @@ -702,7 +702,7 @@ func getNotificationTargets(config *serverConfig) *event.TargetList { for id, args := range config.Notify.Webhook { if args.Enable { args.RootCAs = globalRootCAs - newTarget := target.NewWebhookTarget(id, args, GlobalServiceDoneCh) + newTarget := target.NewWebhookTarget(id, args, GlobalServiceDoneCh, logger.LogOnceIf) if err := targetList.Add(newTarget); err != nil { logger.LogIf(context.Background(), err) continue diff --git a/pkg/event/target/amqp.go b/pkg/event/target/amqp.go index 6805883f9..b56525174 100644 --- a/pkg/event/target/amqp.go +++ b/pkg/event/target/amqp.go @@ -246,9 +246,10 @@ func NewAMQPTarget(id string, args AMQPArgs, doneCh <-chan struct{}, loggerOnce if target.store != nil { // Replays the events from the store. - eventKeyCh := replayEvents(target.store, doneCh) + eventKeyCh := replayEvents(target.store, doneCh, loggerOnce, target.ID()) + // Start replaying events from the store. - go sendEvents(target, eventKeyCh, doneCh) + go sendEvents(target, eventKeyCh, doneCh, loggerOnce) } return target, nil diff --git a/pkg/event/target/elasticsearch.go b/pkg/event/target/elasticsearch.go index 64d533163..7b028c0e8 100644 --- a/pkg/event/target/elasticsearch.go +++ b/pkg/event/target/elasticsearch.go @@ -221,7 +221,7 @@ func newClient(args ElasticsearchArgs) (*elastic.Client, error) { } // NewElasticsearchTarget - creates new Elasticsearch target. -func NewElasticsearchTarget(id string, args ElasticsearchArgs, doneCh <-chan struct{}) (*ElasticsearchTarget, error) { +func NewElasticsearchTarget(id string, args ElasticsearchArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{})) (*ElasticsearchTarget, error) { var client *elastic.Client var err error @@ -256,9 +256,9 @@ func NewElasticsearchTarget(id string, args ElasticsearchArgs, doneCh <-chan str if target.store != nil { // Replays the events from the store. - eventKeyCh := replayEvents(target.store, doneCh) + eventKeyCh := replayEvents(target.store, doneCh, loggerOnce, target.ID()) // Start replaying events from the store. - go sendEvents(target, eventKeyCh, doneCh) + go sendEvents(target, eventKeyCh, doneCh, loggerOnce) } return target, nil diff --git a/pkg/event/target/kafka.go b/pkg/event/target/kafka.go index 23e00846b..697eccadd 100644 --- a/pkg/event/target/kafka.go +++ b/pkg/event/target/kafka.go @@ -17,6 +17,7 @@ package target import ( + "context" "crypto/tls" "crypto/x509" "encoding/json" @@ -191,7 +192,7 @@ func (k KafkaArgs) pingBrokers() bool { } // NewKafkaTarget - creates new Kafka target with auth credentials. -func NewKafkaTarget(id string, args KafkaArgs, doneCh <-chan struct{}) (*KafkaTarget, error) { +func NewKafkaTarget(id string, args KafkaArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{})) (*KafkaTarget, error) { config := sarama.NewConfig() config.Net.SASL.User = args.SASL.User @@ -242,9 +243,9 @@ func NewKafkaTarget(id string, args KafkaArgs, doneCh <-chan struct{}) (*KafkaTa if target.store != nil { // Replays the events from the store. - eventKeyCh := replayEvents(target.store, doneCh) + eventKeyCh := replayEvents(target.store, doneCh, loggerOnce, target.ID()) // Start replaying events from the store. - go sendEvents(target, eventKeyCh, doneCh) + go sendEvents(target, eventKeyCh, doneCh, loggerOnce) } return target, nil diff --git a/pkg/event/target/mqtt.go b/pkg/event/target/mqtt.go index 0cc22d0f3..7e1cf9417 100644 --- a/pkg/event/target/mqtt.go +++ b/pkg/event/target/mqtt.go @@ -17,16 +17,18 @@ package target import ( + "context" "crypto/tls" "crypto/x509" "encoding/json" "errors" + "fmt" "net/url" "os" "path/filepath" "time" - "github.com/eclipse/paho.mqtt.golang" + mqtt "github.com/eclipse/paho.mqtt.golang" "github.com/minio/minio/pkg/event" xnet "github.com/minio/minio/pkg/net" ) @@ -82,10 +84,11 @@ func (m MQTTArgs) Validate() error { // MQTTTarget - MQTT target. type MQTTTarget struct { - id event.TargetID - args MQTTArgs - client mqtt.Client - store Store + id event.TargetID + args MQTTArgs + client mqtt.Client + store Store + loggerOnce func(ctx context.Context, err error, id interface{}) } // ID - returns target ID. @@ -116,22 +119,21 @@ func (target *MQTTTarget) send(eventData event.Event) error { // Send - reads an event from store and sends it to MQTT. func (target *MQTTTarget) Send(eventKey string) error { - if !target.client.IsConnectionOpen() { return errNotConnected } - eventData, eErr := target.store.Get(eventKey) - if eErr != nil { + eventData, err := target.store.Get(eventKey) + if err != nil { // The last event key in a successful batch will be sent in the channel atmost once by the replayEvents() // Such events will not exist and wouldve been already been sent successfully. - if os.IsNotExist(eErr) { + if os.IsNotExist(err) { return nil } - return eErr + return err } - if err := target.send(eventData); err != nil { + if err = target.send(eventData); err != nil { return err } @@ -139,7 +141,8 @@ func (target *MQTTTarget) Send(eventKey string) error { return target.store.Del(eventKey) } -// Save - saves the events to the store if queuestore is configured, which will be replayed when the mqtt connection is active. +// Save - saves the events to the store if queuestore is configured, which will +// be replayed when the mqtt connection is active. func (target *MQTTTarget) Save(eventData event.Event) error { if target.store != nil { return target.store.Put(eventData) @@ -159,7 +162,7 @@ func (target *MQTTTarget) Close() error { } // NewMQTTTarget - creates new MQTT target. -func NewMQTTTarget(id string, args MQTTArgs, doneCh <-chan struct{}) (*MQTTTarget, error) { +func NewMQTTTarget(id string, args MQTTArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{})) (*MQTTTarget, error) { options := mqtt.NewClientOptions(). SetClientID(""). SetCleanSession(true). @@ -170,57 +173,60 @@ func NewMQTTTarget(id string, args MQTTArgs, doneCh <-chan struct{}) (*MQTTTarge SetTLSConfig(&tls.Config{RootCAs: args.RootCAs}). AddBroker(args.Broker.String()) - var store Store - - if args.QueueDir != "" { - queueDir := filepath.Join(args.QueueDir, storePrefix+"-mqtt-"+id) - store = NewQueueStore(queueDir, args.QueueLimit) - if oErr := store.Open(); oErr != nil { - return nil, oErr - } - } - client := mqtt.NewClient(options) // The client should establish a first time connection. // Connect() should be successful atleast once to publish events. token := client.Connect() + target := &MQTTTarget{ + id: event.TargetID{ID: id, Name: "mqtt"}, + args: args, + client: client, + loggerOnce: loggerOnce, + } + // Retries until the clientID gets registered. retryRegister := func() { // Repeat the pings until the client registers the clientId and receives a token. for { - if token.Wait() && token.Error() == nil { - // Connected - break + var terr error + select { + case <-doneCh: + return + default: + terr = token.Error() + if token.Wait() && terr == nil { + // Connected + return + } + // Reconnecting + time.Sleep(reconnectInterval * time.Second) + terr = fmt.Errorf("Previous connect failed with %s, attempting a reconnect", terr) + target.loggerOnce(context.Background(), terr, target.ID()) + token = client.Connect() } - // Reconnecting - time.Sleep(reconnectInterval * time.Second) - token = client.Connect() } } - if store == nil { - if token.Wait() && token.Error() != nil { - return nil, token.Error() + if args.QueueDir != "" { + queueDir := filepath.Join(args.QueueDir, storePrefix+"-mqtt-"+id) + target.store = NewQueueStore(queueDir, args.QueueLimit) + if err := target.store.Open(); err != nil { + return nil, err } - } else { - go retryRegister() - } - target := &MQTTTarget{ - id: event.TargetID{ID: id, Name: "mqtt"}, - args: args, - client: client, - store: store, - } + go retryRegister() - if target.store != nil { // Replays the events from the store. - eventKeyCh := replayEvents(target.store, doneCh) + eventKeyCh := replayEvents(target.store, doneCh, loggerOnce, target.ID()) + // Start replaying events from the store. - go sendEvents(target, eventKeyCh, doneCh) + go sendEvents(target, eventKeyCh, doneCh, loggerOnce) + } else { + if token.Wait() && token.Error() != nil { + return nil, token.Error() + } } - return target, nil } diff --git a/pkg/event/target/mysql.go b/pkg/event/target/mysql.go index 3aa8ebafc..4e626ccbc 100644 --- a/pkg/event/target/mysql.go +++ b/pkg/event/target/mysql.go @@ -54,6 +54,7 @@ package target import ( + "context" "database/sql" "encoding/json" "errors" @@ -311,7 +312,7 @@ func (target *MySQLTarget) executeStmts() error { } // NewMySQLTarget - creates new MySQL target. -func NewMySQLTarget(id string, args MySQLArgs, doneCh <-chan struct{}) (*MySQLTarget, error) { +func NewMySQLTarget(id string, args MySQLArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{})) (*MySQLTarget, error) { var firstPing bool if args.DSN == "" { config := mysql.Config{ @@ -363,9 +364,9 @@ func NewMySQLTarget(id string, args MySQLArgs, doneCh <-chan struct{}) (*MySQLTa if target.store != nil { // Replays the events from the store. - eventKeyCh := replayEvents(target.store, doneCh) + eventKeyCh := replayEvents(target.store, doneCh, loggerOnce, target.ID()) // Start replaying events from the store. - go sendEvents(target, eventKeyCh, doneCh) + go sendEvents(target, eventKeyCh, doneCh, loggerOnce) } return target, nil diff --git a/pkg/event/target/nats.go b/pkg/event/target/nats.go index 65ebf0852..833bfeaa0 100644 --- a/pkg/event/target/nats.go +++ b/pkg/event/target/nats.go @@ -17,6 +17,7 @@ package target import ( + "context" "encoding/json" "errors" "net/url" @@ -233,7 +234,7 @@ func (target *NATSTarget) Close() (err error) { } // NewNATSTarget - creates new NATS target. -func NewNATSTarget(id string, args NATSArgs, doneCh <-chan struct{}) (*NATSTarget, error) { +func NewNATSTarget(id string, args NATSArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{})) (*NATSTarget, error) { var natsConn *nats.Conn var stanConn stan.Conn @@ -271,9 +272,9 @@ func NewNATSTarget(id string, args NATSArgs, doneCh <-chan struct{}) (*NATSTarge if target.store != nil { // Replays the events from the store. - eventKeyCh := replayEvents(target.store, doneCh) + eventKeyCh := replayEvents(target.store, doneCh, loggerOnce, target.ID()) // Start replaying events from the store. - go sendEvents(target, eventKeyCh, doneCh) + go sendEvents(target, eventKeyCh, doneCh, loggerOnce) } return target, nil diff --git a/pkg/event/target/nsq.go b/pkg/event/target/nsq.go index 68d647b9d..2521835eb 100644 --- a/pkg/event/target/nsq.go +++ b/pkg/event/target/nsq.go @@ -17,6 +17,7 @@ package target import ( + "context" "crypto/tls" "encoding/json" "errors" @@ -149,7 +150,7 @@ func (target *NSQTarget) Close() (err error) { } // NewNSQTarget - creates new NSQ target. -func NewNSQTarget(id string, args NSQArgs, doneCh <-chan struct{}) (*NSQTarget, error) { +func NewNSQTarget(id string, args NSQArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{})) (*NSQTarget, error) { config := nsq.NewConfig() if args.TLS.Enable { config.TlsV1 = true @@ -189,9 +190,9 @@ func NewNSQTarget(id string, args NSQArgs, doneCh <-chan struct{}) (*NSQTarget, if target.store != nil { // Replays the events from the store. - eventKeyCh := replayEvents(target.store, doneCh) + eventKeyCh := replayEvents(target.store, doneCh, loggerOnce, target.ID()) // Start replaying events from the store. - go sendEvents(target, eventKeyCh, doneCh) + go sendEvents(target, eventKeyCh, doneCh, loggerOnce) } return target, nil diff --git a/pkg/event/target/postgresql.go b/pkg/event/target/postgresql.go index 849166500..e3e742c67 100644 --- a/pkg/event/target/postgresql.go +++ b/pkg/event/target/postgresql.go @@ -54,6 +54,7 @@ package target import ( + "context" "database/sql" "encoding/json" "errors" @@ -312,7 +313,7 @@ func (target *PostgreSQLTarget) executeStmts() error { } // NewPostgreSQLTarget - creates new PostgreSQL target. -func NewPostgreSQLTarget(id string, args PostgreSQLArgs, doneCh <-chan struct{}) (*PostgreSQLTarget, error) { +func NewPostgreSQLTarget(id string, args PostgreSQLArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{})) (*PostgreSQLTarget, error) { var firstPing bool params := []string{args.ConnectionString} @@ -370,9 +371,9 @@ func NewPostgreSQLTarget(id string, args PostgreSQLArgs, doneCh <-chan struct{}) if target.store != nil { // Replays the events from the store. - eventKeyCh := replayEvents(target.store, doneCh) + eventKeyCh := replayEvents(target.store, doneCh, loggerOnce, target.ID()) // Start replaying events from the store. - go sendEvents(target, eventKeyCh, doneCh) + go sendEvents(target, eventKeyCh, doneCh, loggerOnce) } return target, nil diff --git a/pkg/event/target/queuestore.go b/pkg/event/target/queuestore.go index 6d9005fe1..dfd1a6b63 100644 --- a/pkg/event/target/queuestore.go +++ b/pkg/event/target/queuestore.go @@ -19,6 +19,7 @@ package target import ( "encoding/json" "io/ioutil" + "math" "os" "path/filepath" "sort" @@ -36,28 +37,29 @@ const ( // QueueStore - Filestore for persisting events. type QueueStore struct { sync.RWMutex - directory string - eC uint64 - limit uint64 + currentEntries uint64 + entryLimit uint64 + directory string } // NewQueueStore - Creates an instance for QueueStore. -func NewQueueStore(directory string, limit uint64) *QueueStore { +func NewQueueStore(directory string, limit uint64) Store { if limit == 0 { limit = maxLimit - currRlimit, _, err := sys.GetMaxOpenFileLimit() + _, maxRLimit, err := sys.GetMaxOpenFileLimit() if err == nil { - if currRlimit > limit { - limit = currRlimit + // Limit the maximum number of entries + // to maximum open file limit + if maxRLimit < limit { + limit = maxRLimit } } } - queueStore := &QueueStore{ - directory: directory, - limit: limit, + return &QueueStore{ + directory: directory, + entryLimit: limit, } - return queueStore } // Open - Creates the directory if not present. @@ -65,16 +67,21 @@ func (store *QueueStore) Open() error { store.Lock() defer store.Unlock() - if terr := os.MkdirAll(store.directory, os.FileMode(0770)); terr != nil { - return terr + if err := os.MkdirAll(store.directory, os.FileMode(0770)); err != nil { + return err + } + + names, err := store.list() + if err != nil { + return err } - eCount := uint64(len(store.list())) - if eCount >= store.limit { + currentEntries := uint64(len(names)) + if currentEntries >= store.entryLimit { return errLimitExceeded } - store.eC = eCount + store.currentEntries = currentEntries return nil } @@ -94,7 +101,7 @@ func (store *QueueStore) write(directory string, key string, e event.Event) erro } // Increment the event count. - store.eC++ + store.currentEntries++ return nil } @@ -103,39 +110,40 @@ func (store *QueueStore) write(directory string, key string, e event.Event) erro func (store *QueueStore) Put(e event.Event) error { store.Lock() defer store.Unlock() - if store.eC >= store.limit { + if store.currentEntries >= store.entryLimit { return errLimitExceeded } - key, kErr := getNewUUID() - if kErr != nil { - return kErr + key, err := getNewUUID() + if err != nil { + return err } return store.write(store.directory, key, e) } // Get - gets a event from the store. -func (store *QueueStore) Get(key string) (event.Event, error) { +func (store *QueueStore) Get(key string) (event event.Event, err error) { store.RLock() - defer store.RUnlock() - - var event event.Event - filepath := filepath.Join(store.directory, key+eventExt) + defer func(store *QueueStore) { + store.RUnlock() + if err != nil { + // Upon error we remove the entry. + store.Del(key) + } + }(store) - eventData, rerr := ioutil.ReadFile(filepath) - if rerr != nil { - store.del(key) - return event, rerr + var eventData []byte + eventData, err = ioutil.ReadFile(filepath.Join(store.directory, key+eventExt)) + if err != nil { + return event, err } if len(eventData) == 0 { - store.del(key) + return event, os.ErrNotExist } - uerr := json.Unmarshal(eventData, &event) - if uerr != nil { - store.del(key) - return event, uerr + if err = json.Unmarshal(eventData, &event); err != nil { + return event, err } return event, nil @@ -150,41 +158,49 @@ func (store *QueueStore) Del(key string) error { // lockless call func (store *QueueStore) del(key string) error { - p := filepath.Join(store.directory, key+eventExt) - - rerr := os.Remove(p) - if rerr != nil { - return rerr + if err := os.Remove(filepath.Join(store.directory, key+eventExt)); err != nil { + return err } - // Decrement the event count. - store.eC-- - + // Decrement the current entries count. + store.currentEntries-- + + // Current entries can underflow, when multiple + // events are being pushed in parallel, this code + // is needed to ensure that we don't underflow. + // + // queueStore replayEvents is not serialized, + // this code is needed to protect us under + // such situations. + if store.currentEntries == math.MaxUint64 { + store.currentEntries = 0 + } return nil } // List - lists all files from the directory. -func (store *QueueStore) List() []string { +func (store *QueueStore) List() ([]string, error) { store.RLock() defer store.RUnlock() return store.list() } -// lockless call. -func (store *QueueStore) list() []string { +// list lock less. +func (store *QueueStore) list() ([]string, error) { var names []string - storeDir, _ := os.Open(store.directory) - files, _ := storeDir.Readdir(-1) + files, err := ioutil.ReadDir(store.directory) + if err != nil { + return names, err + } // Sort the dentries. sort.Slice(files, func(i, j int) bool { - return files[i].ModTime().Unix() < files[j].ModTime().Unix() + return files[i].ModTime().Before(files[j].ModTime()) }) for _, file := range files { names = append(names, file.Name()) } - _ = storeDir.Close() - return names + return names, nil } diff --git a/pkg/event/target/queuestore_test.go b/pkg/event/target/queuestore_test.go index cd8a995ba..3638cb65e 100644 --- a/pkg/event/target/queuestore_test.go +++ b/pkg/event/target/queuestore_test.go @@ -65,8 +65,12 @@ func TestQueueStorePut(t *testing.T) { } } // Count the events. - if len(store.List()) != 100 { - t.Fatalf("List() Expected: 100, got %d", len(store.List())) + names, err := store.List() + if err != nil { + t.Fatal(err) + } + if len(names) != 100 { + t.Fatalf("List() Expected: 100, got %d", len(names)) } } @@ -87,7 +91,10 @@ func TestQueueStoreGet(t *testing.T) { t.Fatal("Failed to put to queue store ", err) } } - eventKeys := store.List() + eventKeys, err := store.List() + if err != nil { + t.Fatal(err) + } // Get 10 events. if len(eventKeys) == 10 { for _, key := range eventKeys { @@ -121,7 +128,10 @@ func TestQueueStoreDel(t *testing.T) { t.Fatal("Failed to put to queue store ", err) } } - eventKeys := store.List() + eventKeys, err := store.List() + if err != nil { + t.Fatal(err) + } // Remove all the events. if len(eventKeys) == 20 { for _, key := range eventKeys { @@ -134,8 +144,12 @@ func TestQueueStoreDel(t *testing.T) { t.Fatalf("List() Expected: 20, got %d", len(eventKeys)) } - if len(store.List()) != 0 { - t.Fatalf("List() Expected: 0, got %d", len(store.List())) + names, err := store.List() + if err != nil { + t.Fatal(err) + } + if len(names) != 0 { + t.Fatalf("List() Expected: 0, got %d", len(names)) } } @@ -179,7 +193,21 @@ func TestQueueStoreListN(t *testing.T) { } } // Should return all the event keys in the store. - if len(store.List()) != 10 { - t.Fatalf("List() Expected: 10, got %d", len(store.List())) + names, err := store.List() + if err != nil { + t.Fatal(err) + } + + if len(names) != 10 { + t.Fatalf("List() Expected: 10, got %d", len(names)) + } + + if err = os.RemoveAll(queueDir); err != nil { + t.Fatal(err) + } + + _, err = store.List() + if !os.IsNotExist(err) { + t.Fatalf("Expected List() to fail with os.ErrNotExist, %s", err) } } diff --git a/pkg/event/target/redis.go b/pkg/event/target/redis.go index 790d7e1be..69db7d9a5 100644 --- a/pkg/event/target/redis.go +++ b/pkg/event/target/redis.go @@ -289,9 +289,9 @@ func NewRedisTarget(id string, args RedisArgs, doneCh <-chan struct{}, loggerOnc if target.store != nil { // Replays the events from the store. - eventKeyCh := replayEvents(target.store, doneCh) + eventKeyCh := replayEvents(target.store, doneCh, loggerOnce, target.ID()) // Start replaying events from the store. - go sendEvents(target, eventKeyCh, doneCh) + go sendEvents(target, eventKeyCh, doneCh, loggerOnce) } return target, nil diff --git a/pkg/event/target/store.go b/pkg/event/target/store.go index 5dd0ab5c6..7f357d831 100644 --- a/pkg/event/target/store.go +++ b/pkg/event/target/store.go @@ -17,10 +17,9 @@ package target import ( + "context" "errors" "fmt" - "net" - "os" "strings" "syscall" "time" @@ -40,35 +39,39 @@ var errLimitExceeded = errors.New("the maximum store limit reached") type Store interface { Put(event event.Event) error Get(key string) (event.Event, error) - List() []string + List() ([]string, error) Del(key string) error Open() error } // replayEvents - Reads the events from the store and replays. -func replayEvents(store Store, doneCh <-chan struct{}) <-chan string { - var names []string +func replayEvents(store Store, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}), id event.TargetID) <-chan string { eventKeyCh := make(chan string) go func() { - retryTimer := time.NewTimer(retryInterval) - defer retryTimer.Stop() + retryTicker := time.NewTicker(retryInterval) + defer retryTicker.Stop() defer close(eventKeyCh) for { - names = store.List() - for _, name := range names { - select { - case eventKeyCh <- strings.TrimSuffix(name, eventExt): - // Get next key. - case <-doneCh: - return + names, err := store.List() + if err == nil { + for _, name := range names { + select { + case eventKeyCh <- strings.TrimSuffix(name, eventExt): + // Get next key. + case <-doneCh: + return + } } } if len(names) < 2 { - retryTimer.Reset(retryInterval) select { - case <-retryTimer.C: + case <-retryTicker.C: + if err != nil { + loggerOnce(context.Background(), + fmt.Errorf("store.List() failed '%v'", err), id) + } case <-doneCh: return } @@ -81,16 +84,7 @@ func replayEvents(store Store, doneCh <-chan struct{}) <-chan string { // IsConnRefusedErr - To check fot "connection refused" error. func IsConnRefusedErr(err error) bool { - if opErr, ok := err.(*net.OpError); ok { - if sysErr, ok := opErr.Err.(*os.SyscallError); ok { - if errno, ok := sysErr.Err.(syscall.Errno); ok { - if errno == syscall.ECONNREFUSED { - return true - } - } - } - } - return false + return errors.Is(err, syscall.ECONNREFUSED) } // IsConnResetErr - Checks for connection reset errors. @@ -99,20 +93,13 @@ func IsConnResetErr(err error) bool { return true } // incase if error message is wrapped. - if opErr, ok := err.(*net.OpError); ok { - if syscallErr, ok := opErr.Err.(*os.SyscallError); ok { - if syscallErr.Err == syscall.ECONNRESET { - return true - } - } - } - return false + return errors.Is(err, syscall.ECONNRESET) } // sendEvents - Reads events from the store and re-plays. -func sendEvents(target event.Target, eventKeyCh <-chan string, doneCh <-chan struct{}) { - retryTimer := time.NewTimer(retryInterval) - defer retryTimer.Stop() +func sendEvents(target event.Target, eventKeyCh <-chan string, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{})) { + retryTicker := time.NewTicker(retryInterval) + defer retryTicker.Stop() send := func(eventKey string) bool { for { @@ -122,12 +109,14 @@ func sendEvents(target event.Target, eventKeyCh <-chan string, doneCh <-chan str } if err != errNotConnected && !IsConnResetErr(err) { - panic(fmt.Errorf("target.Send() failed with '%v'", err)) + loggerOnce(context.Background(), + fmt.Errorf("target.Send() failed with '%v'", err), + target.ID()) + continue } - retryTimer.Reset(retryInterval) select { - case <-retryTimer.C: + case <-retryTicker.C: case <-doneCh: return false } diff --git a/pkg/event/target/webhook.go b/pkg/event/target/webhook.go index ab185312a..66c0e6cfb 100644 --- a/pkg/event/target/webhook.go +++ b/pkg/event/target/webhook.go @@ -18,6 +18,7 @@ package target import ( "bytes" + "context" "crypto/tls" "crypto/x509" "encoding/json" @@ -171,7 +172,7 @@ func (target *WebhookTarget) Close() error { } // NewWebhookTarget - creates new Webhook target. -func NewWebhookTarget(id string, args WebhookArgs, doneCh <-chan struct{}) *WebhookTarget { +func NewWebhookTarget(id string, args WebhookArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{})) *WebhookTarget { var store Store @@ -203,9 +204,9 @@ func NewWebhookTarget(id string, args WebhookArgs, doneCh <-chan struct{}) *Webh if target.store != nil { // Replays the events from the store. - eventKeyCh := replayEvents(target.store, doneCh) + eventKeyCh := replayEvents(target.store, doneCh, loggerOnce, target.ID()) // Start replaying events from the store. - go sendEvents(target, eventKeyCh, doneCh) + go sendEvents(target, eventKeyCh, doneCh, loggerOnce) } return target