From b2c92cdaaa512cec668faea09211bbc8bbf121eb Mon Sep 17 00:00:00 2001 From: Klaus Post Date: Mon, 6 Sep 2021 09:09:53 -0700 Subject: [PATCH] select: Add more compression formats (#13142) Support Zstandard, LZ4, S2, and snappy as additional compression formats for S3 Select. --- docs/select/README.md | 3 ++- go.mod | 2 +- go.sum | 4 ++-- internal/s3select/errors.go | 17 +++++------------ internal/s3select/progress.go | 31 ++++++++++++++++++++++++------- internal/s3select/select.go | 35 ++++++++++++++++++++++++++++------- 6 files changed, 62 insertions(+), 30 deletions(-) diff --git a/docs/select/README.md b/docs/select/README.md index dc8a2e993..189c087e1 100644 --- a/docs/select/README.md +++ b/docs/select/README.md @@ -5,7 +5,8 @@ You can use the Select API to query objects with following features: - Objects must be in CSV, JSON, or Parquet(*) format. - UTF-8 is the only encoding type the Select API supports. -- GZIP or BZIP2 - CSV and JSON files can be compressed using GZIP or BZIP2. The Select API supports columnar compression for Parquet using GZIP, Snappy, LZ4. Whole object compression is not supported for Parquet objects. +- GZIP or BZIP2 - CSV and JSON files can be compressed using GZIP, BZIP2, [ZSTD](https://facebook.github.io/zstd/), and streaming formats of [LZ4](https://lz4.github.io/lz4/), [S2](https://github.com/klauspost/compress/tree/master/s2#s2-compression) and [SNAPPY](http://google.github.io/snappy/). +- Parquet API supports columnar compression for using GZIP, Snappy, LZ4. Whole object compression is not supported for Parquet objects. - Server-side encryption - The Select API supports querying objects that are protected with server-side encryption. Type inference and automatic conversion of values is performed based on the context when the value is un-typed (such as when reading CSV data). If present, the CAST function overrides automatic conversion. diff --git a/go.mod b/go.mod index 52c1893c7..6497f42e2 100644 --- a/go.mod +++ b/go.mod @@ -32,7 +32,7 @@ require ( github.com/gorilla/mux v1.8.0 github.com/jcmturner/gokrb5/v8 v8.4.2 github.com/json-iterator/go v1.1.11 - github.com/klauspost/compress v1.13.4 + github.com/klauspost/compress v1.13.5 github.com/klauspost/cpuid/v2 v2.0.4 github.com/klauspost/pgzip v1.2.5 github.com/klauspost/readahead v1.3.1 diff --git a/go.sum b/go.sum index 93f11f267..c1b761856 100644 --- a/go.sum +++ b/go.sum @@ -883,8 +883,8 @@ github.com/klauspost/compress v1.11.0/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYs github.com/klauspost/compress v1.11.7/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= github.com/klauspost/compress v1.11.12/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= github.com/klauspost/compress v1.12.2/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg= -github.com/klauspost/compress v1.13.4 h1:0zhec2I8zGnjWcKyLl6i3gPqKANCCn5e9xmviEEeX6s= -github.com/klauspost/compress v1.13.4/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg= +github.com/klauspost/compress v1.13.5 h1:9O69jUPDcsT9fEm74W92rZL9FQY7rCdaXVneq+yyzl4= +github.com/klauspost/compress v1.13.5/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= github.com/klauspost/cpuid v0.0.0-20180405133222-e7e905edc00e/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek= github.com/klauspost/cpuid v1.2.0/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek= github.com/klauspost/cpuid v1.2.3/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek= diff --git a/internal/s3select/errors.go b/internal/s3select/errors.go index 1234adc84..eb99edbfc 100644 --- a/internal/s3select/errors.go +++ b/internal/s3select/errors.go @@ -17,6 +17,8 @@ package s3select +import "strings" + // SelectError - represents s3 select error specified in // https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectSELECTContent.html#RESTObjectSELECTContent-responses-special-errors. type SelectError interface { @@ -66,25 +68,16 @@ func errMalformedXML(err error) *s3Error { func errInvalidCompressionFormat(err error) *s3Error { return &s3Error{ code: "InvalidCompressionFormat", - message: "The file is not in a supported compression format. Only GZIP and BZIP2 are supported.", + message: "The file is not in a supported compression format. GZIP, BZIP2, ZSTD, LZ4, S2 and SNAPPY are supported.", statusCode: 400, cause: err, } } -func errInvalidBZIP2CompressionFormat(err error) *s3Error { +func errInvalidCompression(err error, t CompressionType) *s3Error { return &s3Error{ code: "InvalidCompressionFormat", - message: "BZIP2 is not applicable to the queried object. Please correct the request and try again.", - statusCode: 400, - cause: err, - } -} - -func errInvalidGZIPCompressionFormat(err error) *s3Error { - return &s3Error{ - code: "InvalidCompressionFormat", - message: "GZIP is not applicable to the queried object. Please correct the request and try again.", + message: strings.ToUpper(string(t)) + " is not applicable to the queried object. Please correct the request and try again.", statusCode: 400, cause: err, } diff --git a/internal/s3select/progress.go b/internal/s3select/progress.go index 3383ebbc2..488a83243 100644 --- a/internal/s3select/progress.go +++ b/internal/s3select/progress.go @@ -25,7 +25,10 @@ import ( "sync" "sync/atomic" + "github.com/klauspost/compress/s2" + "github.com/klauspost/compress/zstd" gzip "github.com/klauspost/pgzip" + "github.com/pierrec/lz4" ) type countUpReader struct { @@ -58,7 +61,7 @@ type progressReader struct { processedReader *countUpReader closedMu sync.Mutex - gzr *gzip.Reader + closer io.ReadCloser closed bool } @@ -80,8 +83,8 @@ func (pr *progressReader) Close() error { return nil } pr.closed = true - if pr.gzr != nil { - pr.gzr.Close() + if pr.closer != nil { + pr.closer.Close() } return pr.rc.Close() } @@ -102,23 +105,37 @@ func newProgressReader(rc io.ReadCloser, compType CompressionType) (*progressRea rc: rc, scannedReader: scannedReader, } - var err error var r io.Reader switch compType { case noneType: r = scannedReader case gzipType: - pr.gzr, err = gzip.NewReader(scannedReader) + gzr, err := gzip.NewReader(scannedReader) if err != nil { if errors.Is(err, gzip.ErrHeader) || errors.Is(err, gzip.ErrChecksum) { - return nil, errInvalidGZIPCompressionFormat(err) + return nil, errInvalidCompression(err, compType) } return nil, errTruncatedInput(err) } - r = pr.gzr + r = gzr + pr.closer = gzr case bzip2Type: r = bzip2.NewReader(scannedReader) + case zstdType: + // Set a max window of 64MB. More than reasonable. + zr, err := zstd.NewReader(scannedReader, zstd.WithDecoderConcurrency(2), zstd.WithDecoderMaxWindow(64<<20)) + if err != nil { + return nil, errInvalidCompression(err, compType) + } + r = zr + pr.closer = zr.IOReadCloser() + case lz4Type: + r = lz4.NewReader(scannedReader) + case s2Type: + r = s2.NewReader(scannedReader) + case snappyType: + r = s2.NewReader(scannedReader, s2.ReaderMaxBlockSize(64<<10)) default: return nil, errInvalidCompressionFormat(fmt.Errorf("unknown compression type '%v'", compType)) } diff --git a/internal/s3select/select.go b/internal/s3select/select.go index 56f07d81b..dbdcaeecc 100644 --- a/internal/s3select/select.go +++ b/internal/s3select/select.go @@ -31,12 +31,16 @@ import ( "strings" "sync" + "github.com/klauspost/compress/s2" + "github.com/klauspost/compress/zstd" + gzip "github.com/klauspost/pgzip" "github.com/minio/minio/internal/s3select/csv" "github.com/minio/minio/internal/s3select/json" "github.com/minio/minio/internal/s3select/parquet" "github.com/minio/minio/internal/s3select/simdj" "github.com/minio/minio/internal/s3select/sql" "github.com/minio/simdjson-go" + "github.com/pierrec/lz4" ) type recordReader interface { @@ -57,8 +61,13 @@ type CompressionType string const ( noneType CompressionType = "none" - gzipType CompressionType = "gzip" - bzip2Type CompressionType = "bzip2" + gzipType CompressionType = "GZIP" + bzip2Type CompressionType = "BZIP2" + + zstdType CompressionType = "ZSTD" + lz4Type CompressionType = "LZ4" + s2Type CompressionType = "S2" + snappyType CompressionType = "SNAPPY" ) const ( @@ -87,13 +96,13 @@ func (c *CompressionType) UnmarshalXML(d *xml.Decoder, start xml.StartElement) e return errMalformedXML(err) } - parsedType := CompressionType(strings.ToLower(s)) - if s == "" { + parsedType := CompressionType(strings.ToUpper(s)) + if s == "" || parsedType == "NONE" { parsedType = noneType } switch parsedType { - case noneType, gzipType, bzip2Type: + case noneType, gzipType, bzip2Type, snappyType, s2Type, zstdType, lz4Type: default: return errInvalidCompressionFormat(fmt.Errorf("invalid compression format '%v'", s)) } @@ -127,7 +136,7 @@ func (input *InputSerialization) UnmarshalXML(d *xml.Decoder, start xml.StartEle } // If no compression is specified, set to noneType - if parsedInput.CompressionType == CompressionType("") { + if parsedInput.CompressionType == "" { parsedInput.CompressionType = noneType } @@ -309,7 +318,19 @@ func (s3Select *S3Select) Open(getReader func(offset, length int64) (io.ReadClos rc.Close() var stErr bzip2.StructuralError if errors.As(err, &stErr) { - return errInvalidBZIP2CompressionFormat(err) + return errInvalidCompression(err, s3Select.Input.CompressionType) + } + // Test these compressor errors + errs := []error{ + gzip.ErrHeader, gzip.ErrChecksum, + s2.ErrCorrupt, s2.ErrUnsupported, s2.ErrCRC, + zstd.ErrBlockTooSmall, zstd.ErrMagicMismatch, zstd.ErrWindowSizeExceeded, zstd.ErrUnknownDictionary, zstd.ErrWindowSizeTooSmall, + lz4.ErrInvalid, lz4.ErrBlockDependency, + } + for _, e := range errs { + if errors.Is(err, e) { + return errInvalidCompression(err, s3Select.Input.CompressionType) + } } return err }