From 0835107f5b14ec408810da4f73da94200e19557f Mon Sep 17 00:00:00 2001 From: Kegsay Date: Wed, 12 Aug 2020 11:53:06 +0100 Subject: [PATCH] Deflake currentstateserver integration tests (#1263) --- currentstateserver/currentstateserver_test.go | 47 +++++++++++++------ 1 file changed, 33 insertions(+), 14 deletions(-) diff --git a/currentstateserver/currentstateserver_test.go b/currentstateserver/currentstateserver_test.go index bb5b40ff3..09b91c276 100644 --- a/currentstateserver/currentstateserver_test.go +++ b/currentstateserver/currentstateserver_test.go @@ -29,7 +29,9 @@ import ( "github.com/Shopify/sarama" "github.com/gorilla/mux" "github.com/matrix-org/dendrite/currentstateserver/api" + "github.com/matrix-org/dendrite/currentstateserver/internal" "github.com/matrix-org/dendrite/currentstateserver/inthttp" + "github.com/matrix-org/dendrite/currentstateserver/storage" "github.com/matrix-org/dendrite/internal/config" "github.com/matrix-org/dendrite/internal/httputil" "github.com/matrix-org/dendrite/internal/sqlutil" @@ -76,7 +78,24 @@ func init() { } } -func MustWriteOutputEvent(t *testing.T, producer sarama.SyncProducer, out *roomserverAPI.OutputNewRoomEvent) error { +func waitForOffsetProcessed(t *testing.T, db storage.Database, offset int64) { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + for { + poffsets, err := db.PartitionOffsets(ctx, kafkaTopic) + if err != nil { + t.Fatalf("failed to PartitionOffsets: %s", err) + } + for _, partition := range poffsets { + if partition.Offset >= offset { + return + } + } + time.Sleep(50 * time.Millisecond) + } +} + +func MustWriteOutputEvent(t *testing.T, producer sarama.SyncProducer, out *roomserverAPI.OutputNewRoomEvent) int64 { value, err := json.Marshal(roomserverAPI.OutputEvent{ Type: roomserverAPI.OutputTypeNewRoomEvent, NewRoomEvent: out, @@ -84,7 +103,7 @@ func MustWriteOutputEvent(t *testing.T, producer sarama.SyncProducer, out *rooms if err != nil { t.Fatalf("failed to marshal output event: %s", err) } - _, _, err = producer.SendMessage(&sarama.ProducerMessage{ + _, offset, err := producer.SendMessage(&sarama.ProducerMessage{ Topic: kafkaTopic, Key: sarama.StringEncoder(out.Event.RoomID()), Value: sarama.ByteEncoder(value), @@ -92,10 +111,10 @@ func MustWriteOutputEvent(t *testing.T, producer sarama.SyncProducer, out *rooms if err != nil { t.Fatalf("failed to send message: %s", err) } - return nil + return offset } -func MustMakeInternalAPI(t *testing.T) (api.CurrentStateInternalAPI, sarama.SyncProducer, func()) { +func MustMakeInternalAPI(t *testing.T) (api.CurrentStateInternalAPI, storage.Database, sarama.SyncProducer, func()) { cfg := &config.Dendrite{} cfg.Defaults() stateDBName := "test_state.db" @@ -117,26 +136,28 @@ func MustMakeInternalAPI(t *testing.T) (api.CurrentStateInternalAPI, sarama.Sync if err != nil { t.Fatalf("Failed to create naffka consumer: %s", err) } - return NewInternalAPI(&cfg.CurrentStateServer, naff), naff, func() { + stateAPI := NewInternalAPI(&cfg.CurrentStateServer, naff) + // type-cast to pull out the DB + stateAPIVal := stateAPI.(*internal.CurrentStateInternalAPI) + return stateAPI, stateAPIVal.DB, naff, func() { os.Remove(naffkaDBName) os.Remove(stateDBName) } } func TestQueryCurrentState(t *testing.T) { - currStateAPI, producer, cancel := MustMakeInternalAPI(t) + currStateAPI, db, producer, cancel := MustMakeInternalAPI(t) defer cancel() plTuple := gomatrixserverlib.StateKeyTuple{ EventType: "m.room.power_levels", StateKey: "", } plEvent := testEvents[4] - MustWriteOutputEvent(t, producer, &roomserverAPI.OutputNewRoomEvent{ + offset := MustWriteOutputEvent(t, producer, &roomserverAPI.OutputNewRoomEvent{ Event: plEvent, AddsStateEventIDs: []string{plEvent.EventID()}, }) - // we have no good way to know /when/ the server has consumed the event - time.Sleep(100 * time.Millisecond) + waitForOffsetProcessed(t, db, offset) testCases := []struct { req api.QueryCurrentStateRequest @@ -228,7 +249,7 @@ func mustMakeMembershipEvent(t *testing.T, roomID, userID, membership string) *r // This test makes sure that QuerySharedUsers is returning the correct users for a range of sets. func TestQuerySharedUsers(t *testing.T) { - currStateAPI, producer, cancel := MustMakeInternalAPI(t) + currStateAPI, db, producer, cancel := MustMakeInternalAPI(t) defer cancel() MustWriteOutputEvent(t, producer, mustMakeMembershipEvent(t, "!foo:bar", "@alice:localhost", "join")) MustWriteOutputEvent(t, producer, mustMakeMembershipEvent(t, "!foo:bar", "@bob:localhost", "join")) @@ -240,10 +261,8 @@ func TestQuerySharedUsers(t *testing.T) { MustWriteOutputEvent(t, producer, mustMakeMembershipEvent(t, "!foo3:bar", "@bob:localhost", "join")) MustWriteOutputEvent(t, producer, mustMakeMembershipEvent(t, "!foo3:bar", "@dave:localhost", "leave")) - MustWriteOutputEvent(t, producer, mustMakeMembershipEvent(t, "!foo4:bar", "@alice:localhost", "join")) - - // we don't know when the server has processed the events - time.Sleep(10 * time.Millisecond) + offset := MustWriteOutputEvent(t, producer, mustMakeMembershipEvent(t, "!foo4:bar", "@alice:localhost", "join")) + waitForOffsetProcessed(t, db, offset) testCases := []struct { req api.QuerySharedUsersRequest