mirror of https://github.com/matrix-construct/construct
synced 2025-01-13 08:23:56 +01:00

ircd::fs::aio: Various improvements; fix cancel w/ queue; class member queues rather than tls.

This commit is contained in:
parent e58a975750
commit 69ae8308e5

3 changed files with 119 additions and 42 deletions
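The gist of the change: the pending io_submit queue moves out of thread_local storage inside kernel::submit() and into members of the kernel object (queue, qcount), which is what allows the new cancel() path to find and remove requests that were queued but never handed to the kernel. A minimal compile-only sketch of the before/after shape, with simplified sizes and names (the real class is ircd::fs::aio::kernel with a std::vector queue, as the diff below shows):

#include <array>
#include <cstddef>

struct iocb;  // kernel AIO control block; declaration only for this sketch

// Before: the submit queue lived in thread_local storage inside submit(),
// invisible to every other member function.
struct kernel_before_sketch
{
	void submit(iocb *const cb)
	{
		thread_local std::size_t count;
		thread_local std::array<iocb *, 64> queue;
		queue.at(count++) = cb;
		// ...flushed later by statically-defined flusher/chaser lambdas
	}
};

// After: queue and qcount are ordinary members, so cancel(), flush() and
// chase() can all operate on the same pending-submit state.
struct kernel_after_sketch
{
	std::array<iocb *, 64> queue {};
	std::size_t qcount {0};

	void submit(iocb *const cb)
	{
		queue.at(qcount++) = cb;
	}

	void cancel(iocb *const cb);  // may now erase cb from queue before io_submit
	void flush();                 // io_submit(2) of queue[0..qcount)
	void chase();                 // posted to the event loop; flushes stragglers
};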
@@ -73,12 +73,15 @@ struct ircd::fs::aio::stats
	uint32_t cur_reads {0};          ///< pending reads
	uint32_t cur_writes {0};         ///< pending write
	uint32_t cur_bytes_write {0};    ///< pending write bytes
	uint32_t cur_queued {0};         ///< size of submit queues.

	uint32_t max_requests {0};       ///< maximum observed pending requests
	uint32_t max_reads {0};          ///< maximum observed pending reads
	uint32_t max_writes {0};         ///< maximum observed pending write

	uint32_t maxed_submits {0};      ///< number of submits at threshold
	uint32_t single_submits {0};     ///< number of submits of just one
	uint32_t chased_submits {0};     ///< number of submits via chase
};

struct ircd::fs::aio::init
ircd/aio.cc (144 lines changed)
@@ -291,16 +291,10 @@ noexcept
void
ircd::fs::aio::request::cancel()
{
	io_event result {0};
	const auto &cb{static_cast<iocb *>(this)};

	assert(context);
	syscall_nointr<SYS_io_cancel>(context->idp, cb, &result);

	context->cancel(*this);
	stats.bytes_cancel += bytes(iovec());
	stats.cancel++;

	context->handle_event(result);
}

/// Submit a request and properly yield the ircd::ctx. When this returns the
@@ -385,14 +379,21 @@ const

ircd::fs::aio::kernel::kernel()
try
:resfd
:queue
{
	MAX_EVENTS
}
,event
{
	MAX_EVENTS
}
,resfd
{
	ios::get(), int(syscall(::eventfd, semval, EFD_NONBLOCK))
}
{
	syscall<SYS_io_setup>(MAX_EVENTS, &idp);
	set_handle();

	log::debug
	{
		"Established AIO context %p", this
@@ -411,6 +412,7 @@ catch(const std::exception &e)
ircd::fs::aio::kernel::~kernel()
noexcept try
{
	assert(qcount == 0);
	const ctx::uninterruptible::nothrow ui;

	interrupt();
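For context on what the constructor and destructor above manage: a kernel AIO context created with io_setup(2) is paired with a non-blocking eventfd, which the I/O service watches so completions can be reaped without a dedicated thread. A standalone sketch of that pairing, using raw syscalls rather than ircd's syscall<> and syscall_nointr<> wrappers (names here are illustrative, not ircd's):

#include <stdexcept>
#include <sys/eventfd.h>
#include <sys/syscall.h>
#include <unistd.h>
#include <linux/aio_abi.h>

// Sketch only: create an AIO context plus an eventfd for completion
// notification (each iocb would set IOCB_FLAG_RESFD and aio_resfd).
struct aio_context_sketch
{
	static constexpr unsigned MAX_EVENTS {64};

	aio_context_t idp {0};
	int resfd {-1};

	aio_context_sketch()
	{
		if(::syscall(SYS_io_setup, MAX_EVENTS, &idp) == -1)
			throw std::runtime_error("io_setup failed");

		// Non-blocking so the event loop can poll the counter opportunistically.
		resfd = ::eventfd(0, EFD_NONBLOCK);
		if(resfd == -1)
			throw std::runtime_error("eventfd failed");
	}

	~aio_context_sketch() noexcept
	{
		if(resfd != -1)
			::close(resfd);

		if(idp)
			::syscall(SYS_io_destroy, idp);
	}
};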
@@ -461,43 +463,73 @@ ircd::fs::aio::kernel::wait()
}

void
ircd::fs::aio::kernel::submit(request &request)
noexcept try
ircd::fs::aio::kernel::cancel(request &request)
{
	thread_local size_t count;
	thread_local std::array<iocb *, MAX_EVENTS> queue;

	// The flusher submits all queued requests and resets the count.
	static const auto flush{[]
	const auto &cb
	{
		assert(context);
		syscall<SYS_io_submit>(context->idp, count, queue.data());
		stats.maxed_submits += count >= size_t(max_submit);
		++stats.submits;
		count = 0;
	}};
		static_cast<iocb *>(&request)
	};

	// The chaser is posted to the IRCd event loop after the first
	// request is queued. Ideally more requests will queue up before
	// the chaser is executed.
	static const auto chase{[]
	const auto eit
	{
		if(count)
			flush();
	}};
		std::remove(begin(queue), end(queue), cb)
	};

	assert(count < queue.size());
	const auto qcount
	{
		size_t(std::distance(begin(queue), eit))
	};

	// We know something was erased if the qcount no longer matches
	const bool erased_from_queue
	{
		this->qcount > qcount
	};

	// Make the qcount accurate again after any erasure.
	assert(!erased_from_queue || this->qcount == qcount + 1);
	assert(erased_from_queue || this->qcount == qcount);
	if(erased_from_queue)
	{
		this->qcount--;
		stats.cur_queued--;
	}

	// Setup an io_event result which we will handle as a normal event
	// immediately on this stack. We create our own cancel result if
	// the request was not yet submitted to the kernel so the handler
	// remains agnostic to our userspace queues.
	io_event result {0};
	if(erased_from_queue)
	{
		result.data = cb->aio_data;
		result.obj = uintptr_t(cb);
		result.res = -1;
		result.res2 = ECANCELED;
	}
	else syscall_nointr<SYS_io_cancel>(idp, cb, &result);

	handle_event(result);
}

void
ircd::fs::aio::kernel::submit(request &request)
try
{
	assert(qcount < queue.size());
	assert(request.aio_data == uintptr_t(&request));

	const ctx::critical_assertion ca;
	queue.at(count++) = static_cast<iocb *>(&request);
	queue.at(qcount++) = static_cast<iocb *>(&request);
	stats.cur_queued++;

	const bool flush_now
	{
		// The queue has reached the configured size
		count >= size_t(max_submit)
		qcount >= size_t(max_submit)

		// The queue has reached its maximum size
		|| count >= queue.size()
		|| qcount >= queue.size()

		// The request causes serialization. This is considered true for all
		// non-reading events, even for different files and locations. It may
@@ -509,19 +541,50 @@ noexcept try
	if(flush_now)
		return flush();

	if(count == 1)
		ircd::post(chase);
	if(qcount == 1)
		ircd::post(std::bind(&kernel::chase, this));
}
catch(const std::exception &e)
{
	log::critical
	throw assertive
	{
		"AIO(%p) submit: %s",
		this,
		e.what()
		"AIO(%p) submit(): %s", this, e.what()
	};
}

	throw;
/// The chaser is posted to the IRCd event loop after the first
/// request is queued. Ideally more requests will queue up before
/// the chaser is executed.
void
ircd::fs::aio::kernel::chase()
noexcept
{
	if(qcount)
	{
		flush();
		stats.chased_submits++;
	}
}

/// The flusher submits all queued requests and resets the count.
void
ircd::fs::aio::kernel::flush()
noexcept try
{
	assert(qcount > 0);
	syscall<SYS_io_submit>(idp, qcount, queue.data());
	stats.maxed_submits += qcount >= size_t(max_submit);
	stats.single_submits += qcount == 1;
	stats.cur_queued -= qcount;
	stats.submits++;
	qcount = 0;
}
catch(const std::exception &e)
{
	throw assertive
	{
		"AIO(%p) flush(%zu): %s", this, qcount, e.what()
	};
}

void
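The doc comments above describe the queue/chase/flush mechanism: submit() only appends to the member queue; the first request of a burst posts a chaser to the IRCd event loop, and either that chaser or a full queue triggers a single io_submit(2) covering the whole batch. A self-contained toy model of that batching follows (a std::deque stands in for the event loop and ircd::post; no real I/O is performed):

#include <array>
#include <cstddef>
#include <cstdio>
#include <deque>
#include <functional>

static std::deque<std::function<void ()>> loop;  // stand-in for the IRCd event loop
struct request { int id; };                      // stand-in for the kernel iocb

struct batcher_sketch
{
	static constexpr std::size_t max_submit {4};

	std::array<request *, 16> queue {};
	std::size_t qcount {0};

	// Where io_submit(2) would hand queue[0..qcount) to the kernel.
	void flush()
	{
		std::printf("io_submit of %zu request(s)\n", qcount);
		qcount = 0;
	}

	// Posted once per burst; flushes whatever accumulated in the meantime.
	void chase()
	{
		if(qcount)
			flush();
	}

	void submit(request &r)
	{
		queue.at(qcount++) = &r;

		if(qcount >= max_submit || qcount >= queue.size())
			return flush();

		// Only the first request of a burst needs to arm the chaser.
		if(qcount == 1)
			loop.push_back([this] { chase(); });
	}
};

int main()
{
	batcher_sketch b;
	request r[3] {{1}, {2}, {3}};
	for(auto &x : r)
		b.submit(x);        // three requests queued within one iteration

	while(!loop.empty())    // run deferred work: one flush, one "syscall"
	{
		loop.front()();
		loop.pop_front();
	}
}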
@@ -584,7 +647,6 @@ ircd::fs::aio::kernel::handle_events()
noexcept try
{
	assert(!ctx::current);
	thread_local std::array<io_event, MAX_EVENTS> event;

	// The number of completed requests available in events[]. This syscall
	// is restarted on EINTR. After restart, it may or may not find any ready
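The comment above, continuing past the hunk, refers to reaping completions with io_getevents(2) once the eventfd fires, restarting the syscall on EINTR; after a restart it may legitimately find nothing ready. A minimal sketch of that reap step with raw syscalls (assuming a zero timeout, non-blocking poll, which matches the comment but is not necessarily ircd's exact call):

#include <cerrno>
#include <cstddef>
#include <time.h>
#include <sys/syscall.h>
#include <unistd.h>
#include <linux/aio_abi.h>

// Returns how many completed io_events were copied into events[0..max).
// Retries on EINTR; zero is a normal result after a restart.
std::size_t reap(const aio_context_t idp, io_event *const events, const std::size_t max)
{
	long ret;
	timespec timeout {0, 0};
	do
	{
		ret = ::syscall(SYS_io_getevents, idp, 0L, long(max), events, &timeout);
	}
	while(ret == -1 && errno == EINTR);

	return ret > 0? std::size_t(ret) : 0;
}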
ircd/aio.h (14 lines changed)
@@ -14,6 +14,7 @@

namespace ircd::fs::aio
{
	struct kernel;
	struct request;

	void prefetch(const fd &, const size_t &, const read_opts &);
@@ -30,6 +31,13 @@ struct ircd::fs::aio::kernel
	/// Internal semaphore for synchronization of this object
	ctx::dock dock;

	/// io_submit queue (out)
	std::vector<iocb *> queue;
	size_t qcount {0};

	/// io_getevents vector (in)
	std::vector<io_event> event;

	/// The semaphore value for the eventfd which we keep here.
	uint64_t semval {0};
@@ -50,7 +58,11 @@ struct ircd::fs::aio::kernel
	void handle(const boost::system::error_code &, const size_t) noexcept;
	void set_handle();

	void submit(request &) noexcept;
	void flush() noexcept;
	void chase() noexcept;

	void submit(request &);
	void cancel(request &);

	// Control panel
	bool wait();
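Tying the new declarations above to the kernel::cancel() definition in aio.cc: cancellation now distinguishes a request still sitting in the userspace submit queue (it never reached the kernel, so a cancel result is fabricated with res=-1, res2=ECANCELED) from one already submitted (which goes through io_cancel(2)), and both paths feed the same completion handler. A self-contained sketch of that shape, not the ircd implementation:

#include <algorithm>
#include <cerrno>
#include <cstddef>
#include <cstdint>
#include <cstdio>
#include <iterator>
#include <vector>
#include <sys/syscall.h>
#include <unistd.h>
#include <linux/aio_abi.h>

struct canceller_sketch
{
	aio_context_t idp {0};
	std::vector<iocb *> queue = std::vector<iocb *>(64);
	std::size_t qcount {0};

	// Completion handler; deliberately agnostic to the userspace queue.
	void handle_event(const io_event &e)
	{
		std::printf("completion: res=%lld res2=%lld\n",
		            static_cast<long long>(e.res),
		            static_cast<long long>(e.res2));
	}

	void cancel(iocb *const cb)
	{
		// Erase the request from the pending-submit queue if it is still there.
		const auto eit(std::remove(std::begin(queue), std::begin(queue) + qcount, cb));
		const bool erased(std::size_t(std::distance(std::begin(queue), eit)) < qcount);
		if(erased)
			--qcount;

		io_event result {0};
		if(erased)
		{
			// Never handed to the kernel: synthesize the cancellation result.
			result.data = cb->aio_data;
			result.obj = uintptr_t(cb);
			result.res = -1;
			result.res2 = ECANCELED;
		}
		else ::syscall(SYS_io_cancel, idp, cb, &result);  // already submitted

		handle_event(result);
	}
};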