0
0
Fork 0
mirror of https://github.com/matrix-construct/construct synced 2024-12-26 15:33:54 +01:00

ircd::fs::aio: Simplify various conditions; add various comments; cleanup.

This commit is contained in:
Jason Volk 2019-01-13 14:02:03 -08:00
parent 4871928bfa
commit ea4c3447c6
2 changed files with 77 additions and 29 deletions

View file

@ -71,7 +71,11 @@ ircd::fs::aio::init::init()
if(!bool(aio::enable)) if(!bool(aio::enable))
return; return;
system = new struct system; system = new struct aio::system
(
size_t(max_events),
size_t(max_submit)
);
} }
ircd::fs::aio::init::~init() ircd::fs::aio::init::~init()
@ -308,8 +312,7 @@ ircd::fs::aio::request::operator()()
// Wait here until there's room to submit a request // Wait here until there's room to submit a request
system->dock.wait([] system->dock.wait([]
{ {
const size_t count(system->qcount + system->in_flight); return system->request_avail() > 0;
return count < size_t(max_events);
}); });
// Submit to system // Submit to system
@ -368,30 +371,31 @@ ircd::fs::aio::system::eventfd_flags
// system::system // system::system
// //
ircd::fs::aio::system::system() ircd::fs::aio::system::system(const size_t &max_events,
const size_t &max_submit)
try try
:event :event
{ {
size_t(max_events) max_events
} }
,queue ,queue
{ {
size_t(max_submit) max_submit
} }
,resfd ,resfd
{ {
ios::get(), int(syscall(::eventfd, ecount, eventfd_flags)) ios::get(), int(syscall(::eventfd, ecount, eventfd_flags))
} }
{ {
syscall<SYS_io_setup>(size_t(max_events), &idp); syscall<SYS_io_setup>(this->max_events(), &idp);
set_handle(); set_handle();
log::debug log::debug
{ {
"Established AIO(%p) context (fd:%d max_events:%zu max_submit:%zu)", "Established AIO(%p) context (fd:%d max_events:%zu max_submit:%zu)",
this, this,
int(resfd.native_handle()), int(resfd.native_handle()),
size_t(max_events), this->max_events(),
size_t(max_submit) this->max_submit(),
}; };
} }
catch(const std::exception &e) catch(const std::exception &e)
@ -539,24 +543,22 @@ ircd::fs::aio::system::submit(request &request)
{ {
assert(request.opts); assert(request.opts);
assert(qcount < queue.size()); assert(qcount < queue.size());
assert(qcount + in_flight < max_events); assert(qcount + in_flight < max_events());
assert(request.aio_data == uintptr_t(&request)); assert(request.aio_data == uintptr_t(&request));
const ctx::critical_assertion ca; const ctx::critical_assertion ca;
queue.at(qcount++) = static_cast<iocb *>(&request); queue.at(qcount++) = static_cast<iocb *>(&request);
stats.cur_queued++; stats.cur_queued++;
// Determine whether the user wants (or needs) to submit without delay. // Determine whether this request will trigger a flush of the queue
// and be submitted itself as well.
const bool submit_now const bool submit_now
{ {
// The nodelay flag is set // The nodelay flag is set by the user.
request.opts->nodelay request.opts->nodelay
// The queue has reached the configured size // The queue has reached its limits.
|| qcount >= size_t(max_submit) || qcount >= max_submit()
// The queue has reached its maximum size
|| qcount >= queue.size()
}; };
const size_t submitted const size_t submitted
@ -564,13 +566,16 @@ ircd::fs::aio::system::submit(request &request)
submit_now? submit() : 0 submit_now? submit() : 0
}; };
// Only post the chaser when the queue has one item. If it has more
// items the chaser was already posted after the first item and will
// flush the whole queue down to 0.
if(qcount == 1) if(qcount == 1)
ircd::post(std::bind(&system::chase, this)); ircd::post(std::bind(&system::chase, this));
} }
/// The chaser is posted to the IRCd event loop after the first /// The chaser is posted to the IRCd event loop after the first request is
/// request is queued. Ideally more requests will queue up before /// Ideally more requests will queue up before the chaser reaches the front
/// the chaser is executed. /// of the IRCd event queue and executes.
void void
ircd::fs::aio::system::chase() ircd::fs::aio::system::chase()
noexcept try noexcept try
@ -600,7 +605,7 @@ try
{ {
assert(qcount > 0); assert(qcount > 0);
assert(in_flight + qcount < MAX_EVENTS); assert(in_flight + qcount < MAX_EVENTS);
assert(in_flight + qcount <= size_t(max_events)); assert(in_flight + qcount <= max_events());
const auto submitted const auto submitted
{ {
@ -738,15 +743,24 @@ void
ircd::fs::aio::system::handle_event(const io_event &event) ircd::fs::aio::system::handle_event(const io_event &event)
noexcept try noexcept try
{ {
// Our extended control block is passed in event.data // The kernel always references the iocb in `event.obj`
auto *const iocb
{
reinterpret_cast<struct ::iocb *>(event.obj)
};
// We referenced our request (which extends the same iocb anyway)
// for the kernel to carry through as an opaque in `event.data`.
auto &request auto &request
{ {
*reinterpret_cast<aio::request *>(event.data) *reinterpret_cast<aio::request *>(event.data)
}; };
auto *const iocb(reinterpret_cast<struct ::iocb *>(event.obj)); // Check that everything lines up.
assert(iocb == static_cast<struct ::iocb *>(&request)); assert(iocb == static_cast<struct ::iocb *>(&request));
assert(reinterpret_cast<aio::request *>(iocb->aio_data) == &request); assert(reinterpret_cast<aio::request *>(iocb->aio_data) == &request);
// Assert that we understand the return-value semantics of this interface.
assert(event.res2 >= 0); assert(event.res2 >= 0);
assert(event.res == -1 || event.res2 == 0); assert(event.res == -1 || event.res2 == 0);
@ -755,12 +769,10 @@ noexcept try
request.errcode = event.res >= -1? event.res2 : std::abs(event.res); request.errcode = event.res >= -1? event.res2 : std::abs(event.res);
// Notify the waiting context. Note that we are on the main async stack // Notify the waiting context. Note that we are on the main async stack
// but it is safe to notify from here. The waiter may be null if it left. // but it is safe to notify from here.
assert(!request.waiter || request.waiter != ctx::current); assert(request.waiter);
assert(ctx::current == nullptr); assert(ctx::current == nullptr);
if(likely(request.waiter)) ctx::notify(*request.waiter);
ctx::notify(*request.waiter);
stats.events++; stats.events++;
} }
catch(const std::exception &e) catch(const std::exception &e)
@ -774,6 +786,35 @@ catch(const std::exception &e)
}; };
} }
size_t
ircd::fs::aio::system::request_avail()
const
{
assert(request_count() <= max_events());
return max_events() - request_count();
}
size_t
ircd::fs::aio::system::request_count()
const
{
return qcount + in_flight;
}
size_t
ircd::fs::aio::system::max_submit()
const
{
return queue.size();
}
size_t
ircd::fs::aio::system::max_events()
const
{
return event.size();
}
// //
// internal util // internal util
// //

View file

@ -51,6 +51,11 @@ struct ircd::fs::aio::system
/// Handler to the io context we submit requests to the system with /// Handler to the io context we submit requests to the system with
aio_context_t idp {0}; aio_context_t idp {0};
size_t max_events() const;
size_t max_submit() const;
size_t request_count() const; // qcount + in_flight
size_t request_avail() const; // max_events - request_count()
// Callback stack invoked when the sigfd is notified of completed events. // Callback stack invoked when the sigfd is notified of completed events.
void handle_event(const io_event &) noexcept; void handle_event(const io_event &) noexcept;
void handle_events() noexcept; void handle_events() noexcept;
@ -68,7 +73,9 @@ struct ircd::fs::aio::system
bool wait(); bool wait();
bool interrupt(); bool interrupt();
system(); system(const size_t &max_events,
const size_t &max_submit);
~system() noexcept; ~system() noexcept;
}; };