0
0
Fork 0
mirror of https://github.com/matrix-org/dendrite synced 2024-12-14 04:43:50 +01:00

Update room directory in Pinecone demo some more

This commit is contained in:
Neil Alexander 2021-05-07 12:48:30 +01:00
parent 603bf590f0
commit 81d60d5448
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944

View file

@ -27,8 +27,6 @@ import (
pineconeSessions "github.com/matrix-org/pinecone/sessions" pineconeSessions "github.com/matrix-org/pinecone/sessions"
) )
const pineconeRoomAttempts = 3
type PineconeRoomProvider struct { type PineconeRoomProvider struct {
r *pineconeRouter.Router r *pineconeRouter.Router
s *pineconeSessions.Sessions s *pineconeSessions.Sessions
@ -77,31 +75,24 @@ func bulkFetchPublicRoomsFromServers(
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(len(homeservers)) wg.Add(len(homeservers))
// concurrently query for public rooms // concurrently query for public rooms
reqctx, reqcancel := context.WithCancel(ctx) reqctx, reqcancel := context.WithTimeout(ctx, time.Second*5)
for _, hs := range homeservers { for _, hs := range homeservers {
go func(homeserverDomain gomatrixserverlib.ServerName) { go func(homeserverDomain gomatrixserverlib.ServerName) {
defer wg.Done() defer wg.Done()
util.GetLogger(reqctx).WithField("hs", homeserverDomain).Info("Querying HS for public rooms") util.GetLogger(reqctx).WithField("hs", homeserverDomain).Info("Querying HS for public rooms")
var fres gomatrixserverlib.RespPublicRooms fres, err := fedClient.GetPublicRooms(reqctx, homeserverDomain, int(limit), "", false, "")
var err error
for i := 0; i < pineconeRoomAttempts; i++ {
fres, err = fedClient.GetPublicRooms(reqctx, homeserverDomain, int(limit), "", false, "")
if err != nil { if err != nil {
util.GetLogger(reqctx).WithError(err).WithField("hs", homeserverDomain).Warn( util.GetLogger(reqctx).WithError(err).WithField("hs", homeserverDomain).Warn(
"bulkFetchPublicRoomsFromServers: failed to query hs", "bulkFetchPublicRoomsFromServers: failed to query hs",
) )
if i == pineconeRoomAttempts-1 {
return return
} }
} else {
break
}
}
for _, room := range fres.Chunk { for _, room := range fres.Chunk {
// atomically send a room or stop // atomically send a room or stop
select { select {
case roomCh <- room: case roomCh <- room:
case <-done: case <-done:
case <-reqctx.Done():
util.GetLogger(reqctx).WithError(err).WithField("hs", homeserverDomain).Info("Interrupted whilst sending rooms") util.GetLogger(reqctx).WithError(err).WithField("hs", homeserverDomain).Info("Interrupted whilst sending rooms")
return return
} }