diff --git a/include/ircd/db/database.h b/include/ircd/db/database.h index 4a5ab8061..fa43ec983 100644 --- a/include/ircd/db/database.h +++ b/include/ircd/db/database.h @@ -95,6 +95,7 @@ struct ircd::db::database struct cache; struct sst; struct wal; + struct wal_filter; std::string name; uint64_t checkpoint; @@ -106,6 +107,7 @@ struct ircd::db::database std::shared_ptr logger; std::shared_ptr events; std::shared_ptr mergeop; + std::unique_ptr wal_filter; std::shared_ptr ssts; std::shared_ptr row_cache; std::vector descriptors; diff --git a/include/ircd/db/database/wal_filter.h b/include/ircd/db/database/wal_filter.h new file mode 100644 index 000000000..53724aa78 --- /dev/null +++ b/include/ircd/db/database/wal_filter.h @@ -0,0 +1,40 @@ +// Matrix Construct +// +// Copyright (C) Matrix Construct Developers, Authors & Contributors +// Copyright (C) 2016-2019 Jason Volk +// +// Permission to use, copy, modify, and/or distribute this software for any +// purpose with or without fee is hereby granted, provided that the above +// copyright notice and this permission notice is present in all copies. The +// full license for this software is available in the LICENSE file. + +#pragma once +#define HAVE_IRCD_DB_DATABASE_WAL_FILTER_H + +// +// This file is not included in the standard include stack because it contains +// symbols which we cannot declare without RocksDB headers. +// + +/// Callback surface for iterating/recovering the write-ahead-log journal. +struct ircd::db::database::wal_filter +:rocksdb::WalFilter +{ + using WriteBatch = rocksdb::WriteBatch; + using log_number_map = std::map; + using name_id_map = std::map; + + static conf::item debug; + + database *d {nullptr}; + log_number_map log_number; + name_id_map name_id; + + const char *Name() const noexcept override; + WalProcessingOption LogRecord(const WriteBatch &, WriteBatch *const replace, bool *replaced) const noexcept override; + WalProcessingOption LogRecordFound(unsigned long long log_nr, const std::string &name, const WriteBatch &, WriteBatch *const replace, bool *replaced) noexcept override; + void ColumnFamilyLogNumberMap(const log_number_map &, const name_id_map &) noexcept override; + + wal_filter(database *const &); + ~wal_filter() noexcept; +}; diff --git a/ircd/db.cc b/ircd/db.cc index a11146985..b62993b6b 100644 --- a/ircd/db.cc +++ b/ircd/db.cc @@ -998,6 +998,10 @@ try { std::make_shared(this) } +,wal_filter +{ + std::make_unique(this) +} ,ssts { // note: the sst file manager cannot be used for now because it will spawn @@ -1127,6 +1131,9 @@ try // Setup env opts->env = env.get(); + // Setup WAL filter + opts->wal_filter = this->wal_filter.get(); + // Setup SST file mgmt opts->sst_file_manager = this->ssts; @@ -3255,6 +3262,90 @@ const noexcept return db::name(*c).c_str(); } +/////////////////////////////////////////////////////////////////////////////// +// +// database::wal_filter +// + +decltype(ircd::db::database::wal_filter::debug) +ircd::db::database::wal_filter::debug +{ + { "name", "ircd.db.wal.debug" }, + { "default", false }, + { "persist", false }, +}; + +ircd::db::database::wal_filter::wal_filter(database *const &d) +:d{d} +{ +} + +ircd::db::database::wal_filter::~wal_filter() +noexcept +{ +} + +void +ircd::db::database::wal_filter::ColumnFamilyLogNumberMap(const log_number_map &log_number, + const name_id_map &name_id) +noexcept +{ + assert(d); + + this->log_number = log_number; + this->name_id = name_id; + + log::debug + { + log, "'%s': WAL recovery mapping update: log_number:%zu name_id:%zu", + db::name(*d), + log_number.size(), + name_id.size(), + }; +} + +rocksdb::WalFilter::WalProcessingOption +ircd::db::database::wal_filter::LogRecordFound(unsigned long long log_nr, + const std::string &name, + const WriteBatch &wb, + WriteBatch *const replace, + bool *const replaced) +noexcept +{ + assert(d && replace && replaced); + + if(debug) log::debug + { + log, "'%s': WAL recovery record log:%lu '%s' wb[count:%zu size:%zu]", + db::name(*d), + log_nr, + name, + wb.Count(), + wb.GetDataSize(), + }; + + *replaced = false; + return WalProcessingOption::kContinueProcessing; +} + +rocksdb::WalFilter::WalProcessingOption +ircd::db::database::wal_filter::LogRecord(const WriteBatch &wb, + WriteBatch *const replace, + bool *const replaced) +const noexcept +{ + return WalProcessingOption::kContinueProcessing; +} + +const char * +ircd::db::database::wal_filter::Name() +const noexcept +{ + assert(d); + return db::name(*d).c_str(); +} + + /////////////////////////////////////////////////////////////////////////////// // // database::sst diff --git a/ircd/db.h b/ircd/db.h index 4022c6428..680b8918e 100644 --- a/ircd/db.h +++ b/ircd/db.h @@ -50,10 +50,12 @@ #include #include #include +#include #include #include #include +#include #include #include #include