mirror of
https://github.com/matrix-org/dendrite
synced 2024-12-14 18:13:51 +01:00
Remove unused UserUpdates producer (#1109)
This commit is contained in:
parent
85ac8a3f5b
commit
98cb0705ea
8 changed files with 5 additions and 93 deletions
|
@ -51,11 +51,6 @@ func AddPublicRoutes(
|
||||||
roomserverProducer := producers.NewRoomserverProducer(rsAPI)
|
roomserverProducer := producers.NewRoomserverProducer(rsAPI)
|
||||||
eduProducer := producers.NewEDUServerProducer(eduInputAPI)
|
eduProducer := producers.NewEDUServerProducer(eduInputAPI)
|
||||||
|
|
||||||
userUpdateProducer := &producers.UserUpdateProducer{
|
|
||||||
Producer: producer,
|
|
||||||
Topic: string(cfg.Kafka.Topics.UserUpdates),
|
|
||||||
}
|
|
||||||
|
|
||||||
syncProducer := &producers.SyncAPIProducer{
|
syncProducer := &producers.SyncAPIProducer{
|
||||||
Producer: producer,
|
Producer: producer,
|
||||||
Topic: string(cfg.Kafka.Topics.OutputClientData),
|
Topic: string(cfg.Kafka.Topics.OutputClientData),
|
||||||
|
@ -70,7 +65,7 @@ func AddPublicRoutes(
|
||||||
|
|
||||||
routing.Setup(
|
routing.Setup(
|
||||||
router, cfg, roomserverProducer, rsAPI, asAPI,
|
router, cfg, roomserverProducer, rsAPI, asAPI,
|
||||||
accountsDB, deviceDB, federation, *keyRing, userUpdateProducer,
|
accountsDB, deviceDB, federation, *keyRing,
|
||||||
syncProducer, eduProducer, transactionsCache, fsAPI,
|
syncProducer, eduProducer, transactionsCache, fsAPI,
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,62 +0,0 @@
|
||||||
// Copyright 2017 Vector Creations Ltd
|
|
||||||
//
|
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
// you may not use this file except in compliance with the License.
|
|
||||||
// You may obtain a copy of the License at
|
|
||||||
//
|
|
||||||
// http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
//
|
|
||||||
// Unless required by applicable law or agreed to in writing, software
|
|
||||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
// See the License for the specific language governing permissions and
|
|
||||||
// limitations under the License.
|
|
||||||
|
|
||||||
package producers
|
|
||||||
|
|
||||||
import (
|
|
||||||
"encoding/json"
|
|
||||||
|
|
||||||
"github.com/Shopify/sarama"
|
|
||||||
)
|
|
||||||
|
|
||||||
// UserUpdateProducer produces events related to user updates.
|
|
||||||
type UserUpdateProducer struct {
|
|
||||||
Topic string
|
|
||||||
Producer sarama.SyncProducer
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: Move this struct to `internal` so the components that consume the topic
|
|
||||||
// can use it when parsing incoming messages
|
|
||||||
type profileUpdate struct {
|
|
||||||
Updated string `json:"updated"` // Which attribute is updated (can be either `avatar_url` or `displayname`)
|
|
||||||
OldValue string `json:"old_value"` // The attribute's value before the update
|
|
||||||
NewValue string `json:"new_value"` // The attribute's value after the update
|
|
||||||
}
|
|
||||||
|
|
||||||
// SendUpdate sends an update using kafka to notify the roomserver of the
|
|
||||||
// profile update. Returns an error if the update failed to send.
|
|
||||||
func (p *UserUpdateProducer) SendUpdate(
|
|
||||||
userID string, updatedAttribute string, oldValue string, newValue string,
|
|
||||||
) error {
|
|
||||||
var update profileUpdate
|
|
||||||
var m sarama.ProducerMessage
|
|
||||||
|
|
||||||
m.Topic = string(p.Topic)
|
|
||||||
m.Key = sarama.StringEncoder(userID)
|
|
||||||
|
|
||||||
update = profileUpdate{
|
|
||||||
Updated: updatedAttribute,
|
|
||||||
OldValue: oldValue,
|
|
||||||
NewValue: newValue,
|
|
||||||
}
|
|
||||||
|
|
||||||
value, err := json.Marshal(update)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
m.Value = sarama.ByteEncoder(value)
|
|
||||||
|
|
||||||
_, _, err = p.Producer.SendMessage(&m)
|
|
||||||
return err
|
|
||||||
}
|
|
|
@ -94,7 +94,7 @@ func GetAvatarURL(
|
||||||
// nolint:gocyclo
|
// nolint:gocyclo
|
||||||
func SetAvatarURL(
|
func SetAvatarURL(
|
||||||
req *http.Request, accountDB accounts.Database, device *authtypes.Device,
|
req *http.Request, accountDB accounts.Database, device *authtypes.Device,
|
||||||
userID string, producer *producers.UserUpdateProducer, cfg *config.Dendrite,
|
userID string, cfg *config.Dendrite,
|
||||||
rsProducer *producers.RoomserverProducer, rsAPI api.RoomserverInternalAPI,
|
rsProducer *producers.RoomserverProducer, rsAPI api.RoomserverInternalAPI,
|
||||||
) util.JSONResponse {
|
) util.JSONResponse {
|
||||||
if userID != device.UserID {
|
if userID != device.UserID {
|
||||||
|
@ -104,8 +104,6 @@ func SetAvatarURL(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
changedKey := "avatar_url"
|
|
||||||
|
|
||||||
var r internal.AvatarURL
|
var r internal.AvatarURL
|
||||||
if resErr := httputil.UnmarshalJSONRequest(req, &r); resErr != nil {
|
if resErr := httputil.UnmarshalJSONRequest(req, &r); resErr != nil {
|
||||||
return *resErr
|
return *resErr
|
||||||
|
@ -174,11 +172,6 @@ func SetAvatarURL(
|
||||||
return jsonerror.InternalServerError()
|
return jsonerror.InternalServerError()
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := producer.SendUpdate(userID, changedKey, oldProfile.AvatarURL, r.AvatarURL); err != nil {
|
|
||||||
util.GetLogger(req.Context()).WithError(err).Error("producer.SendUpdate failed")
|
|
||||||
return jsonerror.InternalServerError()
|
|
||||||
}
|
|
||||||
|
|
||||||
return util.JSONResponse{
|
return util.JSONResponse{
|
||||||
Code: http.StatusOK,
|
Code: http.StatusOK,
|
||||||
JSON: struct{}{},
|
JSON: struct{}{},
|
||||||
|
@ -216,7 +209,7 @@ func GetDisplayName(
|
||||||
// nolint:gocyclo
|
// nolint:gocyclo
|
||||||
func SetDisplayName(
|
func SetDisplayName(
|
||||||
req *http.Request, accountDB accounts.Database, device *authtypes.Device,
|
req *http.Request, accountDB accounts.Database, device *authtypes.Device,
|
||||||
userID string, producer *producers.UserUpdateProducer, cfg *config.Dendrite,
|
userID string, cfg *config.Dendrite,
|
||||||
rsProducer *producers.RoomserverProducer, rsAPI api.RoomserverInternalAPI,
|
rsProducer *producers.RoomserverProducer, rsAPI api.RoomserverInternalAPI,
|
||||||
) util.JSONResponse {
|
) util.JSONResponse {
|
||||||
if userID != device.UserID {
|
if userID != device.UserID {
|
||||||
|
@ -226,8 +219,6 @@ func SetDisplayName(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
changedKey := "displayname"
|
|
||||||
|
|
||||||
var r internal.DisplayName
|
var r internal.DisplayName
|
||||||
if resErr := httputil.UnmarshalJSONRequest(req, &r); resErr != nil {
|
if resErr := httputil.UnmarshalJSONRequest(req, &r); resErr != nil {
|
||||||
return *resErr
|
return *resErr
|
||||||
|
@ -296,11 +287,6 @@ func SetDisplayName(
|
||||||
return jsonerror.InternalServerError()
|
return jsonerror.InternalServerError()
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := producer.SendUpdate(userID, changedKey, oldProfile.DisplayName, r.DisplayName); err != nil {
|
|
||||||
util.GetLogger(req.Context()).WithError(err).Error("producer.SendUpdate failed")
|
|
||||||
return jsonerror.InternalServerError()
|
|
||||||
}
|
|
||||||
|
|
||||||
return util.JSONResponse{
|
return util.JSONResponse{
|
||||||
Code: http.StatusOK,
|
Code: http.StatusOK,
|
||||||
JSON: struct{}{},
|
JSON: struct{}{},
|
||||||
|
|
|
@ -55,7 +55,6 @@ func Setup(
|
||||||
deviceDB devices.Database,
|
deviceDB devices.Database,
|
||||||
federation *gomatrixserverlib.FederationClient,
|
federation *gomatrixserverlib.FederationClient,
|
||||||
keyRing gomatrixserverlib.KeyRing,
|
keyRing gomatrixserverlib.KeyRing,
|
||||||
userUpdateProducer *producers.UserUpdateProducer,
|
|
||||||
syncProducer *producers.SyncAPIProducer,
|
syncProducer *producers.SyncAPIProducer,
|
||||||
eduProducer *producers.EDUServerProducer,
|
eduProducer *producers.EDUServerProducer,
|
||||||
transactionsCache *transactions.Cache,
|
transactionsCache *transactions.Cache,
|
||||||
|
@ -387,7 +386,7 @@ func Setup(
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return util.ErrorResponse(err)
|
return util.ErrorResponse(err)
|
||||||
}
|
}
|
||||||
return SetAvatarURL(req, accountDB, device, vars["userID"], userUpdateProducer, cfg, producer, rsAPI)
|
return SetAvatarURL(req, accountDB, device, vars["userID"], cfg, producer, rsAPI)
|
||||||
}),
|
}),
|
||||||
).Methods(http.MethodPut, http.MethodOptions)
|
).Methods(http.MethodPut, http.MethodOptions)
|
||||||
// Browsers use the OPTIONS HTTP method to check if the CORS policy allows
|
// Browsers use the OPTIONS HTTP method to check if the CORS policy allows
|
||||||
|
@ -409,7 +408,7 @@ func Setup(
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return util.ErrorResponse(err)
|
return util.ErrorResponse(err)
|
||||||
}
|
}
|
||||||
return SetDisplayName(req, accountDB, device, vars["userID"], userUpdateProducer, cfg, producer, rsAPI)
|
return SetDisplayName(req, accountDB, device, vars["userID"], cfg, producer, rsAPI)
|
||||||
}),
|
}),
|
||||||
).Methods(http.MethodPut, http.MethodOptions)
|
).Methods(http.MethodPut, http.MethodOptions)
|
||||||
// Browsers use the OPTIONS HTTP method to check if the CORS policy allows
|
// Browsers use the OPTIONS HTTP method to check if the CORS policy allows
|
||||||
|
|
|
@ -109,7 +109,6 @@ func main() {
|
||||||
cfg.Kafka.Topics.OutputRoomEvent = "roomserverOutput"
|
cfg.Kafka.Topics.OutputRoomEvent = "roomserverOutput"
|
||||||
cfg.Kafka.Topics.OutputClientData = "clientapiOutput"
|
cfg.Kafka.Topics.OutputClientData = "clientapiOutput"
|
||||||
cfg.Kafka.Topics.OutputTypingEvent = "typingServerOutput"
|
cfg.Kafka.Topics.OutputTypingEvent = "typingServerOutput"
|
||||||
cfg.Kafka.Topics.UserUpdates = "userUpdates"
|
|
||||||
cfg.Database.Account = config.DataSource(fmt.Sprintf("file:%s-account.db", *instanceName))
|
cfg.Database.Account = config.DataSource(fmt.Sprintf("file:%s-account.db", *instanceName))
|
||||||
cfg.Database.Device = config.DataSource(fmt.Sprintf("file:%s-device.db", *instanceName))
|
cfg.Database.Device = config.DataSource(fmt.Sprintf("file:%s-device.db", *instanceName))
|
||||||
cfg.Database.MediaAPI = config.DataSource(fmt.Sprintf("file:%s-mediaapi.db", *instanceName))
|
cfg.Database.MediaAPI = config.DataSource(fmt.Sprintf("file:%s-mediaapi.db", *instanceName))
|
||||||
|
|
|
@ -169,7 +169,6 @@ func main() {
|
||||||
cfg.Database.RoomServer = "file:/idb/dendritejs_roomserver.db"
|
cfg.Database.RoomServer = "file:/idb/dendritejs_roomserver.db"
|
||||||
cfg.Database.ServerKey = "file:/idb/dendritejs_serverkey.db"
|
cfg.Database.ServerKey = "file:/idb/dendritejs_serverkey.db"
|
||||||
cfg.Database.SyncAPI = "file:/idb/dendritejs_syncapi.db"
|
cfg.Database.SyncAPI = "file:/idb/dendritejs_syncapi.db"
|
||||||
cfg.Kafka.Topics.UserUpdates = "user_updates"
|
|
||||||
cfg.Kafka.Topics.OutputTypingEvent = "output_typing_event"
|
cfg.Kafka.Topics.OutputTypingEvent = "output_typing_event"
|
||||||
cfg.Kafka.Topics.OutputSendToDeviceEvent = "output_send_to_device_event"
|
cfg.Kafka.Topics.OutputSendToDeviceEvent = "output_send_to_device_event"
|
||||||
cfg.Kafka.Topics.OutputClientData = "output_client_data"
|
cfg.Kafka.Topics.OutputClientData = "output_client_data"
|
||||||
|
|
|
@ -154,8 +154,6 @@ type Dendrite struct {
|
||||||
OutputTypingEvent Topic `yaml:"output_typing_event"`
|
OutputTypingEvent Topic `yaml:"output_typing_event"`
|
||||||
// Topic for eduserver/api.OutputSendToDeviceEvent events.
|
// Topic for eduserver/api.OutputSendToDeviceEvent events.
|
||||||
OutputSendToDeviceEvent Topic `yaml:"output_send_to_device_event"`
|
OutputSendToDeviceEvent Topic `yaml:"output_send_to_device_event"`
|
||||||
// Topic for user updates (profile, presence)
|
|
||||||
UserUpdates Topic `yaml:"user_updates"`
|
|
||||||
}
|
}
|
||||||
} `yaml:"kafka"`
|
} `yaml:"kafka"`
|
||||||
|
|
||||||
|
@ -591,7 +589,6 @@ func (config *Dendrite) checkKafka(configErrs *configErrors, monolithic bool) {
|
||||||
checkNotEmpty(configErrs, "kafka.topics.output_room_event", string(config.Kafka.Topics.OutputRoomEvent))
|
checkNotEmpty(configErrs, "kafka.topics.output_room_event", string(config.Kafka.Topics.OutputRoomEvent))
|
||||||
checkNotEmpty(configErrs, "kafka.topics.output_client_data", string(config.Kafka.Topics.OutputClientData))
|
checkNotEmpty(configErrs, "kafka.topics.output_client_data", string(config.Kafka.Topics.OutputClientData))
|
||||||
checkNotEmpty(configErrs, "kafka.topics.output_typing_event", string(config.Kafka.Topics.OutputTypingEvent))
|
checkNotEmpty(configErrs, "kafka.topics.output_typing_event", string(config.Kafka.Topics.OutputTypingEvent))
|
||||||
checkNotEmpty(configErrs, "kafka.topics.user_updates", string(config.Kafka.Topics.UserUpdates))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// checkDatabase verifies the parameters database.* are valid.
|
// checkDatabase verifies the parameters database.* are valid.
|
||||||
|
|
|
@ -84,7 +84,6 @@ func MakeConfig(configDir, kafkaURI, database, host string, startPort int) (*con
|
||||||
cfg.Kafka.Topics.OutputRoomEvent = "test.room.output"
|
cfg.Kafka.Topics.OutputRoomEvent = "test.room.output"
|
||||||
cfg.Kafka.Topics.OutputClientData = "test.clientapi.output"
|
cfg.Kafka.Topics.OutputClientData = "test.clientapi.output"
|
||||||
cfg.Kafka.Topics.OutputTypingEvent = "test.typing.output"
|
cfg.Kafka.Topics.OutputTypingEvent = "test.typing.output"
|
||||||
cfg.Kafka.Topics.UserUpdates = "test.user.output"
|
|
||||||
|
|
||||||
// TODO: Use different databases for the different schemas.
|
// TODO: Use different databases for the different schemas.
|
||||||
// Using the same database for every schema currently works because
|
// Using the same database for every schema currently works because
|
||||||
|
|
Loading…
Reference in a new issue