1
0
Fork 0
mirror of https://gitlab.com/famedly/conduit.git synced 2025-01-16 00:37:08 +01:00

feat: space hierarchies

This commit is contained in:
Timo Kösters 2023-07-02 16:06:54 +02:00
parent 6a6f8e80f1
commit 9d49d599f3
No known key found for this signature in database
GPG key ID: 0B25E636FBA7E4CB
11 changed files with 503 additions and 17 deletions

View file

@ -3,7 +3,7 @@ use ruma::{
api::client::{context::get_context, error::ErrorKind, filter::LazyLoadOptions}, api::client::{context::get_context, error::ErrorKind, filter::LazyLoadOptions},
events::StateEventType, events::StateEventType,
}; };
use std::{collections::HashSet, convert::TryFrom}; use std::collections::HashSet;
use tracing::error; use tracing::error;
/// # `GET /_matrix/client/r0/rooms/{roomId}/context` /// # `GET /_matrix/client/r0/rooms/{roomId}/context`
@ -70,9 +70,7 @@ pub async fn get_context_route(
} }
// Use limit with maximum 100 // Use limit with maximum 100
let limit = usize::try_from(body.limit) let limit = u64::from(body.limit).min(100) as usize;
.map_err(|_| Error::BadRequest(ErrorKind::InvalidParam, "Limit value is invalid."))?
.min(100);
let base_event = base_event.to_room_event(); let base_event = base_event.to_room_event();

View file

@ -133,12 +133,7 @@ pub async fn get_message_events_route(
from, from,
)?; )?;
// Use limit or else 10, with maximum 100 let limit = u64::from(body.limit).min(100) as usize;
let limit = body
.limit
.try_into()
.map_or(10_usize, |l: u32| l as usize)
.min(100);
let next_token; let next_token;

View file

@ -21,6 +21,7 @@ mod report;
mod room; mod room;
mod search; mod search;
mod session; mod session;
mod space;
mod state; mod state;
mod sync; mod sync;
mod tag; mod tag;
@ -55,6 +56,7 @@ pub use report::*;
pub use room::*; pub use room::*;
pub use search::*; pub use search::*;
pub use session::*; pub use session::*;
pub use space::*;
pub use state::*; pub use state::*;
pub use sync::*; pub use sync::*;
pub use tag::*; pub use tag::*;

View file

@ -31,7 +31,8 @@ pub async fn search_events_route(
.collect() .collect()
}); });
let limit = filter.limit.map_or(10, |l| u64::from(l) as usize); // Use limit or else 10, with maximum 100
let limit = filter.limit.map_or(10, u64::from).min(100) as usize;
let mut searches = Vec::new(); let mut searches = Vec::new();

View file

@ -0,0 +1,34 @@
use crate::{services, Result, Ruma};
use ruma::api::client::space::get_hierarchy;
/// # `GET /_matrix/client/v1/rooms/{room_id}/hierarchy``
///
/// Paginates over the space tree in a depth-first manner to locate child rooms of a given space.
pub async fn get_hierarchy_route(
body: Ruma<get_hierarchy::v1::Request>,
) -> Result<get_hierarchy::v1::Response> {
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
let skip = body
.from
.as_ref()
.and_then(|s| s.parse::<usize>().ok())
.unwrap_or(0);
let limit = body.limit.map_or(10, u64::from).min(100) as usize;
let max_depth = body.max_depth.map_or(3, u64::from).min(10) as usize + 1; // +1 to skip the space room itself
services()
.rooms
.spaces
.get_hierarchy(
sender_user,
&body.room_id,
limit,
skip,
max_depth,
body.suggested_only,
)
.await
}

View file

@ -151,7 +151,7 @@ where
.try_into_http_request::<Vec<u8>>( .try_into_http_request::<Vec<u8>>(
&actual_destination_str, &actual_destination_str,
SendAccessToken::IfRequired(""), SendAccessToken::IfRequired(""),
&[MatrixVersion::V1_0], &[MatrixVersion::V1_4],
) )
.map_err(|e| { .map_err(|e| {
warn!( warn!(

View file

@ -2,7 +2,8 @@
rust_2018_idioms, rust_2018_idioms,
unused_qualifications, unused_qualifications,
clippy::cloned_instead_of_copied, clippy::cloned_instead_of_copied,
clippy::str_to_string clippy::str_to_string,
clippy::future_not_send
)] )]
#![allow(clippy::suspicious_else_formatting)] #![allow(clippy::suspicious_else_formatting)]
#![deny(clippy::dbg_macro)] #![deny(clippy::dbg_macro)]
@ -386,6 +387,7 @@ fn routes() -> Router {
.ruma_route(client_server::get_relating_events_with_rel_type_and_event_type_route) .ruma_route(client_server::get_relating_events_with_rel_type_and_event_type_route)
.ruma_route(client_server::get_relating_events_with_rel_type_route) .ruma_route(client_server::get_relating_events_with_rel_type_route)
.ruma_route(client_server::get_relating_events_route) .ruma_route(client_server::get_relating_events_route)
.ruma_route(client_server::get_hierarchy_route)
.ruma_route(server_server::get_server_version_route) .ruma_route(server_server::get_server_version_route)
.route( .route(
"/_matrix/key/v2/server", "/_matrix/key/v2/server",

View file

@ -90,7 +90,7 @@ impl Services {
state_compressor: rooms::state_compressor::Service { state_compressor: rooms::state_compressor::Service {
db, db,
stateinfo_cache: Mutex::new(LruCache::new( stateinfo_cache: Mutex::new(LruCache::new(
(1000.0 * config.conduit_cache_capacity_modifier) as usize, (300.0 * config.conduit_cache_capacity_modifier) as usize,
)), )),
}, },
timeline: rooms::timeline::Service { timeline: rooms::timeline::Service {
@ -98,6 +98,9 @@ impl Services {
lasttimelinecount_cache: Mutex::new(HashMap::new()), lasttimelinecount_cache: Mutex::new(HashMap::new()),
}, },
threads: rooms::threads::Service { db }, threads: rooms::threads::Service { db },
spaces: rooms::spaces::Service {
roomid_spacechunk_cache: Mutex::new(LruCache::new(200)),
},
user: rooms::user::Service { db }, user: rooms::user::Service { db },
}, },
transaction_ids: transaction_ids::Service { db }, transaction_ids: transaction_ids::Service { db },

View file

@ -1,9 +1,9 @@
use crate::Error; use crate::Error;
use ruma::{ use ruma::{
events::{ events::{
room::member::RoomMemberEventContent, AnyEphemeralRoomEvent, AnyMessageLikeEvent, room::member::RoomMemberEventContent, space::child::HierarchySpaceChildEvent,
AnyStateEvent, AnyStrippedStateEvent, AnySyncStateEvent, AnySyncTimelineEvent, AnyEphemeralRoomEvent, AnyMessageLikeEvent, AnyStateEvent, AnyStrippedStateEvent,
AnyTimelineEvent, StateEvent, TimelineEventType, AnySyncStateEvent, AnySyncTimelineEvent, AnyTimelineEvent, StateEvent, TimelineEventType,
}, },
serde::Raw, serde::Raw,
state_res, CanonicalJsonObject, CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch, state_res, CanonicalJsonObject, CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch,
@ -248,6 +248,19 @@ impl PduEvent {
serde_json::from_value(json).expect("Raw::from_value always works") serde_json::from_value(json).expect("Raw::from_value always works")
} }
#[tracing::instrument(skip(self))]
pub fn to_stripped_spacechild_state_event(&self) -> Raw<HierarchySpaceChildEvent> {
let json = json!({
"content": self.content,
"type": self.kind,
"sender": self.sender,
"state_key": self.state_key,
"origin_server_ts": self.origin_server_ts,
});
serde_json::from_value(json).expect("Raw::from_value always works")
}
#[tracing::instrument(skip(self))] #[tracing::instrument(skip(self))]
pub fn to_member_event(&self) -> Raw<StateEvent<RoomMemberEventContent>> { pub fn to_member_event(&self) -> Raw<StateEvent<RoomMemberEventContent>> {
let mut json = json!({ let mut json = json!({

View file

@ -9,6 +9,7 @@ pub mod outlier;
pub mod pdu_metadata; pub mod pdu_metadata;
pub mod search; pub mod search;
pub mod short; pub mod short;
pub mod spaces;
pub mod state; pub mod state;
pub mod state_accessor; pub mod state_accessor;
pub mod state_cache; pub mod state_cache;
@ -56,5 +57,6 @@ pub struct Service {
pub state_compressor: state_compressor::Service, pub state_compressor: state_compressor::Service,
pub timeline: timeline::Service, pub timeline: timeline::Service,
pub threads: threads::Service, pub threads: threads::Service,
pub spaces: spaces::Service,
pub user: user::Service, pub user: user::Service,
} }

View file

@ -0,0 +1,436 @@
use std::sync::{Arc, Mutex};
use lru_cache::LruCache;
use ruma::{
api::{
client::{
error::ErrorKind,
space::{get_hierarchy, SpaceHierarchyRoomsChunk, SpaceRoomJoinRule},
},
federation,
},
directory::PublicRoomJoinRule,
events::{
room::{
avatar::RoomAvatarEventContent,
canonical_alias::RoomCanonicalAliasEventContent,
create::RoomCreateEventContent,
guest_access::{GuestAccess, RoomGuestAccessEventContent},
history_visibility::{HistoryVisibility, RoomHistoryVisibilityEventContent},
join_rules::{JoinRule, RoomJoinRulesEventContent},
name::RoomNameEventContent,
topic::RoomTopicEventContent,
},
StateEventType,
},
OwnedRoomId, RoomId, UserId,
};
use tracing::{debug, error, warn};
use crate::{services, Error, PduEvent, Result};
pub struct CachedSpaceChunk {
chunk: SpaceHierarchyRoomsChunk,
children: Vec<OwnedRoomId>,
join_rule: JoinRule,
}
pub struct Service {
pub roomid_spacechunk_cache: Mutex<LruCache<OwnedRoomId, Option<CachedSpaceChunk>>>,
}
impl Service {
pub async fn get_hierarchy(
&self,
sender_user: &UserId,
room_id: &RoomId,
limit: usize,
skip: usize,
max_depth: usize,
suggested_only: bool,
) -> Result<get_hierarchy::v1::Response> {
let mut left_to_skip = skip;
let mut rooms_in_path = Vec::new();
let mut stack = vec![vec![room_id.to_owned()]];
let mut results = Vec::new();
while let Some(current_room) = {
while stack.last().map_or(false, |s| s.is_empty()) {
stack.pop();
}
if !stack.is_empty() {
stack.last_mut().and_then(|s| s.pop())
} else {
None
}
} {
rooms_in_path.push(current_room.clone());
if results.len() >= limit {
break;
}
if let Some(cached) = self
.roomid_spacechunk_cache
.lock()
.unwrap()
.get_mut(&current_room.to_owned())
.as_ref()
{
if let Some(cached) = cached {
if let Some(_join_rule) =
self.handle_join_rule(&cached.join_rule, sender_user, &current_room)?
{
if left_to_skip > 0 {
left_to_skip -= 1;
} else {
results.push(cached.chunk.clone());
}
if rooms_in_path.len() < max_depth {
stack.push(cached.children.clone());
}
}
}
continue;
}
if let Some(current_shortstatehash) = services()
.rooms
.state
.get_room_shortstatehash(&current_room)?
{
let state = services()
.rooms
.state_accessor
.state_full_ids(current_shortstatehash)
.await?;
let mut children_ids = Vec::new();
let mut children_pdus = Vec::new();
for (key, id) in state {
let (event_type, state_key) =
services().rooms.short.get_statekey_from_short(key)?;
if event_type != StateEventType::SpaceChild {
continue;
}
if let Ok(room_id) = OwnedRoomId::try_from(state_key) {
children_ids.push(room_id);
children_pdus.push(services().rooms.timeline.get_pdu(&id)?.ok_or_else(
|| Error::bad_database("Event in space state not found"),
)?);
}
}
// TODO: Sort children
children_ids.reverse();
let chunk = self.get_room_chunk(sender_user, &current_room, children_pdus);
if let Ok(chunk) = chunk {
if left_to_skip > 0 {
left_to_skip -= 1;
} else {
results.push(chunk.clone());
}
let join_rule = services()
.rooms
.state_accessor
.room_state_get(&current_room, &StateEventType::RoomJoinRules, "")?
.map(|s| {
serde_json::from_str(s.content.get())
.map(|c: RoomJoinRulesEventContent| c.join_rule)
.map_err(|e| {
error!("Invalid room join rule event in database: {}", e);
Error::BadDatabase("Invalid room join rule event in database.")
})
})
.transpose()?
.unwrap_or(JoinRule::Invite);
self.roomid_spacechunk_cache.lock().unwrap().insert(
current_room.clone(),
Some(CachedSpaceChunk {
chunk,
children: children_ids.clone(),
join_rule,
}),
);
}
if rooms_in_path.len() < max_depth {
stack.push(children_ids);
}
} else {
let server = current_room.server_name();
if server == services().globals.server_name() {
continue;
}
if !results.is_empty() {
// Early return so the client can see some data already
break;
}
warn!("Asking {server} for /hierarchy");
if let Ok(response) = services()
.sending
.send_federation_request(
&server,
federation::space::get_hierarchy::v1::Request {
room_id: current_room.to_owned(),
suggested_only,
},
)
.await
{
warn!("Got response from {server} for /hierarchy\n{response:?}");
let join_rule = self.translate_pjoinrule(&response.room.join_rule)?;
let chunk = SpaceHierarchyRoomsChunk {
canonical_alias: response.room.canonical_alias,
name: response.room.name,
num_joined_members: response.room.num_joined_members,
room_id: response.room.room_id,
topic: response.room.topic,
world_readable: response.room.world_readable,
guest_can_join: response.room.guest_can_join,
avatar_url: response.room.avatar_url,
join_rule: self.translate_sjoinrule(&response.room.join_rule)?,
room_type: response.room.room_type,
children_state: response.room.children_state,
};
let children = response
.children
.iter()
.map(|c| c.room_id.clone())
.collect::<Vec<_>>();
if let Some(_join_rule) =
self.handle_join_rule(&join_rule, sender_user, &current_room)?
{
if left_to_skip > 0 {
left_to_skip -= 1;
} else {
results.push(chunk.clone());
}
if rooms_in_path.len() < max_depth {
stack.push(children.clone());
}
}
self.roomid_spacechunk_cache.lock().unwrap().insert(
current_room.clone(),
Some(CachedSpaceChunk {
chunk,
children,
join_rule,
}),
);
/* TODO:
for child in response.children {
roomid_spacechunk_cache.insert(
current_room.clone(),
CachedSpaceChunk {
chunk: child.chunk,
children,
join_rule,
},
);
}
*/
} else {
self.roomid_spacechunk_cache
.lock()
.unwrap()
.insert(current_room.clone(), None);
}
}
}
Ok(get_hierarchy::v1::Response {
next_batch: if results.is_empty() {
None
} else {
Some((skip + results.len()).to_string())
},
rooms: results,
})
}
fn get_room_chunk(
&self,
sender_user: &UserId,
room_id: &RoomId,
children: Vec<Arc<PduEvent>>,
) -> Result<SpaceHierarchyRoomsChunk> {
Ok(SpaceHierarchyRoomsChunk {
canonical_alias: services()
.rooms
.state_accessor
.room_state_get(&room_id, &StateEventType::RoomCanonicalAlias, "")?
.map_or(Ok(None), |s| {
serde_json::from_str(s.content.get())
.map(|c: RoomCanonicalAliasEventContent| c.alias)
.map_err(|_| {
Error::bad_database("Invalid canonical alias event in database.")
})
})?,
name: services()
.rooms
.state_accessor
.room_state_get(&room_id, &StateEventType::RoomName, "")?
.map_or(Ok(None), |s| {
serde_json::from_str(s.content.get())
.map(|c: RoomNameEventContent| c.name)
.map_err(|_| Error::bad_database("Invalid room name event in database."))
})?,
num_joined_members: services()
.rooms
.state_cache
.room_joined_count(&room_id)?
.unwrap_or_else(|| {
warn!("Room {} has no member count", room_id);
0
})
.try_into()
.expect("user count should not be that big"),
room_id: room_id.to_owned(),
topic: services()
.rooms
.state_accessor
.room_state_get(&room_id, &StateEventType::RoomTopic, "")?
.map_or(Ok(None), |s| {
serde_json::from_str(s.content.get())
.map(|c: RoomTopicEventContent| Some(c.topic))
.map_err(|_| Error::bad_database("Invalid room topic event in database."))
})?,
world_readable: services()
.rooms
.state_accessor
.room_state_get(&room_id, &StateEventType::RoomHistoryVisibility, "")?
.map_or(Ok(false), |s| {
serde_json::from_str(s.content.get())
.map(|c: RoomHistoryVisibilityEventContent| {
c.history_visibility == HistoryVisibility::WorldReadable
})
.map_err(|_| {
Error::bad_database(
"Invalid room history visibility event in database.",
)
})
})?,
guest_can_join: services()
.rooms
.state_accessor
.room_state_get(&room_id, &StateEventType::RoomGuestAccess, "")?
.map_or(Ok(false), |s| {
serde_json::from_str(s.content.get())
.map(|c: RoomGuestAccessEventContent| {
c.guest_access == GuestAccess::CanJoin
})
.map_err(|_| {
Error::bad_database("Invalid room guest access event in database.")
})
})?,
avatar_url: services()
.rooms
.state_accessor
.room_state_get(&room_id, &StateEventType::RoomAvatar, "")?
.map(|s| {
serde_json::from_str(s.content.get())
.map(|c: RoomAvatarEventContent| c.url)
.map_err(|_| Error::bad_database("Invalid room avatar event in database."))
})
.transpose()?
// url is now an Option<String> so we must flatten
.flatten(),
join_rule: {
let join_rule = services()
.rooms
.state_accessor
.room_state_get(&room_id, &StateEventType::RoomJoinRules, "")?
.map(|s| {
serde_json::from_str(s.content.get())
.map(|c: RoomJoinRulesEventContent| c.join_rule)
.map_err(|e| {
error!("Invalid room join rule event in database: {}", e);
Error::BadDatabase("Invalid room join rule event in database.")
})
})
.transpose()?
.unwrap_or(JoinRule::Invite);
self.handle_join_rule(&join_rule, sender_user, room_id)?
.ok_or_else(|| {
debug!("User is not allowed to see room {room_id}");
// This error will be caught later
Error::BadRequest(
ErrorKind::Forbidden,
"User is not allowed to see the room",
)
})?
},
room_type: services()
.rooms
.state_accessor
.room_state_get(&room_id, &StateEventType::RoomCreate, "")?
.map(|s| {
serde_json::from_str::<RoomCreateEventContent>(s.content.get()).map_err(|e| {
error!("Invalid room create event in database: {}", e);
Error::BadDatabase("Invalid room create event in database.")
})
})
.transpose()?
.and_then(|e| e.room_type),
children_state: children
.into_iter()
.map(|pdu| pdu.to_stripped_spacechild_state_event())
.collect(),
})
}
fn translate_pjoinrule(&self, join_rule: &PublicRoomJoinRule) -> Result<JoinRule> {
match join_rule {
PublicRoomJoinRule::Knock => Ok(JoinRule::Knock),
PublicRoomJoinRule::Public => Ok(JoinRule::Public),
_ => Err(Error::BadServerResponse("Unknown join rule")),
}
}
fn translate_sjoinrule(&self, join_rule: &PublicRoomJoinRule) -> Result<SpaceRoomJoinRule> {
match join_rule {
PublicRoomJoinRule::Knock => Ok(SpaceRoomJoinRule::Knock),
PublicRoomJoinRule::Public => Ok(SpaceRoomJoinRule::Public),
_ => Err(Error::BadServerResponse("Unknown join rule")),
}
}
fn handle_join_rule(
&self,
join_rule: &JoinRule,
sender_user: &UserId,
room_id: &RoomId,
) -> Result<Option<SpaceRoomJoinRule>> {
match join_rule {
JoinRule::Public => Ok::<_, Error>(Some(SpaceRoomJoinRule::Public)),
JoinRule::Knock => Ok(Some(SpaceRoomJoinRule::Knock)),
JoinRule::Invite => {
if services()
.rooms
.state_cache
.is_joined(sender_user, &room_id)?
{
Ok(Some(SpaceRoomJoinRule::Invite))
} else {
Ok(None)
}
}
JoinRule::Restricted(_r) => {
// TODO: Check rules
Ok(None)
}
JoinRule::KnockRestricted(_r) => {
// TODO: Check rules
Ok(None)
}
_ => Ok(None),
}
}
}