2020-01-03 15:07:05 +01:00
// Copyright 2017-2018 New Vector Ltd
// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
2017-04-21 00:40:52 +02:00
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
2020-01-03 15:07:05 +01:00
package postgres
2017-03-30 16:29:23 +02:00
import (
2017-09-18 17:52:22 +02:00
"context"
2017-03-30 16:29:23 +02:00
"database/sql"
2019-08-07 12:12:09 +02:00
"encoding/json"
2023-02-07 14:31:23 +01:00
"fmt"
2018-11-07 20:12:23 +01:00
"sort"
2017-03-30 16:29:23 +02:00
2023-02-07 14:31:23 +01:00
"github.com/lib/pq"
2020-06-12 15:55:57 +02:00
"github.com/matrix-org/dendrite/internal"
2023-02-07 14:31:23 +01:00
"github.com/matrix-org/dendrite/internal/sqlutil"
2017-12-06 10:37:18 +01:00
"github.com/matrix-org/dendrite/roomserver/api"
2023-04-27 13:54:20 +02:00
rstypes "github.com/matrix-org/dendrite/roomserver/types"
2022-07-25 11:39:22 +02:00
"github.com/matrix-org/dendrite/syncapi/storage/postgres/deltas"
2020-05-14 10:53:55 +02:00
"github.com/matrix-org/dendrite/syncapi/storage/tables"
2023-04-04 19:16:53 +02:00
"github.com/matrix-org/dendrite/syncapi/synctypes"
2020-01-23 18:51:10 +01:00
"github.com/matrix-org/dendrite/syncapi/types"
2017-04-05 11:30:13 +02:00
"github.com/matrix-org/gomatrixserverlib"
2017-03-30 16:29:23 +02:00
)
const outputRoomEventsSchema = `
2017-09-19 18:15:46 +02:00
-- This sequence is shared between all the tables generated from kafka logs .
CREATE SEQUENCE IF NOT EXISTS syncapi_stream_id ;
2017-03-30 16:29:23 +02:00
-- Stores output room events received from the roomserver .
2017-08-07 12:51:46 +02:00
CREATE TABLE IF NOT EXISTS syncapi_output_room_events (
2020-01-23 18:51:10 +01:00
-- An incrementing ID which denotes the position in the log that this event resides at .
-- NB : ' serial ' makes no guarantees to increment by 1 every time , only that it increments .
-- This isn ' t a problem for us since we just want to order by this field .
id BIGINT PRIMARY KEY DEFAULT nextval ( ' syncapi_stream_id ' ) ,
-- The event ID for the event
2023-02-07 14:31:23 +01:00
event_id TEXT NOT NULL CONSTRAINT syncapi_output_room_event_id_idx UNIQUE ,
2020-01-23 18:51:10 +01:00
-- The ' room_id ' key for the event .
room_id TEXT NOT NULL ,
2020-03-19 13:07:01 +01:00
-- The headered JSON for the event , containing potentially additional metadata such as
-- the room version . Stored as TEXT because this should be valid UTF - 8.
headered_event_json TEXT NOT NULL ,
2020-01-23 18:51:10 +01:00
-- The event type e . g ' m . room . member ' .
type TEXT NOT NULL ,
-- The ' sender ' property of the event .
sender TEXT NOT NULL ,
-- true if the event content contains a url key .
contains_url BOOL NOT NULL ,
-- A list of event IDs which represent a delta of added / removed room state . This can be NULL
-- if there is no delta .
add_state_ids TEXT [ ] ,
remove_state_ids TEXT [ ] ,
-- The client session that sent the event , if any
session_id BIGINT ,
-- The transaction id used to send the event , if any
transaction_id TEXT ,
-- Should the event be excluded from responses to / sync requests . Useful for
-- events retrieved through backfilling that have a position in the stream
-- that relates to the moment these were retrieved rather than the moment these
-- were emitted .
2022-07-18 14:46:15 +02:00
exclude_from_sync BOOL DEFAULT FALSE ,
-- The history visibility before this event ( 1 - world_readable ; 2 - shared ; 3 - invited ; 4 - joined )
history_visibility SMALLINT NOT NULL DEFAULT 2
2017-03-30 16:29:23 +02:00
) ;
2022-05-10 12:23:36 +02:00
CREATE INDEX IF NOT EXISTS syncapi_output_room_events_type_idx ON syncapi_output_room_events ( type ) ;
CREATE INDEX IF NOT EXISTS syncapi_output_room_events_sender_idx ON syncapi_output_room_events ( sender ) ;
CREATE INDEX IF NOT EXISTS syncapi_output_room_events_room_id_idx ON syncapi_output_room_events ( room_id ) ;
CREATE INDEX IF NOT EXISTS syncapi_output_room_events_exclude_from_sync_idx ON syncapi_output_room_events ( exclude_from_sync ) ;
2022-10-04 17:43:10 +02:00
CREATE INDEX IF NOT EXISTS syncapi_output_room_events_add_state_ids_idx ON syncapi_output_room_events ( ( add_state_ids IS NOT NULL ) ) ;
CREATE INDEX IF NOT EXISTS syncapi_output_room_events_remove_state_ids_idx ON syncapi_output_room_events ( ( remove_state_ids IS NOT NULL ) ) ;
2023-02-07 14:31:23 +01:00
CREATE INDEX IF NOT EXISTS syncapi_output_room_events_recent_events_idx ON syncapi_output_room_events ( room_id , exclude_from_sync , id , sender , type ) ;
2017-03-30 16:29:23 +02:00
`
const insertEventSQL = "" +
2017-08-07 12:51:46 +02:00
"INSERT INTO syncapi_output_room_events (" +
2022-07-18 14:46:15 +02:00
"room_id, event_id, headered_event_json, type, sender, contains_url, add_state_ids, remove_state_ids, session_id, transaction_id, exclude_from_sync, history_visibility" +
") VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) " +
2023-02-07 14:31:23 +01:00
"ON CONFLICT ON CONSTRAINT syncapi_output_room_event_id_idx DO UPDATE SET exclude_from_sync = (excluded.exclude_from_sync AND $11) " +
2020-01-24 12:40:27 +01:00
"RETURNING id"
2017-04-05 11:30:13 +02:00
const selectEventsSQL = "" +
2022-07-18 14:46:15 +02:00
"SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id, history_visibility FROM syncapi_output_room_events WHERE event_id = ANY($1)"
2017-04-10 16:12:18 +02:00
2022-04-13 13:16:02 +02:00
const selectEventsWithFilterSQL = "" +
2022-07-18 14:46:15 +02:00
"SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id, history_visibility FROM syncapi_output_room_events WHERE event_id = ANY($1)" +
2022-04-13 13:16:02 +02:00
" AND ( $2::text[] IS NULL OR sender = ANY($2) )" +
" AND ( $3::text[] IS NULL OR NOT(sender = ANY($3)) )" +
" AND ( $4::text[] IS NULL OR type LIKE ANY($4) )" +
" AND ( $5::text[] IS NULL OR NOT(type LIKE ANY($5)) )" +
" AND ( $6::bool IS NULL OR contains_url = $6 )" +
" LIMIT $7"
2017-04-13 17:56:46 +02:00
const selectRecentEventsSQL = "" +
2022-07-18 14:46:15 +02:00
"SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id, history_visibility FROM syncapi_output_room_events" +
2017-08-07 12:51:46 +02:00
" WHERE room_id = $1 AND id > $2 AND id <= $3" +
2021-01-19 19:00:42 +01:00
" AND ( $4::text[] IS NULL OR sender = ANY($4) )" +
" AND ( $5::text[] IS NULL OR NOT(sender = ANY($5)) )" +
" AND ( $6::text[] IS NULL OR type LIKE ANY($6) )" +
" AND ( $7::text[] IS NULL OR NOT(type LIKE ANY($7)) )" +
" ORDER BY id DESC LIMIT $8"
2017-04-13 17:56:46 +02:00
2023-02-07 14:31:23 +01:00
// selectRecentEventsForSyncSQL contains an optimization to get the recent events for a list of rooms, using a LATERAL JOIN
// The sub select inside LATERAL () is executed for all room_ids it gets as a parameter $1
const selectRecentEventsForSyncSQL = `
WITH room_ids AS (
SELECT unnest ( $ 1 : : text [ ] ) AS room_id
)
SELECT x . *
FROM room_ids ,
LATERAL (
SELECT room_id , event_id , id , headered_event_json , session_id , exclude_from_sync , transaction_id , history_visibility
FROM syncapi_output_room_events recent_events
WHERE
recent_events . room_id = room_ids . room_id
AND recent_events . exclude_from_sync = FALSE
AND id > $ 2 AND id <= $ 3
AND ( $ 4 : : text [ ] IS NULL OR sender = ANY ( $ 4 ) )
AND ( $ 5 : : text [ ] IS NULL OR NOT ( sender = ANY ( $ 5 ) ) )
AND ( $ 6 : : text [ ] IS NULL OR type LIKE ANY ( $ 6 ) )
AND ( $ 7 : : text [ ] IS NULL OR NOT ( type LIKE ANY ( $ 7 ) ) )
ORDER BY recent_events . id DESC
LIMIT $ 8
) AS x
`
2020-01-23 18:51:10 +01:00
2017-09-19 18:15:46 +02:00
const selectMaxEventIDSQL = "" +
2017-08-07 12:51:46 +02:00
"SELECT MAX(id) FROM syncapi_output_room_events"
2017-04-10 16:12:18 +02:00
2020-07-08 18:45:39 +02:00
const updateEventJSONSQL = "" +
"UPDATE syncapi_output_room_events SET headered_event_json=$1 WHERE event_id=$2"
2017-04-19 17:04:01 +02:00
// In order for us to apply the state updates correctly, rows need to be ordered in the order they were received (id).
2022-10-19 14:05:39 +02:00
const selectStateInRangeFilteredSQL = "" +
2022-07-18 14:46:15 +02:00
"SELECT event_id, id, headered_event_json, exclude_from_sync, add_state_ids, remove_state_ids, history_visibility" +
2017-08-07 12:51:46 +02:00
" FROM syncapi_output_room_events" +
2017-05-12 17:56:17 +02:00
" WHERE (id > $1 AND id <= $2) AND (add_state_ids IS NOT NULL OR remove_state_ids IS NOT NULL)" +
2022-03-11 13:48:45 +01:00
" AND room_id = ANY($3)" +
" AND ( $4::text[] IS NULL OR sender = ANY($4) )" +
" AND ( $5::text[] IS NULL OR NOT(sender = ANY($5)) )" +
" AND ( $6::text[] IS NULL OR type LIKE ANY($6) )" +
" AND ( $7::text[] IS NULL OR NOT(type LIKE ANY($7)) )" +
" AND ( $8::bool IS NULL OR contains_url = $8 )" +
2022-11-02 10:34:19 +01:00
" ORDER BY id ASC"
2017-04-19 17:04:01 +02:00
2022-10-19 14:05:39 +02:00
// In order for us to apply the state updates correctly, rows need to be ordered in the order they were received (id).
const selectStateInRangeSQL = "" +
"SELECT event_id, id, headered_event_json, exclude_from_sync, add_state_ids, remove_state_ids, history_visibility" +
" FROM syncapi_output_room_events" +
" WHERE (id > $1 AND id <= $2) AND (add_state_ids IS NOT NULL OR remove_state_ids IS NOT NULL)" +
" AND room_id = ANY($3)" +
2022-11-02 10:34:19 +01:00
" ORDER BY id ASC"
2022-10-19 14:05:39 +02:00
2020-09-15 12:17:46 +02:00
const deleteEventsForRoomSQL = "" +
"DELETE FROM syncapi_output_room_events WHERE room_id = $1"
2022-02-21 17:12:22 +01:00
const selectContextEventSQL = "" +
2022-07-18 14:46:15 +02:00
"SELECT id, headered_event_json, history_visibility FROM syncapi_output_room_events WHERE room_id = $1 AND event_id = $2"
2022-02-21 17:12:22 +01:00
const selectContextBeforeEventSQL = "" +
2022-07-18 14:46:15 +02:00
"SELECT headered_event_json, history_visibility FROM syncapi_output_room_events WHERE room_id = $1 AND id < $2" +
2022-02-21 17:12:22 +01:00
" AND ( $4::text[] IS NULL OR sender = ANY($4) )" +
" AND ( $5::text[] IS NULL OR NOT(sender = ANY($5)) )" +
" AND ( $6::text[] IS NULL OR type LIKE ANY($6) )" +
" AND ( $7::text[] IS NULL OR NOT(type LIKE ANY($7)) )" +
" ORDER BY id DESC LIMIT $3"
const selectContextAfterEventSQL = "" +
2022-07-18 14:46:15 +02:00
"SELECT id, headered_event_json, history_visibility FROM syncapi_output_room_events WHERE room_id = $1 AND id > $2" +
2022-02-21 17:12:22 +01:00
" AND ( $4::text[] IS NULL OR sender = ANY($4) )" +
" AND ( $5::text[] IS NULL OR NOT(sender = ANY($5)) )" +
" AND ( $6::text[] IS NULL OR type LIKE ANY($6) )" +
" AND ( $7::text[] IS NULL OR NOT(type LIKE ANY($7)) )" +
" ORDER BY id ASC LIMIT $3"
2023-01-19 21:02:32 +01:00
const purgeEventsSQL = "" +
"DELETE FROM syncapi_output_room_events WHERE room_id = $1"
2022-09-27 18:06:49 +02:00
const selectSearchSQL = "SELECT id, event_id, headered_event_json FROM syncapi_output_room_events WHERE id > $1 AND type = ANY($2) ORDER BY id ASC LIMIT $3"
2017-03-30 16:29:23 +02:00
type outputRoomEventsStatements struct {
2022-10-19 14:05:39 +02:00
insertEventStmt * sql . Stmt
selectEventsStmt * sql . Stmt
selectEventsWitFilterStmt * sql . Stmt
selectMaxEventIDStmt * sql . Stmt
selectRecentEventsStmt * sql . Stmt
selectRecentEventsForSyncStmt * sql . Stmt
selectStateInRangeFilteredStmt * sql . Stmt
selectStateInRangeStmt * sql . Stmt
updateEventJSONStmt * sql . Stmt
deleteEventsForRoomStmt * sql . Stmt
selectContextEventStmt * sql . Stmt
selectContextBeforeEventStmt * sql . Stmt
selectContextAfterEventStmt * sql . Stmt
2023-01-19 21:02:32 +01:00
purgeEventsStmt * sql . Stmt
2022-10-19 14:05:39 +02:00
selectSearchStmt * sql . Stmt
2017-03-30 16:29:23 +02:00
}
2020-05-14 10:53:55 +02:00
func NewPostgresEventsTable ( db * sql . DB ) ( tables . Events , error ) {
s := & outputRoomEventsStatements { }
_ , err := db . Exec ( outputRoomEventsSchema )
2017-03-30 16:29:23 +02:00
if err != nil {
2020-05-14 10:53:55 +02:00
return nil , err
2017-03-30 16:29:23 +02:00
}
2022-07-25 11:39:22 +02:00
2023-02-07 14:31:23 +01:00
migrationName := "syncapi: rename dupe index (output_room_events)"
var cName string
err = db . QueryRowContext ( context . Background ( ) , "select constraint_name from information_schema.table_constraints where table_name = 'syncapi_output_room_events' AND constraint_name = 'syncapi_event_id_idx'" ) . Scan ( & cName )
switch err {
case sql . ErrNoRows : // migration was already executed, as the index was renamed
if err = sqlutil . InsertMigration ( context . Background ( ) , db , migrationName ) ; err != nil {
return nil , fmt . Errorf ( "unable to manually insert migration '%s': %w" , migrationName , err )
}
case nil :
default :
return nil , err
}
2022-07-25 11:39:22 +02:00
m := sqlutil . NewMigrator ( db )
2022-08-11 18:23:35 +02:00
m . AddMigrations (
sqlutil . Migration {
Version : "syncapi: add history visibility column (output_room_events)" ,
Up : deltas . UpAddHistoryVisibilityColumnOutputRoomEvents ,
} ,
2023-02-07 14:31:23 +01:00
sqlutil . Migration {
Version : migrationName ,
Up : deltas . UpRenameOutputRoomEventsIndex ,
} ,
2022-08-11 18:23:35 +02:00
)
2022-07-25 11:39:22 +02:00
err = m . Up ( context . Background ( ) )
if err != nil {
return nil , err
}
2022-02-21 17:12:22 +01:00
return s , sqlutil . StatementList {
{ & s . insertEventStmt , insertEventSQL } ,
{ & s . selectEventsStmt , selectEventsSQL } ,
2022-04-13 13:16:02 +02:00
{ & s . selectEventsWitFilterStmt , selectEventsWithFilterSQL } ,
2022-02-21 17:12:22 +01:00
{ & s . selectMaxEventIDStmt , selectMaxEventIDSQL } ,
{ & s . selectRecentEventsStmt , selectRecentEventsSQL } ,
{ & s . selectRecentEventsForSyncStmt , selectRecentEventsForSyncSQL } ,
2022-10-19 14:05:39 +02:00
{ & s . selectStateInRangeFilteredStmt , selectStateInRangeFilteredSQL } ,
2022-02-21 17:12:22 +01:00
{ & s . selectStateInRangeStmt , selectStateInRangeSQL } ,
{ & s . updateEventJSONStmt , updateEventJSONSQL } ,
{ & s . deleteEventsForRoomStmt , deleteEventsForRoomSQL } ,
{ & s . selectContextEventStmt , selectContextEventSQL } ,
{ & s . selectContextBeforeEventStmt , selectContextBeforeEventSQL } ,
{ & s . selectContextAfterEventStmt , selectContextAfterEventSQL } ,
2023-01-19 21:02:32 +01:00
{ & s . purgeEventsStmt , purgeEventsSQL } ,
2022-09-27 18:06:49 +02:00
{ & s . selectSearchStmt , selectSearchSQL } ,
2022-02-21 17:12:22 +01:00
} . Prepare ( db )
2017-04-10 16:12:18 +02:00
}
2023-04-27 13:54:20 +02:00
func ( s * outputRoomEventsStatements ) UpdateEventJSON ( ctx context . Context , txn * sql . Tx , event * rstypes . HeaderedEvent ) error {
2020-07-08 18:45:39 +02:00
headeredJSON , err := json . Marshal ( event )
if err != nil {
return err
}
2022-09-28 11:18:03 +02:00
_ , err = sqlutil . TxStmt ( txn , s . updateEventJSONStmt ) . ExecContext ( ctx , headeredJSON , event . EventID ( ) )
2020-07-08 18:45:39 +02:00
return err
}
2019-07-12 16:59:53 +02:00
// selectStateInRange returns the state events between the two given PDU stream positions, exclusive of oldPos, inclusive of newPos.
2017-04-19 17:04:01 +02:00
// Results are bucketed based on the room ID. If the same state is overwritten multiple times between the
// two positions, only the most recent state is returned.
2020-05-14 10:53:55 +02:00
func ( s * outputRoomEventsStatements ) SelectStateInRange (
2020-05-15 10:41:12 +02:00
ctx context . Context , txn * sql . Tx , r types . Range ,
2023-04-04 19:16:53 +02:00
stateFilter * synctypes . StateFilter , roomIDs [ ] string ,
2020-01-23 18:51:10 +01:00
) ( map [ string ] map [ string ] bool , map [ string ] types . StreamEvent , error ) {
2022-10-19 14:05:39 +02:00
var rows * sql . Rows
var err error
if stateFilter != nil {
stmt := sqlutil . TxStmt ( txn , s . selectStateInRangeFilteredStmt )
senders , notSenders := getSendersStateFilterFilter ( stateFilter )
rows , err = stmt . QueryContext (
ctx , r . Low ( ) , r . High ( ) , pq . StringArray ( roomIDs ) ,
pq . StringArray ( senders ) ,
pq . StringArray ( notSenders ) ,
pq . StringArray ( filterConvertTypeWildcardToSQL ( stateFilter . Types ) ) ,
pq . StringArray ( filterConvertTypeWildcardToSQL ( stateFilter . NotTypes ) ) ,
stateFilter . ContainsURL ,
)
} else {
stmt := sqlutil . TxStmt ( txn , s . selectStateInRangeStmt )
rows , err = stmt . QueryContext (
ctx , r . Low ( ) , r . High ( ) , pq . StringArray ( roomIDs ) ,
)
}
2017-04-19 17:04:01 +02:00
if err != nil {
2017-06-05 11:37:04 +02:00
return nil , nil , err
2017-04-19 17:04:01 +02:00
}
2020-05-21 15:40:13 +02:00
defer internal . CloseAndLogIfError ( ctx , rows , "selectStateInRange: rows.close() failed" )
2017-04-19 17:04:01 +02:00
// Fetch all the state change events for all rooms between the two positions then loop each event and:
// - Keep a cache of the event by ID (99% of state change events are for the event itself)
// - For each room ID, build up an array of event IDs which represents cumulative adds/removes
// For each room, map cumulative event IDs to events and return. This may need to a batch SELECT based on event ID
// if they aren't in the event ID cache. We don't handle state deletion yet.
2020-01-23 18:51:10 +01:00
eventIDToEvent := make ( map [ string ] types . StreamEvent )
2017-04-19 17:04:01 +02:00
// RoomID => A set (map[string]bool) of state event IDs which are between the two positions
stateNeeded := make ( map [ string ] map [ string ] bool )
for rows . Next ( ) {
var (
2022-07-18 14:46:15 +02:00
eventID string
streamPos types . StreamPosition
eventBytes [ ] byte
excludeFromSync bool
addIDs pq . StringArray
delIDs pq . StringArray
historyVisibility gomatrixserverlib . HistoryVisibility
2017-04-19 17:04:01 +02:00
)
2022-07-18 14:46:15 +02:00
if err := rows . Scan ( & eventID , & streamPos , & eventBytes , & excludeFromSync , & addIDs , & delIDs , & historyVisibility ) ; err != nil {
2017-06-05 11:37:04 +02:00
return nil , nil , err
2017-04-19 17:04:01 +02:00
}
// TODO: Handle redacted events
2023-04-27 13:54:20 +02:00
var ev rstypes . HeaderedEvent
if err := json . Unmarshal ( eventBytes , & ev ) ; err != nil {
2017-06-05 11:37:04 +02:00
return nil , nil , err
2017-04-19 17:04:01 +02:00
}
needSet := stateNeeded [ ev . RoomID ( ) ]
if needSet == nil { // make set if required
needSet = make ( map [ string ] bool )
}
for _ , id := range delIDs {
needSet [ id ] = false
}
for _ , id := range addIDs {
needSet [ id ] = true
}
stateNeeded [ ev . RoomID ( ) ] = needSet
2022-07-18 14:46:15 +02:00
ev . Visibility = historyVisibility
2017-04-19 17:04:01 +02:00
2021-11-03 10:53:37 +01:00
eventIDToEvent [ eventID ] = types . StreamEvent {
2020-11-16 16:44:53 +01:00
HeaderedEvent : & ev ,
2020-01-23 18:51:10 +01:00
StreamPosition : streamPos ,
ExcludeFromSync : excludeFromSync ,
2017-12-06 10:37:18 +01:00
}
2017-04-19 17:04:01 +02:00
}
2020-02-11 15:12:21 +01:00
return stateNeeded , eventIDToEvent , rows . Err ( )
2017-04-19 17:04:01 +02:00
}
2017-04-13 17:56:46 +02:00
// MaxID returns the ID of the last inserted event in this table. 'txn' is optional. If it is not supplied,
// then this function should only ever be used at startup, as it will race with inserting events if it is
// done afterwards. If there are no inserted events, 0 is returned.
2020-05-14 10:53:55 +02:00
func ( s * outputRoomEventsStatements ) SelectMaxEventID (
2017-09-18 17:52:22 +02:00
ctx context . Context , txn * sql . Tx ,
) ( id int64 , err error ) {
2017-04-10 16:12:18 +02:00
var nullableID sql . NullInt64
2020-06-12 15:55:57 +02:00
stmt := sqlutil . TxStmt ( txn , s . selectMaxEventIDStmt )
2017-09-18 17:52:22 +02:00
err = stmt . QueryRowContext ( ctx ) . Scan ( & nullableID )
2017-04-10 16:12:18 +02:00
if nullableID . Valid {
id = nullableID . Int64
}
2017-03-30 16:29:23 +02:00
return
}
2017-04-10 16:12:18 +02:00
// InsertEvent into the output_room_events table. addState and removeState are an optional list of state event IDs. Returns the position
// of the inserted event.
2020-05-14 10:53:55 +02:00
func ( s * outputRoomEventsStatements ) InsertEvent (
2017-09-18 17:52:22 +02:00
ctx context . Context , txn * sql . Tx ,
2023-04-27 13:54:20 +02:00
event * rstypes . HeaderedEvent , addState , removeState [ ] string ,
2022-07-18 14:46:15 +02:00
transactionID * api . TransactionID , excludeFromSync bool , historyVisibility gomatrixserverlib . HistoryVisibility ,
2020-01-23 18:51:10 +01:00
) ( streamPos types . StreamPosition , err error ) {
2019-08-23 18:55:40 +02:00
var txnID * string
var sessionID * int64
2017-12-06 10:37:18 +01:00
if transactionID != nil {
2019-08-23 18:55:40 +02:00
sessionID = & transactionID . SessionID
2017-12-06 10:37:18 +01:00
txnID = & transactionID . TransactionID
}
2019-08-07 12:12:09 +02:00
// Parse content as JSON and search for an "url" key
containsURL := false
var content map [ string ] interface { }
2022-04-13 13:16:02 +02:00
if json . Unmarshal ( event . Content ( ) , & content ) == nil {
2019-08-07 12:12:09 +02:00
// Set containsURL to true if url is present
_ , containsURL = content [ "url" ]
}
2020-03-19 13:07:01 +01:00
var headeredJSON [ ] byte
headeredJSON , err = json . Marshal ( event )
if err != nil {
return
}
2020-06-12 15:55:57 +02:00
stmt := sqlutil . TxStmt ( txn , s . insertEventStmt )
2017-09-18 17:52:22 +02:00
err = stmt . QueryRowContext (
ctx ,
event . RoomID ( ) ,
event . EventID ( ) ,
2020-03-19 13:07:01 +01:00
headeredJSON ,
2019-08-07 12:12:09 +02:00
event . Type ( ) ,
2023-06-28 20:29:49 +02:00
event . UserID . String ( ) ,
2019-08-07 12:12:09 +02:00
containsURL ,
2017-09-18 17:52:22 +02:00
pq . StringArray ( addState ) ,
pq . StringArray ( removeState ) ,
2019-08-23 18:55:40 +02:00
sessionID ,
2017-12-06 10:37:18 +01:00
txnID ,
2020-01-23 18:51:10 +01:00
excludeFromSync ,
2022-07-18 14:46:15 +02:00
historyVisibility ,
2017-04-10 16:12:18 +02:00
) . Scan ( & streamPos )
return
2017-03-30 16:29:23 +02:00
}
2017-04-05 11:30:13 +02:00
2020-01-23 18:51:10 +01:00
// selectRecentEvents returns the most recent events in the given room, up to a maximum of 'limit'.
// If onlySyncEvents has a value of true, only returns the events that aren't marked as to exclude
// from sync.
2020-05-14 10:53:55 +02:00
func ( s * outputRoomEventsStatements ) SelectRecentEvents (
2017-09-18 17:52:22 +02:00
ctx context . Context , txn * sql . Tx ,
2023-04-04 19:16:53 +02:00
roomIDs [ ] string , ra types . Range , eventFilter * synctypes . RoomEventFilter ,
2020-01-23 18:51:10 +01:00
chronologicalOrder bool , onlySyncEvents bool ,
2023-02-07 14:31:23 +01:00
) ( map [ string ] types . RecentEvents , error ) {
2020-01-23 18:51:10 +01:00
var stmt * sql . Stmt
if onlySyncEvents {
2020-06-12 15:55:57 +02:00
stmt = sqlutil . TxStmt ( txn , s . selectRecentEventsForSyncStmt )
2020-01-23 18:51:10 +01:00
} else {
2020-06-12 15:55:57 +02:00
stmt = sqlutil . TxStmt ( txn , s . selectRecentEventsStmt )
2020-01-23 18:51:10 +01:00
}
2022-04-11 09:05:23 +02:00
senders , notSenders := getSendersRoomEventFilter ( eventFilter )
2023-02-07 14:31:23 +01:00
2021-01-19 19:00:42 +01:00
rows , err := stmt . QueryContext (
2023-02-07 14:31:23 +01:00
ctx , pq . StringArray ( roomIDs ) , ra . Low ( ) , ra . High ( ) ,
2022-04-11 09:05:23 +02:00
pq . StringArray ( senders ) ,
pq . StringArray ( notSenders ) ,
2021-01-19 19:00:42 +01:00
pq . StringArray ( filterConvertTypeWildcardToSQL ( eventFilter . Types ) ) ,
pq . StringArray ( filterConvertTypeWildcardToSQL ( eventFilter . NotTypes ) ) ,
eventFilter . Limit + 1 ,
)
2020-01-23 18:51:10 +01:00
if err != nil {
2023-02-07 14:31:23 +01:00
return nil , err
2020-01-23 18:51:10 +01:00
}
2020-05-21 15:40:13 +02:00
defer internal . CloseAndLogIfError ( ctx , rows , "selectRecentEvents: rows.close() failed" )
2023-02-07 14:31:23 +01:00
result := make ( map [ string ] types . RecentEvents )
for rows . Next ( ) {
var (
roomID string
eventID string
streamPos types . StreamPosition
eventBytes [ ] byte
excludeFromSync bool
sessionID * int64
txnID * string
transactionID * api . TransactionID
historyVisibility gomatrixserverlib . HistoryVisibility
)
if err := rows . Scan ( & roomID , & eventID , & streamPos , & eventBytes , & sessionID , & excludeFromSync , & txnID , & historyVisibility ) ; err != nil {
return nil , err
}
// TODO: Handle redacted events
2023-04-27 13:54:20 +02:00
var ev rstypes . HeaderedEvent
if err := json . Unmarshal ( eventBytes , & ev ) ; err != nil {
2023-02-07 14:31:23 +01:00
return nil , err
}
if sessionID != nil && txnID != nil {
transactionID = & api . TransactionID {
SessionID : * sessionID ,
TransactionID : * txnID ,
}
}
r := result [ roomID ]
ev . Visibility = historyVisibility
r . Events = append ( r . Events , types . StreamEvent {
HeaderedEvent : & ev ,
StreamPosition : streamPos ,
TransactionID : transactionID ,
ExcludeFromSync : excludeFromSync ,
2020-01-23 18:51:10 +01:00
} )
2023-02-07 14:31:23 +01:00
result [ roomID ] = r
2020-01-23 18:51:10 +01:00
}
2023-02-07 14:31:23 +01:00
if chronologicalOrder {
for roomID , evs := range result {
// The events need to be returned from oldest to latest, which isn't
// necessary the way the SQL query returns them, so a sort is necessary to
// ensure the events are in the right order in the slice.
sort . SliceStable ( evs . Events , func ( i int , j int ) bool {
return evs . Events [ i ] . StreamPosition < evs . Events [ j ] . StreamPosition
} )
if len ( evs . Events ) > eventFilter . Limit {
evs . Limited = true
evs . Events = evs . Events [ 1 : ]
}
result [ roomID ] = evs
2020-06-26 16:34:41 +02:00
}
2023-02-07 14:31:23 +01:00
} else {
for roomID , evs := range result {
if len ( evs . Events ) > eventFilter . Limit {
evs . Limited = true
evs . Events = evs . Events [ : len ( evs . Events ) - 1 ]
}
2020-06-26 16:34:41 +02:00
2023-02-07 14:31:23 +01:00
result [ roomID ] = evs
}
}
return result , rows . Err ( )
2020-01-23 18:51:10 +01:00
}
// selectEvents returns the events for the given event IDs. If an event is
// missing from the database, it will be omitted.
2020-05-14 10:53:55 +02:00
func ( s * outputRoomEventsStatements ) SelectEvents (
2023-04-04 19:16:53 +02:00
ctx context . Context , txn * sql . Tx , eventIDs [ ] string , filter * synctypes . RoomEventFilter , preserveOrder bool ,
2020-01-23 18:51:10 +01:00
) ( [ ] types . StreamEvent , error ) {
2022-04-13 13:16:02 +02:00
var (
stmt * sql . Stmt
rows * sql . Rows
err error
)
if filter == nil {
stmt = sqlutil . TxStmt ( txn , s . selectEventsStmt )
rows , err = stmt . QueryContext ( ctx , pq . StringArray ( eventIDs ) )
} else {
senders , notSenders := getSendersRoomEventFilter ( filter )
stmt = sqlutil . TxStmt ( txn , s . selectEventsWitFilterStmt )
rows , err = stmt . QueryContext ( ctx ,
pq . StringArray ( eventIDs ) ,
pq . StringArray ( senders ) ,
pq . StringArray ( notSenders ) ,
pq . StringArray ( filterConvertTypeWildcardToSQL ( filter . Types ) ) ,
pq . StringArray ( filterConvertTypeWildcardToSQL ( filter . NotTypes ) ) ,
filter . ContainsURL ,
filter . Limit ,
)
}
2017-04-13 17:56:46 +02:00
if err != nil {
return nil , err
}
2020-05-21 15:40:13 +02:00
defer internal . CloseAndLogIfError ( ctx , rows , "selectEvents: rows.close() failed" )
2022-04-08 18:53:24 +02:00
streamEvents , err := rowsToStreamEvents ( rows )
if err != nil {
return nil , err
}
if preserveOrder {
eventMap := make ( map [ string ] types . StreamEvent )
for _ , ev := range streamEvents {
eventMap [ ev . EventID ( ) ] = ev
}
var returnEvents [ ] types . StreamEvent
for _ , eventID := range eventIDs {
ev , ok := eventMap [ eventID ]
if ok {
returnEvents = append ( returnEvents , ev )
}
}
return returnEvents , nil
}
return streamEvents , nil
2017-04-13 17:56:46 +02:00
}
2020-09-15 12:17:46 +02:00
func ( s * outputRoomEventsStatements ) DeleteEventsForRoom (
ctx context . Context , txn * sql . Tx , roomID string ,
) ( err error ) {
_ , err = sqlutil . TxStmt ( txn , s . deleteEventsForRoomStmt ) . ExecContext ( ctx , roomID )
return err
}
2023-04-27 13:54:20 +02:00
func ( s * outputRoomEventsStatements ) SelectContextEvent ( ctx context . Context , txn * sql . Tx , roomID , eventID string ) ( id int , evt rstypes . HeaderedEvent , err error ) {
2022-02-21 17:12:22 +01:00
row := sqlutil . TxStmt ( txn , s . selectContextEventStmt ) . QueryRowContext ( ctx , roomID , eventID )
var eventAsString string
2022-07-18 14:46:15 +02:00
var historyVisibility gomatrixserverlib . HistoryVisibility
if err = row . Scan ( & id , & eventAsString , & historyVisibility ) ; err != nil {
2022-02-21 17:12:22 +01:00
return 0 , evt , err
}
if err = json . Unmarshal ( [ ] byte ( eventAsString ) , & evt ) ; err != nil {
return 0 , evt , err
}
2022-07-18 14:46:15 +02:00
evt . Visibility = historyVisibility
2022-02-21 17:12:22 +01:00
return id , evt , nil
}
func ( s * outputRoomEventsStatements ) SelectContextBeforeEvent (
2023-04-04 19:16:53 +02:00
ctx context . Context , txn * sql . Tx , id int , roomID string , filter * synctypes . RoomEventFilter ,
2023-04-27 13:54:20 +02:00
) ( evts [ ] * rstypes . HeaderedEvent , err error ) {
2022-04-11 09:05:23 +02:00
senders , notSenders := getSendersRoomEventFilter ( filter )
2022-02-21 17:12:22 +01:00
rows , err := sqlutil . TxStmt ( txn , s . selectContextBeforeEventStmt ) . QueryContext (
ctx , roomID , id , filter . Limit ,
2022-04-11 09:05:23 +02:00
pq . StringArray ( senders ) ,
pq . StringArray ( notSenders ) ,
2022-02-21 17:12:22 +01:00
pq . StringArray ( filterConvertTypeWildcardToSQL ( filter . Types ) ) ,
pq . StringArray ( filterConvertTypeWildcardToSQL ( filter . NotTypes ) ) ,
)
if err != nil {
return
}
2022-03-24 11:03:22 +01:00
defer internal . CloseAndLogIfError ( ctx , rows , "rows.close() failed" )
2022-02-21 17:12:22 +01:00
for rows . Next ( ) {
var (
2022-07-18 14:46:15 +02:00
eventBytes [ ] byte
2023-04-27 13:54:20 +02:00
evt * rstypes . HeaderedEvent
2022-07-18 14:46:15 +02:00
historyVisibility gomatrixserverlib . HistoryVisibility
2022-02-21 17:12:22 +01:00
)
2022-07-18 14:46:15 +02:00
if err = rows . Scan ( & eventBytes , & historyVisibility ) ; err != nil {
2022-02-21 17:12:22 +01:00
return evts , err
}
if err = json . Unmarshal ( eventBytes , & evt ) ; err != nil {
return evts , err
}
2022-07-18 14:46:15 +02:00
evt . Visibility = historyVisibility
2022-02-21 17:12:22 +01:00
evts = append ( evts , evt )
}
return evts , rows . Err ( )
}
func ( s * outputRoomEventsStatements ) SelectContextAfterEvent (
2023-04-04 19:16:53 +02:00
ctx context . Context , txn * sql . Tx , id int , roomID string , filter * synctypes . RoomEventFilter ,
2023-04-27 13:54:20 +02:00
) ( lastID int , evts [ ] * rstypes . HeaderedEvent , err error ) {
2022-04-11 09:05:23 +02:00
senders , notSenders := getSendersRoomEventFilter ( filter )
2022-02-21 17:12:22 +01:00
rows , err := sqlutil . TxStmt ( txn , s . selectContextAfterEventStmt ) . QueryContext (
ctx , roomID , id , filter . Limit ,
2022-04-11 09:05:23 +02:00
pq . StringArray ( senders ) ,
pq . StringArray ( notSenders ) ,
2022-02-21 17:12:22 +01:00
pq . StringArray ( filterConvertTypeWildcardToSQL ( filter . Types ) ) ,
pq . StringArray ( filterConvertTypeWildcardToSQL ( filter . NotTypes ) ) ,
)
if err != nil {
return
}
2022-03-24 11:03:22 +01:00
defer internal . CloseAndLogIfError ( ctx , rows , "rows.close() failed" )
2022-02-21 17:12:22 +01:00
for rows . Next ( ) {
var (
2022-07-18 14:46:15 +02:00
eventBytes [ ] byte
2023-04-27 13:54:20 +02:00
evt * rstypes . HeaderedEvent
2022-07-18 14:46:15 +02:00
historyVisibility gomatrixserverlib . HistoryVisibility
2022-02-21 17:12:22 +01:00
)
2022-07-18 14:46:15 +02:00
if err = rows . Scan ( & lastID , & eventBytes , & historyVisibility ) ; err != nil {
2022-02-21 17:12:22 +01:00
return 0 , evts , err
}
if err = json . Unmarshal ( eventBytes , & evt ) ; err != nil {
return 0 , evts , err
}
2022-07-18 14:46:15 +02:00
evt . Visibility = historyVisibility
2022-02-21 17:12:22 +01:00
evts = append ( evts , evt )
}
return lastID , evts , rows . Err ( )
}
2020-01-23 18:51:10 +01:00
func rowsToStreamEvents ( rows * sql . Rows ) ( [ ] types . StreamEvent , error ) {
var result [ ] types . StreamEvent
2017-04-13 17:56:46 +02:00
for rows . Next ( ) {
2017-05-17 17:21:27 +02:00
var (
2022-07-18 14:46:15 +02:00
eventID string
streamPos types . StreamPosition
eventBytes [ ] byte
excludeFromSync bool
sessionID * int64
txnID * string
transactionID * api . TransactionID
historyVisibility gomatrixserverlib . HistoryVisibility
2017-05-17 17:21:27 +02:00
)
2022-07-18 14:46:15 +02:00
if err := rows . Scan ( & eventID , & streamPos , & eventBytes , & sessionID , & excludeFromSync , & txnID , & historyVisibility ) ; err != nil {
2017-04-05 11:30:13 +02:00
return nil , err
}
2017-04-13 17:56:46 +02:00
// TODO: Handle redacted events
2023-04-27 13:54:20 +02:00
var ev rstypes . HeaderedEvent
if err := json . Unmarshal ( eventBytes , & ev ) ; err != nil {
2017-04-05 11:30:13 +02:00
return nil , err
}
2017-12-06 10:37:18 +01:00
2019-08-23 18:55:40 +02:00
if sessionID != nil && txnID != nil {
2017-12-06 10:37:18 +01:00
transactionID = & api . TransactionID {
2019-08-23 18:55:40 +02:00
SessionID : * sessionID ,
2017-12-06 10:37:18 +01:00
TransactionID : * txnID ,
}
}
2022-07-18 18:19:44 +02:00
ev . Visibility = historyVisibility
2020-01-23 18:51:10 +01:00
result = append ( result , types . StreamEvent {
2020-11-16 16:44:53 +01:00
HeaderedEvent : & ev ,
2020-01-23 18:51:10 +01:00
StreamPosition : streamPos ,
TransactionID : transactionID ,
ExcludeFromSync : excludeFromSync ,
2017-12-06 10:37:18 +01:00
} )
2017-04-05 11:30:13 +02:00
}
2020-02-11 15:12:21 +01:00
return result , rows . Err ( )
2017-04-05 11:30:13 +02:00
}
2022-09-27 18:06:49 +02:00
2023-01-19 21:02:32 +01:00
func ( s * outputRoomEventsStatements ) PurgeEvents (
ctx context . Context , txn * sql . Tx , roomID string ,
) error {
_ , err := sqlutil . TxStmt ( txn , s . purgeEventsStmt ) . ExecContext ( ctx , roomID )
return err
}
2023-04-27 13:54:20 +02:00
func ( s * outputRoomEventsStatements ) ReIndex ( ctx context . Context , txn * sql . Tx , limit , afterID int64 , types [ ] string ) ( map [ int64 ] rstypes . HeaderedEvent , error ) {
2022-09-27 18:06:49 +02:00
rows , err := sqlutil . TxStmt ( txn , s . selectSearchStmt ) . QueryContext ( ctx , afterID , pq . StringArray ( types ) , limit )
if err != nil {
return nil , err
}
defer internal . CloseAndLogIfError ( ctx , rows , "rows.close() failed" )
var eventID string
var id int64
2023-04-27 13:54:20 +02:00
result := make ( map [ int64 ] rstypes . HeaderedEvent )
2022-09-27 18:06:49 +02:00
for rows . Next ( ) {
2023-04-27 13:54:20 +02:00
var ev rstypes . HeaderedEvent
2022-09-27 18:06:49 +02:00
var eventBytes [ ] byte
if err = rows . Scan ( & id , & eventID , & eventBytes ) ; err != nil {
return nil , err
}
2023-04-27 13:54:20 +02:00
if err = json . Unmarshal ( eventBytes , & ev ) ; err != nil {
2022-09-27 18:06:49 +02:00
return nil , err
}
result [ id ] = ev
}
return result , rows . Err ( )
}