0
0
Fork 0
mirror of https://github.com/matrix-construct/construct synced 2024-11-26 00:32:35 +01:00

ircd:Ⓜ️:sync: Introduce phased polylogarithmic initial sync.

This commit is contained in:
Jason Volk 2019-04-08 02:04:24 -07:00
parent a780609b76
commit a82410009e
5 changed files with 153 additions and 26 deletions

View file

@ -51,6 +51,7 @@ struct ircd::m::sync::item
handle _linear;
json::strung feature;
json::object opts;
bool phased;
public:
string_view name() const;
@ -77,6 +78,10 @@ struct ircd::m::sync::data
/// index is one beyond the vm::current_sequence and used for next_batch.
m::events::range range;
/// Whether to enable phased sync mode. The range.first will be <= 0
/// in this case, and only handlers with the phased feature
bool phased {false};
/// Statistics tracking. If null, stats won't be accumulated for the sync.
sync::stats *stats {nullptr};

View file

@ -569,7 +569,7 @@ ircd::m::sync::loghead(const data &data)
return fmt::sprintf
{
headbuf, "%s %s %lu:%lu|%lu chunk:%zu sent:%s of %s in %s",
headbuf, "%s %s %ld:%lu|%lu chunk:%zu sent:%s of %s in %s",
remstr,
string_view{data.user.user_id},
data.range.first,
@ -697,6 +697,10 @@ ircd::m::sync::item::item(std::string name,
{
this->feature
}
,phased
{
opts.get<bool>("phased", false)
}
{
log::debug
{
@ -722,9 +726,22 @@ bool
ircd::m::sync::item::polylog(data &data)
try
{
// Skip the item if disabled by configuration
if(!enable)
return false;
// Skip the item for phased-sync ranges if it's not phased-sync aware.
if(!phased && data.phased && int64_t(data.range.first) < 0L)
{
assert(data.phased);
return false;
}
// Skip the item for the initial-sync pass if it's phased-sync aware;
// it will be called for the first time at the next phase.
if(phased && data.phased && data.range.first == 0UL)
return false;
#ifdef RB_DEBUG
sync::stats stats
{

View file

@ -98,6 +98,13 @@ ircd::m::sync::linear_delta_max
{ "help", linear_delta_max_help },
};
decltype(ircd::m::sync::polylog_phased)
ircd::m::sync::polylog_phased
{
{ "name", "ircd.client.sync.polylog.phased" },
{ "default", true },
};
decltype(ircd::m::sync::polylog_only)
ircd::m::sync::polylog_only
{
@ -145,18 +152,34 @@ ircd::m::sync::handle_get(client &client,
args.since, std::min(args.next_batch, m::vm::sequence::retired + 1)
};
// When the range indexes are the same, the client is polling for the next
// event which doesn't exist yet. There is no reason for the since parameter
// to be greater than that.
if(range.first > range.second)
// The phased initial sync feature uses negative since tokens.
const bool phased_range
{
int64_t(range.first) < 0L
};
// Check if the admin disabled phased sync.
if(!polylog_phased && phased_range)
throw m::NOT_FOUND
{
"Since parameter '%lu' is too far in the future."
" Cannot be greater than '%lu'.",
"Since parameter '%ld' must be >= 0.",
range.first,
range.second
};
// When the range indexes are the same, the client is polling for the next
// event which doesn't exist yet. There is no reason for the since parameter
// to be greater than that, unless it's a negative integer and phased
// sync is enabled
if(!polylog_phased || !phased_range)
if(range.first > range.second)
throw m::NOT_FOUND
{
"Since parameter '%lu' is too far in the future."
" Cannot be greater than '%lu'.",
range.first,
range.second
};
// Keep state for statistics of this sync here on the stack.
stats stats;
data data
@ -169,6 +192,18 @@ ircd::m::sync::handle_get(client &client,
args.filter_id
};
const bool initial_sync
{
range.first == 0UL
};
// Conditions for phased sync for this client
data.phased =
{
(polylog_phased && args.phased) &&
(phased_range || initial_sync)
};
// Start the chunked encoded response.
resource::response::chunked response
{
@ -190,11 +225,13 @@ ircd::m::sync::handle_get(client &client,
const bool should_longpoll
{
!data.phased &&
range.first > vm::sequence::retired
};
const bool should_linear
{
!data.phased &&
!should_longpoll &&
!bool(polylog_only) &&
range.second - range.first <= size_t(linear_delta_max)
@ -213,8 +250,9 @@ ircd::m::sync::handle_get(client &client,
if(shortpolled)
return {};
if(longpoll_enable && longpoll::poll(data, args))
return {};
if(longpoll_enable && (!data.phased || initial_sync))
if(longpoll::poll(data, args))
return {};
const auto &next_batch
{
@ -335,23 +373,34 @@ try
});
if(ret)
{
const int64_t next_batch
{
data.phased?
int64_t(data.range.first) - 1L:
int64_t(data.range.second)
};
json::stack::member
{
*data.out, "next_batch", json::value
{
lex_cast(data.range.second), json::STRING
lex_cast(next_batch), json::STRING
}
};
}
if(!ret)
checkpoint.decommit();
if(stats_info) log::info
if(!data.phased && stats_info) log::info
{
log, "request %s polylog commit:%b complete @%lu",
log, "request %s polylog commit:%b complete @%ld",
loghead(data),
ret,
data.range.second
data.phased?
data.range.first:
data.range.second
};
return ret;

View file

@ -24,6 +24,7 @@ namespace ircd::m::sync
extern conf::item<size_t> linear_buffer_size;
extern conf::item<size_t> linear_delta_max;
extern conf::item<bool> longpoll_enable;
extern conf::item<bool> polylog_phased;
extern conf::item<bool> polylog_only;
static const_buffer flush(data &, resource::response::chunked &, const const_buffer &);
@ -143,5 +144,10 @@ struct ircd::m::sync::args
request.query.get("set_presence", true)
};
bool phased
{
request.query.get("phased", true)
};
args(const resource::request &request);
};

View file

@ -19,7 +19,7 @@ namespace ircd::m::sync
static bool should_ignore(const data &);
static bool _rooms_polylog_room(data &, const m::room &);
static bool _rooms_polylog(data &, const string_view &membership);
static bool _rooms_polylog(data &, const string_view &membership, int64_t &phase);
static bool rooms_polylog(data &);
static bool _rooms_linear(data &, const string_view &membership);
@ -31,9 +31,10 @@ namespace ircd::m::sync
decltype(ircd::m::sync::rooms)
ircd::m::sync::rooms
{
"rooms",
rooms_polylog,
rooms_linear
"rooms", rooms_polylog, rooms_linear,
{
{ "phased", true }
}
};
bool
@ -92,16 +93,31 @@ bool
ircd::m::sync::rooms_polylog(data &data)
{
bool ret{false};
ret |= _rooms_polylog(data, "invite");
ret |= _rooms_polylog(data, "join");
ret |= _rooms_polylog(data, "leave");
ret |= _rooms_polylog(data, "ban");
int64_t phase(0);
ret |= _rooms_polylog(data, "join", phase);
if(data.phased && ret)
return ret;
ret |= _rooms_polylog(data, "invite", phase);
if(data.phased && ret)
return ret;
ret |= _rooms_polylog(data, "leave", phase);
if(data.phased && ret)
return ret;
ret |= _rooms_polylog(data, "ban", phase);
if(data.phased && ret)
return ret;
return ret;
}
bool
ircd::m::sync::_rooms_polylog(data &data,
const string_view &membership)
const string_view &membership,
int64_t &phase)
{
const scope_restore theirs
{
@ -114,9 +130,23 @@ ircd::m::sync::_rooms_polylog(data &data,
};
bool ret{false};
data.user_rooms.for_each(membership, [&data, &ret]
const user::rooms::closure_bool closure{[&data, &ret, &phase]
(const m::room &room, const string_view &membership_)
{
assert(!data.phased || int64_t(data.range.first) < 0L);
if(data.phased)
{
if(phase >= int64_t(data.range.first))
{
--phase;
return true;
}
if(phase < int64_t(data.range.first) && ret)
return false;
}
#if defined(RB_DEBUG)
sync::stats stats
{
@ -129,7 +159,20 @@ ircd::m::sync::_rooms_polylog(data &data,
stats.timer = timer{};
#endif
ret |= _rooms_polylog_room(data, room);
{
const scope_restore range
{
data.range.first, data.phased? 0UL : data.range.first
};
ret |= _rooms_polylog_room(data, room);
}
if(data.phased && !ret)
{
--data.range.first;
return true;
}
#if defined(RB_DEBUG)
thread_local char tmbuf[32];
@ -141,7 +184,14 @@ ircd::m::sync::_rooms_polylog(data &data,
ircd::pretty(tmbuf, stats.timer.at<milliseconds>(), true)
};
#endif
});
return true;
}};
const bool done
{
data.user_rooms.for_each(membership, closure)
};
return ret;
}