0
0
Fork 0
mirror of https://github.com/matrix-construct/construct synced 2024-12-26 07:23:53 +01:00

modules/m_pusher: Implement Push Gateway 3.1 POST Server Behavior. (closes #63)

This commit is contained in:
Jason Volk 2020-10-24 23:32:33 -07:00
parent 3cbf610cd0
commit 891cba3fb9
3 changed files with 527 additions and 51 deletions

View file

@ -18,6 +18,7 @@ namespace ircd::m::push
struct rules; struct rules;
struct pusher; struct pusher;
struct match; struct match;
struct request;
IRCD_M_EXCEPTION(m::error, error, http::INTERNAL_SERVER_ERROR) IRCD_M_EXCEPTION(m::error, error, http::INTERNAL_SERVER_ERROR)
IRCD_M_EXCEPTION(error, NOT_A_RULE, http::BAD_REQUEST) IRCD_M_EXCEPTION(error, NOT_A_RULE, http::BAD_REQUEST)
@ -34,6 +35,24 @@ namespace ircd::m::push
extern log::log log; extern log::log log;
} }
struct ircd::m::push::request
:instance_list<ircd::m::push::request>
{
static conf::item<seconds> timeout;
static ctx::mutex mutex;
static ctx::dock dock;
static uint64_t id_ctr;
uint64_t id {++id_ctr};
event::idx event_idx {0};
rfc3986::uri url;
json::object content;
server::request req;
http::code code {0};
json::object response;
char buf[15_KiB];
};
struct ircd::m::push::match struct ircd::m::push::match
:boolean :boolean
{ {

View file

@ -26,6 +26,38 @@ ircd::m::push::rule::type_prefix
"ircd.push.rule" "ircd.push.rule"
}; };
//
// request
//
template<>
decltype(ircd::util::instance_list<ircd::m::push::request>::allocator)
ircd::util::instance_list<ircd::m::push::request>::allocator
{};
template<>
decltype(ircd::util::instance_list<ircd::m::push::request>::list)
ircd::util::instance_list<ircd::m::push::request>::list
{
allocator
};
decltype(ircd::m::push::request::timeout)
ircd::m::push::request::timeout
{
{ "name", "ircd.m.push.request.timeout" },
{ "default", 8L },
};
decltype(ircd::m::push::request::mutex)
ircd::m::push::request::mutex;
decltype(ircd::m::push::request::dock)
ircd::m::push::request::dock;
decltype(ircd::m::push::request::id_ctr)
ircd::m::push::request::id_ctr;
// //
// match // match
// //

View file

@ -10,13 +10,19 @@
namespace ircd::m::push namespace ircd::m::push
{ {
static void handle_event(const m::event &, vm::eval &); static void make_content_devices(json::stack::array &, const pusher &, const event::idx &);
static void worker_handle(); static void make_content_counts(json::stack::object &, const user &);
static void make_content(json::stack::object &, const user &, const room &, const event &, const pusher &, const event::idx &);
static bool notify_email(const user &, const room &, const event::fetch &, const pusher &, const event::idx &pusher_idx);
static bool notify_http(const user &, const room &, const event::fetch &, const pusher &, const event::idx &pusher_idx);
static bool notify(const user &, const room &, const event::fetch &, const pusher &, const event::idx &pusher_idx);
static void handle_event(const event &, vm::eval &);
static bool complete(request &);
static void worker(); static void worker();
static void fini();
static std::deque<ctx::future<http::code>> completion; static server::request request_skip;
extern hookfn<vm::eval &> hook_event; extern hookfn<vm::eval &> hook_event;
extern ctx::dock worker_dock;
extern context worker_context; extern context worker_context;
} }
@ -24,12 +30,17 @@ ircd::mapi::header
IRCD_MODULE IRCD_MODULE
{ {
"Matrix 13.13 :Push Notifications Pusher", "Matrix 13.13 :Push Notifications Pusher",
nullptr,
ircd::m::push::fini,
}; };
decltype(ircd::m::push::worker_context) decltype(ircd::m::push::worker_context)
ircd::m::push::worker_context ircd::m::push::worker_context
{ {
"m.pusher", 256_KiB, worker, "m.pusher",
256_KiB,
context::WAIT_JOIN,
worker,
}; };
decltype(ircd::m::push::hook_event) decltype(ircd::m::push::hook_event)
@ -41,19 +52,81 @@ ircd::m::push::hook_event
} }
}; };
void
ircd::m::push::fini()
{
for(auto *const &req : request::list)
server::cancel(req->req);
worker_context.terminate();
request::dock.notify_all();
}
void void
ircd::m::push::worker() ircd::m::push::worker()
try try
{ {
run::barrier<ctx::interrupted>{}; do // Wait for run::level RUN before entering work loop.
run::barrier<ctx::interrupted>{};
const ctx::uninterruptible ui; do
{ {
worker_dock.wait([] static const auto valid{[](const auto *const &req)
{ {
return !completion.empty(); return !!req->req;
}};
// Wait for at least one active request in the list
request::dock.wait([]
{
if(std::any_of(begin(request::list), end(request::list), valid))
return true;
if(ctx::termination(worker_context))
return true;
return false;
}); });
} if(request::list.empty() && ctx::termination(worker_context))
while(run::level == run::level::RUN); break;
auto next
{
ctx::when_any(begin(request::list), end(request::list), []
(auto &it) -> server::request &
{
return !(*it)->req? request_skip: (*it)->req;
})
};
// Wait for the next activity
if(!next.wait(milliseconds(250), std::nothrow))
continue;
const std::lock_guard lock
{
request::mutex
};
const auto it
{
next.get()
};
assert(it != end(request::list));
if(unlikely(it == end(request::list)))
continue;
std::unique_ptr<request> req
{
*it
};
// Handle completion
if(!complete(*req))
req.release();
}
while(1);
} }
catch(const std::exception &e) catch(const std::exception &e)
{ {
@ -64,19 +137,47 @@ catch(const std::exception &e)
}; };
} }
void bool
ircd::m::push::worker_handle() ircd::m::push::complete(request &req)
try try
{ {
req.code = req.req.get();
req.response = json::object
{
req.req.in.content
};
const auto level
{
http::category(req.code) == http::category::SUCCESS?
log::level::DEBUG:
log::level::DERROR
};
log::logf
{
log, level,
"Request id:%lu [%u] notified %s `%s'",
req.id,
uint(req.code),
req.url.remote,
req.url.path,
};
return true;
} }
catch(const std::exception &e) catch(const std::exception &e)
{ {
log::error log::error
{ {
log, "Pusher worker :%s", log, "Request id:%lu [---] notifying %s `%s' :%s",
req.id,
req.url.remote,
req.url.path,
e.what(), e.what(),
}; };
return true;
} }
void void
@ -97,7 +198,7 @@ try
if(!startswith(type, "ircd.push.note")) if(!startswith(type, "ircd.push.note"))
return; return;
const json::string user_id const json::string &user_id
{ {
json::get<"content"_>(event).get("user_id") json::get<"content"_>(event).get("user_id")
}; };
@ -110,53 +211,47 @@ try
user_id user_id
}; };
// The event has to be in the user's room and not some other room const m::room::id user_room_id
if(unlikely(!m::user::room::is(at<"room_id"_>(event), user)))
return;
// Tweak type; room apropos
const auto &[only, room_id]
{ {
split(lstrip(type, "ircd.push.note."), '!') at<"room_id"_>(event)
}; };
// The event has to be in the user's room and not some other room
if(unlikely(!m::user::room::is(user_room_id, user)))
return;
const auto subject
{
m::user::notifications::unmake_type(type)
};
const m::room room
{
subject.room_id
};
const event::idx event_idx
{
json::get<"content"_>(event).at<event::idx>("event_idx")
};
// Note the subject event data is not fetched yet. If the user
// has any pushers it's fetched on the first and reused subsequently.
m::event::fetch push_event;
const m::user::pushers pushers const m::user::pushers pushers
{ {
user user
}; };
pushers.for_each([&] pushers.for_each([&user, &room, &event_idx, &push_event]
(const event::idx &pusher_idx, const string_view &pushkey, const push::pusher &pusher) (const event::idx &pusher_idx, const string_view &pushkey, const push::pusher &pusher)
{ {
const json::object &data // If the subject event data hasn't been fetched yet, do it here.
{ if(!push_event.valid)
json::get<"data"_>(pusher) m::seek(push_event, event_idx);
};
const json::string &url assert(push_event.valid);
{ return notify(user, room, push_event, pusher, pusher_idx);
data["url"]
};
if(!rfc3986::valid(std::nothrow, rfc3986::parser::uri, url))
{
log::derror
{
log, "Pusher in idx:%lu data.url not a valid uri '%s'",
pusher_idx,
url,
};
return true;
}
log::debug
{
log, "Notifying `%s' ",
url,
};
return true;
}); });
} }
catch(const ctx::interrupted &) catch(const ctx::interrupted &)
@ -167,8 +262,338 @@ catch(const std::exception &e)
{ {
log::error log::error
{ {
log, "Pusher pushing by %s :%s", log, "Pushing %s :%s",
string_view{event.event_id}, string_view{event.event_id},
e.what(), e.what(),
}; };
} }
bool
ircd::m::push::notify(const m::user &user,
const m::room &room,
const m::event::fetch &event,
const pusher &pusher,
const m::event::idx &pusher_idx)
try
{
const json::string &kind
{
json::get<"kind"_>(pusher)
};
if(kind == "http")
return notify_http(user, room, event, pusher, pusher_idx);
if(kind == "email")
return notify_email(user, room, event, pusher, pusher_idx);
return true;
}
catch(const ctx::interrupted &)
{
throw;
}
catch(const std::exception &e)
{
log::error
{
log, "Notify to pusher:%lu by %s in %s for %s :%s",
pusher_idx,
string_view{user.user_id},
string_view{room.room_id},
string_view{event.event_id},
e.what(),
};
return true;
}
bool
ircd::m::push::notify_http(const m::user &user,
const m::room &room,
const m::event::fetch &event,
const pusher &pusher,
const m::event::idx &pusher_idx)
{
const std::lock_guard lock
{
request::mutex
};
auto req
{
std::make_unique<request>()
};
window_buffer wb{req->buf};
mutable_buffer buf{req->buf};
req->event_idx = event.event_idx;
// Target URL copied to request and verified on assignment
wb([&req, &pusher](const mutable_buffer &buf)
{
const json::object data
{
json::get<"data"_>(pusher)
};
const string_view url
{
buffer::data(buf), copy(buf, json::string(data["url"]))
};
req->url = url;
return url;
});
consume(buf, wb.consumed());
wb = window_buffer(wb.remains());
// Compose request content
wb([&](const mutable_buffer &buf)
{
json::stack out{buf};
{
json::stack::object top{out};
make_content(top, user, room, event, pusher, pusher_idx);
}
req->content = out.completed();
return string_view{req->content};
});
consume(buf, wb.consumed());
wb = window_buffer(wb.remains());
const net::hostport target
{
req->url.remote, "https"
};
// Compose request head
http::request
{
wb,
host(target),
"POST",
req->url.path,
size(string_view(req->content)),
"application/json; charset=utf-8"_sv,
};
// Outputs
server::out out;
out.head = wb.completed();
consume(buf, wb.consumed());
out.content = req->content;
// Inputs to remaining buffer
server::in in;
in.head = buf;
in.content = in.head;
// Start request
static server::request::opts sopts;
sopts.http_exceptions = false;
req->req = server::request
{
target, std::move(out), std::move(in), &sopts
};
log::debug
{
log, "Request id:%lu to pusher[%s...] by %s in %s for %s",
req->id,
trunc(json::get<"pushkey"_>(pusher), 16),
string_view{user.user_id},
string_view{room.room_id},
string_view{event.event_id},
};
request::dock.notify();
req.release();
return true;
}
bool
ircd::m::push::notify_email(const m::user &user,
const m::room &room,
const m::event::fetch &event,
const pusher &pusher,
const m::event::idx &pusher_idx)
{
return true;
}
void
ircd::m::push::make_content(json::stack::object &top,
const m::user &user,
const m::room &room,
const m::event &event,
const pusher &pusher,
const m::event::idx &pusher_idx)
{
const m::user sender
{
json::get<"sender"_>(event)
};
const bool event_id_only
{
json::string(json::get<"data"_>(pusher).get("format")) == "event_id_only"
};
json::stack::object note
{
top, "notification"
};
json::stack::member
{
note, "event_id", event.event_id
};
json::stack::member
{
note, "room_id", room.room_id
};
json::stack::member
{
note, "sender", json::get<"sender"_>(event)
};
json::stack::member
{
note, "type", json::get<"type"_>(event)
};
if(!event_id_only && (false)) // buffer size?
json::stack::member
{
note, "content", json::get<"content"_>(event)
};
json::stack::member
{
note, "prio", string_view
{
"high" //TODO: xxx
}
};
// Devices
{
json::stack::array devices
{
note, "devices"
};
make_content_devices(devices, pusher, pusher_idx);
}
// Counts
{
json::stack::object counts
{
note, "counts"
};
make_content_counts(counts, user);
}
char room_name_buf[256];
json::stack::member
{
note, "room_name", m::display_name(room_name_buf, room)
};
char sender_name_buf[256];
json::stack::member
{
note, "sender_display_name", m::user::profile(sender).get(sender_name_buf, "displayname")
};
json::stack::member
{
note, "user_is_target", json::value{bool
{
json::get<"type"_>(event) == "m.room.member" &&
json::get<"state_key"_>(event) == user.user_id
}}
};
}
void
ircd::m::push::make_content_devices(json::stack::array &devices,
const pusher &pusher,
const m::event::idx &pusher_idx)
{
json::stack::object device
{
devices
};
json::stack::member
{
device, "app_id", at<"app_id"_>(pusher)
};
json::stack::member
{
device, "pushkey", at<"pushkey"_>(pusher)
};
json::stack::member
{
device, "data", json::get<"data"_>(pusher)
};
time_t pushkey_ts;
if(m::get(pusher_idx, "origin_server_ts", pushkey_ts))
json::stack::member
{
device, "pushkey_ts", json::value
{
pushkey_ts
}
};
return; //TODO: XXX get tweaks from matched push rule
json::stack::member
{
device, "tweaks", json::empty_object
};
}
void
ircd::m::push::make_content_counts(json::stack::object &counts,
const m::user &user)
{
long total_unread
{
0L//TODO: XXX
};
if(total_unread)
json::stack::member
{
counts, "unread", json::value
{
total_unread
}
};
long missed_calls
{
0L //TODO: XXX
};
if(missed_calls)
json::stack::member
{
counts, "missed_calls", json::value
{
missed_calls
}
};
}