/* * Minio Cloud Storage, (C) 2016 Minio, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package cmd import ( "crypto/tls" "io/ioutil" "time" "github.com/Sirupsen/logrus" MQTT "github.com/eclipse/paho.mqtt.golang" ) type mqttNotify struct { Enable bool `json:"enable"` Broker string `json:"broker"` Topic string `json:"topic"` QoS int `json:"qos"` ClientID string `json:"clientId"` User string `json:"username"` Password string `json:"password"` } func (m *mqttNotify) Validate() error { if !m.Enable { return nil } if _, err := checkURL(m.Broker); err != nil { return err } return nil } type mqttConn struct { params mqttNotify Client MQTT.Client } func dialMQTT(mqttL mqttNotify) (mc mqttConn, e error) { if !mqttL.Enable { return mc, errNotifyNotEnabled } connOpts := &MQTT.ClientOptions{ ClientID: mqttL.ClientID, CleanSession: true, Username: mqttL.User, Password: mqttL.Password, MaxReconnectInterval: 1 * time.Second, KeepAlive: 30 * time.Second, TLSConfig: tls.Config{RootCAs: globalRootCAs}, } connOpts.AddBroker(mqttL.Broker) client := MQTT.NewClient(connOpts) if token := client.Connect(); token.Wait() && token.Error() != nil { return mc, token.Error() } return mqttConn{Client: client, params: mqttL}, nil } func newMQTTNotify(accountID string) (*logrus.Logger, error) { mqttL := serverConfig.Notify.GetMQTTByID(accountID) //connect to MQTT Server mqttC, err := dialMQTT(mqttL) if err != nil { return nil, err } mqttLog := logrus.New() // Disable writing to console. mqttLog.Out = ioutil.Discard // Add a mqtt hook. mqttLog.Hooks.Add(mqttC) // Set default JSON formatter mqttLog.Formatter = new(logrus.JSONFormatter) // successfully enabled all MQTTs return mqttLog, nil } // Fire if called when an event should be sent to the message broker. func (q mqttConn) Fire(entry *logrus.Entry) error { body, err := entry.String() if err != nil { return err } if !q.Client.IsConnected() { if token := q.Client.Connect(); token.Wait() && token.Error() != nil { return token.Error() } } token := q.Client.Publish(q.params.Topic, byte(q.params.QoS), false, body) if token.Wait() && token.Error() != nil { return token.Error() } return nil } // Levels is available logging levels. func (q mqttConn) Levels() []logrus.Level { return []logrus.Level{ logrus.InfoLevel, } }