diff --git a/.travis.yml b/.travis.yml
index 814e766c0..fdf9bd708 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -21,4 +21,4 @@ after_success:
- bash <(curl -s https://codecov.io/bash)
go:
-- 1.7.3
+- 1.7.4
diff --git a/cmd/api-headers.go b/cmd/api-headers.go
index 62d32a0ec..5f6cc730c 100644
--- a/cmd/api-headers.go
+++ b/cmd/api-headers.go
@@ -21,7 +21,6 @@ import (
"encoding/xml"
"fmt"
"net/http"
- "runtime"
"strconv"
"time"
)
@@ -36,7 +35,7 @@ func mustGetRequestID(t time.Time) string {
func setCommonHeaders(w http.ResponseWriter) {
// Set unique request ID for each reply.
w.Header().Set(responseRequestIDKey, mustGetRequestID(time.Now().UTC()))
- w.Header().Set("Server", ("Minio/" + ReleaseTag + " (" + runtime.GOOS + "; " + runtime.GOARCH + ")"))
+ w.Header().Set("Server", globalServerUserAgent)
w.Header().Set("Accept-Ranges", "bytes")
}
diff --git a/cmd/bucket-notification-utils.go b/cmd/bucket-notification-utils.go
index ff25e9d51..2c0e735f5 100644
--- a/cmd/bucket-notification-utils.go
+++ b/cmd/bucket-notification-utils.go
@@ -131,6 +131,7 @@ func isValidQueueID(queueARN string) bool {
// Unmarshals QueueARN into structured object.
sqsARN := unmarshalSqsARN(queueARN)
// Is Queue identifier valid?.
+
if isAMQPQueue(sqsARN) { // AMQP eueue.
amqpN := serverConfig.GetAMQPNotifyByID(sqsARN.AccountID)
return amqpN.Enable && amqpN.URL != ""
@@ -151,6 +152,9 @@ func isValidQueueID(queueARN string) bool {
kafkaN := serverConfig.GetKafkaNotifyByID(sqsARN.AccountID)
return (kafkaN.Enable && len(kafkaN.Brokers) > 0 &&
kafkaN.Topic != "")
+ } else if isWebhookQueue(sqsARN) {
+ webhookN := serverConfig.GetWebhookNotifyByID(sqsARN.AccountID)
+ return webhookN.Enable && webhookN.Endpoint != ""
}
return false
}
@@ -241,6 +245,7 @@ func validateNotificationConfig(nConfig notificationConfig) APIErrorCode {
// - redis
// - postgresql
// - kafka
+// - webhook
func unmarshalSqsARN(queueARN string) (mSqs arnSQS) {
mSqs = arnSQS{}
if !strings.HasPrefix(queueARN, minioSqs+serverConfig.GetRegion()+":") {
@@ -260,6 +265,8 @@ func unmarshalSqsARN(queueARN string) (mSqs arnSQS) {
mSqs.Type = queueTypePostgreSQL
case strings.HasSuffix(sqsType, queueTypeKafka):
mSqs.Type = queueTypeKafka
+ case strings.HasSuffix(sqsType, queueTypeWebhook):
+ mSqs.Type = queueTypeWebhook
} // Add more queues here.
mSqs.AccountID = strings.TrimSuffix(sqsType, ":"+mSqs.Type)
return mSqs
diff --git a/cmd/bucket-notification-utils_test.go b/cmd/bucket-notification-utils_test.go
index 64130e781..0a59f30f2 100644
--- a/cmd/bucket-notification-utils_test.go
+++ b/cmd/bucket-notification-utils_test.go
@@ -228,6 +228,12 @@ func TestQueueARN(t *testing.T) {
queueARN string
errCode APIErrorCode
}{
+
+ // Valid webhook queue arn.
+ {
+ queueARN: "arn:minio:sqs:us-east-1:1:webhook",
+ errCode: ErrNone,
+ },
// Valid redis queue arn.
{
queueARN: "arn:minio:sqs:us-east-1:1:redis",
@@ -306,6 +312,11 @@ func TestUnmarshalSQSARN(t *testing.T) {
queueARN string
Type string
}{
+ // Valid webhook queue arn.
+ {
+ queueARN: "arn:minio:sqs:us-east-1:1:webhook",
+ Type: "webhook",
+ },
// Valid redis queue arn.
{
queueARN: "arn:minio:sqs:us-east-1:1:redis",
diff --git a/cmd/config-migrate.go b/cmd/config-migrate.go
index 8f6bccf4e..acf72f017 100644
--- a/cmd/config-migrate.go
+++ b/cmd/config-migrate.go
@@ -70,6 +70,10 @@ func migrateConfig() error {
if err := migrateV11ToV12(); err != nil {
return err
}
+ // Migration version '12' to '13'.
+ if err := migrateV12ToV13(); err != nil {
+ return err
+ }
return nil
}
@@ -836,3 +840,97 @@ func migrateV11ToV12() error {
)
return nil
}
+
+// Version '12' to '13' migration. Add support for custom webhook endpoint.
+func migrateV12ToV13() error {
+ cv12, err := loadConfigV12()
+ if err != nil {
+ if os.IsNotExist(err) {
+ return nil
+ }
+ return fmt.Errorf("Unable to load config version ‘12’. %v", err)
+ }
+ if cv12.Version != "12" {
+ return nil
+ }
+
+ // Copy over fields from V12 into V13 config struct
+ srvConfig := &serverConfigV13{}
+ srvConfig.Version = "13"
+ srvConfig.Credential = cv12.Credential
+ srvConfig.Region = cv12.Region
+ if srvConfig.Region == "" {
+ // Region needs to be set for AWS Signature Version 4.
+ srvConfig.Region = "us-east-1"
+ }
+ srvConfig.Logger.Console = cv12.Logger.Console
+ srvConfig.Logger.File = cv12.Logger.File
+
+ // check and set notifiers config
+ if len(cv12.Notify.AMQP) == 0 {
+ srvConfig.Notify.AMQP = make(map[string]amqpNotify)
+ srvConfig.Notify.AMQP["1"] = amqpNotify{}
+ } else {
+ srvConfig.Notify.AMQP = cv12.Notify.AMQP
+ }
+ if len(cv12.Notify.ElasticSearch) == 0 {
+ srvConfig.Notify.ElasticSearch = make(map[string]elasticSearchNotify)
+ srvConfig.Notify.ElasticSearch["1"] = elasticSearchNotify{}
+ } else {
+ srvConfig.Notify.ElasticSearch = cv12.Notify.ElasticSearch
+ }
+ if len(cv12.Notify.Redis) == 0 {
+ srvConfig.Notify.Redis = make(map[string]redisNotify)
+ srvConfig.Notify.Redis["1"] = redisNotify{}
+ } else {
+ srvConfig.Notify.Redis = cv12.Notify.Redis
+ }
+ if len(cv12.Notify.PostgreSQL) == 0 {
+ srvConfig.Notify.PostgreSQL = make(map[string]postgreSQLNotify)
+ srvConfig.Notify.PostgreSQL["1"] = postgreSQLNotify{}
+ } else {
+ srvConfig.Notify.PostgreSQL = cv12.Notify.PostgreSQL
+ }
+ if len(cv12.Notify.Kafka) == 0 {
+ srvConfig.Notify.Kafka = make(map[string]kafkaNotify)
+ srvConfig.Notify.Kafka["1"] = kafkaNotify{}
+ } else {
+ srvConfig.Notify.Kafka = cv12.Notify.Kafka
+ }
+ if len(cv12.Notify.NATS) == 0 {
+ srvConfig.Notify.NATS = make(map[string]natsNotify)
+ srvConfig.Notify.NATS["1"] = natsNotify{}
+ } else {
+ srvConfig.Notify.NATS = cv12.Notify.NATS
+ }
+
+ // V12 will not have a webhook config. So we initialize one here.
+ srvConfig.Notify.Webhook = make(map[string]webhookNotify)
+ srvConfig.Notify.Webhook["1"] = webhookNotify{}
+
+ qc, err := quick.New(srvConfig)
+ if err != nil {
+ return fmt.Errorf("Unable to initialize the quick config. %v",
+ err)
+ }
+ configFile, err := getConfigFile()
+ if err != nil {
+ return fmt.Errorf("Unable to get config file. %v", err)
+ }
+
+ err = qc.Save(configFile)
+ if err != nil {
+ return fmt.Errorf(
+ "Failed to migrate config from ‘"+
+ cv12.Version+"’ to ‘"+srvConfig.Version+
+ "’ failed. %v", err,
+ )
+ }
+
+ console.Println(
+ "Migration from version ‘" +
+ cv12.Version + "’ to ‘" + srvConfig.Version +
+ "’ completed successfully.",
+ )
+ return nil
+}
diff --git a/cmd/config-migrate_test.go b/cmd/config-migrate_test.go
index d4897e049..e5478cd00 100644
--- a/cmd/config-migrate_test.go
+++ b/cmd/config-migrate_test.go
@@ -101,7 +101,10 @@ func TestServerConfigMigrateInexistentConfig(t *testing.T) {
t.Fatal("migrate v10 to v11 should succeed when no config file is found")
}
if err := migrateV11ToV12(); err != nil {
- t.Fatal("migrate v10 to v11 should succeed when no config file is found")
+ t.Fatal("migrate v11 to v12 should succeed when no config file is found")
+ }
+ if err := migrateV12ToV13(); err != nil {
+ t.Fatal("migrate v12 to v13 should succeed when no config file is found")
}
}
@@ -212,5 +215,7 @@ func TestServerConfigMigrateFaultyConfig(t *testing.T) {
if err := migrateV11ToV12(); err == nil {
t.Fatal("migrateConfigV11ToV12() should fail with a corrupted json")
}
-
+ if err := migrateV12ToV13(); err == nil {
+ t.Fatal("migrateConfigV12ToV13() should fail with a corrupted json")
+ }
}
diff --git a/cmd/config-old.go b/cmd/config-old.go
index 30b7fd460..2914dd597 100644
--- a/cmd/config-old.go
+++ b/cmd/config-old.go
@@ -325,7 +325,8 @@ func loadConfigV6() (*configV6, error) {
return c, nil
}
-// Notifier represents collection of supported notification queues.
+// Notifier represents collection of supported notification queues in version
+// 1 without NATS streaming.
type notifierV1 struct {
AMQP map[string]amqpNotify `json:"amqp"`
NATS map[string]natsNotifyV1 `json:"nats"`
@@ -335,6 +336,17 @@ type notifierV1 struct {
Kafka map[string]kafkaNotify `json:"kafka"`
}
+// Notifier represents collection of supported notification queues in version 2
+// with NATS streaming but without webhook.
+type notifierV2 struct {
+ AMQP map[string]amqpNotify `json:"amqp"`
+ NATS map[string]natsNotify `json:"nats"`
+ ElasticSearch map[string]elasticSearchNotify `json:"elasticsearch"`
+ Redis map[string]redisNotify `json:"redis"`
+ PostgreSQL map[string]postgreSQLNotify `json:"postgresql"`
+ Kafka map[string]kafkaNotify `json:"kafka"`
+}
+
// configV7 server configuration version '7'.
type serverConfigV7 struct {
Version string `json:"version"`
@@ -538,3 +550,39 @@ func loadConfigV11() (*serverConfigV11, error) {
}
return srvCfg, nil
}
+
+// serverConfigV12 server configuration version '12' which is like
+// version '11' except it adds support for NATS streaming notifications.
+type serverConfigV12 struct {
+ Version string `json:"version"`
+
+ // S3 API configuration.
+ Credential credential `json:"credential"`
+ Region string `json:"region"`
+
+ // Additional error logging configuration.
+ Logger logger `json:"logger"`
+
+ // Notification queue configuration.
+ Notify notifierV2 `json:"notify"`
+}
+
+func loadConfigV12() (*serverConfigV12, error) {
+ configFile, err := getConfigFile()
+ if err != nil {
+ return nil, err
+ }
+ if _, err = os.Stat(configFile); err != nil {
+ return nil, err
+ }
+ srvCfg := &serverConfigV12{}
+ srvCfg.Version = "12"
+ qc, err := quick.New(srvCfg)
+ if err != nil {
+ return nil, err
+ }
+ if err := qc.Load(configFile); err != nil {
+ return nil, err
+ }
+ return srvCfg, nil
+}
diff --git a/cmd/config-v12.go b/cmd/config-v13.go
similarity index 72%
rename from cmd/config-v12.go
rename to cmd/config-v13.go
index 017529303..b29419699 100644
--- a/cmd/config-v12.go
+++ b/cmd/config-v13.go
@@ -26,9 +26,9 @@ import (
// Read Write mutex for safe access to ServerConfig.
var serverConfigMu sync.RWMutex
-// serverConfigV12 server configuration version '12' which is like
-// version '11' except it adds support for NATS streaming notifications.
-type serverConfigV12 struct {
+// serverConfigV13 server configuration version '13' which is like
+// version '12' except it adds support for webhook notification.
+type serverConfigV13 struct {
Version string `json:"version"`
// S3 API configuration.
@@ -47,7 +47,7 @@ type serverConfigV12 struct {
func initConfig() (bool, error) {
if !isConfigFileExists() {
// Initialize server config.
- srvCfg := &serverConfigV12{}
+ srvCfg := &serverConfigV13{}
srvCfg.Version = globalMinioConfigVersion
srvCfg.Region = "us-east-1"
srvCfg.Credential = newCredential()
@@ -71,12 +71,15 @@ func initConfig() (bool, error) {
srvCfg.Notify.PostgreSQL["1"] = postgreSQLNotify{}
srvCfg.Notify.Kafka = make(map[string]kafkaNotify)
srvCfg.Notify.Kafka["1"] = kafkaNotify{}
+ srvCfg.Notify.Webhook = make(map[string]webhookNotify)
+ srvCfg.Notify.Webhook["1"] = webhookNotify{}
// Create config path.
err := createConfigPath()
if err != nil {
return false, err
}
+
// hold the mutex lock before a new config is assigned.
// Save the new config globally.
// unlock the mutex.
@@ -94,7 +97,7 @@ func initConfig() (bool, error) {
if _, err = os.Stat(configFile); err != nil {
return false, err
}
- srvCfg := &serverConfigV12{}
+ srvCfg := &serverConfigV13{}
srvCfg.Version = globalMinioConfigVersion
qc, err := quick.New(srvCfg)
if err != nil {
@@ -116,10 +119,10 @@ func initConfig() (bool, error) {
}
// serverConfig server config.
-var serverConfig *serverConfigV12
+var serverConfig *serverConfigV13
// GetVersion get current config version.
-func (s serverConfigV12) GetVersion() string {
+func (s serverConfigV13) GetVersion() string {
serverConfigMu.RLock()
defer serverConfigMu.RUnlock()
@@ -128,14 +131,14 @@ func (s serverConfigV12) GetVersion() string {
/// Logger related.
-func (s *serverConfigV12) SetAMQPNotifyByID(accountID string, amqpn amqpNotify) {
+func (s *serverConfigV13) SetAMQPNotifyByID(accountID string, amqpn amqpNotify) {
serverConfigMu.Lock()
defer serverConfigMu.Unlock()
s.Notify.AMQP[accountID] = amqpn
}
-func (s serverConfigV12) GetAMQP() map[string]amqpNotify {
+func (s serverConfigV13) GetAMQP() map[string]amqpNotify {
serverConfigMu.RLock()
defer serverConfigMu.RUnlock()
@@ -143,7 +146,7 @@ func (s serverConfigV12) GetAMQP() map[string]amqpNotify {
}
// GetAMQPNotify get current AMQP logger.
-func (s serverConfigV12) GetAMQPNotifyByID(accountID string) amqpNotify {
+func (s serverConfigV13) GetAMQPNotifyByID(accountID string) amqpNotify {
serverConfigMu.RLock()
defer serverConfigMu.RUnlock()
@@ -151,35 +154,35 @@ func (s serverConfigV12) GetAMQPNotifyByID(accountID string) amqpNotify {
}
//
-func (s *serverConfigV12) SetNATSNotifyByID(accountID string, natsn natsNotify) {
+func (s *serverConfigV13) SetNATSNotifyByID(accountID string, natsn natsNotify) {
serverConfigMu.Lock()
defer serverConfigMu.Unlock()
s.Notify.NATS[accountID] = natsn
}
-func (s serverConfigV12) GetNATS() map[string]natsNotify {
+func (s serverConfigV13) GetNATS() map[string]natsNotify {
serverConfigMu.RLock()
defer serverConfigMu.RUnlock()
return s.Notify.NATS
}
// GetNATSNotify get current NATS logger.
-func (s serverConfigV12) GetNATSNotifyByID(accountID string) natsNotify {
+func (s serverConfigV13) GetNATSNotifyByID(accountID string) natsNotify {
serverConfigMu.RLock()
defer serverConfigMu.RUnlock()
return s.Notify.NATS[accountID]
}
-func (s *serverConfigV12) SetElasticSearchNotifyByID(accountID string, esNotify elasticSearchNotify) {
+func (s *serverConfigV13) SetElasticSearchNotifyByID(accountID string, esNotify elasticSearchNotify) {
serverConfigMu.Lock()
defer serverConfigMu.Unlock()
s.Notify.ElasticSearch[accountID] = esNotify
}
-func (s serverConfigV12) GetElasticSearch() map[string]elasticSearchNotify {
+func (s serverConfigV13) GetElasticSearch() map[string]elasticSearchNotify {
serverConfigMu.RLock()
defer serverConfigMu.RUnlock()
@@ -187,50 +190,72 @@ func (s serverConfigV12) GetElasticSearch() map[string]elasticSearchNotify {
}
// GetElasticSearchNotify get current ElasicSearch logger.
-func (s serverConfigV12) GetElasticSearchNotifyByID(accountID string) elasticSearchNotify {
+func (s serverConfigV13) GetElasticSearchNotifyByID(accountID string) elasticSearchNotify {
serverConfigMu.RLock()
defer serverConfigMu.RUnlock()
return s.Notify.ElasticSearch[accountID]
}
-func (s *serverConfigV12) SetRedisNotifyByID(accountID string, rNotify redisNotify) {
+func (s *serverConfigV13) SetRedisNotifyByID(accountID string, rNotify redisNotify) {
serverConfigMu.Lock()
defer serverConfigMu.Unlock()
s.Notify.Redis[accountID] = rNotify
}
-func (s serverConfigV12) GetRedis() map[string]redisNotify {
+func (s serverConfigV13) GetRedis() map[string]redisNotify {
serverConfigMu.RLock()
defer serverConfigMu.RUnlock()
return s.Notify.Redis
}
+func (s serverConfigV13) GetWebhook() map[string]webhookNotify {
+ serverConfigMu.RLock()
+ defer serverConfigMu.RUnlock()
+
+ return s.Notify.Webhook
+}
+
+// GetWebhookNotifyByID get current Webhook logger.
+func (s serverConfigV13) GetWebhookNotifyByID(accountID string) webhookNotify {
+ serverConfigMu.RLock()
+ defer serverConfigMu.RUnlock()
+
+ return s.Notify.Webhook[accountID]
+}
+
+func (s *serverConfigV13) SetWebhookNotifyByID(accountID string, pgn webhookNotify) {
+ serverConfigMu.Lock()
+ defer serverConfigMu.Unlock()
+
+ s.Notify.Webhook[accountID] = pgn
+}
+
// GetRedisNotify get current Redis logger.
-func (s serverConfigV12) GetRedisNotifyByID(accountID string) redisNotify {
+func (s serverConfigV13) GetRedisNotifyByID(accountID string) redisNotify {
serverConfigMu.RLock()
defer serverConfigMu.RUnlock()
return s.Notify.Redis[accountID]
}
-func (s *serverConfigV12) SetPostgreSQLNotifyByID(accountID string, pgn postgreSQLNotify) {
+func (s *serverConfigV13) SetPostgreSQLNotifyByID(accountID string, pgn postgreSQLNotify) {
serverConfigMu.Lock()
defer serverConfigMu.Unlock()
s.Notify.PostgreSQL[accountID] = pgn
}
-func (s serverConfigV12) GetPostgreSQL() map[string]postgreSQLNotify {
+func (s serverConfigV13) GetPostgreSQL() map[string]postgreSQLNotify {
serverConfigMu.RLock()
defer serverConfigMu.RUnlock()
return s.Notify.PostgreSQL
}
-func (s serverConfigV12) GetPostgreSQLNotifyByID(accountID string) postgreSQLNotify {
+func (s serverConfigV13) GetPostgreSQLNotifyByID(accountID string) postgreSQLNotify {
serverConfigMu.RLock()
defer serverConfigMu.RUnlock()
@@ -238,21 +263,21 @@ func (s serverConfigV12) GetPostgreSQLNotifyByID(accountID string) postgreSQLNot
}
// Kafka related functions
-func (s *serverConfigV12) SetKafkaNotifyByID(accountID string, kn kafkaNotify) {
+func (s *serverConfigV13) SetKafkaNotifyByID(accountID string, kn kafkaNotify) {
serverConfigMu.Lock()
defer serverConfigMu.Unlock()
s.Notify.Kafka[accountID] = kn
}
-func (s serverConfigV12) GetKafka() map[string]kafkaNotify {
+func (s serverConfigV13) GetKafka() map[string]kafkaNotify {
serverConfigMu.RLock()
defer serverConfigMu.RUnlock()
return s.Notify.Kafka
}
-func (s serverConfigV12) GetKafkaNotifyByID(accountID string) kafkaNotify {
+func (s serverConfigV13) GetKafkaNotifyByID(accountID string) kafkaNotify {
serverConfigMu.RLock()
defer serverConfigMu.RUnlock()
@@ -260,7 +285,7 @@ func (s serverConfigV12) GetKafkaNotifyByID(accountID string) kafkaNotify {
}
// SetFileLogger set new file logger.
-func (s *serverConfigV12) SetFileLogger(flogger fileLogger) {
+func (s *serverConfigV13) SetFileLogger(flogger fileLogger) {
serverConfigMu.Lock()
defer serverConfigMu.Unlock()
@@ -268,7 +293,7 @@ func (s *serverConfigV12) SetFileLogger(flogger fileLogger) {
}
// GetFileLogger get current file logger.
-func (s serverConfigV12) GetFileLogger() fileLogger {
+func (s serverConfigV13) GetFileLogger() fileLogger {
serverConfigMu.RLock()
defer serverConfigMu.RUnlock()
@@ -276,7 +301,7 @@ func (s serverConfigV12) GetFileLogger() fileLogger {
}
// SetConsoleLogger set new console logger.
-func (s *serverConfigV12) SetConsoleLogger(clogger consoleLogger) {
+func (s *serverConfigV13) SetConsoleLogger(clogger consoleLogger) {
serverConfigMu.Lock()
defer serverConfigMu.Unlock()
@@ -284,7 +309,7 @@ func (s *serverConfigV12) SetConsoleLogger(clogger consoleLogger) {
}
// GetConsoleLogger get current console logger.
-func (s serverConfigV12) GetConsoleLogger() consoleLogger {
+func (s serverConfigV13) GetConsoleLogger() consoleLogger {
serverConfigMu.RLock()
defer serverConfigMu.RUnlock()
@@ -292,7 +317,7 @@ func (s serverConfigV12) GetConsoleLogger() consoleLogger {
}
// SetRegion set new region.
-func (s *serverConfigV12) SetRegion(region string) {
+func (s *serverConfigV13) SetRegion(region string) {
serverConfigMu.Lock()
defer serverConfigMu.Unlock()
@@ -300,7 +325,7 @@ func (s *serverConfigV12) SetRegion(region string) {
}
// GetRegion get current region.
-func (s serverConfigV12) GetRegion() string {
+func (s serverConfigV13) GetRegion() string {
serverConfigMu.RLock()
defer serverConfigMu.RUnlock()
@@ -308,7 +333,7 @@ func (s serverConfigV12) GetRegion() string {
}
// SetCredentials set new credentials.
-func (s *serverConfigV12) SetCredential(creds credential) {
+func (s *serverConfigV13) SetCredential(creds credential) {
serverConfigMu.Lock()
defer serverConfigMu.Unlock()
@@ -316,7 +341,7 @@ func (s *serverConfigV12) SetCredential(creds credential) {
}
// GetCredentials get current credentials.
-func (s serverConfigV12) GetCredential() credential {
+func (s serverConfigV13) GetCredential() credential {
serverConfigMu.RLock()
defer serverConfigMu.RUnlock()
@@ -324,7 +349,7 @@ func (s serverConfigV12) GetCredential() credential {
}
// Save config.
-func (s serverConfigV12) Save() error {
+func (s serverConfigV13) Save() error {
serverConfigMu.RLock()
defer serverConfigMu.RUnlock()
diff --git a/cmd/config-v12_test.go b/cmd/config-v13_test.go
similarity index 92%
rename from cmd/config-v12_test.go
rename to cmd/config-v13_test.go
index 755976d43..5a80e43aa 100644
--- a/cmd/config-v12_test.go
+++ b/cmd/config-v13_test.go
@@ -67,6 +67,13 @@ func TestServerConfig(t *testing.T) {
t.Errorf("Expecting Kafka config %#v found %#v", kafkaNotify{}, savedNotifyCfg4)
}
+ // Set new Webhook notification id.
+ serverConfig.SetWebhookNotifyByID("2", webhookNotify{})
+ savedNotifyCfg5 := serverConfig.GetWebhookNotifyByID("2")
+ if !reflect.DeepEqual(savedNotifyCfg5, webhookNotify{}) {
+ t.Errorf("Expecting Webhook config %#v found %#v", webhookNotify{}, savedNotifyCfg3)
+ }
+
// Set new console logger.
serverConfig.SetConsoleLogger(consoleLogger{
Enable: true,
diff --git a/cmd/event-notifier.go b/cmd/event-notifier.go
index 85c600242..fa3eb5b7b 100644
--- a/cmd/event-notifier.go
+++ b/cmd/event-notifier.go
@@ -612,6 +612,28 @@ func loadAllQueueTargets() (map[string]*logrus.Logger, error) {
}
queueTargets[queueARN] = redisLog
}
+
+ // Load Webhook targets, initialize their respective loggers.
+ for accountID, webhookN := range serverConfig.GetWebhook() {
+ if !webhookN.Enable {
+ continue
+ }
+ // Construct the queue ARN for Webhook.
+ queueARN := minioSqs + serverConfig.GetRegion() + ":" + accountID + ":" + queueTypeWebhook
+ _, ok := queueTargets[queueARN]
+ if ok {
+ continue
+ }
+
+ // Using accountID we can now initialize a new Webhook logrus instance.
+ webhookLog, err := newWebhookNotify(accountID)
+ if err != nil {
+
+ return nil, err
+ }
+ queueTargets[queueARN] = webhookLog
+ }
+
// Load elastic targets, initialize their respective loggers.
for accountID, elasticN := range serverConfig.GetElasticSearch() {
if !elasticN.Enable {
@@ -637,6 +659,7 @@ func loadAllQueueTargets() (map[string]*logrus.Logger, error) {
}
queueTargets[queueARN] = elasticLog
}
+
// Load PostgreSQL targets, initialize their respective loggers.
for accountID, pgN := range serverConfig.GetPostgreSQL() {
if !pgN.Enable {
diff --git a/cmd/event-notifier_test.go b/cmd/event-notifier_test.go
index ee842676a..45e8a0b02 100644
--- a/cmd/event-notifier_test.go
+++ b/cmd/event-notifier_test.go
@@ -77,6 +77,99 @@ func TestInitEventNotifierFaultyDisks(t *testing.T) {
}
}
+// InitEventNotifierWithPostgreSQL - tests InitEventNotifier when PostgreSQL is not prepared
+func TestInitEventNotifierWithPostgreSQL(t *testing.T) {
+ // initialize the server and obtain the credentials and root.
+ // credentials are necessary to sign the HTTP request.
+ rootPath, err := newTestConfig("us-east-1")
+ if err != nil {
+ t.Fatalf("Init Test config failed")
+ }
+ // remove the root directory after the test ends.
+ defer removeAll(rootPath)
+
+ disks, err := getRandomDisks(1)
+ defer removeAll(disks[0])
+ if err != nil {
+ t.Fatal("Unable to create directories for FS backend. ", err)
+ }
+ endpoints, err := parseStorageEndpoints(disks)
+ if err != nil {
+ t.Fatal(err)
+ }
+ fs, _, err := initObjectLayer(endpoints)
+ if err != nil {
+ t.Fatal("Unable to initialize FS backend.", err)
+ }
+
+ serverConfig.SetPostgreSQLNotifyByID("1", postgreSQLNotify{Enable: true})
+ if err := initEventNotifier(fs); err == nil {
+ t.Fatal("PostgreSQL config didn't fail.")
+ }
+}
+
+// InitEventNotifierWithNATS - tests InitEventNotifier when NATS is not prepared
+func TestInitEventNotifierWithNATS(t *testing.T) {
+ // initialize the server and obtain the credentials and root.
+ // credentials are necessary to sign the HTTP request.
+ rootPath, err := newTestConfig("us-east-1")
+ if err != nil {
+ t.Fatalf("Init Test config failed")
+ }
+ // remove the root directory after the test ends.
+ defer removeAll(rootPath)
+
+ disks, err := getRandomDisks(1)
+ defer removeAll(disks[0])
+ if err != nil {
+ t.Fatal("Unable to create directories for FS backend. ", err)
+ }
+ endpoints, err := parseStorageEndpoints(disks)
+ if err != nil {
+ t.Fatal(err)
+ }
+ fs, _, err := initObjectLayer(endpoints)
+ if err != nil {
+ t.Fatal("Unable to initialize FS backend.", err)
+ }
+
+ serverConfig.SetNATSNotifyByID("1", natsNotify{Enable: true})
+ if err := initEventNotifier(fs); err == nil {
+ t.Fatal("NATS config didn't fail.")
+ }
+}
+
+// InitEventNotifierWithWebHook - tests InitEventNotifier when WebHook is not prepared
+func TestInitEventNotifierWithWebHook(t *testing.T) {
+ // initialize the server and obtain the credentials and root.
+ // credentials are necessary to sign the HTTP request.
+ rootPath, err := newTestConfig("us-east-1")
+ if err != nil {
+ t.Fatalf("Init Test config failed")
+ }
+ // remove the root directory after the test ends.
+ defer removeAll(rootPath)
+
+ disks, err := getRandomDisks(1)
+ defer removeAll(disks[0])
+ if err != nil {
+ t.Fatal("Unable to create directories for FS backend. ", err)
+ }
+ endpoints, err := parseStorageEndpoints(disks)
+ if err != nil {
+ t.Fatal(err)
+ }
+ fs, _, err := initObjectLayer(endpoints)
+ if err != nil {
+ t.Fatal("Unable to initialize FS backend.", err)
+ }
+
+ serverConfig.SetWebhookNotifyByID("1", webhookNotify{Enable: true})
+ if err := initEventNotifier(fs); err == nil {
+ t.Fatal("WebHook config didn't fail.")
+ }
+}
+
// InitEventNotifierWithAMQP - tests InitEventNotifier when AMQP is not prepared
func TestInitEventNotifierWithAMQP(t *testing.T) {
// initialize the server and obtain the credentials and root.
diff --git a/cmd/globals.go b/cmd/globals.go
index 0f0eb7476..419886e00 100644
--- a/cmd/globals.go
+++ b/cmd/globals.go
@@ -19,6 +19,7 @@ package cmd
import (
"crypto/x509"
"os"
+ "runtime"
"strings"
"time"
@@ -36,7 +37,7 @@ const (
// minio configuration related constants.
const (
- globalMinioConfigVersion = "12"
+ globalMinioConfigVersion = "13"
globalMinioConfigDir = ".minio"
globalMinioCertsDir = "certs"
globalMinioCertsCADir = "CAs"
@@ -96,6 +97,9 @@ var (
// List of admin peers.
globalAdminPeers = adminPeers{}
+ // Minio server user agent string.
+ globalServerUserAgent = "Minio/" + ReleaseTag + " (" + runtime.GOOS + "; " + runtime.GOARCH + ")"
+
// Add new variable global values here.
)
diff --git a/cmd/notifiers.go b/cmd/notifiers.go
index 8e65b9d9a..b7b5ec496 100644
--- a/cmd/notifiers.go
+++ b/cmd/notifiers.go
@@ -40,6 +40,8 @@ const (
queueTypePostgreSQL = "postgresql"
// Static string indicating queue type 'kafka'.
queueTypeKafka = "kafka"
+ // Static string for Webhooks
+ queueTypeWebhook = "webhook"
)
// Topic type.
@@ -61,6 +63,7 @@ type notifier struct {
Redis map[string]redisNotify `json:"redis"`
PostgreSQL map[string]postgreSQLNotify `json:"postgresql"`
Kafka map[string]kafkaNotify `json:"kafka"`
+ Webhook map[string]webhookNotify `json:"webhook"`
// Add new notification queues.
}
@@ -102,6 +105,18 @@ func isNATSQueue(sqsArn arnSQS) bool {
return true
}
+// Returns true if queueArn is for an Webhook queue
+func isWebhookQueue(sqsArn arnSQS) bool {
+ if sqsArn.Type != queueTypeWebhook {
+ return false
+ }
+ rNotify := serverConfig.GetWebhookNotifyByID(sqsArn.AccountID)
+ if !rNotify.Enable {
+ return false
+ }
+ return true
+}
+
// Returns true if queueArn is for an Redis queue.
func isRedisQueue(sqsArn arnSQS) bool {
if sqsArn.Type != queueTypeRedis {
diff --git a/cmd/notify-amqp.go b/cmd/notify-amqp.go
index ac5698d62..a5b87f72d 100644
--- a/cmd/notify-amqp.go
+++ b/cmd/notify-amqp.go
@@ -82,7 +82,7 @@ func newAMQPNotify(accountID string) (*logrus.Logger, error) {
return amqpLog, nil
}
-// Fire is called when an event should be sent to the message broker.k
+// Fire is called when an event should be sent to the message broker.
func (q amqpConn) Fire(entry *logrus.Entry) error {
ch, err := q.Connection.Channel()
if err != nil {
diff --git a/cmd/notify-webhook.go b/cmd/notify-webhook.go
new file mode 100644
index 000000000..10904672e
--- /dev/null
+++ b/cmd/notify-webhook.go
@@ -0,0 +1,136 @@
+/*
+ * Minio Cloud Storage, (C) 2016, 2017 Minio, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cmd
+
+import (
+ "fmt"
+ "io/ioutil"
+ "net"
+ "net/http"
+ "net/url"
+ "time"
+
+ "github.com/Sirupsen/logrus"
+)
+
+type webhookNotify struct {
+ Enable bool `json:"enable"`
+ Endpoint string `json:"endpoint"`
+}
+
+type httpConn struct {
+ *http.Client
+ Endpoint string
+}
+
+// Lookup host address by dialing.
+func lookupHost(addr string) error {
+ dialer := &net.Dialer{
+ Timeout: 300 * time.Millisecond,
+ KeepAlive: 300 * time.Millisecond,
+ }
+ nconn, err := dialer.Dial("tcp", addr)
+ if err != nil {
+ return err
+ }
+ return nconn.Close()
+}
+
+// Initializes new webhook logrus notifier.
+func newWebhookNotify(accountID string) (*logrus.Logger, error) {
+ rNotify := serverConfig.GetWebhookNotifyByID(accountID)
+
+ if rNotify.Endpoint == "" {
+ return nil, errInvalidArgument
+ }
+
+ u, err := url.Parse(rNotify.Endpoint)
+ if err != nil {
+ return nil, err
+ }
+
+ if err = lookupHost(u.Host); err != nil {
+ return nil, err
+ }
+
+ conn := httpConn{
+ // Configure aggressive timeouts for client posts.
+ Client: &http.Client{
+ Transport: &http.Transport{
+ DialContext: (&net.Dialer{
+ Timeout: 5 * time.Second,
+ KeepAlive: 5 * time.Second,
+ }).DialContext,
+ TLSHandshakeTimeout: 3 * time.Second,
+ ResponseHeaderTimeout: 3 * time.Second,
+ ExpectContinueTimeout: 2 * time.Second,
+ },
+ },
+ Endpoint: rNotify.Endpoint,
+ }
+
+ notifyLog := logrus.New()
+ notifyLog.Out = ioutil.Discard
+
+ // Set default JSON formatter.
+ notifyLog.Formatter = new(logrus.JSONFormatter)
+
+ notifyLog.Hooks.Add(conn)
+
+ // Success
+ return notifyLog, nil
+}
+
+// Fire is called when an event should be sent to the message broker.
+func (n httpConn) Fire(entry *logrus.Entry) error {
+ body, err := entry.Reader()
+ if err != nil {
+ return err
+ }
+
+ req, err := http.NewRequest("POST", n.Endpoint, body)
+ if err != nil {
+ return err
+ }
+
+ // Set content-type.
+ req.Header.Set("Content-Type", "application/json")
+
+ // Set proper server user-agent.
+ req.Header.Set("User-Agent", globalServerUserAgent)
+
+ // Initiate the http request.
+ resp, err := n.Do(req)
+ if err != nil {
+ return err
+ }
+
+ if resp.StatusCode != http.StatusOK &&
+ resp.StatusCode != http.StatusAccepted &&
+ resp.StatusCode != http.StatusContinue {
+ return fmt.Errorf("Unable to send event %s", resp.Status)
+ }
+
+ return nil
+}
+
+// Levels are Required for logrus hook implementation
+func (httpConn) Levels() []logrus.Level {
+ return []logrus.Level{
+ logrus.InfoLevel,
+ }
+}
diff --git a/cmd/notify-webhook_test.go b/cmd/notify-webhook_test.go
new file mode 100644
index 000000000..3f298694f
--- /dev/null
+++ b/cmd/notify-webhook_test.go
@@ -0,0 +1,79 @@
+/*
+ * Minio Cloud Storage, (C) 2017 Minio, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cmd
+
+import (
+ "fmt"
+ "io"
+ "net/http"
+ "net/http/httptest"
+ "path"
+ "testing"
+
+ "github.com/Sirupsen/logrus"
+)
+
+// Custom post handler to handle POST requests.
+type postHandler struct{}
+
+func (p postHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+ if r.Method != "POST" {
+ http.Error(w, fmt.Sprintf("Unexpected method %s", r.Method), http.StatusBadRequest)
+ return
+ }
+ io.Copy(w, r.Body)
+}
+
+// Tests web hook initialization.
+func TestNewWebHookNotify(t *testing.T) {
+ root, err := newTestConfig("us-east-1")
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer removeAll(root)
+
+ _, err = newWebhookNotify("1")
+ if err == nil {
+ t.Fatal("Unexpected should fail")
+ }
+
+ serverConfig.SetWebhookNotifyByID("10", webhookNotify{Enable: true, Endpoint: "http://www."})
+ _, err = newWebhookNotify("10")
+ if err == nil {
+ t.Fatal("Unexpected should fail with lookupHost")
+ }
+
+ serverConfig.SetWebhookNotifyByID("15", webhookNotify{Enable: true, Endpoint: "http://%"})
+ _, err = newWebhookNotify("15")
+ if err == nil {
+ t.Fatal("Unexpected should fail with invalid URL escape")
+ }
+
+ server := httptest.NewServer(postHandler{})
+ defer server.Close()
+
+ serverConfig.SetWebhookNotifyByID("20", webhookNotify{Enable: true, Endpoint: server.URL})
+ webhook, err := newWebhookNotify("20")
+ if err != nil {
+ t.Fatal("Unexpected shouldn't fail", err)
+ }
+
+ webhook.WithFields(logrus.Fields{
+ "Key": path.Join("bucket", "object"),
+ "EventType": "s3:ObjectCreated:Put",
+ }).Info()
+}
diff --git a/cmd/server_test.go b/cmd/server_test.go
index 5f6fa9b83..04af7e129 100644
--- a/cmd/server_test.go
+++ b/cmd/server_test.go
@@ -100,7 +100,37 @@ func (s *TestSuiteCommon) TestAuth(c *C) {
c.Assert(len(cred.SecretKey), Equals, secretKeyMaxLen)
}
-func (s *TestSuiteCommon) TestBucketSQSNotification(c *C) {
+func (s *TestSuiteCommon) TestBucketSQSNotificationWebHook(c *C) {
+ // Sample bucket notification.
+ bucketNotificationBuf := `s3:ObjectCreated:Putprefiximages/1arn:minio:sqs:us-east-1:444455556666:webhook`
+ // generate a random bucket Name.
+ bucketName := getRandomBucketName()
+ // HTTP request to create the bucket.
+ request, err := newTestSignedRequest("PUT", getMakeBucketURL(s.endPoint, bucketName),
+ 0, nil, s.accessKey, s.secretKey, s.signer)
+ c.Assert(err, IsNil)
+
+ client := http.Client{Transport: s.transport}
+ // execute the request.
+ response, err := client.Do(request)
+ c.Assert(err, IsNil)
+
+ // assert the http response status code.
+ c.Assert(response.StatusCode, Equals, http.StatusOK)
+
+ request, err = newTestSignedRequest("PUT", getPutNotificationURL(s.endPoint, bucketName),
+ int64(len(bucketNotificationBuf)), bytes.NewReader([]byte(bucketNotificationBuf)), s.accessKey, s.secretKey, s.signer)
+ c.Assert(err, IsNil)
+
+ client = http.Client{Transport: s.transport}
+ // execute the HTTP request.
+ response, err = client.Do(request)
+
+ c.Assert(err, IsNil)
+ verifyError(c, response, "InvalidArgument", "A specified destination ARN does not exist or is not well-formed. Verify the destination ARN.", http.StatusBadRequest)
+}
+
+func (s *TestSuiteCommon) TestBucketSQSNotificationAMQP(c *C) {
// Sample bucket notification.
bucketNotificationBuf := `s3:ObjectCreated:Putprefiximages/1arn:minio:sqs:us-east-1:444455556666:amqp`
// generate a random bucket Name.