0
0
Fork 0
mirror of https://github.com/matrix-construct/construct synced 2024-06-02 10:08:56 +02:00

ircd::db::prefetcher: Add cancel() mechanism to remove items from queue.

This commit is contained in:
Jason Volk 2019-09-08 13:31:41 -07:00
parent e2f2120739
commit 0422f29a66
2 changed files with 92 additions and 10 deletions

View file

@ -5890,12 +5890,56 @@ ircd::db::prefetcher::operator()(column &c,
key, &d, now<steady_point>(), db::id(c)
});
++requests;
++request_counter;
dock.notify_one();
return true;
}
size_t
ircd::db::prefetcher::cancel(column &c)
{
return cancel([&c]
(const auto &request)
{
return request.cid == id(c);
});
}
size_t
ircd::db::prefetcher::cancel(database &d)
{
return cancel([&d]
(const auto &request)
{
return request.d == std::addressof(d);
});
}
size_t
ircd::db::prefetcher::cancel(const closure &closure)
{
const auto e
{
std::remove_if(begin(queue), end(queue), closure)
};
const ssize_t remain
{
std::distance(begin(queue), e)
};
assert(remain >= 0);
const ssize_t canceled
{
ssize_t(queue.size()) - remain
};
assert(canceled >= 0);
queue.resize(remain);
cancels_counter += canceled;
return canceled;
}
void
ircd::db::prefetcher::worker()
try
@ -5922,6 +5966,11 @@ catch(const std::exception &e)
void
ircd::db::prefetcher::handle()
{
const scope_count handles
{
this->handles
};
auto handler
{
std::bind(&prefetcher::request_worker, this)
@ -5939,7 +5988,9 @@ ircd::db::prefetcher::request_worker()
this->request_workers
};
assert(queue.size());
if(unlikely(queue.empty()))
return;
auto request
{
std::move(queue.front())
@ -5947,16 +5998,16 @@ ircd::db::prefetcher::request_worker()
queue.pop_front();
request_handle(request);
--requests;
++fetched_counter;
}
void
ircd::db::prefetcher::request_handle(request &request)
try
{
const scope_count request_handles
const ctx::scope_notify notify
{
this->request_handles
this->dock
};
assert(request.d);
@ -5974,16 +6025,17 @@ try
char pbuf[32];
log::debug
{
log, "[%s][%s] completed fetch:%b queue:%zu r:%zu rh:%zu rw:%zu rc:%zu hc:%zu in %s",
log, "[%s][%s] completed fetch:%b queue:%zu h:%zu rw:%zu rc:%zu hc:%zu fc:%zu cc:%zu in %s",
name(*request.d),
name(column),
has? false : true,
queue.size(),
this->requests,
this->request_handles,
this->handles,
this->request_workers,
this->request_counter,
this->handles_counter,
this->fetched_counter,
this->cancels_counter,
pretty(pbuf, now<steady_point>() - request.start, 1),
};
#endif
@ -6011,6 +6063,28 @@ catch(const std::exception &e)
};
}
size_t
ircd::db::prefetcher::wait_pending()
{
const size_t fetched_counter
{
this->fetched_counter
};
const size_t fetched_target
{
fetched_counter + request_workers
};
dock.wait([this, &fetched_target]
{
return this->fetched_counter >= fetched_target;
});
assert(fetched_target >= fetched_counter);
return fetched_target - fetched_counter;
}
//
// db::cached
//

View file

@ -195,21 +195,29 @@ struct ircd::db::txn::handler
struct ircd::db::prefetcher
{
struct request;
using closure = std::function<bool (request &)>;
ctx::dock dock;
std::deque<request> queue;
ctx::context context;
size_t requests {0};
size_t request_handles {0};
size_t handles {0};
size_t request_workers {0};
size_t request_counter {0};
size_t handles_counter {0};
size_t fetched_counter {0};
size_t cancels_counter {0};
size_t wait_pending();
void request_handle(request &);
void request_worker();
void handle();
void worker();
public:
size_t cancel(const closure &);
size_t cancel(database &); // Cancel all for db
size_t cancel(column &); // Cancel all for column
bool operator()(column &, const string_view &key, const gopts &);
prefetcher();