diff --git a/go.mod b/go.mod index a6662b78a..73cfcc6ec 100644 --- a/go.mod +++ b/go.mod @@ -32,7 +32,7 @@ require ( github.com/json-iterator/go v1.1.7 github.com/klauspost/compress v1.8.3 github.com/klauspost/pgzip v1.2.1 - github.com/klauspost/readahead v1.3.0 + github.com/klauspost/readahead v1.3.1 github.com/klauspost/reedsolomon v1.9.3 github.com/kurin/blazer v0.5.4-0.20190613185654-cf2f27cc0be3 github.com/lib/pq v1.0.0 diff --git a/go.sum b/go.sum index 4ff2c6d23..f62cae9f7 100644 --- a/go.sum +++ b/go.sum @@ -362,6 +362,7 @@ github.com/klauspost/pgzip v1.2.1 h1:oIPZROsWuPHpOdMVWLuJZXwgjhrW8r1yEX8UqMyeNHM github.com/klauspost/pgzip v1.2.1/go.mod h1:Ch1tH69qFZu15pkjo5kYi6mth2Zzwzt50oCQKQE9RUs= github.com/klauspost/readahead v1.3.0 h1:ur57scQa1RS6oQgdq+6mylmP2u0iR1LFw1zy3Xwqacg= github.com/klauspost/readahead v1.3.0/go.mod h1:AH9juHzNH7xqdqFHrMRSHeH2Ps+vFf+kblDqzPFiLJg= +github.com/klauspost/readahead v1.3.1/go.mod h1:AH9juHzNH7xqdqFHrMRSHeH2Ps+vFf+kblDqzPFiLJg= github.com/klauspost/reedsolomon v0.0.0-20190210214925-2b210cf0866d/go.mod h1:CwCi+NUr9pqSVktrkN+Ondf06rkhYZ/pcNv7fu+8Un4= github.com/klauspost/reedsolomon v1.9.1 h1:kYrT1MlR4JH6PqOpC+okdb9CDTcwEC/BqpzK4WFyXL8= github.com/klauspost/reedsolomon v1.9.1/go.mod h1:CwCi+NUr9pqSVktrkN+Ondf06rkhYZ/pcNv7fu+8Un4= diff --git a/pkg/s3select/json/reader.go b/pkg/s3select/json/reader.go index 4be0fad0c..cd672db7d 100644 --- a/pkg/s3select/json/reader.go +++ b/pkg/s3select/json/reader.go @@ -17,7 +17,9 @@ package json import ( + "errors" "io" + "sync" "github.com/minio/minio/pkg/s3select/sql" @@ -67,7 +69,6 @@ func (r *Reader) Read(dst sql.Record) (sql.Record, error) { // Close - closes underlying reader. func (r *Reader) Close() error { // Close the input. - // Potentially racy if the stream decoder is still reading. err := r.readCloser.Close() for range r.valueCh { // Drain values so we don't leak a goroutine. @@ -78,6 +79,7 @@ func (r *Reader) Close() error { // NewReader - creates new JSON reader using readCloser. func NewReader(readCloser io.ReadCloser, args *ReaderArgs) *Reader { + readCloser = &syncReadCloser{rc: readCloser} d := jstream.NewDecoder(readCloser, 0).ObjectAsKVS() return &Reader{ args: args, @@ -86,3 +88,46 @@ func NewReader(readCloser io.ReadCloser, args *ReaderArgs) *Reader { readCloser: readCloser, } } + +// syncReadCloser will wrap a readcloser and make it safe to call Close +// while reads are running. +// All read errors are also postponed until Close is called and +// io.EOF is returned instead. +type syncReadCloser struct { + rc io.ReadCloser + errMu sync.Mutex + err error +} + +func (pr *syncReadCloser) Read(p []byte) (n int, err error) { + // This ensures that Close will block until Read has completed. + // This allows another goroutine to close the reader. + pr.errMu.Lock() + defer pr.errMu.Unlock() + if pr.err != nil { + return 0, io.EOF + } + n, pr.err = pr.rc.Read(p) + if pr.err != nil { + // Translate any error into io.EOF, so we don't crash: + // https://github.com/bcicen/jstream/blob/master/scanner.go#L48 + return n, io.EOF + } + + return n, nil +} + +var errClosed = errors.New("read after close") + +func (pr *syncReadCloser) Close() error { + pr.errMu.Lock() + defer pr.errMu.Unlock() + if pr.err == errClosed { + return nil + } + if pr.err != nil { + return pr.err + } + pr.err = errClosed + return pr.rc.Close() +} diff --git a/pkg/s3select/json/reader_test.go b/pkg/s3select/json/reader_test.go index 130fde9df..c8970a91b 100644 --- a/pkg/s3select/json/reader_test.go +++ b/pkg/s3select/json/reader_test.go @@ -33,22 +33,43 @@ func TestNewReader(t *testing.T) { t.Fatal(err) } for _, file := range files { - f, err := os.Open(filepath.Join("testdata", file.Name())) - if err != nil { - t.Fatal(err) - } - r := NewReader(f, &ReaderArgs{}) - var record sql.Record - for { - record, err = r.Read(record) + t.Run(file.Name(), func(t *testing.T) { + f, err := os.Open(filepath.Join("testdata", file.Name())) if err != nil { - break + t.Fatal(err) } - } - r.Close() - if err != io.EOF { - t.Fatalf("Reading failed with %s, %s", err, file.Name()) - } + r := NewReader(f, &ReaderArgs{}) + var record sql.Record + for { + record, err = r.Read(record) + if err != nil { + break + } + } + r.Close() + if err != io.EOF { + t.Fatalf("Reading failed with %s, %s", err, file.Name()) + } + }) + + t.Run(file.Name()+"-close", func(t *testing.T) { + f, err := os.Open(filepath.Join("testdata", file.Name())) + if err != nil { + t.Fatal(err) + } + r := NewReader(f, &ReaderArgs{}) + r.Close() + var record sql.Record + for { + record, err = r.Read(record) + if err != nil { + break + } + } + if err != io.EOF { + t.Fatalf("Reading failed with %s, %s", err, file.Name()) + } + }) } } diff --git a/pkg/s3select/progress.go b/pkg/s3select/progress.go index a6f32b886..fa0ace5c3 100644 --- a/pkg/s3select/progress.go +++ b/pkg/s3select/progress.go @@ -18,8 +18,10 @@ package s3select import ( "compress/bzip2" + "errors" "fmt" "io" + "sync" "sync/atomic" gzip "github.com/klauspost/pgzip" @@ -50,13 +52,29 @@ type progressReader struct { rc io.ReadCloser scannedReader *countUpReader processedReader *countUpReader + + closedMu sync.Mutex + closed bool } func (pr *progressReader) Read(p []byte) (n int, err error) { + // This ensures that Close will block until Read has completed. + // This allows another goroutine to close the reader. + pr.closedMu.Lock() + defer pr.closedMu.Unlock() + if pr.closed { + return 0, errors.New("progressReader: read after Close") + } return pr.processedReader.Read(p) } func (pr *progressReader) Close() error { + pr.closedMu.Lock() + defer pr.closedMu.Unlock() + if pr.closed { + return nil + } + pr.closed = true return pr.rc.Close() }