0
0
Fork 0
mirror of https://github.com/matrix-construct/construct synced 2024-06-24 12:58:21 +02:00

ircd:Ⓜ️ Abort the current sorry state of m::io.

This commit is contained in:
Jason Volk 2018-02-09 18:27:56 -08:00
parent 6ff27aa45c
commit 063644feaa
6 changed files with 2 additions and 759 deletions

View file

@ -1,214 +0,0 @@
// Matrix Construct
//
// Copyright (C) Matrix Construct Developers, Authors & Contributors
// Copyright (C) 2016-2018 Jason Volk <jason@zemos.net>
//
// Permission to use, copy, modify, and/or distribute this software for any
// purpose with or without fee is hereby granted, provided that the above
// copyright notice and this permission notice is present in all copies. The
// full license for this software is available in the LICENSE file.
#pragma once
#define HAVE_IRCD_M_IO_H
/// Interface to the matrix protocol IO bus making local and network queries.
///
/// This system is the backplane for the m::vm or anything else that needs to
/// get events, however it can, as best as it can, at a high level using a
/// convenient interface. Users of this interface fill out and maintain a
/// control structure (or several) on their stack and then make calls which may
/// yield their ircd::ctx with report given in the control structure. The
/// default behavior will try to hide all of the boilerplate from the user
/// when it comes to figuring out where to make a query and then verifying the
/// results. The control structure offers the ability to tailor very low level
/// aspects of the request and change behavior if desired.
///
/// For acquisition, this interface provides the means to find an event, or
/// set of events by first querying the local db and caches and then making
/// network queries using the matrix protocol endpoints.
///
/// For transmission, this interface provides the means to send events et al
/// to other servers; no local/database writes will happen here, just network.
///
/// There are several variations of requests to the bus; each reflects the matrix
/// protocol endpoint which is apt to best fulfill the request.
///
/// * fetch event - request for event by ID (/event)
/// * fetch room - request for vector of room events (/backfill)
/// * fetch room state - request for set of state events (/state)
///
/// Unless the control structure specifies otherwise, result data for these
/// requests may be filled entirely locally, remotely, or partially from
/// either.
///
namespace ircd::m::io
{
struct sync;
struct fetch;
struct response;
// Synchronous acquire many requests
size_t acquire(vector_view<event::fetch>);
size_t acquire(vector_view<room::fetch>);
size_t acquire(vector_view<room::state::fetch>);
// Synchronous acquire single request
json::object acquire(event::fetch &);
json::array acquire(room::fetch &);
json::array acquire(room::state::fetch &);
// Synchronous release
size_t release(vector_view<event::sync>);
void release(event::sync &);
// Convenience
json::object get(const event::id &, const mutable_buffer &);
}
namespace ircd::m
{
using io::get;
using io::response;
}
//
// Fetch & Sync base
//
struct ircd::m::io::fetch
{
struct opts;
static const opts defaults;
mutable_buffer buf;
const struct opts *opts {&defaults};
bool local_result {false};
std::exception_ptr error;
};
struct ircd::m::io::fetch::opts
{
net::remote hint;
uint64_t limit {256};
};
struct ircd::m::io::sync
{
struct opts;
static const opts defaults;
const_buffer buf;
const struct opts *opts {&defaults};
std::exception_ptr error;
};
struct ircd::m::io::sync::opts
{
net::remote hint;
};
//
// Event
//
struct ircd::m::event::fetch
:io::fetch
{
// out
event::id event_id;
// in
json::object pdu;
fetch(const event::id &event_id,
const mutable_buffer &buf,
const struct opts *const &opts = &defaults)
:io::fetch{buf, opts}
,event_id{event_id}
{}
fetch() = default;
};
struct ircd::m::event::sync
:io::sync
{
// out
string_view destination;
uint64_t txnid {0};
// in
std::exception_ptr error;
sync(const string_view &destination,
const const_buffer &buf,
const struct opts *const &opts = &defaults)
:io::sync{buf, opts}
,destination{destination}
{}
sync() = default;
};
//
// Room (backfill)
//
struct ircd::m::room::fetch
:io::fetch
{
// out
event::id event_id;
room::id room_id;
// in
json::array pdus;
json::array auth_chain;
fetch(const event::id &event_id,
const room::id &room_id,
const mutable_buffer &buf,
const struct opts *const &opts = &defaults)
:io::fetch{buf, opts}
,event_id{event_id}
,room_id{room_id}
{}
fetch() = default;
};
//
// Room (state)
//
struct ircd::m::room::state::fetch
:io::fetch
{
// out
event::id event_id;
room::id room_id;
// in
json::array pdus;
json::array auth_chain;
fetch(const event::id &event_id,
const room::id &room_id,
const mutable_buffer &buf,
const struct opts *const &opts = &defaults)
:io::fetch{buf, opts}
,event_id{event_id}
,room_id{room_id}
{}
fetch() = default;
};
struct ircd::m::io::response
:json::object
{
response(server::request &, parse::buffer &);
};

View file

@ -53,7 +53,6 @@ namespace ircd
#include "request.h"
#include "session.h"
#include "v1/v1.h"
#include "io.h"
#include "vm.h"
#include "filter.h"
#include "keys.h"

View file

@ -75,7 +75,6 @@ struct ircd::m::room
{
struct state;
struct members;
struct fetch;
using id = m::id::room;
using alias = m::id::room_alias;
@ -186,13 +185,10 @@ struct ircd::m::room::state
json::property<name::m_room_guest_access, event>
>
{
struct fetch;
using super_type::tuple;
state(const json::array &pdus);
state(fetch &);
state(const room::id &, const event::id &, const mutable_buffer &);
state(const room &, const mutable_buffer &);
state() = default;
using super_type::operator=;

View file

@ -80,7 +80,6 @@ libircd_la_SOURCES = \
m/event.cc \
m/id.cc \
m/v1.cc \
m/io.cc \
m/keys.cc \
m/request.cc \
m/room.cc \

View file

@ -1,520 +0,0 @@
// Matrix Construct
//
// Copyright (C) Matrix Construct Developers, Authors & Contributors
// Copyright (C) 2016-2018 Jason Volk <jason@zemos.net>
//
// Permission to use, copy, modify, and/or distribute this software for any
// purpose with or without fee is hereby granted, provided that the above
// copyright notice and this permission notice is present in all copies. The
// full license for this software is available in the LICENSE file.
#include <ircd/m/m.h>
namespace ircd::m::io
{
bool acquire_local(event::fetch &);
bool acquire_local(room::fetch &);
bool acquire_local(room::state::fetch &);
}
///////////////////////////////////////////////////////////////////////////////
//
// m/io.h
//
struct ircd::m::io::fetch::opts
const ircd::m::io::fetch::defaults
{};
struct ircd::m::io::sync::opts
const ircd::m::io::sync::defaults
{};
ircd::json::object
ircd::m::io::get(const id::event &event_id,
const mutable_buffer &buf)
{
v1::event request
{
event_id, buf
};
request.wait();
return json::object
{
request
};
}
void
ircd::m::io::release(event::sync &tab)
{
release({&tab, 1});
if(unlikely(bool(tab.error)))
std::rethrow_exception(tab.error);
}
ircd::json::array
ircd::m::io::acquire(room::state::fetch &tab)
{
acquire({&tab, 1});
if(unlikely(bool(tab.error)))
std::rethrow_exception(tab.error);
return tab.pdus;
}
ircd::json::array
ircd::m::io::acquire(room::fetch &tab)
{
acquire({&tab, 1});
if(unlikely(bool(tab.error)))
std::rethrow_exception(tab.error);
return tab.pdus;
}
ircd::json::object
ircd::m::io::acquire(event::fetch &tab)
{
acquire({&tab, 1});
if(unlikely(bool(tab.error)))
std::rethrow_exception(tab.error);
return tab.pdu;
}
//
// mass release suite
//
//TODO: XXX
namespace ircd::m::io
{
net::remote
destination_remote(const string_view &destination)
{
return net::remote{destination};
}
}
size_t
ircd::m::io::release(vector_view<event::sync> tab)
{
const auto count
{
tab.size()
};
size_t out(0);
std::string url[count];
m::io::request request[count];
struct session session[count];
for(size_t i(0); i < count; ++i) try
{
if(!tab[i].destination || !tab[i].buf)
continue;
url[i] = fmt::snstringf
{
1024, "_matrix/federation/v1/send/%zu/", tab[i].txnid
};
session[i] =
{
tab[i].opts->hint? tab[i].opts->hint : destination_remote(tab[i].destination)
};
request[i] =
{
"PUT", url[i], {}, json::object{tab[i].buf}
};
request[i].destination = tab[i].destination;
request[i](session[i].server);
++out;
}
catch(const std::exception &e)
{
tab[i].error = std::make_exception_ptr(e);
log.warning("sync txn %ld will not succeed: %s",
tab[i].txnid,
e.what());
}
static const size_t rbuf_size{4_KiB};
const unique_buffer<mutable_buffer> rbuf
{
out * rbuf_size
};
size_t in(0), ret(0);
json::object response[count];
for(size_t i(0); i < count; ++i) try
{
if(!tab[i].destination)
continue;
if(bool(tab[i].error))
continue;
const mutable_buffer buf
{
data(rbuf) + (in * rbuf_size), rbuf_size
};
ircd::parse::buffer pb{buf};
response[i] = m::io::response{session[i].server, pb};
ret += !tab[i].error;
++in;
log.debug("%s received txn %ld (size: %zu) error: %d",
string(net::remote{session[i].server}),
tab[i].txnid,
size(tab[i].buf),
bool(tab[i].error));
}
catch(const http::error &e)
{
tab[i].error = std::make_exception_ptr(e);
log.error("request to sync txn %ld failed: %s: %s",
tab[i].txnid,
e.what(),
e.content);
}
catch(const std::exception &e)
{
tab[i].error = std::make_exception_ptr(e);
log.error("request to sync txn %ld failed: %s",
tab[i].txnid,
e.what());
}
return ret;
}
//
// mass acquire suite
//
size_t
ircd::m::io::acquire(vector_view<room::state::fetch> tab)
{
const auto count
{
tab.size()
};
std::string url[count];
std::string query[count];
m::io::request request[count];
struct session session[count];
for(size_t i(0); i < count; ++i) try
{
if(!tab[i].event_id)
continue;
tab[i].local_result = acquire_local(tab[i]);
if(tab[i].local_result)
continue;
static char tmp[768];
url[i] = fmt::snstringf
{
1024, "_matrix/federation/v1/state/%s/", url::encode(tab[i].room_id, tmp)
};
query[i] = fmt::snstringf
{
1024, "event_id=%s", url::encode(tab[i].event_id, tmp)
};
request[i] =
{
"GET", url[i], query[i], json::object{}
};
session[i] =
{
tab[i].opts->hint? tab[i].opts->hint : tab[i].event_id.hostname()
};
request[i].destination = session[i].destination;
request[i](session[i].server);
}
catch(const std::exception &e)
{
tab[i].error = std::make_exception_ptr(e);
log.warning("request for event_id %s in room_id %s will not succeed: %s",
string_view{tab[i].event_id},
string_view{tab[i].room_id},
e.what());
}
size_t ret(0);
json::object response[count];
for(size_t i(0); i < count; ++i) try
{
if(!tab[i].event_id)
continue;
if(tab[i].local_result || bool(tab[i].error))
continue;
ircd::parse::buffer pb{tab[i].buf};
response[i] = m::io::response(session[i].server, pb);
tab[i].auth_chain = response[i]["auth_chain"];
tab[i].pdus = response[i]["pdus"];
//TODO: check event id
//TODO: check hashes
//TODO: check signatures
ret += !tab[i].error;
log.debug("%s sent us event %s in room %s pdus: %zu (size: %zu) error: %d",
string(net::remote{session[i].server}),
string_view{tab[i].event_id},
string_view{tab[i].room_id},
json::array{response[i]["pdus"]}.count(),
string_view{response[i]}.size(),
bool(tab[i].error));
}
catch(const std::exception &e)
{
tab[i].error = std::make_exception_ptr(e);
log.warning("request for event_id %s failed: %s",
string_view{tab[i].event_id},
e.what());
}
return ret;
}
size_t
ircd::m::io::acquire(vector_view<room::fetch> tab)
{
const auto count
{
tab.size()
};
std::string url[count];
std::string query[count];
m::io::request request[count];
struct session session[count];
for(size_t i(0); i < count; ++i) try
{
if(!tab[i].event_id)
continue;
tab[i].local_result = acquire_local(tab[i]);
if(tab[i].local_result)
continue;
static char tmp[768];
url[i] = fmt::snstringf
{
1024, "_matrix/federation/v1/backfill/%s/", url::encode(tab[i].room_id, tmp)
};
query[i] = fmt::snstringf
{
1024, "limit=%zu&v=%s", tab[i].opts->limit, url::encode(tab[i].event_id, tmp)
};
session[i] =
{
tab[i].opts->hint? tab[i].opts->hint : tab[i].event_id.hostname()
};
request[i] =
{
"GET", url[i], query[i], {}
};
request[i].destination = session[i].destination;
request[i](session[i].server);
}
catch(const std::exception &e)
{
tab[i].error = std::make_exception_ptr(e);
log.warning("request for event_id %s in room_id %s will not succeed: %s",
string_view{tab[i].event_id},
string_view{tab[i].room_id},
e.what());
}
size_t ret(0);
json::object response[count];
for(size_t i(0); i < count; ++i) try
{
if(!tab[i].event_id)
continue;
if(tab[i].local_result || bool(tab[i].error))
continue;
ircd::parse::buffer pb{tab[i].buf};
response[i] = m::io::response(session[i].server, pb);
tab[i].auth_chain = response[i]["auth_chain"];
tab[i].pdus = response[i]["pdus"];
//TODO: check event id
//TODO: check hashes
//TODO: check signatures
ret += !tab[i].error;
log.debug("%s sent us event %s in room %s pdus: %zu (size: %zu) error: %d",
string(net::remote{session[i].server}),
string_view{tab[i].event_id},
string_view{tab[i].room_id},
json::array{response[i]["pdus"]}.count(),
string_view{response[i]}.size(),
bool(tab[i].error));
}
catch(const std::exception &e)
{
tab[i].error = std::make_exception_ptr(e);
log.warning("request for event_id %s failed: %s",
string_view{tab[i].event_id},
e.what());
}
return ret;
}
size_t
ircd::m::io::acquire(vector_view<event::fetch> tab)
{
const auto count
{
tab.size()
};
std::string url[count];
m::io::request request[count];
struct session session[count];
for(size_t i(0); i < count; ++i) try
{
if(!tab[i].event_id)
continue;
tab[i].local_result = acquire_local(tab[i]);
if(tab[i].local_result)
continue;
static char tmp[768];
url[i] = fmt::snstringf
{
1024, "_matrix/federation/v1/event/%s/", url::encode(tab[i].event_id, tmp)
};
session[i] =
{
tab[i].opts->hint? tab[i].opts->hint : tab[i].event_id.hostname()
};
request[i] =
{
"GET", url[i], {}, {}
};
request[i].destination = session[i].destination;
request[i](session[i].server);
}
catch(const std::exception &e)
{
tab[i].error = std::make_exception_ptr(e);
log.warning("request for event_id %s will not succeed: %s",
string_view{tab[i].event_id},
e.what());
}
size_t ret(0);
json::object response[count];
for(size_t i(0); i < count; ++i) try
{
if(!tab[i].event_id)
continue;
if(tab[i].local_result || bool(tab[i].error))
continue;
ircd::parse::buffer pb{tab[i].buf};
response[i] = m::io::response{session[i].server, pb};
tab[i].pdu = json::array{response[i]["pdus"]}[0];
//TODO: check event id
//TODO: check hashes
//TODO: check signatures
ret += !tab[i].error;
log.debug("%s sent us event %s pdus: %zu (size: %zu) error: %d",
string(net::remote{session[i].server}),
string_view{tab[i].event_id},
json::array{response[i]["pdus"]}.count(),
string_view{response[i]}.size(),
bool(tab[i].error));
}
catch(const std::exception &e)
{
tab[i].error = std::make_exception_ptr(e);
log.warning("request for event_id %s failed: %s",
string_view{tab[i].event_id},
e.what());
}
return ret;
}
//
// acquire_local suite.
//
bool
ircd::m::io::acquire_local(room::state::fetch &tab)
try
{
return false;
}
catch(const std::exception &e)
{
tab.error = std::make_exception_ptr(e);
return false;
}
bool
ircd::m::io::acquire_local(room::fetch &tab)
try
{
return false;
}
catch(const std::exception &e)
{
tab.error = std::make_exception_ptr(e);
return false;
}
bool
ircd::m::io::acquire_local(event::fetch &tab)
try
{
/*
const m::vm::query<m::vm::where::equal> query
{
{ "event_id", tab.event_id },
};
const auto test{[&tab](const auto &event)
{
tab.pdu = stringify(mutable_buffer{tab.buf}, event);
return true;
}};
return m::vm::test(query, test);
*/
return false;
}
catch(const std::exception &e)
{
tab.error = std::make_exception_ptr(e);
return false;
}

View file

@ -621,26 +621,9 @@ const
// room::state
//
ircd::m::room::state::state(const room::id &room_id,
const event::id &event_id,
const mutable_buffer &buf)
ircd::m::room::state::state(const room &room)
{
fetch tab
{
event_id, room_id, buf
};
new (this) state{tab};
}
ircd::m::room::state::state(fetch &tab)
{
io::acquire(tab);
if(bool(tab.error))
std::rethrow_exception(tab.error);
new (this) state{tab.pdus};
}
ircd::m::room::state::state(const json::array &pdus)