mirror of
https://github.com/matrix-org/dendrite
synced 2024-11-18 07:40:53 +01:00
161f145176
* Add NATS JetStream support Update shopify/sarama * Fix addresses * Don't change Addresses in Defaults * Update saramajetstream * Add missing error check Keep typing events for at least one minute * Use all configured NATS addresses * Update saramajetstream * Try setting up with NATS * Make sure NATS uses own persistent directory (TODO: make this configurable) * Update go.mod/go.sum * Jetstream package * Various other refactoring * Build fixes * Config tweaks, make random jetstream storage path for CI * Disable interest policies * Try to sane default on jetstream base path * Try to use in-memory for CI * Restore storage/retention * Update nats.go dependency * Adapt changes to config * Remove unneeded TopicFor * Dep update * Revert "Remove unneeded TopicFor" This reverts commitf5a4e4a339
. * Revert changes made to streams * Fix build problems * Update nats-server * Update go.mod/go.sum * Roomserver input API queuing using NATS * Fix topic naming * Prometheus metrics * More refactoring to remove saramajetstream * Add missing topic * Don't try to populate map that doesn't exist * Roomserver output topic * Update go.mod/go.sum * Message acknowledgements * Ack tweaks * Try to resume transaction re-sends * Try to resume transaction re-sends * Update to matrix-org/gomatrixserverlib@91dadfb * Remove internal.PartitionStorer from components that don't consume keychanges * Try to reduce re-allocations a bit in resolveConflictsV2 * Tweak delivery options on RS input * Publish send-to-device messages into correct JetStream subject * Async and sync roomserver input * Update dendrite-config.yaml * Remove roomserver tests for now (they need rewriting) * Remove roomserver test again (was merged back in) * Update documentation * Docker updates * More Docker updates * Update Docker readme again * Fix lint issues * Send final event in `processEvent` synchronously (since this might stop Sytest from being so upset) * Don't report event rejection errors via `/send`, since apparently this is upsetting tests that don't expect that * Go 1.16 instead of Go 1.13 for upgrade tests and Complement * Revert "Don't report event rejection errors via `/send`, since apparently this is upsetting tests that don't expect that" This reverts commit368675283f
. * Don't report any errors on `/send` to see what fun that creates * Fix panics on closed channel sends * Enforce state key matches sender * Do the same for leave * Various tweaks to make tests happier Squashed commit of the following: commit13f9028e7a
Author: Neil Alexander <neilalexander@users.noreply.github.com> Date: Tue Jan 4 15:47:14 2022 +0000 Do the same for leave commite6be7f05c3
Author: Neil Alexander <neilalexander@users.noreply.github.com> Date: Tue Jan 4 15:33:42 2022 +0000 Enforce state key matches sender commit85ede6d64b
Author: Neil Alexander <neilalexander@users.noreply.github.com> Date: Tue Jan 4 14:07:04 2022 +0000 Fix panics on closed channel sends commit9755494a98
Author: Neil Alexander <neilalexander@users.noreply.github.com> Date: Tue Jan 4 13:38:22 2022 +0000 Don't report any errors on `/send` to see what fun that creates commit3bb4f87b5d
Author: Neil Alexander <neilalexander@users.noreply.github.com> Date: Tue Jan 4 13:00:26 2022 +0000 Revert "Don't report event rejection errors via `/send`, since apparently this is upsetting tests that don't expect that" This reverts commit368675283f
. commitfe2673ed7b
Author: Neil Alexander <neilalexander@users.noreply.github.com> Date: Tue Jan 4 12:09:34 2022 +0000 Go 1.16 instead of Go 1.13 for upgrade tests and Complement commit368675283f
Author: Neil Alexander <neilalexander@users.noreply.github.com> Date: Tue Jan 4 11:51:45 2022 +0000 Don't report event rejection errors via `/send`, since apparently this is upsetting tests that don't expect that commitb028dfc085
Author: Neil Alexander <neilalexander@users.noreply.github.com> Date: Tue Jan 4 10:29:08 2022 +0000 Send final event in `processEvent` synchronously (since this might stop Sytest from being so upset) * Merge in NATS Server v2.6.6 and nats.go v1.13 into the in-process connection fork * Add `jetstream.WithJetStreamMessage` to make ack/nak-ing less messy, use process context in consumers * Fix consumer component name in federation API * Add comment explaining where streams are defined * Tweaks to roomserver input with comments * Finish that sentence that I apparently forgot to finish in INSTALL.md * Bump version number of config to 2 * Add comments around asynchronous sends to roomserver in processEventWithMissingState * More useful error message when the config version does not match * Set version in generate-config * Fix version in config.Defaults Co-authored-by: Neil Alexander <neilalexander@users.noreply.github.com>
373 lines
10 KiB
Go
373 lines
10 KiB
Go
// Copyright 2017 Vector Creations Ltd
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package notifier
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/matrix-org/dendrite/syncapi/types"
|
|
userapi "github.com/matrix-org/dendrite/userapi/api"
|
|
"github.com/matrix-org/gomatrixserverlib"
|
|
"github.com/matrix-org/util"
|
|
)
|
|
|
|
var (
|
|
randomMessageEvent gomatrixserverlib.HeaderedEvent
|
|
aliceInviteBobEvent gomatrixserverlib.HeaderedEvent
|
|
bobLeaveEvent gomatrixserverlib.HeaderedEvent
|
|
syncPositionVeryOld = types.StreamingToken{PDUPosition: 5}
|
|
syncPositionBefore = types.StreamingToken{PDUPosition: 11}
|
|
syncPositionAfter = types.StreamingToken{PDUPosition: 12}
|
|
//syncPositionNewEDU = types.NewStreamToken(syncPositionAfter.PDUPosition, 1, 0, 0, nil)
|
|
syncPositionAfter2 = types.StreamingToken{PDUPosition: 13}
|
|
)
|
|
|
|
var (
|
|
roomID = "!test:localhost"
|
|
alice = "@alice:localhost"
|
|
aliceDev = "alicedevice"
|
|
bob = "@bob:localhost"
|
|
bobDev = "bobdev"
|
|
)
|
|
|
|
func init() {
|
|
var err error
|
|
err = json.Unmarshal([]byte(`{
|
|
"_room_version": "1",
|
|
"type": "m.room.message",
|
|
"content": {
|
|
"body": "Hello World",
|
|
"msgtype": "m.text"
|
|
},
|
|
"sender": "@noone:localhost",
|
|
"room_id": "`+roomID+`",
|
|
"origin": "localhost",
|
|
"origin_server_ts": 12345,
|
|
"event_id": "$randomMessageEvent:localhost"
|
|
}`), &randomMessageEvent)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
err = json.Unmarshal([]byte(`{
|
|
"_room_version": "1",
|
|
"type": "m.room.member",
|
|
"state_key": "`+bob+`",
|
|
"content": {
|
|
"membership": "invite"
|
|
},
|
|
"sender": "`+alice+`",
|
|
"room_id": "`+roomID+`",
|
|
"origin": "localhost",
|
|
"origin_server_ts": 12345,
|
|
"event_id": "$aliceInviteBobEvent:localhost"
|
|
}`), &aliceInviteBobEvent)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
err = json.Unmarshal([]byte(`{
|
|
"_room_version": "1",
|
|
"type": "m.room.member",
|
|
"state_key": "`+bob+`",
|
|
"content": {
|
|
"membership": "leave"
|
|
},
|
|
"sender": "`+bob+`",
|
|
"room_id": "`+roomID+`",
|
|
"origin": "localhost",
|
|
"origin_server_ts": 12345,
|
|
"event_id": "$bobLeaveEvent:localhost"
|
|
}`), &bobLeaveEvent)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
}
|
|
|
|
func mustEqualPositions(t *testing.T, got, want types.StreamingToken) {
|
|
if got.String() != want.String() {
|
|
t.Fatalf("mustEqualPositions got %s want %s", got.String(), want.String())
|
|
}
|
|
}
|
|
|
|
// Test that the current position is returned if a request is already behind.
|
|
func TestImmediateNotification(t *testing.T) {
|
|
n := NewNotifier(syncPositionBefore)
|
|
pos, err := waitForEvents(n, newTestSyncRequest(alice, aliceDev, syncPositionVeryOld))
|
|
if err != nil {
|
|
t.Fatalf("TestImmediateNotification error: %s", err)
|
|
}
|
|
mustEqualPositions(t, pos, syncPositionBefore)
|
|
}
|
|
|
|
// Test that new events to a joined room unblocks the request.
|
|
func TestNewEventAndJoinedToRoom(t *testing.T) {
|
|
n := NewNotifier(syncPositionBefore)
|
|
n.setUsersJoinedToRooms(map[string][]string{
|
|
roomID: {alice, bob},
|
|
})
|
|
|
|
var wg sync.WaitGroup
|
|
wg.Add(1)
|
|
go func() {
|
|
pos, err := waitForEvents(n, newTestSyncRequest(bob, bobDev, syncPositionBefore))
|
|
if err != nil {
|
|
t.Errorf("TestNewEventAndJoinedToRoom error: %s", err)
|
|
}
|
|
mustEqualPositions(t, pos, syncPositionAfter)
|
|
wg.Done()
|
|
}()
|
|
|
|
stream := lockedFetchUserStream(n, bob, bobDev)
|
|
waitForBlocking(stream, 1)
|
|
|
|
n.OnNewEvent(&randomMessageEvent, "", nil, syncPositionAfter)
|
|
|
|
wg.Wait()
|
|
}
|
|
|
|
func TestCorrectStream(t *testing.T) {
|
|
n := NewNotifier(syncPositionBefore)
|
|
stream := lockedFetchUserStream(n, bob, bobDev)
|
|
if stream.UserID != bob {
|
|
t.Fatalf("expected user %q, got %q", bob, stream.UserID)
|
|
}
|
|
if stream.DeviceID != bobDev {
|
|
t.Fatalf("expected device %q, got %q", bobDev, stream.DeviceID)
|
|
}
|
|
}
|
|
|
|
func TestCorrectStreamWakeup(t *testing.T) {
|
|
n := NewNotifier(syncPositionBefore)
|
|
awoken := make(chan string)
|
|
|
|
streamone := lockedFetchUserStream(n, alice, "one")
|
|
streamtwo := lockedFetchUserStream(n, alice, "two")
|
|
|
|
go func() {
|
|
select {
|
|
case <-streamone.signalChannel:
|
|
awoken <- "one"
|
|
case <-streamtwo.signalChannel:
|
|
awoken <- "two"
|
|
}
|
|
}()
|
|
|
|
time.Sleep(1 * time.Second)
|
|
|
|
wake := "two"
|
|
n.wakeupUserDevice(alice, []string{wake}, syncPositionAfter)
|
|
|
|
if result := <-awoken; result != wake {
|
|
t.Fatalf("expected to wake %q, got %q", wake, result)
|
|
}
|
|
}
|
|
|
|
// Test that an invite unblocks the request
|
|
func TestNewInviteEventForUser(t *testing.T) {
|
|
n := NewNotifier(syncPositionBefore)
|
|
n.setUsersJoinedToRooms(map[string][]string{
|
|
roomID: {alice, bob},
|
|
})
|
|
|
|
var wg sync.WaitGroup
|
|
wg.Add(1)
|
|
go func() {
|
|
pos, err := waitForEvents(n, newTestSyncRequest(bob, bobDev, syncPositionBefore))
|
|
if err != nil {
|
|
t.Errorf("TestNewInviteEventForUser error: %s", err)
|
|
}
|
|
mustEqualPositions(t, pos, syncPositionAfter)
|
|
wg.Done()
|
|
}()
|
|
|
|
stream := lockedFetchUserStream(n, bob, bobDev)
|
|
waitForBlocking(stream, 1)
|
|
|
|
n.OnNewEvent(&aliceInviteBobEvent, "", nil, syncPositionAfter)
|
|
|
|
wg.Wait()
|
|
}
|
|
|
|
// Test an EDU-only update wakes up the request.
|
|
// TODO: Fix this test, invites wake up with an incremented
|
|
// PDU position, not EDU position
|
|
/*
|
|
func TestEDUWakeup(t *testing.T) {
|
|
n := NewNotifier(syncPositionAfter)
|
|
n.setUsersJoinedToRooms(map[string][]string{
|
|
roomID: {alice, bob},
|
|
})
|
|
|
|
var wg sync.WaitGroup
|
|
wg.Add(1)
|
|
go func() {
|
|
pos, err := waitForEvents(n, newTestSyncRequest(bob, bobDev, syncPositionAfter))
|
|
if err != nil {
|
|
t.Errorf("TestNewInviteEventForUser error: %w", err)
|
|
}
|
|
mustEqualPositions(t, pos, syncPositionNewEDU)
|
|
wg.Done()
|
|
}()
|
|
|
|
stream := lockedFetchUserStream(n, bob, bobDev)
|
|
waitForBlocking(stream, 1)
|
|
|
|
n.OnNewEvent(&aliceInviteBobEvent, "", nil, syncPositionNewEDU)
|
|
|
|
wg.Wait()
|
|
}
|
|
*/
|
|
|
|
// Test that all blocked requests get woken up on a new event.
|
|
func TestMultipleRequestWakeup(t *testing.T) {
|
|
n := NewNotifier(syncPositionBefore)
|
|
n.setUsersJoinedToRooms(map[string][]string{
|
|
roomID: {alice, bob},
|
|
})
|
|
|
|
var wg sync.WaitGroup
|
|
wg.Add(3)
|
|
poll := func() {
|
|
pos, err := waitForEvents(n, newTestSyncRequest(bob, bobDev, syncPositionBefore))
|
|
if err != nil {
|
|
t.Errorf("TestMultipleRequestWakeup error: %s", err)
|
|
}
|
|
mustEqualPositions(t, pos, syncPositionAfter)
|
|
wg.Done()
|
|
}
|
|
go poll()
|
|
go poll()
|
|
go poll()
|
|
|
|
stream := lockedFetchUserStream(n, bob, bobDev)
|
|
waitForBlocking(stream, 3)
|
|
|
|
n.OnNewEvent(&randomMessageEvent, "", nil, syncPositionAfter)
|
|
|
|
wg.Wait()
|
|
|
|
numWaiting := stream.NumWaiting()
|
|
if numWaiting != 0 {
|
|
t.Errorf("TestMultipleRequestWakeup NumWaiting() want 0, got %d", numWaiting)
|
|
}
|
|
}
|
|
|
|
// Test that you stop getting woken up when you leave a room.
|
|
func TestNewEventAndWasPreviouslyJoinedToRoom(t *testing.T) {
|
|
// listen as bob. Make bob leave room. Make alice send event to room.
|
|
// Make sure alice gets woken up only and not bob as well.
|
|
n := NewNotifier(syncPositionBefore)
|
|
n.setUsersJoinedToRooms(map[string][]string{
|
|
roomID: {alice, bob},
|
|
})
|
|
|
|
var leaveWG sync.WaitGroup
|
|
|
|
// Make bob leave the room
|
|
leaveWG.Add(1)
|
|
go func() {
|
|
pos, err := waitForEvents(n, newTestSyncRequest(bob, bobDev, syncPositionBefore))
|
|
if err != nil {
|
|
t.Errorf("TestNewEventAndWasPreviouslyJoinedToRoom error: %s", err)
|
|
}
|
|
mustEqualPositions(t, pos, syncPositionAfter)
|
|
leaveWG.Done()
|
|
}()
|
|
bobStream := lockedFetchUserStream(n, bob, bobDev)
|
|
waitForBlocking(bobStream, 1)
|
|
n.OnNewEvent(&bobLeaveEvent, "", nil, syncPositionAfter)
|
|
leaveWG.Wait()
|
|
|
|
// send an event into the room. Make sure alice gets it. Bob should not.
|
|
var aliceWG sync.WaitGroup
|
|
aliceStream := lockedFetchUserStream(n, alice, aliceDev)
|
|
aliceWG.Add(1)
|
|
go func() {
|
|
pos, err := waitForEvents(n, newTestSyncRequest(alice, aliceDev, syncPositionAfter))
|
|
if err != nil {
|
|
t.Errorf("TestNewEventAndWasPreviouslyJoinedToRoom error: %s", err)
|
|
}
|
|
mustEqualPositions(t, pos, syncPositionAfter2)
|
|
aliceWG.Done()
|
|
}()
|
|
|
|
go func() {
|
|
// this should timeout with an error (but the main goroutine won't wait for the timeout explicitly)
|
|
_, err := waitForEvents(n, newTestSyncRequest(bob, bobDev, syncPositionAfter))
|
|
if err == nil {
|
|
t.Errorf("TestNewEventAndWasPreviouslyJoinedToRoom expect error but got nil")
|
|
}
|
|
}()
|
|
|
|
waitForBlocking(aliceStream, 1)
|
|
waitForBlocking(bobStream, 1)
|
|
|
|
n.OnNewEvent(&randomMessageEvent, "", nil, syncPositionAfter2)
|
|
aliceWG.Wait()
|
|
|
|
// it's possible that at this point alice has been informed and bob is about to be informed, so wait
|
|
// for a fraction of a second to account for this race
|
|
time.Sleep(1 * time.Millisecond)
|
|
}
|
|
|
|
func waitForEvents(n *Notifier, req types.SyncRequest) (types.StreamingToken, error) {
|
|
listener := n.GetListener(req)
|
|
defer listener.Close()
|
|
|
|
select {
|
|
case <-time.After(5 * time.Second):
|
|
return types.StreamingToken{}, fmt.Errorf(
|
|
"waitForEvents timed out waiting for %s (pos=%v)", req.Device.UserID, req.Since,
|
|
)
|
|
case <-listener.GetNotifyChannel(req.Since):
|
|
p := listener.GetSyncPosition()
|
|
return p, nil
|
|
}
|
|
}
|
|
|
|
// Wait until something is Wait()ing on the user stream.
|
|
func waitForBlocking(s *UserDeviceStream, numBlocking uint) {
|
|
for numBlocking != s.NumWaiting() {
|
|
// This is horrible but I don't want to add a signalling mechanism JUST for testing.
|
|
time.Sleep(1 * time.Microsecond)
|
|
}
|
|
}
|
|
|
|
// lockedFetchUserStream invokes Notifier.fetchUserStream, respecting Notifier.streamLock.
|
|
// A new stream is made if it doesn't exist already.
|
|
func lockedFetchUserStream(n *Notifier, userID, deviceID string) *UserDeviceStream {
|
|
n.streamLock.Lock()
|
|
defer n.streamLock.Unlock()
|
|
|
|
return n.fetchUserDeviceStream(userID, deviceID, true)
|
|
}
|
|
|
|
func newTestSyncRequest(userID, deviceID string, since types.StreamingToken) types.SyncRequest {
|
|
return types.SyncRequest{
|
|
Device: &userapi.Device{
|
|
UserID: userID,
|
|
ID: deviceID,
|
|
},
|
|
Timeout: 1 * time.Minute,
|
|
Since: since,
|
|
WantFullState: false,
|
|
Log: util.GetLogger(context.TODO()),
|
|
Context: context.TODO(),
|
|
}
|
|
}
|