package consumers import ( "context" "encoding/json" "github.com/matrix-org/dendrite/internal/pushgateway" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/process" "github.com/matrix-org/dendrite/syncapi/types" uapi "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/dendrite/userapi/producers" "github.com/matrix-org/dendrite/userapi/storage" "github.com/matrix-org/dendrite/userapi/util" "github.com/matrix-org/gomatrixserverlib" "github.com/nats-io/nats.go" log "github.com/sirupsen/logrus" ) type OutputReadUpdateConsumer struct { ctx context.Context cfg *config.UserAPI jetstream nats.JetStreamContext durable string db storage.Database pgClient pushgateway.Client ServerName gomatrixserverlib.ServerName topic string userAPI uapi.UserInternalAPI syncProducer *producers.SyncAPI } func NewOutputReadUpdateConsumer( process *process.ProcessContext, cfg *config.UserAPI, js nats.JetStreamContext, store storage.Database, pgClient pushgateway.Client, userAPI uapi.UserInternalAPI, syncProducer *producers.SyncAPI, ) *OutputReadUpdateConsumer { return &OutputReadUpdateConsumer{ ctx: process.Context(), cfg: cfg, jetstream: js, db: store, ServerName: cfg.Matrix.ServerName, durable: cfg.Matrix.JetStream.Durable("UserAPISyncAPIReadUpdateConsumer"), topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputReadUpdate), pgClient: pgClient, userAPI: userAPI, syncProducer: syncProducer, } } func (s *OutputReadUpdateConsumer) Start() error { if err := jetstream.JetStreamConsumer( s.ctx, s.jetstream, s.topic, s.durable, s.onMessage, nats.DeliverAll(), nats.ManualAck(), ); err != nil { return err } return nil } func (s *OutputReadUpdateConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool { var read types.ReadUpdate if err := json.Unmarshal(msg.Data, &read); err != nil { log.WithError(err).Error("userapi clientapi consumer: message parse failure") return true } if read.FullyRead == 0 && read.Read == 0 { return true } userID := string(msg.Header.Get(jetstream.UserID)) roomID := string(msg.Header.Get(jetstream.RoomID)) localpart, domain, err := gomatrixserverlib.SplitID('@', userID) if err != nil { log.WithError(err).Error("userapi clientapi consumer: SplitID failure") return true } if domain != s.ServerName { log.Error("userapi clientapi consumer: not a local user") return true } log := log.WithFields(log.Fields{ "room_id": roomID, "user_id": userID, }) log.Tracef("Received read update from sync API: %#v", read) if read.Read > 0 { updated, err := s.db.SetNotificationsRead(ctx, localpart, roomID, int64(read.Read), true) if err != nil { log.WithError(err).Error("userapi EDU consumer") return false } if updated { if err = s.syncProducer.GetAndSendNotificationData(ctx, userID, roomID); err != nil { log.WithError(err).Error("userapi EDU consumer: GetAndSendNotificationData failed") return false } if err = util.NotifyUserCountsAsync(ctx, s.pgClient, localpart, s.db); err != nil { log.WithError(err).Error("userapi EDU consumer: NotifyUserCounts failed") return false } } } if read.FullyRead > 0 { deleted, err := s.db.DeleteNotificationsUpTo(ctx, localpart, roomID, int64(read.FullyRead)) if err != nil { log.WithError(err).Errorf("userapi clientapi consumer: DeleteNotificationsUpTo failed") return false } if deleted { if err := util.NotifyUserCountsAsync(ctx, s.pgClient, localpart, s.db); err != nil { log.WithError(err).Error("userapi clientapi consumer: NotifyUserCounts failed") return false } if err := s.syncProducer.GetAndSendNotificationData(ctx, userID, read.RoomID); err != nil { log.WithError(err).Errorf("userapi clientapi consumer: GetAndSendNotificationData failed") return false } } } return true }