From 82c3656f7995df958feb3381f697adbe25f06215 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Thu, 7 May 2015 22:43:19 -0700 Subject: [PATCH] Reply back CompleteMultipartUploadResult properly with final ETag computed - Now s3 libraries and also objectstorage-go work properly --- pkg/api/api_definitions.go | 16 +++++++-- pkg/api/api_object_handlers.go | 50 ++++++++++++++++++++-------- pkg/api/api_response.go | 9 +++++ pkg/api/errors.go | 8 ++++- pkg/api/utils.go | 16 +++++++-- pkg/storage/drivers/donut/donut.go | 5 +-- pkg/storage/drivers/driver.go | 2 +- pkg/storage/drivers/memory/memory.go | 27 +++++++++------ pkg/storage/drivers/mocks/Driver.go | 7 ++-- 9 files changed, 104 insertions(+), 36 deletions(-) diff --git a/pkg/api/api_definitions.go b/pkg/api/api_definitions.go index aeba5ab5e..f4ff4ba2a 100644 --- a/pkg/api/api_definitions.go +++ b/pkg/api/api_definitions.go @@ -95,7 +95,7 @@ type Owner struct { DisplayName string } -// InitiateMultipartUploadResult - Returns upload id to use +// InitiateMultipartUploadResult container for InitiateMultiPartUpload response, provides uploadID to start MultiPart upload type InitiateMultipartUploadResult struct { XMLName xml.Name `xml:"http://doc.s3.amazonaws.com/2006-03-01 InitiateMultipartUploadResult" json:"-"` @@ -104,12 +104,22 @@ type InitiateMultipartUploadResult struct { UploadID string `xml:"UploadId"` } -// CompleteMultipartUpload - Construct object from uploaded parts +// CompleteMultipartUpload container for completing multipart upload type CompleteMultipartUpload struct { Part []Part } -// Part - Description of a multipart part +// CompleteMultipartUploadResult container for completed multipart upload response +type CompleteMultipartUploadResult struct { + XMLName xml.Name `xml:"http://doc.s3.amazonaws.com/2006-03-01 CompleteMultipartUploadResult" json:"-"` + + Location string + Bucket string + Key string + ETag string +} + +// Part description of a multipart part type Part struct { PartNumber int ETag string diff --git a/pkg/api/api_object_handlers.go b/pkg/api/api_object_handlers.go index 546caf4c0..0d161a08c 100644 --- a/pkg/api/api_object_handlers.go +++ b/pkg/api/api_object_handlers.go @@ -21,6 +21,7 @@ import ( "strconv" "encoding/xml" + "github.com/gorilla/mux" "github.com/minio-io/minio/pkg/iodine" "github.com/minio-io/minio/pkg/storage/drivers" @@ -173,8 +174,11 @@ func (server *minioAPI) putObjectHandler(w http.ResponseWriter, req *http.Reques writeErrorResponse(w, req, EntityTooSmall, acceptsContentType, req.URL.Path) return } - // ignoring error here, TODO find a way to reply back if we can - sizeInt64, _ := strconv.ParseInt(size, 10, 64) + sizeInt64, err := strconv.ParseInt(size, 10, 64) + if err != nil { + writeErrorResponse(w, req, InvalidRequest, acceptsContentType, req.URL.Path) + return + } calculatedMD5, err := server.driver.CreateObject(bucket, object, "", md5, sizeInt64, req.Body) switch err := iodine.ToError(err).(type) { case nil: @@ -271,19 +275,24 @@ func (server *minioAPI) putObjectPartHandler(w http.ResponseWriter, req *http.Re writeErrorResponse(w, req, MissingContentLength, acceptsContentType, req.URL.Path) return } - /// maximum Upload size for objects in a single operation + /// maximum Upload size for multipart objects in a single operation if isMaxObjectSize(size) { writeErrorResponse(w, req, EntityTooLarge, acceptsContentType, req.URL.Path) return } - /// minimum Upload size for objects in a single operation - if isMinObjectSize(size) { - writeErrorResponse(w, req, EntityTooSmall, acceptsContentType, req.URL.Path) + // last part can be less than < 5MB so we need to figure out a way to handle it first + // and then enable below code (y4m4) + // + /// minimum Upload size for multipart objects in a single operation + // if isMinMultipartObjectSize(size) { + // writeErrorResponse(w, req, EntityTooSmall, acceptsContentType, req.URL.Path) + // return + // } + sizeInt64, err := strconv.ParseInt(size, 10, 64) + if err != nil { + writeErrorResponse(w, req, InvalidRequest, acceptsContentType, req.URL.Path) return } - // ignoring error here, TODO find a way to reply back if we can - sizeInt64, _ := strconv.ParseInt(size, 10, 64) - var object, bucket string vars := mux.Vars(req) bucket = vars["bucket"] @@ -292,8 +301,7 @@ func (server *minioAPI) putObjectPartHandler(w http.ResponseWriter, req *http.Re partIDString := vars["partNumber"] partID, err := strconv.Atoi(partIDString) if err != nil { - // TODO find the write value for this error - writeErrorResponse(w, req, NotAcceptable, acceptsContentType, req.URL.Path) + writeErrorResponse(w, req, InvalidPart, acceptsContentType, req.URL.Path) } calculatedMD5, err := server.driver.CreateObjectPart(bucket, object, uploadID, partID, "", md5, sizeInt64, req.Body) switch err := iodine.ToError(err).(type) { @@ -345,6 +353,7 @@ func (server *minioAPI) completeMultipartUploadHandler(w http.ResponseWriter, re if err != nil { log.Error.Println(err) writeErrorResponse(w, req, InternalError, acceptsContentType, req.URL.Path) + return } partMap := make(map[int]string) @@ -357,8 +366,23 @@ func (server *minioAPI) completeMultipartUploadHandler(w http.ResponseWriter, re for _, part := range parts.Part { partMap[part.PartNumber] = part.ETag } - - err = server.driver.CompleteMultipartUpload(bucket, object, uploadID, partMap) + etag, err := server.driver.CompleteMultipartUpload(bucket, object, uploadID, partMap) + switch err := iodine.ToError(err).(type) { + case nil: + response := generateCompleteMultpartUploadResult(bucket, object, "", etag) + encodedSuccessResponse := encodeSuccessResponse(response, acceptsContentType) + // write headers + setCommonHeaders(w, getContentTypeString(acceptsContentType)) + // set content-length to the size of the body + w.Header().Set("Content-Length", strconv.Itoa(len(encodedSuccessResponse))) + w.WriteHeader(http.StatusOK) + // write body + w.Write(encodedSuccessResponse) + default: + // TODO handle all other errors, properly + log.Println(err) + writeErrorResponse(w, req, InternalError, acceptsContentType, req.URL.Path) + } } func (server *minioAPI) notImplementedHandler(w http.ResponseWriter, req *http.Request) { diff --git a/pkg/api/api_response.go b/pkg/api/api_response.go index 09bfccd03..245c29537 100644 --- a/pkg/api/api_response.go +++ b/pkg/api/api_response.go @@ -120,6 +120,15 @@ func generateInitiateMultipartUploadResult(bucket, key, uploadID string) Initiat } } +func generateCompleteMultpartUploadResult(bucket, key, location, etag string) CompleteMultipartUploadResult { + return CompleteMultipartUploadResult{ + Location: location, + Bucket: bucket, + Key: key, + ETag: etag, + } +} + // writeSuccessResponse - write success headers func writeSuccessResponse(w http.ResponseWriter, acceptsContentType contentType) { setCommonHeaders(w, getContentTypeString(acceptsContentType)) diff --git a/pkg/api/errors.go b/pkg/api/errors.go index df72cba8c..6a9b2e924 100644 --- a/pkg/api/errors.go +++ b/pkg/api/errors.go @@ -63,11 +63,12 @@ const ( SignatureDoesNotMatch TooManyBuckets MethodNotAllowed + InvalidPart ) // Error codes, non exhaustive list - standard HTTP errors const ( - NotAcceptable = iota + 23 + NotAcceptable = iota + 24 ) // Error code to Error structure map @@ -187,6 +188,11 @@ var errorCodeResponse = map[int]Error{ Description: "The requested resource is only capable of generating content not acceptable according to the Accept headers sent in the request.", HTTPStatusCode: http.StatusNotAcceptable, }, + InvalidPart: { + Code: "InvalidPart", + Description: "One or more of the specified parts could not be found", + HTTPStatusCode: http.StatusBadRequest, + }, } // errorCodeError provides errorCode to Error. It returns empty if the code provided is unknown diff --git a/pkg/api/utils.go b/pkg/api/utils.go index fda6b9b06..12e4a5459 100644 --- a/pkg/api/utils.go +++ b/pkg/api/utils.go @@ -35,11 +35,11 @@ func isValidMD5(md5 string) bool { } /// http://docs.aws.amazon.com/AmazonS3/latest/dev/UploadingObjects.html - -// these should be configurable? const ( // maximum object size per PUT request is 5GB maxObjectSize = 1024 * 1024 * 1024 * 5 + // mimimum object size per Multipart PUT request is 5MB + minMultiPartObjectSize = 1024 * 1024 * 5 // minimum object size per PUT request is 1B minObjectSize = 1 ) @@ -67,3 +67,15 @@ func isMinObjectSize(size string) bool { } return false } + +// isMinMultipartObjectSize - verify if the uploaded multipart is of minimum size +func isMinMultipartObjectSize(size string) bool { + i, err := strconv.ParseInt(size, 10, 64) + if err != nil { + return true + } + if i < minMultiPartObjectSize { + return true + } + return false +} diff --git a/pkg/storage/drivers/donut/donut.go b/pkg/storage/drivers/donut/donut.go index c6de90b59..79d77121b 100644 --- a/pkg/storage/drivers/donut/donut.go +++ b/pkg/storage/drivers/donut/donut.go @@ -30,6 +30,7 @@ import ( "io/ioutil" "errors" + "github.com/minio-io/minio/pkg/iodine" "github.com/minio-io/minio/pkg/storage/donut" "github.com/minio-io/minio/pkg/storage/drivers" @@ -407,6 +408,6 @@ func (d donutDriver) CreateObjectPart(bucket, key, uploadID string, partID int, return "", iodine.New(errors.New("Not Implemented"), nil) } -func (d donutDriver) CompleteMultipartUpload(bucket, key, uploadID string, parts map[int]string) error { - return iodine.New(errors.New("Not Implemented"), nil) +func (d donutDriver) CompleteMultipartUpload(bucket, key, uploadID string, parts map[int]string) (string, error) { + return "", iodine.New(errors.New("Not Implemented"), nil) } diff --git a/pkg/storage/drivers/driver.go b/pkg/storage/drivers/driver.go index e118c4c25..602bdb7fe 100644 --- a/pkg/storage/drivers/driver.go +++ b/pkg/storage/drivers/driver.go @@ -42,7 +42,7 @@ type Driver interface { // Object Multipart Operations NewMultipartUpload(bucket string, key string, contentType string) (string, error) CreateObjectPart(bucket string, key string, uploadID string, partID int, contentType string, md5sum string, size int64, data io.Reader) (string, error) - CompleteMultipartUpload(bucket string, key string, uploadID string, parts map[int]string) error + CompleteMultipartUpload(bucket string, key string, uploadID string, parts map[int]string) (string, error) } // BucketACL - bucket level access control diff --git a/pkg/storage/drivers/memory/memory.go b/pkg/storage/drivers/memory/memory.go index e66179bc1..ce0a68850 100644 --- a/pkg/storage/drivers/memory/memory.go +++ b/pkg/storage/drivers/memory/memory.go @@ -195,10 +195,10 @@ func isMD5SumEqual(expectedMD5Sum, actualMD5Sum string) error { } func (memory *memoryDriver) CreateObject(bucket, key, contentType, expectedMD5Sum string, size int64, data io.Reader) (string, error) { - humanReadableErr, err := memory.createObject(bucket, key, contentType, expectedMD5Sum, size, data) + md5sum, err := memory.createObject(bucket, key, contentType, expectedMD5Sum, size, data) // free debug.FreeOSMemory() - return humanReadableErr, iodine.New(err, nil) + return md5sum, iodine.New(err, nil) } // getMD5AndData - this is written as a wrapper to capture md5sum and data in a more memory efficient way @@ -497,15 +497,18 @@ func (memory *memoryDriver) GetObjectMetadata(bucket, key, prefix string) (drive func (memory *memoryDriver) evictObject(a ...interface{}) { cacheStats := memory.objects.Stats() - log.Printf("CurrenSize: %d, CurrentItems: %d, TotalEvictions: %d", + log.Printf("CurrentSize: %d, CurrentItems: %d, TotalEvictions: %d", cacheStats.Bytes, cacheStats.Items, cacheStats.Evictions) key := a[0].(string) // loop through all buckets - for bucket, storedBucket := range memory.storedBuckets { + for _, storedBucket := range memory.storedBuckets { delete(storedBucket.objectMetadata, key) // remove bucket if no objects found anymore if len(storedBucket.objectMetadata) == 0 { - delete(memory.storedBuckets, bucket) + // TODO (y4m4) + // for now refrain from deleting buckets, due to multipart deletes before fullobject being written + // this case gets trigerred and we can't store the actual data at all receiving 404 on the client + // delete(memory.storedBuckets, bucket) } } debug.FreeOSMemory() @@ -531,7 +534,7 @@ func (memory *memoryDriver) CreateObjectPart(bucket, key, uploadID string, partI return memory.CreateObject(bucket, getMultipartKey(key, uploadID, partID), "", expectedMD5Sum, size, data) } -func (memory *memoryDriver) CompleteMultipartUpload(bucket, key, uploadID string, parts map[int]string) error { +func (memory *memoryDriver) CompleteMultipartUpload(bucket, key, uploadID string, parts map[int]string) (string, error) { // TODO verify upload id exists memory.lock.Lock() size := int64(0) @@ -542,7 +545,7 @@ func (memory *memoryDriver) CompleteMultipartUpload(bucket, key, uploadID string } } else { memory.lock.Unlock() - return iodine.New(errors.New("missing part: "+strconv.Itoa(i)), nil) + return "", iodine.New(errors.New("missing part: "+strconv.Itoa(i)), nil) } } @@ -552,13 +555,16 @@ func (memory *memoryDriver) CompleteMultipartUpload(bucket, key, uploadID string if _, ok := parts[i]; ok { if object, ok := memory.objects.Get(bucket + "/" + getMultipartKey(key, uploadID, i)); ok == true { obj := object.([]byte) - io.Copy(&fullObject, bytes.NewBuffer(obj)) + _, err := io.Copy(&fullObject, bytes.NewBuffer(obj)) + if err != nil { + return "", iodine.New(err, nil) + } } else { log.Println("Cannot fetch: ", getMultipartKey(key, uploadID, i)) } } else { memory.lock.Unlock() - return iodine.New(errors.New("missing part: "+strconv.Itoa(i)), nil) + return "", iodine.New(errors.New("missing part: "+strconv.Itoa(i)), nil) } } @@ -573,6 +579,5 @@ func (memory *memoryDriver) CompleteMultipartUpload(bucket, key, uploadID string md5sumSlice := md5.Sum(fullObject.Bytes()) md5sum := base64.StdEncoding.EncodeToString(md5sumSlice[:]) - _, err := memory.CreateObject(bucket, key, "", md5sum, size, &fullObject) - return err + return memory.CreateObject(bucket, key, "", md5sum, size, &fullObject) } diff --git a/pkg/storage/drivers/mocks/Driver.go b/pkg/storage/drivers/mocks/Driver.go index d5cd0a02f..b3fc46c7b 100644 --- a/pkg/storage/drivers/mocks/Driver.go +++ b/pkg/storage/drivers/mocks/Driver.go @@ -146,10 +146,11 @@ func (m *Driver) CreateObjectPart(bucket string, key string, uploadID string, pa } // CompleteMultipartUpload is a mock -func (m *Driver) CompleteMultipartUpload(bucket string, key string, uploadID string, parts map[int]string) error { +func (m *Driver) CompleteMultipartUpload(bucket string, key string, uploadID string, parts map[int]string) (string, error) { ret := m.Called(bucket, key, uploadID, parts) - r0 := ret.Error(0) + r0 := ret.Get(0).(string) + r1 := ret.Error(1) - return r0 + return r0, r1 }