0
0
Fork 0
mirror of https://github.com/matrix-construct/construct synced 2024-11-29 18:22:50 +01:00
construct/matrix/acquire.cc
2020-11-27 16:58:20 -08:00

568 lines
11 KiB
C++

// The Construct
//
// Copyright (C) The Construct Developers, Authors & Contributors
// Copyright (C) 2016-2020 Jason Volk <jason@zemos.net>
//
// Permission to use, copy, modify, and/or distribute this software for any
// purpose with or without fee is hereby granted, provided that the above
// copyright notice and this permission notice is present in all copies. The
// full license for this software is available in the LICENSE file.
decltype(ircd::m::acquire::log)
ircd::m::acquire::log
{
"m.acquire"
};
template<>
decltype(ircd::util::instance_list<ircd::m::acquire>::allocator)
ircd::util::instance_list<ircd::m::acquire>::allocator
{};
template<>
decltype(ircd::util::instance_list<ircd::m::acquire>::list)
ircd::util::instance_list<ircd::m::acquire>::list
{
allocator
};
//
// execute::execute
//
ircd::m::acquire::acquire::acquire(const struct opts &opts)
:opts{opts}
,head_vmopts{opts.vmopts}
,history_vmopts{opts.vmopts}
{
if(opts.head)
{
head_vmopts.notify_servers = false;
head_vmopts.phase.set(m::vm::phase::NOTIFY, false);
head_vmopts.phase.set(m::vm::phase::FETCH_PREV, false);
head_vmopts.phase.set(m::vm::phase::FETCH_STATE, false);
}
if(opts.state)
{
state_vmopts.notify_servers = false;
state_vmopts.phase.set(m::vm::phase::FETCH_PREV, false);
state_vmopts.phase.set(m::vm::phase::FETCH_STATE, false);
state_vmopts.wopts.appendix.set(dbs::appendix::ROOM_HEAD, false);
}
if(opts.history)
{
history_vmopts.notify_servers = false;
history_vmopts.phase.set(m::vm::phase::NOTIFY, false);
history_vmopts.phase.set(m::vm::phase::FETCH_PREV, false);
history_vmopts.phase.set(m::vm::phase::FETCH_STATE, false);
history_vmopts.wopts.appendix.set(dbs::appendix::ROOM_HEAD, false);
}
// Branch to acquire head
if(opts.head)
if(!opts.depth.second)
acquire_head();
// Branch to acquire history
if(opts.history)
acquire_history();
// Branch to acquire state
if(opts.state)
acquire_state();
// Complete all work before returning, otherwise everything
// will be cancelled on unwind.
while(!fetching.empty())
while(handle());
}
ircd::m::acquire::~acquire()
noexcept
{
}
void
ircd::m::acquire::acquire_history()
{
event::idx ref_min
{
opts.ref.first
};
for(size_t i(0); i < opts.rounds; ++i)
{
if(!fetch_history(ref_min))
break;
if(ref_min > opts.ref.second)
break;
}
}
bool
ircd::m::acquire::fetch_history(event::idx &ref_min)
{
const auto top
{
m::top(opts.room.room_id)
};
const auto &[top_id, top_depth, top_idx]
{
top
};
auto depth_range
{
opts.depth
};
if(!depth_range.first && opts.viewport_size)
depth_range =
{
m::viewport(opts.room).first, depth_range.second
};
if(!depth_range.second)
depth_range.second = top_depth;
if(size_t(depth_range.second - depth_range.first) < opts.viewport_size)
depth_range.first -= std::min(long(opts.viewport_size), depth_range.first);
m::room::events::missing missing
{
opts.room
};
bool ret(false);
event::idx ref_top(ref_min);
missing.for_each(depth_range, [this, &top, &ref_min, &ref_top, &ret]
(const event::id &event_id, const int64_t &ref_depth, const event::idx &ref_idx)
{
if(ctx::interruption_requested())
return false;
if(ref_idx < opts.ref.first || ref_idx < ref_min)
return true;
if(ref_idx > opts.ref.second)
return true;
// Branch if we have to measure the viewportion
if(opts.viewport_size)
{
const m::event::idx_range idx_range
{
std::min(ref_idx, std::get<event::idx>(top)),
std::max(ref_idx, std::get<event::idx>(top)),
};
// Bail if this event sits above the viewport.
if(m::room::events::count(opts.room, idx_range) > opts.viewport_size)
return true;
}
const auto ref_id
{
m::event_id(ref_idx)
};
const m::room ref_room
{
opts.room.room_id, ref_id
};
const auto &[sound_depth, sound_idx]
{
m::sounding(ref_room)
};
const auto &[twain_depth, _twain_idx]
{
sound_idx == ref_idx?
m::twain(ref_room):
std::make_pair(0L, 0UL)
};
const auto gap
{
sound_depth >= twain_depth?
size_t(sound_depth - twain_depth):
0UL
};
// Ignore if this ref borders on a gap which does not satisfy the options
if(gap < opts.gap.first || gap > opts.gap.second)
return true;
// The depth on each side of a gap is used as a poor heuristic to
// guesstimate how many events might be missing and how much to
// request from a remote at once. Due to protocol limitations, this
// can err in both directions:
// - It lowballs in situations like #ping:maunium.net where the DAG
// is wide, causing more rounds of requests to fill a gap.
// - It's overzealous in cases of secondary/distant references that
// have nothing to do with a gap preceding the ref.
//
// Fortunately in practice the majority of estimates are close enough.
// XXX /get_missing_events should be considered if there's low
// confidence in a gap estimate.
const auto &limit
{
std::clamp(gap, 1UL, 48UL)
};
const bool submitted
{
submit(event_id, opts.hint, false, limit, &history_vmopts)
};
if(submitted)
log::debug
{
log, "Fetch %s miss prev of %s @%lu in %s @%lu sound:%lu twain:%ld fetching:%zu",
string_view{event_id},
string_view{ref_id},
ref_depth,
string_view{ref_room.room_id},
std::get<int64_t>(top),
sound_depth,
twain_depth,
fetching.size(),
};
ref_top = std::max(ref_top, ref_idx);
ret |= submitted;
return true;
});
assert(ref_top >= ref_min);
ref_min = ref_top;
return ret;
}
void
ircd::m::acquire::acquire_state()
{
m::event::id::buf event_id;
if(opts.room.event_id)
event_id = opts.room.event_id;
if(!event_id && opts.viewport_size)
event_id = m::event_id(std::nothrow, m::viewport(opts.room).second); //TODO: opts.viewport_size
if(!event_id && opts.history)
event_id = m::event_id(std::nothrow, m::sounding(opts.room).second);
if(!event_id && opts.head)
event_id = m::head(opts.room);
if(!event_id)
return;
m::room::state::fetch::opts sfopts;
sfopts.room.room_id = opts.room.room_id;
sfopts.room.event_id = event_id;
m::room::state::fetch
{
sfopts, [this](const m::event::id &event_id, const string_view &remote)
{
return fetch_state(event_id, remote);
}
};
}
bool
ircd::m::acquire::fetch_state(const m::event::id &event_id,
const string_view &remote)
{
// Bail if interrupted
if(ctx::interruption_requested())
return false;
const auto hostpart
{
event_id.host()
};
const auto hint
{
hostpart? hostpart: remote
};
const bool submitted
{
submit(event_id, hint, false, 1, &state_vmopts)
};
if(submitted)
log::debug
{
log, "Fetch %s in state of %s fetching:%zu",
string_view{event_id},
string_view{opts.room.room_id},
fetching.size(),
};
return true;
}
void
ircd::m::acquire::acquire_head()
{
m::room::head::fetch::opts hfopts;
hfopts.room_id = opts.room.room_id;
hfopts.top = m::top(opts.room.room_id);
m::room::head::fetch
{
hfopts, [this, &hfopts](const m::event &result)
{
const auto &[top_id, top_depth, top_idx]
{
hfopts.top
};
return fetch_head(result, top_depth);
}
};
}
bool
ircd::m::acquire::fetch_head(const m::event &result,
const int64_t &top_depth)
{
// Bail if interrupted
if(ctx::interruption_requested())
return false;
// Bail if the depth is below the window
if(json::get<"depth"_>(result) < opts.depth.first)
return false;
const auto gap
{
json::get<"depth"_>(result) - top_depth
};
const auto &limit
{
std::clamp(gap, 1L, 48L)
};
const auto &hint
{
json::get<"origin"_>(result)
};
const bool submitted
{
submit(result.event_id, hint, true, limit, &head_vmopts)
};
if(submitted)
log::debug
{
log, "Fetch %s head from '%s' in %s @%lu fetching:%zu",
string_view{result.event_id},
hint,
string_view{opts.room.room_id},
top_depth,
fetching.size(),
};
return true;
}
bool
ircd::m::acquire::submit(const m::event::id &event_id,
const string_view &hint,
const bool &hint_only,
const size_t &limit,
const vm::opts *const &vmopts)
{
const bool ret
{
!started(event_id)?
start(event_id, hint, hint_only, limit, vmopts):
false
};
if(ret || full())
while(handle());
return ret;
}
bool
ircd::m::acquire::start(const m::event::id &event_id,
const string_view &hint,
const bool &hint_only,
const size_t &limit,
const vm::opts *const &vmopts)
try
{
assert(vmopts);
fetch::opts fopts;
fopts.room_id = opts.room.room_id;
fopts.event_id = event_id;
fopts.backfill_limit = limit;
fopts.op =
limit > 1?
fetch::op::backfill:
fetch::op::event;
fopts.hint = hint;
fopts.attempt_limit =
!hint_only?
opts.attempt_max:
1U;
fetching.emplace_back(result
{
vmopts, fetch::start(fopts), event_id
});
return true;
}
catch(const ctx::interrupted &e)
{
throw;
}
catch(const std::exception &e)
{
log::error
{
log, "Fetch %s in %s from '%s' :%s",
string_view{event_id},
string_view{opts.room.room_id},
hint?: "<any>"_sv,
e.what(),
};
return false;
}
bool
ircd::m::acquire::started(const event::id &event_id)
const
{
const auto it
{
std::find_if(std::begin(fetching), std::end(fetching), [&event_id]
(const auto &result)
{
return result.event_id == event_id;
})
};
return it != std::end(fetching);
}
bool
ircd::m::acquire::handle()
{
if(fetching.empty())
return false;
auto next
{
ctx::when_any(std::begin(fetching), std::end(fetching), []
(auto &it) -> ctx::future<m::fetch::result> &
{
return it->future;
})
};
const milliseconds timeout
{
full()? 5000: 50
};
ctx::interruption_point();
if(!next.wait(timeout, std::nothrow))
return full();
const unique_iterator it
{
fetching, next.get()
};
assert(it.it != std::end(fetching));
return handle(*it.it);
}
bool
ircd::m::acquire::handle(result &result)
try
{
auto response
{
result.future.get()
};
const json::object body
{
response
};
const json::array pdus
{
body["pdus"]
};
log::debug
{
log, "Eval %zu from '%s' for %s in %s",
pdus.size(),
string_view{response.origin},
string_view{result.event_id},
string_view{opts.room.room_id},
};
assert(result.vmopts);
assert
(
false
|| result.vmopts == &this->head_vmopts
|| result.vmopts == &this->history_vmopts
|| result.vmopts == &this->state_vmopts
);
auto vmopts(*result.vmopts);
vmopts.node_id = response.origin;
m::vm::eval
{
pdus, vmopts
};
return true;
}
catch(const ctx::interrupted &e)
{
throw;
}
catch(const std::exception &e)
{
log::error
{
log, "Eval %s in %s :%s",
string_view{result.event_id},
string_view{opts.room.room_id},
e.what(),
};
return true;
}
bool
ircd::m::acquire::full()
const noexcept
{
return fetching.size() >= opts.fetch_width;
}