mirror of
https://github.com/matrix-construct/construct
synced 2024-11-29 10:12:39 +01:00
ircd::db::prefetcher: Move all counters into a dedicated structure.
This commit is contained in:
parent
99d080767f
commit
5b28f51d1e
2 changed files with 64 additions and 35 deletions
|
@ -18,25 +18,15 @@ namespace ircd::db
|
||||||
|
|
||||||
struct ircd::db::prefetcher
|
struct ircd::db::prefetcher
|
||||||
{
|
{
|
||||||
|
struct ticker;
|
||||||
struct request;
|
struct request;
|
||||||
using closure = std::function<bool (request &)>;
|
using closure = std::function<bool (request &)>;
|
||||||
|
|
||||||
ctx::dock dock;
|
ctx::dock dock;
|
||||||
std::deque<request> queue;
|
std::deque<request> queue;
|
||||||
|
std::unique_ptr<ticker> ticker;
|
||||||
ctx::context context;
|
ctx::context context;
|
||||||
size_t cache_hits {0};
|
|
||||||
size_t request_workers {0};
|
size_t request_workers {0};
|
||||||
size_t request_counter {0};
|
|
||||||
size_t directs_counter {0};
|
|
||||||
size_t handles_counter {0};
|
|
||||||
size_t handled_counter {0};
|
|
||||||
size_t fetches_counter {0};
|
|
||||||
size_t fetched_counter {0};
|
|
||||||
size_t cancels_counter {0};
|
|
||||||
size_t fetched_bytes_key {0};
|
|
||||||
size_t fetched_bytes_val {0};
|
|
||||||
microseconds total_snd_req {0us};
|
|
||||||
microseconds total_req_fin {0us};
|
|
||||||
|
|
||||||
size_t wait_pending();
|
size_t wait_pending();
|
||||||
void request_handle(request &);
|
void request_handle(request &);
|
||||||
|
@ -77,3 +67,29 @@ static_assert
|
||||||
sizeof(ircd::db::prefetcher::request) == 256,
|
sizeof(ircd::db::prefetcher::request) == 256,
|
||||||
"struct ircd::db::prefetcher::request fell out of alignment"
|
"struct ircd::db::prefetcher::request fell out of alignment"
|
||||||
);
|
);
|
||||||
|
|
||||||
|
struct ircd::db::prefetcher::ticker
|
||||||
|
{
|
||||||
|
// montonic event counts
|
||||||
|
size_t queries {0}; ///< All incoming user requests
|
||||||
|
size_t rejects {0}; ///< Queries which were ignored; already cached
|
||||||
|
size_t request {0}; ///< Prefetcher requests added to the queue
|
||||||
|
size_t directs {0}; ///< Direct dispatches to db::request pool
|
||||||
|
size_t handles {0}; ///< Incremented before dispatch to db::request
|
||||||
|
size_t handled {0}; ///< Incremented after dispatch to db::request
|
||||||
|
size_t fetches {0}; ///< Incremented before actual database operation
|
||||||
|
size_t fetched {0}; ///< Incremented after actual database operation
|
||||||
|
size_t cancels {0}; ///< Count of canceled operations
|
||||||
|
|
||||||
|
// throughput totals
|
||||||
|
size_t fetched_bytes_key {0}; ///< Total bytes of key data received
|
||||||
|
size_t fetched_bytes_val {0}; ///< Total bytes of value data received
|
||||||
|
|
||||||
|
// from last operation only
|
||||||
|
microseconds last_snd_req {0us}; ///< duration request was queued here
|
||||||
|
microseconds last_req_fin {0us}; ///< duration for database operation
|
||||||
|
|
||||||
|
// accumulated latency totals
|
||||||
|
microseconds accum_snd_req {0us};
|
||||||
|
microseconds accum_req_fin {0us};
|
||||||
|
};
|
||||||
|
|
59
ircd/db.cc
59
ircd/db.cc
|
@ -3820,7 +3820,11 @@ ircd::db::prefetcher;
|
||||||
// db::prefetcher
|
// db::prefetcher
|
||||||
//
|
//
|
||||||
ircd::db::prefetcher::prefetcher()
|
ircd::db::prefetcher::prefetcher()
|
||||||
:context
|
:ticker
|
||||||
|
{
|
||||||
|
std::make_unique<struct ticker>()
|
||||||
|
}
|
||||||
|
,context
|
||||||
{
|
{
|
||||||
"db.prefetcher",
|
"db.prefetcher",
|
||||||
128_KiB,
|
128_KiB,
|
||||||
|
@ -3860,15 +3864,17 @@ ircd::db::prefetcher::operator()(column &c,
|
||||||
static_cast<database &>(c)
|
static_cast<database &>(c)
|
||||||
};
|
};
|
||||||
|
|
||||||
|
assert(ticker);
|
||||||
|
ticker->queries++;
|
||||||
if(db::cached(c, key, opts))
|
if(db::cached(c, key, opts))
|
||||||
{
|
{
|
||||||
++cache_hits;
|
ticker->rejects++;
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
queue.emplace_back(d, c, key);
|
queue.emplace_back(d, c, key);
|
||||||
queue.back().snd = now<steady_point>();
|
queue.back().snd = now<steady_point>();
|
||||||
++request_counter;
|
ticker->request++;
|
||||||
|
|
||||||
// Branch here based on whether it's not possible to directly dispatch
|
// Branch here based on whether it's not possible to directly dispatch
|
||||||
// a db::request worker. If all request workers are busy we notify our own
|
// a db::request worker. If all request workers are busy we notify our own
|
||||||
|
@ -3891,7 +3897,7 @@ ircd::db::prefetcher::operator()(column &c,
|
||||||
}
|
}
|
||||||
|
|
||||||
const ctx::critical_assertion ca;
|
const ctx::critical_assertion ca;
|
||||||
++directs_counter;
|
ticker->directs++;
|
||||||
this->handle();
|
this->handle();
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
@ -3942,6 +3948,8 @@ ircd::db::prefetcher::cancel(const closure &closure)
|
||||||
if(canceled)
|
if(canceled)
|
||||||
dock.notify_all();
|
dock.notify_all();
|
||||||
|
|
||||||
|
assert(ticker);
|
||||||
|
ticker->cancels += canceled;
|
||||||
return canceled;
|
return canceled;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3956,7 +3964,8 @@ try
|
||||||
if(queue.empty())
|
if(queue.empty())
|
||||||
return false;
|
return false;
|
||||||
|
|
||||||
if(request_counter <= handles_counter)
|
assert(ticker);
|
||||||
|
if(ticker->request <= ticker->handles)
|
||||||
return false;
|
return false;
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
|
@ -3982,9 +3991,9 @@ ircd::db::prefetcher::handle()
|
||||||
std::bind(&prefetcher::request_worker, this)
|
std::bind(&prefetcher::request_worker, this)
|
||||||
};
|
};
|
||||||
|
|
||||||
++handles_counter;
|
ticker->handles++;
|
||||||
db::request(std::move(handler));
|
db::request(std::move(handler));
|
||||||
++handled_counter;
|
ticker->handled++;
|
||||||
}
|
}
|
||||||
|
|
||||||
void
|
void
|
||||||
|
@ -4027,24 +4036,27 @@ ircd::db::prefetcher::request_worker()
|
||||||
if(request == end(queue))
|
if(request == end(queue))
|
||||||
return;
|
return;
|
||||||
|
|
||||||
request->req = now<steady_point>();
|
assert(ticker);
|
||||||
assert(request->fin == steady_point::min());
|
assert(request->fin == steady_point::min());
|
||||||
total_snd_req += duration_cast<microseconds>(request->req - request->snd);
|
request->req = now<steady_point>();
|
||||||
++fetches_counter;
|
ticker->last_snd_req = duration_cast<microseconds>(request->req - request->snd);
|
||||||
|
ticker->accum_snd_req += ticker->last_snd_req;
|
||||||
|
|
||||||
|
ticker->fetches++;
|
||||||
request_handle(*request);
|
request_handle(*request);
|
||||||
assert(request->fin != steady_point::min());
|
assert(request->fin != steady_point::min());
|
||||||
++fetched_counter;
|
ticker->fetched++;
|
||||||
|
|
||||||
#ifdef IRCD_DB_DEBUG_PREFETCH
|
#ifdef IRCD_DB_DEBUG_PREFETCH
|
||||||
log::debug
|
log::debug
|
||||||
{
|
{
|
||||||
log, "prefetcher ch:%zu rc:%zu hc:%zu fc:%zu dc:%zu cc:%zu queue:%zu rw:%zu",
|
log, "prefetcher reject:%zu request:%zu handle:%zu fetch:%zu direct:%zu cancel:%zu queue:%zu rw:%zu",
|
||||||
cache_hits,
|
ticker->rejects,
|
||||||
request_counter,
|
ticker->request,
|
||||||
handles_counter,
|
ticker->handles,
|
||||||
fetches_counter,
|
ticker->fetches,
|
||||||
directs_counter,
|
ticker->directs,
|
||||||
cancels_counter,
|
ticker->cancels,
|
||||||
queue.size(),
|
queue.size(),
|
||||||
this->request_workers,
|
this->request_workers,
|
||||||
};
|
};
|
||||||
|
@ -4085,7 +4097,8 @@ try
|
||||||
|
|
||||||
const ctx::critical_assertion ca;
|
const ctx::critical_assertion ca;
|
||||||
request.fin = now<steady_point>();
|
request.fin = now<steady_point>();
|
||||||
total_req_fin += duration_cast<microseconds>(request.fin - request.req);
|
ticker->last_req_fin = duration_cast<microseconds>(request.fin - request.req);
|
||||||
|
ticker->accum_req_fin += ticker->last_req_fin;
|
||||||
const bool lte
|
const bool lte
|
||||||
{
|
{
|
||||||
valid_lte(*it, key)
|
valid_lte(*it, key)
|
||||||
|
@ -4093,8 +4106,8 @@ try
|
||||||
|
|
||||||
if(likely(lte))
|
if(likely(lte))
|
||||||
{
|
{
|
||||||
fetched_bytes_key += size(it->key());
|
ticker->fetched_bytes_key += size(it->key());
|
||||||
fetched_bytes_val += size(it->value());
|
ticker->fetched_bytes_val += size(it->value());
|
||||||
}
|
}
|
||||||
|
|
||||||
#ifdef IRCD_DB_DEBUG_PREFETCH
|
#ifdef IRCD_DB_DEBUG_PREFETCH
|
||||||
|
@ -4139,7 +4152,7 @@ ircd::db::prefetcher::wait_pending()
|
||||||
{
|
{
|
||||||
const size_t fetched_counter
|
const size_t fetched_counter
|
||||||
{
|
{
|
||||||
this->fetched_counter
|
ticker->fetched
|
||||||
};
|
};
|
||||||
|
|
||||||
const size_t fetched_target
|
const size_t fetched_target
|
||||||
|
@ -4149,7 +4162,7 @@ ircd::db::prefetcher::wait_pending()
|
||||||
|
|
||||||
dock.wait([this, &fetched_target]
|
dock.wait([this, &fetched_target]
|
||||||
{
|
{
|
||||||
return this->fetched_counter >= fetched_target;
|
return this->ticker->fetched >= fetched_target;
|
||||||
});
|
});
|
||||||
|
|
||||||
assert(fetched_target >= fetched_counter);
|
assert(fetched_target >= fetched_counter);
|
||||||
|
|
Loading…
Reference in a new issue