0
0
Fork 0
mirror of https://github.com/matrix-org/dendrite synced 2024-10-31 21:19:04 +01:00

Basic sync filtering (#1721)

* Add some filtering (postgres only for now)

* Fix build error

* Try to use request filter

* Use default filter as a template when retrieving from the database

* Remove unused strut

* Update sytest-whitelist

* Add filtering to SelectEarlyEvents

* Fix Postgres selectEarlyEvents query

* Attempt filtering on SQLite

* Test limit, set field for limit/order in prepareWithFilters

* Remove debug logging, add comments

* Tweaks, debug logging

* Separate SQLite stream IDs

* Fix filtering in current state table

* Fix lock issues

* More tweaks

* Current state requires room ID

* Review comments
This commit is contained in:
Neil Alexander 2021-01-19 18:00:42 +00:00 committed by GitHub
parent 80aa9aa8b0
commit b70238f2d5
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
19 changed files with 279 additions and 160 deletions

View file

@ -367,7 +367,6 @@ func newTestSyncRequest(userID, deviceID string, since types.StreamingToken) typ
Timeout: 1 * time.Minute, Timeout: 1 * time.Minute,
Since: since, Since: since,
WantFullState: false, WantFullState: false,
Limit: 20,
Log: util.GetLogger(context.TODO()), Log: util.GetLogger(context.TODO()),
Context: context.TODO(), Context: context.TODO(),
} }

View file

@ -235,12 +235,15 @@ func (r *messagesReq) retrieveEvents() (
clientEvents []gomatrixserverlib.ClientEvent, start, clientEvents []gomatrixserverlib.ClientEvent, start,
end types.TopologyToken, err error, end types.TopologyToken, err error,
) { ) {
eventFilter := gomatrixserverlib.DefaultRoomEventFilter()
eventFilter.Limit = r.limit
// Retrieve the events from the local database. // Retrieve the events from the local database.
var streamEvents []types.StreamEvent var streamEvents []types.StreamEvent
if r.fromStream != nil { if r.fromStream != nil {
toStream := r.to.StreamToken() toStream := r.to.StreamToken()
streamEvents, err = r.db.GetEventsInStreamingRange( streamEvents, err = r.db.GetEventsInStreamingRange(
r.ctx, r.fromStream, &toStream, r.roomID, r.limit, r.backwardOrdering, r.ctx, r.fromStream, &toStream, r.roomID, &eventFilter, r.backwardOrdering,
) )
} else { } else {
streamEvents, err = r.db.GetEventsInTopologicalRange( streamEvents, err = r.db.GetEventsInTopologicalRange(

View file

@ -40,7 +40,7 @@ type Database interface {
GetStateDeltas(ctx context.Context, device *userapi.Device, r types.Range, userID string, stateFilter *gomatrixserverlib.StateFilter) ([]types.StateDelta, []string, error) GetStateDeltas(ctx context.Context, device *userapi.Device, r types.Range, userID string, stateFilter *gomatrixserverlib.StateFilter) ([]types.StateDelta, []string, error)
RoomIDsWithMembership(ctx context.Context, userID string, membership string) ([]string, error) RoomIDsWithMembership(ctx context.Context, userID string, membership string) ([]string, error)
RecentEvents(ctx context.Context, roomID string, r types.Range, limit int, chronologicalOrder bool, onlySyncEvents bool) ([]types.StreamEvent, bool, error) RecentEvents(ctx context.Context, roomID string, r types.Range, eventFilter *gomatrixserverlib.RoomEventFilter, chronologicalOrder bool, onlySyncEvents bool) ([]types.StreamEvent, bool, error)
GetBackwardTopologyPos(ctx context.Context, events []types.StreamEvent) (types.TopologyToken, error) GetBackwardTopologyPos(ctx context.Context, events []types.StreamEvent) (types.TopologyToken, error)
PositionInTopology(ctx context.Context, eventID string) (pos types.StreamPosition, spos types.StreamPosition, err error) PositionInTopology(ctx context.Context, eventID string) (pos types.StreamPosition, spos types.StreamPosition, err error)
@ -105,7 +105,7 @@ type Database interface {
// Returns an error if there was a problem communicating with the database. // Returns an error if there was a problem communicating with the database.
DeletePeeks(ctx context.Context, RoomID, UserID string) (types.StreamPosition, error) DeletePeeks(ctx context.Context, RoomID, UserID string) (types.StreamPosition, error)
// GetEventsInStreamingRange retrieves all of the events on a given ordering using the given extremities and limit. // GetEventsInStreamingRange retrieves all of the events on a given ordering using the given extremities and limit.
GetEventsInStreamingRange(ctx context.Context, from, to *types.StreamingToken, roomID string, limit int, backwardOrdering bool) (events []types.StreamEvent, err error) GetEventsInStreamingRange(ctx context.Context, from, to *types.StreamingToken, roomID string, eventFilter *gomatrixserverlib.RoomEventFilter, backwardOrdering bool) (events []types.StreamEvent, err error)
// GetEventsInTopologicalRange retrieves all of the events on a given ordering using the given extremities and limit. // GetEventsInTopologicalRange retrieves all of the events on a given ordering using the given extremities and limit.
GetEventsInTopologicalRange(ctx context.Context, from, to *types.TopologyToken, roomID string, limit int, backwardOrdering bool) (events []types.StreamEvent, err error) GetEventsInTopologicalRange(ctx context.Context, from, to *types.TopologyToken, roomID string, limit int, backwardOrdering bool) (events []types.StreamEvent, err error)
// EventPositionInTopology returns the depth and stream position of the given event. // EventPositionInTopology returns the depth and stream position of the given event.

View file

@ -83,7 +83,7 @@ func (s *filterStatements) SelectFilter(
} }
// Unmarshal JSON into Filter struct // Unmarshal JSON into Filter struct
var filter gomatrixserverlib.Filter filter := gomatrixserverlib.DefaultFilter()
if err = json.Unmarshal(filterData, &filter); err != nil { if err = json.Unmarshal(filterData, &filter); err != nil {
return nil, err return nil, err
} }

View file

@ -84,17 +84,29 @@ const selectEventsSQL = "" +
const selectRecentEventsSQL = "" + const selectRecentEventsSQL = "" +
"SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" + "SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" +
" WHERE room_id = $1 AND id > $2 AND id <= $3" + " WHERE room_id = $1 AND id > $2 AND id <= $3" +
" ORDER BY id DESC LIMIT $4" " AND ( $4::text[] IS NULL OR sender = ANY($4) )" +
" AND ( $5::text[] IS NULL OR NOT(sender = ANY($5)) )" +
" AND ( $6::text[] IS NULL OR type LIKE ANY($6) )" +
" AND ( $7::text[] IS NULL OR NOT(type LIKE ANY($7)) )" +
" ORDER BY id DESC LIMIT $8"
const selectRecentEventsForSyncSQL = "" + const selectRecentEventsForSyncSQL = "" +
"SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" + "SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" +
" WHERE room_id = $1 AND id > $2 AND id <= $3 AND exclude_from_sync = FALSE" + " WHERE room_id = $1 AND id > $2 AND id <= $3 AND exclude_from_sync = FALSE" +
" ORDER BY id DESC LIMIT $4" " AND ( $4::text[] IS NULL OR sender = ANY($4) )" +
" AND ( $5::text[] IS NULL OR NOT(sender = ANY($5)) )" +
" AND ( $6::text[] IS NULL OR type LIKE ANY($6) )" +
" AND ( $7::text[] IS NULL OR NOT(type LIKE ANY($7)) )" +
" ORDER BY id DESC LIMIT $8"
const selectEarlyEventsSQL = "" + const selectEarlyEventsSQL = "" +
"SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" + "SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" +
" WHERE room_id = $1 AND id > $2 AND id <= $3" + " WHERE room_id = $1 AND id > $2 AND id <= $3" +
" ORDER BY id ASC LIMIT $4" " AND ( $4::text[] IS NULL OR sender = ANY($4) )" +
" AND ( $5::text[] IS NULL OR NOT(sender = ANY($5)) )" +
" AND ( $6::text[] IS NULL OR type LIKE ANY($6) )" +
" AND ( $7::text[] IS NULL OR NOT(type LIKE ANY($7)) )" +
" ORDER BY id ASC LIMIT $8"
const selectMaxEventIDSQL = "" + const selectMaxEventIDSQL = "" +
"SELECT MAX(id) FROM syncapi_output_room_events" "SELECT MAX(id) FROM syncapi_output_room_events"
@ -322,7 +334,7 @@ func (s *outputRoomEventsStatements) InsertEvent(
// from sync. // from sync.
func (s *outputRoomEventsStatements) SelectRecentEvents( func (s *outputRoomEventsStatements) SelectRecentEvents(
ctx context.Context, txn *sql.Tx, ctx context.Context, txn *sql.Tx,
roomID string, r types.Range, limit int, roomID string, r types.Range, eventFilter *gomatrixserverlib.RoomEventFilter,
chronologicalOrder bool, onlySyncEvents bool, chronologicalOrder bool, onlySyncEvents bool,
) ([]types.StreamEvent, bool, error) { ) ([]types.StreamEvent, bool, error) {
var stmt *sql.Stmt var stmt *sql.Stmt
@ -331,7 +343,14 @@ func (s *outputRoomEventsStatements) SelectRecentEvents(
} else { } else {
stmt = sqlutil.TxStmt(txn, s.selectRecentEventsStmt) stmt = sqlutil.TxStmt(txn, s.selectRecentEventsStmt)
} }
rows, err := stmt.QueryContext(ctx, roomID, r.Low(), r.High(), limit+1) rows, err := stmt.QueryContext(
ctx, roomID, r.Low(), r.High(),
pq.StringArray(eventFilter.Senders),
pq.StringArray(eventFilter.NotSenders),
pq.StringArray(filterConvertTypeWildcardToSQL(eventFilter.Types)),
pq.StringArray(filterConvertTypeWildcardToSQL(eventFilter.NotTypes)),
eventFilter.Limit+1,
)
if err != nil { if err != nil {
return nil, false, err return nil, false, err
} }
@ -350,7 +369,7 @@ func (s *outputRoomEventsStatements) SelectRecentEvents(
} }
// we queried for 1 more than the limit, so if we returned one more mark limited=true // we queried for 1 more than the limit, so if we returned one more mark limited=true
limited := false limited := false
if len(events) > limit { if len(events) > eventFilter.Limit {
limited = true limited = true
// re-slice the extra (oldest) event out: in chronological order this is the first entry, else the last. // re-slice the extra (oldest) event out: in chronological order this is the first entry, else the last.
if chronologicalOrder { if chronologicalOrder {
@ -367,10 +386,17 @@ func (s *outputRoomEventsStatements) SelectRecentEvents(
// from a given position, up to a maximum of 'limit'. // from a given position, up to a maximum of 'limit'.
func (s *outputRoomEventsStatements) SelectEarlyEvents( func (s *outputRoomEventsStatements) SelectEarlyEvents(
ctx context.Context, txn *sql.Tx, ctx context.Context, txn *sql.Tx,
roomID string, r types.Range, limit int, roomID string, r types.Range, eventFilter *gomatrixserverlib.RoomEventFilter,
) ([]types.StreamEvent, error) { ) ([]types.StreamEvent, error) {
stmt := sqlutil.TxStmt(txn, s.selectEarlyEventsStmt) stmt := sqlutil.TxStmt(txn, s.selectEarlyEventsStmt)
rows, err := stmt.QueryContext(ctx, roomID, r.Low(), r.High(), limit) rows, err := stmt.QueryContext(
ctx, roomID, r.Low(), r.High(),
pq.StringArray(eventFilter.Senders),
pq.StringArray(eventFilter.NotSenders),
pq.StringArray(filterConvertTypeWildcardToSQL(eventFilter.Types)),
pq.StringArray(filterConvertTypeWildcardToSQL(eventFilter.NotTypes)),
eventFilter.Limit,
)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View file

@ -110,8 +110,8 @@ func (d *Database) RoomIDsWithMembership(ctx context.Context, userID string, mem
return d.CurrentRoomState.SelectRoomIDsWithMembership(ctx, nil, userID, membership) return d.CurrentRoomState.SelectRoomIDsWithMembership(ctx, nil, userID, membership)
} }
func (d *Database) RecentEvents(ctx context.Context, roomID string, r types.Range, limit int, chronologicalOrder bool, onlySyncEvents bool) ([]types.StreamEvent, bool, error) { func (d *Database) RecentEvents(ctx context.Context, roomID string, r types.Range, eventFilter *gomatrixserverlib.RoomEventFilter, chronologicalOrder bool, onlySyncEvents bool) ([]types.StreamEvent, bool, error) {
return d.OutputEvents.SelectRecentEvents(ctx, nil, roomID, r, limit, chronologicalOrder, onlySyncEvents) return d.OutputEvents.SelectRecentEvents(ctx, nil, roomID, r, eventFilter, chronologicalOrder, onlySyncEvents)
} }
func (d *Database) PositionInTopology(ctx context.Context, eventID string) (pos types.StreamPosition, spos types.StreamPosition, err error) { func (d *Database) PositionInTopology(ctx context.Context, eventID string) (pos types.StreamPosition, spos types.StreamPosition, err error) {
@ -151,7 +151,7 @@ func (d *Database) Events(ctx context.Context, eventIDs []string) ([]*gomatrixse
func (d *Database) GetEventsInStreamingRange( func (d *Database) GetEventsInStreamingRange(
ctx context.Context, ctx context.Context,
from, to *types.StreamingToken, from, to *types.StreamingToken,
roomID string, limit int, roomID string, eventFilter *gomatrixserverlib.RoomEventFilter,
backwardOrdering bool, backwardOrdering bool,
) (events []types.StreamEvent, err error) { ) (events []types.StreamEvent, err error) {
r := types.Range{ r := types.Range{
@ -162,14 +162,14 @@ func (d *Database) GetEventsInStreamingRange(
if backwardOrdering { if backwardOrdering {
// When using backward ordering, we want the most recent events first. // When using backward ordering, we want the most recent events first.
if events, _, err = d.OutputEvents.SelectRecentEvents( if events, _, err = d.OutputEvents.SelectRecentEvents(
ctx, nil, roomID, r, limit, false, false, ctx, nil, roomID, r, eventFilter, false, false,
); err != nil { ); err != nil {
return return
} }
} else { } else {
// When using forward ordering, we want the least recent events first. // When using forward ordering, we want the least recent events first.
if events, err = d.OutputEvents.SelectEarlyEvents( if events, err = d.OutputEvents.SelectEarlyEvents(
ctx, nil, roomID, r, limit, ctx, nil, roomID, r, eventFilter,
); err != nil { ); err != nil {
return return
} }

View file

@ -82,7 +82,7 @@ func (s *accountDataStatements) InsertAccountData(
ctx context.Context, txn *sql.Tx, ctx context.Context, txn *sql.Tx,
userID, roomID, dataType string, userID, roomID, dataType string,
) (pos types.StreamPosition, err error) { ) (pos types.StreamPosition, err error) {
pos, err = s.streamIDStatements.nextStreamID(ctx, txn) pos, err = s.streamIDStatements.nextAccountDataID(ctx, txn)
if err != nil { if err != nil {
return return
} }

View file

@ -19,6 +19,7 @@ import (
"context" "context"
"database/sql" "database/sql"
"encoding/json" "encoding/json"
"fmt"
"strings" "strings"
"github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal"
@ -66,13 +67,8 @@ const selectRoomIDsWithMembershipSQL = "" +
"SELECT DISTINCT room_id FROM syncapi_current_room_state WHERE type = 'm.room.member' AND state_key = $1 AND membership = $2" "SELECT DISTINCT room_id FROM syncapi_current_room_state WHERE type = 'm.room.member' AND state_key = $1 AND membership = $2"
const selectCurrentStateSQL = "" + const selectCurrentStateSQL = "" +
"SELECT event_id, headered_event_json FROM syncapi_current_room_state WHERE room_id = $1" + "SELECT event_id, headered_event_json FROM syncapi_current_room_state WHERE room_id = $1"
" AND ( $2 IS NULL OR sender IN ($2) )" + // WHEN, ORDER BY and LIMIT will be added by prepareWithFilter
" AND ( $3 IS NULL OR NOT(sender IN ($3)) )" +
" AND ( $4 IS NULL OR type IN ($4) )" +
" AND ( $5 IS NULL OR NOT(type IN ($5)) )" +
" AND ( $6 IS NULL OR contains_url = $6 )" +
" LIMIT $7"
const selectJoinedUsersSQL = "" + const selectJoinedUsersSQL = "" +
"SELECT room_id, state_key FROM syncapi_current_room_state WHERE type = 'm.room.member' AND membership = 'join'" "SELECT room_id, state_key FROM syncapi_current_room_state WHERE type = 'm.room.member' AND membership = 'join'"
@ -95,7 +91,6 @@ type currentRoomStateStatements struct {
deleteRoomStateByEventIDStmt *sql.Stmt deleteRoomStateByEventIDStmt *sql.Stmt
DeleteRoomStateForRoomStmt *sql.Stmt DeleteRoomStateForRoomStmt *sql.Stmt
selectRoomIDsWithMembershipStmt *sql.Stmt selectRoomIDsWithMembershipStmt *sql.Stmt
selectCurrentStateStmt *sql.Stmt
selectJoinedUsersStmt *sql.Stmt selectJoinedUsersStmt *sql.Stmt
selectStateEventStmt *sql.Stmt selectStateEventStmt *sql.Stmt
} }
@ -121,9 +116,6 @@ func NewSqliteCurrentRoomStateTable(db *sql.DB, streamID *streamIDStatements) (t
if s.selectRoomIDsWithMembershipStmt, err = db.Prepare(selectRoomIDsWithMembershipSQL); err != nil { if s.selectRoomIDsWithMembershipStmt, err = db.Prepare(selectRoomIDsWithMembershipSQL); err != nil {
return nil, err return nil, err
} }
if s.selectCurrentStateStmt, err = db.Prepare(selectCurrentStateSQL); err != nil {
return nil, err
}
if s.selectJoinedUsersStmt, err = db.Prepare(selectJoinedUsersSQL); err != nil { if s.selectJoinedUsersStmt, err = db.Prepare(selectJoinedUsersSQL); err != nil {
return nil, err return nil, err
} }
@ -185,17 +177,22 @@ func (s *currentRoomStateStatements) SelectRoomIDsWithMembership(
// CurrentState returns all the current state events for the given room. // CurrentState returns all the current state events for the given room.
func (s *currentRoomStateStatements) SelectCurrentState( func (s *currentRoomStateStatements) SelectCurrentState(
ctx context.Context, txn *sql.Tx, roomID string, ctx context.Context, txn *sql.Tx, roomID string,
stateFilterPart *gomatrixserverlib.StateFilter, stateFilter *gomatrixserverlib.StateFilter,
) ([]*gomatrixserverlib.HeaderedEvent, error) { ) ([]*gomatrixserverlib.HeaderedEvent, error) {
stmt := sqlutil.TxStmt(txn, s.selectCurrentStateStmt) stmt, params, err := prepareWithFilters(
rows, err := stmt.QueryContext(ctx, roomID, s.db, txn, selectCurrentStateSQL,
nil, // FIXME: pq.StringArray(stateFilterPart.Senders), []interface{}{
nil, // FIXME: pq.StringArray(stateFilterPart.NotSenders), roomID,
nil, // FIXME: pq.StringArray(filterConvertTypeWildcardToSQL(stateFilterPart.Types)), },
nil, // FIXME: pq.StringArray(filterConvertTypeWildcardToSQL(stateFilterPart.NotTypes)), stateFilter.Senders, stateFilter.NotSenders,
stateFilterPart.ContainsURL, stateFilter.Types, stateFilter.NotTypes,
stateFilterPart.Limit, stateFilter.Limit, FilterOrderNone,
) )
if err != nil {
return nil, fmt.Errorf("s.prepareWithFilters: %w", err)
}
rows, err := stmt.QueryContext(ctx, params...)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View file

@ -87,7 +87,7 @@ func (s *filterStatements) SelectFilter(
} }
// Unmarshal JSON into Filter struct // Unmarshal JSON into Filter struct
var filter gomatrixserverlib.Filter filter := gomatrixserverlib.DefaultFilter()
if err = json.Unmarshal(filterData, &filter); err != nil { if err = json.Unmarshal(filterData, &filter); err != nil {
return nil, err return nil, err
} }

View file

@ -0,0 +1,76 @@
package sqlite3
import (
"database/sql"
"fmt"
"github.com/matrix-org/dendrite/internal/sqlutil"
)
type FilterOrder int
const (
FilterOrderNone = iota
FilterOrderAsc
FilterOrderDesc
)
// prepareWithFilters returns a prepared statement with the
// relevant filters included. It also includes an []interface{}
// list of all the relevant parameters to pass straight to
// QueryContext, QueryRowContext etc.
// We don't take the filter object directly here because the
// fields might come from either a StateFilter or an EventFilter,
// and it's easier just to have the caller extract the relevant
// parts.
func prepareWithFilters(
db *sql.DB, txn *sql.Tx, query string, params []interface{},
senders, notsenders, types, nottypes []string,
limit int, order FilterOrder,
) (*sql.Stmt, []interface{}, error) {
offset := len(params)
if count := len(senders); count > 0 {
query += " AND sender IN " + sqlutil.QueryVariadicOffset(count, offset)
for _, v := range senders {
params, offset = append(params, v), offset+1
}
}
if count := len(notsenders); count > 0 {
query += " AND sender NOT IN " + sqlutil.QueryVariadicOffset(count, offset)
for _, v := range notsenders {
params, offset = append(params, v), offset+1
}
}
if count := len(types); count > 0 {
query += " AND type IN " + sqlutil.QueryVariadicOffset(count, offset)
for _, v := range types {
params, offset = append(params, v), offset+1
}
}
if count := len(nottypes); count > 0 {
query += " AND type NOT IN " + sqlutil.QueryVariadicOffset(count, offset)
for _, v := range nottypes {
params, offset = append(params, v), offset+1
}
}
switch order {
case FilterOrderAsc:
query += " ORDER BY id ASC"
case FilterOrderDesc:
query += " ORDER BY id DESC"
}
query += fmt.Sprintf(" LIMIT $%d", offset+1)
params = append(params, limit)
var stmt *sql.Stmt
var err error
if txn != nil {
stmt, err = txn.Prepare(query)
} else {
stmt, err = db.Prepare(query)
}
if err != nil {
return nil, nil, fmt.Errorf("s.db.Prepare: %w", err)
}
return stmt, params, nil
}

View file

@ -93,7 +93,7 @@ func NewSqliteInvitesTable(db *sql.DB, streamID *streamIDStatements) (tables.Inv
func (s *inviteEventsStatements) InsertInviteEvent( func (s *inviteEventsStatements) InsertInviteEvent(
ctx context.Context, txn *sql.Tx, inviteEvent *gomatrixserverlib.HeaderedEvent, ctx context.Context, txn *sql.Tx, inviteEvent *gomatrixserverlib.HeaderedEvent,
) (streamPos types.StreamPosition, err error) { ) (streamPos types.StreamPosition, err error) {
streamPos, err = s.streamIDStatements.nextStreamID(ctx, txn) streamPos, err = s.streamIDStatements.nextInviteID(ctx, txn)
if err != nil { if err != nil {
return return
} }
@ -119,7 +119,7 @@ func (s *inviteEventsStatements) InsertInviteEvent(
func (s *inviteEventsStatements) DeleteInviteEvent( func (s *inviteEventsStatements) DeleteInviteEvent(
ctx context.Context, txn *sql.Tx, inviteEventID string, ctx context.Context, txn *sql.Tx, inviteEventID string,
) (types.StreamPosition, error) { ) (types.StreamPosition, error) {
streamPos, err := s.streamIDStatements.nextStreamID(ctx, txn) streamPos, err := s.streamIDStatements.nextInviteID(ctx, txn)
if err != nil { if err != nil {
return streamPos, err return streamPos, err
} }

View file

@ -19,6 +19,7 @@ import (
"context" "context"
"database/sql" "database/sql"
"encoding/json" "encoding/json"
"fmt"
"sort" "sort"
"github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal"
@ -60,18 +61,18 @@ const selectEventsSQL = "" +
const selectRecentEventsSQL = "" + const selectRecentEventsSQL = "" +
"SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" + "SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" +
" WHERE room_id = $1 AND id > $2 AND id <= $3" + " WHERE room_id = $1 AND id > $2 AND id <= $3"
" ORDER BY id DESC LIMIT $4" // WHEN, ORDER BY and LIMIT are appended by prepareWithFilters
const selectRecentEventsForSyncSQL = "" + const selectRecentEventsForSyncSQL = "" +
"SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" + "SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" +
" WHERE room_id = $1 AND id > $2 AND id <= $3 AND exclude_from_sync = FALSE" + " WHERE room_id = $1 AND id > $2 AND id <= $3 AND exclude_from_sync = FALSE"
" ORDER BY id DESC LIMIT $4" // WHEN, ORDER BY and LIMIT are appended by prepareWithFilters
const selectEarlyEventsSQL = "" + const selectEarlyEventsSQL = "" +
"SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" + "SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" +
" WHERE room_id = $1 AND id > $2 AND id <= $3" + " WHERE room_id = $1 AND id > $2 AND id <= $3"
" ORDER BY id ASC LIMIT $4" // WHEN, ORDER BY and LIMIT are appended by prepareWithFilters
const selectMaxEventIDSQL = "" + const selectMaxEventIDSQL = "" +
"SELECT MAX(id) FROM syncapi_output_room_events" "SELECT MAX(id) FROM syncapi_output_room_events"
@ -79,29 +80,12 @@ const selectMaxEventIDSQL = "" +
const updateEventJSONSQL = "" + const updateEventJSONSQL = "" +
"UPDATE syncapi_output_room_events SET headered_event_json=$1 WHERE event_id=$2" "UPDATE syncapi_output_room_events SET headered_event_json=$1 WHERE event_id=$2"
// In order for us to apply the state updates correctly, rows need to be ordered in the order they were received (id).
/*
$1 = oldPos,
$2 = newPos,
$3 = pq.StringArray(stateFilterPart.Senders),
$4 = pq.StringArray(stateFilterPart.NotSenders),
$5 = pq.StringArray(filterConvertTypeWildcardToSQL(stateFilterPart.Types)),
$6 = pq.StringArray(filterConvertTypeWildcardToSQL(stateFilterPart.NotTypes)),
$7 = stateFilterPart.ContainsURL,
$8 = stateFilterPart.Limit,
*/
const selectStateInRangeSQL = "" + const selectStateInRangeSQL = "" +
"SELECT id, headered_event_json, exclude_from_sync, add_state_ids, remove_state_ids" + "SELECT id, headered_event_json, exclude_from_sync, add_state_ids, remove_state_ids" +
" FROM syncapi_output_room_events" + " FROM syncapi_output_room_events" +
" WHERE (id > $1 AND id <= $2)" + // old/new pos " WHERE (id > $1 AND id <= $2)" +
" AND (add_state_ids IS NOT NULL OR remove_state_ids IS NOT NULL)" + " AND ((add_state_ids IS NOT NULL AND add_state_ids != '') OR (remove_state_ids IS NOT NULL AND remove_state_ids != ''))"
/* " AND ( $3 IS NULL OR sender IN ($3) )" + // sender // WHEN, ORDER BY and LIMIT are appended by prepareWithFilters
" AND ( $4 IS NULL OR NOT(sender IN ($4)) )" + // not sender
" AND ( $5 IS NULL OR type IN ($5) )" + // type
" AND ( $6 IS NULL OR NOT(type IN ($6)) )" + // not type
" AND ( $7 IS NULL OR contains_url = $7)" + // contains URL? */
" ORDER BY id ASC" +
" LIMIT $8" // limit
const deleteEventsForRoomSQL = "" + const deleteEventsForRoomSQL = "" +
"DELETE FROM syncapi_output_room_events WHERE room_id = $1" "DELETE FROM syncapi_output_room_events WHERE room_id = $1"
@ -112,10 +96,6 @@ type outputRoomEventsStatements struct {
insertEventStmt *sql.Stmt insertEventStmt *sql.Stmt
selectEventsStmt *sql.Stmt selectEventsStmt *sql.Stmt
selectMaxEventIDStmt *sql.Stmt selectMaxEventIDStmt *sql.Stmt
selectRecentEventsStmt *sql.Stmt
selectRecentEventsForSyncStmt *sql.Stmt
selectEarlyEventsStmt *sql.Stmt
selectStateInRangeStmt *sql.Stmt
updateEventJSONStmt *sql.Stmt updateEventJSONStmt *sql.Stmt
deleteEventsForRoomStmt *sql.Stmt deleteEventsForRoomStmt *sql.Stmt
} }
@ -138,18 +118,6 @@ func NewSqliteEventsTable(db *sql.DB, streamID *streamIDStatements) (tables.Even
if s.selectMaxEventIDStmt, err = db.Prepare(selectMaxEventIDSQL); err != nil { if s.selectMaxEventIDStmt, err = db.Prepare(selectMaxEventIDSQL); err != nil {
return nil, err return nil, err
} }
if s.selectRecentEventsStmt, err = db.Prepare(selectRecentEventsSQL); err != nil {
return nil, err
}
if s.selectRecentEventsForSyncStmt, err = db.Prepare(selectRecentEventsForSyncSQL); err != nil {
return nil, err
}
if s.selectEarlyEventsStmt, err = db.Prepare(selectEarlyEventsSQL); err != nil {
return nil, err
}
if s.selectStateInRangeStmt, err = db.Prepare(selectStateInRangeSQL); err != nil {
return nil, err
}
if s.updateEventJSONStmt, err = db.Prepare(updateEventJSONSQL); err != nil { if s.updateEventJSONStmt, err = db.Prepare(updateEventJSONSQL); err != nil {
return nil, err return nil, err
} }
@ -173,19 +141,22 @@ func (s *outputRoomEventsStatements) UpdateEventJSON(ctx context.Context, event
// two positions, only the most recent state is returned. // two positions, only the most recent state is returned.
func (s *outputRoomEventsStatements) SelectStateInRange( func (s *outputRoomEventsStatements) SelectStateInRange(
ctx context.Context, txn *sql.Tx, r types.Range, ctx context.Context, txn *sql.Tx, r types.Range,
stateFilterPart *gomatrixserverlib.StateFilter, stateFilter *gomatrixserverlib.StateFilter,
) (map[string]map[string]bool, map[string]types.StreamEvent, error) { ) (map[string]map[string]bool, map[string]types.StreamEvent, error) {
stmt := sqlutil.TxStmt(txn, s.selectStateInRangeStmt) stmt, params, err := prepareWithFilters(
s.db, txn, selectStateInRangeSQL,
rows, err := stmt.QueryContext( []interface{}{
ctx, r.Low(), r.High(), r.Low(), r.High(),
/*pq.StringArray(stateFilterPart.Senders), },
pq.StringArray(stateFilterPart.NotSenders), stateFilter.Senders, stateFilter.NotSenders,
pq.StringArray(filterConvertTypeWildcardToSQL(stateFilterPart.Types)), stateFilter.Types, stateFilter.NotTypes,
pq.StringArray(filterConvertTypeWildcardToSQL(stateFilterPart.NotTypes)), stateFilter.Limit, FilterOrderAsc,
stateFilterPart.ContainsURL,*/
stateFilterPart.Limit,
) )
if err != nil {
return nil, nil, fmt.Errorf("s.prepareWithFilters: %w", err)
}
rows, err := stmt.QueryContext(ctx, params...)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
@ -298,16 +269,21 @@ func (s *outputRoomEventsStatements) InsertEvent(
return 0, err return 0, err
} }
addStateJSON, err := json.Marshal(addState) var addStateJSON, removeStateJSON []byte
if err != nil { if len(addState) > 0 {
return 0, err addStateJSON, err = json.Marshal(addState)
} }
removeStateJSON, err := json.Marshal(removeState)
if err != nil { if err != nil {
return 0, err return 0, fmt.Errorf("json.Marshal(addState): %w", err)
}
if len(removeState) > 0 {
removeStateJSON, err = json.Marshal(removeState)
}
if err != nil {
return 0, fmt.Errorf("json.Marshal(removeState): %w", err)
} }
streamPos, err := s.streamIDStatements.nextStreamID(ctx, txn) streamPos, err := s.streamIDStatements.nextPDUID(ctx, txn)
if err != nil { if err != nil {
return 0, err return 0, err
} }
@ -333,17 +309,30 @@ func (s *outputRoomEventsStatements) InsertEvent(
func (s *outputRoomEventsStatements) SelectRecentEvents( func (s *outputRoomEventsStatements) SelectRecentEvents(
ctx context.Context, txn *sql.Tx, ctx context.Context, txn *sql.Tx,
roomID string, r types.Range, limit int, roomID string, r types.Range, eventFilter *gomatrixserverlib.RoomEventFilter,
chronologicalOrder bool, onlySyncEvents bool, chronologicalOrder bool, onlySyncEvents bool,
) ([]types.StreamEvent, bool, error) { ) ([]types.StreamEvent, bool, error) {
var stmt *sql.Stmt var query string
if onlySyncEvents { if onlySyncEvents {
stmt = sqlutil.TxStmt(txn, s.selectRecentEventsForSyncStmt) query = selectRecentEventsForSyncSQL
} else { } else {
stmt = sqlutil.TxStmt(txn, s.selectRecentEventsStmt) query = selectRecentEventsSQL
} }
rows, err := stmt.QueryContext(ctx, roomID, r.Low(), r.High(), limit+1) stmt, params, err := prepareWithFilters(
s.db, txn, query,
[]interface{}{
roomID, r.Low(), r.High(),
},
eventFilter.Senders, eventFilter.NotSenders,
eventFilter.Types, eventFilter.NotTypes,
eventFilter.Limit+1, FilterOrderDesc,
)
if err != nil {
return nil, false, fmt.Errorf("s.prepareWithFilters: %w", err)
}
rows, err := stmt.QueryContext(ctx, params...)
if err != nil { if err != nil {
return nil, false, err return nil, false, err
} }
@ -362,7 +351,7 @@ func (s *outputRoomEventsStatements) SelectRecentEvents(
} }
// we queried for 1 more than the limit, so if we returned one more mark limited=true // we queried for 1 more than the limit, so if we returned one more mark limited=true
limited := false limited := false
if len(events) > limit { if len(events) > eventFilter.Limit {
limited = true limited = true
// re-slice the extra (oldest) event out: in chronological order this is the first entry, else the last. // re-slice the extra (oldest) event out: in chronological order this is the first entry, else the last.
if chronologicalOrder { if chronologicalOrder {
@ -376,10 +365,21 @@ func (s *outputRoomEventsStatements) SelectRecentEvents(
func (s *outputRoomEventsStatements) SelectEarlyEvents( func (s *outputRoomEventsStatements) SelectEarlyEvents(
ctx context.Context, txn *sql.Tx, ctx context.Context, txn *sql.Tx,
roomID string, r types.Range, limit int, roomID string, r types.Range, eventFilter *gomatrixserverlib.RoomEventFilter,
) ([]types.StreamEvent, error) { ) ([]types.StreamEvent, error) {
stmt := sqlutil.TxStmt(txn, s.selectEarlyEventsStmt) stmt, params, err := prepareWithFilters(
rows, err := stmt.QueryContext(ctx, roomID, r.Low(), r.High(), limit) s.db, txn, selectEarlyEventsSQL,
[]interface{}{
roomID, r.Low(), r.High(),
},
eventFilter.Senders, eventFilter.NotSenders,
eventFilter.Types, eventFilter.NotTypes,
eventFilter.Limit, FilterOrderAsc,
)
if err != nil {
return nil, fmt.Errorf("s.prepareWithFilters: %w", err)
}
rows, err := stmt.QueryContext(ctx, params...)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View file

@ -108,7 +108,7 @@ func NewSqlitePeeksTable(db *sql.DB, streamID *streamIDStatements) (tables.Peeks
func (s *peekStatements) InsertPeek( func (s *peekStatements) InsertPeek(
ctx context.Context, txn *sql.Tx, roomID, userID, deviceID string, ctx context.Context, txn *sql.Tx, roomID, userID, deviceID string,
) (streamPos types.StreamPosition, err error) { ) (streamPos types.StreamPosition, err error) {
streamPos, err = s.streamIDStatements.nextStreamID(ctx, txn) streamPos, err = s.streamIDStatements.nextPDUID(ctx, txn)
if err != nil { if err != nil {
return return
} }
@ -120,7 +120,7 @@ func (s *peekStatements) InsertPeek(
func (s *peekStatements) DeletePeek( func (s *peekStatements) DeletePeek(
ctx context.Context, txn *sql.Tx, roomID, userID, deviceID string, ctx context.Context, txn *sql.Tx, roomID, userID, deviceID string,
) (streamPos types.StreamPosition, err error) { ) (streamPos types.StreamPosition, err error) {
streamPos, err = s.streamIDStatements.nextStreamID(ctx, txn) streamPos, err = s.streamIDStatements.nextPDUID(ctx, txn)
if err != nil { if err != nil {
return return
} }
@ -131,7 +131,7 @@ func (s *peekStatements) DeletePeek(
func (s *peekStatements) DeletePeeks( func (s *peekStatements) DeletePeeks(
ctx context.Context, txn *sql.Tx, roomID, userID string, ctx context.Context, txn *sql.Tx, roomID, userID string,
) (types.StreamPosition, error) { ) (types.StreamPosition, error) {
streamPos, err := s.streamIDStatements.nextStreamID(ctx, txn) streamPos, err := s.streamIDStatements.nextPDUID(ctx, txn)
if err != nil { if err != nil {
return 0, err return 0, err
} }

View file

@ -20,6 +20,10 @@ INSERT INTO syncapi_stream_id (stream_name, stream_id) VALUES ("global", 0)
ON CONFLICT DO NOTHING; ON CONFLICT DO NOTHING;
INSERT INTO syncapi_stream_id (stream_name, stream_id) VALUES ("receipt", 0) INSERT INTO syncapi_stream_id (stream_name, stream_id) VALUES ("receipt", 0)
ON CONFLICT DO NOTHING; ON CONFLICT DO NOTHING;
INSERT INTO syncapi_stream_id (stream_name, stream_id) VALUES ("accountdata", 0)
ON CONFLICT DO NOTHING;
INSERT INTO syncapi_stream_id (stream_name, stream_id) VALUES ("invite", 0)
ON CONFLICT DO NOTHING;
` `
const increaseStreamIDStmt = "" + const increaseStreamIDStmt = "" +
@ -49,7 +53,7 @@ func (s *streamIDStatements) prepare(db *sql.DB) (err error) {
return return
} }
func (s *streamIDStatements) nextStreamID(ctx context.Context, txn *sql.Tx) (pos types.StreamPosition, err error) { func (s *streamIDStatements) nextPDUID(ctx context.Context, txn *sql.Tx) (pos types.StreamPosition, err error) {
increaseStmt := sqlutil.TxStmt(txn, s.increaseStreamIDStmt) increaseStmt := sqlutil.TxStmt(txn, s.increaseStreamIDStmt)
selectStmt := sqlutil.TxStmt(txn, s.selectStreamIDStmt) selectStmt := sqlutil.TxStmt(txn, s.selectStreamIDStmt)
if _, err = increaseStmt.ExecContext(ctx, "global"); err != nil { if _, err = increaseStmt.ExecContext(ctx, "global"); err != nil {
@ -68,3 +72,23 @@ func (s *streamIDStatements) nextReceiptID(ctx context.Context, txn *sql.Tx) (po
err = selectStmt.QueryRowContext(ctx, "receipt").Scan(&pos) err = selectStmt.QueryRowContext(ctx, "receipt").Scan(&pos)
return return
} }
func (s *streamIDStatements) nextInviteID(ctx context.Context, txn *sql.Tx) (pos types.StreamPosition, err error) {
increaseStmt := sqlutil.TxStmt(txn, s.increaseStreamIDStmt)
selectStmt := sqlutil.TxStmt(txn, s.selectStreamIDStmt)
if _, err = increaseStmt.ExecContext(ctx, "invite"); err != nil {
return
}
err = selectStmt.QueryRowContext(ctx, "invite").Scan(&pos)
return
}
func (s *streamIDStatements) nextAccountDataID(ctx context.Context, txn *sql.Tx) (pos types.StreamPosition, err error) {
increaseStmt := sqlutil.TxStmt(txn, s.increaseStreamIDStmt)
selectStmt := sqlutil.TxStmt(txn, s.selectStreamIDStmt)
if _, err = increaseStmt.ExecContext(ctx, "accountdata"); err != nil {
return
}
err = selectStmt.QueryRowContext(ctx, "accountdata").Scan(&pos)
return
}

View file

@ -56,9 +56,9 @@ type Events interface {
// SelectRecentEvents returns events between the two stream positions: exclusive of low and inclusive of high. // SelectRecentEvents returns events between the two stream positions: exclusive of low and inclusive of high.
// If onlySyncEvents has a value of true, only returns the events that aren't marked as to exclude from sync. // If onlySyncEvents has a value of true, only returns the events that aren't marked as to exclude from sync.
// Returns up to `limit` events. Returns `limited=true` if there are more events in this range but we hit the `limit`. // Returns up to `limit` events. Returns `limited=true` if there are more events in this range but we hit the `limit`.
SelectRecentEvents(ctx context.Context, txn *sql.Tx, roomID string, r types.Range, limit int, chronologicalOrder bool, onlySyncEvents bool) ([]types.StreamEvent, bool, error) SelectRecentEvents(ctx context.Context, txn *sql.Tx, roomID string, r types.Range, eventFilter *gomatrixserverlib.RoomEventFilter, chronologicalOrder bool, onlySyncEvents bool) ([]types.StreamEvent, bool, error)
// SelectEarlyEvents returns the earliest events in the given room. // SelectEarlyEvents returns the earliest events in the given room.
SelectEarlyEvents(ctx context.Context, txn *sql.Tx, roomID string, r types.Range, limit int) ([]types.StreamEvent, error) SelectEarlyEvents(ctx context.Context, txn *sql.Tx, roomID string, r types.Range, eventFilter *gomatrixserverlib.RoomEventFilter) ([]types.StreamEvent, error)
SelectEvents(ctx context.Context, txn *sql.Tx, eventIDs []string) ([]types.StreamEvent, error) SelectEvents(ctx context.Context, txn *sql.Tx, eventIDs []string) ([]types.StreamEvent, error)
UpdateEventJSON(ctx context.Context, event *gomatrixserverlib.HeaderedEvent) error UpdateEventJSON(ctx context.Context, event *gomatrixserverlib.HeaderedEvent) error
// DeleteEventsForRoom removes all event information for a room. This should only be done when removing the room entirely. // DeleteEventsForRoom removes all event information for a room. This should only be done when removing the room entirely.

View file

@ -48,13 +48,14 @@ func (p *PDUStreamProvider) CompleteSync(
return from return from
} }
stateFilter := gomatrixserverlib.DefaultStateFilter() // TODO: use filter provided in request stateFilter := req.Filter.Room.State
eventFilter := req.Filter.Room.Timeline
// Build up a /sync response. Add joined rooms. // Build up a /sync response. Add joined rooms.
for _, roomID := range joinedRoomIDs { for _, roomID := range joinedRoomIDs {
var jr *types.JoinResponse var jr *types.JoinResponse
jr, err = p.getJoinResponseForCompleteSync( jr, err = p.getJoinResponseForCompleteSync(
ctx, roomID, r, &stateFilter, req.Limit, req.Device, ctx, roomID, r, &stateFilter, &eventFilter, req.Device,
) )
if err != nil { if err != nil {
req.Log.WithError(err).Error("p.getJoinResponseForCompleteSync failed") req.Log.WithError(err).Error("p.getJoinResponseForCompleteSync failed")
@ -74,7 +75,7 @@ func (p *PDUStreamProvider) CompleteSync(
if !peek.Deleted { if !peek.Deleted {
var jr *types.JoinResponse var jr *types.JoinResponse
jr, err = p.getJoinResponseForCompleteSync( jr, err = p.getJoinResponseForCompleteSync(
ctx, peek.RoomID, r, &stateFilter, req.Limit, req.Device, ctx, peek.RoomID, r, &stateFilter, &eventFilter, req.Device,
) )
if err != nil { if err != nil {
req.Log.WithError(err).Error("p.getJoinResponseForCompleteSync failed") req.Log.WithError(err).Error("p.getJoinResponseForCompleteSync failed")
@ -104,8 +105,8 @@ func (p *PDUStreamProvider) IncrementalSync(
var stateDeltas []types.StateDelta var stateDeltas []types.StateDelta
var joinedRooms []string var joinedRooms []string
// TODO: use filter provided in request stateFilter := req.Filter.Room.State
stateFilter := gomatrixserverlib.DefaultStateFilter() eventFilter := req.Filter.Room.Timeline
if req.WantFullState { if req.WantFullState {
if stateDeltas, joinedRooms, err = p.DB.GetStateDeltasForFullStateSync(ctx, req.Device, r, req.Device.UserID, &stateFilter); err != nil { if stateDeltas, joinedRooms, err = p.DB.GetStateDeltasForFullStateSync(ctx, req.Device, r, req.Device.UserID, &stateFilter); err != nil {
@ -124,7 +125,7 @@ func (p *PDUStreamProvider) IncrementalSync(
} }
for _, delta := range stateDeltas { for _, delta := range stateDeltas {
if err = p.addRoomDeltaToResponse(ctx, req.Device, r, delta, req.Limit, req.Response); err != nil { if err = p.addRoomDeltaToResponse(ctx, req.Device, r, delta, &eventFilter, req.Response); err != nil {
req.Log.WithError(err).Error("d.addRoomDeltaToResponse failed") req.Log.WithError(err).Error("d.addRoomDeltaToResponse failed")
return newPos return newPos
} }
@ -138,7 +139,7 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
device *userapi.Device, device *userapi.Device,
r types.Range, r types.Range,
delta types.StateDelta, delta types.StateDelta,
numRecentEventsPerRoom int, eventFilter *gomatrixserverlib.RoomEventFilter,
res *types.Response, res *types.Response,
) error { ) error {
if delta.MembershipPos > 0 && delta.Membership == gomatrixserverlib.Leave { if delta.MembershipPos > 0 && delta.Membership == gomatrixserverlib.Leave {
@ -152,7 +153,7 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
} }
recentStreamEvents, limited, err := p.DB.RecentEvents( recentStreamEvents, limited, err := p.DB.RecentEvents(
ctx, delta.RoomID, r, ctx, delta.RoomID, r,
numRecentEventsPerRoom, true, true, eventFilter, true, true,
) )
if err != nil { if err != nil {
return err return err
@ -209,7 +210,8 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync(
roomID string, roomID string,
r types.Range, r types.Range,
stateFilter *gomatrixserverlib.StateFilter, stateFilter *gomatrixserverlib.StateFilter,
numRecentEventsPerRoom int, device *userapi.Device, eventFilter *gomatrixserverlib.RoomEventFilter,
device *userapi.Device,
) (jr *types.JoinResponse, err error) { ) (jr *types.JoinResponse, err error) {
var stateEvents []*gomatrixserverlib.HeaderedEvent var stateEvents []*gomatrixserverlib.HeaderedEvent
stateEvents, err = p.DB.CurrentState(ctx, roomID, stateFilter) stateEvents, err = p.DB.CurrentState(ctx, roomID, stateFilter)
@ -221,7 +223,7 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync(
var recentStreamEvents []types.StreamEvent var recentStreamEvents []types.StreamEvent
var limited bool var limited bool
recentStreamEvents, limited, err = p.DB.RecentEvents( recentStreamEvents, limited, err = p.DB.RecentEvents(
ctx, roomID, r, numRecentEventsPerRoom, true, true, ctx, roomID, r, eventFilter, true, true,
) )
if err != nil { if err != nil {
return return

View file

@ -16,6 +16,7 @@ package sync
import ( import (
"encoding/json" "encoding/json"
"fmt"
"net/http" "net/http"
"strconv" "strconv"
"time" "time"
@ -31,14 +32,6 @@ import (
const defaultSyncTimeout = time.Duration(0) const defaultSyncTimeout = time.Duration(0)
const DefaultTimelineLimit = 20 const DefaultTimelineLimit = 20
type filter struct {
Room struct {
Timeline struct {
Limit *int `json:"limit"`
} `json:"timeline"`
} `json:"room"`
}
func newSyncRequest(req *http.Request, device userapi.Device, syncDB storage.Database) (*types.SyncRequest, error) { func newSyncRequest(req *http.Request, device userapi.Device, syncDB storage.Database) (*types.SyncRequest, error) {
timeout := getTimeout(req.URL.Query().Get("timeout")) timeout := getTimeout(req.URL.Query().Get("timeout"))
fullState := req.URL.Query().Get("full_state") fullState := req.URL.Query().Get("full_state")
@ -51,41 +44,37 @@ func newSyncRequest(req *http.Request, device userapi.Device, syncDB storage.Dat
return nil, err return nil, err
} }
} }
timelineLimit := DefaultTimelineLimit
// TODO: read from stored filters too // TODO: read from stored filters too
filter := gomatrixserverlib.DefaultFilter()
filterQuery := req.URL.Query().Get("filter") filterQuery := req.URL.Query().Get("filter")
if filterQuery != "" { if filterQuery != "" {
if filterQuery[0] == '{' { if filterQuery[0] == '{' {
// attempt to parse the timeline limit at least // Parse the filter from the query string
var f filter if err := json.Unmarshal([]byte(filterQuery), &filter); err != nil {
err := json.Unmarshal([]byte(filterQuery), &f) return nil, fmt.Errorf("json.Unmarshal: %w", err)
if err == nil && f.Room.Timeline.Limit != nil {
timelineLimit = *f.Room.Timeline.Limit
} }
} else { } else {
// attempt to load the filter ID // Try to load the filter from the database
localpart, _, err := gomatrixserverlib.SplitID('@', device.UserID) localpart, _, err := gomatrixserverlib.SplitID('@', device.UserID)
if err != nil { if err != nil {
util.GetLogger(req.Context()).WithError(err).Error("gomatrixserverlib.SplitID failed") util.GetLogger(req.Context()).WithError(err).Error("gomatrixserverlib.SplitID failed")
return nil, err return nil, fmt.Errorf("gomatrixserverlib.SplitID: %w", err)
} }
f, err := syncDB.GetFilter(req.Context(), localpart, filterQuery) if f, err := syncDB.GetFilter(req.Context(), localpart, filterQuery); err != nil {
if err == nil { util.GetLogger(req.Context()).WithError(err).Error("syncDB.GetFilter failed")
timelineLimit = f.Room.Timeline.Limit return nil, fmt.Errorf("syncDB.GetFilter: %w", err)
} else {
filter = *f
} }
} }
} }
filter := gomatrixserverlib.DefaultEventFilter()
filter.Limit = timelineLimit
// TODO: Additional query params: set_presence, filter
logger := util.GetLogger(req.Context()).WithFields(logrus.Fields{ logger := util.GetLogger(req.Context()).WithFields(logrus.Fields{
"user_id": device.UserID, "user_id": device.UserID,
"device_id": device.ID, "device_id": device.ID,
"since": since, "since": since,
"timeout": timeout, "timeout": timeout,
"limit": timelineLimit, "limit": filter.Room.Timeline.Limit,
}) })
return &types.SyncRequest{ return &types.SyncRequest{
@ -96,7 +85,6 @@ func newSyncRequest(req *http.Request, device userapi.Device, syncDB storage.Dat
Filter: filter, // Filter: filter, //
Since: since, // Since: since, //
Timeout: timeout, // Timeout: timeout, //
Limit: timelineLimit, //
Rooms: make(map[string]string), // Populated by the PDU stream Rooms: make(map[string]string), // Populated by the PDU stream
WantFullState: wantFullState, // WantFullState: wantFullState, //
}, nil }, nil

View file

@ -14,9 +14,8 @@ type SyncRequest struct {
Log *logrus.Entry Log *logrus.Entry
Device *userapi.Device Device *userapi.Device
Response *Response Response *Response
Filter gomatrixserverlib.EventFilter Filter gomatrixserverlib.Filter
Since StreamingToken Since StreamingToken
Limit int
Timeout time.Duration Timeout time.Duration
WantFullState bool WantFullState bool

View file

@ -503,3 +503,8 @@ A next_batch token can be used in the v1 messages API
Users receive device_list updates for their own devices Users receive device_list updates for their own devices
m.room.history_visibility == "world_readable" allows/forbids appropriately for Guest users m.room.history_visibility == "world_readable" allows/forbids appropriately for Guest users
m.room.history_visibility == "world_readable" allows/forbids appropriately for Real users m.room.history_visibility == "world_readable" allows/forbids appropriately for Real users
State is included in the timeline in the initial sync
State from remote users is included in the state in the initial sync
Changes to state are included in an gapped incremental sync
A full_state incremental update returns all state
Can pass a JSON filter as a query parameter