diff --git a/cmd/erasure-createfile_test.go b/cmd/erasure-createfile_test.go index fcf0ce6c6..f6cd17a97 100644 --- a/cmd/erasure-createfile_test.go +++ b/cmd/erasure-createfile_test.go @@ -174,11 +174,11 @@ func TestErasureEncode(t *testing.T) { reedsolomon.ErrInvShardNum, }, // TestCase - 8. - // test case with data + parity blocks > 255. + // test case with data + parity blocks > 256. // expected to fail with Error Max Shard number. { []byte("1"), - 128, + 129, 128, false, reedsolomon.ErrMaxShardNum, diff --git a/cmd/erasure-healfile.go b/cmd/erasure-healfile.go index 2d0e852cb..dc151cf35 100644 --- a/cmd/erasure-healfile.go +++ b/cmd/erasure-healfile.go @@ -53,8 +53,8 @@ func erasureHealFile(latestDisks []StorageAPI, outDatedDisks []StorageAPI, volum } } - // Reconstruct missing data. - err := decodeData(enBlocks, dataBlocks, parityBlocks) + // Reconstruct any missing data and parity blocks. + err := decodeDataAndParity(enBlocks, dataBlocks, parityBlocks) if err != nil { return nil, err } diff --git a/cmd/erasure-readfile.go b/cmd/erasure-readfile.go index bf03f033c..21d581803 100644 --- a/cmd/erasure-readfile.go +++ b/cmd/erasure-readfile.go @@ -17,7 +17,6 @@ package cmd import ( - "errors" "io" "sync" @@ -272,7 +271,7 @@ func erasureReadFile(writer io.Writer, disks []StorageAPI, volume, path string, // If we have all the data blocks no need to decode, continue to write. if !isSuccessDataBlocks(enBlocks, dataBlocks) { // Reconstruct the missing data blocks. - if err := decodeData(enBlocks, dataBlocks, parityBlocks); err != nil { + if err := decodeMissingData(enBlocks, dataBlocks, parityBlocks); err != nil { return bytesWritten, err } } @@ -314,31 +313,26 @@ func erasureReadFile(writer io.Writer, disks []StorageAPI, volume, path string, return bytesWritten, nil } -// decodeData - decode encoded blocks. -func decodeData(enBlocks [][]byte, dataBlocks, parityBlocks int) error { - // Initialized reedsolomon. +// decodeMissingData - decode any missing data blocks. +func decodeMissingData(enBlocks [][]byte, dataBlocks, parityBlocks int) error { + // Initialize reedsolomon. + rs, err := reedsolomon.New(dataBlocks, parityBlocks) + if err != nil { + return traceError(err) + } + + // Reconstruct any missing data blocks. + return rs.ReconstructData(enBlocks) +} + +// decodeDataAndParity - decode all encoded data and parity blocks. +func decodeDataAndParity(enBlocks [][]byte, dataBlocks, parityBlocks int) error { + // Initialize reedsolomon. rs, err := reedsolomon.New(dataBlocks, parityBlocks) if err != nil { return traceError(err) } // Reconstruct encoded blocks. - err = rs.Reconstruct(enBlocks) - if err != nil { - return traceError(err) - } - - // Verify reconstructed blocks (parity). - ok, err := rs.Verify(enBlocks) - if err != nil { - return traceError(err) - } - if !ok { - // Blocks cannot be reconstructed, corrupted data. - err = errors.New("Verification failed after reconstruction, data likely corrupted") - return traceError(err) - } - - // Success. - return nil + return rs.Reconstruct(enBlocks) } diff --git a/cmd/erasure_test.go b/cmd/erasure_test.go index ceed3f449..84f5e538c 100644 --- a/cmd/erasure_test.go +++ b/cmd/erasure_test.go @@ -124,26 +124,54 @@ func TestErasureDecode(t *testing.T) { // Data block size. blockSize := len(data) - // Generates encoded data based on type of testCase function. - encodedData := testCase.enFn(data, dataBlocks, parityBlocks) + // Test decoder for just the missing data blocks + { + // Generates encoded data based on type of testCase function. 
+			encodedData := testCase.enFn(data, dataBlocks, parityBlocks)
 
-		// Decodes the data.
-		err := decodeData(encodedData, dataBlocks, parityBlocks)
-		if err != nil && testCase.shouldPass {
-			t.Errorf("Test %d: Expected to pass by failed instead with %s", i+1, err)
+			// Decodes the data.
+			err := decodeMissingData(encodedData, dataBlocks, parityBlocks)
+			if err != nil && testCase.shouldPass {
+				t.Errorf("Test %d: Expected to pass but failed instead with %s", i+1, err)
+			}
+
+			// Proceed to extract the data blocks.
+			decodedDataWriter := new(bytes.Buffer)
+			_, err = writeDataBlocks(decodedDataWriter, encodedData, dataBlocks, 0, int64(blockSize))
+			if err != nil && testCase.shouldPass {
+				t.Errorf("Test %d: Expected to pass but failed instead with %s", i+1, err)
+			}
+
+			// Validate if decoded data is what we expected.
+			if bytes.Equal(decodedDataWriter.Bytes(), data) != testCase.shouldPass {
+				err := errUnexpected
+				t.Errorf("Test %d: Expected to pass but failed instead %s", i+1, err)
+			}
 		}
 
-		// Proceed to extract the data blocks.
-		decodedDataWriter := new(bytes.Buffer)
-		_, err = writeDataBlocks(decodedDataWriter, encodedData, dataBlocks, 0, int64(blockSize))
-		if err != nil && testCase.shouldPass {
-			t.Errorf("Test %d: Expected to pass by failed instead with %s", i+1, err)
-		}
+		// Test decoder for all missing data and parity blocks
+		{
+			// Generates encoded data based on type of testCase function.
+			encodedData := testCase.enFn(data, dataBlocks, parityBlocks)
 
-		// Validate if decoded data is what we expected.
-		if bytes.Equal(decodedDataWriter.Bytes(), data) != testCase.shouldPass {
-			err := errUnexpected
-			t.Errorf("Test %d: Expected to pass by failed instead %s", i+1, err)
+			// Decodes the data.
+			err := decodeDataAndParity(encodedData, dataBlocks, parityBlocks)
+			if err != nil && testCase.shouldPass {
+				t.Errorf("Test %d: Expected to pass but failed instead with %s", i+1, err)
+			}
+
+			// Proceed to extract the data blocks.
+			decodedDataWriter := new(bytes.Buffer)
+			_, err = writeDataBlocks(decodedDataWriter, encodedData, dataBlocks, 0, int64(blockSize))
+			if err != nil && testCase.shouldPass {
+				t.Errorf("Test %d: Expected to pass but failed instead with %s", i+1, err)
+			}
+
+			// Validate if decoded data is what we expected.
+			if bytes.Equal(decodedDataWriter.Bytes(), data) != testCase.shouldPass {
+				err := errUnexpected
+				t.Errorf("Test %d: Expected to pass but failed instead %s", i+1, err)
+			}
 		}
 	}
 }
diff --git a/vendor/github.com/klauspost/reedsolomon/README.md b/vendor/github.com/klauspost/reedsolomon/README.md
index 3e7f51841..37c2702fd 100644
--- a/vendor/github.com/klauspost/reedsolomon/README.md
+++ b/vendor/github.com/klauspost/reedsolomon/README.md
@@ -81,6 +81,17 @@ To indicate missing data, you set the shard to nil before calling `Reconstruct()
 ```
 The missing data and parity shards will be recreated. If more than 3 shards are missing, the reconstruction will fail.
 
+If you are only interested in the data shards (for reading purposes), you can call `ReconstructData()`:
+
+```Go
+    // Delete two data shards
+    data[3] = nil
+    data[7] = nil
+
+    // Reconstruct just the missing data shards
+    err := enc.ReconstructData(data)
+```
+
 So to sum up reconstruction:
 * The number of data/parity shards must match the numbers used for encoding.
 * The order of shards must be the same as used when encoding.
@@ -101,7 +112,7 @@ You might have a large slice of data. 
To help you split this, there are some hel
 ```
 This will split the file into the number of data shards set when creating the encoder and create empty parity shards.
-An important thing to note is that you have to *keep track of the exact input size*. If the size of the input isn't diviable by the number of data shards, extra zeros will be inserted in the last shard.
+An important thing to note is that you have to *keep track of the exact input size*. If the size of the input isn't divisible by the number of data shards, extra zeros will be inserted in the last shard.
 
 To join a data set, use the `Join()` function, which will join the shards and write it to the `io.Writer` you supply:
 ```Go
@@ -153,7 +164,7 @@ This also means that you can divide big input up into smaller blocks, and do rec
 
 # Streaming API
 
-There has been added a fully streaming API, to help perform fully streaming operations, which enables you to do the same operations, but on streams. To use the stream API, use [`NewStream`](https://godoc.org/github.com/klauspost/reedsolomon#NewStream) function to create the encoding/decoding interfaces. You can use [`NewStreamC`](https://godoc.org/github.com/klauspost/reedsolomon#NewStreamC) to ready an interface that reads/writes concurrently from the streams.
+Support has been added for a streaming API, which enables you to perform the same operations, but on streams. To use the stream API, use the [`NewStream`](https://godoc.org/github.com/klauspost/reedsolomon#NewStream) function to create the encoding/decoding interfaces. You can use [`NewStreamC`](https://godoc.org/github.com/klauspost/reedsolomon#NewStreamC) to ready an interface that reads/writes concurrently from the streams.
 
 Input is delivered as `[]io.Reader`, output as `[]io.Writer`, and functionality corresponds to the in-memory API. Each stream must supply the same amount of data, similar to how each slice must be similar size with the in-memory API. If an error occurs in relation to a stream, a [`StreamReadError`](https://godoc.org/github.com/klauspost/reedsolomon#StreamReadError) or [`StreamWriteError`](https://godoc.org/github.com/klauspost/reedsolomon#StreamWriteError) will help you determine which stream was the offender.
 
@@ -162,6 +173,18 @@ There is no buffering or timeouts/retry specified. If you want to add that, you
 
 For complete examples of a streaming encoder and decoder see the [examples folder](https://github.com/klauspost/reedsolomon/tree/master/examples).
 
+# Advanced Options
+
+You can modify internal options which affect how jobs are split between and processed by goroutines.
+
+To create options, use the WithXXX functions. You can supply options to `New`, `NewStream` and `NewStreamC`. If no options are supplied, default options are used.
+
+Example of how to supply options:
+
+ ```Go
+     enc, err := reedsolomon.New(10, 3, WithMaxGoroutines(25))
+ ```
+
 # Performance
 
 Performance depends mainly on the number of parity shards. In rough terms, doubling the number of parity shards will double the encoding time.
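+The single-option snippet above hides how options compose. Below is a slightly fuller sketch; the 10+3 shard counts and the 8 KiB split size are arbitrary choices, and calls are package-qualified as they would be from an importing package:
+
+```Go
+package main
+
+import (
+	"fmt"
+	"log"
+
+	"github.com/klauspost/reedsolomon"
+)
+
+func main() {
+	// Cap fan-out at 4 goroutines and never split work below 8 KiB.
+	// Non-positive values are ignored by both options.
+	enc, err := reedsolomon.New(10, 3,
+		reedsolomon.WithMaxGoroutines(4),
+		reedsolomon.WithMinSplitSize(8<<10))
+	if err != nil {
+		log.Fatal(err)
+	}
+
+	// Split pads the last shard with zeros when the input size
+	// isn't divisible by the number of data shards.
+	shards, err := enc.Split(make([]byte, 1<<20))
+	if err != nil {
+		log.Fatal(err)
+	}
+	if err := enc.Encode(shards); err != nil {
+		log.Fatal(err)
+	}
+
+	ok, err := enc.Verify(shards)
+	fmt.Println(ok, err) // expected: true <nil>
+}
+```
+
+Note that the parallel path is only taken when a shard is larger than the minimum split size, so small shards are always processed on a single goroutine.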
@@ -186,6 +209,18 @@ Example of performance scaling on Intel(R) Core(TM) i7-2600 CPU @ 3.40GHz - 4 ph | 4 | 3179,33 | 235% | | 8 | 4346,18 | 321% | +Benchmarking `Reconstruct()` followed by a `Verify()` (=`all`) versus just calling `ReconstructData()` (=`data`) gives the following result: +``` +benchmark all MB/s data MB/s speedup +BenchmarkReconstruct10x2x10000-8 2011.67 10530.10 5.23x +BenchmarkReconstruct50x5x50000-8 4585.41 14301.60 3.12x +BenchmarkReconstruct10x2x1M-8 8081.15 28216.41 3.49x +BenchmarkReconstruct5x2x1M-8 5780.07 28015.37 4.85x +BenchmarkReconstruct10x4x1M-8 4352.56 14367.61 3.30x +BenchmarkReconstruct50x20x1M-8 1364.35 4189.79 3.07x +BenchmarkReconstruct10x4x16M-8 1484.35 5779.53 3.89x +``` + # asm2plan9s [asm2plan9s](https://github.com/fwessels/asm2plan9s) is used for assembling the AVX2 instructions into their BYTE/WORD/LONG equivalents. diff --git a/vendor/github.com/klauspost/reedsolomon/galois_amd64.go b/vendor/github.com/klauspost/reedsolomon/galois_amd64.go index e4d686e7a..bb99ea659 100644 --- a/vendor/github.com/klauspost/reedsolomon/galois_amd64.go +++ b/vendor/github.com/klauspost/reedsolomon/galois_amd64.go @@ -5,10 +5,6 @@ package reedsolomon -import ( - "github.com/klauspost/cpuid" -) - //go:noescape func galMulSSSE3(low, high, in, out []byte) @@ -40,12 +36,12 @@ func galMulSSSE3Xor(low, high, in, out []byte) { } */ -func galMulSlice(c byte, in, out []byte) { +func galMulSlice(c byte, in, out []byte, ssse3, avx2 bool) { var done int - if cpuid.CPU.AVX2() { + if avx2 { galMulAVX2(mulTableLow[c][:], mulTableHigh[c][:], in, out) done = (len(in) >> 5) << 5 - } else if cpuid.CPU.SSSE3() { + } else if ssse3 { galMulSSSE3(mulTableLow[c][:], mulTableHigh[c][:], in, out) done = (len(in) >> 4) << 4 } @@ -58,12 +54,12 @@ func galMulSlice(c byte, in, out []byte) { } } -func galMulSliceXor(c byte, in, out []byte) { +func galMulSliceXor(c byte, in, out []byte, ssse3, avx2 bool) { var done int - if cpuid.CPU.AVX2() { + if avx2 { galMulAVX2Xor(mulTableLow[c][:], mulTableHigh[c][:], in, out) done = (len(in) >> 5) << 5 - } else if cpuid.CPU.SSSE3() { + } else if ssse3 { galMulSSSE3Xor(mulTableLow[c][:], mulTableHigh[c][:], in, out) done = (len(in) >> 4) << 4 } diff --git a/vendor/github.com/klauspost/reedsolomon/galois_noasm.go b/vendor/github.com/klauspost/reedsolomon/galois_noasm.go index 1c6b8c4da..be90a3311 100644 --- a/vendor/github.com/klauspost/reedsolomon/galois_noasm.go +++ b/vendor/github.com/klauspost/reedsolomon/galois_noasm.go @@ -4,14 +4,14 @@ package reedsolomon -func galMulSlice(c byte, in, out []byte) { +func galMulSlice(c byte, in, out []byte, ssse3, avx2 bool) { mt := mulTable[c] for n, input := range in { out[n] = mt[input] } } -func galMulSliceXor(c byte, in, out []byte) { +func galMulSliceXor(c byte, in, out []byte, ssse3, avx2 bool) { mt := mulTable[c] for n, input := range in { out[n] ^= mt[input] diff --git a/vendor/github.com/klauspost/reedsolomon/matrix.go b/vendor/github.com/klauspost/reedsolomon/matrix.go index e942ead94..339913a75 100644 --- a/vendor/github.com/klauspost/reedsolomon/matrix.go +++ b/vendor/github.com/klauspost/reedsolomon/matrix.go @@ -137,7 +137,7 @@ func (m matrix) Augment(right matrix) (matrix, error) { } // errMatrixSize is returned if matrix dimensions are doesn't match. 
-var errMatrixSize = errors.New("matrix sizes does not match")
+var errMatrixSize = errors.New("matrix sizes do not match")
 
 func (m matrix) SameSize(n matrix) error {
 	if len(m) != len(n) {
diff --git a/vendor/github.com/klauspost/reedsolomon/options.go b/vendor/github.com/klauspost/reedsolomon/options.go
new file mode 100644
index 000000000..44236614d
--- /dev/null
+++ b/vendor/github.com/klauspost/reedsolomon/options.go
@@ -0,0 +1,78 @@
+package reedsolomon
+
+import (
+	"runtime"
+
+	"github.com/klauspost/cpuid"
+)
+
+// Option allows overriding processing parameters.
+type Option func(*options)
+
+type options struct {
+	maxGoroutines int
+	minSplitSize  int
+	useAVX2, useSSSE3 bool
+	usePAR1Matrix bool
+}
+
+var defaultOptions = options{
+	maxGoroutines: 50,
+	minSplitSize:  512,
+}
+
+func init() {
+	if runtime.GOMAXPROCS(0) <= 1 {
+		defaultOptions.maxGoroutines = 1
+	}
+	// Detect CPU capabilities.
+	defaultOptions.useSSSE3 = cpuid.CPU.SSSE3()
+	defaultOptions.useAVX2 = cpuid.CPU.AVX2()
+}
+
+// WithMaxGoroutines sets the maximum number of goroutines to use for encoding & decoding.
+// Jobs will be split into this many parts, unless each goroutine would have to process
+// less than minSplitSize bytes (set with WithMinSplitSize).
+// For the best speed, keep this well above the GOMAXPROCS number for more fine-grained
+// scheduling.
+// If n <= 0, it is ignored.
+func WithMaxGoroutines(n int) Option {
+	return func(o *options) {
+		if n > 0 {
+			o.maxGoroutines = n
+		}
+	}
+}
+
+// WithMinSplitSize sets the minimum encoding size in bytes per goroutine.
+// See WithMaxGoroutines on how jobs are split.
+// If n <= 0, it is ignored.
+func WithMinSplitSize(n int) Option {
+	return func(o *options) {
+		if n > 0 {
+			o.minSplitSize = n
+		}
+	}
+}
+
+func withSSE3(enabled bool) Option {
+	return func(o *options) {
+		o.useSSSE3 = enabled
+	}
+}
+
+func withAVX2(enabled bool) Option {
+	return func(o *options) {
+		o.useAVX2 = enabled
+	}
+}
+
+// WithPAR1Matrix causes the encoder to build the matrix the way PARv1
+// does. Note that the method it uses is buggy, and may lead to cases
+// where recovery is impossible, even if there are enough parity
+// shards.
+func WithPAR1Matrix() Option {
+	return func(o *options) {
+		o.usePAR1Matrix = true
+	}
+}
diff --git a/vendor/github.com/klauspost/reedsolomon/reedsolomon.go b/vendor/github.com/klauspost/reedsolomon/reedsolomon.go
index 914ebe0ad..4bb84c373 100644
--- a/vendor/github.com/klauspost/reedsolomon/reedsolomon.go
+++ b/vendor/github.com/klauspost/reedsolomon/reedsolomon.go
@@ -15,7 +15,6 @@ import (
 	"bytes"
 	"errors"
 	"io"
-	"runtime"
 	"sync"
 )
 
@@ -50,6 +49,21 @@ type Encoder interface {
 	// Use the Verify function to check if data set is ok.
 	Reconstruct(shards [][]byte) error
 
+	// ReconstructData will recreate any missing data shards, if possible.
+	//
+	// Given a list of shards, some of which contain data, fills in the
+	// data shards that don't have data.
+	//
+	// The length of the array must be equal to Shards.
+	// You indicate that a shard is missing by setting it to nil.
+	//
+	// If there are too few shards to reconstruct the missing
+	// ones, ErrTooFewShards will be returned.
+	//
+	// As the reconstructed shard set may contain missing parity shards,
+	// calling the Verify function is likely to fail.
+	ReconstructData(shards [][]byte) error
+
 	// Split a data slice into the number of shards given to the encoder,
 	// and create empty parity shards.
// @@ -83,51 +97,113 @@ type reedSolomon struct { m matrix tree inversionTree parity [][]byte + o options } // ErrInvShardNum will be returned by New, if you attempt to create // an Encoder where either data or parity shards is zero or less. var ErrInvShardNum = errors.New("cannot create Encoder with zero or less data/parity shards") -// ErrMaxShardNum will be returned by New, if you attempt to create -// an Encoder where data and parity shards cannot be bigger than -// Galois field GF(2^8) - 1. -var ErrMaxShardNum = errors.New("cannot create Encoder with 255 or more data+parity shards") - -// New creates a new encoder and initializes it to -// the number of data shards and parity shards that -// you want to use. You can reuse this encoder. -// Note that the maximum number of data shards is 256. -func New(dataShards, parityShards int) (Encoder, error) { - r := reedSolomon{ - DataShards: dataShards, - ParityShards: parityShards, - Shards: dataShards + parityShards, - } - - if dataShards <= 0 || parityShards <= 0 { - return nil, ErrInvShardNum - } - - if dataShards+parityShards > 255 { - return nil, ErrMaxShardNum - } +// ErrMaxShardNum will be returned by New, if you attempt to create an +// Encoder where data and parity shards are bigger than the order of +// GF(2^8). +var ErrMaxShardNum = errors.New("cannot create Encoder with more than 256 data+parity shards") +// buildMatrix creates the matrix to use for encoding, given the +// number of data shards and the number of total shards. +// +// The top square of the matrix is guaranteed to be an identity +// matrix, which means that the data shards are unchanged after +// encoding. +func buildMatrix(dataShards, totalShards int) (matrix, error) { // Start with a Vandermonde matrix. This matrix would work, // in theory, but doesn't have the property that the data // shards are unchanged after encoding. - vm, err := vandermonde(r.Shards, dataShards) + vm, err := vandermonde(totalShards, dataShards) if err != nil { return nil, err } // Multiply by the inverse of the top square of the matrix. // This will make the top square be the identity matrix, but - // preserve the property that any square subset of rows is + // preserve the property that any square subset of rows is // invertible. - top, _ := vm.SubMatrix(0, 0, dataShards, dataShards) - top, _ = top.Invert() - r.m, _ = vm.Multiply(top) + top, err := vm.SubMatrix(0, 0, dataShards, dataShards) + if err != nil { + return nil, err + } + + topInv, err := top.Invert() + if err != nil { + return nil, err + } + + return vm.Multiply(topInv) +} + +// buildMatrixPAR1 creates the matrix to use for encoding according to +// the PARv1 spec, given the number of data shards and the number of +// total shards. Note that the method they use is buggy, and may lead +// to cases where recovery is impossible, even if there are enough +// parity shards. +// +// The top square of the matrix is guaranteed to be an identity +// matrix, which means that the data shards are unchanged after +// encoding. +func buildMatrixPAR1(dataShards, totalShards int) (matrix, error) { + result, err := newMatrix(totalShards, dataShards) + if err != nil { + return nil, err + } + + for r, row := range result { + // The top portion of the matrix is the identity + // matrix, and the bottom is a transposed Vandermonde + // matrix starting at 1 instead of 0. 
+ if r < dataShards { + result[r][r] = 1 + } else { + for c := range row { + result[r][c] = galExp(byte(c+1), r-dataShards) + } + } + } + return result, nil +} + +// New creates a new encoder and initializes it to +// the number of data shards and parity shards that +// you want to use. You can reuse this encoder. +// Note that the maximum number of total shards is 256. +// If no options are supplied, default options are used. +func New(dataShards, parityShards int, opts ...Option) (Encoder, error) { + r := reedSolomon{ + DataShards: dataShards, + ParityShards: parityShards, + Shards: dataShards + parityShards, + o: defaultOptions, + } + + for _, opt := range opts { + opt(&r.o) + } + if dataShards <= 0 || parityShards <= 0 { + return nil, ErrInvShardNum + } + + if dataShards+parityShards > 256 { + return nil, ErrMaxShardNum + } + + var err error + if r.o.usePAR1Matrix { + r.m, err = buildMatrixPAR1(dataShards, r.Shards) + } else { + r.m, err = buildMatrix(dataShards, r.Shards) + } + if err != nil { + return nil, err + } // Inverted matrices are cached in a tree keyed by the indices // of the invalid rows of the data to reconstruct. @@ -201,7 +277,7 @@ func (r reedSolomon) Verify(shards [][]byte) (bool, error) { // number of matrix rows used, is determined by // outputCount, which is the number of outputs to compute. func (r reedSolomon) codeSomeShards(matrixRows, inputs, outputs [][]byte, outputCount, byteCount int) { - if runtime.GOMAXPROCS(0) > 1 && len(inputs[0]) > minSplitSize { + if r.o.maxGoroutines > 1 && byteCount > r.o.minSplitSize { r.codeSomeShardsP(matrixRows, inputs, outputs, outputCount, byteCount) return } @@ -209,26 +285,21 @@ func (r reedSolomon) codeSomeShards(matrixRows, inputs, outputs [][]byte, output in := inputs[c] for iRow := 0; iRow < outputCount; iRow++ { if c == 0 { - galMulSlice(matrixRows[iRow][c], in, outputs[iRow]) + galMulSlice(matrixRows[iRow][c], in, outputs[iRow], r.o.useSSSE3, r.o.useAVX2) } else { - galMulSliceXor(matrixRows[iRow][c], in, outputs[iRow]) + galMulSliceXor(matrixRows[iRow][c], in, outputs[iRow], r.o.useSSSE3, r.o.useAVX2) } } } } -const ( - minSplitSize = 512 // min split size per goroutine - maxGoroutines = 50 // max goroutines number for encoding & decoding -) - // Perform the same as codeSomeShards, but split the workload into // several goroutines. func (r reedSolomon) codeSomeShardsP(matrixRows, inputs, outputs [][]byte, outputCount, byteCount int) { var wg sync.WaitGroup - do := byteCount / maxGoroutines - if do < minSplitSize { - do = minSplitSize + do := byteCount / r.o.maxGoroutines + if do < r.o.minSplitSize { + do = r.o.minSplitSize } start := 0 for start < byteCount { @@ -241,9 +312,9 @@ func (r reedSolomon) codeSomeShardsP(matrixRows, inputs, outputs [][]byte, outpu in := inputs[c] for iRow := 0; iRow < outputCount; iRow++ { if c == 0 { - galMulSlice(matrixRows[iRow][c], in[start:stop], outputs[iRow][start:stop]) + galMulSlice(matrixRows[iRow][c], in[start:stop], outputs[iRow][start:stop], r.o.useSSSE3, r.o.useAVX2) } else { - galMulSliceXor(matrixRows[iRow][c], in[start:stop], outputs[iRow][start:stop]) + galMulSliceXor(matrixRows[iRow][c], in[start:stop], outputs[iRow][start:stop], r.o.useSSSE3, r.o.useAVX2) } } } @@ -258,13 +329,36 @@ func (r reedSolomon) codeSomeShardsP(matrixRows, inputs, outputs [][]byte, outpu // except this will check values and return // as soon as a difference is found. 
func (r reedSolomon) checkSomeShards(matrixRows, inputs, toCheck [][]byte, outputCount, byteCount int) bool { + if r.o.maxGoroutines > 1 && byteCount > r.o.minSplitSize { + return r.checkSomeShardsP(matrixRows, inputs, toCheck, outputCount, byteCount) + } + outputs := make([][]byte, len(toCheck)) + for i := range outputs { + outputs[i] = make([]byte, byteCount) + } + for c := 0; c < r.DataShards; c++ { + in := inputs[c] + for iRow := 0; iRow < outputCount; iRow++ { + galMulSliceXor(matrixRows[iRow][c], in, outputs[iRow], r.o.useSSSE3, r.o.useAVX2) + } + } + + for i, calc := range outputs { + if !bytes.Equal(calc, toCheck[i]) { + return false + } + } + return true +} + +func (r reedSolomon) checkSomeShardsP(matrixRows, inputs, toCheck [][]byte, outputCount, byteCount int) bool { same := true var mu sync.RWMutex // For above var wg sync.WaitGroup - do := byteCount / maxGoroutines - if do < minSplitSize { - do = minSplitSize + do := byteCount / r.o.maxGoroutines + if do < r.o.minSplitSize { + do = r.o.minSplitSize } start := 0 for start < byteCount { @@ -287,7 +381,7 @@ func (r reedSolomon) checkSomeShards(matrixRows, inputs, toCheck [][]byte, outpu mu.RUnlock() in := inputs[c][start : start+do] for iRow := 0; iRow < outputCount; iRow++ { - galMulSliceXor(matrixRows[iRow][c], in, outputs[iRow]) + galMulSliceXor(matrixRows[iRow][c], in, outputs[iRow], r.o.useSSSE3, r.o.useAVX2) } } @@ -312,7 +406,7 @@ var ErrShardNoData = errors.New("no shard data") // ErrShardSize is returned if shard length isn't the same for all // shards. -var ErrShardSize = errors.New("shard sizes does not match") +var ErrShardSize = errors.New("shard sizes do not match") // checkShards will check if shards are the same size // or 0, if allowed. An error is returned if this fails. @@ -358,6 +452,35 @@ func shardSize(shards [][]byte) int { // The reconstructed shard set is complete, but integrity is not verified. // Use the Verify function to check if data set is ok. func (r reedSolomon) Reconstruct(shards [][]byte) error { + return r.reconstruct(shards, false) +} + +// ReconstructData will recreate any missing data shards, if possible. +// +// Given a list of shards, some of which contain data, fills in the +// data shards that don't have data. +// +// The length of the array must be equal to Shards. +// You indicate that a shard is missing by setting it to nil. +// +// If there are too few shards to reconstruct the missing +// ones, ErrTooFewShards will be returned. +// +// As the reconstructed shard set may contain missing parity shards, +// calling the Verify function is likely to fail. +func (r reedSolomon) ReconstructData(shards [][]byte) error { + return r.reconstruct(shards, true) +} + +// reconstruct will recreate the missing data shards, and unless +// dataOnly is true, also the missing parity shards +// +// The length of the array must be equal to Shards. +// You indicate that a shard is missing by setting it to nil. +// +// If there are too few shards to reconstruct the missing +// ones, ErrTooFewShards will be returned. +func (r reedSolomon) reconstruct(shards [][]byte, dataOnly bool) error { if len(shards) != r.Shards { return ErrTooFewShards } @@ -464,6 +587,11 @@ func (r reedSolomon) Reconstruct(shards [][]byte) error { } r.codeSomeShards(matrixRows, subShards, outputs[:outputCount], outputCount, shardSize) + if dataOnly { + // Exit out early if we are only interested in the data shards + return nil + } + // Now that we have all of the data shards intact, we can // compute any of the parity that is missing. 
//
diff --git a/vendor/github.com/klauspost/reedsolomon/streaming.go b/vendor/github.com/klauspost/reedsolomon/streaming.go
index 293a8b129..9e55d7352 100644
--- a/vendor/github.com/klauspost/reedsolomon/streaming.go
+++ b/vendor/github.com/klauspost/reedsolomon/streaming.go
@@ -145,8 +145,8 @@ type rsStream struct {
 // the number of data shards and parity shards that
 // you want to use. You can reuse this encoder.
 // Note that the maximum number of data shards is 256.
-func NewStream(dataShards, parityShards int) (StreamEncoder, error) {
-	enc, err := New(dataShards, parityShards)
+func NewStream(dataShards, parityShards int, o ...Option) (StreamEncoder, error) {
+	enc, err := New(dataShards, parityShards, o...)
 	if err != nil {
 		return nil, err
 	}
@@ -161,8 +161,8 @@ func NewStream(dataShards, parityShards int) (StreamEncoder, error) {
 // the number of data shards and parity shards given.
 //
 // This functions as 'NewStream', but allows you to enable CONCURRENT reads and writes.
-func NewStreamC(dataShards, parityShards int, conReads, conWrites bool) (StreamEncoder, error) {
-	enc, err := New(dataShards, parityShards)
+func NewStreamC(dataShards, parityShards int, conReads, conWrites bool, o ...Option) (StreamEncoder, error) {
+	enc, err := New(dataShards, parityShards, o...)
 	if err != nil {
 		return nil, err
 	}
@@ -256,7 +256,7 @@ func trimShards(in [][]byte, size int) [][]byte {
 
 func readShards(dst [][]byte, in []io.Reader) error {
 	if len(in) != len(dst) {
-		panic("internal error: in and dst size does not match")
+		panic("internal error: in and dst size do not match")
 	}
 	size := -1
 	for i := range in {
@@ -291,7 +291,7 @@ func readShards(dst [][]byte, in []io.Reader) error {
 
 func writeShards(out []io.Writer, in [][]byte) error {
 	if len(out) != len(in) {
-		panic("internal error: in and out size does not match")
+		panic("internal error: in and out size do not match")
 	}
 	for i := range in {
 		if out[i] == nil {
@@ -318,7 +318,7 @@ type readResult struct {
 // cReadShards reads shards concurrently
 func cReadShards(dst [][]byte, in []io.Reader) error {
 	if len(in) != len(dst) {
-		panic("internal error: in and dst size does not match")
+		panic("internal error: in and dst size do not match")
 	}
 	var wg sync.WaitGroup
 	wg.Add(len(in))
@@ -366,7 +366,7 @@ func cReadShards(dst [][]byte, in []io.Reader) error {
 // cWriteShards writes shards concurrently
 func cWriteShards(out []io.Writer, in [][]byte) error {
 	if len(out) != len(in) {
-		panic("internal error: in and out size does not match")
+		panic("internal error: in and out size do not match")
 	}
 	var errs = make(chan error, len(out))
 	var wg sync.WaitGroup
@@ -450,8 +450,9 @@ var ErrReconstructMismatch = errors.New("valid shards and fill shards are mutual
 // If there are too few shards to reconstruct the missing
 // ones, ErrTooFewShards will be returned.
 //
-// The reconstructed shard set is complete, but integrity is not verified.
-// Use the Verify function to check if data set is ok.
+// The reconstructed shard set is only complete when all missing shards have been requested.
+// However, its integrity is not automatically verified.
+// Use the Verify function to check whether the data set is intact.
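+//
+// A minimal call sketch (enc is a StreamEncoder from NewStream, and
+// recreated3 is any io.Writer you provide, e.g. a temp file): leave
+// valid[i] nil for missing shards, and set fill[i] non-nil only at
+// the positions you want recreated:
+//
+//	valid[3] = nil       // shard 3 was lost
+//	fill[3] = recreated3 // destination for the recreated shard
+//	err := enc.Reconstruct(valid, fill)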
func (r rsStream) Reconstruct(valid []io.Reader, fill []io.Writer) error { if len(valid) != r.r.Shards { return ErrTooFewShards @@ -461,10 +462,14 @@ func (r rsStream) Reconstruct(valid []io.Reader, fill []io.Writer) error { } all := createSlice(r.r.Shards, r.bs) + reconDataOnly := true for i := range valid { if valid[i] != nil && fill[i] != nil { return ErrReconstructMismatch } + if i >= r.r.DataShards && fill[i] != nil { + reconDataOnly = false + } } read := 0 @@ -482,7 +487,11 @@ func (r rsStream) Reconstruct(valid []io.Reader, fill []io.Writer) error { read += shardSize(all) all = trimShards(all, shardSize(all)) - err = r.r.Reconstruct(all) + if reconDataOnly { + err = r.r.ReconstructData(all) // just reconstruct missing data shards + } else { + err = r.r.Reconstruct(all) // reconstruct all missing shards + } if err != nil { return err } diff --git a/vendor/vendor.json b/vendor/vendor.json index 5a71dc91e..4f7de1b7d 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -243,10 +243,10 @@ "revisionTime": "2016-10-16T15:41:25Z" }, { - "checksumSHA1": "Pzd1bfm8Yj1radncaohNZu+UT1I=", + "checksumSHA1": "DnS7x0Gqc93p4hQ88FgE35vfIRw=", "path": "github.com/klauspost/reedsolomon", - "revision": "d0a56f72c0d40a6cdde43a1575ad9686a0098b70", - "revisionTime": "2016-10-28T07:13:20Z" + "revision": "a9202d772777d8d2264c3e0c6159be5047697380", + "revisionTime": "2017-07-19T04:51:23Z" }, { "checksumSHA1": "dNYxHiBLalTqluak2/Z8c3RsSEM=",
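Taken together, the cmd/ changes map `decodeMissingData` onto the library's new `ReconstructData()` (the read path) and `decodeDataAndParity` onto `Reconstruct()` (the heal path). The sketch below makes that difference concrete against the vendored revision; the 4+2 geometry and the payload are arbitrary:

```Go
package main

import (
	"fmt"
	"log"

	"github.com/klauspost/reedsolomon"
)

func main() {
	enc, err := reedsolomon.New(4, 2) // 4 data + 2 parity shards
	if err != nil {
		log.Fatal(err)
	}
	shards, err := enc.Split([]byte("object payload to protect"))
	if err != nil {
		log.Fatal(err)
	}
	if err := enc.Encode(shards); err != nil {
		log.Fatal(err)
	}

	// Lose one data shard and one parity shard.
	shards[1] = nil // data
	shards[5] = nil // parity

	// Read path: recreate only the data shard. The parity shard stays
	// nil, which is why Verify is documented as likely to fail here.
	if err := enc.ReconstructData(shards); err != nil {
		log.Fatal(err)
	}
	fmt.Println("parity still missing:", shards[5] == nil) // true

	// Heal path: recreate parity as well, after which verification passes.
	if err := enc.Reconstruct(shards); err != nil {
		log.Fatal(err)
	}
	ok, err := enc.Verify(shards)
	fmt.Println("verified:", ok, err) // true <nil>
}
```

Skipping the parity recomputation, along with the Verify round-trip the old `decodeData` performed, is where the roughly 3-5x speedups in the benchmark table above come from.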