fix: ensure proper usage of DataDir (#12300)
- GetObject() should always use a common dataDir to read from when it starts reading; this allows the code in erasure decoding to have sane expectations. - Healing should always heal on the common dataDir; this allows the code in dangling object detection to purge dangling content. Both of these situations can happen under certain types of retries during PUT — e.g. when the server is restarting — leaving stale namespace entries behind. Also, do not update bloomFilters for temporary objects.
This commit is contained in:
parent
74c18717be
commit
7b935613e8
|
@ -756,7 +756,7 @@ var errorCodes = errorCodeMap{
|
|||
},
|
||||
ErrSlowDown: {
|
||||
Code: "SlowDown",
|
||||
Description: "Please reduce your request",
|
||||
Description: "Resource requested is unreadable, please reduce your request rate",
|
||||
HTTPStatusCode: http.StatusServiceUnavailable,
|
||||
},
|
||||
ErrInvalidPrefixMarker: {
|
||||
|
|
|
@ -125,6 +125,11 @@ func (b *streamingBitrotReader) ReadAt(buf []byte, offset int64) (int, error) {
|
|||
streamOffset := (offset/b.shardSize)*int64(b.h.Size()) + offset
|
||||
if len(b.data) == 0 {
|
||||
b.rc, err = b.disk.ReadFileStream(context.TODO(), b.volume, b.filePath, streamOffset, b.tillOffset-streamOffset)
|
||||
if err != nil {
|
||||
logger.LogIf(GlobalContext,
|
||||
fmt.Errorf("Error(%w) reading erasure shards at (%s: %s/%s), will attempt to reconstruct if we have quorum",
|
||||
err, b.disk, b.volume, b.filePath))
|
||||
}
|
||||
} else {
|
||||
b.rc = io.NewSectionReader(bytes.NewReader(b.data), streamOffset, b.tillOffset-streamOffset)
|
||||
}
|
||||
|
@ -132,7 +137,6 @@ func (b *streamingBitrotReader) ReadAt(buf []byte, offset int64) (int, error) {
|
|||
return 0, err
|
||||
}
|
||||
}
|
||||
|
||||
if offset != b.currOffset {
|
||||
// Can never happen unless there are programmer bugs
|
||||
return 0, errUnexpected
|
||||
|
|
|
@ -31,7 +31,6 @@ type parallelReader struct {
|
|||
readers []io.ReaderAt
|
||||
orgReaders []io.ReaderAt
|
||||
dataBlocks int
|
||||
errs []error
|
||||
offset int64
|
||||
shardSize int64
|
||||
shardFileSize int64
|
||||
|
@ -48,7 +47,6 @@ func newParallelReader(readers []io.ReaderAt, e Erasure, offset, totalLength int
|
|||
return ¶llelReader{
|
||||
readers: readers,
|
||||
orgReaders: readers,
|
||||
errs: make([]error, len(readers)),
|
||||
dataBlocks: e.dataBlocks,
|
||||
offset: (offset / e.blockSize) * e.ShardSize(),
|
||||
shardSize: e.ShardSize(),
|
||||
|
@ -172,7 +170,6 @@ func (p *parallelReader) Read(dst [][]byte) ([][]byte, error) {
|
|||
// This will be communicated upstream.
|
||||
p.orgReaders[bufIdx] = nil
|
||||
p.readers[i] = nil
|
||||
p.errs[i] = err
|
||||
|
||||
// Since ReadAt returned error, trigger another read.
|
||||
readTriggerCh <- true
|
||||
|
@ -197,19 +194,8 @@ func (p *parallelReader) Read(dst [][]byte) ([][]byte, error) {
|
|||
return newBuf, nil
|
||||
}
|
||||
|
||||
if countErrs(p.errs, nil) == len(p.errs) {
|
||||
// We have success from all drives this can mean that
|
||||
// all local drives succeeded, but all remote drives
|
||||
// failed to read since p.readers[i] was already nil
|
||||
// for such remote servers - this condition was missed
|
||||
// we would return instead `nil, nil` from this
|
||||
// function - it is safer to simply return Quorum error
|
||||
// when all errs are nil but erasure coding cannot decode
|
||||
// the content.
|
||||
return nil, errErasureReadQuorum
|
||||
}
|
||||
|
||||
return nil, reduceReadQuorumErrs(context.Background(), p.errs, objectOpIgnoredErrs, p.dataBlocks)
|
||||
// If we cannot decode, just return read quorum error.
|
||||
return nil, errErasureReadQuorum
|
||||
}
|
||||
|
||||
// Decode reads from readers, reconstructs data if needed and writes the data to the writer.
|
||||
|
|
|
@ -162,7 +162,6 @@ func disksWithAllParts(ctx context.Context, onlineDisks []StorageAPI, partsMetad
|
|||
object string, scanMode madmin.HealScanMode) ([]StorageAPI, []error) {
|
||||
availableDisks := make([]StorageAPI, len(onlineDisks))
|
||||
dataErrs := make([]error, len(onlineDisks))
|
||||
|
||||
inconsistent := 0
|
||||
for i, meta := range partsMetadata {
|
||||
if !meta.IsValid() {
|
||||
|
@ -234,6 +233,9 @@ func disksWithAllParts(ctx context.Context, onlineDisks []StorageAPI, partsMetad
|
|||
if dataErrs[i] == nil {
|
||||
// All parts verified, mark it as all data available.
|
||||
availableDisks[i] = onlineDisk
|
||||
} else {
|
||||
// upon errors just make that disk's fileinfo invalid
|
||||
partsMetadata[i] = FileInfo{}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -261,10 +261,26 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
|
|||
|
||||
// List of disks having latest version of the object er.meta
|
||||
// (by modtime).
|
||||
latestDisks, modTime := listOnlineDisks(storageDisks, partsMetadata, errs)
|
||||
_, modTime := listOnlineDisks(storageDisks, partsMetadata, errs)
|
||||
|
||||
// List of disks having all parts as per latest er.meta.
|
||||
availableDisks, dataErrs := disksWithAllParts(ctx, latestDisks, partsMetadata, errs, bucket, object, scanMode)
|
||||
// make sure all parts metadata dataDir is same as returned by listOnlineDisks()
|
||||
// the reason is its possible that some of the disks might have stale data, for those
|
||||
// we simply override them with maximally occurring 'dataDir' - this ensures that
|
||||
// disksWithAllParts() verifies same dataDir across all drives.
|
||||
for i := range partsMetadata {
|
||||
partsMetadata[i].DataDir = lfi.DataDir
|
||||
}
|
||||
|
||||
// List of disks having all parts as per latest metadata.
|
||||
// NOTE: do not pass in latestDisks to diskWithAllParts since
|
||||
// the diskWithAllParts needs to reach the drive to ensure
|
||||
// validity of the metadata content, we should make sure that
|
||||
// we pass in disks as is for it to be verified. Once verified
|
||||
// the disksWithAllParts() returns the actual disks that can be
|
||||
// used here for reconstruction. This is done to ensure that
|
||||
// we do not skip drives that have inconsistent metadata to be
|
||||
// skipped from purging when they are stale.
|
||||
availableDisks, dataErrs := disksWithAllParts(ctx, storageDisks, partsMetadata, errs, bucket, object, scanMode)
|
||||
|
||||
// Loop to find number of disks with valid data, per-drive
|
||||
// data state and a list of outdated disks on which data needs
|
||||
|
@ -386,7 +402,7 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
|
|||
result.ParityBlocks = latestMeta.Erasure.ParityBlocks
|
||||
|
||||
// Reorder so that we have data disks first and parity disks next.
|
||||
latestDisks = shuffleDisks(availableDisks, latestMeta.Erasure.Distribution)
|
||||
latestDisks := shuffleDisks(availableDisks, latestMeta.Erasure.Distribution)
|
||||
outDatedDisks = shuffleDisks(outDatedDisks, latestMeta.Erasure.Distribution)
|
||||
partsMetadata = shufflePartsMetadata(partsMetadata, latestMeta.Erasure.Distribution)
|
||||
|
||||
|
@ -770,6 +786,7 @@ func isObjectDangling(metaArr []FileInfo, errs []error, dataErrs []error) (valid
|
|||
corruptedErasureMeta++
|
||||
}
|
||||
}
|
||||
|
||||
var notFoundParts int
|
||||
for i := range dataErrs {
|
||||
// Only count part errors, if the error is not
|
||||
|
|
|
@ -296,6 +296,11 @@ func (er erasureObjects) getObjectWithFileInfo(ctx context.Context, bucket, obje
|
|||
}
|
||||
var healOnce sync.Once
|
||||
|
||||
// once we have obtained a common FileInfo i.e latest, we should stick
|
||||
// to single dataDir to read the content to avoid reading from some other
|
||||
// dataDir that has stale FileInfo{} to ensure that we fail appropriately
|
||||
// during reads and expect the same dataDir everywhere.
|
||||
dataDir := fi.DataDir
|
||||
for ; partIndex <= lastPartIndex; partIndex++ {
|
||||
if length == totalBytesRead {
|
||||
break
|
||||
|
@ -324,9 +329,8 @@ func (er erasureObjects) getObjectWithFileInfo(ctx context.Context, bucket, obje
|
|||
continue
|
||||
}
|
||||
checksumInfo := metaArr[index].Erasure.GetChecksumInfo(partNumber)
|
||||
partPath := pathJoin(object, metaArr[index].DataDir, fmt.Sprintf("part.%d", partNumber))
|
||||
data := metaArr[index].Data
|
||||
readers[index] = newBitrotReader(disk, data, bucket, partPath, tillOffset,
|
||||
partPath := pathJoin(object, dataDir, fmt.Sprintf("part.%d", partNumber))
|
||||
readers[index] = newBitrotReader(disk, metaArr[index].Data, bucket, partPath, tillOffset,
|
||||
checksumInfo.Algorithm, checksumInfo.Hash, erasure.ShardSize())
|
||||
|
||||
// Prefer local disks
|
||||
|
@ -347,10 +351,12 @@ func (er erasureObjects) getObjectWithFileInfo(ctx context.Context, bucket, obje
|
|||
var scan madmin.HealScanMode
|
||||
if errors.Is(err, errFileNotFound) {
|
||||
scan = madmin.HealNormalScan
|
||||
logger.Info("Healing required, attempting to heal missing shards for %s", pathJoin(bucket, object, fi.VersionID))
|
||||
} else if errors.Is(err, errFileCorrupt) {
|
||||
scan = madmin.HealDeepScan
|
||||
logger.Info("Healing required, attempting to heal bitrot for %s", pathJoin(bucket, object, fi.VersionID))
|
||||
}
|
||||
if scan != madmin.HealUnknownScan {
|
||||
if scan == madmin.HealNormalScan || scan == madmin.HealDeepScan {
|
||||
healOnce.Do(func() {
|
||||
if _, healing := er.getOnlineDisksWithHealing(); !healing {
|
||||
go healObject(bucket, object, fi.VersionID, scan)
|
||||
|
@ -538,7 +544,6 @@ func undoRename(disks []StorageAPI, srcBucket, srcEntry, dstBucket, dstEntry str
|
|||
// Similar to rename but renames data from srcEntry to dstEntry at dataDir
|
||||
func renameData(ctx context.Context, disks []StorageAPI, srcBucket, srcEntry, dataDir, dstBucket, dstEntry string, writeQuorum int, ignoredErr []error) ([]StorageAPI, error) {
|
||||
dataDir = retainSlash(dataDir)
|
||||
defer ObjectPathUpdated(pathJoin(srcBucket, srcEntry))
|
||||
defer ObjectPathUpdated(pathJoin(dstBucket, dstEntry))
|
||||
|
||||
g := errgroup.WithNErrs(len(disks))
|
||||
|
@ -592,7 +597,6 @@ func rename(ctx context.Context, disks []StorageAPI, srcBucket, srcEntry, dstBuc
|
|||
dstEntry = retainSlash(dstEntry)
|
||||
srcEntry = retainSlash(srcEntry)
|
||||
}
|
||||
defer ObjectPathUpdated(pathJoin(srcBucket, srcEntry))
|
||||
defer ObjectPathUpdated(pathJoin(dstBucket, dstEntry))
|
||||
|
||||
g := errgroup.WithNErrs(len(disks))
|
||||
|
@ -636,14 +640,8 @@ func (er erasureObjects) PutObject(ctx context.Context, bucket string, object st
|
|||
|
||||
// putObject wrapper for erasureObjects PutObject
|
||||
func (er erasureObjects) putObject(ctx context.Context, bucket string, object string, r *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error) {
|
||||
defer func() {
|
||||
ObjectPathUpdated(pathJoin(bucket, object))
|
||||
}()
|
||||
|
||||
data := r.Reader
|
||||
|
||||
uniqueID := mustGetUUID()
|
||||
tempObj := uniqueID
|
||||
// No metadata is set, allocate a new one.
|
||||
if opts.UserDefined == nil {
|
||||
opts.UserDefined = make(map[string]string)
|
||||
|
@ -692,7 +690,10 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st
|
|||
fi.VersionID = mustGetUUID()
|
||||
}
|
||||
}
|
||||
|
||||
fi.DataDir = mustGetUUID()
|
||||
uniqueID := mustGetUUID()
|
||||
tempObj := uniqueID
|
||||
|
||||
// Initialize erasure metadata.
|
||||
for index := range partsMetadata {
|
||||
|
|
|
@ -318,8 +318,8 @@ func TestGetObjectNoQuorum(t *testing.T) {
|
|||
}
|
||||
|
||||
err = xl.GetObject(ctx, bucket, object, 0, int64(len(buf)), ioutil.Discard, "", opts)
|
||||
if err != toObjectErr(errFileNotFound, bucket, object) {
|
||||
t.Errorf("Expected GetObject to fail with %v, but failed with %v", toObjectErr(errErasureWriteQuorum, bucket, object), err)
|
||||
if err != toObjectErr(errErasureReadQuorum, bucket, object) {
|
||||
t.Errorf("Expected GetObject to fail with %v, but failed with %v", toObjectErr(errErasureReadQuorum, bucket, object), err)
|
||||
}
|
||||
|
||||
// Test use case 2: Make 9 disks offline, which leaves less than quorum number of disks
|
||||
|
|
|
@ -287,7 +287,7 @@ func newXLStorage(ep Endpoint) (*xlStorage, error) {
|
|||
_, _ = rand.Read(rnd[:])
|
||||
tmpFile := ".writable-check-" + hex.EncodeToString(rnd[:]) + ".tmp"
|
||||
filePath := pathJoin(p.diskPath, minioMetaTmpBucket, tmpFile)
|
||||
w, err := disk.OpenFileDirectIO(filePath, os.O_CREATE|os.O_WRONLY|os.O_EXCL, 0666)
|
||||
w, err := OpenFileDirectIO(filePath, os.O_CREATE|os.O_WRONLY|os.O_EXCL, 0666)
|
||||
if err != nil {
|
||||
return p, err
|
||||
}
|
||||
|
@ -1039,7 +1039,7 @@ func (s *xlStorage) readAllData(volumeDir string, filePath string, requireDirect
|
|||
var r io.ReadCloser
|
||||
if requireDirectIO {
|
||||
var f *os.File
|
||||
f, err = disk.OpenFileDirectIO(filePath, readMode, 0666)
|
||||
f, err = OpenFileDirectIO(filePath, readMode, 0666)
|
||||
r = &odirectReader{f, nil, nil, true, true, s, nil}
|
||||
} else {
|
||||
r, err = OpenFile(filePath, readMode, 0)
|
||||
|
@ -1327,7 +1327,7 @@ func (s *xlStorage) ReadFileStream(ctx context.Context, volume, path string, off
|
|||
var file *os.File
|
||||
// O_DIRECT only supported if offset is zero
|
||||
if offset == 0 && globalStorageClass.GetDMA() == storageclass.DMAReadWrite {
|
||||
file, err = disk.OpenFileDirectIO(filePath, readMode, 0666)
|
||||
file, err = OpenFileDirectIO(filePath, readMode, 0666)
|
||||
} else {
|
||||
// Open the file for reading.
|
||||
file, err = OpenFile(filePath, readMode, 0666)
|
||||
|
@ -1447,7 +1447,7 @@ func (s *xlStorage) CreateFile(ctx context.Context, volume, path string, fileSiz
|
|||
if fileSize >= 0 && fileSize <= smallFileThreshold {
|
||||
// For streams smaller than 128KiB we simply write them as O_DSYNC (fdatasync)
|
||||
// and not O_DIRECT to avoid the complexities of aligned I/O.
|
||||
w, err := s.openFile(filePath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC)
|
||||
w, err := s.openFile(filePath, os.O_CREATE|os.O_WRONLY|os.O_EXCL)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue