0
0
Fork 0
mirror of https://github.com/matrix-construct/construct synced 2024-11-26 08:42:34 +01:00

ircd:🆑 Refactor for ctx::dock notify.

This commit is contained in:
Jason Volk 2021-03-17 17:56:01 -07:00
parent a21589654a
commit 519d2e3f45

View file

@ -1007,33 +1007,23 @@ const
namespace ircd::cl
{
struct alignas(16) pkg
{
cl_event event {nullptr};
ctx::ctx *ctx {nullptr};
}
__packed;
static std::unique_ptr<ctx::context> handle_context;
static std::mutex offload_mutex alignas(64);
static std::condition_variable offload_cond alignas(64);
static std::atomic<bool> offload_term alignas(64);
static std::atomic<pkg> offload_pkg alignas(64);
struct completion;
static void handle_event(cl_event, cl_int, void *) noexcept;
static void handle_incomplete(work &, const int &);
static void handle_offload();
static void handle_worker();
}
struct ircd::cl::completion
{
cl_event event {nullptr};
cl_int status {CL_COMPLETE};
ctx::dock dock;
};
void
ircd::cl::work::init()
{
assert(!handle_context);
handle_context = std::make_unique<ctx::context>
(
"cl.work", handle_worker
);
}
void
@ -1041,88 +1031,33 @@ ircd::cl::work::fini()
noexcept
{
cl::sync();
if(!handle_context)
return;
offload_term.store(true, std::memory_order_release);
offload_cond.notify_all();
handle_context->terminate();
handle_context->join();
handle_context.reset(nullptr);
}
void
ircd::cl::handle_worker()
ircd::cl::handle_incomplete(work &work,
const int &status)
{
ctx::ole::opts opts;
opts.name = "cl.work"_sv;
ctx::offload
completion c
{
opts, []
{
while(!offload_term.load(std::memory_order_relaxed))
handle_offload();
}
cl_event(work.handle),
status,
};
}
void
ircd::cl::handle_offload()
{
// This is a pre-locked mutex used to achieve "naked wakeup" with only the
// standard API's and contractual assumptions and atomics between threads.
static std::unique_lock lock{offload_mutex};
call
(
clSetEventCallback,
cl_event(work.handle),
CL_COMPLETE,
&handle_event,
&c
);
// Acquire the work assignment pkg. The mutex/lock does nothing here
// because the main thread never takes it (nor does anyone else). Instead
// the main thread (notifier) guarantees if our result is not visible, a
// notify is to follow, therefor it's always safe for this thread to go to
// sleep if the condition is not witnessed as satisfied.
pkg p;
bool terminated {false};
offload_cond.wait(lock, [&p, &terminated]
c.dock.wait([&c]
{
// Also check for termination as another wakeup condition.
terminated = offload_term.load(std::memory_order_relaxed);
p = offload_pkg.load(std::memory_order_acquire);
return terminated | (p.event != nullptr);
return !c.event || c.status == CL_COMPLETE;
});
if(unlikely(!p.event))
{
assert(terminated);
return;
}
int status; try
{
char buf[4] {0};
status = info<int>(clGetEventInfo, p.event, CL_EVENT_COMMAND_EXECUTION_STATUS, buf);
}
catch(...)
{
status = -1;
}
if(status < 0 || status == CL_COMPLETE)
{
handle_event(p.event, status, p.ctx);
return;
}
const auto res
{
clSetEventCallback(p.event, CL_COMPLETE, &handle_event, p.ctx)
};
if(unlikely(res != CL_SUCCESS))
ircd::terminate
{
"clSetEventCallback(%p) context:%p unexpected error #%d.",
p.event,
p.ctx,
res,
};
throw_on_error(c.status);
}
void
@ -1131,86 +1066,14 @@ ircd::cl::handle_event(cl_event event,
void *const priv)
noexcept
{
always_assert(status < 0 || status == CL_COMPLETE);
// Prepare response to the main thread. The event pointer is anulled to
// indicate completion.
pkg ours;
ours.event = nullptr;
ours.ctx = reinterpret_cast<ctx::ctx *>(priv);
// Expect the main thread to not have changed anything in the pacakge
// during our operation. The context should be nicely suspended this whole
// time. In case the context anulls the package to abandon the result this
// transaction will fail and we'll do nothing further from here.
//
// Note at this time abandonment is not supported since we're missing
// another phase indicating a commitment by this thread to notify. Without
// any obligation by the main thread to such a commitment, the *ctx may
// be invalid by the time this thread notifies.
pkg theirs;
theirs.ctx = ours.ctx;
theirs.event = event;
constexpr auto
success(std::memory_order_acq_rel),
failure(std::memory_order_relaxed);
if(unlikely(!offload_pkg.compare_exchange_strong(theirs, ours, success, failure)))
return;
if(unlikely(!theirs.ctx))
return;
// Wakeup the ctx for the result. This is a special notify meant to
// originate from external threads.
ctx::notify(*theirs.ctx, ctx::threadsafe);
}
void
ircd::cl::handle_incomplete(work &work,
const int &status)
{
// Expose the datagram to the other thread. On supported architectures
// we can pass 16 bytes atomically, or two pointers. One for the event
// (input) and the other for the context to notify (output).
pkg theirs, ours;
ours.event = cl_event(work.handle);
ours.ctx = ctx::current;
constexpr auto
success(std::memory_order_acq_rel),
failure(std::memory_order_relaxed);
if(unlikely(!offload_pkg.compare_exchange_strong(theirs, ours, success, failure)))
auto *const c
{
always_assert(false); // Conflict from two ircd::ctx.
return;
}
reinterpret_cast<completion *>(priv)
};
// When finished here the package is cleared immediately without condition.
// It's probably unsafe to clear the package with zero cooperation from the
// other thread, so we disable interrupts here for now; the other thread
// must respond.
const ctx::uninterruptible::nothrow ui;
const unwind unset{[]
{
offload_pkg.store(pkg{}, std::memory_order_release);
}};
// Send a "naked wakeup" to the other thread because we're not taking a
// lock on this condition variable for the notify. To accomplish this we
// fulfill two obligations on this side:
// 1. Ensure every package is always followed by a notify after it.
// 2. Ensure notify is followed by same release-semantics as an unlock().
offload_cond.notify_one();
std::atomic_thread_fence(std::memory_order_release); do
{
// Unconditionally suspend this context. The other thread will have
// to respond with at least one notification preceded by a result.
ctx::wait();
theirs = offload_pkg.load(std::memory_order_acquire);
}
while(theirs.event != nullptr);
assert(theirs.ctx == ctx::current);
assert(c.event != nullptr);
c->status = status;
c->dock.notify_one();
}
//
@ -1245,8 +1108,14 @@ ircd::cl::work::wait()
if(!handle)
return false;
char buf[4];
const auto &status
const unwind free{[this]
{
call(clReleaseEvent, cl_event(handle));
handle = nullptr;
}};
char buf[4] {0};
const int status
{
info<int>(clGetEventInfo, cl_event(handle), CL_EVENT_COMMAND_EXECUTION_STATUS, buf)
};
@ -1254,8 +1123,6 @@ ircd::cl::work::wait()
if(status >= 0 && status != CL_COMPLETE)
handle_incomplete(*this, status);
call(clReleaseEvent, cl_event(handle));
handle = nullptr;
return true;
}