mirror of
https://github.com/matrix-construct/construct
synced 2024-10-31 19:08:59 +01:00
ircd::magick: Reorg and elaborate the job state tracking; add interface.
This commit is contained in:
parent
a6e65d1efe
commit
b8dfa9ccee
2 changed files with 198 additions and 84 deletions
|
@ -16,6 +16,7 @@ namespace ircd::magick
|
||||||
{
|
{
|
||||||
IRCD_EXCEPTION(ircd::error, error)
|
IRCD_EXCEPTION(ircd::error, error)
|
||||||
|
|
||||||
|
struct job;
|
||||||
struct crop;
|
struct crop;
|
||||||
struct shave;
|
struct shave;
|
||||||
struct scale;
|
struct scale;
|
||||||
|
@ -84,3 +85,21 @@ struct ircd::magick::crop
|
||||||
const offset &,
|
const offset &,
|
||||||
const result_closure &);
|
const result_closure &);
|
||||||
};
|
};
|
||||||
|
|
||||||
|
struct ircd::magick::job
|
||||||
|
{
|
||||||
|
struct state;
|
||||||
|
|
||||||
|
static thread_local struct job cur, tot; // current job, total for all jobs
|
||||||
|
static thread_local struct state state; // internal state
|
||||||
|
|
||||||
|
uint64_t id {0}; // monotonic
|
||||||
|
int64_t tick {0}; // quantum
|
||||||
|
uint64_t ticks {0}; // span
|
||||||
|
uint64_t cycles {0}; // rdtsc reference
|
||||||
|
uint64_t yields {0}; // ircd::ctx relinquish count for large jobs
|
||||||
|
uint64_t intrs {0}; // ircd::ctx interrupt count
|
||||||
|
uint64_t errors {0}; // exception/error count
|
||||||
|
string_view description; // only valid for current job duration
|
||||||
|
std::exception_ptr eptr; // apropos exception reference
|
||||||
|
};
|
||||||
|
|
|
@ -32,7 +32,7 @@ namespace ircd::magick
|
||||||
static void init();
|
static void init();
|
||||||
static void fini();
|
static void fini();
|
||||||
|
|
||||||
extern conf::item<uint64_t> limit_span;
|
extern conf::item<uint64_t> limit_ticks;
|
||||||
extern conf::item<uint64_t> limit_cycles;
|
extern conf::item<uint64_t> limit_cycles;
|
||||||
extern conf::item<uint64_t> yield_threshold;
|
extern conf::item<uint64_t> yield_threshold;
|
||||||
extern conf::item<uint64_t> yield_interval;
|
extern conf::item<uint64_t> yield_interval;
|
||||||
|
@ -68,11 +68,11 @@ ircd::magick::log
|
||||||
"magick"
|
"magick"
|
||||||
};
|
};
|
||||||
|
|
||||||
decltype(ircd::magick::limit_span)
|
decltype(ircd::magick::limit_ticks)
|
||||||
ircd::magick::limit_span
|
ircd::magick::limit_ticks
|
||||||
{
|
{
|
||||||
{ "name", "ircd.magick.limit.span" },
|
{ "name", "ircd.magick.limit.ticks" },
|
||||||
{ "default", 10000L },
|
{ "default", 10000L },
|
||||||
};
|
};
|
||||||
|
|
||||||
decltype(ircd::magick::limit_cycles)
|
decltype(ircd::magick::limit_cycles)
|
||||||
|
@ -471,20 +471,36 @@ ircd::magick::call(function&& f,
|
||||||
return f(std::forward<args>(a)...);
|
return f(std::forward<args>(a)...);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//
|
||||||
|
// ircd::magick::job
|
||||||
|
//
|
||||||
|
|
||||||
namespace ircd::magick
|
namespace ircd::magick
|
||||||
{
|
{
|
||||||
static thread_local uint64_t job_ctr;
|
static void job_init(const string_view &, const int64_t &, const uint64_t &, const uint64_t &);
|
||||||
static thread_local uint64_t job_cycles;
|
static void finished(job &);
|
||||||
static thread_local uint64_t last_cycles;
|
static bool check_yield(job &);
|
||||||
static thread_local int64_t last_quantum;
|
static void check_cycles(job &);
|
||||||
static thread_local uint64_t last_span;
|
|
||||||
static thread_local uint64_t last_yield;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
struct ircd::magick::job::state
|
||||||
|
{
|
||||||
|
uint64_t cycles {0};
|
||||||
|
uint64_t yield {0};
|
||||||
|
char description[1024];
|
||||||
|
}
|
||||||
|
thread_local ircd::magick::job::state;
|
||||||
|
|
||||||
|
decltype(ircd::magick::job::cur) thread_local
|
||||||
|
ircd::magick::job::cur;
|
||||||
|
|
||||||
|
decltype(ircd::magick::job::tot) thread_local
|
||||||
|
ircd::magick::job::tot;
|
||||||
|
|
||||||
uint
|
uint
|
||||||
ircd::magick::handle_progress(const char *text,
|
ircd::magick::handle_progress(const char *const text,
|
||||||
const int64_t quantum,
|
const int64_t tick,
|
||||||
const uint64_t span,
|
const uint64_t ticks,
|
||||||
ExceptionInfo *ei)
|
ExceptionInfo *ei)
|
||||||
noexcept try
|
noexcept try
|
||||||
{
|
{
|
||||||
|
@ -492,126 +508,205 @@ noexcept try
|
||||||
// accumulated cycle count for only this ircd::ctx and the current slice,
|
// accumulated cycle count for only this ircd::ctx and the current slice,
|
||||||
// (all other cycles are not accumulated here) which is non-zero by now
|
// (all other cycles are not accumulated here) which is non-zero by now
|
||||||
// and monotonically increases across jobs as well.
|
// and monotonically increases across jobs as well.
|
||||||
const auto cur_cycles
|
const auto cycles_sample
|
||||||
{
|
{
|
||||||
cycles(ctx::cur()) + ctx::prof::cur_slice_cycles()
|
cycles(ctx::cur()) + ctx::prof::cur_slice_cycles()
|
||||||
};
|
};
|
||||||
|
|
||||||
// Detect if this is a new job. Quantum is usually zero for a new job, but for
|
// Detect if this is a new job. Tick is usually zero for a new job, but for
|
||||||
// large jobs it may start after 0. Quantum always appears monotonic for a job.
|
// large jobs it may start after 0. Tick always appears monotonic for a job.
|
||||||
// The span appears constant for a job, though could be the same for different
|
// The ticks appears constant for a job, though could be the same for different
|
||||||
// jobs. We don't know of any succinct way to test for a new job, so we use all
|
// jobs. We don't know of any succinct way to test for a new job, so we use all
|
||||||
// of the above information.
|
// of the above information.
|
||||||
const bool new_job
|
const bool new_job
|
||||||
{
|
{
|
||||||
quantum == 0
|
tick == 0
|
||||||
|| quantum < last_quantum
|
|| tick < job::cur.tick
|
||||||
|| span != last_span
|
|| ticks != job::cur.ticks
|
||||||
};
|
};
|
||||||
|
|
||||||
assert(new_job || span == last_span); // the span is always constant same for a job
|
// Assert general assumptions about invocations of this callback.
|
||||||
assert(new_job || quantum >= last_quantum); // quantum is monotonic for the same job
|
assert(new_job || tick >= job::cur.tick);
|
||||||
|
assert(new_job || ticks == job::cur.ticks);
|
||||||
|
|
||||||
|
// Branch after detecting this callback is unrelated to the last job.
|
||||||
if(new_job)
|
if(new_job)
|
||||||
{
|
{
|
||||||
++job_ctr;
|
finished(job::cur);
|
||||||
last_quantum = quantum;
|
job_init(text, tick, ticks, cycles_sample);
|
||||||
last_span = span;
|
|
||||||
last_yield = 0;
|
|
||||||
job_cycles = 0;
|
|
||||||
last_cycles = cur_cycles;
|
|
||||||
|
|
||||||
// This job is too large based on the span measurement. This is an ad hoc
|
|
||||||
// measurement of the job size created internally by ImageMagick.
|
|
||||||
if(span > uint64_t(limit_span))
|
|
||||||
throw error
|
|
||||||
{
|
|
||||||
"job:%lu computation span:%lu exceeds server limit:%lu",
|
|
||||||
job_ctr,
|
|
||||||
span,
|
|
||||||
uint64_t(limit_span),
|
|
||||||
};
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Update the cycle counters first so the log::debug has better info.
|
// Unconditional bookkeeping updates for this invocation. These statements
|
||||||
assert(cur_cycles >= last_cycles);
|
// behave properly regardless of whether this is the same or a new job.
|
||||||
job_cycles += cur_cycles - last_cycles;
|
assert(cycles_sample >= job::state.cycles);
|
||||||
last_cycles = cur_cycles;
|
job::cur.cycles += cycles_sample - job::state.cycles;
|
||||||
|
job::state.cycles = cycles_sample;
|
||||||
|
job::cur.tick = tick;
|
||||||
|
|
||||||
|
// This debug message is very noisy, even for debug mode. Developer can
|
||||||
|
// enable it at their discretion.
|
||||||
#ifdef IRCD_MAGICK_DEBUG_PROGRESS
|
#ifdef IRCD_MAGICK_DEBUG_PROGRESS
|
||||||
log::debug
|
log::debug
|
||||||
{
|
{
|
||||||
log, "job:%lu progress %2.2lf%% (%ld/%ld) cycles:%lu :%s",
|
log, "job:%lu progress %2.2lf%% (%ld/%ld) cycles:%lu :%s",
|
||||||
job_ctr,
|
job::cur.id,
|
||||||
(quantum / double(span) * 100.0),
|
(job::cur.tick / double(job::cur.ticks) * 100.0),
|
||||||
quantum,
|
job::cur.tick,
|
||||||
span,
|
job::cur.ticks,
|
||||||
job_cycles,
|
job::cur.cycles,
|
||||||
text,
|
job::cur.text,
|
||||||
};
|
};
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
// Check if job exceeded its reference cycle limit if enabled.
|
check_cycles(job::cur);
|
||||||
if(unlikely(uint64_t(limit_cycles) && job_cycles > uint64_t(limit_cycles)))
|
check_yield(job::cur);
|
||||||
throw error
|
|
||||||
{
|
|
||||||
"job:%lu CPU cycles:%lu exceeded server limit:%lu (progress %2.2lf%% (%ld/%ld))",
|
|
||||||
job_ctr,
|
|
||||||
job_cycles,
|
|
||||||
uint64_t(limit_cycles),
|
|
||||||
(quantum / double(span) * 100.0),
|
|
||||||
quantum,
|
|
||||||
span,
|
|
||||||
};
|
|
||||||
|
|
||||||
// This is a larger job; we yield this ircd::ctx at interval
|
|
||||||
if(span > uint64_t(yield_threshold))
|
|
||||||
{
|
|
||||||
if(quantum - last_yield > uint64_t(yield_interval))
|
|
||||||
{
|
|
||||||
last_yield = quantum;
|
|
||||||
ctx::yield();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
last_quantum = quantum;
|
|
||||||
|
|
||||||
// return false to interrupt the job; set the exception saying why.
|
|
||||||
//
|
|
||||||
// If MonitorEvent (or any *Event) is the code the interruption is
|
|
||||||
// not an error and the operation will silently complete, possibly with
|
|
||||||
// incomplete or corrupt results (i guess? this might be ok for raster
|
|
||||||
// or optimization operations maybe which can go on indefinitely)
|
|
||||||
//
|
|
||||||
// If MonitorError (or any *Error) is the code we propagate the exception
|
|
||||||
// all the way back through our user.
|
|
||||||
//
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
catch(const ctx::interrupted &e)
|
catch(const ctx::interrupted &e)
|
||||||
{
|
{
|
||||||
|
++job::cur.intrs;
|
||||||
|
job::cur.eptr = std::current_exception();
|
||||||
::ThrowException(ei, MonitorError, "interrupted", e.what());
|
::ThrowException(ei, MonitorError, "interrupted", e.what());
|
||||||
ei->signature = MagickSignature; // ???
|
ei->signature = MagickSignature; // ???
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
catch(const ctx::terminated &)
|
catch(const ctx::terminated &)
|
||||||
{
|
{
|
||||||
|
++job::cur.intrs;
|
||||||
|
job::cur.eptr = std::current_exception();
|
||||||
::ThrowException(ei, MonitorError, "terminated", nullptr);
|
::ThrowException(ei, MonitorError, "terminated", nullptr);
|
||||||
ei->signature = MagickSignature; // ???
|
ei->signature = MagickSignature; // ???
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
catch(const std::exception &e)
|
catch(const std::exception &e)
|
||||||
{
|
{
|
||||||
|
++job::cur.errors;
|
||||||
|
job::cur.eptr = std::current_exception();
|
||||||
::ThrowLoggedException(ei, MonitorError, "error", e.what(), __FILE__, __FUNCTION__, __LINE__);
|
::ThrowLoggedException(ei, MonitorError, "error", e.what(), __FILE__, __FUNCTION__, __LINE__);
|
||||||
ei->signature = MagickSignature; // ???
|
ei->signature = MagickSignature; // ???
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
catch(...)
|
catch(...)
|
||||||
{
|
{
|
||||||
|
++job::cur.errors;
|
||||||
|
job::cur.eptr = std::current_exception();
|
||||||
::ThrowLoggedException(ei, MonitorFatalError, "unknown", nullptr, __FILE__, __FUNCTION__, __LINE__);
|
::ThrowLoggedException(ei, MonitorFatalError, "unknown", nullptr, __FILE__, __FUNCTION__, __LINE__);
|
||||||
ei->signature = MagickSignature; // ???
|
ei->signature = MagickSignature; // ???
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void
|
||||||
|
ircd::magick::check_cycles(job &job)
|
||||||
|
{
|
||||||
|
const uint64_t &limit_cycles
|
||||||
|
{
|
||||||
|
magick::limit_cycles
|
||||||
|
};
|
||||||
|
|
||||||
|
// Check if job exceeded its reference cycle limit if enabled.
|
||||||
|
if(unlikely(limit_cycles && job.cycles > limit_cycles))
|
||||||
|
throw error
|
||||||
|
{
|
||||||
|
"job:%lu CPU cycles:%lu exceeded server limit:%lu (progress %2.2lf%% (%ld/%ld))",
|
||||||
|
job.id,
|
||||||
|
job.cycles,
|
||||||
|
limit_cycles,
|
||||||
|
(job.tick / double(job.ticks) * 100.0),
|
||||||
|
job.tick,
|
||||||
|
job.ticks,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
bool
|
||||||
|
ircd::magick::check_yield(job &job)
|
||||||
|
{
|
||||||
|
const uint64_t &yield_threshold
|
||||||
|
{
|
||||||
|
magick::yield_threshold
|
||||||
|
};
|
||||||
|
|
||||||
|
// This job is too small to conduct any yields.
|
||||||
|
if(likely(job.ticks < yield_threshold))
|
||||||
|
return false;
|
||||||
|
|
||||||
|
const uint64_t &yield_interval
|
||||||
|
{
|
||||||
|
magick::yield_interval
|
||||||
|
};
|
||||||
|
|
||||||
|
// Haven't reached the yield interval yet.
|
||||||
|
if(likely(job.tick - job::state.yield <= yield_interval))
|
||||||
|
return false;
|
||||||
|
|
||||||
|
job::state.yield = job.tick;
|
||||||
|
ctx::yield();
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
void
|
||||||
|
ircd::magick::finished(job &job)
|
||||||
|
{
|
||||||
|
// Update total state from last job
|
||||||
|
assert(job.id == job::tot.id + 1 || (job.id == job::tot.id && !job.id));
|
||||||
|
job::tot.id = job.id;
|
||||||
|
job::tot.tick += job.tick;
|
||||||
|
job::tot.ticks += job.ticks;
|
||||||
|
job::tot.cycles += job.cycles;
|
||||||
|
job::tot.yields += job.yields;
|
||||||
|
job::tot.intrs += job.intrs;
|
||||||
|
job::tot.errors += job.errors;
|
||||||
|
}
|
||||||
|
|
||||||
|
void
|
||||||
|
ircd::magick::job_init(const string_view &text,
|
||||||
|
const int64_t &tick,
|
||||||
|
const uint64_t &ticks,
|
||||||
|
const uint64_t &cycles_sample)
|
||||||
|
{
|
||||||
|
// Reset the current job structure
|
||||||
|
job::cur =
|
||||||
|
{
|
||||||
|
job::tot.id + 1, // id
|
||||||
|
tick, // tick
|
||||||
|
ticks, // ticks
|
||||||
|
};
|
||||||
|
|
||||||
|
// Update internal state
|
||||||
|
job::state.cycles = cycles_sample;
|
||||||
|
|
||||||
|
// The description text may have this annoying empty "[]" on this
|
||||||
|
// message so we'll strip that here.
|
||||||
|
job::cur.description = strlcpy
|
||||||
|
{
|
||||||
|
job::state.description, lstrip(text, "[] ")
|
||||||
|
};
|
||||||
|
|
||||||
|
log::debug
|
||||||
|
{
|
||||||
|
log, "job:%lu started; ticks:%lu :%s",
|
||||||
|
job::cur.id,
|
||||||
|
job::cur.ticks,
|
||||||
|
job::cur.description,
|
||||||
|
};
|
||||||
|
|
||||||
|
// This job is too large based on the ticks measurement. This is an ad hoc
|
||||||
|
// measurement of the job size created internally by ImageMagick.
|
||||||
|
if(job::cur.ticks > uint64_t(limit_ticks))
|
||||||
|
throw error
|
||||||
|
{
|
||||||
|
"job:%lu computation ticks:%lu exceeds server limit:%lu :%s",
|
||||||
|
job::cur.id,
|
||||||
|
job::cur.ticks,
|
||||||
|
uint64_t(limit_ticks),
|
||||||
|
job::cur.description,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
//
|
||||||
|
// (Internal) patch panels
|
||||||
|
//
|
||||||
|
|
||||||
void
|
void
|
||||||
ircd::magick::handle_free(void *const ptr)
|
ircd::magick::handle_free(void *const ptr)
|
||||||
noexcept
|
noexcept
|
||||||
|
|
Loading…
Reference in a new issue