mirror of
https://github.com/matrix-org/dendrite
synced 2024-05-20 06:13:48 +02:00
Compare commits
16 commits
0758adfd97
...
6f9b1eb92d
Author | SHA1 | Date | |
---|---|---|---|
6f9b1eb92d | |||
8aa088f713 | |||
cb76da4981 | |||
b436d43133 | |||
4872d21d72 | |||
93bf55df19 | |||
5b84d4613c | |||
b9061b96e4 | |||
de727386ba | |||
8093f1f80c | |||
fea083b3e1 | |||
a6827a35a3 | |||
9a307fc401 | |||
5f6dc39011 | |||
18bd6603ca | |||
a4c4fb9886 |
1
gomatrixserverlib
Submodule
1
gomatrixserverlib
Submodule
|
@ -0,0 +1 @@
|
||||||
|
Subproject commit dd56185f075de0a1c24e7ed1adf60ce42323d21e
|
|
@ -66,15 +66,15 @@ func NewRouters() Routers {
|
||||||
}
|
}
|
||||||
|
|
||||||
var NotAllowedHandler = WrapHandlerInCORS(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
var NotAllowedHandler = WrapHandlerInCORS(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
w.WriteHeader(http.StatusMethodNotAllowed)
|
|
||||||
w.Header().Set("Content-Type", "application/json")
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
w.WriteHeader(http.StatusMethodNotAllowed)
|
||||||
unrecognizedErr, _ := json.Marshal(spec.Unrecognized("Unrecognized request")) // nolint:misspell
|
unrecognizedErr, _ := json.Marshal(spec.Unrecognized("Unrecognized request")) // nolint:misspell
|
||||||
_, _ = w.Write(unrecognizedErr) // nolint:misspell
|
_, _ = w.Write(unrecognizedErr) // nolint:misspell
|
||||||
}))
|
}))
|
||||||
|
|
||||||
var NotFoundCORSHandler = WrapHandlerInCORS(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
var NotFoundCORSHandler = WrapHandlerInCORS(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
w.WriteHeader(http.StatusNotFound)
|
|
||||||
w.Header().Set("Content-Type", "application/json")
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
w.WriteHeader(http.StatusNotFound)
|
||||||
unrecognizedErr, _ := json.Marshal(spec.Unrecognized("Unrecognized request")) // nolint:misspell
|
unrecognizedErr, _ := json.Marshal(spec.Unrecognized("Unrecognized request")) // nolint:misspell
|
||||||
_, _ = w.Write(unrecognizedErr) // nolint:misspell
|
_, _ = w.Write(unrecognizedErr) // nolint:misspell
|
||||||
}))
|
}))
|
||||||
|
|
|
@ -17,7 +17,7 @@ func TestRoutersError(t *testing.T) {
|
||||||
if rec.Code != http.StatusNotFound {
|
if rec.Code != http.StatusNotFound {
|
||||||
t.Fatalf("unexpected status code: %d - %s", rec.Code, rec.Body.String())
|
t.Fatalf("unexpected status code: %d - %s", rec.Code, rec.Body.String())
|
||||||
}
|
}
|
||||||
if ct := rec.Header().Get("Content-Type"); ct != "application/json" {
|
if ct := rec.Result().Header.Get("Content-Type"); ct != "application/json" {
|
||||||
t.Fatalf("unexpected content-type: %s", ct)
|
t.Fatalf("unexpected content-type: %s", ct)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -32,7 +32,7 @@ func TestRoutersError(t *testing.T) {
|
||||||
if rec.Code != http.StatusMethodNotAllowed {
|
if rec.Code != http.StatusMethodNotAllowed {
|
||||||
t.Fatalf("unexpected status code: %d - %s", rec.Code, rec.Body.String())
|
t.Fatalf("unexpected status code: %d - %s", rec.Code, rec.Body.String())
|
||||||
}
|
}
|
||||||
if ct := rec.Header().Get("Content-Type"); ct != "application/json" {
|
if ct := rec.Result().Header.Get("Content-Type"); ct != "application/json" {
|
||||||
t.Fatalf("unexpected content-type: %s", ct)
|
t.Fatalf("unexpected content-type: %s", ct)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -120,8 +120,33 @@ func (rp *RequestPool) cleanPresence(db storage.Presence, cleanupTime time.Durat
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// set a unix timestamp of when it last saw the types
|
||||||
|
// this way it can filter based on time
|
||||||
|
type PresenceMap struct {
|
||||||
|
mu sync.Mutex
|
||||||
|
seen map[string]map[int]int64
|
||||||
|
}
|
||||||
|
|
||||||
|
var lastPresence PresenceMap
|
||||||
|
|
||||||
|
// how long before the online status expires
|
||||||
|
// should be long enough that any client will have another sync before expiring
|
||||||
|
const presenceTimeout int64 = 10
|
||||||
|
|
||||||
// updatePresence sends presence updates to the SyncAPI and FederationAPI
|
// updatePresence sends presence updates to the SyncAPI and FederationAPI
|
||||||
func (rp *RequestPool) updatePresence(db storage.Presence, presence string, userID string) {
|
func (rp *RequestPool) updatePresence(db storage.Presence, presence string, userID string) {
|
||||||
|
//allow checking back on presence to set offline if needed
|
||||||
|
rp.updatePresenceInternal(db, presence, userID, true)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (rp *RequestPool) updatePresenceInternal(db storage.Presence, presence string, userID string, check_again bool) {
|
||||||
|
|
||||||
|
//lock the map to this thread
|
||||||
|
lastPresence.mu.Lock()
|
||||||
|
|
||||||
|
//grab time for caching
|
||||||
|
workingTime := time.Now().Unix()
|
||||||
|
|
||||||
if !rp.cfg.Matrix.Presence.EnableOutbound {
|
if !rp.cfg.Matrix.Presence.EnableOutbound {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -140,6 +165,48 @@ func (rp *RequestPool) updatePresence(db storage.Presence, presence string, user
|
||||||
LastActiveTS: spec.AsTimestamp(time.Now()),
|
LastActiveTS: spec.AsTimestamp(time.Now()),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//make sure that the map is defined correctly as needed
|
||||||
|
if lastPresence.seen == nil {
|
||||||
|
lastPresence.seen = make(map[string]map[int]int64)
|
||||||
|
}
|
||||||
|
if lastPresence.seen[userID] == nil {
|
||||||
|
lastPresence.seen[userID] = make(map[int]int64)
|
||||||
|
}
|
||||||
|
|
||||||
|
//update time for each presence
|
||||||
|
lastPresence.seen[userID][int(presenceID)] = workingTime
|
||||||
|
|
||||||
|
var presenceToSet types.Presence
|
||||||
|
|
||||||
|
//online will always get priority
|
||||||
|
if (workingTime - lastPresence.seen[userID][int(types.PresenceOnline)]) < presenceTimeout {
|
||||||
|
presenceToSet = types.PresenceOnline
|
||||||
|
|
||||||
|
//idle gets secondary priority because your presence shouldnt be idle if you are on a different device
|
||||||
|
//kinda copying discord presence
|
||||||
|
} else if (workingTime - lastPresence.seen[userID][int(types.PresenceUnavailable)]) < presenceTimeout {
|
||||||
|
presenceToSet = types.PresenceUnavailable
|
||||||
|
|
||||||
|
//only set offline status if there is no known online devices
|
||||||
|
//clients may set offline to attempt to not alter the online status of the user
|
||||||
|
} else if (workingTime - lastPresence.seen[userID][int(types.PresenceOffline)]) < presenceTimeout {
|
||||||
|
presenceToSet = types.PresenceOffline
|
||||||
|
|
||||||
|
if check_again {
|
||||||
|
//after a timeout, check presence again to make sure it gets set as offline sooner or later
|
||||||
|
time.AfterFunc(time.Second*time.Duration(presenceTimeout), func() { rp.updatePresenceInternal(db, types.PresenceOffline.String(), userID, false) })
|
||||||
|
}
|
||||||
|
|
||||||
|
//set unknown if there is truly no devices that we know the state of
|
||||||
|
} else {
|
||||||
|
presenceToSet = types.PresenceUnknown
|
||||||
|
}
|
||||||
|
|
||||||
|
//the map is no longer being written to or read from
|
||||||
|
//i assume let the rest happen without being held up as to keep things heading through
|
||||||
|
//as fast and smoothly as possible
|
||||||
|
lastPresence.mu.Unlock()
|
||||||
|
|
||||||
// ensure we also send the current status_msg to federated servers and not nil
|
// ensure we also send the current status_msg to federated servers and not nil
|
||||||
dbPresence, err := db.GetPresences(context.Background(), []string{userID})
|
dbPresence, err := db.GetPresences(context.Background(), []string{userID})
|
||||||
if err != nil && err != sql.ErrNoRows {
|
if err != nil && err != sql.ErrNoRows {
|
||||||
|
@ -148,7 +215,7 @@ func (rp *RequestPool) updatePresence(db storage.Presence, presence string, user
|
||||||
if len(dbPresence) > 0 && dbPresence[0] != nil {
|
if len(dbPresence) > 0 && dbPresence[0] != nil {
|
||||||
newPresence.ClientFields = dbPresence[0].ClientFields
|
newPresence.ClientFields = dbPresence[0].ClientFields
|
||||||
}
|
}
|
||||||
newPresence.ClientFields.Presence = presenceID.String()
|
newPresence.ClientFields.Presence = presenceToSet.String()
|
||||||
|
|
||||||
defer rp.presence.Store(userID, newPresence)
|
defer rp.presence.Store(userID, newPresence)
|
||||||
// avoid spamming presence updates when syncing
|
// avoid spamming presence updates when syncing
|
||||||
|
@ -160,7 +227,7 @@ func (rp *RequestPool) updatePresence(db storage.Presence, presence string, user
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := rp.producer.SendPresence(userID, presenceID, newPresence.ClientFields.StatusMsg); err != nil {
|
if err := rp.producer.SendPresence(userID, presenceToSet, newPresence.ClientFields.StatusMsg); err != nil {
|
||||||
logrus.WithError(err).Error("Unable to publish presence message from sync")
|
logrus.WithError(err).Error("Unable to publish presence message from sync")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -168,9 +235,10 @@ func (rp *RequestPool) updatePresence(db storage.Presence, presence string, user
|
||||||
// now synchronously update our view of the world. It's critical we do this before calculating
|
// now synchronously update our view of the world. It's critical we do this before calculating
|
||||||
// the /sync response else we may not return presence: online immediately.
|
// the /sync response else we may not return presence: online immediately.
|
||||||
rp.consumer.EmitPresence(
|
rp.consumer.EmitPresence(
|
||||||
context.Background(), userID, presenceID, newPresence.ClientFields.StatusMsg,
|
context.Background(), userID, presenceToSet, newPresence.ClientFields.StatusMsg,
|
||||||
spec.AsTimestamp(time.Now()), true,
|
spec.AsTimestamp(time.Now()), true,
|
||||||
)
|
)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rp *RequestPool) updateLastSeen(req *http.Request, device *userapi.Device) {
|
func (rp *RequestPool) updateLastSeen(req *http.Request, device *userapi.Device) {
|
||||||
|
|
Loading…
Reference in a new issue