0
0
Fork 0
mirror of https://github.com/matrix-construct/construct synced 2024-12-26 15:33:54 +01:00

ircd::db: Add write-ahead-log recovery callback surface.

This commit is contained in:
Jason Volk 2019-04-20 14:22:08 -07:00
parent bbc472ad9e
commit db539c6268
4 changed files with 135 additions and 0 deletions

View file

@ -95,6 +95,7 @@ struct ircd::db::database
struct cache;
struct sst;
struct wal;
struct wal_filter;
std::string name;
uint64_t checkpoint;
@ -106,6 +107,7 @@ struct ircd::db::database
std::shared_ptr<struct logger> logger;
std::shared_ptr<struct events> events;
std::shared_ptr<struct mergeop> mergeop;
std::unique_ptr<struct wal_filter> wal_filter;
std::shared_ptr<rocksdb::SstFileManager> ssts;
std::shared_ptr<rocksdb::Cache> row_cache;
std::vector<descriptor> descriptors;

View file

@ -0,0 +1,40 @@
// Matrix Construct
//
// Copyright (C) Matrix Construct Developers, Authors & Contributors
// Copyright (C) 2016-2019 Jason Volk <jason@zemos.net>
//
// Permission to use, copy, modify, and/or distribute this software for any
// purpose with or without fee is hereby granted, provided that the above
// copyright notice and this permission notice is present in all copies. The
// full license for this software is available in the LICENSE file.
#pragma once
#define HAVE_IRCD_DB_DATABASE_WAL_FILTER_H
//
// This file is not included in the standard include stack because it contains
// symbols which we cannot declare without RocksDB headers.
//
/// Callback surface for iterating/recovering the write-ahead-log journal.
struct ircd::db::database::wal_filter
:rocksdb::WalFilter
{
using WriteBatch = rocksdb::WriteBatch;
using log_number_map = std::map<uint32_t, uint64_t>;
using name_id_map = std::map<std::string, uint32_t>;
static conf::item<bool> debug;
database *d {nullptr};
log_number_map log_number;
name_id_map name_id;
const char *Name() const noexcept override;
WalProcessingOption LogRecord(const WriteBatch &, WriteBatch *const replace, bool *replaced) const noexcept override;
WalProcessingOption LogRecordFound(unsigned long long log_nr, const std::string &name, const WriteBatch &, WriteBatch *const replace, bool *replaced) noexcept override;
void ColumnFamilyLogNumberMap(const log_number_map &, const name_id_map &) noexcept override;
wal_filter(database *const &);
~wal_filter() noexcept;
};

View file

@ -998,6 +998,10 @@ try
{
std::make_shared<struct mergeop>(this)
}
,wal_filter
{
std::make_unique<struct wal_filter>(this)
}
,ssts
{
// note: the sst file manager cannot be used for now because it will spawn
@ -1127,6 +1131,9 @@ try
// Setup env
opts->env = env.get();
// Setup WAL filter
opts->wal_filter = this->wal_filter.get();
// Setup SST file mgmt
opts->sst_file_manager = this->ssts;
@ -3255,6 +3262,90 @@ const noexcept
return db::name(*c).c_str();
}
///////////////////////////////////////////////////////////////////////////////
//
// database::wal_filter
//
decltype(ircd::db::database::wal_filter::debug)
ircd::db::database::wal_filter::debug
{
{ "name", "ircd.db.wal.debug" },
{ "default", false },
{ "persist", false },
};
ircd::db::database::wal_filter::wal_filter(database *const &d)
:d{d}
{
}
ircd::db::database::wal_filter::~wal_filter()
noexcept
{
}
void
ircd::db::database::wal_filter::ColumnFamilyLogNumberMap(const log_number_map &log_number,
const name_id_map &name_id)
noexcept
{
assert(d);
this->log_number = log_number;
this->name_id = name_id;
log::debug
{
log, "'%s': WAL recovery mapping update: log_number:%zu name_id:%zu",
db::name(*d),
log_number.size(),
name_id.size(),
};
}
rocksdb::WalFilter::WalProcessingOption
ircd::db::database::wal_filter::LogRecordFound(unsigned long long log_nr,
const std::string &name,
const WriteBatch &wb,
WriteBatch *const replace,
bool *const replaced)
noexcept
{
assert(d && replace && replaced);
if(debug) log::debug
{
log, "'%s': WAL recovery record log:%lu '%s' wb[count:%zu size:%zu]",
db::name(*d),
log_nr,
name,
wb.Count(),
wb.GetDataSize(),
};
*replaced = false;
return WalProcessingOption::kContinueProcessing;
}
rocksdb::WalFilter::WalProcessingOption
ircd::db::database::wal_filter::LogRecord(const WriteBatch &wb,
WriteBatch *const replace,
bool *const replaced)
const noexcept
{
return WalProcessingOption::kContinueProcessing;
}
const char *
ircd::db::database::wal_filter::Name()
const noexcept
{
assert(d);
return db::name(*d).c_str();
}
///////////////////////////////////////////////////////////////////////////////
//
// database::sst

View file

@ -50,10 +50,12 @@
#include <rocksdb/sst_file_manager.h>
#include <rocksdb/sst_dump_tool.h>
#include <rocksdb/compaction_filter.h>
#include <rocksdb/wal_filter.h>
#include <ircd/db/database/comparator.h>
#include <ircd/db/database/prefix_transform.h>
#include <ircd/db/database/compaction_filter.h>
#include <ircd/db/database/wal_filter.h>
#include <ircd/db/database/mergeop.h>
#include <ircd/db/database/events.h>
#include <ircd/db/database/stats.h>