0
0
Fork 0
mirror of https://github.com/matrix-construct/construct synced 2024-09-30 04:38:52 +02:00

ircd::db: Add compaction callback interface.

This commit is contained in:
Jason Volk 2018-09-18 15:07:09 -07:00
parent 6a06f2c89e
commit fc09ba81af
7 changed files with 184 additions and 0 deletions

View file

@ -0,0 +1,36 @@
// Matrix Construct
//
// Copyright (C) Matrix Construct Developers, Authors & Contributors
// Copyright (C) 2016-2018 Jason Volk <jason@zemos.net>
//
// Permission to use, copy, modify, and/or distribute this software for any
// purpose with or without fee is hereby granted, provided that the above
// copyright notice and this permission notice is present in all copies. The
// full license for this software is available in the LICENSE file.
#pragma once
#define HAVE_IRCD_DB_COMPACTOR_H
namespace ircd::db
{
struct compactor;
}
/// Compaction callback
///
/// return db::op::GET (0) from callback for no-op
/// return db::op::DELETE from callback to delete this kv.
/// return db::op::SET from callback if replacement modified.
/// return db::op::DELETE_RANGE from callback if skip_until modified.
///
struct ircd::db::compactor
{
using prototype = db::op (const int &level,
const string_view &key,
const string_view &val,
std::string *const replace,
std::string *const skip_until);
std::function<prototype> value;
std::function<prototype> merge;
};

View file

@ -37,6 +37,7 @@ struct ircd::db::database::column final
database::descriptor descriptor; database::descriptor descriptor;
comparator cmp; comparator cmp;
prefix_transform prefix; prefix_transform prefix;
compaction_filter cfilter;
rocksdb::BlockBasedTableOptions table_opts; rocksdb::BlockBasedTableOptions table_opts;
custom_ptr<rocksdb::ColumnFamilyHandle> handle; custom_ptr<rocksdb::ColumnFamilyHandle> handle;

View file

@ -0,0 +1,34 @@
// Matrix Construct
//
// Copyright (C) Matrix Construct Developers, Authors & Contributors
// Copyright (C) 2016-2018 Jason Volk <jason@zemos.net>
//
// Permission to use, copy, modify, and/or distribute this software for any
// purpose with or without fee is hereby granted, provided that the above
// copyright notice and this permission notice is present in all copies. The
// full license for this software is available in the LICENSE file.
#pragma once
#define HAVE_IRCD_DB_DATABASE_COMPACTION_FILTER_H
// This file is not part of the standard include stack because it requires
// RocksDB symbols which we cannot forward declare. It is used internally
// and does not need to be included by general users of IRCd.
struct ircd::db::database::compaction_filter
final
:rocksdb::CompactionFilter
{
using Slice = rocksdb::Slice;
column *c;
database *d;
db::compactor user;
const char *Name() const override;
bool IgnoreSnapshots() const override;
Decision FilterV2(const int level, const Slice &key, const ValueType v, const Slice &oldval, std::string *newval, std::string *skipuntil) const override;
compaction_filter(column *const &c, db::compactor);
~compaction_filter() noexcept override;
};

View file

@ -76,6 +76,7 @@ struct ircd::db::database
struct snapshot; struct snapshot;
struct comparator; struct comparator;
struct prefix_transform; struct prefix_transform;
struct compaction_filter;
struct column; struct column;
struct env; struct env;
struct cache; struct cache;

View file

@ -60,4 +60,7 @@ struct ircd::db::database::descriptor
/// blocks will read and cache unrelated data if values are smaller /// blocks will read and cache unrelated data if values are smaller
/// than this size. /// than this size.
size_t block_size { 512 }; size_t block_size { 512 };
/// User given compaction callback surface.
db::compactor compactor {};
}; };

View file

@ -61,6 +61,7 @@ enum class ircd::db::pos
#include "delta.h" #include "delta.h"
#include "comparator.h" #include "comparator.h"
#include "compactor.h"
#include "prefix.h" #include "prefix.h"
#include "merge.h" #include "merge.h"
#include "database/rocksdb.h" #include "database/rocksdb.h"

View file

@ -26,10 +26,12 @@
#include <rocksdb/filter_policy.h> #include <rocksdb/filter_policy.h>
#include <rocksdb/table.h> #include <rocksdb/table.h>
#include <rocksdb/sst_file_manager.h> #include <rocksdb/sst_file_manager.h>
#include <rocksdb/compaction_filter.h>
// ircd::db interfaces requiring complete RocksDB (frontside). // ircd::db interfaces requiring complete RocksDB (frontside).
#include <ircd/db/database/comparator.h> #include <ircd/db/database/comparator.h>
#include <ircd/db/database/prefix_transform.h> #include <ircd/db/database/prefix_transform.h>
#include <ircd/db/database/compaction_filter.h>
#include <ircd/db/database/mergeop.h> #include <ircd/db/database/mergeop.h>
#include <ircd/db/database/events.h> #include <ircd/db/database/events.h>
#include <ircd/db/database/stats.h> #include <ircd/db/database/stats.h>
@ -1389,6 +1391,7 @@ ircd::db::database::column::column(database *const &d,
,descriptor{descriptor} ,descriptor{descriptor}
,cmp{d, this->descriptor.cmp} ,cmp{d, this->descriptor.cmp}
,prefix{d, this->descriptor.prefix} ,prefix{d, this->descriptor.prefix}
,cfilter{this, this->descriptor.compactor}
,handle ,handle
{ {
nullptr, [this](rocksdb::ColumnFamilyHandle *const handle) nullptr, [this](rocksdb::ColumnFamilyHandle *const handle)
@ -1426,6 +1429,9 @@ ircd::db::database::column::column(database *const &d,
&this->prefix, [](const rocksdb::SliceTransform *) {} &this->prefix, [](const rocksdb::SliceTransform *) {}
}; };
// Set the compaction filter
this->options.compaction_filter = &this->cfilter;
// //
// Table options // Table options
// //
@ -1457,6 +1463,9 @@ ircd::db::database::column::column(database *const &d,
// Misc options // Misc options
// //
// More stats reported by the rocksdb.stats property.
this->options.report_bg_io_stats = true;
// Set the compaction style; we don't override this in the descriptor yet. // Set the compaction style; we don't override this in the descriptor yet.
this->options.compaction_style = rocksdb::kCompactionStyleLevel; this->options.compaction_style = rocksdb::kCompactionStyleLevel;
@ -2244,6 +2253,105 @@ noexcept
} }
///////////////////////////////////////////////////////////////////////////////
//
// database::compaction_filter
//
ircd::db::database::compaction_filter::compaction_filter(column *const &c,
db::compactor user)
:c{c}
,d{c->d}
,user{std::move(user)}
{
}
ircd::db::database::compaction_filter::~compaction_filter()
noexcept
{
}
rocksdb::CompactionFilter::Decision
ircd::db::database::compaction_filter::FilterV2(const int level,
const Slice &key,
const ValueType type,
const Slice &oldval,
std::string *const newval,
std::string *const skip)
const
{
#ifdef RB_DEBUG_DB_ENV
const auto typestr
{
type == kValue?
"VALUE"_sv:
type == kMergeOperand?
"MERGE"_sv:
"BLOB"_sv
};
log::debug
{
log, "'%s':'%s': compaction level:%d key:%zu@%p type:%s old:%zu@%p new:%p skip:%p",
d->name,
c->name,
level,
size(key),
data(key),
typestr,
size(oldval),
data(oldval),
(const void *)newval,
(const void *)skipuntil
};
#endif
db::op ret
{
db::op::GET
};
switch(type)
{
case ValueType::kValue:
if(user.value)
ret = user.value(level, slice(key), slice(oldval), newval, skip);
break;
case ValueType::kMergeOperand:
if(user.merge)
ret = user.merge(level, slice(key), slice(oldval), newval, skip);
break;
case ValueType::kBlobIndex:
break;
}
switch(ret)
{
default:
case db::op::GET: return Decision::kKeep;
case db::op::SET: return Decision::kChangeValue;
case db::op::DELETE: return Decision::kRemove;
case db::op::DELETE_RANGE: return Decision::kRemoveAndSkipUntil;
}
}
bool
ircd::db::database::compaction_filter::IgnoreSnapshots()
const
{
return false;
}
const char *
ircd::db::database::compaction_filter::Name()
const
{
assert(c);
return db::name(*c).c_str();
}
/////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////
// //
// database::env // database::env