0
0
Fork 0
mirror of https://github.com/matrix-construct/construct synced 2024-09-28 11:48:54 +02:00

modules/client/sync: Refactor longpoll handler.

This commit is contained in:
Jason Volk 2019-07-07 19:56:32 -07:00
parent cbb39105e1
commit 61fcc07fcf
3 changed files with 69 additions and 110 deletions

View file

@ -306,7 +306,7 @@ ircd::m::sync::handle_get(client &client,
const bool complete
{
should_longpoll?
longpoll::poll(data, args):
longpoll_handle(data):
should_linear?
linear_handle(data):
polylog_handle(data)
@ -712,6 +712,19 @@ ircd::m::sync::linear_proffer_event_one(data &data)
// longpoll
//
namespace ircd::m::sync::longpoll
{
static bool polled(data &, const args &);
static int poll(data &);
static void handle_notify(const m::event &, m::vm::eval &);
extern m::hookfn<m::vm::eval &> notified;
extern ctx::dock dock;
}
decltype(ircd::m::sync::longpoll::dock)
ircd::m::sync::longpoll::dock;
decltype(ircd::m::sync::longpoll::notified)
ircd::m::sync::longpoll::notified
{
@ -729,58 +742,16 @@ ircd::m::sync::longpoll::handle_notify(const m::event &event,
if(!eval.opts->notify_clients)
return;
if(!polling)
{
queue.clear();
return;
}
queue.emplace_back(eval);
dock.notify_all();
}
bool
ircd::m::sync::longpoll::poll(data &data,
const args &args)
ircd::m::sync::longpoll_handle(data &data)
try
{
const unique_buffer<mutable_buffer> scratch
{
96_KiB
};
const scope_count polling{longpoll::polling}; do
{
if(!dock.wait_until(args.timesout))
break;
assert(data.client && data.client->sock);
if(unlikely(!data.client || !data.client->sock))
break;
check(*data.client->sock);
if(queue.empty())
continue;
const auto &accepted
{
queue.front()
};
const unwind pop{[]
{
queue.pop_front();
}};
if(polylog_only)
break;
if(handle(data, args, accepted, scratch))
return true;
}
while(1);
return false;
int ret;
for(ret = -1; ret == -1; ret = longpoll::poll(data));
return ret;
}
catch(const std::system_error &e)
{
@ -805,12 +776,53 @@ catch(const std::exception &e)
throw;
}
bool
ircd::m::sync::longpoll::handle(data &data,
const args &args,
const accepted &event,
const mutable_buffer &scratch)
int
ircd::m::sync::longpoll::poll(data &data)
{
assert(data.args);
if(!dock.wait_until(data.args->timesout))
return false;
// Check if client went away while we were sleeping,
// if so, just returning true is the easiest way out w/o throwing
assert(data.client && data.client->sock);
if(unlikely(!data.client || !data.client->sock))
return true;
// slightly more involved check of the socket before
// we waste resources on the operation; throws.
check(*data.client->sock);
// Keep in mind if the handler returns true that means
// it made a hit and we can return true to exit longpoll
// and end the request cleanly.
if(polled(data, *data.args))
return true;
// When the client explicitly gives a next_batch token we have to
// adhere to it and return an empty response before going past.
if(data.args->next_batch_token && m::vm::sequence::retired >= data.range.second)
return false;
if(data.range.second <= m::vm::sequence::retired)
++data.range.second;
return -1;
}
bool
ircd::m::sync::longpoll::polled(data &data,
const args &args)
{
const m::event::fetch event
{
data.range.second, std::nothrow
};
if(!event.valid)
return false;
const scope_restore their_event
{
data.event, &event
@ -821,9 +833,9 @@ ircd::m::sync::longpoll::handle(data &data,
data.event_idx, event.event_idx
};
const scope_restore client_txnid
const unique_buffer<mutable_buffer> scratch
{
data.client_txnid, event.client_txnid
96_KiB
};
const size_t consumed
@ -866,9 +878,10 @@ ircd::m::sync::longpoll::handle(data &data,
log::debug
{
log, "request %s longpoll hit:%lu complete @%lu",
log, "request %s longpoll hit:%lu consumed:%zu complete @%lu",
loghead(data),
event.event_idx,
consumed,
next
};

View file

@ -31,8 +31,8 @@ namespace ircd::m::sync
static void empty_response(data &, const uint64_t &next_batch);
static bool linear_handle(data &);
static bool polylog_handle(data &);
static bool longpoll_handle(data &);
static resource::response handle_get(client &, const resource::request &);
}
#include "sync/longpoll.h"
#include "sync/args.h"

View file

@ -1,54 +0,0 @@
// Matrix Construct
//
// Copyright (C) Matrix Construct Developers, Authors & Contributors
// Copyright (C) 2016-2019 Jason Volk <jason@zemos.net>
//
// Permission to use, copy, modify, and/or distribute this software for any
// purpose with or without fee is hereby granted, provided that the above
// copyright notice and this permission notice is present in all copies. The
// full license for this software is available in the LICENSE file.
namespace ircd::m::sync::longpoll
{
struct accepted;
size_t polling {0};
std::deque<accepted> queue;
ctx::dock dock;
static bool handle(data &, const args &, const accepted &, const mutable_buffer &scratch);
static bool poll(data &, const args &);
static void handle_notify(const m::event &, m::vm::eval &);
extern m::hookfn<m::vm::eval &> notified;
}
struct ircd::m::sync::longpoll::accepted
:m::event
{
json::strung strung;
std::string client_txnid;
event::idx event_idx;
accepted(const m::vm::eval &eval)
:strung
{
*eval.event_
}
,client_txnid
{
eval.copts?
eval.copts->client_txnid:
string_view{}
}
,event_idx
{
eval.sequence
}
{
const json::object object{this->strung};
static_cast<m::event &>(*this) = m::event{object};
}
accepted(accepted &&) = default;
accepted(const accepted &) = delete;
};