0
0
Fork 0
mirror of https://github.com/matrix-construct/construct synced 2024-11-30 02:32:43 +01:00
construct/ircd/m_io.cc
2017-11-30 11:23:44 -08:00

525 lines
11 KiB
C++

/*
* Copyright (C) 2016 Charybdis Development Team
* Copyright (C) 2016 Jason Volk <jason@zemos.net>
*
* Permission to use, copy, modify, and/or distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
* copyright notice and this permission notice is present in all copies.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
* IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT,
* INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
* SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
* STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
* IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
namespace ircd::m::io
{
	// Local-lookup helpers, one overload per fetch request kind. They are
	// defined at the bottom of this file and are called by the batch
	// acquire() routines before any remote request is built.
	bool acquire_local(room::state::fetch &);
	bool acquire_local(room::fetch &);
	bool acquire_local(event::fetch &);
}
///////////////////////////////////////////////////////////////////////////////
//
// m/io.h
//
/// Definition of the default fetch options object. It is initialized from an
/// empty brace list, so its members take whatever defaults the struct
/// itself declares.
const struct ircd::m::io::fetch::opts
ircd::m::io::fetch::defaults{};
/// Definition of the default sync options object. It is initialized from an
/// empty brace list, so its members take whatever defaults the struct
/// itself declares.
const struct ircd::m::io::sync::opts
ircd::m::io::sync::defaults{};
/// Fetch a single event by ID.
///
/// Builds an event::fetch request from the ID and the caller's buffer and
/// hands it to acquire(event::fetch &); any exception that call raises
/// propagates to the caller.
///
/// @param event_id  ID of the event to obtain
/// @param buf       buffer given to the fetch request
/// @return          whatever acquire() returns for the request
ircd::json::object
ircd::m::io::get(const id::event &event_id,
                 const mutable_buffer &buf)
{
	event::fetch single{event_id, buf};
	return acquire(single);
}
/// Single-request form of release().
///
/// Passes `tab` to the batch overload as a one-element view, then rethrows
/// the exception stored in `tab.error` if one is present afterwards.
void
ircd::m::io::release(event::sync &tab)
{
	// One-element view over the caller's request.
	release({&tab, 1});

	const bool failed(tab.error);
	if(unlikely(failed))
		std::rethrow_exception(tab.error);
}
/// Single-request form of acquire() for a room state fetch.
///
/// Passes `tab` to the batch overload as a one-element view. If an
/// exception is stored in `tab.error` afterwards it is rethrown; otherwise
/// the request's `pdus` member is returned.
ircd::json::array
ircd::m::io::acquire(room::state::fetch &tab)
{
	// One-element view over the caller's request.
	acquire({&tab, 1});

	const bool failed(tab.error);
	if(unlikely(failed))
		std::rethrow_exception(tab.error);

	return tab.pdus;
}
/// Single-request form of acquire() for a room fetch.
///
/// Passes `tab` to the batch overload as a one-element view. If an
/// exception is stored in `tab.error` afterwards it is rethrown; otherwise
/// the request's `pdus` member is returned.
ircd::json::array
ircd::m::io::acquire(room::fetch &tab)
{
	// One-element view over the caller's request.
	acquire({&tab, 1});

	const bool failed(tab.error);
	if(unlikely(failed))
		std::rethrow_exception(tab.error);

	return tab.pdus;
}
/// Single-request form of acquire() for an event fetch.
///
/// Passes `tab` to the batch overload as a one-element view. If an
/// exception is stored in `tab.error` afterwards it is rethrown; otherwise
/// the request's `pdu` member is returned.
ircd::json::object
ircd::m::io::acquire(event::fetch &tab)
{
	// One-element view over the caller's request.
	acquire({&tab, 1});

	const bool failed(tab.error);
	if(unlikely(failed))
		std::rethrow_exception(tab.error);

	return tab.pdu;
}
//
// mass release suite
//
//TODO: XXX
namespace ircd::m::io
{
	/// Counter whose value is formatted into the URL of each outgoing send
	/// request and then incremented. Starts at 1.
	size_t txn_ctr{1};

	/// Construct a net::remote from a destination string.
	net::remote
	destination_remote(const string_view &destination)
	{
		return net::remote
		{
			destination
		};
	}

	/// Read the "event_id" member out of the JSON object held in the sync
	/// request's buffer.
	m::event::id
	event_id(const event::sync &es)
	{
		const json::object &pdu{es.buf};
		return pdu.get("event_id");
	}
}
/// Batch release: send every usable entry of `tab` to its destination and
/// then read back one response per request that was sent.
///
/// Phase 1 issues a `PUT` to `_matrix/federation/v1/send/<txn>` for each
/// entry, where `<txn>` is taken from `txn_ctr`. Phase 2 reads the response
/// for each of those requests into a 4 KiB slice of a shared receive buffer.
///
/// Entries lacking a destination or a buffer are skipped in both phases.
/// A std::exception thrown while handling one entry is stored in that
/// entry's `error` and logged as a warning; the remaining entries are still
/// processed.
///
/// @param tab  view over the sync requests to transmit
/// @return     number of entries whose response was read without throwing
size_t
ircd::m::io::release(vector_view<event::sync> tab)
{
	const auto count
	{
		tab.size()
	};

	// Phase 1: build and send one request per usable entry. `out` counts
	// the requests actually sent and sizes the receive buffer below.
	size_t out(0);
	std::string url[count];
	m::io::request request[count];
	struct session session[count];
	for(size_t i(0); i < count; ++i) try
	{
		if(!tab[i].destination || !tab[i].buf)
			continue;

		url[i] = fmt::snstringf
		{
			1024, "_matrix/federation/v1/send/%zu", txn_ctr++
		};

		// A hint in the options overrides the entry's own destination.
		session[i] =
		{
			tab[i].opts->hint? tab[i].opts->hint : destination_remote(tab[i].destination)
		};

		request[i] =
		{
			"PUT", url[i], {}, json::object{tab[i].buf}
		};

		request[i].destination = tab[i].destination;
		request[i](session[i].server);
		++out;
	}
	catch(const std::exception &e)
	{
		// current_exception() preserves the dynamic type of the exception;
		// make_exception_ptr(e) would copy only the std::exception base.
		tab[i].error = std::current_exception();
		log.warning("sync %s will not succeed: %s",
		            string_view{event_id(tab[i])},
		            e.what());
	}

	// One fixed-size slice of receive buffer per request sent in phase 1.
	static const size_t rbuf_size{4_KiB};
	const unique_buffer<mutable_buffer> rbuf
	{
		out * rbuf_size
	};

	// Phase 2: read the responses. `in` indexes the next unused slice.
	size_t in(0), ret(0);
	json::object response[count];
	for(size_t i(0); i < count; ++i) try
	{
		// Must mirror the phase 1 skip condition: an entry skipped there had
		// no request sent and no slice of rbuf accounted for it.
		if(!tab[i].destination || !tab[i].buf)
			continue;

		if(bool(tab[i].error))
			continue;

		const mutable_buffer buf
		{
			data(rbuf) + (in * rbuf_size), rbuf_size
		};

		ircd::parse::buffer pb{buf};
		response[i] = m::io::response{session[i].server, pb};

		// The error is known to be unset here, so this adds one.
		ret += !tab[i].error;
		++in;

		log.debug("%s received event %s (size: %zu) error: %d",
		          string(net::remote{session[i].server}),
		          string_view{event_id(tab[i])},
		          size(tab[i].buf),
		          bool(tab[i].error));
	}
	catch(const std::exception &e)
	{
		tab[i].error = std::current_exception();
		log.warning("request to sync event_id %s failed: %s",
		            string_view{event_id(tab[i])},
		            e.what());
	}

	return ret;
}
//
// mass acquire suite
//
/// Batch acquire of room state.
///
/// For each entry with an event_id, acquire_local() is tried first. When
/// that reports no local result, a `GET` to
/// `_matrix/federation/v1/state/<room_id>/` with query
/// `event_id=<event_id>` is sent. A second pass then reads each response
/// into the entry's own buffer and stores its "auth_chain" and "pdus"
/// members in the entry.
///
/// A std::exception thrown while handling one entry is stored in that
/// entry's `error` and logged as a warning; the remaining entries are still
/// processed.
///
/// @param tab  view over the state fetch requests
/// @return     number of entries whose remote response was read and parsed
///             without throwing
size_t
ircd::m::io::acquire(vector_view<room::state::fetch> tab)
{
	const auto count
	{
		tab.size()
	};

	// Phase 1: try locally, otherwise build and send the remote request.
	std::string url[count];
	std::string query[count];
	m::io::request request[count];
	struct session session[count];
	for(size_t i(0); i < count; ++i) try
	{
		if(!tab[i].event_id)
			continue;

		tab[i].local_result = acquire_local(tab[i]);
		if(tab[i].local_result)
			continue;

		// Scratch space handed to urlencode(); reused for URL and query.
		static char tmp[768];
		url[i] = fmt::snstringf
		{
			1024, "_matrix/federation/v1/state/%s/", urlencode(tab[i].room_id, tmp)
		};

		query[i] = fmt::snstringf
		{
			1024, "event_id=%s", urlencode(tab[i].event_id, tmp)
		};

		request[i] =
		{
			"GET", url[i], query[i], {}
		};

		// A hint in the options overrides the event ID's hostname.
		session[i] =
		{
			tab[i].opts->hint? tab[i].opts->hint : tab[i].event_id.hostname()
		};

		request[i].destination = session[i].destination;
		request[i](session[i].server);
	}
	catch(const std::exception &e)
	{
		// current_exception() preserves the dynamic type of the exception;
		// make_exception_ptr(e) would copy only the std::exception base.
		tab[i].error = std::current_exception();
		log.warning("request for event_id %s in room_id %s will not succeed: %s",
		            string_view{tab[i].event_id},
		            string_view{tab[i].room_id},
		            e.what());
	}

	// Phase 2: read a response for every request sent in phase 1.
	size_t ret(0);
	json::object response[count];
	for(size_t i(0); i < count; ++i) try
	{
		if(!tab[i].event_id)
			continue;

		// Nothing to read for locally satisfied or already failed entries.
		if(tab[i].local_result || bool(tab[i].error))
			continue;

		ircd::parse::buffer pb{tab[i].buf};
		response[i] = m::io::response(session[i].server, pb);
		tab[i].auth_chain = response[i]["auth_chain"];
		tab[i].pdus = response[i]["pdus"];

		//TODO: check event id
		//TODO: check hashes
		//TODO: check signatures

		// The error is known to be unset here, so this adds one.
		ret += !tab[i].error;

		log.debug("%s sent us event %s in room %s pdus: %zu (size: %zu) error: %d",
		          string(net::remote{session[i].server}),
		          string_view{tab[i].event_id},
		          string_view{tab[i].room_id},
		          json::array{response[i]["pdus"]}.count(),
		          string_view{response[i]}.size(),
		          bool(tab[i].error));
	}
	catch(const std::exception &e)
	{
		tab[i].error = std::current_exception();
		log.warning("request for event_id %s failed: %s",
		            string_view{tab[i].event_id},
		            e.what());
	}

	return ret;
}
/// Batch acquire of room events (backfill).
///
/// For each entry with an event_id, acquire_local() is tried first. When
/// that reports no local result, a `GET` to
/// `_matrix/federation/v1/backfill/<room_id>/` with query
/// `limit=<opts->limit>&v=<event_id>` is sent. A second pass then reads each
/// response into the entry's own buffer and stores its "auth_chain" and
/// "pdus" members in the entry.
///
/// A std::exception thrown while handling one entry is stored in that
/// entry's `error` and logged as a warning; the remaining entries are still
/// processed.
///
/// @param tab  view over the room fetch requests
/// @return     number of entries whose remote response was read and parsed
///             without throwing
size_t
ircd::m::io::acquire(vector_view<room::fetch> tab)
{
	const auto count
	{
		tab.size()
	};

	// Phase 1: try locally, otherwise build and send the remote request.
	std::string url[count];
	std::string query[count];
	m::io::request request[count];
	struct session session[count];
	for(size_t i(0); i < count; ++i) try
	{
		if(!tab[i].event_id)
			continue;

		tab[i].local_result = acquire_local(tab[i]);
		if(tab[i].local_result)
			continue;

		// Scratch space handed to urlencode(); reused for URL and query.
		static char tmp[768];
		url[i] = fmt::snstringf
		{
			1024, "_matrix/federation/v1/backfill/%s/", urlencode(tab[i].room_id, tmp)
		};

		query[i] = fmt::snstringf
		{
			1024, "limit=%zu&v=%s", tab[i].opts->limit, urlencode(tab[i].event_id, tmp)
		};

		// A hint in the options overrides the event ID's hostname.
		session[i] =
		{
			tab[i].opts->hint? tab[i].opts->hint : tab[i].event_id.hostname()
		};

		request[i] =
		{
			"GET", url[i], query[i], {}
		};

		request[i].destination = session[i].destination;
		request[i](session[i].server);
	}
	catch(const std::exception &e)
	{
		// current_exception() preserves the dynamic type of the exception;
		// make_exception_ptr(e) would copy only the std::exception base.
		tab[i].error = std::current_exception();
		log.warning("request for event_id %s in room_id %s will not succeed: %s",
		            string_view{tab[i].event_id},
		            string_view{tab[i].room_id},
		            e.what());
	}

	// Phase 2: read a response for every request sent in phase 1.
	size_t ret(0);
	json::object response[count];
	for(size_t i(0); i < count; ++i) try
	{
		if(!tab[i].event_id)
			continue;

		// Nothing to read for locally satisfied or already failed entries.
		if(tab[i].local_result || bool(tab[i].error))
			continue;

		ircd::parse::buffer pb{tab[i].buf};
		response[i] = m::io::response(session[i].server, pb);
		tab[i].auth_chain = response[i]["auth_chain"];
		tab[i].pdus = response[i]["pdus"];

		//TODO: check event id
		//TODO: check hashes
		//TODO: check signatures

		// The error is known to be unset here, so this adds one.
		ret += !tab[i].error;

		log.debug("%s sent us event %s in room %s pdus: %zu (size: %zu) error: %d",
		          string(net::remote{session[i].server}),
		          string_view{tab[i].event_id},
		          string_view{tab[i].room_id},
		          json::array{response[i]["pdus"]}.count(),
		          string_view{response[i]}.size(),
		          bool(tab[i].error));
	}
	catch(const std::exception &e)
	{
		tab[i].error = std::current_exception();
		log.warning("request for event_id %s failed: %s",
		            string_view{tab[i].event_id},
		            e.what());
	}

	return ret;
}
/// Batch acquire of individual events.
///
/// For each entry with an event_id, acquire_local() is tried first. When
/// that reports no local result, a `GET` to
/// `_matrix/federation/v1/event/<event_id>/` is sent. A second pass then
/// reads each response into the entry's own buffer and stores element 0 of
/// its "pdus" array in the entry's `pdu`.
///
/// A std::exception thrown while handling one entry is stored in that
/// entry's `error` and logged as a warning; the remaining entries are still
/// processed.
///
/// @param tab  view over the event fetch requests
/// @return     number of entries whose remote response was read and parsed
///             without throwing
size_t
ircd::m::io::acquire(vector_view<event::fetch> tab)
{
	const auto count
	{
		tab.size()
	};

	// Phase 1: try locally, otherwise build and send the remote request.
	std::string url[count];
	m::io::request request[count];
	struct session session[count];
	for(size_t i(0); i < count; ++i) try
	{
		if(!tab[i].event_id)
			continue;

		tab[i].local_result = acquire_local(tab[i]);
		if(tab[i].local_result)
			continue;

		// Scratch space handed to urlencode().
		static char tmp[768];
		url[i] = fmt::snstringf
		{
			1024, "_matrix/federation/v1/event/%s/", urlencode(tab[i].event_id, tmp)
		};

		// A hint in the options overrides the event ID's hostname.
		session[i] =
		{
			tab[i].opts->hint? tab[i].opts->hint : tab[i].event_id.hostname()
		};

		request[i] =
		{
			"GET", url[i], {}, {}
		};

		request[i].destination = session[i].destination;
		request[i](session[i].server);
	}
	catch(const std::exception &e)
	{
		// current_exception() preserves the dynamic type of the exception;
		// make_exception_ptr(e) would copy only the std::exception base.
		tab[i].error = std::current_exception();
		log.warning("request for event_id %s will not succeed: %s",
		            string_view{tab[i].event_id},
		            e.what());
	}

	// Phase 2: read a response for every request sent in phase 1.
	size_t ret(0);
	json::object response[count];
	for(size_t i(0); i < count; ++i) try
	{
		if(!tab[i].event_id)
			continue;

		// Nothing to read for locally satisfied or already failed entries.
		if(tab[i].local_result || bool(tab[i].error))
			continue;

		ircd::parse::buffer pb{tab[i].buf};
		response[i] = m::io::response{session[i].server, pb};
		tab[i].pdu = json::array{response[i]["pdus"]}[0];

		//TODO: check event id
		//TODO: check hashes
		//TODO: check signatures

		// The error is known to be unset here, so this adds one.
		ret += !tab[i].error;

		log.debug("%s sent us event %s pdus: %zu (size: %zu) error: %d",
		          string(net::remote{session[i].server}),
		          string_view{tab[i].event_id},
		          json::array{response[i]["pdus"]}.count(),
		          string_view{response[i]}.size(),
		          bool(tab[i].error));
	}
	catch(const std::exception &e)
	{
		tab[i].error = std::current_exception();
		log.warning("request for event_id %s failed: %s",
		            string_view{tab[i].event_id},
		            e.what());
	}

	return ret;
}
//
// acquire_local suite.
//
/// Local lookup for a room state fetch.
///
/// Currently a stub: the body only returns false, so the batch acquire()
/// always goes on to build a remote request. The handler is kept so that a
/// future body which throws records the failure in `tab.error`.
///
/// @param tab  the request; `error` is set if a std::exception is caught
/// @return     false
bool
ircd::m::io::acquire_local(room::state::fetch &tab)
try
{
	return false;
}
catch(const std::exception &)
{
	// current_exception() preserves the dynamic type of the exception;
	// make_exception_ptr() on the caught reference would slice it.
	tab.error = std::current_exception();
	return false;
}
/// Local lookup for a room fetch.
///
/// Currently a stub: the body only returns false, so the batch acquire()
/// always goes on to build a remote request. The handler is kept so that a
/// future body which throws records the failure in `tab.error`.
///
/// @param tab  the request; `error` is set if a std::exception is caught
/// @return     false
bool
ircd::m::io::acquire_local(room::fetch &tab)
try
{
	return false;
}
catch(const std::exception &)
{
	// current_exception() preserves the dynamic type of the exception;
	// make_exception_ptr() on the caught reference would slice it.
	tab.error = std::current_exception();
	return false;
}
/// Local lookup for an event fetch.
///
/// Runs an equality query on "event_id" through m::vm::test(). The closure
/// passed to it serializes the event it is given into `tab.buf`, stores the
/// result in `tab.pdu`, and returns true.
///
/// @param tab  the request; `pdu` is written by the closure, and `error` is
///             set if a std::exception is caught
/// @return     the result of m::vm::test() (presumably true when an event
///             matched -- confirm against m::vm), or false if an exception
///             was caught
bool
ircd::m::io::acquire_local(event::fetch &tab)
try
{
	const m::vm::query<m::vm::where::equal> query
	{
		{ "event_id", tab.event_id },
	};

	const auto test{[&tab](const auto &event)
	{
		tab.pdu = stringify(mutable_buffer{tab.buf}, event);
		return true;
	}};

	return m::vm::test(query, test);
}
catch(const std::exception &)
{
	// current_exception() preserves the dynamic type of the exception;
	// make_exception_ptr() on the caught reference would slice it down to a
	// plain std::exception before acquire() rethrows it to the caller.
	tab.error = std::current_exception();
	return false;
}