Move bridge state stuff to mautrix-go

This commit is contained in:
Tulir Asokan 2022-05-31 00:27:43 +03:00
parent 49a445e10d
commit 79fb0d49b3
8 changed files with 94 additions and 262 deletions

View file

@ -17,211 +17,49 @@
package main package main
import ( import (
"bytes"
"context"
"encoding/json"
"fmt" "fmt"
"io"
"net/http" "net/http"
"runtime/debug"
"time"
"maunium.net/go/mautrix/bridge"
"maunium.net/go/mautrix/id" "maunium.net/go/mautrix/id"
) )
type BridgeStateEvent string
const ( const (
StateUnconfigured BridgeStateEvent = "UNCONFIGURED" WALoggedOut bridge.StateErrorCode = "wa-logged-out"
StateRunning BridgeStateEvent = "RUNNING" WAMainDeviceGone bridge.StateErrorCode = "wa-main-device-gone"
StateConnecting BridgeStateEvent = "CONNECTING" WAUnknownLogout bridge.StateErrorCode = "wa-unknown-logout"
StateBackfilling BridgeStateEvent = "BACKFILLING" WANotConnected bridge.StateErrorCode = "wa-not-connected"
StateConnected BridgeStateEvent = "CONNECTED" WAConnecting bridge.StateErrorCode = "wa-connecting"
StateTransientDisconnect BridgeStateEvent = "TRANSIENT_DISCONNECT" WAKeepaliveTimeout bridge.StateErrorCode = "wa-keepalive-timeout"
StateBadCredentials BridgeStateEvent = "BAD_CREDENTIALS" WAPhoneOffline bridge.StateErrorCode = "wa-phone-offline"
StateUnknownError BridgeStateEvent = "UNKNOWN_ERROR" WAConnectionFailed bridge.StateErrorCode = "wa-connection-failed"
StateLoggedOut BridgeStateEvent = "LOGGED_OUT"
) )
type BridgeErrorCode string func init() {
bridge.StateHumanErrors.Update(bridge.StateErrorMap{
const ( WALoggedOut: "You were logged out from another device. Relogin to continue using the bridge.",
WALoggedOut BridgeErrorCode = "wa-logged-out" WAMainDeviceGone: "Your phone was logged out from WhatsApp. Relogin to continue using the bridge.",
WAMainDeviceGone BridgeErrorCode = "wa-main-device-gone" WAUnknownLogout: "You were logged out for an unknown reason. Relogin to continue using the bridge.",
WAUnknownLogout BridgeErrorCode = "wa-unknown-logout" WANotConnected: "You're not connected to WhatsApp",
WANotConnected BridgeErrorCode = "wa-not-connected" WAConnecting: "Reconnecting to WhatsApp...",
WAConnecting BridgeErrorCode = "wa-connecting" WAKeepaliveTimeout: "The WhatsApp web servers are not responding. The bridge will try to reconnect.",
WAKeepaliveTimeout BridgeErrorCode = "wa-keepalive-timeout" WAPhoneOffline: "Your phone hasn't been seen in over 12 days. The bridge is currently connected, but will get disconnected if you don't open the app soon.",
WAPhoneOffline BridgeErrorCode = "wa-phone-offline" WAConnectionFailed: "Connecting to the WhatsApp web servers failed.",
WAConnectionFailed BridgeErrorCode = "wa-connection-failed" })
)
var bridgeHumanErrors = map[BridgeErrorCode]string{
WALoggedOut: "You were logged out from another device. Relogin to continue using the bridge.",
WAMainDeviceGone: "Your phone was logged out from WhatsApp. Relogin to continue using the bridge.",
WAUnknownLogout: "You were logged out for an unknown reason. Relogin to continue using the bridge.",
WANotConnected: "You're not connected to WhatsApp",
WAConnecting: "Reconnecting to WhatsApp...",
WAKeepaliveTimeout: "The WhatsApp web servers are not responding. The bridge will try to reconnect.",
WAPhoneOffline: "Your phone hasn't been seen in over 12 days. The bridge is currently connected, but will get disconnected if you don't open the app soon.",
WAConnectionFailed: "Connecting to the WhatsApp web servers failed.",
} }
type BridgeState struct { func (user *User) GetRemoteID() string {
StateEvent BridgeStateEvent `json:"state_event"` if user == nil || user.JID.IsEmpty() {
Timestamp int64 `json:"timestamp"` return ""
TTL int `json:"ttl"` }
return fmt.Sprintf("%s_a%d_d%d", user.JID.User, user.JID.Agent, user.JID.Device)
Source string `json:"source,omitempty"`
Error BridgeErrorCode `json:"error,omitempty"`
Message string `json:"message,omitempty"`
UserID id.UserID `json:"user_id,omitempty"`
RemoteID string `json:"remote_id,omitempty"`
RemoteName string `json:"remote_name,omitempty"`
Reason string `json:"reason,omitempty"`
Info map[string]interface{} `json:"info,omitempty"`
} }
type GlobalBridgeState struct { func (user *User) GetRemoteName() string {
RemoteStates map[string]BridgeState `json:"remoteState"` if user == nil || user.JID.IsEmpty() {
BridgeState BridgeState `json:"bridgeState"` return ""
}
func (pong BridgeState) fill(user *User) BridgeState {
if user != nil {
pong.UserID = user.MXID
pong.RemoteID = fmt.Sprintf("%s_a%d_d%d", user.JID.User, user.JID.Agent, user.JID.Device)
pong.RemoteName = fmt.Sprintf("+%s", user.JID.User)
} }
return fmt.Sprintf("+%s", user.JID.User)
pong.Timestamp = time.Now().Unix()
pong.Source = "bridge"
if len(pong.Error) > 0 {
pong.TTL = 60
pong.Message = bridgeHumanErrors[pong.Error]
} else {
pong.TTL = 240
}
return pong
}
func (pong *BridgeState) shouldDeduplicate(newPong *BridgeState) bool {
if pong == nil || pong.StateEvent != newPong.StateEvent || pong.Error != newPong.Error {
return false
}
return pong.Timestamp+int64(pong.TTL/5) > time.Now().Unix()
}
func (br *WABridge) sendBridgeState(ctx context.Context, state *BridgeState) error {
var body bytes.Buffer
if err := json.NewEncoder(&body).Encode(&state); err != nil {
return fmt.Errorf("failed to encode bridge state JSON: %w", err)
}
req, err := http.NewRequestWithContext(ctx, http.MethodPost, br.Config.Homeserver.StatusEndpoint, &body)
if err != nil {
return fmt.Errorf("failed to prepare request: %w", err)
}
req.Header.Set("Authorization", "Bearer "+br.Config.AppService.ASToken)
req.Header.Set("Content-Type", "application/json")
resp, err := http.DefaultClient.Do(req)
if err != nil {
return fmt.Errorf("failed to send request: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode < 200 || resp.StatusCode > 299 {
respBody, _ := io.ReadAll(resp.Body)
if respBody != nil {
respBody = bytes.ReplaceAll(respBody, []byte("\n"), []byte("\\n"))
}
return fmt.Errorf("unexpected status code %d sending bridge state update: %s", resp.StatusCode, respBody)
}
return nil
}
func (br *WABridge) sendGlobalBridgeState(state BridgeState) {
if len(br.Config.Homeserver.StatusEndpoint) == 0 {
return
}
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
if err := br.sendBridgeState(ctx, &state); err != nil {
br.Log.Warnln("Failed to update global bridge state:", err)
} else {
br.Log.Debugfln("Sent new global bridge state %+v", state)
}
}
func (user *User) bridgeStateLoop() {
defer func() {
err := recover()
if err != nil {
user.log.Errorfln("Bridge state loop panicked: %v\n%s", err, debug.Stack())
}
}()
for state := range user.bridgeStateQueue {
user.immediateSendBridgeState(state)
}
}
func (user *User) immediateSendBridgeState(state BridgeState) {
retryIn := 2
for {
if user.prevBridgeStatus != nil && user.prevBridgeStatus.shouldDeduplicate(&state) {
user.log.Debugfln("Not sending bridge state %s as it's a duplicate", state.StateEvent)
return
}
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
err := user.bridge.sendBridgeState(ctx, &state)
cancel()
if err != nil {
user.log.Warnfln("Failed to update bridge state: %v, retrying in %d seconds", err, retryIn)
time.Sleep(time.Duration(retryIn) * time.Second)
retryIn *= 2
if retryIn > 64 {
retryIn = 64
}
} else {
user.prevBridgeStatus = &state
user.log.Debugfln("Sent new bridge state %+v", state)
return
}
}
}
func (user *User) sendBridgeState(state BridgeState) {
if len(user.bridge.Config.Homeserver.StatusEndpoint) == 0 {
return
}
state = state.fill(user)
if len(user.bridgeStateQueue) >= 8 {
user.log.Warnln("Bridge state queue is nearly full, discarding an item")
select {
case <-user.bridgeStateQueue:
default:
}
}
select {
case user.bridgeStateQueue <- state:
default:
user.log.Errorfln("Bridge state queue is full, dropped new state")
}
}
func (user *User) GetPrevBridgeState() BridgeState {
if user.prevBridgeStatus != nil {
return *user.prevBridgeStatus
}
return BridgeState{}
} }
func (prov *ProvisioningAPI) BridgeStatePing(w http.ResponseWriter, r *http.Request) { func (prov *ProvisioningAPI) BridgeStatePing(w http.ResponseWriter, r *http.Request) {
@ -230,32 +68,32 @@ func (prov *ProvisioningAPI) BridgeStatePing(w http.ResponseWriter, r *http.Requ
} }
userID := r.URL.Query().Get("user_id") userID := r.URL.Query().Get("user_id")
user := prov.bridge.GetUserByMXID(id.UserID(userID)) user := prov.bridge.GetUserByMXID(id.UserID(userID))
var global BridgeState var global bridge.State
global.StateEvent = StateRunning global.StateEvent = bridge.StateRunning
var remote BridgeState var remote bridge.State
if user.IsConnected() { if user.IsConnected() {
if user.Client.IsLoggedIn() { if user.Client.IsLoggedIn() {
remote.StateEvent = StateConnected remote.StateEvent = bridge.StateConnected
} else if user.Session != nil { } else if user.Session != nil {
remote.StateEvent = StateConnecting remote.StateEvent = bridge.StateConnecting
remote.Error = WAConnecting remote.Error = WAConnecting
} // else: unconfigured } // else: unconfigured
} else if user.Session != nil { } else if user.Session != nil {
remote.StateEvent = StateBadCredentials remote.StateEvent = bridge.StateBadCredentials
remote.Error = WANotConnected remote.Error = WANotConnected
} // else: unconfigured } // else: unconfigured
global = global.fill(nil) global = global.Fill(nil)
resp := GlobalBridgeState{ resp := bridge.GlobalState{
BridgeState: global, BridgeState: global,
RemoteStates: map[string]BridgeState{}, RemoteStates: map[string]bridge.State{},
} }
if len(remote.StateEvent) > 0 { if len(remote.StateEvent) > 0 {
remote = remote.fill(user) remote = remote.Fill(user)
resp.RemoteStates[remote.RemoteID] = remote resp.RemoteStates[remote.RemoteID] = remote
} }
user.log.Debugfln("Responding bridge state in bridge status endpoint: %+v", resp) user.log.Debugfln("Responding bridge state in bridge status endpoint: %+v", resp)
jsonResponse(w, http.StatusOK, &resp) jsonResponse(w, http.StatusOK, &resp)
if len(resp.RemoteStates) > 0 { if len(resp.RemoteStates) > 0 {
user.prevBridgeStatus = &remote user.BridgeState.SetPrev(remote)
} }
} }

View file

@ -518,7 +518,7 @@ func fnLogout(ce *WrappedCommandEvent) {
return return
} }
ce.User.Session = nil ce.User.Session = nil
ce.User.removeFromJIDMap(BridgeState{StateEvent: StateLoggedOut}) ce.User.removeFromJIDMap(bridge.State{StateEvent: bridge.StateLoggedOut})
ce.User.DeleteConnection() ce.User.DeleteConnection()
ce.User.DeleteSession() ce.User.DeleteSession()
ce.Reply("Logged out successfully.") ce.Reply("Logged out successfully.")
@ -575,7 +575,7 @@ func fnDeleteSession(ce *WrappedCommandEvent) {
ce.Reply("Nothing to purge: no session information stored and no active connection.") ce.Reply("Nothing to purge: no session information stored and no active connection.")
return return
} }
ce.User.removeFromJIDMap(BridgeState{StateEvent: StateLoggedOut}) ce.User.removeFromJIDMap(bridge.State{StateEvent: bridge.StateLoggedOut})
ce.User.DeleteConnection() ce.User.DeleteConnection()
ce.User.DeleteSession() ce.User.DeleteSession()
ce.Reply("Session information purged") ce.Reply("Session information purged")
@ -600,7 +600,7 @@ func fnReconnect(ce *WrappedCommandEvent) {
} }
} else { } else {
ce.User.DeleteConnection() ce.User.DeleteConnection()
ce.User.sendBridgeState(BridgeState{StateEvent: StateTransientDisconnect, Error: WANotConnected}) ce.User.BridgeState.Send(bridge.State{StateEvent: bridge.StateTransientDisconnect, Error: WANotConnected})
ce.User.Connect() ce.User.Connect()
ce.Reply("Restarted connection to WhatsApp") ce.Reply("Restarted connection to WhatsApp")
} }
@ -622,7 +622,7 @@ func fnDisconnect(ce *WrappedCommandEvent) {
} }
ce.User.DeleteConnection() ce.User.DeleteConnection()
ce.Reply("Successfully disconnected. Use the `reconnect` command to reconnect.") ce.Reply("Successfully disconnected. Use the `reconnect` command to reconnect.")
ce.User.sendBridgeState(BridgeState{StateEvent: StateBadCredentials, Error: WANotConnected}) ce.User.BridgeState.Send(bridge.State{StateEvent: bridge.StateBadCredentials, Error: WANotConnected})
} }
var cmdPing = &commands.FullHandler{ var cmdPing = &commands.FullHandler{

2
go.mod
View file

@ -15,7 +15,7 @@ require (
golang.org/x/net v0.0.0-20220513224357-95641704303c golang.org/x/net v0.0.0-20220513224357-95641704303c
google.golang.org/protobuf v1.28.0 google.golang.org/protobuf v1.28.0
maunium.net/go/maulogger/v2 v2.3.2 maunium.net/go/maulogger/v2 v2.3.2
maunium.net/go/mautrix v0.11.1-0.20220529123139-5bc36b2978c1 maunium.net/go/mautrix v0.11.1-0.20220530212627-b15517460fdb
) )
require ( require (

4
go.sum
View file

@ -107,5 +107,5 @@ maunium.net/go/mauflag v1.0.0 h1:YiaRc0tEI3toYtJMRIfjP+jklH45uDHtT80nUamyD4M=
maunium.net/go/mauflag v1.0.0/go.mod h1:nLivPOpTpHnpzEh8jEdSL9UqO9+/KBJFmNRlwKfkPeA= maunium.net/go/mauflag v1.0.0/go.mod h1:nLivPOpTpHnpzEh8jEdSL9UqO9+/KBJFmNRlwKfkPeA=
maunium.net/go/maulogger/v2 v2.3.2 h1:1XmIYmMd3PoQfp9J+PaHhpt80zpfmMqaShzUTC7FwY0= maunium.net/go/maulogger/v2 v2.3.2 h1:1XmIYmMd3PoQfp9J+PaHhpt80zpfmMqaShzUTC7FwY0=
maunium.net/go/maulogger/v2 v2.3.2/go.mod h1:TYWy7wKwz/tIXTpsx8G3mZseIRiC5DoMxSZazOHy68A= maunium.net/go/maulogger/v2 v2.3.2/go.mod h1:TYWy7wKwz/tIXTpsx8G3mZseIRiC5DoMxSZazOHy68A=
maunium.net/go/mautrix v0.11.1-0.20220529123139-5bc36b2978c1 h1:HNntVQh0XVyWDAsSQA/0Rk2++1cGOzmm7tH8xILSsak= maunium.net/go/mautrix v0.11.1-0.20220530212627-b15517460fdb h1:MTY4bW0yhg8bHszMSNymTrHvIRdiDjcnQKC8qSbt5BE=
maunium.net/go/mautrix v0.11.1-0.20220529123139-5bc36b2978c1/go.mod h1:CiKpMhAx5QZFHK03jpWb0iKI3sGU8x6+LfsOjDrcO8I= maunium.net/go/mautrix v0.11.1-0.20220530212627-b15517460fdb/go.mod h1:CiKpMhAx5QZFHK03jpWb0iKI3sGU8x6+LfsOjDrcO8I=

View file

@ -215,7 +215,7 @@ func (br *WABridge) StartUsers() {
go user.Connect() go user.Connect()
} }
if !foundAnySessions { if !foundAnySessions {
br.sendGlobalBridgeState(BridgeState{StateEvent: StateUnconfigured}.fill(nil)) br.SendGlobalBridgeState(bridge.State{StateEvent: bridge.StateUnconfigured}.Fill(nil))
} }
br.Log.Debugln("Starting custom puppets") br.Log.Debugln("Starting custom puppets")
for _, loopuppet := range br.GetAllPuppetsWithCustomMXID() { for _, loopuppet := range br.GetAllPuppetsWithCustomMXID() {

View file

@ -40,13 +40,13 @@ import (
"golang.org/x/image/draw" "golang.org/x/image/draw"
"golang.org/x/image/webp" "golang.org/x/image/webp"
"google.golang.org/protobuf/proto" "google.golang.org/protobuf/proto"
"maunium.net/go/mautrix/bridge/bridgeconfig"
log "maunium.net/go/maulogger/v2" log "maunium.net/go/maulogger/v2"
"maunium.net/go/mautrix" "maunium.net/go/mautrix"
"maunium.net/go/mautrix/appservice" "maunium.net/go/mautrix/appservice"
"maunium.net/go/mautrix/bridge" "maunium.net/go/mautrix/bridge"
"maunium.net/go/mautrix/bridge/bridgeconfig"
"maunium.net/go/mautrix/crypto/attachment" "maunium.net/go/mautrix/crypto/attachment"
"maunium.net/go/mautrix/event" "maunium.net/go/mautrix/event"
"maunium.net/go/mautrix/format" "maunium.net/go/mautrix/format"
@ -2911,16 +2911,14 @@ func (portal *Portal) HandleMatrixMessage(sender *User, evt *event.Event) {
if err != nil { if err != nil {
portal.log.Errorfln("Error sending message: %v", err) portal.log.Errorfln("Error sending message: %v", err)
portal.sendErrorMessage(err.Error(), true) portal.sendErrorMessage(err.Error(), true)
status := appservice.StatusPermFailure status := bridge.MsgStatusPermFailure
if errors.Is(err, whatsmeow.ErrBroadcastListUnsupported) { if errors.Is(err, whatsmeow.ErrBroadcastListUnsupported) {
status = appservice.StatusUnsupported status = bridge.MsgStatusUnsupported
} }
checkpoint := appservice.NewMessageSendCheckpoint(evt, appservice.StepRemote, status, 0) portal.bridge.SendMessageCheckpoint(evt, bridge.MsgStepRemote, err, status, 0)
checkpoint.Info = err.Error()
go checkpoint.Send(portal.bridge.AS)
} else { } else {
portal.log.Debugfln("Handled Matrix event %s", evt.ID) portal.log.Debugfln("Handled Matrix event %s", evt.ID)
portal.bridge.AS.SendMessageSendCheckpoint(evt, appservice.StepRemote, 0) portal.bridge.SendMessageSuccessCheckpoint(evt, bridge.MsgStepRemote, 0)
portal.sendDeliveryReceipt(evt.ID) portal.sendDeliveryReceipt(evt.ID)
dbMsg.MarkSent(ts) dbMsg.MarkSent(ts)
} }
@ -2946,10 +2944,10 @@ func (portal *Portal) HandleMatrixReaction(sender *User, evt *event.Event) {
err := portal.handleMatrixReaction(sender, evt) err := portal.handleMatrixReaction(sender, evt)
if err != nil { if err != nil {
portal.log.Errorfln("Error sending reaction %s: %v", evt.ID, err) portal.log.Errorfln("Error sending reaction %s: %v", evt.ID, err)
portal.bridge.AS.SendErrorMessageSendCheckpoint(evt, appservice.StepRemote, err, true, 0) portal.bridge.SendMessageErrorCheckpoint(evt, bridge.MsgStepRemote, err, true, 0)
} else { } else {
portal.log.Debugfln("Handled Matrix reaction %s", evt.ID) portal.log.Debugfln("Handled Matrix reaction %s", evt.ID)
portal.bridge.AS.SendMessageSendCheckpoint(evt, appservice.StepRemote, 0) portal.bridge.SendMessageSuccessCheckpoint(evt, bridge.MsgStepRemote, 0)
portal.sendDeliveryReceipt(evt.ID) portal.sendDeliveryReceipt(evt.ID)
} }
} }
@ -3038,15 +3036,15 @@ func (portal *Portal) HandleMatrixRedaction(sender *User, evt *event.Event) {
msg := portal.bridge.DB.Message.GetByMXID(evt.Redacts) msg := portal.bridge.DB.Message.GetByMXID(evt.Redacts)
if msg == nil { if msg == nil {
portal.log.Debugfln("Ignoring redaction %s of unknown event by %s", evt.ID, senderLogIdentifier) portal.log.Debugfln("Ignoring redaction %s of unknown event by %s", evt.ID, senderLogIdentifier)
portal.bridge.AS.SendErrorMessageSendCheckpoint(evt, appservice.StepRemote, errors.New("target not found"), true, 0) portal.bridge.SendMessageErrorCheckpoint(evt, bridge.MsgStepRemote, errors.New("target not found"), true, 0)
return return
} else if msg.IsFakeJID() { } else if msg.IsFakeJID() {
portal.log.Debugfln("Ignoring redaction %s of fake event by %s", evt.ID, senderLogIdentifier) portal.log.Debugfln("Ignoring redaction %s of fake event by %s", evt.ID, senderLogIdentifier)
portal.bridge.AS.SendErrorMessageSendCheckpoint(evt, appservice.StepRemote, errors.New("target is a fake event"), true, 0) portal.bridge.SendMessageErrorCheckpoint(evt, bridge.MsgStepRemote, errors.New("target is a fake event"), true, 0)
return return
} else if msg.Sender.User != sender.JID.User { } else if msg.Sender.User != sender.JID.User {
portal.log.Debugfln("Ignoring redaction %s of %s/%s by %s: message was sent by someone else (%s, not %s)", evt.ID, msg.MXID, msg.JID, senderLogIdentifier, msg.Sender, sender.JID) portal.log.Debugfln("Ignoring redaction %s of %s/%s by %s: message was sent by someone else (%s, not %s)", evt.ID, msg.MXID, msg.JID, senderLogIdentifier, msg.Sender, sender.JID)
portal.bridge.AS.SendErrorMessageSendCheckpoint(evt, appservice.StepRemote, errors.New("message was sent by someone else"), true, 0) portal.bridge.SendMessageErrorCheckpoint(evt, bridge.MsgStepRemote, errors.New("message was sent by someone else"), true, 0)
return return
} }
@ -3054,11 +3052,11 @@ func (portal *Portal) HandleMatrixRedaction(sender *User, evt *event.Event) {
if msg.Type == database.MsgReaction { if msg.Type == database.MsgReaction {
if reaction := portal.bridge.DB.Reaction.GetByMXID(evt.Redacts); reaction == nil { if reaction := portal.bridge.DB.Reaction.GetByMXID(evt.Redacts); reaction == nil {
portal.log.Debugfln("Ignoring redaction of reaction %s: reaction database entry not found", evt.ID) portal.log.Debugfln("Ignoring redaction of reaction %s: reaction database entry not found", evt.ID)
portal.bridge.AS.SendErrorMessageSendCheckpoint(evt, appservice.StepRemote, errors.New("reaction database entry not found"), true, 0) portal.bridge.SendMessageErrorCheckpoint(evt, bridge.MsgStepRemote, errors.New("reaction database entry not found"), true, 0)
return return
} else if reactionTarget := reaction.GetTarget(); reactionTarget == nil { } else if reactionTarget := reaction.GetTarget(); reactionTarget == nil {
portal.log.Debugfln("Ignoring redaction of reaction %s: reaction target message not found", evt.ID) portal.log.Debugfln("Ignoring redaction of reaction %s: reaction target message not found", evt.ID)
portal.bridge.AS.SendErrorMessageSendCheckpoint(evt, appservice.StepRemote, errors.New("reaction target message not found"), true, 0) portal.bridge.SendMessageErrorCheckpoint(evt, bridge.MsgStepRemote, errors.New("reaction target message not found"), true, 0)
return return
} else { } else {
portal.log.Debugfln("Sending redaction reaction %s of %s/%s to WhatsApp", evt.ID, msg.MXID, msg.JID) portal.log.Debugfln("Sending redaction reaction %s of %s/%s to WhatsApp", evt.ID, msg.MXID, msg.JID)
@ -3070,10 +3068,10 @@ func (portal *Portal) HandleMatrixRedaction(sender *User, evt *event.Event) {
} }
if err != nil { if err != nil {
portal.log.Errorfln("Error handling Matrix redaction %s: %v", evt.ID, err) portal.log.Errorfln("Error handling Matrix redaction %s: %v", evt.ID, err)
portal.bridge.AS.SendErrorMessageSendCheckpoint(evt, appservice.StepRemote, err, true, 0) portal.bridge.SendMessageErrorCheckpoint(evt, bridge.MsgStepRemote, err, true, 0)
} else { } else {
portal.log.Debugfln("Handled Matrix redaction %s of %s", evt.ID, evt.Redacts) portal.log.Debugfln("Handled Matrix redaction %s of %s", evt.ID, evt.Redacts)
portal.bridge.AS.SendMessageSendCheckpoint(evt, appservice.StepRemote, 0) portal.bridge.SendMessageSuccessCheckpoint(evt, bridge.MsgStepRemote, 0)
portal.sendDeliveryReceipt(evt.ID) portal.sendDeliveryReceipt(evt.ID)
} }
} }

View file

@ -39,6 +39,7 @@ import (
log "maunium.net/go/maulogger/v2" log "maunium.net/go/maulogger/v2"
"maunium.net/go/mautrix/bridge"
"maunium.net/go/mautrix/id" "maunium.net/go/mautrix/id"
) )
@ -149,7 +150,7 @@ func (prov *ProvisioningAPI) DeleteSession(w http.ResponseWriter, r *http.Reques
user.DeleteConnection() user.DeleteConnection()
user.DeleteSession() user.DeleteSession()
jsonResponse(w, http.StatusOK, Response{true, "Session information purged"}) jsonResponse(w, http.StatusOK, Response{true, "Session information purged"})
user.removeFromJIDMap(BridgeState{StateEvent: StateLoggedOut}) user.removeFromJIDMap(bridge.State{StateEvent: bridge.StateLoggedOut})
} }
func (prov *ProvisioningAPI) Disconnect(w http.ResponseWriter, r *http.Request) { func (prov *ProvisioningAPI) Disconnect(w http.ResponseWriter, r *http.Request) {
@ -163,7 +164,7 @@ func (prov *ProvisioningAPI) Disconnect(w http.ResponseWriter, r *http.Request)
} }
user.DeleteConnection() user.DeleteConnection()
jsonResponse(w, http.StatusOK, Response{true, "Disconnected from WhatsApp"}) jsonResponse(w, http.StatusOK, Response{true, "Disconnected from WhatsApp"})
user.sendBridgeState(BridgeState{StateEvent: StateBadCredentials, Error: WANotConnected}) user.BridgeState.Send(bridge.State{StateEvent: bridge.StateBadCredentials, Error: WANotConnected})
} }
func (prov *ProvisioningAPI) Reconnect(w http.ResponseWriter, r *http.Request) { func (prov *ProvisioningAPI) Reconnect(w http.ResponseWriter, r *http.Request) {
@ -180,7 +181,7 @@ func (prov *ProvisioningAPI) Reconnect(w http.ResponseWriter, r *http.Request) {
} }
} else { } else {
user.DeleteConnection() user.DeleteConnection()
user.sendBridgeState(BridgeState{StateEvent: StateTransientDisconnect, Error: WANotConnected}) user.BridgeState.Send(bridge.State{StateEvent: bridge.StateTransientDisconnect, Error: WANotConnected})
user.Connect() user.Connect()
jsonResponse(w, http.StatusAccepted, Response{true, "Restarted connection to WhatsApp"}) jsonResponse(w, http.StatusAccepted, Response{true, "Restarted connection to WhatsApp"})
} }
@ -575,7 +576,7 @@ func (prov *ProvisioningAPI) Logout(w http.ResponseWriter, r *http.Request) {
} }
user.bridge.Metrics.TrackConnectionState(user.JID, false) user.bridge.Metrics.TrackConnectionState(user.JID, false)
user.removeFromJIDMap(BridgeState{StateEvent: StateLoggedOut}) user.removeFromJIDMap(bridge.State{StateEvent: bridge.StateLoggedOut})
user.DeleteSession() user.DeleteSession()
jsonResponse(w, http.StatusOK, Response{true, "Logged out successfully."}) jsonResponse(w, http.StatusOK, Response{true, "Logged out successfully."})
} }

59
user.go
View file

@ -32,11 +32,11 @@ import (
"time" "time"
log "maunium.net/go/maulogger/v2" log "maunium.net/go/maulogger/v2"
"maunium.net/go/mautrix/bridge/bridgeconfig"
"maunium.net/go/mautrix" "maunium.net/go/mautrix"
"maunium.net/go/mautrix/appservice" "maunium.net/go/mautrix/appservice"
"maunium.net/go/mautrix/bridge" "maunium.net/go/mautrix/bridge"
"maunium.net/go/mautrix/bridge/bridgeconfig"
"maunium.net/go/mautrix/event" "maunium.net/go/mautrix/event"
"maunium.net/go/mautrix/format" "maunium.net/go/mautrix/format"
"maunium.net/go/mautrix/id" "maunium.net/go/mautrix/id"
@ -70,9 +70,8 @@ type User struct {
spaceCreateLock sync.Mutex spaceCreateLock sync.Mutex
connLock sync.Mutex connLock sync.Mutex
historySyncs chan *events.HistorySync historySyncs chan *events.HistorySync
prevBridgeStatus *BridgeState lastPresence types.Presence
lastPresence types.Presence
historySyncLoopsStarted bool historySyncLoopsStarted bool
spaceMembershipChecked bool spaceMembershipChecked bool
@ -82,9 +81,8 @@ type User struct {
groupListCacheLock sync.Mutex groupListCacheLock sync.Mutex
groupListCacheTime time.Time groupListCacheTime time.Time
bridgeStateQueue chan BridgeState
BackfillQueue *BackfillQueue BackfillQueue *BackfillQueue
BridgeState *bridge.BridgeStateQueue
} }
func (br *WABridge) getUserByMXID(userID id.UserID, onlyIfExists bool) *User { func (br *WABridge) getUserByMXID(userID id.UserID, onlyIfExists bool) *User {
@ -153,7 +151,7 @@ func (user *User) addToJIDMap() {
user.bridge.usersLock.Unlock() user.bridge.usersLock.Unlock()
} }
func (user *User) removeFromJIDMap(state BridgeState) { func (user *User) removeFromJIDMap(state bridge.State) {
user.bridge.usersLock.Lock() user.bridge.usersLock.Lock()
jidUser, ok := user.bridge.usersByUsername[user.JID.User] jidUser, ok := user.bridge.usersByUsername[user.JID.User]
if ok && user == jidUser { if ok && user == jidUser {
@ -161,7 +159,7 @@ func (user *User) removeFromJIDMap(state BridgeState) {
} }
user.bridge.usersLock.Unlock() user.bridge.usersLock.Unlock()
user.bridge.Metrics.TrackLoginState(user.JID, false) user.bridge.Metrics.TrackLoginState(user.JID, false)
user.sendBridgeState(state) user.BridgeState.Send(state)
} }
func (br *WABridge) GetAllUsers() []*User { func (br *WABridge) GetAllUsers() []*User {
@ -224,10 +222,7 @@ func (br *WABridge) NewUser(dbUser *database.User) *User {
user.RelayWhitelisted = user.PermissionLevel >= bridgeconfig.PermissionLevelRelay user.RelayWhitelisted = user.PermissionLevel >= bridgeconfig.PermissionLevelRelay
user.Whitelisted = user.PermissionLevel >= bridgeconfig.PermissionLevelUser user.Whitelisted = user.PermissionLevel >= bridgeconfig.PermissionLevelUser
user.Admin = user.PermissionLevel >= bridgeconfig.PermissionLevelAdmin user.Admin = user.PermissionLevel >= bridgeconfig.PermissionLevelAdmin
if len(user.bridge.Config.Homeserver.StatusEndpoint) > 0 { user.BridgeState = br.NewBridgeStateQueue(user, user.log)
user.bridgeStateQueue = make(chan BridgeState, 10)
go user.bridgeStateLoop()
}
return user return user
} }
@ -424,13 +419,13 @@ func (user *User) Connect() bool {
return false return false
} }
user.log.Debugln("Connecting to WhatsApp") user.log.Debugln("Connecting to WhatsApp")
user.sendBridgeState(BridgeState{StateEvent: StateConnecting, Error: WAConnecting}) user.BridgeState.Send(bridge.State{StateEvent: bridge.StateConnecting, Error: WAConnecting})
user.createClient(user.Session) user.createClient(user.Session)
err := user.Client.Connect() err := user.Client.Connect()
if err != nil { if err != nil {
user.log.Warnln("Error connecting to WhatsApp:", err) user.log.Warnln("Error connecting to WhatsApp:", err)
user.sendBridgeState(BridgeState{ user.BridgeState.Send(bridge.State{
StateEvent: StateUnknownError, StateEvent: bridge.StateUnknownError,
Error: WAConnectionFailed, Error: WAConnectionFailed,
Info: map[string]interface{}{ Info: map[string]interface{}{
"go_error": err.Error(), "go_error": err.Error(),
@ -597,11 +592,11 @@ func (user *User) phoneSeen(ts time.Time) {
// so don't spam the database with an update every time there's an event. // so don't spam the database with an update every time there's an event.
return return
} else if !user.PhoneRecentlySeen(false) { } else if !user.PhoneRecentlySeen(false) {
if user.GetPrevBridgeState().Error == WAPhoneOffline && user.IsConnected() { if user.BridgeState.GetPrev().Error == WAPhoneOffline && user.IsConnected() {
user.log.Debugfln("Saw phone after current bridge state said it has been offline, switching state back to connected") user.log.Debugfln("Saw phone after current bridge state said it has been offline, switching state back to connected")
go user.sendBridgeState(BridgeState{StateEvent: StateConnected}) go user.BridgeState.Send(bridge.State{StateEvent: bridge.StateConnected})
} else { } else {
user.log.Debugfln("Saw phone after current bridge state said it has been offline, not sending new bridge state (prev: %s, connected: %t)", user.GetPrevBridgeState().Error, user.IsConnected()) user.log.Debugfln("Saw phone after current bridge state said it has been offline, not sending new bridge state (prev: %s, connected: %t)", user.BridgeState.GetPrev().Error, user.IsConnected())
} }
} }
user.PhoneLastSeen = ts user.PhoneLastSeen = ts
@ -653,19 +648,19 @@ func (user *User) HandleEvent(event interface{}) {
} }
case *events.OfflineSyncPreview: case *events.OfflineSyncPreview:
user.log.Infofln("Server says it's going to send %d messages and %d receipts that were missed during downtime", v.Messages, v.Receipts) user.log.Infofln("Server says it's going to send %d messages and %d receipts that were missed during downtime", v.Messages, v.Receipts)
go user.sendBridgeState(BridgeState{ go user.BridgeState.Send(bridge.State{
StateEvent: StateBackfilling, StateEvent: bridge.StateBackfilling,
Message: fmt.Sprintf("backfilling %d messages and %d receipts", v.Messages, v.Receipts), Message: fmt.Sprintf("backfilling %d messages and %d receipts", v.Messages, v.Receipts),
}) })
case *events.OfflineSyncCompleted: case *events.OfflineSyncCompleted:
if !user.PhoneRecentlySeen(true) { if !user.PhoneRecentlySeen(true) {
user.log.Infofln("Offline sync completed, but phone last seen date is still %s - sending phone offline bridge status", user.PhoneLastSeen) user.log.Infofln("Offline sync completed, but phone last seen date is still %s - sending phone offline bridge status", user.PhoneLastSeen)
go user.sendBridgeState(BridgeState{StateEvent: StateTransientDisconnect, Error: WAPhoneOffline}) go user.BridgeState.Send(bridge.State{StateEvent: bridge.StateTransientDisconnect, Error: WAPhoneOffline})
} else { } else {
if user.GetPrevBridgeState().StateEvent == StateBackfilling { if user.BridgeState.GetPrev().StateEvent == bridge.StateBackfilling {
user.log.Infoln("Offline sync completed") user.log.Infoln("Offline sync completed")
} }
go user.sendBridgeState(BridgeState{StateEvent: StateConnected}) go user.BridgeState.Send(bridge.State{StateEvent: bridge.StateConnected})
} }
case *events.AppStateSyncComplete: case *events.AppStateSyncComplete:
if len(user.Client.Store.PushName) > 0 && v.Name == appstate.WAPatchCriticalBlock { if len(user.Client.Store.PushName) > 0 && v.Name == appstate.WAPatchCriticalBlock {
@ -703,23 +698,23 @@ func (user *User) HandleEvent(event interface{}) {
} else { } else {
message = "Unknown stream error" message = "Unknown stream error"
} }
go user.sendBridgeState(BridgeState{StateEvent: StateUnknownError, Message: message}) go user.BridgeState.Send(bridge.State{StateEvent: bridge.StateUnknownError, Message: message})
user.bridge.Metrics.TrackConnectionState(user.JID, false) user.bridge.Metrics.TrackConnectionState(user.JID, false)
case *events.ConnectFailure: case *events.ConnectFailure:
go user.sendBridgeState(BridgeState{StateEvent: StateUnknownError, Message: fmt.Sprintf("Unknown connection failure: %s", v.Reason)}) go user.BridgeState.Send(bridge.State{StateEvent: bridge.StateUnknownError, Message: fmt.Sprintf("Unknown connection failure: %s", v.Reason)})
user.bridge.Metrics.TrackConnectionState(user.JID, false) user.bridge.Metrics.TrackConnectionState(user.JID, false)
case *events.ClientOutdated: case *events.ClientOutdated:
user.log.Errorfln("Got a client outdated connect failure. The bridge is likely out of date, please update immediately.") user.log.Errorfln("Got a client outdated connect failure. The bridge is likely out of date, please update immediately.")
go user.sendBridgeState(BridgeState{StateEvent: StateUnknownError, Message: "Connect failure: 405 client outdated"}) go user.BridgeState.Send(bridge.State{StateEvent: bridge.StateUnknownError, Message: "Connect failure: 405 client outdated"})
user.bridge.Metrics.TrackConnectionState(user.JID, false) user.bridge.Metrics.TrackConnectionState(user.JID, false)
case *events.TemporaryBan: case *events.TemporaryBan:
go user.sendBridgeState(BridgeState{StateEvent: StateBadCredentials, Message: v.String()}) go user.BridgeState.Send(bridge.State{StateEvent: bridge.StateBadCredentials, Message: v.String()})
user.bridge.Metrics.TrackConnectionState(user.JID, false) user.bridge.Metrics.TrackConnectionState(user.JID, false)
case *events.Disconnected: case *events.Disconnected:
// Don't send the normal transient disconnect state if we're already in a different transient disconnect state. // Don't send the normal transient disconnect state if we're already in a different transient disconnect state.
// TODO remove this if/when the phone offline state is moved to a sub-state of CONNECTED // TODO remove this if/when the phone offline state is moved to a sub-state of CONNECTED
if user.GetPrevBridgeState().Error != WAPhoneOffline && user.PhoneRecentlySeen(false) { if user.BridgeState.GetPrev().Error != WAPhoneOffline && user.PhoneRecentlySeen(false) {
go user.sendBridgeState(BridgeState{StateEvent: StateTransientDisconnect, Message: "Disconnected from WhatsApp. Trying to reconnect."}) go user.BridgeState.Send(bridge.State{StateEvent: bridge.StateTransientDisconnect, Message: "Disconnected from WhatsApp. Trying to reconnect."})
} }
user.bridge.Metrics.TrackConnectionState(user.JID, false) user.bridge.Metrics.TrackConnectionState(user.JID, false)
case *events.Contact: case *events.Contact:
@ -802,10 +797,10 @@ func (user *User) HandleEvent(event interface{}) {
case *events.AppState: case *events.AppState:
// Ignore // Ignore
case *events.KeepAliveTimeout: case *events.KeepAliveTimeout:
go user.sendBridgeState(BridgeState{StateEvent: StateTransientDisconnect, Error: WAKeepaliveTimeout}) go user.BridgeState.Send(bridge.State{StateEvent: bridge.StateTransientDisconnect, Error: WAKeepaliveTimeout})
case *events.KeepAliveRestored: case *events.KeepAliveRestored:
user.log.Infof("Keepalive restored after timeouts, sending connected event") user.log.Infof("Keepalive restored after timeouts, sending connected event")
go user.sendBridgeState(BridgeState{StateEvent: StateConnected}) go user.BridgeState.Send(bridge.State{StateEvent: bridge.StateConnected})
case *events.MarkChatAsRead: case *events.MarkChatAsRead:
if user.bridge.Config.Bridge.SyncManualMarkedUnread { if user.bridge.Config.Bridge.SyncManualMarkedUnread {
user.markUnread(user.GetPortalByJID(v.JID), !v.Action.GetRead()) user.markUnread(user.GetPortalByJID(v.JID), !v.Action.GetRead())
@ -988,7 +983,7 @@ func (user *User) handleLoggedOut(onConnect bool, reason events.ConnectFailureRe
} else if reason == events.ConnectFailureMainDeviceGone { } else if reason == events.ConnectFailureMainDeviceGone {
errorCode = WAMainDeviceGone errorCode = WAMainDeviceGone
} }
user.removeFromJIDMap(BridgeState{StateEvent: StateBadCredentials, Error: errorCode}) user.removeFromJIDMap(bridge.State{StateEvent: bridge.StateBadCredentials, Error: errorCode})
user.DeleteConnection() user.DeleteConnection()
user.Session = nil user.Session = nil
user.JID = types.EmptyJID user.JID = types.EmptyJID