mirror of
https://github.com/matrix-org/dendrite
synced 2024-10-31 21:19:04 +01:00
State storage refactor (#1839)
* Hash-deduplicated state storage (and migrations) for PostgreSQL and SQLite * Refactor droomserver database setup for migrations * Fix conflict statements * Update migration names * Set a boundary for old to new block/snapshot IDs so we don't rewrite them more than once accidentally * Create sequence if not exists * Fix boundary queries * Fix boundary queries * Use Query * Break out queries a bit * More sequence tweaks * Query parameters are not playing the game * Injection escaping may not work for CREATE SEQUENCE after all * Fix snapshot sequence name * Use boundaried IDs in SQLite too * Use IFNULL for SQLite * Use COALESCE in PostgreSQL * Review comments @Kegsay
This commit is contained in:
parent
d6e9b7b307
commit
5ce1fe80de
39 changed files with 1076 additions and 554 deletions
2
go.mod
2
go.mod
|
@ -25,7 +25,7 @@ require (
|
||||||
github.com/matrix-org/gomatrixserverlib v0.0.0-20210302161955-6142fe3f8c2c
|
github.com/matrix-org/gomatrixserverlib v0.0.0-20210302161955-6142fe3f8c2c
|
||||||
github.com/matrix-org/naffka v0.0.0-20201009174903-d26a3b9cb161
|
github.com/matrix-org/naffka v0.0.0-20201009174903-d26a3b9cb161
|
||||||
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4
|
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4
|
||||||
github.com/mattn/go-sqlite3 v1.14.6
|
github.com/mattn/go-sqlite3 v1.14.7-0.20210414154423-1157a4212dcb
|
||||||
github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646
|
github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646
|
||||||
github.com/ngrok/sqlmw v0.0.0-20200129213757-d5c93a81bec6
|
github.com/ngrok/sqlmw v0.0.0-20200129213757-d5c93a81bec6
|
||||||
github.com/opentracing/opentracing-go v1.2.0
|
github.com/opentracing/opentracing-go v1.2.0
|
||||||
|
|
4
go.sum
4
go.sum
|
@ -684,8 +684,8 @@ github.com/mattn/go-isatty v0.0.10/go.mod h1:qgIWMr58cqv1PHHyhnkY9lrL7etaEgOFcME
|
||||||
github.com/mattn/go-runewidth v0.0.2/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU=
|
github.com/mattn/go-runewidth v0.0.2/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU=
|
||||||
github.com/mattn/go-runewidth v0.0.7/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI=
|
github.com/mattn/go-runewidth v0.0.7/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI=
|
||||||
github.com/mattn/go-sqlite3 v1.14.2/go.mod h1:JIl7NbARA7phWnGvh0LKTyg7S9BA+6gx71ShQilpsus=
|
github.com/mattn/go-sqlite3 v1.14.2/go.mod h1:JIl7NbARA7phWnGvh0LKTyg7S9BA+6gx71ShQilpsus=
|
||||||
github.com/mattn/go-sqlite3 v1.14.6 h1:dNPt6NO46WmLVt2DLNpwczCmdV5boIZ6g/tlDrlRUbg=
|
github.com/mattn/go-sqlite3 v1.14.7-0.20210414154423-1157a4212dcb h1:ax2vG2unlxsjwS7PMRo4FECIfAdQLowd6ejWYwPQhBo=
|
||||||
github.com/mattn/go-sqlite3 v1.14.6/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU=
|
github.com/mattn/go-sqlite3 v1.14.7-0.20210414154423-1157a4212dcb/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU=
|
||||||
github.com/mattn/goveralls v0.0.2/go.mod h1:8d1ZMHsd7fW6IRPKQh46F2WRpyib5/X4FOpevwGNQEw=
|
github.com/mattn/goveralls v0.0.2/go.mod h1:8d1ZMHsd7fW6IRPKQh46F2WRpyib5/X4FOpevwGNQEw=
|
||||||
github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
|
github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
|
||||||
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
|
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
|
||||||
|
|
|
@ -24,6 +24,7 @@ import (
|
||||||
|
|
||||||
func LoadFromGoose() {
|
func LoadFromGoose() {
|
||||||
goose.AddMigration(UpAddForgottenColumn, DownAddForgottenColumn)
|
goose.AddMigration(UpAddForgottenColumn, DownAddForgottenColumn)
|
||||||
|
goose.AddMigration(UpStateBlocksRefactor, DownStateBlocksRefactor)
|
||||||
}
|
}
|
||||||
|
|
||||||
func LoadAddForgottenColumn(m *sqlutil.Migrations) {
|
func LoadAddForgottenColumn(m *sqlutil.Migrations) {
|
||||||
|
|
|
@ -0,0 +1,223 @@
|
||||||
|
// Copyright 2020 The Matrix.org Foundation C.I.C.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package deltas
|
||||||
|
|
||||||
|
import (
|
||||||
|
"database/sql"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/lib/pq"
|
||||||
|
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||||
|
"github.com/matrix-org/dendrite/roomserver/types"
|
||||||
|
"github.com/matrix-org/util"
|
||||||
|
"github.com/sirupsen/logrus"
|
||||||
|
)
|
||||||
|
|
||||||
|
type stateSnapshotData struct {
|
||||||
|
StateSnapshotNID types.StateSnapshotNID
|
||||||
|
RoomNID types.RoomNID
|
||||||
|
}
|
||||||
|
|
||||||
|
type stateBlockData struct {
|
||||||
|
stateSnapshotData
|
||||||
|
StateBlockNID types.StateBlockNID
|
||||||
|
EventNIDs types.EventNIDs
|
||||||
|
}
|
||||||
|
|
||||||
|
func LoadStateBlocksRefactor(m *sqlutil.Migrations) {
|
||||||
|
m.AddMigration(UpStateBlocksRefactor, DownStateBlocksRefactor)
|
||||||
|
}
|
||||||
|
|
||||||
|
// nolint:gocyclo
|
||||||
|
func UpStateBlocksRefactor(tx *sql.Tx) error {
|
||||||
|
logrus.Warn("Performing state storage upgrade. Please wait, this may take some time!")
|
||||||
|
defer logrus.Warn("State storage upgrade complete")
|
||||||
|
|
||||||
|
var snapshotcount int
|
||||||
|
var maxsnapshotid int
|
||||||
|
var maxblockid int
|
||||||
|
if err := tx.QueryRow(`SELECT COUNT(DISTINCT state_snapshot_nid) FROM roomserver_state_snapshots;`).Scan(&snapshotcount); err != nil {
|
||||||
|
return fmt.Errorf("tx.QueryRow.Scan (count snapshots): %w", err)
|
||||||
|
}
|
||||||
|
if err := tx.QueryRow(`SELECT COALESCE(MAX(state_snapshot_nid),0) FROM roomserver_state_snapshots;`).Scan(&maxsnapshotid); err != nil {
|
||||||
|
return fmt.Errorf("tx.QueryRow.Scan (count snapshots): %w", err)
|
||||||
|
}
|
||||||
|
if err := tx.QueryRow(`SELECT COALESCE(MAX(state_block_nid),0) FROM roomserver_state_block;`).Scan(&maxblockid); err != nil {
|
||||||
|
return fmt.Errorf("tx.QueryRow.Scan (count snapshots): %w", err)
|
||||||
|
}
|
||||||
|
maxsnapshotid++
|
||||||
|
maxblockid++
|
||||||
|
|
||||||
|
if _, err := tx.Exec(`ALTER TABLE roomserver_state_block RENAME TO _roomserver_state_block;`); err != nil {
|
||||||
|
return fmt.Errorf("tx.Exec: %w", err)
|
||||||
|
}
|
||||||
|
if _, err := tx.Exec(`ALTER TABLE roomserver_state_snapshots RENAME TO _roomserver_state_snapshots;`); err != nil {
|
||||||
|
return fmt.Errorf("tx.Exec: %w", err)
|
||||||
|
}
|
||||||
|
// We create new sequences starting with the maximum state snapshot and block NIDs.
|
||||||
|
// This means that all newly created snapshots and blocks by the migration will have
|
||||||
|
// NIDs higher than these values, so that when we come to update the references to
|
||||||
|
// these NIDs using UPDATE statements, we can guarantee we are only ever updating old
|
||||||
|
// values and not accidentally overwriting new ones.
|
||||||
|
if _, err := tx.Exec(fmt.Sprintf(`CREATE SEQUENCE roomserver_state_block_nid_sequence START WITH %d;`, maxblockid)); err != nil {
|
||||||
|
return fmt.Errorf("tx.Exec: %w", err)
|
||||||
|
}
|
||||||
|
if _, err := tx.Exec(fmt.Sprintf(`CREATE SEQUENCE roomserver_state_snapshot_nid_sequence START WITH %d;`, maxsnapshotid)); err != nil {
|
||||||
|
return fmt.Errorf("tx.Exec: %w", err)
|
||||||
|
}
|
||||||
|
_, err := tx.Exec(`
|
||||||
|
CREATE TABLE IF NOT EXISTS roomserver_state_block (
|
||||||
|
state_block_nid bigint PRIMARY KEY DEFAULT nextval('roomserver_state_block_nid_sequence'),
|
||||||
|
state_block_hash BYTEA UNIQUE,
|
||||||
|
event_nids bigint[] NOT NULL
|
||||||
|
);
|
||||||
|
`)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("tx.Exec (create blocks table): %w", err)
|
||||||
|
}
|
||||||
|
_, err = tx.Exec(`
|
||||||
|
CREATE TABLE IF NOT EXISTS roomserver_state_snapshots (
|
||||||
|
state_snapshot_nid bigint PRIMARY KEY DEFAULT nextval('roomserver_state_snapshot_nid_sequence'),
|
||||||
|
state_snapshot_hash BYTEA UNIQUE,
|
||||||
|
room_nid bigint NOT NULL,
|
||||||
|
state_block_nids bigint[] NOT NULL
|
||||||
|
);
|
||||||
|
`)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("tx.Exec (create snapshots table): %w", err)
|
||||||
|
}
|
||||||
|
logrus.Warn("New tables created...")
|
||||||
|
|
||||||
|
batchsize := 100
|
||||||
|
for batchoffset := 0; batchoffset < snapshotcount; batchoffset += batchsize {
|
||||||
|
var snapshotrows *sql.Rows
|
||||||
|
snapshotrows, err = tx.Query(`
|
||||||
|
SELECT
|
||||||
|
state_snapshot_nid,
|
||||||
|
room_nid,
|
||||||
|
state_block_nid,
|
||||||
|
ARRAY_AGG(event_nid) AS event_nids
|
||||||
|
FROM (
|
||||||
|
SELECT
|
||||||
|
_roomserver_state_snapshots.state_snapshot_nid,
|
||||||
|
_roomserver_state_snapshots.room_nid,
|
||||||
|
_roomserver_state_block.state_block_nid,
|
||||||
|
_roomserver_state_block.event_nid
|
||||||
|
FROM
|
||||||
|
_roomserver_state_snapshots
|
||||||
|
JOIN _roomserver_state_block ON _roomserver_state_block.state_block_nid = ANY (_roomserver_state_snapshots.state_block_nids)
|
||||||
|
WHERE
|
||||||
|
_roomserver_state_snapshots.state_snapshot_nid = ANY ( SELECT DISTINCT
|
||||||
|
_roomserver_state_snapshots.state_snapshot_nid
|
||||||
|
FROM
|
||||||
|
_roomserver_state_snapshots
|
||||||
|
LIMIT $1 OFFSET $2)) AS _roomserver_state_block
|
||||||
|
GROUP BY
|
||||||
|
state_snapshot_nid,
|
||||||
|
room_nid,
|
||||||
|
state_block_nid;
|
||||||
|
`, batchsize, batchoffset)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("tx.Query: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
logrus.Warnf("Rewriting snapshots %d-%d of %d...", batchoffset, batchoffset+batchsize, snapshotcount)
|
||||||
|
var snapshots []stateBlockData
|
||||||
|
|
||||||
|
for snapshotrows.Next() {
|
||||||
|
var snapshot stateBlockData
|
||||||
|
var eventsarray pq.Int64Array
|
||||||
|
if err = snapshotrows.Scan(&snapshot.StateSnapshotNID, &snapshot.RoomNID, &snapshot.StateBlockNID, &eventsarray); err != nil {
|
||||||
|
return fmt.Errorf("rows.Scan: %w", err)
|
||||||
|
}
|
||||||
|
for _, e := range eventsarray {
|
||||||
|
snapshot.EventNIDs = append(snapshot.EventNIDs, types.EventNID(e))
|
||||||
|
}
|
||||||
|
snapshot.EventNIDs = snapshot.EventNIDs[:util.SortAndUnique(snapshot.EventNIDs)]
|
||||||
|
snapshots = append(snapshots, snapshot)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = snapshotrows.Close(); err != nil {
|
||||||
|
return fmt.Errorf("snapshots.Close: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
newsnapshots := map[stateSnapshotData]types.StateBlockNIDs{}
|
||||||
|
|
||||||
|
for _, snapshot := range snapshots {
|
||||||
|
var eventsarray pq.Int64Array
|
||||||
|
for _, e := range snapshot.EventNIDs {
|
||||||
|
eventsarray = append(eventsarray, int64(e))
|
||||||
|
}
|
||||||
|
|
||||||
|
var blocknid types.StateBlockNID
|
||||||
|
err = tx.QueryRow(`
|
||||||
|
INSERT INTO roomserver_state_block (state_block_hash, event_nids)
|
||||||
|
VALUES ($1, $2)
|
||||||
|
ON CONFLICT (state_block_hash) DO UPDATE SET event_nids=$2
|
||||||
|
RETURNING state_block_nid
|
||||||
|
`, snapshot.EventNIDs.Hash(), eventsarray).Scan(&blocknid)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("tx.QueryRow.Scan (insert new block with %d events): %w", len(eventsarray), err)
|
||||||
|
}
|
||||||
|
index := stateSnapshotData{snapshot.StateSnapshotNID, snapshot.RoomNID}
|
||||||
|
newsnapshots[index] = append(newsnapshots[index], blocknid)
|
||||||
|
}
|
||||||
|
|
||||||
|
for snapshotdata, newblocks := range newsnapshots {
|
||||||
|
var newblocksarray pq.Int64Array
|
||||||
|
for _, b := range newblocks {
|
||||||
|
newblocksarray = append(newblocksarray, int64(b))
|
||||||
|
}
|
||||||
|
|
||||||
|
var newNID types.StateSnapshotNID
|
||||||
|
err = tx.QueryRow(`
|
||||||
|
INSERT INTO roomserver_state_snapshots (state_snapshot_hash, room_nid, state_block_nids)
|
||||||
|
VALUES ($1, $2, $3)
|
||||||
|
ON CONFLICT (state_snapshot_hash) DO UPDATE SET room_nid=$2
|
||||||
|
RETURNING state_snapshot_nid
|
||||||
|
`, newblocks.Hash(), snapshotdata.RoomNID, newblocksarray).Scan(&newNID)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("tx.QueryRow.Scan (insert new snapshot): %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err = tx.Exec(`UPDATE roomserver_events SET state_snapshot_nid=$1 WHERE state_snapshot_nid=$2 AND state_snapshot_nid<$3`, newNID, snapshotdata.StateSnapshotNID, maxsnapshotid); err != nil {
|
||||||
|
return fmt.Errorf("tx.Exec (update events): %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err = tx.Exec(`UPDATE roomserver_rooms SET state_snapshot_nid=$1 WHERE state_snapshot_nid=$2 AND state_snapshot_nid<$3`, newNID, snapshotdata.StateSnapshotNID, maxsnapshotid); err != nil {
|
||||||
|
return fmt.Errorf("tx.Exec (update rooms): %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err = tx.Exec(`
|
||||||
|
DROP TABLE _roomserver_state_snapshots;
|
||||||
|
DROP SEQUENCE roomserver_state_snapshot_nid_seq;
|
||||||
|
`); err != nil {
|
||||||
|
return fmt.Errorf("tx.Exec (delete old snapshot table): %w", err)
|
||||||
|
}
|
||||||
|
if _, err = tx.Exec(`
|
||||||
|
DROP TABLE _roomserver_state_block;
|
||||||
|
DROP SEQUENCE roomserver_state_block_nid_seq;
|
||||||
|
`); err != nil {
|
||||||
|
return fmt.Errorf("tx.Exec (delete old block table): %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func DownStateBlocksRefactor(tx *sql.Tx) error {
|
||||||
|
panic("Downgrading state storage is not supported")
|
||||||
|
}
|
|
@ -59,12 +59,14 @@ type eventJSONStatements struct {
|
||||||
bulkSelectEventJSONStmt *sql.Stmt
|
bulkSelectEventJSONStmt *sql.Stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewPostgresEventJSONTable(db *sql.DB) (tables.EventJSON, error) {
|
func createEventJSONTable(db *sql.DB) error {
|
||||||
s := &eventJSONStatements{}
|
|
||||||
_, err := db.Exec(eventJSONSchema)
|
_, err := db.Exec(eventJSONSchema)
|
||||||
if err != nil {
|
return err
|
||||||
return nil, err
|
}
|
||||||
}
|
|
||||||
|
func prepareEventJSONTable(db *sql.DB) (tables.EventJSON, error) {
|
||||||
|
s := &eventJSONStatements{}
|
||||||
|
|
||||||
return s, shared.StatementList{
|
return s, shared.StatementList{
|
||||||
{&s.insertEventJSONStmt, insertEventJSONSQL},
|
{&s.insertEventJSONStmt, insertEventJSONSQL},
|
||||||
{&s.bulkSelectEventJSONStmt, bulkSelectEventJSONSQL},
|
{&s.bulkSelectEventJSONStmt, bulkSelectEventJSONSQL},
|
||||||
|
|
|
@ -77,12 +77,14 @@ type eventStateKeyStatements struct {
|
||||||
bulkSelectEventStateKeyStmt *sql.Stmt
|
bulkSelectEventStateKeyStmt *sql.Stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewPostgresEventStateKeysTable(db *sql.DB) (tables.EventStateKeys, error) {
|
func createEventStateKeysTable(db *sql.DB) error {
|
||||||
s := &eventStateKeyStatements{}
|
|
||||||
_, err := db.Exec(eventStateKeysSchema)
|
_, err := db.Exec(eventStateKeysSchema)
|
||||||
if err != nil {
|
return err
|
||||||
return nil, err
|
}
|
||||||
}
|
|
||||||
|
func prepareEventStateKeysTable(db *sql.DB) (tables.EventStateKeys, error) {
|
||||||
|
s := &eventStateKeyStatements{}
|
||||||
|
|
||||||
return s, shared.StatementList{
|
return s, shared.StatementList{
|
||||||
{&s.insertEventStateKeyNIDStmt, insertEventStateKeyNIDSQL},
|
{&s.insertEventStateKeyNIDStmt, insertEventStateKeyNIDSQL},
|
||||||
{&s.selectEventStateKeyNIDStmt, selectEventStateKeyNIDSQL},
|
{&s.selectEventStateKeyNIDStmt, selectEventStateKeyNIDSQL},
|
||||||
|
|
|
@ -100,12 +100,13 @@ type eventTypeStatements struct {
|
||||||
bulkSelectEventTypeNIDStmt *sql.Stmt
|
bulkSelectEventTypeNIDStmt *sql.Stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewPostgresEventTypesTable(db *sql.DB) (tables.EventTypes, error) {
|
func createEventTypesTable(db *sql.DB) error {
|
||||||
s := &eventTypeStatements{}
|
|
||||||
_, err := db.Exec(eventTypesSchema)
|
_, err := db.Exec(eventTypesSchema)
|
||||||
if err != nil {
|
return err
|
||||||
return nil, err
|
}
|
||||||
}
|
|
||||||
|
func prepareEventTypesTable(db *sql.DB) (tables.EventTypes, error) {
|
||||||
|
s := &eventTypeStatements{}
|
||||||
|
|
||||||
return s, shared.StatementList{
|
return s, shared.StatementList{
|
||||||
{&s.insertEventTypeNIDStmt, insertEventTypeNIDSQL},
|
{&s.insertEventTypeNIDStmt, insertEventTypeNIDSQL},
|
||||||
|
|
|
@ -19,6 +19,7 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"database/sql"
|
"database/sql"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"sort"
|
||||||
|
|
||||||
"github.com/lib/pq"
|
"github.com/lib/pq"
|
||||||
"github.com/matrix-org/dendrite/internal"
|
"github.com/matrix-org/dendrite/internal"
|
||||||
|
@ -88,6 +89,16 @@ const bulkSelectStateEventByIDSQL = "" +
|
||||||
" WHERE event_id = ANY($1)" +
|
" WHERE event_id = ANY($1)" +
|
||||||
" ORDER BY event_type_nid, event_state_key_nid ASC"
|
" ORDER BY event_type_nid, event_state_key_nid ASC"
|
||||||
|
|
||||||
|
// Bulk look up of events by event NID, optionally filtering by the event type
|
||||||
|
// or event state key NIDs if provided. (The CARDINALITY check will return true
|
||||||
|
// if the provided arrays are empty, ergo no filtering).
|
||||||
|
const bulkSelectStateEventByNIDSQL = "" +
|
||||||
|
"SELECT event_type_nid, event_state_key_nid, event_nid FROM roomserver_events" +
|
||||||
|
" WHERE event_nid = ANY($1)" +
|
||||||
|
" AND (CARDINALITY($2::bigint[]) = 0 OR event_type_nid = ANY($2))" +
|
||||||
|
" AND (CARDINALITY($3::bigint[]) = 0 OR event_state_key_nid = ANY($3))" +
|
||||||
|
" ORDER BY event_type_nid, event_state_key_nid ASC"
|
||||||
|
|
||||||
const bulkSelectStateAtEventByIDSQL = "" +
|
const bulkSelectStateAtEventByIDSQL = "" +
|
||||||
"SELECT event_type_nid, event_state_key_nid, event_nid, state_snapshot_nid, is_rejected FROM roomserver_events" +
|
"SELECT event_type_nid, event_state_key_nid, event_nid, state_snapshot_nid, is_rejected FROM roomserver_events" +
|
||||||
" WHERE event_id = ANY($1)"
|
" WHERE event_id = ANY($1)"
|
||||||
|
@ -127,6 +138,7 @@ type eventStatements struct {
|
||||||
insertEventStmt *sql.Stmt
|
insertEventStmt *sql.Stmt
|
||||||
selectEventStmt *sql.Stmt
|
selectEventStmt *sql.Stmt
|
||||||
bulkSelectStateEventByIDStmt *sql.Stmt
|
bulkSelectStateEventByIDStmt *sql.Stmt
|
||||||
|
bulkSelectStateEventByNIDStmt *sql.Stmt
|
||||||
bulkSelectStateAtEventByIDStmt *sql.Stmt
|
bulkSelectStateAtEventByIDStmt *sql.Stmt
|
||||||
updateEventStateStmt *sql.Stmt
|
updateEventStateStmt *sql.Stmt
|
||||||
selectEventSentToOutputStmt *sql.Stmt
|
selectEventSentToOutputStmt *sql.Stmt
|
||||||
|
@ -140,17 +152,19 @@ type eventStatements struct {
|
||||||
selectRoomNIDsForEventNIDsStmt *sql.Stmt
|
selectRoomNIDsForEventNIDsStmt *sql.Stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewPostgresEventsTable(db *sql.DB) (tables.Events, error) {
|
func createEventsTable(db *sql.DB) error {
|
||||||
s := &eventStatements{}
|
|
||||||
_, err := db.Exec(eventsSchema)
|
_, err := db.Exec(eventsSchema)
|
||||||
if err != nil {
|
return err
|
||||||
return nil, err
|
}
|
||||||
}
|
|
||||||
|
func prepareEventsTable(db *sql.DB) (tables.Events, error) {
|
||||||
|
s := &eventStatements{}
|
||||||
|
|
||||||
return s, shared.StatementList{
|
return s, shared.StatementList{
|
||||||
{&s.insertEventStmt, insertEventSQL},
|
{&s.insertEventStmt, insertEventSQL},
|
||||||
{&s.selectEventStmt, selectEventSQL},
|
{&s.selectEventStmt, selectEventSQL},
|
||||||
{&s.bulkSelectStateEventByIDStmt, bulkSelectStateEventByIDSQL},
|
{&s.bulkSelectStateEventByIDStmt, bulkSelectStateEventByIDSQL},
|
||||||
|
{&s.bulkSelectStateEventByNIDStmt, bulkSelectStateEventByNIDSQL},
|
||||||
{&s.bulkSelectStateAtEventByIDStmt, bulkSelectStateAtEventByIDSQL},
|
{&s.bulkSelectStateAtEventByIDStmt, bulkSelectStateAtEventByIDSQL},
|
||||||
{&s.updateEventStateStmt, updateEventStateSQL},
|
{&s.updateEventStateStmt, updateEventStateSQL},
|
||||||
{&s.updateEventSentToOutputStmt, updateEventSentToOutputSQL},
|
{&s.updateEventSentToOutputStmt, updateEventSentToOutputSQL},
|
||||||
|
@ -238,6 +252,42 @@ func (s *eventStatements) BulkSelectStateEventByID(
|
||||||
return results, nil
|
return results, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// bulkSelectStateEventByNID lookups a list of state events by event NID.
|
||||||
|
// If any of the requested events are missing from the database it returns a types.MissingEventError
|
||||||
|
func (s *eventStatements) BulkSelectStateEventByNID(
|
||||||
|
ctx context.Context, eventNIDs []types.EventNID,
|
||||||
|
stateKeyTuples []types.StateKeyTuple,
|
||||||
|
) ([]types.StateEntry, error) {
|
||||||
|
tuples := stateKeyTupleSorter(stateKeyTuples)
|
||||||
|
sort.Sort(tuples)
|
||||||
|
eventTypeNIDArray, eventStateKeyNIDArray := tuples.typesAndStateKeysAsArrays()
|
||||||
|
rows, err := s.bulkSelectStateEventByNIDStmt.QueryContext(ctx, eventNIDsAsArray(eventNIDs), eventTypeNIDArray, eventStateKeyNIDArray)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer internal.CloseAndLogIfError(ctx, rows, "bulkSelectStateEventByID: rows.close() failed")
|
||||||
|
// We know that we will only get as many results as event IDs
|
||||||
|
// because of the unique constraint on event IDs.
|
||||||
|
// So we can allocate an array of the correct size now.
|
||||||
|
// We might get fewer results than IDs so we adjust the length of the slice before returning it.
|
||||||
|
results := make([]types.StateEntry, len(eventNIDs))
|
||||||
|
i := 0
|
||||||
|
for ; rows.Next(); i++ {
|
||||||
|
result := &results[i]
|
||||||
|
if err = rows.Scan(
|
||||||
|
&result.EventTypeNID,
|
||||||
|
&result.EventStateKeyNID,
|
||||||
|
&result.EventNID,
|
||||||
|
); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if err = rows.Err(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return results[:i], nil
|
||||||
|
}
|
||||||
|
|
||||||
// bulkSelectStateAtEventByID lookups the state at a list of events by event ID.
|
// bulkSelectStateAtEventByID lookups the state at a list of events by event ID.
|
||||||
// If any of the requested events are missing from the database it returns a types.MissingEventError.
|
// If any of the requested events are missing from the database it returns a types.MissingEventError.
|
||||||
// If we do not have the state for any of the requested events it returns a types.MissingEventError.
|
// If we do not have the state for any of the requested events it returns a types.MissingEventError.
|
||||||
|
|
|
@ -82,12 +82,13 @@ type inviteStatements struct {
|
||||||
updateInviteRetiredStmt *sql.Stmt
|
updateInviteRetiredStmt *sql.Stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewPostgresInvitesTable(db *sql.DB) (tables.Invites, error) {
|
func createInvitesTable(db *sql.DB) error {
|
||||||
s := &inviteStatements{}
|
|
||||||
_, err := db.Exec(inviteSchema)
|
_, err := db.Exec(inviteSchema)
|
||||||
if err != nil {
|
return err
|
||||||
return nil, err
|
}
|
||||||
}
|
|
||||||
|
func prepareInvitesTable(db *sql.DB) (tables.Invites, error) {
|
||||||
|
s := &inviteStatements{}
|
||||||
|
|
||||||
return s, shared.StatementList{
|
return s, shared.StatementList{
|
||||||
{&s.insertInviteEventStmt, insertInviteEventSQL},
|
{&s.insertInviteEventStmt, insertInviteEventSQL},
|
||||||
|
|
|
@ -139,12 +139,13 @@ type membershipStatements struct {
|
||||||
updateMembershipForgetRoomStmt *sql.Stmt
|
updateMembershipForgetRoomStmt *sql.Stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewPostgresMembershipTable(db *sql.DB) (tables.Membership, error) {
|
func createMembershipTable(db *sql.DB) error {
|
||||||
s := &membershipStatements{}
|
|
||||||
_, err := db.Exec(membershipSchema)
|
_, err := db.Exec(membershipSchema)
|
||||||
if err != nil {
|
return err
|
||||||
return nil, err
|
}
|
||||||
}
|
|
||||||
|
func prepareMembershipTable(db *sql.DB) (tables.Membership, error) {
|
||||||
|
s := &membershipStatements{}
|
||||||
|
|
||||||
return s, shared.StatementList{
|
return s, shared.StatementList{
|
||||||
{&s.insertMembershipStmt, insertMembershipSQL},
|
{&s.insertMembershipStmt, insertMembershipSQL},
|
||||||
|
@ -162,11 +163,6 @@ func NewPostgresMembershipTable(db *sql.DB) (tables.Membership, error) {
|
||||||
}.Prepare(db)
|
}.Prepare(db)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *membershipStatements) execSchema(db *sql.DB) error {
|
|
||||||
_, err := db.Exec(membershipSchema)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *membershipStatements) InsertMembership(
|
func (s *membershipStatements) InsertMembership(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
txn *sql.Tx, roomNID types.RoomNID, targetUserNID types.EventStateKeyNID,
|
txn *sql.Tx, roomNID types.RoomNID, targetUserNID types.EventStateKeyNID,
|
||||||
|
|
|
@ -65,12 +65,13 @@ type previousEventStatements struct {
|
||||||
selectPreviousEventExistsStmt *sql.Stmt
|
selectPreviousEventExistsStmt *sql.Stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewPostgresPreviousEventsTable(db *sql.DB) (tables.PreviousEvents, error) {
|
func createPrevEventsTable(db *sql.DB) error {
|
||||||
s := &previousEventStatements{}
|
|
||||||
_, err := db.Exec(previousEventSchema)
|
_, err := db.Exec(previousEventSchema)
|
||||||
if err != nil {
|
return err
|
||||||
return nil, err
|
}
|
||||||
}
|
|
||||||
|
func preparePrevEventsTable(db *sql.DB) (tables.PreviousEvents, error) {
|
||||||
|
s := &previousEventStatements{}
|
||||||
|
|
||||||
return s, shared.StatementList{
|
return s, shared.StatementList{
|
||||||
{&s.insertPreviousEventStmt, insertPreviousEventSQL},
|
{&s.insertPreviousEventStmt, insertPreviousEventSQL},
|
||||||
|
|
|
@ -50,12 +50,14 @@ type publishedStatements struct {
|
||||||
selectPublishedStmt *sql.Stmt
|
selectPublishedStmt *sql.Stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewPostgresPublishedTable(db *sql.DB) (tables.Published, error) {
|
func createPublishedTable(db *sql.DB) error {
|
||||||
s := &publishedStatements{}
|
|
||||||
_, err := db.Exec(publishedSchema)
|
_, err := db.Exec(publishedSchema)
|
||||||
if err != nil {
|
return err
|
||||||
return nil, err
|
}
|
||||||
}
|
|
||||||
|
func preparePublishedTable(db *sql.DB) (tables.Published, error) {
|
||||||
|
s := &publishedStatements{}
|
||||||
|
|
||||||
return s, shared.StatementList{
|
return s, shared.StatementList{
|
||||||
{&s.upsertPublishedStmt, upsertPublishedSQL},
|
{&s.upsertPublishedStmt, upsertPublishedSQL},
|
||||||
{&s.selectAllPublishedStmt, selectAllPublishedSQL},
|
{&s.selectAllPublishedStmt, selectAllPublishedSQL},
|
||||||
|
|
|
@ -60,12 +60,13 @@ type redactionStatements struct {
|
||||||
markRedactionValidatedStmt *sql.Stmt
|
markRedactionValidatedStmt *sql.Stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewPostgresRedactionsTable(db *sql.DB) (tables.Redactions, error) {
|
func createRedactionsTable(db *sql.DB) error {
|
||||||
s := &redactionStatements{}
|
|
||||||
_, err := db.Exec(redactionsSchema)
|
_, err := db.Exec(redactionsSchema)
|
||||||
if err != nil {
|
return err
|
||||||
return nil, err
|
}
|
||||||
}
|
|
||||||
|
func prepareRedactionsTable(db *sql.DB) (tables.Redactions, error) {
|
||||||
|
s := &redactionStatements{}
|
||||||
|
|
||||||
return s, shared.StatementList{
|
return s, shared.StatementList{
|
||||||
{&s.insertRedactionStmt, insertRedactionSQL},
|
{&s.insertRedactionStmt, insertRedactionSQL},
|
||||||
|
|
|
@ -62,12 +62,14 @@ type roomAliasesStatements struct {
|
||||||
deleteRoomAliasStmt *sql.Stmt
|
deleteRoomAliasStmt *sql.Stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewPostgresRoomAliasesTable(db *sql.DB) (tables.RoomAliases, error) {
|
func createRoomAliasesTable(db *sql.DB) error {
|
||||||
s := &roomAliasesStatements{}
|
|
||||||
_, err := db.Exec(roomAliasesSchema)
|
_, err := db.Exec(roomAliasesSchema)
|
||||||
if err != nil {
|
return err
|
||||||
return nil, err
|
}
|
||||||
}
|
|
||||||
|
func prepareRoomAliasesTable(db *sql.DB) (tables.RoomAliases, error) {
|
||||||
|
s := &roomAliasesStatements{}
|
||||||
|
|
||||||
return s, shared.StatementList{
|
return s, shared.StatementList{
|
||||||
{&s.insertRoomAliasStmt, insertRoomAliasSQL},
|
{&s.insertRoomAliasStmt, insertRoomAliasSQL},
|
||||||
{&s.selectRoomIDFromAliasStmt, selectRoomIDFromAliasSQL},
|
{&s.selectRoomIDFromAliasStmt, selectRoomIDFromAliasSQL},
|
||||||
|
|
|
@ -96,12 +96,14 @@ type roomStatements struct {
|
||||||
bulkSelectRoomNIDsStmt *sql.Stmt
|
bulkSelectRoomNIDsStmt *sql.Stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewPostgresRoomsTable(db *sql.DB) (tables.Rooms, error) {
|
func createRoomsTable(db *sql.DB) error {
|
||||||
s := &roomStatements{}
|
|
||||||
_, err := db.Exec(roomsSchema)
|
_, err := db.Exec(roomsSchema)
|
||||||
if err != nil {
|
return err
|
||||||
return nil, err
|
}
|
||||||
}
|
|
||||||
|
func prepareRoomsTable(db *sql.DB) (tables.Rooms, error) {
|
||||||
|
s := &roomStatements{}
|
||||||
|
|
||||||
return s, shared.StatementList{
|
return s, shared.StatementList{
|
||||||
{&s.insertRoomNIDStmt, insertRoomNIDSQL},
|
{&s.insertRoomNIDStmt, insertRoomNIDSQL},
|
||||||
{&s.selectRoomNIDStmt, selectRoomNIDSQL},
|
{&s.selectRoomNIDStmt, selectRoomNIDSQL},
|
||||||
|
|
|
@ -41,141 +41,88 @@ const stateDataSchema = `
|
||||||
-- which in turn makes it easier to merge state data blocks.
|
-- which in turn makes it easier to merge state data blocks.
|
||||||
CREATE SEQUENCE IF NOT EXISTS roomserver_state_block_nid_seq;
|
CREATE SEQUENCE IF NOT EXISTS roomserver_state_block_nid_seq;
|
||||||
CREATE TABLE IF NOT EXISTS roomserver_state_block (
|
CREATE TABLE IF NOT EXISTS roomserver_state_block (
|
||||||
-- Local numeric ID for this state data.
|
-- The state snapshot NID that identifies this snapshot.
|
||||||
state_block_nid bigint NOT NULL,
|
state_block_nid bigint PRIMARY KEY DEFAULT nextval('roomserver_state_block_nid_seq'),
|
||||||
event_type_nid bigint NOT NULL,
|
-- The hash of the state block, which is used to enforce uniqueness. The hash is
|
||||||
event_state_key_nid bigint NOT NULL,
|
-- generated in Dendrite and passed through to the database, as a btree index over
|
||||||
event_nid bigint NOT NULL,
|
-- this column is cheap and fits within the maximum index size.
|
||||||
UNIQUE (state_block_nid, event_type_nid, event_state_key_nid)
|
state_block_hash BYTEA UNIQUE,
|
||||||
|
-- The event NIDs contained within the state block.
|
||||||
|
event_nids bigint[] NOT NULL
|
||||||
);
|
);
|
||||||
`
|
`
|
||||||
|
|
||||||
|
// Insert a new state block. If we conflict on the hash column then
|
||||||
|
// we must perform an update so that the RETURNING statement returns the
|
||||||
|
// ID of the row that we conflicted with, so that we can then refer to
|
||||||
|
// the original block.
|
||||||
const insertStateDataSQL = "" +
|
const insertStateDataSQL = "" +
|
||||||
"INSERT INTO roomserver_state_block (state_block_nid, event_type_nid, event_state_key_nid, event_nid)" +
|
"INSERT INTO roomserver_state_block (state_block_hash, event_nids)" +
|
||||||
" VALUES ($1, $2, $3, $4)"
|
" VALUES ($1, $2)" +
|
||||||
|
" ON CONFLICT (state_block_hash) DO UPDATE SET event_nids=$2" +
|
||||||
|
" RETURNING state_block_nid"
|
||||||
|
|
||||||
const selectNextStateBlockNIDSQL = "" +
|
|
||||||
"SELECT nextval('roomserver_state_block_nid_seq')"
|
|
||||||
|
|
||||||
// Bulk state lookup by numeric state block ID.
|
|
||||||
// Sort by the state_block_nid, event_type_nid, event_state_key_nid
|
|
||||||
// This means that all the entries for a given state_block_nid will appear
|
|
||||||
// together in the list and those entries will sorted by event_type_nid
|
|
||||||
// and event_state_key_nid. This property makes it easier to merge two
|
|
||||||
// state data blocks together.
|
|
||||||
const bulkSelectStateBlockEntriesSQL = "" +
|
const bulkSelectStateBlockEntriesSQL = "" +
|
||||||
"SELECT state_block_nid, event_type_nid, event_state_key_nid, event_nid" +
|
"SELECT state_block_nid, event_nids" +
|
||||||
" FROM roomserver_state_block WHERE state_block_nid = ANY($1)" +
|
" FROM roomserver_state_block WHERE state_block_nid = ANY($1)"
|
||||||
" ORDER BY state_block_nid, event_type_nid, event_state_key_nid"
|
|
||||||
|
|
||||||
// Bulk state lookup by numeric state block ID.
|
|
||||||
// Filters the rows in each block to the requested types and state keys.
|
|
||||||
// We would like to restrict to particular type state key pairs but we are
|
|
||||||
// restricted by the query language to pull the cross product of a list
|
|
||||||
// of types and a list state_keys. So we have to filter the result in the
|
|
||||||
// application to restrict it to the list of event types and state keys we
|
|
||||||
// actually wanted.
|
|
||||||
const bulkSelectFilteredStateBlockEntriesSQL = "" +
|
|
||||||
"SELECT state_block_nid, event_type_nid, event_state_key_nid, event_nid" +
|
|
||||||
" FROM roomserver_state_block WHERE state_block_nid = ANY($1)" +
|
|
||||||
" AND event_type_nid = ANY($2) AND event_state_key_nid = ANY($3)" +
|
|
||||||
" ORDER BY state_block_nid, event_type_nid, event_state_key_nid"
|
|
||||||
|
|
||||||
type stateBlockStatements struct {
|
type stateBlockStatements struct {
|
||||||
insertStateDataStmt *sql.Stmt
|
insertStateDataStmt *sql.Stmt
|
||||||
selectNextStateBlockNIDStmt *sql.Stmt
|
bulkSelectStateBlockEntriesStmt *sql.Stmt
|
||||||
bulkSelectStateBlockEntriesStmt *sql.Stmt
|
|
||||||
bulkSelectFilteredStateBlockEntriesStmt *sql.Stmt
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewPostgresStateBlockTable(db *sql.DB) (tables.StateBlock, error) {
|
func createStateBlockTable(db *sql.DB) error {
|
||||||
s := &stateBlockStatements{}
|
|
||||||
_, err := db.Exec(stateDataSchema)
|
_, err := db.Exec(stateDataSchema)
|
||||||
if err != nil {
|
return err
|
||||||
return nil, err
|
}
|
||||||
}
|
|
||||||
|
func prepareStateBlockTable(db *sql.DB) (tables.StateBlock, error) {
|
||||||
|
s := &stateBlockStatements{}
|
||||||
|
|
||||||
return s, shared.StatementList{
|
return s, shared.StatementList{
|
||||||
{&s.insertStateDataStmt, insertStateDataSQL},
|
{&s.insertStateDataStmt, insertStateDataSQL},
|
||||||
{&s.selectNextStateBlockNIDStmt, selectNextStateBlockNIDSQL},
|
|
||||||
{&s.bulkSelectStateBlockEntriesStmt, bulkSelectStateBlockEntriesSQL},
|
{&s.bulkSelectStateBlockEntriesStmt, bulkSelectStateBlockEntriesSQL},
|
||||||
{&s.bulkSelectFilteredStateBlockEntriesStmt, bulkSelectFilteredStateBlockEntriesSQL},
|
|
||||||
}.Prepare(db)
|
}.Prepare(db)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *stateBlockStatements) BulkInsertStateData(
|
func (s *stateBlockStatements) BulkInsertStateData(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
txn *sql.Tx,
|
txn *sql.Tx,
|
||||||
entries []types.StateEntry,
|
entries types.StateEntries,
|
||||||
) (types.StateBlockNID, error) {
|
) (id types.StateBlockNID, err error) {
|
||||||
stateBlockNID, err := s.selectNextStateBlockNID(ctx)
|
entries = entries[:util.SortAndUnique(entries)]
|
||||||
if err != nil {
|
var nids types.EventNIDs
|
||||||
return 0, err
|
for _, e := range entries {
|
||||||
|
nids = append(nids, e.EventNID)
|
||||||
}
|
}
|
||||||
for _, entry := range entries {
|
err = s.insertStateDataStmt.QueryRowContext(
|
||||||
_, err := s.insertStateDataStmt.ExecContext(
|
ctx, nids.Hash(), eventNIDsAsArray(nids),
|
||||||
ctx,
|
).Scan(&id)
|
||||||
int64(stateBlockNID),
|
return
|
||||||
int64(entry.EventTypeNID),
|
|
||||||
int64(entry.EventStateKeyNID),
|
|
||||||
int64(entry.EventNID),
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return stateBlockNID, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *stateBlockStatements) selectNextStateBlockNID(
|
|
||||||
ctx context.Context,
|
|
||||||
) (types.StateBlockNID, error) {
|
|
||||||
var stateBlockNID int64
|
|
||||||
err := s.selectNextStateBlockNIDStmt.QueryRowContext(ctx).Scan(&stateBlockNID)
|
|
||||||
return types.StateBlockNID(stateBlockNID), err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *stateBlockStatements) BulkSelectStateBlockEntries(
|
func (s *stateBlockStatements) BulkSelectStateBlockEntries(
|
||||||
ctx context.Context, stateBlockNIDs []types.StateBlockNID,
|
ctx context.Context, stateBlockNIDs types.StateBlockNIDs,
|
||||||
) ([]types.StateEntryList, error) {
|
) ([][]types.EventNID, error) {
|
||||||
nids := make([]int64, len(stateBlockNIDs))
|
rows, err := s.bulkSelectStateBlockEntriesStmt.QueryContext(ctx, stateBlockNIDsAsArray(stateBlockNIDs))
|
||||||
for i := range stateBlockNIDs {
|
|
||||||
nids[i] = int64(stateBlockNIDs[i])
|
|
||||||
}
|
|
||||||
rows, err := s.bulkSelectStateBlockEntriesStmt.QueryContext(ctx, pq.Int64Array(nids))
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
defer internal.CloseAndLogIfError(ctx, rows, "bulkSelectStateBlockEntries: rows.close() failed")
|
defer internal.CloseAndLogIfError(ctx, rows, "bulkSelectStateBlockEntries: rows.close() failed")
|
||||||
|
|
||||||
results := make([]types.StateEntryList, len(stateBlockNIDs))
|
results := make([][]types.EventNID, len(stateBlockNIDs))
|
||||||
// current is a pointer to the StateEntryList to append the state entries to.
|
|
||||||
var current *types.StateEntryList
|
|
||||||
i := 0
|
i := 0
|
||||||
for rows.Next() {
|
for ; rows.Next(); i++ {
|
||||||
var (
|
var stateBlockNID types.StateBlockNID
|
||||||
stateBlockNID int64
|
var result pq.Int64Array
|
||||||
eventTypeNID int64
|
if err = rows.Scan(&stateBlockNID, &result); err != nil {
|
||||||
eventStateKeyNID int64
|
|
||||||
eventNID int64
|
|
||||||
entry types.StateEntry
|
|
||||||
)
|
|
||||||
if err = rows.Scan(
|
|
||||||
&stateBlockNID, &eventTypeNID, &eventStateKeyNID, &eventNID,
|
|
||||||
); err != nil {
|
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
entry.EventTypeNID = types.EventTypeNID(eventTypeNID)
|
r := []types.EventNID{}
|
||||||
entry.EventStateKeyNID = types.EventStateKeyNID(eventStateKeyNID)
|
for _, e := range result {
|
||||||
entry.EventNID = types.EventNID(eventNID)
|
r = append(r, types.EventNID(e))
|
||||||
if current == nil || types.StateBlockNID(stateBlockNID) != current.StateBlockNID {
|
|
||||||
// The state entry row is for a different state data block to the current one.
|
|
||||||
// So we start appending to the next entry in the list.
|
|
||||||
current = &results[i]
|
|
||||||
current.StateBlockNID = types.StateBlockNID(stateBlockNID)
|
|
||||||
i++
|
|
||||||
}
|
}
|
||||||
current.StateEntries = append(current.StateEntries, entry)
|
results[i] = r
|
||||||
}
|
}
|
||||||
if err = rows.Err(); err != nil {
|
if err = rows.Err(); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -186,71 +133,6 @@ func (s *stateBlockStatements) BulkSelectStateBlockEntries(
|
||||||
return results, err
|
return results, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *stateBlockStatements) BulkSelectFilteredStateBlockEntries(
|
|
||||||
ctx context.Context,
|
|
||||||
stateBlockNIDs []types.StateBlockNID,
|
|
||||||
stateKeyTuples []types.StateKeyTuple,
|
|
||||||
) ([]types.StateEntryList, error) {
|
|
||||||
tuples := stateKeyTupleSorter(stateKeyTuples)
|
|
||||||
// Sort the tuples so that we can run binary search against them as we filter the rows returned by the db.
|
|
||||||
sort.Sort(tuples)
|
|
||||||
|
|
||||||
eventTypeNIDArray, eventStateKeyNIDArray := tuples.typesAndStateKeysAsArrays()
|
|
||||||
rows, err := s.bulkSelectFilteredStateBlockEntriesStmt.QueryContext(
|
|
||||||
ctx,
|
|
||||||
stateBlockNIDsAsArray(stateBlockNIDs),
|
|
||||||
eventTypeNIDArray,
|
|
||||||
eventStateKeyNIDArray,
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
defer internal.CloseAndLogIfError(ctx, rows, "bulkSelectFilteredStateBlockEntries: rows.close() failed")
|
|
||||||
|
|
||||||
var results []types.StateEntryList
|
|
||||||
var current types.StateEntryList
|
|
||||||
for rows.Next() {
|
|
||||||
var (
|
|
||||||
stateBlockNID int64
|
|
||||||
eventTypeNID int64
|
|
||||||
eventStateKeyNID int64
|
|
||||||
eventNID int64
|
|
||||||
entry types.StateEntry
|
|
||||||
)
|
|
||||||
if err := rows.Scan(
|
|
||||||
&stateBlockNID, &eventTypeNID, &eventStateKeyNID, &eventNID,
|
|
||||||
); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
entry.EventTypeNID = types.EventTypeNID(eventTypeNID)
|
|
||||||
entry.EventStateKeyNID = types.EventStateKeyNID(eventStateKeyNID)
|
|
||||||
entry.EventNID = types.EventNID(eventNID)
|
|
||||||
|
|
||||||
// We can use binary search here because we sorted the tuples earlier
|
|
||||||
if !tuples.contains(entry.StateKeyTuple) {
|
|
||||||
// The select will return the cross product of types and state keys.
|
|
||||||
// So we need to check if type of the entry is in the list.
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
if types.StateBlockNID(stateBlockNID) != current.StateBlockNID {
|
|
||||||
// The state entry row is for a different state data block to the current one.
|
|
||||||
// So we append the current entry to the results and start adding to a new one.
|
|
||||||
// The first time through the loop current will be empty.
|
|
||||||
if current.StateEntries != nil {
|
|
||||||
results = append(results, current)
|
|
||||||
}
|
|
||||||
current = types.StateEntryList{StateBlockNID: types.StateBlockNID(stateBlockNID)}
|
|
||||||
}
|
|
||||||
current.StateEntries = append(current.StateEntries, entry)
|
|
||||||
}
|
|
||||||
// Add the last entry to the list if it is not empty.
|
|
||||||
if current.StateEntries != nil {
|
|
||||||
results = append(results, current)
|
|
||||||
}
|
|
||||||
return results, rows.Err()
|
|
||||||
}
|
|
||||||
|
|
||||||
func stateBlockNIDsAsArray(stateBlockNIDs []types.StateBlockNID) pq.Int64Array {
|
func stateBlockNIDsAsArray(stateBlockNIDs []types.StateBlockNID) pq.Int64Array {
|
||||||
nids := make([]int64, len(stateBlockNIDs))
|
nids := make([]int64, len(stateBlockNIDs))
|
||||||
for i := range stateBlockNIDs {
|
for i := range stateBlockNIDs {
|
||||||
|
|
|
@ -25,6 +25,7 @@ import (
|
||||||
"github.com/matrix-org/dendrite/roomserver/storage/shared"
|
"github.com/matrix-org/dendrite/roomserver/storage/shared"
|
||||||
"github.com/matrix-org/dendrite/roomserver/storage/tables"
|
"github.com/matrix-org/dendrite/roomserver/storage/tables"
|
||||||
"github.com/matrix-org/dendrite/roomserver/types"
|
"github.com/matrix-org/dendrite/roomserver/types"
|
||||||
|
"github.com/matrix-org/util"
|
||||||
)
|
)
|
||||||
|
|
||||||
const stateSnapshotSchema = `
|
const stateSnapshotSchema = `
|
||||||
|
@ -40,19 +41,29 @@ const stateSnapshotSchema = `
|
||||||
-- the full state under single state_block_nid.
|
-- the full state under single state_block_nid.
|
||||||
CREATE SEQUENCE IF NOT EXISTS roomserver_state_snapshot_nid_seq;
|
CREATE SEQUENCE IF NOT EXISTS roomserver_state_snapshot_nid_seq;
|
||||||
CREATE TABLE IF NOT EXISTS roomserver_state_snapshots (
|
CREATE TABLE IF NOT EXISTS roomserver_state_snapshots (
|
||||||
-- Local numeric ID for the state.
|
-- The state snapshot NID that identifies this snapshot.
|
||||||
state_snapshot_nid bigint PRIMARY KEY DEFAULT nextval('roomserver_state_snapshot_nid_seq'),
|
state_snapshot_nid bigint PRIMARY KEY DEFAULT nextval('roomserver_state_snapshot_nid_seq'),
|
||||||
-- Local numeric ID of the room this state is for.
|
-- The hash of the state snapshot, which is used to enforce uniqueness. The hash is
|
||||||
-- Unused in normal operation, but useful for background work or ad-hoc debugging.
|
-- generated in Dendrite and passed through to the database, as a btree index over
|
||||||
room_nid bigint NOT NULL,
|
-- this column is cheap and fits within the maximum index size.
|
||||||
-- List of state_block_nids, stored sorted by state_block_nid.
|
state_snapshot_hash BYTEA UNIQUE,
|
||||||
state_block_nids bigint[] NOT NULL
|
-- The room NID that the snapshot belongs to.
|
||||||
|
room_nid bigint NOT NULL,
|
||||||
|
-- The state blocks contained within this snapshot.
|
||||||
|
state_block_nids bigint[] NOT NULL
|
||||||
);
|
);
|
||||||
`
|
`
|
||||||
|
|
||||||
|
// Insert a new state snapshot. If we conflict on the hash column then
|
||||||
|
// we must perform an update so that the RETURNING statement returns the
|
||||||
|
// ID of the row that we conflicted with, so that we can then refer to
|
||||||
|
// the original snapshot.
|
||||||
const insertStateSQL = "" +
|
const insertStateSQL = "" +
|
||||||
"INSERT INTO roomserver_state_snapshots (room_nid, state_block_nids)" +
|
"INSERT INTO roomserver_state_snapshots (state_snapshot_hash, room_nid, state_block_nids)" +
|
||||||
" VALUES ($1, $2)" +
|
" VALUES ($1, $2, $3)" +
|
||||||
|
" ON CONFLICT (state_snapshot_hash) DO UPDATE SET room_nid=$2" +
|
||||||
|
// Performing an update, above, ensures that the RETURNING statement
|
||||||
|
// below will always return a valid state snapshot ID
|
||||||
" RETURNING state_snapshot_nid"
|
" RETURNING state_snapshot_nid"
|
||||||
|
|
||||||
// Bulk state data NID lookup.
|
// Bulk state data NID lookup.
|
||||||
|
@ -67,12 +78,13 @@ type stateSnapshotStatements struct {
|
||||||
bulkSelectStateBlockNIDsStmt *sql.Stmt
|
bulkSelectStateBlockNIDsStmt *sql.Stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewPostgresStateSnapshotTable(db *sql.DB) (tables.StateSnapshot, error) {
|
func createStateSnapshotTable(db *sql.DB) error {
|
||||||
s := &stateSnapshotStatements{}
|
|
||||||
_, err := db.Exec(stateSnapshotSchema)
|
_, err := db.Exec(stateSnapshotSchema)
|
||||||
if err != nil {
|
return err
|
||||||
return nil, err
|
}
|
||||||
}
|
|
||||||
|
func prepareStateSnapshotTable(db *sql.DB) (tables.StateSnapshot, error) {
|
||||||
|
s := &stateSnapshotStatements{}
|
||||||
|
|
||||||
return s, shared.StatementList{
|
return s, shared.StatementList{
|
||||||
{&s.insertStateStmt, insertStateSQL},
|
{&s.insertStateStmt, insertStateSQL},
|
||||||
|
@ -81,13 +93,15 @@ func NewPostgresStateSnapshotTable(db *sql.DB) (tables.StateSnapshot, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *stateSnapshotStatements) InsertState(
|
func (s *stateSnapshotStatements) InsertState(
|
||||||
ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, stateBlockNIDs []types.StateBlockNID,
|
ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, nids types.StateBlockNIDs,
|
||||||
) (stateNID types.StateSnapshotNID, err error) {
|
) (stateNID types.StateSnapshotNID, err error) {
|
||||||
nids := make([]int64, len(stateBlockNIDs))
|
nids = nids[:util.SortAndUnique(nids)]
|
||||||
for i := range stateBlockNIDs {
|
var id int64
|
||||||
nids[i] = int64(stateBlockNIDs[i])
|
err = sqlutil.TxStmt(txn, s.insertStateStmt).QueryRowContext(ctx, nids.Hash(), int64(roomNID), stateBlockNIDsAsArray(nids)).Scan(&id)
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
}
|
}
|
||||||
err = sqlutil.TxStmt(txn, s.insertStateStmt).QueryRowContext(ctx, int64(roomNID), pq.Int64Array(nids)).Scan(&stateNID)
|
stateNID = types.StateSnapshotNID(id)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -17,6 +17,7 @@ package postgres
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"database/sql"
|
"database/sql"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
// Import the postgres database driver.
|
// Import the postgres database driver.
|
||||||
_ "github.com/lib/pq"
|
_ "github.com/lib/pq"
|
||||||
|
@ -39,20 +40,25 @@ func Open(dbProperties *config.DatabaseOptions, cache caching.RoomServerCaches)
|
||||||
var db *sql.DB
|
var db *sql.DB
|
||||||
var err error
|
var err error
|
||||||
if db, err = sqlutil.Open(dbProperties); err != nil {
|
if db, err = sqlutil.Open(dbProperties); err != nil {
|
||||||
|
return nil, fmt.Errorf("sqlutil.Open: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create the tables.
|
||||||
|
if err := d.create(db); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create tables before executing migrations so we don't fail if the table is missing,
|
// Then execute the migrations. By this point the tables are created with the latest
|
||||||
// and THEN prepare statements so we don't fail due to referencing new columns
|
// schemas.
|
||||||
ms := membershipStatements{}
|
|
||||||
if err := ms.execSchema(db); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
m := sqlutil.NewMigrations()
|
m := sqlutil.NewMigrations()
|
||||||
deltas.LoadAddForgottenColumn(m)
|
deltas.LoadAddForgottenColumn(m)
|
||||||
|
deltas.LoadStateBlocksRefactor(m)
|
||||||
if err := m.RunDeltas(db, dbProperties); err != nil {
|
if err := m.RunDeltas(db, dbProperties); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Then prepare the statements. Now that the migrations have run, any columns referred
|
||||||
|
// to in the database code should now exist.
|
||||||
if err := d.prepare(db, cache); err != nil {
|
if err := d.prepare(db, cache); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -60,61 +66,107 @@ func Open(dbProperties *config.DatabaseOptions, cache caching.RoomServerCaches)
|
||||||
return &d, nil
|
return &d, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// nolint: gocyclo
|
func (d *Database) create(db *sql.DB) error {
|
||||||
func (d *Database) prepare(db *sql.DB, cache caching.RoomServerCaches) (err error) {
|
if err := createEventStateKeysTable(db); err != nil {
|
||||||
eventStateKeys, err := NewPostgresEventStateKeysTable(db)
|
return err
|
||||||
|
}
|
||||||
|
if err := createEventTypesTable(db); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := createEventJSONTable(db); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := createEventsTable(db); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := createRoomsTable(db); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := createTransactionsTable(db); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := createStateBlockTable(db); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := createStateSnapshotTable(db); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := createPrevEventsTable(db); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := createRoomAliasesTable(db); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := createInvitesTable(db); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := createMembershipTable(db); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := createPublishedTable(db); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := createRedactionsTable(db); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *Database) prepare(db *sql.DB, cache caching.RoomServerCaches) error {
|
||||||
|
eventStateKeys, err := prepareEventStateKeysTable(db)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
eventTypes, err := NewPostgresEventTypesTable(db)
|
eventTypes, err := prepareEventTypesTable(db)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
eventJSON, err := NewPostgresEventJSONTable(db)
|
eventJSON, err := prepareEventJSONTable(db)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
events, err := NewPostgresEventsTable(db)
|
events, err := prepareEventsTable(db)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
rooms, err := NewPostgresRoomsTable(db)
|
rooms, err := prepareRoomsTable(db)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
transactions, err := NewPostgresTransactionsTable(db)
|
transactions, err := prepareTransactionsTable(db)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
stateBlock, err := NewPostgresStateBlockTable(db)
|
stateBlock, err := prepareStateBlockTable(db)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
stateSnapshot, err := NewPostgresStateSnapshotTable(db)
|
stateSnapshot, err := prepareStateSnapshotTable(db)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
roomAliases, err := NewPostgresRoomAliasesTable(db)
|
prevEvents, err := preparePrevEventsTable(db)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
prevEvents, err := NewPostgresPreviousEventsTable(db)
|
roomAliases, err := prepareRoomAliasesTable(db)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
invites, err := NewPostgresInvitesTable(db)
|
invites, err := prepareInvitesTable(db)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
membership, err := NewPostgresMembershipTable(db)
|
membership, err := prepareMembershipTable(db)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
published, err := NewPostgresPublishedTable(db)
|
published, err := preparePublishedTable(db)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
redactions, err := NewPostgresRedactionsTable(db)
|
redactions, err := prepareRedactionsTable(db)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -54,12 +54,13 @@ type transactionStatements struct {
|
||||||
selectTransactionEventIDStmt *sql.Stmt
|
selectTransactionEventIDStmt *sql.Stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewPostgresTransactionsTable(db *sql.DB) (tables.Transactions, error) {
|
func createTransactionsTable(db *sql.DB) error {
|
||||||
s := &transactionStatements{}
|
|
||||||
_, err := db.Exec(transactionsSchema)
|
_, err := db.Exec(transactionsSchema)
|
||||||
if err != nil {
|
return err
|
||||||
return nil, err
|
}
|
||||||
}
|
|
||||||
|
func prepareTransactionsTable(db *sql.DB) (tables.Transactions, error) {
|
||||||
|
s := &transactionStatements{}
|
||||||
|
|
||||||
return s, shared.StatementList{
|
return s, shared.StatementList{
|
||||||
{&s.insertTransactionStmt, insertTransactionSQL},
|
{&s.insertTransactionStmt, insertTransactionSQL},
|
||||||
|
|
|
@ -118,9 +118,24 @@ func (d *Database) StateEntriesForTuples(
|
||||||
stateBlockNIDs []types.StateBlockNID,
|
stateBlockNIDs []types.StateBlockNID,
|
||||||
stateKeyTuples []types.StateKeyTuple,
|
stateKeyTuples []types.StateKeyTuple,
|
||||||
) ([]types.StateEntryList, error) {
|
) ([]types.StateEntryList, error) {
|
||||||
return d.StateBlockTable.BulkSelectFilteredStateBlockEntries(
|
entries, err := d.StateBlockTable.BulkSelectStateBlockEntries(
|
||||||
ctx, stateBlockNIDs, stateKeyTuples,
|
ctx, stateBlockNIDs,
|
||||||
)
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("d.StateBlockTable.BulkSelectStateBlockEntries: %w", err)
|
||||||
|
}
|
||||||
|
lists := []types.StateEntryList{}
|
||||||
|
for i, entry := range entries {
|
||||||
|
entries, err := d.EventsTable.BulkSelectStateEventByNID(ctx, entry, stateKeyTuples)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("d.EventsTable.BulkSelectStateEventByNID: %w", err)
|
||||||
|
}
|
||||||
|
lists = append(lists, types.StateEntryList{
|
||||||
|
StateBlockNID: stateBlockNIDs[i],
|
||||||
|
StateEntries: entries,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
return lists, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *Database) RoomInfo(ctx context.Context, roomID string) (*types.RoomInfo, error) {
|
func (d *Database) RoomInfo(ctx context.Context, roomID string) (*types.RoomInfo, error) {
|
||||||
|
@ -141,8 +156,28 @@ func (d *Database) AddState(
|
||||||
stateBlockNIDs []types.StateBlockNID,
|
stateBlockNIDs []types.StateBlockNID,
|
||||||
state []types.StateEntry,
|
state []types.StateEntry,
|
||||||
) (stateNID types.StateSnapshotNID, err error) {
|
) (stateNID types.StateSnapshotNID, err error) {
|
||||||
|
if len(stateBlockNIDs) > 0 {
|
||||||
|
// Check to see if the event already appears in any of the existing state
|
||||||
|
// blocks. If it does then we should not add it again, as this will just
|
||||||
|
// result in excess state blocks and snapshots.
|
||||||
|
// TODO: Investigate why this is happening - probably input_events.go!
|
||||||
|
blocks, berr := d.StateBlockTable.BulkSelectStateBlockEntries(ctx, stateBlockNIDs)
|
||||||
|
if berr != nil {
|
||||||
|
return 0, fmt.Errorf("d.StateBlockTable.BulkSelectStateBlockEntries: %w", berr)
|
||||||
|
}
|
||||||
|
for i := len(state) - 1; i >= 0; i-- {
|
||||||
|
for _, events := range blocks {
|
||||||
|
for _, event := range events {
|
||||||
|
if state[i].EventNID == event {
|
||||||
|
state = append(state[:i], state[i+1:]...)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
|
err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
|
||||||
if len(state) > 0 {
|
if len(state) > 0 {
|
||||||
|
// If there's any state left to add then let's add new blocks.
|
||||||
var stateBlockNID types.StateBlockNID
|
var stateBlockNID types.StateBlockNID
|
||||||
stateBlockNID, err = d.StateBlockTable.BulkInsertStateData(ctx, txn, state)
|
stateBlockNID, err = d.StateBlockTable.BulkInsertStateData(ctx, txn, state)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -237,7 +272,24 @@ func (d *Database) StateBlockNIDs(
|
||||||
func (d *Database) StateEntries(
|
func (d *Database) StateEntries(
|
||||||
ctx context.Context, stateBlockNIDs []types.StateBlockNID,
|
ctx context.Context, stateBlockNIDs []types.StateBlockNID,
|
||||||
) ([]types.StateEntryList, error) {
|
) ([]types.StateEntryList, error) {
|
||||||
return d.StateBlockTable.BulkSelectStateBlockEntries(ctx, stateBlockNIDs)
|
entries, err := d.StateBlockTable.BulkSelectStateBlockEntries(
|
||||||
|
ctx, stateBlockNIDs,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("d.StateBlockTable.BulkSelectStateBlockEntries: %w", err)
|
||||||
|
}
|
||||||
|
lists := make([]types.StateEntryList, 0, len(entries))
|
||||||
|
for i, entry := range entries {
|
||||||
|
eventNIDs, err := d.EventsTable.BulkSelectStateEventByNID(ctx, entry, nil)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("d.EventsTable.BulkSelectStateEventByNID: %w", err)
|
||||||
|
}
|
||||||
|
lists = append(lists, types.StateEntryList{
|
||||||
|
StateBlockNID: stateBlockNIDs[i],
|
||||||
|
StateEntries: eventNIDs,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
return lists, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *Database) SetRoomAlias(ctx context.Context, alias string, roomID string, creatorUserID string) error {
|
func (d *Database) SetRoomAlias(ctx context.Context, alias string, roomID string, creatorUserID string) error {
|
||||||
|
|
|
@ -24,6 +24,7 @@ import (
|
||||||
|
|
||||||
func LoadFromGoose() {
|
func LoadFromGoose() {
|
||||||
goose.AddMigration(UpAddForgottenColumn, DownAddForgottenColumn)
|
goose.AddMigration(UpAddForgottenColumn, DownAddForgottenColumn)
|
||||||
|
goose.AddMigration(UpStateBlocksRefactor, DownStateBlocksRefactor)
|
||||||
}
|
}
|
||||||
|
|
||||||
func LoadAddForgottenColumn(m *sqlutil.Migrations) {
|
func LoadAddForgottenColumn(m *sqlutil.Migrations) {
|
||||||
|
|
|
@ -0,0 +1,168 @@
|
||||||
|
// Copyright 2020 The Matrix.org Foundation C.I.C.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package deltas
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"database/sql"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/matrix-org/dendrite/internal"
|
||||||
|
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||||
|
"github.com/matrix-org/dendrite/roomserver/types"
|
||||||
|
"github.com/matrix-org/util"
|
||||||
|
"github.com/sirupsen/logrus"
|
||||||
|
)
|
||||||
|
|
||||||
|
func LoadStateBlocksRefactor(m *sqlutil.Migrations) {
|
||||||
|
m.AddMigration(UpStateBlocksRefactor, DownStateBlocksRefactor)
|
||||||
|
}
|
||||||
|
|
||||||
|
func UpStateBlocksRefactor(tx *sql.Tx) error {
|
||||||
|
logrus.Warn("Performing state storage upgrade. Please wait, this may take some time!")
|
||||||
|
defer logrus.Warn("State storage upgrade complete")
|
||||||
|
|
||||||
|
var maxsnapshotid int
|
||||||
|
var maxblockid int
|
||||||
|
if err := tx.QueryRow(`SELECT IFNULL(MAX(state_snapshot_nid),0) FROM roomserver_state_snapshots;`).Scan(&maxsnapshotid); err != nil {
|
||||||
|
return fmt.Errorf("tx.QueryRow.Scan (count snapshots): %w", err)
|
||||||
|
}
|
||||||
|
if err := tx.QueryRow(`SELECT IFNULL(MAX(state_block_nid),0) FROM roomserver_state_block;`).Scan(&maxblockid); err != nil {
|
||||||
|
return fmt.Errorf("tx.QueryRow.Scan (count snapshots): %w", err)
|
||||||
|
}
|
||||||
|
maxsnapshotid++
|
||||||
|
maxblockid++
|
||||||
|
|
||||||
|
if _, err := tx.Exec(`ALTER TABLE roomserver_state_block RENAME TO _roomserver_state_block;`); err != nil {
|
||||||
|
return fmt.Errorf("tx.Exec: %w", err)
|
||||||
|
}
|
||||||
|
if _, err := tx.Exec(`ALTER TABLE roomserver_state_snapshots RENAME TO _roomserver_state_snapshots;`); err != nil {
|
||||||
|
return fmt.Errorf("tx.Exec: %w", err)
|
||||||
|
}
|
||||||
|
_, err := tx.Exec(`
|
||||||
|
CREATE TABLE IF NOT EXISTS roomserver_state_block (
|
||||||
|
state_block_nid INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||||
|
state_block_hash BLOB UNIQUE,
|
||||||
|
event_nids TEXT NOT NULL DEFAULT '[]'
|
||||||
|
);
|
||||||
|
`)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("tx.Exec: %w", err)
|
||||||
|
}
|
||||||
|
_, err = tx.Exec(`
|
||||||
|
CREATE TABLE IF NOT EXISTS roomserver_state_snapshots (
|
||||||
|
state_snapshot_nid INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||||
|
state_snapshot_hash BLOB UNIQUE,
|
||||||
|
room_nid INTEGER NOT NULL,
|
||||||
|
state_block_nids TEXT NOT NULL DEFAULT '[]'
|
||||||
|
);
|
||||||
|
`)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("tx.Exec: %w", err)
|
||||||
|
}
|
||||||
|
snapshotrows, err := tx.Query(`SELECT state_snapshot_nid, room_nid, state_block_nids FROM _roomserver_state_snapshots;`)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("tx.Query: %w", err)
|
||||||
|
}
|
||||||
|
defer internal.CloseAndLogIfError(context.TODO(), snapshotrows, "rows.close() failed")
|
||||||
|
for snapshotrows.Next() {
|
||||||
|
var snapshot types.StateSnapshotNID
|
||||||
|
var room types.RoomNID
|
||||||
|
var jsonblocks string
|
||||||
|
var blocks []types.StateBlockNID
|
||||||
|
if err = snapshotrows.Scan(&snapshot, &room, &jsonblocks); err != nil {
|
||||||
|
return fmt.Errorf("rows.Scan: %w", err)
|
||||||
|
}
|
||||||
|
if err = json.Unmarshal([]byte(jsonblocks), &blocks); err != nil {
|
||||||
|
return fmt.Errorf("json.Unmarshal: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var newblocks types.StateBlockNIDs
|
||||||
|
for _, block := range blocks {
|
||||||
|
if err = func() error {
|
||||||
|
blockrows, berr := tx.Query(`SELECT event_nid FROM _roomserver_state_block WHERE state_block_nid = $1`, block)
|
||||||
|
if berr != nil {
|
||||||
|
return fmt.Errorf("tx.Query (event nids from old block): %w", berr)
|
||||||
|
}
|
||||||
|
defer internal.CloseAndLogIfError(context.TODO(), blockrows, "rows.close() failed")
|
||||||
|
events := types.EventNIDs{}
|
||||||
|
for blockrows.Next() {
|
||||||
|
var event types.EventNID
|
||||||
|
if err = blockrows.Scan(&event); err != nil {
|
||||||
|
return fmt.Errorf("rows.Scan: %w", err)
|
||||||
|
}
|
||||||
|
events = append(events, event)
|
||||||
|
}
|
||||||
|
events = events[:util.SortAndUnique(events)]
|
||||||
|
eventjson, eerr := json.Marshal(events)
|
||||||
|
if eerr != nil {
|
||||||
|
return fmt.Errorf("json.Marshal: %w", eerr)
|
||||||
|
}
|
||||||
|
|
||||||
|
var blocknid types.StateBlockNID
|
||||||
|
err = tx.QueryRow(`
|
||||||
|
INSERT INTO roomserver_state_block (state_block_nid, state_block_hash, event_nids)
|
||||||
|
VALUES ($1, $2, $3)
|
||||||
|
ON CONFLICT (state_block_hash) DO UPDATE SET event_nids=$3
|
||||||
|
RETURNING state_block_nid
|
||||||
|
`, maxblockid, events.Hash(), eventjson).Scan(&blocknid)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("tx.QueryRow.Scan (insert new block): %w", err)
|
||||||
|
}
|
||||||
|
maxblockid++
|
||||||
|
newblocks = append(newblocks, blocknid)
|
||||||
|
return nil
|
||||||
|
}(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
newblocksjson, jerr := json.Marshal(newblocks)
|
||||||
|
if jerr != nil {
|
||||||
|
return fmt.Errorf("json.Marshal (new blocks): %w", jerr)
|
||||||
|
}
|
||||||
|
var newsnapshot types.StateSnapshotNID
|
||||||
|
err = tx.QueryRow(`
|
||||||
|
INSERT INTO roomserver_state_snapshots (state_snapshot_nid, state_snapshot_hash, room_nid, state_block_nids)
|
||||||
|
VALUES ($1, $2, $3, $4)
|
||||||
|
ON CONFLICT (state_snapshot_hash) DO UPDATE SET room_nid=$3
|
||||||
|
RETURNING state_snapshot_nid
|
||||||
|
`, maxsnapshotid, newblocks.Hash(), room, newblocksjson).Scan(&newsnapshot)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("tx.QueryRow.Scan (insert new snapshot): %w", err)
|
||||||
|
}
|
||||||
|
maxsnapshotid++
|
||||||
|
if _, err = tx.Exec(`UPDATE roomserver_events SET state_snapshot_nid=$1 WHERE state_snapshot_nid=$2 AND state_snapshot_nid<$2`, newsnapshot, snapshot, maxsnapshotid); err != nil {
|
||||||
|
return fmt.Errorf("tx.Exec (update events): %w", err)
|
||||||
|
}
|
||||||
|
if _, err = tx.Exec(`UPDATE roomserver_rooms SET state_snapshot_nid=$1 WHERE state_snapshot_nid=$2 AND state_snapshot_nid<$2`, newsnapshot, snapshot, maxsnapshotid); err != nil {
|
||||||
|
return fmt.Errorf("tx.Exec (update rooms): %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err = tx.Exec(`DROP TABLE _roomserver_state_snapshots;`); err != nil {
|
||||||
|
return fmt.Errorf("tx.Exec (delete old snapshot table): %w", err)
|
||||||
|
}
|
||||||
|
if _, err = tx.Exec(`DROP TABLE _roomserver_state_block;`); err != nil {
|
||||||
|
return fmt.Errorf("tx.Exec (delete old block table): %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func DownStateBlocksRefactor(tx *sql.Tx) error {
|
||||||
|
panic("Downgrading state storage is not supported")
|
||||||
|
}
|
|
@ -53,14 +53,16 @@ type eventJSONStatements struct {
|
||||||
bulkSelectEventJSONStmt *sql.Stmt
|
bulkSelectEventJSONStmt *sql.Stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewSqliteEventJSONTable(db *sql.DB) (tables.EventJSON, error) {
|
func createEventJSONTable(db *sql.DB) error {
|
||||||
|
_, err := db.Exec(eventJSONSchema)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func prepareEventJSONTable(db *sql.DB) (tables.EventJSON, error) {
|
||||||
s := &eventJSONStatements{
|
s := &eventJSONStatements{
|
||||||
db: db,
|
db: db,
|
||||||
}
|
}
|
||||||
_, err := db.Exec(eventJSONSchema)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return s, shared.StatementList{
|
return s, shared.StatementList{
|
||||||
{&s.insertEventJSONStmt, insertEventJSONSQL},
|
{&s.insertEventJSONStmt, insertEventJSONSQL},
|
||||||
{&s.bulkSelectEventJSONStmt, bulkSelectEventJSONSQL},
|
{&s.bulkSelectEventJSONStmt, bulkSelectEventJSONSQL},
|
||||||
|
|
|
@ -70,14 +70,16 @@ type eventStateKeyStatements struct {
|
||||||
bulkSelectEventStateKeyStmt *sql.Stmt
|
bulkSelectEventStateKeyStmt *sql.Stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewSqliteEventStateKeysTable(db *sql.DB) (tables.EventStateKeys, error) {
|
func createEventStateKeysTable(db *sql.DB) error {
|
||||||
|
_, err := db.Exec(eventStateKeysSchema)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func prepareEventStateKeysTable(db *sql.DB) (tables.EventStateKeys, error) {
|
||||||
s := &eventStateKeyStatements{
|
s := &eventStateKeyStatements{
|
||||||
db: db,
|
db: db,
|
||||||
}
|
}
|
||||||
_, err := db.Exec(eventStateKeysSchema)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return s, shared.StatementList{
|
return s, shared.StatementList{
|
||||||
{&s.insertEventStateKeyNIDStmt, insertEventStateKeyNIDSQL},
|
{&s.insertEventStateKeyNIDStmt, insertEventStateKeyNIDSQL},
|
||||||
{&s.selectEventStateKeyNIDStmt, selectEventStateKeyNIDSQL},
|
{&s.selectEventStateKeyNIDStmt, selectEventStateKeyNIDSQL},
|
||||||
|
|
|
@ -85,14 +85,15 @@ type eventTypeStatements struct {
|
||||||
bulkSelectEventTypeNIDStmt *sql.Stmt
|
bulkSelectEventTypeNIDStmt *sql.Stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewSqliteEventTypesTable(db *sql.DB) (tables.EventTypes, error) {
|
func createEventTypesTable(db *sql.DB) error {
|
||||||
|
_, err := db.Exec(eventTypesSchema)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func prepareEventTypesTable(db *sql.DB) (tables.EventTypes, error) {
|
||||||
s := &eventTypeStatements{
|
s := &eventTypeStatements{
|
||||||
db: db,
|
db: db,
|
||||||
}
|
}
|
||||||
_, err := db.Exec(eventTypesSchema)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return s, shared.StatementList{
|
return s, shared.StatementList{
|
||||||
{&s.insertEventTypeNIDStmt, insertEventTypeNIDSQL},
|
{&s.insertEventTypeNIDStmt, insertEventTypeNIDSQL},
|
||||||
|
|
|
@ -20,6 +20,7 @@ import (
|
||||||
"database/sql"
|
"database/sql"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"sort"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/internal"
|
"github.com/matrix-org/dendrite/internal"
|
||||||
|
@ -63,6 +64,11 @@ const bulkSelectStateEventByIDSQL = "" +
|
||||||
" WHERE event_id IN ($1)" +
|
" WHERE event_id IN ($1)" +
|
||||||
" ORDER BY event_type_nid, event_state_key_nid ASC"
|
" ORDER BY event_type_nid, event_state_key_nid ASC"
|
||||||
|
|
||||||
|
const bulkSelectStateEventByNIDSQL = "" +
|
||||||
|
"SELECT event_type_nid, event_state_key_nid, event_nid FROM roomserver_events" +
|
||||||
|
" WHERE event_nid IN ($1)"
|
||||||
|
// Rest of query is built by BulkSelectStateEventByNID
|
||||||
|
|
||||||
const bulkSelectStateAtEventByIDSQL = "" +
|
const bulkSelectStateAtEventByIDSQL = "" +
|
||||||
"SELECT event_type_nid, event_state_key_nid, event_nid, state_snapshot_nid, is_rejected FROM roomserver_events" +
|
"SELECT event_type_nid, event_state_key_nid, event_nid, state_snapshot_nid, is_rejected FROM roomserver_events" +
|
||||||
" WHERE event_id IN ($1)"
|
" WHERE event_id IN ($1)"
|
||||||
|
@ -115,14 +121,15 @@ type eventStatements struct {
|
||||||
//selectRoomNIDsForEventNIDsStmt *sql.Stmt
|
//selectRoomNIDsForEventNIDsStmt *sql.Stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewSqliteEventsTable(db *sql.DB) (tables.Events, error) {
|
func createEventsTable(db *sql.DB) error {
|
||||||
|
_, err := db.Exec(eventsSchema)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func prepareEventsTable(db *sql.DB) (tables.Events, error) {
|
||||||
s := &eventStatements{
|
s := &eventStatements{
|
||||||
db: db,
|
db: db,
|
||||||
}
|
}
|
||||||
_, err := db.Exec(eventsSchema)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return s, shared.StatementList{
|
return s, shared.StatementList{
|
||||||
{&s.insertEventStmt, insertEventSQL},
|
{&s.insertEventStmt, insertEventSQL},
|
||||||
|
@ -232,6 +239,61 @@ func (s *eventStatements) BulkSelectStateEventByID(
|
||||||
return results, err
|
return results, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// bulkSelectStateEventByID lookups a list of state events by event ID.
|
||||||
|
// If any of the requested events are missing from the database it returns a types.MissingEventError
|
||||||
|
func (s *eventStatements) BulkSelectStateEventByNID(
|
||||||
|
ctx context.Context, eventNIDs []types.EventNID,
|
||||||
|
stateKeyTuples []types.StateKeyTuple,
|
||||||
|
) ([]types.StateEntry, error) {
|
||||||
|
tuples := stateKeyTupleSorter(stateKeyTuples)
|
||||||
|
sort.Sort(tuples)
|
||||||
|
eventTypeNIDArray, eventStateKeyNIDArray := tuples.typesAndStateKeysAsArrays()
|
||||||
|
params := make([]interface{}, 0, len(eventNIDs)+len(eventTypeNIDArray)+len(eventStateKeyNIDArray))
|
||||||
|
selectOrig := strings.Replace(bulkSelectStateEventByNIDSQL, "($1)", sqlutil.QueryVariadic(len(eventNIDs)), 1)
|
||||||
|
for _, v := range eventNIDs {
|
||||||
|
params = append(params, v)
|
||||||
|
}
|
||||||
|
if len(eventTypeNIDArray) > 0 {
|
||||||
|
selectOrig += " AND event_type_nid IN " + sqlutil.QueryVariadicOffset(len(eventTypeNIDArray), len(params))
|
||||||
|
for _, v := range eventTypeNIDArray {
|
||||||
|
params = append(params, v)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if len(eventStateKeyNIDArray) > 0 {
|
||||||
|
selectOrig += " AND event_state_key_nid IN " + sqlutil.QueryVariadicOffset(len(eventStateKeyNIDArray), len(params))
|
||||||
|
for _, v := range eventStateKeyNIDArray {
|
||||||
|
params = append(params, v)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
selectOrig += " ORDER BY event_type_nid, event_state_key_nid ASC"
|
||||||
|
selectStmt, err := s.db.Prepare(selectOrig)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("s.db.Prepare: %w", err)
|
||||||
|
}
|
||||||
|
rows, err := selectStmt.QueryContext(ctx, params...)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("selectStmt.QueryContext: %w", err)
|
||||||
|
}
|
||||||
|
defer internal.CloseAndLogIfError(ctx, rows, "bulkSelectStateEventByID: rows.close() failed")
|
||||||
|
// We know that we will only get as many results as event IDs
|
||||||
|
// because of the unique constraint on event IDs.
|
||||||
|
// So we can allocate an array of the correct size now.
|
||||||
|
// We might get fewer results than IDs so we adjust the length of the slice before returning it.
|
||||||
|
results := make([]types.StateEntry, len(eventNIDs))
|
||||||
|
i := 0
|
||||||
|
for ; rows.Next(); i++ {
|
||||||
|
result := &results[i]
|
||||||
|
if err = rows.Scan(
|
||||||
|
&result.EventTypeNID,
|
||||||
|
&result.EventStateKeyNID,
|
||||||
|
&result.EventNID,
|
||||||
|
); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return results[:i], err
|
||||||
|
}
|
||||||
|
|
||||||
// bulkSelectStateAtEventByID lookups the state at a list of events by event ID.
|
// bulkSelectStateAtEventByID lookups the state at a list of events by event ID.
|
||||||
// If any of the requested events are missing from the database it returns a types.MissingEventError.
|
// If any of the requested events are missing from the database it returns a types.MissingEventError.
|
||||||
// If we do not have the state for any of the requested events it returns a types.MissingEventError.
|
// If we do not have the state for any of the requested events it returns a types.MissingEventError.
|
||||||
|
|
|
@ -70,14 +70,15 @@ type inviteStatements struct {
|
||||||
selectInvitesAboutToRetireStmt *sql.Stmt
|
selectInvitesAboutToRetireStmt *sql.Stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewSqliteInvitesTable(db *sql.DB) (tables.Invites, error) {
|
func createInvitesTable(db *sql.DB) error {
|
||||||
|
_, err := db.Exec(inviteSchema)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func prepareInvitesTable(db *sql.DB) (tables.Invites, error) {
|
||||||
s := &inviteStatements{
|
s := &inviteStatements{
|
||||||
db: db,
|
db: db,
|
||||||
}
|
}
|
||||||
_, err := db.Exec(inviteSchema)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return s, shared.StatementList{
|
return s, shared.StatementList{
|
||||||
{&s.insertInviteEventStmt, insertInviteEventSQL},
|
{&s.insertInviteEventStmt, insertInviteEventSQL},
|
||||||
|
|
|
@ -115,7 +115,12 @@ type membershipStatements struct {
|
||||||
updateMembershipForgetRoomStmt *sql.Stmt
|
updateMembershipForgetRoomStmt *sql.Stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewSqliteMembershipTable(db *sql.DB) (tables.Membership, error) {
|
func createMembershipTable(db *sql.DB) error {
|
||||||
|
_, err := db.Exec(membershipSchema)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func prepareMembershipTable(db *sql.DB) (tables.Membership, error) {
|
||||||
s := &membershipStatements{
|
s := &membershipStatements{
|
||||||
db: db,
|
db: db,
|
||||||
}
|
}
|
||||||
|
@ -135,11 +140,6 @@ func NewSqliteMembershipTable(db *sql.DB) (tables.Membership, error) {
|
||||||
}.Prepare(db)
|
}.Prepare(db)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *membershipStatements) execSchema(db *sql.DB) error {
|
|
||||||
_, err := db.Exec(membershipSchema)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *membershipStatements) InsertMembership(
|
func (s *membershipStatements) InsertMembership(
|
||||||
ctx context.Context, txn *sql.Tx,
|
ctx context.Context, txn *sql.Tx,
|
||||||
roomNID types.RoomNID, targetUserNID types.EventStateKeyNID,
|
roomNID types.RoomNID, targetUserNID types.EventStateKeyNID,
|
||||||
|
|
|
@ -71,14 +71,15 @@ type previousEventStatements struct {
|
||||||
selectPreviousEventExistsStmt *sql.Stmt
|
selectPreviousEventExistsStmt *sql.Stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewSqlitePrevEventsTable(db *sql.DB) (tables.PreviousEvents, error) {
|
func createPrevEventsTable(db *sql.DB) error {
|
||||||
|
_, err := db.Exec(previousEventSchema)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func preparePrevEventsTable(db *sql.DB) (tables.PreviousEvents, error) {
|
||||||
s := &previousEventStatements{
|
s := &previousEventStatements{
|
||||||
db: db,
|
db: db,
|
||||||
}
|
}
|
||||||
_, err := db.Exec(previousEventSchema)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return s, shared.StatementList{
|
return s, shared.StatementList{
|
||||||
{&s.insertPreviousEventStmt, insertPreviousEventSQL},
|
{&s.insertPreviousEventStmt, insertPreviousEventSQL},
|
||||||
|
|
|
@ -50,14 +50,16 @@ type publishedStatements struct {
|
||||||
selectPublishedStmt *sql.Stmt
|
selectPublishedStmt *sql.Stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewSqlitePublishedTable(db *sql.DB) (tables.Published, error) {
|
func createPublishedTable(db *sql.DB) error {
|
||||||
|
_, err := db.Exec(publishedSchema)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func preparePublishedTable(db *sql.DB) (tables.Published, error) {
|
||||||
s := &publishedStatements{
|
s := &publishedStatements{
|
||||||
db: db,
|
db: db,
|
||||||
}
|
}
|
||||||
_, err := db.Exec(publishedSchema)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return s, shared.StatementList{
|
return s, shared.StatementList{
|
||||||
{&s.upsertPublishedStmt, upsertPublishedSQL},
|
{&s.upsertPublishedStmt, upsertPublishedSQL},
|
||||||
{&s.selectAllPublishedStmt, selectAllPublishedSQL},
|
{&s.selectAllPublishedStmt, selectAllPublishedSQL},
|
||||||
|
|
|
@ -59,14 +59,15 @@ type redactionStatements struct {
|
||||||
markRedactionValidatedStmt *sql.Stmt
|
markRedactionValidatedStmt *sql.Stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewSqliteRedactionsTable(db *sql.DB) (tables.Redactions, error) {
|
func createRedactionsTable(db *sql.DB) error {
|
||||||
|
_, err := db.Exec(redactionsSchema)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func prepareRedactionsTable(db *sql.DB) (tables.Redactions, error) {
|
||||||
s := &redactionStatements{
|
s := &redactionStatements{
|
||||||
db: db,
|
db: db,
|
||||||
}
|
}
|
||||||
_, err := db.Exec(redactionsSchema)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return s, shared.StatementList{
|
return s, shared.StatementList{
|
||||||
{&s.insertRedactionStmt, insertRedactionSQL},
|
{&s.insertRedactionStmt, insertRedactionSQL},
|
||||||
|
|
|
@ -64,14 +64,16 @@ type roomAliasesStatements struct {
|
||||||
deleteRoomAliasStmt *sql.Stmt
|
deleteRoomAliasStmt *sql.Stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewSqliteRoomAliasesTable(db *sql.DB) (tables.RoomAliases, error) {
|
func createRoomAliasesTable(db *sql.DB) error {
|
||||||
|
_, err := db.Exec(roomAliasesSchema)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func prepareRoomAliasesTable(db *sql.DB) (tables.RoomAliases, error) {
|
||||||
s := &roomAliasesStatements{
|
s := &roomAliasesStatements{
|
||||||
db: db,
|
db: db,
|
||||||
}
|
}
|
||||||
_, err := db.Exec(roomAliasesSchema)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return s, shared.StatementList{
|
return s, shared.StatementList{
|
||||||
{&s.insertRoomAliasStmt, insertRoomAliasSQL},
|
{&s.insertRoomAliasStmt, insertRoomAliasSQL},
|
||||||
{&s.selectRoomIDFromAliasStmt, selectRoomIDFromAliasSQL},
|
{&s.selectRoomIDFromAliasStmt, selectRoomIDFromAliasSQL},
|
||||||
|
|
|
@ -86,14 +86,16 @@ type roomStatements struct {
|
||||||
selectRoomIDsStmt *sql.Stmt
|
selectRoomIDsStmt *sql.Stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewSqliteRoomsTable(db *sql.DB) (tables.Rooms, error) {
|
func createRoomsTable(db *sql.DB) error {
|
||||||
|
_, err := db.Exec(roomsSchema)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func prepareRoomsTable(db *sql.DB) (tables.Rooms, error) {
|
||||||
s := &roomStatements{
|
s := &roomStatements{
|
||||||
db: db,
|
db: db,
|
||||||
}
|
}
|
||||||
_, err := db.Exec(roomsSchema)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return s, shared.StatementList{
|
return s, shared.StatementList{
|
||||||
{&s.insertRoomNIDStmt, insertRoomNIDSQL},
|
{&s.insertRoomNIDStmt, insertRoomNIDSQL},
|
||||||
{&s.selectRoomNIDStmt, selectRoomNIDSQL},
|
{&s.selectRoomNIDStmt, selectRoomNIDSQL},
|
||||||
|
|
|
@ -18,6 +18,7 @@ package sqlite3
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"database/sql"
|
"database/sql"
|
||||||
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"sort"
|
"sort"
|
||||||
"strings"
|
"strings"
|
||||||
|
@ -32,228 +33,113 @@ import (
|
||||||
|
|
||||||
const stateDataSchema = `
|
const stateDataSchema = `
|
||||||
CREATE TABLE IF NOT EXISTS roomserver_state_block (
|
CREATE TABLE IF NOT EXISTS roomserver_state_block (
|
||||||
state_block_nid INTEGER NOT NULL,
|
-- The state snapshot NID that identifies this snapshot.
|
||||||
event_type_nid INTEGER NOT NULL,
|
state_block_nid INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||||
event_state_key_nid INTEGER NOT NULL,
|
-- The hash of the state block, which is used to enforce uniqueness. The hash is
|
||||||
event_nid INTEGER NOT NULL,
|
-- generated in Dendrite and passed through to the database, as a btree index over
|
||||||
UNIQUE (state_block_nid, event_type_nid, event_state_key_nid)
|
-- this column is cheap and fits within the maximum index size.
|
||||||
|
state_block_hash BLOB UNIQUE,
|
||||||
|
-- The event NIDs contained within the state block, encoded as JSON.
|
||||||
|
event_nids TEXT NOT NULL DEFAULT '[]'
|
||||||
);
|
);
|
||||||
`
|
`
|
||||||
|
|
||||||
const insertStateDataSQL = "" +
|
// Insert a new state block. If we conflict on the hash column then
|
||||||
"INSERT INTO roomserver_state_block (state_block_nid, event_type_nid, event_state_key_nid, event_nid)" +
|
// we must perform an update so that the RETURNING statement returns the
|
||||||
" VALUES ($1, $2, $3, $4)"
|
// ID of the row that we conflicted with, so that we can then refer to
|
||||||
|
// the original block.
|
||||||
const selectNextStateBlockNIDSQL = `
|
const insertStateDataSQL = `
|
||||||
SELECT IFNULL(MAX(state_block_nid), 0) + 1 FROM roomserver_state_block
|
INSERT INTO roomserver_state_block (state_block_hash, event_nids)
|
||||||
|
VALUES ($1, $2)
|
||||||
|
ON CONFLICT (state_block_hash) DO UPDATE SET event_nids=$2
|
||||||
|
RETURNING state_block_nid
|
||||||
`
|
`
|
||||||
|
|
||||||
// Bulk state lookup by numeric state block ID.
|
|
||||||
// Sort by the state_block_nid, event_type_nid, event_state_key_nid
|
|
||||||
// This means that all the entries for a given state_block_nid will appear
|
|
||||||
// together in the list and those entries will sorted by event_type_nid
|
|
||||||
// and event_state_key_nid. This property makes it easier to merge two
|
|
||||||
// state data blocks together.
|
|
||||||
const bulkSelectStateBlockEntriesSQL = "" +
|
const bulkSelectStateBlockEntriesSQL = "" +
|
||||||
"SELECT state_block_nid, event_type_nid, event_state_key_nid, event_nid" +
|
"SELECT state_block_nid, event_nids" +
|
||||||
" FROM roomserver_state_block WHERE state_block_nid IN ($1)" +
|
" FROM roomserver_state_block WHERE state_block_nid IN ($1)"
|
||||||
" ORDER BY state_block_nid, event_type_nid, event_state_key_nid"
|
|
||||||
|
|
||||||
// Bulk state lookup by numeric state block ID.
|
|
||||||
// Filters the rows in each block to the requested types and state keys.
|
|
||||||
// We would like to restrict to particular type state key pairs but we are
|
|
||||||
// restricted by the query language to pull the cross product of a list
|
|
||||||
// of types and a list state_keys. So we have to filter the result in the
|
|
||||||
// application to restrict it to the list of event types and state keys we
|
|
||||||
// actually wanted.
|
|
||||||
const bulkSelectFilteredStateBlockEntriesSQL = "" +
|
|
||||||
"SELECT state_block_nid, event_type_nid, event_state_key_nid, event_nid" +
|
|
||||||
" FROM roomserver_state_block WHERE state_block_nid IN ($1)" +
|
|
||||||
" AND event_type_nid IN ($2) AND event_state_key_nid IN ($3)" +
|
|
||||||
" ORDER BY state_block_nid, event_type_nid, event_state_key_nid"
|
|
||||||
|
|
||||||
type stateBlockStatements struct {
|
type stateBlockStatements struct {
|
||||||
db *sql.DB
|
db *sql.DB
|
||||||
insertStateDataStmt *sql.Stmt
|
insertStateDataStmt *sql.Stmt
|
||||||
selectNextStateBlockNIDStmt *sql.Stmt
|
bulkSelectStateBlockEntriesStmt *sql.Stmt
|
||||||
bulkSelectStateBlockEntriesStmt *sql.Stmt
|
|
||||||
bulkSelectFilteredStateBlockEntriesStmt *sql.Stmt
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewSqliteStateBlockTable(db *sql.DB) (tables.StateBlock, error) {
|
func createStateBlockTable(db *sql.DB) error {
|
||||||
|
_, err := db.Exec(stateDataSchema)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func prepareStateBlockTable(db *sql.DB) (tables.StateBlock, error) {
|
||||||
s := &stateBlockStatements{
|
s := &stateBlockStatements{
|
||||||
db: db,
|
db: db,
|
||||||
}
|
}
|
||||||
_, err := db.Exec(stateDataSchema)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return s, shared.StatementList{
|
return s, shared.StatementList{
|
||||||
{&s.insertStateDataStmt, insertStateDataSQL},
|
{&s.insertStateDataStmt, insertStateDataSQL},
|
||||||
{&s.selectNextStateBlockNIDStmt, selectNextStateBlockNIDSQL},
|
|
||||||
{&s.bulkSelectStateBlockEntriesStmt, bulkSelectStateBlockEntriesSQL},
|
{&s.bulkSelectStateBlockEntriesStmt, bulkSelectStateBlockEntriesSQL},
|
||||||
{&s.bulkSelectFilteredStateBlockEntriesStmt, bulkSelectFilteredStateBlockEntriesSQL},
|
|
||||||
}.Prepare(db)
|
}.Prepare(db)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *stateBlockStatements) BulkInsertStateData(
|
func (s *stateBlockStatements) BulkInsertStateData(
|
||||||
ctx context.Context, txn *sql.Tx,
|
ctx context.Context,
|
||||||
entries []types.StateEntry,
|
txn *sql.Tx,
|
||||||
) (types.StateBlockNID, error) {
|
entries types.StateEntries,
|
||||||
if len(entries) == 0 {
|
) (id types.StateBlockNID, err error) {
|
||||||
return 0, nil
|
entries = entries[:util.SortAndUnique(entries)]
|
||||||
|
var nids types.EventNIDs
|
||||||
|
for _, e := range entries {
|
||||||
|
nids = append(nids, e.EventNID)
|
||||||
}
|
}
|
||||||
var stateBlockNID types.StateBlockNID
|
js, err := json.Marshal(nids)
|
||||||
err := sqlutil.TxStmt(txn, s.selectNextStateBlockNIDStmt).QueryRowContext(ctx).Scan(&stateBlockNID)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, fmt.Errorf("json.Marshal: %w", err)
|
||||||
}
|
}
|
||||||
for _, entry := range entries {
|
err = s.insertStateDataStmt.QueryRowContext(
|
||||||
_, err = sqlutil.TxStmt(txn, s.insertStateDataStmt).ExecContext(
|
ctx, nids.Hash(), js,
|
||||||
ctx,
|
).Scan(&id)
|
||||||
int64(stateBlockNID),
|
return
|
||||||
int64(entry.EventTypeNID),
|
|
||||||
int64(entry.EventStateKeyNID),
|
|
||||||
int64(entry.EventNID),
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return stateBlockNID, err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *stateBlockStatements) BulkSelectStateBlockEntries(
|
func (s *stateBlockStatements) BulkSelectStateBlockEntries(
|
||||||
ctx context.Context, stateBlockNIDs []types.StateBlockNID,
|
ctx context.Context, stateBlockNIDs types.StateBlockNIDs,
|
||||||
) ([]types.StateEntryList, error) {
|
) ([][]types.EventNID, error) {
|
||||||
nids := make([]interface{}, len(stateBlockNIDs))
|
intfs := make([]interface{}, len(stateBlockNIDs))
|
||||||
for k, v := range stateBlockNIDs {
|
for i := range stateBlockNIDs {
|
||||||
nids[k] = v
|
intfs[i] = int64(stateBlockNIDs[i])
|
||||||
}
|
}
|
||||||
selectOrig := strings.Replace(bulkSelectStateBlockEntriesSQL, "($1)", sqlutil.QueryVariadic(len(nids)), 1)
|
selectOrig := strings.Replace(bulkSelectStateBlockEntriesSQL, "($1)", sqlutil.QueryVariadic(len(intfs)), 1)
|
||||||
selectStmt, err := s.db.Prepare(selectOrig)
|
selectStmt, err := s.db.Prepare(selectOrig)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
rows, err := selectStmt.QueryContext(ctx, nids...)
|
rows, err := selectStmt.QueryContext(ctx, intfs...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
defer internal.CloseAndLogIfError(ctx, rows, "bulkSelectStateBlockEntries: rows.close() failed")
|
defer internal.CloseAndLogIfError(ctx, rows, "bulkSelectStateBlockEntries: rows.close() failed")
|
||||||
|
|
||||||
results := make([]types.StateEntryList, len(stateBlockNIDs))
|
results := make([][]types.EventNID, len(stateBlockNIDs))
|
||||||
// current is a pointer to the StateEntryList to append the state entries to.
|
|
||||||
var current *types.StateEntryList
|
|
||||||
i := 0
|
i := 0
|
||||||
for rows.Next() {
|
for ; rows.Next(); i++ {
|
||||||
var (
|
var stateBlockNID types.StateBlockNID
|
||||||
stateBlockNID int64
|
var result json.RawMessage
|
||||||
eventTypeNID int64
|
if err = rows.Scan(&stateBlockNID, &result); err != nil {
|
||||||
eventStateKeyNID int64
|
|
||||||
eventNID int64
|
|
||||||
entry types.StateEntry
|
|
||||||
)
|
|
||||||
if err := rows.Scan(
|
|
||||||
&stateBlockNID, &eventTypeNID, &eventStateKeyNID, &eventNID,
|
|
||||||
); err != nil {
|
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
entry.EventTypeNID = types.EventTypeNID(eventTypeNID)
|
r := []types.EventNID{}
|
||||||
entry.EventStateKeyNID = types.EventStateKeyNID(eventStateKeyNID)
|
if err = json.Unmarshal(result, &r); err != nil {
|
||||||
entry.EventNID = types.EventNID(eventNID)
|
return nil, fmt.Errorf("json.Unmarshal: %w", err)
|
||||||
if current == nil || types.StateBlockNID(stateBlockNID) != current.StateBlockNID {
|
|
||||||
// The state entry row is for a different state data block to the current one.
|
|
||||||
// So we start appending to the next entry in the list.
|
|
||||||
current = &results[i]
|
|
||||||
current.StateBlockNID = types.StateBlockNID(stateBlockNID)
|
|
||||||
i++
|
|
||||||
}
|
}
|
||||||
current.StateEntries = append(current.StateEntries, entry)
|
results[i] = r
|
||||||
}
|
}
|
||||||
if i != len(nids) {
|
if err = rows.Err(); err != nil {
|
||||||
return nil, fmt.Errorf("storage: state data NIDs missing from the database (%d != %d)", i, len(nids))
|
|
||||||
}
|
|
||||||
return results, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *stateBlockStatements) BulkSelectFilteredStateBlockEntries(
|
|
||||||
ctx context.Context,
|
|
||||||
stateBlockNIDs []types.StateBlockNID,
|
|
||||||
stateKeyTuples []types.StateKeyTuple,
|
|
||||||
) ([]types.StateEntryList, error) {
|
|
||||||
tuples := stateKeyTupleSorter(stateKeyTuples)
|
|
||||||
// Sort the tuples so that we can run binary search against them as we filter the rows returned by the db.
|
|
||||||
sort.Sort(tuples)
|
|
||||||
|
|
||||||
eventTypeNIDArray, eventStateKeyNIDArray := tuples.typesAndStateKeysAsArrays()
|
|
||||||
sqlStatement := strings.Replace(bulkSelectFilteredStateBlockEntriesSQL, "($1)", sqlutil.QueryVariadic(len(stateBlockNIDs)), 1)
|
|
||||||
sqlStatement = strings.Replace(sqlStatement, "($2)", sqlutil.QueryVariadicOffset(len(eventTypeNIDArray), len(stateBlockNIDs)), 1)
|
|
||||||
sqlStatement = strings.Replace(sqlStatement, "($3)", sqlutil.QueryVariadicOffset(len(eventStateKeyNIDArray), len(stateBlockNIDs)+len(eventTypeNIDArray)), 1)
|
|
||||||
|
|
||||||
var params []interface{}
|
|
||||||
for _, val := range stateBlockNIDs {
|
|
||||||
params = append(params, int64(val))
|
|
||||||
}
|
|
||||||
for _, val := range eventTypeNIDArray {
|
|
||||||
params = append(params, val)
|
|
||||||
}
|
|
||||||
for _, val := range eventStateKeyNIDArray {
|
|
||||||
params = append(params, val)
|
|
||||||
}
|
|
||||||
|
|
||||||
rows, err := s.db.QueryContext(
|
|
||||||
ctx,
|
|
||||||
sqlStatement,
|
|
||||||
params...,
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
defer internal.CloseAndLogIfError(ctx, rows, "bulkSelectFilteredStateBlockEntries: rows.close() failed")
|
if i != len(stateBlockNIDs) {
|
||||||
|
return nil, fmt.Errorf("storage: state data NIDs missing from the database (%d != %d)", len(results), len(stateBlockNIDs))
|
||||||
var results []types.StateEntryList
|
|
||||||
var current types.StateEntryList
|
|
||||||
for rows.Next() {
|
|
||||||
var (
|
|
||||||
stateBlockNID int64
|
|
||||||
eventTypeNID int64
|
|
||||||
eventStateKeyNID int64
|
|
||||||
eventNID int64
|
|
||||||
entry types.StateEntry
|
|
||||||
)
|
|
||||||
if err := rows.Scan(
|
|
||||||
&stateBlockNID, &eventTypeNID, &eventStateKeyNID, &eventNID,
|
|
||||||
); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
entry.EventTypeNID = types.EventTypeNID(eventTypeNID)
|
|
||||||
entry.EventStateKeyNID = types.EventStateKeyNID(eventStateKeyNID)
|
|
||||||
entry.EventNID = types.EventNID(eventNID)
|
|
||||||
|
|
||||||
// We can use binary search here because we sorted the tuples earlier
|
|
||||||
if !tuples.contains(entry.StateKeyTuple) {
|
|
||||||
// The select will return the cross product of types and state keys.
|
|
||||||
// So we need to check if type of the entry is in the list.
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
if types.StateBlockNID(stateBlockNID) != current.StateBlockNID {
|
|
||||||
// The state entry row is for a different state data block to the current one.
|
|
||||||
// So we append the current entry to the results and start adding to a new one.
|
|
||||||
// The first time through the loop current will be empty.
|
|
||||||
if current.StateEntries != nil {
|
|
||||||
results = append(results, current)
|
|
||||||
}
|
|
||||||
current = types.StateEntryList{StateBlockNID: types.StateBlockNID(stateBlockNID)}
|
|
||||||
}
|
|
||||||
current.StateEntries = append(current.StateEntries, entry)
|
|
||||||
}
|
}
|
||||||
// Add the last entry to the list if it is not empty.
|
return results, err
|
||||||
if current.StateEntries != nil {
|
|
||||||
results = append(results, current)
|
|
||||||
}
|
|
||||||
return results, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type stateKeyTupleSorter []types.StateKeyTuple
|
type stateKeyTupleSorter []types.StateKeyTuple
|
||||||
|
|
|
@ -27,19 +27,34 @@ import (
|
||||||
"github.com/matrix-org/dendrite/roomserver/storage/shared"
|
"github.com/matrix-org/dendrite/roomserver/storage/shared"
|
||||||
"github.com/matrix-org/dendrite/roomserver/storage/tables"
|
"github.com/matrix-org/dendrite/roomserver/storage/tables"
|
||||||
"github.com/matrix-org/dendrite/roomserver/types"
|
"github.com/matrix-org/dendrite/roomserver/types"
|
||||||
|
"github.com/matrix-org/util"
|
||||||
)
|
)
|
||||||
|
|
||||||
const stateSnapshotSchema = `
|
const stateSnapshotSchema = `
|
||||||
CREATE TABLE IF NOT EXISTS roomserver_state_snapshots (
|
CREATE TABLE IF NOT EXISTS roomserver_state_snapshots (
|
||||||
|
-- The state snapshot NID that identifies this snapshot.
|
||||||
state_snapshot_nid INTEGER PRIMARY KEY AUTOINCREMENT,
|
state_snapshot_nid INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||||
|
-- The hash of the state snapshot, which is used to enforce uniqueness. The hash is
|
||||||
|
-- generated in Dendrite and passed through to the database, as a btree index over
|
||||||
|
-- this column is cheap and fits within the maximum index size.
|
||||||
|
state_snapshot_hash BLOB UNIQUE,
|
||||||
|
-- The room NID that the snapshot belongs to.
|
||||||
room_nid INTEGER NOT NULL,
|
room_nid INTEGER NOT NULL,
|
||||||
|
-- The state blocks contained within this snapshot, encoded as JSON.
|
||||||
state_block_nids TEXT NOT NULL DEFAULT '[]'
|
state_block_nids TEXT NOT NULL DEFAULT '[]'
|
||||||
);
|
);
|
||||||
`
|
`
|
||||||
|
|
||||||
|
// Insert a new state snapshot. If we conflict on the hash column then
|
||||||
|
// we must perform an update so that the RETURNING statement returns the
|
||||||
|
// ID of the row that we conflicted with, so that we can then refer to
|
||||||
|
// the original snapshot.
|
||||||
const insertStateSQL = `
|
const insertStateSQL = `
|
||||||
INSERT INTO roomserver_state_snapshots (room_nid, state_block_nids)
|
INSERT INTO roomserver_state_snapshots (state_snapshot_hash, room_nid, state_block_nids)
|
||||||
VALUES ($1, $2);`
|
VALUES ($1, $2, $3)
|
||||||
|
ON CONFLICT (state_snapshot_hash) DO UPDATE SET room_nid=$2
|
||||||
|
RETURNING state_snapshot_nid
|
||||||
|
`
|
||||||
|
|
||||||
// Bulk state data NID lookup.
|
// Bulk state data NID lookup.
|
||||||
// Sorting by state_snapshot_nid means we can use binary search over the result
|
// Sorting by state_snapshot_nid means we can use binary search over the result
|
||||||
|
@ -54,14 +69,15 @@ type stateSnapshotStatements struct {
|
||||||
bulkSelectStateBlockNIDsStmt *sql.Stmt
|
bulkSelectStateBlockNIDsStmt *sql.Stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewSqliteStateSnapshotTable(db *sql.DB) (tables.StateSnapshot, error) {
|
func createStateSnapshotTable(db *sql.DB) error {
|
||||||
|
_, err := db.Exec(stateSnapshotSchema)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func prepareStateSnapshotTable(db *sql.DB) (tables.StateSnapshot, error) {
|
||||||
s := &stateSnapshotStatements{
|
s := &stateSnapshotStatements{
|
||||||
db: db,
|
db: db,
|
||||||
}
|
}
|
||||||
_, err := db.Exec(stateSnapshotSchema)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return s, shared.StatementList{
|
return s, shared.StatementList{
|
||||||
{&s.insertStateStmt, insertStateSQL},
|
{&s.insertStateStmt, insertStateSQL},
|
||||||
|
@ -70,22 +86,20 @@ func NewSqliteStateSnapshotTable(db *sql.DB) (tables.StateSnapshot, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *stateSnapshotStatements) InsertState(
|
func (s *stateSnapshotStatements) InsertState(
|
||||||
ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, stateBlockNIDs []types.StateBlockNID,
|
ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, stateBlockNIDs types.StateBlockNIDs,
|
||||||
) (stateNID types.StateSnapshotNID, err error) {
|
) (stateNID types.StateSnapshotNID, err error) {
|
||||||
|
stateBlockNIDs = stateBlockNIDs[:util.SortAndUnique(stateBlockNIDs)]
|
||||||
stateBlockNIDsJSON, err := json.Marshal(stateBlockNIDs)
|
stateBlockNIDsJSON, err := json.Marshal(stateBlockNIDs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
insertStmt := sqlutil.TxStmt(txn, s.insertStateStmt)
|
insertStmt := sqlutil.TxStmt(txn, s.insertStateStmt)
|
||||||
res, err := insertStmt.ExecContext(ctx, int64(roomNID), string(stateBlockNIDsJSON))
|
var id int64
|
||||||
|
err = insertStmt.QueryRowContext(ctx, stateBlockNIDs.Hash(), int64(roomNID), string(stateBlockNIDsJSON)).Scan(&id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
lastRowID, err := res.LastInsertId()
|
stateNID = types.StateSnapshotNID(id)
|
||||||
if err != nil {
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
stateNID = types.StateSnapshotNID(lastRowID)
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -53,17 +53,22 @@ func Open(dbProperties *config.DatabaseOptions, cache caching.RoomServerCaches)
|
||||||
// which it will never obtain.
|
// which it will never obtain.
|
||||||
db.SetMaxOpenConns(20)
|
db.SetMaxOpenConns(20)
|
||||||
|
|
||||||
// Create tables before executing migrations so we don't fail if the table is missing,
|
// Create the tables.
|
||||||
// and THEN prepare statements so we don't fail due to referencing new columns
|
if err := d.create(db); err != nil {
|
||||||
ms := membershipStatements{}
|
|
||||||
if err := ms.execSchema(db); err != nil {
|
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Then execute the migrations. By this point the tables are created with the latest
|
||||||
|
// schemas.
|
||||||
m := sqlutil.NewMigrations()
|
m := sqlutil.NewMigrations()
|
||||||
deltas.LoadAddForgottenColumn(m)
|
deltas.LoadAddForgottenColumn(m)
|
||||||
|
deltas.LoadStateBlocksRefactor(m)
|
||||||
if err := m.RunDeltas(db, dbProperties); err != nil {
|
if err := m.RunDeltas(db, dbProperties); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Then prepare the statements. Now that the migrations have run, any columns referred
|
||||||
|
// to in the database code should now exist.
|
||||||
if err := d.prepare(db, cache); err != nil {
|
if err := d.prepare(db, cache); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -71,62 +76,107 @@ func Open(dbProperties *config.DatabaseOptions, cache caching.RoomServerCaches)
|
||||||
return &d, nil
|
return &d, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// nolint: gocyclo
|
func (d *Database) create(db *sql.DB) error {
|
||||||
|
if err := createEventStateKeysTable(db); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := createEventTypesTable(db); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := createEventJSONTable(db); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := createEventsTable(db); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := createRoomsTable(db); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := createTransactionsTable(db); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := createStateBlockTable(db); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := createStateSnapshotTable(db); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := createPrevEventsTable(db); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := createRoomAliasesTable(db); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := createInvitesTable(db); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := createMembershipTable(db); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := createPublishedTable(db); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := createRedactionsTable(db); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (d *Database) prepare(db *sql.DB, cache caching.RoomServerCaches) error {
|
func (d *Database) prepare(db *sql.DB, cache caching.RoomServerCaches) error {
|
||||||
var err error
|
eventStateKeys, err := prepareEventStateKeysTable(db)
|
||||||
eventStateKeys, err := NewSqliteEventStateKeysTable(db)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
eventTypes, err := NewSqliteEventTypesTable(db)
|
eventTypes, err := prepareEventTypesTable(db)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
eventJSON, err := NewSqliteEventJSONTable(db)
|
eventJSON, err := prepareEventJSONTable(db)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
events, err := NewSqliteEventsTable(db)
|
events, err := prepareEventsTable(db)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
rooms, err := NewSqliteRoomsTable(db)
|
rooms, err := prepareRoomsTable(db)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
transactions, err := NewSqliteTransactionsTable(db)
|
transactions, err := prepareTransactionsTable(db)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
stateBlock, err := NewSqliteStateBlockTable(db)
|
stateBlock, err := prepareStateBlockTable(db)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
stateSnapshot, err := NewSqliteStateSnapshotTable(db)
|
stateSnapshot, err := prepareStateSnapshotTable(db)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
prevEvents, err := NewSqlitePrevEventsTable(db)
|
prevEvents, err := preparePrevEventsTable(db)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
roomAliases, err := NewSqliteRoomAliasesTable(db)
|
roomAliases, err := prepareRoomAliasesTable(db)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
invites, err := NewSqliteInvitesTable(db)
|
invites, err := prepareInvitesTable(db)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
membership, err := NewSqliteMembershipTable(db)
|
membership, err := prepareMembershipTable(db)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
published, err := NewSqlitePublishedTable(db)
|
published, err := preparePublishedTable(db)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
redactions, err := NewSqliteRedactionsTable(db)
|
redactions, err := prepareRedactionsTable(db)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -49,14 +49,15 @@ type transactionStatements struct {
|
||||||
selectTransactionEventIDStmt *sql.Stmt
|
selectTransactionEventIDStmt *sql.Stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewSqliteTransactionsTable(db *sql.DB) (tables.Transactions, error) {
|
func createTransactionsTable(db *sql.DB) error {
|
||||||
|
_, err := db.Exec(transactionsSchema)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func prepareTransactionsTable(db *sql.DB) (tables.Transactions, error) {
|
||||||
s := &transactionStatements{
|
s := &transactionStatements{
|
||||||
db: db,
|
db: db,
|
||||||
}
|
}
|
||||||
_, err := db.Exec(transactionsSchema)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return s, shared.StatementList{
|
return s, shared.StatementList{
|
||||||
{&s.insertTransactionStmt, insertTransactionSQL},
|
{&s.insertTransactionStmt, insertTransactionSQL},
|
||||||
|
|
|
@ -43,6 +43,7 @@ type Events interface {
|
||||||
// bulkSelectStateEventByID lookups a list of state events by event ID.
|
// bulkSelectStateEventByID lookups a list of state events by event ID.
|
||||||
// If any of the requested events are missing from the database it returns a types.MissingEventError
|
// If any of the requested events are missing from the database it returns a types.MissingEventError
|
||||||
BulkSelectStateEventByID(ctx context.Context, eventIDs []string) ([]types.StateEntry, error)
|
BulkSelectStateEventByID(ctx context.Context, eventIDs []string) ([]types.StateEntry, error)
|
||||||
|
BulkSelectStateEventByNID(ctx context.Context, eventNIDs []types.EventNID, stateKeyTuples []types.StateKeyTuple) ([]types.StateEntry, error)
|
||||||
// BulkSelectStateAtEventByID lookups the state at a list of events by event ID.
|
// BulkSelectStateAtEventByID lookups the state at a list of events by event ID.
|
||||||
// If any of the requested events are missing from the database it returns a types.MissingEventError.
|
// If any of the requested events are missing from the database it returns a types.MissingEventError.
|
||||||
// If we do not have the state for any of the requested events it returns a types.MissingEventError.
|
// If we do not have the state for any of the requested events it returns a types.MissingEventError.
|
||||||
|
@ -81,14 +82,14 @@ type Transactions interface {
|
||||||
}
|
}
|
||||||
|
|
||||||
type StateSnapshot interface {
|
type StateSnapshot interface {
|
||||||
InsertState(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, stateBlockNIDs []types.StateBlockNID) (stateNID types.StateSnapshotNID, err error)
|
InsertState(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, stateBlockNIDs types.StateBlockNIDs) (stateNID types.StateSnapshotNID, err error)
|
||||||
BulkSelectStateBlockNIDs(ctx context.Context, stateNIDs []types.StateSnapshotNID) ([]types.StateBlockNIDList, error)
|
BulkSelectStateBlockNIDs(ctx context.Context, stateNIDs []types.StateSnapshotNID) ([]types.StateBlockNIDList, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
type StateBlock interface {
|
type StateBlock interface {
|
||||||
BulkInsertStateData(ctx context.Context, txn *sql.Tx, entries []types.StateEntry) (types.StateBlockNID, error)
|
BulkInsertStateData(ctx context.Context, txn *sql.Tx, entries types.StateEntries) (types.StateBlockNID, error)
|
||||||
BulkSelectStateBlockEntries(ctx context.Context, stateBlockNIDs []types.StateBlockNID) ([]types.StateEntryList, error)
|
BulkSelectStateBlockEntries(ctx context.Context, stateBlockNIDs types.StateBlockNIDs) ([][]types.EventNID, error)
|
||||||
BulkSelectFilteredStateBlockEntries(ctx context.Context, stateBlockNIDs []types.StateBlockNID, stateKeyTuples []types.StateKeyTuple) ([]types.StateEntryList, error)
|
//BulkSelectFilteredStateBlockEntries(ctx context.Context, stateBlockNIDs []types.StateBlockNID, stateKeyTuples []types.StateKeyTuple) ([]types.StateEntryList, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
type RoomAliases interface {
|
type RoomAliases interface {
|
||||||
|
|
|
@ -16,9 +16,11 @@
|
||||||
package types
|
package types
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"encoding/json"
|
||||||
"sort"
|
"sort"
|
||||||
|
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
|
"golang.org/x/crypto/blake2b"
|
||||||
)
|
)
|
||||||
|
|
||||||
// EventTypeNID is a numeric ID for an event type.
|
// EventTypeNID is a numeric ID for an event type.
|
||||||
|
@ -40,6 +42,38 @@ type StateSnapshotNID int64
|
||||||
// These blocks of state data are combined to form the actual state.
|
// These blocks of state data are combined to form the actual state.
|
||||||
type StateBlockNID int64
|
type StateBlockNID int64
|
||||||
|
|
||||||
|
// EventNIDs is used to sort and dedupe event NIDs.
|
||||||
|
type EventNIDs []EventNID
|
||||||
|
|
||||||
|
func (a EventNIDs) Len() int { return len(a) }
|
||||||
|
func (a EventNIDs) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
|
||||||
|
func (a EventNIDs) Less(i, j int) bool { return a[i] < a[j] }
|
||||||
|
|
||||||
|
func (a EventNIDs) Hash() []byte {
|
||||||
|
j, err := json.Marshal(a)
|
||||||
|
if err != nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
h := blake2b.Sum256(j)
|
||||||
|
return h[:]
|
||||||
|
}
|
||||||
|
|
||||||
|
// StateBlockNIDs is used to sort and dedupe state block NIDs.
|
||||||
|
type StateBlockNIDs []StateBlockNID
|
||||||
|
|
||||||
|
func (a StateBlockNIDs) Len() int { return len(a) }
|
||||||
|
func (a StateBlockNIDs) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
|
||||||
|
func (a StateBlockNIDs) Less(i, j int) bool { return a[i] < a[j] }
|
||||||
|
|
||||||
|
func (a StateBlockNIDs) Hash() []byte {
|
||||||
|
j, err := json.Marshal(a)
|
||||||
|
if err != nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
h := blake2b.Sum256(j)
|
||||||
|
return h[:]
|
||||||
|
}
|
||||||
|
|
||||||
// A StateKeyTuple is a pair of a numeric event type and a numeric state key.
|
// A StateKeyTuple is a pair of a numeric event type and a numeric state key.
|
||||||
// It is used to lookup state entries.
|
// It is used to lookup state entries.
|
||||||
type StateKeyTuple struct {
|
type StateKeyTuple struct {
|
||||||
|
@ -65,6 +99,12 @@ type StateEntry struct {
|
||||||
EventNID EventNID
|
EventNID EventNID
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type StateEntries []StateEntry
|
||||||
|
|
||||||
|
func (a StateEntries) Len() int { return len(a) }
|
||||||
|
func (a StateEntries) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
|
||||||
|
func (a StateEntries) Less(i, j int) bool { return a[i].EventNID < a[j].EventNID }
|
||||||
|
|
||||||
// LessThan returns true if this state entry is less than the other state entry.
|
// LessThan returns true if this state entry is less than the other state entry.
|
||||||
// The ordering is arbitrary and is used to implement binary search and to efficiently deduplicate entries.
|
// The ordering is arbitrary and is used to implement binary search and to efficiently deduplicate entries.
|
||||||
func (a StateEntry) LessThan(b StateEntry) bool {
|
func (a StateEntry) LessThan(b StateEntry) bool {
|
||||||
|
|
Loading…
Reference in a new issue