diff --git a/bridgestate.go b/bridgestate.go index 2b66a8e..c269c13 100644 --- a/bridgestate.go +++ b/bridgestate.go @@ -17,211 +17,49 @@ package main import ( - "bytes" - "context" - "encoding/json" "fmt" - "io" "net/http" - "runtime/debug" - "time" + "maunium.net/go/mautrix/bridge" "maunium.net/go/mautrix/id" ) -type BridgeStateEvent string - const ( - StateUnconfigured BridgeStateEvent = "UNCONFIGURED" - StateRunning BridgeStateEvent = "RUNNING" - StateConnecting BridgeStateEvent = "CONNECTING" - StateBackfilling BridgeStateEvent = "BACKFILLING" - StateConnected BridgeStateEvent = "CONNECTED" - StateTransientDisconnect BridgeStateEvent = "TRANSIENT_DISCONNECT" - StateBadCredentials BridgeStateEvent = "BAD_CREDENTIALS" - StateUnknownError BridgeStateEvent = "UNKNOWN_ERROR" - StateLoggedOut BridgeStateEvent = "LOGGED_OUT" + WALoggedOut bridge.StateErrorCode = "wa-logged-out" + WAMainDeviceGone bridge.StateErrorCode = "wa-main-device-gone" + WAUnknownLogout bridge.StateErrorCode = "wa-unknown-logout" + WANotConnected bridge.StateErrorCode = "wa-not-connected" + WAConnecting bridge.StateErrorCode = "wa-connecting" + WAKeepaliveTimeout bridge.StateErrorCode = "wa-keepalive-timeout" + WAPhoneOffline bridge.StateErrorCode = "wa-phone-offline" + WAConnectionFailed bridge.StateErrorCode = "wa-connection-failed" ) -type BridgeErrorCode string - -const ( - WALoggedOut BridgeErrorCode = "wa-logged-out" - WAMainDeviceGone BridgeErrorCode = "wa-main-device-gone" - WAUnknownLogout BridgeErrorCode = "wa-unknown-logout" - WANotConnected BridgeErrorCode = "wa-not-connected" - WAConnecting BridgeErrorCode = "wa-connecting" - WAKeepaliveTimeout BridgeErrorCode = "wa-keepalive-timeout" - WAPhoneOffline BridgeErrorCode = "wa-phone-offline" - WAConnectionFailed BridgeErrorCode = "wa-connection-failed" -) - -var bridgeHumanErrors = map[BridgeErrorCode]string{ - WALoggedOut: "You were logged out from another device. Relogin to continue using the bridge.", - WAMainDeviceGone: "Your phone was logged out from WhatsApp. Relogin to continue using the bridge.", - WAUnknownLogout: "You were logged out for an unknown reason. Relogin to continue using the bridge.", - WANotConnected: "You're not connected to WhatsApp", - WAConnecting: "Reconnecting to WhatsApp...", - WAKeepaliveTimeout: "The WhatsApp web servers are not responding. The bridge will try to reconnect.", - WAPhoneOffline: "Your phone hasn't been seen in over 12 days. The bridge is currently connected, but will get disconnected if you don't open the app soon.", - WAConnectionFailed: "Connecting to the WhatsApp web servers failed.", +func init() { + bridge.StateHumanErrors.Update(bridge.StateErrorMap{ + WALoggedOut: "You were logged out from another device. Relogin to continue using the bridge.", + WAMainDeviceGone: "Your phone was logged out from WhatsApp. Relogin to continue using the bridge.", + WAUnknownLogout: "You were logged out for an unknown reason. Relogin to continue using the bridge.", + WANotConnected: "You're not connected to WhatsApp", + WAConnecting: "Reconnecting to WhatsApp...", + WAKeepaliveTimeout: "The WhatsApp web servers are not responding. The bridge will try to reconnect.", + WAPhoneOffline: "Your phone hasn't been seen in over 12 days. The bridge is currently connected, but will get disconnected if you don't open the app soon.", + WAConnectionFailed: "Connecting to the WhatsApp web servers failed.", + }) } -type BridgeState struct { - StateEvent BridgeStateEvent `json:"state_event"` - Timestamp int64 `json:"timestamp"` - TTL int `json:"ttl"` - - Source string `json:"source,omitempty"` - Error BridgeErrorCode `json:"error,omitempty"` - Message string `json:"message,omitempty"` - - UserID id.UserID `json:"user_id,omitempty"` - RemoteID string `json:"remote_id,omitempty"` - RemoteName string `json:"remote_name,omitempty"` - - Reason string `json:"reason,omitempty"` - Info map[string]interface{} `json:"info,omitempty"` +func (user *User) GetRemoteID() string { + if user == nil || user.JID.IsEmpty() { + return "" + } + return fmt.Sprintf("%s_a%d_d%d", user.JID.User, user.JID.Agent, user.JID.Device) } -type GlobalBridgeState struct { - RemoteStates map[string]BridgeState `json:"remoteState"` - BridgeState BridgeState `json:"bridgeState"` -} - -func (pong BridgeState) fill(user *User) BridgeState { - if user != nil { - pong.UserID = user.MXID - pong.RemoteID = fmt.Sprintf("%s_a%d_d%d", user.JID.User, user.JID.Agent, user.JID.Device) - pong.RemoteName = fmt.Sprintf("+%s", user.JID.User) +func (user *User) GetRemoteName() string { + if user == nil || user.JID.IsEmpty() { + return "" } - - pong.Timestamp = time.Now().Unix() - pong.Source = "bridge" - if len(pong.Error) > 0 { - pong.TTL = 60 - pong.Message = bridgeHumanErrors[pong.Error] - } else { - pong.TTL = 240 - } - return pong -} - -func (pong *BridgeState) shouldDeduplicate(newPong *BridgeState) bool { - if pong == nil || pong.StateEvent != newPong.StateEvent || pong.Error != newPong.Error { - return false - } - return pong.Timestamp+int64(pong.TTL/5) > time.Now().Unix() -} - -func (br *WABridge) sendBridgeState(ctx context.Context, state *BridgeState) error { - var body bytes.Buffer - if err := json.NewEncoder(&body).Encode(&state); err != nil { - return fmt.Errorf("failed to encode bridge state JSON: %w", err) - } - - req, err := http.NewRequestWithContext(ctx, http.MethodPost, br.Config.Homeserver.StatusEndpoint, &body) - if err != nil { - return fmt.Errorf("failed to prepare request: %w", err) - } - - req.Header.Set("Authorization", "Bearer "+br.Config.AppService.ASToken) - req.Header.Set("Content-Type", "application/json") - - resp, err := http.DefaultClient.Do(req) - if err != nil { - return fmt.Errorf("failed to send request: %w", err) - } - defer resp.Body.Close() - if resp.StatusCode < 200 || resp.StatusCode > 299 { - respBody, _ := io.ReadAll(resp.Body) - if respBody != nil { - respBody = bytes.ReplaceAll(respBody, []byte("\n"), []byte("\\n")) - } - return fmt.Errorf("unexpected status code %d sending bridge state update: %s", resp.StatusCode, respBody) - } - return nil -} - -func (br *WABridge) sendGlobalBridgeState(state BridgeState) { - if len(br.Config.Homeserver.StatusEndpoint) == 0 { - return - } - - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - defer cancel() - if err := br.sendBridgeState(ctx, &state); err != nil { - br.Log.Warnln("Failed to update global bridge state:", err) - } else { - br.Log.Debugfln("Sent new global bridge state %+v", state) - } -} - -func (user *User) bridgeStateLoop() { - defer func() { - err := recover() - if err != nil { - user.log.Errorfln("Bridge state loop panicked: %v\n%s", err, debug.Stack()) - } - }() - for state := range user.bridgeStateQueue { - user.immediateSendBridgeState(state) - } -} - -func (user *User) immediateSendBridgeState(state BridgeState) { - retryIn := 2 - for { - if user.prevBridgeStatus != nil && user.prevBridgeStatus.shouldDeduplicate(&state) { - user.log.Debugfln("Not sending bridge state %s as it's a duplicate", state.StateEvent) - return - } - - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - err := user.bridge.sendBridgeState(ctx, &state) - cancel() - - if err != nil { - user.log.Warnfln("Failed to update bridge state: %v, retrying in %d seconds", err, retryIn) - time.Sleep(time.Duration(retryIn) * time.Second) - retryIn *= 2 - if retryIn > 64 { - retryIn = 64 - } - } else { - user.prevBridgeStatus = &state - user.log.Debugfln("Sent new bridge state %+v", state) - return - } - } -} - -func (user *User) sendBridgeState(state BridgeState) { - if len(user.bridge.Config.Homeserver.StatusEndpoint) == 0 { - return - } - - state = state.fill(user) - - if len(user.bridgeStateQueue) >= 8 { - user.log.Warnln("Bridge state queue is nearly full, discarding an item") - select { - case <-user.bridgeStateQueue: - default: - } - } - select { - case user.bridgeStateQueue <- state: - default: - user.log.Errorfln("Bridge state queue is full, dropped new state") - } -} - -func (user *User) GetPrevBridgeState() BridgeState { - if user.prevBridgeStatus != nil { - return *user.prevBridgeStatus - } - return BridgeState{} + return fmt.Sprintf("+%s", user.JID.User) } func (prov *ProvisioningAPI) BridgeStatePing(w http.ResponseWriter, r *http.Request) { @@ -230,32 +68,32 @@ func (prov *ProvisioningAPI) BridgeStatePing(w http.ResponseWriter, r *http.Requ } userID := r.URL.Query().Get("user_id") user := prov.bridge.GetUserByMXID(id.UserID(userID)) - var global BridgeState - global.StateEvent = StateRunning - var remote BridgeState + var global bridge.State + global.StateEvent = bridge.StateRunning + var remote bridge.State if user.IsConnected() { if user.Client.IsLoggedIn() { - remote.StateEvent = StateConnected + remote.StateEvent = bridge.StateConnected } else if user.Session != nil { - remote.StateEvent = StateConnecting + remote.StateEvent = bridge.StateConnecting remote.Error = WAConnecting } // else: unconfigured } else if user.Session != nil { - remote.StateEvent = StateBadCredentials + remote.StateEvent = bridge.StateBadCredentials remote.Error = WANotConnected } // else: unconfigured - global = global.fill(nil) - resp := GlobalBridgeState{ + global = global.Fill(nil) + resp := bridge.GlobalState{ BridgeState: global, - RemoteStates: map[string]BridgeState{}, + RemoteStates: map[string]bridge.State{}, } if len(remote.StateEvent) > 0 { - remote = remote.fill(user) + remote = remote.Fill(user) resp.RemoteStates[remote.RemoteID] = remote } user.log.Debugfln("Responding bridge state in bridge status endpoint: %+v", resp) jsonResponse(w, http.StatusOK, &resp) if len(resp.RemoteStates) > 0 { - user.prevBridgeStatus = &remote + user.BridgeState.SetPrev(remote) } } diff --git a/commands.go b/commands.go index 12535e4..1946be9 100644 --- a/commands.go +++ b/commands.go @@ -518,7 +518,7 @@ func fnLogout(ce *WrappedCommandEvent) { return } ce.User.Session = nil - ce.User.removeFromJIDMap(BridgeState{StateEvent: StateLoggedOut}) + ce.User.removeFromJIDMap(bridge.State{StateEvent: bridge.StateLoggedOut}) ce.User.DeleteConnection() ce.User.DeleteSession() ce.Reply("Logged out successfully.") @@ -575,7 +575,7 @@ func fnDeleteSession(ce *WrappedCommandEvent) { ce.Reply("Nothing to purge: no session information stored and no active connection.") return } - ce.User.removeFromJIDMap(BridgeState{StateEvent: StateLoggedOut}) + ce.User.removeFromJIDMap(bridge.State{StateEvent: bridge.StateLoggedOut}) ce.User.DeleteConnection() ce.User.DeleteSession() ce.Reply("Session information purged") @@ -600,7 +600,7 @@ func fnReconnect(ce *WrappedCommandEvent) { } } else { ce.User.DeleteConnection() - ce.User.sendBridgeState(BridgeState{StateEvent: StateTransientDisconnect, Error: WANotConnected}) + ce.User.BridgeState.Send(bridge.State{StateEvent: bridge.StateTransientDisconnect, Error: WANotConnected}) ce.User.Connect() ce.Reply("Restarted connection to WhatsApp") } @@ -622,7 +622,7 @@ func fnDisconnect(ce *WrappedCommandEvent) { } ce.User.DeleteConnection() ce.Reply("Successfully disconnected. Use the `reconnect` command to reconnect.") - ce.User.sendBridgeState(BridgeState{StateEvent: StateBadCredentials, Error: WANotConnected}) + ce.User.BridgeState.Send(bridge.State{StateEvent: bridge.StateBadCredentials, Error: WANotConnected}) } var cmdPing = &commands.FullHandler{ diff --git a/go.mod b/go.mod index eaf2846..cfa7789 100644 --- a/go.mod +++ b/go.mod @@ -15,7 +15,7 @@ require ( golang.org/x/net v0.0.0-20220513224357-95641704303c google.golang.org/protobuf v1.28.0 maunium.net/go/maulogger/v2 v2.3.2 - maunium.net/go/mautrix v0.11.1-0.20220529123139-5bc36b2978c1 + maunium.net/go/mautrix v0.11.1-0.20220530212627-b15517460fdb ) require ( diff --git a/go.sum b/go.sum index f7cffcf..7531b36 100644 --- a/go.sum +++ b/go.sum @@ -107,5 +107,5 @@ maunium.net/go/mauflag v1.0.0 h1:YiaRc0tEI3toYtJMRIfjP+jklH45uDHtT80nUamyD4M= maunium.net/go/mauflag v1.0.0/go.mod h1:nLivPOpTpHnpzEh8jEdSL9UqO9+/KBJFmNRlwKfkPeA= maunium.net/go/maulogger/v2 v2.3.2 h1:1XmIYmMd3PoQfp9J+PaHhpt80zpfmMqaShzUTC7FwY0= maunium.net/go/maulogger/v2 v2.3.2/go.mod h1:TYWy7wKwz/tIXTpsx8G3mZseIRiC5DoMxSZazOHy68A= -maunium.net/go/mautrix v0.11.1-0.20220529123139-5bc36b2978c1 h1:HNntVQh0XVyWDAsSQA/0Rk2++1cGOzmm7tH8xILSsak= -maunium.net/go/mautrix v0.11.1-0.20220529123139-5bc36b2978c1/go.mod h1:CiKpMhAx5QZFHK03jpWb0iKI3sGU8x6+LfsOjDrcO8I= +maunium.net/go/mautrix v0.11.1-0.20220530212627-b15517460fdb h1:MTY4bW0yhg8bHszMSNymTrHvIRdiDjcnQKC8qSbt5BE= +maunium.net/go/mautrix v0.11.1-0.20220530212627-b15517460fdb/go.mod h1:CiKpMhAx5QZFHK03jpWb0iKI3sGU8x6+LfsOjDrcO8I= diff --git a/main.go b/main.go index 8e19cf0..7999bb2 100644 --- a/main.go +++ b/main.go @@ -215,7 +215,7 @@ func (br *WABridge) StartUsers() { go user.Connect() } if !foundAnySessions { - br.sendGlobalBridgeState(BridgeState{StateEvent: StateUnconfigured}.fill(nil)) + br.SendGlobalBridgeState(bridge.State{StateEvent: bridge.StateUnconfigured}.Fill(nil)) } br.Log.Debugln("Starting custom puppets") for _, loopuppet := range br.GetAllPuppetsWithCustomMXID() { diff --git a/portal.go b/portal.go index 0d78f9f..e9fb727 100644 --- a/portal.go +++ b/portal.go @@ -40,13 +40,13 @@ import ( "golang.org/x/image/draw" "golang.org/x/image/webp" "google.golang.org/protobuf/proto" - "maunium.net/go/mautrix/bridge/bridgeconfig" log "maunium.net/go/maulogger/v2" "maunium.net/go/mautrix" "maunium.net/go/mautrix/appservice" "maunium.net/go/mautrix/bridge" + "maunium.net/go/mautrix/bridge/bridgeconfig" "maunium.net/go/mautrix/crypto/attachment" "maunium.net/go/mautrix/event" "maunium.net/go/mautrix/format" @@ -2911,16 +2911,14 @@ func (portal *Portal) HandleMatrixMessage(sender *User, evt *event.Event) { if err != nil { portal.log.Errorfln("Error sending message: %v", err) portal.sendErrorMessage(err.Error(), true) - status := appservice.StatusPermFailure + status := bridge.MsgStatusPermFailure if errors.Is(err, whatsmeow.ErrBroadcastListUnsupported) { - status = appservice.StatusUnsupported + status = bridge.MsgStatusUnsupported } - checkpoint := appservice.NewMessageSendCheckpoint(evt, appservice.StepRemote, status, 0) - checkpoint.Info = err.Error() - go checkpoint.Send(portal.bridge.AS) + portal.bridge.SendMessageCheckpoint(evt, bridge.MsgStepRemote, err, status, 0) } else { portal.log.Debugfln("Handled Matrix event %s", evt.ID) - portal.bridge.AS.SendMessageSendCheckpoint(evt, appservice.StepRemote, 0) + portal.bridge.SendMessageSuccessCheckpoint(evt, bridge.MsgStepRemote, 0) portal.sendDeliveryReceipt(evt.ID) dbMsg.MarkSent(ts) } @@ -2946,10 +2944,10 @@ func (portal *Portal) HandleMatrixReaction(sender *User, evt *event.Event) { err := portal.handleMatrixReaction(sender, evt) if err != nil { portal.log.Errorfln("Error sending reaction %s: %v", evt.ID, err) - portal.bridge.AS.SendErrorMessageSendCheckpoint(evt, appservice.StepRemote, err, true, 0) + portal.bridge.SendMessageErrorCheckpoint(evt, bridge.MsgStepRemote, err, true, 0) } else { portal.log.Debugfln("Handled Matrix reaction %s", evt.ID) - portal.bridge.AS.SendMessageSendCheckpoint(evt, appservice.StepRemote, 0) + portal.bridge.SendMessageSuccessCheckpoint(evt, bridge.MsgStepRemote, 0) portal.sendDeliveryReceipt(evt.ID) } } @@ -3038,15 +3036,15 @@ func (portal *Portal) HandleMatrixRedaction(sender *User, evt *event.Event) { msg := portal.bridge.DB.Message.GetByMXID(evt.Redacts) if msg == nil { portal.log.Debugfln("Ignoring redaction %s of unknown event by %s", evt.ID, senderLogIdentifier) - portal.bridge.AS.SendErrorMessageSendCheckpoint(evt, appservice.StepRemote, errors.New("target not found"), true, 0) + portal.bridge.SendMessageErrorCheckpoint(evt, bridge.MsgStepRemote, errors.New("target not found"), true, 0) return } else if msg.IsFakeJID() { portal.log.Debugfln("Ignoring redaction %s of fake event by %s", evt.ID, senderLogIdentifier) - portal.bridge.AS.SendErrorMessageSendCheckpoint(evt, appservice.StepRemote, errors.New("target is a fake event"), true, 0) + portal.bridge.SendMessageErrorCheckpoint(evt, bridge.MsgStepRemote, errors.New("target is a fake event"), true, 0) return } else if msg.Sender.User != sender.JID.User { portal.log.Debugfln("Ignoring redaction %s of %s/%s by %s: message was sent by someone else (%s, not %s)", evt.ID, msg.MXID, msg.JID, senderLogIdentifier, msg.Sender, sender.JID) - portal.bridge.AS.SendErrorMessageSendCheckpoint(evt, appservice.StepRemote, errors.New("message was sent by someone else"), true, 0) + portal.bridge.SendMessageErrorCheckpoint(evt, bridge.MsgStepRemote, errors.New("message was sent by someone else"), true, 0) return } @@ -3054,11 +3052,11 @@ func (portal *Portal) HandleMatrixRedaction(sender *User, evt *event.Event) { if msg.Type == database.MsgReaction { if reaction := portal.bridge.DB.Reaction.GetByMXID(evt.Redacts); reaction == nil { portal.log.Debugfln("Ignoring redaction of reaction %s: reaction database entry not found", evt.ID) - portal.bridge.AS.SendErrorMessageSendCheckpoint(evt, appservice.StepRemote, errors.New("reaction database entry not found"), true, 0) + portal.bridge.SendMessageErrorCheckpoint(evt, bridge.MsgStepRemote, errors.New("reaction database entry not found"), true, 0) return } else if reactionTarget := reaction.GetTarget(); reactionTarget == nil { portal.log.Debugfln("Ignoring redaction of reaction %s: reaction target message not found", evt.ID) - portal.bridge.AS.SendErrorMessageSendCheckpoint(evt, appservice.StepRemote, errors.New("reaction target message not found"), true, 0) + portal.bridge.SendMessageErrorCheckpoint(evt, bridge.MsgStepRemote, errors.New("reaction target message not found"), true, 0) return } else { portal.log.Debugfln("Sending redaction reaction %s of %s/%s to WhatsApp", evt.ID, msg.MXID, msg.JID) @@ -3070,10 +3068,10 @@ func (portal *Portal) HandleMatrixRedaction(sender *User, evt *event.Event) { } if err != nil { portal.log.Errorfln("Error handling Matrix redaction %s: %v", evt.ID, err) - portal.bridge.AS.SendErrorMessageSendCheckpoint(evt, appservice.StepRemote, err, true, 0) + portal.bridge.SendMessageErrorCheckpoint(evt, bridge.MsgStepRemote, err, true, 0) } else { portal.log.Debugfln("Handled Matrix redaction %s of %s", evt.ID, evt.Redacts) - portal.bridge.AS.SendMessageSendCheckpoint(evt, appservice.StepRemote, 0) + portal.bridge.SendMessageSuccessCheckpoint(evt, bridge.MsgStepRemote, 0) portal.sendDeliveryReceipt(evt.ID) } } diff --git a/provisioning.go b/provisioning.go index fdcb143..93fe394 100644 --- a/provisioning.go +++ b/provisioning.go @@ -39,6 +39,7 @@ import ( log "maunium.net/go/maulogger/v2" + "maunium.net/go/mautrix/bridge" "maunium.net/go/mautrix/id" ) @@ -149,7 +150,7 @@ func (prov *ProvisioningAPI) DeleteSession(w http.ResponseWriter, r *http.Reques user.DeleteConnection() user.DeleteSession() jsonResponse(w, http.StatusOK, Response{true, "Session information purged"}) - user.removeFromJIDMap(BridgeState{StateEvent: StateLoggedOut}) + user.removeFromJIDMap(bridge.State{StateEvent: bridge.StateLoggedOut}) } func (prov *ProvisioningAPI) Disconnect(w http.ResponseWriter, r *http.Request) { @@ -163,7 +164,7 @@ func (prov *ProvisioningAPI) Disconnect(w http.ResponseWriter, r *http.Request) } user.DeleteConnection() jsonResponse(w, http.StatusOK, Response{true, "Disconnected from WhatsApp"}) - user.sendBridgeState(BridgeState{StateEvent: StateBadCredentials, Error: WANotConnected}) + user.BridgeState.Send(bridge.State{StateEvent: bridge.StateBadCredentials, Error: WANotConnected}) } func (prov *ProvisioningAPI) Reconnect(w http.ResponseWriter, r *http.Request) { @@ -180,7 +181,7 @@ func (prov *ProvisioningAPI) Reconnect(w http.ResponseWriter, r *http.Request) { } } else { user.DeleteConnection() - user.sendBridgeState(BridgeState{StateEvent: StateTransientDisconnect, Error: WANotConnected}) + user.BridgeState.Send(bridge.State{StateEvent: bridge.StateTransientDisconnect, Error: WANotConnected}) user.Connect() jsonResponse(w, http.StatusAccepted, Response{true, "Restarted connection to WhatsApp"}) } @@ -575,7 +576,7 @@ func (prov *ProvisioningAPI) Logout(w http.ResponseWriter, r *http.Request) { } user.bridge.Metrics.TrackConnectionState(user.JID, false) - user.removeFromJIDMap(BridgeState{StateEvent: StateLoggedOut}) + user.removeFromJIDMap(bridge.State{StateEvent: bridge.StateLoggedOut}) user.DeleteSession() jsonResponse(w, http.StatusOK, Response{true, "Logged out successfully."}) } diff --git a/user.go b/user.go index 95d811c..2ce53d1 100644 --- a/user.go +++ b/user.go @@ -32,11 +32,11 @@ import ( "time" log "maunium.net/go/maulogger/v2" - "maunium.net/go/mautrix/bridge/bridgeconfig" "maunium.net/go/mautrix" "maunium.net/go/mautrix/appservice" "maunium.net/go/mautrix/bridge" + "maunium.net/go/mautrix/bridge/bridgeconfig" "maunium.net/go/mautrix/event" "maunium.net/go/mautrix/format" "maunium.net/go/mautrix/id" @@ -70,9 +70,8 @@ type User struct { spaceCreateLock sync.Mutex connLock sync.Mutex - historySyncs chan *events.HistorySync - prevBridgeStatus *BridgeState - lastPresence types.Presence + historySyncs chan *events.HistorySync + lastPresence types.Presence historySyncLoopsStarted bool spaceMembershipChecked bool @@ -82,9 +81,8 @@ type User struct { groupListCacheLock sync.Mutex groupListCacheTime time.Time - bridgeStateQueue chan BridgeState - BackfillQueue *BackfillQueue + BridgeState *bridge.BridgeStateQueue } func (br *WABridge) getUserByMXID(userID id.UserID, onlyIfExists bool) *User { @@ -153,7 +151,7 @@ func (user *User) addToJIDMap() { user.bridge.usersLock.Unlock() } -func (user *User) removeFromJIDMap(state BridgeState) { +func (user *User) removeFromJIDMap(state bridge.State) { user.bridge.usersLock.Lock() jidUser, ok := user.bridge.usersByUsername[user.JID.User] if ok && user == jidUser { @@ -161,7 +159,7 @@ func (user *User) removeFromJIDMap(state BridgeState) { } user.bridge.usersLock.Unlock() user.bridge.Metrics.TrackLoginState(user.JID, false) - user.sendBridgeState(state) + user.BridgeState.Send(state) } func (br *WABridge) GetAllUsers() []*User { @@ -224,10 +222,7 @@ func (br *WABridge) NewUser(dbUser *database.User) *User { user.RelayWhitelisted = user.PermissionLevel >= bridgeconfig.PermissionLevelRelay user.Whitelisted = user.PermissionLevel >= bridgeconfig.PermissionLevelUser user.Admin = user.PermissionLevel >= bridgeconfig.PermissionLevelAdmin - if len(user.bridge.Config.Homeserver.StatusEndpoint) > 0 { - user.bridgeStateQueue = make(chan BridgeState, 10) - go user.bridgeStateLoop() - } + user.BridgeState = br.NewBridgeStateQueue(user, user.log) return user } @@ -424,13 +419,13 @@ func (user *User) Connect() bool { return false } user.log.Debugln("Connecting to WhatsApp") - user.sendBridgeState(BridgeState{StateEvent: StateConnecting, Error: WAConnecting}) + user.BridgeState.Send(bridge.State{StateEvent: bridge.StateConnecting, Error: WAConnecting}) user.createClient(user.Session) err := user.Client.Connect() if err != nil { user.log.Warnln("Error connecting to WhatsApp:", err) - user.sendBridgeState(BridgeState{ - StateEvent: StateUnknownError, + user.BridgeState.Send(bridge.State{ + StateEvent: bridge.StateUnknownError, Error: WAConnectionFailed, Info: map[string]interface{}{ "go_error": err.Error(), @@ -597,11 +592,11 @@ func (user *User) phoneSeen(ts time.Time) { // so don't spam the database with an update every time there's an event. return } else if !user.PhoneRecentlySeen(false) { - if user.GetPrevBridgeState().Error == WAPhoneOffline && user.IsConnected() { + if user.BridgeState.GetPrev().Error == WAPhoneOffline && user.IsConnected() { user.log.Debugfln("Saw phone after current bridge state said it has been offline, switching state back to connected") - go user.sendBridgeState(BridgeState{StateEvent: StateConnected}) + go user.BridgeState.Send(bridge.State{StateEvent: bridge.StateConnected}) } else { - user.log.Debugfln("Saw phone after current bridge state said it has been offline, not sending new bridge state (prev: %s, connected: %t)", user.GetPrevBridgeState().Error, user.IsConnected()) + user.log.Debugfln("Saw phone after current bridge state said it has been offline, not sending new bridge state (prev: %s, connected: %t)", user.BridgeState.GetPrev().Error, user.IsConnected()) } } user.PhoneLastSeen = ts @@ -653,19 +648,19 @@ func (user *User) HandleEvent(event interface{}) { } case *events.OfflineSyncPreview: user.log.Infofln("Server says it's going to send %d messages and %d receipts that were missed during downtime", v.Messages, v.Receipts) - go user.sendBridgeState(BridgeState{ - StateEvent: StateBackfilling, + go user.BridgeState.Send(bridge.State{ + StateEvent: bridge.StateBackfilling, Message: fmt.Sprintf("backfilling %d messages and %d receipts", v.Messages, v.Receipts), }) case *events.OfflineSyncCompleted: if !user.PhoneRecentlySeen(true) { user.log.Infofln("Offline sync completed, but phone last seen date is still %s - sending phone offline bridge status", user.PhoneLastSeen) - go user.sendBridgeState(BridgeState{StateEvent: StateTransientDisconnect, Error: WAPhoneOffline}) + go user.BridgeState.Send(bridge.State{StateEvent: bridge.StateTransientDisconnect, Error: WAPhoneOffline}) } else { - if user.GetPrevBridgeState().StateEvent == StateBackfilling { + if user.BridgeState.GetPrev().StateEvent == bridge.StateBackfilling { user.log.Infoln("Offline sync completed") } - go user.sendBridgeState(BridgeState{StateEvent: StateConnected}) + go user.BridgeState.Send(bridge.State{StateEvent: bridge.StateConnected}) } case *events.AppStateSyncComplete: if len(user.Client.Store.PushName) > 0 && v.Name == appstate.WAPatchCriticalBlock { @@ -703,23 +698,23 @@ func (user *User) HandleEvent(event interface{}) { } else { message = "Unknown stream error" } - go user.sendBridgeState(BridgeState{StateEvent: StateUnknownError, Message: message}) + go user.BridgeState.Send(bridge.State{StateEvent: bridge.StateUnknownError, Message: message}) user.bridge.Metrics.TrackConnectionState(user.JID, false) case *events.ConnectFailure: - go user.sendBridgeState(BridgeState{StateEvent: StateUnknownError, Message: fmt.Sprintf("Unknown connection failure: %s", v.Reason)}) + go user.BridgeState.Send(bridge.State{StateEvent: bridge.StateUnknownError, Message: fmt.Sprintf("Unknown connection failure: %s", v.Reason)}) user.bridge.Metrics.TrackConnectionState(user.JID, false) case *events.ClientOutdated: user.log.Errorfln("Got a client outdated connect failure. The bridge is likely out of date, please update immediately.") - go user.sendBridgeState(BridgeState{StateEvent: StateUnknownError, Message: "Connect failure: 405 client outdated"}) + go user.BridgeState.Send(bridge.State{StateEvent: bridge.StateUnknownError, Message: "Connect failure: 405 client outdated"}) user.bridge.Metrics.TrackConnectionState(user.JID, false) case *events.TemporaryBan: - go user.sendBridgeState(BridgeState{StateEvent: StateBadCredentials, Message: v.String()}) + go user.BridgeState.Send(bridge.State{StateEvent: bridge.StateBadCredentials, Message: v.String()}) user.bridge.Metrics.TrackConnectionState(user.JID, false) case *events.Disconnected: // Don't send the normal transient disconnect state if we're already in a different transient disconnect state. // TODO remove this if/when the phone offline state is moved to a sub-state of CONNECTED - if user.GetPrevBridgeState().Error != WAPhoneOffline && user.PhoneRecentlySeen(false) { - go user.sendBridgeState(BridgeState{StateEvent: StateTransientDisconnect, Message: "Disconnected from WhatsApp. Trying to reconnect."}) + if user.BridgeState.GetPrev().Error != WAPhoneOffline && user.PhoneRecentlySeen(false) { + go user.BridgeState.Send(bridge.State{StateEvent: bridge.StateTransientDisconnect, Message: "Disconnected from WhatsApp. Trying to reconnect."}) } user.bridge.Metrics.TrackConnectionState(user.JID, false) case *events.Contact: @@ -802,10 +797,10 @@ func (user *User) HandleEvent(event interface{}) { case *events.AppState: // Ignore case *events.KeepAliveTimeout: - go user.sendBridgeState(BridgeState{StateEvent: StateTransientDisconnect, Error: WAKeepaliveTimeout}) + go user.BridgeState.Send(bridge.State{StateEvent: bridge.StateTransientDisconnect, Error: WAKeepaliveTimeout}) case *events.KeepAliveRestored: user.log.Infof("Keepalive restored after timeouts, sending connected event") - go user.sendBridgeState(BridgeState{StateEvent: StateConnected}) + go user.BridgeState.Send(bridge.State{StateEvent: bridge.StateConnected}) case *events.MarkChatAsRead: if user.bridge.Config.Bridge.SyncManualMarkedUnread { user.markUnread(user.GetPortalByJID(v.JID), !v.Action.GetRead()) @@ -988,7 +983,7 @@ func (user *User) handleLoggedOut(onConnect bool, reason events.ConnectFailureRe } else if reason == events.ConnectFailureMainDeviceGone { errorCode = WAMainDeviceGone } - user.removeFromJIDMap(BridgeState{StateEvent: StateBadCredentials, Error: errorCode}) + user.removeFromJIDMap(bridge.State{StateEvent: bridge.StateBadCredentials, Error: errorCode}) user.DeleteConnection() user.Session = nil user.JID = types.EmptyJID