|
|
|
@ -19,6 +19,7 @@ package cmd |
|
|
|
|
import ( |
|
|
|
|
"reflect" |
|
|
|
|
"testing" |
|
|
|
|
"time" |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
// Tests event notify.
|
|
|
|
@ -38,7 +39,9 @@ func testEventNotify(obj ObjectLayer, instanceType string, t TestErrHandler) { |
|
|
|
|
// remove the root folder after the test ends.
|
|
|
|
|
defer removeAll(rootPath) |
|
|
|
|
|
|
|
|
|
initEventNotifier(obj) |
|
|
|
|
if err := initEventNotifier(obj); err != nil { |
|
|
|
|
t.Fatal("Unexpected error:", err) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Notify object created event.
|
|
|
|
|
eventNotify(eventData{ |
|
|
|
@ -72,7 +75,7 @@ func testEventNotify(obj ObjectLayer, instanceType string, t TestErrHandler) { |
|
|
|
|
|
|
|
|
|
// Notify object created event.
|
|
|
|
|
eventNotify(eventData{ |
|
|
|
|
Type: ObjectCreatedPost, |
|
|
|
|
Type: ObjectRemovedDelete, |
|
|
|
|
Bucket: bucketName, |
|
|
|
|
ObjInfo: ObjectInfo{ |
|
|
|
|
Bucket: bucketName, |
|
|
|
@ -140,3 +143,236 @@ func TestInitEventNotifier(t *testing.T) { |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Test InitEventNotifier with faulty disks
|
|
|
|
|
func TestInitEventNotifierFaultyDisks(t *testing.T) { |
|
|
|
|
// Prepare for tests
|
|
|
|
|
rootPath, err := newTestConfig("us-east-1") |
|
|
|
|
if err != nil { |
|
|
|
|
t.Fatalf("Init Test config failed") |
|
|
|
|
} |
|
|
|
|
// remove the root folder after the test ends.
|
|
|
|
|
defer removeAll(rootPath) |
|
|
|
|
|
|
|
|
|
disk, err := getRandomDisks(1) |
|
|
|
|
defer removeAll(disk[0]) |
|
|
|
|
if err != nil { |
|
|
|
|
t.Fatal("Unable to create directories for FS backend. ", err) |
|
|
|
|
} |
|
|
|
|
obj, err := getSingleNodeObjectLayer(disk[0]) |
|
|
|
|
if err != nil { |
|
|
|
|
t.Fatal("Unable to initialize FS backend.", err) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
bucketName := "bucket" |
|
|
|
|
if err := obj.MakeBucket(bucketName); err != nil { |
|
|
|
|
t.Fatal("Unexpected error:", err) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
fs := obj.(fsObjects) |
|
|
|
|
fsstorage := fs.storage.(*posix) |
|
|
|
|
|
|
|
|
|
listenARN := "arn:minio:sns:us-east-1:1:listen" |
|
|
|
|
queueARN := "arn:minio:sqs:us-east-1:1:redis" |
|
|
|
|
|
|
|
|
|
// Write a notification.xml in the disk
|
|
|
|
|
notificationXML := "<NotificationConfiguration>" |
|
|
|
|
notificationXML += "<TopicConfiguration><Event>s3:ObjectRemoved:*</Event><Event>s3:ObjectRemoved:*</Event><Topic>" + listenARN + "</Topic></TopicConfiguration>" |
|
|
|
|
notificationXML += "<QueueConfiguration><Event>s3:ObjectRemoved:*</Event><Event>s3:ObjectRemoved:*</Event><Queue>" + queueARN + "</Queue></QueueConfiguration>" |
|
|
|
|
notificationXML += "</NotificationConfiguration>" |
|
|
|
|
if err := fsstorage.AppendFile(minioMetaBucket, bucketConfigPrefix+"/"+bucketName+"/"+bucketNotificationConfig, []byte(notificationXML)); err != nil { |
|
|
|
|
t.Fatal("Unexpected error:", err) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Test initEventNotifier() with faulty disks
|
|
|
|
|
for i := 1; i <= 5; i++ { |
|
|
|
|
fs.storage = newNaughtyDisk(fsstorage, map[int]error{i: errFaultyDisk}, nil) |
|
|
|
|
if err := initEventNotifier(fs); errorCause(err) != errFaultyDisk { |
|
|
|
|
t.Fatal("Unexpected error:", err) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// InitEventNotifierWithAMQP - tests InitEventNotifier when AMQP is not prepared
|
|
|
|
|
func TestInitEventNotifierWithAMQP(t *testing.T) { |
|
|
|
|
// initialize the server and obtain the credentials and root.
|
|
|
|
|
// credentials are necessary to sign the HTTP request.
|
|
|
|
|
rootPath, err := newTestConfig("us-east-1") |
|
|
|
|
if err != nil { |
|
|
|
|
t.Fatalf("Init Test config failed") |
|
|
|
|
} |
|
|
|
|
// remove the root folder after the test ends.
|
|
|
|
|
defer removeAll(rootPath) |
|
|
|
|
|
|
|
|
|
disk, err := getRandomDisks(1) |
|
|
|
|
defer removeAll(disk[0]) |
|
|
|
|
if err != nil { |
|
|
|
|
t.Fatal("Unable to create directories for FS backend. ", err) |
|
|
|
|
} |
|
|
|
|
fs, err := getSingleNodeObjectLayer(disk[0]) |
|
|
|
|
if err != nil { |
|
|
|
|
t.Fatal("Unable to initialize FS backend.", err) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
serverConfig.SetAMQPNotifyByID("1", amqpNotify{Enable: true}) |
|
|
|
|
if err := initEventNotifier(fs); err == nil { |
|
|
|
|
t.Fatal("AMQP config didn't fail.") |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// InitEventNotifierWithElasticSearch - test InitEventNotifier when ElasticSearch is not ready
|
|
|
|
|
func TestInitEventNotifierWithElasticSearch(t *testing.T) { |
|
|
|
|
// initialize the server and obtain the credentials and root.
|
|
|
|
|
// credentials are necessary to sign the HTTP request.
|
|
|
|
|
rootPath, err := newTestConfig("us-east-1") |
|
|
|
|
if err != nil { |
|
|
|
|
t.Fatalf("Init Test config failed") |
|
|
|
|
} |
|
|
|
|
// remove the root folder after the test ends.
|
|
|
|
|
defer removeAll(rootPath) |
|
|
|
|
|
|
|
|
|
disk, err := getRandomDisks(1) |
|
|
|
|
defer removeAll(disk[0]) |
|
|
|
|
if err != nil { |
|
|
|
|
t.Fatal("Unable to create directories for FS backend. ", err) |
|
|
|
|
} |
|
|
|
|
fs, err := getSingleNodeObjectLayer(disk[0]) |
|
|
|
|
if err != nil { |
|
|
|
|
t.Fatal("Unable to initialize FS backend.", err) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
serverConfig.SetElasticSearchNotifyByID("1", elasticSearchNotify{Enable: true}) |
|
|
|
|
if err := initEventNotifier(fs); err == nil { |
|
|
|
|
t.Fatal("ElasticSearch config didn't fail.") |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// InitEventNotifierWithRedis - test InitEventNotifier when Redis is not ready
|
|
|
|
|
func TestInitEventNotifierWithRedis(t *testing.T) { |
|
|
|
|
// initialize the server and obtain the credentials and root.
|
|
|
|
|
// credentials are necessary to sign the HTTP request.
|
|
|
|
|
rootPath, err := newTestConfig("us-east-1") |
|
|
|
|
if err != nil { |
|
|
|
|
t.Fatalf("Init Test config failed") |
|
|
|
|
} |
|
|
|
|
// remove the root folder after the test ends.
|
|
|
|
|
defer removeAll(rootPath) |
|
|
|
|
|
|
|
|
|
disk, err := getRandomDisks(1) |
|
|
|
|
defer removeAll(disk[0]) |
|
|
|
|
if err != nil { |
|
|
|
|
t.Fatal("Unable to create directories for FS backend. ", err) |
|
|
|
|
} |
|
|
|
|
fs, err := getSingleNodeObjectLayer(disk[0]) |
|
|
|
|
if err != nil { |
|
|
|
|
t.Fatal("Unable to initialize FS backend.", err) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
serverConfig.SetRedisNotifyByID("1", redisNotify{Enable: true}) |
|
|
|
|
if err := initEventNotifier(fs); err == nil { |
|
|
|
|
t.Fatal("Redis config didn't fail.") |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// TestListenBucketNotification - test Listen Bucket Notification process
|
|
|
|
|
func TestListenBucketNotification(t *testing.T) { |
|
|
|
|
|
|
|
|
|
bucketName := "bucket" |
|
|
|
|
objectName := "object" |
|
|
|
|
|
|
|
|
|
// Prepare for tests
|
|
|
|
|
// Create fs backend
|
|
|
|
|
rootPath, err := newTestConfig("us-east-1") |
|
|
|
|
if err != nil { |
|
|
|
|
t.Fatalf("Init Test config failed") |
|
|
|
|
} |
|
|
|
|
// remove the root folder after the test ends.
|
|
|
|
|
defer removeAll(rootPath) |
|
|
|
|
|
|
|
|
|
disk, err := getRandomDisks(1) |
|
|
|
|
defer removeAll(disk[0]) |
|
|
|
|
if err != nil { |
|
|
|
|
t.Fatal("Unable to create directories for FS backend. ", err) |
|
|
|
|
} |
|
|
|
|
obj, err := getSingleNodeObjectLayer(disk[0]) |
|
|
|
|
if err != nil { |
|
|
|
|
t.Fatal("Unable to initialize FS backend.", err) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Create the bucket to listen on
|
|
|
|
|
if err := obj.MakeBucket(bucketName); err != nil { |
|
|
|
|
t.Fatal("Unexpected error:", err) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
listenARN := "arn:minio:sns:us-east-1:1:listen" |
|
|
|
|
queueARN := "arn:minio:sqs:us-east-1:1:redis" |
|
|
|
|
|
|
|
|
|
fs := obj.(fsObjects) |
|
|
|
|
storage := fs.storage.(*posix) |
|
|
|
|
|
|
|
|
|
// Create and store notification.xml with listen and queue notification configured
|
|
|
|
|
notificationXML := "<NotificationConfiguration>" |
|
|
|
|
notificationXML += "<TopicConfiguration><Event>s3:ObjectRemoved:*</Event><Event>s3:ObjectRemoved:*</Event><Topic>" + listenARN + "</Topic></TopicConfiguration>" |
|
|
|
|
notificationXML += "<QueueConfiguration><Event>s3:ObjectRemoved:*</Event><Event>s3:ObjectRemoved:*</Event><Queue>" + queueARN + "</Queue></QueueConfiguration>" |
|
|
|
|
notificationXML += "</NotificationConfiguration>" |
|
|
|
|
if err := storage.AppendFile(minioMetaBucket, bucketConfigPrefix+"/"+bucketName+"/"+bucketNotificationConfig, []byte(notificationXML)); err != nil { |
|
|
|
|
t.Fatal("Unexpected error:", err) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Init event notifier
|
|
|
|
|
if err := initEventNotifier(fs); err != nil { |
|
|
|
|
t.Fatal("Unexpected error:", err) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Check if the config is loaded
|
|
|
|
|
notificationCfg := globalEventNotifier.GetBucketNotificationConfig(bucketName) |
|
|
|
|
if notificationCfg == nil { |
|
|
|
|
t.Fatal("Cannot load bucket notification config") |
|
|
|
|
} |
|
|
|
|
if len(notificationCfg.TopicConfigs) != 1 || len(notificationCfg.QueueConfigs) != 1 { |
|
|
|
|
t.Fatal("Notification config is not correctly loaded. Exactly one topic and one queue config are expected") |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Check if listen notification config is enabled
|
|
|
|
|
if !isMinioSNSConfigured(listenARN, notificationCfg.TopicConfigs) { |
|
|
|
|
t.Fatal("SNS listen is not configured.") |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Create a new notification event channel.
|
|
|
|
|
nEventCh := make(chan []NotificationEvent) |
|
|
|
|
// Close the listener channel.
|
|
|
|
|
defer close(nEventCh) |
|
|
|
|
// Set sns target.
|
|
|
|
|
globalEventNotifier.SetSNSTarget(listenARN, nEventCh) |
|
|
|
|
// Remove sns listener after the writer has closed or the client disconnected.
|
|
|
|
|
defer globalEventNotifier.RemoveSNSTarget(listenARN, nEventCh) |
|
|
|
|
|
|
|
|
|
// Fire an event notification
|
|
|
|
|
go eventNotify(eventData{ |
|
|
|
|
Type: ObjectRemovedDelete, |
|
|
|
|
Bucket: bucketName, |
|
|
|
|
ObjInfo: ObjectInfo{ |
|
|
|
|
Bucket: bucketName, |
|
|
|
|
Name: objectName, |
|
|
|
|
}, |
|
|
|
|
ReqParams: map[string]string{ |
|
|
|
|
"sourceIPAddress": "localhost:1337", |
|
|
|
|
}, |
|
|
|
|
}) |
|
|
|
|
|
|
|
|
|
// Wait for the event notification here, if nothing is received within 30 seconds,
|
|
|
|
|
// test error will be fired
|
|
|
|
|
select { |
|
|
|
|
case n := <-nEventCh: |
|
|
|
|
// Check that received event
|
|
|
|
|
if len(n) == 0 { |
|
|
|
|
t.Fatal("Unexpected error occured") |
|
|
|
|
} |
|
|
|
|
if n[0].S3.Object.Key != objectName { |
|
|
|
|
t.Fatalf("Received wrong object name in notification, expected %s, received %s", n[0].S3.Object.Key, objectName) |
|
|
|
|
} |
|
|
|
|
break |
|
|
|
|
case <-time.After(30 * time.Second): |
|
|
|
|
break |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|