From eca9bd7da94fc2533436af89ce4358784baf61dc Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Thu, 14 Feb 2019 14:35:09 -0800 Subject: [PATCH] modules/client/sync/rooms/ephemeral/receipt: Optimize polylog-sync strategy. --- .../client/sync/rooms/ephemeral/receipt.cc | 90 ++++++++----------- 1 file changed, 38 insertions(+), 52 deletions(-) diff --git a/modules/client/sync/rooms/ephemeral/receipt.cc b/modules/client/sync/rooms/ephemeral/receipt.cc index 3388a795a..10d2ecb51 100644 --- a/modules/client/sync/rooms/ephemeral/receipt.cc +++ b/modules/client/sync/rooms/ephemeral/receipt.cc @@ -16,9 +16,8 @@ IRCD_MODULE namespace ircd::m::sync { - static void _reformat_receipt(json::stack::object &, const m::event &); - static void _handle_receipt(data &, const m::event &); - static void _handle_user(data &, const m::user &, ctx::mutex &); + static void _handle_message_receipt(data &, const m::event &); + static void _handle_message(data &, const m::event::idx &); static void room_ephemeral_m_receipt_m_read_polylog(data &); extern item room_ephemeral_m_receipt_m_read; } @@ -34,76 +33,63 @@ void ircd::m::sync::room_ephemeral_m_receipt_m_read_polylog(data &data) { const m::room &room{*data.room}; - const m::room::members members + m::room::messages it { room }; - static const size_t fibers(64); //TODO: conf - using queue = std::array; - using buffer = std::array; - - queue q; - const auto buf + ssize_t i(0); + event::idx idx(0); + for(; it && i < 10; --it) { - std::make_unique() - }; - - ctx::mutex mutex; - ctx::parallel parallel - { - m::sync::pool, q, [&data, &mutex] - (const m::user::id user_id) + if(apropos(data, it.event_idx())) { - const m::user user{user_id}; - _handle_user(data, user, mutex); + idx = it.event_idx(); + ++i; } - }; + else if(i > 0) + break; + } - members.for_each("join", m::room::members::closure{[¶llel, &q, &buf] - (const m::user::id &user_id) - { - const auto pos(parallel.nextpos()); - q[pos] = strlcpy(buf->at(pos), user_id); - parallel(); - }}); + if(i > 0 && !it && idx) + it.seek(idx); + + if(i > 0 && idx) + for(; it && i > -1; ++it, --i) + _handle_message(data, it.event_idx()); } void -ircd::m::sync::_handle_user(data &data, - const m::user &user, - ctx::mutex &mutex) +ircd::m::sync::_handle_message(data &data, + const event::idx &idx) { - static const m::event::fetch::opts fopts + const event::refs refs{idx}; + refs.for_each(dbs::ref::M_RECEIPT__M_READ, [&data] + (const event::idx &idx, const auto &type) { - m::event::keys::include + assert(type == dbs::ref::M_RECEIPT__M_READ); + static const m::event::fetch::opts fopts { - "event_id", "content", "sender", - }, - }; + m::event::keys::include{"content", "sender"} + }; - m::user::room user_room{user}; - user_room.fopts = &fopts; - if(head_idx(std::nothrow, user_room) < data.range.first) - return; - - const m::room::id &room_id{*data.room}; - user_room.get(std::nothrow, "ircd.read", room_id, [&data, &mutex] - (const m::event &event) - { - if(apropos(data, event)) + const m::event::fetch event { - data.commit(); - const std::lock_guard lock(mutex); - _handle_receipt(data, event); - } + idx, std::nothrow, fopts + }; + + if(event.valid) + _handle_message_receipt(data, event); + + return true; }); } void -ircd::m::sync::_handle_receipt(data &data, - const m::event &event) +ircd::m::sync::_handle_message_receipt(data &data, + const m::event &event) { + data.commit(); const json::object content { at<"content"_>(event)