0
0
Fork 0
mirror of https://github.com/matrix-construct/construct synced 2025-01-13 16:33:53 +01:00

ircd::db: Reorg environment task pool related.

This commit is contained in:
Jason Volk 2018-12-16 16:26:33 -08:00
parent 2a23d9e6dd
commit 55023041b8
2 changed files with 194 additions and 119 deletions

View file

@ -14,25 +14,41 @@
struct ircd::db::database::env::state
{
struct task;
struct pool;
/// Backreference to database
database &d;
/// Convenience alias of the number of pools
static constexpr const size_t POOLS
{
rocksdb::Env::Priority::TOTAL
};
static conf::item<size_t> pool_stack_size;
database &d;
std::array<std::unique_ptr<pool>, POOLS> pool;
std::array<std::deque<task>, POOLS> tasks;
std::array<ctx::pool, POOLS> pool;
state(database *const &d);
state(database *const &);
~state() noexcept;
};
struct ircd::db::database::env::state::pool
{
using Priority = rocksdb::Env::Priority;
static conf::item<size_t> stack_size;
database &d;
Priority pri;
std::deque<task> tasks;
ctx::pool p;
size_t cancel(void *const &tag);
void operator()(task &&);
void wait();
void join();
pool(database &, const Priority &);
~pool() noexcept;
};
struct ircd::db::database::env::state::task
{
void (*func)(void *arg);

View file

@ -3415,6 +3415,11 @@ st{std::make_unique<state>(d)}
ircd::db::database::env::~env()
noexcept
{
log::debug
{
log, "'%s': Shutting down environment...",
d.name
};
}
rocksdb::Status
@ -4181,47 +4186,13 @@ noexcept try
assert(st);
auto &pool
{
st->pool.at(prio)
*st->pool.at(prio)
};
auto &tasks
{
st->tasks.at(prio)
};
tasks.emplace_back(state::task
pool(state::task
{
f, u, a
});
pool([this, &tasks]
{
const ctx::uninterruptible::nothrow ui;
assert(this->st);
if(tasks.empty())
return;
const auto task
{
std::move(tasks.front())
};
tasks.pop_front();
#ifdef RB_DEBUG_DB_ENV
log::debug
{
log, "'%s': func:%p arg:%p",
this->d.name,
task.func,
task.arg,
};
#endif
// Execute the task
task.func(task.arg);
});
}
catch(const std::exception &e)
{
@ -4255,16 +4226,12 @@ noexcept try
#endif
assert(st);
auto &tasks
auto &pool
{
st->tasks.at(prio)
*st->pool.at(prio)
};
size_t i(0);
for(auto it(begin(tasks)); it != end(tasks); it = tasks.erase(it), ++i)
it->cancel(it->arg);
return i;
return pool.cancel(tag);
}
catch(const std::exception &e)
{
@ -4319,7 +4286,8 @@ noexcept try
assert(st);
for(auto &pool : st->pool)
pool.join();
if(pool)
pool->join();
}
catch(const std::exception &e)
{
@ -4349,10 +4317,10 @@ const noexcept try
assert(st);
const auto &pool
{
st->pool.at(prio)
*st->pool.at(prio)
};
return pool.queued();
return pool.tasks.size();
}
catch(const std::exception &e)
{
@ -4374,35 +4342,29 @@ noexcept try
#ifdef RB_DEBUG_DB_ENV
log::debug
{
log, "'%s': set background threads num:%d prio:%s",
log, "'%s': set background threads prio:%s num:%d",
d.name,
num,
reflect(prio)
reflect(prio),
num
};
#endif
assert(st);
auto &pool
{
st->pool.at(prio)
*st->pool.at(prio)
};
const auto &size
{
ssize_t(pool.size())
};
if(size > num)
pool.del(size - num);
else if(size < num)
pool.add(num - size);
pool.p.set(num);
}
catch(const std::exception &e)
{
log::critical
{
log, "'%s': set background threads :%s",
log, "'%s': set background threads prio:%s num:%d :%s",
d.name,
reflect(prio),
num,
e.what()
};
}
@ -4427,10 +4389,10 @@ noexcept try
assert(st);
auto &pool
{
st->pool.at(prio)
*st->pool.at(prio)
};
pool.add(num);
pool.p.add(num);
}
catch(const std::exception &e)
{
@ -4577,10 +4539,10 @@ noexcept try
assert(st);
const auto &pool
{
st->pool.at(prio)
*st->pool.at(prio)
};
return pool.size();
return pool.p.size();
}
catch(const std::exception &e)
{
@ -7353,74 +7315,171 @@ noexcept
// db/database/env/state.h
//
decltype(ircd::db::database::env::state::pool_stack_size)
ircd::db::database::env::state::pool_stack_size
{
{ "name", "ircd.db.env.pool.stack_size" },
{ "default", long(128_KiB) },
};
//
// env::state::state
//
ircd::db::database::env::state::state(database *const &d)
:d{*d}
,pool
{{
// name of pool stack size initial workers
{ "rdb bott", size_t(pool_stack_size), 0 },
{ "rdb low", size_t(pool_stack_size), 0 },
{ "rdb high", size_t(pool_stack_size), 0 },
}}
{
for(size_t i(0); i < pool.size(); ++i)
pool.at(i) = std::make_unique<struct pool>(this->d, Priority(i));
}
ircd::db::database::env::state::~state()
noexcept
{
for(size_t i(0); i < POOLS; ++i) try
{
auto &tasks(this->tasks.at(i));
auto &pool(this->pool.at(i));
if(tasks.size() || pool.pending())
log::warning
{
log, "'%s': Waiting for tasks:%zu queued:%zu active:%zu in pool '%s'",
d.name,
tasks.size(),
pool.queued(),
pool.active(),
pool.name
};
}
pool.q.dock.wait([&tasks, &pool]
//
// state::pool
//
decltype(ircd::db::database::env::state::pool::stack_size)
ircd::db::database::env::state::pool::stack_size
{
{ "name", "ircd.db.env.pool.stack_size" },
{ "default", long(128_KiB) },
};
//
// state::pool::pool
//
ircd::db::database::env::state::pool::pool(database &d,
const Priority &pri)
:d{d}
,pri{pri}
,p
{
reflect(pri), // name of pool
size_t(stack_size), // stack size of worker
0, // initial workers
}
{
}
ircd::db::database::env::state::pool::~pool()
noexcept
{
join();
}
void
ircd::db::database::env::state::pool::join()
try
{
if(!tasks.empty() || p.pending())
log::warning
{
return tasks.empty() && !pool.pending();
});
log, "'%s': Waiting for tasks:%zu queued:%zu active:%zu in pool '%s'",
d.name,
tasks.size(),
p.queued(),
p.active(),
ctx::name(p)
};
pool.terminate();
pool.join();
this->wait();
assert(!p.pending());
assert(tasks.empty());
p.join();
log::debug
{
log, "'%s': Terminated pool '%s'.",
d.name,
ctx::name(p)
};
}
catch(const std::exception &e)
{
log::critical
{
log, "'%s': Environment pool '%s' join :%s",
d.name,
ctx::name(p),
e.what()
};
throw;
}
void
ircd::db::database::env::state::pool::wait()
{
ctx::dock &dock(p);
dock.wait([this]
{
return tasks.empty() && !p.pending();
});
}
void
ircd::db::database::env::state::pool::operator()(task &&task)
{
tasks.emplace_back(std::move(task));
p([this]
{
if(tasks.empty())
return;
const ctx::uninterruptible::nothrow ui;
const auto task{std::move(tasks.front())};
tasks.pop_front();
assert(tasks.empty());
assert(!pool.queued());
assert(!pool.pending());
log::debug
{
log, "'%s': Terminated pool '%s'.",
d.name,
pool.name
log, "'%s': pool:%s tasks:%zu starting func:%p arg:%p",
this->d.name,
ctx::name(p),
tasks.size(),
task.func,
task.arg,
};
}
catch(const std::exception &e)
{
log::critical
// Execute the task
task.func(task.arg);
log::debug
{
log, "'%s': Environment state shutdown :%s",
d.name,
e.what()
log, "'%s': pool:%s tasks:%zu task finished func:%p arg:%p",
this->d.name,
ctx::name(p),
tasks.size(),
task.func,
task.arg,
};
});
}
size_t
ircd::db::database::env::state::pool::cancel(void *const &tag)
{
size_t i(0);
auto it(begin(tasks));
while(it != end(tasks))
{
auto &task(*it);
log::debug
{
log, "'%s': pool:%s tasks:%zu cancel %zu func:%p cancel:%p arg:%p tag:%p",
d.name,
ctx::name(p),
tasks.size(),
i,
task.func,
task.cancel,
task.arg,
tag
};
task.cancel(task.arg);
it = tasks.erase(it);
++i;
}
return i;
}
///////////////////////////////////////////////////////////////////////////////