mirror of
https://github.com/matrix-construct/construct
synced 2024-12-25 23:14:13 +01:00
ircd:Ⓜ️:sync: Checkpoint.
This commit is contained in:
parent
5815fc87c2
commit
2d860b25a8
12 changed files with 405 additions and 600 deletions
|
@ -51,7 +51,8 @@ struct ircd::m::sync::item
|
|||
string_view name() const;
|
||||
string_view member_name() const;
|
||||
|
||||
bool linear(data &, const m::event &);
|
||||
bool poll(data &, const m::event &);
|
||||
bool linear(data &);
|
||||
bool polylog(data &);
|
||||
|
||||
item(std::string name,
|
||||
|
|
99
ircd/m/m.cc
99
ircd/m/m.cc
|
@ -422,7 +422,12 @@ ircd::m::sync::pool
|
|||
bool
|
||||
ircd::m::sync::for_each(const item_closure_bool &closure)
|
||||
{
|
||||
return for_each(string_view{}, closure);
|
||||
auto it(begin(item::map));
|
||||
for(; it != end(item::map); ++it)
|
||||
if(!closure(*it->second))
|
||||
return false;
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool
|
||||
|
@ -489,7 +494,7 @@ ircd::m::sync::loghead(const data &data)
|
|||
thread_local char headbuf[256], rembuf[128], iecbuf[2][64], tmbuf[32];
|
||||
return fmt::sprintf
|
||||
{
|
||||
headbuf, "%s %s [%lu -> %lu] %s chunk:%zu %s in %s",
|
||||
headbuf, "%s %s %lu:%lu %s chunk:%zu %s in %s",
|
||||
string(rembuf, ircd::remote(data.client)),
|
||||
string_view{data.user.user_id},
|
||||
data.since,
|
||||
|
@ -714,38 +719,6 @@ noexcept
|
|||
};
|
||||
}
|
||||
|
||||
bool
|
||||
ircd::m::sync::item::linear(data &data,
|
||||
const m::event &event)
|
||||
try
|
||||
{
|
||||
const scope_restore<decltype(data.event)> theirs
|
||||
{
|
||||
data.event, &event
|
||||
};
|
||||
|
||||
const auto ret
|
||||
{
|
||||
_linear(data)
|
||||
};
|
||||
|
||||
return ret;
|
||||
}
|
||||
catch(const std::bad_function_call &e)
|
||||
{
|
||||
thread_local char rembuf[128];
|
||||
log::dwarning
|
||||
{
|
||||
log, "linear %s %s '%s' missing handler :%s",
|
||||
string(rembuf, ircd::remote(data.client)),
|
||||
string_view{data.user.user_id},
|
||||
name(),
|
||||
e.what()
|
||||
};
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
bool
|
||||
ircd::m::sync::item::polylog(data &data)
|
||||
try
|
||||
|
@ -799,6 +772,64 @@ catch(const std::exception &e)
|
|||
throw;
|
||||
}
|
||||
|
||||
bool
|
||||
ircd::m::sync::item::linear(data &data)
|
||||
try
|
||||
{
|
||||
const auto ret
|
||||
{
|
||||
_linear(data)
|
||||
};
|
||||
|
||||
return ret;
|
||||
}
|
||||
catch(const std::bad_function_call &e)
|
||||
{
|
||||
thread_local char rembuf[128];
|
||||
log::dwarning
|
||||
{
|
||||
log, "linear %s %s '%s' missing handler :%s",
|
||||
string(rembuf, ircd::remote(data.client)),
|
||||
string_view{data.user.user_id},
|
||||
name(),
|
||||
e.what()
|
||||
};
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
bool
|
||||
ircd::m::sync::item::poll(data &data,
|
||||
const m::event &event)
|
||||
try
|
||||
{
|
||||
const scope_restore<decltype(data.event)> theirs
|
||||
{
|
||||
data.event, &event
|
||||
};
|
||||
|
||||
const auto ret
|
||||
{
|
||||
_linear(data)
|
||||
};
|
||||
|
||||
return ret;
|
||||
}
|
||||
catch(const std::bad_function_call &e)
|
||||
{
|
||||
thread_local char rembuf[128];
|
||||
log::dwarning
|
||||
{
|
||||
log, "poll %s %s '%s' missing handler :%s",
|
||||
string(rembuf, ircd::remote(data.client)),
|
||||
string_view{data.user.user_id},
|
||||
name(),
|
||||
e.what()
|
||||
};
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
ircd::string_view
|
||||
ircd::m::sync::item::member_name()
|
||||
const
|
||||
|
|
|
@ -113,7 +113,7 @@ try
|
|||
range.first > range.second?
|
||||
false:
|
||||
range.second - range.first <= linear_delta_max?
|
||||
linear::handle(data):
|
||||
polylog::handle(data):
|
||||
polylog::handle(data)
|
||||
};
|
||||
|
||||
|
@ -122,14 +122,14 @@ try
|
|||
return {};
|
||||
|
||||
// When longpoll was successful, do nothing else.
|
||||
if(longpoll::poll(client, data, args))
|
||||
if(longpoll::poll(data, args))
|
||||
return {};
|
||||
|
||||
// A user-timeout occurred. According to the spec we return a
|
||||
// 200 with empty fields rather than a 408.
|
||||
const json::value next_batch
|
||||
{
|
||||
lex_cast(m::vm::current_sequence + 1), json::STRING
|
||||
lex_cast(m::vm::current_sequence + 0), json::STRING
|
||||
};
|
||||
|
||||
return resource::response
|
||||
|
@ -158,15 +158,12 @@ bool
|
|||
ircd::m::sync::polylog::handle(data &data)
|
||||
try
|
||||
{
|
||||
data.commit();
|
||||
|
||||
// Top level sync object.
|
||||
json::stack::object object
|
||||
{
|
||||
data.out
|
||||
};
|
||||
|
||||
m::sync::for_each([&data]
|
||||
m::sync::for_each(string_view{}, [&data]
|
||||
(item &item)
|
||||
{
|
||||
json::stack::member member
|
||||
|
@ -195,7 +192,7 @@ try
|
|||
string_view{next_batch}
|
||||
};
|
||||
|
||||
return true;
|
||||
return data.committed();
|
||||
}
|
||||
catch(const std::exception &e)
|
||||
{
|
||||
|
@ -209,6 +206,85 @@ catch(const std::exception &e)
|
|||
throw;
|
||||
}
|
||||
|
||||
//
|
||||
// linear
|
||||
//
|
||||
|
||||
decltype(ircd::m::sync::linear::delta_max)
|
||||
ircd::m::sync::linear::delta_max
|
||||
{
|
||||
{ "name", "ircd.client.sync.linear.delta.max" },
|
||||
{ "default", 1024 },
|
||||
};
|
||||
|
||||
bool
|
||||
ircd::m::sync::linear::handle(data &data)
|
||||
try
|
||||
{
|
||||
json::stack::object object
|
||||
{
|
||||
data.out
|
||||
};
|
||||
|
||||
const m::events::range range
|
||||
{
|
||||
data.since, data.current + 1
|
||||
};
|
||||
|
||||
m::events::for_each(range, [&data]
|
||||
(const m::event::idx &event_idx, const m::event &event)
|
||||
{
|
||||
const scope_restore<decltype(data.event)> theirs
|
||||
{
|
||||
data.event, &event
|
||||
};
|
||||
|
||||
m::sync::for_each(string_view{}, [&data]
|
||||
(item &item)
|
||||
{
|
||||
json::stack::member member
|
||||
{
|
||||
data.out, item.member_name()
|
||||
};
|
||||
|
||||
item.linear(data);
|
||||
return true;
|
||||
});
|
||||
|
||||
return true;
|
||||
});
|
||||
|
||||
const json::value next_batch
|
||||
{
|
||||
lex_cast(data.current + 1), json::STRING
|
||||
};
|
||||
|
||||
json::stack::member
|
||||
{
|
||||
object, "next_batch", next_batch
|
||||
};
|
||||
|
||||
log::debug
|
||||
{
|
||||
log, "linear %s (next_batch:%s)",
|
||||
loghead(data),
|
||||
string_view{next_batch}
|
||||
};
|
||||
|
||||
return data.committed();
|
||||
}
|
||||
catch(const std::exception &e)
|
||||
{
|
||||
log::error
|
||||
{
|
||||
log, "linear %s FAILED :%s",
|
||||
loghead(data),
|
||||
e.what()
|
||||
};
|
||||
|
||||
throw;
|
||||
}
|
||||
|
||||
//
|
||||
// longpoll
|
||||
//
|
||||
|
@ -241,8 +317,7 @@ ircd::m::sync::longpoll::handle_notify(const m::event &event,
|
|||
}
|
||||
|
||||
bool
|
||||
ircd::m::sync::longpoll::poll(client &client,
|
||||
data &data,
|
||||
ircd::m::sync::longpoll::poll(data &data,
|
||||
const args &args)
|
||||
{
|
||||
++polling;
|
||||
|
@ -259,459 +334,26 @@ ircd::m::sync::longpoll::poll(client &client,
|
|||
if(queue.empty())
|
||||
continue;
|
||||
|
||||
const auto &a(queue.front());
|
||||
const auto &accepted
|
||||
{
|
||||
queue.front()
|
||||
};
|
||||
|
||||
const unwind pop{[]
|
||||
{
|
||||
if(polling <= 1)
|
||||
queue.pop_front();
|
||||
}};
|
||||
|
||||
if(handle(client, data, args, a))
|
||||
if(handle(data, args, accepted))
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
bool
|
||||
ircd::m::sync::longpoll::handle(client &client,
|
||||
data &data,
|
||||
ircd::m::sync::longpoll::handle(data &data,
|
||||
const args &args,
|
||||
const accepted &event)
|
||||
{
|
||||
const auto &room_id
|
||||
{
|
||||
json::get<"room_id"_>(event)
|
||||
};
|
||||
|
||||
if(room_id)
|
||||
{
|
||||
const m::room room{room_id};
|
||||
return handle(client, data, args, event, room);
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
bool
|
||||
ircd::m::sync::longpoll::handle(client &client,
|
||||
data &data,
|
||||
const args &args,
|
||||
const accepted &event,
|
||||
const m::room &room)
|
||||
{
|
||||
const m::user::id &user_id
|
||||
{
|
||||
args.request.user_id
|
||||
};
|
||||
|
||||
if(!room.membership(user_id, "join"))
|
||||
return false;
|
||||
|
||||
const auto rooms
|
||||
{
|
||||
sync_rooms(client, user_id, room, args, event)
|
||||
};
|
||||
|
||||
const m::user::room ur
|
||||
{
|
||||
m::user::id
|
||||
{
|
||||
args.request.user_id
|
||||
}
|
||||
};
|
||||
|
||||
std::vector<json::value> presents;
|
||||
ur.get(std::nothrow, "ircd.presence", [&]
|
||||
(const m::event &event)
|
||||
{
|
||||
const auto &content
|
||||
{
|
||||
at<"content"_>(event)
|
||||
};
|
||||
|
||||
presents.emplace_back(event);
|
||||
});
|
||||
|
||||
const json::members presence
|
||||
{
|
||||
{ "events", json::value { presents.data(), presents.size() } },
|
||||
};
|
||||
|
||||
const auto &next_batch
|
||||
{
|
||||
int64_t(data.current + 1)
|
||||
};
|
||||
|
||||
resource::response
|
||||
{
|
||||
client, json::members
|
||||
{
|
||||
{ "next_batch", json::value { lex_cast(next_batch), json::STRING } },
|
||||
{ "rooms", rooms },
|
||||
{ "presence", presence },
|
||||
}
|
||||
};
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
std::string
|
||||
ircd::m::sync::longpoll::sync_rooms(client &client,
|
||||
const m::user::id &user_id,
|
||||
const m::room &room,
|
||||
const args &args,
|
||||
const accepted &event)
|
||||
{
|
||||
std::vector<std::string> r;
|
||||
std::vector<json::member> m;
|
||||
|
||||
thread_local char membership_buf[64];
|
||||
const auto membership
|
||||
{
|
||||
room.membership(membership_buf, user_id)
|
||||
};
|
||||
|
||||
r.emplace_back(sync_room(client, room, args, event));
|
||||
m.emplace_back(room.room_id, r.back());
|
||||
|
||||
const json::strung body
|
||||
{
|
||||
m.data(), m.data() + m.size()
|
||||
};
|
||||
|
||||
return json::strung{json::members
|
||||
{
|
||||
{ membership, body }
|
||||
}};
|
||||
}
|
||||
|
||||
std::string
|
||||
ircd::m::sync::longpoll::sync_room(client &client,
|
||||
const m::room &room,
|
||||
const args &args,
|
||||
const accepted &accepted)
|
||||
{
|
||||
const auto &since
|
||||
{
|
||||
args.since
|
||||
};
|
||||
|
||||
const m::event &event{accepted};
|
||||
std::vector<std::string> timeline;
|
||||
if(defined(json::get<"event_id"_>(event)))
|
||||
{
|
||||
json::strung strung(event);
|
||||
if(!!accepted.client_txnid)
|
||||
strung = json::insert(strung, json::member
|
||||
{
|
||||
"unsigned", json::members
|
||||
{
|
||||
{ "transaction_id", accepted.client_txnid }
|
||||
}
|
||||
});
|
||||
|
||||
timeline.emplace_back(std::move(strung));
|
||||
}
|
||||
|
||||
const json::strung timeline_serial
|
||||
{
|
||||
timeline.data(), timeline.data() + timeline.size()
|
||||
};
|
||||
|
||||
std::vector<std::string> ephemeral;
|
||||
if(json::get<"type"_>(event) == "m.typing") //TODO: X
|
||||
ephemeral.emplace_back(json::strung{event});
|
||||
|
||||
if(json::get<"type"_>(event) == "m.receipt") //TODO: X
|
||||
ephemeral.emplace_back(json::strung(event));
|
||||
|
||||
const json::strung ephemeral_serial
|
||||
{
|
||||
ephemeral.data(), ephemeral.data() + ephemeral.size()
|
||||
};
|
||||
|
||||
const auto &prev_batch
|
||||
{
|
||||
!timeline.empty()?
|
||||
unquote(json::object{timeline.front()}.get("event_id")):
|
||||
string_view{}
|
||||
};
|
||||
|
||||
//TODO: XXX
|
||||
const bool limited
|
||||
{
|
||||
false
|
||||
};
|
||||
|
||||
m::event::id::buf last_read_buf;
|
||||
const auto &last_read
|
||||
{
|
||||
m::receipt::read(last_read_buf, room.room_id, args.request.user_id)
|
||||
};
|
||||
|
||||
const auto last_read_idx
|
||||
{
|
||||
last_read && json::get<"event_id"_>(event)?
|
||||
index(last_read):
|
||||
0UL
|
||||
};
|
||||
|
||||
const auto current_idx
|
||||
{
|
||||
last_read_idx?
|
||||
index(at<"event_id"_>(event)):
|
||||
0UL
|
||||
};
|
||||
|
||||
const auto notes
|
||||
{
|
||||
last_read_idx?
|
||||
notification_count(room, last_read_idx, current_idx):
|
||||
json::undefined_number
|
||||
};
|
||||
|
||||
const auto highlights
|
||||
{
|
||||
last_read_idx?
|
||||
highlight_count(room, args.request.user_id, last_read_idx, current_idx):
|
||||
json::undefined_number
|
||||
};
|
||||
|
||||
const json::members body
|
||||
{
|
||||
{ "account_data", json::members{} },
|
||||
{ "unread_notifications",
|
||||
{
|
||||
{ "highlight_count", highlights },
|
||||
{ "notification_count", notes },
|
||||
}},
|
||||
{ "ephemeral",
|
||||
{
|
||||
{ "events", ephemeral_serial },
|
||||
}},
|
||||
{ "timeline",
|
||||
{
|
||||
{ "events", timeline_serial },
|
||||
{ "prev_batch", prev_batch },
|
||||
{ "limited", limited },
|
||||
}},
|
||||
};
|
||||
|
||||
return json::strung(body);
|
||||
}
|
||||
|
||||
//
|
||||
// linear
|
||||
//
|
||||
|
||||
decltype(ircd::m::sync::linear::delta_max)
|
||||
ircd::m::sync::linear::delta_max
|
||||
{
|
||||
{ "name", "ircd.client.sync.linear.delta.max" },
|
||||
{ "default", 1024 },
|
||||
};
|
||||
|
||||
bool
|
||||
ircd::m::sync::linear::handle(data &data)
|
||||
{
|
||||
auto &client{data.client};
|
||||
json::stack::object object
|
||||
{
|
||||
data.out
|
||||
};
|
||||
|
||||
uint64_t since
|
||||
{
|
||||
data.since
|
||||
};
|
||||
|
||||
std::map<std::string, std::vector<std::string>, std::less<>> r;
|
||||
|
||||
bool limited{false};
|
||||
m::events::for_each(since, [&]
|
||||
(const uint64_t &sequence, const m::event &event)
|
||||
{
|
||||
if(!r.empty() && (since - data.since > 128))
|
||||
{
|
||||
limited = true;
|
||||
return false;
|
||||
}
|
||||
|
||||
since = sequence;
|
||||
if(!json::get<"room_id"_>(event))
|
||||
return true;
|
||||
|
||||
const m::room room
|
||||
{
|
||||
json::get<"room_id"_>(event)
|
||||
};
|
||||
|
||||
if(!room.membership(data.user.user_id))
|
||||
return true;
|
||||
|
||||
auto it
|
||||
{
|
||||
r.lower_bound(room.room_id)
|
||||
};
|
||||
|
||||
if(it == end(r) || it->first != room.room_id)
|
||||
it = r.emplace_hint(it, std::string{room.room_id}, std::vector<std::string>{});
|
||||
|
||||
it->second.emplace_back(json::strung{event});
|
||||
return true;
|
||||
});
|
||||
|
||||
if(r.empty())
|
||||
return false;
|
||||
|
||||
std::vector<json::member> events[3];
|
||||
|
||||
for(auto &p : r)
|
||||
{
|
||||
const m::room::id &room_id{p.first};
|
||||
auto &vec{p.second};
|
||||
|
||||
std::vector<std::string> timeline;
|
||||
std::vector<std::string> state;
|
||||
std::vector<std::string> ephemeral;
|
||||
|
||||
for(std::string &event : vec)
|
||||
if(json::object{event}.has("state_key"))
|
||||
state.emplace_back(std::move(event));
|
||||
else if(!json::object{event}.has("prev_events"))
|
||||
ephemeral.emplace_back(std::move(event));
|
||||
else
|
||||
timeline.emplace_back(std::move(event));
|
||||
|
||||
const json::strung timeline_serial{timeline.data(), timeline.data() + timeline.size()};
|
||||
const json::strung state_serial{state.data(), state.data() + state.size()};
|
||||
const json::strung ephemeral_serial{ephemeral.data(), ephemeral.data() + ephemeral.size()};
|
||||
|
||||
m::event::id::buf last_read_buf;
|
||||
const auto &last_read
|
||||
{
|
||||
m::receipt::read(last_read_buf, room_id, data.user)
|
||||
};
|
||||
|
||||
const auto last_read_idx
|
||||
{
|
||||
last_read?
|
||||
index(last_read):
|
||||
0UL
|
||||
};
|
||||
|
||||
const auto notes
|
||||
{
|
||||
last_read_idx?
|
||||
notification_count(room_id, last_read_idx, data.current):
|
||||
json::undefined_number
|
||||
};
|
||||
|
||||
const auto highlights
|
||||
{
|
||||
last_read_idx?
|
||||
highlight_count(room_id, data.user, last_read_idx, data.current):
|
||||
json::undefined_number
|
||||
};
|
||||
|
||||
const string_view prev_batch
|
||||
{
|
||||
!timeline.empty()?
|
||||
unquote(json::object{timeline.front()}.at("event_id")):
|
||||
string_view{}
|
||||
};
|
||||
|
||||
const json::members body
|
||||
{
|
||||
{ "ephemeral",
|
||||
{
|
||||
{ "events", ephemeral_serial },
|
||||
}},
|
||||
{ "state",
|
||||
{
|
||||
{ "events", state_serial }
|
||||
}},
|
||||
{ "timeline",
|
||||
{
|
||||
{ "events", timeline_serial },
|
||||
{ "prev_batch", prev_batch },
|
||||
{ "limited", limited },
|
||||
}},
|
||||
{ "unread_notifications",
|
||||
{
|
||||
{ "highlight_count", highlights },
|
||||
{ "notification_count", notes },
|
||||
}},
|
||||
};
|
||||
|
||||
thread_local char membership_buf[64];
|
||||
const auto membership{m::room{room_id}.membership(membership_buf, data.user)};
|
||||
const int ep
|
||||
{
|
||||
membership == "join"? 0:
|
||||
membership == "leave"? 1:
|
||||
membership == "invite"? 2:
|
||||
1 // default to leave (catches "ban" for now)
|
||||
};
|
||||
|
||||
events[ep].emplace_back(room_id, body);
|
||||
};
|
||||
|
||||
const json::value joinsv
|
||||
{
|
||||
events[0].data(), events[0].size()
|
||||
};
|
||||
|
||||
const json::value leavesv
|
||||
{
|
||||
events[1].data(), events[1].size()
|
||||
};
|
||||
|
||||
const json::value invitesv
|
||||
{
|
||||
events[2].data(), events[2].size()
|
||||
};
|
||||
|
||||
const json::members rooms
|
||||
{
|
||||
{ "join", joinsv },
|
||||
{ "leave", leavesv },
|
||||
{ "invite", invitesv },
|
||||
};
|
||||
|
||||
resource::response
|
||||
{
|
||||
client, json::members
|
||||
{
|
||||
{ "next_batch", json::value { lex_cast(int64_t(data.current + 1)), json::STRING } },
|
||||
{ "rooms", rooms },
|
||||
{ "presence", json::object{} },
|
||||
}
|
||||
};
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
static long
|
||||
ircd::m::sync::notification_count(const m::room &room,
|
||||
const m::event::idx &a,
|
||||
const m::event::idx &b)
|
||||
{
|
||||
return m::count_since(room, a, a < b? b : a);
|
||||
}
|
||||
|
||||
static long
|
||||
ircd::m::sync::highlight_count(const m::room &r,
|
||||
const m::user &u,
|
||||
const m::event::idx &a,
|
||||
const m::event::idx &b)
|
||||
{
|
||||
using namespace ircd::m;
|
||||
using proto = size_t (const user &, const room &, const event::idx &, const event::idx &);
|
||||
|
||||
static mods::import<proto> count
|
||||
{
|
||||
"m_user", "highlighted_count__between"
|
||||
};
|
||||
|
||||
return count(u, r, a, a < b? b : a);
|
||||
}
|
||||
|
|
|
@ -56,12 +56,8 @@ namespace ircd::m::sync::longpoll
|
|||
std::deque<accepted> queue;
|
||||
ctx::dock dock;
|
||||
|
||||
static std::string sync_room(client &, const m::room &, const args &, const accepted &);
|
||||
static std::string sync_rooms(client &, const m::user::id &, const m::room &, const args &, const accepted &);
|
||||
static bool handle(client &, data &, const args &, const accepted &, const m::room &);
|
||||
static bool handle(client &, data &, const args &, const accepted &);
|
||||
static bool poll(client &, data &, const args &);
|
||||
|
||||
static bool handle(data &, const args &, const accepted &);
|
||||
static bool poll(data &, const args &);
|
||||
static void handle_notify(const m::event &, m::vm::eval &);
|
||||
extern m::hookfn<m::vm::eval &> notified;
|
||||
}
|
||||
|
|
|
@ -16,7 +16,9 @@ IRCD_MODULE
|
|||
|
||||
namespace ircd::m::sync
|
||||
{
|
||||
static bool account_data_(data &, const m::event &, const m::event::idx &);
|
||||
static bool account_data_polylog(data &);
|
||||
static bool account_data_linear(data &);
|
||||
|
||||
extern item account_data;
|
||||
}
|
||||
|
@ -25,13 +27,19 @@ decltype(ircd::m::sync::account_data)
|
|||
ircd::m::sync::account_data
|
||||
{
|
||||
"account_data",
|
||||
account_data_polylog
|
||||
account_data_polylog,
|
||||
account_data_linear
|
||||
};
|
||||
|
||||
bool
|
||||
ircd::m::sync::account_data_linear(data &data)
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
bool
|
||||
ircd::m::sync::account_data_polylog(data &data)
|
||||
{
|
||||
// Open an object
|
||||
json::stack::object object
|
||||
{
|
||||
data.out
|
||||
|
@ -42,32 +50,54 @@ ircd::m::sync::account_data_polylog(data &data)
|
|||
data.out, "events"
|
||||
};
|
||||
|
||||
const m::room::state &state{data.user_state};
|
||||
const m::room::state &state
|
||||
{
|
||||
data.user_state
|
||||
};
|
||||
|
||||
state.for_each("ircd.account_data", [&data, &array]
|
||||
(const m::event &event)
|
||||
{
|
||||
// Ignore events outside this sync window
|
||||
if(!apropos(data, event))
|
||||
return;
|
||||
|
||||
// Each account_data event is an object in the events array
|
||||
json::stack::object object
|
||||
{
|
||||
array
|
||||
};
|
||||
|
||||
// type
|
||||
json::stack::member
|
||||
{
|
||||
object, "type", at<"state_key"_>(event)
|
||||
};
|
||||
|
||||
// content
|
||||
json::stack::member
|
||||
{
|
||||
object, "content", at<"content"_>(event)
|
||||
};
|
||||
if(account_data_(data, event, index(event)))
|
||||
data.commit();
|
||||
});
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool
|
||||
ircd::m::sync::account_data_(data &data,
|
||||
const m::event &event,
|
||||
const m::event::idx &event_idx)
|
||||
{
|
||||
if(!apropos(data, event_idx))
|
||||
return false;
|
||||
|
||||
if(json::get<"type"_>(event) != "ircd.account_data")
|
||||
return false;
|
||||
|
||||
if(json::get<"room_id"_>(event) != data.user_room.room_id)
|
||||
return false;
|
||||
|
||||
data.commit();
|
||||
|
||||
// Each account_data event is an object in the events array
|
||||
json::stack::object object
|
||||
{
|
||||
data.out
|
||||
};
|
||||
|
||||
// type
|
||||
json::stack::member
|
||||
{
|
||||
data.out, "type", at<"state_key"_>(event)
|
||||
};
|
||||
|
||||
// content
|
||||
json::stack::member
|
||||
{
|
||||
data.out, "content", at<"content"_>(event)
|
||||
};
|
||||
|
||||
return true;
|
||||
}
|
||||
|
|
|
@ -16,9 +16,12 @@ IRCD_MODULE
|
|||
|
||||
namespace ircd::m::sync
|
||||
{
|
||||
static void presence_events_polylog(data &);
|
||||
static void presence_polylog_events(data &);
|
||||
static bool presence_polylog(data &);
|
||||
|
||||
static void presence_linear_events(data &);
|
||||
static bool presence_linear(data &);
|
||||
|
||||
extern item presence;
|
||||
}
|
||||
|
||||
|
@ -33,33 +36,36 @@ ircd::m::sync::presence
|
|||
bool
|
||||
ircd::m::sync::presence_linear(data &data)
|
||||
{
|
||||
if(!data.event)
|
||||
return false;
|
||||
return true;
|
||||
|
||||
if(json::get<"type"_>(*data.event) != "ircd.presence")
|
||||
return false;
|
||||
|
||||
json::stack::object object
|
||||
assert(data.event);
|
||||
const m::event &event
|
||||
{
|
||||
data.out
|
||||
*data.event
|
||||
};
|
||||
|
||||
if(json::get<"type"_>(event) != "ircd.presence")
|
||||
return true;
|
||||
|
||||
if(json::get<"sender"_>(event) != m::me.user_id)
|
||||
return true;
|
||||
|
||||
// sender
|
||||
json::stack::member
|
||||
{
|
||||
object, "sender", unquote(at<"content"_>(*data.event).get("user_id"))
|
||||
data.out, "sender", unquote(at<"content"_>(event).get("user_id"))
|
||||
};
|
||||
|
||||
// type
|
||||
json::stack::member
|
||||
{
|
||||
object, "type", json::value{"m.presence"}
|
||||
data.out, "type", json::value{"m.presence"}
|
||||
};
|
||||
|
||||
// content
|
||||
json::stack::member
|
||||
{
|
||||
object, "content", at<"content"_>(*data.event)
|
||||
data.out, "content", at<"content"_>(event)
|
||||
};
|
||||
|
||||
return true;
|
||||
|
@ -73,12 +79,12 @@ ircd::m::sync::presence_polylog(data &data)
|
|||
data.out
|
||||
};
|
||||
|
||||
presence_events_polylog(data);
|
||||
presence_polylog_events(data);
|
||||
return true;
|
||||
}
|
||||
|
||||
void
|
||||
ircd::m::sync::presence_events_polylog(data &data)
|
||||
ircd::m::sync::presence_polylog_events(data &data)
|
||||
{
|
||||
json::stack::array array
|
||||
{
|
||||
|
@ -93,6 +99,8 @@ ircd::m::sync::presence_events_polylog(data &data)
|
|||
// contended during a json::stack flush to the client; not during database
|
||||
// queries leading to this.
|
||||
const std::lock_guard<decltype(mutex)> l{mutex};
|
||||
|
||||
data.commit();
|
||||
json::stack::object object
|
||||
{
|
||||
array
|
||||
|
|
|
@ -19,7 +19,10 @@ namespace ircd::m::sync
|
|||
static void _rooms_polylog_room(data &, const m::room &);
|
||||
static void _rooms_polylog(data &, const string_view &membership);
|
||||
static bool rooms_polylog(data &);
|
||||
|
||||
static void _rooms_linear(data &, const string_view &membership);
|
||||
static bool rooms_linear(data &);
|
||||
|
||||
extern item rooms;
|
||||
}
|
||||
|
||||
|
@ -34,9 +37,35 @@ ircd::m::sync::rooms
|
|||
bool
|
||||
ircd::m::sync::rooms_linear(data &data)
|
||||
{
|
||||
json::stack::object object{data.out};
|
||||
_rooms_linear(data, "invite");
|
||||
_rooms_linear(data, "join");
|
||||
_rooms_linear(data, "leave");
|
||||
_rooms_linear(data, "ban");
|
||||
return true;
|
||||
}
|
||||
|
||||
void
|
||||
ircd::m::sync::_rooms_linear(data &data,
|
||||
const string_view &membership)
|
||||
{
|
||||
const scope_restore<decltype(data.membership)> theirs
|
||||
{
|
||||
data.membership, membership
|
||||
};
|
||||
|
||||
m::sync::for_each("rooms", [&](item &item)
|
||||
{
|
||||
json::stack::member member
|
||||
{
|
||||
data.out, item.member_name()
|
||||
};
|
||||
|
||||
item.linear(data);
|
||||
return true;
|
||||
});
|
||||
}
|
||||
|
||||
bool
|
||||
ircd::m::sync::rooms_polylog(data &data)
|
||||
{
|
||||
|
@ -52,15 +81,26 @@ void
|
|||
ircd::m::sync::_rooms_polylog(data &data,
|
||||
const string_view &membership)
|
||||
{
|
||||
const scope_restore<decltype(data.membership)> theirs
|
||||
{
|
||||
data.membership, membership
|
||||
};
|
||||
|
||||
json::stack::object object
|
||||
{
|
||||
data.out, membership
|
||||
};
|
||||
|
||||
data.user_rooms.for_each(membership, [&data]
|
||||
(const m::room &room, const string_view &membership)
|
||||
(const m::room &room, const string_view &membership_)
|
||||
{
|
||||
if(head_idx(std::nothrow, room) <= data.since)
|
||||
const auto head_idx
|
||||
{
|
||||
m::head_idx(std::nothrow, room)
|
||||
};
|
||||
|
||||
assert(head_idx); // room should exist
|
||||
if(!head_idx || head_idx < data.since)
|
||||
return;
|
||||
|
||||
// Generate individual stats for this room's sync
|
||||
|
@ -69,16 +109,7 @@ ircd::m::sync::_rooms_polylog(data &data,
|
|||
stats.timer = timer{};
|
||||
#endif
|
||||
|
||||
// This scope ensures the object destructs and flushes before
|
||||
// the log message tallying the stats for this room below.
|
||||
{
|
||||
json::stack::object object
|
||||
{
|
||||
data.out, room.room_id
|
||||
};
|
||||
|
||||
_rooms_polylog_room(data, room);
|
||||
}
|
||||
_rooms_polylog_room(data, room);
|
||||
|
||||
#ifdef RB_DEBUG
|
||||
thread_local char tmbuf[32];
|
||||
|
@ -96,14 +127,19 @@ ircd::m::sync::_rooms_polylog(data &data,
|
|||
void
|
||||
ircd::m::sync::_rooms_polylog_room(data &data,
|
||||
const m::room &room)
|
||||
try
|
||||
{
|
||||
const scope_restore<decltype(data.room)> theirs
|
||||
{
|
||||
data.room, &room
|
||||
};
|
||||
|
||||
m::sync::for_each("rooms", [&](item &item)
|
||||
json::stack::object object
|
||||
{
|
||||
data.out, room.room_id
|
||||
};
|
||||
|
||||
m::sync::for_each("rooms", [&data]
|
||||
(item &item)
|
||||
{
|
||||
json::stack::member member
|
||||
{
|
||||
|
@ -114,13 +150,3 @@ try
|
|||
return true;
|
||||
});
|
||||
}
|
||||
catch(const json::not_found &e)
|
||||
{
|
||||
log::critical
|
||||
{
|
||||
log, "polylog %s room %s error :%s"
|
||||
,loghead(data)
|
||||
,string_view{room.room_id}
|
||||
,e.what()
|
||||
};
|
||||
}
|
||||
|
|
|
@ -16,8 +16,10 @@ IRCD_MODULE
|
|||
|
||||
namespace ircd::m::sync
|
||||
{
|
||||
static bool room_account_data_events_polylog(data &);
|
||||
static void room_account_data_polylog_events_event(data &, const m::event &);
|
||||
static void room_account_data_polylog_events(data &);
|
||||
static bool room_account_data_polylog(data &);
|
||||
|
||||
extern item room_account_data;
|
||||
}
|
||||
|
||||
|
@ -36,12 +38,12 @@ ircd::m::sync::room_account_data_polylog(data &data)
|
|||
data.out
|
||||
};
|
||||
|
||||
room_account_data_events_polylog(data);
|
||||
room_account_data_polylog_events(data);
|
||||
return true;
|
||||
}
|
||||
|
||||
bool
|
||||
ircd::m::sync::room_account_data_events_polylog(data &data)
|
||||
void
|
||||
ircd::m::sync::room_account_data_polylog_events(data &data)
|
||||
{
|
||||
json::stack::array array
|
||||
{
|
||||
|
@ -53,33 +55,38 @@ ircd::m::sync::room_account_data_events_polylog(data &data)
|
|||
data.user_room
|
||||
};
|
||||
|
||||
assert(data.room);
|
||||
char typebuf[288]; //TODO: room_account_data_typebuf_size
|
||||
const auto type
|
||||
{
|
||||
m::user::_account_data_type(typebuf, data.room->room_id)
|
||||
};
|
||||
|
||||
state.for_each(type, [&data, &array]
|
||||
state.for_each(type, [&data]
|
||||
(const m::event &event)
|
||||
{
|
||||
if(!apropos(data, event))
|
||||
return;
|
||||
|
||||
json::stack::object object
|
||||
{
|
||||
data.out
|
||||
};
|
||||
|
||||
json::stack::member
|
||||
{
|
||||
object, "type", at<"state_key"_>(event)
|
||||
};
|
||||
|
||||
json::stack::member
|
||||
{
|
||||
object, "content", at<"content"_>(event)
|
||||
};
|
||||
if(apropos(data, event))
|
||||
room_account_data_polylog_events_event(data, event);
|
||||
});
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
void
|
||||
ircd::m::sync::room_account_data_polylog_events_event(data &data,
|
||||
const m::event &event)
|
||||
{
|
||||
data.commit();
|
||||
json::stack::object object
|
||||
{
|
||||
data.out
|
||||
};
|
||||
|
||||
json::stack::member
|
||||
{
|
||||
object, "type", at<"state_key"_>(event)
|
||||
};
|
||||
|
||||
json::stack::member
|
||||
{
|
||||
object, "content", at<"content"_>(event)
|
||||
};
|
||||
}
|
||||
|
|
|
@ -83,6 +83,7 @@ ircd::m::sync::_handle_receipt(data &data,
|
|||
at<"content"_>(event)
|
||||
};
|
||||
|
||||
data.commit();
|
||||
json::stack::object object
|
||||
{
|
||||
data.out
|
||||
|
|
|
@ -16,8 +16,10 @@ IRCD_MODULE
|
|||
|
||||
namespace ircd::m::sync
|
||||
{
|
||||
static bool _room_state_polylog_events(data &);
|
||||
static bool room_state_polylog_events(data &);
|
||||
static bool room_state_polylog(data &);
|
||||
|
||||
static bool room_state_linear_events(data &);
|
||||
static bool room_state_linear(data &);
|
||||
|
||||
extern const event::keys::include _default_keys;
|
||||
|
@ -71,11 +73,11 @@ ircd::m::sync::room_state_polylog(data &data)
|
|||
data.out
|
||||
};
|
||||
|
||||
return _room_state_polylog_events(data);
|
||||
return room_state_polylog_events(data);
|
||||
}
|
||||
|
||||
bool
|
||||
ircd::m::sync::_room_state_polylog_events(data &data)
|
||||
ircd::m::sync::room_state_polylog_events(data &data)
|
||||
{
|
||||
json::stack::array array
|
||||
{
|
||||
|
@ -100,6 +102,7 @@ ircd::m::sync::_room_state_polylog_events(data &data)
|
|||
return;
|
||||
|
||||
const std::lock_guard<decltype(mutex)> lock{mutex};
|
||||
data.commit();
|
||||
array.append(event);
|
||||
}};
|
||||
|
||||
|
|
|
@ -16,8 +16,15 @@ IRCD_MODULE
|
|||
|
||||
namespace ircd::m::sync
|
||||
{
|
||||
static event::id::buf _room_timeline_events(data &, const m::room &, bool &);
|
||||
struct room_timeline;
|
||||
|
||||
static event::id::buf _room_timeline_polylog_events(data &, const m::room &, bool &);
|
||||
static bool room_timeline_polylog(data &);
|
||||
|
||||
static event::id::buf _room_timeline_linear_events(data &, const m::room &, bool &);
|
||||
static bool room_timeline_linear(data &);
|
||||
|
||||
extern const event::keys::include default_keys;
|
||||
extern item room_timeline;
|
||||
}
|
||||
|
||||
|
@ -25,22 +32,42 @@ decltype(ircd::m::sync::room_timeline)
|
|||
ircd::m::sync::room_timeline
|
||||
{
|
||||
"rooms.timeline",
|
||||
room_timeline_polylog
|
||||
room_timeline_polylog,
|
||||
room_timeline_linear,
|
||||
};
|
||||
|
||||
decltype(ircd::m::sync::default_keys)
|
||||
ircd::m::sync::default_keys
|
||||
{
|
||||
"content",
|
||||
"depth",
|
||||
"event_id",
|
||||
"origin_server_ts",
|
||||
"prev_events",
|
||||
"redacts",
|
||||
"room_id",
|
||||
"sender",
|
||||
"state_key",
|
||||
"type",
|
||||
};
|
||||
|
||||
bool
|
||||
ircd::m::sync::room_timeline_polylog(data &data)
|
||||
ircd::m::sync::room_timeline_linear(data &data)
|
||||
{
|
||||
return true;
|
||||
|
||||
json::stack::object object
|
||||
{
|
||||
data.out
|
||||
};
|
||||
|
||||
m::room room;
|
||||
|
||||
// events
|
||||
bool limited{false};
|
||||
m::event::id::buf prev
|
||||
{
|
||||
_room_timeline_events(data, *data.room, limited)
|
||||
_room_timeline_linear_events(data, room, limited)
|
||||
};
|
||||
|
||||
// prev_batch
|
||||
|
@ -59,30 +86,61 @@ ircd::m::sync::room_timeline_polylog(data &data)
|
|||
}
|
||||
|
||||
ircd::m::event::id::buf
|
||||
ircd::m::sync::_room_timeline_events(data &data,
|
||||
const m::room &room,
|
||||
bool &limited)
|
||||
ircd::m::sync::_room_timeline_linear_events(data &data,
|
||||
const m::room &room,
|
||||
bool &limited)
|
||||
{
|
||||
json::stack::array array
|
||||
{
|
||||
data.out, "events"
|
||||
};
|
||||
|
||||
static const m::event::fetch::opts fopts
|
||||
return {};
|
||||
}
|
||||
|
||||
bool
|
||||
ircd::m::sync::room_timeline_polylog(data &data)
|
||||
{
|
||||
json::stack::object object
|
||||
{
|
||||
m::event::keys::include
|
||||
{
|
||||
"content",
|
||||
"depth",
|
||||
"event_id",
|
||||
"origin_server_ts",
|
||||
"prev_events",
|
||||
"redacts",
|
||||
"room_id",
|
||||
"sender",
|
||||
"state_key",
|
||||
"type",
|
||||
},
|
||||
data.out
|
||||
};
|
||||
|
||||
// events
|
||||
bool limited{false};
|
||||
m::event::id::buf prev
|
||||
{
|
||||
_room_timeline_polylog_events(data, *data.room, limited)
|
||||
};
|
||||
|
||||
// prev_batch
|
||||
json::stack::member
|
||||
{
|
||||
object, "prev_batch", string_view{prev}
|
||||
};
|
||||
|
||||
// limited
|
||||
json::stack::member
|
||||
{
|
||||
object, "limited", json::value{limited}
|
||||
};
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
ircd::m::event::id::buf
|
||||
ircd::m::sync::_room_timeline_polylog_events(data &data,
|
||||
const m::room &room,
|
||||
bool &limited)
|
||||
{
|
||||
static const event::fetch::opts fopts
|
||||
{
|
||||
default_keys
|
||||
};
|
||||
|
||||
json::stack::array array
|
||||
{
|
||||
data.out, "events"
|
||||
};
|
||||
|
||||
// messages seeks to the newest event, but the client wants the oldest
|
||||
|
@ -117,6 +175,7 @@ ircd::m::sync::_room_timeline_events(data &data,
|
|||
|
||||
if(i > 0 && it)
|
||||
{
|
||||
data.commit();
|
||||
//const m::event &event{*it};
|
||||
//data.state_at = at<"depth"_>(event);
|
||||
}
|
||||
|
|
|
@ -45,6 +45,7 @@ ircd::m::sync::room_unread_notifications_polylog(data &data)
|
|||
if(!m::receipt::read(last_read, room.room_id, data.user))
|
||||
return false;
|
||||
|
||||
data.commit();
|
||||
json::stack::object out
|
||||
{
|
||||
data.out
|
||||
|
|
Loading…
Reference in a new issue