diff --git a/commands.go b/commands.go index 4c3f7bc..a7bdebb 100644 --- a/commands.go +++ b/commands.go @@ -606,9 +606,6 @@ func (handler *CommandHandler) CommandLogout(ce *CommandEvent) { ce.User.removeFromJIDMap(BridgeState{StateEvent: StateLoggedOut}) ce.User.DeleteConnection() ce.User.DeleteSession() - ce.Bridge.DB.BackfillQuery.DeleteAll(ce.User.MXID) - ce.Bridge.DB.HistorySyncQuery.DeleteAllConversations(ce.User.MXID) - ce.Bridge.DB.HistorySyncQuery.DeleteAllMessages(ce.User.MXID) ce.Reply("Logged out successfully.") } @@ -850,10 +847,7 @@ func (handler *CommandHandler) CommandBackfill(ce *CommandEvent) { return } if !ce.Bridge.Config.Bridge.HistorySync.Backfill { - ce.Bot.SendMessageEvent(ce.RoomID, event.EventMessage, &event.MessageEventContent{ - MsgType: event.MsgNotice, - Body: "Backfill is not enabled for this bridge.", - }) + ce.Reply("Backfill is not enabled for this bridge.") return } batchSize := 100 diff --git a/database/upgrades/2022-03-15-prioritized-backfill.go b/database/upgrades/2022-03-15-prioritized-backfill.go index 813e746..20a513e 100644 --- a/database/upgrades/2022-03-15-prioritized-backfill.go +++ b/database/upgrades/2022-03-15-prioritized-backfill.go @@ -1,12 +1,25 @@ package upgrades -import "database/sql" +import ( + "database/sql" + "fmt" +) func init() { upgrades[39] = upgrade{"Add backfill queue", func(tx *sql.Tx, ctx context) error { - _, err := tx.Exec(` + // The queue_id needs to auto-increment every insertion. For SQLite, + // INTEGER PRIMARY KEY is an alias for the ROWID, so it will + // auto-increment. See https://sqlite.org/lang_createtable.html#rowid + // For Postgres, we need to add GENERATED ALWAYS AS IDENTITY for the + // same functionality. + queueIDColumnTypeModifier := "" + if ctx.dialect == Postgres { + queueIDColumnTypeModifier = "GENERATED ALWAYS AS IDENTITY" + } + + _, err := tx.Exec(fmt.Sprintf(` CREATE TABLE backfill_queue ( - queue_id INTEGER PRIMARY KEY, + queue_id INTEGER PRIMARY KEY %s, user_mxid TEXT, type INTEGER NOT NULL, priority INTEGER NOT NULL, @@ -22,34 +35,11 @@ func init() { FOREIGN KEY (user_mxid) REFERENCES "user"(mxid) ON DELETE CASCADE ON UPDATE CASCADE, FOREIGN KEY (portal_jid, portal_receiver) REFERENCES portal(jid, receiver) ON DELETE CASCADE ) - `) + `, queueIDColumnTypeModifier)) if err != nil { return err } - // The queue_id needs to auto-increment every insertion. For SQLite, - // INTEGER PRIMARY KEY is an alias for the ROWID, so it will - // auto-increment. See https://sqlite.org/lang_createtable.html#rowid - // For Postgres, we have to manually add the sequence. - if ctx.dialect == Postgres { - _, err = tx.Exec(` - CREATE SEQUENCE backfill_queue_queue_id_seq - START WITH 1 - OWNED BY backfill_queue.queue_id - `) - if err != nil { - return err - } - _, err = tx.Exec(` - ALTER TABLE backfill_queue - ALTER COLUMN queue_id - SET DEFAULT nextval('backfill_queue_queue_id_seq'::regclass) - `) - if err != nil { - return err - } - } - return err }} } diff --git a/database/upgrades/2022-03-18-historysync-store.go b/database/upgrades/2022-03-18-historysync-store.go index 15564d0..3625069 100644 --- a/database/upgrades/2022-03-18-historysync-store.go +++ b/database/upgrades/2022-03-18-historysync-store.go @@ -6,88 +6,45 @@ import ( func init() { upgrades[40] = upgrade{"Store history syncs for later backfills", func(tx *sql.Tx, ctx context) error { - if ctx.dialect == Postgres { - _, err := tx.Exec(` - CREATE TABLE history_sync_conversation ( - user_mxid TEXT, - conversation_id TEXT, - portal_jid TEXT, - portal_receiver TEXT, - last_message_timestamp TIMESTAMP, - archived BOOLEAN, - pinned INTEGER, - mute_end_time TIMESTAMP, - disappearing_mode INTEGER, - end_of_history_transfer_type INTEGER, - ephemeral_expiration INTEGER, - marked_as_unread BOOLEAN, - unread_count INTEGER, + _, err := tx.Exec(` + CREATE TABLE history_sync_conversation ( + user_mxid TEXT, + conversation_id TEXT, + portal_jid TEXT, + portal_receiver TEXT, + last_message_timestamp TIMESTAMP, + archived BOOLEAN, + pinned INTEGER, + mute_end_time TIMESTAMP, + disappearing_mode INTEGER, + end_of_history_transfer_type INTEGER, + ephemeral_expiration INTEGER, + marked_as_unread BOOLEAN, + unread_count INTEGER, - PRIMARY KEY (user_mxid, conversation_id), - FOREIGN KEY (user_mxid) REFERENCES "user"(mxid) ON DELETE CASCADE ON UPDATE CASCADE, - FOREIGN KEY (portal_jid, portal_receiver) REFERENCES portal(jid, receiver) ON DELETE CASCADE ON UPDATE CASCADE - ) - `) - if err != nil { - return err - } - _, err = tx.Exec(` - CREATE TABLE history_sync_message ( - user_mxid TEXT, - conversation_id TEXT, - message_id TEXT, - timestamp TIMESTAMP, - data BYTEA, + PRIMARY KEY (user_mxid, conversation_id), + FOREIGN KEY (user_mxid) REFERENCES "user"(mxid) ON DELETE CASCADE ON UPDATE CASCADE, + FOREIGN KEY (portal_jid, portal_receiver) REFERENCES portal(jid, receiver) ON DELETE CASCADE ON UPDATE CASCADE + ) + `) + if err != nil { + return err + } + _, err = tx.Exec(` + CREATE TABLE history_sync_message ( + user_mxid TEXT, + conversation_id TEXT, + message_id TEXT, + timestamp TIMESTAMP, + data BYTEA, - PRIMARY KEY (user_mxid, conversation_id, message_id), - FOREIGN KEY (user_mxid) REFERENCES "user"(mxid) ON DELETE CASCADE ON UPDATE CASCADE, - FOREIGN KEY (user_mxid, conversation_id) REFERENCES history_sync_conversation(user_mxid, conversation_id) ON DELETE CASCADE - ) - `) - if err != nil { - return err - } - } else if ctx.dialect == SQLite { - _, err := tx.Exec(` - CREATE TABLE history_sync_conversation ( - user_mxid TEXT, - conversation_id TEXT, - portal_jid TEXT, - portal_receiver TEXT, - last_message_timestamp DATETIME, - archived INTEGER, - pinned INTEGER, - mute_end_time DATETIME, - disappearing_mode INTEGER, - end_of_history_transfer_type INTEGER, - ephemeral_expiration INTEGER, - marked_as_unread INTEGER, - unread_count INTEGER, - - PRIMARY KEY (user_mxid, conversation_id), - FOREIGN KEY (user_mxid) REFERENCES "user"(mxid) ON DELETE CASCADE ON UPDATE CASCADE, - FOREIGN KEY (portal_jid, portal_receiver) REFERENCES portal(jid, receiver) ON DELETE CASCADE ON UPDATE CASCADE - ) - `) - if err != nil { - return err - } - _, err = tx.Exec(` - CREATE TABLE history_sync_message ( - user_mxid TEXT, - conversation_id TEXT, - message_id TEXT, - timestamp DATETIME, - data BLOB, - - PRIMARY KEY (user_mxid, conversation_id, message_id), - FOREIGN KEY (user_mxid) REFERENCES "user"(mxid) ON DELETE CASCADE ON UPDATE CASCADE, - FOREIGN KEY (user_mxid, conversation_id) REFERENCES history_sync_conversation(user_mxid, conversation_id) ON DELETE CASCADE - ) - `) - if err != nil { - return err - } + PRIMARY KEY (user_mxid, conversation_id, message_id), + FOREIGN KEY (user_mxid) REFERENCES "user"(mxid) ON DELETE CASCADE ON UPDATE CASCADE, + FOREIGN KEY (user_mxid, conversation_id) REFERENCES history_sync_conversation(user_mxid, conversation_id) ON DELETE CASCADE + ) + `) + if err != nil { + return err } return nil diff --git a/provisioning.go b/provisioning.go index f149240..114e7cc 100644 --- a/provisioning.go +++ b/provisioning.go @@ -436,9 +436,6 @@ func (prov *ProvisioningAPI) Logout(w http.ResponseWriter, r *http.Request) { user.bridge.Metrics.TrackConnectionState(user.JID, false) user.removeFromJIDMap(BridgeState{StateEvent: StateLoggedOut}) user.DeleteSession() - prov.bridge.DB.BackfillQuery.DeleteAll(user.MXID) - prov.bridge.DB.HistorySyncQuery.DeleteAllConversations(user.MXID) - prov.bridge.DB.HistorySyncQuery.DeleteAllMessages(user.MXID) jsonResponse(w, http.StatusOK, Response{true, "Logged out successfully."}) } diff --git a/user.go b/user.go index db2575d..854535d 100644 --- a/user.go +++ b/user.go @@ -412,6 +412,11 @@ func (user *User) DeleteSession() { user.JID = types.EmptyJID user.Update() } + + // Delete all of the backfill and history sync data. + user.bridge.DB.BackfillQuery.DeleteAll(user.MXID) + user.bridge.DB.HistorySyncQuery.DeleteAllConversations(user.MXID) + user.bridge.DB.HistorySyncQuery.DeleteAllMessages(user.MXID) } func (user *User) IsConnected() bool { @@ -563,11 +568,18 @@ func (user *User) HandleEvent(event interface{}) { go user.tryAutomaticDoublePuppeting() case *events.OfflineSyncPreview: user.log.Infofln("Server says it's going to send %d messages and %d receipts that were missed during downtime", v.Messages, v.Receipts) + go user.sendBridgeState(BridgeState{ + StateEvent: StateBackfilling, + Message: fmt.Sprintf("backfilling %d messages and %d receipts", v.Messages, v.Receipts), + }) case *events.OfflineSyncCompleted: if !user.PhoneRecentlySeen(true) { user.log.Infofln("Offline sync completed, but phone last seen date is still %s - sending phone offline bridge status", user.PhoneLastSeen) go user.sendBridgeState(BridgeState{StateEvent: StateTransientDisconnect, Error: WAPhoneOffline}) } else { + if user.GetPrevBridgeState().StateEvent == StateBackfilling { + user.log.Infoln("Offline sync completed") + } go user.sendBridgeState(BridgeState{StateEvent: StateConnected}) } case *events.AppStateSyncComplete: