mirror of
https://github.com/matrix-org/dendrite
synced 2025-01-07 14:33:44 +01:00
Merge branch 'master' into nats
This commit is contained in:
commit
8d4e5b447a
25 changed files with 95 additions and 117 deletions
1
.github/CODEOWNERS
vendored
Normal file
1
.github/CODEOWNERS
vendored
Normal file
|
@ -0,0 +1 @@
|
||||||
|
* @matrix-org/dendrite-core
|
|
@ -309,7 +309,7 @@ func (m *DendriteMonolith) Start() {
|
||||||
rsAPI := roomserver.NewInternalAPI(base)
|
rsAPI := roomserver.NewInternalAPI(base)
|
||||||
|
|
||||||
fsAPI := federationapi.NewInternalAPI(
|
fsAPI := federationapi.NewInternalAPI(
|
||||||
base, federation, rsAPI, base.Caches, true,
|
base, federation, rsAPI, base.Caches, keyRing, true,
|
||||||
)
|
)
|
||||||
|
|
||||||
keyAPI := keyserver.NewInternalAPI(base, &base.Cfg.KeyServer, fsAPI)
|
keyAPI := keyserver.NewInternalAPI(base, &base.Cfg.KeyServer, fsAPI)
|
||||||
|
@ -324,8 +324,7 @@ func (m *DendriteMonolith) Start() {
|
||||||
|
|
||||||
// The underlying roomserver implementation needs to be able to call the fedsender.
|
// The underlying roomserver implementation needs to be able to call the fedsender.
|
||||||
// This is different to rsAPI which can be the http client which doesn't need this dependency
|
// This is different to rsAPI which can be the http client which doesn't need this dependency
|
||||||
rsAPI.SetFederationAPI(fsAPI)
|
rsAPI.SetFederationAPI(fsAPI, keyRing)
|
||||||
rsAPI.SetKeyring(keyRing)
|
|
||||||
|
|
||||||
monolith := setup.Monolith{
|
monolith := setup.Monolith{
|
||||||
Config: base.Cfg,
|
Config: base.Cfg,
|
||||||
|
|
|
@ -113,7 +113,7 @@ func (m *DendriteMonolith) Start() {
|
||||||
rsAPI := roomserver.NewInternalAPI(base)
|
rsAPI := roomserver.NewInternalAPI(base)
|
||||||
|
|
||||||
fsAPI := federationapi.NewInternalAPI(
|
fsAPI := federationapi.NewInternalAPI(
|
||||||
base, federation, rsAPI, base.Caches, true,
|
base, federation, rsAPI, base.Caches, keyRing, true,
|
||||||
)
|
)
|
||||||
|
|
||||||
keyAPI := keyserver.NewInternalAPI(base, &base.Cfg.KeyServer, federation)
|
keyAPI := keyserver.NewInternalAPI(base, &base.Cfg.KeyServer, federation)
|
||||||
|
@ -129,8 +129,7 @@ func (m *DendriteMonolith) Start() {
|
||||||
|
|
||||||
// The underlying roomserver implementation needs to be able to call the fedsender.
|
// The underlying roomserver implementation needs to be able to call the fedsender.
|
||||||
// This is different to rsAPI which can be the http client which doesn't need this dependency
|
// This is different to rsAPI which can be the http client which doesn't need this dependency
|
||||||
rsAPI.SetFederationAPI(fsAPI)
|
rsAPI.SetFederationAPI(fsAPI, keyRing)
|
||||||
rsAPI.SetKeyring(keyRing)
|
|
||||||
|
|
||||||
monolith := setup.Monolith{
|
monolith := setup.Monolith{
|
||||||
Config: base.Cfg,
|
Config: base.Cfg,
|
||||||
|
|
|
@ -157,10 +157,10 @@ func main() {
|
||||||
asAPI := appservice.NewInternalAPI(&base.Base, userAPI, rsAPI)
|
asAPI := appservice.NewInternalAPI(&base.Base, userAPI, rsAPI)
|
||||||
rsAPI.SetAppserviceAPI(asAPI)
|
rsAPI.SetAppserviceAPI(asAPI)
|
||||||
fsAPI := federationapi.NewInternalAPI(
|
fsAPI := federationapi.NewInternalAPI(
|
||||||
&base.Base, federation, rsAPI, base.Base.Caches, true,
|
&base.Base, federation, rsAPI, base.Base.Caches, nil, true,
|
||||||
)
|
)
|
||||||
keyRing := fsAPI.KeyRing()
|
keyRing := fsAPI.KeyRing()
|
||||||
rsAPI.SetFederationAPI(fsAPI)
|
rsAPI.SetFederationAPI(fsAPI, keyRing)
|
||||||
provider := newPublicRoomsProvider(base.LibP2PPubsub, rsAPI)
|
provider := newPublicRoomsProvider(base.LibP2PPubsub, rsAPI)
|
||||||
err = provider.Start()
|
err = provider.Start()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -184,7 +184,7 @@ func main() {
|
||||||
rsComponent := roomserver.NewInternalAPI(base)
|
rsComponent := roomserver.NewInternalAPI(base)
|
||||||
rsAPI := rsComponent
|
rsAPI := rsComponent
|
||||||
fsAPI := federationapi.NewInternalAPI(
|
fsAPI := federationapi.NewInternalAPI(
|
||||||
base, federation, rsAPI, base.Caches, true,
|
base, federation, rsAPI, base.Caches, keyRing, true,
|
||||||
)
|
)
|
||||||
|
|
||||||
keyAPI := keyserver.NewInternalAPI(base, &base.Cfg.KeyServer, fsAPI)
|
keyAPI := keyserver.NewInternalAPI(base, &base.Cfg.KeyServer, fsAPI)
|
||||||
|
@ -197,8 +197,7 @@ func main() {
|
||||||
|
|
||||||
asAPI := appservice.NewInternalAPI(base, userAPI, rsAPI)
|
asAPI := appservice.NewInternalAPI(base, userAPI, rsAPI)
|
||||||
|
|
||||||
rsComponent.SetFederationAPI(fsAPI)
|
rsComponent.SetFederationAPI(fsAPI, keyRing)
|
||||||
rsComponent.SetKeyring(keyRing)
|
|
||||||
|
|
||||||
monolith := setup.Monolith{
|
monolith := setup.Monolith{
|
||||||
Config: base.Cfg,
|
Config: base.Cfg,
|
||||||
|
|
|
@ -117,11 +117,10 @@ func main() {
|
||||||
asAPI := appservice.NewInternalAPI(base, userAPI, rsAPI)
|
asAPI := appservice.NewInternalAPI(base, userAPI, rsAPI)
|
||||||
rsAPI.SetAppserviceAPI(asAPI)
|
rsAPI.SetAppserviceAPI(asAPI)
|
||||||
fsAPI := federationapi.NewInternalAPI(
|
fsAPI := federationapi.NewInternalAPI(
|
||||||
base, federation, rsAPI, base.Caches, true,
|
base, federation, rsAPI, base.Caches, keyRing, true,
|
||||||
)
|
)
|
||||||
|
|
||||||
rsComponent.SetFederationAPI(fsAPI)
|
rsComponent.SetFederationAPI(fsAPI, keyRing)
|
||||||
rsComponent.SetKeyring(keyRing)
|
|
||||||
|
|
||||||
monolith := setup.Monolith{
|
monolith := setup.Monolith{
|
||||||
Config: base.Cfg,
|
Config: base.Cfg,
|
||||||
|
|
|
@ -91,7 +91,7 @@ func main() {
|
||||||
}
|
}
|
||||||
|
|
||||||
fsAPI := federationapi.NewInternalAPI(
|
fsAPI := federationapi.NewInternalAPI(
|
||||||
base, federation, rsAPI, base.Caches, false,
|
base, federation, rsAPI, base.Caches, nil, false,
|
||||||
)
|
)
|
||||||
if base.UseHTTPAPIs {
|
if base.UseHTTPAPIs {
|
||||||
federationapi.AddInternalRoutes(base.InternalAPIMux, fsAPI)
|
federationapi.AddInternalRoutes(base.InternalAPIMux, fsAPI)
|
||||||
|
@ -101,7 +101,7 @@ func main() {
|
||||||
|
|
||||||
// The underlying roomserver implementation needs to be able to call the fedsender.
|
// The underlying roomserver implementation needs to be able to call the fedsender.
|
||||||
// This is different to rsAPI which can be the http client which doesn't need this dependency
|
// This is different to rsAPI which can be the http client which doesn't need this dependency
|
||||||
rsImpl.SetFederationAPI(fsAPI)
|
rsImpl.SetFederationAPI(fsAPI, keyRing)
|
||||||
|
|
||||||
keyImpl := keyserver.NewInternalAPI(base, &base.Cfg.KeyServer, fsAPI)
|
keyImpl := keyserver.NewInternalAPI(base, &base.Cfg.KeyServer, fsAPI)
|
||||||
keyAPI := keyImpl
|
keyAPI := keyImpl
|
||||||
|
@ -134,7 +134,7 @@ func main() {
|
||||||
// The underlying roomserver implementation needs to be able to call the fedsender.
|
// The underlying roomserver implementation needs to be able to call the fedsender.
|
||||||
// This is different to rsAPI which can be the http client which doesn't need this
|
// This is different to rsAPI which can be the http client which doesn't need this
|
||||||
// dependency. Other components also need updating after their dependencies are up.
|
// dependency. Other components also need updating after their dependencies are up.
|
||||||
rsImpl.SetFederationAPI(fsAPI)
|
rsImpl.SetFederationAPI(fsAPI, keyRing)
|
||||||
rsImpl.SetAppserviceAPI(asAPI)
|
rsImpl.SetAppserviceAPI(asAPI)
|
||||||
keyImpl.SetUserAPI(userAPI)
|
keyImpl.SetUserAPI(userAPI)
|
||||||
|
|
||||||
|
|
|
@ -35,7 +35,7 @@ func FederationAPI(base *basepkg.BaseDendrite, cfg *config.Dendrite) {
|
||||||
&base.Cfg.MSCs, nil,
|
&base.Cfg.MSCs, nil,
|
||||||
)
|
)
|
||||||
|
|
||||||
intAPI := federationapi.NewInternalAPI(base, federation, rsAPI, base.Caches, true)
|
intAPI := federationapi.NewInternalAPI(base, federation, rsAPI, base.Caches, nil, true)
|
||||||
federationapi.AddInternalRoutes(base.InternalAPIMux, intAPI)
|
federationapi.AddInternalRoutes(base.InternalAPIMux, intAPI)
|
||||||
|
|
||||||
base.SetupAndServeHTTP(
|
base.SetupAndServeHTTP(
|
||||||
|
|
|
@ -24,7 +24,7 @@ func RoomServer(base *basepkg.BaseDendrite, cfg *config.Dendrite) {
|
||||||
asAPI := base.AppserviceHTTPClient()
|
asAPI := base.AppserviceHTTPClient()
|
||||||
fsAPI := base.FederationAPIHTTPClient()
|
fsAPI := base.FederationAPIHTTPClient()
|
||||||
rsAPI := roomserver.NewInternalAPI(base)
|
rsAPI := roomserver.NewInternalAPI(base)
|
||||||
rsAPI.SetFederationAPI(fsAPI)
|
rsAPI.SetFederationAPI(fsAPI, fsAPI.KeyRing())
|
||||||
rsAPI.SetAppserviceAPI(asAPI)
|
rsAPI.SetAppserviceAPI(asAPI)
|
||||||
roomserver.AddInternalRoutes(base.InternalAPIMux, rsAPI)
|
roomserver.AddInternalRoutes(base.InternalAPIMux, rsAPI)
|
||||||
|
|
||||||
|
|
|
@ -197,9 +197,8 @@ func startup() {
|
||||||
base, userAPI, rsAPI,
|
base, userAPI, rsAPI,
|
||||||
)
|
)
|
||||||
rsAPI.SetAppserviceAPI(asQuery)
|
rsAPI.SetAppserviceAPI(asQuery)
|
||||||
fedSenderAPI := federationapi.NewInternalAPI(base, federation, rsAPI, base.Caches, true)
|
fedSenderAPI := federationapi.NewInternalAPI(base, federation, rsAPI, base.Caches, keyRing, true)
|
||||||
rsAPI.SetFederationAPI(fedSenderAPI)
|
rsAPI.SetFederationAPI(fedSenderAPI, keyRing)
|
||||||
rsAPI.SetKeyring(keyRing)
|
|
||||||
|
|
||||||
monolith := setup.Monolith{
|
monolith := setup.Monolith{
|
||||||
Config: base.Cfg,
|
Config: base.Cfg,
|
||||||
|
|
|
@ -209,9 +209,8 @@ func main() {
|
||||||
base, userAPI, rsAPI,
|
base, userAPI, rsAPI,
|
||||||
)
|
)
|
||||||
rsAPI.SetAppserviceAPI(asQuery)
|
rsAPI.SetAppserviceAPI(asQuery)
|
||||||
fedSenderAPI := federationapi.NewInternalAPI(base, federation, rsAPI, base.Caches, true)
|
fedSenderAPI := federationapi.NewInternalAPI(base, federation, rsAPI, base.Caches, keyRing, true)
|
||||||
rsAPI.SetFederationAPI(fedSenderAPI)
|
rsAPI.SetFederationAPI(fedSenderAPI, keyRing)
|
||||||
rsAPI.SetKeyring(keyRing)
|
|
||||||
p2pPublicRoomProvider := NewLibP2PPublicRoomsProvider(node, fedSenderAPI, federation)
|
p2pPublicRoomProvider := NewLibP2PPublicRoomsProvider(node, fedSenderAPI, federation)
|
||||||
|
|
||||||
monolith := setup.Monolith{
|
monolith := setup.Monolith{
|
||||||
|
|
|
@ -73,6 +73,7 @@ func NewInternalAPI(
|
||||||
federation *gomatrixserverlib.FederationClient,
|
federation *gomatrixserverlib.FederationClient,
|
||||||
rsAPI roomserverAPI.RoomserverInternalAPI,
|
rsAPI roomserverAPI.RoomserverInternalAPI,
|
||||||
caches *caching.Caches,
|
caches *caching.Caches,
|
||||||
|
keyRing *gomatrixserverlib.KeyRing,
|
||||||
resetBlacklist bool,
|
resetBlacklist bool,
|
||||||
) api.FederationInternalAPI {
|
) api.FederationInternalAPI {
|
||||||
cfg := &base.Cfg.FederationAPI
|
cfg := &base.Cfg.FederationAPI
|
||||||
|
@ -125,5 +126,5 @@ func NewInternalAPI(
|
||||||
logrus.WithError(err).Panic("failed to start key server consumer")
|
logrus.WithError(err).Panic("failed to start key server consumer")
|
||||||
}
|
}
|
||||||
|
|
||||||
return internal.NewFederationInternalAPI(federationDB, cfg, rsAPI, federation, stats, caches, queues)
|
return internal.NewFederationInternalAPI(federationDB, cfg, rsAPI, federation, stats, caches, queues, keyRing)
|
||||||
}
|
}
|
||||||
|
|
|
@ -94,7 +94,7 @@ func TestMain(m *testing.M) {
|
||||||
|
|
||||||
// Finally, build the server key APIs.
|
// Finally, build the server key APIs.
|
||||||
sbase := base.NewBaseDendrite(cfg, "Monolith", base.NoCacheMetrics)
|
sbase := base.NewBaseDendrite(cfg, "Monolith", base.NoCacheMetrics)
|
||||||
s.api = NewInternalAPI(sbase, s.fedclient, nil, s.cache, true)
|
s.api = NewInternalAPI(sbase, s.fedclient, nil, s.cache, nil, true)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Now that we have built our server key APIs, start the
|
// Now that we have built our server key APIs, start the
|
||||||
|
|
|
@ -39,13 +39,15 @@ func NewFederationInternalAPI(
|
||||||
statistics *statistics.Statistics,
|
statistics *statistics.Statistics,
|
||||||
caches *caching.Caches,
|
caches *caching.Caches,
|
||||||
queues *queue.OutgoingQueues,
|
queues *queue.OutgoingQueues,
|
||||||
|
keyRing *gomatrixserverlib.KeyRing,
|
||||||
) *FederationInternalAPI {
|
) *FederationInternalAPI {
|
||||||
serverKeyDB, err := cache.NewKeyDatabase(db, caches)
|
serverKeyDB, err := cache.NewKeyDatabase(db, caches)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.WithError(err).Panicf("failed to set up caching wrapper for server key database")
|
logrus.WithError(err).Panicf("failed to set up caching wrapper for server key database")
|
||||||
}
|
}
|
||||||
|
|
||||||
keyRing := &gomatrixserverlib.KeyRing{
|
if keyRing == nil {
|
||||||
|
keyRing = &gomatrixserverlib.KeyRing{
|
||||||
KeyFetchers: []gomatrixserverlib.KeyFetcher{},
|
KeyFetchers: []gomatrixserverlib.KeyFetcher{},
|
||||||
KeyDatabase: serverKeyDB,
|
KeyDatabase: serverKeyDB,
|
||||||
}
|
}
|
||||||
|
@ -92,6 +94,7 @@ func NewFederationInternalAPI(
|
||||||
"num_public_keys": len(ps.Keys),
|
"num_public_keys": len(ps.Keys),
|
||||||
}).Info("Enabled perspective key fetcher")
|
}).Info("Enabled perspective key fetcher")
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return &FederationInternalAPI{
|
return &FederationInternalAPI{
|
||||||
db: db,
|
db: db,
|
||||||
|
|
2
go.mod
2
go.mod
|
@ -40,7 +40,7 @@ require (
|
||||||
github.com/matrix-org/go-sqlite3-js v0.0.0-20210709140738-b0d1ba599a6d
|
github.com/matrix-org/go-sqlite3-js v0.0.0-20210709140738-b0d1ba599a6d
|
||||||
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16
|
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16
|
||||||
github.com/matrix-org/gomatrixserverlib v0.0.0-20211115192839-15a64d244aa2
|
github.com/matrix-org/gomatrixserverlib v0.0.0-20211115192839-15a64d244aa2
|
||||||
github.com/matrix-org/pinecone v0.0.0-20211129130654-b0bf9ad6f5c7
|
github.com/matrix-org/pinecone v0.0.0-20211216094739-095c5ea64d02
|
||||||
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4
|
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4
|
||||||
github.com/mattn/go-sqlite3 v1.14.8
|
github.com/mattn/go-sqlite3 v1.14.8
|
||||||
github.com/morikuni/aec v1.0.0 // indirect
|
github.com/morikuni/aec v1.0.0 // indirect
|
||||||
|
|
4
go.sum
4
go.sum
|
@ -990,8 +990,8 @@ github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16 h1:ZtO5uywdd5d
|
||||||
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16/go.mod h1:/gBX06Kw0exX1HrwmoBibFA98yBk/jxKpGVeyQbff+s=
|
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16/go.mod h1:/gBX06Kw0exX1HrwmoBibFA98yBk/jxKpGVeyQbff+s=
|
||||||
github.com/matrix-org/gomatrixserverlib v0.0.0-20211115192839-15a64d244aa2 h1:RFsBN3509Ql6NJ7TDVkcKoN3bb/tmqUqzur5c0AwIHQ=
|
github.com/matrix-org/gomatrixserverlib v0.0.0-20211115192839-15a64d244aa2 h1:RFsBN3509Ql6NJ7TDVkcKoN3bb/tmqUqzur5c0AwIHQ=
|
||||||
github.com/matrix-org/gomatrixserverlib v0.0.0-20211115192839-15a64d244aa2/go.mod h1:rB8tBUUUo1rzUqpzklRDSooxZ6YMhoaEPx4SO5fGeUc=
|
github.com/matrix-org/gomatrixserverlib v0.0.0-20211115192839-15a64d244aa2/go.mod h1:rB8tBUUUo1rzUqpzklRDSooxZ6YMhoaEPx4SO5fGeUc=
|
||||||
github.com/matrix-org/pinecone v0.0.0-20211129130654-b0bf9ad6f5c7 h1:HC1TdU79ly+sxtyOuNDIpG2YBHQyGmvLux4VPQbT72I=
|
github.com/matrix-org/pinecone v0.0.0-20211216094739-095c5ea64d02 h1:tLn95Nqq3KPOZAjogGZTKMEkn4mMIzKu09biRTz/Ack=
|
||||||
github.com/matrix-org/pinecone v0.0.0-20211129130654-b0bf9ad6f5c7/go.mod h1:r6dsL+ylE0yXe/7zh8y/Bdh6aBYI1r+u4yZni9A4iyk=
|
github.com/matrix-org/pinecone v0.0.0-20211216094739-095c5ea64d02/go.mod h1:r6dsL+ylE0yXe/7zh8y/Bdh6aBYI1r+u4yZni9A4iyk=
|
||||||
github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7/go.mod h1:vVQlW/emklohkZnOPwD3LrZUBqdfsbiyO3p1lNV8F6U=
|
github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7/go.mod h1:vVQlW/emklohkZnOPwD3LrZUBqdfsbiyO3p1lNV8F6U=
|
||||||
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4 h1:eCEHXWDv9Rm335MSuB49mFUK44bwZPFSDde3ORE3syk=
|
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4 h1:eCEHXWDv9Rm335MSuB49mFUK44bwZPFSDde3ORE3syk=
|
||||||
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4/go.mod h1:vVQlW/emklohkZnOPwD3LrZUBqdfsbiyO3p1lNV8F6U=
|
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4/go.mod h1:vVQlW/emklohkZnOPwD3LrZUBqdfsbiyO3p1lNV8F6U=
|
||||||
|
|
|
@ -12,9 +12,8 @@ import (
|
||||||
type RoomserverInternalAPI interface {
|
type RoomserverInternalAPI interface {
|
||||||
// needed to avoid chicken and egg scenario when setting up the
|
// needed to avoid chicken and egg scenario when setting up the
|
||||||
// interdependencies between the roomserver and other input APIs
|
// interdependencies between the roomserver and other input APIs
|
||||||
SetFederationAPI(fsAPI fsAPI.FederationInternalAPI)
|
SetFederationAPI(fsAPI fsAPI.FederationInternalAPI, keyRing *gomatrixserverlib.KeyRing)
|
||||||
SetAppserviceAPI(asAPI asAPI.AppServiceQueryAPI)
|
SetAppserviceAPI(asAPI asAPI.AppServiceQueryAPI)
|
||||||
SetKeyring(keyRing *gomatrixserverlib.KeyRing)
|
|
||||||
|
|
||||||
InputRoomEvents(
|
InputRoomEvents(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
|
|
|
@ -17,12 +17,8 @@ type RoomserverInternalAPITrace struct {
|
||||||
Impl RoomserverInternalAPI
|
Impl RoomserverInternalAPI
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *RoomserverInternalAPITrace) SetKeyring(keyRing *gomatrixserverlib.KeyRing) {
|
func (t *RoomserverInternalAPITrace) SetFederationAPI(fsAPI fsAPI.FederationInternalAPI, keyRing *gomatrixserverlib.KeyRing) {
|
||||||
t.Impl.SetKeyring(keyRing)
|
t.Impl.SetFederationAPI(fsAPI, keyRing)
|
||||||
}
|
|
||||||
|
|
||||||
func (t *RoomserverInternalAPITrace) SetFederationAPI(fsAPI fsAPI.FederationInternalAPI) {
|
|
||||||
t.Impl.SetFederationAPI(fsAPI)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *RoomserverInternalAPITrace) SetAppserviceAPI(asAPI asAPI.AppServiceQueryAPI) {
|
func (t *RoomserverInternalAPITrace) SetAppserviceAPI(asAPI asAPI.AppServiceQueryAPI) {
|
||||||
|
|
|
@ -78,18 +78,12 @@ func NewRoomserverAPI(
|
||||||
return a
|
return a
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetKeyring sets the keyring to a given keyring. This is only useful for the P2P
|
|
||||||
// demos and must be called after SetFederationSenderInputAPI.
|
|
||||||
func (r *RoomserverInternalAPI) SetKeyring(keyRing *gomatrixserverlib.KeyRing) {
|
|
||||||
r.KeyRing = keyRing
|
|
||||||
}
|
|
||||||
|
|
||||||
// SetFederationInputAPI passes in a federation input API reference so that we can
|
// SetFederationInputAPI passes in a federation input API reference so that we can
|
||||||
// avoid the chicken-and-egg problem of both the roomserver input API and the
|
// avoid the chicken-and-egg problem of both the roomserver input API and the
|
||||||
// federation input API being interdependent.
|
// federation input API being interdependent.
|
||||||
func (r *RoomserverInternalAPI) SetFederationAPI(fsAPI fsAPI.FederationInternalAPI) {
|
func (r *RoomserverInternalAPI) SetFederationAPI(fsAPI fsAPI.FederationInternalAPI, keyRing *gomatrixserverlib.KeyRing) {
|
||||||
r.fsAPI = fsAPI
|
r.fsAPI = fsAPI
|
||||||
r.SetKeyring(fsAPI.KeyRing())
|
r.KeyRing = keyRing
|
||||||
|
|
||||||
r.Inviter = &perform.Inviter{
|
r.Inviter = &perform.Inviter{
|
||||||
DB: r.DB,
|
DB: r.DB,
|
||||||
|
|
|
@ -122,7 +122,7 @@ func (r *Inputer) processRoomEvent(
|
||||||
}
|
}
|
||||||
|
|
||||||
// Store the event.
|
// Store the event.
|
||||||
_, stateAtEvent, redactionEvent, redactedEventID, err := r.DB.StoreEvent(ctx, event, authEventNIDs, isRejected)
|
_, _, stateAtEvent, redactionEvent, redactedEventID, err := r.DB.StoreEvent(ctx, event, authEventNIDs, isRejected)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", fmt.Errorf("r.DB.StoreEvent: %w", err)
|
return "", fmt.Errorf("r.DB.StoreEvent: %w", err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -546,6 +546,7 @@ func joinEventsFromHistoryVisibility(
|
||||||
|
|
||||||
func persistEvents(ctx context.Context, db storage.Database, events []*gomatrixserverlib.HeaderedEvent) (types.RoomNID, map[string]types.Event) {
|
func persistEvents(ctx context.Context, db storage.Database, events []*gomatrixserverlib.HeaderedEvent) (types.RoomNID, map[string]types.Event) {
|
||||||
var roomNID types.RoomNID
|
var roomNID types.RoomNID
|
||||||
|
var eventNID types.EventNID
|
||||||
backfilledEventMap := make(map[string]types.Event)
|
backfilledEventMap := make(map[string]types.Event)
|
||||||
for j, ev := range events {
|
for j, ev := range events {
|
||||||
nidMap, err := db.EventNIDs(ctx, ev.AuthEventIDs())
|
nidMap, err := db.EventNIDs(ctx, ev.AuthEventIDs())
|
||||||
|
@ -559,10 +560,9 @@ func persistEvents(ctx context.Context, db storage.Database, events []*gomatrixs
|
||||||
authNids[i] = nid
|
authNids[i] = nid
|
||||||
i++
|
i++
|
||||||
}
|
}
|
||||||
var stateAtEvent types.StateAtEvent
|
|
||||||
var redactedEventID string
|
var redactedEventID string
|
||||||
var redactionEvent *gomatrixserverlib.Event
|
var redactionEvent *gomatrixserverlib.Event
|
||||||
roomNID, stateAtEvent, redactionEvent, redactedEventID, err = db.StoreEvent(ctx, ev.Unwrap(), authNids, false)
|
eventNID, roomNID, _, redactionEvent, redactedEventID, err = db.StoreEvent(ctx, ev.Unwrap(), authNids, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.WithError(err).WithField("event_id", ev.EventID()).Error("Failed to persist event")
|
logrus.WithError(err).WithField("event_id", ev.EventID()).Error("Failed to persist event")
|
||||||
continue
|
continue
|
||||||
|
@ -581,7 +581,7 @@ func persistEvents(ctx context.Context, db storage.Database, events []*gomatrixs
|
||||||
events[j] = ev
|
events[j] = ev
|
||||||
}
|
}
|
||||||
backfilledEventMap[ev.EventID()] = types.Event{
|
backfilledEventMap[ev.EventID()] = types.Event{
|
||||||
EventNID: stateAtEvent.StateEntry.EventNID,
|
EventNID: eventNID,
|
||||||
Event: ev.Unwrap(),
|
Event: ev.Unwrap(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -83,12 +83,8 @@ func NewRoomserverClient(
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetKeyring no-ops in HTTP client mode as there is no chicken/egg scenario
|
|
||||||
func (h *httpRoomserverInternalAPI) SetKeyring(keyRing *gomatrixserverlib.KeyRing) {
|
|
||||||
}
|
|
||||||
|
|
||||||
// SetFederationInputAPI no-ops in HTTP client mode as there is no chicken/egg scenario
|
// SetFederationInputAPI no-ops in HTTP client mode as there is no chicken/egg scenario
|
||||||
func (h *httpRoomserverInternalAPI) SetFederationAPI(fsAPI fsInputAPI.FederationInternalAPI) {
|
func (h *httpRoomserverInternalAPI) SetFederationAPI(fsAPI fsInputAPI.FederationInternalAPI, keyRing *gomatrixserverlib.KeyRing) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetAppserviceAPI no-ops in HTTP client mode as there is no chicken/egg scenario
|
// SetAppserviceAPI no-ops in HTTP client mode as there is no chicken/egg scenario
|
||||||
|
|
|
@ -70,7 +70,7 @@ type Database interface {
|
||||||
StoreEvent(
|
StoreEvent(
|
||||||
ctx context.Context, event *gomatrixserverlib.Event, authEventNIDs []types.EventNID,
|
ctx context.Context, event *gomatrixserverlib.Event, authEventNIDs []types.EventNID,
|
||||||
isRejected bool,
|
isRejected bool,
|
||||||
) (types.RoomNID, types.StateAtEvent, *gomatrixserverlib.Event, string, error)
|
) (types.EventNID, types.RoomNID, types.StateAtEvent, *gomatrixserverlib.Event, string, error)
|
||||||
// Look up the state entries for a list of string event IDs
|
// Look up the state entries for a list of string event IDs
|
||||||
// Returns an error if the there is an error talking to the database
|
// Returns an error if the there is an error talking to the database
|
||||||
// Returns a types.MissingEventError if the event IDs aren't in the database.
|
// Returns a types.MissingEventError if the event IDs aren't in the database.
|
||||||
|
|
|
@ -461,7 +461,7 @@ func (d *Database) GetLatestEventsForUpdate(
|
||||||
func (d *Database) StoreEvent(
|
func (d *Database) StoreEvent(
|
||||||
ctx context.Context, event *gomatrixserverlib.Event,
|
ctx context.Context, event *gomatrixserverlib.Event,
|
||||||
authEventNIDs []types.EventNID, isRejected bool,
|
authEventNIDs []types.EventNID, isRejected bool,
|
||||||
) (types.RoomNID, types.StateAtEvent, *gomatrixserverlib.Event, string, error) {
|
) (types.EventNID, types.RoomNID, types.StateAtEvent, *gomatrixserverlib.Event, string, error) {
|
||||||
var (
|
var (
|
||||||
roomNID types.RoomNID
|
roomNID types.RoomNID
|
||||||
eventTypeNID types.EventTypeNID
|
eventTypeNID types.EventTypeNID
|
||||||
|
@ -538,7 +538,7 @@ func (d *Database) StoreEvent(
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, types.StateAtEvent{}, nil, "", fmt.Errorf("d.Writer.Do: %w", err)
|
return 0, 0, types.StateAtEvent{}, nil, "", fmt.Errorf("d.Writer.Do: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// We should attempt to update the previous events table with any
|
// We should attempt to update the previous events table with any
|
||||||
|
@ -551,10 +551,10 @@ func (d *Database) StoreEvent(
|
||||||
if prevEvents := event.PrevEvents(); len(prevEvents) > 0 {
|
if prevEvents := event.PrevEvents(); len(prevEvents) > 0 {
|
||||||
roomInfo, err = d.RoomInfo(ctx, event.RoomID())
|
roomInfo, err = d.RoomInfo(ctx, event.RoomID())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, types.StateAtEvent{}, nil, "", fmt.Errorf("d.RoomInfo: %w", err)
|
return 0, 0, types.StateAtEvent{}, nil, "", fmt.Errorf("d.RoomInfo: %w", err)
|
||||||
}
|
}
|
||||||
if roomInfo == nil && len(prevEvents) > 0 {
|
if roomInfo == nil && len(prevEvents) > 0 {
|
||||||
return 0, types.StateAtEvent{}, nil, "", fmt.Errorf("expected room %q to exist", event.RoomID())
|
return 0, 0, types.StateAtEvent{}, nil, "", fmt.Errorf("expected room %q to exist", event.RoomID())
|
||||||
}
|
}
|
||||||
// Create an updater - NB: on sqlite this WILL create a txn as we are directly calling the shared DB form of
|
// Create an updater - NB: on sqlite this WILL create a txn as we are directly calling the shared DB form of
|
||||||
// GetLatestEventsForUpdate - not via the SQLiteDatabase form which has `nil` txns. This
|
// GetLatestEventsForUpdate - not via the SQLiteDatabase form which has `nil` txns. This
|
||||||
|
@ -563,7 +563,7 @@ func (d *Database) StoreEvent(
|
||||||
// to do writes however then this will need to go inside `Writer.Do`.
|
// to do writes however then this will need to go inside `Writer.Do`.
|
||||||
updater, err = d.GetLatestEventsForUpdate(ctx, *roomInfo)
|
updater, err = d.GetLatestEventsForUpdate(ctx, *roomInfo)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, types.StateAtEvent{}, nil, "", fmt.Errorf("NewLatestEventsUpdater: %w", err)
|
return 0, 0, types.StateAtEvent{}, nil, "", fmt.Errorf("NewLatestEventsUpdater: %w", err)
|
||||||
}
|
}
|
||||||
// Ensure that we atomically store prev events AND commit them. If we don't wrap StorePreviousEvents
|
// Ensure that we atomically store prev events AND commit them. If we don't wrap StorePreviousEvents
|
||||||
// and EndTransaction in a writer then it's possible for a new write txn to be made between the two
|
// and EndTransaction in a writer then it's possible for a new write txn to be made between the two
|
||||||
|
@ -580,11 +580,11 @@ func (d *Database) StoreEvent(
|
||||||
return err
|
return err
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, types.StateAtEvent{}, nil, "", err
|
return 0, 0, types.StateAtEvent{}, nil, "", err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return roomNID, types.StateAtEvent{
|
return eventNID, roomNID, types.StateAtEvent{
|
||||||
BeforeStateSnapshotNID: stateNID,
|
BeforeStateSnapshotNID: stateNID,
|
||||||
StateEntry: types.StateEntry{
|
StateEntry: types.StateEntry{
|
||||||
StateKeyTuple: types.StateKeyTuple{
|
StateKeyTuple: types.StateKeyTuple{
|
||||||
|
|
|
@ -49,7 +49,8 @@ const eventsSchema = `
|
||||||
const insertEventSQL = `
|
const insertEventSQL = `
|
||||||
INSERT INTO roomserver_events (room_nid, event_type_nid, event_state_key_nid, event_id, reference_sha256, auth_event_nids, depth, is_rejected)
|
INSERT INTO roomserver_events (room_nid, event_type_nid, event_state_key_nid, event_id, reference_sha256, auth_event_nids, depth, is_rejected)
|
||||||
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
|
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
|
||||||
ON CONFLICT DO NOTHING;
|
ON CONFLICT DO NOTHING
|
||||||
|
RETURNING event_nid, state_snapshot_nid;
|
||||||
`
|
`
|
||||||
|
|
||||||
const selectEventSQL = "" +
|
const selectEventSQL = "" +
|
||||||
|
@ -161,20 +162,13 @@ func (s *eventStatements) InsertEvent(
|
||||||
) (types.EventNID, types.StateSnapshotNID, error) {
|
) (types.EventNID, types.StateSnapshotNID, error) {
|
||||||
// attempt to insert: the last_row_id is the event NID
|
// attempt to insert: the last_row_id is the event NID
|
||||||
var eventNID int64
|
var eventNID int64
|
||||||
|
var stateNID int64
|
||||||
insertStmt := sqlutil.TxStmt(txn, s.insertEventStmt)
|
insertStmt := sqlutil.TxStmt(txn, s.insertEventStmt)
|
||||||
result, err := insertStmt.ExecContext(
|
err := insertStmt.QueryRowContext(
|
||||||
ctx, int64(roomNID), int64(eventTypeNID), int64(eventStateKeyNID),
|
ctx, int64(roomNID), int64(eventTypeNID), int64(eventStateKeyNID),
|
||||||
eventID, referenceSHA256, eventNIDsAsArray(authEventNIDs), depth, isRejected,
|
eventID, referenceSHA256, eventNIDsAsArray(authEventNIDs), depth, isRejected,
|
||||||
)
|
).Scan(&eventNID, &stateNID)
|
||||||
if err != nil {
|
return types.EventNID(eventNID), types.StateSnapshotNID(stateNID), err
|
||||||
return 0, 0, err
|
|
||||||
}
|
|
||||||
modified, err := result.RowsAffected()
|
|
||||||
if modified == 0 && err == nil {
|
|
||||||
return 0, 0, sql.ErrNoRows
|
|
||||||
}
|
|
||||||
eventNID, err = result.LastInsertId()
|
|
||||||
return types.EventNID(eventNID), 0, err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *eventStatements) SelectEvent(
|
func (s *eventStatements) SelectEvent(
|
||||||
|
|
Loading…
Reference in a new issue