Merge pull request #394 from mautrix/sumner/bri-830-add-bridge-and-remote-message-tracking

Use message send checkpoints
This commit is contained in:
Sumner Evans 2021-11-18 08:46:50 -07:00 committed by GitHub
commit eaa52427ac
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 30 additions and 10 deletions

View file

@ -29,10 +29,11 @@ var ExampleConfig string
type Config struct { type Config struct {
Homeserver struct { Homeserver struct {
Address string `yaml:"address"` Address string `yaml:"address"`
Domain string `yaml:"domain"` Domain string `yaml:"domain"`
Asmux bool `yaml:"asmux"` Asmux bool `yaml:"asmux"`
StatusEndpoint string `yaml:"status_endpoint"` StatusEndpoint string `yaml:"status_endpoint"`
MessageSendCheckpointEndpoint string `yaml:"message_send_checkpoint_endpoint"`
} `yaml:"homeserver"` } `yaml:"homeserver"`
AppService struct { AppService struct {
@ -120,6 +121,7 @@ func (config *Config) MakeAppService() (*appservice.AppService, error) {
as.HomeserverURL = config.Homeserver.Address as.HomeserverURL = config.Homeserver.Address
as.Host.Hostname = config.AppService.Hostname as.Host.Hostname = config.AppService.Hostname
as.Host.Port = config.AppService.Port as.Host.Port = config.AppService.Port
as.MessageSendCheckpointEndpoint = config.Homeserver.MessageSendCheckpointEndpoint
as.DefaultHTTPRetries = 4 as.DefaultHTTPRetries = 4
var err error var err error
as.Registration, err = config.GetRegistration() as.Registration, err = config.GetRegistration()

View file

@ -31,6 +31,7 @@ func (helper *UpgradeHelper) doUpgrade() {
helper.Copy(Str, "homeserver", "domain") helper.Copy(Str, "homeserver", "domain")
helper.Copy(Bool, "homeserver", "asmux") helper.Copy(Bool, "homeserver", "asmux")
helper.Copy(Str|Null, "homeserver", "status_endpoint") helper.Copy(Str|Null, "homeserver", "status_endpoint")
helper.Copy(Str|Null, "homeserver", "message_send_checkpoint_endpoint")
helper.Copy(Str, "appservice", "address") helper.Copy(Str, "appservice", "address")
helper.Copy(Str, "appservice", "hostname") helper.Copy(Str, "appservice", "hostname")

View file

@ -11,6 +11,8 @@ homeserver:
# If set, the bridge will make POST requests to this URL whenever a user's whatsapp connection state changes. # If set, the bridge will make POST requests to this URL whenever a user's whatsapp connection state changes.
# The bridge will use the appservice as_token to authorize requests. # The bridge will use the appservice as_token to authorize requests.
status_endpoint: null status_endpoint: null
# Endpoint for reporting per-message status.
message_send_checkpoint_endpoint: null
# Application service host/registration related details. # Application service host/registration related details.
# Changing these values requires regeneration of the registration. # Changing these values requires regeneration of the registration.

2
go.mod
View file

@ -14,7 +14,7 @@ require (
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b
maunium.net/go/mauflag v1.0.0 maunium.net/go/mauflag v1.0.0
maunium.net/go/maulogger/v2 v2.3.1 maunium.net/go/maulogger/v2 v2.3.1
maunium.net/go/mautrix v0.10.2 maunium.net/go/mautrix v0.10.3-0.20211118154012-0649e096bb01
) )
require ( require (

4
go.sum
View file

@ -221,5 +221,5 @@ maunium.net/go/mauflag v1.0.0 h1:YiaRc0tEI3toYtJMRIfjP+jklH45uDHtT80nUamyD4M=
maunium.net/go/mauflag v1.0.0/go.mod h1:nLivPOpTpHnpzEh8jEdSL9UqO9+/KBJFmNRlwKfkPeA= maunium.net/go/mauflag v1.0.0/go.mod h1:nLivPOpTpHnpzEh8jEdSL9UqO9+/KBJFmNRlwKfkPeA=
maunium.net/go/maulogger/v2 v2.3.1 h1:fwBYJne0pHvJrrIPHK+TAPfyxxbBEz46oVGez2x0ODE= maunium.net/go/maulogger/v2 v2.3.1 h1:fwBYJne0pHvJrrIPHK+TAPfyxxbBEz46oVGez2x0ODE=
maunium.net/go/maulogger/v2 v2.3.1/go.mod h1:TYWy7wKwz/tIXTpsx8G3mZseIRiC5DoMxSZazOHy68A= maunium.net/go/maulogger/v2 v2.3.1/go.mod h1:TYWy7wKwz/tIXTpsx8G3mZseIRiC5DoMxSZazOHy68A=
maunium.net/go/mautrix v0.10.2 h1:JslJrE+6VBP/5h/Zq7Q6SsnLUqrNlg7oI/MMcgJNYIY= maunium.net/go/mautrix v0.10.3-0.20211118154012-0649e096bb01 h1:u2gQndikai5YcUVdqz01ToBJ+Ngu2DLRIlpvXQicjvc=
maunium.net/go/mautrix v0.10.2/go.mod h1:k4Ng5oci83MEbqPL6KOjPdbU7f8v01KlMjR/zTQ+7mA= maunium.net/go/mautrix v0.10.3-0.20211118154012-0649e096bb01/go.mod h1:k4Ng5oci83MEbqPL6KOjPdbU7f8v01KlMjR/zTQ+7mA=

View file

@ -342,16 +342,20 @@ func (mx *MatrixHandler) HandleEncrypted(evt *event.Event) {
mx.log.Debugfln("Got session %s after waiting, trying to decrypt %s again", content.SessionID, evt.ID) mx.log.Debugfln("Got session %s after waiting, trying to decrypt %s again", content.SessionID, evt.ID)
decrypted, err = mx.bridge.Crypto.Decrypt(evt) decrypted, err = mx.bridge.Crypto.Decrypt(evt)
} else { } else {
mx.as.SendErrorMessageSendCheckpoint(evt, appservice.StepDecrypted, err, false)
go mx.waitLongerForSession(evt) go mx.waitLongerForSession(evt)
return return
} }
} }
if err != nil { if err != nil {
mx.as.SendErrorMessageSendCheckpoint(evt, appservice.StepDecrypted, err, true)
mx.log.Warnfln("Failed to decrypt %s: %v", evt.ID, err) mx.log.Warnfln("Failed to decrypt %s: %v", evt.ID, err)
_, _ = mx.bridge.Bot.SendNotice(evt.RoomID, fmt.Sprintf( _, _ = mx.bridge.Bot.SendNotice(evt.RoomID, fmt.Sprintf(
"\u26a0 Your message was not bridged: %v", err)) "\u26a0 Your message was not bridged: %v", err))
return return
} }
mx.as.SendMessageSendCheckpoint(decrypted, appservice.StepDecrypted)
mx.bridge.EventProcessor.Dispatch(decrypted) mx.bridge.EventProcessor.Dispatch(decrypted)
} }
@ -375,14 +379,18 @@ func (mx *MatrixHandler) waitLongerForSession(evt *event.Event) {
mx.log.Debugfln("Got session %s after waiting more, trying to decrypt %s again", content.SessionID, evt.ID) mx.log.Debugfln("Got session %s after waiting more, trying to decrypt %s again", content.SessionID, evt.ID)
decrypted, err := mx.bridge.Crypto.Decrypt(evt) decrypted, err := mx.bridge.Crypto.Decrypt(evt)
if err == nil { if err == nil {
mx.as.SendMessageSendCheckpoint(decrypted, appservice.StepDecrypted)
mx.bridge.EventProcessor.Dispatch(decrypted) mx.bridge.EventProcessor.Dispatch(decrypted)
_, _ = mx.bridge.Bot.RedactEvent(evt.RoomID, resp.EventID) _, _ = mx.bridge.Bot.RedactEvent(evt.RoomID, resp.EventID)
return return
} }
mx.log.Warnfln("Failed to decrypt %s: %v", err) mx.log.Warnfln("Failed to decrypt %s: %v", evt.ID, err)
mx.as.SendErrorMessageSendCheckpoint(evt, appservice.StepDecrypted, err, true)
update.Body = fmt.Sprintf("\u26a0 Your message was not bridged: %v", err) update.Body = fmt.Sprintf("\u26a0 Your message was not bridged: %v", err)
} else { } else {
mx.log.Debugfln("Didn't get %s, giving up on %s", content.SessionID, evt.ID) errMsg := fmt.Sprintf("Didn't get %s, giving up on %s", content.SessionID, evt.ID)
mx.log.Debugfln(errMsg)
mx.as.SendErrorMessageSendCheckpoint(evt, appservice.StepDecrypted, fmt.Errorf(errMsg), true)
update.Body = "\u26a0 Your message was not bridged: the bridge hasn't received the decryption keys. " + update.Body = "\u26a0 Your message was not bridged: the bridge hasn't received the decryption keys. " +
"If this keeps happening, try restarting your client." "If this keeps happening, try restarting your client."
} }
@ -395,7 +403,10 @@ func (mx *MatrixHandler) waitLongerForSession(evt *event.Event) {
EventID: resp.EventID, EventID: resp.EventID,
} }
} }
_, _ = mx.bridge.Bot.SendMessageEvent(evt.RoomID, event.EventMessage, &update) _, err = mx.bridge.Bot.SendMessageEvent(evt.RoomID, event.EventMessage, &update)
if err != nil {
mx.log.Debugfln("Failed to update decryption error notice %s: %v", resp.EventID, err)
}
} }
func (mx *MatrixHandler) HandleMessage(evt *event.Event) { func (mx *MatrixHandler) HandleMessage(evt *event.Event) {

View file

@ -2144,8 +2144,10 @@ func (portal *Portal) HandleMatrixMessage(sender *User, evt *event.Event) {
if err != nil { if err != nil {
portal.log.Errorln("Error sending message: %v", err) portal.log.Errorln("Error sending message: %v", err)
portal.sendErrorMessage(err.Error(), true) portal.sendErrorMessage(err.Error(), true)
portal.bridge.AS.SendErrorMessageSendCheckpoint(evt, appservice.StepRemote, err, true)
} else { } else {
portal.log.Debugfln("Handled Matrix event %s", evt.ID) portal.log.Debugfln("Handled Matrix event %s", evt.ID)
portal.bridge.AS.SendMessageSendCheckpoint(evt, appservice.StepRemote)
portal.sendDeliveryReceipt(evt.ID) portal.sendDeliveryReceipt(evt.ID)
dbMsg.MarkSent(ts) dbMsg.MarkSent(ts)
} }
@ -2179,8 +2181,10 @@ func (portal *Portal) HandleMatrixRedaction(sender *User, evt *event.Event) {
_, err := sender.Client.RevokeMessage(portal.Key.JID, msg.JID) _, err := sender.Client.RevokeMessage(portal.Key.JID, msg.JID)
if err != nil { if err != nil {
portal.log.Errorfln("Error handling Matrix redaction %s: %v", evt.ID, err) portal.log.Errorfln("Error handling Matrix redaction %s: %v", evt.ID, err)
portal.bridge.AS.SendErrorMessageSendCheckpoint(evt, appservice.StepRemote, err, true)
} else { } else {
portal.log.Debugfln("Handled Matrix redaction %s of %s", evt.ID, evt.Redacts) portal.log.Debugfln("Handled Matrix redaction %s of %s", evt.ID, evt.Redacts)
portal.bridge.AS.SendMessageSendCheckpoint(evt, appservice.StepRemote)
portal.sendDeliveryReceipt(evt.ID) portal.sendDeliveryReceipt(evt.ID)
} }
} }