From 42b12934e33c81b0684347d5c02e874bf3492674 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Timo=20K=C3=B6sters?= <timo@koesters.xyz>
Date: Mon, 13 Mar 2023 10:39:02 +0100
Subject: [PATCH] Don't crash when a room errors

---
 src/api/client_server/sync.rs | 1231 +++++++++++++++++----------------
 1 file changed, 630 insertions(+), 601 deletions(-)

diff --git a/src/api/client_server/sync.rs b/src/api/client_server/sync.rs
index 834438c9..5eb820cc 100644
--- a/src/api/client_server/sync.rs
+++ b/src/api/client_server/sync.rs
@@ -2,7 +2,14 @@ use crate::{service::rooms::timeline::PduCount, services, Error, Result, Ruma, R
 use ruma::{
     api::client::{
         filter::{FilterDefinition, LazyLoadOptions},
-        sync::sync_events::{self, DeviceLists, UnreadNotificationsCount},
+        sync::sync_events::{
+            self,
+            v3::{
+                Ephemeral, Filter, GlobalAccountData, InviteState, InvitedRoom, JoinedRoom,
+                LeftRoom, Presence, RoomAccountData, RoomSummary, Rooms, State, Timeline, ToDevice,
+            },
+            DeviceLists, UnreadNotificationsCount,
+        },
         uiaa::UiaaResponse,
     },
     events::{
@@ -10,7 +17,7 @@ use ruma::{
         RoomEventType, StateEventType,
     },
     serde::Raw,
-    OwnedDeviceId, OwnedUserId, RoomId, UserId,
+    DeviceId, OwnedDeviceId, OwnedUserId, RoomId, UserId,
 };
 use std::{
     collections::{hash_map::Entry, BTreeMap, HashMap, HashSet},
@@ -160,11 +167,6 @@ async fn sync_helper(
     body: sync_events::v3::Request,
     // bool = caching allowed
 ) -> Result<(sync_events::v3::Response, bool), Error> {
-    use sync_events::v3::{
-        Ephemeral, Filter, GlobalAccountData, InviteState, InvitedRoom, JoinedRoom, LeftRoom,
-        Presence, RoomAccountData, RoomSummary, Rooms, State, Timeline, ToDevice,
-    };
-
     // TODO: match body.set_presence {
     services().rooms.edus.presence.ping_presence(&sender_user)?;
 
@@ -192,6 +194,8 @@ async fn sync_helper(
         _ => (false, false),
     };
 
+    let full_state = body.full_state;
+
     let mut joined_rooms = BTreeMap::new();
     let since = body
         .since
@@ -220,605 +224,57 @@ async fn sync_helper(
         .collect::<Vec<_>>();
     for room_id in all_joined_rooms {
         let room_id = room_id?;
-
-        {
-            // Get and drop the lock to wait for remaining operations to finish
-            // This will make sure the we have all events until next_batch
-            let mutex_insert = Arc::clone(
-                services()
-                    .globals
-                    .roomid_mutex_insert
-                    .write()
-                    .unwrap()
-                    .entry(room_id.clone())
-                    .or_default(),
-            );
-            let insert_lock = mutex_insert.lock().unwrap();
-            drop(insert_lock);
-        }
-
-        let timeline_pdus;
-        let limited;
-        if services()
-            .rooms
-            .timeline
-            .last_timeline_count(&sender_user, &room_id)?
-            > sincecount
-        {
-            let mut non_timeline_pdus = services()
-                .rooms
-                .timeline
-                .pdus_until(&sender_user, &room_id, PduCount::max())?
-                .filter_map(|r| {
-                    // Filter out buggy events
-                    if r.is_err() {
-                        error!("Bad pdu in pdus_since: {:?}", r);
-                    }
-                    r.ok()
-                })
-                .take_while(|(pducount, _)| pducount > &sincecount);
-
-            // Take the last 10 events for the timeline
-            timeline_pdus = non_timeline_pdus
-                .by_ref()
-                .take(10)
-                .collect::<Vec<_>>()
-                .into_iter()
-                .rev()
-                .collect::<Vec<_>>();
-
-            // They /sync response doesn't always return all messages, so we say the output is
-            // limited unless there are events in non_timeline_pdus
-            limited = non_timeline_pdus.next().is_some();
-        } else {
-            timeline_pdus = Vec::new();
-            limited = false;
-        }
-
-        let send_notification_counts = !timeline_pdus.is_empty()
-            || services()
-                .rooms
-                .user
-                .last_notification_read(&sender_user, &room_id)?
-                > since;
-
-        let mut timeline_users = HashSet::new();
-        for (_, event) in &timeline_pdus {
-            timeline_users.insert(event.sender.as_str().to_owned());
-        }
-
-        services().rooms.lazy_loading.lazy_load_confirm_delivery(
+        if let Ok(joined_room) = load_joined_room(
             &sender_user,
             &sender_device,
             &room_id,
+            since,
             sincecount,
-        )?;
-
-        // Database queries:
-
-        let current_shortstatehash =
-            if let Some(s) = services().rooms.state.get_room_shortstatehash(&room_id)? {
-                s
-            } else {
-                error!("Room {} has no state", room_id);
-                continue;
-            };
-
-        let since_shortstatehash = services()
-            .rooms
-            .user
-            .get_token_shortstatehash(&room_id, since)?;
-
-        // Calculates joined_member_count, invited_member_count and heroes
-        let calculate_counts = || {
-            let joined_member_count = services()
-                .rooms
-                .state_cache
-                .room_joined_count(&room_id)?
-                .unwrap_or(0);
-            let invited_member_count = services()
-                .rooms
-                .state_cache
-                .room_invited_count(&room_id)?
-                .unwrap_or(0);
-
-            // Recalculate heroes (first 5 members)
-            let mut heroes = Vec::new();
-
-            if joined_member_count + invited_member_count <= 5 {
-                // Go through all PDUs and for each member event, check if the user is still joined or
-                // invited until we have 5 or we reach the end
-
-                for hero in services()
-                    .rooms
-                    .timeline
-                    .all_pdus(&sender_user, &room_id)?
-                    .filter_map(|pdu| pdu.ok()) // Ignore all broken pdus
-                    .filter(|(_, pdu)| pdu.kind == RoomEventType::RoomMember)
-                    .map(|(_, pdu)| {
-                        let content: RoomMemberEventContent =
-                            serde_json::from_str(pdu.content.get()).map_err(|_| {
-                                Error::bad_database("Invalid member event in database.")
-                            })?;
-
-                        if let Some(state_key) = &pdu.state_key {
-                            let user_id = UserId::parse(state_key.clone()).map_err(|_| {
-                                Error::bad_database("Invalid UserId in member PDU.")
-                            })?;
-
-                            // The membership was and still is invite or join
-                            if matches!(
-                                content.membership,
-                                MembershipState::Join | MembershipState::Invite
-                            ) && (services().rooms.state_cache.is_joined(&user_id, &room_id)?
-                                || services()
-                                    .rooms
-                                    .state_cache
-                                    .is_invited(&user_id, &room_id)?)
-                            {
-                                Ok::<_, Error>(Some(state_key.clone()))
-                            } else {
-                                Ok(None)
-                            }
-                        } else {
-                            Ok(None)
-                        }
-                    })
-                    // Filter out buggy users
-                    .filter_map(|u| u.ok())
-                    // Filter for possible heroes
-                    .flatten()
-                {
-                    if heroes.contains(&hero) || hero == sender_user.as_str() {
-                        continue;
-                    }
-
-                    heroes.push(hero);
-                }
-            }
-
-            Ok::<_, Error>((
-                Some(joined_member_count),
-                Some(invited_member_count),
-                heroes,
-            ))
-        };
-
-        let since_sender_member: Option<RoomMemberEventContent> = since_shortstatehash
-            .and_then(|shortstatehash| {
-                services()
-                    .rooms
-                    .state_accessor
-                    .state_get(
-                        shortstatehash,
-                        &StateEventType::RoomMember,
-                        sender_user.as_str(),
-                    )
-                    .transpose()
-            })
-            .transpose()?
-            .and_then(|pdu| {
-                serde_json::from_str(pdu.content.get())
-                    .map_err(|_| Error::bad_database("Invalid PDU in database."))
-                    .ok()
-            });
-
-        let joined_since_last_sync =
-            since_sender_member.map_or(true, |member| member.membership != MembershipState::Join);
-
-        let (
-            heroes,
-            joined_member_count,
-            invited_member_count,
-            joined_since_last_sync,
-            state_events,
-        ) = if since_shortstatehash.is_none() || joined_since_last_sync {
-            // Probably since = 0, we will do an initial sync
-
-            let (joined_member_count, invited_member_count, heroes) = calculate_counts()?;
-
-            let current_state_ids = services()
-                .rooms
-                .state_accessor
-                .state_full_ids(current_shortstatehash)
-                .await?;
-
-            let mut state_events = Vec::new();
-            let mut lazy_loaded = HashSet::new();
-
-            let mut i = 0;
-            for (shortstatekey, id) in current_state_ids {
-                let (event_type, state_key) = services()
-                    .rooms
-                    .short
-                    .get_statekey_from_short(shortstatekey)?;
-
-                if event_type != StateEventType::RoomMember {
-                    let pdu = match services().rooms.timeline.get_pdu(&id)? {
-                        Some(pdu) => pdu,
-                        None => {
-                            error!("Pdu in state not found: {}", id);
-                            continue;
-                        }
-                    };
-                    state_events.push(pdu);
-
-                    i += 1;
-                    if i % 100 == 0 {
-                        tokio::task::yield_now().await;
-                    }
-                } else if !lazy_load_enabled
-                    || body.full_state
-                    || timeline_users.contains(&state_key)
-                    // TODO: Delete the following line when this is resolved: https://github.com/vector-im/element-web/issues/22565
-                    || *sender_user == state_key
-                {
-                    let pdu = match services().rooms.timeline.get_pdu(&id)? {
-                        Some(pdu) => pdu,
-                        None => {
-                            error!("Pdu in state not found: {}", id);
-                            continue;
-                        }
-                    };
-
-                    // This check is in case a bad user ID made it into the database
-                    if let Ok(uid) = UserId::parse(&state_key) {
-                        lazy_loaded.insert(uid);
-                    }
-                    state_events.push(pdu);
-
-                    i += 1;
-                    if i % 100 == 0 {
-                        tokio::task::yield_now().await;
-                    }
-                }
-            }
-
-            // Reset lazy loading because this is an initial sync
-            services().rooms.lazy_loading.lazy_load_reset(
-                &sender_user,
-                &sender_device,
-                &room_id,
-            )?;
-
-            // The state_events above should contain all timeline_users, let's mark them as lazy
-            // loaded.
-            services().rooms.lazy_loading.lazy_load_mark_sent(
-                &sender_user,
-                &sender_device,
-                &room_id,
-                lazy_loaded,
-                next_batchcount,
-            );
-
-            (
-                heroes,
-                joined_member_count,
-                invited_member_count,
-                true,
-                state_events,
-            )
-        } else if timeline_pdus.is_empty() && since_shortstatehash == Some(current_shortstatehash) {
-            // No state changes
-            (Vec::new(), None, None, false, Vec::new())
-        } else {
-            // Incremental /sync
-            let since_shortstatehash = since_shortstatehash.unwrap();
-
-            let mut state_events = Vec::new();
-            let mut lazy_loaded = HashSet::new();
-
-            if since_shortstatehash != current_shortstatehash {
-                let current_state_ids = services()
-                    .rooms
-                    .state_accessor
-                    .state_full_ids(current_shortstatehash)
-                    .await?;
-                let since_state_ids = services()
-                    .rooms
-                    .state_accessor
-                    .state_full_ids(since_shortstatehash)
-                    .await?;
-
-                for (key, id) in current_state_ids {
-                    if body.full_state || since_state_ids.get(&key) != Some(&id) {
-                        let pdu = match services().rooms.timeline.get_pdu(&id)? {
-                            Some(pdu) => pdu,
-                            None => {
-                                error!("Pdu in state not found: {}", id);
-                                continue;
-                            }
-                        };
-
-                        if pdu.kind == RoomEventType::RoomMember {
-                            match UserId::parse(
-                                pdu.state_key
-                                    .as_ref()
-                                    .expect("State event has state key")
-                                    .clone(),
-                            ) {
-                                Ok(state_key_userid) => {
-                                    lazy_loaded.insert(state_key_userid);
-                                }
-                                Err(e) => error!("Invalid state key for member event: {}", e),
-                            }
-                        }
-
-                        state_events.push(pdu);
-                        tokio::task::yield_now().await;
-                    }
-                }
-            }
-
-            for (_, event) in &timeline_pdus {
-                if lazy_loaded.contains(&event.sender) {
-                    continue;
-                }
-
-                if !services().rooms.lazy_loading.lazy_load_was_sent_before(
-                    &sender_user,
-                    &sender_device,
-                    &room_id,
-                    &event.sender,
-                )? || lazy_load_send_redundant
-                {
-                    if let Some(member_event) = services().rooms.state_accessor.room_state_get(
-                        &room_id,
-                        &StateEventType::RoomMember,
-                        event.sender.as_str(),
-                    )? {
-                        lazy_loaded.insert(event.sender.clone());
-                        state_events.push(member_event);
-                    }
-                }
-            }
-
-            services().rooms.lazy_loading.lazy_load_mark_sent(
-                &sender_user,
-                &sender_device,
-                &room_id,
-                lazy_loaded,
-                next_batchcount,
-            );
-
-            let encrypted_room = services()
-                .rooms
-                .state_accessor
-                .state_get(current_shortstatehash, &StateEventType::RoomEncryption, "")?
-                .is_some();
-
-            let since_encryption = services().rooms.state_accessor.state_get(
-                since_shortstatehash,
-                &StateEventType::RoomEncryption,
-                "",
-            )?;
-
-            // Calculations:
-            let new_encrypted_room = encrypted_room && since_encryption.is_none();
-
-            let send_member_count = state_events
-                .iter()
-                .any(|event| event.kind == RoomEventType::RoomMember);
-
-            if encrypted_room {
-                for state_event in &state_events {
-                    if state_event.kind != RoomEventType::RoomMember {
-                        continue;
-                    }
-
-                    if let Some(state_key) = &state_event.state_key {
-                        let user_id = UserId::parse(state_key.clone())
-                            .map_err(|_| Error::bad_database("Invalid UserId in member PDU."))?;
-
-                        if user_id == sender_user {
-                            continue;
-                        }
-
-                        let new_membership = serde_json::from_str::<RoomMemberEventContent>(
-                            state_event.content.get(),
-                        )
-                        .map_err(|_| Error::bad_database("Invalid PDU in database."))?
-                        .membership;
-
-                        match new_membership {
-                            MembershipState::Join => {
-                                // A new user joined an encrypted room
-                                if !share_encrypted_room(&sender_user, &user_id, &room_id)? {
-                                    device_list_updates.insert(user_id);
-                                }
-                            }
-                            MembershipState::Leave => {
-                                // Write down users that have left encrypted rooms we are in
-                                left_encrypted_users.insert(user_id);
-                            }
-                            _ => {}
-                        }
-                    }
-                }
-            }
-
-            if joined_since_last_sync && encrypted_room || new_encrypted_room {
-                // If the user is in a new encrypted room, give them all joined users
-                device_list_updates.extend(
-                    services()
-                        .rooms
-                        .state_cache
-                        .room_members(&room_id)
-                        .flatten()
-                        .filter(|user_id| {
-                            // Don't send key updates from the sender to the sender
-                            &sender_user != user_id
-                        })
-                        .filter(|user_id| {
-                            // Only send keys if the sender doesn't share an encrypted room with the target already
-                            !share_encrypted_room(&sender_user, user_id, &room_id).unwrap_or(false)
-                        }),
-                );
-            }
-
-            let (joined_member_count, invited_member_count, heroes) = if send_member_count {
-                calculate_counts()?
-            } else {
-                (None, None, Vec::new())
-            };
-
-            (
-                heroes,
-                joined_member_count,
-                invited_member_count,
-                joined_since_last_sync,
-                state_events,
-            )
-        };
-
-        // Look for device list updates in this room
-        device_list_updates.extend(
-            services()
-                .users
-                .keys_changed(room_id.as_ref(), since, None)
-                .filter_map(|r| r.ok()),
-        );
-
-        let notification_count = if send_notification_counts {
-            Some(
-                services()
-                    .rooms
-                    .user
-                    .notification_count(&sender_user, &room_id)?
-                    .try_into()
-                    .expect("notification count can't go that high"),
-            )
-        } else {
-            None
-        };
-
-        let highlight_count = if send_notification_counts {
-            Some(
-                services()
-                    .rooms
-                    .user
-                    .highlight_count(&sender_user, &room_id)?
-                    .try_into()
-                    .expect("highlight count can't go that high"),
-            )
-        } else {
-            None
-        };
-
-        let prev_batch = timeline_pdus
-            .first()
-            .map_or(Ok::<_, Error>(None), |(pdu_count, _)| {
-                Ok(Some(match pdu_count {
-                    PduCount::Backfilled(_) => {
-                        error!("timeline in backfill state?!");
-                        "0".to_owned()
-                    }
-                    PduCount::Normal(c) => c.to_string(),
-                }))
-            })?;
-
-        let room_events: Vec<_> = timeline_pdus
-            .iter()
-            .map(|(_, pdu)| pdu.to_sync_room_event())
-            .collect();
-
-        let mut edus: Vec<_> = services()
-            .rooms
-            .edus
-            .read_receipt
-            .readreceipts_since(&room_id, since)
-            .filter_map(|r| r.ok()) // Filter out buggy events
-            .map(|(_, _, v)| v)
-            .collect();
-
-        if services().rooms.edus.typing.last_typing_update(&room_id)? > since {
-            edus.push(
-                serde_json::from_str(
-                    &serde_json::to_string(&services().rooms.edus.typing.typings_all(&room_id)?)
-                        .expect("event is valid, we just created it"),
-                )
-                .expect("event is valid, we just created it"),
-            );
-        }
-
-        // Save the state after this sync so we can send the correct state diff next sync
-        services().rooms.user.associate_token_shortstatehash(
-            &room_id,
             next_batch,
-            current_shortstatehash,
-        )?;
-
-        let joined_room = JoinedRoom {
-            account_data: RoomAccountData {
-                events: services()
-                    .account_data
-                    .changes_since(Some(&room_id), &sender_user, since)?
-                    .into_iter()
-                    .filter_map(|(_, v)| {
-                        serde_json::from_str(v.json().get())
-                            .map_err(|_| Error::bad_database("Invalid account event in database."))
-                            .ok()
-                    })
-                    .collect(),
-            },
-            summary: RoomSummary {
-                heroes,
-                joined_member_count: joined_member_count.map(|n| (n as u32).into()),
-                invited_member_count: invited_member_count.map(|n| (n as u32).into()),
-            },
-            unread_notifications: UnreadNotificationsCount {
-                highlight_count,
-                notification_count,
-            },
-            timeline: Timeline {
-                limited: limited || joined_since_last_sync,
-                prev_batch,
-                events: room_events,
-            },
-            state: State {
-                events: state_events
-                    .iter()
-                    .map(|pdu| pdu.to_sync_state_event())
-                    .collect(),
-            },
-            ephemeral: Ephemeral { events: edus },
-            unread_thread_notifications: BTreeMap::new(),
-        };
-
-        if !joined_room.is_empty() {
-            joined_rooms.insert(room_id.clone(), joined_room);
-        }
-
-        // Take presence updates from this room
-        for (user_id, presence) in services()
-            .rooms
-            .edus
-            .presence
-            .presence_since(&room_id, since)?
+            next_batchcount,
+            lazy_load_enabled,
+            lazy_load_send_redundant,
+            full_state,
+            &mut device_list_updates,
+            &mut left_encrypted_users,
+        )
+        .await
         {
-            match presence_updates.entry(user_id) {
-                Entry::Vacant(v) => {
-                    v.insert(presence);
-                }
-                Entry::Occupied(mut o) => {
-                    let p = o.get_mut();
+            if !joined_room.is_empty() {
+                joined_rooms.insert(room_id.clone(), joined_room);
+            }
 
-                    // Update existing presence event with more info
-                    p.content.presence = presence.content.presence;
-                    if let Some(status_msg) = presence.content.status_msg {
-                        p.content.status_msg = Some(status_msg);
+            // Take presence updates from this room
+            for (user_id, presence) in services()
+                .rooms
+                .edus
+                .presence
+                .presence_since(&room_id, since)?
+            {
+                match presence_updates.entry(user_id) {
+                    Entry::Vacant(v) => {
+                        v.insert(presence);
                     }
-                    if let Some(last_active_ago) = presence.content.last_active_ago {
-                        p.content.last_active_ago = Some(last_active_ago);
-                    }
-                    if let Some(displayname) = presence.content.displayname {
-                        p.content.displayname = Some(displayname);
-                    }
-                    if let Some(avatar_url) = presence.content.avatar_url {
-                        p.content.avatar_url = Some(avatar_url);
-                    }
-                    if let Some(currently_active) = presence.content.currently_active {
-                        p.content.currently_active = Some(currently_active);
+                    Entry::Occupied(mut o) => {
+                        let p = o.get_mut();
+
+                        // Update existing presence event with more info
+                        p.content.presence = presence.content.presence;
+                        if let Some(status_msg) = presence.content.status_msg {
+                            p.content.status_msg = Some(status_msg);
+                        }
+                        if let Some(last_active_ago) = presence.content.last_active_ago {
+                            p.content.last_active_ago = Some(last_active_ago);
+                        }
+                        if let Some(displayname) = presence.content.displayname {
+                            p.content.displayname = Some(displayname);
+                        }
+                        if let Some(avatar_url) = presence.content.avatar_url {
+                            p.content.avatar_url = Some(avatar_url);
+                        }
+                        if let Some(currently_active) = presence.content.currently_active {
+                            p.content.currently_active = Some(currently_active);
+                        }
                     }
                 }
             }
@@ -915,13 +371,13 @@ async fn sync_helper(
 
         let mut i = 0;
         for (key, id) in left_state_ids {
-            if body.full_state || since_state_ids.get(&key) != Some(&id) {
+            if full_state || since_state_ids.get(&key) != Some(&id) {
                 let (event_type, state_key) =
                     services().rooms.short.get_statekey_from_short(key)?;
 
                 if !lazy_load_enabled
                     || event_type != StateEventType::RoomMember
-                    || body.full_state
+                    || full_state
                     // TODO: Delete the following line when this is resolved: https://github.com/vector-im/element-web/issues/22565
                     || *sender_user == state_key
                 {
@@ -1075,7 +531,7 @@ async fn sync_helper(
     };
 
     // TODO: Retry the endpoint instead of returning (waiting for #118)
-    if !body.full_state
+    if !full_state
         && response.rooms.is_empty()
         && response.presence.is_empty()
         && response.account_data.is_empty()
@@ -1095,6 +551,579 @@ async fn sync_helper(
     }
 }
 
+async fn load_joined_room(
+    sender_user: &UserId,
+    sender_device: &DeviceId,
+    room_id: &RoomId,
+    since: u64,
+    sincecount: PduCount,
+    next_batch: u64,
+    next_batchcount: PduCount,
+    lazy_load_enabled: bool,
+    lazy_load_send_redundant: bool,
+    full_state: bool,
+    device_list_updates: &mut HashSet<OwnedUserId>,
+    left_encrypted_users: &mut HashSet<OwnedUserId>,
+) -> Result<JoinedRoom> {
+    {
+        // Get and drop the lock to wait for remaining operations to finish
+        // This will make sure the we have all events until next_batch
+        let mutex_insert = Arc::clone(
+            services()
+                .globals
+                .roomid_mutex_insert
+                .write()
+                .unwrap()
+                .entry(room_id.to_owned())
+                .or_default(),
+        );
+        let insert_lock = mutex_insert.lock().unwrap();
+        drop(insert_lock);
+    }
+
+    let timeline_pdus;
+    let limited;
+    if services()
+        .rooms
+        .timeline
+        .last_timeline_count(&sender_user, &room_id)?
+        > sincecount
+    {
+        let mut non_timeline_pdus = services()
+            .rooms
+            .timeline
+            .pdus_until(&sender_user, &room_id, PduCount::max())?
+            .filter_map(|r| {
+                // Filter out buggy events
+                if r.is_err() {
+                    error!("Bad pdu in pdus_since: {:?}", r);
+                }
+                r.ok()
+            })
+            .take_while(|(pducount, _)| pducount > &sincecount);
+
+        // Take the last 10 events for the timeline
+        timeline_pdus = non_timeline_pdus
+            .by_ref()
+            .take(10)
+            .collect::<Vec<_>>()
+            .into_iter()
+            .rev()
+            .collect::<Vec<_>>();
+
+        // They /sync response doesn't always return all messages, so we say the output is
+        // limited unless there are events in non_timeline_pdus
+        limited = non_timeline_pdus.next().is_some();
+    } else {
+        timeline_pdus = Vec::new();
+        limited = false;
+    }
+
+    let send_notification_counts = !timeline_pdus.is_empty()
+        || services()
+            .rooms
+            .user
+            .last_notification_read(&sender_user, &room_id)?
+            > since;
+
+    let mut timeline_users = HashSet::new();
+    for (_, event) in &timeline_pdus {
+        timeline_users.insert(event.sender.as_str().to_owned());
+    }
+
+    services().rooms.lazy_loading.lazy_load_confirm_delivery(
+        &sender_user,
+        &sender_device,
+        &room_id,
+        sincecount,
+    )?;
+
+    // Database queries:
+
+    let current_shortstatehash =
+        if let Some(s) = services().rooms.state.get_room_shortstatehash(&room_id)? {
+            s
+        } else {
+            error!("Room {} has no state", room_id);
+            return Err(Error::BadDatabase("Room has no state"));
+        };
+
+    let since_shortstatehash = services()
+        .rooms
+        .user
+        .get_token_shortstatehash(&room_id, since)?;
+
+    // Calculates joined_member_count, invited_member_count and heroes
+    let calculate_counts = || {
+        let joined_member_count = services()
+            .rooms
+            .state_cache
+            .room_joined_count(&room_id)?
+            .unwrap_or(0);
+        let invited_member_count = services()
+            .rooms
+            .state_cache
+            .room_invited_count(&room_id)?
+            .unwrap_or(0);
+
+        // Recalculate heroes (first 5 members)
+        let mut heroes = Vec::new();
+
+        if joined_member_count + invited_member_count <= 5 {
+            // Go through all PDUs and for each member event, check if the user is still joined or
+            // invited until we have 5 or we reach the end
+
+            for hero in services()
+                .rooms
+                .timeline
+                .all_pdus(&sender_user, &room_id)?
+                .filter_map(|pdu| pdu.ok()) // Ignore all broken pdus
+                .filter(|(_, pdu)| pdu.kind == RoomEventType::RoomMember)
+                .map(|(_, pdu)| {
+                    let content: RoomMemberEventContent = serde_json::from_str(pdu.content.get())
+                        .map_err(|_| {
+                        Error::bad_database("Invalid member event in database.")
+                    })?;
+
+                    if let Some(state_key) = &pdu.state_key {
+                        let user_id = UserId::parse(state_key.clone())
+                            .map_err(|_| Error::bad_database("Invalid UserId in member PDU."))?;
+
+                        // The membership was and still is invite or join
+                        if matches!(
+                            content.membership,
+                            MembershipState::Join | MembershipState::Invite
+                        ) && (services().rooms.state_cache.is_joined(&user_id, &room_id)?
+                            || services()
+                                .rooms
+                                .state_cache
+                                .is_invited(&user_id, &room_id)?)
+                        {
+                            Ok::<_, Error>(Some(state_key.clone()))
+                        } else {
+                            Ok(None)
+                        }
+                    } else {
+                        Ok(None)
+                    }
+                })
+                // Filter out buggy users
+                .filter_map(|u| u.ok())
+                // Filter for possible heroes
+                .flatten()
+            {
+                if heroes.contains(&hero) || hero == sender_user.as_str() {
+                    continue;
+                }
+
+                heroes.push(hero);
+            }
+        }
+
+        Ok::<_, Error>((
+            Some(joined_member_count),
+            Some(invited_member_count),
+            heroes,
+        ))
+    };
+
+    let since_sender_member: Option<RoomMemberEventContent> = since_shortstatehash
+        .and_then(|shortstatehash| {
+            services()
+                .rooms
+                .state_accessor
+                .state_get(
+                    shortstatehash,
+                    &StateEventType::RoomMember,
+                    sender_user.as_str(),
+                )
+                .transpose()
+        })
+        .transpose()?
+        .and_then(|pdu| {
+            serde_json::from_str(pdu.content.get())
+                .map_err(|_| Error::bad_database("Invalid PDU in database."))
+                .ok()
+        });
+
+    let joined_since_last_sync =
+        since_sender_member.map_or(true, |member| member.membership != MembershipState::Join);
+
+    let (heroes, joined_member_count, invited_member_count, joined_since_last_sync, state_events) =
+        if since_shortstatehash.is_none() || joined_since_last_sync {
+            // Probably since = 0, we will do an initial sync
+
+            let (joined_member_count, invited_member_count, heroes) = calculate_counts()?;
+
+            let current_state_ids = services()
+                .rooms
+                .state_accessor
+                .state_full_ids(current_shortstatehash)
+                .await?;
+
+            let mut state_events = Vec::new();
+            let mut lazy_loaded = HashSet::new();
+
+            let mut i = 0;
+            for (shortstatekey, id) in current_state_ids {
+                let (event_type, state_key) = services()
+                    .rooms
+                    .short
+                    .get_statekey_from_short(shortstatekey)?;
+
+                if event_type != StateEventType::RoomMember {
+                    let pdu = match services().rooms.timeline.get_pdu(&id)? {
+                        Some(pdu) => pdu,
+                        None => {
+                            error!("Pdu in state not found: {}", id);
+                            continue;
+                        }
+                    };
+                    state_events.push(pdu);
+
+                    i += 1;
+                    if i % 100 == 0 {
+                        tokio::task::yield_now().await;
+                    }
+                } else if !lazy_load_enabled
+                || full_state
+                || timeline_users.contains(&state_key)
+                // TODO: Delete the following line when this is resolved: https://github.com/vector-im/element-web/issues/22565
+                || *sender_user == state_key
+                {
+                    let pdu = match services().rooms.timeline.get_pdu(&id)? {
+                        Some(pdu) => pdu,
+                        None => {
+                            error!("Pdu in state not found: {}", id);
+                            continue;
+                        }
+                    };
+
+                    // This check is in case a bad user ID made it into the database
+                    if let Ok(uid) = UserId::parse(&state_key) {
+                        lazy_loaded.insert(uid);
+                    }
+                    state_events.push(pdu);
+
+                    i += 1;
+                    if i % 100 == 0 {
+                        tokio::task::yield_now().await;
+                    }
+                }
+            }
+
+            // Reset lazy loading because this is an initial sync
+            services().rooms.lazy_loading.lazy_load_reset(
+                &sender_user,
+                &sender_device,
+                &room_id,
+            )?;
+
+            // The state_events above should contain all timeline_users, let's mark them as lazy
+            // loaded.
+            services().rooms.lazy_loading.lazy_load_mark_sent(
+                &sender_user,
+                &sender_device,
+                &room_id,
+                lazy_loaded,
+                next_batchcount,
+            );
+
+            (
+                heroes,
+                joined_member_count,
+                invited_member_count,
+                true,
+                state_events,
+            )
+        } else if timeline_pdus.is_empty() && since_shortstatehash == Some(current_shortstatehash) {
+            // No state changes
+            (Vec::new(), None, None, false, Vec::new())
+        } else {
+            // Incremental /sync
+            let since_shortstatehash = since_shortstatehash.unwrap();
+
+            let mut state_events = Vec::new();
+            let mut lazy_loaded = HashSet::new();
+
+            if since_shortstatehash != current_shortstatehash {
+                let current_state_ids = services()
+                    .rooms
+                    .state_accessor
+                    .state_full_ids(current_shortstatehash)
+                    .await?;
+                let since_state_ids = services()
+                    .rooms
+                    .state_accessor
+                    .state_full_ids(since_shortstatehash)
+                    .await?;
+
+                for (key, id) in current_state_ids {
+                    if full_state || since_state_ids.get(&key) != Some(&id) {
+                        let pdu = match services().rooms.timeline.get_pdu(&id)? {
+                            Some(pdu) => pdu,
+                            None => {
+                                error!("Pdu in state not found: {}", id);
+                                continue;
+                            }
+                        };
+
+                        if pdu.kind == RoomEventType::RoomMember {
+                            match UserId::parse(
+                                pdu.state_key
+                                    .as_ref()
+                                    .expect("State event has state key")
+                                    .clone(),
+                            ) {
+                                Ok(state_key_userid) => {
+                                    lazy_loaded.insert(state_key_userid);
+                                }
+                                Err(e) => error!("Invalid state key for member event: {}", e),
+                            }
+                        }
+
+                        state_events.push(pdu);
+                        tokio::task::yield_now().await;
+                    }
+                }
+            }
+
+            for (_, event) in &timeline_pdus {
+                if lazy_loaded.contains(&event.sender) {
+                    continue;
+                }
+
+                if !services().rooms.lazy_loading.lazy_load_was_sent_before(
+                    &sender_user,
+                    &sender_device,
+                    &room_id,
+                    &event.sender,
+                )? || lazy_load_send_redundant
+                {
+                    if let Some(member_event) = services().rooms.state_accessor.room_state_get(
+                        &room_id,
+                        &StateEventType::RoomMember,
+                        event.sender.as_str(),
+                    )? {
+                        lazy_loaded.insert(event.sender.clone());
+                        state_events.push(member_event);
+                    }
+                }
+            }
+
+            services().rooms.lazy_loading.lazy_load_mark_sent(
+                &sender_user,
+                &sender_device,
+                &room_id,
+                lazy_loaded,
+                next_batchcount,
+            );
+
+            let encrypted_room = services()
+                .rooms
+                .state_accessor
+                .state_get(current_shortstatehash, &StateEventType::RoomEncryption, "")?
+                .is_some();
+
+            let since_encryption = services().rooms.state_accessor.state_get(
+                since_shortstatehash,
+                &StateEventType::RoomEncryption,
+                "",
+            )?;
+
+            // Calculations:
+            let new_encrypted_room = encrypted_room && since_encryption.is_none();
+
+            let send_member_count = state_events
+                .iter()
+                .any(|event| event.kind == RoomEventType::RoomMember);
+
+            if encrypted_room {
+                for state_event in &state_events {
+                    if state_event.kind != RoomEventType::RoomMember {
+                        continue;
+                    }
+
+                    if let Some(state_key) = &state_event.state_key {
+                        let user_id = UserId::parse(state_key.clone())
+                            .map_err(|_| Error::bad_database("Invalid UserId in member PDU."))?;
+
+                        if user_id == sender_user {
+                            continue;
+                        }
+
+                        let new_membership = serde_json::from_str::<RoomMemberEventContent>(
+                            state_event.content.get(),
+                        )
+                        .map_err(|_| Error::bad_database("Invalid PDU in database."))?
+                        .membership;
+
+                        match new_membership {
+                            MembershipState::Join => {
+                                // A new user joined an encrypted room
+                                if !share_encrypted_room(&sender_user, &user_id, &room_id)? {
+                                    device_list_updates.insert(user_id);
+                                }
+                            }
+                            MembershipState::Leave => {
+                                // Write down users that have left encrypted rooms we are in
+                                left_encrypted_users.insert(user_id);
+                            }
+                            _ => {}
+                        }
+                    }
+                }
+            }
+
+            if joined_since_last_sync && encrypted_room || new_encrypted_room {
+                // If the user is in a new encrypted room, give them all joined users
+                device_list_updates.extend(
+                    services()
+                        .rooms
+                        .state_cache
+                        .room_members(&room_id)
+                        .flatten()
+                        .filter(|user_id| {
+                            // Don't send key updates from the sender to the sender
+                            &sender_user != user_id
+                        })
+                        .filter(|user_id| {
+                            // Only send keys if the sender doesn't share an encrypted room with the target already
+                            !share_encrypted_room(&sender_user, user_id, &room_id).unwrap_or(false)
+                        }),
+                );
+            }
+
+            let (joined_member_count, invited_member_count, heroes) = if send_member_count {
+                calculate_counts()?
+            } else {
+                (None, None, Vec::new())
+            };
+
+            (
+                heroes,
+                joined_member_count,
+                invited_member_count,
+                joined_since_last_sync,
+                state_events,
+            )
+        };
+
+    // Look for device list updates in this room
+    device_list_updates.extend(
+        services()
+            .users
+            .keys_changed(room_id.as_ref(), since, None)
+            .filter_map(|r| r.ok()),
+    );
+
+    let notification_count = if send_notification_counts {
+        Some(
+            services()
+                .rooms
+                .user
+                .notification_count(&sender_user, &room_id)?
+                .try_into()
+                .expect("notification count can't go that high"),
+        )
+    } else {
+        None
+    };
+
+    let highlight_count = if send_notification_counts {
+        Some(
+            services()
+                .rooms
+                .user
+                .highlight_count(&sender_user, &room_id)?
+                .try_into()
+                .expect("highlight count can't go that high"),
+        )
+    } else {
+        None
+    };
+
+    let prev_batch = timeline_pdus
+        .first()
+        .map_or(Ok::<_, Error>(None), |(pdu_count, _)| {
+            Ok(Some(match pdu_count {
+                PduCount::Backfilled(_) => {
+                    error!("timeline in backfill state?!");
+                    "0".to_owned()
+                }
+                PduCount::Normal(c) => c.to_string(),
+            }))
+        })?;
+
+    let room_events: Vec<_> = timeline_pdus
+        .iter()
+        .map(|(_, pdu)| pdu.to_sync_room_event())
+        .collect();
+
+    let mut edus: Vec<_> = services()
+        .rooms
+        .edus
+        .read_receipt
+        .readreceipts_since(&room_id, since)
+        .filter_map(|r| r.ok()) // Filter out buggy events
+        .map(|(_, _, v)| v)
+        .collect();
+
+    if services().rooms.edus.typing.last_typing_update(&room_id)? > since {
+        edus.push(
+            serde_json::from_str(
+                &serde_json::to_string(&services().rooms.edus.typing.typings_all(&room_id)?)
+                    .expect("event is valid, we just created it"),
+            )
+            .expect("event is valid, we just created it"),
+        );
+    }
+
+    // Save the state after this sync so we can send the correct state diff next sync
+    services().rooms.user.associate_token_shortstatehash(
+        &room_id,
+        next_batch,
+        current_shortstatehash,
+    )?;
+
+    Ok(JoinedRoom {
+        account_data: RoomAccountData {
+            events: services()
+                .account_data
+                .changes_since(Some(&room_id), &sender_user, since)?
+                .into_iter()
+                .filter_map(|(_, v)| {
+                    serde_json::from_str(v.json().get())
+                        .map_err(|_| Error::bad_database("Invalid account event in database."))
+                        .ok()
+                })
+                .collect(),
+        },
+        summary: RoomSummary {
+            heroes,
+            joined_member_count: joined_member_count.map(|n| (n as u32).into()),
+            invited_member_count: invited_member_count.map(|n| (n as u32).into()),
+        },
+        unread_notifications: UnreadNotificationsCount {
+            highlight_count,
+            notification_count,
+        },
+        timeline: Timeline {
+            limited: limited || joined_since_last_sync,
+            prev_batch,
+            events: room_events,
+        },
+        state: State {
+            events: state_events
+                .iter()
+                .map(|pdu| pdu.to_sync_state_event())
+                .collect(),
+        },
+        ephemeral: Ephemeral { events: edus },
+        unread_thread_notifications: BTreeMap::new(),
+    })
+}
+
 fn share_encrypted_room(
     sender_user: &UserId,
     user_id: &UserId,