diff --git a/bridgestate.go b/bridgestate.go index bcb3036..2064ebc 100644 --- a/bridgestate.go +++ b/bridgestate.go @@ -23,10 +23,9 @@ import ( "fmt" "io" "net/http" + "runtime/debug" "time" - log "maunium.net/go/maulogger/v2" - "maunium.net/go/mautrix/id" ) @@ -115,27 +114,23 @@ func (pong *BridgeState) shouldDeduplicate(newPong *BridgeState) bool { return pong.Timestamp+int64(pong.TTL/5) > time.Now().Unix() } -func (bridge *Bridge) createBridgeStateRequest(ctx context.Context, state *BridgeState) (req *http.Request, err error) { +func (bridge *Bridge) sendBridgeState(ctx context.Context, state *BridgeState) error { var body bytes.Buffer - if err = json.NewEncoder(&body).Encode(&state); err != nil { - return nil, fmt.Errorf("failed to encode bridge state JSON: %w", err) + if err := json.NewEncoder(&body).Encode(&state); err != nil { + return fmt.Errorf("failed to encode bridge state JSON: %w", err) } - req, err = http.NewRequestWithContext(ctx, http.MethodPost, bridge.Config.Homeserver.StatusEndpoint, &body) + req, err := http.NewRequestWithContext(ctx, http.MethodPost, bridge.Config.Homeserver.StatusEndpoint, &body) if err != nil { - return + return fmt.Errorf("failed to prepare request: %w", err) } req.Header.Set("Authorization", "Bearer "+bridge.Config.AppService.ASToken) req.Header.Set("Content-Type", "application/json") - return -} -func sendPreparedBridgeStateRequest(logger log.Logger, req *http.Request) bool { resp, err := http.DefaultClient.Do(req) if err != nil { - logger.Warnln("Failed to send bridge state update:", err) - return false + return fmt.Errorf("failed to send request: %w", err) } defer resp.Body.Close() if resp.StatusCode < 200 || resp.StatusCode > 299 { @@ -143,10 +138,9 @@ func sendPreparedBridgeStateRequest(logger log.Logger, req *http.Request) bool { if respBody != nil { respBody = bytes.ReplaceAll(respBody, []byte("\n"), []byte("\\n")) } - logger.Warnfln("Unexpected status code %d sending bridge state update: %s", resp.StatusCode, respBody) - return false + return fmt.Errorf("unexpected status code %d sending bridge state update: %s", resp.StatusCode, respBody) } - return true + return nil } func (bridge *Bridge) sendGlobalBridgeState(state BridgeState) { @@ -156,30 +150,69 @@ func (bridge *Bridge) sendGlobalBridgeState(state BridgeState) { ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() - if req, err := bridge.createBridgeStateRequest(ctx, &state); err != nil { - bridge.Log.Warnln("Failed to prepare global bridge state update request:", err) - } else if ok := sendPreparedBridgeStateRequest(bridge.Log, req); ok { + if err := bridge.sendBridgeState(ctx, &state); err != nil { + bridge.Log.Warnln("Failed to update global bridge state:", err) + } else { bridge.Log.Debugfln("Sent new global bridge state %+v", state) } } +func (user *User) bridgeStateLoop() { + defer func() { + err := recover() + if err != nil { + user.log.Errorfln("Bridge state loop panicked: %v\n%s", err, debug.Stack()) + } + }() + for state := range user.bridgeStateQueue { + user.immediateSendBridgeState(state) + } +} + +func (user *User) immediateSendBridgeState(state BridgeState) { + retryIn := 2 + for { + if user.prevBridgeStatus != nil && user.prevBridgeStatus.shouldDeduplicate(&state) { + user.log.Debugfln("Not sending bridge state %s as it's a duplicate", state.StateEvent) + return + } + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + err := user.bridge.sendBridgeState(ctx, &state) + cancel() + + if err != nil { + user.log.Warnfln("Failed to update bridge state: %v, retrying in %d seconds", err, retryIn) + time.Sleep(time.Duration(retryIn) * time.Second) + retryIn *= 2 + if retryIn > 64 { + retryIn = 64 + } + } else { + user.log.Debugfln("Sent new bridge state %+v", state) + return + } + } +} + func (user *User) sendBridgeState(state BridgeState) { if len(user.bridge.Config.Homeserver.StatusEndpoint) == 0 { return } state = state.fill(user) - if user.prevBridgeStatus != nil && user.prevBridgeStatus.shouldDeduplicate(&state) { - return - } - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - defer cancel() - if req, err := user.bridge.createBridgeStateRequest(ctx, &state); err != nil { - user.log.Warnln("Failed to prepare bridge state update request:", err) - } else if ok := sendPreparedBridgeStateRequest(user.log, req); ok { - user.prevBridgeStatus = &state - user.log.Debugfln("Sent new bridge state %+v", state) + if len(user.bridgeStateQueue) >= 8 { + user.log.Warnln("Bridge state queue is nearly full, discarding an item") + select { + case <-user.bridgeStateQueue: + default: + } + } + select { + case user.bridgeStateQueue <- state: + default: + user.log.Errorfln("Bridge state queue is full, dropped new state") } } diff --git a/user.go b/user.go index bd500d6..2ef1e2d 100644 --- a/user.go +++ b/user.go @@ -76,6 +76,8 @@ type User struct { groupListCacheLock sync.Mutex groupListCacheTime time.Time + bridgeStateQueue chan BridgeState + BackfillQueue *BackfillQueue } @@ -189,6 +191,10 @@ func (bridge *Bridge) NewUser(dbUser *database.User) *User { user.RelayWhitelisted = user.bridge.Config.Bridge.Permissions.IsRelayWhitelisted(user.MXID) user.Whitelisted = user.bridge.Config.Bridge.Permissions.IsWhitelisted(user.MXID) user.Admin = user.bridge.Config.Bridge.Permissions.IsAdmin(user.MXID) + if len(user.bridge.Config.Homeserver.StatusEndpoint) > 0 { + user.bridgeStateQueue = make(chan BridgeState, 10) + go user.bridgeStateLoop() + } return user }