0
0
Fork 0
mirror of https://github.com/matrix-construct/construct synced 2024-11-17 23:40:57 +01:00

modules/federation/sender: Determine a purge flow for errored nodes.

This commit is contained in:
Jason Volk 2018-03-15 16:44:51 -07:00
parent a585a86717
commit 67aaa00b63
2 changed files with 36 additions and 12 deletions

View file

@ -14,7 +14,8 @@
std::list<txn> txns; std::list<txn> txns;
std::map<std::string, node, std::less<>> nodes; std::map<std::string, node, std::less<>> nodes;
static void recv_timeout(txn &); void remove_node(const node &);
static void recv_timeout(txn &, node &);
static void recv_timeouts(); static void recv_timeouts();
static bool recv_handle(txn &, node &); static bool recv_handle(txn &, node &);
static void recv(); static void recv();
@ -133,12 +134,14 @@ send(const m::event &event,
} }
auto &node{it->second}; auto &node{it->second};
if(node.err)
return;
if(!unit) if(!unit)
unit = std::make_shared<struct unit>(event); unit = std::make_shared<struct unit>(event);
node.push(unit); node.push(unit);
if(!node.flush()) node.flush();
nodes.erase(it);
}); });
} }
@ -191,7 +194,7 @@ try
}; };
txns.emplace_back(*this, std::move(content), std::move(opts)); txns.emplace_back(*this, std::move(content), std::move(opts));
const unwind::nominal::assertion na;
q.clear(); q.clear();
recv_action.notify_one(); recv_action.notify_one();
return true; return true;
@ -203,6 +206,7 @@ catch(const std::exception &e)
"flush error to %s :%s", string_view{id}, e.what() "flush error to %s :%s", string_view{id}, e.what()
}; };
err = true;
return false; return false;
} }
@ -267,7 +271,12 @@ try
node.curtxn = nullptr; node.curtxn = nullptr;
txns.erase(it); txns.erase(it);
if(ret) if(node.err)
return remove_node(node);
if(!ret)
return;
node.flush(); node.flush();
} }
catch(const ctx::interrupted &e) catch(const ctx::interrupted &e)
@ -339,6 +348,7 @@ catch(const http::error &e)
e.what() e.what()
}; };
node.err = true;
return false; return false;
} }
catch(const std::exception &e) catch(const std::exception &e)
@ -351,6 +361,7 @@ catch(const std::exception &e)
e.what() e.what()
}; };
node.err = true;
return false; return false;
} }
@ -366,17 +377,19 @@ recv_timeouts()
for(; it != end(txns); ++it) for(; it != end(txns); ++it)
{ {
auto &txn(*it); auto &txn(*it);
assert(txn.node);
if(txn.node->err)
continue;
if(txn.timeout + seconds(15) < now) //TODO: conf if(txn.timeout + seconds(15) < now) //TODO: conf
recv_timeout(txn); recv_timeout(txn, *txn.node);
} }
} }
void void
recv_timeout(txn &txn) recv_timeout(txn &txn,
node &node)
{ {
assert(txn.node);
auto &node(*txn.node);
log::dwarning log::dwarning
{ {
"Timeout to %s for txn %s", "Timeout to %s for txn %s",
@ -385,4 +398,14 @@ recv_timeout(txn &txn)
}; };
cancel(txn); cancel(txn);
node.err = true;
}
void
remove_node(const node &node)
{
const string_view &id{node.id};
const auto it{nodes.find(id)};
assert(it != end(nodes));
nodes.erase(it);
} }

View file

@ -78,6 +78,7 @@ struct node
m::node::room room; m::node::room room;
server::request::opts sopts; server::request::opts sopts;
txn *curtxn {nullptr}; txn *curtxn {nullptr};
bool err {false};
bool flush(); bool flush();
void push(std::shared_ptr<unit>); void push(std::shared_ptr<unit>);
@ -100,7 +101,7 @@ struct txn
std::string content, std::string content,
m::v1::send::opts opts) m::v1::send::opts opts)
:txndata{std::move(content)} :txndata{std::move(content)}
,send{txnid, string_view{this->content}, headers, std::move(opts)} ,send{this->txnid, string_view{this->content}, this->headers, std::move(opts)}
,node{&node} ,node{&node}
,timeout{now<steady_point>()} //TODO: conf ,timeout{now<steady_point>()} //TODO: conf
{} {}