add missing responseBody drain (#12147)

Signed-off-by: Harshavardhana <harsha@minio.io>
This commit is contained in:
Harshavardhana 2021-04-26 08:59:54 -07:00
parent 82780ec10e
commit e3f2a260aa
3 changed files with 24 additions and 53 deletions

View file

@ -27,6 +27,7 @@ import (
"strings"
"github.com/gorilla/mux"
xhttp "github.com/minio/minio/cmd/http"
"github.com/minio/minio/cmd/logger"
xioutil "github.com/minio/minio/pkg/ioutil"
)
@ -291,6 +292,7 @@ func (client *storageRESTClient) WalkDir(ctx context.Context, opts WalkDirOption
logger.LogIf(ctx, err)
return err
}
defer xhttp.DrainBody(respBody)
return waitForHTTPStream(respBody, wr)
}

View file

@ -207,7 +207,7 @@ func (client *storageRESTClient) NSScanner(ctx context.Context, cache dataUsageC
pw.CloseWithError(cache.serializeTo(pw))
}()
respBody, err := client.call(ctx, storageRESTMethodNSScanner, url.Values{}, pr, -1)
defer http.DrainBody(respBody)
defer xhttp.DrainBody(respBody)
if err != nil {
pr.Close()
return cache, err
@ -247,7 +247,7 @@ func (client *storageRESTClient) DiskInfo(ctx context.Context) (info DiskInfo, e
if err != nil {
return info, err
}
defer http.DrainBody(respBody)
defer xhttp.DrainBody(respBody)
if err = msgp.Decode(respBody, &info); err != nil {
return info, err
}
@ -270,7 +270,7 @@ func (client *storageRESTClient) MakeVolBulk(ctx context.Context, volumes ...str
values := make(url.Values)
values.Set(storageRESTVolumes, strings.Join(volumes, ","))
respBody, err := client.call(ctx, storageRESTMethodMakeVolBulk, values, nil, -1)
defer http.DrainBody(respBody)
defer xhttp.DrainBody(respBody)
return err
}
@ -279,7 +279,7 @@ func (client *storageRESTClient) MakeVol(ctx context.Context, volume string) (er
values := make(url.Values)
values.Set(storageRESTVolume, volume)
respBody, err := client.call(ctx, storageRESTMethodMakeVol, values, nil, -1)
defer http.DrainBody(respBody)
defer xhttp.DrainBody(respBody)
return err
}
@ -289,7 +289,7 @@ func (client *storageRESTClient) ListVols(ctx context.Context) (vols []VolInfo,
if err != nil {
return
}
defer http.DrainBody(respBody)
defer xhttp.DrainBody(respBody)
vinfos := VolsInfo(vols)
err = msgp.Decode(respBody, &vinfos)
return vinfos, err
@ -303,7 +303,7 @@ func (client *storageRESTClient) StatVol(ctx context.Context, volume string) (vo
if err != nil {
return
}
defer http.DrainBody(respBody)
defer xhttp.DrainBody(respBody)
err = msgp.Decode(respBody, &vol)
return vol, err
}
@ -316,7 +316,7 @@ func (client *storageRESTClient) DeleteVol(ctx context.Context, volume string, f
values.Set(storageRESTForceDelete, "true")
}
respBody, err := client.call(ctx, storageRESTMethodDeleteVol, values, nil, -1)
defer http.DrainBody(respBody)
defer xhttp.DrainBody(respBody)
return err
}
@ -327,7 +327,7 @@ func (client *storageRESTClient) AppendFile(ctx context.Context, volume string,
values.Set(storageRESTFilePath, path)
reader := bytes.NewReader(buf)
respBody, err := client.call(ctx, storageRESTMethodAppendFile, values, reader, -1)
defer http.DrainBody(respBody)
defer xhttp.DrainBody(respBody)
return err
}
@ -337,7 +337,7 @@ func (client *storageRESTClient) CreateFile(ctx context.Context, volume, path st
values.Set(storageRESTFilePath, path)
values.Set(storageRESTLength, strconv.Itoa(int(size)))
respBody, err := client.call(ctx, storageRESTMethodCreateFile, values, ioutil.NopCloser(reader), size)
defer http.DrainBody(respBody)
defer xhttp.DrainBody(respBody)
return err
}
@ -352,7 +352,7 @@ func (client *storageRESTClient) WriteMetadata(ctx context.Context, volume, path
}
respBody, err := client.call(ctx, storageRESTMethodWriteMetadata, values, &reader, -1)
defer http.DrainBody(respBody)
defer xhttp.DrainBody(respBody)
return err
}
@ -368,7 +368,7 @@ func (client *storageRESTClient) DeleteVersion(ctx context.Context, volume, path
}
respBody, err := client.call(ctx, storageRESTMethodDeleteVersion, values, &buffer, -1)
defer http.DrainBody(respBody)
defer xhttp.DrainBody(respBody)
return err
}
@ -378,7 +378,7 @@ func (client *storageRESTClient) WriteAll(ctx context.Context, volume string, pa
values.Set(storageRESTVolume, volume)
values.Set(storageRESTFilePath, path)
respBody, err := client.call(ctx, storageRESTMethodWriteAll, values, bytes.NewBuffer(b), int64(len(b)))
defer http.DrainBody(respBody)
defer xhttp.DrainBody(respBody)
return err
}
@ -388,7 +388,7 @@ func (client *storageRESTClient) CheckFile(ctx context.Context, volume string, p
values.Set(storageRESTVolume, volume)
values.Set(storageRESTFilePath, path)
respBody, err := client.call(ctx, storageRESTMethodCheckFile, values, nil, -1)
defer http.DrainBody(respBody)
defer xhttp.DrainBody(respBody)
return err
}
@ -405,7 +405,7 @@ func (client *storageRESTClient) CheckParts(ctx context.Context, volume string,
}
respBody, err := client.call(ctx, storageRESTMethodCheckParts, values, &reader, -1)
defer http.DrainBody(respBody)
defer xhttp.DrainBody(respBody)
return err
}
@ -419,7 +419,6 @@ func (client *storageRESTClient) RenameData(ctx context.Context, srcVolume, srcP
values.Set(storageRESTDstPath, dstPath)
respBody, err := client.call(ctx, storageRESTMethodRenameData, values, nil, -1)
defer http.DrainBody(respBody)
return err
}
@ -449,7 +448,7 @@ func (client *storageRESTClient) ReadVersion(ctx context.Context, volume, path,
if err != nil {
return fi, err
}
defer http.DrainBody(respBody)
defer xhttp.DrainBody(respBody)
dec := msgpNewReader(respBody)
defer readMsgpReaderPool.Put(dec)
@ -467,7 +466,7 @@ func (client *storageRESTClient) ReadAll(ctx context.Context, volume string, pat
if err != nil {
return nil, err
}
defer http.DrainBody(respBody)
defer xhttp.DrainBody(respBody)
return ioutil.ReadAll(respBody)
}
@ -499,7 +498,7 @@ func (client *storageRESTClient) ReadFile(ctx context.Context, volume string, pa
if err != nil {
return 0, err
}
defer http.DrainBody(respBody)
defer xhttp.DrainBody(respBody)
n, err := io.ReadFull(respBody, buf)
return int64(n), toStorageErr(err)
}
@ -514,7 +513,7 @@ func (client *storageRESTClient) ListDir(ctx context.Context, volume, dirPath st
if err != nil {
return nil, err
}
defer http.DrainBody(respBody)
defer xhttp.DrainBody(respBody)
err = gob.NewDecoder(respBody).Decode(&entries)
return entries, toStorageErr(err)
}
@ -527,7 +526,7 @@ func (client *storageRESTClient) Delete(ctx context.Context, volume string, path
values.Set(storageRESTRecursive, strconv.FormatBool(recursive))
respBody, err := client.call(ctx, storageRESTMethodDeleteFile, values, nil, -1)
defer http.DrainBody(respBody)
defer xhttp.DrainBody(respBody)
return err
}
@ -551,7 +550,7 @@ func (client *storageRESTClient) DeleteVersions(ctx context.Context, volume stri
errs = make([]error, len(versions))
respBody, err := client.call(ctx, storageRESTMethodDeleteVersions, values, &buffer, -1)
defer http.DrainBody(respBody)
defer xhttp.DrainBody(respBody)
if err != nil {
for i := range errs {
errs[i] = err
@ -590,7 +589,7 @@ func (client *storageRESTClient) RenameFile(ctx context.Context, srcVolume, srcP
values.Set(storageRESTDstVolume, dstVolume)
values.Set(storageRESTDstPath, dstPath)
respBody, err := client.call(ctx, storageRESTMethodRenameFile, values, nil, -1)
defer http.DrainBody(respBody)
defer xhttp.DrainBody(respBody)
return err
}
@ -605,7 +604,7 @@ func (client *storageRESTClient) VerifyFile(ctx context.Context, volume, path st
}
respBody, err := client.call(ctx, storageRESTMethodVerifyFile, values, &reader, -1)
defer http.DrainBody(respBody)
defer xhttp.DrainBody(respBody)
if err != nil {
return err
}

View file

@ -735,23 +735,6 @@ func waitForHTTPResponse(respBody io.Reader) (io.Reader, error) {
}
}
// drainCloser can be used for wrapping an http response.
// It will drain the body before closing.
type drainCloser struct {
rc io.ReadCloser
}
// Read forwards the read operation.
func (f drainCloser) Read(p []byte) (n int, err error) {
return f.rc.Read(p)
}
// Close drains the body and closes the upstream.
func (f drainCloser) Close() error {
xhttp.DrainBody(f.rc)
return nil
}
// httpStreamResponse allows streaming a response, but still send an error.
type httpStreamResponse struct {
done chan error
@ -843,7 +826,6 @@ func waitForHTTPStream(respBody io.ReadCloser, w io.Writer) error {
case 0:
// 0 is unbuffered, copy the rest.
_, err := io.Copy(w, respBody)
respBody.Close()
if err == io.EOF {
return nil
}
@ -853,18 +835,7 @@ func waitForHTTPStream(respBody io.ReadCloser, w io.Writer) error {
if err != nil {
return err
}
respBody.Close()
return errors.New(string(errorText))
case 3:
// gob style is already deprecated, we can remove this when
// storage API version will be greater or equal to 23.
defer respBody.Close()
dec := gob.NewDecoder(respBody)
var err error
if de := dec.Decode(&err); de == nil {
return err
}
return errors.New("rpc error")
case 2:
// Block of data
var tmp [4]byte
@ -881,7 +852,6 @@ func waitForHTTPStream(respBody io.ReadCloser, w io.Writer) error {
case 32:
continue
default:
go xhttp.DrainBody(respBody)
return fmt.Errorf("unexpected filler byte: %d", tmp[0])
}
}