restrict multi object delete > 1000 objects (#13454)

AWS S3 returns error if > 1000 objects are sent
per MultiObject delete request, we should comply
no reason to not comply.
This commit is contained in:
Harshavardhana 2021-10-18 08:38:33 -07:00 committed by GitHub
parent 779060bc16
commit 44e4bdc6f4
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 18 additions and 9 deletions

View file

@ -39,7 +39,7 @@ const (
// RFC3339 a subset of the ISO8601 timestamp format. e.g 2014-04-29T18:30:38Z // RFC3339 a subset of the ISO8601 timestamp format. e.g 2014-04-29T18:30:38Z
iso8601TimeFormat = "2006-01-02T15:04:05.000Z" // Reply date format with nanosecond precision. iso8601TimeFormat = "2006-01-02T15:04:05.000Z" // Reply date format with nanosecond precision.
maxObjectList = 1000 // Limit number of objects in a listObjectsResponse/listObjectsVersionsResponse. maxObjectList = 1000 // Limit number of objects in a listObjectsResponse/listObjectsVersionsResponse.
maxDeleteList = 10000 // Limit number of objects deleted in a delete call. maxDeleteList = 1000 // Limit number of objects deleted in a delete call.
maxUploadsList = 10000 // Limit number of uploads in a listUploadsResponse. maxUploadsList = 10000 // Limit number of uploads in a listUploadsResponse.
maxPartsList = 10000 // Limit number of parts in a listPartsResponse. maxPartsList = 10000 // Limit number of parts in a listPartsResponse.
) )

View file

@ -442,8 +442,8 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter,
deleteObjectsFn = api.CacheAPI().DeleteObjects deleteObjectsFn = api.CacheAPI().DeleteObjects
} }
// Return Malformed XML as S3 spec if the list of objects is empty // Return Malformed XML as S3 spec if the number of objects is empty
if len(deleteObjects.Objects) == 0 { if len(deleteObjects.Objects) == 0 || len(deleteObjects.Objects) > maxDeleteList {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrMalformedXML), r.URL) writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrMalformedXML), r.URL)
return return
} }

View file

@ -20,7 +20,6 @@ package cmd
import ( import (
"context" "context"
"fmt" "fmt"
"math"
"strconv" "strconv"
"sync" "sync"
"time" "time"
@ -81,6 +80,10 @@ func (l *localLocker) canTakeLock(resources ...string) bool {
} }
func (l *localLocker) Lock(ctx context.Context, args dsync.LockArgs) (reply bool, err error) { func (l *localLocker) Lock(ctx context.Context, args dsync.LockArgs) (reply bool, err error) {
if len(args.Resources) > maxDeleteList {
return false, fmt.Errorf("internal error: localLocker.Lock called with more than %d resources", maxDeleteList)
}
l.mutex.Lock() l.mutex.Lock()
defer l.mutex.Unlock() defer l.mutex.Unlock()
@ -117,6 +120,10 @@ func formatUUID(s string, idx int) string {
} }
func (l *localLocker) Unlock(_ context.Context, args dsync.LockArgs) (reply bool, err error) { func (l *localLocker) Unlock(_ context.Context, args dsync.LockArgs) (reply bool, err error) {
if len(args.Resources) > maxDeleteList {
return false, fmt.Errorf("internal error: localLocker.Unlock called with more than %d resources", maxDeleteList)
}
l.mutex.Lock() l.mutex.Lock()
defer l.mutex.Unlock() defer l.mutex.Unlock()
err = nil err = nil
@ -265,7 +272,7 @@ func (l *localLocker) ForceUnlock(ctx context.Context, args dsync.LockArgs) (rep
lris, ok := l.lockMap[resource] lris, ok := l.lockMap[resource]
if !ok { if !ok {
// Just to be safe, delete uuids. // Just to be safe, delete uuids.
for idx := 0; idx < math.MaxInt32; idx++ { for idx := 0; idx < maxDeleteList; idx++ {
mapID := formatUUID(uid, idx) mapID := formatUUID(uid, idx)
if _, ok := l.lockUID[mapID]; !ok { if _, ok := l.lockUID[mapID]; !ok {
break break
@ -282,14 +289,15 @@ func (l *localLocker) ForceUnlock(ctx context.Context, args dsync.LockArgs) (rep
idx := 0 idx := 0
for { for {
resource, ok := l.lockUID[formatUUID(args.UID, idx)] mapID := formatUUID(args.UID, idx)
resource, ok := l.lockUID[mapID]
if !ok { if !ok {
return idx > 0, nil return idx > 0, nil
} }
lris, ok := l.lockMap[resource] lris, ok := l.lockMap[resource]
if !ok { if !ok {
// Unexpected inconsistency, delete. // Unexpected inconsistency, delete.
delete(l.lockUID, formatUUID(args.UID, idx)) delete(l.lockUID, mapID)
idx++ idx++
continue continue
} }
@ -315,10 +323,11 @@ func (l *localLocker) Refresh(ctx context.Context, args dsync.LockArgs) (refresh
} }
idx := 0 idx := 0
for { for {
mapID := formatUUID(args.UID, idx)
lris, ok := l.lockMap[resource] lris, ok := l.lockMap[resource]
if !ok { if !ok {
// Inconsistent. Delete UID. // Inconsistent. Delete UID.
delete(l.lockUID, formatUUID(args.UID, idx)) delete(l.lockUID, mapID)
return idx > 0, nil return idx > 0, nil
} }
for i := range lris { for i := range lris {
@ -327,7 +336,7 @@ func (l *localLocker) Refresh(ctx context.Context, args dsync.LockArgs) (refresh
} }
} }
idx++ idx++
resource, ok = l.lockUID[formatUUID(args.UID, idx)] resource, ok = l.lockUID[mapID]
if !ok { if !ok {
// No more resources for UID, but we did update at least one. // No more resources for UID, but we did update at least one.
return true, nil return true, nil