0
0
Fork 0
mirror of https://github.com/matrix-construct/construct synced 2025-01-13 16:33:53 +01:00

ircd::db: Attempt at direct IO writable_file (append) implementation.

This commit is contained in:
Jason Volk 2018-11-27 17:30:59 -08:00
parent 26ed0ee0e4
commit 0de55ce111
2 changed files with 401 additions and 75 deletions

View file

@ -71,10 +71,20 @@ struct ircd::db::database::env::writable_file_direct final
bool aligned(const size_t &) const;
bool aligned(const void *const &) const;
bool aligned(const const_buffer &) const;
size_t align(const size_t &) const;
size_t remain(const size_t &) const;
size_t blocks(const size_t &) const;
void write(const const_buffer &, const uint64_t &offset);
size_t buffer_remain() const;
size_t buffer_consumed() const;
const_buffer _write__aligned(const const_buffer &, const uint64_t &offset);
const_buffer _write_aligned(const const_buffer &, const uint64_t &offset);
const_buffer write_aligned(const const_buffer &);
const_buffer write_unaligned_off(const const_buffer &);
const_buffer write_unaligned_buf(const const_buffer &);
const_buffer write(const const_buffer &);
uint64_t GetFileSize() noexcept override;
Status PositionedAppend(const Slice& data, uint64_t offset) noexcept override;

View file

@ -4718,6 +4718,7 @@ rocksdb::Status
ircd::db::database::env::writable_file::Append(const Slice &s)
noexcept try
{
assert(!opts.direct);
const ctx::uninterruptible::nothrow ui;
const std::lock_guard<decltype(mutex)> lock{mutex};
@ -4779,6 +4780,7 @@ ircd::db::database::env::writable_file::PositionedAppend(const Slice &s,
uint64_t offset)
noexcept try
{
assert(!opts.direct);
const ctx::uninterruptible::nothrow ui;
const std::lock_guard<decltype(mutex)> lock{mutex};
@ -5327,55 +5329,20 @@ noexcept try
};
#endif
const_buffer input
const auto logical_check
{
data(s), size(s)
logical_offset
};
if(!aligned(logical_offset))
const_buffer buf
{
const size_t base(align(logical_offset));
const size_t rem(remain(logical_offset));
const size_t used(logical_offset % alignment);
const size_t cons(std::min(rem, size(input)));
const const_buffer underflow
{
data(input), cons
};
copy(buffer + used, underflow);
write(buffer, base);
logical_offset += cons;
input = const_buffer
{
data(input) + cons, size(input) - cons
};
}
if(empty(input))
return Status::OK();
const const_buffer output
{
data(input), align(size(input))
slice(s)
};
const const_buffer overflow
{
data(input) + size(output), size(input) - size(output)
};
write(output, logical_offset);
logical_offset += size(output);
if(!empty(overflow))
{
zero(buffer);
copy(buffer, overflow);
write(buffer, logical_offset);
logical_offset += size(overflow);
}
while(!empty(buf))
buf = write(buf);
assert(logical_check + size(slice(s)) == logical_offset);
return Status::OK();
}
catch(const fs::error &e)
@ -5409,36 +5376,6 @@ catch(const std::exception &e)
return error_to_status{e};
}
void
ircd::db::database::env::writable_file_direct::write(const const_buffer &buf,
const uint64_t &offset)
{
#ifdef RB_DEBUG_DB_ENV
log::debug
{
log, "'%s': wfile:%p DIRECT fd:%d write:%p%s bytes:%zu%s offset:%zu%s (logical:%zu)",
d.name,
this,
int(fd),
data(buf),
aligned(data(buf))? "" : "#AC",
size(buf),
aligned(size(buf))? "" : "#AC",
offset,
aligned(offset)? "" : "#AC",
logical_offset
};
#endif
assert(aligned(buf));
assert(aligned(offset));
fs::write_opts wopts;
wopts.priority = this->prio;
wopts.offset = offset;
fs::write(fd, buf, wopts);
}
rocksdb::Status
ircd::db::database::env::writable_file_direct::PositionedAppend(const Slice &s,
uint64_t offset)
@ -5505,13 +5442,392 @@ catch(const std::exception &e)
return 0;
}
/// (Internal) Append buffer. This function is the internal entry interface
/// for appending a buffer of any size and alignment to the file. It is
/// internal because it does no locking or error handling back to rocksdb,
/// because it's expected to be called from some virtual override which does
/// those things. This function will branch off as required to other internal
/// write_* functions to properly align and rebuffer the supplied buffer
/// eventually culminating in an aligned append to the file.
///
/// Calling this function will always result in some write to the file; even
/// if temporary buffering is used to achieve alignment; even if the entire
/// supplied buffer is hopelessly unaligned: the supplied data will be written
/// out some way or another during this call. This means there is no
/// requirement to care about flushing the temporary this->buffer after this
/// call is made. Note that the temporary this->buffer has no reason to be
/// touched by anything other than this function stack.
///
/// !!! NOTE !!!
/// There is a requirement to truncate the file after this call is made before
/// closing the file. If a crash occurs after a write() which was padded out
/// to the block alignment: the file size will reflect the padding when it is
/// opened at next startup; RocksDB will not detect its terminator character
/// sequence and consider this file corrupt.
/// !!!
///
/// - any offset
/// - any data
/// - any size
ircd::const_buffer
ircd::db::database::env::writable_file_direct::write(const const_buffer &buf_)
{
const_buffer buf
{
// If the file's offset is aligned and the buffer's data is aligned
// we take an easy branch which writes everything and copies any
// unaligned overflow to the temporary this->buffer. Nothing is
// returned into buf from this branch so there's nothing else done
// as this function will return when empty(buf) is checked below.
aligned(logical_offset) && aligned(data(buf_))?
write_aligned(buf_):
// If the file's offset isn't aligned we have to bring it up to
// alignment first by using data from the front of buf_. All the
// remaining data will be returned to here, which may make a mess
// of buf's alignment and size but this frame will deal with that.
!aligned(logical_offset)?
write_unaligned_off(buf_):
// The file's offset is aligned but buf is not aligned. We'll deal
// with that in this frame.
buf_
};
assert(aligned(logical_offset) || empty(buf));
// buf can be empty here if it was entirely dealt with by the above
// branches and there's nothing else to do here.
if(empty(buf))
return buf;
// Branch on whether the buffer's address is aligned. If so, considering
// the logical_offset is aligned here we are then finished.
if(aligned(data(buf)))
return write_aligned(buf);
// Deal with an unaligned buffer by bringing it up to alignment. This
// will end up returning an aligned buffer, but may unalign the
// logical_offset by doing so. This write() call must be looped until
// it empties the buffer. It will be loopy if everything comes very
// unaligned out of rocksdb.
return write_unaligned_buf(buf);
}
/// Called when the logical_offset aligned but the supplied buffer's address
/// is not aligned. The supplied buffer's size can be unaligned here. This
/// function will fill up the temporary this->buffer with the front of buf
/// until an aligned address is achieved.
///
/// The rest of the buffer which starts at an aligned address is returned and
/// not written. It is not written since this function may leave the
/// logical_offset at an unaligned address.
///
/// * aligned offset
/// * unaligned data
/// - any size
ircd::const_buffer
ircd::db::database::env::writable_file_direct::write_unaligned_buf(const const_buffer &buf)
{
assert(aligned(logical_offset));
assert(!aligned(data(buf)));
assert(!aligned(buf));
// Window on the data between the given buffer's pointer and the next
// alignment boundary.
const const_buffer under_buf
{
data(buf), std::min(remain(uintptr_t(data(buf))), size(buf))
};
// Window on the data from the alignment boundary to the end of the
// given buffer.
const const_buffer remaining_buf
{
buf + size(under_buf)
};
assert(size(under_buf) <= size(buf));
assert(size(under_buf) + size(remaining_buf) == size(buf));
assert(data(buf) + size(under_buf) == data(remaining_buf));
assert(aligned(data(remaining_buf)) || empty(remaining_buf));
// We have to use the temporary buffer to deal with the unaligned
// leading part of the buffer. Since logical_offset is aligned this
// buffer isn't being used right now. We copy as much as possible
// to fill out a complete block, both the unaligned and aligned inputs
// and zero padding if both are not sufficient.
mutable_buffer dst(this->buffer);
consume(dst, copy(dst, under_buf));
consume(dst, copy(dst, remaining_buf));
consume(dst, zero(dst));
assert(empty(dst));
// Flush the temporary buffer.
_write__aligned(this->buffer, logical_offset);
// The logical_offset is only advanced by the underflow amount, even if
// we padded the temporary buffer with some remaing_buf data. The caller
// is lead to believe they must deal with remaining_buf in its entirety
// starting at the logical_offset.
logical_offset += size(under_buf);
return remaining_buf;
}
/// Called when the logical_offset is not aligned, indicating that something
/// was left in the temporary this->buffer which must be completed out to
/// alignment by consuming the front of the argument buf. This function appends
/// the front of buf to this->buffer and flushes this->buffer.
///
/// logical_offset is incremented, either to the next block alignment or less
/// if size(buf) can't get it there.
///
/// The rest of buf which isn't used to fill out this->buffer is returned and
/// not written. It is not written since the returned data(buf) might not
/// be aligned. In fact, this function does not care about the alignment of buf
/// at all.
///
/// * unaligned offset
/// - any data
/// - any size
ircd::const_buffer
ircd::db::database::env::writable_file_direct::write_unaligned_off(const const_buffer &buf)
{
assert(!aligned(logical_offset));
// Window on the amount of buf we can take to fill up remaining space in
// the temporary this->buffer
const const_buffer src
{
data(buf), std::min(size(buf), buffer_remain())
};
// Window on the remaining space in the temporary this->buffer.
const mutable_buffer dst
{
this->buffer + buffer_consumed()
};
// Window on the remaining space in dst after src is copied to dst, if any.
const mutable_buffer pad
{
dst + size(src)
};
assert(size(dst) - size(pad) == size(src));
assert(size(src) + size(pad) == buffer_remain());
assert(size(src) + size(pad) + buffer_consumed() == alignment);
assert(size(src) + buffer_consumed() != alignment || empty(pad));
copy(dst, src);
zero(pad);
// Backtrack the logical_offset to the aligned offset where this->buffer's
// data starts.
const auto aligned_offset
{
align(logical_offset)
};
// Write the whole temporary this->buffer at the aligned offset.
_write__aligned(this->buffer, aligned_offset);
// Only increment the logical_offset to indicate the appending of
// what this function added to the temporary this->buffer.
logical_offset += size(src);
// The logical_offset should either be aligned now after using buf's
// data to eliminate the temporary this->buffer, or buf's data wasn't
// enough and we'll have to call this function again later with more.
assert(aligned(logical_offset) || size(buf) < alignment);
// Return the rest of buf which we didn't use to fill out this->buf
// Caller will have to deal figuring out how to align the next write.
return const_buffer
{
buf + size(src)
};
}
/// Write function callable when the current logical_offset and the supplied
/// buffer's pointer are both aligned, but the size of the buffer need not
/// be aligned. This function thus assumes that the temporary this->buffer
/// is empty; it will write as much of the input buffer as aligned. The
/// unaligned overflow will be copied to the front of the temporary
/// this->buffer which will be padded to alignment and flushed and the
/// logical_offset will indicate an increment of the size of the input buffer.
///
/// * aligned offset
/// * aligned data
/// - any size
ircd::const_buffer
ircd::db::database::env::writable_file_direct::write_aligned(const const_buffer &buf)
{
assert(aligned(data(buf)));
assert(aligned(logical_offset));
// This portion at the end of buf did not fill out to the alignment.
const const_buffer overflow
{
_write_aligned(buf, logical_offset)
};
// The aligned portion was written so the offset is incremented here.
logical_offset += size(buf) - size(overflow);
assert(aligned(logical_offset));
assert(size(overflow) < alignment);
assert(aligned(data(overflow)) || empty(overflow));
assert(align(size(buf)) + size(overflow) == size(buf));
assert(blocks(size(buf)) * alignment + size(overflow) == size(buf));
if(!empty(overflow))
{
// The overflow is copied to the temporary this->buffer, padded out with
// zero and then flushed. The logical offset will be incremented by the
// size of that overflow and will no longer be an aligned value,
// indicating there is something in the temporary this->buffer.
mutable_buffer dst(this->buffer);
consume(dst, copy(dst, overflow));
consume(dst, zero(dst));
assert(empty(dst));
_write__aligned(this->buffer, logical_offset);
logical_offset += size(overflow);
assert(!aligned(logical_offset));
}
// Nothing is ever returned and required by the caller here because the
// input is aligned to its address and offset and any unaligned size was
// dealt with using the temporary this->buffer.
return {};
}
/// Lower level write to an aligned offset. The pointer of the buffer and the
/// offset both have to be aligned to alignment. The size of the buffer does
/// not have to be aligned to alignment. The unaligned portion of the input
/// buffer (the last partial block), if any, will be returned to the caller.
///
/// No modifications to the logical_offset or the temporary this->buffer take
/// place here so the caller must manipulate those accordingly.
///
/// * aligned data
/// * aligned offset
/// - any size
ircd::const_buffer
ircd::db::database::env::writable_file_direct::_write_aligned(const const_buffer &buf,
const uint64_t &offset)
{
assert(aligned(data(buf)));
assert(aligned(offset));
// This portion will be written
const const_buffer aligned_buf
{
data(buf), blocks(size(buf)) * alignment
};
// This trailing portion will be returned to caller
const const_buffer ret
{
data(buf) + size(aligned_buf), size(buf) - size(aligned_buf)
};
assert(!empty(aligned_buf) || size(buf) < alignment);
assert(size(aligned_buf) + size(ret) == size(buf));
assert(size(ret) < alignment);
// aligned_buf will be empty if buf itself is smaller than the alignment.
if(empty(aligned_buf))
{
assert(size(ret) == size(buf));
return ret;
}
_write__aligned(aligned_buf, offset);
return ret;
}
/// Lowest level write of a fully aligned buffer to an aligned offset. The
/// pointer of the buffer, the size of the buffer, and the offset ALL have
/// to be aligned to alignment for this function. This function is the only
/// in the stack which actually writes to the filesystem.
///
/// No modifications to the logical_offset take place here so the caller must
/// increment that accordingly. The return value is a const_buffer to conform
/// with the rest of the stack but it is unconditionally empty here because
/// there is no possible overflowing.
///
/// * aligned offset
/// * aligned data
/// * aligned size
ircd::const_buffer
ircd::db::database::env::writable_file_direct::_write__aligned(const const_buffer &buf,
const uint64_t &offset)
{
#ifdef RB_DEBUG_DB_ENV
log::debug
{
log, "'%s': wfile:%p DIRECT fd:%d write:%p%s bytes:%zu%s offset:%zu%s (logical:%zu)",
d.name,
this,
int(fd),
data(buf),
aligned(data(buf))? "" : "#AC",
size(buf),
aligned(size(buf))? "" : "#AC",
offset,
aligned(offset)? "" : "#AC",
logical_offset
};
#endif
assert(aligned(buf));
assert(aligned(offset));
fs::write_opts wopts;
wopts.priority = this->prio;
wopts.offset = offset;
fs::write(fd, buf, wopts);
// Nothing is ever returned to the caller here because the input buffer
// and the offset must be fully aligned at this stage.
return {};
}
size_t
ircd::db::database::env::writable_file_direct::buffer_consumed()
const
{
return likely(alignment != 0)?
logical_offset % alignment:
0UL;
}
size_t
ircd::db::database::env::writable_file_direct::buffer_remain()
const
{
return remain(logical_offset);
}
size_t
ircd::db::database::env::writable_file_direct::blocks(const size_t &value)
const
{
return likely(alignment != 0)?
value / alignment:
0UL;
}
size_t
ircd::db::database::env::writable_file_direct::remain(const size_t &value)
const
{
return likely(alignment != 0)?
alignment - (value - align(value)):
0;
0UL;
}
size_t
@ -5527,7 +5843,7 @@ bool
ircd::db::database::env::writable_file_direct::aligned(const const_buffer &buf)
const
{
return aligned(data(buf)) && aligned(size(buf));
return buffer::aligned(buf, alignment);
}
bool