mirror of
https://gitlab.com/famedly/conduit.git
synced 2025-01-01 04:04:26 +01:00
improvement: limit prev event fetching
This commit is contained in:
parent
b09499c2df
commit
4956fb9fba
2 changed files with 32 additions and 6 deletions
|
@ -636,7 +636,7 @@ impl Database {
|
||||||
|
|
||||||
if db.globals.database_version()? < 9 {
|
if db.globals.database_version()? < 9 {
|
||||||
// Update tokenids db layout
|
// Update tokenids db layout
|
||||||
let mut batch = db.rooms.tokenids.iter().filter_map(|(key, _)| {
|
let batch = db.rooms.tokenids.iter().filter_map(|(key, _)| {
|
||||||
if !key.starts_with(b"!") {
|
if !key.starts_with(b"!") {
|
||||||
return None;
|
return None;
|
||||||
}
|
}
|
||||||
|
@ -659,14 +659,29 @@ impl Database {
|
||||||
println!("old {:?}", key);
|
println!("old {:?}", key);
|
||||||
println!("new {:?}", new_key);
|
println!("new {:?}", new_key);
|
||||||
Some((new_key, Vec::new()))
|
Some((new_key, Vec::new()))
|
||||||
});
|
}).collect::<Vec<_>>();
|
||||||
|
|
||||||
db.rooms.tokenids.insert_batch(&mut batch)?;
|
let mut iter = batch.into_iter().peekable();
|
||||||
|
|
||||||
for (key, _) in db.rooms.tokenids.iter() {
|
while iter.peek().is_some() {
|
||||||
|
db.rooms.tokenids.insert_batch(&mut iter.by_ref().take(1000))?;
|
||||||
|
println!("smaller batch done");
|
||||||
|
}
|
||||||
|
|
||||||
|
println!("Deleting starts");
|
||||||
|
|
||||||
|
let batch2 = db.rooms.tokenids.iter().filter_map(|(key, _)| {
|
||||||
if key.starts_with(b"!") {
|
if key.starts_with(b"!") {
|
||||||
db.rooms.tokenids.remove(&key)?;
|
println!("del {:?}", key);
|
||||||
|
Some(key)
|
||||||
|
} else {
|
||||||
|
None
|
||||||
}
|
}
|
||||||
|
}).collect::<Vec<_>>();
|
||||||
|
|
||||||
|
for key in batch2 {
|
||||||
|
println!("del");
|
||||||
|
db.rooms.tokenids.remove(&key)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
db.globals.bump_database_version(9)?;
|
db.globals.bump_database_version(9)?;
|
||||||
|
|
|
@ -254,7 +254,7 @@ where
|
||||||
}); // TODO: handle timeout
|
}); // TODO: handle timeout
|
||||||
|
|
||||||
if status != 200 {
|
if status != 200 {
|
||||||
info!(
|
warn!(
|
||||||
"{} {}: {}",
|
"{} {}: {}",
|
||||||
url,
|
url,
|
||||||
status,
|
status,
|
||||||
|
@ -893,6 +893,9 @@ pub async fn handle_incoming_pdu<'a>(
|
||||||
let mut graph = HashMap::new();
|
let mut graph = HashMap::new();
|
||||||
let mut eventid_info = HashMap::new();
|
let mut eventid_info = HashMap::new();
|
||||||
let mut todo_outlier_stack = incoming_pdu.prev_events.clone();
|
let mut todo_outlier_stack = incoming_pdu.prev_events.clone();
|
||||||
|
|
||||||
|
let mut amount = 0;
|
||||||
|
|
||||||
while let Some(prev_event_id) = todo_outlier_stack.pop() {
|
while let Some(prev_event_id) = todo_outlier_stack.pop() {
|
||||||
if let Some((pdu, json_opt)) = fetch_and_handle_outliers(
|
if let Some((pdu, json_opt)) = fetch_and_handle_outliers(
|
||||||
db,
|
db,
|
||||||
|
@ -905,6 +908,13 @@ pub async fn handle_incoming_pdu<'a>(
|
||||||
.await
|
.await
|
||||||
.pop()
|
.pop()
|
||||||
{
|
{
|
||||||
|
if amount > 100 {
|
||||||
|
// Max limit reached
|
||||||
|
warn!("Max prev event limit reached!");
|
||||||
|
graph.insert(prev_event_id.clone(), HashSet::new());
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
if let Some(json) =
|
if let Some(json) =
|
||||||
json_opt.or_else(|| db.rooms.get_outlier_pdu_json(&prev_event_id).ok().flatten())
|
json_opt.or_else(|| db.rooms.get_outlier_pdu_json(&prev_event_id).ok().flatten())
|
||||||
{
|
{
|
||||||
|
@ -915,6 +925,7 @@ pub async fn handle_incoming_pdu<'a>(
|
||||||
.expect("Room exists")
|
.expect("Room exists")
|
||||||
.origin_server_ts
|
.origin_server_ts
|
||||||
{
|
{
|
||||||
|
amount += 1;
|
||||||
for prev_prev in &pdu.prev_events {
|
for prev_prev in &pdu.prev_events {
|
||||||
if !graph.contains_key(prev_prev) {
|
if !graph.contains_key(prev_prev) {
|
||||||
todo_outlier_stack.push(dbg!(prev_prev.clone()));
|
todo_outlier_stack.push(dbg!(prev_prev.clone()));
|
||||||
|
|
Loading…
Reference in a new issue