0
0
Fork 0
mirror of https://github.com/matrix-org/dendrite synced 2024-11-05 23:48:58 +01:00
dendrite/appservice/workers/transaction_scheduler.go

228 lines
6.7 KiB
Go
Raw Normal View History

Send Application Service Events (#477) * Prevent sql scanning into nil value in accounts_table Signed-off-by: Andrew Morgan <andrewm@matrix.org> * Remove uneccessary logging, null checking * Don't forget to set the localpart * Simplify error checking * Store And Send Application Service Events * Modify INSTALL.md and dendrite-config.yaml for the new appservice database * Correct all instances of casing on 'application service' to align with spec * Store incoming events that an app service is interested in in the database to be later read by transaction workers. * Retrieve these events from transaction workers, one per AS. * Minimal transaction ID data is stored as well to recover after server failure. * Send events to AS and exponentially backoff on failure. Signed-off-by: Andrew Morgan <andrewm@matrix.org> * Finish my own sentences. * Fix up database interaction * Change to event-based AS sending * Reduce cyclomatic complexity * Appease the errcheck gods * Delete by int ID instead of string. This was causing some events to not be deleted, as < an eventID doesn't really make much sense. * Check if there are more events to send before sleeping * Send same transaction if last send attempt failed * Don't backoff on non-200s, tight send loop, 1 event query * Remove tight send loop. Fix events not being deleted * Additionally order by event id, track main.go * Return the last txnID, which our events are using * Remove old main.go file * Prevent duplicate events from being sent... * Strip event content if it doesn't contain anything Signed-off-by: Andrew Morgan <andrewm@matrix.org> * Update gomatrixserverlib and use Unsigned AS event prop * Fixes * Fix sync server comment * Remove unnecessary printlns * Use logrus Fields * Worker state methods * Remove sillyness * Fix up event filtering * Handle transaction event limit in loop * Switch to using a sequence for transaction IDs * Don't verify self-signed AS certificates * Fix logging * Use gmsl.Event instead of AS-only event in transactions Also clear up the logic on lookupStateEvents a little bit. * Change invalid_txn_id to global (for efficiency) * Use a bool for EventsReady instead of an int
2018-07-05 18:34:59 +02:00
// Copyright 2018 Vector Creations Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package workers
import (
"bytes"
"context"
"encoding/json"
"fmt"
"math"
"net/http"
"time"
"github.com/matrix-org/dendrite/appservice/storage"
"github.com/matrix-org/dendrite/appservice/types"
"github.com/matrix-org/dendrite/common/config"
"github.com/matrix-org/gomatrixserverlib"
log "github.com/sirupsen/logrus"
)
var (
// Maximum size of events sent in each transaction.
transactionBatchSize = 50
// Timeout for sending a single transaction to an application service.
transactionTimeout = time.Second * 60
)
// SetupTransactionWorkers spawns a separate goroutine for each application
// service. Each of these "workers" handle taking all events intended for their
// app service, batch them up into a single transaction (up to a max transaction
// size), then send that off to the AS's /transactions/{txnID} endpoint. It also
// handles exponentially backing off in case the AS isn't currently available.
func SetupTransactionWorkers(
appserviceDB *storage.Database,
workerStates []types.ApplicationServiceWorkerState,
) error {
// Create a worker that handles transmitting events to a single homeserver
for _, workerState := range workerStates {
// Don't create a worker if this AS doesn't want to receive events
if workerState.AppService.URL != "" {
go worker(appserviceDB, workerState)
}
}
return nil
}
// worker is a goroutine that sends any queued events to the application service
// it is given.
func worker(db *storage.Database, ws types.ApplicationServiceWorkerState) {
log.WithFields(log.Fields{
"appservice": ws.AppService.ID,
}).Info("starting application service")
ctx := context.Background()
2018-07-17 16:36:04 +02:00
// Create a HTTP client for sending requests to app services
Send Application Service Events (#477) * Prevent sql scanning into nil value in accounts_table Signed-off-by: Andrew Morgan <andrewm@matrix.org> * Remove uneccessary logging, null checking * Don't forget to set the localpart * Simplify error checking * Store And Send Application Service Events * Modify INSTALL.md and dendrite-config.yaml for the new appservice database * Correct all instances of casing on 'application service' to align with spec * Store incoming events that an app service is interested in in the database to be later read by transaction workers. * Retrieve these events from transaction workers, one per AS. * Minimal transaction ID data is stored as well to recover after server failure. * Send events to AS and exponentially backoff on failure. Signed-off-by: Andrew Morgan <andrewm@matrix.org> * Finish my own sentences. * Fix up database interaction * Change to event-based AS sending * Reduce cyclomatic complexity * Appease the errcheck gods * Delete by int ID instead of string. This was causing some events to not be deleted, as < an eventID doesn't really make much sense. * Check if there are more events to send before sleeping * Send same transaction if last send attempt failed * Don't backoff on non-200s, tight send loop, 1 event query * Remove tight send loop. Fix events not being deleted * Additionally order by event id, track main.go * Return the last txnID, which our events are using * Remove old main.go file * Prevent duplicate events from being sent... * Strip event content if it doesn't contain anything Signed-off-by: Andrew Morgan <andrewm@matrix.org> * Update gomatrixserverlib and use Unsigned AS event prop * Fixes * Fix sync server comment * Remove unnecessary printlns * Use logrus Fields * Worker state methods * Remove sillyness * Fix up event filtering * Handle transaction event limit in loop * Switch to using a sequence for transaction IDs * Don't verify self-signed AS certificates * Fix logging * Use gmsl.Event instead of AS-only event in transactions Also clear up the logic on lookupStateEvents a little bit. * Change invalid_txn_id to global (for efficiency) * Use a bool for EventsReady instead of an int
2018-07-05 18:34:59 +02:00
client := &http.Client{
Timeout: transactionTimeout,
}
// Initial check for any leftover events to send from last time
eventCount, err := db.CountEventsWithAppServiceID(ctx, ws.AppService.ID)
if err != nil {
log.WithFields(log.Fields{
"appservice": ws.AppService.ID,
}).WithError(err).Fatal("appservice worker unable to read queued events from DB")
return
}
if eventCount > 0 {
ws.NotifyNewEvents()
}
// Loop forever and keep waiting for more events to send
for {
// Wait for more events if we've sent all the events in the database
ws.WaitForNewEvents()
// Batch events up into a transaction
transactionJSON, txnID, maxEventID, eventsRemaining, err := createTransaction(ctx, db, ws.AppService.ID)
if err != nil {
log.WithFields(log.Fields{
"appservice": ws.AppService.ID,
}).WithError(err).Fatal("appservice worker unable to create transaction")
return
}
// Send the events off to the application service
// Backoff if the application service does not respond
err = send(client, ws.AppService, txnID, transactionJSON)
if err != nil {
// Backoff
backoff(&ws, err)
continue
}
// We sent successfully, hooray!
ws.Backoff = 0
// Transactions have a maximum event size, so there may still be some events
// left over to send. Keep sending until none are left
if !eventsRemaining {
ws.FinishEventProcessing()
}
// Remove sent events from the DB
err = db.RemoveEventsBeforeAndIncludingID(ctx, ws.AppService.ID, maxEventID)
if err != nil {
log.WithFields(log.Fields{
"appservice": ws.AppService.ID,
}).WithError(err).Fatal("unable to remove appservice events from the database")
return
}
}
}
// backoff pauses the calling goroutine for a 2^some backoff exponent seconds
func backoff(ws *types.ApplicationServiceWorkerState, err error) {
// Calculate how long to backoff for
backoffDuration := time.Duration(math.Pow(2, float64(ws.Backoff)))
backoffSeconds := time.Second * backoffDuration
log.WithFields(log.Fields{
"appservice": ws.AppService.ID,
}).WithError(err).Warnf("unable to send transactions successfully, backing off for %ds",
backoffDuration)
ws.Backoff++
if ws.Backoff > 6 {
ws.Backoff = 6
}
// Backoff
time.Sleep(backoffSeconds)
}
// createTransaction takes in a slice of AS events, stores them in an AS
// transaction, and JSON-encodes the results.
func createTransaction(
ctx context.Context,
db *storage.Database,
appserviceID string,
) (
transactionJSON []byte,
txnID, maxID int,
eventsRemaining bool,
err error,
) {
// Retrieve the latest events from the DB (will return old events if they weren't successfully sent)
txnID, maxID, events, eventsRemaining, err := db.GetEventsWithAppServiceID(ctx, appserviceID, transactionBatchSize)
if err != nil {
log.WithFields(log.Fields{
"appservice": appserviceID,
}).WithError(err).Fatalf("appservice worker unable to read queued events from DB")
return
}
// Check if these events do not already have a transaction ID
if txnID == -1 {
// If not, grab next available ID from the DB
txnID, err = db.GetLatestTxnID(ctx)
if err != nil {
return nil, 0, 0, false, err
}
// Mark new events with current transactionID
if err = db.UpdateTxnIDForEvents(ctx, appserviceID, maxID, txnID); err != nil {
return nil, 0, 0, false, err
}
}
// Create a transaction and store the events inside
transaction := gomatrixserverlib.ApplicationServiceTransaction{
Events: events,
}
transactionJSON, err = json.Marshal(transaction)
if err != nil {
return
}
return
}
// send sends events to an application service. Returns an error if an OK was not
// received back from the application service or the request timed out.
func send(
client *http.Client,
appservice config.ApplicationService,
txnID int,
transaction []byte,
) error {
// POST a transaction to our AS
address := fmt.Sprintf("%s/transactions/%d", appservice.URL, txnID)
resp, err := client.Post(address, "application/json", bytes.NewBuffer(transaction))
if err != nil {
return err
}
defer func() {
err := resp.Body.Close()
if err != nil {
log.WithFields(log.Fields{
"appservice": appservice.ID,
}).WithError(err).Error("unable to close response body from application service")
}
}()
// Check the AS received the events correctly
if resp.StatusCode != http.StatusOK {
// TODO: Handle non-200 error codes from application services
return fmt.Errorf("non-OK status code %d returned from AS", resp.StatusCode)
}
return nil
}