2022-05-09 18:23:02 +02:00
package syncapi
import (
"context"
"encoding/json"
2022-08-02 17:00:16 +02:00
"fmt"
2024-07-27 22:30:17 +02:00
"io"
2022-05-09 18:23:02 +02:00
"net/http"
"net/http/httptest"
2022-08-02 17:00:16 +02:00
"reflect"
2022-05-09 18:23:02 +02:00
"testing"
"time"
2024-07-27 22:30:17 +02:00
"github.com/gorilla/mux"
2023-03-17 12:09:45 +01:00
"github.com/matrix-org/dendrite/internal/caching"
2023-03-22 09:21:32 +01:00
"github.com/matrix-org/dendrite/internal/httputil"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/setup/config"
2022-08-18 08:56:57 +02:00
"github.com/matrix-org/gomatrixserverlib"
2023-04-19 16:50:33 +02:00
"github.com/matrix-org/gomatrixserverlib/spec"
2022-08-18 08:56:57 +02:00
"github.com/nats-io/nats.go"
2024-07-27 22:30:17 +02:00
"github.com/stretchr/testify/assert"
2022-08-18 08:56:57 +02:00
"github.com/tidwall/gjson"
2023-04-27 13:54:20 +02:00
rstypes "github.com/matrix-org/dendrite/roomserver/types"
2023-03-06 12:43:59 +01:00
"github.com/matrix-org/dendrite/syncapi/routing"
"github.com/matrix-org/dendrite/syncapi/storage"
2023-04-04 19:16:53 +02:00
"github.com/matrix-org/dendrite/syncapi/synctypes"
2023-03-06 12:43:59 +01:00
2022-08-02 17:00:16 +02:00
"github.com/matrix-org/dendrite/clientapi/producers"
2022-08-11 18:23:35 +02:00
"github.com/matrix-org/dendrite/roomserver"
2022-05-09 18:23:02 +02:00
"github.com/matrix-org/dendrite/roomserver/api"
rsapi "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/dendrite/test"
2022-05-17 14:23:35 +02:00
"github.com/matrix-org/dendrite/test/testrig"
2022-05-09 18:23:02 +02:00
userapi "github.com/matrix-org/dendrite/userapi/api"
)
type syncRoomserverAPI struct {
rsapi . SyncRoomserverAPI
rooms [ ] * test . Room
}
2023-06-14 16:23:46 +02:00
func ( s * syncRoomserverAPI ) QueryUserIDForSender ( ctx context . Context , roomID spec . RoomID , senderID spec . SenderID ) ( * spec . UserID , error ) {
2023-06-07 19:14:35 +02:00
return spec . NewUserID ( string ( senderID ) , true )
2023-06-06 22:55:18 +02:00
}
2023-08-15 13:37:04 +02:00
func ( s * syncRoomserverAPI ) QuerySenderIDForUser ( ctx context . Context , roomID spec . RoomID , userID spec . UserID ) ( * spec . SenderID , error ) {
senderID := spec . SenderID ( userID . String ( ) )
return & senderID , nil
}
2022-05-09 18:23:02 +02:00
func ( s * syncRoomserverAPI ) QueryLatestEventsAndState ( ctx context . Context , req * rsapi . QueryLatestEventsAndStateRequest , res * rsapi . QueryLatestEventsAndStateResponse ) error {
var room * test . Room
for _ , r := range s . rooms {
if r . ID == req . RoomID {
room = r
break
}
}
if room == nil {
res . RoomExists = false
return nil
}
res . RoomVersion = room . Version
return nil // TODO: return state
}
2022-05-11 14:44:32 +02:00
func ( s * syncRoomserverAPI ) QuerySharedUsers ( ctx context . Context , req * rsapi . QuerySharedUsersRequest , res * rsapi . QuerySharedUsersResponse ) error {
res . UserIDsToCount = make ( map [ string ] int )
return nil
}
func ( s * syncRoomserverAPI ) QueryBulkStateContent ( ctx context . Context , req * rsapi . QueryBulkStateContentRequest , res * rsapi . QueryBulkStateContentResponse ) error {
return nil
}
2022-08-11 18:23:35 +02:00
func ( s * syncRoomserverAPI ) QueryMembershipForUser ( ctx context . Context , req * rsapi . QueryMembershipForUserRequest , res * rsapi . QueryMembershipForUserResponse ) error {
res . IsRoomForgotten = false
res . RoomExists = true
return nil
}
2023-08-15 13:37:04 +02:00
func ( s * syncRoomserverAPI ) QueryMembershipAtEvent (
ctx context . Context ,
roomID spec . RoomID ,
eventIDs [ ] string ,
senderID spec . SenderID ,
) ( map [ string ] * rstypes . HeaderedEvent , error ) {
return map [ string ] * rstypes . HeaderedEvent { } , nil
2022-08-11 18:23:35 +02:00
}
2022-05-09 18:23:02 +02:00
type syncUserAPI struct {
userapi . SyncUserAPI
accounts [ ] userapi . Device
}
func ( s * syncUserAPI ) QueryAccessToken ( ctx context . Context , req * userapi . QueryAccessTokenRequest , res * userapi . QueryAccessTokenResponse ) error {
for _ , acc := range s . accounts {
if acc . AccessToken == req . AccessToken {
res . Device = & acc
return nil
}
}
res . Err = "unknown user"
return nil
}
2023-02-20 14:58:03 +01:00
func ( s * syncUserAPI ) QueryKeyChanges ( ctx context . Context , req * userapi . QueryKeyChangesRequest , res * userapi . QueryKeyChangesResponse ) error {
2022-05-09 18:23:02 +02:00
return nil
}
2023-02-20 14:58:03 +01:00
func ( s * syncUserAPI ) QueryOneTimeKeys ( ctx context . Context , req * userapi . QueryOneTimeKeysRequest , res * userapi . QueryOneTimeKeysResponse ) error {
2022-08-11 16:29:33 +02:00
return nil
2022-05-11 14:44:32 +02:00
}
2023-02-20 14:58:03 +01:00
func ( s * syncUserAPI ) PerformLastSeenUpdate ( ctx context . Context , req * userapi . PerformLastSeenUpdateRequest , res * userapi . PerformLastSeenUpdateResponse ) error {
2022-08-11 16:29:33 +02:00
return nil
2022-05-09 18:23:02 +02:00
}
2022-05-11 14:44:32 +02:00
func TestSyncAPIAccessTokens ( t * testing . T ) {
2022-05-09 18:23:02 +02:00
test . WithAllDatabases ( t , func ( t * testing . T , dbType test . DBType ) {
2022-05-11 14:44:32 +02:00
testSyncAccessTokens ( t , dbType )
2022-05-09 18:23:02 +02:00
} )
}
2022-05-11 14:44:32 +02:00
func testSyncAccessTokens ( t * testing . T , dbType test . DBType ) {
2022-05-17 14:23:35 +02:00
user := test . NewUser ( t )
2022-05-09 18:23:02 +02:00
room := test . NewRoom ( t , user )
alice := userapi . Device {
ID : "ALICEID" ,
UserID : user . ID ,
AccessToken : "ALICE_BEARER_TOKEN" ,
DisplayName : "Alice" ,
AccountType : userapi . AccountTypeUser ,
}
2023-03-22 09:21:32 +01:00
cfg , processCtx , close := testrig . CreateConfig ( t , dbType )
routers := httputil . NewRouters ( )
cm := sqlutil . NewConnectionManager ( processCtx , cfg . Global . DatabaseOptions )
caches := caching . NewRistrettoCache ( 128 * 1024 * 1024 , time . Hour , caching . DisableMetrics )
natsInstance := jetstream . NATSInstance { }
2022-05-09 18:23:02 +02:00
defer close ( )
2023-03-22 09:21:32 +01:00
jsctx , _ := natsInstance . Prepare ( processCtx , & cfg . Global . JetStream )
defer jetstream . DeleteAllStreams ( jsctx , & cfg . Global . JetStream )
msgs := toNATSMsgs ( t , cfg , room . Events ( ) ... )
AddPublicRoutes ( processCtx , routers , cfg , cm , & natsInstance , & syncUserAPI { accounts : [ ] userapi . Device { alice } } , & syncRoomserverAPI { rooms : [ ] * test . Room { room } } , caches , caching . DisableMetrics )
2022-05-17 14:23:35 +02:00
testrig . MustPublishMsgs ( t , jsctx , msgs ... )
2022-05-09 18:23:02 +02:00
testCases := [ ] struct {
name string
req * http . Request
wantCode int
wantJoinedRooms [ ] string
} {
{
name : "missing access token" ,
req : test . NewRequest ( t , "GET" , "/_matrix/client/v3/sync" , test . WithQueryParams ( map [ string ] string {
"timeout" : "0" ,
} ) ) ,
wantCode : 401 ,
} ,
{
name : "unknown access token" ,
req : test . NewRequest ( t , "GET" , "/_matrix/client/v3/sync" , test . WithQueryParams ( map [ string ] string {
"access_token" : "foo" ,
"timeout" : "0" ,
} ) ) ,
wantCode : 401 ,
} ,
{
name : "valid access token" ,
req : test . NewRequest ( t , "GET" , "/_matrix/client/v3/sync" , test . WithQueryParams ( map [ string ] string {
"access_token" : alice . AccessToken ,
"timeout" : "0" ,
} ) ) ,
wantCode : 200 ,
wantJoinedRooms : [ ] string { room . ID } ,
} ,
}
2022-08-19 11:03:55 +02:00
2023-03-22 09:21:32 +01:00
syncUntil ( t , routers , alice . AccessToken , false , func ( syncBody string ) bool {
2022-08-19 11:03:55 +02:00
// wait for the last sent eventID to come down sync
path := fmt . Sprintf ( ` rooms.join.%s.timeline.events.#(event_id=="%s") ` , room . ID , room . Events ( ) [ len ( room . Events ( ) ) - 1 ] . EventID ( ) )
return gjson . Get ( syncBody , path ) . Exists ( )
} )
2022-05-09 18:23:02 +02:00
for _ , tc := range testCases {
w := httptest . NewRecorder ( )
2023-03-22 09:21:32 +01:00
routers . Client . ServeHTTP ( w , tc . req )
2022-05-09 18:23:02 +02:00
if w . Code != tc . wantCode {
t . Fatalf ( "%s: got HTTP %d want %d" , tc . name , w . Code , tc . wantCode )
}
if tc . wantJoinedRooms != nil {
var res types . Response
if err := json . NewDecoder ( w . Body ) . Decode ( & res ) ; err != nil {
t . Fatalf ( "%s: failed to decode response body: %s" , tc . name , err )
}
if len ( res . Rooms . Join ) != len ( tc . wantJoinedRooms ) {
t . Errorf ( "%s: got %v joined rooms, want %v.\nResponse: %+v" , tc . name , len ( res . Rooms . Join ) , len ( tc . wantJoinedRooms ) , res )
}
t . Logf ( "res: %+v" , res . Rooms . Join [ room . ID ] )
gotEventIDs := make ( [ ] string , len ( res . Rooms . Join [ room . ID ] . Timeline . Events ) )
for i , ev := range res . Rooms . Join [ room . ID ] . Timeline . Events {
gotEventIDs [ i ] = ev . EventID
}
test . AssertEventIDsEqual ( t , gotEventIDs , room . Events ( ) )
}
}
}
2022-05-11 14:44:32 +02:00
2023-08-31 17:33:38 +02:00
func TestSyncAPIEventFormatPowerLevels ( t * testing . T ) {
test . WithAllDatabases ( t , func ( t * testing . T , dbType test . DBType ) {
testSyncEventFormatPowerLevels ( t , dbType )
} )
}
func testSyncEventFormatPowerLevels ( t * testing . T , dbType test . DBType ) {
user := test . NewUser ( t )
setRoomVersion := func ( t * testing . T , r * test . Room ) { r . Version = gomatrixserverlib . RoomVersionPseudoIDs }
room := test . NewRoom ( t , user , setRoomVersion )
alice := userapi . Device {
ID : "ALICEID" ,
UserID : user . ID ,
AccessToken : "ALICE_BEARER_TOKEN" ,
DisplayName : "Alice" ,
AccountType : userapi . AccountTypeUser ,
}
room . CreateAndInsert ( t , user , spec . MRoomPowerLevels , gomatrixserverlib . PowerLevelContent {
Users : map [ string ] int64 {
user . ID : 100 ,
} ,
} , test . WithStateKey ( "" ) )
cfg , processCtx , close := testrig . CreateConfig ( t , dbType )
routers := httputil . NewRouters ( )
cm := sqlutil . NewConnectionManager ( processCtx , cfg . Global . DatabaseOptions )
caches := caching . NewRistrettoCache ( 128 * 1024 * 1024 , time . Hour , caching . DisableMetrics )
natsInstance := jetstream . NATSInstance { }
defer close ( )
jsctx , _ := natsInstance . Prepare ( processCtx , & cfg . Global . JetStream )
defer jetstream . DeleteAllStreams ( jsctx , & cfg . Global . JetStream )
msgs := toNATSMsgs ( t , cfg , room . Events ( ) ... )
AddPublicRoutes ( processCtx , routers , cfg , cm , & natsInstance , & syncUserAPI { accounts : [ ] userapi . Device { alice } } , & syncRoomserverAPI { rooms : [ ] * test . Room { room } } , caches , caching . DisableMetrics )
testrig . MustPublishMsgs ( t , jsctx , msgs ... )
testCases := [ ] struct {
name string
wantCode int
wantJoinedRooms [ ] string
eventFormat synctypes . ClientEventFormat
} {
{
name : "Client format" ,
wantCode : 200 ,
wantJoinedRooms : [ ] string { room . ID } ,
eventFormat : synctypes . FormatSync ,
} ,
{
name : "Federation format" ,
wantCode : 200 ,
wantJoinedRooms : [ ] string { room . ID } ,
eventFormat : synctypes . FormatSyncFederation ,
} ,
}
syncUntil ( t , routers , alice . AccessToken , false , func ( syncBody string ) bool {
// wait for the last sent eventID to come down sync
path := fmt . Sprintf ( ` rooms.join.%s.timeline.events.#(event_id=="%s") ` , room . ID , room . Events ( ) [ len ( room . Events ( ) ) - 1 ] . EventID ( ) )
return gjson . Get ( syncBody , path ) . Exists ( )
} )
for _ , tc := range testCases {
format := ""
if tc . eventFormat == synctypes . FormatSyncFederation {
format = "federation"
}
w := httptest . NewRecorder ( )
routers . Client . ServeHTTP ( w , test . NewRequest ( t , "GET" , "/_matrix/client/v3/sync" , test . WithQueryParams ( map [ string ] string {
"access_token" : alice . AccessToken ,
"timeout" : "0" ,
"filter" : fmt . Sprintf ( ` { "event_format":"%s"} ` , format ) ,
} ) ) )
if w . Code != tc . wantCode {
t . Fatalf ( "%s: got HTTP %d want %d" , tc . name , w . Code , tc . wantCode )
}
if tc . wantJoinedRooms != nil {
var res types . Response
if err := json . NewDecoder ( w . Body ) . Decode ( & res ) ; err != nil {
t . Fatalf ( "%s: failed to decode response body: %s" , tc . name , err )
}
if len ( res . Rooms . Join ) != len ( tc . wantJoinedRooms ) {
t . Errorf ( "%s: got %v joined rooms, want %v.\nResponse: %+v" , tc . name , len ( res . Rooms . Join ) , len ( tc . wantJoinedRooms ) , res )
}
t . Logf ( "res: %+v" , res . Rooms . Join [ room . ID ] )
gotEventIDs := make ( [ ] string , len ( res . Rooms . Join [ room . ID ] . Timeline . Events ) )
for i , ev := range res . Rooms . Join [ room . ID ] . Timeline . Events {
gotEventIDs [ i ] = ev . EventID
}
test . AssertEventIDsEqual ( t , gotEventIDs , room . Events ( ) )
event := room . CreateAndInsert ( t , user , spec . MRoomPowerLevels , gomatrixserverlib . PowerLevelContent {
Users : map [ string ] int64 {
user . ID : 100 ,
"@otheruser:localhost" : 50 ,
} ,
} , test . WithStateKey ( "" ) )
msgs := toNATSMsgs ( t , cfg , event )
testrig . MustPublishMsgs ( t , jsctx , msgs ... )
syncUntil ( t , routers , alice . AccessToken , false , func ( syncBody string ) bool {
// wait for the last sent eventID to come down sync
path := fmt . Sprintf ( ` rooms.join.%s.timeline.events.#(event_id=="%s") ` , room . ID , room . Events ( ) [ len ( room . Events ( ) ) - 1 ] . EventID ( ) )
return gjson . Get ( syncBody , path ) . Exists ( )
} )
since := res . NextBatch . String ( )
w := httptest . NewRecorder ( )
routers . Client . ServeHTTP ( w , test . NewRequest ( t , "GET" , "/_matrix/client/v3/sync" , test . WithQueryParams ( map [ string ] string {
"access_token" : alice . AccessToken ,
"timeout" : "0" ,
"filter" : fmt . Sprintf ( ` { "event_format":"%s"} ` , format ) ,
"since" : since ,
} ) ) )
if w . Code != 200 {
t . Errorf ( "since=%s got HTTP %d want 200" , since , w . Code )
}
res = * types . NewResponse ( )
if err := json . NewDecoder ( w . Body ) . Decode ( & res ) ; err != nil {
t . Errorf ( "failed to decode response body: %s" , err )
}
if len ( res . Rooms . Join ) != 1 {
t . Fatalf ( "since=%s got %d joined rooms, want 1" , since , len ( res . Rooms . Join ) )
}
gotEventIDs = make ( [ ] string , len ( res . Rooms . Join [ room . ID ] . Timeline . Events ) )
for j , ev := range res . Rooms . Join [ room . ID ] . Timeline . Events {
gotEventIDs [ j ] = ev . EventID
if ev . Type == spec . MRoomPowerLevels {
content := gomatrixserverlib . PowerLevelContent { }
err := json . Unmarshal ( ev . Content , & content )
if err != nil {
t . Errorf ( "failed to unmarshal power level content: %s" , err )
}
otherUserLevel := content . UserLevel ( "@otheruser:localhost" )
if otherUserLevel != 50 {
t . Errorf ( "Expected user PL of %d but got %d" , 50 , otherUserLevel )
}
}
}
events := [ ] * rstypes . HeaderedEvent { room . Events ( ) [ len ( room . Events ( ) ) - 1 ] }
test . AssertEventIDsEqual ( t , gotEventIDs , events )
}
}
}
2022-05-11 14:44:32 +02:00
// Tests what happens when we create a room and then /sync before all events from /createRoom have
// been sent to the syncapi
func TestSyncAPICreateRoomSyncEarly ( t * testing . T ) {
test . WithAllDatabases ( t , func ( t * testing . T , dbType test . DBType ) {
testSyncAPICreateRoomSyncEarly ( t , dbType )
} )
}
func testSyncAPICreateRoomSyncEarly ( t * testing . T , dbType test . DBType ) {
2022-08-25 14:42:47 +02:00
t . Skip ( "Skipped, possibly fixed" )
2022-05-17 14:23:35 +02:00
user := test . NewUser ( t )
2022-05-11 14:44:32 +02:00
room := test . NewRoom ( t , user )
alice := userapi . Device {
ID : "ALICEID" ,
UserID : user . ID ,
AccessToken : "ALICE_BEARER_TOKEN" ,
DisplayName : "Alice" ,
AccountType : userapi . AccountTypeUser ,
}
2023-03-22 09:21:32 +01:00
cfg , processCtx , close := testrig . CreateConfig ( t , dbType )
routers := httputil . NewRouters ( )
cm := sqlutil . NewConnectionManager ( processCtx , cfg . Global . DatabaseOptions )
caches := caching . NewRistrettoCache ( 128 * 1024 * 1024 , time . Hour , caching . DisableMetrics )
2022-05-11 14:44:32 +02:00
defer close ( )
2023-03-22 09:21:32 +01:00
natsInstance := jetstream . NATSInstance { }
2022-05-11 14:44:32 +02:00
2023-03-22 09:21:32 +01:00
jsctx , _ := natsInstance . Prepare ( processCtx , & cfg . Global . JetStream )
defer jetstream . DeleteAllStreams ( jsctx , & cfg . Global . JetStream )
2022-05-11 14:44:32 +02:00
// order is:
// m.room.create
// m.room.member
// m.room.power_levels
// m.room.join_rules
// m.room.history_visibility
2023-03-22 09:21:32 +01:00
msgs := toNATSMsgs ( t , cfg , room . Events ( ) ... )
2022-05-11 14:44:32 +02:00
sinceTokens := make ( [ ] string , len ( msgs ) )
2023-03-22 09:21:32 +01:00
AddPublicRoutes ( processCtx , routers , cfg , cm , & natsInstance , & syncUserAPI { accounts : [ ] userapi . Device { alice } } , & syncRoomserverAPI { rooms : [ ] * test . Room { room } } , caches , caching . DisableMetrics )
2022-05-11 14:44:32 +02:00
for i , msg := range msgs {
2022-05-17 14:23:35 +02:00
testrig . MustPublishMsgs ( t , jsctx , msg )
2022-05-12 11:11:46 +02:00
time . Sleep ( 100 * time . Millisecond )
2022-05-11 14:44:32 +02:00
w := httptest . NewRecorder ( )
2023-03-22 09:21:32 +01:00
routers . Client . ServeHTTP ( w , test . NewRequest ( t , "GET" , "/_matrix/client/v3/sync" , test . WithQueryParams ( map [ string ] string {
2022-05-11 14:44:32 +02:00
"access_token" : alice . AccessToken ,
"timeout" : "0" ,
} ) ) )
if w . Code != 200 {
t . Errorf ( "got HTTP %d want 200" , w . Code )
continue
}
var res types . Response
if err := json . NewDecoder ( w . Body ) . Decode ( & res ) ; err != nil {
t . Errorf ( "failed to decode response body: %s" , err )
}
sinceTokens [ i ] = res . NextBatch . String ( )
if i == 0 { // create event does not produce a room section
if len ( res . Rooms . Join ) != 0 {
t . Fatalf ( "i=%v got %d joined rooms, want 0" , i , len ( res . Rooms . Join ) )
}
} else { // we should have that room somewhere
if len ( res . Rooms . Join ) != 1 {
t . Fatalf ( "i=%v got %d joined rooms, want 1" , i , len ( res . Rooms . Join ) )
}
}
}
// sync with no token "" and with the penultimate token and this should neatly return room events in the timeline block
sinceTokens = append ( [ ] string { "" } , sinceTokens [ : len ( sinceTokens ) - 1 ] ... )
t . Logf ( "waited for events to be consumed; syncing with %v" , sinceTokens )
for i , since := range sinceTokens {
w := httptest . NewRecorder ( )
2023-03-22 09:21:32 +01:00
routers . Client . ServeHTTP ( w , test . NewRequest ( t , "GET" , "/_matrix/client/v3/sync" , test . WithQueryParams ( map [ string ] string {
2022-05-11 14:44:32 +02:00
"access_token" : alice . AccessToken ,
"timeout" : "0" ,
"since" : since ,
} ) ) )
if w . Code != 200 {
t . Errorf ( "since=%s got HTTP %d want 200" , since , w . Code )
}
var res types . Response
if err := json . NewDecoder ( w . Body ) . Decode ( & res ) ; err != nil {
t . Errorf ( "failed to decode response body: %s" , err )
}
if len ( res . Rooms . Join ) != 1 {
t . Fatalf ( "since=%s got %d joined rooms, want 1" , since , len ( res . Rooms . Join ) )
}
t . Logf ( "since=%s res state:%+v res timeline:%+v" , since , res . Rooms . Join [ room . ID ] . State . Events , res . Rooms . Join [ room . ID ] . Timeline . Events )
gotEventIDs := make ( [ ] string , len ( res . Rooms . Join [ room . ID ] . Timeline . Events ) )
for j , ev := range res . Rooms . Join [ room . ID ] . Timeline . Events {
gotEventIDs [ j ] = ev . EventID
}
test . AssertEventIDsEqual ( t , gotEventIDs , room . Events ( ) [ i : ] )
}
}
2022-05-17 16:53:08 +02:00
// Test that if we hit /sync we get back presence: online, regardless of whether messages get delivered
// via NATS. Regression test for a flakey test "User sees their own presence in a sync"
func TestSyncAPIUpdatePresenceImmediately ( t * testing . T ) {
test . WithAllDatabases ( t , func ( t * testing . T , dbType test . DBType ) {
testSyncAPIUpdatePresenceImmediately ( t , dbType )
} )
}
func testSyncAPIUpdatePresenceImmediately ( t * testing . T , dbType test . DBType ) {
user := test . NewUser ( t )
alice := userapi . Device {
ID : "ALICEID" ,
UserID : user . ID ,
AccessToken : "ALICE_BEARER_TOKEN" ,
DisplayName : "Alice" ,
AccountType : userapi . AccountTypeUser ,
}
2023-03-22 09:21:32 +01:00
cfg , processCtx , close := testrig . CreateConfig ( t , dbType )
routers := httputil . NewRouters ( )
cm := sqlutil . NewConnectionManager ( processCtx , cfg . Global . DatabaseOptions )
caches := caching . NewRistrettoCache ( 128 * 1024 * 1024 , time . Hour , caching . DisableMetrics )
cfg . Global . Presence . EnableOutbound = true
cfg . Global . Presence . EnableInbound = true
2022-05-17 16:53:08 +02:00
defer close ( )
2023-03-22 09:21:32 +01:00
natsInstance := jetstream . NATSInstance { }
2022-05-17 16:53:08 +02:00
2023-03-22 09:21:32 +01:00
jsctx , _ := natsInstance . Prepare ( processCtx , & cfg . Global . JetStream )
defer jetstream . DeleteAllStreams ( jsctx , & cfg . Global . JetStream )
AddPublicRoutes ( processCtx , routers , cfg , cm , & natsInstance , & syncUserAPI { accounts : [ ] userapi . Device { alice } } , & syncRoomserverAPI { } , caches , caching . DisableMetrics )
2022-05-17 16:53:08 +02:00
w := httptest . NewRecorder ( )
2023-03-22 09:21:32 +01:00
routers . Client . ServeHTTP ( w , test . NewRequest ( t , "GET" , "/_matrix/client/v3/sync" , test . WithQueryParams ( map [ string ] string {
2022-05-17 16:53:08 +02:00
"access_token" : alice . AccessToken ,
"timeout" : "0" ,
"set_presence" : "online" ,
} ) ) )
if w . Code != 200 {
t . Fatalf ( "got HTTP %d want %d" , w . Code , 200 )
}
var res types . Response
if err := json . NewDecoder ( w . Body ) . Decode ( & res ) ; err != nil {
t . Errorf ( "failed to decode response body: %s" , err )
}
if len ( res . Presence . Events ) != 1 {
t . Fatalf ( "expected 1 presence events, got: %+v" , res . Presence . Events )
}
if res . Presence . Events [ 0 ] . Sender != alice . UserID {
t . Errorf ( "sender: got %v want %v" , res . Presence . Events [ 0 ] . Sender , alice . UserID )
}
if res . Presence . Events [ 0 ] . Type != "m.presence" {
t . Errorf ( "type: got %v want %v" , res . Presence . Events [ 0 ] . Type , "m.presence" )
}
if gjson . ParseBytes ( res . Presence . Events [ 0 ] . Content ) . Get ( "presence" ) . Str != "online" {
t . Errorf ( "content: not online, got %v" , res . Presence . Events [ 0 ] . Content )
}
}
2022-08-11 18:23:35 +02:00
// This is mainly what Sytest is doing in "test_history_visibility"
func TestMessageHistoryVisibility ( t * testing . T ) {
test . WithAllDatabases ( t , func ( t * testing . T , dbType test . DBType ) {
testHistoryVisibility ( t , dbType )
} )
}
func testHistoryVisibility ( t * testing . T , dbType test . DBType ) {
type result struct {
seeWithoutJoin bool
seeBeforeJoin bool
seeAfterInvite bool
}
// create the users
alice := test . NewUser ( t )
2022-08-19 11:03:55 +02:00
aliceDev := userapi . Device {
ID : "ALICEID" ,
UserID : alice . ID ,
AccessToken : "ALICE_BEARER_TOKEN" ,
DisplayName : "ALICE" ,
}
2022-08-11 18:23:35 +02:00
bob := test . NewUser ( t )
bobDev := userapi . Device {
ID : "BOBID" ,
UserID : bob . ID ,
AccessToken : "BOD_BEARER_TOKEN" ,
DisplayName : "BOB" ,
}
ctx := context . Background ( )
// check guest and normal user accounts
for _ , accType := range [ ] userapi . AccountType { userapi . AccountTypeGuest , userapi . AccountTypeUser } {
testCases := [ ] struct {
historyVisibility gomatrixserverlib . HistoryVisibility
wantResult result
} {
{
historyVisibility : gomatrixserverlib . HistoryVisibilityWorldReadable ,
wantResult : result {
seeWithoutJoin : true ,
seeBeforeJoin : true ,
seeAfterInvite : true ,
} ,
} ,
{
historyVisibility : gomatrixserverlib . HistoryVisibilityShared ,
wantResult : result {
seeWithoutJoin : false ,
seeBeforeJoin : true ,
seeAfterInvite : true ,
} ,
} ,
{
historyVisibility : gomatrixserverlib . HistoryVisibilityInvited ,
wantResult : result {
seeWithoutJoin : false ,
seeBeforeJoin : false ,
seeAfterInvite : true ,
} ,
} ,
{
historyVisibility : gomatrixserverlib . HistoryVisibilityJoined ,
wantResult : result {
seeWithoutJoin : false ,
seeBeforeJoin : false ,
seeAfterInvite : false ,
} ,
} ,
}
bobDev . AccountType = accType
userType := "guest"
if accType == userapi . AccountTypeUser {
userType = "real user"
}
2023-03-22 09:21:32 +01:00
cfg , processCtx , close := testrig . CreateConfig ( t , dbType )
2023-07-13 14:18:37 +02:00
cfg . ClientAPI . RateLimiting = config . RateLimiting { Enabled : false }
2023-03-22 09:21:32 +01:00
routers := httputil . NewRouters ( )
cm := sqlutil . NewConnectionManager ( processCtx , cfg . Global . DatabaseOptions )
caches := caching . NewRistrettoCache ( 128 * 1024 * 1024 , time . Hour , caching . DisableMetrics )
2022-08-11 18:23:35 +02:00
defer close ( )
2023-03-22 09:21:32 +01:00
natsInstance := jetstream . NATSInstance { }
2022-08-11 18:23:35 +02:00
2023-03-22 09:21:32 +01:00
jsctx , _ := natsInstance . Prepare ( processCtx , & cfg . Global . JetStream )
defer jetstream . DeleteAllStreams ( jsctx , & cfg . Global . JetStream )
2022-08-11 18:23:35 +02:00
// Use the actual internal roomserver API
2023-03-22 09:21:32 +01:00
rsAPI := roomserver . NewInternalAPI ( processCtx , cfg , cm , & natsInstance , caches , caching . DisableMetrics )
2022-08-11 18:23:35 +02:00
rsAPI . SetFederationAPI ( nil , nil )
2023-03-22 09:21:32 +01:00
AddPublicRoutes ( processCtx , routers , cfg , cm , & natsInstance , & syncUserAPI { accounts : [ ] userapi . Device { aliceDev , bobDev } } , rsAPI , caches , caching . DisableMetrics )
2022-08-11 18:23:35 +02:00
for _ , tc := range testCases {
testname := fmt . Sprintf ( "%s - %s" , tc . historyVisibility , userType )
t . Run ( testname , func ( t * testing . T ) {
// create a room with the given visibility
room := test . NewRoom ( t , alice , test . RoomHistoryVisibility ( tc . historyVisibility ) )
// send the events/messages to NATS to create the rooms
2022-08-19 11:03:55 +02:00
beforeJoinBody := fmt . Sprintf ( "Before invite in a %s room" , tc . historyVisibility )
beforeJoinEv := room . CreateAndInsert ( t , alice , "m.room.message" , map [ string ] interface { } { "body" : beforeJoinBody } )
2022-08-11 18:23:35 +02:00
eventsToSend := append ( room . Events ( ) , beforeJoinEv )
2022-11-15 16:05:23 +01:00
if err := api . SendEvents ( ctx , rsAPI , api . KindNew , eventsToSend , "test" , "test" , "test" , nil , false ) ; err != nil {
2022-08-11 18:23:35 +02:00
t . Fatalf ( "failed to send events: %v" , err )
}
2023-03-22 09:21:32 +01:00
syncUntil ( t , routers , aliceDev . AccessToken , false ,
2022-08-19 11:03:55 +02:00
func ( syncBody string ) bool {
path := fmt . Sprintf ( ` rooms.join.%s.timeline.events.#(content.body=="%s") ` , room . ID , beforeJoinBody )
return gjson . Get ( syncBody , path ) . Exists ( )
} ,
)
2022-08-11 18:23:35 +02:00
// There is only one event, we expect only to be able to see this, if the room is world_readable
w := httptest . NewRecorder ( )
2023-03-22 09:21:32 +01:00
routers . Client . ServeHTTP ( w , test . NewRequest ( t , "GET" , fmt . Sprintf ( "/_matrix/client/v3/rooms/%s/messages" , room . ID ) , test . WithQueryParams ( map [ string ] string {
2022-08-11 18:23:35 +02:00
"access_token" : bobDev . AccessToken ,
"dir" : "b" ,
2023-02-07 14:31:23 +01:00
"filter" : ` { "lazy_load_members":true} ` , // check that lazy loading doesn't break history visibility
2022-08-11 18:23:35 +02:00
} ) ) )
if w . Code != 200 {
t . Logf ( "%s" , w . Body . String ( ) )
t . Fatalf ( "got HTTP %d want %d" , w . Code , 200 )
}
// We only care about the returned events at this point
var res struct {
2023-04-04 19:16:53 +02:00
Chunk [ ] synctypes . ClientEvent ` json:"chunk" `
2022-08-11 18:23:35 +02:00
}
if err := json . NewDecoder ( w . Body ) . Decode ( & res ) ; err != nil {
t . Errorf ( "failed to decode response body: %s" , err )
}
verifyEventVisible ( t , tc . wantResult . seeWithoutJoin , beforeJoinEv , res . Chunk )
// Create invite, a message, join the room and create another message.
inviteEv := room . CreateAndInsert ( t , alice , "m.room.member" , map [ string ] interface { } { "membership" : "invite" } , test . WithStateKey ( bob . ID ) )
afterInviteEv := room . CreateAndInsert ( t , alice , "m.room.message" , map [ string ] interface { } { "body" : fmt . Sprintf ( "After invite in a %s room" , tc . historyVisibility ) } )
joinEv := room . CreateAndInsert ( t , bob , "m.room.member" , map [ string ] interface { } { "membership" : "join" } , test . WithStateKey ( bob . ID ) )
2022-08-19 11:03:55 +02:00
afterJoinBody := fmt . Sprintf ( "After join in a %s room" , tc . historyVisibility )
msgEv := room . CreateAndInsert ( t , alice , "m.room.message" , map [ string ] interface { } { "body" : afterJoinBody } )
2022-08-11 18:23:35 +02:00
2023-04-27 13:54:20 +02:00
eventsToSend = append ( [ ] * rstypes . HeaderedEvent { } , inviteEv , afterInviteEv , joinEv , msgEv )
2022-08-11 18:23:35 +02:00
2022-11-15 16:05:23 +01:00
if err := api . SendEvents ( ctx , rsAPI , api . KindNew , eventsToSend , "test" , "test" , "test" , nil , false ) ; err != nil {
2022-08-11 18:23:35 +02:00
t . Fatalf ( "failed to send events: %v" , err )
}
2023-03-22 09:21:32 +01:00
syncUntil ( t , routers , aliceDev . AccessToken , false ,
2022-08-19 11:03:55 +02:00
func ( syncBody string ) bool {
path := fmt . Sprintf ( ` rooms.join.%s.timeline.events.#(content.body=="%s") ` , room . ID , afterJoinBody )
return gjson . Get ( syncBody , path ) . Exists ( )
} ,
)
2022-08-11 18:23:35 +02:00
// Verify the messages after/before invite are visible or not
w = httptest . NewRecorder ( )
2023-03-22 09:21:32 +01:00
routers . Client . ServeHTTP ( w , test . NewRequest ( t , "GET" , fmt . Sprintf ( "/_matrix/client/v3/rooms/%s/messages" , room . ID ) , test . WithQueryParams ( map [ string ] string {
2022-08-11 18:23:35 +02:00
"access_token" : bobDev . AccessToken ,
"dir" : "b" ,
} ) ) )
if w . Code != 200 {
t . Logf ( "%s" , w . Body . String ( ) )
t . Fatalf ( "got HTTP %d want %d" , w . Code , 200 )
}
if err := json . NewDecoder ( w . Body ) . Decode ( & res ) ; err != nil {
t . Errorf ( "failed to decode response body: %s" , err )
}
// verify results
verifyEventVisible ( t , tc . wantResult . seeBeforeJoin , beforeJoinEv , res . Chunk )
verifyEventVisible ( t , tc . wantResult . seeAfterInvite , afterInviteEv , res . Chunk )
} )
}
}
}
2023-04-27 13:54:20 +02:00
func verifyEventVisible ( t * testing . T , wantVisible bool , wantVisibleEvent * rstypes . HeaderedEvent , chunk [ ] synctypes . ClientEvent ) {
2022-08-11 18:23:35 +02:00
t . Helper ( )
if wantVisible {
for _ , ev := range chunk {
if ev . EventID == wantVisibleEvent . EventID ( ) {
return
}
}
t . Fatalf ( "expected to see event %s but didn't: %+v" , wantVisibleEvent . EventID ( ) , chunk )
} else {
for _ , ev := range chunk {
if ev . EventID == wantVisibleEvent . EventID ( ) {
t . Fatalf ( "expected not to see event %s: %+v" , wantVisibleEvent . EventID ( ) , string ( ev . Content ) )
}
}
}
}
2023-01-17 10:08:23 +01:00
func TestGetMembership ( t * testing . T ) {
alice := test . NewUser ( t )
aliceDev := userapi . Device {
ID : "ALICEID" ,
UserID : alice . ID ,
AccessToken : "ALICE_BEARER_TOKEN" ,
DisplayName : "Alice" ,
AccountType : userapi . AccountTypeUser ,
}
bob := test . NewUser ( t )
bobDev := userapi . Device {
ID : "BOBID" ,
UserID : bob . ID ,
AccessToken : "notjoinedtoanyrooms" ,
}
testCases := [ ] struct {
name string
roomID string
additionalEvents func ( t * testing . T , room * test . Room )
request func ( t * testing . T , room * test . Room ) * http . Request
wantOK bool
wantMemberCount int
useSleep bool // :/
} {
{
name : "/members - Alice joined" ,
request : func ( t * testing . T , room * test . Room ) * http . Request {
return test . NewRequest ( t , "GET" , fmt . Sprintf ( "/_matrix/client/v3/rooms/%s/members" , room . ID ) , test . WithQueryParams ( map [ string ] string {
"access_token" : aliceDev . AccessToken ,
} ) )
} ,
wantOK : true ,
wantMemberCount : 1 ,
} ,
{
name : "/members - Bob never joined" ,
request : func ( t * testing . T , room * test . Room ) * http . Request {
return test . NewRequest ( t , "GET" , fmt . Sprintf ( "/_matrix/client/v3/rooms/%s/members" , room . ID ) , test . WithQueryParams ( map [ string ] string {
"access_token" : bobDev . AccessToken ,
} ) )
} ,
wantOK : false ,
} ,
{
name : "Alice leaves before Bob joins, should not be able to see Bob" ,
request : func ( t * testing . T , room * test . Room ) * http . Request {
return test . NewRequest ( t , "GET" , fmt . Sprintf ( "/_matrix/client/v3/rooms/%s/members" , room . ID ) , test . WithQueryParams ( map [ string ] string {
"access_token" : aliceDev . AccessToken ,
} ) )
} ,
additionalEvents : func ( t * testing . T , room * test . Room ) {
2023-04-19 16:50:33 +02:00
room . CreateAndInsert ( t , alice , spec . MRoomMember , map [ string ] interface { } {
2023-01-17 10:08:23 +01:00
"membership" : "leave" ,
} , test . WithStateKey ( alice . ID ) )
2023-04-19 16:50:33 +02:00
room . CreateAndInsert ( t , bob , spec . MRoomMember , map [ string ] interface { } {
2023-01-17 10:08:23 +01:00
"membership" : "join" ,
} , test . WithStateKey ( bob . ID ) )
} ,
useSleep : true ,
wantOK : true ,
wantMemberCount : 1 ,
} ,
{
name : "Alice leaves after Bob joins, should be able to see Bob" ,
request : func ( t * testing . T , room * test . Room ) * http . Request {
return test . NewRequest ( t , "GET" , fmt . Sprintf ( "/_matrix/client/v3/rooms/%s/members" , room . ID ) , test . WithQueryParams ( map [ string ] string {
"access_token" : aliceDev . AccessToken ,
} ) )
} ,
additionalEvents : func ( t * testing . T , room * test . Room ) {
2023-04-19 16:50:33 +02:00
room . CreateAndInsert ( t , bob , spec . MRoomMember , map [ string ] interface { } {
2023-01-17 10:08:23 +01:00
"membership" : "join" ,
} , test . WithStateKey ( bob . ID ) )
2023-04-19 16:50:33 +02:00
room . CreateAndInsert ( t , alice , spec . MRoomMember , map [ string ] interface { } {
2023-01-17 10:08:23 +01:00
"membership" : "leave" ,
} , test . WithStateKey ( alice . ID ) )
} ,
useSleep : true ,
wantOK : true ,
wantMemberCount : 2 ,
} ,
{
name : "'at' specified, returns memberships before Bob joins" ,
request : func ( t * testing . T , room * test . Room ) * http . Request {
return test . NewRequest ( t , "GET" , fmt . Sprintf ( "/_matrix/client/v3/rooms/%s/members" , room . ID ) , test . WithQueryParams ( map [ string ] string {
"access_token" : aliceDev . AccessToken ,
"at" : "t2_5" ,
} ) )
} ,
additionalEvents : func ( t * testing . T , room * test . Room ) {
2023-04-19 16:50:33 +02:00
room . CreateAndInsert ( t , bob , spec . MRoomMember , map [ string ] interface { } {
2023-01-17 10:08:23 +01:00
"membership" : "join" ,
} , test . WithStateKey ( bob . ID ) )
} ,
useSleep : true ,
wantOK : true ,
wantMemberCount : 1 ,
} ,
{
name : "'membership=leave' specified, returns no memberships" ,
request : func ( t * testing . T , room * test . Room ) * http . Request {
return test . NewRequest ( t , "GET" , fmt . Sprintf ( "/_matrix/client/v3/rooms/%s/members" , room . ID ) , test . WithQueryParams ( map [ string ] string {
"access_token" : aliceDev . AccessToken ,
"membership" : "leave" ,
} ) )
} ,
wantOK : true ,
wantMemberCount : 0 ,
} ,
{
name : "'not_membership=join' specified, returns no memberships" ,
request : func ( t * testing . T , room * test . Room ) * http . Request {
return test . NewRequest ( t , "GET" , fmt . Sprintf ( "/_matrix/client/v3/rooms/%s/members" , room . ID ) , test . WithQueryParams ( map [ string ] string {
"access_token" : aliceDev . AccessToken ,
"not_membership" : "join" ,
} ) )
} ,
wantOK : true ,
wantMemberCount : 0 ,
} ,
{
name : "'not_membership=leave' & 'membership=join' specified, returns correct memberships" ,
request : func ( t * testing . T , room * test . Room ) * http . Request {
return test . NewRequest ( t , "GET" , fmt . Sprintf ( "/_matrix/client/v3/rooms/%s/members" , room . ID ) , test . WithQueryParams ( map [ string ] string {
"access_token" : aliceDev . AccessToken ,
"not_membership" : "leave" ,
"membership" : "join" ,
} ) )
} ,
additionalEvents : func ( t * testing . T , room * test . Room ) {
2023-04-19 16:50:33 +02:00
room . CreateAndInsert ( t , bob , spec . MRoomMember , map [ string ] interface { } {
2023-01-17 10:08:23 +01:00
"membership" : "join" ,
} , test . WithStateKey ( bob . ID ) )
2023-04-19 16:50:33 +02:00
room . CreateAndInsert ( t , bob , spec . MRoomMember , map [ string ] interface { } {
2023-01-17 10:08:23 +01:00
"membership" : "leave" ,
} , test . WithStateKey ( bob . ID ) )
} ,
wantOK : true ,
wantMemberCount : 1 ,
} ,
{
name : "non-existent room ID" ,
request : func ( t * testing . T , room * test . Room ) * http . Request {
return test . NewRequest ( t , "GET" , fmt . Sprintf ( "/_matrix/client/v3/rooms/%s/members" , "!notavalidroom:test" ) , test . WithQueryParams ( map [ string ] string {
"access_token" : aliceDev . AccessToken ,
} ) )
} ,
wantOK : false ,
} ,
}
test . WithAllDatabases ( t , func ( t * testing . T , dbType test . DBType ) {
2023-03-22 09:21:32 +01:00
cfg , processCtx , close := testrig . CreateConfig ( t , dbType )
routers := httputil . NewRouters ( )
cm := sqlutil . NewConnectionManager ( processCtx , cfg . Global . DatabaseOptions )
caches := caching . NewRistrettoCache ( 128 * 1024 * 1024 , time . Hour , caching . DisableMetrics )
2023-01-17 10:08:23 +01:00
defer close ( )
2023-03-22 09:21:32 +01:00
natsInstance := jetstream . NATSInstance { }
jsctx , _ := natsInstance . Prepare ( processCtx , & cfg . Global . JetStream )
defer jetstream . DeleteAllStreams ( jsctx , & cfg . Global . JetStream )
2023-01-17 10:08:23 +01:00
// Use an actual roomserver for this
2023-03-22 09:21:32 +01:00
rsAPI := roomserver . NewInternalAPI ( processCtx , cfg , cm , & natsInstance , caches , caching . DisableMetrics )
2023-01-17 10:08:23 +01:00
rsAPI . SetFederationAPI ( nil , nil )
2023-03-22 09:21:32 +01:00
AddPublicRoutes ( processCtx , routers , cfg , cm , & natsInstance , & syncUserAPI { accounts : [ ] userapi . Device { aliceDev , bobDev } } , rsAPI , caches , caching . DisableMetrics )
2023-01-17 10:08:23 +01:00
for _ , tc := range testCases {
t . Run ( tc . name , func ( t * testing . T ) {
room := test . NewRoom ( t , alice )
t . Cleanup ( func ( ) {
t . Logf ( "running cleanup for %s" , tc . name )
} )
// inject additional events
if tc . additionalEvents != nil {
tc . additionalEvents ( t , room )
}
if err := api . SendEvents ( context . Background ( ) , rsAPI , api . KindNew , room . Events ( ) , "test" , "test" , "test" , nil , false ) ; err != nil {
t . Fatalf ( "failed to send events: %v" , err )
}
// wait for the events to come down sync
if tc . useSleep {
time . Sleep ( time . Millisecond * 100 )
} else {
2023-03-22 09:21:32 +01:00
syncUntil ( t , routers , aliceDev . AccessToken , false , func ( syncBody string ) bool {
2023-01-17 10:08:23 +01:00
// wait for the last sent eventID to come down sync
path := fmt . Sprintf ( ` rooms.join.%s.timeline.events.#(event_id=="%s") ` , room . ID , room . Events ( ) [ len ( room . Events ( ) ) - 1 ] . EventID ( ) )
return gjson . Get ( syncBody , path ) . Exists ( )
} )
}
w := httptest . NewRecorder ( )
2023-03-22 09:21:32 +01:00
routers . Client . ServeHTTP ( w , tc . request ( t , room ) )
2023-01-17 10:08:23 +01:00
if w . Code != 200 && tc . wantOK {
t . Logf ( "%s" , w . Body . String ( ) )
t . Fatalf ( "got HTTP %d want %d" , w . Code , 200 )
}
t . Logf ( "[%s] Resp: %s" , tc . name , w . Body . String ( ) )
// check we got the expected events
if tc . wantOK {
memberCount := len ( gjson . GetBytes ( w . Body . Bytes ( ) , "chunk" ) . Array ( ) )
if memberCount != tc . wantMemberCount {
t . Fatalf ( "expected %d members, got %d" , tc . wantMemberCount , memberCount )
}
}
} )
}
} )
}
2022-08-02 17:00:16 +02:00
func TestSendToDevice ( t * testing . T ) {
test . WithAllDatabases ( t , testSendToDevice )
}
func testSendToDevice ( t * testing . T , dbType test . DBType ) {
user := test . NewUser ( t )
alice := userapi . Device {
ID : "ALICEID" ,
UserID : user . ID ,
AccessToken : "ALICE_BEARER_TOKEN" ,
DisplayName : "Alice" ,
AccountType : userapi . AccountTypeUser ,
}
2023-03-22 09:21:32 +01:00
cfg , processCtx , close := testrig . CreateConfig ( t , dbType )
routers := httputil . NewRouters ( )
cm := sqlutil . NewConnectionManager ( processCtx , cfg . Global . DatabaseOptions )
caches := caching . NewRistrettoCache ( 128 * 1024 * 1024 , time . Hour , caching . DisableMetrics )
defer close ( )
natsInstance := jetstream . NATSInstance { }
2022-08-02 17:00:16 +02:00
2023-03-22 09:21:32 +01:00
jsctx , _ := natsInstance . Prepare ( processCtx , & cfg . Global . JetStream )
defer jetstream . DeleteAllStreams ( jsctx , & cfg . Global . JetStream )
AddPublicRoutes ( processCtx , routers , cfg , cm , & natsInstance , & syncUserAPI { accounts : [ ] userapi . Device { alice } } , & syncRoomserverAPI { } , caches , caching . DisableMetrics )
2022-08-02 17:00:16 +02:00
producer := producers . SyncAPIProducer {
2023-03-22 09:21:32 +01:00
TopicSendToDeviceEvent : cfg . Global . JetStream . Prefixed ( jetstream . OutputSendToDeviceEvent ) ,
2022-08-02 17:00:16 +02:00
JetStream : jsctx ,
}
msgCounter := 0
testCases := [ ] struct {
name string
since string
want [ ] string
sendMessagesCount int
} {
{
name : "initial sync, no messages" ,
want : [ ] string { } ,
} ,
{
name : "initial sync, one new message" ,
sendMessagesCount : 1 ,
want : [ ] string {
"message 1" ,
} ,
} ,
{
name : "initial sync, two new messages" , // we didn't advance the since token, so we'll receive two messages
sendMessagesCount : 1 ,
want : [ ] string {
"message 1" ,
"message 2" ,
} ,
} ,
{
name : "incremental sync, one message" , // this deletes message 1, as we advanced the since token
since : types . StreamingToken { SendToDevicePosition : 1 } . String ( ) ,
want : [ ] string {
"message 2" ,
} ,
} ,
{
name : "failed incremental sync, one message" , // didn't advance since, so still the same message
since : types . StreamingToken { SendToDevicePosition : 1 } . String ( ) ,
want : [ ] string {
"message 2" ,
} ,
} ,
{
name : "incremental sync, no message" , // this should delete message 2
since : types . StreamingToken { SendToDevicePosition : 2 } . String ( ) , // next_batch from previous sync
want : [ ] string { } ,
} ,
{
name : "incremental sync, three new messages" ,
since : types . StreamingToken { SendToDevicePosition : 2 } . String ( ) ,
sendMessagesCount : 3 ,
want : [ ] string {
"message 3" , // message 2 was deleted in the previous test
"message 4" ,
"message 5" ,
} ,
} ,
{
name : "initial sync, three messages" , // we expect three messages, as we didn't go beyond "2"
want : [ ] string {
"message 3" ,
"message 4" ,
"message 5" ,
} ,
} ,
{
name : "incremental sync, no messages" , // advance the sync token, no new messages
since : types . StreamingToken { SendToDevicePosition : 5 } . String ( ) ,
want : [ ] string { } ,
} ,
}
ctx := context . Background ( )
for _ , tc := range testCases {
// Send to-device messages of type "m.dendrite.test" with content `{"dummy":"message $counter"}`
for i := 0 ; i < tc . sendMessagesCount ; i ++ {
msgCounter ++
2022-09-13 09:35:45 +02:00
msg := json . RawMessage ( fmt . Sprintf ( ` { "dummy":"message %d"} ` , msgCounter ) )
2022-08-02 17:00:16 +02:00
if err := producer . SendToDevice ( ctx , user . ID , user . ID , alice . ID , "m.dendrite.test" , msg ) ; err != nil {
t . Fatalf ( "unable to send to device message: %v" , err )
}
}
2022-08-19 11:03:55 +02:00
2023-03-22 09:21:32 +01:00
syncUntil ( t , routers , alice . AccessToken ,
2022-08-19 11:03:55 +02:00
len ( tc . want ) == 0 ,
func ( body string ) bool {
return gjson . Get ( body , fmt . Sprintf ( ` to_device.events.#(content.dummy=="message %d") ` , msgCounter ) ) . Exists ( )
} ,
)
2022-08-02 17:00:16 +02:00
// Execute a /sync request, recording the response
w := httptest . NewRecorder ( )
2023-03-22 09:21:32 +01:00
routers . Client . ServeHTTP ( w , test . NewRequest ( t , "GET" , "/_matrix/client/v3/sync" , test . WithQueryParams ( map [ string ] string {
2022-08-02 17:00:16 +02:00
"access_token" : alice . AccessToken ,
"since" : tc . since ,
} ) ) )
// Extract the to_device.events, # gets all values of an array, in this case a string slice with "message $counter" entries
events := gjson . Get ( w . Body . String ( ) , "to_device.events.#.content.dummy" ) . Array ( )
got := make ( [ ] string , len ( events ) )
for i := range events {
got [ i ] = events [ i ] . String ( )
}
// Ensure the messages we received are as we expect them to be
if ! reflect . DeepEqual ( got , tc . want ) {
t . Logf ( "[%s|since=%s]: Sync: %s" , tc . name , tc . since , w . Body . String ( ) )
t . Fatalf ( "[%s|since=%s]: got: %+v, want: %+v" , tc . name , tc . since , got , tc . want )
}
}
}
2023-02-07 14:31:23 +01:00
func TestContext ( t * testing . T ) {
test . WithAllDatabases ( t , testContext )
}
func testContext ( t * testing . T , dbType test . DBType ) {
tests := [ ] struct {
name string
roomID string
eventID string
params map [ string ] string
wantError bool
wantStateLength int
wantBeforeLength int
wantAfterLength int
} {
{
name : "invalid filter" ,
params : map [ string ] string {
"filter" : "{" ,
} ,
wantError : true ,
} ,
{
name : "invalid limit" ,
params : map [ string ] string {
"limit" : "abc" ,
} ,
wantError : true ,
} ,
{
name : "high limit" ,
params : map [ string ] string {
"limit" : "100000" ,
} ,
} ,
{
name : "fine limit" ,
params : map [ string ] string {
"limit" : "10" ,
} ,
} ,
{
name : "last event without lazy loading" ,
wantStateLength : 5 ,
} ,
{
name : "last event with lazy loading" ,
params : map [ string ] string {
"filter" : ` { "lazy_load_members":true} ` ,
} ,
wantStateLength : 1 ,
} ,
{
name : "invalid room" ,
roomID : "!doesnotexist" ,
wantError : true ,
} ,
{
name : "invalid eventID" ,
eventID : "$doesnotexist" ,
wantError : true ,
} ,
{
name : "state is limited" ,
params : map [ string ] string {
"limit" : "1" ,
} ,
wantStateLength : 1 ,
} ,
{
name : "events are not limited" ,
2024-02-20 08:38:51 +01:00
wantBeforeLength : 5 ,
2023-02-07 14:31:23 +01:00
} ,
{
name : "all events are limited" ,
params : map [ string ] string {
"limit" : "1" ,
} ,
wantStateLength : 1 ,
wantBeforeLength : 1 ,
wantAfterLength : 1 ,
} ,
}
user := test . NewUser ( t )
alice := userapi . Device {
ID : "ALICEID" ,
UserID : user . ID ,
AccessToken : "ALICE_BEARER_TOKEN" ,
DisplayName : "Alice" ,
AccountType : userapi . AccountTypeUser ,
}
2023-03-22 09:21:32 +01:00
cfg , processCtx , close := testrig . CreateConfig ( t , dbType )
routers := httputil . NewRouters ( )
cm := sqlutil . NewConnectionManager ( processCtx , cfg . Global . DatabaseOptions )
caches := caching . NewRistrettoCache ( 128 * 1024 * 1024 , time . Hour , caching . DisableMetrics )
defer close ( )
2023-02-07 14:31:23 +01:00
// Use an actual roomserver for this
2023-03-22 09:21:32 +01:00
natsInstance := jetstream . NATSInstance { }
rsAPI := roomserver . NewInternalAPI ( processCtx , cfg , cm , & natsInstance , caches , caching . DisableMetrics )
2023-02-07 14:31:23 +01:00
rsAPI . SetFederationAPI ( nil , nil )
2023-03-22 09:21:32 +01:00
AddPublicRoutes ( processCtx , routers , cfg , cm , & natsInstance , & syncUserAPI { accounts : [ ] userapi . Device { alice } } , rsAPI , caches , caching . DisableMetrics )
2023-02-07 14:31:23 +01:00
room := test . NewRoom ( t , user )
room . CreateAndInsert ( t , user , "m.room.message" , map [ string ] interface { } { "body" : "hello world 1!" } )
room . CreateAndInsert ( t , user , "m.room.message" , map [ string ] interface { } { "body" : "hello world 2!" } )
thirdMsg := room . CreateAndInsert ( t , user , "m.room.message" , map [ string ] interface { } { "body" : "hello world3!" } )
room . CreateAndInsert ( t , user , "m.room.message" , map [ string ] interface { } { "body" : "hello world4!" } )
if err := api . SendEvents ( context . Background ( ) , rsAPI , api . KindNew , room . Events ( ) , "test" , "test" , "test" , nil , false ) ; err != nil {
t . Fatalf ( "failed to send events: %v" , err )
}
2023-03-22 09:21:32 +01:00
jsctx , _ := natsInstance . Prepare ( processCtx , & cfg . Global . JetStream )
defer jetstream . DeleteAllStreams ( jsctx , & cfg . Global . JetStream )
2023-02-07 14:31:23 +01:00
2023-03-22 09:21:32 +01:00
syncUntil ( t , routers , alice . AccessToken , false , func ( syncBody string ) bool {
2023-02-07 14:31:23 +01:00
// wait for the last sent eventID to come down sync
path := fmt . Sprintf ( ` rooms.join.%s.timeline.events.#(event_id=="%s") ` , room . ID , thirdMsg . EventID ( ) )
return gjson . Get ( syncBody , path ) . Exists ( )
} )
for _ , tc := range tests {
t . Run ( tc . name , func ( t * testing . T ) {
params := map [ string ] string {
"access_token" : alice . AccessToken ,
}
w := httptest . NewRecorder ( )
// test overrides
roomID := room . ID
if tc . roomID != "" {
roomID = tc . roomID
}
eventID := thirdMsg . EventID ( )
if tc . eventID != "" {
eventID = tc . eventID
}
requestPath := fmt . Sprintf ( "/_matrix/client/v3/rooms/%s/context/%s" , roomID , eventID )
if tc . params != nil {
for k , v := range tc . params {
params [ k ] = v
}
}
2023-03-22 09:21:32 +01:00
routers . Client . ServeHTTP ( w , test . NewRequest ( t , "GET" , requestPath , test . WithQueryParams ( params ) ) )
2023-02-07 14:31:23 +01:00
if tc . wantError && w . Code == 200 {
t . Fatalf ( "Expected an error, but got none" )
}
t . Log ( w . Body . String ( ) )
resp := routing . ContextRespsonse { }
if err := json . Unmarshal ( w . Body . Bytes ( ) , & resp ) ; err != nil {
t . Fatal ( err )
}
if tc . wantStateLength > 0 && tc . wantStateLength != len ( resp . State ) {
t . Fatalf ( "expected %d state events, got %d" , tc . wantStateLength , len ( resp . State ) )
}
if tc . wantBeforeLength > 0 && tc . wantBeforeLength != len ( resp . EventsBefore ) {
t . Fatalf ( "expected %d before events, got %d" , tc . wantBeforeLength , len ( resp . EventsBefore ) )
}
if tc . wantAfterLength > 0 && tc . wantAfterLength != len ( resp . EventsAfter ) {
t . Fatalf ( "expected %d after events, got %d" , tc . wantAfterLength , len ( resp . EventsAfter ) )
}
if ! tc . wantError && resp . Event . EventID != eventID {
t . Fatalf ( "unexpected eventID %s, expected %s" , resp . Event . EventID , eventID )
}
} )
}
}
2023-03-06 12:43:59 +01:00
func TestUpdateRelations ( t * testing . T ) {
testCases := [ ] struct {
name string
eventContent map [ string ] interface { }
eventType string
} {
{
name : "empty event content should not error" ,
} ,
{
name : "unable to unmarshal event should not error" ,
eventContent : map [ string ] interface { } {
"m.relates_to" : map [ string ] interface { } {
"event_id" : map [ string ] interface { } { } , // this should be a string and not struct
} ,
} ,
} ,
{
name : "empty event ID is ignored" ,
eventContent : map [ string ] interface { } {
"m.relates_to" : map [ string ] interface { } {
"event_id" : "" ,
} ,
} ,
} ,
{
name : "empty rel_type is ignored" ,
eventContent : map [ string ] interface { } {
"m.relates_to" : map [ string ] interface { } {
"event_id" : "$randomEventID" ,
"rel_type" : "" ,
} ,
} ,
} ,
{
name : "redactions are ignored" ,
2023-04-19 16:50:33 +02:00
eventType : spec . MRoomRedaction ,
2023-03-06 12:43:59 +01:00
eventContent : map [ string ] interface { } {
"m.relates_to" : map [ string ] interface { } {
"event_id" : "$randomEventID" ,
"rel_type" : "m.replace" ,
} ,
} ,
} ,
{
name : "valid event is correctly written" ,
eventContent : map [ string ] interface { } {
"m.relates_to" : map [ string ] interface { } {
"event_id" : "$randomEventID" ,
"rel_type" : "m.replace" ,
} ,
} ,
} ,
}
ctx := context . Background ( )
alice := test . NewUser ( t )
room := test . NewRoom ( t , alice )
test . WithAllDatabases ( t , func ( t * testing . T , dbType test . DBType ) {
2023-03-22 09:21:32 +01:00
cfg , processCtx , close := testrig . CreateConfig ( t , dbType )
cm := sqlutil . NewConnectionManager ( processCtx , cfg . Global . DatabaseOptions )
t . Cleanup ( close )
db , err := storage . NewSyncServerDatasource ( processCtx . Context ( ) , cm , & cfg . SyncAPI . Database )
2023-03-06 12:43:59 +01:00
if err != nil {
t . Fatal ( err )
}
for _ , tc := range testCases {
t . Run ( tc . name , func ( t * testing . T ) {
evType := "m.room.message"
if tc . eventType != "" {
evType = tc . eventType
}
ev := room . CreateEvent ( t , alice , evType , tc . eventContent )
err = db . UpdateRelations ( ctx , ev )
if err != nil {
t . Fatal ( err )
}
} )
}
} )
}
2024-07-27 22:30:17 +02:00
func TestRemoveEditedEventFromSearchIndex ( t * testing . T ) {
user := test . NewUser ( t )
alice := userapi . Device {
ID : "ALICEID" ,
UserID : user . ID ,
AccessToken : "ALICE_BEARER_TOKEN" ,
DisplayName : "Alice" ,
AccountType : userapi . AccountTypeUser ,
}
routers := httputil . NewRouters ( )
cfg , processCtx , close := testrig . CreateConfig ( t , test . DBTypeSQLite )
cm := sqlutil . NewConnectionManager ( processCtx , cfg . Global . DatabaseOptions )
caches := caching . NewRistrettoCache ( 128 * 1024 * 1024 , time . Hour , caching . DisableMetrics )
defer close ( )
// Use an actual roomserver for this
natsInstance := jetstream . NATSInstance { }
jsctx , _ := natsInstance . Prepare ( processCtx , & cfg . Global . JetStream )
defer jetstream . DeleteAllStreams ( jsctx , & cfg . Global . JetStream )
rsAPI := roomserver . NewInternalAPI ( processCtx , cfg , cm , & natsInstance , caches , caching . DisableMetrics )
rsAPI . SetFederationAPI ( nil , nil )
room := test . NewRoom ( t , user )
AddPublicRoutes ( processCtx , routers , cfg , cm , & natsInstance , & syncUserAPI { accounts : [ ] userapi . Device { alice } } , & syncRoomserverAPI { rooms : [ ] * test . Room { room } } , caches , caching . DisableMetrics )
if err := api . SendEvents ( processCtx . Context ( ) , rsAPI , api . KindNew , room . Events ( ) , "test" , "test" , "test" , nil , false ) ; err != nil {
t . Fatalf ( "failed to send events: %v" , err )
}
ev1 := room . CreateAndInsert ( t , user , "m.room.message" , map [ string ] interface { } { "body" : "first" } )
ev2 := room . CreateAndInsert ( t , user , "m.room.message" , map [ string ] interface { } {
"body" : " * first" ,
"m.new_content" : map [ string ] interface { } {
"body" : "first" ,
"msgtype" : "m.text" ,
} ,
"m.relates_to" : map [ string ] interface { } {
"event_id" : ev1 . EventID ( ) ,
"rel_type" : "m.replace" ,
} ,
} )
events := [ ] * rstypes . HeaderedEvent { ev1 , ev2 }
for _ , e := range events {
roomEvents := append ( [ ] * rstypes . HeaderedEvent { } , e )
if err := api . SendEvents ( processCtx . Context ( ) , rsAPI , api . KindNew , roomEvents , "test" , "test" , "test" , nil , false ) ; err != nil {
t . Fatalf ( "failed to send events: %v" , err )
}
syncUntil ( t , routers , alice . AccessToken , false , func ( syncBody string ) bool {
// wait for the last sent eventID to come down sync
path := fmt . Sprintf ( ` rooms.join.%s.timeline.events.#(event_id=="%s") ` , room . ID , e . EventID ( ) )
return gjson . Get ( syncBody , path ) . Exists ( )
} )
// We search that event is the only one nad is the exact event we sent
searchResult := searchRequest ( t , routers . Client , alice . AccessToken , "first" , [ ] string { room . ID } )
results := gjson . GetBytes ( searchResult , fmt . Sprintf ( ` search_categories.room_events.groups.room_id.%s.results ` , room . ID ) )
assert . True ( t , results . Exists ( ) , "Should be a search response" )
assert . Equal ( t , 1 , len ( results . Array ( ) ) , "Should be exactly one result" )
assert . Equal ( t , e . EventID ( ) , results . Array ( ) [ 0 ] . String ( ) , "Should be only found exact event" )
}
}
func searchRequest ( t * testing . T , router * mux . Router , accessToken , searchTerm string , roomList [ ] string ) [ ] byte {
t . Helper ( )
w := httptest . NewRecorder ( )
rq := test . NewRequest ( t , "POST" , "/_matrix/client/v3/search" , test . WithQueryParams ( map [ string ] string {
"access_token" : accessToken ,
} ) , test . WithJSONBody ( t , map [ string ] interface { } {
"search_categories" : map [ string ] interface { } {
"room_events" : map [ string ] interface { } {
"filters" : roomList ,
"search_term" : searchTerm ,
} ,
} ,
} ) )
router . ServeHTTP ( w , rq )
assert . Equal ( t , 200 , w . Code )
defer w . Result ( ) . Body . Close ( )
body , err := io . ReadAll ( w . Result ( ) . Body )
assert . NoError ( t , err )
return body
}
2022-08-19 11:03:55 +02:00
func syncUntil ( t * testing . T ,
2023-03-22 09:21:32 +01:00
routers httputil . Routers , accessToken string ,
2022-08-19 11:03:55 +02:00
skip bool ,
checkFunc func ( syncBody string ) bool ,
) {
2023-03-22 09:21:32 +01:00
t . Helper ( )
2022-08-19 11:03:55 +02:00
if checkFunc == nil {
t . Fatalf ( "No checkFunc defined" )
}
if skip {
return
}
// loop on /sync until we receive the last send message or timeout after 5 seconds, since we don't know if the message made it
// to the syncAPI when hitting /sync
done := make ( chan bool )
defer close ( done )
go func ( ) {
for {
w := httptest . NewRecorder ( )
2023-03-22 09:21:32 +01:00
routers . Client . ServeHTTP ( w , test . NewRequest ( t , "GET" , "/_matrix/client/v3/sync" , test . WithQueryParams ( map [ string ] string {
2022-08-19 11:03:55 +02:00
"access_token" : accessToken ,
"timeout" : "1000" ,
} ) ) )
if checkFunc ( w . Body . String ( ) ) {
done <- true
return
}
}
} ( )
select {
case <- done :
case <- time . After ( time . Second * 5 ) :
t . Fatalf ( "Timed out waiting for messages" )
}
}
2023-04-27 13:54:20 +02:00
func toNATSMsgs ( t * testing . T , cfg * config . Dendrite , input ... * rstypes . HeaderedEvent ) [ ] * nats . Msg {
2022-05-11 14:44:32 +02:00
result := make ( [ ] * nats . Msg , len ( input ) )
for i , ev := range input {
var addsStateIDs [ ] string
if ev . StateKey ( ) != nil {
addsStateIDs = append ( addsStateIDs , ev . EventID ( ) )
}
2023-09-15 16:39:06 +02:00
result [ i ] = testrig . NewOutputEventMsg ( t , cfg , ev . RoomID ( ) . String ( ) , api . OutputEvent {
2022-05-11 14:44:32 +02:00
Type : rsapi . OutputTypeNewRoomEvent ,
NewRoomEvent : & rsapi . OutputNewRoomEvent {
Event : ev ,
AddsStateEventIDs : addsStateIDs ,
2022-08-11 18:23:35 +02:00
HistoryVisibility : ev . Visibility ,
2022-05-11 14:44:32 +02:00
} ,
} )
}
return result
}