Concurrent CSV parsing and reduce S3 select allocations (#8200)

```
CSV parsing, BEFORE:
BenchmarkReaderBasic-12         	    2842	    407533 ns/op	  397860 B/op	     957 allocs/op
BenchmarkReaderReplace-12       	    2718	    429914 ns/op	  397844 B/op	     957 allocs/op
BenchmarkReaderReplaceTwo-12    	    2718	    435556 ns/op	  397855 B/op	     957 allocs/op
BenchmarkAggregateCount_100K-12    	     171	   6798974 ns/op	16667102 B/op	  308077 allocs/op
BenchmarkAggregateCount_1M-12    	      19	  65657411 ns/op	168057743 B/op	 3146610 allocs/op
BenchmarkSelectAll_10M-12    	       1	20882119900 ns/op	2758799896 B/op	41978762 allocs/op

CSV parsing, AFTER:
BenchmarkReaderBasic-12         	    3721	    312549 ns/op	  101920 B/op	     338 allocs/op
BenchmarkReaderReplace-12       	    3776	    318810 ns/op	  101993 B/op	     340 allocs/op
BenchmarkReaderReplaceTwo-12    	    3610	    330967 ns/op	  102012 B/op	     341 allocs/op
BenchmarkAggregateCount_100K-12    	     295	   4149588 ns/op	 3553623 B/op	  103261 allocs/op
BenchmarkAggregateCount_1M-12    	      30	  37746503 ns/op	33827931 B/op	 1049435 allocs/op
BenchmarkSelectAll_10M-12    	       1	17608495800 ns/op	1416504040 B/op	21007082 allocs/op

~ benchcmp old.txt new.txt
benchmark                           old ns/op       new ns/op       delta
BenchmarkReaderBasic-12             407533          312549          -23.31%
BenchmarkReaderReplace-12           429914          318810          -25.84%
BenchmarkReaderReplaceTwo-12        435556          330967          -24.01%
BenchmarkAggregateCount_100K-12     6798974         4149588         -38.97%
BenchmarkAggregateCount_1M-12       65657411        37746503        -42.51%
BenchmarkSelectAll_10M-12           20882119900     17608495800     -15.68%

benchmark                           old allocs     new allocs     delta
BenchmarkReaderBasic-12             957            338            -64.68%
BenchmarkReaderReplace-12           957            340            -64.47%
BenchmarkReaderReplaceTwo-12        957            341            -64.37%
BenchmarkAggregateCount_100K-12     308077         103261         -66.48%
BenchmarkAggregateCount_1M-12       3146610        1049435        -66.65%
BenchmarkSelectAll_10M-12           41978762       21007082       -49.96%

benchmark                           old bytes      new bytes      delta
BenchmarkReaderBasic-12             397860         101920         -74.38%
BenchmarkReaderReplace-12           397844         101993         -74.36%
BenchmarkReaderReplaceTwo-12        397855         102012         -74.36%
BenchmarkAggregateCount_100K-12     16667102       3553623        -78.68%
BenchmarkAggregateCount_1M-12       168057743      33827931       -79.87%
BenchmarkSelectAll_10M-12           2758799896     1416504040     -48.66%
```

```
BenchmarkReaderHuge/97K-12         	    2200	    540840 ns/op	 184.32 MB/s	 1604450 B/op	     687 allocs/op
BenchmarkReaderHuge/194K-12        	    1522	    752257 ns/op	 265.04 MB/s	 2143135 B/op	    1335 allocs/op
BenchmarkReaderHuge/389K-12        	    1190	    947858 ns/op	 420.69 MB/s	 3221831 B/op	    2630 allocs/op
BenchmarkReaderHuge/778K-12        	     806	   1472486 ns/op	 541.61 MB/s	 5201856 B/op	    5187 allocs/op
BenchmarkReaderHuge/1557K-12       	     426	   2575269 ns/op	 619.36 MB/s	 9101330 B/op	   10233 allocs/op
BenchmarkReaderHuge/3115K-12       	     286	   4034656 ns/op	 790.66 MB/s	12397968 B/op	   16099 allocs/op
BenchmarkReaderHuge/6230K-12       	     172	   6830563 ns/op	 934.05 MB/s	16008416 B/op	   26844 allocs/op
BenchmarkReaderHuge/12461K-12      	     100	  11409467 ns/op	1118.39 MB/s	22655163 B/op	   48107 allocs/op
BenchmarkReaderHuge/24922K-12      	      66	  19780395 ns/op	1290.19 MB/s	35158559 B/op	   90216 allocs/op
BenchmarkReaderHuge/49844K-12      	      34	  37282559 ns/op	1369.03 MB/s	60528624 B/op	  174497 allocs/op
```
This commit is contained in:
Klaus Post 2019-09-13 14:18:35 -07:00 committed by Harshavardhana
parent e7f491a14b
commit ddea0bdf11
23 changed files with 1041 additions and 189 deletions

View file

@ -106,3 +106,4 @@ For a more detailed SELECT SQL reference, please see [here](https://docs.aws.ama
- Large numbers (outside of the signed 64-bit range) are not yet supported.
- The Date [functions](https://docs.aws.amazon.com/AmazonS3/latest/dev/s3-glacier-select-sql-reference-date.html) `DATE_ADD`, `DATE_DIFF`, `EXTRACT` and `UTCNOW` along with type conversion using `CAST` to the `TIMESTAMP` data type are currently supported.
- AWS S3's [reserved keywords](https://docs.aws.amazon.com/AmazonS3/latest/dev/s3-glacier-select-sql-reference-keyword-list.html) list is not yet respected.
- CSV input fields (even quoted) cannot contain newlines even if `RecordDelimiter` is something else.

1
go.mod
View file

@ -29,6 +29,7 @@ require (
github.com/hashicorp/vault v1.1.0
github.com/inconshreveable/go-update v0.0.0-20160112193335-8152e7eb6ccf
github.com/json-iterator/go v1.1.7
github.com/klauspost/compress v1.5.0
github.com/klauspost/pgzip v1.2.1
github.com/klauspost/readahead v1.3.0
github.com/klauspost/reedsolomon v1.9.1

View file

@ -17,89 +17,66 @@
package csv
import (
"bufio"
"bytes"
"encoding/csv"
"fmt"
"io"
"runtime"
"sync"
"github.com/minio/minio/pkg/s3select/sql"
)
type recordReader struct {
reader io.Reader
recordDelimiter []byte
oneByte []byte
useOneByte bool
}
func (rr *recordReader) Read(p []byte) (n int, err error) {
if rr.useOneByte {
p[0] = rr.oneByte[0]
rr.useOneByte = false
n, err = rr.reader.Read(p[1:])
n++
} else {
n, err = rr.reader.Read(p)
}
if err != nil {
return 0, err
}
if string(rr.recordDelimiter) == "\n" {
return n, nil
}
for {
i := bytes.Index(p, rr.recordDelimiter)
if i < 0 {
break
}
p[i] = '\n'
if len(rr.recordDelimiter) > 1 {
p = append(p[:i+1], p[i+len(rr.recordDelimiter):]...)
n--
}
}
if len(rr.recordDelimiter) == 1 || p[n-1] != rr.recordDelimiter[0] {
return n, nil
}
if _, err = rr.reader.Read(rr.oneByte); err != nil {
return 0, err
}
if rr.oneByte[0] == rr.recordDelimiter[1] {
p[n-1] = '\n'
return n, nil
}
rr.useOneByte = true
return n, nil
}
// Reader - CSV record reader for S3Select.
type Reader struct {
args *ReaderArgs
readCloser io.ReadCloser
csvReader *csv.Reader
columnNames []string
nameIndexMap map[string]int64
readCloser io.ReadCloser // raw input
buf *bufio.Reader // input to the splitter
columnNames []string // names of columns
nameIndexMap map[string]int64 // name to column index
current [][]string // current block of results to be returned
recordsRead int // number of records read in current slice
input chan *queueItem // input for workers
queue chan *queueItem // output from workers in order
err error // global error state, only touched by Reader.Read
bufferPool sync.Pool // pool of []byte objects for input
csvDstPool sync.Pool // pool of [][]string used for output
close chan struct{} // used for shutting down the splitter before end of stream
readerWg sync.WaitGroup // used to keep track of async reader.
}
// queueItem is an item in the queue.
type queueItem struct {
input []byte // raw input sent to the worker
dst chan [][]string // result of block decode
err error // any error encountered will be set here
}
// Read - reads single record.
func (r *Reader) Read() (sql.Record, error) {
csvRecord, err := r.csvReader.Read()
if err != nil {
if err != io.EOF {
return nil, errCSVParsingError(err)
// Once Read is called the previous record should no longer be referenced.
func (r *Reader) Read(dst sql.Record) (sql.Record, error) {
// If we have have any records left, return these before any error.
for len(r.current) <= r.recordsRead {
if r.err != nil {
return nil, r.err
}
return nil, err
// Move to next block
item, ok := <-r.queue
if !ok {
r.err = io.EOF
return nil, r.err
}
//lint:ignore SA6002 Using pointer would allocate more since we would have to copy slice header before taking a pointer.
r.csvDstPool.Put(r.current)
r.current = <-item.dst
r.err = item.err
r.recordsRead = 0
}
csvRecord := r.current[r.recordsRead]
r.recordsRead++
// If no column names are set, use _(index)
if r.columnNames == nil {
r.columnNames = make([]string, len(csvRecord))
for i := range csvRecord {
@ -107,67 +84,225 @@ func (r *Reader) Read() (sql.Record, error) {
}
}
// If no index max, add that.
if r.nameIndexMap == nil {
r.nameIndexMap = make(map[string]int64)
for i := range r.columnNames {
r.nameIndexMap[r.columnNames[i]] = int64(i)
}
}
dstRec, ok := dst.(*Record)
if !ok {
dstRec = &Record{}
}
dstRec.columnNames = r.columnNames
dstRec.csvRecord = csvRecord
dstRec.nameIndexMap = r.nameIndexMap
return &Record{
columnNames: r.columnNames,
csvRecord: csvRecord,
nameIndexMap: r.nameIndexMap,
}, nil
return dstRec, nil
}
// Close - closes underlaying reader.
// Close - closes underlying reader.
func (r *Reader) Close() error {
if r.close != nil {
close(r.close)
r.close = nil
r.readerWg.Wait()
}
r.recordsRead = len(r.current)
if r.err == nil {
r.err = io.EOF
}
return r.readCloser.Close()
}
// nextSplit will attempt to skip a number of bytes and
// return the buffer until the next newline occurs.
// The last block will be sent along with an io.EOF.
func (r *Reader) nextSplit(skip int, dst []byte) ([]byte, error) {
if cap(dst) < skip {
dst = make([]byte, 0, skip+1024)
}
dst = dst[:skip]
if skip > 0 {
n, err := io.ReadFull(r.buf, dst)
if err != nil && err != io.ErrUnexpectedEOF {
// If an EOF happens after reading some but not all the bytes,
// ReadFull returns ErrUnexpectedEOF.
return dst[:n], err
}
dst = dst[:n]
if err == io.ErrUnexpectedEOF {
return dst, io.EOF
}
}
// Read until next line.
in, err := r.buf.ReadBytes('\n')
dst = append(dst, in...)
return dst, err
}
// csvSplitSize is the size of each block.
// Blocks will read this much and find the first following newline.
// 128KB appears to be a very reasonable default.
const csvSplitSize = 128 << 10
// startReaders will read the header if needed and spin up a parser
// and a number of workers based on GOMAXPROCS.
// If an error is returned no goroutines have been started and r.err will have been set.
func (r *Reader) startReaders(in io.Reader, newReader func(io.Reader) *csv.Reader) error {
if r.args.FileHeaderInfo != none {
// Read column names
// Get one line.
b, err := r.nextSplit(0, nil)
if err != nil {
r.err = err
return err
}
reader := newReader(bytes.NewReader(b))
record, err := reader.Read()
if err != nil {
r.err = err
if err != io.EOF {
r.err = errCSVParsingError(err)
return errCSVParsingError(err)
}
return err
}
if r.args.FileHeaderInfo == use {
// Copy column names since records will be reused.
columns := append(make([]string, 0, len(record)), record...)
r.columnNames = columns
}
}
r.bufferPool.New = func() interface{} {
return make([]byte, csvSplitSize+1024)
}
// Create queue
r.queue = make(chan *queueItem, runtime.GOMAXPROCS(0))
r.input = make(chan *queueItem, runtime.GOMAXPROCS(0))
r.readerWg.Add(1)
// Start splitter
go func() {
defer close(r.input)
defer close(r.queue)
defer r.readerWg.Done()
for {
next, err := r.nextSplit(csvSplitSize, r.bufferPool.Get().([]byte))
q := queueItem{
input: next,
dst: make(chan [][]string, 1),
err: err,
}
select {
case <-r.close:
return
case r.queue <- &q:
}
select {
case <-r.close:
return
case r.input <- &q:
}
if err != nil {
// Exit on any error.
return
}
}
}()
// Start parsers
for i := 0; i < runtime.GOMAXPROCS(0); i++ {
go func() {
for in := range r.input {
if len(in.input) == 0 {
in.dst <- nil
continue
}
dst, ok := r.csvDstPool.Get().([][]string)
if !ok {
dst = make([][]string, 0, 1000)
}
cr := newReader(bytes.NewBuffer(in.input))
all := dst[:0]
err := func() error {
// Read all records until EOF or another error.
for {
record, err := cr.Read()
if err == io.EOF {
return nil
}
if err != nil {
return errCSVParsingError(err)
}
var recDst []string
if len(dst) > len(all) {
recDst = dst[len(all)]
}
if cap(recDst) < len(record) {
recDst = make([]string, len(record))
}
recDst = recDst[:len(record)]
copy(recDst, record)
all = append(all, recDst)
}
}()
if err != nil {
in.err = err
}
// We don't need the input any more.
//lint:ignore SA6002 Using pointer would allocate more since we would have to copy slice header before taking a pointer.
r.bufferPool.Put(in.input)
in.input = nil
in.dst <- all
}
}()
}
return nil
}
// NewReader - creates new CSV reader using readCloser.
func NewReader(readCloser io.ReadCloser, args *ReaderArgs) (*Reader, error) {
if args == nil || args.IsEmpty() {
panic(fmt.Errorf("empty args passed %v", args))
}
csvReader := csv.NewReader(&recordReader{
reader: readCloser,
recordDelimiter: []byte(args.RecordDelimiter),
oneByte: []byte{0},
})
csvReader.Comma = []rune(args.FieldDelimiter)[0]
csvReader.Comment = []rune(args.CommentCharacter)[0]
csvReader.FieldsPerRecord = -1
// If LazyQuotes is true, a quote may appear in an unquoted field and a
// non-doubled quote may appear in a quoted field.
csvReader.LazyQuotes = true
// We do not trim leading space to keep consistent with s3.
csvReader.TrimLeadingSpace = false
csvIn := io.Reader(readCloser)
if args.RecordDelimiter != "\n" {
csvIn = &recordTransform{
reader: readCloser,
recordDelimiter: []byte(args.RecordDelimiter),
oneByte: make([]byte, len(args.RecordDelimiter)-1),
}
}
r := &Reader{
args: args,
buf: bufio.NewReaderSize(csvIn, csvSplitSize*2),
readCloser: readCloser,
csvReader: csvReader,
close: make(chan struct{}),
}
if args.FileHeaderInfo == none {
return r, nil
// Assume args are validated by ReaderArgs.UnmarshalXML()
newCsvReader := func(r io.Reader) *csv.Reader {
ret := csv.NewReader(r)
ret.Comma = []rune(args.FieldDelimiter)[0]
ret.Comment = []rune(args.CommentCharacter)[0]
ret.FieldsPerRecord = -1
// If LazyQuotes is true, a quote may appear in an unquoted field and a
// non-doubled quote may appear in a quoted field.
ret.LazyQuotes = true
// We do not trim leading space to keep consistent with s3.
ret.TrimLeadingSpace = false
ret.ReuseRecord = true
return ret
}
record, err := csvReader.Read()
if err != nil {
if err != io.EOF {
return nil, errCSVParsingError(err)
}
return nil, err
}
if args.FileHeaderInfo == use {
r.columnNames = record
}
return r, nil
return r, r.startReaders(csvIn, newCsvReader)
}

View file

@ -18,11 +18,16 @@ package csv
import (
"bytes"
"errors"
"fmt"
"io"
"io/ioutil"
"path/filepath"
"reflect"
"strings"
"testing"
"github.com/klauspost/compress/zip"
"github.com/minio/minio/pkg/s3select/sql"
)
@ -49,12 +54,12 @@ func TestRead(t *testing.T) {
QuoteCharacter: defaultQuoteCharacter,
QuoteEscapeCharacter: defaultQuoteEscapeCharacter,
CommentCharacter: defaultCommentCharacter,
AllowQuotedRecordDelimiter: true,
AllowQuotedRecordDelimiter: false,
unmarshaled: true,
})
for {
record, err = r.Read()
record, err = r.Read(record)
if err != nil {
break
}
@ -72,3 +77,555 @@ func TestRead(t *testing.T) {
}
}
}
type tester interface {
Fatal(...interface{})
}
func openTestFile(t tester, file string) []byte {
f, err := ioutil.ReadFile(filepath.Join("testdata/testdata.zip"))
if err != nil {
t.Fatal(err)
}
z, err := zip.NewReader(bytes.NewReader(f), int64(len(f)))
if err != nil {
t.Fatal(err)
}
for _, f := range z.File {
if f.Name == file {
rc, err := f.Open()
if err != nil {
t.Fatal(err)
}
defer rc.Close()
b, err := ioutil.ReadAll(rc)
if err != nil {
t.Fatal(err)
}
return b
}
}
t.Fatal(file, "not found in testdata/testdata.zip")
return nil
}
func TestReadExtended(t *testing.T) {
cases := []struct {
file string
recordDelimiter string
fieldDelimiter string
header bool
wantColumns []string
wantTenFields string
totalFields int
}{
{
file: "nyc-taxi-data-100k.csv",
recordDelimiter: "\n",
fieldDelimiter: ",",
header: true,
wantColumns: []string{"trip_id", "vendor_id", "pickup_datetime", "dropoff_datetime", "store_and_fwd_flag", "rate_code_id", "pickup_longitude", "pickup_latitude", "dropoff_longitude", "dropoff_latitude", "passenger_count", "trip_distance", "fare_amount", "extra", "mta_tax", "tip_amount", "tolls_amount", "ehail_fee", "improvement_surcharge", "total_amount", "payment_type", "trip_type", "pickup", "dropoff", "cab_type", "precipitation", "snow_depth", "snowfall", "max_temp", "min_temp", "wind", "pickup_nyct2010_gid", "pickup_ctlabel", "pickup_borocode", "pickup_boroname", "pickup_ct2010", "pickup_boroct2010", "pickup_cdeligibil", "pickup_ntacode", "pickup_ntaname", "pickup_puma", "dropoff_nyct2010_gid", "dropoff_ctlabel", "dropoff_borocode", "dropoff_boroname", "dropoff_ct2010", "dropoff_boroct2010", "dropoff_cdeligibil", "dropoff_ntacode", "dropoff_ntaname", "dropoff_puma"},
wantTenFields: `3389224,2,2014-03-26 00:26:15,2014-03-26 00:28:38,N,1,-73.950431823730469,40.792251586914063,-73.938949584960937,40.794425964355469,1,0.84,4.5,0.5,0.5,1,0,,,6.5,1,1,75,74,green,0.00,0.0,0.0,36,24,11.86,1267,168,1,Manhattan,016800,1016800,E,MN33,East Harlem South,3804,1828,180,1,Manhattan,018000,1018000,E,MN34,East Harlem North,3804
3389225,2,2014-03-31 09:42:15,2014-03-31 10:01:17,N,1,-73.950340270996094,40.792228698730469,-73.941970825195313,40.842235565185547,1,4.47,17.5,0,0.5,0,0,,,18,2,1,75,244,green,0.16,0.0,0.0,56,36,8.28,1267,168,1,Manhattan,016800,1016800,E,MN33,East Harlem South,3804,911,251,1,Manhattan,025100,1025100,E,MN36,Washington Heights South,3801
3389226,2,2014-03-26 17:13:28,2014-03-26 17:19:07,N,1,-73.949493408203125,40.793506622314453,-73.943374633789063,40.786155700683594,1,0.82,5.5,1,0.5,0,0,,,7,1,1,75,75,green,0.00,0.0,0.0,36,24,11.86,1267,168,1,Manhattan,016800,1016800,E,MN33,East Harlem South,3804,1387,164,1,Manhattan,016400,1016400,E,MN33,East Harlem South,3804
3389227,2,2014-03-14 21:07:19,2014-03-14 21:11:41,N,1,-73.950538635253906,40.792228698730469,-73.940811157226563,40.809253692626953,1,1.40,6,0.5,0.5,0,0,,,7,2,1,75,42,green,0.00,0.0,0.0,46,22,5.59,1267,168,1,Manhattan,016800,1016800,E,MN33,East Harlem South,3804,1184,208,1,Manhattan,020800,1020800,E,MN03,Central Harlem North-Polo Grounds,3803
3389228,1,2014-03-28 13:52:56,2014-03-28 14:29:01,N,1,-73.950569152832031,40.792312622070313,-73.868507385253906,40.688491821289063,2,16.10,46,0,0.5,0,5.33,,,51.83,2,,75,63,green,0.04,0.0,0.0,62,37,5.37,1267,168,1,Manhattan,016800,1016800,E,MN33,East Harlem South,3804,1544,1182.02,3,Brooklyn,118202,3118202,E,BK83,Cypress Hills-City Line,4008
3389229,2,2014-03-07 09:46:32,2014-03-07 09:55:01,N,1,-73.952301025390625,40.789798736572266,-73.935806274414062,40.794448852539063,1,1.67,8,0,0.5,2,0,,,10.5,1,1,75,74,green,0.00,3.9,0.0,37,26,7.83,1267,168,1,Manhattan,016800,1016800,E,MN33,East Harlem South,3804,1553,178,1,Manhattan,017800,1017800,E,MN34,East Harlem North,3804
3389230,2,2014-03-17 18:23:05,2014-03-17 18:28:38,N,1,-73.952346801757813,40.789844512939453,-73.946319580078125,40.783851623535156,5,0.95,5.5,1,0.5,0.65,0,,,7.65,1,1,75,263,green,0.00,0.0,0.0,35,23,8.05,1267,168,1,Manhattan,016800,1016800,E,MN33,East Harlem South,3804,32,156.01,1,Manhattan,015601,1015601,I,MN32,Yorkville,3805
3389231,1,2014-03-19 19:09:36,2014-03-19 19:12:20,N,1,-73.952377319335938,40.789779663085938,-73.947494506835938,40.796474456787109,1,0.50,4,1,0.5,1,0,,,6.5,1,,75,75,green,0.92,0.0,0.0,46,32,7.16,1267,168,1,Manhattan,016800,1016800,E,MN33,East Harlem South,3804,1401,174.02,1,Manhattan,017402,1017402,E,MN33,East Harlem South,3804
3389232,2,2014-03-20 19:06:28,2014-03-20 19:21:35,N,1,-73.952583312988281,40.789516448974609,-73.985870361328125,40.776973724365234,2,3.04,13,1,0.5,2.8,0,,,17.3,1,1,75,143,green,0.00,0.0,0.0,54,40,8.05,1267,168,1,Manhattan,016800,1016800,E,MN33,East Harlem South,3804,1742,155,1,Manhattan,015500,1015500,I,MN14,Lincoln Square,3806
3389233,2,2014-03-29 09:38:12,2014-03-29 09:44:16,N,1,-73.952728271484375,40.789501190185547,-73.950935363769531,40.775600433349609,1,1.10,6.5,0,0.5,1.3,0,,,8.3,1,1,75,263,green,1.81,0.0,0.0,59,43,10.74,1267,168,1,Manhattan,016800,1016800,E,MN33,East Harlem South,3804,2048,138,1,Manhattan,013800,1013800,I,MN32,Yorkville,3805
`,
totalFields: 308*2 + 1,
}, {
file: "nyc-taxi-data-tabs-100k.csv",
recordDelimiter: "\n",
fieldDelimiter: "\t",
header: true,
wantColumns: []string{"trip_id", "vendor_id", "pickup_datetime", "dropoff_datetime", "store_and_fwd_flag", "rate_code_id", "pickup_longitude", "pickup_latitude", "dropoff_longitude", "dropoff_latitude", "passenger_count", "trip_distance", "fare_amount", "extra", "mta_tax", "tip_amount", "tolls_amount", "ehail_fee", "improvement_surcharge", "total_amount", "payment_type", "trip_type", "pickup", "dropoff", "cab_type", "precipitation", "snow_depth", "snowfall", "max_temp", "min_temp", "wind", "pickup_nyct2010_gid", "pickup_ctlabel", "pickup_borocode", "pickup_boroname", "pickup_ct2010", "pickup_boroct2010", "pickup_cdeligibil", "pickup_ntacode", "pickup_ntaname", "pickup_puma", "dropoff_nyct2010_gid", "dropoff_ctlabel", "dropoff_borocode", "dropoff_boroname", "dropoff_ct2010", "dropoff_boroct2010", "dropoff_cdeligibil", "dropoff_ntacode", "dropoff_ntaname", "dropoff_puma"},
wantTenFields: `3389224,2,2014-03-26 00:26:15,2014-03-26 00:28:38,N,1,-73.950431823730469,40.792251586914063,-73.938949584960937,40.794425964355469,1,0.84,4.5,0.5,0.5,1,0,,,6.5,1,1,75,74,green,0.00,0.0,0.0,36,24,11.86,1267,168,1,Manhattan,016800,1016800,E,MN33,East Harlem South,3804,1828,180,1,Manhattan,018000,1018000,E,MN34,East Harlem North,3804
3389225,2,2014-03-31 09:42:15,2014-03-31 10:01:17,N,1,-73.950340270996094,40.792228698730469,-73.941970825195313,40.842235565185547,1,4.47,17.5,0,0.5,0,0,,,18,2,1,75,244,green,0.16,0.0,0.0,56,36,8.28,1267,168,1,Manhattan,016800,1016800,E,MN33,East Harlem South,3804,911,251,1,Manhattan,025100,1025100,E,MN36,Washington Heights South,3801
3389226,2,2014-03-26 17:13:28,2014-03-26 17:19:07,N,1,-73.949493408203125,40.793506622314453,-73.943374633789063,40.786155700683594,1,0.82,5.5,1,0.5,0,0,,,7,1,1,75,75,green,0.00,0.0,0.0,36,24,11.86,1267,168,1,Manhattan,016800,1016800,E,MN33,East Harlem South,3804,1387,164,1,Manhattan,016400,1016400,E,MN33,East Harlem South,3804
3389227,2,2014-03-14 21:07:19,2014-03-14 21:11:41,N,1,-73.950538635253906,40.792228698730469,-73.940811157226563,40.809253692626953,1,1.40,6,0.5,0.5,0,0,,,7,2,1,75,42,green,0.00,0.0,0.0,46,22,5.59,1267,168,1,Manhattan,016800,1016800,E,MN33,East Harlem South,3804,1184,208,1,Manhattan,020800,1020800,E,MN03,Central Harlem North-Polo Grounds,3803
3389228,1,2014-03-28 13:52:56,2014-03-28 14:29:01,N,1,-73.950569152832031,40.792312622070313,-73.868507385253906,40.688491821289063,2,16.10,46,0,0.5,0,5.33,,,51.83,2,,75,63,green,0.04,0.0,0.0,62,37,5.37,1267,168,1,Manhattan,016800,1016800,E,MN33,East Harlem South,3804,1544,1182.02,3,Brooklyn,118202,3118202,E,BK83,Cypress Hills-City Line,4008
3389229,2,2014-03-07 09:46:32,2014-03-07 09:55:01,N,1,-73.952301025390625,40.789798736572266,-73.935806274414062,40.794448852539063,1,1.67,8,0,0.5,2,0,,,10.5,1,1,75,74,green,0.00,3.9,0.0,37,26,7.83,1267,168,1,Manhattan,016800,1016800,E,MN33,East Harlem South,3804,1553,178,1,Manhattan,017800,1017800,E,MN34,East Harlem North,3804
3389230,2,2014-03-17 18:23:05,2014-03-17 18:28:38,N,1,-73.952346801757813,40.789844512939453,-73.946319580078125,40.783851623535156,5,0.95,5.5,1,0.5,0.65,0,,,7.65,1,1,75,263,green,0.00,0.0,0.0,35,23,8.05,1267,168,1,Manhattan,016800,1016800,E,MN33,East Harlem South,3804,32,156.01,1,Manhattan,015601,1015601,I,MN32,Yorkville,3805
3389231,1,2014-03-19 19:09:36,2014-03-19 19:12:20,N,1,-73.952377319335938,40.789779663085938,-73.947494506835938,40.796474456787109,1,0.50,4,1,0.5,1,0,,,6.5,1,,75,75,green,0.92,0.0,0.0,46,32,7.16,1267,168,1,Manhattan,016800,1016800,E,MN33,East Harlem South,3804,1401,174.02,1,Manhattan,017402,1017402,E,MN33,East Harlem South,3804
3389232,2,2014-03-20 19:06:28,2014-03-20 19:21:35,N,1,-73.952583312988281,40.789516448974609,-73.985870361328125,40.776973724365234,2,3.04,13,1,0.5,2.8,0,,,17.3,1,1,75,143,green,0.00,0.0,0.0,54,40,8.05,1267,168,1,Manhattan,016800,1016800,E,MN33,East Harlem South,3804,1742,155,1,Manhattan,015500,1015500,I,MN14,Lincoln Square,3806
3389233,2,2014-03-29 09:38:12,2014-03-29 09:44:16,N,1,-73.952728271484375,40.789501190185547,-73.950935363769531,40.775600433349609,1,1.10,6.5,0,0.5,1.3,0,,,8.3,1,1,75,263,green,1.81,0.0,0.0,59,43,10.74,1267,168,1,Manhattan,016800,1016800,E,MN33,East Harlem South,3804,2048,138,1,Manhattan,013800,1013800,I,MN32,Yorkville,3805
`,
totalFields: 308*2 + 1,
}, {
file: "nyc-taxi-data-100k-single-delim.csv",
recordDelimiter: "^",
fieldDelimiter: ",",
header: true,
wantColumns: []string{"trip_id", "vendor_id", "pickup_datetime", "dropoff_datetime", "store_and_fwd_flag", "rate_code_id", "pickup_longitude", "pickup_latitude", "dropoff_longitude", "dropoff_latitude", "passenger_count", "trip_distance", "fare_amount", "extra", "mta_tax", "tip_amount", "tolls_amount", "ehail_fee", "improvement_surcharge", "total_amount", "payment_type", "trip_type", "pickup", "dropoff", "cab_type", "precipitation", "snow_depth", "snowfall", "max_temp", "min_temp", "wind", "pickup_nyct2010_gid", "pickup_ctlabel", "pickup_borocode", "pickup_boroname", "pickup_ct2010", "pickup_boroct2010", "pickup_cdeligibil", "pickup_ntacode", "pickup_ntaname", "pickup_puma", "dropoff_nyct2010_gid", "dropoff_ctlabel", "dropoff_borocode", "dropoff_boroname", "dropoff_ct2010", "dropoff_boroct2010", "dropoff_cdeligibil", "dropoff_ntacode", "dropoff_ntaname", "dropoff_puma"},
wantTenFields: `3389224,2,2014-03-26 00:26:15,2014-03-26 00:28:38,N,1,-73.950431823730469,40.792251586914063,-73.938949584960937,40.794425964355469,1,0.84,4.5,0.5,0.5,1,0,,,6.5,1,1,75,74,green,0.00,0.0,0.0,36,24,11.86,1267,168,1,Manhattan,016800,1016800,E,MN33,East Harlem South,3804,1828,180,1,Manhattan,018000,1018000,E,MN34,East Harlem North,3804
3389225,2,2014-03-31 09:42:15,2014-03-31 10:01:17,N,1,-73.950340270996094,40.792228698730469,-73.941970825195313,40.842235565185547,1,4.47,17.5,0,0.5,0,0,,,18,2,1,75,244,green,0.16,0.0,0.0,56,36,8.28,1267,168,1,Manhattan,016800,1016800,E,MN33,East Harlem South,3804,911,251,1,Manhattan,025100,1025100,E,MN36,Washington Heights South,3801
3389226,2,2014-03-26 17:13:28,2014-03-26 17:19:07,N,1,-73.949493408203125,40.793506622314453,-73.943374633789063,40.786155700683594,1,0.82,5.5,1,0.5,0,0,,,7,1,1,75,75,green,0.00,0.0,0.0,36,24,11.86,1267,168,1,Manhattan,016800,1016800,E,MN33,East Harlem South,3804,1387,164,1,Manhattan,016400,1016400,E,MN33,East Harlem South,3804
3389227,2,2014-03-14 21:07:19,2014-03-14 21:11:41,N,1,-73.950538635253906,40.792228698730469,-73.940811157226563,40.809253692626953,1,1.40,6,0.5,0.5,0,0,,,7,2,1,75,42,green,0.00,0.0,0.0,46,22,5.59,1267,168,1,Manhattan,016800,1016800,E,MN33,East Harlem South,3804,1184,208,1,Manhattan,020800,1020800,E,MN03,Central Harlem North-Polo Grounds,3803
3389228,1,2014-03-28 13:52:56,2014-03-28 14:29:01,N,1,-73.950569152832031,40.792312622070313,-73.868507385253906,40.688491821289063,2,16.10,46,0,0.5,0,5.33,,,51.83,2,,75,63,green,0.04,0.0,0.0,62,37,5.37,1267,168,1,Manhattan,016800,1016800,E,MN33,East Harlem South,3804,1544,1182.02,3,Brooklyn,118202,3118202,E,BK83,Cypress Hills-City Line,4008
3389229,2,2014-03-07 09:46:32,2014-03-07 09:55:01,N,1,-73.952301025390625,40.789798736572266,-73.935806274414062,40.794448852539063,1,1.67,8,0,0.5,2,0,,,10.5,1,1,75,74,green,0.00,3.9,0.0,37,26,7.83,1267,168,1,Manhattan,016800,1016800,E,MN33,East Harlem South,3804,1553,178,1,Manhattan,017800,1017800,E,MN34,East Harlem North,3804
3389230,2,2014-03-17 18:23:05,2014-03-17 18:28:38,N,1,-73.952346801757813,40.789844512939453,-73.946319580078125,40.783851623535156,5,0.95,5.5,1,0.5,0.65,0,,,7.65,1,1,75,263,green,0.00,0.0,0.0,35,23,8.05,1267,168,1,Manhattan,016800,1016800,E,MN33,East Harlem South,3804,32,156.01,1,Manhattan,015601,1015601,I,MN32,Yorkville,3805
3389231,1,2014-03-19 19:09:36,2014-03-19 19:12:20,N,1,-73.952377319335938,40.789779663085938,-73.947494506835938,40.796474456787109,1,0.50,4,1,0.5,1,0,,,6.5,1,,75,75,green,0.92,0.0,0.0,46,32,7.16,1267,168,1,Manhattan,016800,1016800,E,MN33,East Harlem South,3804,1401,174.02,1,Manhattan,017402,1017402,E,MN33,East Harlem South,3804
3389232,2,2014-03-20 19:06:28,2014-03-20 19:21:35,N,1,-73.952583312988281,40.789516448974609,-73.985870361328125,40.776973724365234,2,3.04,13,1,0.5,2.8,0,,,17.3,1,1,75,143,green,0.00,0.0,0.0,54,40,8.05,1267,168,1,Manhattan,016800,1016800,E,MN33,East Harlem South,3804,1742,155,1,Manhattan,015500,1015500,I,MN14,Lincoln Square,3806
3389233,2,2014-03-29 09:38:12,2014-03-29 09:44:16,N,1,-73.952728271484375,40.789501190185547,-73.950935363769531,40.775600433349609,1,1.10,6.5,0,0.5,1.3,0,,,8.3,1,1,75,263,green,1.81,0.0,0.0,59,43,10.74,1267,168,1,Manhattan,016800,1016800,E,MN33,East Harlem South,3804,2048,138,1,Manhattan,013800,1013800,I,MN32,Yorkville,3805
`,
totalFields: 308*2 + 1,
}, {
file: "nyc-taxi-data-100k-multi-delim.csv",
recordDelimiter: "^Y",
fieldDelimiter: ",",
header: true,
wantColumns: []string{"trip_id", "vendor_id", "pickup_datetime", "dropoff_datetime", "store_and_fwd_flag", "rate_code_id", "pickup_longitude", "pickup_latitude", "dropoff_longitude", "dropoff_latitude", "passenger_count", "trip_distance", "fare_amount", "extra", "mta_tax", "tip_amount", "tolls_amount", "ehail_fee", "improvement_surcharge", "total_amount", "payment_type", "trip_type", "pickup", "dropoff", "cab_type", "precipitation", "snow_depth", "snowfall", "max_temp", "min_temp", "wind", "pickup_nyct2010_gid", "pickup_ctlabel", "pickup_borocode", "pickup_boroname", "pickup_ct2010", "pickup_boroct2010", "pickup_cdeligibil", "pickup_ntacode", "pickup_ntaname", "pickup_puma", "dropoff_nyct2010_gid", "dropoff_ctlabel", "dropoff_borocode", "dropoff_boroname", "dropoff_ct2010", "dropoff_boroct2010", "dropoff_cdeligibil", "dropoff_ntacode", "dropoff_ntaname", "dropoff_puma"},
wantTenFields: `3389224,2,2014-03-26 00:26:15,2014-03-26 00:28:38,N,1,-73.950431823730469,40.792251586914063,-73.938949584960937,40.794425964355469,1,0.84,4.5,0.5,0.5,1,0,,,6.5,1,1,75,74,green,0.00,0.0,0.0,36,24,11.86,1267,168,1,Manhattan,016800,1016800,E,MN33,East Harlem South,3804,1828,180,1,Manhattan,018000,1018000,E,MN34,East Harlem North,3804
3389225,2,2014-03-31 09:42:15,2014-03-31 10:01:17,N,1,-73.950340270996094,40.792228698730469,-73.941970825195313,40.842235565185547,1,4.47,17.5,0,0.5,0,0,,,18,2,1,75,244,green,0.16,0.0,0.0,56,36,8.28,1267,168,1,Manhattan,016800,1016800,E,MN33,East Harlem South,3804,911,251,1,Manhattan,025100,1025100,E,MN36,Washington Heights South,3801
3389226,2,2014-03-26 17:13:28,2014-03-26 17:19:07,N,1,-73.949493408203125,40.793506622314453,-73.943374633789063,40.786155700683594,1,0.82,5.5,1,0.5,0,0,,,7,1,1,75,75,green,0.00,0.0,0.0,36,24,11.86,1267,168,1,Manhattan,016800,1016800,E,MN33,East Harlem South,3804,1387,164,1,Manhattan,016400,1016400,E,MN33,East Harlem South,3804
3389227,2,2014-03-14 21:07:19,2014-03-14 21:11:41,N,1,-73.950538635253906,40.792228698730469,-73.940811157226563,40.809253692626953,1,1.40,6,0.5,0.5,0,0,,,7,2,1,75,42,green,0.00,0.0,0.0,46,22,5.59,1267,168,1,Manhattan,016800,1016800,E,MN33,East Harlem South,3804,1184,208,1,Manhattan,020800,1020800,E,MN03,Central Harlem North-Polo Grounds,3803
3389228,1,2014-03-28 13:52:56,2014-03-28 14:29:01,N,1,-73.950569152832031,40.792312622070313,-73.868507385253906,40.688491821289063,2,16.10,46,0,0.5,0,5.33,,,51.83,2,,75,63,green,0.04,0.0,0.0,62,37,5.37,1267,168,1,Manhattan,016800,1016800,E,MN33,East Harlem South,3804,1544,1182.02,3,Brooklyn,118202,3118202,E,BK83,Cypress Hills-City Line,4008
3389229,2,2014-03-07 09:46:32,2014-03-07 09:55:01,N,1,-73.952301025390625,40.789798736572266,-73.935806274414062,40.794448852539063,1,1.67,8,0,0.5,2,0,,,10.5,1,1,75,74,green,0.00,3.9,0.0,37,26,7.83,1267,168,1,Manhattan,016800,1016800,E,MN33,East Harlem South,3804,1553,178,1,Manhattan,017800,1017800,E,MN34,East Harlem North,3804
3389230,2,2014-03-17 18:23:05,2014-03-17 18:28:38,N,1,-73.952346801757813,40.789844512939453,-73.946319580078125,40.783851623535156,5,0.95,5.5,1,0.5,0.65,0,,,7.65,1,1,75,263,green,0.00,0.0,0.0,35,23,8.05,1267,168,1,Manhattan,016800,1016800,E,MN33,East Harlem South,3804,32,156.01,1,Manhattan,015601,1015601,I,MN32,Yorkville,3805
3389231,1,2014-03-19 19:09:36,2014-03-19 19:12:20,N,1,-73.952377319335938,40.789779663085938,-73.947494506835938,40.796474456787109,1,0.50,4,1,0.5,1,0,,,6.5,1,,75,75,green,0.92,0.0,0.0,46,32,7.16,1267,168,1,Manhattan,016800,1016800,E,MN33,East Harlem South,3804,1401,174.02,1,Manhattan,017402,1017402,E,MN33,East Harlem South,3804
3389232,2,2014-03-20 19:06:28,2014-03-20 19:21:35,N,1,-73.952583312988281,40.789516448974609,-73.985870361328125,40.776973724365234,2,3.04,13,1,0.5,2.8,0,,,17.3,1,1,75,143,green,0.00,0.0,0.0,54,40,8.05,1267,168,1,Manhattan,016800,1016800,E,MN33,East Harlem South,3804,1742,155,1,Manhattan,015500,1015500,I,MN14,Lincoln Square,3806
3389233,2,2014-03-29 09:38:12,2014-03-29 09:44:16,N,1,-73.952728271484375,40.789501190185547,-73.950935363769531,40.775600433349609,1,1.10,6.5,0,0.5,1.3,0,,,8.3,1,1,75,263,green,1.81,0.0,0.0,59,43,10.74,1267,168,1,Manhattan,016800,1016800,E,MN33,East Harlem South,3804,2048,138,1,Manhattan,013800,1013800,I,MN32,Yorkville,3805
`,
totalFields: 308*2 + 1,
}, {
file: "nyc-taxi-data-noheader-100k.csv",
recordDelimiter: "\n",
fieldDelimiter: ",",
header: false,
wantColumns: []string{"_1", "_2", "_3", "_4", "_5", "_6", "_7", "_8", "_9", "_10", "_11", "_12", "_13", "_14", "_15", "_16", "_17", "_18", "_19", "_20", "_21", "_22", "_23", "_24", "_25", "_26", "_27", "_28", "_29", "_30", "_31", "_32", "_33", "_34", "_35", "_36", "_37", "_38", "_39", "_40", "_41", "_42", "_43", "_44", "_45", "_46", "_47", "_48", "_49", "_50", "_51"},
wantTenFields: `3389224,2,2014-03-26 00:26:15,2014-03-26 00:28:38,N,1,-73.950431823730469,40.792251586914063,-73.938949584960937,40.794425964355469,1,0.84,4.5,0.5,0.5,1,0,,,6.5,1,1,75,74,green,0.00,0.0,0.0,36,24,11.86,1267,168,1,Manhattan,016800,1016800,E,MN33,East Harlem South,3804,1828,180,1,Manhattan,018000,1018000,E,MN34,East Harlem North,3804
3389225,2,2014-03-31 09:42:15,2014-03-31 10:01:17,N,1,-73.950340270996094,40.792228698730469,-73.941970825195313,40.842235565185547,1,4.47,17.5,0,0.5,0,0,,,18,2,1,75,244,green,0.16,0.0,0.0,56,36,8.28,1267,168,1,Manhattan,016800,1016800,E,MN33,East Harlem South,3804,911,251,1,Manhattan,025100,1025100,E,MN36,Washington Heights South,3801
3389226,2,2014-03-26 17:13:28,2014-03-26 17:19:07,N,1,-73.949493408203125,40.793506622314453,-73.943374633789063,40.786155700683594,1,0.82,5.5,1,0.5,0,0,,,7,1,1,75,75,green,0.00,0.0,0.0,36,24,11.86,1267,168,1,Manhattan,016800,1016800,E,MN33,East Harlem South,3804,1387,164,1,Manhattan,016400,1016400,E,MN33,East Harlem South,3804
3389227,2,2014-03-14 21:07:19,2014-03-14 21:11:41,N,1,-73.950538635253906,40.792228698730469,-73.940811157226563,40.809253692626953,1,1.40,6,0.5,0.5,0,0,,,7,2,1,75,42,green,0.00,0.0,0.0,46,22,5.59,1267,168,1,Manhattan,016800,1016800,E,MN33,East Harlem South,3804,1184,208,1,Manhattan,020800,1020800,E,MN03,Central Harlem North-Polo Grounds,3803
3389228,1,2014-03-28 13:52:56,2014-03-28 14:29:01,N,1,-73.950569152832031,40.792312622070313,-73.868507385253906,40.688491821289063,2,16.10,46,0,0.5,0,5.33,,,51.83,2,,75,63,green,0.04,0.0,0.0,62,37,5.37,1267,168,1,Manhattan,016800,1016800,E,MN33,East Harlem South,3804,1544,1182.02,3,Brooklyn,118202,3118202,E,BK83,Cypress Hills-City Line,4008
3389229,2,2014-03-07 09:46:32,2014-03-07 09:55:01,N,1,-73.952301025390625,40.789798736572266,-73.935806274414062,40.794448852539063,1,1.67,8,0,0.5,2,0,,,10.5,1,1,75,74,green,0.00,3.9,0.0,37,26,7.83,1267,168,1,Manhattan,016800,1016800,E,MN33,East Harlem South,3804,1553,178,1,Manhattan,017800,1017800,E,MN34,East Harlem North,3804
3389230,2,2014-03-17 18:23:05,2014-03-17 18:28:38,N,1,-73.952346801757813,40.789844512939453,-73.946319580078125,40.783851623535156,5,0.95,5.5,1,0.5,0.65,0,,,7.65,1,1,75,263,green,0.00,0.0,0.0,35,23,8.05,1267,168,1,Manhattan,016800,1016800,E,MN33,East Harlem South,3804,32,156.01,1,Manhattan,015601,1015601,I,MN32,Yorkville,3805
3389231,1,2014-03-19 19:09:36,2014-03-19 19:12:20,N,1,-73.952377319335938,40.789779663085938,-73.947494506835938,40.796474456787109,1,0.50,4,1,0.5,1,0,,,6.5,1,,75,75,green,0.92,0.0,0.0,46,32,7.16,1267,168,1,Manhattan,016800,1016800,E,MN33,East Harlem South,3804,1401,174.02,1,Manhattan,017402,1017402,E,MN33,East Harlem South,3804
3389232,2,2014-03-20 19:06:28,2014-03-20 19:21:35,N,1,-73.952583312988281,40.789516448974609,-73.985870361328125,40.776973724365234,2,3.04,13,1,0.5,2.8,0,,,17.3,1,1,75,143,green,0.00,0.0,0.0,54,40,8.05,1267,168,1,Manhattan,016800,1016800,E,MN33,East Harlem South,3804,1742,155,1,Manhattan,015500,1015500,I,MN14,Lincoln Square,3806
3389233,2,2014-03-29 09:38:12,2014-03-29 09:44:16,N,1,-73.952728271484375,40.789501190185547,-73.950935363769531,40.775600433349609,1,1.10,6.5,0,0.5,1.3,0,,,8.3,1,1,75,263,green,1.81,0.0,0.0,59,43,10.74,1267,168,1,Manhattan,016800,1016800,E,MN33,East Harlem South,3804,2048,138,1,Manhattan,013800,1013800,I,MN32,Yorkville,3805
`,
totalFields: 308 * 2,
},
}
for i, c := range cases {
t.Run(c.file, func(t *testing.T) {
var err error
var record sql.Record
var result bytes.Buffer
input := openTestFile(t, c.file)
// Get above block size.
input = append(input, input...)
args := ReaderArgs{
FileHeaderInfo: use,
RecordDelimiter: c.recordDelimiter,
FieldDelimiter: c.fieldDelimiter,
QuoteCharacter: defaultQuoteCharacter,
QuoteEscapeCharacter: defaultQuoteEscapeCharacter,
CommentCharacter: defaultCommentCharacter,
AllowQuotedRecordDelimiter: false,
unmarshaled: true,
}
if !c.header {
args.FileHeaderInfo = none
}
r, _ := NewReader(ioutil.NopCloser(bytes.NewReader(input)), &args)
fields := 0
for {
record, err = r.Read(record)
if err != nil {
break
}
if fields < 10 {
// Write with fixed delimiters, newlines.
err := record.WriteCSV(&result, ',')
if err != nil {
t.Error(err)
}
}
fields++
}
r.Close()
if err != io.EOF {
t.Fatalf("Case %d failed with %s", i, err)
}
if !reflect.DeepEqual(r.columnNames, c.wantColumns) {
t.Errorf("Case %d failed: expected %#v, got result %#v", i, c.wantColumns, r.columnNames)
}
if result.String() != c.wantTenFields {
t.Errorf("Case %d failed: expected %v, got result %v", i, c.wantTenFields, result.String())
}
if fields != c.totalFields {
t.Errorf("Case %d failed: expected %v results %v", i, c.totalFields, fields)
}
})
}
}
type errReader struct {
err error
}
func (e errReader) Read(p []byte) (n int, err error) {
return 0, e.err
}
func TestReadFailures(t *testing.T) {
customErr := errors.New("unable to read file :(")
cases := []struct {
file string
recordDelimiter string
fieldDelimiter string
sendErr error
header bool
wantColumns []string
wantFields string
wantErr error
}{
{
file: "truncated-records.csv",
recordDelimiter: "^Y",
fieldDelimiter: ",",
header: true,
wantColumns: []string{"trip_id", "vendor_id", "pickup_datetime", "dropoff_datetime", "store_and_fwd_flag", "rate_code_id", "pickup_longitude", "pickup_latitude", "dropoff_longitude", "dropoff_latitude", "passenger_count", "trip_distance", "fare_amount", "extra", "mta_tax", "tip_amount", "tolls_amount", "ehail_fee", "improvement_surcharge", "total_amount", "payment_type", "trip_type", "pickup", "dropoff", "cab_type", "precipitation", "snow_depth", "snowfall", "max_temp", "min_temp", "wind", "pickup_nyct2010_gid", "pickup_ctlabel", "pickup_borocode", "pickup_boroname", "pickup_ct2010", "pickup_boroct2010", "pickup_cdeligibil", "pickup_ntacode", "pickup_ntaname", "pickup_puma", "dropoff_nyct2010_gid", "dropoff_ctlabel", "dropoff_borocode", "dropoff_boroname", "dropoff_ct2010", "dropoff_boroct2010", "dropoff_cdeligibil", "dropoff_ntacode", "dropoff_ntaname", "dropoff_puma"},
wantFields: `3389224,2,2014-03-26 00:26:15,2014-03-26 00:28:38,N,1,-73.950431823730469,40.792251586914063,-73.938949584960937,40.794425964355469,1,0.84,4.5,0.5,0.5,1,0,,,6.5,1,1,75,74,green,0.00,0.0,0.0,36,24,11.86,1267,168,1,Manhattan,016800,1016800,E,MN33,East Harlem South,3804,1828,180,1,Manhattan,018000,1018000,E,MN34,East Harlem North,3804
3389225,2,2014-03-31 09:42:15,2014-03-31 10:01:17,N,1,-73.950340270996094,40.792228698730469,-73.941970825195313,40.842235565185547,1,4.47,17.5,0,0.5,0,0,,,18,2,1,75,244,green,0.16,0.0,0.0,56,36,8.28,1267,168,1,Manhattan,016800,1016800,E,MN33,East Harlem South,3804,911,251,1,Manhattan,025100
`,
wantErr: io.EOF,
},
{
file: "truncated-records.csv",
recordDelimiter: "^Y",
fieldDelimiter: ",",
sendErr: customErr,
header: true,
wantColumns: []string{"trip_id", "vendor_id", "pickup_datetime", "dropoff_datetime", "store_and_fwd_flag", "rate_code_id", "pickup_longitude", "pickup_latitude", "dropoff_longitude", "dropoff_latitude", "passenger_count", "trip_distance", "fare_amount", "extra", "mta_tax", "tip_amount", "tolls_amount", "ehail_fee", "improvement_surcharge", "total_amount", "payment_type", "trip_type", "pickup", "dropoff", "cab_type", "precipitation", "snow_depth", "snowfall", "max_temp", "min_temp", "wind", "pickup_nyct2010_gid", "pickup_ctlabel", "pickup_borocode", "pickup_boroname", "pickup_ct2010", "pickup_boroct2010", "pickup_cdeligibil", "pickup_ntacode", "pickup_ntaname", "pickup_puma", "dropoff_nyct2010_gid", "dropoff_ctlabel", "dropoff_borocode", "dropoff_boroname", "dropoff_ct2010", "dropoff_boroct2010", "dropoff_cdeligibil", "dropoff_ntacode", "dropoff_ntaname", "dropoff_puma"},
wantFields: `3389224,2,2014-03-26 00:26:15,2014-03-26 00:28:38,N,1,-73.950431823730469,40.792251586914063,-73.938949584960937,40.794425964355469,1,0.84,4.5,0.5,0.5,1,0,,,6.5,1,1,75,74,green,0.00,0.0,0.0,36,24,11.86,1267,168,1,Manhattan,016800,1016800,E,MN33,East Harlem South,3804,1828,180,1,Manhattan,018000,1018000,E,MN34,East Harlem North,3804
3389225,2,2014-03-31 09:42:15,2014-03-31 10:01:17,N,1,-73.950340270996094,40.792228698730469,-73.941970825195313,40.842235565185547,1,4.47,17.5,0,0.5,0,0,,,18,2,1,75,244,green,0.16,0.0,0.0,56,36,8.28,1267,168,1,Manhattan,016800,1016800,E,MN33,East Harlem South,3804,911,251,1,Manhattan,025100
`,
wantErr: customErr,
},
{
// This works since LazyQuotes is true:
file: "invalid-badbarequote.csv",
recordDelimiter: "\n",
fieldDelimiter: ",",
sendErr: nil,
header: true,
wantColumns: []string{"header1", "header2", "header3"},
wantFields: "ok1,ok2,ok3\n" + `"a ""word""",b` + "\n",
wantErr: io.EOF,
},
{
// This works since LazyQuotes is true:
file: "invalid-baddoubleq.csv",
recordDelimiter: "\n",
fieldDelimiter: ",",
sendErr: nil,
header: true,
wantColumns: []string{"header1", "header2", "header3"},
wantFields: "ok1,ok2,ok3\n" + `"a""""b",c` + "\n",
wantErr: io.EOF,
},
{
// This works since LazyQuotes is true:
file: "invalid-badextraq.csv",
recordDelimiter: "\n",
fieldDelimiter: ",",
sendErr: nil,
header: true,
wantColumns: []string{"header1", "header2", "header3"},
wantFields: "ok1,ok2,ok3\n" + `a word,"b"""` + "\n",
wantErr: io.EOF,
},
{
// This works since LazyQuotes is true:
file: "invalid-badstartline.csv",
recordDelimiter: "\n",
fieldDelimiter: ",",
sendErr: nil,
header: true,
wantColumns: []string{"header1", "header2", "header3"},
wantFields: "ok1,ok2,ok3\n" + `a,"b` + "\n" + `c""d,e` + "\n\"\n",
wantErr: io.EOF,
},
{
// This works since LazyQuotes is true:
file: "invalid-badstartline2.csv",
recordDelimiter: "\n",
fieldDelimiter: ",",
sendErr: nil,
header: true,
wantColumns: []string{"header1", "header2", "header3"},
wantFields: "ok1,ok2,ok3\n" + `a,b` + "\n" + `"d` + "\n\ne\"\n",
wantErr: io.EOF,
},
{
// This works since LazyQuotes is true:
file: "invalid-badtrailingq.csv",
recordDelimiter: "\n",
fieldDelimiter: ",",
sendErr: nil,
header: true,
wantColumns: []string{"header1", "header2", "header3"},
wantFields: "ok1,ok2,ok3\n" + `a word,"b"""` + "\n",
wantErr: io.EOF,
},
{
// This works since LazyQuotes is true:
file: "invalid-crlfquoted.csv",
recordDelimiter: "\n",
fieldDelimiter: ",",
sendErr: nil,
header: true,
wantColumns: []string{"header1", "header2", "header3"},
wantFields: "ok1,ok2,ok3\n" + `"foo""bar"` + "\n",
wantErr: io.EOF,
},
{
// This works since LazyQuotes is true:
file: "invalid-csv.csv",
recordDelimiter: "\n",
fieldDelimiter: ",",
sendErr: nil,
header: true,
wantColumns: []string{"header1", "header2", "header3"},
wantFields: "ok1,ok2,ok3\n" + `"a""""b",c` + "\n",
wantErr: io.EOF,
},
{
// This works since LazyQuotes is true, but output is very weird.
file: "invalid-oddquote.csv",
recordDelimiter: "\n",
fieldDelimiter: ",",
sendErr: nil,
header: true,
wantColumns: []string{"header1", "header2", "header3"},
wantFields: "ok1,ok2,ok3\n" + `""""""",b,c` + "\n\"\n",
wantErr: io.EOF,
},
{
// Test when file ends with a half separator
file: "endswithhalfsep.csv",
recordDelimiter: "%!",
fieldDelimiter: ",",
sendErr: nil,
header: false,
wantColumns: []string{"_1", "_2", "_3"},
wantFields: "a,b,c\na2,b2,c2%\n",
wantErr: io.EOF,
},
}
for i, c := range cases {
t.Run(c.file, func(t *testing.T) {
var err error
var record sql.Record
var result bytes.Buffer
input := openTestFile(t, c.file)
args := ReaderArgs{
FileHeaderInfo: use,
RecordDelimiter: c.recordDelimiter,
FieldDelimiter: c.fieldDelimiter,
QuoteCharacter: defaultQuoteCharacter,
QuoteEscapeCharacter: defaultQuoteEscapeCharacter,
CommentCharacter: defaultCommentCharacter,
AllowQuotedRecordDelimiter: false,
unmarshaled: true,
}
if !c.header {
args.FileHeaderInfo = none
}
inr := io.Reader(bytes.NewReader(input))
if c.sendErr != nil {
inr = io.MultiReader(inr, errReader{c.sendErr})
}
r, _ := NewReader(ioutil.NopCloser(inr), &args)
fields := 0
for {
record, err = r.Read(record)
if err != nil {
break
}
// Write with fixed delimiters, newlines.
err := record.WriteCSV(&result, ',')
if err != nil {
t.Error(err)
}
fields++
}
r.Close()
if err != c.wantErr {
t.Fatalf("Case %d failed with %s", i, err)
}
if !reflect.DeepEqual(r.columnNames, c.wantColumns) {
t.Errorf("Case %d failed: expected \n%#v, got result \n%#v", i, c.wantColumns, r.columnNames)
}
if result.String() != c.wantFields {
t.Errorf("Case %d failed: expected \n%v\nGot result \n%v", i, c.wantFields, result.String())
}
})
}
}
func BenchmarkReaderBasic(b *testing.B) {
args := ReaderArgs{
FileHeaderInfo: use,
RecordDelimiter: "\n",
FieldDelimiter: ",",
QuoteCharacter: defaultQuoteCharacter,
QuoteEscapeCharacter: defaultQuoteEscapeCharacter,
CommentCharacter: defaultCommentCharacter,
AllowQuotedRecordDelimiter: false,
unmarshaled: true,
}
f := openTestFile(b, "nyc-taxi-data-100k.csv")
r, err := NewReader(ioutil.NopCloser(bytes.NewBuffer(f)), &args)
if err != nil {
b.Fatalf("Reading init failed with %s", err)
}
defer r.Close()
b.ReportAllocs()
b.ResetTimer()
b.SetBytes(int64(len(f)))
var record sql.Record
for i := 0; i < b.N; i++ {
r, err = NewReader(ioutil.NopCloser(bytes.NewBuffer(f)), &args)
if err != nil {
b.Fatalf("Reading init failed with %s", err)
}
for err == nil {
record, err = r.Read(record)
if err != nil && err != io.EOF {
b.Fatalf("Reading failed with %s", err)
}
}
r.Close()
}
}
func BenchmarkReaderHuge(b *testing.B) {
args := ReaderArgs{
FileHeaderInfo: use,
RecordDelimiter: "\n",
FieldDelimiter: ",",
QuoteCharacter: defaultQuoteCharacter,
QuoteEscapeCharacter: defaultQuoteEscapeCharacter,
CommentCharacter: defaultCommentCharacter,
AllowQuotedRecordDelimiter: false,
unmarshaled: true,
}
for n := 0; n < 11; n++ {
f := openTestFile(b, "nyc-taxi-data-100k.csv")
want := 309
for i := 0; i < n; i++ {
f = append(f, f...)
want *= 2
}
b.Run(fmt.Sprint(len(f)/(1<<10), "K"), func(b *testing.B) {
b.ReportAllocs()
b.SetBytes(int64(len(f)))
b.ResetTimer()
var record sql.Record
for i := 0; i < b.N; i++ {
r, err := NewReader(ioutil.NopCloser(bytes.NewBuffer(f)), &args)
if err != nil {
b.Fatalf("Reading init failed with %s", err)
}
got := 0
for err == nil {
record, err = r.Read(record)
if err != nil && err != io.EOF {
b.Fatalf("Reading failed with %s", err)
}
got++
}
r.Close()
if got != want {
b.Errorf("want %d records, got %d", want, got)
}
}
})
}
}
func BenchmarkReaderReplace(b *testing.B) {
args := ReaderArgs{
FileHeaderInfo: use,
RecordDelimiter: "^",
FieldDelimiter: ",",
QuoteCharacter: defaultQuoteCharacter,
QuoteEscapeCharacter: defaultQuoteEscapeCharacter,
CommentCharacter: defaultCommentCharacter,
AllowQuotedRecordDelimiter: false,
unmarshaled: true,
}
f := openTestFile(b, "nyc-taxi-data-100k-single-delim.csv")
r, err := NewReader(ioutil.NopCloser(bytes.NewBuffer(f)), &args)
if err != nil {
b.Fatalf("Reading init failed with %s", err)
}
defer r.Close()
b.ReportAllocs()
b.ResetTimer()
b.SetBytes(int64(len(f)))
var record sql.Record
for i := 0; i < b.N; i++ {
r, err = NewReader(ioutil.NopCloser(bytes.NewBuffer(f)), &args)
if err != nil {
b.Fatalf("Reading init failed with %s", err)
}
for err == nil {
record, err = r.Read(record)
if err != nil && err != io.EOF {
b.Fatalf("Reading failed with %s", err)
}
}
r.Close()
}
}
func BenchmarkReaderReplaceTwo(b *testing.B) {
args := ReaderArgs{
FileHeaderInfo: use,
RecordDelimiter: "^Y",
FieldDelimiter: ",",
QuoteCharacter: defaultQuoteCharacter,
QuoteEscapeCharacter: defaultQuoteEscapeCharacter,
CommentCharacter: defaultCommentCharacter,
AllowQuotedRecordDelimiter: false,
unmarshaled: true,
}
f := openTestFile(b, "nyc-taxi-data-100k-multi-delim.csv")
r, err := NewReader(ioutil.NopCloser(bytes.NewBuffer(f)), &args)
if err != nil {
b.Fatalf("Reading init failed with %s", err)
}
defer r.Close()
b.ReportAllocs()
b.ResetTimer()
b.SetBytes(int64(len(f)))
var record sql.Record
for i := 0; i < b.N; i++ {
r, err = NewReader(ioutil.NopCloser(bytes.NewBuffer(f)), &args)
if err != nil {
b.Fatalf("Reading init failed with %s", err)
}
for err == nil {
record, err = r.Read(record)
if err != nil && err != io.EOF {
b.Fatalf("Reading failed with %s", err)
}
}
r.Close()
}
}

View file

@ -27,7 +27,7 @@ import (
"github.com/minio/minio/pkg/s3select/sql"
)
// Record - is CSV record.
// Record - is a CSV record.
type Record struct {
columnNames []string
csvRecord []string

View file

@ -0,0 +1,93 @@
/*
* MinIO Cloud Storage, (C) 2019 MinIO, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package csv
import (
"bytes"
"io"
)
// recordTransform will convert records to always have newline records.
type recordTransform struct {
reader io.Reader
// recordDelimiter can be up to 2 characters.
recordDelimiter []byte
oneByte []byte
useOneByte bool
}
func (rr *recordTransform) Read(p []byte) (n int, err error) {
if rr.useOneByte {
p[0] = rr.oneByte[0]
rr.useOneByte = false
n, err = rr.reader.Read(p[1:])
n++
} else {
n, err = rr.reader.Read(p)
}
if err != nil {
return n, err
}
// Do nothing if record-delimiter is already newline.
if string(rr.recordDelimiter) == "\n" {
return n, nil
}
// Change record delimiters to newline.
if len(rr.recordDelimiter) == 1 {
for idx := 0; idx < len(p); {
i := bytes.Index(p[idx:], rr.recordDelimiter)
if i < 0 {
break
}
idx += i
p[idx] = '\n'
}
return n, nil
}
// 2 characters...
for idx := 0; idx < len(p); {
i := bytes.Index(p[idx:], rr.recordDelimiter)
if i < 0 {
break
}
idx += i
p[idx] = '\n'
p = append(p[:idx+1], p[idx+2:]...)
n--
}
if p[n-1] != rr.recordDelimiter[0] {
return n, nil
}
if _, err = rr.reader.Read(rr.oneByte); err != nil {
return n, err
}
if rr.oneByte[0] == rr.recordDelimiter[1] {
p[n-1] = '\n'
return n, nil
}
rr.useOneByte = true
return n, nil
}

BIN
pkg/s3select/csv/testdata/testdata.zip vendored Normal file

Binary file not shown.

View file

@ -33,7 +33,7 @@ type Reader struct {
}
// Read - reads single record.
func (r *Reader) Read() (sql.Record, error) {
func (r *Reader) Read(dst sql.Record) (sql.Record, error) {
v, ok := <-r.valueCh
if !ok {
if err := r.decoder.Err(); err != nil {
@ -55,15 +55,25 @@ func (r *Reader) Read() (sql.Record, error) {
kvs = jstream.KVS{jstream.KV{Key: "_1", Value: v.Value}}
}
return &Record{
KVS: kvs,
SelectFormat: sql.SelectFmtJSON,
}, nil
dstRec, ok := dst.(*Record)
if !ok {
dstRec = &Record{}
}
dstRec.KVS = kvs
dstRec.SelectFormat = sql.SelectFmtJSON
return dstRec, nil
}
// Close - closes underlaying reader.
// Close - closes underlying reader.
func (r *Reader) Close() error {
return r.readCloser.Close()
// Close the input.
// Potentially racy if the stream decoder is still reading.
err := r.readCloser.Close()
for range r.valueCh {
// Drain values so we don't leak a goroutine.
// Since we have closed the input, it should fail rather quickly.
}
return err
}
// NewReader - creates new JSON reader using readCloser.

View file

@ -17,26 +17,30 @@
package json
import (
"bytes"
"io"
"io/ioutil"
"os"
"path/filepath"
"testing"
"github.com/minio/minio/pkg/s3select/sql"
)
func TestNewReader(t *testing.T) {
files, err := ioutil.ReadDir("data")
files, err := ioutil.ReadDir("testdata")
if err != nil {
t.Fatal(err)
}
for _, file := range files {
f, err := os.Open(filepath.Join("data", file.Name()))
f, err := os.Open(filepath.Join("testdata", file.Name()))
if err != nil {
t.Fatal(err)
}
r := NewReader(f, &ReaderArgs{})
var record sql.Record
for {
_, err = r.Read()
record, err = r.Read(record)
if err != nil {
break
}
@ -47,3 +51,35 @@ func TestNewReader(t *testing.T) {
}
}
}
func BenchmarkReader(b *testing.B) {
files, err := ioutil.ReadDir("testdata")
if err != nil {
b.Fatal(err)
}
for _, file := range files {
b.Run(file.Name(), func(b *testing.B) {
f, err := ioutil.ReadFile(filepath.Join("testdata", file.Name()))
if err != nil {
b.Fatal(err)
}
b.SetBytes(int64(len(f)))
b.ReportAllocs()
b.ResetTimer()
var record sql.Record
for i := 0; i < b.N; i++ {
r := NewReader(ioutil.NopCloser(bytes.NewBuffer(f)), &ReaderArgs{})
for {
record, err = r.Read(record)
if err != nil {
break
}
}
r.Close()
if err != io.EOF {
b.Fatalf("Reading failed with %s, %s", err, file.Name())
}
}
})
}
}

View file

@ -33,7 +33,7 @@ type Reader struct {
}
// Read - reads single record.
func (r *Reader) Read() (rec sql.Record, rerr error) {
func (r *Reader) Read(dst sql.Record) (rec sql.Record, rerr error) {
parquetRecord, err := r.reader.Read()
if err != nil {
if err != io.EOF {
@ -73,11 +73,20 @@ func (r *Reader) Read() (rec sql.Record, rerr error) {
return true
}
// Apply our range
parquetRecord.Range(f)
return &jsonfmt.Record{KVS: kvs, SelectFormat: sql.SelectFmtParquet}, nil
// Reuse destination if we can.
dstRec, ok := dst.(*jsonfmt.Record)
if !ok {
dstRec = &jsonfmt.Record{}
}
dstRec.SelectFormat = sql.SelectFmtParquet
dstRec.KVS = kvs
return dstRec, nil
}
// Close - closes underlaying readers.
// Close - closes underlying readers.
func (r *Reader) Close() error {
return r.reader.Close()
}

View file

@ -34,7 +34,9 @@ import (
)
type recordReader interface {
Read() (sql.Record, error)
// Read a record.
// dst is optional but will be used if valid.
Read(dst sql.Record) (sql.Record, error)
Close() error
}
@ -399,6 +401,7 @@ func (s3Select *S3Select) Evaluate(w http.ResponseWriter) {
return true
}
var rec sql.Record
for {
if s3Select.statement.LimitReached() {
if err = writer.Finish(s3Select.getProgress()); err != nil {
@ -408,7 +411,7 @@ func (s3Select *S3Select) Evaluate(w http.ResponseWriter) {
break
}
if inputRecord, err = s3Select.recordReader.Read(); err != nil {
if rec, err = s3Select.recordReader.Read(rec); err != nil {
if err != io.EOF {
break
}
@ -431,7 +434,7 @@ func (s3Select *S3Select) Evaluate(w http.ResponseWriter) {
break
}
if inputRecord, err = s3Select.statement.EvalFrom(s3Select.Input.format, inputRecord); err != nil {
if inputRecord, err = s3Select.statement.EvalFrom(s3Select.Input.format, rec); err != nil {
break
}

View file

@ -18,6 +18,7 @@ package s3select
import (
"bytes"
"fmt"
"io"
"io/ioutil"
"net/http"
@ -108,26 +109,29 @@ func TestCSVInput(t *testing.T) {
2.5,baz,true
`)
for _, testCase := range testTable {
s3Select, err := NewS3Select(bytes.NewReader(testCase.requestXML))
if err != nil {
t.Fatal(err)
}
for i, testCase := range testTable {
t.Run(fmt.Sprint(i), func(t *testing.T) {
s3Select, err := NewS3Select(bytes.NewReader(testCase.requestXML))
if err != nil {
t.Fatal(err)
}
if err = s3Select.Open(func(offset, length int64) (io.ReadCloser, error) {
return ioutil.NopCloser(bytes.NewReader(csvData)), nil
}); err != nil {
t.Fatal(err)
}
if err = s3Select.Open(func(offset, length int64) (io.ReadCloser, error) {
return ioutil.NopCloser(bytes.NewReader(csvData)), nil
}); err != nil {
t.Fatal(err)
}
w := &testResponseWriter{}
s3Select.Evaluate(w)
s3Select.Close()
w := &testResponseWriter{}
s3Select.Evaluate(w)
s3Select.Close()
if !reflect.DeepEqual(w.response, testCase.expectedResult) {
t.Fatalf("received response does not match with expected reply")
}
if !reflect.DeepEqual(w.response, testCase.expectedResult) {
t.Errorf("received response does not match with expected reply\ngot: %#v\nwant:%#v", w.response, testCase.expectedResult)
}
})
}
}
func TestJSONInput(t *testing.T) {
@ -191,26 +195,27 @@ func TestJSONInput(t *testing.T) {
{"three":true,"two":"baz","one":2.5}
`)
for _, testCase := range testTable {
for i, testCase := range testTable {
t.Run(fmt.Sprint(i), func(t *testing.T) {
s3Select, err := NewS3Select(bytes.NewReader(testCase.requestXML))
if err != nil {
t.Fatal(err)
}
s3Select, err := NewS3Select(bytes.NewReader(testCase.requestXML))
if err != nil {
t.Fatal(err)
}
if err = s3Select.Open(func(offset, length int64) (io.ReadCloser, error) {
return ioutil.NopCloser(bytes.NewReader(jsonData)), nil
}); err != nil {
t.Fatal(err)
}
if err = s3Select.Open(func(offset, length int64) (io.ReadCloser, error) {
return ioutil.NopCloser(bytes.NewReader(jsonData)), nil
}); err != nil {
t.Fatal(err)
}
w := &testResponseWriter{}
s3Select.Evaluate(w)
s3Select.Close()
w := &testResponseWriter{}
s3Select.Evaluate(w)
s3Select.Close()
if !reflect.DeepEqual(w.response, testCase.expectedResult) {
t.Fatalf("received response does not match with expected reply")
}
if !reflect.DeepEqual(w.response, testCase.expectedResult) {
t.Errorf("received response does not match with expected reply\ngot: %s\nwant:%s", string(w.response), string(testCase.expectedResult))
}
})
}
}
@ -268,45 +273,47 @@ func TestParquetInput(t *testing.T) {
},
}
for _, testCase := range testTable {
getReader := func(offset int64, length int64) (io.ReadCloser, error) {
testdataFile := "testdata.parquet"
file, err := os.Open(testdataFile)
for i, testCase := range testTable {
t.Run(fmt.Sprint(i), func(t *testing.T) {
getReader := func(offset int64, length int64) (io.ReadCloser, error) {
testdataFile := "testdata.parquet"
file, err := os.Open(testdataFile)
if err != nil {
return nil, err
}
fi, err := file.Stat()
if err != nil {
return nil, err
}
if offset < 0 {
offset = fi.Size() + offset
}
if _, err = file.Seek(offset, os.SEEK_SET); err != nil {
return nil, err
}
return file, nil
}
s3Select, err := NewS3Select(bytes.NewReader(testCase.requestXML))
if err != nil {
return nil, err
t.Fatal(err)
}
fi, err := file.Stat()
if err != nil {
return nil, err
if err = s3Select.Open(getReader); err != nil {
t.Fatal(err)
}
if offset < 0 {
offset = fi.Size() + offset
w := &testResponseWriter{}
s3Select.Evaluate(w)
s3Select.Close()
if !reflect.DeepEqual(w.response, testCase.expectedResult) {
t.Errorf("received response does not match with expected reply\ngot: %#v\nwant:%#v", w.response, testCase.expectedResult)
}
if _, err = file.Seek(offset, os.SEEK_SET); err != nil {
return nil, err
}
return file, nil
}
s3Select, err := NewS3Select(bytes.NewReader(testCase.requestXML))
if err != nil {
t.Fatal(err)
}
if err = s3Select.Open(getReader); err != nil {
t.Fatal(err)
}
w := &testResponseWriter{}
s3Select.Evaluate(w)
s3Select.Close()
if !reflect.DeepEqual(w.response, testCase.expectedResult) {
t.Fatalf("received response does not match with expected reply")
}
})
}
}