janitor duty on erasure-decode

Anand Babu (AB) Periasamy 2015-03-25 17:41:41 -07:00
parent db2ac35da5
commit bd39768de2
3 changed files with 52 additions and 40 deletions

View file

@@ -23,8 +23,8 @@ import (
 	"unsafe"
 )
 
-// Integer to Int conversion
-func int2CInt(srcErrList []int) *C.int32_t {
+// intSlice2CIntArray converts Go int slice to C int array
+func intSlice2CIntArray(srcErrList []int) *C.int32_t {
 	var sizeErrInt = int(unsafe.Sizeof(srcErrList[0]))
 	switch sizeInt {
 	case sizeErrInt:
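
Aside for readers unfamiliar with the cgo bridge: the renamed helper copies a Go []int into a C int32_t array so block indexes can be handed to the C decoder routines. Only part of the repository's implementation is visible in this hunk; the standalone sketch below shows one common way such a conversion is written. The C.malloc-based approach, the package name, and the goIntsToCInt32Array/freeCIntArray names are illustrative assumptions, not code from this commit.

package erasure

// #include <stdint.h>
// #include <stdlib.h>
import "C"

import "unsafe"

// goIntsToCInt32Array copies a Go []int into a C-allocated int32_t array.
// Illustrative sketch only; the caller must release the memory with freeCIntArray.
func goIntsToCInt32Array(src []int) *C.int32_t {
	if len(src) == 0 {
		return nil
	}
	// Allocate len(src) * sizeof(int32_t) bytes on the C heap.
	size := C.size_t(len(src)) * C.size_t(unsafe.Sizeof(C.int32_t(0)))
	ptr := (*C.int32_t)(C.malloc(size))
	// View the C buffer as a Go slice and copy, truncating each int to 32 bits.
	dst := (*[1 << 28]C.int32_t)(unsafe.Pointer(ptr))[:len(src):len(src)]
	for i, v := range src {
		dst[i] = C.int32_t(v)
	}
	return ptr
}

// freeCIntArray releases memory obtained from goIntsToCInt32Array.
func freeCIntArray(ptr *C.int32_t) {
	C.free(unsafe.Pointer(ptr))
}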

View file

@@ -27,11 +27,16 @@ import (
 	"unsafe"
 )
 
-// Decode decodes 2 tuple data containing (k + m) chunks back into its original form.
-// Additionally original block length should also be provided as input.
+// Decode decodes erasure coded blocks of data into its original
+// form. Erasure coded data contains K data blocks and M parity
+// blocks. Decode can withstand data loss up to any M number of blocks.
 //
-// Decoded data is exactly similar in length and content as the original data.
-func (e *Encoder) Decode(chunks [][]byte, length int) ([]byte, error) {
+// "encodedDataBlocks" is an array of K data blocks and M parity
+// blocks. Data blocks are position and order dependent. Missing blocks
+// are set to "nil". There must be at least "K" number of data|parity
+// blocks.
+// "dataLen" is the length of original source data
+func (e *Encoder) Decode(encodedDataBlocks [][]byte, dataLen int) (decodedData []byte, err error) {
 	var decodeMatrix *C.uint8_t
 	var decodeTbls *C.uint8_t
 	var decodeIndex *C.uint32_t
@@ -40,66 +45,73 @@ func (e *Encoder) Decode(chunks [][]byte, length int) ([]byte, error) {
 	k := int(e.params.K)
 	m := int(e.params.M)
 	n := k + m
-	if len(chunks) != n {
-		msg := fmt.Sprintf("chunks length must be %d", n)
-		return nil, errors.New(msg)
+	// We need the data and parity blocks preserved in the same order. Missing blocks are set to nil.
+	if len(encodedDataBlocks) != n {
+		return nil, fmt.Errorf("Encoded data blocks slice must be of length [%d]", n)
 	}
-	chunkLen := GetEncodedBlockLen(length, uint8(k))
-	errorIndex := make([]int, n+1)
-	var errCount int
+	// Length of a single encoded block
+	encodedBlockLen := GetEncodedBlockLen(dataLen, uint8(k))
 
-	for i := range chunks {
-		// Check of chunks are really null
-		if chunks[i] == nil || len(chunks[i]) == 0 {
-			errorIndex[errCount] = i
-			errCount++
+	// Keep track of the missing block indexes.
+	missingEncodedBlocks := make([]int, n+1)
+	var missingEncodedBlocksCount int = 0
+	// Check for the missing encoded blocks
+	for i := range encodedDataBlocks {
+		if encodedDataBlocks[i] == nil || len(encodedDataBlocks[i]) == 0 {
+			missingEncodedBlocks[missingEncodedBlocksCount] = i
+			missingEncodedBlocksCount++
 		}
 	}
-	errorIndex[errCount] = -1
-	errCount++
+	missingEncodedBlocks[missingEncodedBlocksCount] = -1
+	missingEncodedBlocksCount++
 
-	// Too many missing chunks, cannot be more than parity `m`
-	if errCount-1 > int(n-k) {
-		return nil, errors.New("too many erasures requested, can't decode")
+	// Cannot reconstruct original data. Need at least M number of data or parity blocks.
+	if missingEncodedBlocksCount-1 > m {
+		return nil, fmt.Errorf("Cannot reconstruct original data. Need at least [%d] data or parity blocks", m)
 	}
-	errorIndexPtr := int2CInt(errorIndex[:errCount])
+	// Convert from Go int slice to C int array
+	missingEncodedBlocksC := intSlice2CIntArray(missingEncodedBlocks[:missingEncodedBlocksCount])
 
-	for i := range chunks {
-		if chunks[i] == nil || len(chunks[i]) == 0 {
-			chunks[i] = make([]byte, chunkLen)
+	// Allocate buffer for the missing blocks
+	for i := range encodedDataBlocks {
+		if encodedDataBlocks[i] == nil || len(encodedDataBlocks[i]) == 0 {
+			encodedDataBlocks[i] = make([]byte, encodedBlockLen)
 		}
 	}
-	C.minio_init_decoder(errorIndexPtr, C.int(k), C.int(n), C.int(errCount-1),
+	// Initialize decoder
+	C.minio_init_decoder(missingEncodedBlocksC, C.int(k), C.int(n), C.int(missingEncodedBlocksCount-1),
 		e.encodeMatrix, &decodeMatrix, &decodeTbls, &decodeIndex)
+	// Make a slice of pointers to encoded blocks. Necessary to bridge to the C world.
 	pointers := make([]*byte, n)
-	for i := range chunks {
-		pointers[i] = &chunks[i][0]
+	for i := range encodedDataBlocks {
+		pointers[i] = &encodedDataBlocks[i][0]
 	}
-	data := (**C.uint8_t)(unsafe.Pointer(&pointers[0]))
-	ret := C.minio_get_source_target(C.int(errCount-1), C.int(k), C.int(m), errorIndexPtr,
-		decodeIndex, data, &source, &target)
+	// Get pointers to source "data" and target "parity" blocks from the output byte array.
+	ret := C.minio_get_source_target(C.int(missingEncodedBlocksCount-1), C.int(k), C.int(m), missingEncodedBlocksC,
+		decodeIndex, (**C.uint8_t)(unsafe.Pointer(&pointers[0])), &source, &target)
 	if int(ret) == -1 {
-		return nil, errors.New("Decoding source target failed")
+		return nil, errors.New("Unable to decode data")
 	}
-	C.ec_encode_data(C.int(chunkLen), C.int(k), C.int(errCount-1), decodeTbls,
+	// Decode data
+	C.ec_encode_data(C.int(encodedBlockLen), C.int(k), C.int(missingEncodedBlocksCount-1), decodeTbls,
 		source, target)
-	recoveredOutput := make([]byte, 0, chunkLen*int(k))
+	// Allocate the output buffer
+	decodedData = make([]byte, 0, encodedBlockLen*int(k))
 	for i := 0; i < int(k); i++ {
-		recoveredOutput = append(recoveredOutput, chunks[i]...)
+		decodedData = append(decodedData, encodedDataBlocks[i]...)
 	}
 	// TODO cache this if necessary
 	e.decodeMatrix = decodeMatrix
 	e.decodeTbls = decodeTbls
-	return recoveredOutput[:length], nil
+	return decodedData[:dataLen], nil
 }
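
The rewritten doc comment spells out Decode's contract: all K+M blocks are passed in their original positions, missing blocks are set to nil, and up to M losses can be tolerated; the returned slice is trimmed back to dataLen. A hedged usage sketch follows. The NewEncoder constructor and the EncoderParams literal are assumptions made for illustration; only the Encode and Decode signatures appear in this commit.

package main

import (
	"bytes"
	"log"
)

func main() {
	data := []byte("the quick brown fox jumps over the lazy dog")

	// Hypothetical setup: the constructor and params type are illustrative only.
	e := NewEncoder(EncoderParams{K: 10, M: 5})

	// Encode returns K data blocks followed by M parity blocks.
	encodedBlocks, err := e.Encode(data)
	if err != nil {
		log.Fatal(err)
	}

	// Simulate losing up to M blocks: keep positions, set missing entries to nil.
	encodedBlocks[2] = nil
	encodedBlocks[11] = nil

	// Decode needs the original length to trim padding from the reassembled data.
	decodedData, err := e.Decode(encodedBlocks, len(data))
	if err != nil {
		log.Fatal(err)
	}
	if !bytes.Equal(decodedData, data) {
		log.Fatal("decoded data differs from the original input")
	}
}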

View file

@@ -161,7 +161,7 @@ func (e *Encoder) Encode(inputData []byte) (encodedBlocks [][]byte, err error) {
 	}
 
 	// Extend inputData buffer to accommodate coded parity blocks
-	if true { // create a temporary scope to trigger garbage collect
+	{ // Local Scope
 		encodedParityBlocksLen := encodedBlockLen * m
 		parityBlocks := make([]byte, encodedParityBlocksLen)
 		inputData = append(inputData, parityBlocks...)
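
Note on the hunk above: the bare block only limits the scope of the temporary parityBlocks slice while inputData is grown in place to make room for the M parity blocks; a block by itself does not force a garbage collection, hence the dropped comment. A standalone sketch of the same grow-then-slice pattern follows; the slicing step is an assumption about code outside this hunk, and growForParity is a hypothetical name.

// growForParity appends a zeroed region large enough for m parity blocks,
// then carves the grown buffer into k+m equally sized views. Illustrative
// only; it assumes data has already been padded to exactly k*encodedBlockLen bytes.
func growForParity(data []byte, encodedBlockLen, k, m int) ([]byte, [][]byte) {
	{ // Local scope: parityBlocks is only needed while growing the buffer.
		parityBlocks := make([]byte, encodedBlockLen*m)
		data = append(data, parityBlocks...)
	}
	blocks := make([][]byte, k+m)
	for i := range blocks {
		blocks[i] = data[i*encodedBlockLen : (i+1)*encodedBlockLen]
	}
	return data, blocks
}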