From 1d1c4430b2cf2cb3c8328815bc2e1fd07e526fc7 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Fri, 14 Aug 2020 11:56:35 -0700 Subject: [PATCH] decrypt ETags in parallel around 500 at a time (#10261) Listing speed-up: a listing of just 400 entries went from 10 secs down to 2 secs --- cmd/bucket-listobjects-handlers.go | 70 +++++++++++++----------------- cmd/crypto/header.go | 3 +- cmd/crypto/kes.go | 4 +- cmd/encryption-v1.go | 2 + cmd/object-api-utils.go | 9 ++++ 5 files changed, 45 insertions(+), 43 deletions(-) diff --git a/cmd/bucket-listobjects-handlers.go b/cmd/bucket-listobjects-handlers.go index c84d58658..1f83b663b 100644 --- a/cmd/bucket-listobjects-handlers.go +++ b/cmd/bucket-listobjects-handlers.go @@ -24,12 +24,36 @@ import ( "strings" "github.com/gorilla/mux" - "github.com/minio/minio/cmd/crypto" "github.com/minio/minio/cmd/logger" "github.com/minio/minio/pkg/bucket/policy" + "github.com/minio/minio/pkg/sync/errgroup" ) +func concurrentDecryptETag(ctx context.Context, objects []ObjectInfo) { + inParallel := func(objects []ObjectInfo) { + g := errgroup.WithNErrs(len(objects)) + for index := range objects { + index := index + g.Go(func() error { + objects[index].ETag = objects[index].GetActualETag(nil) + objects[index].Size, _ = objects[index].GetActualSize() + return nil + }, index) + } + g.Wait() + } + const maxConcurrent = 500 + for { + if len(objects) < maxConcurrent { + inParallel(objects) + return + } + inParallel(objects[:maxConcurrent]) + objects = objects[maxConcurrent:] + } +} + // Validate all the ListObjects query arguments, returns an APIErrorCode // if one of the args do not meet the required conditions. 
// Special conditions required by MinIO server are as below @@ -104,16 +128,7 @@ func (api objectAPIHandlers) ListObjectVersionsHandler(w http.ResponseWriter, r return } - for i := range listObjectVersionsInfo.Objects { - if crypto.IsEncrypted(listObjectVersionsInfo.Objects[i].UserDefined) { - listObjectVersionsInfo.Objects[i].ETag = getDecryptedETag(r.Header, listObjectVersionsInfo.Objects[i], false) - } - listObjectVersionsInfo.Objects[i].Size, err = listObjectVersionsInfo.Objects[i].GetActualSize() - if err != nil { - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) - return - } - } + concurrentDecryptETag(ctx, listObjectVersionsInfo.Objects) response := generateListVersionsResponse(bucket, prefix, marker, versionIDMarker, delimiter, encodingType, maxkeys, listObjectVersionsInfo) @@ -182,16 +197,7 @@ func (api objectAPIHandlers) ListObjectsV2MHandler(w http.ResponseWriter, r *htt return } - for i := range listObjectsV2Info.Objects { - if crypto.IsEncrypted(listObjectsV2Info.Objects[i].UserDefined) { - listObjectsV2Info.Objects[i].ETag = getDecryptedETag(r.Header, listObjectsV2Info.Objects[i], false) - } - listObjectsV2Info.Objects[i].Size, err = listObjectsV2Info.Objects[i].GetActualSize() - if err != nil { - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) - return - } - } + concurrentDecryptETag(ctx, listObjectsV2Info.Objects) // The next continuation token has id@node_index format to optimize paginated listing nextContinuationToken := listObjectsV2Info.NextContinuationToken @@ -268,16 +274,7 @@ func (api objectAPIHandlers) ListObjectsV2Handler(w http.ResponseWriter, r *http return } - for i := range listObjectsV2Info.Objects { - if crypto.IsEncrypted(listObjectsV2Info.Objects[i].UserDefined) { - listObjectsV2Info.Objects[i].ETag = getDecryptedETag(r.Header, listObjectsV2Info.Objects[i], false) - } - listObjectsV2Info.Objects[i].Size, err = listObjectsV2Info.Objects[i].GetActualSize() - if err != nil { 
- writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) - return - } - } + concurrentDecryptETag(ctx, listObjectsV2Info.Objects) // The next continuation token has id@node_index format to optimize paginated listing nextContinuationToken := listObjectsV2Info.NextContinuationToken @@ -400,16 +397,7 @@ func (api objectAPIHandlers) ListObjectsV1Handler(w http.ResponseWriter, r *http return } - for i := range listObjectsInfo.Objects { - if crypto.IsEncrypted(listObjectsInfo.Objects[i].UserDefined) { - listObjectsInfo.Objects[i].ETag = getDecryptedETag(r.Header, listObjectsInfo.Objects[i], false) - } - listObjectsInfo.Objects[i].Size, err = listObjectsInfo.Objects[i].GetActualSize() - if err != nil { - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) - return - } - } + concurrentDecryptETag(ctx, listObjectsInfo.Objects) response := generateListObjectsV1Response(bucket, prefix, marker, delimiter, encodingType, maxKeys, listObjectsInfo) diff --git a/cmd/crypto/header.go b/cmd/crypto/header.go index 9f83a72f2..013128394 100644 --- a/cmd/crypto/header.go +++ b/cmd/crypto/header.go @@ -18,10 +18,10 @@ import ( "bytes" "crypto/md5" "encoding/base64" - "encoding/json" "net/http" "strings" + jsoniter "github.com/json-iterator/go" xhttp "github.com/minio/minio/cmd/http" ) @@ -148,6 +148,7 @@ func (s3KMS) ParseHTTP(h http.Header) (string, interface{}, error) { contextStr, ok := h[SSEKmsContext] if ok { var context map[string]interface{} + var json = jsoniter.ConfigCompatibleWithStandardLibrary if err := json.Unmarshal([]byte(contextStr[0]), &context); err != nil { return "", nil, err } diff --git a/cmd/crypto/kes.go b/cmd/crypto/kes.go index c977dec96..9ac01b3bf 100644 --- a/cmd/crypto/kes.go +++ b/cmd/crypto/kes.go @@ -18,7 +18,6 @@ import ( "bytes" "crypto/tls" "crypto/x509" - "encoding/json" "errors" "fmt" "io" @@ -30,10 +29,13 @@ import ( "strings" "time" + jsoniter "github.com/json-iterator/go" xhttp 
"github.com/minio/minio/cmd/http" xnet "github.com/minio/minio/pkg/net" ) +var json = jsoniter.ConfigCompatibleWithStandardLibrary + // ErrKESKeyExists is the error returned a KES server // when a master key does exist. var ErrKESKeyExists = NewKESError(http.StatusBadRequest, "key does already exist") diff --git a/cmd/encryption-v1.go b/cmd/encryption-v1.go index ad61d5e76..d77ef21d4 100644 --- a/cmd/encryption-v1.go +++ b/cmd/encryption-v1.go @@ -605,12 +605,14 @@ func getDecryptedETag(headers http.Header, objInfo ObjectInfo, copySource bool) if crypto.IsMultiPart(objInfo.UserDefined) { return objInfo.ETag } + if crypto.SSECopy.IsRequested(headers) { key, err = crypto.SSECopy.ParseHTTP(headers) if err != nil { return objInfo.ETag } } + // As per AWS S3 Spec, ETag for SSE-C encrypted objects need not be MD5Sum of the data. // Since server side copy with same source and dest just replaces the ETag, we save // encrypted content MD5Sum as ETag for both SSE-C and SSE-S3, we standardize the ETag diff --git a/cmd/object-api-utils.go b/cmd/object-api-utils.go index 419b7352b..be3124fbc 100644 --- a/cmd/object-api-utils.go +++ b/cmd/object-api-utils.go @@ -400,6 +400,15 @@ func (o ObjectInfo) IsCompressedOK() (bool, error) { return true, fmt.Errorf("unknown compression scheme: %s", scheme) } +// GetActualETag - returns the actual etag of the stored object +// decrypts SSE objects. +func (o ObjectInfo) GetActualETag(h http.Header) string { + if !crypto.IsEncrypted(o.UserDefined) { + return o.ETag + } + return getDecryptedETag(h, o, false) +} + // GetActualSize - returns the actual size of the stored object func (o ObjectInfo) GetActualSize() (int64, error) { if crypto.IsEncrypted(o.UserDefined) {