From 8bdedb569c84120a6e2574951792489e923a397c Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Mon, 14 Sep 2020 19:53:35 -0700 Subject: [PATCH] ircd::db: Add preliminary custom rate limiter drop-in. --- include/ircd/db/database.h | 2 + ircd/db.cc | 127 +++++++++++++++++++++++++++++++++++++ ircd/db.h | 28 ++++++++ 3 files changed, 157 insertions(+) diff --git a/include/ircd/db/database.h b/include/ircd/db/database.h index 5dca5d07b..903611441 100644 --- a/include/ircd/db/database.h +++ b/include/ircd/db/database.h @@ -102,6 +102,7 @@ struct ircd::db::database struct wal; struct wal_filter; struct allocator; + struct rate_limiter; std::string name; uint64_t checkpoint; @@ -114,6 +115,7 @@ struct ircd::db::database std::shared_ptr events; std::shared_ptr mergeop; std::unique_ptr wal_filter; + std::shared_ptr rate_limiter; std::shared_ptr allocator; std::shared_ptr ssts; std::shared_ptr row_cache; diff --git a/ircd/db.cc b/ircd/db.cc index 8405da887..b4031ded8 100644 --- a/ircd/db.cc +++ b/ircd/db.cc @@ -1234,6 +1234,10 @@ try { std::make_unique(this) } +,rate_limiter +{ + std::make_unique(this) +} ,allocator { #ifdef IRCD_DB_HAS_ALLOCATOR @@ -1389,6 +1393,9 @@ try // Setup WAL filter opts->wal_filter = this->wal_filter.get(); + // Setup Rate Limiter + opts->rate_limiter = this->rate_limiter; + // Setup SST file mgmt opts->sst_file_manager = this->ssts; @@ -3691,6 +3698,126 @@ const noexcept return db::name(*d).c_str(); } +/////////////////////////////////////////////////////////////////////////////// +// +// database::rate_limiter +// + +ircd::db::database::rate_limiter::rate_limiter(database *const &d) +:d{d} +{ +} + +ircd::db::database::rate_limiter::~rate_limiter() +noexcept +{ +} + +void +ircd::db::database::rate_limiter::SetBytesPerSecond(int64_t bytes_per_second) +noexcept +{ + log::debug + { + log, "[%s] Rate Limiter update rate %zu -> %zu bytes per second", + db::name(*d), + this->bytes_per_second, + bytes_per_second, + }; + + this->bytes_per_second = bytes_per_second; +} + +size_t +ircd::db::database::rate_limiter::RequestToken(size_t bytes, + size_t alignment, + IOPriority prio, + Statistics *const stats, + OpType type) +noexcept +{ + log::debug + { + log, "[%s] Rate Limiter request bytes:%zu alignment:%zu prio:%s type:%s", + db::name(*d), + bytes, + alignment, + reflect(prio), + type == OpType::kWrite? + "WRITE"_sv: + type == OpType::kRead? + "READ"_sv: + "????"_sv, + }; + + assert(prio <= IOPriority::IO_TOTAL); + { + int64_t i(prio == IOPriority::IO_TOTAL? 0: prio); do + { + requests[i].bytes += bytes; + requests[i].count += 1; + } + while(++i < prio); + } + + //assert(stats); + //stats->recordTick(rocksdb::Tickers::RATE_LIMIT_DELAY_MILLIS, 0); + //stats->recordTick(rocksdb::Tickers::NUMBER_RATE_LIMITER_DRAINS, 0); + //stats->recordTick(rocksdb::Tickers::HARD_RATE_LIMIT_DELAY_COUNT, 0); + //stats->recordTick(rocksdb::Tickers::SOFT_RATE_LIMIT_DELAY_COUNT, 0); + + return bytes; +} + +int64_t +ircd::db::database::rate_limiter::GetTotalBytesThrough(const IOPriority prio) +const noexcept +{ + int64_t ret(0); + int64_t i(prio == IOPriority::IO_TOTAL? 0: prio); do + { + ret += requests[i].bytes; + } + while(++i < prio); + return ret; +} + +int64_t +ircd::db::database::rate_limiter::GetTotalRequests(const IOPriority prio) +const noexcept +{ + int64_t ret(0); + int64_t i(prio == IOPriority::IO_TOTAL? 0: prio); do + { + ret += requests[i].count; + } + while(++i < prio); + return ret; +} + +int64_t +ircd::db::database::rate_limiter::GetSingleBurstBytes() +const noexcept +{ + always_assert(false); + return bytes_per_second; +} + +int64_t +ircd::db::database::rate_limiter::GetBytesPerSecond() +const noexcept +{ + return bytes_per_second; +} + +bool +ircd::db::database::rate_limiter::IsRateLimited(OpType op) +noexcept +{ + always_assert(false); + return false; +} + /////////////////////////////////////////////////////////////////////////////// // // database::sst diff --git a/ircd/db.h b/ircd/db.h index fdd1906f2..4ab0bb83c 100644 --- a/ircd/db.h +++ b/ircd/db.h @@ -53,6 +53,7 @@ #include #include #include +#include namespace ircd::db { @@ -493,6 +494,33 @@ struct ircd::db::database::logger final ~logger() noexcept override; }; +struct ircd::db::database::rate_limiter +:std::enable_shared_from_this +,rocksdb::RateLimiter +{ + using Statistics = rocksdb::Statistics; + using IOPriority = rocksdb::Env::IOPriority; + + database *d {nullptr}; struct + { + int64_t count {0}, bytes {0}; + } + requests[IOPriority::IO_TOTAL]; + int64_t bytes_per_second {1_GiB}; + + bool IsRateLimited(OpType) noexcept override; + int64_t GetBytesPerSecond() const noexcept override; + int64_t GetSingleBurstBytes() const noexcept override; + int64_t GetTotalRequests(const IOPriority) const noexcept override; + int64_t GetTotalBytesThrough(const IOPriority) const noexcept override; + + size_t RequestToken(size_t, size_t, IOPriority, Statistics *, OpType) noexcept override; + void SetBytesPerSecond(int64_t) noexcept override; + + rate_limiter(database *const &); + ~rate_limiter() noexcept; +}; + // // util //