1
0
Fork 0
mirror of https://gitlab.com/famedly/conduit.git synced 2025-01-19 06:22:09 +01:00

fix: don't retry soft failed events

This commit is contained in:
Timo Kösters 2021-08-28 11:39:33 +02:00
parent 9bff276fa9
commit afca61fe7c
No known key found for this signature in database
GPG key ID: 24DA7517711A2BA4
4 changed files with 124 additions and 71 deletions

View file

@ -256,8 +256,8 @@ async fn sync_helper(
// Calculates joined_member_count, invited_member_count and heroes // Calculates joined_member_count, invited_member_count and heroes
let calculate_counts = || { let calculate_counts = || {
let joined_member_count = db.rooms.room_members(&room_id).count(); let joined_member_count = db.rooms.room_joined_count(&room_id)?.unwrap_or(0);
let invited_member_count = db.rooms.room_members_invited(&room_id).count(); let invited_member_count = db.rooms.room_invited_count(&room_id)?.unwrap_or(0);
// Recalculate heroes (first 5 members) // Recalculate heroes (first 5 members)
let mut heroes = Vec::new(); let mut heroes = Vec::new();
@ -407,60 +407,35 @@ async fn sync_helper(
}); });
if encrypted_room { if encrypted_room {
for (user_id, current_member) in db for state_event in &state_events {
.rooms if state_event.kind != EventType::RoomMember {
.room_members(&room_id) continue;
.filter_map(|r| r.ok()) }
.filter_map(|user_id| {
db.rooms if let Some(state_key) = &state_event.state_key {
.state_get( let user_id = UserId::try_from(state_key.clone())
current_shortstatehash, .map_err(|_| Error::bad_database("Invalid UserId in member PDU."))?;
&EventType::RoomMember,
user_id.as_str(), if user_id == sender_user {
) continue;
.ok() }
.flatten()
.map(|current_member| (user_id, current_member)) let new_membership = serde_json::from_value::<
})
{
let current_membership = serde_json::from_value::<
Raw<ruma::events::room::member::MemberEventContent>, Raw<ruma::events::room::member::MemberEventContent>,
>(current_member.content.clone()) >(state_event.content.clone())
.expect("Raw::from_value always works") .expect("Raw::from_value always works")
.deserialize() .deserialize()
.map_err(|_| Error::bad_database("Invalid PDU in database."))? .map_err(|_| Error::bad_database("Invalid PDU in database."))?
.membership; .membership;
let since_membership = db match new_membership {
.rooms MembershipState::Join => {
.state_get(
since_shortstatehash,
&EventType::RoomMember,
user_id.as_str(),
)?
.and_then(|since_member| {
serde_json::from_value::<
Raw<ruma::events::room::member::MemberEventContent>,
>(since_member.content.clone())
.expect("Raw::from_value always works")
.deserialize()
.map_err(|_| Error::bad_database("Invalid PDU in database."))
.ok()
})
.map_or(MembershipState::Leave, |member| member.membership);
let user_id = UserId::try_from(user_id.clone())
.map_err(|_| Error::bad_database("Invalid UserId in member PDU."))?;
match (since_membership, current_membership) {
(MembershipState::Leave, MembershipState::Join) => {
// A new user joined an encrypted room // A new user joined an encrypted room
if !share_encrypted_room(&db, &sender_user, &user_id, &room_id)? { if !share_encrypted_room(&db, &sender_user, &user_id, &room_id)? {
device_list_updates.insert(user_id); device_list_updates.insert(user_id);
} }
} }
// TODO: Remove, this should never happen here, right? MembershipState::Leave => {
(MembershipState::Join, MembershipState::Leave) => {
// Write down users that have left encrypted rooms we are in // Write down users that have left encrypted rooms we are in
left_encrypted_users.insert(user_id); left_encrypted_users.insert(user_id);
} }
@ -468,6 +443,7 @@ async fn sync_helper(
} }
} }
} }
}
if joined_since_last_sync && encrypted_room || new_encrypted_room { if joined_since_last_sync && encrypted_room || new_encrypted_room {
// If the user is in a new encrypted room, give them all joined users // If the user is in a new encrypted room, give them all joined users

View file

@ -252,6 +252,7 @@ impl Database {
userroomid_joined: builder.open_tree("userroomid_joined")?, userroomid_joined: builder.open_tree("userroomid_joined")?,
roomuserid_joined: builder.open_tree("roomuserid_joined")?, roomuserid_joined: builder.open_tree("roomuserid_joined")?,
roomid_joinedcount: builder.open_tree("roomid_joinedcount")?, roomid_joinedcount: builder.open_tree("roomid_joinedcount")?,
roomid_invitedcount: builder.open_tree("roomid_invitedcount")?,
roomuseroncejoinedids: builder.open_tree("roomuseroncejoinedids")?, roomuseroncejoinedids: builder.open_tree("roomuseroncejoinedids")?,
userroomid_invitestate: builder.open_tree("userroomid_invitestate")?, userroomid_invitestate: builder.open_tree("userroomid_invitestate")?,
roomuserid_invitecount: builder.open_tree("roomuserid_invitecount")?, roomuserid_invitecount: builder.open_tree("roomuserid_invitecount")?,
@ -277,6 +278,8 @@ impl Database {
statehash_shortstatehash: builder.open_tree("statehash_shortstatehash")?, statehash_shortstatehash: builder.open_tree("statehash_shortstatehash")?,
eventid_outlierpdu: builder.open_tree("eventid_outlierpdu")?, eventid_outlierpdu: builder.open_tree("eventid_outlierpdu")?,
softfailedeventids: builder.open_tree("softfailedeventids")?,
referencedevents: builder.open_tree("referencedevents")?, referencedevents: builder.open_tree("referencedevents")?,
pdu_cache: Mutex::new(LruCache::new(100_000)), pdu_cache: Mutex::new(LruCache::new(100_000)),
auth_chain_cache: Mutex::new(LruCache::new(1_000_000)), auth_chain_cache: Mutex::new(LruCache::new(1_000_000)),
@ -285,6 +288,7 @@ impl Database {
shortstatekey_cache: Mutex::new(LruCache::new(1_000_000)), shortstatekey_cache: Mutex::new(LruCache::new(1_000_000)),
statekeyshort_cache: Mutex::new(LruCache::new(1_000_000)), statekeyshort_cache: Mutex::new(LruCache::new(1_000_000)),
stateinfo_cache: Mutex::new(LruCache::new(1000)), stateinfo_cache: Mutex::new(LruCache::new(1000)),
our_real_users_cache: RwLock::new(HashMap::new()),
}, },
account_data: account_data::AccountData { account_data: account_data::AccountData {
roomuserdataid_accountdata: builder.open_tree("roomuserdataid_accountdata")?, roomuserdataid_accountdata: builder.open_tree("roomuserdataid_accountdata")?,
@ -442,7 +446,7 @@ impl Database {
let room_id = let room_id =
RoomId::try_from(utils::string_from_bytes(&roomid).unwrap()).unwrap(); RoomId::try_from(utils::string_from_bytes(&roomid).unwrap()).unwrap();
db.rooms.update_joined_count(&room_id)?; db.rooms.update_joined_count(&room_id, &db)?;
} }
db.globals.bump_database_version(6)?; db.globals.bump_database_version(6)?;

View file

@ -26,7 +26,7 @@ use std::{
collections::{BTreeMap, HashMap, HashSet}, collections::{BTreeMap, HashMap, HashSet},
convert::{TryFrom, TryInto}, convert::{TryFrom, TryInto},
mem::size_of, mem::size_of,
sync::{Arc, Mutex}, sync::{Arc, Mutex, RwLock},
time::Instant, time::Instant,
}; };
use tokio::sync::MutexGuard; use tokio::sync::MutexGuard;
@ -59,6 +59,7 @@ pub struct Rooms {
pub(super) userroomid_joined: Arc<dyn Tree>, pub(super) userroomid_joined: Arc<dyn Tree>,
pub(super) roomuserid_joined: Arc<dyn Tree>, pub(super) roomuserid_joined: Arc<dyn Tree>,
pub(super) roomid_joinedcount: Arc<dyn Tree>, pub(super) roomid_joinedcount: Arc<dyn Tree>,
pub(super) roomid_invitedcount: Arc<dyn Tree>,
pub(super) roomuseroncejoinedids: Arc<dyn Tree>, pub(super) roomuseroncejoinedids: Arc<dyn Tree>,
pub(super) userroomid_invitestate: Arc<dyn Tree>, // InviteState = Vec<Raw<Pdu>> pub(super) userroomid_invitestate: Arc<dyn Tree>, // InviteState = Vec<Raw<Pdu>>
pub(super) roomuserid_invitecount: Arc<dyn Tree>, // InviteCount = Count pub(super) roomuserid_invitecount: Arc<dyn Tree>, // InviteCount = Count
@ -90,6 +91,7 @@ pub struct Rooms {
/// RoomId + EventId -> outlier PDU. /// RoomId + EventId -> outlier PDU.
/// Any pdu that has passed the steps 1-8 in the incoming event /federation/send/txn. /// Any pdu that has passed the steps 1-8 in the incoming event /federation/send/txn.
pub(super) eventid_outlierpdu: Arc<dyn Tree>, pub(super) eventid_outlierpdu: Arc<dyn Tree>,
pub(super) softfailedeventids: Arc<dyn Tree>,
/// RoomId + EventId -> Parent PDU EventId. /// RoomId + EventId -> Parent PDU EventId.
pub(super) referencedevents: Arc<dyn Tree>, pub(super) referencedevents: Arc<dyn Tree>,
@ -100,6 +102,7 @@ pub struct Rooms {
pub(super) eventidshort_cache: Mutex<LruCache<EventId, u64>>, pub(super) eventidshort_cache: Mutex<LruCache<EventId, u64>>,
pub(super) statekeyshort_cache: Mutex<LruCache<(EventType, String), u64>>, pub(super) statekeyshort_cache: Mutex<LruCache<(EventType, String), u64>>,
pub(super) shortstatekey_cache: Mutex<LruCache<u64, (EventType, String)>>, pub(super) shortstatekey_cache: Mutex<LruCache<u64, (EventType, String)>>,
pub(super) our_real_users_cache: RwLock<HashMap<RoomId, Arc<HashSet<UserId>>>>,
pub(super) stateinfo_cache: Mutex< pub(super) stateinfo_cache: Mutex<
LruCache< LruCache<
u64, u64,
@ -425,7 +428,7 @@ impl Rooms {
} }
} }
self.update_joined_count(room_id)?; self.update_joined_count(room_id, &db)?;
self.roomid_shortstatehash self.roomid_shortstatehash
.insert(room_id.as_bytes(), &new_shortstatehash.to_be_bytes())?; .insert(room_id.as_bytes(), &new_shortstatehash.to_be_bytes())?;
@ -1229,9 +1232,19 @@ impl Rooms {
self.eventid_outlierpdu.insert( self.eventid_outlierpdu.insert(
&event_id.as_bytes(), &event_id.as_bytes(),
&serde_json::to_vec(&pdu).expect("CanonicalJsonObject is valid"), &serde_json::to_vec(&pdu).expect("CanonicalJsonObject is valid"),
)?; )
}
Ok(()) #[tracing::instrument(skip(self))]
pub fn mark_event_soft_failed(&self, event_id: &EventId) -> Result<()> {
self.softfailedeventids.insert(&event_id.as_bytes(), &[])
}
#[tracing::instrument(skip(self))]
pub fn is_event_soft_failed(&self, event_id: &EventId) -> Result<bool> {
self.softfailedeventids
.get(&event_id.as_bytes())
.map(|o| o.is_some())
} }
/// Creates a new persisted data unit and adds it to a room. /// Creates a new persisted data unit and adds it to a room.
@ -1334,15 +1347,9 @@ impl Rooms {
let mut notifies = Vec::new(); let mut notifies = Vec::new();
let mut highlights = Vec::new(); let mut highlights = Vec::new();
for user in db for user in self.get_our_real_users(&pdu.room_id, db)?.iter() {
.rooms
.room_members(&pdu.room_id)
.filter_map(|r| r.ok())
.filter(|user_id| user_id.server_name() == db.globals.server_name())
.filter(|user_id| !db.users.is_deactivated(user_id).unwrap_or(true))
{
// Don't notify the user of their own events // Don't notify the user of their own events
if user == pdu.sender { if user == &pdu.sender {
continue; continue;
} }
@ -2443,29 +2450,45 @@ impl Rooms {
} }
if update_joined_count { if update_joined_count {
self.update_joined_count(room_id)?; self.update_joined_count(room_id, db)?;
} }
Ok(()) Ok(())
} }
#[tracing::instrument(skip(self))] #[tracing::instrument(skip(self, room_id, db))]
pub fn update_joined_count(&self, room_id: &RoomId) -> Result<()> { pub fn update_joined_count(&self, room_id: &RoomId, db: &Database) -> Result<()> {
let mut joinedcount = 0_u64; let mut joinedcount = 0_u64;
let mut invitedcount = 0_u64;
let mut joined_servers = HashSet::new(); let mut joined_servers = HashSet::new();
let mut real_users = HashSet::new();
for joined in self.room_members(&room_id).filter_map(|r| r.ok()) { for joined in self.room_members(&room_id).filter_map(|r| r.ok()) {
joined_servers.insert(joined.server_name().to_owned()); joined_servers.insert(joined.server_name().to_owned());
if joined.server_name() == db.globals.server_name()
&& !db.users.is_deactivated(&joined).unwrap_or(true)
{
real_users.insert(joined);
}
joinedcount += 1; joinedcount += 1;
} }
for invited in self.room_members_invited(&room_id).filter_map(|r| r.ok()) { for invited in self.room_members_invited(&room_id).filter_map(|r| r.ok()) {
joined_servers.insert(invited.server_name().to_owned()); joined_servers.insert(invited.server_name().to_owned());
invitedcount += 1;
} }
self.roomid_joinedcount self.roomid_joinedcount
.insert(room_id.as_bytes(), &joinedcount.to_be_bytes())?; .insert(room_id.as_bytes(), &joinedcount.to_be_bytes())?;
self.roomid_invitedcount
.insert(room_id.as_bytes(), &invitedcount.to_be_bytes())?;
self.our_real_users_cache
.write()
.unwrap()
.insert(room_id.clone(), Arc::new(real_users));
for old_joined_server in self.room_servers(room_id).filter_map(|r| r.ok()) { for old_joined_server in self.room_servers(room_id).filter_map(|r| r.ok()) {
if !joined_servers.remove(&old_joined_server) { if !joined_servers.remove(&old_joined_server) {
// Server not in room anymore // Server not in room anymore
@ -2499,6 +2522,32 @@ impl Rooms {
Ok(()) Ok(())
} }
#[tracing::instrument(skip(self, room_id, db))]
pub fn get_our_real_users(
&self,
room_id: &RoomId,
db: &Database,
) -> Result<Arc<HashSet<UserId>>> {
let maybe = self
.our_real_users_cache
.read()
.unwrap()
.get(room_id)
.cloned();
if let Some(users) = maybe {
Ok(users)
} else {
self.update_joined_count(room_id, &db)?;
Ok(Arc::clone(
self.our_real_users_cache
.read()
.unwrap()
.get(room_id)
.unwrap(),
))
}
}
#[tracing::instrument(skip(self, db))] #[tracing::instrument(skip(self, db))]
pub async fn leave_room( pub async fn leave_room(
&self, &self,
@ -2977,6 +3026,18 @@ impl Rooms {
.transpose()?) .transpose()?)
} }
#[tracing::instrument(skip(self))]
pub fn room_invited_count(&self, room_id: &RoomId) -> Result<Option<u64>> {
Ok(self
.roomid_invitedcount
.get(room_id.as_bytes())?
.map(|b| {
utils::u64_from_bytes(&b)
.map_err(|_| Error::bad_database("Invalid joinedcount in db."))
})
.transpose()?)
}
/// Returns an iterator over all User IDs who ever joined a room. /// Returns an iterator over all User IDs who ever joined a room.
#[tracing::instrument(skip(self))] #[tracing::instrument(skip(self))]
pub fn room_useroncejoined<'a>( pub fn room_useroncejoined<'a>(

View file

@ -1271,6 +1271,15 @@ async fn upgrade_outlier_to_timeline_pdu(
if let Ok(Some(pduid)) = db.rooms.get_pdu_id(&incoming_pdu.event_id) { if let Ok(Some(pduid)) = db.rooms.get_pdu_id(&incoming_pdu.event_id) {
return Ok(Some(pduid)); return Ok(Some(pduid));
} }
if db
.rooms
.is_event_soft_failed(&incoming_pdu.event_id)
.map_err(|_| "Failed to ask db for soft fail".to_owned())?
{
return Err("Event has been soft failed".into());
}
// 10. Fetch missing state and auth chain events by calling /state_ids at backwards extremities // 10. Fetch missing state and auth chain events by calling /state_ids at backwards extremities
// doing all the checks in this list starting at 1. These are not timeline events. // doing all the checks in this list starting at 1. These are not timeline events.
@ -1683,6 +1692,9 @@ async fn upgrade_outlier_to_timeline_pdu(
if soft_fail { if soft_fail {
// Soft fail, we keep the event as an outlier but don't add it to the timeline // Soft fail, we keep the event as an outlier but don't add it to the timeline
warn!("Event was soft failed: {:?}", incoming_pdu); warn!("Event was soft failed: {:?}", incoming_pdu);
db.rooms
.mark_event_soft_failed(&incoming_pdu.event_id)
.map_err(|_| "Failed to set soft failed flag".to_owned())?;
return Err("Event has been soft failed".into()); return Err("Event has been soft failed".into());
} }