diff --git a/cmd/admin-handlers_test.go b/cmd/admin-handlers_test.go index a594769df..8c9adf28f 100644 --- a/cmd/admin-handlers_test.go +++ b/cmd/admin-handlers_test.go @@ -194,7 +194,9 @@ var ( "webhook": { "1": { "enable": false, - "endpoint": "" + "endpoint": "", + "queueDir": "", + "queueLimit": 0 } } }, diff --git a/cmd/config-current.go b/cmd/config-current.go index d996dd577..aedc960f9 100644 --- a/cmd/config-current.go +++ b/cmd/config-current.go @@ -767,7 +767,7 @@ func getNotificationTargets(config *serverConfig) *event.TargetList { for id, args := range config.Notify.Webhook { if args.Enable { args.RootCAs = globalRootCAs - newTarget := target.NewWebhookTarget(id, args) + newTarget := target.NewWebhookTarget(id, args, GlobalServiceDoneCh) if err := targetList.Add(newTarget); err != nil { logger.LogIf(context.Background(), err) continue diff --git a/cmd/config-current_test.go b/cmd/config-current_test.go index 52cab9065..ce504ce1f 100644 --- a/cmd/config-current_test.go +++ b/cmd/config-current_test.go @@ -203,7 +203,7 @@ func TestValidateConfig(t *testing.T) { {`{"version": "` + v + `", "credential": { "accessKey": "minio", "secretKey": "minio123" }, "region": "us-east-1", "browser": "on", "notify": { "kafka": { "1": { "enable": true, "brokers": null, "topic": "", "queueDir": "", "queueLimit": 0 } }}}`, false}, // Test 17 - Test Webhook - {`{"version": "` + v + `", "credential": { "accessKey": "minio", "secretKey": "minio123" }, "region": "us-east-1", "browser": "on", "notify": { "webhook": { "1": { "enable": true, "endpoint": "" } }}}`, false}, + {`{"version": "` + v + `", "credential": { "accessKey": "minio", "secretKey": "minio123" }, "region": "us-east-1", "browser": "on", "notify": { "webhook": { "1": { "enable": true, "endpoint": "", "queueDir": "", "queueLimit": 0} }}}`, false}, // Test 18 - Test MySQL {`{"version": "` + v + `", "credential": { "accessKey": "minio", "secretKey": "minio123" }, "region": "us-east-1", "browser": "on", "notify": { "mysql": { "1": { "enable": true, "dsnString": "", "table": "", "host": "", "port": "", "user": "", "password": "", "database": "" }}}}`, false}, diff --git a/cmd/config-versions.go b/cmd/config-versions.go index a92d27a4c..0cb1256a7 100644 --- a/cmd/config-versions.go +++ b/cmd/config-versions.go @@ -884,7 +884,7 @@ type serverConfigV32 struct { } `json:"policy"` } -// serverConfigV33 is just like version '32', removes clientID from NATS and MQTT, and adds queueDir, queueLimit in MQTT and kafka. +// serverConfigV33 is just like version '32', removes clientID from NATS and MQTT, and adds queueDir, queueLimit in notification targets. type serverConfigV33 struct { quick.Config `json:"-"` // ignore interfaces diff --git a/docs/bucket/notifications/README.md b/docs/bucket/notifications/README.md index 322d47f83..1d44efab3 100644 --- a/docs/bucket/notifications/README.md +++ b/docs/bucket/notifications/README.md @@ -1007,10 +1007,14 @@ The MinIO server configuration file is stored on the backend in json format. Upd "webhook": { "1": { "enable": true, - "endpoint": "http://localhost:3000/" + "endpoint": "http://localhost:3000/", + "queueDir": "", + "queueLimit": 0 } ``` +MinIO supports persistent event store. The persistent store will backup events when the webhook goes offline and replays it when the broker comes back online. The event store can be configured by setting the directory path in `queueDir` field and the maximum limit of events in the queueDir in `queueLimit` field. For eg, the `queueDir` can be `/home/events` and `queueLimit` can be `1000`. By default, the `queueLimit` is set to 10000. + To update the configuration, use `mc admin config get` command to get the current configuration file for the minio deployment in json format, and save it locally. ```sh diff --git a/docs/config/config.sample.json b/docs/config/config.sample.json index 2c79b2e84..7176c8396 100644 --- a/docs/config/config.sample.json +++ b/docs/config/config.sample.json @@ -159,7 +159,9 @@ "webhook": { "1": { "enable": false, - "endpoint": "" + "endpoint": "", + "queueDir": "", + "queueLimit": 0 } } }, diff --git a/pkg/event/target/store.go b/pkg/event/target/store.go index 16b125c9b..1b94ca7e1 100644 --- a/pkg/event/target/store.go +++ b/pkg/event/target/store.go @@ -19,7 +19,10 @@ package target import ( "errors" "fmt" + "net" + "os" "strings" + "syscall" "time" "github.com/minio/minio/pkg/event" @@ -76,6 +79,18 @@ func replayEvents(store Store, doneCh <-chan struct{}) <-chan string { return eventKeyCh } +// isConnResetErr - Checks for connection reset errors. +func isConnResetErr(err error) bool { + if opErr, ok := err.(*net.OpError); ok { + if syscallErr, ok := opErr.Err.(*os.SyscallError); ok { + if syscallErr.Err == syscall.ECONNRESET { + return true + } + } + } + return false +} + // sendEvents - Reads events from the store and re-plays. func sendEvents(target event.Target, eventKeyCh <-chan string, doneCh <-chan struct{}) { retryTimer := time.NewTimer(retryInterval) @@ -88,7 +103,7 @@ func sendEvents(target event.Target, eventKeyCh <-chan string, doneCh <-chan str break } - if err != errNotConnected { + if err != errNotConnected && !isConnResetErr(err) { panic(fmt.Errorf("target.Send() failed with '%v'", err)) } diff --git a/pkg/event/target/webhook.go b/pkg/event/target/webhook.go index 78fe19575..fbc46fe74 100644 --- a/pkg/event/target/webhook.go +++ b/pkg/event/target/webhook.go @@ -28,6 +28,9 @@ import ( "net" "net/http" "net/url" + "os" + "path/filepath" + "syscall" "time" "github.com/minio/minio/pkg/event" @@ -36,9 +39,11 @@ import ( // WebhookArgs - Webhook target arguments. type WebhookArgs struct { - Enable bool `json:"enable"` - Endpoint xnet.URL `json:"endpoint"` - RootCAs *x509.CertPool `json:"-"` + Enable bool `json:"enable"` + Endpoint xnet.URL `json:"endpoint"` + RootCAs *x509.CertPool `json:"-"` + QueueDir string `json:"queueDir"` + QueueLimit uint16 `json:"queueLimit"` } // Validate WebhookArgs fields @@ -49,6 +54,14 @@ func (w WebhookArgs) Validate() error { if w.Endpoint.IsEmpty() { return errors.New("endpoint empty") } + if w.QueueDir != "" { + if !filepath.IsAbs(w.QueueDir) { + return errors.New("queueDir path should be absolute") + } + } + if w.QueueLimit > maxLimit { + return errors.New("queueLimit should not exceed 10000") + } return nil } @@ -57,6 +70,7 @@ type WebhookTarget struct { id event.TargetID args WebhookArgs httpClient *http.Client + store Store } // ID - returns target ID. @@ -64,11 +78,27 @@ func (target WebhookTarget) ID() event.TargetID { return target.id } -// Save - Sends event directly without persisting. +// Save - saves the events to the store if queuestore is configured, which will be replayed when the wenhook connection is active. func (target *WebhookTarget) Save(eventData event.Event) error { + if target.store != nil { + return target.store.Put(eventData) + } + urlStr, pErr := xnet.ParseURL(target.args.Endpoint.String()) + if pErr != nil { + return pErr + } + _, dErr := net.Dial("tcp", urlStr.Host) + if dErr != nil { + // To treat "connection refused" errors as errNotConnected. + if IsConnRefusedErr(dErr) { + return errNotConnected + } + return dErr + } return target.send(eventData) } +// send - sends an event to the webhook. func (target *WebhookTarget) send(eventData event.Event) error { objectName, err := url.QueryUnescape(eventData.S3.Object.Key) if err != nil { @@ -104,9 +134,52 @@ func (target *WebhookTarget) send(eventData event.Event) error { return nil } -// Send - interface compatible method does no-op. +// IsConnRefusedErr - To check for "connection refused" errors. +func IsConnRefusedErr(err error) bool { + if opErr, ok := err.(*net.OpError); ok { + if sysErr, ok := opErr.Err.(*os.SyscallError); ok { + if errno, ok := sysErr.Err.(syscall.Errno); ok { + if errno == syscall.ECONNREFUSED { + return true + } + } + } + } + return false +} + +// Send - reads an event from store and sends it to webhook. func (target *WebhookTarget) Send(eventKey string) error { - return nil + + urlStr, pErr := xnet.ParseURL(target.args.Endpoint.String()) + if pErr != nil { + return pErr + } + _, dErr := net.Dial("tcp", urlStr.Host) + if dErr != nil { + // To treat "connection refused" errors as errNotConnected. + if IsConnRefusedErr(dErr) { + return errNotConnected + } + return dErr + } + + eventData, eErr := target.store.Get(eventKey) + if eErr != nil { + // The last event key in a successful batch will be sent in the channel atmost once by the replayEvents() + // Such events will not exist and would've been already been sent successfully. + if os.IsNotExist(eErr) { + return nil + } + return eErr + } + + if err := target.send(eventData); err != nil { + return err + } + + // Delete the event from store. + return target.store.Del(eventKey) } // Close - does nothing and available for interface compatibility. @@ -115,8 +188,19 @@ func (target *WebhookTarget) Close() error { } // NewWebhookTarget - creates new Webhook target. -func NewWebhookTarget(id string, args WebhookArgs) *WebhookTarget { - return &WebhookTarget{ +func NewWebhookTarget(id string, args WebhookArgs, doneCh <-chan struct{}) *WebhookTarget { + + var store Store + + if args.QueueDir != "" { + queueDir := filepath.Join(args.QueueDir, storePrefix+"-webhook-"+id) + store = NewQueueStore(queueDir, args.QueueLimit) + if oErr := store.Open(); oErr != nil { + return nil + } + } + + target := &WebhookTarget{ id: event.TargetID{ID: id, Name: "webhook"}, args: args, httpClient: &http.Client{ @@ -131,5 +215,15 @@ func NewWebhookTarget(id string, args WebhookArgs) *WebhookTarget { ExpectContinueTimeout: 2 * time.Second, }, }, + store: store, } + + if target.store != nil { + // Replays the events from the store. + eventKeyCh := replayEvents(target.store, doneCh) + // Start replaying events from the store. + go sendEvents(target, eventKeyCh, doneCh) + } + + return target }