Compare commits

...

7 commits

Author SHA1 Message Date
Harshavardhana c8fef1d994 fix crash if disk is offline in startMergeWalkVersionsN() 2021-06-23 13:24:01 -07:00
Anis Elleuch 1aa5a642b8 make: Add hotfix target to generate hotfix binaries (#11053)
hotfix target will fetch the release tag prior to the latest commit and create a binary
with the same release tag plus '.hotfix' suffix

e.g.   RELEASE.2020-12-03T05-49-24Z.hotfix
2020-12-14 10:58:12 -08:00
Harshavardhana 11236d1009 fix a possible panic on close 2020-12-14 10:52:31 -08:00
Harshavardhana 4d2f1df591 fix: crash if disk is offline in startMergeWalks() 2020-12-14 10:09:50 -08:00
Harshavardhana eb5a5d1238 erasure: delete dangling objects automatically 2020-11-01 18:41:57 -08:00
Anis Elleuch 65f1ad03fa erasure: Commit data before xl.meta in RenameData() (#10734)
This will reduce the chance to have updated xl.meta without data.
2020-11-01 18:35:16 -08:00
Anis Elleuch 6046014253 fix: Get/HeadObject return 404 on non quorum objects (#10753) 2020-11-01 18:35:04 -08:00
15 changed files with 157 additions and 39 deletions

View file

@ -71,6 +71,10 @@ build: checks
@echo "Building minio binary to './minio'" @echo "Building minio binary to './minio'"
@GO111MODULE=on CGO_ENABLED=0 go build -tags kqueue -trimpath --ldflags "$(LDFLAGS)" -o $(PWD)/minio 1>/dev/null @GO111MODULE=on CGO_ENABLED=0 go build -tags kqueue -trimpath --ldflags "$(LDFLAGS)" -o $(PWD)/minio 1>/dev/null
hotfix: LDFLAGS := $(shell MINIO_RELEASE="RELEASE" MINIO_HOTFIX="hotfix" go run buildscripts/gen-ldflags.go $(shell git describe --tags --abbrev=0 | \
sed 's#RELEASE\.\([0-9]\+\)-\([0-9]\+\)-\([0-9]\+\)T\([0-9]\+\)-\([0-9]\+\)-\([0-9]\+\)Z#\1-\2-\3T\4:\5:\6Z#'))
hotfix: install
docker: checks docker: checks
@echo "Building minio docker image '$(TAG)'" @echo "Building minio docker image '$(TAG)'"
@GOOS=linux GO111MODULE=on CGO_ENABLED=0 go build -tags kqueue -trimpath --ldflags "$(LDFLAGS)" -o $(PWD)/minio 1>/dev/null @GOOS=linux GO111MODULE=on CGO_ENABLED=0 go build -tags kqueue -trimpath --ldflags "$(LDFLAGS)" -o $(PWD)/minio 1>/dev/null

View file

@ -44,10 +44,21 @@ func releaseTag(version string) string {
relPrefix = prefix relPrefix = prefix
} }
relSuffix := ""
if hotfix := os.Getenv("MINIO_HOTFIX"); hotfix != "" {
relSuffix = hotfix
}
relTag := strings.Replace(version, " ", "-", -1) relTag := strings.Replace(version, " ", "-", -1)
relTag = strings.Replace(relTag, ":", "-", -1) relTag = strings.Replace(relTag, ":", "-", -1)
relTag = strings.Replace(relTag, ",", "", -1) relTag = strings.Replace(relTag, ",", "", -1)
return relPrefix + "." + relTag relTag = relPrefix + "." + relTag
if relSuffix != "" {
relTag += "." + relSuffix
}
return relTag
} }
// commitID returns the abbreviated commit-id hash of the last commit. // commitID returns the abbreviated commit-id hash of the last commit.
@ -68,5 +79,12 @@ func commitID() string {
} }
func main() { func main() {
fmt.Println(genLDFlags(time.Now().UTC().Format(time.RFC3339))) var version string
if len(os.Args) > 1 {
version = os.Args[1]
} else {
version = time.Now().UTC().Format(time.RFC3339)
}
fmt.Println(genLDFlags(version))
} }

View file

@ -67,7 +67,7 @@ func TestHealing(t *testing.T) {
} }
disk := er.getDisks()[0] disk := er.getDisks()[0]
fileInfoPreHeal, err := disk.ReadVersion(context.Background(), bucket, object, "") fileInfoPreHeal, err := disk.ReadVersion(context.Background(), bucket, object, "", false)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -84,7 +84,7 @@ func TestHealing(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
fileInfoPostHeal, err := disk.ReadVersion(context.Background(), bucket, object, "") fileInfoPostHeal, err := disk.ReadVersion(context.Background(), bucket, object, "", false)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -113,7 +113,7 @@ func TestHealing(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
fileInfoPostHeal, err = disk.ReadVersion(context.Background(), bucket, object, "") fileInfoPostHeal, err = disk.ReadVersion(context.Background(), bucket, object, "", false)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View file

@ -113,9 +113,21 @@ func hashOrder(key string, cardinality int) []int {
return nums return nums
} }
// Reads all `xl.meta` metadata as a FileInfo slice and checks if the data dir exists as well,
// otherwise returns errFileNotFound (or errFileVersionNotFound)
func getAllObjectFileInfo(ctx context.Context, disks []StorageAPI, bucket, object, versionID string) ([]FileInfo, []error) {
return readVersionFromDisks(ctx, disks, bucket, object, versionID, true)
}
// Reads all `xl.meta` metadata as a FileInfo slice. // Reads all `xl.meta` metadata as a FileInfo slice.
// Returns error slice indicating the failed metadata reads. // Returns error slice indicating the failed metadata reads.
func readAllFileInfo(ctx context.Context, disks []StorageAPI, bucket, object, versionID string) ([]FileInfo, []error) { func readAllFileInfo(ctx context.Context, disks []StorageAPI, bucket, object, versionID string) ([]FileInfo, []error) {
return readVersionFromDisks(ctx, disks, bucket, object, versionID, false)
}
// Reads all `xl.meta` metadata as a FileInfo slice and checks if the data dir
// exists as well, if checkDataDir is set to true.
func readVersionFromDisks(ctx context.Context, disks []StorageAPI, bucket, object, versionID string, checkDataDir bool) ([]FileInfo, []error) {
metadataArray := make([]FileInfo, len(disks)) metadataArray := make([]FileInfo, len(disks))
g := errgroup.WithNErrs(len(disks)) g := errgroup.WithNErrs(len(disks))
@ -126,7 +138,7 @@ func readAllFileInfo(ctx context.Context, disks []StorageAPI, bucket, object, ve
if disks[index] == nil { if disks[index] == nil {
return errDiskNotFound return errDiskNotFound
} }
metadataArray[index], err = disks[index].ReadVersion(ctx, bucket, object, versionID) metadataArray[index], err = disks[index].ReadVersion(ctx, bucket, object, versionID, checkDataDir)
if err != nil { if err != nil {
if err != errFileNotFound && err != errVolumeNotFound && err != errFileVersionNotFound { if err != errFileNotFound && err != errVolumeNotFound && err != errFileVersionNotFound {
logger.GetReqInfo(ctx).AppendTags("disk", disks[index].String()) logger.GetReqInfo(ctx).AppendTags("disk", disks[index].String())

View file

@ -43,7 +43,25 @@ func (er erasureObjects) getMultipartSHADir(bucket, object string) string {
// checkUploadIDExists - verify if a given uploadID exists and is valid. // checkUploadIDExists - verify if a given uploadID exists and is valid.
func (er erasureObjects) checkUploadIDExists(ctx context.Context, bucket, object, uploadID string) error { func (er erasureObjects) checkUploadIDExists(ctx context.Context, bucket, object, uploadID string) error {
_, _, _, err := er.getObjectFileInfo(ctx, minioMetaMultipartBucket, er.getUploadIDDir(bucket, object, uploadID), ObjectOptions{}) disks := er.getDisks()
// Read metadata associated with the object from all disks.
metaArr, errs := readAllFileInfo(ctx, disks, minioMetaMultipartBucket, er.getUploadIDDir(bucket, object, uploadID), "")
readQuorum, _, err := objectQuorumFromMeta(ctx, er, metaArr, errs)
if err != nil {
return err
}
if reducedErr := reduceReadQuorumErrs(ctx, errs, objectOpIgnoredErrs, readQuorum); reducedErr != nil {
return toObjectErr(reducedErr, bucket, object)
}
// List all online disks.
_, modTime := listOnlineDisks(disks, metaArr, errs)
// Pick latest valid metadata.
_, err = pickValidFileInfo(ctx, metaArr, modTime, readQuorum)
return err return err
} }
@ -110,7 +128,7 @@ func (er erasureObjects) cleanupStaleUploadsOnDisk(ctx context.Context, disk Sto
} }
for _, uploadIDDir := range uploadIDDirs { for _, uploadIDDir := range uploadIDDirs {
uploadIDPath := pathJoin(shaDir, uploadIDDir) uploadIDPath := pathJoin(shaDir, uploadIDDir)
fi, err := disk.ReadVersion(ctx, minioMetaMultipartBucket, uploadIDPath, "") fi, err := disk.ReadVersion(ctx, minioMetaMultipartBucket, uploadIDPath, "", false)
if err != nil { if err != nil {
continue continue
} }
@ -124,7 +142,7 @@ func (er erasureObjects) cleanupStaleUploadsOnDisk(ctx context.Context, disk Sto
return return
} }
for _, tmpDir := range tmpDirs { for _, tmpDir := range tmpDirs {
fi, err := disk.ReadVersion(ctx, minioMetaTmpBucket, tmpDir, "") fi, err := disk.ReadVersion(ctx, minioMetaTmpBucket, tmpDir, "", false)
if err != nil { if err != nil {
continue continue
} }
@ -178,7 +196,7 @@ func (er erasureObjects) ListMultipartUploads(ctx context.Context, bucket, objec
if populatedUploadIds.Contains(uploadID) { if populatedUploadIds.Contains(uploadID) {
continue continue
} }
fi, err := disk.ReadVersion(ctx, minioMetaMultipartBucket, pathJoin(er.getUploadIDDir(bucket, object, uploadID)), "") fi, err := disk.ReadVersion(ctx, minioMetaMultipartBucket, pathJoin(er.getUploadIDDir(bucket, object, uploadID)), "", false)
if err != nil { if err != nil {
return result, toObjectErr(err, bucket, object) return result, toObjectErr(err, bucket, object)
} }

View file

@ -355,14 +355,32 @@ func (er erasureObjects) getObjectFileInfo(ctx context.Context, bucket, object s
disks := er.getDisks() disks := er.getDisks()
// Read metadata associated with the object from all disks. // Read metadata associated with the object from all disks.
metaArr, errs := readAllFileInfo(ctx, disks, bucket, object, opts.VersionID) metaArr, errs := getAllObjectFileInfo(ctx, disks, bucket, object, opts.VersionID)
readQuorum, _, err := objectQuorumFromMeta(ctx, er, metaArr, errs) readQuorum, _, err := objectQuorumFromMeta(ctx, er, metaArr, errs)
if err != nil { if err != nil {
return fi, nil, nil, err readQuorum = len(metaArr) / 2
} }
if reducedErr := reduceReadQuorumErrs(ctx, errs, objectOpIgnoredErrs, readQuorum); reducedErr != nil { if reducedErr := reduceReadQuorumErrs(ctx, errs, objectOpIgnoredErrs, readQuorum); reducedErr != nil {
if reducedErr == errErasureReadQuorum {
if _, ok := isObjectDangling(metaArr, errs, nil); ok {
reducedErr = errFileNotFound
if opts.VersionID != "" {
reducedErr = errFileVersionNotFound
}
// Remove the dangling object only when:
// - This is a non versioned bucket
// - This is a versioned bucket and the version ID is passed, the reason
// is that it is hard to pick that particular version that is dangling
if !opts.Versioned || opts.VersionID != "" {
er.deleteObjectVersion(ctx, bucket, object, 1, FileInfo{
Name: object,
VersionID: opts.VersionID,
})
}
}
}
return fi, nil, nil, toObjectErr(reducedErr, bucket, object) return fi, nil, nil, toObjectErr(reducedErr, bucket, object)
} }

View file

@ -1007,6 +1007,12 @@ func (s *erasureSets) startMergeWalksVersionsN(ctx context.Context, bucket, pref
wg.Add(1) wg.Add(1)
go func(disk StorageAPI) { go func(disk StorageAPI) {
defer wg.Done() defer wg.Done()
if disk == nil {
// disk offline, ignore it.
return
}
entryCh, err := disk.WalkVersions(GlobalContext, bucket, prefix, marker, recursive, endWalkCh) entryCh, err := disk.WalkVersions(GlobalContext, bucket, prefix, marker, recursive, endWalkCh)
if err != nil { if err != nil {
return return
@ -1037,6 +1043,11 @@ func (s *erasureSets) startMergeWalksN(ctx context.Context, bucket, prefix, mark
go func(disk StorageAPI) { go func(disk StorageAPI) {
defer wg.Done() defer wg.Done()
if disk == nil {
// disk offline, ignore it.
return
}
var entryCh chan FileInfo var entryCh chan FileInfo
var err error var err error
if splunk { if splunk {

View file

@ -30,15 +30,26 @@ const (
type mergeWalkVersions struct { type mergeWalkVersions struct {
added time.Time added time.Time
entryChs []FileInfoVersionsCh entryChs []FileInfoVersionsCh
endWalkCh chan struct{} // To signal when mergeWalk go-routine should end. endWalkCh *endWalkSafeCh
endTimerCh chan<- struct{} // To signal when timer go-routine should end. endTimerCh chan<- struct{} // To signal when timer go-routine should end.
} }
type endWalkSafeCh struct {
ch chan struct{} // To signal when mergeWalk go-routine should end.
once sync.Once
}
func (s *endWalkSafeCh) close() {
s.once.Do(func() {
close(s.ch)
})
}
// mergeWalk - represents the go routine that does the merge walk. // mergeWalk - represents the go routine that does the merge walk.
type mergeWalk struct { type mergeWalk struct {
added time.Time added time.Time
entryChs []FileInfoCh entryChs []FileInfoCh
endWalkCh chan struct{} // To signal when mergeWalk go-routine should end. endWalkCh *endWalkSafeCh
endTimerCh chan<- struct{} // To signal when timer go-routine should end. endTimerCh chan<- struct{} // To signal when timer go-routine should end.
} }
@ -81,7 +92,7 @@ func (t *MergeWalkVersionsPool) Release(params listParams) ([]FileInfoVersionsCh
delete(t.pool, params) delete(t.pool, params)
} }
walk.endTimerCh <- struct{}{} walk.endTimerCh <- struct{}{}
return walk.entryChs, walk.endWalkCh return walk.entryChs, walk.endWalkCh.ch
} }
// Set - similar to mergeWalkPool.Set but for file versions // Set - similar to mergeWalkPool.Set but for file versions
@ -120,7 +131,7 @@ func (t *MergeWalkVersionsPool) Set(params listParams, resultChs []FileInfoVersi
} }
select { select {
case endCh <- struct{}{}: case endCh <- struct{}{}:
close(endWalkCh) endWalkCh.close()
default: default:
} }
} else { } else {
@ -135,7 +146,7 @@ func (t *MergeWalkVersionsPool) Set(params listParams, resultChs []FileInfoVersi
walkInfo := mergeWalkVersions{ walkInfo := mergeWalkVersions{
added: UTCNow(), added: UTCNow(),
entryChs: resultChs, entryChs: resultChs,
endWalkCh: endWalkCh, endWalkCh: &endWalkSafeCh{ch: endWalkCh},
endTimerCh: endTimerCh, endTimerCh: endTimerCh,
} }
@ -147,7 +158,7 @@ func (t *MergeWalkVersionsPool) Set(params listParams, resultChs []FileInfoVersi
// We are at limit, invalidate oldest, move list down and add new as last. // We are at limit, invalidate oldest, move list down and add new as last.
select { select {
case walks[0].endTimerCh <- struct{}{}: case walks[0].endTimerCh <- struct{}{}:
close(walks[0].endWalkCh) walks[0].endWalkCh.close()
default: default:
} }
copy(walks, walks[1:]) copy(walks, walks[1:])
@ -185,7 +196,7 @@ func (t *MergeWalkVersionsPool) Set(params listParams, resultChs []FileInfoVersi
} }
} }
// Signal the mergeWalk go-routine to die. // Signal the mergeWalk go-routine to die.
close(endWalkCh) walkInfo.endWalkCh.close()
case <-endTimerCh: case <-endTimerCh:
return return
} }
@ -235,7 +246,7 @@ func (t *MergeWalkPool) Release(params listParams) ([]FileInfoCh, chan struct{})
delete(t.pool, params) delete(t.pool, params)
} }
walk.endTimerCh <- struct{}{} walk.endTimerCh <- struct{}{}
return walk.entryChs, walk.endWalkCh return walk.entryChs, walk.endWalkCh.ch
} }
// Set - adds a mergeWalk to the mergeWalkPool. // Set - adds a mergeWalk to the mergeWalkPool.
@ -281,7 +292,7 @@ func (t *MergeWalkPool) Set(params listParams, resultChs []FileInfoCh, endWalkCh
} }
select { select {
case endCh <- struct{}{}: case endCh <- struct{}{}:
close(endWalkCh) endWalkCh.close()
default: default:
} }
} else { } else {
@ -295,7 +306,7 @@ func (t *MergeWalkPool) Set(params listParams, resultChs []FileInfoCh, endWalkCh
walkInfo := mergeWalk{ walkInfo := mergeWalk{
added: UTCNow(), added: UTCNow(),
entryChs: resultChs, entryChs: resultChs,
endWalkCh: endWalkCh, endWalkCh: &endWalkSafeCh{ch: endWalkCh},
endTimerCh: endTimerCh, endTimerCh: endTimerCh,
} }
@ -307,7 +318,7 @@ func (t *MergeWalkPool) Set(params listParams, resultChs []FileInfoCh, endWalkCh
// We are at limit, invalidate oldest, move list down and add new as last. // We are at limit, invalidate oldest, move list down and add new as last.
select { select {
case walks[0].endTimerCh <- struct{}{}: case walks[0].endTimerCh <- struct{}{}:
close(walks[0].endWalkCh) walks[0].endWalkCh.close()
default: default:
} }
copy(walks, walks[1:]) copy(walks, walks[1:])
@ -345,7 +356,7 @@ func (t *MergeWalkPool) Set(params listParams, resultChs []FileInfoCh, endWalkCh
} }
} }
// Signal the mergeWalk go-routine to die. // Signal the mergeWalk go-routine to die.
close(endWalkCh) walkInfo.endWalkCh.close()
case <-endTimerCh: case <-endTimerCh:
return return
} }

View file

@ -259,11 +259,11 @@ func (d *naughtyDisk) DeleteVersion(ctx context.Context, volume, path string, fi
return d.disk.DeleteVersion(ctx, volume, path, fi) return d.disk.DeleteVersion(ctx, volume, path, fi)
} }
func (d *naughtyDisk) ReadVersion(ctx context.Context, volume, path, versionID string) (fi FileInfo, err error) { func (d *naughtyDisk) ReadVersion(ctx context.Context, volume, path, versionID string, checkDataDir bool) (fi FileInfo, err error) {
if err := d.calcError(); err != nil { if err := d.calcError(); err != nil {
return FileInfo{}, err return FileInfo{}, err
} }
return d.disk.ReadVersion(ctx, volume, path, versionID) return d.disk.ReadVersion(ctx, volume, path, versionID, checkDataDir)
} }
func (d *naughtyDisk) WriteAll(ctx context.Context, volume string, path string, reader io.Reader) (err error) { func (d *naughtyDisk) WriteAll(ctx context.Context, volume string, path string, reader io.Reader) (err error) {

View file

@ -59,7 +59,7 @@ type StorageAPI interface {
DeleteVersion(ctx context.Context, volume, path string, fi FileInfo) error DeleteVersion(ctx context.Context, volume, path string, fi FileInfo) error
DeleteVersions(ctx context.Context, volume string, versions []FileInfo) []error DeleteVersions(ctx context.Context, volume string, versions []FileInfo) []error
WriteMetadata(ctx context.Context, volume, path string, fi FileInfo) error WriteMetadata(ctx context.Context, volume, path string, fi FileInfo) error
ReadVersion(ctx context.Context, volume, path, versionID string) (FileInfo, error) ReadVersion(ctx context.Context, volume, path, versionID string, checkDataDir bool) (FileInfo, error)
RenameData(ctx context.Context, srcVolume, srcPath, dataDir, dstVolume, dstPath string) error RenameData(ctx context.Context, srcVolume, srcPath, dataDir, dstVolume, dstPath string) error
// File operations. // File operations.

View file

@ -370,11 +370,12 @@ func (client *storageRESTClient) RenameData(ctx context.Context, srcVolume, srcP
return err return err
} }
func (client *storageRESTClient) ReadVersion(ctx context.Context, volume, path, versionID string) (fi FileInfo, err error) { func (client *storageRESTClient) ReadVersion(ctx context.Context, volume, path, versionID string, checkDataDir bool) (fi FileInfo, err error) {
values := make(url.Values) values := make(url.Values)
values.Set(storageRESTVolume, volume) values.Set(storageRESTVolume, volume)
values.Set(storageRESTFilePath, path) values.Set(storageRESTFilePath, path)
values.Set(storageRESTVersionID, versionID) values.Set(storageRESTVersionID, versionID)
values.Set(storageRESTCheckDataDir, strconv.FormatBool(checkDataDir))
respBody, err := client.call(ctx, storageRESTMethodReadVersion, values, nil, -1) respBody, err := client.call(ctx, storageRESTMethodReadVersion, values, nil, -1)
if err != nil { if err != nil {

View file

@ -17,7 +17,7 @@
package cmd package cmd
const ( const (
storageRESTVersion = "v20" // Re-implementation of storage layer storageRESTVersion = "v21" // Add checkDataDir in ReadVersion API
storageRESTVersionPrefix = SlashSeparator + storageRESTVersion storageRESTVersionPrefix = SlashSeparator + storageRESTVersion
storageRESTPrefix = minioReservedBucketPath + "/storage" storageRESTPrefix = minioReservedBucketPath + "/storage"
) )
@ -60,6 +60,7 @@ const (
storageRESTDirPath = "dir-path" storageRESTDirPath = "dir-path"
storageRESTFilePath = "file-path" storageRESTFilePath = "file-path"
storageRESTVersionID = "version-id" storageRESTVersionID = "version-id"
storageRESTCheckDataDir = "check-data-dir"
storageRESTTotalVersions = "total-versions" storageRESTTotalVersions = "total-versions"
storageRESTSrcVolume = "source-volume" storageRESTSrcVolume = "source-volume"
storageRESTSrcPath = "source-path" storageRESTSrcPath = "source-path"

View file

@ -326,8 +326,13 @@ func (s *storageRESTServer) ReadVersionHandler(w http.ResponseWriter, r *http.Re
volume := vars[storageRESTVolume] volume := vars[storageRESTVolume]
filePath := vars[storageRESTFilePath] filePath := vars[storageRESTFilePath]
versionID := vars[storageRESTVersionID] versionID := vars[storageRESTVersionID]
checkDataDir, err := strconv.ParseBool(vars[storageRESTCheckDataDir])
if err != nil {
s.writeErrorResponse(w, err)
return
}
fi, err := s.storage.ReadVersion(r.Context(), volume, filePath, versionID) fi, err := s.storage.ReadVersion(r.Context(), volume, filePath, versionID, checkDataDir)
if err != nil { if err != nil {
s.writeErrorResponse(w, err) s.writeErrorResponse(w, err)
return return
@ -925,7 +930,7 @@ func registerStorageRESTHandlers(router *mux.Router, endpointZones EndpointZones
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodDeleteVersion).HandlerFunc(httpTraceHdrs(server.DeleteVersionHandler)). subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodDeleteVersion).HandlerFunc(httpTraceHdrs(server.DeleteVersionHandler)).
Queries(restQueries(storageRESTVolume, storageRESTFilePath)...) Queries(restQueries(storageRESTVolume, storageRESTFilePath)...)
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodReadVersion).HandlerFunc(httpTraceHdrs(server.ReadVersionHandler)). subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodReadVersion).HandlerFunc(httpTraceHdrs(server.ReadVersionHandler)).
Queries(restQueries(storageRESTVolume, storageRESTFilePath, storageRESTVersionID)...) Queries(restQueries(storageRESTVolume, storageRESTFilePath, storageRESTVersionID, storageRESTCheckDataDir)...)
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodRenameData).HandlerFunc(httpTraceHdrs(server.RenameDataHandler)). subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodRenameData).HandlerFunc(httpTraceHdrs(server.RenameDataHandler)).
Queries(restQueries(storageRESTSrcVolume, storageRESTSrcPath, storageRESTDataDir, Queries(restQueries(storageRESTSrcVolume, storageRESTSrcPath, storageRESTDataDir,
storageRESTDstVolume, storageRESTDstPath)...) storageRESTDstVolume, storageRESTDstPath)...)

View file

@ -288,12 +288,12 @@ func (p *xlStorageDiskIDCheck) WriteMetadata(ctx context.Context, volume, path s
return p.storage.WriteMetadata(ctx, volume, path, fi) return p.storage.WriteMetadata(ctx, volume, path, fi)
} }
func (p *xlStorageDiskIDCheck) ReadVersion(ctx context.Context, volume, path, versionID string) (fi FileInfo, err error) { func (p *xlStorageDiskIDCheck) ReadVersion(ctx context.Context, volume, path, versionID string, checkDataDir bool) (fi FileInfo, err error) {
if err = p.checkDiskStale(); err != nil { if err = p.checkDiskStale(); err != nil {
return fi, err return fi, err
} }
return p.storage.ReadVersion(ctx, volume, path, versionID) return p.storage.ReadVersion(ctx, volume, path, versionID, checkDataDir)
} }
func (p *xlStorageDiskIDCheck) ReadAll(ctx context.Context, volume string, path string) (buf []byte, err error) { func (p *xlStorageDiskIDCheck) ReadAll(ctx context.Context, volume string, path string) (buf []byte, err error) {

View file

@ -1315,7 +1315,7 @@ func (s *xlStorage) renameLegacyMetadata(volume, path string) error {
} }
// ReadVersion - reads metadata and returns FileInfo at path `xl.meta` // ReadVersion - reads metadata and returns FileInfo at path `xl.meta`
func (s *xlStorage) ReadVersion(ctx context.Context, volume, path, versionID string) (fi FileInfo, err error) { func (s *xlStorage) ReadVersion(ctx context.Context, volume, path, versionID string, checkDataDir bool) (fi FileInfo, err error) {
buf, err := s.ReadAll(ctx, volume, pathJoin(path, xlStorageFormatFile)) buf, err := s.ReadAll(ctx, volume, pathJoin(path, xlStorageFormatFile))
if err != nil { if err != nil {
if err == errFileNotFound { if err == errFileNotFound {
@ -1338,7 +1338,24 @@ func (s *xlStorage) ReadVersion(ctx context.Context, volume, path, versionID str
return fi, errFileNotFound return fi, errFileNotFound
} }
return getFileInfo(buf, volume, path, versionID) fi, err = getFileInfo(buf, volume, path, versionID)
if err != nil {
return fi, err
}
if fi.DataDir != "" && checkDataDir {
if _, err = s.StatVol(ctx, pathJoin(volume, path, fi.DataDir, slashSeparator)); err != nil {
if err == errVolumeNotFound {
if versionID != "" {
return fi, errFileVersionNotFound
}
return fi, errFileNotFound
}
return fi, err
}
}
return fi, nil
} }
// ReadAll reads from r until an error or EOF and returns the data it read. // ReadAll reads from r until an error or EOF and returns the data it read.
@ -2221,10 +2238,7 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath, dataDir,
return err return err
} }
if err = renameAll(srcFilePath, dstFilePath); err != nil { // Commit data
return osErrToFileErr(err)
}
if srcDataPath != "" { if srcDataPath != "" {
removeAll(oldDstDataPath) removeAll(oldDstDataPath)
removeAll(dstDataPath) removeAll(dstDataPath)
@ -2233,6 +2247,11 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath, dataDir,
} }
} }
// Commit meta-file
if err = renameAll(srcFilePath, dstFilePath); err != nil {
return osErrToFileErr(err)
}
// Remove parent dir of the source file if empty // Remove parent dir of the source file if empty
if parentDir := slashpath.Dir(srcFilePath); isDirEmpty(parentDir) { if parentDir := slashpath.Dir(srcFilePath); isDirEmpty(parentDir) {
deleteFile(srcVolumeDir, parentDir, false) deleteFile(srcVolumeDir, parentDir, false)