mirror of
https://github.com/matrix-construct/construct
synced 2024-11-25 08:12:37 +01:00
ircd:🆑 Offload blocking wait to thread; integrate dependencies with ircd::ctx.
This commit is contained in:
parent
5b004be146
commit
43b85dad87
3 changed files with 269 additions and 64 deletions
|
@ -36,8 +36,12 @@ namespace ircd::cl
|
|||
|
||||
/// cl_event wrapping
|
||||
struct ircd::cl::work
|
||||
:instance_list<cl::work>
|
||||
{
|
||||
void *handle {nullptr};
|
||||
ctx::ctx *context {ctx::current};
|
||||
|
||||
static void init(), fini() noexcept;
|
||||
|
||||
public:
|
||||
std::array<uint64_t, 4> profile() const;
|
||||
|
|
327
ircd/cl.cc
327
ircd/cl.cc
|
@ -190,6 +190,8 @@ ircd::cl::init::init()
|
|||
queue[i][j] = clCreateCommandQueue(primary, device[i][j], qprop, &err);
|
||||
throw_on_error(err);
|
||||
}
|
||||
|
||||
work::init();
|
||||
}
|
||||
|
||||
ircd::cl::init::~init()
|
||||
|
@ -206,7 +208,7 @@ noexcept
|
|||
log, "Shutting down OpenCL...",
|
||||
};
|
||||
|
||||
sync();
|
||||
work::fini();
|
||||
}
|
||||
|
||||
for(size_t i(0); i < PLATFORM_MAX; ++i)
|
||||
|
@ -262,6 +264,23 @@ ircd::cl::flush()
|
|||
// exec
|
||||
//
|
||||
|
||||
namespace ircd::cl
|
||||
{
|
||||
size_t depend_ctx_last(const vector_view<cl_event> &, cl::work *const &);
|
||||
}
|
||||
|
||||
template<>
|
||||
decltype(ircd::util::instance_list<ircd::cl::work>::allocator)
|
||||
ircd::util::instance_list<ircd::cl::work>::allocator
|
||||
{};
|
||||
|
||||
template<>
|
||||
decltype(ircd::util::instance_list<ircd::cl::work>::list)
|
||||
ircd::util::instance_list<ircd::cl::work>::list
|
||||
{
|
||||
allocator
|
||||
};
|
||||
|
||||
ircd::cl::exec::exec(kern &kern,
|
||||
const kern::range &work)
|
||||
try
|
||||
|
@ -270,15 +289,19 @@ try
|
|||
for(size_t i(0); i < work.global.size(); ++i)
|
||||
dim += work.global[i] > 0;
|
||||
|
||||
size_t dependencies {0};
|
||||
cl_event *const dependency
|
||||
auto &q
|
||||
{
|
||||
queue[0][0]
|
||||
};
|
||||
|
||||
cl_event dependency[1]
|
||||
{
|
||||
nullptr
|
||||
};
|
||||
|
||||
auto &q
|
||||
const auto dependencies
|
||||
{
|
||||
queue[0][0]
|
||||
depend_ctx_last(dependency, this)
|
||||
};
|
||||
|
||||
call
|
||||
|
@ -291,7 +314,7 @@ try
|
|||
work.global.data(),
|
||||
work.local.data(),
|
||||
dependencies,
|
||||
dependency,
|
||||
dependencies? dependency: nullptr,
|
||||
reinterpret_cast<cl_event *>(&this->handle)
|
||||
);
|
||||
}
|
||||
|
@ -311,15 +334,19 @@ ircd::cl::exec::exec(data &data,
|
|||
const bool blocking)
|
||||
try
|
||||
{
|
||||
size_t dependencies {0};
|
||||
cl_event *const dependency
|
||||
auto &q
|
||||
{
|
||||
queue[0][0]
|
||||
};
|
||||
|
||||
cl_event dependency[1]
|
||||
{
|
||||
nullptr
|
||||
};
|
||||
|
||||
auto &q
|
||||
const auto dependencies
|
||||
{
|
||||
queue[0][0]
|
||||
depend_ctx_last(dependency, this)
|
||||
};
|
||||
|
||||
call
|
||||
|
@ -332,7 +359,7 @@ try
|
|||
ircd::size(buf),
|
||||
ircd::data(buf),
|
||||
dependencies,
|
||||
dependency,
|
||||
dependencies? dependency: nullptr,
|
||||
reinterpret_cast<cl_event *>(&this->handle)
|
||||
);
|
||||
}
|
||||
|
@ -352,15 +379,19 @@ ircd::cl::exec::exec(data &data,
|
|||
const bool blocking)
|
||||
try
|
||||
{
|
||||
size_t dependencies {0};
|
||||
cl_event *const dependency
|
||||
auto &q
|
||||
{
|
||||
queue[0][0]
|
||||
};
|
||||
|
||||
cl_event dependency[1]
|
||||
{
|
||||
nullptr
|
||||
};
|
||||
|
||||
auto &q
|
||||
const auto dependencies
|
||||
{
|
||||
queue[0][0]
|
||||
depend_ctx_last(dependency, this)
|
||||
};
|
||||
|
||||
call
|
||||
|
@ -373,7 +404,7 @@ try
|
|||
ircd::size(buf),
|
||||
mutable_cast(ircd::data(buf)),
|
||||
dependencies,
|
||||
dependency,
|
||||
dependencies? dependency: nullptr,
|
||||
reinterpret_cast<cl_event *>(&this->handle)
|
||||
);
|
||||
}
|
||||
|
@ -388,6 +419,27 @@ catch(const std::exception &e)
|
|||
throw;
|
||||
}
|
||||
|
||||
size_t
|
||||
ircd::cl::depend_ctx_last(const vector_view<cl_event> &deps,
|
||||
cl::work *const &work)
|
||||
{
|
||||
size_t ret(0);
|
||||
for(auto it(rbegin(cl::work::list)); it != rend(cl::work::list); ++it)
|
||||
{
|
||||
cl::work *const &other{*it};
|
||||
if(other == work)
|
||||
continue;
|
||||
|
||||
if(other->context != ctx::current)
|
||||
continue;
|
||||
|
||||
deps.at(ret++) = cl_event(other->handle);
|
||||
break;
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
//
|
||||
// kern
|
||||
//
|
||||
|
@ -679,12 +731,194 @@ catch(const std::exception &e)
|
|||
|
||||
namespace ircd::cl
|
||||
{
|
||||
struct handle_event_data
|
||||
struct alignas(16) pkg
|
||||
{
|
||||
ctx::ctx *c {ctx::current};
|
||||
};
|
||||
cl_event event {nullptr};
|
||||
ctx::ctx *ctx {nullptr};
|
||||
}
|
||||
__packed;
|
||||
|
||||
static std::unique_ptr<ctx::context> handle_context;
|
||||
static std::mutex offload_mutex alignas(64);
|
||||
static std::condition_variable offload_cond alignas(64);
|
||||
static std::atomic<bool> offload_term alignas(64);
|
||||
static std::atomic<pkg> offload_pkg alignas(64);
|
||||
|
||||
static void handle_event(cl_event, cl_int, void *) noexcept;
|
||||
static void handle_incomplete(work &, const int &);
|
||||
static void handle_offload();
|
||||
static void handle_worker();
|
||||
}
|
||||
|
||||
void
|
||||
ircd::cl::work::init()
|
||||
{
|
||||
assert(!handle_context);
|
||||
handle_context = std::make_unique<ctx::context>
|
||||
(
|
||||
"cl.work", handle_worker
|
||||
);
|
||||
}
|
||||
|
||||
void
|
||||
ircd::cl::work::fini()
|
||||
noexcept
|
||||
{
|
||||
cl::sync();
|
||||
if(!handle_context)
|
||||
return;
|
||||
|
||||
offload_term.store(true, std::memory_order_release);
|
||||
offload_cond.notify_all();
|
||||
handle_context->terminate();
|
||||
handle_context->join();
|
||||
handle_context.reset(nullptr);
|
||||
}
|
||||
|
||||
void
|
||||
ircd::cl::handle_worker()
|
||||
{
|
||||
ctx::ole::opts opts;
|
||||
opts.name = "cl.work"_sv;
|
||||
ctx::offload
|
||||
{
|
||||
opts, []
|
||||
{
|
||||
while(!offload_term.load(std::memory_order_relaxed))
|
||||
handle_offload();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
void
|
||||
ircd::cl::handle_offload()
|
||||
{
|
||||
// This is a pre-locked mutex used to achieve "naked wakeup" with only the
|
||||
// standard API's and contractual assumptions and atomics between threads.
|
||||
static std::unique_lock lock{offload_mutex};
|
||||
|
||||
// Acquire the work assignment pkg. The mutex/lock does nothing here
|
||||
// because the main thread never takes it (nor does anyone else). Instead
|
||||
// the main thread (notifier) guarantees if our result is not visible, a
|
||||
// notify is to follow, therefor it's always safe for this thread to go to
|
||||
// sleep if the condition is not witnessed as satisfied.
|
||||
pkg p;
|
||||
bool terminated {false};
|
||||
offload_cond.wait(lock, [&p, &terminated]
|
||||
{
|
||||
// Also check for termination as another wakeup condition.
|
||||
terminated = offload_term.load(std::memory_order_relaxed);
|
||||
p = offload_pkg.load(std::memory_order_acquire);
|
||||
return terminated | (p.event != nullptr);
|
||||
});
|
||||
|
||||
if(unlikely(!p.event))
|
||||
{
|
||||
assert(terminated);
|
||||
return;
|
||||
}
|
||||
|
||||
const auto res
|
||||
{
|
||||
clSetEventCallback(p.event, CL_COMPLETE, &handle_event, p.ctx)
|
||||
};
|
||||
|
||||
if(unlikely(res != CL_SUCCESS))
|
||||
ircd::terminate
|
||||
{
|
||||
"clSetEventCallback(%p) context:%p unexpected error #%d.",
|
||||
p.event,
|
||||
p.ctx,
|
||||
res,
|
||||
};
|
||||
}
|
||||
|
||||
void
|
||||
ircd::cl::handle_event(cl_event event,
|
||||
cl_int status,
|
||||
void *const priv)
|
||||
noexcept
|
||||
{
|
||||
assert(status == CL_COMPLETE);
|
||||
|
||||
// Prepare response to the main thread. The event pointer is anulled to
|
||||
// indicate completion.
|
||||
pkg ours;
|
||||
ours.event = nullptr;
|
||||
ours.ctx = reinterpret_cast<ctx::ctx *>(priv);
|
||||
|
||||
// Expect the main thread to not have changed anything in the pacakge
|
||||
// during our operation. The context should be nicely suspended this whole
|
||||
// time. In case the context anulls the package to abandon the result this
|
||||
// transaction will fail and we'll do nothing further from here.
|
||||
//
|
||||
// Note at this time abandonment is not supported since we're missing
|
||||
// another phase indicating a commitment by this thread to notify. Without
|
||||
// any obligation by the main thread to such a commitment, the *ctx may
|
||||
// be invalid by the time this thread notifies.
|
||||
pkg theirs;
|
||||
theirs.ctx = ours.ctx;
|
||||
theirs.event = event;
|
||||
|
||||
constexpr auto
|
||||
success(std::memory_order_acq_rel),
|
||||
failure(std::memory_order_relaxed);
|
||||
if(unlikely(!offload_pkg.compare_exchange_strong(theirs, ours, success, failure)))
|
||||
return;
|
||||
|
||||
if(unlikely(!theirs.ctx))
|
||||
return;
|
||||
|
||||
// Wakeup the ctx for the result. This is a special notify meant to
|
||||
// originate from external threads.
|
||||
ctx::notify(*theirs.ctx, ctx::threadsafe);
|
||||
}
|
||||
|
||||
void
|
||||
ircd::cl::handle_incomplete(work &work,
|
||||
const int &status)
|
||||
{
|
||||
// Expose the datagram to the other thread. On supported architectures
|
||||
// we can pass 16 bytes atomically, or two pointers. One for the event
|
||||
// (input) and the other for the context to notify (output).
|
||||
pkg theirs, ours;
|
||||
ours.event = cl_event(work.handle);
|
||||
ours.ctx = ctx::current;
|
||||
|
||||
constexpr auto
|
||||
success(std::memory_order_acq_rel),
|
||||
failure(std::memory_order_relaxed);
|
||||
if(unlikely(!offload_pkg.compare_exchange_strong(theirs, ours, success, failure)))
|
||||
{
|
||||
always_assert(false); // Conflict from two ircd::ctx.
|
||||
return;
|
||||
}
|
||||
|
||||
// When finished here the package is cleared immediately without condition.
|
||||
// It's probably unsafe to clear the package with zero cooperation from the
|
||||
// other thread, so we disable interrupts here for now; the other thread
|
||||
// must respond.
|
||||
const ctx::uninterruptible::nothrow ui;
|
||||
const unwind unset{[]
|
||||
{
|
||||
offload_pkg.store(pkg{}, std::memory_order_release);
|
||||
}};
|
||||
|
||||
// Send a "naked wakeup" to the other thread because we're not taking a
|
||||
// lock on this condition variable for the notify. To accomplish this we
|
||||
// fulfill two obligations on this side:
|
||||
// 1. Ensure every package is always followed by a notify after it.
|
||||
// 2. Ensure notify is followed by same release-semantics as an unlock().
|
||||
offload_cond.notify_one();
|
||||
std::atomic_thread_fence(std::memory_order_release); do
|
||||
{
|
||||
// Unconditionally suspend this context. The other thread will have
|
||||
// to respond with at least one notification preceded by a result.
|
||||
ctx::wait();
|
||||
theirs = offload_pkg.load(std::memory_order_acquire);
|
||||
}
|
||||
while(theirs.event != nullptr);
|
||||
assert(theirs.ctx == ctx::current);
|
||||
}
|
||||
|
||||
//
|
||||
|
@ -700,34 +934,24 @@ ircd::cl::work::work(void *const &handle)
|
|||
ircd::cl::work::~work()
|
||||
noexcept try
|
||||
{
|
||||
const auto &handle
|
||||
if(!this->handle)
|
||||
return;
|
||||
|
||||
const auto handle
|
||||
{
|
||||
cl_event(this->handle)
|
||||
};
|
||||
|
||||
if(likely(handle))
|
||||
char status_buf[8] {0};
|
||||
const auto &status
|
||||
{
|
||||
struct handle_event_data hdata;
|
||||
call(clSetEventCallback, handle, CL_COMPLETE, &cl::handle_event, &hdata);
|
||||
info<int>(clGetEventInfo, handle, CL_EVENT_COMMAND_EXECUTION_STATUS, status_buf)
|
||||
};
|
||||
|
||||
char status_buf[8] {0};
|
||||
const auto &status
|
||||
{
|
||||
info<int>(clGetEventInfo, handle, CL_EVENT_COMMAND_EXECUTION_STATUS, status_buf)
|
||||
};
|
||||
if(status != CL_COMPLETE)
|
||||
handle_incomplete(*this, status);
|
||||
|
||||
if(status != CL_COMPLETE)
|
||||
{
|
||||
const ctx::uninterruptible::nothrow ui;
|
||||
while(hdata.c)
|
||||
{
|
||||
ctx::wait();
|
||||
std::atomic_thread_fence(std::memory_order_acquire);
|
||||
}
|
||||
}
|
||||
|
||||
call(clReleaseEvent, cl_event(handle));
|
||||
}
|
||||
call(clReleaseEvent, cl_event(handle));
|
||||
}
|
||||
catch(const std::exception &e)
|
||||
{
|
||||
|
@ -759,29 +983,6 @@ const
|
|||
};
|
||||
}
|
||||
|
||||
void
|
||||
ircd::cl::handle_event(cl_event event,
|
||||
cl_int status,
|
||||
void *const priv)
|
||||
noexcept
|
||||
{
|
||||
auto hdata
|
||||
{
|
||||
reinterpret_cast<handle_event_data *>(priv)
|
||||
};
|
||||
|
||||
const auto c
|
||||
{
|
||||
std::exchange(hdata->c, nullptr)
|
||||
};
|
||||
|
||||
if(likely(c == ctx::current))
|
||||
return;
|
||||
|
||||
ctx::notify(*c);
|
||||
std::atomic_thread_fence(std::memory_order_release);
|
||||
}
|
||||
|
||||
//
|
||||
// callback surface
|
||||
//
|
||||
|
|
|
@ -364,10 +364,10 @@ noexcept try
|
|||
// also occur in ircd::init() or static initialization itself if either are
|
||||
// more appropriate.
|
||||
|
||||
ctx::ole::init _ole_; // Thread OffLoad Engine
|
||||
fs::init _fs_; // Local filesystem
|
||||
cl::init _cl_; // OpenCL
|
||||
magic::init _magic_; // libmagic
|
||||
ctx::ole::init _ole_; // Thread OffLoad Engine
|
||||
magick::init _magick_; // ImageMagick
|
||||
openssl::init _ossl_; // openssl crypto
|
||||
net::init _net_; // Networking
|
||||
|
|
Loading…
Reference in a new issue