From 363c629fafcaa202f296d0c1988cdb26950e40ca Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Timo=20K=C3=B6sters?= Date: Mon, 22 Mar 2021 14:04:11 +0100 Subject: [PATCH] fix: signature key fetching, optimize push sending --- Cargo.lock | 18 --- Cargo.toml | 9 +- src/client_server/push.rs | 2 +- src/database/pusher.rs | 196 ++++++++++++++++------------- src/database/rooms.rs | 22 +++- src/database/sending.rs | 259 ++++++++++++++++++++------------------ src/server_server.rs | 226 ++++++++++++++++++--------------- 7 files changed, 396 insertions(+), 336 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index adcc27be..d5010da8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1623,7 +1623,6 @@ dependencies = [ [[package]] name = "ruma" version = "0.0.2" -source = "git+https://github.com/ruma/ruma?rev=f196f5b6f164973d6b343af31ab4e0457f743675#f196f5b6f164973d6b343af31ab4e0457f743675" dependencies = [ "assign", "js_int", @@ -1643,7 +1642,6 @@ dependencies = [ [[package]] name = "ruma-api" version = "0.17.0-alpha.2" -source = "git+https://github.com/ruma/ruma?rev=f196f5b6f164973d6b343af31ab4e0457f743675#f196f5b6f164973d6b343af31ab4e0457f743675" dependencies = [ "http", "percent-encoding", @@ -1658,7 +1656,6 @@ dependencies = [ [[package]] name = "ruma-api-macros" version = "0.17.0-alpha.2" -source = "git+https://github.com/ruma/ruma?rev=f196f5b6f164973d6b343af31ab4e0457f743675#f196f5b6f164973d6b343af31ab4e0457f743675" dependencies = [ "proc-macro-crate", "proc-macro2", @@ -1669,7 +1666,6 @@ dependencies = [ [[package]] name = "ruma-appservice-api" version = "0.2.0-alpha.2" -source = "git+https://github.com/ruma/ruma?rev=f196f5b6f164973d6b343af31ab4e0457f743675#f196f5b6f164973d6b343af31ab4e0457f743675" dependencies = [ "ruma-api", "ruma-common", @@ -1683,7 +1679,6 @@ dependencies = [ [[package]] name = "ruma-client-api" version = "0.10.0-alpha.2" -source = "git+https://github.com/ruma/ruma?rev=f196f5b6f164973d6b343af31ab4e0457f743675#f196f5b6f164973d6b343af31ab4e0457f743675" dependencies = [ "assign", "http", @@ -1702,7 +1697,6 @@ dependencies = [ [[package]] name = "ruma-common" version = "0.3.0-alpha.1" -source = "git+https://github.com/ruma/ruma?rev=f196f5b6f164973d6b343af31ab4e0457f743675#f196f5b6f164973d6b343af31ab4e0457f743675" dependencies = [ "js_int", "maplit", @@ -1715,7 +1709,6 @@ dependencies = [ [[package]] name = "ruma-events" version = "0.22.0-alpha.2" -source = "git+https://github.com/ruma/ruma?rev=f196f5b6f164973d6b343af31ab4e0457f743675#f196f5b6f164973d6b343af31ab4e0457f743675" dependencies = [ "js_int", "ruma-common", @@ -1729,7 +1722,6 @@ dependencies = [ [[package]] name = "ruma-events-macros" version = "0.22.0-alpha.2" -source = "git+https://github.com/ruma/ruma?rev=f196f5b6f164973d6b343af31ab4e0457f743675#f196f5b6f164973d6b343af31ab4e0457f743675" dependencies = [ "proc-macro-crate", "proc-macro2", @@ -1740,7 +1732,6 @@ dependencies = [ [[package]] name = "ruma-federation-api" version = "0.1.0-alpha.1" -source = "git+https://github.com/ruma/ruma?rev=f196f5b6f164973d6b343af31ab4e0457f743675#f196f5b6f164973d6b343af31ab4e0457f743675" dependencies = [ "js_int", "ruma-api", @@ -1755,7 +1746,6 @@ dependencies = [ [[package]] name = "ruma-identifiers" version = "0.18.0-alpha.1" -source = "git+https://github.com/ruma/ruma?rev=f196f5b6f164973d6b343af31ab4e0457f743675#f196f5b6f164973d6b343af31ab4e0457f743675" dependencies = [ "paste", "rand", @@ -1769,7 +1759,6 @@ dependencies = [ [[package]] name = "ruma-identifiers-macros" version = "0.18.0-alpha.1" -source = "git+https://github.com/ruma/ruma?rev=f196f5b6f164973d6b343af31ab4e0457f743675#f196f5b6f164973d6b343af31ab4e0457f743675" dependencies = [ "proc-macro2", "quote", @@ -1780,12 +1769,10 @@ dependencies = [ [[package]] name = "ruma-identifiers-validation" version = "0.2.0" -source = "git+https://github.com/ruma/ruma?rev=f196f5b6f164973d6b343af31ab4e0457f743675#f196f5b6f164973d6b343af31ab4e0457f743675" [[package]] name = "ruma-identity-service-api" version = "0.0.1" -source = "git+https://github.com/ruma/ruma?rev=f196f5b6f164973d6b343af31ab4e0457f743675#f196f5b6f164973d6b343af31ab4e0457f743675" dependencies = [ "ruma-api", "ruma-common", @@ -1798,7 +1785,6 @@ dependencies = [ [[package]] name = "ruma-push-gateway-api" version = "0.0.1" -source = "git+https://github.com/ruma/ruma?rev=f196f5b6f164973d6b343af31ab4e0457f743675#f196f5b6f164973d6b343af31ab4e0457f743675" dependencies = [ "js_int", "ruma-api", @@ -1813,7 +1799,6 @@ dependencies = [ [[package]] name = "ruma-serde" version = "0.3.0" -source = "git+https://github.com/ruma/ruma?rev=f196f5b6f164973d6b343af31ab4e0457f743675#f196f5b6f164973d6b343af31ab4e0457f743675" dependencies = [ "form_urlencoded", "itoa", @@ -1826,7 +1811,6 @@ dependencies = [ [[package]] name = "ruma-serde-macros" version = "0.3.0" -source = "git+https://github.com/ruma/ruma?rev=f196f5b6f164973d6b343af31ab4e0457f743675#f196f5b6f164973d6b343af31ab4e0457f743675" dependencies = [ "proc-macro-crate", "proc-macro2", @@ -1837,7 +1821,6 @@ dependencies = [ [[package]] name = "ruma-signatures" version = "0.6.0-alpha.1" -source = "git+https://github.com/ruma/ruma?rev=f196f5b6f164973d6b343af31ab4e0457f743675#f196f5b6f164973d6b343af31ab4e0457f743675" dependencies = [ "base64 0.13.0", "ring", @@ -2105,7 +2088,6 @@ checksum = "3015a7d0a5fd5105c91c3710d42f9ccf0abfb287d62206484dcc67f9569a6483" [[package]] name = "state-res" version = "0.1.0" -source = "git+https://github.com/ruma/state-res?rev=34cd1cb4dcdd5fb84b5df9e48e63b2e4669a2488#34cd1cb4dcdd5fb84b5df9e48e63b2e4669a2488" dependencies = [ "itertools 0.10.0", "log", diff --git a/Cargo.toml b/Cargo.toml index 33f1d1ee..14762007 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,16 +18,15 @@ rocket = { git = "https://github.com/SergioBenitez/Rocket.git", rev = "93e62c86e #rocket = { git = "https://github.com/timokoesters/Rocket.git", branch = "empty_parameters", default-features = false, features = ["tls"] } # Used for matrix spec type definitions and helpers -ruma = { git = "https://github.com/ruma/ruma", features = ["rand", "appservice-api", "client-api", "federation-api", "push-gateway-api", "unstable-pre-spec", "unstable-synapse-quirks", "unstable-exhaustive-types"], rev = "f196f5b6f164973d6b343af31ab4e0457f743675" } +#ruma = { git = "https://github.com/ruma/ruma", features = ["rand", "appservice-api", "client-api", "federation-api", "push-gateway-api", "unstable-pre-spec", "unstable-synapse-quirks", "unstable-exhaustive-types"], rev = "f196f5b6f164973d6b343af31ab4e0457f743675" } #ruma = { git = "https://github.com/DevinR528/ruma", features = ["rand", "client-api", "federation-api", "push-gateway-api", "unstable-exhaustive-types", "unstable-pre-spec", "unstable-synapse-quirks"], branch = "verified-export" } -#ruma = { path = "../ruma/ruma", features = ["unstable-exhaustive-types", "rand", "client-api", "federation-api", "push-gateway-api", "unstable-pre-spec", "unstable-synapse-quirks"] } +ruma = { path = "../ruma/ruma", features = ["unstable-exhaustive-types", "rand", "client-api", "federation-api", "push-gateway-api", "unstable-pre-spec", "unstable-synapse-quirks"] } # Used when doing state resolution # state-res = { git = "https://github.com/timokoesters/state-res", branch = "timo-spec-comp", features = ["unstable-pre-spec"] } # TODO: remove the gen-eventid feature -#state-res = { git = "https://github.com/ruma/state-res", branch = "main", features = ["unstable-pre-spec", "gen-eventid"] } -state-res = { git = "https://github.com/ruma/state-res", rev = "34cd1cb4dcdd5fb84b5df9e48e63b2e4669a2488", features = ["unstable-pre-spec", "gen-eventid"] } -#state-res = { path = "../state-res", features = ["unstable-pre-spec", "gen-eventid"] } +#state-res = { git = "https://github.com/ruma/state-res", rev = "34cd1cb4dcdd5fb84b5df9e48e63b2e4669a2488", features = ["unstable-pre-spec", "gen-eventid"] } +state-res = { path = "../state-res", features = ["unstable-pre-spec", "gen-eventid"] } # Used for long polling and federation sender, should be the same as rocket::tokio tokio = "1.2.0" diff --git a/src/client_server/push.rs b/src/client_server/push.rs index a7ddbb60..9de8c164 100644 --- a/src/client_server/push.rs +++ b/src/client_server/push.rs @@ -689,7 +689,7 @@ pub async fn get_pushers_route( let sender_user = body.sender_user.as_ref().expect("user is authenticated"); Ok(get_pushers::Response { - pushers: db.pusher.get_pusher(sender_user)?, + pushers: db.pusher.get_pushers(sender_user)?, } .into()) } diff --git a/src/database/pusher.rs b/src/database/pusher.rs index cc421db3..b0b9e1e1 100644 --- a/src/database/pusher.rs +++ b/src/database/pusher.rs @@ -20,6 +20,7 @@ use ruma::{ push::{Action, PushCondition, PushFormat, Ruleset, Tweak}, uint, UInt, UserId, }; +use sled::IVec; use std::{convert::TryFrom, fmt::Debug}; @@ -58,7 +59,17 @@ impl PushData { Ok(()) } - pub fn get_pusher(&self, sender: &UserId) -> Result> { + pub fn get_pusher(&self, senderkey: &[u8]) -> Result> { + self.senderkey_pusher + .get(senderkey)? + .map(|push| { + Ok(serde_json::from_slice(&*push) + .map_err(|_| Error::bad_database("Invalid Pusher in db."))?) + }) + .transpose() + } + + pub fn get_pushers(&self, sender: &UserId) -> Result> { let mut prefix = sender.as_bytes().to_vec(); prefix.push(0xff); @@ -72,6 +83,16 @@ impl PushData { }) .collect() } + + pub fn get_pusher_senderkeys(&self, sender: &UserId) -> impl Iterator> { + let mut prefix = sender.as_bytes().to_vec(); + prefix.push(0xff); + + self.senderkey_pusher + .scan_prefix(prefix) + .keys() + .map(|r| Ok(r?)) + } } pub async fn send_request( @@ -155,7 +176,7 @@ where pub async fn send_push_notice( user: &UserId, unread: UInt, - pushers: &[Pusher], + pusher: &Pusher, ruleset: Ruleset, pdu: &PduEvent, db: &Database, @@ -194,7 +215,7 @@ pub async fn send_push_notice( _ => None, }) .collect::>(); - send_notice(unread, pushers, tweaks, pdu, db, rule.rule_id.as_str()).await?; + send_notice(unread, pusher, tweaks, pdu, db, rule.rule_id.as_str()).await?; break; } } @@ -214,8 +235,7 @@ pub async fn send_push_notice( _ => None, }) .collect::>(); - send_notice(unread, pushers, tweaks, pdu, db, rule.rule_id.as_str()) - .await?; + send_notice(unread, pusher, tweaks, pdu, db, rule.rule_id.as_str()).await?; break; } } @@ -246,7 +266,7 @@ pub async fn send_push_notice( _ => None, }) .collect::>(); - send_notice(unread, pushers, tweaks, pdu, db, rule.rule_id.as_str()) + send_notice(unread, pusher, tweaks, pdu, db, rule.rule_id.as_str()) .await?; break; } @@ -272,7 +292,7 @@ pub async fn send_push_notice( _ => None, }) .collect::>(); - send_notice(unread, pushers, tweaks, pdu, db, rule.rule_id.as_str()) + send_notice(unread, pusher, tweaks, pdu, db, rule.rule_id.as_str()) .await?; break; } @@ -289,7 +309,7 @@ pub async fn send_push_notice( _ => None, }) .collect::>(); - send_notice(unread, pushers, tweaks, pdu, db, rule.rule_id.as_str()).await?; + send_notice(unread, pusher, tweaks, pdu, db, rule.rule_id.as_str()).await?; break; } } @@ -326,7 +346,7 @@ pub async fn send_push_notice( _ => None, }) .collect::>(); - send_notice(unread, pushers, tweaks, pdu, db, rule.rule_id.as_str()) + send_notice(unread, pusher, tweaks, pdu, db, rule.rule_id.as_str()) .await?; break; } @@ -352,7 +372,7 @@ pub async fn send_push_notice( _ => None, }) .collect::>(); - send_notice(unread, pushers, tweaks, pdu, db, rule.rule_id.as_str()) + send_notice(unread, pusher, tweaks, pdu, db, rule.rule_id.as_str()) .await?; break; } @@ -369,7 +389,7 @@ pub async fn send_push_notice( _ => None, }) .collect::>(); - send_notice(unread, pushers, tweaks, pdu, db, rule.rule_id.as_str()).await?; + send_notice(unread, pusher, tweaks, pdu, db, rule.rule_id.as_str()).await?; break; } } @@ -385,7 +405,7 @@ pub async fn send_push_notice( _ => None, }) .collect::>(); - send_notice(unread, pushers, tweaks, pdu, db, rule.rule_id.as_str()).await?; + send_notice(unread, pusher, tweaks, pdu, db, rule.rule_id.as_str()).await?; break; } } @@ -401,7 +421,7 @@ pub async fn send_push_notice( _ => None, }) .collect::>(); - send_notice(unread, pushers, tweaks, pdu, db, rule.rule_id.as_str()).await?; + send_notice(unread, pusher, tweaks, pdu, db, rule.rule_id.as_str()).await?; break; } } @@ -415,7 +435,7 @@ pub async fn send_push_notice( _ => None, }) .collect::>(); - send_notice(unread, pushers, tweaks, pdu, db, rule.rule_id.as_str()).await?; + send_notice(unread, pusher, tweaks, pdu, db, rule.rule_id.as_str()).await?; break; } } @@ -429,7 +449,7 @@ pub async fn send_push_notice( _ => None, }) .collect::>(); - send_notice(unread, pushers, tweaks, pdu, db, rule.rule_id.as_str()).await?; + send_notice(unread, pusher, tweaks, pdu, db, rule.rule_id.as_str()).await?; break; } } @@ -442,98 +462,96 @@ pub async fn send_push_notice( async fn send_notice( unread: UInt, - pushers: &[Pusher], + pusher: &Pusher, tweaks: Vec, event: &PduEvent, db: &Database, name: &str, ) -> Result<()> { - let (http, _emails): (Vec<&Pusher>, _) = pushers - .iter() - .partition(|pusher| pusher.kind == Some(PusherKind::Http)); + // TODO: email + if pusher.kind == Some(PusherKind::Http) { + return Ok(()); + } // TODO: // Two problems with this // 1. if "event_id_only" is the only format kind it seems we should never add more info // 2. can pusher/devices have conflicting formats - for pusher in http { - let event_id_only = pusher.data.format == Some(PushFormat::EventIdOnly); - let url = if let Some(url) = pusher.data.url.as_ref() { - url - } else { - error!("Http Pusher must have URL specified."); - continue; - }; + let event_id_only = pusher.data.format == Some(PushFormat::EventIdOnly); + let url = if let Some(url) = pusher.data.url.as_ref() { + url + } else { + error!("Http Pusher must have URL specified."); + return Ok(()); + }; - let mut device = Device::new(pusher.app_id.clone(), pusher.pushkey.clone()); - let mut data_minus_url = pusher.data.clone(); - // The url must be stripped off according to spec - data_minus_url.url = None; - device.data = Some(data_minus_url); + let mut device = Device::new(pusher.app_id.clone(), pusher.pushkey.clone()); + let mut data_minus_url = pusher.data.clone(); + // The url must be stripped off according to spec + data_minus_url.url = None; + device.data = Some(data_minus_url); - // Tweaks are only added if the format is NOT event_id_only - if !event_id_only { - device.tweaks = tweaks.clone(); + // Tweaks are only added if the format is NOT event_id_only + if !event_id_only { + device.tweaks = tweaks.clone(); + } + + let d = &[device]; + let mut notifi = Notification::new(d); + + notifi.prio = NotificationPriority::Low; + notifi.event_id = Some(&event.event_id); + notifi.room_id = Some(&event.room_id); + // TODO: missed calls + notifi.counts = NotificationCounts::new(unread, uint!(0)); + + if event.kind == EventType::RoomEncrypted + || tweaks + .iter() + .any(|t| matches!(t, Tweak::Highlight(true) | Tweak::Sound(_))) + { + notifi.prio = NotificationPriority::High + } + + if event_id_only { + error!("SEND PUSH NOTICE `{}`", name); + send_request( + &db.globals, + &url, + send_event_notification::v1::Request::new(notifi), + ) + .await?; + } else { + notifi.sender = Some(&event.sender); + notifi.event_type = Some(&event.kind); + notifi.content = serde_json::value::to_raw_value(&event.content).ok(); + + if event.kind == EventType::RoomMember { + notifi.user_is_target = event.state_key.as_deref() == Some(event.sender.as_str()); } - let d = &[device]; - let mut notifi = Notification::new(d); + let user_name = db.users.displayname(&event.sender)?; + notifi.sender_display_name = user_name.as_deref(); + let room_name = db + .rooms + .room_state_get(&event.room_id, &EventType::RoomName, "")? + .map(|pdu| match pdu.content.get("name") { + Some(serde_json::Value::String(s)) => Some(s.to_string()), + _ => None, + }) + .flatten(); + notifi.room_name = room_name.as_deref(); - notifi.prio = NotificationPriority::Low; - notifi.event_id = Some(&event.event_id); - notifi.room_id = Some(&event.room_id); - // TODO: missed calls - notifi.counts = NotificationCounts::new(unread, uint!(0)); - - if event.kind == EventType::RoomEncrypted - || tweaks - .iter() - .any(|t| matches!(t, Tweak::Highlight(true) | Tweak::Sound(_))) - { - notifi.prio = NotificationPriority::High - } - - if event_id_only { - error!("SEND PUSH NOTICE `{}`", name); - send_request( - &db.globals, - &url, - send_event_notification::v1::Request::new(notifi), - ) - .await?; - } else { - notifi.sender = Some(&event.sender); - notifi.event_type = Some(&event.kind); - notifi.content = serde_json::value::to_raw_value(&event.content).ok(); - - if event.kind == EventType::RoomMember { - notifi.user_is_target = event.state_key.as_deref() == Some(event.sender.as_str()); - } - - let user_name = db.users.displayname(&event.sender)?; - notifi.sender_display_name = user_name.as_deref(); - let room_name = db - .rooms - .room_state_get(&event.room_id, &EventType::RoomName, "")? - .map(|pdu| match pdu.content.get("name") { - Some(serde_json::Value::String(s)) => Some(s.to_string()), - _ => None, - }) - .flatten(); - notifi.room_name = room_name.as_deref(); - - error!("SEND PUSH NOTICE Full `{}`", name); - send_request( - &db.globals, - &url, - send_event_notification::v1::Request::new(notifi), - ) - .await?; - } + error!("SEND PUSH NOTICE Full `{}`", name); + send_request( + &db.globals, + &url, + send_event_notification::v1::Request::new(notifi), + ) + .await?; } // TODO: email - // for email in emails {} Ok(()) } diff --git a/src/database/rooms.rs b/src/database/rooms.rs index d494d334..2e2d4866 100644 --- a/src/database/rooms.rs +++ b/src/database/rooms.rs @@ -84,7 +84,6 @@ pub struct Rooms { impl Rooms { /// Builds a StateMap by iterating over all keys that start /// with state_hash, this gives the full state for the given state_hash. - #[tracing::instrument(skip(self))] pub fn state_full_ids(&self, shortstatehash: u64) -> Result> { Ok(self .stateid_shorteventid @@ -107,7 +106,6 @@ impl Rooms { .collect()) } - #[tracing::instrument(skip(self))] pub fn state_full( &self, room_id: &RoomId, @@ -628,7 +626,25 @@ impl Rooms { .insert(pdu.event_id.as_bytes(), &*pdu_id)?; // See if the event matches any known pushers - db.sending.send_push_pdu(&*pdu_id)?; + for user in db + .users + .iter() + .filter_map(|r| r.ok()) + .filter(|user_id| db.rooms.is_joined(&user_id, &pdu.room_id).unwrap_or(false)) + { + // Don't notify the user of their own events + if user == pdu.sender { + continue; + } + + for senderkey in db + .pusher + .get_pusher_senderkeys(&user) + .filter_map(|r| r.ok()) + { + db.sending.send_push_pdu(&*pdu_id, senderkey)?; + } + } match pdu.kind { EventType::RoomRedaction => { diff --git a/src/database/sending.rs b/src/database/sending.rs index 50bbc8bb..9b74ed70 100644 --- a/src/database/sending.rs +++ b/src/database/sending.rs @@ -1,6 +1,6 @@ use std::{ collections::HashMap, - convert::TryFrom, + convert::{TryFrom, TryInto}, fmt::Debug, sync::Arc, time::{Duration, Instant, SystemTime}, @@ -14,9 +14,9 @@ use log::{error, info, warn}; use ring::digest; use rocket::futures::stream::{FuturesUnordered, StreamExt}; use ruma::{ - api::{appservice, federation, OutgoingRequest}, + api::{appservice, client::r0::push::Pusher, federation, OutgoingRequest}, events::{push_rules, EventType}, - uint, ServerName, UInt, + uint, ServerName, UInt, UserId, }; use sled::IVec; use tokio::{select, sync::Semaphore}; @@ -24,14 +24,14 @@ use tokio::{select, sync::Semaphore}; #[derive(Clone, Debug, PartialEq, Eq, Hash)] pub enum OutgoingKind { Appservice(Box), - Push(Vec), + Push(Vec, Vec), // user and pushkey Normal(Box), } #[derive(Clone)] pub struct Sending { /// The state for a given state hash. - pub(super) servernamepduids: sled::Tree, // ServernamePduId = (+ / $)ServerName / UserId + PduId + pub(super) servernamepduids: sled::Tree, // ServernamePduId = (+ / $)SenderKey / ServerName / UserId + PduId pub(super) servercurrentpdus: sled::Tree, // ServerCurrentPdus = (+ / $)ServerName / UserId + PduId (pduid can be empty for reservation) pub(super) maximum_requests: Arc, } @@ -85,9 +85,11 @@ impl Sending { p.extend_from_slice(server.as_bytes()); p } - OutgoingKind::Push(id) => { + OutgoingKind::Push(user, pushkey) => { let mut p = b"$".to_vec(); - p.extend_from_slice(&id); + p.extend_from_slice(&user); + p.push(0xff); + p.extend_from_slice(&pushkey); p } OutgoingKind::Normal(server) => { @@ -106,6 +108,7 @@ impl Sending { let mut subscriber = servernamepduids.watch_prefix(b""); loop { + println!("."); select! { Some(response) = futures.next() => { match response { @@ -116,9 +119,11 @@ impl Sending { p.extend_from_slice(server.as_bytes()); p } - OutgoingKind::Push(id) => { + OutgoingKind::Push(user, pushkey) => { let mut p = b"$".to_vec(); - p.extend_from_slice(&id); + p.extend_from_slice(&user); + p.push(0xff); + p.extend_from_slice(&pushkey); p }, OutgoingKind::Normal(server) => { @@ -179,9 +184,11 @@ impl Sending { p.extend_from_slice(serv.as_bytes()); p }, - OutgoingKind::Push(id) => { + OutgoingKind::Push(user, pushkey) => { let mut p = b"$".to_vec(); - p.extend_from_slice(&id); + p.extend_from_slice(&user); + p.push(0xff); + p.extend_from_slice(&pushkey); p }, OutgoingKind::Normal(serv) => { @@ -208,7 +215,6 @@ impl Sending { Some(event) = &mut subscriber => { if let sled::Event::Insert { key, .. } = event { let servernamepduid = key.clone(); - let mut parts = servernamepduid.splitn(2, |&b| b == 0xff); let exponential_backoff = |(tries, instant): &(u32, Instant)| { // Fail if a request has failed recently (exponential backoff) @@ -219,33 +225,8 @@ impl Sending { instant.elapsed() < min_elapsed_duration }; - if let Some((outgoing_kind, pdu_id)) = utils::string_from_bytes( - parts - .next() - .expect("splitn will always return 1 or more elements"), - ) - .map_err(|_| Error::bad_database("[Utf8] ServerName in servernamepduid bytes are invalid.")) - .and_then(|ident_str| { - // Appservices start with a plus - Ok(if ident_str.starts_with('+') { - OutgoingKind::Appservice( - Box::::try_from(&ident_str[1..]) - .map_err(|_| Error::bad_database("ServerName in servernamepduid is invalid."))? - ) - } else if ident_str.starts_with('$') { - OutgoingKind::Push(ident_str[1..].as_bytes().to_vec()) - } else { - OutgoingKind::Normal( - Box::::try_from(ident_str) - .map_err(|_| Error::bad_database("ServerName in servernamepduid is invalid."))? - ) - }) - }) - .and_then(|outgoing_kind| parts - .next() - .ok_or_else(|| Error::bad_database("Invalid servernamepduid in db.")) - .map(|pdu_id| (outgoing_kind, pdu_id)) - ) + + if let Some((outgoing_kind, pdu_id)) = Self::parse_servercurrentpdus(&servernamepduid) .ok() .filter(|(outgoing_kind, _)| { if last_failed_try.get(outgoing_kind).map_or(false, exponential_backoff) { @@ -258,9 +239,11 @@ impl Sending { p.extend_from_slice(serv.as_bytes()); p }, - OutgoingKind::Push(id) => { + OutgoingKind::Push(user, pushkey) => { let mut p = b"$".to_vec(); - p.extend_from_slice(&id); + p.extend_from_slice(&user); + p.push(0xff); + p.extend_from_slice(&pushkey); p }, OutgoingKind::Normal(serv) => { @@ -279,6 +262,8 @@ impl Sending { servercurrentpdus.insert(&key, &[]).unwrap(); servernamepduids.remove(&key).unwrap(); + dbg!("there is a future"); + futures.push( Self::handle_event( outgoing_kind, @@ -295,15 +280,9 @@ impl Sending { } #[tracing::instrument(skip(self))] - pub fn send_push_pdu(&self, pdu_id: &[u8]) -> Result<()> { - // Make sure we don't cause utf8 errors when parsing to a String... - let pduid = String::from_utf8_lossy(pdu_id).as_bytes().to_vec(); - - // these are valid ServerName chars - // (byte.is_ascii_alphanumeric() || byte == b'-' || byte == b'.') + pub fn send_push_pdu(&self, pdu_id: &[u8], senderkey: IVec) -> Result<()> { let mut key = b"$".to_vec(); - // keep each pdu push unique - key.extend_from_slice(pduid.as_slice()); + key.extend_from_slice(&senderkey); key.push(0xff); key.extend_from_slice(pdu_id); self.servernamepduids.insert(key, b"")?; @@ -313,6 +292,7 @@ impl Sending { #[tracing::instrument(skip(self))] pub fn send_pdu(&self, server: &ServerName, pdu_id: &[u8]) -> Result<()> { + dbg!(&server); let mut key = server.as_bytes().to_vec(); key.push(0xff); key.extend_from_slice(pdu_id); @@ -369,6 +349,8 @@ impl Sending { .filter_map(|r| r.ok()) .collect::>(); let permit = db.sending.maximum_requests.acquire().await; + + error!("sending pdus to {}: {:#?}", server, pdu_jsons); let response = appservice_server::send_request( &db.globals, db.appservice @@ -391,17 +373,17 @@ impl Sending { response } - OutgoingKind::Push(id) => { + OutgoingKind::Push(user, pushkey) => { let pdus = pdu_ids .iter() .map(|pdu_id| { Ok::<_, (Vec, Error)>( db.rooms .get_pdu_from_id(pdu_id) - .map_err(|e| (id.clone(), e))? + .map_err(|e| (pushkey.clone(), e))? .ok_or_else(|| { ( - id.clone(), + pushkey.clone(), Error::bad_database( "[Push] Event in servernamepduids not found in db.", ), @@ -418,66 +400,80 @@ impl Sending { continue; } - for user in db.users.iter().filter_map(|r| r.ok()).filter(|user_id| { - db.rooms.is_joined(&user_id, &pdu.room_id).unwrap_or(false) - }) { - // Don't notify the user of their own events - if user == pdu.sender { - continue; - } - - let pushers = db - .pusher - .get_pusher(&user) - .map_err(|e| (OutgoingKind::Push(id.clone()), e))?; - - let rules_for_user = db - .account_data - .get::(None, &user, EventType::PushRules) - .map_err(|e| (OutgoingKind::Push(id.clone()), e))? - .map(|ev| ev.content.global) - .unwrap_or_else(|| crate::push_rules::default_pushrules(&user)); - - let unread: UInt = if let Some(last_read) = db - .rooms - .edus - .private_read_get(&pdu.room_id, &user) - .map_err(|e| (OutgoingKind::Push(id.clone()), e))? - { - (db.rooms - .pdus_since(&user, &pdu.room_id, last_read) - .map_err(|e| (OutgoingKind::Push(id.clone()), e))? - .filter_map(|pdu| pdu.ok()) // Filter out buggy events - .filter(|(_, pdu)| { - matches!( - pdu.kind.clone(), - EventType::RoomMessage | EventType::RoomEncrypted - ) - }) - .count() as u32) - .into() - } else { - // Just return zero unread messages - uint!(0) - }; - - let permit = db.sending.maximum_requests.acquire().await; - let _response = pusher::send_push_notice( - &user, - unread, - &pushers, - rules_for_user, - &pdu, - db, + let userid = UserId::try_from(utils::string_from_bytes(user).map_err(|e| { + ( + OutgoingKind::Push(user.clone(), pushkey.clone()), + Error::bad_database("Invalid push user string in db."), ) - .await - .map(|_response| kind.clone()) - .map_err(|e| (kind.clone(), e)); + })?) + .map_err(|e| { + ( + OutgoingKind::Push(user.clone(), pushkey.clone()), + Error::bad_database("Invalid push user id in db."), + ) + })?; - drop(permit); - } + let mut senderkey = user.clone(); + senderkey.push(0xff); + senderkey.extend_from_slice(pushkey); + + let pusher = match db + .pusher + .get_pusher(&senderkey) + .map_err(|e| (OutgoingKind::Push(user.clone(), pushkey.clone()), e))? + { + Some(pusher) => pusher, + None => continue, + }; + + let rules_for_user = db + .account_data + .get::(None, &userid, EventType::PushRules) + .map_err(|e| (OutgoingKind::Push(user.clone(), pushkey.clone()), e))? + .map(|ev| ev.content.global) + .unwrap_or_else(|| crate::push_rules::default_pushrules(&userid)); + + let unread: UInt = if let Some(last_read) = db + .rooms + .edus + .private_read_get(&pdu.room_id, &userid) + .map_err(|e| (OutgoingKind::Push(user.clone(), pushkey.clone()), e))? + { + (db.rooms + .pdus_since(&userid, &pdu.room_id, last_read) + .map_err(|e| (OutgoingKind::Push(user.clone(), pushkey.clone()), e))? + .filter_map(|pdu| pdu.ok()) // Filter out buggy events + .filter(|(_, pdu)| { + matches!( + pdu.kind.clone(), + EventType::RoomMessage | EventType::RoomEncrypted + ) + }) + .count() as u32) + .into() + } else { + // Just return zero unread messages + uint!(0) + }; + + let permit = db.sending.maximum_requests.acquire().await; + + error!("sending pdu to {}: {:#?}", userid, pdu); + let _response = pusher::send_push_notice( + &userid, + unread, + &pusher, + rules_for_user, + &pdu, + db, + ) + .await + .map(|_response| kind.clone()) + .map_err(|e| (kind.clone(), e)); + + drop(permit); } - Ok(OutgoingKind::Push(id.clone())) + Ok(OutgoingKind::Push(user.clone(), pushkey.clone())) } OutgoingKind::Normal(server) => { let pdu_jsons = pdu_ids @@ -540,30 +536,49 @@ impl Sending { } fn parse_servercurrentpdus(key: &IVec) -> Result<(OutgoingKind, IVec)> { - let mut parts = key.splitn(2, |&b| b == 0xff); - let server = parts.next().expect("splitn always returns one element"); - let pdu = parts - .next() - .ok_or_else(|| Error::bad_database("Invalid bytes in servercurrentpdus."))?; - - let server = utils::string_from_bytes(&server).map_err(|_| { - Error::bad_database("Invalid server bytes in server_currenttransaction") - })?; - // Appservices start with a plus - Ok::<_, Error>(if server.starts_with('+') { + Ok::<_, Error>(if key.starts_with(b"+") { + let mut parts = key[1..].splitn(2, |&b| b == 0xff); + + let server = parts.next().expect("splitn always returns one element"); + let pdu = parts + .next() + .ok_or_else(|| Error::bad_database("Invalid bytes in servercurrentpdus."))?; + let server = utils::string_from_bytes(&server).map_err(|_| { + Error::bad_database("Invalid server bytes in server_currenttransaction") + })?; + ( OutgoingKind::Appservice(Box::::try_from(server).map_err(|_| { Error::bad_database("Invalid server string in server_currenttransaction") })?), IVec::from(pdu), ) - } else if server.starts_with('$') { + } else if key.starts_with(b"$") { + let mut parts = key[1..].splitn(3, |&b| b == 0xff); + + let user = parts.next().expect("splitn always returns one element"); + let pushkey = parts + .next() + .ok_or_else(|| Error::bad_database("Invalid bytes in servercurrentpdus."))?; + let pdu = parts + .next() + .ok_or_else(|| Error::bad_database("Invalid bytes in servercurrentpdus."))?; ( - OutgoingKind::Push(server.as_bytes().to_vec()), + OutgoingKind::Push(user.to_vec(), pushkey.to_vec()), IVec::from(pdu), ) } else { + let mut parts = key.splitn(2, |&b| b == 0xff); + + let server = parts.next().expect("splitn always returns one element"); + let pdu = parts + .next() + .ok_or_else(|| Error::bad_database("Invalid bytes in servercurrentpdus."))?; + let server = utils::string_from_bytes(&server).map_err(|_| { + Error::bad_database("Invalid server bytes in server_currenttransaction") + })?; + ( OutgoingKind::Normal(Box::::try_from(server).map_err(|_| { Error::bad_database("Invalid server string in server_currenttransaction") diff --git a/src/server_server.rs b/src/server_server.rs index d43588ab..82c5f82b 100644 --- a/src/server_server.rs +++ b/src/server_server.rs @@ -21,9 +21,10 @@ use ruma::{ }, directory::{IncomingFilter, IncomingRoomNetwork}, events::EventType, + identifiers::{KeyId, KeyName}, serde::to_canonical_value, signatures::{CanonicalJsonObject, CanonicalJsonValue, PublicKeyMap}, - EventId, RoomId, RoomVersionId, ServerName, ServerSigningKeyId, UserId, + EventId, RoomId, RoomVersionId, ServerName, ServerSigningKeyId, SigningKeyAlgorithm, UserId, }; use state_res::{Event, EventMap, StateMap}; use std::{ @@ -600,7 +601,7 @@ pub async fn send_transaction_message_route<'a>( // discard the event whereas the Client Server API's /send/{eventType} endpoint // would return a M_BAD_JSON error. 'main_pdu_loop: for (event_id, _room_id, value) in pdus_to_resolve { - debug!("Working on incoming pdu: {:?}", value); + info!("Working on incoming pdu: {:?}", value); let server_name = &body.body.origin; let mut pub_key_map = BTreeMap::new(); @@ -639,7 +640,7 @@ pub async fn send_transaction_message_route<'a>( // 6. persist the event as an outlier. db.rooms.add_pdu_outlier(&pdu)?; - debug!("Added pdu as outlier."); + info!("Added pdu as outlier."); // Step 9. fetch missing state by calling /state_ids at backwards extremities doing all // the checks in this list starting at 1. These are not timeline events. @@ -914,7 +915,7 @@ pub async fn send_transaction_message_route<'a>( // We use the `state_at_event` instead of `state_after` so we accurately // represent the state for this event. append_incoming_pdu(&db, &pdu, &extremities, &state_at_event)?; - debug!("Appended incoming pdu."); + info!("Appended incoming pdu."); // Set the new room state to the resolved state update_resolved_state( @@ -961,21 +962,31 @@ fn validate_event<'a>( auth_cache: &'a mut EventMap>, ) -> AsyncRecursiveResult<'a, (Arc, Option>)> { Box::pin(async move { - for signature_server in match value + for (signature_server, signature) in match value .get("signatures") .ok_or_else(|| "No signatures in server response pdu.".to_string())? { CanonicalJsonValue::Object(map) => map, _ => return Err("Invalid signatures object in server response pdu.".to_string()), - } - .keys() - { + } { + let signature_object = match signature { + CanonicalJsonValue::Object(map) => map, + _ => { + return Err( + "Invalid signatures content object in server response pdu.".to_string() + ) + } + }; + + let signature_ids = signature_object.keys().collect::>(); + debug!("Fetching signing keys for {}", signature_server); let keys = match fetch_signing_keys( &db, &Box::::try_from(&**signature_server).map_err(|_| { "Invalid servername in signatures of server response pdu.".to_string() })?, + signature_ids, ) .await { @@ -987,26 +998,29 @@ fn validate_event<'a>( } }; - pub_key_map.insert(signature_server.clone(), keys); + pub_key_map.insert(dbg!(signature_server.clone()), dbg!(keys)); } - let mut val = - match ruma::signatures::verify_event(pub_key_map, &value, &RoomVersionId::Version5) { - Ok(ver) => { - if let ruma::signatures::Verified::Signatures = ver { - match ruma::signatures::redact(&value, &RoomVersionId::Version6) { - Ok(obj) => obj, - Err(_) => return Err("Redaction failed".to_string()), - } - } else { - value + let mut val = match ruma::signatures::verify_event( + dbg!(&pub_key_map), + &value, + &RoomVersionId::Version5, + ) { + Ok(ver) => { + if let ruma::signatures::Verified::Signatures = ver { + match ruma::signatures::redact(&value, &RoomVersionId::Version6) { + Ok(obj) => obj, + Err(_) => return Err("Redaction failed".to_string()), } + } else { + value } - Err(_e) => { - error!("{}", _e); - return Err("Signature verification failed".to_string()); - } - }; + } + Err(_e) => { + error!("{}", _e); + return Err("Signature verification failed".to_string()); + } + }; // Now that we have checked the signature and hashes we can add the eventID and convert // to our PduEvent type also finally verifying the first step listed above @@ -1116,7 +1130,7 @@ pub(crate) async fn fetch_events( Arc::new(pdu) } None => { - debug!("Fetching event over federation"); + debug!("Fetching event over federation: {:?}", id); match db .sending .send_federation_request( @@ -1159,78 +1173,93 @@ pub(crate) async fn fetch_events( pub(crate) async fn fetch_signing_keys( db: &Database, origin: &ServerName, + signature_ids: Vec<&String>, ) -> Result> { - let mut result = BTreeMap::new(); + let contains_all_ids = |keys: &BTreeMap| { + signature_ids + .iter() + .all(|&id| dbg!(dbg!(&keys).contains_key(dbg!(id)))) + }; - match db.globals.signing_keys_for(origin)? { - keys if !keys.is_empty() => Ok(keys - .into_iter() - .map(|(k, v)| (k.to_string(), v.key)) - .collect()), - _ => { - match db - .sending - .send_federation_request(&db.globals, origin, get_server_keys::v2::Request::new()) - .await - { - Ok(keys) => { - db.globals.add_signing_key(origin, &keys.server_key)?; + let mut result = db + .globals + .signing_keys_for(origin)? + .into_iter() + .map(|(k, v)| (k.to_string(), v.key)) + .collect::>(); - result.extend( - keys.server_key - .verify_keys - .into_iter() - .map(|(k, v)| (k.to_string(), v.key)), - ); - result.extend( - keys.server_key - .old_verify_keys - .into_iter() - .map(|(k, v)| (k.to_string(), v.key)), - ); - return Ok(result); - } - _ => { - for server in db.globals.trusted_servers() { - debug!("Asking {} for {}'s signing key", server, origin); - if let Ok(keys) = db - .sending - .send_federation_request( - &db.globals, - &server, - get_remote_server_keys::v2::Request::new( - origin, - SystemTime::now() - .checked_add(Duration::from_secs(3600)) - .expect("SystemTime to large"), - ), - ) - .await - { - debug!("Got signing keys: {:?}", keys); - for k in keys.server_keys.into_iter() { - db.globals.add_signing_key(origin, &k)?; - result.extend( - k.verify_keys - .into_iter() - .map(|(k, v)| (k.to_string(), v.key)), - ); - result.extend( - k.old_verify_keys - .into_iter() - .map(|(k, v)| (k.to_string(), v.key)), - ); - } - return Ok(result); - } - } - Err(Error::BadServerResponse( - "Failed to find public key for server", - )) - } + if contains_all_ids(&result) { + return Ok(result); + } + + if let Ok(get_keys_response) = db + .sending + .send_federation_request(&db.globals, origin, get_server_keys::v2::Request::new()) + .await + { + db.globals + .add_signing_key(origin, &get_keys_response.server_key)?; + + result.extend( + get_keys_response + .server_key + .verify_keys + .into_iter() + .map(|(k, v)| (k.to_string(), v.key)), + ); + result.extend( + get_keys_response + .server_key + .old_verify_keys + .into_iter() + .map(|(k, v)| (k.to_string(), v.key)), + ); + + if contains_all_ids(&result) { + return Ok(result); + } + } + + for server in db.globals.trusted_servers() { + debug!("Asking {} for {}'s signing key", server, origin); + if let Ok(keys) = db + .sending + .send_federation_request( + &db.globals, + &server, + get_remote_server_keys::v2::Request::new( + origin, + SystemTime::now() + .checked_add(Duration::from_secs(3600)) + .expect("SystemTime to large"), + ), + ) + .await + { + debug!("Got signing keys: {:?}", keys); + for k in keys.server_keys.into_iter() { + db.globals.add_signing_key(origin, &k)?; + result.extend( + k.verify_keys + .into_iter() + .map(|(k, v)| (k.to_string(), v.key)), + ); + result.extend( + k.old_verify_keys + .into_iter() + .map(|(k, v)| (k.to_string(), v.key)), + ); + } + + if contains_all_ids(&result) { + return Ok(result); } } } + + Err(Error::BadServerResponse( + "Failed to find public key for server", + )) } /// Gather all state snapshots needed to resolve the current state of the room. @@ -1244,7 +1273,7 @@ pub(crate) async fn calculate_forward_extremities( db: &Database, pdu: &PduEvent, ) -> Result> { - let mut current_leaves = db.rooms.get_pdu_leaves(pdu.room_id())?; + let mut current_leaves = dbg!(db.rooms.get_pdu_leaves(pdu.room_id())?); let mut is_incoming_leaf = true; // Make sure the incoming event is not already a forward extremity @@ -1290,7 +1319,6 @@ pub(crate) async fn calculate_forward_extremities( /// /// This guarantees that the incoming event will be in the state sets (at least our servers /// and the sending server). -#[tracing::instrument(skip(db))] pub(crate) async fn build_forward_extremity_snapshots( db: &Database, pdu: Arc, @@ -1316,7 +1344,7 @@ pub(crate) async fn build_forward_extremity_snapshots( Some(leave_pdu) => { let pdu_shortstatehash = db .rooms - .pdu_shortstatehash(&leave_pdu.event_id)? + .pdu_shortstatehash(dbg!(&leave_pdu.event_id))? .ok_or_else(|| Error::bad_database("Found pdu with no statehash in db."))?; if current_shortstatehash.as_ref() == Some(&pdu_shortstatehash) { @@ -1367,7 +1395,9 @@ pub(crate) fn update_resolved_state( new_state.insert( ( ev_type, - state_k.ok_or_else(|| Error::Conflict("State contained non state event"))?, + state_k.ok_or_else(|| { + Error::Conflict("update_resolved_state: State contained non state event") + })?, ), pdu.event_id.clone(), ); @@ -1395,9 +1425,9 @@ pub(crate) fn append_incoming_pdu( new_state.insert( ( ev_type.clone(), - state_k - .clone() - .ok_or_else(|| Error::Conflict("State contained non state event"))?, + state_k.clone().ok_or_else(|| { + Error::Conflict("append_incoming_pdu: State contained non state event") + })?, ), state_pdu.event_id.clone(), );