diff --git a/cmd/storage-rest-server.go b/cmd/storage-rest-server.go index 43dbbac1d..d56714600 100644 --- a/cmd/storage-rest-server.go +++ b/cmd/storage-rest-server.go @@ -332,17 +332,14 @@ func (s *storageRESTServer) ReadVersionHandler(w http.ResponseWriter, r *http.Re s.writeErrorResponse(w, err) return } + fi, err := s.storage.ReadVersion(r.Context(), volume, filePath, versionID, readData) if err != nil { s.writeErrorResponse(w, err) return } - bufp := s.storage.rpool.Get().(*[]byte) - defer s.storage.rpool.Put(bufp) - enc := msgp.NewWriterBuf(w, *bufp) - logger.LogIf(r.Context(), fi.EncodeMsg(enc)) - enc.Flush() + logger.LogIf(r.Context(), msgp.Encode(w, &fi)) } // WriteMetadata write new updated metadata. diff --git a/cmd/testdata/xl.meta b/cmd/testdata/xl.meta new file mode 100644 index 000000000..694e5a2aa Binary files /dev/null and b/cmd/testdata/xl.meta differ diff --git a/cmd/xl-storage.go b/cmd/xl-storage.go index a43e663da..43ee468d9 100644 --- a/cmd/xl-storage.go +++ b/cmd/xl-storage.go @@ -105,6 +105,7 @@ type xlStorage struct { diskID string formatFileInfo os.FileInfo + formatLegacy bool formatLastCheck time.Time diskInfoCache timedValue @@ -496,13 +497,13 @@ func (s *xlStorage) GetDiskID() (string, error) { } s.Lock() - defer s.Unlock() - // If somebody else updated the disk ID and changed the time, return what they got. if !lastCheck.IsZero() && !s.formatLastCheck.Equal(lastCheck) && diskID != "" { + s.Unlock() // Somebody else got the lock first. return diskID, nil } + s.Unlock() formatFile := pathJoin(s.diskPath, minioMetaBucket, formatConfigFile) fi, err := os.Stat(formatFile) @@ -529,8 +530,10 @@ func (s *xlStorage) GetDiskID() (string, error) { } if xioutil.SameFile(fi, fileInfo) && diskID != "" { + s.Lock() // If the file has not changed, just return the cached diskID information. s.formatLastCheck = time.Now() + s.Unlock() return diskID, nil } @@ -564,7 +567,10 @@ func (s *xlStorage) GetDiskID() (string, error) { return "", errCorruptedFormat } + s.Lock() + defer s.Unlock() s.diskID = format.Erasure.This + s.formatLegacy = format.Erasure.DistributionAlgo == formatErasureVersionV2DistributionAlgoLegacy s.formatFileInfo = fi s.formatLastCheck = time.Now() return s.diskID, nil @@ -754,7 +760,7 @@ func (s *xlStorage) isLeaf(volume string, leafPath string) bool { // legacy `xl.json`, in such situation we just rename // and proceed if rename is successful we know that it // is the leaf since `xl.json` was present. - return s.renameLegacyMetadata(volume, leafPath) == nil + return s.renameLegacyMetadata(volumeDir, leafPath) == nil } return false } @@ -1021,30 +1027,21 @@ func (s *xlStorage) WriteMetadata(ctx context.Context, volume, path string, fi F return s.WriteAll(ctx, volume, pathJoin(path, xlStorageFormatFile), buf) } -func (s *xlStorage) renameLegacyMetadata(volume, path string) error { +func (s *xlStorage) renameLegacyMetadata(volumeDir, path string) (err error) { + s.RLock() + legacy := s.formatLegacy + s.RUnlock() + if !legacy { + // if its not a legacy backend then this function is + // a no-op always returns errFileNotFound + return errFileNotFound + } + atomic.AddInt32(&s.activeIOCount, 1) defer func() { atomic.AddInt32(&s.activeIOCount, -1) }() - volumeDir, err := s.getVolDir(volume) - if err != nil { - return err - } - - //gi Stat a volume entry. - _, err = os.Stat(volumeDir) - if err != nil { - if osIsNotExist(err) { - return errVolumeNotFound - } else if isSysErrIO(err) { - return errFaultyDisk - } else if isSysErrTooManyFiles(err) { - return errTooManyOpenFiles - } - return err - } - // Validate file path length, before reading. filePath := pathJoin(volumeDir, path) if err = checkPathLength(filePath); err != nil { @@ -1097,7 +1094,7 @@ func (s *xlStorage) ReadVersion(ctx context.Context, volume, path, versionID str buf, err := s.ReadAll(ctx, volume, pathJoin(path, xlStorageFormatFile)) if err != nil { if err == errFileNotFound { - if err = s.renameLegacyMetadata(volume, path); err != nil { + if err = s.renameLegacyMetadata(volumeDir, path); err != nil { if err == errFileNotFound { if versionID != "" { return fi, errFileVersionNotFound @@ -2009,7 +2006,8 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath, dataDir, if !osIsNotExist(err) { return osErrToFileErr(err) } - err = s.renameLegacyMetadata(dstVolume, dstPath) + // errFileNotFound comes here. + err = s.renameLegacyMetadata(dstVolumeDir, dstPath) if err != nil && err != errFileNotFound { return err } diff --git a/cmd/xl-storage_test.go b/cmd/xl-storage_test.go index 108d7c8fb..98cc5f6da 100644 --- a/cmd/xl-storage_test.go +++ b/cmd/xl-storage_test.go @@ -253,6 +253,91 @@ func TestXLStorageIsDirEmpty(t *testing.T) { } } +// TestXLStorageReadVersion - TestXLStorages the functionality implemented by xlStorage ReadVersion storage API. +func TestXLStorageReadVersion(t *testing.T) { + // create xlStorage test setup + xlStorage, path, err := newXLStorageTestSetup() + if err != nil { + t.Fatalf("Unable to create xlStorage test setup, %s", err) + } + + defer os.RemoveAll(path) + + xlMeta, _ := ioutil.ReadFile("testdata/xl.meta") + + // Create files for the test cases. + if err = xlStorage.MakeVol(context.Background(), "exists"); err != nil { + t.Fatalf("Unable to create a volume \"exists\", %s", err) + } + if err = xlStorage.AppendFile(context.Background(), "exists", "as-directory/as-file/xl.meta", xlMeta); err != nil { + t.Fatalf("Unable to create a file \"as-directory/as-file\", %s", err) + } + if err = xlStorage.AppendFile(context.Background(), "exists", "as-file/xl.meta", xlMeta); err != nil { + t.Fatalf("Unable to create a file \"as-file\", %s", err) + } + if err = xlStorage.AppendFile(context.Background(), "exists", "as-file-parent/xl.meta", xlMeta); err != nil { + t.Fatalf("Unable to create a file \"as-file-parent\", %s", err) + } + + // TestXLStoragecases to validate different conditions for ReadVersion API. + testCases := []struct { + volume string + path string + err error + }{ + // TestXLStorage case - 1. + // Validate volume does not exist. + { + volume: "i-dont-exist", + path: "", + err: errVolumeNotFound, + }, + // TestXLStorage case - 2. + // Validate bad condition file does not exist. + { + volume: "exists", + path: "as-file-not-found", + err: errFileNotFound, + }, + // TestXLStorage case - 3. + // Validate bad condition file exists as prefix/directory and + // we are attempting to read it. + { + volume: "exists", + path: "as-directory", + err: errFileNotFound, + }, + // TestXLStorage case - 4. + { + volume: "exists", + path: "as-file-parent/as-file", + err: errFileNotFound, + }, + // TestXLStorage case - 5. + // Validate the good condition file exists and we are able to read it. + { + volume: "exists", + path: "as-file", + err: nil, + }, + // TestXLStorage case - 6. + // TestXLStorage case with invalid volume name. + { + volume: "ab", + path: "as-file", + err: errVolumeNotFound, + }, + } + + // Run through all the test cases and validate for ReadVersion. + for i, testCase := range testCases { + _, err = xlStorage.ReadVersion(context.Background(), testCase.volume, testCase.path, "", false) + if err != testCase.err { + t.Fatalf("TestXLStorage %d: Expected err \"%s\", got err \"%s\"", i+1, testCase.err, err) + } + } +} + // TestXLStorageReadAll - TestXLStorages the functionality implemented by xlStorage ReadAll storage API. func TestXLStorageReadAll(t *testing.T) { // create xlStorage test setup