Determine small objects on shard size (#11935)

Use shard size to determine when to inline data.

For unversioned objects, use 128K/shard and for versioned 16K thresholds.
This commit is contained in:
Klaus Post 2021-03-31 18:19:14 +02:00 committed by GitHub
parent 0d8c74358d
commit 4dcce17eb9
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 13 additions and 9 deletions

View file

@ -177,7 +177,7 @@ func TestListOnlineDisks(t *testing.T) {
}
object := "object"
data := bytes.Repeat([]byte("a"), smallFileThreshold*2)
data := bytes.Repeat([]byte("a"), smallFileThreshold*16)
z := obj.(*erasureServerPools)
erasureDisks := z.serverPools[0].sets[0].getDisks()
for i, test := range testCases {

View file

@ -735,11 +735,15 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st
}
}()
shardFileSize := erasure.ShardFileSize(data.Size())
writers := make([]io.Writer, len(onlineDisks))
dataSize := data.Size()
var inlineBuffers []*bytes.Buffer
if dataSize >= 0 && dataSize < smallFileThreshold {
inlineBuffers = make([]*bytes.Buffer, len(onlineDisks))
if shardFileSize >= 0 {
if !opts.Versioned && shardFileSize < smallFileThreshold {
inlineBuffers = make([]*bytes.Buffer, len(onlineDisks))
} else if shardFileSize < smallFileThreshold/8 {
inlineBuffers = make([]*bytes.Buffer, len(onlineDisks))
}
}
for i, disk := range onlineDisks {
if disk == nil {
@ -747,12 +751,12 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st
}
if len(inlineBuffers) > 0 {
inlineBuffers[i] = bytes.NewBuffer(make([]byte, 0, erasure.ShardFileSize(data.Size())))
inlineBuffers[i] = bytes.NewBuffer(make([]byte, 0, shardFileSize))
writers[i] = newStreamingBitrotWriterBuffer(inlineBuffers[i], DefaultBitrotAlgorithm, erasure.ShardSize())
continue
}
writers[i] = newBitrotWriter(disk, minioMetaTmpBucket, tempErasureObj,
erasure.ShardFileSize(data.Size()), DefaultBitrotAlgorithm, erasure.ShardSize(), false)
shardFileSize, DefaultBitrotAlgorithm, erasure.ShardSize(), false)
}
n, erasureErr := erasure.Encode(ctx, data, writers, buffer, writeQuorum)

View file

@ -302,7 +302,7 @@ func TestGetObjectNoQuorum(t *testing.T) {
bucket := "bucket"
object := "object"
opts := ObjectOptions{}
buf := make([]byte, 129<<10)
buf := make([]byte, smallFileThreshold*16)
if _, err = io.ReadFull(crand.Reader, buf); err != nil {
t.Fatal(err)
}
@ -467,7 +467,7 @@ func TestPutObjectNoQuorum(t *testing.T) {
object := "object"
opts := ObjectOptions{}
// Create "object" under "bucket".
_, err = obj.PutObject(ctx, bucket, object, mustGetPutObjReader(t, bytes.NewReader(bytes.Repeat([]byte{'a'}, smallFileThreshold*2)), smallFileThreshold*2, "", ""), opts)
_, err = obj.PutObject(ctx, bucket, object, mustGetPutObjReader(t, bytes.NewReader(bytes.Repeat([]byte{'a'}, smallFileThreshold*16)), smallFileThreshold*16, "", ""), opts)
if err != nil {
t.Fatal(err)
}
@ -496,7 +496,7 @@ func TestPutObjectNoQuorum(t *testing.T) {
}
z.serverPools[0].erasureDisksMu.Unlock()
// Upload new content to same object "object"
_, err = obj.PutObject(ctx, bucket, object, mustGetPutObjReader(t, bytes.NewReader(bytes.Repeat([]byte{byte(f)}, smallFileThreshold*2)), smallFileThreshold*2, "", ""), opts)
_, err = obj.PutObject(ctx, bucket, object, mustGetPutObjReader(t, bytes.NewReader(bytes.Repeat([]byte{byte(f)}, smallFileThreshold*16)), smallFileThreshold*16, "", ""), opts)
if !errors.Is(err, errErasureWriteQuorum) {
t.Errorf("Expected putObject to fail with %v, but failed with %v", toObjectErr(errErasureWriteQuorum, bucket, object), err)
}