Notification: Changes to persistent event store (#7658)

This patch includes the following changes in event store interface
- Removes memory store. We will not persist events in memory anymore, if `queueDir` is not set.
- Orders the events before replaying to the broker.
master
Praveen raj Mani 6 years ago committed by kannappanr
parent 59e1d94770
commit c4c79f61ce
  1. 118
      pkg/event/target/memorystore.go
  2. 125
      pkg/event/target/memorystore_test.go
  3. 81
      pkg/event/target/mqtt.go
  4. 24
      pkg/event/target/queuestore.go
  5. 24
      pkg/event/target/queuestore_test.go
  6. 7
      pkg/event/target/store.go

@ -1,118 +0,0 @@
/*
* MinIO Cloud Storage, (C) 2019 MinIO, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package target
import (
"sync"
"github.com/minio/minio/pkg/event"
)
const (
maxStoreLimit = 10000
)
// MemoryStore persists events in memory.
type MemoryStore struct {
sync.RWMutex
events map[string]event.Event
eC uint16
limit uint16
}
// NewMemoryStore creates a memory store instance.
func NewMemoryStore(limit uint16) *MemoryStore {
if limit == 0 || limit > maxStoreLimit {
limit = maxStoreLimit
}
memoryStore := &MemoryStore{
events: make(map[string]event.Event),
limit: limit,
}
return memoryStore
}
// Open is in-effective here.
// Implemented for interface compatibility.
func (store *MemoryStore) Open() error {
return nil
}
// Put - puts the event in store.
func (store *MemoryStore) Put(e event.Event) error {
store.Lock()
defer store.Unlock()
if store.eC == store.limit {
return errLimitExceeded
}
key, kErr := getNewUUID()
if kErr != nil {
return kErr
}
store.events[key] = e
store.eC++
return nil
}
// Get - retrieves the event from store.
func (store *MemoryStore) Get(key string) (event.Event, error) {
store.RLock()
defer store.RUnlock()
if event, exist := store.events[key]; exist {
return event, nil
}
return event.Event{}, errNoSuchKey
}
// Del - deletes the event from store.
func (store *MemoryStore) Del(key string) error {
store.Lock()
defer store.Unlock()
delete(store.events, key)
store.eC--
return nil
}
// ListN - lists atmost N keys in the store.
func (store *MemoryStore) ListN(n int) []string {
store.RLock()
defer store.RUnlock()
var i int
if n == -1 {
n = len(store.events)
}
keys := []string{}
for k := range store.events {
if i < n {
keys = append(keys, k)
i++
continue
} else {
break
}
}
return keys
}

@ -1,125 +0,0 @@
/*
* MinIO Cloud Storage, (C) 2019 MinIO, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package target
import (
"reflect"
"testing"
)
// TestMemoryStorePut - Tests for store.Put
func TestMemoryStorePut(t *testing.T) {
store := NewMemoryStore(100)
defer func() {
store = nil
}()
for i := 0; i < 100; i++ {
if err := store.Put(testEvent); err != nil {
t.Fatal("Failed to put to queue store ", err)
}
}
if len(store.ListN(-1)) != 100 {
t.Fatalf("ListN() Expected: 100, got %d", len(store.ListN(-1)))
}
}
// TestMemoryStoreGet - Tests for store.Get.
func TestMemoryStoreGet(t *testing.T) {
store := NewMemoryStore(10)
defer func() {
store = nil
}()
for i := 0; i < 10; i++ {
if err := store.Put(testEvent); err != nil {
t.Fatal("Failed to put to queue store ", err)
}
}
eventKeys := store.ListN(-1)
if len(eventKeys) == 10 {
for _, key := range eventKeys {
event, eErr := store.Get(key)
if eErr != nil {
t.Fatal("Failed to Get the event from the queue store ", eErr)
}
if !reflect.DeepEqual(testEvent, event) {
t.Fatalf("Failed to read the event: error: expected = %v, got = %v", testEvent, event)
}
}
} else {
t.Fatalf("ListN() Expected: 10, got %d", len(eventKeys))
}
}
// TestMemoryStoreDel - Tests for store.Del.
func TestMemoryStoreDel(t *testing.T) {
store := NewMemoryStore(20)
defer func() {
store = nil
}()
for i := 0; i < 20; i++ {
if err := store.Put(testEvent); err != nil {
t.Fatal("Failed to put to queue store ", err)
}
}
eventKeys := store.ListN(-1)
if len(eventKeys) == 20 {
for _, key := range eventKeys {
_ = store.Del(key)
}
} else {
t.Fatalf("ListN() Expected: 20, got %d", len(eventKeys))
}
if len(store.ListN(-1)) != 0 {
t.Fatalf("ListN() Expected: 0, got %d", len(store.ListN(-1)))
}
}
// TestMemoryStoreLimit - tests for store limit.
func TestMemoryStoreLimit(t *testing.T) {
store := NewMemoryStore(5)
defer func() {
store = nil
}()
for i := 0; i < 5; i++ {
if err := store.Put(testEvent); err != nil {
t.Fatal("Failed to put to queue store ", err)
}
}
if err := store.Put(testEvent); err == nil {
t.Fatalf("Expected to fail with %s, but passes", errLimitExceeded)
}
}
// TestMemoryStoreListN - tests for store.ListN.
func TestMemoryStoreListN(t *testing.T) {
store := NewMemoryStore(10)
defer func() {
store = nil
}()
for i := 0; i < 10; i++ {
if err := store.Put(testEvent); err != nil {
t.Fatal("Failed to put to queue store ", err)
}
}
if len(store.ListN(5)) != 5 {
t.Fatalf("ListN(5) Expected: 5, got %d", len(store.ListN(5)))
}
if len(store.ListN(-1)) != 10 {
t.Fatalf("ListN(-1) Expected: 10, got %d", len(store.ListN(-1)))
}
}

@ -73,6 +73,9 @@ func (m MQTTArgs) Validate() error {
return errors.New("qos should be set to 1 or 2 if queueDir is set") return errors.New("qos should be set to 1 or 2 if queueDir is set")
} }
} }
if m.QueueLimit > 10000 {
return errors.New("queueLimit should not exceed 10000")
}
return nil return nil
} }
@ -90,7 +93,29 @@ func (target *MQTTTarget) ID() event.TargetID {
return target.id return target.id
} }
// Send - sends event to MQTT. // send - sends an event to the mqtt.
func (target *MQTTTarget) send(eventData event.Event) error {
objectName, err := url.QueryUnescape(eventData.S3.Object.Key)
if err != nil {
return err
}
key := eventData.S3.Bucket.Name + "/" + objectName
data, err := json.Marshal(event.Log{EventName: eventData.EventName, Key: key, Records: []event.Event{eventData}})
if err != nil {
return err
}
token := target.client.Publish(target.args.Topic, target.args.QoS, false, string(data))
token.Wait()
if token.Error() != nil {
return token.Error()
}
return nil
}
// Send - reads an event from store and sends it to MQTT.
func (target *MQTTTarget) Send(eventKey string) error { func (target *MQTTTarget) Send(eventKey string) error {
if !target.client.IsConnectionOpen() { if !target.client.IsConnectionOpen() {
@ -107,30 +132,26 @@ func (target *MQTTTarget) Send(eventKey string) error {
return eErr return eErr
} }
objectName, err := url.QueryUnescape(eventData.S3.Object.Key) if err := target.send(eventData); err != nil {
if err != nil {
return err return err
} }
key := eventData.S3.Bucket.Name + "/" + objectName
data, err := json.Marshal(event.Log{EventName: eventData.EventName, Key: key, Records: []event.Event{eventData}})
if err != nil {
return err
}
token := target.client.Publish(target.args.Topic, target.args.QoS, false, string(data))
token.Wait()
if token.Error() != nil {
return token.Error()
}
// Delete the event from store. // Delete the event from store.
return target.store.Del(eventKey) return target.store.Del(eventKey)
} }
// Save - saves the events to the store which will be replayed when the mqtt connection is active. // Save - saves the events to the store if questore is configured, which will be replayed when the mqtt connection is active.
func (target *MQTTTarget) Save(eventData event.Event) error { func (target *MQTTTarget) Save(eventData event.Event) error {
return target.store.Put(eventData) if target.store != nil {
return target.store.Put(eventData)
}
// Do not send if the connection is not active.
if !target.client.IsConnectionOpen() {
return errNotConnected
}
return target.send(eventData)
} }
// Close - does nothing and available for interface compatibility. // Close - does nothing and available for interface compatibility.
@ -158,8 +179,6 @@ func NewMQTTTarget(id string, args MQTTArgs, doneCh <-chan struct{}) (*MQTTTarge
if oErr := store.Open(); oErr != nil { if oErr := store.Open(); oErr != nil {
return nil, oErr return nil, oErr
} }
} else {
store = NewMemoryStore(args.QueueLimit)
} }
client := mqtt.NewClient(options) client := mqtt.NewClient(options)
@ -168,7 +187,8 @@ func NewMQTTTarget(id string, args MQTTArgs, doneCh <-chan struct{}) (*MQTTTarge
// Connect() should be successful atleast once to publish events. // Connect() should be successful atleast once to publish events.
token := client.Connect() token := client.Connect()
go func() { // Retries until the clientID gets registered.
retryRegister := func() {
// Repeat the pings until the client registers the clientId and receives a token. // Repeat the pings until the client registers the clientId and receives a token.
for { for {
if token.Wait() && token.Error() == nil { if token.Wait() && token.Error() == nil {
@ -179,7 +199,15 @@ func NewMQTTTarget(id string, args MQTTArgs, doneCh <-chan struct{}) (*MQTTTarge
time.Sleep(reconnectInterval * time.Second) time.Sleep(reconnectInterval * time.Second)
token = client.Connect() token = client.Connect()
} }
}() }
if store == nil {
if token.Wait() && token.Error() != nil {
return nil, token.Error()
}
} else {
go retryRegister()
}
target := &MQTTTarget{ target := &MQTTTarget{
id: event.TargetID{ID: id, Name: "mqtt"}, id: event.TargetID{ID: id, Name: "mqtt"},
@ -188,11 +216,12 @@ func NewMQTTTarget(id string, args MQTTArgs, doneCh <-chan struct{}) (*MQTTTarge
store: store, store: store,
} }
// Replays the events from the store. if target.store != nil {
eventKeyCh := replayEvents(target.store, doneCh) // Replays the events from the store.
eventKeyCh := replayEvents(target.store, doneCh)
// Start replaying events from the store. // Start replaying events from the store.
go sendEvents(target, eventKeyCh, doneCh) go sendEvents(target, eventKeyCh, doneCh)
}
return target, nil return target, nil
} }

@ -21,6 +21,7 @@ import (
"io/ioutil" "io/ioutil"
"os" "os"
"path/filepath" "path/filepath"
"sort"
"sync" "sync"
"github.com/minio/minio/pkg/event" "github.com/minio/minio/pkg/event"
@ -60,7 +61,7 @@ func (store *QueueStore) Open() error {
return terr return terr
} }
eCount := uint16(len(store.listN(-1))) eCount := uint16(len(store.list()))
if eCount >= store.limit { if eCount >= store.limit {
return errLimitExceeded return errLimitExceeded
} }
@ -154,17 +155,28 @@ func (store *QueueStore) del(key string) error {
return nil return nil
} }
// ListN - lists atmost N files from the directory. // List - lists all files from the directory.
func (store *QueueStore) ListN(n int) []string { func (store *QueueStore) List() []string {
store.RLock() store.RLock()
defer store.RUnlock() defer store.RUnlock()
return store.listN(n) return store.list()
} }
// lockless call. // lockless call.
func (store *QueueStore) listN(n int) []string { func (store *QueueStore) list() []string {
var names []string
storeDir, _ := os.Open(store.directory) storeDir, _ := os.Open(store.directory)
names, _ := storeDir.Readdirnames(n) files, _ := storeDir.Readdir(-1)
// Sort the dentries.
sort.Slice(files, func(i, j int) bool {
return files[i].ModTime().Unix() < files[j].ModTime().Unix()
})
for _, file := range files {
names = append(names, file.Name())
}
_ = storeDir.Close() _ = storeDir.Close()
return names return names
} }

@ -68,8 +68,8 @@ func TestQueueStorePut(t *testing.T) {
} }
} }
// Count the events. // Count the events.
if len(store.ListN(-1)) != 100 { if len(store.List()) != 100 {
t.Fatalf("ListN() Expected: 100, got %d", len(store.ListN(-1))) t.Fatalf("List() Expected: 100, got %d", len(store.List()))
} }
} }
@ -90,7 +90,7 @@ func TestQueueStoreGet(t *testing.T) {
t.Fatal("Failed to put to queue store ", err) t.Fatal("Failed to put to queue store ", err)
} }
} }
eventKeys := store.ListN(-1) eventKeys := store.List()
// Get 10 events. // Get 10 events.
if len(eventKeys) == 10 { if len(eventKeys) == 10 {
for _, key := range eventKeys { for _, key := range eventKeys {
@ -103,7 +103,7 @@ func TestQueueStoreGet(t *testing.T) {
} }
} }
} else { } else {
t.Fatalf("ListN() Expected: 10, got %d", len(eventKeys)) t.Fatalf("List() Expected: 10, got %d", len(eventKeys))
} }
} }
@ -124,7 +124,7 @@ func TestQueueStoreDel(t *testing.T) {
t.Fatal("Failed to put to queue store ", err) t.Fatal("Failed to put to queue store ", err)
} }
} }
eventKeys := store.ListN(-1) eventKeys := store.List()
// Remove all the events. // Remove all the events.
if len(eventKeys) == 20 { if len(eventKeys) == 20 {
for _, key := range eventKeys { for _, key := range eventKeys {
@ -134,11 +134,11 @@ func TestQueueStoreDel(t *testing.T) {
} }
} }
} else { } else {
t.Fatalf("ListN() Expected: 20, got %d", len(eventKeys)) t.Fatalf("List() Expected: 20, got %d", len(eventKeys))
} }
if len(store.ListN(-1)) != 0 { if len(store.List()) != 0 {
t.Fatalf("ListN() Expected: 0, got %d", len(store.ListN(-1))) t.Fatalf("List() Expected: 0, got %d", len(store.List()))
} }
} }
@ -181,12 +181,8 @@ func TestQueueStoreListN(t *testing.T) {
t.Fatal("Failed to put to queue store ", err) t.Fatal("Failed to put to queue store ", err)
} }
} }
// Should return only 5 event keys.
if len(store.ListN(5)) != 5 {
t.Fatalf("ListN(5) Expected: 5, got %d", len(store.ListN(5)))
}
// Should return all the event keys in the store. // Should return all the event keys in the store.
if len(store.ListN(-1)) != 10 { if len(store.List()) != 10 {
t.Fatalf("ListN(-1) Expected: 10, got %d", len(store.ListN(-1))) t.Fatalf("List() Expected: 10, got %d", len(store.List()))
} }
} }

@ -33,14 +33,11 @@ var errNotConnected = errors.New("not connected to target server/service")
// errLimitExceeded error is sent when the maximum limit is reached. // errLimitExceeded error is sent when the maximum limit is reached.
var errLimitExceeded = errors.New("the maximum store limit reached") var errLimitExceeded = errors.New("the maximum store limit reached")
// errNoSuchKey error is sent in Get when the key is not found.
var errNoSuchKey = errors.New("no such key found in store")
// Store - To persist the events. // Store - To persist the events.
type Store interface { type Store interface {
Put(event event.Event) error Put(event event.Event) error
Get(key string) (event.Event, error) Get(key string) (event.Event, error)
ListN(n int) []string List() []string
Del(key string) error Del(key string) error
Open() error Open() error
} }
@ -55,7 +52,7 @@ func replayEvents(store Store, doneCh <-chan struct{}) <-chan string {
defer retryTimer.Stop() defer retryTimer.Stop()
defer close(eventKeyCh) defer close(eventKeyCh)
for { for {
names = store.ListN(100) names = store.List()
for _, name := range names { for _, name := range names {
select { select {
case eventKeyCh <- strings.TrimSuffix(name, eventExt): case eventKeyCh <- strings.TrimSuffix(name, eventExt):

Loading…
Cancel
Save