0
0
Fork 0
mirror of https://github.com/matrix-construct/construct synced 2024-06-30 15:58:20 +02:00

modules/federation: Checkpoint preliminary sender.

This commit is contained in:
Jason Volk 2018-03-14 13:36:13 -07:00
parent 2e9ef15cf5
commit 5a933ee713
4 changed files with 469 additions and 111 deletions

View file

@ -231,12 +231,14 @@ federation_federation_send_la_SOURCES = federation/send.cc
federation_federation_event_la_SOURCES = federation/event.cc
federation_federation_get_missing_events_la_SOURCES = federation/get_missing_events.cc
federation_federation_version_la_SOURCES = federation/version.cc
federation_federation_sender_la_SOURCES = federation/sender.cc
federation_module_LTLIBRARIES = \
federation/federation_send.la \
federation/federation_event.la \
federation/federation_get_missing_events.la \
federation/federation_version.la \
federation/federation_sender.la \
###
###############################################################################

View file

@ -10,33 +10,13 @@
using namespace ircd;
void sender_worker();
ircd::context sender_context
mapi::header
IRCD_MODULE
{
"sender",
256_KiB,
&sender_worker,
ircd::context::POST,
"federation send"
};
const auto on_unload{[]
{
sender_context.interrupt();
sender_context.join();
}};
mapi::header IRCD_MODULE
{
"federation send",
nullptr,
on_unload
};
struct send
:resource
{
using resource::resource;
}
resource
send_resource
{
"/_matrix/federation/v1/send/",
@ -62,6 +42,8 @@ handle_edu(client &client,
vmopts.non_conform.set(m::event::conforms::INVALID_OR_MISSING_EVENT_ID);
vmopts.non_conform.set(m::event::conforms::INVALID_OR_MISSING_ROOM_ID);
vmopts.non_conform.set(m::event::conforms::INVALID_OR_MISSING_SENDER_ID);
vmopts.non_conform.set(m::event::conforms::MISSING_ORIGIN_SIGNATURE);
vmopts.non_conform.set(m::event::conforms::MISSING_SIGNATURES);
vmopts.non_conform.set(m::event::conforms::MISSING_PREV_EVENTS);
vmopts.non_conform.set(m::event::conforms::MISSING_PREV_STATE);
vmopts.non_conform.set(m::event::conforms::DEPTH_ZERO);
@ -171,7 +153,8 @@ handle_put(client &client,
};
}
resource::method method_put
resource::method
method_put
{
send_resource, "PUT", handle_put,
{
@ -179,89 +162,3 @@ resource::method method_put
4_MiB // larger = HTTP 413 //TODO: conf
}
};
//
// Main worker stack
//
void sender_handle(const m::event &, const m::room::id &room_id);
void sender_handle(const m::event &);
void
sender_worker()
{
while(1) try
{
std::unique_lock<decltype(m::vm::inserted)> lock
{
m::vm::inserted
};
// reference to the event on the inserter's stack
const auto &event
{
m::vm::inserted.wait(lock)
};
sender_handle(event);
}
catch(const ircd::ctx::interrupted &e)
{
ircd::log::debug("sender worker interrupted");
return;
}
catch(const timeout &e)
{
ircd::log::debug("sender worker: %s", e.what());
}
catch(const std::exception &e)
{
ircd::log::error("sender worker: %s", e.what());
}
}
void
sender_handle(const m::event &event)
{
const auto &room_id
{
json::get<"room_id"_>(event)
};
if(room_id)
{
sender_handle(event, room_id);
return;
}
assert(0);
}
ssize_t txn_ctr
{
8
};
void
sender_handle(const m::event &event,
const m::room::id &room_id)
try
{
const m::room room
{
room_id
};
const m::event::id &event_id
{
json::get<"event_id"_>(event)
};
}
catch(const http::error &e)
{
throw;
}
catch(const std::exception &e)
{
throw;
}

View file

@ -0,0 +1,373 @@
// Matrix Construct
//
// Copyright (C) Matrix Construct Developers, Authors & Contributors
// Copyright (C) 2016-2018 Jason Volk <jason@zemos.net>
//
// Permission to use, copy, modify, and/or distribute this software for any
// purpose with or without fee is hereby granted, provided that the above
// copyright notice and this permission notice is present in all copies. The
// full license for this software is available in the LICENSE file.
#include "sender.int.h"
std::list<txn> txns;
std::map<std::string, node, std::less<>> nodes;
static void recv_timeout(txn &);
static void recv_timeouts();
static bool recv_handle(txn &, node &);
static void recv();
static void recv_worker();
ctx::dock recv_action;
static void send(const m::event &, const m::room::id &room_id);
static void send(const m::event &);
static void send();
static void send_worker();
context
sender
{
"fedsnd S", 1_MiB, &send_worker, context::POST,
};
context
receiver
{
"fedsnd R", 1_MiB, &recv_worker, context::POST,
};
mapi::header
IRCD_MODULE
{
"federation sender",
nullptr, []
{
sender.interrupt();
receiver.interrupt();
sender.join();
receiver.join();
}
};
void
send_worker()
try
{
while(1)
send();
}
catch(const ircd::ctx::interrupted &e)
{
ircd::log::debug
{
"Sender worker interrupted..."
};
}
void
send()
try
{
std::unique_lock<decltype(m::vm::accept)> lock
{
m::vm::accept
};
// reference to the event on the inserter's stack
const auto &event
{
m::vm::accept.wait(lock)
};
if(my(event))
send(event);
}
catch(const ircd::ctx::interrupted &e)
{
throw;
}
catch(const std::exception &e)
{
ircd::log::error
{
"sender worker: %s", e.what()
};
}
void
send(const m::event &event)
{
const auto &room_id
{
json::get<"room_id"_>(event)
};
if(room_id)
send(event, room_id);
}
void
send(const m::event &event,
const m::room::id &room_id)
{
// Unit is not allocated until we find another server in the room.
std::shared_ptr<struct unit> unit;
const m::room room{room_id};
const m::room::origins origins{room};
origins.for_each([&unit, &event]
(const string_view &origin)
{
if(my_host(origin))
return;
auto it{nodes.lower_bound(origin)};
if(it == end(nodes) || it->first != origin)
it = nodes.emplace_hint(it, origin, origin);
auto &node{it->second};
if(!unit)
unit = std::make_shared<struct unit>(event);
node.push(unit);
});
}
void
node::push(std::shared_ptr<unit> su)
{
q.emplace_back(std::move(su));
flush();
}
bool
node::flush()
try
{
if(q.empty())
return false;
if(curtxn)
return false;
const auto pdus
{
std::count_if(begin(q), end(q), [](const auto &p)
{
return p->type == unit::PDU;
})
};
std::vector<json::value> pdu(pdus);
for(ssize_t i(0); i < pdus; ++i)
pdu.at(i) = string_view{q.at(i)->s};
std::string content
{
m::txn::create({pdu.data(), pdu.size()}, {})
};
m::v1::send::opts opts;
opts.remote = id.host();
opts.sopts = &sopts;
txns.emplace_back(*this, std::move(content), std::move(opts));
q.clear();
recv_action.notify_one();
return true;
}
catch(const std::exception &e)
{
log::error
{
"flush error to %s :%s", string_view{id}, e.what()
};
return false;
}
void
recv_worker()
try
{
while(1)
{
recv_action.wait([]
{
return !txns.empty();
});
recv();
recv_timeouts();
}
}
catch(const ircd::ctx::interrupted &e)
{
log::debug
{
"Receive worker interrupted..."
};
}
void
recv()
try
{
auto next
{
ctx::when_any(begin(txns), end(txns))
};
if(next.wait(seconds(2), std::nothrow) == ctx::future_status::timeout) //TODO: conf
return;
const auto it
{
next.get()
};
if(unlikely(it == end(txns)))
{
assert(0);
return;
}
auto &txn
{
*it
};
assert(txn.node);
auto &node{*txn.node};
const auto ret
{
recv_handle(txn, node)
};
node.curtxn = nullptr;
txns.erase(it);
if(ret)
node.flush();
}
catch(const ctx::interrupted &e)
{
throw;
}
catch(const std::exception &e)
{
ircd::assertion(e);
return;
}
bool
recv_handle(txn &txn,
node &node)
try
{
const auto code
{
txn.get()
};
const json::object obj
{
txn
};
const m::v1::send::response resp
{
obj
};
log::debug
{
"%u %s from %s for %s",
ushort(code),
http::status(code),
string_view{node.id},
txn.txnid
};
resp.for_each_pdu([&txn, &node]
(const m::event::id &event_id, const json::object &error)
{
if(empty(error))
return;
log::error
{
"Error from %s in %s for %s :%s",
string_view{node.id},
txn.txnid,
string_view{event_id},
string_view{error}
};
});
return true;
}
catch(const http::error &e)
{
log::error
{
"%u %s from %s for %s :%s",
ushort(e.code),
http::status(e.code),
string_view{node.id},
txn.txnid,
e.what()
};
return false;
}
catch(const std::exception &e)
{
log::error
{
"Error from %s for %s :%s",
string_view{node.id},
txn.txnid,
e.what()
};
return false;
}
void
recv_timeouts()
{
const auto &now
{
ircd::now<steady_point>()
};
auto it(begin(txns));
while(it != end(txns))
{
auto &txn(*it);
if(txn.timeout + seconds(15) < now) //TODO: conf
{
recv_timeout(txn);
it = txns.erase(it);
}
else ++it;
}
}
void
recv_timeout(txn &txn)
{
assert(txn.node);
auto &node(*txn.node);
log::warning
{
"Timeout to %s for txn %s",
string_view{node.id},
txn.txnid
};
node.curtxn = nullptr;
}

View file

@ -0,0 +1,86 @@
// Matrix Construct
//
// Copyright (C) Matrix Construct Developers, Authors & Contributors
// Copyright (C) 2016-2018 Jason Volk <jason@zemos.net>
//
// Permission to use, copy, modify, and/or distribute this software for any
// purpose with or without fee is hereby granted, provided that the above
// copyright notice and this permission notice is present in all copies. The
// full license for this software is available in the LICENSE file.
//
// Internal header for sender.cc only.
// Do not include.
//
using namespace ircd;
struct txn;
struct node;
struct unit
:std::enable_shared_from_this<unit>
{
enum type { PDU, EDU, FAILURE };
std::string s;
enum type type;
unit(std::string s, const enum type &type)
:s{std::move(s)}
,type{type}
{}
unit(const m::event &event)
:unit{json::strung{event}, PDU}
{}
};
struct txndata
{
std::string content;
string_view txnid;
char txnidbuf[64];
txndata(std::string content)
:content{std::move(content)}
,txnid{m::txn::create_id(txnidbuf, this->content)}
{}
};
struct node
{
std::deque<std::shared_ptr<unit>> q;
m::node::id::buf id;
m::node::room room;
server::request::opts sopts;
txn *curtxn {nullptr};
bool flush();
void push(std::shared_ptr<unit>);
node(const string_view &origin)
:id{"", origin}
,room{id}
{}
};
struct txn
:txndata
,m::v1::send
{
struct node *node;
steady_point timeout;
char headers[8_KiB];
txn(struct node &node,
std::string content,
m::v1::send::opts opts)
:txndata{std::move(content)}
,send{txnid, string_view{this->content}, headers, std::move(opts)}
,node{&node}
,timeout{now<steady_point>()} //TODO: conf
{}
txn() = default;
};