Enhance the event store interface to support channeling (#7343)

- Avoids code duplication across the other targets. By having a
  centralized function call.

- Reduce the room for race.
master
Praveen raj Mani 6 years ago committed by Nitish Tiwari
parent ddb0d646aa
commit 47ca411163
  1. 4
      cmd/config-current.go
  2. 14
      cmd/peer-rest-client-target.go
  3. 13
      pkg/event/target/amqp.go
  4. 14
      pkg/event/target/elasticsearch.go
  5. 13
      pkg/event/target/httpclient.go
  6. 13
      pkg/event/target/kafka.go
  7. 26
      pkg/event/target/memorystore.go
  8. 45
      pkg/event/target/memorystore_test.go
  9. 82
      pkg/event/target/mqtt.go
  10. 13
      pkg/event/target/mysql.go
  11. 13
      pkg/event/target/nats.go
  12. 13
      pkg/event/target/nsq.go
  13. 13
      pkg/event/target/postgresql.go
  14. 42
      pkg/event/target/queuestore.go
  15. 58
      pkg/event/target/queuestore_test.go
  16. 13
      pkg/event/target/redis.go
  17. 98
      pkg/event/target/store.go
  18. 13
      pkg/event/target/webhook.go
  19. 5
      pkg/event/targetlist.go
  20. 12
      pkg/event/targetlist_test.go

@ -336,7 +336,7 @@ func (s *serverConfig) TestNotificationTargets() error {
if !v.Enable {
continue
}
t, err := target.NewMQTTTarget(k, v)
t, err := target.NewMQTTTarget(k, v, GlobalServiceDoneCh)
if err != nil {
return fmt.Errorf("mqtt(%s): %s", k, err.Error())
}
@ -682,7 +682,7 @@ func getNotificationTargets(config *serverConfig) *event.TargetList {
for id, args := range config.Notify.MQTT {
if args.Enable {
args.RootCAs = globalRootCAs
newTarget, err := target.NewMQTTTarget(id, args)
newTarget, err := target.NewMQTTTarget(id, args, GlobalServiceDoneCh)
if err != nil {
logger.LogIf(context.Background(), err)
continue

@ -31,8 +31,18 @@ func (target *PeerRESTClientTarget) ID() event.TargetID {
return target.id
}
// Send - sends event to remote peer by making RPC call.
func (target *PeerRESTClientTarget) Send(eventData event.Event) error {
// Save - Sends event directly without persisting.
func (target *PeerRESTClientTarget) Save(eventData event.Event) error {
return target.send(eventData)
}
// Send - interface compatible method does no-op.
func (target *PeerRESTClientTarget) Send(eventKey string) error {
return nil
}
// sends event to remote peer by making RPC call.
func (target *PeerRESTClientTarget) send(eventData event.Event) error {
return target.restClient.SendEvent(target.bucketName, target.id, target.remoteTargetID, eventData)
}

@ -107,8 +107,12 @@ func (target *AMQPTarget) channel() (*amqp.Channel, error) {
return ch, nil
}
// Send - sends event to AMQP.
func (target *AMQPTarget) Send(eventData event.Event) error {
// Save - Sends event directly without persisting.
func (target *AMQPTarget) Save(eventData event.Event) error {
return target.send(eventData)
}
func (target *AMQPTarget) send(eventData event.Event) error {
ch, err := target.channel()
if err != nil {
return err
@ -142,6 +146,11 @@ func (target *AMQPTarget) Send(eventData event.Event) error {
})
}
// Send - interface compatible method does no-op.
func (target *AMQPTarget) Send(eventKey string) error {
return nil
}
// Close - does nothing and available for interface compatibility.
func (target *AMQPTarget) Close() error {
return nil

@ -69,8 +69,13 @@ func (target *ElasticsearchTarget) ID() event.TargetID {
return target.id
}
// Send - sends event to Elasticsearch.
func (target *ElasticsearchTarget) Send(eventData event.Event) error {
// Save - Sends event directly without persisting.
func (target *ElasticsearchTarget) Save(eventData event.Event) error {
return target.send(eventData)
}
func (target *ElasticsearchTarget) send(eventData event.Event) error {
var key string
remove := func() error {
@ -111,6 +116,11 @@ func (target *ElasticsearchTarget) Send(eventData event.Event) error {
return nil
}
// Send - interface compatible method does no-op.
func (target *ElasticsearchTarget) Send(eventKey string) error {
return nil
}
// Close - does nothing and available for interface compatibility.
func (target *ElasticsearchTarget) Close() error {
return nil

@ -89,8 +89,12 @@ func (target *HTTPClientTarget) start() {
}()
}
// Send - sends event to HTTP client.
func (target *HTTPClientTarget) Send(eventData event.Event) error {
// Save - sends event to HTTP client.
func (target *HTTPClientTarget) Save(eventData event.Event) error {
return target.send(eventData)
}
func (target *HTTPClientTarget) send(eventData event.Event) error {
if atomic.LoadUint32(&target.isRunning) != 0 {
return errors.New("closed http connection")
}
@ -109,6 +113,11 @@ func (target *HTTPClientTarget) Send(eventData event.Event) error {
}
}
// Send - interface compatible method does no-op.
func (target *HTTPClientTarget) Send(eventKey string) error {
return nil
}
// Close - closes underneath goroutine.
func (target *HTTPClientTarget) Close() error {
atomic.AddUint32(&target.isStopped, 1)

@ -73,8 +73,12 @@ func (target *KafkaTarget) ID() event.TargetID {
return target.id
}
// Send - sends event to Kafka.
func (target *KafkaTarget) Send(eventData event.Event) error {
// Save - Sends event directly without persisting.
func (target *KafkaTarget) Save(eventData event.Event) error {
return target.send(eventData)
}
func (target *KafkaTarget) send(eventData event.Event) error {
objectName, err := url.QueryUnescape(eventData.S3.Object.Key)
if err != nil {
return err
@ -96,6 +100,11 @@ func (target *KafkaTarget) Send(eventData event.Event) error {
return err
}
// Send - interface compatible method does no-op.
func (target *KafkaTarget) Send(eventKey string) error {
return nil
}
// Close - closes underneath kafka connection.
func (target *KafkaTarget) Close() error {
return target.producer.Close()

@ -57,7 +57,7 @@ func (store *MemoryStore) Put(e event.Event) error {
store.Lock()
defer store.Unlock()
if store.eC == store.limit {
return ErrLimitExceeded
return errLimitExceeded
}
key, kErr := getNewUUID()
if kErr != nil {
@ -77,27 +77,41 @@ func (store *MemoryStore) Get(key string) (event.Event, error) {
return event, nil
}
return event.Event{}, ErrNoSuchKey
return event.Event{}, errNoSuchKey
}
// Del - deletes the event from store.
func (store *MemoryStore) Del(key string) {
func (store *MemoryStore) Del(key string) error {
store.Lock()
defer store.Unlock()
delete(store.events, key)
store.eC--
return nil
}
// ListAll - lists all the keys in the store.
func (store *MemoryStore) ListAll() []string {
// ListN - lists atmost N keys in the store.
func (store *MemoryStore) ListN(n int) []string {
store.RLock()
defer store.RUnlock()
var i int
if n == -1 {
n = len(store.events)
}
keys := []string{}
for k := range store.events {
keys = append(keys, k)
if i < n {
keys = append(keys, k)
i++
continue
} else {
break
}
}
return keys

@ -23,7 +23,7 @@ import (
// TestMemoryStorePut - Tests for store.Put
func TestMemoryStorePut(t *testing.T) {
store := NewMemoryStore(1000)
store := NewMemoryStore(100)
defer func() {
store = nil
}()
@ -32,14 +32,14 @@ func TestMemoryStorePut(t *testing.T) {
t.Fatal("Failed to put to queue store ", err)
}
}
if len(store.ListAll()) != 100 {
t.Fatalf("ListAll() Expected: 100, got %d", len(store.ListAll()))
if len(store.ListN(-1)) != 100 {
t.Fatalf("ListN() Expected: 100, got %d", len(store.ListN(-1)))
}
}
// TestMemoryStoreGet - Tests for store.Get.
func TestMemoryStoreGet(t *testing.T) {
store := NewMemoryStore(1000)
store := NewMemoryStore(10)
defer func() {
store = nil
}()
@ -48,7 +48,7 @@ func TestMemoryStoreGet(t *testing.T) {
t.Fatal("Failed to put to queue store ", err)
}
}
eventKeys := store.ListAll()
eventKeys := store.ListN(-1)
if len(eventKeys) == 10 {
for _, key := range eventKeys {
event, eErr := store.Get(key)
@ -60,13 +60,13 @@ func TestMemoryStoreGet(t *testing.T) {
}
}
} else {
t.Fatalf("ListAll() Expected: 10, got %d", len(eventKeys))
t.Fatalf("ListN() Expected: 10, got %d", len(eventKeys))
}
}
// TestMemoryStoreDel - Tests for store.Del.
func TestMemoryStoreDel(t *testing.T) {
store := NewMemoryStore(1000)
store := NewMemoryStore(20)
defer func() {
store = nil
}()
@ -75,17 +75,17 @@ func TestMemoryStoreDel(t *testing.T) {
t.Fatal("Failed to put to queue store ", err)
}
}
eventKeys := store.ListAll()
eventKeys := store.ListN(-1)
if len(eventKeys) == 20 {
for _, key := range eventKeys {
store.Del(key)
_ = store.Del(key)
}
} else {
t.Fatalf("ListAll() Expected: 20, got %d", len(eventKeys))
t.Fatalf("ListN() Expected: 20, got %d", len(eventKeys))
}
if len(store.ListAll()) != 0 {
t.Fatalf("ListAll() Expected: 0, got %d", len(store.ListAll()))
if len(store.ListN(-1)) != 0 {
t.Fatalf("ListN() Expected: 0, got %d", len(store.ListN(-1)))
}
}
@ -101,6 +101,25 @@ func TestMemoryStoreLimit(t *testing.T) {
}
}
if err := store.Put(testEvent); err == nil {
t.Fatalf("Expected to fail with %s, but passes", ErrLimitExceeded)
t.Fatalf("Expected to fail with %s, but passes", errLimitExceeded)
}
}
// TestMemoryStoreListN - tests for store.ListN.
func TestMemoryStoreListN(t *testing.T) {
store := NewMemoryStore(10)
defer func() {
store = nil
}()
for i := 0; i < 10; i++ {
if err := store.Put(testEvent); err != nil {
t.Fatal("Failed to put to queue store ", err)
}
}
if len(store.ListN(5)) != 5 {
t.Fatalf("ListN(5) Expected: 5, got %d", len(store.ListN(5)))
}
if len(store.ListN(-1)) != 10 {
t.Fatalf("ListN(-1) Expected: 10, got %d", len(store.ListN(-1)))
}
}

@ -22,6 +22,7 @@ import (
"encoding/json"
"errors"
"net/url"
"os"
"path/filepath"
"time"
@ -31,8 +32,8 @@ import (
)
const (
retryInterval = 3 // In Seconds
reconnectInterval = 5 // In Seconds
storePrefix = "minio"
)
// MQTTArgs - MQTT target arguments.
@ -82,7 +83,6 @@ type MQTTTarget struct {
args MQTTArgs
client mqtt.Client
store Store
reconn bool
}
// ID - returns target ID.
@ -90,33 +90,22 @@ func (target *MQTTTarget) ID() event.TargetID {
return target.id
}
// Reads persisted events from the store and re-plays.
func (target *MQTTTarget) retry() {
target.reconn = true
events := target.store.ListAll()
for len(events) != 0 {
for _, key := range events {
event, eErr := target.store.Get(key)
if eErr != nil {
continue
}
for !target.client.IsConnectionOpen() {
time.Sleep(retryInterval * time.Second)
}
// The connection is open.
if err := target.send(event); err != nil {
continue
}
// Delete after a successful publish.
target.store.Del(key)
}
events = target.store.ListAll()
// Send - sends event to MQTT.
func (target *MQTTTarget) Send(eventKey string) error {
if !target.client.IsConnectionOpen() {
return errNotConnected
}
// Release the reconn state.
target.reconn = false
}
func (target *MQTTTarget) send(eventData event.Event) error {
eventData, eErr := target.store.Get(eventKey)
if eErr != nil {
// The last event key in a successful batch will be sent in the channel atmost once by the replayEvents()
// Such events will not exist and wouldve been already been sent successfully.
if os.IsNotExist(eErr) {
return nil
}
return eErr
}
objectName, err := url.QueryUnescape(eventData.S3.Object.Key)
if err != nil {
@ -131,26 +120,17 @@ func (target *MQTTTarget) send(eventData event.Event) error {
token := target.client.Publish(target.args.Topic, target.args.QoS, false, string(data))
token.Wait()
return token.Error()
if token.Error() != nil {
return token.Error()
}
// Delete the event from store.
return target.store.Del(eventKey)
}
// Send - sends event to MQTT when the connection is active.
func (target *MQTTTarget) Send(eventData event.Event) error {
// Persist the events if the connection is not active.
if !target.client.IsConnectionOpen() {
if err := target.store.Put(eventData); err != nil {
return err
}
// Ignore if retry is triggered already.
if !target.reconn {
go target.retry()
}
return nil
}
// Publishes to the broker as the connection is active.
return target.send(eventData)
// Save - saves the events to the store which will be replayed when the mqtt connection is active.
func (target *MQTTTarget) Save(eventData event.Event) error {
return target.store.Put(eventData)
}
// Close - does nothing and available for interface compatibility.
@ -159,7 +139,7 @@ func (target *MQTTTarget) Close() error {
}
// NewMQTTTarget - creates new MQTT target.
func NewMQTTTarget(id string, args MQTTArgs) (*MQTTTarget, error) {
func NewMQTTTarget(id string, args MQTTArgs, doneCh <-chan struct{}) (*MQTTTarget, error) {
options := mqtt.NewClientOptions().
SetClientID("").
SetCleanSession(true).
@ -173,7 +153,8 @@ func NewMQTTTarget(id string, args MQTTArgs) (*MQTTTarget, error) {
var store Store
if args.QueueDir != "" {
store = NewQueueStore(args.QueueDir, args.QueueLimit)
queueDir := filepath.Join(args.QueueDir, storePrefix+"-mqtt-"+id)
store = NewQueueStore(queueDir, args.QueueLimit)
if oErr := store.Open(); oErr != nil {
return nil, oErr
}
@ -205,14 +186,13 @@ func NewMQTTTarget(id string, args MQTTArgs) (*MQTTTarget, error) {
args: args,
client: client,
store: store,
reconn: false,
}
// Replay any previously persisted events in the store.
if len(target.store.ListAll()) != 0 {
go target.retry()
// Replays the events from the store.
eventKeyCh := replayEvents(target.store, doneCh)
}
// Start replaying events from the store.
go sendEvents(target, eventKeyCh, doneCh)
return target, nil
}

@ -141,8 +141,12 @@ func (target *MySQLTarget) ID() event.TargetID {
return target.id
}
// Send - sends event to MySQL.
func (target *MySQLTarget) Send(eventData event.Event) error {
// Save - Sends event directly without persisting.
func (target *MySQLTarget) Save(eventData event.Event) error {
return target.send(eventData)
}
func (target *MySQLTarget) send(eventData event.Event) error {
if target.args.Format == event.NamespaceFormat {
objectName, err := url.QueryUnescape(eventData.S3.Object.Key)
if err != nil {
@ -181,6 +185,11 @@ func (target *MySQLTarget) Send(eventData event.Event) error {
return nil
}
// Send - interface compatible method does no-op.
func (target *MySQLTarget) Send(eventKey string) error {
return nil
}
// Close - closes underneath connections to MySQL database.
func (target *MySQLTarget) Close() error {
if target.updateStmt != nil {

@ -81,8 +81,12 @@ func (target *NATSTarget) ID() event.TargetID {
return target.id
}
// Send - sends event to NATS.
func (target *NATSTarget) Send(eventData event.Event) (err error) {
// Save - Sends event directly without persisting.
func (target *NATSTarget) Save(eventData event.Event) error {
return target.send(eventData)
}
func (target *NATSTarget) send(eventData event.Event) error {
objectName, err := url.QueryUnescape(eventData.S3.Object.Key)
if err != nil {
return err
@ -107,6 +111,11 @@ func (target *NATSTarget) Send(eventData event.Event) (err error) {
return err
}
// Send - interface compatible method does no-op.
func (target *NATSTarget) Send(eventKey string) error {
return nil
}
// Close - closes underneath connections to NATS server.
func (target *NATSTarget) Close() (err error) {
if target.stanConn != nil {

@ -68,8 +68,12 @@ func (target *NSQTarget) ID() event.TargetID {
return target.id
}
// Send - sends event to NSQD.
func (target *NSQTarget) Send(eventData event.Event) (err error) {
// Save - Sends event directly without persisting.
func (target *NSQTarget) Save(eventData event.Event) error {
return target.send(eventData)
}
func (target *NSQTarget) send(eventData event.Event) (err error) {
objectName, err := url.QueryUnescape(eventData.S3.Object.Key)
if err != nil {
return err
@ -86,6 +90,11 @@ func (target *NSQTarget) Send(eventData event.Event) (err error) {
return err
}
// Send - interface compatible method does no-op.
func (target *NSQTarget) Send(eventKey string) error {
return nil
}
// Close - closes underneath connections to NSQD server.
func (target *NSQTarget) Close() (err error) {
// this blocks until complete:

@ -140,8 +140,12 @@ func (target *PostgreSQLTarget) ID() event.TargetID {
return target.id
}
// Send - sends event to PostgreSQL.
func (target *PostgreSQLTarget) Send(eventData event.Event) error {
// Save - Sends event directly without persisting.
func (target *PostgreSQLTarget) Save(eventData event.Event) error {
return target.send(eventData)
}
func (target *PostgreSQLTarget) send(eventData event.Event) error {
if target.args.Format == event.NamespaceFormat {
objectName, err := url.QueryUnescape(eventData.S3.Object.Key)
if err != nil {
@ -180,6 +184,11 @@ func (target *PostgreSQLTarget) Send(eventData event.Event) error {
return nil
}
// Send - interface compatible method does no-op.
func (target *PostgreSQLTarget) Send(eventKey string) error {
return nil
}
// Close - closes underneath connections to PostgreSQL database.
func (target *PostgreSQLTarget) Close() error {
if target.updateStmt != nil {

@ -21,7 +21,6 @@ import (
"io/ioutil"
"os"
"path/filepath"
"strings"
"sync"
"github.com/minio/minio/pkg/event"
@ -61,9 +60,9 @@ func (store *QueueStore) Open() error {
return terr
}
eCount := uint16(len(store.listAll()))
eCount := uint16(len(store.listN(-1)))
if eCount >= store.limit {
return ErrLimitExceeded
return errLimitExceeded
}
store.eC = eCount
@ -96,7 +95,7 @@ func (store *QueueStore) Put(e event.Event) error {
store.Lock()
defer store.Unlock()
if store.eC >= store.limit {
return ErrLimitExceeded
return errLimitExceeded
}
key, kErr := getNewUUID()
if kErr != nil {
@ -134,45 +133,38 @@ func (store *QueueStore) Get(key string) (event.Event, error) {
}
// Del - Deletes an entry from the store.
func (store *QueueStore) Del(key string) {
func (store *QueueStore) Del(key string) error {
store.Lock()
defer store.Unlock()
store.del(key)
return store.del(key)
}
// lockless call
func (store *QueueStore) del(key string) {
func (store *QueueStore) del(key string) error {
p := filepath.Join(store.directory, key+eventExt)
rerr := os.Remove(p)
if rerr != nil {
return
return rerr
}
// Decrement the event count.
store.eC--
return nil
}
// ListAll - lists all the keys in the directory.
func (store *QueueStore) ListAll() []string {
// ListN - lists atmost N files from the directory.
func (store *QueueStore) ListN(n int) []string {
store.RLock()
defer store.RUnlock()
return store.listAll()
return store.listN(n)
}
// lockless call.
func (store *QueueStore) listAll() []string {
var err error
var keys []string
var files []os.FileInfo
files, err = ioutil.ReadDir(store.directory)
if err != nil {
return nil
}
for _, f := range files {
keys = append(keys, strings.TrimSuffix(f.Name(), eventExt))
}
return keys
func (store *QueueStore) listN(n int) []string {
storeDir, _ := os.Open(store.directory)
names, _ := storeDir.Readdirnames(n)
_ = storeDir.Close()
return names
}

@ -20,6 +20,7 @@ import (
"os"
"path/filepath"
"reflect"
"strings"
"testing"
"github.com/minio/minio/pkg/event"
@ -55,7 +56,7 @@ func TestQueueStorePut(t *testing.T) {
t.Fatal("Failed to tear down store ", err)
}
}()
store, err := setUpStore(queueDir, 10000)
store, err := setUpStore(queueDir, 100)
if err != nil {
t.Fatal("Failed to create a queue store ", err)
@ -67,8 +68,8 @@ func TestQueueStorePut(t *testing.T) {
}
}
// Count the events.
if len(store.ListAll()) != 100 {
t.Fatalf("ListAll() Expected: 100, got %d", len(store.ListAll()))
if len(store.ListN(-1)) != 100 {
t.Fatalf("ListN() Expected: 100, got %d", len(store.ListN(-1)))
}
}
@ -79,7 +80,7 @@ func TestQueueStoreGet(t *testing.T) {
t.Fatal("Failed to tear down store ", err)
}
}()
store, err := setUpStore(queueDir, 10000)
store, err := setUpStore(queueDir, 10)
if err != nil {
t.Fatal("Failed to create a queue store ", err)
}
@ -89,11 +90,11 @@ func TestQueueStoreGet(t *testing.T) {
t.Fatal("Failed to put to queue store ", err)
}
}
eventKeys := store.ListAll()
eventKeys := store.ListN(-1)
// Get 10 events.
if len(eventKeys) == 10 {
for _, key := range eventKeys {
event, eErr := store.Get(key)
event, eErr := store.Get(strings.TrimSuffix(key, eventExt))
if eErr != nil {
t.Fatal("Failed to Get the event from the queue store ", eErr)
}
@ -102,7 +103,7 @@ func TestQueueStoreGet(t *testing.T) {
}
}
} else {
t.Fatalf("ListAll() Expected: 10, got %d", len(eventKeys))
t.Fatalf("ListN() Expected: 10, got %d", len(eventKeys))
}
}
@ -113,7 +114,7 @@ func TestQueueStoreDel(t *testing.T) {
t.Fatal("Failed to tear down store ", err)
}
}()
store, err := setUpStore(queueDir, 10000)
store, err := setUpStore(queueDir, 20)
if err != nil {
t.Fatal("Failed to create a queue store ", err)
}
@ -123,18 +124,21 @@ func TestQueueStoreDel(t *testing.T) {
t.Fatal("Failed to put to queue store ", err)
}
}
eventKeys := store.ListAll()
eventKeys := store.ListN(-1)
// Remove all the events.
if len(eventKeys) == 20 {
for _, key := range eventKeys {
store.Del(key)
err := store.Del(strings.TrimSuffix(key, eventExt))
if err != nil {
t.Fatal("queue store Del failed with ", err)
}
}
} else {
t.Fatalf("ListAll() Expected: 20, got %d", len(eventKeys))
t.Fatalf("ListN() Expected: 20, got %d", len(eventKeys))
}
if len(store.ListAll()) != 0 {
t.Fatalf("ListAll() Expected: 0, got %d", len(store.ListAll()))
if len(store.ListN(-1)) != 0 {
t.Fatalf("ListN() Expected: 0, got %d", len(store.ListN(-1)))
}
}
@ -157,6 +161,32 @@ func TestQueueStoreLimit(t *testing.T) {
}
// Should not allow 6th Put.
if err := store.Put(testEvent); err == nil {
t.Fatalf("Expected to fail with %s, but passes", ErrLimitExceeded)
t.Fatalf("Expected to fail with %s, but passes", errLimitExceeded)
}
}
// TestQueueStoreLimit - tests for store.LimitN.
func TestQueueStoreListN(t *testing.T) {
defer func() {
if err := tearDownStore(); err != nil {
t.Fatal("Failed to tear down store ", err)
}
}()
store, err := setUpStore(queueDir, 10)
if err != nil {
t.Fatal("Failed to create a queue store ", err)
}
for i := 0; i < 10; i++ {
if err := store.Put(testEvent); err != nil {
t.Fatal("Failed to put to queue store ", err)
}
}
// Should return only 5 event keys.
if len(store.ListN(5)) != 5 {
t.Fatalf("ListN(5) Expected: 5, got %d", len(store.ListN(5)))
}
// Should return all the event keys in the store.
if len(store.ListN(-1)) != 10 {
t.Fatalf("ListN(-1) Expected: 10, got %d", len(store.ListN(-1)))
}
}

@ -69,8 +69,12 @@ func (target *RedisTarget) ID() event.TargetID {
return target.id
}
// Send - sends event to Redis.
func (target *RedisTarget) Send(eventData event.Event) error {
// Save - Sends event directly without persisting.
func (target *RedisTarget) Save(eventData event.Event) error {
return target.send(eventData)
}
func (target *RedisTarget) send(eventData event.Event) error {
conn := target.pool.Get()
defer func() {
// FIXME: log returned error. ignore time being.
@ -109,6 +113,11 @@ func (target *RedisTarget) Send(eventData event.Event) error {
return nil
}
// Send - interface compatible method does no-op.
func (target *RedisTarget) Send(eventKey string) error {
return nil
}
// Close - does nothing and available for interface compatibility.
func (target *RedisTarget) Close() error {
return nil

@ -18,20 +18,106 @@ package target
import (
"errors"
"fmt"
"strings"
"time"
"github.com/minio/minio/pkg/event"
)
// ErrLimitExceeded error is sent when the maximum limit is reached.
var ErrLimitExceeded = errors.New("[Store] The maximum limit reached")
const retryInterval = 3 * time.Second
// errNotConnected - indicates that the target connection is not active.
var errNotConnected = errors.New("not connected to target server/service")
// ErrNoSuchKey error is sent in Get when the key is not found.
var ErrNoSuchKey = errors.New("[Store] No such key found")
// errLimitExceeded error is sent when the maximum limit is reached.
var errLimitExceeded = errors.New("the maximum store limit reached")
// errNoSuchKey error is sent in Get when the key is not found.
var errNoSuchKey = errors.New("no such key found in store")
// Store - To persist the events.
type Store interface {
Put(event event.Event) error
Get(key string) (event.Event, error)
ListAll() []string
Del(key string)
ListN(n int) []string
Del(key string) error
Open() error
}
// replayEvents - Reads the events from the store and replays.
func replayEvents(store Store, doneCh <-chan struct{}) <-chan string {
var names []string
eventKeyCh := make(chan string)
go func() {
retryTimer := time.NewTimer(retryInterval)
defer retryTimer.Stop()
defer close(eventKeyCh)
for {
names = store.ListN(100)
for _, name := range names {
select {
case eventKeyCh <- strings.TrimSuffix(name, eventExt):
// Get next key.
case <-doneCh:
return
}
}
if len(names) < 2 {
retryTimer.Reset(retryInterval)
select {
case <-retryTimer.C:
case <-doneCh:
return
}
}
}
}()
return eventKeyCh
}
// sendEvents - Reads events from the store and re-plays.
func sendEvents(target event.Target, eventKeyCh <-chan string, doneCh <-chan struct{}) {
retryTimer := time.NewTimer(retryInterval)
defer retryTimer.Stop()
send := func(eventKey string) bool {
for {
err := target.Send(eventKey)
if err == nil {
break
}
if err != errNotConnected {
panic(fmt.Errorf("target.Send() failed with '%v'", err))
}
retryTimer.Reset(retryInterval)
select {
case <-retryTimer.C:
case <-doneCh:
return false
}
}
return true
}
for {
select {
case eventKey, ok := <-eventKeyCh:
if !ok {
// closed channel.
return
}
if !send(eventKey) {
return
}
case <-doneCh:
return
}
}
}

@ -64,8 +64,12 @@ func (target WebhookTarget) ID() event.TargetID {
return target.id
}
// Send - sends event to Webhook.
func (target *WebhookTarget) Send(eventData event.Event) error {
// Save - Sends event directly without persisting.
func (target *WebhookTarget) Save(eventData event.Event) error {
return target.send(eventData)
}
func (target *WebhookTarget) send(eventData event.Event) error {
objectName, err := url.QueryUnescape(eventData.S3.Object.Key)
if err != nil {
return err
@ -100,6 +104,11 @@ func (target *WebhookTarget) Send(eventData event.Event) error {
return nil
}
// Send - interface compatible method does no-op.
func (target *WebhookTarget) Send(eventKey string) error {
return nil
}
// Close - does nothing and available for interface compatibility.
func (target *WebhookTarget) Close() error {
return nil

@ -24,7 +24,8 @@ import (
// Target - event target interface
type Target interface {
ID() TargetID
Send(Event) error
Save(Event) error
Send(string) error
Close() error
}
@ -130,7 +131,7 @@ func (list *TargetList) Send(event Event, targetIDs ...TargetID) <-chan TargetID
wg.Add(1)
go func(id TargetID, target Target) {
defer wg.Done()
if err := target.Send(event); err != nil {
if err := target.Save(event); err != nil {
errCh <- TargetIDErr{
ID: id,
Err: err,

@ -34,7 +34,12 @@ func (target ExampleTarget) ID() TargetID {
return target.id
}
func (target ExampleTarget) Send(eventData Event) error {
// Save - Sends event directly without persisting.
func (target ExampleTarget) Save(eventData Event) error {
return target.send(eventData)
}
func (target ExampleTarget) send(eventData Event) error {
b := make([]byte, 1)
if _, err := rand.Read(b); err != nil {
panic(err)
@ -49,6 +54,11 @@ func (target ExampleTarget) Send(eventData Event) error {
return nil
}
// Send - interface compatible method does no-op.
func (target ExampleTarget) Send(eventKey string) error {
return nil
}
func (target ExampleTarget) Close() error {
if target.closeErr {
return errors.New("close error")

Loading…
Cancel
Save