mirror of
https://github.com/matrix-org/dendrite
synced 2024-12-13 09:33:28 +01:00
Remove membership table from account DB (#1172)
* Remove membership table from account DB And make code which needs that data use the currentstate server * Unbreak tests; use a membership enum for space
This commit is contained in:
parent
ca5bbffd8d
commit
6f49758b90
28 changed files with 211 additions and 722 deletions
|
@ -18,9 +18,9 @@ import (
|
||||||
"github.com/Shopify/sarama"
|
"github.com/Shopify/sarama"
|
||||||
"github.com/gorilla/mux"
|
"github.com/gorilla/mux"
|
||||||
appserviceAPI "github.com/matrix-org/dendrite/appservice/api"
|
appserviceAPI "github.com/matrix-org/dendrite/appservice/api"
|
||||||
"github.com/matrix-org/dendrite/clientapi/consumers"
|
|
||||||
"github.com/matrix-org/dendrite/clientapi/producers"
|
"github.com/matrix-org/dendrite/clientapi/producers"
|
||||||
"github.com/matrix-org/dendrite/clientapi/routing"
|
"github.com/matrix-org/dendrite/clientapi/routing"
|
||||||
|
currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api"
|
||||||
eduServerAPI "github.com/matrix-org/dendrite/eduserver/api"
|
eduServerAPI "github.com/matrix-org/dendrite/eduserver/api"
|
||||||
federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api"
|
federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api"
|
||||||
"github.com/matrix-org/dendrite/internal/config"
|
"github.com/matrix-org/dendrite/internal/config"
|
||||||
|
@ -30,14 +30,12 @@ import (
|
||||||
"github.com/matrix-org/dendrite/userapi/storage/accounts"
|
"github.com/matrix-org/dendrite/userapi/storage/accounts"
|
||||||
"github.com/matrix-org/dendrite/userapi/storage/devices"
|
"github.com/matrix-org/dendrite/userapi/storage/devices"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
"github.com/sirupsen/logrus"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// AddPublicRoutes sets up and registers HTTP handlers for the ClientAPI component.
|
// AddPublicRoutes sets up and registers HTTP handlers for the ClientAPI component.
|
||||||
func AddPublicRoutes(
|
func AddPublicRoutes(
|
||||||
router *mux.Router,
|
router *mux.Router,
|
||||||
cfg *config.Dendrite,
|
cfg *config.Dendrite,
|
||||||
consumer sarama.Consumer,
|
|
||||||
producer sarama.SyncProducer,
|
producer sarama.SyncProducer,
|
||||||
deviceDB devices.Database,
|
deviceDB devices.Database,
|
||||||
accountsDB accounts.Database,
|
accountsDB accounts.Database,
|
||||||
|
@ -45,6 +43,7 @@ func AddPublicRoutes(
|
||||||
rsAPI roomserverAPI.RoomserverInternalAPI,
|
rsAPI roomserverAPI.RoomserverInternalAPI,
|
||||||
eduInputAPI eduServerAPI.EDUServerInputAPI,
|
eduInputAPI eduServerAPI.EDUServerInputAPI,
|
||||||
asAPI appserviceAPI.AppServiceQueryAPI,
|
asAPI appserviceAPI.AppServiceQueryAPI,
|
||||||
|
stateAPI currentstateAPI.CurrentStateInternalAPI,
|
||||||
transactionsCache *transactions.Cache,
|
transactionsCache *transactions.Cache,
|
||||||
fsAPI federationSenderAPI.FederationSenderInternalAPI,
|
fsAPI federationSenderAPI.FederationSenderInternalAPI,
|
||||||
userAPI userapi.UserInternalAPI,
|
userAPI userapi.UserInternalAPI,
|
||||||
|
@ -54,16 +53,9 @@ func AddPublicRoutes(
|
||||||
Topic: string(cfg.Kafka.Topics.OutputClientData),
|
Topic: string(cfg.Kafka.Topics.OutputClientData),
|
||||||
}
|
}
|
||||||
|
|
||||||
roomEventConsumer := consumers.NewOutputRoomEventConsumer(
|
|
||||||
cfg, consumer, accountsDB, rsAPI,
|
|
||||||
)
|
|
||||||
if err := roomEventConsumer.Start(); err != nil {
|
|
||||||
logrus.WithError(err).Panicf("failed to start room server consumer")
|
|
||||||
}
|
|
||||||
|
|
||||||
routing.Setup(
|
routing.Setup(
|
||||||
router, cfg, eduInputAPI, rsAPI, asAPI,
|
router, cfg, eduInputAPI, rsAPI, asAPI,
|
||||||
accountsDB, deviceDB, userAPI, federation,
|
accountsDB, deviceDB, userAPI, federation,
|
||||||
syncProducer, transactionsCache, fsAPI,
|
syncProducer, transactionsCache, fsAPI, stateAPI,
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,92 +0,0 @@
|
||||||
// Copyright 2017 Vector Creations Ltd
|
|
||||||
//
|
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
// you may not use this file except in compliance with the License.
|
|
||||||
// You may obtain a copy of the License at
|
|
||||||
//
|
|
||||||
// http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
//
|
|
||||||
// Unless required by applicable law or agreed to in writing, software
|
|
||||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
// See the License for the specific language governing permissions and
|
|
||||||
// limitations under the License.
|
|
||||||
|
|
||||||
package consumers
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"encoding/json"
|
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/internal"
|
|
||||||
"github.com/matrix-org/dendrite/internal/config"
|
|
||||||
"github.com/matrix-org/dendrite/roomserver/api"
|
|
||||||
"github.com/matrix-org/dendrite/userapi/storage/accounts"
|
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
|
||||||
|
|
||||||
"github.com/Shopify/sarama"
|
|
||||||
log "github.com/sirupsen/logrus"
|
|
||||||
)
|
|
||||||
|
|
||||||
// OutputRoomEventConsumer consumes events that originated in the room server.
|
|
||||||
type OutputRoomEventConsumer struct {
|
|
||||||
rsAPI api.RoomserverInternalAPI
|
|
||||||
rsConsumer *internal.ContinualConsumer
|
|
||||||
db accounts.Database
|
|
||||||
serverName string
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call Start() to begin consuming from room servers.
|
|
||||||
func NewOutputRoomEventConsumer(
|
|
||||||
cfg *config.Dendrite,
|
|
||||||
kafkaConsumer sarama.Consumer,
|
|
||||||
store accounts.Database,
|
|
||||||
rsAPI api.RoomserverInternalAPI,
|
|
||||||
) *OutputRoomEventConsumer {
|
|
||||||
|
|
||||||
consumer := internal.ContinualConsumer{
|
|
||||||
Topic: string(cfg.Kafka.Topics.OutputRoomEvent),
|
|
||||||
Consumer: kafkaConsumer,
|
|
||||||
PartitionStore: store,
|
|
||||||
}
|
|
||||||
s := &OutputRoomEventConsumer{
|
|
||||||
rsConsumer: &consumer,
|
|
||||||
db: store,
|
|
||||||
rsAPI: rsAPI,
|
|
||||||
serverName: string(cfg.Matrix.ServerName),
|
|
||||||
}
|
|
||||||
consumer.ProcessMessage = s.onMessage
|
|
||||||
|
|
||||||
return s
|
|
||||||
}
|
|
||||||
|
|
||||||
// Start consuming from room servers
|
|
||||||
func (s *OutputRoomEventConsumer) Start() error {
|
|
||||||
return s.rsConsumer.Start()
|
|
||||||
}
|
|
||||||
|
|
||||||
// onMessage is called when the sync server receives a new event from the room server output log.
|
|
||||||
// It is not safe for this function to be called from multiple goroutines, or else the
|
|
||||||
// sync stream position may race and be incorrectly calculated.
|
|
||||||
func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
|
|
||||||
// Parse out the event JSON
|
|
||||||
var output api.OutputEvent
|
|
||||||
if err := json.Unmarshal(msg.Value, &output); err != nil {
|
|
||||||
// If the message was invalid, log it and move on to the next message in the stream
|
|
||||||
log.WithError(err).Errorf("roomserver output log: message parse failure")
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
if output.Type != api.OutputTypeNewRoomEvent {
|
|
||||||
log.WithField("type", output.Type).Debug(
|
|
||||||
"roomserver output log: ignoring unknown output type",
|
|
||||||
)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
return s.db.UpdateMemberships(
|
|
||||||
context.TODO(),
|
|
||||||
gomatrixserverlib.UnwrapEventHeaders(output.NewRoomEvent.AddsState()),
|
|
||||||
output.NewRoomEvent.RemovesStateEventIDs,
|
|
||||||
)
|
|
||||||
}
|
|
|
@ -18,9 +18,8 @@ import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/userapi/storage/accounts"
|
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
||||||
|
currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api"
|
||||||
"github.com/matrix-org/dendrite/internal/config"
|
"github.com/matrix-org/dendrite/internal/config"
|
||||||
"github.com/matrix-org/dendrite/roomserver/api"
|
"github.com/matrix-org/dendrite/roomserver/api"
|
||||||
userapi "github.com/matrix-org/dendrite/userapi/api"
|
userapi "github.com/matrix-org/dendrite/userapi/api"
|
||||||
|
@ -95,20 +94,19 @@ func GetMemberships(
|
||||||
func GetJoinedRooms(
|
func GetJoinedRooms(
|
||||||
req *http.Request,
|
req *http.Request,
|
||||||
device *userapi.Device,
|
device *userapi.Device,
|
||||||
accountsDB accounts.Database,
|
stateAPI currentstateAPI.CurrentStateInternalAPI,
|
||||||
) util.JSONResponse {
|
) util.JSONResponse {
|
||||||
localpart, _, err := gomatrixserverlib.SplitID('@', device.UserID)
|
var res currentstateAPI.QueryRoomsForUserResponse
|
||||||
|
err := stateAPI.QueryRoomsForUser(req.Context(), ¤tstateAPI.QueryRoomsForUserRequest{
|
||||||
|
UserID: device.UserID,
|
||||||
|
WantMembership: "join",
|
||||||
|
}, &res)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
util.GetLogger(req.Context()).WithError(err).Error("gomatrixserverlib.SplitID failed")
|
util.GetLogger(req.Context()).WithError(err).Error("QueryRoomsForUser failed")
|
||||||
return jsonerror.InternalServerError()
|
|
||||||
}
|
|
||||||
joinedRooms, err := accountsDB.GetRoomIDsByLocalPart(req.Context(), localpart)
|
|
||||||
if err != nil {
|
|
||||||
util.GetLogger(req.Context()).WithError(err).Error("accountsDB.GetRoomIDsByLocalPart failed")
|
|
||||||
return jsonerror.InternalServerError()
|
return jsonerror.InternalServerError()
|
||||||
}
|
}
|
||||||
return util.JSONResponse{
|
return util.JSONResponse{
|
||||||
Code: http.StatusOK,
|
Code: http.StatusOK,
|
||||||
JSON: getJoinedRoomsResponse{joinedRooms},
|
JSON: getJoinedRoomsResponse{res.RoomIDs},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -23,6 +23,7 @@ import (
|
||||||
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
|
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
|
||||||
"github.com/matrix-org/dendrite/clientapi/httputil"
|
"github.com/matrix-org/dendrite/clientapi/httputil"
|
||||||
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
||||||
|
currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api"
|
||||||
"github.com/matrix-org/dendrite/internal/config"
|
"github.com/matrix-org/dendrite/internal/config"
|
||||||
"github.com/matrix-org/dendrite/internal/eventutil"
|
"github.com/matrix-org/dendrite/internal/eventutil"
|
||||||
"github.com/matrix-org/dendrite/roomserver/api"
|
"github.com/matrix-org/dendrite/roomserver/api"
|
||||||
|
@ -93,8 +94,8 @@ func GetAvatarURL(
|
||||||
// SetAvatarURL implements PUT /profile/{userID}/avatar_url
|
// SetAvatarURL implements PUT /profile/{userID}/avatar_url
|
||||||
// nolint:gocyclo
|
// nolint:gocyclo
|
||||||
func SetAvatarURL(
|
func SetAvatarURL(
|
||||||
req *http.Request, accountDB accounts.Database, device *userapi.Device,
|
req *http.Request, accountDB accounts.Database, stateAPI currentstateAPI.CurrentStateInternalAPI,
|
||||||
userID string, cfg *config.Dendrite, rsAPI api.RoomserverInternalAPI,
|
device *userapi.Device, userID string, cfg *config.Dendrite, rsAPI api.RoomserverInternalAPI,
|
||||||
) util.JSONResponse {
|
) util.JSONResponse {
|
||||||
if userID != device.UserID {
|
if userID != device.UserID {
|
||||||
return util.JSONResponse{
|
return util.JSONResponse{
|
||||||
|
@ -139,9 +140,13 @@ func SetAvatarURL(
|
||||||
return jsonerror.InternalServerError()
|
return jsonerror.InternalServerError()
|
||||||
}
|
}
|
||||||
|
|
||||||
memberships, err := accountDB.GetMembershipsByLocalpart(req.Context(), localpart)
|
var res currentstateAPI.QueryRoomsForUserResponse
|
||||||
|
err = stateAPI.QueryRoomsForUser(req.Context(), ¤tstateAPI.QueryRoomsForUserRequest{
|
||||||
|
UserID: device.UserID,
|
||||||
|
WantMembership: "join",
|
||||||
|
}, &res)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
util.GetLogger(req.Context()).WithError(err).Error("accountDB.GetMembershipsByLocalpart failed")
|
util.GetLogger(req.Context()).WithError(err).Error("QueryRoomsForUser failed")
|
||||||
return jsonerror.InternalServerError()
|
return jsonerror.InternalServerError()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -152,7 +157,7 @@ func SetAvatarURL(
|
||||||
}
|
}
|
||||||
|
|
||||||
events, err := buildMembershipEvents(
|
events, err := buildMembershipEvents(
|
||||||
req.Context(), memberships, newProfile, userID, cfg, evTime, rsAPI,
|
req.Context(), res.RoomIDs, newProfile, userID, cfg, evTime, rsAPI,
|
||||||
)
|
)
|
||||||
switch e := err.(type) {
|
switch e := err.(type) {
|
||||||
case nil:
|
case nil:
|
||||||
|
@ -207,8 +212,8 @@ func GetDisplayName(
|
||||||
// SetDisplayName implements PUT /profile/{userID}/displayname
|
// SetDisplayName implements PUT /profile/{userID}/displayname
|
||||||
// nolint:gocyclo
|
// nolint:gocyclo
|
||||||
func SetDisplayName(
|
func SetDisplayName(
|
||||||
req *http.Request, accountDB accounts.Database, device *userapi.Device,
|
req *http.Request, accountDB accounts.Database, stateAPI currentstateAPI.CurrentStateInternalAPI,
|
||||||
userID string, cfg *config.Dendrite, rsAPI api.RoomserverInternalAPI,
|
device *userapi.Device, userID string, cfg *config.Dendrite, rsAPI api.RoomserverInternalAPI,
|
||||||
) util.JSONResponse {
|
) util.JSONResponse {
|
||||||
if userID != device.UserID {
|
if userID != device.UserID {
|
||||||
return util.JSONResponse{
|
return util.JSONResponse{
|
||||||
|
@ -253,9 +258,13 @@ func SetDisplayName(
|
||||||
return jsonerror.InternalServerError()
|
return jsonerror.InternalServerError()
|
||||||
}
|
}
|
||||||
|
|
||||||
memberships, err := accountDB.GetMembershipsByLocalpart(req.Context(), localpart)
|
var res currentstateAPI.QueryRoomsForUserResponse
|
||||||
|
err = stateAPI.QueryRoomsForUser(req.Context(), ¤tstateAPI.QueryRoomsForUserRequest{
|
||||||
|
UserID: device.UserID,
|
||||||
|
WantMembership: "join",
|
||||||
|
}, &res)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
util.GetLogger(req.Context()).WithError(err).Error("accountDB.GetMembershipsByLocalpart failed")
|
util.GetLogger(req.Context()).WithError(err).Error("QueryRoomsForUser failed")
|
||||||
return jsonerror.InternalServerError()
|
return jsonerror.InternalServerError()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -266,7 +275,7 @@ func SetDisplayName(
|
||||||
}
|
}
|
||||||
|
|
||||||
events, err := buildMembershipEvents(
|
events, err := buildMembershipEvents(
|
||||||
req.Context(), memberships, newProfile, userID, cfg, evTime, rsAPI,
|
req.Context(), res.RoomIDs, newProfile, userID, cfg, evTime, rsAPI,
|
||||||
)
|
)
|
||||||
switch e := err.(type) {
|
switch e := err.(type) {
|
||||||
case nil:
|
case nil:
|
||||||
|
@ -335,14 +344,14 @@ func getProfile(
|
||||||
|
|
||||||
func buildMembershipEvents(
|
func buildMembershipEvents(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
memberships []authtypes.Membership,
|
roomIDs []string,
|
||||||
newProfile authtypes.Profile, userID string, cfg *config.Dendrite,
|
newProfile authtypes.Profile, userID string, cfg *config.Dendrite,
|
||||||
evTime time.Time, rsAPI api.RoomserverInternalAPI,
|
evTime time.Time, rsAPI api.RoomserverInternalAPI,
|
||||||
) ([]gomatrixserverlib.HeaderedEvent, error) {
|
) ([]gomatrixserverlib.HeaderedEvent, error) {
|
||||||
evs := []gomatrixserverlib.HeaderedEvent{}
|
evs := []gomatrixserverlib.HeaderedEvent{}
|
||||||
|
|
||||||
for _, membership := range memberships {
|
for _, roomID := range roomIDs {
|
||||||
verReq := api.QueryRoomVersionForRoomRequest{RoomID: membership.RoomID}
|
verReq := api.QueryRoomVersionForRoomRequest{RoomID: roomID}
|
||||||
verRes := api.QueryRoomVersionForRoomResponse{}
|
verRes := api.QueryRoomVersionForRoomResponse{}
|
||||||
if err := rsAPI.QueryRoomVersionForRoom(ctx, &verReq, &verRes); err != nil {
|
if err := rsAPI.QueryRoomVersionForRoom(ctx, &verReq, &verRes); err != nil {
|
||||||
return []gomatrixserverlib.HeaderedEvent{}, err
|
return []gomatrixserverlib.HeaderedEvent{}, err
|
||||||
|
@ -350,7 +359,7 @@ func buildMembershipEvents(
|
||||||
|
|
||||||
builder := gomatrixserverlib.EventBuilder{
|
builder := gomatrixserverlib.EventBuilder{
|
||||||
Sender: userID,
|
Sender: userID,
|
||||||
RoomID: membership.RoomID,
|
RoomID: roomID,
|
||||||
Type: "m.room.member",
|
Type: "m.room.member",
|
||||||
StateKey: &userID,
|
StateKey: &userID,
|
||||||
}
|
}
|
||||||
|
|
|
@ -23,6 +23,7 @@ import (
|
||||||
appserviceAPI "github.com/matrix-org/dendrite/appservice/api"
|
appserviceAPI "github.com/matrix-org/dendrite/appservice/api"
|
||||||
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
||||||
"github.com/matrix-org/dendrite/clientapi/producers"
|
"github.com/matrix-org/dendrite/clientapi/producers"
|
||||||
|
currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api"
|
||||||
eduServerAPI "github.com/matrix-org/dendrite/eduserver/api"
|
eduServerAPI "github.com/matrix-org/dendrite/eduserver/api"
|
||||||
federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api"
|
federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api"
|
||||||
"github.com/matrix-org/dendrite/internal/config"
|
"github.com/matrix-org/dendrite/internal/config"
|
||||||
|
@ -58,6 +59,7 @@ func Setup(
|
||||||
syncProducer *producers.SyncAPIProducer,
|
syncProducer *producers.SyncAPIProducer,
|
||||||
transactionsCache *transactions.Cache,
|
transactionsCache *transactions.Cache,
|
||||||
federationSender federationSenderAPI.FederationSenderInternalAPI,
|
federationSender federationSenderAPI.FederationSenderInternalAPI,
|
||||||
|
stateAPI currentstateAPI.CurrentStateInternalAPI,
|
||||||
) {
|
) {
|
||||||
|
|
||||||
publicAPIMux.Handle("/client/versions",
|
publicAPIMux.Handle("/client/versions",
|
||||||
|
@ -98,7 +100,7 @@ func Setup(
|
||||||
).Methods(http.MethodPost, http.MethodOptions)
|
).Methods(http.MethodPost, http.MethodOptions)
|
||||||
r0mux.Handle("/joined_rooms",
|
r0mux.Handle("/joined_rooms",
|
||||||
httputil.MakeAuthAPI("joined_rooms", userAPI, func(req *http.Request, device *api.Device) util.JSONResponse {
|
httputil.MakeAuthAPI("joined_rooms", userAPI, func(req *http.Request, device *api.Device) util.JSONResponse {
|
||||||
return GetJoinedRooms(req, device, accountDB)
|
return GetJoinedRooms(req, device, stateAPI)
|
||||||
}),
|
}),
|
||||||
).Methods(http.MethodGet, http.MethodOptions)
|
).Methods(http.MethodGet, http.MethodOptions)
|
||||||
r0mux.Handle("/rooms/{roomID}/join",
|
r0mux.Handle("/rooms/{roomID}/join",
|
||||||
|
@ -307,7 +309,7 @@ func Setup(
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return util.ErrorResponse(err)
|
return util.ErrorResponse(err)
|
||||||
}
|
}
|
||||||
return SendTyping(req, device, vars["roomID"], vars["userID"], accountDB, eduAPI)
|
return SendTyping(req, device, vars["roomID"], vars["userID"], accountDB, eduAPI, stateAPI)
|
||||||
}),
|
}),
|
||||||
).Methods(http.MethodPut, http.MethodOptions)
|
).Methods(http.MethodPut, http.MethodOptions)
|
||||||
|
|
||||||
|
@ -404,7 +406,7 @@ func Setup(
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return util.ErrorResponse(err)
|
return util.ErrorResponse(err)
|
||||||
}
|
}
|
||||||
return SetAvatarURL(req, accountDB, device, vars["userID"], cfg, rsAPI)
|
return SetAvatarURL(req, accountDB, stateAPI, device, vars["userID"], cfg, rsAPI)
|
||||||
}),
|
}),
|
||||||
).Methods(http.MethodPut, http.MethodOptions)
|
).Methods(http.MethodPut, http.MethodOptions)
|
||||||
// Browsers use the OPTIONS HTTP method to check if the CORS policy allows
|
// Browsers use the OPTIONS HTTP method to check if the CORS policy allows
|
||||||
|
@ -426,7 +428,7 @@ func Setup(
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return util.ErrorResponse(err)
|
return util.ErrorResponse(err)
|
||||||
}
|
}
|
||||||
return SetDisplayName(req, accountDB, device, vars["userID"], cfg, rsAPI)
|
return SetDisplayName(req, accountDB, stateAPI, device, vars["userID"], cfg, rsAPI)
|
||||||
}),
|
}),
|
||||||
).Methods(http.MethodPut, http.MethodOptions)
|
).Methods(http.MethodPut, http.MethodOptions)
|
||||||
// Browsers use the OPTIONS HTTP method to check if the CORS policy allows
|
// Browsers use the OPTIONS HTTP method to check if the CORS policy allows
|
||||||
|
|
|
@ -13,15 +13,15 @@
|
||||||
package routing
|
package routing
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"database/sql"
|
|
||||||
"net/http"
|
"net/http"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/clientapi/httputil"
|
"github.com/matrix-org/dendrite/clientapi/httputil"
|
||||||
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
||||||
"github.com/matrix-org/dendrite/clientapi/userutil"
|
currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api"
|
||||||
"github.com/matrix-org/dendrite/eduserver/api"
|
"github.com/matrix-org/dendrite/eduserver/api"
|
||||||
userapi "github.com/matrix-org/dendrite/userapi/api"
|
userapi "github.com/matrix-org/dendrite/userapi/api"
|
||||||
"github.com/matrix-org/dendrite/userapi/storage/accounts"
|
"github.com/matrix-org/dendrite/userapi/storage/accounts"
|
||||||
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
"github.com/matrix-org/util"
|
"github.com/matrix-org/util"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -36,6 +36,7 @@ func SendTyping(
|
||||||
req *http.Request, device *userapi.Device, roomID string,
|
req *http.Request, device *userapi.Device, roomID string,
|
||||||
userID string, accountDB accounts.Database,
|
userID string, accountDB accounts.Database,
|
||||||
eduAPI api.EDUServerInputAPI,
|
eduAPI api.EDUServerInputAPI,
|
||||||
|
stateAPI currentstateAPI.CurrentStateInternalAPI,
|
||||||
) util.JSONResponse {
|
) util.JSONResponse {
|
||||||
if device.UserID != userID {
|
if device.UserID != userID {
|
||||||
return util.JSONResponse{
|
return util.JSONResponse{
|
||||||
|
@ -44,23 +45,38 @@ func SendTyping(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
localpart, err := userutil.ParseUsernameParam(userID, nil)
|
// Verify that the user is a member of this room
|
||||||
|
tuple := gomatrixserverlib.StateKeyTuple{
|
||||||
|
EventType: gomatrixserverlib.MRoomMember,
|
||||||
|
StateKey: userID,
|
||||||
|
}
|
||||||
|
var res currentstateAPI.QueryCurrentStateResponse
|
||||||
|
err := stateAPI.QueryCurrentState(req.Context(), ¤tstateAPI.QueryCurrentStateRequest{
|
||||||
|
RoomID: roomID,
|
||||||
|
StateTuples: []gomatrixserverlib.StateKeyTuple{tuple},
|
||||||
|
}, &res)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
util.GetLogger(req.Context()).WithError(err).Error("userutil.ParseUsernameParam failed")
|
util.GetLogger(req.Context()).WithError(err).Error("QueryCurrentState failed")
|
||||||
return jsonerror.InternalServerError()
|
return jsonerror.InternalServerError()
|
||||||
}
|
}
|
||||||
|
ev := res.StateEvents[tuple]
|
||||||
// Verify that the user is a member of this room
|
if ev == nil {
|
||||||
_, err = accountDB.GetMembershipInRoomByLocalpart(req.Context(), localpart, roomID)
|
|
||||||
if err == sql.ErrNoRows {
|
|
||||||
return util.JSONResponse{
|
return util.JSONResponse{
|
||||||
Code: http.StatusForbidden,
|
Code: http.StatusForbidden,
|
||||||
JSON: jsonerror.Forbidden("User not in this room"),
|
JSON: jsonerror.Forbidden("User not in this room"),
|
||||||
}
|
}
|
||||||
} else if err != nil {
|
}
|
||||||
util.GetLogger(req.Context()).WithError(err).Error("accountDB.GetMembershipInRoomByLocalPart failed")
|
membership, err := ev.Membership()
|
||||||
|
if err != nil {
|
||||||
|
util.GetLogger(req.Context()).WithError(err).Error("Member event isn't valid")
|
||||||
return jsonerror.InternalServerError()
|
return jsonerror.InternalServerError()
|
||||||
}
|
}
|
||||||
|
if membership != gomatrixserverlib.Join {
|
||||||
|
return util.JSONResponse{
|
||||||
|
Code: http.StatusForbidden,
|
||||||
|
JSON: jsonerror.Forbidden("User not in this room"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// parse the incoming http request
|
// parse the incoming http request
|
||||||
var r typingContentJSON
|
var r typingContentJSON
|
||||||
|
|
|
@ -35,10 +35,11 @@ func main() {
|
||||||
fsAPI := base.FederationSenderHTTPClient()
|
fsAPI := base.FederationSenderHTTPClient()
|
||||||
eduInputAPI := base.EDUServerClient()
|
eduInputAPI := base.EDUServerClient()
|
||||||
userAPI := base.UserAPIClient()
|
userAPI := base.UserAPIClient()
|
||||||
|
stateAPI := base.CurrentStateAPIClient()
|
||||||
|
|
||||||
clientapi.AddPublicRoutes(
|
clientapi.AddPublicRoutes(
|
||||||
base.PublicAPIMux, base.Cfg, base.KafkaConsumer, base.KafkaProducer, deviceDB, accountDB, federation,
|
base.PublicAPIMux, base.Cfg, base.KafkaProducer, deviceDB, accountDB, federation,
|
||||||
rsAPI, eduInputAPI, asQuery, transactions.New(), fsAPI, userAPI,
|
rsAPI, eduInputAPI, asQuery, stateAPI, transactions.New(), fsAPI, userAPI,
|
||||||
)
|
)
|
||||||
|
|
||||||
base.SetupAndServeHTTP(string(base.Cfg.Bind.ClientAPI), string(base.Cfg.Listen.ClientAPI))
|
base.SetupAndServeHTTP(string(base.Cfg.Bind.ClientAPI), string(base.Cfg.Listen.ClientAPI))
|
||||||
|
|
|
@ -30,6 +30,7 @@ import (
|
||||||
p2pdisc "github.com/libp2p/go-libp2p/p2p/discovery"
|
p2pdisc "github.com/libp2p/go-libp2p/p2p/discovery"
|
||||||
"github.com/matrix-org/dendrite/appservice"
|
"github.com/matrix-org/dendrite/appservice"
|
||||||
"github.com/matrix-org/dendrite/cmd/dendrite-demo-libp2p/storage"
|
"github.com/matrix-org/dendrite/cmd/dendrite-demo-libp2p/storage"
|
||||||
|
"github.com/matrix-org/dendrite/currentstateserver"
|
||||||
"github.com/matrix-org/dendrite/eduserver"
|
"github.com/matrix-org/dendrite/eduserver"
|
||||||
"github.com/matrix-org/dendrite/federationsender"
|
"github.com/matrix-org/dendrite/federationsender"
|
||||||
"github.com/matrix-org/dendrite/internal/config"
|
"github.com/matrix-org/dendrite/internal/config"
|
||||||
|
@ -166,6 +167,7 @@ func main() {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.WithError(err).Panicf("failed to connect to public rooms db")
|
logrus.WithError(err).Panicf("failed to connect to public rooms db")
|
||||||
}
|
}
|
||||||
|
stateAPI := currentstateserver.NewInternalAPI(base.Base.Cfg, base.Base.KafkaConsumer)
|
||||||
|
|
||||||
monolith := setup.Monolith{
|
monolith := setup.Monolith{
|
||||||
Config: base.Base.Cfg,
|
Config: base.Base.Cfg,
|
||||||
|
@ -182,6 +184,7 @@ func main() {
|
||||||
FederationSenderAPI: fsAPI,
|
FederationSenderAPI: fsAPI,
|
||||||
RoomserverAPI: rsAPI,
|
RoomserverAPI: rsAPI,
|
||||||
ServerKeyAPI: serverKeyAPI,
|
ServerKeyAPI: serverKeyAPI,
|
||||||
|
StateAPI: stateAPI,
|
||||||
UserAPI: userAPI,
|
UserAPI: userAPI,
|
||||||
|
|
||||||
PublicRoomsDB: publicRoomsDB,
|
PublicRoomsDB: publicRoomsDB,
|
||||||
|
|
|
@ -27,6 +27,7 @@ import (
|
||||||
"github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/embed"
|
"github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/embed"
|
||||||
"github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/signing"
|
"github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/signing"
|
||||||
"github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/yggconn"
|
"github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/yggconn"
|
||||||
|
"github.com/matrix-org/dendrite/currentstateserver"
|
||||||
"github.com/matrix-org/dendrite/eduserver"
|
"github.com/matrix-org/dendrite/eduserver"
|
||||||
"github.com/matrix-org/dendrite/eduserver/cache"
|
"github.com/matrix-org/dendrite/eduserver/cache"
|
||||||
"github.com/matrix-org/dendrite/federationsender"
|
"github.com/matrix-org/dendrite/federationsender"
|
||||||
|
@ -115,6 +116,8 @@ func main() {
|
||||||
|
|
||||||
embed.Embed(base.BaseMux, *instancePort, "Yggdrasil Demo")
|
embed.Embed(base.BaseMux, *instancePort, "Yggdrasil Demo")
|
||||||
|
|
||||||
|
stateAPI := currentstateserver.NewInternalAPI(base.Cfg, base.KafkaConsumer)
|
||||||
|
|
||||||
monolith := setup.Monolith{
|
monolith := setup.Monolith{
|
||||||
Config: base.Cfg,
|
Config: base.Cfg,
|
||||||
AccountDB: accountDB,
|
AccountDB: accountDB,
|
||||||
|
@ -130,6 +133,7 @@ func main() {
|
||||||
FederationSenderAPI: fsAPI,
|
FederationSenderAPI: fsAPI,
|
||||||
RoomserverAPI: rsAPI,
|
RoomserverAPI: rsAPI,
|
||||||
UserAPI: userAPI,
|
UserAPI: userAPI,
|
||||||
|
StateAPI: stateAPI,
|
||||||
//ServerKeyAPI: serverKeyAPI,
|
//ServerKeyAPI: serverKeyAPI,
|
||||||
|
|
||||||
PublicRoomsDB: publicRoomsDB,
|
PublicRoomsDB: publicRoomsDB,
|
||||||
|
|
|
@ -20,6 +20,7 @@ import (
|
||||||
"os"
|
"os"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/appservice"
|
"github.com/matrix-org/dendrite/appservice"
|
||||||
|
"github.com/matrix-org/dendrite/currentstateserver"
|
||||||
"github.com/matrix-org/dendrite/eduserver"
|
"github.com/matrix-org/dendrite/eduserver"
|
||||||
"github.com/matrix-org/dendrite/eduserver/cache"
|
"github.com/matrix-org/dendrite/eduserver/cache"
|
||||||
"github.com/matrix-org/dendrite/federationsender"
|
"github.com/matrix-org/dendrite/federationsender"
|
||||||
|
@ -122,6 +123,8 @@ func main() {
|
||||||
logrus.WithError(err).Panicf("failed to connect to public rooms db")
|
logrus.WithError(err).Panicf("failed to connect to public rooms db")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
stateAPI := currentstateserver.NewInternalAPI(base.Cfg, base.KafkaConsumer)
|
||||||
|
|
||||||
monolith := setup.Monolith{
|
monolith := setup.Monolith{
|
||||||
Config: base.Cfg,
|
Config: base.Cfg,
|
||||||
AccountDB: accountDB,
|
AccountDB: accountDB,
|
||||||
|
@ -137,6 +140,7 @@ func main() {
|
||||||
FederationSenderAPI: fsAPI,
|
FederationSenderAPI: fsAPI,
|
||||||
RoomserverAPI: rsAPI,
|
RoomserverAPI: rsAPI,
|
||||||
ServerKeyAPI: serverKeyAPI,
|
ServerKeyAPI: serverKeyAPI,
|
||||||
|
StateAPI: stateAPI,
|
||||||
UserAPI: userAPI,
|
UserAPI: userAPI,
|
||||||
|
|
||||||
PublicRoomsDB: publicRoomsDB,
|
PublicRoomsDB: publicRoomsDB,
|
||||||
|
|
|
@ -22,6 +22,7 @@ import (
|
||||||
"syscall/js"
|
"syscall/js"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/appservice"
|
"github.com/matrix-org/dendrite/appservice"
|
||||||
|
"github.com/matrix-org/dendrite/currentstateserver"
|
||||||
"github.com/matrix-org/dendrite/eduserver"
|
"github.com/matrix-org/dendrite/eduserver"
|
||||||
"github.com/matrix-org/dendrite/eduserver/cache"
|
"github.com/matrix-org/dendrite/eduserver/cache"
|
||||||
"github.com/matrix-org/dendrite/federationsender"
|
"github.com/matrix-org/dendrite/federationsender"
|
||||||
|
@ -218,6 +219,8 @@ func main() {
|
||||||
logrus.WithError(err).Panicf("failed to connect to public rooms db")
|
logrus.WithError(err).Panicf("failed to connect to public rooms db")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
stateAPI := currentstateserver.NewInternalAPI(base.Cfg, base.KafkaConsumer)
|
||||||
|
|
||||||
monolith := setup.Monolith{
|
monolith := setup.Monolith{
|
||||||
Config: base.Cfg,
|
Config: base.Cfg,
|
||||||
AccountDB: accountDB,
|
AccountDB: accountDB,
|
||||||
|
@ -232,6 +235,7 @@ func main() {
|
||||||
EDUInternalAPI: eduInputAPI,
|
EDUInternalAPI: eduInputAPI,
|
||||||
FederationSenderAPI: fedSenderAPI,
|
FederationSenderAPI: fedSenderAPI,
|
||||||
RoomserverAPI: rsAPI,
|
RoomserverAPI: rsAPI,
|
||||||
|
StateAPI: stateAPI,
|
||||||
UserAPI: userAPI,
|
UserAPI: userAPI,
|
||||||
//ServerKeyAPI: serverKeyAPI,
|
//ServerKeyAPI: serverKeyAPI,
|
||||||
|
|
||||||
|
|
|
@ -24,7 +24,21 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
type CurrentStateInternalAPI interface {
|
type CurrentStateInternalAPI interface {
|
||||||
|
// QueryCurrentState retrieves the requested state events. If state events are not found, they will be missing from
|
||||||
|
// the response.
|
||||||
QueryCurrentState(ctx context.Context, req *QueryCurrentStateRequest, res *QueryCurrentStateResponse) error
|
QueryCurrentState(ctx context.Context, req *QueryCurrentStateRequest, res *QueryCurrentStateResponse) error
|
||||||
|
// QueryRoomsForUser retrieves a list of room IDs matching the given query.
|
||||||
|
QueryRoomsForUser(ctx context.Context, req *QueryRoomsForUserRequest, res *QueryRoomsForUserResponse) error
|
||||||
|
}
|
||||||
|
|
||||||
|
type QueryRoomsForUserRequest struct {
|
||||||
|
UserID string
|
||||||
|
// The desired membership of the user. If this is the empty string then no rooms are returned.
|
||||||
|
WantMembership string
|
||||||
|
}
|
||||||
|
|
||||||
|
type QueryRoomsForUserResponse struct {
|
||||||
|
RoomIDs []string
|
||||||
}
|
}
|
||||||
|
|
||||||
type QueryCurrentStateRequest struct {
|
type QueryCurrentStateRequest struct {
|
||||||
|
@ -33,12 +47,12 @@ type QueryCurrentStateRequest struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
type QueryCurrentStateResponse struct {
|
type QueryCurrentStateResponse struct {
|
||||||
StateEvents map[gomatrixserverlib.StateKeyTuple]gomatrixserverlib.HeaderedEvent
|
StateEvents map[gomatrixserverlib.StateKeyTuple]*gomatrixserverlib.HeaderedEvent
|
||||||
}
|
}
|
||||||
|
|
||||||
// MarshalJSON stringifies the StateKeyTuple keys so they can be sent over the wire in HTTP API mode.
|
// MarshalJSON stringifies the StateKeyTuple keys so they can be sent over the wire in HTTP API mode.
|
||||||
func (r *QueryCurrentStateResponse) MarshalJSON() ([]byte, error) {
|
func (r *QueryCurrentStateResponse) MarshalJSON() ([]byte, error) {
|
||||||
se := make(map[string]gomatrixserverlib.HeaderedEvent, len(r.StateEvents))
|
se := make(map[string]*gomatrixserverlib.HeaderedEvent, len(r.StateEvents))
|
||||||
for k, v := range r.StateEvents {
|
for k, v := range r.StateEvents {
|
||||||
// use 0x1F (unit separator) as the delimiter between type/state key,
|
// use 0x1F (unit separator) as the delimiter between type/state key,
|
||||||
se[fmt.Sprintf("%s\x1F%s", k.EventType, k.StateKey)] = v
|
se[fmt.Sprintf("%s\x1F%s", k.EventType, k.StateKey)] = v
|
||||||
|
@ -47,12 +61,12 @@ func (r *QueryCurrentStateResponse) MarshalJSON() ([]byte, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *QueryCurrentStateResponse) UnmarshalJSON(data []byte) error {
|
func (r *QueryCurrentStateResponse) UnmarshalJSON(data []byte) error {
|
||||||
res := make(map[string]gomatrixserverlib.HeaderedEvent)
|
res := make(map[string]*gomatrixserverlib.HeaderedEvent)
|
||||||
err := json.Unmarshal(data, &res)
|
err := json.Unmarshal(data, &res)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
r.StateEvents = make(map[gomatrixserverlib.StateKeyTuple]gomatrixserverlib.HeaderedEvent, len(res))
|
r.StateEvents = make(map[gomatrixserverlib.StateKeyTuple]*gomatrixserverlib.HeaderedEvent, len(res))
|
||||||
for k, v := range res {
|
for k, v := range res {
|
||||||
fields := strings.Split(k, "\x1F")
|
fields := strings.Split(k, "\x1F")
|
||||||
r.StateEvents[gomatrixserverlib.StateKeyTuple{
|
r.StateEvents[gomatrixserverlib.StateKeyTuple{
|
||||||
|
|
|
@ -136,8 +136,8 @@ func TestQueryCurrentState(t *testing.T) {
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
wantRes: api.QueryCurrentStateResponse{
|
wantRes: api.QueryCurrentStateResponse{
|
||||||
StateEvents: map[gomatrixserverlib.StateKeyTuple]gomatrixserverlib.HeaderedEvent{
|
StateEvents: map[gomatrixserverlib.StateKeyTuple]*gomatrixserverlib.HeaderedEvent{
|
||||||
plTuple: plEvent,
|
plTuple: &plEvent,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
|
|
@ -27,15 +27,24 @@ type CurrentStateInternalAPI struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *CurrentStateInternalAPI) QueryCurrentState(ctx context.Context, req *api.QueryCurrentStateRequest, res *api.QueryCurrentStateResponse) error {
|
func (a *CurrentStateInternalAPI) QueryCurrentState(ctx context.Context, req *api.QueryCurrentStateRequest, res *api.QueryCurrentStateResponse) error {
|
||||||
res.StateEvents = make(map[gomatrixserverlib.StateKeyTuple]gomatrixserverlib.HeaderedEvent)
|
res.StateEvents = make(map[gomatrixserverlib.StateKeyTuple]*gomatrixserverlib.HeaderedEvent)
|
||||||
for _, tuple := range req.StateTuples {
|
for _, tuple := range req.StateTuples {
|
||||||
ev, err := a.DB.GetStateEvent(ctx, req.RoomID, tuple.EventType, tuple.StateKey)
|
ev, err := a.DB.GetStateEvent(ctx, req.RoomID, tuple.EventType, tuple.StateKey)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if ev != nil {
|
if ev != nil {
|
||||||
res.StateEvents[tuple] = *ev
|
res.StateEvents[tuple] = ev
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (a *CurrentStateInternalAPI) QueryRoomsForUser(ctx context.Context, req *api.QueryRoomsForUserRequest, res *api.QueryRoomsForUserResponse) error {
|
||||||
|
roomIDs, err := a.DB.GetRoomsByMembership(ctx, req.UserID, req.WantMembership)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
res.RoomIDs = roomIDs
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
|
@ -27,6 +27,7 @@ import (
|
||||||
// HTTP paths for the internal HTTP APIs
|
// HTTP paths for the internal HTTP APIs
|
||||||
const (
|
const (
|
||||||
QueryCurrentStatePath = "/currentstateserver/queryCurrentState"
|
QueryCurrentStatePath = "/currentstateserver/queryCurrentState"
|
||||||
|
QueryRoomsForUserPath = "/currentstateserver/queryRoomsForUser"
|
||||||
)
|
)
|
||||||
|
|
||||||
// NewCurrentStateAPIClient creates a CurrentStateInternalAPI implemented by talking to a HTTP POST API.
|
// NewCurrentStateAPIClient creates a CurrentStateInternalAPI implemented by talking to a HTTP POST API.
|
||||||
|
@ -60,3 +61,15 @@ func (h *httpCurrentStateInternalAPI) QueryCurrentState(
|
||||||
apiURL := h.apiURL + QueryCurrentStatePath
|
apiURL := h.apiURL + QueryCurrentStatePath
|
||||||
return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
|
return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (h *httpCurrentStateInternalAPI) QueryRoomsForUser(
|
||||||
|
ctx context.Context,
|
||||||
|
request *api.QueryRoomsForUserRequest,
|
||||||
|
response *api.QueryRoomsForUserResponse,
|
||||||
|
) error {
|
||||||
|
span, ctx := opentracing.StartSpanFromContext(ctx, "QueryRoomsForUser")
|
||||||
|
defer span.Finish()
|
||||||
|
|
||||||
|
apiURL := h.apiURL + QueryRoomsForUserPath
|
||||||
|
return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
|
||||||
|
}
|
||||||
|
|
|
@ -38,4 +38,17 @@ func AddRoutes(internalAPIMux *mux.Router, intAPI api.CurrentStateInternalAPI) {
|
||||||
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
|
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
|
||||||
}),
|
}),
|
||||||
)
|
)
|
||||||
|
internalAPIMux.Handle(QueryRoomsForUserPath,
|
||||||
|
httputil.MakeInternalAPI("queryRoomsForUser", func(req *http.Request) util.JSONResponse {
|
||||||
|
request := api.QueryRoomsForUserRequest{}
|
||||||
|
response := api.QueryRoomsForUserResponse{}
|
||||||
|
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
|
||||||
|
return util.MessageResponse(http.StatusBadRequest, err.Error())
|
||||||
|
}
|
||||||
|
if err := intAPI.QueryRoomsForUser(req.Context(), &request, &response); err != nil {
|
||||||
|
return util.ErrorResponse(err)
|
||||||
|
}
|
||||||
|
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
|
||||||
|
}),
|
||||||
|
)
|
||||||
}
|
}
|
||||||
|
|
|
@ -29,4 +29,6 @@ type Database interface {
|
||||||
// If no event could be found, returns nil
|
// If no event could be found, returns nil
|
||||||
// If there was an issue during the retrieval, returns an error
|
// If there was an issue during the retrieval, returns an error
|
||||||
GetStateEvent(ctx context.Context, roomID, evType, stateKey string) (*gomatrixserverlib.HeaderedEvent, error)
|
GetStateEvent(ctx context.Context, roomID, evType, stateKey string) (*gomatrixserverlib.HeaderedEvent, error)
|
||||||
|
// GetRoomsByMembership returns a list of room IDs matching the provided membership and user ID (as state_key).
|
||||||
|
GetRoomsByMembership(ctx context.Context, userID, membership string) ([]string, error)
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,6 +18,7 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"database/sql"
|
"database/sql"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"strconv"
|
||||||
|
|
||||||
"github.com/lib/pq"
|
"github.com/lib/pq"
|
||||||
"github.com/matrix-org/dendrite/currentstateserver/storage/tables"
|
"github.com/matrix-org/dendrite/currentstateserver/storage/tables"
|
||||||
|
@ -26,7 +27,9 @@ import (
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
)
|
)
|
||||||
|
|
||||||
const currentRoomStateSchema = `
|
var leaveEnum = strconv.Itoa(tables.MembershipToEnum["leave"])
|
||||||
|
|
||||||
|
var currentRoomStateSchema = `
|
||||||
-- Stores the current room state for every room.
|
-- Stores the current room state for every room.
|
||||||
CREATE TABLE IF NOT EXISTS currentstate_current_room_state (
|
CREATE TABLE IF NOT EXISTS currentstate_current_room_state (
|
||||||
-- The 'room_id' key for the state event.
|
-- The 'room_id' key for the state event.
|
||||||
|
@ -41,22 +44,22 @@ CREATE TABLE IF NOT EXISTS currentstate_current_room_state (
|
||||||
state_key TEXT NOT NULL,
|
state_key TEXT NOT NULL,
|
||||||
-- The JSON for the event. Stored as TEXT because this should be valid UTF-8.
|
-- The JSON for the event. Stored as TEXT because this should be valid UTF-8.
|
||||||
headered_event_json TEXT NOT NULL,
|
headered_event_json TEXT NOT NULL,
|
||||||
-- The 'content.membership' value if this event is an m.room.member event. For other
|
-- The 'content.membership' enum value if this event is an m.room.member event.
|
||||||
-- events, this will be NULL.
|
membership SMALLINT NOT NULL DEFAULT 0,
|
||||||
membership TEXT,
|
|
||||||
-- Clobber based on 3-uple of room_id, type and state_key
|
-- Clobber based on 3-uple of room_id, type and state_key
|
||||||
CONSTRAINT currentstate_current_room_state_unique UNIQUE (room_id, type, state_key)
|
CONSTRAINT currentstate_current_room_state_unique UNIQUE (room_id, type, state_key)
|
||||||
);
|
);
|
||||||
-- for event deletion
|
-- for event deletion
|
||||||
CREATE UNIQUE INDEX IF NOT EXISTS currentstate_event_id_idx ON currentstate_current_room_state(event_id, room_id, type, sender);
|
CREATE UNIQUE INDEX IF NOT EXISTS currentstate_event_id_idx ON currentstate_current_room_state(event_id, room_id, type, sender);
|
||||||
-- for querying membership states of users
|
-- for querying membership states of users
|
||||||
CREATE INDEX IF NOT EXISTS currentstate_membership_idx ON currentstate_current_room_state(type, state_key, membership) WHERE membership IS NOT NULL AND membership != 'leave';
|
CREATE INDEX IF NOT EXISTS currentstate_membership_idx ON currentstate_current_room_state(type, state_key, membership)
|
||||||
|
WHERE membership IS NOT NULL AND membership != ` + leaveEnum + `;
|
||||||
`
|
`
|
||||||
|
|
||||||
const upsertRoomStateSQL = "" +
|
const upsertRoomStateSQL = "" +
|
||||||
"INSERT INTO currentstate_current_room_state (room_id, event_id, type, sender, state_key, headered_event_json, membership)" +
|
"INSERT INTO currentstate_current_room_state (room_id, event_id, type, sender, state_key, headered_event_json, membership)" +
|
||||||
" VALUES ($1, $2, $3, $4, $5, $6, $7)" +
|
" VALUES ($1, $2, $3, $4, $5, $6, $7)" +
|
||||||
" ON CONFLICT ON CONSTRAINT currentstate_room_state_unique" +
|
" ON CONFLICT ON CONSTRAINT currentstate_current_room_state_unique" +
|
||||||
" DO UPDATE SET event_id = $2, sender=$4, headered_event_json = $6, membership = $7"
|
" DO UPDATE SET event_id = $2, sender=$4, headered_event_json = $6, membership = $7"
|
||||||
|
|
||||||
const deleteRoomStateByEventIDSQL = "" +
|
const deleteRoomStateByEventIDSQL = "" +
|
||||||
|
@ -108,10 +111,10 @@ func (s *currentRoomStateStatements) SelectRoomIDsWithMembership(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
txn *sql.Tx,
|
txn *sql.Tx,
|
||||||
userID string,
|
userID string,
|
||||||
membership string,
|
membershipEnum int,
|
||||||
) ([]string, error) {
|
) ([]string, error) {
|
||||||
stmt := sqlutil.TxStmt(txn, s.selectRoomIDsWithMembershipStmt)
|
stmt := sqlutil.TxStmt(txn, s.selectRoomIDsWithMembershipStmt)
|
||||||
rows, err := stmt.QueryContext(ctx, userID, membership)
|
rows, err := stmt.QueryContext(ctx, userID, membershipEnum)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -138,7 +141,7 @@ func (s *currentRoomStateStatements) DeleteRoomStateByEventID(
|
||||||
|
|
||||||
func (s *currentRoomStateStatements) UpsertRoomState(
|
func (s *currentRoomStateStatements) UpsertRoomState(
|
||||||
ctx context.Context, txn *sql.Tx,
|
ctx context.Context, txn *sql.Tx,
|
||||||
event gomatrixserverlib.HeaderedEvent, membership *string,
|
event gomatrixserverlib.HeaderedEvent, membershipEnum int,
|
||||||
) error {
|
) error {
|
||||||
headeredJSON, err := json.Marshal(event)
|
headeredJSON, err := json.Marshal(event)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -155,7 +158,7 @@ func (s *currentRoomStateStatements) UpsertRoomState(
|
||||||
event.Sender(),
|
event.Sender(),
|
||||||
*event.StateKey(),
|
*event.StateKey(),
|
||||||
headeredJSON,
|
headeredJSON,
|
||||||
membership,
|
membershipEnum,
|
||||||
)
|
)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,6 +17,7 @@ package shared
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"database/sql"
|
"database/sql"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/currentstateserver/storage/tables"
|
"github.com/matrix-org/dendrite/currentstateserver/storage/tables"
|
||||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||||
|
@ -47,19 +48,31 @@ func (d *Database) StoreStateEvents(ctx context.Context, addStateEvents []gomatr
|
||||||
// ignore non state events
|
// ignore non state events
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
var membership *string
|
var membershipEnum int
|
||||||
if event.Type() == "m.room.member" {
|
if event.Type() == "m.room.member" {
|
||||||
value, err := event.Membership()
|
membership, err := event.Membership()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
membership = &value
|
enum, ok := tables.MembershipToEnum[membership]
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("unknown membership: %s", membership)
|
||||||
|
}
|
||||||
|
membershipEnum = enum
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := d.CurrentRoomState.UpsertRoomState(ctx, txn, event, membership); err != nil {
|
if err := d.CurrentRoomState.UpsertRoomState(ctx, txn, event, membershipEnum); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (d *Database) GetRoomsByMembership(ctx context.Context, userID, membership string) ([]string, error) {
|
||||||
|
enum, ok := tables.MembershipToEnum[membership]
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("unknown membership: %s", membership)
|
||||||
|
}
|
||||||
|
return d.CurrentRoomState.SelectRoomIDsWithMembership(ctx, nil, userID, enum)
|
||||||
|
}
|
||||||
|
|
|
@ -35,7 +35,7 @@ CREATE TABLE IF NOT EXISTS currentstate_current_room_state (
|
||||||
sender TEXT NOT NULL,
|
sender TEXT NOT NULL,
|
||||||
state_key TEXT NOT NULL,
|
state_key TEXT NOT NULL,
|
||||||
headered_event_json TEXT NOT NULL,
|
headered_event_json TEXT NOT NULL,
|
||||||
membership TEXT,
|
membership INTEGER NOT NULL DEFAULT 0,
|
||||||
UNIQUE (room_id, type, state_key)
|
UNIQUE (room_id, type, state_key)
|
||||||
);
|
);
|
||||||
-- for event deletion
|
-- for event deletion
|
||||||
|
@ -100,10 +100,10 @@ func (s *currentRoomStateStatements) SelectRoomIDsWithMembership(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
txn *sql.Tx,
|
txn *sql.Tx,
|
||||||
userID string,
|
userID string,
|
||||||
membership string, // nolint: unparam
|
membershipEnum int,
|
||||||
) ([]string, error) {
|
) ([]string, error) {
|
||||||
stmt := sqlutil.TxStmt(txn, s.selectRoomIDsWithMembershipStmt)
|
stmt := sqlutil.TxStmt(txn, s.selectRoomIDsWithMembershipStmt)
|
||||||
rows, err := stmt.QueryContext(ctx, userID, membership)
|
rows, err := stmt.QueryContext(ctx, userID, membershipEnum)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -130,7 +130,7 @@ func (s *currentRoomStateStatements) DeleteRoomStateByEventID(
|
||||||
|
|
||||||
func (s *currentRoomStateStatements) UpsertRoomState(
|
func (s *currentRoomStateStatements) UpsertRoomState(
|
||||||
ctx context.Context, txn *sql.Tx,
|
ctx context.Context, txn *sql.Tx,
|
||||||
event gomatrixserverlib.HeaderedEvent, membership *string,
|
event gomatrixserverlib.HeaderedEvent, membershipEnum int,
|
||||||
) error {
|
) error {
|
||||||
headeredJSON, err := json.Marshal(event)
|
headeredJSON, err := json.Marshal(event)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -147,7 +147,7 @@ func (s *currentRoomStateStatements) UpsertRoomState(
|
||||||
event.Sender(),
|
event.Sender(),
|
||||||
*event.StateKey(),
|
*event.StateKey(),
|
||||||
headeredJSON,
|
headeredJSON,
|
||||||
membership,
|
membershipEnum,
|
||||||
)
|
)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,11 +21,24 @@ import (
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var MembershipToEnum = map[string]int{
|
||||||
|
gomatrixserverlib.Invite: 1,
|
||||||
|
gomatrixserverlib.Join: 2,
|
||||||
|
gomatrixserverlib.Leave: 3,
|
||||||
|
gomatrixserverlib.Ban: 4,
|
||||||
|
}
|
||||||
|
var EnumToMembership = map[int]string{
|
||||||
|
1: gomatrixserverlib.Invite,
|
||||||
|
2: gomatrixserverlib.Join,
|
||||||
|
3: gomatrixserverlib.Leave,
|
||||||
|
4: gomatrixserverlib.Ban,
|
||||||
|
}
|
||||||
|
|
||||||
type CurrentRoomState interface {
|
type CurrentRoomState interface {
|
||||||
SelectStateEvent(ctx context.Context, roomID, evType, stateKey string) (*gomatrixserverlib.HeaderedEvent, error)
|
SelectStateEvent(ctx context.Context, roomID, evType, stateKey string) (*gomatrixserverlib.HeaderedEvent, error)
|
||||||
SelectEventsWithEventIDs(ctx context.Context, txn *sql.Tx, eventIDs []string) ([]gomatrixserverlib.HeaderedEvent, error)
|
SelectEventsWithEventIDs(ctx context.Context, txn *sql.Tx, eventIDs []string) ([]gomatrixserverlib.HeaderedEvent, error)
|
||||||
UpsertRoomState(ctx context.Context, txn *sql.Tx, event gomatrixserverlib.HeaderedEvent, membership *string) error
|
UpsertRoomState(ctx context.Context, txn *sql.Tx, event gomatrixserverlib.HeaderedEvent, membershipEnum int) error
|
||||||
DeleteRoomStateByEventID(ctx context.Context, txn *sql.Tx, eventID string) error
|
DeleteRoomStateByEventID(ctx context.Context, txn *sql.Tx, eventID string) error
|
||||||
// SelectRoomIDsWithMembership returns the list of room IDs which have the given user in the given membership state.
|
// SelectRoomIDsWithMembership returns the list of room IDs which have the given user in the given membership state.
|
||||||
SelectRoomIDsWithMembership(ctx context.Context, txn *sql.Tx, userID string, membership string) ([]string, error)
|
SelectRoomIDsWithMembership(ctx context.Context, txn *sql.Tx, userID string, membershipEnum int) ([]string, error)
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,6 +22,7 @@ import (
|
||||||
"net/url"
|
"net/url"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api"
|
||||||
"github.com/matrix-org/dendrite/internal/caching"
|
"github.com/matrix-org/dendrite/internal/caching"
|
||||||
"github.com/matrix-org/dendrite/internal/httputil"
|
"github.com/matrix-org/dendrite/internal/httputil"
|
||||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||||
|
@ -37,6 +38,7 @@ import (
|
||||||
|
|
||||||
appserviceAPI "github.com/matrix-org/dendrite/appservice/api"
|
appserviceAPI "github.com/matrix-org/dendrite/appservice/api"
|
||||||
asinthttp "github.com/matrix-org/dendrite/appservice/inthttp"
|
asinthttp "github.com/matrix-org/dendrite/appservice/inthttp"
|
||||||
|
currentstateinthttp "github.com/matrix-org/dendrite/currentstateserver/inthttp"
|
||||||
eduServerAPI "github.com/matrix-org/dendrite/eduserver/api"
|
eduServerAPI "github.com/matrix-org/dendrite/eduserver/api"
|
||||||
eduinthttp "github.com/matrix-org/dendrite/eduserver/inthttp"
|
eduinthttp "github.com/matrix-org/dendrite/eduserver/inthttp"
|
||||||
federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api"
|
federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api"
|
||||||
|
@ -171,6 +173,15 @@ func (b *BaseDendrite) UserAPIClient() userapi.UserInternalAPI {
|
||||||
return userAPI
|
return userAPI
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// CurrentStateAPIClient returns CurrentStateInternalAPI for hitting the currentstateserver over HTTP.
|
||||||
|
func (b *BaseDendrite) CurrentStateAPIClient() currentstateAPI.CurrentStateInternalAPI {
|
||||||
|
stateAPI, err := currentstateinthttp.NewCurrentStateAPIClient(b.Cfg.CurrentStateAPIURL(), b.httpClient)
|
||||||
|
if err != nil {
|
||||||
|
logrus.WithError(err).Panic("UserAPIClient failed", b.httpClient)
|
||||||
|
}
|
||||||
|
return stateAPI
|
||||||
|
}
|
||||||
|
|
||||||
// EDUServerClient returns EDUServerInputAPI for hitting the EDU server over HTTP
|
// EDUServerClient returns EDUServerInputAPI for hitting the EDU server over HTTP
|
||||||
func (b *BaseDendrite) EDUServerClient() eduServerAPI.EDUServerInputAPI {
|
func (b *BaseDendrite) EDUServerClient() eduServerAPI.EDUServerInputAPI {
|
||||||
e, err := eduinthttp.NewEDUServerClient(b.Cfg.EDUServerURL(), b.httpClient)
|
e, err := eduinthttp.NewEDUServerClient(b.Cfg.EDUServerURL(), b.httpClient)
|
||||||
|
|
|
@ -19,6 +19,7 @@ import (
|
||||||
"github.com/gorilla/mux"
|
"github.com/gorilla/mux"
|
||||||
appserviceAPI "github.com/matrix-org/dendrite/appservice/api"
|
appserviceAPI "github.com/matrix-org/dendrite/appservice/api"
|
||||||
"github.com/matrix-org/dendrite/clientapi"
|
"github.com/matrix-org/dendrite/clientapi"
|
||||||
|
currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api"
|
||||||
eduServerAPI "github.com/matrix-org/dendrite/eduserver/api"
|
eduServerAPI "github.com/matrix-org/dendrite/eduserver/api"
|
||||||
"github.com/matrix-org/dendrite/federationapi"
|
"github.com/matrix-org/dendrite/federationapi"
|
||||||
federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api"
|
federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api"
|
||||||
|
@ -56,6 +57,7 @@ type Monolith struct {
|
||||||
RoomserverAPI roomserverAPI.RoomserverInternalAPI
|
RoomserverAPI roomserverAPI.RoomserverInternalAPI
|
||||||
ServerKeyAPI serverKeyAPI.ServerKeyInternalAPI
|
ServerKeyAPI serverKeyAPI.ServerKeyInternalAPI
|
||||||
UserAPI userapi.UserInternalAPI
|
UserAPI userapi.UserInternalAPI
|
||||||
|
StateAPI currentstateAPI.CurrentStateInternalAPI
|
||||||
|
|
||||||
// TODO: can we remove this? It's weird that we are required the database
|
// TODO: can we remove this? It's weird that we are required the database
|
||||||
// yet every other component can do that on its own. libp2p-demo uses a custom
|
// yet every other component can do that on its own. libp2p-demo uses a custom
|
||||||
|
@ -69,9 +71,9 @@ type Monolith struct {
|
||||||
// AddAllPublicRoutes attaches all public paths to the given router
|
// AddAllPublicRoutes attaches all public paths to the given router
|
||||||
func (m *Monolith) AddAllPublicRoutes(publicMux *mux.Router) {
|
func (m *Monolith) AddAllPublicRoutes(publicMux *mux.Router) {
|
||||||
clientapi.AddPublicRoutes(
|
clientapi.AddPublicRoutes(
|
||||||
publicMux, m.Config, m.KafkaConsumer, m.KafkaProducer, m.DeviceDB, m.AccountDB,
|
publicMux, m.Config, m.KafkaProducer, m.DeviceDB, m.AccountDB,
|
||||||
m.FedClient, m.RoomserverAPI,
|
m.FedClient, m.RoomserverAPI,
|
||||||
m.EDUInternalAPI, m.AppserviceAPI, transactions.New(),
|
m.EDUInternalAPI, m.AppserviceAPI, m.StateAPI, transactions.New(),
|
||||||
m.FederationSenderAPI, m.UserAPI,
|
m.FederationSenderAPI, m.UserAPI,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
@ -22,7 +22,6 @@ import (
|
||||||
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
|
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
|
||||||
"github.com/matrix-org/dendrite/internal"
|
"github.com/matrix-org/dendrite/internal"
|
||||||
"github.com/matrix-org/dendrite/userapi/api"
|
"github.com/matrix-org/dendrite/userapi/api"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type Database interface {
|
type Database interface {
|
||||||
|
@ -36,10 +35,6 @@ type Database interface {
|
||||||
// account already exists, it will return nil, ErrUserExists.
|
// account already exists, it will return nil, ErrUserExists.
|
||||||
CreateAccount(ctx context.Context, localpart, plaintextPassword, appserviceID string) (*api.Account, error)
|
CreateAccount(ctx context.Context, localpart, plaintextPassword, appserviceID string) (*api.Account, error)
|
||||||
CreateGuestAccount(ctx context.Context) (*api.Account, error)
|
CreateGuestAccount(ctx context.Context) (*api.Account, error)
|
||||||
UpdateMemberships(ctx context.Context, eventsToAdd []gomatrixserverlib.Event, idsToRemove []string) error
|
|
||||||
GetMembershipInRoomByLocalpart(ctx context.Context, localpart, roomID string) (authtypes.Membership, error)
|
|
||||||
GetRoomIDsByLocalPart(ctx context.Context, localpart string) ([]string, error)
|
|
||||||
GetMembershipsByLocalpart(ctx context.Context, localpart string) (memberships []authtypes.Membership, err error)
|
|
||||||
SaveAccountData(ctx context.Context, localpart, roomID, dataType string, content json.RawMessage) error
|
SaveAccountData(ctx context.Context, localpart, roomID, dataType string, content json.RawMessage) error
|
||||||
GetAccountData(ctx context.Context, localpart string) (global map[string]json.RawMessage, rooms map[string]map[string]json.RawMessage, err error)
|
GetAccountData(ctx context.Context, localpart string) (global map[string]json.RawMessage, rooms map[string]map[string]json.RawMessage, err error)
|
||||||
// GetAccountDataByType returns account data matching a given
|
// GetAccountDataByType returns account data matching a given
|
||||||
|
|
|
@ -1,159 +0,0 @@
|
||||||
// Copyright 2017 Vector Creations Ltd
|
|
||||||
//
|
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
// you may not use this file except in compliance with the License.
|
|
||||||
// You may obtain a copy of the License at
|
|
||||||
//
|
|
||||||
// http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
//
|
|
||||||
// Unless required by applicable law or agreed to in writing, software
|
|
||||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
// See the License for the specific language governing permissions and
|
|
||||||
// limitations under the License.
|
|
||||||
|
|
||||||
package postgres
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"database/sql"
|
|
||||||
|
|
||||||
"github.com/lib/pq"
|
|
||||||
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
|
|
||||||
"github.com/matrix-org/dendrite/internal"
|
|
||||||
)
|
|
||||||
|
|
||||||
const membershipSchema = `
|
|
||||||
-- Stores data about users memberships to rooms.
|
|
||||||
CREATE TABLE IF NOT EXISTS account_memberships (
|
|
||||||
-- The Matrix user ID localpart for the member
|
|
||||||
localpart TEXT NOT NULL,
|
|
||||||
-- The room this user is a member of
|
|
||||||
room_id TEXT NOT NULL,
|
|
||||||
-- The ID of the join membership event
|
|
||||||
event_id TEXT NOT NULL,
|
|
||||||
|
|
||||||
-- A user can only be member of a room once
|
|
||||||
PRIMARY KEY (localpart, room_id)
|
|
||||||
);
|
|
||||||
|
|
||||||
-- Use index to process deletion by ID more efficiently
|
|
||||||
CREATE UNIQUE INDEX IF NOT EXISTS account_membership_event_id ON account_memberships(event_id);
|
|
||||||
`
|
|
||||||
|
|
||||||
const insertMembershipSQL = `
|
|
||||||
INSERT INTO account_memberships(localpart, room_id, event_id) VALUES ($1, $2, $3)
|
|
||||||
ON CONFLICT (localpart, room_id) DO UPDATE SET event_id = EXCLUDED.event_id
|
|
||||||
`
|
|
||||||
|
|
||||||
const selectMembershipsByLocalpartSQL = "" +
|
|
||||||
"SELECT room_id, event_id FROM account_memberships WHERE localpart = $1"
|
|
||||||
|
|
||||||
const selectMembershipInRoomByLocalpartSQL = "" +
|
|
||||||
"SELECT event_id FROM account_memberships WHERE localpart = $1 AND room_id = $2"
|
|
||||||
|
|
||||||
const selectRoomIDsByLocalPartSQL = "" +
|
|
||||||
"SELECT room_id FROM account_memberships WHERE localpart = $1"
|
|
||||||
|
|
||||||
const deleteMembershipsByEventIDsSQL = "" +
|
|
||||||
"DELETE FROM account_memberships WHERE event_id = ANY($1)"
|
|
||||||
|
|
||||||
type membershipStatements struct {
|
|
||||||
deleteMembershipsByEventIDsStmt *sql.Stmt
|
|
||||||
insertMembershipStmt *sql.Stmt
|
|
||||||
selectMembershipInRoomByLocalpartStmt *sql.Stmt
|
|
||||||
selectMembershipsByLocalpartStmt *sql.Stmt
|
|
||||||
selectRoomIDsByLocalPartStmt *sql.Stmt
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *membershipStatements) prepare(db *sql.DB) (err error) {
|
|
||||||
_, err = db.Exec(membershipSchema)
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if s.deleteMembershipsByEventIDsStmt, err = db.Prepare(deleteMembershipsByEventIDsSQL); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if s.insertMembershipStmt, err = db.Prepare(insertMembershipSQL); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if s.selectMembershipInRoomByLocalpartStmt, err = db.Prepare(selectMembershipInRoomByLocalpartSQL); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if s.selectMembershipsByLocalpartStmt, err = db.Prepare(selectMembershipsByLocalpartSQL); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if s.selectRoomIDsByLocalPartStmt, err = db.Prepare(selectRoomIDsByLocalPartSQL); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *membershipStatements) insertMembership(
|
|
||||||
ctx context.Context, txn *sql.Tx, localpart, roomID, eventID string,
|
|
||||||
) (err error) {
|
|
||||||
stmt := txn.Stmt(s.insertMembershipStmt)
|
|
||||||
_, err = stmt.ExecContext(ctx, localpart, roomID, eventID)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *membershipStatements) deleteMembershipsByEventIDs(
|
|
||||||
ctx context.Context, txn *sql.Tx, eventIDs []string,
|
|
||||||
) (err error) {
|
|
||||||
stmt := txn.Stmt(s.deleteMembershipsByEventIDsStmt)
|
|
||||||
_, err = stmt.ExecContext(ctx, pq.StringArray(eventIDs))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *membershipStatements) selectMembershipInRoomByLocalpart(
|
|
||||||
ctx context.Context, localpart, roomID string,
|
|
||||||
) (authtypes.Membership, error) {
|
|
||||||
membership := authtypes.Membership{Localpart: localpart, RoomID: roomID}
|
|
||||||
stmt := s.selectMembershipInRoomByLocalpartStmt
|
|
||||||
err := stmt.QueryRowContext(ctx, localpart, roomID).Scan(&membership.EventID)
|
|
||||||
|
|
||||||
return membership, err
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *membershipStatements) selectMembershipsByLocalpart(
|
|
||||||
ctx context.Context, localpart string,
|
|
||||||
) (memberships []authtypes.Membership, err error) {
|
|
||||||
stmt := s.selectMembershipsByLocalpartStmt
|
|
||||||
rows, err := stmt.QueryContext(ctx, localpart)
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
memberships = []authtypes.Membership{}
|
|
||||||
|
|
||||||
defer internal.CloseAndLogIfError(ctx, rows, "selectMembershipsByLocalpart: rows.close() failed")
|
|
||||||
for rows.Next() {
|
|
||||||
var m authtypes.Membership
|
|
||||||
m.Localpart = localpart
|
|
||||||
if err = rows.Scan(&m.RoomID, &m.EventID); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
memberships = append(memberships, m)
|
|
||||||
}
|
|
||||||
return memberships, rows.Err()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *membershipStatements) selectRoomIDsByLocalPart(
|
|
||||||
ctx context.Context, localPart string,
|
|
||||||
) ([]string, error) {
|
|
||||||
stmt := s.selectRoomIDsByLocalPartStmt
|
|
||||||
rows, err := stmt.QueryContext(ctx, localPart)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
roomIDs := []string{}
|
|
||||||
defer rows.Close() // nolint: errcheck
|
|
||||||
for rows.Next() {
|
|
||||||
var roomID string
|
|
||||||
if err = rows.Scan(&roomID); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
roomIDs = append(roomIDs, roomID)
|
|
||||||
}
|
|
||||||
return roomIDs, rows.Err()
|
|
||||||
}
|
|
|
@ -37,7 +37,6 @@ type Database struct {
|
||||||
sqlutil.PartitionOffsetStatements
|
sqlutil.PartitionOffsetStatements
|
||||||
accounts accountsStatements
|
accounts accountsStatements
|
||||||
profiles profilesStatements
|
profiles profilesStatements
|
||||||
memberships membershipStatements
|
|
||||||
accountDatas accountDataStatements
|
accountDatas accountDataStatements
|
||||||
threepids threepidStatements
|
threepids threepidStatements
|
||||||
serverName gomatrixserverlib.ServerName
|
serverName gomatrixserverlib.ServerName
|
||||||
|
@ -62,10 +61,6 @@ func NewDatabase(dataSourceName string, dbProperties sqlutil.DbProperties, serve
|
||||||
if err = p.prepare(db); err != nil {
|
if err = p.prepare(db); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
m := membershipStatements{}
|
|
||||||
if err = m.prepare(db); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
ac := accountDataStatements{}
|
ac := accountDataStatements{}
|
||||||
if err = ac.prepare(db); err != nil {
|
if err = ac.prepare(db); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -74,7 +69,7 @@ func NewDatabase(dataSourceName string, dbProperties sqlutil.DbProperties, serve
|
||||||
if err = t.prepare(db); err != nil {
|
if err = t.prepare(db); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return &Database{db, partitions, a, p, m, ac, t, serverName}, nil
|
return &Database{db, partitions, a, p, ac, t, serverName}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetAccountByPassword returns the account associated with the given localpart and password.
|
// GetAccountByPassword returns the account associated with the given localpart and password.
|
||||||
|
@ -179,112 +174,6 @@ func (d *Database) createAccount(
|
||||||
return d.accounts.insertAccount(ctx, txn, localpart, hash, appserviceID)
|
return d.accounts.insertAccount(ctx, txn, localpart, hash, appserviceID)
|
||||||
}
|
}
|
||||||
|
|
||||||
// SaveMembership saves the user matching a given localpart as a member of a given
|
|
||||||
// room. It also stores the ID of the membership event.
|
|
||||||
// If a membership already exists between the user and the room, or if the
|
|
||||||
// insert fails, returns the SQL error
|
|
||||||
func (d *Database) saveMembership(
|
|
||||||
ctx context.Context, txn *sql.Tx, localpart, roomID, eventID string,
|
|
||||||
) error {
|
|
||||||
return d.memberships.insertMembership(ctx, txn, localpart, roomID, eventID)
|
|
||||||
}
|
|
||||||
|
|
||||||
// removeMembershipsByEventIDs removes the memberships corresponding to the
|
|
||||||
// `join` membership events IDs in the eventIDs slice.
|
|
||||||
// If the removal fails, or if there is no membership to remove, returns an error
|
|
||||||
func (d *Database) removeMembershipsByEventIDs(
|
|
||||||
ctx context.Context, txn *sql.Tx, eventIDs []string,
|
|
||||||
) error {
|
|
||||||
return d.memberships.deleteMembershipsByEventIDs(ctx, txn, eventIDs)
|
|
||||||
}
|
|
||||||
|
|
||||||
// UpdateMemberships adds the "join" membership events included in a given state
|
|
||||||
// events array, and removes those which ID is included in a given array of events
|
|
||||||
// IDs. All of the process is run in a transaction, which commits only once/if every
|
|
||||||
// insertion and deletion has been successfully processed.
|
|
||||||
// Returns a SQL error if there was an issue with any part of the process
|
|
||||||
func (d *Database) UpdateMemberships(
|
|
||||||
ctx context.Context, eventsToAdd []gomatrixserverlib.Event, idsToRemove []string,
|
|
||||||
) error {
|
|
||||||
return sqlutil.WithTransaction(d.db, func(txn *sql.Tx) error {
|
|
||||||
if err := d.removeMembershipsByEventIDs(ctx, txn, idsToRemove); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, event := range eventsToAdd {
|
|
||||||
if err := d.newMembership(ctx, txn, event); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetMembershipInRoomByLocalpart returns the membership for an user
|
|
||||||
// matching the given localpart if he is a member of the room matching roomID,
|
|
||||||
// if not sql.ErrNoRows is returned.
|
|
||||||
// If there was an issue during the retrieval, returns the SQL error
|
|
||||||
func (d *Database) GetMembershipInRoomByLocalpart(
|
|
||||||
ctx context.Context, localpart, roomID string,
|
|
||||||
) (authtypes.Membership, error) {
|
|
||||||
return d.memberships.selectMembershipInRoomByLocalpart(ctx, localpart, roomID)
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetRoomIDsByLocalPart returns an array containing the room ids of all
|
|
||||||
// the rooms a user matching a given localpart is a member of
|
|
||||||
// If no membership match the given localpart, returns an empty array
|
|
||||||
// If there was an issue during the retrieval, returns the SQL error
|
|
||||||
func (d *Database) GetRoomIDsByLocalPart(
|
|
||||||
ctx context.Context, localpart string,
|
|
||||||
) ([]string, error) {
|
|
||||||
return d.memberships.selectRoomIDsByLocalPart(ctx, localpart)
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetMembershipsByLocalpart returns an array containing the memberships for all
|
|
||||||
// the rooms a user matching a given localpart is a member of
|
|
||||||
// If no membership match the given localpart, returns an empty array
|
|
||||||
// If there was an issue during the retrieval, returns the SQL error
|
|
||||||
func (d *Database) GetMembershipsByLocalpart(
|
|
||||||
ctx context.Context, localpart string,
|
|
||||||
) (memberships []authtypes.Membership, err error) {
|
|
||||||
return d.memberships.selectMembershipsByLocalpart(ctx, localpart)
|
|
||||||
}
|
|
||||||
|
|
||||||
// newMembership saves a new membership in the database.
|
|
||||||
// If the event isn't a valid m.room.member event with type `join`, does nothing.
|
|
||||||
// If an error occurred, returns the SQL error
|
|
||||||
func (d *Database) newMembership(
|
|
||||||
ctx context.Context, txn *sql.Tx, ev gomatrixserverlib.Event,
|
|
||||||
) error {
|
|
||||||
if ev.Type() == "m.room.member" && ev.StateKey() != nil {
|
|
||||||
localpart, serverName, err := gomatrixserverlib.SplitID('@', *ev.StateKey())
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// We only want state events from local users
|
|
||||||
if string(serverName) != string(d.serverName) {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
eventID := ev.EventID()
|
|
||||||
roomID := ev.RoomID()
|
|
||||||
membership, err := ev.Membership()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Only "join" membership events can be considered as new memberships
|
|
||||||
if membership == gomatrixserverlib.Join {
|
|
||||||
if err := d.saveMembership(ctx, txn, localpart, roomID, eventID); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// SaveAccountData saves new account data for a given user and a given room.
|
// SaveAccountData saves new account data for a given user and a given room.
|
||||||
// If the account data is not specific to a room, the room ID should be an empty string
|
// If the account data is not specific to a room, the room ID should be an empty string
|
||||||
// If an account data already exists for a given set (user, room, data type), it will
|
// If an account data already exists for a given set (user, room, data type), it will
|
||||||
|
|
|
@ -1,159 +0,0 @@
|
||||||
// Copyright 2017 Vector Creations Ltd
|
|
||||||
//
|
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
// you may not use this file except in compliance with the License.
|
|
||||||
// You may obtain a copy of the License at
|
|
||||||
//
|
|
||||||
// http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
//
|
|
||||||
// Unless required by applicable law or agreed to in writing, software
|
|
||||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
// See the License for the specific language governing permissions and
|
|
||||||
// limitations under the License.
|
|
||||||
|
|
||||||
package sqlite3
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"database/sql"
|
|
||||||
"strings"
|
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
|
|
||||||
"github.com/matrix-org/dendrite/internal"
|
|
||||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
|
||||||
)
|
|
||||||
|
|
||||||
const membershipSchema = `
|
|
||||||
-- Stores data about users memberships to rooms.
|
|
||||||
CREATE TABLE IF NOT EXISTS account_memberships (
|
|
||||||
-- The Matrix user ID localpart for the member
|
|
||||||
localpart TEXT NOT NULL,
|
|
||||||
-- The room this user is a member of
|
|
||||||
room_id TEXT NOT NULL,
|
|
||||||
-- The ID of the join membership event
|
|
||||||
event_id TEXT NOT NULL,
|
|
||||||
|
|
||||||
-- A user can only be member of a room once
|
|
||||||
PRIMARY KEY (localpart, room_id),
|
|
||||||
|
|
||||||
UNIQUE (event_id)
|
|
||||||
);
|
|
||||||
`
|
|
||||||
|
|
||||||
const insertMembershipSQL = `
|
|
||||||
INSERT INTO account_memberships(localpart, room_id, event_id) VALUES ($1, $2, $3)
|
|
||||||
ON CONFLICT (localpart, room_id) DO UPDATE SET event_id = EXCLUDED.event_id
|
|
||||||
`
|
|
||||||
|
|
||||||
const selectMembershipsByLocalpartSQL = "" +
|
|
||||||
"SELECT room_id, event_id FROM account_memberships WHERE localpart = $1"
|
|
||||||
|
|
||||||
const selectMembershipInRoomByLocalpartSQL = "" +
|
|
||||||
"SELECT event_id FROM account_memberships WHERE localpart = $1 AND room_id = $2"
|
|
||||||
|
|
||||||
const selectRoomIDsByLocalPartSQL = "" +
|
|
||||||
"SELECT room_id FROM account_memberships WHERE localpart = $1"
|
|
||||||
|
|
||||||
const deleteMembershipsByEventIDsSQL = "" +
|
|
||||||
"DELETE FROM account_memberships WHERE event_id IN ($1)"
|
|
||||||
|
|
||||||
type membershipStatements struct {
|
|
||||||
insertMembershipStmt *sql.Stmt
|
|
||||||
selectMembershipInRoomByLocalpartStmt *sql.Stmt
|
|
||||||
selectMembershipsByLocalpartStmt *sql.Stmt
|
|
||||||
selectRoomIDsByLocalPartStmt *sql.Stmt
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *membershipStatements) prepare(db *sql.DB) (err error) {
|
|
||||||
_, err = db.Exec(membershipSchema)
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if s.insertMembershipStmt, err = db.Prepare(insertMembershipSQL); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if s.selectMembershipInRoomByLocalpartStmt, err = db.Prepare(selectMembershipInRoomByLocalpartSQL); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if s.selectMembershipsByLocalpartStmt, err = db.Prepare(selectMembershipsByLocalpartSQL); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if s.selectRoomIDsByLocalPartStmt, err = db.Prepare(selectRoomIDsByLocalPartSQL); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *membershipStatements) insertMembership(
|
|
||||||
ctx context.Context, txn *sql.Tx, localpart, roomID, eventID string,
|
|
||||||
) (err error) {
|
|
||||||
stmt := txn.Stmt(s.insertMembershipStmt)
|
|
||||||
_, err = stmt.ExecContext(ctx, localpart, roomID, eventID)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *membershipStatements) deleteMembershipsByEventIDs(
|
|
||||||
ctx context.Context, txn *sql.Tx, eventIDs []string,
|
|
||||||
) (err error) {
|
|
||||||
sqlStr := strings.Replace(deleteMembershipsByEventIDsSQL, "($1)", sqlutil.QueryVariadic(len(eventIDs)), 1)
|
|
||||||
iEventIDs := make([]interface{}, len(eventIDs))
|
|
||||||
for i, e := range eventIDs {
|
|
||||||
iEventIDs[i] = e
|
|
||||||
}
|
|
||||||
_, err = txn.ExecContext(ctx, sqlStr, iEventIDs...)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *membershipStatements) selectMembershipInRoomByLocalpart(
|
|
||||||
ctx context.Context, localpart, roomID string,
|
|
||||||
) (authtypes.Membership, error) {
|
|
||||||
membership := authtypes.Membership{Localpart: localpart, RoomID: roomID}
|
|
||||||
stmt := s.selectMembershipInRoomByLocalpartStmt
|
|
||||||
err := stmt.QueryRowContext(ctx, localpart, roomID).Scan(&membership.EventID)
|
|
||||||
|
|
||||||
return membership, err
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *membershipStatements) selectMembershipsByLocalpart(
|
|
||||||
ctx context.Context, localpart string,
|
|
||||||
) (memberships []authtypes.Membership, err error) {
|
|
||||||
stmt := s.selectMembershipsByLocalpartStmt
|
|
||||||
rows, err := stmt.QueryContext(ctx, localpart)
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
memberships = []authtypes.Membership{}
|
|
||||||
|
|
||||||
defer internal.CloseAndLogIfError(ctx, rows, "selectMembershipsByLocalpart: rows.close() failed")
|
|
||||||
for rows.Next() {
|
|
||||||
var m authtypes.Membership
|
|
||||||
m.Localpart = localpart
|
|
||||||
if err := rows.Scan(&m.RoomID, &m.EventID); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
memberships = append(memberships, m)
|
|
||||||
}
|
|
||||||
|
|
||||||
return
|
|
||||||
}
|
|
||||||
func (s *membershipStatements) selectRoomIDsByLocalPart(
|
|
||||||
ctx context.Context, localPart string,
|
|
||||||
) ([]string, error) {
|
|
||||||
stmt := s.selectRoomIDsByLocalPartStmt
|
|
||||||
rows, err := stmt.QueryContext(ctx, localPart)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
roomIDs := []string{}
|
|
||||||
defer rows.Close() // nolint: errcheck
|
|
||||||
for rows.Next() {
|
|
||||||
var roomID string
|
|
||||||
if err = rows.Scan(&roomID); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
roomIDs = append(roomIDs, roomID)
|
|
||||||
}
|
|
||||||
return roomIDs, rows.Err()
|
|
||||||
}
|
|
|
@ -36,7 +36,6 @@ type Database struct {
|
||||||
sqlutil.PartitionOffsetStatements
|
sqlutil.PartitionOffsetStatements
|
||||||
accounts accountsStatements
|
accounts accountsStatements
|
||||||
profiles profilesStatements
|
profiles profilesStatements
|
||||||
memberships membershipStatements
|
|
||||||
accountDatas accountDataStatements
|
accountDatas accountDataStatements
|
||||||
threepids threepidStatements
|
threepids threepidStatements
|
||||||
serverName gomatrixserverlib.ServerName
|
serverName gomatrixserverlib.ServerName
|
||||||
|
@ -67,10 +66,6 @@ func NewDatabase(dataSourceName string, serverName gomatrixserverlib.ServerName)
|
||||||
if err = p.prepare(db); err != nil {
|
if err = p.prepare(db); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
m := membershipStatements{}
|
|
||||||
if err = m.prepare(db); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
ac := accountDataStatements{}
|
ac := accountDataStatements{}
|
||||||
if err = ac.prepare(db); err != nil {
|
if err = ac.prepare(db); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -79,7 +74,7 @@ func NewDatabase(dataSourceName string, serverName gomatrixserverlib.ServerName)
|
||||||
if err = t.prepare(db); err != nil {
|
if err = t.prepare(db); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return &Database{db, partitions, a, p, m, ac, t, serverName, sync.Mutex{}}, nil
|
return &Database{db, partitions, a, p, ac, t, serverName, sync.Mutex{}}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetAccountByPassword returns the account associated with the given localpart and password.
|
// GetAccountByPassword returns the account associated with the given localpart and password.
|
||||||
|
@ -193,112 +188,6 @@ func (d *Database) createAccount(
|
||||||
return d.accounts.insertAccount(ctx, txn, localpart, hash, appserviceID)
|
return d.accounts.insertAccount(ctx, txn, localpart, hash, appserviceID)
|
||||||
}
|
}
|
||||||
|
|
||||||
// SaveMembership saves the user matching a given localpart as a member of a given
|
|
||||||
// room. It also stores the ID of the membership event.
|
|
||||||
// If a membership already exists between the user and the room, or if the
|
|
||||||
// insert fails, returns the SQL error
|
|
||||||
func (d *Database) saveMembership(
|
|
||||||
ctx context.Context, txn *sql.Tx, localpart, roomID, eventID string,
|
|
||||||
) error {
|
|
||||||
return d.memberships.insertMembership(ctx, txn, localpart, roomID, eventID)
|
|
||||||
}
|
|
||||||
|
|
||||||
// removeMembershipsByEventIDs removes the memberships corresponding to the
|
|
||||||
// `join` membership events IDs in the eventIDs slice.
|
|
||||||
// If the removal fails, or if there is no membership to remove, returns an error
|
|
||||||
func (d *Database) removeMembershipsByEventIDs(
|
|
||||||
ctx context.Context, txn *sql.Tx, eventIDs []string,
|
|
||||||
) error {
|
|
||||||
return d.memberships.deleteMembershipsByEventIDs(ctx, txn, eventIDs)
|
|
||||||
}
|
|
||||||
|
|
||||||
// UpdateMemberships adds the "join" membership events included in a given state
|
|
||||||
// events array, and removes those which ID is included in a given array of events
|
|
||||||
// IDs. All of the process is run in a transaction, which commits only once/if every
|
|
||||||
// insertion and deletion has been successfully processed.
|
|
||||||
// Returns a SQL error if there was an issue with any part of the process
|
|
||||||
func (d *Database) UpdateMemberships(
|
|
||||||
ctx context.Context, eventsToAdd []gomatrixserverlib.Event, idsToRemove []string,
|
|
||||||
) error {
|
|
||||||
return sqlutil.WithTransaction(d.db, func(txn *sql.Tx) error {
|
|
||||||
if err := d.removeMembershipsByEventIDs(ctx, txn, idsToRemove); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, event := range eventsToAdd {
|
|
||||||
if err := d.newMembership(ctx, txn, event); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetMembershipInRoomByLocalpart returns the membership for an user
|
|
||||||
// matching the given localpart if he is a member of the room matching roomID,
|
|
||||||
// if not sql.ErrNoRows is returned.
|
|
||||||
// If there was an issue during the retrieval, returns the SQL error
|
|
||||||
func (d *Database) GetMembershipInRoomByLocalpart(
|
|
||||||
ctx context.Context, localpart, roomID string,
|
|
||||||
) (authtypes.Membership, error) {
|
|
||||||
return d.memberships.selectMembershipInRoomByLocalpart(ctx, localpart, roomID)
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetMembershipsByLocalpart returns an array containing the memberships for all
|
|
||||||
// the rooms a user matching a given localpart is a member of
|
|
||||||
// If no membership match the given localpart, returns an empty array
|
|
||||||
// If there was an issue during the retrieval, returns the SQL error
|
|
||||||
func (d *Database) GetMembershipsByLocalpart(
|
|
||||||
ctx context.Context, localpart string,
|
|
||||||
) (memberships []authtypes.Membership, err error) {
|
|
||||||
return d.memberships.selectMembershipsByLocalpart(ctx, localpart)
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetRoomIDsByLocalPart returns an array containing the room ids of all
|
|
||||||
// the rooms a user matching a given localpart is a member of
|
|
||||||
// If no membership match the given localpart, returns an empty array
|
|
||||||
// If there was an issue during the retrieval, returns the SQL error
|
|
||||||
func (d *Database) GetRoomIDsByLocalPart(
|
|
||||||
ctx context.Context, localpart string,
|
|
||||||
) ([]string, error) {
|
|
||||||
return d.memberships.selectRoomIDsByLocalPart(ctx, localpart)
|
|
||||||
}
|
|
||||||
|
|
||||||
// newMembership saves a new membership in the database.
|
|
||||||
// If the event isn't a valid m.room.member event with type `join`, does nothing.
|
|
||||||
// If an error occurred, returns the SQL error
|
|
||||||
func (d *Database) newMembership(
|
|
||||||
ctx context.Context, txn *sql.Tx, ev gomatrixserverlib.Event,
|
|
||||||
) error {
|
|
||||||
if ev.Type() == "m.room.member" && ev.StateKey() != nil {
|
|
||||||
localpart, serverName, err := gomatrixserverlib.SplitID('@', *ev.StateKey())
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// We only want state events from local users
|
|
||||||
if string(serverName) != string(d.serverName) {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
eventID := ev.EventID()
|
|
||||||
roomID := ev.RoomID()
|
|
||||||
membership, err := ev.Membership()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Only "join" membership events can be considered as new memberships
|
|
||||||
if membership == gomatrixserverlib.Join {
|
|
||||||
if err := d.saveMembership(ctx, txn, localpart, roomID, eventID); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// SaveAccountData saves new account data for a given user and a given room.
|
// SaveAccountData saves new account data for a given user and a given room.
|
||||||
// If the account data is not specific to a room, the room ID should be an empty string
|
// If the account data is not specific to a room, the room ID should be an empty string
|
||||||
// If an account data already exists for a given set (user, room, data type), it will
|
// If an account data already exists for a given set (user, room, data type), it will
|
||||||
|
|
Loading…
Reference in a new issue