0
0
Fork 0
mirror of https://github.com/matrix-construct/construct synced 2024-06-02 18:18:56 +02:00

ircd::db: Skeleton stubs for basic environment.

This commit is contained in:
Jason Volk 2018-01-17 19:22:35 -08:00
parent 79447c504a
commit 625dc73978
3 changed files with 520 additions and 0 deletions

View file

@ -89,6 +89,7 @@ struct ircd::db::database
std::string name;
std::string path;
std::string optstr;
std::shared_ptr<struct env> env;
std::shared_ptr<struct logs> logs;
std::shared_ptr<struct stats> stats;
std::shared_ptr<struct events> events;

View file

@ -30,6 +30,66 @@
/// Internal environment hookup.
///
struct ircd::db::database::env
:rocksdb::Env
{
using Status = rocksdb::Status;
using EnvOptions = rocksdb::EnvOptions;
using Directory = rocksdb::Directory;
using FileLock = rocksdb::FileLock;
using WritableFile = rocksdb::WritableFile;
using SequentialFile = rocksdb::SequentialFile;
using RandomAccessFile = rocksdb::RandomAccessFile;
using RandomRWFile = rocksdb::RandomRWFile;
using Logger = rocksdb::Logger;
using ThreadStatus = rocksdb::ThreadStatus;
using ThreadStatusUpdater = rocksdb::ThreadStatusUpdater;
database &d;
Env &defaults
{
*rocksdb::Env::Default()
};
Status NewSequentialFile(const std::string& f, std::unique_ptr<SequentialFile>* r, const EnvOptions& options) final override;
Status NewRandomAccessFile(const std::string& f, std::unique_ptr<RandomAccessFile>* r, const EnvOptions& options) final override;
Status NewWritableFile(const std::string& f, std::unique_ptr<WritableFile>* r, const EnvOptions& options) final override;
Status ReopenWritableFile(const std::string& fname, std::unique_ptr<WritableFile>* result, const EnvOptions& options) final override;
Status ReuseWritableFile(const std::string& fname, const std::string& old_fname, std::unique_ptr<WritableFile>* r, const EnvOptions& options) final override;
Status NewRandomRWFile(const std::string& fname, std::unique_ptr<RandomRWFile>* result, const EnvOptions& options) final override;
Status NewDirectory(const std::string& name, std::unique_ptr<Directory>* result) final override;
Status FileExists(const std::string& f) final override;
Status GetChildren(const std::string& dir, std::vector<std::string>* r) final override;
Status GetChildrenFileAttributes(const std::string& dir, std::vector<FileAttributes>* result) final override;
Status DeleteFile(const std::string& f) final override;
Status CreateDir(const std::string& d) final override;
Status CreateDirIfMissing(const std::string& d) final override;
Status DeleteDir(const std::string& d) final override;
Status GetFileSize(const std::string& f, uint64_t* s) final override;
Status GetFileModificationTime(const std::string& fname, uint64_t* file_mtime) final override;
Status RenameFile(const std::string& s, const std::string& t) final override;
Status LinkFile(const std::string& s, const std::string& t) final override;
Status LockFile(const std::string& f, FileLock** l) final override;
Status UnlockFile(FileLock* l) final override;
void Schedule(void (*f)(void* arg), void* a, Priority pri, void* tag = nullptr, void (*u)(void* arg) = 0) final override;
int UnSchedule(void* tag, Priority pri) final override;
void StartThread(void (*f)(void*), void* a) final override;
void WaitForJoin() final override;
unsigned int GetThreadPoolQueueLen(Priority pri = LOW) const final override;
Status GetTestDirectory(std::string* path) final override;
Status NewLogger(const std::string& fname, std::shared_ptr<Logger>* result) final override;
uint64_t NowMicros() final override;
void SleepForMicroseconds(int micros) final override;
Status GetHostName(char* name, uint64_t len) final override;
Status GetCurrentTime(int64_t* unix_time) final override;
Status GetAbsolutePath(const std::string& db_path, std::string* output_path) final override;
void SetBackgroundThreads(int num, Priority pri) final override;
void IncBackgroundThreadsIfNeeded(int num, Priority pri) final override;
void LowerThreadPoolIOPriority(Priority pool = LOW) final override;
std::string TimeToString(uint64_t time) final override;
Status GetThreadList(std::vector<ThreadStatus>* thread_list) final override;
ThreadStatusUpdater* GetThreadStatusUpdater() const final override;
uint64_t GetThreadID() const final override;
env(database *const &d);
~env() noexcept;
};

View file

@ -50,6 +50,8 @@ namespace ircd::db
struct throw_on_error;
string_view reflect(const rocksdb::Env::Priority &p);
string_view reflect(const rocksdb::Env::IOPriority &p);
const std::string &reflect(const rocksdb::Tickers &);
const std::string &reflect(const rocksdb::Histograms &);
rocksdb::Slice slice(const string_view &);
@ -251,6 +253,10 @@ try
{
std::move(optstr)
}
,env
{
std::make_shared<struct env>(this)
}
,logs
{
std::make_shared<struct logs>(this)
@ -321,6 +327,9 @@ try
opts.max_file_opening_threads = 0;
//opts.use_fsync = true;
// Setup env
opts.env = env.get();
// Setup logging
logs->SetInfoLogLevel(ircd::debugmode? rocksdb::DEBUG_LEVEL : rocksdb::WARN_LEVEL);
opts.info_log_level = logs->GetInfoLogLevel();
@ -1154,6 +1163,430 @@ ircd::db::database::events::OnColumnFamilyHandleDeletionStarted(rocksdb::ColumnF
h);
}
///////////////////////////////////////////////////////////////////////////////
//
// database::env
//
ircd::db::database::env::env(database *const &d)
:d{*d}
{
}
ircd::db::database::env::~env()
noexcept
{
}
rocksdb::Status
ircd::db::database::env::NewSequentialFile(const std::string& name,
std::unique_ptr<SequentialFile>* r,
const EnvOptions& options)
{
log.debug("'%s': new sequential file '%s' options:%p",
d.name,
name,
&options);
return defaults.NewSequentialFile(name, r, options);
}
rocksdb::Status
ircd::db::database::env::NewRandomAccessFile(const std::string& name,
std::unique_ptr<RandomAccessFile>* r,
const EnvOptions& options)
{
log.debug("'%s': new random access file '%s' options:%p",
d.name,
name,
&options);
return defaults.NewRandomAccessFile(name, r, options);
}
rocksdb::Status
ircd::db::database::env::NewWritableFile(const std::string& name,
std::unique_ptr<WritableFile>* r,
const EnvOptions& options)
{
log.debug("'%s': new writable file '%s' options:%p",
d.name,
name,
&options);
return defaults.NewWritableFile(name, r, options);
}
rocksdb::Status
ircd::db::database::env::ReopenWritableFile(const std::string& name,
std::unique_ptr<WritableFile>* result,
const EnvOptions& options)
{
log.debug("'%s': reopen writable file '%s' options:%p",
d.name,
name,
&options);
return defaults.ReopenWritableFile(name, result, options);
}
rocksdb::Status
ircd::db::database::env::ReuseWritableFile(const std::string& name,
const std::string& old_name,
std::unique_ptr<WritableFile>* r,
const EnvOptions& options)
{
log.debug("'%s': reuse writable file '%s' old '%s' options:%p",
d.name,
name,
old_name,
&options);
return defaults.ReuseWritableFile(name, old_name, r, options);
}
rocksdb::Status
ircd::db::database::env::NewRandomRWFile(const std::string& name,
std::unique_ptr<RandomRWFile>* result,
const EnvOptions& options)
{
log.debug("'%s': new random read/write file '%s' options:%p",
d.name,
name,
&options);
return defaults.NewRandomRWFile(name, result, options);
}
rocksdb::Status
ircd::db::database::env::NewDirectory(const std::string& name,
std::unique_ptr<Directory>* result)
{
log.debug("'%s': new directory '%s'",
d.name,
name);
return defaults.NewDirectory(name, result);
}
rocksdb::Status
ircd::db::database::env::FileExists(const std::string& f)
{
log.debug("'%s': file exists '%s'",
d.name,
f);
return defaults.FileExists(f);
}
rocksdb::Status
ircd::db::database::env::GetChildren(const std::string& dir,
std::vector<std::string>* r)
{
log.debug("'%s': get children of directory '%s'",
d.name,
dir);
return defaults.GetChildren(dir, r);
}
rocksdb::Status
ircd::db::database::env::GetChildrenFileAttributes(const std::string& dir,
std::vector<FileAttributes>* result)
{
log.debug("'%s': get children file attributes of directory '%s'",
d.name,
dir);
return defaults.GetChildrenFileAttributes(dir, result);
}
rocksdb::Status
ircd::db::database::env::DeleteFile(const std::string& name)
{
log.debug("'%s': delete file '%s'",
d.name,
name);
return defaults.DeleteFile(name);
}
rocksdb::Status
ircd::db::database::env::CreateDir(const std::string& name)
{
log.debug("'%s': create directory '%s'",
d.name,
d);
return defaults.CreateDir(name);
}
rocksdb::Status
ircd::db::database::env::CreateDirIfMissing(const std::string& name)
{
log.debug("'%s': create directory if missing '%s'",
d.name,
name);
return defaults.CreateDirIfMissing(name);
}
rocksdb::Status
ircd::db::database::env::DeleteDir(const std::string& name)
{
log.debug("'%s': delete directory '%s'",
d.name,
name);
return defaults.DeleteDir(name);
}
rocksdb::Status
ircd::db::database::env::GetFileSize(const std::string& name,
uint64_t* s)
{
log.debug("'%s': get file size '%s'",
d.name,
name);
return defaults.GetFileSize(name, s);
}
rocksdb::Status
ircd::db::database::env::GetFileModificationTime(const std::string& name,
uint64_t* file_mtime)
{
log.debug("'%s': get file mtime '%s'",
d.name,
name);
return defaults.GetFileModificationTime(name, file_mtime);
}
rocksdb::Status
ircd::db::database::env::RenameFile(const std::string& s,
const std::string& t)
{
log.debug("'%s': rename file '%s' to '%s'",
d.name,
s,
t);
return defaults.RenameFile(s, t);
}
rocksdb::Status
ircd::db::database::env::LinkFile(const std::string& s,
const std::string& t)
{
log.debug("'%s': link file '%s' to '%s'",
d.name,
s,
t);
return defaults.LinkFile(s, t);
}
rocksdb::Status
ircd::db::database::env::LockFile(const std::string& name,
FileLock** l)
{
log.debug("'%s': lock file '%s'",
d.name,
name);
return defaults.LockFile(name, l);
}
rocksdb::Status
ircd::db::database::env::UnlockFile(FileLock* l)
{
log.debug("'%s': unlock file lock:%p",
d.name,
l);
return defaults.UnlockFile(l);
}
void
ircd::db::database::env::Schedule(void (*f)(void* arg),
void* a,
Priority prio,
void* tag,
void (*u)(void* arg))
{
log.debug("'%s': schedule func:%p a:%p tag:%p u:%p prio:%s",
d.name,
f,
a,
tag,
u,
f,
reflect(prio));
return defaults.Schedule(f, a, prio, tag, u);
}
int
ircd::db::database::env::UnSchedule(void* tag,
Priority pri)
{
log.debug("'%s': unschedule tag:%p prio:%s",
d.name,
tag,
reflect(pri));
return defaults.UnSchedule(tag, pri);
}
void
ircd::db::database::env::StartThread(void (*f)(void*),
void* a)
{
log.debug("'%s': start thread func:%p a:%p",
d.name,
f,
a);
return defaults.StartThread(f, a);
}
void
ircd::db::database::env::WaitForJoin()
{
return defaults.WaitForJoin();
}
unsigned int
ircd::db::database::env::GetThreadPoolQueueLen(Priority pri)
const
{
log.debug("'%s': get thread pool queue len prio:%s",
d.name,
reflect(pri));
return defaults.GetThreadPoolQueueLen(pri);
}
rocksdb::Status
ircd::db::database::env::GetTestDirectory(std::string* path)
{
return defaults.GetTestDirectory(path);
}
rocksdb::Status
ircd::db::database::env::NewLogger(const std::string& name,
std::shared_ptr<Logger>* result)
{
return defaults.NewLogger(name, result);
}
uint64_t
ircd::db::database::env::NowMicros()
{
return defaults.NowMicros();
}
void
ircd::db::database::env::SleepForMicroseconds(int micros)
{
log.debug("'%s': sleep for %d microseconds",
d.name,
micros);
defaults.SleepForMicroseconds(micros);
}
rocksdb::Status
ircd::db::database::env::GetHostName(char* name,
uint64_t len)
{
log.debug("'%s': get host name name:%p len:%lu",
d.name,
name,
len);
return defaults.GetHostName(name, len);
}
rocksdb::Status
ircd::db::database::env::GetCurrentTime(int64_t* unix_time)
{
return defaults.GetCurrentTime(unix_time);
}
rocksdb::Status
ircd::db::database::env::GetAbsolutePath(const std::string& db_path,
std::string* output_path)
{
log.debug("'%s': get absolute path from '%s' ret:%p",
d.name,
db_path,
output_path);
return defaults.GetAbsolutePath(db_path, output_path);
}
void
ircd::db::database::env::SetBackgroundThreads(int num,
Priority pri)
{
log.debug("'%s': set background threads num:%d prio:%s",
d.name,
num,
reflect(pri));
return defaults.SetBackgroundThreads(num, pri);
}
void
ircd::db::database::env::IncBackgroundThreadsIfNeeded(int num,
Priority pri)
{
log.debug("'%s': increase background threads num:%d prio:%s",
d.name,
num,
reflect(pri));
return defaults.IncBackgroundThreadsIfNeeded(num, pri);
}
void
ircd::db::database::env::LowerThreadPoolIOPriority(Priority pool)
{
log.debug("'%s': lower thread pool priority prio:%s",
d.name,
reflect(pool));
defaults.LowerThreadPoolIOPriority(pool);
}
std::string
ircd::db::database::env::TimeToString(uint64_t time)
{
return defaults.TimeToString(time);
}
rocksdb::Status
ircd::db::database::env::GetThreadList(std::vector<ThreadStatus>* thread_list)
{
return defaults.GetThreadList(thread_list);
}
rocksdb::ThreadStatusUpdater*
ircd::db::database::env::GetThreadStatusUpdater()
const
{
return defaults.GetThreadStatusUpdater();
}
uint64_t
ircd::db::database::env::GetThreadID()
const
{
return defaults.GetThreadID();
}
///////////////////////////////////////////////////////////////////////////////
//
// db/iov.h
@ -3928,6 +4361,32 @@ ircd::db::reflect(const op &op)
return "?????";
}
ircd::string_view
ircd::db::reflect(const rocksdb::Env::Priority &p)
{
switch(p)
{
case rocksdb::Env::Priority::LOW: return "LOW"_sv;
case rocksdb::Env::Priority::HIGH: return "HIGH"_sv;
case rocksdb::Env::Priority::TOTAL: return "TOTAL"_sv;
}
return "????"_sv;
}
ircd::string_view
ircd::db::reflect(const rocksdb::Env::IOPriority &p)
{
switch(p)
{
case rocksdb::Env::IOPriority::IO_LOW: return "IO_LOW"_sv;
case rocksdb::Env::IOPriority::IO_HIGH: return "IO_HIGH"_sv;
case rocksdb::Env::IOPriority::IO_TOTAL: return "IO_TOTAL"_sv;
}
return "IO_????"_sv;
}
bool
ircd::db::value_required(const op &op)
{