From e90f1729f08696cfaefdb4c4545090fed1c511fc Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Fri, 9 Nov 2018 18:24:51 -0800 Subject: [PATCH] ircd::db: Split writable_file into writable_file_direct using vtable for functionality. --- include/ircd/db/database/env/env.h | 1 + include/ircd/db/database/env/writable_file.h | 31 +- ircd/db.cc | 473 ++++++++++++++++--- 3 files changed, 444 insertions(+), 61 deletions(-) diff --git a/include/ircd/db/database/env/env.h b/include/ircd/db/database/env/env.h index bcbf49f1b..05c83e2d6 100644 --- a/include/ircd/db/database/env/env.h +++ b/include/ircd/db/database/env/env.h @@ -21,6 +21,7 @@ struct ircd::db::database::env final :rocksdb::Env { struct writable_file; + struct writable_file_direct; struct sequential_file; struct random_access_file; struct random_rw_file; diff --git a/include/ircd/db/database/env/writable_file.h b/include/ircd/db/database/env/writable_file.h index cabf420d5..501e70d06 100644 --- a/include/ircd/db/database/env/writable_file.h +++ b/include/ircd/db/database/env/writable_file.h @@ -15,7 +15,7 @@ // RocksDB symbols which we cannot forward declare. It is used internally // and does not need to be included by general users of IRCd. -struct ircd::db::database::env::writable_file final +struct ircd::db::database::env::writable_file :rocksdb::WritableFile { using Status = rocksdb::Status; @@ -30,9 +30,7 @@ struct ircd::db::database::env::writable_file final IOPriority prio {IO_LOW}; WriteLifeTimeHint hint {WriteLifeTimeHint::WLTH_NOT_SET}; fs::fd fd; - size_t _buffer_align; - size_t logical_offset; - size_t preallocation_block_size; + size_t preallocation_block_size {0}; ssize_t preallocation_last_block {-1}; bool IsSyncThreadSafe() const noexcept override; @@ -44,7 +42,6 @@ struct ircd::db::database::env::writable_file final uint64_t GetFileSize() noexcept override; void SetPreallocationBlockSize(size_t size) noexcept override; void GetPreallocationStatus(size_t* block_size, size_t* last_allocated_block) noexcept override; - void _truncate(const size_t &size); void _allocate(const size_t &offset, const size_t &length); void PrepareWrite(size_t offset, size_t len) noexcept override; Status Allocate(uint64_t offset, uint64_t len) noexcept override; @@ -63,3 +60,27 @@ struct ircd::db::database::env::writable_file final writable_file(writable_file &&) = delete; ~writable_file() noexcept; }; + +struct ircd::db::database::env::writable_file_direct final +:writable_file +{ + size_t alignment {0}; + size_t logical_offset {0}; + unique_buffer buffer; + + bool aligned(const size_t &) const; + bool aligned(const void *const &) const; + bool aligned(const const_buffer &) const; + size_t align(const size_t &) const; + size_t remain(const size_t &) const; + + void write(const const_buffer &, const uint64_t &offset); + + uint64_t GetFileSize() noexcept override; + Status PositionedAppend(const Slice& data, uint64_t offset) noexcept override; + Status Append(const Slice& data) noexcept override; + Status Truncate(uint64_t size) noexcept override; + Status Close() noexcept override; + + writable_file_direct(database *const &d, const std::string &name, const EnvOptions &, const bool &trunc); +}; diff --git a/ircd/db.cc b/ircd/db.cc index c73c87bfd..a166d431e 100644 --- a/ircd/db.cc +++ b/ircd/db.cc @@ -869,6 +869,7 @@ try opts.write_thread_max_yield_usec = 0; opts.write_thread_slow_yield_usec = 0; //opts.max_total_wal_size = 8_MiB; + opts.allow_fallocate = true; // Detect if O_DIRECT is possible if db::init left a file in the // database directory claiming such. User can force no direct io @@ -876,9 +877,8 @@ try opts.use_direct_reads = ircd::nodirect? false: fs::exists(direct_io_test_file_path()); - //opts.use_direct_io_for_flush_and_compaction = opts.use_direct_reads; opts.use_direct_io_for_flush_and_compaction = false; - opts.allow_fallocate = !opts.use_direct_io_for_flush_and_compaction; + //opts.use_direct_io_for_flush_and_compaction = opts.use_direct_reads; #ifdef RB_DEBUG opts.dump_malloc_stats = true; @@ -3194,7 +3194,11 @@ noexcept try }; #endif - *r = std::make_unique(&d, name, options, true); + if(options.use_direct_writes) + *r = std::make_unique(&d, name, options, true); + else + *r = std::make_unique(&d, name, options, true); + return Status::OK(); } catch(const fs::error &e) @@ -3224,7 +3228,11 @@ noexcept try }; #endif - *r = std::make_unique(&d, name, options, false); + if(options.use_direct_writes) + *r = std::make_unique(&d, name, options, false); + else + *r = std::make_unique(&d, name, options, false); + return Status::OK(); } catch(const fs::error &e) @@ -4315,34 +4323,18 @@ try { name, this->opts } -,_buffer_align -{ - opts.direct? - fs::block_size(fd): - size_t(0) -} -,logical_offset -{ - !trunc? - fs::size(fd): - size_t(0) -} ,preallocation_block_size { - _buffer_align? - _buffer_align: - ircd::info::page_size + ircd::info::page_size } { #ifdef RB_DEBUG_DB_ENV log::debug { - log, "'%s': opened wfile:%p fd:%d bs:%zu %s '%s'", + log, "'%s': opened wfile:%p fd:%d '%s'", d->name, this, int(fd), - _buffer_align, - opts.direct? "DIRECT_IO"_sv : "BUFFERED"_sv, name }; #endif @@ -4362,7 +4354,7 @@ catch(const std::exception &e) ircd::db::database::env::writable_file::~writable_file() noexcept { - this->Close(); + Close(); #ifdef RB_DEBUG_DB_ENV log::debug @@ -4395,9 +4387,6 @@ noexcept try }; #endif - if(opts.direct && logical_offset > 0) - _truncate(logical_offset); - fd = fs::fd{}; return Status::OK(); } @@ -4633,7 +4622,9 @@ noexcept try }; #endif - _truncate(size); + fs::write_opts wopts; + wopts.priority = this->prio; + fs::truncate(fd, size, wopts); return Status::OK(); } catch(const fs::error &e) @@ -4733,31 +4724,23 @@ noexcept try #ifdef RB_DEBUG_DB_ENV log::debug { - log, "'%s': wfile:%p fd:%d append:%p bytes:%zu logical_offset:%zu", + log, "'%s': wfile:%p fd:%d append:%p bytes:%zu", d.name, this, int(fd), data(s), size(s), - logical_offset }; #endif fs::write_opts wopts; wopts.priority = this->prio; - wopts.offset = logical_offset; const const_buffer buf { data(s), size(s) }; - assert(!opts.direct || _buffer_align != 0); - assert(!_buffer_align || uintptr_t(data(buf)) % _buffer_align == 0); - assert(!_buffer_align || wopts.offset % _buffer_align == 0); - assert(!_buffer_align || size(buf) % _buffer_align == 0); - - fs::write(fd, buf, wopts); - logical_offset += size(buf); + fs::append(fd, buf, wopts); return Status::OK(); } catch(const fs::error &e) @@ -4812,8 +4795,16 @@ noexcept try }; #endif - assert(0); - return Status::NotSupported(); + fs::write_opts wopts; + wopts.priority = this->prio; + wopts.offset = offset; + const const_buffer buf + { + data(s), size(s) + }; + + fs::append(fd, buf, wopts); + return Status::OK(); } catch(const fs::error &e) { @@ -4948,6 +4939,9 @@ ircd::db::database::env::writable_file::_allocate(const size_t &offset, ssize_t(last_block) - preallocation_last_block }; + // Fast bail when the offset and length are behind the last block already + // allocated. We don't support windowing here. If this branch is not taken + // we'll fallocate() contiguously from the last fallocate() (or offset 0). if(missing_blocks <= 0) return; @@ -4990,15 +4984,6 @@ ircd::db::database::env::writable_file::_allocate(const size_t &offset, this->preallocation_last_block = last_block; } -void -ircd::db::database::env::writable_file::_truncate(const size_t &size) -{ - fs::write_opts wopts; - wopts.priority = this->prio; - fs::truncate(fd, size, wopts); - this->logical_offset = size; -} - void ircd::db::database::env::writable_file::GetPreallocationStatus(size_t *const block_size, size_t *const last_allocated_block) @@ -5061,14 +5046,7 @@ noexcept try }; #endif - const auto &ret - { - logical_offset - }; - - assert(opts.direct || ret == fs::size(fd)); - assert(!opts.direct || ret <= fs::size(fd)); - return ret; + return fs::size(fd); } catch(const std::exception &e) { @@ -5183,6 +5161,389 @@ catch(...) return false; } +// +// writable_file_direct +// + +ircd::db::database::env::writable_file_direct::writable_file_direct(database *const &d, + const std::string &name, + const EnvOptions &env_opts, + const bool &trunc) +:writable_file +{ + d, name, env_opts, trunc +} +,alignment +{ + fs::block_size(fd) +} +,logical_offset +{ + !trunc? + fs::size(fd): + size_t(0) +} +,buffer +{ + alignment, alignment +} +{ + zero(buffer); + if(!aligned(logical_offset)) + throw assertive + { + "direct writable file requires read into buffer." + }; +} + +rocksdb::Status +ircd::db::database::env::writable_file_direct::Close() +noexcept try +{ + const ctx::uninterruptible::nothrow ui; + std::unique_lock lock{mutex}; + + if(!fd) + return Status::OK(); + + #ifdef RB_DEBUG_DB_ENV + log::debug + { + log, "'%s': wfile:%p DIRECT fd:%d close", + d.name, + this, + int(fd) + }; + #endif + + if(logical_offset > 0 && fs::size(fd) != logical_offset) + { + fs::write_opts wopts; + wopts.priority = this->prio; + fs::truncate(fd, logical_offset, wopts); + } + + fd = fs::fd{}; + return Status::OK(); +} +catch(const fs::error &e) +{ + log::error + { + log, "'%s': wfile:%p DIRECT close :%s", + d.name, + this, + e.what() + }; + + return error_to_status{e}; +} +catch(const std::exception &e) +{ + log::error + { + log, "'%s': wfile:%p DIRECT close :%s", + d.name, + this, + e.what() + }; + + return error_to_status{e}; +} + +rocksdb::Status +ircd::db::database::env::writable_file_direct::Truncate(uint64_t size) +noexcept try +{ + const ctx::uninterruptible::nothrow ui; + const std::lock_guard lock{mutex}; + + #ifdef RB_DEBUG_DB_ENV + log::debug + { + "'%s': wfile:%p DIRECT fd:%d truncate to %lu bytes", + d.name, + this, + int(fd), + size + }; + #endif + + fs::write_opts wopts; + wopts.priority = this->prio; + fs::truncate(fd, size, wopts); + logical_offset = size; + return Status::OK(); +} +catch(const fs::error &e) +{ + log::error + { + log, "'%s': wfile:%p DIRECT fd:%d truncate to %lu bytes :%s", + d.name, + this, + int(fd), + size, + e.what() + }; + + return error_to_status{e}; +} +catch(const std::exception &e) +{ + log::critical + { + log, "'%s': wfile:%p DIRECT fd:%d truncate to %lu bytes :%s", + d.name, + this, + int(fd), + size, + e.what() + }; + + return error_to_status{e}; +} + +rocksdb::Status +ircd::db::database::env::writable_file_direct::Append(const Slice &s) +noexcept try +{ + const ctx::uninterruptible::nothrow ui; + const std::lock_guard lock{mutex}; + + #ifdef RB_DEBUG_DB_ENV + log::debug + { + log, "'%s': wfile:%p DIRECT fd:%d append:%p%s bytes:%zu%s logical_offset:%zu%s", + d.name, + this, + int(fd), + data(s), + aligned(data(s))? "" : "#AC", + size(s), + aligned(size(s))? "" : "#AC", + logical_offset, + aligned(logical_offset)? "" : "#AC" + }; + #endif + + const_buffer input + { + data(s), size(s) + }; + + if(!aligned(logical_offset)) + { + const size_t base(align(logical_offset)); + const size_t rem(remain(logical_offset)); + const size_t used(logical_offset % alignment); + const size_t cons(std::min(rem, size(input))); + const const_buffer underflow + { + data(input), cons + }; + + copy(buffer + used, underflow); + write(buffer, base); + logical_offset += cons; + input = const_buffer + { + data(input) + cons, size(input) - cons + }; + } + + if(empty(input)) + return Status::OK(); + + const const_buffer output + { + data(input), align(size(input)) + }; + + const const_buffer overflow + { + data(input) + size(output), size(input) - size(output) + }; + + write(output, logical_offset); + logical_offset += size(output); + + if(!empty(overflow)) + { + zero(buffer); + copy(buffer, overflow); + write(buffer, logical_offset); + logical_offset += size(overflow); + } + + return Status::OK(); +} +catch(const fs::error &e) +{ + log::error + { + log, "'%s': wfile:%p DIRECT fd:%d append:%p size:%zu :%s", + d.name, + this, + int(fd), + data(s), + size(s), + e.what() + }; + + return error_to_status{e}; +} +catch(const std::exception &e) +{ + log::critical + { + log, "'%s': wfile:%p DIRECT fd:%d append:%p size:%zu :%s", + d.name, + this, + int(fd), + data(s), + size(s), + e.what() + }; + + return error_to_status{e}; +} + +void +ircd::db::database::env::writable_file_direct::write(const const_buffer &buf, + const uint64_t &offset) +{ + #ifdef RB_DEBUG_DB_ENV + log::debug + { + log, "'%s': wfile:%p DIRECT fd:%d write:%p%s bytes:%zu%s offset:%zu%s (logical:%zu)", + d.name, + this, + int(fd), + data(buf), + aligned(data(buf))? "" : "#AC", + size(buf), + aligned(size(buf))? "" : "#AC", + offset, + aligned(offset)? "" : "#AC", + logical_offset + }; + #endif + + assert(aligned(buf)); + assert(aligned(offset)); + + fs::write_opts wopts; + wopts.priority = this->prio; + wopts.offset = offset; + fs::write(fd, buf, wopts); +} + +rocksdb::Status +ircd::db::database::env::writable_file_direct::PositionedAppend(const Slice &s, + uint64_t offset) +noexcept +{ + const ctx::uninterruptible::nothrow ui; + const std::lock_guard lock{mutex}; + + #ifdef RB_DEBUG_DB_ENV + log::debug + { + log, "'%s': wfile:%p DIRECT fd:%d append:%p%s bytes:%zu%s offset:%zu%s", + d.name, + this, + int(fd), + data(s), + aligned(data(s))? "" : "#AC", + size(s), + aligned(size(s))? "" : "#AC", + offset, + aligned(offset)? "" : "#AC" + }; + #endif + + return rocksdb::Status::NotSupported(); +} + +uint64_t +ircd::db::database::env::writable_file_direct::GetFileSize() +noexcept try +{ + const ctx::uninterruptible::nothrow ui; + const std::lock_guard lock{mutex}; + + #ifdef RB_DEBUG_DB_ENV + log::debug + { + log, "'%s': wfile:%p DIRECT fd:%d get file size", + d.name, + this, + int(fd) + }; + #endif + + const auto &ret + { + logical_offset + }; + + assert(ret <= fs::size(fd)); + return ret; +} +catch(const std::exception &e) +{ + log::critical + { + log, "'%s': wfile:%p DIRECT fd:%d get file size :%s", + d.name, + this, + int(fd), + e.what() + }; + + return 0; +} + +size_t +ircd::db::database::env::writable_file_direct::remain(const size_t &value) +const +{ + return likely(alignment != 0)? + alignment - (value - align(value)): + 0; +} + +size_t +ircd::db::database::env::writable_file_direct::align(const size_t &value) +const +{ + return likely(alignment != 0)? + value - (value % alignment): + value; +} + +bool +ircd::db::database::env::writable_file_direct::aligned(const const_buffer &buf) +const +{ + return aligned(data(buf)) && aligned(size(buf)); +} + +bool +ircd::db::database::env::writable_file_direct::aligned(const void *const &value) +const +{ + return aligned(size_t(value)); +} + +bool +ircd::db::database::env::writable_file_direct::aligned(const size_t &value) +const +{ + return (alignment == 0) || (value % alignment == 0UL); +} + // // sequential_file //