From 3b2486ebaf841df60070bc3ded80956af9b5497c Mon Sep 17 00:00:00 2001 From: Krishna Srinivas Date: Wed, 31 Jan 2018 13:17:24 -0800 Subject: [PATCH] Lock free multipart backend implementation for FS (#5401) --- cmd/format-fs.go | 78 +- cmd/format-fs_test.go | 4 +- cmd/fs-v1-background-append.go | 240 ------ cmd/fs-v1-helpers.go | 4 +- cmd/fs-v1-metadata.go | 70 -- cmd/fs-v1-multipart.go | 1157 ++++++++++------------------- cmd/fs-v1-multipart_test.go | 88 +-- cmd/fs-v1.go | 80 +- cmd/object-api-multipart_test.go | 22 +- cmd/object-handlers.go | 1 + cmd/xl-v1-multipart.go | 1 + pkg/ioutil/append-file_nix.go | 43 ++ pkg/ioutil/append-file_windows.go | 43 ++ pkg/ioutil/ioutil.go | 5 + pkg/ioutil/ioutil_test.go | 36 + pkg/lock/lock_nix.go | 5 + pkg/lock/lock_windows.go | 6 +- 17 files changed, 668 insertions(+), 1215 deletions(-) delete mode 100644 cmd/fs-v1-background-append.go create mode 100644 pkg/ioutil/append-file_nix.go create mode 100644 pkg/ioutil/append-file_windows.go diff --git a/cmd/format-fs.go b/cmd/format-fs.go index 63227a4cb..78f0873d7 100644 --- a/cmd/format-fs.go +++ b/cmd/format-fs.go @@ -21,6 +21,7 @@ import ( "fmt" "io" "os" + "path" "time" errors2 "github.com/minio/minio/pkg/errors" @@ -31,6 +32,7 @@ import ( const ( formatBackendFS = "fs" formatFSVersionV1 = "1" + formatFSVersionV2 = "2" ) // formatFSV1 - structure holds format version '1'. @@ -41,6 +43,12 @@ type formatFSV1 struct { } `json:"fs"` } +// formatFSV2 - structure is same as formatFSV1. But the multipart backend +// structure is flat instead of hierarchy now. +// In .minio.sys/multipart we have: +// sha256(bucket/object)/uploadID/[fs.json, 1.etag, 2.etag ....] +type formatFSV2 = formatFSV1 + // Used to detect the version of "fs" format. type formatFSVersionDetect struct { FS struct { @@ -48,7 +56,7 @@ type formatFSVersionDetect struct { } `json:"fs"` } -// Returns the latest "fs" format. +// Returns the latest "fs" format V1 func newFormatFSV1() (format *formatFSV1) { f := &formatFSV1{} f.Version = formatMetaVersionV1 @@ -57,6 +65,15 @@ func newFormatFSV1() (format *formatFSV1) { return f } +// Returns the latest "fs" format V2 +func newFormatFSV2() (format *formatFSV2) { + f := &formatFSV2{} + f.Version = formatMetaVersionV1 + f.Format = formatBackendFS + f.FS.Version = formatFSVersionV2 + return f +} + // Save to fs format.json func formatFSSave(f *os.File, data interface{}) error { b, err := json.Marshal(data) @@ -98,20 +115,57 @@ func formatFSGetVersion(r io.ReadSeeker) (string, error) { return format.FS.Version, nil } -// Migrate the "fs" backend. -// Migration should happen when formatFSV1.FS.Version changes. This version -// can change when there is a change to the struct formatFSV1.FS or if there -// is any change in the backend file system tree structure. -func formatFSMigrate(wlk *lock.LockedFile) error { - // Add any migration code here in case we bump format.FS.Version - - // Make sure that the version is what we expect after the migration. +// Migrate from V1 to V2. V2 implements new backend format for multipart +// uploads. Delete the previous multipart directory. 
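+// For illustration, the two layouts differ as follows (paths relative to the
+// export directory):
+//   V1: .minio.sys/multipart/bucket/object/uploads.json
+//       .minio.sys/multipart/bucket/object/uploadID/[fs.json, object1, object2, ...]
+//   V2: .minio.sys/multipart/sha256(bucket/object)/uploadID/[fs.json, 00001.etag, 00002.etag, ...]
+// The old hierarchy cannot be mapped onto the new one, so any in-progress
+// uploads are discarded by the migration.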
+func formatFSMigrateV1ToV2(wlk *lock.LockedFile, fsPath string) error { version, err := formatFSGetVersion(wlk) if err != nil { return err } + if version != formatFSVersionV1 { - return fmt.Errorf(`%s file: expected FS version: %s, found FS version: %s`, formatConfigFile, formatFSVersionV1, version) + return fmt.Errorf(`format.json version expected %s, found %s`, formatFSVersionV1, version) + } + + if err = fsRemoveAll(path.Join(fsPath, minioMetaMultipartBucket)); err != nil { + return err + } + + if err = os.MkdirAll(path.Join(fsPath, minioMetaMultipartBucket), 0755); err != nil { + return err + } + + return formatFSSave(wlk.File, newFormatFSV2()) +} + +// Migrate the "fs" backend. +// Migration should happen when formatFSV1.FS.Version changes. This version +// can change when there is a change to the struct formatFSV1.FS or if there +// is any change in the backend file system tree structure. +func formatFSMigrate(wlk *lock.LockedFile, fsPath string) error { + // Add any migration code here in case we bump format.FS.Version + version, err := formatFSGetVersion(wlk) + if err != nil { + return err + } + + switch version { + case formatFSVersionV1: + if err = formatFSMigrateV1ToV2(wlk, fsPath); err != nil { + return err + } + fallthrough + case formatFSVersionV2: + // We are at the latest version. + } + + // Make sure that the version is what we expect after the migration. + version, err = formatFSGetVersion(wlk) + if err != nil { + return err + } + if version != formatFSVersionV2 { + return fmt.Errorf(`%s file: expected FS version: %s, found FS version: %s`, formatConfigFile, formatFSVersionV2, version) } return nil } @@ -196,7 +250,7 @@ func initFormatFS(fsPath string) (rlk *lock.RLockedFile, err error) { if err != nil { return nil, err } - if version != formatFSVersionV1 { + if version != formatFSVersionV2 { // Format needs migration rlk.Close() // Hold write lock during migration so that we do not disturb any @@ -211,7 +265,7 @@ func initFormatFS(fsPath string) (rlk *lock.RLockedFile, err error) { if err != nil { return nil, err } - err = formatFSMigrate(wlk) + err = formatFSMigrate(wlk, fsPath) wlk.Close() if err != nil { // Migration failed, bail out so that the user can observe what happened. diff --git a/cmd/format-fs_test.go b/cmd/format-fs_test.go index 213deb2e5..e0d28c382 100644 --- a/cmd/format-fs_test.go +++ b/cmd/format-fs_test.go @@ -62,8 +62,8 @@ func TestFSFormatFS(t *testing.T) { if err != nil { t.Fatal(err) } - if version != formatFSVersionV1 { - t.Fatalf(`expected: %s, got: %s`, formatFSVersionV1, version) + if version != formatFSVersionV2 { + t.Fatalf(`expected: %s, got: %s`, formatFSVersionV2, version) } // Corrupt the format.json file and test the functions. diff --git a/cmd/fs-v1-background-append.go b/cmd/fs-v1-background-append.go deleted file mode 100644 index 7ccc2b105..000000000 --- a/cmd/fs-v1-background-append.go +++ /dev/null @@ -1,240 +0,0 @@ -/* - * Minio Cloud Storage, (C) 2016 Minio, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package cmd - -import ( - "errors" - "io" - "os" - "reflect" - "sync" - "time" -) - -// Error sent by appendParts go-routine when there are holes in parts. -// For ex. let's say client uploads part-2 before part-1 in which case we -// can not append and have to wait till part-1 is uploaded. Hence we return -// this error. Currently this error is not used in the caller. -var errPartsMissing = errors.New("required parts missing") - -// Error sent when appendParts go-routine has waited long enough and timedout. -var errAppendPartsTimeout = errors.New("appendParts go-routine timeout") - -// Timeout value for the appendParts go-routine. -var appendPartsTimeout = 24 * 60 * 60 * time.Second // 24 Hours. - -// Holds a map of uploadID->appendParts go-routine -type backgroundAppend struct { - sync.Mutex - infoMap map[string]bgAppendPartsInfo - appendFile io.WriteCloser -} - -// Input to the appendParts go-routine -type bgAppendPartsInput struct { - meta fsMetaV1 // list of parts that need to be appended - errCh chan error // error sent by appendParts go-routine -} - -// Identifies an appendParts go-routine. -type bgAppendPartsInfo struct { - inputCh chan bgAppendPartsInput - timeoutCh chan struct{} // closed by appendParts go-routine when it timesout - abortCh chan struct{} // closed after abort of upload to end the appendParts go-routine - completeCh chan struct{} // closed after complete of upload to end the appendParts go-routine -} - -// Called after a part is uploaded so that it can be appended in the background. -func (fs fsObjects) append(bucket, object, uploadID string, meta fsMetaV1) chan error { - fs.bgAppend.Lock() - info, ok := fs.bgAppend.infoMap[uploadID] - if !ok { - // Corresponding appendParts go-routine was not found, create a new one. Would happen when the first - // part of a multipart upload is uploaded. - inputCh := make(chan bgAppendPartsInput) - timeoutCh := make(chan struct{}) - abortCh := make(chan struct{}) - completeCh := make(chan struct{}) - - info = bgAppendPartsInfo{inputCh, timeoutCh, abortCh, completeCh} - fs.bgAppend.infoMap[uploadID] = info - - go fs.appendParts(bucket, object, uploadID, info) - } - fs.bgAppend.Unlock() - - errCh := make(chan error) - go func() { - // send input in a goroutine as send on the inputCh can block if appendParts go-routine - // is busy appending a part. - select { - case <-info.timeoutCh: - // This is to handle a rare race condition where we found info in b.infoMap - // but soon after that appendParts go-routine timed out. - errCh <- errAppendPartsTimeout - case info.inputCh <- bgAppendPartsInput{meta, errCh}: - } - }() - - return errCh -} - -// Called on complete-multipart-upload. Returns nil if the required parts have been appended. -func (fs *fsObjects) complete(bucket, object, uploadID string, meta fsMetaV1) error { - fs.bgAppend.Lock() - defer fs.bgAppend.Unlock() - - info, ok := fs.bgAppend.infoMap[uploadID] - delete(fs.bgAppend.infoMap, uploadID) - if !ok { - return errPartsMissing - } - - errCh := make(chan error) - - select { - case <-info.timeoutCh: - // This is to handle a rare race condition where we found info in b.infoMap - // but soon after that appendParts go-routine timedouted out. - return errAppendPartsTimeout - case info.inputCh <- bgAppendPartsInput{meta, errCh}: - } - - err := <-errCh - - close(info.completeCh) - - return err -} - -// Called after complete-multipart-upload or abort-multipart-upload so that the appendParts go-routine is not left dangling. 
-func (fs fsObjects) abort(uploadID string) { - fs.bgAppend.Lock() - defer fs.bgAppend.Unlock() - - info, ok := fs.bgAppend.infoMap[uploadID] - if !ok { - return - } - - delete(fs.bgAppend.infoMap, uploadID) - - info.abortCh <- struct{}{} -} - -// This is run as a go-routine that appends the parts in the background. -func (fs fsObjects) appendParts(bucket, object, uploadID string, info bgAppendPartsInfo) { - appendPath := pathJoin(fs.fsPath, minioMetaTmpBucket, fs.fsUUID, uploadID) - // Holds the list of parts that is already appended to the "append" file. - appendMeta := fsMetaV1{} - - // Allocate staging read buffer. - buf := make([]byte, readSizeV1) - for { - select { - case input := <-info.inputCh: - // We receive on this channel when new part gets uploaded or when complete-multipart sends - // a value on this channel to confirm if all the required parts are appended. - meta := input.meta - - for { - // Append should be done such a way that if part-3 and part-2 is uploaded before part-1, we - // wait till part-1 is uploaded after which we append part-2 and part-3 as well in this for-loop. - part, appendNeeded := partToAppend(meta, appendMeta) - if !appendNeeded { - if reflect.DeepEqual(meta.Parts, appendMeta.Parts) { - // Sending nil is useful so that the complete-multipart-upload knows that - // all the required parts have been appended. - input.errCh <- nil - } else { - // Sending error is useful so that complete-multipart-upload can fall-back to - // its own append process. - input.errCh <- errPartsMissing - } - break - } - - if err := fs.appendPart(bucket, object, uploadID, part, buf); err != nil { - fsRemoveFile(appendPath) - appendMeta.Parts = nil - input.errCh <- err - break - } - - appendMeta.AddObjectPart(part.Number, part.Name, part.ETag, part.Size) - } - case <-info.abortCh: - // abort-multipart-upload closed abortCh to end the appendParts go-routine. - fsRemoveFile(appendPath) - - // So that any racing PutObjectPart does not leave a dangling go-routine. - close(info.timeoutCh) - - return - case <-info.completeCh: - // complete-multipart-upload closed completeCh to end the appendParts go-routine. - close(info.timeoutCh) // So that any racing PutObjectPart does not leave a dangling go-routine. - return - case <-time.After(appendPartsTimeout): - // Timeout the goroutine to garbage collect its resources. This would happen if the client initiates - // a multipart upload and does not complete/abort it. - fs.bgAppend.Lock() - delete(fs.bgAppend.infoMap, uploadID) - fs.bgAppend.Unlock() - - // Delete the temporary append file as well. - fsRemoveFile(appendPath) - - close(info.timeoutCh) - return - } - } -} - -// Appends the "part" to the append-file inside "tmp/" that finally gets moved to the actual location -// upon complete-multipart-upload. -func (fs fsObjects) appendPart(bucket, object, uploadID string, part objectPartInfo, buf []byte) error { - partPath := pathJoin(fs.fsPath, minioMetaMultipartBucket, bucket, object, uploadID, part.Name) - - var offset int64 - // Read each file part to start writing to the temporary concatenated object. - file, size, err := fsOpenFile(partPath, offset) - if err != nil { - if err == errFileNotFound { - return errPartsMissing - } - return err - } - defer file.Close() - - tmpObjPath := pathJoin(fs.fsPath, minioMetaTmpBucket, fs.fsUUID, uploadID) - // No need to hold a lock, this is a unique file and will be only written - // to one one process per uploadID per minio process. 
- wfile, err := os.OpenFile((tmpObjPath), os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0666) - if err != nil { - return err - } - defer wfile.Close() - - // Fallocate more space as we concatenate. - if err = fsFAllocate(int(wfile.Fd()), 0, size); err != nil { - return err - } - - _, err = io.CopyBuffer(wfile, file, buf) - return err -} diff --git a/cmd/fs-v1-helpers.go b/cmd/fs-v1-helpers.go index 138d5bea3..1fc9b8c4c 100644 --- a/cmd/fs-v1-helpers.go +++ b/cmd/fs-v1-helpers.go @@ -23,6 +23,7 @@ import ( "runtime" "github.com/minio/minio/pkg/errors" + "github.com/minio/minio/pkg/lock" ) // Removes only the file at given path does not remove @@ -266,7 +267,7 @@ func fsCreateFile(filePath string, reader io.Reader, buf []byte, fallocSize int6 return 0, errors.Trace(err) } - writer, err := os.OpenFile((filePath), os.O_CREATE|os.O_WRONLY, 0666) + writer, err := lock.Open(filePath, os.O_CREATE|os.O_WRONLY, 0666) if err != nil { return 0, osErrToFSFileErr(err) } @@ -291,6 +292,7 @@ func fsCreateFile(filePath string, reader io.Reader, buf []byte, fallocSize int6 return 0, errors.Trace(err) } } + return bytesWritten, nil } diff --git a/cmd/fs-v1-metadata.go b/cmd/fs-v1-metadata.go index b90b6208f..9a7390eb4 100644 --- a/cmd/fs-v1-metadata.go +++ b/cmd/fs-v1-metadata.go @@ -22,7 +22,6 @@ import ( "io/ioutil" "os" pathutil "path" - "sort" "strings" "github.com/minio/minio/pkg/errors" @@ -127,41 +126,6 @@ func (m fsMetaV1) ToObjectInfo(bucket, object string, fi os.FileInfo) ObjectInfo return objInfo } -// ObjectPartIndex - returns the index of matching object part number. -func (m fsMetaV1) ObjectPartIndex(partNumber int) (partIndex int) { - for i, part := range m.Parts { - if partNumber == part.Number { - partIndex = i - return partIndex - } - } - return -1 -} - -// AddObjectPart - add a new object part in order. -func (m *fsMetaV1) AddObjectPart(partNumber int, partName string, partETag string, partSize int64) { - partInfo := objectPartInfo{ - Number: partNumber, - Name: partName, - ETag: partETag, - Size: partSize, - } - - // Update part info if it already exists. - for i, part := range m.Parts { - if partNumber == part.Number { - m.Parts[i] = partInfo - return - } - } - - // Proceed to include new part info. - m.Parts = append(m.Parts, partInfo) - - // Parts in fsMeta should be in sorted order by part number. - sort.Sort(byObjectPartNumber(m.Parts)) -} - func (m *fsMetaV1) WriteTo(lk *lock.LockedFile) (n int64, err error) { var metadataBytes []byte metadataBytes, err = json.Marshal(m) @@ -203,21 +167,6 @@ func parseFSMetaMap(fsMetaBuf []byte) map[string]string { return metaMap } -func parseFSParts(fsMetaBuf []byte) []objectPartInfo { - // Parse the FS Parts. - partsResult := gjson.GetBytes(fsMetaBuf, "parts").Array() - partInfo := make([]objectPartInfo, len(partsResult)) - for i, p := range partsResult { - info := objectPartInfo{} - info.Number = int(p.Get("number").Int()) - info.Name = p.Get("name").String() - info.ETag = p.Get("etag").String() - info.Size = p.Get("size").Int() - partInfo[i] = info - } - return partInfo -} - func (m *fsMetaV1) ReadFrom(lk *lock.LockedFile) (n int64, err error) { var fsMetaBuf []byte fi, err := lk.Stat() @@ -249,9 +198,6 @@ func (m *fsMetaV1) ReadFrom(lk *lock.LockedFile) (n int64, err error) { // obtain metadata. m.Meta = parseFSMetaMap(fsMetaBuf) - // obtain parts info list. - m.Parts = parseFSParts(fsMetaBuf) - // obtain minio release date. 
m.Minio.Release = parseFSRelease(fsMetaBuf) @@ -267,19 +213,3 @@ func newFSMetaV1() (fsMeta fsMetaV1) { fsMeta.Minio.Release = ReleaseTag return fsMeta } - -// Return if the part info in uploadedParts and CompleteParts are same. -func isPartsSame(uploadedParts []objectPartInfo, CompleteParts []CompletePart) bool { - if len(uploadedParts) != len(CompleteParts) { - return false - } - - for i := range CompleteParts { - if uploadedParts[i].Number != CompleteParts[i].PartNumber || - uploadedParts[i].ETag != CompleteParts[i].ETag { - return false - } - } - - return true -} diff --git a/cmd/fs-v1-multipart.go b/cmd/fs-v1-multipart.go index 385015fa8..2e35d5863 100644 --- a/cmd/fs-v1-multipart.go +++ b/cmd/fs-v1-multipart.go @@ -18,324 +18,184 @@ package cmd import ( "encoding/hex" + "encoding/json" "fmt" "io" + "io/ioutil" "os" pathutil "path" + "sort" + "strconv" "strings" "time" "github.com/minio/minio/pkg/errors" + mioutil "github.com/minio/minio/pkg/ioutil" + "github.com/minio/minio/pkg/hash" - "github.com/minio/minio/pkg/lock" ) -// Returns if the prefix is a multipart upload. -func (fs fsObjects) isMultipartUpload(bucket, prefix string) bool { - uploadsIDPath := pathJoin(fs.fsPath, bucket, prefix, uploadsJSONFile) - _, err := fsStatFile(uploadsIDPath) +const ( + // Expiry duration after which the multipart uploads are deemed stale. + fsMultipartExpiry = time.Hour * 24 * 14 // 2 weeks. + // Cleanup interval when the stale multipart cleanup is initiated. + fsMultipartCleanupInterval = time.Hour * 24 // 24 hrs. +) + +// Returns EXPORT/.minio.sys/multipart/SHA256/UPLOADID +func (fs *fsObjects) getUploadIDDir(bucket, object, uploadID string) string { + return pathJoin(fs.fsPath, minioMetaMultipartBucket, getSHA256Hash([]byte(pathJoin(bucket, object))), uploadID) +} + +// Returns EXPORT/.minio.sys/multipart/SHA256 +func (fs *fsObjects) getMultipartSHADir(bucket, object string) string { + return pathJoin(fs.fsPath, minioMetaMultipartBucket, getSHA256Hash([]byte(pathJoin(bucket, object)))) +} + +// Returns partNumber.etag +func (fs *fsObjects) encodePartFile(partNumber int, etag string) string { + return fmt.Sprintf("%.5d.%s", partNumber, etag) +} + +// Returns partNumber and etag +func (fs *fsObjects) decodePartFile(name string) (partNumber int, etag string, err error) { + result := strings.Split(name, ".") + if len(result) != 2 { + return 0, "", errUnexpected + } + partNumber, err = strconv.Atoi(result[0]) if err != nil { - if errors.Cause(err) == errFileNotFound { - return false - } - errorIf(err, "Unable to access uploads.json "+uploadsIDPath) - return false + return 0, "", errUnexpected } - return true + return partNumber, result[1], nil } -// Delete uploads.json file wrapper -func (fs fsObjects) deleteUploadsJSON(bucket, object, uploadID string) error { - multipartBucketPath := pathJoin(fs.fsPath, minioMetaMultipartBucket) - uploadPath := pathJoin(multipartBucketPath, bucket, object) - uploadsMetaPath := pathJoin(uploadPath, uploadsJSONFile) +// Appends parts to an appendFile sequentially. 
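+//
+// For example, once parts 1, 2 and 3 are uploaded, the uploadID directory
+// holds "00001.<etag>", "00002.<etag>" and "00003.<etag>" next to fs.json;
+// because part numbers are zero-padded, sort.Strings() returns them in
+// numeric order. Every call resumes from part len(file.parts)+1 and stops at
+// the first missing part, so a part uploaded out of order is simply picked up
+// by a later call.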
+func (fs *fsObjects) backgroundAppend(bucket, object, uploadID string) { + fs.appendFileMapMu.Lock() + file := fs.appendFileMap[uploadID] + if file == nil { + file = &fsAppendFile{ + filePath: pathJoin(fs.fsPath, minioMetaTmpBucket, fs.fsUUID, fmt.Sprintf("%s.%s", uploadID, mustGetUUID())), + } + fs.appendFileMap[uploadID] = file + } + fs.appendFileMapMu.Unlock() - tmpDir := pathJoin(fs.fsPath, minioMetaTmpBucket, fs.fsUUID) + file.Lock() + defer file.Unlock() - return fsRemoveMeta(multipartBucketPath, uploadsMetaPath, tmpDir) -} + // Since we append sequentially nextPartNumber will always be len(file.parts)+1 + nextPartNumber := len(file.parts) + 1 + uploadIDDir := fs.getUploadIDDir(bucket, object, uploadID) -// Removes the uploadID, called either by CompleteMultipart of AbortMultipart. If the resuling uploads -// slice is empty then we remove/purge the file. -func (fs fsObjects) removeUploadID(bucket, object, uploadID string, rwlk *lock.LockedFile) (bool, error) { - uploadIDs := uploadsV1{} - _, err := uploadIDs.ReadFrom(rwlk) + entries, err := readDir(uploadIDDir) if err != nil { - return false, err + errorIf(err, "error reading directory %s", uploadIDDir) + return } + sort.Strings(entries) - // Removes upload id from the uploads list. - uploadIDs.RemoveUploadID(uploadID) - - // Check this is the last entry. - if uploadIDs.IsEmpty() { - // No more uploads left, so we delete `uploads.json` file. - return true, fs.deleteUploadsJSON(bucket, object, uploadID) - } // else not empty - - // Write update `uploads.json`. - _, err = uploadIDs.WriteTo(rwlk) - return false, err -} - -// Adds a new uploadID if no previous `uploads.json` is -// found we initialize a new one. -func (fs fsObjects) addUploadID(bucket, object, uploadID string, initiated time.Time, rwlk *lock.LockedFile) error { - uploadIDs := uploadsV1{} - - _, err := uploadIDs.ReadFrom(rwlk) - // For all unexpected errors, we return. - if err != nil && errors.Cause(err) != io.EOF { - return err - } - - // If we couldn't read anything, we assume a default - // (empty) upload info. - if errors.Cause(err) == io.EOF { - uploadIDs = newUploadsV1("fs") - } - - // Adds new upload id to the list. - uploadIDs.AddUploadID(uploadID, initiated) - - // Write update `uploads.json`. - _, err = uploadIDs.WriteTo(rwlk) - return err -} - -// listMultipartUploadIDs - list all the upload ids from a marker up to 'count'. -func (fs fsObjects) listMultipartUploadIDs(bucketName, objectName, uploadIDMarker string, count int) ([]MultipartInfo, bool, error) { - var uploads []MultipartInfo - - // Hold the lock so that two parallel complete-multipart-uploads - // do not leave a stale uploads.json behind. - objectMPartPathLock := fs.nsMutex.NewNSLock(minioMetaMultipartBucket, pathJoin(bucketName, objectName)) - if err := objectMPartPathLock.GetRLock(globalListingTimeout); err != nil { - return nil, false, errors.Trace(err) - } - defer objectMPartPathLock.RUnlock() - - uploadsPath := pathJoin(bucketName, objectName, uploadsJSONFile) - rlk, err := fs.rwPool.Open(pathJoin(fs.fsPath, minioMetaMultipartBucket, uploadsPath)) - if err != nil { - if err == errFileNotFound || err == errFileAccessDenied { - return nil, true, nil + for _, entry := range entries { + if entry == fsMetaJSONFile { + continue } - return nil, false, errors.Trace(err) - } - defer fs.rwPool.Close(pathJoin(fs.fsPath, minioMetaMultipartBucket, uploadsPath)) - - // Read `uploads.json`. 
- uploadIDs := uploadsV1{} - if _, err = uploadIDs.ReadFrom(rlk.LockedFile); err != nil { - return nil, false, err - } - - index := 0 - if uploadIDMarker != "" { - for ; index < len(uploadIDs.Uploads); index++ { - if uploadIDs.Uploads[index].UploadID == uploadIDMarker { - // Skip the uploadID as it would already be listed in previous listing. - index++ - break - } - } - } - - for index < len(uploadIDs.Uploads) { - uploads = append(uploads, MultipartInfo{ - Object: objectName, - UploadID: uploadIDs.Uploads[index].UploadID, - Initiated: uploadIDs.Uploads[index].Initiated, - }) - count-- - index++ - if count == 0 { - break - } - } - - end := (index == len(uploadIDs.Uploads)) - return uploads, end, nil -} - -// listMultipartUploadsCleanup - lists all multipart uploads. Called by fs.cleanupStaleMultipartUpload() -func (fs fsObjects) listMultipartUploadsCleanup(bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (lmi ListMultipartsInfo, e error) { - result := ListMultipartsInfo{} - recursive := true - if delimiter == slashSeparator { - recursive = false - } - - result.IsTruncated = true - result.MaxUploads = maxUploads - result.KeyMarker = keyMarker - result.Prefix = prefix - result.Delimiter = delimiter - - // Not using path.Join() as it strips off the trailing '/'. - multipartPrefixPath := pathJoin(bucket, prefix) - if prefix == "" { - // Should have a trailing "/" if prefix is "" - // For ex. multipartPrefixPath should be "multipart/bucket/" if prefix is "" - multipartPrefixPath += slashSeparator - } - multipartMarkerPath := "" - if keyMarker != "" { - multipartMarkerPath = pathJoin(bucket, keyMarker) - } - - var uploads []MultipartInfo - var err error - var eof bool - - if uploadIDMarker != "" { - uploads, _, err = fs.listMultipartUploadIDs(bucket, keyMarker, uploadIDMarker, maxUploads) + partNumber, etag, err := fs.decodePartFile(entry) if err != nil { - return lmi, err + errorIf(err, "unable to split the file name into partNumber and etag: %s", entry) + return } - maxUploads = maxUploads - len(uploads) - } - - var walkResultCh chan treeWalkResult - var endWalkCh chan struct{} - - // true only for xl.ListObjectsHeal(), set to false. - heal := false - - // Proceed to list only if we have more uploads to be listed. - if maxUploads > 0 { - listPrms := listParams{minioMetaMultipartBucket, recursive, multipartMarkerPath, multipartPrefixPath, heal} - - // Pop out any previously waiting marker. - walkResultCh, endWalkCh = fs.listPool.Release(listPrms) - if walkResultCh == nil { - endWalkCh = make(chan struct{}) - isLeaf := fs.isMultipartUpload - listDir := fs.listDirFactory(isLeaf) - walkResultCh = startTreeWalk(minioMetaMultipartBucket, multipartPrefixPath, - multipartMarkerPath, recursive, listDir, isLeaf, endWalkCh) + if partNumber < nextPartNumber { + // Part already appended. + continue + } + if partNumber > nextPartNumber { + // Required part number is not yet uploaded. + return } - // List until maxUploads requested. - for maxUploads > 0 { - walkResult, ok := <-walkResultCh - if !ok { - // Closed channel. - eof = true - break - } - - // For any walk error return right away. - if walkResult.err != nil { - // File not found or Disk not found is a valid case. - if errors.IsErrIgnored(walkResult.err, fsTreeWalkIgnoredErrs...) 
{ - eof = true - break - } - return lmi, walkResult.err - } - - entry := strings.TrimPrefix(walkResult.entry, retainSlash(bucket)) - if hasSuffix(walkResult.entry, slashSeparator) { - uploads = append(uploads, MultipartInfo{ - Object: entry, - }) - maxUploads-- - if maxUploads == 0 { - if walkResult.end { - eof = true - break - } - } - continue - } - - var tmpUploads []MultipartInfo - var end bool - uploadIDMarker = "" - - tmpUploads, end, err = fs.listMultipartUploadIDs(bucket, entry, uploadIDMarker, maxUploads) - if err != nil { - return lmi, err - } - - uploads = append(uploads, tmpUploads...) - maxUploads -= len(tmpUploads) - if walkResult.end && end { - eof = true - break - } + partPath := pathJoin(uploadIDDir, entry) + err = mioutil.AppendFile(file.filePath, partPath) + if err != nil { + errorIf(err, "Unable to append %s to %s", partPath, file.filePath) + return } - } - // Loop through all the received uploads fill in the multiparts result. - for _, upload := range uploads { - var objectName string - var uploadID string - if hasSuffix(upload.Object, slashSeparator) { - // All directory entries are common prefixes. - uploadID = "" // Upload ids are empty for CommonPrefixes. - objectName = upload.Object - result.CommonPrefixes = append(result.CommonPrefixes, objectName) - } else { - uploadID = upload.UploadID - objectName = upload.Object - result.Uploads = append(result.Uploads, upload) - } - result.NextKeyMarker = objectName - result.NextUploadIDMarker = uploadID + file.parts = append(file.parts, PartInfo{PartNumber: nextPartNumber, ETag: etag}) + nextPartNumber++ } - - if !eof { - // Save the go-routine state in the pool so that it can continue from where it left off on - // the next request. - fs.listPool.Set(listParams{bucket, recursive, result.NextKeyMarker, prefix, heal}, walkResultCh, endWalkCh) - } - - result.IsTruncated = !eof - if !result.IsTruncated { - result.NextKeyMarker = "" - result.NextUploadIDMarker = "" - } - - // Success. - return result, nil } // ListMultipartUploads - lists all the uploadIDs for the specified object. // We do not support prefix based listing. -func (fs fsObjects) ListMultipartUploads(bucket, object, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (lmi ListMultipartsInfo, e error) { +func (fs *fsObjects) ListMultipartUploads(bucket, object, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (result ListMultipartsInfo, e error) { if err := checkListMultipartArgs(bucket, object, keyMarker, uploadIDMarker, delimiter, fs); err != nil { - return lmi, err + return result, toObjectErr(errors.Trace(err)) } if _, err := fs.statBucketDir(bucket); err != nil { - return lmi, toObjectErr(err, bucket) + return result, toObjectErr(errors.Trace(err), bucket) } - result := ListMultipartsInfo{} - - result.IsTruncated = true result.MaxUploads = maxUploads result.KeyMarker = keyMarker result.Prefix = object result.Delimiter = delimiter - - uploads, _, err := fs.listMultipartUploadIDs(bucket, object, uploadIDMarker, maxUploads) - if err != nil { - return lmi, err - } - result.NextKeyMarker = object - // Loop through all the received uploads fill in the multiparts result. 
- for _, upload := range uploads { - uploadID := upload.UploadID - result.Uploads = append(result.Uploads, upload) - result.NextUploadIDMarker = uploadID + result.UploadIDMarker = uploadIDMarker + + uploadIDs, err := readDir(fs.getMultipartSHADir(bucket, object)) + if err != nil { + if err == errFileNotFound { + result.IsTruncated = false + return result, nil + } + return result, toObjectErr(errors.Trace(err)) } - result.IsTruncated = len(uploads) == maxUploads + // S3 spec says uploaIDs should be sorted based on initiated time. ModTime of fs.json + // is the creation time of the uploadID, hence we will use that. + var uploads []MultipartInfo + for _, uploadID := range uploadIDs { + metaFilePath := pathJoin(fs.getMultipartSHADir(bucket, object), uploadID, fsMetaJSONFile) + fi, err := fsStatFile(metaFilePath) + if err != nil { + return result, toObjectErr(err, bucket, object) + } + uploads = append(uploads, MultipartInfo{ + Object: object, + UploadID: strings.TrimSuffix(uploadID, slashSeparator), + Initiated: fi.ModTime(), + }) + } + sort.Slice(uploads, func(i int, j int) bool { + return uploads[i].Initiated.Before(uploads[j].Initiated) + }) + + uploadIndex := 0 + if uploadIDMarker != "" { + for uploadIndex < len(uploads) { + if uploads[uploadIndex].UploadID != uploadIDMarker { + uploadIndex++ + continue + } + if uploads[uploadIndex].UploadID == uploadIDMarker { + uploadIndex++ + break + } + uploadIndex++ + } + } + for uploadIndex < len(uploads) { + result.Uploads = append(result.Uploads, uploads[uploadIndex]) + result.NextUploadIDMarker = uploads[uploadIndex].UploadID + uploadIndex++ + if len(result.Uploads) == maxUploads { + break + } + } + + result.IsTruncated = uploadIndex < len(uploads) if !result.IsTruncated { result.NextKeyMarker = "" @@ -345,126 +205,58 @@ func (fs fsObjects) ListMultipartUploads(bucket, object, keyMarker, uploadIDMark return result, nil } -// newMultipartUpload - wrapper for initializing a new multipart -// request, returns back a unique upload id. -// -// Internally this function creates 'uploads.json' associated for the -// incoming object at '.minio.sys/multipart/bucket/object/uploads.json' on -// all the disks. `uploads.json` carries metadata regarding on going -// multipart operation on the object. -func (fs fsObjects) newMultipartUpload(bucket string, object string, meta map[string]string) (uploadID string, err error) { - // Initialize `fs.json` values. - fsMeta := newFSMetaV1() - - // Save additional metadata. - fsMeta.Meta = meta - - uploadID = mustGetUUID() - initiated := UTCNow() - - // Add upload ID to uploads.json - uploadsPath := pathJoin(bucket, object, uploadsJSONFile) - rwlk, err := fs.rwPool.Create(pathJoin(fs.fsPath, minioMetaMultipartBucket, uploadsPath)) - if err != nil { - return "", toObjectErr(errors.Trace(err), bucket, object) - } - defer rwlk.Close() - - uploadIDPath := pathJoin(bucket, object, uploadID) - fsMetaPath := pathJoin(fs.fsPath, minioMetaMultipartBucket, uploadIDPath, fsMetaJSONFile) - metaFile, err := fs.rwPool.Create(fsMetaPath) - if err != nil { - return "", toObjectErr(errors.Trace(err), bucket, object) - } - defer metaFile.Close() - - // Add a new upload id. - if err = fs.addUploadID(bucket, object, uploadID, initiated, rwlk); err != nil { - return "", toObjectErr(err, bucket, object) - } - - // Write all the set metadata. - if _, err = fsMeta.WriteTo(metaFile); err != nil { - return "", toObjectErr(err, bucket, object) - } - - // Return success. 
- return uploadID, nil -} - // NewMultipartUpload - initialize a new multipart upload, returns a // unique id. The unique id returned here is of UUID form, for each // subsequent request each UUID is unique. // // Implements S3 compatible initiate multipart API. -func (fs fsObjects) NewMultipartUpload(bucket, object string, meta map[string]string) (string, error) { +func (fs *fsObjects) NewMultipartUpload(bucket, object string, meta map[string]string) (string, error) { if err := checkNewMultipartArgs(bucket, object, fs); err != nil { - return "", err + return "", toObjectErr(err, bucket) } if _, err := fs.statBucketDir(bucket); err != nil { return "", toObjectErr(err, bucket) } - // Hold the lock so that two parallel complete-multipart-uploads - // do not leave a stale uploads.json behind. - objectMPartPathLock := fs.nsMutex.NewNSLock(minioMetaMultipartBucket, pathJoin(bucket, object)) - if err := objectMPartPathLock.GetLock(globalOperationTimeout); err != nil { - return "", err - } - defer objectMPartPathLock.Unlock() + uploadID := mustGetUUID() + uploadIDDir := fs.getUploadIDDir(bucket, object, uploadID) - return fs.newMultipartUpload(bucket, object, meta) -} - -// Returns if a new part can be appended to fsAppendDataFile. -func partToAppend(fsMeta fsMetaV1, fsAppendMeta fsMetaV1) (part objectPartInfo, appendNeeded bool) { - if len(fsMeta.Parts) == 0 { - return + err := mkdirAll(uploadIDDir, 0755) + if err != nil { + return "", errors.Trace(err) } - // As fsAppendMeta.Parts will be sorted len(fsAppendMeta.Parts) will naturally be the next part number - nextPartNum := len(fsAppendMeta.Parts) + 1 - nextPartIndex := fsMeta.ObjectPartIndex(nextPartNum) - if nextPartIndex == -1 { - return + // Initialize fs.json values. + fsMeta := newFSMetaV1() + fsMeta.Meta = meta + + fsMetaBytes, err := json.Marshal(fsMeta) + if err != nil { + return "", errors.Trace(err) } - return fsMeta.Parts[nextPartIndex], true + if err = ioutil.WriteFile(pathJoin(uploadIDDir, fsMetaJSONFile), fsMetaBytes, 0644); err != nil { + return "", errors.Trace(err) + } + return uploadID, nil } // CopyObjectPart - similar to PutObjectPart but reads data from an existing // object. Internally incoming data is written to '.minio.sys/tmp' location // and safely renamed to '.minio.sys/multipart' for reach parts. -func (fs fsObjects) CopyObjectPart(srcBucket, srcObject, dstBucket, dstObject, uploadID string, partID int, +func (fs *fsObjects) CopyObjectPart(srcBucket, srcObject, dstBucket, dstObject, uploadID string, partID int, startOffset int64, length int64, metadata map[string]string, srcEtag string) (pi PartInfo, e error) { - // Hold read locks on source object only if we are - // going to read data from source object. - objectSRLock := fs.nsMutex.NewNSLock(srcBucket, srcObject) - if err := objectSRLock.GetRLock(globalObjectTimeout); err != nil { - return pi, err - } - defer objectSRLock.RUnlock() - if err := checkNewMultipartArgs(srcBucket, srcObject, fs); err != nil { - return pi, err + return pi, toObjectErr(errors.Trace(err)) } - if srcEtag != "" { - etag, err := fs.getObjectETag(srcBucket, srcObject) - if err != nil { - return pi, toObjectErr(err, srcBucket, srcObject) - } - if etag != srcEtag { - return pi, toObjectErr(errors.Trace(InvalidETag{}), srcBucket, srcObject) - } - } // Initialize pipe. 
pipeReader, pipeWriter := io.Pipe() go func() { - if gerr := fs.getObject(srcBucket, srcObject, startOffset, length, pipeWriter, srcEtag); gerr != nil { + if gerr := fs.GetObject(srcBucket, srcObject, startOffset, length, pipeWriter, srcEtag); gerr != nil { errorIf(gerr, "Unable to read %s/%s.", srcBucket, srcObject) pipeWriter.CloseWithError(gerr) return @@ -492,13 +284,13 @@ func (fs fsObjects) CopyObjectPart(srcBucket, srcObject, dstBucket, dstObject, u // an ongoing multipart transaction. Internally incoming data is // written to '.minio.sys/tmp' location and safely renamed to // '.minio.sys/multipart' for reach parts. -func (fs fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, data *hash.Reader) (pi PartInfo, e error) { +func (fs *fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, data *hash.Reader) (pi PartInfo, e error) { if err := checkPutObjectPartArgs(bucket, object, fs); err != nil { - return pi, err + return pi, toObjectErr(errors.Trace(err), bucket) } if _, err := fs.statBucketDir(bucket); err != nil { - return pi, toObjectErr(err, bucket) + return pi, toObjectErr(errors.Trace(err), bucket) } // Validate input data size and it can never be less than zero. @@ -506,45 +298,16 @@ func (fs fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, d return pi, toObjectErr(errors.Trace(errInvalidArgument)) } - // Hold the lock so that two parallel complete-multipart-uploads - // do not leave a stale uploads.json behind. - objectMPartPathLock := fs.nsMutex.NewNSLock(minioMetaMultipartBucket, pathJoin(bucket, object)) - if err := objectMPartPathLock.GetLock(globalOperationTimeout); err != nil { - return pi, err - } - defer objectMPartPathLock.Unlock() - - // Disallow any parallel abort or complete multipart operations. - uploadsPath := pathJoin(fs.fsPath, minioMetaMultipartBucket, bucket, object, uploadsJSONFile) - if _, err := fs.rwPool.Open(uploadsPath); err != nil { - if err == errFileNotFound || err == errFileAccessDenied { - return pi, errors.Trace(InvalidUploadID{UploadID: uploadID}) - } - return pi, toObjectErr(errors.Trace(err), bucket, object) - } - defer fs.rwPool.Close(uploadsPath) - - uploadIDPath := pathJoin(bucket, object, uploadID) + uploadIDDir := fs.getUploadIDDir(bucket, object, uploadID) // Just check if the uploadID exists to avoid copy if it doesn't. - fsMetaPath := pathJoin(fs.fsPath, minioMetaMultipartBucket, uploadIDPath, fsMetaJSONFile) - rwlk, err := fs.rwPool.Write(fsMetaPath) + _, err := fsStatFile(pathJoin(uploadIDDir, fsMetaJSONFile)) if err != nil { - if err == errFileNotFound || err == errFileAccessDenied { + if errors.Cause(err) == errFileNotFound || errors.Cause(err) == errFileAccessDenied { return pi, errors.Trace(InvalidUploadID{UploadID: uploadID}) } - return pi, toObjectErr(errors.Trace(err), bucket, object) + return pi, toObjectErr(err, bucket, object) } - defer rwlk.Close() - - fsMeta := fsMetaV1{} - _, err = fsMeta.ReadFrom(rwlk) - if err != nil { - return pi, toObjectErr(err, minioMetaMultipartBucket, fsMetaPath) - } - - partSuffix := fmt.Sprintf("object%d", partID) - tmpPartPath := uploadID + "." + mustGetUUID() + "." 
+ partSuffix bufSize := int64(readSizeV1) if size := data.Size(); size > 0 && bufSize > size { @@ -552,141 +315,49 @@ func (fs fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, d } buf := make([]byte, bufSize) - fsPartPath := pathJoin(fs.fsPath, minioMetaTmpBucket, fs.fsUUID, tmpPartPath) - bytesWritten, cErr := fsCreateFile(fsPartPath, data, buf, data.Size()) - if cErr != nil { - fsRemoveFile(fsPartPath) - return pi, toObjectErr(cErr, minioMetaTmpBucket, tmpPartPath) + tmpPartPath := pathJoin(fs.fsPath, minioMetaTmpBucket, fs.fsUUID, uploadID+"."+mustGetUUID()+"."+strconv.Itoa(partID)) + bytesWritten, err := fsCreateFile(tmpPartPath, data, buf, data.Size()) + if err != nil { + fsRemoveFile(tmpPartPath) + return pi, toObjectErr(err, minioMetaTmpBucket, tmpPartPath) } // Should return IncompleteBody{} error when reader has fewer // bytes than specified in request header. if bytesWritten < data.Size() { - fsRemoveFile(fsPartPath) + fsRemoveFile(tmpPartPath) return pi, errors.Trace(IncompleteBody{}) } // Delete temporary part in case of failure. If // PutObjectPart succeeds then there would be nothing to - // delete. - defer fsRemoveFile(fsPartPath) + // delete in which case we just ignore the error. + defer fsRemoveFile(tmpPartPath) - partPath := pathJoin(bucket, object, uploadID, partSuffix) - // Lock the part so that another part upload with same part-number gets blocked - // while the part is getting appended in the background. - partLock := fs.nsMutex.NewNSLock(minioMetaMultipartBucket, partPath) - if err = partLock.GetLock(globalOperationTimeout); err != nil { - return pi, err + etag := hex.EncodeToString(data.MD5Current()) + if etag == "" { + etag = GenETag() } + partPath := pathJoin(uploadIDDir, fs.encodePartFile(partID, etag)) - fsNSPartPath := pathJoin(fs.fsPath, minioMetaMultipartBucket, partPath) - if err = fsRenameFile(fsPartPath, fsNSPartPath); err != nil { - partLock.Unlock() + if err = fsRenameFile(tmpPartPath, partPath); err != nil { return pi, toObjectErr(err, minioMetaMultipartBucket, partPath) } - md5hex := hex.EncodeToString(data.MD5Current()) + go fs.backgroundAppend(bucket, object, uploadID) - // Save the object part info in `fs.json`. - fsMeta.AddObjectPart(partID, partSuffix, md5hex, data.Size()) - if _, err = fsMeta.WriteTo(rwlk); err != nil { - partLock.Unlock() - return pi, toObjectErr(err, minioMetaMultipartBucket, uploadIDPath) - } - - partNamePath := pathJoin(fs.fsPath, minioMetaMultipartBucket, uploadIDPath, partSuffix) - fi, err := fsStatFile(partNamePath) + fi, err := fsStatFile(partPath) if err != nil { - return pi, toObjectErr(err, minioMetaMultipartBucket, partSuffix) + return pi, toObjectErr(err, minioMetaMultipartBucket, partPath) } - - // Append the part in background. - errCh := fs.append(bucket, object, uploadID, fsMeta) - go func() { - // Also receive the error so that the appendParts go-routine - // does not block on send. But the error received is ignored - // as fs.PutObjectPart() would have already returned success - // to the client. - <-errCh - partLock.Unlock() - }() - return PartInfo{ PartNumber: partID, LastModified: fi.ModTime(), - ETag: md5hex, + ETag: etag, Size: fi.Size(), }, nil } -// listObjectParts - wrapper scanning through -// '.minio.sys/multipart/bucket/object/UPLOADID'. Lists all the parts -// saved inside '.minio.sys/multipart/bucket/object/UPLOADID'. 
-func (fs fsObjects) listObjectParts(bucket, object, uploadID string, partNumberMarker, maxParts int) (lpi ListPartsInfo, e error) { - result := ListPartsInfo{} - - uploadIDPath := pathJoin(bucket, object, uploadID) - fsMetaPath := pathJoin(fs.fsPath, minioMetaMultipartBucket, uploadIDPath, fsMetaJSONFile) - metaFile, err := fs.rwPool.Open(fsMetaPath) - if err != nil { - if err == errFileNotFound || err == errFileAccessDenied { - // On windows oddly this is returned. - return lpi, errors.Trace(InvalidUploadID{UploadID: uploadID}) - } - return lpi, toObjectErr(errors.Trace(err), bucket, object) - } - defer fs.rwPool.Close(fsMetaPath) - - fsMeta := fsMetaV1{} - _, err = fsMeta.ReadFrom(metaFile.LockedFile) - if err != nil { - return lpi, toObjectErr(err, minioMetaBucket, fsMetaPath) - } - - // Only parts with higher part numbers will be listed. - partIdx := fsMeta.ObjectPartIndex(partNumberMarker) - parts := fsMeta.Parts - if partIdx != -1 { - parts = fsMeta.Parts[partIdx+1:] - } - - count := maxParts - for _, part := range parts { - var fi os.FileInfo - partNamePath := pathJoin(fs.fsPath, minioMetaMultipartBucket, uploadIDPath, part.Name) - fi, err = fsStatFile(partNamePath) - if err != nil { - return lpi, toObjectErr(err, minioMetaMultipartBucket, partNamePath) - } - result.Parts = append(result.Parts, PartInfo{ - PartNumber: part.Number, - ETag: part.ETag, - LastModified: fi.ModTime(), - Size: fi.Size(), - }) - count-- - if count == 0 { - break - } - } - - // If listed entries are more than maxParts, we set IsTruncated as true. - if len(parts) > len(result.Parts) { - result.IsTruncated = true - // Make sure to fill next part number marker if IsTruncated is - // true for subsequent listing. - nextPartNumberMarker := result.Parts[len(result.Parts)-1].PartNumber - result.NextPartNumberMarker = nextPartNumberMarker - } - result.Bucket = bucket - result.Object = object - result.UploadID = uploadID - result.MaxParts = maxParts - - // Success. - return result, nil -} - // ListObjectParts - lists all previously uploaded parts for a given // object and uploadID. Takes additional input of part-number-marker // to indicate where the listing should begin from. @@ -694,30 +365,100 @@ func (fs fsObjects) listObjectParts(bucket, object, uploadID string, partNumberM // Implements S3 compatible ListObjectParts API. The resulting // ListPartsInfo structure is unmarshalled directly into XML and // replied back to the client. -func (fs fsObjects) ListObjectParts(bucket, object, uploadID string, partNumberMarker, maxParts int) (lpi ListPartsInfo, e error) { +func (fs *fsObjects) ListObjectParts(bucket, object, uploadID string, partNumberMarker, maxParts int) (result ListPartsInfo, e error) { if err := checkListPartsArgs(bucket, object, fs); err != nil { - return lpi, err + return result, toObjectErr(errors.Trace(err)) } + result.Bucket = bucket + result.Object = object + result.UploadID = uploadID + result.MaxParts = maxParts + result.PartNumberMarker = partNumberMarker // Check if bucket exists if _, err := fs.statBucketDir(bucket); err != nil { - return lpi, toObjectErr(err, bucket) + return result, toObjectErr(errors.Trace(err), bucket) } - // Hold the lock so that two parallel complete-multipart-uploads - // do not leave a stale uploads.json behind. 
- objectMPartPathLock := fs.nsMutex.NewNSLock(minioMetaMultipartBucket, pathJoin(bucket, object)) - if err := objectMPartPathLock.GetRLock(globalListingTimeout); err != nil { - return lpi, errors.Trace(err) - } - defer objectMPartPathLock.RUnlock() - - listPartsInfo, err := fs.listObjectParts(bucket, object, uploadID, partNumberMarker, maxParts) + uploadIDDir := fs.getUploadIDDir(bucket, object, uploadID) + _, err := fsStatFile(pathJoin(uploadIDDir, fsMetaJSONFile)) if err != nil { - return lpi, toObjectErr(err, bucket, object) + if errors.Cause(err) == errFileNotFound || errors.Cause(err) == errFileAccessDenied { + return result, errors.Trace(InvalidUploadID{UploadID: uploadID}) + } + return result, toObjectErr(errors.Trace(err), bucket, object) } - return listPartsInfo, nil + entries, err := readDir(uploadIDDir) + if err != nil { + return result, toObjectErr(errors.Trace(err), bucket) + } + + partsMap := make(map[int]string) + for _, entry := range entries { + if entry == fsMetaJSONFile { + continue + } + partNumber, etag1, err := fs.decodePartFile(entry) + if err != nil { + return result, toObjectErr(errors.Trace(err)) + } + etag2, ok := partsMap[partNumber] + if !ok { + partsMap[partNumber] = etag1 + continue + } + stat1, err := fsStatFile(pathJoin(uploadIDDir, fs.encodePartFile(partNumber, etag1))) + if err != nil { + return result, toObjectErr(errors.Trace(err)) + } + stat2, err := fsStatFile(pathJoin(uploadIDDir, fs.encodePartFile(partNumber, etag2))) + if err != nil { + return result, toObjectErr(errors.Trace(err)) + } + if stat1.ModTime().After(stat2.ModTime()) { + partsMap[partNumber] = etag1 + } + } + var parts []PartInfo + for partNumber, etag := range partsMap { + parts = append(parts, PartInfo{PartNumber: partNumber, ETag: etag}) + } + sort.SliceStable(parts, func(i int, j int) bool { + return parts[i].PartNumber < parts[j].PartNumber + }) + i := 0 + if partNumberMarker != 0 { + // If the marker was set, skip the entries till the marker. + for _, part := range parts { + i++ + if part.PartNumber == partNumberMarker { + break + } + } + } + + partsCount := 0 + for partsCount < maxParts && i < len(parts) { + result.Parts = append(result.Parts, parts[i]) + i++ + partsCount++ + } + if i < len(parts) { + result.IsTruncated = true + if partsCount != 0 { + result.NextPartNumberMarker = result.Parts[partsCount-1].PartNumber + } + } + for i, part := range result.Parts { + stat, err := fsStatFile(pathJoin(uploadIDDir, fs.encodePartFile(part.PartNumber, part.ETag))) + if err != nil { + return result, toObjectErr(errors.Trace(err)) + } + result.Parts[i].LastModified = stat.ModTime() + result.Parts[i].Size = stat.Size() + } + return result, nil } // CompleteMultipartUpload - completes an ongoing multipart @@ -726,17 +467,11 @@ func (fs fsObjects) ListObjectParts(bucket, object, uploadID string, partNumberM // md5sums of all the parts. // // Implements S3 compatible Complete multipart API. -func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, uploadID string, parts []CompletePart) (oi ObjectInfo, e error) { +func (fs *fsObjects) CompleteMultipartUpload(bucket string, object string, uploadID string, parts []CompletePart) (oi ObjectInfo, e error) { if err := checkCompleteMultipartArgs(bucket, object, fs); err != nil { - return oi, err + return oi, toObjectErr(err) } - // Hold write lock on the object. 
- destLock := fs.nsMutex.NewNSLock(bucket, object) - if err := destLock.GetLock(globalObjectTimeout); err != nil { - return oi, err - } - defer destLock.Unlock() // Check if an object is present as one of the parent dir. if fs.parentDirIsObject(bucket, pathutil.Dir(object)) { return oi, toObjectErr(errors.Trace(errFileAccessDenied), bucket, object) @@ -746,91 +481,51 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload return oi, toObjectErr(err, bucket) } + uploadIDDir := fs.getUploadIDDir(bucket, object, uploadID) + // Just check if the uploadID exists to avoid copy if it doesn't. + _, err := fsStatFile(pathJoin(uploadIDDir, fsMetaJSONFile)) + if err != nil { + if errors.Cause(err) == errFileNotFound || errors.Cause(err) == errFileAccessDenied { + return oi, errors.Trace(InvalidUploadID{UploadID: uploadID}) + } + return oi, toObjectErr(err, bucket, object) + } + // Calculate s3 compatible md5sum for complete multipart. s3MD5, err := getCompleteMultipartMD5(parts) if err != nil { return oi, err } - uploadIDPath := pathJoin(bucket, object, uploadID) - var removeObjectDir bool - - // Hold the lock so that two parallel complete-multipart-uploads - // do not leave a stale uploads.json behind. - objectMPartPathLock := fs.nsMutex.NewNSLock(minioMetaMultipartBucket, pathJoin(bucket, object)) - if err = objectMPartPathLock.GetLock(globalOperationTimeout); err != nil { - return oi, err - } - - defer func() { - if removeObjectDir { - basePath := pathJoin(fs.fsPath, minioMetaMultipartBucket, bucket) - derr := fsDeleteFile(basePath, pathJoin(basePath, object)) - if derr = errors.Cause(derr); derr != nil { - // In parallel execution, CompleteMultipartUpload could have deleted temporary - // state files/directory, it is safe to ignore errFileNotFound - if derr != errFileNotFound { - errorIf(derr, "unable to remove %s in %s", pathJoin(basePath, object), basePath) - } - } - } - objectMPartPathLock.Unlock() - }() - - fsMetaPathMultipart := pathJoin(fs.fsPath, minioMetaMultipartBucket, uploadIDPath, fsMetaJSONFile) - rlk, err := fs.rwPool.Open(fsMetaPathMultipart) - if err != nil { - if err == errFileNotFound || err == errFileAccessDenied { - return oi, errors.Trace(InvalidUploadID{UploadID: uploadID}) - } - return oi, toObjectErr(errors.Trace(err), bucket, object) - } - - // Disallow any parallel abort or complete multipart operations. - rwlk, err := fs.rwPool.Write(pathJoin(fs.fsPath, minioMetaMultipartBucket, bucket, object, uploadsJSONFile)) - if err != nil { - fs.rwPool.Close(fsMetaPathMultipart) - if err == errFileNotFound || err == errFileAccessDenied { - return oi, errors.Trace(InvalidUploadID{UploadID: uploadID}) - } - return oi, toObjectErr(errors.Trace(err), bucket, object) - } - defer rwlk.Close() - - fsMeta := fsMetaV1{} - // Read saved fs metadata for ongoing multipart. - _, err = fsMeta.ReadFrom(rlk.LockedFile) - if err != nil { - fs.rwPool.Close(fsMetaPathMultipart) - return oi, toObjectErr(err, minioMetaMultipartBucket, fsMetaPathMultipart) - } - partSize := int64(-1) // Used later to ensure that all parts sizes are same. + // Validate all parts and then commit to disk. 
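+	// Every (partNumber, etag) pair in the complete request must exist on disk
+	// as "<partNumber>.<etag>" (e.g. "00003.<etag>" for part 3) inside
+	// uploadIDDir; a missing file means an unknown part number or a mismatched
+	// ETag and is returned to the client as InvalidPart.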
for i, part := range parts { - partIdx := fsMeta.ObjectPartIndex(part.PartNumber) - if partIdx == -1 { - fs.rwPool.Close(fsMetaPathMultipart) - return oi, errors.Trace(InvalidPart{}) + partPath := pathJoin(uploadIDDir, fs.encodePartFile(part.PartNumber, part.ETag)) + var fi os.FileInfo + fi, err = fsStatFile(partPath) + if err != nil { + if errors.Cause(err) == errFileNotFound || errors.Cause(err) == errFileAccessDenied { + return oi, errors.Trace(InvalidPart{}) + } + return oi, errors.Trace(err) } - - if fsMeta.Parts[partIdx].ETag != part.ETag { - fs.rwPool.Close(fsMetaPathMultipart) - return oi, errors.Trace(InvalidPart{}) + if partSize == -1 { + partSize = fi.Size() + } + if i == len(parts)-1 { + break } // All parts except the last part has to be atleast 5MB. - if (i < len(parts)-1) && !isMinAllowedPartSize(fsMeta.Parts[partIdx].Size) { - fs.rwPool.Close(fsMetaPathMultipart) + if !isMinAllowedPartSize(fi.Size()) { return oi, errors.Trace(PartTooSmall{ PartNumber: part.PartNumber, - PartSize: fsMeta.Parts[partIdx].Size, + PartSize: fi.Size(), PartETag: part.ETag, }) } - if partSize == -1 { - partSize = fsMeta.Parts[partIdx].Size - } + // TODO: Make necessary changes in future as explained in the below comment. // All parts except the last part has to be of same size. We are introducing this // check to see if any clients break. If clients do not break then we can optimize @@ -838,136 +533,99 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload // so that we don't need to do background append at all. i.e by the time we get // CompleteMultipartUpload we already have the full file available which can be // renamed to the main name-space. - if (i < len(parts)-1) && partSize != fsMeta.Parts[partIdx].Size { - fs.rwPool.Close(fsMetaPathMultipart) + if partSize != fi.Size() { return oi, errors.Trace(PartsSizeUnequal{}) } } - // Wait for any competing PutObject() operation on bucket/object, since same namespace - // would be acquired for `fs.json`. - fsMetaPath := pathJoin(fs.fsPath, minioMetaBucket, bucketMetaPrefix, bucket, object, fsMetaJSONFile) - metaFile, err := fs.rwPool.Create(fsMetaPath) - if err != nil { - fs.rwPool.Close(fsMetaPathMultipart) - return oi, toObjectErr(errors.Trace(err), bucket, object) - } - defer metaFile.Close() - - fsNSObjPath := pathJoin(fs.fsPath, bucket, object) - - // This lock is held during rename of the appended tmp file to the actual - // location so that any competing GetObject/PutObject/DeleteObject do not race. appendFallback := true // In case background-append did not append the required parts. + appendFilePath := pathJoin(fs.fsPath, minioMetaTmpBucket, fs.fsUUID, fmt.Sprintf("%s.%s", uploadID, mustGetUUID())) - if isPartsSame(fsMeta.Parts, parts) { - err = fs.complete(bucket, object, uploadID, fsMeta) - if err == nil { - appendFallback = false - fsTmpObjPath := pathJoin(fs.fsPath, minioMetaTmpBucket, fs.fsUUID, uploadID) - if err = fsRenameFile(fsTmpObjPath, fsNSObjPath); err != nil { - fs.rwPool.Close(fsMetaPathMultipart) - return oi, toObjectErr(err, minioMetaTmpBucket, uploadID) + // Most of the times appendFile would already be fully appended by now. We call fs.backgroundAppend() + // to take care of the following corner case: + // 1. The last PutObjectPart triggers go-routine fs.backgroundAppend, this go-routine has not started yet. + // 2. 
Now CompleteMultipartUpload gets called which sees that lastPart is not appended and starts appending + // from the beginning + fs.backgroundAppend(bucket, object, uploadID) + + fs.appendFileMapMu.Lock() + file := fs.appendFileMap[uploadID] + delete(fs.appendFileMap, uploadID) + fs.appendFileMapMu.Unlock() + + if file != nil { + file.Lock() + defer file.Unlock() + // Verify that appendFile has all the parts. + if len(file.parts) == len(parts) { + for i := range parts { + if parts[i].ETag != file.parts[i].ETag { + break + } + if parts[i].PartNumber != file.parts[i].PartNumber { + break + } + if i == len(parts)-1 { + appendFilePath = file.filePath + appendFallback = false + } } } } if appendFallback { - // background append could not do append all the required parts, hence we do it here. - tempObj := uploadID + "-" + "part.1" - - fsTmpObjPath := pathJoin(fs.fsPath, minioMetaTmpBucket, fs.fsUUID, tempObj) - // Delete the temporary object in the case of a - // failure. If PutObject succeeds, then there would be - // nothing to delete. - defer fsRemoveFile(fsTmpObjPath) - - // Allocate staging buffer. - var buf = make([]byte, readSizeV1) - + fsRemoveFile(file.filePath) for _, part := range parts { - // Construct part suffix. - partSuffix := fmt.Sprintf("object%d", part.PartNumber) - multipartPartFile := pathJoin(fs.fsPath, minioMetaMultipartBucket, uploadIDPath, partSuffix) - - var reader io.ReadCloser - var offset int64 - reader, _, err = fsOpenFile(multipartPartFile, offset) + partPath := pathJoin(uploadIDDir, fs.encodePartFile(part.PartNumber, part.ETag)) + err = mioutil.AppendFile(appendFilePath, partPath) if err != nil { - fs.rwPool.Close(fsMetaPathMultipart) - if err == errFileNotFound { - return oi, errors.Trace(InvalidPart{}) - } - return oi, toObjectErr(errors.Trace(err), minioMetaMultipartBucket, partSuffix) + return oi, toObjectErr(errors.Trace(err)) } - - // No need to hold a lock, this is a unique file and will be only written - // to one one process per uploadID per minio process. - var wfile *os.File - wfile, err = os.OpenFile((fsTmpObjPath), os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0666) - if err != nil { - reader.Close() - fs.rwPool.Close(fsMetaPathMultipart) - return oi, toObjectErr(errors.Trace(err), bucket, object) - } - - _, err = io.CopyBuffer(wfile, reader, buf) - if err != nil { - wfile.Close() - reader.Close() - fs.rwPool.Close(fsMetaPathMultipart) - return oi, toObjectErr(errors.Trace(err), bucket, object) - } - - wfile.Close() - reader.Close() - } - - if err = fsRenameFile(fsTmpObjPath, fsNSObjPath); err != nil { - fs.rwPool.Close(fsMetaPathMultipart) - return oi, toObjectErr(err, minioMetaTmpBucket, uploadID) } } - // No need to save part info, since we have concatenated all parts. - fsMeta.Parts = nil + // Hold write lock on the object. + destLock := fs.nsMutex.NewNSLock(bucket, object) + if err = destLock.GetLock(globalObjectTimeout); err != nil { + return oi, err + } + defer destLock.Unlock() + fsMetaPath := pathJoin(fs.fsPath, minioMetaBucket, bucketMetaPrefix, bucket, object, fsMetaJSONFile) + metaFile, err := fs.rwPool.Create(fsMetaPath) + if err != nil { + return oi, toObjectErr(errors.Trace(err), bucket, object) + } + defer fs.rwPool.Close(fsMetaPath) + fsMeta := fsMetaV1{} + // Read saved fs metadata for ongoing multipart. 
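+	// fs.json under uploadIDDir was written by NewMultipartUpload and carries
+	// the caller-supplied metadata for this upload; the aggregated multipart
+	// ETag is added to it below before it is persisted as the object's fs.json.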
+	fsMetaBuf, err := ioutil.ReadFile(pathJoin(uploadIDDir, fsMetaJSONFile))
+	if err != nil {
+		return oi, toObjectErr(errors.Trace(err), bucket, object)
+	}
+	err = json.Unmarshal(fsMetaBuf, &fsMeta)
+	if err != nil {
+		return oi, toObjectErr(errors.Trace(err), bucket, object)
+	}

 	// Save additional metadata.
 	if len(fsMeta.Meta) == 0 {
 		fsMeta.Meta = make(map[string]string)
 	}
 	fsMeta.Meta["etag"] = s3MD5
-
-	// Write all the set metadata.
 	if _, err = fsMeta.WriteTo(metaFile); err != nil {
-		fs.rwPool.Close(fsMetaPathMultipart)
-		return oi, toObjectErr(err, bucket, object)
+		return oi, toObjectErr(errors.Trace(err), bucket, object)
 	}

-	// Close lock held on bucket/object/uploadid/fs.json,
-	// this needs to be done for windows so that we can happily
-	// delete the bucket/object/uploadid
-	fs.rwPool.Close(fsMetaPathMultipart)
-
-	// Cleanup all the parts if everything else has been safely committed.
-	multipartObjectDir := pathJoin(fs.fsPath, minioMetaMultipartBucket, bucket, object)
-	multipartUploadIDDir := pathJoin(multipartObjectDir, uploadID)
-	if err = fsRemoveUploadIDPath(multipartObjectDir, multipartUploadIDDir); err != nil {
-		return oi, toObjectErr(err, bucket, object)
-	}
-
-	// Remove entry from `uploads.json`.
-	removeObjectDir, err = fs.removeUploadID(bucket, object, uploadID, rwlk)
+	err = fsRenameFile(appendFilePath, pathJoin(fs.fsPath, bucket, object))
 	if err != nil {
-		return oi, toObjectErr(err, minioMetaMultipartBucket, pathutil.Join(bucket, object))
+		return oi, toObjectErr(errors.Trace(err), bucket, object)
 	}
-
-	fi, err := fsStatFile(fsNSObjPath)
+	fsRemoveAll(uploadIDDir)
+	fi, err := fsStatFile(pathJoin(fs.fsPath, bucket, object))
 	if err != nil {
-		return oi, toObjectErr(err, bucket, object)
+		return oi, toObjectErr(errors.Trace(err), bucket, object)
 	}
-	// Return object info.
 	return fsMeta.ToObjectInfo(bucket, object, fi), nil
 }
@@ -983,80 +641,67 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload
 // that this is an atomic idempotent operation. Subsequent calls have
 // no effect and further requests to the same uploadID would not be
 // honored.
-func (fs fsObjects) AbortMultipartUpload(bucket, object, uploadID string) error {
+func (fs *fsObjects) AbortMultipartUpload(bucket, object, uploadID string) error {
 	if err := checkAbortMultipartArgs(bucket, object, fs); err != nil {
 		return err
 	}

 	if _, err := fs.statBucketDir(bucket); err != nil {
-		return toObjectErr(err, bucket)
+		return toObjectErr(errors.Trace(err), bucket)
 	}

-	uploadIDPath := pathJoin(bucket, object, uploadID)
-	var removeObjectDir bool
+	fs.appendFileMapMu.Lock()
+	delete(fs.appendFileMap, uploadID)
+	fs.appendFileMapMu.Unlock()

-	// Hold the lock so that two parallel complete-multipart-uploads
-	// do not leave a stale uploads.json behind.
-	objectMPartPathLock := fs.nsMutex.NewNSLock(minioMetaMultipartBucket,
-		pathJoin(bucket, object))
-	if err := objectMPartPathLock.GetLock(globalOperationTimeout); err != nil {
-		return err
-	}
-
-	defer func() {
-		if removeObjectDir {
-			basePath := pathJoin(fs.fsPath, minioMetaMultipartBucket, bucket)
-			derr := fsDeleteFile(basePath, pathJoin(basePath, object))
-			if derr = errors.Cause(derr); derr != nil {
-				// In parallel execution, AbortMultipartUpload could have deleted temporary
-				// state files/directory, it is safe to ignore errFileNotFound
-				if derr != errFileNotFound {
-					errorIf(derr, "unable to remove %s in %s", pathJoin(basePath, object), basePath)
-				}
-			}
-		}
-		objectMPartPathLock.Unlock()
-	}()
-
-	fsMetaPath := pathJoin(fs.fsPath, minioMetaMultipartBucket, uploadIDPath, fsMetaJSONFile)
-	if _, err := fs.rwPool.Open(fsMetaPath); err != nil {
-		if err == errFileNotFound || err == errFileAccessDenied {
+	uploadIDDir := fs.getUploadIDDir(bucket, object, uploadID)
+	// Just check if the uploadID exists to avoid copy if it doesn't.
+	_, err := fsStatFile(pathJoin(uploadIDDir, fsMetaJSONFile))
+	if err != nil {
+		if errors.Cause(err) == errFileNotFound || errors.Cause(err) == errFileAccessDenied {
 			return errors.Trace(InvalidUploadID{UploadID: uploadID})
 		}
 		return toObjectErr(errors.Trace(err), bucket, object)
 	}
-
-	uploadsPath := pathJoin(bucket, object, uploadsJSONFile)
-	rwlk, err := fs.rwPool.Write(pathJoin(fs.fsPath, minioMetaMultipartBucket, uploadsPath))
-	if err != nil {
-		fs.rwPool.Close(fsMetaPath)
-		if err == errFileNotFound || err == errFileAccessDenied {
-			return errors.Trace(InvalidUploadID{UploadID: uploadID})
-		}
-		return toObjectErr(errors.Trace(err), bucket, object)
-	}
-	defer rwlk.Close()
-
-	// Signal appendParts routine to stop waiting for new parts to arrive.
-	fs.abort(uploadID)
-
-	// Close lock held on bucket/object/uploadid/fs.json,
-	// this needs to be done for windows so that we can happily
-	// delete the bucket/object/uploadid
-	fs.rwPool.Close(fsMetaPath)
-
-	// Cleanup all uploaded parts and abort the upload.
-	multipartObjectDir := pathJoin(fs.fsPath, minioMetaMultipartBucket, bucket, object)
-	multipartUploadIDDir := pathJoin(multipartObjectDir, uploadID)
-	if err = fsRemoveUploadIDPath(multipartObjectDir, multipartUploadIDDir); err != nil {
-		return toObjectErr(err, bucket, object)
-	}
-
-	// Remove entry from `uploads.json`.
-	removeObjectDir, err = fs.removeUploadID(bucket, object, uploadID, rwlk)
-	if err != nil {
-		return toObjectErr(err, bucket, object)
-	}
+	// Ignore the error returned as Windows fails to remove a directory if a file in it
+	// is Open()ed by the backgroundAppend()
+	fsRemoveAll(uploadIDDir)

 	return nil
 }
+
+// Removes multipart uploads if any older than `expiry` duration
+// on all buckets for every `cleanupInterval`; this function is
+// blocking and should be run in a go-routine.
+func (fs *fsObjects) cleanupStaleMultipartUploads(cleanupInterval, expiry time.Duration, doneCh chan struct{}) {
+	ticker := time.NewTicker(cleanupInterval)
+	for {
+		select {
+		case <-doneCh:
+			// Stop the timer.
+			ticker.Stop()
+			return
+		case <-ticker.C:
+			now := time.Now()
+			entries, err := readDir(pathJoin(fs.fsPath, minioMetaMultipartBucket))
+			if err != nil {
+				continue
+			}
+			for _, entry := range entries {
+				uploadIDs, err := readDir(pathJoin(fs.fsPath, minioMetaMultipartBucket, entry))
+				if err != nil {
+					continue
+				}
+				for _, uploadID := range uploadIDs {
+					fi, err := fsStatDir(pathJoin(fs.fsPath, minioMetaMultipartBucket, entry, uploadID))
+					if err != nil {
+						continue
+					}
+					if now.Sub(fi.ModTime()) > expiry {
+						fsRemoveAll(pathJoin(fs.fsPath, minioMetaMultipartBucket, entry, uploadID))
+					}
+				}
+			}
+		}
+	}
+}
diff --git a/cmd/fs-v1-multipart_test.go b/cmd/fs-v1-multipart_test.go
index 7692866aa..d56b5331b 100644
--- a/cmd/fs-v1-multipart_test.go
+++ b/cmd/fs-v1-multipart_test.go
@@ -48,7 +48,7 @@ func TestFSCleanupMultipartUploadsInRoutine(t *testing.T) {
 		t.Fatal("Unexpected err: ", err)
 	}

-	go cleanupStaleMultipartUploads(20*time.Millisecond, 0, obj, fs.listMultipartUploadsCleanup, globalServiceDoneCh)
+	go fs.cleanupStaleMultipartUploads(20*time.Millisecond, 0, globalServiceDoneCh)

 	// Wait for 40ms such that - we have given enough time for
 	// cleanup routine to kick in.
@@ -66,71 +66,6 @@ func TestFSCleanupMultipartUploadsInRoutine(t *testing.T) {
 	}
 }

-// Tests cleanup of stale upload ids.
-func TestFSCleanupMultipartUpload(t *testing.T) {
-	// Prepare for tests
-	disk := filepath.Join(globalTestTmpDir, "minio-"+nextSuffix())
-	defer os.RemoveAll(disk)
-
-	obj := initFSObjects(disk, t)
-	fs := obj.(*fsObjects)
-
-	// Close the multipart cleanup go-routine.
-	// In this test we are going to manually call
-	// the function which actually cleans the stale
-	// uploads.
-	globalServiceDoneCh <- struct{}{}
-
-	bucketName := "bucket"
-	objectName := "object"
-
-	obj.MakeBucketWithLocation(bucketName, "")
-	uploadID, err := obj.NewMultipartUpload(bucketName, objectName, nil)
-	if err != nil {
-		t.Fatal("Unexpected err: ", err)
-	}
-
-	if err = cleanupStaleMultipartUpload(bucketName, 0, obj, fs.listMultipartUploadsCleanup); err != nil {
-		t.Fatal("Unexpected err: ", err)
-	}
-
-	// Check if upload id was already purged.
-	if err = obj.AbortMultipartUpload(bucketName, objectName, uploadID); err != nil {
-		err = errors.Cause(err)
-		if _, ok := err.(InvalidUploadID); !ok {
-			t.Fatal("Unexpected err: ", err)
-		}
-	}
-}
-
-// TestFSWriteUploadJSON - tests for writeUploadJSON for FS
-func TestFSWriteUploadJSON(t *testing.T) {
-	// Prepare for tests
-	disk := filepath.Join(globalTestTmpDir, "minio-"+nextSuffix())
-	defer os.RemoveAll(disk)
-
-	obj := initFSObjects(disk, t)
-	fs := obj.(*fsObjects)
-
-	bucketName := "bucket"
-	objectName := "object"
-
-	obj.MakeBucketWithLocation(bucketName, "")
-	_, err := obj.NewMultipartUpload(bucketName, objectName, nil)
-	if err != nil {
-		t.Fatal("Unexpected err: ", err)
-	}
-
-	// newMultipartUpload will fail.
-	fs.fsPath = filepath.Join(globalTestTmpDir, "minio-"+nextSuffix())
-	_, err = obj.NewMultipartUpload(bucketName, objectName, nil)
-	if err != nil {
-		if _, ok := errors.Cause(err).(BucketNotFound); !ok {
-			t.Fatal("Unexpected err: ", err)
-		}
-	}
-}
-
 // TestNewMultipartUploadFaultyDisk - test NewMultipartUpload with faulty disks
 func TestNewMultipartUploadFaultyDisk(t *testing.T) {
 	// Prepare for tests
@@ -215,12 +150,7 @@ func TestCompleteMultipartUploadFaultyDisk(t *testing.T) {

 	md5Hex := getMD5Hash(data)

-	if _, err := fs.PutObjectPart(bucketName, objectName, uploadID, 1, mustGetHashReader(t, bytes.NewReader(data), 5, md5Hex, "")); err != nil {
-		t.Fatal("Unexpected error ", err)
-	}
-
 	parts := []CompletePart{{PartNumber: 1, ETag: md5Hex}}
-
 	fs.fsPath = filepath.Join(globalTestTmpDir, "minio-"+nextSuffix())
 	if _, err := fs.CompleteMultipartUpload(bucketName, objectName, uploadID, parts); err != nil {
 		if !isSameType(errors.Cause(err), BucketNotFound{}) {
@@ -229,7 +159,7 @@ func TestCompleteMultipartUploadFaultyDisk(t *testing.T) {
 	}
 }

-// TestCompleteMultipartUploadFaultyDisk - test CompleteMultipartUpload with faulty disks
+// TestCompleteMultipartUpload - test CompleteMultipartUpload
 func TestCompleteMultipartUpload(t *testing.T) {
 	// Prepare for tests
 	disk := filepath.Join(globalTestTmpDir, "minio-"+nextSuffix())
@@ -263,7 +193,7 @@ func TestCompleteMultipartUpload(t *testing.T) {
 	}
 }

-// TestCompleteMultipartUploadFaultyDisk - test CompleteMultipartUpload with faulty disks
+// TestAbortMultipartUpload - test AbortMultipartUpload
 func TestAbortMultipartUpload(t *testing.T) {
 	// Prepare for tests
 	disk := filepath.Join(globalTestTmpDir, "minio-"+nextSuffix())
@@ -289,7 +219,7 @@ func TestAbortMultipartUpload(t *testing.T) {
 	if _, err := fs.PutObjectPart(bucketName, objectName, uploadID, 1, mustGetHashReader(t, bytes.NewReader(data), 5, md5Hex, "")); err != nil {
 		t.Fatal("Unexpected error ", err)
 	}
-
+	time.Sleep(time.Second) // Without Sleep on windows, the fs.AbortMultipartUpload() fails with "The process cannot access the file because it is being used by another process."
 	if err := fs.AbortMultipartUpload(bucketName, objectName, uploadID); err != nil {
 		t.Fatal("Unexpected error ", err)
 	}
@@ -306,24 +236,16 @@ func TestListMultipartUploadsFaultyDisk(t *testing.T) {
 	fs := obj.(*fsObjects)
 	bucketName := "bucket"
 	objectName := "object"
-	data := []byte("12345")

 	if err := obj.MakeBucketWithLocation(bucketName, ""); err != nil {
 		t.Fatal("Cannot create bucket, err: ", err)
 	}

-	uploadID, err := fs.NewMultipartUpload(bucketName, objectName, map[string]string{"X-Amz-Meta-xid": "3f"})
+	_, err := fs.NewMultipartUpload(bucketName, objectName, map[string]string{"X-Amz-Meta-xid": "3f"})
 	if err != nil {
 		t.Fatal("Unexpected error ", err)
 	}

-	md5Hex := getMD5Hash(data)
-	sha256sum := ""
-
-	if _, err := fs.PutObjectPart(bucketName, objectName, uploadID, 1, mustGetHashReader(t, bytes.NewReader(data), 5, md5Hex, sha256sum)); err != nil {
-		t.Fatal("Unexpected error ", err)
-	}
-
 	fs.fsPath = filepath.Join(globalTestTmpDir, "minio-"+nextSuffix())
 	if _, err := fs.ListMultipartUploads(bucketName, objectName, "", "", "", 1000); err != nil {
 		if !isSameType(errors.Cause(err), BucketNotFound{}) {
diff --git a/cmd/fs-v1.go b/cmd/fs-v1.go
index accee9d09..f1c2d2426 100644
--- a/cmd/fs-v1.go
+++ b/cmd/fs-v1.go
@@ -25,6 +25,7 @@ import (
 	"path"
 	"path/filepath"
 	"sort"
+	"sync"
 	"syscall"
 	"time"

@@ -52,13 +53,20 @@ type fsObjects struct {
 	// ListObjects pool management.
 	listPool *treeWalkPool

-	// To manage the appendRoutine go0routines
-	bgAppend *backgroundAppend
+	appendFileMap   map[string]*fsAppendFile
+	appendFileMapMu sync.Mutex

-	// name space mutex for object layer
+	// To manage the appendRoutine go-routines
 	nsMutex *nsLockMap
 }

+// Represents the background append file.
+type fsAppendFile struct {
+	sync.Mutex
+	parts    []PartInfo // List of parts appended.
+	filePath string     // Absolute path of the file in the temp location.
+}
+
 // Initializes meta volume on all the fs path.
 func initMetaVolumeFS(fsPath, fsUUID string) error {
 	// This happens for the first time, but keep this here since this
@@ -139,11 +147,9 @@ func newFSObjectLayer(fsPath string) (ObjectLayer, error) {
 		rwPool: &fsIOPool{
 			readersMap: make(map[string]*lock.RLockedFile),
 		},
-		listPool: newTreeWalkPool(globalLookupTimeout),
-		bgAppend: &backgroundAppend{
-			infoMap: make(map[string]bgAppendPartsInfo),
-		},
-		nsMutex: newNSLock(false),
+		nsMutex:       newNSLock(false),
+		listPool:      newTreeWalkPool(globalLookupTimeout),
+		appendFileMap: make(map[string]*fsAppendFile),
 	}

 	// Once the filesystem has initialized hold the read lock for
@@ -162,15 +168,13 @@ func newFSObjectLayer(fsPath string) (ObjectLayer, error) {
 		return nil, fmt.Errorf("Unable to initialize event notification. %s", err)
 	}

-	// Start background process to cleanup old multipart objects in `.minio.sys`.
-	go cleanupStaleMultipartUploads(multipartCleanupInterval, multipartExpiry, fs, fs.listMultipartUploadsCleanup, globalServiceDoneCh)
-
+	go fs.cleanupStaleMultipartUploads(multipartCleanupInterval, multipartExpiry, globalServiceDoneCh)
 	// Return successfully initialized object layer.
 	return fs, nil
 }

 // Should be called when process shuts down.
-func (fs fsObjects) Shutdown() error {
+func (fs *fsObjects) Shutdown() error {
 	fs.fsFormatRlk.Close()

 	// Cleanup and delete tmp uuid.
@@ -178,7 +182,7 @@ func (fs fsObjects) Shutdown() error {
 }

 // StorageInfo - returns underlying storage statistics.
-func (fs fsObjects) StorageInfo() StorageInfo {
+func (fs *fsObjects) StorageInfo() StorageInfo {
 	info, err := getDiskInfo((fs.fsPath))
 	errorIf(err, "Unable to get disk info %#v", fs.fsPath)
 	storageInfo := StorageInfo{
@@ -192,12 +196,12 @@ func (fs fsObjects) StorageInfo() StorageInfo {
 // Locking operations

 // List namespace locks held in object layer
-func (fs fsObjects) ListLocks(bucket, prefix string, duration time.Duration) ([]VolumeLockInfo, error) {
+func (fs *fsObjects) ListLocks(bucket, prefix string, duration time.Duration) ([]VolumeLockInfo, error) {
 	return []VolumeLockInfo{}, NotImplemented{}
 }

 // Clear namespace locks held in object layer
-func (fs fsObjects) ClearLocks([]VolumeLockInfo) error {
+func (fs *fsObjects) ClearLocks([]VolumeLockInfo) error {
 	return NotImplemented{}
 }

@@ -206,7 +210,7 @@ func (fs fsObjects) ClearLocks([]VolumeLockInfo) error {
 // getBucketDir - will convert incoming bucket names to
 // corresponding valid bucket names on the backend in a platform
 // compatible way for all operating systems.
-func (fs fsObjects) getBucketDir(bucket string) (string, error) {
+func (fs *fsObjects) getBucketDir(bucket string) (string, error) {
 	// Verify if bucket is valid.
 	if !IsValidBucketName(bucket) {
 		return "", errors.Trace(BucketNameInvalid{Bucket: bucket})
@@ -216,7 +220,7 @@ func (fs fsObjects) getBucketDir(bucket string) (string, error) {
 	return bucketDir, nil
 }

-func (fs fsObjects) statBucketDir(bucket string) (os.FileInfo, error) {
+func (fs *fsObjects) statBucketDir(bucket string) (os.FileInfo, error) {
 	bucketDir, err := fs.getBucketDir(bucket)
 	if err != nil {
 		return nil, err
@@ -230,7 +234,7 @@ func (fs fsObjects) statBucketDir(bucket string) (os.FileInfo, error) {

 // MakeBucket - create a new bucket, returns if it
 // already exists.
-func (fs fsObjects) MakeBucketWithLocation(bucket, location string) error {
+func (fs *fsObjects) MakeBucketWithLocation(bucket, location string) error {
 	bucketLock := fs.nsMutex.NewNSLock(bucket, "")
 	if err := bucketLock.GetLock(globalObjectTimeout); err != nil {
 		return err
@@ -249,7 +253,7 @@ func (fs fsObjects) MakeBucketWithLocation(bucket, location string) error {
 }

 // GetBucketInfo - fetch bucket metadata info.
-func (fs fsObjects) GetBucketInfo(bucket string) (bi BucketInfo, e error) {
+func (fs *fsObjects) GetBucketInfo(bucket string) (bi BucketInfo, e error) {
 	st, err := fs.statBucketDir(bucket)
 	if err != nil {
 		return bi, toObjectErr(err, bucket)
@@ -264,7 +268,7 @@ func (fs fsObjects) GetBucketInfo(bucket string) (bi BucketInfo, e error) {
 }

 // ListBuckets - list all s3 compatible buckets (directories) at fsPath.
-func (fs fsObjects) ListBuckets() ([]BucketInfo, error) {
+func (fs *fsObjects) ListBuckets() ([]BucketInfo, error) {
 	if err := checkPathLength(fs.fsPath); err != nil {
 		return nil, errors.Trace(err)
 	}
@@ -305,7 +309,7 @@ func (fs fsObjects) ListBuckets() ([]BucketInfo, error) {

 // DeleteBucket - delete a bucket and all the metadata associated
 // with the bucket including pending multipart, object metadata.
-func (fs fsObjects) DeleteBucket(bucket string) error {
+func (fs *fsObjects) DeleteBucket(bucket string) error {
 	bucketDir, err := fs.getBucketDir(bucket)
 	if err != nil {
 		return toObjectErr(err, bucket)
@@ -336,7 +340,7 @@ func (fs fsObjects) DeleteBucket(bucket string) error {
 // CopyObject - copy object source object to destination object.
 // if source object and destination object are same we only
 // update metadata.
-func (fs fsObjects) CopyObject(srcBucket, srcObject, dstBucket, dstObject string, metadata map[string]string, srcEtag string) (oi ObjectInfo, e error) {
+func (fs *fsObjects) CopyObject(srcBucket, srcObject, dstBucket, dstObject string, metadata map[string]string, srcEtag string) (oi ObjectInfo, e error) {
 	cpSrcDstSame := srcBucket == dstBucket && srcObject == dstObject
 	// Hold write lock on destination since in both cases
 	// - if source and destination are same
@@ -439,7 +443,7 @@ func (fs fsObjects) CopyObject(srcBucket, srcObject, dstBucket, dstObject string
 //
 // startOffset indicates the starting read location of the object.
 // length indicates the total length of the object.
-func (fs fsObjects) GetObject(bucket, object string, offset int64, length int64, writer io.Writer, etag string) (err error) {
+func (fs *fsObjects) GetObject(bucket, object string, offset int64, length int64, writer io.Writer, etag string) (err error) {
 	if err = checkGetObjArgs(bucket, object); err != nil {
 		return err
 	}
@@ -454,7 +458,7 @@ func (fs fsObjects) GetObject(bucket, object string, offset int64, length int64,
 }

 // getObject - wrapper for GetObject
-func (fs fsObjects) getObject(bucket, object string, offset int64, length int64, writer io.Writer, etag string) (err error) {
+func (fs *fsObjects) getObject(bucket, object string, offset int64, length int64, writer io.Writer, etag string) (err error) {
 	if _, err = fs.statBucketDir(bucket); err != nil {
 		return toObjectErr(err, bucket)
 	}
@@ -525,7 +529,7 @@ func (fs fsObjects) getObject(bucket, object string, offset int64, length int64,
 }

 // getObjectInfo - wrapper for reading object metadata and constructs ObjectInfo.
-func (fs fsObjects) getObjectInfo(bucket, object string) (oi ObjectInfo, e error) {
+func (fs *fsObjects) getObjectInfo(bucket, object string) (oi ObjectInfo, e error) {
 	fsMeta := fsMetaV1{}
 	fi, err := fsStatDir(pathJoin(fs.fsPath, bucket, object))
 	if err != nil && errors.Cause(err) != errFileAccessDenied {
@@ -573,7 +577,7 @@ func (fs fsObjects) getObjectInfo(bucket, object string) (oi ObjectInfo, e error
 }

 // GetObjectInfo - reads object metadata and replies back ObjectInfo.
-func (fs fsObjects) GetObjectInfo(bucket, object string) (oi ObjectInfo, e error) {
+func (fs *fsObjects) GetObjectInfo(bucket, object string) (oi ObjectInfo, e error) {
 	// Lock the object before reading.
 	objectLock := fs.nsMutex.NewNSLock(bucket, object)
 	if err := objectLock.GetRLock(globalObjectTimeout); err != nil {
@@ -595,7 +599,7 @@ func (fs fsObjects) GetObjectInfo(bucket, object string) (oi ObjectInfo, e error
 // This function does the following check, suppose
 // object is "a/b/c/d", stat makes sure that objects ""a/b/c""
 // "a/b" and "a" do not exist.
-func (fs fsObjects) parentDirIsObject(bucket, parent string) bool {
+func (fs *fsObjects) parentDirIsObject(bucket, parent string) bool {
 	var isParentDirObject func(string) bool
 	isParentDirObject = func(p string) bool {
 		if p == "." || p == "/" {
@@ -616,7 +620,7 @@ func (fs fsObjects) parentDirIsObject(bucket, parent string) bool {
 // until EOF, writes data directly to configured filesystem path.
 // Additionally writes `fs.json` which carries the necessary metadata
 // for future object operations.
-func (fs fsObjects) PutObject(bucket string, object string, data *hash.Reader, metadata map[string]string) (objInfo ObjectInfo, retErr error) {
+func (fs *fsObjects) PutObject(bucket string, object string, data *hash.Reader, metadata map[string]string) (objInfo ObjectInfo, retErr error) {
 	// Lock the object.
 	objectLock := fs.nsMutex.NewNSLock(bucket, object)
 	if err := objectLock.GetLock(globalObjectTimeout); err != nil {
@@ -627,7 +631,7 @@ func (fs fsObjects) PutObject(bucket string, object string, data *hash.Reader, m
 }

 // putObject - wrapper for PutObject
-func (fs fsObjects) putObject(bucket string, object string, data *hash.Reader, metadata map[string]string) (objInfo ObjectInfo, retErr error) {
+func (fs *fsObjects) putObject(bucket string, object string, data *hash.Reader, metadata map[string]string) (objInfo ObjectInfo, retErr error) {
 	// No metadata is set, allocate a new one.
 	if metadata == nil {
 		metadata = make(map[string]string)
@@ -751,7 +755,7 @@ func (fs fsObjects) putObject(bucket string, object string, data *hash.Reader, m

 // DeleteObject - deletes an object from a bucket, this operation is destructive
 // and there are no rollbacks supported.
-func (fs fsObjects) DeleteObject(bucket, object string) error {
+func (fs *fsObjects) DeleteObject(bucket, object string) error {
 	// Acquire a write lock before deleting the object.
 	objectLock := fs.nsMutex.NewNSLock(bucket, object)
 	if err := objectLock.GetLock(globalOperationTimeout); err != nil {
@@ -804,7 +808,7 @@ var fsTreeWalkIgnoredErrs = append(baseIgnoredErrs, []error{
 // Returns function "listDir" of the type listDirFunc.
 // isLeaf - is used by listDir function to check if an entry
 // is a leaf or non-leaf entry.
-func (fs fsObjects) listDirFactory(isLeaf isLeafFunc) listDirFunc {
+func (fs *fsObjects) listDirFactory(isLeaf isLeafFunc) listDirFunc {
 	// listDir - lists all the entries at a given prefix and given entry in the prefix.
 	listDir := func(bucket, prefixDir, prefixEntry string) (entries []string, delayIsLeaf bool, err error) {
 		entries, err = readDir(pathJoin(fs.fsPath, bucket, prefixDir))
@@ -821,7 +825,7 @@ func (fs fsObjects) listDirFactory(isLeaf isLeafFunc) listDirFunc {

 // getObjectETag is a helper function, which returns only the md5sum
 // of the file on the disk.
-func (fs fsObjects) getObjectETag(bucket, entry string) (string, error) {
+func (fs *fsObjects) getObjectETag(bucket, entry string) (string, error) {
 	fsMetaPath := pathJoin(fs.fsPath, minioMetaBucket, bucketMetaPrefix, bucket, entry, fsMetaJSONFile)

 	// Read `fs.json` to perhaps contend with
@@ -870,7 +874,7 @@ func (fs fsObjects) getObjectETag(bucket, entry string) (string, error) {

 // ListObjects - list all objects at prefix up to maxKeys, optionally delimited by '/'. Maintains the list pool
 // state for future re-entrant list requests.
-func (fs fsObjects) ListObjects(bucket, prefix, marker, delimiter string, maxKeys int) (loi ListObjectsInfo, e error) {
+func (fs *fsObjects) ListObjects(bucket, prefix, marker, delimiter string, maxKeys int) (loi ListObjectsInfo, e error) {
 	if err := checkListObjsArgs(bucket, prefix, marker, delimiter, fs); err != nil {
 		return loi, err
 	}
@@ -1024,25 +1028,23 @@ func (fs fsObjects) ListObjects(bucket, prefix, marker, delimiter string, maxKey
 }

 // HealObject - no-op for fs. Valid only for XL.
-func (fs fsObjects) HealObject(bucket, object string, dryRun bool) (
+func (fs *fsObjects) HealObject(bucket, object string, dryRun bool) (
 	res madmin.HealResultItem, err error) {
-
 	return res, errors.Trace(NotImplemented{})
 }

 // HealBucket - no-op for fs, Valid only for XL.
-func (fs fsObjects) HealBucket(bucket string, dryRun bool) ([]madmin.HealResultItem,
+func (fs *fsObjects) HealBucket(bucket string, dryRun bool) ([]madmin.HealResultItem,
 	error) {
-
 	return nil, errors.Trace(NotImplemented{})
 }

 // ListObjectsHeal - list all objects to be healed. Valid only for XL
-func (fs fsObjects) ListObjectsHeal(bucket, prefix, marker, delimiter string, maxKeys int) (loi ListObjectsInfo, e error) {
+func (fs *fsObjects) ListObjectsHeal(bucket, prefix, marker, delimiter string, maxKeys int) (loi ListObjectsInfo, e error) {
 	return loi, errors.Trace(NotImplemented{})
 }

 // ListBucketsHeal - list all buckets to be healed. Valid only for XL
-func (fs fsObjects) ListBucketsHeal() ([]BucketInfo, error) {
+func (fs *fsObjects) ListBucketsHeal() ([]BucketInfo, error) {
 	return []BucketInfo{}, errors.Trace(NotImplemented{})
 }
diff --git a/cmd/object-api-multipart_test.go b/cmd/object-api-multipart_test.go
index f1a52eb12..8a78dc009 100644
--- a/cmd/object-api-multipart_test.go
+++ b/cmd/object-api-multipart_test.go
@@ -1374,11 +1374,12 @@ func testListObjectPartsDiskNotFound(obj ObjectLayer, instanceType string, disks
 		},
 		// partinfos - 2.
 		{
-			Bucket:      bucketNames[0],
-			Object:      objectNames[0],
-			MaxParts:    2,
-			IsTruncated: false,
-			UploadID:    uploadIDs[0],
+			Bucket:           bucketNames[0],
+			Object:           objectNames[0],
+			MaxParts:         2,
+			IsTruncated:      false,
+			UploadID:         uploadIDs[0],
+			PartNumberMarker: 3,
 			Parts: []PartInfo{
 				{
 					PartNumber: 4,
@@ -1611,11 +1612,12 @@ func testListObjectParts(obj ObjectLayer, instanceType string, t TestErrHandler)
 		},
 		// partinfos - 2.
 		{
-			Bucket:      bucketNames[0],
-			Object:      objectNames[0],
-			MaxParts:    2,
-			IsTruncated: false,
-			UploadID:    uploadIDs[0],
+			Bucket:           bucketNames[0],
+			Object:           objectNames[0],
+			MaxParts:         2,
+			IsTruncated:      false,
+			UploadID:         uploadIDs[0],
+			PartNumberMarker: 3,
 			Parts: []PartInfo{
 				{
 					PartNumber: 4,
diff --git a/cmd/object-handlers.go b/cmd/object-handlers.go
index eda1afa21..11eb641ad 100644
--- a/cmd/object-handlers.go
+++ b/cmd/object-handlers.go
@@ -909,6 +909,7 @@ func (api objectAPIHandlers) AbortMultipartUploadHandler(w http.ResponseWriter,
 	uploadID, _, _, _ := getObjectResources(r.URL.Query())
 	if err := objectAPI.AbortMultipartUpload(bucket, object, uploadID); err != nil {
+		errorIf(err, "AbortMultipartUpload failed")
 		writeErrorResponse(w, toAPIErrorCode(err), r.URL)
 		return
 	}
diff --git a/cmd/xl-v1-multipart.go b/cmd/xl-v1-multipart.go
index 321759f99..bc5f981fd 100644
--- a/cmd/xl-v1-multipart.go
+++ b/cmd/xl-v1-multipart.go
@@ -833,6 +833,7 @@ func (xl xlObjects) listObjectParts(bucket, object, uploadID string, partNumberM
 	result.Object = object
 	result.UploadID = uploadID
 	result.MaxParts = maxParts
+	result.PartNumberMarker = partNumberMarker

 	// For empty number of parts or maxParts as zero, return right here.
 	if len(xlParts) == 0 || maxParts == 0 {
diff --git a/pkg/ioutil/append-file_nix.go b/pkg/ioutil/append-file_nix.go
new file mode 100644
index 000000000..5aa837ef9
--- /dev/null
+++ b/pkg/ioutil/append-file_nix.go
@@ -0,0 +1,43 @@
+// +build !windows
+
+/*
+ * Minio Cloud Storage, (C) 2018 Minio, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package ioutil
+
+import (
+	"io"
+	"os"
+)
+
+// AppendFile - appends the file "src" to the file "dst"
+func AppendFile(dst string, src string) error {
+	appendFile, err := os.OpenFile(dst, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0644)
+	if err != nil {
+		return err
+	}
+	defer appendFile.Close()
+
+	srcFile, err := os.Open(src)
+	if err != nil {
+		return err
+	}
+	defer srcFile.Close()
+	// Allocate staging buffer.
+	var buf = make([]byte, defaultAppendBufferSize)
+	_, err = io.CopyBuffer(appendFile, srcFile, buf)
+	return err
+}
diff --git a/pkg/ioutil/append-file_windows.go b/pkg/ioutil/append-file_windows.go
new file mode 100644
index 000000000..26899aec1
--- /dev/null
+++ b/pkg/ioutil/append-file_windows.go
@@ -0,0 +1,43 @@
+/*
+ * Minio Cloud Storage, (C) 2018 Minio, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package ioutil
+
+import (
+	"io"
+	"os"
+
+	"github.com/minio/minio/pkg/lock"
+)
+
+// AppendFile - appends the file "src" to the file "dst"
+func AppendFile(dst string, src string) error {
+	appendFile, err := lock.Open(dst, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0644)
+	if err != nil {
+		return err
+	}
+	defer appendFile.Close()
+
+	srcFile, err := lock.Open(src, os.O_RDONLY, 0644)
+	if err != nil {
+		return err
+	}
+	defer srcFile.Close()
+	// Allocate staging buffer.
+	var buf = make([]byte, defaultAppendBufferSize)
+	_, err = io.CopyBuffer(appendFile, srcFile, buf)
+	return err
+}
diff --git a/pkg/ioutil/ioutil.go b/pkg/ioutil/ioutil.go
index 27605e6bb..b2bf266d3 100644
--- a/pkg/ioutil/ioutil.go
+++ b/pkg/ioutil/ioutil.go
@@ -20,8 +20,13 @@ package ioutil

 import (
 	"io"
+
+	humanize "github.com/dustin/go-humanize"
 )

+// defaultAppendBufferSize - Default buffer size for the AppendFile
+const defaultAppendBufferSize = humanize.MiByte
+
 // WriteOnCloser implements io.WriteCloser and always
 // executes at least one write operation if it is closed.
 //
diff --git a/pkg/ioutil/ioutil_test.go b/pkg/ioutil/ioutil_test.go
index 3b6d6dc1e..6d6aad5f6 100644
--- a/pkg/ioutil/ioutil_test.go
+++ b/pkg/ioutil/ioutil_test.go
@@ -18,6 +18,7 @@ package ioutil

 import (
 	goioutil "io/ioutil"
+	"os"
 	"testing"
 )

@@ -37,3 +38,38 @@ func TestCloseOnWriter(t *testing.T) {
 		t.Error("WriteOnCloser must be marked as HasWritten")
 	}
 }
+
+// Test for AppendFile.
+func TestAppendFile(t *testing.T) {
+	f, err := goioutil.TempFile("", "")
+	if err != nil {
+		t.Fatal(err)
+	}
+	name1 := f.Name()
+	defer os.Remove(name1)
+	f.WriteString("aaaaaaaaaa")
+	f.Close()
+
+	f, err = goioutil.TempFile("", "")
+	if err != nil {
+		t.Fatal(err)
+	}
+	name2 := f.Name()
+	defer os.Remove(name2)
+	f.WriteString("bbbbbbbbbb")
+	f.Close()
+
+	if err = AppendFile(name1, name2); err != nil {
+		t.Error(err)
+	}
+
+	b, err := goioutil.ReadFile(name1)
+	if err != nil {
+		t.Error(err)
+	}
+
+	expected := "aaaaaaaaaabbbbbbbbbb"
+	if string(b) != expected {
+		t.Errorf("AppendFile() failed, expected: %s, got %s", expected, string(b))
+	}
+}
diff --git a/pkg/lock/lock_nix.go b/pkg/lock/lock_nix.go
index 703073f74..35d1eb79b 100644
--- a/pkg/lock/lock_nix.go
+++ b/pkg/lock/lock_nix.go
@@ -90,3 +90,8 @@ func TryLockedOpenFile(path string, flag int, perm os.FileMode) (*LockedFile, er
 func LockedOpenFile(path string, flag int, perm os.FileMode) (*LockedFile, error) {
 	return lockedOpenFile(path, flag, perm, 0)
 }
+
+// Open - Call os.OpenFile
+func Open(path string, flag int, perm os.FileMode) (*os.File, error) {
+	return os.OpenFile(path, flag, perm)
+}
diff --git a/pkg/lock/lock_windows.go b/pkg/lock/lock_windows.go
index 8f4b549e6..6a9210082 100644
--- a/pkg/lock/lock_windows.go
+++ b/pkg/lock/lock_windows.go
@@ -42,7 +42,7 @@ const (

 // lockedOpenFile is an internal function.
 func lockedOpenFile(path string, flag int, perm os.FileMode, lockType uint32) (*LockedFile, error) {
-	f, err := open(path, flag, perm)
+	f, err := Open(path, flag, perm)
 	if err != nil {
 		return nil, err
 	}
@@ -168,7 +168,7 @@ func fixLongPath(path string) string {
 // perm param is ignored, on windows file perms/NT acls
 // are not octet combinations. Providing access to NT
 // acls is out of scope here.
-func open(path string, flag int, perm os.FileMode) (*os.File, error) {
+func Open(path string, flag int, perm os.FileMode) (*os.File, error) {
 	if path == "" {
 		return nil, syscall.ERROR_FILE_NOT_FOUND
 	}
@@ -190,6 +190,8 @@ func open(path string, flag int, perm os.FileMode) (*os.File, error) {
 		fallthrough
 	case syscall.O_WRONLY | syscall.O_CREAT:
 		access = syscall.GENERIC_READ | syscall.GENERIC_WRITE
+	case syscall.O_WRONLY | syscall.O_CREAT | syscall.O_APPEND:
+		access = syscall.FILE_APPEND_DATA
 	default:
 		return nil, fmt.Errorf("Unsupported flag (%d)", flag)
 	}
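For readers who want to see how the pieces of this patch compose at completion time, the following is a minimal, self-contained Go sketch (not part of the patch). It assumes part files are stored flat under the upload directory as "<partNumber>.<etag>", as described in the commit message; the helper names appendFile and completeParts are illustrative only and do not exist in the MinIO code base.

package main

import (
	"fmt"
	"io"
	"os"
	"path/filepath"
)

// completePart mirrors the (PartNumber, ETag) pair a client sends in
// CompleteMultipartUpload.
type completePart struct {
	PartNumber int
	ETag       string
}

// appendFile mirrors pkg/ioutil.AppendFile above: append the contents of src to dst.
func appendFile(dst, src string) error {
	out, err := os.OpenFile(dst, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0644)
	if err != nil {
		return err
	}
	defer out.Close()

	in, err := os.Open(src)
	if err != nil {
		return err
	}
	defer in.Close()

	_, err = io.Copy(out, in)
	return err
}

// completeParts concatenates the named parts from uploadIDDir into tmpPath and
// renames the result to objPath, roughly following the fallback path of
// CompleteMultipartUpload in this patch.
func completeParts(uploadIDDir, tmpPath, objPath string, parts []completePart) error {
	for _, p := range parts {
		// Part files live flat in the upload directory as "<partNumber>.<etag>".
		partPath := filepath.Join(uploadIDDir, fmt.Sprintf("%d.%s", p.PartNumber, p.ETag))
		if _, err := os.Stat(partPath); err != nil {
			return fmt.Errorf("invalid part %d: %v", p.PartNumber, err)
		}
		if err := appendFile(tmpPath, partPath); err != nil {
			return err
		}
	}
	// Publish atomically by renaming the fully appended file into place.
	return os.Rename(tmpPath, objPath)
}

func main() {
	// Hypothetical paths, for illustration only.
	parts := []completePart{{1, "etag1"}, {2, "etag2"}}
	if err := completeParts("/tmp/mp/uploadid", "/tmp/mp/append", "/tmp/obj", parts); err != nil {
		fmt.Println("complete failed:", err)
	}
}

Appending parts into a private temporary file and then renaming it into the namespace keeps the final publish step atomic, which appears to be the same reasoning the patch follows when it renames appendFilePath into pathJoin(fs.fsPath, bucket, object).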