mirror of
https://github.com/matrix-org/dendrite
synced 2024-12-14 00:33:53 +01:00
Fix data races reported by go test -race ./... (#748)
This commit is contained in:
parent
45d24d3fb5
commit
e66933b108
2 changed files with 16 additions and 6 deletions
|
@ -185,6 +185,7 @@ func (n *Notifier) wakeupUsers(userIDs []string, newPos types.SyncPosition) {
|
||||||
// fetchUserStream retrieves a stream unique to the given user. If makeIfNotExists is true,
|
// fetchUserStream retrieves a stream unique to the given user. If makeIfNotExists is true,
|
||||||
// a stream will be made for this user if one doesn't exist and it will be returned. This
|
// a stream will be made for this user if one doesn't exist and it will be returned. This
|
||||||
// function does not wait for data to be available on the stream.
|
// function does not wait for data to be available on the stream.
|
||||||
|
// NB: Callers should have locked the mutex before calling this function.
|
||||||
func (n *Notifier) fetchUserStream(userID string, makeIfNotExists bool) *UserStream {
|
func (n *Notifier) fetchUserStream(userID string, makeIfNotExists bool) *UserStream {
|
||||||
stream, ok := n.userStreams[userID]
|
stream, ok := n.userStreams[userID]
|
||||||
if !ok && makeIfNotExists {
|
if !ok && makeIfNotExists {
|
||||||
|
|
|
@ -143,7 +143,7 @@ func TestNewEventAndJoinedToRoom(t *testing.T) {
|
||||||
wg.Done()
|
wg.Done()
|
||||||
}()
|
}()
|
||||||
|
|
||||||
stream := n.fetchUserStream(bob, true)
|
stream := lockedFetchUserStream(n, bob)
|
||||||
waitForBlocking(stream, 1)
|
waitForBlocking(stream, 1)
|
||||||
|
|
||||||
n.OnNewEvent(&randomMessageEvent, "", nil, syncPositionAfter)
|
n.OnNewEvent(&randomMessageEvent, "", nil, syncPositionAfter)
|
||||||
|
@ -171,7 +171,7 @@ func TestNewInviteEventForUser(t *testing.T) {
|
||||||
wg.Done()
|
wg.Done()
|
||||||
}()
|
}()
|
||||||
|
|
||||||
stream := n.fetchUserStream(bob, true)
|
stream := lockedFetchUserStream(n, bob)
|
||||||
waitForBlocking(stream, 1)
|
waitForBlocking(stream, 1)
|
||||||
|
|
||||||
n.OnNewEvent(&aliceInviteBobEvent, "", nil, syncPositionAfter)
|
n.OnNewEvent(&aliceInviteBobEvent, "", nil, syncPositionAfter)
|
||||||
|
@ -199,7 +199,7 @@ func TestEDUWakeup(t *testing.T) {
|
||||||
wg.Done()
|
wg.Done()
|
||||||
}()
|
}()
|
||||||
|
|
||||||
stream := n.fetchUserStream(bob, true)
|
stream := lockedFetchUserStream(n, bob)
|
||||||
waitForBlocking(stream, 1)
|
waitForBlocking(stream, 1)
|
||||||
|
|
||||||
n.OnNewEvent(&aliceInviteBobEvent, "", nil, syncPositionNewEDU)
|
n.OnNewEvent(&aliceInviteBobEvent, "", nil, syncPositionNewEDU)
|
||||||
|
@ -230,7 +230,7 @@ func TestMultipleRequestWakeup(t *testing.T) {
|
||||||
go poll()
|
go poll()
|
||||||
go poll()
|
go poll()
|
||||||
|
|
||||||
stream := n.fetchUserStream(bob, true)
|
stream := lockedFetchUserStream(n, bob)
|
||||||
waitForBlocking(stream, 3)
|
waitForBlocking(stream, 3)
|
||||||
|
|
||||||
n.OnNewEvent(&randomMessageEvent, "", nil, syncPositionAfter)
|
n.OnNewEvent(&randomMessageEvent, "", nil, syncPositionAfter)
|
||||||
|
@ -266,14 +266,14 @@ func TestNewEventAndWasPreviouslyJoinedToRoom(t *testing.T) {
|
||||||
}
|
}
|
||||||
leaveWG.Done()
|
leaveWG.Done()
|
||||||
}()
|
}()
|
||||||
bobStream := n.fetchUserStream(bob, true)
|
bobStream := lockedFetchUserStream(n, bob)
|
||||||
waitForBlocking(bobStream, 1)
|
waitForBlocking(bobStream, 1)
|
||||||
n.OnNewEvent(&bobLeaveEvent, "", nil, syncPositionAfter)
|
n.OnNewEvent(&bobLeaveEvent, "", nil, syncPositionAfter)
|
||||||
leaveWG.Wait()
|
leaveWG.Wait()
|
||||||
|
|
||||||
// send an event into the room. Make sure alice gets it. Bob should not.
|
// send an event into the room. Make sure alice gets it. Bob should not.
|
||||||
var aliceWG sync.WaitGroup
|
var aliceWG sync.WaitGroup
|
||||||
aliceStream := n.fetchUserStream(alice, true)
|
aliceStream := lockedFetchUserStream(n, alice)
|
||||||
aliceWG.Add(1)
|
aliceWG.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
pos, err := waitForEvents(n, newTestSyncRequest(alice, syncPositionAfter))
|
pos, err := waitForEvents(n, newTestSyncRequest(alice, syncPositionAfter))
|
||||||
|
@ -328,6 +328,15 @@ func waitForBlocking(s *UserStream, numBlocking uint) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// lockedFetchUserStream invokes Notifier.fetchUserStream, respecting Notifier.streamLock.
|
||||||
|
// A new stream is made if it doesn't exist already.
|
||||||
|
func lockedFetchUserStream(n *Notifier, userID string) *UserStream {
|
||||||
|
n.streamLock.Lock()
|
||||||
|
defer n.streamLock.Unlock()
|
||||||
|
|
||||||
|
return n.fetchUserStream(userID, true)
|
||||||
|
}
|
||||||
|
|
||||||
func newTestSyncRequest(userID string, since types.SyncPosition) syncRequest {
|
func newTestSyncRequest(userID string, since types.SyncPosition) syncRequest {
|
||||||
return syncRequest{
|
return syncRequest{
|
||||||
device: authtypes.Device{UserID: userID},
|
device: authtypes.Device{UserID: userID},
|
||||||
|
|
Loading…
Reference in a new issue