diff --git a/cmd/admin-handlers_test.go b/cmd/admin-handlers_test.go index 72b8ece8b..6f00e900c 100644 --- a/cmd/admin-handlers_test.go +++ b/cmd/admin-handlers_test.go @@ -99,6 +99,8 @@ var ( "enable": false, "brokers": null, "topic": "", + "queueDir": "", + "queueLimit": 0, "tls": { "enable": false, "skipVerify": false, diff --git a/cmd/config-current.go b/cmd/config-current.go index 3a6886d67..d996dd577 100644 --- a/cmd/config-current.go +++ b/cmd/config-current.go @@ -325,7 +325,7 @@ func (s *serverConfig) TestNotificationTargets() error { if !v.Enable { continue } - t, err := target.NewKafkaTarget(k, v) + t, err := target.NewKafkaTarget(k, v, GlobalServiceDoneCh) if err != nil { return fmt.Errorf("kafka(%s): %s", k, err.Error()) } @@ -667,7 +667,7 @@ func getNotificationTargets(config *serverConfig) *event.TargetList { for id, args := range config.Notify.Kafka { if args.Enable { - newTarget, err := target.NewKafkaTarget(id, args) + newTarget, err := target.NewKafkaTarget(id, args, GlobalServiceDoneCh) if err != nil { logger.LogIf(context.Background(), err) continue diff --git a/cmd/config-current_test.go b/cmd/config-current_test.go index 3f5b9eabc..52cab9065 100644 --- a/cmd/config-current_test.go +++ b/cmd/config-current_test.go @@ -200,7 +200,7 @@ func TestValidateConfig(t *testing.T) { {`{"version": "` + v + `", "credential": { "accessKey": "minio", "secretKey": "minio123" }, "region": "us-east-1", "browser": "on", "notify": { "postgresql": { "1": { "enable": true, "connectionString": "", "table": "", "host": "", "port": "", "user": "", "password": "", "database": "" }}}}`, false}, // Test 16 - Test Kafka - {`{"version": "` + v + `", "credential": { "accessKey": "minio", "secretKey": "minio123" }, "region": "us-east-1", "browser": "on", "notify": { "kafka": { "1": { "enable": true, "brokers": null, "topic": "" } }}}`, false}, + {`{"version": "` + v + `", "credential": { "accessKey": "minio", "secretKey": "minio123" }, "region": "us-east-1", "browser": "on", "notify": { "kafka": { "1": { "enable": true, "brokers": null, "topic": "", "queueDir": "", "queueLimit": 0 } }}}`, false}, // Test 17 - Test Webhook {`{"version": "` + v + `", "credential": { "accessKey": "minio", "secretKey": "minio123" }, "region": "us-east-1", "browser": "on", "notify": { "webhook": { "1": { "enable": true, "endpoint": "" } }}}`, false}, diff --git a/cmd/config-versions.go b/cmd/config-versions.go index e4aae1bf1..a92d27a4c 100644 --- a/cmd/config-versions.go +++ b/cmd/config-versions.go @@ -884,7 +884,7 @@ type serverConfigV32 struct { } `json:"policy"` } -// serverConfigV33 is just like version '32', removes clientID from NATS and MQTT, and adds queueDir, queueLimit with MQTT. +// serverConfigV33 is just like version '32', removes clientID from NATS and MQTT, and adds queueDir, queueLimit in MQTT and kafka. type serverConfigV33 struct { quick.Config `json:"-"` // ignore interfaces diff --git a/docs/bucket/notifications/README.md b/docs/bucket/notifications/README.md index 92e02e14a..134ac8f41 100644 --- a/docs/bucket/notifications/README.md +++ b/docs/bucket/notifications/README.md @@ -931,10 +931,23 @@ The MinIO server configuration file is stored on the backend in json format. Upd "1": { "enable": true, "brokers": ["localhost:9092"], - "topic": "bucketevents" + "topic": "bucketevents", + "queueDir": "", + "queueLimit": 0, + "tls": { + "enable": false, + "skipVerify": false, + "clientAuth": 0 + }, + "sasl": { + "enable": false, + "username": "", + "password": "" + } } } ``` +MinIO supports persistent event store. The persistent store will backup events when the kafka broker goes offline and replays it when the broker comes back online. The event store can be configured by setting the directory path in `queueDir` field and the maximum limit of events in the queueDir in `queueLimit` field. For eg, the `queueDir` can be `/home/events` and `queueLimit` can be `1000`. By default, the `queueLimit` is set to 10000. To update the configuration, use `mc admin config get` command to get the current configuration file for the minio deployment in json format, and save it locally. diff --git a/docs/config/config.sample.json b/docs/config/config.sample.json index f72b6e1b0..2c79b2e84 100644 --- a/docs/config/config.sample.json +++ b/docs/config/config.sample.json @@ -64,6 +64,8 @@ "enable": false, "brokers": null, "topic": "", + "queueDir": "", + "queueLimit": 0, "tls": { "enable": false, "skipVerify": false, @@ -73,7 +75,7 @@ "enable": false, "username": "", "password": "" - } + } } }, "mqtt": { diff --git a/pkg/event/target/kafka.go b/pkg/event/target/kafka.go index fe1a01b68..07fa47fb4 100644 --- a/pkg/event/target/kafka.go +++ b/pkg/event/target/kafka.go @@ -20,7 +20,10 @@ import ( "crypto/tls" "encoding/json" "errors" + "net" "net/url" + "os" + "path/filepath" "github.com/minio/minio/pkg/event" xnet "github.com/minio/minio/pkg/net" @@ -30,10 +33,12 @@ import ( // KafkaArgs - Kafka target arguments. type KafkaArgs struct { - Enable bool `json:"enable"` - Brokers []xnet.Host `json:"brokers"` - Topic string `json:"topic"` - TLS struct { + Enable bool `json:"enable"` + Brokers []xnet.Host `json:"brokers"` + Topic string `json:"topic"` + QueueDir string `json:"queueDir"` + QueueLimit uint16 `json:"queueLimit"` + TLS struct { Enable bool `json:"enable"` SkipVerify bool `json:"skipVerify"` ClientAuth tls.ClientAuthType `json:"clientAuth"` @@ -58,6 +63,14 @@ func (k KafkaArgs) Validate() error { return err } } + if k.QueueDir != "" { + if !filepath.IsAbs(k.QueueDir) { + return errors.New("queueDir path should be absolute") + } + } + if k.QueueLimit > 10000 { + return errors.New("queueLimit should not exceed 10000") + } return nil } @@ -66,6 +79,8 @@ type KafkaTarget struct { id event.TargetID args KafkaArgs producer sarama.SyncProducer + config *sarama.Config + store Store } // ID - returns target ID. @@ -73,11 +88,18 @@ func (target *KafkaTarget) ID() event.TargetID { return target.id } -// Save - Sends event directly without persisting. +// Save - saves the events to the store which will be replayed when the Kafka connection is active. func (target *KafkaTarget) Save(eventData event.Event) error { + if target.store != nil { + return target.store.Put(eventData) + } + if !target.args.pingBrokers() { + return errNotConnected + } return target.send(eventData) } +// send - sends an event to the kafka. func (target *KafkaTarget) send(eventData event.Event) error { objectName, err := url.QueryUnescape(eventData.S3.Object.Key) if err != nil { @@ -95,23 +117,79 @@ func (target *KafkaTarget) send(eventData event.Event) error { Key: sarama.StringEncoder(key), Value: sarama.ByteEncoder(data), } + _, _, err = target.producer.SendMessage(&msg) return err } -// Send - interface compatible method does no-op. +// Send - reads an event from store and sends it to Kafka. func (target *KafkaTarget) Send(eventKey string) error { - return nil + var err error + + if !target.args.pingBrokers() { + return errNotConnected + } + + if target.producer == nil { + brokers := []string{} + for _, broker := range target.args.Brokers { + brokers = append(brokers, broker.String()) + } + target.producer, err = sarama.NewSyncProducer(brokers, target.config) + if err != nil { + if err != sarama.ErrOutOfBrokers { + return err + } + return errNotConnected + } + } + + eventData, eErr := target.store.Get(eventKey) + if eErr != nil { + // The last event key in a successful batch will be sent in the channel atmost once by the replayEvents() + // Such events will not exist and wouldve been already been sent successfully. + if os.IsNotExist(eErr) { + return nil + } + return eErr + } + + err = target.send(eventData) + if err != nil { + // Sarama opens the ciruit breaker after 3 consecutive connection failures. + if err == sarama.ErrLeaderNotAvailable || err.Error() == "circuit breaker is open" { + return errNotConnected + } + return err + } + + // Delete the event from store. + return target.store.Del(eventKey) } // Close - closes underneath kafka connection. func (target *KafkaTarget) Close() error { - return target.producer.Close() + if target.producer != nil { + return target.producer.Close() + } + return nil +} + +// Check if atleast one broker in cluster is active +func (k KafkaArgs) pingBrokers() bool { + + for _, broker := range k.Brokers { + _, dErr := net.Dial("tcp", broker.String()) + if dErr == nil { + return true + } + } + return false } // NewKafkaTarget - creates new Kafka target with auth credentials. -func NewKafkaTarget(id string, args KafkaArgs) (*KafkaTarget, error) { +func NewKafkaTarget(id string, args KafkaArgs, doneCh <-chan struct{}) (*KafkaTarget, error) { config := sarama.NewConfig() config.Net.SASL.User = args.SASL.User @@ -132,14 +210,38 @@ func NewKafkaTarget(id string, args KafkaArgs) (*KafkaTarget, error) { for _, broker := range args.Brokers { brokers = append(brokers, broker.String()) } - producer, err := sarama.NewSyncProducer(brokers, config) - if err != nil { - return nil, err + + var store Store + + if args.QueueDir != "" { + queueDir := filepath.Join(args.QueueDir, storePrefix+"-kafka-"+id) + store = NewQueueStore(queueDir, args.QueueLimit) + if oErr := store.Open(); oErr != nil { + return nil, oErr + } } - return &KafkaTarget{ + producer, err := sarama.NewSyncProducer(brokers, config) + if err != nil { + if store == nil || err != sarama.ErrOutOfBrokers { + return nil, err + } + } + + target := &KafkaTarget{ id: event.TargetID{ID: id, Name: "kafka"}, args: args, producer: producer, - }, nil + config: config, + store: store, + } + + if target.store != nil { + // Replays the events from the store. + eventKeyCh := replayEvents(target.store, doneCh) + // Start replaying events from the store. + go sendEvents(target, eventKeyCh, doneCh) + } + + return target, nil } diff --git a/pkg/event/target/mqtt.go b/pkg/event/target/mqtt.go index 6de5d8fe1..fa936efa9 100644 --- a/pkg/event/target/mqtt.go +++ b/pkg/event/target/mqtt.go @@ -111,7 +111,6 @@ func (target *MQTTTarget) send(eventData event.Event) error { if token.Error() != nil { return token.Error() } - return nil }