mirror of
https://github.com/matrix-org/dendrite
synced 2024-11-12 04:41:11 +01:00
Guard in all key consumers
This commit is contained in:
parent
ddbef7c3ff
commit
a9e715b5c5
3 changed files with 15 additions and 6 deletions
|
@ -84,6 +84,11 @@ func (t *KeyChangeConsumer) onMessage(msg *sarama.ConsumerMessage) error {
|
||||||
logrus.WithError(err).Errorf("failed to read device message from key change topic")
|
logrus.WithError(err).Errorf("failed to read device message from key change topic")
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
if m.DeviceKeys == nil && m.OutputCrossSigningKeyUpdate == nil {
|
||||||
|
// This probably shouldn't happen but stops us from panicking if we come
|
||||||
|
// across an update that doesn't satisfy either types.
|
||||||
|
return nil
|
||||||
|
}
|
||||||
switch m.Type {
|
switch m.Type {
|
||||||
case api.TypeCrossSigningUpdate:
|
case api.TypeCrossSigningUpdate:
|
||||||
return t.onCrossSigningMessage(m)
|
return t.onCrossSigningMessage(m)
|
||||||
|
@ -95,12 +100,6 @@ func (t *KeyChangeConsumer) onMessage(msg *sarama.ConsumerMessage) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *KeyChangeConsumer) onDeviceKeyMessage(m api.DeviceMessage) error {
|
func (t *KeyChangeConsumer) onDeviceKeyMessage(m api.DeviceMessage) error {
|
||||||
if m.DeviceKeys == nil && m.OutputCrossSigningKeyUpdate == nil {
|
|
||||||
// This probably shouldn't happen but stops us from panicking if we come
|
|
||||||
// across an update that doesn't satisfy either types.
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
logger := logrus.WithField("user_id", m.UserID)
|
logger := logrus.WithField("user_id", m.UserID)
|
||||||
|
|
||||||
// only send key change events which originated from us
|
// only send key change events which originated from us
|
||||||
|
|
|
@ -77,6 +77,11 @@ func (t *OutputCrossSigningKeyUpdateConsumer) onMessage(msg *sarama.ConsumerMess
|
||||||
logrus.WithError(err).Errorf("failed to read device message from key change topic")
|
logrus.WithError(err).Errorf("failed to read device message from key change topic")
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
if m.OutputCrossSigningKeyUpdate == nil {
|
||||||
|
// This probably shouldn't happen but stops us from panicking if we come
|
||||||
|
// across an update that doesn't satisfy either types.
|
||||||
|
return nil
|
||||||
|
}
|
||||||
switch m.Type {
|
switch m.Type {
|
||||||
case api.TypeCrossSigningUpdate:
|
case api.TypeCrossSigningUpdate:
|
||||||
return t.onCrossSigningMessage(m)
|
return t.onCrossSigningMessage(m)
|
||||||
|
|
|
@ -109,6 +109,11 @@ func (s *OutputKeyChangeEventConsumer) onMessage(msg *sarama.ConsumerMessage) er
|
||||||
logrus.WithError(err).Errorf("failed to read device message from key change topic")
|
logrus.WithError(err).Errorf("failed to read device message from key change topic")
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
if m.DeviceKeys == nil && m.OutputCrossSigningKeyUpdate == nil {
|
||||||
|
// This probably shouldn't happen but stops us from panicking if we come
|
||||||
|
// across an update that doesn't satisfy either types.
|
||||||
|
return nil
|
||||||
|
}
|
||||||
switch m.Type {
|
switch m.Type {
|
||||||
case api.TypeCrossSigningUpdate:
|
case api.TypeCrossSigningUpdate:
|
||||||
return s.onCrossSigningMessage(m, msg.Offset, msg.Partition)
|
return s.onCrossSigningMessage(m, msg.Offset, msg.Partition)
|
||||||
|
|
Loading…
Reference in a new issue