0
0
Fork 0
mirror of https://github.com/matrix-construct/construct synced 2024-11-29 10:12:39 +01:00

ircd::fs: Implement synchronous parallel-read interface.

This commit is contained in:
Jason Volk 2020-05-06 19:14:20 -07:00
parent c683365341
commit aaa5e785aa
4 changed files with 147 additions and 0 deletions

View file

@ -13,11 +13,13 @@
namespace ircd::fs namespace ircd::fs
{ {
struct read_op;
struct read_opts extern const read_opts_default; struct read_opts extern const read_opts_default;
// Yields ircd::ctx for read into buffers; returns bytes read // Yields ircd::ctx for read into buffers; returns bytes read
size_t read(const fd &, const mutable_buffers &, const read_opts & = read_opts_default); size_t read(const fd &, const mutable_buffers &, const read_opts & = read_opts_default);
size_t read(const string_view &path, const mutable_buffers &, const read_opts & = read_opts_default); size_t read(const string_view &path, const mutable_buffers &, const read_opts & = read_opts_default);
size_t read(const vector_view<read_op> &); // parallel read
// Yields ircd::ctx for read into buffer; returns view of read portion. // Yields ircd::ctx for read into buffer; returns view of read portion.
const_buffer read(const fd &, const mutable_buffer &, const read_opts & = read_opts_default); const_buffer read(const fd &, const mutable_buffer &, const read_opts & = read_opts_default);
@ -56,6 +58,19 @@ struct ircd::fs::read_opts
read_opts(const off_t & = 0); read_opts(const off_t & = 0);
}; };
/// Convenience aggregation for the parallel read() overload.
///
/// The caller fills in the request fields and passes a view of these
/// structures to fs::read(); the result fields are written back for each
/// op when the call returns. Pointers are non-owning; the pointed-to fd
/// and opts must remain valid for the duration of the read() call.
struct ircd::fs::read_op
{
// request (input to read())
const fs::fd *fd {nullptr};        // file to read from; must be non-null
const read_opts *opts {nullptr};   // per-op options; must be non-null
mutable_buffers bufs;              // scatter buffers filled by the read
// result (output from read())
std::exception_ptr eptr;           // set on per-op failure; null on success
size_t ret {0};                    // bytes read for this op; 0 on failure
};
inline inline
ircd::fs::read_opts::read_opts(const off_t &offset) ircd::fs::read_opts::read_opts(const off_t &offset)
:opts{offset, op::READ} :opts{offset, op::READ}

View file

@ -825,6 +825,71 @@ ircd::fs::read(const string_view &path,
return read(fd, bufs, opts); return read(fd, bufs, opts);
} }
// Parallel read entry point: validates all ops, then dispatches the whole
// batch to the AIO backend when available, otherwise falls back to issuing
// each op as an ordinary sequential read. Returns the total bytes read
// across all ops; per-op results and errors are written back into each
// read_op (a failed op records its exception in eptr and contributes 0).
size_t
ircd::fs::read(const vector_view<read_op> &op)
{
// Use IOV_MAX as a sanity value for number of operations here
if(unlikely(op.size() > info::iov_max))
throw error
{
make_error_code(std::errc::invalid_argument),
"Read operation count:%zu exceeds max:%zu",
op.size(),
info::iov_max,
};
// Pre-scan every op: they must all be parallelizable for the AIO path.
// The |= / &= folds are currently disabled (asserted instead) while the
// interface requires aio=true and all=false on every op.
bool aio {true}, all {false};
for(size_t i(0); i < op.size(); ++i)
{
assert(op[i].opts);
assert(op[i].opts->aio);
// If any op isn't tolerant of less bytes actually read than they
// requested, they require us to perform the unix read loop, and
// that ruins things for everybody!
assert(!op[i].opts->all);
//all |= op[i].opts->all;
// If any op doesn't want AIO we have to fallback on sequential
// blocking reads for all ops.
assert(op[i].opts->aio);
//aio &= op[i].opts->aio;
// EINVAL for exceeding this system's IOV_MAX
if(unlikely(op[i].bufs.size() > info::iov_max))
throw error
{
make_error_code(std::errc::invalid_argument),
"op[%zu] :buffer count of %zu exceeds IOV_MAX of %zu",
i,
op[i].bufs.size(),
info::iov_max,
};
}
// Preferred path: submit the whole batch concurrently via AIO. Only
// taken when the AIO system is initialized and every op permits it.
#ifdef IRCD_USE_AIO
if(likely(aio::system && aio && !all))
return aio::read(op);
#endif
// Fallback to sequential read operations
// NOTE: the try/catch is the for-loop body, so `i` is in scope in the
// handler; a failed op stores its exception and the loop continues.
size_t ret(0);
for(size_t i(0); i < op.size(); ++i) try
{
assert(op[i].fd);
assert(op[i].opts);
op[i].ret = read(*op[i].fd, op[i].bufs, *op[i].opts);
ret += op[i].ret;
}
catch(const std::system_error &)
{
op[i].eptr = std::current_exception();
op[i].ret = 0;
}
return ret;
}
namespace ircd::fs namespace ircd::fs
{ {
static int flags(const read_opts &opts); static int flags(const read_opts &opts);

View file

@ -228,6 +228,72 @@ ircd::fs::aio::read(const fd &fd,
return bytes; return bytes;
} }
// AIO backend for the parallel read(): builds one kernel request per op
// (with a shared iovec arena sized for all ops), submits every request
// before completing any, then collects results in order. Returns total
// bytes read; per-op errors are recorded in each read_op's eptr.
size_t
ircd::fs::aio::read(const vector_view<read_op> &op)
{
const size_t &num(op.size());
// Total buffer count across all ops; sizes the shared iovec arena.
const size_t numbuf
{
std::accumulate(std::begin(op), std::end(op), 0UL, []
(auto ret, const auto &op)
{
return ret += op.bufs.size();
})
};
assert(num <=info::iov_max); // use as sanity limit on op count.
assert(numbuf <= num * info::iov_max);
// NOTE: variable-length arrays (compiler extension) place the requests
// and the iovec arena on this stack frame; both counts were bounded by
// the asserts above (and by the caller's IOV_MAX checks).
aio::request::read request[num];
struct ::iovec buf[numbuf];
ctx::dock waiter;
// Build each request, carving op[i]'s slice out of the shared iovec
// arena at offset b; all requests notify the same waiter dock.
for(size_t i(0), b(0); i < num; b += op[i].bufs.size(), ++i)
{
assert(op[i].bufs.size() <= info::iov_max);
assert(b + op[i].bufs.size() <= numbuf);
assert(b <= numbuf);
const iovec_view iov
{
buf + b, op[i].bufs.size()
};
assert(op[i].fd);
assert(op[i].opts);
request[i] =
{
waiter,
*op[i].fd,
*op[i].opts,
make_iov(iov, op[i].bufs)
};
}
// Update stats
// scope_count raises cur_reads for the lifetime of this frame and
// restores it on exit (including via exception).
const scope_count cur_reads{stats.cur_reads, ushort(num)};
stats.max_reads = std::max(stats.max_reads, stats.cur_reads);
// Send requests
// Submit all before completing any so the reads proceed in parallel.
for(size_t i(0); i < num; ++i)
request[i].submit();
// Recv results
// The try/catch is the loop body: a failed completion records its
// exception on that op and iteration continues with the next op.
size_t ret(0);
for(size_t i(0); i < num; ++i) try
{
op[i].ret = request[i].complete();
assert(op[i].ret == buffers::size(op[i].bufs) || !op[i].opts->blocking);
ret += op[i].ret;
stats.bytes_read += op[i].ret;
stats.reads++;
}
catch(const std::system_error &)
{
op[i].eptr = std::current_exception();
op[i].ret = 0;
}
return ret;
}
// //
// request::write // request::write
// //

View file

@ -28,6 +28,7 @@ namespace ircd::fs::aio
struct request; struct request;
size_t write(const fd &, const const_iovec_view &, const write_opts &); size_t write(const fd &, const const_iovec_view &, const write_opts &);
size_t read(const vector_view<read_op> &);
size_t read(const fd &, const const_iovec_view &, const read_opts &); size_t read(const fd &, const const_iovec_view &, const read_opts &);
size_t fsync(const fd &, const sync_opts &); size_t fsync(const fd &, const sync_opts &);
} }