0
0
Fork 0
mirror of https://github.com/matrix-construct/construct synced 2024-12-27 07:54:05 +01:00

ircd::db: Add direct-dispatch if request worker available; various optimizations.

This commit is contained in:
Jason Volk 2019-09-18 16:28:51 -07:00
parent e2ed860c04
commit 9d2e506253
2 changed files with 203 additions and 73 deletions

View file

@ -24,15 +24,19 @@ struct ircd::db::prefetcher
ctx::dock dock; ctx::dock dock;
std::deque<request> queue; std::deque<request> queue;
ctx::context context; ctx::context context;
size_t handles {0};
size_t request_workers {0}; size_t request_workers {0};
size_t request_counter {0}; size_t request_counter {0};
size_t directs_counter {0};
size_t handles_counter {0}; size_t handles_counter {0};
size_t handled_counter {0};
size_t fetches_counter {0};
size_t fetched_counter {0}; size_t fetched_counter {0};
size_t cancels_counter {0}; size_t cancels_counter {0};
size_t cache_hits {0};
size_t wait_pending(); size_t wait_pending();
void request_handle(request &); void request_handle(request &);
size_t request_cleanup() noexcept;
void request_worker(); void request_worker();
void handle(); void handle();
void worker(); void worker();
@ -50,8 +54,22 @@ struct ircd::db::prefetcher
struct ircd::db::prefetcher::request struct ircd::db::prefetcher::request
{ {
std::string key; database *d {nullptr}; // database instance
database *d {nullptr}; uint32_t cid {0}; // column ID
steady_point start; uint32_t len {0}; // length of key
uint32_t cid {0}; steady_point snd; // submitted by user
steady_point req; // request sent to database
steady_point fin; // result from database
char key[208] alignas(16); // key buffer
explicit operator string_view() const noexcept;
request(database &d, const column &c, const string_view &key) noexcept;
request() = default;
}; };
static_assert
(
sizeof(ircd::db::prefetcher::request) == 256,
"struct ircd::db::prefetcher::request fell out of alignment"
);

View file

@ -3860,22 +3860,30 @@ ircd::db::prefetcher::operator()(column &c,
static_cast<database &>(c) static_cast<database &>(c)
}; };
const auto start if(db::cached(c, key, opts))
{ {
#ifdef IRCD_DB_DEBUG_PREFETCH ++cache_hits;
now<steady_point>() return false;
#else }
steady_point{}
#endif
};
queue.emplace_back(request
{
key, &d, now<steady_point>(), db::id(c)
});
const ctx::critical_assertion ca;
queue.emplace_back(d, c, key);
queue.back().snd = now<steady_point>();
++request_counter; ++request_counter;
dock.notify_one();
// Branch here based on whether it's not possible to directly dispatch
// a db::request worker. If all request workers are busy we notify our own
// prefetcher worker, and then it blocks on submitting to the request
// worker instead of us blocking here. This is done to avoid use and growth
// of any request pool queue, and allow for more direct submission.
if(db::request.wouldblock())
{
dock.notify_one();
return true;
}
++directs_counter;
this->handle();
return true; return true;
} }
@ -3902,25 +3910,25 @@ ircd::db::prefetcher::cancel(database &d)
size_t size_t
ircd::db::prefetcher::cancel(const closure &closure) ircd::db::prefetcher::cancel(const closure &closure)
{ {
const auto e size_t canceled(0);
for(auto &request : queue)
{ {
std::remove_if(begin(queue), end(queue), closure) // already finished
}; if(request.fin != steady_point::min())
continue;
const ssize_t remain // in progress; can't cancel
{ if(request.req != steady_point::min())
std::distance(begin(queue), e) continue;
};
assert(remain >= 0); // allow user to accept or reject
const ssize_t canceled if(!closure(request))
{ continue;
ssize_t(queue.size()) - remain
};
assert(canceled >= 0); // cancel by precociously setting the finish time.
queue.resize(remain); request.fin = now<steady_point>();
cancels_counter += canceled; ++canceled;
}
if(canceled) if(canceled)
dock.notify_all(); dock.notify_all();
@ -3936,7 +3944,13 @@ try
{ {
dock.wait([this] dock.wait([this]
{ {
return !queue.empty() && request_counter > handles_counter; if(queue.empty())
return false;
if(request_counter <= handles_counter)
return false;
return true;
}); });
handle(); handle();
@ -3954,18 +3968,14 @@ catch(const std::exception &e)
void void
ircd::db::prefetcher::handle() ircd::db::prefetcher::handle()
{ {
const scope_count handles
{
this->handles
};
auto handler auto handler
{ {
std::bind(&prefetcher::request_worker, this) std::bind(&prefetcher::request_worker, this)
}; };
db::request(std::move(handler));
++handles_counter; ++handles_counter;
db::request(std::move(handler));
++handled_counter;
} }
void void
@ -3981,17 +3991,65 @@ ircd::db::prefetcher::request_worker()
this->request_workers this->request_workers
}; };
if(unlikely(queue.empty())) // Garbage collection of the queue invoked unconditionally on unwind.
return; const unwind cleanup_on_leave
auto request
{ {
std::move(queue.front()) std::bind(&prefetcher::request_cleanup, this)
}; };
queue.pop_front(); // GC the queue here to get rid of any cancelled requests which have
request_handle(request); // arrived at the front so they don't become our request.
const size_t cleanup_on_enter
{
request_cleanup()
};
// Find the first request in the queue which does not have its req
// timestamp sent.
auto request
{
std::find_if(begin(queue), end(queue), []
(const auto &request)
{
return request.req == steady_point::min();
})
};
if(request == end(queue))
return;
request->req = now<steady_point>();
assert(request->fin == steady_point::min());
++fetches_counter;
request_handle(*request);
++fetched_counter; ++fetched_counter;
#ifdef IRCD_DB_DEBUG_PREFETCH
log::debug
{
log, "prefetcher ch:%zu rc:%zu hc:%zu fc:%zu dc:%zu cc:%zu queue:%zu rw:%zu",
cache_hits,
request_counter,
handles_counter,
fetches_counter,
directs_counter,
cancels_counter,
queue.size(),
this->request_workers,
};
#endif
}
size_t
ircd::db::prefetcher::request_cleanup()
noexcept
{
size_t removed(0);
const ctx::critical_assertion ca;
for(; !queue.empty() && queue.front().fin != steady_point::min(); ++removed)
queue.pop_front();
return removed;
} }
void void
@ -4004,44 +4062,45 @@ try
(*request.d)[request.cid] (*request.d)[request.cid]
}; };
const bool has const string_view key
{ {
db::has(column, request.key) request
}; };
const auto it
{
seek(column, key, gopts{})
};
const bool lte
{
valid_lte(*it, key)
};
const ctx::critical_assertion ca;
request.fin = now<steady_point>();
#ifdef IRCD_DB_DEBUG_PREFETCH #ifdef IRCD_DB_DEBUG_PREFETCH
char pbuf[32]; char pbuf[3][32];
log::debug log::debug
{ {
log, "[%s][%s] completed fetch:%b queue:%zu h:%zu rw:%zu rc:%zu hc:%zu fc:%zu cc:%zu in %s", log, "[%s][%s] completed prefetch len:%zu lte:%b snd-req:%s req-fin:%s snd-fin:%s queue:%zu",
name(*request.d), name(*request.d),
name(column), name(column),
has? false : true, size(key),
lte,
pretty(pbuf[0], request.req - request.snd, 1),
pretty(pbuf[1], request.fin - request.req, 1),
pretty(pbuf[2], request.fin - request.snd, 1),
queue.size(), queue.size(),
this->handles,
this->request_workers,
this->request_counter,
this->handles_counter,
this->fetched_counter,
this->cancels_counter,
pretty(pbuf, now<steady_point>() - request.start, 1),
}; };
#endif #endif
} }
catch(const not_found &e)
{
assert(request.d);
log::dwarning
{
log, "[%s][%u] :%s",
name(*request.d),
request.cid,
e.what(),
};
}
catch(const std::exception &e) catch(const std::exception &e)
{ {
assert(request.d); assert(request.d);
request.fin = now<steady_point>();
log::error log::error
{ {
log, "[%s][%u] :%s", log, "[%s][%u] :%s",
@ -4050,6 +4109,11 @@ catch(const std::exception &e)
e.what(), e.what(),
}; };
} }
catch(...)
{
request.fin = now<steady_point>();
throw;
}
size_t size_t
ircd::db::prefetcher::wait_pending() ircd::db::prefetcher::wait_pending()
@ -4073,6 +4137,57 @@ ircd::db::prefetcher::wait_pending()
return fetched_target - fetched_counter; return fetched_target - fetched_counter;
} }
//
// prefetcher::request
//
ircd::db::prefetcher::request::request(database &d,
const column &c,
const string_view &key)
noexcept
:d
{
std::addressof(d)
}
,cid
{
db::id(c)
}
,len
{
uint32_t(std::min(size(key), sizeof(this->key)))
}
,snd
{
steady_point::min()
}
,req
{
steady_point::min()
}
,fin
{
steady_point::min()
}
{
const size_t &len
{
buffer::copy(this->key, key)
};
assert(this->len == len);
}
ircd::db::prefetcher::request::operator
ircd::string_view()
const noexcept
{
return
{
key, len
};
}
/////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////
// //
// db/txn.h // db/txn.h
@ -6109,9 +6224,6 @@ ircd::db::prefetch(column &column,
const string_view &key, const string_view &key,
const gopts &gopts) const gopts &gopts)
{ {
if(cached(column, key, gopts))
return false;
static construction instance static construction instance
{ {
[] { prefetcher = new struct prefetcher(); } [] { prefetcher = new struct prefetcher(); }