Handle disappearing messages in history sync properly

This commit is contained in:
Tulir Asokan 2022-03-14 23:48:41 +02:00
parent 973f80d7a7
commit df64bb0e10

View file

@ -50,6 +50,9 @@ type wrappedInfo struct {
*types.MessageInfo *types.MessageInfo
Type database.MessageType Type database.MessageType
Error database.MessageErrorType Error database.MessageErrorType
ExpirationStart uint64
ExpiresIn uint32
} }
type conversationList []*waProto.Conversation type conversationList []*waProto.Conversation
@ -423,7 +426,7 @@ func (portal *Portal) backfill(source *User, messages []*waProto.WebMessageInfo)
if len(converted.ReplyTo) > 0 { if len(converted.ReplyTo) > 0 {
portal.SetReply(converted.Content, converted.ReplyTo) portal.SetReply(converted.Content, converted.ReplyTo)
} }
err := portal.appendBatchEvents(converted, info, &batch.Events, infos) err := portal.appendBatchEvents(converted, info, webMsg.GetEphemeralStartTimestamp(), &batch.Events, infos)
if err != nil { if err != nil {
portal.log.Errorfln("Error handling message %s during backfill: %v", info.ID, err) portal.log.Errorfln("Error handling message %s during backfill: %v", info.ID, err)
} }
@ -493,7 +496,7 @@ func (portal *Portal) parseWebMessageInfo(source *User, webMsg *waProto.WebMessa
return &info return &info
} }
func (portal *Portal) appendBatchEvents(converted *ConvertedMessage, info *types.MessageInfo, eventsArray *[]*event.Event, infoArray *[]*wrappedInfo) error { func (portal *Portal) appendBatchEvents(converted *ConvertedMessage, info *types.MessageInfo, expirationStart uint64, eventsArray *[]*event.Event, infoArray *[]*wrappedInfo) error {
mainEvt, err := portal.wrapBatchEvent(info, converted.Intent, converted.Type, converted.Content, converted.Extra) mainEvt, err := portal.wrapBatchEvent(info, converted.Intent, converted.Type, converted.Content, converted.Extra)
if err != nil { if err != nil {
return err return err
@ -504,10 +507,10 @@ func (portal *Portal) appendBatchEvents(converted *ConvertedMessage, info *types
return err return err
} }
*eventsArray = append(*eventsArray, mainEvt, captionEvt) *eventsArray = append(*eventsArray, mainEvt, captionEvt)
*infoArray = append(*infoArray, &wrappedInfo{info, database.MsgNormal, converted.Error}, nil) *infoArray = append(*infoArray, &wrappedInfo{info, database.MsgNormal, converted.Error, expirationStart, converted.ExpiresIn}, nil)
} else { } else {
*eventsArray = append(*eventsArray, mainEvt) *eventsArray = append(*eventsArray, mainEvt)
*infoArray = append(*infoArray, &wrappedInfo{info, database.MsgNormal, converted.Error}) *infoArray = append(*infoArray, &wrappedInfo{info, database.MsgNormal, converted.Error, expirationStart, converted.ExpiresIn})
} }
if converted.MultiEvent != nil { if converted.MultiEvent != nil {
for _, subEvtContent := range converted.MultiEvent { for _, subEvtContent := range converted.MultiEvent {
@ -563,19 +566,35 @@ func (portal *Portal) finishBatch(eventIDs []id.EventID, infos []*wrappedInfo) {
} else if info, ok := infoMap[types.MessageID(msgID)]; !ok { } else if info, ok := infoMap[types.MessageID(msgID)]; !ok {
portal.log.Warnfln("Didn't find info of message %s (event %s) to register it in the database", msgID, eventID) portal.log.Warnfln("Didn't find info of message %s (event %s) to register it in the database", msgID, eventID)
} else { } else {
portal.markHandled(nil, info.MessageInfo, eventID, true, false, info.Type, info.Error) portal.finishBatchEvt(info, eventID)
} }
} }
} else { } else {
for i := 0; i < len(infos); i++ { for i := 0; i < len(infos); i++ {
if infos[i] != nil { portal.finishBatchEvt(infos[i], eventIDs[i])
portal.markHandled(nil, infos[i].MessageInfo, eventIDs[i], true, false, infos[i].Type, infos[i].Error)
}
} }
portal.log.Infofln("Successfully sent %d events", len(eventIDs)) portal.log.Infofln("Successfully sent %d events", len(eventIDs))
} }
} }
func (portal *Portal) finishBatchEvt(info *wrappedInfo, eventID id.EventID) {
if info == nil {
return
}
portal.markHandled(nil, info.MessageInfo, eventID, true, false, info.Type, info.Error)
if info.ExpiresIn > 0 {
if info.ExpirationStart > 0 {
remainingSeconds := time.Unix(int64(info.ExpirationStart), 0).Add(time.Duration(info.ExpiresIn) * time.Second).Sub(time.Now()).Seconds()
portal.log.Debugfln("Disappearing history sync message: expires in %d, started at %d, remaining %d", info.ExpiresIn, info.ExpirationStart, int(remainingSeconds))
portal.MarkDisappearing(eventID, uint32(remainingSeconds), true)
} else {
portal.log.Debugfln("Disappearing history sync message: expires in %d (not started)", info.ExpiresIn)
portal.MarkDisappearing(eventID, info.ExpiresIn, false)
}
}
}
func (portal *Portal) sendPostBackfillDummy(lastTimestamp time.Time) { func (portal *Portal) sendPostBackfillDummy(lastTimestamp time.Time) {
resp, err := portal.MainIntent().SendMessageEvent(portal.MXID, BackfillEndDummyEvent, struct{}{}) resp, err := portal.MainIntent().SendMessageEvent(portal.MXID, BackfillEndDummyEvent, struct{}{})
if err != nil { if err != nil {