diff --git a/internal/caching/cache_lazy_load_members.go b/internal/caching/cache_lazy_load_members.go index 71a317624..f0d495065 100644 --- a/internal/caching/cache_lazy_load_members.go +++ b/internal/caching/cache_lazy_load_members.go @@ -15,33 +15,14 @@ const ( LazyLoadCacheMaxAge = time.Minute * 30 ) -type LazyLoadCache struct { - // InMemoryLRUCachePartition containing other InMemoryLRUCachePartitions - // with the actual cached members - userCaches *InMemoryLRUCachePartition +type LazyLoadCache interface { + StoreLazyLoadedUser(device *userapi.Device, roomID, userID, eventID string) + IsLazyLoadedUserCached(device *userapi.Device, roomID, userID string) (string, bool) } -// NewLazyLoadCache creates a new LazyLoadCache. -func NewLazyLoadCache() (*LazyLoadCache, error) { - cache, err := NewInMemoryLRUCachePartition( - LazyLoadCacheName, - LazyLoadCacheMutable, - LazyLoadCacheMaxEntries, - LazyLoadCacheMaxAge, - true, - ) - if err != nil { - return nil, err - } - go cacheCleaner(cache) - return &LazyLoadCache{ - userCaches: cache, - }, nil -} - -func (c *LazyLoadCache) lazyLoadCacheForUser(device *userapi.Device) (*InMemoryLRUCachePartition, error) { +func (c Caches) lazyLoadCacheForUser(device *userapi.Device) (*InMemoryLRUCachePartition, error) { cacheName := fmt.Sprintf("%s/%s", device.UserID, device.ID) - userCache, ok := c.userCaches.Get(cacheName) + userCache, ok := c.LazyLoading.Get(cacheName) if ok && userCache != nil { if cache, ok := userCache.(*InMemoryLRUCachePartition); ok { return cache, nil @@ -57,12 +38,12 @@ func (c *LazyLoadCache) lazyLoadCacheForUser(device *userapi.Device) (*InMemoryL if err != nil { return nil, err } - c.userCaches.Set(cacheName, cache) + c.LazyLoading.Set(cacheName, cache) go cacheCleaner(cache) return cache, nil } -func (c *LazyLoadCache) StoreLazyLoadedUser(device *userapi.Device, roomID, userID, eventID string) { +func (c Caches) StoreLazyLoadedUser(device *userapi.Device, roomID, userID, eventID string) { cache, err := c.lazyLoadCacheForUser(device) if err != nil { return @@ -71,7 +52,7 @@ func (c *LazyLoadCache) StoreLazyLoadedUser(device *userapi.Device, roomID, user cache.Set(cacheKey, eventID) } -func (c *LazyLoadCache) IsLazyLoadedUserCached(device *userapi.Device, roomID, userID string) (string, bool) { +func (c Caches) IsLazyLoadedUserCached(device *userapi.Device, roomID, userID string) (string, bool) { cache, err := c.lazyLoadCacheForUser(device) if err != nil { return "", false diff --git a/internal/caching/caches.go b/internal/caching/caches.go index 722405de6..173e47e5b 100644 --- a/internal/caching/caches.go +++ b/internal/caching/caches.go @@ -1,6 +1,8 @@ package caching -import "time" +import ( + "time" +) // Caches contains a set of references to caches. They may be // different implementations as long as they satisfy the Cache @@ -13,6 +15,7 @@ type Caches struct { RoomInfos Cache // RoomInfoCache FederationEvents Cache // FederationEventsCache SpaceSummaryRooms Cache // SpaceSummaryRoomsCache + LazyLoading Cache // LazyLoadCache } // Cache is the interface that an implementation must satisfy. diff --git a/internal/caching/impl_inmemorylru.go b/internal/caching/impl_inmemorylru.go index 94fdd1a9b..594760892 100644 --- a/internal/caching/impl_inmemorylru.go +++ b/internal/caching/impl_inmemorylru.go @@ -70,9 +70,21 @@ func NewInMemoryLRUCache(enablePrometheus bool) (*Caches, error) { if err != nil { return nil, err } + + lazyLoadCache, err := NewInMemoryLRUCachePartition( + LazyLoadCacheName, + LazyLoadCacheMutable, + LazyLoadCacheMaxEntries, + LazyLoadCacheMaxAge, + enablePrometheus, + ) + if err != nil { + return nil, err + } + go cacheCleaner( roomVersions, serverKeys, roomServerRoomIDs, - roomInfos, federationEvents, spaceRooms, + roomInfos, federationEvents, spaceRooms, lazyLoadCache, ) return &Caches{ RoomVersions: roomVersions, @@ -81,6 +93,7 @@ func NewInMemoryLRUCache(enablePrometheus bool) (*Caches, error) { RoomInfos: roomInfos, FederationEvents: federationEvents, SpaceSummaryRooms: spaceRooms, + LazyLoading: lazyLoadCache, }, nil } diff --git a/syncapi/routing/context.go b/syncapi/routing/context.go index f5f4b2dd0..87cc2aae0 100644 --- a/syncapi/routing/context.go +++ b/syncapi/routing/context.go @@ -45,7 +45,7 @@ func Context( rsAPI roomserver.SyncRoomserverAPI, syncDB storage.Database, roomID, eventID string, - lazyLoadCache *caching.LazyLoadCache, + lazyLoadCache caching.LazyLoadCache, ) util.JSONResponse { filter, err := parseRoomEventFilter(req) if err != nil { @@ -155,7 +155,7 @@ func applyLazyLoadMembers( filter *gomatrixserverlib.RoomEventFilter, eventsAfter, eventsBefore []gomatrixserverlib.ClientEvent, state []*gomatrixserverlib.HeaderedEvent, - lazyLoadCache *caching.LazyLoadCache, + lazyLoadCache caching.LazyLoadCache, ) []*gomatrixserverlib.HeaderedEvent { if filter == nil || !filter.LazyLoadMembers { return state diff --git a/syncapi/routing/messages.go b/syncapi/routing/messages.go index f19dfaed3..b0c990ec0 100644 --- a/syncapi/routing/messages.go +++ b/syncapi/routing/messages.go @@ -63,7 +63,7 @@ func OnIncomingMessagesRequest( rsAPI api.SyncRoomserverAPI, cfg *config.SyncAPI, srp *sync.RequestPool, - lazyLoadCache *caching.LazyLoadCache, + lazyLoadCache caching.LazyLoadCache, ) util.JSONResponse { var err error diff --git a/syncapi/routing/routing.go b/syncapi/routing/routing.go index 245ee5b66..6bc495d8d 100644 --- a/syncapi/routing/routing.go +++ b/syncapi/routing/routing.go @@ -39,7 +39,7 @@ func Setup( userAPI userapi.SyncUserAPI, rsAPI api.SyncRoomserverAPI, cfg *config.SyncAPI, - lazyLoadCache *caching.LazyLoadCache, + lazyLoadCache caching.LazyLoadCache, ) { v3mux := csMux.PathPrefix("/{apiversion:(?:r0|v3)}/").Subrouter() diff --git a/syncapi/streams/stream_pdu.go b/syncapi/streams/stream_pdu.go index f774a1af8..00b3dfe3b 100644 --- a/syncapi/streams/stream_pdu.go +++ b/syncapi/streams/stream_pdu.go @@ -32,7 +32,7 @@ type PDUStreamProvider struct { tasks chan func() workers atomic.Int32 // userID+deviceID -> lazy loading cache - lazyLoadCache *caching.LazyLoadCache + lazyLoadCache caching.LazyLoadCache rsAPI roomserverAPI.SyncRoomserverAPI } diff --git a/syncapi/streams/streams.go b/syncapi/streams/streams.go index af2a0387e..1ca4ee8c3 100644 --- a/syncapi/streams/streams.go +++ b/syncapi/streams/streams.go @@ -27,7 +27,7 @@ type Streams struct { func NewSyncStreamProviders( d storage.Database, userAPI userapi.SyncUserAPI, rsAPI rsapi.SyncRoomserverAPI, keyAPI keyapi.SyncKeyAPI, - eduCache *caching.EDUCache, lazyLoadCache *caching.LazyLoadCache, notifier *notifier.Notifier, + eduCache *caching.EDUCache, lazyLoadCache caching.LazyLoadCache, notifier *notifier.Notifier, ) *Streams { streams := &Streams{ PDUStreamProvider: &PDUStreamProvider{ diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go index 686e2044f..6da8ce6d1 100644 --- a/syncapi/syncapi.go +++ b/syncapi/syncapi.go @@ -53,12 +53,8 @@ func AddPublicRoutes( } eduCache := caching.NewTypingCache() - lazyLoadCache, err := caching.NewLazyLoadCache() - if err != nil { - logrus.WithError(err).Panicf("failed to create lazy loading cache") - } notifier := notifier.NewNotifier() - streams := streams.NewSyncStreamProviders(syncDB, userAPI, rsAPI, keyAPI, eduCache, lazyLoadCache, notifier) + streams := streams.NewSyncStreamProviders(syncDB, userAPI, rsAPI, keyAPI, eduCache, base.Caches, notifier) notifier.SetCurrentPosition(streams.Latest(context.Background())) if err = notifier.Load(context.Background(), syncDB); err != nil { logrus.WithError(err).Panicf("failed to load notifier ") @@ -146,6 +142,6 @@ func AddPublicRoutes( routing.Setup( base.PublicClientAPIMux, requestPool, syncDB, userAPI, - rsAPI, cfg, lazyLoadCache, + rsAPI, cfg, base.Caches, ) }