0
0
Fork 0
mirror of https://github.com/matrix-construct/construct synced 2025-01-01 02:14:13 +01:00
construct/ircd/db_env.cc

4160 lines
71 KiB
C++

// Matrix Construct
//
// Copyright (C) Matrix Construct Developers, Authors & Contributors
// Copyright (C) 2016-2018 Jason Volk <jason@zemos.net>
//
// Permission to use, copy, modify, and/or distribute this software for any
// purpose with or without fee is hereby granted, provided that the above
// copyright notice and this permission notice is present in all copies. The
// full license for this software is available in the LICENSE file.
#include <RB_INC_FCNTL_H
#include "db.h"
decltype(ircd::db::database::env::log)
ircd::db::database::env::log
{
"db.env"
};
//
// env::env
//
ircd::db::database::env::env(database *const &d)
:d{*d},
st{std::make_unique<state>(d)}
{
}
ircd::db::database::env::~env()
noexcept
{
}
rocksdb::Status
ircd::db::database::env::NewSequentialFile(const std::string &name,
std::unique_ptr<SequentialFile> *const r,
const EnvOptions &options)
noexcept try
{
const ctx::uninterruptible::nothrow ui;
#ifdef RB_DEBUG_DB_ENV
log::debug
{
log, "[%s] new sequential file '%s' options:%p [mm:%b direct:%b bufsz:%zu readahead:%zu]",
d.name,
name,
&options,
options.use_mmap_reads,
options.use_direct_reads,
options.random_access_max_buffer_size,
options.compaction_readahead_size,
};
#endif
*r = std::make_unique<sequential_file>(&d, name, options);
return Status::OK();
}
catch(const std::system_error &e)
{
return error_to_status{e};
}
catch(const std::exception &e)
{
return error_to_status{e};
}
rocksdb::Status
ircd::db::database::env::NewRandomAccessFile(const std::string &name,
std::unique_ptr<RandomAccessFile> *const r,
const EnvOptions &options)
noexcept try
{
const ctx::uninterruptible::nothrow ui;
#ifdef RB_DEBUG_DB_ENV
log::debug
{
log, "[%s] new random access file '%s' options:%p [mm:%b direct:%b bufsz:%zu readahead:%zu]",
d.name,
name,
&options,
options.use_mmap_reads,
options.use_direct_reads,
options.random_access_max_buffer_size,
options.compaction_readahead_size,
};
#endif
*r = std::make_unique<random_access_file>(&d, name, options);
return Status::OK();
}
catch(const std::system_error &e)
{
return error_to_status{e};
}
catch(const std::exception &e)
{
return error_to_status{e};
}
rocksdb::Status
ircd::db::database::env::NewWritableFile(const std::string &name,
std::unique_ptr<WritableFile> *const r,
const EnvOptions &options)
noexcept try
{
const ctx::uninterruptible::nothrow ui;
#ifdef RB_DEBUG_DB_ENV
log::debug
{
log, "[%s] new writable file '%s' options:%p [mm:%b direct:%b rl:%p bufsz:%zu syncsz:%zu]",
d.name,
name,
&options,
options.use_mmap_writes,
options.use_direct_writes,
options.rate_limiter,
options.writable_file_max_buffer_size,
options.bytes_per_sync,
};
#endif
if(options.use_direct_writes)
*r = std::make_unique<writable_file_direct>(&d, name, options, true);
else
*r = std::make_unique<writable_file>(&d, name, options, true);
return Status::OK();
}
catch(const std::system_error &e)
{
return error_to_status{e};
}
catch(const std::exception &e)
{
return error_to_status{e};
}
rocksdb::Status
ircd::db::database::env::ReopenWritableFile(const std::string &name,
std::unique_ptr<WritableFile> *const r,
const EnvOptions &options)
noexcept try
{
const ctx::uninterruptible::nothrow ui;
#ifdef RB_DEBUG_DB_ENV
log::debug
{
log, "[%s] reopen writable file '%s' options:%p",
d.name,
name,
&options
};
#endif
if(options.use_direct_writes)
*r = std::make_unique<writable_file_direct>(&d, name, options, false);
else
*r = std::make_unique<writable_file>(&d, name, options, false);
return Status::OK();
}
catch(const std::system_error &e)
{
return error_to_status{e};
}
catch(const std::exception &e)
{
return error_to_status{e};
}
rocksdb::Status
__attribute__((unused))
ircd::db::database::env::ReuseWritableFile(const std::string &name,
const std::string &old_name,
std::unique_ptr<WritableFile> *const r,
const EnvOptions &options)
noexcept try
{
const ctx::uninterruptible::nothrow ui;
#ifdef RB_DEBUG_DB_ENV
log::debug
{
log, "[%s] reuse writable file '%s' old '%s' options:%p",
d.name,
name,
old_name,
&options
};
#endif
throw ircd::not_implemented
{
"[%s] ReuseWritableFile(name:'%s' old:'%s')",
d.name,
name,
old_name
};
return Status::OK();
//return defaults.ReuseWritableFile(name, old_name, r, options);
}
catch(const std::system_error &e)
{
return error_to_status{e};
}
catch(const std::exception &e)
{
return error_to_status{e};
}
rocksdb::Status
ircd::db::database::env::NewRandomRWFile(const std::string &name,
std::unique_ptr<RandomRWFile> *const result,
const EnvOptions &options)
noexcept try
{
const ctx::uninterruptible::nothrow ui;
#ifdef RB_DEBUG_DB_ENV
log::debug
{
log, "[%s] new random read/write file '%s' options:%p",
d.name,
name,
&options
};
#endif
*result = std::make_unique<random_rw_file>(&d, name, options);
return Status::OK();
}
catch(const std::system_error &e)
{
return error_to_status{e};
}
catch(const std::exception &e)
{
return error_to_status{e};
}
rocksdb::Status
ircd::db::database::env::NewDirectory(const std::string &name,
std::unique_ptr<Directory> *const result)
noexcept try
{
const ctx::uninterruptible::nothrow ui;
#ifdef RB_DEBUG_DB_ENV
log::debug
{
log, "[%s] new directory '%s'",
d.name,
name
};
#endif
std::unique_ptr<Directory> defaults;
const auto ret
{
this->defaults.NewDirectory(name, &defaults)
};
*result = std::make_unique<directory>(&d, name, std::move(defaults));
return ret;
}
catch(const std::system_error &e)
{
return error_to_status{e};
}
catch(const std::exception &e)
{
return error_to_status{e};
}
rocksdb::Status
ircd::db::database::env::FileExists(const std::string &f)
noexcept try
{
const ctx::uninterruptible::nothrow ui;
#ifdef RB_DEBUG_DB_ENV
log::debug
{
log, "[%s] file exists '%s'",
d.name,
f
};
#endif
return defaults.FileExists(f);
}
catch(const std::system_error &e)
{
return error_to_status{e};
}
catch(const std::exception &e)
{
return error_to_status{e};
}
rocksdb::Status
ircd::db::database::env::GetChildren(const std::string &dir,
std::vector<std::string> *const r)
noexcept try
{
const ctx::uninterruptible::nothrow ui;
#ifdef RB_DEBUG_DB_ENV
log::debug
{
log, "[%s] get children of directory '%s'",
d.name,
dir
};
#endif
return defaults.GetChildren(dir, r);
}
catch(const std::system_error &e)
{
return error_to_status{e};
}
catch(const std::exception &e)
{
return error_to_status{e};
}
rocksdb::Status
ircd::db::database::env::GetChildrenFileAttributes(const std::string &dir,
std::vector<FileAttributes> *const result)
noexcept try
{
const ctx::uninterruptible::nothrow ui;
#ifdef RB_DEBUG_DB_ENV
log::debug
{
log, "[%s] get children file attributes of directory '%s'",
d.name,
dir
};
#endif
return defaults.GetChildrenFileAttributes(dir, result);
}
catch(const std::system_error &e)
{
return error_to_status{e};
}
catch(const std::exception &e)
{
return error_to_status{e};
}
rocksdb::Status
ircd::db::database::env::DeleteFile(const std::string &name)
noexcept try
{
const ctx::uninterruptible::nothrow ui;
#ifdef RB_DEBUG_DB_ENV
log::debug
{
log, "[%s] delete file '%s'",
d.name,
name
};
#endif
return defaults.DeleteFile(name);
}
catch(const std::system_error &e)
{
return error_to_status{e};
}
catch(const std::exception &e)
{
return error_to_status{e};
}
rocksdb::Status
ircd::db::database::env::CreateDir(const std::string &name)
noexcept try
{
const ctx::uninterruptible::nothrow ui;
#ifdef RB_DEBUG_DB_ENV
log::debug
{
log, "[%s] create directory '%s'",
d.name,
name
};
#endif
return defaults.CreateDir(name);
}
catch(const std::system_error &e)
{
return error_to_status{e};
}
catch(const std::exception &e)
{
return error_to_status{e};
}
rocksdb::Status
ircd::db::database::env::CreateDirIfMissing(const std::string &name)
noexcept try
{
const ctx::uninterruptible::nothrow ui;
#ifdef RB_DEBUG_DB_ENV
log::debug
{
log, "[%s] create directory if missing '%s'",
d.name,
name
};
#endif
return defaults.CreateDirIfMissing(name);
}
catch(const std::system_error &e)
{
return error_to_status{e};
}
catch(const std::exception &e)
{
return error_to_status{e};
}
rocksdb::Status
ircd::db::database::env::DeleteDir(const std::string &name)
noexcept try
{
const ctx::uninterruptible::nothrow ui;
#ifdef RB_DEBUG_DB_ENV
log::debug
{
log, "[%s] delete directory '%s'",
d.name,
name
};
#endif
return defaults.DeleteDir(name);
}
catch(const std::system_error &e)
{
return error_to_status{e};
}
catch(const std::exception &e)
{
return error_to_status{e};
}
rocksdb::Status
ircd::db::database::env::GetFileSize(const std::string &name,
uint64_t *const s)
noexcept try
{
const ctx::uninterruptible::nothrow ui;
#ifdef RB_DEBUG_DB_ENV
log::debug
{
log, "[%s] get file size '%s'",
d.name,
name
};
#endif
assert(s);
*s = fs::size(name);
return Status::OK();
}
catch(const std::system_error &e)
{
return error_to_status{e};
}
catch(const std::exception &e)
{
return error_to_status{e};
}
rocksdb::Status
ircd::db::database::env::GetFileModificationTime(const std::string &name,
uint64_t *const file_mtime)
noexcept try
{
const ctx::uninterruptible::nothrow ui;
#ifdef RB_DEBUG_DB_ENV
log::debug
{
log, "[%s] get file mtime '%s'",
d.name,
name
};
#endif
return defaults.GetFileModificationTime(name, file_mtime);
}
catch(const std::system_error &e)
{
return error_to_status{e};
}
catch(const std::exception &e)
{
return error_to_status{e};
}
rocksdb::Status
ircd::db::database::env::RenameFile(const std::string &s,
const std::string &t)
noexcept try
{
const ctx::uninterruptible::nothrow ui;
#ifdef RB_DEBUG_DB_ENV
log::debug
{
log, "[%s] rename file '%s' to '%s'",
d.name,
s,
t
};
#endif
return defaults.RenameFile(s, t);
}
catch(const std::system_error &e)
{
return error_to_status{e};
}
catch(const std::exception &e)
{
return error_to_status{e};
}
rocksdb::Status
ircd::db::database::env::LinkFile(const std::string &s,
const std::string &t)
noexcept try
{
const ctx::uninterruptible::nothrow ui;
#ifdef RB_DEBUG_DB_ENV
log::debug
{
log, "[%s] link file '%s' to '%s'",
d.name,
s,
t
};
#endif
return defaults.LinkFile(s, t);
}
catch(const std::system_error &e)
{
return error_to_status{e};
}
catch(const std::exception &e)
{
return error_to_status{e};
}
rocksdb::Status
ircd::db::database::env::LockFile(const std::string &name,
FileLock** l)
noexcept try
{
const ctx::uninterruptible::nothrow ui;
#ifdef RB_DEBUG_DB_ENV
log::debug
{
log, "[%s] lock file '%s'",
d.name,
name
};
#endif
return defaults.LockFile(name, l);
}
catch(const std::system_error &e)
{
return error_to_status{e};
}
catch(const std::exception &e)
{
return error_to_status{e};
}
rocksdb::Status
ircd::db::database::env::UnlockFile(FileLock *const l)
noexcept try
{
const ctx::uninterruptible::nothrow ui;
#ifdef RB_DEBUG_DB_ENV
log::debug
{
log, "[%s] unlock file lock:%p",
d.name,
l
};
#endif
return defaults.UnlockFile(l);
}
catch(const std::system_error &e)
{
return error_to_status{e};
}
catch(const std::exception &e)
{
return error_to_status{e};
}
rocksdb::Status
ircd::db::database::env::GetTestDirectory(std::string *const path)
noexcept try
{
const ctx::uninterruptible::nothrow ui;
return defaults.GetTestDirectory(path);
}
catch(const std::system_error &e)
{
return error_to_status{e};
}
catch(const std::exception &e)
{
return error_to_status{e};
}
rocksdb::Status
ircd::db::database::env::GetAbsolutePath(const std::string &db_path,
std::string *const output_path)
noexcept try
{
const ctx::uninterruptible::nothrow ui;
#ifdef RB_DEBUG_DB_ENV
log::debug
{
log, "[%s] get absolute path from '%s' ret:%p",
d.name,
db_path,
output_path
};
#endif
return defaults.GetAbsolutePath(db_path, output_path);
}
catch(const std::system_error &e)
{
return error_to_status{e};
}
catch(const std::exception &e)
{
return error_to_status{e};
}
rocksdb::Status
ircd::db::database::env::NewLogger(const std::string &name,
std::shared_ptr<Logger> *const result)
noexcept try
{
const ctx::uninterruptible::nothrow ui;
#ifdef RB_DEBUG_DB_ENV
log::debug
{
log, "[%s] new logger '%s' result:%p",
d.name,
name,
(const void *)result
};
#endif
return defaults.NewLogger(name, result);
}
catch(const std::exception &e)
{
return error_to_status{e};
}
rocksdb::Status
ircd::db::database::env::GetHostName(char *const name,
uint64_t len)
noexcept try
{
const ctx::uninterruptible::nothrow ui;
#ifdef RB_DEBUG_DB_ENV
log::debug
{
log, "[%s] get host name name:%p len:%lu",
d.name,
name,
len
};
#endif
return defaults.GetHostName(name, len);
}
catch(const std::exception &e)
{
return error_to_status{e};
}
uint64_t
ircd::db::database::env::NowMicros()
noexcept
{
return ircd::time<microseconds>();
}
/// Bug in rocksdb causes this to not be called all the time, specifically
/// for perf events.
uint64_t
ircd::db::database::env::NowNanos()
noexcept
{
return ircd::now<nanoseconds>().count();
}
rocksdb::Status
ircd::db::database::env::GetCurrentTime(int64_t *const unix_time)
noexcept try
{
return defaults.GetCurrentTime(unix_time);
}
catch(const std::exception &e)
{
return error_to_status{e};
}
std::string
ircd::db::database::env::TimeToString(uint64_t time)
noexcept try
{
return defaults.TimeToString(time);
}
catch(const std::exception &e)
{
throw panic
{
"[%s] time to string :%s",
d.name,
e.what()
};
}
void
ircd::db::database::env::SleepForMicroseconds(int micros)
noexcept try
{
const ctx::uninterruptible::nothrow ui;
#ifdef RB_DEBUG_DB_ENV
log::debug
{
"[%s] sleep for %d microseconds",
d.name,
micros
};
#endif
ctx::sleep(microseconds(micros));
}
catch(const std::exception &e)
{
log::critical
{
log, "[%s] sleep micros:%d :%s",
d.name,
micros,
e.what()
};
}
void
ircd::db::database::env::Schedule(void (*f)(void* arg),
void *const a,
Priority prio,
void *const tag,
void (*u)(void* arg))
noexcept try
{
const ctx::uninterruptible::nothrow ui;
//#ifdef RB_DEBUG_DB_ENV
log::debug
{
log, "[%s] schedule func:%p a:%p tag:%p u:%p prio:%s",
d.name,
f,
a,
tag,
u,
reflect(prio)
};
//#endif
assert(st);
auto &pool
{
*st->pool.at(prio)
};
pool(state::task
{
f, u, a
});
}
catch(const std::exception &e)
{
log::critical
{
log, "[%s] schedule func:%p a:%p tag:%p u:%p prio:%s",
d.name,
f,
a,
tag,
u,
reflect(prio)
};
}
int
ircd::db::database::env::UnSchedule(void *const tag,
const Priority prio)
noexcept try
{
const ctx::uninterruptible::nothrow ui;
//#ifdef RB_DEBUG_DB_ENV
log::debug
{
log, "[%s] unschedule tag:%p prio:%s",
d.name,
tag,
reflect(prio)
};
//#endif
assert(st);
auto &pool
{
*st->pool.at(prio)
};
return pool.cancel(tag);
}
catch(const std::exception &e)
{
log::critical
{
log, "[%s] unschedule tag:%p prio:%s :%s",
d.name,
tag,
reflect(prio),
e.what()
};
return 0;
}
void
ircd::db::database::env::StartThread(void (*f)(void*),
void *const a)
noexcept try
{
const ctx::uninterruptible::nothrow ui;
//#ifdef RB_DEBUG_DB_ENV
log::debug
{
log, "[%s] start thread func:%p a:%p",
d.name,
f,
a
};
//#endif
throw ircd::not_implemented
{
"Independent (non-pool) context spawning not yet implemented"
};
}
catch(const std::exception &e)
{
log::critical
{
log, "[%s] start thread :%s",
d.name,
e.what()
};
}
void
ircd::db::database::env::WaitForJoin()
noexcept try
{
const ctx::uninterruptible::nothrow ui;
#ifdef RB_DEBUG_DB_ENV
log::debug
{
log, "[%s] wait for all ctx to join",
d.name
};
#endif
assert(st);
for(auto &pool : st->pool)
if(pool)
pool->join();
}
catch(const std::exception &e)
{
log::critical
{
log, "[%s] wait for join :%s",
d.name,
e.what()
};
}
unsigned int
ircd::db::database::env::GetThreadPoolQueueLen(Priority prio)
const noexcept try
{
const ctx::uninterruptible::nothrow ui;
#ifdef RB_DEBUG_DB_ENV
log::debug
{
log, "[%s] get thread pool queue len prio:%s",
d.name,
reflect(prio)
};
#endif
assert(st);
const auto &pool
{
*st->pool.at(prio)
};
return pool.tasks.size();
}
catch(const std::exception &e)
{
throw panic
{
"[%s] set background threads :%s",
d.name,
e.what()
};
}
void
ircd::db::database::env::SetBackgroundThreads(int num,
Priority prio)
noexcept try
{
const ctx::uninterruptible::nothrow ui;
#ifdef RB_DEBUG_DB_ENV
log::debug
{
log, "[%s] set background threads prio:%s num:%d",
d.name,
reflect(prio),
num
};
#endif
assert(st);
auto &pool
{
*st->pool.at(prio)
};
pool.p.set(num);
}
catch(const std::exception &e)
{
log::critical
{
log, "[%s] set background threads prio:%s num:%d :%s",
d.name,
reflect(prio),
num,
e.what()
};
}
void
ircd::db::database::env::IncBackgroundThreadsIfNeeded(int num,
Priority prio)
noexcept try
{
const ctx::uninterruptible::nothrow ui;
#ifdef RB_DEBUG_DB_ENV
log::debug
{
log, "[%s] increase background threads num:%d prio:%s",
d.name,
num,
reflect(prio)
};
#endif
assert(st);
auto &pool
{
*st->pool.at(prio)
};
pool.p.add(num);
}
catch(const std::exception &e)
{
log::critical
{
log, "[%s] inc background threads num:%d prio:%s :%s",
d.name,
num,
reflect(prio),
e.what()
};
}
void
ircd::db::database::env::LowerThreadPoolIOPriority(Priority prio)
noexcept try
{
const ctx::uninterruptible::nothrow ui;
#ifdef RB_DEBUG_DB_ENV
log::debug
{
log, "[%s] lower thread pool priority prio:%s",
d.name,
reflect(prio)
};
#endif
assert(st);
auto &pool
{
*st->pool.at(prio)
};
switch(pool.iopri)
{
case IOPriority::IO_HIGH:
pool.iopri = IOPriority::IO_LOW;
break;
default:
break;
}
}
catch(const std::exception &e)
{
log::critical
{
log, "[%s] lower thread pool IO priority pool:%s :%s",
d.name,
reflect(prio),
e.what()
};
}
rocksdb::Status
ircd::db::database::env::GetThreadList(std::vector<ThreadStatus> *const list)
noexcept try
{
const ctx::uninterruptible::nothrow ui;
#ifdef RB_DEBUG_DB_ENV
log::debug
{
log, "[%s] get thread list %p (%zu)",
d.name,
list,
list? list->size() : 0UL
};
#endif
throw ircd::not_implemented
{
"[%s] GetThreadList()", d.name
};
return defaults.GetThreadList(list);
}
catch(const std::exception &e)
{
log::critical
{
log, "[%s] get thread list:%p :%s",
d.name,
list,
e.what()
};
return error_to_status{e};
}
rocksdb::ThreadStatusUpdater *
ircd::db::database::env::GetThreadStatusUpdater()
const noexcept try
{
const ctx::uninterruptible::nothrow ui;
#ifdef RB_DEBUG_DB_ENV
log::debug
{
log, "[%s] get thread status updater",
d.name,
};
#endif
return defaults.GetThreadStatusUpdater();
}
catch(const std::exception &e)
{
log::critical
{
log, "[%s] get thread status updater :%s",
d.name,
e.what()
};
return nullptr;
}
uint64_t
ircd::db::database::env::GetThreadID()
const noexcept try
{
const ctx::uninterruptible::nothrow ui;
#ifdef RB_DEBUG_DB_ENV
log::debug
{
log, "[%s] get thread ID",
d.name,
};
#endif
return ctx::this_ctx::id();
}
catch(const std::exception &e)
{
throw panic
{
"[%s] get thread id :%s",
d.name,
e.what()
};
}
int
ircd::db::database::env::GetBackgroundThreads(Priority prio)
noexcept try
{
const ctx::uninterruptible::nothrow ui;
#ifdef RB_DEBUG_DB_ENV
log::debug
{
log, "[%s] get background threads prio:%s",
d.name,
reflect(prio)
};
#endif
assert(st);
const auto &pool
{
*st->pool.at(prio)
};
return pool.p.size();
}
catch(const std::exception &e)
{
log::critical
{
log, "[%s] get background threads prio:%s :%s",
d.name,
reflect(prio),
e.what()
};
return 0;
}
//
// writable_file
//
ircd::db::database::env::writable_file::writable_file(database *const &d,
const std::string &name,
const EnvOptions &env_opts,
const bool &trunc)
try
:d
{
*d
}
,env_opts
{
env_opts
}
,opts{[this, &trunc]
{
fs::fd::opts ret
{
std::ios::out |
(trunc? std::ios::trunc : std::ios::openmode(0))
};
ret.direct = this->env_opts.use_direct_writes;
ret.cloexec = this->env_opts.set_fd_cloexec;
return ret;
}()}
,fd
{
name, this->opts
}
,preallocation_block_size
{
ircd::info::page_size
}
{
#ifdef RB_DEBUG_DB_ENV
log::debug
{
log, "[%s] opened wfile:%p fd:%d '%s'",
d->name,
this,
int(fd),
name
};
#endif
// Workaround a RocksDB bug which doesn't propagate EnvOptions properly
// on some constructions of WritableFile early on during db open. We'll
// get an env_opts.allow_fallocate==true here while it should be false
// from the DBOptions at d->opts. We use &= so it's not set to true when
// the caller specifically wants it false just for them.
assert(d && d->opts);
this->env_opts.allow_fallocate &= d->opts->allow_fallocate;
//assert(env_opts.allow_fallocate == d->opts->allow_fallocate);
}
catch(const std::exception &e)
{
log::error
{
log, "[%s] opening wfile:%p `%s' :%s",
d->name,
this,
name,
e.what()
};
}
ircd::db::database::env::writable_file::~writable_file()
noexcept
{
Close();
#ifdef RB_DEBUG_DB_ENV
log::debug
{
log, "[%s] closed wfile:%p fd:%d",
d.name,
this,
int(fd)
};
#endif
}
rocksdb::Status
ircd::db::database::env::writable_file::Close()
noexcept try
{
const ctx::uninterruptible::nothrow ui;
const std::lock_guard lock{mutex};
if(!fd)
return Status::OK();
#ifdef RB_DEBUG_DB_ENV
log::debug
{
log, "[%s] wfile:%p fd:%d close",
d.name,
this,
int(fd)
};
#endif
fd = fs::fd{};
return Status::OK();
}
catch(const std::system_error &e)
{
log::error
{
log, "[%s] wfile:%p close :%s",
d.name,
this,
e.what()
};
return error_to_status{e};
}
catch(const std::exception &e)
{
log::error
{
log, "[%s] wfile:%p close :%s",
d.name,
this,
e.what()
};
return error_to_status{e};
}
rocksdb::Status
ircd::db::database::env::writable_file::Flush()
noexcept try
{
const ctx::uninterruptible::nothrow ui;
const std::lock_guard lock{mutex};
#ifdef RB_DEBUG_DB_ENV
log::debug
{
log, "[%s] wfile:%p fd:%d flush",
d.name,
this,
int(fd),
};
#endif
fs::sync_opts opts;
opts.metadata = false;
fs::flush(fd, opts);
return Status::OK();
}
catch(const std::system_error &e)
{
log::error
{
log, "[%s] wfile:%p fd:%d flush :%s",
d.name,
this,
int(fd),
e.what()
};
return error_to_status{e};
}
catch(const std::exception &e)
{
log::error
{
log, "[%s] wfile:%p fd:%d flush :%s",
d.name,
this,
int(fd),
e.what()
};
return error_to_status{e};
}
rocksdb::Status
ircd::db::database::env::writable_file::Sync()
noexcept try
{
const ctx::uninterruptible::nothrow ui;
const std::lock_guard lock{mutex};
#ifdef RB_DEBUG_DB_ENV
log::debug
{
log, "[%s] wfile:%p fd:%d sync",
d.name,
this,
int(fd),
};
#endif
fs::sync_opts opts;
fs::sync(fd, opts);
return Status::OK();
}
catch(const std::system_error &e)
{
log::error
{
log, "[%s] wfile:%p fd:%d sync :%s",
d.name,
this,
int(fd),
e.what(),
};
return error_to_status{e};
}
catch(const std::exception &e)
{
log::error
{
log, "[%s] wfile:%p fd:%d sync :%s",
d.name,
this,
int(fd),
e.what(),
};
return error_to_status{e};
}
rocksdb::Status
ircd::db::database::env::writable_file::Fsync()
noexcept try
{
const ctx::uninterruptible::nothrow ui;
const std::lock_guard lock{mutex};
#ifdef RB_DEBUG_DB_ENV
log::debug
{
log, "[%s] wfile:%p fd:%d fsync",
d.name,
this,
int(fd),
};
#endif
fs::sync_opts opts;
fs::flush(fd, opts);
return Status::OK();
}
catch(const std::system_error &e)
{
log::error
{
log, "[%s] wfile:%p fd:%d fsync :%s",
d.name,
this,
int(fd),
e.what(),
};
return error_to_status{e};
}
catch(const std::exception &e)
{
log::error
{
log, "[%s] wfile:%p fd:%d fsync :%s",
d.name,
this,
int(fd),
e.what(),
};
return error_to_status{e};
}
rocksdb::Status
ircd::db::database::env::writable_file::RangeSync(uint64_t offset,
uint64_t length)
noexcept try
{
const ctx::uninterruptible::nothrow ui;
const std::lock_guard lock{mutex};
#ifdef RB_DEBUG_DB_ENV
log::debug
{
"[%s] wfile:%p fd:%d range sync offset:%lu length:%lu",
d.name,
this,
int(fd),
offset,
length
};
#endif
// RocksDB sez they want us to initiate flushing of dirty pages
// asynchronously without waiting for completion. RocksDB allows
// this callback to be a no-op and do nothing at all.
//
// We plug this into a "range flush" gimmick in ircd::fs which almost
// certainly calls fdatasync() and ignores the range; it may one day
// on supporting platforms and in certain circumstances call
// sync_file_range() without any of the wait flags and respect the range.
fs::sync_opts opts;
opts.metadata = false;
fs::flush(fd, offset, length, opts);
return Status::OK();
}
catch(const std::system_error &e)
{
log::error
{
log, "[%s] wfile:%p fd:%d range sync offset:%zu length:%zu :%s",
d.name,
this,
int(fd),
offset,
length,
e.what()
};
return error_to_status{e};
}
catch(const std::exception &e)
{
log::critical
{
log, "[%s] wfile:%p fd:%d range sync offset:%zu length:%zu :%s",
d.name,
this,
int(fd),
offset,
length,
e.what()
};
return error_to_status{e};
}
rocksdb::Status
ircd::db::database::env::writable_file::Truncate(uint64_t size)
noexcept try
{
const ctx::uninterruptible::nothrow ui;
const std::lock_guard lock{mutex};
#ifdef RB_DEBUG_DB_ENV
log::debug
{
"[%s] wfile:%p fd:%d truncate to %lu bytes",
d.name,
this,
int(fd),
size
};
#endif
fs::write_opts wopts;
wopts.priority = this->prio_val;
wopts.nodelay = this->nodelay;
fs::truncate(fd, size, wopts);
return Status::OK();
}
catch(const std::system_error &e)
{
log::error
{
log, "[%s] wfile:%p fd:%d truncate to %lu bytes :%s",
d.name,
this,
int(fd),
size,
e.what()
};
return error_to_status{e};
}
catch(const std::exception &e)
{
log::critical
{
log, "[%s] wfile:%p fd:%d truncate to %lu bytes :%s",
d.name,
this,
int(fd),
size,
e.what()
};
return error_to_status{e};
}
rocksdb::Status
ircd::db::database::env::writable_file::InvalidateCache(size_t offset,
size_t length)
noexcept try
{
const ctx::uninterruptible::nothrow ui;
const std::lock_guard lock{mutex};
#ifdef RB_DEBUG_DB_ENV
log::debug
{
log, "[%s] wfile:%p fd:%d invalidate cache offset:%zu length:%zu",
d.name,
this,
int(fd),
offset,
length
};
#endif
if(opts.direct)
return Status::OK();
fs::evict(fd, length, offset);
return Status::OK();
}
catch(const std::system_error &e)
{
log::error
{
log, "[%s] wfile:%p fd:%d invalidate cache offset:%zu length:%zu",
d.name,
this,
int(fd),
offset,
length
};
return error_to_status{e};
}
catch(const std::exception &e)
{
log::critical
{
log, "[%s] wfile:%p fd:%d invalidate cache offset:%zu length:%zu",
d.name,
this,
int(fd),
offset,
length
};
return error_to_status{e};
}
rocksdb::Status
ircd::db::database::env::writable_file::Append(const Slice &s)
noexcept try
{
assert(!opts.direct);
const ctx::uninterruptible::nothrow ui;
const std::lock_guard lock{mutex};
#ifdef RB_DEBUG_DB_ENV
log::debug
{
log, "[%s] wfile:%p fd:%d append:%p bytes:%zu",
d.name,
this,
int(fd),
data(s),
size(s),
};
#endif
fs::write_opts wopts;
wopts.priority = this->prio_val;
wopts.nodelay = this->nodelay;
const const_buffer buf
{
data(s), size(s)
};
fs::append(fd, buf, wopts);
return Status::OK();
}
catch(const std::system_error &e)
{
log::error
{
log, "[%s] wfile:%p fd:%d append:%p size:%zu :%s",
d.name,
this,
int(fd),
data(s),
size(s),
e.what()
};
return error_to_status{e};
}
catch(const std::exception &e)
{
log::critical
{
log, "[%s] wfile:%p fd:%d append:%p size:%zu :%s",
d.name,
this,
int(fd),
data(s),
size(s),
e.what()
};
return error_to_status{e};
}
rocksdb::Status
ircd::db::database::env::writable_file::PositionedAppend(const Slice &s,
uint64_t offset)
noexcept try
{
assert(!opts.direct);
const ctx::uninterruptible::nothrow ui;
const std::lock_guard lock{mutex};
#ifdef RB_DEBUG_DB_ENV
log::debug
{
"[%s] wfile:%p fd:%d append:%p bytes:%zu offset:%lu",
d.name,
this,
int(fd),
data(s),
size(s),
offset
};
#endif
fs::write_opts wopts;
wopts.priority = this->prio_val;
wopts.nodelay = this->nodelay;
wopts.offset = offset;
const const_buffer buf
{
data(s), size(s)
};
fs::append(fd, buf, wopts);
return Status::OK();
}
catch(const std::system_error &e)
{
log::error
{
log, "[%s] wfile:%p fd:%d append:%p size:%zu offset:%zu :%s",
d.name,
this,
int(fd),
data(s),
size(s),
offset,
e.what()
};
return error_to_status{e};
}
catch(const std::exception &e)
{
log::critical
{
log, "[%s] wfile:%p fd:%d append:%p size:%zu offset:%lu :%s",
d.name,
this,
int(fd),
data(s),
size(s),
offset,
e.what()
};
return error_to_status{e};
}
rocksdb::Status
ircd::db::database::env::writable_file::Allocate(uint64_t offset,
uint64_t length)
noexcept try
{
const ctx::uninterruptible::nothrow ui;
const std::lock_guard lock{mutex};
#ifdef RB_DEBUG_DB_ENV
log::debug
{
log, "[%s] wfile:%p fd:%d allocate offset:%lu length:%lu%s%s",
d.name,
this,
int(fd),
offset,
length,
env_opts.fallocate_with_keep_size? " KEEP_SIZE" : "",
env_opts.allow_fallocate? "" : " (DISABLED)"
};
#endif
if(!env_opts.allow_fallocate)
return Status::NotSupported();
_allocate(offset, length);
return Status::OK();
}
catch(const std::system_error &e)
{
log::error
{
log, "[%s] wfile:%p fd:%d allocate offset:%zu length:%zu :%s",
d.name,
this,
int(fd),
offset,
length,
e.what()
};
return error_to_status{e};
}
catch(const std::exception &e)
{
log::critical
{
log, "[%s] wfile:%p fd:%d allocate offset:%zu length:%zu :%s",
d.name,
this,
int(fd),
offset,
length,
e.what()
};
return error_to_status{e};
}
void
ircd::db::database::env::writable_file::PrepareWrite(size_t offset,
size_t length)
noexcept
{
const ctx::uninterruptible::nothrow ui;
const std::lock_guard lock{mutex};
#ifdef RB_DEBUG_DB_ENV
log::debug
{
log, "[%s] wfile:%p prepare write offset:%zu length:%zu",
d.name,
this,
offset,
length
};
#endif
if(!env_opts.allow_fallocate)
return;
_allocate(offset, length);
}
void
ircd::db::database::env::writable_file::_allocate(const size_t &offset,
const size_t &length)
{
const size_t first_block
{
offset / preallocation_block_size
};
const size_t last_block
{
(offset + length) / preallocation_block_size
};
const ssize_t missing_blocks
{
ssize_t(last_block) - preallocation_last_block
};
// Fast bail when the offset and length are behind the last block already
// allocated. We don't support windowing here. If this branch is not taken
// we'll fallocate() contiguously from the last fallocate() (or offset 0).
if(missing_blocks <= 0)
return;
const ssize_t start_block
{
preallocation_last_block + 1
};
const size_t allocate_offset
{
start_block * preallocation_block_size
};
const size_t allocate_length
{
missing_blocks * preallocation_block_size
};
fs::write_opts wopts;
wopts.offset = allocate_offset;
wopts.priority = this->prio_val;
wopts.nodelay = this->nodelay;
wopts.keep_size = env_opts.fallocate_with_keep_size;
#ifdef RB_DEBUG_DB_ENV
log::debug
{
log, "[%s] wfile:%p fd:%d allocating %zd blocks after block:%zu offset:%lu length:%lu%s",
d.name,
this,
int(fd),
missing_blocks,
start_block,
allocate_offset,
allocate_length,
wopts.keep_size? " KEEP_SIZE" : ""
};
#endif
assert(env_opts.allow_fallocate);
assert(bool(d.opts));
assert(d.opts->allow_fallocate);
fs::allocate(fd, allocate_length, wopts);
this->preallocation_last_block = last_block;
}
void
ircd::db::database::env::writable_file::GetPreallocationStatus(size_t *const block_size,
size_t *const last_allocated_block)
noexcept
{
const ctx::uninterruptible::nothrow ui;
const std::lock_guard lock{mutex};
*block_size = this->preallocation_block_size;
*last_allocated_block = this->preallocation_last_block;
#ifdef RB_DEBUG_DB_ENV
log::debug
{
log, "[%s] wfile:%p get preallocation block_size(%p):%zu last_block(%p):%zu",
d.name,
this,
block_size,
*block_size,
last_allocated_block,
*last_allocated_block
};
#endif
}
void
ircd::db::database::env::writable_file::SetPreallocationBlockSize(size_t size)
noexcept
{
const ctx::uninterruptible::nothrow ui;
const std::lock_guard lock{mutex};
#ifdef RB_DEBUG_DB_ENV
log::debug
{
log, "[%s] wfile:%p set preallocation block size:%zu",
d.name,
this,
size
};
#endif
this->preallocation_block_size = size;
}
uint64_t
ircd::db::database::env::writable_file::GetFileSize()
noexcept try
{
const ctx::uninterruptible::nothrow ui;
const std::lock_guard lock{mutex};
#ifdef RB_DEBUG_DB_ENV
log::debug
{
log, "[%s] wfile:%p fd:%d get file size",
d.name,
this,
int(fd)
};
#endif
return fs::size(fd);
}
catch(const std::exception &e)
{
log::critical
{
log, "[%s] wfile:%p fd:%d get file size :%s",
d.name,
this,
int(fd),
e.what()
};
return 0;
}
void
ircd::db::database::env::writable_file::SetIOPriority(Env::IOPriority prio)
noexcept
{
#ifdef RB_DEBUG_DB_ENV
log::debug
{
log, "[%s] wfile:%p IO priority %s",
d.name,
this,
reflect(prio)
};
#endif
this->prio = prio;
switch(this->prio)
{
case IOPriority::IO_HIGH:
prio_val = -5; //TODO: magic
nodelay = true;
break;
default:
case IOPriority::IO_LOW:
prio_val = 5; //TODO: magic
nodelay = false;
break;
}
}
rocksdb::Env::IOPriority
ircd::db::database::env::writable_file::GetIOPriority()
noexcept
{
return prio;
}
void
ircd::db::database::env::writable_file::SetWriteLifeTimeHint(WriteLifeTimeHint hint)
noexcept try
{
#ifdef RB_DEBUG_DB_ENV
log::debug
{
log, "[%s] wfile:%p hint:%d %s",
d.name,
this,
int(hint),
reflect(hint),
};
#endif
this->hint = hint;
fs::write_life(fd, this->hint);
}
catch(const std::system_error &e)
{
log::derror
{
log, "[%s] wfile:%p fd:%d setting write lifetime hint:%d %s :%s",
d.name,
this,
int(fd),
int(hint),
reflect(hint),
e.what(),
};
}
rocksdb::Env::WriteLifeTimeHint
ircd::db::database::env::writable_file::GetWriteLifeTimeHint()
noexcept
{
return hint;
}
size_t
ircd::db::database::env::writable_file::GetUniqueId(char *const id,
size_t max_size)
const noexcept try
{
const ctx::uninterruptible::nothrow ui;
#ifdef RB_DEBUG_DB_ENV
log::debug
{
"[%s] wfile:%p get unique id:%p max_size:%zu",
d.name,
this,
id,
max_size
};
#endif
const mutable_buffer buf
{
id, max_size
};
//return size(fs::uuid(fd, buf));
return 0;
}
catch(const std::exception &e)
{
log::critical
{
log, "[%s] wfile:%p get unique id :%s",
d.name,
this,
e.what()
};
return 0;
}
bool
ircd::db::database::env::writable_file::IsSyncThreadSafe()
const noexcept try
{
return true;
}
catch(...)
{
return false;
}
//
// writable_file_direct
//
ircd::db::database::env::writable_file_direct::writable_file_direct(database *const &d,
const std::string &name,
const EnvOptions &env_opts,
const bool &trunc)
:writable_file
{
d, name, env_opts, trunc
}
,alignment
{
fs::block_size(fd)
}
,logical_offset
{
!trunc?
fs::size(fd):
size_t(0)
}
,buffer
{
alignment, alignment
}
{
zero(buffer);
if(!aligned(logical_offset))
throw panic
{
"direct writable file requires read into buffer."
};
}
rocksdb::Status
ircd::db::database::env::writable_file_direct::Close()
noexcept try
{
const ctx::uninterruptible::nothrow ui;
const std::lock_guard lock{mutex};
if(!fd)
return Status::OK();
#ifdef RB_DEBUG_DB_ENV
log::debug
{
log, "[%s] wfile:%p DIRECT fd:%d close",
d.name,
this,
int(fd)
};
#endif
if(logical_offset > 0 && fs::size(fd) != logical_offset)
{
fs::write_opts wopts;
wopts.priority = this->prio_val;
wopts.nodelay = true;
fs::truncate(fd, logical_offset, wopts);
}
fd = fs::fd{};
return Status::OK();
}
catch(const std::system_error &e)
{
log::error
{
log, "[%s] wfile:%p DIRECT close :%s",
d.name,
this,
e.what()
};
return error_to_status{e};
}
catch(const std::exception &e)
{
log::error
{
log, "[%s] wfile:%p DIRECT close :%s",
d.name,
this,
e.what()
};
return error_to_status{e};
}
rocksdb::Status
ircd::db::database::env::writable_file_direct::Truncate(uint64_t size)
noexcept try
{
const ctx::uninterruptible::nothrow ui;
const std::lock_guard lock{mutex};
#ifdef RB_DEBUG_DB_ENV
log::debug
{
"[%s] wfile:%p DIRECT fd:%d truncate to %lu bytes",
d.name,
this,
int(fd),
size
};
#endif
fs::write_opts wopts;
wopts.priority = this->prio_val;
wopts.nodelay = true;
fs::truncate(fd, size, wopts);
logical_offset = size;
return Status::OK();
}
catch(const std::system_error &e)
{
log::error
{
log, "[%s] wfile:%p DIRECT fd:%d truncate to %lu bytes :%s",
d.name,
this,
int(fd),
size,
e.what()
};
return error_to_status{e};
}
catch(const std::exception &e)
{
log::critical
{
log, "[%s] wfile:%p DIRECT fd:%d truncate to %lu bytes :%s",
d.name,
this,
int(fd),
size,
e.what()
};
return error_to_status{e};
}
rocksdb::Status
ircd::db::database::env::writable_file_direct::Append(const Slice &s)
noexcept try
{
const ctx::uninterruptible::nothrow ui;
const std::lock_guard lock{mutex};
if(!aligned(logical_offset) || !aligned(data(s)))
log::dwarning
{
log, "[%s] ALIGNMENT CHECK fd:%d append:%p%s bytes:%zu%s logical_offset:%zu%s",
d.name,
int(fd),
data(s),
aligned(data(s))? "" : "#AC",
size(s),
aligned(size(s))? "" : "#AC",
logical_offset,
aligned(logical_offset)? "" : "#AC"
};
#ifdef RB_DEBUG_DB_ENV
log::debug
{
log, "[%s] wfile:%p DIRECT fd:%d append:%p%s bytes:%zu%s logical_offset:%zu%s",
d.name,
this,
int(fd),
data(s),
aligned(data(s))? "" : "#AC",
size(s),
aligned(size(s))? "" : "#AC",
logical_offset,
aligned(logical_offset)? "" : "#AC"
};
#endif
const auto logical_check
{
logical_offset
};
const_buffer buf
{
slice(s)
};
while(!empty(buf))
buf = write(buf);
assert(logical_check + size(slice(s)) == logical_offset);
return Status::OK();
}
catch(const std::system_error &e)
{
log::error
{
log, "[%s] wfile:%p DIRECT fd:%d append:%p size:%zu :%s",
d.name,
this,
int(fd),
data(s),
size(s),
e.what()
};
return error_to_status{e};
}
catch(const std::exception &e)
{
log::critical
{
log, "[%s] wfile:%p DIRECT fd:%d append:%p size:%zu :%s",
d.name,
this,
int(fd),
data(s),
size(s),
e.what()
};
return error_to_status{e};
}
rocksdb::Status
ircd::db::database::env::writable_file_direct::PositionedAppend(const Slice &s,
uint64_t offset)
noexcept
{
const ctx::uninterruptible::nothrow ui;
const std::lock_guard lock{mutex};
#ifdef RB_DEBUG_DB_ENV
log::debug
{
log, "[%s] wfile:%p DIRECT fd:%d append:%p%s bytes:%zu%s offset:%zu%s",
d.name,
this,
int(fd),
data(s),
aligned(data(s))? "" : "#AC",
size(s),
aligned(size(s))? "" : "#AC",
offset,
aligned(offset)? "" : "#AC"
};
#endif
return rocksdb::Status::NotSupported();
}
uint64_t
ircd::db::database::env::writable_file_direct::GetFileSize()
noexcept try
{
const ctx::uninterruptible::nothrow ui;
const std::lock_guard lock{mutex};
#ifdef RB_DEBUG_DB_ENV
log::debug
{
log, "[%s] wfile:%p DIRECT fd:%d get file size",
d.name,
this,
int(fd)
};
#endif
const auto &ret
{
logical_offset
};
assert(ret <= fs::size(fd));
return ret;
}
catch(const std::exception &e)
{
log::critical
{
log, "[%s] wfile:%p DIRECT fd:%d get file size :%s",
d.name,
this,
int(fd),
e.what()
};
return 0;
}
/// (Internal) Append buffer. This function is the internal entry interface
/// for appending a buffer of any size and alignment to the file. It is
/// internal because it does no locking or error handling back to rocksdb,
/// because it's expected to be called from some virtual override which does
/// those things. This function will branch off as required to other internal
/// write_* functions to properly align and rebuffer the supplied buffer
/// eventually culminating in an aligned append to the file.
///
/// Calling this function will always result in some write to the file; even
/// if temporary buffering is used to achieve alignment; even if the entire
/// supplied buffer is hopelessly unaligned: the supplied data will be written
/// out some way or another during this call. This means there is no
/// requirement to care about flushing the temporary this->buffer after this
/// call is made. Note that the temporary this->buffer has no reason to be
/// touched by anything other than this function stack.
///
/// !!! NOTE !!!
/// There is a requirement to truncate the file after this call is made before
/// closing the file. If a crash occurs after a write() which was padded out
/// to the block alignment: the file size will reflect the padding when it is
/// opened at next startup; RocksDB will not detect its terminator character
/// sequence and consider this file corrupt.
/// !!!
///
/// - any offset
/// - any data
/// - any size
ircd::const_buffer
ircd::db::database::env::writable_file_direct::write(const const_buffer &buf_)
{
const_buffer buf
{
aligned(logical_offset) && aligned(data(buf_))?
write_aligned(buf_):
!aligned(logical_offset)?
write_unaligned_off(buf_):
buf_
};
assert(aligned(logical_offset) || empty(buf));
// buf can be empty here if it was entirely dealt with by the above
// branches and there's nothing else to do here.
if(empty(buf))
return buf;
// Branch on whether the buffer's address is aligned. If so, considering
// the logical_offset is aligned here we are then finished.
if(aligned(data(buf)))
return write_aligned(buf);
return write_unaligned_buf(buf);
}
/// * aligned offset
/// * unaligned data
/// - any size
ircd::const_buffer
ircd::db::database::env::writable_file_direct::write_unaligned_buf(const const_buffer &buf)
{
assert(aligned(logical_offset));
assert(!aligned(data(buf)));
assert(!aligned(buf));
// Window on the data between the given buffer's pointer and the next
// alignment boundary.
const const_buffer under_buf
{
data(buf), std::min(remain(uintptr_t(data(buf))), size(buf))
};
// Window on the data from the alignment boundary to the end of the
// given buffer.
const const_buffer remaining_buf
{
buf + size(under_buf)
};
assert(size(under_buf) <= size(buf));
assert(size(under_buf) + size(remaining_buf) == size(buf));
assert(data(buf) + size(under_buf) == data(remaining_buf));
assert(aligned(data(remaining_buf)) || empty(remaining_buf));
mutable_buffer dst(this->buffer);
consume(dst, copy(dst, under_buf));
consume(dst, copy(dst, remaining_buf));
consume(dst, zero(dst));
assert(empty(dst));
// Flush the temporary buffer.
_write__aligned(this->buffer, logical_offset);
logical_offset += size(under_buf);
return remaining_buf;
}
/// * unaligned offset
/// - any data
/// - any size
ircd::const_buffer
ircd::db::database::env::writable_file_direct::write_unaligned_off(const const_buffer &buf)
{
assert(!aligned(logical_offset));
// Window on the amount of buf we can take to fill up remaining space in
// the temporary this->buffer
const const_buffer src
{
data(buf), std::min(size(buf), buffer_remain())
};
// Window on the remaining space in the temporary this->buffer.
const mutable_buffer dst
{
this->buffer + buffer_consumed()
};
// Window on the remaining space in dst after src is copied to dst, if any.
const mutable_buffer pad
{
dst + size(src)
};
assert(size(dst) - size(pad) == size(src));
assert(size(src) + size(pad) == buffer_remain());
assert(size(src) + size(pad) + buffer_consumed() == alignment);
assert(size(src) + buffer_consumed() != alignment || empty(pad));
copy(dst, src);
zero(pad);
// Backtrack the logical_offset to the aligned offset where this->buffer's
// data starts.
const auto aligned_offset
{
align(logical_offset)
};
// Write the whole temporary this->buffer at the aligned offset.
_write__aligned(this->buffer, aligned_offset);
logical_offset += size(src);
assert(aligned(logical_offset) || size(buf) < alignment);
return const_buffer
{
buf + size(src)
};
}
/// * aligned offset
/// * aligned data
/// - any size
ircd::const_buffer
ircd::db::database::env::writable_file_direct::write_aligned(const const_buffer &buf)
{
assert(aligned(data(buf)));
assert(aligned(logical_offset));
// This portion at the end of buf did not fill out to the alignment.
const const_buffer overflow
{
_write_aligned(buf, logical_offset)
};
// The aligned portion was written so the offset is incremented here.
logical_offset += size(buf) - size(overflow);
assert(aligned(logical_offset));
assert(size(overflow) < alignment);
assert(aligned(data(overflow)) || empty(overflow));
assert(align(size(buf)) + size(overflow) == size(buf));
assert(blocks(size(buf)) * alignment + size(overflow) == size(buf));
if(!empty(overflow))
{
mutable_buffer dst(this->buffer);
consume(dst, copy(dst, overflow));
consume(dst, zero(dst));
assert(empty(dst));
_write__aligned(this->buffer, logical_offset);
logical_offset += size(overflow);
assert(!aligned(logical_offset));
}
return {};
}
/// * aligned data
/// * aligned offset
/// - any size
ircd::const_buffer
ircd::db::database::env::writable_file_direct::_write_aligned(const const_buffer &buf,
const uint64_t &offset)
{
assert(aligned(data(buf)));
assert(aligned(offset));
// This portion will be written
const const_buffer aligned_buf
{
data(buf), blocks(size(buf)) * alignment
};
// This trailing portion will be returned to caller
const const_buffer ret
{
data(buf) + size(aligned_buf), size(buf) - size(aligned_buf)
};
assert(!empty(aligned_buf) || size(buf) < alignment);
assert(size(aligned_buf) + size(ret) == size(buf));
assert(size(ret) < alignment);
if(empty(aligned_buf))
{
assert(size(ret) == size(buf));
return ret;
}
_write__aligned(aligned_buf, offset);
return ret;
}
/// * aligned offset
/// * aligned data
/// * aligned size
ircd::const_buffer
ircd::db::database::env::writable_file_direct::_write__aligned(const const_buffer &buf,
const uint64_t &offset)
{
#ifdef RB_DEBUG_DB_ENV
log::debug
{
log, "[%s] wfile:%p DIRECT fd:%d write:%p%s bytes:%zu%s offset:%zu%s (logical:%zu)",
d.name,
this,
int(fd),
data(buf),
aligned(data(buf))? "" : "#AC",
size(buf),
aligned(size(buf))? "" : "#AC",
offset,
aligned(offset)? "" : "#AC",
logical_offset
};
#endif
assert(aligned(buf));
assert(aligned(offset));
fs::write_opts wopts;
wopts.priority = this->prio_val;
wopts.nodelay = this->nodelay;
wopts.offset = offset;
fs::write(fd, buf, wopts);
return {};
}
size_t
ircd::db::database::env::writable_file_direct::buffer_consumed()
const
{
return likely(alignment != 0)?
logical_offset % alignment:
0UL;
}
size_t
ircd::db::database::env::writable_file_direct::buffer_remain()
const
{
return remain(logical_offset);
}
size_t
ircd::db::database::env::writable_file_direct::blocks(const size_t &value)
const
{
return likely(alignment != 0)?
value / alignment:
0UL;
}
size_t
ircd::db::database::env::writable_file_direct::remain(const size_t &value)
const
{
return likely(alignment != 0)?
alignment - (value - align(value)):
0UL;
}
size_t
ircd::db::database::env::writable_file_direct::align(const size_t &value)
const
{
return likely(alignment != 0)?
value - (value % alignment):
value;
}
bool
ircd::db::database::env::writable_file_direct::aligned(const const_buffer &buf)
const
{
return buffer::aligned(buf, alignment);
}
bool
ircd::db::database::env::writable_file_direct::aligned(const void *const &value)
const
{
return aligned(size_t(value));
}
bool
ircd::db::database::env::writable_file_direct::aligned(const size_t &value)
const
{
return (alignment == 0) || (value % alignment == 0UL);
}
//
// sequential_file
//
decltype(ircd::db::database::env::sequential_file::default_opts)
ircd::db::database::env::sequential_file::default_opts{[]
{
ircd::fs::fd::opts ret{std::ios_base::in};
return ret;
}()};
ircd::db::database::env::sequential_file::sequential_file(database *const &d,
const std::string &name,
const EnvOptions &env_opts)
try
:d
{
*d
}
,opts{[&env_opts]
{
fs::fd::opts ret{default_opts};
ret.direct = env_opts.use_direct_reads;
return ret;
}()}
,fd
{
name, this->opts
}
,_buffer_align
{
opts.direct?
fs::block_size(fd):
1
}
,offset
{
0
}
,aio
{
// When this flag is false then AIO operations are never used for this
// file; if true, AIO may be used if available and/or other conditions.
// Currently the /proc filesystem doesn't like AIO.
!startswith(name, "/proc/")
}
{
#ifdef RB_DEBUG_DB_ENV
log::debug
{
log, "[%s] opened seqfile:%p fd:%d bs:%zu '%s'",
d->name,
this,
int(fd),
_buffer_align,
name
};
#endif
#if defined(HAVE_POSIX_FADVISE) && defined(POSIX_FADV_SEQUENTIAL)
if(!opts.direct)
syscall(::posix_fadvise, fd, offset, off_t(0), POSIX_FADV_SEQUENTIAL);
#endif
}
catch(const std::system_error &e)
{
// Set the level to downplay some errors which the user shouldn't
// be alerted to with a log message under normal operations.
const log::level level
{
is(e.code(), std::errc::no_such_file_or_directory)?
log::level::DERROR:
log::level::ERROR
};
log::logf
{
log, level, "[%s] opening seqfile:%p `%s' (%d) :%s",
d->name,
this,
name,
e.code().value(),
e.what()
};
}
catch(const std::exception &e)
{
log::error
{
log, "[%s] opening seqfile:%p `%s' :%s",
d->name,
this,
name,
e.what()
};
}
ircd::db::database::env::sequential_file::~sequential_file()
noexcept
{
#ifdef RB_DEBUG_DB_ENV
log::debug
{
log, "[%s] close seqfile:%p fd:%d",
d.name,
this,
int(fd)
};
#endif
}
rocksdb::Status
ircd::db::database::env::sequential_file::Read(size_t length,
Slice *const result,
char *const scratch)
noexcept try
{
const ctx::uninterruptible::nothrow ui;
const std::unique_lock lock
{
mutex, std::try_to_lock
};
// RocksDB sez that this call requires "External synchronization" i.e the
// caller, not this class is responsible for exclusion. We assert anyway.
if(unlikely(!bool(lock)))
throw panic
{
"[%s] Unexpected concurrent access to seqfile %p",
d.name,
this
};
assert(result);
assert(scratch);
#ifdef RB_DEBUG_DB_ENV
log::debug
{
log, "[%s] seqfile:%p read:%p offset:%zu length:%zu scratch:%p",
d.name,
this,
result,
offset,
length,
scratch
};
#endif
fs::read_opts opts;
opts.offset = offset;
opts.aio = this->aio;
opts.all = false;
const mutable_buffer buf
{
scratch, length
};
const const_buffer read
{
fs::read(fd, buf, opts)
};
*result = slice(read);
this->offset += size(read);
return Status::OK();
}
catch(const std::system_error &e)
{
log::error
{
log, "[%s] seqfile:%p read:%p offset:%zu length:%zu scratch:%p :%s",
d.name,
this,
result,
offset,
length,
scratch,
e.what()
};
return error_to_status{e};
}
catch(const std::exception &e)
{
log::critical
{
log, "[%s] seqfile:%p read:%p offset:%zu length:%zu scratch:%p :%s",
d.name,
this,
result,
offset,
length,
scratch,
e.what()
};
return error_to_status{e};
}
rocksdb::Status
ircd::db::database::env::sequential_file::PositionedRead(uint64_t offset,
size_t length,
Slice *const result,
char *const scratch)
noexcept try
{
const ctx::uninterruptible::nothrow ui;
const std::unique_lock lock
{
mutex, std::try_to_lock
};
if(unlikely(!bool(lock)))
throw panic
{
"[%s] Unexpected concurrent access to seqfile %p",
d.name,
this
};
assert(result);
assert(scratch);
#ifdef RB_DEBUG_DB_ENV
log::debug
{
log, "[%s] seqfile:%p offset:%zu positioned read:%p offset:%zu length:%zu scratch:%p",
d.name,
this,
this->offset,
result,
offset,
length,
scratch
};
#endif
fs::read_opts opts;
opts.offset = offset;
opts.aio = this->aio;
opts.all = false;
const mutable_buffer buf
{
scratch, length
};
const const_buffer read
{
fs::read(fd, buf, opts)
};
*result = slice(read);
this->offset = std::max(this->offset, off_t(offset + size(read)));
return Status::OK();
}
catch(const std::system_error &e)
{
log::error
{
log, "[%s] seqfile:%p positioned read:%p offset:%zu length:%zu scratch:%p :%s",
d.name,
this,
result,
offset,
length,
scratch,
e.what()
};
return error_to_status{e};
}
catch(const std::exception &e)
{
log::critical
{
log, "[%s] seqfile:%p positioned read:%p offset:%zu length:%zu scratch:%p :%s",
d.name,
this,
result,
offset,
length,
scratch,
e.what()
};
return error_to_status{e};
}
rocksdb::Status
ircd::db::database::env::sequential_file::Skip(uint64_t size)
noexcept
{
const ctx::uninterruptible::nothrow ui;
const std::unique_lock lock
{
mutex, std::try_to_lock
};
// RocksDB sez that this call requires "External synchronization" i.e the
// caller, not this class is responsible for exclusion. We assert anyway.
if(unlikely(!bool(lock)))
throw panic
{
"[%s] Unexpected concurrent access to seqfile %p",
d.name,
this
};
#ifdef RB_DEBUG_DB_ENV
log::debug
{
"[%s] seqfile:%p offset:zu skip:%zu",
d.name,
this,
offset,
size
};
#endif
offset += size;
return Status::OK();
}
rocksdb::Status
ircd::db::database::env::sequential_file::InvalidateCache(size_t offset,
size_t length)
noexcept try
{
const ctx::uninterruptible::nothrow ui;
#ifdef RB_DEBUG_DB_ENV
log::debug
{
"[%s] seqfile:%p invalidate cache offset:%zu length:%zu",
d.name,
this,
offset,
length
};
#endif
if(opts.direct)
return Status::OK();
fs::evict(fd, length, offset);
return Status::OK();
}
catch(const std::system_error &e)
{
log::error
{
"[%s] seqfile:%p invalidate cache offset:%zu length:%zu :%s",
d.name,
this,
offset,
length,
e.what()
};
return error_to_status{e};
}
catch(const std::exception &e)
{
log::critical
{
"[%s] seqfile:%p invalidate cache offset:%zu length:%zu :%s",
d.name,
this,
offset,
length,
e.what()
};
return error_to_status{e};
}
bool
ircd::db::database::env::sequential_file::use_direct_io()
const noexcept
{
return opts.direct;
}
size_t
ircd::db::database::env::sequential_file::GetRequiredBufferAlignment()
const noexcept
{
const auto &ret
{
_buffer_align
};
return ret;
}
//
// random_access_file
//
decltype(ircd::db::database::env::random_access_file::default_opts)
ircd::db::database::env::random_access_file::default_opts{[]
{
ircd::fs::fd::opts ret{std::ios_base::in};
return ret;
}()};
ircd::db::database::env::random_access_file::random_access_file(database *const &d,
const std::string &name,
const EnvOptions &env_opts)
try
:d
{
*d
}
,opts{[&env_opts]
{
fs::fd::opts ret{default_opts};
ret.direct = env_opts.use_direct_reads;
return ret;
}()}
,fd
{
name, this->opts
}
,_buffer_align
{
opts.direct?
fs::block_size(fd):
1
}
,aio
{
// When this flag is false then AIO operations are never used for this
// file; if true, AIO may be used if available and/or other conditions.
// Currently the /proc filesystem doesn't like AIO.
!startswith(name, "/proc/")
}
{
#ifdef RB_DEBUG_DB_ENV
log::debug
{
log, "[%s] opened rfile:%p fd:%d bs:%zu '%s'",
d->name,
this,
int(fd),
_buffer_align,
name
};
#endif
}
catch(const std::exception &e)
{
log::error
{
log, "[%s] opening rfile:%p `%s' :%s",
d->name,
this,
name,
e.what()
};
}
ircd::db::database::env::random_access_file::~random_access_file()
noexcept
{
#ifdef RB_DEBUG_DB_ENV
log::debug
{
log, "[%s] close rfile:%p fd:%d",
d.name,
this,
int(fd)
};
#endif
}
rocksdb::Status
ircd::db::database::env::random_access_file::Prefetch(uint64_t offset,
size_t length)
noexcept try
{
const ctx::uninterruptible::nothrow ui;
#ifdef RB_DEBUG_DB_ENV
log::debug
{
log, "[%s] rfile:%p prefetch offset:%zu length:%zu",
d.name,
this,
offset,
length
};
#endif
// Note RocksDB does not call our prefetch() when using direct IO.
assert(!this->opts.direct);
fs::prefetch(fd, length, offset);
return Status::OK();
}
catch(const std::system_error &e)
{
return error_to_status{e};
}
catch(const std::exception &e)
{
log::critical
{
log, "[%s] rfile:%p prefetch offset:%zu length:%zu :%s",
d.name,
this,
offset,
length,
e.what()
};
return error_to_status{e};
}
rocksdb::Status
ircd::db::database::env::random_access_file::Read(uint64_t offset,
size_t length,
Slice *const result,
char *const scratch)
const noexcept try
{
const ctx::uninterruptible::nothrow ui;
assert(result);
assert(scratch);
#ifdef RB_DEBUG_DB_ENV
log::debug
{
log, "[%s] rfile:%p read:%p offset:%zu length:%zu scratch:%p",
d.name,
this,
result,
offset,
length,
scratch
};
#endif
fs::read_opts opts;
opts.offset = offset;
opts.aio = this->aio;
opts.all = !this->opts.direct;
const mutable_buffer buf
{
scratch, length
};
assert(!this->opts.direct || buffer::aligned(buf, _buffer_align));
const auto read
{
fs::read(fd, buf, opts)
};
*result = slice(read);
return Status::OK();
}
catch(const std::system_error &e)
{
log::error
{
log, "[%s] rfile:%p read:%p offset:%zu length:%zu scratch:%p :%s",
d.name,
this,
result,
offset,
length,
scratch,
e.what()
};
return error_to_status{e};
}
catch(const std::exception &e)
{
log::critical
{
log, "[%s] rfile:%p read:%p offset:%zu length:%zu scratch:%p :%s",
d.name,
this,
result,
offset,
length,
scratch,
e.what()
};
return error_to_status{e};
}
rocksdb::Status
ircd::db::database::env::random_access_file::InvalidateCache(size_t offset,
size_t length)
noexcept
{
const ctx::uninterruptible::nothrow ui;
#ifdef RB_DEBUG_DB_ENV
log::debug
{
log, "[%s] rfile:%p invalidate cache offset:%zu length:%zu",
d.name,
this,
offset,
length
};
#endif
if(opts.direct)
return Status::OK();
fs::evict(fd, length, offset);
return Status::OK();
}
size_t
ircd::db::database::env::random_access_file::GetUniqueId(char *const id,
size_t max_size)
const noexcept try
{
const ctx::uninterruptible::nothrow ui;
#ifdef RB_DEBUG_DB_ENV
log::debug
{
log, "[%s] rfile:%p get unique id:%p max_size:%zu",
d.name,
this,
id,
max_size
};
#endif
const mutable_buffer buf
{
id, max_size
};
//return size(fs::uuid(fd, buf));
return 0;
}
catch(const std::exception &e)
{
log::critical
{
log, "[%s] rfile:%p GetUniqueId id:%p max_size:%zu :%s",
d.name,
this,
id,
max_size,
e.what()
};
return 0;
}
void
ircd::db::database::env::random_access_file::Hint(AccessPattern pattern)
noexcept
{
const ctx::uninterruptible::nothrow ui;
#ifdef RB_DEBUG_DB_ENV
log::debug
{
log, "[%s] rfile:%p hint %s",
d.name,
this,
reflect(pattern)
};
#endif
}
bool
ircd::db::database::env::random_access_file::use_direct_io()
const noexcept
{
return opts.direct;
}
size_t
ircd::db::database::env::random_access_file::GetRequiredBufferAlignment()
const noexcept
{
const auto &ret
{
_buffer_align
};
return ret;
}
//
// random_rw_file
//
decltype(ircd::db::database::env::random_rw_file::default_opts)
ircd::db::database::env::random_rw_file::default_opts{[]
{
ircd::fs::fd::opts ret
{
std::ios_base::in | std::ios_base::out
};
return ret;
}()};
ircd::db::database::env::random_rw_file::random_rw_file(database *const &d,
const std::string &name,
const EnvOptions &env_opts)
try
:d
{
*d
}
,opts{[&env_opts]
{
fs::fd::opts ret{default_opts};
ret.direct = env_opts.use_direct_reads && env_opts.use_direct_writes;
return ret;
}()}
,fd
{
name, this->opts
}
,_buffer_align
{
opts.direct?
fs::block_size(fd):
1
}
,aio
{
true
}
{
#ifdef RB_DEBUG_DB_ENV
log::debug
{
log, "[%s] opened rwfile:%p fd:%d bs:%zu '%s'",
d->name,
this,
int(fd),
_buffer_align,
name
};
#endif
}
catch(const std::exception &e)
{
log::error
{
log, "[%s] opening rwfile:%p `%s' :%s",
d->name,
this,
name,
e.what()
};
}
ircd::db::database::env::random_rw_file::~random_rw_file()
noexcept
{
#ifdef RB_DEBUG_DB_ENV
log::debug
{
log, "[%s] close rwfile:%p fd:%d '%s'",
d.name,
this,
int(fd)
};
#endif
}
rocksdb::Status
ircd::db::database::env::random_rw_file::Close()
noexcept try
{
const ctx::uninterruptible::nothrow ui;
#ifdef RB_DEBUG_DB_ENV
log::debug
{
log, "[%s] close rwfile:%p fd:%d '%s'",
d.name,
this,
int(fd)
};
#endif
this->fd = fs::fd{};
return Status::OK();
}
catch(const std::system_error &e)
{
log::error
{
"[%s] rwfile:%p close :%s",
d.name,
this,
e.what()
};
return error_to_status{e};
}
catch(const std::exception &e)
{
log::critical
{
"[%s] rwfile:%p close :%s",
d.name,
this,
e.what()
};
return error_to_status{e};
}
rocksdb::Status
ircd::db::database::env::random_rw_file::Fsync()
noexcept try
{
const ctx::uninterruptible::nothrow ui;
#ifdef RB_DEBUG_DB_ENV
log::debug
{
log, "[%s] rwfile:%p fd:%d fsync",
d.name,
int(fd),
this
};
#endif
fs::sync_opts opts;
fs::flush(fd, opts);
return Status::OK();
}
catch(const std::system_error &e)
{
log::error
{
"[%s] rwfile:%p fd:%d fsync :%s",
d.name,
this,
int(fd),
e.what()
};
return error_to_status{e};
}
catch(const std::exception &e)
{
log::critical
{
"[%s] rwfile:%p fd:%d fsync :%s",
d.name,
this,
int(fd),
e.what()
};
return error_to_status{e};
}
rocksdb::Status
ircd::db::database::env::random_rw_file::Sync()
noexcept try
{
const ctx::uninterruptible::nothrow ui;
#ifdef RB_DEBUG_DB_ENV
log::debug
{
log, "[%s] rwfile:%p fd:%d sync",
d.name,
int(fd),
this
};
#endif
fs::sync_opts opts;
fs::sync(fd, opts);
return Status::OK();
}
catch(const std::system_error &e)
{
log::error
{
"[%s] rwfile:%p fd:%d sync :%s",
d.name,
this,
int(fd),
e.what()
};
return error_to_status{e};
}
catch(const std::exception &e)
{
log::critical
{
"[%s] rwfile:%p fd:%d sync :%s",
d.name,
this,
int(fd),
e.what()
};
return error_to_status{e};
}
rocksdb::Status
ircd::db::database::env::random_rw_file::Flush()
noexcept try
{
const ctx::uninterruptible::nothrow ui;
#ifdef RB_DEBUG_DB_ENV
log::debug
{
log, "[%s] rwfile:%p fd:%d flush",
d.name,
int(fd),
this
};
#endif
fs::sync_opts opts;
opts.metadata = false;
fs::flush(fd, opts);
return Status::OK();
}
catch(const std::system_error &e)
{
log::error
{
"[%s] rwfile:%p fd:%d flush :%s",
d.name,
this,
int(fd),
e.what()
};
return error_to_status{e};
}
catch(const std::exception &e)
{
log::critical
{
"[%s] rwfile:%p fd:%d flush :%s",
d.name,
this,
int(fd),
e.what()
};
return error_to_status{e};
}
rocksdb::Status
ircd::db::database::env::random_rw_file::Read(uint64_t offset,
size_t length,
Slice *const result,
char *const scratch)
const noexcept try
{
const ctx::uninterruptible::nothrow ui;
assert(result);
assert(scratch);
#ifdef RB_DEBUG_DB_ENV
log::debug
{
log, "[%s] rwfile:%p read:%p offset:%zu length:%zu scratch:%p",
d.name,
this,
result,
offset,
length,
scratch
};
#endif
fs::read_opts opts;
opts.offset = offset;
opts.aio = this->aio;
opts.all = !this->opts.direct;
const mutable_buffer buf
{
scratch, length
};
const auto read
{
fs::read(fd, buf, opts)
};
*result = slice(read);
return Status::OK();
}
catch(const std::system_error &e)
{
log::error
{
log, "[%s] rwfile:%p read:%p offset:%zu length:%zu scratch:%p :%s",
d.name,
this,
result,
offset,
length,
scratch,
e.what()
};
return error_to_status{e};
}
catch(const std::exception &e)
{
log::critical
{
log, "[%s] rwfile:%p read:%p offset:%zu length:%zu scratch:%p :%s",
d.name,
this,
result,
offset,
length,
scratch,
e.what()
};
return error_to_status{e};
}
rocksdb::Status
ircd::db::database::env::random_rw_file::Write(uint64_t offset,
const Slice &slice)
noexcept try
{
const ctx::uninterruptible::nothrow ui;
#ifdef RB_DEBUG_DB_ENV
log::debug
{
log, "[%s] rwfile:%p fd:%d write:%p length:%zu offset:%zu",
d.name,
this,
int(fd),
data(slice),
size(slice),
offset
};
#endif
const const_buffer buf
{
data(slice), size(slice)
};
const auto read
{
fs::write(fd, buf, offset)
};
return Status::OK();
}
catch(const std::system_error &e)
{
log::error
{
log, "[%s] rwfile:%p fd:%d write:%p length:%zu offset:%zu",
d.name,
this,
int(fd),
data(slice),
size(slice),
offset
};
return error_to_status{e};
}
catch(const std::exception &e)
{
log::critical
{
log, "[%s] rwfile:%p fd:%d write:%p length:%zu offset:%zu",
d.name,
this,
int(fd),
data(slice),
size(slice),
offset
};
return error_to_status{e};
}
bool
ircd::db::database::env::random_rw_file::use_direct_io()
const noexcept
{
return opts.direct;
}
size_t
ircd::db::database::env::random_rw_file::GetRequiredBufferAlignment()
const noexcept
{
const auto &ret
{
_buffer_align
};
return ret;
}
//
// directory
//
ircd::db::database::env::directory::directory(database *const &d,
const std::string &name,
std::unique_ptr<Directory> defaults)
:d{*d}
,defaults{std::move(defaults)}
{
}
ircd::db::database::env::directory::~directory()
noexcept
{
}
rocksdb::Status
ircd::db::database::env::directory::Fsync()
noexcept
{
const ctx::uninterruptible::nothrow ui;
#ifdef RB_DEBUG_DB_ENV
log::debug
{
log, "[%s] directory:%p fsync",
d.name,
this
};
#endif
return defaults->Fsync();
}
//
// file_lock
//
ircd::db::database::env::file_lock::file_lock(database *const &d)
:d{*d}
{
}
ircd::db::database::env::file_lock::~file_lock()
noexcept
{
}
///////////////////////////////////////////////////////////////////////////////
//
// db/database/env/state.h
//
//
// env::state::state
//
ircd::db::database::env::state::state(database *const &d)
:d{*d}
{
for(size_t i(0); i < pool.size(); ++i)
pool.at(i) = std::make_unique<struct pool>(this->d, Priority(i));
}
ircd::db::database::env::state::~state()
noexcept
{
log::debug
{
log, "[%s] Shutting down environment...",
d.name
};
}
//
// state::pool
//
decltype(ircd::db::database::env::state::pool::stack_size)
ircd::db::database::env::state::pool::stack_size
{
{ "name", "ircd.db.env.pool.stack_size" },
{ "default", long(128_KiB) },
};
//
// state::pool::pool
//
ircd::db::database::env::state::pool::pool(database &d,
const Priority &pri)
:d{d}
,pri{pri}
,iopri
{
pri == Priority::HIGH?
IOPriority::IO_HIGH:
IOPriority::IO_LOW
}
,name
{
fmt::sprintf
{
namebuf, "db.%s", reflect(pri)
}
}
,popts
{
size_t(stack_size), // stack size of worker
0, // initial workers
-1, // queue hard limit
-1, // queue soft limit
}
,p
{
this->name, // name of pool
this->popts // pool options
}
{
}
ircd::db::database::env::state::pool::~pool()
noexcept
{
join();
}
void
ircd::db::database::env::state::pool::join()
try
{
if(!tasks.empty() || p.pending())
log::warning
{
log, "[%s] Waiting for tasks:%zu queued:%zu active:%zu in pool '%s'",
d.name,
tasks.size(),
p.queued(),
p.active(),
ctx::name(p)
};
this->wait();
assert(!p.pending());
assert(tasks.empty());
p.join();
log::debug
{
log, "[%s] Terminated pool '%s'.",
d.name,
ctx::name(p)
};
}
catch(const std::exception &e)
{
log::critical
{
log, "[%s] Environment pool '%s' join :%s",
d.name,
ctx::name(p),
e.what()
};
throw;
}
void
ircd::db::database::env::state::pool::wait()
{
dock.wait([this]
{
return tasks.empty() && !p.pending();
});
}
void
ircd::db::database::env::state::pool::operator()(task &&task)
{
assert(task._id == 0);
task._id = ++taskctr;
tasks.emplace_back(std::move(task));
log::debug
{
log, "[%s] pool:%s queue:%zu QUEUE task:%lu func:%p arg:%p",
this->d.name,
ctx::name(p),
tasks.size(),
tasks.back()._id,
tasks.back().func,
tasks.back().arg,
};
p([this]
{
if(tasks.empty())
return;
const ctx::uninterruptible::nothrow ui;
const auto task{std::move(tasks.front())};
tasks.pop_front();
log::debug
{
log, "[%s] pool:%s queue:%zu ENTER task:%lu func:%p arg:%p",
this->d.name,
ctx::name(p),
tasks.size(),
task._id,
task.func,
task.arg,
};
const ctx::slice_usage_warning message
{
"[%s] pool:%s task:%p",
this->d.name,
ctx::name(p),
task.func
};
// Execute the task
task.func(task.arg);
log::debug
{
log, "[%s] pool:%s queue:%zu LEAVE task:%zu func:%p arg:%p",
this->d.name,
ctx::name(p),
tasks.size(),
task._id,
task.func,
task.arg,
};
dock.notify_all();
});
}
size_t
ircd::db::database::env::state::pool::cancel(void *const &tag)
{
size_t i(0);
auto it(begin(tasks));
while(it != end(tasks))
{
auto &task(*it);
log::debug
{
log, "[%s] pool:%s tasks:%zu cancel#%zu task:%lu func:%p cancel:%p arg:%p tag:%p",
d.name,
ctx::name(p),
tasks.size(),
i,
task._id,
task.func,
task.cancel,
task.arg,
tag
};
task.cancel(task.arg);
it = tasks.erase(it);
++i;
}
dock.notify_all();
return i;
}