diff --git a/include/ircd/server/tag.h b/include/ircd/server/tag.h index 41f5cfef5..e4510f94c 100644 --- a/include/ircd/server/tag.h +++ b/include/ircd/server/tag.h @@ -30,6 +30,7 @@ struct ircd::server::tag { size_t written {0}; size_t head_read {0}; // includes head terminator + size_t head_rem {0}; // how much of head buf wasn't used. size_t content_read {0}; // total content read after head size_t content_length {0}; // fixed; or grows monotonic for chunked enc size_t chunk_read {0}; // content read after last chunk head @@ -50,12 +51,17 @@ struct ircd::server::tag size_t content_overflow() const; size_t content_remaining() const; + mutable_buffer make_read_discard_buffer() const; + mutable_buffer make_read_chunk_dynamic_content_buffer() const; + mutable_buffer make_read_chunk_dynamic_head_buffer() const; mutable_buffer make_read_chunk_content_buffer() const; mutable_buffer make_read_chunk_head_buffer() const; mutable_buffer make_read_content_buffer() const; mutable_buffer make_read_head_buffer() const; + const_buffer read_chunk_dynamic_content(const const_buffer &, bool &done); + const_buffer read_chunk_dynamic_head(const const_buffer &, bool &done); const_buffer read_chunk_content(const const_buffer &, bool &done); const_buffer read_chunk_head(const const_buffer &, bool &done); const_buffer read_content(const const_buffer &, bool &done); diff --git a/ircd/server.cc b/ircd/server.cc index d8f05e13f..8e6259123 100644 --- a/ircd/server.cc +++ b/ircd/server.cc @@ -1932,7 +1932,7 @@ noexcept // If the content is chunked encoding and the tag is in the phase of // receiving the chunk head we have to copy what's been received of that // head so far so the grammar can parse a coherent head to continue. - if(tag.state.chunk_length == size_t(-1)) + if(tag.state.chunk_length == size_t(-1) && !null(request.in.content)) { const const_buffer src { @@ -1947,6 +1947,17 @@ noexcept copy(dst, src); } + + // Moving the dynamic buffer should have no real effect because the + // cancellation buffer already took over for it. We could do it anyway + // to prevent regressions but at the cost of maintaining twice the memory + // allocated. For now it's commented to let it die with the user's req. + //tag.request->in.dynamic = std::move(request.in.dynamic); + + // Moving the chunk vector is important to maintain the state of dynamic + // chunk transfers through this cancel. There is no condition here for if + // this is not a dynamic chunk transfer because it's trivial. + tag.request->in.chunks = std::move(request.in.chunks); } void @@ -2134,9 +2145,15 @@ ircd::server::tag::read_buffer(const const_buffer &buffer, if(state.status == (http::code)0) return read_head(buffer, done, link); + if(state.chunk_length == size_t(-1) && null(request->in.content)) + return read_chunk_dynamic_head(buffer, done); + if(state.chunk_length == size_t(-1)) return read_chunk_head(buffer, done); + if(state.chunk_length && null(request->in.content)) + return read_chunk_dynamic_content(buffer, done); + if(state.chunk_length) return read_chunk_content(buffer, done); @@ -2209,6 +2226,7 @@ ircd::server::tag::read_head(const const_buffer &buffer, // Resize the user's head buffer tight to the head; this is how we convey // the size of the dome back to the user. + state.head_rem = size(req.in.head) - head_read; req.in.head = mutable_buffer { data(req.in.head), head_read @@ -2256,29 +2274,19 @@ ircd::server::tag::read_head(const const_buffer &buffer, req.in.chunks.reserve(req.opt->chunks_reserve); } - - ///TODO: XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX - if(dynamic) - { - assert(req.opt); - const size_t alloc_size - { - 40_MiB - }; - - req.in.dynamic = unique_buffer{alloc_size}; - req.in.content = req.in.dynamic; - } - const const_buffer chunk { - data(req.in.content), move(req.in.content, beyond_head) + !dynamic? + const_buffer{data(req.in.content), move(req.in.content, beyond_head)}: + beyond_head }; state.chunk_length = -1; const const_buffer overrun { - read_chunk_head(chunk, done) + !dynamic? + read_chunk_head(chunk, done): + read_chunk_dynamic_head(chunk, done) }; assert(empty(overrun) || done == true); @@ -2570,6 +2578,217 @@ ircd::server::tag::read_chunk_content(const const_buffer &buffer, return {}; } +ircd::const_buffer +ircd::server::tag::read_chunk_dynamic_head(const const_buffer &buffer, + bool &done) +{ + assert(request); + auto &req{*request}; + + // informal search for head terminator + static const string_view terminator{"\r\n"}; + const auto pos + { + string_view{buffer}.find(terminator) + }; + + if(pos == string_view::npos) + { + state.chunk_read += size(buffer); + state.content_read += size(buffer); + return {}; + } + + // This indicates how much head was just received from this buffer only. + const size_t addl_head_bytes + { + pos + size(terminator) + }; + + // The received buffer may go past the end of the head. + assert(addl_head_bytes <= size(buffer)); + const size_t beyond_head_length + { + size(buffer) - addl_head_bytes + }; + + state.chunk_read += addl_head_bytes; + const auto head_length{state.chunk_read}; + state.chunk_read = 0; + + // Window on any data in the buffer after the head. + const const_buffer beyond_head + { + data(buffer) + addl_head_bytes, beyond_head_length + }; + + // Setup the capstan and mark the end of the tape + parse::buffer pb + { + mutable_buffer + { + data(req.in.head) + state.head_read, head_length + } + }; + parse::capstan pc{pb}; + pc.read += head_length; + + // Play the tape through the formal grammar. + const http::response::chunk chunk{pc}; + assert(state.chunk_length == size_t(-1)); + state.chunk_length = chunk.size + size(terminator); + + // Increment the content_length to now include this chunk + state.content_length += state.chunk_length; + + // Allocate the chunk content on the vector. + //TODO: maxalloc + req.in.chunks.emplace_back(state.chunk_length); + + // Now we check how much chunk was received beyond the head + // state.chunk_head is still 0 here because that's only incremented + // in the content read function. + const auto &chunk_read + { + std::min(state.chunk_length, beyond_head_length) + }; + + // Now we know how much bleed into the next message was also received + assert(beyond_head_length >= chunk_read); + const size_t beyond_chunk_length + { + beyond_head_length - chunk_read + }; + + const const_buffer partial_chunk + { + data(beyond_head), chunk_read + }; + + const size_t copied + { + copy(req.in.chunks.back(), partial_chunk) + }; + + const const_buffer overrun + { + data(beyond_head) + chunk_read, beyond_chunk_length + }; + + assert(state.chunk_length >= 2); + read_chunk_dynamic_content(partial_chunk, done); + + if(done) + return overrun; + + return read_chunk_dynamic_head(overrun, done); // gobble gobbles +} + +ircd::const_buffer +ircd::server::tag::read_chunk_dynamic_content(const const_buffer &buffer, + bool &done) +{ + assert(request); + auto &req{*request}; + + assert(state.chunk_length != size_t(-1)); + assert(null(req.in.content)); + assert(!req.in.chunks.empty()); + const auto &chunk + { + req.in.chunks.back() + }; + + // The amount of remaining content for the response sequence + assert(state.chunk_read <= size(chunk)); + const size_t remaining + { + size(chunk) - state.chunk_read + }; + + // The amount of content read in this buffer only. + const size_t addl_content_read + { + std::min(size(buffer), remaining) + }; + + // Increment the read counters for this chunk and all chunks. + state.chunk_read += addl_content_read; + state.content_read += addl_content_read; + assert(state.chunk_read <= state.content_read); + + if(state.chunk_read == state.chunk_length) + { + static const string_view terminator{"\r\n"}; + state.content_length -= size(terminator); + state.content_read -= size(terminator); + + assert(state.chunk_length >= 2); + assert(state.chunk_read == state.chunk_length); + state.chunk_length -= size(terminator); + state.chunk_read -= size(terminator); + + auto &chunk{req.in.chunks.back()}; + std::get<1>(chunk) -= size(terminator); + assert(size(chunk) == state.chunk_length); + assert(std::get<0>(chunk) <= std::get<1>(chunk)); + + if(state.chunk_length == 0) + { + assert(state.chunk_read == 0); + assert(!done); + done = true; + + assert(req.opt); + if(req.opt->contiguous_content) + { + assert(state.content_length == size_chunks(req.in)); + assert(req.in.chunks.size() >= 1); + assert(empty(req.in.chunks.back())); + req.in.chunks.pop_back(); + + if(req.in.chunks.size() > 1) + { + req.in.dynamic = size_chunks(req.in); + req.in.content = req.in.dynamic; + + size_t copied{0}; + for(const auto &buffer : req.in.chunks) + copied += copy(req.in.content + copied, buffer); + + assert(copied == size(req.in.content)); + assert(copied == state.content_length); + } + else if(req.in.chunks.size() == 1) + { + req.in.dynamic = std::move(req.in.chunks.front()); + req.in.content = req.in.dynamic; + assert(size(req.in.content) == state.content_length); + } + + req.in.chunks.clear(); + } + + set_value(state.status); + } + } + + // Invoke the user's optional progress callback; this function + // should be marked noexcept for the time being. + if(req.in.progress && !done) + req.in.progress(buffer, const_buffer{data(chunk), state.chunk_read}); + + if(state.chunk_read == state.chunk_length) + { + assert(state.chunk_read == state.chunk_length); + assert(state.chunk_read <= state.content_read); + state.chunk_length = size_t(-1); + state.chunk_read = 0; + } + + return {}; +} + /// An idempotent operation that provides the location of where the socket /// should place the next received data. The tag figures this out based on /// whether it receiving HTTP head data or whether it is in content mode. @@ -2585,15 +2804,21 @@ const if(state.status == (http::code)0) return make_read_head_buffer(); - if(state.content_read >= size(request->in.content)) - return make_read_discard_buffer(); + if(state.chunk_length == size_t(-1) && null(request->in.content)) + return make_read_chunk_dynamic_head_buffer(); if(state.chunk_length == size_t(-1)) return make_read_chunk_head_buffer(); + if(state.chunk_length && null(request->in.content)) + return make_read_chunk_dynamic_content_buffer(); + if(state.chunk_length) return make_read_chunk_content_buffer(); + if(state.content_read >= size(request->in.content)) + return make_read_discard_buffer(); + return make_read_content_buffer(); } @@ -2669,6 +2894,7 @@ const const auto &req{*request}; const auto &content{req.in.content}; + if(unlikely(size(content) <= state.content_read)) throw buffer_overrun { @@ -2736,6 +2962,87 @@ const return buffer; } +/// The dynamic chunk head buffer starts after the main head and has a size +/// of the remaining main head buffer. This area is overwritten for each +/// chunk head. +/// +ircd::mutable_buffer +ircd::server::tag::make_read_chunk_dynamic_head_buffer() +const +{ + assert(request); + const auto &req{*request}; + + assert(state.chunk_length == size_t(-1)); + assert(null(req.in.content)); + assert(size(req.in.head) >= state.head_read); + + const size_t head_max + { + size(req.in.head) + state.head_rem + }; + + // The total offset in the head buffer is the message head plus the + // amount of chunk head received so far, which is kept in chunk_read. + const size_t head_offset + { + state.head_read + state.chunk_read + }; + + assert(head_max >= head_offset); + if(unlikely(head_max - head_offset <= 16)) + throw buffer_overrun + { + "Remaining head buffer of %zu bytes too small to read next chunk header", + head_max - state.head_read + }; + + const size_t remaining + { + head_max - head_offset + }; + + const mutable_buffer buffer + { + data(req.in.head) + state.head_read + state.chunk_read, remaining + }; + + assert(size(buffer) > 0); + return buffer; +} + +ircd::mutable_buffer +ircd::server::tag::make_read_chunk_dynamic_content_buffer() +const +{ + assert(request); + const auto &req{*request}; + + assert(state.chunk_length > 0); + assert(state.content_read <= state.content_length); + assert(null(req.in.content)); + assert(!req.in.chunks.empty()); + const auto &buffer + { + req.in.chunks.back() + }; + + assert(size(buffer) == state.chunk_length); + assert(state.chunk_read <= size(buffer)); + const size_t buffer_remaining + { + size(buffer) - state.chunk_read + }; + + const mutable_buffer ret + { + data(buffer) + state.chunk_read, buffer_remaining + }; + + assert(size(ret) > 0); + return ret; +} + ircd::mutable_buffer ircd::server::tag::make_read_discard_buffer() const