mirror of
https://github.com/matrix-org/dendrite
synced 2024-05-20 06:13:48 +02:00
Compare commits
16 commits
0758adfd97
...
6f9b1eb92d
Author | SHA1 | Date | |
---|---|---|---|
6f9b1eb92d | |||
8aa088f713 | |||
cb76da4981 | |||
b436d43133 | |||
4872d21d72 | |||
93bf55df19 | |||
5b84d4613c | |||
b9061b96e4 | |||
de727386ba | |||
8093f1f80c | |||
fea083b3e1 | |||
a6827a35a3 | |||
9a307fc401 | |||
5f6dc39011 | |||
18bd6603ca | |||
a4c4fb9886 |
1
gomatrixserverlib
Submodule
1
gomatrixserverlib
Submodule
|
@ -0,0 +1 @@
|
|||
Subproject commit dd56185f075de0a1c24e7ed1adf60ce42323d21e
|
|
@ -66,15 +66,15 @@ func NewRouters() Routers {
|
|||
}
|
||||
|
||||
var NotAllowedHandler = WrapHandlerInCORS(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(http.StatusMethodNotAllowed)
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.WriteHeader(http.StatusMethodNotAllowed)
|
||||
unrecognizedErr, _ := json.Marshal(spec.Unrecognized("Unrecognized request")) // nolint:misspell
|
||||
_, _ = w.Write(unrecognizedErr) // nolint:misspell
|
||||
}))
|
||||
|
||||
var NotFoundCORSHandler = WrapHandlerInCORS(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(http.StatusNotFound)
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.WriteHeader(http.StatusNotFound)
|
||||
unrecognizedErr, _ := json.Marshal(spec.Unrecognized("Unrecognized request")) // nolint:misspell
|
||||
_, _ = w.Write(unrecognizedErr) // nolint:misspell
|
||||
}))
|
||||
|
|
|
@ -17,7 +17,7 @@ func TestRoutersError(t *testing.T) {
|
|||
if rec.Code != http.StatusNotFound {
|
||||
t.Fatalf("unexpected status code: %d - %s", rec.Code, rec.Body.String())
|
||||
}
|
||||
if ct := rec.Header().Get("Content-Type"); ct != "application/json" {
|
||||
if ct := rec.Result().Header.Get("Content-Type"); ct != "application/json" {
|
||||
t.Fatalf("unexpected content-type: %s", ct)
|
||||
}
|
||||
|
||||
|
@ -32,7 +32,7 @@ func TestRoutersError(t *testing.T) {
|
|||
if rec.Code != http.StatusMethodNotAllowed {
|
||||
t.Fatalf("unexpected status code: %d - %s", rec.Code, rec.Body.String())
|
||||
}
|
||||
if ct := rec.Header().Get("Content-Type"); ct != "application/json" {
|
||||
if ct := rec.Result().Header.Get("Content-Type"); ct != "application/json" {
|
||||
t.Fatalf("unexpected content-type: %s", ct)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -120,8 +120,33 @@ func (rp *RequestPool) cleanPresence(db storage.Presence, cleanupTime time.Durat
|
|||
}
|
||||
}
|
||||
|
||||
// set a unix timestamp of when it last saw the types
|
||||
// this way it can filter based on time
|
||||
type PresenceMap struct {
|
||||
mu sync.Mutex
|
||||
seen map[string]map[int]int64
|
||||
}
|
||||
|
||||
var lastPresence PresenceMap
|
||||
|
||||
// how long before the online status expires
|
||||
// should be long enough that any client will have another sync before expiring
|
||||
const presenceTimeout int64 = 10
|
||||
|
||||
// updatePresence sends presence updates to the SyncAPI and FederationAPI
|
||||
func (rp *RequestPool) updatePresence(db storage.Presence, presence string, userID string) {
|
||||
//allow checking back on presence to set offline if needed
|
||||
rp.updatePresenceInternal(db, presence, userID, true)
|
||||
}
|
||||
|
||||
func (rp *RequestPool) updatePresenceInternal(db storage.Presence, presence string, userID string, check_again bool) {
|
||||
|
||||
//lock the map to this thread
|
||||
lastPresence.mu.Lock()
|
||||
|
||||
//grab time for caching
|
||||
workingTime := time.Now().Unix()
|
||||
|
||||
if !rp.cfg.Matrix.Presence.EnableOutbound {
|
||||
return
|
||||
}
|
||||
|
@ -140,6 +165,48 @@ func (rp *RequestPool) updatePresence(db storage.Presence, presence string, user
|
|||
LastActiveTS: spec.AsTimestamp(time.Now()),
|
||||
}
|
||||
|
||||
//make sure that the map is defined correctly as needed
|
||||
if lastPresence.seen == nil {
|
||||
lastPresence.seen = make(map[string]map[int]int64)
|
||||
}
|
||||
if lastPresence.seen[userID] == nil {
|
||||
lastPresence.seen[userID] = make(map[int]int64)
|
||||
}
|
||||
|
||||
//update time for each presence
|
||||
lastPresence.seen[userID][int(presenceID)] = workingTime
|
||||
|
||||
var presenceToSet types.Presence
|
||||
|
||||
//online will always get priority
|
||||
if (workingTime - lastPresence.seen[userID][int(types.PresenceOnline)]) < presenceTimeout {
|
||||
presenceToSet = types.PresenceOnline
|
||||
|
||||
//idle gets secondary priority because your presence shouldnt be idle if you are on a different device
|
||||
//kinda copying discord presence
|
||||
} else if (workingTime - lastPresence.seen[userID][int(types.PresenceUnavailable)]) < presenceTimeout {
|
||||
presenceToSet = types.PresenceUnavailable
|
||||
|
||||
//only set offline status if there is no known online devices
|
||||
//clients may set offline to attempt to not alter the online status of the user
|
||||
} else if (workingTime - lastPresence.seen[userID][int(types.PresenceOffline)]) < presenceTimeout {
|
||||
presenceToSet = types.PresenceOffline
|
||||
|
||||
if check_again {
|
||||
//after a timeout, check presence again to make sure it gets set as offline sooner or later
|
||||
time.AfterFunc(time.Second*time.Duration(presenceTimeout), func() { rp.updatePresenceInternal(db, types.PresenceOffline.String(), userID, false) })
|
||||
}
|
||||
|
||||
//set unknown if there is truly no devices that we know the state of
|
||||
} else {
|
||||
presenceToSet = types.PresenceUnknown
|
||||
}
|
||||
|
||||
//the map is no longer being written to or read from
|
||||
//i assume let the rest happen without being held up as to keep things heading through
|
||||
//as fast and smoothly as possible
|
||||
lastPresence.mu.Unlock()
|
||||
|
||||
// ensure we also send the current status_msg to federated servers and not nil
|
||||
dbPresence, err := db.GetPresences(context.Background(), []string{userID})
|
||||
if err != nil && err != sql.ErrNoRows {
|
||||
|
@ -148,7 +215,7 @@ func (rp *RequestPool) updatePresence(db storage.Presence, presence string, user
|
|||
if len(dbPresence) > 0 && dbPresence[0] != nil {
|
||||
newPresence.ClientFields = dbPresence[0].ClientFields
|
||||
}
|
||||
newPresence.ClientFields.Presence = presenceID.String()
|
||||
newPresence.ClientFields.Presence = presenceToSet.String()
|
||||
|
||||
defer rp.presence.Store(userID, newPresence)
|
||||
// avoid spamming presence updates when syncing
|
||||
|
@ -160,7 +227,7 @@ func (rp *RequestPool) updatePresence(db storage.Presence, presence string, user
|
|||
}
|
||||
}
|
||||
|
||||
if err := rp.producer.SendPresence(userID, presenceID, newPresence.ClientFields.StatusMsg); err != nil {
|
||||
if err := rp.producer.SendPresence(userID, presenceToSet, newPresence.ClientFields.StatusMsg); err != nil {
|
||||
logrus.WithError(err).Error("Unable to publish presence message from sync")
|
||||
return
|
||||
}
|
||||
|
@ -168,9 +235,10 @@ func (rp *RequestPool) updatePresence(db storage.Presence, presence string, user
|
|||
// now synchronously update our view of the world. It's critical we do this before calculating
|
||||
// the /sync response else we may not return presence: online immediately.
|
||||
rp.consumer.EmitPresence(
|
||||
context.Background(), userID, presenceID, newPresence.ClientFields.StatusMsg,
|
||||
context.Background(), userID, presenceToSet, newPresence.ClientFields.StatusMsg,
|
||||
spec.AsTimestamp(time.Now()), true,
|
||||
)
|
||||
|
||||
}
|
||||
|
||||
func (rp *RequestPool) updateLastSeen(req *http.Request, device *userapi.Device) {
|
||||
|
|
Loading…
Reference in a new issue