0
0
Fork 1
mirror of https://mau.dev/maunium/synapse.git synced 2025-01-22 11:20:26 +01:00

Add basic read/write lock (#15782)

This commit is contained in:
Erik Johnston 2023-07-05 17:25:00 +01:00 committed by GitHub
parent ce857c05d5
commit 39d131b016
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 731 additions and 59 deletions

1
changelog.d/15782.misc Normal file
View file

@ -0,0 +1 @@
Add read/write style cross-worker locks.

View file

@ -197,6 +197,11 @@ IGNORED_TABLES = {
"ui_auth_sessions", "ui_auth_sessions",
"ui_auth_sessions_credentials", "ui_auth_sessions_credentials",
"ui_auth_sessions_ips", "ui_auth_sessions_ips",
# Ignore the worker locks table, as a) there shouldn't be any acquired locks
# after porting, and b) the circular foreign key constraints make it hard to
# port.
"worker_read_write_locks_mode",
"worker_read_write_locks",
} }
@ -805,7 +810,9 @@ class Porter:
) )
# Map from table name to args passed to `handle_table`, i.e. a tuple # Map from table name to args passed to `handle_table`, i.e. a tuple
# of: `postgres_size`, `table_size`, `forward_chunk`, `backward_chunk`. # of: `postgres_size`, `table_size`, `forward_chunk`, `backward_chunk`.
tables_to_port_info_map = {r[0]: r[1:] for r in setup_res} tables_to_port_info_map = {
r[0]: r[1:] for r in setup_res if r[0] not in IGNORED_TABLES
}
# Step 5. Do the copying. # Step 5. Do the copying.
# #

View file

@ -25,6 +25,7 @@ from synapse.storage.database import (
LoggingDatabaseConnection, LoggingDatabaseConnection,
LoggingTransaction, LoggingTransaction,
) )
from synapse.storage.engines import PostgresEngine
from synapse.util import Clock from synapse.util import Clock
from synapse.util.stringutils import random_string from synapse.util.stringutils import random_string
@ -68,12 +69,20 @@ class LockStore(SQLBaseStore):
self._reactor = hs.get_reactor() self._reactor = hs.get_reactor()
self._instance_name = hs.get_instance_id() self._instance_name = hs.get_instance_id()
# A map from `(lock_name, lock_key)` to the token of any locks that we # A map from `(lock_name, lock_key)` to lock that we think we
# think we currently hold. # currently hold.
self._live_tokens: WeakValueDictionary[ self._live_lock_tokens: WeakValueDictionary[
Tuple[str, str], Lock Tuple[str, str], Lock
] = WeakValueDictionary() ] = WeakValueDictionary()
# A map from `(lock_name, lock_key, token)` to read/write lock that we
# think we currently hold. For a given lock_name/lock_key, there can be
# multiple read locks at a time but only one write lock (no mixing read
# and write locks at the same time).
self._live_read_write_lock_tokens: WeakValueDictionary[
Tuple[str, str, str], Lock
] = WeakValueDictionary()
# When we shut down we want to remove the locks. Technically this can # When we shut down we want to remove the locks. Technically this can
# lead to a race, as we may drop the lock while we are still processing. # lead to a race, as we may drop the lock while we are still processing.
# However, a) it should be a small window, b) the lock is best effort # However, a) it should be a small window, b) the lock is best effort
@ -91,11 +100,13 @@ class LockStore(SQLBaseStore):
"""Called when the server is shutting down""" """Called when the server is shutting down"""
logger.info("Dropping held locks due to shutdown") logger.info("Dropping held locks due to shutdown")
# We need to take a copy of the tokens dict as dropping the locks will # We need to take a copy of the locks as dropping the locks will cause
# cause the dictionary to change. # the dictionary to change.
locks = dict(self._live_tokens) locks = list(self._live_lock_tokens.values()) + list(
self._live_read_write_lock_tokens.values()
)
for lock in locks.values(): for lock in locks:
await lock.release() await lock.release()
logger.info("Dropped locks due to shutdown") logger.info("Dropped locks due to shutdown")
@ -122,7 +133,7 @@ class LockStore(SQLBaseStore):
""" """
# Check if this process has taken out a lock and if it's still valid. # Check if this process has taken out a lock and if it's still valid.
lock = self._live_tokens.get((lock_name, lock_key)) lock = self._live_lock_tokens.get((lock_name, lock_key))
if lock and await lock.is_still_valid(): if lock and await lock.is_still_valid():
return None return None
@ -176,61 +187,111 @@ class LockStore(SQLBaseStore):
self._reactor, self._reactor,
self._clock, self._clock,
self, self,
read_write=False,
lock_name=lock_name, lock_name=lock_name,
lock_key=lock_key, lock_key=lock_key,
token=token, token=token,
) )
self._live_tokens[(lock_name, lock_key)] = lock self._live_lock_tokens[(lock_name, lock_key)] = lock
return lock return lock
async def _is_lock_still_valid( async def try_acquire_read_write_lock(
self, lock_name: str, lock_key: str, token: str self,
) -> bool: lock_name: str,
"""Checks whether this instance still holds the lock.""" lock_key: str,
last_renewed_ts = await self.db_pool.simple_select_one_onecol( write: bool,
table="worker_locks", ) -> Optional["Lock"]:
keyvalues={ """Try to acquire a lock for the given name/key. Will return an async
"lock_name": lock_name, context manager if the lock is successfully acquired, which *must* be
"lock_key": lock_key, used (otherwise the lock will leak).
"token": token, """
},
retcol="last_renewed_ts", now = self._clock.time_msec()
allow_none=True, token = random_string(6)
desc="is_lock_still_valid",
) def _try_acquire_read_write_lock_txn(txn: LoggingTransaction) -> None:
return ( # We attempt to acquire the lock by inserting into
last_renewed_ts is not None # `worker_read_write_locks` and seeing if that fails any
and self._clock.time_msec() - _LOCK_TIMEOUT_MS < last_renewed_ts # constraints. If it doesn't then we have acquired the lock,
# otherwise we haven't.
#
# Before that though we clear the table of any stale locks.
delete_sql = """
DELETE FROM worker_read_write_locks
WHERE last_renewed_ts < ? AND lock_name = ? AND lock_key = ?;
"""
insert_sql = """
INSERT INTO worker_read_write_locks (lock_name, lock_key, write_lock, instance_name, token, last_renewed_ts)
VALUES (?, ?, ?, ?, ?, ?)
"""
if isinstance(self.database_engine, PostgresEngine):
# For Postgres we can send these queries at the same time.
txn.execute(
delete_sql + ";" + insert_sql,
(
# DELETE args
now - _LOCK_TIMEOUT_MS,
lock_name,
lock_key,
# UPSERT args
lock_name,
lock_key,
write,
self._instance_name,
token,
now,
),
)
else:
# For SQLite these need to be two queries.
txn.execute(
delete_sql,
(
now - _LOCK_TIMEOUT_MS,
lock_name,
lock_key,
),
)
txn.execute(
insert_sql,
(
lock_name,
lock_key,
write,
self._instance_name,
token,
now,
),
)
return
try:
await self.db_pool.runInteraction(
"try_acquire_read_write_lock",
_try_acquire_read_write_lock_txn,
)
except self.database_engine.module.IntegrityError:
return None
lock = Lock(
self._reactor,
self._clock,
self,
read_write=True,
lock_name=lock_name,
lock_key=lock_key,
token=token,
) )
async def _renew_lock(self, lock_name: str, lock_key: str, token: str) -> None: self._live_read_write_lock_tokens[(lock_name, lock_key, token)] = lock
"""Attempt to renew the lock if we still hold it."""
await self.db_pool.simple_update(
table="worker_locks",
keyvalues={
"lock_name": lock_name,
"lock_key": lock_key,
"token": token,
},
updatevalues={"last_renewed_ts": self._clock.time_msec()},
desc="renew_lock",
)
async def _drop_lock(self, lock_name: str, lock_key: str, token: str) -> None: return lock
"""Attempt to drop the lock, if we still hold it"""
await self.db_pool.simple_delete(
table="worker_locks",
keyvalues={
"lock_name": lock_name,
"lock_key": lock_key,
"token": token,
},
desc="drop_lock",
)
self._live_tokens.pop((lock_name, lock_key), None)
class Lock: class Lock:
@ -259,6 +320,7 @@ class Lock:
reactor: IReactorCore, reactor: IReactorCore,
clock: Clock, clock: Clock,
store: LockStore, store: LockStore,
read_write: bool,
lock_name: str, lock_name: str,
lock_key: str, lock_key: str,
token: str, token: str,
@ -266,13 +328,23 @@ class Lock:
self._reactor = reactor self._reactor = reactor
self._clock = clock self._clock = clock
self._store = store self._store = store
self._read_write = read_write
self._lock_name = lock_name self._lock_name = lock_name
self._lock_key = lock_key self._lock_key = lock_key
self._token = token self._token = token
self._table = "worker_read_write_locks" if read_write else "worker_locks"
self._looping_call = clock.looping_call( self._looping_call = clock.looping_call(
self._renew, _RENEWAL_INTERVAL_MS, store, lock_name, lock_key, token self._renew,
_RENEWAL_INTERVAL_MS,
store,
clock,
read_write,
lock_name,
lock_key,
token,
) )
self._dropped = False self._dropped = False
@ -281,6 +353,8 @@ class Lock:
@wrap_as_background_process("Lock._renew") @wrap_as_background_process("Lock._renew")
async def _renew( async def _renew(
store: LockStore, store: LockStore,
clock: Clock,
read_write: bool,
lock_name: str, lock_name: str,
lock_key: str, lock_key: str,
token: str, token: str,
@ -291,12 +365,34 @@ class Lock:
don't end up with a reference to `self` in the reactor, which would stop don't end up with a reference to `self` in the reactor, which would stop
this from being cleaned up if we dropped the context manager. this from being cleaned up if we dropped the context manager.
""" """
await store._renew_lock(lock_name, lock_key, token) table = "worker_read_write_locks" if read_write else "worker_locks"
await store.db_pool.simple_update(
table=table,
keyvalues={
"lock_name": lock_name,
"lock_key": lock_key,
"token": token,
},
updatevalues={"last_renewed_ts": clock.time_msec()},
desc="renew_lock",
)
async def is_still_valid(self) -> bool: async def is_still_valid(self) -> bool:
"""Check if the lock is still held by us""" """Check if the lock is still held by us"""
return await self._store._is_lock_still_valid( last_renewed_ts = await self._store.db_pool.simple_select_one_onecol(
self._lock_name, self._lock_key, self._token table=self._table,
keyvalues={
"lock_name": self._lock_name,
"lock_key": self._lock_key,
"token": self._token,
},
retcol="last_renewed_ts",
allow_none=True,
desc="is_lock_still_valid",
)
return (
last_renewed_ts is not None
and self._clock.time_msec() - _LOCK_TIMEOUT_MS < last_renewed_ts
) )
async def __aenter__(self) -> None: async def __aenter__(self) -> None:
@ -325,7 +421,23 @@ class Lock:
if self._looping_call.running: if self._looping_call.running:
self._looping_call.stop() self._looping_call.stop()
await self._store._drop_lock(self._lock_name, self._lock_key, self._token) await self._store.db_pool.simple_delete(
table=self._table,
keyvalues={
"lock_name": self._lock_name,
"lock_key": self._lock_key,
"token": self._token,
},
desc="drop_lock",
)
if self._read_write:
self._store._live_read_write_lock_tokens.pop(
(self._lock_name, self._lock_key, self._token), None
)
else:
self._store._live_lock_tokens.pop((self._lock_name, self._lock_key), None)
self._dropped = True self._dropped = True
def __del__(self) -> None: def __del__(self) -> None:

View file

@ -0,0 +1,152 @@
/* Copyright 2023 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-- We implement read/write style locks by using two tables with mutual foreign
-- key constraints. Note that this implementation is vulnerable to starving
-- writers if read locks repeatedly get acquired.
--
-- The first table (`worker_read_write_locks_mode`) indicates that a given lock
-- has either been acquired in read mode *or* write mode, but not both. This is
-- enforced by the unique constraint. Each instance of a lock being acquired is
-- associated with a random `token`.
--
-- The second table (`worker_read_write_locks`) tracks who has currently
-- acquired a given lock. For a given lock_name/lock_key, there can be multiple
-- read locks at a time but only one write lock (no mixing read and write locks
-- at the same time).
--
-- The foreign key from the second to first table enforces that for any given
-- lock the second table cannot have a mix of rows with read or write.
--
-- The foreign key from the first to second table enforces that we don't have a
-- row for a lock in the first table if not in the second table.
--
--
-- Furthermore, we add some triggers to automatically keep the first table up to
-- date when inserting/deleting from the second table. This reduces the number
-- of round trips needed to acquire and release locks, as those operations
-- simply become an INSERT or DELETE. These triggers are added in a separate
-- delta due to database specific syntax.
-- A table to track whether a lock is currently acquired, and if so whether its
-- in read or write mode.
CREATE TABLE worker_read_write_locks_mode (
lock_name TEXT NOT NULL,
lock_key TEXT NOT NULL,
-- Whether this lock is in read (false) or write (true) mode
write_lock BOOLEAN NOT NULL,
-- A token that has currently acquired the lock. We need this so that we can
-- add a foreign constraint from this table to `worker_read_write_locks`.
token TEXT NOT NULL
);
-- Ensure that we can only have one row per lock
CREATE UNIQUE INDEX worker_read_write_locks_mode_key ON worker_read_write_locks_mode (lock_name, lock_key);
-- We need this (redundant) constraint so that we can have a foreign key
-- constraint against this table.
CREATE UNIQUE INDEX worker_read_write_locks_mode_type ON worker_read_write_locks_mode (lock_name, lock_key, write_lock);
-- A table to track who has currently acquired a given lock.
CREATE TABLE worker_read_write_locks (
lock_name TEXT NOT NULL,
lock_key TEXT NOT NULL,
-- We write the instance name to ease manual debugging, we don't ever read
-- from it.
-- Note: instance names aren't guarenteed to be unique.
instance_name TEXT NOT NULL,
-- Whether the process has taken out a "read" or a "write" lock.
write_lock BOOLEAN NOT NULL,
-- A random string generated each time an instance takes out a lock. Used by
-- the instance to tell whether the lock is still held by it (e.g. in the
-- case where the process stalls for a long time the lock may time out and
-- be taken out by another instance, at which point the original instance
-- can tell it no longer holds the lock as the tokens no longer match).
token TEXT NOT NULL,
last_renewed_ts BIGINT NOT NULL,
-- This constraint ensures that a given lock has only been acquired in read
-- xor write mode, but not both.
FOREIGN KEY (lock_name, lock_key, write_lock) REFERENCES worker_read_write_locks_mode (lock_name, lock_key, write_lock)
);
CREATE UNIQUE INDEX worker_read_write_locks_key ON worker_read_write_locks (lock_name, lock_key, token);
-- Ensures that only one instance can acquire a lock in write mode at a time.
CREATE UNIQUE INDEX worker_read_write_locks_write ON worker_read_write_locks (lock_name, lock_key) WHERE write_lock;
-- Add a foreign key constraint to ensure that if a lock is in
-- `worker_read_write_locks_mode` then there must be a corresponding row in
-- `worker_read_write_locks` (i.e. we don't accidentally end up with a row in
-- `worker_read_write_locks_mode` when the lock is not currently acquired).
--
-- We only add to PostgreSQL as SQLite does not support adding constraints
-- after table creation, and so doesn't support "circular" foreign key
-- constraints.
ALTER TABLE worker_read_write_locks_mode ADD CONSTRAINT worker_read_write_locks_mode_foreign
FOREIGN KEY (lock_name, lock_key, token) REFERENCES worker_read_write_locks(lock_name, lock_key, token) DEFERRABLE INITIALLY DEFERRED;
-- Add a trigger to UPSERT into `worker_read_write_locks_mode` whenever we try
-- and acquire a lock, i.e. insert into `worker_read_write_locks`,
CREATE OR REPLACE FUNCTION upsert_read_write_lock_parent() RETURNS trigger AS $$
BEGIN
INSERT INTO worker_read_write_locks_mode (lock_name, lock_key, write_lock, token)
VALUES (NEW.lock_name, NEW.lock_key, NEW.write_lock, NEW.token)
ON CONFLICT (lock_name, lock_key)
DO NOTHING;
RETURN NEW;
END
$$
LANGUAGE plpgsql;
CREATE TRIGGER upsert_read_write_lock_parent_trigger BEFORE INSERT ON worker_read_write_locks
FOR EACH ROW
EXECUTE PROCEDURE upsert_read_write_lock_parent();
-- Ensure that we keep `worker_read_write_locks_mode` up to date whenever a lock
-- is released (i.e. a row deleted from `worker_read_write_locks`). Either we
-- update the `worker_read_write_locks_mode.token` to match another instance
-- that has currently acquired the lock, or we delete the row if nobody has
-- currently acquired a lock.
CREATE OR REPLACE FUNCTION delete_read_write_lock_parent() RETURNS trigger AS $$
DECLARE
new_token TEXT;
BEGIN
SELECT token INTO new_token FROM worker_read_write_locks
WHERE
lock_name = OLD.lock_name
AND lock_key = OLD.lock_key;
IF NOT FOUND THEN
DELETE FROM worker_read_write_locks_mode
WHERE lock_name = OLD.lock_name AND lock_key = OLD.lock_key;
ELSE
UPDATE worker_read_write_locks_mode
SET token = new_token
WHERE lock_name = OLD.lock_name AND lock_key = OLD.lock_key;
END IF;
RETURN NEW;
END
$$
LANGUAGE plpgsql;
CREATE TRIGGER delete_read_write_lock_parent_trigger AFTER DELETE ON worker_read_write_locks
FOR EACH ROW
EXECUTE PROCEDURE delete_read_write_lock_parent();

View file

@ -0,0 +1,119 @@
/* Copyright 2023 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-- c.f. the postgres version for context. The tables and constraints are the
-- same, however they need to be defined slightly differently to work around how
-- each database handles circular foreign key references.
-- A table to track whether a lock is currently acquired, and if so whether its
-- in read or write mode.
CREATE TABLE worker_read_write_locks_mode (
lock_name TEXT NOT NULL,
lock_key TEXT NOT NULL,
-- Whether this lock is in read (false) or write (true) mode
write_lock BOOLEAN NOT NULL,
-- A token that has currently acquired the lock. We need this so that we can
-- add a foreign constraint from this table to `worker_read_write_locks`.
token TEXT NOT NULL,
-- Add a foreign key constraint to ensure that if a lock is in
-- `worker_read_write_locks_mode` then there must be a corresponding row in
-- `worker_read_write_locks` (i.e. we don't accidentally end up with a row in
-- `worker_read_write_locks_mode` when the lock is not currently acquired).
FOREIGN KEY (lock_name, lock_key, token) REFERENCES worker_read_write_locks(lock_name, lock_key, token) DEFERRABLE INITIALLY DEFERRED
);
-- Ensure that we can only have one row per lock
CREATE UNIQUE INDEX worker_read_write_locks_mode_key ON worker_read_write_locks_mode (lock_name, lock_key);
-- We need this (redundant) constraint so that we can have a foreign key
-- constraint against this table.
CREATE UNIQUE INDEX worker_read_write_locks_mode_type ON worker_read_write_locks_mode (lock_name, lock_key, write_lock);
-- A table to track who has currently acquired a given lock.
CREATE TABLE worker_read_write_locks (
lock_name TEXT NOT NULL,
lock_key TEXT NOT NULL,
-- We write the instance name to ease manual debugging, we don't ever read
-- from it.
-- Note: instance names aren't guarenteed to be unique.
instance_name TEXT NOT NULL,
-- Whether the process has taken out a "read" or a "write" lock.
write_lock BOOLEAN NOT NULL,
-- A random string generated each time an instance takes out a lock. Used by
-- the instance to tell whether the lock is still held by it (e.g. in the
-- case where the process stalls for a long time the lock may time out and
-- be taken out by another instance, at which point the original instance
-- can tell it no longer holds the lock as the tokens no longer match).
token TEXT NOT NULL,
last_renewed_ts BIGINT NOT NULL,
-- This constraint ensures that a given lock has only been acquired in read
-- xor write mode, but not both.
FOREIGN KEY (lock_name, lock_key, write_lock) REFERENCES worker_read_write_locks_mode (lock_name, lock_key, write_lock)
);
CREATE UNIQUE INDEX worker_read_write_locks_key ON worker_read_write_locks (lock_name, lock_key, token);
-- Ensures that only one instance can acquire a lock in write mode at a time.
CREATE UNIQUE INDEX worker_read_write_locks_write ON worker_read_write_locks (lock_name, lock_key) WHERE write_lock;
-- Add a trigger to UPSERT into `worker_read_write_locks_mode` whenever we try
-- and acquire a lock, i.e. insert into `worker_read_write_locks`,
CREATE TRIGGER IF NOT EXISTS upsert_read_write_lock_parent_trigger
BEFORE INSERT ON worker_read_write_locks
FOR EACH ROW
BEGIN
-- First ensure that `worker_read_write_locks_mode` doesn't have stale
-- entries in it, as on SQLite we don't have the foreign key constraint to
-- enforce this.
DELETE FROM worker_read_write_locks_mode
WHERE lock_name = NEW.lock_name AND lock_key = NEW.lock_key
AND NOT EXISTS (
SELECT 1 FROM worker_read_write_locks
WHERE lock_name = NEW.lock_name AND lock_key = NEW.lock_key
);
INSERT INTO worker_read_write_locks_mode (lock_name, lock_key, write_lock, token)
VALUES (NEW.lock_name, NEW.lock_key, NEW.write_lock, NEW.token)
ON CONFLICT (lock_name, lock_key)
DO NOTHING;
END;
-- Ensure that we keep `worker_read_write_locks_mode` up to date whenever a lock
-- is released (i.e. a row deleted from `worker_read_write_locks`). Either we
-- update the `worker_read_write_locks_mode.token` to match another instance
-- that has currently acquired the lock, or we delete the row if nobody has
-- currently acquired a lock.
CREATE TRIGGER IF NOT EXISTS delete_read_write_lock_parent_trigger
AFTER DELETE ON worker_read_write_locks
FOR EACH ROW
BEGIN
DELETE FROM worker_read_write_locks_mode
WHERE lock_name = OLD.lock_name AND lock_key = OLD.lock_key
AND NOT EXISTS (
SELECT 1 FROM worker_read_write_locks
WHERE lock_name = OLD.lock_name AND lock_key = OLD.lock_key
);
UPDATE worker_read_write_locks_mode
SET token = (
SELECT token FROM worker_read_write_locks
WHERE lock_name = OLD.lock_name AND lock_key = OLD.lock_key
)
WHERE lock_name = OLD.lock_name AND lock_key = OLD.lock_key;
END;

View file

@ -166,4 +166,285 @@ class LockTestCase(unittest.HomeserverTestCase):
# Now call the shutdown code # Now call the shutdown code
self.get_success(self.store._on_shutdown()) self.get_success(self.store._on_shutdown())
self.assertEqual(self.store._live_tokens, {}) self.assertEqual(self.store._live_lock_tokens, {})
class ReadWriteLockTestCase(unittest.HomeserverTestCase):
"""Test the read/write lock implementation."""
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.store = hs.get_datastores().main
def test_acquire_write_contention(self) -> None:
"""Test that we can only acquire one write lock at a time"""
# Track the number of tasks holding the lock.
# Should be at most 1.
in_lock = 0
max_in_lock = 0
release_lock: "Deferred[None]" = Deferred()
async def task() -> None:
nonlocal in_lock
nonlocal max_in_lock
lock = await self.store.try_acquire_read_write_lock(
"name", "key", write=True
)
if not lock:
return
async with lock:
in_lock += 1
max_in_lock = max(max_in_lock, in_lock)
# Block to allow other tasks to attempt to take the lock.
await release_lock
in_lock -= 1
# Start 3 tasks.
task1 = defer.ensureDeferred(task())
task2 = defer.ensureDeferred(task())
task3 = defer.ensureDeferred(task())
# Give the reactor a kick so that the database transaction returns.
self.pump()
release_lock.callback(None)
# Run the tasks to completion.
# To work around `Linearizer`s using a different reactor to sleep when
# contended (#12841), we call `runUntilCurrent` on
# `twisted.internet.reactor`, which is a different reactor to that used
# by the homeserver.
assert isinstance(reactor, ReactorBase)
self.get_success(task1)
reactor.runUntilCurrent()
self.get_success(task2)
reactor.runUntilCurrent()
self.get_success(task3)
# At most one task should have held the lock at a time.
self.assertEqual(max_in_lock, 1)
def test_acquire_multiple_reads(self) -> None:
"""Test that we can acquire multiple read locks at a time"""
# Track the number of tasks holding the lock.
in_lock = 0
max_in_lock = 0
release_lock: "Deferred[None]" = Deferred()
async def task() -> None:
nonlocal in_lock
nonlocal max_in_lock
lock = await self.store.try_acquire_read_write_lock(
"name", "key", write=False
)
if not lock:
return
async with lock:
in_lock += 1
max_in_lock = max(max_in_lock, in_lock)
# Block to allow other tasks to attempt to take the lock.
await release_lock
in_lock -= 1
# Start 3 tasks.
task1 = defer.ensureDeferred(task())
task2 = defer.ensureDeferred(task())
task3 = defer.ensureDeferred(task())
# Give the reactor a kick so that the database transaction returns.
self.pump()
release_lock.callback(None)
# Run the tasks to completion.
# To work around `Linearizer`s using a different reactor to sleep when
# contended (#12841), we call `runUntilCurrent` on
# `twisted.internet.reactor`, which is a different reactor to that used
# by the homeserver.
assert isinstance(reactor, ReactorBase)
self.get_success(task1)
reactor.runUntilCurrent()
self.get_success(task2)
reactor.runUntilCurrent()
self.get_success(task3)
# At most one task should have held the lock at a time.
self.assertEqual(max_in_lock, 3)
def test_write_lock_acquired(self) -> None:
"""Test that we can take out a write lock and that while we hold it
nobody else can take it out.
"""
# First to acquire this lock, so it should complete
lock = self.get_success(
self.store.try_acquire_read_write_lock("name", "key", write=True)
)
assert lock is not None
# Enter the context manager
self.get_success(lock.__aenter__())
# Attempting to acquire the lock again fails, as both read and write.
lock2 = self.get_success(
self.store.try_acquire_read_write_lock("name", "key", write=True)
)
self.assertIsNone(lock2)
lock3 = self.get_success(
self.store.try_acquire_read_write_lock("name", "key", write=False)
)
self.assertIsNone(lock3)
# Calling `is_still_valid` reports true.
self.assertTrue(self.get_success(lock.is_still_valid()))
# Drop the lock
self.get_success(lock.__aexit__(None, None, None))
# We can now acquire the lock again.
lock4 = self.get_success(
self.store.try_acquire_read_write_lock("name", "key", write=True)
)
assert lock4 is not None
self.get_success(lock4.__aenter__())
self.get_success(lock4.__aexit__(None, None, None))
def test_read_lock_acquired(self) -> None:
"""Test that we can take out a read lock and that while we hold it
only other reads can use it.
"""
# First to acquire this lock, so it should complete
lock = self.get_success(
self.store.try_acquire_read_write_lock("name", "key", write=False)
)
assert lock is not None
# Enter the context manager
self.get_success(lock.__aenter__())
# Attempting to acquire the write lock fails
lock2 = self.get_success(
self.store.try_acquire_read_write_lock("name", "key", write=True)
)
self.assertIsNone(lock2)
# Attempting to acquire a read lock succeeds
lock3 = self.get_success(
self.store.try_acquire_read_write_lock("name", "key", write=False)
)
assert lock3 is not None
self.get_success(lock3.__aenter__())
# Calling `is_still_valid` reports true.
self.assertTrue(self.get_success(lock.is_still_valid()))
# Drop the first lock
self.get_success(lock.__aexit__(None, None, None))
# Attempting to acquire the write lock still fails, as lock3 is still
# active.
lock4 = self.get_success(
self.store.try_acquire_read_write_lock("name", "key", write=True)
)
self.assertIsNone(lock4)
# Drop the still open third lock
self.get_success(lock3.__aexit__(None, None, None))
# We can now acquire the lock again.
lock5 = self.get_success(
self.store.try_acquire_read_write_lock("name", "key", write=True)
)
assert lock5 is not None
self.get_success(lock5.__aenter__())
self.get_success(lock5.__aexit__(None, None, None))
def test_maintain_lock(self) -> None:
"""Test that we don't time out locks while they're still active (lock is
renewed in the background if the process is still alive)"""
lock = self.get_success(
self.store.try_acquire_read_write_lock("name", "key", write=True)
)
assert lock is not None
self.get_success(lock.__aenter__())
# Wait for ages with the lock, we should not be able to get the lock.
self.reactor.advance(5 * _LOCK_TIMEOUT_MS / 1000)
self.pump()
lock2 = self.get_success(
self.store.try_acquire_read_write_lock("name", "key", write=True)
)
self.assertIsNone(lock2)
self.get_success(lock.__aexit__(None, None, None))
def test_timeout_lock(self) -> None:
"""Test that we time out locks if they're not updated for ages"""
lock = self.get_success(
self.store.try_acquire_read_write_lock("name", "key", write=True)
)
assert lock is not None
self.get_success(lock.__aenter__())
# We simulate the process getting stuck by cancelling the looping call
# that keeps the lock active.
lock._looping_call.stop()
# Wait for the lock to timeout.
self.reactor.advance(2 * _LOCK_TIMEOUT_MS / 1000)
lock2 = self.get_success(
self.store.try_acquire_read_write_lock("name", "key", write=True)
)
self.assertIsNotNone(lock2)
self.assertFalse(self.get_success(lock.is_still_valid()))
def test_drop(self) -> None:
"""Test that dropping the context manager means we stop renewing the lock"""
lock = self.get_success(
self.store.try_acquire_read_write_lock("name", "key", write=True)
)
self.assertIsNotNone(lock)
del lock
# Wait for the lock to timeout.
self.reactor.advance(2 * _LOCK_TIMEOUT_MS / 1000)
lock2 = self.get_success(
self.store.try_acquire_read_write_lock("name", "key", write=True)
)
self.assertIsNotNone(lock2)
def test_shutdown(self) -> None:
"""Test that shutting down Synapse releases the locks"""
# Acquire two locks
lock = self.get_success(
self.store.try_acquire_read_write_lock("name", "key", write=True)
)
self.assertIsNotNone(lock)
lock2 = self.get_success(
self.store.try_acquire_read_write_lock("name", "key2", write=True)
)
self.assertIsNotNone(lock2)
# Now call the shutdown code
self.get_success(self.store._on_shutdown())
self.assertEqual(self.store._live_read_write_lock_tokens, {})