Use GetObjectNInfo in CopyObject and CopyObjectPart (#6489)

Anis Elleuch 2018-09-25 20:39:46 +01:00 committed by Dee Koder
parent 1e5ac39ff3
commit aa4e2b1542
24 changed files with 226 additions and 274 deletions


@@ -43,6 +43,9 @@ func setCommonHeaders(w http.ResponseWriter) {
w.Header().Set("X-Amz-Bucket-Region", region)
}
w.Header().Set("Accept-Ranges", "bytes")
// Remove sensitive information
crypto.RemoveSensitiveHeaders(w.Header())
}
// Encodes the response headers into XML format.
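crypto.RemoveSensitiveHeaders is introduced by this commit but its body never appears in the diff. A minimal sketch of what such a scrubber plausibly does, assuming it only needs to drop the two SSE-C key headers called out in the handler comments later in this commit:

func removeSensitiveHeadersSketch(h http.Header) {
	// Hypothetical stand-in for crypto.RemoveSensitiveHeaders; the exact
	// header list is an assumption, not taken from this commit.
	h.Del("X-Amz-Server-Side-Encryption-Customer-Key")
	h.Del("X-Amz-Copy-Source-Server-Side-Encryption-Customer-Key")
}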


@@ -68,3 +68,29 @@ func parseCopyPartRange(rangeString string, resourceSize int64) (offset, length
return hrange.GetOffsetLength(resourceSize)
}
// parseCopyPartRangeSpec transforms a range string (e.g. bytes=3-4) to HTTPRangeSpec
// and returns an error for invalid values
func parseCopyPartRangeSpec(rangeString string) (hrange *HTTPRangeSpec, err error) {
hrange, err = parseRequestRangeSpec(rangeString)
if err != nil {
return nil, err
}
if hrange.IsSuffixLength || hrange.Start < 0 || hrange.End < 0 {
return nil, errInvalidRange
}
return hrange, nil
}
// checkCopyPartRangeWithSize performs additional checks on the range string for
// CopyObjectPart. This API requires explicit start and end range values,
// e.g. 'bytes=3-10'; other forms are rejected.
func checkCopyPartRangeWithSize(rs *HTTPRangeSpec, resourceSize int64) (err error) {
if rs == nil {
return nil
}
if rs.IsSuffixLength || rs.Start >= resourceSize || rs.End >= resourceSize {
return errInvalidRangeSource
}
return nil
}
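A short usage sketch of the two helpers above (the identifiers are the ones defined in this hunk; the sizes are illustrative):

func copyPartRangeExample() {
	// Explicit start and end parse cleanly.
	rs, err := parseCopyPartRangeSpec("bytes=3-10")
	if err != nil {
		return
	}
	_ = checkCopyPartRangeWithSize(rs, 100) // nil: 3-10 fits a 100-byte source
	_ = checkCopyPartRangeWithSize(rs, 5)   // errInvalidRangeSource: end 10 is out of bounds

	// Suffix (and open-ended) forms are rejected at parse time.
	_, err = parseCopyPartRangeSpec("bytes=-10") // errInvalidRange
	_ = err
}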


@@ -172,7 +172,6 @@ func (ssecCopy) IsRequested(h http.Header) bool {
// ParseHTTP parses the SSE-C headers and returns the SSE-C client key
// on success. SSE-C copy headers are ignored.
func (ssec) ParseHTTP(h http.Header) (key [32]byte, err error) {
defer h.Del(SSECKey) // remove SSE-C key from headers after parsing
if h.Get(SSECAlgorithm) != SSEAlgorithmAES256 {
return key, ErrInvalidCustomerAlgorithm
}
@@ -198,7 +197,6 @@ func (ssec) ParseHTTP(h http.Header) (key [32]byte, err error) {
// ParseHTTP parses the SSE-C copy headers and returns the SSE-C client key
// on success. Regular SSE-C headers are ignored.
func (ssecCopy) ParseHTTP(h http.Header) (key [32]byte, err error) {
defer h.Del(SSECopyKey) // remove SSE-C copy key of source object from headers after parsing
if h.Get(SSECopyAlgorithm) != SSEAlgorithmAES256 {
return key, ErrInvalidCustomerAlgorithm
}


@@ -251,9 +251,6 @@ func TestSSECParse(t *testing.T) {
if err == nil && key == zeroKey {
t.Errorf("Test %d: parsed client key is zero key", i)
}
if _, ok := test.Header[SSECKey]; ok {
t.Errorf("Test %d: client key is not removed from HTTP headers after parsing", i)
}
}
}


@@ -58,7 +58,7 @@ type cacheObjects struct {
// file path patterns to exclude from cache
exclude []string
// Object functions pointing to the corresponding functions of backend implementation.
GetObjectNInfoFn func(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header) (gr *GetObjectReader, err error)
GetObjectNInfoFn func(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, lockType LockType) (gr *GetObjectReader, err error)
GetObjectFn func(ctx context.Context, bucket, object string, startOffset int64, length int64, writer io.Writer, etag string, opts ObjectOptions) (err error)
GetObjectInfoFn func(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, err error)
PutObjectFn func(ctx context.Context, bucket, object string, data *hash.Reader, metadata map[string]string, opts ObjectOptions) (objInfo ObjectInfo, err error)
@@ -90,7 +90,7 @@ type CacheObjectLayer interface {
ListBuckets(ctx context.Context) (buckets []BucketInfo, err error)
DeleteBucket(ctx context.Context, bucket string) error
// Object operations.
GetObjectNInfo(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header) (gr *GetObjectReader, err error)
GetObjectNInfo(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, lockType LockType) (gr *GetObjectReader, err error)
GetObject(ctx context.Context, bucket, object string, startOffset int64, length int64, writer io.Writer, etag string, opts ObjectOptions) (err error)
GetObjectInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, err error)
PutObject(ctx context.Context, bucket, object string, data *hash.Reader, metadata map[string]string, opts ObjectOptions) (objInfo ObjectInfo, err error)
@@ -183,9 +183,9 @@ func (c cacheObjects) getMetadata(objInfo ObjectInfo) map[string]string {
return metadata
}
func (c cacheObjects) GetObjectNInfo(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header) (gr *GetObjectReader, err error) {
func (c cacheObjects) GetObjectNInfo(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, lockType LockType) (gr *GetObjectReader, err error) {
bkReader, bkErr := c.GetObjectNInfoFn(ctx, bucket, object, rs, h)
bkReader, bkErr := c.GetObjectNInfoFn(ctx, bucket, object, rs, h, lockType)
if c.isCacheExclude(bucket, object) || !bkReader.ObjInfo.IsCacheable() {
return bkReader, bkErr
@@ -210,7 +210,7 @@ func (c cacheObjects) GetObjectNInfo(ctx context.Context, bucket, object string,
return bkReader, bkErr
}
if cacheReader, cacheErr := dcache.GetObjectNInfo(ctx, bucket, object, rs, h); cacheErr == nil {
if cacheReader, cacheErr := dcache.GetObjectNInfo(ctx, bucket, object, rs, h, lockType); cacheErr == nil {
if backendDown {
// If the backend is down, serve the request from cache.
return cacheReader, nil


@@ -60,7 +60,7 @@ func (api *DummyObjectLayer) ListObjectsV2(ctx context.Context, bucket, prefix,
return
}
func (api *DummyObjectLayer) GetObjectNInfo(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header) (gr *GetObjectReader, err error) {
func (api *DummyObjectLayer) GetObjectNInfo(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, lock LockType) (gr *GetObjectReader, err error) {
return
}


@@ -123,8 +123,6 @@ func ParseSSECustomerHeader(header http.Header) (key []byte, err error) {
// This function rotates old to new key.
func rotateKey(oldKey []byte, newKey []byte, bucket, object string, metadata map[string]string) error {
delete(metadata, crypto.SSECKey) // make sure we do not save the key by accident
switch {
default:
return errObjectTampered
@@ -155,8 +153,6 @@ func rotateKey(oldKey []byte, newKey []byte, bucket, object string, metadata map
}
func newEncryptMetadata(key []byte, bucket, object string, metadata map[string]string, sseS3 bool) ([]byte, error) {
delete(metadata, crypto.SSECKey) // make sure we do not save the key by accident
var sealedKey crypto.SealedKey
if sseS3 {
if globalKMS == nil {
@@ -245,7 +241,6 @@ func DecryptCopyRequest(client io.Writer, r *http.Request, bucket, object string
return nil, err
}
}
delete(metadata, crypto.SSECopyKey) // make sure we do not save the key by accident
return newDecryptWriter(client, key, bucket, object, 0, metadata)
}
@@ -325,7 +320,6 @@ func DecryptRequestWithSequenceNumberR(client io.Reader, h http.Header, bucket,
if err != nil {
return nil, err
}
delete(metadata, crypto.SSECKey) // make sure we do not save the key by accident
return newDecryptReader(client, key, bucket, object, seqNumber, metadata)
}
@@ -342,7 +336,6 @@ func DecryptCopyRequestR(client io.Reader, h http.Header, bucket, object string,
return nil, err
}
}
delete(metadata, crypto.SSECopyKey) // make sure we do not save the key by accident
return newDecryptReader(client, key, bucket, object, 0, metadata)
}
@@ -444,7 +437,6 @@ func DecryptRequestWithSequenceNumber(client io.Writer, r *http.Request, bucket,
if err != nil {
return nil, err
}
delete(metadata, crypto.SSECKey) // make sure we do not save the key by accident
return newDecryptWriter(client, key, bucket, object, seqNumber, metadata)
}
@@ -514,13 +506,6 @@ func (d *DecryptBlocksReader) buildDecrypter(partID int) error {
mac.Write(partIDbin[:])
partEncryptionKey := mac.Sum(nil)
// make sure we do not save the key by accident
if d.copySource {
delete(m, crypto.SSECopyKey)
} else {
delete(m, crypto.SSECKey)
}
// Limit the reader, so the decryptor doesn't receive bytes
// from the next part (different DARE stream)
encLenToRead := d.parts[d.partIndex].Size - d.partEncRelOffset
@@ -636,13 +621,6 @@ func (w *DecryptBlocksWriter) buildDecrypter(partID int) error {
mac.Write(partIDbin[:])
partEncryptionKey := mac.Sum(nil)
// make sure we do not save the key by accident
if w.copySource {
delete(m, crypto.SSECopyKey)
} else {
delete(m, crypto.SSECKey)
}
// make sure to provide a NopCloser so that a Close
// on sio.decryptWriter doesn't close the underlying
// writer, which could close the stream prematurely.
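The NopCloser the comment refers to is the usual wrap-a-writer pattern; a generic, stdlib-only sketch:

// nopWriteCloser turns Close into a no-op, so closing a decrypting
// writer stacked on top of it cannot close the destination stream.
type nopWriteCloser struct{ io.Writer }

func (nopWriteCloser) Close() error { return nil }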


@@ -210,11 +210,6 @@ func TestParseSSECustomerRequest(t *testing.T) {
if err != test.err {
t.Errorf("Test %d: Parse returned: %v want: %v", i, err, test.err)
}
key := request.Header.Get(crypto.SSECKey)
if (err == nil || err == crypto.ErrCustomerKeyMD5Mismatch) && key != "" {
t.Errorf("Test %d: Client key survived parsing - found key: %v", i, key)
}
}
}
@@ -331,10 +326,6 @@ func TestParseSSECopyCustomerRequest(t *testing.T) {
if err != test.err {
t.Errorf("Test %d: Parse returned: %v want: %v", i, err, test.err)
}
key := request.Header.Get(crypto.SSECopyKey)
if (err == nil || err == crypto.ErrCustomerKeyMD5Mismatch) && key != "" {
t.Errorf("Test %d: Client key survived parsing - found key: %v", i, key)
}
}
}
@@ -376,9 +367,6 @@ func TestEncryptRequest(t *testing.T) {
if err != nil {
t.Fatalf("Test %d: Failed to encrypt request: %v", i, err)
}
if key, ok := test.metadata[crypto.SSECKey]; ok {
t.Errorf("Test %d: Client provided key survived in metadata - key: %s", i, key)
}
if kdf, ok := test.metadata[crypto.SSESealAlgorithm]; !ok {
t.Errorf("Test %d: ServerSideEncryptionKDF must be part of metadata: %v", i, kdf)
}


@@ -20,6 +20,7 @@ import (
"bytes"
"context"
"io"
"strings"
"github.com/klauspost/reedsolomon"
"github.com/minio/minio/cmd/logger"
@@ -90,7 +91,9 @@ func writeDataBlocks(ctx context.Context, dst io.Writer, enBlocks [][]byte, data
// Copy the block.
n, err := io.Copy(dst, bytes.NewReader(block))
if err != nil {
logger.LogIf(ctx, err)
if !strings.Contains(err.Error(), "read/write on closed pipe") {
logger.LogIf(ctx, err)
}
return 0, err
}
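The string being filtered is io.ErrClosedPipe's message; it shows up whenever the read side of a pipe-backed reader goes away before the erasure writer finishes, which is expected when a caller closes a GetObjectReader early. A minimal reproduction:

package main

import (
	"fmt"
	"io"
)

func main() {
	pr, pw := io.Pipe()
	pr.Close() // reader disappears before the writer is done
	_, err := pw.Write([]byte("block"))
	fmt.Println(err) // io: read/write on closed pipe
}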


@@ -255,22 +255,6 @@ func (fs *FSObjects) CopyObjectPart(ctx context.Context, srcBucket, srcObject, d
return pi, toObjectErr(err)
}
// Initialize pipe.
go func() {
if gerr := fs.GetObject(ctx, srcBucket, srcObject, startOffset, length, srcInfo.Writer, srcInfo.ETag, srcOpts); gerr != nil {
if gerr = srcInfo.Writer.Close(); gerr != nil {
logger.LogIf(ctx, gerr)
return
}
return
}
// Close writer explicitly signaling we wrote all data.
if gerr := srcInfo.Writer.Close(); gerr != nil {
logger.LogIf(ctx, gerr)
return
}
}()
partInfo, err := fs.PutObjectPart(ctx, dstBucket, dstObject, uploadID, partID, srcInfo.Reader, dstOpts)
if err != nil {
return pi, toObjectErr(err, dstBucket, dstObject)


@@ -418,35 +418,19 @@ func (fs *FSObjects) DeleteBucket(ctx context.Context, bucket string) error {
// update metadata.
func (fs *FSObjects) CopyObject(ctx context.Context, srcBucket, srcObject, dstBucket, dstObject string, srcInfo ObjectInfo, srcOpts, dstOpts ObjectOptions) (oi ObjectInfo, e error) {
cpSrcDstSame := isStringEqual(pathJoin(srcBucket, srcObject), pathJoin(dstBucket, dstObject))
// Hold write lock on destination since in both cases
// - if source and destination are same
// - if source and destination are different
// it is the sole mutating state.
objectDWLock := fs.nsMutex.NewNSLock(dstBucket, dstObject)
if err := objectDWLock.GetLock(globalObjectTimeout); err != nil {
return oi, err
}
defer objectDWLock.Unlock()
// if source and destination are different, we have to hold
// additional read lock as well to protect against writes on
// source.
if !cpSrcDstSame {
// Hold read locks on source object only if we are
// going to read data from source object.
objectSRLock := fs.nsMutex.NewNSLock(srcBucket, srcObject)
if err := objectSRLock.GetRLock(globalObjectTimeout); err != nil {
objectDWLock := fs.nsMutex.NewNSLock(dstBucket, dstObject)
if err := objectDWLock.GetLock(globalObjectTimeout); err != nil {
return oi, err
}
defer objectSRLock.RUnlock()
defer objectDWLock.Unlock()
}
if _, err := fs.statBucketDir(ctx, srcBucket); err != nil {
return oi, toObjectErr(err, srcBucket)
}
if cpSrcDstSame && srcInfo.metadataOnly {
// Close any writer which was initialized.
defer srcInfo.Writer.Close()
fsMetaPath := pathJoin(fs.fsPath, minioMetaBucket, bucketMetaPrefix, srcBucket, srcObject, fs.metaJSONFile)
wlk, err := fs.rwPool.Write(fsMetaPath)
if err != nil {
@@ -478,20 +462,6 @@ func (fs *FSObjects) CopyObject(ctx context.Context, srcBucket, srcObject, dstBu
return fsMeta.ToObjectInfo(srcBucket, srcObject, fi), nil
}
go func() {
if gerr := fs.getObject(ctx, srcBucket, srcObject, 0, srcInfo.Size, srcInfo.Writer, srcInfo.ETag, !cpSrcDstSame); gerr != nil {
if gerr = srcInfo.Writer.Close(); gerr != nil {
logger.LogIf(ctx, gerr)
}
return
}
// Close writer explicitly signaling we wrote all data.
if gerr := srcInfo.Writer.Close(); gerr != nil {
logger.LogIf(ctx, gerr)
return
}
}()
objInfo, err := fs.putObject(ctx, dstBucket, dstObject, srcInfo.Reader, srcInfo.UserDefined)
if err != nil {
return oi, toObjectErr(err, dstBucket, dstObject)
@@ -502,7 +472,8 @@ func (fs *FSObjects) CopyObject(ctx context.Context, srcBucket, srcObject, dstBu
// GetObjectNInfo - returns object info and a reader for object
// content.
func (fs *FSObjects) GetObjectNInfo(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header) (gr *GetObjectReader, err error) {
func (fs *FSObjects) GetObjectNInfo(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, lockType LockType) (gr *GetObjectReader, err error) {
if err = checkGetObjArgs(ctx, bucket, object); err != nil {
return nil, err
}
@@ -511,13 +482,26 @@ func (fs *FSObjects) GetObjectNInfo(ctx context.Context, bucket, object string,
return nil, toObjectErr(err, bucket)
}
// Lock the object before reading.
lock := fs.nsMutex.NewNSLock(bucket, object)
if err = lock.GetRLock(globalObjectTimeout); err != nil {
logger.LogIf(ctx, err)
return nil, err
var nsUnlocker = func() {}
if lockType != noLock {
// Lock the object before reading.
lock := fs.nsMutex.NewNSLock(bucket, object)
switch lockType {
case writeLock:
if err = lock.GetLock(globalObjectTimeout); err != nil {
logger.LogIf(ctx, err)
return nil, err
}
nsUnlocker = lock.Unlock
case readLock:
if err = lock.GetRLock(globalObjectTimeout); err != nil {
logger.LogIf(ctx, err)
return nil, err
}
nsUnlocker = lock.RUnlock
}
}
nsUnlocker := lock.RUnlock
// For a directory, we need to send a reader that returns no bytes.
if hasSuffix(object, slashSeparator) {
@@ -535,7 +519,7 @@ func (fs *FSObjects) GetObjectNInfo(ctx context.Context, bucket, object string,
// Take a rwPool lock for NFS gateway type deployment
rwPoolUnlocker := func() {}
if bucket != minioMetaBucket {
if bucket != minioMetaBucket && lockType != noLock {
fsMetaPath := pathJoin(fs.fsPath, minioMetaBucket, bucketMetaPrefix, bucket, object, fs.metaJSONFile)
_, err = fs.rwPool.Open(fsMetaPath)
if err != nil && err != errFileNotFound {


@@ -616,7 +616,7 @@ func (a *azureObjects) ListObjectsV2(ctx context.Context, bucket, prefix, contin
}
// GetObjectNInfo - returns object info and locked object ReadCloser
func (a *azureObjects) GetObjectNInfo(ctx context.Context, bucket, object string, rs *minio.HTTPRangeSpec, h http.Header) (gr *minio.GetObjectReader, err error) {
func (a *azureObjects) GetObjectNInfo(ctx context.Context, bucket, object string, rs *minio.HTTPRangeSpec, h http.Header, lockType minio.LockType) (gr *minio.GetObjectReader, err error) {
var objInfo minio.ObjectInfo
objInfo, err = a.GetObjectInfo(ctx, bucket, object, minio.ObjectOptions{})
if err != nil {


@@ -396,7 +396,7 @@ func (l *b2Objects) ListObjectsV2(ctx context.Context, bucket, prefix, continuat
}
// GetObjectNInfo - returns object info and locked object ReadCloser
func (l *b2Objects) GetObjectNInfo(ctx context.Context, bucket, object string, rs *minio.HTTPRangeSpec, h http.Header) (gr *minio.GetObjectReader, err error) {
func (l *b2Objects) GetObjectNInfo(ctx context.Context, bucket, object string, rs *minio.HTTPRangeSpec, h http.Header, lockType minio.LockType) (gr *minio.GetObjectReader, err error) {
var objInfo minio.ObjectInfo
objInfo, err = l.GetObjectInfo(ctx, bucket, object, minio.ObjectOptions{})
if err != nil {


@@ -737,7 +737,7 @@ func (l *gcsGateway) ListObjectsV2(ctx context.Context, bucket, prefix, continua
}
// GetObjectNInfo - returns object info and locked object ReadCloser
func (l *gcsGateway) GetObjectNInfo(ctx context.Context, bucket, object string, rs *minio.HTTPRangeSpec, h http.Header) (gr *minio.GetObjectReader, err error) {
func (l *gcsGateway) GetObjectNInfo(ctx context.Context, bucket, object string, rs *minio.HTTPRangeSpec, h http.Header, lockType minio.LockType) (gr *minio.GetObjectReader, err error) {
var objInfo minio.ObjectInfo
objInfo, err = l.GetObjectInfo(ctx, bucket, object, minio.ObjectOptions{})
if err != nil {


@@ -507,7 +507,7 @@ func (t *tritonObjects) ListObjectsV2(ctx context.Context, bucket, prefix, conti
}
// GetObjectNInfo - returns object info and locked object ReadCloser
func (t *tritonObjects) GetObjectNInfo(ctx context.Context, bucket, object string, rs *minio.HTTPRangeSpec, h http.Header) (gr *minio.GetObjectReader, err error) {
func (t *tritonObjects) GetObjectNInfo(ctx context.Context, bucket, object string, rs *minio.HTTPRangeSpec, h http.Header, lockType minio.LockType) (gr *minio.GetObjectReader, err error) {
var objInfo minio.ObjectInfo
objInfo, err = t.GetObjectInfo(ctx, bucket, object, minio.ObjectOptions{})
if err != nil {


@@ -547,7 +547,7 @@ func ossGetObject(ctx context.Context, client *oss.Client, bucket, key string, s
}
// GetObjectNInfo - returns object info and locked object ReadCloser
func (l *ossObjects) GetObjectNInfo(ctx context.Context, bucket, object string, rs *minio.HTTPRangeSpec, h http.Header) (gr *minio.GetObjectReader, err error) {
func (l *ossObjects) GetObjectNInfo(ctx context.Context, bucket, object string, rs *minio.HTTPRangeSpec, h http.Header, lockType minio.LockType) (gr *minio.GetObjectReader, err error) {
var objInfo minio.ObjectInfo
objInfo, err = l.GetObjectInfo(ctx, bucket, object, minio.ObjectOptions{})
if err != nil {


@@ -328,7 +328,7 @@ func (l *s3Objects) ListObjectsV2(ctx context.Context, bucket, prefix, continuat
}
// GetObjectNInfo - returns object info and locked object ReadCloser
func (l *s3Objects) GetObjectNInfo(ctx context.Context, bucket, object string, rs *minio.HTTPRangeSpec, h http.Header) (gr *minio.GetObjectReader, err error) {
func (l *s3Objects) GetObjectNInfo(ctx context.Context, bucket, object string, rs *minio.HTTPRangeSpec, h http.Header, lockType minio.LockType) (gr *minio.GetObjectReader, err error) {
var objInfo minio.ObjectInfo
objInfo, err = l.GetObjectInfo(ctx, bucket, object, minio.ObjectOptions{})
if err != nil {


@@ -432,7 +432,7 @@ func (s *siaObjects) ListObjects(ctx context.Context, bucket string, prefix stri
}
// GetObjectNInfo - returns object info and locked object ReadCloser
func (s *siaObjects) GetObjectNInfo(ctx context.Context, bucket, object string, rs *minio.HTTPRangeSpec, h http.Header) (gr *minio.GetObjectReader, err error) {
func (s *siaObjects) GetObjectNInfo(ctx context.Context, bucket, object string, rs *minio.HTTPRangeSpec, h http.Header, lockType minio.LockType) (gr *minio.GetObjectReader, err error) {
var objInfo minio.ObjectInfo
objInfo, err = s.GetObjectInfo(ctx, bucket, object, minio.ObjectOptions{})
if err != nil {


@@ -32,6 +32,15 @@ type ObjectOptions struct {
ServerSideEncryption encrypt.ServerSide
}
// LockType represents required locking for ObjectLayer operations
type LockType int
const (
noLock LockType = iota
readLock
writeLock
)
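A sketch of how callers are expected to choose among these constants; this mirrors the CopyObjectHandler change later in this diff, though the deadlock rationale is inferred rather than stated in the commit:

func lockTypeForCopy(cpSrcDstSame bool) LockType {
	if cpSrcDstSame {
		// Source == destination: CopyObject takes a write lock on the
		// destination name, so taking a read lock here on the same name
		// would presumably deadlock. Read without locking instead.
		return noLock
	}
	return readLock
}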
// ObjectLayer implements primitives for object API layer.
type ObjectLayer interface {
// Storage operations.
@@ -54,7 +63,7 @@ type ObjectLayer interface {
//
// IMPORTANTLY, when implementations return err != nil, this
// function MUST NOT return a non-nil ReadCloser.
GetObjectNInfo(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header) (reader *GetObjectReader, err error)
GetObjectNInfo(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, lockType LockType) (reader *GetObjectReader, err error)
GetObject(ctx context.Context, bucket, object string, startOffset int64, length int64, writer io.Writer, etag string, opts ObjectOptions) (err error)
GetObjectInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, err error)
PutObject(ctx context.Context, bucket, object string, data *hash.Reader, metadata map[string]string, opts ObjectOptions) (objInfo ObjectInfo, err error)


@@ -379,11 +379,14 @@ func NewGetObjectReader(rs *HTTPRangeSpec, oi ObjectInfo, cleanUpFns ...func())
// encrypted bytes. The header parameter is used to
// provide encryption parameters.
fn = func(inputReader io.Reader, h http.Header, cFns ...func()) (r *GetObjectReader, err error) {
copySource := h.Get(crypto.SSECopyAlgorithm) != ""
cFns = append(cleanUpFns, cFns...)
// Attach decrypter on inputReader
var decReader io.Reader
decReader, err = DecryptBlocksRequestR(inputReader, h,
off, length, seqNumber, partStart, oi, false)
off, length, seqNumber, partStart, oi, copySource)
if err != nil {
// Call the cleanup funcs
for i := len(cFns) - 1; i >= 0; i-- {


@@ -22,7 +22,6 @@ import (
"encoding/binary"
"encoding/hex"
"encoding/xml"
"fmt"
"io"
goioutil "io/ioutil"
"net"
@@ -72,7 +71,6 @@ func setHeadGetRespHeaders(w http.ResponseWriter, reqParams url.Values) {
// on an SQL expression. In the request, along with the sql expression, you must
// also specify a data serialization format (JSON, CSV) of the object.
func (api objectAPIHandlers) SelectObjectContentHandler(w http.ResponseWriter, r *http.Request) {
ctx := newContext(r, w, "SelectObject")
// Fetch object stat info.
@@ -156,7 +154,7 @@ func (api objectAPIHandlers) SelectObjectContentHandler(w http.ResponseWriter, r
getObjectNInfo = api.CacheAPI().GetObjectNInfo
}
gr, err := getObjectNInfo(ctx, bucket, object, nil, r.Header)
gr, err := getObjectNInfo(ctx, bucket, object, nil, r.Header, readLock)
if err != nil {
writeErrorResponse(w, toAPIErrorCode(err), r.URL)
return
@@ -351,7 +349,7 @@ func (api objectAPIHandlers) GetObjectHandler(w http.ResponseWriter, r *http.Req
}
}
gr, err := getObjectNInfo(ctx, bucket, object, rs, r.Header)
gr, err := getObjectNInfo(ctx, bucket, object, rs, r.Header, readLock)
if err != nil {
writeErrorResponse(w, toAPIErrorCode(err), r.URL)
return
@@ -604,6 +602,11 @@ func getCpObjMetadataFromHeader(ctx context.Context, r *http.Request, userMeta m
// ----------
// This implementation of the PUT operation adds an object to a bucket
// while reading the object from another source.
// Notice: The S3 client can send secret keys in headers for encryption-related
// operations; the handler must remove these keys before passing them to the object layer.
// Currently these keys are:
// - X-Amz-Server-Side-Encryption-Customer-Key
// - X-Amz-Copy-Source-Server-Side-Encryption-Customer-Key
func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Request) {
ctx := newContext(r, w, "CopyObject")
@@ -649,12 +652,6 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
}
var srcOpts, dstOpts ObjectOptions
cpSrcDstSame := isStringEqual(pathJoin(srcBucket, srcObject), pathJoin(dstBucket, dstObject))
srcInfo, err := objectAPI.GetObjectInfo(ctx, srcBucket, srcObject, srcOpts)
if err != nil {
writeErrorResponse(w, toAPIErrorCode(err), r.URL)
return
}
// Deny if WORM is enabled
if globalWORMEnabled {
@@ -664,13 +661,45 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
}
}
if objectAPI.IsEncryptionSupported() {
if apiErr, _ := DecryptCopyObjectInfo(&srcInfo, r.Header); apiErr != ErrNone {
writeErrorResponse(w, apiErr, r.URL)
return
cpSrcDstSame := isStringEqual(pathJoin(srcBucket, srcObject), pathJoin(dstBucket, dstObject))
getObjectNInfo := objectAPI.GetObjectNInfo
if api.CacheAPI() != nil {
getObjectNInfo = api.CacheAPI().GetObjectNInfo
}
// Get request range.
var rs *HTTPRangeSpec
rangeHeader := r.Header.Get("x-amz-copy-source-range")
if rangeHeader != "" {
var parseRangeErr error
if rs, parseRangeErr = parseRequestRangeSpec(rangeHeader); parseRangeErr != nil {
// Handle only errInvalidRange. Ignore other
// parse errors and treat the request as a
// regular GET, as Amazon S3 does.
if parseRangeErr == errInvalidRange {
writeErrorResponse(w, ErrInvalidRange, r.URL)
return
}
// log the error.
logger.LogIf(ctx, parseRangeErr)
}
}
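For reference, a sketch of how copy-source range strings map onto HTTPRangeSpec, inferred from the parseCopyPartRangeSpec checks earlier in this diff (the field values shown are an assumption):

func rangeSpecExample(objectSize int64) (offset, length int64, err error) {
	// "bytes=100-199" -> {Start: 100, End: 199}
	// "bytes=100-"    -> open-ended; End stays negative
	// "bytes=-100"    -> {IsSuffixLength: true}, the last 100 bytes
	// CopyObject accepts all three; CopyObjectPart rejects the last two.
	rs, err := parseRequestRangeSpec("bytes=100-199")
	if err != nil {
		return 0, 0, err
	}
	// For objectSize >= 200 this yields offset 100, length 100.
	return rs.GetOffsetLength(objectSize)
}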
var lock = noLock
if !cpSrcDstSame {
lock = readLock
}
gr, err := getObjectNInfo(ctx, srcBucket, srcObject, rs, r.Header, lock)
if err != nil {
writeErrorResponse(w, toAPIErrorCode(err), r.URL)
return
}
defer gr.Close()
srcInfo := gr.ObjInfo
// Verify x-amz-copy-source preconditions before continuing with CopyObject.
if checkCopyObjectPreconditions(w, r, srcInfo) {
return
@@ -682,21 +711,16 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
return
}
// Initialize pipe.
pipeReader, pipeWriter := io.Pipe()
// We have to copy metadata only if source and destination are the same.
// This changes for encryption, as can be observed below.
if cpSrcDstSame {
srcInfo.metadataOnly = true
}
var writer io.WriteCloser = pipeWriter
var reader io.Reader = pipeReader
var reader io.Reader = gr
srcInfo.Reader, err = hash.NewReader(reader, srcInfo.Size, "", "")
if err != nil {
pipeWriter.CloseWithError(err)
writeErrorResponse(w, toAPIErrorCode(err), r.URL)
return
}
@@ -717,7 +741,6 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
newKey, err = ParseSSECustomerRequest(r)
}
if err != nil {
pipeWriter.CloseWithError(err)
writeErrorResponse(w, toAPIErrorCode(err), r.URL)
return
}
@@ -729,7 +752,6 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
// Get the old key which needs to be rotated.
oldKey, err = ParseSSECopyCustomerRequest(r.Header, srcInfo.UserDefined)
if err != nil {
pipeWriter.CloseWithError(err)
writeErrorResponse(w, toAPIErrorCode(err), r.URL)
return
}
@@ -737,7 +759,6 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
encMetadata[k] = v
}
if err = rotateKey(oldKey, newKey, srcBucket, srcObject, encMetadata); err != nil {
pipeWriter.CloseWithError(err)
writeErrorResponse(w, toAPIErrorCode(err), r.URL)
return
}
@@ -746,14 +767,6 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
srcInfo.metadataOnly = true
} else {
if sseCopyC || sseCopyS3 {
// Source is encrypted; make sure to save the encrypted size.
writer = ioutil.LimitedWriter(writer, 0, srcInfo.Size)
writer, srcInfo.Size, err = DecryptAllBlocksCopyRequest(writer, r, srcBucket, srcObject, srcInfo)
if err != nil {
pipeWriter.CloseWithError(err)
writeErrorResponse(w, toAPIErrorCode(err), r.URL)
return
}
// We are not just copying metadata; instead
// we are creating a new object at this point, even
// if source and destination are the same object.
@@ -765,7 +778,6 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
if sseC || sseS3 {
reader, err = newEncryptReader(reader, newKey, dstBucket, dstObject, encMetadata, sseS3)
if err != nil {
pipeWriter.CloseWithError(err)
writeErrorResponse(w, toAPIErrorCode(err), r.URL)
return
}
@@ -776,20 +788,29 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
if !sseCopyC && !sseCopyS3 {
size = srcInfo.EncryptedSize()
}
} else {
if sseCopyC || sseCopyS3 {
size, _ = srcInfo.DecryptedSize()
delete(srcInfo.UserDefined, crypto.SSEIV)
delete(srcInfo.UserDefined, crypto.SSESealAlgorithm)
delete(srcInfo.UserDefined, crypto.SSECSealedKey)
delete(srcInfo.UserDefined, crypto.SSEMultipart)
delete(srcInfo.UserDefined, crypto.S3SealedKey)
delete(srcInfo.UserDefined, crypto.S3KMSSealedKey)
delete(srcInfo.UserDefined, crypto.S3KMSKeyID)
}
}
srcInfo.Reader, err = hash.NewReader(reader, size, "", "") // do not try to verify encrypted content
if err != nil {
pipeWriter.CloseWithError(err)
writeErrorResponse(w, toAPIErrorCode(err), r.URL)
return
}
}
}
srcInfo.Writer = writer
srcInfo.UserDefined, err = getCpObjMetadataFromHeader(ctx, r, srcInfo.UserDefined)
if err != nil {
pipeWriter.CloseWithError(err)
writeErrorResponse(w, ErrInternalError, r.URL)
return
}
@@ -800,13 +821,15 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
srcInfo.UserDefined[k] = v
}
// Ensure that metadata does not contain sensitive information
crypto.RemoveSensitiveEntries(srcInfo.UserDefined)
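crypto.RemoveSensitiveEntries is the metadata-map counterpart of the header scrubber added in the setCommonHeaders hunk: with the defer h.Del(...) calls gone from the SSE-C parsers, this is now the single choke point. A hypothetical sketch; the key names are an assumption based on the constants used elsewhere in this diff:

func removeSensitiveEntriesSketch(metadata map[string]string) {
	delete(metadata, crypto.SSECKey)    // client-provided SSE-C key
	delete(metadata, crypto.SSECopyKey) // client-provided SSE-C copy-source key
}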
// Check if x-amz-metadata-directive was not set to REPLACE and source,
// destination are the same object. Apply this restriction also when
// metadataOnly is true, indicating that we are not overwriting the object.
// if encryption is enabled we do not need explicit "REPLACE" metadata to
// be enabled as well - this is to allow for key-rotation.
if !isMetadataReplace(r.Header) && srcInfo.metadataOnly && !crypto.SSEC.IsEncrypted(srcInfo.UserDefined) {
pipeWriter.CloseWithError(fmt.Errorf("invalid copy dest"))
// If x-amz-metadata-directive is not set to REPLACE then we need
// to error out if source and destination are same.
writeErrorResponse(w, ErrInvalidCopyDest, r.URL)
@@ -844,27 +867,15 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
}
var dstRecords []dns.SrvRecord
if dstRecords, err = globalDNSConfig.Get(dstBucket); err == nil {
go func() {
if gerr := objectAPI.GetObject(ctx, srcBucket, srcObject, 0, srcInfo.Size, srcInfo.Writer, srcInfo.ETag, srcOpts); gerr != nil {
pipeWriter.CloseWithError(gerr)
writeErrorResponse(w, ErrInternalError, r.URL)
return
}
// Close writer explicitly to indicate data has been written
srcInfo.Writer.Close()
}()
// Send PutObject request to appropriate instance (in federated deployment)
host, port := getRandomHostPort(dstRecords)
client, rerr := getRemoteInstanceClient(host, port)
if rerr != nil {
pipeWriter.CloseWithError(rerr)
writeErrorResponse(w, ErrInternalError, r.URL)
return
}
remoteObjInfo, rerr := client.PutObject(dstBucket, dstObject, srcInfo.Reader, srcInfo.Size, "", "", srcInfo.UserDefined, dstOpts.ServerSideEncryption)
if rerr != nil {
pipeWriter.CloseWithError(rerr)
writeErrorResponse(w, ErrInternalError, r.URL)
return
}
@@ -876,14 +887,11 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
// object is the same, then only metadata is updated.
objInfo, err = objectAPI.CopyObject(ctx, srcBucket, srcObject, dstBucket, dstObject, srcInfo, srcOpts, dstOpts)
if err != nil {
pipeWriter.CloseWithError(err)
writeErrorResponse(w, toAPIErrorCode(err), r.URL)
return
}
}
pipeReader.Close()
response := generateCopyObjectResponse(objInfo.ETag, objInfo.ModTime)
encodedSuccessResponse := encodeResponse(response)
@@ -911,6 +919,11 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
// PutObjectHandler - PUT Object
// ----------
// This implementation of the PUT operation adds an object to a bucket.
// Notice: The S3 client can send secret keys in headers for encryption-related
// operations; the handler must remove these keys before passing them to the object layer.
// Currently these keys are:
// - X-Amz-Server-Side-Encryption-Customer-Key
// - X-Amz-Copy-Source-Server-Side-Encryption-Customer-Key
func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Request) {
ctx := newContext(r, w, "PutObject")
@@ -1080,6 +1093,9 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req
}
}
// Ensure that metadata does not contain sensitive information
crypto.RemoveSensitiveEntries(metadata)
if api.CacheAPI() != nil && !hasServerSideEncryptionHeader(r.Header) {
putObject = api.CacheAPI().PutObject
}
@@ -1127,6 +1143,11 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req
/// Multipart objectAPIHandlers
// NewMultipartUploadHandler - New multipart upload.
// Notice: The S3 client can send secret keys in headers for encryption-related
// operations; the handler must remove these keys before passing them to the object layer.
// Currently these keys are:
// - X-Amz-Server-Side-Encryption-Customer-Key
// - X-Amz-Copy-Source-Server-Side-Encryption-Customer-Key
func (api objectAPIHandlers) NewMultipartUploadHandler(w http.ResponseWriter, r *http.Request) {
ctx := newContext(r, w, "NewMultipartUpload")
@@ -1193,6 +1214,9 @@ func (api objectAPIHandlers) NewMultipartUploadHandler(w http.ResponseWriter, r
metadata[k] = v
}
// Ensure that metadata does not contain sensitive information
crypto.RemoveSensitiveEntries(metadata)
newMultipartUpload := objectAPI.NewMultipartUpload
if api.CacheAPI() != nil && !hasServerSideEncryptionHeader(r.Header) {
newMultipartUpload = api.CacheAPI().NewMultipartUpload
@@ -1262,11 +1286,6 @@ func (api objectAPIHandlers) CopyObjectPartHandler(w http.ResponseWriter, r *htt
return
}
var srcOpts, dstOpts ObjectOptions
srcInfo, err := objectAPI.GetObjectInfo(ctx, srcBucket, srcObject, srcOpts)
if err != nil {
writeErrorResponse(w, toAPIErrorCode(err), r.URL)
return
}
// Deny if WORM is enabled
if globalWORMEnabled {
@@ -1276,20 +1295,38 @@ func (api objectAPIHandlers) CopyObjectPartHandler(w http.ResponseWriter, r *htt
}
}
if objectAPI.IsEncryptionSupported() {
if apiErr, _ := DecryptCopyObjectInfo(&srcInfo, r.Header); apiErr != ErrNone {
writeErrorResponse(w, apiErr, r.URL)
return
}
getObjectNInfo := objectAPI.GetObjectNInfo
if api.CacheAPI() != nil {
getObjectNInfo = api.CacheAPI().GetObjectNInfo
}
// Get request range.
var startOffset, length int64
var rs *HTTPRangeSpec
rangeHeader := r.Header.Get("x-amz-copy-source-range")
if startOffset, length, err = parseCopyPartRange(rangeHeader, srcInfo.Size); err != nil {
logger.GetReqInfo(ctx).AppendTags("rangeHeader", rangeHeader)
logger.LogIf(ctx, err)
writeCopyPartErr(w, err, r.URL)
if rangeHeader != "" {
var parseRangeErr error
if rs, parseRangeErr = parseCopyPartRangeSpec(rangeHeader); parseRangeErr != nil {
// Unlike the CopyObject handler, any range
// parse failure here rejects the CopyObjectPart request.
logger.GetReqInfo(ctx).AppendTags("rangeHeader", rangeHeader)
logger.LogIf(ctx, parseRangeErr)
writeCopyPartErr(w, parseRangeErr, r.URL)
return
}
}
gr, err := getObjectNInfo(ctx, srcBucket, srcObject, rs, r.Header, readLock)
if err != nil {
writeErrorResponse(w, toAPIErrorCode(err), r.URL)
return
}
defer gr.Close()
srcInfo := gr.ObjInfo
// Special care for CopyObjectPart
if partRangeErr := checkCopyPartRangeWithSize(rs, srcInfo.Size); partRangeErr != nil {
writeCopyPartErr(w, partRangeErr, r.URL)
return
}
@@ -1298,21 +1335,30 @@ func (api objectAPIHandlers) CopyObjectPartHandler(w http.ResponseWriter, r *htt
return
}
// Get the object offset & length
startOffset, length, _ := rs.GetOffsetLength(srcInfo.Size)
if objectAPI.IsEncryptionSupported() {
if crypto.IsEncrypted(srcInfo.UserDefined) {
decryptedSize, decryptErr := srcInfo.DecryptedSize()
if decryptErr != nil {
writeErrorResponse(w, toAPIErrorCode(decryptErr), r.URL)
return
}
startOffset, length, _ = rs.GetOffsetLength(decryptedSize)
}
}
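A worked example of why the offsets are recomputed above: the client's range refers to plaintext positions, while srcInfo.Size is the ciphertext size, so the window must be taken against DecryptedSize(). Identifiers below are from this diff; the sizes are illustrative:

func decryptedRangeExample() (int64, int64, error) {
	const decryptedSize = 1 << 20 // plaintext size, e.g. from srcInfo.DecryptedSize()
	rs := &HTTPRangeSpec{Start: 524288, End: 1048575}
	// Yields offset 524288, length 524288 — the second half of the plaintext.
	// Evaluating the same spec against the (larger) ciphertext size would
	// misplace the window.
	return rs.GetOffsetLength(decryptedSize)
}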
/// maximum copy size for multipart objects in a single operation
if isMaxAllowedPartSize(length) {
writeErrorResponse(w, ErrEntityTooLarge, r.URL)
return
}
// Initialize pipe.
pipeReader, pipeWriter := io.Pipe()
var writer io.WriteCloser = pipeWriter
var reader io.Reader = pipeReader
var reader io.Reader = gr
var getLength = length
srcInfo.Reader, err = hash.NewReader(reader, length, "", "")
if err != nil {
pipeWriter.CloseWithError(err)
writeErrorResponse(w, toAPIErrorCode(err), r.URL)
return
}
@@ -1320,23 +1366,9 @@ func (api objectAPIHandlers) CopyObjectPartHandler(w http.ResponseWriter, r *htt
var li ListPartsInfo
li, err = objectAPI.ListObjectParts(ctx, dstBucket, dstObject, uploadID, 0, 1)
if err != nil {
pipeWriter.CloseWithError(err)
writeErrorResponse(w, toAPIErrorCode(err), r.URL)
return
}
sseCopyC := crypto.SSECopy.IsRequested(r.Header)
sseCopyS3 := crypto.S3.IsEncrypted(srcInfo.UserDefined)
if sseCopyC || sseCopyS3 {
// Response writer should be limited early on for decryption up to the required
// length, additionally also skipping mod(offset) 64KiB boundaries.
writer = ioutil.LimitedWriter(writer, startOffset%(64*1024), length)
writer, startOffset, getLength, err = DecryptBlocksRequest(writer, r, srcBucket, srcObject, startOffset, length, srcInfo, true)
if err != nil {
pipeWriter.CloseWithError(err)
writeErrorResponse(w, toAPIErrorCode(err), r.URL)
return
}
}
if crypto.IsEncrypted(li.UserDefined) {
if !hasServerSideEncryptionHeader(r.Header) {
writeErrorResponse(w, ErrSSEMultipartEncrypted, r.URL)
@@ -1346,7 +1378,6 @@ func (api objectAPIHandlers) CopyObjectPartHandler(w http.ResponseWriter, r *htt
if crypto.SSEC.IsRequested(r.Header) {
key, err = ParseSSECustomerRequest(r)
if err != nil {
pipeWriter.CloseWithError(err)
writeErrorResponse(w, toAPIErrorCode(err), r.URL)
return
}
@@ -1354,7 +1385,6 @@ func (api objectAPIHandlers) CopyObjectPartHandler(w http.ResponseWriter, r *htt
var objectEncryptionKey []byte
objectEncryptionKey, err = decryptObjectInfo(key, dstBucket, dstObject, li.UserDefined)
if err != nil {
pipeWriter.CloseWithError(err)
writeErrorResponse(w, toAPIErrorCode(err), r.URL)
return
}
@@ -1367,7 +1397,6 @@ func (api objectAPIHandlers) CopyObjectPartHandler(w http.ResponseWriter, r *htt
partEncryptionKey := mac.Sum(nil)
reader, err = sio.EncryptReader(reader, sio.Config{Key: partEncryptionKey})
if err != nil {
pipeWriter.CloseWithError(err)
writeErrorResponse(w, toAPIErrorCode(err), r.URL)
return
}
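sio.EncryptReader (github.com/minio/sio) is what turns the plaintext part stream into a DARE stream here; a standalone usage sketch with a throwaway key:

package main

import (
	"bytes"
	"crypto/rand"
	"io/ioutil"

	"github.com/minio/sio"
)

func main() {
	key := make([]byte, 32) // one fresh 256-bit key per encrypted stream
	if _, err := rand.Read(key); err != nil {
		panic(err)
	}
	enc, err := sio.EncryptReader(bytes.NewReader([]byte("part payload")), sio.Config{Key: key})
	if err != nil {
		panic(err)
	}
	ciphertext, _ := ioutil.ReadAll(enc) // everything read from enc is ciphertext
	_ = ciphertext
}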
@@ -1376,13 +1405,11 @@ func (api objectAPIHandlers) CopyObjectPartHandler(w http.ResponseWriter, r *htt
size := info.EncryptedSize()
srcInfo.Reader, err = hash.NewReader(reader, size, "", "")
if err != nil {
pipeWriter.CloseWithError(err)
writeErrorResponse(w, toAPIErrorCode(err), r.URL)
return
}
}
}
srcInfo.Writer = writer
// Copy source object to destination; if source and destination
// objects are the same, then only metadata is updated.
partInfo, err := objectAPI.CopyObjectPart(ctx, srcBucket, srcObject, dstBucket,
@@ -1392,9 +1419,6 @@ func (api objectAPIHandlers) CopyObjectPartHandler(w http.ResponseWriter, r *htt
return
}
// Close the pipe after successful operation.
pipeReader.Close()
response := generateCopyObjectPartResponse(partInfo.ETag, partInfo.LastModified)
encodedSuccessResponse := encodeResponse(response)


@@ -580,8 +580,8 @@ func (s *xlSets) ListBuckets(ctx context.Context) (buckets []BucketInfo, err err
// --- Object Operations ---
// GetObjectNInfo - returns object info and locked object ReadCloser
func (s *xlSets) GetObjectNInfo(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header) (gr *GetObjectReader, err error) {
return s.getHashedSet(object).GetObjectNInfo(ctx, bucket, object, rs, h)
func (s *xlSets) GetObjectNInfo(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, lockType LockType) (gr *GetObjectReader, err error) {
return s.getHashedSet(object).GetObjectNInfo(ctx, bucket, object, rs, h, lockType)
}
// GetObject - reads an object from the hashedSet based on the object name.
@@ -615,42 +615,14 @@ func (s *xlSets) CopyObject(ctx context.Context, srcBucket, srcObject, destBucke
return srcSet.CopyObject(ctx, srcBucket, srcObject, destBucket, destObject, srcInfo, srcOpts, dstOpts)
}
// Hold write lock on destination since in both cases
// - if source and destination are same
// - if source and destination are different
// it is the sole mutating state.
objectDWLock := destSet.nsMutex.NewNSLock(destBucket, destObject)
if err := objectDWLock.GetLock(globalObjectTimeout); err != nil {
return objInfo, err
}
defer objectDWLock.Unlock()
// if source and destination are different, we have to hold
// additional read lock as well to protect against writes on
// source.
if !cpSrcDstSame {
// Hold read locks on source object only if we are
// going to read data from source object.
objectSRLock := srcSet.nsMutex.NewNSLock(srcBucket, srcObject)
if err := objectSRLock.GetRLock(globalObjectTimeout); err != nil {
objectDWLock := destSet.nsMutex.NewNSLock(destBucket, destObject)
if err := objectDWLock.GetLock(globalObjectTimeout); err != nil {
return objInfo, err
}
defer objectSRLock.RUnlock()
defer objectDWLock.Unlock()
}
go func() {
if gerr := srcSet.getObject(ctx, srcBucket, srcObject, 0, srcInfo.Size, srcInfo.Writer, srcInfo.ETag, srcOpts); gerr != nil {
if gerr = srcInfo.Writer.Close(); gerr != nil {
logger.LogIf(ctx, gerr)
}
return
}
// Close writer explicitly signaling we wrote all data.
if gerr := srcInfo.Writer.Close(); gerr != nil {
logger.LogIf(ctx, gerr)
return
}
}()
return destSet.putObject(ctx, destBucket, destObject, srcInfo.Reader, srcInfo.UserDefined, dstOpts)
}
@@ -846,23 +818,8 @@ func (s *xlSets) NewMultipartUpload(ctx context.Context, bucket, object string,
// Copies a part of an object from source hashedSet to destination hashedSet.
func (s *xlSets) CopyObjectPart(ctx context.Context, srcBucket, srcObject, destBucket, destObject string, uploadID string, partID int,
startOffset int64, length int64, srcInfo ObjectInfo, srcOpts, dstOpts ObjectOptions) (partInfo PartInfo, err error) {
srcSet := s.getHashedSet(srcObject)
destSet := s.getHashedSet(destObject)
go func() {
if gerr := srcSet.GetObject(ctx, srcBucket, srcObject, startOffset, length, srcInfo.Writer, srcInfo.ETag, srcOpts); gerr != nil {
if gerr = srcInfo.Writer.Close(); gerr != nil {
logger.LogIf(ctx, gerr)
return
}
}
if gerr := srcInfo.Writer.Close(); gerr != nil {
logger.LogIf(ctx, gerr)
return
}
}()
return destSet.PutObjectPart(ctx, destBucket, destObject, uploadID, partID, srcInfo.Reader, dstOpts)
}
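The same simplification repeats across FSObjects, xlSets and xlObjects: every copy path used to pump the source through an io.Pipe with a goroutine on the write side, and now consumes the reader returned by GetObjectNInfo directly. The removed plumbing reduces to this generic pattern (stdlib-only sketch):

package main

import (
	"io"
	"io/ioutil"
	"strings"
)

// viaPipe is the pre-commit shape: a goroutine feeds the pipe and must
// remember to Close (or CloseWithError) so the reader sees EOF or the failure.
func viaPipe(src io.Reader) io.Reader {
	pr, pw := io.Pipe()
	go func() {
		_, err := io.Copy(pw, src)
		pw.CloseWithError(err) // a nil err closes the pipe cleanly
	}()
	return pr
}

func main() {
	// Post-commit, the object layer hands back a reader directly and this
	// wrapper (and its error-handling goroutine) disappears.
	_, _ = io.Copy(ioutil.Discard, viaPipe(strings.NewReader("object data")))
}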


@@ -269,20 +269,6 @@ func (xl xlObjects) CopyObjectPart(ctx context.Context, srcBucket, srcObject, ds
return pi, err
}
go func() {
if gerr := xl.getObject(ctx, srcBucket, srcObject, startOffset, length, srcInfo.Writer, srcInfo.ETag, srcOpts); gerr != nil {
if gerr = srcInfo.Writer.Close(); gerr != nil {
logger.LogIf(ctx, gerr)
}
return
}
// Close writer explicitly signaling we wrote all data.
if gerr := srcInfo.Writer.Close(); gerr != nil {
logger.LogIf(ctx, gerr)
return
}
}()
partInfo, err := xl.PutObjectPart(ctx, dstBucket, dstObject, uploadID, partID, srcInfo.Reader, dstOpts)
if err != nil {
return pi, toObjectErr(err, dstBucket, dstObject)


@@ -166,13 +166,25 @@ func (xl xlObjects) CopyObject(ctx context.Context, srcBucket, srcObject, dstBuc
// GetObjectNInfo - returns object info and an object
// Read(Closer). When err != nil, the returned reader is always nil.
func (xl xlObjects) GetObjectNInfo(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header) (gr *GetObjectReader, err error) {
func (xl xlObjects) GetObjectNInfo(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, lockType LockType) (gr *GetObjectReader, err error) {
var nsUnlocker = func() {}
// Acquire lock
lock := xl.nsMutex.NewNSLock(bucket, object)
if err = lock.GetRLock(globalObjectTimeout); err != nil {
return nil, err
if lockType != noLock {
lock := xl.nsMutex.NewNSLock(bucket, object)
switch lockType {
case writeLock:
if err = lock.GetLock(globalObjectTimeout); err != nil {
return nil, err
}
nsUnlocker = lock.Unlock
case readLock:
if err = lock.GetRLock(globalObjectTimeout); err != nil {
return nil, err
}
nsUnlocker = lock.RUnlock
}
}
nsUnlocker := lock.RUnlock
if err = checkGetObjArgs(ctx, bucket, object); err != nil {
nsUnlocker()