diff --git a/cmd/event-notifier.go b/cmd/event-notifier.go index 9db12cccf..1aee1c626 100644 --- a/cmd/event-notifier.go +++ b/cmd/event-notifier.go @@ -537,6 +537,28 @@ func loadAllBucketNotifications(objAPI ObjectLayer) (map[string]*notificationCon return nConfigs, lConfigs, nil } +// addQueueTarget - calls newTargetFunc function and adds its returned value to queueTargets +func addQueueTarget(queueTargets map[string]*logrus.Logger, + accountID, queueType string, + newTargetFunc func(string) (*logrus.Logger, error)) (string, error) { + + // Construct the queue ARN for AMQP. + queueARN := minioSqs + serverConfig.GetRegion() + ":" + accountID + ":" + queueType + + // Queue target if already initialized we move to the next ARN. + if _, ok := queueTargets[queueARN]; ok { + return queueARN, nil + } + + // Using accountID we can now initialize a new AMQP logrus instance. + logger, err := newTargetFunc(accountID) + if err == nil { + queueTargets[queueARN] = logger + } + + return queueARN, err +} + // Loads all queue targets, initializes each queueARNs depending on their config. // Each instance of queueARN registers its own logrus to communicate with the // queue service. QueueARN once initialized is not initialized again for the @@ -548,54 +570,37 @@ func loadAllQueueTargets() (map[string]*logrus.Logger, error) { if !amqpN.Enable { continue } - // Construct the queue ARN for AMQP. - queueARN := minioSqs + serverConfig.GetRegion() + ":" + accountID + ":" + queueTypeAMQP - // Queue target if already initialized we move to the next ARN. - _, ok := queueTargets[queueARN] - if ok { - continue - } - // Using accountID we can now initialize a new AMQP logrus instance. - amqpLog, err := newAMQPNotify(accountID) - if err != nil { - // Encapsulate network error to be more informative. + + if queueARN, err := addQueueTarget(queueTargets, accountID, queueTypeAMQP, newAMQPNotify); err != nil { if _, ok := err.(net.Error); ok { - return nil, &net.OpError{ + err = &net.OpError{ Op: "Connecting to " + queueARN, Net: "tcp", Err: err, } } + return nil, err } - queueTargets[queueARN] = amqpLog } + // Load all nats targets, initialize their respective loggers. for accountID, natsN := range serverConfig.Notify.GetNATS() { if !natsN.Enable { continue } - // Construct the queue ARN for NATS. - queueARN := minioSqs + serverConfig.GetRegion() + ":" + accountID + ":" + queueTypeNATS - // Queue target if already initialized we move to the next ARN. - _, ok := queueTargets[queueARN] - if ok { - continue - } - // Using accountID we can now initialize a new NATS logrus instance. - natsLog, err := newNATSNotify(accountID) - if err != nil { - // Encapsulate network error to be more informative. + + if queueARN, err := addQueueTarget(queueTargets, accountID, queueTypeNATS, newNATSNotify); err != nil { if _, ok := err.(net.Error); ok { - return nil, &net.OpError{ + err = &net.OpError{ Op: "Connecting to " + queueARN, Net: "tcp", Err: err, } } + return nil, err } - queueTargets[queueARN] = natsLog } // Load redis targets, initialize their respective loggers. @@ -603,27 +608,18 @@ func loadAllQueueTargets() (map[string]*logrus.Logger, error) { if !redisN.Enable { continue } - // Construct the queue ARN for Redis. - queueARN := minioSqs + serverConfig.GetRegion() + ":" + accountID + ":" + queueTypeRedis - // Queue target if already initialized we move to the next ARN. - _, ok := queueTargets[queueARN] - if ok { - continue - } - // Using accountID we can now initialize a new Redis logrus instance. - redisLog, err := newRedisNotify(accountID) - if err != nil { - // Encapsulate network error to be more informative. + + if queueARN, err := addQueueTarget(queueTargets, accountID, queueTypeRedis, newRedisNotify); err != nil { if _, ok := err.(net.Error); ok { - return nil, &net.OpError{ + err = &net.OpError{ Op: "Connecting to " + queueARN, Net: "tcp", Err: err, } } + return nil, err } - queueTargets[queueARN] = redisLog } // Load Webhook targets, initialize their respective loggers. @@ -631,20 +627,10 @@ func loadAllQueueTargets() (map[string]*logrus.Logger, error) { if !webhookN.Enable { continue } - // Construct the queue ARN for Webhook. - queueARN := minioSqs + serverConfig.GetRegion() + ":" + accountID + ":" + queueTypeWebhook - _, ok := queueTargets[queueARN] - if ok { - continue - } - - // Using accountID we can now initialize a new Webhook logrus instance. - webhookLog, err := newWebhookNotify(accountID) - if err != nil { + if _, err := addQueueTarget(queueTargets, accountID, queueTypeWebhook, newWebhookNotify); err != nil { return nil, err } - queueTargets[queueARN] = webhookLog } // Load elastic targets, initialize their respective loggers. @@ -652,25 +638,18 @@ func loadAllQueueTargets() (map[string]*logrus.Logger, error) { if !elasticN.Enable { continue } - // Construct the queue ARN for Elastic. - queueARN := minioSqs + serverConfig.GetRegion() + ":" + accountID + ":" + queueTypeElastic - _, ok := queueTargets[queueARN] - if ok { - continue - } - // Using accountID we can now initialize a new ElasticSearch logrus instance. - elasticLog, err := newElasticNotify(accountID) - if err != nil { - // Encapsulate network error to be more informative. + + if queueARN, err := addQueueTarget(queueTargets, accountID, queueTypeElastic, newElasticNotify); err != nil { if _, ok := err.(net.Error); ok { - return nil, &net.OpError{ - Op: "Connecting to " + queueARN, Net: "tcp", + err = &net.OpError{ + Op: "Connecting to " + queueARN, + Net: "tcp", Err: err, } } + return nil, err } - queueTargets[queueARN] = elasticLog } // Load PostgreSQL targets, initialize their respective loggers. @@ -678,50 +657,37 @@ func loadAllQueueTargets() (map[string]*logrus.Logger, error) { if !pgN.Enable { continue } - // Construct the queue ARN for Postgres. - queueARN := minioSqs + serverConfig.GetRegion() + ":" + accountID + ":" + queueTypePostgreSQL - _, ok := queueTargets[queueARN] - if ok { - continue - } - // Using accountID initialize a new Postgresql logrus instance. - pgLog, err := newPostgreSQLNotify(accountID) - if err != nil { - // Encapsulate network error to be more informative. + + if queueARN, err := addQueueTarget(queueTargets, accountID, queueTypePostgreSQL, newPostgreSQLNotify); err != nil { if _, ok := err.(net.Error); ok { - return nil, &net.OpError{ - Op: "Connecting to " + queueARN, Net: "tcp", + err = &net.OpError{ + Op: "Connecting to " + queueARN, + Net: "tcp", Err: err, } } + return nil, err } - queueTargets[queueARN] = pgLog } + // Load Kafka targets, initialize their respective loggers. for accountID, kafkaN := range serverConfig.Notify.GetKafka() { if !kafkaN.Enable { continue } - // Construct the queue ARN for Kafka. - queueARN := minioSqs + serverConfig.GetRegion() + ":" + accountID + ":" + queueTypeKafka - _, ok := queueTargets[queueARN] - if ok { - continue - } - // Using accountID initialize a new Kafka logrus instance. - kafkaLog, err := newKafkaNotify(accountID) - if err != nil { - // Encapsulate network error to be more informative. + + if queueARN, err := addQueueTarget(queueTargets, accountID, queueTypeKafka, newKafkaNotify); err != nil { if _, ok := err.(net.Error); ok { - return nil, &net.OpError{ - Op: "Connecting to " + queueARN, Net: "tcp", + err = &net.OpError{ + Op: "Connecting to " + queueARN, + Net: "tcp", Err: err, } } + return nil, err } - queueTargets[queueARN] = kafkaLog } // Successfully initialized queue targets.