0
0
Fork 0
mirror of https://github.com/matrix-construct/construct synced 2024-11-29 10:12:39 +01:00

ircd::db: Add additional environment stubs for writable_file; minor cleanup.

This commit is contained in:
Jason Volk 2018-01-17 20:17:00 -08:00
parent 48c798de3f
commit 87f0f3bc5a
2 changed files with 303 additions and 42 deletions

View file

@ -29,9 +29,11 @@
/// Internal environment hookup.
///
struct ircd::db::database::env
struct ircd::db::database::env final
:rocksdb::Env
{
struct writable_file;
using Status = rocksdb::Status;
using EnvOptions = rocksdb::EnvOptions;
using Directory = rocksdb::Directory;
@ -50,46 +52,79 @@ struct ircd::db::database::env
*rocksdb::Env::Default()
};
Status NewSequentialFile(const std::string& f, std::unique_ptr<SequentialFile>* r, const EnvOptions& options) final override;
Status NewRandomAccessFile(const std::string& f, std::unique_ptr<RandomAccessFile>* r, const EnvOptions& options) final override;
Status NewWritableFile(const std::string& f, std::unique_ptr<WritableFile>* r, const EnvOptions& options) final override;
Status ReopenWritableFile(const std::string& fname, std::unique_ptr<WritableFile>* result, const EnvOptions& options) final override;
Status ReuseWritableFile(const std::string& fname, const std::string& old_fname, std::unique_ptr<WritableFile>* r, const EnvOptions& options) final override;
Status NewRandomRWFile(const std::string& fname, std::unique_ptr<RandomRWFile>* result, const EnvOptions& options) final override;
Status NewDirectory(const std::string& name, std::unique_ptr<Directory>* result) final override;
Status FileExists(const std::string& f) final override;
Status GetChildren(const std::string& dir, std::vector<std::string>* r) final override;
Status GetChildrenFileAttributes(const std::string& dir, std::vector<FileAttributes>* result) final override;
Status DeleteFile(const std::string& f) final override;
Status CreateDir(const std::string& d) final override;
Status CreateDirIfMissing(const std::string& d) final override;
Status DeleteDir(const std::string& d) final override;
Status GetFileSize(const std::string& f, uint64_t* s) final override;
Status GetFileModificationTime(const std::string& fname, uint64_t* file_mtime) final override;
Status RenameFile(const std::string& s, const std::string& t) final override;
Status LinkFile(const std::string& s, const std::string& t) final override;
Status LockFile(const std::string& f, FileLock** l) final override;
Status UnlockFile(FileLock* l) final override;
void Schedule(void (*f)(void* arg), void* a, Priority pri, void* tag = nullptr, void (*u)(void* arg) = 0) final override;
int UnSchedule(void* tag, Priority pri) final override;
void StartThread(void (*f)(void*), void* a) final override;
void WaitForJoin() final override;
unsigned int GetThreadPoolQueueLen(Priority pri = LOW) const final override;
Status GetTestDirectory(std::string* path) final override;
Status NewLogger(const std::string& fname, std::shared_ptr<Logger>* result) final override;
uint64_t NowMicros() final override;
void SleepForMicroseconds(int micros) final override;
Status GetHostName(char* name, uint64_t len) final override;
Status GetCurrentTime(int64_t* unix_time) final override;
Status GetAbsolutePath(const std::string& db_path, std::string* output_path) final override;
void SetBackgroundThreads(int num, Priority pri) final override;
void IncBackgroundThreadsIfNeeded(int num, Priority pri) final override;
void LowerThreadPoolIOPriority(Priority pool = LOW) final override;
std::string TimeToString(uint64_t time) final override;
Status GetThreadList(std::vector<ThreadStatus>* thread_list) final override;
ThreadStatusUpdater* GetThreadStatusUpdater() const final override;
uint64_t GetThreadID() const final override;
Status NewSequentialFile(const std::string& f, std::unique_ptr<SequentialFile>* r, const EnvOptions& options) override;
Status NewRandomAccessFile(const std::string& f, std::unique_ptr<RandomAccessFile>* r, const EnvOptions& options) override;
Status NewWritableFile(const std::string& f, std::unique_ptr<WritableFile>* r, const EnvOptions& options) override;
Status ReopenWritableFile(const std::string& fname, std::unique_ptr<WritableFile>* result, const EnvOptions& options) override;
Status ReuseWritableFile(const std::string& fname, const std::string& old_fname, std::unique_ptr<WritableFile>* r, const EnvOptions& options) override;
Status NewRandomRWFile(const std::string& fname, std::unique_ptr<RandomRWFile>* result, const EnvOptions& options) override;
Status NewDirectory(const std::string& name, std::unique_ptr<Directory>* result) override;
Status FileExists(const std::string& f) override;
Status GetChildren(const std::string& dir, std::vector<std::string>* r) override;
Status GetChildrenFileAttributes(const std::string& dir, std::vector<FileAttributes>* result) override;
Status DeleteFile(const std::string& f) override;
Status CreateDir(const std::string& d) override;
Status CreateDirIfMissing(const std::string& d) override;
Status DeleteDir(const std::string& d) override;
Status GetFileSize(const std::string& f, uint64_t* s) override;
Status GetFileModificationTime(const std::string& fname, uint64_t* file_mtime) override;
Status RenameFile(const std::string& s, const std::string& t) override;
Status LinkFile(const std::string& s, const std::string& t) override;
Status LockFile(const std::string& f, FileLock** l) override;
Status UnlockFile(FileLock* l) override;
void Schedule(void (*f)(void* arg), void* a, Priority pri, void* tag = nullptr, void (*u)(void* arg) = 0) override;
int UnSchedule(void* tag, Priority pri) override;
void StartThread(void (*f)(void*), void* a) override;
void WaitForJoin() override;
unsigned int GetThreadPoolQueueLen(Priority pri = LOW) const override;
Status GetTestDirectory(std::string* path) override;
Status NewLogger(const std::string& fname, std::shared_ptr<Logger>* result) override;
uint64_t NowMicros() override;
void SleepForMicroseconds(int micros) override;
Status GetHostName(char* name, uint64_t len) override;
Status GetCurrentTime(int64_t* unix_time) override;
Status GetAbsolutePath(const std::string& db_path, std::string* output_path) override;
void SetBackgroundThreads(int num, Priority pri) override;
void IncBackgroundThreadsIfNeeded(int num, Priority pri) override;
void LowerThreadPoolIOPriority(Priority pool = LOW) override;
std::string TimeToString(uint64_t time) override;
Status GetThreadList(std::vector<ThreadStatus>* thread_list) override;
ThreadStatusUpdater* GetThreadStatusUpdater() const override;
uint64_t GetThreadID() const override;
env(database *const &d);
~env() noexcept;
};
struct ircd::db::database::env::writable_file final
:rocksdb::WritableFile
{
using Status = rocksdb::Status;
using Slice = rocksdb::Slice;
using IOPriority = rocksdb::Env::IOPriority;
database &d;
std::unique_ptr<rocksdb::WritableFile> defaults;
Status Append(const Slice& data) override;
Status PositionedAppend(const Slice& data, uint64_t offset) override;
Status Truncate(uint64_t size) override;
Status Close() override;
Status Flush() override;
Status Sync() override;
Status Fsync() override;
bool IsSyncThreadSafe() const override;
void SetIOPriority(IOPriority pri) override;
IOPriority GetIOPriority() override;
uint64_t GetFileSize() override;
void GetPreallocationStatus(size_t* block_size, size_t* last_allocated_block) override;
size_t GetUniqueId(char* id, size_t max_size) const override;
Status InvalidateCache(size_t offset, size_t length) override;
void SetPreallocationBlockSize(size_t size) override;
void PrepareWrite(size_t offset, size_t len) override;
Status Allocate(uint64_t offset, uint64_t len) override;
Status RangeSync(uint64_t offset, uint64_t nbytes) override;
writable_file(database *const &d, const std::string &name, const EnvOptions &, std::unique_ptr<WritableFile> defaults);
~writable_file() noexcept;
};

View file

@ -1168,6 +1168,10 @@ ircd::db::database::events::OnColumnFamilyHandleDeletionStarted(rocksdb::ColumnF
// database::env
//
//
// env
//
ircd::db::database::env::env(database *const &d)
:d{*d}
{
@ -1214,7 +1218,14 @@ ircd::db::database::env::NewWritableFile(const std::string& name,
name,
&options);
return defaults.NewWritableFile(name, r, options);
std::unique_ptr<WritableFile> defaults;
const auto ret
{
this->defaults.NewWritableFile(name, &defaults, options)
};
*r = std::make_unique<writable_file>(&d, name, options, std::move(defaults));
return ret;
}
rocksdb::Status
@ -1421,7 +1432,6 @@ ircd::db::database::env::Schedule(void (*f)(void* arg),
a,
tag,
u,
f,
reflect(prio));
return defaults.Schedule(f, a, prio, tag, u);
@ -1587,6 +1597,222 @@ const
return defaults.GetThreadID();
}
//
// writable_file
//
ircd::db::database::env::writable_file::writable_file(database *const &d,
const std::string &name,
const EnvOptions &opts,
std::unique_ptr<WritableFile> defaults)
:d{*d}
,defaults{std::move(defaults)}
{
}
ircd::db::database::env::writable_file::~writable_file()
noexcept
{
}
rocksdb::Status
ircd::db::database::env::writable_file::Append(const Slice& s)
{
log.debug("'%s': wfile:%p append:%p bytes:%zu",
d.name,
this,
data(s),
size(s));
return defaults->Append(s);
}
rocksdb::Status
ircd::db::database::env::writable_file::PositionedAppend(const Slice& s,
uint64_t offset)
{
log.debug("'%s': wfile:%p append:%p bytes:%zu offset:%lu",
d.name,
this,
data(s),
size(s),
offset);
return defaults->PositionedAppend(s, offset);
}
rocksdb::Status
ircd::db::database::env::writable_file::Truncate(uint64_t size)
{
log.debug("'%s': wfile:%p truncate to %lu bytes",
d.name,
this,
size);
return defaults->Truncate(size);
}
rocksdb::Status
ircd::db::database::env::writable_file::Close()
{
log.debug("'%s': wfile:%p close",
d.name,
this);
return defaults->Close();
}
rocksdb::Status
ircd::db::database::env::writable_file::Flush()
{
log.debug("'%s': wfile:%p flush",
d.name,
this);
return defaults->Flush();
}
rocksdb::Status
ircd::db::database::env::writable_file::Sync()
{
log.debug("'%s': wfile:%p sync",
d.name,
this);
return defaults->Sync();
}
rocksdb::Status
ircd::db::database::env::writable_file::Fsync()
{
log.debug("'%s': wfile:%p fsync",
d.name,
this);
return defaults->Fsync();
}
bool
ircd::db::database::env::writable_file::IsSyncThreadSafe()
const
{
return defaults->IsSyncThreadSafe();
}
void
ircd::db::database::env::writable_file::SetIOPriority(Env::IOPriority prio)
{
log.debug("'%s': wfile:%p set IO prio to %s",
d.name,
this,
reflect(prio));
defaults->SetIOPriority(prio);
}
rocksdb::Env::IOPriority
ircd::db::database::env::writable_file::GetIOPriority()
{
return defaults->GetIOPriority();
}
uint64_t
ircd::db::database::env::writable_file::GetFileSize()
{
return defaults->GetFileSize();
}
void
ircd::db::database::env::writable_file::GetPreallocationStatus(size_t* block_size,
size_t* last_allocated_block)
{
log.debug("'%s': wfile:%p get preallocation block_size:%p last_block:%p",
d.name,
this,
block_size,
last_allocated_block);
defaults->GetPreallocationStatus(block_size, last_allocated_block);
}
size_t
ircd::db::database::env::writable_file::GetUniqueId(char* id,
size_t max_size)
const
{
log.debug("'%s': wfile:%p get unique id:%p max_size:%zu",
d.name,
this,
id,
max_size);
return defaults->GetUniqueId(id, max_size);
}
rocksdb::Status
ircd::db::database::env::writable_file::InvalidateCache(size_t offset,
size_t length)
{
log.debug("'%s': wfile:%p invalidate cache offset:%zu length:%zu",
d.name,
this,
offset,
length);
return defaults->InvalidateCache(offset, length);
}
void
ircd::db::database::env::writable_file::SetPreallocationBlockSize(size_t size)
{
log.debug("'%s': wfile:%p set preallocation block size:%zu",
d.name,
this,
size);
defaults->SetPreallocationBlockSize(size);
}
void
ircd::db::database::env::writable_file::PrepareWrite(size_t offset,
size_t length)
{
log.debug("'%s': wfile:%p prepare write offset:%zu length:%zu",
d.name,
this,
offset,
length);
defaults->PrepareWrite(offset, length);
}
rocksdb::Status
ircd::db::database::env::writable_file::Allocate(uint64_t offset,
uint64_t length)
{
log.debug("'%s': wfile:%p allocate offset:%lu length:%lu",
d.name,
this,
offset,
length);
return defaults->Allocate(offset, length);
}
rocksdb::Status
ircd::db::database::env::writable_file::RangeSync(uint64_t offset,
uint64_t length)
{
log.debug("'%s': wfile:%p range sync offset:%lu length:%lu",
d.name,
this,
offset,
length);
return defaults->RangeSync(offset, length);
}
///////////////////////////////////////////////////////////////////////////////
//
// db/iov.h