Add more detailed logging when receiving history syncs

This commit is contained in:
Tulir Asokan 2023-03-02 17:24:41 +02:00
parent 5e28343350
commit d320c4ab08
2 changed files with 87 additions and 13 deletions

View File

@ -243,15 +243,13 @@ func (hsq *HistorySyncQuery) NewMessageWithValues(userID id.UserID, conversation
}, nil
}
func (hsm *HistorySyncMessage) Insert() {
func (hsm *HistorySyncMessage) Insert() error {
_, err := hsm.db.Exec(`
INSERT INTO history_sync_message (user_mxid, conversation_id, message_id, timestamp, data, inserted_time)
VALUES ($1, $2, $3, $4, $5, $6)
ON CONFLICT (user_mxid, conversation_id, message_id) DO NOTHING
`, hsm.UserID, hsm.ConversationID, hsm.MessageID, hsm.Timestamp, hsm.Data, time.Now())
if err != nil {
hsm.log.Warnfln("Failed to insert history sync message %s/%s: %v", hsm.ConversationID, hsm.Timestamp, err)
}
return err
}
func (hsq *HistorySyncQuery) GetMessagesBetween(userID id.UserID, conversationID string, startTime, endTime *time.Time, limit int) (messages []*waProto.WebMessageInfo) {

View File

@ -356,22 +356,55 @@ func (user *User) shouldCreatePortalForHistorySync(conv *database.HistorySyncCon
}
func (user *User) handleHistorySync(backfillQueue *BackfillQueue, evt *waProto.HistorySync) {
if evt == nil || evt.SyncType == nil || evt.GetSyncType() == waProto.HistorySync_INITIAL_STATUS_V3 || evt.GetSyncType() == waProto.HistorySync_PUSH_NAME || evt.GetSyncType() == waProto.HistorySync_NON_BLOCKING_DATA {
if evt == nil || evt.SyncType == nil {
return
}
description := fmt.Sprintf("type %s, %d conversations, chunk order %d, progress: %d", evt.GetSyncType(), len(evt.GetConversations()), evt.GetChunkOrder(), evt.GetProgress())
user.log.Infoln("Storing history sync with", description)
log := user.bridge.ZLog.With().
Str("method", "User.handleHistorySync").
Str("user_id", user.MXID.String()).
Str("sync_type", evt.GetSyncType().String()).
Uint32("chunk_order", evt.GetChunkOrder()).
Uint32("progress", evt.GetProgress()).
Logger()
if evt.GetGlobalSettings() != nil {
log.Debug().Interface("global_settings", evt.GetGlobalSettings()).Msg("Got global settings in history sync")
}
if evt.GetSyncType() == waProto.HistorySync_INITIAL_STATUS_V3 || evt.GetSyncType() == waProto.HistorySync_PUSH_NAME || evt.GetSyncType() == waProto.HistorySync_NON_BLOCKING_DATA {
log.Debug().
Int("conversation_count", len(evt.GetConversations())).
Int("pushname_count", len(evt.GetPushnames())).
Int("status_count", len(evt.GetStatusV3Messages())).
Int("recent_sticker_count", len(evt.GetRecentStickers())).
Int("past_participant_count", len(evt.GetPastParticipants())).
Msg("Ignoring history sync")
return
}
log.Info().
Int("conversation_count", len(evt.GetConversations())).
Int("past_participant_count", len(evt.GetPastParticipants())).
Msg("Storing history sync")
successfullySavedTotal := 0
totalMessageCount := 0
for _, conv := range evt.GetConversations() {
jid, err := types.ParseJID(conv.GetId())
if err != nil {
user.log.Warnfln("Failed to parse chat JID '%s' in history sync: %v", conv.GetId(), err)
totalMessageCount += len(conv.GetMessages())
log.Warn().Err(err).
Str("chat_jid", conv.GetId()).
Int("msg_count", len(conv.GetMessages())).
Msg("Failed to parse chat JID in history sync")
continue
} else if jid.Server == types.BroadcastServer {
user.log.Debugfln("Skipping broadcast list %s in history sync", jid)
log.Debug().Str("chat_jid", jid.String()).Msg("Skipping broadcast list in history sync")
continue
}
totalMessageCount += len(conv.GetMessages())
portal := user.GetPortalByJID(jid)
log := log.With().
Str("chat_jid", portal.Key.JID.String()).
Int("msg_count", len(conv.GetMessages())).
Logger()
historySyncConversation := user.bridge.DB.HistorySync.NewConversationWithValues(
user.MXID,
@ -387,33 +420,76 @@ func (user *User) handleHistorySync(backfillQueue *BackfillQueue, evt *waProto.H
conv.GetMarkedAsUnread(),
conv.GetUnreadCount())
historySyncConversation.Upsert()
var minTime, maxTime time.Time
var minTimeIndex, maxTimeIndex int
for _, rawMsg := range conv.GetMessages() {
successfullySaved := 0
unsupportedTypes := 0
for i, rawMsg := range conv.GetMessages() {
// Don't store messages that will just be skipped.
msgEvt, err := user.Client.ParseWebMessage(portal.Key.JID, rawMsg.GetMessage())
if err != nil {
user.log.Warnln("Dropping historical message due to info parse error:", err)
log.Warn().Err(err).
Int("msg_index", i).
Str("msg_id", rawMsg.GetMessage().GetKey().GetId()).
Uint64("msg_time_seconds", rawMsg.GetMessage().GetMessageTimestamp()).
Msg("Dropping historical message due to parse error")
continue
}
if minTime.IsZero() || msgEvt.Info.Timestamp.Before(minTime) {
minTime = msgEvt.Info.Timestamp
minTimeIndex = i
}
if maxTime.IsZero() || msgEvt.Info.Timestamp.After(maxTime) {
maxTime = msgEvt.Info.Timestamp
maxTimeIndex = i
}
msgType := getMessageType(msgEvt.Message)
if msgType == "unknown" || msgType == "ignore" || msgType == "unknown_protocol" {
unsupportedTypes++
continue
}
// Don't store unsupported messages.
if !containsSupportedMessage(msgEvt.Message) {
unsupportedTypes++
continue
}
message, err := user.bridge.DB.HistorySync.NewMessageWithValues(user.MXID, conv.GetId(), msgEvt.Info.ID, rawMsg)
if err != nil {
user.log.Warnfln("Failed to save message %s in %s. Error: %+v", msgEvt.Info.ID, conv.GetId(), err)
log.Error().Err(err).
Int("msg_index", i).
Str("msg_id", msgEvt.Info.ID).
Time("msg_time", msgEvt.Info.Timestamp).
Msg("Failed to save historical message")
continue
}
message.Insert()
err = message.Insert()
if err != nil {
log.Error().Err(err).
Int("msg_index", i).
Str("msg_id", msgEvt.Info.ID).
Time("msg_time", msgEvt.Info.Timestamp).
Msg("Failed to save historical message")
}
successfullySaved++
}
successfullySavedTotal += successfullySaved
log.Debug().
Int("saved_count", successfullySaved).
Int("unsupported_msg_type_count", unsupportedTypes).
Time("lowest_time", minTime).
Int("lowest_time_index", minTimeIndex).
Time("highest_time", maxTime).
Int("highest_time_index", maxTimeIndex).
Msg("Saved messages from history sync conversation")
}
log.Info().
Int("total_saved_count", successfullySavedTotal).
Int("total_message_count", totalMessageCount).
Msg("Finished storing history sync")
// If this was the initial bootstrap, enqueue immediate backfills for the
// most recent portals. If it's the last history sync event, start