mirror of
https://github.com/matrix-construct/construct
synced 2024-12-27 07:54:05 +01:00
ircd::server: Basic pipeline.
This commit is contained in:
parent
8e9bae5209
commit
bf99718a0f
1 changed files with 57 additions and 19 deletions
|
@ -22,6 +22,18 @@
|
|||
|
||||
namespace ircd::server
|
||||
{
|
||||
// Coarse maximum number of connections to a server
|
||||
const size_t LINK_MAX_DEFAULT
|
||||
{
|
||||
2
|
||||
};
|
||||
|
||||
// Maximum number of requests "in flight" in the pipe at at time
|
||||
const size_t TAG_COMMIT_MAX_DEFAULT
|
||||
{
|
||||
2
|
||||
};
|
||||
|
||||
ctx::dock dock;
|
||||
|
||||
template<class F> size_t accumulate_nodes(F&&);
|
||||
|
@ -251,47 +263,70 @@ ircd::server::node::link_get(const request &request)
|
|||
if(links.empty())
|
||||
return link_add(1);
|
||||
|
||||
link *best{&links.back()};
|
||||
for(auto &link : links)
|
||||
// Indicates that we can't add anymore links for this node and the rest
|
||||
// of the algorithm should consider this.
|
||||
const bool links_maxed
|
||||
{
|
||||
// Candidate isn't in service yet. best may not be in service yet
|
||||
// yet either, but skip for now.
|
||||
if(!link.ready())
|
||||
continue;
|
||||
links.size() >= link_max()
|
||||
};
|
||||
|
||||
// Candidate's queue has a backlog of tags which haven't even had their
|
||||
// request sent yet; when best has less of these, best is probably
|
||||
// better so far.
|
||||
if(link.tag_uncommitted() >= best->tag_uncommitted())
|
||||
link *best{nullptr};
|
||||
while(!best) for(auto &cand : links)
|
||||
{
|
||||
if(!best)
|
||||
{
|
||||
best = &cand;
|
||||
continue;
|
||||
}
|
||||
|
||||
// Indicates that the best candidate has its pipe saturated which can
|
||||
// be factored into the comparisons here.
|
||||
const bool best_maxed
|
||||
{
|
||||
best->tag_committed() >= best->tag_commit_max()
|
||||
};
|
||||
|
||||
const bool cand_maxed
|
||||
{
|
||||
cand.tag_committed() >= cand.tag_commit_max()
|
||||
};
|
||||
|
||||
if(best_maxed && !cand_maxed)
|
||||
{
|
||||
best = &cand;
|
||||
continue;
|
||||
}
|
||||
|
||||
if(!best_maxed && cand_maxed)
|
||||
continue;
|
||||
|
||||
// Candidates's queue has less or same backlog of unsent requests, but
|
||||
// now measure if candidate will take longer to process at least the
|
||||
// write-side of those requests.
|
||||
if(link.write_remaining() >= best->write_remaining())
|
||||
if(cand.write_remaining() > best->write_remaining())
|
||||
continue;
|
||||
|
||||
// Candidate might be working through really large content; though
|
||||
// this is a very sketchy measurement right now since we only *might*
|
||||
// know about content-length for the *one* active tag occupying the
|
||||
// socket.
|
||||
if(link.read_remaining() >= best->read_remaining())
|
||||
if(cand.read_remaining() > best->read_remaining())
|
||||
continue;
|
||||
|
||||
// Coarse distribution based on who has more work; this is weak, should
|
||||
// be replaced.
|
||||
if(link.tag_total() >= best->tag_total())
|
||||
if(cand.tag_total() > best->tag_total())
|
||||
continue;
|
||||
|
||||
best = &link;
|
||||
best = &cand;
|
||||
}
|
||||
|
||||
if(links_maxed)
|
||||
return *best;
|
||||
|
||||
// best might not be good enough, we could try another connection. If best
|
||||
// has a backlog or is working on a large download or slow request.
|
||||
|
||||
if(links.size() >= link_max())
|
||||
return *best;
|
||||
|
||||
if(best->tag_uncommitted() < best->tag_commit_max())
|
||||
return *best;
|
||||
|
||||
|
@ -541,7 +576,7 @@ ircd::server::node::link_max()
|
|||
const
|
||||
{
|
||||
//TODO: conf
|
||||
return 4;
|
||||
return LINK_MAX_DEFAULT;
|
||||
}
|
||||
|
||||
template<class F>
|
||||
|
@ -873,6 +908,9 @@ try
|
|||
}
|
||||
while(!done);
|
||||
|
||||
if(tag_committed() >= tag_commit_max())
|
||||
wait_writable();
|
||||
|
||||
queue.pop_front();
|
||||
return true;
|
||||
}
|
||||
|
@ -1068,7 +1106,7 @@ ircd::server::link::tag_commit_max()
|
|||
const
|
||||
{
|
||||
//TODO: config
|
||||
return 1;
|
||||
return TAG_COMMIT_MAX_DEFAULT;
|
||||
}
|
||||
|
||||
size_t
|
||||
|
|
Loading…
Reference in a new issue