0
0
Fork 0
mirror of https://github.com/matrix-construct/construct synced 2024-06-16 17:08:20 +02:00

modules/m_bridge: Add worker stack; push loop.

This commit is contained in:
Jason Volk 2021-02-02 07:52:37 -08:00
parent 5eb500e69f
commit 16898ba19e

View file

@ -8,18 +8,42 @@
// copyright notice and this permission notice is present in all copies. The
// full license for this software is available in the LICENSE file.
namespace ircd::m::bridge
{
static size_t make_txn(const config &, json::stack &, events::range &);
static event::idx worker_handle(const config &, const rfc3986::uri &, const events::range &, window_buffer);
static void worker_loop(const config &, const rfc3986::uri &, const mutable_buffer &);
static void worker(std::string config);
static void handle_event(const event &, vm::eval &);
static void fini();
static void init();
extern conf::item<seconds> timeout;
extern ctx::dock worker_dock;
extern std::vector<context> worker_context;
extern hookfn<vm::eval &> notify_hook;
}
ircd::mapi::header
IRCD_MODULE
{
"Bridges (Application Services)"
"Bridges (Application Services)",
ircd::m::bridge::init,
ircd::m::bridge::fini,
};
namespace ircd::m::bridge
decltype(ircd::m::bridge::timeout)
ircd::m::bridge::timeout
{
static void handle_event(const m::event &, vm::eval &);
{ "name", "ircd.m.bridge.txn.timeout" },
{ "default", 10L },
};
extern hookfn<vm::eval &> notify_hook;
}
decltype(ircd::m::bridge::worker_dock)
ircd::m::bridge::worker_dock;
decltype(ircd::m::bridge::worker_context)
ircd::m::bridge::worker_context;
decltype(ircd::m::bridge::notify_hook)
ircd::m::bridge::notify_hook
@ -30,26 +54,295 @@ ircd::m::bridge::notify_hook
}
};
void
ircd::m::bridge::init()
{
}
void
ircd::m::bridge::fini()
{
for(auto &worker : worker_context)
worker.terminate();
for(auto &worker : worker_context)
worker.join();
}
void
ircd::m::bridge::handle_event(const m::event &event,
vm::eval &eval)
try
{
// Drop internal room traffic
if(eval.room_internal)
return;
// Drop EDU's ???
if(!event.event_id)
return;
worker_dock.notify_all();
}
catch(const ctx::interrupted &)
{
throw;
}
catch(const std::exception &e)
{
log::critical
{
log, "Failed to handle %s notify :%s",
string_view{event.event_id},
e.what(),
};
}
void
ircd::m::bridge::worker(std::string config_)
try
{
const bridge::config config
{
config_
};
const rfc3986::uri uri
{
at<"url"_>(config)
};
const unique_mutable_buffer buf
{
event::MAX_SIZE * 8
};
// Wait for run::level RUN before entering work loop.
run::barrier<ctx::interrupted> {};
worker_loop(config, uri, buf);
}
catch(const ctx::interrupted &)
{
throw;
}
catch(const std::exception &e)
{
log::error
{
log, "Failed to handle for bridgelication services :%s",
log, "Worker failed to initialize :%s",
e.what(),
};
}
void
ircd::m::bridge::worker_loop(const config &config,
const rfc3986::uri &uri,
const mutable_buffer &buf)
try
{
auto since {vm::sequence::retired}; do
{
worker_dock.wait([&since]
{
return since < vm::sequence::retired;
});
const events::range range
{
since, vm::sequence::retired + 1
};
since = worker_handle(config, uri, range, buf);
assert(since >= range.first);
assert(since <= range.second);
// Prevent spin for retrying the same range on handled exception.
if(unlikely(since == range.first))
sleep(15s);
}
while(run::level == run::level::RUN);
}
catch(const ctx::interrupted &)
{
throw;
}
catch(const std::exception &e)
{
log::critical
{
log, "Worker unhandled :%s",
e.what(),
};
}
ircd::m::event::idx
ircd::m::bridge::worker_handle(const config &config,
const rfc3986::uri &uri,
const events::range &range_,
window_buffer buf)
try
{
size_t count {0};
auto range {range_};
buf([&config, &count, &range]
(const mutable_buffer &buf)
{
json::stack out
{
buf
};
count += make_txn(config, out, range);
return out.completed();
});
if(!count)
return range.second;
const json::object content
{
buf.completed()
};
buf = window_buffer
{
buf.remains()
};
// Generate URL for the PUT
char uribuf[448], txnidbuf[64];
const string_view url
{
make_uri(uribuf, config, fmt::sprintf
{
txnidbuf, "transactions/%lu", range.first,
})
};
log::debug
{
log, "[%s] PUT txn:%lu:%lu (%lu:%lu) events:%zu",
json::get<"id"_>(config),
range.first,
range.second,
range_.first,
range_.second,
count,
};
const net::hostport target
{
uri.remote
};
// HTTP request sans
http::request
{
buf,
host(target),
"PUT",
url,
size(string_view(content)),
"application/json; charset=utf-8"_sv,
};
// Outputs from consumed buffer
server::out out;
out.head = buf.completed();
out.content = content;
// Inputs to remaining buffer
server::in in;
in.head = buf.remains();
in.content = in.head;
// Send to bridge
server::request::opts sopts;
server::request req
{
target, std::move(out), std::move(in), &sopts
};
// Recv response
const auto code
{
req.get(seconds(timeout))
};
log::logf
{
log, log::level::DEBUG,
"[%s] %u txn:%lu:%lu (%lu:%lu) events:%zu :%s",
json::get<"id"_>(config),
uint(code),
range.first,
range.second,
range_.first,
range_.second,
count,
http::status(code),
};
return range.second + 1;
}
catch(const ctx::interrupted &)
{
throw;
}
catch(const std::exception &e)
{
log::error
{
log, "worker handle range:%lu:%lu :%s",
range_.first,
range_.second,
e.what(),
};
return range_.first;
}
size_t
ircd::m::bridge::make_txn(const config &config,
json::stack &out,
events::range &range)
{
json::stack::object top
{
out
};
json::stack::array events
{
top, "events"
};
size_t count {0};
m::events::for_each(events::range{range}, [&]
(const event::idx &event_idx, m::event event)
{
if(m::internal(json::get<"room_id"_>(event)))
return true;
m::event::append::opts opts;
opts.event_idx = &event_idx;
opts.query_txnid = false;
opts.query_prev_state = true;
opts.query_redacted = false;
m::event::append
{
events, event
};
range.second = event_idx;
++count;
const bool sufficient_buffer
{
out.remaining() > event::MAX_SIZE + 16_KiB
};
return sufficient_buffer;
});
return count;
}