0
0
Fork 1
mirror of https://mau.dev/maunium/synapse.git synced 2024-06-20 11:38:20 +02:00

Order in-flight state group queries in biggest-first order (#11610)

Co-authored-by: Patrick Cloke <clokep@users.noreply.github.com>
This commit is contained in:
reivilibre 2022-03-01 13:41:57 +00:00 committed by GitHub
parent e2e1d90a5e
commit c893632319
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 131 additions and 4 deletions

1
changelog.d/11610.misc Normal file
View file

@ -0,0 +1 @@
Deduplicate in-flight requests in `_get_state_for_groups`.

View file

@ -25,6 +25,7 @@ from typing import (
)
import attr
from sortedcontainers import SortedDict
from twisted.internet import defer
@ -72,6 +73,24 @@ class _GetStateGroupDelta:
return len(self.delta_ids) if self.delta_ids else 0
def state_filter_rough_priority_comparator(
state_filter: StateFilter,
) -> Tuple[int, int]:
"""
Returns a comparable value that roughly indicates the relative size of this
state filter compared to others.
'Larger' state filters should sort first when using ascending order, so
this is essentially the opposite of 'size'.
It should be treated as a rough guide only and should not be interpreted to
have any particular meaning. The representation may also change
The current implementation returns a tuple of the form:
* -1 for include_others, 0 otherwise
* -(number of entries in state_filter.types)
"""
return -int(state_filter.include_others), -len(state_filter.types)
class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
"""A data store for fetching/storing state groups."""
@ -127,7 +146,7 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
# Current ongoing get_state_for_groups in-flight requests
# {group ID -> {StateFilter -> ObservableDeferred}}
self._state_group_inflight_requests: Dict[
int, Dict[StateFilter, AbstractObservableDeferred[StateMap[str]]]
int, SortedDict[StateFilter, AbstractObservableDeferred[StateMap[str]]]
] = {}
def get_max_state_group_txn(txn: Cursor) -> int:
@ -279,7 +298,10 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
# The list of ongoing requests which will help narrow the current request.
reusable_requests = []
for (request_state_filter, request_deferred) in inflight_requests.items():
# Iterate over existing requests in roughly biggest-first order.
for request_state_filter in inflight_requests:
request_deferred = inflight_requests[request_state_filter]
new_state_filter_left_over = state_filter_left_over.approx_difference(
request_state_filter
)
@ -358,7 +380,9 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
observable_deferred = ObservableDeferred(request_deferred, consumeErrors=True)
# Insert the ObservableDeferred into the cache
group_request_dict = self._state_group_inflight_requests.setdefault(group, {})
group_request_dict = self._state_group_inflight_requests.setdefault(
group, SortedDict(state_filter_rough_priority_comparator)
)
group_request_dict[db_state_filter] = observable_deferred
return await make_deferred_yieldable(observable_deferred.observe())

View file

@ -15,11 +15,16 @@ import typing
from typing import Dict, List, Sequence, Tuple
from unittest.mock import patch
from parameterized import parameterized
from twisted.internet.defer import Deferred, ensureDeferred
from twisted.test.proto_helpers import MemoryReactor
from synapse.api.constants import EventTypes
from synapse.storage.databases.state.store import MAX_INFLIGHT_REQUESTS_PER_GROUP
from synapse.storage.databases.state.store import (
MAX_INFLIGHT_REQUESTS_PER_GROUP,
state_filter_rough_priority_comparator,
)
from synapse.storage.state import StateFilter
from synapse.types import StateMap
from synapse.util import Clock
@ -350,3 +355,100 @@ class StateGroupInflightCachingTestCase(HomeserverTestCase):
self._complete_request_fake(groups, sf, d)
self.assertTrue(reqs[CAP_COUNT].called)
self.assertTrue(reqs[CAP_COUNT + 1].called)
@parameterized.expand([(False,), (True,)])
def test_ordering_of_request_reuse(self, reverse: bool) -> None:
"""
Tests that 'larger' in-flight requests are ordered first.
This is mostly a design decision in order to prevent a request from
hanging on to multiple queries when it would have been sufficient to
hang on to only one bigger query.
The 'size' of a state filter is a rough heuristic.
- requests two pieces of state, one 'larger' than the other, but each
spawning a query
- requests a third piece of state
- completes the larger of the first two queries
- checks that the third request gets completed (and doesn't needlessly
wait for the other query)
Parameters:
reverse: whether to reverse the order of the initial requests, to ensure
that the effect doesn't depend on the order of request submission.
"""
# We add in an extra state type to make sure that both requests spawn
# queries which are not optimised out.
state_filters = [
StateFilter.freeze(
{"state.type": {"A"}, "other.state.type": {"a"}}, include_others=False
),
StateFilter.freeze(
{
"state.type": None,
"other.state.type": {"b"},
# The current rough size comparator uses the number of state types
# as an indicator of size.
# To influence it to make this state filter bigger than the previous one,
# we add another dummy state type.
"extra.state.type": {"c"},
},
include_others=False,
),
]
if reverse:
# For fairness, we perform one test run with the list reversed.
state_filters.reverse()
smallest_state_filter_idx = 1
biggest_state_filter_idx = 0
else:
smallest_state_filter_idx = 0
biggest_state_filter_idx = 1
# This assertion is for our own sanity more than anything else.
self.assertLess(
state_filter_rough_priority_comparator(
state_filters[biggest_state_filter_idx]
),
state_filter_rough_priority_comparator(
state_filters[smallest_state_filter_idx]
),
"Test invalid: bigger state filter is not actually bigger.",
)
# Spawn the initial two requests
for state_filter in state_filters:
ensureDeferred(
self.state_datastore._get_state_for_group_using_inflight_cache(
42,
state_filter,
)
)
# Spawn a third request
req = ensureDeferred(
self.state_datastore._get_state_for_group_using_inflight_cache(
42,
StateFilter.freeze(
{
"state.type": {"A"},
},
include_others=False,
),
)
)
self.pump(by=0.1)
self.assertFalse(req.called)
# Complete the largest request's query to make sure that the final request
# only waits for that one (and doesn't needlessly wait for both queries)
self._complete_request_fake(
*self.get_state_group_calls[biggest_state_filter_idx]
)
# That should have been sufficient to complete the third request
self.assertTrue(req.called)