diff --git a/include/ircd/db.h b/include/ircd/db.h index 9612b9e29..9381aa6a1 100644 --- a/include/ircd/db.h +++ b/include/ircd/db.h @@ -79,6 +79,7 @@ namespace rocksdb struct Iterator; struct ColumnFamilyHandle; struct Snapshot; + struct WriteBatch; } // diff --git a/ircd/db.cc b/ircd/db.cc index bf28027b0..2459698e3 100644 --- a/ircd/db.cc +++ b/ircd/db.cc @@ -606,16 +606,7 @@ ircd::db::database::operator()(const sopts &sopts, }); }); - auto opts(make_opts(sopts)); - log.debug("'%s' @%lu PUT %zu column deltas", - name, - sequence(*this), - std::distance(begin, end)); - - throw_on_error - { - d->Write(opts, &batch) - }; + commit(*this, batch, sopts); } ircd::db::database::column & @@ -1360,30 +1351,7 @@ ircd::db::write(const cell::delta *const &begin, append(batch, delta); }); - auto opts(make_opts(sopts)); - log.debug("'%s' @%lu PUT %zu cell deltas", - name(d), - sequence(d), - std::distance(begin, end)); - - // Commitment - throw_on_error - { - d.d->Write(opts, &batch) - }; -} - -void -ircd::db::append(rocksdb::WriteBatch &batch, - const cell::delta &delta) -{ - auto &column(std::get(delta)->c); - append(batch, column, column::delta - { - std::get(delta), - std::get(delta)->index, - std::get(delta) - }); + commit(d, batch, sopts); } template @@ -2075,26 +2043,6 @@ ircd::db::write(column &column, }; } -void -ircd::db::append(rocksdb::WriteBatch &batch, - column &column, - const column::delta &delta) -{ - database::column &c(column); - - const auto k(slice(std::get<1>(delta))); - const auto v(slice(std::get<2>(delta))); - switch(std::get<0>(delta)) - { - case op::GET: assert(0); break; - case op::SET: batch.Put(c, k, v); break; - case op::MERGE: batch.Merge(c, k, v); break; - case op::DELETE: batch.Delete(c, k); break; - case op::DELETE_RANGE: batch.DeleteRange(c, k, v); break; - case op::SINGLE_DELETE: batch.SingleDelete(c, k); break; - } -} - bool ircd::db::has(column &column, const string_view &key, @@ -2196,16 +2144,7 @@ ircd::db::column::operator()(const delta *const &begin, append(batch, *this, delta); }); - auto opts(make_opts(sopts)); - log.debug("'%s' @%lu PUT %zu column deltas", - name(d), - sequence(d), - std::distance(begin, end)); - - throw_on_error - { - d.d->Write(opts, &batch) - }; + commit(d, batch, sopts); } void @@ -2489,11 +2428,108 @@ ircd::db::seek(column::const_iterator &it, template bool ircd::db::seek(column::const_iterator &, const pos &); template bool ircd::db::seek(column::const_iterator &, const string_view &); +/////////////////////////////////////////////////////////////////////////////// +// +// writebatch +// + +namespace ircd::db +{ + template const char *info(char (&buf)[SIZE], const rocksdb::WriteBatch &); +} + +void +ircd::db::append(rocksdb::WriteBatch &batch, + const cell::delta &delta) +{ + auto &column(std::get(delta)->c); + append(batch, column, column::delta + { + std::get(delta), + std::get(delta)->index, + std::get(delta) + }); +} + +void +ircd::db::append(rocksdb::WriteBatch &batch, + column &column, + const column::delta &delta) +{ + database::column &c(column); + + const auto k(slice(std::get<1>(delta))); + const auto v(slice(std::get<2>(delta))); + switch(std::get<0>(delta)) + { + case op::GET: assert(0); break; + case op::SET: batch.Put(c, k, v); break; + case op::MERGE: batch.Merge(c, k, v); break; + case op::DELETE: batch.Delete(c, k); break; + case op::DELETE_RANGE: batch.DeleteRange(c, k, v); break; + case op::SINGLE_DELETE: batch.SingleDelete(c, k); break; + } +} + +void +ircd::db::commit(database &d, + rocksdb::WriteBatch &batch, + const sopts &sopts) +{ + const auto opts(make_opts(sopts)); + commit(d, batch, opts); +} + +void +ircd::db::commit(database &d, + rocksdb::WriteBatch &batch, + const rocksdb::WriteOptions &opts) +{ + char info_buf[96]; + log.debug("'%s' @%lu COMMIT %s", + d.name, + sequence(d), + info(info_buf, batch)); + + throw_on_error + { + d.d->Write(opts, &batch) + }; +} + +template +const char * +ircd::db::info(char (&buf)[SIZE], + const rocksdb::WriteBatch &batch) +{ + snprintf(buf, SIZE, "%d deltas; size: %zuB :%s%s%s%s%s%s%s%s%s", + batch.Count(), + batch.GetDataSize(), + batch.HasPut()? " PUT" : "", + batch.HasDelete()? " DELETE" : "", + batch.HasSingleDelete()? " SINGLE_DELETE" : "", + batch.HasDeleteRange()? " DELETE_RANGE" : "", + batch.HasMerge()? " MERGE" : "", + batch.HasBeginPrepare()? " BEGIN_PREPARE" : "", + batch.HasEndPrepare()? " END_PREPARE" : "", + batch.HasCommit()? " COMMIT" : "", + batch.HasRollback()? " ROLLBACK" : ""); + + return buf; +} + /////////////////////////////////////////////////////////////////////////////// // // seek // +namespace ircd::db +{ + static rocksdb::Iterator &_seek_(rocksdb::Iterator &, const rocksdb::Slice &); + static rocksdb::Iterator &_seek_(rocksdb::Iterator &, const string_view &); + static rocksdb::Iterator &_seek_(rocksdb::Iterator &, const pos &); +} + std::unique_ptr ircd::db::seek(column &column, const string_view &key, @@ -2552,11 +2588,11 @@ ircd::db::seek(database::column &c, // Indicate a cache miss and blocking is required. if(!it->status().IsIncomplete()) { - log.debug("'%s':'%s' @%lu SEEK valid[%d] CACHE HIT %s", + log.debug("'%s':'%s' @%lu SEEK %s CACHE HIT %s", name(d), name(c), sequence(d), - valid(*it), + valid(*it)? "VALID" : "INVALID", it->status().ToString()); return valid(*it); @@ -2575,8 +2611,9 @@ ircd::db::seek(database::column &c, { // When the non-blocking iterator hit its cache miss in the middle of an // iteration we have to copy its position to the blocking iterator first - // and then make the next query. TODO: this can be avoided when 'p' is a - // slice and not an increment. + // and then make the next query. In other words, two seeks, because the + // original seek (p) may be a `pos` and not a key. TODO: this can be + // avoided if we detect 'p' is a slice and not an increment. if(valid(*it)) _seek_(*blocking_it, it->key()); @@ -2588,11 +2625,11 @@ ircd::db::seek(database::column &c, if(!valid(*blocking_it)) { it.reset(rocksdb::NewErrorIterator(blocking_it->status())); - log.debug("'%s':'%s' @%lu SEEK valid[%d] CACHE MISS %s", + log.debug("'%s':'%s' @%lu SEEK %s CACHE MISS %s", name(d), name(c), sequence(d), - valid(*it), + valid(*it)? "VALID" : "INVALID", it->status().ToString()); return false; @@ -2603,21 +2640,22 @@ ircd::db::seek(database::column &c, // seems to be forcing us to recreate the iterator after it failed with the // status IsIncomplete(). Regardless of reuse, a non-blocking seek must occur // to match this iterator with the result -- such a seek may fail again if - // the cache has been hosed between the point the offload took place and this - // seek! To properly handle this case we reenter this seek() function and - // enjoy the safety of offloading again for this edge case. + // the blocking_it's data has been evicted from cache between the point the + // offload took place and the seek for the user's iterator. That may be + // impossible. But if it ever becomes possible, we reenter this seek() + // function and enjoy the safety of offloading to try again. it.reset(nullptr); - log.debug("'%s':'%s' @%lu SEEK valid[%d] CACHE MISS %s", + log.debug("'%s':'%s' @%lu SEEK %s CACHE MISS %s", name(d), name(c), sequence(d), - valid(*blocking_it), + valid(*blocking_it)? "VALID" : "INVALID", blocking_it->status().ToString()); return seek(c, blocking_it->key(), opts, it); } -void +rocksdb::Iterator & ircd::db::_seek_(rocksdb::Iterator &it, const pos &p) { @@ -2637,20 +2675,23 @@ ircd::db::_seek_(rocksdb::Iterator &it, break; } } + + return it; } -void +rocksdb::Iterator & ircd::db::_seek_(rocksdb::Iterator &it, const string_view &sv) { - _seek_(it, slice(sv)); + return _seek_(it, slice(sv)); } -void +rocksdb::Iterator & ircd::db::_seek_(rocksdb::Iterator &it, const rocksdb::Slice &sk) { it.Seek(sk); + return it; } /////////////////////////////////////////////////////////////////////////////// @@ -2960,12 +3001,24 @@ ircd::db::valid_or_throw(const rocksdb::Iterator &it) bool ircd::db::operator!(const rocksdb::Iterator &it) { - return !it.Valid(); + return !valid(it); } bool ircd::db::valid(const rocksdb::Iterator &it) { + switch(it.status().code()) + { + using rocksdb::Status; + + case Status::kOk: break; + case Status::kNotFound: break; + case Status::kIncomplete: break; + default: + throw_on_error(it.status()); + __builtin_unreachable(); + } + return it.Valid(); }