fix: remove parentIsObject() check (#12851)

we will allow situations such as

```
a/b/1.txt
a/b
```

and

```
a/b
a/b/1.txt
```

we are going to document that this use case is
not supported and will never be supported; if
any application does this, users have to delete
the top-level parent to make sure the namespace
is accessible at the lower level.
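
for example, in S3 request terms (bucket name is a
placeholder, following the notation used in the docs),
the parent key has to be removed explicitly before
objects under the prefix become reachable again:

```
DELETE <bucketname>/a/b
GET <bucketname>/a/b/1.txt
```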

the rest of the situations, where prefixes get
created across erasure sets, remain supported as is.
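
as an illustration, here is a minimal sketch (assuming the
minio-go v7 SDK and a pre-existing bucket named `testbucket`;
the endpoint and credentials are placeholders) of the
overlapping key pattern above, which is now accepted instead
of being rejected with XMinioParentIsObject:

```
package main

import (
	"context"
	"log"
	"strings"

	"github.com/minio/minio-go/v7"
	"github.com/minio/minio-go/v7/pkg/credentials"
)

func main() {
	// Placeholder endpoint and credentials for illustration only.
	client, err := minio.New("play.min.io", &minio.Options{
		Creds:  credentials.NewStaticV4("ACCESS-KEY", "SECRET-KEY", ""),
		Secure: true,
	})
	if err != nil {
		log.Fatal(err)
	}
	ctx := context.Background()

	put := func(key string) {
		body := strings.NewReader("12345")
		// Both calls now succeed; the second one previously
		// failed with XMinioParentIsObject.
		_, err := client.PutObject(ctx, "testbucket", key, body, body.Size(),
			minio.PutObjectOptions{})
		log.Printf("PUT %s: err=%v", key, err)
	}
	put("a/b/1.txt") // creates an object under the prefix "a/b/"
	put("a/b")       // creates an object at the parent key itself

	// The overlap is documented as unsupported: to make "a/b/"
	// accessible as a prefix again, delete the parent object.
	if err := client.RemoveObject(ctx, "testbucket", "a/b",
		minio.RemoveObjectOptions{}); err != nil {
		log.Fatal(err)
	}
}
```
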
Harshavardhana 2021-08-03 13:26:57 -07:00 committed by GitHub
parent 9371852c7d
commit 035882d292
29 changed files with 204 additions and 631 deletions


@@ -182,7 +182,7 @@ func testServicesCmdHandler(cmd cmdType, t *testing.T) {
 	adminTestBed, err := prepareAdminErasureTestBed(ctx)
 	if err != nil {
-		t.Fatal("Failed to initialize a single node Erasure backend for admin handler tests.")
+		t.Fatal("Failed to initialize a single node Erasure backend for admin handler tests.", err)
 	}
 	defer adminTestBed.TearDown()
@@ -253,7 +253,7 @@ func TestAdminServerInfo(t *testing.T) {
 	adminTestBed, err := prepareAdminErasureTestBed(ctx)
 	if err != nil {
-		t.Fatal("Failed to initialize a single node Erasure backend for admin handler tests.")
+		t.Fatal("Failed to initialize a single node Erasure backend for admin handler tests.", err)
 	}
 	defer adminTestBed.TearDown()


@@ -231,7 +231,6 @@ const (
 	// MinIO extended errors.
 	ErrReadQuorum
 	ErrWriteQuorum
-	ErrParentIsObject
 	ErrStorageFull
 	ErrRequestBodyParse
 	ErrObjectExistsAsDirectory
@@ -1122,11 +1121,6 @@ var errorCodes = errorCodeMap{
 		Description:    "Storage backend has reached its minimum free disk threshold. Please delete a few objects to proceed.",
 		HTTPStatusCode: http.StatusInsufficientStorage,
 	},
-	ErrParentIsObject: {
-		Code:           "XMinioParentIsObject",
-		Description:    "Object-prefix is already an object, please choose a different object-prefix name.",
-		HTTPStatusCode: http.StatusBadRequest,
-	},
 	ErrRequestBodyParse: {
 		Code:           "XMinioRequestBodyParse",
 		Description:    "The request body failed to parse.",
@@ -1899,8 +1893,6 @@ func toAPIErrorCode(ctx context.Context, err error) (apiErr APIErrorCode) {
 		apiErr = ErrObjectExistsAsDirectory
 	case PrefixAccessDenied:
 		apiErr = ErrAccessDenied
-	case ParentIsObject:
-		apiErr = ErrParentIsObject
 	case BucketNameInvalid:
 		apiErr = ErrInvalidBucketName
 	case BucketNotFound:

File diff suppressed because one or more lines are too long


@@ -20,8 +20,6 @@ package cmd
 import (
 	"context"
 	"sync"
-
-	"github.com/minio/minio/internal/sync/errgroup"
 )

 func (er erasureObjects) getLocalDisks() (localDisks []StorageAPI) {
@@ -101,34 +99,3 @@ func (er erasureObjects) getLoadBalancedDisks(optimized bool) []StorageAPI {
 	// Return disks which have maximum disk usage common.
 	return newDisks[max]
 }
-
-// This function does the following check, suppose
-// object is "a/b/c/d", stat makes sure that objects
-//  - "a/b/c"
-//  - "a/b"
-//  - "a"
-// do not exist on the namespace.
-func (er erasureObjects) parentDirIsObject(ctx context.Context, bucket, parent string) bool {
-	storageDisks := er.getDisks()
-
-	g := errgroup.WithNErrs(len(storageDisks))
-
-	for index := range storageDisks {
-		index := index
-		g.Go(func() error {
-			if storageDisks[index] == nil {
-				return errDiskNotFound
-			}
-			// Check if 'prefix' is an object on this 'disk', else continue the check the next disk
-			return storageDisks[index].CheckFile(ctx, bucket, parent)
-		}, index)
-	}
-
-	// NOTE: Observe we are not trying to read `xl.meta` and figure out the actual
-	// quorum intentionally, but rely on the default case scenario. Actual quorum
-	// verification will happen by top layer by using getObjectInfo() and will be
-	// ignored if necessary.
-	readQuorum := getReadQuorum(len(storageDisks))
-
-	return reduceReadQuorumErrs(ctx, g.Wait(), objectOpIgnoredErrs, readQuorum) == nil
-}


@@ -1,83 +0,0 @@
-// Copyright (c) 2015-2021 MinIO, Inc.
-//
-// This file is part of MinIO Object Storage stack
-//
-// This program is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Affero General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// This program is distributed in the hope that it will be useful
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Affero General Public License for more details.
-//
-// You should have received a copy of the GNU Affero General Public License
-// along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-package cmd
-
-import (
-	"bytes"
-	"context"
-	"os"
-	"testing"
-)
-
-// Tests for if parent directory is object
-func TestErasureParentDirIsObject(t *testing.T) {
-	ctx, cancel := context.WithCancel(context.Background())
-	defer cancel()
-
-	obj, fsDisks, err := prepareErasureSets32(ctx)
-	if err != nil {
-		t.Fatalf("Unable to initialize 'Erasure' object layer.")
-	}
-	defer obj.Shutdown(context.Background())
-
-	// Remove all disks.
-	for _, disk := range fsDisks {
-		defer os.RemoveAll(disk)
-	}
-
-	bucketName := "testbucket"
-	objectName := "object"
-	if err = obj.MakeBucketWithLocation(GlobalContext, bucketName, BucketOptions{}); err != nil {
-		t.Fatal(err)
-	}
-
-	objectContent := "12345"
-	_, err = obj.PutObject(GlobalContext, bucketName, objectName,
-		mustGetPutObjReader(t, bytes.NewReader([]byte(objectContent)), int64(len(objectContent)), "", ""), ObjectOptions{})
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	testCases := []struct {
-		expectedErr bool
-		objectName  string
-	}{
-		{
-			expectedErr: true,
-			objectName:  pathJoin(objectName, "parent-is-object"),
-		},
-		{
-			expectedErr: false,
-			objectName:  pathJoin("no-parent", "object"),
-		},
-	}
-
-	for _, testCase := range testCases {
-		t.Run("", func(t *testing.T) {
-			_, err = obj.PutObject(GlobalContext, bucketName, testCase.objectName,
-				mustGetPutObjReader(t, bytes.NewReader([]byte(objectContent)), int64(len(objectContent)), "", ""), ObjectOptions{})
-			if testCase.expectedErr && err == nil {
-				t.Error("Expected error but got nil")
-			}
-			if !testCase.expectedErr && err != nil {
-				t.Errorf("Expected nil but got %v", err)
-			}
-		})
-	}
-}


@@ -783,11 +783,9 @@ func (er erasureObjects) purgeObjectDangling(ctx context.Context, bucket, object
 	var err error
 	var returnNotFound bool
 	if !opts.DryRun && opts.Remove {
-		if versionID == "" {
-			err = er.deleteObject(ctx, bucket, object, writeQuorum)
-		} else {
-			err = er.deleteObjectVersion(ctx, bucket, object, writeQuorum, FileInfo{VersionID: versionID}, false)
-		}
+		err = er.deleteObjectVersion(ctx, bucket, object, writeQuorum, FileInfo{
+			VersionID: versionID,
+		}, false)

 		// If Delete was successful, make sure to return the appropriate error
 		// and heal result appropriate with delete's error messages


@@ -221,7 +221,7 @@ func TestHealObjectCorrupted(t *testing.T) {
 		t.Fatalf("Failed to getLatestFileInfo - %v", err)
 	}

-	if err = firstDisk.CheckFile(context.Background(), bucket, object); err != nil {
+	if _, err = firstDisk.StatInfoFile(context.Background(), bucket, object+"/"+xlStorageFormatFile); err != nil {
 		t.Errorf("Expected er.meta file to be present but stat failed - %v", err)
 	}
@@ -365,7 +365,7 @@ func TestHealObjectErasure(t *testing.T) {
 		t.Fatalf("Failed to heal object - %v", err)
 	}

-	if err = firstDisk.CheckFile(context.Background(), bucket, object); err != nil {
+	if _, err = firstDisk.StatInfoFile(context.Background(), bucket, object+"/"+xlStorageFormatFile); err != nil {
 		t.Errorf("Expected er.meta file to be present but stat failed - %v", err)
 	}


@@ -782,12 +782,6 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str
 		return oi, toObjectErr(err, bucket, object, uploadID)
 	}

-	// Check if an object is present as one of the parent dir.
-	// -- FIXME. (needs a new kind of lock).
-	if opts.ParentIsObject != nil && opts.ParentIsObject(rctx, bucket, path.Dir(object)) {
-		return oi, toObjectErr(errFileParentIsFile, bucket, object)
-	}
-
 	// Calculate s3 compatible md5sum for complete multipart.
 	s3MD5 := getCompleteMultipartMD5(parts)


@@ -660,13 +660,6 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st
 		return ObjectInfo{}, toObjectErr(errInvalidArgument)
 	}

-	// Check if an object is present as one of the parent dir.
-	// -- FIXME. (needs a new kind of lock).
-	// -- FIXME (this also causes performance issue when disks are down).
-	if opts.ParentIsObject != nil && opts.ParentIsObject(ctx, bucket, path.Dir(object)) {
-		return ObjectInfo{}, toObjectErr(errFileParentIsFile, bucket, object)
-	}
-
 	// Initialize parts metadata
 	partsMetadata := make([]FileInfo, len(storageDisks))


@@ -878,18 +878,10 @@ func (s *erasureSets) GetObjectNInfo(ctx context.Context, bucket, object string,
 	return set.GetObjectNInfo(ctx, bucket, object, rs, h, lockType, opts)
 }

-func (s *erasureSets) parentDirIsObject(ctx context.Context, bucket, parent string) bool {
-	if parent == "." {
-		return false
-	}
-	return s.getHashedSet(parent).parentDirIsObject(ctx, bucket, parent)
-}
-
 // PutObject - writes an object to hashedSet based on the object name.
 func (s *erasureSets) PutObject(ctx context.Context, bucket string, object string, data *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error) {
 	set := s.getHashedSet(object)
 	auditObjectErasureSet(ctx, object, set)
-	opts.ParentIsObject = s.parentDirIsObject
 	return set.PutObject(ctx, bucket, object, data, opts)
 }
@@ -1074,7 +1066,6 @@ func (s *erasureSets) AbortMultipartUpload(ctx context.Context, bucket, object,
 func (s *erasureSets) CompleteMultipartUpload(ctx context.Context, bucket, object, uploadID string, uploadedParts []CompletePart, opts ObjectOptions) (objInfo ObjectInfo, err error) {
 	set := s.getHashedSet(object)
 	auditObjectErasureSet(ctx, object, set)
-	opts.ParentIsObject = s.parentDirIsObject
 	return set.CompleteMultipartUpload(ctx, bucket, object, uploadID, uploadedParts, opts)
 }


@@ -24,7 +24,6 @@ import (
 	"fmt"
 	"io/ioutil"
 	"os"
-	pathutil "path"
 	"sort"
 	"strconv"
 	"strings"
@@ -550,11 +549,6 @@ func (fs *FSObjects) CompleteMultipartUpload(ctx context.Context, bucket string,
 		return oi, toObjectErr(err)
 	}

-	// Check if an object is present as one of the parent dir.
-	if fs.parentDirIsObject(ctx, bucket, pathutil.Dir(object)) {
-		return oi, toObjectErr(errFileParentIsFile, bucket, object)
-	}
-
 	if _, err := fs.statBucketDir(ctx, bucket); err != nil {
 		return oi, toObjectErr(err, bucket)
 	}


@@ -1006,26 +1006,6 @@ func (fs *FSObjects) GetObjectInfo(ctx context.Context, bucket, object string, o
 	return oi, toObjectErr(err, bucket, object)
 }

-// This function does the following check, suppose
-// object is "a/b/c/d", stat makes sure that objects "a/b/c"
-// "a/b" and "a" do not exist.
-func (fs *FSObjects) parentDirIsObject(ctx context.Context, bucket, parent string) bool {
-	var isParentDirObject func(string) bool
-	isParentDirObject = func(p string) bool {
-		if p == "." || p == SlashSeparator {
-			return false
-		}
-		if fsIsFile(ctx, pathJoin(fs.fsPath, bucket, p)) {
-			// If there is already a file at prefix "p", return true.
-			return true
-		}
-		// Check if there is a file as one of the parent paths.
-		return isParentDirObject(path.Dir(p))
-	}
-	return isParentDirObject(parent)
-}
-
 // PutObject - creates an object upon reading from the input stream
 // until EOF, writes data directly to configured filesystem path.
 // Additionally writes `fs.json` which carries the necessary metadata
@@ -1079,10 +1059,6 @@ func (fs *FSObjects) putObject(ctx context.Context, bucket string, object string
 	// with a slash separator, we treat it like a valid operation
 	// and return success.
 	if isObjectDir(object, data.Size()) {
-		// Check if an object is present as one of the parent dir.
-		if fs.parentDirIsObject(ctx, bucket, path.Dir(object)) {
-			return ObjectInfo{}, toObjectErr(errFileParentIsFile, bucket, object)
-		}
 		if err = mkdirAll(pathJoin(fs.fsPath, bucket, object), 0777); err != nil {
 			logger.LogIf(ctx, err)
 			return ObjectInfo{}, toObjectErr(err, bucket, object)
@@ -1094,11 +1070,6 @@ func (fs *FSObjects) putObject(ctx context.Context, bucket string, object string
 		return fsMeta.ToObjectInfo(bucket, object, fi), nil
 	}

-	// Check if an object is present as one of the parent dir.
-	if fs.parentDirIsObject(ctx, bucket, path.Dir(object)) {
-		return ObjectInfo{}, toObjectErr(errFileParentIsFile, bucket, object)
-	}
-
 	// Validate input data size and it can never be less than zero.
 	if data.Size() < -1 {
 		logger.LogIf(ctx, errInvalidArgument, logger.Application)


@@ -26,71 +26,6 @@ import (
 	"github.com/minio/madmin-go"
 )

-// Tests for if parent directory is object
-func TestFSParentDirIsObject(t *testing.T) {
-	obj, disk, err := prepareFS()
-	if err != nil {
-		t.Fatal(err)
-	}
-	defer os.RemoveAll(disk)
-
-	bucketName := "testbucket"
-	objectName := "object"
-	if err = obj.MakeBucketWithLocation(GlobalContext, bucketName, BucketOptions{}); err != nil {
-		t.Fatal(err)
-	}
-	objectContent := "12345"
-	objInfo, err := obj.PutObject(GlobalContext, bucketName, objectName,
-		mustGetPutObjReader(t, bytes.NewReader([]byte(objectContent)), int64(len(objectContent)), "", ""), ObjectOptions{})
-	if err != nil {
-		t.Fatal(err)
-	}
-	if objInfo.Name != objectName {
-		t.Fatalf("Unexpected object name returned got %s, expected %s", objInfo.Name, objectName)
-	}
-
-	fs := obj.(*FSObjects)
-	testCases := []struct {
-		parentIsObject bool
-		objectName     string
-	}{
-		// parentIsObject is true if object is available.
-		{
-			parentIsObject: true,
-			objectName:     objectName,
-		},
-		{
-			parentIsObject: false,
-			objectName:     "",
-		},
-		{
-			parentIsObject: false,
-			objectName:     ".",
-		},
-		// Should not cause infinite loop.
-		{
-			parentIsObject: false,
-			objectName:     SlashSeparator,
-		},
-		{
-			parentIsObject: false,
-			objectName:     "\\",
-		},
-		// Should not cause infinite loop with double forward slash.
-		{
-			parentIsObject: false,
-			objectName:     "//",
-		},
-	}
-
-	for i, testCase := range testCases {
-		gotValue := fs.parentDirIsObject(GlobalContext, bucketName, testCase.objectName)
-		if testCase.parentIsObject != gotValue {
-			t.Errorf("Test %d: Unexpected value returned got %t, expected %t", i+1, gotValue, testCase.parentIsObject)
-		}
-	}
-}
-
 // TestNewFS - tests initialization of all input disks
 // and constructs a valid `FS` object layer.
 func TestNewFS(t *testing.T) {
@@ -226,35 +161,6 @@ func TestFSPutObject(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	_, err = obj.PutObject(GlobalContext, bucketName, objectName+"/1", mustGetPutObjReader(t, bytes.NewReader([]byte("abcd")), int64(len("abcd")), "", ""), ObjectOptions{})
-	if err == nil {
-		t.Fatal("Unexpected should fail here, backend corruption occurred")
-	}
-	if nerr, ok := err.(ParentIsObject); !ok {
-		t.Fatalf("Expected ParentIsObject, got %#v", err)
-	} else {
-		if nerr.Bucket != "bucket" {
-			t.Fatalf("Expected 'bucket', got %s", nerr.Bucket)
-		}
-		if nerr.Object != "1/2/3/4/object/1" {
-			t.Fatalf("Expected '1/2/3/4/object/1', got %s", nerr.Object)
-		}
-	}
-
-	_, err = obj.PutObject(GlobalContext, bucketName, objectName+"/1/", mustGetPutObjReader(t, bytes.NewReader([]byte("abcd")), 0, "", ""), ObjectOptions{})
-	if err == nil {
-		t.Fatal("Unexpected should fail here, backned corruption occurred")
-	}
-	if nerr, ok := err.(ParentIsObject); !ok {
-		t.Fatalf("Expected ParentIsObject, got %#v", err)
-	} else {
-		if nerr.Bucket != "bucket" {
-			t.Fatalf("Expected 'bucket', got %s", nerr.Bucket)
-		}
-		if nerr.Object != "1/2/3/4/object/1/" {
-			t.Fatalf("Expected '1/2/3/4/object/1/', got %s", nerr.Object)
-		}
-	}
 }

 // TestFSDeleteObject - test fs.DeleteObject() with healthy and corrupted disks


@@ -649,9 +649,8 @@ func (er *erasureObjects) saveMetaCacheStream(ctx context.Context, mc *metaCache
 			logger.LogIf(ctx, err)
 			custom := b.headerKV()
 			_, err = er.putObject(ctx, minioMetaBucket, o.objectPath(b.n), NewPutObjReader(r), ObjectOptions{
-				UserDefined:    custom,
-				NoLock:         true, // No need to hold namespace lock, each prefix caches uniquely.
-				ParentIsObject: nil,
+				UserDefined: custom,
+				NoLock:      true, // No need to hold namespace lock, each prefix caches uniquely.
 			})
 			if err != nil {
 				mc.setErr(err.Error())


@@ -218,13 +218,6 @@ func (d *naughtyDisk) CheckParts(ctx context.Context, volume string, path string
 	return d.disk.CheckParts(ctx, volume, path, fi)
 }

-func (d *naughtyDisk) CheckFile(ctx context.Context, volume string, path string) (err error) {
-	if err := d.calcError(); err != nil {
-		return err
-	}
-	return d.disk.CheckFile(ctx, volume, path)
-}
-
 func (d *naughtyDisk) Delete(ctx context.Context, volume string, path string, recursive bool) (err error) {
 	if err := d.calcError(); err != nil {
 		return err


@@ -22,7 +22,6 @@ import (
 	"errors"
 	"fmt"
 	"io"
-	"path"
 )

 // Converts underlying storage error. Convenience function written to
@@ -64,15 +63,6 @@ func toObjectErr(err error, params ...string) error {
 			apiErr.Object = decodeDirObject(params[1])
 		}
 		return apiErr
-	case errFileParentIsFile.Error():
-		apiErr := ParentIsObject{}
-		if len(params) >= 1 {
-			apiErr.Bucket = params[0]
-		}
-		if len(params) >= 2 {
-			apiErr.Object = decodeDirObject(params[1])
-		}
-		return apiErr
 	case errIsNotRegular.Error():
 		apiErr := ObjectExistsAsDirectory{}
 		if len(params) >= 1 {
@@ -322,13 +312,6 @@ func (e PrefixAccessDenied) Error() string {
 	return "Prefix access is denied: " + e.Bucket + SlashSeparator + e.Object
 }

-// ParentIsObject object access is denied.
-type ParentIsObject GenericError
-
-func (e ParentIsObject) Error() string {
-	return "Parent is object " + e.Bucket + SlashSeparator + path.Dir(e.Object)
-}
-
 // BucketExists bucket exists.
 type BucketExists GenericError


@@ -53,10 +53,9 @@ type ObjectOptions struct {
 	VersionPurgeStatus VersionPurgeStatusType // Is only set in DELETE operations for delete marker version to be permanently deleted.
 	Transition         TransitionOptions

-	NoLock         bool                                                  // indicates to lower layers if the caller is expecting to hold locks.
-	ProxyRequest   bool                                                  // only set for GET/HEAD in active-active replication scenario
-	ProxyHeaderSet bool                                                  // only set for GET/HEAD in active-active replication scenario
-	ParentIsObject func(ctx context.Context, bucket, parent string) bool // Used to verify if parent is an object.
+	NoLock         bool // indicates to lower layers if the caller is expecting to hold locks.
+	ProxyRequest   bool // only set for GET/HEAD in active-active replication scenario
+	ProxyHeaderSet bool // only set for GET/HEAD in active-active replication scenario

 	DeletePrefix bool // set true to enforce a prefix deletion, only application for DeleteObject API,


@@ -88,9 +88,6 @@ var errFileAccessDenied = StorageErr("file access denied")
 // errFileCorrupt - file has an unexpected size, or is not readable
 var errFileCorrupt = StorageErr("file is corrupted")

-// errFileParentIsFile - cannot have overlapping objects, parent is already a file.
-var errFileParentIsFile = StorageErr("parent is a file")
-
 // errBitrotHashAlgoInvalid - the algo for bit-rot hash
 // verification is empty or invalid.
 var errBitrotHashAlgoInvalid = StorageErr("bit-rot hash algorithm is invalid")


@@ -71,7 +71,6 @@ type StorageAPI interface {
 	ReadFileStream(ctx context.Context, volume, path string, offset, length int64) (io.ReadCloser, error)
 	RenameFile(ctx context.Context, srcVolume, srcPath, dstVolume, dstPath string) error
 	CheckParts(ctx context.Context, volume string, path string, fi FileInfo) error
-	CheckFile(ctx context.Context, volume string, path string) (err error)
 	Delete(ctx context.Context, volume string, path string, recursive bool) (err error)
 	VerifyFile(ctx context.Context, volume, path string, fi FileInfo) error
 	StatInfoFile(ctx context.Context, volume, path string) (stat StatInfo, err error)


@@ -434,16 +434,6 @@ func (client *storageRESTClient) WriteAll(ctx context.Context, volume string, pa
 	return err
 }

-// CheckFile - stat a file metadata.
-func (client *storageRESTClient) CheckFile(ctx context.Context, volume string, path string) error {
-	values := make(url.Values)
-	values.Set(storageRESTVolume, volume)
-	values.Set(storageRESTFilePath, path)
-
-	respBody, err := client.call(ctx, storageRESTMethodCheckFile, values, nil, -1)
-	defer xhttp.DrainBody(respBody)
-	return err
-}
-
 // CheckParts - stat all file parts.
 func (client *storageRESTClient) CheckParts(ctx context.Context, volume string, path string, fi FileInfo) error {
 	values := make(url.Values)


@@ -18,7 +18,7 @@
 package cmd

 const (
-	storageRESTVersion       = "v37" // cleanup behavior change at storage layer.
+	storageRESTVersion       = "v38" // Remove CheckFile API
 	storageRESTVersionPrefix = SlashSeparator + storageRESTVersion
 	storageRESTPrefix        = minioReservedBucketPath + "/storage"
 )
@@ -42,7 +42,6 @@ const (
 	storageRESTMethodReadVersion    = "/readversion"
 	storageRESTMethodRenameData     = "/renamedata"
 	storageRESTMethodCheckParts     = "/checkparts"
-	storageRESTMethodCheckFile      = "/checkfile"
 	storageRESTMethodReadAll        = "/readall"
 	storageRESTMethodReadFile       = "/readfile"
 	storageRESTMethodReadFileStream = "/readfilestream"


@@ -487,20 +487,6 @@ func (s *storageRESTServer) CheckPartsHandler(w http.ResponseWriter, r *http.Req
 	}
 }

-// CheckFileHandler - check if a file metadata exists.
-func (s *storageRESTServer) CheckFileHandler(w http.ResponseWriter, r *http.Request) {
-	if !s.IsValid(w, r) {
-		return
-	}
-	vars := mux.Vars(r)
-	volume := vars[storageRESTVolume]
-	filePath := vars[storageRESTFilePath]
-
-	if err := s.storage.CheckFile(r.Context(), volume, filePath); err != nil {
-		s.writeErrorResponse(w, err)
-	}
-}
-
 // ReadAllHandler - read all the contents of a file.
 func (s *storageRESTServer) ReadAllHandler(w http.ResponseWriter, r *http.Request) {
 	if !s.IsValid(w, r) {
@@ -1121,8 +1107,6 @@ func registerStorageRESTHandlers(router *mux.Router, endpointServerPools Endpoin
 		subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodCreateFile).HandlerFunc(httpTraceHdrs(server.CreateFileHandler)).
 			Queries(restQueries(storageRESTVolume, storageRESTFilePath, storageRESTLength)...)
-		subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodCheckFile).HandlerFunc(httpTraceHdrs(server.CheckFileHandler)).
-			Queries(restQueries(storageRESTVolume, storageRESTFilePath)...)
 		subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodCheckParts).HandlerFunc(httpTraceHdrs(server.CheckPartsHandler)).
 			Queries(restQueries(storageRESTVolume, storageRESTFilePath)...)


@@ -166,7 +166,7 @@ func testStorageAPIDeleteVol(t *testing.T, storage StorageAPI) {
 	}
 }

-func testStorageAPICheckFile(t *testing.T, storage StorageAPI) {
+func testStorageAPIStatInfoFile(t *testing.T, storage StorageAPI) {
 	err := storage.MakeVol(context.Background(), "foo")
 	if err != nil {
 		t.Fatalf("unexpected error %v", err)
@@ -187,7 +187,7 @@ func testStorageAPICheckFile(t *testing.T, storage StorageAPI) {
 	}

 	for i, testCase := range testCases {
-		err := storage.CheckFile(context.Background(), testCase.volumeName, testCase.objectName)
+		_, err := storage.StatInfoFile(context.Background(), testCase.volumeName, testCase.objectName+"/"+xlStorageFormatFile)
 		expectErr := (err != nil)

 		if expectErr != testCase.expectErr {
@@ -515,7 +515,7 @@ func TestStorageRESTClientDeleteVol(t *testing.T) {
 	testStorageAPIDeleteVol(t, restClient)
 }

-func TestStorageRESTClientCheckFile(t *testing.T) {
+func TestStorageRESTClientStatInfoFile(t *testing.T) {
 	httpServer, restClient, prevGlobalServerConfig, endpointPath := newStorageRESTHTTPServerClient(t)
 	defer httpServer.Close()
 	defer func() {
@@ -523,7 +523,7 @@ func TestStorageRESTClientCheckFile(t *testing.T) {
 	}()
 	defer os.RemoveAll(endpointPath)

-	testStorageAPICheckFile(t, restClient)
+	testStorageAPIStatInfoFile(t, restClient)
 }

 func TestStorageRESTClientListDir(t *testing.T) {


@@ -22,23 +22,22 @@ func _() {
 	_ = x[storageMetricRenameFile-11]
 	_ = x[storageMetricRenameData-12]
 	_ = x[storageMetricCheckParts-13]
-	_ = x[storageMetricCheckFile-14]
-	_ = x[storageMetricDelete-15]
-	_ = x[storageMetricDeleteVersions-16]
-	_ = x[storageMetricVerifyFile-17]
-	_ = x[storageMetricWriteAll-18]
-	_ = x[storageMetricDeleteVersion-19]
-	_ = x[storageMetricWriteMetadata-20]
-	_ = x[storageMetricUpdateMetadata-21]
-	_ = x[storageMetricReadVersion-22]
-	_ = x[storageMetricReadAll-23]
-	_ = x[storageStatInfoFile-24]
-	_ = x[storageMetricLast-25]
+	_ = x[storageMetricDelete-14]
+	_ = x[storageMetricDeleteVersions-15]
+	_ = x[storageMetricVerifyFile-16]
+	_ = x[storageMetricWriteAll-17]
+	_ = x[storageMetricDeleteVersion-18]
+	_ = x[storageMetricWriteMetadata-19]
+	_ = x[storageMetricUpdateMetadata-20]
+	_ = x[storageMetricReadVersion-21]
+	_ = x[storageMetricReadAll-22]
+	_ = x[storageStatInfoFile-23]
+	_ = x[storageMetricLast-24]
 }

-const _storageMetric_name = "MakeVolBulkMakeVolListVolsStatVolDeleteVolWalkDirListDirReadFileAppendFileCreateFileReadFileStreamRenameFileRenameDataCheckPartsCheckFileDeleteDeleteVersionsVerifyFileWriteAllDeleteVersionWriteMetadataUpdateMetadataReadVersionReadAllstorageStatInfoFileLast"
+const _storageMetric_name = "MakeVolBulkMakeVolListVolsStatVolDeleteVolWalkDirListDirReadFileAppendFileCreateFileReadFileStreamRenameFileRenameDataCheckPartsDeleteDeleteVersionsVerifyFileWriteAllDeleteVersionWriteMetadataUpdateMetadataReadVersionReadAllstorageStatInfoFileLast"

-var _storageMetric_index = [...]uint16{0, 11, 18, 26, 33, 42, 49, 56, 64, 74, 84, 98, 108, 118, 128, 137, 143, 157, 167, 175, 188, 201, 215, 226, 233, 252, 256}
+var _storageMetric_index = [...]uint8{0, 11, 18, 26, 33, 42, 49, 56, 64, 74, 84, 98, 108, 118, 128, 134, 148, 158, 166, 179, 192, 206, 217, 224, 243, 247}

 func (i storageMetric) String() string {
 	if i >= storageMetric(len(_storageMetric_index)-1) {


@@ -48,7 +48,6 @@ const (
 	storageMetricRenameFile
 	storageMetricRenameData
 	storageMetricCheckParts
-	storageMetricCheckFile
 	storageMetricDelete
 	storageMetricDeleteVersions
 	storageMetricVerifyFile
@@ -436,22 +435,6 @@ func (p *xlStorageDiskIDCheck) CheckParts(ctx context.Context, volume string, pa
 	return p.storage.CheckParts(ctx, volume, path, fi)
 }

-func (p *xlStorageDiskIDCheck) CheckFile(ctx context.Context, volume string, path string) (err error) {
-	defer p.updateStorageMetrics(storageMetricCheckFile, volume, path)()
-
-	select {
-	case <-ctx.Done():
-		return ctx.Err()
-	default:
-	}
-
-	if err = p.checkDiskStale(); err != nil {
-		return err
-	}
-
-	return p.storage.CheckFile(ctx, volume, path)
-}
-
 func (p *xlStorageDiskIDCheck) Delete(ctx context.Context, volume string, path string, recursive bool) (err error) {
 	defer p.updateStorageMetrics(storageMetricDelete, volume, path)()


@@ -37,7 +37,6 @@ import (
 	"time"

 	"github.com/dustin/go-humanize"
-	"github.com/google/uuid"
 	jsoniter "github.com/json-iterator/go"
 	"github.com/klauspost/readahead"
 	"github.com/minio/minio/internal/bucket/lifecycle"
@@ -880,16 +879,16 @@ func (s *xlStorage) DeleteVersion(ctx context.Context, volume, path string, fi F
 			// PR #11758 used DataDir, preserve it
 			// for users who might have used master
 			// branch
-			xlMeta.data.remove(versionID, dataDir)
-			filePath := pathJoin(volumeDir, path, dataDir)
-			if err = checkPathLength(filePath); err != nil {
-				return err
-			}
-			if err = renameAll(filePath, pathutil.Join(s.diskPath, minioMetaTmpDeletedBucket, mustGetUUID())); err != nil {
-				if err != errFileNotFound {
-					return err
-				}
-			}
+			if !xlMeta.data.remove(versionID, dataDir) {
+				filePath := pathJoin(volumeDir, path, dataDir)
+				if err = checkPathLength(filePath); err != nil {
+					return err
+				}
+				if err = renameAll(filePath, pathutil.Join(s.diskPath, minioMetaTmpDeletedBucket, mustGetUUID())); err != nil {
+					if err != errFileNotFound {
+						return err
+					}
+				}
+			}
 		}
 	}

 	if !lastVersion {
@@ -901,19 +900,16 @@ func (s *xlStorage) DeleteVersion(ctx context.Context, volume, path string, fi F
 		return s.WriteAll(ctx, volume, pathJoin(path, xlStorageFormatFile), buf)
 	}

-	// Move everything to trash.
-	filePath := retainSlash(pathJoin(volumeDir, path))
+	// Move xl.meta to trash
+	filePath := pathJoin(volumeDir, path, xlStorageFormatFile)
 	if err = checkPathLength(filePath); err != nil {
 		return err
 	}
-	err = renameAll(filePath, pathutil.Join(s.diskPath, minioMetaTmpDeletedBucket, mustGetUUID()))
-
-	// Delete parents if needed.
-	filePath = retainSlash(pathutil.Dir(pathJoin(volumeDir, path)))
-	if filePath == retainSlash(volumeDir) {
-		return err
-	}
-	s.deleteFile(volumeDir, filePath, false)
+	err = Rename(filePath, pathutil.Join(s.diskPath, minioMetaTmpDeletedBucket, mustGetUUID()))
+	if err == nil || err == errFileNotFound {
+		s.deleteFile(volumeDir, pathJoin(volumeDir, path), false)
+	}

 	return err
 }
@@ -1701,65 +1697,6 @@ func (s *xlStorage) CheckParts(ctx context.Context, volume string, path string,
 	return nil
 }

-// CheckFile check if path has necessary metadata.
-// This function does the following check, suppose
-// you are creating a metadata file at "a/b/c/d/xl.meta",
-// makes sure that there is no `xl.meta` at
-// - "a/b/c/"
-// - "a/b/"
-// - "a/"
-func (s *xlStorage) CheckFile(ctx context.Context, volume string, path string) error {
-	volumeDir, err := s.getVolDir(volume)
-	if err != nil {
-		return err
-	}
-
-	s.RLock()
-	formatLegacy := s.formatLegacy
-	s.RUnlock()
-
-	var checkFile func(p string) error
-	checkFile = func(p string) error {
-		if p == "." || p == SlashSeparator {
-			return errPathNotFound
-		}
-
-		filePath := pathJoin(volumeDir, p, xlStorageFormatFile)
-		if err := checkPathLength(filePath); err != nil {
-			return err
-		}
-		st, _ := Lstat(filePath)
-		if st == nil {
-			if !formatLegacy {
-				return errPathNotFound
-			}
-
-			filePathOld := pathJoin(volumeDir, p, xlStorageFormatFileV1)
-			if err := checkPathLength(filePathOld); err != nil {
-				return err
-			}
-
-			st, _ = Lstat(filePathOld)
-			if st == nil {
-				return errPathNotFound
-			}
-		}
-
-		if st != nil {
-			if !st.Mode().IsRegular() {
-				// not a regular file return error.
-				return errFileNotFound
-			}
-			// Success fully found
-			return nil
-		}
-
-		return checkFile(pathutil.Dir(p))
-	}
-
-	return checkFile(path)
-}
-
 // deleteFile deletes a file or a directory if its empty unless recursive
 // is set to true. If the target is successfully deleted, it will recursively
 // move up the tree, deleting empty parent directories until it finds one
@@ -1907,6 +1844,15 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath string, f
 	dstBuf, err := xioutil.ReadFile(dstFilePath)
 	if err != nil {
+		// handle situations when dstFilePath is 'file'
+		// for example such as someone is trying to
+		// upload an object such as `prefix/object/xl.meta`
+		// where `prefix/object` is already an object
+		if isSysErrNotDir(err) && runtime.GOOS != globalWindowsOSName {
+			// NOTE: On windows the error happens at
+			// next line and returns appropriate error.
+			return errFileAccessDenied
+		}
 		if !osIsNotExist(err) {
 			return osErrToFileErr(err)
 		}
@@ -1921,38 +1867,6 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath string, f
 				return osErrToFileErr(err)
 			}
 		}
-		if err == errFileNotFound {
-			// Verification to ensure that we
-			// don't have objects already created
-			// at this location, verify that resultant
-			// directories don't have any unexpected
-			// directories that we do not understand
-			// or expect. If its already there we should
-			// make sure to reject further renames
-			// for such objects.
-			//
-			// This elaborate check is necessary to avoid
-			// scenarios such as these.
-			//
-			// bucket1/name1/obj1/xl.meta
-			// bucket1/name1/xl.meta --> this should never
-			// be allowed.
-			{
-				entries, err := readDirN(pathutil.Dir(dstFilePath), 1)
-				if err != nil && err != errFileNotFound {
-					return err
-				}
-				if len(entries) > 0 {
-					entry := pathutil.Clean(entries[0])
-					if entry != legacyDataDir {
-						_, uerr := uuid.Parse(entry)
-						if uerr != nil {
-							return errFileParentIsFile
-						}
-					}
-				}
-			}
-		}
 	}

 	var xlMeta xlMetaV2


@@ -1628,8 +1628,8 @@ func TestXLStorageRenameFile(t *testing.T) {
 	}
 }

-// TestXLStorage xlStorage.CheckFile()
-func TestXLStorageCheckFile(t *testing.T) {
+// TestXLStorage xlStorage.StatInfoFile()
+func TestXLStorageStatInfoFile(t *testing.T) {
 	// create xlStorage test setup
 	xlStorage, path, err := newXLStorageTestSetup()
 	if err != nil {
@@ -1699,19 +1699,20 @@ func TestXLStorageCheckFile(t *testing.T) {
 		{
 			srcVol:      "non-existent-vol",
 			srcPath:     "success-file",
-			expectedErr: errPathNotFound,
+			expectedErr: errVolumeNotFound,
 		},
 		// TestXLStorage case - 7.
 		// TestXLStorage case with file with directory.
 		{
 			srcVol:      "success-vol",
 			srcPath:     "path/to",
-			expectedErr: errFileNotFound,
+			expectedErr: nil,
 		},
 	}

 	for i, testCase := range testCases {
-		if err := xlStorage.CheckFile(context.Background(), testCase.srcVol, testCase.srcPath); err != testCase.expectedErr {
+		_, err := xlStorage.StatInfoFile(context.Background(), testCase.srcVol, testCase.srcPath+"/"+xlStorageFormatFile)
+		if err != testCase.expectedErr {
 			t.Errorf("TestXLStorage case %d: Expected: \"%s\", got: \"%s\"", i+1, testCase.expectedErr, err)
 		}
 	}


@@ -113,7 +113,7 @@ func TestIsValidUmaskFile(t *testing.T) {
 	}

 	// CheckFile - stat the file.
-	if err := disk.CheckFile(context.Background(), testCase.volName, "hello-world.txt"); err != nil {
+	if _, err := disk.StatInfoFile(context.Background(), testCase.volName, "hello-world.txt/"+xlStorageFormatFile); err != nil {
 		t.Fatalf("Stat failed with %s expected to pass.", err)
 	}
 }


@@ -1,4 +1,5 @@
 ## MinIO Server Limits Per Tenant
+For best deployment experience MinIO recommends operating systems RHEL/CentOS 8.x or later, Ubuntu 18.04 LTS or later. These operating systems package the latest 'xfsprogs' that support large scale deployments.

 ### Erasure Code (Multiple Drives / Servers)
@@ -49,6 +50,16 @@ We found the following APIs to be redundant or less useful outside of AWS S3. If
 - ObjectTorrent

 ### Object name restrictions on MinIO
-Object names that contain characters `^*|\/&";` are unsupported on Windows and other file systems which do not support filenames with these characters. NOTE: This list is not exhaustive, it depends on the operating system and filesystem under use.
+
+- Object names that contain characters `^*|\/&";` are unsupported on the Windows platform or any other file systems that do not support filenames with special characters. **This list is non-exhaustive; it depends on the operating system and filesystem under use - please consult your operating system vendor**. MinIO recommends using Linux based deployments for production workloads. For best experience we recommend distributions that ship a fairly recent Linux kernel, such as CentOS 8 or Ubuntu 18.04 LTS, along with XFS as the choice for your backend filesystem.
+
+- Objects should not have conflicting objects as parents; applications using this behavior should change it and use proper unique keys. For example, situations such as the following conflicting key patterns are not supported.
+
+```
+PUT <bucketname>/a/b/1.txt
+PUT <bucketname>/a/b
+```
+
+```
+PUT <bucketname>/a/b
+PUT <bucketname>/a/b/1.txt
+```