azure: allow parts > 100MiB size to work properly (#4869)

Previously, uploading any multipart part larger than 100MiB caused the
azure gateway to return an error.

This patch fixes the issue by splitting the given multipart part into
sub parts of at most 100MiB each.  On complete multipart, it fetches all
uploaded azure block IDs for each part and performs completion.

Fixes #4868
This commit is contained in:
Bala FA 2017-09-05 16:56:23 -07:00 committed by Dee Koder
parent cf479eb401
commit 189b6682d6
2 changed files with 110 additions and 43 deletions

View file

@ -25,17 +25,18 @@ import (
"io"
"net/http"
"net/url"
"strconv"
"strings"
"sync"
"time"
"github.com/Azure/azure-sdk-for-go/storage"
humanize "github.com/dustin/go-humanize"
"github.com/minio/minio-go/pkg/policy"
"github.com/minio/sha256-simd"
)
const globalAzureAPIVersion = "2016-05-31"
const azureBlockSize = 100 * humanize.MiByte
// Canonicalize the metadata headers, without this azure-sdk calculates
// incorrect signature. This attempt to canonicalize is to convert
@ -496,27 +497,23 @@ func (a *azureObjects) CopyObjectPart(srcBucket, srcObject, destBucket, destObje
return info, traceError(NotImplemented{})
}
// Encode partID+md5Hex to a blockID.
func azureGetBlockID(partID int, md5Hex string) string {
return base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf("%.5d.%s", partID, md5Hex)))
// azureGetBlockID builds the block ID for one sub part of a multipart part.
// The plain-text form is "<partID>.<subPartNumber>.<md5Hex>", where partID is
// zero-padded to at least five digits and subPartNumber to at least two; the
// result is that text encoded with standard base64.
func azureGetBlockID(partID, subPartNumber int, md5Hex string) string {
	plainID := fmt.Sprintf("%05d.%02d.%s", partID, subPartNumber, md5Hex)
	return base64.StdEncoding.EncodeToString([]byte(plainID))
}
// Decode blockID to partID+md5Hex.
func azureParseBlockID(blockID string) (int, string, error) {
idByte, err := base64.StdEncoding.DecodeString(blockID)
if err != nil {
return 0, "", traceError(err)
// Parse blockID into partID, subPartNumber and md5Hex.
func azureParseBlockID(blockID string) (partID, subPartNumber int, md5Hex string, err error) {
var blockIDBytes []byte
if blockIDBytes, err = base64.StdEncoding.DecodeString(blockID); err != nil {
return
}
idStr := string(idByte)
splitRes := strings.Split(idStr, ".")
if len(splitRes) != 2 {
return 0, "", traceError(errUnexpected)
if _, err = fmt.Sscanf(string(blockIDBytes), "%05d.%02d.%s", &partID, &subPartNumber, &md5Hex); err != nil {
err = fmt.Errorf("invalid block id '%s'", string(blockIDBytes))
}
partID, err := strconv.Atoi(splitRes[0])
if err != nil {
return 0, "", traceError(err)
}
return partID, splitRes[1], nil
return
}
// PutObjectPart - Use Azure equivalent PutBlockWithLength.
@ -550,10 +547,25 @@ func (a *azureObjects) PutObjectPart(bucket, object, uploadID string, partID int
teeReader = io.TeeReader(data, io.MultiWriter(writers...))
}
id := azureGetBlockID(partID, etag)
err = a.client.PutBlockWithLength(bucket, object, id, uint64(size), teeReader, nil)
if err != nil {
return info, azureToObjectError(traceError(err), bucket, object)
subPartSize := int64(azureBlockSize)
subPartNumber := 1
for remainingSize := size; remainingSize >= 0; remainingSize -= subPartSize {
// Allow to create zero sized part.
if remainingSize == 0 && subPartNumber > 1 {
break
}
if remainingSize < subPartSize {
subPartSize = remainingSize
}
id := azureGetBlockID(partID, subPartNumber, etag)
err = a.client.PutBlockWithLength(bucket, object, id, uint64(subPartSize), io.LimitReader(teeReader, subPartSize), nil)
if err != nil {
return info, azureToObjectError(traceError(err), bucket, object)
}
subPartNumber++
}
if md5Hex != "" {
@ -601,7 +613,7 @@ func (a *azureObjects) ListObjectParts(bucket, object, uploadID string, partNumb
break
}
partCount++
partID, md5Hex, err := azureParseBlockID(part.Name)
partID, _, md5Hex, err := azureParseBlockID(part.Name)
if err != nil {
return result, err
}
@ -639,14 +651,63 @@ func (a *azureObjects) CompleteMultipartUpload(bucket, object, uploadID string,
if meta == nil {
return objInfo, traceError(InvalidUploadID{uploadID})
}
var blocks []storage.Block
for _, part := range uploadedParts {
blocks = append(blocks, storage.Block{
ID: azureGetBlockID(part.PartNumber, part.ETag),
Status: storage.BlockStatusUncommitted,
})
resp, err := a.client.GetBlockList(bucket, object, storage.BlockListTypeUncommitted)
if err != nil {
return objInfo, azureToObjectError(traceError(err), bucket, object)
}
err = a.client.PutBlockList(bucket, object, blocks)
getBlocks := func(partNumber int, etag string) (blocks []storage.Block, size int64, err error) {
for _, part := range resp.UncommittedBlocks {
var partID int
var md5Hex string
if partID, _, md5Hex, err = azureParseBlockID(part.Name); err != nil {
return nil, 0, err
}
if partNumber == partID && etag == md5Hex {
blocks = append(blocks, storage.Block{
ID: part.Name,
Status: storage.BlockStatusUncommitted,
})
size += part.Size
}
}
if len(blocks) == 0 {
return nil, 0, InvalidPart{}
}
return blocks, size, nil
}
var allBlocks []storage.Block
partSizes := make([]int64, len(uploadedParts))
for i, part := range uploadedParts {
var blocks []storage.Block
var size int64
blocks, size, err = getBlocks(part.PartNumber, part.ETag)
if err != nil {
return objInfo, traceError(err)
}
allBlocks = append(allBlocks, blocks...)
partSizes[i] = size
}
// Error out if parts except last part sizing < 5MiB.
for i, size := range partSizes[:len(partSizes)-1] {
if size < globalMinPartSize {
return objInfo, traceError(PartTooSmall{
PartNumber: uploadedParts[i].PartNumber,
PartSize: size,
PartETag: uploadedParts[i].ETag,
})
}
}
err = a.client.PutBlockList(bucket, object, allBlocks)
if err != nil {
return objInfo, azureToObjectError(traceError(err), bucket, object)
}

View file

@ -133,15 +133,16 @@ func TestAzureToObjectError(t *testing.T) {
// Test azureGetBlockID().
func TestAzureGetBlockID(t *testing.T) {
testCases := []struct {
partID int
md5 string
blockID string
partID int
subPartNumber int
md5 string
blockID string
}{
{1, "d41d8cd98f00b204e9800998ecf8427e", "MDAwMDEuZDQxZDhjZDk4ZjAwYjIwNGU5ODAwOTk4ZWNmODQyN2U="},
{2, "a7fb6b7b36ee4ed66b5546fac4690273", "MDAwMDIuYTdmYjZiN2IzNmVlNGVkNjZiNTU0NmZhYzQ2OTAyNzM="},
{1, 7, "d41d8cd98f00b204e9800998ecf8427e", "MDAwMDEuMDcuZDQxZDhjZDk4ZjAwYjIwNGU5ODAwOTk4ZWNmODQyN2U="},
{2, 19, "a7fb6b7b36ee4ed66b5546fac4690273", "MDAwMDIuMTkuYTdmYjZiN2IzNmVlNGVkNjZiNTU0NmZhYzQ2OTAyNzM="},
}
for _, test := range testCases {
blockID := azureGetBlockID(test.partID, test.md5)
blockID := azureGetBlockID(test.partID, test.subPartNumber, test.md5)
if blockID != test.blockID {
t.Fatalf("%s is not equal to %s", blockID, test.blockID)
}
@ -151,26 +152,31 @@ func TestAzureGetBlockID(t *testing.T) {
// Test azureParseBlockID().
func TestAzureParseBlockID(t *testing.T) {
testCases := []struct {
partID int
md5 string
blockID string
blockID string
partID int
subPartNumber int
md5 string
}{
{1, "d41d8cd98f00b204e9800998ecf8427e", "MDAwMDEuZDQxZDhjZDk4ZjAwYjIwNGU5ODAwOTk4ZWNmODQyN2U="},
{2, "a7fb6b7b36ee4ed66b5546fac4690273", "MDAwMDIuYTdmYjZiN2IzNmVlNGVkNjZiNTU0NmZhYzQ2OTAyNzM="},
{"MDAwMDEuMDcuZDQxZDhjZDk4ZjAwYjIwNGU5ODAwOTk4ZWNmODQyN2U=", 1, 7, "d41d8cd98f00b204e9800998ecf8427e"},
{"MDAwMDIuMTkuYTdmYjZiN2IzNmVlNGVkNjZiNTU0NmZhYzQ2OTAyNzM=", 2, 19, "a7fb6b7b36ee4ed66b5546fac4690273"},
}
for _, test := range testCases {
partID, md5, err := azureParseBlockID(test.blockID)
partID, subPartNumber, md5, err := azureParseBlockID(test.blockID)
if err != nil {
t.Fatal(err)
}
if partID != test.partID {
t.Fatalf("%d not equal to %d", partID, test.partID)
}
if subPartNumber != test.subPartNumber {
t.Fatalf("%d not equal to %d", subPartNumber, test.subPartNumber)
}
if md5 != test.md5 {
t.Fatalf("%s not equal to %s", md5, test.md5)
}
}
_, _, err := azureParseBlockID("junk")
_, _, _, err := azureParseBlockID("junk")
if err == nil {
t.Fatal("Expected azureParseBlockID() to return error")
}