mirror of
https://github.com/matrix-org/dendrite
synced 2024-12-15 01:23:43 +01:00
Add trace logging to RoomserverInternalAPI (#1120)
This is a wrapper around whatever impl we have which then logs the function name/request/response/error. Also tweak when we log on kafka streams: only log on the producer side not the consumer side: we've never had issues with comms and having 1 message rather than N would be nice.
This commit is contained in:
parent
079d8fe8fb
commit
4675e1ddb6
7 changed files with 261 additions and 21 deletions
|
@ -17,9 +17,9 @@ package producers
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/internal"
|
|
||||||
|
|
||||||
"github.com/Shopify/sarama"
|
"github.com/Shopify/sarama"
|
||||||
|
"github.com/matrix-org/dendrite/internal"
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
)
|
)
|
||||||
|
|
||||||
// SyncAPIProducer produces events for the sync API server to consume
|
// SyncAPIProducer produces events for the sync API server to consume
|
||||||
|
@ -44,6 +44,11 @@ func (p *SyncAPIProducer) SendData(userID string, roomID string, dataType string
|
||||||
m.Topic = string(p.Topic)
|
m.Topic = string(p.Topic)
|
||||||
m.Key = sarama.StringEncoder(userID)
|
m.Key = sarama.StringEncoder(userID)
|
||||||
m.Value = sarama.ByteEncoder(value)
|
m.Value = sarama.ByteEncoder(value)
|
||||||
|
log.WithFields(log.Fields{
|
||||||
|
"user_id": userID,
|
||||||
|
"room_id": roomID,
|
||||||
|
"data_type": dataType,
|
||||||
|
}).Infof("Producing to topic '%s'", p.Topic)
|
||||||
|
|
||||||
_, _, err = p.Producer.SendMessage(&m)
|
_, _, err = p.Producer.SendMessage(&m)
|
||||||
return err
|
return err
|
||||||
|
|
|
@ -17,6 +17,7 @@ package main
|
||||||
import (
|
import (
|
||||||
"flag"
|
"flag"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"os"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/appservice"
|
"github.com/matrix-org/dendrite/appservice"
|
||||||
"github.com/matrix-org/dendrite/eduserver"
|
"github.com/matrix-org/dendrite/eduserver"
|
||||||
|
@ -28,6 +29,7 @@ import (
|
||||||
"github.com/matrix-org/dendrite/internal/setup"
|
"github.com/matrix-org/dendrite/internal/setup"
|
||||||
"github.com/matrix-org/dendrite/publicroomsapi/storage"
|
"github.com/matrix-org/dendrite/publicroomsapi/storage"
|
||||||
"github.com/matrix-org/dendrite/roomserver"
|
"github.com/matrix-org/dendrite/roomserver"
|
||||||
|
"github.com/matrix-org/dendrite/roomserver/api"
|
||||||
"github.com/matrix-org/dendrite/serverkeyapi"
|
"github.com/matrix-org/dendrite/serverkeyapi"
|
||||||
|
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
|
@ -39,6 +41,7 @@ var (
|
||||||
certFile = flag.String("tls-cert", "", "The PEM formatted X509 certificate to use for TLS")
|
certFile = flag.String("tls-cert", "", "The PEM formatted X509 certificate to use for TLS")
|
||||||
keyFile = flag.String("tls-key", "", "The PEM private key to use for TLS")
|
keyFile = flag.String("tls-key", "", "The PEM private key to use for TLS")
|
||||||
enableHTTPAPIs = flag.Bool("api", false, "Use HTTP APIs instead of short-circuiting (warning: exposes API endpoints!)")
|
enableHTTPAPIs = flag.Bool("api", false, "Use HTTP APIs instead of short-circuiting (warning: exposes API endpoints!)")
|
||||||
|
traceInternal = os.Getenv("DENDRITE_TRACE_INTERNAL") == "1"
|
||||||
)
|
)
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
|
@ -72,14 +75,18 @@ func main() {
|
||||||
}
|
}
|
||||||
keyRing := serverKeyAPI.KeyRing()
|
keyRing := serverKeyAPI.KeyRing()
|
||||||
|
|
||||||
rsComponent := roomserver.NewInternalAPI(
|
rsAPI := roomserver.NewInternalAPI(
|
||||||
base, keyRing, federation,
|
base, keyRing, federation,
|
||||||
)
|
)
|
||||||
rsAPI := rsComponent
|
|
||||||
if base.UseHTTPAPIs {
|
if base.UseHTTPAPIs {
|
||||||
roomserver.AddInternalRoutes(base.InternalAPIMux, rsAPI)
|
roomserver.AddInternalRoutes(base.InternalAPIMux, rsAPI)
|
||||||
rsAPI = base.RoomserverHTTPClient()
|
rsAPI = base.RoomserverHTTPClient()
|
||||||
}
|
}
|
||||||
|
if traceInternal {
|
||||||
|
rsAPI = &api.RoomserverInternalAPITrace{
|
||||||
|
Impl: rsAPI,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
eduInputAPI := eduserver.NewInternalAPI(
|
eduInputAPI := eduserver.NewInternalAPI(
|
||||||
base, cache.New(), deviceDB,
|
base, cache.New(), deviceDB,
|
||||||
|
@ -102,7 +109,7 @@ func main() {
|
||||||
federationsender.AddInternalRoutes(base.InternalAPIMux, fsAPI)
|
federationsender.AddInternalRoutes(base.InternalAPIMux, fsAPI)
|
||||||
fsAPI = base.FederationSenderHTTPClient()
|
fsAPI = base.FederationSenderHTTPClient()
|
||||||
}
|
}
|
||||||
rsComponent.SetFederationSenderAPI(fsAPI)
|
rsAPI.SetFederationSenderAPI(fsAPI)
|
||||||
|
|
||||||
publicRoomsDB, err := storage.NewPublicRoomsServerDatabase(string(base.Cfg.Database.PublicRoomsAPI), base.Cfg.DbProperties(), cfg.Matrix.ServerName)
|
publicRoomsDB, err := storage.NewPublicRoomsServerDatabase(string(base.Cfg.Database.PublicRoomsAPI), base.Cfg.DbProperties(), cfg.Matrix.ServerName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -97,6 +97,11 @@ func (t *EDUServerInputAPI) sendTypingEvent(ite *api.InputTypingEvent) error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
logrus.WithFields(logrus.Fields{
|
||||||
|
"room_id": ite.RoomID,
|
||||||
|
"user_id": ite.UserID,
|
||||||
|
"typing": ite.Typing,
|
||||||
|
}).Infof("Producing to topic '%s'", t.OutputTypingEventTopic)
|
||||||
|
|
||||||
m := &sarama.ProducerMessage{
|
m := &sarama.ProducerMessage{
|
||||||
Topic: string(t.OutputTypingEventTopic),
|
Topic: string(t.OutputTypingEventTopic),
|
||||||
|
@ -132,6 +137,11 @@ func (t *EDUServerInputAPI) sendToDeviceEvent(ise *api.InputSendToDeviceEvent) e
|
||||||
devices = append(devices, ise.DeviceID)
|
devices = append(devices, ise.DeviceID)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
logrus.WithFields(logrus.Fields{
|
||||||
|
"user_id": ise.UserID,
|
||||||
|
"num_devices": len(devices),
|
||||||
|
"type": ise.Type,
|
||||||
|
}).Infof("Producing to topic '%s'", t.OutputSendToDeviceEventTopic)
|
||||||
for _, device := range devices {
|
for _, device := range devices {
|
||||||
ote := &api.OutputSendToDeviceEvent{
|
ote := &api.OutputSendToDeviceEvent{
|
||||||
UserID: ise.UserID,
|
UserID: ise.UserID,
|
||||||
|
@ -139,12 +149,6 @@ func (t *EDUServerInputAPI) sendToDeviceEvent(ise *api.InputSendToDeviceEvent) e
|
||||||
SendToDeviceEvent: ise.SendToDeviceEvent,
|
SendToDeviceEvent: ise.SendToDeviceEvent,
|
||||||
}
|
}
|
||||||
|
|
||||||
logrus.WithFields(logrus.Fields{
|
|
||||||
"user_id": ise.UserID,
|
|
||||||
"device_id": ise.DeviceID,
|
|
||||||
"event_type": ise.Type,
|
|
||||||
}).Info("handling send-to-device message")
|
|
||||||
|
|
||||||
eventJSON, err := json.Marshal(ote)
|
eventJSON, err := json.Marshal(ote)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.WithError(err).Error("sendToDevice failed json.Marshal")
|
logrus.WithError(err).Error("sendToDevice failed json.Marshal")
|
||||||
|
|
|
@ -86,11 +86,6 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
|
||||||
switch output.Type {
|
switch output.Type {
|
||||||
case api.OutputTypeNewRoomEvent:
|
case api.OutputTypeNewRoomEvent:
|
||||||
ev := &output.NewRoomEvent.Event
|
ev := &output.NewRoomEvent.Event
|
||||||
log.WithFields(log.Fields{
|
|
||||||
"event_id": ev.EventID(),
|
|
||||||
"room_id": ev.RoomID(),
|
|
||||||
"send_as_server": output.NewRoomEvent.SendAsServer,
|
|
||||||
}).Info("received room event from roomserver")
|
|
||||||
|
|
||||||
if err := s.processMessage(*output.NewRoomEvent); err != nil {
|
if err := s.processMessage(*output.NewRoomEvent); err != nil {
|
||||||
// panic rather than continue with an inconsistent database
|
// panic rather than continue with an inconsistent database
|
||||||
|
|
218
roomserver/api/api_trace.go
Normal file
218
roomserver/api/api_trace.go
Normal file
|
@ -0,0 +1,218 @@
|
||||||
|
package api
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
fsAPI "github.com/matrix-org/dendrite/federationsender/api"
|
||||||
|
"github.com/matrix-org/util"
|
||||||
|
)
|
||||||
|
|
||||||
|
// RoomserverInternalAPITrace wraps a RoomserverInternalAPI and logs the
|
||||||
|
// complete request/response/error
|
||||||
|
type RoomserverInternalAPITrace struct {
|
||||||
|
Impl RoomserverInternalAPI
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *RoomserverInternalAPITrace) SetFederationSenderAPI(fsAPI fsAPI.FederationSenderInternalAPI) {
|
||||||
|
t.Impl.SetFederationSenderAPI(fsAPI)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *RoomserverInternalAPITrace) InputRoomEvents(
|
||||||
|
ctx context.Context,
|
||||||
|
req *InputRoomEventsRequest,
|
||||||
|
res *InputRoomEventsResponse,
|
||||||
|
) error {
|
||||||
|
err := t.Impl.InputRoomEvents(ctx, req, res)
|
||||||
|
util.GetLogger(ctx).WithError(err).Infof("InputRoomEvents req=%+v res=%+v", js(req), js(res))
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *RoomserverInternalAPITrace) PerformJoin(
|
||||||
|
ctx context.Context,
|
||||||
|
req *PerformJoinRequest,
|
||||||
|
res *PerformJoinResponse,
|
||||||
|
) error {
|
||||||
|
err := t.Impl.PerformJoin(ctx, req, res)
|
||||||
|
util.GetLogger(ctx).WithError(err).Infof("PerformJoin req=%+v res=%+v", js(req), js(res))
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *RoomserverInternalAPITrace) PerformLeave(
|
||||||
|
ctx context.Context,
|
||||||
|
req *PerformLeaveRequest,
|
||||||
|
res *PerformLeaveResponse,
|
||||||
|
) error {
|
||||||
|
err := t.Impl.PerformLeave(ctx, req, res)
|
||||||
|
util.GetLogger(ctx).WithError(err).Infof("PerformLeave req=%+v res=%+v", js(req), js(res))
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *RoomserverInternalAPITrace) QueryLatestEventsAndState(
|
||||||
|
ctx context.Context,
|
||||||
|
req *QueryLatestEventsAndStateRequest,
|
||||||
|
res *QueryLatestEventsAndStateResponse,
|
||||||
|
) error {
|
||||||
|
err := t.Impl.QueryLatestEventsAndState(ctx, req, res)
|
||||||
|
util.GetLogger(ctx).WithError(err).Infof("QueryLatestEventsAndState req=%+v res=%+v", js(req), js(res))
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *RoomserverInternalAPITrace) QueryStateAfterEvents(
|
||||||
|
ctx context.Context,
|
||||||
|
req *QueryStateAfterEventsRequest,
|
||||||
|
res *QueryStateAfterEventsResponse,
|
||||||
|
) error {
|
||||||
|
err := t.Impl.QueryStateAfterEvents(ctx, req, res)
|
||||||
|
util.GetLogger(ctx).WithError(err).Infof("QueryStateAfterEvents req=%+v res=%+v", js(req), js(res))
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *RoomserverInternalAPITrace) QueryEventsByID(
|
||||||
|
ctx context.Context,
|
||||||
|
req *QueryEventsByIDRequest,
|
||||||
|
res *QueryEventsByIDResponse,
|
||||||
|
) error {
|
||||||
|
err := t.Impl.QueryEventsByID(ctx, req, res)
|
||||||
|
util.GetLogger(ctx).WithError(err).Infof("QueryEventsByID req=%+v res=%+v", js(req), js(res))
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *RoomserverInternalAPITrace) QueryMembershipForUser(
|
||||||
|
ctx context.Context,
|
||||||
|
req *QueryMembershipForUserRequest,
|
||||||
|
res *QueryMembershipForUserResponse,
|
||||||
|
) error {
|
||||||
|
err := t.Impl.QueryMembershipForUser(ctx, req, res)
|
||||||
|
util.GetLogger(ctx).WithError(err).Infof("QueryMembershipForUser req=%+v res=%+v", js(req), js(res))
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *RoomserverInternalAPITrace) QueryMembershipsForRoom(
|
||||||
|
ctx context.Context,
|
||||||
|
req *QueryMembershipsForRoomRequest,
|
||||||
|
res *QueryMembershipsForRoomResponse,
|
||||||
|
) error {
|
||||||
|
err := t.Impl.QueryMembershipsForRoom(ctx, req, res)
|
||||||
|
util.GetLogger(ctx).WithError(err).Infof("QueryMembershipsForRoom req=%+v res=%+v", js(req), js(res))
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *RoomserverInternalAPITrace) QueryServerAllowedToSeeEvent(
|
||||||
|
ctx context.Context,
|
||||||
|
req *QueryServerAllowedToSeeEventRequest,
|
||||||
|
res *QueryServerAllowedToSeeEventResponse,
|
||||||
|
) error {
|
||||||
|
err := t.Impl.QueryServerAllowedToSeeEvent(ctx, req, res)
|
||||||
|
util.GetLogger(ctx).WithError(err).Infof("QueryServerAllowedToSeeEvent req=%+v res=%+v", js(req), js(res))
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *RoomserverInternalAPITrace) QueryMissingEvents(
|
||||||
|
ctx context.Context,
|
||||||
|
req *QueryMissingEventsRequest,
|
||||||
|
res *QueryMissingEventsResponse,
|
||||||
|
) error {
|
||||||
|
err := t.Impl.QueryMissingEvents(ctx, req, res)
|
||||||
|
util.GetLogger(ctx).WithError(err).Infof("QueryMissingEvents req=%+v res=%+v", js(req), js(res))
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *RoomserverInternalAPITrace) QueryStateAndAuthChain(
|
||||||
|
ctx context.Context,
|
||||||
|
req *QueryStateAndAuthChainRequest,
|
||||||
|
res *QueryStateAndAuthChainResponse,
|
||||||
|
) error {
|
||||||
|
err := t.Impl.QueryStateAndAuthChain(ctx, req, res)
|
||||||
|
util.GetLogger(ctx).WithError(err).Infof("QueryStateAndAuthChain req=%+v res=%+v", js(req), js(res))
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *RoomserverInternalAPITrace) PerformBackfill(
|
||||||
|
ctx context.Context,
|
||||||
|
req *PerformBackfillRequest,
|
||||||
|
res *PerformBackfillResponse,
|
||||||
|
) error {
|
||||||
|
err := t.Impl.PerformBackfill(ctx, req, res)
|
||||||
|
util.GetLogger(ctx).WithError(err).Infof("PerformBackfill req=%+v res=%+v", js(req), js(res))
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *RoomserverInternalAPITrace) QueryRoomVersionCapabilities(
|
||||||
|
ctx context.Context,
|
||||||
|
req *QueryRoomVersionCapabilitiesRequest,
|
||||||
|
res *QueryRoomVersionCapabilitiesResponse,
|
||||||
|
) error {
|
||||||
|
err := t.Impl.QueryRoomVersionCapabilities(ctx, req, res)
|
||||||
|
util.GetLogger(ctx).WithError(err).Infof("QueryRoomVersionCapabilities req=%+v res=%+v", js(req), js(res))
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *RoomserverInternalAPITrace) QueryRoomVersionForRoom(
|
||||||
|
ctx context.Context,
|
||||||
|
req *QueryRoomVersionForRoomRequest,
|
||||||
|
res *QueryRoomVersionForRoomResponse,
|
||||||
|
) error {
|
||||||
|
err := t.Impl.QueryRoomVersionForRoom(ctx, req, res)
|
||||||
|
util.GetLogger(ctx).WithError(err).Infof("QueryRoomVersionForRoom req=%+v res=%+v", js(req), js(res))
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *RoomserverInternalAPITrace) SetRoomAlias(
|
||||||
|
ctx context.Context,
|
||||||
|
req *SetRoomAliasRequest,
|
||||||
|
res *SetRoomAliasResponse,
|
||||||
|
) error {
|
||||||
|
err := t.Impl.SetRoomAlias(ctx, req, res)
|
||||||
|
util.GetLogger(ctx).WithError(err).Infof("SetRoomAlias req=%+v res=%+v", js(req), js(res))
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *RoomserverInternalAPITrace) GetRoomIDForAlias(
|
||||||
|
ctx context.Context,
|
||||||
|
req *GetRoomIDForAliasRequest,
|
||||||
|
res *GetRoomIDForAliasResponse,
|
||||||
|
) error {
|
||||||
|
err := t.Impl.GetRoomIDForAlias(ctx, req, res)
|
||||||
|
util.GetLogger(ctx).WithError(err).Infof("GetRoomIDForAlias req=%+v res=%+v", js(req), js(res))
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *RoomserverInternalAPITrace) GetAliasesForRoomID(
|
||||||
|
ctx context.Context,
|
||||||
|
req *GetAliasesForRoomIDRequest,
|
||||||
|
res *GetAliasesForRoomIDResponse,
|
||||||
|
) error {
|
||||||
|
err := t.Impl.GetAliasesForRoomID(ctx, req, res)
|
||||||
|
util.GetLogger(ctx).WithError(err).Infof("GetAliasesForRoomID req=%+v res=%+v", js(req), js(res))
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *RoomserverInternalAPITrace) GetCreatorIDForAlias(
|
||||||
|
ctx context.Context,
|
||||||
|
req *GetCreatorIDForAliasRequest,
|
||||||
|
res *GetCreatorIDForAliasResponse,
|
||||||
|
) error {
|
||||||
|
err := t.Impl.GetCreatorIDForAlias(ctx, req, res)
|
||||||
|
util.GetLogger(ctx).WithError(err).Infof("GetCreatorIDForAlias req=%+v res=%+v", js(req), js(res))
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *RoomserverInternalAPITrace) RemoveRoomAlias(
|
||||||
|
ctx context.Context,
|
||||||
|
req *RemoveRoomAliasRequest,
|
||||||
|
res *RemoveRoomAliasResponse,
|
||||||
|
) error {
|
||||||
|
err := t.Impl.RemoveRoomAlias(ctx, req, res)
|
||||||
|
util.GetLogger(ctx).WithError(err).Infof("RemoveRoomAlias req=%+v res=%+v", js(req), js(res))
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func js(thing interface{}) string {
|
||||||
|
b, err := json.Marshal(thing)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Sprintf("Marshal error:%s", err)
|
||||||
|
}
|
||||||
|
return string(b)
|
||||||
|
}
|
|
@ -21,6 +21,7 @@ import (
|
||||||
|
|
||||||
"github.com/Shopify/sarama"
|
"github.com/Shopify/sarama"
|
||||||
"github.com/matrix-org/dendrite/roomserver/api"
|
"github.com/matrix-org/dendrite/roomserver/api"
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
|
|
||||||
fsAPI "github.com/matrix-org/dendrite/federationsender/api"
|
fsAPI "github.com/matrix-org/dendrite/federationsender/api"
|
||||||
)
|
)
|
||||||
|
@ -40,6 +41,21 @@ func (r *RoomserverInternalAPI) WriteOutputEvents(roomID string, updates []api.O
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
logger := log.WithFields(log.Fields{
|
||||||
|
"room_id": roomID,
|
||||||
|
"type": updates[i].Type,
|
||||||
|
})
|
||||||
|
if updates[i].NewRoomEvent != nil {
|
||||||
|
logger = logger.WithFields(log.Fields{
|
||||||
|
"event_type": updates[i].NewRoomEvent.Event.Type(),
|
||||||
|
"event_id": updates[i].NewRoomEvent.Event.EventID(),
|
||||||
|
"adds_state": len(updates[i].NewRoomEvent.AddsStateEventIDs),
|
||||||
|
"removes_state": len(updates[i].NewRoomEvent.RemovesStateEventIDs),
|
||||||
|
"send_as_server": updates[i].NewRoomEvent.SendAsServer,
|
||||||
|
"sender": updates[i].NewRoomEvent.Event.Sender(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
logger.Infof("Producing to topic '%s'", r.OutputRoomEventTopic)
|
||||||
messages[i] = &sarama.ProducerMessage{
|
messages[i] = &sarama.ProducerMessage{
|
||||||
Topic: r.OutputRoomEventTopic,
|
Topic: r.OutputRoomEventTopic,
|
||||||
Key: sarama.StringEncoder(roomID),
|
Key: sarama.StringEncoder(roomID),
|
||||||
|
|
|
@ -98,11 +98,6 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent(
|
||||||
ctx context.Context, msg api.OutputNewRoomEvent,
|
ctx context.Context, msg api.OutputNewRoomEvent,
|
||||||
) error {
|
) error {
|
||||||
ev := msg.Event
|
ev := msg.Event
|
||||||
log.WithFields(log.Fields{
|
|
||||||
"event_id": ev.EventID(),
|
|
||||||
"room_id": ev.RoomID(),
|
|
||||||
"room_version": ev.RoomVersion,
|
|
||||||
}).Info("received event from roomserver")
|
|
||||||
|
|
||||||
addsStateEvents := msg.AddsState()
|
addsStateEvents := msg.AddsState()
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue