Compare commits

...

7 Commits

Author SHA1 Message Date
Harshavardhana c8fef1d994 fix crash if disk is offline in startMergeWalkVersionsN() 2021-06-23 13:24:01 -07:00
Anis Elleuch 1aa5a642b8 make: Add hotfix target to generate hotfix binaries (#11053)
hotfix target will fetch the release tag prior to the latest commit and create a binary
with the same release tag plus '.hotfix' suffix

e.g.   RELEASE.2020-12-03T05-49-24Z.hotfix
2020-12-14 10:58:12 -08:00
Harshavardhana 11236d1009 fix a possible panic on close 2020-12-14 10:52:31 -08:00
Harshavardhana 4d2f1df591 fix: crash if disk is offline in startMergeWalks() 2020-12-14 10:09:50 -08:00
Harshavardhana eb5a5d1238 erasure: delete dangling objects automatically 2020-11-01 18:41:57 -08:00
Anis Elleuch 65f1ad03fa erasure: Commit data before xl.meta in RenameData() (#10734)
This will reduce the chance to have updated xl.meta without data.
2020-11-01 18:35:16 -08:00
Anis Elleuch 6046014253 fix: Get/HeadObject return 404 on non quorum objects (#10753) 2020-11-01 18:35:04 -08:00
15 changed files with 157 additions and 39 deletions

View File

@ -71,6 +71,10 @@ build: checks
@echo "Building minio binary to './minio'"
@GO111MODULE=on CGO_ENABLED=0 go build -tags kqueue -trimpath --ldflags "$(LDFLAGS)" -o $(PWD)/minio 1>/dev/null
hotfix: LDFLAGS := $(shell MINIO_RELEASE="RELEASE" MINIO_HOTFIX="hotfix" go run buildscripts/gen-ldflags.go $(shell git describe --tags --abbrev=0 | \
sed 's#RELEASE\.\([0-9]\+\)-\([0-9]\+\)-\([0-9]\+\)T\([0-9]\+\)-\([0-9]\+\)-\([0-9]\+\)Z#\1-\2-\3T\4:\5:\6Z#'))
hotfix: install
docker: checks
@echo "Building minio docker image '$(TAG)'"
@GOOS=linux GO111MODULE=on CGO_ENABLED=0 go build -tags kqueue -trimpath --ldflags "$(LDFLAGS)" -o $(PWD)/minio 1>/dev/null

View File

@ -44,10 +44,21 @@ func releaseTag(version string) string {
relPrefix = prefix
}
relSuffix := ""
if hotfix := os.Getenv("MINIO_HOTFIX"); hotfix != "" {
relSuffix = hotfix
}
relTag := strings.Replace(version, " ", "-", -1)
relTag = strings.Replace(relTag, ":", "-", -1)
relTag = strings.Replace(relTag, ",", "", -1)
return relPrefix + "." + relTag
relTag = relPrefix + "." + relTag
if relSuffix != "" {
relTag += "." + relSuffix
}
return relTag
}
// commitID returns the abbreviated commit-id hash of the last commit.
@ -68,5 +79,12 @@ func commitID() string {
}
func main() {
fmt.Println(genLDFlags(time.Now().UTC().Format(time.RFC3339)))
var version string
if len(os.Args) > 1 {
version = os.Args[1]
} else {
version = time.Now().UTC().Format(time.RFC3339)
}
fmt.Println(genLDFlags(version))
}

View File

@ -67,7 +67,7 @@ func TestHealing(t *testing.T) {
}
disk := er.getDisks()[0]
fileInfoPreHeal, err := disk.ReadVersion(context.Background(), bucket, object, "")
fileInfoPreHeal, err := disk.ReadVersion(context.Background(), bucket, object, "", false)
if err != nil {
t.Fatal(err)
}
@ -84,7 +84,7 @@ func TestHealing(t *testing.T) {
t.Fatal(err)
}
fileInfoPostHeal, err := disk.ReadVersion(context.Background(), bucket, object, "")
fileInfoPostHeal, err := disk.ReadVersion(context.Background(), bucket, object, "", false)
if err != nil {
t.Fatal(err)
}
@ -113,7 +113,7 @@ func TestHealing(t *testing.T) {
t.Fatal(err)
}
fileInfoPostHeal, err = disk.ReadVersion(context.Background(), bucket, object, "")
fileInfoPostHeal, err = disk.ReadVersion(context.Background(), bucket, object, "", false)
if err != nil {
t.Fatal(err)
}

View File

@ -113,9 +113,21 @@ func hashOrder(key string, cardinality int) []int {
return nums
}
// Reads all `xl.meta` metadata as a FileInfo slice and checks if the data dir exists as well,
// otherwise returns errFileNotFound (or errFileVersionNotFound)
func getAllObjectFileInfo(ctx context.Context, disks []StorageAPI, bucket, object, versionID string) ([]FileInfo, []error) {
return readVersionFromDisks(ctx, disks, bucket, object, versionID, true)
}
// Reads all `xl.meta` metadata as a FileInfo slice.
// Returns error slice indicating the failed metadata reads.
func readAllFileInfo(ctx context.Context, disks []StorageAPI, bucket, object, versionID string) ([]FileInfo, []error) {
return readVersionFromDisks(ctx, disks, bucket, object, versionID, false)
}
// Reads all `xl.meta` metadata as a FileInfo slice and checks if the data dir
// exists as well, if checkDataDir is set to true.
func readVersionFromDisks(ctx context.Context, disks []StorageAPI, bucket, object, versionID string, checkDataDir bool) ([]FileInfo, []error) {
metadataArray := make([]FileInfo, len(disks))
g := errgroup.WithNErrs(len(disks))
@ -126,7 +138,7 @@ func readAllFileInfo(ctx context.Context, disks []StorageAPI, bucket, object, ve
if disks[index] == nil {
return errDiskNotFound
}
metadataArray[index], err = disks[index].ReadVersion(ctx, bucket, object, versionID)
metadataArray[index], err = disks[index].ReadVersion(ctx, bucket, object, versionID, checkDataDir)
if err != nil {
if err != errFileNotFound && err != errVolumeNotFound && err != errFileVersionNotFound {
logger.GetReqInfo(ctx).AppendTags("disk", disks[index].String())

View File

@ -43,7 +43,25 @@ func (er erasureObjects) getMultipartSHADir(bucket, object string) string {
// checkUploadIDExists - verify if a given uploadID exists and is valid.
func (er erasureObjects) checkUploadIDExists(ctx context.Context, bucket, object, uploadID string) error {
_, _, _, err := er.getObjectFileInfo(ctx, minioMetaMultipartBucket, er.getUploadIDDir(bucket, object, uploadID), ObjectOptions{})
disks := er.getDisks()
// Read metadata associated with the object from all disks.
metaArr, errs := readAllFileInfo(ctx, disks, minioMetaMultipartBucket, er.getUploadIDDir(bucket, object, uploadID), "")
readQuorum, _, err := objectQuorumFromMeta(ctx, er, metaArr, errs)
if err != nil {
return err
}
if reducedErr := reduceReadQuorumErrs(ctx, errs, objectOpIgnoredErrs, readQuorum); reducedErr != nil {
return toObjectErr(reducedErr, bucket, object)
}
// List all online disks.
_, modTime := listOnlineDisks(disks, metaArr, errs)
// Pick latest valid metadata.
_, err = pickValidFileInfo(ctx, metaArr, modTime, readQuorum)
return err
}
@ -110,7 +128,7 @@ func (er erasureObjects) cleanupStaleUploadsOnDisk(ctx context.Context, disk Sto
}
for _, uploadIDDir := range uploadIDDirs {
uploadIDPath := pathJoin(shaDir, uploadIDDir)
fi, err := disk.ReadVersion(ctx, minioMetaMultipartBucket, uploadIDPath, "")
fi, err := disk.ReadVersion(ctx, minioMetaMultipartBucket, uploadIDPath, "", false)
if err != nil {
continue
}
@ -124,7 +142,7 @@ func (er erasureObjects) cleanupStaleUploadsOnDisk(ctx context.Context, disk Sto
return
}
for _, tmpDir := range tmpDirs {
fi, err := disk.ReadVersion(ctx, minioMetaTmpBucket, tmpDir, "")
fi, err := disk.ReadVersion(ctx, minioMetaTmpBucket, tmpDir, "", false)
if err != nil {
continue
}
@ -178,7 +196,7 @@ func (er erasureObjects) ListMultipartUploads(ctx context.Context, bucket, objec
if populatedUploadIds.Contains(uploadID) {
continue
}
fi, err := disk.ReadVersion(ctx, minioMetaMultipartBucket, pathJoin(er.getUploadIDDir(bucket, object, uploadID)), "")
fi, err := disk.ReadVersion(ctx, minioMetaMultipartBucket, pathJoin(er.getUploadIDDir(bucket, object, uploadID)), "", false)
if err != nil {
return result, toObjectErr(err, bucket, object)
}

View File

@ -355,14 +355,32 @@ func (er erasureObjects) getObjectFileInfo(ctx context.Context, bucket, object s
disks := er.getDisks()
// Read metadata associated with the object from all disks.
metaArr, errs := readAllFileInfo(ctx, disks, bucket, object, opts.VersionID)
metaArr, errs := getAllObjectFileInfo(ctx, disks, bucket, object, opts.VersionID)
readQuorum, _, err := objectQuorumFromMeta(ctx, er, metaArr, errs)
if err != nil {
return fi, nil, nil, err
readQuorum = len(metaArr) / 2
}
if reducedErr := reduceReadQuorumErrs(ctx, errs, objectOpIgnoredErrs, readQuorum); reducedErr != nil {
if reducedErr == errErasureReadQuorum {
if _, ok := isObjectDangling(metaArr, errs, nil); ok {
reducedErr = errFileNotFound
if opts.VersionID != "" {
reducedErr = errFileVersionNotFound
}
// Remove the dangling object only when:
// - This is a non versioned bucket
// - This is a versioned bucket and the version ID is passed, the reason
// is that it is hard to pick that particular version that is dangling
if !opts.Versioned || opts.VersionID != "" {
er.deleteObjectVersion(ctx, bucket, object, 1, FileInfo{
Name: object,
VersionID: opts.VersionID,
})
}
}
}
return fi, nil, nil, toObjectErr(reducedErr, bucket, object)
}

View File

@ -1007,6 +1007,12 @@ func (s *erasureSets) startMergeWalksVersionsN(ctx context.Context, bucket, pref
wg.Add(1)
go func(disk StorageAPI) {
defer wg.Done()
if disk == nil {
// disk offline, ignore it.
return
}
entryCh, err := disk.WalkVersions(GlobalContext, bucket, prefix, marker, recursive, endWalkCh)
if err != nil {
return
@ -1037,6 +1043,11 @@ func (s *erasureSets) startMergeWalksN(ctx context.Context, bucket, prefix, mark
go func(disk StorageAPI) {
defer wg.Done()
if disk == nil {
// disk offline, ignore it.
return
}
var entryCh chan FileInfo
var err error
if splunk {

View File

@ -30,15 +30,26 @@ const (
type mergeWalkVersions struct {
added time.Time
entryChs []FileInfoVersionsCh
endWalkCh chan struct{} // To signal when mergeWalk go-routine should end.
endWalkCh *endWalkSafeCh
endTimerCh chan<- struct{} // To signal when timer go-routine should end.
}
type endWalkSafeCh struct {
ch chan struct{} // To signal when mergeWalk go-routine should end.
once sync.Once
}
func (s *endWalkSafeCh) close() {
s.once.Do(func() {
close(s.ch)
})
}
// mergeWalk - represents the go routine that does the merge walk.
type mergeWalk struct {
added time.Time
entryChs []FileInfoCh
endWalkCh chan struct{} // To signal when mergeWalk go-routine should end.
endWalkCh *endWalkSafeCh
endTimerCh chan<- struct{} // To signal when timer go-routine should end.
}
@ -81,7 +92,7 @@ func (t *MergeWalkVersionsPool) Release(params listParams) ([]FileInfoVersionsCh
delete(t.pool, params)
}
walk.endTimerCh <- struct{}{}
return walk.entryChs, walk.endWalkCh
return walk.entryChs, walk.endWalkCh.ch
}
// Set - similar to mergeWalkPool.Set but for file versions
@ -120,7 +131,7 @@ func (t *MergeWalkVersionsPool) Set(params listParams, resultChs []FileInfoVersi
}
select {
case endCh <- struct{}{}:
close(endWalkCh)
endWalkCh.close()
default:
}
} else {
@ -135,7 +146,7 @@ func (t *MergeWalkVersionsPool) Set(params listParams, resultChs []FileInfoVersi
walkInfo := mergeWalkVersions{
added: UTCNow(),
entryChs: resultChs,
endWalkCh: endWalkCh,
endWalkCh: &endWalkSafeCh{ch: endWalkCh},
endTimerCh: endTimerCh,
}
@ -147,7 +158,7 @@ func (t *MergeWalkVersionsPool) Set(params listParams, resultChs []FileInfoVersi
// We are at limit, invalidate oldest, move list down and add new as last.
select {
case walks[0].endTimerCh <- struct{}{}:
close(walks[0].endWalkCh)
walks[0].endWalkCh.close()
default:
}
copy(walks, walks[1:])
@ -185,7 +196,7 @@ func (t *MergeWalkVersionsPool) Set(params listParams, resultChs []FileInfoVersi
}
}
// Signal the mergeWalk go-routine to die.
close(endWalkCh)
walkInfo.endWalkCh.close()
case <-endTimerCh:
return
}
@ -235,7 +246,7 @@ func (t *MergeWalkPool) Release(params listParams) ([]FileInfoCh, chan struct{})
delete(t.pool, params)
}
walk.endTimerCh <- struct{}{}
return walk.entryChs, walk.endWalkCh
return walk.entryChs, walk.endWalkCh.ch
}
// Set - adds a mergeWalk to the mergeWalkPool.
@ -281,7 +292,7 @@ func (t *MergeWalkPool) Set(params listParams, resultChs []FileInfoCh, endWalkCh
}
select {
case endCh <- struct{}{}:
close(endWalkCh)
endWalkCh.close()
default:
}
} else {
@ -295,7 +306,7 @@ func (t *MergeWalkPool) Set(params listParams, resultChs []FileInfoCh, endWalkCh
walkInfo := mergeWalk{
added: UTCNow(),
entryChs: resultChs,
endWalkCh: endWalkCh,
endWalkCh: &endWalkSafeCh{ch: endWalkCh},
endTimerCh: endTimerCh,
}
@ -307,7 +318,7 @@ func (t *MergeWalkPool) Set(params listParams, resultChs []FileInfoCh, endWalkCh
// We are at limit, invalidate oldest, move list down and add new as last.
select {
case walks[0].endTimerCh <- struct{}{}:
close(walks[0].endWalkCh)
walks[0].endWalkCh.close()
default:
}
copy(walks, walks[1:])
@ -345,7 +356,7 @@ func (t *MergeWalkPool) Set(params listParams, resultChs []FileInfoCh, endWalkCh
}
}
// Signal the mergeWalk go-routine to die.
close(endWalkCh)
walkInfo.endWalkCh.close()
case <-endTimerCh:
return
}

View File

@ -259,11 +259,11 @@ func (d *naughtyDisk) DeleteVersion(ctx context.Context, volume, path string, fi
return d.disk.DeleteVersion(ctx, volume, path, fi)
}
func (d *naughtyDisk) ReadVersion(ctx context.Context, volume, path, versionID string) (fi FileInfo, err error) {
func (d *naughtyDisk) ReadVersion(ctx context.Context, volume, path, versionID string, checkDataDir bool) (fi FileInfo, err error) {
if err := d.calcError(); err != nil {
return FileInfo{}, err
}
return d.disk.ReadVersion(ctx, volume, path, versionID)
return d.disk.ReadVersion(ctx, volume, path, versionID, checkDataDir)
}
func (d *naughtyDisk) WriteAll(ctx context.Context, volume string, path string, reader io.Reader) (err error) {

View File

@ -59,7 +59,7 @@ type StorageAPI interface {
DeleteVersion(ctx context.Context, volume, path string, fi FileInfo) error
DeleteVersions(ctx context.Context, volume string, versions []FileInfo) []error
WriteMetadata(ctx context.Context, volume, path string, fi FileInfo) error
ReadVersion(ctx context.Context, volume, path, versionID string) (FileInfo, error)
ReadVersion(ctx context.Context, volume, path, versionID string, checkDataDir bool) (FileInfo, error)
RenameData(ctx context.Context, srcVolume, srcPath, dataDir, dstVolume, dstPath string) error
// File operations.

View File

@ -370,11 +370,12 @@ func (client *storageRESTClient) RenameData(ctx context.Context, srcVolume, srcP
return err
}
func (client *storageRESTClient) ReadVersion(ctx context.Context, volume, path, versionID string) (fi FileInfo, err error) {
func (client *storageRESTClient) ReadVersion(ctx context.Context, volume, path, versionID string, checkDataDir bool) (fi FileInfo, err error) {
values := make(url.Values)
values.Set(storageRESTVolume, volume)
values.Set(storageRESTFilePath, path)
values.Set(storageRESTVersionID, versionID)
values.Set(storageRESTCheckDataDir, strconv.FormatBool(checkDataDir))
respBody, err := client.call(ctx, storageRESTMethodReadVersion, values, nil, -1)
if err != nil {

View File

@ -17,7 +17,7 @@
package cmd
const (
storageRESTVersion = "v20" // Re-implementation of storage layer
storageRESTVersion = "v21" // Add checkDataDir in ReadVersion API
storageRESTVersionPrefix = SlashSeparator + storageRESTVersion
storageRESTPrefix = minioReservedBucketPath + "/storage"
)
@ -60,6 +60,7 @@ const (
storageRESTDirPath = "dir-path"
storageRESTFilePath = "file-path"
storageRESTVersionID = "version-id"
storageRESTCheckDataDir = "check-data-dir"
storageRESTTotalVersions = "total-versions"
storageRESTSrcVolume = "source-volume"
storageRESTSrcPath = "source-path"

View File

@ -326,8 +326,13 @@ func (s *storageRESTServer) ReadVersionHandler(w http.ResponseWriter, r *http.Re
volume := vars[storageRESTVolume]
filePath := vars[storageRESTFilePath]
versionID := vars[storageRESTVersionID]
checkDataDir, err := strconv.ParseBool(vars[storageRESTCheckDataDir])
if err != nil {
s.writeErrorResponse(w, err)
return
}
fi, err := s.storage.ReadVersion(r.Context(), volume, filePath, versionID)
fi, err := s.storage.ReadVersion(r.Context(), volume, filePath, versionID, checkDataDir)
if err != nil {
s.writeErrorResponse(w, err)
return
@ -925,7 +930,7 @@ func registerStorageRESTHandlers(router *mux.Router, endpointZones EndpointZones
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodDeleteVersion).HandlerFunc(httpTraceHdrs(server.DeleteVersionHandler)).
Queries(restQueries(storageRESTVolume, storageRESTFilePath)...)
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodReadVersion).HandlerFunc(httpTraceHdrs(server.ReadVersionHandler)).
Queries(restQueries(storageRESTVolume, storageRESTFilePath, storageRESTVersionID)...)
Queries(restQueries(storageRESTVolume, storageRESTFilePath, storageRESTVersionID, storageRESTCheckDataDir)...)
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodRenameData).HandlerFunc(httpTraceHdrs(server.RenameDataHandler)).
Queries(restQueries(storageRESTSrcVolume, storageRESTSrcPath, storageRESTDataDir,
storageRESTDstVolume, storageRESTDstPath)...)

View File

@ -288,12 +288,12 @@ func (p *xlStorageDiskIDCheck) WriteMetadata(ctx context.Context, volume, path s
return p.storage.WriteMetadata(ctx, volume, path, fi)
}
func (p *xlStorageDiskIDCheck) ReadVersion(ctx context.Context, volume, path, versionID string) (fi FileInfo, err error) {
func (p *xlStorageDiskIDCheck) ReadVersion(ctx context.Context, volume, path, versionID string, checkDataDir bool) (fi FileInfo, err error) {
if err = p.checkDiskStale(); err != nil {
return fi, err
}
return p.storage.ReadVersion(ctx, volume, path, versionID)
return p.storage.ReadVersion(ctx, volume, path, versionID, checkDataDir)
}
func (p *xlStorageDiskIDCheck) ReadAll(ctx context.Context, volume string, path string) (buf []byte, err error) {

View File

@ -1315,7 +1315,7 @@ func (s *xlStorage) renameLegacyMetadata(volume, path string) error {
}
// ReadVersion - reads metadata and returns FileInfo at path `xl.meta`
func (s *xlStorage) ReadVersion(ctx context.Context, volume, path, versionID string) (fi FileInfo, err error) {
func (s *xlStorage) ReadVersion(ctx context.Context, volume, path, versionID string, checkDataDir bool) (fi FileInfo, err error) {
buf, err := s.ReadAll(ctx, volume, pathJoin(path, xlStorageFormatFile))
if err != nil {
if err == errFileNotFound {
@ -1338,7 +1338,24 @@ func (s *xlStorage) ReadVersion(ctx context.Context, volume, path, versionID str
return fi, errFileNotFound
}
return getFileInfo(buf, volume, path, versionID)
fi, err = getFileInfo(buf, volume, path, versionID)
if err != nil {
return fi, err
}
if fi.DataDir != "" && checkDataDir {
if _, err = s.StatVol(ctx, pathJoin(volume, path, fi.DataDir, slashSeparator)); err != nil {
if err == errVolumeNotFound {
if versionID != "" {
return fi, errFileVersionNotFound
}
return fi, errFileNotFound
}
return fi, err
}
}
return fi, nil
}
// ReadAll reads from r until an error or EOF and returns the data it read.
@ -2221,10 +2238,7 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath, dataDir,
return err
}
if err = renameAll(srcFilePath, dstFilePath); err != nil {
return osErrToFileErr(err)
}
// Commit data
if srcDataPath != "" {
removeAll(oldDstDataPath)
removeAll(dstDataPath)
@ -2233,6 +2247,11 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath, dataDir,
}
}
// Commit meta-file
if err = renameAll(srcFilePath, dstFilePath); err != nil {
return osErrToFileErr(err)
}
// Remove parent dir of the source file if empty
if parentDir := slashpath.Dir(srcFilePath); isDirEmpty(parentDir) {
deleteFile(srcVolumeDir, parentDir, false)