0
0
Fork 0
mirror of https://github.com/matrix-org/dendrite synced 2025-01-05 23:13:44 +01:00
dendrite/federationapi/internal/perform.go

761 lines
24 KiB
Go
Raw Normal View History

package internal
import (
"context"
"encoding/json"
"errors"
"fmt"
"time"
"github.com/matrix-org/dendrite/federationapi/api"
"github.com/matrix-org/dendrite/federationapi/consumers"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/roomserver/version"
"github.com/matrix-org/gomatrix"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/sirupsen/logrus"
)
// PerformLeaveRequest implements api.FederationInternalAPI
func (r *FederationInternalAPI) PerformDirectoryLookup(
ctx context.Context,
request *api.PerformDirectoryLookupRequest,
response *api.PerformDirectoryLookupResponse,
) (err error) {
dir, err := r.federation.LookupRoomAlias(
ctx,
request.ServerName,
request.RoomAlias,
)
if err != nil {
r.statistics.ForServer(request.ServerName).Failure()
return err
}
response.RoomID = dir.RoomID
response.ServerNames = dir.Servers
r.statistics.ForServer(request.ServerName).Success()
return nil
}
type federatedJoin struct {
UserID string
RoomID string
}
// PerformJoin implements api.FederationInternalAPI
func (r *FederationInternalAPI) PerformJoin(
ctx context.Context,
request *api.PerformJoinRequest,
response *api.PerformJoinResponse,
) {
// Check that a join isn't already in progress for this user/room.
j := federatedJoin{request.UserID, request.RoomID}
if _, found := r.joins.Load(j); found {
response.LastError = &gomatrix.HTTPError{
Code: 429,
Message: `{
"errcode": "M_LIMIT_EXCEEDED",
"error": "There is already a federated join to this room in progress. Please wait for it to finish."
}`, // TODO: Why do none of our error types play nicely with each other?
}
return
}
r.joins.Store(j, nil)
defer r.joins.Delete(j)
// Look up the supported room versions.
var supportedVersions []gomatrixserverlib.RoomVersion
for version := range version.SupportedRoomVersions() {
supportedVersions = append(supportedVersions, version)
}
// Deduplicate the server names we were provided but keep the ordering
// as this encodes useful information about which servers are most likely
// to respond.
seenSet := make(map[gomatrixserverlib.ServerName]bool)
var uniqueList []gomatrixserverlib.ServerName
for _, srv := range request.ServerNames {
if seenSet[srv] || srv == r.cfg.Matrix.ServerName {
continue
}
seenSet[srv] = true
uniqueList = append(uniqueList, srv)
}
request.ServerNames = uniqueList
// Try each server that we were provided until we land on one that
// successfully completes the make-join send-join dance.
var lastErr error
for _, serverName := range request.ServerNames {
if err := r.performJoinUsingServer(
ctx,
request.RoomID,
request.UserID,
request.Content,
serverName,
supportedVersions,
); err != nil {
logrus.WithError(err).WithFields(logrus.Fields{
"server_name": serverName,
"room_id": request.RoomID,
}).Warnf("Failed to join room through server")
lastErr = err
continue
}
// We're all good.
response.JoinedVia = serverName
return
}
// If we reach here then we didn't complete a join for some reason.
var httpErr gomatrix.HTTPError
if ok := errors.As(lastErr, &httpErr); ok {
httpErr.Message = string(httpErr.Contents)
// Clear the wrapped error, else serialising to JSON (in polylith mode) will fail
httpErr.WrappedError = nil
response.LastError = &httpErr
} else {
response.LastError = &gomatrix.HTTPError{
Code: 0,
WrappedError: nil,
Message: "Unknown HTTP error",
}
if lastErr != nil {
response.LastError.Message = lastErr.Error()
}
}
logrus.Errorf(
"failed to join user %q to room %q through %d server(s): last error %s",
request.UserID, request.RoomID, len(request.ServerNames), lastErr,
)
}
func (r *FederationInternalAPI) performJoinUsingServer(
ctx context.Context,
roomID, userID string,
content map[string]interface{},
serverName gomatrixserverlib.ServerName,
supportedVersions []gomatrixserverlib.RoomVersion,
) error {
// Try to perform a make_join using the information supplied in the
// request.
respMakeJoin, err := r.federation.MakeJoin(
ctx,
serverName,
roomID,
userID,
supportedVersions,
)
if err != nil {
// TODO: Check if the user was not allowed to join the room.
r.statistics.ForServer(serverName).Failure()
return fmt.Errorf("r.federation.MakeJoin: %w", err)
}
r.statistics.ForServer(serverName).Success()
// Set all the fields to be what they should be, this should be a no-op
// but it's possible that the remote server returned us something "odd"
respMakeJoin.JoinEvent.Type = gomatrixserverlib.MRoomMember
respMakeJoin.JoinEvent.Sender = userID
respMakeJoin.JoinEvent.StateKey = &userID
respMakeJoin.JoinEvent.RoomID = roomID
respMakeJoin.JoinEvent.Redacts = ""
if content == nil {
content = map[string]interface{}{}
}
_ = json.Unmarshal(respMakeJoin.JoinEvent.Content, &content)
content["membership"] = gomatrixserverlib.Join
if err = respMakeJoin.JoinEvent.SetContent(content); err != nil {
return fmt.Errorf("respMakeJoin.JoinEvent.SetContent: %w", err)
}
if err = respMakeJoin.JoinEvent.SetUnsigned(struct{}{}); err != nil {
return fmt.Errorf("respMakeJoin.JoinEvent.SetUnsigned: %w", err)
}
// Work out if we support the room version that has been supplied in
// the make_join response.
// "If not provided, the room version is assumed to be either "1" or "2"."
// https://matrix.org/docs/spec/server_server/unstable#get-matrix-federation-v1-make-join-roomid-userid
if respMakeJoin.RoomVersion == "" {
respMakeJoin.RoomVersion = setDefaultRoomVersionFromJoinEvent(respMakeJoin.JoinEvent)
}
if _, err = respMakeJoin.RoomVersion.EventFormat(); err != nil {
return fmt.Errorf("respMakeJoin.RoomVersion.EventFormat: %w", err)
}
// Build the join event.
event, err := respMakeJoin.JoinEvent.Build(
time.Now(),
r.cfg.Matrix.ServerName,
r.cfg.Matrix.KeyID,
r.cfg.Matrix.PrivateKey,
respMakeJoin.RoomVersion,
)
if err != nil {
return fmt.Errorf("respMakeJoin.JoinEvent.Build: %w", err)
}
// Try to perform a send_join using the newly built event.
respSendJoin, err := r.federation.SendJoin(
context.Background(),
serverName,
event,
)
if err != nil {
r.statistics.ForServer(serverName).Failure()
return fmt.Errorf("r.federation.SendJoin: %w", err)
}
r.statistics.ForServer(serverName).Success()
// If the remote server returned an event in the "event" key of
// the send_join request then we should use that instead. It may
// contain signatures that we don't know about.
if len(respSendJoin.Event) > 0 {
var remoteEvent *gomatrixserverlib.Event
remoteEvent, err = respSendJoin.Event.UntrustedEvent(respMakeJoin.RoomVersion)
if err == nil && isWellFormedMembershipEvent(
remoteEvent, roomID, userID, r.cfg.Matrix.ServerName,
) {
event = remoteEvent
}
}
// Sanity-check the join response to ensure that it has a create
// event, that the room version is known, etc.
authEvents := respSendJoin.AuthEvents.UntrustedEvents(respMakeJoin.RoomVersion)
if err = sanityCheckAuthChain(authEvents); err != nil {
return fmt.Errorf("sanityCheckAuthChain: %w", err)
}
// Process the join response in a goroutine. The idea here is
// that we'll try and wait for as long as possible for the work
// to complete, but if the client does give up waiting, we'll
// still continue to process the join anyway so that we don't
// waste the effort.
// TODO: Can we expand Check here to return a list of missing auth
// events rather than failing one at a time?
var respState *gomatrixserverlib.RespState
respState, err = respSendJoin.Check(
context.Background(),
respMakeJoin.RoomVersion,
r.keyRing,
event,
federatedAuthProvider(ctx, r.federation, r.keyRing, serverName),
)
if err != nil {
return fmt.Errorf("respSendJoin.Check: %w", err)
}
// We need to immediately update our list of joined hosts for this room now as we are technically
// joined. We must do this synchronously: we cannot rely on the roomserver output events as they
// will happen asyncly. If we don't update this table, you can end up with bad failure modes like
// joining a room, waiting for 200 OK then changing device keys and have those keys not be sent
// to other servers (this was a cause of a flakey sytest "Local device key changes get to remote servers")
// The events are trusted now as we performed auth checks above.
joinedHosts, err := consumers.JoinedHostsFromEvents(respState.StateEvents.TrustedEvents(respMakeJoin.RoomVersion, false))
if err != nil {
return fmt.Errorf("JoinedHostsFromEvents: failed to get joined hosts: %s", err)
}
logrus.WithField("hosts", joinedHosts).WithField("room", roomID).Info("Joined federated room with hosts")
if _, err = r.db.UpdateRoom(context.Background(), roomID, joinedHosts, nil, true); err != nil {
return fmt.Errorf("UpdatedRoom: failed to update room with joined hosts: %s", err)
}
// If we successfully performed a send_join above then the other
// server now thinks we're a part of the room. Send the newly
// returned state to the roomserver to update our local view.
if err = roomserverAPI.SendEventWithState(
context.Background(),
r.rsAPI,
roomserverAPI.KindNew,
respState,
event.Headered(respMakeJoin.RoomVersion),
serverName,
nil,
false,
); err != nil {
return fmt.Errorf("roomserverAPI.SendEventWithState: %w", err)
}
return nil
}
Peeking over federation via MSC2444 (#1391) * a very very WIP first cut of peeking via MSC2753. doesn't yet compile or work. needs to actually add the peeking block into the sync response. checking in now before it gets any bigger, and to gather any initial feedback on the vague shape of it. * make PeekingDeviceSet private * add server_name param * blind stab at adding a `peek` section to /sync * make it build * make it launch * add peeking to getResponseWithPDUsForCompleteSync * cancel any peeks when we join a room * spell out how to runoutside of docker if you want speed * fix SQL * remove unnecessary txn for SelectPeeks * fix s/join/peek/ cargocult fail * HACK: Track goroutine IDs to determine when we write by the wrong thread To use: set `DENDRITE_TRACE_SQL=1` then grep for `unsafe` * Track partition offsets and only log unsafe for non-selects * Put redactions in the writer goroutine * Update filters on writer goroutine * wrap peek storage in goid hack * use exclusive writer, and MarkPeeksAsOld more efficiently * don't log ascii in binary at sql trace... * strip out empty roomd deltas * re-add txn to SelectPeeks * re-add accidentally deleted field * reject peeks for non-worldreadable rooms * move perform_peek * fix package * correctly refactor perform_peek * WIP of implementing MSC2444 * typo * Revert "Merge branch 'kegan/HACK-goid-sqlite-db-is-locked' into matthew/peeking" This reverts commit 3cebd8dbfbccdf82b7930b7b6eda92095ca6ef41, reversing changes made to ed4b3a58a7855acc43530693cc855b439edf9c7c. * (almost) make it build * clean up bad merge * support SendEventWithState with optional event * fix build & lint * fix build & lint * reinstate federated peeks in the roomserver (doh) * fix sql thinko * todo for authenticating state returned by /peek * support returning current state from QueryStateAndAuthChain * handle SS /peek * reimplement SS /peek to prod the RS to tell the FS about the peek * rename RemotePeeks as OutboundPeeks * rename remote_peeks_table as outbound_peeks_table * add perform_handle_remote_peek.go * flesh out federation doc * add inbound peeks table and hook it up * rename ambiguous RemotePeek as InboundPeek * rename FSAPI's PerformPeek as PerformOutboundPeek * setup inbound peeks db correctly * fix api.SendEventWithState with no event * track latestevent on /peek * go fmt * document the peek send stream race better * fix SendEventWithRewrite not to bail if handed a non-state event * add fixme * switch SS /peek to use SendEventWithRewrite * fix comment * use reverse topo ordering to find latest extrem * support postgres for federated peeking * go fmt * back out bogus go.mod change * Fix performOutboundPeekUsingServer * Fix getAuthChain -> GetAuthChain * Fix build issues * Fix build again * Fix getAuthChain -> GetAuthChain * Don't repeat outbound peeks for the same room ID to the same servers * Fix lint * Don't omitempty to appease sytest Co-authored-by: Kegan Dougal <kegan@matrix.org> Co-authored-by: Neil Alexander <neilalexander@users.noreply.github.com>
2021-01-22 15:55:08 +01:00
// isWellFormedMembershipEvent returns true if the event looks like a legitimate
// membership event.
func isWellFormedMembershipEvent(event *gomatrixserverlib.Event, roomID, userID string, origin gomatrixserverlib.ServerName) bool {
if membership, err := event.Membership(); err != nil {
return false
} else if membership != gomatrixserverlib.Join {
return false
}
if event.RoomID() != roomID {
return false
}
if event.Origin() != origin {
return false
}
if !event.StateKeyEquals(userID) {
return false
}
return true
}
// PerformOutboundPeekRequest implements api.FederationInternalAPI
func (r *FederationInternalAPI) PerformOutboundPeek(
Peeking over federation via MSC2444 (#1391) * a very very WIP first cut of peeking via MSC2753. doesn't yet compile or work. needs to actually add the peeking block into the sync response. checking in now before it gets any bigger, and to gather any initial feedback on the vague shape of it. * make PeekingDeviceSet private * add server_name param * blind stab at adding a `peek` section to /sync * make it build * make it launch * add peeking to getResponseWithPDUsForCompleteSync * cancel any peeks when we join a room * spell out how to runoutside of docker if you want speed * fix SQL * remove unnecessary txn for SelectPeeks * fix s/join/peek/ cargocult fail * HACK: Track goroutine IDs to determine when we write by the wrong thread To use: set `DENDRITE_TRACE_SQL=1` then grep for `unsafe` * Track partition offsets and only log unsafe for non-selects * Put redactions in the writer goroutine * Update filters on writer goroutine * wrap peek storage in goid hack * use exclusive writer, and MarkPeeksAsOld more efficiently * don't log ascii in binary at sql trace... * strip out empty roomd deltas * re-add txn to SelectPeeks * re-add accidentally deleted field * reject peeks for non-worldreadable rooms * move perform_peek * fix package * correctly refactor perform_peek * WIP of implementing MSC2444 * typo * Revert "Merge branch 'kegan/HACK-goid-sqlite-db-is-locked' into matthew/peeking" This reverts commit 3cebd8dbfbccdf82b7930b7b6eda92095ca6ef41, reversing changes made to ed4b3a58a7855acc43530693cc855b439edf9c7c. * (almost) make it build * clean up bad merge * support SendEventWithState with optional event * fix build & lint * fix build & lint * reinstate federated peeks in the roomserver (doh) * fix sql thinko * todo for authenticating state returned by /peek * support returning current state from QueryStateAndAuthChain * handle SS /peek * reimplement SS /peek to prod the RS to tell the FS about the peek * rename RemotePeeks as OutboundPeeks * rename remote_peeks_table as outbound_peeks_table * add perform_handle_remote_peek.go * flesh out federation doc * add inbound peeks table and hook it up * rename ambiguous RemotePeek as InboundPeek * rename FSAPI's PerformPeek as PerformOutboundPeek * setup inbound peeks db correctly * fix api.SendEventWithState with no event * track latestevent on /peek * go fmt * document the peek send stream race better * fix SendEventWithRewrite not to bail if handed a non-state event * add fixme * switch SS /peek to use SendEventWithRewrite * fix comment * use reverse topo ordering to find latest extrem * support postgres for federated peeking * go fmt * back out bogus go.mod change * Fix performOutboundPeekUsingServer * Fix getAuthChain -> GetAuthChain * Fix build issues * Fix build again * Fix getAuthChain -> GetAuthChain * Don't repeat outbound peeks for the same room ID to the same servers * Fix lint * Don't omitempty to appease sytest Co-authored-by: Kegan Dougal <kegan@matrix.org> Co-authored-by: Neil Alexander <neilalexander@users.noreply.github.com>
2021-01-22 15:55:08 +01:00
ctx context.Context,
request *api.PerformOutboundPeekRequest,
response *api.PerformOutboundPeekResponse,
) error {
// Look up the supported room versions.
var supportedVersions []gomatrixserverlib.RoomVersion
for version := range version.SupportedRoomVersions() {
supportedVersions = append(supportedVersions, version)
}
// Deduplicate the server names we were provided but keep the ordering
// as this encodes useful information about which servers are most likely
// to respond.
seenSet := make(map[gomatrixserverlib.ServerName]bool)
var uniqueList []gomatrixserverlib.ServerName
for _, srv := range request.ServerNames {
if seenSet[srv] {
continue
}
seenSet[srv] = true
uniqueList = append(uniqueList, srv)
}
request.ServerNames = uniqueList
// See if there's an existing outbound peek for this room ID with
// one of the specified servers.
if peeks, err := r.db.GetOutboundPeeks(ctx, request.RoomID); err == nil {
for _, peek := range peeks {
if _, ok := seenSet[peek.ServerName]; ok {
return nil
}
}
}
// Try each server that we were provided until we land on one that
// successfully completes the peek
var lastErr error
for _, serverName := range request.ServerNames {
if err := r.performOutboundPeekUsingServer(
ctx,
request.RoomID,
serverName,
supportedVersions,
); err != nil {
logrus.WithError(err).WithFields(logrus.Fields{
"server_name": serverName,
"room_id": request.RoomID,
}).Warnf("Failed to peek room through server")
lastErr = err
continue
}
// We're all good.
return nil
}
// If we reach here then we didn't complete a peek for some reason.
var httpErr gomatrix.HTTPError
if ok := errors.As(lastErr, &httpErr); ok {
httpErr.Message = string(httpErr.Contents)
// Clear the wrapped error, else serialising to JSON (in polylith mode) will fail
httpErr.WrappedError = nil
response.LastError = &httpErr
} else {
response.LastError = &gomatrix.HTTPError{
Code: 0,
WrappedError: nil,
Message: lastErr.Error(),
}
}
logrus.Errorf(
"failed to peek room %q through %d server(s): last error %s",
request.RoomID, len(request.ServerNames), lastErr,
)
return lastErr
}
func (r *FederationInternalAPI) performOutboundPeekUsingServer(
Peeking over federation via MSC2444 (#1391) * a very very WIP first cut of peeking via MSC2753. doesn't yet compile or work. needs to actually add the peeking block into the sync response. checking in now before it gets any bigger, and to gather any initial feedback on the vague shape of it. * make PeekingDeviceSet private * add server_name param * blind stab at adding a `peek` section to /sync * make it build * make it launch * add peeking to getResponseWithPDUsForCompleteSync * cancel any peeks when we join a room * spell out how to runoutside of docker if you want speed * fix SQL * remove unnecessary txn for SelectPeeks * fix s/join/peek/ cargocult fail * HACK: Track goroutine IDs to determine when we write by the wrong thread To use: set `DENDRITE_TRACE_SQL=1` then grep for `unsafe` * Track partition offsets and only log unsafe for non-selects * Put redactions in the writer goroutine * Update filters on writer goroutine * wrap peek storage in goid hack * use exclusive writer, and MarkPeeksAsOld more efficiently * don't log ascii in binary at sql trace... * strip out empty roomd deltas * re-add txn to SelectPeeks * re-add accidentally deleted field * reject peeks for non-worldreadable rooms * move perform_peek * fix package * correctly refactor perform_peek * WIP of implementing MSC2444 * typo * Revert "Merge branch 'kegan/HACK-goid-sqlite-db-is-locked' into matthew/peeking" This reverts commit 3cebd8dbfbccdf82b7930b7b6eda92095ca6ef41, reversing changes made to ed4b3a58a7855acc43530693cc855b439edf9c7c. * (almost) make it build * clean up bad merge * support SendEventWithState with optional event * fix build & lint * fix build & lint * reinstate federated peeks in the roomserver (doh) * fix sql thinko * todo for authenticating state returned by /peek * support returning current state from QueryStateAndAuthChain * handle SS /peek * reimplement SS /peek to prod the RS to tell the FS about the peek * rename RemotePeeks as OutboundPeeks * rename remote_peeks_table as outbound_peeks_table * add perform_handle_remote_peek.go * flesh out federation doc * add inbound peeks table and hook it up * rename ambiguous RemotePeek as InboundPeek * rename FSAPI's PerformPeek as PerformOutboundPeek * setup inbound peeks db correctly * fix api.SendEventWithState with no event * track latestevent on /peek * go fmt * document the peek send stream race better * fix SendEventWithRewrite not to bail if handed a non-state event * add fixme * switch SS /peek to use SendEventWithRewrite * fix comment * use reverse topo ordering to find latest extrem * support postgres for federated peeking * go fmt * back out bogus go.mod change * Fix performOutboundPeekUsingServer * Fix getAuthChain -> GetAuthChain * Fix build issues * Fix build again * Fix getAuthChain -> GetAuthChain * Don't repeat outbound peeks for the same room ID to the same servers * Fix lint * Don't omitempty to appease sytest Co-authored-by: Kegan Dougal <kegan@matrix.org> Co-authored-by: Neil Alexander <neilalexander@users.noreply.github.com>
2021-01-22 15:55:08 +01:00
ctx context.Context,
roomID string,
serverName gomatrixserverlib.ServerName,
supportedVersions []gomatrixserverlib.RoomVersion,
) error {
// create a unique ID for this peek.
// for now we just use the room ID again. In future, if we ever
// support concurrent peeks to the same room with different filters
// then we would need to disambiguate further.
peekID := roomID
// check whether we're peeking already to try to avoid needlessly
// re-peeking on the server. we don't need a transaction for this,
// given this is a nice-to-have.
outboundPeek, err := r.db.GetOutboundPeek(ctx, serverName, roomID, peekID)
if err != nil {
return err
}
renewing := false
if outboundPeek != nil {
nowMilli := time.Now().UnixNano() / int64(time.Millisecond)
if nowMilli > outboundPeek.RenewedTimestamp+outboundPeek.RenewalInterval {
logrus.Infof("stale outbound peek to %s for %s already exists; renewing", serverName, roomID)
renewing = true
} else {
logrus.Infof("live outbound peek to %s for %s already exists", serverName, roomID)
return nil
}
}
// Try to perform an outbound /peek using the information supplied in the
// request.
respPeek, err := r.federation.Peek(
ctx,
serverName,
roomID,
peekID,
supportedVersions,
)
if err != nil {
r.statistics.ForServer(serverName).Failure()
return fmt.Errorf("r.federation.Peek: %w", err)
}
r.statistics.ForServer(serverName).Success()
// Work out if we support the room version that has been supplied in
// the peek response.
if respPeek.RoomVersion == "" {
respPeek.RoomVersion = gomatrixserverlib.RoomVersionV1
}
if _, err = respPeek.RoomVersion.EventFormat(); err != nil {
return fmt.Errorf("respPeek.RoomVersion.EventFormat: %w", err)
}
// we have the peek state now so let's process regardless of whether upstream gives up
ctx = context.Background()
respState := respPeek.ToRespState()
// authenticate the state returned (check its auth events etc)
Peeking over federation via MSC2444 (#1391) * a very very WIP first cut of peeking via MSC2753. doesn't yet compile or work. needs to actually add the peeking block into the sync response. checking in now before it gets any bigger, and to gather any initial feedback on the vague shape of it. * make PeekingDeviceSet private * add server_name param * blind stab at adding a `peek` section to /sync * make it build * make it launch * add peeking to getResponseWithPDUsForCompleteSync * cancel any peeks when we join a room * spell out how to runoutside of docker if you want speed * fix SQL * remove unnecessary txn for SelectPeeks * fix s/join/peek/ cargocult fail * HACK: Track goroutine IDs to determine when we write by the wrong thread To use: set `DENDRITE_TRACE_SQL=1` then grep for `unsafe` * Track partition offsets and only log unsafe for non-selects * Put redactions in the writer goroutine * Update filters on writer goroutine * wrap peek storage in goid hack * use exclusive writer, and MarkPeeksAsOld more efficiently * don't log ascii in binary at sql trace... * strip out empty roomd deltas * re-add txn to SelectPeeks * re-add accidentally deleted field * reject peeks for non-worldreadable rooms * move perform_peek * fix package * correctly refactor perform_peek * WIP of implementing MSC2444 * typo * Revert "Merge branch 'kegan/HACK-goid-sqlite-db-is-locked' into matthew/peeking" This reverts commit 3cebd8dbfbccdf82b7930b7b6eda92095ca6ef41, reversing changes made to ed4b3a58a7855acc43530693cc855b439edf9c7c. * (almost) make it build * clean up bad merge * support SendEventWithState with optional event * fix build & lint * fix build & lint * reinstate federated peeks in the roomserver (doh) * fix sql thinko * todo for authenticating state returned by /peek * support returning current state from QueryStateAndAuthChain * handle SS /peek * reimplement SS /peek to prod the RS to tell the FS about the peek * rename RemotePeeks as OutboundPeeks * rename remote_peeks_table as outbound_peeks_table * add perform_handle_remote_peek.go * flesh out federation doc * add inbound peeks table and hook it up * rename ambiguous RemotePeek as InboundPeek * rename FSAPI's PerformPeek as PerformOutboundPeek * setup inbound peeks db correctly * fix api.SendEventWithState with no event * track latestevent on /peek * go fmt * document the peek send stream race better * fix SendEventWithRewrite not to bail if handed a non-state event * add fixme * switch SS /peek to use SendEventWithRewrite * fix comment * use reverse topo ordering to find latest extrem * support postgres for federated peeking * go fmt * back out bogus go.mod change * Fix performOutboundPeekUsingServer * Fix getAuthChain -> GetAuthChain * Fix build issues * Fix build again * Fix getAuthChain -> GetAuthChain * Don't repeat outbound peeks for the same room ID to the same servers * Fix lint * Don't omitempty to appease sytest Co-authored-by: Kegan Dougal <kegan@matrix.org> Co-authored-by: Neil Alexander <neilalexander@users.noreply.github.com>
2021-01-22 15:55:08 +01:00
// the equivalent of CheckSendJoinResponse()
authEvents, _, err := respState.Check(ctx, respPeek.RoomVersion, r.keyRing, federatedAuthProvider(ctx, r.federation, r.keyRing, serverName))
if err != nil {
return fmt.Errorf("error checking state returned from peeking: %w", err)
}
if err = sanityCheckAuthChain(authEvents); err != nil {
return fmt.Errorf("sanityCheckAuthChain: %w", err)
}
Peeking over federation via MSC2444 (#1391) * a very very WIP first cut of peeking via MSC2753. doesn't yet compile or work. needs to actually add the peeking block into the sync response. checking in now before it gets any bigger, and to gather any initial feedback on the vague shape of it. * make PeekingDeviceSet private * add server_name param * blind stab at adding a `peek` section to /sync * make it build * make it launch * add peeking to getResponseWithPDUsForCompleteSync * cancel any peeks when we join a room * spell out how to runoutside of docker if you want speed * fix SQL * remove unnecessary txn for SelectPeeks * fix s/join/peek/ cargocult fail * HACK: Track goroutine IDs to determine when we write by the wrong thread To use: set `DENDRITE_TRACE_SQL=1` then grep for `unsafe` * Track partition offsets and only log unsafe for non-selects * Put redactions in the writer goroutine * Update filters on writer goroutine * wrap peek storage in goid hack * use exclusive writer, and MarkPeeksAsOld more efficiently * don't log ascii in binary at sql trace... * strip out empty roomd deltas * re-add txn to SelectPeeks * re-add accidentally deleted field * reject peeks for non-worldreadable rooms * move perform_peek * fix package * correctly refactor perform_peek * WIP of implementing MSC2444 * typo * Revert "Merge branch 'kegan/HACK-goid-sqlite-db-is-locked' into matthew/peeking" This reverts commit 3cebd8dbfbccdf82b7930b7b6eda92095ca6ef41, reversing changes made to ed4b3a58a7855acc43530693cc855b439edf9c7c. * (almost) make it build * clean up bad merge * support SendEventWithState with optional event * fix build & lint * fix build & lint * reinstate federated peeks in the roomserver (doh) * fix sql thinko * todo for authenticating state returned by /peek * support returning current state from QueryStateAndAuthChain * handle SS /peek * reimplement SS /peek to prod the RS to tell the FS about the peek * rename RemotePeeks as OutboundPeeks * rename remote_peeks_table as outbound_peeks_table * add perform_handle_remote_peek.go * flesh out federation doc * add inbound peeks table and hook it up * rename ambiguous RemotePeek as InboundPeek * rename FSAPI's PerformPeek as PerformOutboundPeek * setup inbound peeks db correctly * fix api.SendEventWithState with no event * track latestevent on /peek * go fmt * document the peek send stream race better * fix SendEventWithRewrite not to bail if handed a non-state event * add fixme * switch SS /peek to use SendEventWithRewrite * fix comment * use reverse topo ordering to find latest extrem * support postgres for federated peeking * go fmt * back out bogus go.mod change * Fix performOutboundPeekUsingServer * Fix getAuthChain -> GetAuthChain * Fix build issues * Fix build again * Fix getAuthChain -> GetAuthChain * Don't repeat outbound peeks for the same room ID to the same servers * Fix lint * Don't omitempty to appease sytest Co-authored-by: Kegan Dougal <kegan@matrix.org> Co-authored-by: Neil Alexander <neilalexander@users.noreply.github.com>
2021-01-22 15:55:08 +01:00
// If we've got this far, the remote server is peeking.
if renewing {
if err = r.db.RenewOutboundPeek(ctx, serverName, roomID, peekID, respPeek.RenewalInterval); err != nil {
return err
}
} else {
if err = r.db.AddOutboundPeek(ctx, serverName, roomID, peekID, respPeek.RenewalInterval); err != nil {
return err
}
}
// logrus.Warnf("got respPeek %#v", respPeek)
// Send the newly returned state to the roomserver to update our local view.
if err = roomserverAPI.SendEventWithState(
ctx, r.rsAPI,
roomserverAPI.KindNew,
&respState,
respPeek.LatestEvent.Headered(respPeek.RoomVersion),
Roomserver/federation input refactor (#2104) * Put federation client functions into their own file * Look for missing auth events in RS input * Remove retrieveMissingAuthEvents from federation API * Logging * Sorta transplanted the code over * Use event origin failing all else * Don't get stuck on mutexes: * Add verifier * Don't mark state events with zero snapshot NID as not existing * Check missing state if not an outlier before storing the event * Reject instead of soft-fail, don't copy roominfo so much * Use synchronous contexts, limit time to fetch missing events * Clean up some commented out bits * Simplify `/send` endpoint significantly * Submit async * Report errors on sending to RS input * Set max payload in NATS to 16MB * Tweak metrics * Add `workerForRoom` for tidiness * Try skipping unmarshalling errors for RespMissingEvents * Track missing prev events separately to avoid calculating state when not possible * Tweak logic around checking missing state * Care about state when checking missing prev events * Don't check missing state for create events * Try that again * Handle create events better * Send create room events as new * Use given event kind when sending auth/state events * Revert "Use given event kind when sending auth/state events" This reverts commit 089d64d271b5fca8c104e1554711187420dbebca. * Only search for missing prev events or state for new events * Tweaks * We only have missing prev if we don't supply state * Room version tweaks * Allow async inputs again * Apply backpressure to consumers/synchronous requests to hopefully stop things being overwhelmed * Set timeouts on roomserver input tasks (need to decide what timeout makes sense) * Use work queue policy, deliver all on restart * Reduce chance of duplicates being sent by NATS * Limit the number of servers we attempt to reduce backpressure * Some review comment fixes * Tidy up a couple things * Don't limit servers, randomise order using map * Some context refactoring * Update gmsl * Don't resend create events * Set stateIDs length correctly or else the roomserver thinks there are missing events when there aren't * Exclude our own servername * Try backing off servers * Make excluding self behaviour optional * Exclude self from g_m_e * Update sytest-whitelist * Update consumers for the roomserver output stream * Remember to send outliers for state returned from /gme * Make full HTTP tests less upsetti * Remove 'If a device list update goes missing, the server resyncs on the next one' from the sytest blacklist * Remove debugging test * Fix blacklist again, remove unnecessary duplicate context * Clearer contexts, don't use background in case there's something happening there * Don't queue up events more than once in memory * Correctly identify create events when checking for state * Fill in gaps again in /gme code * Remove `AuthEventIDs` from `InputRoomEvent` * Remove stray field Co-authored-by: Kegan Dougal <kegan@matrix.org>
2022-01-27 15:29:14 +01:00
serverName,
nil,
false,
Peeking over federation via MSC2444 (#1391) * a very very WIP first cut of peeking via MSC2753. doesn't yet compile or work. needs to actually add the peeking block into the sync response. checking in now before it gets any bigger, and to gather any initial feedback on the vague shape of it. * make PeekingDeviceSet private * add server_name param * blind stab at adding a `peek` section to /sync * make it build * make it launch * add peeking to getResponseWithPDUsForCompleteSync * cancel any peeks when we join a room * spell out how to runoutside of docker if you want speed * fix SQL * remove unnecessary txn for SelectPeeks * fix s/join/peek/ cargocult fail * HACK: Track goroutine IDs to determine when we write by the wrong thread To use: set `DENDRITE_TRACE_SQL=1` then grep for `unsafe` * Track partition offsets and only log unsafe for non-selects * Put redactions in the writer goroutine * Update filters on writer goroutine * wrap peek storage in goid hack * use exclusive writer, and MarkPeeksAsOld more efficiently * don't log ascii in binary at sql trace... * strip out empty roomd deltas * re-add txn to SelectPeeks * re-add accidentally deleted field * reject peeks for non-worldreadable rooms * move perform_peek * fix package * correctly refactor perform_peek * WIP of implementing MSC2444 * typo * Revert "Merge branch 'kegan/HACK-goid-sqlite-db-is-locked' into matthew/peeking" This reverts commit 3cebd8dbfbccdf82b7930b7b6eda92095ca6ef41, reversing changes made to ed4b3a58a7855acc43530693cc855b439edf9c7c. * (almost) make it build * clean up bad merge * support SendEventWithState with optional event * fix build & lint * fix build & lint * reinstate federated peeks in the roomserver (doh) * fix sql thinko * todo for authenticating state returned by /peek * support returning current state from QueryStateAndAuthChain * handle SS /peek * reimplement SS /peek to prod the RS to tell the FS about the peek * rename RemotePeeks as OutboundPeeks * rename remote_peeks_table as outbound_peeks_table * add perform_handle_remote_peek.go * flesh out federation doc * add inbound peeks table and hook it up * rename ambiguous RemotePeek as InboundPeek * rename FSAPI's PerformPeek as PerformOutboundPeek * setup inbound peeks db correctly * fix api.SendEventWithState with no event * track latestevent on /peek * go fmt * document the peek send stream race better * fix SendEventWithRewrite not to bail if handed a non-state event * add fixme * switch SS /peek to use SendEventWithRewrite * fix comment * use reverse topo ordering to find latest extrem * support postgres for federated peeking * go fmt * back out bogus go.mod change * Fix performOutboundPeekUsingServer * Fix getAuthChain -> GetAuthChain * Fix build issues * Fix build again * Fix getAuthChain -> GetAuthChain * Don't repeat outbound peeks for the same room ID to the same servers * Fix lint * Don't omitempty to appease sytest Co-authored-by: Kegan Dougal <kegan@matrix.org> Co-authored-by: Neil Alexander <neilalexander@users.noreply.github.com>
2021-01-22 15:55:08 +01:00
); err != nil {
return fmt.Errorf("r.producer.SendEventWithState: %w", err)
}
return nil
}
// PerformLeaveRequest implements api.FederationInternalAPI
func (r *FederationInternalAPI) PerformLeave(
ctx context.Context,
request *api.PerformLeaveRequest,
response *api.PerformLeaveResponse,
) (err error) {
// Deduplicate the server names we were provided.
util.SortAndUnique(request.ServerNames)
// Try each server that we were provided until we land on one that
// successfully completes the make-leave send-leave dance.
for _, serverName := range request.ServerNames {
// Try to perform a make_leave using the information supplied in the
// request.
respMakeLeave, err := r.federation.MakeLeave(
ctx,
serverName,
request.RoomID,
request.UserID,
)
if err != nil {
// TODO: Check if the user was not allowed to leave the room.
logrus.WithError(err).Warnf("r.federation.MakeLeave failed")
r.statistics.ForServer(serverName).Failure()
continue
}
// Set all the fields to be what they should be, this should be a no-op
// but it's possible that the remote server returned us something "odd"
respMakeLeave.LeaveEvent.Type = gomatrixserverlib.MRoomMember
respMakeLeave.LeaveEvent.Sender = request.UserID
respMakeLeave.LeaveEvent.StateKey = &request.UserID
respMakeLeave.LeaveEvent.RoomID = request.RoomID
respMakeLeave.LeaveEvent.Redacts = ""
if respMakeLeave.LeaveEvent.Content == nil {
content := map[string]interface{}{
"membership": "leave",
}
if err = respMakeLeave.LeaveEvent.SetContent(content); err != nil {
logrus.WithError(err).Warnf("respMakeLeave.LeaveEvent.SetContent failed")
continue
}
}
if err = respMakeLeave.LeaveEvent.SetUnsigned(struct{}{}); err != nil {
logrus.WithError(err).Warnf("respMakeLeave.LeaveEvent.SetUnsigned failed")
continue
}
// Work out if we support the room version that has been supplied in
// the make_leave response.
if _, err = respMakeLeave.RoomVersion.EventFormat(); err != nil {
return gomatrixserverlib.UnsupportedRoomVersionError{}
}
// Build the leave event.
event, err := respMakeLeave.LeaveEvent.Build(
time.Now(),
r.cfg.Matrix.ServerName,
r.cfg.Matrix.KeyID,
r.cfg.Matrix.PrivateKey,
respMakeLeave.RoomVersion,
)
if err != nil {
logrus.WithError(err).Warnf("respMakeLeave.LeaveEvent.Build failed")
continue
}
// Try to perform a send_leave using the newly built event.
err = r.federation.SendLeave(
ctx,
serverName,
event,
)
if err != nil {
logrus.WithError(err).Warnf("r.federation.SendLeave failed")
r.statistics.ForServer(serverName).Failure()
continue
}
r.statistics.ForServer(serverName).Success()
return nil
}
// If we reach here then we didn't complete a leave for some reason.
return fmt.Errorf(
"failed to leave room %q through %d server(s)",
request.RoomID, len(request.ServerNames),
)
}
// PerformLeaveRequest implements api.FederationInternalAPI
func (r *FederationInternalAPI) PerformInvite(
ctx context.Context,
request *api.PerformInviteRequest,
response *api.PerformInviteResponse,
) (err error) {
if request.Event.StateKey() == nil {
return errors.New("invite must be a state event")
}
_, destination, err := gomatrixserverlib.SplitID('@', *request.Event.StateKey())
if err != nil {
return fmt.Errorf("gomatrixserverlib.SplitID: %w", err)
}
logrus.WithFields(logrus.Fields{
"event_id": request.Event.EventID(),
"user_id": *request.Event.StateKey(),
"room_id": request.Event.RoomID(),
"room_version": request.RoomVersion,
"destination": destination,
}).Info("Sending invite")
inviteReq, err := gomatrixserverlib.NewInviteV2Request(request.Event, request.InviteRoomState)
if err != nil {
return fmt.Errorf("gomatrixserverlib.NewInviteV2Request: %w", err)
}
inviteRes, err := r.federation.SendInviteV2(ctx, destination, inviteReq)
if err != nil {
return fmt.Errorf("r.federation.SendInviteV2: failed to send invite: %w", err)
}
inviteEvent, err := inviteRes.Event.UntrustedEvent(request.RoomVersion)
if err != nil {
return fmt.Errorf("r.federation.SendInviteV2 failed to decode event response: %w", err)
}
response.Event = inviteEvent.Headered(request.RoomVersion)
return nil
}
// PerformServersAlive implements api.FederationInternalAPI
func (r *FederationInternalAPI) PerformBroadcastEDU(
ctx context.Context,
request *api.PerformBroadcastEDURequest,
response *api.PerformBroadcastEDUResponse,
) (err error) {
destinations, err := r.db.GetAllJoinedHosts(ctx)
if err != nil {
return fmt.Errorf("r.db.GetAllJoinedHosts: %w", err)
}
if len(destinations) == 0 {
return nil
}
logrus.WithContext(ctx).Infof("Sending wake-up EDU to %d destination(s)", len(destinations))
edu := &gomatrixserverlib.EDU{
Type: "org.matrix.dendrite.wakeup",
Origin: string(r.cfg.Matrix.ServerName),
}
if err = r.queues.SendEDU(edu, r.cfg.Matrix.ServerName, destinations); err != nil {
return fmt.Errorf("r.queues.SendEDU: %w", err)
}
r.MarkServersAlive(destinations)
return nil
}
func (r *FederationInternalAPI) MarkServersAlive(destinations []gomatrixserverlib.ServerName) {
for _, srv := range destinations {
_ = r.db.RemoveServerFromBlacklist(srv)
r.queues.RetryServer(srv)
}
}
func sanityCheckAuthChain(authChain []*gomatrixserverlib.Event) error {
// sanity check we have a create event and it has a known room version
for _, ev := range authChain {
if ev.Type() == gomatrixserverlib.MRoomCreate && ev.StateKeyEquals("") {
// make sure the room version is known
content := ev.Content()
verBody := struct {
Version string `json:"room_version"`
}{}
err := json.Unmarshal(content, &verBody)
if err != nil {
return err
}
if verBody.Version == "" {
// https://matrix.org/docs/spec/client_server/r0.6.0#m-room-create
// The version of the room. Defaults to "1" if the key does not exist.
verBody.Version = "1"
}
knownVersions := gomatrixserverlib.RoomVersions()
if _, ok := knownVersions[gomatrixserverlib.RoomVersion(verBody.Version)]; !ok {
return fmt.Errorf("auth chain m.room.create event has an unknown room version: %s", verBody.Version)
}
return nil
}
}
return fmt.Errorf("auth chain response is missing m.room.create event")
}
func setDefaultRoomVersionFromJoinEvent(joinEvent gomatrixserverlib.EventBuilder) gomatrixserverlib.RoomVersion {
// if auth events are not event references we know it must be v3+
// we have to do these shenanigans to satisfy sytest, specifically for:
// "Outbound federation rejects m.room.create events with an unknown room version"
hasEventRefs := true
authEvents, ok := joinEvent.AuthEvents.([]interface{})
if ok {
if len(authEvents) > 0 {
_, ok = authEvents[0].(string)
if ok {
// event refs are objects, not strings, so we know we must be dealing with a v3+ room.
hasEventRefs = false
}
}
}
if hasEventRefs {
return gomatrixserverlib.RoomVersionV1
}
return gomatrixserverlib.RoomVersionV4
}
// FederatedAuthProvider is an auth chain provider which fetches events from the server provided
func federatedAuthProvider(
ctx context.Context, federation api.FederationClient,
keyRing gomatrixserverlib.JSONVerifier, server gomatrixserverlib.ServerName,
) gomatrixserverlib.AuthChainProvider {
// A list of events that we have retried, if they were not included in
// the auth events supplied in the send_join.
retries := map[string][]*gomatrixserverlib.Event{}
// Define a function which we can pass to Check to retrieve missing
// auth events inline. This greatly increases our chances of not having
// to repeat the entire set of checks just for a missing event or two.
return func(roomVersion gomatrixserverlib.RoomVersion, eventIDs []string) ([]*gomatrixserverlib.Event, error) {
returning := []*gomatrixserverlib.Event{}
// See if we have retry entries for each of the supplied event IDs.
for _, eventID := range eventIDs {
// If we've already satisfied a request for this event ID before then
// just append the results. We won't retry the request.
if retry, ok := retries[eventID]; ok {
if retry == nil {
return nil, fmt.Errorf("missingAuth: not retrying failed event ID %q", eventID)
}
returning = append(returning, retry...)
continue
}
// Make a note of the fact that we tried to do something with this
// event ID, even if we don't succeed.
retries[eventID] = nil
// Try to retrieve the event from the server that sent us the send
// join response.
tx, txerr := federation.GetEvent(ctx, server, eventID)
if txerr != nil {
return nil, fmt.Errorf("missingAuth r.federation.GetEvent: %w", txerr)
}
// For each event returned, add it to the set of return events. We
// also will populate the retries, in case someone asks for this
// event ID again.
for _, pdu := range tx.PDUs {
// Try to parse the event.
ev, everr := gomatrixserverlib.NewEventFromUntrustedJSON(pdu, roomVersion)
if everr != nil {
return nil, fmt.Errorf("missingAuth gomatrixserverlib.NewEventFromUntrustedJSON: %w", everr)
}
// Check the signatures of the event.
if err := ev.VerifyEventSignatures(ctx, keyRing); err != nil {
return nil, fmt.Errorf("missingAuth VerifyEventSignatures: %w", err)
}
// If the event is OK then add it to the results and the retry map.
returning = append(returning, ev)
retries[ev.EventID()] = append(retries[ev.EventID()], ev)
}
}
return returning, nil
}
}