0
0
Fork 0
mirror of https://github.com/matrix-construct/construct synced 2024-11-28 17:52:54 +01:00

ircd::fs: Add a select()'ish multi-fd yielding device.

This commit is contained in:
Jason Volk 2020-05-08 17:26:46 -07:00
parent 1720aea7e2
commit a0476b8a9c
3 changed files with 113 additions and 0 deletions

View file

@ -53,6 +53,7 @@ namespace ircd::fs
#include "sync.h"
#include "aio.h"
#include "iou.h"
#include "select.h"
#include "stdin.h"
#include "support.h"

19
include/ircd/fs/select.h Normal file
View file

@ -0,0 +1,19 @@
// The Construct
//
// Copyright (C) The Construct Developers, Authors & Contributors
// Copyright (C) 2016-2020 Jason Volk <jason@zemos.net>
//
// Permission to use, copy, modify, and/or distribute this software for any
// purpose with or without fee is hereby granted, provided that the above
// copyright notice and this permission notice is present in all copies. The
// full license for this software is available in the LICENSE file.
#pragma once
#define HAVE_IRCD_FS_SELECT_H
namespace ircd::fs
{
/// Yield ircd::ctx until one of the fd's becomes ready
///
size_t select(const vector_view<const fd> &); // read
}

View file

@ -595,6 +595,99 @@ ircd::fs::stdin::tty::write(const string_view &buf)
return syscall(::write, int(*this), buf.data(), buf.size());
}
///////////////////////////////////////////////////////////////////////////////
//
// fs/select.h
//
size_t
ircd::fs::select(const vector_view<const fd> &fd_)
{
using asio::posix::stream_descriptor;
static ios::descriptor desc
{
"ircd::fs::select"
};
const size_t num(size(fd_));
std::optional<stream_descriptor> _fd[num];
const unwind release{[&_fd]
{
for(auto &fd : _fd)
if(fd)
fd->release();
}};
size_t ret(-1);
ctx::latch latch(num);
const auto callback{[&num, &_fd, &latch, &ret]
(const boost::system::error_code &ec, const auto &fd)
{
// The first successful callback is associated with an input fd
// and its array indice becomes the return value.
if(!ec && ret == size_t(-1))
{
const auto it
{
std::find_if(_fd, _fd + num, [&fd]
(const auto &_fd)
{
return _fd && std::addressof(*_fd) == std::addressof(*fd);
})
};
ret = std::distance(_fd, it);
assert(ret < num);
}
latch.count_down();
}};
for(size_t i(0); i < num; ++i)
{
// Allow a closed descriptor in the vector to be no-op.
if(!fd_[i])
{
latch.count_down();
continue;
}
_fd[i] =
{
ios::get(), int(fd_[i])
};
auto handle
{
std::bind(callback, ph::_1, std::cref(_fd[i]))
};
_fd[i]->async_wait(stream_descriptor::wait_read, ios::handle(desc, std::move(handle)));
}
std::exception_ptr eptr; try
{
latch.wait();
assert(ret < num);
return ret;
}
catch(...)
{
eptr = std::current_exception();
const ctx::exception_handler eh;
const ctx::uninterruptible::nothrow ui;
for(auto &fd : _fd)
fd->cancel();
latch.wait();
assert(eptr);
std::rethrow_exception(eptr);
}
return ret;
}
///////////////////////////////////////////////////////////////////////////////
//
// fs/sync.h