Fix queueStore stops working with concurrent PUT/DELETE requests (#8381)

- This PR fixes situation to avoid underflow, this is possible
  because of disconnected operations in replay/sendEvents
- Hold right locks if Del() operation is performed in Get()
- Remove panic in the code and use loggerOnce
- Remove Timer and instead use Ticker instead for proper ticks
master
Harshavardhana 5 years ago committed by GitHub
parent f16df2a4e7
commit 175b07d6e4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 30
      cmd/config-current.go
  2. 5
      pkg/event/target/amqp.go
  3. 6
      pkg/event/target/elasticsearch.go
  4. 7
      pkg/event/target/kafka.go
  5. 96
      pkg/event/target/mqtt.go
  6. 7
      pkg/event/target/mysql.go
  7. 7
      pkg/event/target/nats.go
  8. 7
      pkg/event/target/nsq.go
  9. 7
      pkg/event/target/postgresql.go
  10. 118
      pkg/event/target/queuestore.go
  11. 44
      pkg/event/target/queuestore_test.go
  12. 4
      pkg/event/target/redis.go
  13. 69
      pkg/event/target/store.go
  14. 7
      pkg/event/target/webhook.go

@ -373,7 +373,7 @@ func (s *serverConfig) TestNotificationTargets() error {
if !v.Enable {
continue
}
t, err := target.NewElasticsearchTarget(k, v, GlobalServiceDoneCh)
t, err := target.NewElasticsearchTarget(k, v, GlobalServiceDoneCh, logger.LogOnceIf)
if err != nil {
return fmt.Errorf("elasticsearch(%s): %s", k, err.Error())
}
@ -387,7 +387,7 @@ func (s *serverConfig) TestNotificationTargets() error {
if v.TLS.Enable {
v.TLS.RootCAs = globalRootCAs
}
t, err := target.NewKafkaTarget(k, v, GlobalServiceDoneCh)
t, err := target.NewKafkaTarget(k, v, GlobalServiceDoneCh, logger.LogOnceIf)
if err != nil {
return fmt.Errorf("kafka(%s): %s", k, err.Error())
}
@ -399,7 +399,7 @@ func (s *serverConfig) TestNotificationTargets() error {
continue
}
v.RootCAs = globalRootCAs
t, err := target.NewMQTTTarget(k, v, GlobalServiceDoneCh)
t, err := target.NewMQTTTarget(k, v, GlobalServiceDoneCh, logger.LogOnceIf)
if err != nil {
return fmt.Errorf("mqtt(%s): %s", k, err.Error())
}
@ -410,7 +410,7 @@ func (s *serverConfig) TestNotificationTargets() error {
if !v.Enable {
continue
}
t, err := target.NewMySQLTarget(k, v, GlobalServiceDoneCh)
t, err := target.NewMySQLTarget(k, v, GlobalServiceDoneCh, logger.LogOnceIf)
if err != nil {
return fmt.Errorf("mysql(%s): %s", k, err.Error())
}
@ -421,7 +421,7 @@ func (s *serverConfig) TestNotificationTargets() error {
if !v.Enable {
continue
}
t, err := target.NewNATSTarget(k, v, GlobalServiceDoneCh)
t, err := target.NewNATSTarget(k, v, GlobalServiceDoneCh, logger.LogOnceIf)
if err != nil {
return fmt.Errorf("nats(%s): %s", k, err.Error())
}
@ -432,7 +432,7 @@ func (s *serverConfig) TestNotificationTargets() error {
if !v.Enable {
continue
}
t, err := target.NewNSQTarget(k, v, GlobalServiceDoneCh)
t, err := target.NewNSQTarget(k, v, GlobalServiceDoneCh, logger.LogOnceIf)
if err != nil {
return fmt.Errorf("nsq(%s): %s", k, err.Error())
}
@ -443,7 +443,7 @@ func (s *serverConfig) TestNotificationTargets() error {
if !v.Enable {
continue
}
t, err := target.NewPostgreSQLTarget(k, v, GlobalServiceDoneCh)
t, err := target.NewPostgreSQLTarget(k, v, GlobalServiceDoneCh, logger.LogOnceIf)
if err != nil {
return fmt.Errorf("postgreSQL(%s): %s", k, err.Error())
}
@ -583,7 +583,7 @@ func getNotificationTargets(config *serverConfig) *event.TargetList {
for id, args := range config.Notify.Elasticsearch {
if args.Enable {
newTarget, err := target.NewElasticsearchTarget(id, args, GlobalServiceDoneCh)
newTarget, err := target.NewElasticsearchTarget(id, args, GlobalServiceDoneCh, logger.LogOnceIf)
if err != nil {
logger.LogIf(context.Background(), err)
continue
@ -602,7 +602,7 @@ func getNotificationTargets(config *serverConfig) *event.TargetList {
if args.TLS.Enable {
args.TLS.RootCAs = globalRootCAs
}
newTarget, err := target.NewKafkaTarget(id, args, GlobalServiceDoneCh)
newTarget, err := target.NewKafkaTarget(id, args, GlobalServiceDoneCh, logger.LogOnceIf)
if err != nil {
logger.LogIf(context.Background(), err)
continue
@ -617,7 +617,7 @@ func getNotificationTargets(config *serverConfig) *event.TargetList {
for id, args := range config.Notify.MQTT {
if args.Enable {
args.RootCAs = globalRootCAs
newTarget, err := target.NewMQTTTarget(id, args, GlobalServiceDoneCh)
newTarget, err := target.NewMQTTTarget(id, args, GlobalServiceDoneCh, logger.LogOnceIf)
if err != nil {
logger.LogIf(context.Background(), err)
continue
@ -631,7 +631,7 @@ func getNotificationTargets(config *serverConfig) *event.TargetList {
for id, args := range config.Notify.MySQL {
if args.Enable {
newTarget, err := target.NewMySQLTarget(id, args, GlobalServiceDoneCh)
newTarget, err := target.NewMySQLTarget(id, args, GlobalServiceDoneCh, logger.LogOnceIf)
if err != nil {
logger.LogIf(context.Background(), err)
continue
@ -645,7 +645,7 @@ func getNotificationTargets(config *serverConfig) *event.TargetList {
for id, args := range config.Notify.NATS {
if args.Enable {
newTarget, err := target.NewNATSTarget(id, args, GlobalServiceDoneCh)
newTarget, err := target.NewNATSTarget(id, args, GlobalServiceDoneCh, logger.LogOnceIf)
if err != nil {
logger.LogIf(context.Background(), err)
continue
@ -659,7 +659,7 @@ func getNotificationTargets(config *serverConfig) *event.TargetList {
for id, args := range config.Notify.NSQ {
if args.Enable {
newTarget, err := target.NewNSQTarget(id, args, GlobalServiceDoneCh)
newTarget, err := target.NewNSQTarget(id, args, GlobalServiceDoneCh, logger.LogOnceIf)
if err != nil {
logger.LogIf(context.Background(), err)
continue
@ -673,7 +673,7 @@ func getNotificationTargets(config *serverConfig) *event.TargetList {
for id, args := range config.Notify.PostgreSQL {
if args.Enable {
newTarget, err := target.NewPostgreSQLTarget(id, args, GlobalServiceDoneCh)
newTarget, err := target.NewPostgreSQLTarget(id, args, GlobalServiceDoneCh, logger.LogOnceIf)
if err != nil {
logger.LogIf(context.Background(), err)
continue
@ -702,7 +702,7 @@ func getNotificationTargets(config *serverConfig) *event.TargetList {
for id, args := range config.Notify.Webhook {
if args.Enable {
args.RootCAs = globalRootCAs
newTarget := target.NewWebhookTarget(id, args, GlobalServiceDoneCh)
newTarget := target.NewWebhookTarget(id, args, GlobalServiceDoneCh, logger.LogOnceIf)
if err := targetList.Add(newTarget); err != nil {
logger.LogIf(context.Background(), err)
continue

@ -246,9 +246,10 @@ func NewAMQPTarget(id string, args AMQPArgs, doneCh <-chan struct{}, loggerOnce
if target.store != nil {
// Replays the events from the store.
eventKeyCh := replayEvents(target.store, doneCh)
eventKeyCh := replayEvents(target.store, doneCh, loggerOnce, target.ID())
// Start replaying events from the store.
go sendEvents(target, eventKeyCh, doneCh)
go sendEvents(target, eventKeyCh, doneCh, loggerOnce)
}
return target, nil

@ -221,7 +221,7 @@ func newClient(args ElasticsearchArgs) (*elastic.Client, error) {
}
// NewElasticsearchTarget - creates new Elasticsearch target.
func NewElasticsearchTarget(id string, args ElasticsearchArgs, doneCh <-chan struct{}) (*ElasticsearchTarget, error) {
func NewElasticsearchTarget(id string, args ElasticsearchArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{})) (*ElasticsearchTarget, error) {
var client *elastic.Client
var err error
@ -256,9 +256,9 @@ func NewElasticsearchTarget(id string, args ElasticsearchArgs, doneCh <-chan str
if target.store != nil {
// Replays the events from the store.
eventKeyCh := replayEvents(target.store, doneCh)
eventKeyCh := replayEvents(target.store, doneCh, loggerOnce, target.ID())
// Start replaying events from the store.
go sendEvents(target, eventKeyCh, doneCh)
go sendEvents(target, eventKeyCh, doneCh, loggerOnce)
}
return target, nil

@ -17,6 +17,7 @@
package target
import (
"context"
"crypto/tls"
"crypto/x509"
"encoding/json"
@ -191,7 +192,7 @@ func (k KafkaArgs) pingBrokers() bool {
}
// NewKafkaTarget - creates new Kafka target with auth credentials.
func NewKafkaTarget(id string, args KafkaArgs, doneCh <-chan struct{}) (*KafkaTarget, error) {
func NewKafkaTarget(id string, args KafkaArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{})) (*KafkaTarget, error) {
config := sarama.NewConfig()
config.Net.SASL.User = args.SASL.User
@ -242,9 +243,9 @@ func NewKafkaTarget(id string, args KafkaArgs, doneCh <-chan struct{}) (*KafkaTa
if target.store != nil {
// Replays the events from the store.
eventKeyCh := replayEvents(target.store, doneCh)
eventKeyCh := replayEvents(target.store, doneCh, loggerOnce, target.ID())
// Start replaying events from the store.
go sendEvents(target, eventKeyCh, doneCh)
go sendEvents(target, eventKeyCh, doneCh, loggerOnce)
}
return target, nil

@ -17,16 +17,18 @@
package target
import (
"context"
"crypto/tls"
"crypto/x509"
"encoding/json"
"errors"
"fmt"
"net/url"
"os"
"path/filepath"
"time"
"github.com/eclipse/paho.mqtt.golang"
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/minio/minio/pkg/event"
xnet "github.com/minio/minio/pkg/net"
)
@ -82,10 +84,11 @@ func (m MQTTArgs) Validate() error {
// MQTTTarget - MQTT target.
type MQTTTarget struct {
id event.TargetID
args MQTTArgs
client mqtt.Client
store Store
id event.TargetID
args MQTTArgs
client mqtt.Client
store Store
loggerOnce func(ctx context.Context, err error, id interface{})
}
// ID - returns target ID.
@ -116,22 +119,21 @@ func (target *MQTTTarget) send(eventData event.Event) error {
// Send - reads an event from store and sends it to MQTT.
func (target *MQTTTarget) Send(eventKey string) error {
if !target.client.IsConnectionOpen() {
return errNotConnected
}
eventData, eErr := target.store.Get(eventKey)
if eErr != nil {
eventData, err := target.store.Get(eventKey)
if err != nil {
// The last event key in a successful batch will be sent in the channel atmost once by the replayEvents()
// Such events will not exist and wouldve been already been sent successfully.
if os.IsNotExist(eErr) {
if os.IsNotExist(err) {
return nil
}
return eErr
return err
}
if err := target.send(eventData); err != nil {
if err = target.send(eventData); err != nil {
return err
}
@ -139,7 +141,8 @@ func (target *MQTTTarget) Send(eventKey string) error {
return target.store.Del(eventKey)
}
// Save - saves the events to the store if queuestore is configured, which will be replayed when the mqtt connection is active.
// Save - saves the events to the store if queuestore is configured, which will
// be replayed when the mqtt connection is active.
func (target *MQTTTarget) Save(eventData event.Event) error {
if target.store != nil {
return target.store.Put(eventData)
@ -159,7 +162,7 @@ func (target *MQTTTarget) Close() error {
}
// NewMQTTTarget - creates new MQTT target.
func NewMQTTTarget(id string, args MQTTArgs, doneCh <-chan struct{}) (*MQTTTarget, error) {
func NewMQTTTarget(id string, args MQTTArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{})) (*MQTTTarget, error) {
options := mqtt.NewClientOptions().
SetClientID("").
SetCleanSession(true).
@ -170,57 +173,60 @@ func NewMQTTTarget(id string, args MQTTArgs, doneCh <-chan struct{}) (*MQTTTarge
SetTLSConfig(&tls.Config{RootCAs: args.RootCAs}).
AddBroker(args.Broker.String())
var store Store
if args.QueueDir != "" {
queueDir := filepath.Join(args.QueueDir, storePrefix+"-mqtt-"+id)
store = NewQueueStore(queueDir, args.QueueLimit)
if oErr := store.Open(); oErr != nil {
return nil, oErr
}
}
client := mqtt.NewClient(options)
// The client should establish a first time connection.
// Connect() should be successful atleast once to publish events.
token := client.Connect()
target := &MQTTTarget{
id: event.TargetID{ID: id, Name: "mqtt"},
args: args,
client: client,
loggerOnce: loggerOnce,
}
// Retries until the clientID gets registered.
retryRegister := func() {
// Repeat the pings until the client registers the clientId and receives a token.
for {
if token.Wait() && token.Error() == nil {
// Connected
break
var terr error
select {
case <-doneCh:
return
default:
terr = token.Error()
if token.Wait() && terr == nil {
// Connected
return
}
// Reconnecting
time.Sleep(reconnectInterval * time.Second)
terr = fmt.Errorf("Previous connect failed with %s, attempting a reconnect", terr)
target.loggerOnce(context.Background(), terr, target.ID())
token = client.Connect()
}
// Reconnecting
time.Sleep(reconnectInterval * time.Second)
token = client.Connect()
}
}
if store == nil {
if token.Wait() && token.Error() != nil {
return nil, token.Error()
if args.QueueDir != "" {
queueDir := filepath.Join(args.QueueDir, storePrefix+"-mqtt-"+id)
target.store = NewQueueStore(queueDir, args.QueueLimit)
if err := target.store.Open(); err != nil {
return nil, err
}
} else {
go retryRegister()
}
target := &MQTTTarget{
id: event.TargetID{ID: id, Name: "mqtt"},
args: args,
client: client,
store: store,
}
go retryRegister()
if target.store != nil {
// Replays the events from the store.
eventKeyCh := replayEvents(target.store, doneCh)
eventKeyCh := replayEvents(target.store, doneCh, loggerOnce, target.ID())
// Start replaying events from the store.
go sendEvents(target, eventKeyCh, doneCh)
go sendEvents(target, eventKeyCh, doneCh, loggerOnce)
} else {
if token.Wait() && token.Error() != nil {
return nil, token.Error()
}
}
return target, nil
}

@ -54,6 +54,7 @@
package target
import (
"context"
"database/sql"
"encoding/json"
"errors"
@ -311,7 +312,7 @@ func (target *MySQLTarget) executeStmts() error {
}
// NewMySQLTarget - creates new MySQL target.
func NewMySQLTarget(id string, args MySQLArgs, doneCh <-chan struct{}) (*MySQLTarget, error) {
func NewMySQLTarget(id string, args MySQLArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{})) (*MySQLTarget, error) {
var firstPing bool
if args.DSN == "" {
config := mysql.Config{
@ -363,9 +364,9 @@ func NewMySQLTarget(id string, args MySQLArgs, doneCh <-chan struct{}) (*MySQLTa
if target.store != nil {
// Replays the events from the store.
eventKeyCh := replayEvents(target.store, doneCh)
eventKeyCh := replayEvents(target.store, doneCh, loggerOnce, target.ID())
// Start replaying events from the store.
go sendEvents(target, eventKeyCh, doneCh)
go sendEvents(target, eventKeyCh, doneCh, loggerOnce)
}
return target, nil

@ -17,6 +17,7 @@
package target
import (
"context"
"encoding/json"
"errors"
"net/url"
@ -233,7 +234,7 @@ func (target *NATSTarget) Close() (err error) {
}
// NewNATSTarget - creates new NATS target.
func NewNATSTarget(id string, args NATSArgs, doneCh <-chan struct{}) (*NATSTarget, error) {
func NewNATSTarget(id string, args NATSArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{})) (*NATSTarget, error) {
var natsConn *nats.Conn
var stanConn stan.Conn
@ -271,9 +272,9 @@ func NewNATSTarget(id string, args NATSArgs, doneCh <-chan struct{}) (*NATSTarge
if target.store != nil {
// Replays the events from the store.
eventKeyCh := replayEvents(target.store, doneCh)
eventKeyCh := replayEvents(target.store, doneCh, loggerOnce, target.ID())
// Start replaying events from the store.
go sendEvents(target, eventKeyCh, doneCh)
go sendEvents(target, eventKeyCh, doneCh, loggerOnce)
}
return target, nil

@ -17,6 +17,7 @@
package target
import (
"context"
"crypto/tls"
"encoding/json"
"errors"
@ -149,7 +150,7 @@ func (target *NSQTarget) Close() (err error) {
}
// NewNSQTarget - creates new NSQ target.
func NewNSQTarget(id string, args NSQArgs, doneCh <-chan struct{}) (*NSQTarget, error) {
func NewNSQTarget(id string, args NSQArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{})) (*NSQTarget, error) {
config := nsq.NewConfig()
if args.TLS.Enable {
config.TlsV1 = true
@ -189,9 +190,9 @@ func NewNSQTarget(id string, args NSQArgs, doneCh <-chan struct{}) (*NSQTarget,
if target.store != nil {
// Replays the events from the store.
eventKeyCh := replayEvents(target.store, doneCh)
eventKeyCh := replayEvents(target.store, doneCh, loggerOnce, target.ID())
// Start replaying events from the store.
go sendEvents(target, eventKeyCh, doneCh)
go sendEvents(target, eventKeyCh, doneCh, loggerOnce)
}
return target, nil

@ -54,6 +54,7 @@
package target
import (
"context"
"database/sql"
"encoding/json"
"errors"
@ -312,7 +313,7 @@ func (target *PostgreSQLTarget) executeStmts() error {
}
// NewPostgreSQLTarget - creates new PostgreSQL target.
func NewPostgreSQLTarget(id string, args PostgreSQLArgs, doneCh <-chan struct{}) (*PostgreSQLTarget, error) {
func NewPostgreSQLTarget(id string, args PostgreSQLArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{})) (*PostgreSQLTarget, error) {
var firstPing bool
params := []string{args.ConnectionString}
@ -370,9 +371,9 @@ func NewPostgreSQLTarget(id string, args PostgreSQLArgs, doneCh <-chan struct{})
if target.store != nil {
// Replays the events from the store.
eventKeyCh := replayEvents(target.store, doneCh)
eventKeyCh := replayEvents(target.store, doneCh, loggerOnce, target.ID())
// Start replaying events from the store.
go sendEvents(target, eventKeyCh, doneCh)
go sendEvents(target, eventKeyCh, doneCh, loggerOnce)
}
return target, nil

@ -19,6 +19,7 @@ package target
import (
"encoding/json"
"io/ioutil"
"math"
"os"
"path/filepath"
"sort"
@ -36,28 +37,29 @@ const (
// QueueStore - Filestore for persisting events.
type QueueStore struct {
sync.RWMutex
directory string
eC uint64
limit uint64
currentEntries uint64
entryLimit uint64
directory string
}
// NewQueueStore - Creates an instance for QueueStore.
func NewQueueStore(directory string, limit uint64) *QueueStore {
func NewQueueStore(directory string, limit uint64) Store {
if limit == 0 {
limit = maxLimit
currRlimit, _, err := sys.GetMaxOpenFileLimit()
_, maxRLimit, err := sys.GetMaxOpenFileLimit()
if err == nil {
if currRlimit > limit {
limit = currRlimit
// Limit the maximum number of entries
// to maximum open file limit
if maxRLimit < limit {
limit = maxRLimit
}
}
}
queueStore := &QueueStore{
directory: directory,
limit: limit,
return &QueueStore{
directory: directory,
entryLimit: limit,
}
return queueStore
}
// Open - Creates the directory if not present.
@ -65,16 +67,21 @@ func (store *QueueStore) Open() error {
store.Lock()
defer store.Unlock()
if terr := os.MkdirAll(store.directory, os.FileMode(0770)); terr != nil {
return terr
if err := os.MkdirAll(store.directory, os.FileMode(0770)); err != nil {
return err
}
names, err := store.list()
if err != nil {
return err
}
eCount := uint64(len(store.list()))
if eCount >= store.limit {
currentEntries := uint64(len(names))
if currentEntries >= store.entryLimit {
return errLimitExceeded
}
store.eC = eCount
store.currentEntries = currentEntries
return nil
}
@ -94,7 +101,7 @@ func (store *QueueStore) write(directory string, key string, e event.Event) erro
}
// Increment the event count.
store.eC++
store.currentEntries++
return nil
}
@ -103,39 +110,40 @@ func (store *QueueStore) write(directory string, key string, e event.Event) erro
func (store *QueueStore) Put(e event.Event) error {
store.Lock()
defer store.Unlock()
if store.eC >= store.limit {
if store.currentEntries >= store.entryLimit {
return errLimitExceeded
}
key, kErr := getNewUUID()
if kErr != nil {
return kErr
key, err := getNewUUID()
if err != nil {
return err
}
return store.write(store.directory, key, e)
}
// Get - gets a event from the store.
func (store *QueueStore) Get(key string) (event.Event, error) {
func (store *QueueStore) Get(key string) (event event.Event, err error) {
store.RLock()
defer store.RUnlock()
var event event.Event
filepath := filepath.Join(store.directory, key+eventExt)
defer func(store *QueueStore) {
store.RUnlock()
if err != nil {
// Upon error we remove the entry.
store.Del(key)
}
}(store)
eventData, rerr := ioutil.ReadFile(filepath)
if rerr != nil {
store.del(key)
return event, rerr
var eventData []byte
eventData, err = ioutil.ReadFile(filepath.Join(store.directory, key+eventExt))
if err != nil {
return event, err
}
if len(eventData) == 0 {
store.del(key)
return event, os.ErrNotExist
}
uerr := json.Unmarshal(eventData, &event)
if uerr != nil {
store.del(key)
return event, uerr
if err = json.Unmarshal(eventData, &event); err != nil {
return event, err
}
return event, nil
@ -150,41 +158,49 @@ func (store *QueueStore) Del(key string) error {
// lockless call
func (store *QueueStore) del(key string) error {
p := filepath.Join(store.directory, key+eventExt)
rerr := os.Remove(p)
if rerr != nil {
return rerr
if err := os.Remove(filepath.Join(store.directory, key+eventExt)); err != nil {
return err
}
// Decrement the event count.
store.eC--
// Decrement the current entries count.
store.currentEntries--
// Current entries can underflow, when multiple
// events are being pushed in parallel, this code
// is needed to ensure that we don't underflow.
//
// queueStore replayEvents is not serialized,
// this code is needed to protect us under
// such situations.
if store.currentEntries == math.MaxUint64 {
store.currentEntries = 0
}
return nil
}
// List - lists all files from the directory.
func (store *QueueStore) List() []string {
func (store *QueueStore) List() ([]string, error) {
store.RLock()
defer store.RUnlock()
return store.list()
}
// lockless call.
func (store *QueueStore) list() []string {
// list lock less.
func (store *QueueStore) list() ([]string, error) {
var names []string
storeDir, _ := os.Open(store.directory)
files, _ := storeDir.Readdir(-1)
files, err := ioutil.ReadDir(store.directory)
if err != nil {
return names, err
}
// Sort the dentries.
sort.Slice(files, func(i, j int) bool {
return files[i].ModTime().Unix() < files[j].ModTime().Unix()
return files[i].ModTime().Before(files[j].ModTime())
})
for _, file := range files {
names = append(names, file.Name())
}
_ = storeDir.Close()
return names
return names, nil
}

@ -65,8 +65,12 @@ func TestQueueStorePut(t *testing.T) {
}
}
// Count the events.
if len(store.List()) != 100 {
t.Fatalf("List() Expected: 100, got %d", len(store.List()))
names, err := store.List()
if err != nil {
t.Fatal(err)
}
if len(names) != 100 {
t.Fatalf("List() Expected: 100, got %d", len(names))
}
}
@ -87,7 +91,10 @@ func TestQueueStoreGet(t *testing.T) {
t.Fatal("Failed to put to queue store ", err)
}
}
eventKeys := store.List()
eventKeys, err := store.List()
if err != nil {
t.Fatal(err)
}
// Get 10 events.
if len(eventKeys) == 10 {
for _, key := range eventKeys {
@ -121,7 +128,10 @@ func TestQueueStoreDel(t *testing.T) {
t.Fatal("Failed to put to queue store ", err)
}
}
eventKeys := store.List()
eventKeys, err := store.List()
if err != nil {
t.Fatal(err)
}
// Remove all the events.
if len(eventKeys) == 20 {
for _, key := range eventKeys {
@ -134,8 +144,12 @@ func TestQueueStoreDel(t *testing.T) {
t.Fatalf("List() Expected: 20, got %d", len(eventKeys))
}
if len(store.List()) != 0 {
t.Fatalf("List() Expected: 0, got %d", len(store.List()))
names, err := store.List()
if err != nil {
t.Fatal(err)
}
if len(names) != 0 {
t.Fatalf("List() Expected: 0, got %d", len(names))
}
}
@ -179,7 +193,21 @@ func TestQueueStoreListN(t *testing.T) {
}
}
// Should return all the event keys in the store.
if len(store.List()) != 10 {
t.Fatalf("List() Expected: 10, got %d", len(store.List()))
names, err := store.List()
if err != nil {
t.Fatal(err)
}
if len(names) != 10 {
t.Fatalf("List() Expected: 10, got %d", len(names))
}
if err = os.RemoveAll(queueDir); err != nil {
t.Fatal(err)
}
_, err = store.List()
if !os.IsNotExist(err) {
t.Fatalf("Expected List() to fail with os.ErrNotExist, %s", err)
}
}

@ -289,9 +289,9 @@ func NewRedisTarget(id string, args RedisArgs, doneCh <-chan struct{}, loggerOnc
if target.store != nil {
// Replays the events from the store.
eventKeyCh := replayEvents(target.store, doneCh)
eventKeyCh := replayEvents(target.store, doneCh, loggerOnce, target.ID())
// Start replaying events from the store.
go sendEvents(target, eventKeyCh, doneCh)
go sendEvents(target, eventKeyCh, doneCh, loggerOnce)
}
return target, nil

@ -17,10 +17,9 @@
package target
import (
"context"
"errors"
"fmt"
"net"
"os"
"strings"
"syscall"
"time"
@ -40,35 +39,39 @@ var errLimitExceeded = errors.New("the maximum store limit reached")
type Store interface {
Put(event event.Event) error
Get(key string) (event.Event, error)
List() []string
List() ([]string, error)
Del(key string) error
Open() error
}
// replayEvents - Reads the events from the store and replays.
func replayEvents(store Store, doneCh <-chan struct{}) <-chan string {
var names []string
func replayEvents(store Store, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}), id event.TargetID) <-chan string {
eventKeyCh := make(chan string)
go func() {
retryTimer := time.NewTimer(retryInterval)
defer retryTimer.Stop()
retryTicker := time.NewTicker(retryInterval)
defer retryTicker.Stop()
defer close(eventKeyCh)
for {
names = store.List()
for _, name := range names {
select {
case eventKeyCh <- strings.TrimSuffix(name, eventExt):
// Get next key.
case <-doneCh:
return
names, err := store.List()
if err == nil {
for _, name := range names {
select {
case eventKeyCh <- strings.TrimSuffix(name, eventExt):
// Get next key.
case <-doneCh:
return
}
}
}
if len(names) < 2 {
retryTimer.Reset(retryInterval)
select {
case <-retryTimer.C:
case <-retryTicker.C:
if err != nil {
loggerOnce(context.Background(),
fmt.Errorf("store.List() failed '%v'", err), id)
}
case <-doneCh:
return
}
@ -81,16 +84,7 @@ func replayEvents(store Store, doneCh <-chan struct{}) <-chan string {
// IsConnRefusedErr - To check fot "connection refused" error.
func IsConnRefusedErr(err error) bool {
if opErr, ok := err.(*net.OpError); ok {
if sysErr, ok := opErr.Err.(*os.SyscallError); ok {
if errno, ok := sysErr.Err.(syscall.Errno); ok {
if errno == syscall.ECONNREFUSED {
return true
}
}
}
}
return false
return errors.Is(err, syscall.ECONNREFUSED)
}
// IsConnResetErr - Checks for connection reset errors.
@ -99,20 +93,13 @@ func IsConnResetErr(err error) bool {
return true
}
// incase if error message is wrapped.
if opErr, ok := err.(*net.OpError); ok {
if syscallErr, ok := opErr.Err.(*os.SyscallError); ok {
if syscallErr.Err == syscall.ECONNRESET {
return true
}
}
}
return false
return errors.Is(err, syscall.ECONNRESET)
}
// sendEvents - Reads events from the store and re-plays.
func sendEvents(target event.Target, eventKeyCh <-chan string, doneCh <-chan struct{}) {
retryTimer := time.NewTimer(retryInterval)
defer retryTimer.Stop()
func sendEvents(target event.Target, eventKeyCh <-chan string, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{})) {
retryTicker := time.NewTicker(retryInterval)
defer retryTicker.Stop()
send := func(eventKey string) bool {
for {
@ -122,12 +109,14 @@ func sendEvents(target event.Target, eventKeyCh <-chan string, doneCh <-chan str
}
if err != errNotConnected && !IsConnResetErr(err) {
panic(fmt.Errorf("target.Send() failed with '%v'", err))
loggerOnce(context.Background(),
fmt.Errorf("target.Send() failed with '%v'", err),
target.ID())
continue
}
retryTimer.Reset(retryInterval)
select {
case <-retryTimer.C:
case <-retryTicker.C:
case <-doneCh:
return false
}

@ -18,6 +18,7 @@ package target
import (
"bytes"
"context"
"crypto/tls"
"crypto/x509"
"encoding/json"
@ -171,7 +172,7 @@ func (target *WebhookTarget) Close() error {
}
// NewWebhookTarget - creates new Webhook target.
func NewWebhookTarget(id string, args WebhookArgs, doneCh <-chan struct{}) *WebhookTarget {
func NewWebhookTarget(id string, args WebhookArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{})) *WebhookTarget {
var store Store
@ -203,9 +204,9 @@ func NewWebhookTarget(id string, args WebhookArgs, doneCh <-chan struct{}) *Webh
if target.store != nil {
// Replays the events from the store.
eventKeyCh := replayEvents(target.store, doneCh)
eventKeyCh := replayEvents(target.store, doneCh, loggerOnce, target.ID())
// Start replaying events from the store.
go sendEvents(target, eventKeyCh, doneCh)
go sendEvents(target, eventKeyCh, doneCh, loggerOnce)
}
return target

Loading…
Cancel
Save