0
0
Fork 0
mirror of https://github.com/matrix-construct/construct synced 2024-12-27 07:54:05 +01:00

ircd::db: Remove offload shenanigans.

This commit is contained in:
Jason Volk 2018-08-16 05:06:26 -07:00
parent 31ff4f68cb
commit 850fd0238a

View file

@ -5620,7 +5620,6 @@ namespace ircd::db
static rocksdb::Iterator &_seek_(rocksdb::Iterator &, const string_view &);
static rocksdb::Iterator &_seek_lower_(rocksdb::Iterator &, const string_view &);
static rocksdb::Iterator &_seek_upper_(rocksdb::Iterator &, const string_view &);
static std::unique_ptr<rocksdb::Iterator> _seek_offload(database::column &c, const rocksdb::ReadOptions &opts, const std::function<void (rocksdb::Iterator &)> &closure);
bool _seek(database::column &, const pos &, const rocksdb::ReadOptions &, std::unique_ptr<rocksdb::Iterator> &it);
bool _seek(database::column &, const string_view &, const rocksdb::ReadOptions &, std::unique_ptr<rocksdb::Iterator> &it);
}
@ -5660,8 +5659,6 @@ ircd::db::seek(database::column &c,
const rocksdb::ReadOptions &opts,
std::unique_ptr<rocksdb::Iterator> &it)
{
assert(opts.read_tier == NON_BLOCKING);
if(!it)
{
database &d(*c.d);
@ -5672,12 +5669,6 @@ ircd::db::seek(database::column &c,
return _seek(c, p, opts, it);
}
//
// Seek with offload-safety in case of blocking IO.
//
// The options for an iterator cannot be changed after the iterator is created.
// This slightly complicates our toggling between blocking and non-blocking queries.
//
bool
ircd::db::_seek(database::column &c,
const string_view &p,
@ -5687,82 +5678,24 @@ ircd::db::_seek(database::column &c,
database &d(*c.d);
const ircd::timer timer;
// Start with a non-blocking query.
_seek_(*it, p);
// Branch for query being fulfilled from cache
if(!it->status().IsIncomplete())
{
#ifdef RB_DEBUG_DB_SEEK
log::debug
{
log, "'%s' %lu:%lu SEEK %s %s in %ld$us '%s'",
log, "'%s' %lu:%lu SEEK %s in %ld$us '%s'",
name(d),
sequence(d),
sequence(opts.snapshot),
it->status().ToString(),
valid(*it)? "VALID" : "INVALID",
timer.at<microseconds>().count(),
name(c)
};
#endif
return valid(*it);
}
const auto blocking_it
{
_seek_offload(c, opts, [&p](rocksdb::Iterator &blocking_it)
{
_seek_(blocking_it, p);
})
};
// When the blocking iterator comes back invalid the result is propagated
if(!valid(*blocking_it))
{
it.reset(rocksdb::NewErrorIterator(blocking_it->status()));
#ifdef RB_DEBUG_DB_SEEK
log::debug
{
log, "'%s' %lu:%lu SEEK %s INVALID CACHE MISS in %ld$us '%s'",
name(d),
sequence(d),
sequence(opts.snapshot),
it->status().ToString(),
timer.at<microseconds>().count(),
name(c)
};
#endif
return false;
}
it.reset(nullptr);
#ifdef RB_DEBUG_DB_SEEK
log::debug
{
log, "'%s' %lu:%lu SEEK %s VALID CACHE MISS in %ld$us '%s'",
name(d),
sequence(d),
sequence(opts.snapshot),
blocking_it->status().ToString(),
timer.at<microseconds>().count(),
name(c)
};
#endif
return seek(c, slice(blocking_it->key()), opts, it);
}
//
// Seek with offload-safety in case of blocking IO.
//
// The options for an iterator cannot be changed after the iterator is created.
// This slightly complicates our toggling between blocking and non-blocking queries.
//
bool
ircd::db::_seek(database::column &c,
const pos &p,
@ -5776,140 +5709,24 @@ ircd::db::_seek(database::column &c,
valid(*it)
};
const size_t it_key_size
{
valid_it? size(slice(it->key())) : 0
};
if(unlikely(it_key_size > 1024))
throw error
{
"Iteration with key size %zu not supported atm", it_key_size
};
char it_key_buf[it_key_size];
const mutable_buffer it_key_mb
{
it_key_buf, it_key_size
};
// Unfortunately we have to copy the key unless we use a pinned iterator.
// In the future this offloading system should disappear when the env impl
// is completed to use AIO. Right now the majority case for this copy is
// only an 8 byte key...
const auto it_key
{
valid_it?
string_view{it_key_buf, copy(it_key_mb, slice(it->key()))}:
string_view{}
};
// Start with a non-blocking query. This used to not invalidate the
// iterator when it failed with a cache miss (incomplete), but now it
// does (hence the copy above).
_seek_(*it, p);
// Branch for query being fulfilled from cache
if(!it->status().IsIncomplete())
{
#ifdef RB_DEBUG_DB_SEEK
log::debug
{
log, "'%s' %lu:%lu SEEK[%s] %s %s -> %s in %ld$us '%s'",
log, "'%s' %lu:%lu SEEK[%s] %s -> %s in %ld$us '%s'",
name(d),
sequence(d),
sequence(opts.snapshot),
reflect(p),
it->status().ToString(),
valid_it? "VALID" : "INVALID",
valid(*it)? "VALID" : "INVALID",
it->status().ToString(),
timer.at<microseconds>().count(),
name(c)
};
#endif
return valid(*it);
}
const auto blocking_it
{
_seek_offload(c, opts, [&valid_it, &it_key, &p]
(rocksdb::Iterator &blocking_it)
{
if(valid_it)
_seek_(blocking_it, it_key);
_seek_(blocking_it, p);
})
};
// When the blocking iterator comes back invalid the result is propagated
if(!valid(*blocking_it))
{
it.reset(rocksdb::NewErrorIterator(blocking_it->status()));
#ifdef RB_DEBUG_DB_SEEK
log::debug
{
log, "'%s' %lu:%lu SEEK[%s] %s %s -> %s|INVALID CACHE MISS in %ld$us '%s'",
name(d),
sequence(d),
sequence(opts.snapshot),
reflect(p),
it->status().ToString(),
valid_it? "VALID" : "INVALID",
valid(*it)? "VALID" : "INVALID",
timer.at<microseconds>().count(),
name(c)
};
#endif
return false;
}
it.reset(nullptr);
#ifdef RB_DEBUG_DB_SEEK
log::debug
{
log, "'%s' %lu:%lu SEEK[%s] %s %s -> VALID CACHE MISS in %ld$us '%s'",
name(d),
sequence(d),
sequence(opts.snapshot),
reflect(p),
blocking_it->status().ToString(),
valid_it? "VALID" : "INVALID",
timer.at<microseconds>().count(),
name(c)
};
#endif
return seek(c, slice(blocking_it->key()), opts, it);
}
/// DB cache miss: create a blocking iterator and offload it.
std::unique_ptr<rocksdb::Iterator>
ircd::db::_seek_offload(database::column &c,
const rocksdb::ReadOptions &opts,
const std::function<void (rocksdb::Iterator &)> &closure)
{
database &d(*c.d);
rocksdb::ColumnFamilyHandle *const &cf(c);
rocksdb::ReadOptions blocking_opts(opts);
blocking_opts.fill_cache = true;
blocking_opts.read_tier = BLOCKING;
std::unique_ptr<rocksdb::Iterator> blocking_it
{
d.d->NewIterator(blocking_opts, cf)
};
const auto function{[&closure, &blocking_it]
{
closure(*blocking_it);
}};
ctx::offload(function);
return blocking_it;
}
/// Seek to entry NOT GREATER THAN key. That is, equal to or less than key
@ -6336,7 +6153,7 @@ ircd::db::make_opts(const gopts &opts)
{
rocksdb::ReadOptions ret;
assert(ret.fill_cache);
ret.read_tier = NON_BLOCKING;
ret.read_tier = BLOCKING;
// slice* for exclusive upper bound. when prefixes are used this value must
// have the same prefix because ordering is not guaranteed between prefixes