Switch bridge state system to using state events

This commit is contained in:
Tulir Asokan 2021-08-04 16:14:26 +03:00
parent f1dd496a2d
commit abbff16e66
5 changed files with 111 additions and 61 deletions

View file

@ -29,14 +29,29 @@ import (
"time"
"github.com/Rhymen/go-whatsapp"
log "maunium.net/go/maulogger/v2"
"maunium.net/go/mautrix/id"
)
type BridgeStateEvent string
const (
StateStarting BridgeStateEvent = "STARTING"
StateUnconfigured BridgeStateEvent = "UNCONFIGURED"
StateConnecting BridgeStateEvent = "CONNECTING"
StateBackfilling BridgeStateEvent = "BACKFILLING"
StateConnected BridgeStateEvent = "CONNECTED"
StateTransientDisconnect BridgeStateEvent = "TRANSIENT_DISCONNECT"
StateBadCredentials BridgeStateEvent = "BAD_CREDENTIALS"
StateUnknownError BridgeStateEvent = "UNKNOWN_ERROR"
StateLoggedOut BridgeStateEvent = "LOGGED_OUT"
)
type BridgeErrorCode string
const (
WANotLoggedIn BridgeErrorCode = "logged-out"
WANotLoggedIn BridgeErrorCode = "wa-logged-out"
WANotConnected BridgeErrorCode = "wa-not-connected"
WAConnecting BridgeErrorCode = "wa-connecting"
WATimeout BridgeErrorCode = "wa-timeout"
@ -56,36 +71,39 @@ var bridgeHumanErrors = map[BridgeErrorCode]string{
}
type BridgeState struct {
OK bool `json:"ok"`
Timestamp int64 `json:"timestamp"`
TTL int `json:"ttl"`
StateEvent BridgeStateEvent `json:"state_event"`
Timestamp int64 `json:"timestamp"`
TTL int `json:"ttl"`
ErrorSource string `json:"error_source,omitempty"`
Error BridgeErrorCode `json:"error,omitempty"`
Message string `json:"message,omitempty"`
UserID id.UserID `json:"user_id"`
RemoteID string `json:"remote_id"`
RemoteName string `json:"remote_name"`
UserID id.UserID `json:"user_id,omitempty"`
RemoteID string `json:"remote_id,omitempty"`
RemoteName string `json:"remote_name,omitempty"`
}
func (pong *BridgeState) fill(user *User) {
pong.UserID = user.MXID
pong.RemoteID = strings.TrimSuffix(user.JID, whatsapp.NewUserSuffix)
pong.RemoteName = fmt.Sprintf("+%s", pong.RemoteID)
func (pong BridgeState) fill(user *User) BridgeState {
if user != nil {
pong.UserID = user.MXID
pong.RemoteID = strings.TrimSuffix(user.JID, whatsapp.NewUserSuffix)
pong.RemoteName = fmt.Sprintf("+%s", pong.RemoteID)
}
pong.Timestamp = time.Now().Unix()
if !pong.OK {
if len(pong.Error) > 0 {
pong.TTL = 60
pong.ErrorSource = "bridge"
pong.Message = bridgeHumanErrors[pong.Error]
} else {
pong.TTL = 240
}
return pong
}
func (pong *BridgeState) shouldDeduplicate(newPong *BridgeState) bool {
if pong == nil || pong.OK != newPong.OK || pong.Error != newPong.Error {
if pong == nil || pong.StateEvent != newPong.StateEvent || pong.Error != newPong.Error {
return false
}
return pong.Timestamp+int64(pong.TTL/5) > time.Now().Unix()
@ -97,73 +115,92 @@ func (user *User) setupAdminTestHooks() {
}
user.Conn.AdminTestHook = func(err error) {
if errors.Is(err, whatsapp.ErrConnectionTimeout) {
user.sendBridgeState(BridgeState{Error: WATimeout})
user.sendBridgeState(BridgeState{StateEvent: StateTransientDisconnect, Error: WATimeout})
} else if errors.Is(err, whatsapp.ErrWebsocketKeepaliveFailed) {
user.sendBridgeState(BridgeState{Error: WAServerTimeout})
user.sendBridgeState(BridgeState{StateEvent: StateTransientDisconnect, Error: WAServerTimeout})
} else if errors.Is(err, whatsapp.ErrPingFalse) {
user.sendBridgeState(BridgeState{Error: WAPingFalse})
user.sendBridgeState(BridgeState{StateEvent: StateTransientDisconnect, Error: WAPingFalse})
} else if err == nil {
user.sendBridgeState(BridgeState{OK: true})
user.sendBridgeState(BridgeState{StateEvent: StateConnected})
} else {
user.sendBridgeState(BridgeState{Error: WAPingError})
user.sendBridgeState(BridgeState{StateEvent: StateTransientDisconnect, Error: WAPingError})
}
}
user.Conn.CountTimeoutHook = func(wsKeepaliveErrorCount int) {
if wsKeepaliveErrorCount > 0 {
user.sendBridgeState(BridgeState{Error: WAServerTimeout})
user.sendBridgeState(BridgeState{StateEvent: StateTransientDisconnect, Error: WAServerTimeout})
} else {
user.sendBridgeState(BridgeState{Error: WATimeout})
user.sendBridgeState(BridgeState{StateEvent: StateTransientDisconnect, Error: WATimeout})
}
}
}
func (user *User) createBridgeStateRequest(ctx context.Context, state *BridgeState) (req *http.Request, err error) {
func (bridge *Bridge) createBridgeStateRequest(ctx context.Context, state *BridgeState) (req *http.Request, err error) {
var body bytes.Buffer
if err = json.NewEncoder(&body).Encode(&state); err != nil {
return nil, fmt.Errorf("failed to encode bridge state JSON: %w", err)
}
req, err = http.NewRequestWithContext(ctx, http.MethodPost, user.bridge.Config.Homeserver.StatusEndpoint, &body)
req, err = http.NewRequestWithContext(ctx, http.MethodPost, bridge.Config.Homeserver.StatusEndpoint, &body)
if err != nil {
return
}
req.Header.Set("Authorization", "Bearer "+user.bridge.Config.AppService.ASToken)
req.Header.Set("Authorization", "Bearer "+bridge.Config.AppService.ASToken)
req.Header.Set("Content-Type", "application/json")
return
}
func sendPreparedBridgeStateRequest(logger log.Logger, req *http.Request) bool {
resp, err := http.DefaultClient.Do(req)
if err != nil {
logger.Warnln("Failed to send bridge state update:", err)
return false
}
defer resp.Body.Close()
if resp.StatusCode < 200 || resp.StatusCode > 299 {
respBody, _ := ioutil.ReadAll(resp.Body)
if respBody != nil {
respBody = bytes.ReplaceAll(respBody, []byte("\n"), []byte("\\n"))
}
logger.Warnfln("Unexpected status code %d sending bridge state update: %s", resp.StatusCode, respBody)
return false
}
return true
}
func (bridge *Bridge) sendGlobalBridgeState(state BridgeState) {
if len(bridge.Config.Homeserver.StatusEndpoint) == 0 {
return
}
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
if req, err := bridge.createBridgeStateRequest(ctx, &state); err != nil {
bridge.Log.Warnln("Failed to prepare global bridge state update request:", err)
} else if ok := sendPreparedBridgeStateRequest(bridge.Log, req); ok {
bridge.Log.Debugfln("Sent new global bridge state %+v", state)
}
}
func (user *User) sendBridgeState(state BridgeState) {
if len(user.bridge.Config.Homeserver.StatusEndpoint) == 0 {
return
}
state.fill(user)
state = state.fill(user)
if user.prevBridgeStatus != nil && user.prevBridgeStatus.shouldDeduplicate(&state) {
return
}
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
var resp *http.Response
if req, err := user.createBridgeStateRequest(ctx, &state); err != nil {
if req, err := user.bridge.createBridgeStateRequest(ctx, &state); err != nil {
user.log.Warnln("Failed to prepare bridge state update request:", err)
} else if resp, err = http.DefaultClient.Do(req); err != nil {
user.log.Warnln("Failed to send bridge state update:", err)
} else if resp.StatusCode < 200 || resp.StatusCode > 299 {
respBody, _ := ioutil.ReadAll(resp.Body)
if respBody != nil {
respBody = bytes.ReplaceAll(respBody, []byte("\n"), []byte("\\n"))
}
user.log.Warnfln("Unexpected status code %d sending bridge state update: %s", resp.StatusCode, respBody)
} else {
} else if ok := sendPreparedBridgeStateRequest(user.log, req); ok {
user.prevBridgeStatus = &state
user.log.Debugfln("Sent new bridge state %+v", state)
}
if resp != nil && resp.Body != nil {
_ = resp.Body.Close()
}
}
var bridgeStatePingID uint32 = 0
@ -176,6 +213,7 @@ func (prov *ProvisioningAPI) BridgeStatePing(w http.ResponseWriter, r *http.Requ
user := prov.bridge.GetUserByMXID(id.UserID(userID))
var resp BridgeState
if user.Conn == nil {
resp.StateEvent = StateBadCredentials
if user.Session == nil {
resp.Error = WANotLoggedIn
} else {
@ -192,28 +230,32 @@ func (prov *ProvisioningAPI) BridgeStatePing(w http.ResponseWriter, r *http.Requ
return
}
user.log.Debugfln("Ping %d response: %v", pingID, err)
resp.StateEvent = StateTransientDisconnect
if err == whatsapp.ErrPingFalse {
user.log.Debugln("Forwarding ping false error from provisioning API to HandleError")
go user.HandleError(err)
resp.Error = WAPingFalse
} else if errors.Is(err, whatsapp.ErrConnectionTimeout) {
resp.Error = WATimeout
}else if errors.Is(err, whatsapp.ErrWebsocketKeepaliveFailed) {
} else if errors.Is(err, whatsapp.ErrWebsocketKeepaliveFailed) {
resp.Error = WAServerTimeout
} else if err != nil {
resp.Error = WAPingError
} else {
resp.OK = true
resp.StateEvent = StateConnected
}
} else if user.Conn.IsLoginInProgress() {
resp.StateEvent = StateConnecting
resp.Error = WAConnecting
} else if user.Conn.IsConnected() {
resp.StateEvent = StateBadCredentials
resp.Error = WANotLoggedIn
} else {
resp.StateEvent = StateBadCredentials
resp.Error = WANotConnected
}
}
resp.fill(user)
resp = resp.fill(user)
user.log.Debugfln("Responding bridge state in bridge status endpoint: %+v", resp)
jsonResponse(w, http.StatusOK, &resp)
user.prevBridgeStatus = &resp

View file

@ -413,7 +413,7 @@ func (handler *CommandHandler) CommandLogout(ce *CommandEvent) {
ce.Reply("Unknown error while logging out: %v", err)
return
}
ce.User.removeFromJIDMap()
ce.User.removeFromJIDMap(StateLoggedOut)
// TODO this causes a foreign key violation, which should be fixed
//ce.User.JID = ""
ce.User.SetSession(nil)
@ -473,7 +473,7 @@ func (handler *CommandHandler) CommandDeleteSession(ce *CommandEvent) {
return
}
//ce.User.JID = ""
ce.User.removeFromJIDMap()
ce.User.removeFromJIDMap(StateLoggedOut)
ce.User.SetSession(nil)
ce.User.DeleteConnection()
ce.Reply("Session information purged")
@ -569,7 +569,7 @@ func (handler *CommandHandler) CommandDisconnect(ce *CommandEvent) {
return
}
ce.User.bridge.Metrics.TrackConnectionState(ce.User.JID, false)
ce.User.sendBridgeState(BridgeState{Error: WANotConnected})
ce.User.sendBridgeState(BridgeState{StateEvent: StateBadCredentials, Error: WANotConnected})
ce.Reply("Successfully disconnected. Use the `reconnect` command to reconnect.")
}

10
main.go
View file

@ -283,12 +283,13 @@ func (bridge *Bridge) Start() {
bridge.Log.Debugln("Checking connection to homeserver")
bridge.ensureConnection()
if bridge.Crypto != nil {
err := bridge.Crypto.Init()
err = bridge.Crypto.Init()
if err != nil {
bridge.Log.Fatalln("Error initializing end-to-bridge encryption:", err)
os.Exit(19)
}
}
bridge.sendGlobalBridgeState(BridgeState{StateEvent: StateStarting}.fill(nil))
if bridge.Provisioning != nil {
bridge.Log.Debugln("Initializing provisioning API")
bridge.Provisioning.Init()
@ -370,9 +371,16 @@ func (bridge *Bridge) UpdateBotProfile() {
func (bridge *Bridge) StartUsers() {
bridge.Log.Debugln("Starting users")
foundAnySessions := false
for _, user := range bridge.GetAllUsers() {
if user.Session != nil {
foundAnySessions = true
}
go user.Connect(false)
}
if !foundAnySessions {
bridge.sendGlobalBridgeState(BridgeState{StateEvent: StateUnconfigured}.fill(nil))
}
bridge.Log.Debugln("Starting custom puppets")
for _, loopuppet := range bridge.GetAllPuppetsWithCustomMXID() {
go func(puppet *Puppet) {

View file

@ -337,7 +337,7 @@ func (prov *ProvisioningAPI) Logout(w http.ResponseWriter, r *http.Request) {
}
user.bridge.Metrics.TrackConnectionState(user.JID, false)
user.removeFromJIDMap()
user.removeFromJIDMap(StateLoggedOut)
// TODO this causes a foreign key violation, which should be fixed
//ce.User.JID = ""

28
user.go
View file

@ -114,7 +114,7 @@ func (user *User) addToJIDMap() {
user.bridge.usersLock.Unlock()
}
func (user *User) removeFromJIDMap() {
func (user *User) removeFromJIDMap(state BridgeStateEvent) {
user.bridge.usersLock.Lock()
jidUser, ok := user.bridge.usersByJID[user.JID]
if ok && user == jidUser {
@ -122,7 +122,7 @@ func (user *User) removeFromJIDMap() {
}
user.bridge.usersLock.Unlock()
user.bridge.Metrics.TrackLoginState(user.JID, false)
user.sendBridgeState(BridgeState{Error: WANotLoggedIn})
user.sendBridgeState(BridgeState{StateEvent: state, Error: WANotLoggedIn})
}
func (bridge *Bridge) GetAllUsers() []*User {
@ -257,7 +257,7 @@ func (user *User) Connect(evenIfNoSession bool) bool {
}
user.log.Debugln("Connecting to WhatsApp")
if user.Session != nil {
user.sendBridgeState(BridgeState{Error: WAConnecting})
user.sendBridgeState(BridgeState{StateEvent: StateConnecting, Error: WAConnecting})
}
timeout := time.Duration(user.bridge.Config.Bridge.ConnectionTimeout)
if timeout == 0 {
@ -289,7 +289,7 @@ func (user *User) DeleteConnection() {
user.Conn.RemoveHandlers()
user.Conn = nil
user.bridge.Metrics.TrackConnectionState(user.JID, false)
user.sendBridgeState(BridgeState{Error: WANotConnected})
user.sendBridgeState(BridgeState{StateEvent: StateBadCredentials, Error: WANotConnected})
user.connLock.Unlock()
}
@ -306,13 +306,13 @@ func (user *User) RestoreSession() bool {
if errors.Is(err, whatsapp.ErrUnpaired) {
user.sendMarkdownBridgeAlert("\u26a0 Failed to connect to WhatsApp: unpaired from phone. " +
"To re-pair your phone, log in again.")
user.removeFromJIDMap()
user.removeFromJIDMap(StateBadCredentials)
//user.JID = ""
user.SetSession(nil)
user.DeleteConnection()
return false
} else {
user.sendBridgeState(BridgeState{Error: WANotConnected})
user.sendBridgeState(BridgeState{StateEvent: StateBadCredentials, Error: WANotConnected})
user.sendMarkdownBridgeAlert("\u26a0 Failed to connect to WhatsApp. Make sure WhatsApp " +
"on your phone is reachable and use `reconnect` to try connecting again.")
}
@ -462,7 +462,7 @@ func (cl ChatList) Swap(i, j int) {
}
func (user *User) PostLogin() {
user.sendBridgeState(BridgeState{OK: true})
user.sendBridgeState(BridgeState{StateEvent: StateBackfilling})
user.bridge.Metrics.TrackConnectionState(user.JID, true)
user.bridge.Metrics.TrackLoginState(user.JID, true)
user.bridge.Metrics.TrackBufferLength(user.MXID, len(user.messageOutput))
@ -539,7 +539,7 @@ func (user *User) postConnPing() bool {
if disconnectErr != nil {
user.log.Warnln("Error while disconnecting after failed post-connection ping:", disconnectErr)
}
user.sendBridgeState(BridgeState{Error: WANotConnected})
user.sendBridgeState(BridgeState{StateEvent: StateBadCredentials, Error: WANotConnected})
user.bridge.Metrics.TrackDisconnection(user.MXID)
go func() {
time.Sleep(1 * time.Second)
@ -578,6 +578,7 @@ func (user *User) intPostLogin() {
case <-time.After(time.Duration(user.bridge.Config.Bridge.PortalSyncWait) * time.Second):
user.log.Warnln("Timed out waiting for portal sync to complete! Unlocking processing of incoming messages.")
}
user.sendBridgeState(BridgeState{StateEvent: StateConnected})
}
type NormalMessage interface {
@ -979,7 +980,7 @@ func (user *User) HandleError(err error) {
if closed.Code == 1000 && user.cleanDisconnection {
user.cleanDisconnection = false
if !user.bridge.Config.Bridge.AggressiveReconnect {
user.sendBridgeState(BridgeState{Error: WANotConnected})
user.sendBridgeState(BridgeState{StateEvent: StateBadCredentials, Error: WANotConnected})
user.bridge.Metrics.TrackConnectionState(user.JID, false)
user.log.Infoln("Clean disconnection by server")
return
@ -1012,7 +1013,7 @@ func (user *User) tryReconnect(msg string) {
user.bridge.Metrics.TrackConnectionState(user.JID, false)
if user.ConnectionErrors > user.bridge.Config.Bridge.MaxConnectionAttempts {
user.sendMarkdownBridgeAlert("%s. Use the `reconnect` command to reconnect.", msg)
user.sendBridgeState(BridgeState{Error: WANotConnected})
user.sendBridgeState(BridgeState{StateEvent: StateBadCredentials, Error: WANotConnected})
return
}
if user.bridge.Config.Bridge.ReportConnectionRetry {
@ -1038,7 +1039,7 @@ func (user *User) tryReconnect(msg string) {
return
default:
}
user.sendBridgeState(BridgeState{Error: WAConnecting})
user.sendBridgeState(BridgeState{StateEvent: StateConnecting, Error: WAConnecting})
err := user.Conn.Restore(true, ctx)
if err == nil {
user.ConnectionErrors = 0
@ -1055,13 +1056,12 @@ func (user *User) tryReconnect(msg string) {
}
} else if errors.Is(err, whatsapp.ErrUnpaired) {
user.log.Errorln("Got init 401 (unpaired) error when trying to reconnect, not retrying")
user.removeFromJIDMap()
user.removeFromJIDMap(StateBadCredentials)
//user.JID = ""
user.SetSession(nil)
user.DeleteConnection()
user.sendMarkdownBridgeAlert("\u26a0 Failed to reconnect to WhatsApp: unpaired from phone. " +
"To re-pair your phone, log in again.")
user.sendBridgeState(BridgeState{Error: WANotLoggedIn})
return
} else if errors.Is(err, whatsapp.ErrAlreadyLoggedIn) {
user.log.Warnln("Reconnection said we're already logged in, not trying anymore")
@ -1082,7 +1082,7 @@ func (user *User) tryReconnect(msg string) {
}
}
user.sendBridgeState(BridgeState{Error: WANotConnected})
user.sendBridgeState(BridgeState{StateEvent: StateBadCredentials, Error: WANotConnected})
if user.bridge.Config.Bridge.ReportConnectionRetry {
user.sendMarkdownBridgeAlert("%d reconnection attempts failed. Use the `reconnect` command to try to reconnect manually.", tries)
} else {