From d6a327fbc54e6536ec8552c72faf169b740e0e7c Mon Sep 17 00:00:00 2001 From: Alex Date: Mon, 9 Jan 2017 22:22:10 +0000 Subject: [PATCH] Add notifications by webhook. Add a new config entry moving to version 13. ``` "webhook": { "1": { "enable": true, "address": "http://requestb.in/1i9al7m1" } } ``` --- .travis.yml | 2 +- cmd/api-headers.go | 3 +- cmd/bucket-notification-utils.go | 7 + cmd/bucket-notification-utils_test.go | 11 ++ cmd/config-migrate.go | 98 +++++++++++++ cmd/config-migrate_test.go | 9 +- cmd/config-old.go | 50 ++++++- cmd/{config-v12.go => config-v13.go} | 93 +++++++----- ...{config-v12_test.go => config-v13_test.go} | 7 + cmd/event-notifier.go | 23 +++ cmd/event-notifier_test.go | 93 ++++++++++++ cmd/globals.go | 6 +- cmd/notifiers.go | 15 ++ cmd/notify-amqp.go | 2 +- cmd/notify-webhook.go | 136 ++++++++++++++++++ cmd/notify-webhook_test.go | 79 ++++++++++ cmd/server_test.go | 32 ++++- 17 files changed, 623 insertions(+), 43 deletions(-) rename cmd/{config-v12.go => config-v13.go} (72%) rename cmd/{config-v12_test.go => config-v13_test.go} (92%) create mode 100644 cmd/notify-webhook.go create mode 100644 cmd/notify-webhook_test.go diff --git a/.travis.yml b/.travis.yml index 814e766c0..fdf9bd708 100644 --- a/.travis.yml +++ b/.travis.yml @@ -21,4 +21,4 @@ after_success: - bash <(curl -s https://codecov.io/bash) go: -- 1.7.3 +- 1.7.4 diff --git a/cmd/api-headers.go b/cmd/api-headers.go index 62d32a0ec..5f6cc730c 100644 --- a/cmd/api-headers.go +++ b/cmd/api-headers.go @@ -21,7 +21,6 @@ import ( "encoding/xml" "fmt" "net/http" - "runtime" "strconv" "time" ) @@ -36,7 +35,7 @@ func mustGetRequestID(t time.Time) string { func setCommonHeaders(w http.ResponseWriter) { // Set unique request ID for each reply. w.Header().Set(responseRequestIDKey, mustGetRequestID(time.Now().UTC())) - w.Header().Set("Server", ("Minio/" + ReleaseTag + " (" + runtime.GOOS + "; " + runtime.GOARCH + ")")) + w.Header().Set("Server", globalServerUserAgent) w.Header().Set("Accept-Ranges", "bytes") } diff --git a/cmd/bucket-notification-utils.go b/cmd/bucket-notification-utils.go index ff25e9d51..2c0e735f5 100644 --- a/cmd/bucket-notification-utils.go +++ b/cmd/bucket-notification-utils.go @@ -131,6 +131,7 @@ func isValidQueueID(queueARN string) bool { // Unmarshals QueueARN into structured object. sqsARN := unmarshalSqsARN(queueARN) // Is Queue identifier valid?. + if isAMQPQueue(sqsARN) { // AMQP eueue. amqpN := serverConfig.GetAMQPNotifyByID(sqsARN.AccountID) return amqpN.Enable && amqpN.URL != "" @@ -151,6 +152,9 @@ func isValidQueueID(queueARN string) bool { kafkaN := serverConfig.GetKafkaNotifyByID(sqsARN.AccountID) return (kafkaN.Enable && len(kafkaN.Brokers) > 0 && kafkaN.Topic != "") + } else if isWebhookQueue(sqsARN) { + webhookN := serverConfig.GetWebhookNotifyByID(sqsARN.AccountID) + return webhookN.Enable && webhookN.Endpoint != "" } return false } @@ -241,6 +245,7 @@ func validateNotificationConfig(nConfig notificationConfig) APIErrorCode { // - redis // - postgresql // - kafka +// - webhook func unmarshalSqsARN(queueARN string) (mSqs arnSQS) { mSqs = arnSQS{} if !strings.HasPrefix(queueARN, minioSqs+serverConfig.GetRegion()+":") { @@ -260,6 +265,8 @@ func unmarshalSqsARN(queueARN string) (mSqs arnSQS) { mSqs.Type = queueTypePostgreSQL case strings.HasSuffix(sqsType, queueTypeKafka): mSqs.Type = queueTypeKafka + case strings.HasSuffix(sqsType, queueTypeWebhook): + mSqs.Type = queueTypeWebhook } // Add more queues here. mSqs.AccountID = strings.TrimSuffix(sqsType, ":"+mSqs.Type) return mSqs diff --git a/cmd/bucket-notification-utils_test.go b/cmd/bucket-notification-utils_test.go index 64130e781..0a59f30f2 100644 --- a/cmd/bucket-notification-utils_test.go +++ b/cmd/bucket-notification-utils_test.go @@ -228,6 +228,12 @@ func TestQueueARN(t *testing.T) { queueARN string errCode APIErrorCode }{ + + // Valid webhook queue arn. + { + queueARN: "arn:minio:sqs:us-east-1:1:webhook", + errCode: ErrNone, + }, // Valid redis queue arn. { queueARN: "arn:minio:sqs:us-east-1:1:redis", @@ -306,6 +312,11 @@ func TestUnmarshalSQSARN(t *testing.T) { queueARN string Type string }{ + // Valid webhook queue arn. + { + queueARN: "arn:minio:sqs:us-east-1:1:webhook", + Type: "webhook", + }, // Valid redis queue arn. { queueARN: "arn:minio:sqs:us-east-1:1:redis", diff --git a/cmd/config-migrate.go b/cmd/config-migrate.go index 8f6bccf4e..acf72f017 100644 --- a/cmd/config-migrate.go +++ b/cmd/config-migrate.go @@ -70,6 +70,10 @@ func migrateConfig() error { if err := migrateV11ToV12(); err != nil { return err } + // Migration version '12' to '13'. + if err := migrateV12ToV13(); err != nil { + return err + } return nil } @@ -836,3 +840,97 @@ func migrateV11ToV12() error { ) return nil } + +// Version '12' to '13' migration. Add support for custom webhook endpoint. +func migrateV12ToV13() error { + cv12, err := loadConfigV12() + if err != nil { + if os.IsNotExist(err) { + return nil + } + return fmt.Errorf("Unable to load config version ‘12’. %v", err) + } + if cv12.Version != "12" { + return nil + } + + // Copy over fields from V12 into V13 config struct + srvConfig := &serverConfigV13{} + srvConfig.Version = "13" + srvConfig.Credential = cv12.Credential + srvConfig.Region = cv12.Region + if srvConfig.Region == "" { + // Region needs to be set for AWS Signature Version 4. + srvConfig.Region = "us-east-1" + } + srvConfig.Logger.Console = cv12.Logger.Console + srvConfig.Logger.File = cv12.Logger.File + + // check and set notifiers config + if len(cv12.Notify.AMQP) == 0 { + srvConfig.Notify.AMQP = make(map[string]amqpNotify) + srvConfig.Notify.AMQP["1"] = amqpNotify{} + } else { + srvConfig.Notify.AMQP = cv12.Notify.AMQP + } + if len(cv12.Notify.ElasticSearch) == 0 { + srvConfig.Notify.ElasticSearch = make(map[string]elasticSearchNotify) + srvConfig.Notify.ElasticSearch["1"] = elasticSearchNotify{} + } else { + srvConfig.Notify.ElasticSearch = cv12.Notify.ElasticSearch + } + if len(cv12.Notify.Redis) == 0 { + srvConfig.Notify.Redis = make(map[string]redisNotify) + srvConfig.Notify.Redis["1"] = redisNotify{} + } else { + srvConfig.Notify.Redis = cv12.Notify.Redis + } + if len(cv12.Notify.PostgreSQL) == 0 { + srvConfig.Notify.PostgreSQL = make(map[string]postgreSQLNotify) + srvConfig.Notify.PostgreSQL["1"] = postgreSQLNotify{} + } else { + srvConfig.Notify.PostgreSQL = cv12.Notify.PostgreSQL + } + if len(cv12.Notify.Kafka) == 0 { + srvConfig.Notify.Kafka = make(map[string]kafkaNotify) + srvConfig.Notify.Kafka["1"] = kafkaNotify{} + } else { + srvConfig.Notify.Kafka = cv12.Notify.Kafka + } + if len(cv12.Notify.NATS) == 0 { + srvConfig.Notify.NATS = make(map[string]natsNotify) + srvConfig.Notify.NATS["1"] = natsNotify{} + } else { + srvConfig.Notify.NATS = cv12.Notify.NATS + } + + // V12 will not have a webhook config. So we initialize one here. + srvConfig.Notify.Webhook = make(map[string]webhookNotify) + srvConfig.Notify.Webhook["1"] = webhookNotify{} + + qc, err := quick.New(srvConfig) + if err != nil { + return fmt.Errorf("Unable to initialize the quick config. %v", + err) + } + configFile, err := getConfigFile() + if err != nil { + return fmt.Errorf("Unable to get config file. %v", err) + } + + err = qc.Save(configFile) + if err != nil { + return fmt.Errorf( + "Failed to migrate config from ‘"+ + cv12.Version+"’ to ‘"+srvConfig.Version+ + "’ failed. %v", err, + ) + } + + console.Println( + "Migration from version ‘" + + cv12.Version + "’ to ‘" + srvConfig.Version + + "’ completed successfully.", + ) + return nil +} diff --git a/cmd/config-migrate_test.go b/cmd/config-migrate_test.go index d4897e049..e5478cd00 100644 --- a/cmd/config-migrate_test.go +++ b/cmd/config-migrate_test.go @@ -101,7 +101,10 @@ func TestServerConfigMigrateInexistentConfig(t *testing.T) { t.Fatal("migrate v10 to v11 should succeed when no config file is found") } if err := migrateV11ToV12(); err != nil { - t.Fatal("migrate v10 to v11 should succeed when no config file is found") + t.Fatal("migrate v11 to v12 should succeed when no config file is found") + } + if err := migrateV12ToV13(); err != nil { + t.Fatal("migrate v12 to v13 should succeed when no config file is found") } } @@ -212,5 +215,7 @@ func TestServerConfigMigrateFaultyConfig(t *testing.T) { if err := migrateV11ToV12(); err == nil { t.Fatal("migrateConfigV11ToV12() should fail with a corrupted json") } - + if err := migrateV12ToV13(); err == nil { + t.Fatal("migrateConfigV12ToV13() should fail with a corrupted json") + } } diff --git a/cmd/config-old.go b/cmd/config-old.go index 30b7fd460..2914dd597 100644 --- a/cmd/config-old.go +++ b/cmd/config-old.go @@ -325,7 +325,8 @@ func loadConfigV6() (*configV6, error) { return c, nil } -// Notifier represents collection of supported notification queues. +// Notifier represents collection of supported notification queues in version +// 1 without NATS streaming. type notifierV1 struct { AMQP map[string]amqpNotify `json:"amqp"` NATS map[string]natsNotifyV1 `json:"nats"` @@ -335,6 +336,17 @@ type notifierV1 struct { Kafka map[string]kafkaNotify `json:"kafka"` } +// Notifier represents collection of supported notification queues in version 2 +// with NATS streaming but without webhook. +type notifierV2 struct { + AMQP map[string]amqpNotify `json:"amqp"` + NATS map[string]natsNotify `json:"nats"` + ElasticSearch map[string]elasticSearchNotify `json:"elasticsearch"` + Redis map[string]redisNotify `json:"redis"` + PostgreSQL map[string]postgreSQLNotify `json:"postgresql"` + Kafka map[string]kafkaNotify `json:"kafka"` +} + // configV7 server configuration version '7'. type serverConfigV7 struct { Version string `json:"version"` @@ -538,3 +550,39 @@ func loadConfigV11() (*serverConfigV11, error) { } return srvCfg, nil } + +// serverConfigV12 server configuration version '12' which is like +// version '11' except it adds support for NATS streaming notifications. +type serverConfigV12 struct { + Version string `json:"version"` + + // S3 API configuration. + Credential credential `json:"credential"` + Region string `json:"region"` + + // Additional error logging configuration. + Logger logger `json:"logger"` + + // Notification queue configuration. + Notify notifierV2 `json:"notify"` +} + +func loadConfigV12() (*serverConfigV12, error) { + configFile, err := getConfigFile() + if err != nil { + return nil, err + } + if _, err = os.Stat(configFile); err != nil { + return nil, err + } + srvCfg := &serverConfigV12{} + srvCfg.Version = "12" + qc, err := quick.New(srvCfg) + if err != nil { + return nil, err + } + if err := qc.Load(configFile); err != nil { + return nil, err + } + return srvCfg, nil +} diff --git a/cmd/config-v12.go b/cmd/config-v13.go similarity index 72% rename from cmd/config-v12.go rename to cmd/config-v13.go index 017529303..b29419699 100644 --- a/cmd/config-v12.go +++ b/cmd/config-v13.go @@ -26,9 +26,9 @@ import ( // Read Write mutex for safe access to ServerConfig. var serverConfigMu sync.RWMutex -// serverConfigV12 server configuration version '12' which is like -// version '11' except it adds support for NATS streaming notifications. -type serverConfigV12 struct { +// serverConfigV13 server configuration version '13' which is like +// version '12' except it adds support for webhook notification. +type serverConfigV13 struct { Version string `json:"version"` // S3 API configuration. @@ -47,7 +47,7 @@ type serverConfigV12 struct { func initConfig() (bool, error) { if !isConfigFileExists() { // Initialize server config. - srvCfg := &serverConfigV12{} + srvCfg := &serverConfigV13{} srvCfg.Version = globalMinioConfigVersion srvCfg.Region = "us-east-1" srvCfg.Credential = newCredential() @@ -71,12 +71,15 @@ func initConfig() (bool, error) { srvCfg.Notify.PostgreSQL["1"] = postgreSQLNotify{} srvCfg.Notify.Kafka = make(map[string]kafkaNotify) srvCfg.Notify.Kafka["1"] = kafkaNotify{} + srvCfg.Notify.Webhook = make(map[string]webhookNotify) + srvCfg.Notify.Webhook["1"] = webhookNotify{} // Create config path. err := createConfigPath() if err != nil { return false, err } + // hold the mutex lock before a new config is assigned. // Save the new config globally. // unlock the mutex. @@ -94,7 +97,7 @@ func initConfig() (bool, error) { if _, err = os.Stat(configFile); err != nil { return false, err } - srvCfg := &serverConfigV12{} + srvCfg := &serverConfigV13{} srvCfg.Version = globalMinioConfigVersion qc, err := quick.New(srvCfg) if err != nil { @@ -116,10 +119,10 @@ func initConfig() (bool, error) { } // serverConfig server config. -var serverConfig *serverConfigV12 +var serverConfig *serverConfigV13 // GetVersion get current config version. -func (s serverConfigV12) GetVersion() string { +func (s serverConfigV13) GetVersion() string { serverConfigMu.RLock() defer serverConfigMu.RUnlock() @@ -128,14 +131,14 @@ func (s serverConfigV12) GetVersion() string { /// Logger related. -func (s *serverConfigV12) SetAMQPNotifyByID(accountID string, amqpn amqpNotify) { +func (s *serverConfigV13) SetAMQPNotifyByID(accountID string, amqpn amqpNotify) { serverConfigMu.Lock() defer serverConfigMu.Unlock() s.Notify.AMQP[accountID] = amqpn } -func (s serverConfigV12) GetAMQP() map[string]amqpNotify { +func (s serverConfigV13) GetAMQP() map[string]amqpNotify { serverConfigMu.RLock() defer serverConfigMu.RUnlock() @@ -143,7 +146,7 @@ func (s serverConfigV12) GetAMQP() map[string]amqpNotify { } // GetAMQPNotify get current AMQP logger. -func (s serverConfigV12) GetAMQPNotifyByID(accountID string) amqpNotify { +func (s serverConfigV13) GetAMQPNotifyByID(accountID string) amqpNotify { serverConfigMu.RLock() defer serverConfigMu.RUnlock() @@ -151,35 +154,35 @@ func (s serverConfigV12) GetAMQPNotifyByID(accountID string) amqpNotify { } // -func (s *serverConfigV12) SetNATSNotifyByID(accountID string, natsn natsNotify) { +func (s *serverConfigV13) SetNATSNotifyByID(accountID string, natsn natsNotify) { serverConfigMu.Lock() defer serverConfigMu.Unlock() s.Notify.NATS[accountID] = natsn } -func (s serverConfigV12) GetNATS() map[string]natsNotify { +func (s serverConfigV13) GetNATS() map[string]natsNotify { serverConfigMu.RLock() defer serverConfigMu.RUnlock() return s.Notify.NATS } // GetNATSNotify get current NATS logger. -func (s serverConfigV12) GetNATSNotifyByID(accountID string) natsNotify { +func (s serverConfigV13) GetNATSNotifyByID(accountID string) natsNotify { serverConfigMu.RLock() defer serverConfigMu.RUnlock() return s.Notify.NATS[accountID] } -func (s *serverConfigV12) SetElasticSearchNotifyByID(accountID string, esNotify elasticSearchNotify) { +func (s *serverConfigV13) SetElasticSearchNotifyByID(accountID string, esNotify elasticSearchNotify) { serverConfigMu.Lock() defer serverConfigMu.Unlock() s.Notify.ElasticSearch[accountID] = esNotify } -func (s serverConfigV12) GetElasticSearch() map[string]elasticSearchNotify { +func (s serverConfigV13) GetElasticSearch() map[string]elasticSearchNotify { serverConfigMu.RLock() defer serverConfigMu.RUnlock() @@ -187,50 +190,72 @@ func (s serverConfigV12) GetElasticSearch() map[string]elasticSearchNotify { } // GetElasticSearchNotify get current ElasicSearch logger. -func (s serverConfigV12) GetElasticSearchNotifyByID(accountID string) elasticSearchNotify { +func (s serverConfigV13) GetElasticSearchNotifyByID(accountID string) elasticSearchNotify { serverConfigMu.RLock() defer serverConfigMu.RUnlock() return s.Notify.ElasticSearch[accountID] } -func (s *serverConfigV12) SetRedisNotifyByID(accountID string, rNotify redisNotify) { +func (s *serverConfigV13) SetRedisNotifyByID(accountID string, rNotify redisNotify) { serverConfigMu.Lock() defer serverConfigMu.Unlock() s.Notify.Redis[accountID] = rNotify } -func (s serverConfigV12) GetRedis() map[string]redisNotify { +func (s serverConfigV13) GetRedis() map[string]redisNotify { serverConfigMu.RLock() defer serverConfigMu.RUnlock() return s.Notify.Redis } +func (s serverConfigV13) GetWebhook() map[string]webhookNotify { + serverConfigMu.RLock() + defer serverConfigMu.RUnlock() + + return s.Notify.Webhook +} + +// GetWebhookNotifyByID get current Webhook logger. +func (s serverConfigV13) GetWebhookNotifyByID(accountID string) webhookNotify { + serverConfigMu.RLock() + defer serverConfigMu.RUnlock() + + return s.Notify.Webhook[accountID] +} + +func (s *serverConfigV13) SetWebhookNotifyByID(accountID string, pgn webhookNotify) { + serverConfigMu.Lock() + defer serverConfigMu.Unlock() + + s.Notify.Webhook[accountID] = pgn +} + // GetRedisNotify get current Redis logger. -func (s serverConfigV12) GetRedisNotifyByID(accountID string) redisNotify { +func (s serverConfigV13) GetRedisNotifyByID(accountID string) redisNotify { serverConfigMu.RLock() defer serverConfigMu.RUnlock() return s.Notify.Redis[accountID] } -func (s *serverConfigV12) SetPostgreSQLNotifyByID(accountID string, pgn postgreSQLNotify) { +func (s *serverConfigV13) SetPostgreSQLNotifyByID(accountID string, pgn postgreSQLNotify) { serverConfigMu.Lock() defer serverConfigMu.Unlock() s.Notify.PostgreSQL[accountID] = pgn } -func (s serverConfigV12) GetPostgreSQL() map[string]postgreSQLNotify { +func (s serverConfigV13) GetPostgreSQL() map[string]postgreSQLNotify { serverConfigMu.RLock() defer serverConfigMu.RUnlock() return s.Notify.PostgreSQL } -func (s serverConfigV12) GetPostgreSQLNotifyByID(accountID string) postgreSQLNotify { +func (s serverConfigV13) GetPostgreSQLNotifyByID(accountID string) postgreSQLNotify { serverConfigMu.RLock() defer serverConfigMu.RUnlock() @@ -238,21 +263,21 @@ func (s serverConfigV12) GetPostgreSQLNotifyByID(accountID string) postgreSQLNot } // Kafka related functions -func (s *serverConfigV12) SetKafkaNotifyByID(accountID string, kn kafkaNotify) { +func (s *serverConfigV13) SetKafkaNotifyByID(accountID string, kn kafkaNotify) { serverConfigMu.Lock() defer serverConfigMu.Unlock() s.Notify.Kafka[accountID] = kn } -func (s serverConfigV12) GetKafka() map[string]kafkaNotify { +func (s serverConfigV13) GetKafka() map[string]kafkaNotify { serverConfigMu.RLock() defer serverConfigMu.RUnlock() return s.Notify.Kafka } -func (s serverConfigV12) GetKafkaNotifyByID(accountID string) kafkaNotify { +func (s serverConfigV13) GetKafkaNotifyByID(accountID string) kafkaNotify { serverConfigMu.RLock() defer serverConfigMu.RUnlock() @@ -260,7 +285,7 @@ func (s serverConfigV12) GetKafkaNotifyByID(accountID string) kafkaNotify { } // SetFileLogger set new file logger. -func (s *serverConfigV12) SetFileLogger(flogger fileLogger) { +func (s *serverConfigV13) SetFileLogger(flogger fileLogger) { serverConfigMu.Lock() defer serverConfigMu.Unlock() @@ -268,7 +293,7 @@ func (s *serverConfigV12) SetFileLogger(flogger fileLogger) { } // GetFileLogger get current file logger. -func (s serverConfigV12) GetFileLogger() fileLogger { +func (s serverConfigV13) GetFileLogger() fileLogger { serverConfigMu.RLock() defer serverConfigMu.RUnlock() @@ -276,7 +301,7 @@ func (s serverConfigV12) GetFileLogger() fileLogger { } // SetConsoleLogger set new console logger. -func (s *serverConfigV12) SetConsoleLogger(clogger consoleLogger) { +func (s *serverConfigV13) SetConsoleLogger(clogger consoleLogger) { serverConfigMu.Lock() defer serverConfigMu.Unlock() @@ -284,7 +309,7 @@ func (s *serverConfigV12) SetConsoleLogger(clogger consoleLogger) { } // GetConsoleLogger get current console logger. -func (s serverConfigV12) GetConsoleLogger() consoleLogger { +func (s serverConfigV13) GetConsoleLogger() consoleLogger { serverConfigMu.RLock() defer serverConfigMu.RUnlock() @@ -292,7 +317,7 @@ func (s serverConfigV12) GetConsoleLogger() consoleLogger { } // SetRegion set new region. -func (s *serverConfigV12) SetRegion(region string) { +func (s *serverConfigV13) SetRegion(region string) { serverConfigMu.Lock() defer serverConfigMu.Unlock() @@ -300,7 +325,7 @@ func (s *serverConfigV12) SetRegion(region string) { } // GetRegion get current region. -func (s serverConfigV12) GetRegion() string { +func (s serverConfigV13) GetRegion() string { serverConfigMu.RLock() defer serverConfigMu.RUnlock() @@ -308,7 +333,7 @@ func (s serverConfigV12) GetRegion() string { } // SetCredentials set new credentials. -func (s *serverConfigV12) SetCredential(creds credential) { +func (s *serverConfigV13) SetCredential(creds credential) { serverConfigMu.Lock() defer serverConfigMu.Unlock() @@ -316,7 +341,7 @@ func (s *serverConfigV12) SetCredential(creds credential) { } // GetCredentials get current credentials. -func (s serverConfigV12) GetCredential() credential { +func (s serverConfigV13) GetCredential() credential { serverConfigMu.RLock() defer serverConfigMu.RUnlock() @@ -324,7 +349,7 @@ func (s serverConfigV12) GetCredential() credential { } // Save config. -func (s serverConfigV12) Save() error { +func (s serverConfigV13) Save() error { serverConfigMu.RLock() defer serverConfigMu.RUnlock() diff --git a/cmd/config-v12_test.go b/cmd/config-v13_test.go similarity index 92% rename from cmd/config-v12_test.go rename to cmd/config-v13_test.go index 755976d43..5a80e43aa 100644 --- a/cmd/config-v12_test.go +++ b/cmd/config-v13_test.go @@ -67,6 +67,13 @@ func TestServerConfig(t *testing.T) { t.Errorf("Expecting Kafka config %#v found %#v", kafkaNotify{}, savedNotifyCfg4) } + // Set new Webhook notification id. + serverConfig.SetWebhookNotifyByID("2", webhookNotify{}) + savedNotifyCfg5 := serverConfig.GetWebhookNotifyByID("2") + if !reflect.DeepEqual(savedNotifyCfg5, webhookNotify{}) { + t.Errorf("Expecting Webhook config %#v found %#v", webhookNotify{}, savedNotifyCfg3) + } + // Set new console logger. serverConfig.SetConsoleLogger(consoleLogger{ Enable: true, diff --git a/cmd/event-notifier.go b/cmd/event-notifier.go index 85c600242..fa3eb5b7b 100644 --- a/cmd/event-notifier.go +++ b/cmd/event-notifier.go @@ -612,6 +612,28 @@ func loadAllQueueTargets() (map[string]*logrus.Logger, error) { } queueTargets[queueARN] = redisLog } + + // Load Webhook targets, initialize their respective loggers. + for accountID, webhookN := range serverConfig.GetWebhook() { + if !webhookN.Enable { + continue + } + // Construct the queue ARN for Webhook. + queueARN := minioSqs + serverConfig.GetRegion() + ":" + accountID + ":" + queueTypeWebhook + _, ok := queueTargets[queueARN] + if ok { + continue + } + + // Using accountID we can now initialize a new Webhook logrus instance. + webhookLog, err := newWebhookNotify(accountID) + if err != nil { + + return nil, err + } + queueTargets[queueARN] = webhookLog + } + // Load elastic targets, initialize their respective loggers. for accountID, elasticN := range serverConfig.GetElasticSearch() { if !elasticN.Enable { @@ -637,6 +659,7 @@ func loadAllQueueTargets() (map[string]*logrus.Logger, error) { } queueTargets[queueARN] = elasticLog } + // Load PostgreSQL targets, initialize their respective loggers. for accountID, pgN := range serverConfig.GetPostgreSQL() { if !pgN.Enable { diff --git a/cmd/event-notifier_test.go b/cmd/event-notifier_test.go index ee842676a..45e8a0b02 100644 --- a/cmd/event-notifier_test.go +++ b/cmd/event-notifier_test.go @@ -77,6 +77,99 @@ func TestInitEventNotifierFaultyDisks(t *testing.T) { } } +// InitEventNotifierWithPostgreSQL - tests InitEventNotifier when PostgreSQL is not prepared +func TestInitEventNotifierWithPostgreSQL(t *testing.T) { + // initialize the server and obtain the credentials and root. + // credentials are necessary to sign the HTTP request. + rootPath, err := newTestConfig("us-east-1") + if err != nil { + t.Fatalf("Init Test config failed") + } + // remove the root directory after the test ends. + defer removeAll(rootPath) + + disks, err := getRandomDisks(1) + defer removeAll(disks[0]) + if err != nil { + t.Fatal("Unable to create directories for FS backend. ", err) + } + endpoints, err := parseStorageEndpoints(disks) + if err != nil { + t.Fatal(err) + } + fs, _, err := initObjectLayer(endpoints) + if err != nil { + t.Fatal("Unable to initialize FS backend.", err) + } + + serverConfig.SetPostgreSQLNotifyByID("1", postgreSQLNotify{Enable: true}) + if err := initEventNotifier(fs); err == nil { + t.Fatal("PostgreSQL config didn't fail.") + } +} + +// InitEventNotifierWithNATS - tests InitEventNotifier when NATS is not prepared +func TestInitEventNotifierWithNATS(t *testing.T) { + // initialize the server and obtain the credentials and root. + // credentials are necessary to sign the HTTP request. + rootPath, err := newTestConfig("us-east-1") + if err != nil { + t.Fatalf("Init Test config failed") + } + // remove the root directory after the test ends. + defer removeAll(rootPath) + + disks, err := getRandomDisks(1) + defer removeAll(disks[0]) + if err != nil { + t.Fatal("Unable to create directories for FS backend. ", err) + } + endpoints, err := parseStorageEndpoints(disks) + if err != nil { + t.Fatal(err) + } + fs, _, err := initObjectLayer(endpoints) + if err != nil { + t.Fatal("Unable to initialize FS backend.", err) + } + + serverConfig.SetNATSNotifyByID("1", natsNotify{Enable: true}) + if err := initEventNotifier(fs); err == nil { + t.Fatal("NATS config didn't fail.") + } +} + +// InitEventNotifierWithWebHook - tests InitEventNotifier when WebHook is not prepared +func TestInitEventNotifierWithWebHook(t *testing.T) { + // initialize the server and obtain the credentials and root. + // credentials are necessary to sign the HTTP request. + rootPath, err := newTestConfig("us-east-1") + if err != nil { + t.Fatalf("Init Test config failed") + } + // remove the root directory after the test ends. + defer removeAll(rootPath) + + disks, err := getRandomDisks(1) + defer removeAll(disks[0]) + if err != nil { + t.Fatal("Unable to create directories for FS backend. ", err) + } + endpoints, err := parseStorageEndpoints(disks) + if err != nil { + t.Fatal(err) + } + fs, _, err := initObjectLayer(endpoints) + if err != nil { + t.Fatal("Unable to initialize FS backend.", err) + } + + serverConfig.SetWebhookNotifyByID("1", webhookNotify{Enable: true}) + if err := initEventNotifier(fs); err == nil { + t.Fatal("WebHook config didn't fail.") + } +} + // InitEventNotifierWithAMQP - tests InitEventNotifier when AMQP is not prepared func TestInitEventNotifierWithAMQP(t *testing.T) { // initialize the server and obtain the credentials and root. diff --git a/cmd/globals.go b/cmd/globals.go index 0f0eb7476..419886e00 100644 --- a/cmd/globals.go +++ b/cmd/globals.go @@ -19,6 +19,7 @@ package cmd import ( "crypto/x509" "os" + "runtime" "strings" "time" @@ -36,7 +37,7 @@ const ( // minio configuration related constants. const ( - globalMinioConfigVersion = "12" + globalMinioConfigVersion = "13" globalMinioConfigDir = ".minio" globalMinioCertsDir = "certs" globalMinioCertsCADir = "CAs" @@ -96,6 +97,9 @@ var ( // List of admin peers. globalAdminPeers = adminPeers{} + // Minio server user agent string. + globalServerUserAgent = "Minio/" + ReleaseTag + " (" + runtime.GOOS + "; " + runtime.GOARCH + ")" + // Add new variable global values here. ) diff --git a/cmd/notifiers.go b/cmd/notifiers.go index 8e65b9d9a..b7b5ec496 100644 --- a/cmd/notifiers.go +++ b/cmd/notifiers.go @@ -40,6 +40,8 @@ const ( queueTypePostgreSQL = "postgresql" // Static string indicating queue type 'kafka'. queueTypeKafka = "kafka" + // Static string for Webhooks + queueTypeWebhook = "webhook" ) // Topic type. @@ -61,6 +63,7 @@ type notifier struct { Redis map[string]redisNotify `json:"redis"` PostgreSQL map[string]postgreSQLNotify `json:"postgresql"` Kafka map[string]kafkaNotify `json:"kafka"` + Webhook map[string]webhookNotify `json:"webhook"` // Add new notification queues. } @@ -102,6 +105,18 @@ func isNATSQueue(sqsArn arnSQS) bool { return true } +// Returns true if queueArn is for an Webhook queue +func isWebhookQueue(sqsArn arnSQS) bool { + if sqsArn.Type != queueTypeWebhook { + return false + } + rNotify := serverConfig.GetWebhookNotifyByID(sqsArn.AccountID) + if !rNotify.Enable { + return false + } + return true +} + // Returns true if queueArn is for an Redis queue. func isRedisQueue(sqsArn arnSQS) bool { if sqsArn.Type != queueTypeRedis { diff --git a/cmd/notify-amqp.go b/cmd/notify-amqp.go index ac5698d62..a5b87f72d 100644 --- a/cmd/notify-amqp.go +++ b/cmd/notify-amqp.go @@ -82,7 +82,7 @@ func newAMQPNotify(accountID string) (*logrus.Logger, error) { return amqpLog, nil } -// Fire is called when an event should be sent to the message broker.k +// Fire is called when an event should be sent to the message broker. func (q amqpConn) Fire(entry *logrus.Entry) error { ch, err := q.Connection.Channel() if err != nil { diff --git a/cmd/notify-webhook.go b/cmd/notify-webhook.go new file mode 100644 index 000000000..10904672e --- /dev/null +++ b/cmd/notify-webhook.go @@ -0,0 +1,136 @@ +/* + * Minio Cloud Storage, (C) 2016, 2017 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cmd + +import ( + "fmt" + "io/ioutil" + "net" + "net/http" + "net/url" + "time" + + "github.com/Sirupsen/logrus" +) + +type webhookNotify struct { + Enable bool `json:"enable"` + Endpoint string `json:"endpoint"` +} + +type httpConn struct { + *http.Client + Endpoint string +} + +// Lookup host address by dialing. +func lookupHost(addr string) error { + dialer := &net.Dialer{ + Timeout: 300 * time.Millisecond, + KeepAlive: 300 * time.Millisecond, + } + nconn, err := dialer.Dial("tcp", addr) + if err != nil { + return err + } + return nconn.Close() +} + +// Initializes new webhook logrus notifier. +func newWebhookNotify(accountID string) (*logrus.Logger, error) { + rNotify := serverConfig.GetWebhookNotifyByID(accountID) + + if rNotify.Endpoint == "" { + return nil, errInvalidArgument + } + + u, err := url.Parse(rNotify.Endpoint) + if err != nil { + return nil, err + } + + if err = lookupHost(u.Host); err != nil { + return nil, err + } + + conn := httpConn{ + // Configure aggressive timeouts for client posts. + Client: &http.Client{ + Transport: &http.Transport{ + DialContext: (&net.Dialer{ + Timeout: 5 * time.Second, + KeepAlive: 5 * time.Second, + }).DialContext, + TLSHandshakeTimeout: 3 * time.Second, + ResponseHeaderTimeout: 3 * time.Second, + ExpectContinueTimeout: 2 * time.Second, + }, + }, + Endpoint: rNotify.Endpoint, + } + + notifyLog := logrus.New() + notifyLog.Out = ioutil.Discard + + // Set default JSON formatter. + notifyLog.Formatter = new(logrus.JSONFormatter) + + notifyLog.Hooks.Add(conn) + + // Success + return notifyLog, nil +} + +// Fire is called when an event should be sent to the message broker. +func (n httpConn) Fire(entry *logrus.Entry) error { + body, err := entry.Reader() + if err != nil { + return err + } + + req, err := http.NewRequest("POST", n.Endpoint, body) + if err != nil { + return err + } + + // Set content-type. + req.Header.Set("Content-Type", "application/json") + + // Set proper server user-agent. + req.Header.Set("User-Agent", globalServerUserAgent) + + // Initiate the http request. + resp, err := n.Do(req) + if err != nil { + return err + } + + if resp.StatusCode != http.StatusOK && + resp.StatusCode != http.StatusAccepted && + resp.StatusCode != http.StatusContinue { + return fmt.Errorf("Unable to send event %s", resp.Status) + } + + return nil +} + +// Levels are Required for logrus hook implementation +func (httpConn) Levels() []logrus.Level { + return []logrus.Level{ + logrus.InfoLevel, + } +} diff --git a/cmd/notify-webhook_test.go b/cmd/notify-webhook_test.go new file mode 100644 index 000000000..3f298694f --- /dev/null +++ b/cmd/notify-webhook_test.go @@ -0,0 +1,79 @@ +/* + * Minio Cloud Storage, (C) 2017 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cmd + +import ( + "fmt" + "io" + "net/http" + "net/http/httptest" + "path" + "testing" + + "github.com/Sirupsen/logrus" +) + +// Custom post handler to handle POST requests. +type postHandler struct{} + +func (p postHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if r.Method != "POST" { + http.Error(w, fmt.Sprintf("Unexpected method %s", r.Method), http.StatusBadRequest) + return + } + io.Copy(w, r.Body) +} + +// Tests web hook initialization. +func TestNewWebHookNotify(t *testing.T) { + root, err := newTestConfig("us-east-1") + if err != nil { + t.Fatal(err) + } + defer removeAll(root) + + _, err = newWebhookNotify("1") + if err == nil { + t.Fatal("Unexpected should fail") + } + + serverConfig.SetWebhookNotifyByID("10", webhookNotify{Enable: true, Endpoint: "http://www."}) + _, err = newWebhookNotify("10") + if err == nil { + t.Fatal("Unexpected should fail with lookupHost") + } + + serverConfig.SetWebhookNotifyByID("15", webhookNotify{Enable: true, Endpoint: "http://%"}) + _, err = newWebhookNotify("15") + if err == nil { + t.Fatal("Unexpected should fail with invalid URL escape") + } + + server := httptest.NewServer(postHandler{}) + defer server.Close() + + serverConfig.SetWebhookNotifyByID("20", webhookNotify{Enable: true, Endpoint: server.URL}) + webhook, err := newWebhookNotify("20") + if err != nil { + t.Fatal("Unexpected shouldn't fail", err) + } + + webhook.WithFields(logrus.Fields{ + "Key": path.Join("bucket", "object"), + "EventType": "s3:ObjectCreated:Put", + }).Info() +} diff --git a/cmd/server_test.go b/cmd/server_test.go index 5f6fa9b83..04af7e129 100644 --- a/cmd/server_test.go +++ b/cmd/server_test.go @@ -100,7 +100,37 @@ func (s *TestSuiteCommon) TestAuth(c *C) { c.Assert(len(cred.SecretKey), Equals, secretKeyMaxLen) } -func (s *TestSuiteCommon) TestBucketSQSNotification(c *C) { +func (s *TestSuiteCommon) TestBucketSQSNotificationWebHook(c *C) { + // Sample bucket notification. + bucketNotificationBuf := `s3:ObjectCreated:Putprefiximages/1arn:minio:sqs:us-east-1:444455556666:webhook` + // generate a random bucket Name. + bucketName := getRandomBucketName() + // HTTP request to create the bucket. + request, err := newTestSignedRequest("PUT", getMakeBucketURL(s.endPoint, bucketName), + 0, nil, s.accessKey, s.secretKey, s.signer) + c.Assert(err, IsNil) + + client := http.Client{Transport: s.transport} + // execute the request. + response, err := client.Do(request) + c.Assert(err, IsNil) + + // assert the http response status code. + c.Assert(response.StatusCode, Equals, http.StatusOK) + + request, err = newTestSignedRequest("PUT", getPutNotificationURL(s.endPoint, bucketName), + int64(len(bucketNotificationBuf)), bytes.NewReader([]byte(bucketNotificationBuf)), s.accessKey, s.secretKey, s.signer) + c.Assert(err, IsNil) + + client = http.Client{Transport: s.transport} + // execute the HTTP request. + response, err = client.Do(request) + + c.Assert(err, IsNil) + verifyError(c, response, "InvalidArgument", "A specified destination ARN does not exist or is not well-formed. Verify the destination ARN.", http.StatusBadRequest) +} + +func (s *TestSuiteCommon) TestBucketSQSNotificationAMQP(c *C) { // Sample bucket notification. bucketNotificationBuf := `s3:ObjectCreated:Putprefiximages/1arn:minio:sqs:us-east-1:444455556666:amqp` // generate a random bucket Name.