0
0
Fork 0
mirror of https://github.com/matrix-construct/construct synced 2024-12-26 15:33:54 +01:00

ircd::db: Toward O_DIRECT writes from writable_file environment.

This commit is contained in:
Jason Volk 2018-11-02 01:07:42 -07:00
parent 95d42686f3
commit 1d03747da3
2 changed files with 367 additions and 220 deletions

View file

@ -21,30 +21,37 @@ struct ircd::db::database::env::writable_file final
using Status = rocksdb::Status;
using Slice = rocksdb::Slice;
using IOPriority = rocksdb::Env::IOPriority;
using WriteLifeTimeHint = rocksdb::Env::WriteLifeTimeHint;
database &d;
ctx::mutex mutex;
rocksdb::EnvOptions env_opts;
fs::fd::opts opts;
fs::fd fd;
IOPriority prio {IO_LOW};
WriteLifeTimeHint hint {WriteLifeTimeHint::WLTH_NOT_SET};
fs::fd fd;
size_t _buffer_align;
size_t preallocation_block_size {0};
size_t preallocation_last_block {0};
size_t logical_offset;
size_t preallocation_block_size;
ssize_t preallocation_last_block {-1};
Status Allocate(uint64_t offset, uint64_t len) noexcept override;
void PrepareWrite(size_t offset, size_t len) noexcept override;
void GetPreallocationStatus(size_t* block_size, size_t* last_allocated_block) noexcept override;
void SetPreallocationBlockSize(size_t size) noexcept override;
Status Truncate(uint64_t size) noexcept override;
bool IsSyncThreadSafe() const noexcept override;
void SetIOPriority(IOPriority pri) noexcept override;
IOPriority GetIOPriority() noexcept override;
uint64_t GetFileSize() noexcept override;
size_t GetUniqueId(char* id, size_t max_size) const noexcept override;
Status InvalidateCache(size_t offset, size_t length) noexcept override;
IOPriority GetIOPriority() noexcept override;
void SetIOPriority(IOPriority pri) noexcept override;
WriteLifeTimeHint GetWriteLifeTimeHint() noexcept override;
void SetWriteLifeTimeHint(WriteLifeTimeHint hint) noexcept override;
uint64_t GetFileSize() noexcept override;
void SetPreallocationBlockSize(size_t size) noexcept override;
void GetPreallocationStatus(size_t* block_size, size_t* last_allocated_block) noexcept override;
void _truncate(const size_t &size);
void _allocate(const size_t &offset, const size_t &length);
void PrepareWrite(size_t offset, size_t len) noexcept override;
Status Allocate(uint64_t offset, uint64_t len) noexcept override;
Status PositionedAppend(const Slice& data, uint64_t offset) noexcept override;
Status Append(const Slice& data) noexcept override;
Status InvalidateCache(size_t offset, size_t length) noexcept override;
Status Truncate(uint64_t size) noexcept override;
Status RangeSync(uint64_t offset, uint64_t nbytes) noexcept override;
Status Fsync() noexcept override;
Status Sync() noexcept override;
@ -52,5 +59,7 @@ struct ircd::db::database::env::writable_file final
Status Close() noexcept override;
writable_file(database *const &d, const std::string &name, const EnvOptions &, const bool &trunc);
writable_file(const writable_file &) = delete;
writable_file(writable_file &&) = delete;
~writable_file() noexcept;
};

View file

@ -856,7 +856,6 @@ try
opts.enable_pipelined_write = false;
opts.write_thread_max_yield_usec = 0;
opts.write_thread_slow_yield_usec = 0;
opts.use_direct_io_for_flush_and_compaction = false;
//opts.max_total_wal_size = 8_MiB;
// Detect if O_DIRECT is possible if db::init left a file in the
@ -865,6 +864,10 @@ try
opts.use_direct_reads = ircd::nodirect? false:
fs::exists(direct_io_test_file_path());
//opts.use_direct_io_for_flush_and_compaction = opts.use_direct_reads;
opts.use_direct_io_for_flush_and_compaction = false;
opts.allow_fallocate = !opts.use_direct_io_for_flush_and_compaction;
#ifdef RB_DEBUG
opts.dump_malloc_stats = true;
#endif
@ -1568,6 +1571,7 @@ ircd::db::database::column::column(database &d,
table_opts.block_size = this->descriptor->block_size;
table_opts.metadata_block_size = this->descriptor->meta_block_size;
table_opts.block_size_deviation = 50;
table_opts.block_align = false; // Doesn't work w/ compression.
// Setup the cache for assets.
const auto &cache_size(this->descriptor->cache_size);
@ -4305,17 +4309,32 @@ try
}
,_buffer_align
{
fs::block_size(fd)
opts.direct?
fs::block_size(fd):
size_t(0)
}
,logical_offset
{
!trunc?
fs::size(fd):
size_t(0)
}
,preallocation_block_size
{
_buffer_align?
_buffer_align:
ircd::info::page_size
}
{
#ifdef RB_DEBUG_DB_ENV
log::debug
{
log, "'%s': opened wfile:%p fd:%d bs:%zu '%s'",
log, "'%s': opened wfile:%p fd:%d bs:%zu %s '%s'",
d->name,
this,
int(fd),
_buffer_align,
opts.direct? "DIRECT_IO"_sv : "BUFFERED"_sv,
name
};
#endif
@ -4335,6 +4354,8 @@ catch(const std::exception &e)
ircd::db::database::env::writable_file::~writable_file()
noexcept
{
this->Close();
#ifdef RB_DEBUG_DB_ENV
log::debug
{
@ -4351,7 +4372,10 @@ ircd::db::database::env::writable_file::Close()
noexcept try
{
const ctx::uninterruptible::nothrow ui;
const std::lock_guard<decltype(mutex)> lock{mutex};
std::unique_lock<decltype(mutex)> lock{mutex};
if(!fd)
return Status::OK();
#ifdef RB_DEBUG_DB_ENV
log::debug
@ -4363,7 +4387,10 @@ noexcept try
};
#endif
this->fd = fs::fd{};
if(opts.direct && logical_offset > 0)
_truncate(logical_offset);
fd = fs::fd{};
return Status::OK();
}
catch(const fs::error &e)
@ -4580,6 +4607,108 @@ catch(const std::exception &e)
return error_to_status{e};
}
rocksdb::Status
ircd::db::database::env::writable_file::Truncate(uint64_t size)
noexcept try
{
const ctx::uninterruptible::nothrow ui;
const std::lock_guard<decltype(mutex)> lock{mutex};
#ifdef RB_DEBUG_DB_ENV
log::debug
{
"'%s': wfile:%p fd:%d truncate to %lu bytes",
d.name,
this,
int(fd),
size
};
#endif
_truncate(size);
return Status::OK();
}
catch(const fs::error &e)
{
log::error
{
log, "'%s': wfile:%p fd:%d truncate to %lu bytes :%s",
d.name,
this,
int(fd),
size,
e.what()
};
return error_to_status{e};
}
catch(const std::exception &e)
{
log::critical
{
log, "'%s': wfile:%p fd:%d truncate to %lu bytes :%s",
d.name,
this,
int(fd),
size,
e.what()
};
return error_to_status{e};
}
rocksdb::Status
ircd::db::database::env::writable_file::InvalidateCache(size_t offset,
size_t length)
noexcept try
{
const ctx::uninterruptible::nothrow ui;
const std::lock_guard<decltype(mutex)> lock{mutex};
#ifdef RB_DEBUG_DB_ENV
log::debug
{
log, "'%s': wfile:%p fd:%d invalidate cache offset:%zu length:%zu",
d.name,
this,
int(fd),
offset,
length
};
#endif
//syscall(::posix_fadvise, fd, offset, length, FADV_DONTNEED);
return Status::OK();
}
catch(const fs::error &e)
{
log::error
{
log, "'%s': wfile:%p fd:%d invalidate cache offset:%zu length:%zu",
d.name,
this,
int(fd),
offset,
length
};
return error_to_status{e};
}
catch(const std::exception &e)
{
log::critical
{
log, "'%s': wfile:%p fd:%d invalidate cache offset:%zu length:%zu",
d.name,
this,
int(fd),
offset,
length
};
return error_to_status{e};
}
rocksdb::Status
ircd::db::database::env::writable_file::Append(const Slice &s)
noexcept try
@ -4590,21 +4719,31 @@ noexcept try
#ifdef RB_DEBUG_DB_ENV
log::debug
{
log, "'%s': wfile:%p fd:%d append:%p bytes:%zu",
log, "'%s': wfile:%p fd:%d append:%p bytes:%zu logical_offset:%zu",
d.name,
this,
int(fd),
data(s),
size(s)
size(s),
logical_offset
};
#endif
fs::write_opts wopts;
wopts.priority = this->prio;
wopts.offset = logical_offset;
const const_buffer buf
{
data(s), size(s)
};
fs::append(fd, buf);
assert(!opts.direct || _buffer_align != 0);
assert(!_buffer_align || uintptr_t(data(buf)) % _buffer_align == 0);
assert(!_buffer_align || wopts.offset % _buffer_align == 0);
assert(!_buffer_align || size(buf) % _buffer_align == 0);
fs::write(fd, buf, wopts);
logical_offset += size(buf);
return Status::OK();
}
catch(const fs::error &e)
@ -4661,14 +4800,6 @@ noexcept try
assert(0);
return Status::NotSupported();
const const_buffer buf
{
data(s), size(s)
};
fs::append(fd, buf, offset);
return Status::OK();
}
catch(const fs::error &e)
{
@ -4704,7 +4835,8 @@ catch(const std::exception &e)
}
rocksdb::Status
ircd::db::database::env::writable_file::Truncate(uint64_t size)
ircd::db::database::env::writable_file::Allocate(uint64_t offset,
uint64_t length)
noexcept try
{
const ctx::uninterruptible::nothrow ui;
@ -4713,26 +4845,30 @@ noexcept try
#ifdef RB_DEBUG_DB_ENV
log::debug
{
"'%s': wfile:%p fd:%d truncate to %lu bytes",
log, "'%s': wfile:%p fd:%d allocate offset:%lu length:%lu%s%s",
d.name,
this,
int(fd),
size
offset,
length,
env_opts.fallocate_with_keep_size? " KEEP_SIZE" : "",
env_opts.allow_fallocate? "" : " (DISABLED)"
};
#endif
fs::truncate(fd, size);
_allocate(offset, length);
return Status::OK();
}
catch(const fs::error &e)
{
log::error
{
log, "'%s': wfile:%p fd:%d truncate to %lu bytes :%s",
log, "'%s': wfile:%p fd:%d allocate offset:%zu length:%zu :%s",
d.name,
this,
int(fd),
size,
offset,
length,
e.what()
};
@ -4742,50 +4878,156 @@ catch(const std::exception &e)
{
log::critical
{
log, "'%s': wfile:%p fd:%d truncate to %lu bytes :%s",
log, "'%s': wfile:%p fd:%d allocate offset:%zu length:%zu :%s",
d.name,
this,
int(fd),
size,
offset,
length,
e.what()
};
return error_to_status{e};
}
bool
ircd::db::database::env::writable_file::IsSyncThreadSafe()
const noexcept try
{
return true;
}
catch(...)
{
return false;
}
void
ircd::db::database::env::writable_file::SetIOPriority(Env::IOPriority prio)
ircd::db::database::env::writable_file::PrepareWrite(size_t offset,
size_t length)
noexcept
{
const ctx::uninterruptible::nothrow ui;
const std::lock_guard<decltype(mutex)> lock{mutex};
#ifdef RB_DEBUG_DB_ENV
log::debug
{
log, "'%s': wfile:%p set IO prio to %s",
log, "'%s': wfile:%p prepare write offset:%zu length:%zu",
d.name,
this,
reflect(prio)
offset,
length
};
#endif
this->prio = prio;
if(!env_opts.allow_fallocate)
return;
_allocate(offset, length);
}
rocksdb::Env::IOPriority
ircd::db::database::env::writable_file::GetIOPriority()
void
ircd::db::database::env::writable_file::_allocate(const size_t &offset,
const size_t &length)
{
const size_t first_block
{
offset / preallocation_block_size
};
const size_t last_block
{
(offset + length) / preallocation_block_size
};
const ssize_t missing_blocks
{
ssize_t(last_block) - preallocation_last_block
};
if(missing_blocks <= 0)
return;
const ssize_t start_block
{
preallocation_last_block + 1
};
const size_t allocate_offset
{
start_block * preallocation_block_size
};
const size_t allocate_length
{
missing_blocks * preallocation_block_size
};
fs::write_opts wopts;
wopts.offset = allocate_offset;
wopts.priority = this->prio;
wopts.keep_size = env_opts.fallocate_with_keep_size;
#ifdef RB_DEBUG_DB_ENV
log::debug
{
log, "'%s': wfile:%p fd:%d allocating %zd blocks after block:%zu offset:%lu length:%lu%s",
d.name,
this,
int(fd),
missing_blocks,
start_block,
allocate_offset,
allocate_length,
wopts.keep_size? " KEEP_SIZE" : ""
};
#endif
fs::allocate(fd, allocate_length, wopts);
this->preallocation_last_block = last_block;
}
void
ircd::db::database::env::writable_file::_truncate(const size_t &size)
{
fs::write_opts wopts;
wopts.priority = this->prio;
fs::truncate(fd, size, wopts);
this->logical_offset = size;
}
void
ircd::db::database::env::writable_file::GetPreallocationStatus(size_t *const block_size,
size_t *const last_allocated_block)
noexcept
{
return prio;
const ctx::uninterruptible::nothrow ui;
const std::lock_guard<decltype(mutex)> lock{mutex};
*block_size = this->preallocation_block_size;
*last_allocated_block = this->preallocation_last_block;
#ifdef RB_DEBUG_DB_ENV
log::debug
{
log, "'%s': wfile:%p get preallocation block_size(%p):%zu last_block(%p):%zu",
d.name,
this,
block_size,
*block_size,
last_allocated_block,
*last_allocated_block
};
#endif
}
void
ircd::db::database::env::writable_file::SetPreallocationBlockSize(size_t size)
noexcept
{
const ctx::uninterruptible::nothrow ui;
const std::lock_guard<decltype(mutex)> lock{mutex};
#ifdef RB_DEBUG_DB_ENV
log::debug
{
log, "'%s': wfile:%p set preallocation block size:%zu",
d.name,
this,
size
};
#endif
this->preallocation_block_size = size;
}
uint64_t
@ -4805,7 +5047,14 @@ noexcept try
};
#endif
return fs::size(fd);
const auto &ret
{
logical_offset
};
assert(opts.direct || ret == fs::size(fd));
assert(!opts.direct || ret <= fs::size(fd));
return ret;
}
catch(const std::exception &e)
{
@ -4821,6 +5070,54 @@ catch(const std::exception &e)
return 0;
}
void
ircd::db::database::env::writable_file::SetIOPriority(Env::IOPriority prio)
noexcept
{
#ifdef RB_DEBUG_DB_ENV
log::debug
{
log, "'%s': wfile:%p IO priority %s",
d.name,
this,
reflect(prio)
};
#endif
this->prio = prio;
}
rocksdb::Env::IOPriority
ircd::db::database::env::writable_file::GetIOPriority()
noexcept
{
return prio;
}
void
ircd::db::database::env::writable_file::SetWriteLifeTimeHint(WriteLifeTimeHint hint)
noexcept
{
#ifdef RB_DEBUG_DB_ENV
log::debug
{
log, "'%s': wfile:%p hint %s",
d.name,
this,
reflect(hint)
};
#endif
this->hint = hint;
}
rocksdb::Env::WriteLifeTimeHint
ircd::db::database::env::writable_file::GetWriteLifeTimeHint()
noexcept
{
return hint;
}
size_t
ircd::db::database::env::writable_file::GetUniqueId(char *const id,
size_t max_size)
@ -4860,174 +5157,15 @@ catch(const std::exception &e)
return 0;
}
rocksdb::Status
ircd::db::database::env::writable_file::InvalidateCache(size_t offset,
size_t length)
noexcept try
bool
ircd::db::database::env::writable_file::IsSyncThreadSafe()
const noexcept try
{
const ctx::uninterruptible::nothrow ui;
const std::lock_guard<decltype(mutex)> lock{mutex};
#ifdef RB_DEBUG_DB_ENV
log::debug
{
log, "'%s': wfile:%p fd:%d invalidate cache offset:%zu length:%zu",
d.name,
this,
int(fd),
offset,
length
};
#endif
//TODO: ircd::fs::
//syscall(::posix_fadvise, fd, offset, length, FADV_DONTNEED);
return Status::OK();
return true;
}
catch(const fs::error &e)
catch(...)
{
log::error
{
log, "'%s': wfile:%p fd:%d invalidate cache offset:%zu length:%zu",
d.name,
this,
int(fd),
offset,
length
};
return error_to_status{e};
}
catch(const std::exception &e)
{
log::critical
{
log, "'%s': wfile:%p fd:%d invalidate cache offset:%zu length:%zu",
d.name,
this,
int(fd),
offset,
length
};
return error_to_status{e};
}
void
ircd::db::database::env::writable_file::GetPreallocationStatus(size_t *const block_size,
size_t *const last_allocated_block)
noexcept
{
#ifdef RB_DEBUG_DB_ENV
log::debug
{
log, "'%s': wfile:%p get preallocation block_size:%p last_block:%p",
d.name,
this,
block_size,
last_allocated_block
};
#endif
*block_size = this->preallocation_block_size;
*last_allocated_block = this->preallocation_last_block;
}
void
ircd::db::database::env::writable_file::SetPreallocationBlockSize(size_t size)
noexcept
{
#ifdef RB_DEBUG_DB_ENV
log::debug
{
log, "'%s': wfile:%p set preallocation block size:%zu",
d.name,
this,
size
};
#endif
this->preallocation_block_size = size;
}
void
ircd::db::database::env::writable_file::PrepareWrite(size_t offset,
size_t length)
noexcept
{
const ctx::uninterruptible::nothrow ui;
const std::lock_guard<decltype(mutex)> lock{mutex};
#ifdef RB_DEBUG_DB_ENV
log::debug
{
log, "'%s': wfile:%p prepare write offset:%zu length:%zu",
d.name,
this,
offset,
length
};
#endif
}
rocksdb::Status
ircd::db::database::env::writable_file::Allocate(uint64_t offset,
uint64_t length)
noexcept try
{
const ctx::uninterruptible::nothrow ui;
const std::lock_guard<decltype(mutex)> lock{mutex};
#ifdef RB_DEBUG_DB_ENV
log::debug
{
log, "'%s': wfile:%p fd:%d allocate offset:%lu length:%lu%s",
d.name,
this,
int(fd),
offset,
length,
env_opts.fallocate_with_keep_size? " KEEP_SIZE" : ""
};
#endif
fs::write_opts wopts;
wopts.offset = offset;
wopts.keep_size = env_opts.fallocate_with_keep_size;
fs::allocate(fd, length, wopts);
this->preallocation_last_block = offset;
return Status::OK();
}
catch(const fs::error &e)
{
log::error
{
log, "'%s': wfile:%p fd:%d allocate offset:%zu length:%zu :%s",
d.name,
this,
int(fd),
offset,
length,
e.what()
};
return error_to_status{e};
}
catch(const std::exception &e)
{
log::critical
{
log, "'%s': wfile:%p fd:%d allocate offset:%zu length:%zu :%s",
d.name,
this,
int(fd),
offset,
length,
e.what()
};
return error_to_status{e};
return false;
}
//