mirror of
https://github.com/matrix-construct/construct
synced 2024-06-08 21:18:57 +02:00
ircd:Ⓜ️:acquire: Use class linkage; tweak option interface; add instance_list.
This commit is contained in:
parent
7d091f8d55
commit
1c0a9f3020
|
@ -11,17 +11,42 @@
|
|||
#pragma once
|
||||
#define HAVE_IRCD_M_ACQUIRE_H
|
||||
|
||||
namespace ircd::m::acquire
|
||||
namespace ircd::m
|
||||
{
|
||||
struct acquire;
|
||||
}
|
||||
|
||||
struct ircd::m::acquire
|
||||
:instance_list<ircd::m::acquire>
|
||||
{
|
||||
struct opts;
|
||||
struct execute;
|
||||
struct result;
|
||||
|
||||
extern log::log log;
|
||||
};
|
||||
static log::log log;
|
||||
|
||||
struct ircd::m::acquire::execute
|
||||
{
|
||||
execute(const opts &);
|
||||
const struct opts &opts;
|
||||
std::list<result> fetching;
|
||||
|
||||
private:
|
||||
bool full() const noexcept;
|
||||
bool handle(result &);
|
||||
bool handle();
|
||||
|
||||
bool started(const event::id &) const;
|
||||
bool start(const event::id &, const string_view &, const bool &, const size_t &);
|
||||
bool submit(const event::id &, const string_view &, const bool &, const size_t &);
|
||||
|
||||
bool fetch_missing(event::idx &);
|
||||
void acquire_missing();
|
||||
|
||||
bool fetch_head(const m::event &, const int64_t &);
|
||||
void acquire_head();
|
||||
|
||||
public:
|
||||
acquire(const struct opts &);
|
||||
acquire(const acquire &) = delete;
|
||||
acquire &operator=(const acquire &) = delete;
|
||||
~acquire() noexcept;
|
||||
};
|
||||
|
||||
struct ircd::m::acquire::opts
|
||||
|
@ -43,11 +68,18 @@ struct ircd::m::acquire::opts
|
|||
/// Perform missing acquisition.
|
||||
bool missing {true};
|
||||
|
||||
/// Provide a viewport size; generally obtained from the eponymous conf
|
||||
/// item and used for initial backfill
|
||||
size_t viewport_size {0};
|
||||
|
||||
/// Depthwise window of acquisition; concentrate on specific depth window
|
||||
pair<int64_t> depth {0, 0};
|
||||
|
||||
/// Provide a viewport size; conf item
|
||||
size_t viewport_size {0};
|
||||
/// Won't fetch missing of ref outside this range.
|
||||
pair<event::idx> ref {0, -1UL};
|
||||
|
||||
/// Avoids filling gaps with a depth sounding outside of the range
|
||||
pair<size_t> gap {0, -1UL};
|
||||
|
||||
/// The number of rounds the algorithm runs for.
|
||||
size_t rounds {-1UL};
|
||||
|
@ -57,16 +89,10 @@ struct ircd::m::acquire::opts
|
|||
|
||||
/// Limit the number of requests in flight at any given time.
|
||||
size_t fetch_width {128};
|
||||
|
||||
/// Avoids filling gaps with a depth sounding lte this value
|
||||
size_t gap_min {0};
|
||||
|
||||
/// Avoids filling gaps with a depth sounding greater than this value
|
||||
size_t gap_max {-1UL};
|
||||
|
||||
/// Won't fetch missing unless ref gt (newer) than this idx.
|
||||
event::idx ref_min {0};
|
||||
|
||||
/// Won't fetch missing unless ref lt (older) than this idx.
|
||||
event::idx ref_max {-1UL};
|
||||
};
|
||||
|
||||
struct ircd::m::acquire::result
|
||||
{
|
||||
ctx::future<fetch::result> future;
|
||||
event::id::buf event_id;
|
||||
};
|
||||
|
|
|
@ -8,74 +8,71 @@
|
|||
// copyright notice and this permission notice is present in all copies. The
|
||||
// full license for this software is available in the LICENSE file.
|
||||
|
||||
namespace ircd::m::acquire
|
||||
{
|
||||
struct result;
|
||||
using list = std::list<result>;
|
||||
|
||||
static bool start(const opts &, list &);
|
||||
static bool handle(const opts &, ctx::future<m::fetch::result> &);
|
||||
static bool handle(const opts &, list &);
|
||||
static void fetch_head(const opts &, list &);
|
||||
static event::idx fetch_missing(const opts &, list &);
|
||||
static void submit(const opts &, list &);
|
||||
};
|
||||
|
||||
struct ircd::m::acquire::result
|
||||
{
|
||||
ctx::future<m::fetch::result> future;
|
||||
event::id::buf event_id;
|
||||
|
||||
result(ctx::future<m::fetch::result> &&future, const event::id &event_id)
|
||||
:future{std::move(future)}
|
||||
,event_id{event_id}
|
||||
{}
|
||||
};
|
||||
|
||||
decltype(ircd::m::acquire::log)
|
||||
ircd::m::acquire::log
|
||||
{
|
||||
"m.acquire"
|
||||
};
|
||||
|
||||
template<>
|
||||
decltype(ircd::util::instance_list<ircd::m::acquire>::allocator)
|
||||
ircd::util::instance_list<ircd::m::acquire>::allocator
|
||||
{};
|
||||
|
||||
template<>
|
||||
decltype(ircd::util::instance_list<ircd::m::acquire>::list)
|
||||
ircd::util::instance_list<ircd::m::acquire>::list
|
||||
{
|
||||
allocator
|
||||
};
|
||||
|
||||
//
|
||||
// execute::execute
|
||||
//
|
||||
|
||||
ircd::m::acquire::execute::execute(const opts &opts_)
|
||||
ircd::m::acquire::acquire::acquire(const struct opts &opts)
|
||||
:opts{opts}
|
||||
{
|
||||
auto opts(opts_);
|
||||
list fetching;
|
||||
|
||||
// Branch to acquire head
|
||||
if(opts.head)
|
||||
fetch_head(opts, fetching);
|
||||
acquire_head();
|
||||
|
||||
// Branch to acquire missing
|
||||
if(opts.missing)
|
||||
for(size_t i(0); i < opts.rounds; ++i)
|
||||
{
|
||||
event::idx last;
|
||||
if(!(last = fetch_missing(opts, fetching)))
|
||||
break;
|
||||
|
||||
// After each round, set the floor for the next round.
|
||||
opts.ref_min = opts_.ref_min?: last + 1;
|
||||
if(opts.ref_min > opts.ref_max)
|
||||
break;
|
||||
}
|
||||
acquire_missing();
|
||||
|
||||
// Complete all work before returning, otherwise everything
|
||||
// will be cancelled on unwind.
|
||||
while(!fetching.empty())
|
||||
while(handle(opts, fetching));
|
||||
while(handle());
|
||||
}
|
||||
|
||||
ircd::m::event::idx
|
||||
ircd::m::acquire::fetch_missing(const opts &opts_,
|
||||
list &fetching)
|
||||
ircd::m::acquire::~acquire()
|
||||
noexcept
|
||||
{
|
||||
}
|
||||
|
||||
void
|
||||
ircd::m::acquire::acquire_missing()
|
||||
{
|
||||
event::idx ref_min
|
||||
{
|
||||
opts.ref.first
|
||||
};
|
||||
|
||||
for(size_t i(0); i < opts.rounds; ++i)
|
||||
{
|
||||
if(!fetch_missing(ref_min))
|
||||
break;
|
||||
|
||||
if(ref_min > opts.ref.second)
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
bool
|
||||
ircd::m::acquire::fetch_missing(event::idx &ref_min)
|
||||
{
|
||||
auto opts(opts_);
|
||||
const auto top
|
||||
{
|
||||
m::top(opts.room.room_id)
|
||||
|
@ -86,40 +83,53 @@ ircd::m::acquire::fetch_missing(const opts &opts_,
|
|||
top
|
||||
};
|
||||
|
||||
// When the viewport will be counted we seek to a depth near the room head
|
||||
// by default unless the caller forced another depth floor.
|
||||
if(!opts.depth.first && opts.viewport_size)
|
||||
opts.depth.first = top_depth - std::clamp(long(opts.viewport_size), 1024L, top_depth);
|
||||
auto depth_range
|
||||
{
|
||||
opts.depth
|
||||
};
|
||||
|
||||
if(!depth_range.first && opts.viewport_size)
|
||||
depth_range =
|
||||
{
|
||||
m::viewport(opts.room).first, depth_range.second
|
||||
};
|
||||
|
||||
if(!depth_range.second)
|
||||
depth_range.second = top_depth;
|
||||
|
||||
if(size_t(depth_range.second - depth_range.first) < opts.viewport_size)
|
||||
depth_range.first -= std::min(long(opts.viewport_size), depth_range.first);
|
||||
|
||||
m::room::events::missing missing
|
||||
{
|
||||
opts.room
|
||||
};
|
||||
|
||||
event::idx last(0);
|
||||
missing.for_each(opts.depth, [&opts, &fetching, &top, &last]
|
||||
bool ret(false);
|
||||
event::idx ref_top(ref_min);
|
||||
missing.for_each(depth_range, [this, &top, &ref_min, &ref_top, &ret]
|
||||
(const event::id &event_id, const int64_t &ref_depth, const event::idx &ref_idx)
|
||||
{
|
||||
if(ctx::interruption_requested())
|
||||
return false;
|
||||
|
||||
if(ref_idx < opts.ref_min)
|
||||
if(ref_idx < opts.ref.first || ref_idx < ref_min)
|
||||
return true;
|
||||
|
||||
if(ref_idx > opts.ref_max)
|
||||
if(ref_idx > opts.ref.second)
|
||||
return true;
|
||||
|
||||
// Branch if we have to measure the viewportion
|
||||
if(opts.viewport_size)
|
||||
{
|
||||
const m::event::idx_range range
|
||||
const m::event::idx_range idx_range
|
||||
{
|
||||
std::min(ref_idx, std::get<event::idx>(top)),
|
||||
std::max(ref_idx, std::get<event::idx>(top)),
|
||||
};
|
||||
|
||||
// Bail if this event sits above the viewport.
|
||||
if(m::room::events::count(opts.room, range) > opts.viewport_size)
|
||||
if(m::room::events::count(opts.room, idx_range) > opts.viewport_size)
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -152,124 +162,202 @@ ircd::m::acquire::fetch_missing(const opts &opts_,
|
|||
0UL
|
||||
};
|
||||
|
||||
if(gap < opts.gap_min || gap > opts.gap_max)
|
||||
// Ignore if this ref borders on a gap which does not satisfy the options
|
||||
if(gap < opts.gap.first || gap > opts.gap.second)
|
||||
return true;
|
||||
|
||||
auto _opts(opts);
|
||||
_opts.room.event_id = event_id;
|
||||
_opts.hint = opts.hint;
|
||||
_opts.viewport_size = std::clamp(gap, 1UL, 48UL);
|
||||
submit(_opts, fetching);
|
||||
|
||||
log::debug
|
||||
// The depth on each side of a gap is used as a poor heuristic to
|
||||
// guesstimate how many events might be missing and how much to
|
||||
// request from a remote at once. Due to protocol limitations, this
|
||||
// can err in both directions:
|
||||
// - It lowballs in situations like #ping:maunium.net where the DAG
|
||||
// is wide, causing more rounds of requests to fill a gap.
|
||||
// - It's overzealous in cases of secondary/distant references that
|
||||
// have nothing to do with a gap preceding the ref.
|
||||
//
|
||||
// Fortunately in practice the majority of estimates are close enough.
|
||||
// XXX /get_missing_events should be considered if there's low
|
||||
// confidence in a gap estimate.
|
||||
const auto &limit
|
||||
{
|
||||
log, "Fetch %s miss prev of %s @%lu in %s @%lu sound:%lu twain:%ld fetching:%zu",
|
||||
string_view{event_id},
|
||||
string_view{ref_id},
|
||||
ref_depth,
|
||||
string_view{ref_room.room_id},
|
||||
std::get<int64_t>(top),
|
||||
sound_depth,
|
||||
twain_depth,
|
||||
fetching.size(),
|
||||
std::clamp(gap, 1UL, 48UL)
|
||||
};
|
||||
|
||||
last = std::max(last, ref_idx);
|
||||
const bool submitted
|
||||
{
|
||||
submit(event_id, opts.hint, false, limit)
|
||||
};
|
||||
|
||||
if(submitted)
|
||||
log::debug
|
||||
{
|
||||
log, "Fetch %s miss prev of %s @%lu in %s @%lu sound:%lu twain:%ld fetching:%zu",
|
||||
string_view{event_id},
|
||||
string_view{ref_id},
|
||||
ref_depth,
|
||||
string_view{ref_room.room_id},
|
||||
std::get<int64_t>(top),
|
||||
sound_depth,
|
||||
twain_depth,
|
||||
fetching.size(),
|
||||
};
|
||||
|
||||
ref_top = std::max(ref_top, ref_idx);
|
||||
ret |= submitted;
|
||||
return true;
|
||||
});
|
||||
|
||||
return last;
|
||||
assert(ref_top >= ref_min);
|
||||
ref_min = ref_top;
|
||||
return ret;
|
||||
}
|
||||
|
||||
void
|
||||
ircd::m::acquire::fetch_head(const opts &opts,
|
||||
list &fetching)
|
||||
ircd::m::acquire::acquire_head()
|
||||
{
|
||||
const auto top
|
||||
{
|
||||
m::top(opts.room.room_id)
|
||||
};
|
||||
|
||||
const auto handle_head{[&opts, &fetching, &top]
|
||||
(const m::event &result)
|
||||
{
|
||||
// Bail if interrupted
|
||||
if(ctx::interruption_requested())
|
||||
return false;
|
||||
|
||||
// Bail if the depth is below the window
|
||||
if(json::get<"depth"_>(result) < opts.depth.first)
|
||||
return false;
|
||||
|
||||
const auto &[top_id, top_depth, top_idx] {top};
|
||||
const auto gap
|
||||
{
|
||||
json::get<"depth"_>(result) - top_depth
|
||||
};
|
||||
|
||||
auto _opts(opts);
|
||||
_opts.room.event_id = result.event_id;
|
||||
_opts.hint = json::get<"origin"_>(result);
|
||||
_opts.hint_only = true;
|
||||
_opts.viewport_size = std::clamp(gap, 1L, 48L);
|
||||
submit(_opts, fetching);
|
||||
return true;
|
||||
}};
|
||||
|
||||
m::room::head::fetch::opts hfopts;
|
||||
hfopts.room_id = opts.room.room_id;
|
||||
hfopts.top = top;
|
||||
hfopts.top = m::top(opts.room.room_id);
|
||||
m::room::head::fetch
|
||||
{
|
||||
hfopts, handle_head
|
||||
};
|
||||
}
|
||||
hfopts, [this, &hfopts](const m::event &result)
|
||||
{
|
||||
// Bail if interrupted
|
||||
if(ctx::interruption_requested())
|
||||
return false;
|
||||
|
||||
void
|
||||
ircd::m::acquire::submit(const opts &opts,
|
||||
list &fetching)
|
||||
{
|
||||
if(start(opts, fetching))
|
||||
while(handle(opts, fetching));
|
||||
const auto &[top_id, top_depth, top_idx]
|
||||
{
|
||||
hfopts.top
|
||||
};
|
||||
|
||||
return fetch_head(result, top_depth);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
bool
|
||||
ircd::m::acquire::start(const opts &opts,
|
||||
list &fetching)
|
||||
ircd::m::acquire::fetch_head(const m::event &result,
|
||||
const int64_t &top_depth)
|
||||
{
|
||||
const auto match{[&opts]
|
||||
(const auto &result)
|
||||
{
|
||||
return opts.room.event_id == result.event_id;
|
||||
}};
|
||||
|
||||
// Check for duplicate.
|
||||
if(std::find_if(begin(fetching), end(fetching), match) != end(fetching))
|
||||
// Bail if the depth is below the window
|
||||
if(json::get<"depth"_>(result) < opts.depth.first)
|
||||
return false;
|
||||
|
||||
fetch::opts fopts;
|
||||
fopts.op = fetch::op::backfill;
|
||||
fopts.room_id = opts.room.room_id;
|
||||
fopts.event_id = opts.room.event_id;
|
||||
fopts.backfill_limit = opts.viewport_size;
|
||||
fopts.hint = opts.hint;
|
||||
fopts.attempt_limit = opts.hint_only;
|
||||
fetching.emplace_back(fetch::start(fopts), opts.room.event_id);
|
||||
const auto gap
|
||||
{
|
||||
json::get<"depth"_>(result) - top_depth
|
||||
};
|
||||
|
||||
const auto &limit
|
||||
{
|
||||
std::clamp(gap, 1L, 48L)
|
||||
};
|
||||
|
||||
const auto &hint
|
||||
{
|
||||
json::get<"origin"_>(result)
|
||||
};
|
||||
|
||||
const bool submitted
|
||||
{
|
||||
submit(result.event_id, hint, true, limit)
|
||||
};
|
||||
|
||||
if(submitted)
|
||||
log::debug
|
||||
{
|
||||
log, "Fetch %s head from '%s' in %s @%lu fetching:%zu",
|
||||
string_view{result.event_id},
|
||||
hint,
|
||||
string_view{opts.room.room_id},
|
||||
top_depth,
|
||||
fetching.size(),
|
||||
};
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool
|
||||
ircd::m::acquire::handle(const opts &opts,
|
||||
list &fetching)
|
||||
ircd::m::acquire::submit(const m::event::id &event_id,
|
||||
const string_view &hint,
|
||||
const bool &hint_only,
|
||||
const size_t &limit)
|
||||
{
|
||||
const bool ret
|
||||
{
|
||||
!started(event_id)?
|
||||
start(event_id, hint, hint_only, limit):
|
||||
false
|
||||
};
|
||||
|
||||
if(ret || full())
|
||||
while(handle());
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
bool
|
||||
ircd::m::acquire::start(const m::event::id &event_id,
|
||||
const string_view &hint,
|
||||
const bool &hint_only,
|
||||
const size_t &limit)
|
||||
try
|
||||
{
|
||||
fetch::opts fopts;
|
||||
fopts.op = fetch::op::backfill;
|
||||
fopts.room_id = opts.room.room_id;
|
||||
fopts.event_id = event_id;
|
||||
fopts.backfill_limit = limit;
|
||||
fopts.hint = hint;
|
||||
fopts.attempt_limit = hint_only;
|
||||
fetching.emplace_back(result
|
||||
{
|
||||
fetch::start(fopts), event_id
|
||||
});
|
||||
|
||||
return true;
|
||||
}
|
||||
catch(const ctx::interrupted &e)
|
||||
{
|
||||
throw;
|
||||
}
|
||||
catch(const std::exception &e)
|
||||
{
|
||||
log::error
|
||||
{
|
||||
log, "Fetch %s in %s from '%s' :%s",
|
||||
string_view{event_id},
|
||||
string_view{opts.room.room_id},
|
||||
hint?: "<any>"_sv,
|
||||
e.what(),
|
||||
};
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
bool
|
||||
ircd::m::acquire::started(const event::id &event_id)
|
||||
const
|
||||
{
|
||||
const auto it
|
||||
{
|
||||
std::find_if(std::begin(fetching), std::end(fetching), [&event_id]
|
||||
(const auto &result)
|
||||
{
|
||||
return result.event_id == event_id;
|
||||
})
|
||||
};
|
||||
|
||||
return it != std::end(fetching);
|
||||
}
|
||||
|
||||
bool
|
||||
ircd::m::acquire::handle()
|
||||
{
|
||||
if(fetching.empty())
|
||||
return false;
|
||||
|
||||
const bool full
|
||||
{
|
||||
fetching.size() >= opts.fetch_width
|
||||
};
|
||||
|
||||
auto next
|
||||
{
|
||||
ctx::when_any(std::begin(fetching), std::end(fetching), []
|
||||
|
@ -281,11 +369,11 @@ ircd::m::acquire::handle(const opts &opts,
|
|||
|
||||
const milliseconds timeout
|
||||
{
|
||||
full? 5000: 50
|
||||
full()? 5000: 50
|
||||
};
|
||||
|
||||
if(!next.wait(timeout, std::nothrow))
|
||||
return full;
|
||||
return full();
|
||||
|
||||
const unique_iterator it
|
||||
{
|
||||
|
@ -293,37 +381,33 @@ ircd::m::acquire::handle(const opts &opts,
|
|||
};
|
||||
|
||||
assert(it.it != std::end(fetching));
|
||||
|
||||
auto _opts(opts);
|
||||
_opts.room.event_id = it.it->event_id;
|
||||
return handle(_opts, it.it->future);
|
||||
return handle(*it.it);
|
||||
}
|
||||
|
||||
bool
|
||||
ircd::m::acquire::handle(const opts &opts,
|
||||
ctx::future<m::fetch::result> &future)
|
||||
ircd::m::acquire::handle(result &result)
|
||||
try
|
||||
{
|
||||
auto result
|
||||
auto response
|
||||
{
|
||||
future.get()
|
||||
result.future.get()
|
||||
};
|
||||
|
||||
const json::object response
|
||||
const json::object body
|
||||
{
|
||||
result
|
||||
response
|
||||
};
|
||||
|
||||
const json::array pdus
|
||||
{
|
||||
response["pdus"]
|
||||
body["pdus"]
|
||||
};
|
||||
|
||||
log::debug
|
||||
{
|
||||
log, "Eval %zu for %s in %s",
|
||||
pdus.size(),
|
||||
string_view{opts.room.event_id},
|
||||
string_view{result.event_id},
|
||||
string_view{opts.room.room_id},
|
||||
};
|
||||
|
||||
|
@ -352,10 +436,17 @@ catch(const std::exception &e)
|
|||
log::error
|
||||
{
|
||||
log, "Eval %s in %s :%s",
|
||||
string_view{opts.room.event_id},
|
||||
string_view{result.event_id},
|
||||
string_view{opts.room.room_id},
|
||||
e.what(),
|
||||
};
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool
|
||||
ircd::m::acquire::full()
|
||||
const noexcept
|
||||
{
|
||||
return fetching.size() >= opts.fetch_width;
|
||||
}
|
||||
|
|
|
@ -303,11 +303,11 @@ catch(const std::exception &e)
|
|||
void
|
||||
ircd::m::init::backfill::handle_room(const room::id &room_id)
|
||||
{
|
||||
m::acquire::opts opts;
|
||||
struct m::acquire::opts opts;
|
||||
opts.room = room_id;
|
||||
opts.viewport_size = ssize_t(m::room::events::viewport_size);
|
||||
opts.viewport_size *= size_t(viewports);
|
||||
m::acquire::execute
|
||||
m::acquire
|
||||
{
|
||||
opts
|
||||
};
|
||||
|
|
|
@ -11051,15 +11051,15 @@ console_cmd__room__acquire(opt &out, const string_view &line)
|
|||
room_id
|
||||
};
|
||||
|
||||
m::acquire::opts opts;
|
||||
struct m::acquire::opts opts;
|
||||
opts.room = room_id;
|
||||
opts.depth.first = depth_start;
|
||||
opts.depth.second = depth_stop;
|
||||
opts.viewport_size = viewport_size;
|
||||
opts.rounds = rounds;
|
||||
opts.head = depth_stop == 0;
|
||||
opts.gap_min = gap_min;
|
||||
m::acquire::execute
|
||||
opts.gap.first = gap_min;
|
||||
m::acquire
|
||||
{
|
||||
opts
|
||||
};
|
||||
|
|
Loading…
Reference in a new issue