Mirror of https://github.com/matrix-construct/construct

ircd::db: Improve commitment stack.

Author: Jason Volk
Date:   2017-09-08 02:14:17 -07:00
Parent: 682686bcae
Commit: d219858e73

2 changed files with 134 additions and 80 deletions

File 1 of 2

@@ -79,6 +79,7 @@ namespace rocksdb
     struct Iterator;
     struct ColumnFamilyHandle;
     struct Snapshot;
+    struct WriteBatch;
 }
 //
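
The forward declaration added here lets the public header mention rocksdb::WriteBatch by reference without pulling in RocksDB's own headers; only the translation unit that implements the commit stack needs the full definition. A minimal sketch of the same pattern with made-up names (mylib is illustrative, not part of the ircd tree):

    // mylib.h -- public header: forward-declare the third-party type only.
    namespace rocksdb { struct WriteBatch; }   // no RocksDB include required here

    namespace mylib
    {
        // Only references/pointers to WriteBatch appear in this header,
        // so the forward declaration above is enough for it to compile.
        void commit(rocksdb::WriteBatch &batch);
    }

    // mylib.cc -- implementation: include the real definition where it is used.
    #include <rocksdb/write_batch.h>

    void mylib::commit(rocksdb::WriteBatch &batch)
    {
        batch.Count();   // the complete type is required only in this file
    }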

File 2 of 2

@@ -606,16 +606,7 @@ ircd::db::database::operator()(const sopts &sopts,
         });
     });
 
-    auto opts(make_opts(sopts));
-    log.debug("'%s' @%lu PUT %zu column deltas",
-              name,
-              sequence(*this),
-              std::distance(begin, end));
-
-    throw_on_error
-    {
-        d->Write(opts, &batch)
-    };
+    commit(*this, batch, sopts);
 }
 
 ircd::db::database::column &
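
With this change the database-level write no longer formats its own log line or calls Write() itself; it only assembles the rocksdb::WriteBatch and hands it to the new commit() helper added further down. A self-contained sketch of the underlying pattern against RocksDB's public API (the path /tmp/example-db and the keys are illustrative):

    #include <cassert>
    #include <rocksdb/db.h>
    #include <rocksdb/write_batch.h>

    int main()
    {
        rocksdb::DB *db;
        rocksdb::Options options;
        options.create_if_missing = true;
        const rocksdb::Status open(rocksdb::DB::Open(options, "/tmp/example-db", &db));
        assert(open.ok());

        // Stage several deltas; nothing reaches the database yet.
        rocksdb::WriteBatch batch;
        batch.Put("foo", "1");
        batch.Put("baz", "2");
        batch.Delete("bar");

        // One commit point: the whole batch is applied atomically.
        const rocksdb::Status written(db->Write(rocksdb::WriteOptions(), &batch));
        assert(written.ok());

        delete db;
    }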
@@ -1360,30 +1351,7 @@ ircd::db::write(const cell::delta *const &begin,
         append(batch, delta);
     });
 
-    auto opts(make_opts(sopts));
-    log.debug("'%s' @%lu PUT %zu cell deltas",
-              name(d),
-              sequence(d),
-              std::distance(begin, end));
-
-    // Commitment
-    throw_on_error
-    {
-        d.d->Write(opts, &batch)
-    };
-}
-
-void
-ircd::db::append(rocksdb::WriteBatch &batch,
-                 const cell::delta &delta)
-{
-    auto &column(std::get<cell *>(delta)->c);
-    append(batch, column, column::delta
-    {
-        std::get<op>(delta),
-        std::get<cell *>(delta)->index,
-        std::get<string_view>(delta)
-    });
+    commit(d, batch, sopts);
 }
 
 template<class pos>
@@ -2075,26 +2043,6 @@ ircd::db::write(column &column,
     };
 }
 
-void
-ircd::db::append(rocksdb::WriteBatch &batch,
-                 column &column,
-                 const column::delta &delta)
-{
-    database::column &c(column);
-    const auto k(slice(std::get<1>(delta)));
-    const auto v(slice(std::get<2>(delta)));
-
-    switch(std::get<0>(delta))
-    {
-        case op::GET:            assert(0);                    break;
-        case op::SET:            batch.Put(c, k, v);           break;
-        case op::MERGE:          batch.Merge(c, k, v);         break;
-        case op::DELETE:         batch.Delete(c, k);           break;
-        case op::DELETE_RANGE:   batch.DeleteRange(c, k, v);   break;
-        case op::SINGLE_DELETE:  batch.SingleDelete(c, k);     break;
-    }
-}
-
 bool
 ircd::db::has(column &column,
               const string_view &key,
@@ -2196,16 +2144,7 @@ ircd::db::column::operator()(const delta *const &begin,
         append(batch, *this, delta);
     });
 
-    auto opts(make_opts(sopts));
-    log.debug("'%s' @%lu PUT %zu column deltas",
-              name(d),
-              sequence(d),
-              std::distance(begin, end));
-
-    throw_on_error
-    {
-        d.d->Write(opts, &batch)
-    };
+    commit(d, batch, sopts);
 }
 
 void
@@ -2489,11 +2428,108 @@ ircd::db::seek(column::const_iterator &it,
 template bool ircd::db::seek<ircd::db::pos>(column::const_iterator &, const pos &);
 template bool ircd::db::seek<ircd::string_view>(column::const_iterator &, const string_view &);
 
+///////////////////////////////////////////////////////////////////////////////
+//
+// writebatch
+//
+
+namespace ircd::db
+{
+    template<size_t SIZE> const char *info(char (&buf)[SIZE], const rocksdb::WriteBatch &);
+}
+
+void
+ircd::db::append(rocksdb::WriteBatch &batch,
+                 const cell::delta &delta)
+{
+    auto &column(std::get<cell *>(delta)->c);
+    append(batch, column, column::delta
+    {
+        std::get<op>(delta),
+        std::get<cell *>(delta)->index,
+        std::get<string_view>(delta)
+    });
+}
+
+void
+ircd::db::append(rocksdb::WriteBatch &batch,
+                 column &column,
+                 const column::delta &delta)
+{
+    database::column &c(column);
+    const auto k(slice(std::get<1>(delta)));
+    const auto v(slice(std::get<2>(delta)));
+
+    switch(std::get<0>(delta))
+    {
+        case op::GET:            assert(0);                    break;
+        case op::SET:            batch.Put(c, k, v);           break;
+        case op::MERGE:          batch.Merge(c, k, v);         break;
+        case op::DELETE:         batch.Delete(c, k);           break;
+        case op::DELETE_RANGE:   batch.DeleteRange(c, k, v);   break;
+        case op::SINGLE_DELETE:  batch.SingleDelete(c, k);     break;
+    }
+}
+
+void
+ircd::db::commit(database &d,
+                 rocksdb::WriteBatch &batch,
+                 const sopts &sopts)
+{
+    const auto opts(make_opts(sopts));
+    commit(d, batch, opts);
+}
+
+void
+ircd::db::commit(database &d,
+                 rocksdb::WriteBatch &batch,
+                 const rocksdb::WriteOptions &opts)
+{
+    char info_buf[96];
+    log.debug("'%s' @%lu COMMIT %s",
+              d.name,
+              sequence(d),
+              info(info_buf, batch));
+
+    throw_on_error
+    {
+        d.d->Write(opts, &batch)
+    };
+}
+
+template<size_t SIZE>
+const char *
+ircd::db::info(char (&buf)[SIZE],
+               const rocksdb::WriteBatch &batch)
+{
+    snprintf(buf, SIZE, "%d deltas; size: %zuB :%s%s%s%s%s%s%s%s%s",
+             batch.Count(),
+             batch.GetDataSize(),
+             batch.HasPut()? " PUT" : "",
+             batch.HasDelete()? " DELETE" : "",
+             batch.HasSingleDelete()? " SINGLE_DELETE" : "",
+             batch.HasDeleteRange()? " DELETE_RANGE" : "",
+             batch.HasMerge()? " MERGE" : "",
+             batch.HasBeginPrepare()? " BEGIN_PREPARE" : "",
+             batch.HasEndPrepare()? " END_PREPARE" : "",
+             batch.HasCommit()? " COMMIT" : "",
+             batch.HasRollback()? " ROLLBACK" : "");
+    return buf;
+}
+
 ///////////////////////////////////////////////////////////////////////////////
 //
 // seek
 //
+
+namespace ircd::db
+{
+    static rocksdb::Iterator &_seek_(rocksdb::Iterator &, const rocksdb::Slice &);
+    static rocksdb::Iterator &_seek_(rocksdb::Iterator &, const string_view &);
+    static rocksdb::Iterator &_seek_(rocksdb::Iterator &, const pos &);
+}
+
 std::unique_ptr<rocksdb::Iterator>
 ircd::db::seek(column &column,
                const string_view &key,
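
The writebatch section added above centralizes what the per-column and per-cell writers used to duplicate: append() translates ircd's delta tuples into batch operations, and commit() logs a one-line summary (via info()) before handing the batch to RocksDB. The Count()/GetDataSize()/Has*() introspection it relies on is plain RocksDB API; a standalone approximation of that summary line, where the function name and output wording are illustrative rather than ircd's:

    #include <cstdio>
    #include <rocksdb/write_batch.h>

    // Rough analogue of the info() helper: report how many deltas a batch
    // carries, its serialized size, and which kinds of operations it contains.
    static void print_batch_summary(const rocksdb::WriteBatch &batch)
    {
        std::printf("%d deltas; size: %zuB :%s%s%s\n",
                    batch.Count(),
                    batch.GetDataSize(),
                    batch.HasPut()? " PUT" : "",
                    batch.HasMerge()? " MERGE" : "",
                    batch.HasDelete()? " DELETE" : "");
    }

    int main()
    {
        rocksdb::WriteBatch batch;
        batch.Put("foo", "1");
        batch.Delete("bar");
        print_batch_summary(batch);   // e.g. "2 deltas; size: <n>B : PUT DELETE"
    }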
@@ -2552,11 +2588,11 @@ ircd::db::seek(database::column &c,
     // Indicate a cache miss and blocking is required.
     if(!it->status().IsIncomplete())
     {
-        log.debug("'%s':'%s' @%lu SEEK valid[%d] CACHE HIT %s",
+        log.debug("'%s':'%s' @%lu SEEK %s CACHE HIT %s",
                   name(d),
                   name(c),
                   sequence(d),
-                  valid(*it),
+                  valid(*it)? "VALID" : "INVALID",
                   it->status().ToString());
 
         return valid(*it);
@@ -2575,8 +2611,9 @@ ircd::db::seek(database::column &c,
     {
         // When the non-blocking iterator hit its cache miss in the middle of an
         // iteration we have to copy its position to the blocking iterator first
-        // and then make the next query. TODO: this can be avoided when 'p' is a
-        // slice and not an increment.
+        // and then make the next query. In other words, two seeks, because the
+        // original seek (p) may be a `pos` and not a key. TODO: this can be
+        // avoided if we detect 'p' is a slice and not an increment.
         if(valid(*it))
             _seek_(*blocking_it, it->key());
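
For context on the cache-hit/miss branches in this function: the non-blocking iterator is opened with RocksDB's block-cache-only read tier, which reports Status::Incomplete() whenever the data is not already cached, and the code then offloads to a normal blocking iterator. A minimal standalone sketch of that probe-then-fallback pattern (database path and key are illustrative):

    #include <cassert>
    #include <memory>
    #include <rocksdb/db.h>

    int main()
    {
        rocksdb::DB *db;
        rocksdb::Options options;
        options.create_if_missing = true;
        const rocksdb::Status open(rocksdb::DB::Open(options, "/tmp/example-db", &db));
        assert(open.ok());

        // Probe: consult only the block cache, never block on disk I/O.
        rocksdb::ReadOptions nonblocking;
        nonblocking.read_tier = rocksdb::kBlockCacheTier;
        std::unique_ptr<rocksdb::Iterator> it(db->NewIterator(nonblocking));
        it->Seek("some_key");

        // Incomplete status == cache miss: fall back to a blocking iterator.
        if(!it->Valid() && it->status().IsIncomplete())
        {
            it.reset(db->NewIterator(rocksdb::ReadOptions()));   // default tier may read from disk
            it->Seek("some_key");
        }

        it.reset();
        delete db;
    }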
@@ -2588,11 +2625,11 @@ ircd::db::seek(database::column &c,
     if(!valid(*blocking_it))
     {
         it.reset(rocksdb::NewErrorIterator(blocking_it->status()));
-        log.debug("'%s':'%s' @%lu SEEK valid[%d] CACHE MISS %s",
+        log.debug("'%s':'%s' @%lu SEEK %s CACHE MISS %s",
                   name(d),
                   name(c),
                   sequence(d),
-                  valid(*it),
+                  valid(*it)? "VALID" : "INVALID",
                   it->status().ToString());
 
         return false;
@@ -2603,21 +2640,22 @@ ircd::db::seek(database::column &c,
     // seems to be forcing us to recreate the iterator after it failed with the
     // status IsIncomplete(). Regardless of reuse, a non-blocking seek must occur
     // to match this iterator with the result -- such a seek may fail again if
-    // the cache has been hosed between the point the offload took place and this
-    // seek! To properly handle this case we reenter this seek() function and
-    // enjoy the safety of offloading again for this edge case.
+    // the blocking_it's data has been evicted from cache between the point the
+    // offload took place and the seek for the user's iterator. That may be
+    // impossible. But if it ever becomes possible, we reenter this seek()
+    // function and enjoy the safety of offloading to try again.
     it.reset(nullptr);
-    log.debug("'%s':'%s' @%lu SEEK valid[%d] CACHE MISS %s",
+    log.debug("'%s':'%s' @%lu SEEK %s CACHE MISS %s",
               name(d),
               name(c),
               sequence(d),
-              valid(*blocking_it),
+              valid(*blocking_it)? "VALID" : "INVALID",
               blocking_it->status().ToString());
 
     return seek(c, blocking_it->key(), opts, it);
 }
 
-void
+rocksdb::Iterator &
 ircd::db::_seek_(rocksdb::Iterator &it,
                  const pos &p)
 {
@@ -2637,20 +2675,23 @@ ircd::db::_seek_(rocksdb::Iterator &it,
             break;
         }
     }
+
+    return it;
 }
 
-void
+rocksdb::Iterator &
 ircd::db::_seek_(rocksdb::Iterator &it,
                  const string_view &sv)
 {
-    _seek_(it, slice(sv));
+    return _seek_(it, slice(sv));
 }
 
-void
+rocksdb::Iterator &
 ircd::db::_seek_(rocksdb::Iterator &it,
                  const rocksdb::Slice &sk)
 {
     it.Seek(sk);
+    return it;
 }
 
 ///////////////////////////////////////////////////////////////////////////////
@@ -2960,12 +3001,24 @@ ircd::db::valid_or_throw(const rocksdb::Iterator &it)
 bool
 ircd::db::operator!(const rocksdb::Iterator &it)
 {
-    return !it.Valid();
+    return !valid(it);
 }
 
 bool
 ircd::db::valid(const rocksdb::Iterator &it)
 {
+    switch(it.status().code())
+    {
+        using rocksdb::Status;
+
+        case Status::kOk:         break;
+        case Status::kNotFound:   break;
+        case Status::kIncomplete: break;
+        default:
+            throw_on_error(it.status());
+            __builtin_unreachable();
+    }
+
     return it.Valid();
 }
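
The new valid() treats Ok, NotFound and Incomplete as ordinary "iterator is not positioned" outcomes, while any other status (corruption, I/O error, and so on) is converted into an exception by throw_on_error rather than being reported as a merely invalid iterator. A standalone analogue of that behaviour, substituting std::runtime_error for ircd's throw_on_error and using an illustrative function name:

    #include <stdexcept>
    #include <rocksdb/iterator.h>
    #include <rocksdb/status.h>

    // Benign statuses answer the validity question; anything else throws.
    static bool valid_or_throw_error(const rocksdb::Iterator &it)
    {
        const rocksdb::Status s(it.status());
        switch(s.code())
        {
            case rocksdb::Status::kOk:
            case rocksdb::Status::kNotFound:
            case rocksdb::Status::kIncomplete:
                return it.Valid();

            default:
                throw std::runtime_error(s.ToString());
        }
    }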