fix: enable xl.json to xl.meta only if legacy drive is found (#11255)

another optimization is renameLegacyMetadata() never needs
to validate bucket with os.Stat() again, leading to reduction
in one extra syscall.
This commit is contained in:
Harshavardhana 2021-01-11 02:27:04 -08:00 committed by GitHub
parent e8176fe978
commit e4e117faab
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 109 additions and 29 deletions

View File

@ -332,17 +332,14 @@ func (s *storageRESTServer) ReadVersionHandler(w http.ResponseWriter, r *http.Re
s.writeErrorResponse(w, err)
return
}
fi, err := s.storage.ReadVersion(r.Context(), volume, filePath, versionID, readData)
if err != nil {
s.writeErrorResponse(w, err)
return
}
bufp := s.storage.rpool.Get().(*[]byte)
defer s.storage.rpool.Put(bufp)
enc := msgp.NewWriterBuf(w, *bufp)
logger.LogIf(r.Context(), fi.EncodeMsg(enc))
enc.Flush()
logger.LogIf(r.Context(), msgp.Encode(w, &fi))
}
// WriteMetadata write new updated metadata.

BIN
cmd/testdata/xl.meta vendored Normal file

Binary file not shown.

View File

@ -105,6 +105,7 @@ type xlStorage struct {
diskID string
formatFileInfo os.FileInfo
formatLegacy bool
formatLastCheck time.Time
diskInfoCache timedValue
@ -496,13 +497,13 @@ func (s *xlStorage) GetDiskID() (string, error) {
}
s.Lock()
defer s.Unlock()
// If somebody else updated the disk ID and changed the time, return what they got.
if !lastCheck.IsZero() && !s.formatLastCheck.Equal(lastCheck) && diskID != "" {
s.Unlock()
// Somebody else got the lock first.
return diskID, nil
}
s.Unlock()
formatFile := pathJoin(s.diskPath, minioMetaBucket, formatConfigFile)
fi, err := os.Stat(formatFile)
@ -529,8 +530,10 @@ func (s *xlStorage) GetDiskID() (string, error) {
}
if xioutil.SameFile(fi, fileInfo) && diskID != "" {
s.Lock()
// If the file has not changed, just return the cached diskID information.
s.formatLastCheck = time.Now()
s.Unlock()
return diskID, nil
}
@ -564,7 +567,10 @@ func (s *xlStorage) GetDiskID() (string, error) {
return "", errCorruptedFormat
}
s.Lock()
defer s.Unlock()
s.diskID = format.Erasure.This
s.formatLegacy = format.Erasure.DistributionAlgo == formatErasureVersionV2DistributionAlgoLegacy
s.formatFileInfo = fi
s.formatLastCheck = time.Now()
return s.diskID, nil
@ -754,7 +760,7 @@ func (s *xlStorage) isLeaf(volume string, leafPath string) bool {
// legacy `xl.json`, in such situation we just rename
// and proceed if rename is successful we know that it
// is the leaf since `xl.json` was present.
return s.renameLegacyMetadata(volume, leafPath) == nil
return s.renameLegacyMetadata(volumeDir, leafPath) == nil
}
return false
}
@ -1021,30 +1027,21 @@ func (s *xlStorage) WriteMetadata(ctx context.Context, volume, path string, fi F
return s.WriteAll(ctx, volume, pathJoin(path, xlStorageFormatFile), buf)
}
func (s *xlStorage) renameLegacyMetadata(volume, path string) error {
func (s *xlStorage) renameLegacyMetadata(volumeDir, path string) (err error) {
s.RLock()
legacy := s.formatLegacy
s.RUnlock()
if !legacy {
// if its not a legacy backend then this function is
// a no-op always returns errFileNotFound
return errFileNotFound
}
atomic.AddInt32(&s.activeIOCount, 1)
defer func() {
atomic.AddInt32(&s.activeIOCount, -1)
}()
volumeDir, err := s.getVolDir(volume)
if err != nil {
return err
}
//gi Stat a volume entry.
_, err = os.Stat(volumeDir)
if err != nil {
if osIsNotExist(err) {
return errVolumeNotFound
} else if isSysErrIO(err) {
return errFaultyDisk
} else if isSysErrTooManyFiles(err) {
return errTooManyOpenFiles
}
return err
}
// Validate file path length, before reading.
filePath := pathJoin(volumeDir, path)
if err = checkPathLength(filePath); err != nil {
@ -1097,7 +1094,7 @@ func (s *xlStorage) ReadVersion(ctx context.Context, volume, path, versionID str
buf, err := s.ReadAll(ctx, volume, pathJoin(path, xlStorageFormatFile))
if err != nil {
if err == errFileNotFound {
if err = s.renameLegacyMetadata(volume, path); err != nil {
if err = s.renameLegacyMetadata(volumeDir, path); err != nil {
if err == errFileNotFound {
if versionID != "" {
return fi, errFileVersionNotFound
@ -2009,7 +2006,8 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath, dataDir,
if !osIsNotExist(err) {
return osErrToFileErr(err)
}
err = s.renameLegacyMetadata(dstVolume, dstPath)
// errFileNotFound comes here.
err = s.renameLegacyMetadata(dstVolumeDir, dstPath)
if err != nil && err != errFileNotFound {
return err
}

View File

@ -253,6 +253,91 @@ func TestXLStorageIsDirEmpty(t *testing.T) {
}
}
// TestXLStorageReadVersion - TestXLStorages the functionality implemented by xlStorage ReadVersion storage API.
func TestXLStorageReadVersion(t *testing.T) {
// create xlStorage test setup
xlStorage, path, err := newXLStorageTestSetup()
if err != nil {
t.Fatalf("Unable to create xlStorage test setup, %s", err)
}
defer os.RemoveAll(path)
xlMeta, _ := ioutil.ReadFile("testdata/xl.meta")
// Create files for the test cases.
if err = xlStorage.MakeVol(context.Background(), "exists"); err != nil {
t.Fatalf("Unable to create a volume \"exists\", %s", err)
}
if err = xlStorage.AppendFile(context.Background(), "exists", "as-directory/as-file/xl.meta", xlMeta); err != nil {
t.Fatalf("Unable to create a file \"as-directory/as-file\", %s", err)
}
if err = xlStorage.AppendFile(context.Background(), "exists", "as-file/xl.meta", xlMeta); err != nil {
t.Fatalf("Unable to create a file \"as-file\", %s", err)
}
if err = xlStorage.AppendFile(context.Background(), "exists", "as-file-parent/xl.meta", xlMeta); err != nil {
t.Fatalf("Unable to create a file \"as-file-parent\", %s", err)
}
// TestXLStoragecases to validate different conditions for ReadVersion API.
testCases := []struct {
volume string
path string
err error
}{
// TestXLStorage case - 1.
// Validate volume does not exist.
{
volume: "i-dont-exist",
path: "",
err: errVolumeNotFound,
},
// TestXLStorage case - 2.
// Validate bad condition file does not exist.
{
volume: "exists",
path: "as-file-not-found",
err: errFileNotFound,
},
// TestXLStorage case - 3.
// Validate bad condition file exists as prefix/directory and
// we are attempting to read it.
{
volume: "exists",
path: "as-directory",
err: errFileNotFound,
},
// TestXLStorage case - 4.
{
volume: "exists",
path: "as-file-parent/as-file",
err: errFileNotFound,
},
// TestXLStorage case - 5.
// Validate the good condition file exists and we are able to read it.
{
volume: "exists",
path: "as-file",
err: nil,
},
// TestXLStorage case - 6.
// TestXLStorage case with invalid volume name.
{
volume: "ab",
path: "as-file",
err: errVolumeNotFound,
},
}
// Run through all the test cases and validate for ReadVersion.
for i, testCase := range testCases {
_, err = xlStorage.ReadVersion(context.Background(), testCase.volume, testCase.path, "", false)
if err != testCase.err {
t.Fatalf("TestXLStorage %d: Expected err \"%s\", got err \"%s\"", i+1, testCase.err, err)
}
}
}
// TestXLStorageReadAll - TestXLStorages the functionality implemented by xlStorage ReadAll storage API.
func TestXLStorageReadAll(t *testing.T) {
// create xlStorage test setup