|
|
|
@ -120,8 +120,33 @@ func (rp *RequestPool) cleanPresence(db storage.Presence, cleanupTime time.Durat
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// set a unix timestamp of when it last saw the types
|
|
|
|
|
// this way it can filter based on time
|
|
|
|
|
type PresenceMap struct {
|
|
|
|
|
mu sync.Mutex
|
|
|
|
|
seen map[string]map[int]int64
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
var lastPresence PresenceMap
|
|
|
|
|
|
|
|
|
|
// how long before the online status expires
|
|
|
|
|
// should be long enough that any client will have another sync before expiring
|
|
|
|
|
const presenceTimeout int64 = 10
|
|
|
|
|
|
|
|
|
|
// updatePresence sends presence updates to the SyncAPI and FederationAPI
|
|
|
|
|
func (rp *RequestPool) updatePresence(db storage.Presence, presence string, userID string) {
|
|
|
|
|
//allow checking back on presence to set offline if needed
|
|
|
|
|
rp.updatePresenceInternal(db, presence, userID, true)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (rp *RequestPool) updatePresenceInternal(db storage.Presence, presence string, userID string, check_again bool) {
|
|
|
|
|
|
|
|
|
|
//lock the map to this thread
|
|
|
|
|
lastPresence.mu.Lock()
|
|
|
|
|
|
|
|
|
|
//grab time for caching
|
|
|
|
|
workingTime := time.Now().Unix()
|
|
|
|
|
|
|
|
|
|
if !rp.cfg.Matrix.Presence.EnableOutbound {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
@ -140,6 +165,48 @@ func (rp *RequestPool) updatePresence(db storage.Presence, presence string, user
|
|
|
|
|
LastActiveTS: spec.AsTimestamp(time.Now()),
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
//make sure that the map is defined correctly as needed
|
|
|
|
|
if lastPresence.seen == nil {
|
|
|
|
|
lastPresence.seen = make(map[string]map[int]int64)
|
|
|
|
|
}
|
|
|
|
|
if lastPresence.seen[userID] == nil {
|
|
|
|
|
lastPresence.seen[userID] = make(map[int]int64)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
//update time for each presence
|
|
|
|
|
lastPresence.seen[userID][int(presenceID)] = workingTime
|
|
|
|
|
|
|
|
|
|
var presenceToSet types.Presence
|
|
|
|
|
|
|
|
|
|
//online will always get priority
|
|
|
|
|
if (workingTime - lastPresence.seen[userID][int(types.PresenceOnline)]) < presenceTimeout {
|
|
|
|
|
presenceToSet = types.PresenceOnline
|
|
|
|
|
|
|
|
|
|
//idle gets secondary priority because your presence shouldnt be idle if you are on a different device
|
|
|
|
|
//kinda copying discord presence
|
|
|
|
|
} else if (workingTime - lastPresence.seen[userID][int(types.PresenceUnavailable)]) < presenceTimeout {
|
|
|
|
|
presenceToSet = types.PresenceUnavailable
|
|
|
|
|
|
|
|
|
|
//only set offline status if there is no known online devices
|
|
|
|
|
//clients may set offline to attempt to not alter the online status of the user
|
|
|
|
|
} else if (workingTime - lastPresence.seen[userID][int(types.PresenceOffline)]) < presenceTimeout {
|
|
|
|
|
presenceToSet = types.PresenceOffline
|
|
|
|
|
|
|
|
|
|
if check_again {
|
|
|
|
|
//after a timeout, check presence again to make sure it gets set as offline sooner or later
|
|
|
|
|
time.AfterFunc(time.Second*time.Duration(presenceTimeout), func() { rp.updatePresenceInternal(db, types.PresenceOffline.String(), userID, false) })
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
//set unknown if there is truly no devices that we know the state of
|
|
|
|
|
} else {
|
|
|
|
|
presenceToSet = types.PresenceUnknown
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
//the map is no longer being written to or read from
|
|
|
|
|
//i assume let the rest happen without being held up as to keep things heading through
|
|
|
|
|
//as fast and smoothly as possible
|
|
|
|
|
lastPresence.mu.Unlock()
|
|
|
|
|
|
|
|
|
|
// ensure we also send the current status_msg to federated servers and not nil
|
|
|
|
|
dbPresence, err := db.GetPresences(context.Background(), []string{userID})
|
|
|
|
|
if err != nil && err != sql.ErrNoRows {
|
|
|
|
@ -148,7 +215,7 @@ func (rp *RequestPool) updatePresence(db storage.Presence, presence string, user
|
|
|
|
|
if len(dbPresence) > 0 && dbPresence[0] != nil {
|
|
|
|
|
newPresence.ClientFields = dbPresence[0].ClientFields
|
|
|
|
|
}
|
|
|
|
|
newPresence.ClientFields.Presence = presenceID.String()
|
|
|
|
|
newPresence.ClientFields.Presence = presenceToSet.String()
|
|
|
|
|
|
|
|
|
|
defer rp.presence.Store(userID, newPresence)
|
|
|
|
|
// avoid spamming presence updates when syncing
|
|
|
|
@ -160,7 +227,7 @@ func (rp *RequestPool) updatePresence(db storage.Presence, presence string, user
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if err := rp.producer.SendPresence(userID, presenceID, newPresence.ClientFields.StatusMsg); err != nil {
|
|
|
|
|
if err := rp.producer.SendPresence(userID, presenceToSet, newPresence.ClientFields.StatusMsg); err != nil {
|
|
|
|
|
logrus.WithError(err).Error("Unable to publish presence message from sync")
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
@ -168,9 +235,10 @@ func (rp *RequestPool) updatePresence(db storage.Presence, presence string, user
|
|
|
|
|
// now synchronously update our view of the world. It's critical we do this before calculating
|
|
|
|
|
// the /sync response else we may not return presence: online immediately.
|
|
|
|
|
rp.consumer.EmitPresence(
|
|
|
|
|
context.Background(), userID, presenceID, newPresence.ClientFields.StatusMsg,
|
|
|
|
|
context.Background(), userID, presenceToSet, newPresence.ClientFields.StatusMsg,
|
|
|
|
|
spec.AsTimestamp(time.Now()), true,
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (rp *RequestPool) updateLastSeen(req *http.Request, device *userapi.Device) {
|
|
|
|
|