From 498ce1e9bb592876e11a90a13245da7ad9617b0d Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Sat, 14 May 2016 17:18:00 -0700 Subject: [PATCH] handler: Add a waitgroup to avoid expect100Continue crash. (#1623) This waitgroup allows for safe blocking operation where we can cleanly control the flow of the writes and the underlying pipe altogether. Fixes #1553 --- object-handlers.go | 58 ++++++++++++++++++++++++++++++++-------------- server_test.go | 10 ++++---- server_xl_test.go | 8 +++---- 3 files changed, 50 insertions(+), 26 deletions(-) diff --git a/object-handlers.go b/object-handlers.go index 4cadb90c0..d5748d1dc 100644 --- a/object-handlers.go +++ b/object-handlers.go @@ -28,6 +28,7 @@ import ( "sort" "strconv" "strings" + "sync" "time" mux "github.com/gorilla/mux" @@ -570,14 +571,20 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req case authTypePresigned, authTypeSigned: // Initialize a pipe for data pipe line. reader, writer := io.Pipe() - + var wg = &sync.WaitGroup{} // Start writing in a routine. + wg.Add(1) go func() { + defer wg.Done() shaWriter := sha256.New() multiWriter := io.MultiWriter(shaWriter, writer) - if _, cerr := io.CopyN(multiWriter, r.Body, size); cerr != nil { - errorIf(cerr, "Unable to read HTTP body.", nil) - writer.CloseWithError(err) + if _, wErr := io.CopyN(multiWriter, r.Body, size); wErr != nil { + // Pipe closed. + if wErr == io.ErrClosedPipe { + return + } + errorIf(wErr, "Unable to read HTTP body.", nil) + writer.CloseWithError(wErr) return } shaPayload := shaWriter.Sum(nil) @@ -588,15 +595,16 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req } else if isRequestPresignedSignatureV4(r) { s3Error = doesPresignedSignatureMatch(hex.EncodeToString(shaPayload), r, validateRegion) } + var sErr error if s3Error != ErrNone { if s3Error == ErrSignatureDoesNotMatch { - writer.CloseWithError(errSignatureMismatch) - return + sErr = errSignatureMismatch + } else { + sErr = fmt.Errorf("%v", getAPIError(s3Error)) } - writer.CloseWithError(fmt.Errorf("%v", getAPIError(s3Error))) + writer.CloseWithError(sErr) return } - // Close the writer. writer.Close() }() @@ -606,6 +614,10 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req metadata["md5Sum"] = hex.EncodeToString(md5Bytes) // Create object. md5Sum, err = api.ObjectAPI.PutObject(bucket, object, size, reader, metadata) + // Close the pipe. + reader.Close() + // Wait for all the routines to finish. + wg.Wait() } if err != nil { errorIf(err, "PutObject failed.", nil) @@ -710,22 +722,29 @@ func (api objectAPIHandlers) PutObjectPartHandler(w http.ResponseWriter, r *http } // No need to verify signature, anonymous request access is // already allowed. - partMD5, err = api.ObjectAPI.PutObjectPart(bucket, object, uploadID, partID, size, r.Body, hex.EncodeToString(md5Bytes)) + hexMD5 := hex.EncodeToString(md5Bytes) + partMD5, err = api.ObjectAPI.PutObjectPart(bucket, object, uploadID, partID, size, r.Body, hexMD5) case authTypePresigned, authTypeSigned: - validateRegion := true // Validate region. // Initialize a pipe for data pipe line. reader, writer := io.Pipe() - + var wg = &sync.WaitGroup{} // Start writing in a routine. + wg.Add(1) go func() { + defer wg.Done() shaWriter := sha256.New() multiWriter := io.MultiWriter(shaWriter, writer) - if _, err = io.CopyN(multiWriter, r.Body, size); err != nil { - errorIf(err, "Unable to read HTTP body.", nil) - writer.CloseWithError(err) + if _, wErr := io.CopyN(multiWriter, r.Body, size); wErr != nil { + // Pipe closed, just ignore it. + if wErr == io.ErrClosedPipe { + return + } + errorIf(wErr, "Unable to read HTTP body.", nil) + writer.CloseWithError(wErr) return } shaPayload := shaWriter.Sum(nil) + validateRegion := true // Validate region. var s3Error APIErrorCode if isRequestSignatureV4(r) { s3Error = doesSignatureMatch(hex.EncodeToString(shaPayload), r, validateRegion) @@ -734,10 +753,11 @@ func (api objectAPIHandlers) PutObjectPartHandler(w http.ResponseWriter, r *http } if s3Error != ErrNone { if s3Error == ErrSignatureDoesNotMatch { - writer.CloseWithError(errSignatureMismatch) - return + err = errSignatureMismatch + } else { + err = fmt.Errorf("%v", getAPIError(s3Error)) } - writer.CloseWithError(fmt.Errorf("%v", getAPIError(s3Error))) + writer.CloseWithError(err) return } // Close the writer. @@ -745,6 +765,10 @@ func (api objectAPIHandlers) PutObjectPartHandler(w http.ResponseWriter, r *http }() md5SumHex := hex.EncodeToString(md5Bytes) partMD5, err = api.ObjectAPI.PutObjectPart(bucket, object, uploadID, partID, size, reader, md5SumHex) + // Close the pipe. + reader.Close() + // Wait for all the routines to finish. + wg.Wait() } if err != nil { errorIf(err, "PutObjectPart failed.", nil) diff --git a/server_test.go b/server_test.go index 4f50649dc..e7c6fba3b 100644 --- a/server_test.go +++ b/server_test.go @@ -376,7 +376,7 @@ func (s *MyAPISuite) TestDeleteObject(c *C) { c.Assert(err, IsNil) c.Assert(response.StatusCode, Equals, http.StatusNoContent) - // Delete non existant object should return http.StatusNoContent. + // Delete non existent object should return http.StatusNoContent. request, err = s.newRequest("DELETE", testAPIFSCacheServer.URL+"/deletebucketobject/myobject1", 0, nil) c.Assert(err, IsNil) client = http.Client{} @@ -385,8 +385,8 @@ func (s *MyAPISuite) TestDeleteObject(c *C) { c.Assert(response.StatusCode, Equals, http.StatusNoContent) } -func (s *MyAPISuite) TestNonExistantBucket(c *C) { - request, err := s.newRequest("HEAD", testAPIFSCacheServer.URL+"/nonexistantbucket", 0, nil) +func (s *MyAPISuite) TestNonExistentBucket(c *C) { + request, err := s.newRequest("HEAD", testAPIFSCacheServer.URL+"/nonexistentbucket", 0, nil) c.Assert(err, IsNil) client := http.Client{} @@ -686,9 +686,9 @@ func (s *MyAPISuite) TestListBuckets(c *C) { c.Assert(err, IsNil) } -func (s *MyAPISuite) TestNotBeAbleToCreateObjectInNonexistantBucket(c *C) { +func (s *MyAPISuite) TestNotBeAbleToCreateObjectInNonexistentBucket(c *C) { buffer1 := bytes.NewReader([]byte("hello world")) - request, err := s.newRequest("PUT", testAPIFSCacheServer.URL+"/innonexistantbucket/object", int64(buffer1.Len()), buffer1) + request, err := s.newRequest("PUT", testAPIFSCacheServer.URL+"/innonexistentbucket/object", int64(buffer1.Len()), buffer1) c.Assert(err, IsNil) client := http.Client{} diff --git a/server_xl_test.go b/server_xl_test.go index 0be2a9c8d..6e5392ea9 100644 --- a/server_xl_test.go +++ b/server_xl_test.go @@ -393,8 +393,8 @@ func (s *MyAPIXLSuite) TestDeleteObject(c *C) { c.Assert(response.StatusCode, Equals, http.StatusNoContent) } -func (s *MyAPIXLSuite) TestNonExistantBucket(c *C) { - request, err := s.newRequest("HEAD", testAPIXLServer.URL+"/nonexistantbucket", 0, nil) +func (s *MyAPIXLSuite) TestNonExistentBucket(c *C) { + request, err := s.newRequest("HEAD", testAPIXLServer.URL+"/nonexistentbucket", 0, nil) c.Assert(err, IsNil) client := http.Client{} @@ -713,9 +713,9 @@ func (s *MyAPIXLSuite) TestListBuckets(c *C) { c.Assert(err, IsNil) } -func (s *MyAPIXLSuite) TestNotBeAbleToCreateObjectInNonexistantBucket(c *C) { +func (s *MyAPIXLSuite) TestNotBeAbleToCreateObjectInNonexistentBucket(c *C) { buffer1 := bytes.NewReader([]byte("hello world")) - request, err := s.newRequest("PUT", testAPIXLServer.URL+"/innonexistantbucket/object", int64(buffer1.Len()), buffer1) + request, err := s.newRequest("PUT", testAPIXLServer.URL+"/innonexistentbucket/object", int64(buffer1.Len()), buffer1) c.Assert(err, IsNil) client := http.Client{}