0
0
Fork 0
mirror of https://github.com/matrix-construct/construct synced 2024-12-27 07:54:05 +01:00

ircd::db: Move prefetcher to header; reorg definitions; minor reorg.

This commit is contained in:
Jason Volk 2019-09-18 15:15:02 -07:00
parent 362122c951
commit 35d39a7d70
4 changed files with 491 additions and 476 deletions

View file

@ -56,6 +56,7 @@ namespace ircd::db
#include "row.h"
#include "json.h"
#include "txn.h"
#include "prefetcher.h"
#include "stats.h"
//

View file

@ -0,0 +1,57 @@
// Matrix Construct
//
// Copyright (C) Matrix Construct Developers, Authors & Contributors
// Copyright (C) 2016-2019 Jason Volk <jason@zemos.net>
//
// Permission to use, copy, modify, and/or distribute this software for any
// purpose with or without fee is hereby granted, provided that the above
// copyright notice and this permission notice is present in all copies. The
// full license for this software is available in the LICENSE file.
#pragma once
#define HAVE_IRCD_DB_PREFETCHER_H
namespace ircd::db
{
struct prefetcher extern *prefetcher;
}
struct ircd::db::prefetcher
{
struct request;
using closure = std::function<bool (request &)>;
ctx::dock dock;
std::deque<request> queue;
ctx::context context;
size_t handles {0};
size_t request_workers {0};
size_t request_counter {0};
size_t handles_counter {0};
size_t fetched_counter {0};
size_t cancels_counter {0};
size_t wait_pending();
void request_handle(request &);
void request_worker();
void handle();
void worker();
public:
size_t cancel(const closure &);
size_t cancel(database &); // Cancel all for db
size_t cancel(column &); // Cancel all for column
bool operator()(column &, const string_view &key, const gopts &);
prefetcher();
~prefetcher() noexcept;
};
struct ircd::db::prefetcher::request
{
std::string key;
database *d {nullptr};
steady_point start;
uint32_t cid {0};
};

View file

@ -2358,175 +2358,6 @@ catch(const std::exception &e)
return false;
}
///////////////////////////////////////////////////////////////////////////////
//
// db/stats.h
//
std::string
ircd::db::string(const rocksdb::IOStatsContext &ic,
const bool &all)
{
const bool exclude_zeros(!all);
return ic.ToString(exclude_zeros);
}
const rocksdb::IOStatsContext &
ircd::db::iostats_current()
{
const auto *const &ret
{
rocksdb::get_iostats_context()
};
if(unlikely(!ret))
throw error
{
"IO counters are not available on this thread."
};
return *ret;
}
std::string
ircd::db::string(const rocksdb::PerfContext &pc,
const bool &all)
{
const bool exclude_zeros(!all);
return pc.ToString(exclude_zeros);
}
const rocksdb::PerfContext &
ircd::db::perf_current()
{
const auto *const &ret
{
rocksdb::get_perf_context()
};
if(unlikely(!ret))
throw error
{
"Performance counters are not available on this thread."
};
return *ret;
}
void
ircd::db::perf_level(const uint &level)
{
if(level >= rocksdb::PerfLevel::kOutOfBounds)
throw error
{
"Perf level of '%u' is invalid; maximum is '%u'",
level,
uint(rocksdb::PerfLevel::kOutOfBounds)
};
rocksdb::SetPerfLevel(rocksdb::PerfLevel(level));
}
uint
ircd::db::perf_level()
{
return rocksdb::GetPerfLevel();
}
//
// ticker
//
uint64_t
ircd::db::ticker(const database &d,
const string_view &key)
{
return ticker(d, ticker_id(key));
}
uint64_t
ircd::db::ticker(const database &d,
const uint32_t &id)
{
return d.stats->getTickerCount(id);
}
uint32_t
ircd::db::ticker_id(const string_view &key)
{
for(const auto &pair : rocksdb::TickersNameMap)
if(key == pair.second)
return pair.first;
throw std::out_of_range
{
"No ticker with that key"
};
}
ircd::string_view
ircd::db::ticker_id(const uint32_t &id)
{
for(const auto &pair : rocksdb::TickersNameMap)
if(id == pair.first)
return pair.second;
return {};
}
decltype(ircd::db::ticker_max)
ircd::db::ticker_max
{
rocksdb::TICKER_ENUM_MAX
};
//
// histogram
//
const struct ircd::db::histogram &
ircd::db::histogram(const database &d,
const string_view &key)
{
return histogram(d, histogram_id(key));
}
const struct ircd::db::histogram &
ircd::db::histogram(const database &d,
const uint32_t &id)
{
return d.stats->histogram.at(id);
}
uint32_t
ircd::db::histogram_id(const string_view &key)
{
for(const auto &pair : rocksdb::HistogramsNameMap)
if(key == pair.second)
return pair.first;
throw std::out_of_range
{
"No histogram with that key"
};
}
ircd::string_view
ircd::db::histogram_id(const uint32_t &id)
{
for(const auto &pair : rocksdb::HistogramsNameMap)
if(id == pair.first)
return pair.second;
return {};
}
decltype(ircd::db::histogram_max)
ircd::db::histogram_max
{
rocksdb::HISTOGRAM_ENUM_MAX
};
///////////////////////////////////////////////////////////////////////////////
//
// database::stats (db/database/stats.h) internal
@ -3808,6 +3639,439 @@ ircd::db::database::wal::info::operator=(const rocksdb::LogFile &lf)
return *this;
}
///////////////////////////////////////////////////////////////////////////////
//
// db/stats.h
//
std::string
ircd::db::string(const rocksdb::IOStatsContext &ic,
const bool &all)
{
const bool exclude_zeros(!all);
return ic.ToString(exclude_zeros);
}
const rocksdb::IOStatsContext &
ircd::db::iostats_current()
{
const auto *const &ret
{
rocksdb::get_iostats_context()
};
if(unlikely(!ret))
throw error
{
"IO counters are not available on this thread."
};
return *ret;
}
std::string
ircd::db::string(const rocksdb::PerfContext &pc,
const bool &all)
{
const bool exclude_zeros(!all);
return pc.ToString(exclude_zeros);
}
const rocksdb::PerfContext &
ircd::db::perf_current()
{
const auto *const &ret
{
rocksdb::get_perf_context()
};
if(unlikely(!ret))
throw error
{
"Performance counters are not available on this thread."
};
return *ret;
}
void
ircd::db::perf_level(const uint &level)
{
if(level >= rocksdb::PerfLevel::kOutOfBounds)
throw error
{
"Perf level of '%u' is invalid; maximum is '%u'",
level,
uint(rocksdb::PerfLevel::kOutOfBounds)
};
rocksdb::SetPerfLevel(rocksdb::PerfLevel(level));
}
uint
ircd::db::perf_level()
{
return rocksdb::GetPerfLevel();
}
//
// ticker
//
uint64_t
ircd::db::ticker(const database &d,
const string_view &key)
{
return ticker(d, ticker_id(key));
}
uint64_t
ircd::db::ticker(const database &d,
const uint32_t &id)
{
return d.stats->getTickerCount(id);
}
uint32_t
ircd::db::ticker_id(const string_view &key)
{
for(const auto &pair : rocksdb::TickersNameMap)
if(key == pair.second)
return pair.first;
throw std::out_of_range
{
"No ticker with that key"
};
}
ircd::string_view
ircd::db::ticker_id(const uint32_t &id)
{
for(const auto &pair : rocksdb::TickersNameMap)
if(id == pair.first)
return pair.second;
return {};
}
decltype(ircd::db::ticker_max)
ircd::db::ticker_max
{
rocksdb::TICKER_ENUM_MAX
};
//
// histogram
//
const struct ircd::db::histogram &
ircd::db::histogram(const database &d,
const string_view &key)
{
return histogram(d, histogram_id(key));
}
const struct ircd::db::histogram &
ircd::db::histogram(const database &d,
const uint32_t &id)
{
return d.stats->histogram.at(id);
}
uint32_t
ircd::db::histogram_id(const string_view &key)
{
for(const auto &pair : rocksdb::HistogramsNameMap)
if(key == pair.second)
return pair.first;
throw std::out_of_range
{
"No histogram with that key"
};
}
ircd::string_view
ircd::db::histogram_id(const uint32_t &id)
{
for(const auto &pair : rocksdb::HistogramsNameMap)
if(id == pair.first)
return pair.second;
return {};
}
decltype(ircd::db::histogram_max)
ircd::db::histogram_max
{
rocksdb::HISTOGRAM_ENUM_MAX
};
///////////////////////////////////////////////////////////////////////////////
//
// db/prefetcher.h
//
decltype(ircd::db::prefetcher)
ircd::db::prefetcher;
//
// db::prefetcher
//
ircd::db::prefetcher::prefetcher()
:context
{
"db.prefetcher",
128_KiB,
context::POST,
std::bind(&prefetcher::worker, this)
}
{
}
ircd::db::prefetcher::~prefetcher()
noexcept
{
while(!queue.empty())
{
log::warning
{
log, "Prefetcher waiting for %zu requests to clear...",
queue.size(),
};
dock.wait_for(seconds(5), [this]
{
return queue.empty();
});
}
assert(queue.empty());
}
bool
ircd::db::prefetcher::operator()(column &c,
const string_view &key,
const gopts &opts)
{
auto &d
{
static_cast<database &>(c)
};
const auto start
{
#ifdef IRCD_DB_DEBUG_PREFETCH
now<steady_point>()
#else
steady_point{}
#endif
};
queue.emplace_back(request
{
key, &d, now<steady_point>(), db::id(c)
});
++request_counter;
dock.notify_one();
return true;
}
size_t
ircd::db::prefetcher::cancel(column &c)
{
return cancel([&c]
(const auto &request)
{
return request.cid == id(c);
});
}
size_t
ircd::db::prefetcher::cancel(database &d)
{
return cancel([&d]
(const auto &request)
{
return request.d == std::addressof(d);
});
}
size_t
ircd::db::prefetcher::cancel(const closure &closure)
{
const auto e
{
std::remove_if(begin(queue), end(queue), closure)
};
const ssize_t remain
{
std::distance(begin(queue), e)
};
assert(remain >= 0);
const ssize_t canceled
{
ssize_t(queue.size()) - remain
};
assert(canceled >= 0);
queue.resize(remain);
cancels_counter += canceled;
if(canceled)
dock.notify_all();
return canceled;
}
void
ircd::db::prefetcher::worker()
try
{
while(1)
{
dock.wait([this]
{
return !queue.empty() && request_counter > handles_counter;
});
handle();
}
}
catch(const std::exception &e)
{
log::critical
{
log, "prefetcher worker: %s",
e.what()
};
}
void
ircd::db::prefetcher::handle()
{
const scope_count handles
{
this->handles
};
auto handler
{
std::bind(&prefetcher::request_worker, this)
};
db::request(std::move(handler));
++handles_counter;
}
void
ircd::db::prefetcher::request_worker()
{
const ctx::scope_notify notify
{
this->dock
};
const scope_count request_workers
{
this->request_workers
};
if(unlikely(queue.empty()))
return;
auto request
{
std::move(queue.front())
};
queue.pop_front();
request_handle(request);
++fetched_counter;
}
void
ircd::db::prefetcher::request_handle(request &request)
try
{
assert(request.d);
db::column column
{
(*request.d)[request.cid]
};
const bool has
{
db::has(column, request.key)
};
#ifdef IRCD_DB_DEBUG_PREFETCH
char pbuf[32];
log::debug
{
log, "[%s][%s] completed fetch:%b queue:%zu h:%zu rw:%zu rc:%zu hc:%zu fc:%zu cc:%zu in %s",
name(*request.d),
name(column),
has? false : true,
queue.size(),
this->handles,
this->request_workers,
this->request_counter,
this->handles_counter,
this->fetched_counter,
this->cancels_counter,
pretty(pbuf, now<steady_point>() - request.start, 1),
};
#endif
}
catch(const not_found &e)
{
assert(request.d);
log::dwarning
{
log, "[%s][%u] :%s",
name(*request.d),
request.cid,
e.what(),
};
}
catch(const std::exception &e)
{
assert(request.d);
log::error
{
log, "[%s][%u] :%s",
name(*request.d),
request.cid,
e.what(),
};
}
size_t
ircd::db::prefetcher::wait_pending()
{
const size_t fetched_counter
{
this->fetched_counter
};
const size_t fetched_target
{
fetched_counter + request_workers
};
dock.wait([this, &fetched_target]
{
return this->fetched_counter >= fetched_target;
});
assert(fetched_target >= fetched_counter);
return fetched_target - fetched_counter;
}
///////////////////////////////////////////////////////////////////////////////
//
@ -5852,267 +6116,6 @@ ircd::db::prefetch(column &column,
return (*prefetcher)(column, key, gopts);
}
//
// db::prefetcher
//
decltype(ircd::db::prefetcher)
ircd::db::prefetcher;
ircd::db::prefetcher::prefetcher()
:context
{
"db.prefetcher",
128_KiB,
context::POST,
std::bind(&prefetcher::worker, this)
}
{
}
ircd::db::prefetcher::~prefetcher()
noexcept
{
while(!queue.empty())
{
log::warning
{
log, "Prefetcher waiting for %zu requests to clear...",
queue.size(),
};
dock.wait_for(seconds(5), [this]
{
return queue.empty();
});
}
assert(queue.empty());
}
bool
ircd::db::prefetcher::operator()(column &c,
const string_view &key,
const gopts &opts)
{
auto &d
{
static_cast<database &>(c)
};
const auto start
{
#ifdef IRCD_DB_DEBUG_PREFETCH
now<steady_point>()
#else
steady_point{}
#endif
};
queue.emplace_back(request
{
key, &d, now<steady_point>(), db::id(c)
});
++request_counter;
dock.notify_one();
return true;
}
size_t
ircd::db::prefetcher::cancel(column &c)
{
return cancel([&c]
(const auto &request)
{
return request.cid == id(c);
});
}
size_t
ircd::db::prefetcher::cancel(database &d)
{
return cancel([&d]
(const auto &request)
{
return request.d == std::addressof(d);
});
}
size_t
ircd::db::prefetcher::cancel(const closure &closure)
{
const auto e
{
std::remove_if(begin(queue), end(queue), closure)
};
const ssize_t remain
{
std::distance(begin(queue), e)
};
assert(remain >= 0);
const ssize_t canceled
{
ssize_t(queue.size()) - remain
};
assert(canceled >= 0);
queue.resize(remain);
cancels_counter += canceled;
if(canceled)
dock.notify_all();
return canceled;
}
void
ircd::db::prefetcher::worker()
try
{
while(1)
{
dock.wait([this]
{
return !queue.empty() && request_counter > handles_counter;
});
handle();
}
}
catch(const std::exception &e)
{
log::critical
{
log, "prefetcher worker: %s",
e.what()
};
}
void
ircd::db::prefetcher::handle()
{
const scope_count handles
{
this->handles
};
auto handler
{
std::bind(&prefetcher::request_worker, this)
};
db::request(std::move(handler));
++handles_counter;
}
void
ircd::db::prefetcher::request_worker()
{
const ctx::scope_notify notify
{
this->dock
};
const scope_count request_workers
{
this->request_workers
};
if(unlikely(queue.empty()))
return;
auto request
{
std::move(queue.front())
};
queue.pop_front();
request_handle(request);
++fetched_counter;
}
void
ircd::db::prefetcher::request_handle(request &request)
try
{
assert(request.d);
db::column column
{
(*request.d)[request.cid]
};
const bool has
{
db::has(column, request.key)
};
#ifdef IRCD_DB_DEBUG_PREFETCH
char pbuf[32];
log::debug
{
log, "[%s][%s] completed fetch:%b queue:%zu h:%zu rw:%zu rc:%zu hc:%zu fc:%zu cc:%zu in %s",
name(*request.d),
name(column),
has? false : true,
queue.size(),
this->handles,
this->request_workers,
this->request_counter,
this->handles_counter,
this->fetched_counter,
this->cancels_counter,
pretty(pbuf, now<steady_point>() - request.start, 1),
};
#endif
}
catch(const not_found &e)
{
assert(request.d);
log::dwarning
{
log, "[%s][%u] :%s",
name(*request.d),
request.cid,
e.what(),
};
}
catch(const std::exception &e)
{
assert(request.d);
log::error
{
log, "[%s][%u] :%s",
name(*request.d),
request.cid,
e.what(),
};
}
size_t
ircd::db::prefetcher::wait_pending()
{
const size_t fetched_counter
{
this->fetched_counter
};
const size_t fetched_target
{
fetched_counter + request_workers
};
dock.wait([this, &fetched_target]
{
return this->fetched_counter >= fetched_target;
});
assert(fetched_target >= fetched_counter);
return fetched_target - fetched_counter;
}
//
// db::cached
//

View file

@ -56,7 +56,6 @@ namespace ircd::db
{
struct throw_on_error;
struct error_to_status;
struct prefetcher;
constexpr const auto BLOCKING { rocksdb::ReadTier::kReadAllTier };
constexpr const auto NON_BLOCKING { rocksdb::ReadTier::kBlockCacheTier };
@ -68,7 +67,6 @@ namespace ircd::db
extern ctx::pool::opts request_pool_opts;
extern ctx::pool request;
extern ctx::mutex write_mutex;
extern db::prefetcher *prefetcher;
// reflections
string_view reflect(const rocksdb::Status::Severity &);
@ -187,47 +185,3 @@ struct ircd::db::txn::handler
,cb{cb}
{}
};
//
// prefetcher
//
struct ircd::db::prefetcher
{
struct request;
using closure = std::function<bool (request &)>;
ctx::dock dock;
std::deque<request> queue;
ctx::context context;
size_t handles {0};
size_t request_workers {0};
size_t request_counter {0};
size_t handles_counter {0};
size_t fetched_counter {0};
size_t cancels_counter {0};
size_t wait_pending();
void request_handle(request &);
void request_worker();
void handle();
void worker();
public:
size_t cancel(const closure &);
size_t cancel(database &); // Cancel all for db
size_t cancel(column &); // Cancel all for column
bool operator()(column &, const string_view &key, const gopts &);
prefetcher();
~prefetcher() noexcept;
};
struct ircd::db::prefetcher::request
{
std::string key;
database *d {nullptr};
steady_point start;
uint32_t cid {0};
};