Improve errors in state store migration edge cases

This commit is contained in:
Tulir Asokan 2019-08-25 17:25:19 +03:00
parent ab91d326fd
commit 3be9aa2319
13 changed files with 42 additions and 23 deletions

View file

@ -43,7 +43,7 @@ type Config struct {
MaxIdleConns int `yaml:"max_idle_conns"` MaxIdleConns int `yaml:"max_idle_conns"`
} `yaml:"database"` } `yaml:"database"`
StateStore string `yaml:"state_store_path"` StateStore string `yaml:"state_store_path,omitempty"`
ID string `yaml:"id"` ID string `yaml:"id"`
Bot struct { Bot struct {

View file

@ -6,9 +6,9 @@ import (
) )
func init() { func init() {
upgrades[0] = upgrade{"Initial schema", func(dialect Dialect, tx *sql.Tx, db *sql.DB) error { upgrades[0] = upgrade{"Initial schema", func(tx *sql.Tx, ctx context) error {
var byteType string var byteType string
if dialect == SQLite { if ctx.dialect == SQLite {
byteType = "BLOB" byteType = "BLOB"
} else { } else {
byteType = "bytea" byteType = "bytea"

View file

@ -5,8 +5,8 @@ import (
) )
func init() { func init() {
upgrades[1] = upgrade{"Add ON DELETE CASCADE to message table", func(dialect Dialect, tx *sql.Tx, db *sql.DB) error { upgrades[1] = upgrade{"Add ON DELETE CASCADE to message table", func(tx *sql.Tx, ctx context) error {
if dialect == SQLite { if ctx.dialect == SQLite {
// SQLite doesn't support constraint updates, but it isn't that careful with constraints anyway. // SQLite doesn't support constraint updates, but it isn't that careful with constraints anyway.
return nil return nil
} }

View file

@ -5,7 +5,7 @@ import (
) )
func init() { func init() {
upgrades[2] = upgrade{"Add timestamp column to messages", func(dialect Dialect, tx *sql.Tx, db *sql.DB) error { upgrades[2] = upgrade{"Add timestamp column to messages", func(tx *sql.Tx, ctx context) error {
_, err := tx.Exec("ALTER TABLE message ADD COLUMN timestamp BIGINT NOT NULL DEFAULT 0") _, err := tx.Exec("ALTER TABLE message ADD COLUMN timestamp BIGINT NOT NULL DEFAULT 0")
if err != nil { if err != nil {
return err return err

View file

@ -5,7 +5,7 @@ import (
) )
func init() { func init() {
upgrades[3] = upgrade{"Add last_connection column to users", func(dialect Dialect, tx *sql.Tx, db *sql.DB) error { upgrades[3] = upgrade{"Add last_connection column to users", func(tx *sql.Tx, ctx context) error {
_, err := tx.Exec(`ALTER TABLE "user" ADD COLUMN last_connection BIGINT NOT NULL DEFAULT 0`) _, err := tx.Exec(`ALTER TABLE "user" ADD COLUMN last_connection BIGINT NOT NULL DEFAULT 0`)
if err != nil { if err != nil {
return err return err

View file

@ -8,8 +8,8 @@ import (
func init() { func init() {
var keys = []string{"imageMessage", "contactMessage", "locationMessage", "extendedTextMessage", "documentMessage", "audioMessage", "videoMessage"} var keys = []string{"imageMessage", "contactMessage", "locationMessage", "extendedTextMessage", "documentMessage", "audioMessage", "videoMessage"}
upgrades[4] = upgrade{"Update message content to new protocol version. This may take a while.", func(dialect Dialect, tx *sql.Tx, db *sql.DB) error { upgrades[4] = upgrade{"Update message content to new protocol version. This may take a while.", func(tx *sql.Tx, ctx context) error {
rows, err := db.Query("SELECT mxid, content FROM message") rows, err := ctx.db.Query("SELECT mxid, content FROM message")
if err != nil { if err != nil {
return err return err
} }

View file

@ -5,7 +5,7 @@ import (
) )
func init() { func init() {
upgrades[5] = upgrade{"Add columns to store custom puppet info", func(dialect Dialect, tx *sql.Tx, db *sql.DB) error { upgrades[5] = upgrade{"Add columns to store custom puppet info", func(tx *sql.Tx, ctx context) error {
_, err := tx.Exec(`ALTER TABLE puppet ADD COLUMN custom_mxid VARCHAR(255)`) _, err := tx.Exec(`ALTER TABLE puppet ADD COLUMN custom_mxid VARCHAR(255)`)
if err != nil { if err != nil {
return err return err

View file

@ -5,7 +5,7 @@ import (
) )
func init() { func init() {
upgrades[6] = upgrade{"Add user-portal mapping table", func(dialect Dialect, tx *sql.Tx, db *sql.DB) error { upgrades[6] = upgrade{"Add user-portal mapping table", func(tx *sql.Tx, ctx context) error {
_, err := tx.Exec(`CREATE TABLE user_portal ( _, err := tx.Exec(`CREATE TABLE user_portal (
user_jid VARCHAR(255), user_jid VARCHAR(255),
portal_jid VARCHAR(255), portal_jid VARCHAR(255),

View file

@ -5,7 +5,7 @@ import (
) )
func init() { func init() {
upgrades[7] = upgrade{"Add columns to store avatar MXC URIs", func(dialect Dialect, tx *sql.Tx, db *sql.DB) error { upgrades[7] = upgrade{"Add columns to store avatar MXC URIs", func(tx *sql.Tx, ctx context) error {
_, err := tx.Exec(`ALTER TABLE puppet ADD COLUMN avatar_url VARCHAR(255)`) _, err := tx.Exec(`ALTER TABLE puppet ADD COLUMN avatar_url VARCHAR(255)`)
if err != nil { if err != nil {
return err return err

View file

@ -5,7 +5,7 @@ import (
) )
func init() { func init() {
upgrades[8] = upgrade{"Add columns to store portal in filtering community meta", func(dialect Dialect, tx *sql.Tx, db *sql.DB) error { upgrades[8] = upgrade{"Add columns to store portal in filtering community meta", func(tx *sql.Tx, ctx context) error {
_, err := tx.Exec(`ALTER TABLE user_portal ADD COLUMN in_community BOOLEAN NOT NULL DEFAULT FALSE`) _, err := tx.Exec(`ALTER TABLE user_portal ADD COLUMN in_community BOOLEAN NOT NULL DEFAULT FALSE`)
return err return err
}} }}

View file

@ -5,6 +5,7 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"os"
"strings" "strings"
"maunium.net/go/mautrix" "maunium.net/go/mautrix"
@ -78,15 +79,16 @@ func init() {
user_id VARCHAR(255) PRIMARY KEY user_id VARCHAR(255) PRIMARY KEY
)` )`
upgrades[9] = upgrade{"Move state store to main DB", func(dialect Dialect, tx *sql.Tx, db *sql.DB) error { upgrades[9] = upgrade{"Move state store to main DB", func(tx *sql.Tx, ctx context) error {
store := appservice.NewBasicStateStore().(*appservice.BasicStateStore) store := appservice.NewBasicStateStore().(*appservice.BasicStateStore)
if dialect == Postgres { if ctx.dialect == Postgres {
roomStateTable = strings.Replace(roomStateTable, "TEXT", "JSONB", 1) roomStateTable = strings.Replace(roomStateTable, "TEXT", "JSONB", 1)
} }
if data, err := ioutil.ReadFile("mx-state.json"); err != nil { if data, err := ioutil.ReadFile("mx-state.json"); err != nil {
return err ctx.log.Debugln("mx-state.json not found, not migrating state store")
return nil
} else if err = json.Unmarshal(data, &store); err != nil { } else if err = json.Unmarshal(data, &store); err != nil {
return err return err
} else if _, err := tx.Exec(userProfileTable); err != nil { } else if _, err := tx.Exec(userProfileTable); err != nil {
@ -101,6 +103,8 @@ func init() {
return err return err
} else if err = migratePowerLevels(tx, store.PowerLevels); err != nil { } else if err = migratePowerLevels(tx, store.PowerLevels); err != nil {
return err return err
} else if err = os.Rename("mx-state.json", "mx-state.json.bak"); err != nil {
return err
} }
return nil return nil
}} }}

View file

@ -15,11 +15,17 @@ const (
SQLite SQLite
) )
type upgradeFunc func(Dialect, *sql.Tx, *sql.DB) error type upgradeFunc func(*sql.Tx, context) error
type context struct {
dialect Dialect
db *sql.DB
log log.Logger
}
type upgrade struct { type upgrade struct {
message string message string
fn upgradeFunc fn upgradeFunc
} }
const NumberOfUpgrades = 10 const NumberOfUpgrades = 10
@ -28,7 +34,7 @@ var upgrades [NumberOfUpgrades]upgrade
var UnsupportedDatabaseVersion = fmt.Errorf("unsupported database version") var UnsupportedDatabaseVersion = fmt.Errorf("unsupported database version")
func getVersion(dialect Dialect, db *sql.DB) (int, error) { func GetVersion(db *sql.DB) (int, error) {
_, err := db.Exec("CREATE TABLE IF NOT EXISTS version (version INTEGER)") _, err := db.Exec("CREATE TABLE IF NOT EXISTS version (version INTEGER)")
if err != nil { if err != nil {
return -1, err return -1, err
@ -42,7 +48,7 @@ func getVersion(dialect Dialect, db *sql.DB) (int, error) {
return version, nil return version, nil
} }
func setVersion(dialect Dialect, tx *sql.Tx, version int) error { func SetVersion(tx *sql.Tx, version int) error {
_, err := tx.Exec("DELETE FROM version") _, err := tx.Exec("DELETE FROM version")
if err != nil { if err != nil {
return err return err
@ -62,7 +68,7 @@ func Run(log log.Logger, dialectName string, db *sql.DB) error {
return fmt.Errorf("unknown dialect %s", dialectName) return fmt.Errorf("unknown dialect %s", dialectName)
} }
version, err := getVersion(dialect, db) version, err := GetVersion(db)
if err != nil { if err != nil {
return err return err
} }
@ -78,11 +84,11 @@ func Run(log log.Logger, dialectName string, db *sql.DB) error {
if err != nil { if err != nil {
return err return err
} }
err = upgrade.fn(dialect, tx, db) err = upgrade.fn(tx, context{dialect, db, log})
if err != nil { if err != nil {
return err return err
} }
err = setVersion(dialect, tx, version+i+1) err = SetVersion(tx, version+i+1)
if err != nil { if err != nil {
return err return err
} }

View file

@ -164,6 +164,15 @@ func (bridge *Bridge) Init() {
os.Exit(14) os.Exit(14)
} }
if len(bridge.Config.AppService.StateStore) > 0 && bridge.Config.AppService.StateStore != "./mx-state.json" {
version, err := upgrades.GetVersion(bridge.DB.DB)
if version < 0 && err == nil {
bridge.Log.Fatalln("Non-standard state store path. Please move the state store to ./mx-state.json " +
"and update the config. The state store will be migrated into the db on the next launch.")
os.Exit(18)
}
}
bridge.Log.Debugln("Initializing state store") bridge.Log.Debugln("Initializing state store")
bridge.StateStore = database.NewSQLStateStore(bridge.DB) bridge.StateStore = database.NewSQLStateStore(bridge.DB)
bridge.AS.StateStore = bridge.StateStore bridge.AS.StateStore = bridge.StateStore