Enable event persistence in webhook (#7614)

This commit is contained in:
Praveen raj Mani 2019-07-05 15:21:41 +05:30 committed by Nitish Tiwari
parent 0ebbd3caef
commit bb871a7c31
8 changed files with 132 additions and 15 deletions

View file

@ -194,7 +194,9 @@ var (
"webhook": {
"1": {
"enable": false,
"endpoint": ""
"endpoint": "",
"queueDir": "",
"queueLimit": 0
}
}
},

View file

@ -767,7 +767,7 @@ func getNotificationTargets(config *serverConfig) *event.TargetList {
for id, args := range config.Notify.Webhook {
if args.Enable {
args.RootCAs = globalRootCAs
newTarget := target.NewWebhookTarget(id, args)
newTarget := target.NewWebhookTarget(id, args, GlobalServiceDoneCh)
if err := targetList.Add(newTarget); err != nil {
logger.LogIf(context.Background(), err)
continue

View file

@ -203,7 +203,7 @@ func TestValidateConfig(t *testing.T) {
{`{"version": "` + v + `", "credential": { "accessKey": "minio", "secretKey": "minio123" }, "region": "us-east-1", "browser": "on", "notify": { "kafka": { "1": { "enable": true, "brokers": null, "topic": "", "queueDir": "", "queueLimit": 0 } }}}`, false},
// Test 17 - Test Webhook
{`{"version": "` + v + `", "credential": { "accessKey": "minio", "secretKey": "minio123" }, "region": "us-east-1", "browser": "on", "notify": { "webhook": { "1": { "enable": true, "endpoint": "" } }}}`, false},
{`{"version": "` + v + `", "credential": { "accessKey": "minio", "secretKey": "minio123" }, "region": "us-east-1", "browser": "on", "notify": { "webhook": { "1": { "enable": true, "endpoint": "", "queueDir": "", "queueLimit": 0} }}}`, false},
// Test 18 - Test MySQL
{`{"version": "` + v + `", "credential": { "accessKey": "minio", "secretKey": "minio123" }, "region": "us-east-1", "browser": "on", "notify": { "mysql": { "1": { "enable": true, "dsnString": "", "table": "", "host": "", "port": "", "user": "", "password": "", "database": "" }}}}`, false},

View file

@ -884,7 +884,7 @@ type serverConfigV32 struct {
} `json:"policy"`
}
// serverConfigV33 is just like version '32', removes clientID from NATS and MQTT, and adds queueDir, queueLimit in MQTT and kafka.
// serverConfigV33 is just like version '32', removes clientID from NATS and MQTT, and adds queueDir, queueLimit in notification targets.
type serverConfigV33 struct {
quick.Config `json:"-"` // ignore interfaces

View file

@ -1007,10 +1007,14 @@ The MinIO server configuration file is stored on the backend in json format. Upd
"webhook": {
"1": {
"enable": true,
"endpoint": "http://localhost:3000/"
"endpoint": "http://localhost:3000/",
"queueDir": "",
"queueLimit": 0
}
```
MinIO supports persistent event store. The persistent store will backup events when the webhook goes offline and replays it when the broker comes back online. The event store can be configured by setting the directory path in `queueDir` field and the maximum limit of events in the queueDir in `queueLimit` field. For eg, the `queueDir` can be `/home/events` and `queueLimit` can be `1000`. By default, the `queueLimit` is set to 10000.
To update the configuration, use `mc admin config get` command to get the current configuration file for the minio deployment in json format, and save it locally.
```sh

View file

@ -159,7 +159,9 @@
"webhook": {
"1": {
"enable": false,
"endpoint": ""
"endpoint": "",
"queueDir": "",
"queueLimit": 0
}
}
},

View file

@ -19,7 +19,10 @@ package target
import (
"errors"
"fmt"
"net"
"os"
"strings"
"syscall"
"time"
"github.com/minio/minio/pkg/event"
@ -76,6 +79,18 @@ func replayEvents(store Store, doneCh <-chan struct{}) <-chan string {
return eventKeyCh
}
// isConnResetErr - Checks for connection reset errors.
func isConnResetErr(err error) bool {
if opErr, ok := err.(*net.OpError); ok {
if syscallErr, ok := opErr.Err.(*os.SyscallError); ok {
if syscallErr.Err == syscall.ECONNRESET {
return true
}
}
}
return false
}
// sendEvents - Reads events from the store and re-plays.
func sendEvents(target event.Target, eventKeyCh <-chan string, doneCh <-chan struct{}) {
retryTimer := time.NewTimer(retryInterval)
@ -88,7 +103,7 @@ func sendEvents(target event.Target, eventKeyCh <-chan string, doneCh <-chan str
break
}
if err != errNotConnected {
if err != errNotConnected && !isConnResetErr(err) {
panic(fmt.Errorf("target.Send() failed with '%v'", err))
}

View file

@ -28,6 +28,9 @@ import (
"net"
"net/http"
"net/url"
"os"
"path/filepath"
"syscall"
"time"
"github.com/minio/minio/pkg/event"
@ -36,9 +39,11 @@ import (
// WebhookArgs - Webhook target arguments.
type WebhookArgs struct {
Enable bool `json:"enable"`
Endpoint xnet.URL `json:"endpoint"`
RootCAs *x509.CertPool `json:"-"`
Enable bool `json:"enable"`
Endpoint xnet.URL `json:"endpoint"`
RootCAs *x509.CertPool `json:"-"`
QueueDir string `json:"queueDir"`
QueueLimit uint16 `json:"queueLimit"`
}
// Validate WebhookArgs fields
@ -49,6 +54,14 @@ func (w WebhookArgs) Validate() error {
if w.Endpoint.IsEmpty() {
return errors.New("endpoint empty")
}
if w.QueueDir != "" {
if !filepath.IsAbs(w.QueueDir) {
return errors.New("queueDir path should be absolute")
}
}
if w.QueueLimit > maxLimit {
return errors.New("queueLimit should not exceed 10000")
}
return nil
}
@ -57,6 +70,7 @@ type WebhookTarget struct {
id event.TargetID
args WebhookArgs
httpClient *http.Client
store Store
}
// ID - returns target ID.
@ -64,11 +78,27 @@ func (target WebhookTarget) ID() event.TargetID {
return target.id
}
// Save - Sends event directly without persisting.
// Save - saves the events to the store if queuestore is configured, which will be replayed when the wenhook connection is active.
func (target *WebhookTarget) Save(eventData event.Event) error {
if target.store != nil {
return target.store.Put(eventData)
}
urlStr, pErr := xnet.ParseURL(target.args.Endpoint.String())
if pErr != nil {
return pErr
}
_, dErr := net.Dial("tcp", urlStr.Host)
if dErr != nil {
// To treat "connection refused" errors as errNotConnected.
if IsConnRefusedErr(dErr) {
return errNotConnected
}
return dErr
}
return target.send(eventData)
}
// send - sends an event to the webhook.
func (target *WebhookTarget) send(eventData event.Event) error {
objectName, err := url.QueryUnescape(eventData.S3.Object.Key)
if err != nil {
@ -104,9 +134,52 @@ func (target *WebhookTarget) send(eventData event.Event) error {
return nil
}
// Send - interface compatible method does no-op.
// IsConnRefusedErr - To check for "connection refused" errors.
func IsConnRefusedErr(err error) bool {
if opErr, ok := err.(*net.OpError); ok {
if sysErr, ok := opErr.Err.(*os.SyscallError); ok {
if errno, ok := sysErr.Err.(syscall.Errno); ok {
if errno == syscall.ECONNREFUSED {
return true
}
}
}
}
return false
}
// Send - reads an event from store and sends it to webhook.
func (target *WebhookTarget) Send(eventKey string) error {
return nil
urlStr, pErr := xnet.ParseURL(target.args.Endpoint.String())
if pErr != nil {
return pErr
}
_, dErr := net.Dial("tcp", urlStr.Host)
if dErr != nil {
// To treat "connection refused" errors as errNotConnected.
if IsConnRefusedErr(dErr) {
return errNotConnected
}
return dErr
}
eventData, eErr := target.store.Get(eventKey)
if eErr != nil {
// The last event key in a successful batch will be sent in the channel atmost once by the replayEvents()
// Such events will not exist and would've been already been sent successfully.
if os.IsNotExist(eErr) {
return nil
}
return eErr
}
if err := target.send(eventData); err != nil {
return err
}
// Delete the event from store.
return target.store.Del(eventKey)
}
// Close - does nothing and available for interface compatibility.
@ -115,8 +188,19 @@ func (target *WebhookTarget) Close() error {
}
// NewWebhookTarget - creates new Webhook target.
func NewWebhookTarget(id string, args WebhookArgs) *WebhookTarget {
return &WebhookTarget{
func NewWebhookTarget(id string, args WebhookArgs, doneCh <-chan struct{}) *WebhookTarget {
var store Store
if args.QueueDir != "" {
queueDir := filepath.Join(args.QueueDir, storePrefix+"-webhook-"+id)
store = NewQueueStore(queueDir, args.QueueLimit)
if oErr := store.Open(); oErr != nil {
return nil
}
}
target := &WebhookTarget{
id: event.TargetID{ID: id, Name: "webhook"},
args: args,
httpClient: &http.Client{
@ -131,5 +215,15 @@ func NewWebhookTarget(id string, args WebhookArgs) *WebhookTarget {
ExpectContinueTimeout: 2 * time.Second,
},
},
store: store,
}
if target.store != nil {
// Replays the events from the store.
eventKeyCh := replayEvents(target.store, doneCh)
// Start replaying events from the store.
go sendEvents(target, eventKeyCh, doneCh)
}
return target
}