From d320c4ab08cb3b5ef00c1b4db483659f4be89bc4 Mon Sep 17 00:00:00 2001 From: Tulir Asokan Date: Thu, 2 Mar 2023 17:24:41 +0200 Subject: [PATCH] Add more detailed logging when receiving history syncs --- database/historysync.go | 6 +-- historysync.go | 94 +++++++++++++++++++++++++++++++++++++---- 2 files changed, 87 insertions(+), 13 deletions(-) diff --git a/database/historysync.go b/database/historysync.go index 517ba4e..214440b 100644 --- a/database/historysync.go +++ b/database/historysync.go @@ -243,15 +243,13 @@ func (hsq *HistorySyncQuery) NewMessageWithValues(userID id.UserID, conversation }, nil } -func (hsm *HistorySyncMessage) Insert() { +func (hsm *HistorySyncMessage) Insert() error { _, err := hsm.db.Exec(` INSERT INTO history_sync_message (user_mxid, conversation_id, message_id, timestamp, data, inserted_time) VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT (user_mxid, conversation_id, message_id) DO NOTHING `, hsm.UserID, hsm.ConversationID, hsm.MessageID, hsm.Timestamp, hsm.Data, time.Now()) - if err != nil { - hsm.log.Warnfln("Failed to insert history sync message %s/%s: %v", hsm.ConversationID, hsm.Timestamp, err) - } + return err } func (hsq *HistorySyncQuery) GetMessagesBetween(userID id.UserID, conversationID string, startTime, endTime *time.Time, limit int) (messages []*waProto.WebMessageInfo) { diff --git a/historysync.go b/historysync.go index d1dd386..e8b25de 100644 --- a/historysync.go +++ b/historysync.go @@ -356,22 +356,55 @@ func (user *User) shouldCreatePortalForHistorySync(conv *database.HistorySyncCon } func (user *User) handleHistorySync(backfillQueue *BackfillQueue, evt *waProto.HistorySync) { - if evt == nil || evt.SyncType == nil || evt.GetSyncType() == waProto.HistorySync_INITIAL_STATUS_V3 || evt.GetSyncType() == waProto.HistorySync_PUSH_NAME || evt.GetSyncType() == waProto.HistorySync_NON_BLOCKING_DATA { + if evt == nil || evt.SyncType == nil { return } - description := fmt.Sprintf("type %s, %d conversations, chunk order %d, progress: %d", evt.GetSyncType(), len(evt.GetConversations()), evt.GetChunkOrder(), evt.GetProgress()) - user.log.Infoln("Storing history sync with", description) + log := user.bridge.ZLog.With(). + Str("method", "User.handleHistorySync"). + Str("user_id", user.MXID.String()). + Str("sync_type", evt.GetSyncType().String()). + Uint32("chunk_order", evt.GetChunkOrder()). + Uint32("progress", evt.GetProgress()). + Logger() + if evt.GetGlobalSettings() != nil { + log.Debug().Interface("global_settings", evt.GetGlobalSettings()).Msg("Got global settings in history sync") + } + if evt.GetSyncType() == waProto.HistorySync_INITIAL_STATUS_V3 || evt.GetSyncType() == waProto.HistorySync_PUSH_NAME || evt.GetSyncType() == waProto.HistorySync_NON_BLOCKING_DATA { + log.Debug(). + Int("conversation_count", len(evt.GetConversations())). + Int("pushname_count", len(evt.GetPushnames())). + Int("status_count", len(evt.GetStatusV3Messages())). + Int("recent_sticker_count", len(evt.GetRecentStickers())). + Int("past_participant_count", len(evt.GetPastParticipants())). + Msg("Ignoring history sync") + return + } + log.Info(). + Int("conversation_count", len(evt.GetConversations())). + Int("past_participant_count", len(evt.GetPastParticipants())). + Msg("Storing history sync") + successfullySavedTotal := 0 + totalMessageCount := 0 for _, conv := range evt.GetConversations() { jid, err := types.ParseJID(conv.GetId()) if err != nil { - user.log.Warnfln("Failed to parse chat JID '%s' in history sync: %v", conv.GetId(), err) + totalMessageCount += len(conv.GetMessages()) + log.Warn().Err(err). + Str("chat_jid", conv.GetId()). + Int("msg_count", len(conv.GetMessages())). + Msg("Failed to parse chat JID in history sync") continue } else if jid.Server == types.BroadcastServer { - user.log.Debugfln("Skipping broadcast list %s in history sync", jid) + log.Debug().Str("chat_jid", jid.String()).Msg("Skipping broadcast list in history sync") continue } + totalMessageCount += len(conv.GetMessages()) portal := user.GetPortalByJID(jid) + log := log.With(). + Str("chat_jid", portal.Key.JID.String()). + Int("msg_count", len(conv.GetMessages())). + Logger() historySyncConversation := user.bridge.DB.HistorySync.NewConversationWithValues( user.MXID, @@ -387,33 +420,76 @@ func (user *User) handleHistorySync(backfillQueue *BackfillQueue, evt *waProto.H conv.GetMarkedAsUnread(), conv.GetUnreadCount()) historySyncConversation.Upsert() + var minTime, maxTime time.Time + var minTimeIndex, maxTimeIndex int - for _, rawMsg := range conv.GetMessages() { + successfullySaved := 0 + unsupportedTypes := 0 + for i, rawMsg := range conv.GetMessages() { // Don't store messages that will just be skipped. msgEvt, err := user.Client.ParseWebMessage(portal.Key.JID, rawMsg.GetMessage()) if err != nil { - user.log.Warnln("Dropping historical message due to info parse error:", err) + log.Warn().Err(err). + Int("msg_index", i). + Str("msg_id", rawMsg.GetMessage().GetKey().GetId()). + Uint64("msg_time_seconds", rawMsg.GetMessage().GetMessageTimestamp()). + Msg("Dropping historical message due to parse error") continue } + if minTime.IsZero() || msgEvt.Info.Timestamp.Before(minTime) { + minTime = msgEvt.Info.Timestamp + minTimeIndex = i + } + if maxTime.IsZero() || msgEvt.Info.Timestamp.After(maxTime) { + maxTime = msgEvt.Info.Timestamp + maxTimeIndex = i + } msgType := getMessageType(msgEvt.Message) if msgType == "unknown" || msgType == "ignore" || msgType == "unknown_protocol" { + unsupportedTypes++ continue } // Don't store unsupported messages. if !containsSupportedMessage(msgEvt.Message) { + unsupportedTypes++ continue } message, err := user.bridge.DB.HistorySync.NewMessageWithValues(user.MXID, conv.GetId(), msgEvt.Info.ID, rawMsg) if err != nil { - user.log.Warnfln("Failed to save message %s in %s. Error: %+v", msgEvt.Info.ID, conv.GetId(), err) + log.Error().Err(err). + Int("msg_index", i). + Str("msg_id", msgEvt.Info.ID). + Time("msg_time", msgEvt.Info.Timestamp). + Msg("Failed to save historical message") continue } - message.Insert() + err = message.Insert() + if err != nil { + log.Error().Err(err). + Int("msg_index", i). + Str("msg_id", msgEvt.Info.ID). + Time("msg_time", msgEvt.Info.Timestamp). + Msg("Failed to save historical message") + } + successfullySaved++ } + successfullySavedTotal += successfullySaved + log.Debug(). + Int("saved_count", successfullySaved). + Int("unsupported_msg_type_count", unsupportedTypes). + Time("lowest_time", minTime). + Int("lowest_time_index", minTimeIndex). + Time("highest_time", maxTime). + Int("highest_time_index", maxTimeIndex). + Msg("Saved messages from history sync conversation") } + log.Info(). + Int("total_saved_count", successfullySavedTotal). + Int("total_message_count", totalMessageCount). + Msg("Finished storing history sync") // If this was the initial bootstrap, enqueue immediate backfills for the // most recent portals. If it's the last history sync event, start