Compare commits
7 Commits
master
...
RELEASE.20
Author | SHA1 | Date |
---|---|---|
Harshavardhana | c8fef1d994 | |
Anis Elleuch | 1aa5a642b8 | |
Harshavardhana | 11236d1009 | |
Harshavardhana | 4d2f1df591 | |
Harshavardhana | eb5a5d1238 | |
Anis Elleuch | 65f1ad03fa | |
Anis Elleuch | 6046014253 |
4
Makefile
4
Makefile
|
@ -71,6 +71,10 @@ build: checks
|
|||
@echo "Building minio binary to './minio'"
|
||||
@GO111MODULE=on CGO_ENABLED=0 go build -tags kqueue -trimpath --ldflags "$(LDFLAGS)" -o $(PWD)/minio 1>/dev/null
|
||||
|
||||
hotfix: LDFLAGS := $(shell MINIO_RELEASE="RELEASE" MINIO_HOTFIX="hotfix" go run buildscripts/gen-ldflags.go $(shell git describe --tags --abbrev=0 | \
|
||||
sed 's#RELEASE\.\([0-9]\+\)-\([0-9]\+\)-\([0-9]\+\)T\([0-9]\+\)-\([0-9]\+\)-\([0-9]\+\)Z#\1-\2-\3T\4:\5:\6Z#'))
|
||||
hotfix: install
|
||||
|
||||
docker: checks
|
||||
@echo "Building minio docker image '$(TAG)'"
|
||||
@GOOS=linux GO111MODULE=on CGO_ENABLED=0 go build -tags kqueue -trimpath --ldflags "$(LDFLAGS)" -o $(PWD)/minio 1>/dev/null
|
||||
|
|
|
@ -44,10 +44,21 @@ func releaseTag(version string) string {
|
|||
relPrefix = prefix
|
||||
}
|
||||
|
||||
relSuffix := ""
|
||||
if hotfix := os.Getenv("MINIO_HOTFIX"); hotfix != "" {
|
||||
relSuffix = hotfix
|
||||
}
|
||||
|
||||
relTag := strings.Replace(version, " ", "-", -1)
|
||||
relTag = strings.Replace(relTag, ":", "-", -1)
|
||||
relTag = strings.Replace(relTag, ",", "", -1)
|
||||
return relPrefix + "." + relTag
|
||||
relTag = relPrefix + "." + relTag
|
||||
|
||||
if relSuffix != "" {
|
||||
relTag += "." + relSuffix
|
||||
}
|
||||
|
||||
return relTag
|
||||
}
|
||||
|
||||
// commitID returns the abbreviated commit-id hash of the last commit.
|
||||
|
@ -68,5 +79,12 @@ func commitID() string {
|
|||
}
|
||||
|
||||
func main() {
|
||||
fmt.Println(genLDFlags(time.Now().UTC().Format(time.RFC3339)))
|
||||
var version string
|
||||
if len(os.Args) > 1 {
|
||||
version = os.Args[1]
|
||||
} else {
|
||||
version = time.Now().UTC().Format(time.RFC3339)
|
||||
}
|
||||
|
||||
fmt.Println(genLDFlags(version))
|
||||
}
|
||||
|
|
|
@ -67,7 +67,7 @@ func TestHealing(t *testing.T) {
|
|||
}
|
||||
|
||||
disk := er.getDisks()[0]
|
||||
fileInfoPreHeal, err := disk.ReadVersion(context.Background(), bucket, object, "")
|
||||
fileInfoPreHeal, err := disk.ReadVersion(context.Background(), bucket, object, "", false)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
@ -84,7 +84,7 @@ func TestHealing(t *testing.T) {
|
|||
t.Fatal(err)
|
||||
}
|
||||
|
||||
fileInfoPostHeal, err := disk.ReadVersion(context.Background(), bucket, object, "")
|
||||
fileInfoPostHeal, err := disk.ReadVersion(context.Background(), bucket, object, "", false)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
@ -113,7 +113,7 @@ func TestHealing(t *testing.T) {
|
|||
t.Fatal(err)
|
||||
}
|
||||
|
||||
fileInfoPostHeal, err = disk.ReadVersion(context.Background(), bucket, object, "")
|
||||
fileInfoPostHeal, err = disk.ReadVersion(context.Background(), bucket, object, "", false)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
|
|
@ -113,9 +113,21 @@ func hashOrder(key string, cardinality int) []int {
|
|||
return nums
|
||||
}
|
||||
|
||||
// Reads all `xl.meta` metadata as a FileInfo slice and checks if the data dir exists as well,
|
||||
// otherwise returns errFileNotFound (or errFileVersionNotFound)
|
||||
func getAllObjectFileInfo(ctx context.Context, disks []StorageAPI, bucket, object, versionID string) ([]FileInfo, []error) {
|
||||
return readVersionFromDisks(ctx, disks, bucket, object, versionID, true)
|
||||
}
|
||||
|
||||
// Reads all `xl.meta` metadata as a FileInfo slice.
|
||||
// Returns error slice indicating the failed metadata reads.
|
||||
func readAllFileInfo(ctx context.Context, disks []StorageAPI, bucket, object, versionID string) ([]FileInfo, []error) {
|
||||
return readVersionFromDisks(ctx, disks, bucket, object, versionID, false)
|
||||
}
|
||||
|
||||
// Reads all `xl.meta` metadata as a FileInfo slice and checks if the data dir
|
||||
// exists as well, if checkDataDir is set to true.
|
||||
func readVersionFromDisks(ctx context.Context, disks []StorageAPI, bucket, object, versionID string, checkDataDir bool) ([]FileInfo, []error) {
|
||||
metadataArray := make([]FileInfo, len(disks))
|
||||
|
||||
g := errgroup.WithNErrs(len(disks))
|
||||
|
@ -126,7 +138,7 @@ func readAllFileInfo(ctx context.Context, disks []StorageAPI, bucket, object, ve
|
|||
if disks[index] == nil {
|
||||
return errDiskNotFound
|
||||
}
|
||||
metadataArray[index], err = disks[index].ReadVersion(ctx, bucket, object, versionID)
|
||||
metadataArray[index], err = disks[index].ReadVersion(ctx, bucket, object, versionID, checkDataDir)
|
||||
if err != nil {
|
||||
if err != errFileNotFound && err != errVolumeNotFound && err != errFileVersionNotFound {
|
||||
logger.GetReqInfo(ctx).AppendTags("disk", disks[index].String())
|
||||
|
|
|
@ -43,7 +43,25 @@ func (er erasureObjects) getMultipartSHADir(bucket, object string) string {
|
|||
|
||||
// checkUploadIDExists - verify if a given uploadID exists and is valid.
|
||||
func (er erasureObjects) checkUploadIDExists(ctx context.Context, bucket, object, uploadID string) error {
|
||||
_, _, _, err := er.getObjectFileInfo(ctx, minioMetaMultipartBucket, er.getUploadIDDir(bucket, object, uploadID), ObjectOptions{})
|
||||
disks := er.getDisks()
|
||||
|
||||
// Read metadata associated with the object from all disks.
|
||||
metaArr, errs := readAllFileInfo(ctx, disks, minioMetaMultipartBucket, er.getUploadIDDir(bucket, object, uploadID), "")
|
||||
|
||||
readQuorum, _, err := objectQuorumFromMeta(ctx, er, metaArr, errs)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if reducedErr := reduceReadQuorumErrs(ctx, errs, objectOpIgnoredErrs, readQuorum); reducedErr != nil {
|
||||
return toObjectErr(reducedErr, bucket, object)
|
||||
}
|
||||
|
||||
// List all online disks.
|
||||
_, modTime := listOnlineDisks(disks, metaArr, errs)
|
||||
|
||||
// Pick latest valid metadata.
|
||||
_, err = pickValidFileInfo(ctx, metaArr, modTime, readQuorum)
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -110,7 +128,7 @@ func (er erasureObjects) cleanupStaleUploadsOnDisk(ctx context.Context, disk Sto
|
|||
}
|
||||
for _, uploadIDDir := range uploadIDDirs {
|
||||
uploadIDPath := pathJoin(shaDir, uploadIDDir)
|
||||
fi, err := disk.ReadVersion(ctx, minioMetaMultipartBucket, uploadIDPath, "")
|
||||
fi, err := disk.ReadVersion(ctx, minioMetaMultipartBucket, uploadIDPath, "", false)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
@ -124,7 +142,7 @@ func (er erasureObjects) cleanupStaleUploadsOnDisk(ctx context.Context, disk Sto
|
|||
return
|
||||
}
|
||||
for _, tmpDir := range tmpDirs {
|
||||
fi, err := disk.ReadVersion(ctx, minioMetaTmpBucket, tmpDir, "")
|
||||
fi, err := disk.ReadVersion(ctx, minioMetaTmpBucket, tmpDir, "", false)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
@ -178,7 +196,7 @@ func (er erasureObjects) ListMultipartUploads(ctx context.Context, bucket, objec
|
|||
if populatedUploadIds.Contains(uploadID) {
|
||||
continue
|
||||
}
|
||||
fi, err := disk.ReadVersion(ctx, minioMetaMultipartBucket, pathJoin(er.getUploadIDDir(bucket, object, uploadID)), "")
|
||||
fi, err := disk.ReadVersion(ctx, minioMetaMultipartBucket, pathJoin(er.getUploadIDDir(bucket, object, uploadID)), "", false)
|
||||
if err != nil {
|
||||
return result, toObjectErr(err, bucket, object)
|
||||
}
|
||||
|
|
|
@ -355,14 +355,32 @@ func (er erasureObjects) getObjectFileInfo(ctx context.Context, bucket, object s
|
|||
disks := er.getDisks()
|
||||
|
||||
// Read metadata associated with the object from all disks.
|
||||
metaArr, errs := readAllFileInfo(ctx, disks, bucket, object, opts.VersionID)
|
||||
metaArr, errs := getAllObjectFileInfo(ctx, disks, bucket, object, opts.VersionID)
|
||||
|
||||
readQuorum, _, err := objectQuorumFromMeta(ctx, er, metaArr, errs)
|
||||
if err != nil {
|
||||
return fi, nil, nil, err
|
||||
readQuorum = len(metaArr) / 2
|
||||
}
|
||||
|
||||
if reducedErr := reduceReadQuorumErrs(ctx, errs, objectOpIgnoredErrs, readQuorum); reducedErr != nil {
|
||||
if reducedErr == errErasureReadQuorum {
|
||||
if _, ok := isObjectDangling(metaArr, errs, nil); ok {
|
||||
reducedErr = errFileNotFound
|
||||
if opts.VersionID != "" {
|
||||
reducedErr = errFileVersionNotFound
|
||||
}
|
||||
// Remove the dangling object only when:
|
||||
// - This is a non versioned bucket
|
||||
// - This is a versioned bucket and the version ID is passed, the reason
|
||||
// is that it is hard to pick that particular version that is dangling
|
||||
if !opts.Versioned || opts.VersionID != "" {
|
||||
er.deleteObjectVersion(ctx, bucket, object, 1, FileInfo{
|
||||
Name: object,
|
||||
VersionID: opts.VersionID,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
return fi, nil, nil, toObjectErr(reducedErr, bucket, object)
|
||||
}
|
||||
|
||||
|
|
|
@ -1007,6 +1007,12 @@ func (s *erasureSets) startMergeWalksVersionsN(ctx context.Context, bucket, pref
|
|||
wg.Add(1)
|
||||
go func(disk StorageAPI) {
|
||||
defer wg.Done()
|
||||
|
||||
if disk == nil {
|
||||
// disk offline, ignore it.
|
||||
return
|
||||
}
|
||||
|
||||
entryCh, err := disk.WalkVersions(GlobalContext, bucket, prefix, marker, recursive, endWalkCh)
|
||||
if err != nil {
|
||||
return
|
||||
|
@ -1037,6 +1043,11 @@ func (s *erasureSets) startMergeWalksN(ctx context.Context, bucket, prefix, mark
|
|||
go func(disk StorageAPI) {
|
||||
defer wg.Done()
|
||||
|
||||
if disk == nil {
|
||||
// disk offline, ignore it.
|
||||
return
|
||||
}
|
||||
|
||||
var entryCh chan FileInfo
|
||||
var err error
|
||||
if splunk {
|
||||
|
|
|
@ -30,15 +30,26 @@ const (
|
|||
type mergeWalkVersions struct {
|
||||
added time.Time
|
||||
entryChs []FileInfoVersionsCh
|
||||
endWalkCh chan struct{} // To signal when mergeWalk go-routine should end.
|
||||
endWalkCh *endWalkSafeCh
|
||||
endTimerCh chan<- struct{} // To signal when timer go-routine should end.
|
||||
}
|
||||
|
||||
type endWalkSafeCh struct {
|
||||
ch chan struct{} // To signal when mergeWalk go-routine should end.
|
||||
once sync.Once
|
||||
}
|
||||
|
||||
func (s *endWalkSafeCh) close() {
|
||||
s.once.Do(func() {
|
||||
close(s.ch)
|
||||
})
|
||||
}
|
||||
|
||||
// mergeWalk - represents the go routine that does the merge walk.
|
||||
type mergeWalk struct {
|
||||
added time.Time
|
||||
entryChs []FileInfoCh
|
||||
endWalkCh chan struct{} // To signal when mergeWalk go-routine should end.
|
||||
endWalkCh *endWalkSafeCh
|
||||
endTimerCh chan<- struct{} // To signal when timer go-routine should end.
|
||||
}
|
||||
|
||||
|
@ -81,7 +92,7 @@ func (t *MergeWalkVersionsPool) Release(params listParams) ([]FileInfoVersionsCh
|
|||
delete(t.pool, params)
|
||||
}
|
||||
walk.endTimerCh <- struct{}{}
|
||||
return walk.entryChs, walk.endWalkCh
|
||||
return walk.entryChs, walk.endWalkCh.ch
|
||||
}
|
||||
|
||||
// Set - similar to mergeWalkPool.Set but for file versions
|
||||
|
@ -120,7 +131,7 @@ func (t *MergeWalkVersionsPool) Set(params listParams, resultChs []FileInfoVersi
|
|||
}
|
||||
select {
|
||||
case endCh <- struct{}{}:
|
||||
close(endWalkCh)
|
||||
endWalkCh.close()
|
||||
default:
|
||||
}
|
||||
} else {
|
||||
|
@ -135,7 +146,7 @@ func (t *MergeWalkVersionsPool) Set(params listParams, resultChs []FileInfoVersi
|
|||
walkInfo := mergeWalkVersions{
|
||||
added: UTCNow(),
|
||||
entryChs: resultChs,
|
||||
endWalkCh: endWalkCh,
|
||||
endWalkCh: &endWalkSafeCh{ch: endWalkCh},
|
||||
endTimerCh: endTimerCh,
|
||||
}
|
||||
|
||||
|
@ -147,7 +158,7 @@ func (t *MergeWalkVersionsPool) Set(params listParams, resultChs []FileInfoVersi
|
|||
// We are at limit, invalidate oldest, move list down and add new as last.
|
||||
select {
|
||||
case walks[0].endTimerCh <- struct{}{}:
|
||||
close(walks[0].endWalkCh)
|
||||
walks[0].endWalkCh.close()
|
||||
default:
|
||||
}
|
||||
copy(walks, walks[1:])
|
||||
|
@ -185,7 +196,7 @@ func (t *MergeWalkVersionsPool) Set(params listParams, resultChs []FileInfoVersi
|
|||
}
|
||||
}
|
||||
// Signal the mergeWalk go-routine to die.
|
||||
close(endWalkCh)
|
||||
walkInfo.endWalkCh.close()
|
||||
case <-endTimerCh:
|
||||
return
|
||||
}
|
||||
|
@ -235,7 +246,7 @@ func (t *MergeWalkPool) Release(params listParams) ([]FileInfoCh, chan struct{})
|
|||
delete(t.pool, params)
|
||||
}
|
||||
walk.endTimerCh <- struct{}{}
|
||||
return walk.entryChs, walk.endWalkCh
|
||||
return walk.entryChs, walk.endWalkCh.ch
|
||||
}
|
||||
|
||||
// Set - adds a mergeWalk to the mergeWalkPool.
|
||||
|
@ -281,7 +292,7 @@ func (t *MergeWalkPool) Set(params listParams, resultChs []FileInfoCh, endWalkCh
|
|||
}
|
||||
select {
|
||||
case endCh <- struct{}{}:
|
||||
close(endWalkCh)
|
||||
endWalkCh.close()
|
||||
default:
|
||||
}
|
||||
} else {
|
||||
|
@ -295,7 +306,7 @@ func (t *MergeWalkPool) Set(params listParams, resultChs []FileInfoCh, endWalkCh
|
|||
walkInfo := mergeWalk{
|
||||
added: UTCNow(),
|
||||
entryChs: resultChs,
|
||||
endWalkCh: endWalkCh,
|
||||
endWalkCh: &endWalkSafeCh{ch: endWalkCh},
|
||||
endTimerCh: endTimerCh,
|
||||
}
|
||||
|
||||
|
@ -307,7 +318,7 @@ func (t *MergeWalkPool) Set(params listParams, resultChs []FileInfoCh, endWalkCh
|
|||
// We are at limit, invalidate oldest, move list down and add new as last.
|
||||
select {
|
||||
case walks[0].endTimerCh <- struct{}{}:
|
||||
close(walks[0].endWalkCh)
|
||||
walks[0].endWalkCh.close()
|
||||
default:
|
||||
}
|
||||
copy(walks, walks[1:])
|
||||
|
@ -345,7 +356,7 @@ func (t *MergeWalkPool) Set(params listParams, resultChs []FileInfoCh, endWalkCh
|
|||
}
|
||||
}
|
||||
// Signal the mergeWalk go-routine to die.
|
||||
close(endWalkCh)
|
||||
walkInfo.endWalkCh.close()
|
||||
case <-endTimerCh:
|
||||
return
|
||||
}
|
||||
|
|
|
@ -259,11 +259,11 @@ func (d *naughtyDisk) DeleteVersion(ctx context.Context, volume, path string, fi
|
|||
return d.disk.DeleteVersion(ctx, volume, path, fi)
|
||||
}
|
||||
|
||||
func (d *naughtyDisk) ReadVersion(ctx context.Context, volume, path, versionID string) (fi FileInfo, err error) {
|
||||
func (d *naughtyDisk) ReadVersion(ctx context.Context, volume, path, versionID string, checkDataDir bool) (fi FileInfo, err error) {
|
||||
if err := d.calcError(); err != nil {
|
||||
return FileInfo{}, err
|
||||
}
|
||||
return d.disk.ReadVersion(ctx, volume, path, versionID)
|
||||
return d.disk.ReadVersion(ctx, volume, path, versionID, checkDataDir)
|
||||
}
|
||||
|
||||
func (d *naughtyDisk) WriteAll(ctx context.Context, volume string, path string, reader io.Reader) (err error) {
|
||||
|
|
|
@ -59,7 +59,7 @@ type StorageAPI interface {
|
|||
DeleteVersion(ctx context.Context, volume, path string, fi FileInfo) error
|
||||
DeleteVersions(ctx context.Context, volume string, versions []FileInfo) []error
|
||||
WriteMetadata(ctx context.Context, volume, path string, fi FileInfo) error
|
||||
ReadVersion(ctx context.Context, volume, path, versionID string) (FileInfo, error)
|
||||
ReadVersion(ctx context.Context, volume, path, versionID string, checkDataDir bool) (FileInfo, error)
|
||||
RenameData(ctx context.Context, srcVolume, srcPath, dataDir, dstVolume, dstPath string) error
|
||||
|
||||
// File operations.
|
||||
|
|
|
@ -370,11 +370,12 @@ func (client *storageRESTClient) RenameData(ctx context.Context, srcVolume, srcP
|
|||
return err
|
||||
}
|
||||
|
||||
func (client *storageRESTClient) ReadVersion(ctx context.Context, volume, path, versionID string) (fi FileInfo, err error) {
|
||||
func (client *storageRESTClient) ReadVersion(ctx context.Context, volume, path, versionID string, checkDataDir bool) (fi FileInfo, err error) {
|
||||
values := make(url.Values)
|
||||
values.Set(storageRESTVolume, volume)
|
||||
values.Set(storageRESTFilePath, path)
|
||||
values.Set(storageRESTVersionID, versionID)
|
||||
values.Set(storageRESTCheckDataDir, strconv.FormatBool(checkDataDir))
|
||||
|
||||
respBody, err := client.call(ctx, storageRESTMethodReadVersion, values, nil, -1)
|
||||
if err != nil {
|
||||
|
|
|
@ -17,7 +17,7 @@
|
|||
package cmd
|
||||
|
||||
const (
|
||||
storageRESTVersion = "v20" // Re-implementation of storage layer
|
||||
storageRESTVersion = "v21" // Add checkDataDir in ReadVersion API
|
||||
storageRESTVersionPrefix = SlashSeparator + storageRESTVersion
|
||||
storageRESTPrefix = minioReservedBucketPath + "/storage"
|
||||
)
|
||||
|
@ -60,6 +60,7 @@ const (
|
|||
storageRESTDirPath = "dir-path"
|
||||
storageRESTFilePath = "file-path"
|
||||
storageRESTVersionID = "version-id"
|
||||
storageRESTCheckDataDir = "check-data-dir"
|
||||
storageRESTTotalVersions = "total-versions"
|
||||
storageRESTSrcVolume = "source-volume"
|
||||
storageRESTSrcPath = "source-path"
|
||||
|
|
|
@ -326,8 +326,13 @@ func (s *storageRESTServer) ReadVersionHandler(w http.ResponseWriter, r *http.Re
|
|||
volume := vars[storageRESTVolume]
|
||||
filePath := vars[storageRESTFilePath]
|
||||
versionID := vars[storageRESTVersionID]
|
||||
checkDataDir, err := strconv.ParseBool(vars[storageRESTCheckDataDir])
|
||||
if err != nil {
|
||||
s.writeErrorResponse(w, err)
|
||||
return
|
||||
}
|
||||
|
||||
fi, err := s.storage.ReadVersion(r.Context(), volume, filePath, versionID)
|
||||
fi, err := s.storage.ReadVersion(r.Context(), volume, filePath, versionID, checkDataDir)
|
||||
if err != nil {
|
||||
s.writeErrorResponse(w, err)
|
||||
return
|
||||
|
@ -925,7 +930,7 @@ func registerStorageRESTHandlers(router *mux.Router, endpointZones EndpointZones
|
|||
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodDeleteVersion).HandlerFunc(httpTraceHdrs(server.DeleteVersionHandler)).
|
||||
Queries(restQueries(storageRESTVolume, storageRESTFilePath)...)
|
||||
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodReadVersion).HandlerFunc(httpTraceHdrs(server.ReadVersionHandler)).
|
||||
Queries(restQueries(storageRESTVolume, storageRESTFilePath, storageRESTVersionID)...)
|
||||
Queries(restQueries(storageRESTVolume, storageRESTFilePath, storageRESTVersionID, storageRESTCheckDataDir)...)
|
||||
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodRenameData).HandlerFunc(httpTraceHdrs(server.RenameDataHandler)).
|
||||
Queries(restQueries(storageRESTSrcVolume, storageRESTSrcPath, storageRESTDataDir,
|
||||
storageRESTDstVolume, storageRESTDstPath)...)
|
||||
|
|
|
@ -288,12 +288,12 @@ func (p *xlStorageDiskIDCheck) WriteMetadata(ctx context.Context, volume, path s
|
|||
return p.storage.WriteMetadata(ctx, volume, path, fi)
|
||||
}
|
||||
|
||||
func (p *xlStorageDiskIDCheck) ReadVersion(ctx context.Context, volume, path, versionID string) (fi FileInfo, err error) {
|
||||
func (p *xlStorageDiskIDCheck) ReadVersion(ctx context.Context, volume, path, versionID string, checkDataDir bool) (fi FileInfo, err error) {
|
||||
if err = p.checkDiskStale(); err != nil {
|
||||
return fi, err
|
||||
}
|
||||
|
||||
return p.storage.ReadVersion(ctx, volume, path, versionID)
|
||||
return p.storage.ReadVersion(ctx, volume, path, versionID, checkDataDir)
|
||||
}
|
||||
|
||||
func (p *xlStorageDiskIDCheck) ReadAll(ctx context.Context, volume string, path string) (buf []byte, err error) {
|
||||
|
|
|
@ -1315,7 +1315,7 @@ func (s *xlStorage) renameLegacyMetadata(volume, path string) error {
|
|||
}
|
||||
|
||||
// ReadVersion - reads metadata and returns FileInfo at path `xl.meta`
|
||||
func (s *xlStorage) ReadVersion(ctx context.Context, volume, path, versionID string) (fi FileInfo, err error) {
|
||||
func (s *xlStorage) ReadVersion(ctx context.Context, volume, path, versionID string, checkDataDir bool) (fi FileInfo, err error) {
|
||||
buf, err := s.ReadAll(ctx, volume, pathJoin(path, xlStorageFormatFile))
|
||||
if err != nil {
|
||||
if err == errFileNotFound {
|
||||
|
@ -1338,7 +1338,24 @@ func (s *xlStorage) ReadVersion(ctx context.Context, volume, path, versionID str
|
|||
return fi, errFileNotFound
|
||||
}
|
||||
|
||||
return getFileInfo(buf, volume, path, versionID)
|
||||
fi, err = getFileInfo(buf, volume, path, versionID)
|
||||
if err != nil {
|
||||
return fi, err
|
||||
}
|
||||
|
||||
if fi.DataDir != "" && checkDataDir {
|
||||
if _, err = s.StatVol(ctx, pathJoin(volume, path, fi.DataDir, slashSeparator)); err != nil {
|
||||
if err == errVolumeNotFound {
|
||||
if versionID != "" {
|
||||
return fi, errFileVersionNotFound
|
||||
}
|
||||
return fi, errFileNotFound
|
||||
}
|
||||
return fi, err
|
||||
}
|
||||
}
|
||||
|
||||
return fi, nil
|
||||
}
|
||||
|
||||
// ReadAll reads from r until an error or EOF and returns the data it read.
|
||||
|
@ -2221,10 +2238,7 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath, dataDir,
|
|||
return err
|
||||
}
|
||||
|
||||
if err = renameAll(srcFilePath, dstFilePath); err != nil {
|
||||
return osErrToFileErr(err)
|
||||
}
|
||||
|
||||
// Commit data
|
||||
if srcDataPath != "" {
|
||||
removeAll(oldDstDataPath)
|
||||
removeAll(dstDataPath)
|
||||
|
@ -2233,6 +2247,11 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath, dataDir,
|
|||
}
|
||||
}
|
||||
|
||||
// Commit meta-file
|
||||
if err = renameAll(srcFilePath, dstFilePath); err != nil {
|
||||
return osErrToFileErr(err)
|
||||
}
|
||||
|
||||
// Remove parent dir of the source file if empty
|
||||
if parentDir := slashpath.Dir(srcFilePath); isDirEmpty(parentDir) {
|
||||
deleteFile(srcVolumeDir, parentDir, false)
|
||||
|
|
Loading…
Reference in New Issue