Compare commits
7 commits
master
...
RELEASE.20
Author | SHA1 | Date | |
---|---|---|---|
c8fef1d994 | |||
1aa5a642b8 | |||
11236d1009 | |||
4d2f1df591 | |||
eb5a5d1238 | |||
65f1ad03fa | |||
6046014253 |
4
Makefile
4
Makefile
|
@ -71,6 +71,10 @@ build: checks
|
||||||
@echo "Building minio binary to './minio'"
|
@echo "Building minio binary to './minio'"
|
||||||
@GO111MODULE=on CGO_ENABLED=0 go build -tags kqueue -trimpath --ldflags "$(LDFLAGS)" -o $(PWD)/minio 1>/dev/null
|
@GO111MODULE=on CGO_ENABLED=0 go build -tags kqueue -trimpath --ldflags "$(LDFLAGS)" -o $(PWD)/minio 1>/dev/null
|
||||||
|
|
||||||
|
hotfix: LDFLAGS := $(shell MINIO_RELEASE="RELEASE" MINIO_HOTFIX="hotfix" go run buildscripts/gen-ldflags.go $(shell git describe --tags --abbrev=0 | \
|
||||||
|
sed 's#RELEASE\.\([0-9]\+\)-\([0-9]\+\)-\([0-9]\+\)T\([0-9]\+\)-\([0-9]\+\)-\([0-9]\+\)Z#\1-\2-\3T\4:\5:\6Z#'))
|
||||||
|
hotfix: install
|
||||||
|
|
||||||
docker: checks
|
docker: checks
|
||||||
@echo "Building minio docker image '$(TAG)'"
|
@echo "Building minio docker image '$(TAG)'"
|
||||||
@GOOS=linux GO111MODULE=on CGO_ENABLED=0 go build -tags kqueue -trimpath --ldflags "$(LDFLAGS)" -o $(PWD)/minio 1>/dev/null
|
@GOOS=linux GO111MODULE=on CGO_ENABLED=0 go build -tags kqueue -trimpath --ldflags "$(LDFLAGS)" -o $(PWD)/minio 1>/dev/null
|
||||||
|
|
|
@ -44,10 +44,21 @@ func releaseTag(version string) string {
|
||||||
relPrefix = prefix
|
relPrefix = prefix
|
||||||
}
|
}
|
||||||
|
|
||||||
|
relSuffix := ""
|
||||||
|
if hotfix := os.Getenv("MINIO_HOTFIX"); hotfix != "" {
|
||||||
|
relSuffix = hotfix
|
||||||
|
}
|
||||||
|
|
||||||
relTag := strings.Replace(version, " ", "-", -1)
|
relTag := strings.Replace(version, " ", "-", -1)
|
||||||
relTag = strings.Replace(relTag, ":", "-", -1)
|
relTag = strings.Replace(relTag, ":", "-", -1)
|
||||||
relTag = strings.Replace(relTag, ",", "", -1)
|
relTag = strings.Replace(relTag, ",", "", -1)
|
||||||
return relPrefix + "." + relTag
|
relTag = relPrefix + "." + relTag
|
||||||
|
|
||||||
|
if relSuffix != "" {
|
||||||
|
relTag += "." + relSuffix
|
||||||
|
}
|
||||||
|
|
||||||
|
return relTag
|
||||||
}
|
}
|
||||||
|
|
||||||
// commitID returns the abbreviated commit-id hash of the last commit.
|
// commitID returns the abbreviated commit-id hash of the last commit.
|
||||||
|
@ -68,5 +79,12 @@ func commitID() string {
|
||||||
}
|
}
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
fmt.Println(genLDFlags(time.Now().UTC().Format(time.RFC3339)))
|
var version string
|
||||||
|
if len(os.Args) > 1 {
|
||||||
|
version = os.Args[1]
|
||||||
|
} else {
|
||||||
|
version = time.Now().UTC().Format(time.RFC3339)
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Println(genLDFlags(version))
|
||||||
}
|
}
|
||||||
|
|
|
@ -67,7 +67,7 @@ func TestHealing(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
disk := er.getDisks()[0]
|
disk := er.getDisks()[0]
|
||||||
fileInfoPreHeal, err := disk.ReadVersion(context.Background(), bucket, object, "")
|
fileInfoPreHeal, err := disk.ReadVersion(context.Background(), bucket, object, "", false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
@ -84,7 +84,7 @@ func TestHealing(t *testing.T) {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
fileInfoPostHeal, err := disk.ReadVersion(context.Background(), bucket, object, "")
|
fileInfoPostHeal, err := disk.ReadVersion(context.Background(), bucket, object, "", false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
@ -113,7 +113,7 @@ func TestHealing(t *testing.T) {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
fileInfoPostHeal, err = disk.ReadVersion(context.Background(), bucket, object, "")
|
fileInfoPostHeal, err = disk.ReadVersion(context.Background(), bucket, object, "", false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -113,9 +113,21 @@ func hashOrder(key string, cardinality int) []int {
|
||||||
return nums
|
return nums
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Reads all `xl.meta` metadata as a FileInfo slice and checks if the data dir exists as well,
|
||||||
|
// otherwise returns errFileNotFound (or errFileVersionNotFound)
|
||||||
|
func getAllObjectFileInfo(ctx context.Context, disks []StorageAPI, bucket, object, versionID string) ([]FileInfo, []error) {
|
||||||
|
return readVersionFromDisks(ctx, disks, bucket, object, versionID, true)
|
||||||
|
}
|
||||||
|
|
||||||
// Reads all `xl.meta` metadata as a FileInfo slice.
|
// Reads all `xl.meta` metadata as a FileInfo slice.
|
||||||
// Returns error slice indicating the failed metadata reads.
|
// Returns error slice indicating the failed metadata reads.
|
||||||
func readAllFileInfo(ctx context.Context, disks []StorageAPI, bucket, object, versionID string) ([]FileInfo, []error) {
|
func readAllFileInfo(ctx context.Context, disks []StorageAPI, bucket, object, versionID string) ([]FileInfo, []error) {
|
||||||
|
return readVersionFromDisks(ctx, disks, bucket, object, versionID, false)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Reads all `xl.meta` metadata as a FileInfo slice and checks if the data dir
|
||||||
|
// exists as well, if checkDataDir is set to true.
|
||||||
|
func readVersionFromDisks(ctx context.Context, disks []StorageAPI, bucket, object, versionID string, checkDataDir bool) ([]FileInfo, []error) {
|
||||||
metadataArray := make([]FileInfo, len(disks))
|
metadataArray := make([]FileInfo, len(disks))
|
||||||
|
|
||||||
g := errgroup.WithNErrs(len(disks))
|
g := errgroup.WithNErrs(len(disks))
|
||||||
|
@ -126,7 +138,7 @@ func readAllFileInfo(ctx context.Context, disks []StorageAPI, bucket, object, ve
|
||||||
if disks[index] == nil {
|
if disks[index] == nil {
|
||||||
return errDiskNotFound
|
return errDiskNotFound
|
||||||
}
|
}
|
||||||
metadataArray[index], err = disks[index].ReadVersion(ctx, bucket, object, versionID)
|
metadataArray[index], err = disks[index].ReadVersion(ctx, bucket, object, versionID, checkDataDir)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err != errFileNotFound && err != errVolumeNotFound && err != errFileVersionNotFound {
|
if err != errFileNotFound && err != errVolumeNotFound && err != errFileVersionNotFound {
|
||||||
logger.GetReqInfo(ctx).AppendTags("disk", disks[index].String())
|
logger.GetReqInfo(ctx).AppendTags("disk", disks[index].String())
|
||||||
|
|
|
@ -43,7 +43,25 @@ func (er erasureObjects) getMultipartSHADir(bucket, object string) string {
|
||||||
|
|
||||||
// checkUploadIDExists - verify if a given uploadID exists and is valid.
|
// checkUploadIDExists - verify if a given uploadID exists and is valid.
|
||||||
func (er erasureObjects) checkUploadIDExists(ctx context.Context, bucket, object, uploadID string) error {
|
func (er erasureObjects) checkUploadIDExists(ctx context.Context, bucket, object, uploadID string) error {
|
||||||
_, _, _, err := er.getObjectFileInfo(ctx, minioMetaMultipartBucket, er.getUploadIDDir(bucket, object, uploadID), ObjectOptions{})
|
disks := er.getDisks()
|
||||||
|
|
||||||
|
// Read metadata associated with the object from all disks.
|
||||||
|
metaArr, errs := readAllFileInfo(ctx, disks, minioMetaMultipartBucket, er.getUploadIDDir(bucket, object, uploadID), "")
|
||||||
|
|
||||||
|
readQuorum, _, err := objectQuorumFromMeta(ctx, er, metaArr, errs)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if reducedErr := reduceReadQuorumErrs(ctx, errs, objectOpIgnoredErrs, readQuorum); reducedErr != nil {
|
||||||
|
return toObjectErr(reducedErr, bucket, object)
|
||||||
|
}
|
||||||
|
|
||||||
|
// List all online disks.
|
||||||
|
_, modTime := listOnlineDisks(disks, metaArr, errs)
|
||||||
|
|
||||||
|
// Pick latest valid metadata.
|
||||||
|
_, err = pickValidFileInfo(ctx, metaArr, modTime, readQuorum)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -110,7 +128,7 @@ func (er erasureObjects) cleanupStaleUploadsOnDisk(ctx context.Context, disk Sto
|
||||||
}
|
}
|
||||||
for _, uploadIDDir := range uploadIDDirs {
|
for _, uploadIDDir := range uploadIDDirs {
|
||||||
uploadIDPath := pathJoin(shaDir, uploadIDDir)
|
uploadIDPath := pathJoin(shaDir, uploadIDDir)
|
||||||
fi, err := disk.ReadVersion(ctx, minioMetaMultipartBucket, uploadIDPath, "")
|
fi, err := disk.ReadVersion(ctx, minioMetaMultipartBucket, uploadIDPath, "", false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
@ -124,7 +142,7 @@ func (er erasureObjects) cleanupStaleUploadsOnDisk(ctx context.Context, disk Sto
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
for _, tmpDir := range tmpDirs {
|
for _, tmpDir := range tmpDirs {
|
||||||
fi, err := disk.ReadVersion(ctx, minioMetaTmpBucket, tmpDir, "")
|
fi, err := disk.ReadVersion(ctx, minioMetaTmpBucket, tmpDir, "", false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
@ -178,7 +196,7 @@ func (er erasureObjects) ListMultipartUploads(ctx context.Context, bucket, objec
|
||||||
if populatedUploadIds.Contains(uploadID) {
|
if populatedUploadIds.Contains(uploadID) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
fi, err := disk.ReadVersion(ctx, minioMetaMultipartBucket, pathJoin(er.getUploadIDDir(bucket, object, uploadID)), "")
|
fi, err := disk.ReadVersion(ctx, minioMetaMultipartBucket, pathJoin(er.getUploadIDDir(bucket, object, uploadID)), "", false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return result, toObjectErr(err, bucket, object)
|
return result, toObjectErr(err, bucket, object)
|
||||||
}
|
}
|
||||||
|
|
|
@ -355,14 +355,32 @@ func (er erasureObjects) getObjectFileInfo(ctx context.Context, bucket, object s
|
||||||
disks := er.getDisks()
|
disks := er.getDisks()
|
||||||
|
|
||||||
// Read metadata associated with the object from all disks.
|
// Read metadata associated with the object from all disks.
|
||||||
metaArr, errs := readAllFileInfo(ctx, disks, bucket, object, opts.VersionID)
|
metaArr, errs := getAllObjectFileInfo(ctx, disks, bucket, object, opts.VersionID)
|
||||||
|
|
||||||
readQuorum, _, err := objectQuorumFromMeta(ctx, er, metaArr, errs)
|
readQuorum, _, err := objectQuorumFromMeta(ctx, er, metaArr, errs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fi, nil, nil, err
|
readQuorum = len(metaArr) / 2
|
||||||
}
|
}
|
||||||
|
|
||||||
if reducedErr := reduceReadQuorumErrs(ctx, errs, objectOpIgnoredErrs, readQuorum); reducedErr != nil {
|
if reducedErr := reduceReadQuorumErrs(ctx, errs, objectOpIgnoredErrs, readQuorum); reducedErr != nil {
|
||||||
|
if reducedErr == errErasureReadQuorum {
|
||||||
|
if _, ok := isObjectDangling(metaArr, errs, nil); ok {
|
||||||
|
reducedErr = errFileNotFound
|
||||||
|
if opts.VersionID != "" {
|
||||||
|
reducedErr = errFileVersionNotFound
|
||||||
|
}
|
||||||
|
// Remove the dangling object only when:
|
||||||
|
// - This is a non versioned bucket
|
||||||
|
// - This is a versioned bucket and the version ID is passed, the reason
|
||||||
|
// is that it is hard to pick that particular version that is dangling
|
||||||
|
if !opts.Versioned || opts.VersionID != "" {
|
||||||
|
er.deleteObjectVersion(ctx, bucket, object, 1, FileInfo{
|
||||||
|
Name: object,
|
||||||
|
VersionID: opts.VersionID,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
return fi, nil, nil, toObjectErr(reducedErr, bucket, object)
|
return fi, nil, nil, toObjectErr(reducedErr, bucket, object)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1007,6 +1007,12 @@ func (s *erasureSets) startMergeWalksVersionsN(ctx context.Context, bucket, pref
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go func(disk StorageAPI) {
|
go func(disk StorageAPI) {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
|
|
||||||
|
if disk == nil {
|
||||||
|
// disk offline, ignore it.
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
entryCh, err := disk.WalkVersions(GlobalContext, bucket, prefix, marker, recursive, endWalkCh)
|
entryCh, err := disk.WalkVersions(GlobalContext, bucket, prefix, marker, recursive, endWalkCh)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
|
@ -1037,6 +1043,11 @@ func (s *erasureSets) startMergeWalksN(ctx context.Context, bucket, prefix, mark
|
||||||
go func(disk StorageAPI) {
|
go func(disk StorageAPI) {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
|
|
||||||
|
if disk == nil {
|
||||||
|
// disk offline, ignore it.
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
var entryCh chan FileInfo
|
var entryCh chan FileInfo
|
||||||
var err error
|
var err error
|
||||||
if splunk {
|
if splunk {
|
||||||
|
|
|
@ -30,15 +30,26 @@ const (
|
||||||
type mergeWalkVersions struct {
|
type mergeWalkVersions struct {
|
||||||
added time.Time
|
added time.Time
|
||||||
entryChs []FileInfoVersionsCh
|
entryChs []FileInfoVersionsCh
|
||||||
endWalkCh chan struct{} // To signal when mergeWalk go-routine should end.
|
endWalkCh *endWalkSafeCh
|
||||||
endTimerCh chan<- struct{} // To signal when timer go-routine should end.
|
endTimerCh chan<- struct{} // To signal when timer go-routine should end.
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type endWalkSafeCh struct {
|
||||||
|
ch chan struct{} // To signal when mergeWalk go-routine should end.
|
||||||
|
once sync.Once
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *endWalkSafeCh) close() {
|
||||||
|
s.once.Do(func() {
|
||||||
|
close(s.ch)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
// mergeWalk - represents the go routine that does the merge walk.
|
// mergeWalk - represents the go routine that does the merge walk.
|
||||||
type mergeWalk struct {
|
type mergeWalk struct {
|
||||||
added time.Time
|
added time.Time
|
||||||
entryChs []FileInfoCh
|
entryChs []FileInfoCh
|
||||||
endWalkCh chan struct{} // To signal when mergeWalk go-routine should end.
|
endWalkCh *endWalkSafeCh
|
||||||
endTimerCh chan<- struct{} // To signal when timer go-routine should end.
|
endTimerCh chan<- struct{} // To signal when timer go-routine should end.
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -81,7 +92,7 @@ func (t *MergeWalkVersionsPool) Release(params listParams) ([]FileInfoVersionsCh
|
||||||
delete(t.pool, params)
|
delete(t.pool, params)
|
||||||
}
|
}
|
||||||
walk.endTimerCh <- struct{}{}
|
walk.endTimerCh <- struct{}{}
|
||||||
return walk.entryChs, walk.endWalkCh
|
return walk.entryChs, walk.endWalkCh.ch
|
||||||
}
|
}
|
||||||
|
|
||||||
// Set - similar to mergeWalkPool.Set but for file versions
|
// Set - similar to mergeWalkPool.Set but for file versions
|
||||||
|
@ -120,7 +131,7 @@ func (t *MergeWalkVersionsPool) Set(params listParams, resultChs []FileInfoVersi
|
||||||
}
|
}
|
||||||
select {
|
select {
|
||||||
case endCh <- struct{}{}:
|
case endCh <- struct{}{}:
|
||||||
close(endWalkCh)
|
endWalkCh.close()
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -135,7 +146,7 @@ func (t *MergeWalkVersionsPool) Set(params listParams, resultChs []FileInfoVersi
|
||||||
walkInfo := mergeWalkVersions{
|
walkInfo := mergeWalkVersions{
|
||||||
added: UTCNow(),
|
added: UTCNow(),
|
||||||
entryChs: resultChs,
|
entryChs: resultChs,
|
||||||
endWalkCh: endWalkCh,
|
endWalkCh: &endWalkSafeCh{ch: endWalkCh},
|
||||||
endTimerCh: endTimerCh,
|
endTimerCh: endTimerCh,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -147,7 +158,7 @@ func (t *MergeWalkVersionsPool) Set(params listParams, resultChs []FileInfoVersi
|
||||||
// We are at limit, invalidate oldest, move list down and add new as last.
|
// We are at limit, invalidate oldest, move list down and add new as last.
|
||||||
select {
|
select {
|
||||||
case walks[0].endTimerCh <- struct{}{}:
|
case walks[0].endTimerCh <- struct{}{}:
|
||||||
close(walks[0].endWalkCh)
|
walks[0].endWalkCh.close()
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
copy(walks, walks[1:])
|
copy(walks, walks[1:])
|
||||||
|
@ -185,7 +196,7 @@ func (t *MergeWalkVersionsPool) Set(params listParams, resultChs []FileInfoVersi
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Signal the mergeWalk go-routine to die.
|
// Signal the mergeWalk go-routine to die.
|
||||||
close(endWalkCh)
|
walkInfo.endWalkCh.close()
|
||||||
case <-endTimerCh:
|
case <-endTimerCh:
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -235,7 +246,7 @@ func (t *MergeWalkPool) Release(params listParams) ([]FileInfoCh, chan struct{})
|
||||||
delete(t.pool, params)
|
delete(t.pool, params)
|
||||||
}
|
}
|
||||||
walk.endTimerCh <- struct{}{}
|
walk.endTimerCh <- struct{}{}
|
||||||
return walk.entryChs, walk.endWalkCh
|
return walk.entryChs, walk.endWalkCh.ch
|
||||||
}
|
}
|
||||||
|
|
||||||
// Set - adds a mergeWalk to the mergeWalkPool.
|
// Set - adds a mergeWalk to the mergeWalkPool.
|
||||||
|
@ -281,7 +292,7 @@ func (t *MergeWalkPool) Set(params listParams, resultChs []FileInfoCh, endWalkCh
|
||||||
}
|
}
|
||||||
select {
|
select {
|
||||||
case endCh <- struct{}{}:
|
case endCh <- struct{}{}:
|
||||||
close(endWalkCh)
|
endWalkCh.close()
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -295,7 +306,7 @@ func (t *MergeWalkPool) Set(params listParams, resultChs []FileInfoCh, endWalkCh
|
||||||
walkInfo := mergeWalk{
|
walkInfo := mergeWalk{
|
||||||
added: UTCNow(),
|
added: UTCNow(),
|
||||||
entryChs: resultChs,
|
entryChs: resultChs,
|
||||||
endWalkCh: endWalkCh,
|
endWalkCh: &endWalkSafeCh{ch: endWalkCh},
|
||||||
endTimerCh: endTimerCh,
|
endTimerCh: endTimerCh,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -307,7 +318,7 @@ func (t *MergeWalkPool) Set(params listParams, resultChs []FileInfoCh, endWalkCh
|
||||||
// We are at limit, invalidate oldest, move list down and add new as last.
|
// We are at limit, invalidate oldest, move list down and add new as last.
|
||||||
select {
|
select {
|
||||||
case walks[0].endTimerCh <- struct{}{}:
|
case walks[0].endTimerCh <- struct{}{}:
|
||||||
close(walks[0].endWalkCh)
|
walks[0].endWalkCh.close()
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
copy(walks, walks[1:])
|
copy(walks, walks[1:])
|
||||||
|
@ -345,7 +356,7 @@ func (t *MergeWalkPool) Set(params listParams, resultChs []FileInfoCh, endWalkCh
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Signal the mergeWalk go-routine to die.
|
// Signal the mergeWalk go-routine to die.
|
||||||
close(endWalkCh)
|
walkInfo.endWalkCh.close()
|
||||||
case <-endTimerCh:
|
case <-endTimerCh:
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
|
@ -259,11 +259,11 @@ func (d *naughtyDisk) DeleteVersion(ctx context.Context, volume, path string, fi
|
||||||
return d.disk.DeleteVersion(ctx, volume, path, fi)
|
return d.disk.DeleteVersion(ctx, volume, path, fi)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *naughtyDisk) ReadVersion(ctx context.Context, volume, path, versionID string) (fi FileInfo, err error) {
|
func (d *naughtyDisk) ReadVersion(ctx context.Context, volume, path, versionID string, checkDataDir bool) (fi FileInfo, err error) {
|
||||||
if err := d.calcError(); err != nil {
|
if err := d.calcError(); err != nil {
|
||||||
return FileInfo{}, err
|
return FileInfo{}, err
|
||||||
}
|
}
|
||||||
return d.disk.ReadVersion(ctx, volume, path, versionID)
|
return d.disk.ReadVersion(ctx, volume, path, versionID, checkDataDir)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *naughtyDisk) WriteAll(ctx context.Context, volume string, path string, reader io.Reader) (err error) {
|
func (d *naughtyDisk) WriteAll(ctx context.Context, volume string, path string, reader io.Reader) (err error) {
|
||||||
|
|
|
@ -59,7 +59,7 @@ type StorageAPI interface {
|
||||||
DeleteVersion(ctx context.Context, volume, path string, fi FileInfo) error
|
DeleteVersion(ctx context.Context, volume, path string, fi FileInfo) error
|
||||||
DeleteVersions(ctx context.Context, volume string, versions []FileInfo) []error
|
DeleteVersions(ctx context.Context, volume string, versions []FileInfo) []error
|
||||||
WriteMetadata(ctx context.Context, volume, path string, fi FileInfo) error
|
WriteMetadata(ctx context.Context, volume, path string, fi FileInfo) error
|
||||||
ReadVersion(ctx context.Context, volume, path, versionID string) (FileInfo, error)
|
ReadVersion(ctx context.Context, volume, path, versionID string, checkDataDir bool) (FileInfo, error)
|
||||||
RenameData(ctx context.Context, srcVolume, srcPath, dataDir, dstVolume, dstPath string) error
|
RenameData(ctx context.Context, srcVolume, srcPath, dataDir, dstVolume, dstPath string) error
|
||||||
|
|
||||||
// File operations.
|
// File operations.
|
||||||
|
|
|
@ -370,11 +370,12 @@ func (client *storageRESTClient) RenameData(ctx context.Context, srcVolume, srcP
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (client *storageRESTClient) ReadVersion(ctx context.Context, volume, path, versionID string) (fi FileInfo, err error) {
|
func (client *storageRESTClient) ReadVersion(ctx context.Context, volume, path, versionID string, checkDataDir bool) (fi FileInfo, err error) {
|
||||||
values := make(url.Values)
|
values := make(url.Values)
|
||||||
values.Set(storageRESTVolume, volume)
|
values.Set(storageRESTVolume, volume)
|
||||||
values.Set(storageRESTFilePath, path)
|
values.Set(storageRESTFilePath, path)
|
||||||
values.Set(storageRESTVersionID, versionID)
|
values.Set(storageRESTVersionID, versionID)
|
||||||
|
values.Set(storageRESTCheckDataDir, strconv.FormatBool(checkDataDir))
|
||||||
|
|
||||||
respBody, err := client.call(ctx, storageRESTMethodReadVersion, values, nil, -1)
|
respBody, err := client.call(ctx, storageRESTMethodReadVersion, values, nil, -1)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -17,7 +17,7 @@
|
||||||
package cmd
|
package cmd
|
||||||
|
|
||||||
const (
|
const (
|
||||||
storageRESTVersion = "v20" // Re-implementation of storage layer
|
storageRESTVersion = "v21" // Add checkDataDir in ReadVersion API
|
||||||
storageRESTVersionPrefix = SlashSeparator + storageRESTVersion
|
storageRESTVersionPrefix = SlashSeparator + storageRESTVersion
|
||||||
storageRESTPrefix = minioReservedBucketPath + "/storage"
|
storageRESTPrefix = minioReservedBucketPath + "/storage"
|
||||||
)
|
)
|
||||||
|
@ -60,6 +60,7 @@ const (
|
||||||
storageRESTDirPath = "dir-path"
|
storageRESTDirPath = "dir-path"
|
||||||
storageRESTFilePath = "file-path"
|
storageRESTFilePath = "file-path"
|
||||||
storageRESTVersionID = "version-id"
|
storageRESTVersionID = "version-id"
|
||||||
|
storageRESTCheckDataDir = "check-data-dir"
|
||||||
storageRESTTotalVersions = "total-versions"
|
storageRESTTotalVersions = "total-versions"
|
||||||
storageRESTSrcVolume = "source-volume"
|
storageRESTSrcVolume = "source-volume"
|
||||||
storageRESTSrcPath = "source-path"
|
storageRESTSrcPath = "source-path"
|
||||||
|
|
|
@ -326,8 +326,13 @@ func (s *storageRESTServer) ReadVersionHandler(w http.ResponseWriter, r *http.Re
|
||||||
volume := vars[storageRESTVolume]
|
volume := vars[storageRESTVolume]
|
||||||
filePath := vars[storageRESTFilePath]
|
filePath := vars[storageRESTFilePath]
|
||||||
versionID := vars[storageRESTVersionID]
|
versionID := vars[storageRESTVersionID]
|
||||||
|
checkDataDir, err := strconv.ParseBool(vars[storageRESTCheckDataDir])
|
||||||
|
if err != nil {
|
||||||
|
s.writeErrorResponse(w, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
fi, err := s.storage.ReadVersion(r.Context(), volume, filePath, versionID)
|
fi, err := s.storage.ReadVersion(r.Context(), volume, filePath, versionID, checkDataDir)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.writeErrorResponse(w, err)
|
s.writeErrorResponse(w, err)
|
||||||
return
|
return
|
||||||
|
@ -925,7 +930,7 @@ func registerStorageRESTHandlers(router *mux.Router, endpointZones EndpointZones
|
||||||
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodDeleteVersion).HandlerFunc(httpTraceHdrs(server.DeleteVersionHandler)).
|
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodDeleteVersion).HandlerFunc(httpTraceHdrs(server.DeleteVersionHandler)).
|
||||||
Queries(restQueries(storageRESTVolume, storageRESTFilePath)...)
|
Queries(restQueries(storageRESTVolume, storageRESTFilePath)...)
|
||||||
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodReadVersion).HandlerFunc(httpTraceHdrs(server.ReadVersionHandler)).
|
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodReadVersion).HandlerFunc(httpTraceHdrs(server.ReadVersionHandler)).
|
||||||
Queries(restQueries(storageRESTVolume, storageRESTFilePath, storageRESTVersionID)...)
|
Queries(restQueries(storageRESTVolume, storageRESTFilePath, storageRESTVersionID, storageRESTCheckDataDir)...)
|
||||||
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodRenameData).HandlerFunc(httpTraceHdrs(server.RenameDataHandler)).
|
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodRenameData).HandlerFunc(httpTraceHdrs(server.RenameDataHandler)).
|
||||||
Queries(restQueries(storageRESTSrcVolume, storageRESTSrcPath, storageRESTDataDir,
|
Queries(restQueries(storageRESTSrcVolume, storageRESTSrcPath, storageRESTDataDir,
|
||||||
storageRESTDstVolume, storageRESTDstPath)...)
|
storageRESTDstVolume, storageRESTDstPath)...)
|
||||||
|
|
|
@ -288,12 +288,12 @@ func (p *xlStorageDiskIDCheck) WriteMetadata(ctx context.Context, volume, path s
|
||||||
return p.storage.WriteMetadata(ctx, volume, path, fi)
|
return p.storage.WriteMetadata(ctx, volume, path, fi)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *xlStorageDiskIDCheck) ReadVersion(ctx context.Context, volume, path, versionID string) (fi FileInfo, err error) {
|
func (p *xlStorageDiskIDCheck) ReadVersion(ctx context.Context, volume, path, versionID string, checkDataDir bool) (fi FileInfo, err error) {
|
||||||
if err = p.checkDiskStale(); err != nil {
|
if err = p.checkDiskStale(); err != nil {
|
||||||
return fi, err
|
return fi, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return p.storage.ReadVersion(ctx, volume, path, versionID)
|
return p.storage.ReadVersion(ctx, volume, path, versionID, checkDataDir)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *xlStorageDiskIDCheck) ReadAll(ctx context.Context, volume string, path string) (buf []byte, err error) {
|
func (p *xlStorageDiskIDCheck) ReadAll(ctx context.Context, volume string, path string) (buf []byte, err error) {
|
||||||
|
|
|
@ -1315,7 +1315,7 @@ func (s *xlStorage) renameLegacyMetadata(volume, path string) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// ReadVersion - reads metadata and returns FileInfo at path `xl.meta`
|
// ReadVersion - reads metadata and returns FileInfo at path `xl.meta`
|
||||||
func (s *xlStorage) ReadVersion(ctx context.Context, volume, path, versionID string) (fi FileInfo, err error) {
|
func (s *xlStorage) ReadVersion(ctx context.Context, volume, path, versionID string, checkDataDir bool) (fi FileInfo, err error) {
|
||||||
buf, err := s.ReadAll(ctx, volume, pathJoin(path, xlStorageFormatFile))
|
buf, err := s.ReadAll(ctx, volume, pathJoin(path, xlStorageFormatFile))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err == errFileNotFound {
|
if err == errFileNotFound {
|
||||||
|
@ -1338,7 +1338,24 @@ func (s *xlStorage) ReadVersion(ctx context.Context, volume, path, versionID str
|
||||||
return fi, errFileNotFound
|
return fi, errFileNotFound
|
||||||
}
|
}
|
||||||
|
|
||||||
return getFileInfo(buf, volume, path, versionID)
|
fi, err = getFileInfo(buf, volume, path, versionID)
|
||||||
|
if err != nil {
|
||||||
|
return fi, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if fi.DataDir != "" && checkDataDir {
|
||||||
|
if _, err = s.StatVol(ctx, pathJoin(volume, path, fi.DataDir, slashSeparator)); err != nil {
|
||||||
|
if err == errVolumeNotFound {
|
||||||
|
if versionID != "" {
|
||||||
|
return fi, errFileVersionNotFound
|
||||||
|
}
|
||||||
|
return fi, errFileNotFound
|
||||||
|
}
|
||||||
|
return fi, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return fi, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// ReadAll reads from r until an error or EOF and returns the data it read.
|
// ReadAll reads from r until an error or EOF and returns the data it read.
|
||||||
|
@ -2221,10 +2238,7 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath, dataDir,
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = renameAll(srcFilePath, dstFilePath); err != nil {
|
// Commit data
|
||||||
return osErrToFileErr(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if srcDataPath != "" {
|
if srcDataPath != "" {
|
||||||
removeAll(oldDstDataPath)
|
removeAll(oldDstDataPath)
|
||||||
removeAll(dstDataPath)
|
removeAll(dstDataPath)
|
||||||
|
@ -2233,6 +2247,11 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath, dataDir,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Commit meta-file
|
||||||
|
if err = renameAll(srcFilePath, dstFilePath); err != nil {
|
||||||
|
return osErrToFileErr(err)
|
||||||
|
}
|
||||||
|
|
||||||
// Remove parent dir of the source file if empty
|
// Remove parent dir of the source file if empty
|
||||||
if parentDir := slashpath.Dir(srcFilePath); isDirEmpty(parentDir) {
|
if parentDir := slashpath.Dir(srcFilePath); isDirEmpty(parentDir) {
|
||||||
deleteFile(srcVolumeDir, parentDir, false)
|
deleteFile(srcVolumeDir, parentDir, false)
|
||||||
|
|
Loading…
Reference in a new issue