0
0
Fork 0
mirror of https://github.com/matrix-construct/construct synced 2024-12-26 15:33:54 +01:00

modules/client/sync: Implement iteration base of linear sync.

This commit is contained in:
Jason Volk 2019-02-26 17:02:21 -08:00
parent 78a1281b1c
commit cc586a352e
4 changed files with 173 additions and 50 deletions

View file

@ -95,7 +95,8 @@ struct ircd::m::sync::data
const m::event *event {nullptr};
const m::room *room {nullptr};
string_view membership;
event::idx room_head {0};
event::idx room_head {0}; // if *room
event::idx event_idx {0}; // if *event
data(const m::user &user,
const m::events::range &range,

View file

@ -755,6 +755,9 @@ bool
ircd::m::sync::item::linear(data &data)
try
{
if(!enable)
return false;
return _linear(data);
}
catch(const std::bad_function_call &e)

View file

@ -49,8 +49,8 @@ ircd::m::sync::buffer_size
{ "default", long(128_KiB) },
};
decltype(ircd::m::sync::linear::delta_max)
ircd::m::sync::linear::delta_max
decltype(ircd::m::sync::linear_delta_max)
ircd::m::sync::linear_delta_max
{
{ "name", "ircd.client.sync.linear.delta.max" },
{ "default", 1024 },
@ -145,9 +145,9 @@ ircd::m::sync::handle_get(client &client,
{
range.first > range.second?
false:
range.second - range.first <= size_t(linear::delta_max)?
polylog::handle(data):
polylog::handle(data)
range.second - range.first <= size_t(linear_delta_max)?
polylog_handle(data):
polylog_handle(data)
};
// When shortpoll was successful, do nothing else.
@ -208,12 +208,20 @@ ircd::m::sync::flush(data &data,
};
}
//
// polylog
//
// Random access approach for large `since` ranges. The /sync schema itself is
// recursed. For every component in the schema, the handler seeks the events
// appropriate for the user and appends it to the output. Concretely, this
// involves a full iteration of the rooms a user is a member of, and a full
// iteration of the presence status for all users visible to a user, etc.
//
// This entire process occurs in a single pass. The schema is traced with
// json::stack and its buffer is flushed to the client periodically with
// chunked encoding.
bool
ircd::m::sync::polylog::handle(data &data)
ircd::m::sync::polylog_handle(data &data)
try
{
json::stack::checkpoint checkpoint
@ -279,49 +287,72 @@ catch(const std::exception &e)
//
// linear
//
// Approach for small `since` ranges. The range of events is iterated and
// the event itself is presented to each handler in the schema. This also
// involves a json::stack trace of the schema so that if the handler determines
// the event is appropriate for syncing to the user the output buffer will
// contain a residue of a /sync response with a single event.
//
// After the iteration of events is complete we are left with several buffers
// of properly formatted individual /sync responses which we rewrite into a
// single response to overcome the inefficiency of request ping-pong under
// heavy load.
namespace ircd::m::sync
{
static bool linear_proffer_event_one(data &);
static size_t linear_proffer_event(data &, const mutable_buffer &);
static event::idx linear_proffer(data &, window_buffer &);
static void linear_rewrite(data &, const json::vector &);
}
bool
ircd::m::sync::linear::handle(data &data)
ircd::m::sync::linear_handle(data &data)
try
{
bool ret{false};
m::events::for_each(data.range, [&data, &ret]
(const m::event::idx &event_idx, const m::event &event)
json::stack::checkpoint checkpoint
{
const scope_restore<decltype(data.event)> theirs
*data.out
};
const unique_buffer<mutable_buffer> buf
{
96_KiB //TODO: XXX
};
window_buffer wb{buf};
const event::idx last
{
linear_proffer(data, wb)
};
const json::vector vector
{
wb.completed()
};
if(last)
{
json::stack::member
{
data.event, &event
*data.out, "next_batch", json::value
{
lex_cast(last+1), json::STRING
}
};
m::sync::for_each(string_view{}, [&data, &ret]
(item &item)
{
json::stack::member member
{
data.out, item.member_name()
};
ret |= item.linear(data);
return true;
});
return true;
});
json::stack::member
{
data.out, "next_batch", json::value
{
lex_cast(data.range.second), json::STRING
}
};
linear_rewrite(data, vector);
}
else checkpoint.rollback();
log::debug
{
log, "linear %s complete", loghead(data)
log, "linear %s last:%lu complete",
loghead(data),
last
};
return ret;
return last;
}
catch(const std::exception &e)
{
@ -335,6 +366,103 @@ catch(const std::exception &e)
throw;
}
void
ircd::m::sync::linear_rewrite(data &data,
const json::vector &vector)
{
}
/// Iterates the events in the data.range and creates a json::vector in
/// the supplied window_buffer. The return value is the event_idx of the
/// last event which fit in the buffer, or 0 of nothing was of interest
/// to our client in the event iteration.
ircd::m::event::idx
ircd::m::sync::linear_proffer(data &data,
window_buffer &wb)
{
event::idx ret(0);
m::events::for_each(data.range, [&data, &wb, &ret]
(const m::event::idx &event_idx, const m::event &event)
{
const scope_restore their_event
{
data.event, &event
};
const scope_restore their_event_idx
{
data.event_idx, event_idx
};
wb([&data, &ret, &event_idx]
(const mutable_buffer &buf)
{
const auto consumed
{
linear_proffer_event(data, buf)
};
if(consumed)
ret = event_idx;
return consumed;
});
return wb.remaining() >= 65_KiB; //TODO: XXX
});
return ret;
}
/// Sets up a json::stack for the iteration of handlers for
/// one event.
size_t
ircd::m::sync::linear_proffer_event(data &data,
const mutable_buffer &buf)
{
json::stack out{buf};
const scope_restore their_out
{
data.out, &out
};
return linear_proffer_event_one(data)?
size(out.completed()):
0UL;
}
/// Generates a candidate /sync response for a single event by
/// iterating all of the handlers.
bool
ircd::m::sync::linear_proffer_event_one(data &data)
{
json::stack::object top
{
*data.out
};
return !m::sync::for_each(string_view{}, [&data]
(item &item)
{
json::stack::checkpoint checkpoint
{
*data.out
};
json::stack::object object
{
*data.out, item.member_name()
};
if(item.linear(data))
return false;
checkpoint.rollback();
return true;
});
}
//
// longpoll
//

View file

@ -21,9 +21,12 @@ namespace ircd::m::sync
extern conf::item<size_t> flush_hiwat;
extern conf::item<size_t> buffer_size;
extern conf::item<size_t> linear_delta_max;
static const_buffer flush(data &, resource::response::chunked &, const const_buffer &);
static void empty_response(data &);
static bool linear_handle(data &);
static bool polylog_handle(data &);
static resource::response handle_get(client &, const resource::request &);
}
@ -65,18 +68,6 @@ namespace ircd::m::sync::longpoll
extern m::hookfn<m::vm::eval &> notified;
}
namespace ircd::m::sync::linear
{
extern conf::item<size_t> delta_max;
static bool handle(data &);
}
namespace ircd::m::sync::polylog
{
static bool handle(data &);
}
struct ircd::m::sync::args
{
static conf::item<milliseconds> timeout_max;