mirror of
https://github.com/matrix-construct/construct
synced 2024-11-25 16:22:35 +01:00
ircd::db: Enrich seek(row) opts; add proper error handling and propagation.
This commit is contained in:
parent
b035bb9824
commit
c1c11b4aed
3 changed files with 68 additions and 19 deletions
|
@ -50,6 +50,9 @@ enum class ircd::db::get
|
|||
NO_CHECKSUM = 0x0020, // Integrity of data will not be checked (overrides conf).
|
||||
PREFIX = 0x0040, // (prefix_same_as_start); automatic for index columns with pfx
|
||||
ORDERED = 0x0080, // (total_order_seek); relevant to index columns
|
||||
NO_PARALLEL = 0x0100, // Don't submit requests in parallel (relevant to db::row)
|
||||
NO_THROW = 0x0200, // Suppress exceptions if possible.
|
||||
THROW = 0x0400, // Throw exceptions more than usual.
|
||||
};
|
||||
|
||||
template<class T>
|
||||
|
|
|
@ -60,7 +60,7 @@ struct ircd::db::row
|
|||
gopts opts = {})
|
||||
__attribute__((stack_protect));
|
||||
|
||||
friend size_t seek(row &, const string_view &);
|
||||
friend size_t seek(row &, const string_view &, const gopts &opts = {});
|
||||
};
|
||||
|
||||
namespace ircd::db
|
||||
|
|
82
ircd/db.cc
82
ircd/db.cc
|
@ -9005,58 +9005,104 @@ ircd::db::write(const row::delta *const &begin,
|
|||
|
||||
size_t
|
||||
ircd::db::seek(row &r,
|
||||
const string_view &key)
|
||||
const string_view &key,
|
||||
const gopts &opts)
|
||||
{
|
||||
// This frame can't be interrupted because it may have requests
|
||||
// pending in the request pool which must synchronize back here.
|
||||
const ctx::uninterruptible ui;
|
||||
|
||||
#ifdef RB_DEBUG_DB_SEEK
|
||||
const ircd::timer timer;
|
||||
#endif
|
||||
|
||||
// This frame can't be interrupted because it may have requests
|
||||
// pending in the request pool which must synchronize back here.
|
||||
const ctx::uninterruptible ui;
|
||||
|
||||
size_t ret{0};
|
||||
std::exception_ptr eptr;
|
||||
ctx::latch latch{r.size()};
|
||||
const auto closure{[&latch, &ret, &key]
|
||||
(auto &cell)
|
||||
const auto closure{[&opts, &latch, &ret, &key, &eptr]
|
||||
(auto &cell) noexcept
|
||||
{
|
||||
ret += bool(seek(cell, key));
|
||||
// If there's a pending error from another cell by the time this
|
||||
// closure is executed we don't perform the seek() unless the user
|
||||
// specifies db::get::NO_THROW to suppress it.
|
||||
if(!eptr || test(opts, get::NO_THROW)) try
|
||||
{
|
||||
if(!seek(cell, key))
|
||||
{
|
||||
// If the cell is not_found that's not a thrown exception here;
|
||||
// the cell will just be !valid(). The user can specify
|
||||
// get::THROW to propagate a not_found from the seek(row);
|
||||
if(test(opts, get::THROW))
|
||||
throw not_found
|
||||
{
|
||||
"column '%s' key '%s'", cell.col(), key
|
||||
};
|
||||
}
|
||||
else ++ret;
|
||||
}
|
||||
catch(const not_found &e)
|
||||
{
|
||||
eptr = std::current_exception();
|
||||
}
|
||||
catch(const std::exception &e)
|
||||
{
|
||||
log::error
|
||||
{
|
||||
log, "row seek: column '%s' key '%s' :%s",
|
||||
cell.col(),
|
||||
key,
|
||||
e.what()
|
||||
};
|
||||
|
||||
eptr = std::make_exception_ptr(e);
|
||||
}
|
||||
|
||||
// The latch must always be hit here. No exception should propagate
|
||||
// to prevent this from being reached or beyond.
|
||||
latch.count_down();
|
||||
}};
|
||||
|
||||
// Submit all the requests
|
||||
for(auto &cell : r)
|
||||
{
|
||||
db::column &column(cell);
|
||||
const auto reclosure{[&closure, &cell]
|
||||
{
|
||||
closure(cell);
|
||||
}};
|
||||
|
||||
//TODO: should check a bloom filter on the cache for this branch
|
||||
//TODO: because right now double-querying the cache is gross.
|
||||
if(!exists(cache(column), key))
|
||||
request([&closure, &cell]
|
||||
{
|
||||
closure(cell);
|
||||
});
|
||||
if(!test(opts, get::NO_PARALLEL) && !exists(cache(column), key))
|
||||
request(reclosure);
|
||||
else
|
||||
closure(cell);
|
||||
reclosure();
|
||||
}
|
||||
|
||||
// Wait for responses.
|
||||
latch.wait();
|
||||
assert(ret <= r.size());
|
||||
|
||||
#ifdef RB_DEBUG_DB_SEEK
|
||||
const column &c(r[0]);
|
||||
const database &d(c);
|
||||
log::debug
|
||||
{
|
||||
log, "'%s' %lu:%lu '%s' row SEEK KEY %zu of %zu in %ld$us",
|
||||
log, "'%s' %lu:%lu '%s' row SEEK KEY %zu of %zu in %ld$us %s",
|
||||
name(d),
|
||||
sequence(d),
|
||||
sequence(r[0]),
|
||||
name(c),
|
||||
ret,
|
||||
r.size(),
|
||||
timer.at<microseconds>().count()
|
||||
timer.at<microseconds>().count(),
|
||||
what(eptr)
|
||||
};
|
||||
#endif
|
||||
|
||||
assert(ret <= r.size());
|
||||
if(eptr && !test(opts, get::NO_THROW))
|
||||
std::rethrow_exception(eptr);
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
@ -9144,7 +9190,7 @@ ircd::db::row::row(database &d,
|
|||
}
|
||||
|
||||
if(key)
|
||||
seek(*this, key);
|
||||
seek(*this, key, opts);
|
||||
}
|
||||
#pragma GCC diagnostic pop
|
||||
|
||||
|
|
Loading…
Reference in a new issue