0
0
Fork 0
mirror of https://github.com/matrix-construct/construct synced 2024-09-27 11:18:51 +02:00

modules/client/sync: Reorder/reorg definitions.

This commit is contained in:
Jason Volk 2020-06-03 19:09:36 -07:00
parent a70a6164be
commit 96222340f8

View file

@ -431,361 +431,17 @@ ircd::m::sync::flush(data &data,
return wrote;
}
// polylog
//
// Random access approach for large `since` ranges. The /sync schema itself is
// recursed. For every component in the schema, the handler seeks the events
// appropriate for the user and appends it to the output. Concretely, this
// involves a full iteration of the rooms a user is a member of, and a full
// iteration of the presence status for all users visible to a user, etc.
//
// This entire process occurs in a single pass. The schema is traced with
// json::stack and its buffer is flushed to the client periodically with
// chunked encoding.
bool
ircd::m::sync::polylog_handle(data &data)
try
{
json::stack::checkpoint checkpoint
{
*data.out
};
json::stack::object top
{
*data.out
};
// Prefetch loop
if(data.range.first == 0)
{
const scope_restore prefetching
{
data.prefetch, true
};
m::sync::for_each(string_view{}, [&data]
(item &item)
{
json::stack::checkpoint checkpoint
{
*data.out
};
json::stack::object object
{
*data.out, item.member_name()
};
item.polylog(data);
checkpoint.committing(false);
return true;
});
}
// Output loop
bool ret{false};
m::sync::for_each(string_view{}, [&data, &ret]
(item &item)
{
json::stack::checkpoint checkpoint
{
*data.out
};
json::stack::object object
{
*data.out, item.member_name()
};
if(item.polylog(data))
{
ret = true;
data.out->invalidate_checkpoints();
}
else checkpoint.committing(false);
return true;
});
if(ret)
{
const int64_t next_batch
{
data.phased?
int64_t(data.range.first) - 1L:
int64_t(data.range.second)
};
char buf[64];
assert(data.phased || next_batch >= 0L);
const string_view &next_batch_token
{
// The polylog phased since token. We pack two numbers separted by a '_'
// character which cannot be urlencoded atm. The first is the usual
// since token integer, which is negative for phased initial sync. The
// second part is the next_batch upper-bound integer which is a snapshot
// of the server's sequence number when the phased sync started.
data.phased?
make_since(buf, m::events::range{uint64_t(next_batch), data.range.second}):
// The normal integer since token.
make_since(buf, next_batch)
};
json::stack::member
{
*data.out, "next_batch", json::value
{
next_batch_token, json::STRING
}
};
}
if(!ret)
checkpoint.committing(false);
if(!data.phased && stats_info)
log::info
{
log, "request %s polylog commit:%b complete @%ld",
loghead(data),
ret,
data.phased?
data.range.first:
data.range.second
};
return ret;
}
catch(const std::exception &e)
{
log::error
{
log, "polylog %s FAILED :%s",
loghead(data),
e.what()
};
throw;
}
//
// linear
//
// Approach for small `since` ranges. The range of events is iterated and
// the event itself is presented to each handler in the schema. This also
// involves a json::stack trace of the schema so that if the handler determines
// the event is appropriate for syncing to the user the output buffer will
// contain a residue of a /sync response with a single event.
//
// After the iteration of events is complete we are left with several buffers
// of properly formatted individual /sync responses which we rewrite into a
// single response to overcome the inefficiency of request ping-pong under
// heavy load.
namespace ircd::m::sync
{
static bool linear_proffer_event_one(data &);
static size_t linear_proffer_event(data &, const mutable_buffer &);
static std::pair<event::idx, bool> linear_proffer(data &, window_buffer &);
}
bool
ircd::m::sync::linear_handle(data &data)
try
{
json::stack::checkpoint checkpoint
{
*data.out
};
json::stack::object top
{
*data.out
};
const unique_buffer<mutable_buffer> buf
{
// must be at least worst-case size of m::event plus some.
std::max(size_t(linear_buffer_size), size_t(128_KiB))
};
window_buffer wb{buf};
const auto &[last, completed]
{
linear_proffer(data, wb)
};
const json::vector vector
{
wb.completed()
};
const auto next
{
last && completed?
data.range.second:
last?
std::min(last + 1, data.range.second):
0UL
};
if(last)
{
char buf[64];
json::stack::member
{
top, "next_batch", json::value
{
make_since(buf, next), json::STRING
}
};
json::merge(top, vector);
}
else checkpoint.committing(false);
log::debug
{
log, "request %s linear last:%lu %s@%lu events:%zu",
loghead(data),
last,
completed? "complete "_sv : string_view{},
next,
vector.size(),
};
return last;
}
catch(const std::exception &e)
{
log::error
{
log, "linear %s FAILED :%s",
loghead(data),
e.what()
};
throw;
}
/// Iterates the events in the data.range and creates a json::vector in
/// the supplied window_buffer. The return value is the event_idx of the
/// last event which fit in the buffer, or 0 of nothing was of interest
/// to our client in the event iteration.
std::pair<ircd::m::event::idx, bool>
ircd::m::sync::linear_proffer(data &data,
window_buffer &wb)
{
event::idx ret(0);
const auto closure{[&data, &wb, &ret]
(const m::event::idx &event_idx, const m::event &event)
{
const scope_restore their_event
{
data.event, &event
};
const scope_restore their_event_idx
{
data.event_idx, event_idx
};
wb([&data, &ret, &event_idx]
(const mutable_buffer &buf)
{
const auto consumed
{
linear_proffer_event(data, buf)
};
if(consumed)
ret = event_idx;
return consumed;
});
const bool enough_space_for_more
{
// The buffer must have at least this much more space
// to continue with the iteration. Otherwise if the next
// worst-case event does not fit, bad things.
wb.remaining() >= 68_KiB
};
return enough_space_for_more;
}};
const auto completed
{
m::events::for_each(data.range, closure)
};
return
{
ret, completed
};
}
/// Sets up a json::stack for the iteration of handlers for
/// one event.
size_t
ircd::m::sync::linear_proffer_event(data &data,
const mutable_buffer &buf)
{
json::stack out{buf};
const scope_restore their_out
{
data.out, &out
};
json::stack::object top
{
*data.out
};
const bool success
{
linear_proffer_event_one(data)
};
top.~object();
return success?
size(out.completed()):
0UL;
}
/// Generates a candidate /sync response for a single event by
/// iterating all of the handlers.
bool
ircd::m::sync::linear_proffer_event_one(data &data)
{
bool ret{false};
m::sync::for_each(string_view{}, [&data, &ret]
(item &item)
{
json::stack::checkpoint checkpoint
{
*data.out
};
if(item.linear(data))
ret = true;
else
checkpoint.rollback();
return true;
});
return ret;
}
///////////////////////////////////////////////////////////////////////////////
//
// longpoll
//
namespace ircd::m::sync
{
// fwd decl as longpoll is a frontend to a linear-sync.
static size_t linear_proffer_event(data &, const mutable_buffer &);
}
namespace ircd::m::sync::longpoll
{
static bool polled(data &, const args &);
@ -1034,6 +690,362 @@ ircd::m::sync::longpoll::polled(data &data,
return true;
}
///////////////////////////////////////////////////////////////////////////////
//
// linear
//
// Approach for small `since` ranges. The range of events is iterated and
// the event itself is presented to each handler in the schema. This also
// involves a json::stack trace of the schema so that if the handler determines
// the event is appropriate for syncing to the user the output buffer will
// contain a residue of a /sync response with a single event.
//
// After the iteration of events is complete we are left with several buffers
// of properly formatted individual /sync responses which we rewrite into a
// single response to overcome the inefficiency of request ping-pong under
// heavy load.
namespace ircd::m::sync
{
static bool linear_proffer_event_one(data &);
static size_t linear_proffer_event(data &, const mutable_buffer &);
static std::pair<event::idx, bool> linear_proffer(data &, window_buffer &);
}
bool
ircd::m::sync::linear_handle(data &data)
try
{
json::stack::checkpoint checkpoint
{
*data.out
};
json::stack::object top
{
*data.out
};
const unique_buffer<mutable_buffer> buf
{
// must be at least worst-case size of m::event plus some.
std::max(size_t(linear_buffer_size), size_t(128_KiB))
};
window_buffer wb{buf};
const auto &[last, completed]
{
linear_proffer(data, wb)
};
const json::vector vector
{
wb.completed()
};
const auto next
{
last && completed?
data.range.second:
last?
std::min(last + 1, data.range.second):
0UL
};
if(last)
{
char buf[64];
json::stack::member
{
top, "next_batch", json::value
{
make_since(buf, next), json::STRING
}
};
json::merge(top, vector);
}
else checkpoint.committing(false);
log::debug
{
log, "request %s linear last:%lu %s@%lu events:%zu",
loghead(data),
last,
completed? "complete "_sv : string_view{},
next,
vector.size(),
};
return last;
}
catch(const std::exception &e)
{
log::error
{
log, "linear %s FAILED :%s",
loghead(data),
e.what()
};
throw;
}
/// Iterates the events in the data.range and creates a json::vector in
/// the supplied window_buffer. The return value is the event_idx of the
/// last event which fit in the buffer, or 0 of nothing was of interest
/// to our client in the event iteration.
std::pair<ircd::m::event::idx, bool>
ircd::m::sync::linear_proffer(data &data,
window_buffer &wb)
{
event::idx ret(0);
const auto closure{[&data, &wb, &ret]
(const m::event::idx &event_idx, const m::event &event)
{
const scope_restore their_event
{
data.event, &event
};
const scope_restore their_event_idx
{
data.event_idx, event_idx
};
wb([&data, &ret, &event_idx]
(const mutable_buffer &buf)
{
const auto consumed
{
linear_proffer_event(data, buf)
};
if(consumed)
ret = event_idx;
return consumed;
});
const bool enough_space_for_more
{
// The buffer must have at least this much more space
// to continue with the iteration. Otherwise if the next
// worst-case event does not fit, bad things.
wb.remaining() >= 68_KiB
};
return enough_space_for_more;
}};
const auto completed
{
m::events::for_each(data.range, closure)
};
return
{
ret, completed
};
}
/// Sets up a json::stack for the iteration of handlers for
/// one event.
size_t
ircd::m::sync::linear_proffer_event(data &data,
const mutable_buffer &buf)
{
json::stack out{buf};
const scope_restore their_out
{
data.out, &out
};
json::stack::object top
{
*data.out
};
const bool success
{
linear_proffer_event_one(data)
};
top.~object();
return success?
size(out.completed()):
0UL;
}
/// Generates a candidate /sync response for a single event by
/// iterating all of the handlers.
bool
ircd::m::sync::linear_proffer_event_one(data &data)
{
bool ret{false};
m::sync::for_each(string_view{}, [&data, &ret]
(item &item)
{
json::stack::checkpoint checkpoint
{
*data.out
};
if(item.linear(data))
ret = true;
else
checkpoint.rollback();
return true;
});
return ret;
}
///////////////////////////////////////////////////////////////////////////////
//
// polylog
//
// Random access approach for large `since` ranges. The /sync schema itself is
// recursed. For every component in the schema, the handler seeks the events
// appropriate for the user and appends it to the output. Concretely, this
// involves a full iteration of the rooms a user is a member of, and a full
// iteration of the presence status for all users visible to a user, etc.
//
// This entire process occurs in a single pass. The schema is traced with
// json::stack and its buffer is flushed to the client periodically with
// chunked encoding.
bool
ircd::m::sync::polylog_handle(data &data)
try
{
json::stack::checkpoint checkpoint
{
*data.out
};
json::stack::object top
{
*data.out
};
// Prefetch loop
if(data.range.first == 0)
{
const scope_restore prefetching
{
data.prefetch, true
};
m::sync::for_each(string_view{}, [&data]
(item &item)
{
json::stack::checkpoint checkpoint
{
*data.out
};
json::stack::object object
{
*data.out, item.member_name()
};
item.polylog(data);
checkpoint.committing(false);
return true;
});
}
// Output loop
bool ret{false};
m::sync::for_each(string_view{}, [&data, &ret]
(item &item)
{
json::stack::checkpoint checkpoint
{
*data.out
};
json::stack::object object
{
*data.out, item.member_name()
};
if(item.polylog(data))
{
ret = true;
data.out->invalidate_checkpoints();
}
else checkpoint.committing(false);
return true;
});
if(ret)
{
const int64_t next_batch
{
data.phased?
int64_t(data.range.first) - 1L:
int64_t(data.range.second)
};
char buf[64];
assert(data.phased || next_batch >= 0L);
const string_view &next_batch_token
{
// The polylog phased since token. We pack two numbers separted by a '_'
// character which cannot be urlencoded atm. The first is the usual
// since token integer, which is negative for phased initial sync. The
// second part is the next_batch upper-bound integer which is a snapshot
// of the server's sequence number when the phased sync started.
data.phased?
make_since(buf, m::events::range{uint64_t(next_batch), data.range.second}):
// The normal integer since token.
make_since(buf, next_batch)
};
json::stack::member
{
*data.out, "next_batch", json::value
{
next_batch_token, json::STRING
}
};
}
if(!ret)
checkpoint.committing(false);
if(!data.phased && stats_info)
log::info
{
log, "request %s polylog commit:%b complete @%ld",
loghead(data),
ret,
data.phased?
data.range.first:
data.range.second
};
return ret;
}
catch(const std::exception &e)
{
log::error
{
log, "polylog %s FAILED :%s",
loghead(data),
e.what()
};
throw;
}
//
// data
//