0
0
Fork 0
mirror of https://github.com/matrix-construct/construct synced 2024-09-27 11:18:51 +02:00

modules/federation/sender: Multiple workers; use ctx::queue; fix legacy externs.

This commit is contained in:
Jason Volk 2023-02-23 16:56:15 -08:00
parent 137824eb77
commit 73c5a4f36b

View file

@ -80,16 +80,16 @@ struct node
{}
};
std::list<txn> txns;
std::map<std::string, node, std::less<>> nodes;
static std::list<txn> txns;
static std::map<std::string, node, std::less<>> nodes;
void remove_node(const node &);
static void remove_node(const node &);
static void recv_timeout(txn &, node &);
static void recv_timeouts();
static bool recv_handle(txn &, node &);
static void recv();
static void recv_worker();
ctx::dock recv_action;
static ctx::dock recv_action;
static void send_from_user(const m::event &, const m::user::id &user_id);
static void send_to_user(const m::event &, const m::user::id &user_id);
@ -100,38 +100,40 @@ static void send_worker();
static bool should_notify(const m::event &, const m::vm::eval &);
static void handle_notify(const m::event &, m::vm::eval &);
context
sender
static context
receiver[1]
{
"m.fedsnd.S", 1_MiB, &send_worker, context::POST,
{ "m.fedsnd.R", 1_MiB, &recv_worker, context::POST, },
};
context
receiver
static context
sender[2]
{
"m.fedsnd.R", 1_MiB, &recv_worker, context::POST,
{ "m.fedsnd.S", 1_MiB, &send_worker, context::POST, },
{ "m.fedsnd.S", 1_MiB, &send_worker, context::POST, },
};
mapi::header
IRCD_MODULE
{
"federation sender", nullptr,
[]
"federation sender", nullptr, []
{
for(auto &sender : sender)
sender.terminate();
receiver.terminate();
for(auto &sender : sender)
sender.join();
for(auto &receiver : receiver)
receiver.terminate(),
receiver.join();
}
};
std::deque<std::pair<std::string, m::event::id::buf>>
static ctx::queue<std::pair<std::string, m::event::id::buf>>
notified_queue;
ctx::dock
notified_dock;
m::hookfn<m::vm::eval &>
static m::hookfn<m::vm::eval &>
notified
{
handle_notify,
@ -155,8 +157,7 @@ try
m::event::id::buf{}
};
notified_queue.emplace_back(json::strung{event}, event_id);
notified_dock.notify_all();
notified_queue.emplace(json::strung{event}, event_id);
}
catch(const ctx::interrupted &)
{
@ -194,20 +195,9 @@ send_worker()
{
while(1) try
{
notified_dock.wait([]() noexcept
const auto [event_, event_id]
{
return !notified_queue.empty();
});
const unwind pop{[]
{
assert(!notified_queue.empty());
notified_queue.pop_front();
}};
const auto &[event_, event_id]
{
notified_queue.front()
notified_queue.pop()
};
const m::event event