0
0
Fork 0
mirror of https://github.com/matrix-org/dendrite synced 2024-11-04 15:08:59 +01:00
dendrite/roomserver/internal/api.go
Till 6b47cf0f6a
Remove PerformError (#3066)
This removes `PerformError`, which was needed when we still had
polylith.

This removes quite a bunch of
```go
if err != nil {
	return err
}
if err := res.Error; err != nil {
	return err.JSONResponse()
}
```

Hopefully can be read commit by commit. [skip ci]
2023-04-28 17:46:01 +02:00

245 lines
7.3 KiB
Go

package internal
import (
"context"
"github.com/getsentry/sentry-go"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/gomatrixserverlib/spec"
"github.com/nats-io/nats.go"
"github.com/sirupsen/logrus"
asAPI "github.com/matrix-org/dendrite/appservice/api"
fsAPI "github.com/matrix-org/dendrite/federationapi/api"
"github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/roomserver/acls"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/roomserver/internal/input"
"github.com/matrix-org/dendrite/roomserver/internal/perform"
"github.com/matrix-org/dendrite/roomserver/internal/query"
"github.com/matrix-org/dendrite/roomserver/producers"
"github.com/matrix-org/dendrite/roomserver/storage"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process"
userapi "github.com/matrix-org/dendrite/userapi/api"
)
// RoomserverInternalAPI is an implementation of api.RoomserverInternalAPI
type RoomserverInternalAPI struct {
*input.Inputer
*query.Queryer
*perform.Inviter
*perform.Joiner
*perform.Peeker
*perform.InboundPeeker
*perform.Unpeeker
*perform.Leaver
*perform.Publisher
*perform.Backfiller
*perform.Forgetter
*perform.Upgrader
*perform.Admin
ProcessContext *process.ProcessContext
DB storage.Database
Cfg *config.Dendrite
Cache caching.RoomServerCaches
ServerName spec.ServerName
KeyRing gomatrixserverlib.JSONVerifier
ServerACLs *acls.ServerACLs
fsAPI fsAPI.RoomserverFederationAPI
asAPI asAPI.AppServiceInternalAPI
NATSClient *nats.Conn
JetStream nats.JetStreamContext
Durable string
InputRoomEventTopic string // JetStream topic for new input room events
OutputProducer *producers.RoomEventProducer
PerspectiveServerNames []spec.ServerName
enableMetrics bool
}
func NewRoomserverAPI(
processContext *process.ProcessContext, dendriteCfg *config.Dendrite, roomserverDB storage.Database,
js nats.JetStreamContext, nc *nats.Conn, caches caching.RoomServerCaches, enableMetrics bool,
) *RoomserverInternalAPI {
var perspectiveServerNames []spec.ServerName
for _, kp := range dendriteCfg.FederationAPI.KeyPerspectives {
perspectiveServerNames = append(perspectiveServerNames, kp.ServerName)
}
serverACLs := acls.NewServerACLs(roomserverDB)
producer := &producers.RoomEventProducer{
Topic: string(dendriteCfg.Global.JetStream.Prefixed(jetstream.OutputRoomEvent)),
JetStream: js,
ACLs: serverACLs,
}
a := &RoomserverInternalAPI{
ProcessContext: processContext,
DB: roomserverDB,
Cfg: dendriteCfg,
Cache: caches,
ServerName: dendriteCfg.Global.ServerName,
PerspectiveServerNames: perspectiveServerNames,
InputRoomEventTopic: dendriteCfg.Global.JetStream.Prefixed(jetstream.InputRoomEvent),
OutputProducer: producer,
JetStream: js,
NATSClient: nc,
Durable: dendriteCfg.Global.JetStream.Durable("RoomserverInputConsumer"),
ServerACLs: serverACLs,
Queryer: &query.Queryer{
DB: roomserverDB,
Cache: caches,
IsLocalServerName: dendriteCfg.Global.IsLocalServerName,
ServerACLs: serverACLs,
},
enableMetrics: enableMetrics,
// perform-er structs get initialised when we have a federation sender to use
}
return a
}
// SetFederationInputAPI passes in a federation input API reference so that we can
// avoid the chicken-and-egg problem of both the roomserver input API and the
// federation input API being interdependent.
func (r *RoomserverInternalAPI) SetFederationAPI(fsAPI fsAPI.RoomserverFederationAPI, keyRing *gomatrixserverlib.KeyRing) {
r.fsAPI = fsAPI
r.KeyRing = keyRing
identity, err := r.Cfg.Global.SigningIdentityFor(r.ServerName)
if err != nil {
logrus.Panic(err)
}
r.Inputer = &input.Inputer{
Cfg: &r.Cfg.RoomServer,
ProcessContext: r.ProcessContext,
DB: r.DB,
InputRoomEventTopic: r.InputRoomEventTopic,
OutputProducer: r.OutputProducer,
JetStream: r.JetStream,
NATSClient: r.NATSClient,
Durable: nats.Durable(r.Durable),
ServerName: r.ServerName,
SigningIdentity: identity,
FSAPI: fsAPI,
KeyRing: keyRing,
ACLs: r.ServerACLs,
Queryer: r.Queryer,
}
r.Inviter = &perform.Inviter{
DB: r.DB,
Cfg: &r.Cfg.RoomServer,
FSAPI: r.fsAPI,
Inputer: r.Inputer,
}
r.Joiner = &perform.Joiner{
Cfg: &r.Cfg.RoomServer,
DB: r.DB,
FSAPI: r.fsAPI,
RSAPI: r,
Inputer: r.Inputer,
Queryer: r.Queryer,
}
r.Peeker = &perform.Peeker{
ServerName: r.ServerName,
Cfg: &r.Cfg.RoomServer,
DB: r.DB,
FSAPI: r.fsAPI,
Inputer: r.Inputer,
}
r.InboundPeeker = &perform.InboundPeeker{
DB: r.DB,
Inputer: r.Inputer,
}
r.Unpeeker = &perform.Unpeeker{
ServerName: r.ServerName,
Cfg: &r.Cfg.RoomServer,
FSAPI: r.fsAPI,
Inputer: r.Inputer,
}
r.Leaver = &perform.Leaver{
Cfg: &r.Cfg.RoomServer,
DB: r.DB,
FSAPI: r.fsAPI,
RSAPI: r,
Inputer: r.Inputer,
}
r.Publisher = &perform.Publisher{
DB: r.DB,
}
r.Backfiller = &perform.Backfiller{
IsLocalServerName: r.Cfg.Global.IsLocalServerName,
DB: r.DB,
FSAPI: r.fsAPI,
KeyRing: r.KeyRing,
// Perspective servers are trusted to not lie about server keys, so we will also
// prefer these servers when backfilling (assuming they are in the room) rather
// than trying random servers
PreferServers: r.PerspectiveServerNames,
}
r.Forgetter = &perform.Forgetter{
DB: r.DB,
}
r.Upgrader = &perform.Upgrader{
Cfg: &r.Cfg.RoomServer,
URSAPI: r,
}
r.Admin = &perform.Admin{
DB: r.DB,
Cfg: &r.Cfg.RoomServer,
Inputer: r.Inputer,
Queryer: r.Queryer,
Leaver: r.Leaver,
}
if err := r.Inputer.Start(); err != nil {
logrus.WithError(err).Panic("failed to start roomserver input API")
}
}
func (r *RoomserverInternalAPI) SetUserAPI(userAPI userapi.RoomserverUserAPI) {
r.Leaver.UserAPI = userAPI
r.Inputer.UserAPI = userAPI
}
func (r *RoomserverInternalAPI) SetAppserviceAPI(asAPI asAPI.AppServiceInternalAPI) {
r.asAPI = asAPI
}
func (r *RoomserverInternalAPI) PerformInvite(
ctx context.Context,
req *api.PerformInviteRequest,
) error {
outputEvents, err := r.Inviter.PerformInvite(ctx, req)
if err != nil {
return err
}
if len(outputEvents) == 0 {
return nil
}
return r.OutputProducer.ProduceRoomEvents(req.Event.RoomID(), outputEvents)
}
func (r *RoomserverInternalAPI) PerformLeave(
ctx context.Context,
req *api.PerformLeaveRequest,
res *api.PerformLeaveResponse,
) error {
outputEvents, err := r.Leaver.PerformLeave(ctx, req, res)
if err != nil {
sentry.CaptureException(err)
return err
}
if len(outputEvents) == 0 {
return nil
}
return r.OutputProducer.ProduceRoomEvents(req.RoomID, outputEvents)
}
func (r *RoomserverInternalAPI) PerformForget(
ctx context.Context,
req *api.PerformForgetRequest,
resp *api.PerformForgetResponse,
) error {
return r.Forgetter.PerformForget(ctx, req, resp)
}