0
0
Fork 0
mirror of https://github.com/matrix-construct/construct synced 2024-12-26 07:23:53 +01:00

ircd::server: Add dynamic chunk vectoring.

This commit is contained in:
Jason Volk 2018-04-25 18:18:09 -07:00
parent 4d3999b7b8
commit e2568457f4
2 changed files with 332 additions and 19 deletions

View file

@ -30,6 +30,7 @@ struct ircd::server::tag
{
size_t written {0};
size_t head_read {0}; // includes head terminator
size_t head_rem {0}; // how much of head buf wasn't used.
size_t content_read {0}; // total content read after head
size_t content_length {0}; // fixed; or grows monotonic for chunked enc
size_t chunk_read {0}; // content read after last chunk head
@ -50,12 +51,17 @@ struct ircd::server::tag
size_t content_overflow() const;
size_t content_remaining() const;
mutable_buffer make_read_discard_buffer() const;
mutable_buffer make_read_chunk_dynamic_content_buffer() const;
mutable_buffer make_read_chunk_dynamic_head_buffer() const;
mutable_buffer make_read_chunk_content_buffer() const;
mutable_buffer make_read_chunk_head_buffer() const;
mutable_buffer make_read_content_buffer() const;
mutable_buffer make_read_head_buffer() const;
const_buffer read_chunk_dynamic_content(const const_buffer &, bool &done);
const_buffer read_chunk_dynamic_head(const const_buffer &, bool &done);
const_buffer read_chunk_content(const const_buffer &, bool &done);
const_buffer read_chunk_head(const const_buffer &, bool &done);
const_buffer read_content(const const_buffer &, bool &done);

View file

@ -1932,7 +1932,7 @@ noexcept
// If the content is chunked encoding and the tag is in the phase of
// receiving the chunk head we have to copy what's been received of that
// head so far so the grammar can parse a coherent head to continue.
if(tag.state.chunk_length == size_t(-1))
if(tag.state.chunk_length == size_t(-1) && !null(request.in.content))
{
const const_buffer src
{
@ -1947,6 +1947,17 @@ noexcept
copy(dst, src);
}
// Moving the dynamic buffer should have no real effect because the
// cancellation buffer already took over for it. We could do it anyway
// to prevent regressions but at the cost of maintaining twice the memory
// allocated. For now it's commented to let it die with the user's req.
//tag.request->in.dynamic = std::move(request.in.dynamic);
// Moving the chunk vector is important to maintain the state of dynamic
// chunk transfers through this cancel. There is no condition here for if
// this is not a dynamic chunk transfer because it's trivial.
tag.request->in.chunks = std::move(request.in.chunks);
}
void
@ -2134,9 +2145,15 @@ ircd::server::tag::read_buffer(const const_buffer &buffer,
if(state.status == (http::code)0)
return read_head(buffer, done, link);
if(state.chunk_length == size_t(-1) && null(request->in.content))
return read_chunk_dynamic_head(buffer, done);
if(state.chunk_length == size_t(-1))
return read_chunk_head(buffer, done);
if(state.chunk_length && null(request->in.content))
return read_chunk_dynamic_content(buffer, done);
if(state.chunk_length)
return read_chunk_content(buffer, done);
@ -2209,6 +2226,7 @@ ircd::server::tag::read_head(const const_buffer &buffer,
// Resize the user's head buffer tight to the head; this is how we convey
// the size of the dome back to the user.
state.head_rem = size(req.in.head) - head_read;
req.in.head = mutable_buffer
{
data(req.in.head), head_read
@ -2256,29 +2274,19 @@ ircd::server::tag::read_head(const const_buffer &buffer,
req.in.chunks.reserve(req.opt->chunks_reserve);
}
///TODO: XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX
if(dynamic)
{
assert(req.opt);
const size_t alloc_size
{
40_MiB
};
req.in.dynamic = unique_buffer<mutable_buffer>{alloc_size};
req.in.content = req.in.dynamic;
}
const const_buffer chunk
{
data(req.in.content), move(req.in.content, beyond_head)
!dynamic?
const_buffer{data(req.in.content), move(req.in.content, beyond_head)}:
beyond_head
};
state.chunk_length = -1;
const const_buffer overrun
{
read_chunk_head(chunk, done)
!dynamic?
read_chunk_head(chunk, done):
read_chunk_dynamic_head(chunk, done)
};
assert(empty(overrun) || done == true);
@ -2570,6 +2578,217 @@ ircd::server::tag::read_chunk_content(const const_buffer &buffer,
return {};
}
ircd::const_buffer
ircd::server::tag::read_chunk_dynamic_head(const const_buffer &buffer,
bool &done)
{
assert(request);
auto &req{*request};
// informal search for head terminator
static const string_view terminator{"\r\n"};
const auto pos
{
string_view{buffer}.find(terminator)
};
if(pos == string_view::npos)
{
state.chunk_read += size(buffer);
state.content_read += size(buffer);
return {};
}
// This indicates how much head was just received from this buffer only.
const size_t addl_head_bytes
{
pos + size(terminator)
};
// The received buffer may go past the end of the head.
assert(addl_head_bytes <= size(buffer));
const size_t beyond_head_length
{
size(buffer) - addl_head_bytes
};
state.chunk_read += addl_head_bytes;
const auto head_length{state.chunk_read};
state.chunk_read = 0;
// Window on any data in the buffer after the head.
const const_buffer beyond_head
{
data(buffer) + addl_head_bytes, beyond_head_length
};
// Setup the capstan and mark the end of the tape
parse::buffer pb
{
mutable_buffer
{
data(req.in.head) + state.head_read, head_length
}
};
parse::capstan pc{pb};
pc.read += head_length;
// Play the tape through the formal grammar.
const http::response::chunk chunk{pc};
assert(state.chunk_length == size_t(-1));
state.chunk_length = chunk.size + size(terminator);
// Increment the content_length to now include this chunk
state.content_length += state.chunk_length;
// Allocate the chunk content on the vector.
//TODO: maxalloc
req.in.chunks.emplace_back(state.chunk_length);
// Now we check how much chunk was received beyond the head
// state.chunk_head is still 0 here because that's only incremented
// in the content read function.
const auto &chunk_read
{
std::min(state.chunk_length, beyond_head_length)
};
// Now we know how much bleed into the next message was also received
assert(beyond_head_length >= chunk_read);
const size_t beyond_chunk_length
{
beyond_head_length - chunk_read
};
const const_buffer partial_chunk
{
data(beyond_head), chunk_read
};
const size_t copied
{
copy(req.in.chunks.back(), partial_chunk)
};
const const_buffer overrun
{
data(beyond_head) + chunk_read, beyond_chunk_length
};
assert(state.chunk_length >= 2);
read_chunk_dynamic_content(partial_chunk, done);
if(done)
return overrun;
return read_chunk_dynamic_head(overrun, done); // gobble gobbles
}
ircd::const_buffer
ircd::server::tag::read_chunk_dynamic_content(const const_buffer &buffer,
bool &done)
{
assert(request);
auto &req{*request};
assert(state.chunk_length != size_t(-1));
assert(null(req.in.content));
assert(!req.in.chunks.empty());
const auto &chunk
{
req.in.chunks.back()
};
// The amount of remaining content for the response sequence
assert(state.chunk_read <= size(chunk));
const size_t remaining
{
size(chunk) - state.chunk_read
};
// The amount of content read in this buffer only.
const size_t addl_content_read
{
std::min(size(buffer), remaining)
};
// Increment the read counters for this chunk and all chunks.
state.chunk_read += addl_content_read;
state.content_read += addl_content_read;
assert(state.chunk_read <= state.content_read);
if(state.chunk_read == state.chunk_length)
{
static const string_view terminator{"\r\n"};
state.content_length -= size(terminator);
state.content_read -= size(terminator);
assert(state.chunk_length >= 2);
assert(state.chunk_read == state.chunk_length);
state.chunk_length -= size(terminator);
state.chunk_read -= size(terminator);
auto &chunk{req.in.chunks.back()};
std::get<1>(chunk) -= size(terminator);
assert(size(chunk) == state.chunk_length);
assert(std::get<0>(chunk) <= std::get<1>(chunk));
if(state.chunk_length == 0)
{
assert(state.chunk_read == 0);
assert(!done);
done = true;
assert(req.opt);
if(req.opt->contiguous_content)
{
assert(state.content_length == size_chunks(req.in));
assert(req.in.chunks.size() >= 1);
assert(empty(req.in.chunks.back()));
req.in.chunks.pop_back();
if(req.in.chunks.size() > 1)
{
req.in.dynamic = size_chunks(req.in);
req.in.content = req.in.dynamic;
size_t copied{0};
for(const auto &buffer : req.in.chunks)
copied += copy(req.in.content + copied, buffer);
assert(copied == size(req.in.content));
assert(copied == state.content_length);
}
else if(req.in.chunks.size() == 1)
{
req.in.dynamic = std::move(req.in.chunks.front());
req.in.content = req.in.dynamic;
assert(size(req.in.content) == state.content_length);
}
req.in.chunks.clear();
}
set_value(state.status);
}
}
// Invoke the user's optional progress callback; this function
// should be marked noexcept for the time being.
if(req.in.progress && !done)
req.in.progress(buffer, const_buffer{data(chunk), state.chunk_read});
if(state.chunk_read == state.chunk_length)
{
assert(state.chunk_read == state.chunk_length);
assert(state.chunk_read <= state.content_read);
state.chunk_length = size_t(-1);
state.chunk_read = 0;
}
return {};
}
/// An idempotent operation that provides the location of where the socket
/// should place the next received data. The tag figures this out based on
/// whether it receiving HTTP head data or whether it is in content mode.
@ -2585,15 +2804,21 @@ const
if(state.status == (http::code)0)
return make_read_head_buffer();
if(state.content_read >= size(request->in.content))
return make_read_discard_buffer();
if(state.chunk_length == size_t(-1) && null(request->in.content))
return make_read_chunk_dynamic_head_buffer();
if(state.chunk_length == size_t(-1))
return make_read_chunk_head_buffer();
if(state.chunk_length && null(request->in.content))
return make_read_chunk_dynamic_content_buffer();
if(state.chunk_length)
return make_read_chunk_content_buffer();
if(state.content_read >= size(request->in.content))
return make_read_discard_buffer();
return make_read_content_buffer();
}
@ -2669,6 +2894,7 @@ const
const auto &req{*request};
const auto &content{req.in.content};
if(unlikely(size(content) <= state.content_read))
throw buffer_overrun
{
@ -2736,6 +2962,87 @@ const
return buffer;
}
/// The dynamic chunk head buffer starts after the main head and has a size
/// of the remaining main head buffer. This area is overwritten for each
/// chunk head.
///
ircd::mutable_buffer
ircd::server::tag::make_read_chunk_dynamic_head_buffer()
const
{
assert(request);
const auto &req{*request};
assert(state.chunk_length == size_t(-1));
assert(null(req.in.content));
assert(size(req.in.head) >= state.head_read);
const size_t head_max
{
size(req.in.head) + state.head_rem
};
// The total offset in the head buffer is the message head plus the
// amount of chunk head received so far, which is kept in chunk_read.
const size_t head_offset
{
state.head_read + state.chunk_read
};
assert(head_max >= head_offset);
if(unlikely(head_max - head_offset <= 16))
throw buffer_overrun
{
"Remaining head buffer of %zu bytes too small to read next chunk header",
head_max - state.head_read
};
const size_t remaining
{
head_max - head_offset
};
const mutable_buffer buffer
{
data(req.in.head) + state.head_read + state.chunk_read, remaining
};
assert(size(buffer) > 0);
return buffer;
}
ircd::mutable_buffer
ircd::server::tag::make_read_chunk_dynamic_content_buffer()
const
{
assert(request);
const auto &req{*request};
assert(state.chunk_length > 0);
assert(state.content_read <= state.content_length);
assert(null(req.in.content));
assert(!req.in.chunks.empty());
const auto &buffer
{
req.in.chunks.back()
};
assert(size(buffer) == state.chunk_length);
assert(state.chunk_read <= size(buffer));
const size_t buffer_remaining
{
size(buffer) - state.chunk_read
};
const mutable_buffer ret
{
data(buffer) + state.chunk_read, buffer_remaining
};
assert(size(ret) > 0);
return ret;
}
ircd::mutable_buffer
ircd::server::tag::make_read_discard_buffer()
const