1
0
Fork 0
mirror of https://gitlab.com/famedly/conduit.git synced 2025-01-04 04:24:00 +01:00

fix: improve locks

This commit is contained in:
Timo Kösters 2021-08-03 11:10:58 +02:00
parent 6b06fc9707
commit 0eeba86b32
No known key found for this signature in database
GPG key ID: 24DA7517711A2BA4
12 changed files with 153 additions and 134 deletions

View file

@ -243,15 +243,15 @@ pub async fn register_route(
let room_id = RoomId::new(db.globals.server_name()); let room_id = RoomId::new(db.globals.server_name());
let mutex = Arc::clone( let mutex_state = Arc::clone(
db.globals db.globals
.roomid_mutex .roomid_mutex_state
.write() .write()
.unwrap() .unwrap()
.entry(room_id.clone()) .entry(room_id.clone())
.or_default(), .or_default(),
); );
let mutex_lock = mutex.lock().await; let state_lock = mutex_state.lock().await;
let mut content = ruma::events::room::create::CreateEventContent::new(conduit_user.clone()); let mut content = ruma::events::room::create::CreateEventContent::new(conduit_user.clone());
content.federate = true; content.federate = true;
@ -270,7 +270,7 @@ pub async fn register_route(
&conduit_user, &conduit_user,
&room_id, &room_id,
&db, &db,
&mutex_lock, &state_lock,
)?; )?;
// 2. Make conduit bot join // 2. Make conduit bot join
@ -293,7 +293,7 @@ pub async fn register_route(
&conduit_user, &conduit_user,
&room_id, &room_id,
&db, &db,
&mutex_lock, &state_lock,
)?; )?;
// 3. Power levels // 3. Power levels
@ -318,7 +318,7 @@ pub async fn register_route(
&conduit_user, &conduit_user,
&room_id, &room_id,
&db, &db,
&mutex_lock, &state_lock,
)?; )?;
// 4.1 Join Rules // 4.1 Join Rules
@ -336,7 +336,7 @@ pub async fn register_route(
&conduit_user, &conduit_user,
&room_id, &room_id,
&db, &db,
&mutex_lock, &state_lock,
)?; )?;
// 4.2 History Visibility // 4.2 History Visibility
@ -356,7 +356,7 @@ pub async fn register_route(
&conduit_user, &conduit_user,
&room_id, &room_id,
&db, &db,
&mutex_lock, &state_lock,
)?; )?;
// 4.3 Guest Access // 4.3 Guest Access
@ -374,7 +374,7 @@ pub async fn register_route(
&conduit_user, &conduit_user,
&room_id, &room_id,
&db, &db,
&mutex_lock, &state_lock,
)?; )?;
// 6. Events implied by name and topic // 6. Events implied by name and topic
@ -393,7 +393,7 @@ pub async fn register_route(
&conduit_user, &conduit_user,
&room_id, &room_id,
&db, &db,
&mutex_lock, &state_lock,
)?; )?;
db.rooms.build_and_append_pdu( db.rooms.build_and_append_pdu(
@ -410,7 +410,7 @@ pub async fn register_route(
&conduit_user, &conduit_user,
&room_id, &room_id,
&db, &db,
&mutex_lock, &state_lock,
)?; )?;
// Room alias // Room alias
@ -433,7 +433,7 @@ pub async fn register_route(
&conduit_user, &conduit_user,
&room_id, &room_id,
&db, &db,
&mutex_lock, &state_lock,
)?; )?;
db.rooms.set_alias(&alias, Some(&room_id), &db.globals)?; db.rooms.set_alias(&alias, Some(&room_id), &db.globals)?;
@ -458,7 +458,7 @@ pub async fn register_route(
&conduit_user, &conduit_user,
&room_id, &room_id,
&db, &db,
&mutex_lock, &state_lock,
)?; )?;
db.rooms.build_and_append_pdu( db.rooms.build_and_append_pdu(
PduBuilder { PduBuilder {
@ -479,7 +479,7 @@ pub async fn register_route(
&user_id, &user_id,
&room_id, &room_id,
&db, &db,
&mutex_lock, &state_lock,
)?; )?;
// Send welcome message // Send welcome message
@ -498,7 +498,7 @@ pub async fn register_route(
&conduit_user, &conduit_user,
&room_id, &room_id,
&db, &db,
&mutex_lock, &state_lock,
)?; )?;
} }
@ -677,15 +677,15 @@ pub async fn deactivate_route(
blurhash: None, blurhash: None,
}; };
let mutex = Arc::clone( let mutex_state = Arc::clone(
db.globals db.globals
.roomid_mutex .roomid_mutex_state
.write() .write()
.unwrap() .unwrap()
.entry(room_id.clone()) .entry(room_id.clone())
.or_default(), .or_default(),
); );
let mutex_lock = mutex.lock().await; let state_lock = mutex_state.lock().await;
db.rooms.build_and_append_pdu( db.rooms.build_and_append_pdu(
PduBuilder { PduBuilder {
@ -698,7 +698,7 @@ pub async fn deactivate_route(
&sender_user, &sender_user,
&room_id, &room_id,
&db, &db,
&mutex_lock, &state_lock,
)?; )?;
} }

View file

@ -203,15 +203,15 @@ pub async fn kick_user_route(
event.membership = ruma::events::room::member::MembershipState::Leave; event.membership = ruma::events::room::member::MembershipState::Leave;
// TODO: reason // TODO: reason
let mutex = Arc::clone( let mutex_state = Arc::clone(
db.globals db.globals
.roomid_mutex .roomid_mutex_state
.write() .write()
.unwrap() .unwrap()
.entry(body.room_id.clone()) .entry(body.room_id.clone())
.or_default(), .or_default(),
); );
let mutex_lock = mutex.lock().await; let state_lock = mutex_state.lock().await;
db.rooms.build_and_append_pdu( db.rooms.build_and_append_pdu(
PduBuilder { PduBuilder {
@ -224,10 +224,10 @@ pub async fn kick_user_route(
&sender_user, &sender_user,
&body.room_id, &body.room_id,
&db, &db,
&mutex_lock, &state_lock,
)?; )?;
drop(mutex_lock); drop(state_lock);
db.flush()?; db.flush()?;
@ -275,15 +275,15 @@ pub async fn ban_user_route(
}, },
)?; )?;
let mutex = Arc::clone( let mutex_state = Arc::clone(
db.globals db.globals
.roomid_mutex .roomid_mutex_state
.write() .write()
.unwrap() .unwrap()
.entry(body.room_id.clone()) .entry(body.room_id.clone())
.or_default(), .or_default(),
); );
let mutex_lock = mutex.lock().await; let state_lock = mutex_state.lock().await;
db.rooms.build_and_append_pdu( db.rooms.build_and_append_pdu(
PduBuilder { PduBuilder {
@ -296,10 +296,10 @@ pub async fn ban_user_route(
&sender_user, &sender_user,
&body.room_id, &body.room_id,
&db, &db,
&mutex_lock, &state_lock,
)?; )?;
drop(mutex_lock); drop(state_lock);
db.flush()?; db.flush()?;
@ -337,15 +337,15 @@ pub async fn unban_user_route(
event.membership = ruma::events::room::member::MembershipState::Leave; event.membership = ruma::events::room::member::MembershipState::Leave;
let mutex = Arc::clone( let mutex_state = Arc::clone(
db.globals db.globals
.roomid_mutex .roomid_mutex_state
.write() .write()
.unwrap() .unwrap()
.entry(body.room_id.clone()) .entry(body.room_id.clone())
.or_default(), .or_default(),
); );
let mutex_lock = mutex.lock().await; let state_lock = mutex_state.lock().await;
db.rooms.build_and_append_pdu( db.rooms.build_and_append_pdu(
PduBuilder { PduBuilder {
@ -358,10 +358,10 @@ pub async fn unban_user_route(
&sender_user, &sender_user,
&body.room_id, &body.room_id,
&db, &db,
&mutex_lock, &state_lock,
)?; )?;
drop(mutex_lock); drop(state_lock);
db.flush()?; db.flush()?;
@ -486,15 +486,15 @@ async fn join_room_by_id_helper(
) -> ConduitResult<join_room_by_id::Response> { ) -> ConduitResult<join_room_by_id::Response> {
let sender_user = sender_user.expect("user is authenticated"); let sender_user = sender_user.expect("user is authenticated");
let mutex = Arc::clone( let mutex_state = Arc::clone(
db.globals db.globals
.roomid_mutex .roomid_mutex_state
.write() .write()
.unwrap() .unwrap()
.entry(room_id.clone()) .entry(room_id.clone())
.or_default(), .or_default(),
); );
let mutex_lock = mutex.lock().await; let state_lock = mutex_state.lock().await;
// Ask a remote server if we don't have this room // Ask a remote server if we don't have this room
if !db.rooms.exists(&room_id)? && room_id.server_name() != db.globals.server_name() { if !db.rooms.exists(&room_id)? && room_id.server_name() != db.globals.server_name() {
@ -706,11 +706,11 @@ async fn join_room_by_id_helper(
&sender_user, &sender_user,
&room_id, &room_id,
&db, &db,
&mutex_lock, &state_lock,
)?; )?;
} }
drop(mutex_lock); drop(state_lock);
db.flush()?; db.flush()?;
@ -790,15 +790,15 @@ pub async fn invite_helper<'a>(
) -> Result<()> { ) -> Result<()> {
if user_id.server_name() != db.globals.server_name() { if user_id.server_name() != db.globals.server_name() {
let (room_version_id, pdu_json, invite_room_state) = { let (room_version_id, pdu_json, invite_room_state) = {
let mutex = Arc::clone( let mutex_state = Arc::clone(
db.globals db.globals
.roomid_mutex .roomid_mutex_state
.write() .write()
.unwrap() .unwrap()
.entry(room_id.clone()) .entry(room_id.clone())
.or_default(), .or_default(),
); );
let mutex_lock = mutex.lock().await; let state_lock = mutex_state.lock().await;
let prev_events = db let prev_events = db
.rooms .rooms
@ -942,7 +942,7 @@ pub async fn invite_helper<'a>(
let invite_room_state = db.rooms.calculate_invite_state(&pdu)?; let invite_room_state = db.rooms.calculate_invite_state(&pdu)?;
drop(mutex_lock); drop(state_lock);
(room_version_id, pdu_json, invite_room_state) (room_version_id, pdu_json, invite_room_state)
}; };
@ -1018,16 +1018,15 @@ pub async fn invite_helper<'a>(
return Ok(()); return Ok(());
} }
let mutex = Arc::clone( let mutex_state = Arc::clone(
db.globals db.globals
.roomid_mutex .roomid_mutex_state
.write() .write()
.unwrap() .unwrap()
.entry(room_id.clone()) .entry(room_id.clone())
.or_default(), .or_default(),
); );
let state_lock = mutex_state.lock().await;
let mutex_lock = mutex.lock().await;
db.rooms.build_and_append_pdu( db.rooms.build_and_append_pdu(
PduBuilder { PduBuilder {
@ -1048,10 +1047,10 @@ pub async fn invite_helper<'a>(
&sender_user, &sender_user,
room_id, room_id,
&db, &db,
&mutex_lock, &state_lock,
)?; )?;
drop(mutex_lock); drop(state_lock);
Ok(()) Ok(())
} }

View file

@ -28,15 +28,15 @@ pub async fn send_message_event_route(
let sender_user = body.sender_user.as_ref().expect("user is authenticated"); let sender_user = body.sender_user.as_ref().expect("user is authenticated");
let sender_device = body.sender_device.as_deref(); let sender_device = body.sender_device.as_deref();
let mutex = Arc::clone( let mutex_state = Arc::clone(
db.globals db.globals
.roomid_mutex .roomid_mutex_state
.write() .write()
.unwrap() .unwrap()
.entry(body.room_id.clone()) .entry(body.room_id.clone())
.or_default(), .or_default(),
); );
let mutex_lock = mutex.lock().await; let state_lock = mutex_state.lock().await;
// Check if this is a new transaction id // Check if this is a new transaction id
if let Some(response) = if let Some(response) =
@ -75,7 +75,7 @@ pub async fn send_message_event_route(
&sender_user, &sender_user,
&body.room_id, &body.room_id,
&db, &db,
&mutex_lock, &state_lock,
)?; )?;
db.transaction_ids.add_txnid( db.transaction_ids.add_txnid(
@ -85,7 +85,7 @@ pub async fn send_message_event_route(
event_id.as_bytes(), event_id.as_bytes(),
)?; )?;
drop(mutex_lock); drop(state_lock);
db.flush()?; db.flush()?;

View file

@ -73,19 +73,19 @@ pub async fn set_displayname_route(
}) })
.filter_map(|r| r.ok()) .filter_map(|r| r.ok())
{ {
let mutex = Arc::clone( let mutex_state = Arc::clone(
db.globals db.globals
.roomid_mutex .roomid_mutex_state
.write() .write()
.unwrap() .unwrap()
.entry(room_id.clone()) .entry(room_id.clone())
.or_default(), .or_default(),
); );
let mutex_lock = mutex.lock().await; let state_lock = mutex_state.lock().await;
let _ = let _ =
db.rooms db.rooms
.build_and_append_pdu(pdu_builder, &sender_user, &room_id, &db, &mutex_lock); .build_and_append_pdu(pdu_builder, &sender_user, &room_id, &db, &state_lock);
// Presence update // Presence update
db.rooms.edus.update_presence( db.rooms.edus.update_presence(
@ -207,19 +207,19 @@ pub async fn set_avatar_url_route(
}) })
.filter_map(|r| r.ok()) .filter_map(|r| r.ok())
{ {
let mutex = Arc::clone( let mutex_state = Arc::clone(
db.globals db.globals
.roomid_mutex .roomid_mutex_state
.write() .write()
.unwrap() .unwrap()
.entry(room_id.clone()) .entry(room_id.clone())
.or_default(), .or_default(),
); );
let mutex_lock = mutex.lock().await; let state_lock = mutex_state.lock().await;
let _ = let _ =
db.rooms db.rooms
.build_and_append_pdu(pdu_builder, &sender_user, &room_id, &db, &mutex_lock); .build_and_append_pdu(pdu_builder, &sender_user, &room_id, &db, &state_lock);
// Presence update // Presence update
db.rooms.edus.update_presence( db.rooms.edus.update_presence(

View file

@ -20,15 +20,15 @@ pub async fn redact_event_route(
) -> ConduitResult<redact_event::Response> { ) -> ConduitResult<redact_event::Response> {
let sender_user = body.sender_user.as_ref().expect("user is authenticated"); let sender_user = body.sender_user.as_ref().expect("user is authenticated");
let mutex = Arc::clone( let mutex_state = Arc::clone(
db.globals db.globals
.roomid_mutex .roomid_mutex_state
.write() .write()
.unwrap() .unwrap()
.entry(body.room_id.clone()) .entry(body.room_id.clone())
.or_default(), .or_default(),
); );
let mutex_lock = mutex.lock().await; let state_lock = mutex_state.lock().await;
let event_id = db.rooms.build_and_append_pdu( let event_id = db.rooms.build_and_append_pdu(
PduBuilder { PduBuilder {
@ -44,10 +44,10 @@ pub async fn redact_event_route(
&sender_user, &sender_user,
&body.room_id, &body.room_id,
&db, &db,
&mutex_lock, &state_lock,
)?; )?;
drop(mutex_lock); drop(state_lock);
db.flush()?; db.flush()?;

View file

@ -33,15 +33,15 @@ pub async fn create_room_route(
let room_id = RoomId::new(db.globals.server_name()); let room_id = RoomId::new(db.globals.server_name());
let mutex = Arc::clone( let mutex_state = Arc::clone(
db.globals db.globals
.roomid_mutex .roomid_mutex_state
.write() .write()
.unwrap() .unwrap()
.entry(room_id.clone()) .entry(room_id.clone())
.or_default(), .or_default(),
); );
let mutex_lock = mutex.lock().await; let state_lock = mutex_state.lock().await;
let alias = body let alias = body
.room_alias_name .room_alias_name
@ -79,7 +79,7 @@ pub async fn create_room_route(
&sender_user, &sender_user,
&room_id, &room_id,
&db, &db,
&mutex_lock, &state_lock,
)?; )?;
// 2. Let the room creator join // 2. Let the room creator join
@ -102,7 +102,7 @@ pub async fn create_room_route(
&sender_user, &sender_user,
&room_id, &room_id,
&db, &db,
&mutex_lock, &state_lock,
)?; )?;
// 3. Power levels // 3. Power levels
@ -157,7 +157,7 @@ pub async fn create_room_route(
&sender_user, &sender_user,
&room_id, &room_id,
&db, &db,
&mutex_lock, &state_lock,
)?; )?;
// 4. Events set by preset // 4. Events set by preset
@ -184,7 +184,7 @@ pub async fn create_room_route(
&sender_user, &sender_user,
&room_id, &room_id,
&db, &db,
&mutex_lock, &state_lock,
)?; )?;
// 4.2 History Visibility // 4.2 History Visibility
@ -202,7 +202,7 @@ pub async fn create_room_route(
&sender_user, &sender_user,
&room_id, &room_id,
&db, &db,
&mutex_lock, &state_lock,
)?; )?;
// 4.3 Guest Access // 4.3 Guest Access
@ -228,7 +228,7 @@ pub async fn create_room_route(
&sender_user, &sender_user,
&room_id, &room_id,
&db, &db,
&mutex_lock, &state_lock,
)?; )?;
// 5. Events listed in initial_state // 5. Events listed in initial_state
@ -244,7 +244,7 @@ pub async fn create_room_route(
} }
db.rooms db.rooms
.build_and_append_pdu(pdu_builder, &sender_user, &room_id, &db, &mutex_lock)?; .build_and_append_pdu(pdu_builder, &sender_user, &room_id, &db, &state_lock)?;
} }
// 6. Events implied by name and topic // 6. Events implied by name and topic
@ -261,7 +261,7 @@ pub async fn create_room_route(
&sender_user, &sender_user,
&room_id, &room_id,
&db, &db,
&mutex_lock, &state_lock,
)?; )?;
} }
@ -280,12 +280,12 @@ pub async fn create_room_route(
&sender_user, &sender_user,
&room_id, &room_id,
&db, &db,
&mutex_lock, &state_lock,
)?; )?;
} }
// 7. Events implied by invite (and TODO: invite_3pid) // 7. Events implied by invite (and TODO: invite_3pid)
drop(mutex_lock); drop(state_lock);
for user_id in &body.invite { for user_id in &body.invite {
let _ = invite_helper(sender_user, user_id, &room_id, &db, body.is_direct).await; let _ = invite_helper(sender_user, user_id, &room_id, &db, body.is_direct).await;
} }
@ -364,13 +364,12 @@ pub async fn get_room_aliases_route(
#[cfg_attr( #[cfg_attr(
feature = "conduit_bin", feature = "conduit_bin",
post("/_matrix/client/r0/rooms/<_room_id>/upgrade", data = "<body>") post("/_matrix/client/r0/rooms/<_>/upgrade", data = "<body>")
)] )]
#[tracing::instrument(skip(db, body))] #[tracing::instrument(skip(db, body))]
pub async fn upgrade_room_route( pub async fn upgrade_room_route(
db: DatabaseGuard, db: DatabaseGuard,
body: Ruma<upgrade_room::Request<'_>>, body: Ruma<upgrade_room::Request<'_>>,
_room_id: String,
) -> ConduitResult<upgrade_room::Response> { ) -> ConduitResult<upgrade_room::Response> {
let sender_user = body.sender_user.as_ref().expect("user is authenticated"); let sender_user = body.sender_user.as_ref().expect("user is authenticated");
@ -387,15 +386,15 @@ pub async fn upgrade_room_route(
// Create a replacement room // Create a replacement room
let replacement_room = RoomId::new(db.globals.server_name()); let replacement_room = RoomId::new(db.globals.server_name());
let mutex = Arc::clone( let mutex_state = Arc::clone(
db.globals db.globals
.roomid_mutex .roomid_mutex_state
.write() .write()
.unwrap() .unwrap()
.entry(body.room_id.clone()) .entry(body.room_id.clone())
.or_default(), .or_default(),
); );
let mutex_lock = mutex.lock().await; let state_lock = mutex_state.lock().await;
// Send a m.room.tombstone event to the old room to indicate that it is not intended to be used any further // Send a m.room.tombstone event to the old room to indicate that it is not intended to be used any further
// Fail if the sender does not have the required permissions // Fail if the sender does not have the required permissions
@ -414,9 +413,21 @@ pub async fn upgrade_room_route(
sender_user, sender_user,
&body.room_id, &body.room_id,
&db, &db,
&mutex_lock, &state_lock,
)?; )?;
// Change lock to replacement room
drop(state_lock);
let mutex_state = Arc::clone(
db.globals
.roomid_mutex_state
.write()
.unwrap()
.entry(replacement_room.clone())
.or_default(),
);
let state_lock = mutex_state.lock().await;
// Get the old room federations status // Get the old room federations status
let federate = serde_json::from_value::<Raw<ruma::events::room::create::CreateEventContent>>( let federate = serde_json::from_value::<Raw<ruma::events::room::create::CreateEventContent>>(
db.rooms db.rooms
@ -455,7 +466,7 @@ pub async fn upgrade_room_route(
sender_user, sender_user,
&replacement_room, &replacement_room,
&db, &db,
&mutex_lock, &state_lock,
)?; )?;
// Join the new room // Join the new room
@ -478,7 +489,7 @@ pub async fn upgrade_room_route(
sender_user, sender_user,
&replacement_room, &replacement_room,
&db, &db,
&mutex_lock, &state_lock,
)?; )?;
// Recommended transferable state events list from the specs // Recommended transferable state events list from the specs
@ -512,7 +523,7 @@ pub async fn upgrade_room_route(
sender_user, sender_user,
&replacement_room, &replacement_room,
&db, &db,
&mutex_lock, &state_lock,
)?; )?;
} }
@ -556,10 +567,10 @@ pub async fn upgrade_room_route(
sender_user, sender_user,
&body.room_id, &body.room_id,
&db, &db,
&mutex_lock, &state_lock,
)?; )?;
drop(mutex_lock); drop(state_lock);
db.flush()?; db.flush()?;

View file

@ -259,15 +259,15 @@ pub async fn send_state_event_for_key_helper(
} }
} }
let mutex = Arc::clone( let mutex_state = Arc::clone(
db.globals db.globals
.roomid_mutex .roomid_mutex_state
.write() .write()
.unwrap() .unwrap()
.entry(room_id.clone()) .entry(room_id.clone())
.or_default(), .or_default(),
); );
let mutex_lock = mutex.lock().await; let state_lock = mutex_state.lock().await;
let event_id = db.rooms.build_and_append_pdu( let event_id = db.rooms.build_and_append_pdu(
PduBuilder { PduBuilder {
@ -280,7 +280,7 @@ pub async fn send_state_event_for_key_helper(
&sender_user, &sender_user,
&room_id, &room_id,
&db, &db,
&mutex_lock, &state_lock,
)?; )?;
Ok(event_id) Ok(event_id)

View file

@ -191,17 +191,17 @@ async fn sync_helper(
let room_id = room_id?; let room_id = room_id?;
// Get and drop the lock to wait for remaining operations to finish // Get and drop the lock to wait for remaining operations to finish
let mutex = Arc::clone( // This will make sure the we have all events until next_batch
let mutex_insert = Arc::clone(
db.globals db.globals
.roomid_mutex .roomid_mutex_insert
.write() .write()
.unwrap() .unwrap()
.entry(room_id.clone()) .entry(room_id.clone())
.or_default(), .or_default(),
); );
let insert_lock = mutex_insert.lock().unwrap();
let mutex_lock = mutex.lock().await; drop(insert_lock);
drop(mutex_lock);
let mut non_timeline_pdus = db let mut non_timeline_pdus = db
.rooms .rooms
@ -665,16 +665,16 @@ async fn sync_helper(
let (room_id, left_state_events) = result?; let (room_id, left_state_events) = result?;
// Get and drop the lock to wait for remaining operations to finish // Get and drop the lock to wait for remaining operations to finish
let mutex = Arc::clone( let mutex_insert = Arc::clone(
db.globals db.globals
.roomid_mutex .roomid_mutex_insert
.write() .write()
.unwrap() .unwrap()
.entry(room_id.clone()) .entry(room_id.clone())
.or_default(), .or_default(),
); );
let mutex_lock = mutex.lock().await; let insert_lock = mutex_insert.lock().unwrap();
drop(mutex_lock); drop(insert_lock);
let left_count = db.rooms.get_left_count(&room_id, &sender_user)?; let left_count = db.rooms.get_left_count(&room_id, &sender_user)?;
@ -705,16 +705,16 @@ async fn sync_helper(
let (room_id, invite_state_events) = result?; let (room_id, invite_state_events) = result?;
// Get and drop the lock to wait for remaining operations to finish // Get and drop the lock to wait for remaining operations to finish
let mutex = Arc::clone( let mutex_insert = Arc::clone(
db.globals db.globals
.roomid_mutex .roomid_mutex_insert
.write() .write()
.unwrap() .unwrap()
.entry(room_id.clone()) .entry(room_id.clone())
.or_default(), .or_default(),
); );
let mutex_lock = mutex.lock().await; let insert_lock = mutex_insert.lock().unwrap();
drop(mutex_lock); drop(insert_lock);
let invite_count = db.rooms.get_invite_count(&room_id, &sender_user)?; let invite_count = db.rooms.get_invite_count(&room_id, &sender_user)?;

View file

@ -84,15 +84,15 @@ impl Admin {
tokio::select! { tokio::select! {
Some(event) = receiver.next() => { Some(event) = receiver.next() => {
let guard = db.read().await; let guard = db.read().await;
let mutex = Arc::clone( let mutex_state = Arc::clone(
guard.globals guard.globals
.roomid_mutex .roomid_mutex_state
.write() .write()
.unwrap() .unwrap()
.entry(conduit_room.clone()) .entry(conduit_room.clone())
.or_default(), .or_default(),
); );
let mutex_lock = mutex.lock().await; let state_lock = mutex_state.lock().await;
match event { match event {
AdminCommand::RegisterAppservice(yaml) => { AdminCommand::RegisterAppservice(yaml) => {
@ -106,17 +106,17 @@ impl Admin {
count, count,
appservices.into_iter().filter_map(|r| r.ok()).collect::<Vec<_>>().join(", ") appservices.into_iter().filter_map(|r| r.ok()).collect::<Vec<_>>().join(", ")
); );
send_message(message::MessageEventContent::text_plain(output), guard, &mutex_lock); send_message(message::MessageEventContent::text_plain(output), guard, &state_lock);
} else { } else {
send_message(message::MessageEventContent::text_plain("Failed to get appservices."), guard, &mutex_lock); send_message(message::MessageEventContent::text_plain("Failed to get appservices."), guard, &state_lock);
} }
} }
AdminCommand::SendMessage(message) => { AdminCommand::SendMessage(message) => {
send_message(message, guard, &mutex_lock); send_message(message, guard, &state_lock);
} }
} }
drop(mutex_lock); drop(state_lock);
} }
} }
} }

View file

@ -12,7 +12,7 @@ use std::{
fs, fs,
future::Future, future::Future,
path::PathBuf, path::PathBuf,
sync::{Arc, RwLock}, sync::{Arc, Mutex, RwLock},
time::{Duration, Instant}, time::{Duration, Instant},
}; };
use tokio::sync::{broadcast, watch::Receiver, Mutex as TokioMutex, Semaphore}; use tokio::sync::{broadcast, watch::Receiver, Mutex as TokioMutex, Semaphore};
@ -45,7 +45,8 @@ pub struct Globals {
pub bad_signature_ratelimiter: Arc<RwLock<HashMap<Vec<String>, RateLimitState>>>, pub bad_signature_ratelimiter: Arc<RwLock<HashMap<Vec<String>, RateLimitState>>>,
pub servername_ratelimiter: Arc<RwLock<HashMap<Box<ServerName>, Arc<Semaphore>>>>, pub servername_ratelimiter: Arc<RwLock<HashMap<Box<ServerName>, Arc<Semaphore>>>>,
pub sync_receivers: RwLock<HashMap<(UserId, Box<DeviceId>), SyncHandle>>, pub sync_receivers: RwLock<HashMap<(UserId, Box<DeviceId>), SyncHandle>>,
pub roomid_mutex: RwLock<HashMap<RoomId, Arc<TokioMutex<()>>>>, pub roomid_mutex_insert: RwLock<HashMap<RoomId, Arc<Mutex<()>>>>,
pub roomid_mutex_state: RwLock<HashMap<RoomId, Arc<TokioMutex<()>>>>,
pub roomid_mutex_federation: RwLock<HashMap<RoomId, Arc<TokioMutex<()>>>>, // this lock will be held longer pub roomid_mutex_federation: RwLock<HashMap<RoomId, Arc<TokioMutex<()>>>>, // this lock will be held longer
pub rotate: RotationHandler, pub rotate: RotationHandler,
} }
@ -200,7 +201,8 @@ impl Globals {
bad_event_ratelimiter: Arc::new(RwLock::new(HashMap::new())), bad_event_ratelimiter: Arc::new(RwLock::new(HashMap::new())),
bad_signature_ratelimiter: Arc::new(RwLock::new(HashMap::new())), bad_signature_ratelimiter: Arc::new(RwLock::new(HashMap::new())),
servername_ratelimiter: Arc::new(RwLock::new(HashMap::new())), servername_ratelimiter: Arc::new(RwLock::new(HashMap::new())),
roomid_mutex: RwLock::new(HashMap::new()), roomid_mutex_state: RwLock::new(HashMap::new()),
roomid_mutex_insert: RwLock::new(HashMap::new()),
roomid_mutex_federation: RwLock::new(HashMap::new()), roomid_mutex_federation: RwLock::new(HashMap::new()),
sync_receivers: RwLock::new(HashMap::new()), sync_receivers: RwLock::new(HashMap::new()),
rotate: RotationHandler::new(), rotate: RotationHandler::new(),

View file

@ -736,6 +736,16 @@ impl Rooms {
self.replace_pdu_leaves(&pdu.room_id, leaves)?; self.replace_pdu_leaves(&pdu.room_id, leaves)?;
let mutex_insert = Arc::clone(
db.globals
.roomid_mutex_insert
.write()
.unwrap()
.entry(pdu.room_id.clone())
.or_default(),
);
let insert_lock = mutex_insert.lock().unwrap();
let count1 = db.globals.next_count()?; let count1 = db.globals.next_count()?;
// Mark as read first so the sending client doesn't get a notification even if appending // Mark as read first so the sending client doesn't get a notification even if appending
// fails // fails
@ -750,6 +760,8 @@ impl Rooms {
// There's a brief moment of time here where the count is updated but the pdu does not // There's a brief moment of time here where the count is updated but the pdu does not
// exist. This could theoretically lead to dropped pdus, but it's extremely rare // exist. This could theoretically lead to dropped pdus, but it's extremely rare
//
// Update: We fixed this using insert_lock
self.pduid_pdu.insert( self.pduid_pdu.insert(
&pdu_id, &pdu_id,
@ -761,6 +773,8 @@ impl Rooms {
self.eventid_pduid self.eventid_pduid
.insert(pdu.event_id.as_bytes(), &pdu_id)?; .insert(pdu.event_id.as_bytes(), &pdu_id)?;
drop(insert_lock);
// See if the event matches any known pushers // See if the event matches any known pushers
let power_levels: PowerLevelsEventContent = db let power_levels: PowerLevelsEventContent = db
.rooms .rooms
@ -1464,13 +1478,6 @@ impl Rooms {
self.shorteventid_eventid self.shorteventid_eventid
.insert(&shorteventid.to_be_bytes(), pdu.event_id.as_bytes())?; .insert(&shorteventid.to_be_bytes(), pdu.event_id.as_bytes())?;
// Increment the last index and use that
// This is also the next_batch/since value
let count = db.globals.next_count()?;
let mut pdu_id = room_id.as_bytes().to_vec();
pdu_id.push(0xff);
pdu_id.extend_from_slice(&count.to_be_bytes());
// We append to state before appending the pdu, so we don't have a moment in time with the // We append to state before appending the pdu, so we don't have a moment in time with the
// pdu without it's state. This is okay because append_pdu can't fail. // pdu without it's state. This is okay because append_pdu can't fail.
let statehashid = self.append_to_state(&pdu, &db.globals)?; let statehashid = self.append_to_state(&pdu, &db.globals)?;
@ -1904,15 +1911,15 @@ impl Rooms {
db, db,
)?; )?;
} else { } else {
let mutex = Arc::clone( let mutex_state = Arc::clone(
db.globals db.globals
.roomid_mutex .roomid_mutex_state
.write() .write()
.unwrap() .unwrap()
.entry(room_id.clone()) .entry(room_id.clone())
.or_default(), .or_default(),
); );
let mutex_lock = mutex.lock().await; let state_lock = mutex_state.lock().await;
let mut event = serde_json::from_value::<Raw<member::MemberEventContent>>( let mut event = serde_json::from_value::<Raw<member::MemberEventContent>>(
self.room_state_get(room_id, &EventType::RoomMember, &user_id.to_string())? self.room_state_get(room_id, &EventType::RoomMember, &user_id.to_string())?
@ -1941,7 +1948,7 @@ impl Rooms {
user_id, user_id,
room_id, room_id,
db, db,
&mutex_lock, &state_lock,
)?; )?;
} }

View file

@ -1159,15 +1159,15 @@ pub fn handle_incoming_pdu<'a>(
// We start looking at current room state now, so lets lock the room // We start looking at current room state now, so lets lock the room
let mutex = Arc::clone( let mutex_state = Arc::clone(
db.globals db.globals
.roomid_mutex .roomid_mutex_state
.write() .write()
.unwrap() .unwrap()
.entry(room_id.clone()) .entry(room_id.clone())
.or_default(), .or_default(),
); );
let mutex_lock = mutex.lock().await; let state_lock = mutex_state.lock().await;
// Now we calculate the set of extremities this room has after the incoming event has been // Now we calculate the set of extremities this room has after the incoming event has been
// applied. We start with the previous extremities (aka leaves) // applied. We start with the previous extremities (aka leaves)
@ -1341,7 +1341,7 @@ pub fn handle_incoming_pdu<'a>(
val, val,
extremities, extremities,
&state_at_incoming_event, &state_at_incoming_event,
&mutex_lock, &state_lock,
) )
.map_err(|_| "Failed to add pdu to db.".to_owned())?, .map_err(|_| "Failed to add pdu to db.".to_owned())?,
); );
@ -1364,7 +1364,7 @@ pub fn handle_incoming_pdu<'a>(
} }
// Event has passed all auth/stateres checks // Event has passed all auth/stateres checks
drop(mutex_lock); drop(state_lock);
Ok(pdu_id) Ok(pdu_id)
}) })
} }