0
0
Fork 0
mirror of https://github.com/matrix-construct/construct synced 2024-09-28 19:58:53 +02:00

ircd::db: Split writable_file into writable_file_direct using vtable for functionality.

This commit is contained in:
Jason Volk 2018-11-09 18:24:51 -08:00
parent 382dc67ca1
commit e90f1729f0
3 changed files with 444 additions and 61 deletions

View file

@ -21,6 +21,7 @@ struct ircd::db::database::env final
:rocksdb::Env
{
struct writable_file;
struct writable_file_direct;
struct sequential_file;
struct random_access_file;
struct random_rw_file;

View file

@ -15,7 +15,7 @@
// RocksDB symbols which we cannot forward declare. It is used internally
// and does not need to be included by general users of IRCd.
struct ircd::db::database::env::writable_file final
struct ircd::db::database::env::writable_file
:rocksdb::WritableFile
{
using Status = rocksdb::Status;
@ -30,9 +30,7 @@ struct ircd::db::database::env::writable_file final
IOPriority prio {IO_LOW};
WriteLifeTimeHint hint {WriteLifeTimeHint::WLTH_NOT_SET};
fs::fd fd;
size_t _buffer_align;
size_t logical_offset;
size_t preallocation_block_size;
size_t preallocation_block_size {0};
ssize_t preallocation_last_block {-1};
bool IsSyncThreadSafe() const noexcept override;
@ -44,7 +42,6 @@ struct ircd::db::database::env::writable_file final
uint64_t GetFileSize() noexcept override;
void SetPreallocationBlockSize(size_t size) noexcept override;
void GetPreallocationStatus(size_t* block_size, size_t* last_allocated_block) noexcept override;
void _truncate(const size_t &size);
void _allocate(const size_t &offset, const size_t &length);
void PrepareWrite(size_t offset, size_t len) noexcept override;
Status Allocate(uint64_t offset, uint64_t len) noexcept override;
@ -63,3 +60,27 @@ struct ircd::db::database::env::writable_file final
writable_file(writable_file &&) = delete;
~writable_file() noexcept;
};
struct ircd::db::database::env::writable_file_direct final
:writable_file
{
size_t alignment {0};
size_t logical_offset {0};
unique_buffer<mutable_buffer> buffer;
bool aligned(const size_t &) const;
bool aligned(const void *const &) const;
bool aligned(const const_buffer &) const;
size_t align(const size_t &) const;
size_t remain(const size_t &) const;
void write(const const_buffer &, const uint64_t &offset);
uint64_t GetFileSize() noexcept override;
Status PositionedAppend(const Slice& data, uint64_t offset) noexcept override;
Status Append(const Slice& data) noexcept override;
Status Truncate(uint64_t size) noexcept override;
Status Close() noexcept override;
writable_file_direct(database *const &d, const std::string &name, const EnvOptions &, const bool &trunc);
};

View file

@ -869,6 +869,7 @@ try
opts.write_thread_max_yield_usec = 0;
opts.write_thread_slow_yield_usec = 0;
//opts.max_total_wal_size = 8_MiB;
opts.allow_fallocate = true;
// Detect if O_DIRECT is possible if db::init left a file in the
// database directory claiming such. User can force no direct io
@ -876,9 +877,8 @@ try
opts.use_direct_reads = ircd::nodirect? false:
fs::exists(direct_io_test_file_path());
//opts.use_direct_io_for_flush_and_compaction = opts.use_direct_reads;
opts.use_direct_io_for_flush_and_compaction = false;
opts.allow_fallocate = !opts.use_direct_io_for_flush_and_compaction;
//opts.use_direct_io_for_flush_and_compaction = opts.use_direct_reads;
#ifdef RB_DEBUG
opts.dump_malloc_stats = true;
@ -3194,7 +3194,11 @@ noexcept try
};
#endif
*r = std::make_unique<writable_file>(&d, name, options, true);
if(options.use_direct_writes)
*r = std::make_unique<writable_file_direct>(&d, name, options, true);
else
*r = std::make_unique<writable_file>(&d, name, options, true);
return Status::OK();
}
catch(const fs::error &e)
@ -3224,7 +3228,11 @@ noexcept try
};
#endif
*r = std::make_unique<writable_file>(&d, name, options, false);
if(options.use_direct_writes)
*r = std::make_unique<writable_file_direct>(&d, name, options, false);
else
*r = std::make_unique<writable_file>(&d, name, options, false);
return Status::OK();
}
catch(const fs::error &e)
@ -4315,34 +4323,18 @@ try
{
name, this->opts
}
,_buffer_align
{
opts.direct?
fs::block_size(fd):
size_t(0)
}
,logical_offset
{
!trunc?
fs::size(fd):
size_t(0)
}
,preallocation_block_size
{
_buffer_align?
_buffer_align:
ircd::info::page_size
ircd::info::page_size
}
{
#ifdef RB_DEBUG_DB_ENV
log::debug
{
log, "'%s': opened wfile:%p fd:%d bs:%zu %s '%s'",
log, "'%s': opened wfile:%p fd:%d '%s'",
d->name,
this,
int(fd),
_buffer_align,
opts.direct? "DIRECT_IO"_sv : "BUFFERED"_sv,
name
};
#endif
@ -4362,7 +4354,7 @@ catch(const std::exception &e)
ircd::db::database::env::writable_file::~writable_file()
noexcept
{
this->Close();
Close();
#ifdef RB_DEBUG_DB_ENV
log::debug
@ -4395,9 +4387,6 @@ noexcept try
};
#endif
if(opts.direct && logical_offset > 0)
_truncate(logical_offset);
fd = fs::fd{};
return Status::OK();
}
@ -4633,7 +4622,9 @@ noexcept try
};
#endif
_truncate(size);
fs::write_opts wopts;
wopts.priority = this->prio;
fs::truncate(fd, size, wopts);
return Status::OK();
}
catch(const fs::error &e)
@ -4733,31 +4724,23 @@ noexcept try
#ifdef RB_DEBUG_DB_ENV
log::debug
{
log, "'%s': wfile:%p fd:%d append:%p bytes:%zu logical_offset:%zu",
log, "'%s': wfile:%p fd:%d append:%p bytes:%zu",
d.name,
this,
int(fd),
data(s),
size(s),
logical_offset
};
#endif
fs::write_opts wopts;
wopts.priority = this->prio;
wopts.offset = logical_offset;
const const_buffer buf
{
data(s), size(s)
};
assert(!opts.direct || _buffer_align != 0);
assert(!_buffer_align || uintptr_t(data(buf)) % _buffer_align == 0);
assert(!_buffer_align || wopts.offset % _buffer_align == 0);
assert(!_buffer_align || size(buf) % _buffer_align == 0);
fs::write(fd, buf, wopts);
logical_offset += size(buf);
fs::append(fd, buf, wopts);
return Status::OK();
}
catch(const fs::error &e)
@ -4812,8 +4795,16 @@ noexcept try
};
#endif
assert(0);
return Status::NotSupported();
fs::write_opts wopts;
wopts.priority = this->prio;
wopts.offset = offset;
const const_buffer buf
{
data(s), size(s)
};
fs::append(fd, buf, wopts);
return Status::OK();
}
catch(const fs::error &e)
{
@ -4948,6 +4939,9 @@ ircd::db::database::env::writable_file::_allocate(const size_t &offset,
ssize_t(last_block) - preallocation_last_block
};
// Fast bail when the offset and length are behind the last block already
// allocated. We don't support windowing here. If this branch is not taken
// we'll fallocate() contiguously from the last fallocate() (or offset 0).
if(missing_blocks <= 0)
return;
@ -4990,15 +4984,6 @@ ircd::db::database::env::writable_file::_allocate(const size_t &offset,
this->preallocation_last_block = last_block;
}
void
ircd::db::database::env::writable_file::_truncate(const size_t &size)
{
fs::write_opts wopts;
wopts.priority = this->prio;
fs::truncate(fd, size, wopts);
this->logical_offset = size;
}
void
ircd::db::database::env::writable_file::GetPreallocationStatus(size_t *const block_size,
size_t *const last_allocated_block)
@ -5061,14 +5046,7 @@ noexcept try
};
#endif
const auto &ret
{
logical_offset
};
assert(opts.direct || ret == fs::size(fd));
assert(!opts.direct || ret <= fs::size(fd));
return ret;
return fs::size(fd);
}
catch(const std::exception &e)
{
@ -5183,6 +5161,389 @@ catch(...)
return false;
}
//
// writable_file_direct
//
ircd::db::database::env::writable_file_direct::writable_file_direct(database *const &d,
const std::string &name,
const EnvOptions &env_opts,
const bool &trunc)
:writable_file
{
d, name, env_opts, trunc
}
,alignment
{
fs::block_size(fd)
}
,logical_offset
{
!trunc?
fs::size(fd):
size_t(0)
}
,buffer
{
alignment, alignment
}
{
zero(buffer);
if(!aligned(logical_offset))
throw assertive
{
"direct writable file requires read into buffer."
};
}
rocksdb::Status
ircd::db::database::env::writable_file_direct::Close()
noexcept try
{
const ctx::uninterruptible::nothrow ui;
std::unique_lock<decltype(mutex)> lock{mutex};
if(!fd)
return Status::OK();
#ifdef RB_DEBUG_DB_ENV
log::debug
{
log, "'%s': wfile:%p DIRECT fd:%d close",
d.name,
this,
int(fd)
};
#endif
if(logical_offset > 0 && fs::size(fd) != logical_offset)
{
fs::write_opts wopts;
wopts.priority = this->prio;
fs::truncate(fd, logical_offset, wopts);
}
fd = fs::fd{};
return Status::OK();
}
catch(const fs::error &e)
{
log::error
{
log, "'%s': wfile:%p DIRECT close :%s",
d.name,
this,
e.what()
};
return error_to_status{e};
}
catch(const std::exception &e)
{
log::error
{
log, "'%s': wfile:%p DIRECT close :%s",
d.name,
this,
e.what()
};
return error_to_status{e};
}
rocksdb::Status
ircd::db::database::env::writable_file_direct::Truncate(uint64_t size)
noexcept try
{
const ctx::uninterruptible::nothrow ui;
const std::lock_guard<decltype(mutex)> lock{mutex};
#ifdef RB_DEBUG_DB_ENV
log::debug
{
"'%s': wfile:%p DIRECT fd:%d truncate to %lu bytes",
d.name,
this,
int(fd),
size
};
#endif
fs::write_opts wopts;
wopts.priority = this->prio;
fs::truncate(fd, size, wopts);
logical_offset = size;
return Status::OK();
}
catch(const fs::error &e)
{
log::error
{
log, "'%s': wfile:%p DIRECT fd:%d truncate to %lu bytes :%s",
d.name,
this,
int(fd),
size,
e.what()
};
return error_to_status{e};
}
catch(const std::exception &e)
{
log::critical
{
log, "'%s': wfile:%p DIRECT fd:%d truncate to %lu bytes :%s",
d.name,
this,
int(fd),
size,
e.what()
};
return error_to_status{e};
}
rocksdb::Status
ircd::db::database::env::writable_file_direct::Append(const Slice &s)
noexcept try
{
const ctx::uninterruptible::nothrow ui;
const std::lock_guard<decltype(mutex)> lock{mutex};
#ifdef RB_DEBUG_DB_ENV
log::debug
{
log, "'%s': wfile:%p DIRECT fd:%d append:%p%s bytes:%zu%s logical_offset:%zu%s",
d.name,
this,
int(fd),
data(s),
aligned(data(s))? "" : "#AC",
size(s),
aligned(size(s))? "" : "#AC",
logical_offset,
aligned(logical_offset)? "" : "#AC"
};
#endif
const_buffer input
{
data(s), size(s)
};
if(!aligned(logical_offset))
{
const size_t base(align(logical_offset));
const size_t rem(remain(logical_offset));
const size_t used(logical_offset % alignment);
const size_t cons(std::min(rem, size(input)));
const const_buffer underflow
{
data(input), cons
};
copy(buffer + used, underflow);
write(buffer, base);
logical_offset += cons;
input = const_buffer
{
data(input) + cons, size(input) - cons
};
}
if(empty(input))
return Status::OK();
const const_buffer output
{
data(input), align(size(input))
};
const const_buffer overflow
{
data(input) + size(output), size(input) - size(output)
};
write(output, logical_offset);
logical_offset += size(output);
if(!empty(overflow))
{
zero(buffer);
copy(buffer, overflow);
write(buffer, logical_offset);
logical_offset += size(overflow);
}
return Status::OK();
}
catch(const fs::error &e)
{
log::error
{
log, "'%s': wfile:%p DIRECT fd:%d append:%p size:%zu :%s",
d.name,
this,
int(fd),
data(s),
size(s),
e.what()
};
return error_to_status{e};
}
catch(const std::exception &e)
{
log::critical
{
log, "'%s': wfile:%p DIRECT fd:%d append:%p size:%zu :%s",
d.name,
this,
int(fd),
data(s),
size(s),
e.what()
};
return error_to_status{e};
}
void
ircd::db::database::env::writable_file_direct::write(const const_buffer &buf,
const uint64_t &offset)
{
#ifdef RB_DEBUG_DB_ENV
log::debug
{
log, "'%s': wfile:%p DIRECT fd:%d write:%p%s bytes:%zu%s offset:%zu%s (logical:%zu)",
d.name,
this,
int(fd),
data(buf),
aligned(data(buf))? "" : "#AC",
size(buf),
aligned(size(buf))? "" : "#AC",
offset,
aligned(offset)? "" : "#AC",
logical_offset
};
#endif
assert(aligned(buf));
assert(aligned(offset));
fs::write_opts wopts;
wopts.priority = this->prio;
wopts.offset = offset;
fs::write(fd, buf, wopts);
}
rocksdb::Status
ircd::db::database::env::writable_file_direct::PositionedAppend(const Slice &s,
uint64_t offset)
noexcept
{
const ctx::uninterruptible::nothrow ui;
const std::lock_guard<decltype(mutex)> lock{mutex};
#ifdef RB_DEBUG_DB_ENV
log::debug
{
log, "'%s': wfile:%p DIRECT fd:%d append:%p%s bytes:%zu%s offset:%zu%s",
d.name,
this,
int(fd),
data(s),
aligned(data(s))? "" : "#AC",
size(s),
aligned(size(s))? "" : "#AC",
offset,
aligned(offset)? "" : "#AC"
};
#endif
return rocksdb::Status::NotSupported();
}
uint64_t
ircd::db::database::env::writable_file_direct::GetFileSize()
noexcept try
{
const ctx::uninterruptible::nothrow ui;
const std::lock_guard<decltype(mutex)> lock{mutex};
#ifdef RB_DEBUG_DB_ENV
log::debug
{
log, "'%s': wfile:%p DIRECT fd:%d get file size",
d.name,
this,
int(fd)
};
#endif
const auto &ret
{
logical_offset
};
assert(ret <= fs::size(fd));
return ret;
}
catch(const std::exception &e)
{
log::critical
{
log, "'%s': wfile:%p DIRECT fd:%d get file size :%s",
d.name,
this,
int(fd),
e.what()
};
return 0;
}
size_t
ircd::db::database::env::writable_file_direct::remain(const size_t &value)
const
{
return likely(alignment != 0)?
alignment - (value - align(value)):
0;
}
size_t
ircd::db::database::env::writable_file_direct::align(const size_t &value)
const
{
return likely(alignment != 0)?
value - (value % alignment):
value;
}
bool
ircd::db::database::env::writable_file_direct::aligned(const const_buffer &buf)
const
{
return aligned(data(buf)) && aligned(size(buf));
}
bool
ircd::db::database::env::writable_file_direct::aligned(const void *const &value)
const
{
return aligned(size_t(value));
}
bool
ircd::db::database::env::writable_file_direct::aligned(const size_t &value)
const
{
return (alignment == 0) || (value % alignment == 0UL);
}
//
// sequential_file
//