mirror of
https://github.com/matrix-construct/construct
synced 2024-12-27 07:54:05 +01:00
modules/federation/sender: Eliminate internal node error bit.
This commit is contained in:
parent
774a365465
commit
215e79b870
1 changed files with 53 additions and 42 deletions
|
@ -65,7 +65,6 @@ struct node
|
||||||
m::node::room room;
|
m::node::room room;
|
||||||
server::request::opts sopts;
|
server::request::opts sopts;
|
||||||
txn *curtxn {nullptr};
|
txn *curtxn {nullptr};
|
||||||
bool err {false};
|
|
||||||
|
|
||||||
bool flush();
|
bool flush();
|
||||||
void push(std::shared_ptr<unit>);
|
void push(std::shared_ptr<unit>);
|
||||||
|
@ -278,18 +277,26 @@ send_to_room(const m::event &event,
|
||||||
if(my_host(origin))
|
if(my_host(origin))
|
||||||
return;
|
return;
|
||||||
|
|
||||||
auto it{nodes.lower_bound(origin)};
|
const auto remote
|
||||||
if(it == end(nodes) || it->first != origin)
|
|
||||||
{
|
{
|
||||||
if(server::errant(m::fed::matrix_service(origin)))
|
m::fed::matrix_service(origin)
|
||||||
|
};
|
||||||
|
|
||||||
|
if(server::errant(remote))
|
||||||
return;
|
return;
|
||||||
|
|
||||||
|
auto it
|
||||||
|
{
|
||||||
|
nodes.lower_bound(origin)
|
||||||
|
};
|
||||||
|
|
||||||
|
if(it == end(nodes) || it->first != origin)
|
||||||
it = nodes.emplace_hint(it, origin, origin);
|
it = nodes.emplace_hint(it, origin, origin);
|
||||||
}
|
|
||||||
|
|
||||||
auto &node{it->second};
|
auto &node
|
||||||
if(node.err)
|
{
|
||||||
return;
|
it->second
|
||||||
|
};
|
||||||
|
|
||||||
if(!unit)
|
if(!unit)
|
||||||
unit = std::make_shared<struct unit>(event);
|
unit = std::make_shared<struct unit>(event);
|
||||||
|
@ -326,26 +333,34 @@ void
|
||||||
send_to_user(const m::event &event,
|
send_to_user(const m::event &event,
|
||||||
const m::user::id &user_id)
|
const m::user::id &user_id)
|
||||||
{
|
{
|
||||||
const string_view &remote
|
const string_view &origin
|
||||||
{
|
{
|
||||||
user_id.host()
|
user_id.host()
|
||||||
};
|
};
|
||||||
|
|
||||||
if(my_host(remote))
|
if(my_host(origin))
|
||||||
return;
|
return;
|
||||||
|
|
||||||
auto it{nodes.lower_bound(remote)};
|
const auto remote
|
||||||
if(it == end(nodes) || it->first != remote)
|
|
||||||
{
|
{
|
||||||
if(server::errant(m::fed::matrix_service(remote)))
|
m::fed::matrix_service(origin)
|
||||||
|
};
|
||||||
|
|
||||||
|
if(server::errant(remote))
|
||||||
return;
|
return;
|
||||||
|
|
||||||
it = nodes.emplace_hint(it, remote, remote);
|
auto it
|
||||||
}
|
{
|
||||||
|
nodes.lower_bound(origin)
|
||||||
|
};
|
||||||
|
|
||||||
auto &node{it->second};
|
if(it == end(nodes) || it->first != origin)
|
||||||
if(node.err)
|
it = nodes.emplace_hint(it, origin, origin);
|
||||||
return;
|
|
||||||
|
auto &node
|
||||||
|
{
|
||||||
|
it->second
|
||||||
|
};
|
||||||
|
|
||||||
auto unit
|
auto unit
|
||||||
{
|
{
|
||||||
|
@ -374,18 +389,26 @@ send_from_user(const m::event &event,
|
||||||
if(my_host(origin))
|
if(my_host(origin))
|
||||||
return true;
|
return true;
|
||||||
|
|
||||||
auto it{nodes.lower_bound(origin)};
|
const auto remote
|
||||||
if(it == end(nodes) || it->first != origin)
|
|
||||||
{
|
{
|
||||||
if(server::errant(m::fed::matrix_service(origin)))
|
m::fed::matrix_service(origin)
|
||||||
|
};
|
||||||
|
|
||||||
|
if(server::errant(remote))
|
||||||
return true;
|
return true;
|
||||||
|
|
||||||
|
auto it
|
||||||
|
{
|
||||||
|
nodes.lower_bound(origin)
|
||||||
|
};
|
||||||
|
|
||||||
|
if(it == end(nodes) || it->first != origin)
|
||||||
it = nodes.emplace_hint(it, origin, origin);
|
it = nodes.emplace_hint(it, origin, origin);
|
||||||
}
|
|
||||||
|
|
||||||
auto &node{it->second};
|
auto &node
|
||||||
if(node.err)
|
{
|
||||||
return true;
|
it->second
|
||||||
|
};
|
||||||
|
|
||||||
auto unit
|
auto unit
|
||||||
{
|
{
|
||||||
|
@ -480,7 +503,6 @@ catch(const std::exception &e)
|
||||||
"flush error to %s :%s", remote, e.what()
|
"flush error to %s :%s", remote, e.what()
|
||||||
};
|
};
|
||||||
|
|
||||||
err = true;
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -538,9 +560,6 @@ try
|
||||||
node.curtxn = nullptr;
|
node.curtxn = nullptr;
|
||||||
txns.erase(it);
|
txns.erase(it);
|
||||||
|
|
||||||
if(node.err)
|
|
||||||
return remove_node(node);
|
|
||||||
|
|
||||||
if(!ret)
|
if(!ret)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
|
@ -553,8 +572,6 @@ catch(const std::exception &e)
|
||||||
m::log, "Federation sender :recv worker unhandled :%s",
|
m::log, "Federation sender :recv worker unhandled :%s",
|
||||||
e.what(),
|
e.what(),
|
||||||
};
|
};
|
||||||
|
|
||||||
return false;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
bool
|
bool
|
||||||
|
@ -616,7 +633,6 @@ catch(const http::error &e)
|
||||||
e.what()
|
e.what()
|
||||||
};
|
};
|
||||||
|
|
||||||
node.err = true;
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
catch(const std::exception &e)
|
catch(const std::exception &e)
|
||||||
|
@ -629,7 +645,6 @@ catch(const std::exception &e)
|
||||||
e.what()
|
e.what()
|
||||||
};
|
};
|
||||||
|
|
||||||
node.err = true;
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -646,9 +661,6 @@ recv_timeouts()
|
||||||
{
|
{
|
||||||
auto &txn(*it);
|
auto &txn(*it);
|
||||||
assert(txn.node);
|
assert(txn.node);
|
||||||
if(txn.node->err)
|
|
||||||
continue;
|
|
||||||
|
|
||||||
if(txn.timeout + seconds(45) < now) //TODO: conf
|
if(txn.timeout + seconds(45) < now) //TODO: conf
|
||||||
recv_timeout(txn, *txn.node);
|
recv_timeout(txn, *txn.node);
|
||||||
}
|
}
|
||||||
|
@ -666,7 +678,6 @@ recv_timeout(txn &txn,
|
||||||
};
|
};
|
||||||
|
|
||||||
cancel(txn);
|
cancel(txn);
|
||||||
node.err = true;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void
|
void
|
||||||
|
|
Loading…
Reference in a new issue