// Matrix Construct // // Copyright (C) Matrix Construct Developers, Authors & Contributors // Copyright (C) 2016-2018 Jason Volk // // Permission to use, copy, modify, and/or distribute this software for any // purpose with or without fee is hereby granted, provided that the above // copyright notice and this permission notice is present in all copies. The // full license for this software is available in the LICENSE file. #include "db.h" decltype(ircd::db::database::env::log) ircd::db::database::env::log { "db.env" }; // // env::env // ircd::db::database::env::env(database *const &d) :d{*d}, st{std::make_unique(d)} { } ircd::db::database::env::~env() noexcept { } rocksdb::Status ircd::db::database::env::NewSequentialFile(const std::string &name, std::unique_ptr *const r, const EnvOptions &options) noexcept try { const ctx::uninterruptible::nothrow ui; #ifdef RB_DEBUG_DB_ENV log::debug { log, "[%s] new sequential file '%s' options:%p [mm:%b direct:%b]", d.name, name, &options, options.use_mmap_reads, options.use_direct_reads, }; #endif *r = std::make_unique(&d, name, options); return Status::OK(); } catch(const std::system_error &e) { return error_to_status{e}; } catch(const std::exception &e) { return error_to_status{e}; } rocksdb::Status ircd::db::database::env::NewRandomAccessFile(const std::string &name, std::unique_ptr *const r, const EnvOptions &options) noexcept try { const ctx::uninterruptible::nothrow ui; #ifdef RB_DEBUG_DB_ENV log::debug { log, "[%s] new random access file '%s' options:%p [mm:%b direct:%b bufsz:%zu readahead:%zu]", d.name, name, &options, options.use_mmap_reads, options.use_direct_reads, options.random_access_max_buffer_size, options.compaction_readahead_size, }; #endif *r = std::make_unique(&d, name, options); return Status::OK(); } catch(const std::system_error &e) { return error_to_status{e}; } catch(const std::exception &e) { return error_to_status{e}; } rocksdb::Status ircd::db::database::env::NewWritableFile(const std::string &name, std::unique_ptr *const r, const EnvOptions &options) noexcept try { const ctx::uninterruptible::nothrow ui; #ifdef RB_DEBUG_DB_ENV log::debug { log, "[%s] new writable file '%s' options:%p [mm:%b direct:%b rl:%p bufsz:%zu syncsz:%zu]", d.name, name, &options, options.use_mmap_writes, options.use_direct_writes, options.rate_limiter, options.writable_file_max_buffer_size, options.bytes_per_sync, }; #endif if(options.use_direct_writes) *r = std::make_unique(&d, name, options, true); else *r = std::make_unique(&d, name, options, true); return Status::OK(); } catch(const std::system_error &e) { return error_to_status{e}; } catch(const std::exception &e) { return error_to_status{e}; } rocksdb::Status ircd::db::database::env::ReopenWritableFile(const std::string &name, std::unique_ptr *const r, const EnvOptions &options) noexcept try { const ctx::uninterruptible::nothrow ui; #ifdef RB_DEBUG_DB_ENV log::debug { log, "[%s] reopen writable file '%s' options:%p", d.name, name, &options }; #endif if(options.use_direct_writes) *r = std::make_unique(&d, name, options, false); else *r = std::make_unique(&d, name, options, false); return Status::OK(); } catch(const std::system_error &e) { return error_to_status{e}; } catch(const std::exception &e) { return error_to_status{e}; } rocksdb::Status __attribute__((unused)) ircd::db::database::env::ReuseWritableFile(const std::string &name, const std::string &old_name, std::unique_ptr *const r, const EnvOptions &options) noexcept try { const ctx::uninterruptible::nothrow ui; #ifdef RB_DEBUG_DB_ENV 
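//
// The env defined in this unit intercepts every file and background-work
// callback RocksDB makes for a database. The guarded sketch below shows the
// generic RocksDB mechanism for attaching such a custom Env through
// rocksdb::Options::env when opening a database; the names my_env, path and
// open_with_custom_env are illustrative only, and this is not how construct
// wires its env internally.
//
#if 0 // illustrative sketch only; excluded from the build
#include <memory>
#include <string>
#include <rocksdb/db.h>
#include <rocksdb/env.h>
#include <rocksdb/options.h>

// Open a database whose file I/O and scheduling are routed through a
// caller-provided Env; any Env subclass can be plugged in the same way.
static rocksdb::Status
open_with_custom_env(rocksdb::Env *const my_env,
                     const std::string &path,
                     std::unique_ptr<rocksdb::DB> &out)
{
	rocksdb::Options options;
	options.create_if_missing = true;
	options.env = my_env;   // every file operation now goes through my_env

	rocksdb::DB *db {nullptr};
	const rocksdb::Status status
	{
		rocksdb::DB::Open(options, path, &db)
	};

	out.reset(db);
	return status;
}
#endif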
log::debug { log, "[%s] reuse writable file '%s' old '%s' options:%p", d.name, name, old_name, &options }; #endif throw ircd::not_implemented { "[%s] ReuseWritableFile(name:'%s' old:'%s')", d.name, name, old_name }; return Status::OK(); //return defaults.ReuseWritableFile(name, old_name, r, options); } catch(const std::system_error &e) { return error_to_status{e}; } catch(const std::exception &e) { return error_to_status{e}; } rocksdb::Status ircd::db::database::env::NewRandomRWFile(const std::string &name, std::unique_ptr *const result, const EnvOptions &options) noexcept try { const ctx::uninterruptible::nothrow ui; #ifdef RB_DEBUG_DB_ENV log::debug { log, "[%s] new random read/write file '%s' options:%p", d.name, name, &options }; #endif *result = std::make_unique(&d, name, options); return Status::OK(); } catch(const std::system_error &e) { return error_to_status{e}; } catch(const std::exception &e) { return error_to_status{e}; } rocksdb::Status ircd::db::database::env::NewDirectory(const std::string &name, std::unique_ptr *const result) noexcept try { const ctx::uninterruptible::nothrow ui; #ifdef RB_DEBUG_DB_ENV log::debug { log, "[%s] new directory '%s'", d.name, name }; #endif std::unique_ptr defaults; const auto ret { this->defaults.NewDirectory(name, &defaults) }; *result = std::make_unique(&d, name, std::move(defaults)); return ret; } catch(const std::system_error &e) { return error_to_status{e}; } catch(const std::exception &e) { return error_to_status{e}; } rocksdb::Status ircd::db::database::env::FileExists(const std::string &f) noexcept try { const ctx::uninterruptible::nothrow ui; #ifdef RB_DEBUG_DB_ENV log::debug { log, "[%s] file exists '%s'", d.name, f }; #endif return defaults.FileExists(f); } catch(const std::system_error &e) { return error_to_status{e}; } catch(const std::exception &e) { return error_to_status{e}; } rocksdb::Status ircd::db::database::env::GetChildren(const std::string &dir, std::vector *const r) noexcept try { const ctx::uninterruptible::nothrow ui; #ifdef RB_DEBUG_DB_ENV log::debug { log, "[%s] get children of directory '%s'", d.name, dir }; #endif return defaults.GetChildren(dir, r); } catch(const std::system_error &e) { return error_to_status{e}; } catch(const std::exception &e) { return error_to_status{e}; } rocksdb::Status ircd::db::database::env::GetChildrenFileAttributes(const std::string &dir, std::vector *const result) noexcept try { const ctx::uninterruptible::nothrow ui; #ifdef RB_DEBUG_DB_ENV log::debug { log, "[%s] get children file attributes of directory '%s'", d.name, dir }; #endif return defaults.GetChildrenFileAttributes(dir, result); } catch(const std::system_error &e) { return error_to_status{e}; } catch(const std::exception &e) { return error_to_status{e}; } rocksdb::Status ircd::db::database::env::DeleteFile(const std::string &name) noexcept try { const ctx::uninterruptible::nothrow ui; #ifdef RB_DEBUG_DB_ENV log::debug { log, "[%s] delete file '%s'", d.name, name }; #endif return defaults.DeleteFile(name); } catch(const std::system_error &e) { return error_to_status{e}; } catch(const std::exception &e) { return error_to_status{e}; } rocksdb::Status ircd::db::database::env::CreateDir(const std::string &name) noexcept try { const ctx::uninterruptible::nothrow ui; #ifdef RB_DEBUG_DB_ENV log::debug { log, "[%s] create directory '%s'", d.name, name }; #endif return defaults.CreateDir(name); } catch(const std::system_error &e) { return error_to_status{e}; } catch(const std::exception &e) { return error_to_status{e}; } rocksdb::Status 
ircd::db::database::env::CreateDirIfMissing(const std::string &name) noexcept try { const ctx::uninterruptible::nothrow ui; #ifdef RB_DEBUG_DB_ENV log::debug { log, "[%s] create directory if missing '%s'", d.name, name }; #endif return defaults.CreateDirIfMissing(name); } catch(const std::system_error &e) { return error_to_status{e}; } catch(const std::exception &e) { return error_to_status{e}; } rocksdb::Status ircd::db::database::env::DeleteDir(const std::string &name) noexcept try { const ctx::uninterruptible::nothrow ui; #ifdef RB_DEBUG_DB_ENV log::debug { log, "[%s] delete directory '%s'", d.name, name }; #endif return defaults.DeleteDir(name); } catch(const std::system_error &e) { return error_to_status{e}; } catch(const std::exception &e) { return error_to_status{e}; } rocksdb::Status ircd::db::database::env::GetFileSize(const std::string &name, uint64_t *const s) noexcept try { const ctx::uninterruptible::nothrow ui; #ifdef RB_DEBUG_DB_ENV log::debug { log, "[%s] get file size '%s'", d.name, name }; #endif assert(s); *s = fs::size(name); return Status::OK(); } catch(const std::system_error &e) { return error_to_status{e}; } catch(const std::exception &e) { return error_to_status{e}; } rocksdb::Status ircd::db::database::env::GetFileModificationTime(const std::string &name, uint64_t *const file_mtime) noexcept try { const ctx::uninterruptible::nothrow ui; #ifdef RB_DEBUG_DB_ENV log::debug { log, "[%s] get file mtime '%s'", d.name, name }; #endif return defaults.GetFileModificationTime(name, file_mtime); } catch(const std::system_error &e) { return error_to_status{e}; } catch(const std::exception &e) { return error_to_status{e}; } rocksdb::Status ircd::db::database::env::RenameFile(const std::string &s, const std::string &t) noexcept try { const ctx::uninterruptible::nothrow ui; #ifdef RB_DEBUG_DB_ENV log::debug { log, "[%s] rename file '%s' to '%s'", d.name, s, t }; #endif return defaults.RenameFile(s, t); } catch(const std::system_error &e) { return error_to_status{e}; } catch(const std::exception &e) { return error_to_status{e}; } rocksdb::Status ircd::db::database::env::LinkFile(const std::string &s, const std::string &t) noexcept try { const ctx::uninterruptible::nothrow ui; #ifdef RB_DEBUG_DB_ENV log::debug { log, "[%s] link file '%s' to '%s'", d.name, s, t }; #endif return defaults.LinkFile(s, t); } catch(const std::system_error &e) { return error_to_status{e}; } catch(const std::exception &e) { return error_to_status{e}; } rocksdb::Status ircd::db::database::env::LockFile(const std::string &name, FileLock** l) noexcept try { const ctx::uninterruptible::nothrow ui; #ifdef RB_DEBUG_DB_ENV log::debug { log, "[%s] lock file '%s'", d.name, name }; #endif return defaults.LockFile(name, l); } catch(const std::system_error &e) { return error_to_status{e}; } catch(const std::exception &e) { return error_to_status{e}; } rocksdb::Status ircd::db::database::env::UnlockFile(FileLock *const l) noexcept try { const ctx::uninterruptible::nothrow ui; #ifdef RB_DEBUG_DB_ENV log::debug { log, "[%s] unlock file lock:%p", d.name, l }; #endif return defaults.UnlockFile(l); } catch(const std::system_error &e) { return error_to_status{e}; } catch(const std::exception &e) { return error_to_status{e}; } rocksdb::Status ircd::db::database::env::GetTestDirectory(std::string *const path) noexcept try { const ctx::uninterruptible::nothrow ui; return defaults.GetTestDirectory(path); } catch(const std::system_error &e) { return error_to_status{e}; } catch(const std::exception &e) { return 
error_to_status{e}; } rocksdb::Status ircd::db::database::env::GetAbsolutePath(const std::string &db_path, std::string *const output_path) noexcept try { const ctx::uninterruptible::nothrow ui; #ifdef RB_DEBUG_DB_ENV log::debug { log, "[%s] get absolute path from '%s' ret:%p", d.name, db_path, output_path }; #endif return defaults.GetAbsolutePath(db_path, output_path); } catch(const std::system_error &e) { return error_to_status{e}; } catch(const std::exception &e) { return error_to_status{e}; } rocksdb::Status ircd::db::database::env::NewLogger(const std::string &name, std::shared_ptr *const result) noexcept try { const ctx::uninterruptible::nothrow ui; #ifdef RB_DEBUG_DB_ENV log::debug { log, "[%s] new logger '%s' result:%p", d.name, name, (const void *)result }; #endif return defaults.NewLogger(name, result); } catch(const std::exception &e) { return error_to_status{e}; } rocksdb::Status ircd::db::database::env::GetHostName(char *const name, uint64_t len) noexcept try { const ctx::uninterruptible::nothrow ui; #ifdef RB_DEBUG_DB_ENV log::debug { log, "[%s] get host name name:%p len:%lu", d.name, name, len }; #endif return defaults.GetHostName(name, len); } catch(const std::exception &e) { return error_to_status{e}; } [[gnu::hot]] uint64_t ircd::db::database::env::NowMicros() noexcept { return ircd::time(); } /// Bug in rocksdb causes this to not be called all the time, specifically /// for perf events. [[gnu::hot]] uint64_t ircd::db::database::env::NowNanos() noexcept { return ircd::now().count(); } rocksdb::Status ircd::db::database::env::GetCurrentTime(int64_t *const unix_time) noexcept try { return defaults.GetCurrentTime(unix_time); } catch(const std::exception &e) { return error_to_status{e}; } std::string ircd::db::database::env::TimeToString(uint64_t time) noexcept try { return defaults.TimeToString(time); } catch(const std::exception &e) { throw panic { "[%s] time to string :%s", d.name, e.what() }; } void ircd::db::database::env::SleepForMicroseconds(int micros) noexcept try { const ctx::uninterruptible::nothrow ui; #ifdef RB_DEBUG_DB_ENV log::debug { "[%s] sleep for %d microseconds", d.name, micros }; #endif ctx::sleep(microseconds(micros)); } catch(const std::exception &e) { log::critical { log, "[%s] sleep micros:%d :%s", d.name, micros, e.what() }; } void ircd::db::database::env::Schedule(void (*f)(void* arg), void *const a, Priority prio, void *const tag, void (*u)(void* arg)) noexcept try { const ctx::uninterruptible::nothrow ui; //#ifdef RB_DEBUG_DB_ENV log::debug { log, "[%s] schedule func:%p a:%p tag:%p u:%p prio:%s", d.name, f, a, tag, u, reflect(prio) }; //#endif assert(st); auto &pool { *st->pool.at(prio) }; pool(state::task { f, u, a }); } catch(const std::exception &e) { log::critical { log, "[%s] schedule func:%p a:%p tag:%p u:%p prio:%s", d.name, f, a, tag, u, reflect(prio) }; } int ircd::db::database::env::UnSchedule(void *const tag, const Priority prio) noexcept try { const ctx::uninterruptible::nothrow ui; //#ifdef RB_DEBUG_DB_ENV log::debug { log, "[%s] unschedule tag:%p prio:%s", d.name, tag, reflect(prio) }; //#endif assert(st); auto &pool { *st->pool.at(prio) }; return pool.cancel(tag); } catch(const std::exception &e) { log::critical { log, "[%s] unschedule tag:%p prio:%s :%s", d.name, tag, reflect(prio), e.what() }; return 0; } void ircd::db::database::env::StartThread(void (*f)(void*), void *const a) noexcept try { const ctx::uninterruptible::nothrow ui; //#ifdef RB_DEBUG_DB_ENV log::debug { log, "[%s] start thread func:%p a:%p", d.name, f, a }; 
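//
// Schedule() queues RocksDB background work (flush and compaction jobs) on
// the priority-mapped ctx pools held by state, and UnSchedule(tag) cancels
// any still-queued work registered under that tag. The guarded sketch below
// models only the caller-visible contract with a toy tag-indexed queue; the
// names toy_task and toy_pool are invented and the real behavior lives in
// ircd's ctx pool, not here.
//
#if 0 // illustrative sketch only; excluded from the build
#include <deque>

// Queued entries remember their unschedule callback and tag; cancel() runs
// the unschedule callback for matching entries and reports how many were
// removed, mirroring the int return of Env::UnSchedule().
struct toy_task
{
	void (*func)(void *);      // work function passed to Schedule()
	void (*unsched)(void *);   // cleanup run if the task is cancelled
	void *arg;                 // user argument for both callbacks
	void *tag;                 // cancellation key used by UnSchedule()
};

struct toy_pool
{
	std::deque<toy_task> tasks;

	void schedule(const toy_task &task)
	{
		tasks.push_back(task);
	}

	int cancel(void *const tag)
	{
		int ret(0);
		for(auto it(tasks.begin()); it != tasks.end(); )
			if(it->tag == tag)
			{
				if(it->unsched)
					it->unsched(it->arg);

				it = tasks.erase(it);
				++ret;
			}
			else ++it;

		return ret;
	}
};
#endif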
//#endif throw ircd::not_implemented { "Independent (non-pool) context spawning not yet implemented" }; } catch(const std::exception &e) { log::critical { log, "[%s] start thread :%s", d.name, e.what() }; } void ircd::db::database::env::WaitForJoin() noexcept try { const ctx::uninterruptible::nothrow ui; #ifdef RB_DEBUG_DB_ENV log::debug { log, "[%s] wait for all ctx to join", d.name }; #endif assert(st); for(auto &pool : st->pool) if(pool) pool->join(); } catch(const std::exception &e) { log::critical { log, "[%s] wait for join :%s", d.name, e.what() }; } unsigned int ircd::db::database::env::GetThreadPoolQueueLen(Priority prio) const noexcept try { const ctx::uninterruptible::nothrow ui; #ifdef RB_DEBUG_DB_ENV log::debug { log, "[%s] get thread pool queue len prio:%s", d.name, reflect(prio) }; #endif assert(st); const auto &pool { *st->pool.at(prio) }; return pool.tasks.size(); } catch(const std::exception &e) { throw panic { "[%s] set background threads :%s", d.name, e.what() }; } void ircd::db::database::env::SetBackgroundThreads(int num, Priority prio) noexcept try { const ctx::uninterruptible::nothrow ui; #ifdef RB_DEBUG_DB_ENV log::debug { log, "[%s] set background threads prio:%s num:%d", d.name, reflect(prio), num }; #endif assert(st); auto &pool { *st->pool.at(prio) }; pool.p.set(num); } catch(const std::exception &e) { log::critical { log, "[%s] set background threads prio:%s num:%d :%s", d.name, reflect(prio), num, e.what() }; } void ircd::db::database::env::IncBackgroundThreadsIfNeeded(int num, Priority prio) noexcept try { const ctx::uninterruptible::nothrow ui; #ifdef RB_DEBUG_DB_ENV log::debug { log, "[%s] increase background threads num:%d prio:%s", d.name, num, reflect(prio) }; #endif assert(st); auto &pool { *st->pool.at(prio) }; pool.p.add(num); } catch(const std::exception &e) { log::critical { log, "[%s] inc background threads num:%d prio:%s :%s", d.name, num, reflect(prio), e.what() }; } void ircd::db::database::env::LowerThreadPoolIOPriority(Priority prio) noexcept try { const ctx::uninterruptible::nothrow ui; #ifdef RB_DEBUG_DB_ENV log::debug { log, "[%s] lower thread pool priority prio:%s", d.name, reflect(prio) }; #endif assert(st); auto &pool { *st->pool.at(prio) }; switch(pool.iopri) { case IOPriority::IO_HIGH: pool.iopri = IOPriority::IO_LOW; break; default: break; } } catch(const std::exception &e) { log::critical { log, "[%s] lower thread pool IO priority pool:%s :%s", d.name, reflect(prio), e.what() }; } rocksdb::Status ircd::db::database::env::GetThreadList(std::vector *const list) noexcept try { const ctx::uninterruptible::nothrow ui; #ifdef RB_DEBUG_DB_ENV log::debug { log, "[%s] get thread list %p (%zu)", d.name, list, list? 
list->size() : 0UL }; #endif throw ircd::not_implemented { "[%s] GetThreadList()", d.name }; return defaults.GetThreadList(list); } catch(const std::exception &e) { log::critical { log, "[%s] get thread list:%p :%s", d.name, list, e.what() }; return error_to_status{e}; } rocksdb::ThreadStatusUpdater * ircd::db::database::env::GetThreadStatusUpdater() const noexcept try { const ctx::uninterruptible::nothrow ui; #ifdef RB_DEBUG_DB_ENV log::debug { log, "[%s] get thread status updater", d.name, }; #endif return defaults.GetThreadStatusUpdater(); } catch(const std::exception &e) { log::critical { log, "[%s] get thread status updater :%s", d.name, e.what() }; return nullptr; } uint64_t ircd::db::database::env::GetThreadID() const noexcept try { const ctx::uninterruptible::nothrow ui; #ifdef RB_DEBUG_DB_ENV log::debug { log, "[%s] get thread ID", d.name, }; #endif return ctx::this_ctx::id(); } catch(const std::exception &e) { throw panic { "[%s] get thread id :%s", d.name, e.what() }; } int ircd::db::database::env::GetBackgroundThreads(Priority prio) noexcept try { const ctx::uninterruptible::nothrow ui; #ifdef RB_DEBUG_DB_ENV log::debug { log, "[%s] get background threads prio:%s", d.name, reflect(prio) }; #endif assert(st); const auto &pool { *st->pool.at(prio) }; return pool.p.size(); } catch(const std::exception &e) { log::critical { log, "[%s] get background threads prio:%s :%s", d.name, reflect(prio), e.what() }; return 0; } int8_t ircd::db::database::env::make_nice(const Priority &prio) { switch(prio) { case Priority::HIGH: return -5; case Priority::LOW: return 5; case Priority::BOTTOM: return 20; default: return 0; } } int8_t ircd::db::database::env::make_nice(const IOPriority &prio) { switch(prio) { case IOPriority::IO_HIGH: return -5; case IOPriority::IO_LOW: return 5; default: return 0; } } // // writable_file // ircd::db::database::env::writable_file::writable_file(database *const &d, const std::string &name, const EnvOptions &env_opts, const bool &trunc) try :d { *d } ,env_opts { env_opts } ,opts { .mode = std::ios::out | (trunc? std::ios::trunc : std::ios::openmode(0)), .direct = this->env_opts.use_direct_writes, .cloexec = this->env_opts.set_fd_cloexec, .dontneed = true, } ,ionice { ctx::ionice(ctx::cur()) } ,fd { name, this->opts } ,preallocation_block_size { ircd::info::page_size } { #ifdef RB_DEBUG_DB_ENV log::debug { log, "[%s] opened wfile:%p fd:%d '%s'", d->name, this, int(fd), name }; #endif // Workaround a RocksDB bug which doesn't propagate EnvOptions properly // on some constructions of WritableFile early on during db open. We'll // get an env_opts.allow_fallocate==true here while it should be false // from the DBOptions at d->opts. We use &= so it's not set to true when // the caller specifically wants it false just for them. 
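//
// The &= in the statement that follows can only ever clear allow_fallocate;
// it can never switch it back on. The guarded sketch below spells out that
// truth table with plain bools; the variable names are invented for the
// illustration.
//
#if 0 // illustrative sketch only; excluded from the build
#include <cassert>

// a &= b on bools leaves a true only when both sides were already true, so a
// false coming from either the DBOptions or the caller's EnvOptions always
// wins and a stray true can never override it.
static void
allow_fallocate_masking()
{
	bool env_allow(true), db_allow(false);
	env_allow &= db_allow;
	assert(env_allow == false);   // DBOptions said no; stays no

	env_allow = false; db_allow = true;
	env_allow &= db_allow;
	assert(env_allow == false);   // caller said no for this file; stays no

	env_allow = true; db_allow = true;
	env_allow &= db_allow;
	assert(env_allow == true);    // both agree; fallocate stays enabled
}
#endif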
assert(d && d->opts); this->env_opts.allow_fallocate &= d->opts->allow_fallocate; //assert(env_opts.allow_fallocate == d->opts->allow_fallocate); } catch(const std::exception &e) { log::error { log, "[%s] opening wfile:%p `%s' :%s", d->name, this, name, e.what() }; } ircd::db::database::env::writable_file::~writable_file() noexcept { Close(); #ifdef RB_DEBUG_DB_ENV log::debug { log, "[%s] closed wfile:%p fd:%d", d.name, this, int(fd) }; #endif } rocksdb::Status ircd::db::database::env::writable_file::Close() noexcept try { const ctx::uninterruptible::nothrow ui; const std::lock_guard lock{mutex}; if(!fd) return Status::OK(); #ifdef RB_DEBUG_DB_ENV log::debug { log, "[%s] wfile:%p fd:%d close", d.name, this, int(fd) }; #endif assert(logical_size == -1UL || logical_size == fs::size(fd)); fd = fs::fd{}; return Status::OK(); } catch(const std::system_error &e) { log::error { log, "[%s] wfile:%p close :%s", d.name, this, e.what() }; return error_to_status{e}; } catch(const std::exception &e) { log::error { log, "[%s] wfile:%p close :%s", d.name, this, e.what() }; return error_to_status{e}; } rocksdb::Status ircd::db::database::env::writable_file::Flush() noexcept try { const ctx::uninterruptible::nothrow ui; const std::lock_guard lock{mutex}; #ifdef RB_DEBUG_DB_ENV log::debug { log, "[%s] wfile:%p fd:%d flush", d.name, this, int(fd), }; #endif fs::sync_opts opts; opts.metadata = false; opts.priority = ionice; opts.nodelay = nodelay; fs::flush(fd, opts); return Status::OK(); } catch(const std::system_error &e) { log::error { log, "[%s] wfile:%p fd:%d flush :%s", d.name, this, int(fd), e.what() }; return error_to_status{e}; } catch(const std::exception &e) { log::error { log, "[%s] wfile:%p fd:%d flush :%s", d.name, this, int(fd), e.what() }; return error_to_status{e}; } rocksdb::Status ircd::db::database::env::writable_file::Sync() noexcept try { const ctx::uninterruptible::nothrow ui; const std::lock_guard lock{mutex}; #ifdef RB_DEBUG_DB_ENV log::debug { log, "[%s] wfile:%p fd:%d sync", d.name, this, int(fd), }; #endif fs::sync_opts opts; opts.priority = ionice; opts.nodelay = true; fs::sync(fd, opts); return Status::OK(); } catch(const std::system_error &e) { log::error { log, "[%s] wfile:%p fd:%d sync :%s", d.name, this, int(fd), e.what(), }; return error_to_status{e}; } catch(const std::exception &e) { log::error { log, "[%s] wfile:%p fd:%d sync :%s", d.name, this, int(fd), e.what(), }; return error_to_status{e}; } rocksdb::Status ircd::db::database::env::writable_file::Fsync() noexcept try { const ctx::uninterruptible::nothrow ui; const std::lock_guard lock{mutex}; #ifdef RB_DEBUG_DB_ENV log::debug { log, "[%s] wfile:%p fd:%d fsync", d.name, this, int(fd), }; #endif fs::sync_opts opts; opts.priority = ionice; opts.nodelay = true; fs::flush(fd, opts); return Status::OK(); } catch(const std::system_error &e) { log::error { log, "[%s] wfile:%p fd:%d fsync :%s", d.name, this, int(fd), e.what(), }; return error_to_status{e}; } catch(const std::exception &e) { log::error { log, "[%s] wfile:%p fd:%d fsync :%s", d.name, this, int(fd), e.what(), }; return error_to_status{e}; } rocksdb::Status ircd::db::database::env::writable_file::RangeSync(uint64_t offset, uint64_t length) noexcept try { const ctx::uninterruptible::nothrow ui; const std::lock_guard lock{mutex}; #ifdef RB_DEBUG_DB_ENV log::debug { "[%s] wfile:%p fd:%d range sync offset:%lu length:%lu", d.name, this, int(fd), offset, length }; #endif // RocksDB sez they want us to initiate flushing of dirty pages // asynchronously without waiting 
for completion. RocksDB allows // this callback to be a no-op and do nothing at all. // // We plug this into a "range flush" gimmick in ircd::fs which almost // certainly calls fdatasync() and ignores the range; it may one day // on supporting platforms and in certain circumstances call // sync_file_range() without any of the wait flags and respect the range. fs::sync_opts opts; opts.metadata = false; opts.priority = ionice; opts.nodelay = nodelay; fs::flush(fd, offset, length, opts); return Status::OK(); } catch(const std::system_error &e) { log::error { log, "[%s] wfile:%p fd:%d range sync offset:%zu length:%zu :%s", d.name, this, int(fd), offset, length, e.what() }; return error_to_status{e}; } catch(const std::exception &e) { log::critical { log, "[%s] wfile:%p fd:%d range sync offset:%zu length:%zu :%s", d.name, this, int(fd), offset, length, e.what() }; return error_to_status{e}; } rocksdb::Status ircd::db::database::env::writable_file::Truncate(uint64_t size) noexcept try { const ctx::uninterruptible::nothrow ui; const std::lock_guard lock{mutex}; #ifdef RB_DEBUG_DB_ENV log::debug { "[%s] wfile:%p fd:%d truncate to %lu bytes", d.name, this, int(fd), size }; #endif fs::write_opts wopts; wopts.priority = ionice; wopts.nodelay = nodelay; wopts.interruptible = false; fs::truncate(fd, size, wopts); logical_size = size; return Status::OK(); } catch(const std::system_error &e) { log::error { log, "[%s] wfile:%p fd:%d truncate to %lu bytes :%s", d.name, this, int(fd), size, e.what() }; return error_to_status{e}; } catch(const std::exception &e) { log::critical { log, "[%s] wfile:%p fd:%d truncate to %lu bytes :%s", d.name, this, int(fd), size, e.what() }; return error_to_status{e}; } rocksdb::Status ircd::db::database::env::writable_file::InvalidateCache(size_t offset, size_t length) noexcept try { const ctx::uninterruptible::nothrow ui; const std::lock_guard lock{mutex}; #ifdef RB_DEBUG_DB_ENV log::debug { log, "[%s] wfile:%p fd:%d invalidate cache offset:%zu length:%zu", d.name, this, int(fd), offset, length }; #endif if(likely(logical_size != -1UL) && offset + length > logical_size) logical_size = -1UL; if(opts.direct) return Status::OK(); fs::evict(fd, length, offset); return Status::OK(); } catch(const std::system_error &e) { log::error { log, "[%s] wfile:%p fd:%d invalidate cache offset:%zu length:%zu", d.name, this, int(fd), offset, length }; return error_to_status{e}; } catch(const std::exception &e) { log::critical { log, "[%s] wfile:%p fd:%d invalidate cache offset:%zu length:%zu", d.name, this, int(fd), offset, length }; return error_to_status{e}; } rocksdb::Status ircd::db::database::env::writable_file::Append(const Slice &s) noexcept try { assert(!opts.direct); const ctx::uninterruptible::nothrow ui; const std::lock_guard lock{mutex}; #ifdef RB_DEBUG_DB_ENV log::debug { log, "[%s] wfile:%p fd:%d append:%p bytes:%zu", d.name, this, int(fd), data(s), size(s), }; #endif fs::write_opts wopts; wopts.priority = ionice; wopts.nodelay = nodelay; wopts.interruptible = false; const const_buffer buf { data(s), size(s) }; const const_buffer appended { fs::append(fd, buf, wopts) }; if(likely(logical_size != -1UL)) logical_size += size(appended); return Status::OK(); } catch(const std::system_error &e) { log::error { log, "[%s] wfile:%p fd:%d append:%p size:%zu :%s", d.name, this, int(fd), data(s), size(s), e.what() }; return error_to_status{e}; } catch(const std::exception &e) { log::critical { log, "[%s] wfile:%p fd:%d append:%p size:%zu :%s", d.name, this, int(fd), data(s), size(s), 
e.what() }; return error_to_status{e}; } rocksdb::Status ircd::db::database::env::writable_file::PositionedAppend(const Slice &s, uint64_t offset) noexcept try { assert(!opts.direct); const ctx::uninterruptible::nothrow ui; const std::lock_guard lock{mutex}; #ifdef RB_DEBUG_DB_ENV log::debug { "[%s] wfile:%p fd:%d append:%p bytes:%zu offset:%lu", d.name, this, int(fd), data(s), size(s), offset }; #endif fs::write_opts wopts; wopts.priority = ionice; wopts.nodelay = nodelay; wopts.offset = offset; wopts.interruptible = false; const const_buffer buf { data(s), size(s) }; const const_buffer appended { fs::append(fd, buf, wopts) }; const auto append_break { offset + size(appended) }; if(likely(logical_size != -1UL) && append_break > logical_size) logical_size = append_break; return Status::OK(); } catch(const std::system_error &e) { log::error { log, "[%s] wfile:%p fd:%d append:%p size:%zu offset:%zu :%s", d.name, this, int(fd), data(s), size(s), offset, e.what() }; return error_to_status{e}; } catch(const std::exception &e) { log::critical { log, "[%s] wfile:%p fd:%d append:%p size:%zu offset:%lu :%s", d.name, this, int(fd), data(s), size(s), offset, e.what() }; return error_to_status{e}; } rocksdb::Status ircd::db::database::env::writable_file::Allocate(uint64_t offset, uint64_t length) noexcept try { const ctx::uninterruptible::nothrow ui; const std::lock_guard lock{mutex}; #ifdef RB_DEBUG_DB_ENV log::debug { log, "[%s] wfile:%p fd:%d allocate offset:%lu length:%lu%s%s", d.name, this, int(fd), offset, length, env_opts.fallocate_with_keep_size? " KEEP_SIZE" : "", env_opts.allow_fallocate? "" : " (DISABLED)" }; #endif if(!env_opts.allow_fallocate) return Status::NotSupported(); _allocate(offset, length); return Status::OK(); } catch(const std::system_error &e) { log::error { log, "[%s] wfile:%p fd:%d allocate offset:%zu length:%zu :%s", d.name, this, int(fd), offset, length, e.what() }; return error_to_status{e}; } catch(const std::exception &e) { log::critical { log, "[%s] wfile:%p fd:%d allocate offset:%zu length:%zu :%s", d.name, this, int(fd), offset, length, e.what() }; return error_to_status{e}; } void ircd::db::database::env::writable_file::PrepareWrite(size_t offset, size_t length) noexcept { const ctx::uninterruptible::nothrow ui; const std::lock_guard lock{mutex}; #ifdef RB_DEBUG_DB_ENV log::debug { log, "[%s] wfile:%p prepare write offset:%zu length:%zu", d.name, this, offset, length }; #endif if(!env_opts.allow_fallocate) return; _allocate(offset, length); } void ircd::db::database::env::writable_file::_allocate(const size_t &offset, const size_t &length) { const size_t first_block { offset / preallocation_block_size }; const size_t last_block { (offset + length) / preallocation_block_size }; const ssize_t missing_blocks { ssize_t(last_block) - preallocation_last_block }; // Fast bail when the offset and length are behind the last block already // allocated. We don't support windowing here. If this branch is not taken // we'll fallocate() contiguously from the last fallocate() (or offset 0). 
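//
// The arithmetic that follows works out how many whole preallocation blocks
// lie beyond the last block already fallocate()'d and extends the allocation
// contiguously from there. The guarded sketch below walks the same math with
// assumed numbers (4 KiB blocks, blocks 0..3 already allocated).
//
#if 0 // illustrative sketch only; excluded from the build
#include <sys/types.h>
#include <cassert>
#include <cstddef>

static void
preallocation_window_example()
{
	const size_t block_size(4096);             // assumed preallocation_block_size
	ssize_t last_allocated_block(3);           // blocks 0..3 already fallocate()'d

	const size_t offset(20000), length(1000);  // incoming write window
	const size_t last_block((offset + length) / block_size);            // = 5
	const ssize_t missing(ssize_t(last_block) - last_allocated_block);  // = 2

	if(missing <= 0)
		return;                                // fast bail; nothing new to allocate

	const ssize_t start_block(last_allocated_block + 1);                // = 4
	const size_t allocate_offset(start_block * block_size);             // = 16384
	const size_t allocate_length(missing * block_size);                 // = 8192

	assert(allocate_offset == 16384);
	assert(allocate_length == 8192);

	// An fallocate() of allocate_length bytes at allocate_offset extends the
	// file contiguously from the end of the last preallocated block.
	last_allocated_block = last_block;
	assert(last_allocated_block == 5);
}
#endif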
if(missing_blocks <= 0) return; const ssize_t start_block { preallocation_last_block + 1 }; const size_t allocate_offset { start_block * preallocation_block_size }; const size_t allocate_length { missing_blocks * preallocation_block_size }; fs::write_opts wopts; wopts.offset = allocate_offset; wopts.priority = ionice; wopts.nodelay = nodelay; wopts.keep_size = env_opts.fallocate_with_keep_size; #ifdef RB_DEBUG_DB_ENV log::debug { log, "[%s] wfile:%p fd:%d allocating %zd blocks after block:%zu offset:%lu length:%lu%s", d.name, this, int(fd), missing_blocks, start_block, allocate_offset, allocate_length, wopts.keep_size? " KEEP_SIZE" : "" }; #endif assert(env_opts.allow_fallocate); assert(bool(d.opts)); assert(d.opts->allow_fallocate); fs::allocate(fd, allocate_length, wopts); this->preallocation_last_block = last_block; } void ircd::db::database::env::writable_file::GetPreallocationStatus(size_t *const block_size, size_t *const last_allocated_block) noexcept { const ctx::uninterruptible::nothrow ui; const std::lock_guard lock{mutex}; *block_size = this->preallocation_block_size; *last_allocated_block = this->preallocation_last_block; #ifdef RB_DEBUG_DB_ENV log::debug { log, "[%s] wfile:%p get preallocation block_size(%p):%zu last_block(%p):%zu", d.name, this, block_size, *block_size, last_allocated_block, *last_allocated_block }; #endif } void ircd::db::database::env::writable_file::SetPreallocationBlockSize(size_t size) noexcept { const ctx::uninterruptible::nothrow ui; const std::lock_guard lock{mutex}; #ifdef RB_DEBUG_DB_ENV log::debug { log, "[%s] wfile:%p set preallocation block size:%zu", d.name, this, size }; #endif this->preallocation_block_size = size; } uint64_t ircd::db::database::env::writable_file::GetFileSize() noexcept try { const ctx::uninterruptible::nothrow ui; const std::lock_guard lock{mutex}; #ifdef RB_DEBUG_DB_ENV log::debug { log, "[%s] wfile:%p fd:%d get file size; cached:%zd", d.name, this, int(fd), logical_size, }; #endif if(logical_size == -1UL) logical_size = fs::size(fd); return logical_size; } catch(const std::exception &e) { log::critical { log, "[%s] wfile:%p fd:%d get file size :%s", d.name, this, int(fd), e.what() }; return 0; } void ircd::db::database::env::writable_file::SetIOPriority(Env::IOPriority prio) noexcept { #ifdef RB_DEBUG_DB_ENV log::debug { log, "[%s] wfile:%p IO priority %s", d.name, this, reflect(prio) }; #endif this->prio = prio; this->ionice = make_nice(prio); switch(this->prio) { case IOPriority::IO_HIGH: nodelay = true; break; default: case IOPriority::IO_LOW: nodelay = false; break; } } rocksdb::Env::IOPriority ircd::db::database::env::writable_file::GetIOPriority() noexcept { return prio; } void ircd::db::database::env::writable_file::SetWriteLifeTimeHint(WriteLifeTimeHint hint) noexcept try { #ifdef RB_DEBUG_DB_ENV log::debug { log, "[%s] wfile:%p hint:%d %s", d.name, this, int(hint), reflect(hint), }; #endif this->hint = hint; fs::write_life(fd, this->hint); } catch(const std::system_error &e) { log::derror { log, "[%s] wfile:%p fd:%d setting write lifetime hint:%d %s :%s", d.name, this, int(fd), int(hint), reflect(hint), e.what(), }; } rocksdb::Env::WriteLifeTimeHint ircd::db::database::env::writable_file::GetWriteLifeTimeHint() noexcept { return hint; } size_t ircd::db::database::env::writable_file::GetUniqueId(char *const id, size_t max_size) const noexcept try { const ctx::uninterruptible::nothrow ui; #ifdef RB_DEBUG_DB_ENV log::debug { "[%s] wfile:%p get unique id:%p max_size:%zu", d.name, this, id, max_size }; #endif const 
mutable_buffer buf { id, max_size }; //return size(fs::uuid(fd, buf)); return 0; } catch(const std::exception &e) { log::critical { log, "[%s] wfile:%p get unique id :%s", d.name, this, e.what() }; return 0; } bool ircd::db::database::env::writable_file::IsSyncThreadSafe() const noexcept try { return true; } catch(...) { return false; } // // writable_file_direct // ircd::db::database::env::writable_file_direct::writable_file_direct(database *const &d, const std::string &name, const EnvOptions &env_opts, const bool &trunc) :writable_file { d, name, env_opts, trunc } ,alignment { fs::block_size(fd) } ,logical_offset { !trunc? fs::size(fd): size_t(0) } ,buffer { alignment, alignment } { zero(buffer); if(!aligned(logical_offset)) throw panic { "direct writable file requires read into buffer." }; } rocksdb::Status ircd::db::database::env::writable_file_direct::Close() noexcept try { const ctx::uninterruptible::nothrow ui; const std::lock_guard lock{mutex}; if(!fd) return Status::OK(); #ifdef RB_DEBUG_DB_ENV log::debug { log, "[%s] wfile:%p DIRECT fd:%d close", d.name, this, int(fd) }; #endif if(logical_offset > 0 && fs::size(fd) != logical_offset) { fs::write_opts wopts; wopts.priority = this->ionice; wopts.nodelay = true; wopts.interruptible = false; fs::truncate(fd, logical_offset, wopts); } fd = fs::fd{}; return Status::OK(); } catch(const std::system_error &e) { log::error { log, "[%s] wfile:%p DIRECT close :%s", d.name, this, e.what() }; return error_to_status{e}; } catch(const std::exception &e) { log::error { log, "[%s] wfile:%p DIRECT close :%s", d.name, this, e.what() }; return error_to_status{e}; } rocksdb::Status ircd::db::database::env::writable_file_direct::Truncate(uint64_t size) noexcept try { const ctx::uninterruptible::nothrow ui; const std::lock_guard lock{mutex}; #ifdef RB_DEBUG_DB_ENV log::debug { "[%s] wfile:%p DIRECT fd:%d truncate to %lu bytes", d.name, this, int(fd), size }; #endif fs::write_opts wopts; wopts.priority = this->ionice; wopts.nodelay = true; wopts.interruptible = false; fs::truncate(fd, size, wopts); logical_offset = size; return Status::OK(); } catch(const std::system_error &e) { log::error { log, "[%s] wfile:%p DIRECT fd:%d truncate to %lu bytes :%s", d.name, this, int(fd), size, e.what() }; return error_to_status{e}; } catch(const std::exception &e) { log::critical { log, "[%s] wfile:%p DIRECT fd:%d truncate to %lu bytes :%s", d.name, this, int(fd), size, e.what() }; return error_to_status{e}; } rocksdb::Status ircd::db::database::env::writable_file_direct::Append(const Slice &s) noexcept try { const ctx::uninterruptible::nothrow ui; const std::lock_guard lock{mutex}; if(!aligned(logical_offset) || !aligned(data(s))) log::dwarning { log, "[%s] ALIGNMENT CHECK fd:%d append:%p%s bytes:%zu%s logical_offset:%zu%s", d.name, int(fd), data(s), aligned(data(s))? "" : "#AC", size(s), aligned(size(s))? "" : "#AC", logical_offset, aligned(logical_offset)? "" : "#AC" }; #ifdef RB_DEBUG_DB_ENV log::debug { log, "[%s] wfile:%p DIRECT fd:%d append:%p%s bytes:%zu%s logical_offset:%zu%s", d.name, this, int(fd), data(s), aligned(data(s))? "" : "#AC", size(s), aligned(size(s))? "" : "#AC", logical_offset, aligned(logical_offset)? 
"" : "#AC" }; #endif const auto logical_check { logical_offset }; const_buffer buf { slice(s) }; while(!empty(buf)) buf = write(buf); assert(logical_check + size(slice(s)) == logical_offset); return Status::OK(); } catch(const std::system_error &e) { log::error { log, "[%s] wfile:%p DIRECT fd:%d append:%p size:%zu :%s", d.name, this, int(fd), data(s), size(s), e.what() }; return error_to_status{e}; } catch(const std::exception &e) { log::critical { log, "[%s] wfile:%p DIRECT fd:%d append:%p size:%zu :%s", d.name, this, int(fd), data(s), size(s), e.what() }; return error_to_status{e}; } rocksdb::Status ircd::db::database::env::writable_file_direct::PositionedAppend(const Slice &s, uint64_t offset) noexcept { const ctx::uninterruptible::nothrow ui; const std::lock_guard lock{mutex}; #ifdef RB_DEBUG_DB_ENV log::debug { log, "[%s] wfile:%p DIRECT fd:%d append:%p%s bytes:%zu%s offset:%zu%s", d.name, this, int(fd), data(s), aligned(data(s))? "" : "#AC", size(s), aligned(size(s))? "" : "#AC", offset, aligned(offset)? "" : "#AC" }; #endif return rocksdb::Status::NotSupported(); } uint64_t ircd::db::database::env::writable_file_direct::GetFileSize() noexcept try { const ctx::uninterruptible::nothrow ui; const std::lock_guard lock{mutex}; #ifdef RB_DEBUG_DB_ENV log::debug { log, "[%s] wfile:%p DIRECT fd:%d get file size", d.name, this, int(fd) }; #endif const auto &ret { logical_offset }; assert(ret <= fs::size(fd)); return ret; } catch(const std::exception &e) { log::critical { log, "[%s] wfile:%p DIRECT fd:%d get file size :%s", d.name, this, int(fd), e.what() }; return 0; } /// (Internal) Append buffer. This function is the internal entry interface /// for appending a buffer of any size and alignment to the file. It is /// internal because it does no locking or error handling back to rocksdb, /// because it's expected to be called from some virtual override which does /// those things. This function will branch off as required to other internal /// write_* functions to properly align and rebuffer the supplied buffer /// eventually culminating in an aligned append to the file. /// /// Calling this function will always result in some write to the file; even /// if temporary buffering is used to achieve alignment; even if the entire /// supplied buffer is hopelessly unaligned: the supplied data will be written /// out some way or another during this call. This means there is no /// requirement to care about flushing the temporary this->buffer after this /// call is made. Note that the temporary this->buffer has no reason to be /// touched by anything other than this function stack. /// /// !!! NOTE !!! /// There is a requirement to truncate the file after this call is made before /// closing the file. If a crash occurs after a write() which was padded out /// to the block alignment: the file size will reflect the padding when it is /// opened at next startup; RocksDB will not detect its terminator character /// sequence and consider this file corrupt. /// !!! /// /// - any offset /// - any data /// - any size ircd::const_buffer ircd::db::database::env::writable_file_direct::write(const const_buffer &buf_) { const_buffer buf { aligned(logical_offset) && aligned(data(buf_))? write_aligned(buf_): !aligned(logical_offset)? write_unaligned_off(buf_): buf_ }; assert(aligned(logical_offset) || empty(buf)); // buf can be empty here if it was entirely dealt with by the above // branches and there's nothing else to do here. if(empty(buf)) return buf; // Branch on whether the buffer's address is aligned. 
If so, considering // the logical_offset is aligned here we are then finished. if(aligned(data(buf))) return write_aligned(buf); return write_unaligned_buf(buf); } /// * aligned offset /// * unaligned data /// - any size ircd::const_buffer ircd::db::database::env::writable_file_direct::write_unaligned_buf(const const_buffer &buf) { assert(aligned(logical_offset)); assert(!aligned(data(buf))); assert(!aligned(buf)); // Window on the data between the given buffer's pointer and the next // alignment boundary. const const_buffer under_buf { buf, std::min(remain(uintptr_t(data(buf))), size(buf)) }; // Window on the data from the alignment boundary to the end of the // given buffer. const const_buffer remaining_buf { buf + size(under_buf) }; assert(size(under_buf) <= size(buf)); assert(size(under_buf) + size(remaining_buf) == size(buf)); assert(data(buf) + size(under_buf) == data(remaining_buf)); assert(aligned(data(remaining_buf)) || empty(remaining_buf)); mutable_buffer dst(this->buffer); consume(dst, copy(dst, under_buf)); consume(dst, copy(dst, remaining_buf)); consume(dst, zero(dst)); assert(empty(dst)); // Flush the temporary buffer. _write__aligned(this->buffer, logical_offset); logical_offset += size(under_buf); return remaining_buf; } /// * unaligned offset /// - any data /// - any size ircd::const_buffer ircd::db::database::env::writable_file_direct::write_unaligned_off(const const_buffer &buf) { assert(!aligned(logical_offset)); // Window on the amount of buf we can take to fill up remaining space in // the temporary this->buffer const const_buffer src { buf, std::min(size(buf), buffer_remain()) }; // Window on the remaining space in the temporary this->buffer. const mutable_buffer dst { this->buffer + buffer_consumed() }; // Window on the remaining space in dst after src is copied to dst, if any. const mutable_buffer pad { dst + size(src) }; assert(size(dst) - size(pad) == size(src)); assert(size(src) + size(pad) == buffer_remain()); assert(size(src) + size(pad) + buffer_consumed() == alignment); assert(size(src) + buffer_consumed() != alignment || empty(pad)); copy(dst, src); zero(pad); // Backtrack the logical_offset to the aligned offset where this->buffer's // data starts. const auto aligned_offset { align(logical_offset) }; // Write the whole temporary this->buffer at the aligned offset. _write__aligned(this->buffer, aligned_offset); logical_offset += size(src); assert(aligned(logical_offset) || size(buf) < alignment); return const_buffer { buf + size(src) }; } /// * aligned offset /// * aligned data /// - any size ircd::const_buffer ircd::db::database::env::writable_file_direct::write_aligned(const const_buffer &buf) { assert(aligned(data(buf))); assert(aligned(logical_offset)); // This portion at the end of buf did not fill out to the alignment. const const_buffer overflow { _write_aligned(buf, logical_offset) }; // The aligned portion was written so the offset is incremented here. 
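//
// Under O_DIRECT the file can only be written in whole alignment-sized
// blocks, so an append is split into an aligned head written in place and a
// sub-block tail staged through this->buffer and padded with zeroes. The
// guarded sketch below works that split with assumed numbers (512-byte
// alignment, 1300-byte append).
//
#if 0 // illustrative sketch only; excluded from the build
#include <cassert>
#include <cstddef>

static void
direct_write_split_example()
{
	const size_t alignment(512);     // assumed fs block size
	const size_t size(1300);         // size of the buffer being appended

	const size_t aligned_head((size / alignment) * alignment);  // = 1024, written directly
	const size_t overflow(size - aligned_head);                 // = 276, staged and padded

	assert(aligned_head == 1024);
	assert(overflow == 276);
	assert(aligned_head % alignment == 0);

	// The overflow is copied into an alignment-sized temporary, the rest of
	// that temporary is zeroed, and the whole block is written out; the file
	// is truncated back to its logical size before close so the padding never
	// survives into the length RocksDB later observes.
	const size_t padded_tail_write(alignment);                  // = 512 bytes on disk
	assert(padded_tail_write >= overflow);
}
#endif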
logical_offset += size(buf) - size(overflow); assert(aligned(logical_offset)); assert(size(overflow) < alignment); assert(aligned(data(overflow)) || empty(overflow)); assert(align(size(buf)) + size(overflow) == size(buf)); assert(blocks(size(buf)) * alignment + size(overflow) == size(buf)); if(!empty(overflow)) { mutable_buffer dst(this->buffer); consume(dst, copy(dst, overflow)); consume(dst, zero(dst)); assert(empty(dst)); _write__aligned(this->buffer, logical_offset); logical_offset += size(overflow); assert(!aligned(logical_offset)); } return {}; } /// * aligned data /// * aligned offset /// - any size ircd::const_buffer ircd::db::database::env::writable_file_direct::_write_aligned(const const_buffer &buf, const uint64_t &offset) { assert(aligned(data(buf))); assert(aligned(offset)); // This portion will be written const const_buffer aligned_buf { buf, blocks(size(buf)) * alignment }; // This trailing portion will be returned to caller const const_buffer ret { buf + size(aligned_buf), size(buf) - size(aligned_buf) }; assert(!empty(aligned_buf) || size(buf) < alignment); assert(size(aligned_buf) + size(ret) == size(buf)); assert(size(ret) < alignment); if(empty(aligned_buf)) { assert(size(ret) == size(buf)); return ret; } _write__aligned(aligned_buf, offset); return ret; } /// * aligned offset /// * aligned data /// * aligned size ircd::const_buffer ircd::db::database::env::writable_file_direct::_write__aligned(const const_buffer &buf, const uint64_t &offset) { #ifdef RB_DEBUG_DB_ENV log::debug { log, "[%s] wfile:%p DIRECT fd:%d write:%p%s bytes:%zu%s offset:%zu%s (logical:%zu)", d.name, this, int(fd), data(buf), aligned(data(buf))? "" : "#AC", size(buf), aligned(size(buf))? "" : "#AC", offset, aligned(offset)? "" : "#AC", logical_offset }; #endif assert(aligned(buf)); assert(aligned(offset)); fs::write_opts wopts; wopts.priority = ionice; wopts.nodelay = nodelay; wopts.offset = offset; wopts.interruptible = false; fs::write(fd, buf, wopts); return {}; } size_t ircd::db::database::env::writable_file_direct::buffer_remain() const { return size(buffer) - buffer_consumed(); } size_t ircd::db::database::env::writable_file_direct::buffer_consumed() const { return likely(alignment != 0)? logical_offset % size(buffer): 0UL; } size_t ircd::db::database::env::writable_file_direct::blocks(const size_t &value) const { return likely(alignment != 0)? value / alignment: 0UL; } size_t ircd::db::database::env::writable_file_direct::remain(const size_t &value) const { return likely(alignment != 0)? alignment - (value - align(value)): 0UL; } size_t ircd::db::database::env::writable_file_direct::align(const size_t &value) const { return likely(alignment != 0)? 
value - (value % alignment): value; } bool ircd::db::database::env::writable_file_direct::aligned(const const_buffer &buf) const { return buffer::aligned(buf, alignment); } bool ircd::db::database::env::writable_file_direct::aligned(const void *const &value) const { return aligned(size_t(value)); } bool ircd::db::database::env::writable_file_direct::aligned(const size_t &value) const { return (alignment == 0) || (value % alignment == 0UL); } // // sequential_file // decltype(ircd::db::database::env::sequential_file::default_opts) ircd::db::database::env::sequential_file::default_opts { .mode = std::ios_base::in, .sequential = true, }; ircd::db::database::env::sequential_file::sequential_file(database *const &d, const std::string &name, const EnvOptions &env_opts) try :d { *d } ,opts{[&env_opts] { fs::fd::opts ret{default_opts}; ret.direct = env_opts.use_direct_reads; return ret; }()} ,fd { name, this->opts } ,_buffer_align { opts.direct? fs::block_size(fd): 1 } ,ionice { ctx::ionice(ctx::cur()) } ,aio { // When this flag is false then AIO operations are never used for this // file; if true, AIO may be used if available and/or other conditions. // Currently the /proc filesystem doesn't like AIO. !startswith(name, "/proc/") } { #ifdef RB_DEBUG_DB_ENV log::debug { log, "[%s] opened seqfile:%p fd:%d bs:%zu '%s'", d->name, this, int(fd), _buffer_align, name }; #endif } catch(const std::system_error &e) { // Set the level to downplay some errors which the user shouldn't // be alerted to with a log message under normal operations. const log::level level { is(e.code(), std::errc::no_such_file_or_directory)? log::level::DERROR: log::level::ERROR }; log::logf { log, level, "[%s] opening seqfile:%p `%s' (%d) :%s", d->name, this, name, e.code().value(), e.what() }; } catch(const std::exception &e) { log::error { log, "[%s] opening seqfile:%p `%s' :%s", d->name, this, name, e.what() }; } ircd::db::database::env::sequential_file::~sequential_file() noexcept { #ifdef RB_DEBUG_DB_ENV log::debug { log, "[%s] close seqfile:%p fd:%d", d.name, this, int(fd) }; #endif } rocksdb::Status ircd::db::database::env::sequential_file::Read(size_t length, Slice *const result, char *const scratch) noexcept try { const ctx::uninterruptible::nothrow ui; const std::unique_lock lock { mutex, std::try_to_lock }; // RocksDB sez that this call requires "External synchronization" i.e the // caller, not this class is responsible for exclusion. We assert anyway. 
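//
// Read() takes the mutex with try_to_lock purely to detect a caller breaking
// RocksDB's "external synchronization" rule: an unexpected concurrent call
// throws loudly instead of silently serializing. The guarded sketch below
// shows the same pattern with only the standard library; the names
// example_mutex and assert_externally_synchronized are invented.
//
#if 0 // illustrative sketch only; excluded from the build
#include <mutex>
#include <stdexcept>

static std::mutex example_mutex;

static void
assert_externally_synchronized()
{
	const std::unique_lock<std::mutex> lock
	{
		example_mutex, std::try_to_lock
	};

	// The caller is contractually single-threaded here; failing to acquire
	// the lock means that contract was violated, which is a bug worth a loud
	// failure rather than a quiet wait.
	if(!lock.owns_lock())
		throw std::runtime_error
		{
			"unexpected concurrent access to a sequential file"
		};

	// ... perform the read while provably alone ...
}
#endif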
if(unlikely(!bool(lock))) throw panic { "[%s] Unexpected concurrent access to seqfile %p", d.name, this }; assert(result); assert(scratch); #ifdef RB_DEBUG_DB_ENV log::debug { log, "[%s] seqfile:%p read:%p offset:%zu length:%zu scratch:%p", d.name, this, result, offset, length, scratch }; #endif fs::read_opts opts; opts.offset = offset; opts.priority = ionice; opts.aio = this->aio; opts.all = false; const mutable_buffer buf { scratch, length }; const const_buffer read { fs::read(fd, buf, opts) }; *result = slice(read); this->offset += size(read); return Status::OK(); } catch(const std::system_error &e) { log::error { log, "[%s] seqfile:%p read:%p offset:%zu length:%zu scratch:%p :%s", d.name, this, result, offset, length, scratch, e.what() }; return error_to_status{e}; } catch(const std::exception &e) { log::critical { log, "[%s] seqfile:%p read:%p offset:%zu length:%zu scratch:%p :%s", d.name, this, result, offset, length, scratch, e.what() }; return error_to_status{e}; } rocksdb::Status ircd::db::database::env::sequential_file::PositionedRead(uint64_t offset, size_t length, Slice *const result, char *const scratch) noexcept try { const ctx::uninterruptible::nothrow ui; const std::unique_lock lock { mutex, std::try_to_lock }; if(unlikely(!bool(lock))) throw panic { "[%s] Unexpected concurrent access to seqfile %p", d.name, this }; assert(result); assert(scratch); #ifdef RB_DEBUG_DB_ENV log::debug { log, "[%s] seqfile:%p offset:%zu positioned read:%p offset:%zu length:%zu scratch:%p", d.name, this, this->offset, result, offset, length, scratch }; #endif fs::read_opts opts; opts.offset = offset; opts.priority = ionice; opts.aio = this->aio; opts.all = false; const mutable_buffer buf { scratch, length }; const const_buffer read { fs::read(fd, buf, opts) }; *result = slice(read); this->offset = std::max(this->offset, off_t(offset + size(read))); return Status::OK(); } catch(const std::system_error &e) { log::error { log, "[%s] seqfile:%p positioned read:%p offset:%zu length:%zu scratch:%p :%s", d.name, this, result, offset, length, scratch, e.what() }; return error_to_status{e}; } catch(const std::exception &e) { log::critical { log, "[%s] seqfile:%p positioned read:%p offset:%zu length:%zu scratch:%p :%s", d.name, this, result, offset, length, scratch, e.what() }; return error_to_status{e}; } rocksdb::Status ircd::db::database::env::sequential_file::Skip(uint64_t size) noexcept try { const ctx::uninterruptible::nothrow ui; const std::unique_lock lock { mutex, std::try_to_lock }; // RocksDB sez that this call requires "External synchronization" i.e the // caller, not this class is responsible for exclusion. We assert anyway. 
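//
// RocksDB's read contract: the caller supplies a scratch buffer of at least
// length bytes, the implementation reads into it, and *result becomes a
// Slice over the bytes actually read, which may be short (or empty) at end
// of file without that being an error. The guarded sketch below restates
// that contract in plain POSIX terms; read_into_scratch is an invented name
// and ircd::fs handles this for the real implementation above.
//
#if 0 // illustrative sketch only; excluded from the build
#include <unistd.h>
#include <cerrno>
#include <cstdint>
#include <cstddef>
#include <system_error>
#include <rocksdb/slice.h>

static rocksdb::Slice
read_into_scratch(const int fd,
                  const uint64_t offset,
                  const size_t length,
                  char *const scratch)
{
	const ssize_t ret
	{
		::pread(fd, scratch, length, offset)
	};

	if(ret < 0)
		throw std::system_error
		{
			errno, std::generic_category()
		};

	// A Slice over scratch with the byte count actually read; RocksDB treats
	// a short result as EOF, not as a failure.
	return rocksdb::Slice
	{
		scratch, size_t(ret)
	};
}
#endif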
if(unlikely(!bool(lock))) throw panic { "[%s] Unexpected concurrent access to seqfile %p", d.name, this }; #ifdef RB_DEBUG_DB_ENV log::debug { "[%s] seqfile:%p offset:zu skip:%zu", d.name, this, offset, size }; #endif offset += size; return Status::OK(); } catch(const panic &e) { log::critical { log, "[%s] seqfile:%p :%s", d.name, this, e.what(), }; return Status::Busy(); } rocksdb::Status ircd::db::database::env::sequential_file::InvalidateCache(size_t offset, size_t length) noexcept try { const ctx::uninterruptible::nothrow ui; #ifdef RB_DEBUG_DB_ENV log::debug { "[%s] seqfile:%p invalidate cache offset:%zu length:%zu", d.name, this, offset, length }; #endif if(opts.direct) return Status::OK(); fs::evict(fd, length, offset); return Status::OK(); } catch(const std::system_error &e) { log::error { "[%s] seqfile:%p invalidate cache offset:%zu length:%zu :%s", d.name, this, offset, length, e.what() }; return error_to_status{e}; } catch(const std::exception &e) { log::critical { "[%s] seqfile:%p invalidate cache offset:%zu length:%zu :%s", d.name, this, offset, length, e.what() }; return error_to_status{e}; } bool ircd::db::database::env::sequential_file::use_direct_io() const noexcept { return opts.direct; } size_t ircd::db::database::env::sequential_file::GetRequiredBufferAlignment() const noexcept { const auto &ret { _buffer_align }; return ret; } // // random_access_file // decltype(ircd::db::database::env::random_access_file::default_opts) ircd::db::database::env::random_access_file::default_opts { .mode = std::ios_base::in, .random = true, }; ircd::db::database::env::random_access_file::random_access_file(database *const &d, const std::string &name, const EnvOptions &env_opts) try :d { *d } ,opts{[&env_opts] { fs::fd::opts ret{default_opts}; ret.direct = env_opts.use_direct_reads; return ret; }()} ,fd { name, this->opts } ,_buffer_align { opts.direct? fs::block_size(fd): 1 } ,ionice { ctx::ionice(ctx::cur()) } ,aio { // When this flag is false then AIO operations are never used for this // file; if true, AIO may be used if available and/or other conditions. // Currently the /proc filesystem doesn't like AIO. !startswith(name, "/proc/") } { #ifdef RB_DEBUG_DB_ENV log::debug { log, "[%s] opened rfile:%p fd:%d bs:%zu '%s'", d->name, this, int(fd), _buffer_align, name }; #endif } catch(const std::exception &e) { log::error { log, "[%s] opening rfile:%p `%s' :%s", d->name, this, name, e.what() }; } ircd::db::database::env::random_access_file::~random_access_file() noexcept { #ifdef RB_DEBUG_DB_ENV log::debug { log, "[%s] close rfile:%p fd:%d", d.name, this, int(fd) }; #endif } rocksdb::Status ircd::db::database::env::random_access_file::Prefetch(uint64_t offset, size_t length) noexcept try { const ctx::uninterruptible::nothrow ui; #ifdef RB_DEBUG_DB_ENV log::debug { log, "[%s] rfile:%p prefetch offset:%zu length:%zu", d.name, this, offset, length }; #endif // Note RocksDB does not call our prefetch() when using direct IO. 
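//
// Prefetch() asks the kernel to start readahead for a region, and the
// InvalidateCache() overrides in these classes ask it to drop a region from
// the page cache; neither makes sense under O_DIRECT, which bypasses the
// page cache entirely. The guarded sketch below shows the POSIX advice calls
// such hints typically map to on Linux; treating them as exactly what
// ircd::fs::prefetch and ircd::fs::evict issue is an assumption.
//
#if 0 // illustrative sketch only; excluded from the build
#include <fcntl.h>
#include <cstdint>

// Hint the kernel to read a region ahead of time; best effort, non-blocking.
static int
prefetch_hint(const int fd, const uint64_t offset, const uint64_t length)
{
	return ::posix_fadvise(fd, offset, length, POSIX_FADV_WILLNEED);
}

// Hint the kernel that a region's cached pages won't be needed again so it
// may evict them; pointless for an O_DIRECT fd whose reads never populate
// the page cache in the first place.
static int
evict_hint(const int fd, const uint64_t offset, const uint64_t length)
{
	return ::posix_fadvise(fd, offset, length, POSIX_FADV_DONTNEED);
}
#endif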
assert(!this->opts.direct); fs::prefetch(fd, length, offset); return Status::OK(); } catch(const std::system_error &e) { return error_to_status{e}; } catch(const std::exception &e) { log::critical { log, "[%s] rfile:%p prefetch offset:%zu length:%zu :%s", d.name, this, offset, length, e.what() }; return error_to_status{e}; } #ifdef IRCD_DB_HAS_ENV_MULTIREAD rocksdb::Status ircd::db::database::env::random_access_file::MultiRead(rocksdb::ReadRequest *const req, size_t num) noexcept try { assert(req); const ctx::uninterruptible::nothrow ui; fs::read_op op[num]; mutable_buffer buf[num]; fs::read_opts opts[num]; for(size_t i(0); i < num; ++i) { opts[i].offset = req[i].offset; opts[i].priority = ionice; opts[i].aio = this->aio; opts[i].all = false; buf[i] = { req[i].scratch, req[i].len }; op[i].fd = std::addressof(this->fd); op[i].opts = opts + i; op[i].bufs = { buf + i, 1 }; #ifdef RB_DEBUG_DB_ENV log::debug { log, "[%s] rfile:%p multiread:%zu:%zu offset:%zu length:%zu scratch:%p", d.name, this, i, num, req[i].offset, req[i].len, req[i].scratch, }; #endif assert(!this->opts.direct || buffer::aligned(buf[i], _buffer_align)); } const auto bytes { fs::read({op, num}) }; for(size_t i(0); i < num; ++i) try { if(op[i].eptr) std::rethrow_exception(op[i].eptr); assert(op[i].ret <= size(buf[i])); const const_buffer read { buf[i], op[i].ret }; req[i].result = slice(read); req[i].status = Status::OK(); assert(req[i].result.size() == req[i].len); } catch(const std::exception &e) { log::error { log, "[%s] rfile:%p multiread:%zu:%zu offset:%zu length:%zu :%s", d.name, this, i, num, req[i].offset, req[i].len, e.what(), }; req[i].status = error_to_status{e}; } return Status::OK(); } catch(const std::system_error &e) { log::error { log, "[%s] rfile:%p multiread:%p num:%zu :%s", d.name, this, req, num, e.what(), }; return error_to_status{e}; } catch(const std::exception &e) { log::critical { log, "[%s] rfile:%p multiread:%p num:%zu :%s", d.name, this, req, num, e.what(), }; return error_to_status{e}; } #else #warning "RocksDB MultiRead is not supported for this build." 
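//
// When the build's RocksDB is new enough, the MultiRead() override above
// services a whole batch of ReadRequests in a single call so ircd::fs can
// submit them together. The guarded sketch below shows the caller-side shape
// of such a batch; the offsets, sizes and the name multiread_batch_example
// are invented for illustration.
//
#if 0 // illustrative sketch only; excluded from the build
#include <vector>
#include <rocksdb/env.h>

// Each ReadRequest carries its own offset, length and scratch buffer; after
// MultiRead() returns, each request's result Slice and status are filled in
// independently, so one failed read does not poison the rest of the batch.
static void
multiread_batch_example(rocksdb::RandomAccessFile &file)
{
	static char scratch[2][4096];

	std::vector<rocksdb::ReadRequest> reqs(2);
	reqs[0].offset = 0;      reqs[0].len = sizeof(scratch[0]);  reqs[0].scratch = scratch[0];
	reqs[1].offset = 65536;  reqs[1].len = sizeof(scratch[1]);  reqs[1].scratch = scratch[1];

	const rocksdb::Status status
	{
		file.MultiRead(reqs.data(), reqs.size())
	};

	if(!status.ok())
		return;

	for(const auto &req : reqs)
		if(req.status.ok())
		{
			// req.result points into req.scratch with the bytes actually read.
		}
}
#endif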
#endif IRCD_DB_HAS_ENV_MULTIREAD rocksdb::Status ircd::db::database::env::random_access_file::Read(uint64_t offset, size_t length, Slice *const result, char *const scratch) const noexcept try { const ctx::uninterruptible::nothrow ui; assert(result); assert(scratch); #ifdef RB_DEBUG_DB_ENV log::debug { log, "[%s] rfile:%p read:%p offset:%zu length:%zu scratch:%p", d.name, this, result, offset, length, scratch }; #endif fs::read_opts opts; opts.offset = offset; opts.priority = ionice; opts.aio = this->aio; opts.all = !this->opts.direct; const mutable_buffer buf { scratch, length }; assert(!this->opts.direct || buffer::aligned(buf, _buffer_align)); const auto read { fs::read(fd, buf, opts) }; *result = slice(read); return Status::OK(); } catch(const std::system_error &e) { log::error { log, "[%s] rfile:%p read:%p offset:%zu length:%zu scratch:%p :%s", d.name, this, result, offset, length, scratch, e.what() }; return error_to_status{e}; } catch(const std::exception &e) { log::critical { log, "[%s] rfile:%p read:%p offset:%zu length:%zu scratch:%p :%s", d.name, this, result, offset, length, scratch, e.what() }; return error_to_status{e}; } rocksdb::Status ircd::db::database::env::random_access_file::InvalidateCache(size_t offset, size_t length) noexcept { const ctx::uninterruptible::nothrow ui; #ifdef RB_DEBUG_DB_ENV log::debug { log, "[%s] rfile:%p invalidate cache offset:%zu length:%zu", d.name, this, offset, length }; #endif if(opts.direct) return Status::OK(); fs::evict(fd, length, offset); return Status::OK(); } size_t ircd::db::database::env::random_access_file::GetUniqueId(char *const id, size_t max_size) const noexcept try { const ctx::uninterruptible::nothrow ui; #ifdef RB_DEBUG_DB_ENV log::debug { log, "[%s] rfile:%p get unique id:%p max_size:%zu", d.name, this, id, max_size }; #endif const mutable_buffer buf { id, max_size }; //return size(fs::uuid(fd, buf)); return 0; } catch(const std::exception &e) { log::critical { log, "[%s] rfile:%p GetUniqueId id:%p max_size:%zu :%s", d.name, this, id, max_size, e.what() }; return 0; } void ircd::db::database::env::random_access_file::Hint(AccessPattern pattern) noexcept { const ctx::uninterruptible::nothrow ui; #ifdef RB_DEBUG_DB_ENV log::debug { log, "[%s] rfile:%p hint %s", d.name, this, reflect(pattern) }; #endif } bool ircd::db::database::env::random_access_file::use_direct_io() const noexcept { return opts.direct; } size_t ircd::db::database::env::random_access_file::GetRequiredBufferAlignment() const noexcept { const auto &ret { _buffer_align }; return ret; } // // random_rw_file // decltype(ircd::db::database::env::random_rw_file::default_opts) ircd::db::database::env::random_rw_file::default_opts { .mode = std::ios_base::in | std::ios_base::out, .random = true, }; ircd::db::database::env::random_rw_file::random_rw_file(database *const &d, const std::string &name, const EnvOptions &env_opts) try :d { *d } ,opts{[&env_opts] { fs::fd::opts ret{default_opts}; ret.direct = env_opts.use_direct_reads && env_opts.use_direct_writes; return ret; }()} ,fd { name, this->opts } ,_buffer_align { opts.direct? 
ircd::db::database::env::random_rw_file::random_rw_file(database *const &d,
                                                        const std::string &name,
                                                        const EnvOptions &env_opts)
try
:d
{
	*d
}
,opts{[&env_opts]
{
	fs::fd::opts ret{default_opts};
	ret.direct = env_opts.use_direct_reads && env_opts.use_direct_writes;
	return ret;
}()}
,fd
{
	name, this->opts
}
,_buffer_align
{
	opts.direct?
		fs::block_size(fd):
		1
}
,ionice
{
	ctx::ionice(ctx::cur())
}
,aio
{
	true
}
{
	#ifdef RB_DEBUG_DB_ENV
	log::debug
	{
		log, "[%s] opened rwfile:%p fd:%d bs:%zu '%s'",
		d->name, this, int(fd), _buffer_align, name
	};
	#endif
}
catch(const std::exception &e)
{
	log::error
	{
		log, "[%s] opening rwfile:%p `%s' :%s",
		d->name, this, name, e.what()
	};
}

ircd::db::database::env::random_rw_file::~random_rw_file()
noexcept
{
	#ifdef RB_DEBUG_DB_ENV
	log::debug
	{
		log, "[%s] close rwfile:%p fd:%d",
		d.name, this, int(fd)
	};
	#endif
}

rocksdb::Status
ircd::db::database::env::random_rw_file::Close()
noexcept try
{
	const ctx::uninterruptible::nothrow ui;

	#ifdef RB_DEBUG_DB_ENV
	log::debug
	{
		log, "[%s] close rwfile:%p fd:%d",
		d.name, this, int(fd)
	};
	#endif

	this->fd = fs::fd{};
	return Status::OK();
}
catch(const std::system_error &e)
{
	log::error
	{
		log, "[%s] rwfile:%p close :%s",
		d.name, this, e.what()
	};

	return error_to_status{e};
}
catch(const std::exception &e)
{
	log::critical
	{
		log, "[%s] rwfile:%p close :%s",
		d.name, this, e.what()
	};

	return error_to_status{e};
}

rocksdb::Status
ircd::db::database::env::random_rw_file::Fsync()
noexcept try
{
	const ctx::uninterruptible::nothrow ui;

	#ifdef RB_DEBUG_DB_ENV
	log::debug
	{
		log, "[%s] rwfile:%p fd:%d fsync",
		d.name, this, int(fd)
	};
	#endif

	fs::sync_opts opts;
	opts.priority = ionice;
	opts.nodelay = true;
	fs::flush(fd, opts);
	return Status::OK();
}
catch(const std::system_error &e)
{
	log::error
	{
		log, "[%s] rwfile:%p fd:%d fsync :%s",
		d.name, this, int(fd), e.what()
	};

	return error_to_status{e};
}
catch(const std::exception &e)
{
	log::critical
	{
		log, "[%s] rwfile:%p fd:%d fsync :%s",
		d.name, this, int(fd), e.what()
	};

	return error_to_status{e};
}

rocksdb::Status
ircd::db::database::env::random_rw_file::Sync()
noexcept try
{
	const ctx::uninterruptible::nothrow ui;

	#ifdef RB_DEBUG_DB_ENV
	log::debug
	{
		log, "[%s] rwfile:%p fd:%d sync",
		d.name, this, int(fd)
	};
	#endif

	fs::sync_opts opts;
	opts.priority = ionice;
	opts.nodelay = true;
	fs::sync(fd, opts);
	return Status::OK();
}
catch(const std::system_error &e)
{
	log::error
	{
		log, "[%s] rwfile:%p fd:%d sync :%s",
		d.name, this, int(fd), e.what()
	};

	return error_to_status{e};
}
catch(const std::exception &e)
{
	log::critical
	{
		log, "[%s] rwfile:%p fd:%d sync :%s",
		d.name, this, int(fd), e.what()
	};

	return error_to_status{e};
}

rocksdb::Status
ircd::db::database::env::random_rw_file::Flush()
noexcept try
{
	const ctx::uninterruptible::nothrow ui;

	#ifdef RB_DEBUG_DB_ENV
	log::debug
	{
		log, "[%s] rwfile:%p fd:%d flush",
		d.name, this, int(fd)
	};
	#endif

	fs::sync_opts opts;
	opts.metadata = false;
	opts.priority = ionice;
	opts.nodelay = nodelay;
	fs::flush(fd, opts);
	return Status::OK();
}
catch(const std::system_error &e)
{
	log::error
	{
		log, "[%s] rwfile:%p fd:%d flush :%s",
		d.name, this, int(fd), e.what()
	};

	return error_to_status{e};
}
catch(const std::exception &e)
{
	log::critical
	{
		log, "[%s] rwfile:%p fd:%d flush :%s",
		d.name, this, int(fd), e.what()
	};

	return error_to_status{e};
}
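// The reads and writes below are positioned (pread/pwrite-style) using the
// offset supplied by RocksDB rather than a shared file pointer. Reads are
// issued with this file's I/O priority and AIO setting; for buffered
// (non-direct) files the `all` option is set, which appears to require the
// full requested length to be satisfied.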
rocksdb::Status
ircd::db::database::env::random_rw_file::Read(uint64_t offset,
                                              size_t length,
                                              Slice *const result,
                                              char *const scratch)
const noexcept try
{
	const ctx::uninterruptible::nothrow ui;
	assert(result);
	assert(scratch);

	#ifdef RB_DEBUG_DB_ENV
	log::debug
	{
		log, "[%s] rwfile:%p read:%p offset:%zu length:%zu scratch:%p",
		d.name, this, result, offset, length, scratch
	};
	#endif

	fs::read_opts opts;
	opts.offset = offset;
	opts.priority = ionice;
	opts.aio = this->aio;
	opts.all = !this->opts.direct;

	const mutable_buffer buf
	{
		scratch, length
	};

	const auto read
	{
		fs::read(fd, buf, opts)
	};

	*result = slice(read);
	return Status::OK();
}
catch(const std::system_error &e)
{
	log::error
	{
		log, "[%s] rwfile:%p read:%p offset:%zu length:%zu scratch:%p :%s",
		d.name, this, result, offset, length, scratch, e.what()
	};

	return error_to_status{e};
}
catch(const std::exception &e)
{
	log::critical
	{
		log, "[%s] rwfile:%p read:%p offset:%zu length:%zu scratch:%p :%s",
		d.name, this, result, offset, length, scratch, e.what()
	};

	return error_to_status{e};
}

rocksdb::Status
ircd::db::database::env::random_rw_file::Write(uint64_t offset,
                                               const Slice &slice)
noexcept try
{
	const ctx::uninterruptible::nothrow ui;

	#ifdef RB_DEBUG_DB_ENV
	log::debug
	{
		log, "[%s] rwfile:%p fd:%d write:%p length:%zu offset:%zu",
		d.name, this, int(fd), data(slice), size(slice), offset
	};
	#endif

	const const_buffer buf
	{
		data(slice), size(slice)
	};

	fs::write(fd, buf, offset);
	return Status::OK();
}
catch(const std::system_error &e)
{
	log::error
	{
		log, "[%s] rwfile:%p fd:%d write:%p length:%zu offset:%zu :%s",
		d.name, this, int(fd), data(slice), size(slice), offset, e.what()
	};

	return error_to_status{e};
}
catch(const std::exception &e)
{
	log::critical
	{
		log, "[%s] rwfile:%p fd:%d write:%p length:%zu offset:%zu :%s",
		d.name, this, int(fd), data(slice), size(slice), offset, e.what()
	};

	return error_to_status{e};
}

bool
ircd::db::database::env::random_rw_file::use_direct_io()
const noexcept
{
	return opts.direct;
}

size_t
ircd::db::database::env::random_rw_file::GetRequiredBufferAlignment()
const noexcept
{
	const auto &ret
	{
		_buffer_align
	};

	return ret;
}

//
// directory
//

ircd::db::database::env::directory::directory(database *const &d,
                                              const std::string &name,
                                              std::unique_ptr<Directory> defaults)
:d{*d}
,defaults{std::move(defaults)}
{
}

ircd::db::database::env::directory::~directory()
noexcept
{
}

rocksdb::Status
ircd::db::database::env::directory::Fsync()
noexcept
{
	const ctx::uninterruptible::nothrow ui;

	#ifdef RB_DEBUG_DB_ENV
	log::debug
	{
		log, "[%s] directory:%p fsync",
		d.name, this
	};
	#endif

	return defaults->Fsync();
}

//
// file_lock
//

ircd::db::database::env::file_lock::file_lock(database *const &d)
:d{*d}
{
}

ircd::db::database::env::file_lock::~file_lock()
noexcept
{
}

///////////////////////////////////////////////////////////////////////////////
//
// db/database/env/state.h
//

//
// env::state::state
//

ircd::db::database::env::state::state(database *const &d)
:d{*d}
{
	for(size_t i(0); i < pool.size(); ++i)
		pool.at(i) = std::make_unique<struct pool>(this->d, Priority(i));
}

ircd::db::database::env::state::~state()
noexcept
{
	log::debug
	{
		log, "[%s] Shutting down environment...",
		d.name
	};
}

//
// state::pool
//

decltype(ircd::db::database::env::state::pool::stack_size)
ircd::db::database::env::state::pool::stack_size
{
	{ "name",    "ircd.db.env.pool.stack_size" },
	{ "default", long(128_KiB)                 },
};
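// Each RocksDB background priority level is served by its own ircd::ctx
// pool. Worker contexts use the stack size from the conf item above; the
// HIGH priority level maps to IO_HIGH ionice while the other levels map
// to IO_LOW.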
//
// state::pool::pool
//

ircd::db::database::env::state::pool::pool(database &d,
                                           const Priority &pri)
:d{d}
,pri{pri}
,iopri
{
	pri == Priority::HIGH?
		IOPriority::IO_HIGH:
		IOPriority::IO_LOW
}
,name
{
	fmt::sprintf
	{
		namebuf, "db.%s", reflect(pri)
	}
}
,popts
{
	size_t(stack_size),    // stack size of worker
	0,                     // initial workers
	-1,                    // queue hard limit
	-1,                    // queue soft limit
	true,                  // queue_max_blocking
	true,                  // queue_max_dwarning
	make_nice(iopri),      // ionice
	make_nice(this->pri),  // nice
}
,p
{
	this->name,   // name of pool
	this->popts   // pool options
}
{
}

ircd::db::database::env::state::pool::~pool()
noexcept
{
	join();
}

void
ircd::db::database::env::state::pool::join()
try
{
	if(!tasks.empty() || p.pending())
		log::warning
		{
			log, "[%s] Waiting for tasks:%zu queued:%zu active:%zu in pool '%s'",
			d.name, tasks.size(), p.queued(), p.active(), ctx::name(p)
		};

	this->wait();
	assert(!p.pending());
	assert(tasks.empty());
	p.join();

	log::debug
	{
		log, "[%s] Terminated pool '%s'.",
		d.name, ctx::name(p)
	};
}
catch(const std::exception &e)
{
	log::critical
	{
		log, "[%s] Environment pool '%s' join :%s",
		d.name, ctx::name(p), e.what()
	};

	throw;
}

void
ircd::db::database::env::state::pool::wait()
{
	dock.wait([this]
	{
		return tasks.empty() && !p.pending();
	});
}

void
ircd::db::database::env::state::pool::operator()(task &&task)
{
	assert(task._id == 0);
	task._id = ++taskctr;
	tasks.emplace_back(std::move(task));

	log::debug
	{
		log, "[%s] pool:%s queue:%zu QUEUE task:%lu func:%p arg:%p",
		this->d.name, ctx::name(p), tasks.size(), tasks.back()._id, tasks.back().func, tasks.back().arg,
	};

	p([this]
	{
		if(tasks.empty())
			return;

		const ctx::uninterruptible::nothrow ui;
		const auto task{std::move(tasks.front())};
		tasks.pop_front();

		log::debug
		{
			log, "[%s] pool:%s queue:%zu ENTER task:%lu func:%p arg:%p",
			this->d.name, ctx::name(p), tasks.size(), task._id, task.func, task.arg,
		};

		const ctx::slice_usage_warning message
		{
			"[%s] pool:%s task:%p",
			this->d.name, ctx::name(p), task.func
		};

		// Execute the task
		task.func(task.arg);

		log::debug
		{
			log, "[%s] pool:%s queue:%zu LEAVE task:%lu func:%p arg:%p",
			this->d.name, ctx::name(p), tasks.size(), task._id, task.func, task.arg,
		};

		dock.notify_all();
	});
}

// Cancels and removes every task still waiting in the queue. The tag is
// currently reported in the log only; it does not filter which tasks are
// cancelled. Returns the number of tasks removed.
size_t
ircd::db::database::env::state::pool::cancel(void *const &tag)
{
	size_t i(0);
	auto it(begin(tasks));
	while(it != end(tasks))
	{
		auto &task(*it);
		log::debug
		{
			log, "[%s] pool:%s tasks:%zu cancel#%zu task:%lu func:%p cancel:%p arg:%p tag:%p",
			d.name, ctx::name(p), tasks.size(), i, task._id, task.func, task.cancel, task.arg, tag
		};

		task.cancel(task.arg);
		it = tasks.erase(it);
		++i;
	}

	dock.notify_all();
	return i;
}