0
0
Fork 0
mirror of https://github.com/matrix-construct/construct synced 2024-06-16 08:58:20 +02:00

ircd:Ⓜ️ Improve interfaces; Add prelim sync loop; Add send resource handler.

This commit is contained in:
Jason Volk 2017-09-25 21:42:07 -07:00
parent b2f7e360a1
commit 92a8d8e2bf
13 changed files with 922 additions and 395 deletions

View file

@ -498,27 +498,52 @@ try
break;
}
const auto args(tokens_after(line, " ", 0));
const params token{args, " ", {"user", "pass"}};
static char query[2048];
snprintf(query, sizeof(query), "%s=%s",
"access_token",
moi->access_token.c_str());
m::request request
const auto args
{
"GET", "_matrix/client/r0/sync", query,
tokens_after(line, " ", 0)
};
const params token
{
args, " ",
{
"timeout", "filter_id", "full_state", "set_presence"
}
};
static char buf[8192];
ircd::parse::buffer pb{buf};
const auto doc((*moi)(pb, request));
for(const auto &member : doc)
std::cout << string_view{member.first} << " => " << string_view{member.second} << std::endl;
const time_t timeout
{
token.at(0, 0)
};
static char query[2048];
snprintf(query, sizeof(query), "%s=%s&timeout=%zd",
"access_token",
moi->access_token.c_str(),
timeout * 1000);
while(1)
{
m::request request
{
"GET", "_matrix/client/r0/sync", query,
{
}
};
static char buf[8192];
ircd::parse::buffer pb{buf};
const json::object doc((*moi)(pb, request));
const auto since(doc.at("next_batch"));
for(const auto &member : doc)
std::cout << string_view{member.first} << " => " << string_view{member.second} << std::endl;
fmt::snprintf(query, sizeof(query), "%s=%s&since=%s&timeout=%zd",
"access_token",
moi->access_token,
since,
timeout * 1000);
}
break;
}
@ -571,6 +596,53 @@ try
break;
}
*/
case hash("privmsg"):
{
if(!moi)
{
std::cerr << "No current session" << std::endl;
break;
}
static uint txnid;
const auto args(tokens_after(line, " ", 0));
const params token{args, " ", {"room_id", "msgtype"}};
const auto &room_id{token.at(0)};
const auto &msgtype{token.at(1)};
const auto &event_type{"m.room.message"};
const auto text(tokens_after(line, " ", 2));
static char query[512]; const auto query_len
{
fmt::snprintf(query, sizeof(query), "%s=%s",
"access_token",
moi->access_token)
};
static char url[512]; const auto url_len
{
fmt::snprintf(url, sizeof(url), "_matrix/client/r0/rooms/%s/send/%s/%u",
room_id,
event_type,
txnid++)
};
m::request request
{
"PUT", url, query, json::members
{
{ "msgtype", msgtype },
{ "body", text }
}
};
static char buf[4096];
ircd::parse::buffer pb{buf};
const json::object response{(*moi)(pb, request)};
std::cout << string_view{response} << std::endl;
break;
}
case hash("password"):
{
if(!moi)

View file

@ -29,6 +29,7 @@
#include "m/error.h"
#include "m/id.h"
#include "m/event.h"
#include "m/events.h"
#include "m/room.h"
#include "m/user.h"
#include "m/filter.h"

View file

@ -65,7 +65,7 @@ namespace ircd::m::name
struct ircd::m::event
:json::tuple
<
json::property<name::content, string_view>,
json::property<name::content, json::object>,
json::property<name::event_id, string_view>,
json::property<name::origin_server_ts, time_t>,
json::property<name::prev_ids, string_view>,

61
include/ircd/m/events.h Normal file
View file

@ -0,0 +1,61 @@
/*
* charybdis: 21st Century IRC++d
*
* Copyright (C) 2016 Charybdis Development Team
* Copyright (C) 2016 Jason Volk <jason@zemos.net>
*
* Permission to use, copy, modify, and/or distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
* copyright notice and this permission notice is present in all copies.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
* IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT,
* INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
* SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
* STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
* IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*
*/
#pragma once
#define HAVE_IRCD_M_EVENTS_H
namespace ircd::m
{
struct events;
}
struct ircd::m::events
{
using event_closure = std::function<void (const event &)>;
using event_closure_bool = std::function<bool (const event &)>;
virtual bool _query_(const event::where &, const event_closure_bool &) const;
virtual bool _rquery_(const event::where &, const event_closure_bool &) const;
bool query(const event::where &, const event_closure_bool &) const;
bool query(const event_closure_bool &) const;
bool rquery(const event::where &, const event_closure_bool &) const;
bool rquery(const event_closure_bool &) const;
void for_each(const event::where &, const event_closure &) const;
void for_each(const event_closure &) const;
void rfor_each(const event::where &, const event_closure &) const;
void rfor_each(const event_closure &) const;
size_t count(const event::where &, const event_closure_bool &) const;
size_t count(const event::where &) const;
bool test(const event::where &, const event_closure_bool &) const;
bool test(const event::where &) const;
events() = default;
virtual ~events() noexcept;
};

View file

@ -44,13 +44,14 @@ struct ircd::m::room
id room_id;
void send(json::iov &event);
void send(const json::members &event);
event::id::buf send(json::iov &event);
event::id::buf send(const json::members &event);
bool is_member(const m::id::user &, const string_view &membership = "join");
void membership(const m::id::user &, json::iov &content);
void leave(const m::id::user &, json::iov &content);
void join(const m::id::user &, json::iov &content);
void create(const m::id::user &sender, const m::id::user &creator, json::iov &content);
room(const id &room_id)
:room_id{room_id}
@ -62,25 +63,12 @@ struct ircd::m::room
};
struct ircd::m::room::events
:m::events
{
id room_id;
using event_closure = std::function<void (const event &)>;
using event_closure_bool = std::function<bool (const event &)>;
bool query(const event::where &, const event_closure_bool &) const;
bool rquery(const event::where &, const event_closure_bool &) const;
void for_each(const event::where &, const event_closure &) const;
void rfor_each(const event::where &, const event_closure &) const;
size_t count(const event::where &, const event_closure_bool &) const;
bool any(const event::where &, const event_closure_bool &) const;
bool query(const event_closure_bool &) const;
bool rquery(const event_closure_bool &) const;
void for_each(const event_closure &) const;
void rfor_each(const event_closure &) const;
size_t count(const event::where &) const;
bool any(const event::where &) const;
bool _query_(const event::where &, const event_closure_bool &) const override;
bool _rquery_(const event::where &, const event_closure_bool &) const override;
events(const id &room_id)
:room_id{room_id}
@ -92,21 +80,12 @@ struct ircd::m::room::events
};
struct ircd::m::room::state
:m::events
{
id room_id;
using event_closure = std::function<void (const event &)>;
using event_closure_bool = std::function<bool (const event &)>;
bool query(const event::where &, const event_closure_bool &) const;
void for_each(const event::where &, const event_closure &) const;
size_t count(const event::where &, const event_closure_bool &) const;
bool any(const event::where &, const event_closure_bool &) const;
bool query(const event_closure_bool &) const;
void for_each(const event_closure &) const;
size_t count(const event::where &) const;
bool any(const event::where &) const;
bool _query_(const event::where &, const event_closure_bool &) const override;
bool _rquery_(const event::where &, const event_closure_bool &) const override;
state(const id &room_id)
:room_id{room_id}

View file

@ -36,11 +36,14 @@ namespace ircd::m
struct ircd::m::user
{
struct rooms;
using id = m::id::user;
id user_id;
static room accounts;
static room sessions;
bool is_active() const;
bool is_password(const string_view &password) const;
@ -53,3 +56,16 @@ struct ircd::m::user
:user_id{user_id}
{}
};
struct ircd::m::user::rooms
:m::events
{
id user_id;
bool _query_(const event::where &, const event_closure_bool &) const override;
bool _rquery_(const event::where &, const event_closure_bool &) const override;
rooms(const id &user_id)
:user_id{user_id}
{}
};

View file

@ -183,40 +183,12 @@ ircd::m::bootstrap()
"database is empty. I will be bootstrapping it with initial events now..."
);
my_room.send(
{
{ "type", "m.room.create" },
{ "sender", me.user_id },
{ "state_key", "" },
{ "content", json::members
{
{ "creator", me.user_id }
}}
});
user::accounts.send(
{
{ "type", "m.room.create" },
{ "sender", me.user_id },
{ "state_key", "" },
{ "content", json::members
{
{ "creator", me.user_id }
}}
});
json::iov content;
my_room.create(me.user_id, me.user_id, content);
user::accounts.create(me.user_id, me.user_id, content);
user::accounts.join(me.user_id, content);
filter::filters.send(
{
{ "type", "m.room.create" },
{ "sender", me.user_id },
{ "state_key", "" },
{ "content", json::members
{
{ "creator", me.user_id }
}}
});
user::sessions.create(me.user_id, me.user_id, content);
filter::filters.create(me.user_id, me.user_id, content);
}
///////////////////////////////////////////////////////////////////////////////
@ -283,6 +255,30 @@ ircd::m::dbs::init_modules()
// m::room
//
void
ircd::m::room::create(const m::id::user &sender,
const m::id::user &creator,
json::iov &content)
{
const json::iov::add _creator
{
content, { "creator", me.user_id }
};
const auto _content
{
json::string(content)
};
send(
{
{ "type", "m.room.create" },
{ "sender", sender },
{ "state_key", "" },
{ "content", _content }
});
}
void
ircd::m::room::join(const m::id::user &user_id,
json::iov &content)
@ -344,15 +340,13 @@ bool
ircd::m::room::is_member(const m::id::user &user_id,
const string_view &membership)
{
const m::event::where::equal event
const m::event::where::equal query
{
{ "type", "m.room.member" },
{ "state_key", user_id }
};
bool ret{false};
const events events{*this};
events.query(event, [&ret, &membership]
return events{*this}.test(query, [&membership]
(const auto &event)
{
const json::object &content
@ -365,14 +359,11 @@ ircd::m::room::is_member(const m::id::user &user_id,
unquote(content["membership"])
};
ret = membership == existing;
return true;
return membership == existing;
});
return ret;
}
void
ircd::m::event::id::buf
ircd::m::room::send(const json::members &event)
{
size_t i(0);
@ -381,10 +372,10 @@ ircd::m::room::send(const json::members &event)
for(const auto &member : event)
new (members + i++) json::iov::push(iov, member);
send(iov);
return send(iov);
}
void
ircd::m::event::id::buf
ircd::m::room::send(json::iov &event)
{
const json::iov::add room_id
@ -403,106 +394,49 @@ ircd::m::room::send(json::iov &event)
};
m::event::insert(event);
}
//
// m::room::events
//
bool
ircd::m::room::events::any(const event::where &where)
const
{
return any(where, [](const auto &event)
{
return true;
});
return generated_event_id;
}
bool
ircd::m::room::events::any(const event::where &where,
const event_closure_bool &closure)
ircd::m::room::state::_rquery_(const event::where &where,
const event_closure_bool &closure)
const
{
return query(where, closure);
}
size_t
ircd::m::room::events::count(const event::where &where)
const
{
return count(where, [](const auto &event)
event::cursor cursor
{
return true;
});
}
"event_id for type,state_key in room_id",
&where
};
size_t
ircd::m::room::events::count(const event::where &where,
const event_closure_bool &closure)
const
{
size_t i(0);
for_each(where, [&closure, &i](const auto &event)
{
i += closure(event);
});
for(auto it(cursor.rbegin(room_id)); bool(it); ++it)
if(closure(*it))
return true;
return i;
}
void
ircd::m::room::events::rfor_each(const event_closure &closure)
const
{
const m::event::where::noop where{};
rfor_each(where, closure);
}
void
ircd::m::room::events::rfor_each(const event::where &where,
const event_closure &closure)
const
{
rquery(where, [&closure](const auto &event)
{
closure(event);
return false;
});
}
void
ircd::m::room::events::for_each(const event::where &where,
const event_closure &closure)
const
{
query(where, [&closure](const auto &event)
{
closure(event);
return false;
});
}
void
ircd::m::room::events::for_each(const event_closure &closure)
const
{
const m::event::where::noop where{};
for_each(where, closure);
return false;
}
bool
ircd::m::room::events::rquery(const event_closure_bool &closure)
const
{
const m::event::where::noop where{};
return rquery(where, closure);
}
bool
ircd::m::room::events::rquery(const event::where &where,
ircd::m::room::state::_query_(const event::where &where,
const event_closure_bool &closure)
const
{
event::cursor cursor
{
"event_id for type,state_key in room_id",
&where
};
for(auto it(cursor.begin(room_id)); bool(it); ++it)
if(closure(*it))
return true;
return false;
}
bool
ircd::m::room::events::_rquery_(const event::where &where,
const event_closure_bool &closure)
const
{
event::cursor cursor
{
@ -518,16 +452,8 @@ const
}
bool
ircd::m::room::events::query(const event_closure_bool &closure)
const
{
const m::event::where::noop where{};
return query(where, closure);
}
bool
ircd::m::room::events::query(const event::where &where,
const event_closure_bool &closure)
ircd::m::room::events::_query_(const event::where &where,
const event_closure_bool &closure)
const
{
event::cursor cursor
@ -543,101 +469,6 @@ const
return false;
}
//
// m::room::state
//
bool
ircd::m::room::state::any(const event::where &where)
const
{
return any(where, [](const auto &event)
{
return true;
});
}
bool
ircd::m::room::state::any(const event::where &where,
const event_closure_bool &closure)
const
{
return query(where, closure);
}
size_t
ircd::m::room::state::count(const event::where &where)
const
{
return count(where, [](const auto &event)
{
return true;
});
}
size_t
ircd::m::room::state::count(const event::where &where,
const event_closure_bool &closure)
const
{
size_t i(0);
for_each(where, [&closure, &i](const auto &event)
{
i += closure(event);
});
return i;
}
void
ircd::m::room::state::for_each(const event::where &where,
const event_closure &closure)
const
{
query(where, [&closure](const auto &event)
{
closure(event);
return false;
});
}
void
ircd::m::room::state::for_each(const event_closure &closure)
const
{
query([&closure](const auto &event)
{
closure(event);
return false;
});
}
bool
ircd::m::room::state::query(const event_closure_bool &closure)
const
{
const m::event::where::noop where{};
return query(where, closure);
}
bool
ircd::m::room::state::query(const event::where &where,
const event_closure_bool &closure)
const
{
event::cursor cursor
{
"event_id for type,state_key in room_id",
&where
};
for(auto it(cursor.begin(room_id)); bool(it); ++it)
if(closure(*it))
return true;
return false;
}
///////////////////////////////////////////////////////////////////////////////
//
// m/user.h
@ -649,6 +480,12 @@ ircd::m::user::accounts
ircd::m::room::id{"!accounts:cdc.z"}
};
ircd::m::room
ircd::m::user::sessions
{
ircd::m::room::id{"!sessions:cdc.z"}
};
/// Register the user by joining them to the accounts room.
///
/// The content of the join event may store keys including the registration
@ -724,8 +561,7 @@ const
{ "state_key", user_id }
};
bool ret{false};
const m::event::where::test correct_password{[&ret, &supplied_password]
const m::event::where::test correct_password{[&supplied_password]
(const auto &event)
{
const json::object &content
@ -738,8 +574,7 @@ const
unquote(content.at("plaintext"))
};
ret = supplied_password == correct_password;
return true;
return supplied_password == correct_password;
}};
const auto query
@ -754,8 +589,7 @@ const
accounts
};
events.query(member_event && correct_password);
return ret;
return events.test(member_event && correct_password);
}
bool
@ -768,8 +602,7 @@ const
{ "state_key", user_id }
};
bool ret{false};
const m::event::where::test is_joined{[&ret]
const m::event::where::test is_joined{[]
(const auto &event)
{
const json::object &content
@ -782,8 +615,7 @@ const
unquote(content["membership"])
};
ret = membership == "join";
return true;
return membership == "join";
}};
const room::events events
@ -791,10 +623,213 @@ const
accounts
};
events.query(member_event && is_joined);
return events.test(member_event && is_joined);
}
bool
ircd::m::user::rooms::_rquery_(const event::where &where,
const event_closure_bool &closure)
const
{
event::cursor cursor
{
"event_id for room_id in sender",
&where
};
for(auto it(cursor.rbegin(user_id)); bool(it); ++it)
if(closure(*it))
return true;
return false;
}
bool
ircd::m::user::rooms::_query_(const event::where &where,
const event_closure_bool &closure)
const
{
event::cursor cursor
{
"event_id for room_id in sender",
&where
};
for(auto it(cursor.begin(user_id)); bool(it); ++it)
if(closure(*it))
return true;
return false;
}
///////////////////////////////////////////////////////////////////////////////
//
// m/events.h
//
ircd::m::events::~events()
noexcept
{
}
bool
ircd::m::events::test(const event::where &where)
const
{
return test(where, [](const auto &event)
{
return true;
});
}
bool
ircd::m::events::test(const event::where &where,
const event_closure_bool &closure)
const
{
bool ret{false};
query(where, [&ret, &closure]
(const auto &event)
{
ret = closure(event);
return true;
});
return ret;
}
size_t
ircd::m::events::count(const event::where &where)
const
{
return count(where, [](const auto &event)
{
return true;
});
}
size_t
ircd::m::events::count(const event::where &where,
const event_closure_bool &closure)
const
{
size_t i(0);
for_each(where, [&closure, &i](const auto &event)
{
i += closure(event);
});
return i;
}
void
ircd::m::events::rfor_each(const event_closure &closure)
const
{
const m::event::where::noop where{};
rfor_each(where, closure);
}
void
ircd::m::events::rfor_each(const event::where &where,
const event_closure &closure)
const
{
rquery(where, [&closure](const auto &event)
{
closure(event);
return false;
});
}
void
ircd::m::events::for_each(const event::where &where,
const event_closure &closure)
const
{
query(where, [&closure](const auto &event)
{
closure(event);
return false;
});
}
void
ircd::m::events::for_each(const event_closure &closure)
const
{
const m::event::where::noop where{};
for_each(where, closure);
}
bool
ircd::m::events::rquery(const event_closure_bool &closure)
const
{
const m::event::where::noop where{};
return rquery(where, closure);
}
bool
ircd::m::events::rquery(const event::where &where,
const event_closure_bool &closure)
const
{
return _rquery_(where, closure);
}
bool
ircd::m::events::query(const event_closure_bool &closure)
const
{
const m::event::where::noop where{};
return query(where, closure);
}
bool
ircd::m::events::query(const event::where &where,
const event_closure_bool &closure)
const
{
return _query_(where, closure);
}
bool
ircd::m::events::_rquery_(const event::where &where,
const event_closure_bool &closure)
const
{
event::cursor cursor
{
"event_id",
&where
};
for(auto it(cursor.rbegin()); bool(it); ++it)
if(closure(*it))
return true;
return false;
}
bool
ircd::m::events::_query_(const event::where &where,
const event_closure_bool &closure)
const
{
event::cursor cursor
{
"event_id",
&where
};
for(auto it(cursor.begin()); bool(it); ++it)
if(closure(*it))
return true;
return false;
}
///////////////////////////////////////////////////////////////////////////////
//
// m/event.h
@ -1087,6 +1122,7 @@ std::set<std::shared_ptr<ircd::m::indexer>> ircd::m::indexers
std::make_shared<ircd::m::indexer::concat>("event_id", "sender"),
std::make_shared<ircd::m::indexer::concat>("event_id", "room_id"),
std::make_shared<ircd::m::indexer::concat_v>("event_id", "room_id", "type"),
std::make_shared<ircd::m::indexer::concat_v>("event_id", "room_id", "sender"),
std::make_shared<ircd::m::indexer::concat_2v>("event_id", "type", "state_key", "room_id"),
}};

View file

@ -106,11 +106,6 @@ noexcept
namespace ircd {
const m::room::events accounts
{
m::id::room{"!accounts:cdc.z"}
};
static void
authenticate(client &client,
resource::method &method,
@ -122,7 +117,12 @@ try
request.query.at("access_token")
};
// Sets up the query to find the access_token in the accounts room
static const m::room::state sessions
{
m::id::room{"!sessions:cdc.z"}
};
// Sets up the query to find the access_token in the sessions rooms
const m::event::where::equal query
{
{ "type", "ircd.access_token" },
@ -131,8 +131,21 @@ try
const bool result
{
accounts.query(query, [&request, &access_token](const m::event &event)
sessions.test(query, [&request, &access_token](const m::event &event)
{
// Checks if the access token has expired. Tokens are expired when
// an m.room.redaction event is issued for the ircd.access_token
// event. Instead of making another query here for the redaction
// we expect the original event to be updated with the following
// key which must be part of the redaction process.
const json::object &unsigned_
{
json::val<m::name::unsigned_>(event)
};
if(unsigned_.has("redacted_because"))
return false;
assert(at<m::name::state_key>(event) == access_token);
request.user_id = at<m::name::sender>(event);
return true;

View file

@ -103,10 +103,10 @@ post_login_password(client &client,
rand::string(token_dict, token_len, token_buf, sizeof(token_buf))
};
// Log the user in by issuing an event in the accounts room containing
// Log the user in by issuing an event in the sessions room containing
// the generated token. When this call completes without throwing the
// access_token will be committed and the user will be logged in.
m::user::accounts.send(
m::user::sessions.send(
{
{ "type", "ircd.access_token" },
{ "sender", user_id },

View file

@ -57,6 +57,12 @@ get_state(client &client,
state.count(query)
};
if(!count)
throw m::NOT_FOUND
{
"No state."
};
size_t j(0);
json::value ret[count];
state.for_each(query, [&count, &j, &ret]
@ -209,3 +215,94 @@ resource::method method_get
{
rooms_resource, "GET", get_rooms
};
resource::response
put_send(client &client,
const resource::request &request,
const string_view &params,
const m::room::id &room_id)
{
string_view token[4];
tokens(params, "/", token);
const string_view &type
{
token[2]
};
const string_view &txnid
{
token[3]
};
json::iov event;
const json::iov::push _type
{
event, "type", type
};
const json::iov::push _sender
{
event, "sender", request.user_id
};
const json::iov::push _content
{
event, "content", json::object{request}
};
m::room room
{
room_id
};
const auto event_id
{
room.send(event)
};
return resource::response
{
client, json::members
{
{ "event_id", event_id }
}
};
}
resource::response
put_rooms(client &client, const resource::request &request)
{
const auto params
{
lstrip(request.head.path, room::base_url)
};
string_view token[2];
if(tokens(params, "/", token) != 2)
throw http::error(http::code::NOT_FOUND, "/rooms command required");
const m::room::id &room_id
{
token[0]
};
const string_view &cmd
{
token[1]
};
if(cmd == "send")
return put_send(client, request, params, room_id);
throw http::error(http::code::NOT_FOUND, "/rooms command not found");
}
resource::method method_put
{
rooms_resource, "PUT", put_rooms,
{
method_put.REQUIRES_AUTH
}
};

View file

@ -21,84 +21,221 @@
using namespace ircd;
const auto sync_description
{R"(
6.2.
Synchronise the client's state with the latest state on the server. Clients
use this API when they first log in to get an initial snapshot of the state
on the server, and then continue to call this API to get incremental deltas
to the state, and to receive new messages.
)"};
resource sync_resource
{
"_matrix/client/r0/sync",
R"(
6.2. Synchronise the client's state with the latest state on the server.
Clients use this API when they first log in to get an initial snapshot of
the state on the server, and then continue to call this API to get
incremental deltas to the state, and to receive new messages.
)"
sync_description
};
struct polldata
struct syncpoll
{
static std::multimap<std::string, syncpoll> polling;
static std::multimap<steady_point, decltype(polling)::iterator> pollout;
std::weak_ptr<ircd::client> client;
steady_point timeout;
decltype(pollout)::iterator it { std::end(pollout) };
};
std::deque<polldata>
polling
{};
decltype(syncpoll::polling) syncpoll::polling {};
decltype(syncpoll::pollout) syncpoll::pollout {};
ircd::ctx::dock
polldock
{};
void longpoll(client &client, const resource::request &request, const steady_point &timeout);
void synchronizer_worker();
ircd::context synchronizer_context
{
"synchronizer",
256_KiB,
&synchronizer_worker,
ircd::context::POST,
};
void synchronizer_timeout_worker();
ircd::context synchronizer_timeout_context
{
"synchronizer",
256_KiB,
&synchronizer_timeout_worker,
ircd::context::POST,
};
const auto on_unload{[]
{
synchronizer_context.interrupt();
synchronizer_timeout_context.interrupt();
synchronizer_context.join();
synchronizer_timeout_context.join();
}};
mapi::header IRCD_MODULE
{
"registers the resource 'client/sync' to handle requests.",
nullptr,
on_unload
};
resource::response
sync_now(client &client,
const resource::request &request,
const string_view &filter_id,
const bool &full_state,
const string_view &set_presence)
{
json::value events[1];
const json::members timeline
{
{ "events", { events, 1 } }
};
const json::members state
{
{ "events", { events, 1 } }
};
const json::members join
{
{ "timeline", timeline },
{ "state", state },
};
const json::object leave{};
const json::object invite{};
const json::members rooms
{
{ "leave", leave },
{ "join", join },
{ "invite", invite },
};
const string_view next_batch{};
const json::object presence{};
const m::event::id head_event_id
{
"$12382382:cdc.z"
};
const json::members content
{
{ "event_id", head_event_id }
};
m::user::sessions.send(
{
{ "type", "ircd.tape.head" },
{ "state_key", request.query.at("access_token") },
{ "sender", request.user_id },
{ "content", content },
});
return resource::response
{
client, json::members
{
{ "next_batch", next_batch },
{ "rooms", rooms },
{ "presence", presence }
}
};
}
resource::response
sync(client &client, const resource::request &request)
{
const auto filter
// 6.2.1 The ID of a filter created using the filter API or a filter JSON object
// encoded as a string. The server will detect whether it is an ID or a JSON object
// by whether the first character is a "{" open brace. Passing the JSON inline is best
// suited to one off requests. Creating a filter using the filter API is recommended
// for clients that reuse the same filter multiple times, for example in long poll requests.
const auto filter_id
{
// 6.2.1 The ID of a filter created using the filter API or a filter JSON object
// encoded as a string. The server will detect whether it is an ID or a JSON object
// by whether the first character is a "{" open brace. Passing the JSON inline is best
// suited to one off requests. Creating a filter using the filter API is recommended
// for clients that reuse the same filter multiple times, for example in long poll requests.
request["filter"]
request.query["filter"]
};
// 6.2.1 A point in time to continue a sync from.
const auto since
{
// 6.2.1 A point in time to continue a sync from.
request["since"]
request.query["since"]
};
const auto full_state
// 6.2.1 Controls whether to include the full state for all rooms the user is a member of.
// If this is set to true, then all state events will be returned, even if since is non-empty.
// The timeline will still be limited by the since parameter. In this case, the timeout
// parameter will be ignored and the query will return immediately, possibly with an
// empty timeline. If false, and since is non-empty, only state which has changed since
// the point indicated by since will be returned. By default, this is false.
const bool full_state
{
// 6.2.1 Controls whether to include the full state for all rooms the user is a member of.
// If this is set to true, then all state events will be returned, even if since is non-empty.
// The timeline will still be limited by the since parameter. In this case, the timeout
// parameter will be ignored and the query will return immediately, possibly with an
// empty timeline. If false, and since is non-empty, only state which has changed since
// the point indicated by since will be returned. By default, this is false.
request.get<bool>("full_state", false)
request.query["full_state"] == "true"
};
const auto set_presence
// 6.2.1 Controls whether the client is automatically marked as online by polling this API.
// If this parameter is omitted then the client is automatically marked as online when it
// uses this API. Otherwise if the parameter is set to "offline" then the client is not
// marked as being online when it uses this API. One of: ["offline"]
const string_view set_presence
{
// 6.2.1 Controls whether the client is automatically marked as online by polling this API.
// If this parameter is omitted then the client is automatically marked as online when it
// uses this API. Otherwise if the parameter is set to "offline" then the client is not
// marked as being online when it uses this API. One of: ["offline"]
request.get("set_presence", "offline")
request.query["set_presence"]
};
const milliseconds timeout
if(!since)
return sync_now(client, request, filter_id, full_state, set_presence);
// The !sessions:your.host room is where the ircd.tape.head event holds
// the state we use to calculate the last event the user has seen.
const m::room::state sessions
{
// 6.2.1 The maximum time to poll in milliseconds before returning this request.
request.get<time_t>("timeout", 30 * 1000)
m::user::sessions
};
// A reference to the client is saved. We save a weak reference to still
// allow the client to disconnect.
polling.emplace_back(polldata
// The ircd.tape.head
const m::event::where::equal query
{
weak_from(client),
now<steady_point>() + timeout
});
{ "type", "ircd.tape.head" },
{ "state_key", request.query.at("access_token") },
};
m::event::id::buf head;
if(!sessions.test(query, [&head](const auto &event)
{
const json::object &content
{
at<m::name::content>(event)
};
head = unquote(content.at("event_id"));
return true;
}))
throw m::NOT_FOUND{"since parameter invalid"};
// 6.2.1 The maximum time to poll in milliseconds before returning this request.
const auto timeout
{
request.query["timeout"]
};
const auto timeout_at
{
now<steady_point>() + milliseconds(std::max(timeout? lex_cast<int64_t>(timeout) : 30000L, 1000L))
};
longpoll(client, request, timeout_at);
// This handler returns no response. As long as this handler doesn't throw
// an exception IRCd will keep the client alive.
@ -113,71 +250,108 @@ resource::method get_sync
}
};
void worker();
ircd::context synchronizer_context
{
"synchronizer",
1_MiB,
&worker,
ircd::context::POST,
};
const auto on_unload{[]
{
synchronizer_context.interrupt();
synchronizer_context.join();
}};
mapi::header IRCD_MODULE
{
"registers the resource 'client/sync' to handle requests.",
nullptr,
on_unload
};
/// Input
///
///
void
handle_event(const m::event &event,
const polldata &request)
longpoll(client &client,
const resource::request &request,
const steady_point &timeout)
{
const auto it
{
syncpoll::polling.emplace(request.user_id, syncpoll{weak_from(client)})
};
syncpoll &data
{
it->second
};
data.it = syncpoll::pollout.emplace(timeout, it);
if(syncpoll::pollout.size() == 1)
notify(synchronizer_timeout_context);
}
//
// Timeout worker stack
//
void synchronizer_timeout(const std::string &user_id, const syncpoll &sp);
/// This function is the base of an ircd::context which yields until a client
/// is due to timeout. This worker reaps timed out clients from the lists.
void
synchronizer_timeout_worker()
try
{
static auto &polling{syncpoll::polling};
static auto &pollout{syncpoll::pollout};
while(1)
{
while(!pollout.empty())
{
const auto &timeout{std::begin(pollout)->first};
const auto &iterator{std::begin(pollout)->second};
if(timeout > now<steady_point>())
{
ctx::wait_until<std::nothrow_t>(timeout);
continue;
}
const auto &user_id{iterator->first};
const auto &data{iterator->second};
synchronizer_timeout(user_id, data);
polling.erase(iterator);
pollout.erase(std::begin(pollout));
}
while(pollout.empty())
ctx::wait();
}
}
catch(const ircd::ctx::interrupted &e)
{
ircd::log::debug("Synchronizer timeout worker interrupted");
}
///
/// TODO: The http error response should not yield this context. If the sendq
/// TODO: is backed up the client should be dc'ed.
void
synchronizer_timeout(const std::string &user_id,
const syncpoll &sp)
try
{
const life_guard<client> client
{
request.client
sp.client
};
resource::response
{
*client, json::members
{
{ "event", json::string(event) }
}
*client, http::REQUEST_TIMEOUT
};
}
catch(const std::exception &e)
{
log::error("%s", e.what());
log::error("synchronizer_timeout(): %s", e.what());
}
void
synchronize(const m::event &event)
{
if(polling.empty())
return;
//
// Main worker stack
//
const auto &request
{
polling.front()
};
handle_event(event, request);
polling.pop_front();
}
void synchronize(const m::event &, const m::room::id &);
void synchronize(const m::event &);
void
worker()
synchronizer_worker()
try
{
for(;; ctx::interruption_point())
while(1) try
{
const auto &event
{
@ -186,8 +360,60 @@ try
synchronize(event);
}
catch(const timeout &e)
{
ircd::log::debug("Synchronizer worker timeout");
}
}
catch(const ircd::ctx::interrupted &e)
{
ircd::log::debug("Synchronizer interrupted");
ircd::log::debug("Synchronizer worker interrupted");
}
void
synchronize(const m::event &event)
{
static auto &polling{syncpoll::polling};
static auto &pollout{syncpoll::pollout};
const auto &room_id
{
json::val<m::name::room_id>(event)
};
if(room_id)
{
synchronize(event, room_id);
return;
}
std::cout << event << std::endl;
}
void
synchronize(const m::event &event,
const m::room::id &room_id)
{
std::cout << event << std::endl;
}
bool
handle_event(const m::event &event,
const syncpoll &request)
try
{
const life_guard<const client> client
{
request.client
};
// if(request.timeout < now<steady_point>())
// return false;
return true;
}
catch(const std::exception &e)
{
log::error("%s", e.what());
return false;
}

View file

@ -72,7 +72,7 @@ try
m::filter::filters
};
if(!filters_room_events.any(query, result))
if(!filters_room_events.test(query, result))
throw m::NOT_FOUND("No matching filter with that ID");
// Response already made

View file

@ -399,6 +399,31 @@ const database::descriptor event_id_for_room_id_in_type
room_id_in,
};
const database::descriptor event_id_for_room_id_in_sender
{
// name
"event_id for room_id in sender",
// explanation
R"(### developer note:
)",
// typing (key, value)
{
typeid(string_view), typeid(string_view)
},
// options
{},
// comparator
{},
// prefix transform
room_id_in,
};
/// prefix transform for type,state_key in room_id
///
/// This transform is special for concatenating room_id with type and state_key
@ -460,6 +485,7 @@ const database::description events_description
event_id_in_sender,
event_id_in_room_id,
event_id_for_room_id_in_type,
event_id_for_room_id_in_sender,
event_id_for_type_state_key_in_room_id,
};