/* * Minio Cloud Storage, (C) 2016 Minio, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package cmd import ( "errors" "fmt" "github.com/minio/minio/pkg/wildcard" ) // SQS type. const ( // Minio sqs ARN prefix. minioSqs = "arn:minio:sqs:" // Static string indicating queue type 'amqp'. queueTypeAMQP = "amqp" // Static string indicating queue type 'mqtt'. queueTypeMQTT = "mqtt" // Static string indicating queue type 'nats'. queueTypeNATS = "nats" // Static string indicating queue type 'elasticsearch'. queueTypeElastic = "elasticsearch" // Static string indicating queue type 'redis'. queueTypeRedis = "redis" // Static string indicating queue type 'postgresql'. queueTypePostgreSQL = "postgresql" // Static string indicating queue type 'mysql'. queueTypeMySQL = "mysql" // Static string indicating queue type 'kafka'. queueTypeKafka = "kafka" // Static string for Webhooks queueTypeWebhook = "webhook" // Notifier format value constants formatNamespace = "namespace" formatAccess = "access" ) // Topic type. const ( // Minio topic ARN prefix. minioTopic = "arn:minio:sns:" // Static string indicating sns type 'listen'. snsTypeMinio = "listen" ) var errNotifyNotEnabled = errors.New("requested notifier not enabled") // Returns true if queueArn is for an AMQP queue. func isAMQPQueue(sqsArn arnSQS) bool { if sqsArn.Type != queueTypeAMQP { return false } amqpL := serverConfig.Notify.GetAMQPByID(sqsArn.AccountID) if !amqpL.Enable { return false } // Connect to amqp server to validate. amqpC, err := dialAMQP(amqpL) if err != nil { errorIf(err, "Unable to connect to amqp service. %#v", amqpL) return false } defer amqpC.conn.Close() return true } // Returns true if mqttARN is for an MQTT queue. func isMQTTQueue(sqsArn arnSQS) bool { if sqsArn.Type != queueTypeMQTT { return false } mqttL := serverConfig.Notify.GetMQTTByID(sqsArn.AccountID) if !mqttL.Enable { return false } // Connect to mqtt server to validate. mqttC, err := dialMQTT(mqttL) if err != nil { errorIf(err, "Unable to connect to mqtt service. %#v", mqttL) return false } defer mqttC.Client.Disconnect(250) return true } // Returns true if natsArn is for an NATS queue. func isNATSQueue(sqsArn arnSQS) bool { if sqsArn.Type != queueTypeNATS { return false } natsL := serverConfig.Notify.GetNATSByID(sqsArn.AccountID) if !natsL.Enable { return false } // Connect to nats server to validate. natsC, err := dialNATS(natsL, true) if err != nil { errorIf(err, "Unable to connect to nats service. %#v", natsL) return false } closeNATS(natsC) return true } // Returns true if queueArn is for an Webhook queue func isWebhookQueue(sqsArn arnSQS) bool { if sqsArn.Type != queueTypeWebhook { return false } rNotify := serverConfig.Notify.GetWebhookByID(sqsArn.AccountID) return rNotify.Enable } // Returns true if queueArn is for an Redis queue. func isRedisQueue(sqsArn arnSQS) bool { if sqsArn.Type != queueTypeRedis { return false } rNotify := serverConfig.Notify.GetRedisByID(sqsArn.AccountID) if !rNotify.Enable { return false } // Connect to redis server to validate. rPool, err := dialRedis(rNotify) if err != nil { errorIf(err, "Unable to connect to redis service. %#v", rNotify) return false } defer rPool.Close() return true } // Returns true if queueArn is for an ElasticSearch queue. func isElasticQueue(sqsArn arnSQS) bool { if sqsArn.Type != queueTypeElastic { return false } esNotify := serverConfig.Notify.GetElasticSearchByID(sqsArn.AccountID) if !esNotify.Enable { return false } elasticC, err := dialElastic(esNotify) if err != nil { errorIf(err, "Unable to connect to elasticsearch service %#v", esNotify) return false } defer elasticC.Stop() return true } // Returns true if queueArn is for PostgreSQL. func isPostgreSQLQueue(sqsArn arnSQS) bool { if sqsArn.Type != queueTypePostgreSQL { return false } pgNotify := serverConfig.Notify.GetPostgreSQLByID(sqsArn.AccountID) if !pgNotify.Enable { return false } pgC, err := dialPostgreSQL(pgNotify) if err != nil { errorIf(err, "Unable to connect to PostgreSQL server %#v", pgNotify) return false } defer pgC.Close() return true } // Returns true if queueArn is for MySQL. func isMySQLQueue(sqsArn arnSQS) bool { if sqsArn.Type != queueTypeMySQL { return false } msqlNotify := serverConfig.Notify.GetMySQLByID(sqsArn.AccountID) if !msqlNotify.Enable { return false } myC, err := dialMySQL(msqlNotify) if err != nil { errorIf(err, "Unable to connect to MySQL server %#v", msqlNotify) return false } defer myC.Close() return true } // Returns true if queueArn is for Kafka. func isKafkaQueue(sqsArn arnSQS) bool { if sqsArn.Type != queueTypeKafka { return false } kafkaNotifyCfg := serverConfig.Notify.GetKafkaByID(sqsArn.AccountID) if !kafkaNotifyCfg.Enable { return false } kafkaC, err := dialKafka(kafkaNotifyCfg) if err != nil { errorIf(err, "Unable to dial Kafka server %#v", kafkaNotifyCfg) return false } defer kafkaC.Close() return true } // Match function matches wild cards in 'pattern' for events. func eventMatch(eventType string, events []string) (ok bool) { for _, event := range events { ok = wildcard.MatchSimple(event, eventType) if ok { break } } return ok } // Filter rule match, matches an object against the filter rules. func filterRuleMatch(object string, frs []filterRule) bool { var prefixMatch, suffixMatch = true, true for _, fr := range frs { if isValidFilterNamePrefix(fr.Name) { prefixMatch = hasPrefix(object, fr.Value) } else if isValidFilterNameSuffix(fr.Name) { suffixMatch = hasSuffix(object, fr.Value) } } return prefixMatch && suffixMatch } // A type to represent dynamic error generation functions for // notifications. type notificationErrorFactoryFunc func(string, ...interface{}) error // A function to build dynamic error generation functions for // notifications by setting an error prefix string. func newNotificationErrorFactory(prefix string) notificationErrorFactoryFunc { return func(msg string, a ...interface{}) error { s := fmt.Sprintf(msg, a...) return fmt.Errorf("%s: %s", prefix, s) } }