From 87f0f3bc5a1252d44925ef12967f564f0d367c47 Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Wed, 17 Jan 2018 20:17:00 -0800 Subject: [PATCH] ircd::db: Add additional environment stubs for writable_file; minor cleanup. --- include/ircd/db/database/env.h | 115 +++++++++++------ ircd/db.cc | 230 ++++++++++++++++++++++++++++++++- 2 files changed, 303 insertions(+), 42 deletions(-) diff --git a/include/ircd/db/database/env.h b/include/ircd/db/database/env.h index 67b0db59f..aa5047ecb 100644 --- a/include/ircd/db/database/env.h +++ b/include/ircd/db/database/env.h @@ -29,9 +29,11 @@ /// Internal environment hookup. /// -struct ircd::db::database::env +struct ircd::db::database::env final :rocksdb::Env { + struct writable_file; + using Status = rocksdb::Status; using EnvOptions = rocksdb::EnvOptions; using Directory = rocksdb::Directory; @@ -50,46 +52,79 @@ struct ircd::db::database::env *rocksdb::Env::Default() }; - Status NewSequentialFile(const std::string& f, std::unique_ptr* r, const EnvOptions& options) final override; - Status NewRandomAccessFile(const std::string& f, std::unique_ptr* r, const EnvOptions& options) final override; - Status NewWritableFile(const std::string& f, std::unique_ptr* r, const EnvOptions& options) final override; - Status ReopenWritableFile(const std::string& fname, std::unique_ptr* result, const EnvOptions& options) final override; - Status ReuseWritableFile(const std::string& fname, const std::string& old_fname, std::unique_ptr* r, const EnvOptions& options) final override; - Status NewRandomRWFile(const std::string& fname, std::unique_ptr* result, const EnvOptions& options) final override; - Status NewDirectory(const std::string& name, std::unique_ptr* result) final override; - Status FileExists(const std::string& f) final override; - Status GetChildren(const std::string& dir, std::vector* r) final override; - Status GetChildrenFileAttributes(const std::string& dir, std::vector* result) final override; - Status DeleteFile(const std::string& f) final override; - Status CreateDir(const std::string& d) final override; - Status CreateDirIfMissing(const std::string& d) final override; - Status DeleteDir(const std::string& d) final override; - Status GetFileSize(const std::string& f, uint64_t* s) final override; - Status GetFileModificationTime(const std::string& fname, uint64_t* file_mtime) final override; - Status RenameFile(const std::string& s, const std::string& t) final override; - Status LinkFile(const std::string& s, const std::string& t) final override; - Status LockFile(const std::string& f, FileLock** l) final override; - Status UnlockFile(FileLock* l) final override; - void Schedule(void (*f)(void* arg), void* a, Priority pri, void* tag = nullptr, void (*u)(void* arg) = 0) final override; - int UnSchedule(void* tag, Priority pri) final override; - void StartThread(void (*f)(void*), void* a) final override; - void WaitForJoin() final override; - unsigned int GetThreadPoolQueueLen(Priority pri = LOW) const final override; - Status GetTestDirectory(std::string* path) final override; - Status NewLogger(const std::string& fname, std::shared_ptr* result) final override; - uint64_t NowMicros() final override; - void SleepForMicroseconds(int micros) final override; - Status GetHostName(char* name, uint64_t len) final override; - Status GetCurrentTime(int64_t* unix_time) final override; - Status GetAbsolutePath(const std::string& db_path, std::string* output_path) final override; - void SetBackgroundThreads(int num, Priority pri) final override; - void IncBackgroundThreadsIfNeeded(int num, Priority pri) final override; - void LowerThreadPoolIOPriority(Priority pool = LOW) final override; - std::string TimeToString(uint64_t time) final override; - Status GetThreadList(std::vector* thread_list) final override; - ThreadStatusUpdater* GetThreadStatusUpdater() const final override; - uint64_t GetThreadID() const final override; + Status NewSequentialFile(const std::string& f, std::unique_ptr* r, const EnvOptions& options) override; + Status NewRandomAccessFile(const std::string& f, std::unique_ptr* r, const EnvOptions& options) override; + Status NewWritableFile(const std::string& f, std::unique_ptr* r, const EnvOptions& options) override; + Status ReopenWritableFile(const std::string& fname, std::unique_ptr* result, const EnvOptions& options) override; + Status ReuseWritableFile(const std::string& fname, const std::string& old_fname, std::unique_ptr* r, const EnvOptions& options) override; + Status NewRandomRWFile(const std::string& fname, std::unique_ptr* result, const EnvOptions& options) override; + Status NewDirectory(const std::string& name, std::unique_ptr* result) override; + Status FileExists(const std::string& f) override; + Status GetChildren(const std::string& dir, std::vector* r) override; + Status GetChildrenFileAttributes(const std::string& dir, std::vector* result) override; + Status DeleteFile(const std::string& f) override; + Status CreateDir(const std::string& d) override; + Status CreateDirIfMissing(const std::string& d) override; + Status DeleteDir(const std::string& d) override; + Status GetFileSize(const std::string& f, uint64_t* s) override; + Status GetFileModificationTime(const std::string& fname, uint64_t* file_mtime) override; + Status RenameFile(const std::string& s, const std::string& t) override; + Status LinkFile(const std::string& s, const std::string& t) override; + Status LockFile(const std::string& f, FileLock** l) override; + Status UnlockFile(FileLock* l) override; + void Schedule(void (*f)(void* arg), void* a, Priority pri, void* tag = nullptr, void (*u)(void* arg) = 0) override; + int UnSchedule(void* tag, Priority pri) override; + void StartThread(void (*f)(void*), void* a) override; + void WaitForJoin() override; + unsigned int GetThreadPoolQueueLen(Priority pri = LOW) const override; + Status GetTestDirectory(std::string* path) override; + Status NewLogger(const std::string& fname, std::shared_ptr* result) override; + uint64_t NowMicros() override; + void SleepForMicroseconds(int micros) override; + Status GetHostName(char* name, uint64_t len) override; + Status GetCurrentTime(int64_t* unix_time) override; + Status GetAbsolutePath(const std::string& db_path, std::string* output_path) override; + void SetBackgroundThreads(int num, Priority pri) override; + void IncBackgroundThreadsIfNeeded(int num, Priority pri) override; + void LowerThreadPoolIOPriority(Priority pool = LOW) override; + std::string TimeToString(uint64_t time) override; + Status GetThreadList(std::vector* thread_list) override; + ThreadStatusUpdater* GetThreadStatusUpdater() const override; + uint64_t GetThreadID() const override; env(database *const &d); ~env() noexcept; }; + +struct ircd::db::database::env::writable_file final +:rocksdb::WritableFile +{ + using Status = rocksdb::Status; + using Slice = rocksdb::Slice; + using IOPriority = rocksdb::Env::IOPriority; + + database &d; + std::unique_ptr defaults; + + Status Append(const Slice& data) override; + Status PositionedAppend(const Slice& data, uint64_t offset) override; + Status Truncate(uint64_t size) override; + Status Close() override; + Status Flush() override; + Status Sync() override; + Status Fsync() override; + bool IsSyncThreadSafe() const override; + void SetIOPriority(IOPriority pri) override; + IOPriority GetIOPriority() override; + uint64_t GetFileSize() override; + void GetPreallocationStatus(size_t* block_size, size_t* last_allocated_block) override; + size_t GetUniqueId(char* id, size_t max_size) const override; + Status InvalidateCache(size_t offset, size_t length) override; + void SetPreallocationBlockSize(size_t size) override; + void PrepareWrite(size_t offset, size_t len) override; + Status Allocate(uint64_t offset, uint64_t len) override; + Status RangeSync(uint64_t offset, uint64_t nbytes) override; + + writable_file(database *const &d, const std::string &name, const EnvOptions &, std::unique_ptr defaults); + ~writable_file() noexcept; +}; diff --git a/ircd/db.cc b/ircd/db.cc index c11053c2a..92b048908 100644 --- a/ircd/db.cc +++ b/ircd/db.cc @@ -1168,6 +1168,10 @@ ircd::db::database::events::OnColumnFamilyHandleDeletionStarted(rocksdb::ColumnF // database::env // +// +// env +// + ircd::db::database::env::env(database *const &d) :d{*d} { @@ -1214,7 +1218,14 @@ ircd::db::database::env::NewWritableFile(const std::string& name, name, &options); - return defaults.NewWritableFile(name, r, options); + std::unique_ptr defaults; + const auto ret + { + this->defaults.NewWritableFile(name, &defaults, options) + }; + + *r = std::make_unique(&d, name, options, std::move(defaults)); + return ret; } rocksdb::Status @@ -1421,7 +1432,6 @@ ircd::db::database::env::Schedule(void (*f)(void* arg), a, tag, u, - f, reflect(prio)); return defaults.Schedule(f, a, prio, tag, u); @@ -1587,6 +1597,222 @@ const return defaults.GetThreadID(); } +// +// writable_file +// + +ircd::db::database::env::writable_file::writable_file(database *const &d, + const std::string &name, + const EnvOptions &opts, + std::unique_ptr defaults) +:d{*d} +,defaults{std::move(defaults)} +{ +} + +ircd::db::database::env::writable_file::~writable_file() +noexcept +{ +} + +rocksdb::Status +ircd::db::database::env::writable_file::Append(const Slice& s) +{ + log.debug("'%s': wfile:%p append:%p bytes:%zu", + d.name, + this, + data(s), + size(s)); + + return defaults->Append(s); +} + +rocksdb::Status +ircd::db::database::env::writable_file::PositionedAppend(const Slice& s, + uint64_t offset) +{ + log.debug("'%s': wfile:%p append:%p bytes:%zu offset:%lu", + d.name, + this, + data(s), + size(s), + offset); + + return defaults->PositionedAppend(s, offset); +} + +rocksdb::Status +ircd::db::database::env::writable_file::Truncate(uint64_t size) +{ + log.debug("'%s': wfile:%p truncate to %lu bytes", + d.name, + this, + size); + + return defaults->Truncate(size); +} + +rocksdb::Status +ircd::db::database::env::writable_file::Close() +{ + log.debug("'%s': wfile:%p close", + d.name, + this); + + return defaults->Close(); +} + +rocksdb::Status +ircd::db::database::env::writable_file::Flush() +{ + log.debug("'%s': wfile:%p flush", + d.name, + this); + + return defaults->Flush(); +} + +rocksdb::Status +ircd::db::database::env::writable_file::Sync() +{ + log.debug("'%s': wfile:%p sync", + d.name, + this); + + return defaults->Sync(); +} + +rocksdb::Status +ircd::db::database::env::writable_file::Fsync() +{ + log.debug("'%s': wfile:%p fsync", + d.name, + this); + + return defaults->Fsync(); +} + +bool +ircd::db::database::env::writable_file::IsSyncThreadSafe() +const +{ + return defaults->IsSyncThreadSafe(); +} + +void +ircd::db::database::env::writable_file::SetIOPriority(Env::IOPriority prio) +{ + log.debug("'%s': wfile:%p set IO prio to %s", + d.name, + this, + reflect(prio)); + + defaults->SetIOPriority(prio); +} + +rocksdb::Env::IOPriority +ircd::db::database::env::writable_file::GetIOPriority() +{ + return defaults->GetIOPriority(); +} + +uint64_t +ircd::db::database::env::writable_file::GetFileSize() +{ + return defaults->GetFileSize(); +} + +void +ircd::db::database::env::writable_file::GetPreallocationStatus(size_t* block_size, + size_t* last_allocated_block) +{ + log.debug("'%s': wfile:%p get preallocation block_size:%p last_block:%p", + d.name, + this, + block_size, + last_allocated_block); + + defaults->GetPreallocationStatus(block_size, last_allocated_block); +} + +size_t +ircd::db::database::env::writable_file::GetUniqueId(char* id, + size_t max_size) +const +{ + log.debug("'%s': wfile:%p get unique id:%p max_size:%zu", + d.name, + this, + id, + max_size); + + return defaults->GetUniqueId(id, max_size); +} + +rocksdb::Status +ircd::db::database::env::writable_file::InvalidateCache(size_t offset, + size_t length) +{ + log.debug("'%s': wfile:%p invalidate cache offset:%zu length:%zu", + d.name, + this, + offset, + length); + + return defaults->InvalidateCache(offset, length); +} + +void +ircd::db::database::env::writable_file::SetPreallocationBlockSize(size_t size) +{ + log.debug("'%s': wfile:%p set preallocation block size:%zu", + d.name, + this, + size); + + defaults->SetPreallocationBlockSize(size); +} + +void +ircd::db::database::env::writable_file::PrepareWrite(size_t offset, + size_t length) +{ + log.debug("'%s': wfile:%p prepare write offset:%zu length:%zu", + d.name, + this, + offset, + length); + + defaults->PrepareWrite(offset, length); +} + +rocksdb::Status +ircd::db::database::env::writable_file::Allocate(uint64_t offset, + uint64_t length) +{ + log.debug("'%s': wfile:%p allocate offset:%lu length:%lu", + d.name, + this, + offset, + length); + + return defaults->Allocate(offset, length); +} + +rocksdb::Status +ircd::db::database::env::writable_file::RangeSync(uint64_t offset, + uint64_t length) +{ + log.debug("'%s': wfile:%p range sync offset:%lu length:%lu", + d.name, + this, + offset, + length); + + return defaults->RangeSync(offset, length); +} + + /////////////////////////////////////////////////////////////////////////////// // // db/iov.h