mirror of
https://github.com/matrix-construct/construct
synced 2024-06-08 13:08:56 +02:00
modules/m_bridge: Hook configuration updates to start/stop workers automatically.
This commit is contained in:
parent
62001d9e7e
commit
60bff84004
|
@ -21,7 +21,11 @@ namespace ircd::m::bridge
|
|||
static event::idx worker_handle(const config &, const net::hostport &, const events::range &, window_buffer);
|
||||
static void worker_loop(const config &, const rfc3986::uri &, const mutable_buffer &);
|
||||
static void worker(std::string event, std::string event_id);
|
||||
static bool start(const event &, const config &);
|
||||
static bool stop(const string_view &id);
|
||||
static void handle_event(const event &, vm::eval &);
|
||||
static void handle_redact(const event &, vm::eval &);
|
||||
static void handle_config(const event &, vm::eval &);
|
||||
static void fini();
|
||||
static void init();
|
||||
|
||||
|
@ -30,7 +34,9 @@ namespace ircd::m::bridge
|
|||
extern conf::item<seconds> txn_timeout;
|
||||
extern conf::item<size_t> txn_bufsize;
|
||||
extern ctx::dock worker_dock;
|
||||
extern std::vector<context> worker_context;
|
||||
extern std::map<std::string, context> workers;
|
||||
extern hookfn<vm::eval &> config_hook;
|
||||
extern hookfn<vm::eval &> redact_hook;
|
||||
extern hookfn<vm::eval &> notify_hook;
|
||||
extern ircd::run::changed quit_handler;
|
||||
}
|
||||
|
@ -74,8 +80,30 @@ ircd::m::bridge::txn_bufsize
|
|||
decltype(ircd::m::bridge::worker_dock)
|
||||
ircd::m::bridge::worker_dock;
|
||||
|
||||
decltype(ircd::m::bridge::worker_context)
|
||||
ircd::m::bridge::worker_context;
|
||||
decltype(ircd::m::bridge::workers)
|
||||
ircd::m::bridge::workers;
|
||||
|
||||
decltype(ircd::m::bridge::config_hook)
|
||||
ircd::m::bridge::config_hook
|
||||
{
|
||||
handle_config,
|
||||
{
|
||||
{ "_site", "vm.effect" },
|
||||
{ "type", "ircd.bridge" },
|
||||
{ "origin", my_host() },
|
||||
}
|
||||
};
|
||||
|
||||
decltype(ircd::m::bridge::redact_hook)
|
||||
ircd::m::bridge::redact_hook
|
||||
{
|
||||
handle_redact,
|
||||
{
|
||||
{ "_site", "vm.effect" },
|
||||
{ "type", "m.room.redaction" },
|
||||
{ "origin", my_host() },
|
||||
}
|
||||
};
|
||||
|
||||
decltype(ircd::m::bridge::notify_hook)
|
||||
ircd::m::bridge::notify_hook
|
||||
|
@ -91,11 +119,201 @@ ircd::m::bridge::quit_handler
|
|||
{
|
||||
run::level::QUIT, []
|
||||
{
|
||||
for(auto &worker : worker_context)
|
||||
for(auto &[name, worker] : workers)
|
||||
worker.terminate();
|
||||
}
|
||||
};
|
||||
|
||||
void
|
||||
ircd::m::bridge::init()
|
||||
{
|
||||
config::for_each([]
|
||||
(const event::idx &event_idx, const event &event, const config &config)
|
||||
{
|
||||
log::logf
|
||||
{
|
||||
log, log::level::DEBUG,
|
||||
"Found configuration for '%s' in %s by %s",
|
||||
json::get<"id"_>(config),
|
||||
json::get<"room_id"_>(event),
|
||||
string_view{event.event_id},
|
||||
};
|
||||
|
||||
if(!enable)
|
||||
return true;
|
||||
|
||||
start(event, config);
|
||||
return true;
|
||||
});
|
||||
}
|
||||
|
||||
void
|
||||
ircd::m::bridge::fini()
|
||||
{
|
||||
for(auto &[name, worker] : workers)
|
||||
worker.terminate();
|
||||
|
||||
if(!workers.empty())
|
||||
log::debug
|
||||
{
|
||||
log, "Waiting for %zu bridge workers...",
|
||||
workers.size(),
|
||||
};
|
||||
|
||||
for(auto &[name, worker] : workers)
|
||||
worker.join();
|
||||
}
|
||||
|
||||
void
|
||||
ircd::m::bridge::handle_config(const m::event &event,
|
||||
vm::eval &eval)
|
||||
try
|
||||
{
|
||||
if(!enable)
|
||||
return;
|
||||
|
||||
const config config
|
||||
{
|
||||
at<"content"_>(event)
|
||||
};
|
||||
|
||||
// the state_key has to match the id for now
|
||||
if(json::get<"id"_>(config) != json::get<"state_key"_>(event))
|
||||
return;
|
||||
|
||||
const m::room::id room_id
|
||||
{
|
||||
json::get<"room_id"_>(event)
|
||||
};
|
||||
|
||||
// Bridge's id and the room localpart have to match or this is a bogus
|
||||
if(json::get<"id"_>(config) != room_id.localname())
|
||||
return;
|
||||
|
||||
const m::user::id sender
|
||||
{
|
||||
json::get<"sender"_>(event)
|
||||
};
|
||||
|
||||
if(!is_oper(sender))
|
||||
return;
|
||||
|
||||
log::logf
|
||||
{
|
||||
log, log::level::DEBUG,
|
||||
"Updating configuration for '%s' in %s by %s",
|
||||
json::get<"id"_>(config),
|
||||
json::get<"room_id"_>(event),
|
||||
string_view{event.event_id},
|
||||
};
|
||||
|
||||
const bool stopped
|
||||
{
|
||||
stop(at<"id"_>(config))
|
||||
};
|
||||
|
||||
const bool started
|
||||
{
|
||||
start(event, config)
|
||||
};
|
||||
|
||||
log::info
|
||||
{
|
||||
log, "Bridge '%s' [stop:%b start:%b] with updated configuration %s",
|
||||
at<"id"_>(config),
|
||||
stopped,
|
||||
started,
|
||||
string_view{event.event_id},
|
||||
};
|
||||
}
|
||||
catch(const ctx::interrupted &)
|
||||
{
|
||||
throw;
|
||||
}
|
||||
catch(const std::exception &e)
|
||||
{
|
||||
log::error
|
||||
{
|
||||
log, "Failed to handle bridge config update in %s :%s",
|
||||
string_view{event.event_id},
|
||||
e.what(),
|
||||
};
|
||||
}
|
||||
|
||||
void
|
||||
ircd::m::bridge::handle_redact(const m::event &event,
|
||||
vm::eval &eval)
|
||||
try
|
||||
{
|
||||
if(!enable)
|
||||
return;
|
||||
|
||||
const m::room::id room_id
|
||||
{
|
||||
at<"room_id"_>(event)
|
||||
};
|
||||
|
||||
if(!workers.count(room_id.localname()))
|
||||
return;
|
||||
|
||||
const m::user::id sender
|
||||
{
|
||||
at<"sender"_>(event)
|
||||
};
|
||||
|
||||
if(!is_oper(sender))
|
||||
return;
|
||||
|
||||
const m::event::id event_id
|
||||
{
|
||||
at<"redacts"_>(event)
|
||||
};
|
||||
|
||||
const m::event::fetch redacted
|
||||
{
|
||||
event_id
|
||||
};
|
||||
|
||||
const config config
|
||||
{
|
||||
at<"content"_>(redacted)
|
||||
};
|
||||
|
||||
// the state_key has to match the id for now
|
||||
if(json::get<"id"_>(config) != json::get<"state_key"_>(redacted))
|
||||
return;
|
||||
|
||||
// Bridge's id and the room localpart have to match or this is a bogus
|
||||
if(json::get<"id"_>(config) != room_id.localname())
|
||||
return;
|
||||
|
||||
const bool stopped
|
||||
{
|
||||
stop(at<"id"_>(config))
|
||||
};
|
||||
|
||||
log::info
|
||||
{
|
||||
log, "Bridge worker '%s' terminated by redaction %s of config %s",
|
||||
at<"id"_>(config),
|
||||
string_view{event.event_id},
|
||||
string_view{event_id},
|
||||
};
|
||||
}
|
||||
catch(const ctx::interrupted &)
|
||||
{
|
||||
throw;
|
||||
}
|
||||
catch(const std::exception &e)
|
||||
{
|
||||
log::error
|
||||
{
|
||||
log, "Failed to handle bridge config redact in %s :%s",
|
||||
string_view{event.event_id},
|
||||
e.what(),
|
||||
};
|
||||
}
|
||||
|
||||
void
|
||||
ircd::m::bridge::handle_event(const m::event &event,
|
||||
vm::eval &eval)
|
||||
|
@ -123,50 +341,52 @@ catch(const std::exception &e)
|
|||
};
|
||||
}
|
||||
|
||||
void
|
||||
ircd::m::bridge::init()
|
||||
bool
|
||||
ircd::m::bridge::stop(const string_view &id)
|
||||
{
|
||||
if(!enable)
|
||||
return;
|
||||
|
||||
config::for_each([]
|
||||
(const event::idx &event_idx, const event &event, const config &config)
|
||||
const auto it
|
||||
{
|
||||
log::debug
|
||||
{
|
||||
log, "Found configuration for '%s' in %s by %s",
|
||||
json::get<"id"_>(config),
|
||||
json::get<"room_id"_>(event),
|
||||
string_view{event.event_id},
|
||||
};
|
||||
workers.find(id)
|
||||
};
|
||||
|
||||
worker_context.emplace_back(context
|
||||
{
|
||||
"m.bridge",
|
||||
512_KiB,
|
||||
context::POST,
|
||||
std::bind(&bridge::worker, std::string(event.source), std::string(event.event_id)),
|
||||
});
|
||||
if(it == workers.end())
|
||||
return false;
|
||||
|
||||
return true;
|
||||
});
|
||||
auto &context
|
||||
{
|
||||
it->second
|
||||
};
|
||||
|
||||
context.terminate();
|
||||
context.join();
|
||||
workers.erase(it);
|
||||
return true;
|
||||
}
|
||||
|
||||
void
|
||||
ircd::m::bridge::fini()
|
||||
bool
|
||||
ircd::m::bridge::start(const m::event &event,
|
||||
const config &config)
|
||||
{
|
||||
for(auto &worker : worker_context)
|
||||
worker.terminate();
|
||||
auto handle
|
||||
{
|
||||
std::bind(&bridge::worker, std::string(event.source), std::string(event.event_id))
|
||||
};
|
||||
|
||||
if(!worker_context.empty())
|
||||
log::debug
|
||||
{
|
||||
log, "Waiting for %zu bridge workers...",
|
||||
worker_context.size(),
|
||||
};
|
||||
const auto pit
|
||||
{
|
||||
workers.emplace
|
||||
(
|
||||
at<"id"_>(config), context
|
||||
{
|
||||
"m.bridge",
|
||||
512_KiB,
|
||||
context::POST,
|
||||
std::move(handle),
|
||||
}
|
||||
)
|
||||
};
|
||||
|
||||
for(auto &worker : worker_context)
|
||||
worker.join();
|
||||
return pit.second;
|
||||
}
|
||||
|
||||
void
|
||||
|
|
Loading…
Reference in a new issue