0
0
Fork 0
mirror of https://github.com/matrix-org/dendrite synced 2024-07-04 12:08:40 +02:00

Merge branch 'master' into neilalexander/synctokens2

This commit is contained in:
Neil Alexander 2020-12-17 13:04:54 +00:00
commit aad3e573e9
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
27 changed files with 408 additions and 229 deletions

View file

@ -185,6 +185,7 @@ linters:
- gocyclo - gocyclo
- goimports # Does everything gofmt does - goimports # Does everything gofmt does
- gosimple - gosimple
- govet
- ineffassign - ineffassign
- megacheck - megacheck
- misspell # Check code comments, whereas misspell in CI checks *.md files - misspell # Check code comments, whereas misspell in CI checks *.md files

View file

@ -19,4 +19,4 @@ fi
go install -trimpath -ldflags "$FLAGS" -v $PWD/`dirname $0`/cmd/... go install -trimpath -ldflags "$FLAGS" -v $PWD/`dirname $0`/cmd/...
GOOS=js GOARCH=wasm go build -trimpath -ldflags "$FLAGS" -o main.wasm ./cmd/dendritejs GOOS=js GOARCH=wasm go build -trimpath -ldflags "$FLAGS" -o bin/main.wasm ./cmd/dendritejs

View file

@ -24,8 +24,6 @@ fi
echo "Installing golangci-lint..." echo "Installing golangci-lint..."
# Make a backup of go.{mod,sum} first # Make a backup of go.{mod,sum} first
# TODO: Once go 1.13 is out, use go get's -mod=readonly option
# https://github.com/golang/go/issues/30667
cp go.mod go.mod.bak && cp go.sum go.sum.bak cp go.mod go.mod.bak && cp go.sum go.sum.bak
go get github.com/golangci/golangci-lint/cmd/golangci-lint@v1.19.1 go get github.com/golangci/golangci-lint/cmd/golangci-lint@v1.19.1

View file

@ -17,6 +17,7 @@ package routing
import ( import (
"net/http" "net/http"
"sync" "sync"
"time"
"github.com/matrix-org/dendrite/clientapi/httputil" "github.com/matrix-org/dendrite/clientapi/httputil"
"github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/clientapi/jsonerror"
@ -27,6 +28,7 @@ import (
userapi "github.com/matrix-org/dendrite/userapi/api" userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util" "github.com/matrix-org/util"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
) )
@ -40,6 +42,25 @@ var (
userRoomSendMutexes sync.Map // (roomID+userID) -> mutex. mutexes to ensure correct ordering of sendEvents userRoomSendMutexes sync.Map // (roomID+userID) -> mutex. mutexes to ensure correct ordering of sendEvents
) )
func init() {
prometheus.MustRegister(sendEventDuration)
}
var sendEventDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "dendrite",
Subsystem: "clientapi",
Name: "sendevent_duration_millis",
Help: "How long it takes to build and submit a new event from the client API to the roomserver",
Buckets: []float64{ // milliseconds
5, 10, 25, 50, 75, 100, 250, 500,
1000, 2000, 3000, 4000, 5000, 6000,
7000, 8000, 9000, 10000, 15000, 20000,
},
},
[]string{"action"},
)
// SendEvent implements: // SendEvent implements:
// /rooms/{roomID}/send/{eventType} // /rooms/{roomID}/send/{eventType}
// /rooms/{roomID}/send/{eventType}/{txnID} // /rooms/{roomID}/send/{eventType}/{txnID}
@ -75,10 +96,12 @@ func SendEvent(
mutex.(*sync.Mutex).Lock() mutex.(*sync.Mutex).Lock()
defer mutex.(*sync.Mutex).Unlock() defer mutex.(*sync.Mutex).Unlock()
startedGeneratingEvent := time.Now()
e, resErr := generateSendEvent(req, device, roomID, eventType, stateKey, cfg, rsAPI) e, resErr := generateSendEvent(req, device, roomID, eventType, stateKey, cfg, rsAPI)
if resErr != nil { if resErr != nil {
return *resErr return *resErr
} }
timeToGenerateEvent := time.Since(startedGeneratingEvent)
var txnAndSessionID *api.TransactionID var txnAndSessionID *api.TransactionID
if txnID != nil { if txnID != nil {
@ -90,6 +113,7 @@ func SendEvent(
// pass the new event to the roomserver and receive the correct event ID // pass the new event to the roomserver and receive the correct event ID
// event ID in case of duplicate transaction is discarded // event ID in case of duplicate transaction is discarded
startedSubmittingEvent := time.Now()
if err := api.SendEvents( if err := api.SendEvents(
req.Context(), rsAPI, req.Context(), rsAPI,
api.KindNew, api.KindNew,
@ -102,6 +126,7 @@ func SendEvent(
util.GetLogger(req.Context()).WithError(err).Error("SendEvents failed") util.GetLogger(req.Context()).WithError(err).Error("SendEvents failed")
return jsonerror.InternalServerError() return jsonerror.InternalServerError()
} }
timeToSubmitEvent := time.Since(startedSubmittingEvent)
util.GetLogger(req.Context()).WithFields(logrus.Fields{ util.GetLogger(req.Context()).WithFields(logrus.Fields{
"event_id": e.EventID(), "event_id": e.EventID(),
"room_id": roomID, "room_id": roomID,
@ -117,6 +142,11 @@ func SendEvent(
txnCache.AddTransaction(device.AccessToken, *txnID, &res) txnCache.AddTransaction(device.AccessToken, *txnID, &res)
} }
// Take a note of how long it took to generate the event vs submit
// it to the roomserver.
sendEventDuration.With(prometheus.Labels{"action": "build"}).Observe(float64(timeToGenerateEvent.Milliseconds()))
sendEventDuration.With(prometheus.Labels{"action": "submit"}).Observe(float64(timeToSubmitEvent.Milliseconds()))
return res return res
} }

View file

@ -242,6 +242,8 @@ func (oq *destinationQueue) backgroundSend() {
if !oq.running.CAS(false, true) { if !oq.running.CAS(false, true) {
return return
} }
destinationQueueRunning.Inc()
defer destinationQueueRunning.Dec()
defer oq.running.Store(false) defer oq.running.Store(false)
// Mark the queue as overflowed, so we will consult the database // Mark the queue as overflowed, so we will consult the database
@ -295,10 +297,14 @@ func (oq *destinationQueue) backgroundSend() {
// time. // time.
duration := time.Until(*until) duration := time.Until(*until)
log.Warnf("Backing off %q for %s", oq.destination, duration) log.Warnf("Backing off %q for %s", oq.destination, duration)
oq.backingOff.Store(true)
destinationQueueBackingOff.Inc()
select { select {
case <-time.After(duration): case <-time.After(duration):
case <-oq.interruptBackoff: case <-oq.interruptBackoff:
} }
destinationQueueBackingOff.Dec()
oq.backingOff.Store(false)
} }
// Work out which PDUs/EDUs to include in the next transaction. // Work out which PDUs/EDUs to include in the next transaction.

View file

@ -27,6 +27,7 @@ import (
"github.com/matrix-org/dendrite/federationsender/storage/shared" "github.com/matrix-org/dendrite/federationsender/storage/shared"
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/tidwall/gjson" "github.com/tidwall/gjson"
) )
@ -45,6 +46,37 @@ type OutgoingQueues struct {
queues map[gomatrixserverlib.ServerName]*destinationQueue queues map[gomatrixserverlib.ServerName]*destinationQueue
} }
func init() {
prometheus.MustRegister(
destinationQueueTotal, destinationQueueRunning,
destinationQueueBackingOff,
)
}
var destinationQueueTotal = prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: "dendrite",
Subsystem: "federationsender",
Name: "destination_queues_total",
},
)
var destinationQueueRunning = prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: "dendrite",
Subsystem: "federationsender",
Name: "destination_queues_running",
},
)
var destinationQueueBackingOff = prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: "dendrite",
Subsystem: "federationsender",
Name: "destination_queues_backing_off",
},
)
// NewOutgoingQueues makes a new OutgoingQueues // NewOutgoingQueues makes a new OutgoingQueues
func NewOutgoingQueues( func NewOutgoingQueues(
db storage.Database, db storage.Database,
@ -116,6 +148,7 @@ func (oqs *OutgoingQueues) getQueue(destination gomatrixserverlib.ServerName) *d
defer oqs.queuesMutex.Unlock() defer oqs.queuesMutex.Unlock()
oq := oqs.queues[destination] oq := oqs.queues[destination]
if oq == nil { if oq == nil {
destinationQueueTotal.Inc()
oq = &destinationQueue{ oq = &destinationQueue{
db: oqs.db, db: oqs.db,
rsAPI: oqs.rsAPI, rsAPI: oqs.rsAPI,

View file

@ -0,0 +1,45 @@
package caching
import (
"github.com/matrix-org/dendrite/roomserver/types"
)
// WARNING: This cache is mutable because it's entirely possible that
// the IsStub or StateSnaphotNID fields can change, even though the
// room version and room NID fields will not. This is only safe because
// the RoomInfoCache is used ONLY within the roomserver and because it
// will be kept up-to-date by the latest events updater. It MUST NOT be
// used from other components as we currently have no way to invalidate
// the cache in downstream components.
const (
RoomInfoCacheName = "roominfo"
RoomInfoCacheMaxEntries = 1024
RoomInfoCacheMutable = true
)
// RoomInfosCache contains the subset of functions needed for
// a room Info cache. It must only be used from the roomserver only
// It is not safe for use from other components.
type RoomInfoCache interface {
GetRoomInfo(roomID string) (roomInfo types.RoomInfo, ok bool)
StoreRoomInfo(roomID string, roomInfo types.RoomInfo)
}
// GetRoomInfo must only be called from the roomserver only. It is not
// safe for use from other components.
func (c Caches) GetRoomInfo(roomID string) (types.RoomInfo, bool) {
val, found := c.RoomInfos.Get(roomID)
if found && val != nil {
if roomInfo, ok := val.(types.RoomInfo); ok {
return roomInfo, true
}
}
return types.RoomInfo{}, false
}
// StoreRoomInfo must only be called from the roomserver only. It is not
// safe for use from other components.
func (c Caches) StoreRoomInfo(roomID string, roomInfo types.RoomInfo) {
c.RoomInfos.Set(roomID, roomInfo)
}

View file

@ -15,10 +15,6 @@ const (
RoomServerEventTypeNIDsCacheMaxEntries = 64 RoomServerEventTypeNIDsCacheMaxEntries = 64
RoomServerEventTypeNIDsCacheMutable = false RoomServerEventTypeNIDsCacheMutable = false
RoomServerRoomNIDsCacheName = "roomserver_room_nids"
RoomServerRoomNIDsCacheMaxEntries = 1024
RoomServerRoomNIDsCacheMutable = false
RoomServerRoomIDsCacheName = "roomserver_room_ids" RoomServerRoomIDsCacheName = "roomserver_room_ids"
RoomServerRoomIDsCacheMaxEntries = 1024 RoomServerRoomIDsCacheMaxEntries = 1024
RoomServerRoomIDsCacheMutable = false RoomServerRoomIDsCacheMutable = false
@ -27,6 +23,7 @@ const (
type RoomServerCaches interface { type RoomServerCaches interface {
RoomServerNIDsCache RoomServerNIDsCache
RoomVersionCache RoomVersionCache
RoomInfoCache
} }
// RoomServerNIDsCache contains the subset of functions needed for // RoomServerNIDsCache contains the subset of functions needed for
@ -38,9 +35,6 @@ type RoomServerNIDsCache interface {
GetRoomServerEventTypeNID(eventType string) (types.EventTypeNID, bool) GetRoomServerEventTypeNID(eventType string) (types.EventTypeNID, bool)
StoreRoomServerEventTypeNID(eventType string, nid types.EventTypeNID) StoreRoomServerEventTypeNID(eventType string, nid types.EventTypeNID)
GetRoomServerRoomNID(roomID string) (types.RoomNID, bool)
StoreRoomServerRoomNID(roomID string, nid types.RoomNID)
GetRoomServerRoomID(roomNID types.RoomNID) (string, bool) GetRoomServerRoomID(roomNID types.RoomNID) (string, bool)
StoreRoomServerRoomID(roomNID types.RoomNID, roomID string) StoreRoomServerRoomID(roomNID types.RoomNID, roomID string)
} }
@ -73,21 +67,6 @@ func (c Caches) StoreRoomServerEventTypeNID(eventType string, nid types.EventTyp
c.RoomServerEventTypeNIDs.Set(eventType, nid) c.RoomServerEventTypeNIDs.Set(eventType, nid)
} }
func (c Caches) GetRoomServerRoomNID(roomID string) (types.RoomNID, bool) {
val, found := c.RoomServerRoomNIDs.Get(roomID)
if found && val != nil {
if roomNID, ok := val.(types.RoomNID); ok {
return roomNID, true
}
}
return 0, false
}
func (c Caches) StoreRoomServerRoomNID(roomID string, roomNID types.RoomNID) {
c.RoomServerRoomNIDs.Set(roomID, roomNID)
c.RoomServerRoomIDs.Set(strconv.Itoa(int(roomNID)), roomID)
}
func (c Caches) GetRoomServerRoomID(roomNID types.RoomNID) (string, bool) { func (c Caches) GetRoomServerRoomID(roomNID types.RoomNID) (string, bool) {
val, found := c.RoomServerRoomIDs.Get(strconv.Itoa(int(roomNID))) val, found := c.RoomServerRoomIDs.Get(strconv.Itoa(int(roomNID)))
if found && val != nil { if found && val != nil {
@ -99,5 +78,5 @@ func (c Caches) GetRoomServerRoomID(roomNID types.RoomNID) (string, bool) {
} }
func (c Caches) StoreRoomServerRoomID(roomNID types.RoomNID, roomID string) { func (c Caches) StoreRoomServerRoomID(roomNID types.RoomNID, roomID string) {
c.StoreRoomServerRoomNID(roomID, roomNID) c.RoomServerRoomIDs.Set(strconv.Itoa(int(roomNID)), roomID)
} }

View file

@ -10,6 +10,7 @@ type Caches struct {
RoomServerEventTypeNIDs Cache // RoomServerNIDsCache RoomServerEventTypeNIDs Cache // RoomServerNIDsCache
RoomServerRoomNIDs Cache // RoomServerNIDsCache RoomServerRoomNIDs Cache // RoomServerNIDsCache
RoomServerRoomIDs Cache // RoomServerNIDsCache RoomServerRoomIDs Cache // RoomServerNIDsCache
RoomInfos Cache // RoomInfoCache
FederationEvents Cache // FederationEventsCache FederationEvents Cache // FederationEventsCache
} }

View file

@ -45,19 +45,19 @@ func NewInMemoryLRUCache(enablePrometheus bool) (*Caches, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
roomServerRoomNIDs, err := NewInMemoryLRUCachePartition( roomServerRoomIDs, err := NewInMemoryLRUCachePartition(
RoomServerRoomNIDsCacheName, RoomServerRoomIDsCacheName,
RoomServerRoomNIDsCacheMutable, RoomServerRoomIDsCacheMutable,
RoomServerRoomNIDsCacheMaxEntries, RoomServerRoomIDsCacheMaxEntries,
enablePrometheus, enablePrometheus,
) )
if err != nil { if err != nil {
return nil, err return nil, err
} }
roomServerRoomIDs, err := NewInMemoryLRUCachePartition( roomInfos, err := NewInMemoryLRUCachePartition(
RoomServerRoomIDsCacheName, RoomInfoCacheName,
RoomServerRoomIDsCacheMutable, RoomInfoCacheMutable,
RoomServerRoomIDsCacheMaxEntries, RoomInfoCacheMaxEntries,
enablePrometheus, enablePrometheus,
) )
if err != nil { if err != nil {
@ -77,8 +77,8 @@ func NewInMemoryLRUCache(enablePrometheus bool) (*Caches, error) {
ServerKeys: serverKeys, ServerKeys: serverKeys,
RoomServerStateKeyNIDs: roomServerStateKeyNIDs, RoomServerStateKeyNIDs: roomServerStateKeyNIDs,
RoomServerEventTypeNIDs: roomServerEventTypeNIDs, RoomServerEventTypeNIDs: roomServerEventTypeNIDs,
RoomServerRoomNIDs: roomServerRoomNIDs,
RoomServerRoomIDs: roomServerRoomIDs, RoomServerRoomIDs: roomServerRoomIDs,
RoomInfos: roomInfos,
FederationEvents: federationEvents, FederationEvents: federationEvents,
}, nil }, nil
} }

View file

@ -120,8 +120,8 @@ const bulkSelectEventNIDSQL = "" +
const selectMaxEventDepthSQL = "" + const selectMaxEventDepthSQL = "" +
"SELECT COALESCE(MAX(depth) + 1, 0) FROM roomserver_events WHERE event_nid = ANY($1)" "SELECT COALESCE(MAX(depth) + 1, 0) FROM roomserver_events WHERE event_nid = ANY($1)"
const selectRoomNIDForEventNIDSQL = "" + const selectRoomNIDsForEventNIDsSQL = "" +
"SELECT room_nid FROM roomserver_events WHERE event_nid = $1" "SELECT event_nid, room_nid FROM roomserver_events WHERE event_nid = ANY($1)"
type eventStatements struct { type eventStatements struct {
insertEventStmt *sql.Stmt insertEventStmt *sql.Stmt
@ -137,7 +137,7 @@ type eventStatements struct {
bulkSelectEventIDStmt *sql.Stmt bulkSelectEventIDStmt *sql.Stmt
bulkSelectEventNIDStmt *sql.Stmt bulkSelectEventNIDStmt *sql.Stmt
selectMaxEventDepthStmt *sql.Stmt selectMaxEventDepthStmt *sql.Stmt
selectRoomNIDForEventNIDStmt *sql.Stmt selectRoomNIDsForEventNIDsStmt *sql.Stmt
} }
func NewPostgresEventsTable(db *sql.DB) (tables.Events, error) { func NewPostgresEventsTable(db *sql.DB) (tables.Events, error) {
@ -161,7 +161,7 @@ func NewPostgresEventsTable(db *sql.DB) (tables.Events, error) {
{&s.bulkSelectEventIDStmt, bulkSelectEventIDSQL}, {&s.bulkSelectEventIDStmt, bulkSelectEventIDSQL},
{&s.bulkSelectEventNIDStmt, bulkSelectEventNIDSQL}, {&s.bulkSelectEventNIDStmt, bulkSelectEventNIDSQL},
{&s.selectMaxEventDepthStmt, selectMaxEventDepthSQL}, {&s.selectMaxEventDepthStmt, selectMaxEventDepthSQL},
{&s.selectRoomNIDForEventNIDStmt, selectRoomNIDForEventNIDSQL}, {&s.selectRoomNIDsForEventNIDsStmt, selectRoomNIDsForEventNIDsSQL},
}.Prepare(db) }.Prepare(db)
} }
@ -432,11 +432,24 @@ func (s *eventStatements) SelectMaxEventDepth(ctx context.Context, txn *sql.Tx,
return result, nil return result, nil
} }
func (s *eventStatements) SelectRoomNIDForEventNID( func (s *eventStatements) SelectRoomNIDsForEventNIDs(
ctx context.Context, eventNID types.EventNID, ctx context.Context, eventNIDs []types.EventNID,
) (roomNID types.RoomNID, err error) { ) (map[types.EventNID]types.RoomNID, error) {
err = s.selectRoomNIDForEventNIDStmt.QueryRowContext(ctx, int64(eventNID)).Scan(&roomNID) rows, err := s.selectRoomNIDsForEventNIDsStmt.QueryContext(ctx, eventNIDsAsArray(eventNIDs))
return if err != nil {
return nil, err
}
defer internal.CloseAndLogIfError(ctx, rows, "selectRoomNIDsForEventNIDsStmt: rows.close() failed")
result := make(map[types.EventNID]types.RoomNID)
for rows.Next() {
var eventNID types.EventNID
var roomNID types.RoomNID
if err = rows.Scan(&eventNID, &roomNID); err != nil {
return nil, err
}
result[eventNID] = roomNID
}
return result, nil
} }
func eventNIDsAsArray(eventNIDs []types.EventNID) pq.Int64Array { func eventNIDsAsArray(eventNIDs []types.EventNID) pq.Int64Array {

View file

@ -18,7 +18,6 @@ package postgres
import ( import (
"context" "context"
"database/sql" "database/sql"
"errors"
"github.com/lib/pq" "github.com/lib/pq"
"github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal"
@ -69,8 +68,8 @@ const selectLatestEventNIDsForUpdateSQL = "" +
const updateLatestEventNIDsSQL = "" + const updateLatestEventNIDsSQL = "" +
"UPDATE roomserver_rooms SET latest_event_nids = $2, last_event_sent_nid = $3, state_snapshot_nid = $4 WHERE room_nid = $1" "UPDATE roomserver_rooms SET latest_event_nids = $2, last_event_sent_nid = $3, state_snapshot_nid = $4 WHERE room_nid = $1"
const selectRoomVersionForRoomNIDSQL = "" + const selectRoomVersionsForRoomNIDsSQL = "" +
"SELECT room_version FROM roomserver_rooms WHERE room_nid = $1" "SELECT room_nid, room_version FROM roomserver_rooms WHERE room_nid = ANY($1)"
const selectRoomInfoSQL = "" + const selectRoomInfoSQL = "" +
"SELECT room_version, room_nid, state_snapshot_nid, latest_event_nids FROM roomserver_rooms WHERE room_id = $1" "SELECT room_version, room_nid, state_snapshot_nid, latest_event_nids FROM roomserver_rooms WHERE room_id = $1"
@ -90,7 +89,7 @@ type roomStatements struct {
selectLatestEventNIDsStmt *sql.Stmt selectLatestEventNIDsStmt *sql.Stmt
selectLatestEventNIDsForUpdateStmt *sql.Stmt selectLatestEventNIDsForUpdateStmt *sql.Stmt
updateLatestEventNIDsStmt *sql.Stmt updateLatestEventNIDsStmt *sql.Stmt
selectRoomVersionForRoomNIDStmt *sql.Stmt selectRoomVersionsForRoomNIDsStmt *sql.Stmt
selectRoomInfoStmt *sql.Stmt selectRoomInfoStmt *sql.Stmt
selectRoomIDsStmt *sql.Stmt selectRoomIDsStmt *sql.Stmt
bulkSelectRoomIDsStmt *sql.Stmt bulkSelectRoomIDsStmt *sql.Stmt
@ -109,7 +108,7 @@ func NewPostgresRoomsTable(db *sql.DB) (tables.Rooms, error) {
{&s.selectLatestEventNIDsStmt, selectLatestEventNIDsSQL}, {&s.selectLatestEventNIDsStmt, selectLatestEventNIDsSQL},
{&s.selectLatestEventNIDsForUpdateStmt, selectLatestEventNIDsForUpdateSQL}, {&s.selectLatestEventNIDsForUpdateStmt, selectLatestEventNIDsForUpdateSQL},
{&s.updateLatestEventNIDsStmt, updateLatestEventNIDsSQL}, {&s.updateLatestEventNIDsStmt, updateLatestEventNIDsSQL},
{&s.selectRoomVersionForRoomNIDStmt, selectRoomVersionForRoomNIDSQL}, {&s.selectRoomVersionsForRoomNIDsStmt, selectRoomVersionsForRoomNIDsSQL},
{&s.selectRoomInfoStmt, selectRoomInfoSQL}, {&s.selectRoomInfoStmt, selectRoomInfoSQL},
{&s.selectRoomIDsStmt, selectRoomIDsSQL}, {&s.selectRoomIDsStmt, selectRoomIDsSQL},
{&s.bulkSelectRoomIDsStmt, bulkSelectRoomIDsSQL}, {&s.bulkSelectRoomIDsStmt, bulkSelectRoomIDsSQL},
@ -219,15 +218,24 @@ func (s *roomStatements) UpdateLatestEventNIDs(
return err return err
} }
func (s *roomStatements) SelectRoomVersionForRoomNID( func (s *roomStatements) SelectRoomVersionsForRoomNIDs(
ctx context.Context, roomNID types.RoomNID, ctx context.Context, roomNIDs []types.RoomNID,
) (gomatrixserverlib.RoomVersion, error) { ) (map[types.RoomNID]gomatrixserverlib.RoomVersion, error) {
var roomVersion gomatrixserverlib.RoomVersion rows, err := s.selectRoomVersionsForRoomNIDsStmt.QueryContext(ctx, roomNIDsAsArray(roomNIDs))
err := s.selectRoomVersionForRoomNIDStmt.QueryRowContext(ctx, roomNID).Scan(&roomVersion) if err != nil {
if err == sql.ErrNoRows { return nil, err
return roomVersion, errors.New("room not found")
} }
return roomVersion, err defer internal.CloseAndLogIfError(ctx, rows, "selectRoomVersionsForRoomNIDsStmt: rows.close() failed")
result := make(map[types.RoomNID]gomatrixserverlib.RoomVersion)
for rows.Next() {
var roomNID types.RoomNID
var roomVersion gomatrixserverlib.RoomVersion
if err = rows.Scan(&roomNID, &roomVersion); err != nil {
return nil, err
}
result[roomNID] = roomVersion
}
return result, nil
} }
func (s *roomStatements) BulkSelectRoomIDs(ctx context.Context, roomNIDs []types.RoomNID) ([]string, error) { func (s *roomStatements) BulkSelectRoomIDs(ctx context.Context, roomNIDs []types.RoomNID) ([]string, error) {
@ -271,3 +279,11 @@ func (s *roomStatements) BulkSelectRoomNIDs(ctx context.Context, roomIDs []strin
} }
return roomNIDs, nil return roomNIDs, nil
} }
func roomNIDsAsArray(roomNIDs []types.RoomNID) pq.Int64Array {
nids := make([]int64, len(roomNIDs))
for i := range roomNIDs {
nids[i] = int64(roomNIDs[i])
}
return nids
}

View file

@ -105,6 +105,13 @@ func (u *LatestEventsUpdater) SetLatestEvents(
if err := u.d.RoomsTable.UpdateLatestEventNIDs(u.ctx, txn, roomNID, eventNIDs, lastEventNIDSent, currentStateSnapshotNID); err != nil { if err := u.d.RoomsTable.UpdateLatestEventNIDs(u.ctx, txn, roomNID, eventNIDs, lastEventNIDSent, currentStateSnapshotNID); err != nil {
return fmt.Errorf("u.d.RoomsTable.updateLatestEventNIDs: %w", err) return fmt.Errorf("u.d.RoomsTable.updateLatestEventNIDs: %w", err)
} }
if roomID, ok := u.d.Cache.GetRoomServerRoomID(roomNID); ok {
if roomInfo, ok := u.d.Cache.GetRoomInfo(roomID); ok {
roomInfo.StateSnapshotNID = currentStateSnapshotNID
roomInfo.IsStub = false
u.d.Cache.StoreRoomInfo(roomID, roomInfo)
}
}
return nil return nil
}) })
} }

View file

@ -124,7 +124,15 @@ func (d *Database) StateEntriesForTuples(
} }
func (d *Database) RoomInfo(ctx context.Context, roomID string) (*types.RoomInfo, error) { func (d *Database) RoomInfo(ctx context.Context, roomID string) (*types.RoomInfo, error) {
return d.RoomsTable.SelectRoomInfo(ctx, roomID) if roomInfo, ok := d.Cache.GetRoomInfo(roomID); ok {
return &roomInfo, nil
}
roomInfo, err := d.RoomsTable.SelectRoomInfo(ctx, roomID)
if err == nil && roomInfo != nil {
d.Cache.StoreRoomServerRoomID(roomInfo.RoomNID, roomID)
d.Cache.StoreRoomInfo(roomID, *roomInfo)
}
return roomInfo, err
} }
func (d *Database) AddState( func (d *Database) AddState(
@ -313,25 +321,39 @@ func (d *Database) Events(
if err != nil { if err != nil {
eventIDs = map[types.EventNID]string{} eventIDs = map[types.EventNID]string{}
} }
results := make([]types.Event, len(eventJSONs)) var roomNIDs map[types.EventNID]types.RoomNID
for i, eventJSON := range eventJSONs { roomNIDs, err = d.EventsTable.SelectRoomNIDsForEventNIDs(ctx, eventNIDs)
var roomNID types.RoomNID if err != nil {
var roomVersion gomatrixserverlib.RoomVersion return nil, err
result := &results[i] }
result.EventNID = eventJSON.EventNID uniqueRoomNIDs := make(map[types.RoomNID]struct{})
roomNID, err = d.EventsTable.SelectRoomNIDForEventNID(ctx, eventJSON.EventNID) for _, n := range roomNIDs {
if err != nil { uniqueRoomNIDs[n] = struct{}{}
return nil, err }
} roomVersions := make(map[types.RoomNID]gomatrixserverlib.RoomVersion)
if roomID, ok := d.Cache.GetRoomServerRoomID(roomNID); ok { fetchNIDList := make([]types.RoomNID, 0, len(uniqueRoomNIDs))
roomVersion, _ = d.Cache.GetRoomVersion(roomID) for n := range uniqueRoomNIDs {
} if roomID, ok := d.Cache.GetRoomServerRoomID(n); ok {
if roomVersion == "" { if roomInfo, ok := d.Cache.GetRoomInfo(roomID); ok {
roomVersion, err = d.RoomsTable.SelectRoomVersionForRoomNID(ctx, roomNID) roomVersions[n] = roomInfo.RoomVersion
if err != nil { continue
return nil, err
} }
} }
fetchNIDList = append(fetchNIDList, n)
}
dbRoomVersions, err := d.RoomsTable.SelectRoomVersionsForRoomNIDs(ctx, fetchNIDList)
if err != nil {
return nil, err
}
for n, v := range dbRoomVersions {
roomVersions[n] = v
}
results := make([]types.Event, len(eventJSONs))
for i, eventJSON := range eventJSONs {
result := &results[i]
result.EventNID = eventJSON.EventNID
roomNID := roomNIDs[result.EventNID]
roomVersion := roomVersions[roomNID]
result.Event, err = gomatrixserverlib.NewEventFromTrustedJSONWithEventID( result.Event, err = gomatrixserverlib.NewEventFromTrustedJSONWithEventID(
eventIDs[eventJSON.EventNID], eventJSON.EventJSON, false, roomVersion, eventIDs[eventJSON.EventNID], eventJSON.EventJSON, false, roomVersion,
) )
@ -552,8 +574,8 @@ func (d *Database) assignRoomNID(
ctx context.Context, txn *sql.Tx, ctx context.Context, txn *sql.Tx,
roomID string, roomVersion gomatrixserverlib.RoomVersion, roomID string, roomVersion gomatrixserverlib.RoomVersion,
) (types.RoomNID, error) { ) (types.RoomNID, error) {
if roomNID, ok := d.Cache.GetRoomServerRoomNID(roomID); ok { if roomInfo, ok := d.Cache.GetRoomInfo(roomID); ok {
return roomNID, nil return roomInfo.RoomNID, nil
} }
// Check if we already have a numeric ID in the database. // Check if we already have a numeric ID in the database.
roomNID, err := d.RoomsTable.SelectRoomNID(ctx, txn, roomID) roomNID, err := d.RoomsTable.SelectRoomNID(ctx, txn, roomID)
@ -565,9 +587,6 @@ func (d *Database) assignRoomNID(
roomNID, err = d.RoomsTable.SelectRoomNID(ctx, txn, roomID) roomNID, err = d.RoomsTable.SelectRoomNID(ctx, txn, roomID)
} }
} }
if err == nil {
d.Cache.StoreRoomServerRoomNID(roomID, roomNID)
}
return roomNID, err return roomNID, err
} }

View file

@ -95,8 +95,8 @@ const bulkSelectEventNIDSQL = "" +
const selectMaxEventDepthSQL = "" + const selectMaxEventDepthSQL = "" +
"SELECT COALESCE(MAX(depth) + 1, 0) FROM roomserver_events WHERE event_nid IN ($1)" "SELECT COALESCE(MAX(depth) + 1, 0) FROM roomserver_events WHERE event_nid IN ($1)"
const selectRoomNIDForEventNIDSQL = "" + const selectRoomNIDsForEventNIDsSQL = "" +
"SELECT room_nid FROM roomserver_events WHERE event_nid = $1" "SELECT event_nid, room_nid FROM roomserver_events WHERE event_nid IN ($1)"
type eventStatements struct { type eventStatements struct {
db *sql.DB db *sql.DB
@ -112,7 +112,7 @@ type eventStatements struct {
bulkSelectEventReferenceStmt *sql.Stmt bulkSelectEventReferenceStmt *sql.Stmt
bulkSelectEventIDStmt *sql.Stmt bulkSelectEventIDStmt *sql.Stmt
bulkSelectEventNIDStmt *sql.Stmt bulkSelectEventNIDStmt *sql.Stmt
selectRoomNIDForEventNIDStmt *sql.Stmt //selectRoomNIDsForEventNIDsStmt *sql.Stmt
} }
func NewSqliteEventsTable(db *sql.DB) (tables.Events, error) { func NewSqliteEventsTable(db *sql.DB) (tables.Events, error) {
@ -137,7 +137,7 @@ func NewSqliteEventsTable(db *sql.DB) (tables.Events, error) {
{&s.bulkSelectEventReferenceStmt, bulkSelectEventReferenceSQL}, {&s.bulkSelectEventReferenceStmt, bulkSelectEventReferenceSQL},
{&s.bulkSelectEventIDStmt, bulkSelectEventIDSQL}, {&s.bulkSelectEventIDStmt, bulkSelectEventIDSQL},
{&s.bulkSelectEventNIDStmt, bulkSelectEventNIDSQL}, {&s.bulkSelectEventNIDStmt, bulkSelectEventNIDSQL},
{&s.selectRoomNIDForEventNIDStmt, selectRoomNIDForEventNIDSQL}, //{&s.selectRoomNIDForEventNIDStmt, selectRoomNIDForEventNIDSQL},
}.Prepare(db) }.Prepare(db)
} }
@ -480,11 +480,33 @@ func (s *eventStatements) SelectMaxEventDepth(ctx context.Context, txn *sql.Tx,
return result, nil return result, nil
} }
func (s *eventStatements) SelectRoomNIDForEventNID( func (s *eventStatements) SelectRoomNIDsForEventNIDs(
ctx context.Context, eventNID types.EventNID, ctx context.Context, eventNIDs []types.EventNID,
) (roomNID types.RoomNID, err error) { ) (map[types.EventNID]types.RoomNID, error) {
err = s.selectRoomNIDForEventNIDStmt.QueryRowContext(ctx, int64(eventNID)).Scan(&roomNID) sqlStr := strings.Replace(selectRoomNIDsForEventNIDsSQL, "($1)", sqlutil.QueryVariadic(len(eventNIDs)), 1)
return sqlPrep, err := s.db.Prepare(sqlStr)
if err != nil {
return nil, err
}
iEventNIDs := make([]interface{}, len(eventNIDs))
for i, v := range eventNIDs {
iEventNIDs[i] = v
}
rows, err := sqlPrep.QueryContext(ctx, iEventNIDs...)
if err != nil {
return nil, err
}
defer internal.CloseAndLogIfError(ctx, rows, "selectRoomNIDsForEventNIDsStmt: rows.close() failed")
result := make(map[types.EventNID]types.RoomNID)
for rows.Next() {
var eventNID types.EventNID
var roomNID types.RoomNID
if err = rows.Scan(&eventNID, &roomNID); err != nil {
return nil, err
}
result[eventNID] = roomNID
}
return result, nil
} }
func eventNIDsAsArray(eventNIDs []types.EventNID) string { func eventNIDsAsArray(eventNIDs []types.EventNID) string {

View file

@ -19,7 +19,6 @@ import (
"context" "context"
"database/sql" "database/sql"
"encoding/json" "encoding/json"
"errors"
"fmt" "fmt"
"strings" "strings"
@ -60,8 +59,8 @@ const selectLatestEventNIDsForUpdateSQL = "" +
const updateLatestEventNIDsSQL = "" + const updateLatestEventNIDsSQL = "" +
"UPDATE roomserver_rooms SET latest_event_nids = $1, last_event_sent_nid = $2, state_snapshot_nid = $3 WHERE room_nid = $4" "UPDATE roomserver_rooms SET latest_event_nids = $1, last_event_sent_nid = $2, state_snapshot_nid = $3 WHERE room_nid = $4"
const selectRoomVersionForRoomNIDSQL = "" + const selectRoomVersionsForRoomNIDsSQL = "" +
"SELECT room_version FROM roomserver_rooms WHERE room_nid = $1" "SELECT room_nid, room_version FROM roomserver_rooms WHERE room_nid IN ($1)"
const selectRoomInfoSQL = "" + const selectRoomInfoSQL = "" +
"SELECT room_version, room_nid, state_snapshot_nid, latest_event_nids FROM roomserver_rooms WHERE room_id = $1" "SELECT room_version, room_nid, state_snapshot_nid, latest_event_nids FROM roomserver_rooms WHERE room_id = $1"
@ -82,9 +81,9 @@ type roomStatements struct {
selectLatestEventNIDsStmt *sql.Stmt selectLatestEventNIDsStmt *sql.Stmt
selectLatestEventNIDsForUpdateStmt *sql.Stmt selectLatestEventNIDsForUpdateStmt *sql.Stmt
updateLatestEventNIDsStmt *sql.Stmt updateLatestEventNIDsStmt *sql.Stmt
selectRoomVersionForRoomNIDStmt *sql.Stmt //selectRoomVersionForRoomNIDStmt *sql.Stmt
selectRoomInfoStmt *sql.Stmt selectRoomInfoStmt *sql.Stmt
selectRoomIDsStmt *sql.Stmt selectRoomIDsStmt *sql.Stmt
} }
func NewSqliteRoomsTable(db *sql.DB) (tables.Rooms, error) { func NewSqliteRoomsTable(db *sql.DB) (tables.Rooms, error) {
@ -101,7 +100,7 @@ func NewSqliteRoomsTable(db *sql.DB) (tables.Rooms, error) {
{&s.selectLatestEventNIDsStmt, selectLatestEventNIDsSQL}, {&s.selectLatestEventNIDsStmt, selectLatestEventNIDsSQL},
{&s.selectLatestEventNIDsForUpdateStmt, selectLatestEventNIDsForUpdateSQL}, {&s.selectLatestEventNIDsForUpdateStmt, selectLatestEventNIDsForUpdateSQL},
{&s.updateLatestEventNIDsStmt, updateLatestEventNIDsSQL}, {&s.updateLatestEventNIDsStmt, updateLatestEventNIDsSQL},
{&s.selectRoomVersionForRoomNIDStmt, selectRoomVersionForRoomNIDSQL}, //{&s.selectRoomVersionForRoomNIDsStmt, selectRoomVersionForRoomNIDsSQL},
{&s.selectRoomInfoStmt, selectRoomInfoSQL}, {&s.selectRoomInfoStmt, selectRoomInfoSQL},
{&s.selectRoomIDsStmt, selectRoomIDsSQL}, {&s.selectRoomIDsStmt, selectRoomIDsSQL},
}.Prepare(db) }.Prepare(db)
@ -223,15 +222,33 @@ func (s *roomStatements) UpdateLatestEventNIDs(
return err return err
} }
func (s *roomStatements) SelectRoomVersionForRoomNID( func (s *roomStatements) SelectRoomVersionsForRoomNIDs(
ctx context.Context, roomNID types.RoomNID, ctx context.Context, roomNIDs []types.RoomNID,
) (gomatrixserverlib.RoomVersion, error) { ) (map[types.RoomNID]gomatrixserverlib.RoomVersion, error) {
var roomVersion gomatrixserverlib.RoomVersion sqlStr := strings.Replace(selectRoomVersionsForRoomNIDsSQL, "($1)", sqlutil.QueryVariadic(len(roomNIDs)), 1)
err := s.selectRoomVersionForRoomNIDStmt.QueryRowContext(ctx, roomNID).Scan(&roomVersion) sqlPrep, err := s.db.Prepare(sqlStr)
if err == sql.ErrNoRows { if err != nil {
return roomVersion, errors.New("room not found") return nil, err
} }
return roomVersion, err iRoomNIDs := make([]interface{}, len(roomNIDs))
for i, v := range roomNIDs {
iRoomNIDs[i] = v
}
rows, err := sqlPrep.QueryContext(ctx, iRoomNIDs...)
if err != nil {
return nil, err
}
defer internal.CloseAndLogIfError(ctx, rows, "selectRoomVersionsForRoomNIDsStmt: rows.close() failed")
result := make(map[types.RoomNID]gomatrixserverlib.RoomVersion)
for rows.Next() {
var roomNID types.RoomNID
var roomVersion gomatrixserverlib.RoomVersion
if err = rows.Scan(&roomNID, &roomVersion); err != nil {
return nil, err
}
result[roomNID] = roomVersion
}
return result, nil
} }
func (s *roomStatements) BulkSelectRoomIDs(ctx context.Context, roomNIDs []types.RoomNID) ([]string, error) { func (s *roomStatements) BulkSelectRoomIDs(ctx context.Context, roomNIDs []types.RoomNID) ([]string, error) {

View file

@ -10,8 +10,9 @@ import (
) )
type EventJSONPair struct { type EventJSONPair struct {
EventNID types.EventNID EventNID types.EventNID
EventJSON []byte RoomVersion gomatrixserverlib.RoomVersion
EventJSON []byte
} }
type EventJSON interface { type EventJSON interface {
@ -58,7 +59,7 @@ type Events interface {
// If an event ID is not in the database then it is omitted from the map. // If an event ID is not in the database then it is omitted from the map.
BulkSelectEventNID(ctx context.Context, eventIDs []string) (map[string]types.EventNID, error) BulkSelectEventNID(ctx context.Context, eventIDs []string) (map[string]types.EventNID, error)
SelectMaxEventDepth(ctx context.Context, txn *sql.Tx, eventNIDs []types.EventNID) (int64, error) SelectMaxEventDepth(ctx context.Context, txn *sql.Tx, eventNIDs []types.EventNID) (int64, error)
SelectRoomNIDForEventNID(ctx context.Context, eventNID types.EventNID) (roomNID types.RoomNID, err error) SelectRoomNIDsForEventNIDs(ctx context.Context, eventNIDs []types.EventNID) (roomNIDs map[types.EventNID]types.RoomNID, err error)
} }
type Rooms interface { type Rooms interface {
@ -67,7 +68,7 @@ type Rooms interface {
SelectLatestEventNIDs(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID) ([]types.EventNID, types.StateSnapshotNID, error) SelectLatestEventNIDs(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID) ([]types.EventNID, types.StateSnapshotNID, error)
SelectLatestEventsNIDsForUpdate(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID) ([]types.EventNID, types.EventNID, types.StateSnapshotNID, error) SelectLatestEventsNIDsForUpdate(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID) ([]types.EventNID, types.EventNID, types.StateSnapshotNID, error)
UpdateLatestEventNIDs(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, eventNIDs []types.EventNID, lastEventSentNID types.EventNID, stateSnapshotNID types.StateSnapshotNID) error UpdateLatestEventNIDs(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, eventNIDs []types.EventNID, lastEventSentNID types.EventNID, stateSnapshotNID types.StateSnapshotNID) error
SelectRoomVersionForRoomNID(ctx context.Context, roomNID types.RoomNID) (gomatrixserverlib.RoomVersion, error) SelectRoomVersionsForRoomNIDs(ctx context.Context, roomNID []types.RoomNID) (map[types.RoomNID]gomatrixserverlib.RoomVersion, error)
SelectRoomInfo(ctx context.Context, roomID string) (*types.RoomInfo, error) SelectRoomInfo(ctx context.Context, roomID string) (*types.RoomInfo, error)
SelectRoomIDs(ctx context.Context) ([]string, error) SelectRoomIDs(ctx context.Context) ([]string, error)
BulkSelectRoomIDs(ctx context.Context, roomNIDs []types.RoomNID) ([]string, error) BulkSelectRoomIDs(ctx context.Context, roomNIDs []types.RoomNID) ([]string, error)

View file

@ -23,7 +23,6 @@ import (
"github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/keyserver/api" "github.com/matrix-org/dendrite/keyserver/api"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
syncinternal "github.com/matrix-org/dendrite/syncapi/internal"
"github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/storage"
syncapi "github.com/matrix-org/dendrite/syncapi/sync" syncapi "github.com/matrix-org/dendrite/syncapi/sync"
"github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/dendrite/syncapi/types"
@ -115,11 +114,9 @@ func (s *OutputKeyChangeEventConsumer) onMessage(msg *sarama.ConsumerMessage) er
} }
// TODO: f.e queryRes.UserIDsToCount : notify users by waking up streams // TODO: f.e queryRes.UserIDsToCount : notify users by waking up streams
posUpdate := types.StreamingToken{ posUpdate := types.StreamingToken{
Logs: map[string]*types.LogPosition{ DeviceListPosition: types.LogPosition{
syncinternal.DeviceListLogName: { Offset: msg.Offset,
Offset: msg.Offset, Partition: msg.Partition,
Partition: msg.Partition,
},
}, },
} }
for userID := range queryRes.UserIDsToCount { for userID := range queryRes.UserIDsToCount {

View file

@ -73,15 +73,13 @@ func DeviceListCatchup(
offset = sarama.OffsetOldest offset = sarama.OffsetOldest
// Extract partition/offset from sync token // Extract partition/offset from sync token
// TODO: In a world where keyserver is sharded there will be multiple partitions and hence multiple QueryKeyChanges to make. // TODO: In a world where keyserver is sharded there will be multiple partitions and hence multiple QueryKeyChanges to make.
logOffset := from.Log(DeviceListLogName) if !from.DeviceListPosition.IsEmpty() {
if logOffset != nil { partition = from.DeviceListPosition.Partition
partition = logOffset.Partition offset = from.DeviceListPosition.Offset
offset = logOffset.Offset
} }
var toOffset int64 var toOffset int64
toOffset = sarama.OffsetNewest toOffset = sarama.OffsetNewest
toLog := to.Log(DeviceListLogName) if toLog := to.DeviceListPosition; toLog.Partition == partition && toLog.Offset > 0 {
if toLog != nil && toLog.Offset > 0 {
toOffset = toLog.Offset toOffset = toLog.Offset
} }
var queryRes api.QueryKeyChangesResponse var queryRes api.QueryKeyChangesResponse
@ -130,7 +128,7 @@ func DeviceListCatchup(
} }
} }
// set the new token // set the new token
to.SetLog(DeviceListLogName, &types.LogPosition{ to.DeviceListPosition = types.LogPosition{
Partition: queryRes.Partition, Partition: queryRes.Partition,
Offset: queryRes.Offset, Offset: queryRes.Offset,
}) })

View file

@ -18,11 +18,9 @@ var (
syncingUser = "@alice:localhost" syncingUser = "@alice:localhost"
emptyToken = types.StreamingToken{} emptyToken = types.StreamingToken{}
newestToken = types.StreamingToken{ newestToken = types.StreamingToken{
Logs: map[string]*types.LogPosition{ DeviceListPosition: types.LogPosition{
DeviceListLogName: { Offset: sarama.OffsetNewest,
Offset: sarama.OffsetNewest, Partition: 0,
Partition: 0,
},
}, },
} }
) )

View file

@ -25,6 +25,7 @@ import (
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/sync"
"github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/dendrite/syncapi/types"
userapi "github.com/matrix-org/dendrite/userapi/api" userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
@ -49,9 +50,10 @@ type messagesReq struct {
} }
type messagesResp struct { type messagesResp struct {
Start string `json:"start"` Start string `json:"start"`
End string `json:"end"` StartStream string `json:"start_stream,omitempty"` // NOTSPEC: so clients can hit /messages then immediately /sync with a latest sync token
Chunk []gomatrixserverlib.ClientEvent `json:"chunk"` End string `json:"end"`
Chunk []gomatrixserverlib.ClientEvent `json:"chunk"`
} }
const defaultMessagesLimit = 10 const defaultMessagesLimit = 10
@ -65,6 +67,7 @@ func OnIncomingMessagesRequest(
federation *gomatrixserverlib.FederationClient, federation *gomatrixserverlib.FederationClient,
rsAPI api.RoomserverInternalAPI, rsAPI api.RoomserverInternalAPI,
cfg *config.SyncAPI, cfg *config.SyncAPI,
srp *sync.RequestPool,
) util.JSONResponse { ) util.JSONResponse {
var err error var err error
@ -84,9 +87,18 @@ func OnIncomingMessagesRequest(
// Extract parameters from the request's URL. // Extract parameters from the request's URL.
// Pagination tokens. // Pagination tokens.
var fromStream *types.StreamingToken var fromStream *types.StreamingToken
from, err := types.NewTopologyTokenFromString(req.URL.Query().Get("from")) fromQuery := req.URL.Query().Get("from")
emptyFromSupplied := fromQuery == ""
if emptyFromSupplied {
// NOTSPEC: We will pretend they used the latest sync token if no ?from= was provided.
// We do this to allow clients to get messages without having to call `/sync` e.g Cerulean
currPos := srp.Notifier.CurrentPosition()
fromQuery = currPos.String()
}
from, err := types.NewTopologyTokenFromString(fromQuery)
if err != nil { if err != nil {
fs, err2 := types.NewStreamTokenFromString(req.URL.Query().Get("from")) fs, err2 := types.NewStreamTokenFromString(fromQuery)
fromStream = &fs fromStream = &fs
if err2 != nil { if err2 != nil {
return util.JSONResponse{ return util.JSONResponse{
@ -185,14 +197,19 @@ func OnIncomingMessagesRequest(
"return_end": end.String(), "return_end": end.String(),
}).Info("Responding") }).Info("Responding")
res := messagesResp{
Chunk: clientEvents,
Start: start.String(),
End: end.String(),
}
if emptyFromSupplied {
res.StartStream = fromStream.String()
}
// Respond with the events. // Respond with the events.
return util.JSONResponse{ return util.JSONResponse{
Code: http.StatusOK, Code: http.StatusOK,
JSON: messagesResp{ JSON: res,
Chunk: clientEvents,
Start: start.String(),
End: end.String(),
},
} }
} }

View file

@ -51,7 +51,7 @@ func Setup(
if err != nil { if err != nil {
return util.ErrorResponse(err) return util.ErrorResponse(err)
} }
return OnIncomingMessagesRequest(req, syncDB, vars["roomID"], device, federation, rsAPI, cfg) return OnIncomingMessagesRequest(req, syncDB, vars["roomID"], device, federation, rsAPI, cfg, srp)
})).Methods(http.MethodGet, http.MethodOptions) })).Methods(http.MethodGet, http.MethodOptions)
r0mux.Handle("/user/{userId}/filter", r0mux.Handle("/user/{userId}/filter",

View file

@ -58,6 +58,8 @@ CREATE TABLE IF NOT EXISTS syncapi_current_room_state (
CREATE UNIQUE INDEX IF NOT EXISTS syncapi_event_id_idx ON syncapi_current_room_state(event_id, room_id, type, sender, contains_url); CREATE UNIQUE INDEX IF NOT EXISTS syncapi_event_id_idx ON syncapi_current_room_state(event_id, room_id, type, sender, contains_url);
-- for querying membership states of users -- for querying membership states of users
CREATE INDEX IF NOT EXISTS syncapi_membership_idx ON syncapi_current_room_state(type, state_key, membership) WHERE membership IS NOT NULL AND membership != 'leave'; CREATE INDEX IF NOT EXISTS syncapi_membership_idx ON syncapi_current_room_state(type, state_key, membership) WHERE membership IS NOT NULL AND membership != 'leave';
-- for querying state by event IDs
CREATE UNIQUE INDEX IF NOT EXISTS syncapi_current_room_state_eventid_idx ON syncapi_current_room_state(event_id);
` `
const upsertRoomStateSQL = "" + const upsertRoomStateSQL = "" +

View file

@ -46,6 +46,8 @@ CREATE TABLE IF NOT EXISTS syncapi_current_room_state (
CREATE UNIQUE INDEX IF NOT EXISTS syncapi_event_id_idx ON syncapi_current_room_state(event_id, room_id, type, sender, contains_url); CREATE UNIQUE INDEX IF NOT EXISTS syncapi_event_id_idx ON syncapi_current_room_state(event_id, room_id, type, sender, contains_url);
-- for querying membership states of users -- for querying membership states of users
-- CREATE INDEX IF NOT EXISTS syncapi_membership_idx ON syncapi_current_room_state(type, state_key, membership) WHERE membership IS NOT NULL AND membership != 'leave'; -- CREATE INDEX IF NOT EXISTS syncapi_membership_idx ON syncapi_current_room_state(type, state_key, membership) WHERE membership IS NOT NULL AND membership != 'leave';
-- for querying state by event IDs
CREATE UNIQUE INDEX IF NOT EXISTS syncapi_current_room_state_eventid_idx ON syncapi_current_room_state(event_id);
` `
const upsertRoomStateSQL = "" + const upsertRoomStateSQL = "" +

View file

@ -35,6 +35,7 @@ import (
userapi "github.com/matrix-org/dendrite/userapi/api" userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util" "github.com/matrix-org/util"
"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
) )
@ -43,7 +44,7 @@ type RequestPool struct {
db storage.Database db storage.Database
cfg *config.SyncAPI cfg *config.SyncAPI
userAPI userapi.UserInternalAPI userAPI userapi.UserInternalAPI
notifier *Notifier Notifier *Notifier
keyAPI keyapi.KeyInternalAPI keyAPI keyapi.KeyInternalAPI
rsAPI roomserverAPI.RoomserverInternalAPI rsAPI roomserverAPI.RoomserverInternalAPI
lastseen sync.Map lastseen sync.Map
@ -99,6 +100,30 @@ func (rp *RequestPool) updateLastSeen(req *http.Request, device *userapi.Device)
rp.lastseen.Store(device.UserID+device.ID, time.Now()) rp.lastseen.Store(device.UserID+device.ID, time.Now())
} }
func init() {
prometheus.MustRegister(
activeSyncRequests, waitingSyncRequests,
)
}
var activeSyncRequests = prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: "dendrite",
Subsystem: "syncapi",
Name: "active_sync_requests",
Help: "The number of sync requests that are active right now",
},
)
var waitingSyncRequests = prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: "dendrite",
Subsystem: "syncapi",
Name: "waiting_sync_requests",
Help: "The number of sync requests that are waiting to be woken by a notifier",
},
)
// OnIncomingSyncRequest is called when a client makes a /sync request. This function MUST be // OnIncomingSyncRequest is called when a client makes a /sync request. This function MUST be
// called in a dedicated goroutine for this request. This function will block the goroutine // called in a dedicated goroutine for this request. This function will block the goroutine
// until a response is ready, or it times out. // until a response is ready, or it times out.
@ -122,9 +147,12 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
"limit": syncReq.limit, "limit": syncReq.limit,
}) })
activeSyncRequests.Inc()
defer activeSyncRequests.Dec()
rp.updateLastSeen(req, device) rp.updateLastSeen(req, device)
currPos := rp.notifier.CurrentPosition() currPos := rp.Notifier.CurrentPosition()
if rp.shouldReturnImmediately(syncReq) { if rp.shouldReturnImmediately(syncReq) {
syncData, err = rp.currentSyncForUser(*syncReq, currPos) syncData, err = rp.currentSyncForUser(*syncReq, currPos)
@ -139,13 +167,16 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
} }
} }
waitingSyncRequests.Inc()
defer waitingSyncRequests.Dec()
// Otherwise, we wait for the notifier to tell us if something *may* have // Otherwise, we wait for the notifier to tell us if something *may* have
// happened. We loop in case it turns out that nothing did happen. // happened. We loop in case it turns out that nothing did happen.
timer := time.NewTimer(syncReq.timeout) // case of timeout=0 is handled above timer := time.NewTimer(syncReq.timeout) // case of timeout=0 is handled above
defer timer.Stop() defer timer.Stop()
userStreamListener := rp.notifier.GetListener(*syncReq) userStreamListener := rp.Notifier.GetListener(*syncReq)
defer userStreamListener.Close() defer userStreamListener.Close()
// We need the loop in case userStreamListener wakes up even if there isn't // We need the loop in case userStreamListener wakes up even if there isn't

View file

@ -17,7 +17,6 @@ package types
import ( import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"sort"
"strconv" "strconv"
"strings" "strings"
@ -45,6 +44,10 @@ type LogPosition struct {
Offset int64 Offset int64
} }
func (p *LogPosition) IsEmpty() bool {
return p.Offset == 0
}
// IsAfter returns true if this position is after `lp`. // IsAfter returns true if this position is after `lp`.
func (p *LogPosition) IsAfter(lp *LogPosition) bool { func (p *LogPosition) IsAfter(lp *LogPosition) bool {
if lp == nil { if lp == nil {
@ -110,33 +113,7 @@ type StreamingToken struct {
TypingPosition StreamPosition TypingPosition StreamPosition
ReceiptPosition StreamPosition ReceiptPosition StreamPosition
SendToDevicePosition StreamPosition SendToDevicePosition StreamPosition
Logs map[string]*LogPosition DeviceListPosition LogPosition
}
// This will be used as a fallback by json.Marshal.
func (t StreamingToken) MarshalText() ([]byte, error) {
return []byte(t.String()), nil
}
// This will be used as a fallback by json.Unmarshal.
func (t *StreamingToken) UnmarshalText(text []byte) (err error) {
*t, err = NewStreamTokenFromString(string(text))
return err
}
func (t *StreamingToken) SetLog(name string, lp *LogPosition) {
if t.Logs == nil {
t.Logs = make(map[string]*LogPosition)
}
t.Logs[name] = lp
}
func (t *StreamingToken) Log(name string) *LogPosition {
l, ok := t.Logs[name]
if !ok {
return nil
}
return l
} }
func (t StreamingToken) String() string { func (t StreamingToken) String() string {
@ -145,14 +122,10 @@ func (t StreamingToken) String() string {
t.PDUPosition, t.TypingPosition, t.PDUPosition, t.TypingPosition,
t.ReceiptPosition, t.SendToDevicePosition, t.ReceiptPosition, t.SendToDevicePosition,
) )
var logStrings []string if dl := t.DeviceListPosition; !dl.IsEmpty() {
for name, lp := range t.Logs { posStr += fmt.Sprintf(".dl-%d-%d", dl.Partition, dl.Offset)
logStr := fmt.Sprintf("%s-%d-%d", name, lp.Partition, lp.Offset)
logStrings = append(logStrings, logStr)
} }
sort.Strings(logStrings) return posStr
// E.g s11_22_33_44.dl0-134.ab1-441
return strings.Join(append([]string{posStr}, logStrings...), ".")
} }
// IsAfter returns true if ANY position in this token is greater than `other`. // IsAfter returns true if ANY position in this token is greater than `other`.
@ -166,21 +139,14 @@ func (t *StreamingToken) IsAfter(other StreamingToken) bool {
return true return true
case t.SendToDevicePosition > other.SendToDevicePosition: case t.SendToDevicePosition > other.SendToDevicePosition:
return true return true
} case t.DeviceListPosition.IsAfter(&other.DeviceListPosition):
for name := range t.Logs { return true
otherLog := other.Log(name)
if otherLog == nil {
continue
}
if t.Logs[name].IsAfter(otherLog) {
return true
}
} }
return false return false
} }
func (t *StreamingToken) IsEmpty() bool { func (t *StreamingToken) IsEmpty() bool {
return t == nil || t.PDUPosition+t.TypingPosition+t.ReceiptPosition+t.SendToDevicePosition == 0 return t == nil || t.PDUPosition+t.TypingPosition+t.ReceiptPosition+t.SendToDevicePosition == 0 && t.DeviceListPosition.IsEmpty()
} }
// WithUpdates returns a copy of the StreamingToken with updates applied from another StreamingToken. // WithUpdates returns a copy of the StreamingToken with updates applied from another StreamingToken.
@ -206,13 +172,8 @@ func (t *StreamingToken) ApplyUpdates(other StreamingToken) {
t.ReceiptPosition = other.ReceiptPosition t.ReceiptPosition = other.ReceiptPosition
case other.SendToDevicePosition > 0: case other.SendToDevicePosition > 0:
t.SendToDevicePosition = other.SendToDevicePosition t.SendToDevicePosition = other.SendToDevicePosition
} case other.DeviceListPosition.Offset > 0:
if t.Logs == nil { t.DeviceListPosition = other.DeviceListPosition
t.Logs = make(map[string]*LogPosition)
}
for name, pos := range other.Logs {
copy := *pos
t.Logs[name] = &copy
} }
} }
@ -321,30 +282,31 @@ func NewStreamTokenFromString(tok string) (token StreamingToken, err error) {
TypingPosition: positions[1], TypingPosition: positions[1],
ReceiptPosition: positions[2], ReceiptPosition: positions[2],
SendToDevicePosition: positions[3], SendToDevicePosition: positions[3],
Logs: make(map[string]*LogPosition),
} }
// dl-0-1234 // dl-0-1234
// $log_name-$partition-$offset // $log_name-$partition-$offset
for _, logStr := range categories[1:] { for _, logStr := range categories[1:] {
segments := strings.Split(logStr, "-") segments := strings.Split(logStr, "-")
if len(segments) != 3 { if len(segments) != 3 {
err = fmt.Errorf("token %s - invalid log: %s", tok, logStr) err = fmt.Errorf("invalid log position %q", logStr)
return return
} }
var partition int64 switch segments[0] {
partition, err = strconv.ParseInt(segments[1], 10, 32) case "dl":
if err != nil { // Device list syncing
var partition, offset int
if partition, err = strconv.Atoi(segments[1]); err != nil {
return
}
if offset, err = strconv.Atoi(segments[2]); err != nil {
return
}
token.DeviceListPosition.Partition = int32(partition)
token.DeviceListPosition.Offset = int64(offset)
default:
err = fmt.Errorf("unrecognised token type %q", segments[0])
return return
} }
var offset int64
offset, err = strconv.ParseInt(segments[2], 10, 64)
if err != nil {
return
}
token.Logs[segments[0]] = &LogPosition{
Partition: int32(partition),
Offset: offset,
}
} }
return token, nil return token, nil
} }

View file

@ -12,28 +12,12 @@ func TestNewSyncTokenWithLogs(t *testing.T) {
tests := map[string]*StreamingToken{ tests := map[string]*StreamingToken{
"s4_0_0_0": { "s4_0_0_0": {
PDUPosition: 4, PDUPosition: 4,
Logs: make(map[string]*LogPosition),
}, },
"s4_0_0_0.dl-0-123": { "s4_0_0_0.dl-0-123": {
PDUPosition: 4, PDUPosition: 4,
Logs: map[string]*LogPosition{ DeviceListPosition: LogPosition{
"dl": { Partition: 0,
Partition: 0, Offset: 123,
Offset: 123,
},
},
},
"s4_0_0_0.ab-1-14419482332.dl-0-123": {
PDUPosition: 4,
Logs: map[string]*LogPosition{
"ab": {
Partition: 1,
Offset: 14419482332,
},
"dl": {
Partition: 0,
Offset: 123,
},
}, },
}, },
} }
@ -58,10 +42,10 @@ func TestNewSyncTokenWithLogs(t *testing.T) {
func TestSyncTokens(t *testing.T) { func TestSyncTokens(t *testing.T) {
shouldPass := map[string]string{ shouldPass := map[string]string{
"s4_0_0_0": StreamingToken{4, 0, 0, 0, nil}.String(), "s4_0_0_0": StreamingToken{4, 0, 0, 0, LogPosition{}}.String(),
"s3_1_0_0": StreamingToken{3, 1, 0, 0, nil}.String(), "s3_1_0_0.dl-1-2": StreamingToken{3, 1, 0, 0, LogPosition{1, 2}}.String(),
"s3_1_2_3": StreamingToken{3, 1, 2, 3, nil}.String(), "s3_1_2_3": StreamingToken{3, 1, 2, 3, LogPosition{}}.String(),
"t3_1": TopologyToken{3, 1}.String(), "t3_1": TopologyToken{3, 1}.String(),
} }
for a, b := range shouldPass { for a, b := range shouldPass {