mirror of
https://github.com/matrix-org/dendrite
synced 2024-12-18 02:03:43 +01:00
d5b8260196
* requestpool: fix initial sync logic error in appendAccountData() In initial sync, req.since is no longer nil, but instead, req.since.PDUPosition() and req.since.EDUPosition() returns 0. This ensures forgotten rooms do not come back as zombies. * syncapi/requestpool: reintroduce req.since == nil check
441 lines
14 KiB
Go
441 lines
14 KiB
Go
// Copyright 2017 Vector Creations Ltd
|
|
// Copyright 2017-2018 New Vector Ltd
|
|
// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package sync
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"net"
|
|
"net/http"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
|
"github.com/matrix-org/dendrite/internal/config"
|
|
keyapi "github.com/matrix-org/dendrite/keyserver/api"
|
|
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
|
|
"github.com/matrix-org/dendrite/syncapi/internal"
|
|
"github.com/matrix-org/dendrite/syncapi/storage"
|
|
"github.com/matrix-org/dendrite/syncapi/types"
|
|
userapi "github.com/matrix-org/dendrite/userapi/api"
|
|
"github.com/matrix-org/gomatrixserverlib"
|
|
"github.com/matrix-org/util"
|
|
log "github.com/sirupsen/logrus"
|
|
)
|
|
|
|
// RequestPool manages HTTP long-poll connections for /sync
|
|
type RequestPool struct {
|
|
db storage.Database
|
|
cfg *config.SyncAPI
|
|
userAPI userapi.UserInternalAPI
|
|
notifier *Notifier
|
|
keyAPI keyapi.KeyInternalAPI
|
|
rsAPI roomserverAPI.RoomserverInternalAPI
|
|
lastseen sync.Map
|
|
}
|
|
|
|
// NewRequestPool makes a new RequestPool
|
|
func NewRequestPool(
|
|
db storage.Database, cfg *config.SyncAPI, n *Notifier,
|
|
userAPI userapi.UserInternalAPI, keyAPI keyapi.KeyInternalAPI,
|
|
rsAPI roomserverAPI.RoomserverInternalAPI,
|
|
) *RequestPool {
|
|
rp := &RequestPool{db, cfg, userAPI, n, keyAPI, rsAPI, sync.Map{}}
|
|
go rp.cleanLastSeen()
|
|
return rp
|
|
}
|
|
|
|
func (rp *RequestPool) cleanLastSeen() {
|
|
for {
|
|
rp.lastseen.Range(func(key interface{}, _ interface{}) bool {
|
|
rp.lastseen.Delete(key)
|
|
return true
|
|
})
|
|
time.Sleep(time.Minute)
|
|
}
|
|
}
|
|
|
|
func (rp *RequestPool) updateLastSeen(req *http.Request, device *userapi.Device) {
|
|
if _, ok := rp.lastseen.LoadOrStore(device.UserID+device.ID, struct{}{}); ok {
|
|
return
|
|
}
|
|
|
|
remoteAddr := req.RemoteAddr
|
|
if rp.cfg.RealIPHeader != "" {
|
|
if header := req.Header.Get(rp.cfg.RealIPHeader); header != "" {
|
|
// TODO: Maybe this isn't great but it will satisfy both X-Real-IP
|
|
// and X-Forwarded-For (which can be a list where the real client
|
|
// address is the first listed address). Make more intelligent?
|
|
addresses := strings.Split(header, ",")
|
|
if ip := net.ParseIP(addresses[0]); ip != nil {
|
|
remoteAddr = addresses[0]
|
|
}
|
|
}
|
|
}
|
|
|
|
lsreq := &userapi.PerformLastSeenUpdateRequest{
|
|
UserID: device.UserID,
|
|
DeviceID: device.ID,
|
|
RemoteAddr: remoteAddr,
|
|
}
|
|
lsres := &userapi.PerformLastSeenUpdateResponse{}
|
|
go rp.userAPI.PerformLastSeenUpdate(req.Context(), lsreq, lsres) // nolint:errcheck
|
|
|
|
rp.lastseen.Store(device.UserID+device.ID, time.Now())
|
|
}
|
|
|
|
// OnIncomingSyncRequest is called when a client makes a /sync request. This function MUST be
|
|
// called in a dedicated goroutine for this request. This function will block the goroutine
|
|
// until a response is ready, or it times out.
|
|
func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.Device) util.JSONResponse {
|
|
var syncData *types.Response
|
|
|
|
// Extract values from request
|
|
syncReq, err := newSyncRequest(req, *device, rp.db)
|
|
if err != nil {
|
|
return util.JSONResponse{
|
|
Code: http.StatusBadRequest,
|
|
JSON: jsonerror.Unknown(err.Error()),
|
|
}
|
|
}
|
|
|
|
logger := util.GetLogger(req.Context()).WithFields(log.Fields{
|
|
"user_id": device.UserID,
|
|
"device_id": device.ID,
|
|
"since": syncReq.since,
|
|
"timeout": syncReq.timeout,
|
|
"limit": syncReq.limit,
|
|
})
|
|
|
|
rp.updateLastSeen(req, device)
|
|
|
|
currPos := rp.notifier.CurrentPosition()
|
|
|
|
if rp.shouldReturnImmediately(syncReq) {
|
|
syncData, err = rp.currentSyncForUser(*syncReq, currPos)
|
|
if err != nil {
|
|
logger.WithError(err).Error("rp.currentSyncForUser failed")
|
|
return jsonerror.InternalServerError()
|
|
}
|
|
logger.WithField("next", syncData.NextBatch).Info("Responding immediately")
|
|
return util.JSONResponse{
|
|
Code: http.StatusOK,
|
|
JSON: syncData,
|
|
}
|
|
}
|
|
|
|
// Otherwise, we wait for the notifier to tell us if something *may* have
|
|
// happened. We loop in case it turns out that nothing did happen.
|
|
|
|
timer := time.NewTimer(syncReq.timeout) // case of timeout=0 is handled above
|
|
defer timer.Stop()
|
|
|
|
userStreamListener := rp.notifier.GetListener(*syncReq)
|
|
defer userStreamListener.Close()
|
|
|
|
// We need the loop in case userStreamListener wakes up even if there isn't
|
|
// anything to send down. In this case, we'll jump out of the select but
|
|
// don't want to send anything back until we get some actual content to
|
|
// respond with, so we skip the return an go back to waiting for content to
|
|
// be sent down or the request timing out.
|
|
var hasTimedOut bool
|
|
sincePos := *syncReq.since
|
|
for {
|
|
select {
|
|
// Wait for notifier to wake us up
|
|
case <-userStreamListener.GetNotifyChannel(sincePos):
|
|
currPos = userStreamListener.GetSyncPosition()
|
|
sincePos = currPos
|
|
// Or for timeout to expire
|
|
case <-timer.C:
|
|
// We just need to ensure we get out of the select after reaching the
|
|
// timeout, but there's nothing specific we want to do in this case
|
|
// apart from that, so we do nothing except stating we're timing out
|
|
// and need to respond.
|
|
hasTimedOut = true
|
|
// Or for the request to be cancelled
|
|
case <-req.Context().Done():
|
|
logger.WithError(err).Error("request cancelled")
|
|
return jsonerror.InternalServerError()
|
|
}
|
|
|
|
// Note that we don't time out during calculation of sync
|
|
// response. This ensures that we don't waste the hard work
|
|
// of calculating the sync only to get timed out before we
|
|
// can respond
|
|
syncData, err = rp.currentSyncForUser(*syncReq, currPos)
|
|
if err != nil {
|
|
logger.WithError(err).Error("rp.currentSyncForUser failed")
|
|
return jsonerror.InternalServerError()
|
|
}
|
|
|
|
if !syncData.IsEmpty() || hasTimedOut {
|
|
logger.WithField("next", syncData.NextBatch).WithField("timed_out", hasTimedOut).Info("Responding")
|
|
return util.JSONResponse{
|
|
Code: http.StatusOK,
|
|
JSON: syncData,
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (rp *RequestPool) OnIncomingKeyChangeRequest(req *http.Request, device *userapi.Device) util.JSONResponse {
|
|
from := req.URL.Query().Get("from")
|
|
to := req.URL.Query().Get("to")
|
|
if from == "" || to == "" {
|
|
return util.JSONResponse{
|
|
Code: 400,
|
|
JSON: jsonerror.InvalidArgumentValue("missing ?from= or ?to="),
|
|
}
|
|
}
|
|
fromToken, err := types.NewStreamTokenFromString(from)
|
|
if err != nil {
|
|
return util.JSONResponse{
|
|
Code: 400,
|
|
JSON: jsonerror.InvalidArgumentValue("bad 'from' value"),
|
|
}
|
|
}
|
|
toToken, err := types.NewStreamTokenFromString(to)
|
|
if err != nil {
|
|
return util.JSONResponse{
|
|
Code: 400,
|
|
JSON: jsonerror.InvalidArgumentValue("bad 'to' value"),
|
|
}
|
|
}
|
|
// work out room joins/leaves
|
|
res, err := rp.db.IncrementalSync(
|
|
req.Context(), types.NewResponse(), *device, fromToken, toToken, 10, false,
|
|
)
|
|
if err != nil {
|
|
util.GetLogger(req.Context()).WithError(err).Error("Failed to IncrementalSync")
|
|
return jsonerror.InternalServerError()
|
|
}
|
|
|
|
res, err = rp.appendDeviceLists(res, device.UserID, fromToken, toToken)
|
|
if err != nil {
|
|
util.GetLogger(req.Context()).WithError(err).Error("Failed to appendDeviceLists info")
|
|
return jsonerror.InternalServerError()
|
|
}
|
|
return util.JSONResponse{
|
|
Code: 200,
|
|
JSON: struct {
|
|
Changed []string `json:"changed"`
|
|
Left []string `json:"left"`
|
|
}{
|
|
Changed: res.DeviceLists.Changed,
|
|
Left: res.DeviceLists.Left,
|
|
},
|
|
}
|
|
}
|
|
|
|
// nolint:gocyclo
|
|
func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.StreamingToken) (*types.Response, error) {
|
|
res := types.NewResponse()
|
|
|
|
// See if we have any new tasks to do for the send-to-device messaging.
|
|
events, updates, deletions, err := rp.db.SendToDeviceUpdatesForSync(req.ctx, req.device.UserID, req.device.ID, *req.since)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("rp.db.SendToDeviceUpdatesForSync: %w", err)
|
|
}
|
|
|
|
// TODO: handle ignored users
|
|
if req.since.PDUPosition() == 0 && req.since.EDUPosition() == 0 {
|
|
res, err = rp.db.CompleteSync(req.ctx, res, req.device, req.limit)
|
|
if err != nil {
|
|
return res, fmt.Errorf("rp.db.CompleteSync: %w", err)
|
|
}
|
|
} else {
|
|
res, err = rp.db.IncrementalSync(req.ctx, res, req.device, *req.since, latestPos, req.limit, req.wantFullState)
|
|
if err != nil {
|
|
return res, fmt.Errorf("rp.db.IncrementalSync: %w", err)
|
|
}
|
|
}
|
|
|
|
accountDataFilter := gomatrixserverlib.DefaultEventFilter() // TODO: use filter provided in req instead
|
|
res, err = rp.appendAccountData(res, req.device.UserID, req, latestPos.PDUPosition(), &accountDataFilter)
|
|
if err != nil {
|
|
return res, fmt.Errorf("rp.appendAccountData: %w", err)
|
|
}
|
|
res, err = rp.appendDeviceLists(res, req.device.UserID, *req.since, latestPos)
|
|
if err != nil {
|
|
return res, fmt.Errorf("rp.appendDeviceLists: %w", err)
|
|
}
|
|
err = internal.DeviceOTKCounts(req.ctx, rp.keyAPI, req.device.UserID, req.device.ID, res)
|
|
if err != nil {
|
|
return res, fmt.Errorf("internal.DeviceOTKCounts: %w", err)
|
|
}
|
|
|
|
// Before we return the sync response, make sure that we take action on
|
|
// any send-to-device database updates or deletions that we need to do.
|
|
// Then add the updates into the sync response.
|
|
if len(updates) > 0 || len(deletions) > 0 {
|
|
// Handle the updates and deletions in the database.
|
|
err = rp.db.CleanSendToDeviceUpdates(context.Background(), updates, deletions, *req.since)
|
|
if err != nil {
|
|
return res, fmt.Errorf("rp.db.CleanSendToDeviceUpdates: %w", err)
|
|
}
|
|
}
|
|
if len(events) > 0 {
|
|
// Add the updates into the sync response.
|
|
for _, event := range events {
|
|
res.ToDevice.Events = append(res.ToDevice.Events, event.SendToDeviceEvent)
|
|
}
|
|
|
|
// Get the next_batch from the sync response and increase the
|
|
// EDU counter.
|
|
if pos, perr := types.NewStreamTokenFromString(res.NextBatch); perr == nil {
|
|
pos.Positions[1]++
|
|
res.NextBatch = pos.String()
|
|
}
|
|
}
|
|
|
|
return res, err
|
|
}
|
|
|
|
func (rp *RequestPool) appendDeviceLists(
|
|
data *types.Response, userID string, since, to types.StreamingToken,
|
|
) (*types.Response, error) {
|
|
_, err := internal.DeviceListCatchup(context.Background(), rp.keyAPI, rp.rsAPI, userID, data, since, to)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("internal.DeviceListCatchup: %w", err)
|
|
}
|
|
|
|
return data, nil
|
|
}
|
|
|
|
// nolint:gocyclo
|
|
func (rp *RequestPool) appendAccountData(
|
|
data *types.Response, userID string, req syncRequest, currentPos types.StreamPosition,
|
|
accountDataFilter *gomatrixserverlib.EventFilter,
|
|
) (*types.Response, error) {
|
|
// TODO: Account data doesn't have a sync position of its own, meaning that
|
|
// account data might be sent multiple time to the client if multiple account
|
|
// data keys were set between two message. This isn't a huge issue since the
|
|
// duplicate data doesn't represent a huge quantity of data, but an optimisation
|
|
// here would be making sure each data is sent only once to the client.
|
|
if req.since == nil || (req.since.PDUPosition() == 0 && req.since.EDUPosition() == 0) {
|
|
// If this is the initial sync, we don't need to check if a data has
|
|
// already been sent. Instead, we send the whole batch.
|
|
dataReq := &userapi.QueryAccountDataRequest{
|
|
UserID: userID,
|
|
}
|
|
dataRes := &userapi.QueryAccountDataResponse{}
|
|
if err := rp.userAPI.QueryAccountData(req.ctx, dataReq, dataRes); err != nil {
|
|
return nil, err
|
|
}
|
|
for datatype, databody := range dataRes.GlobalAccountData {
|
|
data.AccountData.Events = append(
|
|
data.AccountData.Events,
|
|
gomatrixserverlib.ClientEvent{
|
|
Type: datatype,
|
|
Content: gomatrixserverlib.RawJSON(databody),
|
|
},
|
|
)
|
|
}
|
|
for r, j := range data.Rooms.Join {
|
|
for datatype, databody := range dataRes.RoomAccountData[r] {
|
|
j.AccountData.Events = append(
|
|
j.AccountData.Events,
|
|
gomatrixserverlib.ClientEvent{
|
|
Type: datatype,
|
|
Content: gomatrixserverlib.RawJSON(databody),
|
|
},
|
|
)
|
|
data.Rooms.Join[r] = j
|
|
}
|
|
}
|
|
return data, nil
|
|
}
|
|
|
|
r := types.Range{
|
|
From: req.since.PDUPosition(),
|
|
To: currentPos,
|
|
}
|
|
// If both positions are the same, it means that the data was saved after the
|
|
// latest room event. In that case, we need to decrement the old position as
|
|
// results are exclusive of Low.
|
|
if r.Low() == r.High() {
|
|
r.From--
|
|
}
|
|
|
|
// Sync is not initial, get all account data since the latest sync
|
|
dataTypes, err := rp.db.GetAccountDataInRange(
|
|
req.ctx, userID, r, accountDataFilter,
|
|
)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("rp.db.GetAccountDataInRange: %w", err)
|
|
}
|
|
|
|
if len(dataTypes) == 0 {
|
|
// TODO: this fixes the sytest but is it the right thing to do?
|
|
dataTypes[""] = []string{"m.push_rules"}
|
|
}
|
|
|
|
// Iterate over the rooms
|
|
for roomID, dataTypes := range dataTypes {
|
|
// Request the missing data from the database
|
|
for _, dataType := range dataTypes {
|
|
dataReq := userapi.QueryAccountDataRequest{
|
|
UserID: userID,
|
|
RoomID: roomID,
|
|
DataType: dataType,
|
|
}
|
|
dataRes := userapi.QueryAccountDataResponse{}
|
|
err = rp.userAPI.QueryAccountData(req.ctx, &dataReq, &dataRes)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
if roomID == "" {
|
|
if globalData, ok := dataRes.GlobalAccountData[dataType]; ok {
|
|
data.AccountData.Events = append(
|
|
data.AccountData.Events,
|
|
gomatrixserverlib.ClientEvent{
|
|
Type: dataType,
|
|
Content: gomatrixserverlib.RawJSON(globalData),
|
|
},
|
|
)
|
|
}
|
|
} else {
|
|
if roomData, ok := dataRes.RoomAccountData[roomID][dataType]; ok {
|
|
joinData := data.Rooms.Join[roomID]
|
|
joinData.AccountData.Events = append(
|
|
joinData.AccountData.Events,
|
|
gomatrixserverlib.ClientEvent{
|
|
Type: dataType,
|
|
Content: gomatrixserverlib.RawJSON(roomData),
|
|
},
|
|
)
|
|
data.Rooms.Join[roomID] = joinData
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
return data, nil
|
|
}
|
|
|
|
// shouldReturnImmediately returns whether the /sync request is an initial sync,
|
|
// or timeout=0, or full_state=true, in any of the cases the request should
|
|
// return immediately.
|
|
func (rp *RequestPool) shouldReturnImmediately(syncReq *syncRequest) bool {
|
|
if syncReq.since == nil || syncReq.timeout == 0 || syncReq.wantFullState {
|
|
return true
|
|
}
|
|
waiting, werr := rp.db.SendToDeviceUpdatesWaiting(context.TODO(), syncReq.device.UserID, syncReq.device.ID)
|
|
return werr == nil && waiting
|
|
}
|