crawler: Stream bucket usage cache data (#11068)

Stream bucket caches to storage and through RPC calls.
This commit is contained in:
Klaus Post 2020-12-10 13:03:22 -08:00 committed by Harshavardhana
parent 8724d49116
commit dc1a46e5d2
4 changed files with 207 additions and 31 deletions

View file

@@ -456,9 +456,12 @@ func (d *dataUsageCache) load(ctx context.Context, store objectIO, name string)
// save the content of the cache to minioMetaBackgroundOpsBucket with the provided name.
func (d *dataUsageCache) save(ctx context.Context, store objectIO, name string) error {
b := d.serialize()
size := int64(len(b))
r, err := hash.NewReader(bytes.NewReader(b), size, "", "", size, false)
pr, pw := io.Pipe()
go func() {
pw.CloseWithError(d.serializeTo(pw))
}()
defer pr.Close()
r, err := hash.NewReader(pr, -1, "", "", -1, false)
if err != nil {
return err
}
@@ -480,32 +483,33 @@ func (d *dataUsageCache) save(ctx context.Context, store objectIO, name string)
const dataUsageCacheVer = 2
// serialize the contents of the cache.
func (d *dataUsageCache) serialize() []byte {
// Prepend version and compress.
dst := make([]byte, 0, d.Msgsize()+1)
dst = append(dst, dataUsageCacheVer)
buf := bytes.NewBuffer(dst)
enc, err := zstd.NewWriter(buf,
func (d *dataUsageCache) serializeTo(dst io.Writer) error {
// Add version and compress.
_, err := dst.Write([]byte{dataUsageCacheVer})
if err != nil {
return err
}
enc, err := zstd.NewWriter(dst,
zstd.WithEncoderLevel(zstd.SpeedFastest),
zstd.WithWindowSize(1<<20),
zstd.WithEncoderConcurrency(2))
if err != nil {
logger.LogIf(GlobalContext, err)
return nil
return err
}
mEnc := msgp.NewWriter(enc)
err = d.EncodeMsg(mEnc)
if err != nil {
logger.LogIf(GlobalContext, err)
return nil
return err
}
err = mEnc.Flush()
if err != nil {
return err
}
mEnc.Flush()
err = enc.Close()
if err != nil {
logger.LogIf(GlobalContext, err)
return nil
return err
}
return buf.Bytes()
return nil
}
// deserialize the supplied byte slice into the cache.

View file

@@ -656,14 +656,17 @@ func TestDataUsageCacheSerialize(t *testing.T) {
if err != nil {
t.Fatal(err)
}
b := want.serialize()
var got dataUsageCache
err = got.deserialize(bytes.NewBuffer(b))
var buf bytes.Buffer
err = want.serializeTo(&buf)
if err != nil {
t.Fatal(err)
}
t.Log("serialized size:", buf.Len(), "bytes")
var got dataUsageCache
err = got.deserialize(&buf)
if err != nil {
t.Fatal(err)
}
t.Log("serialized size:", len(b), "bytes")
if got.Info.LastUpdate.IsZero() {
t.Error("lastupdate not set")
}

View file

@@ -29,6 +29,7 @@ import (
"path"
"strconv"
"strings"
"sync"
"github.com/minio/minio/cmd/http"
xhttp "github.com/minio/minio/cmd/http"
@@ -167,18 +168,34 @@ func (client *storageRESTClient) Healing() bool {
}
func (client *storageRESTClient) CrawlAndGetDataUsage(ctx context.Context, cache dataUsageCache) (dataUsageCache, error) {
b := cache.serialize()
respBody, err := client.call(ctx, storageRESTMethodCrawlAndGetDataUsage, url.Values{}, bytes.NewBuffer(b), int64(len(b)))
pr, pw := io.Pipe()
go func() {
pw.CloseWithError(cache.serializeTo(pw))
}()
defer pr.Close()
respBody, err := client.call(ctx, storageRESTMethodCrawlAndGetDataUsage, url.Values{}, pr, -1)
defer http.DrainBody(respBody)
if err != nil {
return cache, err
}
reader, err := waitForHTTPResponse(respBody)
var wg sync.WaitGroup
var newCache dataUsageCache
var decErr error
pr, pw = io.Pipe()
wg.Add(1)
go func() {
defer wg.Done()
decErr = newCache.deserialize(pr)
pr.CloseWithError(err)
}()
err = waitForHTTPStream(respBody, pw)
pw.CloseWithError(err)
if err != nil {
return cache, err
}
var newCache dataUsageCache
return newCache, newCache.deserialize(reader)
wg.Wait()
return newCache, decErr
}
func (client *storageRESTClient) GetDiskID() (string, error) {

View file

@@ -18,6 +18,7 @@ package cmd
import (
"bufio"
"encoding/binary"
"encoding/gob"
"encoding/hex"
"errors"
@@ -168,15 +169,14 @@ func (s *storageRESTServer) CrawlAndGetDataUsageHandler(w http.ResponseWriter, r
return
}
done := keepHTTPResponseAlive(w)
resp := streamHTTPResponse(w)
usageInfo, err := s.storage.CrawlAndGetDataUsage(r.Context(), cache)
done(err)
if err != nil {
resp.CloseWithError(err)
return
}
w.Write(usageInfo.serialize())
w.(http.Flusher).Flush()
resp.CloseWithError(usageInfo.serializeTo(resp))
}
// MakeVolHandler - make a volume.
@@ -792,6 +792,158 @@ func waitForHTTPResponse(respBody io.Reader) (io.Reader, error) {
}
}
// drainCloser wraps an HTTP response body so that closing it first
// consumes any remaining bytes, allowing the connection to be reused.
type drainCloser struct {
	rc io.ReadCloser
}

// Read delegates to the wrapped body.
func (d drainCloser) Read(p []byte) (int, error) {
	return d.rc.Read(p)
}

// Close drains whatever is left in the wrapped body before releasing it.
// It always reports success.
func (d drainCloser) Close() error {
	xhttp.DrainBody(d.rc)
	return nil
}
// httpStreamResponse allows streaming a response, but still send an error.
type httpStreamResponse struct {
	done  chan error  // final error handed to the sender goroutine; set to nil once closed
	block chan []byte // copies of written payload blocks, consumed by the sender goroutine
	err   error       // sticky error recorded by CloseWithError; returned by later Writes
}

// Write part of the streaming response.
// The input is copied before being handed off, since the slice is
// consumed asynchronously by the goroutine started in streamHTTPResponse.
// Note that upstream errors are currently not forwarded, but may be in the future.
func (h *httpStreamResponse) Write(b []byte) (int, error) {
	if len(b) == 0 || h.err != nil {
		// Ignore 0 length blocks
		return 0, h.err
	}
	tmp := make([]byte, len(b))
	copy(tmp, b)
	h.block <- tmp
	return len(b), h.err
}

// CloseWithError will close the stream and return the specified error.
// This can be done several times, but only the first error will be sent.
// After calling this the stream should not be written to.
// NOTE(review): this handshake is not safe for concurrent callers —
// the nil check on h.done and the assignment below are unsynchronized;
// confirm all callers close from a single goroutine.
func (h *httpStreamResponse) CloseWithError(err error) {
	if h.done == nil {
		return
	}
	h.done <- err
	h.err = err
	// Indicates that the response is done.
	// The sender goroutine closes the channel after writing the trailer,
	// so this receive blocks until the final bytes have been emitted.
	<-h.done
	h.done = nil
}
// streamHTTPResponse can be used to avoid timeouts with long storage
// operations, such as bitrot verification or data usage crawling.
// Every 10 seconds a space character is sent.
// CloseWithError should always be called on the returned response to
// release resources and emit the trailer.
// An optional error can be sent which will be picked as text only error,
// without its original type by the receiver.
// waitForHTTPStream should be used on the receiving side.
//
// Wire format, one opcode byte per frame:
//
//	32 - filler/keepalive byte.
//	 2 - data block: 4 byte little-endian length, then that many bytes.
//	 1 - error: the error text follows and the stream ends.
//	 0 - clean end of stream.
func streamHTTPResponse(w http.ResponseWriter) *httpStreamResponse {
	doneCh := make(chan error)
	blockCh := make(chan []byte)
	h := httpStreamResponse{done: doneCh, block: blockCh}
	go func() {
		ticker := time.NewTicker(time.Second * 10)
		for {
			select {
			case <-ticker.C:
				// Response not ready, write a filler byte.
				w.Write([]byte{32})
				w.(http.Flusher).Flush()
			case err := <-doneCh:
				ticker.Stop()
				// Closing doneCh unblocks CloseWithError's final receive,
				// signaling that the trailer below has been written.
				defer close(doneCh)
				if err != nil {
					w.Write([]byte{1})
					w.Write([]byte(err.Error()))
				} else {
					w.Write([]byte{0})
				}
				return
			case block := <-blockCh:
				// Frame the block: opcode 2 plus a 4 byte length prefix.
				var tmp [5]byte
				tmp[0] = 2
				binary.LittleEndian.PutUint32(tmp[1:], uint32(len(block)))
				w.Write(tmp[:])
				w.Write(block)
				w.(http.Flusher).Flush()
			}
		}
	}()
	return &h
}
// waitForHTTPStream will wait for responses where
// streamHTTPResponse has been used.
// The payload is written to w as it arrives; a nil return means the
// complete payload was received.
//
// Wire format, one opcode byte per frame (matching streamHTTPResponse):
//
//	32 - filler/keepalive byte, ignored.
//	 2 - data block: 4 byte little-endian length, then that many bytes.
//	 0 - clean end of stream; the rest of the body is the final payload.
//	 1 - error: the rest of the body is the error text.
//	 3 - deprecated gob-encoded error.
func waitForHTTPStream(respBody io.ReadCloser, w io.Writer) error {
	var tmp [1]byte
	for {
		_, err := io.ReadFull(respBody, tmp[:])
		if err != nil {
			return err
		}
		// Check if we have a response ready or a filler byte.
		switch tmp[0] {
		case 0:
			// 0 is unbuffered, copy the rest.
			// io.Copy never returns io.EOF; nil means the whole
			// remaining body was copied successfully.
			_, err := io.Copy(w, respBody)
			respBody.Close()
			return err
		case 1:
			// Error response; the remainder of the body is the message.
			errorText, err := ioutil.ReadAll(respBody)
			// Close before inspecting err so the body is released on
			// the error path as well.
			respBody.Close()
			if err != nil {
				return err
			}
			return errors.New(string(errorText))
		case 3:
			// gob style is already deprecated, we can remove this when
			// storage API version will be greater or equal to 23.
			defer respBody.Close()
			dec := gob.NewDecoder(respBody)
			var err error
			if de := dec.Decode(&err); de == nil {
				return err
			}
			return errors.New("rpc error")
		case 2:
			// Block of data: 4 byte little-endian length, then payload.
			var tmp [4]byte
			_, err := io.ReadFull(respBody, tmp[:])
			if err != nil {
				return err
			}
			length := binary.LittleEndian.Uint32(tmp[:])
			if _, err = io.CopyN(w, respBody, int64(length)); err != nil {
				return err
			}
			continue
		case 32:
			// Filler byte sent to keep the connection alive; ignore.
			continue
		default:
			go xhttp.DrainBody(respBody)
			return fmt.Errorf("unexpected filler byte: %d", tmp[0])
		}
	}
}
// VerifyFileResp - VerifyFile()'s response.
type VerifyFileResp struct {
Err error