mirror of
https://github.com/matrix-construct/construct
synced 2025-01-14 16:46:50 +01:00
ircd::fs::aio: Fix waiting on cancels; various fixes.
This commit is contained in:
parent
003f9f771a
commit
1137f8a29b
2 changed files with 50 additions and 29 deletions
|
@ -405,6 +405,8 @@ ircd::fs::aio::request::operator()()
|
||||||
|
|
||||||
// Wait for completion
|
// Wait for completion
|
||||||
system->wait(*this);
|
system->wait(*this);
|
||||||
|
|
||||||
|
assert(completed());
|
||||||
assert(retval <= ssize_t(submitted_bytes));
|
assert(retval <= ssize_t(submitted_bytes));
|
||||||
|
|
||||||
// Update stats for completion phase.
|
// Update stats for completion phase.
|
||||||
|
@ -438,11 +440,22 @@ ircd::fs::aio::request::operator()()
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool
|
||||||
|
ircd::fs::aio::request::queued()
|
||||||
|
const
|
||||||
|
{
|
||||||
|
return !for_each_queued([this]
|
||||||
|
(const auto &request)
|
||||||
|
{
|
||||||
|
return &request != this; // true to continue and return true
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
bool
|
bool
|
||||||
ircd::fs::aio::request::completed()
|
ircd::fs::aio::request::completed()
|
||||||
const
|
const
|
||||||
{
|
{
|
||||||
return retval != std::numeric_limits<decltype(retval)>::min();
|
return retval >= -1L;
|
||||||
}
|
}
|
||||||
|
|
||||||
ircd::fs::const_iovec_view
|
ircd::fs::const_iovec_view
|
||||||
|
@ -607,36 +620,40 @@ ircd::fs::aio::system::wait()
|
||||||
|
|
||||||
void
|
void
|
||||||
ircd::fs::aio::system::wait(request &request)
|
ircd::fs::aio::system::wait(request &request)
|
||||||
try
|
|
||||||
{
|
{
|
||||||
assert(ctx::current == request.waiter);
|
assert(ctx::current == request.waiter);
|
||||||
while(request.retval == std::numeric_limits<ssize_t>::min())
|
while(!request.completed()) try
|
||||||
|
{
|
||||||
ctx::wait();
|
ctx::wait();
|
||||||
}
|
}
|
||||||
catch(const ctx::interrupted &e)
|
catch(...)
|
||||||
{
|
{
|
||||||
// When the ctx is interrupted we're obligated to cancel the request.
|
// When the ctx is interrupted we're obliged to cancel the request
|
||||||
// The handler callstack is invoked directly from here by cancel() for
|
// if it has not reached a completed state.
|
||||||
// what it's worth but we rethrow the interrupt anyway.
|
if(request.completed())
|
||||||
if(!request.completed())
|
|
||||||
request.cancel();
|
|
||||||
|
|
||||||
throw;
|
throw;
|
||||||
}
|
|
||||||
catch(const ctx::terminated &)
|
|
||||||
{
|
|
||||||
if(!request.completed())
|
|
||||||
request.cancel();
|
|
||||||
|
|
||||||
|
// The handler callstack is invoked synchronously on this stack for
|
||||||
|
// requests which are still in our userspace queue.
|
||||||
|
if(request.queued())
|
||||||
|
{
|
||||||
|
request.cancel();
|
||||||
throw;
|
throw;
|
||||||
|
}
|
||||||
|
|
||||||
|
// The handler callstack is invoked asynchronously for requests
|
||||||
|
// submitted to the kernel; we *must* wait for that by blocking
|
||||||
|
// ctx interrupts and terminations and continue to wait.
|
||||||
|
continue;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
bool
|
bool
|
||||||
ircd::fs::aio::system::cancel(request &request)
|
ircd::fs::aio::system::cancel(request &request)
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
assert(request.retval == std::numeric_limits<ssize_t>::min());
|
|
||||||
assert(request.aio_data == uintptr_t(&request));
|
assert(request.aio_data == uintptr_t(&request));
|
||||||
|
assert(!request.completed() || request.queued());
|
||||||
|
|
||||||
iocb *const cb
|
iocb *const cb
|
||||||
{
|
{
|
||||||
|
@ -681,6 +698,7 @@ try
|
||||||
result.res = -1;
|
result.res = -1;
|
||||||
result.res2 = ECANCELED;
|
result.res2 = ECANCELED;
|
||||||
} else {
|
} else {
|
||||||
|
assert(!request.queued());
|
||||||
syscall_nointr<SYS_io_cancel>(head.get(), cb, &result);
|
syscall_nointr<SYS_io_cancel>(head.get(), cb, &result);
|
||||||
in_flight--;
|
in_flight--;
|
||||||
stats.cur_submits--;
|
stats.cur_submits--;
|
||||||
|
@ -692,7 +710,8 @@ try
|
||||||
}
|
}
|
||||||
catch(const std::system_error &e)
|
catch(const std::system_error &e)
|
||||||
{
|
{
|
||||||
log::error
|
assert(request.aio_data == uintptr_t(&request));
|
||||||
|
log::critical
|
||||||
{
|
{
|
||||||
"AIO(%p) cancel(fd:%d size:%zu off:%zd op:%u pri:%u) #%lu :%s",
|
"AIO(%p) cancel(fd:%d size:%zu off:%zd op:%u pri:%u) #%lu :%s",
|
||||||
this,
|
this,
|
||||||
|
@ -715,10 +734,10 @@ ircd::fs::aio::system::submit(request &request)
|
||||||
assert(qcount < queue.size());
|
assert(qcount < queue.size());
|
||||||
assert(qcount + in_flight < max_events());
|
assert(qcount + in_flight < max_events());
|
||||||
assert(request.aio_data == uintptr_t(&request));
|
assert(request.aio_data == uintptr_t(&request));
|
||||||
|
assert(!request.completed());
|
||||||
const ctx::critical_assertion ca;
|
const ctx::critical_assertion ca;
|
||||||
queue.at(qcount++) = static_cast<iocb *>(&request);
|
|
||||||
|
|
||||||
|
queue.at(qcount++) = static_cast<iocb *>(&request);
|
||||||
stats.cur_queued++;
|
stats.cur_queued++;
|
||||||
stats.max_queued = std::max(stats.max_queued, stats.cur_queued);
|
stats.max_queued = std::max(stats.max_queued, stats.cur_queued);
|
||||||
assert(stats.cur_queued == qcount);
|
assert(stats.cur_queued == qcount);
|
||||||
|
@ -754,7 +773,7 @@ ircd::fs::aio::system::submit(request &request)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// The chaser is posted to the IRCd event loop after the first request is
|
/// The chaser is posted to the IRCd event loop after the first request.
|
||||||
/// Ideally more requests will queue up before the chaser reaches the front
|
/// Ideally more requests will queue up before the chaser reaches the front
|
||||||
/// of the IRCd event queue and executes.
|
/// of the IRCd event queue and executes.
|
||||||
void
|
void
|
||||||
|
@ -780,7 +799,8 @@ catch(const std::exception &e)
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
/// The submitter submits all queued requests and resets the count.
|
/// The submitter submits all queued requests and resets our userspace queue
|
||||||
|
/// count down to zero.
|
||||||
size_t
|
size_t
|
||||||
ircd::fs::aio::system::submit()
|
ircd::fs::aio::system::submit()
|
||||||
noexcept try
|
noexcept try
|
||||||
|
@ -832,7 +852,7 @@ try
|
||||||
{
|
{
|
||||||
assert(qcount > 0);
|
assert(qcount > 0);
|
||||||
|
|
||||||
#ifndef NDEBUG
|
#ifdef RB_DEBUG
|
||||||
const size_t count[3]
|
const size_t count[3]
|
||||||
{
|
{
|
||||||
count_queued(op::READ),
|
count_queued(op::READ),
|
||||||
|
@ -856,7 +876,7 @@ try
|
||||||
syscall<SYS_io_submit>(head.get(), qcount, queue.data())
|
syscall<SYS_io_submit>(head.get(), qcount, queue.data())
|
||||||
};
|
};
|
||||||
|
|
||||||
#ifndef NDEBUG
|
#ifdef RB_DEBUG
|
||||||
stats.stalls += warning.timer.stop() > 0;
|
stats.stalls += warning.timer.stop() > 0;
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
|
|
@ -114,14 +114,15 @@ struct ircd::fs::aio::request
|
||||||
struct fdsync;
|
struct fdsync;
|
||||||
struct fsync;
|
struct fsync;
|
||||||
|
|
||||||
ctx::ctx *waiter {ctx::current};
|
ssize_t retval {-2L};
|
||||||
ssize_t retval {std::numeric_limits<ssize_t>::min()};
|
ssize_t errcode {0L};
|
||||||
ssize_t errcode {0};
|
|
||||||
const struct opts *opts {nullptr};
|
const struct opts *opts {nullptr};
|
||||||
|
ctx::ctx *waiter {ctx::current};
|
||||||
|
|
||||||
public:
|
public:
|
||||||
const_iovec_view iovec() const;
|
const_iovec_view iovec() const;
|
||||||
bool completed() const;
|
bool completed() const;
|
||||||
|
bool queued() const;
|
||||||
|
|
||||||
size_t operator()();
|
size_t operator()();
|
||||||
bool cancel();
|
bool cancel();
|
||||||
|
|
Loading…
Reference in a new issue