Updates from CR

Co-authored-by: Tulir Asokan <tulir@maunium.net>
This commit is contained in:
Sumner Evans 2022-04-06 08:45:45 -06:00
parent 83d397900f
commit c664e5f107
No known key found for this signature in database
GPG key ID: 8904527AB50022FD
5 changed files with 67 additions and 117 deletions

View file

@ -606,9 +606,6 @@ func (handler *CommandHandler) CommandLogout(ce *CommandEvent) {
ce.User.removeFromJIDMap(BridgeState{StateEvent: StateLoggedOut})
ce.User.DeleteConnection()
ce.User.DeleteSession()
ce.Bridge.DB.BackfillQuery.DeleteAll(ce.User.MXID)
ce.Bridge.DB.HistorySyncQuery.DeleteAllConversations(ce.User.MXID)
ce.Bridge.DB.HistorySyncQuery.DeleteAllMessages(ce.User.MXID)
ce.Reply("Logged out successfully.")
}
@ -850,10 +847,7 @@ func (handler *CommandHandler) CommandBackfill(ce *CommandEvent) {
return
}
if !ce.Bridge.Config.Bridge.HistorySync.Backfill {
ce.Bot.SendMessageEvent(ce.RoomID, event.EventMessage, &event.MessageEventContent{
MsgType: event.MsgNotice,
Body: "Backfill is not enabled for this bridge.",
})
ce.Reply("Backfill is not enabled for this bridge.")
return
}
batchSize := 100

View file

@ -1,12 +1,25 @@
package upgrades
import "database/sql"
import (
"database/sql"
"fmt"
)
func init() {
upgrades[39] = upgrade{"Add backfill queue", func(tx *sql.Tx, ctx context) error {
_, err := tx.Exec(`
// The queue_id needs to auto-increment every insertion. For SQLite,
// INTEGER PRIMARY KEY is an alias for the ROWID, so it will
// auto-increment. See https://sqlite.org/lang_createtable.html#rowid
// For Postgres, we need to add GENERATED ALWAYS AS IDENTITY for the
// same functionality.
queueIDColumnTypeModifier := ""
if ctx.dialect == Postgres {
queueIDColumnTypeModifier = "GENERATED ALWAYS AS IDENTITY"
}
_, err := tx.Exec(fmt.Sprintf(`
CREATE TABLE backfill_queue (
queue_id INTEGER PRIMARY KEY,
queue_id INTEGER PRIMARY KEY %s,
user_mxid TEXT,
type INTEGER NOT NULL,
priority INTEGER NOT NULL,
@ -22,34 +35,11 @@ func init() {
FOREIGN KEY (user_mxid) REFERENCES "user"(mxid) ON DELETE CASCADE ON UPDATE CASCADE,
FOREIGN KEY (portal_jid, portal_receiver) REFERENCES portal(jid, receiver) ON DELETE CASCADE
)
`)
`, queueIDColumnTypeModifier))
if err != nil {
return err
}
// The queue_id needs to auto-increment every insertion. For SQLite,
// INTEGER PRIMARY KEY is an alias for the ROWID, so it will
// auto-increment. See https://sqlite.org/lang_createtable.html#rowid
// For Postgres, we have to manually add the sequence.
if ctx.dialect == Postgres {
_, err = tx.Exec(`
CREATE SEQUENCE backfill_queue_queue_id_seq
START WITH 1
OWNED BY backfill_queue.queue_id
`)
if err != nil {
return err
}
_, err = tx.Exec(`
ALTER TABLE backfill_queue
ALTER COLUMN queue_id
SET DEFAULT nextval('backfill_queue_queue_id_seq'::regclass)
`)
if err != nil {
return err
}
}
return err
}}
}

View file

@ -6,88 +6,45 @@ import (
func init() {
upgrades[40] = upgrade{"Store history syncs for later backfills", func(tx *sql.Tx, ctx context) error {
if ctx.dialect == Postgres {
_, err := tx.Exec(`
CREATE TABLE history_sync_conversation (
user_mxid TEXT,
conversation_id TEXT,
portal_jid TEXT,
portal_receiver TEXT,
last_message_timestamp TIMESTAMP,
archived BOOLEAN,
pinned INTEGER,
mute_end_time TIMESTAMP,
disappearing_mode INTEGER,
end_of_history_transfer_type INTEGER,
ephemeral_expiration INTEGER,
marked_as_unread BOOLEAN,
unread_count INTEGER,
_, err := tx.Exec(`
CREATE TABLE history_sync_conversation (
user_mxid TEXT,
conversation_id TEXT,
portal_jid TEXT,
portal_receiver TEXT,
last_message_timestamp TIMESTAMP,
archived BOOLEAN,
pinned INTEGER,
mute_end_time TIMESTAMP,
disappearing_mode INTEGER,
end_of_history_transfer_type INTEGER,
ephemeral_expiration INTEGER,
marked_as_unread BOOLEAN,
unread_count INTEGER,
PRIMARY KEY (user_mxid, conversation_id),
FOREIGN KEY (user_mxid) REFERENCES "user"(mxid) ON DELETE CASCADE ON UPDATE CASCADE,
FOREIGN KEY (portal_jid, portal_receiver) REFERENCES portal(jid, receiver) ON DELETE CASCADE ON UPDATE CASCADE
)
`)
if err != nil {
return err
}
_, err = tx.Exec(`
CREATE TABLE history_sync_message (
user_mxid TEXT,
conversation_id TEXT,
message_id TEXT,
timestamp TIMESTAMP,
data BYTEA,
PRIMARY KEY (user_mxid, conversation_id),
FOREIGN KEY (user_mxid) REFERENCES "user"(mxid) ON DELETE CASCADE ON UPDATE CASCADE,
FOREIGN KEY (portal_jid, portal_receiver) REFERENCES portal(jid, receiver) ON DELETE CASCADE ON UPDATE CASCADE
)
`)
if err != nil {
return err
}
_, err = tx.Exec(`
CREATE TABLE history_sync_message (
user_mxid TEXT,
conversation_id TEXT,
message_id TEXT,
timestamp TIMESTAMP,
data BYTEA,
PRIMARY KEY (user_mxid, conversation_id, message_id),
FOREIGN KEY (user_mxid) REFERENCES "user"(mxid) ON DELETE CASCADE ON UPDATE CASCADE,
FOREIGN KEY (user_mxid, conversation_id) REFERENCES history_sync_conversation(user_mxid, conversation_id) ON DELETE CASCADE
)
`)
if err != nil {
return err
}
} else if ctx.dialect == SQLite {
_, err := tx.Exec(`
CREATE TABLE history_sync_conversation (
user_mxid TEXT,
conversation_id TEXT,
portal_jid TEXT,
portal_receiver TEXT,
last_message_timestamp DATETIME,
archived INTEGER,
pinned INTEGER,
mute_end_time DATETIME,
disappearing_mode INTEGER,
end_of_history_transfer_type INTEGER,
ephemeral_expiration INTEGER,
marked_as_unread INTEGER,
unread_count INTEGER,
PRIMARY KEY (user_mxid, conversation_id),
FOREIGN KEY (user_mxid) REFERENCES "user"(mxid) ON DELETE CASCADE ON UPDATE CASCADE,
FOREIGN KEY (portal_jid, portal_receiver) REFERENCES portal(jid, receiver) ON DELETE CASCADE ON UPDATE CASCADE
)
`)
if err != nil {
return err
}
_, err = tx.Exec(`
CREATE TABLE history_sync_message (
user_mxid TEXT,
conversation_id TEXT,
message_id TEXT,
timestamp DATETIME,
data BLOB,
PRIMARY KEY (user_mxid, conversation_id, message_id),
FOREIGN KEY (user_mxid) REFERENCES "user"(mxid) ON DELETE CASCADE ON UPDATE CASCADE,
FOREIGN KEY (user_mxid, conversation_id) REFERENCES history_sync_conversation(user_mxid, conversation_id) ON DELETE CASCADE
)
`)
if err != nil {
return err
}
PRIMARY KEY (user_mxid, conversation_id, message_id),
FOREIGN KEY (user_mxid) REFERENCES "user"(mxid) ON DELETE CASCADE ON UPDATE CASCADE,
FOREIGN KEY (user_mxid, conversation_id) REFERENCES history_sync_conversation(user_mxid, conversation_id) ON DELETE CASCADE
)
`)
if err != nil {
return err
}
return nil

View file

@ -436,9 +436,6 @@ func (prov *ProvisioningAPI) Logout(w http.ResponseWriter, r *http.Request) {
user.bridge.Metrics.TrackConnectionState(user.JID, false)
user.removeFromJIDMap(BridgeState{StateEvent: StateLoggedOut})
user.DeleteSession()
prov.bridge.DB.BackfillQuery.DeleteAll(user.MXID)
prov.bridge.DB.HistorySyncQuery.DeleteAllConversations(user.MXID)
prov.bridge.DB.HistorySyncQuery.DeleteAllMessages(user.MXID)
jsonResponse(w, http.StatusOK, Response{true, "Logged out successfully."})
}

12
user.go
View file

@ -412,6 +412,11 @@ func (user *User) DeleteSession() {
user.JID = types.EmptyJID
user.Update()
}
// Delete all of the backfill and history sync data.
user.bridge.DB.BackfillQuery.DeleteAll(user.MXID)
user.bridge.DB.HistorySyncQuery.DeleteAllConversations(user.MXID)
user.bridge.DB.HistorySyncQuery.DeleteAllMessages(user.MXID)
}
func (user *User) IsConnected() bool {
@ -563,11 +568,18 @@ func (user *User) HandleEvent(event interface{}) {
go user.tryAutomaticDoublePuppeting()
case *events.OfflineSyncPreview:
user.log.Infofln("Server says it's going to send %d messages and %d receipts that were missed during downtime", v.Messages, v.Receipts)
go user.sendBridgeState(BridgeState{
StateEvent: StateBackfilling,
Message: fmt.Sprintf("backfilling %d messages and %d receipts", v.Messages, v.Receipts),
})
case *events.OfflineSyncCompleted:
if !user.PhoneRecentlySeen(true) {
user.log.Infofln("Offline sync completed, but phone last seen date is still %s - sending phone offline bridge status", user.PhoneLastSeen)
go user.sendBridgeState(BridgeState{StateEvent: StateTransientDisconnect, Error: WAPhoneOffline})
} else {
if user.GetPrevBridgeState().StateEvent == StateBackfilling {
user.log.Infoln("Offline sync completed")
}
go user.sendBridgeState(BridgeState{StateEvent: StateConnected})
}
case *events.AppStateSyncComplete: