From dc1a46e5d2515677de5ee3a364a40318269e1ea3 Mon Sep 17 00:00:00 2001
From: Klaus Post
Date: Thu, 10 Dec 2020 13:03:22 -0800
Subject: [PATCH] crawler: Stream bucket usage cache data (#11068)

Stream bucket caches to storage and through RPC calls.
---
 cmd/data-usage-cache.go    |  38 +++++----
 cmd/data-usage_test.go     |  13 +--
 cmd/storage-rest-client.go |  27 +++++--
 cmd/storage-rest-server.go | 160 ++++++++++++++++++++++++++++++++++++-
 4 files changed, 207 insertions(+), 31 deletions(-)

diff --git a/cmd/data-usage-cache.go b/cmd/data-usage-cache.go
index 3093338c1..207e6ae2f 100644
--- a/cmd/data-usage-cache.go
+++ b/cmd/data-usage-cache.go
@@ -456,9 +456,12 @@ func (d *dataUsageCache) load(ctx context.Context, store objectIO, name string)
 
 // save the content of the cache to minioMetaBackgroundOpsBucket with the provided name.
 func (d *dataUsageCache) save(ctx context.Context, store objectIO, name string) error {
-	b := d.serialize()
-	size := int64(len(b))
-	r, err := hash.NewReader(bytes.NewReader(b), size, "", "", size, false)
+	pr, pw := io.Pipe()
+	go func() {
+		pw.CloseWithError(d.serializeTo(pw))
+	}()
+	defer pr.Close()
+	r, err := hash.NewReader(pr, -1, "", "", -1, false)
 	if err != nil {
 		return err
 	}
@@ -480,32 +483,33 @@ func (d *dataUsageCache) save(ctx context.Context, store objectIO, name string)
 const dataUsageCacheVer = 2
 
 // serialize the contents of the cache.
-func (d *dataUsageCache) serialize() []byte {
-	// Prepend version and compress.
-	dst := make([]byte, 0, d.Msgsize()+1)
-	dst = append(dst, dataUsageCacheVer)
-	buf := bytes.NewBuffer(dst)
-	enc, err := zstd.NewWriter(buf,
+func (d *dataUsageCache) serializeTo(dst io.Writer) error {
+	// Add version and compress.
+	_, err := dst.Write([]byte{dataUsageCacheVer})
+	if err != nil {
+		return err
+	}
+	enc, err := zstd.NewWriter(dst,
 		zstd.WithEncoderLevel(zstd.SpeedFastest),
 		zstd.WithWindowSize(1<<20),
 		zstd.WithEncoderConcurrency(2))
 	if err != nil {
-		logger.LogIf(GlobalContext, err)
-		return nil
+		return err
 	}
 	mEnc := msgp.NewWriter(enc)
 	err = d.EncodeMsg(mEnc)
 	if err != nil {
-		logger.LogIf(GlobalContext, err)
-		return nil
+		return err
+	}
+	err = mEnc.Flush()
+	if err != nil {
+		return err
 	}
-	mEnc.Flush()
 	err = enc.Close()
 	if err != nil {
-		logger.LogIf(GlobalContext, err)
-		return nil
+		return err
 	}
-	return buf.Bytes()
+	return nil
 }
 
 // deserialize the supplied byte slice into the cache.
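Note: the new serializeTo/deserialize pair keeps the format unchanged, a single version byte followed by a zstd-compressed msgp encoding of the cache, but now produces and consumes it as a stream instead of a buffer. The standalone sketch below illustrates that framing with a plain byte slice standing in for the msgp-encoded cache; writeVersioned and readVersioned are illustrative names, not functions from the patch.

package main

import (
	"bytes"
	"errors"
	"fmt"
	"io"

	"github.com/klauspost/compress/zstd"
)

const cacheVer = 2

// writeVersioned mirrors the shape of serializeTo: a one-byte version
// prefix followed by the zstd-compressed payload.
func writeVersioned(dst io.Writer, payload []byte) error {
	if _, err := dst.Write([]byte{cacheVer}); err != nil {
		return err
	}
	enc, err := zstd.NewWriter(dst, zstd.WithEncoderLevel(zstd.SpeedFastest))
	if err != nil {
		return err
	}
	if _, err := enc.Write(payload); err != nil {
		enc.Close()
		return err
	}
	return enc.Close()
}

// readVersioned mirrors the shape of deserialize: check the version byte,
// then decompress the remainder.
func readVersioned(src io.Reader) ([]byte, error) {
	var ver [1]byte
	if _, err := io.ReadFull(src, ver[:]); err != nil {
		return nil, err
	}
	if ver[0] != cacheVer {
		return nil, errors.New("unknown cache version")
	}
	dec, err := zstd.NewReader(src)
	if err != nil {
		return nil, err
	}
	defer dec.Close()
	return io.ReadAll(dec)
}

func main() {
	var buf bytes.Buffer
	if err := writeVersioned(&buf, []byte("msgp-encoded cache would go here")); err != nil {
		panic(err)
	}
	out, err := readVersioned(&buf)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out))
}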
diff --git a/cmd/data-usage_test.go b/cmd/data-usage_test.go
index 680672950..91507c633 100644
--- a/cmd/data-usage_test.go
+++ b/cmd/data-usage_test.go
@@ -656,14 +656,17 @@ func TestDataUsageCacheSerialize(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-
-	b := want.serialize()
-	var got dataUsageCache
-	err = got.deserialize(bytes.NewBuffer(b))
+	var buf bytes.Buffer
+	err = want.serializeTo(&buf)
+	if err != nil {
+		t.Fatal(err)
+	}
+	t.Log("serialized size:", buf.Len(), "bytes")
+	var got dataUsageCache
+	err = got.deserialize(&buf)
 	if err != nil {
 		t.Fatal(err)
 	}
-	t.Log("serialized size:", len(b), "bytes")
 	if got.Info.LastUpdate.IsZero() {
 		t.Error("lastupdate not set")
 	}
diff --git a/cmd/storage-rest-client.go b/cmd/storage-rest-client.go
index f4ab878d5..310af1a9f 100644
--- a/cmd/storage-rest-client.go
+++ b/cmd/storage-rest-client.go
@@ -29,6 +29,7 @@ import (
 	"path"
 	"strconv"
 	"strings"
+	"sync"
 
 	"github.com/minio/minio/cmd/http"
 	xhttp "github.com/minio/minio/cmd/http"
@@ -167,18 +168,34 @@ func (client *storageRESTClient) Healing() bool {
 }
 
 func (client *storageRESTClient) CrawlAndGetDataUsage(ctx context.Context, cache dataUsageCache) (dataUsageCache, error) {
-	b := cache.serialize()
-	respBody, err := client.call(ctx, storageRESTMethodCrawlAndGetDataUsage, url.Values{}, bytes.NewBuffer(b), int64(len(b)))
+	pr, pw := io.Pipe()
+	go func() {
+		pw.CloseWithError(cache.serializeTo(pw))
+	}()
+	defer pr.Close()
+	respBody, err := client.call(ctx, storageRESTMethodCrawlAndGetDataUsage, url.Values{}, pr, -1)
 	defer http.DrainBody(respBody)
 	if err != nil {
 		return cache, err
 	}
-	reader, err := waitForHTTPResponse(respBody)
+
+	var wg sync.WaitGroup
+	var newCache dataUsageCache
+	var decErr error
+	pr, pw = io.Pipe()
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		decErr = newCache.deserialize(pr)
+		pr.CloseWithError(err)
+	}()
+	err = waitForHTTPStream(respBody, pw)
+	pw.CloseWithError(err)
	if err != nil {
 		return cache, err
 	}
-	var newCache dataUsageCache
-	return newCache, newCache.deserialize(reader)
+	wg.Wait()
+	return newCache, decErr
 }
 
 func (client *storageRESTClient) GetDiskID() (string, error) {
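Note: the client side no longer buffers the serialized cache before the call: serializeTo writes into one end of an io.Pipe from a goroutine while the read end is handed to client.call with an unknown (-1) length, and the response is decoded the same way in reverse. Below is a minimal, self-contained sketch of that request-side pattern; serialize and the httptest server are illustrative stand-ins, not code from the patch.

package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
)

// serialize stands in for dataUsageCache.serializeTo: it writes a payload of
// unknown length to dst and reports any error.
func serialize(dst io.Writer) error {
	_, err := io.WriteString(dst, "streamed cache payload")
	return err
}

func main() {
	// Test server that reports how many body bytes it received.
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		n, _ := io.Copy(io.Discard, r.Body)
		fmt.Fprintf(w, "got %d bytes", n)
	}))
	defer srv.Close()

	// The io.Pipe pattern from the patch: a goroutine serializes straight
	// into the write end, the read end becomes the request body, and
	// CloseWithError propagates any serialization failure to the reader.
	pr, pw := io.Pipe()
	go func() {
		pw.CloseWithError(serialize(pw))
	}()
	resp, err := http.Post(srv.URL, "application/octet-stream", pr)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	reply, _ := io.ReadAll(resp.Body)
	fmt.Println(string(reply))
}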
diff --git a/cmd/storage-rest-server.go b/cmd/storage-rest-server.go
index 20293e6b7..fe8d23a7b 100644
--- a/cmd/storage-rest-server.go
+++ b/cmd/storage-rest-server.go
@@ -18,6 +18,7 @@ package cmd
 
 import (
 	"bufio"
+	"encoding/binary"
 	"encoding/gob"
 	"encoding/hex"
 	"errors"
@@ -168,15 +169,14 @@ func (s *storageRESTServer) CrawlAndGetDataUsageHandler(w http.ResponseWriter, r
 		return
 	}
 
-	done := keepHTTPResponseAlive(w)
+	resp := streamHTTPResponse(w)
 	usageInfo, err := s.storage.CrawlAndGetDataUsage(r.Context(), cache)
-	done(err)
 	if err != nil {
+		resp.CloseWithError(err)
 		return
 	}
 
-	w.Write(usageInfo.serialize())
-	w.(http.Flusher).Flush()
+	resp.CloseWithError(usageInfo.serializeTo(resp))
 }
 
 // MakeVolHandler - make a volume.
@@ -792,6 +792,158 @@ func waitForHTTPResponse(respBody io.Reader) (io.Reader, error) {
 	}
 }
 
+// drainCloser can be used for wrapping an http response.
+// It will drain the body before closing.
+type drainCloser struct {
+	rc io.ReadCloser
+}
+
+// Read forwards the read operation.
+func (f drainCloser) Read(p []byte) (n int, err error) {
+	return f.rc.Read(p)
+}
+
+// Close drains the body and closes the upstream.
+func (f drainCloser) Close() error {
+	xhttp.DrainBody(f.rc)
+	return nil
+}
+
+// httpStreamResponse allows streaming a response, while still being able to send an error.
+type httpStreamResponse struct {
+	done  chan error
+	block chan []byte
+	err   error
+}
+
+// Write part of the streaming response.
+// Note that upstream errors are currently not forwarded, but may be in the future.
+func (h *httpStreamResponse) Write(b []byte) (int, error) {
+	if len(b) == 0 || h.err != nil {
+		// Ignore 0 length blocks
+		return 0, h.err
+	}
+	tmp := make([]byte, len(b))
+	copy(tmp, b)
+	h.block <- tmp
+	return len(b), h.err
+}
+
+// CloseWithError will close the stream and return the specified error.
+// This can be done several times, but only the first error will be sent.
+// After calling this the stream should not be written to.
+func (h *httpStreamResponse) CloseWithError(err error) {
+	if h.done == nil {
+		return
+	}
+	h.done <- err
+	h.err = err
+	// Indicates that the response is done.
+	<-h.done
+	h.done = nil
+}
+
+// streamHTTPResponse can be used to avoid timeouts with long storage
+// operations, such as bitrot verification or data usage crawling.
+// Every 10 seconds a space character is sent.
+// CloseWithError should always be called to release resources.
+// An optional error can be sent, which the receiver will pick up as a
+// text-only error without its original type.
+// waitForHTTPStream should be used on the receiving side.
+func streamHTTPResponse(w http.ResponseWriter) *httpStreamResponse {
+	doneCh := make(chan error)
+	blockCh := make(chan []byte)
+	h := httpStreamResponse{done: doneCh, block: blockCh}
+	go func() {
+		ticker := time.NewTicker(time.Second * 10)
+		for {
+			select {
+			case <-ticker.C:
+				// Response not ready, write a filler byte.
+				w.Write([]byte{32})
+				w.(http.Flusher).Flush()
+			case err := <-doneCh:
+				ticker.Stop()
+				defer close(doneCh)
+				if err != nil {
+					w.Write([]byte{1})
+					w.Write([]byte(err.Error()))
+				} else {
+					w.Write([]byte{0})
+				}
+				return
+			case block := <-blockCh:
+				var tmp [5]byte
+				tmp[0] = 2
+				binary.LittleEndian.PutUint32(tmp[1:], uint32(len(block)))
+				w.Write(tmp[:])
+				w.Write(block)
+				w.(http.Flusher).Flush()
+			}
+		}
+	}()
+	return &h
+}
+
+// waitForHTTPStream will wait for responses where
+// streamHTTPResponse has been used.
+// It copies the stream payload to w and returns any error sent by the peer.
+func waitForHTTPStream(respBody io.ReadCloser, w io.Writer) error {
+	var tmp [1]byte
+	for {
+		_, err := io.ReadFull(respBody, tmp[:])
+		if err != nil {
+			return err
+		}
+		// Check if we have a response ready or a filler byte.
+		switch tmp[0] {
+		case 0:
+			// 0 is unbuffered, copy the rest.
+			_, err := io.Copy(w, respBody)
+			respBody.Close()
+			if err == io.EOF {
+				return nil
+			}
+			return err
+		case 1:
+			errorText, err := ioutil.ReadAll(respBody)
+			if err != nil {
+				return err
+			}
+			respBody.Close()
+			return errors.New(string(errorText))
+		case 3:
+			// gob style is already deprecated, we can remove this when
+			// the storage API version is greater than or equal to 23.
+			defer respBody.Close()
+			dec := gob.NewDecoder(respBody)
+			var err error
+			if de := dec.Decode(&err); de == nil {
+				return err
+			}
+			return errors.New("rpc error")
+		case 2:
+			// Block of data
+			var tmp [4]byte
+			_, err := io.ReadFull(respBody, tmp[:])
+			if err != nil {
+				return err
+			}
+			length := binary.LittleEndian.Uint32(tmp[:])
+			_, err = io.CopyN(w, respBody, int64(length))
+			if err != nil {
+				return err
+			}
+			continue
+		case 32:
+			continue
+		default:
+			go xhttp.DrainBody(respBody)
+			return fmt.Errorf("unexpected filler byte: %d", tmp[0])
+		}
+	}
+}
+
 // VerifyFileResp - VerifyFile()'s response.
 type VerifyFileResp struct {
 	Err error
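Note: the response framing introduced above is: a space (32) as a keep-alive filler while the crawl is still running, 2 followed by a little-endian uint32 length for each data block, 1 followed by the error text, or 0 marking the end of the stream (frame type 3, the deprecated gob-encoded error, is kept only for older peers). The round trip below is a simplified sketch of that wire format over an in-memory buffer; writeFrames and readFrames are illustrative names that omit the HTTP plumbing and gob handling of the real streamHTTPResponse/waitForHTTPStream.

package main

import (
	"bytes"
	"encoding/binary"
	"errors"
	"fmt"
	"io"
)

// writeFrames emulates the sending side: a keep-alive filler, any number of
// length-prefixed data blocks (type 2), and a final status frame
// (0 = done, 1 = error text follows).
func writeFrames(w io.Writer, blocks [][]byte, fail error) {
	w.Write([]byte{32}) // filler byte, sent every 10s in the real stream
	for _, b := range blocks {
		var hdr [5]byte
		hdr[0] = 2
		binary.LittleEndian.PutUint32(hdr[1:], uint32(len(b)))
		w.Write(hdr[:])
		w.Write(b)
	}
	if fail != nil {
		w.Write([]byte{1})
		w.Write([]byte(fail.Error()))
		return
	}
	w.Write([]byte{0})
}

// readFrames is a trimmed-down counterpart of the receiving side: data blocks
// are copied to w, fillers are skipped, and the final frame decides the error.
func readFrames(r io.Reader, w io.Writer) error {
	var t [1]byte
	for {
		if _, err := io.ReadFull(r, t[:]); err != nil {
			return err
		}
		switch t[0] {
		case 32: // filler, ignore
		case 0: // done; any remaining bytes are unframed payload
			_, err := io.Copy(w, r)
			return err
		case 1: // error text until EOF
			msg, _ := io.ReadAll(r)
			return errors.New(string(msg))
		case 2: // length-prefixed data block
			var l [4]byte
			if _, err := io.ReadFull(r, l[:]); err != nil {
				return err
			}
			if _, err := io.CopyN(w, r, int64(binary.LittleEndian.Uint32(l[:]))); err != nil {
				return err
			}
		default:
			return fmt.Errorf("unexpected frame type: %d", t[0])
		}
	}
}

func main() {
	var wire, payload bytes.Buffer
	writeFrames(&wire, [][]byte{[]byte("hello "), []byte("world")}, nil)
	if err := readFrames(&wire, &payload); err != nil {
		panic(err)
	}
	fmt.Println(payload.String())
}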