0
0
Fork 0
mirror of https://github.com/matrix-org/dendrite synced 2024-10-01 07:19:06 +02:00

Tweak typing

This commit is contained in:
Neil Alexander 2020-12-14 16:55:55 +00:00
parent 1bc1347ce4
commit e8d485d881
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
3 changed files with 20 additions and 9 deletions

View file

@ -88,7 +88,7 @@ func (s *OutputReceiptEventConsumer) onMessage(msg *sarama.ConsumerMessage) erro
return err return err
} }
// update stream position // update stream position
s.notifier.OnNewReceipt(types.StreamingToken{ReceiptPosition: streamPos}) s.notifier.OnNewReceipt(output.RoomID, types.StreamingToken{ReceiptPosition: streamPos})
return nil return nil
} }

View file

@ -64,12 +64,7 @@ func NewOutputTypingEventConsumer(
// Start consuming from EDU api // Start consuming from EDU api
func (s *OutputTypingEventConsumer) Start() error { func (s *OutputTypingEventConsumer) Start() error {
s.db.SetTypingTimeoutCallback(func(userID, roomID string, latestSyncPosition int64) { s.db.SetTypingTimeoutCallback(func(userID, roomID string, latestSyncPosition int64) {
s.notifier.OnNewEvent( s.notifier.OnNewTyping(roomID, types.StreamingToken{TypingPosition: types.StreamPosition(latestSyncPosition)})
nil, roomID, nil,
types.StreamingToken{
TypingPosition: types.StreamPosition(latestSyncPosition),
},
)
}) })
return s.typingConsumer.Start() return s.typingConsumer.Start()
@ -97,6 +92,6 @@ func (s *OutputTypingEventConsumer) onMessage(msg *sarama.ConsumerMessage) error
typingPos = s.db.RemoveTypingUser(typingEvent.UserID, typingEvent.RoomID) typingPos = s.db.RemoveTypingUser(typingEvent.UserID, typingEvent.RoomID)
} }
s.notifier.OnNewEvent(nil, output.Event.RoomID, nil, types.StreamingToken{TypingPosition: typingPos}) s.notifier.OnNewTyping(output.Event.RoomID, types.StreamingToken{TypingPosition: typingPos})
return nil return nil
} }

View file

@ -162,13 +162,29 @@ func (n *Notifier) OnNewSendToDevice(
} }
// OnNewReceipt updates the current position // OnNewReceipt updates the current position
func (n *Notifier) OnNewReceipt( func (n *Notifier) OnNewTyping(
roomID string,
posUpdate types.StreamingToken, posUpdate types.StreamingToken,
) { ) {
n.streamLock.Lock() n.streamLock.Lock()
defer n.streamLock.Unlock() defer n.streamLock.Unlock()
latestPos := n.currPos.WithUpdates(posUpdate) latestPos := n.currPos.WithUpdates(posUpdate)
n.currPos = latestPos n.currPos = latestPos
n.wakeupUsers(n.joinedUsers(roomID), nil, latestPos)
}
// OnNewReceipt updates the current position
func (n *Notifier) OnNewReceipt(
roomID string,
posUpdate types.StreamingToken,
) {
n.streamLock.Lock()
defer n.streamLock.Unlock()
latestPos := n.currPos.WithUpdates(posUpdate)
n.currPos = latestPos
n.wakeupUsers(n.joinedUsers(roomID), nil, latestPos)
} }
func (n *Notifier) OnNewKeyChange( func (n *Notifier) OnNewKeyChange(