From f7cfa24027258873cefb41a7f7818bc43b86ad75 Mon Sep 17 00:00:00 2001 From: Till Faelligen Date: Fri, 4 Jun 2021 20:23:25 +0200 Subject: [PATCH 1/3] Add missing error check Keep typing events for at least one minute --- setup/kafka/kafka.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/setup/kafka/kafka.go b/setup/kafka/kafka.go index 5f2d40552..c8882e8bf 100644 --- a/setup/kafka/kafka.go +++ b/setup/kafka/kafka.go @@ -91,9 +91,9 @@ func setupNATS(cfg *config.Kafka) (sarama.Consumer, sarama.SyncProducer) { // Typing events can be removed from the stream, as they are only relevant for a short time if topic == config.TopicOutputTypingEvent { - maxLifeTime = time.Second * 30 + maxLifeTime = time.Second * 60 } - _, _ = s.AddStream(&nats.StreamConfig{ + _, err = s.AddStream(&nats.StreamConfig{ Name: sn, Subjects: []string{topic}, MaxBytes: int64(*cfg.MaxMessageBytes), @@ -101,6 +101,9 @@ func setupNATS(cfg *config.Kafka) (sarama.Consumer, sarama.SyncProducer) { MaxAge: maxLifeTime, Duplicates: maxLifeTime / 2, }) + if err != nil { + logrus.WithError(err).WithField("stream", sn).Fatal("unable to add nats stream") + } } } From 0c88ac6158d3c96995292f1e61407a3bbc8ad469 Mon Sep 17 00:00:00 2001 From: Till Faelligen Date: Sun, 4 Jul 2021 22:11:00 +0200 Subject: [PATCH 2/3] Use all configured NATS addresses --- setup/kafka/kafka.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/setup/kafka/kafka.go b/setup/kafka/kafka.go index c8882e8bf..eafdbc0fc 100644 --- a/setup/kafka/kafka.go +++ b/setup/kafka/kafka.go @@ -1,6 +1,7 @@ package kafka import ( + "strings" "time" js "github.com/S7evinK/saramajetstream" @@ -66,7 +67,7 @@ func setupNaffka(cfg *config.Kafka) (sarama.Consumer, sarama.SyncProducer) { func setupNATS(cfg *config.Kafka) (sarama.Consumer, sarama.SyncProducer) { logrus.WithField("servers", cfg.Addresses).Debug("connecting to nats") - nc, err := nats.Connect(cfg.Addresses[0]) + nc, err := nats.Connect(strings.Join(cfg.Addresses, ",")) if err != nil { logrus.WithError(err).Panic("failed to connect to nats") return nil, nil From 341351c75a15a6b0a89194138bfb6436480b79f0 Mon Sep 17 00:00:00 2001 From: Till Faelligen Date: Fri, 9 Jul 2021 08:16:33 +0200 Subject: [PATCH 3/3] Update saramajetstream --- go.mod | 2 +- go.sum | 9 +++++++-- setup/kafka/kafka.go | 4 ++-- 3 files changed, 10 insertions(+), 5 deletions(-) diff --git a/go.mod b/go.mod index c06afb49d..45817e2a9 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/matrix-org/dendrite require ( github.com/DATA-DOG/go-sqlmock v1.5.0 github.com/HdrHistogram/hdrhistogram-go v1.0.1 // indirect - github.com/S7evinK/saramajetstream v0.0.0-20210604172822-4f305c9f1537 + github.com/S7evinK/saramajetstream v0.0.0-20210709060522-786e3e6abe86 github.com/Shopify/sarama v1.29.0 github.com/getsentry/sentry-go v0.10.0 github.com/gologme/log v1.2.0 diff --git a/go.sum b/go.sum index 552440c14..efff6cbc3 100644 --- a/go.sum +++ b/go.sum @@ -28,8 +28,8 @@ github.com/PuerkitoBio/goquery v1.5.1/go.mod h1:GsLWisAFVj4WgDibEWF4pvYnkVQBpKBK github.com/RoaringBitmap/roaring v0.4.7/go.mod h1:8khRDP4HmeXns4xIj9oGrKSz7XTQiJx2zgh7AcNke4w= github.com/RyanCarrier/dijkstra v1.0.0/go.mod h1:5agGUBNEtUAGIANmbw09fuO3a2htPEkc1jNH01qxCWA= github.com/RyanCarrier/dijkstra-1 v0.0.0-20170512020943-0e5801a26345/go.mod h1:OK4EvWJ441LQqGzed5NGB6vKBAE34n3z7iayPcEwr30= -github.com/S7evinK/saramajetstream v0.0.0-20210604172822-4f305c9f1537 h1:j/jlzVQRvUvNANJlz5Puac0Avc6Xe+rhvy2Se/f+Fwo= -github.com/S7evinK/saramajetstream v0.0.0-20210604172822-4f305c9f1537/go.mod h1:ne+jkLlzafIzaE4Q0Ze81T27dNgXe1wxovVEoAtSHTc= +github.com/S7evinK/saramajetstream v0.0.0-20210709060522-786e3e6abe86 h1:ZFbfRbhZDohUiouv361CC0XWTESwUlVicz/zgGSO964= +github.com/S7evinK/saramajetstream v0.0.0-20210709060522-786e3e6abe86/go.mod h1:ne+jkLlzafIzaE4Q0Ze81T27dNgXe1wxovVEoAtSHTc= github.com/Shopify/goreferrer v0.0.0-20181106222321-ec9c9a553398/go.mod h1:a1uqRtAwp2Xwc6WNPJEufxJ7fx3npB4UV/JOLmbu5I0= github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo= github.com/Shopify/sarama v1.26.1/go.mod h1:NbSGBSSndYaIhRcBtY9V0U7AyH+x71bG668AuWys/yU= @@ -249,6 +249,7 @@ github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrU github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= +github.com/golang/protobuf v1.4.3 h1:JjCZWpVbqXDqFVmTfYWEVTMIYrL/NPdPSCHPJ0T/raM= github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw= @@ -269,6 +270,7 @@ github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.4 h1:L8R9j+yAqZuZjsqh/z+F1NCffTKKLShY6zXTItVIZ8M= github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= @@ -847,11 +849,13 @@ github.com/multiformats/go-varint v0.0.6/go.mod h1:3Ls8CIEsrijN6+B7PbrXRPxHRPuXS github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/nats-io/jwt v0.3.0/go.mod h1:fRYCDE99xlTsqUzISS1Bi75UBJ6ljOJQOAAu5VglpSg= +github.com/nats-io/jwt v0.3.2 h1:+RB5hMpXUUA2dfxuhBTEkMOrYmM+gKIZYS1KjSostMI= github.com/nats-io/jwt v0.3.2/go.mod h1:/euKqTS1ZD+zzjYrY7pseZrTtWQSjujC7xjPc8wL6eU= github.com/nats-io/jwt v1.2.2 h1:w3GMTO969dFg+UOKTmmyuu7IGdusK+7Ytlt//OYH/uU= github.com/nats-io/jwt v1.2.2/go.mod h1:/xX356yQA6LuXI9xWW7mZNpxgF2mBmGecH+Fj34sP5Q= github.com/nats-io/jwt/v2 v2.0.2 h1:ejVCLO8gu6/4bOKIHQpmB5UhhUJfAQw55yvLWpfmKjI= github.com/nats-io/jwt/v2 v2.0.2/go.mod h1:VRP+deawSXyhNjXmxPCHskrR6Mq50BqpEI5SEcNiGlY= +github.com/nats-io/nats-server/v2 v2.1.2 h1:i2Ly0B+1+rzNZHHWtD4ZwKi+OU5l+uQo1iDHZ2PmiIc= github.com/nats-io/nats-server/v2 v2.1.2/go.mod h1:Afk+wRZqkMQs/p45uXdrVLuab3gwv3Z8C4HTBu8GD/k= github.com/nats-io/nats-server/v2 v2.2.3 h1:tCuswUogVn/+SV5qfgOVnuHGBvAW7C1hMdsXghi/5ds= github.com/nats-io/nats-server/v2 v2.2.3/go.mod h1:sEnFaxqe09cDmfMgACxZbziXnhQFhwk+aKkZjBBRYrI= @@ -1380,6 +1384,7 @@ google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= +google.golang.org/protobuf v1.23.0 h1:4MY060fB1DLGMB/7MBTLnwQUY6+F09GEiz6SsrNqyzM= google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk= diff --git a/setup/kafka/kafka.go b/setup/kafka/kafka.go index eafdbc0fc..e3ef50c5f 100644 --- a/setup/kafka/kafka.go +++ b/setup/kafka/kafka.go @@ -108,7 +108,7 @@ func setupNATS(cfg *config.Kafka) (sarama.Consumer, sarama.SyncProducer) { } } - consumer := js.NewJetStreamConsumer(s, cfg.TopicPrefix) - producer := js.NewJetStreamProducer(s, cfg.TopicPrefix) + consumer := js.NewJetStreamConsumer(nc, s, cfg.TopicPrefix) + producer := js.NewJetStreamProducer(nc, s, cfg.TopicPrefix) return consumer, producer }