0
0
Fork 0
mirror of https://github.com/matrix-construct/construct synced 2024-11-18 07:50:57 +01:00

ircd::db: Add txn class to compose a transaction in stages.

This commit is contained in:
Jason Volk 2017-09-18 19:19:02 -07:00
parent b27a2a6423
commit d7e9c9182a
3 changed files with 454 additions and 0 deletions

View file

@ -122,6 +122,7 @@ enum class ircd::db::pos
#include "db/row.h" #include "db/row.h"
#include "db/index.h" #include "db/index.h"
#include "db/json.h" #include "db/json.h"
#include "db/txn.h"
#include "db/where.h" #include "db/where.h"
#include "db/cursor.h" #include "db/cursor.h"

89
include/ircd/db/txn.h Normal file
View file

@ -0,0 +1,89 @@
/*
* Copyright (C) 2017 Charybdis Development Team
* Copyright (C) 2017 Jason Volk <jason@zemos.net>
*
* Permission to use, copy, modify, and/or distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
* copyright notice and this permission notice is present in all copies.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
* IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT,
* INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
* SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
* STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
* IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*
*/
#pragma once
#define HAVE_IRCD_DB_TXN_H
namespace ircd::db
{
struct txn;
void for_each(const txn &, const std::function<void (const delta &)> &);
std::string debug(const txn &);
}
class ircd::db::txn
{
database *d {nullptr};
std::unique_ptr<rocksdb::WriteBatch> wb;
public:
struct opts;
struct append;
struct handler;
struct checkpoint;
explicit operator const rocksdb::WriteBatch &() const;
explicit operator const database &() const;
explicit operator rocksdb::WriteBatch &();
explicit operator database &();
bool has(const op &) const;
size_t bytes() const;
size_t size() const;
// commit
void operator()(database &, const sopts & = {});
void operator()(const sopts & = {});
// clear
void clear();
txn() = default;
txn(database &);
txn(database &, const opts &);
~txn() noexcept;
};
struct ircd::db::txn::checkpoint
{
txn &t;
checkpoint(txn &);
~checkpoint() noexcept;
};
struct ircd::db::txn::append
{
append(txn &, database &, const delta &);
append(txn &, column &, const column::delta &);
append(txn &, const cell::delta &);
append(txn &, const row::delta &);
append(txn &, const delta &);
append(txn &, const string_view &key, const json::iov &);
};
struct ircd::db::txn::opts
{
size_t reserve_bytes = 0;
size_t max_bytes = 0;
};

View file

@ -1293,6 +1293,370 @@ ircd::db::database::events::OnColumnFamilyHandleDeletionStarted(rocksdb::ColumnF
h); h);
} }
///////////////////////////////////////////////////////////////////////////////
//
// db/txn.h
//
struct ircd::db::txn::handler
:rocksdb::WriteBatch::Handler
{
using Status = rocksdb::Status;
using Slice = rocksdb::Slice;
const database &d;
const std::function<void (const delta &)> &cb;
bool _continue {true};
Status callback(const delta &) noexcept;
Status callback(const uint32_t &, const op &, const Slice &a, const Slice &b) noexcept;
bool Continue() noexcept override;
Status MarkRollback(const Slice &xid) noexcept override;
Status MarkCommit(const Slice &xid) noexcept override;
Status MarkEndPrepare(const Slice &xid) noexcept override;
Status MarkBeginPrepare() noexcept override;
Status MergeCF(const uint32_t cfid, const Slice &, const Slice &) noexcept override;
Status SingleDeleteCF(const uint32_t cfid, const Slice &) noexcept override;
Status DeleteRangeCF(const uint32_t cfid, const Slice &, const Slice &) noexcept override;
Status DeleteCF(const uint32_t cfid, const Slice &) noexcept override;
Status PutCF(const uint32_t cfid, const Slice &, const Slice &) noexcept override;
handler(const database &d,
const std::function<void (const delta &)> &cb)
:d{d}
,cb{cb}
{}
};
std::string
ircd::db::debug(const txn &t)
{
const rocksdb::WriteBatch &wb(t);
return db::debug(wb);
}
void
ircd::db::for_each(const txn &t,
const std::function<void (const delta &)> &closure)
{
const database &d(t);
const rocksdb::WriteBatch &wb{t};
txn::handler h{d, closure};
throw_on_error
{
wb.Iterate(&h)
};
}
///
/// handler
///
rocksdb::Status
ircd::db::txn::handler::PutCF(const uint32_t cfid,
const Slice &key,
const Slice &val)
noexcept
{
return callback(cfid, op::SET, key, val);
}
rocksdb::Status
ircd::db::txn::handler::DeleteCF(const uint32_t cfid,
const Slice &key)
noexcept
{
return callback(cfid, op::DELETE, key, {});
}
rocksdb::Status
ircd::db::txn::handler::DeleteRangeCF(const uint32_t cfid,
const Slice &begin,
const Slice &end)
noexcept
{
return callback(cfid, op::DELETE_RANGE, begin, end);
}
rocksdb::Status
ircd::db::txn::handler::SingleDeleteCF(const uint32_t cfid,
const Slice &key)
noexcept
{
return callback(cfid, op::SINGLE_DELETE, key, {});
}
rocksdb::Status
ircd::db::txn::handler::MergeCF(const uint32_t cfid,
const Slice &key,
const Slice &value)
noexcept
{
return callback(cfid, op::MERGE, key, value);
}
rocksdb::Status
ircd::db::txn::handler::MarkBeginPrepare()
noexcept
{
return Status::OK();
}
rocksdb::Status
ircd::db::txn::handler::MarkEndPrepare(const Slice &xid)
noexcept
{
return Status::OK();
}
rocksdb::Status
ircd::db::txn::handler::MarkCommit(const Slice &xid)
noexcept
{
return Status::OK();
}
rocksdb::Status
ircd::db::txn::handler::MarkRollback(const Slice &xid)
noexcept
{
return Status::OK();
}
rocksdb::Status
ircd::db::txn::handler::callback(const uint32_t &cfid,
const op &op,
const Slice &a,
const Slice &b)
noexcept try
{
auto &c{d[cfid]};
const delta delta
{
op,
db::name(c),
slice(a),
slice(b)
};
return callback(delta);
}
catch(const std::exception &e)
{
_continue = false;
return Status::Aborted(slice(string_view{e.what()}));
}
rocksdb::Status
ircd::db::txn::handler::callback(const delta &delta)
noexcept try
{
cb(delta);
return Status::OK();
}
catch(const std::exception &e)
{
_continue = false;
return Status::Aborted(slice(string_view{e.what()}));
}
bool
ircd::db::txn::handler::Continue()
noexcept
{
return _continue;
}
//
// txn
//
ircd::db::txn::txn(database &d)
:txn{d, {}}
{
}
ircd::db::txn::txn(database &d,
const opts &opts)
:d{&d}
,wb
{
std::make_unique<rocksdb::WriteBatch>(opts.reserve_bytes, opts.max_bytes)
}
{
}
ircd::db::txn::~txn()
noexcept
{
}
void
ircd::db::txn::operator()(const sopts &opts)
{
assert(bool(d));
operator()(*d, opts);
}
void
ircd::db::txn::operator()(database &d,
const sopts &opts)
{
assert(bool(wb));
commit(d, *wb, opts);
}
void
ircd::db::txn::clear()
{
assert(bool(wb));
wb->Clear();
}
size_t
ircd::db::txn::size()
const
{
assert(bool(wb));
return wb->Count();
}
size_t
ircd::db::txn::bytes()
const
{
assert(bool(wb));
return wb->GetDataSize();
}
bool
ircd::db::txn::has(const op &op)
const
{
assert(bool(wb));
switch(op)
{
case op::GET: assert(0); return false;
case op::SET: return wb->HasPut();
case op::MERGE: return wb->HasMerge();
case op::DELETE: return wb->HasDelete();
case op::DELETE_RANGE: return wb->HasDeleteRange();
case op::SINGLE_DELETE: return wb->HasSingleDelete();
}
return false;
}
ircd::db::txn::operator
ircd::db::database &()
{
assert(bool(d));
return *d;
}
ircd::db::txn::operator
rocksdb::WriteBatch &()
{
assert(bool(wb));
return *wb;
}
ircd::db::txn::operator
const ircd::db::database &()
const
{
assert(bool(d));
return *d;
}
ircd::db::txn::operator
const rocksdb::WriteBatch &()
const
{
assert(bool(wb));
return *wb;
}
//
// Checkpoint
//
ircd::db::txn::checkpoint::checkpoint(txn &t)
:t{t}
{
assert(bool(t.wb));
t.wb->SetSavePoint();
}
ircd::db::txn::checkpoint::~checkpoint()
noexcept
{
if(likely(!std::uncaught_exception()))
throw_on_error { t.wb->PopSavePoint() };
else
throw_on_error { t.wb->RollbackToSavePoint() };
}
ircd::db::txn::append::append(txn &t,
const string_view &key,
const json::iov &iov)
{
std::for_each(std::begin(iov), std::end(iov), [&t, &key]
(const auto &member)
{
append
{
t, delta
{
member.first, // col
key, // key
member.second // val
}
};
});
}
ircd::db::txn::append::append(txn &t,
const delta &delta)
{
assert(bool(t.d));
append(t, *t.d, delta);
}
ircd::db::txn::append::append(txn &t,
const row::delta &delta)
{
assert(0);
}
ircd::db::txn::append::append(txn &t,
const cell::delta &delta)
{
db::append(*t.wb, delta);
}
ircd::db::txn::append::append(txn &t,
column &c,
const column::delta &delta)
{
db::append(*t.wb, c, delta);
}
ircd::db::txn::append::append(txn &t,
database &d,
const delta &delta)
{
db::column c{d[std::get<1>(delta)]};
db::append(*t.wb, c, db::column::delta
{
std::get<op>(delta),
std::get<2>(delta),
std::get<3>(delta)
});
}
/////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////
// //
// db/index.h // db/index.h