From 8cc6937fef562305ed96155cf288a9dde9b331ea Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Mon, 8 Nov 2021 13:02:08 +0000 Subject: [PATCH] Async and sync roomserver input --- clientapi/routing/createroom.go | 1 + clientapi/routing/membership.go | 1 + clientapi/routing/profile.go | 4 +- clientapi/routing/redaction.go | 2 +- clientapi/routing/sendevent.go | 1 + clientapi/threepid/invites.go | 1 + federationapi/routing/send.go | 4 ++ federationapi/routing/threepid.go | 3 +- federationsender/consumers/keychange.go | 3 ++ federationsender/internal/perform.go | 4 +- roomserver/api/input.go | 1 + roomserver/api/wrapper.go | 15 ++++-- roomserver/internal/input/input.go | 63 ++++++++++++++++++------- setup/mscs/msc2836/msc2836.go | 2 +- syncapi/consumers/keychange.go | 3 ++ 15 files changed, 79 insertions(+), 29 deletions(-) diff --git a/clientapi/routing/createroom.go b/clientapi/routing/createroom.go index 8f96c3d35..85331192b 100644 --- a/clientapi/routing/createroom.go +++ b/clientapi/routing/createroom.go @@ -463,6 +463,7 @@ func createRoom( }, ev.Headered(roomVersion), nil, + false, ); err != nil { util.GetLogger(req.Context()).WithError(err).Error("SendEventWithState failed") return jsonerror.InternalServerError() diff --git a/clientapi/routing/membership.go b/clientapi/routing/membership.go index 33fb38831..7ddb827eb 100644 --- a/clientapi/routing/membership.go +++ b/clientapi/routing/membership.go @@ -110,6 +110,7 @@ func sendMembership(ctx context.Context, accountDB accounts.Database, device *us []*gomatrixserverlib.HeaderedEvent{event.Event.Headered(roomVer)}, cfg.Matrix.ServerName, nil, + false, ); err != nil { util.GetLogger(ctx).WithError(err).Error("SendEvents failed") return jsonerror.InternalServerError() diff --git a/clientapi/routing/profile.go b/clientapi/routing/profile.go index 7bea35e50..9de1869bf 100644 --- a/clientapi/routing/profile.go +++ b/clientapi/routing/profile.go @@ -169,7 +169,7 @@ func SetAvatarURL( return jsonerror.InternalServerError() } - if err := api.SendEvents(req.Context(), rsAPI, api.KindNew, events, cfg.Matrix.ServerName, nil); err != nil { + if err := api.SendEvents(req.Context(), rsAPI, api.KindNew, events, cfg.Matrix.ServerName, nil, false); err != nil { util.GetLogger(req.Context()).WithError(err).Error("SendEvents failed") return jsonerror.InternalServerError() } @@ -286,7 +286,7 @@ func SetDisplayName( return jsonerror.InternalServerError() } - if err := api.SendEvents(req.Context(), rsAPI, api.KindNew, events, cfg.Matrix.ServerName, nil); err != nil { + if err := api.SendEvents(req.Context(), rsAPI, api.KindNew, events, cfg.Matrix.ServerName, nil, false); err != nil { util.GetLogger(req.Context()).WithError(err).Error("SendEvents failed") return jsonerror.InternalServerError() } diff --git a/clientapi/routing/redaction.go b/clientapi/routing/redaction.go index c25ca4eff..8492236b6 100644 --- a/clientapi/routing/redaction.go +++ b/clientapi/routing/redaction.go @@ -120,7 +120,7 @@ func SendRedaction( JSON: jsonerror.NotFound("Room does not exist"), } } - if err = roomserverAPI.SendEvents(context.Background(), rsAPI, roomserverAPI.KindNew, []*gomatrixserverlib.HeaderedEvent{e}, cfg.Matrix.ServerName, nil); err != nil { + if err = roomserverAPI.SendEvents(context.Background(), rsAPI, roomserverAPI.KindNew, []*gomatrixserverlib.HeaderedEvent{e}, cfg.Matrix.ServerName, nil, false); err != nil { util.GetLogger(req.Context()).WithError(err).Errorf("failed to SendEvents") return jsonerror.InternalServerError() } diff --git a/clientapi/routing/sendevent.go b/clientapi/routing/sendevent.go index 204d2592a..f04983122 100644 --- a/clientapi/routing/sendevent.go +++ b/clientapi/routing/sendevent.go @@ -122,6 +122,7 @@ func SendEvent( }, cfg.Matrix.ServerName, txnAndSessionID, + false, ); err != nil { util.GetLogger(req.Context()).WithError(err).Error("SendEvents failed") return jsonerror.InternalServerError() diff --git a/clientapi/threepid/invites.go b/clientapi/threepid/invites.go index 53cd6b8ca..985cf00c4 100644 --- a/clientapi/threepid/invites.go +++ b/clientapi/threepid/invites.go @@ -367,5 +367,6 @@ func emit3PIDInviteEvent( }, cfg.Matrix.ServerName, nil, + false, ) } diff --git a/federationapi/routing/send.go b/federationapi/routing/send.go index 4b5f0d660..ba73a5a6a 100644 --- a/federationapi/routing/send.go +++ b/federationapi/routing/send.go @@ -692,6 +692,7 @@ func (t *txnReq) processEvent(ctx context.Context, e *gomatrixserverlib.Event) e }, api.DoNotSendToOtherServers, nil, + true, // asynchronous ) } @@ -734,6 +735,7 @@ withNextEvent: SendAsServer: api.DoNotSendToOtherServers, }, }, + false, ); err != nil { return fmt.Errorf("api.SendEvents: %w", err) } @@ -882,6 +884,7 @@ func (t *txnReq) processEventWithMissingState( resolvedState, backwardsExtremity.Headered(roomVersion), hadEvents, + true, ) if err != nil { return fmt.Errorf("api.SendEventWithState: %w", err) @@ -902,6 +905,7 @@ func (t *txnReq) processEventWithMissingState( append(headeredNewEvents, e.Headered(roomVersion)), api.DoNotSendToOtherServers, nil, + true, ); err != nil { return fmt.Errorf("api.SendEvents: %w", err) } diff --git a/federationapi/routing/threepid.go b/federationapi/routing/threepid.go index 5ba28881c..fb919a591 100644 --- a/federationapi/routing/threepid.go +++ b/federationapi/routing/threepid.go @@ -89,7 +89,7 @@ func CreateInvitesFrom3PIDInvites( } // Send all the events - if err := api.SendEvents(req.Context(), rsAPI, api.KindNew, evs, cfg.Matrix.ServerName, nil); err != nil { + if err := api.SendEvents(req.Context(), rsAPI, api.KindNew, evs, cfg.Matrix.ServerName, nil, false); err != nil { util.GetLogger(req.Context()).WithError(err).Error("SendEvents failed") return jsonerror.InternalServerError() } @@ -180,6 +180,7 @@ func ExchangeThirdPartyInvite( }, cfg.Matrix.ServerName, nil, + false, ); err != nil { util.GetLogger(httpReq.Context()).WithError(err).Error("SendEvents failed") return jsonerror.InternalServerError() diff --git a/federationsender/consumers/keychange.go b/federationsender/consumers/keychange.go index a0158dbc2..982219a14 100644 --- a/federationsender/consumers/keychange.go +++ b/federationsender/consumers/keychange.go @@ -96,6 +96,9 @@ func (t *KeyChangeConsumer) onMessage(msg *sarama.ConsumerMessage) error { } func (t *KeyChangeConsumer) onDeviceKeyMessage(m api.DeviceMessage) error { + if m.DeviceKeys == nil { + return nil + } logger := logrus.WithField("user_id", m.UserID) // only send key change events which originated from us diff --git a/federationsender/internal/perform.go b/federationsender/internal/perform.go index 53fa974b2..47ec54278 100644 --- a/federationsender/internal/perform.go +++ b/federationsender/internal/perform.go @@ -249,7 +249,7 @@ func (r *FederationSenderInternalAPI) performJoinUsingServer( roomserverAPI.KindNew, respState, event.Headered(respMakeJoin.RoomVersion), - nil, + nil, false, ); err != nil { logrus.WithFields(logrus.Fields{ "room_id": roomID, @@ -430,7 +430,7 @@ func (r *FederationSenderInternalAPI) performOutboundPeekUsingServer( roomserverAPI.KindNew, &respState, respPeek.LatestEvent.Headered(respPeek.RoomVersion), - nil, + nil, false, ); err != nil { return fmt.Errorf("r.producer.SendEventWithState: %w", err) } diff --git a/roomserver/api/input.go b/roomserver/api/input.go index 8e6e4ac7b..a537e64ef 100644 --- a/roomserver/api/input.go +++ b/roomserver/api/input.go @@ -86,6 +86,7 @@ type TransactionID struct { // InputRoomEventsRequest is a request to InputRoomEvents type InputRoomEventsRequest struct { InputRoomEvents []InputRoomEvent `json:"input_room_events"` + Asynchronous bool `json:"async"` } // InputRoomEventsResponse is a response to InputRoomEvents diff --git a/roomserver/api/wrapper.go b/roomserver/api/wrapper.go index de66df803..cdb186c07 100644 --- a/roomserver/api/wrapper.go +++ b/roomserver/api/wrapper.go @@ -27,6 +27,7 @@ func SendEvents( ctx context.Context, rsAPI RoomserverInternalAPI, kind Kind, events []*gomatrixserverlib.HeaderedEvent, sendAsServer gomatrixserverlib.ServerName, txnID *TransactionID, + async bool, ) error { ires := make([]InputRoomEvent, len(events)) for i, event := range events { @@ -38,7 +39,7 @@ func SendEvents( TransactionID: txnID, } } - return SendInputRoomEvents(ctx, rsAPI, ires) + return SendInputRoomEvents(ctx, rsAPI, ires, async) } // SendEventWithState writes an event with the specified kind to the roomserver @@ -47,7 +48,7 @@ func SendEvents( func SendEventWithState( ctx context.Context, rsAPI RoomserverInternalAPI, kind Kind, state *gomatrixserverlib.RespState, event *gomatrixserverlib.HeaderedEvent, - haveEventIDs map[string]bool, + haveEventIDs map[string]bool, async bool, ) error { outliers, err := state.Events() if err != nil { @@ -79,14 +80,18 @@ func SendEventWithState( StateEventIDs: stateEventIDs, }) - return SendInputRoomEvents(ctx, rsAPI, ires) + return SendInputRoomEvents(ctx, rsAPI, ires, async) } // SendInputRoomEvents to the roomserver. func SendInputRoomEvents( - ctx context.Context, rsAPI RoomserverInternalAPI, ires []InputRoomEvent, + ctx context.Context, rsAPI RoomserverInternalAPI, + ires []InputRoomEvent, async bool, ) error { - request := InputRoomEventsRequest{InputRoomEvents: ires} + request := InputRoomEventsRequest{ + InputRoomEvents: ires, + Asynchronous: async, + } var response InputRoomEventsResponse rsAPI.InputRoomEvents(ctx, &request, &response) return response.Err() diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go index e31efd151..f561e8b59 100644 --- a/roomserver/internal/input/input.go +++ b/roomserver/internal/input/input.go @@ -60,7 +60,7 @@ func (r *Inputer) Start() error { defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Dec() var inputRoomEvent api.InputRoomEvent if err := json.Unmarshal(msg.Data, &inputRoomEvent); err != nil { - _ = msg.Ack() + _ = msg.Term() return } inbox, _ := r.workers.LoadOrStore(roomID, &phony.Inbox{}) @@ -68,6 +68,7 @@ func (r *Inputer) Start() error { _ = msg.InProgress() if _, err := r.processRoomEvent(context.TODO(), &inputRoomEvent); err != nil { sentry.CaptureException(err) + _ = msg.Respond([]byte(err.Error())) } else { hooks.Run(hooks.KindNewEventPersisted, inputRoomEvent.Event) } @@ -82,28 +83,56 @@ func (r *Inputer) Start() error { // InputRoomEvents implements api.RoomserverInternalAPI func (r *Inputer) InputRoomEvents( - _ context.Context, + ctx context.Context, request *api.InputRoomEventsRequest, response *api.InputRoomEventsResponse, ) { - var err error - for _, e := range request.InputRoomEvents { - msg := &nats.Msg{ - Subject: r.InputRoomEventTopic, - Header: nats.Header{}, + if request.Asynchronous { + var err error + for _, e := range request.InputRoomEvents { + msg := &nats.Msg{ + Subject: r.InputRoomEventTopic, + Header: nats.Header{}, + } + roomID := e.Event.RoomID() + msg.Header.Set("room_id", roomID) + msg.Data, err = json.Marshal(e) + if err != nil { + response.ErrMsg = err.Error() + return + } + if _, err = r.JetStream.PublishMsg(msg); err != nil { + return + } + roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Inc() } - roomID := e.Event.RoomID() - msg.Header.Set("room_id", roomID) - msg.Data, err = json.Marshal(e) - if err != nil { - response.ErrMsg = err.Error() - return + } else { + responses := make(chan error, len(request.InputRoomEvents)) + defer close(responses) + for _, e := range request.InputRoomEvents { + inputRoomEvent := e + inbox, _ := r.workers.LoadOrStore(inputRoomEvent.Event.RoomID(), &phony.Inbox{}) + inbox.(*phony.Inbox).Act(nil, func() { + _, err := r.processRoomEvent(context.TODO(), &inputRoomEvent) + if err != nil { + sentry.CaptureException(err) + } else { + hooks.Run(hooks.KindNewEventPersisted, inputRoomEvent.Event) + } + responses <- err + }) } - if _, err = r.JetStream.PublishMsg(msg); err != nil { - response.ErrMsg = err.Error() - return + for i := 0; i < len(request.InputRoomEvents); i++ { + select { + case <-ctx.Done(): + return + case err := <-responses: + if err != nil { + response.ErrMsg = err.Error() + return + } + } } - roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Inc() } } diff --git a/setup/mscs/msc2836/msc2836.go b/setup/mscs/msc2836/msc2836.go index a538299dc..5e7896481 100644 --- a/setup/mscs/msc2836/msc2836.go +++ b/setup/mscs/msc2836/msc2836.go @@ -649,7 +649,7 @@ func (rc *reqCtx) injectResponseToRoomserver(res *gomatrixserverlib.MSC2836Event }) } // we've got the data by this point so use a background context - err = roomserver.SendInputRoomEvents(context.Background(), rc.rsAPI, ires) + err = roomserver.SendInputRoomEvents(context.Background(), rc.rsAPI, ires, false) if err != nil { util.GetLogger(rc.ctx).WithError(err).Error("failed to inject MSC2836EventRelationshipsResponse into the roomserver") } diff --git a/syncapi/consumers/keychange.go b/syncapi/consumers/keychange.go index 1938ff9b0..7b9af0a1f 100644 --- a/syncapi/consumers/keychange.go +++ b/syncapi/consumers/keychange.go @@ -120,6 +120,9 @@ func (s *OutputKeyChangeEventConsumer) onMessage(msg *sarama.ConsumerMessage) er } func (s *OutputKeyChangeEventConsumer) onDeviceKeyMessage(m api.DeviceMessage, offset int64, partition int32) error { + if m.DeviceKeys == nil { + return nil + } output := m.DeviceKeys // work out who we need to notify about the new key var queryRes roomserverAPI.QuerySharedUsersResponse