Send-to-device support (#1072)
* Groundwork for send-to-device messaging
* Update sample config
* Add unstable routing for now
* Send to device consumer in sync API
* Start the send-to-device consumer
* fix indentation in dendrite-config.yaml
* Create send-to-device database tables, other tweaks
* Add some logic for send-to-device messages, add them into sync stream
* Handle incoming send-to-device messages, count them with EDU stream pos
* Undo changes to test
* pq.Array
* Fix sync
* Logging
* Fix a couple of transaction things, fix client API
* Add send-to-device test, hopefully fix bugs
* Comments
* Refactor a bit
* Fix schema
* Fix queries
* Debug logging
* Fix storing and retrieving of send-to-device messages
* Try to avoid database locks
* Update sync position
* Use latest sync position
* Jiggle about sync a bit
* Fix tests
* Break out the retrieval from the update/delete behaviour
* Comments
* nolint on getResponseWithPDUsForCompleteSync
* Try to line up sync tokens again
* Implement wildcard
* Add all send-to-device tests to whitelist, what could possibly go wrong?
* Only care about wildcard when targeted locally
* Deduplicate transactions
* Handle tokens properly, return immediately if waiting send-to-device messages
* Fix sync
* Update sytest-whitelist
* Fix copyright notice (need to do more of this)
* Comments, copyrights
* Return errors from Do, fix dendritejs
* Review comments
* Comments
* Constructor for TransactionWriter
* defletions
* Update gomatrixserverlib, sytest-blacklist
2020-06-01 18:50:19 +02:00
|
|
|
// Copyright 2017 Vector Creations Ltd
|
|
|
|
// Copyright 2017-2018 New Vector Ltd
|
|
|
|
// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
|
|
|
|
//
|
2018-07-31 12:52:57 +02:00
|
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
|
// you may not use this file except in compliance with the License.
|
|
|
|
// You may obtain a copy of the License at
|
|
|
|
//
|
|
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
//
|
|
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
// See the License for the specific language governing permissions and
|
|
|
|
// limitations under the License.
|
|
|
|
|
|
|
|
package cache
|
|
|
|
|
|
|
|
import (
|
|
|
|
"sync"
|
|
|
|
"time"
|
|
|
|
)
|
|
|
|
|
|
|
|
const defaultTypingTimeout = 10 * time.Second
|
|
|
|
|
|
|
|
// userSet is a map of user IDs to a timer, timer fires at expiry.
|
|
|
|
type userSet map[string]*time.Timer
|
|
|
|
|
2019-07-12 16:59:53 +02:00
|
|
|
// TimeoutCallbackFn is a function called right after the removal of a user
|
|
|
|
// from the typing user list due to timeout.
|
|
|
|
// latestSyncPosition is the typing sync position after the removal.
|
|
|
|
type TimeoutCallbackFn func(userID, roomID string, latestSyncPosition int64)
|
|
|
|
|
|
|
|
type roomData struct {
|
|
|
|
syncPosition int64
|
|
|
|
userSet userSet
|
|
|
|
}
|
|
|
|
|
2020-03-30 16:02:20 +02:00
|
|
|
// EDUCache maintains a list of users typing in each room.
|
|
|
|
type EDUCache struct {
|
2018-07-31 12:52:57 +02:00
|
|
|
sync.RWMutex
|
2019-07-12 16:59:53 +02:00
|
|
|
latestSyncPosition int64
|
|
|
|
data map[string]*roomData
|
|
|
|
timeoutCallback TimeoutCallbackFn
|
|
|
|
}
|
|
|
|
|
|
|
|
// Create a roomData with its sync position set to the latest sync position.
|
|
|
|
// Must only be called after locking the cache.
|
2020-03-30 16:02:20 +02:00
|
|
|
func (t *EDUCache) newRoomData() *roomData {
|
2019-07-12 16:59:53 +02:00
|
|
|
return &roomData{
|
|
|
|
syncPosition: t.latestSyncPosition,
|
|
|
|
userSet: make(userSet),
|
|
|
|
}
|
2018-07-31 12:52:57 +02:00
|
|
|
}
|
|
|
|
|
2020-03-30 16:02:20 +02:00
|
|
|
// New returns a new EDUCache initialised for use.
|
|
|
|
func New() *EDUCache {
|
|
|
|
return &EDUCache{data: make(map[string]*roomData)}
|
2019-07-12 16:59:53 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
// SetTimeoutCallback sets a callback function that is called right after
|
|
|
|
// a user is removed from the typing user list due to timeout.
|
2020-03-30 16:02:20 +02:00
|
|
|
func (t *EDUCache) SetTimeoutCallback(fn TimeoutCallbackFn) {
|
2019-07-12 16:59:53 +02:00
|
|
|
t.timeoutCallback = fn
|
2018-07-31 12:52:57 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
// GetTypingUsers returns the list of users typing in a room.
|
2020-03-30 16:02:20 +02:00
|
|
|
func (t *EDUCache) GetTypingUsers(roomID string) []string {
|
2019-07-12 16:59:53 +02:00
|
|
|
users, _ := t.GetTypingUsersIfUpdatedAfter(roomID, 0)
|
|
|
|
// 0 should work above because the first position used will be 1.
|
|
|
|
return users
|
|
|
|
}
|
|
|
|
|
|
|
|
// GetTypingUsersIfUpdatedAfter returns all users typing in this room with
|
|
|
|
// updated == true if the typing sync position of the room is after the given
|
|
|
|
// position. Otherwise, returns an empty slice with updated == false.
|
2020-03-30 16:02:20 +02:00
|
|
|
func (t *EDUCache) GetTypingUsersIfUpdatedAfter(
|
2019-07-12 16:59:53 +02:00
|
|
|
roomID string, position int64,
|
|
|
|
) (users []string, updated bool) {
|
2018-07-31 12:52:57 +02:00
|
|
|
t.RLock()
|
2019-07-12 16:59:53 +02:00
|
|
|
defer t.RUnlock()
|
|
|
|
|
|
|
|
roomData, ok := t.data[roomID]
|
|
|
|
if ok && roomData.syncPosition > position {
|
|
|
|
updated = true
|
|
|
|
userSet := roomData.userSet
|
|
|
|
users = make([]string, 0, len(userSet))
|
|
|
|
for userID := range userSet {
|
2018-07-31 12:52:57 +02:00
|
|
|
users = append(users, userID)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
// AddTypingUser sets an user as typing in a room.
|
|
|
|
// expire is the time when the user typing should time out.
|
|
|
|
// if expire is nil, defaultTypingTimeout is assumed.
|
2019-07-12 16:59:53 +02:00
|
|
|
// Returns the latest sync position for typing after update.
|
2020-03-30 16:02:20 +02:00
|
|
|
func (t *EDUCache) AddTypingUser(
|
2019-07-12 16:59:53 +02:00
|
|
|
userID, roomID string, expire *time.Time,
|
|
|
|
) int64 {
|
2018-07-31 12:52:57 +02:00
|
|
|
expireTime := getExpireTime(expire)
|
|
|
|
if until := time.Until(expireTime); until > 0 {
|
2019-07-12 16:59:53 +02:00
|
|
|
timer := time.AfterFunc(until, func() {
|
|
|
|
latestSyncPosition := t.RemoveUser(userID, roomID)
|
|
|
|
if t.timeoutCallback != nil {
|
|
|
|
t.timeoutCallback(userID, roomID, latestSyncPosition)
|
|
|
|
}
|
|
|
|
})
|
|
|
|
return t.addUser(userID, roomID, timer)
|
2018-07-31 12:52:57 +02:00
|
|
|
}
|
2019-07-12 16:59:53 +02:00
|
|
|
return t.GetLatestSyncPosition()
|
2018-07-31 12:52:57 +02:00
|
|
|
}
|
|
|
|
|
Send-to-device support (#1072)
* Groundwork for send-to-device messaging
* Update sample config
* Add unstable routing for now
* Send to device consumer in sync API
* Start the send-to-device consumer
* fix indentation in dendrite-config.yaml
* Create send-to-device database tables, other tweaks
* Add some logic for send-to-device messages, add them into sync stream
* Handle incoming send-to-device messages, count them with EDU stream pos
* Undo changes to test
* pq.Array
* Fix sync
* Logging
* Fix a couple of transaction things, fix client API
* Add send-to-device test, hopefully fix bugs
* Comments
* Refactor a bit
* Fix schema
* Fix queries
* Debug logging
* Fix storing and retrieving of send-to-device messages
* Try to avoid database locks
* Update sync position
* Use latest sync position
* Jiggle about sync a bit
* Fix tests
* Break out the retrieval from the update/delete behaviour
* Comments
* nolint on getResponseWithPDUsForCompleteSync
* Try to line up sync tokens again
* Implement wildcard
* Add all send-to-device tests to whitelist, what could possibly go wrong?
* Only care about wildcard when targeted locally
* Deduplicate transactions
* Handle tokens properly, return immediately if waiting send-to-device messages
* Fix sync
* Update sytest-whitelist
* Fix copyright notice (need to do more of this)
* Comments, copyrights
* Return errors from Do, fix dendritejs
* Review comments
* Comments
* Constructor for TransactionWriter
* defletions
* Update gomatrixserverlib, sytest-blacklist
2020-06-01 18:50:19 +02:00
|
|
|
// AddSendToDeviceMessage increases the sync position for
|
|
|
|
// send-to-device updates.
|
|
|
|
// Returns the sync position before update, as the caller
|
|
|
|
// will use this to record the current stream position
|
|
|
|
// at the time that the send-to-device message was sent.
|
|
|
|
func (t *EDUCache) AddSendToDeviceMessage() int64 {
|
|
|
|
t.Lock()
|
|
|
|
defer t.Unlock()
|
|
|
|
latestSyncPosition := t.latestSyncPosition
|
|
|
|
t.latestSyncPosition++
|
|
|
|
return latestSyncPosition
|
|
|
|
}
|
|
|
|
|
2018-07-31 12:52:57 +02:00
|
|
|
// addUser with mutex lock & replace the previous timer.
|
2019-07-12 16:59:53 +02:00
|
|
|
// Returns the latest typing sync position after update.
|
2020-03-30 16:02:20 +02:00
|
|
|
func (t *EDUCache) addUser(
|
2019-07-12 16:59:53 +02:00
|
|
|
userID, roomID string, expiryTimer *time.Timer,
|
|
|
|
) int64 {
|
2018-07-31 12:52:57 +02:00
|
|
|
t.Lock()
|
|
|
|
defer t.Unlock()
|
|
|
|
|
2019-07-12 16:59:53 +02:00
|
|
|
t.latestSyncPosition++
|
|
|
|
|
2018-07-31 12:52:57 +02:00
|
|
|
if t.data[roomID] == nil {
|
2019-07-12 16:59:53 +02:00
|
|
|
t.data[roomID] = t.newRoomData()
|
|
|
|
} else {
|
|
|
|
t.data[roomID].syncPosition = t.latestSyncPosition
|
2018-07-31 12:52:57 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
// Stop the timer to cancel the call to timeoutCallback
|
2019-07-12 16:59:53 +02:00
|
|
|
if timer, ok := t.data[roomID].userSet[userID]; ok {
|
2019-07-12 17:43:01 +02:00
|
|
|
// It may happen that at this stage the timer fires, but we now have a lock on
|
|
|
|
// it. Hence the execution of timeoutCallback will happen after we unlock. So
|
|
|
|
// we may lose a typing state, though this is highly unlikely. This can be
|
|
|
|
// mitigated by keeping another time.Time in the map and checking against it
|
|
|
|
// before removing, but its occurrence is so infrequent it does not seem
|
|
|
|
// worthwhile.
|
2018-07-31 12:52:57 +02:00
|
|
|
timer.Stop()
|
|
|
|
}
|
|
|
|
|
2019-07-12 16:59:53 +02:00
|
|
|
t.data[roomID].userSet[userID] = expiryTimer
|
2018-07-31 12:52:57 +02:00
|
|
|
|
2019-07-12 16:59:53 +02:00
|
|
|
return t.latestSyncPosition
|
2018-07-31 12:52:57 +02:00
|
|
|
}
|
|
|
|
|
2018-08-02 19:22:44 +02:00
|
|
|
// RemoveUser with mutex lock & stop the timer.
|
2019-07-12 16:59:53 +02:00
|
|
|
// Returns the latest sync position for typing after update.
|
2020-03-30 16:02:20 +02:00
|
|
|
func (t *EDUCache) RemoveUser(userID, roomID string) int64 {
|
2018-07-31 12:52:57 +02:00
|
|
|
t.Lock()
|
|
|
|
defer t.Unlock()
|
|
|
|
|
2019-07-12 16:59:53 +02:00
|
|
|
roomData, ok := t.data[roomID]
|
|
|
|
if !ok {
|
|
|
|
return t.latestSyncPosition
|
|
|
|
}
|
|
|
|
|
|
|
|
timer, ok := roomData.userSet[userID]
|
|
|
|
if !ok {
|
|
|
|
return t.latestSyncPosition
|
2018-07-31 12:52:57 +02:00
|
|
|
}
|
2019-07-12 16:59:53 +02:00
|
|
|
|
|
|
|
timer.Stop()
|
|
|
|
delete(roomData.userSet, userID)
|
|
|
|
|
|
|
|
t.latestSyncPosition++
|
|
|
|
t.data[roomID].syncPosition = t.latestSyncPosition
|
|
|
|
|
|
|
|
return t.latestSyncPosition
|
|
|
|
}
|
|
|
|
|
2020-03-30 16:02:20 +02:00
|
|
|
func (t *EDUCache) GetLatestSyncPosition() int64 {
|
2019-07-12 16:59:53 +02:00
|
|
|
t.Lock()
|
|
|
|
defer t.Unlock()
|
|
|
|
return t.latestSyncPosition
|
2018-07-31 12:52:57 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
func getExpireTime(expire *time.Time) time.Time {
|
|
|
|
if expire != nil {
|
|
|
|
return *expire
|
|
|
|
}
|
|
|
|
return time.Now().Add(defaultTypingTimeout)
|
|
|
|
}
|