Change CacheMetrics to be quicker

We change it so that each cache has an individual CacheMetric, instead
of having one global CacheMetric. This means that when a cache tries to
increment a counter it does not need to go through so many indirections.
This commit is contained in:
Erik Johnston 2016-06-02 11:29:44 +01:00
parent 065e739d6e
commit 73c7112433
8 changed files with 82 additions and 70 deletions

View file

@ -33,11 +33,7 @@ from .metric import (
logger = logging.getLogger(__name__)
# We'll keep all the available metrics in a single toplevel dict, one shared
# for the entire process. We don't currently support per-HomeServer instances
# of metrics, because in practice any one python VM will host only one
# HomeServer anyway. This makes a lot of implementation neater
all_metrics = {}
all_metrics = []
class Metrics(object):
@ -53,7 +49,7 @@ class Metrics(object):
metric = metric_class(full_name, *args, **kwargs)
all_metrics[full_name] = metric
all_metrics.append(metric)
return metric
def register_counter(self, *args, **kwargs):
@ -84,12 +80,12 @@ def render_all():
# TODO(paul): Internal hack
update_resource_metrics()
for name in sorted(all_metrics.keys()):
for metric in all_metrics:
try:
strs += all_metrics[name].render()
strs += metric.render()
except Exception:
strs += ["# FAILED to render %s" % name]
logger.exception("Failed to render %s metric", name)
strs += ["# FAILED to render"]
logger.exception("Failed to render metric")
strs.append("") # to generate a final CRLF

View file

@ -47,9 +47,6 @@ class BaseMetric(object):
for k, v in zip(self.labels, values)])
)
def render(self):
return map_concat(self.render_item, sorted(self.counts.keys()))
class CounterMetric(BaseMetric):
"""The simplest kind of metric; one that stores a monotonically-increasing
@ -83,6 +80,9 @@ class CounterMetric(BaseMetric):
def render_item(self, k):
return ["%s%s %d" % (self.name, self._render_key(k), self.counts[k])]
def render(self):
return map_concat(self.render_item, sorted(self.counts.keys()))
class CallbackMetric(BaseMetric):
"""A metric that returns the numeric value returned by a callback whenever
@ -126,30 +126,30 @@ class DistributionMetric(object):
class CacheMetric(object):
"""A combination of two CounterMetrics, one to count cache hits and one to
count a total, and a callback metric to yield the current size.
__slots__ = ("name", "cache_name", "hits", "misses", "size_callback")
This metric generates standard metric name pairs, so that monitoring rules
can easily be applied to measure hit ratio."""
def __init__(self, name, size_callback, labels=[]):
def __init__(self, name, size_callback, cache_name):
self.name = name
self.cache_name = cache_name
self.hits = CounterMetric(name + ":hits", labels=labels)
self.total = CounterMetric(name + ":total", labels=labels)
self.hits = 0
self.misses = 0
self.size = CallbackMetric(
name + ":size",
callback=size_callback,
labels=labels,
)
self.size_callback = size_callback
def inc_hits(self, *values):
self.hits.inc(*values)
self.total.inc(*values)
def inc_hits(self):
self.hits += 1
def inc_misses(self, *values):
self.total.inc(*values)
def inc_misses(self):
self.misses += 1
def render(self):
return self.hits.render() + self.total.render() + self.size.render()
size = self.size_callback()
hits = self.hits
total = self.misses + self.hits
return [
"""%s:hits{name="%s"} %d""" % (self.name, self.cache_name, hits),
"""%s:total{name="%s"} %d""" % (self.name, self.cache_name, total),
"""%s:size{name="%s"} %d""" % (self.name, self.cache_name, size),
]

View file

@ -24,11 +24,21 @@ DEBUG_CACHES = False
metrics = synapse.metrics.get_metrics_for("synapse.util.caches")
caches_by_name = {}
cache_counter = metrics.register_cache(
"cache",
lambda: {(name,): len(caches_by_name[name]) for name in caches_by_name.keys()},
labels=["name"],
)
# cache_counter = metrics.register_cache(
# "cache",
# lambda: {(name,): len(caches_by_name[name]) for name in caches_by_name.keys()},
# labels=["name"],
# )
def register_cache(name, cache):
caches_by_name[name] = cache
return metrics.register_cache(
"cache",
lambda: len(cache),
name,
)
_string_cache = LruCache(int(5000 * CACHE_SIZE_FACTOR))
caches_by_name["string_cache"] = _string_cache

View file

@ -22,7 +22,7 @@ from synapse.util.logcontext import (
PreserveLoggingContext, preserve_context_over_deferred, preserve_context_over_fn
)
from . import caches_by_name, DEBUG_CACHES, cache_counter
from . import DEBUG_CACHES, register_cache
from twisted.internet import defer
@ -43,6 +43,15 @@ CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.1))
class Cache(object):
__slots__ = (
"cache",
"max_entries",
"name",
"keylen",
"sequence",
"thread",
"metrics",
)
def __init__(self, name, max_entries=1000, keylen=1, lru=True, tree=False):
if lru:
@ -59,7 +68,7 @@ class Cache(object):
self.keylen = keylen
self.sequence = 0
self.thread = None
caches_by_name[name] = self.cache
self.metrics = register_cache(name, self.cache)
def check_thread(self):
expected_thread = self.thread
@ -74,10 +83,10 @@ class Cache(object):
def get(self, key, default=_CacheSentinel):
val = self.cache.get(key, _CacheSentinel)
if val is not _CacheSentinel:
cache_counter.inc_hits(self.name)
self.metrics.inc_hits()
return val
cache_counter.inc_misses(self.name)
self.metrics.inc_misses()
if default is _CacheSentinel:
raise KeyError()

View file

@ -15,7 +15,7 @@
from synapse.util.caches.lrucache import LruCache
from collections import namedtuple
from . import caches_by_name, cache_counter
from . import register_cache
import threading
import logging
@ -43,7 +43,7 @@ class DictionaryCache(object):
__slots__ = []
self.sentinel = Sentinel()
caches_by_name[name] = self.cache
self.metrics = register_cache(name, self.cache)
def check_thread(self):
expected_thread = self.thread
@ -58,7 +58,7 @@ class DictionaryCache(object):
def get(self, key, dict_keys=None):
entry = self.cache.get(key, self.sentinel)
if entry is not self.sentinel:
cache_counter.inc_hits(self.name)
self.metrics.inc_hits()
if dict_keys is None:
return DictionaryEntry(entry.full, dict(entry.value))
@ -69,7 +69,7 @@ class DictionaryCache(object):
if k in entry.value
})
cache_counter.inc_misses(self.name)
self.metrics.inc_misses()
return DictionaryEntry(False, {})
def invalidate(self, key):

View file

@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from synapse.util.caches import cache_counter, caches_by_name
from synapse.util.caches import register_cache
import logging
@ -49,7 +49,7 @@ class ExpiringCache(object):
self._cache = {}
caches_by_name[cache_name] = self._cache
self.metrics = register_cache(cache_name, self._cache)
def start(self):
if not self._expiry_ms:
@ -78,9 +78,9 @@ class ExpiringCache(object):
def __getitem__(self, key):
try:
entry = self._cache[key]
cache_counter.inc_hits(self._cache_name)
self.metrics.inc_hits()
except KeyError:
cache_counter.inc_misses(self._cache_name)
self.metrics.inc_misses()
raise
if self._reset_expiry_on_get:

View file

@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from synapse.util.caches import cache_counter, caches_by_name
from synapse.util.caches import register_cache
from blist import sorteddict
@ -42,7 +42,7 @@ class StreamChangeCache(object):
self._cache = sorteddict()
self._earliest_known_stream_pos = current_stream_pos
self.name = name
caches_by_name[self.name] = self._cache
self.metrics = register_cache(self.name, self._cache)
for entity, stream_pos in prefilled_cache.items():
self.entity_has_changed(entity, stream_pos)
@ -53,19 +53,19 @@ class StreamChangeCache(object):
assert type(stream_pos) is int
if stream_pos < self._earliest_known_stream_pos:
cache_counter.inc_misses(self.name)
self.metrics.inc_misses()
return True
latest_entity_change_pos = self._entity_to_key.get(entity, None)
if latest_entity_change_pos is None:
cache_counter.inc_hits(self.name)
self.metrics.inc_hits()
return False
if stream_pos < latest_entity_change_pos:
cache_counter.inc_misses(self.name)
self.metrics.inc_misses()
return True
cache_counter.inc_hits(self.name)
self.metrics.inc_hits()
return False
def get_entities_changed(self, entities, stream_pos):
@ -82,10 +82,10 @@ class StreamChangeCache(object):
self._cache[k] for k in keys[i:]
).intersection(entities)
cache_counter.inc_hits(self.name)
self.metrics.inc_hits()
else:
result = entities
cache_counter.inc_misses(self.name)
self.metrics.inc_misses()
return result

View file

@ -61,9 +61,6 @@ class CounterMetricTestCase(unittest.TestCase):
'vector{method="PUT"} 1',
])
# Check that passing too few values errors
self.assertRaises(ValueError, counter.inc)
class CallbackMetricTestCase(unittest.TestCase):
@ -138,27 +135,27 @@ class CacheMetricTestCase(unittest.TestCase):
def test_cache(self):
d = dict()
metric = CacheMetric("cache", lambda: len(d))
metric = CacheMetric("cache", lambda: len(d), "cache_name")
self.assertEquals(metric.render(), [
'cache:hits 0',
'cache:total 0',
'cache:size 0',
'cache:hits{name="cache_name"} 0',
'cache:total{name="cache_name"} 0',
'cache:size{name="cache_name"} 0',
])
metric.inc_misses()
d["key"] = "value"
self.assertEquals(metric.render(), [
'cache:hits 0',
'cache:total 1',
'cache:size 1',
'cache:hits{name="cache_name"} 0',
'cache:total{name="cache_name"} 1',
'cache:size{name="cache_name"} 1',
])
metric.inc_hits()
self.assertEquals(metric.render(), [
'cache:hits 1',
'cache:total 2',
'cache:size 1',
'cache:hits{name="cache_name"} 1',
'cache:total{name="cache_name"} 2',
'cache:size{name="cache_name"} 1',
])