0
0
Fork 0
mirror of https://github.com/matrix-org/dendrite synced 2024-05-20 06:13:48 +02:00

Merge branch 'main' of github.com:matrix-org/dendrite into s7evink/latest-event-updater

This commit is contained in:
Till Faelligen 2023-12-29 19:20:49 +01:00
commit 2c81b060d6
No known key found for this signature in database
GPG key ID: 3DF82D8AB9211D4E
5 changed files with 158 additions and 47 deletions

18
go.mod
View file

@ -26,8 +26,8 @@ require (
github.com/matrix-org/pinecone v0.11.1-0.20230810010612-ea4c33717fd7
github.com/matrix-org/util v0.0.0-20221111132719-399730281e66
github.com/mattn/go-sqlite3 v1.14.17
github.com/nats-io/nats-server/v2 v2.9.23
github.com/nats-io/nats.go v1.28.0
github.com/nats-io/nats-server/v2 v2.10.7
github.com/nats-io/nats.go v1.31.0
github.com/neilalexander/utp v0.1.1-0.20210727203401-54ae7b1cd5f9
github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646
github.com/opentracing/opentracing-go v1.2.0
@ -42,12 +42,12 @@ require (
github.com/uber/jaeger-lib v2.4.1+incompatible
github.com/yggdrasil-network/yggdrasil-go v0.4.6
go.uber.org/atomic v1.10.0
golang.org/x/crypto v0.14.0
golang.org/x/crypto v0.17.0
golang.org/x/exp v0.0.0-20230809150735-7b3493d9a819
golang.org/x/image v0.10.0
golang.org/x/mobile v0.0.0-20221020085226-b36e6246172e
golang.org/x/sync v0.3.0
golang.org/x/term v0.13.0
golang.org/x/term v0.15.0
gopkg.in/h2non/bimg.v1 v1.1.9
gopkg.in/yaml.v2 v2.4.0
gotest.tools/v3 v3.4.0
@ -94,7 +94,7 @@ require (
github.com/json-iterator/go v1.1.12 // indirect
github.com/juju/errors v1.0.0 // indirect
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 // indirect
github.com/klauspost/compress v1.16.7 // indirect
github.com/klauspost/compress v1.17.4 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.17 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
@ -104,7 +104,7 @@ require (
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/morikuni/aec v1.0.0 // indirect
github.com/mschoch/smat v0.2.0 // indirect
github.com/nats-io/jwt/v2 v2.5.0 // indirect
github.com/nats-io/jwt/v2 v2.5.3 // indirect
github.com/nats-io/nkeys v0.4.6 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/onsi/ginkgo/v2 v2.11.0 // indirect
@ -124,9 +124,9 @@ require (
go.etcd.io/bbolt v1.3.6 // indirect
golang.org/x/mod v0.12.0 // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/sys v0.13.0 // indirect
golang.org/x/text v0.13.0 // indirect
golang.org/x/time v0.3.0 // indirect
golang.org/x/sys v0.15.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/time v0.5.0 // indirect
golang.org/x/tools v0.12.0 // indirect
google.golang.org/protobuf v1.30.0 // indirect
gopkg.in/macaroon.v2 v2.1.0 // indirect

36
go.sum
View file

@ -190,8 +190,8 @@ github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:C
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.10.3/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I=
github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/klauspost/compress v1.17.4 h1:Ej5ixsIri7BrIjBkRZLTo6ghwrEtHFk7ijlczPW4fZ4=
github.com/klauspost/compress v1.17.4/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
@ -242,12 +242,12 @@ github.com/morikuni/aec v1.0.0/go.mod h1:BbKIizmSmc5MMPqRYbxO4ZU0S0+P200+tUnFx7P
github.com/mschoch/smat v0.0.0-20160514031455-90eadee771ae/go.mod h1:qAyveg+e4CE+eKJXWVjKXM4ck2QobLqTDytGJbLLhJg=
github.com/mschoch/smat v0.2.0 h1:8imxQsjDm8yFEAVBe7azKmKSgzSkZXDuKkSq9374khM=
github.com/mschoch/smat v0.2.0/go.mod h1:kc9mz7DoBKqDyiRL7VZN8KvXQMWeTaVnttLRXOlotKw=
github.com/nats-io/jwt/v2 v2.5.0 h1:WQQ40AAlqqfx+f6ku+i0pOVm+ASirD4fUh+oQsiE9Ak=
github.com/nats-io/jwt/v2 v2.5.0/go.mod h1:24BeQtRwxRV8ruvC4CojXlx/WQ/VjuwlYiH+vu/+ibI=
github.com/nats-io/nats-server/v2 v2.9.23 h1:6Wj6H6QpP9FMlpCyWUaNu2yeZ/qGj+mdRkZ1wbikExU=
github.com/nats-io/nats-server/v2 v2.9.23/go.mod h1:wEjrEy9vnqIGE4Pqz4/c75v9Pmaq7My2IgFmnykc4C0=
github.com/nats-io/nats.go v1.28.0 h1:Th4G6zdsz2d0OqXdfzKLClo6bOfoI/b1kInhRtFIy5c=
github.com/nats-io/nats.go v1.28.0/go.mod h1:XpbWUlOElGwTYbMR7imivs7jJj9GtK7ypv321Wp6pjc=
github.com/nats-io/jwt/v2 v2.5.3 h1:/9SWvzc6hTfamcgXJ3uYRpgj+QuY2aLNqRiqrKcrpEo=
github.com/nats-io/jwt/v2 v2.5.3/go.mod h1:iysuPemFcc7p4IoYots3IuELSI4EDe9Y0bQMe+I3Bf4=
github.com/nats-io/nats-server/v2 v2.10.7 h1:f5VDy+GMu7JyuFA0Fef+6TfulfCs5nBTgq7MMkFJx5Y=
github.com/nats-io/nats-server/v2 v2.10.7/go.mod h1:V2JHOvPiPdtfDXTuEUsthUnCvSDeFrK4Xn9hRo6du7c=
github.com/nats-io/nats.go v1.31.0 h1:/WFBHEc/dOKBF6qf1TZhrdEfTmOZ5JzdJ+Y3m6Y/p7E=
github.com/nats-io/nats.go v1.31.0/go.mod h1:di3Bm5MLsoB4Bx61CBTsxuarI36WbhAwOm8QrW39+i8=
github.com/nats-io/nkeys v0.4.6 h1:IzVe95ru2CT6ta874rt9saQRkWfe2nFj1NtvYSLqMzY=
github.com/nats-io/nkeys v0.4.6/go.mod h1:4DxZNzenSVd1cYQoAa8948QY3QDjrHfcfVADymtkpts=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
@ -354,8 +354,8 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.14.0 h1:wBqGXzWJW6m1XrIKlAH0Hs1JJ7+9KBwnIO8v66Q9cHc=
golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4=
golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k=
golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4=
golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190125153040-c74c464bbbf2/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
@ -422,24 +422,24 @@ golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20221010170243-090e33056c14/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE=
golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc=
golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
golang.org/x/term v0.13.0 h1:bb+I9cTfFazGW51MZqBVmZy7+JEJMouUHTUSKVQLBek=
golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U=
golang.org/x/term v0.15.0 h1:y/Oo/a/q3IXu26lQgl04j/gjuBDOBlx7X6Om1j2CPW4=
golang.org/x/term v0.15.0/go.mod h1:BDl952bC7+uMoWR75FIrCDx79TPU9oHkTZ9yRbYOrX0=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.11.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k=
golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4=
golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk=
golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
golang.org/x/tools v0.0.0-20180525024113-a5b4c53f6e8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190206041539-40960b6deb8e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=

View file

@ -119,9 +119,14 @@ func (r *Inputer) startWorkerForRoom(roomID string) {
w.Lock()
defer w.Unlock()
if !loaded || w.subscription == nil {
streamName := r.Cfg.Matrix.JetStream.Prefixed(jetstream.InputRoomEvent)
consumer := r.Cfg.Matrix.JetStream.Prefixed("RoomInput" + jetstream.Tokenise(w.roomID))
subject := r.Cfg.Matrix.JetStream.Prefixed(jetstream.InputRoomEventSubj(w.roomID))
logger := logrus.WithFields(logrus.Fields{
"stream_name": streamName,
"consumer": consumer,
})
// Create the consumer. We do this as a specific step rather than
// letting PullSubscribe create it for us because we need the consumer
// to outlive the subscription. If we do it this way, we can Bind in the
@ -135,21 +140,62 @@ func (r *Inputer) startWorkerForRoom(roomID string) {
// before it. This is necessary because otherwise our consumer will never
// acknowledge things we filtered out for other subjects and therefore they
// will linger around forever.
if _, err := w.r.JetStream.AddConsumer(
r.Cfg.Matrix.JetStream.Prefixed(jetstream.InputRoomEvent),
&nats.ConsumerConfig{
Durable: consumer,
AckPolicy: nats.AckAllPolicy,
DeliverPolicy: nats.DeliverAllPolicy,
FilterSubject: subject,
AckWait: MaximumMissingProcessingTime + (time.Second * 10),
InactiveThreshold: inactiveThreshold,
},
); err != nil {
logrus.WithError(err).Errorf("Failed to create consumer for room %q", w.roomID)
info, err := w.r.JetStream.ConsumerInfo(streamName, consumer)
if err != nil && !errors.Is(err, nats.ErrConsumerNotFound) {
// log and return, we will retry anyway
logger.WithError(err).Errorf("failed to get consumer info")
return
}
consumerConfig := &nats.ConsumerConfig{
Durable: consumer,
AckPolicy: nats.AckExplicitPolicy,
DeliverPolicy: nats.DeliverAllPolicy,
FilterSubject: subject,
AckWait: MaximumMissingProcessingTime + (time.Second * 10),
InactiveThreshold: inactiveThreshold,
}
// The consumer already exists, try to update if necessary.
if info != nil {
// Not using reflect.DeepEqual here, since consumerConfig does not explicitly set
// e.g. the consumerName, which is added by NATS later. So this would result
// in constantly updating/recreating the consumer.
switch {
case info.Config.AckWait.Nanoseconds() != consumerConfig.AckWait.Nanoseconds():
// Initially we had a AckWait of 2m 10s, now we have 5m 10s, so we need to update
// existing consumers.
fallthrough
case info.Config.AckPolicy != consumerConfig.AckPolicy:
// We've changed the AckPolicy from AckAll to AckExplicit, this needs a
// recreation of the consumer. (Note: Only a few changes actually need a recreat)
logger.Warn("Consumer already exists, trying to update it.")
// Try updating the consumer first
if _, err = w.r.JetStream.UpdateConsumer(streamName, consumerConfig); err != nil {
// We failed to update the consumer, recreate it
logger.WithError(err).Warn("Unable to update consumer, recreating...")
if err = w.r.JetStream.DeleteConsumer(streamName, consumer); err != nil {
logger.WithError(err).Fatal("Unable to delete consumer")
return
}
// Set info to nil, so it can be recreated with the correct config.
info = nil
}
}
}
if info == nil {
// Create the consumer with the correct config
if _, err = w.r.JetStream.AddConsumer(
r.Cfg.Matrix.JetStream.Prefixed(jetstream.InputRoomEvent),
consumerConfig,
); err != nil {
logger.WithError(err).Errorf("Failed to create consumer for room %q", w.roomID)
return
}
}
// Bind to our durable consumer. We want to receive all messages waiting
// for this subject and we want to manually acknowledge them, so that we
// can ensure they are only cleaned up when we are done processing them.
@ -162,7 +208,7 @@ func (r *Inputer) startWorkerForRoom(roomID string) {
nats.InactiveThreshold(inactiveThreshold),
)
if err != nil {
logrus.WithError(err).Errorf("Failed to subscribe to stream for room %q", w.roomID)
logger.WithError(err).Errorf("Failed to subscribe to stream for room %q", w.roomID)
return
}
@ -263,21 +309,23 @@ func (w *worker) _next() {
return
}
// Since we either Ack() or Term() the message at this point, we can defer decrementing the room backpressure
defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": w.roomID}).Dec()
// Try to unmarshal the input room event. If the JSON unmarshalling
// fails then we'll terminate the message — this notifies NATS that
// we are done with the message and never want to see it again.
msg := msgs[0]
var inputRoomEvent api.InputRoomEvent
if err = json.Unmarshal(msg.Data, &inputRoomEvent); err != nil {
_ = msg.Term()
// using AckWait here makes the call synchronous; 5 seconds is the default value used by NATS
_ = msg.Term(nats.AckWait(time.Second * 5))
return
}
if scope := sentry.CurrentHub().Scope(); scope != nil {
scope.SetTag("event_id", inputRoomEvent.Event.EventID())
}
roomserverInputBackpressure.With(prometheus.Labels{"room_id": w.roomID}).Inc()
defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": w.roomID}).Dec()
// Process the room event. If something goes wrong then we'll tell
// NATS to terminate the message. We'll store the error result as
@ -307,10 +355,15 @@ func (w *worker) _next() {
"type": inputRoomEvent.Event.Type(),
}).Warn("Roomserver failed to process event")
}
_ = msg.Term()
// Even though we failed to process this message (e.g. due to Dendrite restarting and receiving a context canceled),
// the message may already have been queued for redelivery or will be, so this makes sure that we still reprocess the msg
// after restarting. We only Ack if the context was not yet canceled.
if w.r.ProcessContext.Context().Err() == nil {
_ = msg.AckSync()
}
errString = err.Error()
} else {
_ = msg.Ack()
_ = msg.AckSync()
}
// If it was a synchronous input request then the "sync" field
@ -381,6 +434,9 @@ func (r *Inputer) queueInputRoomEvents(
}).Error("Roomserver failed to queue async event")
return nil, fmt.Errorf("r.JetStream.PublishMsg: %w", err)
}
// Now that the event is queued, increment the room backpressure
roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Inc()
}
return
}

View file

@ -48,8 +48,10 @@ import (
"github.com/matrix-org/dendrite/roomserver/types"
)
// TODO: Does this value make sense?
const MaximumMissingProcessingTime = time.Minute * 2
// MaximumMissingProcessingTime is the maximum time we allow "processRoomEvent" to fetch
// e.g. missing auth/prev events. This duration is used for AckWait, and if it is exceeded
// NATS queues the event for redelivery.
const MaximumMissingProcessingTime = time.Minute * 5
var processRoomEventDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{

View file

@ -12,7 +12,9 @@ import (
"github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/dendrite/internal/httputil"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/roomserver/internal/input"
"github.com/matrix-org/gomatrixserverlib/spec"
"github.com/nats-io/nats.go"
"github.com/stretchr/testify/assert"
"github.com/tidwall/gjson"
@ -1231,3 +1233,54 @@ func TestNewServerACLs(t *testing.T) {
assert.Equal(t, false, banned)
})
}
// Validate that changing the AckPolicy/AckWait of room consumers
// results in their recreation
func TestRoomConsumerRecreation(t *testing.T) {
alice := test.NewUser(t)
room := test.NewRoom(t, alice)
// As this is DB unrelated, just use SQLite
cfg, processCtx, closeDB := testrig.CreateConfig(t, test.DBTypeSQLite)
defer closeDB()
cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions)
natsInstance := &jetstream.NATSInstance{}
// Prepare a stream and consumer using the old configuration
jsCtx, _ := natsInstance.Prepare(processCtx, &cfg.Global.JetStream)
streamName := cfg.Global.JetStream.Prefixed(jetstream.InputRoomEvent)
consumer := cfg.Global.JetStream.Prefixed("RoomInput" + jetstream.Tokenise(room.ID))
subject := cfg.Global.JetStream.Prefixed(jetstream.InputRoomEventSubj(room.ID))
consumerConfig := &nats.ConsumerConfig{
Durable: consumer,
AckPolicy: nats.AckAllPolicy,
DeliverPolicy: nats.DeliverAllPolicy,
FilterSubject: subject,
AckWait: (time.Minute * 2) + (time.Second * 10),
InactiveThreshold: time.Hour * 24,
}
// Create the consumer with the old config
_, err := jsCtx.AddConsumer(streamName, consumerConfig)
assert.NoError(t, err)
caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics)
// start JetStream listeners
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, natsInstance, caches, caching.DisableMetrics)
rsAPI.SetFederationAPI(nil, nil)
// let the RS create the events, this also recreates the Consumers
err = api.SendEvents(context.Background(), rsAPI, api.KindNew, room.Events(), "test", "test", "test", nil, false)
assert.NoError(t, err)
// Validate that AckPolicy and AckWait has changed
info, err := jsCtx.ConsumerInfo(streamName, consumer)
assert.NoError(t, err)
assert.Equal(t, nats.AckExplicitPolicy, info.Config.AckPolicy)
wantAckWait := input.MaximumMissingProcessingTime + (time.Second * 10)
assert.Equal(t, wantAckWait, info.Config.AckWait)
}