0
0
Fork 0
mirror of https://github.com/matrix-construct/construct synced 2024-06-02 18:18:56 +02:00

ircd::server: Improve peer state transitions and destruction paths.

This commit is contained in:
Jason Volk 2018-03-10 07:50:19 -08:00
parent 2c4498502a
commit d59de1a391
4 changed files with 167 additions and 123 deletions

View file

@ -18,7 +18,7 @@ struct ircd::server::link
static conf::item<size_t> tag_max_default;
static conf::item<size_t> tag_commit_max_default;
std::shared_ptr<server::peer> peer; ///< backreference to peer
server::peer *peer; ///< backreference to peer
std::shared_ptr<net::socket> socket; ///< link's socket
std::list<tag> queue; ///< link's work queue
bool op_init {false}; ///< link is connecting
@ -51,6 +51,7 @@ struct ircd::server::link
size_t tag_commit_max() const;
// indicator lights
bool finished() const;
bool opened() const noexcept;
bool ready() const;
bool busy() const;

View file

@ -14,7 +14,6 @@
/// Remote entity.
///
struct ircd::server::peer
:std::enable_shared_from_this<ircd::server::peer>
{
struct err;
@ -26,12 +25,14 @@ struct ircd::server::peer
std::list<link> links;
std::unique_ptr<err> e;
std::string server_name;
bool ready {true};
bool op_resolve {false};
bool op_fini {false};
template<class F> size_t accumulate_links(F&&) const;
template<class F> size_t accumulate_tags(F&&) const;
void handle_resolve(std::weak_ptr<peer>, std::exception_ptr, const ipport &);
void handle_finished();
void handle_resolve(std::exception_ptr, const ipport &);
void resolve(const hostport &);
void disperse_uncommitted(link &);
@ -48,6 +49,9 @@ struct ircd::server::peer
void handle_open(link &, std::exception_ptr);
public:
// indicator lights
bool finished() const;
// config related
size_t link_min() const;
size_t link_max() const;

View file

@ -29,11 +29,12 @@ namespace ircd::server
using error_code = boost::system::error_code;
extern ircd::log::log log;
extern std::map<string_view, std::shared_ptr<peer>> peers;
extern std::map<string_view, std::unique_ptr<peer>> peers;
size_t tag_count();
size_t link_count();
size_t peer_count();
size_t peer_unfinished();
string_view errmsg(const net::hostport &);
bool exists(const net::hostport &);

View file

@ -22,9 +22,10 @@ namespace ircd::server
template<class F> size_t accumulate_tags(F&&);
// Internal control
std::shared_ptr<peer> create(const net::hostport &);
std::unique_ptr<peer> create(const net::hostport &);
void interrupt_all();
void close_all();
void wait_all();
}
decltype(ircd::server::log)
@ -37,6 +38,54 @@ decltype(ircd::server::peers)
ircd::server::peers
{};
ircd::conf::item<ircd::seconds>
close_all_timeout
{
{ "name", "ircd.server.close_all_timeout" },
{ "default", 2L },
};
void
ircd::server::wait_all()
{
while(peer_unfinished())
{
if(dock.wait_for(seconds(2)) == ctx::cv_status::no_timeout)
continue;
log.warning("Waiting for %zu tags on %zu links on %zu of %zu peers to close...",
tag_count(),
link_count(),
peer_unfinished(),
peer_count());
}
}
void
ircd::server::close_all()
{
log.debug("Closing all %zu peers",
peer_count());
net::close_opts opts;
opts.timeout = seconds(close_all_timeout);
for(auto &peer : peers)
peer.second->close(opts);
}
void
ircd::server::interrupt_all()
{
log.debug("Interrupting %zu tags on %zu links on %zu peers",
tag_count(),
link_count(),
peer_count());
for(auto &peer : peers)
peer.second->interrupt();
}
ircd::server::peer &
ircd::server::get(const net::hostport &hostport)
{
@ -55,10 +104,10 @@ ircd::server::get(const net::hostport &hostport)
return *it->second;
}
std::shared_ptr<ircd::server::peer>
std::unique_ptr<ircd::server::peer>
ircd::server::create(const net::hostport &hostport)
{
auto peer(std::make_shared<peer>());
auto peer(std::make_unique<peer>());
peer->remote.hostname = std::string{host(hostport)};
peer->resolve(hostport);
return peer;
@ -90,6 +139,16 @@ ircd::server::errmsg(const net::hostport &hostport)
return it->second->err_msg();
}
size_t
ircd::server::peer_unfinished()
{
return accumulate_peers([]
(const auto &peer)
{
return !peer.finished();
});
}
size_t
ircd::server::peer_count()
{
@ -150,38 +209,6 @@ ircd::server::accumulate_peers(F&& closure)
});
}
ircd::conf::item<ircd::seconds>
close_all_timeout
{
{ "name", "ircd.server.close_all_timeout" },
{ "default", 2L },
};
void
ircd::server::close_all()
{
log.debug("Closing all %zu peers",
peer_count());
net::close_opts opts;
opts.timeout = seconds(close_all_timeout);
for(auto &peer : peers)
peer.second->close(opts);
}
void
ircd::server::interrupt_all()
{
log.debug("Interrupting %zu tags on %zu links on %zu peers",
tag_count(),
link_count(),
peer_count());
for(auto &peer : peers)
peer.second->interrupt();
}
//
// init
//
@ -194,7 +221,7 @@ ircd::server::init::init()
ircd::server::init::~init()
noexcept
{
close_all();
close();
wait();
peers.clear();
}
@ -202,17 +229,7 @@ noexcept
void
ircd::server::init::wait()
{
while(link_count())
{
if(dock.wait_for(seconds(2)) == ctx::cv_status::no_timeout)
continue;
log.warning("Waiting for %zu tags on %zu links on %zu peers to close...",
tag_count(),
link_count(),
peer_count());
}
wait_all();
}
void
@ -334,11 +351,14 @@ noexcept
void
ircd::server::peer::close(const net::close_opts &opts)
{
ready = false;
op_fini = true;
std::vector<link *> links(this->links.size());
pointers(this->links, links);
for(const auto &link : links)
link->close(opts);
if(finished())
return handle_finished();
}
void
@ -388,7 +408,7 @@ ircd::server::peer::error_clear_default
bool
ircd::server::peer::err_check()
{
if(!ready)
if(op_fini)
return false;
if(!e)
@ -540,7 +560,7 @@ ircd::server::peer::link_add(const size_t &num)
if(e)
std::rethrow_exception(e->eptr);
assert(ready);
assert(!op_fini);
links.emplace_back(*this);
auto &link{links.back()};
@ -553,23 +573,21 @@ ircd::server::peer::link_add(const size_t &num)
void
ircd::server::peer::handle_open(link &link,
std::exception_ptr eptr)
try
{
if(eptr && links.size() == 1)
err_set(eptr);
if(eptr)
std::rethrow_exception(eptr);
}
catch(const std::exception &e)
{
log.error("peer(%p) link(%p) [%s]: open: %s",
this,
&link,
string(remote),
e.what());
{
if(links.size() == 1)
err_set(eptr);
link.close(net::dc::RST);
log.error("peer(%p) link(%p) [%s]: open: %s",
this,
&link,
string(remote),
what(eptr));
link.close(net::dc::RST);
return;
}
}
void
@ -583,7 +601,8 @@ ircd::server::peer::handle_close(link &link,
string(remote),
what(eptr));
handle_finished(link);
if(link.finished())
handle_finished(link);
}
void
@ -647,11 +666,11 @@ ircd::server::peer::handle_error(link &link,
void
ircd::server::peer::handle_finished(link &link)
{
assert(link.op_fini);
assert(!link.op_init);
assert(!link.op_read);
assert(!link.op_write);
this->del(link);
assert(link.finished());
del(link);
if(finished())
handle_finished();
}
/// This is where we're notified a tag has been completed either to start the
@ -777,30 +796,29 @@ ircd::server::peer::del(link &link)
string(remote));
links.erase(it);
// Right now this is what the server:: ~init sequence needs
// to wait for all links to close on IRCd shutdown.
server::dock.notify_all();
}
void
ircd::server::peer::resolve(const hostport &hostport)
{
if(op_resolve || op_fini)
return;
auto handler
{
std::bind(&peer::handle_resolve, this, weak_from(*this), ph::_1, ph::_2)
std::bind(&peer::handle_resolve, this, ph::_1, ph::_2)
};
op_resolve = true;
net::dns(hostport, std::move(handler));
}
void
ircd::server::peer::handle_resolve(std::weak_ptr<peer> wp,
std::exception_ptr eptr,
ircd::server::peer::handle_resolve(std::exception_ptr eptr,
const ipport &ipport)
try
{
const life_guard<peer> lg(wp);
op_resolve = false;
if(eptr)
{
@ -810,26 +828,31 @@ try
static_cast<net::ipport &>(this->remote) = ipport;
if(!ready)
return;
if(unlikely(finished()))
return handle_finished();
for(auto &link : links)
link.open(this->remote);
}
catch(const std::bad_weak_ptr &)
{
return;
if(!op_fini)
for(auto &link : links)
link.open(this->remote);
}
catch(const std::exception &e)
{
assert(!wp.expired());
log.error("peer(%p): during name resolution: %s",
log.error("peer(%p): error: %s",
this,
e.what());
close();
}
void
ircd::server::peer::handle_finished()
{
assert(finished());
// Right now this is what the server:: ~init sequence needs
// to wait for all links to close on IRCd shutdown.
server::dock.notify_all();
}
size_t
ircd::server::peer::read_remaining()
const
@ -961,6 +984,13 @@ const
return link_max_default;
}
bool
ircd::server::peer::finished()
const
{
return links.empty() && !op_resolve && op_fini;
}
template<class F>
size_t
ircd::server::peer::accumulate_tags(F&& closure)
@ -1010,7 +1040,7 @@ ircd::server::link::tag_commit_max_default
//
ircd::server::link::link(server::peer &peer)
:peer{shared_from(peer)}
:peer{&peer}
{
}
@ -1165,7 +1195,7 @@ ircd::server::link::handle_close(std::exception_ptr eptr)
void
ircd::server::link::wait_writable()
{
if(op_write)
if(op_write || unlikely(op_fini))
return;
auto handler
@ -1191,6 +1221,13 @@ try
using boost::system::system_category;
op_write = false;
if(unlikely(finished()))
{
assert(peer);
return peer->handle_finished(*this);
}
if(ec.category() == system_category()) switch(ec.value())
{
case success:
@ -1208,21 +1245,13 @@ try
}
catch(const boost::system::system_error &e)
{
if(peer)
peer->handle_error(*this, e);
assert(peer);
peer->handle_error(*this, e);
}
catch(const std::exception &e)
{
if(peer)
{
peer->handle_error(*this, std::current_exception());
return;
}
throw assertive
{
"link::handle_writable(): %s", e.what()
};
assert(peer);
peer->handle_error(*this, std::current_exception());
}
void
@ -1313,7 +1342,7 @@ ircd::server::link::process_write_next(const const_buffer &buffer)
void
ircd::server::link::wait_readable()
{
if(op_read)
if(op_read || unlikely(op_fini))
return;
auto handler
@ -1339,6 +1368,13 @@ try
using boost::system::system_category;
op_read = false;
if(unlikely(finished()))
{
assert(peer);
return peer->handle_finished(*this);
}
if(ec.category() == system_category()) switch(ec.value())
{
case success:
@ -1356,21 +1392,13 @@ try
}
catch(const boost::system::system_error &e)
{
if(peer)
peer->handle_error(*this, e);
assert(peer);
peer->handle_error(*this, e);
}
catch(const std::exception &e)
{
if(peer)
{
peer->handle_error(*this, std::current_exception());
return;
}
throw assertive
{
"link::handle_readable(): %s", e.what()
};
assert(peer);
peer->handle_error(*this, std::current_exception());
}
/// Process as many read operations from as many tags as possible
@ -1509,7 +1537,7 @@ ircd::server::link::discard_read()
// Shouldn't ever be hit because the read() within discard() throws
// the pending error like an eof.
log.warning("Link to %s discarded %zu of %zu unexpected bytes",
peer? string(peer->remote) : string(remote_ipport(*socket)),
likely(peer)? string(peer->remote) : string(remote_ipport(*socket)),
discard,
discarded);
@ -1519,7 +1547,7 @@ ircd::server::link::discard_read()
throw assertive
{
"peer(%p) link(%p) queue is empty and nothing to discard.",
peer.get(),
peer,
this
};
}
@ -1629,6 +1657,16 @@ const noexcept
return bool(socket) && net::opened(*socket);
}
bool
ircd::server::link::finished()
const
{
if(!bool(socket))
return false;
return !opened() && op_fini && !op_init && !op_write && !op_read;
}
size_t
ircd::server::link::tag_commit_max()
const