0
0
Fork 0
mirror of https://github.com/matrix-construct/construct synced 2024-10-01 05:08:59 +02:00

ircd::db: Develop basic key/value API.

This commit is contained in:
Jason Volk 2016-09-24 21:12:43 -07:00
parent 3083bb8e62
commit acd5593aac
2 changed files with 179 additions and 88 deletions

View file

@ -123,12 +123,27 @@ class handle
public:
using char_closure = std::function<void (const char *, size_t)>;
using string_closure = std::function<void (const std::string &)>;
// Tests if key exists
bool has(const std::string &key, const gopts & = {});
// Perform a get into a closure. This offers a reference to the data with zero-copy.
// Be very, very cognizant of the things you do as you sojourn on this odyssey.
void get(const std::string &key, const char_closure &, const gopts & = {});
// Get data into your buffer. The signed char buffer is null terminated; the unsigned is not.
size_t get(const std::string &key, char *const &buf, const size_t &max, const gopts & = {});
size_t get(const std::string &key, uint8_t *const &buf, const size_t &max, const gopts & = {});
std::string get(const std::string &key, const gopts & = {});
// Write data to the db
void set(const std::string &key, const char *const &buf, const size_t &size, const sopts & = {});
void set(const std::string &key, const uint8_t *const &buf, const size_t &size, const sopts & = {});
void set(const std::string &key, const std::string &value, const sopts & = {});
// Remove data from the db. not_found is never thrown.
void del(const std::string &key, const sopts & = {});
handle(const std::string &name, const opts & = {});
~handle() noexcept;
};

View file

@ -30,12 +30,6 @@ struct log::log log
"db", 'D' // Database subsystem takes SNOMASK +D
};
void throw_on_error(const rocksdb::Status &);
rocksdb::WriteOptions make_opts(const sopts &);
rocksdb::ReadOptions make_opts(const gopts &);
rocksdb::Options make_opts(const opts &);
struct meta
{
std::string name;
@ -69,7 +63,7 @@ rocksdb::WriteOptions make_opts(const sopts &);
rocksdb::ReadOptions make_opts(const gopts &, const bool &iterator = false);
rocksdb::Options make_opts(const opts &);
void query(std::function<void ()>);
void yielding(const std::function<void ()> &);
} // namespace db
} // namespace ircd
@ -140,19 +134,97 @@ noexcept
}
void
db::handle::set(const std::string &key,
const std::string &value,
db::handle::del(const std::string &key,
const sopts &sopts)
{
using rocksdb::Slice;
auto opts(make_opts(sopts));
const Slice k(key.data(), key.size());
const Slice v(value.data(), value.size());
throw_on_error(d->Delete(opts, k));
}
void
db::handle::set(const std::string &key,
const std::string &value,
const sopts &sopts)
{
set(key, value.data(), value.size(), sopts);
}
void
db::handle::set(const std::string &key,
const uint8_t *const &buf,
const size_t &size,
const sopts &sopts)
{
set(key, reinterpret_cast<const char *>(buf), size, sopts);
}
void
db::handle::set(const std::string &key,
const char *const &buf,
const size_t &size,
const sopts &sopts)
{
using rocksdb::Slice;
auto opts(make_opts(sopts));
const Slice k(key.data(), key.size());
const Slice v(buf, size);
throw_on_error(d->Put(opts, k, v));
}
std::string
db::handle::get(const std::string &key,
const gopts &gopts)
{
std::string ret;
const auto copy([&ret]
(const char *const &src, const size_t &size)
{
ret.assign(src, size);
});
get(key, copy, gopts);
return ret;
}
size_t
db::handle::get(const std::string &key,
uint8_t *const &buf,
const size_t &max,
const gopts &gopts)
{
size_t ret(0);
const auto copy([&ret, &buf, &max]
(const char *const &src, const size_t &size)
{
ret = std::min(size, max);
memcpy(buf, src, ret);
});
get(key, copy, gopts);
return ret;
}
size_t
db::handle::get(const std::string &key,
char *const &buf,
const size_t &max,
const gopts &gopts)
{
size_t ret(0);
const auto copy([&ret, &buf, &max]
(const char *const &src, const size_t &size)
{
ret = rb_strlcpy(buf, src, std::min(size, max));
});
get(key, copy, gopts);
return ret;
}
void
db::handle::get(const std::string &key,
const char_closure &func,
@ -163,12 +235,16 @@ db::handle::get(const std::string &key,
auto opts(make_opts(gopts));
const Slice sk(key.data(), key.size());
query([this, &sk, &func, &opts]
yielding([this, &sk, &func, &opts]
{
const std::unique_ptr<Iterator> it(d->NewIterator(opts));
it->Seek(sk);
throw_on_error(it->status());
if(!it->Valid())
throw not_found();
if(it->key().compare(sk) != 0)
throw not_found();
const auto &v(it->value());
func(v.data(), v.size());
@ -186,7 +262,7 @@ db::handle::has(const std::string &key,
bool ret;
auto opts(make_opts(gopts));
const Slice k(key.data(), key.size());
query([this, &k, &ret, &opts]
yielding([this, &k, &ret, &opts]
{
if(!d->KeyMayExist(opts, k, nullptr, nullptr))
{
@ -210,7 +286,7 @@ db::handle::has(const std::string &key,
}
void
db::query(std::function<void ()> func)
db::yielding(const std::function<void ()> &func)
{
std::exception_ptr eptr;
auto &context(ctx::cur());
@ -241,72 +317,6 @@ db::query(std::function<void ()> func)
std::rethrow_exception(eptr);
}
void
db::work::init()
{
assert(!thread);
interruption = false;
thread = new std::thread(&worker);
}
void
db::work::fini()
{
if(!thread)
return;
mutex.lock();
interruption = true;
cond.notify_one();
mutex.unlock();
thread->join();
delete thread;
thread = nullptr;
}
void
db::work::push(closure &&func)
{
const std::lock_guard<decltype(mutex)> lock(mutex);
queue.emplace_back(std::move(func));
cond.notify_one();
}
void
db::work::worker()
noexcept try
{
while(1)
{
const auto func(pop());
func();
}
}
catch(const ctx::interrupted &)
{
return;
}
db::work::closure
db::work::pop()
{
std::unique_lock<decltype(mutex)> lock(mutex);
cond.wait(lock, []
{
if(!queue.empty())
return true;
if(unlikely(interruption))
throw ctx::interrupted();
return false;
});
auto c(std::move(queue.front()));
queue.pop_front();
return std::move(c);
}
rocksdb::Options
db::make_opts(const opts &opts)
{
@ -465,13 +475,6 @@ db::has_opt(const optlist<T> &list,
}
std::string
db::path(const std::string &name)
{
const auto prefix(path::get(path::DB));
return path::build({prefix, name});
}
void
db::throw_on_error(const rocksdb::Status &s)
{
@ -497,3 +500,76 @@ db::throw_on_error(const rocksdb::Status &s)
throw error("Unknown error");
}
}
std::string
db::path(const std::string &name)
{
const auto prefix(path::get(path::DB));
return path::build({prefix, name});
}
void
db::work::init()
{
assert(!thread);
interruption = false;
thread = new std::thread(&worker);
}
void
db::work::fini()
{
if(!thread)
return;
mutex.lock();
interruption = true;
cond.notify_one();
mutex.unlock();
thread->join();
delete thread;
thread = nullptr;
}
void
db::work::push(closure &&func)
{
const std::lock_guard<decltype(mutex)> lock(mutex);
queue.emplace_back(std::move(func));
cond.notify_one();
}
void
db::work::worker()
noexcept try
{
while(1)
{
const auto func(pop());
func();
}
}
catch(const ctx::interrupted &)
{
return;
}
db::work::closure
db::work::pop()
{
std::unique_lock<decltype(mutex)> lock(mutex);
cond.wait(lock, []
{
if(!queue.empty())
return true;
if(unlikely(interruption))
throw ctx::interrupted();
return false;
});
auto c(std::move(queue.front()));
queue.pop_front();
return std::move(c);
}