
ircd::ctx: Create and use an options structure for ctx::pool.

Jason Volk committed on 2018-12-28 12:57:32 -08:00
parent c9c280c864
commit 4a47b39298
7 changed files with 117 additions and 78 deletions
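
For orientation before the per-file hunks, here is a minimal sketch of the
interface this commit introduces. It is not code from the tree: my_opts and
my_pool are hypothetical names, the field order follows the ctx::pool::opts
aggregate added below, and because the pool retains only a pointer to its
opts, the opts object must outlive the pool.

// Sketch only (hypothetical names); assumes <ircd/ircd.h> is included.
static const ircd::ctx::pool::opts my_opts
{
    256 * 1024,   // stack_size for each context the pool spawns
    0,            // initial_ctxs: none prespawned; workers can be added later (see pool::add below)
    -1,           // queue_max_hard: no hard limit on queued jobs
    0,            // queue_max_soft: no queueing; a submitting ctx yields instead
    true,         // queue_max_blocking: block the submitter at the soft limit
    true,         // queue_max_dwarning: log a dwarning when the soft limit trips
};

static ircd::ctx::pool my_pool
{
    "example", my_opts
};

This is the same pattern the diff applies to ircd::client (pool_opts / pool)
and ircd::db (request_pool_opts / request) further down.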

View file

@@ -33,8 +33,9 @@ struct ircd::client
static log::log log;
static struct settings settings;
static struct conf default_conf;
static ctx::dock dock;
static ctx::pool::opts pool_opts;
static ctx::pool pool;
static ctx::dock dock;
static uint64_t ctr; // monotonic
static void create(const std::shared_ptr<socket> &);

View file

@@ -16,18 +16,21 @@ namespace ircd::ctx
struct pool;
const string_view &name(const pool &);
void debug_stats(const pool &);
}
class ircd::ctx::pool
struct ircd::ctx::pool
{
struct opts;
using closure = std::function<void ()>;
string_view name;
size_t stack_size;
size_t q_max_soft;
size_t q_max_hard;
size_t running;
size_t working;
static const string_view default_name;
static const opts default_opts;
string_view name {default_name};
const opts *opt {&default_opts};
size_t running {0};
size_t working {0};
dock q_max;
queue<closure> q;
std::vector<context> ctxs;
@@ -36,16 +39,13 @@ class ircd::ctx::pool
void main() noexcept;
public:
explicit operator const queue<closure> &() const;
explicit operator const dock &() const;
explicit operator queue<closure> &();
explicit operator dock &();
explicit operator const opts &() const;
// indicators
auto size() const { return ctxs.size(); }
auto queued() const { return q.size(); }
auto active() const { return working; }
auto avail() const { return running - working; }
auto avail() const { return running - active(); }
auto pending() const { return active() + queued(); }
// dispatch to pool
@@ -62,11 +62,8 @@ class ircd::ctx::pool
void interrupt();
void join();
pool(const string_view &name = "<unnamed pool>"_sv,
const size_t &stack_size = DEFAULT_STACK_SIZE,
const size_t &initial_ctxs = 0,
const size_t &q_max_soft = -1,
const size_t &q_max_hard = -1);
pool(const string_view &name = default_name,
const opts & = default_opts);
pool(pool &&) = delete;
pool(const pool &) = delete;
@@ -78,6 +75,40 @@ class ircd::ctx::pool
friend void debug_stats(const pool &);
};
struct ircd::ctx::pool::opts
{
/// When the pool spawns a new context this will be the stack size it has.
size_t stack_size { DEFAULT_STACK_SIZE };
/// When the pool is constructed this will be how many contexts it spawns
/// This value may be ignored for static duration instances.
size_t initial_ctxs {0};
/// Hard-limit for jobs queued. A submit to the pool over this limit throws
/// an exception. Default is -1, effectively unlimited.
ssize_t queue_max_hard {-1};
/// Soft-limit for jobs queued. The behavior of the limit is configurable.
/// The default is 0, meaning if there is no context available to service
/// the request being submitted then the soft limit is immediately reached.
/// See the specific behavior options following this.
ssize_t queue_max_soft {0};
/// Yield a context submitting to the pool if it will violate the soft
/// limit. This is true by default. Note the default of 0 for the
/// soft-limit itself combined with this: by default there is no queueing
/// of jobs at all! This behavior purposely propagates flow control by
/// slowing down the submitting context and prevents flooding the queue.
/// This option has no effect if the submitter is not on any ircd::ctx.
bool queue_max_blocking {true};
/// Log a DWARNING (developer-warning level) when the soft limit is
/// exceeded. The soft-limit is never actually exceeded when contexts
/// are blocked from submitting (see: queue_max_blocking). This warning
/// will still be seen for submissions outside any ircd::ctx.
bool queue_max_dwarning {true};
};
template<class F,
class... A>
ircd::ctx::future_value<F, A...>
@@ -93,8 +124,7 @@ ircd::ctx::pool::async(F&& f,
promise<R> p;
future<R> ret{p};
(*this)([p(std::move(p)), func(std::move(func))]
() -> void
operator()([p(std::move(p)), func(std::move(func))]
{
p.set_value(func());
});
@@ -117,8 +147,7 @@ ircd::ctx::pool::async(F&& f,
promise<R> p;
future<R> ret{p};
(*this)([p(std::move(p)), func(std::move(func))]
() -> void
operator()([p(std::move(p)), func(std::move(func))]
{
func();
p.set_value();
@@ -128,29 +157,9 @@ ircd::ctx::pool::async(F&& f,
}
inline ircd::ctx::pool::operator
dock &()
{
dock &d(q);
return d;
}
inline ircd::ctx::pool::operator
queue<closure> &()
{
return q;
}
inline ircd::ctx::pool::operator
const dock &()
const opts &()
const
{
const dock &d(q);
return d;
}
inline ircd::ctx::pool::operator
const queue<closure> &()
const
{
return q;
assert(opt);
return *opt;
}
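
The async() templates above wrap a submission in a promise/future pair so a
result can be retrieved by the caller. A hedged usage sketch, reusing the
hypothetical my_pool from the earlier example and assuming the ircd::ctx
future follows the usual std::future-style get():

int
answer()
{
    auto fut
    {
        my_pool.async([]
        {
            return 42;   // computed on one of the pool's worker contexts
        })
    };

    // get() yields this ircd::ctx until a pooled context has run the closure
    // and fulfilled the promise behind the returned future (interface assumed).
    return fut.get();
}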

View file

@@ -38,7 +38,9 @@ struct ircd::db::database::env::state::pool
database &d;
Priority pri;
IOPriority iopri;
ctx::dock dock;
std::deque<task> tasks;
ctx::pool::opts popts;
ctx::pool p;
size_t cancel(void *const &tag);

View file

@@ -93,20 +93,27 @@ ircd::client::log
"client", 'C'
};
/// A general semaphore for the client system; used for coarse operations
/// like waiting for all clients to disconnect / system shutdown et al.
decltype(ircd::client::dock)
ircd::client::dock;
decltype(ircd::client::pool_opts)
ircd::client::pool_opts
{
size_t(settings.stack_size),
size_t(settings.pool_size),
};
/// The pool of request contexts. When a client makes a request it does so by acquiring
/// a stack from this pool. The request handling and response logic can then be written
/// in a synchronous manner as if each connection had its own thread.
ircd::ctx::pool
decltype(ircd::client::pool)
ircd::client::pool
{
"client", size_t(settings.stack_size)
"client", pool_opts
};
/// A general semaphore for the client system; used for coarse operations
/// like waiting for all clients to disconnect / system shutdown et al.
decltype(ircd::client::dock)
ircd::client::dock;
decltype(ircd::client::ctr)
ircd::client::ctr
{};

View file

@@ -1226,23 +1226,26 @@ ircd::ctx::name(const pool &pool)
return pool.name;
}
decltype(ircd::ctx::pool::default_name)
ircd::ctx::pool::default_name
{
"<unnamed pool>"
};
decltype(ircd::ctx::pool::default_opts)
ircd::ctx::pool::default_opts
{};
//
// pool::pool
//
ircd::ctx::pool::pool(const string_view &name,
const size_t &stack_size,
const size_t &size,
const size_t &q_max_soft,
const size_t &q_max_hard)
const opts &opt)
:name{name}
,stack_size{stack_size}
,q_max_soft{q_max_soft}
,q_max_hard{q_max_hard}
,running{0}
,working{0}
,opt{&opt}
{
add(size);
add(this->opt->initial_ctxs);
}
ircd::ctx::pool::~pool()
@@ -1311,14 +1314,16 @@ ircd::ctx::pool::del(const size_t &num)
void
ircd::ctx::pool::add(const size_t &num)
{
assert(opt);
for(size_t i(0); i < num; ++i)
ctxs.emplace_back(this->name, stack_size, context::POST, std::bind(&pool::main, this));
ctxs.emplace_back(name, opt->stack_size, context::POST, std::bind(&pool::main, this));
}
void
ircd::ctx::pool::operator()(closure closure)
{
if(!avail() && q.size() > q_max_soft)
assert(opt);
if(!avail() && q.size() > size_t(opt->queue_max_soft) && opt->queue_max_dwarning)
log::dwarning
{
"pool(%p '%s') ctx(%p): size:%zu active:%zu queue:%zu exceeded soft max:%zu",
@@ -1328,22 +1333,22 @@ ircd::ctx::pool::operator()(closure closure)
size(),
active(),
q.size(),
q_max_soft
opt->queue_max_soft
};
if(current)
if(current && opt->queue_max_soft >= 0 && opt->queue_max_blocking)
q_max.wait([this]
{
if(q.size() < q_max_soft)
if(q.size() < size_t(opt->queue_max_soft))
return true;
if(!q_max_soft && q.size() < avail())
if(!opt->queue_max_soft && q.size() < avail())
return true;
return false;
});
if(unlikely(q.size() >= q_max_hard))
if(unlikely(q.size() >= size_t(opt->queue_max_hard)))
throw error
{
"pool(%p '%s') ctx(%p): size:%zu avail:%zu queue:%zu exceeded hard max:%zu",
@@ -1353,7 +1358,7 @@ ircd::ctx::pool::operator()(closure closure)
size(),
avail(),
q.size(),
q_max_hard
opt->queue_max_hard
};
q.push(std::move(closure));
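
The rewritten operator() above applies the limits in order: an optional
dwarning when the soft limit is exceeded, an optional yield of a submitting
ctx until a worker frees up, and an exception once the hard limit is reached.
A hedged sketch of submitting against a finite hard limit (hypothetical names
and values; the exact exception type is hedged behind std::exception):

void
bounded_example()
{
    const ircd::ctx::pool::opts bounded_opts
    {
        256 * 1024,   // stack_size
        4,            // initial_ctxs: prespawn four workers
        64,           // queue_max_hard: a submission past 64 queued jobs throws
        0,            // queue_max_soft: a submitting ctx yields until a worker is free
        true,         // queue_max_blocking
        true,         // queue_max_dwarning
    };

    ircd::ctx::pool bounded_pool
    {
        "bounded", bounded_opts
    };

    try
    {
        bounded_pool([]
        {
            // runs on one of the pool's worker contexts
        });
    }
    catch(const std::exception &e)
    {
        // reached only if the queue length hit queue_max_hard
    }
}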
@@ -1436,9 +1441,8 @@ ircd::ctx::debug_stats(const pool &pool)
{
log::debug
{
"pool '%s' (stack size: %zu) total: %zu avail: %zu queued: %zu active: %zu pending: %zu",
"pool '%s' total: %zu avail: %zu queued: %zu active: %zu pending: %zu",
pool.name,
pool.stack_size,
pool.size(),
pool.avail(),
pool.queued(),

View file

@@ -96,6 +96,16 @@ ircd::db::request_pool_size
}
};
decltype(ircd::db::request_pool_opts)
ircd::db::request_pool_opts
{
size_t(request_pool_stack_size),
size_t(request_pool_size),
-1, // No hard limit
0, // Soft limit at any queued
true, // Yield before hitting soft limit
};
/// Concurrent request pool. Requests to seek may be executed on this
/// pool in cases where a single context would find it advantageous.
/// Some examples are a db::row seek, or asynchronous prefetching.
@@ -106,10 +116,7 @@ ircd::db::request_pool_size
decltype(ircd::db::request)
ircd::db::request
{
"db req",
size_t(request_pool_stack_size),
0, // don't prespawn because this is static
0, // zero-size queue will yield submitter
"db req", request_pool_opts
};
/// This mutex is necessary to serialize entry into rocksdb's write impl
@@ -7509,11 +7516,17 @@ ircd::db::database::env::state::pool::pool(database &d,
IOPriority::IO_HIGH:
IOPriority::IO_LOW
}
,popts
{
size_t(stack_size), // stack size of worker
0, // initial workers
-1, // queue hard limit
-1, // queue soft limit
}
,p
{
reflect(pri), // name of pool
size_t(stack_size), // stack size of worker
0, // initial workers
this->popts // pool options
}
{
}
@@ -7567,7 +7580,6 @@ catch(const std::exception &e)
void
ircd::db::database::env::state::pool::wait()
{
ctx::dock &dock(p);
dock.wait([this]
{
return tasks.empty() && !p.pending();
@@ -7617,6 +7629,8 @@ ircd::db::database::env::state::pool::operator()(task &&task)
task.func,
task.arg,
};
dock.notify_all();
});
}
@@ -7646,6 +7660,7 @@ ircd::db::database::env::state::pool::cancel(void *const &tag)
++i;
}
dock.notify_all();
return i;
}
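
db::database::env::state::pool above keeps its options as a data member
(popts) declared before the ctx::pool itself, and its constructor initializes
popts before handing this->popts to the pool. Since the pool only stores a
pointer to the opts, that declaration and initialization order is what keeps
the pointer valid. A sketch of the same pattern for any class owning a pool
(hypothetical names):

struct my_service
{
    ircd::ctx::pool::opts popts;   // declared before the pool that points at it
    ircd::ctx::pool p;

    my_service()
    :popts
    {
        128 * 1024,   // stack_size for this service's workers
        2,            // initial_ctxs
    }
    ,p
    {
        "my_service", this->popts   // the pool retains &popts for its lifetime
    }
    {}
};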

View file

@@ -48,6 +48,7 @@ namespace ircd::db
extern log::log rog;
extern conf::item<size_t> request_pool_size;
extern conf::item<size_t> request_pool_stack_size;
extern ctx::pool::opts request_pool_opts;
extern ctx::pool request;
extern ctx::mutex write_mutex;