1
0
Fork 0
mirror of https://gitlab.com/famedly/conduit.git synced 2024-11-04 17:08:52 +01:00

improvement: try to load missing prev events

This commit is contained in:
Timo Kösters 2021-08-09 19:15:14 +02:00
parent d2f406e0e8
commit 260db9fcc7
No known key found for this signature in database
GPG key ID: 356E705610F626D5
2 changed files with 71 additions and 27 deletions

View file

@ -280,6 +280,24 @@ impl Rooms {
.is_some()) .is_some())
} }
/// Checks if a room exists.
pub fn first_pdu_in_room(&self, room_id: &RoomId) -> Result<Option<Arc<PduEvent>>> {
let mut prefix = room_id.as_bytes().to_vec();
prefix.push(0xff);
// Look for PDUs in that room.
self.pduid_pdu
.iter_from(&prefix, false)
.filter(|(k, _)| k.starts_with(&prefix))
.map(|(_, pdu)| {
serde_json::from_slice(&pdu)
.map_err(|_| Error::bad_database("Invalid first PDU in db."))
.map(Arc::new)
})
.next()
.transpose()
}
/// Force the creation of a new StateHash and insert it into the db. /// Force the creation of a new StateHash and insert it into the db.
/// ///
/// Whatever `state` is supplied to `force_state` __is__ the current room state snapshot. /// Whatever `state` is supplied to `force_state` __is__ the current room state snapshot.

View file

@ -272,13 +272,14 @@ where
if status == 200 { if status == 200 {
let response = T::IncomingResponse::try_from_http_response(http_response); let response = T::IncomingResponse::try_from_http_response(http_response);
response.map_err(|e| { response.map_err(|e| {
warn!("Invalid 200 response: {}", e); warn!("Invalid 200 response from {}: {}", &destination, e);
Error::BadServerResponse("Server returned bad 200 response.") Error::BadServerResponse("Server returned bad 200 response.")
}) })
} else { } else {
Err(Error::FederationError( Err(Error::FederationError(
destination.to_owned(), destination.to_owned(),
RumaError::try_from_http_response(http_response).map_err(|_| { RumaError::try_from_http_response(http_response).map_err(|e| {
warn!("Server returned bad error response: {}", e);
Error::BadServerResponse("Server returned bad error response.") Error::BadServerResponse("Server returned bad error response.")
})?, })?,
)) ))
@ -811,7 +812,7 @@ pub async fn send_transaction_message_route(
} }
/// An async function that can recursively call itself. /// An async function that can recursively call itself.
type AsyncRecursiveResult<'a, T, E> = Pin<Box<dyn Future<Output = StdResult<T, E>> + 'a + Send>>; type AsyncRecursiveType<'a, T> = Pin<Box<dyn Future<Output = T> + 'a + Send>>;
/// When receiving an event one needs to: /// When receiving an event one needs to:
/// 0. Check the server is in the room /// 0. Check the server is in the room
@ -836,7 +837,7 @@ type AsyncRecursiveResult<'a, T, E> = Pin<Box<dyn Future<Output = StdResult<T, E
/// 13. Check if the event passes auth based on the "current state" of the room, if not "soft fail" /// 13. Check if the event passes auth based on the "current state" of the room, if not "soft fail"
/// it /// it
/// 14. Use state resolution to find new room state /// 14. Use state resolution to find new room state
// We use some AsyncRecursiveResult hacks here so we can call this async funtion recursively // We use some AsyncRecursiveType hacks here so we can call this async funtion recursively
#[tracing::instrument(skip(value, is_timeline_event, db, pub_key_map))] #[tracing::instrument(skip(value, is_timeline_event, db, pub_key_map))]
pub fn handle_incoming_pdu<'a>( pub fn handle_incoming_pdu<'a>(
origin: &'a ServerName, origin: &'a ServerName,
@ -846,7 +847,7 @@ pub fn handle_incoming_pdu<'a>(
is_timeline_event: bool, is_timeline_event: bool,
db: &'a Database, db: &'a Database,
pub_key_map: &'a RwLock<BTreeMap<String, BTreeMap<String, String>>>, pub_key_map: &'a RwLock<BTreeMap<String, BTreeMap<String, String>>>,
) -> AsyncRecursiveResult<'a, Option<Vec<u8>>, String> { ) -> AsyncRecursiveType<'a, StdResult<Option<Vec<u8>>, String>> {
Box::pin(async move { Box::pin(async move {
// TODO: For RoomVersion6 we must check that Raw<..> is canonical do we anywhere?: https://matrix.org/docs/spec/rooms/v6#canonical-json // TODO: For RoomVersion6 we must check that Raw<..> is canonical do we anywhere?: https://matrix.org/docs/spec/rooms/v6#canonical-json
match db.rooms.exists(&room_id) { match db.rooms.exists(&room_id) {
@ -920,9 +921,15 @@ pub fn handle_incoming_pdu<'a>(
// 5. Reject "due to auth events" if can't get all the auth events or some of the auth events are also rejected "due to auth events" // 5. Reject "due to auth events" if can't get all the auth events or some of the auth events are also rejected "due to auth events"
// EDIT: Step 5 is not applied anymore because it failed too often // EDIT: Step 5 is not applied anymore because it failed too often
debug!("Fetching auth events for {}", incoming_pdu.event_id); debug!("Fetching auth events for {}", incoming_pdu.event_id);
fetch_and_handle_events(db, origin, &incoming_pdu.auth_events, &room_id, pub_key_map) fetch_and_handle_events(
.await db,
.map_err(|e| e.to_string())?; origin,
&incoming_pdu.auth_events,
&room_id,
pub_key_map,
false,
)
.await;
// 6. Reject "due to auth events" if the event doesn't pass auth based on the auth events // 6. Reject "due to auth events" if the event doesn't pass auth based on the auth events
debug!( debug!(
@ -1004,10 +1011,28 @@ pub fn handle_incoming_pdu<'a>(
debug!("Added pdu as outlier."); debug!("Added pdu as outlier.");
// 8. if not timeline event: stop // 8. if not timeline event: stop
if !is_timeline_event { if !is_timeline_event
|| incoming_pdu.origin_server_ts
< db.rooms
.first_pdu_in_room(&room_id)
.map_err(|_| "Error loading first room event.".to_owned())?
.expect("Room exists")
.origin_server_ts
{
return Ok(None); return Ok(None);
} }
// Load missing prev events first
fetch_and_handle_events(
db,
origin,
&incoming_pdu.prev_events,
&room_id,
pub_key_map,
true,
)
.await;
// TODO: 9. fetch any missing prev events doing all checks listed here starting at 1. These are timeline events // TODO: 9. fetch any missing prev events doing all checks listed here starting at 1. These are timeline events
// 10. Fetch missing state and auth chain events by calling /state_ids at backwards extremities // 10. Fetch missing state and auth chain events by calling /state_ids at backwards extremities
@ -1034,9 +1059,9 @@ pub fn handle_incoming_pdu<'a>(
&state.into_iter().collect::<Vec<_>>(), &state.into_iter().collect::<Vec<_>>(),
&room_id, &room_id,
pub_key_map, pub_key_map,
false,
) )
.await .await
.map_err(|_| "Failed to fetch state events locally".to_owned())?
.into_iter() .into_iter()
.map(|pdu| { .map(|pdu| {
( (
@ -1081,18 +1106,15 @@ pub fn handle_incoming_pdu<'a>(
{ {
Ok(res) => { Ok(res) => {
debug!("Fetching state events at event."); debug!("Fetching state events at event.");
let state_vec = match fetch_and_handle_events( let state_vec = fetch_and_handle_events(
&db, &db,
origin, origin,
&res.pdu_ids, &res.pdu_ids,
&room_id, &room_id,
pub_key_map, pub_key_map,
false,
) )
.await .await;
{
Ok(state) => state,
Err(_) => return Err("Failed to fetch state events.".to_owned()),
};
let mut state = HashMap::new(); let mut state = HashMap::new();
for pdu in state_vec { for pdu in state_vec {
@ -1118,18 +1140,15 @@ pub fn handle_incoming_pdu<'a>(
} }
debug!("Fetching auth chain events at event."); debug!("Fetching auth chain events at event.");
match fetch_and_handle_events( fetch_and_handle_events(
&db, &db,
origin, origin,
&res.auth_chain_ids, &res.auth_chain_ids,
&room_id, &room_id,
pub_key_map, pub_key_map,
false,
) )
.await .await;
{
Ok(state) => state,
Err(_) => return Err("Failed to fetch auth chain.".to_owned()),
};
state_at_incoming_event = Some(state); state_at_incoming_event = Some(state);
} }
@ -1381,7 +1400,8 @@ pub(crate) fn fetch_and_handle_events<'a>(
events: &'a [EventId], events: &'a [EventId],
room_id: &'a RoomId, room_id: &'a RoomId,
pub_key_map: &'a RwLock<BTreeMap<String, BTreeMap<String, String>>>, pub_key_map: &'a RwLock<BTreeMap<String, BTreeMap<String, String>>>,
) -> AsyncRecursiveResult<'a, Vec<Arc<PduEvent>>, Error> { are_timeline_events: bool,
) -> AsyncRecursiveType<'a, Vec<Arc<PduEvent>>> {
Box::pin(async move { Box::pin(async move {
let back_off = |id| match db.globals.bad_event_ratelimiter.write().unwrap().entry(id) { let back_off = |id| match db.globals.bad_event_ratelimiter.write().unwrap().entry(id) {
Entry::Vacant(e) => { Entry::Vacant(e) => {
@ -1408,7 +1428,12 @@ pub(crate) fn fetch_and_handle_events<'a>(
// a. Look in the main timeline (pduid_pdu tree) // a. Look in the main timeline (pduid_pdu tree)
// b. Look at outlier pdu tree // b. Look at outlier pdu tree
// (get_pdu checks both) // (get_pdu checks both)
let pdu = match db.rooms.get_pdu(&id) { let local_pdu = if are_timeline_events {
db.rooms.get_non_outlier_pdu(&id).map(|o| o.map(Arc::new))
} else {
db.rooms.get_pdu(&id)
};
let pdu = match local_pdu {
Ok(Some(pdu)) => { Ok(Some(pdu)) => {
trace!("Found {} in db", id); trace!("Found {} in db", id);
pdu pdu
@ -1439,7 +1464,7 @@ pub(crate) fn fetch_and_handle_events<'a>(
&event_id, &event_id,
&room_id, &room_id,
value.clone(), value.clone(),
false, are_timeline_events,
db, db,
pub_key_map, pub_key_map,
) )
@ -1482,7 +1507,7 @@ pub(crate) fn fetch_and_handle_events<'a>(
}; };
pdus.push(pdu); pdus.push(pdu);
} }
Ok(pdus) pdus
}) })
} }
@ -2193,7 +2218,8 @@ pub async fn create_join_event_route(
&pub_key_map, &pub_key_map,
) )
.await .await
.map_err(|_| { .map_err(|e| {
warn!("Error while handling incoming send join PDU: {}", e);
Error::BadRequest( Error::BadRequest(
ErrorKind::InvalidParam, ErrorKind::InvalidParam,
"Error while handling incoming PDU.", "Error while handling incoming PDU.",