avoid double listObjectParts calls improves performance (#9606)

this PR is to avoid double calls across multiple calls
in APIs

- CopyObjectPart
- PutObjectPart
This commit is contained in:
Harshavardhana 2020-05-15 08:06:45 -07:00 committed by GitHub
parent b730bd1396
commit d348ec0f6c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 105 additions and 133 deletions

View file

@ -95,7 +95,7 @@ func (m gwMetaV1) ToObjectInfo(bucket, object string) minio.ObjectInfo {
ContentType: m.Meta["content-type"],
ContentEncoding: m.Meta["content-encoding"],
ETag: minio.CanonicalizeETag(m.ETag),
UserDefined: minio.CleanMetadataKeys(m.Meta, filterKeys...),
UserDefined: minio.CleanMinioInternalMetadataKeys(minio.CleanMetadataKeys(m.Meta, filterKeys...)),
Parts: m.Parts,
}

View file

@ -314,7 +314,6 @@ func (l *s3EncObjects) GetObjectNInfo(ctx context.Context, bucket, object string
if err != nil {
return l.s3Objects.GetObjectNInfo(ctx, bucket, object, rs, h, lockType, opts)
}
objInfo.UserDefined = minio.CleanMinioInternalMetadataKeys(objInfo.UserDefined)
fn, off, length, err := minio.NewGetObjectReader(rs, objInfo, o)
if err != nil {
return nil, minio.ErrorRespToObjectError(err)
@ -549,7 +548,7 @@ func (l *s3EncObjects) ListObjectParts(ctx context.Context, bucket string, objec
}
lpi.Parts[i].ETag = partMeta.ETag
}
lpi.UserDefined = dm.Meta
lpi.UserDefined = dm.ToObjectInfo(bucket, object).UserDefined
lpi.Object = object
return lpi, nil
}

View file

@ -369,7 +369,6 @@ func (api objectAPIHandlers) GetObjectHandler(w http.ResponseWriter, r *http.Req
objInfo.UserDefined = objectlock.FilterObjectLockMetadata(objInfo.UserDefined, getRetPerms != ErrNone, legalHoldPerms != ErrNone)
if objectAPI.IsEncryptionSupported() {
objInfo.UserDefined = CleanMinioInternalMetadataKeys(objInfo.UserDefined)
if _, err = DecryptObjectInfo(&objInfo, r.Header); err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
@ -545,7 +544,6 @@ func (api objectAPIHandlers) HeadObjectHandler(w http.ResponseWriter, r *http.Re
writeErrorResponseHeadersOnly(w, toAPIError(ctx, err))
return
}
objInfo.UserDefined = CleanMinioInternalMetadataKeys(objInfo.UserDefined)
}
// Set encryption response headers
@ -1820,8 +1818,7 @@ func (api objectAPIHandlers) CopyObjectPartHandler(w http.ResponseWriter, r *htt
}
// Read compression metadata preserved in the init multipart for the decision.
_, compressPart := li.UserDefined[ReservedMetadataPrefix+"compression"]
isCompressed := compressPart
_, isCompressed := li.UserDefined[ReservedMetadataPrefix+"compression"]
// Compress only if the compression is enabled during initial multipart.
if isCompressed {
s2c := newS2CompressReader(gr)
@ -1838,64 +1835,57 @@ func (api objectAPIHandlers) CopyObjectPartHandler(w http.ResponseWriter, r *htt
return
}
dstOpts, err = copyDstOpts(ctx, r, dstBucket, dstObject, li.UserDefined)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}
rawReader := srcInfo.Reader
pReader := NewPutObjReader(rawReader, nil, nil)
isEncrypted := false
isEncrypted := crypto.IsEncrypted(li.UserDefined)
var objectEncryptionKey crypto.ObjectKey
if objectAPI.IsEncryptionSupported() && !isCompressed {
li, lerr := objectAPI.ListObjectParts(ctx, dstBucket, dstObject, uploadID, 0, 1, dstOpts)
if lerr != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, lerr), r.URL, guessIsBrowserReq(r))
if objectAPI.IsEncryptionSupported() && !isCompressed && isEncrypted {
if !crypto.SSEC.IsRequested(r.Header) && crypto.SSEC.IsEncrypted(li.UserDefined) {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrSSEMultipartEncrypted), r.URL, guessIsBrowserReq(r))
return
}
li.UserDefined = CleanMinioInternalMetadataKeys(li.UserDefined)
dstOpts, err = copyDstOpts(ctx, r, dstBucket, dstObject, li.UserDefined)
if crypto.S3.IsEncrypted(li.UserDefined) && crypto.SSEC.IsRequested(r.Header) {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrSSEMultipartEncrypted), r.URL, guessIsBrowserReq(r))
return
}
var key []byte
if crypto.SSEC.IsRequested(r.Header) {
key, err = ParseSSECustomerRequest(r)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}
}
key, err = decryptObjectInfo(key, dstBucket, dstObject, li.UserDefined)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}
if crypto.IsEncrypted(li.UserDefined) {
if !crypto.SSEC.IsRequested(r.Header) && crypto.SSEC.IsEncrypted(li.UserDefined) {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrSSEMultipartEncrypted), r.URL, guessIsBrowserReq(r))
return
}
if crypto.S3.IsEncrypted(li.UserDefined) && crypto.SSEC.IsRequested(r.Header) {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrSSEMultipartEncrypted), r.URL, guessIsBrowserReq(r))
return
}
isEncrypted = true
var key []byte
if crypto.SSEC.IsRequested(r.Header) {
key, err = ParseSSECustomerRequest(r)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}
}
key, err = decryptObjectInfo(key, dstBucket, dstObject, li.UserDefined)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}
copy(objectEncryptionKey[:], key)
copy(objectEncryptionKey[:], key)
partEncryptionKey := objectEncryptionKey.DerivePartKey(uint32(partID))
reader, err = sio.EncryptReader(reader, sio.Config{Key: partEncryptionKey[:]})
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}
info := ObjectInfo{Size: length}
srcInfo.Reader, err = hash.NewReader(reader, info.EncryptedSize(), "", "", length, globalCLIContext.StrictS3Compat)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}
pReader = NewPutObjReader(rawReader, srcInfo.Reader, &objectEncryptionKey)
partEncryptionKey := objectEncryptionKey.DerivePartKey(uint32(partID))
reader, err = sio.EncryptReader(reader, sio.Config{Key: partEncryptionKey[:]})
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}
info := ObjectInfo{Size: length}
srcInfo.Reader, err = hash.NewReader(reader, info.EncryptedSize(), "", "", length, globalCLIContext.StrictS3Compat)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}
pReader = NewPutObjReader(rawReader, srcInfo.Reader, &objectEncryptionKey)
}
srcInfo.PutObjReader = pReader
// Copy source object to destination, if source and destination
// object is same then only metadata is updated.
@ -2062,11 +2052,11 @@ func (api objectAPIHandlers) PutObjectPartHandler(w http.ResponseWriter, r *http
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}
// Read compression metadata preserved in the init multipart for the decision.
_, compressPart := li.UserDefined[ReservedMetadataPrefix+"compression"]
isCompressed := false
if objectAPI.IsCompressionSupported() && compressPart {
// Read compression metadata preserved in the init multipart for the decision.
_, isCompressed := li.UserDefined[ReservedMetadataPrefix+"compression"]
if objectAPI.IsCompressionSupported() && isCompressed {
actualReader, err := hash.NewReader(reader, size, md5hex, sha256hex, actualSize, globalCLIContext.StrictS3Compat)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
@ -2080,7 +2070,6 @@ func (api objectAPIHandlers) PutObjectPartHandler(w http.ResponseWriter, r *http
size = -1 // Since compressed size is un-predictable.
md5hex = "" // Do not try to verify the content.
sha256hex = ""
isCompressed = true
}
hashReader, err := hash.NewReader(reader, size, md5hex, sha256hex, actualSize, globalCLIContext.StrictS3Compat)
@ -2091,67 +2080,57 @@ func (api objectAPIHandlers) PutObjectPartHandler(w http.ResponseWriter, r *http
rawReader := hashReader
pReader := NewPutObjReader(rawReader, nil, nil)
isEncrypted := false
isEncrypted := crypto.IsEncrypted(li.UserDefined)
var objectEncryptionKey crypto.ObjectKey
if objectAPI.IsEncryptionSupported() && !isCompressed {
var li ListPartsInfo
li, err = objectAPI.ListObjectParts(ctx, bucket, object, uploadID, 0, 1, ObjectOptions{})
if objectAPI.IsEncryptionSupported() && !isCompressed && isEncrypted {
if !crypto.SSEC.IsRequested(r.Header) && crypto.SSEC.IsEncrypted(li.UserDefined) {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrSSEMultipartEncrypted), r.URL, guessIsBrowserReq(r))
return
}
opts, err = putOpts(ctx, r, bucket, object, li.UserDefined)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}
li.UserDefined = CleanMinioInternalMetadataKeys(li.UserDefined)
if crypto.IsEncrypted(li.UserDefined) {
if !crypto.SSEC.IsRequested(r.Header) && crypto.SSEC.IsEncrypted(li.UserDefined) {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrSSEMultipartEncrypted), r.URL, guessIsBrowserReq(r))
return
}
isEncrypted = true // to detect SSE-S3 encryption
opts, err = putOpts(ctx, r, bucket, object, li.UserDefined)
var key []byte
if crypto.SSEC.IsRequested(r.Header) {
key, err = ParseSSECustomerRequest(r)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}
var key []byte
if crypto.SSEC.IsRequested(r.Header) {
key, err = ParseSSECustomerRequest(r)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}
}
// Calculating object encryption key
key, err = decryptObjectInfo(key, bucket, object, li.UserDefined)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}
copy(objectEncryptionKey[:], key)
partEncryptionKey := objectEncryptionKey.DerivePartKey(uint32(partID))
in := io.Reader(hashReader)
if size > encryptBufferThreshold {
// The encryption reads in blocks of 64KB.
// We add a buffer on bigger files to reduce the number of syscalls upstream.
in = bufio.NewReaderSize(hashReader, encryptBufferSize)
}
reader, err = sio.EncryptReader(in, sio.Config{Key: partEncryptionKey[:]})
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}
info := ObjectInfo{Size: size}
// do not try to verify encrypted content
hashReader, err = hash.NewReader(reader, info.EncryptedSize(), "", "", size, globalCLIContext.StrictS3Compat)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}
pReader = NewPutObjReader(rawReader, hashReader, &objectEncryptionKey)
}
// Calculating object encryption key
key, err = decryptObjectInfo(key, bucket, object, li.UserDefined)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}
copy(objectEncryptionKey[:], key)
partEncryptionKey := objectEncryptionKey.DerivePartKey(uint32(partID))
in := io.Reader(hashReader)
if size > encryptBufferThreshold {
// The encryption reads in blocks of 64KB.
// We add a buffer on bigger files to reduce the number of syscalls upstream.
in = bufio.NewReaderSize(hashReader, encryptBufferSize)
}
reader, err = sio.EncryptReader(in, sio.Config{Key: partEncryptionKey[:]})
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}
info := ObjectInfo{Size: size}
// do not try to verify encrypted content
hashReader, err = hash.NewReader(reader, info.EncryptedSize(), "", "", size, globalCLIContext.StrictS3Compat)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}
pReader = NewPutObjReader(rawReader, hashReader, &objectEncryptionKey)
}
putObjectPart := objectAPI.PutObjectPart
@ -2249,42 +2228,36 @@ func (api objectAPIHandlers) ListObjectPartsHandler(w http.ResponseWriter, r *ht
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidMaxParts), r.URL, guessIsBrowserReq(r))
return
}
var opts ObjectOptions
listPartsInfo, err := objectAPI.ListObjectParts(ctx, bucket, object, uploadID, partNumberMarker, maxParts, opts)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}
var ssec bool
if objectAPI.IsEncryptionSupported() {
var li ListPartsInfo
li, err = objectAPI.ListObjectParts(ctx, bucket, object, uploadID, 0, 1, opts)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
if objectAPI.IsEncryptionSupported() && crypto.IsEncrypted(listPartsInfo.UserDefined) {
var key []byte
if crypto.SSEC.IsEncrypted(listPartsInfo.UserDefined) {
ssec = true
}
if crypto.IsEncrypted(li.UserDefined) {
var key []byte
if crypto.SSEC.IsEncrypted(li.UserDefined) {
ssec = true
var objectEncryptionKey []byte
if crypto.S3.IsEncrypted(listPartsInfo.UserDefined) {
// Calculating object encryption key
objectEncryptionKey, err = decryptObjectInfo(key, bucket, object, listPartsInfo.UserDefined)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}
var objectEncryptionKey []byte
if crypto.S3.IsEncrypted(li.UserDefined) {
// Calculating object encryption key
objectEncryptionKey, err = decryptObjectInfo(key, bucket, object, li.UserDefined)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}
}
parts := make([]PartInfo, len(listPartsInfo.Parts))
for i, p := range listPartsInfo.Parts {
part := p
part.ETag = tryDecryptETag(objectEncryptionKey, p.ETag, ssec)
parts[i] = part
}
listPartsInfo.Parts = parts
}
parts := make([]PartInfo, len(listPartsInfo.Parts))
for i, p := range listPartsInfo.Parts {
part := p
part.ETag = tryDecryptETag(objectEncryptionKey, p.ETag, ssec)
parts[i] = part
}
listPartsInfo.Parts = parts
}
response := generateListPartsResponse(listPartsInfo, encodingType)