0
0
Fork 0
mirror of https://github.com/matrix-construct/construct synced 2025-01-14 00:34:18 +01:00

ircd::json::stack: Add checkpoint device.

This commit is contained in:
Jason Volk 2019-02-24 11:20:26 -08:00
parent ea4912d09a
commit 940975b448
2 changed files with 115 additions and 0 deletions

View file

@ -47,11 +47,13 @@ struct ircd::json::stack
struct member;
struct chase;
struct const_chase;
struct checkpoint;
using flush_callback = std::function<const_buffer (const const_buffer &)>;
window_buffer buf;
flush_callback flusher;
std::exception_ptr eptr;
checkpoint *cp {nullptr};
size_t hiwat; ///< autoflush watermark
size_t lowat; ///< flush(false) call min watermark
@ -231,6 +233,25 @@ struct ircd::json::stack::const_chase
const_chase() = default;
};
struct ircd::json::stack::checkpoint
{
stack *s {nullptr};
checkpoint *pc {nullptr};
size_t point {0};
size_t vc {0};
bool committed {true};
public:
bool committing() const;
bool recommit();
bool rollback();
checkpoint(stack &s);
checkpoint(checkpoint &&) = delete;
checkpoint(const checkpoint &) = delete;
~checkpoint() noexcept;
};
template<class... T>
ircd::json::stack::member::member(stack &s,
const string_view &name,

View file

@ -435,11 +435,13 @@ noexcept
:buf{std::move(other.buf)}
,flusher{std::move(other.flusher)}
,eptr{std::move(other.eptr)}
,cp{std::move(other.cp)}
,hiwat{std::move(other.hiwat)}
,lowat{std::move(other.lowat)}
,co{std::move(other.co)}
,ca{std::move(other.ca)}
{
other.cp = nullptr;
other.co = nullptr;
other.ca = nullptr;
}
@ -539,6 +541,12 @@ noexcept try
if(!force && buf.consumed() < lowat)
return false;
if(!force && cp)
return false;
if(cp)
cp = nullptr;
// The user returns the portion of the buffer they were able to flush
// rather than forcing them to wait on their sink to flush the whole
// thing, they can continue with us for a little while more.
@ -1161,6 +1169,92 @@ ircd::json::stack::member::_post_append()
vc |= true;
}
//
// stack::checkpoint
//
ircd::json::stack::checkpoint::checkpoint(stack &s)
:s{&s}
,pc{s.cp}
,point
{
s.buf.consumed()
}
,vc{[&s]
{
const chase top
{
s, true
};
return
top.o?
top.o->mc:
top.a?
top.a->vc:
top.m?
top.m->vc:
0;
}()}
{
s.cp = this;
}
ircd::json::stack::checkpoint::~checkpoint()
noexcept
{
if(!s)
return;
if(!s->cp)
return;
if(std::uncaught_exceptions())
rollback();
if(!committing())
{
assert(point <= s->buf.consumed());
s->buf.rewind(s->buf.consumed() - point);
const chase top
{
*s, true
};
if(top.o)
top.o->mc = vc;
else if(top.a)
top.a->vc = vc;
else if(top.m)
top.m->vc = vc;
}
assert(s->cp == this);
s->cp = pc;
}
bool
ircd::json::stack::checkpoint::rollback()
{
committed = false;
return true;
}
bool
ircd::json::stack::checkpoint::recommit()
{
committed = true;
return true;
}
bool
ircd::json::stack::checkpoint::committing()
const
{
return committed;
}
//
// chase
//