Compare commits

...

30 commits

Author SHA1 Message Date
Olivier Wilkinson (reivilibre) c9646770d1 Remove non-ASCII-representable characters to fix py35-old tests. 2019-08-14 10:39:38 +01:00
Olivier Wilkinson (reivilibre) 703f9ff3c9 Merge branch 'develop' into rei/room_stats_separated 2019-08-14 08:56:24 +01:00
Olivier Wilkinson (reivilibre) 95a30250b2 Newsfile
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-14 08:21:21 +01:00
Olivier Wilkinson (reivilibre) f9f551fb93 Clarify docstrings in storage.stats
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-14 08:19:19 +01:00
Olivier Wilkinson (reivilibre) de6b266817 Linting
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-13 16:19:36 +01:00
Olivier Wilkinson (reivilibre) 0e6f700a13 Remove clean-up handler and replace with no-op as not currently needed.
needed.

Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-13 15:49:24 +01:00
Olivier Wilkinson (reivilibre) 2da4b41c63 Remove obsolete function
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-13 15:48:50 +01:00
Olivier Wilkinson (reivilibre) 96fa239b8a Fix up stats_separated1.sql
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-13 15:32:14 +01:00
Olivier Wilkinson (reivilibre) 16e2ffd166 Initial room and user statistics documentation
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-13 15:08:48 +01:00
Olivier Wilkinson (reivilibre) 20ec9698ac Fix issue with not selecting a needed column
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-13 15:08:32 +01:00
Olivier Wilkinson (reivilibre) 5b54411213 Add SQLite support by working around missing syntax 2019-08-13 15:07:57 +01:00
Olivier Wilkinson (reivilibre) fd184f6cd0 Fix generality of query 2019-08-13 15:07:00 +01:00
Olivier Wilkinson (reivilibre) 1e0bd9a9c0 Fix stats tests and their expectations of the number of events in fresh
rooms – varies depending upon room publicness.

Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-12 16:23:29 +01:00
Olivier Wilkinson (reivilibre) 182cdcbf24 Docstrings in storage.stats.
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-12 16:22:18 +01:00
Olivier Wilkinson (reivilibre) 314567d62d Split out partial indices from theschema delta, thus supporting SQLite.
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-12 16:21:35 +01:00
Olivier Wilkinson (reivilibre) d54ae7118d Move back to defer.inlineCallbacks from async as it makes stats
unergonomic if we move to `async` from the bottom-up.

Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-09 15:51:42 +01:00
Olivier Wilkinson (reivilibre) 9064f28e59 Add initial batch of stats tests
Still want to add more, concerning behaviour in the different states
of the background updater completion.

Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-08 16:38:23 +01:00
Olivier Wilkinson (reivilibre) 9ee50ccdf2 Fix stats_separated SQL
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-08 16:36:16 +01:00
Olivier Wilkinson (reivilibre) 69234072d3 Remove obsolete functions for updating stats absolutely.
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-08 11:48:23 +01:00
Olivier Wilkinson (reivilibre) b3844451f9 Introduce get_room_state; a way to get state for a single room
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-08 11:47:02 +01:00
Olivier Wilkinson (reivilibre) 4a3fec1f3b Fix tests
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-08 11:46:16 +01:00
Olivier Wilkinson (reivilibre) 7f2ec2e954 Handle user registration and ensure they start accruing statistics
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-08 11:44:15 +01:00
Olivier Wilkinson (reivilibre) 1c9732d64b Update incremental processor to use new interfaces and track total_events
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-08 11:43:50 +01:00
Olivier Wilkinson (reivilibre) a59da511ce Update public room visibility change handler
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-08 11:43:25 +01:00
Olivier Wilkinson (reivilibre) 4a45fb5ab8 Adapt state delta handling to match the new interface
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-08 11:40:51 +01:00
Olivier Wilkinson (reivilibre) cf9f7ae366 Introduce total_events tracking and rework statistics tracking.
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-08 11:37:29 +01:00
Olivier Wilkinson (reivilibre) 5ca4cd5ad4 Track more stats positions
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-08 11:32:17 +01:00
Olivier Wilkinson (reivilibre) 4cb921c0f9 Fix type signature in get_current_state_deltas
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-08 11:29:31 +01:00
Olivier Wilkinson (reivilibre) 5216299124 Use threading.Lock to prevent concurrent incremental position updates
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-08 11:28:42 +01:00
Olivier Wilkinson (reivilibre) 259014b7ad Schema delta for separated statistics
Separated by current/historical

Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-08 11:25:02 +01:00
10 changed files with 1981 additions and 303 deletions

2
changelog.d/5847.misc Normal file
View file

@ -0,0 +1,2 @@
Rework room and user statistics to separate current & historical rows, as well
as track stats correctly.

View file

@ -0,0 +1,146 @@
Room and User Statistics
========================
Synapse maintains room and user statistics (as well as a cache of room state),
in various tables.
These can be used for administrative purposes but are also used when generating
the public room directory. If these tables get stale or out of sync (possibly
after database corruption), you may wish to regenerate them.
# Synapse Administrator Documentation
## Various SQL scripts that you may find useful
### Delete stats, including historical stats
```sql
DELETE FROM room_stats_current;
DELETE FROM room_stats_historical;
DELETE FROM user_stats_current;
DELETE FROM user_stats_historical;
```
### Regenerate stats (all subjects)
```sql
BEGIN;
DELETE FROM stats_incremental_position;
INSERT INTO stats_incremental_position (
state_delta_stream_id,
total_events_min_stream_ordering,
total_events_max_stream_ordering,
is_background_contract
) VALUES (NULL, NULL, NULL, FALSE), (NULL, NULL, NULL, TRUE);
COMMIT;
DELETE FROM room_stats_current;
DELETE FROM user_stats_current;
```
then follow the steps below for **'Regenerate stats (missing subjects only)'**
### Regenerate stats (missing subjects only)
```sql
-- Set up staging tables
-- we depend on current_state_events_membership because this is used
-- in our counting.
INSERT INTO background_updates (update_name, progress_json) VALUES
('populate_stats_prepare', '{}', 'current_state_events_membership');
-- Run through each room and update stats
INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES
('populate_stats_process_rooms', '{}', 'populate_stats_prepare');
-- Run through each user and update stats.
INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES
('populate_stats_process_users', '{}', 'populate_stats_process_rooms');
-- Clean up staging tables
INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES
('populate_stats_cleanup', '{}', 'populate_stats_process_users');
```
then **restart Synapse**.
# Synapse Developer Documentation
## High-Level Concepts
### Definitions
* **subject**: Something we are tracking stats about currently a room or user.
* **current row**: An entry for a subject in the appropriate current statistics
table. Each subject can have only one.
* **historical row**: An entry for a subject in the appropriate historical
statistics table. Each subject can have any number of these.
### Overview
Stats are maintained as time series. There are two kinds of column:
* absolute columns where the value is correct for the time given by `end_ts`
in the stats row. (Imagine a line graph for these values)
* per-slice columns where the value corresponds to how many of the occurrences
occurred within the time slice given by `(end_ts bucket_size)…end_ts`
or `start_ts…end_ts`. (Imagine a histogram for these values)
Currently, only absolute columns are in use.
Stats are maintained in two tables (for each type): current and historical.
Current stats correspond to the present values. Each subject can only have one
entry.
Historical stats correspond to values in the past. Subjects may have multiple
entries.
## Concepts around the management of stats
### current rows
#### dirty current rows
Current rows can be **dirty**, which means that they have changed since the
latest historical row for the same subject.
**Dirty** current rows possess an end timestamp, `end_ts`.
#### old current rows and old collection
When a (necessarily dirty) current row has an `end_ts` in the past, it is said
to be **old**.
Old current rows must be copied into a historical row, and cleared of their dirty
status, before further statistics can be tracked for that subject.
The process which does this is referred to as **old collection**.
#### incomplete current rows
There are also **incomplete** current rows, which are current rows that do not
contain a full count yet this is because they are waiting for the regeneration
process to give them an initial count. Incomplete current rows DO NOT contain
correct and up-to-date values. As such, *incomplete rows are not old-collected*.
Instead, old incomplete rows will be extended so they are no longer old.
### historical rows
Historical rows can always be considered to be valid for the time slice and
end time specified. (This, of course, assumes a lack of defects in the code
to track the statistics, and assumes integrity of the database).
Even still, there are two considerations that we may need to bear in mind:
* historical rows will not exist for every time slice they will be omitted
if there were no changes. In this case, the following assumptions can be
made to interpolate/recreate missing rows:
- absolute fields have the same values as in the preceding row
- per-slice fields are zero (`0`)
* historical rows will not be retained forever rows older than a configurable
time will be purged.
#### purge
The purging of historical rows is not yet implemented.

View file

@ -49,9 +49,6 @@ class StatsHandler(StateDeltasHandler):
# The current position in the current_state_delta stream
self.pos = None
# Guard to ensure we only process deltas one at a time
self._is_processing = False
if hs.config.stats_enabled:
self.notifier.add_replication_callback(self.notify_new_event)
@ -65,43 +62,58 @@ class StatsHandler(StateDeltasHandler):
if not self.hs.config.stats_enabled:
return
if self._is_processing:
return
lock = self.store.stats_delta_processing_lock
@defer.inlineCallbacks
def process():
try:
yield self._unsafe_process()
finally:
self._is_processing = False
lock.release()
self._is_processing = True
run_as_background_process("stats.notify_new_event", process)
if lock.acquire(blocking=False):
# we only want to run this process one-at-a-time,
# and also, if the initial background updater wants us to keep out,
# we should respect that.
try:
run_as_background_process("stats.notify_new_event", process)
except: # noqa: E722 re-raised so fine
lock.release()
raise
@defer.inlineCallbacks
def _unsafe_process(self):
# If self.pos is None then means we haven't fetched it from DB
if self.pos is None:
self.pos = yield self.store.get_stats_stream_pos()
if self.pos is None or None in self.pos.values():
self.pos = yield self.store.get_stats_positions()
# If still None then the initial background update hasn't happened yet
if self.pos is None:
# If still None then the initial background update hasn't started yet
if self.pos is None or None in self.pos.values():
return None
# Loop round handling deltas until we're up to date
while True:
with Measure(self.clock, "stats_delta"):
deltas = yield self.store.get_current_state_deltas(self.pos)
with Measure(self.clock, "stats_delta"):
while True:
deltas = yield self.store.get_current_state_deltas(
self.pos["state_delta_stream_id"]
)
if not deltas:
return
break
logger.info("Handling %d state deltas", len(deltas))
yield self._handle_deltas(deltas)
self.pos = deltas[-1]["stream_id"]
yield self.store.update_stats_stream_pos(self.pos)
self.pos["state_delta_stream_id"] = deltas[-1]["stream_id"]
event_processing_positions.labels("stats").set(self.pos)
event_processing_positions.labels("stats").set(
self.pos["state_delta_stream_id"]
)
if self.pos is not None:
yield self.store.update_stats_positions(self.pos)
with Measure(self.clock, "stats_total_events"):
self.pos = yield self.store.incremental_update_total_events(self.pos)
@defer.inlineCallbacks
def _handle_deltas(self, deltas):
@ -119,7 +131,7 @@ class StatsHandler(StateDeltasHandler):
logger.debug("Handling: %r %r, %s", typ, state_key, event_id)
token = yield self.store.get_earliest_token_for_room_stats(room_id)
token = yield self.store.get_earliest_token_for_stats("room", room_id)
# If the earliest token to begin from is larger than our current
# stream ID, skip processing this delta.
@ -144,44 +156,56 @@ class StatsHandler(StateDeltasHandler):
# We use stream_pos here rather than fetch by event_id as event_id
# may be None
now = yield self.store.get_received_ts_by_stream_pos(stream_pos)
now = int(now) // 1000
# quantise time to the nearest bucket
now = (now // 1000 // self.stats_bucket_size) * self.stats_bucket_size
room_stats_delta = {}
room_stats_complete = False
if prev_event_id is None:
# this state event doesn't overwrite another,
# so it is a new effective/current state event
room_stats_delta["current_state_events"] = (
room_stats_delta.get("current_state_events", 0) + 1
)
if typ == EventTypes.Member:
# we could use _get_key_change here but it's a bit inefficient
# given we're not testing for a specific result; might as well
# just grab the prev_membership and membership strings and
# compare them.
prev_event_content = {}
# We take None rather than leave as a previous membership
# in the absence of a previous event because we do not want to
# reduce the leave count when a new-to-the-room user joins.
prev_membership = None
if prev_event_id is not None:
prev_event = yield self.store.get_event(
prev_event_id, allow_none=True
)
if prev_event:
prev_event_content = prev_event.content
prev_membership = prev_event_content.get(
"membership", Membership.LEAVE
)
membership = event_content.get("membership", Membership.LEAVE)
prev_membership = prev_event_content.get("membership", Membership.LEAVE)
if prev_membership == membership:
continue
if prev_membership == Membership.JOIN:
yield self.store.update_stats_delta(
now, "room", room_id, "joined_members", -1
if prev_membership is None:
logger.debug("No previous membership for this user.")
elif prev_membership == Membership.JOIN:
room_stats_delta["joined_members"] = (
room_stats_delta.get("joined_members", 0) - 1
)
elif prev_membership == Membership.INVITE:
yield self.store.update_stats_delta(
now, "room", room_id, "invited_members", -1
room_stats_delta["invited_members"] = (
room_stats_delta.get("invited_members", 0) - 1
)
elif prev_membership == Membership.LEAVE:
yield self.store.update_stats_delta(
now, "room", room_id, "left_members", -1
room_stats_delta["left_members"] = (
room_stats_delta.get("left_members", 0) - 1
)
elif prev_membership == Membership.BAN:
yield self.store.update_stats_delta(
now, "room", room_id, "banned_members", -1
room_stats_delta["banned_members"] = (
room_stats_delta.get("banned_members", 0) - 1
)
else:
err = "%s is not a valid prev_membership" % (repr(prev_membership),)
@ -189,20 +213,20 @@ class StatsHandler(StateDeltasHandler):
raise ValueError(err)
if membership == Membership.JOIN:
yield self.store.update_stats_delta(
now, "room", room_id, "joined_members", +1
room_stats_delta["joined_members"] = (
room_stats_delta.get("joined_members", 0) + 1
)
elif membership == Membership.INVITE:
yield self.store.update_stats_delta(
now, "room", room_id, "invited_members", +1
room_stats_delta["invited_members"] = (
room_stats_delta.get("invited_members", 0) + 1
)
elif membership == Membership.LEAVE:
yield self.store.update_stats_delta(
now, "room", room_id, "left_members", +1
room_stats_delta["left_members"] = (
room_stats_delta.get("left_members", 0) + 1
)
elif membership == Membership.BAN:
yield self.store.update_stats_delta(
now, "room", room_id, "banned_members", +1
room_stats_delta["banned_members"] = (
room_stats_delta.get("banned_members", 0) + 1
)
else:
err = "%s is not a valid membership" % (repr(membership),)
@ -210,26 +234,19 @@ class StatsHandler(StateDeltasHandler):
raise ValueError(err)
user_id = state_key
if self.is_mine_id(user_id):
if self.is_mine_id(user_id) and membership in (
Membership.JOIN,
Membership.LEAVE,
):
# update user_stats as it's one of our users
public = yield self._is_public_room(room_id)
if membership == Membership.LEAVE:
yield self.store.update_stats_delta(
now,
"user",
user_id,
"public_rooms" if public else "private_rooms",
-1,
)
elif membership == Membership.JOIN:
yield self.store.update_stats_delta(
now,
"user",
user_id,
"public_rooms" if public else "private_rooms",
+1,
)
field = "public_rooms" if public else "private_rooms"
delta = +1 if membership == Membership.JOIN else -1
yield self.store.update_stats_delta(
now, "user", user_id, {field: delta}
)
elif typ == EventTypes.Create:
# Newly created room. Add it with all blank portions.
@ -246,28 +263,46 @@ class StatsHandler(StateDeltasHandler):
},
)
room_stats_complete = True
elif typ == EventTypes.JoinRules:
old_room_state = yield self.store.get_room_state(room_id)
yield self.store.update_room_state(
room_id, {"join_rules": event_content.get("join_rule")}
)
is_public = yield self._get_key_change(
prev_event_id, event_id, "join_rule", JoinRules.PUBLIC
# whether the room would be public anyway,
# because of history_visibility
other_field_gives_publicity = (
old_room_state["history_visibility"] == "world_readable"
)
if is_public is not None:
yield self.update_public_room_stats(now, room_id, is_public)
if not other_field_gives_publicity:
is_public = yield self._get_key_change(
prev_event_id, event_id, "join_rule", JoinRules.PUBLIC
)
if is_public is not None:
yield self.update_public_room_stats(now, room_id, is_public)
elif typ == EventTypes.RoomHistoryVisibility:
old_room_state = yield self.store.get_room_state(room_id)
yield self.store.update_room_state(
room_id,
{"history_visibility": event_content.get("history_visibility")},
)
is_public = yield self._get_key_change(
prev_event_id, event_id, "history_visibility", "world_readable"
# whether the room would be public anyway,
# because of join_rule
other_field_gives_publicity = (
old_room_state["join_rules"] == JoinRules.PUBLIC
)
if is_public is not None:
yield self.update_public_room_stats(now, room_id, is_public)
if not other_field_gives_publicity:
is_public = yield self._get_key_change(
prev_event_id, event_id, "history_visibility", "world_readable"
)
if is_public is not None:
yield self.update_public_room_stats(now, room_id, is_public)
elif typ == EventTypes.Encryption:
yield self.store.update_room_state(
@ -290,6 +325,20 @@ class StatsHandler(StateDeltasHandler):
room_id, {"canonical_alias": event_content.get("alias")}
)
if room_stats_complete:
yield self.store.update_stats_delta(
now,
"room",
room_id,
room_stats_delta,
complete_with_stream_id=stream_id,
)
elif len(room_stats_delta) > 0:
yield self.store.update_stats_delta(
now, "room", room_id, room_stats_delta
)
@defer.inlineCallbacks
def update_public_room_stats(self, ts, room_id, is_public):
"""
@ -308,10 +357,13 @@ class StatsHandler(StateDeltasHandler):
for user_id in user_ids:
if self.hs.is_mine(UserID.from_string(user_id)):
yield self.store.update_stats_delta(
ts, "user", user_id, "public_rooms", +1 if is_public else -1
)
yield self.store.update_stats_delta(
ts, "user", user_id, "private_rooms", -1 if is_public else +1
ts,
"user",
user_id,
{
"public_rooms": +1 if is_public else -1,
"private_rooms": -1 if is_public else +1,
},
)
@defer.inlineCallbacks

View file

@ -845,6 +845,17 @@ class RegistrationStore(
(user_id_obj.localpart, create_profile_with_displayname),
)
if self.hs.config.stats_enabled:
# we create a new completed user statistics row
# we don't strictly need current_token since this user really can't
# have any state deltas before now (as it is a new user), but still,
# we include it for completeness.
current_token = self._get_max_stream_id_in_current_state_deltas_txn(txn)
self._update_stats_delta_txn(
txn, now, "user", user_id, {}, complete_with_stream_id=current_token
)
self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,))
txn.call_after(self.is_guest.invalidate, (user_id,))

View file

@ -0,0 +1,168 @@
/* Copyright 2018 New Vector Ltd
* Copyright 2019 The Matrix.org Foundation C.I.C.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
----- First clean up from previous versions of room stats.
-- First remove old stats stuff
DROP TABLE IF EXISTS room_stats;
DROP TABLE IF EXISTS user_stats;
DROP TABLE IF EXISTS room_stats_earliest_tokens;
DROP TABLE IF EXISTS _temp_populate_stats_position;
DROP TABLE IF EXISTS _temp_populate_stats_rooms;
DROP TABLE IF EXISTS stats_stream_pos;
-- Unschedule old background updates if they're still scheduled
DELETE FROM background_updates WHERE update_name IN (
'populate_stats_createtables',
'populate_stats_process_rooms',
'populate_stats_cleanup',
'regen_stats'
);
----- Create tables for our version of room stats.
-- single-row table to track position of incremental updates
CREATE TABLE IF NOT EXISTS stats_incremental_position (
-- the stream_id of the last-processed state delta
state_delta_stream_id BIGINT,
-- the stream_ordering of the last-processed backfilled event
-- (this is negative)
total_events_min_stream_ordering BIGINT,
-- the stream_ordering of the last-processed normally-created event
-- (this is positive)
total_events_max_stream_ordering BIGINT,
-- If true, this represents the contract agreed upon by the background
-- population processor.
-- If false, this is suitable for use by the delta/incremental processor.
is_background_contract BOOLEAN NOT NULL PRIMARY KEY
);
-- insert a null row and make sure it is the only one.
DELETE FROM stats_incremental_position;
INSERT INTO stats_incremental_position (
state_delta_stream_id,
total_events_min_stream_ordering,
total_events_max_stream_ordering,
is_background_contract
) VALUES (NULL, NULL, NULL, (0 = 1)), (NULL, NULL, NULL, (1 = 1));
-- represents PRESENT room statistics for a room
CREATE TABLE IF NOT EXISTS room_stats_current (
room_id TEXT NOT NULL PRIMARY KEY,
-- These starts cover the time from start_ts...end_ts (in seconds).
-- Note that end_ts is quantised, and start_ts usually so.
start_ts BIGINT,
end_ts BIGINT,
current_state_events INT NOT NULL DEFAULT 0,
total_events INT NOT NULL DEFAULT 0,
joined_members INT NOT NULL DEFAULT 0,
invited_members INT NOT NULL DEFAULT 0,
left_members INT NOT NULL DEFAULT 0,
banned_members INT NOT NULL DEFAULT 0,
-- If initial background count is still to be performed: NULL
-- If initial background count has been performed: the maximum delta stream
-- position that this row takes into account.
completed_delta_stream_id BIGINT,
CONSTRAINT timestamp_nullity_equality CHECK ((start_ts IS NULL) = (end_ts IS NULL))
);
-- represents HISTORICAL room statistics for a room
CREATE TABLE IF NOT EXISTS room_stats_historical (
room_id TEXT NOT NULL,
-- These stats cover the time from (end_ts - bucket_size)...end_ts (in seconds).
-- Note that end_ts is quantised, and start_ts usually so.
end_ts BIGINT NOT NULL,
bucket_size INT NOT NULL,
current_state_events INT NOT NULL,
total_events INT NOT NULL,
joined_members INT NOT NULL,
invited_members INT NOT NULL,
left_members INT NOT NULL,
banned_members INT NOT NULL,
PRIMARY KEY (room_id, end_ts)
);
-- We use this index to speed up deletion of ancient room stats.
CREATE INDEX IF NOT EXISTS room_stats_historical_end_ts ON room_stats_historical (end_ts);
-- We don't need an index on (room_id, end_ts) because PRIMARY KEY sorts that
-- out for us. (We would want it to review stats for a particular room.)
-- represents PRESENT statistics for a user
CREATE TABLE IF NOT EXISTS user_stats_current (
user_id TEXT NOT NULL PRIMARY KEY,
-- The timestamp that represents the start of the
start_ts BIGINT,
end_ts BIGINT,
public_rooms INT DEFAULT 0 NOT NULL,
private_rooms INT DEFAULT 0 NOT NULL,
-- If initial background count is still to be performed: NULL
-- If initial background count has been performed: the maximum delta stream
-- position that this row takes into account.
completed_delta_stream_id BIGINT
);
-- represents HISTORICAL statistics for a user
CREATE TABLE IF NOT EXISTS user_stats_historical (
user_id TEXT NOT NULL,
end_ts BIGINT NOT NULL,
bucket_size INT NOT NULL,
public_rooms INT NOT NULL,
private_rooms INT NOT NULL,
PRIMARY KEY (user_id, end_ts)
);
-- We use this index to speed up deletion of ancient user stats.
CREATE INDEX IF NOT EXISTS user_stats_historical_end_ts ON user_stats_historical (end_ts);
-- We don't need an index on (user_id, end_ts) because PRIMARY KEY sorts that
-- out for us. (We would want it to review stats for a particular user.)
-- Set up staging tables
-- we depend on current_state_events_membership because this is used
-- in our counting.
INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES
('populate_stats_prepare', '{}', 'current_state_events_membership');
-- Run through each room and update stats
INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES
('populate_stats_process_rooms', '{}', 'populate_stats_prepare');
-- Run through each user and update stats.
INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES
('populate_stats_process_users', '{}', 'populate_stats_process_rooms');
-- Clean up staging tables
INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES
('populate_stats_cleanup', '{}', 'populate_stats_process_users');

View file

@ -0,0 +1,87 @@
# -*- coding: utf-8 -*-
# Copyright 2019 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# This schema delta will be run after 'stats_separated1.sql' due to lexicographic
# ordering. Note that it MUST be so.
from synapse.storage.engines import PostgresEngine, Sqlite3Engine
def _run_create_generic(stats_type, cursor, database_engine):
"""
Creates the pertinent (partial, if supported) indices for one kind of stats.
Args:
stats_type: "room" or "user" - the type of stats
cursor: Database Cursor
database_engine: Database Engine
"""
if isinstance(database_engine, Sqlite3Engine):
# even though SQLite >= 3.8 can support partial indices, we won't enable
# them, in case the SQLite database may be later used on another system.
# It's also the case that SQLite is only likely to be used in small
# deployments or testing, where the optimisations gained by use of a
# partial index are not a big concern.
cursor.execute(
"""
CREATE INDEX IF NOT EXISTS %s_stats_current_dirty
ON %s_stats_current (end_ts);
"""
% (stats_type, stats_type)
)
cursor.execute(
"""
CREATE INDEX IF NOT EXISTS %s_stats_not_complete
ON %s_stats_current (completed_delta_stream_id, %s_id);
"""
% (stats_type, stats_type, stats_type)
)
elif isinstance(database_engine, PostgresEngine):
# This partial index helps us with finding dirty stats rows
cursor.execute(
"""
CREATE INDEX IF NOT EXISTS %s_stats_current_dirty
ON %s_stats_current (end_ts)
WHERE end_ts IS NOT NULL;
"""
% (stats_type, stats_type)
)
# This partial index helps us with old collection
cursor.execute(
"""
CREATE INDEX IF NOT EXISTS %s_stats_not_complete
ON %s_stats_current (%s_id)
WHERE completed_delta_stream_id IS NULL;
"""
% (stats_type, stats_type, stats_type)
)
else:
raise NotImplementedError("Unknown database engine.")
def run_create(cursor, database_engine):
"""
This function is called as part of the schema delta.
It will create indices - partial, if supported - for the new 'separated'
room & user statistics.
"""
_run_create_generic("room", cursor, database_engine)
_run_create_generic("user", cursor, database_engine)
def run_upgrade(cur, database_engine, config):
"""
This function is run on a database upgrade (of a non-empty database).
We have no need to do anything specific here.
"""
pass

View file

@ -31,7 +31,7 @@ class StateDeltasStore(SQLBaseStore):
- state_key (str):
- event_id (str|None): new event_id for this state key. None if the
state has been deleted.
- prev_event_id (str|None): previous event_id for this state key. None
- prev_event_id (str): previous event_id for this state key. None
if it's new state.
Args:

File diff suppressed because it is too large Load diff

View file

@ -17,12 +17,18 @@ from mock import Mock
from twisted.internet import defer
from synapse import storage
from synapse.api.constants import EventTypes, Membership
from synapse.rest import admin
from synapse.rest.client.v1 import login, room
from tests import unittest
# The expected number of state events in a fresh public room.
EXPT_NUM_STATE_EVTS_IN_FRESH_PUBLIC_ROOM = 5
# The expected number of state events in a fresh private room.
EXPT_NUM_STATE_EVTS_IN_FRESH_PRIVATE_ROOM = 6
class StatsRoomTests(unittest.HomeserverTestCase):
@ -33,7 +39,6 @@ class StatsRoomTests(unittest.HomeserverTestCase):
]
def prepare(self, reactor, clock, hs):
self.store = hs.get_datastore()
self.handler = self.hs.get_stats_handler()
@ -47,7 +52,7 @@ class StatsRoomTests(unittest.HomeserverTestCase):
self.get_success(
self.store._simple_insert(
"background_updates",
{"update_name": "populate_stats_createtables", "progress_json": "{}"},
{"update_name": "populate_stats_prepare", "progress_json": "{}"},
)
)
self.get_success(
@ -56,7 +61,17 @@ class StatsRoomTests(unittest.HomeserverTestCase):
{
"update_name": "populate_stats_process_rooms",
"progress_json": "{}",
"depends_on": "populate_stats_createtables",
"depends_on": "populate_stats_prepare",
},
)
)
self.get_success(
self.store._simple_insert(
"background_updates",
{
"update_name": "populate_stats_process_users",
"progress_json": "{}",
"depends_on": "populate_stats_process_rooms",
},
)
)
@ -66,11 +81,33 @@ class StatsRoomTests(unittest.HomeserverTestCase):
{
"update_name": "populate_stats_cleanup",
"progress_json": "{}",
"depends_on": "populate_stats_process_rooms",
"depends_on": "populate_stats_process_users",
},
)
)
def _get_current_stats(self, stats_type, stat_id):
table, id_col = storage.stats.TYPE_TO_TABLE[stats_type]
cols = (
["start_ts", "end_ts", "completed_delta_stream_id"]
+ list(storage.stats.ABSOLUTE_STATS_FIELDS[stats_type])
+ list(storage.stats.PER_SLICE_FIELDS[stats_type])
)
return self.get_success(
self.store._simple_select_one(
table + "_current", {id_col: stat_id}, cols, allow_none=True
)
)
def _perform_background_initial_update(self):
# Do the initial population of the stats via the background update
self._add_background_updates()
while not self.get_success(self.store.has_completed_background_updates()):
self.get_success(self.store.do_next_background_update(100), by=0.1)
def test_initial_room(self):
"""
The background updates will build the table from scratch.
@ -114,6 +151,7 @@ class StatsRoomTests(unittest.HomeserverTestCase):
Ingestion via notify_new_event will ignore tokens that the background
update have already processed.
"""
self.reactor.advance(86401)
self.hs.config.stats_enabled = False
@ -138,12 +176,12 @@ class StatsRoomTests(unittest.HomeserverTestCase):
self.hs.config.stats_enabled = True
self.handler.stats_enabled = True
self.store._all_done = False
self.get_success(self.store.update_stats_stream_pos(None))
self.get_success(self.store.update_stats_positions(None))
self.get_success(
self.store._simple_insert(
"background_updates",
{"update_name": "populate_stats_createtables", "progress_json": "{}"},
{"update_name": "populate_stats_prepare", "progress_json": "{}"},
)
)
@ -154,6 +192,8 @@ class StatsRoomTests(unittest.HomeserverTestCase):
self.helper.invite(room=room_1, src=u1, targ=u2, tok=u1_token)
self.helper.join(room=room_1, user=u2, tok=u2_token)
# orig_delta_processor = self.store.
# Now do the initial ingestion.
self.get_success(
self.store._simple_insert(
@ -185,8 +225,13 @@ class StatsRoomTests(unittest.HomeserverTestCase):
self.helper.invite(room=room_1, src=u1, targ=u3, tok=u1_token)
self.helper.join(room=room_1, user=u3, tok=u3_token)
# Get the deltas! There should be two -- day 1, and day 2.
r = self.get_success(self.store.get_deltas_for_room(room_1, 0))
# self.handler.notify_new_event()
# We need to let the delta processor advance…
self.pump(10 * 60)
# Get the slices! There should be two -- day 1, and day 2.
r = self.get_success(self.store.get_statistics_for_subject("room", room_1, 0))
# The oldest has 2 joined members
self.assertEqual(r[-1]["joined_members"], 2)
@ -259,7 +304,7 @@ class StatsRoomTests(unittest.HomeserverTestCase):
room_1 = self.helper.create_room_as(u1, tok=u1_token)
# Do the initial population of the user directory via the background update
# Do the initial population of the stats via the background update
self._add_background_updates()
while not self.get_success(self.store.has_completed_background_updates()):
@ -299,6 +344,528 @@ class StatsRoomTests(unittest.HomeserverTestCase):
# One delta, with two joined members -- the room creator, and our fake
# user.
r = self.get_success(self.store.get_deltas_for_room(room_1, 0))
r = self.get_success(self.store.get_statistics_for_subject("room", room_1, 0))
self.assertEqual(len(r), 1)
self.assertEqual(r[0]["joined_members"], 2)
def test_create_user(self):
"""
When we create a user, it should have statistics already ready.
"""
u1 = self.register_user("u1", "pass")
u1stats = self._get_current_stats("user", u1)
self.assertIsNotNone(u1stats)
# row is complete
self.assertIsNotNone(u1stats["completed_delta_stream_id"])
# not in any rooms by default
self.assertEqual(u1stats["public_rooms"], 0)
self.assertEqual(u1stats["private_rooms"], 0)
def test_create_room(self):
"""
When we create a room, it should have statistics already ready.
"""
u1 = self.register_user("u1", "pass")
u1token = self.login("u1", "pass")
r1 = self.helper.create_room_as(u1, tok=u1token)
r1stats = self._get_current_stats("room", r1)
r2 = self.helper.create_room_as(u1, tok=u1token, is_public=False)
r2stats = self._get_current_stats("room", r2)
self.assertIsNotNone(r1stats)
self.assertIsNotNone(r2stats)
# row is complete
self.assertIsNotNone(r1stats["completed_delta_stream_id"])
self.assertIsNotNone(r2stats["completed_delta_stream_id"])
# contains the default things you'd expect in a fresh room
self.assertEqual(
r1stats["total_events"],
EXPT_NUM_STATE_EVTS_IN_FRESH_PUBLIC_ROOM,
"Wrong number of total_events in new room's stats!"
" You may need to update this if more state events are added to"
" the room creation process.",
)
self.assertEqual(
r2stats["total_events"],
EXPT_NUM_STATE_EVTS_IN_FRESH_PRIVATE_ROOM,
"Wrong number of total_events in new room's stats!"
" You may need to update this if more state events are added to"
" the room creation process.",
)
self.assertEqual(
r1stats["current_state_events"], EXPT_NUM_STATE_EVTS_IN_FRESH_PUBLIC_ROOM
)
self.assertEqual(
r2stats["current_state_events"], EXPT_NUM_STATE_EVTS_IN_FRESH_PRIVATE_ROOM
)
self.assertEqual(r1stats["joined_members"], 1)
self.assertEqual(r1stats["invited_members"], 0)
self.assertEqual(r1stats["banned_members"], 0)
self.assertEqual(r2stats["joined_members"], 1)
self.assertEqual(r2stats["invited_members"], 0)
self.assertEqual(r2stats["banned_members"], 0)
def test_send_message_increments_total_events(self):
"""
When we send a message, it increments total_events.
"""
u1 = self.register_user("u1", "pass")
u1token = self.login("u1", "pass")
r1 = self.helper.create_room_as(u1, tok=u1token)
r1stats_ante = self._get_current_stats("room", r1)
self.helper.send(r1, "hiss", tok=u1token)
r1stats_post = self._get_current_stats("room", r1)
self.assertEqual(r1stats_post["total_events"] - r1stats_ante["total_events"], 1)
def test_send_state_event_nonoverwriting(self):
"""
When we send a non-overwriting state event, it increments total_events AND current_state_events
"""
u1 = self.register_user("u1", "pass")
u1token = self.login("u1", "pass")
r1 = self.helper.create_room_as(u1, tok=u1token)
self.helper.send_state(
r1, "cat.hissing", {"value": True}, tok=u1token, state_key="tabby"
)
r1stats_ante = self._get_current_stats("room", r1)
self.helper.send_state(
r1, "cat.hissing", {"value": False}, tok=u1token, state_key="moggy"
)
r1stats_post = self._get_current_stats("room", r1)
self.assertEqual(r1stats_post["total_events"] - r1stats_ante["total_events"], 1)
self.assertEqual(
r1stats_post["current_state_events"] - r1stats_ante["current_state_events"],
1,
)
def test_send_state_event_overwriting(self):
"""
When we send an overwriting state event, it increments total_events ONLY
"""
u1 = self.register_user("u1", "pass")
u1token = self.login("u1", "pass")
r1 = self.helper.create_room_as(u1, tok=u1token)
self.helper.send_state(
r1, "cat.hissing", {"value": True}, tok=u1token, state_key="tabby"
)
r1stats_ante = self._get_current_stats("room", r1)
self.helper.send_state(
r1, "cat.hissing", {"value": False}, tok=u1token, state_key="tabby"
)
r1stats_post = self._get_current_stats("room", r1)
self.assertEqual(r1stats_post["total_events"] - r1stats_ante["total_events"], 1)
self.assertEqual(
r1stats_post["current_state_events"] - r1stats_ante["current_state_events"],
0,
)
def test_join_first_time(self):
"""
When a user joins a room for the first time, total_events, current_state_events and
joined_members should increase by exactly 1.
"""
u1 = self.register_user("u1", "pass")
u1token = self.login("u1", "pass")
r1 = self.helper.create_room_as(u1, tok=u1token)
u2 = self.register_user("u2", "pass")
u2token = self.login("u2", "pass")
r1stats_ante = self._get_current_stats("room", r1)
self.helper.join(r1, u2, tok=u2token)
r1stats_post = self._get_current_stats("room", r1)
self.assertEqual(r1stats_post["total_events"] - r1stats_ante["total_events"], 1)
self.assertEqual(
r1stats_post["current_state_events"] - r1stats_ante["current_state_events"],
1,
)
self.assertEqual(
r1stats_post["joined_members"] - r1stats_ante["joined_members"], 1
)
def test_join_after_leave(self):
"""
When a user joins a room after being previously left, total_events and
joined_members should increase by exactly 1.
current_state_events should not increase.
left_members should decrease by exactly 1.
"""
u1 = self.register_user("u1", "pass")
u1token = self.login("u1", "pass")
r1 = self.helper.create_room_as(u1, tok=u1token)
u2 = self.register_user("u2", "pass")
u2token = self.login("u2", "pass")
self.helper.join(r1, u2, tok=u2token)
self.helper.leave(r1, u2, tok=u2token)
r1stats_ante = self._get_current_stats("room", r1)
self.helper.join(r1, u2, tok=u2token)
r1stats_post = self._get_current_stats("room", r1)
self.assertEqual(r1stats_post["total_events"] - r1stats_ante["total_events"], 1)
self.assertEqual(
r1stats_post["current_state_events"] - r1stats_ante["current_state_events"],
0,
)
self.assertEqual(
r1stats_post["joined_members"] - r1stats_ante["joined_members"], +1
)
self.assertEqual(
r1stats_post["left_members"] - r1stats_ante["left_members"], -1
)
def test_invited(self):
"""
When a user invites another user, current_state_events, total_events and
invited_members should increase by exactly 1.
"""
u1 = self.register_user("u1", "pass")
u1token = self.login("u1", "pass")
r1 = self.helper.create_room_as(u1, tok=u1token)
u2 = self.register_user("u2", "pass")
r1stats_ante = self._get_current_stats("room", r1)
self.helper.invite(r1, u1, u2, tok=u1token)
r1stats_post = self._get_current_stats("room", r1)
self.assertEqual(r1stats_post["total_events"] - r1stats_ante["total_events"], 1)
self.assertEqual(
r1stats_post["current_state_events"] - r1stats_ante["current_state_events"],
1,
)
self.assertEqual(
r1stats_post["invited_members"] - r1stats_ante["invited_members"], +1
)
def test_join_after_invite(self):
"""
When a user joins a room after being invited, total_events and
joined_members should increase by exactly 1.
current_state_events should not increase.
invited_members should decrease by exactly 1.
"""
u1 = self.register_user("u1", "pass")
u1token = self.login("u1", "pass")
r1 = self.helper.create_room_as(u1, tok=u1token)
u2 = self.register_user("u2", "pass")
u2token = self.login("u2", "pass")
self.helper.invite(r1, u1, u2, tok=u1token)
r1stats_ante = self._get_current_stats("room", r1)
self.helper.join(r1, u2, tok=u2token)
r1stats_post = self._get_current_stats("room", r1)
self.assertEqual(r1stats_post["total_events"] - r1stats_ante["total_events"], 1)
self.assertEqual(
r1stats_post["current_state_events"] - r1stats_ante["current_state_events"],
0,
)
self.assertEqual(
r1stats_post["joined_members"] - r1stats_ante["joined_members"], +1
)
self.assertEqual(
r1stats_post["invited_members"] - r1stats_ante["invited_members"], -1
)
def test_left(self):
"""
When a user leaves a room after joining, total_events and
left_members should increase by exactly 1.
current_state_events should not increase.
joined_members should decrease by exactly 1.
"""
u1 = self.register_user("u1", "pass")
u1token = self.login("u1", "pass")
r1 = self.helper.create_room_as(u1, tok=u1token)
u2 = self.register_user("u2", "pass")
u2token = self.login("u2", "pass")
self.helper.join(r1, u2, tok=u2token)
r1stats_ante = self._get_current_stats("room", r1)
self.helper.leave(r1, u2, tok=u2token)
r1stats_post = self._get_current_stats("room", r1)
self.assertEqual(r1stats_post["total_events"] - r1stats_ante["total_events"], 1)
self.assertEqual(
r1stats_post["current_state_events"] - r1stats_ante["current_state_events"],
0,
)
self.assertEqual(
r1stats_post["left_members"] - r1stats_ante["left_members"], +1
)
self.assertEqual(
r1stats_post["joined_members"] - r1stats_ante["joined_members"], -1
)
def test_banned(self):
"""
When a user is banned from a room after joining, total_events and
left_members should increase by exactly 1.
current_state_events should not increase.
banned_members should decrease by exactly 1.
"""
u1 = self.register_user("u1", "pass")
u1token = self.login("u1", "pass")
r1 = self.helper.create_room_as(u1, tok=u1token)
u2 = self.register_user("u2", "pass")
u2token = self.login("u2", "pass")
self.helper.join(r1, u2, tok=u2token)
r1stats_ante = self._get_current_stats("room", r1)
self.helper.change_membership(r1, u1, u2, "ban", tok=u1token)
r1stats_post = self._get_current_stats("room", r1)
self.assertEqual(r1stats_post["total_events"] - r1stats_ante["total_events"], 1)
self.assertEqual(
r1stats_post["current_state_events"] - r1stats_ante["current_state_events"],
0,
)
self.assertEqual(
r1stats_post["banned_members"] - r1stats_ante["banned_members"], +1
)
self.assertEqual(
r1stats_post["joined_members"] - r1stats_ante["joined_members"], -1
)
def test_initial_background_update(self):
"""
Test that statistics can be generated by the initial background update
handler.
This test also tests that stats rows are not created for new subjects
when stats are disabled. However, it may be desirable to change this
behaviour eventually to still keep current rows.
"""
self.hs.config.stats_enabled = False
u1 = self.register_user("u1", "pass")
u1token = self.login("u1", "pass")
r1 = self.helper.create_room_as(u1, tok=u1token)
# test that these subjects, which were created during a time of disabled
# stats, do not have stats.
self.assertIsNone(self._get_current_stats("room", r1))
self.assertIsNone(self._get_current_stats("user", u1))
self.hs.config.stats_enabled = True
self._perform_background_initial_update()
r1stats = self._get_current_stats("room", r1)
u1stats = self._get_current_stats("user", u1)
self.assertIsNotNone(r1stats["completed_delta_stream_id"])
self.assertIsNotNone(u1stats["completed_delta_stream_id"])
self.assertEqual(r1stats["joined_members"], 1)
self.assertEqual(
r1stats["total_events"], EXPT_NUM_STATE_EVTS_IN_FRESH_PUBLIC_ROOM
)
self.assertEqual(
r1stats["current_state_events"], EXPT_NUM_STATE_EVTS_IN_FRESH_PUBLIC_ROOM
)
self.assertEqual(u1stats["public_rooms"], 1)
def test_incomplete_stats(self):
"""
This tests that we track incomplete statistics.
We first test that incomplete stats are incrementally generated,
following the preparation of a background regen.
We then test that these incomplete rows are completed by the background
regen.
"""
u1 = self.register_user("u1", "pass")
u1token = self.login("u1", "pass")
u2 = self.register_user("u2", "pass")
u2token = self.login("u2", "pass")
u3 = self.register_user("u3", "pass")
r1 = self.helper.create_room_as(u1, tok=u1token, is_public=False)
# preparation stage of the initial background update
# Ugh, have to reset this flag
self.store._all_done = False
self.get_success(
self.store._simple_insert(
"background_updates",
{"update_name": "populate_stats_prepare", "progress_json": "{}"},
)
)
self.get_success(
self.store._simple_delete(
"room_stats_current", {"1": 1}, "test_delete_stats"
)
)
self.get_success(
self.store._simple_delete(
"user_stats_current", {"1": 1}, "test_delete_stats"
)
)
while not self.get_success(self.store.has_completed_background_updates()):
self.get_success(self.store.do_next_background_update(100), by=0.1)
r1stats_ante = self._get_current_stats("room", r1)
u1stats_ante = self._get_current_stats("user", u1)
u2stats_ante = self._get_current_stats("user", u2)
self.helper.invite(r1, u1, u2, tok=u1token)
self.helper.join(r1, u2, tok=u2token)
self.helper.invite(r1, u1, u3, tok=u1token)
self.helper.send(r1, "thou shalt yield", tok=u1token)
r1stats_post = self._get_current_stats("room", r1)
u1stats_post = self._get_current_stats("user", u1)
u2stats_post = self._get_current_stats("user", u2)
# now let the background update continue & finish
self.store._all_done = False
self.get_success(
self.store._simple_insert(
"background_updates",
{
"update_name": "populate_stats_process_rooms",
"progress_json": "{}",
"depends_on": "populate_stats_prepare",
},
)
)
self.get_success(
self.store._simple_insert(
"background_updates",
{
"update_name": "populate_stats_process_users",
"progress_json": "{}",
"depends_on": "populate_stats_process_rooms",
},
)
)
self.get_success(
self.store._simple_insert(
"background_updates",
{
"update_name": "populate_stats_cleanup",
"progress_json": "{}",
"depends_on": "populate_stats_process_users",
},
)
)
while not self.get_success(self.store.has_completed_background_updates()):
self.get_success(self.store.do_next_background_update(100), by=0.1)
r1stats_complete = self._get_current_stats("room", r1)
u1stats_complete = self._get_current_stats("user", u1)
u2stats_complete = self._get_current_stats("user", u2)
# now we make our assertions
# first check that none of the stats rows were complete before
# the background update occurred.
self.assertIsNone(r1stats_ante["completed_delta_stream_id"])
self.assertIsNone(r1stats_post["completed_delta_stream_id"])
self.assertIsNone(u1stats_ante["completed_delta_stream_id"])
self.assertIsNone(u1stats_post["completed_delta_stream_id"])
self.assertIsNone(u2stats_ante["completed_delta_stream_id"])
self.assertIsNone(u2stats_post["completed_delta_stream_id"])
# check that _ante rows are all skeletons without any deltas applied
self.assertEqual(r1stats_ante["joined_members"], 0)
self.assertEqual(r1stats_ante["invited_members"], 0)
self.assertEqual(r1stats_ante["total_events"], 0)
self.assertEqual(r1stats_ante["current_state_events"], 0)
self.assertEqual(u1stats_ante["public_rooms"], 0)
self.assertEqual(u1stats_ante["private_rooms"], 0)
self.assertEqual(u2stats_ante["public_rooms"], 0)
self.assertEqual(u2stats_ante["private_rooms"], 0)
# check that _post rows have the expected deltas applied
self.assertEqual(r1stats_post["joined_members"], 1)
self.assertEqual(r1stats_post["invited_members"], 1)
self.assertEqual(r1stats_post["total_events"], 4)
self.assertEqual(r1stats_post["current_state_events"], 2)
self.assertEqual(u1stats_post["public_rooms"], 0)
self.assertEqual(u1stats_post["private_rooms"], 0)
self.assertEqual(u2stats_post["public_rooms"], 0)
self.assertEqual(u2stats_post["private_rooms"], 1)
# check that _complete rows are complete and correct
self.assertEqual(r1stats_complete["joined_members"], 2)
self.assertEqual(r1stats_complete["invited_members"], 1)
self.assertEqual(
r1stats_complete["total_events"],
4 + EXPT_NUM_STATE_EVTS_IN_FRESH_PRIVATE_ROOM,
)
self.assertEqual(
r1stats_complete["current_state_events"],
2 + EXPT_NUM_STATE_EVTS_IN_FRESH_PRIVATE_ROOM,
)
self.assertEqual(u1stats_complete["public_rooms"], 0)
self.assertEqual(u1stats_complete["private_rooms"], 1)
self.assertEqual(u2stats_complete["public_rooms"], 0)
self.assertEqual(u2stats_complete["private_rooms"], 1)

View file

@ -128,8 +128,12 @@ class RestHelper(object):
return channel.json_body
def send_state(self, room_id, event_type, body, tok, expect_code=200):
path = "/_matrix/client/r0/rooms/%s/state/%s" % (room_id, event_type)
def send_state(self, room_id, event_type, body, tok, expect_code=200, state_key=""):
path = "/_matrix/client/r0/rooms/%s/state/%s/%s" % (
room_id,
event_type,
state_key,
)
if tok:
path = path + "?access_token=%s" % tok