diff --git a/include/ircd/db/database/env/writable_file.h b/include/ircd/db/database/env/writable_file.h index d664b8675..cabf420d5 100644 --- a/include/ircd/db/database/env/writable_file.h +++ b/include/ircd/db/database/env/writable_file.h @@ -21,30 +21,37 @@ struct ircd::db::database::env::writable_file final using Status = rocksdb::Status; using Slice = rocksdb::Slice; using IOPriority = rocksdb::Env::IOPriority; + using WriteLifeTimeHint = rocksdb::Env::WriteLifeTimeHint; database &d; ctx::mutex mutex; rocksdb::EnvOptions env_opts; fs::fd::opts opts; - fs::fd fd; IOPriority prio {IO_LOW}; + WriteLifeTimeHint hint {WriteLifeTimeHint::WLTH_NOT_SET}; + fs::fd fd; size_t _buffer_align; - size_t preallocation_block_size {0}; - size_t preallocation_last_block {0}; + size_t logical_offset; + size_t preallocation_block_size; + ssize_t preallocation_last_block {-1}; - Status Allocate(uint64_t offset, uint64_t len) noexcept override; - void PrepareWrite(size_t offset, size_t len) noexcept override; - void GetPreallocationStatus(size_t* block_size, size_t* last_allocated_block) noexcept override; - void SetPreallocationBlockSize(size_t size) noexcept override; - Status Truncate(uint64_t size) noexcept override; bool IsSyncThreadSafe() const noexcept override; - void SetIOPriority(IOPriority pri) noexcept override; - IOPriority GetIOPriority() noexcept override; - uint64_t GetFileSize() noexcept override; size_t GetUniqueId(char* id, size_t max_size) const noexcept override; - Status InvalidateCache(size_t offset, size_t length) noexcept override; + IOPriority GetIOPriority() noexcept override; + void SetIOPriority(IOPriority pri) noexcept override; + WriteLifeTimeHint GetWriteLifeTimeHint() noexcept override; + void SetWriteLifeTimeHint(WriteLifeTimeHint hint) noexcept override; + uint64_t GetFileSize() noexcept override; + void SetPreallocationBlockSize(size_t size) noexcept override; + void GetPreallocationStatus(size_t* block_size, size_t* last_allocated_block) noexcept override; + void _truncate(const size_t &size); + void _allocate(const size_t &offset, const size_t &length); + void PrepareWrite(size_t offset, size_t len) noexcept override; + Status Allocate(uint64_t offset, uint64_t len) noexcept override; Status PositionedAppend(const Slice& data, uint64_t offset) noexcept override; Status Append(const Slice& data) noexcept override; + Status InvalidateCache(size_t offset, size_t length) noexcept override; + Status Truncate(uint64_t size) noexcept override; Status RangeSync(uint64_t offset, uint64_t nbytes) noexcept override; Status Fsync() noexcept override; Status Sync() noexcept override; @@ -52,5 +59,7 @@ struct ircd::db::database::env::writable_file final Status Close() noexcept override; writable_file(database *const &d, const std::string &name, const EnvOptions &, const bool &trunc); + writable_file(const writable_file &) = delete; + writable_file(writable_file &&) = delete; ~writable_file() noexcept; }; diff --git a/ircd/db.cc b/ircd/db.cc index dc02425e8..90d180b0a 100644 --- a/ircd/db.cc +++ b/ircd/db.cc @@ -856,7 +856,6 @@ try opts.enable_pipelined_write = false; opts.write_thread_max_yield_usec = 0; opts.write_thread_slow_yield_usec = 0; - opts.use_direct_io_for_flush_and_compaction = false; //opts.max_total_wal_size = 8_MiB; // Detect if O_DIRECT is possible if db::init left a file in the @@ -865,6 +864,10 @@ try opts.use_direct_reads = ircd::nodirect? false: fs::exists(direct_io_test_file_path()); + //opts.use_direct_io_for_flush_and_compaction = opts.use_direct_reads; + opts.use_direct_io_for_flush_and_compaction = false; + opts.allow_fallocate = !opts.use_direct_io_for_flush_and_compaction; + #ifdef RB_DEBUG opts.dump_malloc_stats = true; #endif @@ -1568,6 +1571,7 @@ ircd::db::database::column::column(database &d, table_opts.block_size = this->descriptor->block_size; table_opts.metadata_block_size = this->descriptor->meta_block_size; table_opts.block_size_deviation = 50; + table_opts.block_align = false; // Doesn't work w/ compression. // Setup the cache for assets. const auto &cache_size(this->descriptor->cache_size); @@ -4305,17 +4309,32 @@ try } ,_buffer_align { - fs::block_size(fd) + opts.direct? + fs::block_size(fd): + size_t(0) +} +,logical_offset +{ + !trunc? + fs::size(fd): + size_t(0) +} +,preallocation_block_size +{ + _buffer_align? + _buffer_align: + ircd::info::page_size } { #ifdef RB_DEBUG_DB_ENV log::debug { - log, "'%s': opened wfile:%p fd:%d bs:%zu '%s'", + log, "'%s': opened wfile:%p fd:%d bs:%zu %s '%s'", d->name, this, int(fd), _buffer_align, + opts.direct? "DIRECT_IO"_sv : "BUFFERED"_sv, name }; #endif @@ -4335,6 +4354,8 @@ catch(const std::exception &e) ircd::db::database::env::writable_file::~writable_file() noexcept { + this->Close(); + #ifdef RB_DEBUG_DB_ENV log::debug { @@ -4351,7 +4372,10 @@ ircd::db::database::env::writable_file::Close() noexcept try { const ctx::uninterruptible::nothrow ui; - const std::lock_guard lock{mutex}; + std::unique_lock lock{mutex}; + + if(!fd) + return Status::OK(); #ifdef RB_DEBUG_DB_ENV log::debug @@ -4363,7 +4387,10 @@ noexcept try }; #endif - this->fd = fs::fd{}; + if(opts.direct && logical_offset > 0) + _truncate(logical_offset); + + fd = fs::fd{}; return Status::OK(); } catch(const fs::error &e) @@ -4580,6 +4607,108 @@ catch(const std::exception &e) return error_to_status{e}; } +rocksdb::Status +ircd::db::database::env::writable_file::Truncate(uint64_t size) +noexcept try +{ + const ctx::uninterruptible::nothrow ui; + const std::lock_guard lock{mutex}; + + #ifdef RB_DEBUG_DB_ENV + log::debug + { + "'%s': wfile:%p fd:%d truncate to %lu bytes", + d.name, + this, + int(fd), + size + }; + #endif + + _truncate(size); + return Status::OK(); +} +catch(const fs::error &e) +{ + log::error + { + log, "'%s': wfile:%p fd:%d truncate to %lu bytes :%s", + d.name, + this, + int(fd), + size, + e.what() + }; + + return error_to_status{e}; +} +catch(const std::exception &e) +{ + log::critical + { + log, "'%s': wfile:%p fd:%d truncate to %lu bytes :%s", + d.name, + this, + int(fd), + size, + e.what() + }; + + return error_to_status{e}; +} + +rocksdb::Status +ircd::db::database::env::writable_file::InvalidateCache(size_t offset, + size_t length) +noexcept try +{ + const ctx::uninterruptible::nothrow ui; + const std::lock_guard lock{mutex}; + + #ifdef RB_DEBUG_DB_ENV + log::debug + { + log, "'%s': wfile:%p fd:%d invalidate cache offset:%zu length:%zu", + d.name, + this, + int(fd), + offset, + length + }; + #endif + + //syscall(::posix_fadvise, fd, offset, length, FADV_DONTNEED); + return Status::OK(); +} +catch(const fs::error &e) +{ + log::error + { + log, "'%s': wfile:%p fd:%d invalidate cache offset:%zu length:%zu", + d.name, + this, + int(fd), + offset, + length + }; + + return error_to_status{e}; +} +catch(const std::exception &e) +{ + log::critical + { + log, "'%s': wfile:%p fd:%d invalidate cache offset:%zu length:%zu", + d.name, + this, + int(fd), + offset, + length + }; + + return error_to_status{e}; +} + rocksdb::Status ircd::db::database::env::writable_file::Append(const Slice &s) noexcept try @@ -4590,21 +4719,31 @@ noexcept try #ifdef RB_DEBUG_DB_ENV log::debug { - log, "'%s': wfile:%p fd:%d append:%p bytes:%zu", + log, "'%s': wfile:%p fd:%d append:%p bytes:%zu logical_offset:%zu", d.name, this, int(fd), data(s), - size(s) + size(s), + logical_offset }; #endif + fs::write_opts wopts; + wopts.priority = this->prio; + wopts.offset = logical_offset; const const_buffer buf { data(s), size(s) }; - fs::append(fd, buf); + assert(!opts.direct || _buffer_align != 0); + assert(!_buffer_align || uintptr_t(data(buf)) % _buffer_align == 0); + assert(!_buffer_align || wopts.offset % _buffer_align == 0); + assert(!_buffer_align || size(buf) % _buffer_align == 0); + + fs::write(fd, buf, wopts); + logical_offset += size(buf); return Status::OK(); } catch(const fs::error &e) @@ -4661,14 +4800,6 @@ noexcept try assert(0); return Status::NotSupported(); - - const const_buffer buf - { - data(s), size(s) - }; - - fs::append(fd, buf, offset); - return Status::OK(); } catch(const fs::error &e) { @@ -4704,7 +4835,8 @@ catch(const std::exception &e) } rocksdb::Status -ircd::db::database::env::writable_file::Truncate(uint64_t size) +ircd::db::database::env::writable_file::Allocate(uint64_t offset, + uint64_t length) noexcept try { const ctx::uninterruptible::nothrow ui; @@ -4713,26 +4845,30 @@ noexcept try #ifdef RB_DEBUG_DB_ENV log::debug { - "'%s': wfile:%p fd:%d truncate to %lu bytes", + log, "'%s': wfile:%p fd:%d allocate offset:%lu length:%lu%s%s", d.name, this, int(fd), - size + offset, + length, + env_opts.fallocate_with_keep_size? " KEEP_SIZE" : "", + env_opts.allow_fallocate? "" : " (DISABLED)" }; #endif - fs::truncate(fd, size); + _allocate(offset, length); return Status::OK(); } catch(const fs::error &e) { log::error { - log, "'%s': wfile:%p fd:%d truncate to %lu bytes :%s", + log, "'%s': wfile:%p fd:%d allocate offset:%zu length:%zu :%s", d.name, this, int(fd), - size, + offset, + length, e.what() }; @@ -4742,50 +4878,156 @@ catch(const std::exception &e) { log::critical { - log, "'%s': wfile:%p fd:%d truncate to %lu bytes :%s", + log, "'%s': wfile:%p fd:%d allocate offset:%zu length:%zu :%s", d.name, this, int(fd), - size, + offset, + length, e.what() }; return error_to_status{e}; } -bool -ircd::db::database::env::writable_file::IsSyncThreadSafe() -const noexcept try -{ - return true; -} -catch(...) -{ - return false; -} - void -ircd::db::database::env::writable_file::SetIOPriority(Env::IOPriority prio) +ircd::db::database::env::writable_file::PrepareWrite(size_t offset, + size_t length) noexcept { + const ctx::uninterruptible::nothrow ui; + const std::lock_guard lock{mutex}; + #ifdef RB_DEBUG_DB_ENV log::debug { - log, "'%s': wfile:%p set IO prio to %s", + log, "'%s': wfile:%p prepare write offset:%zu length:%zu", d.name, this, - reflect(prio) + offset, + length }; #endif - this->prio = prio; + if(!env_opts.allow_fallocate) + return; + + _allocate(offset, length); } -rocksdb::Env::IOPriority -ircd::db::database::env::writable_file::GetIOPriority() +void +ircd::db::database::env::writable_file::_allocate(const size_t &offset, + const size_t &length) +{ + const size_t first_block + { + offset / preallocation_block_size + }; + + const size_t last_block + { + (offset + length) / preallocation_block_size + }; + + const ssize_t missing_blocks + { + ssize_t(last_block) - preallocation_last_block + }; + + if(missing_blocks <= 0) + return; + + const ssize_t start_block + { + preallocation_last_block + 1 + }; + + const size_t allocate_offset + { + start_block * preallocation_block_size + }; + + const size_t allocate_length + { + missing_blocks * preallocation_block_size + }; + + fs::write_opts wopts; + wopts.offset = allocate_offset; + wopts.priority = this->prio; + wopts.keep_size = env_opts.fallocate_with_keep_size; + + #ifdef RB_DEBUG_DB_ENV + log::debug + { + log, "'%s': wfile:%p fd:%d allocating %zd blocks after block:%zu offset:%lu length:%lu%s", + d.name, + this, + int(fd), + missing_blocks, + start_block, + allocate_offset, + allocate_length, + wopts.keep_size? " KEEP_SIZE" : "" + }; + #endif + + fs::allocate(fd, allocate_length, wopts); + this->preallocation_last_block = last_block; +} + +void +ircd::db::database::env::writable_file::_truncate(const size_t &size) +{ + fs::write_opts wopts; + wopts.priority = this->prio; + fs::truncate(fd, size, wopts); + this->logical_offset = size; +} + +void +ircd::db::database::env::writable_file::GetPreallocationStatus(size_t *const block_size, + size_t *const last_allocated_block) noexcept { - return prio; + const ctx::uninterruptible::nothrow ui; + const std::lock_guard lock{mutex}; + + *block_size = this->preallocation_block_size; + *last_allocated_block = this->preallocation_last_block; + + #ifdef RB_DEBUG_DB_ENV + log::debug + { + log, "'%s': wfile:%p get preallocation block_size(%p):%zu last_block(%p):%zu", + d.name, + this, + block_size, + *block_size, + last_allocated_block, + *last_allocated_block + }; + #endif +} + +void +ircd::db::database::env::writable_file::SetPreallocationBlockSize(size_t size) +noexcept +{ + const ctx::uninterruptible::nothrow ui; + const std::lock_guard lock{mutex}; + + #ifdef RB_DEBUG_DB_ENV + log::debug + { + log, "'%s': wfile:%p set preallocation block size:%zu", + d.name, + this, + size + }; + #endif + + this->preallocation_block_size = size; } uint64_t @@ -4805,7 +5047,14 @@ noexcept try }; #endif - return fs::size(fd); + const auto &ret + { + logical_offset + }; + + assert(opts.direct || ret == fs::size(fd)); + assert(!opts.direct || ret <= fs::size(fd)); + return ret; } catch(const std::exception &e) { @@ -4821,6 +5070,54 @@ catch(const std::exception &e) return 0; } +void +ircd::db::database::env::writable_file::SetIOPriority(Env::IOPriority prio) +noexcept +{ + #ifdef RB_DEBUG_DB_ENV + log::debug + { + log, "'%s': wfile:%p IO priority %s", + d.name, + this, + reflect(prio) + }; + #endif + + this->prio = prio; +} + +rocksdb::Env::IOPriority +ircd::db::database::env::writable_file::GetIOPriority() +noexcept +{ + return prio; +} + +void +ircd::db::database::env::writable_file::SetWriteLifeTimeHint(WriteLifeTimeHint hint) +noexcept +{ + #ifdef RB_DEBUG_DB_ENV + log::debug + { + log, "'%s': wfile:%p hint %s", + d.name, + this, + reflect(hint) + }; + #endif + + this->hint = hint; +} + +rocksdb::Env::WriteLifeTimeHint +ircd::db::database::env::writable_file::GetWriteLifeTimeHint() +noexcept +{ + return hint; +} + size_t ircd::db::database::env::writable_file::GetUniqueId(char *const id, size_t max_size) @@ -4860,174 +5157,15 @@ catch(const std::exception &e) return 0; } -rocksdb::Status -ircd::db::database::env::writable_file::InvalidateCache(size_t offset, - size_t length) -noexcept try +bool +ircd::db::database::env::writable_file::IsSyncThreadSafe() +const noexcept try { - const ctx::uninterruptible::nothrow ui; - const std::lock_guard lock{mutex}; - - #ifdef RB_DEBUG_DB_ENV - log::debug - { - log, "'%s': wfile:%p fd:%d invalidate cache offset:%zu length:%zu", - d.name, - this, - int(fd), - offset, - length - }; - #endif - - //TODO: ircd::fs:: - //syscall(::posix_fadvise, fd, offset, length, FADV_DONTNEED); - return Status::OK(); + return true; } -catch(const fs::error &e) +catch(...) { - log::error - { - log, "'%s': wfile:%p fd:%d invalidate cache offset:%zu length:%zu", - d.name, - this, - int(fd), - offset, - length - }; - - return error_to_status{e}; -} -catch(const std::exception &e) -{ - log::critical - { - log, "'%s': wfile:%p fd:%d invalidate cache offset:%zu length:%zu", - d.name, - this, - int(fd), - offset, - length - }; - - return error_to_status{e}; -} - -void -ircd::db::database::env::writable_file::GetPreallocationStatus(size_t *const block_size, - size_t *const last_allocated_block) -noexcept -{ - #ifdef RB_DEBUG_DB_ENV - log::debug - { - log, "'%s': wfile:%p get preallocation block_size:%p last_block:%p", - d.name, - this, - block_size, - last_allocated_block - }; - #endif - - *block_size = this->preallocation_block_size; - *last_allocated_block = this->preallocation_last_block; -} - -void -ircd::db::database::env::writable_file::SetPreallocationBlockSize(size_t size) -noexcept -{ - #ifdef RB_DEBUG_DB_ENV - log::debug - { - log, "'%s': wfile:%p set preallocation block size:%zu", - d.name, - this, - size - }; - #endif - - this->preallocation_block_size = size; -} - -void -ircd::db::database::env::writable_file::PrepareWrite(size_t offset, - size_t length) -noexcept -{ - const ctx::uninterruptible::nothrow ui; - const std::lock_guard lock{mutex}; - - #ifdef RB_DEBUG_DB_ENV - log::debug - { - log, "'%s': wfile:%p prepare write offset:%zu length:%zu", - d.name, - this, - offset, - length - }; - #endif -} - -rocksdb::Status -ircd::db::database::env::writable_file::Allocate(uint64_t offset, - uint64_t length) -noexcept try -{ - const ctx::uninterruptible::nothrow ui; - const std::lock_guard lock{mutex}; - - #ifdef RB_DEBUG_DB_ENV - log::debug - { - log, "'%s': wfile:%p fd:%d allocate offset:%lu length:%lu%s", - d.name, - this, - int(fd), - offset, - length, - env_opts.fallocate_with_keep_size? " KEEP_SIZE" : "" - }; - #endif - - fs::write_opts wopts; - wopts.offset = offset; - wopts.keep_size = env_opts.fallocate_with_keep_size; - - fs::allocate(fd, length, wopts); - this->preallocation_last_block = offset; - return Status::OK(); -} -catch(const fs::error &e) -{ - log::error - { - log, "'%s': wfile:%p fd:%d allocate offset:%zu length:%zu :%s", - d.name, - this, - int(fd), - offset, - length, - e.what() - }; - - return error_to_status{e}; -} -catch(const std::exception &e) -{ - log::critical - { - log, "'%s': wfile:%p fd:%d allocate offset:%zu length:%zu :%s", - d.name, - this, - int(fd), - offset, - length, - e.what() - }; - - return error_to_status{e}; + return false; } //