diff --git a/cmd/bitrot-streaming.go b/cmd/bitrot-streaming.go index 54300e9ec..91cddcb0d 100644 --- a/cmd/bitrot-streaming.go +++ b/cmd/bitrot-streaming.go @@ -57,6 +57,10 @@ func (b *streamingBitrotWriter) Write(p []byte) (int, error) { b.closeWithErr(err) return n, err } + if n != len(p) { + err = io.ErrShortWrite + b.closeWithErr(err) + } return n, err } diff --git a/cmd/storage-rest-client.go b/cmd/storage-rest-client.go index dfe57899e..4fff5da07 100644 --- a/cmd/storage-rest-client.go +++ b/cmd/storage-rest-client.go @@ -377,6 +377,10 @@ func (client *storageRESTClient) CreateFile(ctx context.Context, volume, path st values.Set(storageRESTLength, strconv.Itoa(int(size))) respBody, err := client.call(ctx, storageRESTMethodCreateFile, values, ioutil.NopCloser(reader), size) defer xhttp.DrainBody(respBody) + if err != nil { + return err + } + _, err = waitForHTTPResponse(respBody) return err } diff --git a/cmd/storage-rest-server.go b/cmd/storage-rest-server.go index 50b9a9ff1..a43119155 100644 --- a/cmd/storage-rest-server.go +++ b/cmd/storage-rest-server.go @@ -326,8 +326,8 @@ func (s *storageRESTServer) CreateFileHandler(w http.ResponseWriter, r *http.Req return } - done := keepHTTPResponseAlive(w) - done(s.storage.CreateFile(r.Context(), volume, filePath, int64(fileSize), r.Body)) + done, body := keepHTTPReqResponseAlive(w, r) + done(s.storage.CreateFile(r.Context(), volume, filePath, int64(fileSize), body)) } // DeleteVersion delete updated metadata. @@ -719,8 +719,99 @@ func (s *storageRESTServer) RenameFileHandler(w http.ResponseWriter, r *http.Req } } +// closeNotifier is itself a ReadCloser that will notify when either an error occurs or +// the Close() function is called. +type closeNotifier struct { + rc io.ReadCloser + done chan struct{} +} + +func (c *closeNotifier) Read(p []byte) (n int, err error) { + n, err = c.rc.Read(p) + if err != nil { + if c.done != nil { + close(c.done) + c.done = nil + } + } + return n, err +} + +func (c *closeNotifier) Close() error { + if c.done != nil { + close(c.done) + c.done = nil + } + return c.rc.Close() +} + +// keepHTTPReqResponseAlive can be used to avoid timeouts with long storage +// operations, such as bitrot verification or data usage scanning. +// Every 10 seconds a space character is sent. +// keepHTTPReqResponseAlive will wait for the returned body to be read before starting the ticker. +// The returned function should always be called to release resources. +// An optional error can be sent which will be picked as text only error, +// without its original type by the receiver. +// waitForHTTPResponse should be used to the receiving side. +func keepHTTPReqResponseAlive(w http.ResponseWriter, r *http.Request) (resp func(error), body io.ReadCloser) { + bodyDoneCh := make(chan struct{}) + doneCh := make(chan error) + ctx := r.Context() + go func() { + // Wait for body to be read. + select { + case <-ctx.Done(): + case <-bodyDoneCh: + case err := <-doneCh: + if err != nil { + w.Write([]byte{1}) + w.Write([]byte(err.Error())) + } else { + w.Write([]byte{0}) + } + return + } + defer close(doneCh) + // Initiate ticker after body has been read. + ticker := time.NewTicker(time.Second * 10) + for { + select { + case <-ticker.C: + // Response not ready, write a filler byte. + w.Write([]byte{32}) + w.(http.Flusher).Flush() + case err := <-doneCh: + if err != nil { + w.Write([]byte{1}) + w.Write([]byte(err.Error())) + } else { + w.Write([]byte{0}) + } + ticker.Stop() + return + } + } + }() + return func(err error) { + if doneCh == nil { + return + } + + // Indicate we are ready to write. + doneCh <- err + + // Wait for channel to be closed so we don't race on writes. + <-doneCh + + // Clear so we can be called multiple times without crashing. + doneCh = nil + }, &closeNotifier{rc: r.Body, done: bodyDoneCh} +} + // keepHTTPResponseAlive can be used to avoid timeouts with long storage // operations, such as bitrot verification or data usage scanning. +// keepHTTPResponseAlive may NOT be used until the request body has been read, +// use keepHTTPReqResponseAlive instead. // Every 10 seconds a space character is sent. // The returned function should always be called to release resources. // An optional error can be sent which will be picked as text only error,