fix: delete-markers without quorum were unreadable (#13351)

DeleteMarkers were unreadable if they had quorum based
guarantees, this PR tries to fix this behavior appropriately.

DeleteMarkers with sufficient should be allowed and the
return error should be accordingly with or without version-id.

This also allows for overwrites which may not be possible
in a multi-pool setup.

fixes #12787
This commit is contained in:
Harshavardhana 2021-10-04 08:53:38 -07:00
parent ab5bc50847
commit e16e75ce30
5 changed files with 29 additions and 17 deletions

View File

@ -39,7 +39,11 @@ func commonTime(modTimes []time.Time, dataDirs []string) (modTime time.Time, dat
}
for _, dataDir := range dataDirs {
if dataDir == "" {
if dataDir == errorDir {
continue
}
if dataDir == delMarkerDir {
dataDirOccurenceMap[delMarkerDir]++
continue
}
dataDirOccurenceMap[dataDir]++
@ -96,6 +100,11 @@ func listObjectModtimes(partsMetadata []FileInfo, errs []error) (modTimes []time
return modTimes
}
const (
errorDir = "error-dir"
delMarkerDir = ""
)
// Notes:
// There are 5 possible states a disk could be in,
// 1. __online__ - has the latest copy of xl.meta - returned by listOnlineDisks
@ -129,6 +138,7 @@ func listOnlineDisks(disks []StorageAPI, partsMetadata []FileInfo, errs []error)
dataDirs := make([]string, len(partsMetadata))
for idx, fi := range partsMetadata {
if errs[idx] != nil {
dataDirs[idx] = errorDir
continue
}
dataDirs[idx] = fi.DataDir
@ -150,9 +160,9 @@ func listOnlineDisks(disks []StorageAPI, partsMetadata []FileInfo, errs []error)
}
// Returns the latest updated FileInfo files and error in case of failure.
func getLatestFileInfo(ctx context.Context, partsMetadata []FileInfo, errs []error) (FileInfo, error) {
func getLatestFileInfo(ctx context.Context, partsMetadata []FileInfo, errs []error, quorum int) (FileInfo, error) {
// There should be atleast half correct entries, if not return failure
if reducedErr := reduceReadQuorumErrs(ctx, errs, objectOpIgnoredErrs, len(partsMetadata)/2); reducedErr != nil {
if reducedErr := reduceReadQuorumErrs(ctx, errs, objectOpIgnoredErrs, quorum); reducedErr != nil {
return FileInfo{}, reducedErr
}
@ -181,7 +191,8 @@ func getLatestFileInfo(ctx context.Context, partsMetadata []FileInfo, errs []err
count++
}
}
if count < len(partsMetadata)/2 {
if count < quorum {
return FileInfo{}, errErasureReadQuorum
}

View File

@ -189,7 +189,7 @@ func TestListOnlineDisks(t *testing.T) {
}
partsMetadata, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "", false)
fi, err := getLatestFileInfo(ctx, partsMetadata, errs)
fi, err := getLatestFileInfo(ctx, partsMetadata, errs, getReadQuorum(len(disks)))
if err != nil {
t.Fatalf("Failed to getLatestFileInfo %v", err)
}
@ -364,7 +364,7 @@ func TestListOnlineDisksSmallObjects(t *testing.T) {
}
partsMetadata, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "", true)
fi, err := getLatestFileInfo(ctx, partsMetadata, errs)
fi, err := getLatestFileInfo(ctx, partsMetadata, errs, getReadQuorum(len(disks)))
if err != nil {
t.Fatalf("Failed to getLatestFileInfo %v", err)
}
@ -420,7 +420,7 @@ func TestListOnlineDisksSmallObjects(t *testing.T) {
}
partsMetadata, errs = readAllFileInfo(ctx, erasureDisks, bucket, object, "", true)
_, err = getLatestFileInfo(ctx, partsMetadata, errs)
_, err = getLatestFileInfo(ctx, partsMetadata, errs, len(disks)/2)
if err != nil {
t.Fatalf("Failed to getLatestFileInfo %v", err)
}

View File

@ -258,6 +258,10 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
// Re-read when we have lock...
partsMetadata, errs := readAllFileInfo(ctx, storageDisks, bucket, object, versionID, true)
if _, err = getLatestFileInfo(ctx, partsMetadata, errs, er.defaultParityCount); err != nil {
return er.purgeObjectDangling(ctx, bucket, object, versionID, partsMetadata, errs, []error{}, opts)
}
// List of disks having latest version of the object er.meta
// (by modtime).
latestDisks, modTime, dataDir := listOnlineDisks(storageDisks, partsMetadata, errs)
@ -853,7 +857,9 @@ func (er erasureObjects) HealObject(ctx context.Context, bucket, object, version
versionID = nullVersionID
}
partsMetadata, errs := readAllFileInfo(healCtx, storageDisks, bucket, object, versionID, false)
// Perform quick read without lock.
// This allows to quickly check if all is ok or all are missing.
_, errs := readAllFileInfo(healCtx, storageDisks, bucket, object, versionID, false)
if isAllNotFound(errs) {
err = toObjectErr(errFileNotFound, bucket, object)
if versionID != "" {
@ -863,11 +869,6 @@ func (er erasureObjects) HealObject(ctx context.Context, bucket, object, version
return defaultHealResult(FileInfo{}, storageDisks, storageEndpoints, errs, bucket, object, versionID, er.defaultParityCount), err
}
_, err = getLatestFileInfo(healCtx, partsMetadata, errs)
if err != nil {
return er.purgeObjectDangling(healCtx, bucket, object, versionID, partsMetadata, errs, []error{}, opts)
}
// Heal the object.
return er.healObject(healCtx, bucket, object, versionID, opts)
}

View File

@ -215,7 +215,7 @@ func TestHealObjectCorrupted(t *testing.T) {
}
fileInfos, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "", false)
fi, err := getLatestFileInfo(ctx, fileInfos, errs)
fi, err := getLatestFileInfo(ctx, fileInfos, errs, er.defaultParityCount)
if err != nil {
t.Fatalf("Failed to getLatestFileInfo - %v", err)
}
@ -240,7 +240,7 @@ func TestHealObjectCorrupted(t *testing.T) {
}
fileInfos, errs = readAllFileInfo(ctx, erasureDisks, bucket, object, "", false)
nfi, err := getLatestFileInfo(ctx, fileInfos, errs)
nfi, err := getLatestFileInfo(ctx, fileInfos, errs, er.defaultParityCount)
if err != nil {
t.Fatalf("Failed to getLatestFileInfo - %v", err)
}
@ -266,7 +266,7 @@ func TestHealObjectCorrupted(t *testing.T) {
}
fileInfos, errs = readAllFileInfo(ctx, erasureDisks, bucket, object, "", false)
nfi, err = getLatestFileInfo(ctx, fileInfos, errs)
nfi, err = getLatestFileInfo(ctx, fileInfos, errs, er.defaultParityCount)
if err != nil {
t.Fatalf("Failed to getLatestFileInfo - %v", err)
}

View File

@ -317,7 +317,7 @@ func writeUniqueFileInfo(ctx context.Context, disks []StorageAPI, bucket, prefix
// writeQuorum is the min required disks to write data.
func objectQuorumFromMeta(ctx context.Context, partsMetaData []FileInfo, errs []error, defaultParityCount int) (objectReadQuorum, objectWriteQuorum int, err error) {
// get the latest updated Metadata and a count of all the latest updated FileInfo(s)
latestFileInfo, err := getLatestFileInfo(ctx, partsMetaData, errs)
latestFileInfo, err := getLatestFileInfo(ctx, partsMetaData, errs, defaultParityCount)
if err != nil {
return 0, 0, err
}