From 4675e1ddb6a48fe1425032dc4f3cef56cbde7243 Mon Sep 17 00:00:00 2001 From: Kegsay Date: Fri, 12 Jun 2020 12:10:08 +0100 Subject: [PATCH] Add trace logging to RoomserverInternalAPI (#1120) This is a wrapper around whatever impl we have which then logs the function name/request/response/error. Also tweak when we log on kafka streams: only log on the producer side not the consumer side: we've never had issues with comms and having 1 message rather than N would be nice. --- clientapi/producers/syncapi.go | 9 +- cmd/dendrite-monolith-server/main.go | 13 +- eduserver/input/input.go | 16 +- federationsender/consumers/roomserver.go | 5 - roomserver/api/api_trace.go | 218 +++++++++++++++++++++++ roomserver/internal/input.go | 16 ++ syncapi/consumers/roomserver.go | 5 - 7 files changed, 261 insertions(+), 21 deletions(-) create mode 100644 roomserver/api/api_trace.go diff --git a/clientapi/producers/syncapi.go b/clientapi/producers/syncapi.go index 244a61dc2..375b1eee4 100644 --- a/clientapi/producers/syncapi.go +++ b/clientapi/producers/syncapi.go @@ -17,9 +17,9 @@ package producers import ( "encoding/json" - "github.com/matrix-org/dendrite/internal" - "github.com/Shopify/sarama" + "github.com/matrix-org/dendrite/internal" + log "github.com/sirupsen/logrus" ) // SyncAPIProducer produces events for the sync API server to consume @@ -44,6 +44,11 @@ func (p *SyncAPIProducer) SendData(userID string, roomID string, dataType string m.Topic = string(p.Topic) m.Key = sarama.StringEncoder(userID) m.Value = sarama.ByteEncoder(value) + log.WithFields(log.Fields{ + "user_id": userID, + "room_id": roomID, + "data_type": dataType, + }).Infof("Producing to topic '%s'", p.Topic) _, _, err = p.Producer.SendMessage(&m) return err diff --git a/cmd/dendrite-monolith-server/main.go b/cmd/dendrite-monolith-server/main.go index 195a1ac50..2d538027e 100644 --- a/cmd/dendrite-monolith-server/main.go +++ b/cmd/dendrite-monolith-server/main.go @@ -17,6 +17,7 @@ package main import ( "flag" "net/http" + "os" "github.com/matrix-org/dendrite/appservice" "github.com/matrix-org/dendrite/eduserver" @@ -28,6 +29,7 @@ import ( "github.com/matrix-org/dendrite/internal/setup" "github.com/matrix-org/dendrite/publicroomsapi/storage" "github.com/matrix-org/dendrite/roomserver" + "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/serverkeyapi" "github.com/sirupsen/logrus" @@ -39,6 +41,7 @@ var ( certFile = flag.String("tls-cert", "", "The PEM formatted X509 certificate to use for TLS") keyFile = flag.String("tls-key", "", "The PEM private key to use for TLS") enableHTTPAPIs = flag.Bool("api", false, "Use HTTP APIs instead of short-circuiting (warning: exposes API endpoints!)") + traceInternal = os.Getenv("DENDRITE_TRACE_INTERNAL") == "1" ) func main() { @@ -72,14 +75,18 @@ func main() { } keyRing := serverKeyAPI.KeyRing() - rsComponent := roomserver.NewInternalAPI( + rsAPI := roomserver.NewInternalAPI( base, keyRing, federation, ) - rsAPI := rsComponent if base.UseHTTPAPIs { roomserver.AddInternalRoutes(base.InternalAPIMux, rsAPI) rsAPI = base.RoomserverHTTPClient() } + if traceInternal { + rsAPI = &api.RoomserverInternalAPITrace{ + Impl: rsAPI, + } + } eduInputAPI := eduserver.NewInternalAPI( base, cache.New(), deviceDB, @@ -102,7 +109,7 @@ func main() { federationsender.AddInternalRoutes(base.InternalAPIMux, fsAPI) fsAPI = base.FederationSenderHTTPClient() } - rsComponent.SetFederationSenderAPI(fsAPI) + rsAPI.SetFederationSenderAPI(fsAPI) publicRoomsDB, err := storage.NewPublicRoomsServerDatabase(string(base.Cfg.Database.PublicRoomsAPI), base.Cfg.DbProperties(), cfg.Matrix.ServerName) if err != nil { diff --git a/eduserver/input/input.go b/eduserver/input/input.go index 0bbf5b844..6eafce42f 100644 --- a/eduserver/input/input.go +++ b/eduserver/input/input.go @@ -97,6 +97,11 @@ func (t *EDUServerInputAPI) sendTypingEvent(ite *api.InputTypingEvent) error { if err != nil { return err } + logrus.WithFields(logrus.Fields{ + "room_id": ite.RoomID, + "user_id": ite.UserID, + "typing": ite.Typing, + }).Infof("Producing to topic '%s'", t.OutputTypingEventTopic) m := &sarama.ProducerMessage{ Topic: string(t.OutputTypingEventTopic), @@ -132,6 +137,11 @@ func (t *EDUServerInputAPI) sendToDeviceEvent(ise *api.InputSendToDeviceEvent) e devices = append(devices, ise.DeviceID) } + logrus.WithFields(logrus.Fields{ + "user_id": ise.UserID, + "num_devices": len(devices), + "type": ise.Type, + }).Infof("Producing to topic '%s'", t.OutputSendToDeviceEventTopic) for _, device := range devices { ote := &api.OutputSendToDeviceEvent{ UserID: ise.UserID, @@ -139,12 +149,6 @@ func (t *EDUServerInputAPI) sendToDeviceEvent(ise *api.InputSendToDeviceEvent) e SendToDeviceEvent: ise.SendToDeviceEvent, } - logrus.WithFields(logrus.Fields{ - "user_id": ise.UserID, - "device_id": ise.DeviceID, - "event_type": ise.Type, - }).Info("handling send-to-device message") - eventJSON, err := json.Marshal(ote) if err != nil { logrus.WithError(err).Error("sendToDevice failed json.Marshal") diff --git a/federationsender/consumers/roomserver.go b/federationsender/consumers/roomserver.go index a15937f9e..299c7b37a 100644 --- a/federationsender/consumers/roomserver.go +++ b/federationsender/consumers/roomserver.go @@ -86,11 +86,6 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { switch output.Type { case api.OutputTypeNewRoomEvent: ev := &output.NewRoomEvent.Event - log.WithFields(log.Fields{ - "event_id": ev.EventID(), - "room_id": ev.RoomID(), - "send_as_server": output.NewRoomEvent.SendAsServer, - }).Info("received room event from roomserver") if err := s.processMessage(*output.NewRoomEvent); err != nil { // panic rather than continue with an inconsistent database diff --git a/roomserver/api/api_trace.go b/roomserver/api/api_trace.go new file mode 100644 index 000000000..a478eeb9a --- /dev/null +++ b/roomserver/api/api_trace.go @@ -0,0 +1,218 @@ +package api + +import ( + "context" + "encoding/json" + "fmt" + + fsAPI "github.com/matrix-org/dendrite/federationsender/api" + "github.com/matrix-org/util" +) + +// RoomserverInternalAPITrace wraps a RoomserverInternalAPI and logs the +// complete request/response/error +type RoomserverInternalAPITrace struct { + Impl RoomserverInternalAPI +} + +func (t *RoomserverInternalAPITrace) SetFederationSenderAPI(fsAPI fsAPI.FederationSenderInternalAPI) { + t.Impl.SetFederationSenderAPI(fsAPI) +} + +func (t *RoomserverInternalAPITrace) InputRoomEvents( + ctx context.Context, + req *InputRoomEventsRequest, + res *InputRoomEventsResponse, +) error { + err := t.Impl.InputRoomEvents(ctx, req, res) + util.GetLogger(ctx).WithError(err).Infof("InputRoomEvents req=%+v res=%+v", js(req), js(res)) + return err +} + +func (t *RoomserverInternalAPITrace) PerformJoin( + ctx context.Context, + req *PerformJoinRequest, + res *PerformJoinResponse, +) error { + err := t.Impl.PerformJoin(ctx, req, res) + util.GetLogger(ctx).WithError(err).Infof("PerformJoin req=%+v res=%+v", js(req), js(res)) + return err +} + +func (t *RoomserverInternalAPITrace) PerformLeave( + ctx context.Context, + req *PerformLeaveRequest, + res *PerformLeaveResponse, +) error { + err := t.Impl.PerformLeave(ctx, req, res) + util.GetLogger(ctx).WithError(err).Infof("PerformLeave req=%+v res=%+v", js(req), js(res)) + return err +} + +func (t *RoomserverInternalAPITrace) QueryLatestEventsAndState( + ctx context.Context, + req *QueryLatestEventsAndStateRequest, + res *QueryLatestEventsAndStateResponse, +) error { + err := t.Impl.QueryLatestEventsAndState(ctx, req, res) + util.GetLogger(ctx).WithError(err).Infof("QueryLatestEventsAndState req=%+v res=%+v", js(req), js(res)) + return err +} + +func (t *RoomserverInternalAPITrace) QueryStateAfterEvents( + ctx context.Context, + req *QueryStateAfterEventsRequest, + res *QueryStateAfterEventsResponse, +) error { + err := t.Impl.QueryStateAfterEvents(ctx, req, res) + util.GetLogger(ctx).WithError(err).Infof("QueryStateAfterEvents req=%+v res=%+v", js(req), js(res)) + return err +} + +func (t *RoomserverInternalAPITrace) QueryEventsByID( + ctx context.Context, + req *QueryEventsByIDRequest, + res *QueryEventsByIDResponse, +) error { + err := t.Impl.QueryEventsByID(ctx, req, res) + util.GetLogger(ctx).WithError(err).Infof("QueryEventsByID req=%+v res=%+v", js(req), js(res)) + return err +} + +func (t *RoomserverInternalAPITrace) QueryMembershipForUser( + ctx context.Context, + req *QueryMembershipForUserRequest, + res *QueryMembershipForUserResponse, +) error { + err := t.Impl.QueryMembershipForUser(ctx, req, res) + util.GetLogger(ctx).WithError(err).Infof("QueryMembershipForUser req=%+v res=%+v", js(req), js(res)) + return err +} + +func (t *RoomserverInternalAPITrace) QueryMembershipsForRoom( + ctx context.Context, + req *QueryMembershipsForRoomRequest, + res *QueryMembershipsForRoomResponse, +) error { + err := t.Impl.QueryMembershipsForRoom(ctx, req, res) + util.GetLogger(ctx).WithError(err).Infof("QueryMembershipsForRoom req=%+v res=%+v", js(req), js(res)) + return err +} + +func (t *RoomserverInternalAPITrace) QueryServerAllowedToSeeEvent( + ctx context.Context, + req *QueryServerAllowedToSeeEventRequest, + res *QueryServerAllowedToSeeEventResponse, +) error { + err := t.Impl.QueryServerAllowedToSeeEvent(ctx, req, res) + util.GetLogger(ctx).WithError(err).Infof("QueryServerAllowedToSeeEvent req=%+v res=%+v", js(req), js(res)) + return err +} + +func (t *RoomserverInternalAPITrace) QueryMissingEvents( + ctx context.Context, + req *QueryMissingEventsRequest, + res *QueryMissingEventsResponse, +) error { + err := t.Impl.QueryMissingEvents(ctx, req, res) + util.GetLogger(ctx).WithError(err).Infof("QueryMissingEvents req=%+v res=%+v", js(req), js(res)) + return err +} + +func (t *RoomserverInternalAPITrace) QueryStateAndAuthChain( + ctx context.Context, + req *QueryStateAndAuthChainRequest, + res *QueryStateAndAuthChainResponse, +) error { + err := t.Impl.QueryStateAndAuthChain(ctx, req, res) + util.GetLogger(ctx).WithError(err).Infof("QueryStateAndAuthChain req=%+v res=%+v", js(req), js(res)) + return err +} + +func (t *RoomserverInternalAPITrace) PerformBackfill( + ctx context.Context, + req *PerformBackfillRequest, + res *PerformBackfillResponse, +) error { + err := t.Impl.PerformBackfill(ctx, req, res) + util.GetLogger(ctx).WithError(err).Infof("PerformBackfill req=%+v res=%+v", js(req), js(res)) + return err +} + +func (t *RoomserverInternalAPITrace) QueryRoomVersionCapabilities( + ctx context.Context, + req *QueryRoomVersionCapabilitiesRequest, + res *QueryRoomVersionCapabilitiesResponse, +) error { + err := t.Impl.QueryRoomVersionCapabilities(ctx, req, res) + util.GetLogger(ctx).WithError(err).Infof("QueryRoomVersionCapabilities req=%+v res=%+v", js(req), js(res)) + return err +} + +func (t *RoomserverInternalAPITrace) QueryRoomVersionForRoom( + ctx context.Context, + req *QueryRoomVersionForRoomRequest, + res *QueryRoomVersionForRoomResponse, +) error { + err := t.Impl.QueryRoomVersionForRoom(ctx, req, res) + util.GetLogger(ctx).WithError(err).Infof("QueryRoomVersionForRoom req=%+v res=%+v", js(req), js(res)) + return err +} + +func (t *RoomserverInternalAPITrace) SetRoomAlias( + ctx context.Context, + req *SetRoomAliasRequest, + res *SetRoomAliasResponse, +) error { + err := t.Impl.SetRoomAlias(ctx, req, res) + util.GetLogger(ctx).WithError(err).Infof("SetRoomAlias req=%+v res=%+v", js(req), js(res)) + return err +} + +func (t *RoomserverInternalAPITrace) GetRoomIDForAlias( + ctx context.Context, + req *GetRoomIDForAliasRequest, + res *GetRoomIDForAliasResponse, +) error { + err := t.Impl.GetRoomIDForAlias(ctx, req, res) + util.GetLogger(ctx).WithError(err).Infof("GetRoomIDForAlias req=%+v res=%+v", js(req), js(res)) + return err +} + +func (t *RoomserverInternalAPITrace) GetAliasesForRoomID( + ctx context.Context, + req *GetAliasesForRoomIDRequest, + res *GetAliasesForRoomIDResponse, +) error { + err := t.Impl.GetAliasesForRoomID(ctx, req, res) + util.GetLogger(ctx).WithError(err).Infof("GetAliasesForRoomID req=%+v res=%+v", js(req), js(res)) + return err +} + +func (t *RoomserverInternalAPITrace) GetCreatorIDForAlias( + ctx context.Context, + req *GetCreatorIDForAliasRequest, + res *GetCreatorIDForAliasResponse, +) error { + err := t.Impl.GetCreatorIDForAlias(ctx, req, res) + util.GetLogger(ctx).WithError(err).Infof("GetCreatorIDForAlias req=%+v res=%+v", js(req), js(res)) + return err +} + +func (t *RoomserverInternalAPITrace) RemoveRoomAlias( + ctx context.Context, + req *RemoveRoomAliasRequest, + res *RemoveRoomAliasResponse, +) error { + err := t.Impl.RemoveRoomAlias(ctx, req, res) + util.GetLogger(ctx).WithError(err).Infof("RemoveRoomAlias req=%+v res=%+v", js(req), js(res)) + return err +} + +func js(thing interface{}) string { + b, err := json.Marshal(thing) + if err != nil { + return fmt.Sprintf("Marshal error:%s", err) + } + return string(b) +} diff --git a/roomserver/internal/input.go b/roomserver/internal/input.go index 932b4df46..e863af953 100644 --- a/roomserver/internal/input.go +++ b/roomserver/internal/input.go @@ -21,6 +21,7 @@ import ( "github.com/Shopify/sarama" "github.com/matrix-org/dendrite/roomserver/api" + log "github.com/sirupsen/logrus" fsAPI "github.com/matrix-org/dendrite/federationsender/api" ) @@ -40,6 +41,21 @@ func (r *RoomserverInternalAPI) WriteOutputEvents(roomID string, updates []api.O if err != nil { return err } + logger := log.WithFields(log.Fields{ + "room_id": roomID, + "type": updates[i].Type, + }) + if updates[i].NewRoomEvent != nil { + logger = logger.WithFields(log.Fields{ + "event_type": updates[i].NewRoomEvent.Event.Type(), + "event_id": updates[i].NewRoomEvent.Event.EventID(), + "adds_state": len(updates[i].NewRoomEvent.AddsStateEventIDs), + "removes_state": len(updates[i].NewRoomEvent.RemovesStateEventIDs), + "send_as_server": updates[i].NewRoomEvent.SendAsServer, + "sender": updates[i].NewRoomEvent.Event.Sender(), + }) + } + logger.Infof("Producing to topic '%s'", r.OutputRoomEventTopic) messages[i] = &sarama.ProducerMessage{ Topic: r.OutputRoomEventTopic, Key: sarama.StringEncoder(roomID), diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go index 135976823..98be5bb73 100644 --- a/syncapi/consumers/roomserver.go +++ b/syncapi/consumers/roomserver.go @@ -98,11 +98,6 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent( ctx context.Context, msg api.OutputNewRoomEvent, ) error { ev := msg.Event - log.WithFields(log.Fields{ - "event_id": ev.EventID(), - "room_id": ev.RoomID(), - "room_version": ev.RoomVersion, - }).Info("received event from roomserver") addsStateEvents := msg.AddsState()