0
0
Fork 0
mirror of https://github.com/matrix-construct/construct synced 2024-06-11 06:28:55 +02:00

ircd::fs::aio: Add blocking point to manage io_submit's in flight; various reorg.

This commit is contained in:
Jason Volk 2018-12-27 12:04:48 -08:00
parent 69ae8308e5
commit 0c5ebde1fc
3 changed files with 77 additions and 53 deletions

View file

@ -73,7 +73,8 @@ struct ircd::fs::aio::stats
uint32_t cur_reads {0}; ///< pending reads
uint32_t cur_writes {0}; ///< pending write
uint32_t cur_bytes_write {0}; ///< pending write bytes
uint32_t cur_queued {0}; ///< size of submit queues.
uint32_t cur_queued {0}; ///< size of submit queues
uint32_t cur_submits {0}; ///< submits in flight
uint32_t max_requests {0}; ///< maximum observed pending requests
uint32_t max_reads {0}; ///< maximum observed pending reads

View file

@ -301,7 +301,6 @@ ircd::fs::aio::request::cancel()
/// result will be available or an exception will be thrown.
size_t
ircd::fs::aio::request::operator()()
try
{
assert(context);
assert(ctx::current);
@ -312,6 +311,13 @@ try
bytes(iovec())
};
// Wait here until there's room to submit a request
context->dock.wait([]
{
const size_t count(context->qcount + context->in_flight);
return count < size_t(max_events);
});
// Submit to kernel
context->submit(*this);
@ -322,10 +328,7 @@ try
const auto &curcnt(stats.requests - stats.complete);
stats.max_requests = std::max(stats.max_requests, curcnt);
// Block for completion
while(retval == std::numeric_limits<ssize_t>::min())
ctx::wait();
context->wait(*this);
assert(retval <= ssize_t(submitted_bytes));
// Update stats for completion phase.
@ -345,19 +348,6 @@ try
return size_t(retval);
}
catch(const ctx::interrupted &e)
{
// When the ctx is interrupted we're obligated to cancel the request.
// The handler callstack is invoked directly from here by cancel() for
// what it's worth but we rethrow the interrupt anyway.
cancel();
throw;
}
catch(const ctx::terminated &)
{
cancel();
throw;
}
ircd::fs::const_iovec_view
ircd::fs::aio::request::iovec()
@ -379,24 +369,28 @@ const
ircd::fs::aio::kernel::kernel()
try
:queue
:event
{
MAX_EVENTS
size_t(max_events)
}
,event
,queue
{
MAX_EVENTS
size_t(max_submit)
}
,resfd
{
ios::get(), int(syscall(::eventfd, semval, EFD_NONBLOCK))
ios::get(), int(syscall(::eventfd, ecount, EFD_NONBLOCK))
}
{
syscall<SYS_io_setup>(MAX_EVENTS, &idp);
syscall<SYS_io_setup>(size_t(max_events), &idp);
set_handle();
log::debug
{
"Established AIO context %p", this
"Established AIO(%p) context (fd:%d max_events:%zu max_submit:%zu)",
this,
int(resfd.native_handle()),
size_t(max_events),
size_t(max_submit)
};
}
catch(const std::exception &e)
@ -456,12 +450,34 @@ ircd::fs::aio::kernel::wait()
dock.wait([this]
{
return semval == uint64_t(-1);
return ecount == uint64_t(-1);
});
return true;
}
void
ircd::fs::aio::kernel::wait(request &request)
try
{
assert(ctx::current == request.waiter);
while(request.retval == std::numeric_limits<ssize_t>::min())
ctx::wait();
}
catch(const ctx::interrupted &e)
{
// When the ctx is interrupted we're obligated to cancel the request.
// The handler callstack is invoked directly from here by cancel() for
// what it's worth but we rethrow the interrupt anyway.
cancel(request);
throw;
}
catch(const ctx::terminated &)
{
cancel(request);
throw;
}
void
ircd::fs::aio::kernel::cancel(request &request)
{
@ -492,6 +508,7 @@ ircd::fs::aio::kernel::cancel(request &request)
if(erased_from_queue)
{
this->qcount--;
dock.notify_one();
stats.cur_queued--;
}
@ -506,17 +523,21 @@ ircd::fs::aio::kernel::cancel(request &request)
result.obj = uintptr_t(cb);
result.res = -1;
result.res2 = ECANCELED;
} else {
syscall_nointr<SYS_io_cancel>(idp, cb, &result);
in_flight--;
stats.cur_submits--;
dock.notify_one();
}
else syscall_nointr<SYS_io_cancel>(idp, cb, &result);
handle_event(result);
}
void
ircd::fs::aio::kernel::submit(request &request)
try
{
assert(qcount < queue.size());
assert(qcount + in_flight < max_events);
assert(request.aio_data == uintptr_t(&request));
const ctx::critical_assertion ca;
@ -544,13 +565,6 @@ try
if(qcount == 1)
ircd::post(std::bind(&kernel::chase, this));
}
catch(const std::exception &e)
{
throw assertive
{
"AIO(%p) submit(): %s", this, e.what()
};
}
/// The chaser is posted to the IRCd event loop after the first
/// request is queued. Ideally more requests will queue up before
@ -572,11 +586,18 @@ ircd::fs::aio::kernel::flush()
noexcept try
{
assert(qcount > 0);
assert(in_flight + qcount < MAX_EVENTS);
assert(in_flight + qcount <= size_t(max_events));
syscall<SYS_io_submit>(idp, qcount, queue.data());
stats.maxed_submits += qcount >= size_t(max_submit);
stats.single_submits += qcount == 1;
stats.cur_submits += qcount;
stats.cur_queued -= qcount;
stats.submits++;
in_flight += qcount;
qcount = 0;
}
catch(const std::exception &e)
@ -590,11 +611,11 @@ catch(const std::exception &e)
void
ircd::fs::aio::kernel::set_handle()
{
semval = 0;
ecount = 0;
const asio::mutable_buffers_1 bufs
{
&semval, sizeof(semval)
&ecount, sizeof(ecount)
};
auto handler
@ -613,7 +634,7 @@ noexcept try
{
namespace errc = boost::system::errc;
assert((bytes == 8 && !ec && semval >= 1) || (bytes == 0 && ec));
assert((bytes == 8 && !ec && ecount >= 1) || (bytes == 0 && ec));
assert(!ec || ec.category() == asio::error::get_system_category());
switch(ec.value())
@ -638,7 +659,7 @@ catch(const ctx::interrupted &)
"AIO context %p interrupted", this
};
semval = -1;
ecount = -1;
dock.notify_all();
}
@ -661,9 +682,11 @@ noexcept try
//assert(count > 0);
assert(count >= 0);
// Update any stats.
stats.events += count;
in_flight -= count;
stats.cur_submits -= count;
stats.handles++;
if(likely(count))
dock.notify_one();
for(ssize_t i(0); i < count; ++i)
handle_event(event[i]);
@ -704,6 +727,8 @@ noexcept try
assert(ctx::current == nullptr);
if(likely(request.waiter))
ctx::notify(*request.waiter);
stats.events++;
}
catch(const std::exception &e)
{

View file

@ -28,25 +28,22 @@ namespace ircd::fs::aio
/// an extern instance pointer at fs::aio::context maintained by fs::aio::init.
struct ircd::fs::aio::kernel
{
/// Internal semaphore for synchronization of this object
ctx::dock dock;
/// io_getevents vector (in)
std::vector<io_event> event;
uint64_t ecount {0};
/// io_submit queue (out)
std::vector<iocb *> queue;
size_t qcount {0};
/// io_getevents vector (in)
std::vector<io_event> event;
/// The semaphore value for the eventfd which we keep here.
uint64_t semval {0};
/// other state
ctx::dock dock;
size_t in_flight {0};
/// An eventfd which will be notified by the kernel; we integrate this with
/// the ircd io_service core epoll() event loop. The EFD_SEMAPHORE flag is
/// not used to reduce the number of triggers. We can collect multiple AIO
/// completions after a single trigger to this fd. Because EFD_SEMAPHORE is
/// not set, the semval which is kept above will reflect a hint for how
/// many AIO's are done.
/// not used to reduce the number of triggers. The semaphore value is the
/// ecount (above) which will reflect a hint for how many AIO's are done.
asio::posix::stream_descriptor resfd;
/// Handler to the io context we submit requests to the kernel with
@ -63,6 +60,7 @@ struct ircd::fs::aio::kernel
void submit(request &);
void cancel(request &);
void wait(request &);
// Control panel
bool wait();