From eae0989c4048152729a5369ed6cb252d87f8b2a0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Timo=20K=C3=B6sters?= Date: Tue, 21 Feb 2023 16:38:50 +0100 Subject: [PATCH] fix: refactor backfill and add support for search --- src/api/client_server/search.rs | 14 +- src/database/key_value/rooms/search.rs | 16 +- src/database/key_value/rooms/timeline.rs | 282 ++++++++--------------- src/database/mod.rs | 4 - src/service/rooms/timeline/mod.rs | 45 +++- 5 files changed, 157 insertions(+), 204 deletions(-) diff --git a/src/api/client_server/search.rs b/src/api/client_server/search.rs index 51255d5a..5d760db4 100644 --- a/src/api/client_server/search.rs +++ b/src/api/client_server/search.rs @@ -81,6 +81,14 @@ pub async fn search_events_route( let results: Vec<_> = results .iter() + .filter_map(|result| { + services() + .rooms + .timeline + .get_pdu_from_id(result) + .ok()? + .map(|pdu| pdu.to_room_event()) + }) .map(|result| { Ok::<_, Error>(SearchResult { context: EventContextResult { @@ -91,11 +99,7 @@ pub async fn search_events_route( start: None, }, rank: None, - result: services() - .rooms - .timeline - .get_pdu_from_id(result)? - .map(|pdu| pdu.to_room_event()), + result: Some(result), }) }) .filter_map(|r| r.ok()) diff --git a/src/database/key_value/rooms/search.rs b/src/database/key_value/rooms/search.rs index 19ae57b4..ad573f06 100644 --- a/src/database/key_value/rooms/search.rs +++ b/src/database/key_value/rooms/search.rs @@ -1,5 +1,3 @@ -use std::mem::size_of; - use ruma::RoomId; use crate::{database::KeyValueDatabase, service, services, utils, Result}; @@ -15,7 +13,7 @@ impl service::rooms::search::Data for KeyValueDatabase { let mut key = shortroomid.to_be_bytes().to_vec(); key.extend_from_slice(word.as_bytes()); key.push(0xff); - key.extend_from_slice(pdu_id); + key.extend_from_slice(pdu_id); // TODO: currently we save the room id a second time here (key, Vec::new()) }); @@ -34,7 +32,6 @@ impl service::rooms::search::Data for KeyValueDatabase { .expect("room exists") .to_be_bytes() .to_vec(); - let prefix_clone = prefix.clone(); let words: Vec<_> = search_string .split_terminator(|c: char| !c.is_alphanumeric()) @@ -46,6 +43,7 @@ impl service::rooms::search::Data for KeyValueDatabase { let mut prefix2 = prefix.clone(); prefix2.extend_from_slice(word.as_bytes()); prefix2.push(0xff); + let prefix3 = prefix2.clone(); let mut last_possible_id = prefix2.clone(); last_possible_id.extend_from_slice(&u64::MAX.to_be_bytes()); @@ -53,7 +51,7 @@ impl service::rooms::search::Data for KeyValueDatabase { self.tokenids .iter_from(&last_possible_id, true) // Newest pdus first .take_while(move |(k, _)| k.starts_with(&prefix2)) - .map(|(key, _)| key[key.len() - size_of::()..].to_vec()) + .map(move |(key, _)| key[prefix3.len()..].to_vec()) }); let common_elements = match utils::common_elements(iterators, |a, b| { @@ -64,12 +62,6 @@ impl service::rooms::search::Data for KeyValueDatabase { None => return Ok(None), }; - let mapped = common_elements.map(move |id| { - let mut pduid = prefix_clone.clone(); - pduid.extend_from_slice(&id); - pduid - }); - - Ok(Some((Box::new(mapped), words))) + Ok(Some((Box::new(common_elements), words))) } } diff --git a/src/database/key_value/rooms/timeline.rs b/src/database/key_value/rooms/timeline.rs index 7a33e5de..d9c4423c 100644 --- a/src/database/key_value/rooms/timeline.rs +++ b/src/database/key_value/rooms/timeline.rs @@ -42,19 +42,8 @@ impl service::rooms::timeline::Data for KeyValueDatabase { Ok(self .eventid_pduid .get(event_id.as_bytes())? - .map(|pdu_id| Ok::<_, Error>(PduCount::Normal(pdu_count(&pdu_id)?))) - .transpose()? - .map_or_else( - || { - Ok::<_, Error>( - self.eventid_backfillpduid - .get(event_id.as_bytes())? - .map(|pdu_id| Ok::<_, Error>(PduCount::Backfilled(pdu_count(&pdu_id)?))) - .transpose()?, - ) - }, - |x| Ok(Some(x)), - )?) + .map(|pdu_id| pdu_count(&pdu_id)) + .transpose()?) } /// Returns the json of a pdu. @@ -83,21 +72,6 @@ impl service::rooms::timeline::Data for KeyValueDatabase { .ok_or_else(|| Error::bad_database("Invalid pduid in eventid_pduid.")) }) .transpose()? - .map_or_else( - || { - Ok::<_, Error>( - self.eventid_backfillpduid - .get(event_id.as_bytes())? - .map(|pduid| { - self.pduid_backfillpdu.get(&pduid)?.ok_or_else(|| { - Error::bad_database("Invalid pduid in eventid_pduid.") - }) - }) - .transpose()?, - ) - }, - |x| Ok(Some(x)), - )? .map(|pdu| { serde_json::from_slice(&pdu).map_err(|_| Error::bad_database("Invalid PDU in db.")) }) @@ -106,10 +80,7 @@ impl service::rooms::timeline::Data for KeyValueDatabase { /// Returns the pdu's id. fn get_pdu_id(&self, event_id: &EventId) -> Result>> { - Ok(self.eventid_pduid.get(event_id.as_bytes())?.map_or_else( - || self.eventid_backfillpduid.get(event_id.as_bytes()), - |x| Ok(Some(x)), - )?) + Ok(self.eventid_pduid.get(event_id.as_bytes())?) } /// Returns the pdu. @@ -124,21 +95,6 @@ impl service::rooms::timeline::Data for KeyValueDatabase { .ok_or_else(|| Error::bad_database("Invalid pduid in eventid_pduid.")) }) .transpose()? - .map_or_else( - || { - Ok::<_, Error>( - self.eventid_backfillpduid - .get(event_id.as_bytes())? - .map(|pduid| { - self.pduid_backfillpdu.get(&pduid)?.ok_or_else(|| { - Error::bad_database("Invalid pduid in eventid_pduid.") - }) - }) - .transpose()?, - ) - }, - |x| Ok(Some(x)), - )? .map(|pdu| { serde_json::from_slice(&pdu).map_err(|_| Error::bad_database("Invalid PDU in db.")) }) @@ -183,28 +139,22 @@ impl service::rooms::timeline::Data for KeyValueDatabase { /// /// This does __NOT__ check the outliers `Tree`. fn get_pdu_from_id(&self, pdu_id: &[u8]) -> Result> { - self.pduid_pdu - .get(pdu_id)? - .map_or_else(|| self.pduid_backfillpdu.get(pdu_id), |x| Ok(Some(x)))? - .map_or(Ok(None), |pdu| { - Ok(Some( - serde_json::from_slice(&pdu) - .map_err(|_| Error::bad_database("Invalid PDU in db."))?, - )) - }) + self.pduid_pdu.get(pdu_id)?.map_or(Ok(None), |pdu| { + Ok(Some( + serde_json::from_slice(&pdu) + .map_err(|_| Error::bad_database("Invalid PDU in db."))?, + )) + }) } /// Returns the pdu as a `BTreeMap`. fn get_pdu_json_from_id(&self, pdu_id: &[u8]) -> Result> { - self.pduid_pdu - .get(pdu_id)? - .map_or_else(|| self.pduid_backfillpdu.get(pdu_id), |x| Ok(Some(x)))? - .map_or(Ok(None), |pdu| { - Ok(Some( - serde_json::from_slice(&pdu) - .map_err(|_| Error::bad_database("Invalid PDU in db."))?, - )) - }) + self.pduid_pdu.get(pdu_id)?.map_or(Ok(None), |pdu| { + Ok(Some( + serde_json::from_slice(&pdu) + .map_err(|_| Error::bad_database("Invalid PDU in db."))?, + )) + }) } fn append_pdu( @@ -236,13 +186,12 @@ impl service::rooms::timeline::Data for KeyValueDatabase { event_id: &EventId, json: &CanonicalJsonObject, ) -> Result<()> { - self.pduid_backfillpdu.insert( + self.pduid_pdu.insert( pdu_id, &serde_json::to_vec(json).expect("CanonicalJsonObject is always a valid"), )?; - self.eventid_backfillpduid - .insert(event_id.as_bytes(), pdu_id)?; + self.eventid_pduid.insert(event_id.as_bytes(), pdu_id)?; self.eventid_outlierpdu.remove(event_id.as_bytes())?; Ok(()) @@ -272,64 +221,24 @@ impl service::rooms::timeline::Data for KeyValueDatabase { room_id: &RoomId, until: PduCount, ) -> Result> + 'a>> { - // Create the first part of the full pdu id - let prefix = services() - .rooms - .short - .get_shortroomid(room_id)? - .expect("room exists") - .to_be_bytes() - .to_vec(); - - let mut current_backfill = prefix.clone(); - // +1 so we don't send the base event - let backfill_count = match until { - PduCount::Backfilled(x) => x + 1, - PduCount::Normal(_) => 0, - }; - current_backfill.extend_from_slice(&backfill_count.to_be_bytes()); + let (prefix, current) = count_to_id(&room_id, until, 1, true)?; let user_id = user_id.to_owned(); - let user_id2 = user_id.to_owned(); - let prefix2 = prefix.clone(); - let backfill_iter = self - .pduid_backfillpdu - .iter_from(¤t_backfill, false) - .take_while(move |(k, _)| k.starts_with(&prefix)) - .map(move |(pdu_id, v)| { - let mut pdu = serde_json::from_slice::(&v) - .map_err(|_| Error::bad_database("PDU in db is invalid."))?; - if pdu.sender != user_id { - pdu.remove_transaction_id()?; - } - let count = PduCount::Backfilled(pdu_count(&pdu_id)?); - Ok((count, pdu)) - }); - - match until { - PduCount::Backfilled(_) => Ok(Box::new(backfill_iter)), - PduCount::Normal(x) => { - let mut current_normal = prefix2.clone(); - // -1 so we don't send the base event - current_normal.extend_from_slice(&x.saturating_sub(1).to_be_bytes()); - let normal_iter = self - .pduid_pdu - .iter_from(¤t_normal, true) - .take_while(move |(k, _)| k.starts_with(&prefix2)) - .map(move |(pdu_id, v)| { - let mut pdu = serde_json::from_slice::(&v) - .map_err(|_| Error::bad_database("PDU in db is invalid."))?; - if pdu.sender != user_id2 { - pdu.remove_transaction_id()?; - } - let count = PduCount::Normal(pdu_count(&pdu_id)?); - Ok((count, pdu)) - }); - - Ok(Box::new(normal_iter.chain(backfill_iter))) - } - } + Ok(Box::new( + self.pduid_pdu + .iter_from(¤t, true) + .take_while(move |(k, _)| k.starts_with(&prefix)) + .map(move |(pdu_id, v)| { + let mut pdu = serde_json::from_slice::(&v) + .map_err(|_| Error::bad_database("PDU in db is invalid."))?; + if pdu.sender != user_id { + pdu.remove_transaction_id()?; + } + let count = pdu_count(&pdu_id)?; + Ok((count, pdu)) + }), + )) } fn pdus_after<'a>( @@ -338,64 +247,24 @@ impl service::rooms::timeline::Data for KeyValueDatabase { room_id: &RoomId, from: PduCount, ) -> Result> + 'a>> { - // Create the first part of the full pdu id - let prefix = services() - .rooms - .short - .get_shortroomid(room_id)? - .expect("room exists") - .to_be_bytes() - .to_vec(); - - let mut current_normal = prefix.clone(); - // +1 so we don't send the base event - let normal_count = match from { - PduCount::Normal(x) => x + 1, - PduCount::Backfilled(_) => 0, - }; - current_normal.extend_from_slice(&normal_count.to_be_bytes()); + let (prefix, current) = count_to_id(&room_id, from, 1, false)?; let user_id = user_id.to_owned(); - let user_id2 = user_id.to_owned(); - let prefix2 = prefix.clone(); - let normal_iter = self - .pduid_pdu - .iter_from(¤t_normal, false) - .take_while(move |(k, _)| k.starts_with(&prefix)) - .map(move |(pdu_id, v)| { - let mut pdu = serde_json::from_slice::(&v) - .map_err(|_| Error::bad_database("PDU in db is invalid."))?; - if pdu.sender != user_id { - pdu.remove_transaction_id()?; - } - let count = PduCount::Normal(pdu_count(&pdu_id)?); - Ok((count, pdu)) - }); - - match from { - PduCount::Normal(_) => Ok(Box::new(normal_iter)), - PduCount::Backfilled(x) => { - let mut current_backfill = prefix2.clone(); - // -1 so we don't send the base event - current_backfill.extend_from_slice(&x.saturating_sub(1).to_be_bytes()); - let backfill_iter = self - .pduid_backfillpdu - .iter_from(¤t_backfill, true) - .take_while(move |(k, _)| k.starts_with(&prefix2)) - .map(move |(pdu_id, v)| { - let mut pdu = serde_json::from_slice::(&v) - .map_err(|_| Error::bad_database("PDU in db is invalid."))?; - if pdu.sender != user_id2 { - pdu.remove_transaction_id()?; - } - let count = PduCount::Backfilled(pdu_count(&pdu_id)?); - Ok((count, pdu)) - }); - - Ok(Box::new(backfill_iter.chain(normal_iter))) - } - } + Ok(Box::new( + self.pduid_pdu + .iter_from(¤t, false) + .take_while(move |(k, _)| k.starts_with(&prefix)) + .map(move |(pdu_id, v)| { + let mut pdu = serde_json::from_slice::(&v) + .map_err(|_| Error::bad_database("PDU in db is invalid."))?; + if pdu.sender != user_id { + pdu.remove_transaction_id()?; + } + let count = pdu_count(&pdu_id)?; + Ok((count, pdu)) + }), + )) } fn increment_notification_counts( @@ -428,7 +297,58 @@ impl service::rooms::timeline::Data for KeyValueDatabase { } /// Returns the `count` of this pdu's id. -fn pdu_count(pdu_id: &[u8]) -> Result { - utils::u64_from_bytes(&pdu_id[pdu_id.len() - size_of::()..]) - .map_err(|_| Error::bad_database("PDU has invalid count bytes.")) +fn pdu_count(pdu_id: &[u8]) -> Result { + let last_u64 = utils::u64_from_bytes(&pdu_id[pdu_id.len() - size_of::()..]) + .map_err(|_| Error::bad_database("PDU has invalid count bytes."))?; + let second_last_u64 = utils::u64_from_bytes( + &pdu_id[pdu_id.len() - 2 * size_of::()..pdu_id.len() - size_of::()], + ); + + if matches!(second_last_u64, Ok(0)) { + Ok(PduCount::Backfilled(u64::MAX - last_u64)) + } else { + Ok(PduCount::Normal(last_u64)) + } +} + +fn count_to_id( + room_id: &RoomId, + count: PduCount, + offset: u64, + subtract: bool, +) -> Result<(Vec, Vec)> { + let prefix = services() + .rooms + .short + .get_shortroomid(room_id)? + .expect("room exists") + .to_be_bytes() + .to_vec(); + let mut pdu_id = prefix.clone(); + // +1 so we don't send the base event + let count_raw = match count { + PduCount::Normal(x) => { + if subtract { + x - offset + } else { + x + offset + } + } + PduCount::Backfilled(x) => { + pdu_id.extend_from_slice(&0_u64.to_be_bytes()); + let num = u64::MAX - x; + if subtract { + if num > 0 { + num - offset + } else { + num + } + } else { + num + offset + } + } + }; + pdu_id.extend_from_slice(&count_raw.to_be_bytes()); + + Ok((prefix, pdu_id)) } diff --git a/src/database/mod.rs b/src/database/mod.rs index f07ad879..e05991d9 100644 --- a/src/database/mod.rs +++ b/src/database/mod.rs @@ -74,9 +74,7 @@ pub struct KeyValueDatabase { //pub rooms: rooms::Rooms, pub(super) pduid_pdu: Arc, // PduId = ShortRoomId + Count - pub(super) pduid_backfillpdu: Arc, // PduId = ShortRoomId + Count pub(super) eventid_pduid: Arc, - pub(super) eventid_backfillpduid: Arc, pub(super) roomid_pduleaves: Arc, pub(super) alias_roomid: Arc, pub(super) aliasid_alias: Arc, // AliasId = RoomId + Count @@ -297,9 +295,7 @@ impl KeyValueDatabase { presenceid_presence: builder.open_tree("presenceid_presence")?, userid_lastpresenceupdate: builder.open_tree("userid_lastpresenceupdate")?, pduid_pdu: builder.open_tree("pduid_pdu")?, - pduid_backfillpdu: builder.open_tree("pduid_backfillpdu")?, eventid_pduid: builder.open_tree("eventid_pduid")?, - eventid_backfillpduid: builder.open_tree("eventid_backfillpduid")?, roomid_pduleaves: builder.open_tree("roomid_pduleaves")?, alias_roomid: builder.open_tree("alias_roomid")?, diff --git a/src/service/rooms/timeline/mod.rs b/src/service/rooms/timeline/mod.rs index dcf04bef..47f4c65c 100644 --- a/src/service/rooms/timeline/mod.rs +++ b/src/service/rooms/timeline/mod.rs @@ -1045,6 +1045,24 @@ impl Service { ) -> Result<()> { let (event_id, value, room_id) = server_server::parse_incoming_pdu(&pdu)?; + // Lock so we cannot backfill the same pdu twice at the same time + let mutex = Arc::clone( + services() + .globals + .roomid_mutex_federation + .write() + .unwrap() + .entry(room_id.to_owned()) + .or_default(), + ); + let mutex_lock = mutex.lock().await; + + // Skip the PDU if we already have it as a timeline event + if let Some(pdu_id) = services().rooms.timeline.get_pdu_id(&event_id)? { + info!("We already know {event_id} at {pdu_id:?}"); + return Ok(()); + } + services() .rooms .event_handler @@ -1052,6 +1070,7 @@ impl Service { .await?; let value = self.get_pdu_json(&event_id)?.expect("We just created it"); + let pdu = self.get_pdu(&event_id)?.expect("We just created it"); let shortroomid = services() .rooms @@ -1072,14 +1091,36 @@ impl Service { let count = services().globals.next_count()?; let mut pdu_id = shortroomid.to_be_bytes().to_vec(); - pdu_id.extend_from_slice(&count.to_be_bytes()); + pdu_id.extend_from_slice(&0_u64.to_be_bytes()); + pdu_id.extend_from_slice(&(u64::MAX - count).to_be_bytes()); // Insert pdu self.db.prepend_backfill_pdu(&pdu_id, &event_id, &value)?; drop(insert_lock); - info!("Appended incoming pdu"); + match pdu.kind { + RoomEventType::RoomMessage => { + #[derive(Deserialize)] + struct ExtractBody { + body: Option, + } + + let content = serde_json::from_str::(pdu.content.get()) + .map_err(|_| Error::bad_database("Invalid content in pdu."))?; + + if let Some(body) = content.body { + services() + .rooms + .search + .index_pdu(shortroomid, &pdu_id, &body)?; + } + } + _ => {} + } + drop(mutex_lock); + + info!("Prepended backfill pdu"); Ok(()) } }