0
0
Fork 0
mirror of https://github.com/matrix-construct/construct synced 2025-01-25 05:49:58 +01:00
construct/modules/m_pusher.cc

694 lines
13 KiB
C++

// The Construct
//
// Copyright (C) The Construct Developers, Authors & Contributors
// Copyright (C) 2016-2020 Jason Volk <jason@zemos.net>
//
// Permission to use, copy, modify, and/or distribute this software for any
// purpose with or without fee is hereby granted, provided that the above
// copyright notice and this permission notice is present in all copies. The
// full license for this software is available in the LICENSE file.
namespace ircd::m::push
{
static long count_missed_calls(const user &, const room &, const event::idx &);
static long count_missed_calls(const user &, const event::idx &);
static long count_unread(const user &, const room &, const event::idx &);
static long count_unread(const user &, const event::idx &);
static void make_content_devices(json::stack::array &, const pusher &, const event::idx &);
static void make_content_counts(json::stack::object &, const user &, const event::idx &);
static void make_content(json::stack::object &, const user &, const room &, const event &, const event::idx &, const pusher &, const event::idx &);
static bool notify_email(const user &, const room &, const event::fetch &, const pusher &, const event::idx &pusher_idx);
static bool notify_http(const user &, const room &, const event::fetch &, const pusher &, const event::idx &pusher_idx);
static bool notify(const user &, const room &, const event::fetch &, const pusher &, const event::idx &pusher_idx);
static void handle_event(const event &, vm::eval &);
static bool complete(request &);
static void worker();
static void fini();
static server::request request_skip;
extern hookfn<vm::eval &> hook_event;
extern context worker_context;
}
// Module header object. Carries the human-readable description string and
// names ircd::m::push::fini as a callback -- presumably the unload hook, with
// the nullptr before it leaving the init hook unset (TODO confirm against the
// mapi::header constructor).
ircd::mapi::header
IRCD_MODULE
{
"Matrix 13.13 :Push Notifications Pusher",
nullptr,
ircd::m::push::fini,
};
// Context which runs worker(). It is constructed with the name "m.pusher", a
// 256 KiB size argument (presumably the stack size -- TODO confirm) and the
// WAIT_JOIN flag. fini() requests termination of this context.
decltype(ircd::m::push::worker_context)
ircd::m::push::worker_context
{
"m.pusher",
256_KiB,
context::WAIT_JOIN,
worker,
};
// Hook which routes evaluated events to handle_event(). The feature object
// selects the "vm.effect" site.
decltype(ircd::m::push::hook_event)
ircd::m::push::hook_event
{
handle_event,
{
{ "_site", "vm.effect" },
}
};
void
ircd::m::push::fini()
{
for(auto *const &req : request::list)
server::cancel(req->req);
worker_context.terminate();
request::dock.notify_all();
}
// Body of the worker context. Waits until at least one push request is in
// flight, then reaps whichever request signals activity next and passes it to
// complete(). The loop ends once termination was requested and the request
// list is empty. ctx::interrupted is rethrown; any other exception is logged
// as critical and ends the worker.
void
ircd::m::push::worker()
try
{
	// Wait for run::level RUN before entering work loop.
	run::barrier<ctx::interrupted>{};

	// An entry is in flight once its server::request has been assigned.
	static const auto in_flight{[](const auto *const &pending)
	{
		return !!pending->req;
	}};

	const ctx::uninterruptible ui;
	while(true)
	{
		// Sleep until something is in flight or termination was requested.
		request::dock.wait([]
		{
			return std::any_of(begin(request::list), end(request::list), in_flight)
			    || ctx::termination(worker_context);
		});

		if(request::list.empty() && ctx::termination(worker_context))
			break;

		// Entries without a started request are represented by request_skip.
		auto next
		{
			ctx::when_any(begin(request::list), end(request::list), []
			(auto &it) -> server::request &
			{
				if(!(*it)->req)
					return request_skip;

				return (*it)->req;
			})
		};

		// No activity within the interval; start over with a fresh view of
		// the list.
		if(!next.wait(milliseconds(250), std::nothrow))
			continue;

		const std::lock_guard lock
		{
			request::mutex
		};

		const auto ready
		{
			next.get()
		};

		assert(ready != end(request::list));
		if(unlikely(ready == end(request::list)))
			continue;

		// Ownership is taken here; the request is destroyed when this scope
		// ends unless complete() returns false.
		std::unique_ptr<request> owned
		{
			*ready
		};

		if(!complete(*owned))
			owned.release();
	}
}
catch(const ctx::interrupted &)
{
	throw;
}
catch(const std::exception &e)
{
	log::critical
	{
		log, "Worker unhandled :%s",
		e.what(),
	};
}
// Collects the outcome of a push request picked up by worker(): stores the
// status code and the response content on the request, then logs the result.
// A code in the SUCCESS category is logged at DEBUG, anything else at DERROR.
// An exception while collecting is logged as an error.
//
// Returns true on every path; worker() destroys the request when it sees true.
bool
ircd::m::push::complete(request &req)
try
{
	req.code = req.req.get();
	req.response = json::object
	{
		req.req.in.content
	};

	const bool succeeded
	{
		http::category(req.code) == http::category::SUCCESS
	};

	const auto severity
	{
		succeeded? log::level::DEBUG: log::level::DERROR
	};

	log::logf
	{
		log, severity,
		"Request id:%lu [%u] notified %s `%s'",
		req.id,
		uint(req.code),
		req.url.remote,
		req.url.path,
	};

	return true;
}
catch(const std::exception &e)
{
	log::error
	{
		log, "Request id:%lu [---] notifying %s `%s' :%s",
		req.id,
		req.url.remote,
		req.url.path,
		e.what(),
	};

	return true;
}
// Handler attached to hook_event. For an event whose type starts with
// "ircd.push.note" and which sits in the room belonging to the user named in
// its content, every pusher of that user is passed to notify() together with
// the subject event referenced by the content's "event_idx".
//
// ctx::interrupted is rethrown; any other exception is logged and swallowed.
void
ircd::m::push::handle_event(const m::event &event,
                            vm::eval &eval)
try
{
	// Pushing may be disabled by configuration, and all pusher notifications
	// are generated from internal rooms only.
	if(!request::enable || !eval.room_internal)
		return;

	// Filter out all push notification types by prefix
	const auto &note_type
	{
		json::get<"type"_>(event)
	};

	if(!startswith(note_type, "ircd.push.note"))
		return;

	const json::string &target_id
	{
		json::get<"content"_>(event).get("user_id")
	};

	if(unlikely(!target_id))
		return;

	const m::user recipient
	{
		target_id
	};

	const m::room::id origin_room_id
	{
		at<"room_id"_>(event)
	};

	// The event has to be in the user's room and not some other room
	if(unlikely(!m::user::room::is(origin_room_id, recipient)))
		return;

	const auto subject
	{
		m::user::notifications::unmake_type(note_type)
	};

	const m::room subject_room
	{
		subject.room_id
	};

	const event::idx subject_idx
	{
		json::get<"content"_>(event).at<event::idx>("event_idx")
	};

	// The subject event is fetched lazily: only when the first pusher is
	// visited, and the same fetch is then reused for the remaining pushers.
	m::event::fetch subject_event;
	const m::user::pushers pushers
	{
		recipient
	};

	pushers.for_each([&recipient, &subject_room, &subject_idx, &subject_event]
	(const event::idx &pusher_idx, const string_view &pushkey, const push::pusher &pusher)
	{
		if(!subject_event.valid)
			m::seek(subject_event, subject_idx);

		assert(subject_event.valid);
		return notify(recipient, subject_room, subject_event, pusher, pusher_idx);
	});
}
catch(const ctx::interrupted &)
{
	throw;
}
catch(const std::exception &e)
{
	log::error
	{
		log, "Pushing %s :%s",
		string_view{event.event_id},
		e.what(),
	};
}
// Dispatches one notification to the transport named by the pusher's "kind":
// "http" goes to notify_http(), "email" to notify_email(). Any other kind is
// ignored.
//
// Returns the transport's result, or true when the kind is unrecognized or an
// exception was logged. ctx::interrupted is rethrown.
bool
ircd::m::push::notify(const m::user &user,
                      const m::room &room,
                      const m::event::fetch &event,
                      const pusher &pusher,
                      const m::event::idx &pusher_idx)
try
{
	const json::string &transport
	{
		json::get<"kind"_>(pusher)
	};

	if(transport == "http")
	{
		return notify_http(user, room, event, pusher, pusher_idx);
	}
	else if(transport == "email")
	{
		return notify_email(user, room, event, pusher, pusher_idx);
	}

	return true;
}
catch(const ctx::interrupted &)
{
	throw;
}
catch(const std::exception &e)
{
	log::error
	{
		log, "Notify to pusher:%lu by %s in %s for %s :%s",
		pusher_idx,
		string_view{user.user_id},
		string_view{room.room_id},
		string_view{event.event_id},
		e.what(),
	};

	return true;
}
// Composes an HTTP POST for one pusher and starts it as a server::request.
//
// All parts are written back-to-back into the request's own buffer, in this
// order: the target URL (from the pusher's data.url), the JSON content built
// by make_content(), and the HTTP request head. Whatever is left of the buffer
// afterwards is given to the server request as its input buffer. The request
// mutex is held for the whole call.
//
// On success the request object is released from the local unique_ptr and the
// request dock is notified. Returns true.
bool
ircd::m::push::notify_http(const m::user &user,
                           const m::room &room,
                           const m::event::fetch &event,
                           const pusher &pusher,
                           const m::event::idx &pusher_idx)
{
	const std::lock_guard lock
	{
		request::mutex
	};

	auto req
	{
		std::make_unique<request>()
	};

	// `composer` writes the current part; `unused` tracks the part of the
	// request buffer that no earlier part has claimed.
	window_buffer composer{req->buf};
	mutable_buffer unused{req->buf};
	req->event_idx = event.event_idx;

	// Target URL copied to request and verified on assignment
	composer([&req, &pusher](const mutable_buffer &dst)
	{
		const json::object pusher_data
		{
			json::get<"data"_>(pusher)
		};

		const string_view url
		{
			buffer::data(dst), copy(dst, json::string(pusher_data["url"]))
		};

		req->url = url;
		return url;
	});

	consume(unused, composer.consumed());
	composer = window_buffer(composer.remains());

	// Compose request content
	composer([&](const mutable_buffer &dst)
	{
		json::stack body{dst};
		{
			json::stack::object top{body};
			make_content(top, user, room, event, event.event_idx, pusher, pusher_idx);
		}

		req->content = body.completed();
		return string_view{req->content};
	});

	consume(unused, composer.consumed());
	composer = window_buffer(composer.remains());

	const net::hostport target
	{
		req->url
	};

	// Compose request head
	http::request
	{
		composer,
		host(target),
		"POST",
		req->url.path,
		size(string_view(req->content)),
		"application/json; charset=utf-8"_sv,
	};

	// Outputs
	server::out egress;
	egress.head = composer.completed();
	consume(unused, composer.consumed());
	egress.content = req->content;

	// Inputs to remaining buffer
	server::in ingress;
	ingress.head = unused;
	ingress.content = ingress.head;

	// Start request
	static server::request::opts sopts;
	sopts.http_exceptions = false;
	req->req = server::request
	{
		target, std::move(egress), std::move(ingress), &sopts
	};

	log::debug
	{
		log, "Request id:%lu to pusher[%s...] by %s in %s for %s",
		req->id,
		trunc(json::get<"pushkey"_>(pusher), 16),
		string_view{user.user_id},
		string_view{room.room_id},
		string_view{event.event_id},
	};

	request::dock.notify();
	req.release();
	return true;
}
// Transport selected by notify() for pushers whose kind is "email". Currently
// a stub: no argument is read, nothing is sent, and true is returned
// unconditionally.
bool
ircd::m::push::notify_email(const m::user &user,
const m::room &room,
const m::event::fetch &event,
const pusher &pusher,
const m::event::idx &pusher_idx)
{
return true;
}
// Writes the "notification" object of a push request into `top`.
//
// Members are emitted in this order: event_id, room_id, sender, type, prio,
// devices, counts, then room_name and sender_display_name when non-empty, and
// finally user_is_target. The event's content is never emitted: that branch is
// disabled by a constant false.
void
ircd::m::push::make_content(json::stack::object &top,
                            const m::user &user,
                            const m::room &room,
                            const m::event &event,
                            const event::idx &event_idx,
                            const pusher &pusher,
                            const m::event::idx &pusher_idx)
{
	const m::user sender
	{
		json::get<"sender"_>(event)
	};

	// Read from the pusher's data.format; it only gates the disabled content
	// member below.
	const bool event_id_only
	{
		json::string(json::get<"data"_>(pusher).get("format")) == "event_id_only"
	};

	json::stack::object notification
	{
		top, "notification"
	};

	json::stack::member{notification, "event_id", event.event_id};
	json::stack::member{notification, "room_id", room.room_id};
	json::stack::member{notification, "sender", json::get<"sender"_>(event)};
	json::stack::member{notification, "type", json::get<"type"_>(event)};

	if(!event_id_only && (false)) // buffer size?
		json::stack::member{notification, "content", json::get<"content"_>(event)};

	json::stack::member
	{
		notification, "prio", string_view
		{
			"high" //TODO: xxx
		}
	};

	// Devices; the array is closed at the end of this scope.
	{
		json::stack::array devices{notification, "devices"};
		make_content_devices(devices, pusher, pusher_idx);
	}

	// Counts; the object is closed at the end of this scope.
	{
		json::stack::object counts{notification, "counts"};
		make_content_counts(counts, user, event_idx);
	}

	char room_name_buf[256];
	const auto room_name
	{
		m::display_name(room_name_buf, room)
	};

	if(room_name)
		json::stack::member{notification, "room_name", room_name};

	char sender_name_buf[256];
	const auto sender_display_name
	{
		user::profile(sender).get(sender_name_buf, "displayname")
	};

	if(sender_display_name)
		json::stack::member{notification, "sender_display_name", sender_display_name};

	// True only for a membership event whose state_key is the notified user.
	const bool user_is_target
	{
		json::get<"type"_>(event) == "m.room.member" &&
		json::get<"state_key"_>(event) == user.user_id
	};

	json::stack::member
	{
		notification, "user_is_target", json::value{bool{user_is_target}}
	};
}
// Appends one device object for `pusher` to the "devices" array.
//
// The object carries app_id, pushkey and data from the pusher and, when the
// origin_server_ts of the pusher's event at `pusher_idx` can be read, a
// pushkey_ts. The Push Gateway API defines pushkey_ts in seconds whereas an
// event's origin_server_ts is in milliseconds, so the value is divided by
// 1000 before being written.
//
// app_id and pushkey are read with at<>() while data is read with
// json::get<>(). The "tweaks" member is not emitted yet; see the TODO.
void
ircd::m::push::make_content_devices(json::stack::array &devices,
                                    const pusher &pusher,
                                    const m::event::idx &pusher_idx)
{
	json::stack::object device
	{
		devices
	};

	json::stack::member
	{
		device, "app_id", at<"app_id"_>(pusher)
	};

	json::stack::member
	{
		device, "pushkey", at<"pushkey"_>(pusher)
	};

	json::stack::member
	{
		device, "data", json::get<"data"_>(pusher)
	};

	// Initialized so no indeterminate value is ever observed if the lookup
	// leaves it untouched.
	time_t origin_ts_ms{0};
	if(m::get(pusher_idx, "origin_server_ts", origin_ts_ms))
	{
		// Milliseconds -> seconds, as required for pushkey_ts.
		time_t pushkey_ts{origin_ts_ms / 1000};
		json::stack::member
		{
			device, "pushkey_ts", json::value
			{
				pushkey_ts
			}
		};
	}

	return; //TODO: XXX get tweaks from matched push rule

	// Intentionally unreachable until tweaks are derived from the push rule.
	json::stack::member
	{
		device, "tweaks", json::empty_object
	};
}
// Fills the "counts" object of a notification. "unread" is always written
// with the result of count_unread(); "missed_calls" is written only when
// count_missed_calls() returns a non-zero value.
void
ircd::m::push::make_content_counts(json::stack::object &counts,
                                   const user &user,
                                   const event::idx &event_idx)
{
	const long unread
	{
		count_unread(user, event_idx)
	};

	json::stack::member{counts, "unread", json::value{unread}};

	long missed
	{
		count_missed_calls(user, event_idx)
	};

	if(missed != 0)
		json::stack::member{counts, "missed_calls", json::value{missed}};
}
// Sums the per-room unread counts over every room iterated for the "join"
// membership of `user`.
long
ircd::m::push::count_unread(const user &user,
                            const event::idx &event_idx)
{
	long total{0};
	const m::user::rooms joined
	{
		user
	};

	joined.for_each("join", [&total, &user, &event_idx]
	(const m::room &room, const auto &)
	{
		total += count_unread(user, room, event_idx);
	});

	return total;
}
// Counts the notifications of `user` in `room` which fall between the index
// of the user's read receipt in that room and `event_idx`. The two indexes
// are ordered with std::minmax, so the larger one is always passed as `from`
// and the smaller one as `to`.
long
ircd::m::push::count_unread(const user &user,
                            const room &room,
                            const event::idx &event_idx)
{
	event::id::buf receipt_buf;
	const auto last_read
	{
		index(std::nothrow, receipt::get(receipt_buf, room, user))
	};

	const event::idx_range bounds
	{
		std::minmax(last_read, event_idx)
	};

	user::notifications::opts opts;
	opts.room_id = room.room_id;
	opts.from = bounds.second;
	opts.to = bounds.first;

	const user::notifications notifications
	{
		user
	};

	return notifications.count(opts);
}
// Missed-call counter for `user`. Not implemented yet (see the TODO): the
// arguments are ignored and 0 is always returned, so make_content_counts()
// never emits a "missed_calls" member.
long
ircd::m::push::count_missed_calls(const user &user,
const event::idx &event_idx)
{
return 0L; //TODO: XXX
}
// Per-room overload of the missed-call counter. Not implemented yet (see the
// TODO): the arguments are ignored and 0 is always returned.
long
ircd::m::push::count_missed_calls(const user &user,
const room &room,
const event::idx &event_idx)
{
return 0L; //TODO: XXX
}