0
0
Fork 0
mirror of https://github.com/matrix-construct/construct synced 2024-11-26 08:42:34 +01:00

modules/client/sync/rooms/ephemeral/receipt: Optimize polylog-sync strategy.

This commit is contained in:
Jason Volk 2019-02-14 14:35:09 -08:00
parent b351b40961
commit eca9bd7da9

View file

@ -16,9 +16,8 @@ IRCD_MODULE
namespace ircd::m::sync namespace ircd::m::sync
{ {
static void _reformat_receipt(json::stack::object &, const m::event &); static void _handle_message_receipt(data &, const m::event &);
static void _handle_receipt(data &, const m::event &); static void _handle_message(data &, const m::event::idx &);
static void _handle_user(data &, const m::user &, ctx::mutex &);
static void room_ephemeral_m_receipt_m_read_polylog(data &); static void room_ephemeral_m_receipt_m_read_polylog(data &);
extern item room_ephemeral_m_receipt_m_read; extern item room_ephemeral_m_receipt_m_read;
} }
@ -34,76 +33,63 @@ void
ircd::m::sync::room_ephemeral_m_receipt_m_read_polylog(data &data) ircd::m::sync::room_ephemeral_m_receipt_m_read_polylog(data &data)
{ {
const m::room &room{*data.room}; const m::room &room{*data.room};
const m::room::members members m::room::messages it
{ {
room room
}; };
static const size_t fibers(64); //TODO: conf ssize_t i(0);
using queue = std::array<string_view, fibers>; event::idx idx(0);
using buffer = std::array<char[m::id::MAX_SIZE+1], fibers>; for(; it && i < 10; --it)
queue q;
const auto buf
{ {
std::make_unique<buffer>() if(apropos(data, it.event_idx()))
};
ctx::mutex mutex;
ctx::parallel<string_view> parallel
{
m::sync::pool, q, [&data, &mutex]
(const m::user::id user_id)
{ {
const m::user user{user_id}; idx = it.event_idx();
_handle_user(data, user, mutex); ++i;
} }
}; else if(i > 0)
break;
}
members.for_each("join", m::room::members::closure{[&parallel, &q, &buf] if(i > 0 && !it && idx)
(const m::user::id &user_id) it.seek(idx);
{
const auto pos(parallel.nextpos()); if(i > 0 && idx)
q[pos] = strlcpy(buf->at(pos), user_id); for(; it && i > -1; ++it, --i)
parallel(); _handle_message(data, it.event_idx());
}});
} }
void void
ircd::m::sync::_handle_user(data &data, ircd::m::sync::_handle_message(data &data,
const m::user &user, const event::idx &idx)
ctx::mutex &mutex)
{ {
static const m::event::fetch::opts fopts const event::refs refs{idx};
refs.for_each(dbs::ref::M_RECEIPT__M_READ, [&data]
(const event::idx &idx, const auto &type)
{ {
m::event::keys::include assert(type == dbs::ref::M_RECEIPT__M_READ);
static const m::event::fetch::opts fopts
{ {
"event_id", "content", "sender", m::event::keys::include{"content", "sender"}
}, };
};
m::user::room user_room{user}; const m::event::fetch event
user_room.fopts = &fopts;
if(head_idx(std::nothrow, user_room) < data.range.first)
return;
const m::room::id &room_id{*data.room};
user_room.get(std::nothrow, "ircd.read", room_id, [&data, &mutex]
(const m::event &event)
{
if(apropos(data, event))
{ {
data.commit(); idx, std::nothrow, fopts
const std::lock_guard<decltype(mutex)> lock(mutex); };
_handle_receipt(data, event);
} if(event.valid)
_handle_message_receipt(data, event);
return true;
}); });
} }
void void
ircd::m::sync::_handle_receipt(data &data, ircd::m::sync::_handle_message_receipt(data &data,
const m::event &event) const m::event &event)
{ {
data.commit();
const json::object content const json::object content
{ {
at<"content"_>(event) at<"content"_>(event)