From 3109909355c99fdabbcd0f1133cbd499ec0ad309 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Mon, 29 Jun 2015 12:23:48 -0700 Subject: [PATCH] Handle couple of cases of OOM conditions, move caching to GetObject() rather than PutObject() --- pkg/storage/drivers/donut/donut.go | 40 +++++++++++--------------- pkg/storage/drivers/donut/multipart.go | 13 ++++++--- 2 files changed, 25 insertions(+), 28 deletions(-) diff --git a/pkg/storage/drivers/donut/donut.go b/pkg/storage/drivers/donut/donut.go index a38c61867..00660daf7 100644 --- a/pkg/storage/drivers/donut/donut.go +++ b/pkg/storage/drivers/donut/donut.go @@ -325,10 +325,16 @@ func (d donutDriver) GetObject(w io.Writer, bucketName, objectName string) (int6 return 0, iodine.New(drivers.InternalError{}, nil) } } - n, err := io.CopyN(w, reader, size) + pw := newProxyWriter(w) + n, err := io.CopyN(pw, reader, size) if err != nil { return 0, iodine.New(err, nil) } + // Save in memory for future reads + d.objects.Set(objectKey, pw.writtenBytes) + // free up + pw.writtenBytes = nil + go debug.FreeOSMemory() return n, nil } written, err := io.Copy(w, bytes.NewBuffer(data)) @@ -492,31 +498,22 @@ func (d donutDriver) ListObjects(bucketName string, resources drivers.BucketReso return results, resources, nil } -type proxyReader struct { - reader io.Reader - readBytes []byte +type proxyWriter struct { + writer io.Writer + writtenBytes []byte } -func (r *proxyReader) free(p []byte) { - p = nil - go debug.FreeOSMemory() -} -func (r *proxyReader) Read(p []byte) (n int, err error) { - defer r.free(p) - n, err = r.reader.Read(p) - if err == io.EOF || err == io.ErrUnexpectedEOF { - r.readBytes = append(r.readBytes, p[0:n]...) - return - } +func (r *proxyWriter) Write(p []byte) (n int, err error) { + n, err = r.writer.Write(p) if err != nil { return } - r.readBytes = append(r.readBytes, p[0:n]...) + r.writtenBytes = append(r.writtenBytes, p[0:n]...) return } -func newProxyReader(r io.Reader) *proxyReader { - return &proxyReader{reader: r, readBytes: nil} +func newProxyWriter(w io.Writer) *proxyWriter { + return &proxyWriter{writer: w, writtenBytes: nil} } // CreateObject creates a new object @@ -565,8 +562,7 @@ func (d donutDriver) CreateObject(bucketName, objectName, contentType, expectedM } expectedMD5Sum = hex.EncodeToString(expectedMD5SumBytes) } - newReader := newProxyReader(reader) - objMetadata, err := d.donut.PutObject(bucketName, objectName, expectedMD5Sum, newReader, metadata) + objMetadata, err := d.donut.PutObject(bucketName, objectName, expectedMD5Sum, reader, metadata) if err != nil { switch iodine.ToError(err).(type) { case donut.BadDigest: @@ -574,10 +570,6 @@ func (d donutDriver) CreateObject(bucketName, objectName, contentType, expectedM } return "", iodine.New(err, errParams) } - d.objects.Set(objectKey, newReader.readBytes) - // free up - newReader.readBytes = nil - go debug.FreeOSMemory() newObject := drivers.ObjectMetadata{ Bucket: bucketName, Key: objectName, diff --git a/pkg/storage/drivers/donut/multipart.go b/pkg/storage/drivers/donut/multipart.go index 1b75a9fbe..2fe388dc3 100644 --- a/pkg/storage/drivers/donut/multipart.go +++ b/pkg/storage/drivers/donut/multipart.go @@ -193,6 +193,7 @@ func (d donutDriver) createObjectPart(bucketName, objectName, uploadID string, p d.lock.Unlock() // setting up for de-allocation readBytes = nil + go debug.FreeOSMemory() md5Sum := hex.EncodeToString(md5SumBytes) // Verify if the written object is equal to what is expected, only if it is requested as such @@ -258,6 +259,7 @@ func (d donutDriver) CompleteMultipartUpload(bucketName, objectName, uploadID st d.lock.Lock() var size int64 + fullHasher := md5.New() var fullObject bytes.Buffer for i := 1; i <= len(parts); i++ { recvMD5 := parts[i] @@ -280,7 +282,8 @@ func (d donutDriver) CompleteMultipartUpload(bucketName, objectName, uploadID st Key: getMultipartKey(objectName, uploadID, i), }, nil) } - _, err = io.Copy(&fullObject, bytes.NewBuffer(object)) + mw := io.MultiWriter(&fullObject, fullHasher) + _, err = io.Copy(mw, bytes.NewReader(object)) if err != nil { return "", iodine.New(err, nil) } @@ -289,9 +292,9 @@ func (d donutDriver) CompleteMultipartUpload(bucketName, objectName, uploadID st } d.lock.Unlock() - md5sumSlice := md5.Sum(fullObject.Bytes()) + md5sumSlice := fullHasher.Sum(nil) // this is needed for final verification inside CreateObject, do not convert this to hex - md5sum := base64.StdEncoding.EncodeToString(md5sumSlice[:]) + md5sum := base64.StdEncoding.EncodeToString(md5sumSlice) etag, err := d.CreateObject(bucketName, objectName, "", md5sum, size, &fullObject) if err != nil { // No need to call internal cleanup functions here, caller will call AbortMultipartUpload() @@ -299,6 +302,8 @@ func (d donutDriver) CompleteMultipartUpload(bucketName, objectName, uploadID st return "", iodine.New(err, nil) } fullObject.Reset() + go debug.FreeOSMemory() + d.cleanupMultiparts(bucketName, objectName, uploadID) d.cleanupMultipartSession(bucketName, objectName, uploadID) return etag, nil @@ -421,5 +426,5 @@ func (d donutDriver) expiredPart(a ...interface{}) { for _, storedBucket := range d.storedBuckets { delete(storedBucket.partMetadata, key) } - debug.FreeOSMemory() + go debug.FreeOSMemory() }