mirror of
https://github.com/matrix-construct/construct
synced 2024-11-30 10:42:47 +01:00
4160 lines
71 KiB
C++
4160 lines
71 KiB
C++
// Matrix Construct
|
|
//
|
|
// Copyright (C) Matrix Construct Developers, Authors & Contributors
|
|
// Copyright (C) 2016-2018 Jason Volk <jason@zemos.net>
|
|
//
|
|
// Permission to use, copy, modify, and/or distribute this software for any
|
|
// purpose with or without fee is hereby granted, provided that the above
|
|
// copyright notice and this permission notice is present in all copies. The
|
|
// full license for this software is available in the LICENSE file.
|
|
|
|
#include <RB_INC_FCNTL_H
|
|
#include "db.h"
|
|
|
|
decltype(ircd::db::database::env::log)
|
|
ircd::db::database::env::log
|
|
{
|
|
"db.env"
|
|
};
|
|
|
|
//
|
|
// env::env
|
|
//
|
|
|
|
ircd::db::database::env::env(database *const &d)
|
|
:d{*d},
|
|
st{std::make_unique<state>(d)}
|
|
{
|
|
}
|
|
|
|
ircd::db::database::env::~env()
|
|
noexcept
|
|
{
|
|
}
|
|
|
|
rocksdb::Status
|
|
ircd::db::database::env::NewSequentialFile(const std::string &name,
|
|
std::unique_ptr<SequentialFile> *const r,
|
|
const EnvOptions &options)
|
|
noexcept try
|
|
{
|
|
const ctx::uninterruptible::nothrow ui;
|
|
|
|
#ifdef RB_DEBUG_DB_ENV
|
|
log::debug
|
|
{
|
|
log, "[%s] new sequential file '%s' options:%p [mm:%b direct:%b bufsz:%zu readahead:%zu]",
|
|
d.name,
|
|
name,
|
|
&options,
|
|
options.use_mmap_reads,
|
|
options.use_direct_reads,
|
|
options.random_access_max_buffer_size,
|
|
options.compaction_readahead_size,
|
|
};
|
|
#endif
|
|
|
|
*r = std::make_unique<sequential_file>(&d, name, options);
|
|
return Status::OK();
|
|
}
|
|
catch(const std::system_error &e)
|
|
{
|
|
return error_to_status{e};
|
|
}
|
|
catch(const std::exception &e)
|
|
{
|
|
return error_to_status{e};
|
|
}
|
|
|
|
rocksdb::Status
|
|
ircd::db::database::env::NewRandomAccessFile(const std::string &name,
|
|
std::unique_ptr<RandomAccessFile> *const r,
|
|
const EnvOptions &options)
|
|
noexcept try
|
|
{
|
|
const ctx::uninterruptible::nothrow ui;
|
|
|
|
#ifdef RB_DEBUG_DB_ENV
|
|
log::debug
|
|
{
|
|
log, "[%s] new random access file '%s' options:%p [mm:%b direct:%b bufsz:%zu readahead:%zu]",
|
|
d.name,
|
|
name,
|
|
&options,
|
|
options.use_mmap_reads,
|
|
options.use_direct_reads,
|
|
options.random_access_max_buffer_size,
|
|
options.compaction_readahead_size,
|
|
};
|
|
#endif
|
|
|
|
*r = std::make_unique<random_access_file>(&d, name, options);
|
|
return Status::OK();
|
|
}
|
|
catch(const std::system_error &e)
|
|
{
|
|
return error_to_status{e};
|
|
}
|
|
catch(const std::exception &e)
|
|
{
|
|
return error_to_status{e};
|
|
}
|
|
|
|
rocksdb::Status
|
|
ircd::db::database::env::NewWritableFile(const std::string &name,
|
|
std::unique_ptr<WritableFile> *const r,
|
|
const EnvOptions &options)
|
|
noexcept try
|
|
{
|
|
const ctx::uninterruptible::nothrow ui;
|
|
|
|
#ifdef RB_DEBUG_DB_ENV
|
|
log::debug
|
|
{
|
|
log, "[%s] new writable file '%s' options:%p [mm:%b direct:%b rl:%p bufsz:%zu syncsz:%zu]",
|
|
d.name,
|
|
name,
|
|
&options,
|
|
options.use_mmap_writes,
|
|
options.use_direct_writes,
|
|
options.rate_limiter,
|
|
options.writable_file_max_buffer_size,
|
|
options.bytes_per_sync,
|
|
};
|
|
#endif
|
|
|
|
if(options.use_direct_writes)
|
|
*r = std::make_unique<writable_file_direct>(&d, name, options, true);
|
|
else
|
|
*r = std::make_unique<writable_file>(&d, name, options, true);
|
|
|
|
return Status::OK();
|
|
}
|
|
catch(const std::system_error &e)
|
|
{
|
|
return error_to_status{e};
|
|
}
|
|
catch(const std::exception &e)
|
|
{
|
|
return error_to_status{e};
|
|
}
|
|
|
|
rocksdb::Status
|
|
ircd::db::database::env::ReopenWritableFile(const std::string &name,
|
|
std::unique_ptr<WritableFile> *const r,
|
|
const EnvOptions &options)
|
|
noexcept try
|
|
{
|
|
const ctx::uninterruptible::nothrow ui;
|
|
|
|
#ifdef RB_DEBUG_DB_ENV
|
|
log::debug
|
|
{
|
|
log, "[%s] reopen writable file '%s' options:%p",
|
|
d.name,
|
|
name,
|
|
&options
|
|
};
|
|
#endif
|
|
|
|
if(options.use_direct_writes)
|
|
*r = std::make_unique<writable_file_direct>(&d, name, options, false);
|
|
else
|
|
*r = std::make_unique<writable_file>(&d, name, options, false);
|
|
|
|
return Status::OK();
|
|
}
|
|
catch(const std::system_error &e)
|
|
{
|
|
return error_to_status{e};
|
|
}
|
|
catch(const std::exception &e)
|
|
{
|
|
return error_to_status{e};
|
|
}
|
|
|
|
rocksdb::Status
|
|
__attribute__((unused))
|
|
ircd::db::database::env::ReuseWritableFile(const std::string &name,
|
|
const std::string &old_name,
|
|
std::unique_ptr<WritableFile> *const r,
|
|
const EnvOptions &options)
|
|
noexcept try
|
|
{
|
|
const ctx::uninterruptible::nothrow ui;
|
|
|
|
#ifdef RB_DEBUG_DB_ENV
|
|
log::debug
|
|
{
|
|
log, "[%s] reuse writable file '%s' old '%s' options:%p",
|
|
d.name,
|
|
name,
|
|
old_name,
|
|
&options
|
|
};
|
|
#endif
|
|
|
|
throw ircd::not_implemented
|
|
{
|
|
"[%s] ReuseWritableFile(name:'%s' old:'%s')",
|
|
d.name,
|
|
name,
|
|
old_name
|
|
};
|
|
|
|
return Status::OK();
|
|
//return defaults.ReuseWritableFile(name, old_name, r, options);
|
|
}
|
|
catch(const std::system_error &e)
|
|
{
|
|
return error_to_status{e};
|
|
}
|
|
catch(const std::exception &e)
|
|
{
|
|
return error_to_status{e};
|
|
}
|
|
|
|
rocksdb::Status
|
|
ircd::db::database::env::NewRandomRWFile(const std::string &name,
|
|
std::unique_ptr<RandomRWFile> *const result,
|
|
const EnvOptions &options)
|
|
noexcept try
|
|
{
|
|
const ctx::uninterruptible::nothrow ui;
|
|
|
|
#ifdef RB_DEBUG_DB_ENV
|
|
log::debug
|
|
{
|
|
log, "[%s] new random read/write file '%s' options:%p",
|
|
d.name,
|
|
name,
|
|
&options
|
|
};
|
|
#endif
|
|
|
|
*result = std::make_unique<random_rw_file>(&d, name, options);
|
|
return Status::OK();
|
|
}
|
|
catch(const std::system_error &e)
|
|
{
|
|
return error_to_status{e};
|
|
}
|
|
catch(const std::exception &e)
|
|
{
|
|
return error_to_status{e};
|
|
}
|
|
|
|
rocksdb::Status
|
|
ircd::db::database::env::NewDirectory(const std::string &name,
|
|
std::unique_ptr<Directory> *const result)
|
|
noexcept try
|
|
{
|
|
const ctx::uninterruptible::nothrow ui;
|
|
|
|
#ifdef RB_DEBUG_DB_ENV
|
|
log::debug
|
|
{
|
|
log, "[%s] new directory '%s'",
|
|
d.name,
|
|
name
|
|
};
|
|
#endif
|
|
|
|
std::unique_ptr<Directory> defaults;
|
|
const auto ret
|
|
{
|
|
this->defaults.NewDirectory(name, &defaults)
|
|
};
|
|
|
|
*result = std::make_unique<directory>(&d, name, std::move(defaults));
|
|
return ret;
|
|
}
|
|
catch(const std::system_error &e)
|
|
{
|
|
return error_to_status{e};
|
|
}
|
|
catch(const std::exception &e)
|
|
{
|
|
return error_to_status{e};
|
|
}
|
|
|
|
rocksdb::Status
|
|
ircd::db::database::env::FileExists(const std::string &f)
|
|
noexcept try
|
|
{
|
|
const ctx::uninterruptible::nothrow ui;
|
|
|
|
#ifdef RB_DEBUG_DB_ENV
|
|
log::debug
|
|
{
|
|
log, "[%s] file exists '%s'",
|
|
d.name,
|
|
f
|
|
};
|
|
#endif
|
|
|
|
return defaults.FileExists(f);
|
|
}
|
|
catch(const std::system_error &e)
|
|
{
|
|
return error_to_status{e};
|
|
}
|
|
catch(const std::exception &e)
|
|
{
|
|
return error_to_status{e};
|
|
}
|
|
|
|
rocksdb::Status
|
|
ircd::db::database::env::GetChildren(const std::string &dir,
|
|
std::vector<std::string> *const r)
|
|
noexcept try
|
|
{
|
|
const ctx::uninterruptible::nothrow ui;
|
|
|
|
#ifdef RB_DEBUG_DB_ENV
|
|
log::debug
|
|
{
|
|
log, "[%s] get children of directory '%s'",
|
|
d.name,
|
|
dir
|
|
};
|
|
#endif
|
|
|
|
return defaults.GetChildren(dir, r);
|
|
}
|
|
catch(const std::system_error &e)
|
|
{
|
|
return error_to_status{e};
|
|
}
|
|
catch(const std::exception &e)
|
|
{
|
|
return error_to_status{e};
|
|
}
|
|
|
|
rocksdb::Status
|
|
ircd::db::database::env::GetChildrenFileAttributes(const std::string &dir,
|
|
std::vector<FileAttributes> *const result)
|
|
noexcept try
|
|
{
|
|
const ctx::uninterruptible::nothrow ui;
|
|
|
|
#ifdef RB_DEBUG_DB_ENV
|
|
log::debug
|
|
{
|
|
log, "[%s] get children file attributes of directory '%s'",
|
|
d.name,
|
|
dir
|
|
};
|
|
#endif
|
|
|
|
return defaults.GetChildrenFileAttributes(dir, result);
|
|
}
|
|
catch(const std::system_error &e)
|
|
{
|
|
return error_to_status{e};
|
|
}
|
|
catch(const std::exception &e)
|
|
{
|
|
return error_to_status{e};
|
|
}
|
|
|
|
rocksdb::Status
|
|
ircd::db::database::env::DeleteFile(const std::string &name)
|
|
noexcept try
|
|
{
|
|
const ctx::uninterruptible::nothrow ui;
|
|
|
|
#ifdef RB_DEBUG_DB_ENV
|
|
log::debug
|
|
{
|
|
log, "[%s] delete file '%s'",
|
|
d.name,
|
|
name
|
|
};
|
|
#endif
|
|
|
|
return defaults.DeleteFile(name);
|
|
}
|
|
catch(const std::system_error &e)
|
|
{
|
|
return error_to_status{e};
|
|
}
|
|
catch(const std::exception &e)
|
|
{
|
|
return error_to_status{e};
|
|
}
|
|
|
|
rocksdb::Status
|
|
ircd::db::database::env::CreateDir(const std::string &name)
|
|
noexcept try
|
|
{
|
|
const ctx::uninterruptible::nothrow ui;
|
|
|
|
#ifdef RB_DEBUG_DB_ENV
|
|
log::debug
|
|
{
|
|
log, "[%s] create directory '%s'",
|
|
d.name,
|
|
name
|
|
};
|
|
#endif
|
|
|
|
return defaults.CreateDir(name);
|
|
}
|
|
catch(const std::system_error &e)
|
|
{
|
|
return error_to_status{e};
|
|
}
|
|
catch(const std::exception &e)
|
|
{
|
|
return error_to_status{e};
|
|
}
|
|
|
|
rocksdb::Status
|
|
ircd::db::database::env::CreateDirIfMissing(const std::string &name)
|
|
noexcept try
|
|
{
|
|
const ctx::uninterruptible::nothrow ui;
|
|
|
|
#ifdef RB_DEBUG_DB_ENV
|
|
log::debug
|
|
{
|
|
log, "[%s] create directory if missing '%s'",
|
|
d.name,
|
|
name
|
|
};
|
|
#endif
|
|
|
|
return defaults.CreateDirIfMissing(name);
|
|
}
|
|
catch(const std::system_error &e)
|
|
{
|
|
return error_to_status{e};
|
|
}
|
|
catch(const std::exception &e)
|
|
{
|
|
return error_to_status{e};
|
|
}
|
|
|
|
rocksdb::Status
|
|
ircd::db::database::env::DeleteDir(const std::string &name)
|
|
noexcept try
|
|
{
|
|
const ctx::uninterruptible::nothrow ui;
|
|
|
|
#ifdef RB_DEBUG_DB_ENV
|
|
log::debug
|
|
{
|
|
log, "[%s] delete directory '%s'",
|
|
d.name,
|
|
name
|
|
};
|
|
#endif
|
|
|
|
return defaults.DeleteDir(name);
|
|
}
|
|
catch(const std::system_error &e)
|
|
{
|
|
return error_to_status{e};
|
|
}
|
|
catch(const std::exception &e)
|
|
{
|
|
return error_to_status{e};
|
|
}
|
|
|
|
rocksdb::Status
|
|
ircd::db::database::env::GetFileSize(const std::string &name,
|
|
uint64_t *const s)
|
|
noexcept try
|
|
{
|
|
const ctx::uninterruptible::nothrow ui;
|
|
|
|
#ifdef RB_DEBUG_DB_ENV
|
|
log::debug
|
|
{
|
|
log, "[%s] get file size '%s'",
|
|
d.name,
|
|
name
|
|
};
|
|
#endif
|
|
|
|
assert(s);
|
|
*s = fs::size(name);
|
|
return Status::OK();
|
|
}
|
|
catch(const std::system_error &e)
|
|
{
|
|
return error_to_status{e};
|
|
}
|
|
catch(const std::exception &e)
|
|
{
|
|
return error_to_status{e};
|
|
}
|
|
|
|
rocksdb::Status
|
|
ircd::db::database::env::GetFileModificationTime(const std::string &name,
|
|
uint64_t *const file_mtime)
|
|
noexcept try
|
|
{
|
|
const ctx::uninterruptible::nothrow ui;
|
|
|
|
#ifdef RB_DEBUG_DB_ENV
|
|
log::debug
|
|
{
|
|
log, "[%s] get file mtime '%s'",
|
|
d.name,
|
|
name
|
|
};
|
|
#endif
|
|
|
|
return defaults.GetFileModificationTime(name, file_mtime);
|
|
}
|
|
catch(const std::system_error &e)
|
|
{
|
|
return error_to_status{e};
|
|
}
|
|
catch(const std::exception &e)
|
|
{
|
|
return error_to_status{e};
|
|
}
|
|
|
|
rocksdb::Status
|
|
ircd::db::database::env::RenameFile(const std::string &s,
|
|
const std::string &t)
|
|
noexcept try
|
|
{
|
|
const ctx::uninterruptible::nothrow ui;
|
|
|
|
#ifdef RB_DEBUG_DB_ENV
|
|
log::debug
|
|
{
|
|
log, "[%s] rename file '%s' to '%s'",
|
|
d.name,
|
|
s,
|
|
t
|
|
};
|
|
#endif
|
|
|
|
return defaults.RenameFile(s, t);
|
|
}
|
|
catch(const std::system_error &e)
|
|
{
|
|
return error_to_status{e};
|
|
}
|
|
catch(const std::exception &e)
|
|
{
|
|
return error_to_status{e};
|
|
}
|
|
|
|
rocksdb::Status
|
|
ircd::db::database::env::LinkFile(const std::string &s,
|
|
const std::string &t)
|
|
noexcept try
|
|
{
|
|
const ctx::uninterruptible::nothrow ui;
|
|
|
|
#ifdef RB_DEBUG_DB_ENV
|
|
log::debug
|
|
{
|
|
log, "[%s] link file '%s' to '%s'",
|
|
d.name,
|
|
s,
|
|
t
|
|
};
|
|
#endif
|
|
|
|
return defaults.LinkFile(s, t);
|
|
}
|
|
catch(const std::system_error &e)
|
|
{
|
|
return error_to_status{e};
|
|
}
|
|
catch(const std::exception &e)
|
|
{
|
|
return error_to_status{e};
|
|
}
|
|
|
|
rocksdb::Status
|
|
ircd::db::database::env::LockFile(const std::string &name,
|
|
FileLock** l)
|
|
noexcept try
|
|
{
|
|
const ctx::uninterruptible::nothrow ui;
|
|
|
|
#ifdef RB_DEBUG_DB_ENV
|
|
log::debug
|
|
{
|
|
log, "[%s] lock file '%s'",
|
|
d.name,
|
|
name
|
|
};
|
|
#endif
|
|
|
|
return defaults.LockFile(name, l);
|
|
}
|
|
catch(const std::system_error &e)
|
|
{
|
|
return error_to_status{e};
|
|
}
|
|
catch(const std::exception &e)
|
|
{
|
|
return error_to_status{e};
|
|
}
|
|
|
|
rocksdb::Status
|
|
ircd::db::database::env::UnlockFile(FileLock *const l)
|
|
noexcept try
|
|
{
|
|
const ctx::uninterruptible::nothrow ui;
|
|
|
|
#ifdef RB_DEBUG_DB_ENV
|
|
log::debug
|
|
{
|
|
log, "[%s] unlock file lock:%p",
|
|
d.name,
|
|
l
|
|
};
|
|
#endif
|
|
|
|
return defaults.UnlockFile(l);
|
|
}
|
|
catch(const std::system_error &e)
|
|
{
|
|
return error_to_status{e};
|
|
}
|
|
catch(const std::exception &e)
|
|
{
|
|
return error_to_status{e};
|
|
}
|
|
|
|
rocksdb::Status
|
|
ircd::db::database::env::GetTestDirectory(std::string *const path)
|
|
noexcept try
|
|
{
|
|
const ctx::uninterruptible::nothrow ui;
|
|
|
|
return defaults.GetTestDirectory(path);
|
|
}
|
|
catch(const std::system_error &e)
|
|
{
|
|
return error_to_status{e};
|
|
}
|
|
catch(const std::exception &e)
|
|
{
|
|
return error_to_status{e};
|
|
}
|
|
|
|
rocksdb::Status
|
|
ircd::db::database::env::GetAbsolutePath(const std::string &db_path,
|
|
std::string *const output_path)
|
|
noexcept try
|
|
{
|
|
const ctx::uninterruptible::nothrow ui;
|
|
|
|
#ifdef RB_DEBUG_DB_ENV
|
|
log::debug
|
|
{
|
|
log, "[%s] get absolute path from '%s' ret:%p",
|
|
d.name,
|
|
db_path,
|
|
output_path
|
|
};
|
|
#endif
|
|
|
|
return defaults.GetAbsolutePath(db_path, output_path);
|
|
}
|
|
catch(const std::system_error &e)
|
|
{
|
|
return error_to_status{e};
|
|
}
|
|
catch(const std::exception &e)
|
|
{
|
|
return error_to_status{e};
|
|
}
|
|
|
|
rocksdb::Status
|
|
ircd::db::database::env::NewLogger(const std::string &name,
|
|
std::shared_ptr<Logger> *const result)
|
|
noexcept try
|
|
{
|
|
const ctx::uninterruptible::nothrow ui;
|
|
|
|
#ifdef RB_DEBUG_DB_ENV
|
|
log::debug
|
|
{
|
|
log, "[%s] new logger '%s' result:%p",
|
|
d.name,
|
|
name,
|
|
(const void *)result
|
|
};
|
|
#endif
|
|
|
|
return defaults.NewLogger(name, result);
|
|
}
|
|
catch(const std::exception &e)
|
|
{
|
|
return error_to_status{e};
|
|
}
|
|
|
|
rocksdb::Status
|
|
ircd::db::database::env::GetHostName(char *const name,
|
|
uint64_t len)
|
|
noexcept try
|
|
{
|
|
const ctx::uninterruptible::nothrow ui;
|
|
|
|
#ifdef RB_DEBUG_DB_ENV
|
|
log::debug
|
|
{
|
|
log, "[%s] get host name name:%p len:%lu",
|
|
d.name,
|
|
name,
|
|
len
|
|
};
|
|
#endif
|
|
|
|
return defaults.GetHostName(name, len);
|
|
}
|
|
catch(const std::exception &e)
|
|
{
|
|
return error_to_status{e};
|
|
}
|
|
|
|
uint64_t
|
|
ircd::db::database::env::NowMicros()
|
|
noexcept
|
|
{
|
|
return ircd::time<microseconds>();
|
|
}
|
|
|
|
/// Bug in rocksdb causes this to not be called all the time, specifically
|
|
/// for perf events.
|
|
uint64_t
|
|
ircd::db::database::env::NowNanos()
|
|
noexcept
|
|
{
|
|
return ircd::now<nanoseconds>().count();
|
|
}
|
|
|
|
rocksdb::Status
|
|
ircd::db::database::env::GetCurrentTime(int64_t *const unix_time)
|
|
noexcept try
|
|
{
|
|
return defaults.GetCurrentTime(unix_time);
|
|
}
|
|
catch(const std::exception &e)
|
|
{
|
|
return error_to_status{e};
|
|
}
|
|
|
|
std::string
|
|
ircd::db::database::env::TimeToString(uint64_t time)
|
|
noexcept try
|
|
{
|
|
return defaults.TimeToString(time);
|
|
}
|
|
catch(const std::exception &e)
|
|
{
|
|
throw panic
|
|
{
|
|
"[%s] time to string :%s",
|
|
d.name,
|
|
e.what()
|
|
};
|
|
}
|
|
|
|
void
|
|
ircd::db::database::env::SleepForMicroseconds(int micros)
|
|
noexcept try
|
|
{
|
|
const ctx::uninterruptible::nothrow ui;
|
|
|
|
#ifdef RB_DEBUG_DB_ENV
|
|
log::debug
|
|
{
|
|
"[%s] sleep for %d microseconds",
|
|
d.name,
|
|
micros
|
|
};
|
|
#endif
|
|
|
|
ctx::sleep(microseconds(micros));
|
|
}
|
|
catch(const std::exception &e)
|
|
{
|
|
log::critical
|
|
{
|
|
log, "[%s] sleep micros:%d :%s",
|
|
d.name,
|
|
micros,
|
|
e.what()
|
|
};
|
|
}
|
|
|
|
void
|
|
ircd::db::database::env::Schedule(void (*f)(void* arg),
|
|
void *const a,
|
|
Priority prio,
|
|
void *const tag,
|
|
void (*u)(void* arg))
|
|
noexcept try
|
|
{
|
|
const ctx::uninterruptible::nothrow ui;
|
|
|
|
//#ifdef RB_DEBUG_DB_ENV
|
|
log::debug
|
|
{
|
|
log, "[%s] schedule func:%p a:%p tag:%p u:%p prio:%s",
|
|
d.name,
|
|
f,
|
|
a,
|
|
tag,
|
|
u,
|
|
reflect(prio)
|
|
};
|
|
//#endif
|
|
|
|
assert(st);
|
|
auto &pool
|
|
{
|
|
*st->pool.at(prio)
|
|
};
|
|
|
|
pool(state::task
|
|
{
|
|
f, u, a
|
|
});
|
|
}
|
|
catch(const std::exception &e)
|
|
{
|
|
log::critical
|
|
{
|
|
log, "[%s] schedule func:%p a:%p tag:%p u:%p prio:%s",
|
|
d.name,
|
|
f,
|
|
a,
|
|
tag,
|
|
u,
|
|
reflect(prio)
|
|
};
|
|
}
|
|
|
|
int
|
|
ircd::db::database::env::UnSchedule(void *const tag,
|
|
const Priority prio)
|
|
noexcept try
|
|
{
|
|
const ctx::uninterruptible::nothrow ui;
|
|
|
|
//#ifdef RB_DEBUG_DB_ENV
|
|
log::debug
|
|
{
|
|
log, "[%s] unschedule tag:%p prio:%s",
|
|
d.name,
|
|
tag,
|
|
reflect(prio)
|
|
};
|
|
//#endif
|
|
|
|
assert(st);
|
|
auto &pool
|
|
{
|
|
*st->pool.at(prio)
|
|
};
|
|
|
|
return pool.cancel(tag);
|
|
}
|
|
catch(const std::exception &e)
|
|
{
|
|
log::critical
|
|
{
|
|
log, "[%s] unschedule tag:%p prio:%s :%s",
|
|
d.name,
|
|
tag,
|
|
reflect(prio),
|
|
e.what()
|
|
};
|
|
|
|
return 0;
|
|
}
|
|
|
|
void
|
|
ircd::db::database::env::StartThread(void (*f)(void*),
|
|
void *const a)
|
|
noexcept try
|
|
{
|
|
const ctx::uninterruptible::nothrow ui;
|
|
|
|
//#ifdef RB_DEBUG_DB_ENV
|
|
log::debug
|
|
{
|
|
log, "[%s] start thread func:%p a:%p",
|
|
d.name,
|
|
f,
|
|
a
|
|
};
|
|
//#endif
|
|
|
|
throw ircd::not_implemented
|
|
{
|
|
"Independent (non-pool) context spawning not yet implemented"
|
|
};
|
|
}
|
|
catch(const std::exception &e)
|
|
{
|
|
log::critical
|
|
{
|
|
log, "[%s] start thread :%s",
|
|
d.name,
|
|
e.what()
|
|
};
|
|
}
|
|
|
|
void
|
|
ircd::db::database::env::WaitForJoin()
|
|
noexcept try
|
|
{
|
|
const ctx::uninterruptible::nothrow ui;
|
|
|
|
#ifdef RB_DEBUG_DB_ENV
|
|
log::debug
|
|
{
|
|
log, "[%s] wait for all ctx to join",
|
|
d.name
|
|
};
|
|
#endif
|
|
|
|
assert(st);
|
|
for(auto &pool : st->pool)
|
|
if(pool)
|
|
pool->join();
|
|
}
|
|
catch(const std::exception &e)
|
|
{
|
|
log::critical
|
|
{
|
|
log, "[%s] wait for join :%s",
|
|
d.name,
|
|
e.what()
|
|
};
|
|
}
|
|
|
|
unsigned int
|
|
ircd::db::database::env::GetThreadPoolQueueLen(Priority prio)
|
|
const noexcept try
|
|
{
|
|
const ctx::uninterruptible::nothrow ui;
|
|
|
|
#ifdef RB_DEBUG_DB_ENV
|
|
log::debug
|
|
{
|
|
log, "[%s] get thread pool queue len prio:%s",
|
|
d.name,
|
|
reflect(prio)
|
|
};
|
|
#endif
|
|
|
|
assert(st);
|
|
const auto &pool
|
|
{
|
|
*st->pool.at(prio)
|
|
};
|
|
|
|
return pool.tasks.size();
|
|
}
|
|
catch(const std::exception &e)
|
|
{
|
|
throw panic
|
|
{
|
|
"[%s] set background threads :%s",
|
|
d.name,
|
|
e.what()
|
|
};
|
|
}
|
|
|
|
void
|
|
ircd::db::database::env::SetBackgroundThreads(int num,
|
|
Priority prio)
|
|
noexcept try
|
|
{
|
|
const ctx::uninterruptible::nothrow ui;
|
|
|
|
#ifdef RB_DEBUG_DB_ENV
|
|
log::debug
|
|
{
|
|
log, "[%s] set background threads prio:%s num:%d",
|
|
d.name,
|
|
reflect(prio),
|
|
num
|
|
};
|
|
#endif
|
|
|
|
assert(st);
|
|
auto &pool
|
|
{
|
|
*st->pool.at(prio)
|
|
};
|
|
|
|
pool.p.set(num);
|
|
}
|
|
catch(const std::exception &e)
|
|
{
|
|
log::critical
|
|
{
|
|
log, "[%s] set background threads prio:%s num:%d :%s",
|
|
d.name,
|
|
reflect(prio),
|
|
num,
|
|
e.what()
|
|
};
|
|
}
|
|
|
|
void
|
|
ircd::db::database::env::IncBackgroundThreadsIfNeeded(int num,
|
|
Priority prio)
|
|
noexcept try
|
|
{
|
|
const ctx::uninterruptible::nothrow ui;
|
|
|
|
#ifdef RB_DEBUG_DB_ENV
|
|
log::debug
|
|
{
|
|
log, "[%s] increase background threads num:%d prio:%s",
|
|
d.name,
|
|
num,
|
|
reflect(prio)
|
|
};
|
|
#endif
|
|
|
|
assert(st);
|
|
auto &pool
|
|
{
|
|
*st->pool.at(prio)
|
|
};
|
|
|
|
pool.p.add(num);
|
|
}
|
|
catch(const std::exception &e)
|
|
{
|
|
log::critical
|
|
{
|
|
log, "[%s] inc background threads num:%d prio:%s :%s",
|
|
d.name,
|
|
num,
|
|
reflect(prio),
|
|
e.what()
|
|
};
|
|
}
|
|
|
|
void
|
|
ircd::db::database::env::LowerThreadPoolIOPriority(Priority prio)
|
|
noexcept try
|
|
{
|
|
const ctx::uninterruptible::nothrow ui;
|
|
|
|
#ifdef RB_DEBUG_DB_ENV
|
|
log::debug
|
|
{
|
|
log, "[%s] lower thread pool priority prio:%s",
|
|
d.name,
|
|
reflect(prio)
|
|
};
|
|
#endif
|
|
|
|
assert(st);
|
|
auto &pool
|
|
{
|
|
*st->pool.at(prio)
|
|
};
|
|
|
|
switch(pool.iopri)
|
|
{
|
|
case IOPriority::IO_HIGH:
|
|
pool.iopri = IOPriority::IO_LOW;
|
|
break;
|
|
|
|
default:
|
|
break;
|
|
}
|
|
}
|
|
catch(const std::exception &e)
|
|
{
|
|
log::critical
|
|
{
|
|
log, "[%s] lower thread pool IO priority pool:%s :%s",
|
|
d.name,
|
|
reflect(prio),
|
|
e.what()
|
|
};
|
|
}
|
|
|
|
rocksdb::Status
|
|
ircd::db::database::env::GetThreadList(std::vector<ThreadStatus> *const list)
|
|
noexcept try
|
|
{
|
|
const ctx::uninterruptible::nothrow ui;
|
|
|
|
#ifdef RB_DEBUG_DB_ENV
|
|
log::debug
|
|
{
|
|
log, "[%s] get thread list %p (%zu)",
|
|
d.name,
|
|
list,
|
|
list? list->size() : 0UL
|
|
};
|
|
#endif
|
|
|
|
throw ircd::not_implemented
|
|
{
|
|
"[%s] GetThreadList()", d.name
|
|
};
|
|
|
|
return defaults.GetThreadList(list);
|
|
}
|
|
catch(const std::exception &e)
|
|
{
|
|
log::critical
|
|
{
|
|
log, "[%s] get thread list:%p :%s",
|
|
d.name,
|
|
list,
|
|
e.what()
|
|
};
|
|
|
|
return error_to_status{e};
|
|
}
|
|
|
|
rocksdb::ThreadStatusUpdater *
|
|
ircd::db::database::env::GetThreadStatusUpdater()
|
|
const noexcept try
|
|
{
|
|
const ctx::uninterruptible::nothrow ui;
|
|
|
|
#ifdef RB_DEBUG_DB_ENV
|
|
log::debug
|
|
{
|
|
log, "[%s] get thread status updater",
|
|
d.name,
|
|
};
|
|
#endif
|
|
|
|
return defaults.GetThreadStatusUpdater();
|
|
}
|
|
catch(const std::exception &e)
|
|
{
|
|
log::critical
|
|
{
|
|
log, "[%s] get thread status updater :%s",
|
|
d.name,
|
|
e.what()
|
|
};
|
|
|
|
return nullptr;
|
|
}
|
|
|
|
|
|
uint64_t
|
|
ircd::db::database::env::GetThreadID()
|
|
const noexcept try
|
|
{
|
|
const ctx::uninterruptible::nothrow ui;
|
|
|
|
#ifdef RB_DEBUG_DB_ENV
|
|
log::debug
|
|
{
|
|
log, "[%s] get thread ID",
|
|
d.name,
|
|
};
|
|
#endif
|
|
|
|
return ctx::this_ctx::id();
|
|
}
|
|
catch(const std::exception &e)
|
|
{
|
|
throw panic
|
|
{
|
|
"[%s] get thread id :%s",
|
|
d.name,
|
|
e.what()
|
|
};
|
|
}
|
|
|
|
int
|
|
ircd::db::database::env::GetBackgroundThreads(Priority prio)
|
|
noexcept try
|
|
{
|
|
const ctx::uninterruptible::nothrow ui;
|
|
|
|
#ifdef RB_DEBUG_DB_ENV
|
|
log::debug
|
|
{
|
|
log, "[%s] get background threads prio:%s",
|
|
d.name,
|
|
reflect(prio)
|
|
};
|
|
#endif
|
|
|
|
assert(st);
|
|
const auto &pool
|
|
{
|
|
*st->pool.at(prio)
|
|
};
|
|
|
|
return pool.p.size();
|
|
}
|
|
catch(const std::exception &e)
|
|
{
|
|
log::critical
|
|
{
|
|
log, "[%s] get background threads prio:%s :%s",
|
|
d.name,
|
|
reflect(prio),
|
|
e.what()
|
|
};
|
|
|
|
return 0;
|
|
}
|
|
|
|
//
|
|
// writable_file
|
|
//
|
|
|
|
ircd::db::database::env::writable_file::writable_file(database *const &d,
|
|
const std::string &name,
|
|
const EnvOptions &env_opts,
|
|
const bool &trunc)
|
|
try
|
|
:d
|
|
{
|
|
*d
|
|
}
|
|
,env_opts
|
|
{
|
|
env_opts
|
|
}
|
|
,opts{[this, &trunc]
|
|
{
|
|
fs::fd::opts ret
|
|
{
|
|
std::ios::out |
|
|
(trunc? std::ios::trunc : std::ios::openmode(0))
|
|
};
|
|
|
|
ret.direct = this->env_opts.use_direct_writes;
|
|
ret.cloexec = this->env_opts.set_fd_cloexec;
|
|
return ret;
|
|
}()}
|
|
,fd
|
|
{
|
|
name, this->opts
|
|
}
|
|
,preallocation_block_size
|
|
{
|
|
ircd::info::page_size
|
|
}
|
|
{
|
|
#ifdef RB_DEBUG_DB_ENV
|
|
log::debug
|
|
{
|
|
log, "[%s] opened wfile:%p fd:%d '%s'",
|
|
d->name,
|
|
this,
|
|
int(fd),
|
|
name
|
|
};
|
|
#endif
|
|
|
|
// Workaround a RocksDB bug which doesn't propagate EnvOptions properly
|
|
// on some constructions of WritableFile early on during db open. We'll
|
|
// get an env_opts.allow_fallocate==true here while it should be false
|
|
// from the DBOptions at d->opts. We use &= so it's not set to true when
|
|
// the caller specifically wants it false just for them.
|
|
assert(d && d->opts);
|
|
this->env_opts.allow_fallocate &= d->opts->allow_fallocate;
|
|
//assert(env_opts.allow_fallocate == d->opts->allow_fallocate);
|
|
}
|
|
catch(const std::exception &e)
|
|
{
|
|
log::error
|
|
{
|
|
log, "[%s] opening wfile:%p `%s' :%s",
|
|
d->name,
|
|
this,
|
|
name,
|
|
e.what()
|
|
};
|
|
}
|
|
|
|
ircd::db::database::env::writable_file::~writable_file()
|
|
noexcept
|
|
{
|
|
Close();
|
|
|
|
#ifdef RB_DEBUG_DB_ENV
|
|
log::debug
|
|
{
|
|
log, "[%s] closed wfile:%p fd:%d",
|
|
d.name,
|
|
this,
|
|
int(fd)
|
|
};
|
|
#endif
|
|
}
|
|
|
|
rocksdb::Status
|
|
ircd::db::database::env::writable_file::Close()
|
|
noexcept try
|
|
{
|
|
const ctx::uninterruptible::nothrow ui;
|
|
const std::lock_guard lock{mutex};
|
|
|
|
if(!fd)
|
|
return Status::OK();
|
|
|
|
#ifdef RB_DEBUG_DB_ENV
|
|
log::debug
|
|
{
|
|
log, "[%s] wfile:%p fd:%d close",
|
|
d.name,
|
|
this,
|
|
int(fd)
|
|
};
|
|
#endif
|
|
|
|
fd = fs::fd{};
|
|
return Status::OK();
|
|
}
|
|
catch(const std::system_error &e)
|
|
{
|
|
log::error
|
|
{
|
|
log, "[%s] wfile:%p close :%s",
|
|
d.name,
|
|
this,
|
|
e.what()
|
|
};
|
|
|
|
return error_to_status{e};
|
|
}
|
|
catch(const std::exception &e)
|
|
{
|
|
log::error
|
|
{
|
|
log, "[%s] wfile:%p close :%s",
|
|
d.name,
|
|
this,
|
|
e.what()
|
|
};
|
|
|
|
return error_to_status{e};
|
|
}
|
|
|
|
rocksdb::Status
|
|
ircd::db::database::env::writable_file::Flush()
|
|
noexcept try
|
|
{
|
|
const ctx::uninterruptible::nothrow ui;
|
|
const std::lock_guard lock{mutex};
|
|
|
|
#ifdef RB_DEBUG_DB_ENV
|
|
log::debug
|
|
{
|
|
log, "[%s] wfile:%p fd:%d flush",
|
|
d.name,
|
|
this,
|
|
int(fd),
|
|
};
|
|
#endif
|
|
|
|
fs::sync_opts opts;
|
|
opts.metadata = false;
|
|
fs::flush(fd, opts);
|
|
return Status::OK();
|
|
}
|
|
catch(const std::system_error &e)
|
|
{
|
|
log::error
|
|
{
|
|
log, "[%s] wfile:%p fd:%d flush :%s",
|
|
d.name,
|
|
this,
|
|
int(fd),
|
|
e.what()
|
|
};
|
|
|
|
return error_to_status{e};
|
|
}
|
|
catch(const std::exception &e)
|
|
{
|
|
log::error
|
|
{
|
|
log, "[%s] wfile:%p fd:%d flush :%s",
|
|
d.name,
|
|
this,
|
|
int(fd),
|
|
e.what()
|
|
};
|
|
|
|
return error_to_status{e};
|
|
}
|
|
|
|
rocksdb::Status
|
|
ircd::db::database::env::writable_file::Sync()
|
|
noexcept try
|
|
{
|
|
const ctx::uninterruptible::nothrow ui;
|
|
const std::lock_guard lock{mutex};
|
|
|
|
#ifdef RB_DEBUG_DB_ENV
|
|
log::debug
|
|
{
|
|
log, "[%s] wfile:%p fd:%d sync",
|
|
d.name,
|
|
this,
|
|
int(fd),
|
|
};
|
|
#endif
|
|
|
|
fs::sync_opts opts;
|
|
fs::sync(fd, opts);
|
|
return Status::OK();
|
|
}
|
|
catch(const std::system_error &e)
|
|
{
|
|
log::error
|
|
{
|
|
log, "[%s] wfile:%p fd:%d sync :%s",
|
|
d.name,
|
|
this,
|
|
int(fd),
|
|
e.what(),
|
|
};
|
|
|
|
return error_to_status{e};
|
|
}
|
|
catch(const std::exception &e)
|
|
{
|
|
log::error
|
|
{
|
|
log, "[%s] wfile:%p fd:%d sync :%s",
|
|
d.name,
|
|
this,
|
|
int(fd),
|
|
e.what(),
|
|
};
|
|
|
|
return error_to_status{e};
|
|
}
|
|
|
|
rocksdb::Status
|
|
ircd::db::database::env::writable_file::Fsync()
|
|
noexcept try
|
|
{
|
|
const ctx::uninterruptible::nothrow ui;
|
|
const std::lock_guard lock{mutex};
|
|
|
|
#ifdef RB_DEBUG_DB_ENV
|
|
log::debug
|
|
{
|
|
log, "[%s] wfile:%p fd:%d fsync",
|
|
d.name,
|
|
this,
|
|
int(fd),
|
|
};
|
|
#endif
|
|
|
|
fs::sync_opts opts;
|
|
fs::flush(fd, opts);
|
|
return Status::OK();
|
|
}
|
|
catch(const std::system_error &e)
|
|
{
|
|
log::error
|
|
{
|
|
log, "[%s] wfile:%p fd:%d fsync :%s",
|
|
d.name,
|
|
this,
|
|
int(fd),
|
|
e.what(),
|
|
};
|
|
|
|
return error_to_status{e};
|
|
}
|
|
catch(const std::exception &e)
|
|
{
|
|
log::error
|
|
{
|
|
log, "[%s] wfile:%p fd:%d fsync :%s",
|
|
d.name,
|
|
this,
|
|
int(fd),
|
|
e.what(),
|
|
};
|
|
|
|
return error_to_status{e};
|
|
}
|
|
|
|
rocksdb::Status
|
|
ircd::db::database::env::writable_file::RangeSync(uint64_t offset,
|
|
uint64_t length)
|
|
noexcept try
|
|
{
|
|
const ctx::uninterruptible::nothrow ui;
|
|
const std::lock_guard lock{mutex};
|
|
|
|
#ifdef RB_DEBUG_DB_ENV
|
|
log::debug
|
|
{
|
|
"[%s] wfile:%p fd:%d range sync offset:%lu length:%lu",
|
|
d.name,
|
|
this,
|
|
int(fd),
|
|
offset,
|
|
length
|
|
};
|
|
#endif
|
|
|
|
// RocksDB sez they want us to initiate flushing of dirty pages
|
|
// asynchronously without waiting for completion. RocksDB allows
|
|
// this callback to be a no-op and do nothing at all.
|
|
//
|
|
// We plug this into a "range flush" gimmick in ircd::fs which almost
|
|
// certainly calls fdatasync() and ignores the range; it may one day
|
|
// on supporting platforms and in certain circumstances call
|
|
// sync_file_range() without any of the wait flags and respect the range.
|
|
|
|
fs::sync_opts opts;
|
|
opts.metadata = false;
|
|
fs::flush(fd, offset, length, opts);
|
|
return Status::OK();
|
|
}
|
|
catch(const std::system_error &e)
|
|
{
|
|
log::error
|
|
{
|
|
log, "[%s] wfile:%p fd:%d range sync offset:%zu length:%zu :%s",
|
|
d.name,
|
|
this,
|
|
int(fd),
|
|
offset,
|
|
length,
|
|
e.what()
|
|
};
|
|
|
|
return error_to_status{e};
|
|
}
|
|
catch(const std::exception &e)
|
|
{
|
|
log::critical
|
|
{
|
|
log, "[%s] wfile:%p fd:%d range sync offset:%zu length:%zu :%s",
|
|
d.name,
|
|
this,
|
|
int(fd),
|
|
offset,
|
|
length,
|
|
e.what()
|
|
};
|
|
|
|
return error_to_status{e};
|
|
}
|
|
|
|
rocksdb::Status
|
|
ircd::db::database::env::writable_file::Truncate(uint64_t size)
|
|
noexcept try
|
|
{
|
|
const ctx::uninterruptible::nothrow ui;
|
|
const std::lock_guard lock{mutex};
|
|
|
|
#ifdef RB_DEBUG_DB_ENV
|
|
log::debug
|
|
{
|
|
"[%s] wfile:%p fd:%d truncate to %lu bytes",
|
|
d.name,
|
|
this,
|
|
int(fd),
|
|
size
|
|
};
|
|
#endif
|
|
|
|
fs::write_opts wopts;
|
|
wopts.priority = this->prio_val;
|
|
wopts.nodelay = this->nodelay;
|
|
fs::truncate(fd, size, wopts);
|
|
return Status::OK();
|
|
}
|
|
catch(const std::system_error &e)
|
|
{
|
|
log::error
|
|
{
|
|
log, "[%s] wfile:%p fd:%d truncate to %lu bytes :%s",
|
|
d.name,
|
|
this,
|
|
int(fd),
|
|
size,
|
|
e.what()
|
|
};
|
|
|
|
return error_to_status{e};
|
|
}
|
|
catch(const std::exception &e)
|
|
{
|
|
log::critical
|
|
{
|
|
log, "[%s] wfile:%p fd:%d truncate to %lu bytes :%s",
|
|
d.name,
|
|
this,
|
|
int(fd),
|
|
size,
|
|
e.what()
|
|
};
|
|
|
|
return error_to_status{e};
|
|
}
|
|
|
|
rocksdb::Status
|
|
ircd::db::database::env::writable_file::InvalidateCache(size_t offset,
|
|
size_t length)
|
|
noexcept try
|
|
{
|
|
const ctx::uninterruptible::nothrow ui;
|
|
const std::lock_guard lock{mutex};
|
|
|
|
#ifdef RB_DEBUG_DB_ENV
|
|
log::debug
|
|
{
|
|
log, "[%s] wfile:%p fd:%d invalidate cache offset:%zu length:%zu",
|
|
d.name,
|
|
this,
|
|
int(fd),
|
|
offset,
|
|
length
|
|
};
|
|
#endif
|
|
|
|
if(opts.direct)
|
|
return Status::OK();
|
|
|
|
fs::evict(fd, length, offset);
|
|
return Status::OK();
|
|
}
|
|
catch(const std::system_error &e)
|
|
{
|
|
log::error
|
|
{
|
|
log, "[%s] wfile:%p fd:%d invalidate cache offset:%zu length:%zu",
|
|
d.name,
|
|
this,
|
|
int(fd),
|
|
offset,
|
|
length
|
|
};
|
|
|
|
return error_to_status{e};
|
|
}
|
|
catch(const std::exception &e)
|
|
{
|
|
log::critical
|
|
{
|
|
log, "[%s] wfile:%p fd:%d invalidate cache offset:%zu length:%zu",
|
|
d.name,
|
|
this,
|
|
int(fd),
|
|
offset,
|
|
length
|
|
};
|
|
|
|
return error_to_status{e};
|
|
}
|
|
|
|
rocksdb::Status
|
|
ircd::db::database::env::writable_file::Append(const Slice &s)
|
|
noexcept try
|
|
{
|
|
assert(!opts.direct);
|
|
const ctx::uninterruptible::nothrow ui;
|
|
const std::lock_guard lock{mutex};
|
|
|
|
#ifdef RB_DEBUG_DB_ENV
|
|
log::debug
|
|
{
|
|
log, "[%s] wfile:%p fd:%d append:%p bytes:%zu",
|
|
d.name,
|
|
this,
|
|
int(fd),
|
|
data(s),
|
|
size(s),
|
|
};
|
|
#endif
|
|
|
|
fs::write_opts wopts;
|
|
wopts.priority = this->prio_val;
|
|
wopts.nodelay = this->nodelay;
|
|
const const_buffer buf
|
|
{
|
|
data(s), size(s)
|
|
};
|
|
|
|
fs::append(fd, buf, wopts);
|
|
return Status::OK();
|
|
}
|
|
catch(const std::system_error &e)
|
|
{
|
|
log::error
|
|
{
|
|
log, "[%s] wfile:%p fd:%d append:%p size:%zu :%s",
|
|
d.name,
|
|
this,
|
|
int(fd),
|
|
data(s),
|
|
size(s),
|
|
e.what()
|
|
};
|
|
|
|
return error_to_status{e};
|
|
}
|
|
catch(const std::exception &e)
|
|
{
|
|
log::critical
|
|
{
|
|
log, "[%s] wfile:%p fd:%d append:%p size:%zu :%s",
|
|
d.name,
|
|
this,
|
|
int(fd),
|
|
data(s),
|
|
size(s),
|
|
e.what()
|
|
};
|
|
|
|
return error_to_status{e};
|
|
}
|
|
|
|
rocksdb::Status
|
|
ircd::db::database::env::writable_file::PositionedAppend(const Slice &s,
|
|
uint64_t offset)
|
|
noexcept try
|
|
{
|
|
assert(!opts.direct);
|
|
const ctx::uninterruptible::nothrow ui;
|
|
const std::lock_guard lock{mutex};
|
|
|
|
#ifdef RB_DEBUG_DB_ENV
|
|
log::debug
|
|
{
|
|
"[%s] wfile:%p fd:%d append:%p bytes:%zu offset:%lu",
|
|
d.name,
|
|
this,
|
|
int(fd),
|
|
data(s),
|
|
size(s),
|
|
offset
|
|
};
|
|
#endif
|
|
|
|
fs::write_opts wopts;
|
|
wopts.priority = this->prio_val;
|
|
wopts.nodelay = this->nodelay;
|
|
wopts.offset = offset;
|
|
const const_buffer buf
|
|
{
|
|
data(s), size(s)
|
|
};
|
|
|
|
fs::append(fd, buf, wopts);
|
|
return Status::OK();
|
|
}
|
|
catch(const std::system_error &e)
|
|
{
|
|
log::error
|
|
{
|
|
log, "[%s] wfile:%p fd:%d append:%p size:%zu offset:%zu :%s",
|
|
d.name,
|
|
this,
|
|
int(fd),
|
|
data(s),
|
|
size(s),
|
|
offset,
|
|
e.what()
|
|
};
|
|
|
|
return error_to_status{e};
|
|
}
|
|
catch(const std::exception &e)
|
|
{
|
|
log::critical
|
|
{
|
|
log, "[%s] wfile:%p fd:%d append:%p size:%zu offset:%lu :%s",
|
|
d.name,
|
|
this,
|
|
int(fd),
|
|
data(s),
|
|
size(s),
|
|
offset,
|
|
e.what()
|
|
};
|
|
|
|
return error_to_status{e};
|
|
}
|
|
|
|
rocksdb::Status
|
|
ircd::db::database::env::writable_file::Allocate(uint64_t offset,
|
|
uint64_t length)
|
|
noexcept try
|
|
{
|
|
const ctx::uninterruptible::nothrow ui;
|
|
const std::lock_guard lock{mutex};
|
|
|
|
#ifdef RB_DEBUG_DB_ENV
|
|
log::debug
|
|
{
|
|
log, "[%s] wfile:%p fd:%d allocate offset:%lu length:%lu%s%s",
|
|
d.name,
|
|
this,
|
|
int(fd),
|
|
offset,
|
|
length,
|
|
env_opts.fallocate_with_keep_size? " KEEP_SIZE" : "",
|
|
env_opts.allow_fallocate? "" : " (DISABLED)"
|
|
};
|
|
#endif
|
|
|
|
if(!env_opts.allow_fallocate)
|
|
return Status::NotSupported();
|
|
|
|
_allocate(offset, length);
|
|
return Status::OK();
|
|
}
|
|
catch(const std::system_error &e)
|
|
{
|
|
log::error
|
|
{
|
|
log, "[%s] wfile:%p fd:%d allocate offset:%zu length:%zu :%s",
|
|
d.name,
|
|
this,
|
|
int(fd),
|
|
offset,
|
|
length,
|
|
e.what()
|
|
};
|
|
|
|
return error_to_status{e};
|
|
}
|
|
catch(const std::exception &e)
|
|
{
|
|
log::critical
|
|
{
|
|
log, "[%s] wfile:%p fd:%d allocate offset:%zu length:%zu :%s",
|
|
d.name,
|
|
this,
|
|
int(fd),
|
|
offset,
|
|
length,
|
|
e.what()
|
|
};
|
|
|
|
return error_to_status{e};
|
|
}
|
|
|
|
void
|
|
ircd::db::database::env::writable_file::PrepareWrite(size_t offset,
|
|
size_t length)
|
|
noexcept
|
|
{
|
|
const ctx::uninterruptible::nothrow ui;
|
|
const std::lock_guard lock{mutex};
|
|
|
|
#ifdef RB_DEBUG_DB_ENV
|
|
log::debug
|
|
{
|
|
log, "[%s] wfile:%p prepare write offset:%zu length:%zu",
|
|
d.name,
|
|
this,
|
|
offset,
|
|
length
|
|
};
|
|
#endif
|
|
|
|
if(!env_opts.allow_fallocate)
|
|
return;
|
|
|
|
_allocate(offset, length);
|
|
}
|
|
|
|
void
|
|
ircd::db::database::env::writable_file::_allocate(const size_t &offset,
|
|
const size_t &length)
|
|
{
|
|
const size_t first_block
|
|
{
|
|
offset / preallocation_block_size
|
|
};
|
|
|
|
const size_t last_block
|
|
{
|
|
(offset + length) / preallocation_block_size
|
|
};
|
|
|
|
const ssize_t missing_blocks
|
|
{
|
|
ssize_t(last_block) - preallocation_last_block
|
|
};
|
|
|
|
// Fast bail when the offset and length are behind the last block already
|
|
// allocated. We don't support windowing here. If this branch is not taken
|
|
// we'll fallocate() contiguously from the last fallocate() (or offset 0).
|
|
if(missing_blocks <= 0)
|
|
return;
|
|
|
|
const ssize_t start_block
|
|
{
|
|
preallocation_last_block + 1
|
|
};
|
|
|
|
const size_t allocate_offset
|
|
{
|
|
start_block * preallocation_block_size
|
|
};
|
|
|
|
const size_t allocate_length
|
|
{
|
|
missing_blocks * preallocation_block_size
|
|
};
|
|
|
|
fs::write_opts wopts;
|
|
wopts.offset = allocate_offset;
|
|
wopts.priority = this->prio_val;
|
|
wopts.nodelay = this->nodelay;
|
|
wopts.keep_size = env_opts.fallocate_with_keep_size;
|
|
|
|
#ifdef RB_DEBUG_DB_ENV
|
|
log::debug
|
|
{
|
|
log, "[%s] wfile:%p fd:%d allocating %zd blocks after block:%zu offset:%lu length:%lu%s",
|
|
d.name,
|
|
this,
|
|
int(fd),
|
|
missing_blocks,
|
|
start_block,
|
|
allocate_offset,
|
|
allocate_length,
|
|
wopts.keep_size? " KEEP_SIZE" : ""
|
|
};
|
|
#endif
|
|
|
|
assert(env_opts.allow_fallocate);
|
|
assert(bool(d.opts));
|
|
assert(d.opts->allow_fallocate);
|
|
|
|
fs::allocate(fd, allocate_length, wopts);
|
|
this->preallocation_last_block = last_block;
|
|
}
|
|
|
|
void
|
|
ircd::db::database::env::writable_file::GetPreallocationStatus(size_t *const block_size,
|
|
size_t *const last_allocated_block)
|
|
noexcept
|
|
{
|
|
const ctx::uninterruptible::nothrow ui;
|
|
const std::lock_guard lock{mutex};
|
|
|
|
*block_size = this->preallocation_block_size;
|
|
*last_allocated_block = this->preallocation_last_block;
|
|
|
|
#ifdef RB_DEBUG_DB_ENV
|
|
log::debug
|
|
{
|
|
log, "[%s] wfile:%p get preallocation block_size(%p):%zu last_block(%p):%zu",
|
|
d.name,
|
|
this,
|
|
block_size,
|
|
*block_size,
|
|
last_allocated_block,
|
|
*last_allocated_block
|
|
};
|
|
#endif
|
|
}
|
|
|
|
void
|
|
ircd::db::database::env::writable_file::SetPreallocationBlockSize(size_t size)
|
|
noexcept
|
|
{
|
|
const ctx::uninterruptible::nothrow ui;
|
|
const std::lock_guard lock{mutex};
|
|
|
|
#ifdef RB_DEBUG_DB_ENV
|
|
log::debug
|
|
{
|
|
log, "[%s] wfile:%p set preallocation block size:%zu",
|
|
d.name,
|
|
this,
|
|
size
|
|
};
|
|
#endif
|
|
|
|
this->preallocation_block_size = size;
|
|
}
|
|
|
|
uint64_t
|
|
ircd::db::database::env::writable_file::GetFileSize()
|
|
noexcept try
|
|
{
|
|
const ctx::uninterruptible::nothrow ui;
|
|
const std::lock_guard lock{mutex};
|
|
|
|
#ifdef RB_DEBUG_DB_ENV
|
|
log::debug
|
|
{
|
|
log, "[%s] wfile:%p fd:%d get file size",
|
|
d.name,
|
|
this,
|
|
int(fd)
|
|
};
|
|
#endif
|
|
|
|
return fs::size(fd);
|
|
}
|
|
catch(const std::exception &e)
|
|
{
|
|
log::critical
|
|
{
|
|
log, "[%s] wfile:%p fd:%d get file size :%s",
|
|
d.name,
|
|
this,
|
|
int(fd),
|
|
e.what()
|
|
};
|
|
|
|
return 0;
|
|
}
|
|
|
|
void
|
|
ircd::db::database::env::writable_file::SetIOPriority(Env::IOPriority prio)
|
|
noexcept
|
|
{
|
|
#ifdef RB_DEBUG_DB_ENV
|
|
log::debug
|
|
{
|
|
log, "[%s] wfile:%p IO priority %s",
|
|
d.name,
|
|
this,
|
|
reflect(prio)
|
|
};
|
|
#endif
|
|
|
|
this->prio = prio;
|
|
switch(this->prio)
|
|
{
|
|
case IOPriority::IO_HIGH:
|
|
prio_val = -5; //TODO: magic
|
|
nodelay = true;
|
|
break;
|
|
|
|
default:
|
|
case IOPriority::IO_LOW:
|
|
prio_val = 5; //TODO: magic
|
|
nodelay = false;
|
|
break;
|
|
}
|
|
}
|
|
|
|
rocksdb::Env::IOPriority
|
|
ircd::db::database::env::writable_file::GetIOPriority()
|
|
noexcept
|
|
{
|
|
return prio;
|
|
}
|
|
|
|
void
|
|
ircd::db::database::env::writable_file::SetWriteLifeTimeHint(WriteLifeTimeHint hint)
|
|
noexcept try
|
|
{
|
|
#ifdef RB_DEBUG_DB_ENV
|
|
log::debug
|
|
{
|
|
log, "[%s] wfile:%p hint:%d %s",
|
|
d.name,
|
|
this,
|
|
int(hint),
|
|
reflect(hint),
|
|
};
|
|
#endif
|
|
|
|
this->hint = hint;
|
|
fs::write_life(fd, this->hint);
|
|
}
|
|
catch(const std::system_error &e)
|
|
{
|
|
log::derror
|
|
{
|
|
log, "[%s] wfile:%p fd:%d setting write lifetime hint:%d %s :%s",
|
|
d.name,
|
|
this,
|
|
int(fd),
|
|
int(hint),
|
|
reflect(hint),
|
|
e.what(),
|
|
};
|
|
}
|
|
|
|
rocksdb::Env::WriteLifeTimeHint
|
|
ircd::db::database::env::writable_file::GetWriteLifeTimeHint()
|
|
noexcept
|
|
{
|
|
return hint;
|
|
}
|
|
|
|
size_t
|
|
ircd::db::database::env::writable_file::GetUniqueId(char *const id,
|
|
size_t max_size)
|
|
const noexcept try
|
|
{
|
|
const ctx::uninterruptible::nothrow ui;
|
|
|
|
#ifdef RB_DEBUG_DB_ENV
|
|
log::debug
|
|
{
|
|
"[%s] wfile:%p get unique id:%p max_size:%zu",
|
|
d.name,
|
|
this,
|
|
id,
|
|
max_size
|
|
};
|
|
#endif
|
|
|
|
const mutable_buffer buf
|
|
{
|
|
id, max_size
|
|
};
|
|
|
|
//return size(fs::uuid(fd, buf));
|
|
return 0;
|
|
}
|
|
catch(const std::exception &e)
|
|
{
|
|
log::critical
|
|
{
|
|
log, "[%s] wfile:%p get unique id :%s",
|
|
d.name,
|
|
this,
|
|
e.what()
|
|
};
|
|
|
|
return 0;
|
|
}
|
|
|
|
bool
|
|
ircd::db::database::env::writable_file::IsSyncThreadSafe()
|
|
const noexcept try
|
|
{
|
|
return true;
|
|
}
|
|
catch(...)
|
|
{
|
|
return false;
|
|
}
|
|
|
|
//
|
|
// writable_file_direct
|
|
//
|
|
|
|
ircd::db::database::env::writable_file_direct::writable_file_direct(database *const &d,
|
|
const std::string &name,
|
|
const EnvOptions &env_opts,
|
|
const bool &trunc)
|
|
:writable_file
|
|
{
|
|
d, name, env_opts, trunc
|
|
}
|
|
,alignment
|
|
{
|
|
fs::block_size(fd)
|
|
}
|
|
,logical_offset
|
|
{
|
|
!trunc?
|
|
fs::size(fd):
|
|
size_t(0)
|
|
}
|
|
,buffer
|
|
{
|
|
alignment, alignment
|
|
}
|
|
{
|
|
zero(buffer);
|
|
if(!aligned(logical_offset))
|
|
throw panic
|
|
{
|
|
"direct writable file requires read into buffer."
|
|
};
|
|
}
|
|
|
|
rocksdb::Status
|
|
ircd::db::database::env::writable_file_direct::Close()
|
|
noexcept try
|
|
{
|
|
const ctx::uninterruptible::nothrow ui;
|
|
const std::lock_guard lock{mutex};
|
|
|
|
if(!fd)
|
|
return Status::OK();
|
|
|
|
#ifdef RB_DEBUG_DB_ENV
|
|
log::debug
|
|
{
|
|
log, "[%s] wfile:%p DIRECT fd:%d close",
|
|
d.name,
|
|
this,
|
|
int(fd)
|
|
};
|
|
#endif
|
|
|
|
if(logical_offset > 0 && fs::size(fd) != logical_offset)
|
|
{
|
|
fs::write_opts wopts;
|
|
wopts.priority = this->prio_val;
|
|
wopts.nodelay = true;
|
|
fs::truncate(fd, logical_offset, wopts);
|
|
}
|
|
|
|
fd = fs::fd{};
|
|
return Status::OK();
|
|
}
|
|
catch(const std::system_error &e)
|
|
{
|
|
log::error
|
|
{
|
|
log, "[%s] wfile:%p DIRECT close :%s",
|
|
d.name,
|
|
this,
|
|
e.what()
|
|
};
|
|
|
|
return error_to_status{e};
|
|
}
|
|
catch(const std::exception &e)
|
|
{
|
|
log::error
|
|
{
|
|
log, "[%s] wfile:%p DIRECT close :%s",
|
|
d.name,
|
|
this,
|
|
e.what()
|
|
};
|
|
|
|
return error_to_status{e};
|
|
}
|
|
|
|
rocksdb::Status
|
|
ircd::db::database::env::writable_file_direct::Truncate(uint64_t size)
|
|
noexcept try
|
|
{
|
|
const ctx::uninterruptible::nothrow ui;
|
|
const std::lock_guard lock{mutex};
|
|
|
|
#ifdef RB_DEBUG_DB_ENV
|
|
log::debug
|
|
{
|
|
"[%s] wfile:%p DIRECT fd:%d truncate to %lu bytes",
|
|
d.name,
|
|
this,
|
|
int(fd),
|
|
size
|
|
};
|
|
#endif
|
|
|
|
fs::write_opts wopts;
|
|
wopts.priority = this->prio_val;
|
|
wopts.nodelay = true;
|
|
fs::truncate(fd, size, wopts);
|
|
logical_offset = size;
|
|
return Status::OK();
|
|
}
|
|
catch(const std::system_error &e)
|
|
{
|
|
log::error
|
|
{
|
|
log, "[%s] wfile:%p DIRECT fd:%d truncate to %lu bytes :%s",
|
|
d.name,
|
|
this,
|
|
int(fd),
|
|
size,
|
|
e.what()
|
|
};
|
|
|
|
return error_to_status{e};
|
|
}
|
|
catch(const std::exception &e)
|
|
{
|
|
log::critical
|
|
{
|
|
log, "[%s] wfile:%p DIRECT fd:%d truncate to %lu bytes :%s",
|
|
d.name,
|
|
this,
|
|
int(fd),
|
|
size,
|
|
e.what()
|
|
};
|
|
|
|
return error_to_status{e};
|
|
}
|
|
|
|
rocksdb::Status
|
|
ircd::db::database::env::writable_file_direct::Append(const Slice &s)
|
|
noexcept try
|
|
{
|
|
const ctx::uninterruptible::nothrow ui;
|
|
const std::lock_guard lock{mutex};
|
|
|
|
if(!aligned(logical_offset) || !aligned(data(s)))
|
|
log::dwarning
|
|
{
|
|
log, "[%s] ALIGNMENT CHECK fd:%d append:%p%s bytes:%zu%s logical_offset:%zu%s",
|
|
d.name,
|
|
int(fd),
|
|
data(s),
|
|
aligned(data(s))? "" : "#AC",
|
|
size(s),
|
|
aligned(size(s))? "" : "#AC",
|
|
logical_offset,
|
|
aligned(logical_offset)? "" : "#AC"
|
|
};
|
|
|
|
#ifdef RB_DEBUG_DB_ENV
|
|
log::debug
|
|
{
|
|
log, "[%s] wfile:%p DIRECT fd:%d append:%p%s bytes:%zu%s logical_offset:%zu%s",
|
|
d.name,
|
|
this,
|
|
int(fd),
|
|
data(s),
|
|
aligned(data(s))? "" : "#AC",
|
|
size(s),
|
|
aligned(size(s))? "" : "#AC",
|
|
logical_offset,
|
|
aligned(logical_offset)? "" : "#AC"
|
|
};
|
|
#endif
|
|
|
|
const auto logical_check
|
|
{
|
|
logical_offset
|
|
};
|
|
|
|
const_buffer buf
|
|
{
|
|
slice(s)
|
|
};
|
|
|
|
while(!empty(buf))
|
|
buf = write(buf);
|
|
|
|
assert(logical_check + size(slice(s)) == logical_offset);
|
|
return Status::OK();
|
|
}
|
|
catch(const std::system_error &e)
|
|
{
|
|
log::error
|
|
{
|
|
log, "[%s] wfile:%p DIRECT fd:%d append:%p size:%zu :%s",
|
|
d.name,
|
|
this,
|
|
int(fd),
|
|
data(s),
|
|
size(s),
|
|
e.what()
|
|
};
|
|
|
|
return error_to_status{e};
|
|
}
|
|
catch(const std::exception &e)
|
|
{
|
|
log::critical
|
|
{
|
|
log, "[%s] wfile:%p DIRECT fd:%d append:%p size:%zu :%s",
|
|
d.name,
|
|
this,
|
|
int(fd),
|
|
data(s),
|
|
size(s),
|
|
e.what()
|
|
};
|
|
|
|
return error_to_status{e};
|
|
}
|
|
|
|
rocksdb::Status
|
|
ircd::db::database::env::writable_file_direct::PositionedAppend(const Slice &s,
|
|
uint64_t offset)
|
|
noexcept
|
|
{
|
|
const ctx::uninterruptible::nothrow ui;
|
|
const std::lock_guard lock{mutex};
|
|
|
|
#ifdef RB_DEBUG_DB_ENV
|
|
log::debug
|
|
{
|
|
log, "[%s] wfile:%p DIRECT fd:%d append:%p%s bytes:%zu%s offset:%zu%s",
|
|
d.name,
|
|
this,
|
|
int(fd),
|
|
data(s),
|
|
aligned(data(s))? "" : "#AC",
|
|
size(s),
|
|
aligned(size(s))? "" : "#AC",
|
|
offset,
|
|
aligned(offset)? "" : "#AC"
|
|
};
|
|
#endif
|
|
|
|
return rocksdb::Status::NotSupported();
|
|
}
|
|
|
|
uint64_t
|
|
ircd::db::database::env::writable_file_direct::GetFileSize()
|
|
noexcept try
|
|
{
|
|
const ctx::uninterruptible::nothrow ui;
|
|
const std::lock_guard lock{mutex};
|
|
|
|
#ifdef RB_DEBUG_DB_ENV
|
|
log::debug
|
|
{
|
|
log, "[%s] wfile:%p DIRECT fd:%d get file size",
|
|
d.name,
|
|
this,
|
|
int(fd)
|
|
};
|
|
#endif
|
|
|
|
const auto &ret
|
|
{
|
|
logical_offset
|
|
};
|
|
|
|
assert(ret <= fs::size(fd));
|
|
return ret;
|
|
}
|
|
catch(const std::exception &e)
|
|
{
|
|
log::critical
|
|
{
|
|
log, "[%s] wfile:%p DIRECT fd:%d get file size :%s",
|
|
d.name,
|
|
this,
|
|
int(fd),
|
|
e.what()
|
|
};
|
|
|
|
return 0;
|
|
}
|
|
|
|
/// (Internal) Append buffer. This function is the internal entry interface
|
|
/// for appending a buffer of any size and alignment to the file. It is
|
|
/// internal because it does no locking or error handling back to rocksdb,
|
|
/// because it's expected to be called from some virtual override which does
|
|
/// those things. This function will branch off as required to other internal
|
|
/// write_* functions to properly align and rebuffer the supplied buffer
|
|
/// eventually culminating in an aligned append to the file.
|
|
///
|
|
/// Calling this function will always result in some write to the file; even
|
|
/// if temporary buffering is used to achieve alignment; even if the entire
|
|
/// supplied buffer is hopelessly unaligned: the supplied data will be written
|
|
/// out some way or another during this call. This means there is no
|
|
/// requirement to care about flushing the temporary this->buffer after this
|
|
/// call is made. Note that the temporary this->buffer has no reason to be
|
|
/// touched by anything other than this function stack.
|
|
///
|
|
/// !!! NOTE !!!
|
|
/// There is a requirement to truncate the file after this call is made before
|
|
/// closing the file. If a crash occurs after a write() which was padded out
|
|
/// to the block alignment: the file size will reflect the padding when it is
|
|
/// opened at next startup; RocksDB will not detect its terminator character
|
|
/// sequence and consider this file corrupt.
|
|
/// !!!
|
|
///
|
|
/// - any offset
|
|
/// - any data
|
|
/// - any size
|
|
ircd::const_buffer
|
|
ircd::db::database::env::writable_file_direct::write(const const_buffer &buf_)
|
|
{
|
|
const_buffer buf
|
|
{
|
|
aligned(logical_offset) && aligned(data(buf_))?
|
|
write_aligned(buf_):
|
|
!aligned(logical_offset)?
|
|
write_unaligned_off(buf_):
|
|
buf_
|
|
};
|
|
|
|
assert(aligned(logical_offset) || empty(buf));
|
|
|
|
// buf can be empty here if it was entirely dealt with by the above
|
|
// branches and there's nothing else to do here.
|
|
if(empty(buf))
|
|
return buf;
|
|
|
|
// Branch on whether the buffer's address is aligned. If so, considering
|
|
// the logical_offset is aligned here we are then finished.
|
|
if(aligned(data(buf)))
|
|
return write_aligned(buf);
|
|
|
|
return write_unaligned_buf(buf);
|
|
}
|
|
|
|
/// * aligned offset
|
|
/// * unaligned data
|
|
/// - any size
|
|
ircd::const_buffer
|
|
ircd::db::database::env::writable_file_direct::write_unaligned_buf(const const_buffer &buf)
|
|
{
|
|
assert(aligned(logical_offset));
|
|
assert(!aligned(data(buf)));
|
|
assert(!aligned(buf));
|
|
|
|
// Window on the data between the given buffer's pointer and the next
|
|
// alignment boundary.
|
|
const const_buffer under_buf
|
|
{
|
|
data(buf), std::min(remain(uintptr_t(data(buf))), size(buf))
|
|
};
|
|
|
|
// Window on the data from the alignment boundary to the end of the
|
|
// given buffer.
|
|
const const_buffer remaining_buf
|
|
{
|
|
buf + size(under_buf)
|
|
};
|
|
|
|
assert(size(under_buf) <= size(buf));
|
|
assert(size(under_buf) + size(remaining_buf) == size(buf));
|
|
assert(data(buf) + size(under_buf) == data(remaining_buf));
|
|
assert(aligned(data(remaining_buf)) || empty(remaining_buf));
|
|
|
|
mutable_buffer dst(this->buffer);
|
|
consume(dst, copy(dst, under_buf));
|
|
consume(dst, copy(dst, remaining_buf));
|
|
consume(dst, zero(dst));
|
|
assert(empty(dst));
|
|
|
|
// Flush the temporary buffer.
|
|
_write__aligned(this->buffer, logical_offset);
|
|
logical_offset += size(under_buf);
|
|
return remaining_buf;
|
|
}
|
|
|
|
/// * unaligned offset
|
|
/// - any data
|
|
/// - any size
|
|
ircd::const_buffer
|
|
ircd::db::database::env::writable_file_direct::write_unaligned_off(const const_buffer &buf)
|
|
{
|
|
assert(!aligned(logical_offset));
|
|
|
|
// Window on the amount of buf we can take to fill up remaining space in
|
|
// the temporary this->buffer
|
|
const const_buffer src
|
|
{
|
|
data(buf), std::min(size(buf), buffer_remain())
|
|
};
|
|
|
|
// Window on the remaining space in the temporary this->buffer.
|
|
const mutable_buffer dst
|
|
{
|
|
this->buffer + buffer_consumed()
|
|
};
|
|
|
|
// Window on the remaining space in dst after src is copied to dst, if any.
|
|
const mutable_buffer pad
|
|
{
|
|
dst + size(src)
|
|
};
|
|
|
|
assert(size(dst) - size(pad) == size(src));
|
|
assert(size(src) + size(pad) == buffer_remain());
|
|
assert(size(src) + size(pad) + buffer_consumed() == alignment);
|
|
assert(size(src) + buffer_consumed() != alignment || empty(pad));
|
|
|
|
copy(dst, src);
|
|
zero(pad);
|
|
|
|
// Backtrack the logical_offset to the aligned offset where this->buffer's
|
|
// data starts.
|
|
const auto aligned_offset
|
|
{
|
|
align(logical_offset)
|
|
};
|
|
|
|
// Write the whole temporary this->buffer at the aligned offset.
|
|
_write__aligned(this->buffer, aligned_offset);
|
|
logical_offset += size(src);
|
|
assert(aligned(logical_offset) || size(buf) < alignment);
|
|
return const_buffer
|
|
{
|
|
buf + size(src)
|
|
};
|
|
}
|
|
|
|
/// * aligned offset
|
|
/// * aligned data
|
|
/// - any size
|
|
ircd::const_buffer
|
|
ircd::db::database::env::writable_file_direct::write_aligned(const const_buffer &buf)
|
|
{
|
|
assert(aligned(data(buf)));
|
|
assert(aligned(logical_offset));
|
|
|
|
// This portion at the end of buf did not fill out to the alignment.
|
|
const const_buffer overflow
|
|
{
|
|
_write_aligned(buf, logical_offset)
|
|
};
|
|
|
|
// The aligned portion was written so the offset is incremented here.
|
|
logical_offset += size(buf) - size(overflow);
|
|
|
|
assert(aligned(logical_offset));
|
|
assert(size(overflow) < alignment);
|
|
assert(aligned(data(overflow)) || empty(overflow));
|
|
assert(align(size(buf)) + size(overflow) == size(buf));
|
|
assert(blocks(size(buf)) * alignment + size(overflow) == size(buf));
|
|
|
|
if(!empty(overflow))
|
|
{
|
|
mutable_buffer dst(this->buffer);
|
|
consume(dst, copy(dst, overflow));
|
|
consume(dst, zero(dst));
|
|
assert(empty(dst));
|
|
|
|
_write__aligned(this->buffer, logical_offset);
|
|
logical_offset += size(overflow);
|
|
assert(!aligned(logical_offset));
|
|
}
|
|
|
|
return {};
|
|
}
|
|
|
|
/// * aligned data
|
|
/// * aligned offset
|
|
/// - any size
|
|
ircd::const_buffer
|
|
ircd::db::database::env::writable_file_direct::_write_aligned(const const_buffer &buf,
|
|
const uint64_t &offset)
|
|
{
|
|
assert(aligned(data(buf)));
|
|
assert(aligned(offset));
|
|
|
|
// This portion will be written
|
|
const const_buffer aligned_buf
|
|
{
|
|
data(buf), blocks(size(buf)) * alignment
|
|
};
|
|
|
|
// This trailing portion will be returned to caller
|
|
const const_buffer ret
|
|
{
|
|
data(buf) + size(aligned_buf), size(buf) - size(aligned_buf)
|
|
};
|
|
|
|
assert(!empty(aligned_buf) || size(buf) < alignment);
|
|
assert(size(aligned_buf) + size(ret) == size(buf));
|
|
assert(size(ret) < alignment);
|
|
if(empty(aligned_buf))
|
|
{
|
|
assert(size(ret) == size(buf));
|
|
return ret;
|
|
}
|
|
|
|
_write__aligned(aligned_buf, offset);
|
|
return ret;
|
|
}
|
|
|
|
/// * aligned offset
|
|
/// * aligned data
|
|
/// * aligned size
|
|
ircd::const_buffer
|
|
ircd::db::database::env::writable_file_direct::_write__aligned(const const_buffer &buf,
|
|
const uint64_t &offset)
|
|
{
|
|
#ifdef RB_DEBUG_DB_ENV
|
|
log::debug
|
|
{
|
|
log, "[%s] wfile:%p DIRECT fd:%d write:%p%s bytes:%zu%s offset:%zu%s (logical:%zu)",
|
|
d.name,
|
|
this,
|
|
int(fd),
|
|
data(buf),
|
|
aligned(data(buf))? "" : "#AC",
|
|
size(buf),
|
|
aligned(size(buf))? "" : "#AC",
|
|
offset,
|
|
aligned(offset)? "" : "#AC",
|
|
logical_offset
|
|
};
|
|
#endif
|
|
|
|
assert(aligned(buf));
|
|
assert(aligned(offset));
|
|
|
|
fs::write_opts wopts;
|
|
wopts.priority = this->prio_val;
|
|
wopts.nodelay = this->nodelay;
|
|
wopts.offset = offset;
|
|
fs::write(fd, buf, wopts);
|
|
return {};
|
|
}
|
|
|
|
size_t
|
|
ircd::db::database::env::writable_file_direct::buffer_consumed()
|
|
const
|
|
{
|
|
return likely(alignment != 0)?
|
|
logical_offset % alignment:
|
|
0UL;
|
|
}
|
|
|
|
size_t
|
|
ircd::db::database::env::writable_file_direct::buffer_remain()
|
|
const
|
|
{
|
|
return remain(logical_offset);
|
|
}
|
|
|
|
size_t
|
|
ircd::db::database::env::writable_file_direct::blocks(const size_t &value)
|
|
const
|
|
{
|
|
return likely(alignment != 0)?
|
|
value / alignment:
|
|
0UL;
|
|
}
|
|
|
|
size_t
|
|
ircd::db::database::env::writable_file_direct::remain(const size_t &value)
|
|
const
|
|
{
|
|
return likely(alignment != 0)?
|
|
alignment - (value - align(value)):
|
|
0UL;
|
|
}
|
|
|
|
size_t
|
|
ircd::db::database::env::writable_file_direct::align(const size_t &value)
|
|
const
|
|
{
|
|
return likely(alignment != 0)?
|
|
value - (value % alignment):
|
|
value;
|
|
}
|
|
|
|
bool
|
|
ircd::db::database::env::writable_file_direct::aligned(const const_buffer &buf)
|
|
const
|
|
{
|
|
return buffer::aligned(buf, alignment);
|
|
}
|
|
|
|
bool
|
|
ircd::db::database::env::writable_file_direct::aligned(const void *const &value)
|
|
const
|
|
{
|
|
return aligned(size_t(value));
|
|
}
|
|
|
|
bool
|
|
ircd::db::database::env::writable_file_direct::aligned(const size_t &value)
|
|
const
|
|
{
|
|
return (alignment == 0) || (value % alignment == 0UL);
|
|
}
|
|
|
|
//
|
|
// sequential_file
|
|
//
|
|
|
|
decltype(ircd::db::database::env::sequential_file::default_opts)
|
|
ircd::db::database::env::sequential_file::default_opts{[]
|
|
{
|
|
ircd::fs::fd::opts ret{std::ios_base::in};
|
|
return ret;
|
|
}()};
|
|
|
|
ircd::db::database::env::sequential_file::sequential_file(database *const &d,
|
|
const std::string &name,
|
|
const EnvOptions &env_opts)
|
|
try
|
|
:d
|
|
{
|
|
*d
|
|
}
|
|
,opts{[&env_opts]
|
|
{
|
|
fs::fd::opts ret{default_opts};
|
|
ret.direct = env_opts.use_direct_reads;
|
|
return ret;
|
|
}()}
|
|
,fd
|
|
{
|
|
name, this->opts
|
|
}
|
|
,_buffer_align
|
|
{
|
|
opts.direct?
|
|
fs::block_size(fd):
|
|
1
|
|
}
|
|
,offset
|
|
{
|
|
0
|
|
}
|
|
,aio
|
|
{
|
|
// When this flag is false then AIO operations are never used for this
|
|
// file; if true, AIO may be used if available and/or other conditions.
|
|
// Currently the /proc filesystem doesn't like AIO.
|
|
!startswith(name, "/proc/")
|
|
}
|
|
{
|
|
#ifdef RB_DEBUG_DB_ENV
|
|
log::debug
|
|
{
|
|
log, "[%s] opened seqfile:%p fd:%d bs:%zu '%s'",
|
|
d->name,
|
|
this,
|
|
int(fd),
|
|
_buffer_align,
|
|
name
|
|
};
|
|
#endif
|
|
|
|
#if defined(HAVE_POSIX_FADVISE) && defined(POSIX_FADV_SEQUENTIAL)
|
|
if(!opts.direct)
|
|
syscall(::posix_fadvise, fd, offset, off_t(0), POSIX_FADV_SEQUENTIAL);
|
|
#endif
|
|
}
|
|
catch(const std::system_error &e)
|
|
{
|
|
// Set the level to downplay some errors which the user shouldn't
|
|
// be alerted to with a log message under normal operations.
|
|
const log::level level
|
|
{
|
|
is(e.code(), std::errc::no_such_file_or_directory)?
|
|
log::level::DERROR:
|
|
log::level::ERROR
|
|
};
|
|
|
|
log::logf
|
|
{
|
|
log, level, "[%s] opening seqfile:%p `%s' (%d) :%s",
|
|
d->name,
|
|
this,
|
|
name,
|
|
e.code().value(),
|
|
e.what()
|
|
};
|
|
}
|
|
catch(const std::exception &e)
|
|
{
|
|
log::error
|
|
{
|
|
log, "[%s] opening seqfile:%p `%s' :%s",
|
|
d->name,
|
|
this,
|
|
name,
|
|
e.what()
|
|
};
|
|
}
|
|
|
|
ircd::db::database::env::sequential_file::~sequential_file()
|
|
noexcept
|
|
{
|
|
#ifdef RB_DEBUG_DB_ENV
|
|
log::debug
|
|
{
|
|
log, "[%s] close seqfile:%p fd:%d",
|
|
d.name,
|
|
this,
|
|
int(fd)
|
|
};
|
|
#endif
|
|
}
|
|
|
|
rocksdb::Status
|
|
ircd::db::database::env::sequential_file::Read(size_t length,
|
|
Slice *const result,
|
|
char *const scratch)
|
|
noexcept try
|
|
{
|
|
const ctx::uninterruptible::nothrow ui;
|
|
const std::unique_lock lock
|
|
{
|
|
mutex, std::try_to_lock
|
|
};
|
|
|
|
// RocksDB sez that this call requires "External synchronization" i.e the
|
|
// caller, not this class is responsible for exclusion. We assert anyway.
|
|
if(unlikely(!bool(lock)))
|
|
throw panic
|
|
{
|
|
"[%s] Unexpected concurrent access to seqfile %p",
|
|
d.name,
|
|
this
|
|
};
|
|
|
|
assert(result);
|
|
assert(scratch);
|
|
#ifdef RB_DEBUG_DB_ENV
|
|
log::debug
|
|
{
|
|
log, "[%s] seqfile:%p read:%p offset:%zu length:%zu scratch:%p",
|
|
d.name,
|
|
this,
|
|
result,
|
|
offset,
|
|
length,
|
|
scratch
|
|
};
|
|
#endif
|
|
|
|
fs::read_opts opts;
|
|
opts.offset = offset;
|
|
opts.aio = this->aio;
|
|
opts.all = false;
|
|
const mutable_buffer buf
|
|
{
|
|
scratch, length
|
|
};
|
|
|
|
const const_buffer read
|
|
{
|
|
fs::read(fd, buf, opts)
|
|
};
|
|
|
|
*result = slice(read);
|
|
this->offset += size(read);
|
|
return Status::OK();
|
|
}
|
|
catch(const std::system_error &e)
|
|
{
|
|
log::error
|
|
{
|
|
log, "[%s] seqfile:%p read:%p offset:%zu length:%zu scratch:%p :%s",
|
|
d.name,
|
|
this,
|
|
result,
|
|
offset,
|
|
length,
|
|
scratch,
|
|
e.what()
|
|
};
|
|
|
|
return error_to_status{e};
|
|
}
|
|
catch(const std::exception &e)
|
|
{
|
|
log::critical
|
|
{
|
|
log, "[%s] seqfile:%p read:%p offset:%zu length:%zu scratch:%p :%s",
|
|
d.name,
|
|
this,
|
|
result,
|
|
offset,
|
|
length,
|
|
scratch,
|
|
e.what()
|
|
};
|
|
|
|
return error_to_status{e};
|
|
}
|
|
|
|
rocksdb::Status
|
|
ircd::db::database::env::sequential_file::PositionedRead(uint64_t offset,
|
|
size_t length,
|
|
Slice *const result,
|
|
char *const scratch)
|
|
noexcept try
|
|
{
|
|
const ctx::uninterruptible::nothrow ui;
|
|
const std::unique_lock lock
|
|
{
|
|
mutex, std::try_to_lock
|
|
};
|
|
|
|
if(unlikely(!bool(lock)))
|
|
throw panic
|
|
{
|
|
"[%s] Unexpected concurrent access to seqfile %p",
|
|
d.name,
|
|
this
|
|
};
|
|
|
|
assert(result);
|
|
assert(scratch);
|
|
#ifdef RB_DEBUG_DB_ENV
|
|
log::debug
|
|
{
|
|
log, "[%s] seqfile:%p offset:%zu positioned read:%p offset:%zu length:%zu scratch:%p",
|
|
d.name,
|
|
this,
|
|
this->offset,
|
|
result,
|
|
offset,
|
|
length,
|
|
scratch
|
|
};
|
|
#endif
|
|
|
|
fs::read_opts opts;
|
|
opts.offset = offset;
|
|
opts.aio = this->aio;
|
|
opts.all = false;
|
|
const mutable_buffer buf
|
|
{
|
|
scratch, length
|
|
};
|
|
|
|
const const_buffer read
|
|
{
|
|
fs::read(fd, buf, opts)
|
|
};
|
|
|
|
*result = slice(read);
|
|
this->offset = std::max(this->offset, off_t(offset + size(read)));
|
|
return Status::OK();
|
|
}
|
|
catch(const std::system_error &e)
|
|
{
|
|
log::error
|
|
{
|
|
log, "[%s] seqfile:%p positioned read:%p offset:%zu length:%zu scratch:%p :%s",
|
|
d.name,
|
|
this,
|
|
result,
|
|
offset,
|
|
length,
|
|
scratch,
|
|
e.what()
|
|
};
|
|
|
|
return error_to_status{e};
|
|
}
|
|
catch(const std::exception &e)
|
|
{
|
|
log::critical
|
|
{
|
|
log, "[%s] seqfile:%p positioned read:%p offset:%zu length:%zu scratch:%p :%s",
|
|
d.name,
|
|
this,
|
|
result,
|
|
offset,
|
|
length,
|
|
scratch,
|
|
e.what()
|
|
};
|
|
|
|
return error_to_status{e};
|
|
}
|
|
|
|
rocksdb::Status
|
|
ircd::db::database::env::sequential_file::Skip(uint64_t size)
|
|
noexcept
|
|
{
|
|
const ctx::uninterruptible::nothrow ui;
|
|
const std::unique_lock lock
|
|
{
|
|
mutex, std::try_to_lock
|
|
};
|
|
|
|
// RocksDB sez that this call requires "External synchronization" i.e the
|
|
// caller, not this class is responsible for exclusion. We assert anyway.
|
|
if(unlikely(!bool(lock)))
|
|
throw panic
|
|
{
|
|
"[%s] Unexpected concurrent access to seqfile %p",
|
|
d.name,
|
|
this
|
|
};
|
|
|
|
#ifdef RB_DEBUG_DB_ENV
|
|
log::debug
|
|
{
|
|
"[%s] seqfile:%p offset:zu skip:%zu",
|
|
d.name,
|
|
this,
|
|
offset,
|
|
size
|
|
};
|
|
#endif
|
|
|
|
offset += size;
|
|
return Status::OK();
|
|
}
|
|
|
|
rocksdb::Status
|
|
ircd::db::database::env::sequential_file::InvalidateCache(size_t offset,
|
|
size_t length)
|
|
noexcept try
|
|
{
|
|
const ctx::uninterruptible::nothrow ui;
|
|
|
|
#ifdef RB_DEBUG_DB_ENV
|
|
log::debug
|
|
{
|
|
"[%s] seqfile:%p invalidate cache offset:%zu length:%zu",
|
|
d.name,
|
|
this,
|
|
offset,
|
|
length
|
|
};
|
|
#endif
|
|
|
|
if(opts.direct)
|
|
return Status::OK();
|
|
|
|
fs::evict(fd, length, offset);
|
|
return Status::OK();
|
|
}
|
|
catch(const std::system_error &e)
|
|
{
|
|
log::error
|
|
{
|
|
"[%s] seqfile:%p invalidate cache offset:%zu length:%zu :%s",
|
|
d.name,
|
|
this,
|
|
offset,
|
|
length,
|
|
e.what()
|
|
};
|
|
|
|
return error_to_status{e};
|
|
}
|
|
catch(const std::exception &e)
|
|
{
|
|
log::critical
|
|
{
|
|
"[%s] seqfile:%p invalidate cache offset:%zu length:%zu :%s",
|
|
d.name,
|
|
this,
|
|
offset,
|
|
length,
|
|
e.what()
|
|
};
|
|
|
|
return error_to_status{e};
|
|
}
|
|
|
|
bool
|
|
ircd::db::database::env::sequential_file::use_direct_io()
|
|
const noexcept
|
|
{
|
|
return opts.direct;
|
|
}
|
|
|
|
size_t
|
|
ircd::db::database::env::sequential_file::GetRequiredBufferAlignment()
|
|
const noexcept
|
|
{
|
|
const auto &ret
|
|
{
|
|
_buffer_align
|
|
};
|
|
|
|
return ret;
|
|
}
|
|
|
|
//
|
|
// random_access_file
|
|
//
|
|
|
|
decltype(ircd::db::database::env::random_access_file::default_opts)
|
|
ircd::db::database::env::random_access_file::default_opts{[]
|
|
{
|
|
ircd::fs::fd::opts ret{std::ios_base::in};
|
|
return ret;
|
|
}()};
|
|
|
|
ircd::db::database::env::random_access_file::random_access_file(database *const &d,
|
|
const std::string &name,
|
|
const EnvOptions &env_opts)
|
|
try
|
|
:d
|
|
{
|
|
*d
|
|
}
|
|
,opts{[&env_opts]
|
|
{
|
|
fs::fd::opts ret{default_opts};
|
|
ret.direct = env_opts.use_direct_reads;
|
|
return ret;
|
|
}()}
|
|
,fd
|
|
{
|
|
name, this->opts
|
|
}
|
|
,_buffer_align
|
|
{
|
|
opts.direct?
|
|
fs::block_size(fd):
|
|
1
|
|
}
|
|
,aio
|
|
{
|
|
// When this flag is false then AIO operations are never used for this
|
|
// file; if true, AIO may be used if available and/or other conditions.
|
|
// Currently the /proc filesystem doesn't like AIO.
|
|
!startswith(name, "/proc/")
|
|
}
|
|
{
|
|
#ifdef RB_DEBUG_DB_ENV
|
|
log::debug
|
|
{
|
|
log, "[%s] opened rfile:%p fd:%d bs:%zu '%s'",
|
|
d->name,
|
|
this,
|
|
int(fd),
|
|
_buffer_align,
|
|
name
|
|
};
|
|
#endif
|
|
}
|
|
catch(const std::exception &e)
|
|
{
|
|
log::error
|
|
{
|
|
log, "[%s] opening rfile:%p `%s' :%s",
|
|
d->name,
|
|
this,
|
|
name,
|
|
e.what()
|
|
};
|
|
}
|
|
|
|
ircd::db::database::env::random_access_file::~random_access_file()
|
|
noexcept
|
|
{
|
|
#ifdef RB_DEBUG_DB_ENV
|
|
log::debug
|
|
{
|
|
log, "[%s] close rfile:%p fd:%d",
|
|
d.name,
|
|
this,
|
|
int(fd)
|
|
};
|
|
#endif
|
|
}
|
|
|
|
rocksdb::Status
|
|
ircd::db::database::env::random_access_file::Prefetch(uint64_t offset,
|
|
size_t length)
|
|
noexcept try
|
|
{
|
|
const ctx::uninterruptible::nothrow ui;
|
|
|
|
#ifdef RB_DEBUG_DB_ENV
|
|
log::debug
|
|
{
|
|
log, "[%s] rfile:%p prefetch offset:%zu length:%zu",
|
|
d.name,
|
|
this,
|
|
offset,
|
|
length
|
|
};
|
|
#endif
|
|
|
|
// Note RocksDB does not call our prefetch() when using direct IO.
|
|
assert(!this->opts.direct);
|
|
|
|
fs::prefetch(fd, length, offset);
|
|
return Status::OK();
|
|
}
|
|
catch(const std::system_error &e)
|
|
{
|
|
return error_to_status{e};
|
|
}
|
|
catch(const std::exception &e)
|
|
{
|
|
log::critical
|
|
{
|
|
log, "[%s] rfile:%p prefetch offset:%zu length:%zu :%s",
|
|
d.name,
|
|
this,
|
|
offset,
|
|
length,
|
|
e.what()
|
|
};
|
|
|
|
return error_to_status{e};
|
|
}
|
|
|
|
rocksdb::Status
|
|
ircd::db::database::env::random_access_file::Read(uint64_t offset,
|
|
size_t length,
|
|
Slice *const result,
|
|
char *const scratch)
|
|
const noexcept try
|
|
{
|
|
const ctx::uninterruptible::nothrow ui;
|
|
|
|
assert(result);
|
|
assert(scratch);
|
|
#ifdef RB_DEBUG_DB_ENV
|
|
log::debug
|
|
{
|
|
log, "[%s] rfile:%p read:%p offset:%zu length:%zu scratch:%p",
|
|
d.name,
|
|
this,
|
|
result,
|
|
offset,
|
|
length,
|
|
scratch
|
|
};
|
|
#endif
|
|
|
|
fs::read_opts opts;
|
|
opts.offset = offset;
|
|
opts.aio = this->aio;
|
|
opts.all = !this->opts.direct;
|
|
const mutable_buffer buf
|
|
{
|
|
scratch, length
|
|
};
|
|
|
|
assert(!this->opts.direct || buffer::aligned(buf, _buffer_align));
|
|
const auto read
|
|
{
|
|
fs::read(fd, buf, opts)
|
|
};
|
|
|
|
*result = slice(read);
|
|
return Status::OK();
|
|
}
|
|
catch(const std::system_error &e)
|
|
{
|
|
log::error
|
|
{
|
|
log, "[%s] rfile:%p read:%p offset:%zu length:%zu scratch:%p :%s",
|
|
d.name,
|
|
this,
|
|
result,
|
|
offset,
|
|
length,
|
|
scratch,
|
|
e.what()
|
|
};
|
|
|
|
return error_to_status{e};
|
|
}
|
|
catch(const std::exception &e)
|
|
{
|
|
log::critical
|
|
{
|
|
log, "[%s] rfile:%p read:%p offset:%zu length:%zu scratch:%p :%s",
|
|
d.name,
|
|
this,
|
|
result,
|
|
offset,
|
|
length,
|
|
scratch,
|
|
e.what()
|
|
};
|
|
|
|
return error_to_status{e};
|
|
}
|
|
|
|
rocksdb::Status
|
|
ircd::db::database::env::random_access_file::InvalidateCache(size_t offset,
|
|
size_t length)
|
|
noexcept
|
|
{
|
|
const ctx::uninterruptible::nothrow ui;
|
|
|
|
#ifdef RB_DEBUG_DB_ENV
|
|
log::debug
|
|
{
|
|
log, "[%s] rfile:%p invalidate cache offset:%zu length:%zu",
|
|
d.name,
|
|
this,
|
|
offset,
|
|
length
|
|
};
|
|
#endif
|
|
|
|
if(opts.direct)
|
|
return Status::OK();
|
|
|
|
fs::evict(fd, length, offset);
|
|
return Status::OK();
|
|
}
|
|
|
|
size_t
|
|
ircd::db::database::env::random_access_file::GetUniqueId(char *const id,
|
|
size_t max_size)
|
|
const noexcept try
|
|
{
|
|
const ctx::uninterruptible::nothrow ui;
|
|
|
|
#ifdef RB_DEBUG_DB_ENV
|
|
log::debug
|
|
{
|
|
log, "[%s] rfile:%p get unique id:%p max_size:%zu",
|
|
d.name,
|
|
this,
|
|
id,
|
|
max_size
|
|
};
|
|
#endif
|
|
|
|
const mutable_buffer buf
|
|
{
|
|
id, max_size
|
|
};
|
|
|
|
//return size(fs::uuid(fd, buf));
|
|
return 0;
|
|
}
|
|
catch(const std::exception &e)
|
|
{
|
|
log::critical
|
|
{
|
|
log, "[%s] rfile:%p GetUniqueId id:%p max_size:%zu :%s",
|
|
d.name,
|
|
this,
|
|
id,
|
|
max_size,
|
|
e.what()
|
|
};
|
|
|
|
return 0;
|
|
}
|
|
|
|
void
|
|
ircd::db::database::env::random_access_file::Hint(AccessPattern pattern)
|
|
noexcept
|
|
{
|
|
const ctx::uninterruptible::nothrow ui;
|
|
|
|
#ifdef RB_DEBUG_DB_ENV
|
|
log::debug
|
|
{
|
|
log, "[%s] rfile:%p hint %s",
|
|
d.name,
|
|
this,
|
|
reflect(pattern)
|
|
};
|
|
#endif
|
|
}
|
|
|
|
bool
|
|
ircd::db::database::env::random_access_file::use_direct_io()
|
|
const noexcept
|
|
{
|
|
return opts.direct;
|
|
}
|
|
|
|
size_t
|
|
ircd::db::database::env::random_access_file::GetRequiredBufferAlignment()
|
|
const noexcept
|
|
{
|
|
const auto &ret
|
|
{
|
|
_buffer_align
|
|
};
|
|
|
|
return ret;
|
|
}
|
|
|
|
//
|
|
// random_rw_file
|
|
//
|
|
|
|
decltype(ircd::db::database::env::random_rw_file::default_opts)
|
|
ircd::db::database::env::random_rw_file::default_opts{[]
|
|
{
|
|
ircd::fs::fd::opts ret
|
|
{
|
|
std::ios_base::in | std::ios_base::out
|
|
};
|
|
|
|
return ret;
|
|
}()};
|
|
|
|
ircd::db::database::env::random_rw_file::random_rw_file(database *const &d,
|
|
const std::string &name,
|
|
const EnvOptions &env_opts)
|
|
try
|
|
:d
|
|
{
|
|
*d
|
|
}
|
|
,opts{[&env_opts]
|
|
{
|
|
fs::fd::opts ret{default_opts};
|
|
ret.direct = env_opts.use_direct_reads && env_opts.use_direct_writes;
|
|
return ret;
|
|
}()}
|
|
,fd
|
|
{
|
|
name, this->opts
|
|
}
|
|
,_buffer_align
|
|
{
|
|
opts.direct?
|
|
fs::block_size(fd):
|
|
1
|
|
}
|
|
,aio
|
|
{
|
|
true
|
|
}
|
|
{
|
|
#ifdef RB_DEBUG_DB_ENV
|
|
log::debug
|
|
{
|
|
log, "[%s] opened rwfile:%p fd:%d bs:%zu '%s'",
|
|
d->name,
|
|
this,
|
|
int(fd),
|
|
_buffer_align,
|
|
name
|
|
};
|
|
#endif
|
|
}
|
|
catch(const std::exception &e)
|
|
{
|
|
log::error
|
|
{
|
|
log, "[%s] opening rwfile:%p `%s' :%s",
|
|
d->name,
|
|
this,
|
|
name,
|
|
e.what()
|
|
};
|
|
}
|
|
|
|
ircd::db::database::env::random_rw_file::~random_rw_file()
|
|
noexcept
|
|
{
|
|
#ifdef RB_DEBUG_DB_ENV
|
|
log::debug
|
|
{
|
|
log, "[%s] close rwfile:%p fd:%d '%s'",
|
|
d.name,
|
|
this,
|
|
int(fd)
|
|
};
|
|
#endif
|
|
}
|
|
|
|
rocksdb::Status
|
|
ircd::db::database::env::random_rw_file::Close()
|
|
noexcept try
|
|
{
|
|
const ctx::uninterruptible::nothrow ui;
|
|
|
|
#ifdef RB_DEBUG_DB_ENV
|
|
log::debug
|
|
{
|
|
log, "[%s] close rwfile:%p fd:%d '%s'",
|
|
d.name,
|
|
this,
|
|
int(fd)
|
|
};
|
|
#endif
|
|
|
|
this->fd = fs::fd{};
|
|
return Status::OK();
|
|
}
|
|
catch(const std::system_error &e)
|
|
{
|
|
log::error
|
|
{
|
|
"[%s] rwfile:%p close :%s",
|
|
d.name,
|
|
this,
|
|
e.what()
|
|
};
|
|
|
|
return error_to_status{e};
|
|
}
|
|
catch(const std::exception &e)
|
|
{
|
|
log::critical
|
|
{
|
|
"[%s] rwfile:%p close :%s",
|
|
d.name,
|
|
this,
|
|
e.what()
|
|
};
|
|
|
|
return error_to_status{e};
|
|
}
|
|
|
|
rocksdb::Status
|
|
ircd::db::database::env::random_rw_file::Fsync()
|
|
noexcept try
|
|
{
|
|
const ctx::uninterruptible::nothrow ui;
|
|
|
|
#ifdef RB_DEBUG_DB_ENV
|
|
log::debug
|
|
{
|
|
log, "[%s] rwfile:%p fd:%d fsync",
|
|
d.name,
|
|
int(fd),
|
|
this
|
|
};
|
|
#endif
|
|
|
|
fs::sync_opts opts;
|
|
fs::flush(fd, opts);
|
|
return Status::OK();
|
|
}
|
|
catch(const std::system_error &e)
|
|
{
|
|
log::error
|
|
{
|
|
"[%s] rwfile:%p fd:%d fsync :%s",
|
|
d.name,
|
|
this,
|
|
int(fd),
|
|
e.what()
|
|
};
|
|
|
|
return error_to_status{e};
|
|
}
|
|
catch(const std::exception &e)
|
|
{
|
|
log::critical
|
|
{
|
|
"[%s] rwfile:%p fd:%d fsync :%s",
|
|
d.name,
|
|
this,
|
|
int(fd),
|
|
e.what()
|
|
};
|
|
|
|
return error_to_status{e};
|
|
}
|
|
|
|
rocksdb::Status
|
|
ircd::db::database::env::random_rw_file::Sync()
|
|
noexcept try
|
|
{
|
|
const ctx::uninterruptible::nothrow ui;
|
|
|
|
#ifdef RB_DEBUG_DB_ENV
|
|
log::debug
|
|
{
|
|
log, "[%s] rwfile:%p fd:%d sync",
|
|
d.name,
|
|
int(fd),
|
|
this
|
|
};
|
|
#endif
|
|
|
|
fs::sync_opts opts;
|
|
fs::sync(fd, opts);
|
|
return Status::OK();
|
|
}
|
|
catch(const std::system_error &e)
|
|
{
|
|
log::error
|
|
{
|
|
"[%s] rwfile:%p fd:%d sync :%s",
|
|
d.name,
|
|
this,
|
|
int(fd),
|
|
e.what()
|
|
};
|
|
|
|
return error_to_status{e};
|
|
}
|
|
catch(const std::exception &e)
|
|
{
|
|
log::critical
|
|
{
|
|
"[%s] rwfile:%p fd:%d sync :%s",
|
|
d.name,
|
|
this,
|
|
int(fd),
|
|
e.what()
|
|
};
|
|
|
|
return error_to_status{e};
|
|
}
|
|
|
|
rocksdb::Status
|
|
ircd::db::database::env::random_rw_file::Flush()
|
|
noexcept try
|
|
{
|
|
const ctx::uninterruptible::nothrow ui;
|
|
|
|
#ifdef RB_DEBUG_DB_ENV
|
|
log::debug
|
|
{
|
|
log, "[%s] rwfile:%p fd:%d flush",
|
|
d.name,
|
|
int(fd),
|
|
this
|
|
};
|
|
#endif
|
|
|
|
fs::sync_opts opts;
|
|
opts.metadata = false;
|
|
fs::flush(fd, opts);
|
|
return Status::OK();
|
|
}
|
|
catch(const std::system_error &e)
|
|
{
|
|
log::error
|
|
{
|
|
"[%s] rwfile:%p fd:%d flush :%s",
|
|
d.name,
|
|
this,
|
|
int(fd),
|
|
e.what()
|
|
};
|
|
|
|
return error_to_status{e};
|
|
}
|
|
catch(const std::exception &e)
|
|
{
|
|
log::critical
|
|
{
|
|
"[%s] rwfile:%p fd:%d flush :%s",
|
|
d.name,
|
|
this,
|
|
int(fd),
|
|
e.what()
|
|
};
|
|
|
|
return error_to_status{e};
|
|
}
|
|
|
|
rocksdb::Status
|
|
ircd::db::database::env::random_rw_file::Read(uint64_t offset,
|
|
size_t length,
|
|
Slice *const result,
|
|
char *const scratch)
|
|
const noexcept try
|
|
{
|
|
const ctx::uninterruptible::nothrow ui;
|
|
|
|
assert(result);
|
|
assert(scratch);
|
|
#ifdef RB_DEBUG_DB_ENV
|
|
log::debug
|
|
{
|
|
log, "[%s] rwfile:%p read:%p offset:%zu length:%zu scratch:%p",
|
|
d.name,
|
|
this,
|
|
result,
|
|
offset,
|
|
length,
|
|
scratch
|
|
};
|
|
#endif
|
|
|
|
fs::read_opts opts;
|
|
opts.offset = offset;
|
|
opts.aio = this->aio;
|
|
opts.all = !this->opts.direct;
|
|
const mutable_buffer buf
|
|
{
|
|
scratch, length
|
|
};
|
|
|
|
const auto read
|
|
{
|
|
fs::read(fd, buf, opts)
|
|
};
|
|
|
|
*result = slice(read);
|
|
return Status::OK();
|
|
}
|
|
catch(const std::system_error &e)
|
|
{
|
|
log::error
|
|
{
|
|
log, "[%s] rwfile:%p read:%p offset:%zu length:%zu scratch:%p :%s",
|
|
d.name,
|
|
this,
|
|
result,
|
|
offset,
|
|
length,
|
|
scratch,
|
|
e.what()
|
|
};
|
|
|
|
return error_to_status{e};
|
|
}
|
|
catch(const std::exception &e)
|
|
{
|
|
log::critical
|
|
{
|
|
log, "[%s] rwfile:%p read:%p offset:%zu length:%zu scratch:%p :%s",
|
|
d.name,
|
|
this,
|
|
result,
|
|
offset,
|
|
length,
|
|
scratch,
|
|
e.what()
|
|
};
|
|
|
|
return error_to_status{e};
|
|
}
|
|
|
|
rocksdb::Status
|
|
ircd::db::database::env::random_rw_file::Write(uint64_t offset,
|
|
const Slice &slice)
|
|
noexcept try
|
|
{
|
|
const ctx::uninterruptible::nothrow ui;
|
|
|
|
#ifdef RB_DEBUG_DB_ENV
|
|
log::debug
|
|
{
|
|
log, "[%s] rwfile:%p fd:%d write:%p length:%zu offset:%zu",
|
|
d.name,
|
|
this,
|
|
int(fd),
|
|
data(slice),
|
|
size(slice),
|
|
offset
|
|
};
|
|
#endif
|
|
|
|
const const_buffer buf
|
|
{
|
|
data(slice), size(slice)
|
|
};
|
|
|
|
const auto read
|
|
{
|
|
fs::write(fd, buf, offset)
|
|
};
|
|
|
|
return Status::OK();
|
|
}
|
|
catch(const std::system_error &e)
|
|
{
|
|
log::error
|
|
{
|
|
log, "[%s] rwfile:%p fd:%d write:%p length:%zu offset:%zu",
|
|
d.name,
|
|
this,
|
|
int(fd),
|
|
data(slice),
|
|
size(slice),
|
|
offset
|
|
};
|
|
|
|
return error_to_status{e};
|
|
}
|
|
catch(const std::exception &e)
|
|
{
|
|
log::critical
|
|
{
|
|
log, "[%s] rwfile:%p fd:%d write:%p length:%zu offset:%zu",
|
|
d.name,
|
|
this,
|
|
int(fd),
|
|
data(slice),
|
|
size(slice),
|
|
offset
|
|
};
|
|
|
|
return error_to_status{e};
|
|
}
|
|
|
|
bool
|
|
ircd::db::database::env::random_rw_file::use_direct_io()
|
|
const noexcept
|
|
{
|
|
return opts.direct;
|
|
}
|
|
|
|
size_t
|
|
ircd::db::database::env::random_rw_file::GetRequiredBufferAlignment()
|
|
const noexcept
|
|
{
|
|
const auto &ret
|
|
{
|
|
_buffer_align
|
|
};
|
|
|
|
return ret;
|
|
}
|
|
|
|
//
|
|
// directory
|
|
//
|
|
|
|
ircd::db::database::env::directory::directory(database *const &d,
|
|
const std::string &name,
|
|
std::unique_ptr<Directory> defaults)
|
|
:d{*d}
|
|
,defaults{std::move(defaults)}
|
|
{
|
|
}
|
|
|
|
ircd::db::database::env::directory::~directory()
|
|
noexcept
|
|
{
|
|
}
|
|
|
|
rocksdb::Status
|
|
ircd::db::database::env::directory::Fsync()
|
|
noexcept
|
|
{
|
|
const ctx::uninterruptible::nothrow ui;
|
|
|
|
#ifdef RB_DEBUG_DB_ENV
|
|
log::debug
|
|
{
|
|
log, "[%s] directory:%p fsync",
|
|
d.name,
|
|
this
|
|
};
|
|
#endif
|
|
|
|
return defaults->Fsync();
|
|
}
|
|
|
|
//
|
|
// file_lock
|
|
//
|
|
|
|
ircd::db::database::env::file_lock::file_lock(database *const &d)
|
|
:d{*d}
|
|
{
|
|
}
|
|
|
|
ircd::db::database::env::file_lock::~file_lock()
|
|
noexcept
|
|
{
|
|
}
|
|
|
|
///////////////////////////////////////////////////////////////////////////////
|
|
//
|
|
// db/database/env/state.h
|
|
//
|
|
|
|
//
|
|
// env::state::state
|
|
//
|
|
|
|
ircd::db::database::env::state::state(database *const &d)
|
|
:d{*d}
|
|
{
|
|
for(size_t i(0); i < pool.size(); ++i)
|
|
pool.at(i) = std::make_unique<struct pool>(this->d, Priority(i));
|
|
}
|
|
|
|
ircd::db::database::env::state::~state()
|
|
noexcept
|
|
{
|
|
log::debug
|
|
{
|
|
log, "[%s] Shutting down environment...",
|
|
d.name
|
|
};
|
|
}
|
|
|
|
//
|
|
// state::pool
|
|
//
|
|
|
|
decltype(ircd::db::database::env::state::pool::stack_size)
|
|
ircd::db::database::env::state::pool::stack_size
|
|
{
|
|
{ "name", "ircd.db.env.pool.stack_size" },
|
|
{ "default", long(128_KiB) },
|
|
};
|
|
|
|
//
|
|
// state::pool::pool
|
|
//
|
|
|
|
ircd::db::database::env::state::pool::pool(database &d,
|
|
const Priority &pri)
|
|
:d{d}
|
|
,pri{pri}
|
|
,iopri
|
|
{
|
|
pri == Priority::HIGH?
|
|
IOPriority::IO_HIGH:
|
|
IOPriority::IO_LOW
|
|
}
|
|
,name
|
|
{
|
|
fmt::sprintf
|
|
{
|
|
namebuf, "db.%s", reflect(pri)
|
|
}
|
|
}
|
|
,popts
|
|
{
|
|
size_t(stack_size), // stack size of worker
|
|
0, // initial workers
|
|
-1, // queue hard limit
|
|
-1, // queue soft limit
|
|
}
|
|
,p
|
|
{
|
|
this->name, // name of pool
|
|
this->popts // pool options
|
|
}
|
|
{
|
|
}
|
|
|
|
ircd::db::database::env::state::pool::~pool()
|
|
noexcept
|
|
{
|
|
join();
|
|
}
|
|
|
|
void
|
|
ircd::db::database::env::state::pool::join()
|
|
try
|
|
{
|
|
if(!tasks.empty() || p.pending())
|
|
log::warning
|
|
{
|
|
log, "[%s] Waiting for tasks:%zu queued:%zu active:%zu in pool '%s'",
|
|
d.name,
|
|
tasks.size(),
|
|
p.queued(),
|
|
p.active(),
|
|
ctx::name(p)
|
|
};
|
|
|
|
this->wait();
|
|
assert(!p.pending());
|
|
assert(tasks.empty());
|
|
p.join();
|
|
|
|
log::debug
|
|
{
|
|
log, "[%s] Terminated pool '%s'.",
|
|
d.name,
|
|
ctx::name(p)
|
|
};
|
|
}
|
|
catch(const std::exception &e)
|
|
{
|
|
log::critical
|
|
{
|
|
log, "[%s] Environment pool '%s' join :%s",
|
|
d.name,
|
|
ctx::name(p),
|
|
e.what()
|
|
};
|
|
|
|
throw;
|
|
}
|
|
|
|
void
|
|
ircd::db::database::env::state::pool::wait()
|
|
{
|
|
dock.wait([this]
|
|
{
|
|
return tasks.empty() && !p.pending();
|
|
});
|
|
}
|
|
|
|
void
|
|
ircd::db::database::env::state::pool::operator()(task &&task)
|
|
{
|
|
assert(task._id == 0);
|
|
task._id = ++taskctr;
|
|
tasks.emplace_back(std::move(task));
|
|
|
|
log::debug
|
|
{
|
|
log, "[%s] pool:%s queue:%zu QUEUE task:%lu func:%p arg:%p",
|
|
this->d.name,
|
|
ctx::name(p),
|
|
tasks.size(),
|
|
tasks.back()._id,
|
|
tasks.back().func,
|
|
tasks.back().arg,
|
|
};
|
|
|
|
p([this]
|
|
{
|
|
if(tasks.empty())
|
|
return;
|
|
|
|
const ctx::uninterruptible::nothrow ui;
|
|
const auto task{std::move(tasks.front())};
|
|
tasks.pop_front();
|
|
|
|
log::debug
|
|
{
|
|
log, "[%s] pool:%s queue:%zu ENTER task:%lu func:%p arg:%p",
|
|
this->d.name,
|
|
ctx::name(p),
|
|
tasks.size(),
|
|
task._id,
|
|
task.func,
|
|
task.arg,
|
|
};
|
|
|
|
const ctx::slice_usage_warning message
|
|
{
|
|
"[%s] pool:%s task:%p",
|
|
this->d.name,
|
|
ctx::name(p),
|
|
task.func
|
|
};
|
|
|
|
// Execute the task
|
|
task.func(task.arg);
|
|
|
|
log::debug
|
|
{
|
|
log, "[%s] pool:%s queue:%zu LEAVE task:%zu func:%p arg:%p",
|
|
this->d.name,
|
|
ctx::name(p),
|
|
tasks.size(),
|
|
task._id,
|
|
task.func,
|
|
task.arg,
|
|
};
|
|
|
|
dock.notify_all();
|
|
});
|
|
}
|
|
|
|
size_t
|
|
ircd::db::database::env::state::pool::cancel(void *const &tag)
|
|
{
|
|
size_t i(0);
|
|
auto it(begin(tasks));
|
|
while(it != end(tasks))
|
|
{
|
|
auto &task(*it);
|
|
log::debug
|
|
{
|
|
log, "[%s] pool:%s tasks:%zu cancel#%zu task:%lu func:%p cancel:%p arg:%p tag:%p",
|
|
d.name,
|
|
ctx::name(p),
|
|
tasks.size(),
|
|
i,
|
|
task._id,
|
|
task.func,
|
|
task.cancel,
|
|
task.arg,
|
|
tag
|
|
};
|
|
|
|
task.cancel(task.arg);
|
|
it = tasks.erase(it);
|
|
++i;
|
|
}
|
|
|
|
dock.notify_all();
|
|
return i;
|
|
}
|