mirror of
https://github.com/matrix-construct/construct
synced 2025-01-01 18:34:18 +01:00
ircd::db: Add preliminary custom rate limiter drop-in.
This commit is contained in:
parent
f7400edc75
commit
8bdedb569c
3 changed files with 157 additions and 0 deletions
|
@ -102,6 +102,7 @@ struct ircd::db::database
|
|||
struct wal;
|
||||
struct wal_filter;
|
||||
struct allocator;
|
||||
struct rate_limiter;
|
||||
|
||||
std::string name;
|
||||
uint64_t checkpoint;
|
||||
|
@ -114,6 +115,7 @@ struct ircd::db::database
|
|||
std::shared_ptr<struct events> events;
|
||||
std::shared_ptr<struct mergeop> mergeop;
|
||||
std::unique_ptr<struct wal_filter> wal_filter;
|
||||
std::shared_ptr<struct rate_limiter> rate_limiter;
|
||||
std::shared_ptr<struct allocator> allocator;
|
||||
std::shared_ptr<rocksdb::SstFileManager> ssts;
|
||||
std::shared_ptr<rocksdb::Cache> row_cache;
|
||||
|
|
127
ircd/db.cc
127
ircd/db.cc
|
@ -1234,6 +1234,10 @@ try
|
|||
{
|
||||
std::make_unique<struct wal_filter>(this)
|
||||
}
|
||||
,rate_limiter
|
||||
{
|
||||
std::make_unique<struct rate_limiter>(this)
|
||||
}
|
||||
,allocator
|
||||
{
|
||||
#ifdef IRCD_DB_HAS_ALLOCATOR
|
||||
|
@ -1389,6 +1393,9 @@ try
|
|||
// Setup WAL filter
|
||||
opts->wal_filter = this->wal_filter.get();
|
||||
|
||||
// Setup Rate Limiter
|
||||
opts->rate_limiter = this->rate_limiter;
|
||||
|
||||
// Setup SST file mgmt
|
||||
opts->sst_file_manager = this->ssts;
|
||||
|
||||
|
@ -3691,6 +3698,126 @@ const noexcept
|
|||
return db::name(*d).c_str();
|
||||
}
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////
|
||||
//
|
||||
// database::rate_limiter
|
||||
//
|
||||
|
||||
ircd::db::database::rate_limiter::rate_limiter(database *const &d)
|
||||
:d{d}
|
||||
{
|
||||
}
|
||||
|
||||
ircd::db::database::rate_limiter::~rate_limiter()
|
||||
noexcept
|
||||
{
|
||||
}
|
||||
|
||||
void
|
||||
ircd::db::database::rate_limiter::SetBytesPerSecond(int64_t bytes_per_second)
|
||||
noexcept
|
||||
{
|
||||
log::debug
|
||||
{
|
||||
log, "[%s] Rate Limiter update rate %zu -> %zu bytes per second",
|
||||
db::name(*d),
|
||||
this->bytes_per_second,
|
||||
bytes_per_second,
|
||||
};
|
||||
|
||||
this->bytes_per_second = bytes_per_second;
|
||||
}
|
||||
|
||||
size_t
|
||||
ircd::db::database::rate_limiter::RequestToken(size_t bytes,
|
||||
size_t alignment,
|
||||
IOPriority prio,
|
||||
Statistics *const stats,
|
||||
OpType type)
|
||||
noexcept
|
||||
{
|
||||
log::debug
|
||||
{
|
||||
log, "[%s] Rate Limiter request bytes:%zu alignment:%zu prio:%s type:%s",
|
||||
db::name(*d),
|
||||
bytes,
|
||||
alignment,
|
||||
reflect(prio),
|
||||
type == OpType::kWrite?
|
||||
"WRITE"_sv:
|
||||
type == OpType::kRead?
|
||||
"READ"_sv:
|
||||
"????"_sv,
|
||||
};
|
||||
|
||||
assert(prio <= IOPriority::IO_TOTAL);
|
||||
{
|
||||
int64_t i(prio == IOPriority::IO_TOTAL? 0: prio); do
|
||||
{
|
||||
requests[i].bytes += bytes;
|
||||
requests[i].count += 1;
|
||||
}
|
||||
while(++i < prio);
|
||||
}
|
||||
|
||||
//assert(stats);
|
||||
//stats->recordTick(rocksdb::Tickers::RATE_LIMIT_DELAY_MILLIS, 0);
|
||||
//stats->recordTick(rocksdb::Tickers::NUMBER_RATE_LIMITER_DRAINS, 0);
|
||||
//stats->recordTick(rocksdb::Tickers::HARD_RATE_LIMIT_DELAY_COUNT, 0);
|
||||
//stats->recordTick(rocksdb::Tickers::SOFT_RATE_LIMIT_DELAY_COUNT, 0);
|
||||
|
||||
return bytes;
|
||||
}
|
||||
|
||||
int64_t
|
||||
ircd::db::database::rate_limiter::GetTotalBytesThrough(const IOPriority prio)
|
||||
const noexcept
|
||||
{
|
||||
int64_t ret(0);
|
||||
int64_t i(prio == IOPriority::IO_TOTAL? 0: prio); do
|
||||
{
|
||||
ret += requests[i].bytes;
|
||||
}
|
||||
while(++i < prio);
|
||||
return ret;
|
||||
}
|
||||
|
||||
int64_t
|
||||
ircd::db::database::rate_limiter::GetTotalRequests(const IOPriority prio)
|
||||
const noexcept
|
||||
{
|
||||
int64_t ret(0);
|
||||
int64_t i(prio == IOPriority::IO_TOTAL? 0: prio); do
|
||||
{
|
||||
ret += requests[i].count;
|
||||
}
|
||||
while(++i < prio);
|
||||
return ret;
|
||||
}
|
||||
|
||||
int64_t
|
||||
ircd::db::database::rate_limiter::GetSingleBurstBytes()
|
||||
const noexcept
|
||||
{
|
||||
always_assert(false);
|
||||
return bytes_per_second;
|
||||
}
|
||||
|
||||
int64_t
|
||||
ircd::db::database::rate_limiter::GetBytesPerSecond()
|
||||
const noexcept
|
||||
{
|
||||
return bytes_per_second;
|
||||
}
|
||||
|
||||
bool
|
||||
ircd::db::database::rate_limiter::IsRateLimited(OpType op)
|
||||
noexcept
|
||||
{
|
||||
always_assert(false);
|
||||
return false;
|
||||
}
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////
|
||||
//
|
||||
// database::sst
|
||||
|
|
28
ircd/db.h
28
ircd/db.h
|
@ -53,6 +53,7 @@
|
|||
#include <rocksdb/sst_dump_tool.h>
|
||||
#include <rocksdb/compaction_filter.h>
|
||||
#include <rocksdb/wal_filter.h>
|
||||
#include <rocksdb/rate_limiter.h>
|
||||
|
||||
namespace ircd::db
|
||||
{
|
||||
|
@ -493,6 +494,33 @@ struct ircd::db::database::logger final
|
|||
~logger() noexcept override;
|
||||
};
|
||||
|
||||
struct ircd::db::database::rate_limiter
|
||||
:std::enable_shared_from_this<struct database::rate_limiter>
|
||||
,rocksdb::RateLimiter
|
||||
{
|
||||
using Statistics = rocksdb::Statistics;
|
||||
using IOPriority = rocksdb::Env::IOPriority;
|
||||
|
||||
database *d {nullptr}; struct
|
||||
{
|
||||
int64_t count {0}, bytes {0};
|
||||
}
|
||||
requests[IOPriority::IO_TOTAL];
|
||||
int64_t bytes_per_second {1_GiB};
|
||||
|
||||
bool IsRateLimited(OpType) noexcept override;
|
||||
int64_t GetBytesPerSecond() const noexcept override;
|
||||
int64_t GetSingleBurstBytes() const noexcept override;
|
||||
int64_t GetTotalRequests(const IOPriority) const noexcept override;
|
||||
int64_t GetTotalBytesThrough(const IOPriority) const noexcept override;
|
||||
|
||||
size_t RequestToken(size_t, size_t, IOPriority, Statistics *, OpType) noexcept override;
|
||||
void SetBytesPerSecond(int64_t) noexcept override;
|
||||
|
||||
rate_limiter(database *const &);
|
||||
~rate_limiter() noexcept;
|
||||
};
|
||||
|
||||
//
|
||||
// util
|
||||
//
|
||||
|
|
Loading…
Reference in a new issue