fix: CopyObject behavior on expanded zones (#9729)

CopyObject did not resolve the correct destination zone
for the object and could end up creating duplicate objects
on two different zones, reproducible by performing
encryption-based key rotation.
Author: Harshavardhana, 2020-05-28 14:36:38 -07:00
commit 41688a936b (parent b2db8123ec)
8 changed files with 92 additions and 86 deletions
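The reproduction mentioned in the message is an SSE-C key rotation, i.e. a server-side copy of an object onto itself under a new key. A minimal sketch of such a rotation using the minio-go v7 client; endpoint, credentials, bucket, object, and keys are all illustrative:

```go
package main

import (
	"context"
	"log"

	"github.com/minio/minio-go/v7"
	"github.com/minio/minio-go/v7/pkg/credentials"
	"github.com/minio/minio-go/v7/pkg/encrypt"
)

func main() {
	client, err := minio.New("localhost:9000", &minio.Options{
		Creds: credentials.NewStaticV4("minio", "minio123", ""),
	})
	if err != nil {
		log.Fatal(err)
	}

	// SSE-C keys must be exactly 32 bytes; these are illustrative.
	oldSSE, err := encrypt.NewSSEC([]byte("32byteslongsecretkeymustbegiven1"))
	if err != nil {
		log.Fatal(err)
	}
	newSSE, err := encrypt.NewSSEC([]byte("32byteslongsecretkeymustbegiven2"))
	if err != nil {
		log.Fatal(err)
	}

	// Copy the object onto itself, decrypting with the old key and
	// re-encrypting with the new one. On an expanded-zone setup this
	// self-copy is what previously landed in the wrong zone.
	src := minio.CopySrcOptions{
		Bucket:     "testbucket",
		Object:     "testobject",
		Encryption: encrypt.SSECopy(oldSSE),
	}
	dst := minio.CopyDestOptions{
		Bucket:     "testbucket",
		Object:     "testobject",
		Encryption: newSSE,
	}
	if _, err = client.CopyObject(context.Background(), dst, src); err != nil {
		log.Fatal(err)
	}
}
```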


@@ -112,7 +112,7 @@ func setObjectHeaders(w http.ResponseWriter, objInfo ObjectInfo, rs *HTTPRangeSpec
 	// Set all other user defined metadata.
 	for k, v := range objInfo.UserDefined {
-		if strings.HasPrefix(k, ReservedMetadataPrefix) {
+		if strings.HasPrefix(strings.ToLower(k), ReservedMetadataPrefixLower) {
 			// Do not need to send any internal metadata
 			// values to client.
 			continue
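This case-insensitive check recurs in several hunks below: user-defined metadata maps are not guaranteed to carry canonical header casing, so the key is lowered once before the prefix comparison. A standalone sketch of the filter, with illustrative key names:

```go
package main

import (
	"fmt"
	"strings"
)

// reservedMetadataPrefixLower mirrors ReservedMetadataPrefixLower from the diff.
const reservedMetadataPrefixLower = "x-minio-internal-"

// filterUserMetadata drops internal keys regardless of their casing,
// which can differ depending on where the map was populated.
func filterUserMetadata(in map[string]string) map[string]string {
	out := make(map[string]string, len(in))
	for k, v := range in {
		if strings.HasPrefix(strings.ToLower(k), reservedMetadataPrefixLower) {
			continue // internal metadata never reaches the client
		}
		out[k] = v
	}
	return out
}

func main() {
	meta := map[string]string{
		"X-Minio-Internal-Example":     "hidden", // internal, canonical casing
		"x-minio-internal-actual-size": "42",     // internal, lowercase
		"X-Amz-Meta-Color":             "red",    // user metadata, kept
	}
	fmt.Println(filterUserMetadata(meta)) // map[X-Amz-Meta-Color:red]
}
```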


@@ -541,7 +541,7 @@ func generateListObjectsV2Response(bucket, prefix, token, nextToken, startAfter,
 		if metadata {
 			content.UserMetadata = make(StringMap)
 			for k, v := range CleanMinioInternalMetadataKeys(object.UserDefined) {
-				if strings.HasPrefix(k, ReservedMetadataPrefix) {
+				if strings.HasPrefix(strings.ToLower(k), ReservedMetadataPrefixLower) {
 					// Do not need to send any internal metadata
 					// values to client.
 					continue


@@ -99,14 +99,14 @@ func (c *cacheObjects) updateMetadataIfChanged(ctx context.Context, dcache *diskCache
 	bkMeta := make(map[string]string)
 	cacheMeta := make(map[string]string)
 	for k, v := range bkObjectInfo.UserDefined {
-		if strings.HasPrefix(k, ReservedMetadataPrefix) {
+		if strings.HasPrefix(strings.ToLower(k), ReservedMetadataPrefixLower) {
 			// Do not need to send any internal metadata
 			continue
 		}
 		bkMeta[http.CanonicalHeaderKey(k)] = v
 	}
 	for k, v := range cacheObjInfo.UserDefined {
-		if strings.HasPrefix(k, ReservedMetadataPrefix) {
+		if strings.HasPrefix(strings.ToLower(k), ReservedMetadataPrefixLower) {
 			// Do not need to send any internal metadata
 			continue
 		}


@@ -116,7 +116,10 @@ func isHTTPHeaderSizeTooLarge(header http.Header) bool {

 // ReservedMetadataPrefix is the prefix of a metadata key which
 // is reserved and for internal use only.
-const ReservedMetadataPrefix = "X-Minio-Internal-"
+const (
+	ReservedMetadataPrefix      = "X-Minio-Internal-"
+	ReservedMetadataPrefixLower = "x-minio-internal-"
+)

 type reservedMetadataHandler struct {
 	http.Handler
@@ -141,7 +144,7 @@ func (h reservedMetadataHandler) ServeHTTP(w http.ResponseWriter, r *http.Request)
 // and must not set by clients
 func containsReservedMetadata(header http.Header) bool {
 	for key := range header {
-		if HasPrefix(key, ReservedMetadataPrefix) {
+		if strings.HasPrefix(strings.ToLower(key), ReservedMetadataPrefixLower) {
 			return true
 		}
 	}
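Why the lowercase constant is needed at all: keys that arrive through net/http are canonicalized, so the old case-sensitive check happened to work on live headers, but keys read back from stored metadata keep whatever casing they were written with. A quick stdlib-only demonstration (the key name is illustrative):

```go
package main

import (
	"fmt"
	"net/textproto"
	"strings"
)

func main() {
	const prefix = "X-Minio-Internal-"

	// net/http canonicalizes incoming header keys, so a case-sensitive
	// prefix check happens to work on that path.
	k := textproto.CanonicalMIMEHeaderKey("x-mInIo-iNtErNaL-foo")
	fmt.Println(k, strings.HasPrefix(k, prefix)) // X-Minio-Internal-Foo true

	// A key that never passed through net/http keeps its stored casing
	// and slips past the case-sensitive check.
	stored := "x-minio-internal-foo"
	fmt.Println(strings.HasPrefix(stored, prefix)) // false
	fmt.Println(strings.HasPrefix(strings.ToLower(stored), strings.ToLower(prefix))) // true
}
```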


@@ -995,7 +995,7 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Request) {
 	}
 	for k, v := range srcInfo.UserDefined {
-		if HasPrefix(k, ReservedMetadataPrefix) {
+		if strings.HasPrefix(strings.ToLower(k), ReservedMetadataPrefixLower) {
 			encMetadata[k] = v
 		}
 	}
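Note that this filter is the inverse of the response-side hunks: on copy, only the reserved keys are propagated into encMetadata, so the re-encrypted destination keeps the object's internal SSE bookkeeping. A sketch of that selection, with the prefix inlined for brevity:

```go
package sketch

import "strings"

// keepInternal selects only the reserved keys, the mirror image of the
// response-side filter: these must survive a copy so that the
// re-encrypted destination retains its internal SSE metadata.
func keepInternal(in map[string]string) map[string]string {
	out := make(map[string]string, len(in))
	for k, v := range in {
		if strings.HasPrefix(strings.ToLower(k), "x-minio-internal-") {
			out[k] = v
		}
	}
	return out
}
```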


@@ -773,18 +773,17 @@ func (s *xlSets) DeleteObjects(ctx context.Context, bucket string, objects []string)
 }

 // CopyObject - copies objects from one hashedSet to another hashedSet, on server side.
-func (s *xlSets) CopyObject(ctx context.Context, srcBucket, srcObject, destBucket, destObject string, srcInfo ObjectInfo, srcOpts, dstOpts ObjectOptions) (objInfo ObjectInfo, err error) {
+func (s *xlSets) CopyObject(ctx context.Context, srcBucket, srcObject, dstBucket, dstObject string, srcInfo ObjectInfo, srcOpts, dstOpts ObjectOptions) (objInfo ObjectInfo, err error) {
 	srcSet := s.getHashedSet(srcObject)
-	destSet := s.getHashedSet(destObject)
+	dstSet := s.getHashedSet(dstObject)

 	// Check if this request is only metadata update.
-	cpSrcDstSame := isStringEqual(pathJoin(srcBucket, srcObject), pathJoin(destBucket, destObject))
-	if cpSrcDstSame && srcInfo.metadataOnly {
-		return srcSet.CopyObject(ctx, srcBucket, srcObject, destBucket, destObject, srcInfo, srcOpts, dstOpts)
+	if srcSet == dstSet && srcInfo.metadataOnly {
+		return srcSet.CopyObject(ctx, srcBucket, srcObject, dstBucket, dstObject, srcInfo, srcOpts, dstOpts)
 	}

 	putOpts := ObjectOptions{ServerSideEncryption: dstOpts.ServerSideEncryption, UserDefined: srcInfo.UserDefined}
-	return destSet.putObject(ctx, destBucket, destObject, srcInfo.PutObjReader, putOpts)
+	return dstSet.putObject(ctx, dstBucket, dstObject, srcInfo.PutObjReader, putOpts)
 }

 // FileInfoCh - file info channel
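Comparing srcSet == dstSet rather than the joined path strings is the point of this hunk: xlSets places an object by hashing only its object name, so the resolved sets, not the bucket-qualified paths, decide whether the fast metadata-only path applies. A toy illustration of name-hashed placement, with crc32 standing in for MinIO's actual hash:

```go
package main

import (
	"fmt"
	"hash/crc32"
)

// pickSet deterministically maps an object name onto one of n erasure
// sets; the crc32 here is illustrative, not MinIO's exact hash.
func pickSet(object string, n int) int {
	return int(crc32.ChecksumIEEE([]byte(object))) % n
}

func main() {
	const numSets = 4
	srcSet := pickSet("photos/2020/a.jpg", numSets)
	dstSet := pickSet("photos/2020/a.jpg", numSets)

	// The same object name always hashes to the same set, so a
	// metadata-only copy can be served entirely within that set,
	// whatever the source and destination buckets are.
	fmt.Println(srcSet == dstSet) // true
}
```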


@@ -65,64 +65,61 @@ func (xl xlObjects) putObjectDir(ctx context.Context, bucket, object string, writeQuorum int) error {
 // if source object and destination object are same we only
 // update metadata.
 func (xl xlObjects) CopyObject(ctx context.Context, srcBucket, srcObject, dstBucket, dstObject string, srcInfo ObjectInfo, srcOpts, dstOpts ObjectOptions) (oi ObjectInfo, e error) {
-	cpSrcDstSame := isStringEqual(pathJoin(srcBucket, srcObject), pathJoin(dstBucket, dstObject))
-
-	// Check if this request is only metadata update.
-	if cpSrcDstSame {
-		defer ObjectPathUpdated(path.Join(dstBucket, dstObject))
-
-		// Read metadata associated with the object from all disks.
-		storageDisks := xl.getDisks()
-
-		metaArr, errs := readAllXLMetadata(ctx, storageDisks, srcBucket, srcObject)
-
-		// get Quorum for this object
-		readQuorum, writeQuorum, err := objectQuorumFromMeta(ctx, xl, metaArr, errs)
-		if err != nil {
-			return oi, toObjectErr(err, srcBucket, srcObject)
-		}
-
-		if reducedErr := reduceReadQuorumErrs(ctx, errs, objectOpIgnoredErrs, readQuorum); reducedErr != nil {
-			return oi, toObjectErr(reducedErr, srcBucket, srcObject)
-		}
-
-		// List all online disks.
-		_, modTime := listOnlineDisks(storageDisks, metaArr, errs)
-
-		// Pick latest valid metadata.
-		xlMeta, err := pickValidXLMeta(ctx, metaArr, modTime, readQuorum)
-		if err != nil {
-			return oi, toObjectErr(err, srcBucket, srcObject)
-		}
-
-		// Update `xl.json` content on each disks.
-		for index := range metaArr {
-			metaArr[index].Meta = srcInfo.UserDefined
-			metaArr[index].Meta["etag"] = srcInfo.ETag
-		}
-
-		var onlineDisks []StorageAPI
-		tempObj := mustGetUUID()
-
-		// Cleanup in case of xl.json writing failure
-		defer xl.deleteObject(ctx, minioMetaTmpBucket, tempObj, writeQuorum, false)
-
-		// Write unique `xl.json` for each disk.
-		if onlineDisks, err = writeUniqueXLMetadata(ctx, storageDisks, minioMetaTmpBucket, tempObj, metaArr, writeQuorum); err != nil {
-			return oi, toObjectErr(err, srcBucket, srcObject)
-		}
-
-		// Rename atomically `xl.json` from tmp location to destination for each disk.
-		if _, err = renameXLMetadata(ctx, onlineDisks, minioMetaTmpBucket, tempObj, srcBucket, srcObject, writeQuorum); err != nil {
-			return oi, toObjectErr(err, srcBucket, srcObject)
-		}
-
-		return xlMeta.ToObjectInfo(srcBucket, srcObject), nil
-	}
-
-	putOpts := ObjectOptions{ServerSideEncryption: dstOpts.ServerSideEncryption, UserDefined: srcInfo.UserDefined}
-	return xl.PutObject(ctx, dstBucket, dstObject, srcInfo.PutObjReader, putOpts)
+	// This call shouldn't be used for anything other than metadata updates.
+	if !srcInfo.metadataOnly {
+		return oi, NotImplemented{}
+	}
+
+	defer ObjectPathUpdated(path.Join(dstBucket, dstObject))
+
+	// Read metadata associated with the object from all disks.
+	storageDisks := xl.getDisks()
+
+	metaArr, errs := readAllXLMetadata(ctx, storageDisks, srcBucket, srcObject)
+
+	// get Quorum for this object
+	readQuorum, writeQuorum, err := objectQuorumFromMeta(ctx, xl, metaArr, errs)
+	if err != nil {
+		return oi, toObjectErr(err, srcBucket, srcObject)
+	}
+
+	if reducedErr := reduceReadQuorumErrs(ctx, errs, objectOpIgnoredErrs, readQuorum); reducedErr != nil {
+		return oi, toObjectErr(reducedErr, srcBucket, srcObject)
+	}
+
+	// List all online disks.
+	_, modTime := listOnlineDisks(storageDisks, metaArr, errs)
+
+	// Pick latest valid metadata.
+	xlMeta, err := pickValidXLMeta(ctx, metaArr, modTime, readQuorum)
+	if err != nil {
+		return oi, toObjectErr(err, srcBucket, srcObject)
+	}
+
+	// Update `xl.json` content on each disks.
+	for index := range metaArr {
+		metaArr[index].Meta = srcInfo.UserDefined
+		metaArr[index].Meta["etag"] = srcInfo.ETag
+	}
+
+	var onlineDisks []StorageAPI
+	tempObj := mustGetUUID()
+
+	// Cleanup in case of xl.json writing failure
+	defer xl.deleteObject(ctx, minioMetaTmpBucket, tempObj, writeQuorum, false)
+
+	// Write unique `xl.json` for each disk.
+	if onlineDisks, err = writeUniqueXLMetadata(ctx, storageDisks, minioMetaTmpBucket, tempObj, metaArr, writeQuorum); err != nil {
+		return oi, toObjectErr(err, srcBucket, srcObject)
+	}
+
+	// Rename atomically `xl.json` from tmp location to destination for each disk.
+	if _, err = renameXLMetadata(ctx, onlineDisks, minioMetaTmpBucket, tempObj, srcBucket, srcObject, writeQuorum); err != nil {
+		return oi, toObjectErr(err, srcBucket, srcObject)
+	}
+
+	return xlMeta.ToObjectInfo(srcBucket, srcObject), nil
 }

 // GetObjectNInfo - returns object info and an object
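The net effect of this hunk is that xlObjects.CopyObject is now strictly a metadata-update primitive; any copy that actually moves data is the caller's job and goes through PutObject. A hedged sketch of the resulting division of labor, with simplified stand-in types for the real ObjectLayer signatures:

```go
package sketch

// objectStore is a simplified stand-in for the ObjectLayer methods
// involved; the real signatures carry contexts, readers, and options.
type objectStore interface {
	CopyObject(srcBucket, srcObject, dstBucket, dstObject string) error
	PutObject(bucket, object string) error
}

// copyDispatch mirrors the narrowed contract: only an in-place,
// metadata-only rewrite may use CopyObject; everything else must
// rewrite the object data through PutObject.
func copyDispatch(store objectStore, srcBucket, srcObject, dstBucket, dstObject string, metadataOnly bool) error {
	if metadataOnly && srcBucket == dstBucket && srcObject == dstObject {
		return store.CopyObject(srcBucket, srcObject, dstBucket, dstObject)
	}
	return store.PutObject(dstBucket, dstObject)
}
```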


@@ -533,11 +533,11 @@ func (z *xlZones) DeleteObjects(ctx context.Context, bucket string, objects []string)
 	return derrs, nil
 }

-func (z *xlZones) CopyObject(ctx context.Context, srcBucket, srcObject, destBucket, destObject string, srcInfo ObjectInfo, srcOpts, dstOpts ObjectOptions) (objInfo ObjectInfo, err error) {
+func (z *xlZones) CopyObject(ctx context.Context, srcBucket, srcObject, dstBucket, dstObject string, srcInfo ObjectInfo, srcOpts, dstOpts ObjectOptions) (objInfo ObjectInfo, err error) {
 	// Check if this request is only metadata update.
-	cpSrcDstSame := isStringEqual(pathJoin(srcBucket, srcObject), pathJoin(destBucket, destObject))
+	cpSrcDstSame := isStringEqual(pathJoin(srcBucket, srcObject), pathJoin(dstBucket, dstObject))
 	if !cpSrcDstSame {
-		lk := z.NewNSLock(ctx, destBucket, destObject)
+		lk := z.NewNSLock(ctx, dstBucket, dstObject)
 		if err := lk.GetLock(globalObjectTimeout); err != nil {
 			return objInfo, err
 		}
@@ -545,24 +545,30 @@ func (z *xlZones) CopyObject(ctx context.Context, srcBucket, srcObject, destBucket, destObject string, srcInfo ObjectInfo, srcOpts, dstOpts ObjectOptions) (objInfo ObjectInfo, err error) {
 	}

 	if z.SingleZone() {
-		return z.zones[0].CopyObject(ctx, srcBucket, srcObject, destBucket, destObject, srcInfo, srcOpts, dstOpts)
+		return z.zones[0].CopyObject(ctx, srcBucket, srcObject, dstBucket, dstObject, srcInfo, srcOpts, dstOpts)
 	}

-	if cpSrcDstSame && srcInfo.metadataOnly {
-		for _, zone := range z.zones {
-			objInfo, err = zone.CopyObject(ctx, srcBucket, srcObject, destBucket,
-				destObject, srcInfo, srcOpts, dstOpts)
-			if err != nil {
-				if isErrObjectNotFound(err) {
-					continue
-				}
-				return objInfo, err
-			}
-			return objInfo, nil
-		}
-		return objInfo, ObjectNotFound{Bucket: srcBucket, Object: srcObject}
-	}
-	return z.zones[z.getAvailableZoneIdx(ctx)].CopyObject(ctx, srcBucket, srcObject,
-		destBucket, destObject, srcInfo, srcOpts, dstOpts)
+	zoneIndex := -1
+	for i, zone := range z.zones {
+		objInfo, err := zone.GetObjectInfo(ctx, dstBucket, dstObject, srcOpts)
+		if err != nil {
+			if isErrObjectNotFound(err) {
+				continue
+			}
+			return objInfo, err
+		}
+		zoneIndex = i
+		break
+	}
+
+	putOpts := ObjectOptions{ServerSideEncryption: dstOpts.ServerSideEncryption, UserDefined: srcInfo.UserDefined}
+	if zoneIndex >= 0 {
+		if cpSrcDstSame && srcInfo.metadataOnly {
+			return z.zones[zoneIndex].CopyObject(ctx, srcBucket, srcObject, dstBucket, dstObject, srcInfo, srcOpts, dstOpts)
+		}
+		return z.zones[zoneIndex].PutObject(ctx, dstBucket, dstObject, srcInfo.PutObjReader, putOpts)
+	}
+	return z.zones[z.getAvailableZoneIdx(ctx)].PutObject(ctx, dstBucket, dstObject, srcInfo.PutObjReader, putOpts)
 }

 func (z *xlZones) ListObjectsV2(ctx context.Context, bucket, prefix, continuationToken, delimiter string, maxKeys int, fetchOwner bool, startAfter string) (ListObjectsV2Info, error) {
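This is the heart of the fix: before writing, the destination is located by probing each zone with GetObjectInfo, so a copy over an existing object always lands in the zone that already holds it, and only genuinely new objects fall back to the free-space heuristic (getAvailableZoneIdx). The decision logic restated as a standalone sketch with simplified types:

```go
package main

import (
	"errors"
	"fmt"
)

var errObjectNotFound = errors.New("object not found")

// zone is a toy stand-in for an erasure zone that can report whether
// it already holds a given object.
type zone struct {
	name    string
	objects map[string]bool
}

func (z *zone) getObjectInfo(object string) error {
	if z.objects[object] {
		return nil
	}
	return errObjectNotFound
}

// destinationZoneIdx mirrors the added lookup: the first zone that
// already holds the destination object wins; -1 means the object is
// new, and the caller picks a zone with available space instead.
func destinationZoneIdx(zones []*zone, object string) (int, error) {
	for i, z := range zones {
		if err := z.getObjectInfo(object); err != nil {
			if errors.Is(err, errObjectNotFound) {
				continue
			}
			return -1, err // any other error aborts the copy
		}
		return i, nil
	}
	return -1, nil
}

func main() {
	zones := []*zone{
		{name: "zone-0", objects: map[string]bool{}},
		{name: "zone-1", objects: map[string]bool{"bucket/object": true}},
	}
	idx, _ := destinationZoneIdx(zones, "bucket/object")
	fmt.Println(idx) // 1: the rewrite targets zone-1, so zone-0 never gets a duplicate
}
```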
@@ -1050,6 +1056,7 @@ func (z *xlZones) PutObjectPart(ctx context.Context, bucket, object, uploadID string, partID int, data *PutObjReader, opts ObjectOptions) (PartInfo, error) {
 	if z.SingleZone() {
 		return z.zones[0].PutObjectPart(ctx, bucket, object, uploadID, partID, data, opts)
 	}

 	for _, zone := range z.zones {
 		_, err := zone.GetMultipartInfo(ctx, bucket, object, uploadID, opts)
 		if err == nil {
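The same locate-before-write pattern closes the commit: PutObjectPart probes each zone for the upload ID via GetMultipartInfo and routes the part to the zone that owns the upload. The hunk is truncated here, so the sketch below restates only the visible routing idea with simplified types:

```go
package sketch

import "fmt"

// multipartZone is a simplified stand-in for a zone that tracks
// multipart uploads.
type multipartZone interface {
	GetMultipartInfo(bucket, object, uploadID string) error
	PutObjectPart(bucket, object, uploadID string, partID int, data []byte) error
}

// routePart writes the part to whichever zone knows the upload ID,
// instead of guessing a zone and stranding parts elsewhere.
func routePart(zones []multipartZone, bucket, object, uploadID string, partID int, data []byte) error {
	for _, z := range zones {
		if err := z.GetMultipartInfo(bucket, object, uploadID); err == nil {
			return z.PutObjectPart(bucket, object, uploadID, partID, data)
		}
	}
	return fmt.Errorf("upload id %q not found in any zone", uploadID)
}
```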