mirror of
https://github.com/matrix-construct/construct
synced 2024-11-29 02:02:38 +01:00
modules/federation/sender: Remember how to use own synchronization primitives; fix origin call.
This commit is contained in:
parent
32f99e75d9
commit
80278462e3
1 changed files with 31 additions and 34 deletions
|
@ -24,7 +24,6 @@ ctx::dock recv_action;
|
||||||
|
|
||||||
static void send(const m::event &, const m::room::id &room_id);
|
static void send(const m::event &, const m::room::id &room_id);
|
||||||
static void send(const m::event &);
|
static void send(const m::event &);
|
||||||
static void send();
|
|
||||||
static void send_worker();
|
static void send_worker();
|
||||||
|
|
||||||
context
|
context
|
||||||
|
@ -56,8 +55,36 @@ void
|
||||||
send_worker()
|
send_worker()
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
while(1)
|
// In order to synchronize with the vm core, this context has to
|
||||||
send();
|
// maintain this shared_lock at all times. If this is unlocked we
|
||||||
|
// can miss an event being broadcast.
|
||||||
|
std::unique_lock<decltype(m::vm::accept)> lock
|
||||||
|
{
|
||||||
|
m::vm::accept
|
||||||
|
};
|
||||||
|
|
||||||
|
while(1) try
|
||||||
|
{
|
||||||
|
// reference to the event on the inserter's stack
|
||||||
|
const auto &event
|
||||||
|
{
|
||||||
|
m::vm::accept.wait(lock)
|
||||||
|
};
|
||||||
|
|
||||||
|
if(my(event))
|
||||||
|
send(event);
|
||||||
|
}
|
||||||
|
catch(const ircd::ctx::interrupted &e)
|
||||||
|
{
|
||||||
|
throw;
|
||||||
|
}
|
||||||
|
catch(const std::exception &e)
|
||||||
|
{
|
||||||
|
log::error
|
||||||
|
{
|
||||||
|
"sender worker: %s", e.what()
|
||||||
|
};
|
||||||
|
}
|
||||||
}
|
}
|
||||||
catch(const ircd::ctx::interrupted &e)
|
catch(const ircd::ctx::interrupted &e)
|
||||||
{
|
{
|
||||||
|
@ -67,36 +94,6 @@ catch(const ircd::ctx::interrupted &e)
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
void
|
|
||||||
send()
|
|
||||||
try
|
|
||||||
{
|
|
||||||
std::unique_lock<decltype(m::vm::accept)> lock
|
|
||||||
{
|
|
||||||
m::vm::accept
|
|
||||||
};
|
|
||||||
|
|
||||||
// reference to the event on the inserter's stack
|
|
||||||
const auto &event
|
|
||||||
{
|
|
||||||
m::vm::accept.wait(lock)
|
|
||||||
};
|
|
||||||
|
|
||||||
if(my(event))
|
|
||||||
send(event);
|
|
||||||
}
|
|
||||||
catch(const ircd::ctx::interrupted &e)
|
|
||||||
{
|
|
||||||
throw;
|
|
||||||
}
|
|
||||||
catch(const std::exception &e)
|
|
||||||
{
|
|
||||||
log::error
|
|
||||||
{
|
|
||||||
"sender worker: %s", e.what()
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
void
|
void
|
||||||
send(const m::event &event)
|
send(const m::event &event)
|
||||||
{
|
{
|
||||||
|
@ -186,7 +183,7 @@ try
|
||||||
}
|
}
|
||||||
|
|
||||||
m::v1::send::opts opts;
|
m::v1::send::opts opts;
|
||||||
opts.remote = id.host();
|
opts.remote = origin();
|
||||||
opts.sopts = &sopts;
|
opts.sopts = &sopts;
|
||||||
std::string content
|
std::string content
|
||||||
{
|
{
|
||||||
|
|
Loading…
Reference in a new issue