logging: Print stack trace in case of errors.

fixes #1827
This commit is contained in:
Krishna Srinivas 2016-08-25 22:09:01 +05:30 committed by Harshavardhana
parent 37cbcae6ba
commit 9358ee011b
38 changed files with 485 additions and 311 deletions

View file

@ -572,6 +572,7 @@ func toAPIErrorCode(err error) (apiErr APIErrorCode) {
if err == nil {
return ErrNone
}
err = errorCause(err)
// Verify if the underlying error is signature mismatch.
switch err {
case errSignatureMismatch:

View file

@ -187,7 +187,6 @@ func (api objectAPIHandlers) PutBucketPolicyHandler(w http.ResponseWriter, r *ht
// Save bucket policy.
if err = writeBucketPolicy(bucket, objAPI, bytes.NewReader(policyBytes), int64(len(policyBytes))); err != nil {
errorIf(err, "Unable to write bucket policy.")
switch err.(type) {
case BucketNameInvalid:
writeErrorResponse(w, r, ErrInvalidBucketName, r.URL.Path)
@ -232,7 +231,6 @@ func (api objectAPIHandlers) DeleteBucketPolicyHandler(w http.ResponseWriter, r
// Delete bucket access policy.
if err := removeBucketPolicy(bucket, objAPI); err != nil {
errorIf(err, "Unable to remove bucket policy.")
switch err.(type) {
case BucketNameInvalid:
writeErrorResponse(w, r, ErrInvalidBucketName, r.URL.Path)

View file

@ -66,6 +66,9 @@ func (bp *bucketPolicies) RemoveBucketPolicy(bucket string) {
func loadAllBucketPolicies(objAPI ObjectLayer) (policies map[string]*bucketPolicy, err error) {
// List buckets to proceed loading all notification configuration.
buckets, err := objAPI.ListBuckets()
errorIf(err, "Unable to list buckets.")
err = errorCause(err)
if err != nil {
return nil, err
}
@ -86,7 +89,6 @@ func loadAllBucketPolicies(objAPI ObjectLayer) (policies map[string]*bucketPolic
// Success.
return policies, nil
}
// Intialize all bucket policies.
@ -128,6 +130,8 @@ func readBucketPolicyJSON(bucket string, objAPI ObjectLayer) (bucketPolicyReader
}
policyPath := pathJoin(bucketConfigPrefix, bucket, policyJSON)
objInfo, err := objAPI.GetObjectInfo(minioMetaBucket, policyPath)
errorIf(err, "Unable to get policy for the bucket %s.", bucket)
err = errorCause(err)
if err != nil {
if _, ok := err.(ObjectNotFound); ok {
return nil, BucketPolicyNotFound{Bucket: bucket}
@ -136,6 +140,8 @@ func readBucketPolicyJSON(bucket string, objAPI ObjectLayer) (bucketPolicyReader
}
var buffer bytes.Buffer
err = objAPI.GetObject(minioMetaBucket, policyPath, 0, objInfo.Size, &buffer)
errorIf(err, "Unable to get policy for the bucket %s.", bucket)
err = errorCause(err)
if err != nil {
if _, ok := err.(ObjectNotFound); ok {
return nil, BucketPolicyNotFound{Bucket: bucket}
@ -174,6 +180,8 @@ func removeBucketPolicy(bucket string, objAPI ObjectLayer) error {
}
policyPath := pathJoin(bucketConfigPrefix, bucket, policyJSON)
if err := objAPI.DeleteObject(minioMetaBucket, policyPath); err != nil {
errorIf(err, "Unable to remove bucket-policy on bucket %s.", bucket)
err = errorCause(err)
if _, ok := err.(ObjectNotFound); ok {
return BucketPolicyNotFound{Bucket: bucket}
}
@ -190,6 +198,9 @@ func writeBucketPolicy(bucket string, objAPI ObjectLayer, reader io.Reader, size
}
policyPath := pathJoin(bucketConfigPrefix, bucket, policyJSON)
_, err := objAPI.PutObject(minioMetaBucket, policyPath, size, reader, nil)
return err
if _, err := objAPI.PutObject(minioMetaBucket, policyPath, size, reader, nil); err != nil {
errorIf(err, "Unable to set policy for the bucket %s", bucket)
return errorCause(err)
}
return nil
}

View file

@ -41,7 +41,7 @@ func erasureCreateFile(disks []StorageAPI, volume, path string, reader io.Reader
// FIXME: this is a bug in Golang, n == 0 and err ==
// io.ErrUnexpectedEOF for io.ReadFull function.
if n == 0 && rErr == io.ErrUnexpectedEOF {
return 0, nil, rErr
return 0, nil, traceError(rErr)
}
if rErr == io.EOF {
// We have reached EOF on the first byte read, io.Reader
@ -58,7 +58,7 @@ func erasureCreateFile(disks []StorageAPI, volume, path string, reader io.Reader
break
}
if rErr != nil && rErr != io.ErrUnexpectedEOF {
return 0, nil, rErr
return 0, nil, traceError(rErr)
}
if n > 0 {
// Returns encoded blocks.
@ -88,19 +88,19 @@ func erasureCreateFile(disks []StorageAPI, volume, path string, reader io.Reader
func encodeData(dataBuffer []byte, dataBlocks, parityBlocks int) ([][]byte, error) {
rs, err := reedsolomon.New(dataBlocks, parityBlocks)
if err != nil {
return nil, err
return nil, traceError(err)
}
// Split the input buffer into data and parity blocks.
var blocks [][]byte
blocks, err = rs.Split(dataBuffer)
if err != nil {
return nil, err
return nil, traceError(err)
}
// Encode parity blocks using data blocks.
err = rs.Encode(blocks)
if err != nil {
return nil, err
return nil, traceError(err)
}
// Return encoded blocks.
@ -122,7 +122,7 @@ func appendFile(disks []StorageAPI, volume, path string, enBlocks [][]byte, hash
defer wg.Done()
wErr := disk.AppendFile(volume, path, enBlocks[index])
if wErr != nil {
wErrs[index] = wErr
wErrs[index] = traceError(wErr)
return
}
@ -139,7 +139,7 @@ func appendFile(disks []StorageAPI, volume, path string, enBlocks [][]byte, hash
// Do we have write quorum?.
if !isDiskQuorum(wErrs, writeQuorum) {
return errXLWriteQuorum
return traceError(errXLWriteQuorum)
}
return nil
}

View file

@ -93,8 +93,8 @@ func TestErasureCreateFile(t *testing.T) {
// 1 more disk down. 7 disk down in total. Should return quorum error.
disks[10] = AppendDiskDown{disks[10].(*posix)}
_, _, err = erasureCreateFile(disks, "testbucket", "testobject4", bytes.NewReader(data), blockSize, dataBlocks, parityBlocks, bitRotAlgo, dataBlocks+1)
if err != errXLWriteQuorum {
t.Errorf("erasureCreateFile returned expected errXLWriteQuorum error, got %s", err)
if errorCause(err) != errXLWriteQuorum {
t.Errorf("erasureCreateFile return value: expected errXLWriteQuorum, got %s", err)
}
}
@ -195,7 +195,7 @@ func TestErasureEncode(t *testing.T) {
}
// Failed as expected, but does it fail for the expected reason.
if actualErr != nil && !testCase.shouldPass {
if testCase.expectedErr != actualErr {
if errorCause(actualErr) != testCase.expectedErr {
t.Errorf("Test %d: Expected Error to be \"%v\", but instead found \"%v\" ", i+1, testCase.expectedErr, actualErr)
}
}

View file

@ -64,7 +64,7 @@ func erasureHealFile(latestDisks []StorageAPI, outDatedDisks []StorageAPI, volum
}
err := disk.AppendFile(healBucket, healPath, enBlocks[index])
if err != nil {
return nil, err
return nil, traceError(err)
}
hashWriters[index].Write(enBlocks[index])
}

View file

@ -84,10 +84,10 @@ func getReadDisks(orderedDisks []StorageAPI, index int, dataBlocks int) (readDis
// Sanity checks - we should never have this situation.
if dataDisks == dataBlocks {
return nil, 0, errUnexpected
return nil, 0, traceError(errUnexpected)
}
if dataDisks+parityDisks >= dataBlocks {
return nil, 0, errUnexpected
return nil, 0, traceError(errUnexpected)
}
// Find the disks from which next set of parallel reads should happen.
@ -107,7 +107,7 @@ func getReadDisks(orderedDisks []StorageAPI, index int, dataBlocks int) (readDis
return readDisks, i + 1, nil
}
}
return nil, 0, errXLReadQuorum
return nil, 0, traceError(errXLReadQuorum)
}
// parallelRead - reads chunks in parallel from the disks specified in []readDisks.
@ -161,12 +161,12 @@ func parallelRead(volume, path string, readDisks []StorageAPI, orderedDisks []St
func erasureReadFile(writer io.Writer, disks []StorageAPI, volume string, path string, offset int64, length int64, totalLength int64, blockSize int64, dataBlocks int, parityBlocks int, checkSums []string, algo string, pool *bpool.BytePool) (int64, error) {
// Offset and length cannot be negative.
if offset < 0 || length < 0 {
return 0, errUnexpected
return 0, traceError(errUnexpected)
}
// Can't request more data than what is available.
if offset+length > totalLength {
return 0, errUnexpected
return 0, traceError(errUnexpected)
}
// chunkSize is the amount of data that needs to be read from each disk at a time.
@ -248,7 +248,7 @@ func erasureReadFile(writer io.Writer, disks []StorageAPI, volume string, path s
}
if nextIndex == len(disks) {
// No more disks to read from.
return bytesWritten, errXLReadQuorum
return bytesWritten, traceError(errXLReadQuorum)
}
// We do not have enough enough data blocks to reconstruct the data
// hence continue the for-loop till we have enough data blocks.
@ -325,24 +325,24 @@ func decodeData(enBlocks [][]byte, dataBlocks, parityBlocks int) error {
// Initialized reedsolomon.
rs, err := reedsolomon.New(dataBlocks, parityBlocks)
if err != nil {
return err
return traceError(err)
}
// Reconstruct encoded blocks.
err = rs.Reconstruct(enBlocks)
if err != nil {
return err
return traceError(err)
}
// Verify reconstructed blocks (parity).
ok, err := rs.Verify(enBlocks)
if err != nil {
return err
return traceError(err)
}
if !ok {
// Blocks cannot be reconstructed, corrupted data.
err = errors.New("Verification failed after reconstruction, data likely corrupted.")
return err
return traceError(err)
}
// Success.

View file

@ -104,7 +104,7 @@ func testGetReadDisks(t *testing.T, xl xlObjects) {
for i, test := range testCases {
disks, nextIndex, err := getReadDisks(test.argDisks, test.index, xl.dataBlocks)
if err != test.err {
if errorCause(err) != test.err {
t.Errorf("test-case %d - expected error : %s, got : %s", i+1, test.err, err)
continue
}
@ -319,7 +319,7 @@ func TestErasureReadFileDiskFail(t *testing.T) {
disks[13] = ReadDiskDown{disks[13].(*posix)}
buf.Reset()
_, err = erasureReadFile(buf, disks, "testbucket", "testobject", 0, length, length, blockSize, dataBlocks, parityBlocks, checkSums, bitRotAlgo, pool)
if err != errXLReadQuorum {
if errorCause(err) != errXLReadQuorum {
t.Fatal("expected errXLReadQuorum error")
}
}

View file

@ -76,17 +76,17 @@ func getDataBlockLen(enBlocks [][]byte, dataBlocks int) int {
func writeDataBlocks(dst io.Writer, enBlocks [][]byte, dataBlocks int, offset int64, length int64) (int64, error) {
// Offset and out size cannot be negative.
if offset < 0 || length < 0 {
return 0, errUnexpected
return 0, traceError(errUnexpected)
}
// Do we have enough blocks?
if len(enBlocks) < dataBlocks {
return 0, reedsolomon.ErrTooFewShards
return 0, traceError(reedsolomon.ErrTooFewShards)
}
// Do we have enough data?
if int64(getDataBlockLen(enBlocks, dataBlocks)) < length {
return 0, reedsolomon.ErrShortData
return 0, traceError(reedsolomon.ErrShortData)
}
// Counter to decrement total left to write.
@ -114,7 +114,7 @@ func writeDataBlocks(dst io.Writer, enBlocks [][]byte, dataBlocks int, offset in
if write < int64(len(block)) {
n, err := io.Copy(dst, bytes.NewReader(block[:write]))
if err != nil {
return 0, err
return 0, traceError(err)
}
totalWritten += n
break
@ -122,7 +122,7 @@ func writeDataBlocks(dst io.Writer, enBlocks [][]byte, dataBlocks int, offset in
// Copy the block.
n, err := io.Copy(dst, bytes.NewReader(block))
if err != nil {
return 0, err
return 0, traceError(err)
}
// Decrement output size.

122
cmd/errors.go Normal file
View file

@ -0,0 +1,122 @@
/*
* Minio Cloud Storage, (C) 2016 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import (
"fmt"
"os"
"path/filepath"
"runtime"
"strings"
)
// Holds the current directory path. Used for trimming path in traceError()
var rootPath string
// Figure out the rootPath
func initError() {
// Root path is automatically determined from the calling function's source file location.
// Catch the calling function's source file path.
_, file, _, _ := runtime.Caller(1)
// Save the directory alone.
rootPath = filepath.Dir(file)
}
// Represents a stack frame in the stack trace.
type traceInfo struct {
file string // File where error occurred
line int // Line where error occurred
name string // Name of the function where error occurred
}
// Error - error type containing cause and the stack trace.
type Error struct {
e error // Holds the cause error
trace []traceInfo // stack trace
errs []error // Useful for XL to hold errors from all disks
}
// Implement error interface.
func (e Error) Error() string {
return e.e.Error()
}
// Trace - returns stack trace.
func (e Error) Trace() []string {
var traceArr []string
for _, info := range e.trace {
traceArr = append(traceArr, fmt.Sprintf("%s:%d:%s",
info.file, info.line, info.name))
}
return traceArr
}
// NewStorageError - return new Error type.
func traceError(e error, errs ...error) error {
if e == nil {
return nil
}
err := &Error{}
err.e = e
err.errs = errs
stack := make([]uintptr, 40)
length := runtime.Callers(2, stack)
if length > len(stack) {
length = len(stack)
}
stack = stack[:length]
for _, pc := range stack {
pc = pc - 1
fn := runtime.FuncForPC(pc)
file, line := fn.FileLine(pc)
name := fn.Name()
if strings.HasSuffix(name, "ServeHTTP") {
break
}
if strings.HasSuffix(name, "runtime.") {
break
}
file = strings.TrimPrefix(file, rootPath+string(os.PathSeparator))
name = strings.TrimPrefix(name, "github.com/minio/minio/cmd.")
err.trace = append(err.trace, traceInfo{file, line, name})
}
return err
}
// Returns the underlying cause error.
func errorCause(err error) error {
if e, ok := err.(*Error); ok {
err = e.e
}
return err
}
// Returns slice of underlying cause error.
func errorsCause(errs []error) []error {
Errs := make([]error, len(errs))
for i, err := range errs {
if err == nil {
continue
}
Errs[i] = errorCause(err)
}
return Errs
}

View file

@ -227,6 +227,8 @@ func loadNotificationConfig(bucket string, objAPI ObjectLayer) (*notificationCon
// Construct the notification config path.
notificationConfigPath := path.Join(bucketConfigPrefix, bucket, bucketNotificationConfig)
objInfo, err := objAPI.GetObjectInfo(minioMetaBucket, notificationConfigPath)
errorIf(err, "Unable to get bucket-notification for butkcet %s", bucket)
err = errorCause(err)
if err != nil {
// 'notification.xml' not found return 'errNoSuchNotifications'.
// This is default when no bucket notifications are found on the bucket.
@ -239,6 +241,8 @@ func loadNotificationConfig(bucket string, objAPI ObjectLayer) (*notificationCon
}
var buffer bytes.Buffer
err = objAPI.GetObject(minioMetaBucket, notificationConfigPath, 0, objInfo.Size, &buffer)
errorIf(err, "Unable to get bucket-notification for butkcet %s", bucket)
err = errorCause(err)
if err != nil {
// 'notification.xml' not found return 'errNoSuchNotifications'.
// This is default when no bucket notifications are found on the bucket.

View file

@ -24,13 +24,13 @@ func fsCreateFile(disk StorageAPI, reader io.Reader, buf []byte, tmpBucket, temp
for {
n, rErr := reader.Read(buf)
if rErr != nil && rErr != io.EOF {
return 0, rErr
return 0, traceError(rErr)
}
bytesWritten += int64(n)
if n > 0 {
wErr := disk.AppendFile(tmpBucket, tempObj, buf[0:n])
if wErr != nil {
return 0, wErr
return 0, traceError(wErr)
}
}
if rErr == io.EOF {

View file

@ -81,12 +81,12 @@ func readFSMetadata(disk StorageAPI, bucket, filePath string) (fsMeta fsMetaV1,
// Read all `fs.json`.
buf, err := disk.ReadAll(bucket, filePath)
if err != nil {
return fsMetaV1{}, err
return fsMetaV1{}, traceError(err)
}
// Decode `fs.json` into fsMeta structure.
if err = json.Unmarshal(buf, &fsMeta); err != nil {
return fsMetaV1{}, err
return fsMetaV1{}, traceError(err)
}
// Success.
@ -94,16 +94,23 @@ func readFSMetadata(disk StorageAPI, bucket, filePath string) (fsMeta fsMetaV1,
}
// Write fsMeta to fs.json or fs-append.json.
func writeFSMetadata(disk StorageAPI, bucket, filePath string, fsMeta fsMetaV1) (err error) {
func writeFSMetadata(disk StorageAPI, bucket, filePath string, fsMeta fsMetaV1) error {
tmpPath := path.Join(tmpMetaPrefix, getUUID())
metadataBytes, err := json.Marshal(fsMeta)
if err != nil {
return err
return traceError(err)
}
if err = disk.AppendFile(minioMetaBucket, tmpPath, metadataBytes); err != nil {
return err
return traceError(err)
}
return disk.RenameFile(minioMetaBucket, tmpPath, bucket, filePath)
err = disk.RenameFile(minioMetaBucket, tmpPath, bucket, filePath)
if err != nil {
err = disk.DeleteFile(minioMetaBucket, tmpPath)
if err != nil {
return traceError(err)
}
}
return nil
}
// newFSMetaV1 - initializes new fsMetaV1.

View file

@ -64,8 +64,8 @@ func (fs fsObjects) writeUploadJSON(bucket, object, uploadID string, initiated t
var uploadsJSON uploadsV1
uploadsJSON, err = readUploadsJSON(bucket, object, fs.storage)
if err != nil {
// For any other errors.
if err != errFileNotFound {
// uploads.json might not exist hence ignore errFileNotFound.
if errorCause(err) != errFileNotFound {
return err
}
// Set uploads format to `fs`.
@ -77,18 +77,18 @@ func (fs fsObjects) writeUploadJSON(bucket, object, uploadID string, initiated t
// Update `uploads.json` on all disks.
uploadsJSONBytes, wErr := json.Marshal(&uploadsJSON)
if wErr != nil {
return wErr
return traceError(wErr)
}
// Write `uploads.json` to disk.
if wErr = fs.storage.AppendFile(minioMetaBucket, tmpUploadsPath, uploadsJSONBytes); wErr != nil {
return wErr
return traceError(wErr)
}
wErr = fs.storage.RenameFile(minioMetaBucket, tmpUploadsPath, minioMetaBucket, uploadsPath)
if wErr != nil {
if dErr := fs.storage.DeleteFile(minioMetaBucket, tmpUploadsPath); dErr != nil {
return dErr
return traceError(dErr)
}
return wErr
return traceError(wErr)
}
return nil
}
@ -100,13 +100,13 @@ func (fs fsObjects) updateUploadsJSON(bucket, object string, uploadsJSON uploads
tmpUploadsPath := path.Join(tmpMetaPrefix, uniqueID)
uploadsBytes, wErr := json.Marshal(uploadsJSON)
if wErr != nil {
return wErr
return traceError(wErr)
}
if wErr = fs.storage.AppendFile(minioMetaBucket, tmpUploadsPath, uploadsBytes); wErr != nil {
return wErr
return traceError(wErr)
}
if wErr = fs.storage.RenameFile(minioMetaBucket, tmpUploadsPath, minioMetaBucket, uploadsPath); wErr != nil {
return wErr
return traceError(wErr)
}
return nil
}

View file

@ -94,7 +94,7 @@ func (fs fsObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark
eof = true
break
}
return ListMultipartsInfo{}, err
return ListMultipartsInfo{}, walkResult.err
}
entry := strings.TrimPrefix(walkResult.entry, retainSlash(pathJoin(mpartMetaPrefix, bucket)))
if strings.HasSuffix(walkResult.entry, slashSeparator) {
@ -176,42 +176,42 @@ func (fs fsObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark
func (fs fsObjects) ListMultipartUploads(bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (ListMultipartsInfo, error) {
// Validate input arguments.
if !IsValidBucketName(bucket) {
return ListMultipartsInfo{}, BucketNameInvalid{Bucket: bucket}
return ListMultipartsInfo{}, traceError(BucketNameInvalid{Bucket: bucket})
}
if !fs.isBucketExist(bucket) {
return ListMultipartsInfo{}, BucketNotFound{Bucket: bucket}
return ListMultipartsInfo{}, traceError(BucketNotFound{Bucket: bucket})
}
if !IsValidObjectPrefix(prefix) {
return ListMultipartsInfo{}, ObjectNameInvalid{Bucket: bucket, Object: prefix}
return ListMultipartsInfo{}, traceError(ObjectNameInvalid{Bucket: bucket, Object: prefix})
}
// Verify if delimiter is anything other than '/', which we do not support.
if delimiter != "" && delimiter != slashSeparator {
return ListMultipartsInfo{}, UnsupportedDelimiter{
return ListMultipartsInfo{}, traceError(UnsupportedDelimiter{
Delimiter: delimiter,
}
})
}
// Verify if marker has prefix.
if keyMarker != "" && !strings.HasPrefix(keyMarker, prefix) {
return ListMultipartsInfo{}, InvalidMarkerPrefixCombination{
return ListMultipartsInfo{}, traceError(InvalidMarkerPrefixCombination{
Marker: keyMarker,
Prefix: prefix,
}
})
}
if uploadIDMarker != "" {
if strings.HasSuffix(keyMarker, slashSeparator) {
return ListMultipartsInfo{}, InvalidUploadIDKeyCombination{
return ListMultipartsInfo{}, traceError(InvalidUploadIDKeyCombination{
UploadIDMarker: uploadIDMarker,
KeyMarker: keyMarker,
}
})
}
id, err := uuid.Parse(uploadIDMarker)
if err != nil {
return ListMultipartsInfo{}, err
return ListMultipartsInfo{}, traceError(err)
}
if id.IsZero() {
return ListMultipartsInfo{}, MalformedUploadID{
return ListMultipartsInfo{}, traceError(MalformedUploadID{
UploadID: uploadIDMarker,
}
})
}
}
return fs.listMultipartUploads(bucket, prefix, keyMarker, uploadIDMarker, delimiter, maxUploads)
@ -247,9 +247,9 @@ func (fs fsObjects) newMultipartUpload(bucket string, object string, meta map[st
if err = fs.writeUploadJSON(bucket, object, uploadID, initiated); err != nil {
return "", err
}
fsMetaPath := path.Join(mpartMetaPrefix, bucket, object, uploadID, fsMetaJSONFile)
if err = writeFSMetadata(fs.storage, minioMetaBucket, fsMetaPath, fsMeta); err != nil {
return "", toObjectErr(err, minioMetaBucket, fsMetaPath)
uploadIDPath := path.Join(mpartMetaPrefix, bucket, object, uploadID)
if err = writeFSMetadata(fs.storage, minioMetaBucket, path.Join(uploadIDPath, fsMetaJSONFile), fsMeta); err != nil {
return "", toObjectErr(err, minioMetaBucket, uploadIDPath)
}
// Return success.
return uploadID, nil
@ -263,15 +263,15 @@ func (fs fsObjects) newMultipartUpload(bucket string, object string, meta map[st
func (fs fsObjects) NewMultipartUpload(bucket, object string, meta map[string]string) (string, error) {
// Verify if bucket name is valid.
if !IsValidBucketName(bucket) {
return "", BucketNameInvalid{Bucket: bucket}
return "", traceError(BucketNameInvalid{Bucket: bucket})
}
// Verify whether the bucket exists.
if !fs.isBucketExist(bucket) {
return "", BucketNotFound{Bucket: bucket}
return "", traceError(BucketNotFound{Bucket: bucket})
}
// Verify if object name is valid.
if !IsValidObjectName(object) {
return "", ObjectNameInvalid{Bucket: bucket, Object: object}
return "", traceError(ObjectNameInvalid{Bucket: bucket, Object: object})
}
return fs.newMultipartUpload(bucket, object, meta)
}
@ -404,14 +404,14 @@ func appendParts(disk StorageAPI, bucket, object, uploadID, opsID string) {
func (fs fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, size int64, data io.Reader, md5Hex string) (string, error) {
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
return "", BucketNameInvalid{Bucket: bucket}
return "", traceError(BucketNameInvalid{Bucket: bucket})
}
// Verify whether the bucket exists.
if !fs.isBucketExist(bucket) {
return "", BucketNotFound{Bucket: bucket}
return "", traceError(BucketNotFound{Bucket: bucket})
}
if !IsValidObjectName(object) {
return "", ObjectNameInvalid{Bucket: bucket, Object: object}
return "", traceError(ObjectNameInvalid{Bucket: bucket, Object: object})
}
uploadIDPath := path.Join(mpartMetaPrefix, bucket, object, uploadID)
@ -425,7 +425,7 @@ func (fs fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
uploadIDExists := fs.isUploadIDExists(bucket, object, uploadID)
nsMutex.RUnlock(minioMetaBucket, uploadIDPath, opsID)
if !uploadIDExists {
return "", InvalidUploadID{UploadID: uploadID}
return "", traceError(InvalidUploadID{UploadID: uploadID})
}
partSuffix := fmt.Sprintf("object%d", partID)
@ -459,7 +459,7 @@ func (fs fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
// bytes than specified in request header.
if bytesWritten < size {
fs.storage.DeleteFile(minioMetaBucket, tmpPartPath)
return "", IncompleteBody{}
return "", traceError(IncompleteBody{})
}
// Validate if payload is valid.
@ -468,7 +468,7 @@ func (fs fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
// Incoming payload wrong, delete the temporary object.
fs.storage.DeleteFile(minioMetaBucket, tmpPartPath)
// Error return.
return "", toObjectErr(err, bucket, object)
return "", toObjectErr(traceError(err), bucket, object)
}
}
@ -478,7 +478,7 @@ func (fs fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
// MD5 mismatch, delete the temporary object.
fs.storage.DeleteFile(minioMetaBucket, tmpPartPath)
// Returns md5 mismatch.
return "", BadDigest{md5Hex, newMD5Hex}
return "", traceError(BadDigest{md5Hex, newMD5Hex})
}
}
@ -492,7 +492,7 @@ func (fs fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
// Just check if the uploadID exists to avoid copy if it doesn't.
if !fs.isUploadIDExists(bucket, object, uploadID) {
return "", InvalidUploadID{UploadID: uploadID}
return "", traceError(InvalidUploadID{UploadID: uploadID})
}
fsMetaPath := pathJoin(uploadIDPath, fsMetaJSONFile)
@ -506,13 +506,13 @@ func (fs fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
err = fs.storage.RenameFile(minioMetaBucket, tmpPartPath, minioMetaBucket, partPath)
if err != nil {
if dErr := fs.storage.DeleteFile(minioMetaBucket, tmpPartPath); dErr != nil {
return "", toObjectErr(dErr, minioMetaBucket, tmpPartPath)
return "", toObjectErr(traceError(dErr), minioMetaBucket, tmpPartPath)
}
return "", toObjectErr(err, minioMetaBucket, partPath)
return "", toObjectErr(traceError(err), minioMetaBucket, partPath)
}
if err = writeFSMetadata(fs.storage, minioMetaBucket, fsMetaPath, fsMeta); err != nil {
return "", toObjectErr(err, minioMetaBucket, fsMetaPath)
uploadIDPath = path.Join(mpartMetaPrefix, bucket, object, uploadID)
if err = writeFSMetadata(fs.storage, minioMetaBucket, path.Join(uploadIDPath, fsMetaJSONFile), fsMeta); err != nil {
return "", toObjectErr(err, minioMetaBucket, uploadIDPath)
}
go appendParts(fs.storage, bucket, object, uploadID, opsID)
return newMD5Hex, nil
@ -541,7 +541,7 @@ func (fs fsObjects) listObjectParts(bucket, object, uploadID string, partNumberM
partNamePath := path.Join(mpartMetaPrefix, bucket, object, uploadID, part.Name)
fi, err = fs.storage.StatFile(minioMetaBucket, partNamePath)
if err != nil {
return ListPartsInfo{}, toObjectErr(err, minioMetaBucket, partNamePath)
return ListPartsInfo{}, toObjectErr(traceError(err), minioMetaBucket, partNamePath)
}
result.Parts = append(result.Parts, partInfo{
PartNumber: part.Number,
@ -579,14 +579,14 @@ func (fs fsObjects) listObjectParts(bucket, object, uploadID string, partNumberM
func (fs fsObjects) ListObjectParts(bucket, object, uploadID string, partNumberMarker, maxParts int) (ListPartsInfo, error) {
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
return ListPartsInfo{}, BucketNameInvalid{Bucket: bucket}
return ListPartsInfo{}, traceError(BucketNameInvalid{Bucket: bucket})
}
// Verify whether the bucket exists.
if !fs.isBucketExist(bucket) {
return ListPartsInfo{}, BucketNotFound{Bucket: bucket}
return ListPartsInfo{}, traceError(BucketNotFound{Bucket: bucket})
}
if !IsValidObjectName(object) {
return ListPartsInfo{}, ObjectNameInvalid{Bucket: bucket, Object: object}
return ListPartsInfo{}, traceError(ObjectNameInvalid{Bucket: bucket, Object: object})
}
// generates random string on setting MINIO_DEBUG=lock, else returns empty string.
@ -598,7 +598,7 @@ func (fs fsObjects) ListObjectParts(bucket, object, uploadID string, partNumberM
defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID), opsID)
if !fs.isUploadIDExists(bucket, object, uploadID) {
return ListPartsInfo{}, InvalidUploadID{UploadID: uploadID}
return ListPartsInfo{}, traceError(InvalidUploadID{UploadID: uploadID})
}
return fs.listObjectParts(bucket, object, uploadID, partNumberMarker, maxParts)
}
@ -612,17 +612,17 @@ func (fs fsObjects) ListObjectParts(bucket, object, uploadID string, partNumberM
func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, uploadID string, parts []completePart) (string, error) {
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
return "", BucketNameInvalid{Bucket: bucket}
return "", traceError(BucketNameInvalid{Bucket: bucket})
}
// Verify whether the bucket exists.
if !fs.isBucketExist(bucket) {
return "", BucketNotFound{Bucket: bucket}
return "", traceError(BucketNotFound{Bucket: bucket})
}
if !IsValidObjectName(object) {
return "", ObjectNameInvalid{
return "", traceError(ObjectNameInvalid{
Bucket: bucket,
Object: object,
}
})
}
uploadIDPath := path.Join(mpartMetaPrefix, bucket, object, uploadID)
@ -638,7 +638,7 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload
defer nsMutex.Unlock(minioMetaBucket, uploadIDPath, opsID)
if !fs.isUploadIDExists(bucket, object, uploadID) {
return "", InvalidUploadID{UploadID: uploadID}
return "", traceError(InvalidUploadID{UploadID: uploadID})
}
// fs-append.json path
@ -650,21 +650,21 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload
// Calculate s3 compatible md5sum for complete multipart.
s3MD5, err := completeMultipartMD5(parts...)
if err != nil {
return "", err
return "", traceError(err)
}
// Read saved fs metadata for ongoing multipart.
fsMetaPath := pathJoin(uploadIDPath, fsMetaJSONFile)
fsMeta, err := readFSMetadata(fs.storage, minioMetaBucket, fsMetaPath)
if err != nil {
return "", toObjectErr(err, minioMetaBucket, fsMetaPath)
return "", toObjectErr(traceError(err), minioMetaBucket, fsMetaPath)
}
fsAppendMeta, err := readFSMetadata(fs.storage, minioMetaBucket, fsAppendMetaPath)
if err == nil && isPartsSame(fsAppendMeta.Parts, parts) {
fsAppendDataPath := getFSAppendDataPath(uploadID)
if err = fs.storage.RenameFile(minioMetaBucket, fsAppendDataPath, bucket, object); err != nil {
return "", toObjectErr(err, minioMetaBucket, fsAppendDataPath)
return "", toObjectErr(traceError(err), minioMetaBucket, fsAppendDataPath)
}
// Remove the append-file metadata file in tmp location as we no longer need it.
fs.storage.DeleteFile(minioMetaBucket, fsAppendMetaPath)
@ -678,18 +678,18 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload
for i, part := range parts {
partIdx := fsMeta.ObjectPartIndex(part.PartNumber)
if partIdx == -1 {
return "", InvalidPart{}
return "", traceError(InvalidPart{})
}
if fsMeta.Parts[partIdx].ETag != part.ETag {
return "", BadDigest{}
return "", traceError(BadDigest{})
}
// All parts except the last part has to be atleast 5MB.
if (i < len(parts)-1) && !isMinAllowedPartSize(fsMeta.Parts[partIdx].Size) {
return "", PartTooSmall{
return "", traceError(PartTooSmall{
PartNumber: part.PartNumber,
PartSize: fsMeta.Parts[partIdx].Size,
PartETag: part.ETag,
}
})
}
// Construct part suffix.
partSuffix := fmt.Sprintf("object%d", part.PartNumber)
@ -705,7 +705,7 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload
n, err = fs.storage.ReadFile(minioMetaBucket, multipartPartFile, offset, buf[:curLeft])
if n > 0 {
if err = fs.storage.AppendFile(minioMetaBucket, tempObj, buf[:n]); err != nil {
return "", toObjectErr(err, minioMetaBucket, tempObj)
return "", toObjectErr(traceError(err), minioMetaBucket, tempObj)
}
}
if err != nil {
@ -713,9 +713,9 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload
break
}
if err == errFileNotFound {
return "", InvalidPart{}
return "", traceError(InvalidPart{})
}
return "", toObjectErr(err, minioMetaBucket, multipartPartFile)
return "", toObjectErr(traceError(err), minioMetaBucket, multipartPartFile)
}
offset += n
totalLeft -= n
@ -726,9 +726,9 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload
err = fs.storage.RenameFile(minioMetaBucket, tempObj, bucket, object)
if err != nil {
if dErr := fs.storage.DeleteFile(minioMetaBucket, tempObj); dErr != nil {
return "", toObjectErr(dErr, minioMetaBucket, tempObj)
return "", toObjectErr(traceError(dErr), minioMetaBucket, tempObj)
}
return "", toObjectErr(err, bucket, object)
return "", toObjectErr(traceError(err), bucket, object)
}
}
@ -742,7 +742,8 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload
}
fsMeta.Meta["md5Sum"] = s3MD5
fsMetaPath = path.Join(bucketMetaPrefix, bucket, object, fsMetaJSONFile)
fsMetaPath := path.Join(bucketMetaPrefix, bucket, object, fsMetaJSONFile)
// Write the metadata to a temp file and rename it to the actual location.
if err = writeFSMetadata(fs.storage, minioMetaBucket, fsMetaPath, fsMeta); err != nil {
return "", toObjectErr(err, bucket, object)
}
@ -750,7 +751,7 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload
// Cleanup all the parts if everything else has been safely committed.
if err = cleanupUploadedParts(bucket, object, uploadID, fs.storage); err != nil {
return "", toObjectErr(err, bucket, object)
return "", toObjectErr(traceError(err), bucket, object)
}
// generates random string on setting MINIO_DEBUG=lock, else returns empty string.
@ -766,7 +767,7 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload
// the object, if yes do not attempt to delete 'uploads.json'.
uploadsJSON, err := readUploadsJSON(bucket, object, fs.storage)
if err != nil {
return "", toObjectErr(err, minioMetaBucket, object)
return "", toObjectErr(traceError(err), minioMetaBucket, object)
}
// If we have successfully read `uploads.json`, then we proceed to
// purge or update `uploads.json`.
@ -776,14 +777,14 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload
}
if len(uploadsJSON.Uploads) > 0 {
if err = fs.updateUploadsJSON(bucket, object, uploadsJSON); err != nil {
return "", toObjectErr(err, minioMetaBucket, path.Join(mpartMetaPrefix, bucket, object))
return "", toObjectErr(traceError(err), minioMetaBucket, path.Join(mpartMetaPrefix, bucket, object))
}
// Return success.
return s3MD5, nil
}
if err = fs.storage.DeleteFile(minioMetaBucket, path.Join(mpartMetaPrefix, bucket, object, uploadsJSONFile)); err != nil {
return "", toObjectErr(err, minioMetaBucket, path.Join(mpartMetaPrefix, bucket, object))
return "", toObjectErr(traceError(err), minioMetaBucket, path.Join(mpartMetaPrefix, bucket, object))
}
// Return md5sum.
@ -820,7 +821,7 @@ func (fs fsObjects) abortMultipartUpload(bucket, object, uploadID string) error
} // No more pending uploads for the object, we purge the entire
// entry at '.minio/multipart/bucket/object'.
if err = fs.storage.DeleteFile(minioMetaBucket, path.Join(mpartMetaPrefix, bucket, object, uploadsJSONFile)); err != nil {
return toObjectErr(err, minioMetaBucket, path.Join(mpartMetaPrefix, bucket, object))
return toObjectErr(traceError(err), minioMetaBucket, path.Join(mpartMetaPrefix, bucket, object))
}
return nil
}
@ -840,13 +841,13 @@ func (fs fsObjects) abortMultipartUpload(bucket, object, uploadID string) error
func (fs fsObjects) AbortMultipartUpload(bucket, object, uploadID string) error {
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
return BucketNameInvalid{Bucket: bucket}
return traceError(BucketNameInvalid{Bucket: bucket})
}
if !fs.isBucketExist(bucket) {
return BucketNotFound{Bucket: bucket}
return traceError(BucketNotFound{Bucket: bucket})
}
if !IsValidObjectName(object) {
return ObjectNameInvalid{Bucket: bucket, Object: object}
return traceError(ObjectNameInvalid{Bucket: bucket, Object: object})
}
// generates random string on setting MINIO_DEBUG=lock, else returns empty string.
@ -858,7 +859,7 @@ func (fs fsObjects) AbortMultipartUpload(bucket, object, uploadID string) error
defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID), opsID)
if !fs.isUploadIDExists(bucket, object, uploadID) {
return InvalidUploadID{UploadID: uploadID}
return traceError(InvalidUploadID{UploadID: uploadID})
}
fsAppendMetaPath := getFSAppendMetaPath(uploadID)

View file

@ -159,10 +159,10 @@ func (fs fsObjects) StorageInfo() StorageInfo {
func (fs fsObjects) MakeBucket(bucket string) error {
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
return BucketNameInvalid{Bucket: bucket}
return traceError(BucketNameInvalid{Bucket: bucket})
}
if err := fs.storage.MakeVol(bucket); err != nil {
return toObjectErr(err, bucket)
return toObjectErr(traceError(err), bucket)
}
return nil
}
@ -171,11 +171,11 @@ func (fs fsObjects) MakeBucket(bucket string) error {
func (fs fsObjects) GetBucketInfo(bucket string) (BucketInfo, error) {
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
return BucketInfo{}, BucketNameInvalid{Bucket: bucket}
return BucketInfo{}, traceError(BucketNameInvalid{Bucket: bucket})
}
vi, err := fs.storage.StatVol(bucket)
if err != nil {
return BucketInfo{}, toObjectErr(err, bucket)
return BucketInfo{}, toObjectErr(traceError(err), bucket)
}
return BucketInfo{
Name: bucket,
@ -188,7 +188,7 @@ func (fs fsObjects) ListBuckets() ([]BucketInfo, error) {
var bucketInfos []BucketInfo
vols, err := fs.storage.ListVols()
if err != nil {
return nil, toObjectErr(err)
return nil, toObjectErr(traceError(err))
}
for _, vol := range vols {
// StorageAPI can send volume names which are incompatible
@ -213,11 +213,11 @@ func (fs fsObjects) ListBuckets() ([]BucketInfo, error) {
func (fs fsObjects) DeleteBucket(bucket string) error {
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
return BucketNameInvalid{Bucket: bucket}
return traceError(BucketNameInvalid{Bucket: bucket})
}
// Attempt to delete regular bucket.
if err := fs.storage.DeleteVol(bucket); err != nil {
return toObjectErr(err, bucket)
return toObjectErr(traceError(err), bucket)
}
// Cleanup all the previously incomplete multiparts.
if err := cleanupDir(fs.storage, path.Join(minioMetaBucket, mpartMetaPrefix), bucket); err != nil && err != errVolumeNotFound {
@ -232,34 +232,34 @@ func (fs fsObjects) DeleteBucket(bucket string) error {
func (fs fsObjects) GetObject(bucket, object string, offset int64, length int64, writer io.Writer) (err error) {
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
return BucketNameInvalid{Bucket: bucket}
return traceError(BucketNameInvalid{Bucket: bucket})
}
// Verify if object is valid.
if !IsValidObjectName(object) {
return ObjectNameInvalid{Bucket: bucket, Object: object}
return traceError(ObjectNameInvalid{Bucket: bucket, Object: object})
}
// Offset and length cannot be negative.
if offset < 0 || length < 0 {
return toObjectErr(errUnexpected, bucket, object)
return toObjectErr(traceError(errUnexpected), bucket, object)
}
// Writer cannot be nil.
if writer == nil {
return toObjectErr(errUnexpected, bucket, object)
return toObjectErr(traceError(errUnexpected), bucket, object)
}
// Stat the file to get file size.
fi, err := fs.storage.StatFile(bucket, object)
if err != nil {
return toObjectErr(err, bucket, object)
return toObjectErr(traceError(err), bucket, object)
}
// Reply back invalid range if the input offset and length fall out of range.
if offset > fi.Size || length > fi.Size {
return InvalidRange{offset, length, fi.Size}
return traceError(InvalidRange{offset, length, fi.Size})
}
// Reply if we have inputs with offset and length falling out of file size range.
if offset+length > fi.Size {
return InvalidRange{offset, length, fi.Size}
return traceError(InvalidRange{offset, length, fi.Size})
}
var totalLeft = length
@ -288,11 +288,11 @@ func (fs fsObjects) GetObject(bucket, object string, offset int64, length int64,
offset += int64(nw)
}
if ew != nil {
err = ew
err = traceError(ew)
break
}
if nr != int64(nw) {
err = io.ErrShortWrite
err = traceError(io.ErrShortWrite)
break
}
}
@ -300,7 +300,7 @@ func (fs fsObjects) GetObject(bucket, object string, offset int64, length int64,
break
}
if er != nil {
err = er
err = traceError(er)
break
}
if totalLeft == 0 {
@ -315,18 +315,19 @@ func (fs fsObjects) GetObject(bucket, object string, offset int64, length int64,
func (fs fsObjects) GetObjectInfo(bucket, object string) (ObjectInfo, error) {
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
return ObjectInfo{}, (BucketNameInvalid{Bucket: bucket})
return ObjectInfo{}, traceError(BucketNameInvalid{Bucket: bucket})
}
// Verify if object is valid.
if !IsValidObjectName(object) {
return ObjectInfo{}, (ObjectNameInvalid{Bucket: bucket, Object: object})
return ObjectInfo{}, traceError(ObjectNameInvalid{Bucket: bucket, Object: object})
}
fi, err := fs.storage.StatFile(bucket, object)
if err != nil {
return ObjectInfo{}, toObjectErr(err, bucket, object)
return ObjectInfo{}, toObjectErr(traceError(err), bucket, object)
}
fsMeta, err := readFSMetadata(fs.storage, minioMetaBucket, path.Join(bucketMetaPrefix, bucket, object, fsMetaJSONFile))
if err != nil && err != errFileNotFound {
// Ignore error if the metadata file is not found, other errors must be returned.
if errorCause(err) != errFileNotFound {
return ObjectInfo{}, toObjectErr(err, bucket, object)
}
@ -361,13 +362,13 @@ func (fs fsObjects) GetObjectInfo(bucket, object string) (ObjectInfo, error) {
func (fs fsObjects) PutObject(bucket string, object string, size int64, data io.Reader, metadata map[string]string) (string, error) {
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
return "", BucketNameInvalid{Bucket: bucket}
return "", traceError(BucketNameInvalid{Bucket: bucket})
}
if !IsValidObjectName(object) {
return "", ObjectNameInvalid{
return "", traceError(ObjectNameInvalid{
Bucket: bucket,
Object: object,
}
})
}
// No metadata is set, allocate a new one.
if metadata == nil {
@ -398,7 +399,7 @@ func (fs fsObjects) PutObject(bucket string, object string, size int64, data io.
// For size 0 we write a 0byte file.
err := fs.storage.AppendFile(minioMetaBucket, tempObj, []byte(""))
if err != nil {
return "", toObjectErr(err, bucket, object)
return "", toObjectErr(traceError(err), bucket, object)
}
} else {
// Allocate a buffer to Read() from request body
@ -418,7 +419,7 @@ func (fs fsObjects) PutObject(bucket string, object string, size int64, data io.
// bytes than specified in request header.
if bytesWritten < size {
fs.storage.DeleteFile(minioMetaBucket, tempObj)
return "", IncompleteBody{}
return "", traceError(IncompleteBody{})
}
}
@ -434,7 +435,7 @@ func (fs fsObjects) PutObject(bucket string, object string, size int64, data io.
// Incoming payload wrong, delete the temporary object.
fs.storage.DeleteFile(minioMetaBucket, tempObj)
// Error return.
return "", toObjectErr(vErr, bucket, object)
return "", toObjectErr(traceError(vErr), bucket, object)
}
}
@ -445,14 +446,14 @@ func (fs fsObjects) PutObject(bucket string, object string, size int64, data io.
// MD5 mismatch, delete the temporary object.
fs.storage.DeleteFile(minioMetaBucket, tempObj)
// Returns md5 mismatch.
return "", BadDigest{md5Hex, newMD5Hex}
return "", traceError(BadDigest{md5Hex, newMD5Hex})
}
}
// Entire object was written to the temp location, now it's safe to rename it to the actual location.
err := fs.storage.RenameFile(minioMetaBucket, tempObj, bucket, object)
if err != nil {
return "", toObjectErr(err, bucket, object)
return "", toObjectErr(traceError(err), bucket, object)
}
// Save additional metadata only if extended headers such as "X-Amz-Meta-" are set.
@ -476,17 +477,17 @@ func (fs fsObjects) PutObject(bucket string, object string, size int64, data io.
func (fs fsObjects) DeleteObject(bucket, object string) error {
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
return BucketNameInvalid{Bucket: bucket}
return traceError(BucketNameInvalid{Bucket: bucket})
}
if !IsValidObjectName(object) {
return ObjectNameInvalid{Bucket: bucket, Object: object}
return traceError(ObjectNameInvalid{Bucket: bucket, Object: object})
}
err := fs.storage.DeleteFile(minioMetaBucket, path.Join(bucketMetaPrefix, bucket, object, fsMetaJSONFile))
if err != nil && err != errFileNotFound {
return toObjectErr(err, bucket, object)
return toObjectErr(traceError(err), bucket, object)
}
if err = fs.storage.DeleteFile(bucket, object); err != nil {
return toObjectErr(err, bucket, object)
return toObjectErr(traceError(err), bucket, object)
}
return nil
}
@ -517,11 +518,11 @@ func (fs fsObjects) ListObjects(bucket, prefix, marker, delimiter string, maxKey
return
}
if fileInfo, err = fs.storage.StatFile(bucket, entry); err != nil {
return
return FileInfo{}, traceError(err)
}
fsMeta, mErr := readFSMetadata(fs.storage, minioMetaBucket, path.Join(bucketMetaPrefix, bucket, entry, fsMetaJSONFile))
if mErr != nil && mErr != errFileNotFound {
return FileInfo{}, mErr
if errorCause(mErr) != errFileNotFound {
return FileInfo{}, traceError(mErr)
}
if len(fsMeta.Meta) == 0 {
fsMeta.Meta = make(map[string]string)
@ -534,28 +535,28 @@ func (fs fsObjects) ListObjects(bucket, prefix, marker, delimiter string, maxKey
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
return ListObjectsInfo{}, BucketNameInvalid{Bucket: bucket}
return ListObjectsInfo{}, traceError(BucketNameInvalid{Bucket: bucket})
}
// Verify if bucket exists.
if !isBucketExist(fs.storage, bucket) {
return ListObjectsInfo{}, BucketNotFound{Bucket: bucket}
return ListObjectsInfo{}, traceError(BucketNotFound{Bucket: bucket})
}
if !IsValidObjectPrefix(prefix) {
return ListObjectsInfo{}, ObjectNameInvalid{Bucket: bucket, Object: prefix}
return ListObjectsInfo{}, traceError(ObjectNameInvalid{Bucket: bucket, Object: prefix})
}
// Verify if delimiter is anything other than '/', which we do not support.
if delimiter != "" && delimiter != slashSeparator {
return ListObjectsInfo{}, UnsupportedDelimiter{
return ListObjectsInfo{}, traceError(UnsupportedDelimiter{
Delimiter: delimiter,
}
})
}
// Verify if marker has prefix.
if marker != "" {
if !strings.HasPrefix(marker, prefix) {
return ListObjectsInfo{}, InvalidMarkerPrefixCombination{
return ListObjectsInfo{}, traceError(InvalidMarkerPrefixCombination{
Marker: marker,
Prefix: prefix,
}
})
}
}
@ -610,7 +611,7 @@ func (fs fsObjects) ListObjects(bucket, prefix, marker, delimiter string, maxKey
// For any walk error return right away.
if walkResult.err != nil {
// File not found is a valid case.
if walkResult.err == errFileNotFound {
if errorCause(walkResult.err) == errFileNotFound {
return ListObjectsInfo{}, nil
}
return ListObjectsInfo{}, toObjectErr(walkResult.err, bucket, prefix)
@ -652,12 +653,12 @@ func (fs fsObjects) ListObjects(bucket, prefix, marker, delimiter string, maxKey
// HealObject - no-op for fs. Valid only for XL.
func (fs fsObjects) HealObject(bucket, object string) error {
return NotImplemented{}
return traceError(NotImplemented{})
}
// HealListObjects - list objects for healing. Valid only for XL
func (fs fsObjects) ListObjectsHeal(bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, error) {
return ListObjectsInfo{}, NotImplemented{}
return ListObjectsInfo{}, traceError(NotImplemented{})
}
// HealDiskMetadata -- heal disk metadata, not supported in FS

View file

@ -67,9 +67,10 @@ func errorIf(err error, msg string, data ...interface{}) {
fields := logrus.Fields{
"cause": err.Error(),
}
if globalTrace {
fields["stack"] = "\n" + stackInfo()
if e, ok := err.(*Error); ok {
fields["stack"] = strings.Join(e.Trace(), " ")
}
log.WithFields(fields).Errorf(msg, data...)
}

View file

@ -163,6 +163,9 @@ func Main() {
// Enable all loggers by now.
enableLoggers()
// Init the error tracing module.
initError()
// Set global quiet flag.
globalQuiet = c.Bool("quiet") || c.GlobalBool("quiet")

View file

@ -92,6 +92,7 @@ func testObjectAPIIsUploadIDExists(obj ObjectLayer, instanceType string, t TestE
}
err = obj.AbortMultipartUpload(bucket, object, "abc")
err = errorCause(err)
switch err.(type) {
case InvalidUploadID:
default:

View file

@ -152,6 +152,7 @@ func testObjectAPIPutObject(obj ObjectLayer, instanceType string, t TestErrHandl
for i, testCase := range testCases {
actualMd5Hex, actualErr := obj.PutObject(testCase.bucketName, testCase.objName, testCase.intputDataSize, bytes.NewReader(testCase.inputData), testCase.inputMeta)
actualErr = errorCause(actualErr)
if actualErr != nil && testCase.expectedError == nil {
t.Errorf("Test %d: %s: Expected to pass, but failed with: error %s.", i+1, instanceType, actualErr.Error())
}
@ -159,7 +160,7 @@ func testObjectAPIPutObject(obj ObjectLayer, instanceType string, t TestErrHandl
t.Errorf("Test %d: %s: Expected to fail with error \"%s\", but passed instead.", i+1, instanceType, testCase.expectedError.Error())
}
// Failed as expected, but does it fail for the expected reason.
if actualErr != nil && testCase.expectedError != actualErr {
if actualErr != nil && actualErr != testCase.expectedError {
t.Errorf("Test %d: %s: Expected to fail with error \"%s\", but instead failed with error \"%s\" instead.", i+1, instanceType, testCase.expectedError.Error(), actualErr.Error())
}
// Test passes as expected, but the output values are verified for correctness here.

View file

@ -35,6 +35,7 @@ const (
// isErrIgnored should we ignore this error?, takes a list of errors which can be ignored.
func isErrIgnored(err error, ignoredErrs []error) bool {
err = errorCause(err)
for _, ignoredErr := range ignoredErrs {
if ignoredErr == err {
return true
@ -220,7 +221,7 @@ func cleanupDir(storage StorageAPI, volume, dirPath string) error {
if err == errFileNotFound {
return nil
} else if err != nil { // For any other errors fail.
return err
return traceError(err)
} // else on success..
// Recurse and delete all other entries.

View file

@ -26,48 +26,57 @@ import (
// handle all cases where we have known types of errors returned by
// underlying storage layer.
func toObjectErr(err error, params ...string) error {
e, ok := err.(*Error)
if ok {
err = e.e
}
switch err {
case errVolumeNotFound:
if len(params) >= 1 {
return BucketNotFound{Bucket: params[0]}
err = BucketNotFound{Bucket: params[0]}
}
case errVolumeNotEmpty:
if len(params) >= 1 {
return BucketNotEmpty{Bucket: params[0]}
err = BucketNotEmpty{Bucket: params[0]}
}
case errVolumeExists:
if len(params) >= 1 {
return BucketExists{Bucket: params[0]}
err = BucketExists{Bucket: params[0]}
}
case errDiskFull:
return StorageFull{}
err = StorageFull{}
case errIsNotRegular, errFileAccessDenied:
if len(params) >= 2 {
return ObjectExistsAsDirectory{
err = ObjectExistsAsDirectory{
Bucket: params[0],
Object: params[1],
}
}
case errFileNotFound:
if len(params) >= 2 {
return ObjectNotFound{
err = ObjectNotFound{
Bucket: params[0],
Object: params[1],
}
}
case errFileNameTooLong:
if len(params) >= 2 {
return ObjectNameInvalid{
err = ObjectNameInvalid{
Bucket: params[0],
Object: params[1],
}
}
case errXLReadQuorum:
return InsufficientReadQuorum{}
err = InsufficientReadQuorum{}
case errXLWriteQuorum:
return InsufficientWriteQuorum{}
err = InsufficientWriteQuorum{}
case io.ErrUnexpectedEOF, io.ErrShortWrite:
return IncompleteBody{}
err = IncompleteBody{}
}
if ok {
e.e = err
return e
}
return err
}

View file

@ -72,12 +72,12 @@ func readUploadsJSON(bucket, object string, disk StorageAPI) (uploadIDs uploadsV
// Reads entire `uploads.json`.
buf, err := disk.ReadAll(minioMetaBucket, uploadJSONPath)
if err != nil {
return uploadsV1{}, err
return uploadsV1{}, traceError(err)
}
// Decode `uploads.json`.
if err = json.Unmarshal(buf, &uploadIDs); err != nil {
return uploadsV1{}, err
return uploadsV1{}, traceError(err)
}
// Success.
@ -103,7 +103,7 @@ func cleanupUploadedParts(bucket, object, uploadID string, storageDisks ...Stora
// Cleanup uploadID for all disks.
for index, disk := range storageDisks {
if disk == nil {
errs[index] = errDiskNotFound
errs[index] = traceError(errDiskNotFound)
continue
}
wg.Add(1)

View file

@ -148,7 +148,7 @@ func completeMultipartMD5(parts ...completePart) (string, error) {
for _, part := range parts {
md5Bytes, err := hex.DecodeString(part.ETag)
if err != nil {
return "", err
return "", traceError(err)
}
finalMD5Bytes = append(finalMD5Bytes, md5Bytes...)
}

View file

@ -707,6 +707,7 @@ func testNonExistantObjectInBucket(obj ObjectLayer, instanceType string, c TestE
if err == nil {
c.Fatalf("%s: Expected error but found nil", instanceType)
}
err = errorCause(err)
switch err := err.(type) {
case ObjectNotFound:
if err.Error() != "Object not found: bucket#dir1" {
@ -740,6 +741,7 @@ func testGetDirectoryReturnsObjectNotFound(obj ObjectLayer, instanceType string,
}
_, err = obj.GetObjectInfo("bucket", "dir1")
err = errorCause(err)
switch err := err.(type) {
case ObjectNotFound:
if err.Bucket != "bucket" {
@ -755,6 +757,7 @@ func testGetDirectoryReturnsObjectNotFound(obj ObjectLayer, instanceType string,
}
_, err = obj.GetObjectInfo("bucket", "dir1/")
err = errorCause(err)
switch err := err.(type) {
case ObjectNameInvalid:
if err.Bucket != "bucket" {

View file

@ -148,7 +148,7 @@ func listDirFactory(isLeaf isLeafFunc, disks ...StorageAPI) listDirFunc {
break
}
// Return error at the end.
return nil, false, err
return nil, false, traceError(err)
}
return listDir
}
@ -173,7 +173,7 @@ func doTreeWalk(bucket, prefixDir, entryPrefixMatch, marker string, recursive bo
if err != nil {
select {
case <-endWalkCh:
return errWalkAbort
return traceError(errWalkAbort)
case resultCh <- treeWalkResult{err: err}:
return err
}
@ -235,7 +235,7 @@ func doTreeWalk(bucket, prefixDir, entryPrefixMatch, marker string, recursive bo
isEOF := ((i == len(entries)-1) && isEnd)
select {
case <-endWalkCh:
return errWalkAbort
return traceError(errWalkAbort)
case resultCh <- treeWalkResult{entry: pathJoin(prefixDir, entry), end: isEOF}:
}
}

View file

@ -337,7 +337,7 @@ func TestListDir(t *testing.T) {
}
// None of the disks are available, should get errDiskNotFound.
_, _, err = listDir(volume, "", "")
if err != errDiskNotFound {
if errorCause(err) != errDiskNotFound {
t.Error("expected errDiskNotFound error.")
}
}

View file

@ -28,7 +28,7 @@ import (
func (xl xlObjects) MakeBucket(bucket string) error {
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
return BucketNameInvalid{Bucket: bucket}
return traceError(BucketNameInvalid{Bucket: bucket})
}
// generates random string on setting MINIO_DEBUG=lock, else returns empty string.
@ -47,7 +47,7 @@ func (xl xlObjects) MakeBucket(bucket string) error {
// Make a volume entry on all underlying storage disks.
for index, disk := range xl.storageDisks {
if disk == nil {
dErrs[index] = errDiskNotFound
dErrs[index] = traceError(errDiskNotFound)
continue
}
wg.Add(1)
@ -56,7 +56,7 @@ func (xl xlObjects) MakeBucket(bucket string) error {
defer wg.Done()
err := disk.MakeVol(bucket)
if err != nil {
dErrs[index] = err
dErrs[index] = traceError(err)
}
}(index, disk)
}
@ -68,7 +68,7 @@ func (xl xlObjects) MakeBucket(bucket string) error {
if !isDiskQuorum(dErrs, xl.writeQuorum) {
// Purge successfully created buckets if we don't have writeQuorum.
xl.undoMakeBucket(bucket)
return toObjectErr(errXLWriteQuorum, bucket)
return toObjectErr(traceError(errXLWriteQuorum), bucket)
}
// Verify we have any other errors which should undo make bucket.
@ -146,6 +146,7 @@ func (xl xlObjects) getBucketInfo(bucketName string) (bucketInfo BucketInfo, err
}
return bucketInfo, nil
}
err = traceError(err)
// For any reason disk went offline continue and pick the next one.
if isErrIgnored(err, bucketMetadataOpIgnoredErrs) {
continue
@ -163,7 +164,6 @@ func (xl xlObjects) isBucketExist(bucket string) bool {
if err == errVolumeNotFound {
return false
}
errorIf(err, "Stat failed on bucket "+bucket+".")
return false
}
return true
@ -265,7 +265,7 @@ func (xl xlObjects) DeleteBucket(bucket string) error {
// Remove a volume entry on all underlying storage disks.
for index, disk := range xl.storageDisks {
if disk == nil {
dErrs[index] = errDiskNotFound
dErrs[index] = traceError(errDiskNotFound)
continue
}
wg.Add(1)
@ -275,12 +275,15 @@ func (xl xlObjects) DeleteBucket(bucket string) error {
// Attempt to delete bucket.
err := disk.DeleteVol(bucket)
if err != nil {
dErrs[index] = err
dErrs[index] = traceError(err)
return
}
// Cleanup all the previously incomplete multiparts.
err = cleanupDir(disk, path.Join(minioMetaBucket, mpartMetaPrefix), bucket)
if err != nil && err != errVolumeNotFound {
if err != nil {
if errorCause(err) == errVolumeNotFound {
return
}
dErrs[index] = err
}
}(index, disk)
@ -291,7 +294,7 @@ func (xl xlObjects) DeleteBucket(bucket string) error {
if !isDiskQuorum(dErrs, xl.writeQuorum) {
xl.undoDeleteBucket(bucket)
return toObjectErr(errXLWriteQuorum, bucket)
return toObjectErr(traceError(errXLWriteQuorum), bucket)
}
if reducedErr := reduceErrs(dErrs, []error{

View file

@ -162,28 +162,28 @@ func (xl xlObjects) listObjectsHeal(bucket, prefix, marker, delimiter string, ma
func (xl xlObjects) ListObjectsHeal(bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, error) {
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
return ListObjectsInfo{}, BucketNameInvalid{Bucket: bucket}
return ListObjectsInfo{}, traceError(BucketNameInvalid{Bucket: bucket})
}
// Verify if bucket exists.
if !xl.isBucketExist(bucket) {
return ListObjectsInfo{}, BucketNotFound{Bucket: bucket}
return ListObjectsInfo{}, traceError(BucketNotFound{Bucket: bucket})
}
if !IsValidObjectPrefix(prefix) {
return ListObjectsInfo{}, ObjectNameInvalid{Bucket: bucket, Object: prefix}
return ListObjectsInfo{}, traceError(ObjectNameInvalid{Bucket: bucket, Object: prefix})
}
// Verify if delimiter is anything other than '/', which we do not support.
if delimiter != "" && delimiter != slashSeparator {
return ListObjectsInfo{}, UnsupportedDelimiter{
return ListObjectsInfo{}, traceError(UnsupportedDelimiter{
Delimiter: delimiter,
}
})
}
// Verify if marker has prefix.
if marker != "" {
if !strings.HasPrefix(marker, prefix) {
return ListObjectsInfo{}, InvalidMarkerPrefixCombination{
return ListObjectsInfo{}, traceError(InvalidMarkerPrefixCombination{
Marker: marker,
Prefix: prefix,
}
})
}
}

View file

@ -48,7 +48,7 @@ func (xl xlObjects) listObjects(bucket, prefix, marker, delimiter string, maxKey
// For any walk error return right away.
if walkResult.err != nil {
// File not found is a valid case.
if walkResult.err == errFileNotFound {
if errorCause(walkResult.err) == errFileNotFound {
return ListObjectsInfo{}, nil
}
return ListObjectsInfo{}, toObjectErr(walkResult.err, bucket, prefix)
@ -66,8 +66,7 @@ func (xl xlObjects) listObjects(bucket, prefix, marker, delimiter string, maxKey
objInfo, err = xl.getObjectInfo(bucket, entry)
if err != nil {
// Ignore errFileNotFound
if err == errFileNotFound {
errorIf(err, "Unable to get object info", bucket, entry)
if errorCause(err) == errFileNotFound {
continue
}
return ListObjectsInfo{}, toObjectErr(err, bucket, prefix)
@ -109,28 +108,28 @@ func (xl xlObjects) listObjects(bucket, prefix, marker, delimiter string, maxKey
func (xl xlObjects) ListObjects(bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, error) {
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
return ListObjectsInfo{}, BucketNameInvalid{Bucket: bucket}
return ListObjectsInfo{}, traceError(BucketNameInvalid{Bucket: bucket})
}
// Verify if bucket exists.
if !xl.isBucketExist(bucket) {
return ListObjectsInfo{}, BucketNotFound{Bucket: bucket}
return ListObjectsInfo{}, traceError(BucketNotFound{Bucket: bucket})
}
if !IsValidObjectPrefix(prefix) {
return ListObjectsInfo{}, ObjectNameInvalid{Bucket: bucket, Object: prefix}
return ListObjectsInfo{}, traceError(ObjectNameInvalid{Bucket: bucket, Object: prefix})
}
// Verify if delimiter is anything other than '/', which we do not support.
if delimiter != "" && delimiter != slashSeparator {
return ListObjectsInfo{}, UnsupportedDelimiter{
return ListObjectsInfo{}, traceError(UnsupportedDelimiter{
Delimiter: delimiter,
}
})
}
// Verify if marker has prefix.
if marker != "" {
if !strings.HasPrefix(marker, prefix) {
return ListObjectsInfo{}, InvalidMarkerPrefixCombination{
return ListObjectsInfo{}, traceError(InvalidMarkerPrefixCombination{
Marker: marker,
Prefix: prefix,
}
})
}
}

View file

@ -88,7 +88,7 @@ func (e erasureInfo) GetCheckSumInfo(partName string) (ckSum checkSumInfo, err e
return sum, nil
}
}
return checkSumInfo{}, errUnexpected
return checkSumInfo{}, traceError(errUnexpected)
}
// statInfo - carries stat information of the object.
@ -188,7 +188,7 @@ func (m xlMetaV1) ObjectToPartOffset(offset int64) (partIndex int, partOffset in
partOffset -= part.Size
}
// Offset beyond the size of the object return InvalidRange.
return 0, 0, InvalidRange{}
return 0, 0, traceError(InvalidRange{})
}
// pickValidXLMeta - picks one valid xlMeta content and returns from a
@ -239,7 +239,7 @@ func (xl xlObjects) readXLMetadata(bucket, object string) (xlMeta xlMetaV1, err
// deleteXLMetadata - deletes `xl.json` on a single disk.
func deleteXLMetdata(disk StorageAPI, bucket, prefix string) error {
jsonFile := path.Join(prefix, xlMetaJSONFile)
return disk.DeleteFile(bucket, jsonFile)
return traceError(disk.DeleteFile(bucket, jsonFile))
}
// writeXLMetadata - writes `xl.json` to a single disk.
@ -249,10 +249,10 @@ func writeXLMetadata(disk StorageAPI, bucket, prefix string, xlMeta xlMetaV1) er
// Marshal json.
metadataBytes, err := json.Marshal(&xlMeta)
if err != nil {
return err
return traceError(err)
}
// Persist marshalled data.
return disk.AppendFile(bucket, jsonFile, metadataBytes)
return traceError(disk.AppendFile(bucket, jsonFile, metadataBytes))
}
// deleteAllXLMetadata - deletes all partially written `xl.json` depending on errs.
@ -284,7 +284,7 @@ func writeUniqueXLMetadata(disks []StorageAPI, bucket, prefix string, xlMetas []
// Start writing `xl.json` to all disks in parallel.
for index, disk := range disks {
if disk == nil {
mErrs[index] = errDiskNotFound
mErrs[index] = traceError(errDiskNotFound)
continue
}
wg.Add(1)
@ -310,7 +310,7 @@ func writeUniqueXLMetadata(disks []StorageAPI, bucket, prefix string, xlMetas []
if !isDiskQuorum(mErrs, quorum) {
// Delete all `xl.json` successfully renamed.
deleteAllXLMetadata(disks, bucket, prefix, mErrs)
return errXLWriteQuorum
return traceError(errXLWriteQuorum)
}
return reduceErrs(mErrs, []error{
@ -328,7 +328,7 @@ func writeSameXLMetadata(disks []StorageAPI, bucket, prefix string, xlMeta xlMet
// Start writing `xl.json` to all disks in parallel.
for index, disk := range disks {
if disk == nil {
mErrs[index] = errDiskNotFound
mErrs[index] = traceError(errDiskNotFound)
continue
}
wg.Add(1)
@ -354,7 +354,7 @@ func writeSameXLMetadata(disks []StorageAPI, bucket, prefix string, xlMeta xlMet
if !isDiskQuorum(mErrs, writeQuorum) {
// Delete all `xl.json` successfully renamed.
deleteAllXLMetadata(disks, bucket, prefix, mErrs)
return errXLWriteQuorum
return traceError(errXLWriteQuorum)
}
return reduceErrs(mErrs, []error{

View file

@ -136,6 +136,7 @@ func TestObjectToPartOffset(t *testing.T) {
// Test them.
for _, testCase := range testCases {
index, offset, err := xlMeta.ObjectToPartOffset(testCase.offset)
err = errorCause(err)
if err != testCase.expectedErr {
t.Fatalf("%+v: expected = %s, got: %s", testCase, testCase.expectedErr, err)
}

View file

@ -43,15 +43,15 @@ func (xl xlObjects) updateUploadsJSON(bucket, object string, uploadsJSON uploads
defer wg.Done()
uploadsBytes, wErr := json.Marshal(uploadsJSON)
if wErr != nil {
errs[index] = wErr
errs[index] = traceError(wErr)
return
}
if wErr = disk.AppendFile(minioMetaBucket, tmpUploadsPath, uploadsBytes); wErr != nil {
errs[index] = wErr
errs[index] = traceError(wErr)
return
}
if wErr = disk.RenameFile(minioMetaBucket, tmpUploadsPath, minioMetaBucket, uploadsPath); wErr != nil {
errs[index] = wErr
errs[index] = traceError(wErr)
return
}
}(index, disk)
@ -82,7 +82,7 @@ func (xl xlObjects) updateUploadsJSON(bucket, object string, uploadsJSON uploads
}(index, disk)
}
wg.Wait()
return errXLWriteQuorum
return traceError(errXLWriteQuorum)
}
return nil
}
@ -117,7 +117,7 @@ func (xl xlObjects) writeUploadJSON(bucket, object, uploadID string, initiated t
// Reads `uploads.json` and returns error.
uploadsJSON, err := xl.readUploadsJSON(bucket, object)
if err != nil {
if err != errFileNotFound {
if errorCause(err) != errFileNotFound {
return err
}
// Set uploads format to `xl` otherwise.
@ -129,7 +129,7 @@ func (xl xlObjects) writeUploadJSON(bucket, object, uploadID string, initiated t
// Update `uploads.json` on all disks.
for index, disk := range xl.storageDisks {
if disk == nil {
errs[index] = errDiskNotFound
errs[index] = traceError(errDiskNotFound)
continue
}
wg.Add(1)
@ -138,21 +138,21 @@ func (xl xlObjects) writeUploadJSON(bucket, object, uploadID string, initiated t
defer wg.Done()
uploadsJSONBytes, wErr := json.Marshal(&uploadsJSON)
if wErr != nil {
errs[index] = wErr
errs[index] = traceError(wErr)
return
}
// Write `uploads.json` to disk.
if wErr = disk.AppendFile(minioMetaBucket, tmpUploadsPath, uploadsJSONBytes); wErr != nil {
errs[index] = wErr
errs[index] = traceError(wErr)
return
}
wErr = disk.RenameFile(minioMetaBucket, tmpUploadsPath, minioMetaBucket, uploadsPath)
if wErr != nil {
if dErr := disk.DeleteFile(minioMetaBucket, tmpUploadsPath); dErr != nil {
errs[index] = dErr
errs[index] = traceError(dErr)
return
}
errs[index] = wErr
errs[index] = traceError(wErr)
return
}
errs[index] = nil
@ -180,7 +180,7 @@ func (xl xlObjects) writeUploadJSON(bucket, object, uploadID string, initiated t
}(index, disk)
}
wg.Wait()
return errXLWriteQuorum
return traceError(errXLWriteQuorum)
}
// Ignored errors list.
@ -248,7 +248,7 @@ func (xl xlObjects) statPart(bucket, object, uploadID, partName string) (fileInf
if err == nil {
return fileInfo, nil
}
err = traceError(err)
// For any reason disk was deleted or goes offline we continue to next disk.
if isErrIgnored(err, objMetadataOpIgnoredErrs) {
continue
@ -271,7 +271,7 @@ func commitXLMetadata(disks []StorageAPI, srcPrefix, dstPrefix string, quorum in
// Rename `xl.json` to all disks in parallel.
for index, disk := range disks {
if disk == nil {
mErrs[index] = errDiskNotFound
mErrs[index] = traceError(errDiskNotFound)
continue
}
wg.Add(1)
@ -284,7 +284,7 @@ func commitXLMetadata(disks []StorageAPI, srcPrefix, dstPrefix string, quorum in
// Renames `xl.json` from source prefix to destination prefix.
rErr := disk.RenameFile(minioMetaBucket, srcJSONFile, minioMetaBucket, dstJSONFile)
if rErr != nil {
mErrs[index] = rErr
mErrs[index] = traceError(rErr)
return
}
mErrs[index] = nil
@ -297,7 +297,7 @@ func commitXLMetadata(disks []StorageAPI, srcPrefix, dstPrefix string, quorum in
if !isDiskQuorum(mErrs, quorum) {
// Delete all `xl.json` successfully renamed.
deleteAllXLMetadata(disks, minioMetaBucket, dstPrefix, mErrs)
return errXLWriteQuorum
return traceError(errXLWriteQuorum)
}
// List of ignored errors.

View file

@ -214,42 +214,42 @@ func (xl xlObjects) ListMultipartUploads(bucket, prefix, keyMarker, uploadIDMark
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
return ListMultipartsInfo{}, BucketNameInvalid{Bucket: bucket}
return ListMultipartsInfo{}, traceError(BucketNameInvalid{Bucket: bucket})
}
if !xl.isBucketExist(bucket) {
return ListMultipartsInfo{}, BucketNotFound{Bucket: bucket}
return ListMultipartsInfo{}, traceError(BucketNotFound{Bucket: bucket})
}
if !IsValidObjectPrefix(prefix) {
return ListMultipartsInfo{}, ObjectNameInvalid{Bucket: bucket, Object: prefix}
return ListMultipartsInfo{}, traceError(ObjectNameInvalid{Bucket: bucket, Object: prefix})
}
// Verify if delimiter is anything other than '/', which we do not support.
if delimiter != "" && delimiter != slashSeparator {
return ListMultipartsInfo{}, UnsupportedDelimiter{
return ListMultipartsInfo{}, traceError(UnsupportedDelimiter{
Delimiter: delimiter,
}
})
}
// Verify if marker has prefix.
if keyMarker != "" && !strings.HasPrefix(keyMarker, prefix) {
return ListMultipartsInfo{}, InvalidMarkerPrefixCombination{
return ListMultipartsInfo{}, traceError(InvalidMarkerPrefixCombination{
Marker: keyMarker,
Prefix: prefix,
}
})
}
if uploadIDMarker != "" {
if strings.HasSuffix(keyMarker, slashSeparator) {
return result, InvalidUploadIDKeyCombination{
return result, traceError(InvalidUploadIDKeyCombination{
UploadIDMarker: uploadIDMarker,
KeyMarker: keyMarker,
}
})
}
id, err := uuid.Parse(uploadIDMarker)
if err != nil {
return result, err
return result, traceError(err)
}
if id.IsZero() {
return result, MalformedUploadID{
return result, traceError(MalformedUploadID{
UploadID: uploadIDMarker,
}
})
}
}
return xl.listMultipartUploads(bucket, prefix, keyMarker, uploadIDMarker, delimiter, maxUploads)
@ -314,15 +314,15 @@ func (xl xlObjects) newMultipartUpload(bucket string, object string, meta map[st
func (xl xlObjects) NewMultipartUpload(bucket, object string, meta map[string]string) (string, error) {
// Verify if bucket name is valid.
if !IsValidBucketName(bucket) {
return "", BucketNameInvalid{Bucket: bucket}
return "", traceError(BucketNameInvalid{Bucket: bucket})
}
// Verify whether the bucket exists.
if !xl.isBucketExist(bucket) {
return "", BucketNotFound{Bucket: bucket}
return "", traceError(BucketNotFound{Bucket: bucket})
}
// Verify if object name is valid.
if !IsValidObjectName(object) {
return "", ObjectNameInvalid{Bucket: bucket, Object: object}
return "", traceError(ObjectNameInvalid{Bucket: bucket, Object: object})
}
// No metadata is set, allocate a new one.
if meta == nil {
@ -339,14 +339,14 @@ func (xl xlObjects) NewMultipartUpload(bucket, object string, meta map[string]st
func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, size int64, data io.Reader, md5Hex string) (string, error) {
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
return "", BucketNameInvalid{Bucket: bucket}
return "", traceError(BucketNameInvalid{Bucket: bucket})
}
// Verify whether the bucket exists.
if !xl.isBucketExist(bucket) {
return "", BucketNotFound{Bucket: bucket}
return "", traceError(BucketNotFound{Bucket: bucket})
}
if !IsValidObjectName(object) {
return "", ObjectNameInvalid{Bucket: bucket, Object: object}
return "", traceError(ObjectNameInvalid{Bucket: bucket, Object: object})
}
var partsMetadata []xlMetaV1
@ -361,14 +361,14 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
// Validates if upload ID exists.
if !xl.isUploadIDExists(bucket, object, uploadID) {
nsMutex.RUnlock(minioMetaBucket, uploadIDPath, opsID)
return "", InvalidUploadID{UploadID: uploadID}
return "", traceError(InvalidUploadID{UploadID: uploadID})
}
// Read metadata associated with the object from all disks.
partsMetadata, errs = readAllXLMetadata(xl.storageDisks, minioMetaBucket,
uploadIDPath)
if !isDiskQuorum(errs, xl.writeQuorum) {
nsMutex.RUnlock(minioMetaBucket, uploadIDPath, opsID)
return "", toObjectErr(errXLWriteQuorum, bucket, object)
return "", toObjectErr(traceError(errXLWriteQuorum), bucket, object)
}
nsMutex.RUnlock(minioMetaBucket, uploadIDPath, opsID)
@ -409,7 +409,7 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
// Should return IncompleteBody{} error when reader has fewer bytes
// than specified in request header.
if sizeWritten < size {
return "", IncompleteBody{}
return "", traceError(IncompleteBody{})
}
// For size == -1, perhaps client is sending in chunked encoding
@ -435,7 +435,7 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
// MD5 mismatch, delete the temporary object.
xl.deleteObject(minioMetaBucket, tmpPartPath)
// Returns md5 mismatch.
return "", BadDigest{md5Hex, newMD5Hex}
return "", traceError(BadDigest{md5Hex, newMD5Hex})
}
}
@ -448,7 +448,7 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
// Validate again if upload ID still exists.
if !xl.isUploadIDExists(bucket, object, uploadID) {
return "", InvalidUploadID{UploadID: uploadID}
return "", traceError(InvalidUploadID{UploadID: uploadID})
}
// Rename temporary part file to its final location.
@ -461,7 +461,7 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
// Read metadata again because it might be updated with parallel upload of another part.
partsMetadata, errs = readAllXLMetadata(onlineDisks, minioMetaBucket, uploadIDPath)
if !isDiskQuorum(errs, xl.writeQuorum) {
return "", toObjectErr(errXLWriteQuorum, bucket, object)
return "", toObjectErr(traceError(errXLWriteQuorum), bucket, object)
}
// Get current highest version based on re-read partsMetadata.
@ -578,14 +578,14 @@ func (xl xlObjects) listObjectParts(bucket, object, uploadID string, partNumberM
func (xl xlObjects) ListObjectParts(bucket, object, uploadID string, partNumberMarker, maxParts int) (ListPartsInfo, error) {
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
return ListPartsInfo{}, BucketNameInvalid{Bucket: bucket}
return ListPartsInfo{}, traceError(BucketNameInvalid{Bucket: bucket})
}
// Verify whether the bucket exists.
if !xl.isBucketExist(bucket) {
return ListPartsInfo{}, BucketNotFound{Bucket: bucket}
return ListPartsInfo{}, traceError(BucketNotFound{Bucket: bucket})
}
if !IsValidObjectName(object) {
return ListPartsInfo{}, ObjectNameInvalid{Bucket: bucket, Object: object}
return ListPartsInfo{}, traceError(ObjectNameInvalid{Bucket: bucket, Object: object})
}
// generates random string on setting MINIO_DEBUG=lock, else returns empty string.
@ -597,7 +597,7 @@ func (xl xlObjects) ListObjectParts(bucket, object, uploadID string, partNumberM
defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID), opsID)
if !xl.isUploadIDExists(bucket, object, uploadID) {
return ListPartsInfo{}, InvalidUploadID{UploadID: uploadID}
return ListPartsInfo{}, traceError(InvalidUploadID{UploadID: uploadID})
}
result, err := xl.listObjectParts(bucket, object, uploadID, partNumberMarker, maxParts)
return result, err
@ -612,17 +612,17 @@ func (xl xlObjects) ListObjectParts(bucket, object, uploadID string, partNumberM
func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, uploadID string, parts []completePart) (string, error) {
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
return "", BucketNameInvalid{Bucket: bucket}
return "", traceError(BucketNameInvalid{Bucket: bucket})
}
// Verify whether the bucket exists.
if !xl.isBucketExist(bucket) {
return "", BucketNotFound{Bucket: bucket}
return "", traceError(BucketNotFound{Bucket: bucket})
}
if !IsValidObjectName(object) {
return "", ObjectNameInvalid{
return "", traceError(ObjectNameInvalid{
Bucket: bucket,
Object: object,
}
})
}
// generates random string on setting MINIO_DEBUG=lock, else returns empty string.
@ -636,7 +636,7 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID), opsID)
if !xl.isUploadIDExists(bucket, object, uploadID) {
return "", InvalidUploadID{UploadID: uploadID}
return "", traceError(InvalidUploadID{UploadID: uploadID})
}
// Calculate s3 compatible md5sum for complete multipart.
s3MD5, err := completeMultipartMD5(parts...)
@ -650,7 +650,7 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
partsMetadata, errs := readAllXLMetadata(xl.storageDisks, minioMetaBucket, uploadIDPath)
// Do we have writeQuorum?.
if !isDiskQuorum(errs, xl.writeQuorum) {
return "", toObjectErr(errXLWriteQuorum, bucket, object)
return "", toObjectErr(traceError(errXLWriteQuorum), bucket, object)
}
onlineDisks, modTime := listOnlineDisks(xl.storageDisks, partsMetadata, errs)
@ -678,21 +678,21 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
partIdx := currentXLMeta.ObjectPartIndex(part.PartNumber)
// All parts should have same part number.
if partIdx == -1 {
return "", InvalidPart{}
return "", traceError(InvalidPart{})
}
// All parts should have same ETag as previously generated.
if currentXLMeta.Parts[partIdx].ETag != part.ETag {
return "", BadDigest{}
return "", traceError(BadDigest{})
}
// All parts except the last part has to be atleast 5MB.
if (i < len(parts)-1) && !isMinAllowedPartSize(currentXLMeta.Parts[partIdx].Size) {
return "", PartTooSmall{
return "", traceError(PartTooSmall{
PartNumber: part.PartNumber,
PartSize: currentXLMeta.Parts[partIdx].Size,
PartETag: part.ETag,
}
})
}
// Last part could have been uploaded as 0bytes, do not need
@ -716,7 +716,7 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
// Check if an object is present as one of the parent dir.
if xl.parentDirIsObject(bucket, path.Dir(object)) {
return "", toObjectErr(errFileAccessDenied, bucket, object)
return "", toObjectErr(traceError(errFileAccessDenied), bucket, object)
}
// Save the final object size and modtime.
@ -893,13 +893,13 @@ func (xl xlObjects) abortMultipartUpload(bucket, object, uploadID string) (err e
func (xl xlObjects) AbortMultipartUpload(bucket, object, uploadID string) error {
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
return BucketNameInvalid{Bucket: bucket}
return traceError(BucketNameInvalid{Bucket: bucket})
}
if !xl.isBucketExist(bucket) {
return BucketNotFound{Bucket: bucket}
return traceError(BucketNotFound{Bucket: bucket})
}
if !IsValidObjectName(object) {
return ObjectNameInvalid{Bucket: bucket, Object: object}
return traceError(ObjectNameInvalid{Bucket: bucket, Object: object})
}
// generates random string on setting MINIO_DEBUG=lock, else returns empty string.
@ -911,7 +911,7 @@ func (xl xlObjects) AbortMultipartUpload(bucket, object, uploadID string) error
defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID), opsID)
if !xl.isUploadIDExists(bucket, object, uploadID) {
return InvalidUploadID{UploadID: uploadID}
return traceError(InvalidUploadID{UploadID: uploadID})
}
err := xl.abortMultipartUpload(bucket, object, uploadID)
return err

View file

@ -42,19 +42,19 @@ import (
func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length int64, writer io.Writer) error {
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
return BucketNameInvalid{Bucket: bucket}
return traceError(BucketNameInvalid{Bucket: bucket})
}
// Verify if object is valid.
if !IsValidObjectName(object) {
return ObjectNameInvalid{Bucket: bucket, Object: object}
return traceError(ObjectNameInvalid{Bucket: bucket, Object: object})
}
// Start offset and length cannot be negative.
if startOffset < 0 || length < 0 {
return toObjectErr(errUnexpected, bucket, object)
return traceError(errUnexpected)
}
// Writer cannot be nil.
if writer == nil {
return toObjectErr(errUnexpected, bucket, object)
return traceError(errUnexpected)
}
// generates random string on setting MINIO_DEBUG=lock, else returns empty string.
@ -69,7 +69,7 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length i
metaArr, errs := readAllXLMetadata(xl.storageDisks, bucket, object)
// Do we have read quorum?
if !isDiskQuorum(errs, xl.readQuorum) {
return toObjectErr(errXLReadQuorum, bucket, object)
return traceError(InsufficientReadQuorum{}, errs...)
}
if reducedErr := reduceErrs(errs, []error{
@ -94,24 +94,24 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length i
// Reply back invalid range if the input offset and length fall out of range.
if startOffset > xlMeta.Stat.Size || length > xlMeta.Stat.Size {
return InvalidRange{startOffset, length, xlMeta.Stat.Size}
return traceError(InvalidRange{startOffset, length, xlMeta.Stat.Size})
}
// Reply if we have inputs with offset and length.
if startOffset+length > xlMeta.Stat.Size {
return InvalidRange{startOffset, length, xlMeta.Stat.Size}
return traceError(InvalidRange{startOffset, length, xlMeta.Stat.Size})
}
// Get start part index and offset.
partIndex, partOffset, err := xlMeta.ObjectToPartOffset(startOffset)
if err != nil {
return toObjectErr(err, bucket, object)
return traceError(InvalidRange{startOffset, length, xlMeta.Stat.Size})
}
// Get last part index to read given length.
lastPartIndex, _, err := xlMeta.ObjectToPartOffset(startOffset + length - 1)
if err != nil {
return toObjectErr(err, bucket, object)
return traceError(InvalidRange{startOffset, length, xlMeta.Stat.Size})
}
// Save the writer.
@ -125,17 +125,17 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length i
if err == nil { // Cache hit.
// Advance the buffer to offset as if it was read.
if _, err = cachedBuffer.Seek(startOffset, 0); err != nil { // Seek to the offset.
return err
return traceError(err)
}
// Write the requested length.
if _, err = io.CopyN(writer, cachedBuffer, length); err != nil {
return err
return traceError(err)
}
return nil
} // Cache miss.
// For unknown error, return and error out.
if err != objcache.ErrKeyNotFoundInCache {
return err
return traceError(err)
} // Cache has not been found, fill the cache.
// Cache is only set if whole object is being read.
@ -152,7 +152,7 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length i
// Ignore error if cache is full, proceed to write the object.
if err != nil && err != objcache.ErrCacheFull {
// For any other error return here.
return toObjectErr(err, bucket, object)
return toObjectErr(traceError(err), bucket, object)
}
}
}
@ -223,12 +223,12 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length i
func (xl xlObjects) HealObject(bucket, object string) error {
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
return BucketNameInvalid{Bucket: bucket}
return traceError(BucketNameInvalid{Bucket: bucket})
}
// Verify if object is valid.
if !IsValidObjectName(object) {
// FIXME: return Invalid prefix.
return ObjectNameInvalid{Bucket: bucket, Object: object}
return traceError(ObjectNameInvalid{Bucket: bucket, Object: object})
}
// generates random string on setting MINIO_DEBUG=lock, else returns empty string.
@ -275,13 +275,13 @@ func (xl xlObjects) HealObject(bucket, object string) error {
err := disk.DeleteFile(bucket,
pathJoin(object, outDatedMeta.Parts[partIndex].Name))
if err != nil {
return err
return traceError(err)
}
}
// Delete xl.json file.
err := disk.DeleteFile(bucket, pathJoin(object, xlMetaJSONFile))
if err != nil {
return err
return traceError(err)
}
}
@ -343,7 +343,7 @@ func (xl xlObjects) HealObject(bucket, object string) error {
}
err := disk.RenameFile(minioMetaBucket, retainSlash(pathJoin(tmpMetaPrefix, tmpID)), bucket, retainSlash(object))
if err != nil {
return err
return traceError(err)
}
}
return nil
@ -447,7 +447,7 @@ func rename(disks []StorageAPI, srcBucket, srcEntry, dstBucket, dstEntry string,
defer wg.Done()
err := disk.RenameFile(srcBucket, srcEntry, dstBucket, dstEntry)
if err != nil && err != errFileNotFound {
errs[index] = err
errs[index] = traceError(err)
}
}(index, disk)
}
@ -460,7 +460,7 @@ func rename(disks []StorageAPI, srcBucket, srcEntry, dstBucket, dstEntry string,
if !isDiskQuorum(errs, quorum) {
// Undo all the partial rename operations.
undoRename(disks, srcBucket, srcEntry, dstBucket, dstEntry, isPart, errs)
return errXLWriteQuorum
return traceError(errXLWriteQuorum)
}
// Return on first error, also undo any partially successful rename operations.
return reduceErrs(errs, []error{
@ -495,17 +495,17 @@ func renameObject(disks []StorageAPI, srcBucket, srcObject, dstBucket, dstObject
func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.Reader, metadata map[string]string) (md5Sum string, err error) {
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
return "", BucketNameInvalid{Bucket: bucket}
return "", traceError(BucketNameInvalid{Bucket: bucket})
}
// Verify bucket exists.
if !xl.isBucketExist(bucket) {
return "", BucketNotFound{Bucket: bucket}
return "", traceError(BucketNotFound{Bucket: bucket})
}
if !IsValidObjectName(object) {
return "", ObjectNameInvalid{
return "", traceError(ObjectNameInvalid{
Bucket: bucket,
Object: object,
}
})
}
// No metadata is set, allocate a new one.
if metadata == nil {
@ -538,7 +538,7 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.
// Ignore error if cache is full, proceed to write the object.
if err != nil && err != objcache.ErrCacheFull {
// For any other error return here.
return "", toObjectErr(err, bucket, object)
return "", toObjectErr(traceError(err), bucket, object)
}
} else {
mw = md5Writer
@ -636,7 +636,7 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.
if xl.parentDirIsObject(bucket, path.Dir(object)) {
// Parent (in the namespace) is an object, delete temporary object.
xl.deleteObject(minioMetaTmpBucket, tempObj)
return "", toObjectErr(errFileAccessDenied, bucket, object)
return "", toObjectErr(traceError(errFileAccessDenied), bucket, object)
}
// Rename if an object already exists to temporary location.
@ -706,14 +706,14 @@ func (xl xlObjects) deleteObject(bucket, object string) error {
for index, disk := range xl.storageDisks {
if disk == nil {
dErrs[index] = errDiskNotFound
dErrs[index] = traceError(errDiskNotFound)
continue
}
wg.Add(1)
go func(index int, disk StorageAPI) {
defer wg.Done()
err := cleanupDir(disk, bucket, object)
if err != nil && err != errFileNotFound {
if err != nil && errorCause(err) != errVolumeNotFound {
dErrs[index] = err
}
}(index, disk)
@ -725,7 +725,7 @@ func (xl xlObjects) deleteObject(bucket, object string) error {
// Do we have write quorum?
if !isDiskQuorum(dErrs, xl.writeQuorum) {
// Return errXLWriteQuorum if errors were more than allowed write quorum.
return errXLWriteQuorum
return traceError(errXLWriteQuorum)
}
return nil
@ -737,10 +737,10 @@ func (xl xlObjects) deleteObject(bucket, object string) error {
func (xl xlObjects) DeleteObject(bucket, object string) (err error) {
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
return BucketNameInvalid{Bucket: bucket}
return traceError(BucketNameInvalid{Bucket: bucket})
}
if !IsValidObjectName(object) {
return ObjectNameInvalid{Bucket: bucket, Object: object}
return traceError(ObjectNameInvalid{Bucket: bucket, Object: object})
}
// generates random string on setting MINIO_DEBUG=lock, else returns empty string.
@ -752,7 +752,7 @@ func (xl xlObjects) DeleteObject(bucket, object string) (err error) {
// Validate object exists.
if !xl.isObject(bucket, object) {
return ObjectNotFound{bucket, object}
return traceError(ObjectNotFound{bucket, object})
} // else proceed to delete the object.
// Delete the object on all disks.

View file

@ -95,6 +95,7 @@ func TestXLDeleteObjectBasic(t *testing.T) {
}
for i, test := range testCases {
actualErr := xl.DeleteObject(test.bucket, test.object)
actualErr = errorCause(actualErr)
if test.expectedErr != nil && actualErr != test.expectedErr {
t.Errorf("Test %d: Expected to fail with %s, but failed with %s", i+1, test.expectedErr, actualErr)
}
@ -146,6 +147,7 @@ func TestXLDeleteObjectDiskNotFound(t *testing.T) {
xl.storageDisks[7] = nil
xl.storageDisks[8] = nil
err = obj.DeleteObject(bucket, object)
err = errorCause(err)
if err != toObjectErr(errXLWriteQuorum, bucket, object) {
t.Errorf("Expected deleteObject to fail with %v, but failed with %v", toObjectErr(errXLWriteQuorum, bucket, object), err)
}
@ -196,6 +198,7 @@ func TestGetObjectNoQuorum(t *testing.T) {
}
// Fetch object from store.
err = xl.GetObject(bucket, object, 0, int64(len("abcd")), ioutil.Discard)
err = errorCause(err)
if err != toObjectErr(errXLReadQuorum, bucket, object) {
t.Errorf("Expected putObject to fail with %v, but failed with %v", toObjectErr(errXLWriteQuorum, bucket, object), err)
}
@ -246,6 +249,7 @@ func TestPutObjectNoQuorum(t *testing.T) {
}
// Upload new content to same object "object"
_, err = obj.PutObject(bucket, object, int64(len("abcd")), bytes.NewReader([]byte("abcd")), nil)
err = errorCause(err)
if err != toObjectErr(errXLWriteQuorum, bucket, object) {
t.Errorf("Expected putObject to fail with %v, but failed with %v", toObjectErr(errXLWriteQuorum, bucket, object), err)
}

View file

@ -32,6 +32,7 @@ import (
func reduceErrs(errs []error, ignoredErrs []error) error {
errorCounts := make(map[error]int)
errs = errorsCause(errs)
for _, err := range errs {
if isErrIgnored(err, ignoredErrs) {
continue
@ -46,13 +47,14 @@ func reduceErrs(errs []error, ignoredErrs []error) error {
errMax = err
}
}
return errMax
return traceError(errMax, errs...)
}
// Validates if we have quorum based on the errors related to disk only.
// Returns 'true' if we have quorum, 'false' if we don't.
func isDiskQuorum(errs []error, minQuorumCount int) bool {
var count int
errs = errorsCause(errs)
for _, err := range errs {
switch err {
case errDiskNotFound, errFaultyDisk, errDiskAccessDenied:
@ -60,6 +62,7 @@ func isDiskQuorum(errs []error, minQuorumCount int) bool {
}
count++
}
return count >= minQuorumCount
}
@ -101,12 +104,12 @@ func readXLMeta(disk StorageAPI, bucket string, object string) (xlMeta xlMetaV1,
// Reads entire `xl.json`.
buf, err := disk.ReadAll(bucket, path.Join(object, xlMetaJSONFile))
if err != nil {
return xlMetaV1{}, err
return xlMetaV1{}, traceError(err)
}
// Unmarshal xl metadata.
if err = json.Unmarshal(buf, &xlMeta); err != nil {
return xlMetaV1{}, err
return xlMetaV1{}, traceError(err)
}
// Return structured `xl.json`.

View file

@ -55,7 +55,7 @@ func TestReduceErrs(t *testing.T) {
// Validates list of all the testcases for returning valid errors.
for i, testCase := range testCases {
gotErr := reduceErrs(testCase.errs, testCase.ignoredErrs)
if testCase.err != gotErr {
if errorCause(gotErr) != testCase.err {
t.Errorf("Test %d : expected %s, got %s", i+1, testCase.err, gotErr)
}
}