0
0
Fork 0
mirror of https://github.com/matrix-org/dendrite synced 2024-11-15 22:31:07 +01:00

Remove notifs about key changes in syncapi (#1496)

The join/leave events themselves will wake up the right people so we
needn't do it twice.
This commit is contained in:
Kegsay 2020-10-08 10:27:10 +01:00 committed by GitHub
parent 8b880be57e
commit 3e12f6e9c2
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 1 additions and 49 deletions

View file

@ -125,31 +125,3 @@ func (s *OutputKeyChangeEventConsumer) onMessage(msg *sarama.ConsumerMessage) er
} }
return nil return nil
} }
func (s *OutputKeyChangeEventConsumer) OnJoinEvent(ev *gomatrixserverlib.HeaderedEvent) {
// work out who we are now sharing rooms with which we previously were not and notify them about the joining
// users keys:
changed, _, err := syncinternal.TrackChangedUsers(context.Background(), s.rsAPI, *ev.StateKey(), []string{ev.RoomID()}, nil)
if err != nil {
log.WithError(err).Error("OnJoinEvent: failed to work out changed users")
return
}
// TODO: f.e changed, wake up stream
for _, userID := range changed {
log.Infof("OnJoinEvent:Notify %s that %s should have device lists tracked", userID, *ev.StateKey())
}
}
func (s *OutputKeyChangeEventConsumer) OnLeaveEvent(ev *gomatrixserverlib.HeaderedEvent) {
// work out who we are no longer sharing any rooms with and notify them about the leaving user
_, left, err := syncinternal.TrackChangedUsers(context.Background(), s.rsAPI, *ev.StateKey(), nil, []string{ev.RoomID()})
if err != nil {
log.WithError(err).Error("OnLeaveEvent: failed to work out left users")
return
}
// TODO: f.e left, wake up stream
for _, userID := range left {
log.Infof("OnLeaveEvent:Notify %s that %s should no longer track device lists", userID, *ev.StateKey())
}
}

View file

@ -38,7 +38,6 @@ type OutputRoomEventConsumer struct {
rsConsumer *internal.ContinualConsumer rsConsumer *internal.ContinualConsumer
db storage.Database db storage.Database
notifier *sync.Notifier notifier *sync.Notifier
keyChanges *OutputKeyChangeEventConsumer
} }
// NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call Start() to begin consuming from room servers. // NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call Start() to begin consuming from room servers.
@ -48,7 +47,6 @@ func NewOutputRoomEventConsumer(
n *sync.Notifier, n *sync.Notifier,
store storage.Database, store storage.Database,
rsAPI api.RoomserverInternalAPI, rsAPI api.RoomserverInternalAPI,
keyChanges *OutputKeyChangeEventConsumer,
) *OutputRoomEventConsumer { ) *OutputRoomEventConsumer {
consumer := internal.ContinualConsumer{ consumer := internal.ContinualConsumer{
@ -63,7 +61,6 @@ func NewOutputRoomEventConsumer(
db: store, db: store,
notifier: n, notifier: n,
rsAPI: rsAPI, rsAPI: rsAPI,
keyChanges: keyChanges,
} }
consumer.ProcessMessage = s.onMessage consumer.ProcessMessage = s.onMessage
@ -182,26 +179,9 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent(
s.notifier.OnNewEvent(&ev, "", nil, types.NewStreamToken(pduPos, 0, nil)) s.notifier.OnNewEvent(&ev, "", nil, types.NewStreamToken(pduPos, 0, nil))
s.notifyKeyChanges(&ev)
return nil return nil
} }
func (s *OutputRoomEventConsumer) notifyKeyChanges(ev *gomatrixserverlib.HeaderedEvent) {
membership, err := ev.Membership()
if err != nil {
return
}
switch membership {
case gomatrixserverlib.Join:
s.keyChanges.OnJoinEvent(ev)
case gomatrixserverlib.Ban:
fallthrough
case gomatrixserverlib.Leave:
s.keyChanges.OnLeaveEvent(ev)
}
}
func (s *OutputRoomEventConsumer) notifyJoinedPeeks(ctx context.Context, ev *gomatrixserverlib.HeaderedEvent, sp types.StreamPosition) (types.StreamPosition, error) { func (s *OutputRoomEventConsumer) notifyJoinedPeeks(ctx context.Context, ev *gomatrixserverlib.HeaderedEvent, sp types.StreamPosition) (types.StreamPosition, error) {
if ev.Type() != gomatrixserverlib.MRoomMember { if ev.Type() != gomatrixserverlib.MRoomMember {
return sp, nil return sp, nil

View file

@ -71,7 +71,7 @@ func AddPublicRoutes(
} }
roomConsumer := consumers.NewOutputRoomEventConsumer( roomConsumer := consumers.NewOutputRoomEventConsumer(
cfg, consumer, notifier, syncDB, rsAPI, keyChangeConsumer, cfg, consumer, notifier, syncDB, rsAPI,
) )
if err = roomConsumer.Start(); err != nil { if err = roomConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start room server consumer") logrus.WithError(err).Panicf("failed to start room server consumer")