mirror of
https://github.com/matrix-construct/construct
synced 2024-11-26 00:32:35 +01:00
modules/client/sync: Employ json::stack::checkpoint using boolean return values in all handlers.
This commit is contained in:
parent
940975b448
commit
af5b0e69a1
15 changed files with 303 additions and 218 deletions
|
@ -42,7 +42,7 @@ namespace ircd::m::sync
|
|||
struct ircd::m::sync::item
|
||||
:instance_multimap<std::string, item, std::less<>>
|
||||
{
|
||||
using handle = std::function<void (data &)>;
|
||||
using handle = std::function<bool (data &)>;
|
||||
|
||||
std::string conf_name[2];
|
||||
conf::item<bool> enable;
|
||||
|
@ -54,9 +54,9 @@ struct ircd::m::sync::item
|
|||
string_view name() const;
|
||||
string_view member_name() const;
|
||||
|
||||
void poll(data &, const m::event &);
|
||||
void linear(data &);
|
||||
void polylog(data &);
|
||||
bool poll(data &, const m::event &);
|
||||
bool linear(data &);
|
||||
bool polylog(data &);
|
||||
|
||||
item(std::string name,
|
||||
handle polylog = {},
|
||||
|
@ -90,8 +90,6 @@ struct ircd::m::sync::data
|
|||
|
||||
/// The json::stack master object
|
||||
json::stack out;
|
||||
bool committed {false};
|
||||
bool commit();
|
||||
|
||||
// apropos contextual
|
||||
const m::event *event {nullptr};
|
||||
|
|
41
ircd/m.cc
41
ircd/m.cc
|
@ -531,13 +531,12 @@ ircd::m::sync::loghead(const data &data)
|
|||
|
||||
return fmt::sprintf
|
||||
{
|
||||
headbuf, "%s %s %lu:%lu %s commit:%b chunk:%zu %s in %s",
|
||||
headbuf, "%s %s %lu:%lu %s chunk:%zu %s in %s",
|
||||
remstr,
|
||||
string_view{data.user.user_id},
|
||||
data.range.first,
|
||||
data.range.second,
|
||||
ircd::pretty(iecbuf[0], iec(flush_bytes + size(data.out.completed()))),
|
||||
data.committed,
|
||||
flush_count,
|
||||
ircd::pretty(iecbuf[1], iec(flush_bytes)),
|
||||
tmstr
|
||||
|
@ -609,14 +608,6 @@ noexcept
|
|||
{
|
||||
}
|
||||
|
||||
bool
|
||||
ircd::m::sync::data::commit()
|
||||
{
|
||||
const auto ret{committed};
|
||||
committed = true;
|
||||
return ret;
|
||||
}
|
||||
|
||||
//
|
||||
// item
|
||||
//
|
||||
|
@ -680,12 +671,12 @@ noexcept
|
|||
};
|
||||
}
|
||||
|
||||
void
|
||||
bool
|
||||
ircd::m::sync::item::polylog(data &data)
|
||||
try
|
||||
{
|
||||
if(!enable)
|
||||
return;
|
||||
return false;
|
||||
|
||||
#ifdef RB_DEBUG
|
||||
sync::stats stats
|
||||
|
@ -699,7 +690,10 @@ try
|
|||
stats.timer = {};
|
||||
#endif
|
||||
|
||||
_polylog(data);
|
||||
const bool ret
|
||||
{
|
||||
_polylog(data)
|
||||
};
|
||||
|
||||
#ifdef RB_DEBUG
|
||||
if(data.stats && (stats_info || stats_debug))
|
||||
|
@ -708,13 +702,16 @@ try
|
|||
thread_local char tmbuf[32];
|
||||
log::debug
|
||||
{
|
||||
log, "polylog %s '%s' %s",
|
||||
log, "polylog %s commit:%b '%s' %s",
|
||||
loghead(data),
|
||||
ret,
|
||||
name(),
|
||||
ircd::pretty(tmbuf, stats.timer.at<microseconds>(), true)
|
||||
};
|
||||
}
|
||||
#endif
|
||||
|
||||
return ret;
|
||||
}
|
||||
catch(const std::bad_function_call &e)
|
||||
{
|
||||
|
@ -725,6 +722,8 @@ catch(const std::bad_function_call &e)
|
|||
name(),
|
||||
e.what()
|
||||
};
|
||||
|
||||
return false;
|
||||
}
|
||||
catch(const m::error &e)
|
||||
{
|
||||
|
@ -736,6 +735,8 @@ catch(const m::error &e)
|
|||
e.what(),
|
||||
e.content
|
||||
};
|
||||
|
||||
return false;
|
||||
}
|
||||
catch(const std::exception &e)
|
||||
{
|
||||
|
@ -750,11 +751,11 @@ catch(const std::exception &e)
|
|||
throw;
|
||||
}
|
||||
|
||||
void
|
||||
bool
|
||||
ircd::m::sync::item::linear(data &data)
|
||||
try
|
||||
{
|
||||
_linear(data);
|
||||
return _linear(data);
|
||||
}
|
||||
catch(const std::bad_function_call &e)
|
||||
{
|
||||
|
@ -766,9 +767,11 @@ catch(const std::bad_function_call &e)
|
|||
name(),
|
||||
e.what()
|
||||
};
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
void
|
||||
bool
|
||||
ircd::m::sync::item::poll(data &data,
|
||||
const m::event &event)
|
||||
try
|
||||
|
@ -778,7 +781,7 @@ try
|
|||
data.event, &event
|
||||
};
|
||||
|
||||
_linear(data);
|
||||
return _linear(data);
|
||||
}
|
||||
catch(const std::bad_function_call &e)
|
||||
{
|
||||
|
@ -790,6 +793,8 @@ catch(const std::bad_function_call &e)
|
|||
name(),
|
||||
e.what()
|
||||
};
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
ircd::string_view
|
||||
|
|
|
@ -160,7 +160,17 @@ ircd::m::sync::handle_get(client &client,
|
|||
void
|
||||
ircd::m::sync::empty_response(data &data)
|
||||
{
|
||||
data.commit();
|
||||
// Empty objects added to output otherwise Riot b0rks.
|
||||
json::stack::object
|
||||
{
|
||||
data.out, "rooms"
|
||||
};
|
||||
|
||||
json::stack::object
|
||||
{
|
||||
data.out, "presence"
|
||||
};
|
||||
|
||||
json::stack::member
|
||||
{
|
||||
data.out, "next_batch", json::value
|
||||
|
@ -168,10 +178,6 @@ ircd::m::sync::empty_response(data &data)
|
|||
lex_cast(data.range.second), json::STRING
|
||||
}
|
||||
};
|
||||
|
||||
// Empty objects added to output otherwise Riot b0rks.
|
||||
json::stack::object{data.out, "rooms"};
|
||||
json::stack::object{data.out, "presence"};
|
||||
}
|
||||
|
||||
ircd::const_buffer
|
||||
|
@ -179,12 +185,6 @@ ircd::m::sync::flush(data &data,
|
|||
resource::response::chunked &response,
|
||||
const const_buffer &buffer)
|
||||
{
|
||||
if(!data.committed)
|
||||
return const_buffer
|
||||
{
|
||||
buffer::data(buffer), 0UL
|
||||
};
|
||||
|
||||
const size_t wrote
|
||||
{
|
||||
response.write(buffer)
|
||||
|
@ -210,32 +210,53 @@ bool
|
|||
ircd::m::sync::polylog::handle(data &data)
|
||||
try
|
||||
{
|
||||
m::sync::for_each(string_view{}, [&data]
|
||||
json::stack::checkpoint checkpoint
|
||||
{
|
||||
data.out
|
||||
};
|
||||
|
||||
bool ret{false};
|
||||
m::sync::for_each(string_view{}, [&data, &ret]
|
||||
(item &item)
|
||||
{
|
||||
json::stack::member member
|
||||
json::stack::checkpoint checkpoint
|
||||
{
|
||||
data.out
|
||||
};
|
||||
|
||||
json::stack::object object
|
||||
{
|
||||
data.out, item.member_name()
|
||||
};
|
||||
|
||||
item.polylog(data);
|
||||
if(item.polylog(data))
|
||||
ret = true;
|
||||
else
|
||||
checkpoint.rollback();
|
||||
|
||||
return true;
|
||||
});
|
||||
|
||||
json::stack::member
|
||||
{
|
||||
data.out, "next_batch", json::value
|
||||
if(ret)
|
||||
json::stack::member
|
||||
{
|
||||
lex_cast(data.range.second), json::STRING
|
||||
}
|
||||
};
|
||||
data.out, "next_batch", json::value
|
||||
{
|
||||
lex_cast(data.range.second), json::STRING
|
||||
}
|
||||
};
|
||||
|
||||
if(!ret)
|
||||
checkpoint.rollback();
|
||||
|
||||
if(stats_info) log::info
|
||||
{
|
||||
log, "polylog %s complete", loghead(data)
|
||||
log, "polylog %s commit:%b complete",
|
||||
loghead(data),
|
||||
ret
|
||||
};
|
||||
|
||||
return data.committed;
|
||||
return ret;
|
||||
}
|
||||
catch(const std::exception &e)
|
||||
{
|
||||
|
@ -257,7 +278,8 @@ bool
|
|||
ircd::m::sync::linear::handle(data &data)
|
||||
try
|
||||
{
|
||||
m::events::for_each(data.range, [&data]
|
||||
bool ret{false};
|
||||
m::events::for_each(data.range, [&data, &ret]
|
||||
(const m::event::idx &event_idx, const m::event &event)
|
||||
{
|
||||
const scope_restore<decltype(data.event)> theirs
|
||||
|
@ -265,7 +287,7 @@ try
|
|||
data.event, &event
|
||||
};
|
||||
|
||||
m::sync::for_each(string_view{}, [&data]
|
||||
m::sync::for_each(string_view{}, [&data, &ret]
|
||||
(item &item)
|
||||
{
|
||||
json::stack::member member
|
||||
|
@ -273,7 +295,7 @@ try
|
|||
data.out, item.member_name()
|
||||
};
|
||||
|
||||
item.linear(data);
|
||||
ret |= item.linear(data);
|
||||
return true;
|
||||
});
|
||||
|
||||
|
@ -293,7 +315,7 @@ try
|
|||
log, "linear %s complete", loghead(data)
|
||||
};
|
||||
|
||||
return data.committed;
|
||||
return ret;
|
||||
}
|
||||
catch(const std::exception &e)
|
||||
{
|
||||
|
|
|
@ -17,8 +17,8 @@ IRCD_MODULE
|
|||
namespace ircd::m::sync
|
||||
{
|
||||
static bool account_data_(data &, const m::event &, const m::event::idx &);
|
||||
static void account_data_polylog(data &);
|
||||
static void account_data_linear(data &);
|
||||
static bool account_data_polylog(data &);
|
||||
static bool account_data_linear(data &);
|
||||
|
||||
extern item account_data;
|
||||
}
|
||||
|
@ -31,20 +31,15 @@ ircd::m::sync::account_data
|
|||
account_data_linear
|
||||
};
|
||||
|
||||
void
|
||||
bool
|
||||
ircd::m::sync::account_data_linear(data &data)
|
||||
{
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
void
|
||||
bool
|
||||
ircd::m::sync::account_data_polylog(data &data)
|
||||
{
|
||||
json::stack::object object
|
||||
{
|
||||
data.out
|
||||
};
|
||||
|
||||
json::stack::array array
|
||||
{
|
||||
data.out, "events"
|
||||
|
@ -60,12 +55,14 @@ ircd::m::sync::account_data_polylog(data &data)
|
|||
data.user_room, &fopts
|
||||
};
|
||||
|
||||
state.for_each("ircd.account_data", [&data, &array]
|
||||
bool ret{false};
|
||||
state.for_each("ircd.account_data", [&data, &array, &ret]
|
||||
(const m::event &event)
|
||||
{
|
||||
if(account_data_(data, event, index(event)))
|
||||
data.commit();
|
||||
ret |= account_data_(data, event, index(event));
|
||||
});
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
bool
|
||||
|
@ -82,8 +79,6 @@ ircd::m::sync::account_data_(data &data,
|
|||
if(json::get<"room_id"_>(event) != data.user_room.room_id)
|
||||
return false;
|
||||
|
||||
data.commit();
|
||||
|
||||
// Each account_data event is an object in the events array
|
||||
json::stack::object object
|
||||
{
|
||||
|
|
|
@ -16,8 +16,8 @@ IRCD_MODULE
|
|||
|
||||
namespace ircd::m::sync
|
||||
{
|
||||
static void device_lists_polylog(data &);
|
||||
static void device_lists_linear(data &);
|
||||
static bool device_lists_polylog(data &);
|
||||
static bool device_lists_linear(data &);
|
||||
|
||||
extern item device_lists;
|
||||
}
|
||||
|
@ -30,20 +30,15 @@ ircd::m::sync::device_lists
|
|||
device_lists_linear
|
||||
};
|
||||
|
||||
void
|
||||
bool
|
||||
ircd::m::sync::device_lists_linear(data &data)
|
||||
{
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
void
|
||||
bool
|
||||
ircd::m::sync::device_lists_polylog(data &data)
|
||||
{
|
||||
json::stack::object object
|
||||
{
|
||||
data.out
|
||||
};
|
||||
|
||||
json::stack::array
|
||||
{
|
||||
data.out, "changed"
|
||||
|
@ -53,4 +48,6 @@ ircd::m::sync::device_lists_polylog(data &data)
|
|||
{
|
||||
data.out, "left"
|
||||
};
|
||||
|
||||
return false;
|
||||
}
|
||||
|
|
|
@ -16,8 +16,8 @@ IRCD_MODULE
|
|||
|
||||
namespace ircd::m::sync
|
||||
{
|
||||
static void device_one_time_keys_count_polylog(data &);
|
||||
static void device_one_time_keys_count_linear(data &);
|
||||
static bool device_one_time_keys_count_polylog(data &);
|
||||
static bool device_one_time_keys_count_linear(data &);
|
||||
|
||||
extern item device_one_time_keys_count;
|
||||
}
|
||||
|
@ -30,18 +30,14 @@ ircd::m::sync::device_one_time_keys_count
|
|||
device_one_time_keys_count_linear
|
||||
};
|
||||
|
||||
void
|
||||
bool
|
||||
ircd::m::sync::device_one_time_keys_count_linear(data &data)
|
||||
{
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
void
|
||||
bool
|
||||
ircd::m::sync::device_one_time_keys_count_polylog(data &data)
|
||||
{
|
||||
json::stack::object object
|
||||
{
|
||||
data.out
|
||||
};
|
||||
|
||||
return false;
|
||||
}
|
||||
|
|
|
@ -16,11 +16,11 @@ IRCD_MODULE
|
|||
|
||||
namespace ircd::m::sync
|
||||
{
|
||||
static void presence_polylog_events(data &);
|
||||
static void presence_polylog(data &);
|
||||
static bool presence_polylog_events(data &);
|
||||
static bool presence_polylog(data &);
|
||||
|
||||
static void presence_linear_events(data &);
|
||||
static void presence_linear(data &);
|
||||
static bool presence_linear_events(data &);
|
||||
static bool presence_linear(data &);
|
||||
|
||||
extern item presence;
|
||||
}
|
||||
|
@ -33,11 +33,9 @@ ircd::m::sync::presence
|
|||
presence_linear,
|
||||
};
|
||||
|
||||
void
|
||||
bool
|
||||
ircd::m::sync::presence_linear(data &data)
|
||||
{
|
||||
return;
|
||||
|
||||
assert(data.event);
|
||||
const m::event &event
|
||||
{
|
||||
|
@ -45,10 +43,10 @@ ircd::m::sync::presence_linear(data &data)
|
|||
};
|
||||
|
||||
if(json::get<"type"_>(event) != "ircd.presence")
|
||||
return;
|
||||
return false;
|
||||
|
||||
if(json::get<"sender"_>(event) != m::me.user_id)
|
||||
return;
|
||||
return false;
|
||||
|
||||
// sender
|
||||
json::stack::member
|
||||
|
@ -68,21 +66,16 @@ ircd::m::sync::presence_linear(data &data)
|
|||
data.out, "content", at<"content"_>(event)
|
||||
};
|
||||
|
||||
return;
|
||||
return true;
|
||||
}
|
||||
|
||||
void
|
||||
bool
|
||||
ircd::m::sync::presence_polylog(data &data)
|
||||
{
|
||||
json::stack::object object
|
||||
{
|
||||
data.out
|
||||
};
|
||||
|
||||
presence_polylog_events(data);
|
||||
return presence_polylog_events(data);
|
||||
}
|
||||
|
||||
void
|
||||
bool
|
||||
ircd::m::sync::presence_polylog_events(data &data)
|
||||
{
|
||||
json::stack::array array
|
||||
|
@ -90,16 +83,17 @@ ircd::m::sync::presence_polylog_events(data &data)
|
|||
data.out, "events"
|
||||
};
|
||||
|
||||
bool ret{false};
|
||||
ctx::mutex mutex;
|
||||
const auto append_event{[&data, &array, &mutex]
|
||||
const auto append_event{[&data, &array, &mutex, &ret]
|
||||
(const json::object &event)
|
||||
{
|
||||
// Lock the json::stack for the append operations. This mutex will only be
|
||||
// contended during a json::stack flush to the client; not during database
|
||||
// queries leading to this.
|
||||
const std::lock_guard<decltype(mutex)> l{mutex};
|
||||
ret = true;
|
||||
|
||||
data.commit();
|
||||
json::stack::object object
|
||||
{
|
||||
array
|
||||
|
@ -158,4 +152,6 @@ ircd::m::sync::presence_polylog_events(data &data)
|
|||
q[pos] = strlcpy(buf->at(pos), user.user_id);
|
||||
parallel();
|
||||
});
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
|
|
@ -16,12 +16,12 @@ IRCD_MODULE
|
|||
|
||||
namespace ircd::m::sync
|
||||
{
|
||||
static void _rooms_polylog_room(data &, const m::room &);
|
||||
static void _rooms_polylog(data &, const string_view &membership);
|
||||
static void rooms_polylog(data &);
|
||||
static bool _rooms_polylog_room(data &, const m::room &);
|
||||
static bool _rooms_polylog(data &, const string_view &membership);
|
||||
static bool rooms_polylog(data &);
|
||||
|
||||
static void _rooms_linear(data &, const string_view &membership);
|
||||
static void rooms_linear(data &);
|
||||
static bool _rooms_linear(data &, const string_view &membership);
|
||||
static bool rooms_linear(data &);
|
||||
|
||||
extern item rooms;
|
||||
}
|
||||
|
@ -34,17 +34,33 @@ ircd::m::sync::rooms
|
|||
rooms_linear
|
||||
};
|
||||
|
||||
void
|
||||
bool
|
||||
ircd::m::sync::rooms_linear(data &data)
|
||||
{
|
||||
json::stack::object object{data.out};
|
||||
_rooms_linear(data, "invite");
|
||||
_rooms_linear(data, "join");
|
||||
_rooms_linear(data, "leave");
|
||||
_rooms_linear(data, "ban");
|
||||
assert(data.event);
|
||||
const auto &event{*data.event};
|
||||
if(!json::get<"room_id"_>(event))
|
||||
return false;
|
||||
|
||||
const m::room room
|
||||
{
|
||||
json::get<"room_id"_>(event)
|
||||
};
|
||||
|
||||
const scope_restore<decltype(data.room)> theirs
|
||||
{
|
||||
data.room, &room
|
||||
};
|
||||
|
||||
bool ret{false};
|
||||
ret |= _rooms_linear(data, "invite");
|
||||
ret |= _rooms_linear(data, "join");
|
||||
ret |= _rooms_linear(data, "leave");
|
||||
ret |= _rooms_linear(data, "ban");
|
||||
return ret;
|
||||
}
|
||||
|
||||
void
|
||||
bool
|
||||
ircd::m::sync::_rooms_linear(data &data,
|
||||
const string_view &membership)
|
||||
{
|
||||
|
@ -53,30 +69,34 @@ ircd::m::sync::_rooms_linear(data &data,
|
|||
data.membership, membership
|
||||
};
|
||||
|
||||
m::sync::for_each("rooms", [&](item &item)
|
||||
bool ret{false};
|
||||
m::sync::for_each("rooms", [&data, &ret]
|
||||
(item &item)
|
||||
{
|
||||
json::stack::member member
|
||||
json::stack::object object
|
||||
{
|
||||
data.out, item.member_name()
|
||||
};
|
||||
|
||||
item.linear(data);
|
||||
ret |= item.linear(data);
|
||||
return true;
|
||||
});
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
void
|
||||
bool
|
||||
ircd::m::sync::rooms_polylog(data &data)
|
||||
{
|
||||
json::stack::object object{data.out};
|
||||
_rooms_polylog(data, "invite");
|
||||
_rooms_polylog(data, "join");
|
||||
_rooms_polylog(data, "leave");
|
||||
_rooms_polylog(data, "ban");
|
||||
return;
|
||||
bool ret{false};
|
||||
ret |= _rooms_polylog(data, "invite");
|
||||
ret |= _rooms_polylog(data, "join");
|
||||
ret |= _rooms_polylog(data, "leave");
|
||||
ret |= _rooms_polylog(data, "ban");
|
||||
return ret;
|
||||
}
|
||||
|
||||
void
|
||||
bool
|
||||
ircd::m::sync::_rooms_polylog(data &data,
|
||||
const string_view &membership)
|
||||
{
|
||||
|
@ -85,12 +105,18 @@ ircd::m::sync::_rooms_polylog(data &data,
|
|||
data.membership, membership
|
||||
};
|
||||
|
||||
json::stack::checkpoint checkpoint
|
||||
{
|
||||
data.out
|
||||
};
|
||||
|
||||
json::stack::object object
|
||||
{
|
||||
data.out, membership
|
||||
};
|
||||
|
||||
data.user_rooms.for_each(membership, [&data]
|
||||
bool ret{false};
|
||||
data.user_rooms.for_each(membership, [&data, &ret]
|
||||
(const m::room &room, const string_view &membership_)
|
||||
{
|
||||
#if defined(RB_DEBUG) && 0
|
||||
|
@ -105,7 +131,7 @@ ircd::m::sync::_rooms_polylog(data &data,
|
|||
stats.timer = timer{};
|
||||
#endif
|
||||
|
||||
_rooms_polylog_room(data, room);
|
||||
ret |= _rooms_polylog_room(data, room);
|
||||
|
||||
#if defined(RB_DEBUG) && 0
|
||||
thread_local char tmbuf[32];
|
||||
|
@ -118,9 +144,14 @@ ircd::m::sync::_rooms_polylog(data &data,
|
|||
};
|
||||
#endif
|
||||
});
|
||||
|
||||
if(!ret)
|
||||
checkpoint.rollback();
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
void
|
||||
bool
|
||||
ircd::m::sync::_rooms_polylog_room(data &data,
|
||||
const m::room &room)
|
||||
{
|
||||
|
@ -129,6 +160,11 @@ ircd::m::sync::_rooms_polylog_room(data &data,
|
|||
data.room, &room
|
||||
};
|
||||
|
||||
json::stack::checkpoint checkpoint
|
||||
{
|
||||
data.out
|
||||
};
|
||||
|
||||
json::stack::object object
|
||||
{
|
||||
data.out, room.room_id
|
||||
|
@ -144,15 +180,30 @@ ircd::m::sync::_rooms_polylog_room(data &data,
|
|||
data.room_head, room_head
|
||||
};
|
||||
|
||||
m::sync::for_each("rooms", [&data]
|
||||
bool ret{false};
|
||||
m::sync::for_each("rooms", [&data, &ret]
|
||||
(item &item)
|
||||
{
|
||||
json::stack::checkpoint checkpoint
|
||||
{
|
||||
data.out
|
||||
};
|
||||
|
||||
json::stack::object object
|
||||
{
|
||||
data.out, item.member_name()
|
||||
};
|
||||
|
||||
item.polylog(data);
|
||||
if(item.polylog(data))
|
||||
ret = true;
|
||||
else
|
||||
checkpoint.rollback();
|
||||
|
||||
return true;
|
||||
});
|
||||
|
||||
if(!ret)
|
||||
checkpoint.rollback();
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
|
|
@ -16,9 +16,9 @@ IRCD_MODULE
|
|||
|
||||
namespace ircd::m::sync
|
||||
{
|
||||
static void room_account_data_polylog_events_event(data &, const m::event &);
|
||||
static void room_account_data_polylog_events(data &);
|
||||
static void room_account_data_polylog(data &);
|
||||
static bool room_account_data_polylog_events_event(data &, const m::event &);
|
||||
static bool room_account_data_polylog_events(data &);
|
||||
static bool room_account_data_polylog(data &);
|
||||
|
||||
extern item room_account_data;
|
||||
}
|
||||
|
@ -30,13 +30,13 @@ ircd::m::sync::room_account_data
|
|||
room_account_data_polylog
|
||||
};
|
||||
|
||||
void
|
||||
bool
|
||||
ircd::m::sync::room_account_data_polylog(data &data)
|
||||
{
|
||||
room_account_data_polylog_events(data);
|
||||
return room_account_data_polylog_events(data);
|
||||
}
|
||||
|
||||
void
|
||||
bool
|
||||
ircd::m::sync::room_account_data_polylog_events(data &data)
|
||||
{
|
||||
json::stack::array array
|
||||
|
@ -61,19 +61,21 @@ ircd::m::sync::room_account_data_polylog_events(data &data)
|
|||
data.user_room, &fopts
|
||||
};
|
||||
|
||||
state.for_each(type, [&data]
|
||||
bool ret{false};
|
||||
state.for_each(type, [&data, &ret]
|
||||
(const m::event &event)
|
||||
{
|
||||
if(apropos(data, event))
|
||||
room_account_data_polylog_events_event(data, event);
|
||||
ret |= room_account_data_polylog_events_event(data, event);
|
||||
});
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
void
|
||||
bool
|
||||
ircd::m::sync::room_account_data_polylog_events_event(data &data,
|
||||
const m::event &event)
|
||||
{
|
||||
data.commit();
|
||||
json::stack::object object
|
||||
{
|
||||
data.out
|
||||
|
@ -88,4 +90,6 @@ ircd::m::sync::room_account_data_polylog_events_event(data &data,
|
|||
{
|
||||
object, "content", at<"content"_>(event)
|
||||
};
|
||||
|
||||
return true;
|
||||
}
|
||||
|
|
|
@ -16,9 +16,9 @@ IRCD_MODULE
|
|||
|
||||
namespace ircd::m::sync
|
||||
{
|
||||
static void rooms_ephemeral_events_polylog(data &);
|
||||
static void rooms_ephemeral_polylog(data &);
|
||||
static void rooms_ephemeral_linear(data &);
|
||||
static bool rooms_ephemeral_events_polylog(data &);
|
||||
static bool rooms_ephemeral_polylog(data &);
|
||||
static bool rooms_ephemeral_linear(data &);
|
||||
extern item rooms_ephemeral;
|
||||
}
|
||||
|
||||
|
@ -30,19 +30,19 @@ ircd::m::sync::rooms_ephemeral
|
|||
rooms_ephemeral_linear
|
||||
};
|
||||
|
||||
void
|
||||
bool
|
||||
ircd::m::sync::rooms_ephemeral_linear(data &data)
|
||||
{
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
void
|
||||
bool
|
||||
ircd::m::sync::rooms_ephemeral_polylog(data &data)
|
||||
{
|
||||
rooms_ephemeral_events_polylog(data);
|
||||
return rooms_ephemeral_events_polylog(data);
|
||||
}
|
||||
|
||||
void
|
||||
bool
|
||||
ircd::m::sync::rooms_ephemeral_events_polylog(data &data)
|
||||
{
|
||||
json::stack::array array
|
||||
|
@ -50,10 +50,22 @@ ircd::m::sync::rooms_ephemeral_events_polylog(data &data)
|
|||
data.out, "events"
|
||||
};
|
||||
|
||||
m::sync::for_each("rooms.ephemeral", [&]
|
||||
bool ret{false};
|
||||
m::sync::for_each("rooms.ephemeral", [&data, &ret]
|
||||
(item &item)
|
||||
{
|
||||
item.polylog(data);
|
||||
json::stack::checkpoint checkpoint
|
||||
{
|
||||
data.out
|
||||
};
|
||||
|
||||
if(item.polylog(data))
|
||||
ret = true;
|
||||
else
|
||||
checkpoint.rollback();
|
||||
|
||||
return true;
|
||||
});
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
|
|
@ -16,9 +16,9 @@ IRCD_MODULE
|
|||
|
||||
namespace ircd::m::sync
|
||||
{
|
||||
static void _handle_message_receipt(data &, const m::event &);
|
||||
static void _handle_message(data &, const m::event::idx &);
|
||||
static void room_ephemeral_m_receipt_m_read_polylog(data &);
|
||||
static bool _handle_message_receipt(data &, const m::event &);
|
||||
static bool _handle_message(data &, const m::event::idx &);
|
||||
static bool room_ephemeral_m_receipt_m_read_polylog(data &);
|
||||
extern item room_ephemeral_m_receipt_m_read;
|
||||
}
|
||||
|
||||
|
@ -29,7 +29,7 @@ ircd::m::sync::room_ephemeral_m_receipt_m_read
|
|||
room_ephemeral_m_receipt_m_read_polylog
|
||||
};
|
||||
|
||||
void
|
||||
bool
|
||||
ircd::m::sync::room_ephemeral_m_receipt_m_read_polylog(data &data)
|
||||
{
|
||||
const m::room &room{*data.room};
|
||||
|
@ -54,17 +54,24 @@ ircd::m::sync::room_ephemeral_m_receipt_m_read_polylog(data &data)
|
|||
if(i > 0 && !it && idx)
|
||||
it.seek(idx);
|
||||
|
||||
bool ret{false};
|
||||
if(i > 0 && idx)
|
||||
for(; it && i > -1; ++it, --i)
|
||||
_handle_message(data, it.event_idx());
|
||||
ret |= _handle_message(data, it.event_idx());
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
void
|
||||
bool
|
||||
ircd::m::sync::_handle_message(data &data,
|
||||
const event::idx &idx)
|
||||
{
|
||||
bool ret{false};
|
||||
if(!apropos(data, idx))
|
||||
return ret;
|
||||
|
||||
const event::refs refs{idx};
|
||||
refs.for_each(dbs::ref::M_RECEIPT__M_READ, [&data]
|
||||
refs.for_each(dbs::ref::M_RECEIPT__M_READ, [&data, &ret]
|
||||
(const event::idx &idx, const auto &type)
|
||||
{
|
||||
assert(type == dbs::ref::M_RECEIPT__M_READ);
|
||||
|
@ -79,17 +86,18 @@ ircd::m::sync::_handle_message(data &data,
|
|||
};
|
||||
|
||||
if(event.valid)
|
||||
_handle_message_receipt(data, event);
|
||||
ret |= _handle_message_receipt(data, event);
|
||||
|
||||
return true;
|
||||
});
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
void
|
||||
bool
|
||||
ircd::m::sync::_handle_message_receipt(data &data,
|
||||
const m::event &event)
|
||||
{
|
||||
data.commit();
|
||||
const json::object content
|
||||
{
|
||||
at<"content"_>(event)
|
||||
|
@ -131,4 +139,6 @@ ircd::m::sync::_handle_message_receipt(data &data,
|
|||
{
|
||||
sender_, "ts", json::value(content.at("ts"))
|
||||
};
|
||||
|
||||
return true;
|
||||
}
|
||||
|
|
|
@ -10,11 +10,11 @@
|
|||
|
||||
namespace ircd::m::sync
|
||||
{
|
||||
static void room_state_polylog_events(data &);
|
||||
static void room_state_polylog(data &);
|
||||
static bool room_state_polylog_events(data &);
|
||||
static bool room_state_polylog(data &);
|
||||
|
||||
static void room_state_linear_events(data &);
|
||||
static void room_state_linear(data &);
|
||||
static bool room_state_linear_events(data &);
|
||||
static bool room_state_linear(data &);
|
||||
|
||||
extern const event::keys::include _default_keys;
|
||||
extern event::fetch::opts _default_fopts;
|
||||
|
@ -59,7 +59,7 @@ ircd::m::sync::_default_fopts
|
|||
_default_keys
|
||||
};
|
||||
|
||||
void
|
||||
bool
|
||||
ircd::m::sync::room_state_linear(data &data)
|
||||
{
|
||||
assert(data.event);
|
||||
|
@ -67,22 +67,25 @@ ircd::m::sync::room_state_linear(data &data)
|
|||
assert(json::get<"room_id"_>(*data.event));
|
||||
|
||||
if(!json::get<"state_key"_>(*data.event))
|
||||
return;
|
||||
return false;
|
||||
|
||||
if(!data.room->membership(data.user, data.membership))
|
||||
return;
|
||||
return false;
|
||||
|
||||
//data.array->append(*data.event);
|
||||
return false;
|
||||
}
|
||||
|
||||
void
|
||||
bool
|
||||
ircd::m::sync::room_state_polylog(data &data)
|
||||
{
|
||||
if(apropos(data, data.room_head))
|
||||
room_state_polylog_events(data);
|
||||
if(!apropos(data, data.room_head))
|
||||
return false;
|
||||
|
||||
return room_state_polylog_events(data);
|
||||
}
|
||||
|
||||
void
|
||||
bool
|
||||
ircd::m::sync::room_state_polylog_events(data &data)
|
||||
{
|
||||
const m::room &room{*data.room};
|
||||
|
@ -92,8 +95,9 @@ ircd::m::sync::room_state_polylog_events(data &data)
|
|||
data.out, "events"
|
||||
};
|
||||
|
||||
bool ret{false};
|
||||
ctx::mutex mutex;
|
||||
const event::closure_idx each_idx{[&data, &array, &mutex]
|
||||
const event::closure_idx each_idx{[&data, &array, &mutex, &ret]
|
||||
(const m::event::idx event_idx)
|
||||
{
|
||||
const event::fetch event
|
||||
|
@ -106,8 +110,8 @@ ircd::m::sync::room_state_polylog_events(data &data)
|
|||
return;
|
||||
|
||||
const std::lock_guard<decltype(mutex)> lock{mutex};
|
||||
data.commit();
|
||||
array.append(event);
|
||||
ret = true;
|
||||
}};
|
||||
|
||||
//TODO: conf
|
||||
|
@ -123,4 +127,6 @@ ircd::m::sync::room_state_polylog_events(data &data)
|
|||
if(apropos(data, event_idx))
|
||||
parallel(event_idx);
|
||||
});
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
|
|
@ -16,11 +16,11 @@ IRCD_MODULE
|
|||
|
||||
namespace ircd::m::sync
|
||||
{
|
||||
static event::id::buf _room_timeline_polylog_events(data &, const m::room &, bool &);
|
||||
static void room_timeline_polylog(data &);
|
||||
static event::id::buf _room_timeline_polylog_events(data &, const m::room &, bool &, bool &);
|
||||
static bool room_timeline_polylog(data &);
|
||||
|
||||
static event::id::buf _room_timeline_linear_events(data &, const m::room &, bool &);
|
||||
static void room_timeline_linear(data &);
|
||||
static bool room_timeline_linear(data &);
|
||||
|
||||
extern const event::keys::include default_keys;
|
||||
extern item room_timeline;
|
||||
|
@ -49,11 +49,9 @@ ircd::m::sync::default_keys
|
|||
"type",
|
||||
};
|
||||
|
||||
void
|
||||
bool
|
||||
ircd::m::sync::room_timeline_linear(data &data)
|
||||
{
|
||||
return;
|
||||
|
||||
json::stack::object object
|
||||
{
|
||||
data.out
|
||||
|
@ -80,7 +78,7 @@ ircd::m::sync::room_timeline_linear(data &data)
|
|||
object, "limited", json::value{limited}
|
||||
};
|
||||
|
||||
return;
|
||||
return false;
|
||||
}
|
||||
|
||||
ircd::m::event::id::buf
|
||||
|
@ -96,18 +94,18 @@ ircd::m::sync::_room_timeline_linear_events(data &data,
|
|||
return {};
|
||||
}
|
||||
|
||||
void
|
||||
bool
|
||||
ircd::m::sync::room_timeline_polylog(data &data)
|
||||
{
|
||||
if(!apropos(data, data.room_head))
|
||||
return;
|
||||
return false;
|
||||
|
||||
// events
|
||||
assert(data.room);
|
||||
bool limited{false};
|
||||
bool limited{false}, ret{false};
|
||||
m::event::id::buf prev
|
||||
{
|
||||
_room_timeline_polylog_events(data, *data.room, limited)
|
||||
_room_timeline_polylog_events(data, *data.room, limited, ret)
|
||||
};
|
||||
|
||||
// prev_batch
|
||||
|
@ -121,12 +119,15 @@ ircd::m::sync::room_timeline_polylog(data &data)
|
|||
{
|
||||
data.out, "limited", json::value{limited}
|
||||
};
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
ircd::m::event::id::buf
|
||||
ircd::m::sync::_room_timeline_polylog_events(data &data,
|
||||
const m::room &room,
|
||||
bool &limited)
|
||||
bool &limited,
|
||||
bool &ret)
|
||||
{
|
||||
static const event::fetch::opts fopts
|
||||
{
|
||||
|
@ -159,23 +160,12 @@ ircd::m::sync::_room_timeline_polylog_events(data &data,
|
|||
}
|
||||
|
||||
limited = i >= 10;
|
||||
if(i > 0)
|
||||
data.commit();
|
||||
|
||||
if(i > 0 && !it)
|
||||
it.seek(event_id);
|
||||
|
||||
if(i > 0 && it)
|
||||
{
|
||||
data.commit();
|
||||
//const m::event &event{*it};
|
||||
//data.state_at = at<"depth"_>(event);
|
||||
}
|
||||
|
||||
if(i > 0)
|
||||
for(; it && i > -1; ++it, --i)
|
||||
{
|
||||
data.commit();
|
||||
json::stack::object object
|
||||
{
|
||||
array
|
||||
|
@ -183,6 +173,7 @@ ircd::m::sync::_room_timeline_polylog_events(data &data,
|
|||
|
||||
const m::event &event(*it);
|
||||
object.append(event);
|
||||
ret = true;
|
||||
|
||||
json::stack::object unsigned_
|
||||
{
|
||||
|
|
|
@ -18,8 +18,8 @@ namespace ircd::m::sync
|
|||
{
|
||||
static long _notification_count(const room &, const event::idx &a, const event::idx &b);
|
||||
static long _highlight_count(const room &, const user &u, const event::idx &a, const event::idx &b);
|
||||
static void room_unread_notifications_polylog(data &);
|
||||
static void room_unread_notifications_linear(data &);
|
||||
static bool room_unread_notifications_polylog(data &);
|
||||
static bool room_unread_notifications_linear(data &);
|
||||
extern item room_unread_notifications;
|
||||
}
|
||||
|
||||
|
@ -31,25 +31,28 @@ ircd::m::sync::room_unread_notifications
|
|||
room_unread_notifications_linear
|
||||
};
|
||||
|
||||
void
|
||||
bool
|
||||
ircd::m::sync::room_unread_notifications_linear(data &data)
|
||||
{
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
void
|
||||
bool
|
||||
ircd::m::sync::room_unread_notifications_polylog(data &data)
|
||||
{
|
||||
const auto &room{*data.room};
|
||||
m::event::id::buf last_read;
|
||||
if(!m::receipt::read(last_read, room.room_id, data.user))
|
||||
return;
|
||||
return false;
|
||||
|
||||
const auto start_idx
|
||||
{
|
||||
index(last_read)
|
||||
};
|
||||
|
||||
if(!apropos(data, start_idx))
|
||||
return false;
|
||||
|
||||
// highlight_count
|
||||
json::stack::member
|
||||
{
|
||||
|
@ -68,7 +71,7 @@ ircd::m::sync::room_unread_notifications_polylog(data &data)
|
|||
}
|
||||
};
|
||||
|
||||
data.commit();
|
||||
return true;
|
||||
}
|
||||
|
||||
long
|
||||
|
|
|
@ -16,8 +16,8 @@ IRCD_MODULE
|
|||
|
||||
namespace ircd::m::sync
|
||||
{
|
||||
static void to_device_polylog(data &);
|
||||
static void to_device_linear(data &);
|
||||
static bool to_device_polylog(data &);
|
||||
static bool to_device_linear(data &);
|
||||
|
||||
extern item to_device;
|
||||
}
|
||||
|
@ -30,20 +30,15 @@ ircd::m::sync::to_device
|
|||
to_device_linear
|
||||
};
|
||||
|
||||
void
|
||||
bool
|
||||
ircd::m::sync::to_device_linear(data &data)
|
||||
{
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
void
|
||||
bool
|
||||
ircd::m::sync::to_device_polylog(data &data)
|
||||
{
|
||||
json::stack::object object
|
||||
{
|
||||
data.out
|
||||
};
|
||||
|
||||
json::stack::array array
|
||||
{
|
||||
data.out, "events"
|
||||
|
@ -59,6 +54,7 @@ ircd::m::sync::to_device_polylog(data &data)
|
|||
user_room
|
||||
};
|
||||
|
||||
bool ret{false};
|
||||
for(; it; ++it)
|
||||
{
|
||||
const auto &event_idx(it.event_idx());
|
||||
|
@ -75,10 +71,9 @@ ircd::m::sync::to_device_polylog(data &data)
|
|||
if(!relevant)
|
||||
continue;
|
||||
|
||||
m::get(std::nothrow, event_idx, "content", [&data, &array]
|
||||
m::get(std::nothrow, event_idx, "content", [&data, &array, &ret]
|
||||
(const json::object &content)
|
||||
{
|
||||
data.commit();
|
||||
json::stack::object event
|
||||
{
|
||||
array
|
||||
|
@ -109,6 +104,10 @@ ircd::m::sync::to_device_polylog(data &data)
|
|||
{
|
||||
content_, property, value
|
||||
};
|
||||
|
||||
ret = true;
|
||||
});
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue