mirror of
https://github.com/matrix-org/dendrite
synced 2024-06-13 18:08:59 +02:00
Add storage layer
This commit is contained in:
parent
6e57553480
commit
c393e47e7a
|
@ -195,6 +195,7 @@ type Database interface {
|
|||
|
||||
// RoomsWithACLs returns all room IDs for rooms with ACLs
|
||||
RoomsWithACLs(ctx context.Context) ([]string, error)
|
||||
QueryAdminEventReports(ctx context.Context, from uint64, limit uint64, backwards bool, userID string, roomID string) ([]api.QueryAdminEventReportsResponse, int64, error)
|
||||
}
|
||||
|
||||
type UserRoomKeys interface {
|
||||
|
|
|
@ -19,7 +19,9 @@ import (
|
|||
"database/sql"
|
||||
"time"
|
||||
|
||||
"github.com/matrix-org/dendrite/internal"
|
||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||
"github.com/matrix-org/dendrite/roomserver/api"
|
||||
"github.com/matrix-org/dendrite/roomserver/storage/tables"
|
||||
"github.com/matrix-org/dendrite/roomserver/types"
|
||||
"github.com/matrix-org/gomatrixserverlib/spec"
|
||||
|
@ -32,8 +34,8 @@ CREATE TABLE IF NOT EXISTS roomserver_reported_events
|
|||
id BIGINT PRIMARY KEY DEFAULT nextval('roomserver_reported_events_id_seq'),
|
||||
room_nid BIGINT NOT NULL,
|
||||
event_nid BIGINT NOT NULL,
|
||||
reporting_user_nid INTEGER NOT NULL, -- the user reporting the event
|
||||
event_sender_nid INTEGER NOT NULL, -- the user who sent the reported event
|
||||
reporting_user_nid BIGINT NOT NULL, -- the user reporting the event
|
||||
event_sender_nid BIGINT NOT NULL, -- the user who sent the reported event
|
||||
reason TEXT,
|
||||
score INTEGER,
|
||||
received_ts BIGINT NOT NULL
|
||||
|
@ -45,8 +47,38 @@ const insertReportedEventSQL = `
|
|||
RETURNING id
|
||||
`
|
||||
|
||||
const selectReportedEventsDescSQL = `
|
||||
WITH countReports AS (
|
||||
SELECT count(*) as report_count
|
||||
FROM roomserver_reported_events
|
||||
WHERE ($1::BIGINT IS NULL OR room_nid = $1::BIGINT) AND ($2::TEXT IS NULL OR reporting_user_nid = $2::BIGINT)
|
||||
)
|
||||
SELECT report_count, id, room_nid, event_nid, reporting_user_nid, event_sender_nid, reason, score, received_ts
|
||||
FROM roomserver_reported_events, countReports
|
||||
WHERE ($1::BIGINT IS NULL OR room_nid = $1::BIGINT) AND ($2::TEXT IS NULL OR reporting_user_nid = $2::BIGINT)
|
||||
ORDER BY received_ts DESC
|
||||
OFFSET $3
|
||||
LIMIT $4
|
||||
`
|
||||
|
||||
const selectReportedEventsAscSQL = `
|
||||
WITH countReports AS (
|
||||
SELECT count(*) as report_count
|
||||
FROM roomserver_reported_events
|
||||
WHERE ($1::BIGINT IS NULL OR room_nid = $1::BIGINT) AND ($2::TEXT IS NULL OR reporting_user_nid = $2::BIGINT)
|
||||
)
|
||||
SELECT report_count, id, room_nid, event_nid, reporting_user_nid, event_sender_nid, reason, score, received_ts
|
||||
FROM roomserver_reported_events, countReports
|
||||
WHERE ($1::BIGINT IS NULL OR room_nid = $1::BIGINT) AND ($2::TEXT IS NULL OR reporting_user_nid = $2::BIGINT)
|
||||
ORDER BY received_ts ASC
|
||||
OFFSET $3
|
||||
LIMIT $4
|
||||
`
|
||||
|
||||
type reportedEventsStatements struct {
|
||||
insertReportedEventsStmt *sql.Stmt
|
||||
insertReportedEventsStmt *sql.Stmt
|
||||
selectReportedEventsDescStmt *sql.Stmt
|
||||
selectReportedEventsAscStmt *sql.Stmt
|
||||
}
|
||||
|
||||
func CreateReportedEventsTable(db *sql.DB) error {
|
||||
|
@ -59,6 +91,8 @@ func PrepareReportedEventsTable(db *sql.DB) (tables.ReportedEvents, error) {
|
|||
|
||||
return s, sqlutil.StatementList{
|
||||
{&s.insertReportedEventsStmt, insertReportedEventSQL},
|
||||
{&s.selectReportedEventsDescStmt, selectReportedEventsDescSQL},
|
||||
{&s.selectReportedEventsAscStmt, selectReportedEventsAscSQL},
|
||||
}.Prepare(db)
|
||||
}
|
||||
|
||||
|
@ -86,3 +120,55 @@ func (r *reportedEventsStatements) InsertReportedEvent(
|
|||
).Scan(&reportID)
|
||||
return reportID, err
|
||||
}
|
||||
|
||||
func (r *reportedEventsStatements) SelectReportedEvents(ctx context.Context, txn *sql.Tx, from, limit uint64, backwards bool, reportingUserID types.EventStateKeyNID, roomNID types.RoomNID) ([]api.QueryAdminEventReportsResponse, int64, error) {
|
||||
|
||||
var stmt *sql.Stmt
|
||||
if backwards {
|
||||
stmt = sqlutil.TxStmt(txn, r.selectReportedEventsDescStmt)
|
||||
} else {
|
||||
stmt = sqlutil.TxStmt(txn, r.selectReportedEventsAscStmt)
|
||||
}
|
||||
|
||||
var qryRoomNID *types.RoomNID
|
||||
if roomNID > 0 {
|
||||
qryRoomNID = &roomNID
|
||||
}
|
||||
var qryReportingUser *types.EventStateKeyNID
|
||||
if reportingUserID > 0 {
|
||||
qryReportingUser = &reportingUserID
|
||||
}
|
||||
|
||||
rows, err := stmt.QueryContext(ctx,
|
||||
qryRoomNID,
|
||||
qryReportingUser,
|
||||
from,
|
||||
limit,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
defer internal.CloseAndLogIfError(ctx, rows, "SelectReportedEvents: failed to close rows")
|
||||
|
||||
var result []api.QueryAdminEventReportsResponse
|
||||
var row api.QueryAdminEventReportsResponse
|
||||
var count int64
|
||||
for rows.Next() {
|
||||
if err = rows.Scan(
|
||||
&count,
|
||||
&row.ID,
|
||||
&row.RoomNID,
|
||||
&row.EventNID,
|
||||
&row.ReportingUserNID,
|
||||
&row.SenderNID,
|
||||
&row.Reason,
|
||||
&row.Score,
|
||||
&row.ReceivedTS,
|
||||
); err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
result = append(result, row)
|
||||
}
|
||||
|
||||
return result, count, rows.Err()
|
||||
}
|
||||
|
|
|
@ -1936,6 +1936,131 @@ func (d *Database) InsertReportedEvent(
|
|||
return reportID, err
|
||||
}
|
||||
|
||||
// QueryAdminEventReports returns event reports given a filter.
|
||||
func (d *Database) QueryAdminEventReports(ctx context.Context, from uint64, limit uint64, backwards bool, userID string, roomID string) ([]api.QueryAdminEventReportsResponse, int64, error) {
|
||||
// Filter on roomID, if requested
|
||||
var roomNID types.RoomNID
|
||||
if roomID != "" {
|
||||
roomInfo, err := d.RoomInfo(ctx, roomID)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
roomNID = roomInfo.RoomNID
|
||||
}
|
||||
|
||||
// Same as above, but for userID
|
||||
var userNID types.EventStateKeyNID
|
||||
if userID != "" {
|
||||
stateKeysMap, err := d.EventStateKeyNIDs(ctx, []string{userID})
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
if len(stateKeysMap) != 1 {
|
||||
return nil, 0, fmt.Errorf("failed to get eventStateKeyNID for %s", userID)
|
||||
}
|
||||
userNID = stateKeysMap[userID]
|
||||
}
|
||||
|
||||
// Query all reported events matching the filters
|
||||
reports, count, err := d.ReportedEventsTable.SelectReportedEvents(ctx, nil, from, limit, backwards, userNID, roomNID)
|
||||
if err != nil {
|
||||
return nil, 0, fmt.Errorf("failed to SelectReportedEvents: %w", err)
|
||||
}
|
||||
|
||||
// TODO: The below code may be inefficient due to many DB round trips and needs to be revisited.
|
||||
// For the time being, this is "good enough".
|
||||
qryRoomNIDs := make([]types.RoomNID, 0, len(reports))
|
||||
qryEventNIDs := make([]types.EventNID, 0, len(reports))
|
||||
qryStateKeyNIDs := make([]types.EventStateKeyNID, 0, len(reports))
|
||||
for _, report := range reports {
|
||||
qryRoomNIDs = append(qryRoomNIDs, report.RoomNID)
|
||||
qryEventNIDs = append(qryEventNIDs, report.EventNID)
|
||||
qryStateKeyNIDs = append(qryStateKeyNIDs, report.ReportingUserNID, report.SenderNID)
|
||||
}
|
||||
|
||||
// This also de-dupes the roomIDs, otherwise we would query the same
|
||||
// roomIDs in GetBulkStateContent multiple times
|
||||
roomIDs, err := d.RoomsTable.BulkSelectRoomIDs(ctx, nil, qryRoomNIDs)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
// TODO: replace this with something more efficient, as it loads the entire state snapshot.
|
||||
stateContent, err := d.GetBulkStateContent(ctx, roomIDs, []gomatrixserverlib.StateKeyTuple{
|
||||
{EventType: spec.MRoomName, StateKey: ""},
|
||||
{EventType: spec.MRoomCanonicalAlias, StateKey: ""},
|
||||
}, false)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
eventIDMap, err := d.EventIDs(ctx, qryEventNIDs)
|
||||
if err != nil {
|
||||
logrus.WithError(err).Error("unable to map eventNIDs to eventIDs")
|
||||
return nil, 0, err
|
||||
}
|
||||
if len(eventIDMap) != len(qryEventNIDs) {
|
||||
return nil, 0, fmt.Errorf("expected %d eventIDs, got %d", len(qryEventNIDs), len(eventIDMap))
|
||||
}
|
||||
|
||||
// Get a map from EventStateKeyNID to userID
|
||||
userNIDMap, err := d.EventStateKeys(ctx, qryStateKeyNIDs)
|
||||
if err != nil {
|
||||
logrus.WithError(err).Error("unable to map userNIDs to userIDs")
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
// Create a cache from roomNID to roomID to avoid hitting the DB again
|
||||
roomNIDIDCache := make(map[types.RoomNID]string, len(roomIDs))
|
||||
for i := 0; i < len(reports); i++ {
|
||||
cachedRoomID := roomNIDIDCache[reports[i].RoomNID]
|
||||
if cachedRoomID == "" {
|
||||
// We need to query this again, as we otherwise don't have a way to match roomNID -> roomID
|
||||
roomIDs, err = d.RoomsTable.BulkSelectRoomIDs(ctx, nil, []types.RoomNID{reports[i].RoomNID})
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
if len(roomIDs) == 0 || len(roomIDs) > 1 {
|
||||
logrus.Warnf("unable to map roomNID %d to a roomID, was this room deleted?", roomNID)
|
||||
continue
|
||||
}
|
||||
roomNIDIDCache[reports[i].RoomNID] = roomIDs[0]
|
||||
cachedRoomID = roomIDs[0]
|
||||
}
|
||||
|
||||
reports[i].EventID = eventIDMap[reports[i].EventNID]
|
||||
reports[i].RoomID = cachedRoomID
|
||||
roomName, canonicalAlias := findRoomNameAndCanonicalAlias(stateContent, cachedRoomID)
|
||||
reports[i].RoomName = roomName
|
||||
reports[i].CanonicalAlias = canonicalAlias
|
||||
reports[i].Sender = userNIDMap[reports[i].SenderNID]
|
||||
reports[i].UserID = userNIDMap[reports[i].ReportingUserNID]
|
||||
}
|
||||
|
||||
return reports, count, nil
|
||||
}
|
||||
|
||||
// findRoomNameAndCanonicalAlias loops over events to find the corresponding room name and canonicalAlias
|
||||
// for a given roomID.
|
||||
func findRoomNameAndCanonicalAlias(events []tables.StrippedEvent, roomID string) (name, canonicalAlias string) {
|
||||
for _, ev := range events {
|
||||
if ev.RoomID != roomID {
|
||||
continue
|
||||
}
|
||||
if ev.EventType == spec.MRoomName {
|
||||
name = ev.ContentValue
|
||||
}
|
||||
if ev.EventType == spec.MRoomCanonicalAlias {
|
||||
canonicalAlias = ev.ContentValue
|
||||
}
|
||||
// We found both wanted values, break the loop
|
||||
if name != "" && canonicalAlias != "" {
|
||||
break
|
||||
}
|
||||
}
|
||||
return name, canonicalAlias
|
||||
}
|
||||
|
||||
// FIXME TODO: Remove all this - horrible dupe with roomserver/state. Can't use the original impl because of circular loops
|
||||
// it should live in this package!
|
||||
|
||||
|
|
|
@ -19,7 +19,9 @@ import (
|
|||
"database/sql"
|
||||
"time"
|
||||
|
||||
"github.com/matrix-org/dendrite/internal"
|
||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||
"github.com/matrix-org/dendrite/roomserver/api"
|
||||
"github.com/matrix-org/dendrite/roomserver/storage/tables"
|
||||
"github.com/matrix-org/dendrite/roomserver/types"
|
||||
"github.com/matrix-org/gomatrixserverlib/spec"
|
||||
|
@ -44,8 +46,38 @@ const insertReportedEventSQL = `
|
|||
RETURNING id
|
||||
`
|
||||
|
||||
const selectReportedEventsDescSQL = `
|
||||
WITH countReports AS (
|
||||
SELECT count(*) as report_count
|
||||
FROM roomserver_reported_events
|
||||
WHERE ($1 IS NULL OR room_nid = $1) AND ($2 IS NULL OR reporting_user_nid = $2)
|
||||
)
|
||||
SELECT report_count, id, room_nid, event_nid, reporting_user_nid, event_sender_nid, reason, score, received_ts
|
||||
FROM roomserver_reported_events, countReports
|
||||
WHERE ($1 IS NULL OR room_nid = $1) AND ($2 IS NULL OR reporting_user_nid = $2)
|
||||
ORDER BY received_ts DESC
|
||||
LIMIT $3
|
||||
OFFSET $4
|
||||
`
|
||||
|
||||
const selectReportedEventsAscSQL = `
|
||||
WITH countReports AS (
|
||||
SELECT count(*) as report_count
|
||||
FROM roomserver_reported_events
|
||||
WHERE ($1 IS NULL OR room_nid = $1) AND ($2 IS NULL OR reporting_user_nid = $2)
|
||||
)
|
||||
SELECT report_count, id, room_nid, event_nid, reporting_user_nid, event_sender_nid, reason, score, received_ts
|
||||
FROM roomserver_reported_events, countReports
|
||||
WHERE ($1 IS NULL OR room_nid = $1) AND ($2 IS NULL OR reporting_user_nid = $2)
|
||||
ORDER BY received_ts ASC
|
||||
LIMIT $3
|
||||
OFFSET $4
|
||||
`
|
||||
|
||||
type reportedEventsStatements struct {
|
||||
insertReportedEventsStmt *sql.Stmt
|
||||
insertReportedEventsStmt *sql.Stmt
|
||||
selectReportedEventsDescStmt *sql.Stmt
|
||||
selectReportedEventsAscStmt *sql.Stmt
|
||||
}
|
||||
|
||||
func CreateReportedEventsTable(db *sql.DB) error {
|
||||
|
@ -58,6 +90,8 @@ func PrepareReportedEventsTable(db *sql.DB) (tables.ReportedEvents, error) {
|
|||
|
||||
return s, sqlutil.StatementList{
|
||||
{&s.insertReportedEventsStmt, insertReportedEventSQL},
|
||||
{&s.selectReportedEventsDescStmt, selectReportedEventsDescSQL},
|
||||
{&s.selectReportedEventsAscStmt, selectReportedEventsAscSQL},
|
||||
}.Prepare(db)
|
||||
}
|
||||
|
||||
|
@ -85,3 +119,55 @@ func (r *reportedEventsStatements) InsertReportedEvent(
|
|||
).Scan(&reportID)
|
||||
return reportID, err
|
||||
}
|
||||
|
||||
func (r *reportedEventsStatements) SelectReportedEvents(ctx context.Context, txn *sql.Tx, from, limit uint64, backwards bool, reportingUserID types.EventStateKeyNID, roomNID types.RoomNID) ([]api.QueryAdminEventReportsResponse, int64, error) {
|
||||
|
||||
var stmt *sql.Stmt
|
||||
if backwards {
|
||||
stmt = sqlutil.TxStmt(txn, r.selectReportedEventsDescStmt)
|
||||
} else {
|
||||
stmt = sqlutil.TxStmt(txn, r.selectReportedEventsAscStmt)
|
||||
}
|
||||
|
||||
var qryRoomNID *types.RoomNID
|
||||
if roomNID > 0 {
|
||||
qryRoomNID = &roomNID
|
||||
}
|
||||
var qryReportingUser *types.EventStateKeyNID
|
||||
if reportingUserID > 0 {
|
||||
qryReportingUser = &reportingUserID
|
||||
}
|
||||
|
||||
rows, err := stmt.QueryContext(ctx,
|
||||
qryRoomNID,
|
||||
qryReportingUser,
|
||||
limit,
|
||||
from,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
defer internal.CloseAndLogIfError(ctx, rows, "SelectReportedEvents: failed to close rows")
|
||||
|
||||
var result []api.QueryAdminEventReportsResponse
|
||||
var row api.QueryAdminEventReportsResponse
|
||||
var count int64
|
||||
for rows.Next() {
|
||||
if err = rows.Scan(
|
||||
&count,
|
||||
&row.ID,
|
||||
&row.RoomNID,
|
||||
&row.EventNID,
|
||||
&row.ReportingUserNID,
|
||||
&row.SenderNID,
|
||||
&row.Reason,
|
||||
&row.Score,
|
||||
&row.ReceivedTS,
|
||||
); err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
result = append(result, row)
|
||||
}
|
||||
|
||||
return result, count, rows.Err()
|
||||
}
|
||||
|
|
|
@ -6,6 +6,7 @@ import (
|
|||
"database/sql"
|
||||
"errors"
|
||||
|
||||
"github.com/matrix-org/dendrite/roomserver/api"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
"github.com/matrix-org/gomatrixserverlib/spec"
|
||||
"github.com/tidwall/gjson"
|
||||
|
@ -138,6 +139,7 @@ type ReportedEvents interface {
|
|||
reason string,
|
||||
score int64,
|
||||
) (int64, error)
|
||||
SelectReportedEvents(ctx context.Context, txn *sql.Tx, from, limit uint64, backwards bool, reportingUserID types.EventStateKeyNID, roomNID types.RoomNID) ([]api.QueryAdminEventReportsResponse, int64, error)
|
||||
}
|
||||
|
||||
type MembershipState int64
|
||||
|
|
Loading…
Reference in a new issue