0
0
Fork 0
mirror of https://github.com/matrix-construct/construct synced 2024-11-29 10:12:39 +01:00

ircd::db: Enable port; minimal context pool state.

This commit is contained in:
Jason Volk 2018-08-16 04:48:21 -07:00
parent 26a856acb8
commit 6db0659c06
3 changed files with 183 additions and 52 deletions

View file

@ -13,8 +13,37 @@
struct ircd::db::database::env::state
{
struct task;
/// Backreference to database
database &d;
state(database *const &d);
~state() noexcept;
/// Convenience alias of the number of pools
static constexpr const size_t POOLS
{
rocksdb::Env::Priority::TOTAL
};
/// Track of background tasks.
std::array<std::deque<task>, POOLS> tasks;
/// The background task pools.
std::array<ctx::pool, POOLS> pool
{{
// name of pool stack size initial workers
{ "bottom", 128_KiB, 1 },
{ "low", 128_KiB, 1 },
{ "high", 128_KiB, 1 },
}};
state(database *const &d)
:d{*d}
{}
};
struct ircd::db::database::env::state::task
{
void (*func)(void *arg);
void (*cancel)(void *arg);
void *arg;
};

View file

@ -46,6 +46,7 @@
#include <ircd/db/database/env/file_lock.h>
#include <ircd/db/database/env/state.h>
#define IRCD_DB_PORT
// RocksDB port linktime-overriding interfaces (experimental).
#ifdef IRCD_DB_PORT
@ -2230,31 +2231,88 @@ ircd::db::database::env::Schedule(void (*f)(void* arg),
noexcept
{
#ifdef RB_DEBUG_DB_ENV
log.debug("'%s': schedule func:%p a:%p tag:%p u:%p prio:%s",
d.name,
f,
a,
tag,
u,
reflect(prio));
log::debug
{
log, "'%s': schedule func:%p a:%p tag:%p u:%p prio:%s",
d.name,
f,
a,
tag,
u,
reflect(prio)
};
#endif
return defaults.Schedule(f, a, prio, tag, u);
assert(st);
auto &pool
{
st->pool.at(prio)
};
auto &tasks
{
st->tasks.at(prio)
};
tasks.emplace_back(state::task
{
f, u, a
});
pool([this, &tasks]
{
assert(this->st);
if(tasks.empty())
return;
const auto task
{
std::move(tasks.front())
};
tasks.pop_front();
#ifdef RB_DEBUG_DB_ENV
log::debug
{
log, "'%s': func:%p arg:%p",
this->d.name,
task.func,
task.arg,
};
#endif
// Execute the task
task.func(task.arg);
});
}
int
ircd::db::database::env::UnSchedule(void* tag,
Priority pri)
ircd::db::database::env::UnSchedule(void *const tag,
const Priority prio)
noexcept
{
#ifdef RB_DEBUG_DB_ENV
log.debug("'%s': unschedule tag:%p prio:%s",
d.name,
tag,
reflect(pri));
log::debug
{
log, "'%s': unschedule tag:%p prio:%s",
d.name,
tag,
reflect(prio)
};
#endif
return defaults.UnSchedule(tag, pri);
assert(st);
auto &tasks
{
st->tasks.at(prio)
};
size_t i(0);
for(auto it(begin(tasks)); it != end(tasks); it = tasks.erase(it), ++i)
it->cancel(it->arg);
return i;
}
void
@ -2263,12 +2321,16 @@ ircd::db::database::env::StartThread(void (*f)(void*),
noexcept
{
#ifdef RB_DEBUG_DB_ENV
log.debug("'%s': start thread func:%p a:%p",
d.name,
f,
a);
log::debug
{
log, "'%s': start thread func:%p a:%p",
d.name,
f,
a
};
#endif
assert(0);
return defaults.StartThread(f, a);
}
@ -2288,46 +2350,81 @@ noexcept
}
unsigned int
ircd::db::database::env::GetThreadPoolQueueLen(Priority pri)
ircd::db::database::env::GetThreadPoolQueueLen(Priority prio)
const noexcept
{
#ifdef RB_DEBUG_DB_ENV
log.debug("'%s': get thread pool queue len prio:%s",
d.name,
reflect(pri));
log::debug
{
log, "'%s': get thread pool queue len prio:%s",
d.name,
reflect(prio)
};
#endif
return defaults.GetThreadPoolQueueLen(pri);
assert(st);
const auto &pool
{
st->pool.at(prio)
};
return pool.queued();
}
void
ircd::db::database::env::SetBackgroundThreads(int num,
Priority pri)
Priority prio)
noexcept
{
#ifdef RB_DEBUG_DB_ENV
log.debug("'%s': set background threads num:%d prio:%s",
d.name,
num,
reflect(pri));
log::debug
{
log, "'%s': set background threads num:%d prio:%s",
d.name,
num,
reflect(prio)
};
#endif
return defaults.SetBackgroundThreads(num, pri);
assert(st);
auto &pool
{
st->pool.at(prio)
};
const auto &size
{
ssize_t(pool.size())
};
if(size > num)
pool.del(size - num);
else if(size < num)
pool.add(num - size);
}
void
ircd::db::database::env::IncBackgroundThreadsIfNeeded(int num,
Priority pri)
Priority prio)
noexcept
{
#ifdef RB_DEBUG_DB_ENV
log.debug("'%s': increase background threads num:%d prio:%s",
d.name,
num,
reflect(pri));
log::debug
{
log, "'%s': increase background threads num:%d prio:%s",
d.name,
num,
reflect(prio)
};
#endif
return defaults.IncBackgroundThreadsIfNeeded(num, pri);
assert(st);
auto &pool
{
st->pool.at(prio)
};
pool.add(num);
}
void
@ -2387,11 +2484,11 @@ const noexcept
};
#endif
return defaults.GetThreadID();
return ctx::this_ctx::id();
}
int
ircd::db::database::env::GetBackgroundThreads(Priority pri)
ircd::db::database::env::GetBackgroundThreads(Priority prio)
noexcept
{
#ifdef RB_DEBUG_DB_ENV
@ -2399,11 +2496,17 @@ noexcept
{
log, "'%s': get background threads prio:%s",
d.name,
reflect(pri)
reflect(prio)
};
#endif
return defaults.GetBackgroundThreads(pri);
assert(st);
const auto &pool
{
st->pool.at(prio)
};
return pool.size();
}
//
@ -3246,6 +3349,7 @@ rocksdb::port::CondVar::Wait()
assert(mu);
assert_main_thread();
mu->AssertHeld();
std::unique_lock<decltype(mu->mu)> l
{
mu->mu, std::adopt_lock
@ -3267,6 +3371,7 @@ rocksdb::port::CondVar::TimedWait(uint64_t abs_time_us)
assert(mu);
assert_main_thread();
mu->AssertHeld();
const std::chrono::microseconds us(abs_time_us);
const std::chrono::steady_clock::time_point tp(us);
std::unique_lock<decltype(mu->mu)> l
@ -3312,16 +3417,6 @@ rocksdb::port::CondVar::SignalAll()
// db/database/env/state.h
//
ircd::db::database::env::state::state(database *const &d)
:d{*d}
{
}
ircd::db::database::env::state::~state()
noexcept
{
}
///////////////////////////////////////////////////////////////////////////////
//
// db/txn.h

View file

@ -23,6 +23,13 @@
///
//#define RB_DEBUG_DB_SEEK
/// Uncomment or -D this #define to enable extensive log messages for the
/// experimental db environment-port implementation. This is only useful
/// for developers working on the port impl and want to debug all locking
/// and unlocking etc.
///
//#define RB_DEBUG_DB_PORT
namespace ircd::db
{
struct throw_on_error;