Miscellaneous improvements to connection management

This commit is contained in:
Tulir Asokan 2021-02-05 19:26:09 +02:00
parent 25cca87959
commit 3c7ff4bc0c
7 changed files with 92 additions and 75 deletions

View file

@ -412,12 +412,7 @@ func (handler *CommandHandler) CommandLogout(ce *CommandEvent) {
ce.Reply("Unknown error while logging out: %v", err)
return
}
_, err = ce.User.Conn.Disconnect()
if err != nil {
ce.User.log.Warnln("Error while disconnecting after logout:", err)
}
ce.User.Conn.RemoveHandlers()
ce.User.Conn = nil
ce.User.Disconnect()
ce.User.removeFromJIDMap()
// TODO this causes a foreign key violation, which should be fixed
//ce.User.JID = ""
@ -475,13 +470,9 @@ func (handler *CommandHandler) CommandDeleteSession(ce *CommandEvent) {
ce.Reply("Nothing to purge: no session information stored and no active connection.")
return
}
ce.User.Disconnect()
ce.User.removeFromJIDMap()
ce.User.SetSession(nil)
if ce.User.Conn != nil {
_, _ = ce.User.Conn.Disconnect()
ce.User.Conn.RemoveHandlers()
ce.User.Conn = nil
}
ce.Reply("Session information purged")
}
@ -504,7 +495,7 @@ func (handler *CommandHandler) CommandReconnect(ce *CommandEvent) {
wasConnected = false
} else if err != nil {
ce.User.log.Warnln("Error while disconnecting:", err)
} else if len(sess.Wid) > 0 {
} else {
ce.User.SetSession(&sess)
}
@ -530,16 +521,16 @@ func (handler *CommandHandler) CommandReconnect(ce *CommandEvent) {
}
if err != nil {
ce.User.log.Warnln("Error while reconnecting:", err)
if err.Error() == "restore session connection timed out" {
if errors.Is(err, whatsapp.ErrRestoreSessionTimeout) {
ce.Reply("Reconnection timed out. Is WhatsApp on your phone reachable?")
} else {
ce.Reply("Unknown error while reconnecting: %v", err)
}
ce.User.log.Debugln("Disconnecting due to failed session restore in reconnect command...")
sess, err := ce.User.Conn.Disconnect()
sess, err = ce.User.Conn.Disconnect()
if err != nil {
ce.User.log.Errorln("Failed to disconnect after failed session restore in reconnect command:", err)
} else if len(sess.Wid) > 0 {
} else {
ce.User.SetSession(&sess)
}
return
@ -563,13 +554,7 @@ func (handler *CommandHandler) CommandDeleteConnection(ce *CommandEvent) {
ce.Reply("You don't have a WhatsApp connection.")
return
}
sess, err := ce.User.Conn.Disconnect()
if err == nil && len(sess.Wid) > 0 {
ce.User.SetSession(&sess)
}
ce.User.Conn.RemoveHandlers()
ce.User.Conn = nil
ce.User.bridge.Metrics.TrackConnectionState(ce.User.JID, false)
ce.User.Disconnect()
ce.Reply("Successfully disconnected. Use the `reconnect` command to reconnect.")
}
@ -588,7 +573,7 @@ func (handler *CommandHandler) CommandDisconnect(ce *CommandEvent) {
ce.User.log.Warnln("Error while disconnecting:", err)
ce.Reply("Unknown error while disconnecting: %v", err)
return
} else if len(sess.Wid) > 0 {
} else {
ce.User.SetSession(&sess)
}
ce.User.bridge.Metrics.TrackConnectionState(ce.User.JID, false)

View file

@ -126,9 +126,7 @@ func (helper *CryptoHelper) loginBot() (*mautrix.Client, error) {
return nil, fmt.Errorf("failed to get supported login flows: %w", err)
}
if !flows.HasFlow(mautrix.AuthTypeAppservice) {
// TODO after synapse 1.22, turn this into an error
helper.log.Warnln("Encryption enabled in config, but homeserver does not advertise appservice login")
//return nil, fmt.Errorf("homeserver does not support appservice login")
return nil, fmt.Errorf("homeserver does not support appservice login")
}
// We set the API token to the AS token here to authenticate the appservice login
// It'll get overridden after the login

2
go.mod
View file

@ -16,4 +16,4 @@ require (
maunium.net/go/mautrix v0.8.0
)
replace github.com/Rhymen/go-whatsapp => github.com/tulir/go-whatsapp v0.3.16
replace github.com/Rhymen/go-whatsapp => github.com/tulir/go-whatsapp v0.3.17

2
go.sum
View file

@ -135,6 +135,8 @@ github.com/tulir/go-whatsapp v0.3.15 h1:Ogu+f5hvB6Fbdjl6BG7nUb5wuJGCzUa9Z1FraS23
github.com/tulir/go-whatsapp v0.3.15/go.mod h1:U5+sm33vrv3wz62YyRM/VS7q2ObXkxU4Xqj/3KOmN9o=
github.com/tulir/go-whatsapp v0.3.16 h1:NfcXC2DQXwls3qkAjbFqSeoMX+rUbbpBBGGvCXI3RUw=
github.com/tulir/go-whatsapp v0.3.16/go.mod h1:U5+sm33vrv3wz62YyRM/VS7q2ObXkxU4Xqj/3KOmN9o=
github.com/tulir/go-whatsapp v0.3.17 h1:HMRT6HzP1seUt5P0waD8CxThB2bfBgKX2uVjOoXCaf8=
github.com/tulir/go-whatsapp v0.3.17/go.mod h1:U5+sm33vrv3wz62YyRM/VS7q2ObXkxU4Xqj/3KOmN9o=
golang.org/x/crypto v0.0.0-20170930174604-9419663f5a44/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=

View file

@ -386,7 +386,7 @@ func (bridge *Bridge) Stop() {
sess, err := user.Conn.Disconnect()
if err != nil {
bridge.Log.Errorfln("Error while disconnecting %s: %v", user.MXID, err)
} else if len(sess.Wid) > 0 {
} else {
user.SetSession(&sess)
}
}

View file

@ -18,14 +18,17 @@ package main
import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"strings"
"github.com/Rhymen/go-whatsapp"
"github.com/gorilla/websocket"
log "maunium.net/go/maulogger/v2"
"github.com/Rhymen/go-whatsapp"
"maunium.net/go/mautrix-whatsapp/types"
"maunium.net/go/mautrix/id"
whatsappExt "maunium.net/go/mautrix-whatsapp/whatsapp-ext"
@ -42,7 +45,7 @@ func (prov *ProvisioningAPI) Init() {
r := prov.bridge.AS.Router.PathPrefix(prov.bridge.Config.AppService.Provisioning.Prefix).Subrouter()
r.Use(prov.AuthMiddleware)
r.HandleFunc("/ping", prov.Ping).Methods(http.MethodGet)
r.HandleFunc("/login", prov.Login)
r.HandleFunc("/login", prov.Login).Methods(http.MethodGet)
r.HandleFunc("/logout", prov.Logout).Methods(http.MethodPost)
r.HandleFunc("/delete_session", prov.DeleteSession).Methods(http.MethodPost)
r.HandleFunc("/delete_connection", prov.DeleteConnection).Methods(http.MethodPost)
@ -98,13 +101,8 @@ func (prov *ProvisioningAPI) DeleteSession(w http.ResponseWriter, r *http.Reques
})
return
}
user.Disconnect()
user.SetSession(nil)
if user.Conn != nil {
_, _ = user.Conn.Disconnect()
user.Conn.RemoveHandlers()
user.Conn = nil
user.bridge.Metrics.TrackConnectionState(user.JID, false)
}
jsonResponse(w, http.StatusOK, Response{true, "Session information purged"})
}
@ -117,13 +115,7 @@ func (prov *ProvisioningAPI) DeleteConnection(w http.ResponseWriter, r *http.Req
})
return
}
sess, err := user.Conn.Disconnect()
if err == nil && len(sess.Wid) > 0 {
user.SetSession(&sess)
}
user.Conn.RemoveHandlers()
user.Conn = nil
user.bridge.Metrics.TrackConnectionState(user.JID, false)
user.Disconnect()
jsonResponse(w, http.StatusOK, Response{true, "Disconnected from WhatsApp and connection deleted"})
}
@ -150,7 +142,7 @@ func (prov *ProvisioningAPI) Disconnect(w http.ResponseWriter, r *http.Request)
ErrCode: err.Error(),
})
return
} else if len(sess.Wid) > 0 {
} else {
user.SetSession(&sess)
}
user.bridge.Metrics.TrackConnectionState(user.JID, false)
@ -178,7 +170,7 @@ func (prov *ProvisioningAPI) Reconnect(w http.ResponseWriter, r *http.Request) {
wasConnected = false
} else if err != nil {
user.log.Warnln("Error while disconnecting:", err)
} else if len(sess.Wid) > 0 {
} else {
user.SetSession(&sess)
}
@ -186,7 +178,6 @@ func (prov *ProvisioningAPI) Reconnect(w http.ResponseWriter, r *http.Request) {
if err == whatsapp.ErrInvalidSession {
if user.Session != nil {
user.log.Debugln("Got invalid session error when reconnecting, but user has session. Retrying using RestoreWithSession()...")
var sess whatsapp.Session
sess, err = user.Conn.RestoreWithSession(*user.Session)
if err == nil {
user.SetSession(&sess)
@ -213,7 +204,7 @@ func (prov *ProvisioningAPI) Reconnect(w http.ResponseWriter, r *http.Request) {
}
if err != nil {
user.log.Warnln("Error while reconnecting:", err)
if err.Error() == "restore session connection timed out" {
if errors.Is(err, whatsapp.ErrRestoreSessionTimeout) {
jsonResponse(w, http.StatusForbidden, Error{
Error: "Reconnection timed out. Is WhatsApp on your phone reachable?",
ErrCode: err.Error(),
@ -228,7 +219,7 @@ func (prov *ProvisioningAPI) Reconnect(w http.ResponseWriter, r *http.Request) {
sess, err := user.Conn.Disconnect()
if err != nil {
user.log.Errorln("Failed to disconnect after failed session restore in reconnect command:", err)
} else if len(sess.Wid) > 0 {
} else {
user.SetSession(&sess)
}
return
@ -318,12 +309,7 @@ func (prov *ProvisioningAPI) Logout(w http.ResponseWriter, r *http.Request) {
return
}
}
_, err = user.Conn.Disconnect()
if err != nil {
user.log.Warnln("Error while disconnecting after logout:", err)
}
user.Conn.RemoveHandlers()
user.Conn = nil
user.Disconnect()
}
user.bridge.Metrics.TrackConnectionState(user.JID, false)
@ -373,6 +359,7 @@ func (prov *ProvisioningAPI) Login(w http.ResponseWriter, r *http.Request) {
})
}
}()
user.log.Debugln("Starting login via provisioning API")
session, err := user.Conn.LoginWithRetry(qrChan, user.bridge.Config.Bridge.LoginQRRegenCount)
qrChan <- "stop"
if err != nil {
@ -383,16 +370,20 @@ func (prov *ProvisioningAPI) Login(w http.ResponseWriter, r *http.Request) {
msg = "You have a login in progress already."
} else if err == whatsapp.ErrLoginTimedOut {
msg = "QR code scan timed out. Please try again."
} else if err == whatsapp.ErrInvalidWebsocket {
msg = "WhatsApp connection error. Please try again."
user.Disconnect()
} else {
user.log.Warnln("Failed to log in:", err)
msg = fmt.Sprintf("Unknown error while logging in: %v", err)
}
user.log.Warnln("Failed to log in:", err)
_ = c.WriteJSON(Error{
Error: msg,
ErrCode: err.Error(),
})
return
}
user.log.Debugln("Successful login via provisioning API")
user.ConnectionErrors = 0
user.JID = strings.Replace(user.Conn.Info.Wid, whatsappExt.OldUserSuffix, whatsappExt.NewUserSuffix, 1)
user.addToJIDMap()

81
user.go
View file

@ -74,6 +74,7 @@ type User struct {
syncWait sync.WaitGroup
mgmtCreateLock sync.Mutex
connLock sync.Mutex
}
func (bridge *Bridge) GetUserByMXID(userID id.UserID) *User {
@ -222,38 +223,65 @@ func (user *User) SetManagementRoom(roomID id.RoomID) {
}
func (user *User) SetSession(session *whatsapp.Session) {
user.Session = session
if session == nil {
user.Session = nil
user.LastConnection = 0
} else if len(session.Wid) > 0 {
user.Session = session
} else {
return
}
user.Update()
}
func (user *User) Connect(evenIfNoSession bool) bool {
if user.Conn != nil {
user.connLock.Lock()
if user.Conn != nil && user.Conn.IsConnected() {
user.connLock.Unlock()
return true
} else if !evenIfNoSession && user.Session == nil {
user.connLock.Unlock()
return false
}
if user.Conn != nil {
user.Disconnect()
}
user.log.Debugln("Connecting to WhatsApp")
timeout := time.Duration(user.bridge.Config.Bridge.ConnectionTimeout)
if timeout == 0 {
timeout = 20
}
conn, err := whatsapp.NewConn(timeout * time.Second)
conn, err := whatsapp.NewConnWithOptions(&whatsapp.Options{
Timeout: timeout * time.Second,
LongClientName: user.bridge.Config.WhatsApp.OSName,
ShortClientName: user.bridge.Config.WhatsApp.BrowserName,
ClientVersion: WAVersion,
})
if err != nil {
user.log.Errorln("Failed to connect to WhatsApp:", err)
user.sendMarkdownBridgeAlert("\u26a0 Failed to connect to WhatsApp server. " +
"This indicates a network problem on the bridge server. See bridge logs for more info.")
user.connLock.Unlock()
return false
}
user.Conn = whatsappExt.ExtendConn(conn)
_ = user.Conn.SetClientName(user.bridge.Config.WhatsApp.OSName, user.bridge.Config.WhatsApp.BrowserName, WAVersion)
user.log.Debugln("WhatsApp connection successful")
user.Conn.AddHandler(user)
user.connLock.Unlock()
return user.RestoreSession()
}
func (user *User) Disconnect() {
sess, err := user.Conn.Disconnect()
if err != nil && err != whatsapp.ErrNotConnected {
user.log.Warnln("Error disconnecting: %v", err)
}
user.SetSession(&sess)
user.Conn.RemoveHandlers()
user.Conn = nil
user.bridge.Metrics.TrackConnectionState(user.JID, false)
}
func (user *User) RestoreSession() bool {
if user.Session != nil {
sess, err := user.Conn.RestoreWithSession(*user.Session)
@ -263,7 +291,12 @@ func (user *User) RestoreSession() bool {
user.log.Errorln("Failed to restore session:", err)
if errors.Is(err, whatsapp.ErrUnpaired) {
user.sendMarkdownBridgeAlert("\u26a0 Failed to connect to WhatsApp: unpaired from phone. " +
"To re-pair your phone, use `delete-session` and then `login`.")
"To re-pair your phone, log in again.")
user.Disconnect()
user.removeFromJIDMap()
//user.JID = ""
user.SetSession(nil)
return false
} else {
user.sendMarkdownBridgeAlert("\u26a0 Failed to connect to WhatsApp. Make sure WhatsApp " +
"on your phone is reachable and use `reconnect` to try connecting again.")
@ -420,7 +453,7 @@ func (user *User) PostLogin() {
user.log.Debugln("Locking processing of incoming messages and starting post-login sync")
user.syncWait.Add(1)
user.syncStart <- struct{}{}
go user.intPostLogin()
go user.intPostLogin(user.Conn)
}
func (user *User) tryAutomaticDoublePuppeting() {
@ -468,12 +501,16 @@ func (user *User) sendMarkdownBridgeAlert(formatString string, args ...interface
}
}
func (user *User) postConnPing() bool {
func (user *User) postConnPing(conn *whatsappExt.ExtendedConn) bool {
if user.Conn != conn {
user.log.Warnln("Connection changed before scheduled post-connection ping, canceling ping")
return false
}
user.log.Debugln("Making post-connection ping")
err := user.Conn.AdminTest()
err := conn.AdminTest()
if err != nil {
user.log.Errorfln("Post-connection ping failed: %v. Disconnecting and then reconnecting after a second", err)
sess, disconnectErr := user.Conn.Disconnect()
sess, disconnectErr := conn.Disconnect()
if disconnectErr != nil {
user.log.Warnln("Error while disconnecting after failed post-connection ping:", disconnectErr)
} else {
@ -491,7 +528,7 @@ func (user *User) postConnPing() bool {
}
}
func (user *User) intPostLogin() {
func (user *User) intPostLogin(conn *whatsappExt.ExtendedConn) {
defer user.syncWait.Done()
user.lastReconnection = time.Now().Unix()
user.createCommunity()
@ -503,11 +540,11 @@ func (user *User) intPostLogin() {
user.log.Debugln("Chat list receive confirmation received in PostLogin")
case <-time.After(time.Duration(user.bridge.Config.Bridge.ChatListWait) * time.Second):
user.log.Warnln("Timed out waiting for chat list to arrive!")
user.postConnPing()
user.postConnPing(conn)
return
}
if !user.postConnPing() {
if !user.postConnPing(conn) {
user.log.Debugln("Post-connection ping failed, unlocking processing of incoming messages.")
return
}
@ -527,10 +564,11 @@ func (user *User) HandleStreamEvent(evt whatsappExt.StreamEvent) {
if user.lastReconnection+60 > time.Now().Unix() {
user.lastReconnection = 0
user.log.Infoln("Stream went to sleep soon after reconnection, making new post-connection ping in 20 seconds")
conn := user.Conn
go func() {
time.Sleep(20 * time.Second)
// TODO if this happens during the post-login sync, it can get stuck forever
user.postConnPing()
user.postConnPing(conn)
}()
}
} else {
@ -741,8 +779,13 @@ func (user *User) tryReconnect(msg string) {
baseDelay = -baseDelay + 1
}
delay := baseDelay
conn := user.Conn
for user.ConnectionErrors <= user.bridge.Config.Bridge.MaxConnectionAttempts {
err := user.Conn.Restore()
if user.Conn != conn {
user.log.Debugln("Connection was recreated, aborting reconnection attempts")
return
}
err := conn.Restore()
if err == nil {
user.ConnectionErrors = 0
if user.bridge.Config.Bridge.ReportConnectionRetry {
@ -750,15 +793,13 @@ func (user *User) tryReconnect(msg string) {
}
user.PostLogin()
return
} else if err.Error() == "init responded with 400" {
} else if errors.Is(err, whatsapp.ErrBadRequest) {
user.log.Infoln("Got init 400 error when trying to reconnect, resetting connection...")
sess, err := user.Conn.Disconnect()
sess, err := conn.Disconnect()
if err != nil {
user.log.Debugln("Error while disconnecting for connection reset:", err)
}
if len(sess.Wid) > 0 {
user.SetSession(&sess)
}
user.SetSession(&sess)
}
user.log.Errorln("Error while trying to reconnect after disconnection:", err)
tries++
@ -805,7 +846,7 @@ func (user *User) runMessageRingBuffer() {
default:
dropped := <-user.messageOutput
user.log.Warnln("Buffer is full, dropping message in", dropped.chat)
user.messageOutput<-msg
user.messageOutput <- msg
}
}
}