fix: remove auto-close GetObjectReader (#12009)
locks can get relinquished when Read() sees io.EOF leading to prematurely closing of the readers concurrent writes on the same object can have undesired consequences here when these locks are relinquished.
This commit is contained in:
parent
2965348694
commit
28a9409da7
|
@ -530,7 +530,7 @@ type SelectParameters struct {
|
|||
|
||||
// IsEmpty returns true if no select parameters set
|
||||
func (sp *SelectParameters) IsEmpty() bool {
|
||||
return sp == nil || sp.S3Select == s3select.S3Select{}
|
||||
return sp == nil
|
||||
}
|
||||
|
||||
var (
|
||||
|
|
|
@ -814,13 +814,7 @@ func (g *GetObjectReader) Close() error {
|
|||
|
||||
// Read - to implement Reader interface.
|
||||
func (g *GetObjectReader) Read(p []byte) (n int, err error) {
|
||||
n, err = g.pReader.Read(p)
|
||||
if err != nil {
|
||||
// Calling code may not Close() in case of error, so
|
||||
// we ensure it.
|
||||
g.Close()
|
||||
}
|
||||
return
|
||||
return g.pReader.Read(p)
|
||||
}
|
||||
|
||||
//SealMD5CurrFn seals md5sum with object encryption key and returns sealed
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* MinIO Cloud Storage, (C) 2019 MinIO, Inc.
|
||||
* MinIO Cloud Storage, (C) 2019-2021 MinIO, Inc.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
@ -216,6 +216,7 @@ type S3Select struct {
|
|||
statement *sql.SelectStatement
|
||||
progressReader *progressReader
|
||||
recordReader recordReader
|
||||
close func() error
|
||||
}
|
||||
|
||||
var (
|
||||
|
@ -311,6 +312,7 @@ func (s3Select *S3Select) Open(getReader func(offset, length int64) (io.ReadClos
|
|||
}
|
||||
return err
|
||||
}
|
||||
s3Select.close = rc.Close
|
||||
return nil
|
||||
case jsonFormat:
|
||||
rc, err := getReader(0, -1)
|
||||
|
@ -333,6 +335,8 @@ func (s3Select *S3Select) Open(getReader func(offset, length int64) (io.ReadClos
|
|||
} else {
|
||||
s3Select.recordReader = json.NewReader(s3Select.progressReader, &s3Select.Input.JSONArgs)
|
||||
}
|
||||
|
||||
s3Select.close = rc.Close
|
||||
return nil
|
||||
case parquetFormat:
|
||||
if !strings.EqualFold(os.Getenv("MINIO_API_SELECT_PARQUET"), "on") {
|
||||
|
@ -396,6 +400,12 @@ func (s3Select *S3Select) marshal(buf *bytes.Buffer, record sql.Record) error {
|
|||
|
||||
// Evaluate - filters and sends records read from opened reader as per select statement to http response writer.
|
||||
func (s3Select *S3Select) Evaluate(w http.ResponseWriter) {
|
||||
defer func() {
|
||||
if s3Select.close != nil {
|
||||
s3Select.close()
|
||||
}
|
||||
}()
|
||||
|
||||
getProgressFunc := s3Select.getProgress
|
||||
if !s3Select.Progress.Enabled {
|
||||
getProgressFunc = nil
|
||||
|
|
Loading…
Reference in a new issue