Compare commits

...

20 Commits

Author SHA1 Message Date
Harshavardhana e16e75ce30 fix: delete-markers without quorum were unreadable (#13351)
DeleteMarkers were unreadable if they had quorum-based
guarantees; this PR fixes that behavior appropriately.

DeleteMarkers with sufficient quorum should be allowed, and the
returned error should be appropriate with or without a version-id.

This also allows for overwrites, which may not be possible
in a multi-pool setup.

fixes #12787
2021-11-05 00:11:32 -07:00
Harshavardhana ab5bc50847 update to RedHat UBI 8.4 image 2021-11-04 23:46:21 -07:00
Harshavardhana 59fc075b8a add missing responseBody drain (#12147)
Signed-off-by: Harshavardhana <harsha@minio.io>
2021-11-04 23:28:08 -07:00
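The pattern this commit applies across the storage REST client (see the diffs near the end of this page) is to always drain and close the response body so the keep-alive connection can be reused. A minimal standalone sketch of such a helper, using only the standard library (names here are illustrative, not MinIO's xhttp.DrainBody itself):

package main

import (
	"io"
	"io/ioutil"
	"net/http"
)

// drainBody reads any remaining bytes and closes the body, so the
// underlying keep-alive connection can be returned to the pool.
func drainBody(body io.ReadCloser) {
	if body == nil {
		return
	}
	io.Copy(ioutil.Discard, body)
	body.Close()
}

func main() {
	resp, err := http.Get("http://example.com/")
	if err != nil {
		return
	}
	// Drain even when the caller ignores the payload.
	defer drainBody(resp.Body)
}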
Harshavardhana c896954fea fix: extend parentDirIsObject for all parents 2021-11-04 17:56:34 -07:00
Harshavardhana 1221feca72 fix: SQL select to honor limits properly for array queries (#13568)
added tests to cover the scenarios as well.
2021-11-02 20:37:23 -07:00
Harshavardhana 11005331cd regenerate data-usage-cache upon unreadable errors
also do not hold up the usage scanner for long
2021-10-19 10:03:54 -07:00
Poorna K cf89776b40
Backport replica metrics update (#13461)
* fix too few params in writeErrorResponse

* Update Replica metrics in real time

Co-authored-by: Poorna Krishnamoorthy <poorna@minio.io>
2021-10-18 08:42:27 -07:00
Harshavardhana c60f17c89b fix: validate exclusivity with partNumber regardless of valid Range
This fix is needed to mimic exact AWS S3 behavior.
2021-10-12 00:21:03 -07:00
Poorna K b97131446c
Backport replication using multipart (#13384) 2021-10-08 10:28:04 -07:00
Harshavardhana 74ad69f31e optimize multipart upload
cherry-pick 33cee9f38a from master
branch for improving multipart upload and lock handling
2021-10-05 10:07:42 -07:00
Harshavardhana 4a2307612d s3: Put bucket tagging to return an error when bucket is not found 2021-10-04 10:02:37 -07:00
Harshavardhana 6e90939ecf do not panic if DNS_WEBHOOK_ENDPOINT is not reachable (#13265) 2021-10-04 01:02:51 -07:00
Ashish Kumar Sinha 89ab3dfd40 Update PutObjectPart error message (#13313)
Co-authored-by: sinhaashish <ashish@minio.io>
2021-10-04 01:02:36 -07:00
Klaus Post 795a24ea71
Debug info backport (#12643)
Download files from *any* bucket/path as an encrypted zip file.

The key is included in the response but can be separated, so the zip
and the key don't have to be sent over the same channel.

Requires https://github.com/minio/pkg/pull/6
2021-07-08 10:34:34 -07:00
Klaus Post 3ae18ce672 Fix list entry deduplication (#12325)
FileInfos being compared would always be the same, since both sides were read from the existing entry.

Add numversions as a final tiebreaker.
2021-06-21 13:25:04 -07:00
Anis Elleuch f179fc0e37
fix: safe update of the audit objectErasureMap (#12477) (#12527)
objectErasureMap in the audit holds information about the objects
involved in the current S3 operation, such as pool index, set index,
and disk endpoints. One user saw a crash due to a concurrent update of
objectErasureMap information. Use sync.Map to prevent a crash.
2021-06-17 08:38:48 -07:00
Klaus Post 2fc63ae45b Fix crash from unstable sort
The fix in https://github.com/minio/minio/pull/12487 assumes that slices with ties are sorted equally.

That is only the case for "stable" sort variants. Fixes a nil result being returned in multi-pool setups.
2021-06-14 09:53:43 -07:00
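For intuition, a minimal standalone sketch (hypothetical names) of the tiebreak pattern this fix relies on: sort.Slice is not stable, so entries with equal ModTime come back in unspecified order unless a tiebreaker is compared explicitly, as the multi-pool sorting code later in this diff does:

package main

import (
	"fmt"
	"sort"
	"time"
)

type result struct {
	zIdx    int
	modTime time.Time
}

func main() {
	now := time.Now()
	results := []result{{2, now}, {0, now}, {1, now}}
	sort.Slice(results, func(i, j int) bool {
		// On ties, fall back to the index; without this the
		// non-stable sort may return any order for equal ModTimes.
		if results[i].modTime.Equal(results[j].modTime) {
			return results[i].zIdx < results[j].zIdx
		}
		return results[i].modTime.After(results[j].modTime)
	})
	fmt.Println(results) // deterministic: zIdx 0, 1, 2
}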
Harshavardhana a9451eaca8 fix: inline data upon overwrites should be readable (#12369)
This PR fixes two bugs

- Remove fi.Data upon overwrite of objects from inlined-data to non-inlined-data
- Workaround for an existing bug on disk with latest releases to ignore fi.Data
  and instead read from the disk for non-inlined-data
- Additionally add a reserved metadata header to indicate data is inlined for
  a given version.
2021-06-10 15:03:46 -07:00
Harshavardhana 80d4fc863f fix: serve always only the latest objects
due to a historic bug, it is possible that
some objects exist on multiple pools;
rely on ModTime to return the correct pool.
2021-06-10 14:52:14 -07:00
Harshavardhana 8c654a725e revert CreateFile waitForResponse (#12124)
instead use an expect-continue timeout, and a
higher response-header timeout; the new, higher
timeout satisfies worst-case scenarios for total
response time on a CreateFile operation.

Also set the "Expect" continue header to satisfy
expect-continue timeout behavior.

Some clients seem to cause the CreateFile body to be
truncated, surfacing no error and instead failing
with ObjectNotFound on a PUT operation;
this change avoids such failures appropriately.

Signed-off-by: Harshavardhana <harsha@minio.io>
2021-05-07 14:30:40 -07:00
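A hedged sketch of the client settings the message describes, using only net/http; the timeout values are illustrative, not MinIO's actual configuration:

package main

import (
	"net/http"
	"strings"
	"time"
)

func main() {
	tr := &http.Transport{
		// Wait briefly for the server's "100 Continue" before sending the body.
		ExpectContinueTimeout: 1 * time.Second,
		// Allow a long worst-case total response time for CreateFile.
		ResponseHeaderTimeout: 2 * time.Minute,
	}
	client := &http.Client{Transport: tr}
	req, _ := http.NewRequest(http.MethodPut, "http://example.com/file", strings.NewReader("payload"))
	req.Header.Set("Expect", "100-continue")
	resp, err := client.Do(req)
	if err == nil {
		resp.Body.Close()
	}
}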
47 changed files with 902 additions and 255 deletions

View File

@ -11,7 +11,7 @@ RUN \
git clone https://github.com/minio/minio && cd minio && \
git checkout master && go install -v -ldflags "$(go run buildscripts/gen-ldflags.go)"
FROM registry.access.redhat.com/ubi8/ubi-minimal:8.3
FROM registry.access.redhat.com/ubi8/ubi-minimal:8.4
ENV MINIO_ACCESS_KEY_FILE=access_key \
MINIO_SECRET_KEY_FILE=secret_key \

View File

@ -11,7 +11,7 @@ RUN \
git clone https://github.com/minio/minio && cd minio && \
git checkout master && go install -v -ldflags "$(go run buildscripts/gen-ldflags.go)"
FROM registry.access.redhat.com/ubi8/ubi-minimal:8.3
FROM registry.access.redhat.com/ubi8/ubi-minimal:8.4
ARG TARGETARCH

View File

@ -1,4 +1,4 @@
FROM registry.access.redhat.com/ubi8/ubi-minimal:8.3
FROM registry.access.redhat.com/ubi8/ubi-minimal:8.4
ARG TARGETARCH

View File

@ -1,4 +1,4 @@
FROM registry.access.redhat.com/ubi8/ubi-minimal:8.3
FROM registry.access.redhat.com/ubi8/ubi-minimal:8.4
ARG TARGETARCH

View File

@ -18,6 +18,7 @@ package cmd
import (
"context"
crand "crypto/rand"
"crypto/subtle"
"crypto/tls"
"encoding/json"
@ -36,6 +37,7 @@ import (
"time"
"github.com/gorilla/mux"
"github.com/klauspost/compress/zip"
"github.com/minio/minio/cmd/config"
"github.com/minio/minio/cmd/crypto"
xhttp "github.com/minio/minio/cmd/http"
@ -49,7 +51,8 @@ import (
"github.com/minio/minio/pkg/kms"
"github.com/minio/minio/pkg/madmin"
xnet "github.com/minio/minio/pkg/net"
trace "github.com/minio/minio/pkg/trace"
"github.com/minio/minio/pkg/trace"
"github.com/secure-io/sio-go"
)
const (
@ -1856,3 +1859,104 @@ func checkConnection(endpointStr string, timeout time.Duration) error {
defer xhttp.DrainBody(resp.Body)
return nil
}
// getRawDataer provides an interface for getting raw FS files.
type getRawDataer interface {
GetRawData(ctx context.Context, volume, file string, fn func(r io.Reader, host string, disk string, filename string, size int64, modtime time.Time) error) error
}
// InspectDataHandler - GET /minio/admin/v3/inspect-data
// ----------
// Download file from all nodes in a zip format
func (a adminAPIHandlers) InspectDataHandler(w http.ResponseWriter, r *http.Request) {
ctx := newContext(r, w, "InspectData")
// Validate request signature.
_, adminAPIErr := checkAdminRequestAuth(ctx, r, iampolicy.InspectDataAction, "")
if adminAPIErr != ErrNone {
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(adminAPIErr), r.URL)
return
}
defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))
o, ok := newObjectLayerFn().(getRawDataer)
if !ok {
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL)
return
}
volume := r.URL.Query().Get("volume")
file := r.URL.Query().Get("file")
if len(volume) == 0 {
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrInvalidBucketName), r.URL)
return
}
if len(file) == 0 {
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrInvalidRequest), r.URL)
return
}
var key [32]byte
// MUST use crypto/rand
n, err := crand.Read(key[:])
if err != nil || n != len(key) {
logger.LogIf(ctx, err)
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrInternalError), r.URL)
return
}
stream, err := sio.AES_256_GCM.Stream(key[:])
if err != nil {
logger.LogIf(ctx, err)
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrInternalError), r.URL)
return
}
// Zero nonce, we only use each key once, and 32 bytes is plenty.
nonce := make([]byte, stream.NonceSize())
encw := stream.EncryptWriter(w, nonce, nil)
defer encw.Close()
// Write a version for making *incompatible* changes.
// The AdminClient will reject any version it does not know.
w.Write([]byte{1})
// Write key first (without encryption)
_, err = w.Write(key[:])
if err != nil {
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrInternalError), r.URL)
return
}
// Initialize a zip writer which will provide zipped content
// of the requested file from all nodes
zipWriter := zip.NewWriter(encw)
defer zipWriter.Close()
err = o.GetRawData(ctx, volume, file, func(r io.Reader, host, disk, filename string, size int64, modtime time.Time) error {
// Prefix host+disk
filename = path.Join(host, disk, filename)
header, zerr := zip.FileInfoHeader(dummyFileInfo{
name: filename,
size: size,
mode: 0600,
modTime: modtime,
isDir: false,
sys: nil,
})
if zerr != nil {
logger.LogIf(ctx, zerr)
return nil
}
header.Method = zip.Deflate
zwriter, zerr := zipWriter.CreateHeader(header)
if zerr != nil {
logger.LogIf(ctx, zerr)
return nil
}
if _, err = io.Copy(zwriter, r); err != nil {
logger.LogIf(ctx, err)
}
return nil
})
logger.LogIf(ctx, err)
}
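Reading the handler above, the response wire format is: one plaintext version byte, the 32-byte key in plaintext, then the zip stream encrypted with sio AES-256-GCM under a zero nonce. A hypothetical client-side decoder under that assumption (not the actual madmin client):

package main

import (
	"errors"
	"io"
	"log"
	"os"

	"github.com/secure-io/sio-go"
)

// decryptInspectData decodes an inspect-data response stream into dst.
func decryptInspectData(r io.Reader, dst io.Writer) error {
	ver := make([]byte, 1)
	if _, err := io.ReadFull(r, ver); err != nil {
		return err
	}
	if ver[0] != 1 {
		return errors.New("unknown inspect-data version")
	}
	var key [32]byte
	if _, err := io.ReadFull(r, key[:]); err != nil {
		return err
	}
	stream, err := sio.AES_256_GCM.Stream(key[:])
	if err != nil {
		return err
	}
	// Zero nonce, mirroring the server side: each key is used only once.
	nonce := make([]byte, stream.NonceSize())
	_, err = io.Copy(dst, stream.DecryptReader(r, nonce, nil))
	return err // dst now holds the decrypted zip archive
}

func main() {
	if err := decryptInspectData(os.Stdin, os.Stdout); err != nil {
		log.Fatal(err)
	}
}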

View File

@ -56,6 +56,7 @@ func registerAdminRouter(router *mux.Router, enableConfigOps, enableIAMOps bool)
// Info operations
adminRouter.Methods(http.MethodGet).Path(adminVersion + "/info").HandlerFunc(httpTraceAll(adminAPI.ServerInfoHandler))
adminRouter.Methods(http.MethodGet).Path(adminVersion+"/inspect-data").HandlerFunc(httpTraceHdrs(adminAPI.InspectDataHandler)).Queries("volume", "{volume:.*}", "file", "{file:.*}")
// StorageInfo operations
adminRouter.Methods(http.MethodGet).Path(adminVersion + "/storageinfo").HandlerFunc(httpTraceAll(adminAPI.StorageInfoHandler))

View File

@ -440,7 +440,7 @@ var errorCodes = errorCodeMap{
},
ErrInvalidMaxParts: {
Code: "InvalidArgument",
Description: "Argument max-parts must be an integer between 0 and 2147483647",
Description: "Part number must be an integer between 1 and 10000, inclusive",
HTTPStatusCode: http.StatusBadRequest,
},
ErrInvalidPartNumberMarker: {

View File

@ -1287,6 +1287,12 @@ func (api objectAPIHandlers) PutBucketObjectLockConfigHandler(w http.ResponseWri
return
}
// Before proceeding validate if bucket exists.
if _, err := objectAPI.GetBucketInfo(ctx, bucket); err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}
config, err := objectlock.ParseObjectLockConfig(r.Body)
if err != nil {
apiErr := errorCodes.ToAPIErr(ErrMalformedXML)
@ -1341,6 +1347,12 @@ func (api objectAPIHandlers) GetBucketObjectLockConfigHandler(w http.ResponseWri
return
}
// Before proceeding validate if bucket exists.
if _, err := objectAPI.GetBucketInfo(ctx, bucket); err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}
config, err := globalBucketMetadataSys.GetObjectLockConfig(bucket)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
@ -1378,6 +1390,12 @@ func (api objectAPIHandlers) PutBucketTaggingHandler(w http.ResponseWriter, r *h
return
}
// Before proceeding validate if bucket exists.
if _, err := objectAPI.GetBucketInfo(ctx, bucket); err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}
tags, err := tags.ParseBucketXML(io.LimitReader(r.Body, r.ContentLength))
if err != nil {
apiErr := errorCodes.ToAPIErr(ErrMalformedXML)
@ -1423,6 +1441,12 @@ func (api objectAPIHandlers) GetBucketTaggingHandler(w http.ResponseWriter, r *h
return
}
// Before proceeding validate if bucket exists.
if _, err := objectAPI.GetBucketInfo(ctx, bucket); err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}
config, err := globalBucketMetadataSys.GetTaggingConfig(bucket)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
@ -1460,6 +1484,12 @@ func (api objectAPIHandlers) DeleteBucketTaggingHandler(w http.ResponseWriter, r
return
}
// Before proceeding validate if bucket exists.
if _, err := objectAPI.GetBucketInfo(ctx, bucket); err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}
if err := globalBucketMetadataSys.Update(bucket, bucketTaggingConfig, nil); err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return

View File

@ -19,6 +19,7 @@ package cmd
import (
"context"
"fmt"
"io"
"net/http"
"reflect"
"strings"
@ -35,6 +36,7 @@ import (
"github.com/minio/minio/pkg/bucket/bandwidth"
"github.com/minio/minio/pkg/bucket/replication"
"github.com/minio/minio/pkg/event"
"github.com/minio/minio/pkg/hash"
iampolicy "github.com/minio/minio/pkg/iam/policy"
"github.com/minio/minio/pkg/madmin"
)
@ -717,9 +719,17 @@ func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI Obje
}
r := bandwidth.NewMonitoredReader(ctx, globalBucketMonitor, gr, opts)
if _, err = c.PutObject(ctx, dest.Bucket, object, r, size, "", "", putOpts); err != nil {
replicationStatus = replication.Failed
logger.LogIf(ctx, fmt.Errorf("Unable to replicate for object %s/%s(%s): %w", bucket, objInfo.Name, objInfo.VersionID, err))
if objInfo.isMultipart() {
if err := replicateObjectWithMultipart(ctx, c, dest.Bucket, object,
r, objInfo, putOpts); err != nil {
replicationStatus = replication.Failed
logger.LogIf(ctx, fmt.Errorf("Unable to replicate for object %s/%s(%s): %s", bucket, objInfo.Name, objInfo.VersionID, err))
}
} else {
if _, err = c.PutObject(ctx, dest.Bucket, object, r, size, "", "", putOpts); err != nil {
replicationStatus = replication.Failed
logger.LogIf(ctx, fmt.Errorf("Unable to replicate for object %s/%s(%s): %w", bucket, objInfo.Name, objInfo.VersionID, err))
}
}
}
@ -779,6 +789,55 @@ func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI Obje
}
}
func replicateObjectWithMultipart(ctx context.Context, c *miniogo.Core, bucket, object string, r io.Reader, objInfo ObjectInfo, opts miniogo.PutObjectOptions) (err error) {
var uploadedParts []miniogo.CompletePart
uploadID, err := c.NewMultipartUpload(context.Background(), bucket, object, opts)
if err != nil {
return err
}
defer func() {
if err != nil {
// block and abort remote upload upon failure.
if aerr := c.AbortMultipartUpload(ctx, bucket, object, uploadID); aerr != nil {
aerr = fmt.Errorf("Unable to cleanup failed multipart replication %s on remote %s/%s: %w", uploadID, bucket, object, aerr)
logger.LogIf(ctx, aerr)
}
}
}()
var (
hr *hash.Reader
pInfo miniogo.ObjectPart
)
for _, partInfo := range objInfo.Parts {
hr, err = hash.NewReader(r, partInfo.ActualSize, "", "", partInfo.ActualSize)
if err != nil {
return err
}
pInfo, err = c.PutObjectPart(ctx, bucket, object, uploadID, partInfo.Number, hr, partInfo.ActualSize, "", "", opts.ServerSideEncryption)
if err != nil {
return err
}
if pInfo.Size != partInfo.ActualSize {
return fmt.Errorf("Part size mismatch: got %d, want %d", pInfo.Size, partInfo.ActualSize)
}
uploadedParts = append(uploadedParts, miniogo.CompletePart{
PartNumber: pInfo.PartNumber,
ETag: pInfo.ETag,
})
}
_, err = c.CompleteMultipartUpload(ctx, bucket, object, uploadID, uploadedParts, miniogo.PutObjectOptions{
Internal: miniogo.AdvancedPutOptions{
SourceMTime: objInfo.ModTime,
// always set this to distinguish between `mc mirror` replication and serverside
ReplicationRequest: true,
}})
return err
}
// filterReplicationStatusMetadata filters replication status metadata for COPY
func filterReplicationStatusMetadata(metadata map[string]string) map[string]string {
// Copy on write

View File

@ -89,16 +89,19 @@ func (c *OperatorDNS) Put(bucket string) error {
if err = c.addAuthHeader(req); err != nil {
return newError(bucket, err)
}
resp, err := c.httpClient.Do(req)
if err != nil {
if derr := c.Delete(bucket); derr != nil {
return newError(bucket, derr)
}
return err
}
var errorStringBuilder strings.Builder
io.Copy(&errorStringBuilder, io.LimitReader(resp.Body, resp.ContentLength))
xhttp.DrainBody(resp.Body)
defer xhttp.DrainBody(resp.Body)
if resp.StatusCode != http.StatusOK {
var errorStringBuilder strings.Builder
io.Copy(&errorStringBuilder, io.LimitReader(resp.Body, resp.ContentLength))
errorString := errorStringBuilder.String()
switch resp.StatusCode {
case http.StatusConflict:

View File

@ -528,12 +528,17 @@ type objectIO interface {
// Only backend errors are returned as errors.
// If the object is not found or cannot be deserialized, d is cleared and a nil error is returned.
func (d *dataUsageCache) load(ctx context.Context, store objectIO, name string) error {
// Abandon if more than 5 minutes, so we don't hold up scanner.
ctx, cancel := context.WithTimeout(ctx, 5*time.Minute)
defer cancel()
r, err := store.GetObjectNInfo(ctx, dataUsageBucket, name, nil, http.Header{}, readLock, ObjectOptions{})
if err != nil {
switch err.(type) {
case ObjectNotFound:
case BucketNotFound:
case InsufficientReadQuorum:
case StorageErr:
default:
return toObjectErr(err, dataUsageBucket, name)
}
@ -561,6 +566,10 @@ func (d *dataUsageCache) save(ctx context.Context, store objectIO, name string)
return err
}
// Abandon if more than 5 minutes, so we don't hold up scanner.
ctx, cancel := context.WithTimeout(ctx, 5*time.Minute)
defer cancel()
_, err = store.PutObject(ctx,
dataUsageBucket,
name,

View File

@ -65,6 +65,29 @@ const (
)
// isMultipart returns true if the current object is
// uploaded by the user using multipart mechanism:
// initiate new multipart, upload part, complete upload
func (o *ObjectInfo) isMultipart() bool {
if len(o.Parts) == 0 {
return false
}
_, encrypted := crypto.IsEncrypted(o.UserDefined)
if encrypted && !crypto.IsMultiPart(o.UserDefined) {
return false
}
for _, part := range o.Parts {
_, err := sio.DecryptedSize(uint64(part.Size))
if err != nil {
return false
}
}
// Further check if this object is uploaded using multipart mechanism
// by the user and it is not about Erasure internally splitting the
// object into parts in PutObject()
return !(o.backendType == BackendErasure && len(o.ETag) == 32)
}
// isEncryptedMultipart returns true if the current object is
// uploaded by the user using multipart mechanism:
// initiate new multipart, upload part, complete upload

View File

@ -39,7 +39,11 @@ func commonTime(modTimes []time.Time, dataDirs []string) (modTime time.Time, dat
}
for _, dataDir := range dataDirs {
if dataDir == "" {
if dataDir == errorDir {
continue
}
if dataDir == delMarkerDir {
dataDirOccurenceMap[delMarkerDir]++
continue
}
dataDirOccurenceMap[dataDir]++
@ -96,6 +100,11 @@ func listObjectModtimes(partsMetadata []FileInfo, errs []error) (modTimes []time
return modTimes
}
const (
errorDir = "error-dir"
delMarkerDir = ""
)
// Notes:
// There are 5 possible states a disk could be in,
// 1. __online__ - has the latest copy of xl.meta - returned by listOnlineDisks
@ -129,6 +138,7 @@ func listOnlineDisks(disks []StorageAPI, partsMetadata []FileInfo, errs []error)
dataDirs := make([]string, len(partsMetadata))
for idx, fi := range partsMetadata {
if errs[idx] != nil {
dataDirs[idx] = errorDir
continue
}
dataDirs[idx] = fi.DataDir
@ -150,9 +160,9 @@ func listOnlineDisks(disks []StorageAPI, partsMetadata []FileInfo, errs []error)
}
// Returns the latest updated FileInfo files and error in case of failure.
func getLatestFileInfo(ctx context.Context, partsMetadata []FileInfo, errs []error) (FileInfo, error) {
func getLatestFileInfo(ctx context.Context, partsMetadata []FileInfo, errs []error, quorum int) (FileInfo, error) {
// There should be at least half correct entries, if not return failure
if reducedErr := reduceReadQuorumErrs(ctx, errs, objectOpIgnoredErrs, len(partsMetadata)/2); reducedErr != nil {
if reducedErr := reduceReadQuorumErrs(ctx, errs, objectOpIgnoredErrs, quorum); reducedErr != nil {
return FileInfo{}, reducedErr
}
@ -181,7 +191,8 @@ func getLatestFileInfo(ctx context.Context, partsMetadata []FileInfo, errs []err
count++
}
}
if count < len(partsMetadata)/2 {
if count < quorum {
return FileInfo{}, errErasureReadQuorum
}

View File

@ -189,7 +189,7 @@ func TestListOnlineDisks(t *testing.T) {
}
partsMetadata, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "", false)
fi, err := getLatestFileInfo(ctx, partsMetadata, errs)
fi, err := getLatestFileInfo(ctx, partsMetadata, errs, getReadQuorum(len(disks)))
if err != nil {
t.Fatalf("Failed to getLatestFileInfo %v", err)
}
@ -364,7 +364,7 @@ func TestListOnlineDisksSmallObjects(t *testing.T) {
}
partsMetadata, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "", true)
fi, err := getLatestFileInfo(ctx, partsMetadata, errs)
fi, err := getLatestFileInfo(ctx, partsMetadata, errs, getReadQuorum(len(disks)))
if err != nil {
t.Fatalf("Failed to getLatestFileInfo %v", err)
}
@ -420,7 +420,7 @@ func TestListOnlineDisksSmallObjects(t *testing.T) {
}
partsMetadata, errs = readAllFileInfo(ctx, erasureDisks, bucket, object, "", true)
_, err = getLatestFileInfo(ctx, partsMetadata, errs)
_, err = getLatestFileInfo(ctx, partsMetadata, errs, len(disks)/2)
if err != nil {
t.Fatalf("Failed to getLatestFileInfo %v", err)
}

View File

@ -258,6 +258,10 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
// Re-read when we have lock...
partsMetadata, errs := readAllFileInfo(ctx, storageDisks, bucket, object, versionID, true)
if _, err = getLatestFileInfo(ctx, partsMetadata, errs, er.defaultParityCount); err != nil {
return er.purgeObjectDangling(ctx, bucket, object, versionID, partsMetadata, errs, []error{}, opts)
}
// List of disks having latest version of the object er.meta
// (by modtime).
latestDisks, modTime, dataDir := listOnlineDisks(storageDisks, partsMetadata, errs)
@ -853,8 +857,9 @@ func (er erasureObjects) HealObject(ctx context.Context, bucket, object, version
versionID = nullVersionID
}
partsMetadata, errs := readAllFileInfo(healCtx, storageDisks, bucket, object, versionID, false)
// Perform quick read without lock.
// This allows us to quickly check if all is ok or all are missing.
_, errs := readAllFileInfo(healCtx, storageDisks, bucket, object, versionID, false)
if isAllNotFound(errs) {
err = toObjectErr(errFileNotFound, bucket, object)
if versionID != "" {
@ -864,11 +869,6 @@ func (er erasureObjects) HealObject(ctx context.Context, bucket, object, version
return defaultHealResult(FileInfo{}, storageDisks, storageEndpoints, errs, bucket, object, versionID, er.defaultParityCount), err
}
_, err = getLatestFileInfo(healCtx, partsMetadata, errs)
if err != nil {
return er.purgeObjectDangling(healCtx, bucket, object, versionID, partsMetadata, errs, []error{}, opts)
}
// Heal the object.
return er.healObject(healCtx, bucket, object, versionID, opts)
}

View File

@ -215,7 +215,7 @@ func TestHealObjectCorrupted(t *testing.T) {
}
fileInfos, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "", false)
fi, err := getLatestFileInfo(ctx, fileInfos, errs)
fi, err := getLatestFileInfo(ctx, fileInfos, errs, er.defaultParityCount)
if err != nil {
t.Fatalf("Failed to getLatestFileInfo - %v", err)
}
@ -240,7 +240,7 @@ func TestHealObjectCorrupted(t *testing.T) {
}
fileInfos, errs = readAllFileInfo(ctx, erasureDisks, bucket, object, "", false)
nfi, err := getLatestFileInfo(ctx, fileInfos, errs)
nfi, err := getLatestFileInfo(ctx, fileInfos, errs, er.defaultParityCount)
if err != nil {
t.Fatalf("Failed to getLatestFileInfo - %v", err)
}
@ -266,7 +266,7 @@ func TestHealObjectCorrupted(t *testing.T) {
}
fileInfos, errs = readAllFileInfo(ctx, erasureDisks, bucket, object, "", false)
nfi, err = getLatestFileInfo(ctx, fileInfos, errs)
nfi, err = getLatestFileInfo(ctx, fileInfos, errs, er.defaultParityCount)
if err != nil {
t.Fatalf("Failed to getLatestFileInfo - %v", err)
}

View File

@ -317,7 +317,7 @@ func writeUniqueFileInfo(ctx context.Context, disks []StorageAPI, bucket, prefix
// writeQuorum is the min required disks to write data.
func objectQuorumFromMeta(ctx context.Context, partsMetaData []FileInfo, errs []error, defaultParityCount int) (objectReadQuorum, objectWriteQuorum int, err error) {
// get the latest updated Metadata and a count of all the latest updated FileInfo(s)
latestFileInfo, err := getLatestFileInfo(ctx, partsMetaData, errs)
latestFileInfo, err := getLatestFileInfo(ctx, partsMetaData, errs, defaultParityCount)
if err != nil {
return 0, 0, err
}

View File

@ -378,15 +378,25 @@ func (er erasureObjects) CopyObjectPart(ctx context.Context, srcBucket, srcObjec
//
// Implements S3 compatible Upload Part API.
func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID string, partID int, r *PutObjReader, opts ObjectOptions) (pi PartInfo, err error) {
uploadIDLock := er.NewNSLock(bucket, pathJoin(object, uploadID))
ctx, err = uploadIDLock.GetRLock(ctx, globalOperationTimeout)
// Write lock for this part ID.
// Held throughout the operation.
partIDLock := er.NewNSLock(bucket, pathJoin(object, uploadID, strconv.Itoa(partID)))
pctx, err := partIDLock.GetLock(ctx, globalOperationTimeout)
if err != nil {
return PartInfo{}, err
}
defer partIDLock.Unlock()
// Read lock for upload id.
// Only held while reading the upload metadata.
uploadIDRLock := er.NewNSLock(bucket, pathJoin(object, uploadID))
rctx, err := uploadIDRLock.GetRLock(ctx, globalOperationTimeout)
if err != nil {
return PartInfo{}, err
}
readLocked := true
defer func() {
if readLocked {
uploadIDLock.RUnlock()
if uploadIDRLock != nil {
uploadIDRLock.RUnlock()
}
}()
@ -402,23 +412,27 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
uploadIDPath := er.getUploadIDDir(bucket, object, uploadID)
// Validates if upload ID exists.
if err = er.checkUploadIDExists(ctx, bucket, object, uploadID); err != nil {
if err = er.checkUploadIDExists(rctx, bucket, object, uploadID); err != nil {
return pi, toObjectErr(err, bucket, object, uploadID)
}
storageDisks := er.getDisks()
// Read metadata associated with the object from all disks.
partsMetadata, errs = readAllFileInfo(ctx, storageDisks, minioMetaMultipartBucket,
partsMetadata, errs = readAllFileInfo(rctx, storageDisks, minioMetaMultipartBucket,
uploadIDPath, "", false)
// Unlock upload id locks before, so others can get it.
uploadIDRLock.RUnlock()
uploadIDRLock = nil
// get Quorum for this object
_, writeQuorum, err := objectQuorumFromMeta(ctx, partsMetadata, errs, er.defaultParityCount)
_, writeQuorum, err := objectQuorumFromMeta(pctx, partsMetadata, errs, er.defaultParityCount)
if err != nil {
return pi, toObjectErr(err, bucket, object)
}
reducedErr := reduceWriteQuorumErrs(ctx, errs, objectOpIgnoredErrs, writeQuorum)
reducedErr := reduceWriteQuorumErrs(pctx, errs, objectOpIgnoredErrs, writeQuorum)
if reducedErr == errErasureWriteQuorum {
return pi, toObjectErr(reducedErr, bucket, object)
}
@ -427,7 +441,7 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
onlineDisks, modTime, dataDir := listOnlineDisks(storageDisks, partsMetadata, errs)
// Pick one from the first valid metadata.
fi, err := pickValidFileInfo(ctx, partsMetadata, modTime, dataDir, writeQuorum)
fi, err := pickValidFileInfo(pctx, partsMetadata, modTime, dataDir, writeQuorum)
if err != nil {
return pi, err
}
@ -449,7 +463,7 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
}
}()
erasure, err := NewErasure(ctx, fi.Erasure.DataBlocks, fi.Erasure.ParityBlocks, fi.Erasure.BlockSize)
erasure, err := NewErasure(pctx, fi.Erasure.DataBlocks, fi.Erasure.ParityBlocks, fi.Erasure.BlockSize)
if err != nil {
return pi, toObjectErr(err, bucket, object)
}
@ -486,7 +500,7 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
erasure.ShardFileSize(data.Size()), DefaultBitrotAlgorithm, erasure.ShardSize(), false)
}
n, err := erasure.Encode(ctx, data, writers, buffer, writeQuorum)
n, err := erasure.Encode(pctx, data, writers, buffer, writeQuorum)
closeBitrotWriters(writers)
if err != nil {
return pi, toObjectErr(err, bucket, object)
@ -504,31 +518,29 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
}
}
// Unlock here before acquiring write locks all concurrent
// PutObjectParts would serialize here updating `xl.meta`
uploadIDLock.RUnlock()
readLocked = false
ctx, err = uploadIDLock.GetLock(ctx, globalOperationTimeout)
// Acquire write lock to update metadata.
uploadIDWLock := er.NewNSLock(bucket, pathJoin(object, uploadID))
wctx, err := uploadIDWLock.GetLock(pctx, globalOperationTimeout)
if err != nil {
return PartInfo{}, err
}
defer uploadIDLock.Unlock()
defer uploadIDWLock.Unlock()
// Validates if upload ID exists.
if err = er.checkUploadIDExists(ctx, bucket, object, uploadID); err != nil {
if err = er.checkUploadIDExists(wctx, bucket, object, uploadID); err != nil {
return pi, toObjectErr(err, bucket, object, uploadID)
}
// Rename temporary part file to its final location.
partPath := pathJoin(uploadIDPath, fi.DataDir, partSuffix)
onlineDisks, err = rename(ctx, onlineDisks, minioMetaTmpBucket, tmpPartPath, minioMetaMultipartBucket, partPath, false, writeQuorum, nil)
onlineDisks, err = rename(wctx, onlineDisks, minioMetaTmpBucket, tmpPartPath, minioMetaMultipartBucket, partPath, false, writeQuorum, nil)
if err != nil {
return pi, toObjectErr(err, minioMetaMultipartBucket, partPath)
}
// Read metadata again because it might be updated with parallel upload of another part.
partsMetadata, errs = readAllFileInfo(ctx, onlineDisks, minioMetaMultipartBucket, uploadIDPath, "", false)
reducedErr = reduceWriteQuorumErrs(ctx, errs, objectOpIgnoredErrs, writeQuorum)
partsMetadata, errs = readAllFileInfo(wctx, onlineDisks, minioMetaMultipartBucket, uploadIDPath, "", false)
reducedErr = reduceWriteQuorumErrs(wctx, errs, objectOpIgnoredErrs, writeQuorum)
if reducedErr == errErasureWriteQuorum {
return pi, toObjectErr(reducedErr, bucket, object)
}
@ -537,7 +549,7 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
onlineDisks, modTime, dataDir = listOnlineDisks(onlineDisks, partsMetadata, errs)
// Pick one from the first valid metadata.
fi, err = pickValidFileInfo(ctx, partsMetadata, modTime, dataDir, writeQuorum)
fi, err = pickValidFileInfo(wctx, partsMetadata, modTime, dataDir, writeQuorum)
if err != nil {
return pi, err
}
@ -565,7 +577,7 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
}
// Writes update `xl.meta` format for each disk.
if _, err = writeUniqueFileInfo(ctx, onlineDisks, minioMetaMultipartBucket, uploadIDPath, partsMetadata, writeQuorum); err != nil {
if _, err = writeUniqueFileInfo(wctx, onlineDisks, minioMetaMultipartBucket, uploadIDPath, partsMetadata, writeQuorum); err != nil {
return pi, toObjectErr(err, minioMetaMultipartBucket, uploadIDPath)
}

View File

@ -203,36 +203,6 @@ func (er erasureObjects) GetObjectNInfo(ctx context.Context, bucket, object stri
return fn(pr, h, opts.CheckPrecondFn, pipeCloser)
}
// GetObject - reads an object erasured coded across multiple
// disks. Supports additional parameters like offset and length
// which are synonymous with HTTP Range requests.
//
// startOffset indicates the starting read location of the object.
// length indicates the total length of the object.
func (er erasureObjects) GetObject(ctx context.Context, bucket, object string, startOffset int64, length int64, writer io.Writer, etag string, opts ObjectOptions) (err error) {
// Lock the object before reading.
lk := er.NewNSLock(bucket, object)
ctx, err = lk.GetRLock(ctx, globalOperationTimeout)
if err != nil {
return err
}
defer lk.RUnlock()
// Start offset cannot be negative.
if startOffset < 0 {
logger.LogIf(ctx, errUnexpected, logger.Application)
return errUnexpected
}
// Writer cannot be nil.
if writer == nil {
logger.LogIf(ctx, errUnexpected)
return errUnexpected
}
return er.getObject(ctx, bucket, object, startOffset, length, writer, opts)
}
func (er erasureObjects) getObjectWithFileInfo(ctx context.Context, bucket, object string, startOffset int64, length int64, writer io.Writer, fi FileInfo, metaArr []FileInfo, onlineDisks []StorageAPI) error {
// Reorder online disks based on erasure distribution order.
// Reorder parts metadata based on erasure distribution order.
@ -272,6 +242,27 @@ func (er erasureObjects) getObjectWithFileInfo(ctx context.Context, bucket, obje
if err != nil {
return toObjectErr(err, bucket, object)
}
// This hack is needed to avoid a bug found when overwriting
// the inline data object with an un-inlined version, for the
// time being we need this as we have released inline-data
// version already and this bug is already present in newer
// releases.
//
// This mainly happens with objects < smallFileThreshold when
// they are overwritten with un-inlined objects >= smallFileThreshold,
// due to a bug in RenameData() the fi.Data is not niled leading to
// GetObject thinking that fi.Data is valid while fi.Size has
// changed already.
if _, ok := fi.Metadata[ReservedMetadataPrefixLower+"inline-data"]; ok {
shardFileSize := erasure.ShardFileSize(fi.Size)
if shardFileSize >= 0 && shardFileSize >= smallFileThreshold {
for i := range metaArr {
metaArr[i].Data = nil
}
}
}
var healOnce sync.Once
for ; partIndex <= lastPartIndex; partIndex++ {
@ -355,23 +346,6 @@ func (er erasureObjects) getObjectWithFileInfo(ctx context.Context, bucket, obje
return nil
}
// getObject wrapper for erasure GetObject
func (er erasureObjects) getObject(ctx context.Context, bucket, object string, startOffset, length int64, writer io.Writer, opts ObjectOptions) error {
fi, metaArr, onlineDisks, err := er.getObjectFileInfo(ctx, bucket, object, opts, true)
if err != nil {
return toObjectErr(err, bucket, object)
}
if fi.Deleted {
if opts.VersionID == "" {
return toObjectErr(errFileNotFound, bucket, object)
}
// Make sure to return object info to provide extra information.
return toObjectErr(errMethodNotAllowed, bucket, object)
}
return er.getObjectWithFileInfo(ctx, bucket, object, startOffset, length, writer, fi, metaArr, onlineDisks)
}
// GetObjectInfo - reads object metadata and replies back ObjectInfo.
func (er erasureObjects) GetObjectInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (info ObjectInfo, err error) {
if !opts.NoLock {
@ -787,6 +761,13 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st
partsMetadata[index].ModTime = modTime
}
if len(inlineBuffers) > 0 {
// Set an additional header when data is inlined.
for index := range partsMetadata {
partsMetadata[index].Metadata[ReservedMetadataPrefixLower+"inline-data"] = "true"
}
}
// Rename the successfully written temporary object to final location.
if onlineDisks, err = renameData(ctx, onlineDisks, minioMetaTmpBucket, tempObj, partsMetadata, bucket, object, writeQuorum); err != nil {
logger.LogIf(ctx, err)

View File

@ -322,9 +322,18 @@ func TestGetObjectNoQuorum(t *testing.T) {
}
}
err = xl.GetObject(ctx, bucket, object, 0, int64(len(buf)), ioutil.Discard, "", opts)
if err != toObjectErr(errFileNotFound, bucket, object) {
t.Errorf("Expected GetObject to fail with %v, but failed with %v", toObjectErr(errErasureWriteQuorum, bucket, object), err)
gr, err := xl.GetObjectNInfo(ctx, bucket, object, nil, nil, readLock, opts)
if err != nil {
if err != toObjectErr(errErasureReadQuorum, bucket, object) {
t.Errorf("Expected GetObject to fail with %v, but failed with %v", toObjectErr(errErasureReadQuorum, bucket, object), err)
}
}
if gr != nil {
_, err = io.Copy(ioutil.Discard, gr)
if err != toObjectErr(errErasureReadQuorum, bucket, object) {
t.Errorf("Expected GetObject to fail with %v, but failed with %v", toObjectErr(errErasureReadQuorum, bucket, object), err)
}
gr.Close()
}
// Test use case 2: Make 9 disks offline, which leaves less than quorum number of disks
@ -358,9 +367,18 @@ func TestGetObjectNoQuorum(t *testing.T) {
}
z.serverPools[0].erasureDisksMu.Unlock()
// Fetch object from store.
err = xl.GetObject(ctx, bucket, object, 0, int64(len("abcd")), ioutil.Discard, "", opts)
if err != toObjectErr(errErasureReadQuorum, bucket, object) {
t.Errorf("Expected GetObject to fail with %v, but failed with %v", toObjectErr(errErasureWriteQuorum, bucket, object), err)
gr, err := xl.GetObjectNInfo(ctx, bucket, object, nil, nil, readLock, opts)
if err != nil {
if err != toObjectErr(errErasureReadQuorum, bucket, object) {
t.Errorf("Expected GetObject to fail with %v, but failed with %v", toObjectErr(errErasureReadQuorum, bucket, object), err)
}
}
if gr != nil {
_, err = io.Copy(ioutil.Discard, gr)
if err != toObjectErr(errErasureReadQuorum, bucket, object) {
t.Errorf("Expected GetObject to fail with %v, but failed with %v", toObjectErr(errErasureReadQuorum, bucket, object), err)
}
gr.Close()
}
}

View File

@ -147,6 +147,38 @@ func (z *erasureServerPools) GetDisksID(ids ...string) []StorageAPI {
return res
}
// GetRawData will return all files with a given raw path to the callback.
// For now only direct paths are supported.
func (z *erasureServerPools) GetRawData(ctx context.Context, volume, file string, fn func(r io.Reader, host string, disk string, filename string, size int64, modtime time.Time) error) error {
for _, s := range z.serverPools {
for _, disks := range s.erasureDisks {
for i, disk := range disks {
if disk == OfflineDisk {
continue
}
si, err := disk.StatInfoFile(ctx, volume, file)
if err != nil {
continue
}
r, err := disk.ReadFileStream(ctx, volume, file, 0, si.Size)
if err != nil {
continue
}
defer r.Close()
did, err := disk.GetDiskID()
if err != nil {
did = fmt.Sprintf("disk-%d", i)
}
err = fn(r, disk.Hostname(), did, pathJoin(volume, file), si.Size, si.ModTime)
if err != nil {
return err
}
}
}
}
return nil
}
func (z *erasureServerPools) SetDriveCounts() []int {
setDriveCounts := make([]int, len(z.serverPools))
for i := range z.serverPools {
@ -244,6 +276,13 @@ func (z *erasureServerPools) getServerPoolsAvailableSpace(ctx context.Context, s
return serverPools
}
// poolObjInfo represents the state of an object per pool
type poolObjInfo struct {
PoolIndex int
ObjInfo ObjectInfo
Err error
}
// getPoolIdxExisting returns the (first) found object pool index containing an object.
// If the object exists, but the latest version is a delete marker, the index with it is still returned.
// If the object does not exist ObjectNotFound error is returned.
@ -254,35 +293,49 @@ func (z *erasureServerPools) getPoolIdxExisting(ctx context.Context, bucket, obj
return 0, nil
}
errs := make([]error, len(z.serverPools))
objInfos := make([]ObjectInfo, len(z.serverPools))
poolObjInfos := make([]poolObjInfo, len(z.serverPools))
var wg sync.WaitGroup
for i, pool := range z.serverPools {
wg.Add(1)
go func(i int, pool *erasureSets) {
defer wg.Done()
objInfos[i], errs[i] = pool.GetObjectInfo(ctx, bucket, object, ObjectOptions{})
// remember the pool index; we may sort the slice and the original index might be lost.
pinfo := poolObjInfo{
PoolIndex: i,
}
pinfo.ObjInfo, pinfo.Err = pool.GetObjectInfo(ctx, bucket, object, ObjectOptions{})
poolObjInfos[i] = pinfo
}(i, pool)
}
wg.Wait()
for i, err := range errs {
if err == nil {
return i, nil
// Sort the objInfos such that we always serve latest
// this is a defensive change to handle any duplicate
// content that may have been created, we always serve
// the latest object.
sort.Slice(poolObjInfos, func(i, j int) bool {
mtime1 := poolObjInfos[i].ObjInfo.ModTime
mtime2 := poolObjInfos[j].ObjInfo.ModTime
return mtime1.After(mtime2)
})
for _, pinfo := range poolObjInfos {
if pinfo.Err != nil && !isErrObjectNotFound(pinfo.Err) {
return -1, pinfo.Err
}
if isErrObjectNotFound(err) {
if isErrObjectNotFound(pinfo.Err) {
// No object exists or its a delete marker,
// check objInfo to confirm.
if objInfos[i].DeleteMarker && objInfos[i].Name != "" {
return i, nil
if pinfo.ObjInfo.DeleteMarker && pinfo.ObjInfo.Name != "" {
return pinfo.PoolIndex, nil
}
// objInfo is not valid, truly the object doesn't
// exist, proceed to the next pool.
continue
}
return -1, err
return pinfo.PoolIndex, nil
}
return -1, toObjectErr(errFileNotFound, bucket, object)
@ -629,45 +682,86 @@ func (z *erasureServerPools) GetObjectNInfo(ctx context.Context, bucket, object
unlockOnDefer = true
}
errs := make([]error, len(z.serverPools))
grs := make([]*GetObjectReader, len(z.serverPools))
lockType = noLock // do not take locks at lower levels
checkPrecondFn := opts.CheckPrecondFn
opts.CheckPrecondFn = nil
results := make([]struct {
zIdx int
gr *GetObjectReader
err error
}, len(z.serverPools))
var wg sync.WaitGroup
for i, pool := range z.serverPools {
wg.Add(1)
go func(i int, pool *erasureSets) {
defer wg.Done()
grs[i], errs[i] = pool.GetObjectNInfo(ctx, bucket, object, rs, h, lockType, opts)
results[i].zIdx = i
results[i].gr, results[i].err = pool.GetObjectNInfo(ctx, bucket, object, rs, h, lockType, opts)
}(i, pool)
}
wg.Wait()
var found int = -1
for i, err := range errs {
if err == nil {
// Sort the objInfos such that we always serve latest
// this is a defensive change to handle any duplicate
// content that may have been created, we always serve
// the latest object.
sort.Slice(results, func(i, j int) bool {
var mtimeI, mtimeJ time.Time
if results[i].gr != nil {
mtimeI = results[i].gr.ObjInfo.ModTime
}
if results[j].gr != nil {
mtimeJ = results[j].gr.ObjInfo.ModTime
}
// On tiebreaks, choose the earliest.
if mtimeI.Equal(mtimeJ) {
return results[i].zIdx < results[j].zIdx
}
return mtimeI.After(mtimeJ)
})
var found = -1
for i, res := range results {
if res.err == nil {
// Select earliest result
found = i
break
}
if !isErrObjectNotFound(err) && !isErrVersionNotFound(err) {
for _, grr := range grs {
if grr != nil {
grr.Close()
}
if !isErrObjectNotFound(res.err) && !isErrVersionNotFound(res.err) {
for _, res := range results {
res.gr.Close()
}
return gr, err
return nil, res.err
}
}
if found >= 0 {
return grs[found], nil
if found < 0 {
object = decodeDirObject(object)
if opts.VersionID != "" {
return gr, VersionNotFound{Bucket: bucket, Object: object, VersionID: opts.VersionID}
}
return gr, ObjectNotFound{Bucket: bucket, Object: object}
}
object = decodeDirObject(object)
if opts.VersionID != "" {
return gr, VersionNotFound{Bucket: bucket, Object: object, VersionID: opts.VersionID}
// Check preconditions.
if checkPrecondFn != nil && checkPrecondFn(results[found].gr.ObjInfo) {
for _, res := range results {
res.gr.Close()
}
return nil, PreConditionFailed{}
}
return gr, ObjectNotFound{Bucket: bucket, Object: object}
// Close all others
for i, res := range results {
if i == found {
continue
}
res.gr.Close()
}
return results[found].gr, nil
}
func (z *erasureServerPools) GetObjectInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, err error) {
@ -689,37 +783,48 @@ func (z *erasureServerPools) GetObjectInfo(ctx context.Context, bucket, object s
}
defer lk.RUnlock()
errs := make([]error, len(z.serverPools))
objInfos := make([]ObjectInfo, len(z.serverPools))
results := make([]struct {
zIdx int
oi ObjectInfo
err error
}, len(z.serverPools))
opts.NoLock = true // avoid taking locks at lower levels for multi-pool setups.
var wg sync.WaitGroup
for i, pool := range z.serverPools {
wg.Add(1)
go func(i int, pool *erasureSets) {
defer wg.Done()
objInfos[i], errs[i] = pool.GetObjectInfo(ctx, bucket, object, opts)
results[i].zIdx = i
results[i].oi, results[i].err = pool.GetObjectInfo(ctx, bucket, object, opts)
}(i, pool)
}
wg.Wait()
var found int = -1
for i, err := range errs {
// Sort the objInfos such that we always serve latest
// this is a defensive change to handle any duplicate
// content that may have been created, we always serve
// the latest object.
sort.Slice(results, func(i, j int) bool {
a, b := results[i], results[j]
if a.oi.ModTime.Equal(b.oi.ModTime) {
// On tiebreak, select the lowest zone index.
return a.zIdx < b.zIdx
}
return a.oi.ModTime.After(b.oi.ModTime)
})
for _, res := range results {
// Return first found.
err := res.err
if err == nil {
found = i
break
return res.oi, nil
}
if !isErrObjectNotFound(err) && !isErrVersionNotFound(err) {
// some errors such as MethodNotAllowed for delete marker
// should be returned upwards.
return objInfos[i], err
return res.oi, err
}
}
if found >= 0 {
return objInfos[found], nil
}
object = decodeDirObject(object)
if opts.VersionID != "" {
return objInfo, VersionNotFound{Bucket: bucket, Object: object, VersionID: opts.VersionID}

View File

@ -19,11 +19,13 @@ package cmd
import (
"context"
"encoding/binary"
"encoding/json"
"errors"
"fmt"
"hash/crc32"
"math/rand"
"net/http"
"path"
"sort"
"sync"
"time"
@ -499,6 +501,21 @@ type auditObjectOp struct {
Disks []string `json:"disks"`
}
type auditObjectErasureMap struct {
sync.Map
}
// Define how to marshal auditObjectErasureMap so it can be
// printed in the audit webhook notification request.
func (a *auditObjectErasureMap) MarshalJSON() ([]byte, error) {
mapCopy := make(map[string]auditObjectOp)
a.Range(func(k, v interface{}) bool {
mapCopy[k.(string)] = v.(auditObjectOp)
return true
})
return json.Marshal(mapCopy)
}
func auditObjectErasureSet(ctx context.Context, object string, set *erasureObjects) {
if len(logger.AuditTargets) == 0 {
return
@ -512,20 +529,20 @@ func auditObjectErasureSet(ctx context.Context, object string, set *erasureObjec
Disks: set.getEndpoints(),
}
var objectErasureSetTag map[string]auditObjectOp
var objectErasureSetTag *auditObjectErasureMap
reqInfo := logger.GetReqInfo(ctx)
for _, kv := range reqInfo.GetTags() {
if kv.Key == objectErasureMapKey {
objectErasureSetTag = kv.Val.(map[string]auditObjectOp)
objectErasureSetTag = kv.Val.(*auditObjectErasureMap)
break
}
}
if objectErasureSetTag == nil {
objectErasureSetTag = make(map[string]auditObjectOp)
objectErasureSetTag = &auditObjectErasureMap{}
}
objectErasureSetTag[object] = op
objectErasureSetTag.Store(object, op)
reqInfo.SetTags(objectErasureMapKey, objectErasureSetTag)
}
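The point of the wrapper above: concurrent goroutines may record audit entries on the same request, and writes to a plain Go map race. A tiny standalone illustration (hypothetical types, outside the MinIO codebase) of the same sync.Map-plus-MarshalJSON pattern:

package main

import (
	"encoding/json"
	"fmt"
	"sync"
)

type opMap struct{ sync.Map }

// MarshalJSON copies the sync.Map into a plain map so it can be
// serialized, mirroring auditObjectErasureMap above.
func (m *opMap) MarshalJSON() ([]byte, error) {
	cp := make(map[string]string)
	m.Range(func(k, v interface{}) bool {
		cp[k.(string)] = v.(string)
		return true
	})
	return json.Marshal(cp)
}

func main() {
	var m opMap
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			// Safe under concurrency; a plain map here would race.
			m.Store(fmt.Sprintf("object-%d", i), "GetObject")
		}(i)
	}
	wg.Wait()
	b, _ := json.Marshal(&m)
	fmt.Println(string(b))
}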
@ -863,10 +880,19 @@ func (s *erasureSets) GetObjectNInfo(ctx context.Context, bucket, object string,
}
func (s *erasureSets) parentDirIsObject(ctx context.Context, bucket, parent string) bool {
if parent == "." {
return false
var isParentDirObject func(string) bool
isParentDirObject = func(p string) bool {
if p == "." || p == SlashSeparator {
return false
}
if s.getHashedSet(p).parentDirIsObject(ctx, bucket, p) {
// If there is already a file at prefix "p", return true.
return true
}
// Check if there is a file as one of the parent paths.
return isParentDirObject(path.Dir(p))
}
return s.getHashedSet(parent).parentDirIsObject(ctx, bucket, parent)
return isParentDirObject(parent)
}
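For intuition: given parent "a/b/c", the recursion above checks "a/b/c", then "a/b", then "a", returning true as soon as any prefix is an object. The traversal order in isolation:

package main

import (
	"fmt"
	"path"
)

func main() {
	for p := "a/b/c"; p != "." && p != "/"; p = path.Dir(p) {
		fmt.Println(p) // prints a/b/c, a/b, a
	}
}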
// PutObject - writes an object to hashedSet based on the object name.

View File

@ -26,6 +26,7 @@ import (
"os"
"os/user"
"path"
"path/filepath"
"sort"
"strings"
"sync"
@ -1607,3 +1608,16 @@ func (fs *FSObjects) ReadHealth(ctx context.Context) bool {
_, err := os.Stat(fs.fsPath)
return err == nil
}
func (fs *FSObjects) GetRawData(ctx context.Context, volume, file string, fn func(r io.Reader, host string, disk string, filename string, size int64, modtime time.Time) error) error {
f, err := os.Open(filepath.Join(fs.fsPath, volume, file))
if err != nil {
return nil
}
defer f.Close()
st, err := f.Stat()
if err != nil || st.IsDir() {
return nil
}
return fn(f, "fs", fs.fsUUID, file, st.Size(), st.ModTime())
}

View File

@ -380,7 +380,7 @@ func (l *s3Objects) ListObjects(ctx context.Context, bucket string, prefix strin
// ListObjectsV2 lists all blobs in S3 bucket filtered by prefix
func (l *s3Objects) ListObjectsV2(ctx context.Context, bucket, prefix, continuationToken, delimiter string, maxKeys int, fetchOwner bool, startAfter string) (loi minio.ListObjectsV2Info, e error) {
result, err := l.Client.ListObjectsV2(bucket, prefix, continuationToken, fetchOwner, delimiter, maxKeys)
result, err := l.Client.ListObjectsV2(bucket, prefix, startAfter, continuationToken, delimiter, maxKeys)
if err != nil {
return loi, minio.ErrorRespToObjectError(err, bucket)
}
@ -670,7 +670,7 @@ func (l *s3Objects) AbortMultipartUpload(ctx context.Context, bucket string, obj
// CompleteMultipartUpload completes ongoing multipart upload and finalizes object
func (l *s3Objects) CompleteMultipartUpload(ctx context.Context, bucket string, object string, uploadID string, uploadedParts []minio.CompletePart, opts minio.ObjectOptions) (oi minio.ObjectInfo, e error) {
etag, err := l.Client.CompleteMultipartUpload(ctx, bucket, object, uploadID, minio.ToMinioClientCompleteParts(uploadedParts))
etag, err := l.Client.CompleteMultipartUpload(ctx, bucket, object, uploadID, minio.ToMinioClientCompleteParts(uploadedParts), miniogo.PutObjectOptions{})
if err != nil {
return oi, minio.ErrorRespToObjectError(err, bucket, object)
}

View File

@ -188,11 +188,16 @@ func (z *erasureServerPools) listPath(ctx context.Context, o listPathOptions) (e
if err != nil {
return true
}
oFIV, err := existing.fileInfo(o.Bucket)
oFIV, err := other.fileInfo(o.Bucket)
if err != nil {
return false
}
return oFIV.ModTime.After(eFIV.ModTime)
// Replace if modtime is newer
if !oFIV.ModTime.Equal(eFIV.ModTime) {
return oFIV.ModTime.After(eFIV.ModTime)
}
// Use NumVersions as a final tiebreaker.
return oFIV.NumVersions > eFIV.NumVersions
})
if entries.len() > o.Limit {
allAtEOF = false

View File

@ -27,6 +27,7 @@ import (
"strings"
"github.com/gorilla/mux"
xhttp "github.com/minio/minio/cmd/http"
"github.com/minio/minio/cmd/logger"
xioutil "github.com/minio/minio/pkg/ioutil"
)
@ -293,6 +294,7 @@ func (client *storageRESTClient) WalkDir(ctx context.Context, opts WalkDirOption
logger.LogIf(ctx, err)
return err
}
defer xhttp.DrainBody(respBody)
return waitForHTTPStream(respBody, wr)
}

View File

@ -285,3 +285,10 @@ func (d *naughtyDisk) VerifyFile(ctx context.Context, volume, path string, fi Fi
}
return d.disk.VerifyFile(ctx, volume, path, fi)
}
func (d *naughtyDisk) StatInfoFile(ctx context.Context, volume, path string) (stat StatInfo, err error) {
if err := d.calcError(); err != nil {
return stat, err
}
return d.disk.StatInfoFile(ctx, volume, path)
}

View File

@ -805,6 +805,9 @@ func NewGetObjectReader(rs *HTTPRangeSpec, oi ObjectInfo, opts ObjectOptions, cl
// Close - calls the cleanup actions in reverse order
func (g *GetObjectReader) Close() error {
if g == nil {
return nil
}
// sync.Once is used here to ensure that Close() is
// idempotent.
g.once.Do(func() {

View File

@ -386,6 +386,12 @@ func (api objectAPIHandlers) GetObjectHandler(w http.ResponseWriter, r *http.Req
var rangeErr error
rangeHeader := r.Header.Get(xhttp.Range)
if rangeHeader != "" {
// Both 'Range' and 'partNumber' cannot be specified at the same time
if opts.PartNumber > 0 {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidRangePartNumber), r.URL, guessIsBrowserReq(r))
return
}
rs, rangeErr = parseRequestRangeSpec(rangeHeader)
// Handle only errInvalidRange. Ignore other
// parse error and treat it as regular Get
@ -399,12 +405,6 @@ func (api objectAPIHandlers) GetObjectHandler(w http.ResponseWriter, r *http.Req
}
}
// Both 'bytes' and 'partNumber' cannot be specified at the same time
if rs != nil && opts.PartNumber > 0 {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidRangePartNumber), r.URL, guessIsBrowserReq(r))
return
}
// Validate pre-conditions if any.
opts.CheckPrecondFn = func(oi ObjectInfo) bool {
if objectAPI.IsEncryptionSupported() {
@ -680,6 +680,12 @@ func (api objectAPIHandlers) HeadObjectHandler(w http.ResponseWriter, r *http.Re
var rs *HTTPRangeSpec
rangeHeader := r.Header.Get(xhttp.Range)
if rangeHeader != "" {
// Both 'Range' and 'partNumber' cannot be specified at the same time
if opts.PartNumber > 0 {
writeErrorResponseHeadersOnly(w, errorCodes.ToAPIErr(ErrInvalidRangePartNumber))
return
}
if rs, err = parseRequestRangeSpec(rangeHeader); err != nil {
// Handle only errInvalidRange. Ignore other
// parse error and treat it as regular Get
@ -693,12 +699,6 @@ func (api objectAPIHandlers) HeadObjectHandler(w http.ResponseWriter, r *http.Re
}
}
// Both 'bytes' and 'partNumber' cannot be specified at the same time
if rs != nil && opts.PartNumber > 0 {
writeErrorResponseHeadersOnly(w, errorCodes.ToAPIErr(ErrInvalidRangePartNumber))
return
}
// Set encryption response headers
if objectAPI.IsEncryptionSupported() {
switch kind, _ := crypto.IsEncrypted(objInfo.UserDefined); kind {
@ -1683,6 +1683,10 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req
if replicate, sync := mustReplicate(ctx, r, bucket, object, metadata, ""); replicate {
scheduleReplication(ctx, objInfo.Clone(), objectAPI, sync, replication.ObjectReplicationType)
}
if _, ok := r.Header[xhttp.MinIOSourceReplicationRequest]; ok {
size, _ := objInfo.GetActualSize()
defer globalReplicationStats.Update(bucket, size, replication.Replica, replication.StatusType(""), replication.ObjectReplicationType)
}
setPutObjHeaders(w, objInfo, false)
writeSuccessResponseHeadersOnly(w)
@ -1974,9 +1978,11 @@ func (api objectAPIHandlers) PutObjectExtractHandler(w http.ResponseWriter, r *h
if replicate, sync := mustReplicate(ctx, r, bucket, object, metadata, ""); replicate {
scheduleReplication(ctx, objInfo.Clone(), objectAPI, sync, replication.ObjectReplicationType)
}
if _, ok := r.Header[xhttp.MinIOSourceReplicationRequest]; ok {
size, _ := objInfo.GetActualSize()
defer globalReplicationStats.Update(bucket, size, replication.Replica, replication.StatusType(""), replication.ObjectReplicationType)
}
}
untar(hreader, putObjectTar)
w.Header()[xhttp.ETag] = []string{`"` + hex.EncodeToString(hreader.MD5Current()) + `"`}
@ -3081,6 +3087,10 @@ func (api objectAPIHandlers) CompleteMultipartUploadHandler(w http.ResponseWrite
if replicate, sync := mustReplicate(ctx, r, bucket, object, objInfo.UserDefined, objInfo.ReplicationStatus.String()); replicate {
scheduleReplication(ctx, objInfo.Clone(), objectAPI, sync, replication.ObjectReplicationType)
}
if _, ok := r.Header[xhttp.MinIOSourceReplicationRequest]; ok {
size, _ := objInfo.GetActualSize()
defer globalReplicationStats.Update(bucket, size, replication.Replica, replication.StatusType(""), replication.ObjectReplicationType)
}
// Write success response.
writeSuccessResponseXML(w, encodedSuccessResponse)

View File

@ -127,6 +127,7 @@ func (c *Client) Call(ctx context.Context, method string, values url.Values, bod
}
req.Header.Set("Authorization", "Bearer "+c.newAuthToken(req.URL.RawQuery))
req.Header.Set("X-Minio-Time", time.Now().UTC().Format(time.RFC3339))
req.Header.Set("Expect", "100-continue")
if length > 0 {
req.ContentLength = length
}

View File

@ -2427,7 +2427,7 @@ func (s *TestSuiteCommon) TestObjectMultipartListError(c *check) {
c.Assert(err, nil)
// Since the max-keys parameter in the ListMultipart request is set to an invalid value of -2,
// it's expected to fail with error message "InvalidArgument".
verifyError(c, response4, "InvalidArgument", "Argument max-parts must be an integer between 0 and 2147483647", http.StatusBadRequest)
verifyError(c, response4, "InvalidArgument", "Part number must be an integer between 1 and 10000, inclusive", http.StatusBadRequest)
}
// TestObjectValidMD5 - First uploads an object with a valid Content-Md5 header and verifies the status,

View File

@ -70,6 +70,7 @@ type StorageAPI interface {
CheckFile(ctx context.Context, volume string, path string) (err error)
Delete(ctx context.Context, volume string, path string, recursive bool) (err error)
VerifyFile(ctx context.Context, volume, path string, fi FileInfo) error
StatInfoFile(ctx context.Context, volume, path string) (stat StatInfo, err error)
// Write all data, syncs the data to disk.
// Should be used for smaller payloads.

View File

@ -31,7 +31,6 @@ import (
"sync"
"time"
"github.com/minio/minio/cmd/http"
xhttp "github.com/minio/minio/cmd/http"
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/cmd/rest"
@ -207,7 +206,7 @@ func (client *storageRESTClient) NSScanner(ctx context.Context, cache dataUsageC
pw.CloseWithError(cache.serializeTo(pw))
}()
respBody, err := client.call(ctx, storageRESTMethodNSScanner, url.Values{}, pr, -1)
defer http.DrainBody(respBody)
defer xhttp.DrainBody(respBody)
if err != nil {
pr.Close()
return cache, err
@ -247,7 +246,7 @@ func (client *storageRESTClient) DiskInfo(ctx context.Context) (info DiskInfo, e
if err != nil {
return info, err
}
defer http.DrainBody(respBody)
defer xhttp.DrainBody(respBody)
if err = msgp.Decode(respBody, &info); err != nil {
return info, err
}
@ -270,7 +269,7 @@ func (client *storageRESTClient) MakeVolBulk(ctx context.Context, volumes ...str
values := make(url.Values)
values.Set(storageRESTVolumes, strings.Join(volumes, ","))
respBody, err := client.call(ctx, storageRESTMethodMakeVolBulk, values, nil, -1)
defer http.DrainBody(respBody)
defer xhttp.DrainBody(respBody)
return err
}
@@ -279,7 +278,7 @@ func (client *storageRESTClient) MakeVol(ctx context.Context, volume string) (er
values := make(url.Values)
values.Set(storageRESTVolume, volume)
respBody, err := client.call(ctx, storageRESTMethodMakeVol, values, nil, -1)
-defer http.DrainBody(respBody)
+defer xhttp.DrainBody(respBody)
return err
}
@@ -289,7 +288,7 @@ func (client *storageRESTClient) ListVols(ctx context.Context) (vols []VolInfo,
if err != nil {
return
}
-defer http.DrainBody(respBody)
+defer xhttp.DrainBody(respBody)
vinfos := VolsInfo(vols)
err = msgp.Decode(respBody, &vinfos)
return vinfos, err
@@ -303,7 +302,7 @@ func (client *storageRESTClient) StatVol(ctx context.Context, volume string) (vo
if err != nil {
return
}
-defer http.DrainBody(respBody)
+defer xhttp.DrainBody(respBody)
err = msgp.Decode(respBody, &vol)
return vol, err
}
@@ -316,7 +315,7 @@ func (client *storageRESTClient) DeleteVol(ctx context.Context, volume string, f
values.Set(storageRESTForceDelete, "true")
}
respBody, err := client.call(ctx, storageRESTMethodDeleteVol, values, nil, -1)
-defer http.DrainBody(respBody)
+defer xhttp.DrainBody(respBody)
return err
}
@@ -327,7 +326,7 @@ func (client *storageRESTClient) AppendFile(ctx context.Context, volume string,
values.Set(storageRESTFilePath, path)
reader := bytes.NewReader(buf)
respBody, err := client.call(ctx, storageRESTMethodAppendFile, values, reader, -1)
-defer http.DrainBody(respBody)
+defer xhttp.DrainBody(respBody)
return err
}
@@ -337,11 +336,7 @@ func (client *storageRESTClient) CreateFile(ctx context.Context, volume, path st
values.Set(storageRESTFilePath, path)
values.Set(storageRESTLength, strconv.Itoa(int(size)))
respBody, err := client.call(ctx, storageRESTMethodCreateFile, values, ioutil.NopCloser(reader), size)
-if err != nil {
-return err
-}
-_, err = waitForHTTPResponse(respBody)
-defer http.DrainBody(respBody)
+defer xhttp.DrainBody(respBody)
return err
}
@@ -356,7 +351,7 @@ func (client *storageRESTClient) WriteMetadata(ctx context.Context, volume, path
}
respBody, err := client.call(ctx, storageRESTMethodWriteMetadata, values, &reader, -1)
-defer http.DrainBody(respBody)
+defer xhttp.DrainBody(respBody)
return err
}
@@ -371,7 +366,7 @@ func (client *storageRESTClient) UpdateMetadata(ctx context.Context, volume, pat
}
respBody, err := client.call(ctx, storageRESTMethodUpdateMetadata, values, &reader, -1)
-defer http.DrainBody(respBody)
+defer xhttp.DrainBody(respBody)
return err
}
@@ -387,7 +382,7 @@ func (client *storageRESTClient) DeleteVersion(ctx context.Context, volume, path
}
respBody, err := client.call(ctx, storageRESTMethodDeleteVersion, values, &buffer, -1)
-defer http.DrainBody(respBody)
+defer xhttp.DrainBody(respBody)
return err
}
@@ -397,7 +392,7 @@ func (client *storageRESTClient) WriteAll(ctx context.Context, volume string, pa
values.Set(storageRESTVolume, volume)
values.Set(storageRESTFilePath, path)
respBody, err := client.call(ctx, storageRESTMethodWriteAll, values, bytes.NewBuffer(b), int64(len(b)))
-defer http.DrainBody(respBody)
+defer xhttp.DrainBody(respBody)
return err
}
@@ -407,7 +402,7 @@ func (client *storageRESTClient) CheckFile(ctx context.Context, volume string, p
values.Set(storageRESTVolume, volume)
values.Set(storageRESTFilePath, path)
respBody, err := client.call(ctx, storageRESTMethodCheckFile, values, nil, -1)
-defer http.DrainBody(respBody)
+defer xhttp.DrainBody(respBody)
return err
}
@@ -424,7 +419,7 @@ func (client *storageRESTClient) CheckParts(ctx context.Context, volume string,
}
respBody, err := client.call(ctx, storageRESTMethodCheckParts, values, &reader, -1)
-defer http.DrainBody(respBody)
+defer xhttp.DrainBody(respBody)
return err
}
@@ -442,7 +437,7 @@ func (client *storageRESTClient) RenameData(ctx context.Context, srcVolume, srcP
}
respBody, err := client.call(ctx, storageRESTMethodRenameData, values, &reader, -1)
-defer http.DrainBody(respBody)
+defer xhttp.DrainBody(respBody)
return err
}
@@ -473,7 +468,7 @@ func (client *storageRESTClient) ReadVersion(ctx context.Context, volume, path,
if err != nil {
return fi, err
}
-defer http.DrainBody(respBody)
+defer xhttp.DrainBody(respBody)
dec := msgpNewReader(respBody)
defer readMsgpReaderPool.Put(dec)
@@ -491,7 +486,7 @@ func (client *storageRESTClient) ReadAll(ctx context.Context, volume string, pat
if err != nil {
return nil, err
}
-defer http.DrainBody(respBody)
+defer xhttp.DrainBody(respBody)
return ioutil.ReadAll(respBody)
}
@@ -527,7 +522,7 @@ func (client *storageRESTClient) ReadFile(ctx context.Context, volume string, pa
if err != nil {
return 0, err
}
-defer http.DrainBody(respBody)
+defer xhttp.DrainBody(respBody)
n, err := io.ReadFull(respBody, buf)
return int64(n), err
}
@@ -542,7 +537,7 @@ func (client *storageRESTClient) ListDir(ctx context.Context, volume, dirPath st
if err != nil {
return nil, err
}
-defer http.DrainBody(respBody)
+defer xhttp.DrainBody(respBody)
err = gob.NewDecoder(respBody).Decode(&entries)
return entries, err
}
@@ -555,7 +550,7 @@ func (client *storageRESTClient) Delete(ctx context.Context, volume string, path
values.Set(storageRESTRecursive, strconv.FormatBool(recursive))
respBody, err := client.call(ctx, storageRESTMethodDeleteFile, values, nil, -1)
-defer http.DrainBody(respBody)
+defer xhttp.DrainBody(respBody)
return err
}
@@ -579,7 +574,7 @@ func (client *storageRESTClient) DeleteVersions(ctx context.Context, volume stri
errs = make([]error, len(versions))
respBody, err := client.call(ctx, storageRESTMethodDeleteVersions, values, &buffer, -1)
-defer http.DrainBody(respBody)
+defer xhttp.DrainBody(respBody)
if err != nil {
for i := range errs {
errs[i] = err
@@ -618,7 +613,7 @@ func (client *storageRESTClient) RenameFile(ctx context.Context, srcVolume, srcP
values.Set(storageRESTDstVolume, dstVolume)
values.Set(storageRESTDstPath, dstPath)
respBody, err := client.call(ctx, storageRESTMethodRenameFile, values, nil, -1)
-defer http.DrainBody(respBody)
+defer xhttp.DrainBody(respBody)
return err
}
@@ -633,7 +628,7 @@ func (client *storageRESTClient) VerifyFile(ctx context.Context, volume, path st
}
respBody, err := client.call(ctx, storageRESTMethodVerifyFile, values, &reader, -1)
-defer http.DrainBody(respBody)
+defer xhttp.DrainBody(respBody)
if err != nil {
return err
}
@@ -651,6 +646,23 @@ func (client *storageRESTClient) VerifyFile(ctx context.Context, volume, path st
return toStorageErr(verifyResp.Err)
}
func (client *storageRESTClient) StatInfoFile(ctx context.Context, volume, path string) (stat StatInfo, err error) {
values := make(url.Values)
values.Set(storageRESTVolume, volume)
values.Set(storageRESTFilePath, path)
respBody, err := client.call(ctx, storageRESTMethodStatInfoFile, values, nil, -1)
if err != nil {
return stat, err
}
defer xhttp.DrainBody(respBody)
respReader, err := waitForHTTPResponse(respBody)
if err != nil {
return stat, err
}
err = stat.DecodeMsg(msgpNewReader(respReader))
return stat, err
}
// Close - marks the client as closed.
func (client *storageRESTClient) Close() error {
client.restClient.Close()


@@ -17,7 +17,7 @@
package cmd
const (
-storageRESTVersion = "v31" // Added RenameData with fileInfo()
+storageRESTVersion = "v32" // Inline bugfix needs all servers to be updated
storageRESTVersionPrefix = SlashSeparator + storageRESTVersion
storageRESTPrefix = minioReservedBucketPath + "/storage"
)
@@ -51,6 +51,7 @@ const (
storageRESTMethodRenameFile = "/renamefile"
storageRESTMethodVerifyFile = "/verifyfile"
storageRESTMethodWalkDir = "/walkdir"
storageRESTMethodStatInfoFile = "/statfile"
)
const (


@@ -770,23 +770,6 @@ func waitForHTTPResponse(respBody io.Reader) (io.Reader, error) {
}
}
-// drainCloser can be used for wrapping an http response.
-// It will drain the body before closing.
-type drainCloser struct {
-rc io.ReadCloser
-}
-// Read forwards the read operation.
-func (f drainCloser) Read(p []byte) (n int, err error) {
-return f.rc.Read(p)
-}
-// Close drains the body and closes the upstream.
-func (f drainCloser) Close() error {
-xhttp.DrainBody(f.rc)
-return nil
-}
// httpStreamResponse allows streaming a response, but still send an error.
type httpStreamResponse struct {
done chan error
@@ -878,7 +861,6 @@ func waitForHTTPStream(respBody io.ReadCloser, w io.Writer) error {
case 0:
// 0 is unbuffered, copy the rest.
_, err := io.Copy(w, respBody)
-respBody.Close()
if err == io.EOF {
return nil
}
@@ -888,18 +870,7 @@ func waitForHTTPStream(respBody io.ReadCloser, w io.Writer) error {
if err != nil {
return err
}
-respBody.Close()
return errors.New(string(errorText))
-case 3:
-// gob style is already deprecated, we can remove this when
-// storage API version will be greater or equal to 23.
-defer respBody.Close()
-dec := gob.NewDecoder(respBody)
-var err error
-if de := dec.Decode(&err); de == nil {
-return err
-}
-return errors.New("rpc error")
case 2:
// Block of data
var tmp [4]byte
@@ -916,7 +887,6 @@ func waitForHTTPStream(respBody io.ReadCloser, w io.Writer) error {
case 32:
continue
default:
-go xhttp.DrainBody(respBody)
return fmt.Errorf("unexpected filler byte: %d", tmp[0])
}
}
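keepHTTPResponseAlive and waitForHTTPStream share a small framing protocol inside an ordinary 200 response: the server emits filler bytes (ASCII 32) while it is still working, then a one-byte code (0 = raw stream follows, 1 = length-prefixed error text, 2 = length-prefixed data block); the gob variant (code 3) is dropped above. A self-contained sketch of a reader for the subset shown, with the length endianness assumed for illustration:

```go
package main

import (
	"bytes"
	"encoding/binary"
	"errors"
	"fmt"
	"io"
)

// readFramed consumes the keep-alive framing sketched above:
// spaces are heartbeats, then one code byte selects the payload.
func readFramed(r io.Reader, w io.Writer) error {
	var b [4]byte
	for {
		if _, err := io.ReadFull(r, b[:1]); err != nil {
			return err
		}
		switch b[0] {
		case 32: // filler byte: server is alive, keep waiting
			continue
		case 0: // unbuffered: the rest of the body is the payload
			_, err := io.Copy(w, r)
			return err
		case 1: // error: length-prefixed message follows
			if _, err := io.ReadFull(r, b[:]); err != nil {
				return err
			}
			msg := make([]byte, binary.LittleEndian.Uint32(b[:]))
			if _, err := io.ReadFull(r, msg); err != nil {
				return err
			}
			return errors.New(string(msg))
		default:
			return fmt.Errorf("unexpected filler byte: %d", b[0])
		}
	}
}

func main() {
	// Three heartbeats, then code 0 and the payload.
	in := bytes.NewReader(append([]byte{32, 32, 32, 0}, []byte("hello")...))
	var out bytes.Buffer
	fmt.Println(readFramed(in, &out), out.String())
}
```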
@@ -1023,6 +993,23 @@ func logFatalErrs(err error, endpoint Endpoint, exit bool) {
}
}
// StatInfoFile returns file stat info.
func (s *storageRESTServer) StatInfoFile(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
return
}
vars := mux.Vars(r)
volume := vars[storageRESTVolume]
filePath := vars[storageRESTFilePath]
done := keepHTTPResponseAlive(w)
si, err := s.storage.StatInfoFile(r.Context(), volume, filePath)
done(err)
if err != nil {
return
}
msgp.Encode(w, &si)
}
// registerStorageRPCRouter - register storage rpc router.
func registerStorageRESTHandlers(router *mux.Router, endpointServerPools EndpointServerPools) {
for _, ep := range endpointServerPools {
@@ -1093,6 +1080,8 @@ func registerStorageRESTHandlers(router *mux.Router, endpointServerPools Endpoin
Queries(restQueries(storageRESTVolume, storageRESTFilePath)...)
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodWalkDir).HandlerFunc(httpTraceHdrs(server.WalkDirHandler)).
Queries(restQueries(storageRESTVolume, storageRESTDirPath, storageRESTRecursive)...)
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodStatInfoFile).HandlerFunc(httpTraceHdrs(server.StatInfoFile)).
Queries(restQueries(storageRESTVolume, storageRESTFilePath)...)
}
}
}


@@ -32,12 +32,13 @@ func _() {
_ = x[storageMetricUpdateMetadata-21]
_ = x[storageMetricReadVersion-22]
_ = x[storageMetricReadAll-23]
-_ = x[storageMetricLast-24]
+_ = x[storageStatInfoFile-24]
+_ = x[storageMetricLast-25]
}
-const _storageMetric_name = "MakeVolBulkMakeVolListVolsStatVolDeleteVolWalkDirListDirReadFileAppendFileCreateFileReadFileStreamRenameFileRenameDataCheckPartsCheckFileDeleteDeleteVersionsVerifyFileWriteAllDeleteVersionWriteMetadataUpdateMetadataReadVersionReadAllLast"
+const _storageMetric_name = "MakeVolBulkMakeVolListVolsStatVolDeleteVolWalkDirListDirReadFileAppendFileCreateFileReadFileStreamRenameFileRenameDataCheckPartsCheckFileDeleteDeleteVersionsVerifyFileWriteAllDeleteVersionWriteMetadataUpdateMetadataReadVersionReadAllstorageStatInfoFileLast"
-var _storageMetric_index = [...]uint8{0, 11, 18, 26, 33, 42, 49, 56, 64, 74, 84, 98, 108, 118, 128, 137, 143, 157, 167, 175, 188, 201, 215, 226, 233, 237}
+var _storageMetric_index = [...]uint16{0, 11, 18, 26, 33, 42, 49, 56, 64, 74, 84, 98, 108, 118, 128, 137, 143, 157, 167, 175, 188, 201, 215, 226, 233, 252, 256}
func (i storageMetric) String() string {
if i >= storageMetric(len(_storageMetric_index)-1) {


@@ -456,7 +456,7 @@ func newInternodeHTTPTransport(tlsConfig *tls.Config, dialTimeout time.Duration)
WriteBufferSize: 32 << 10, // 32KiB moving up from 4KiB default
ReadBufferSize: 32 << 10, // 32KiB moving up from 4KiB default
IdleConnTimeout: 15 * time.Second,
-ResponseHeaderTimeout: 3 * time.Minute, // Set conservative timeouts for MinIO internode.
+ResponseHeaderTimeout: 15 * time.Minute, // Set conservative timeouts for MinIO internode.
TLSHandshakeTimeout: 15 * time.Second,
ExpectContinueTimeout: 15 * time.Second,
TLSClientConfig: tlsConfig,
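Raising ResponseHeaderTimeout from 3 to 15 minutes gives long-running internode calls (scanner, healing, bulk deletes) time to produce response headers before the transport gives up; the remaining knobs stay conservative. The equivalent construction with only the standard library:

```go
package main

import (
	"crypto/tls"
	"net/http"
	"time"
)

func newInternodeTransport(tlsConfig *tls.Config) *http.Transport {
	return &http.Transport{
		WriteBufferSize:       32 << 10, // 32 KiB, up from the 4 KiB default
		ReadBufferSize:        32 << 10,
		IdleConnTimeout:       15 * time.Second,
		ResponseHeaderTimeout: 15 * time.Minute, // allow slow internode calls
		TLSHandshakeTimeout:   15 * time.Second,
		ExpectContinueTimeout: 15 * time.Second,
		TLSClientConfig:       tlsConfig,
	}
}

func main() {
	_ = &http.Client{Transport: newInternodeTransport(&tls.Config{})}
}
```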


@@ -57,6 +57,7 @@ const (
storageMetricUpdateMetadata
storageMetricReadVersion
storageMetricReadAll
storageStatInfoFile
// .... add more
@@ -619,6 +620,22 @@ func storageTrace(s storageMetric, startTime time.Time, duration time.Duration,
}
}
func (p *xlStorageDiskIDCheck) StatInfoFile(ctx context.Context, volume, path string) (stat StatInfo, err error) {
defer p.updateStorageMetrics(storageStatInfoFile, volume, path)()
select {
case <-ctx.Done():
return StatInfo{}, ctx.Err()
default:
}
if err = p.checkDiskStale(); err != nil {
return StatInfo{}, err
}
return p.storage.StatInfoFile(ctx, volume, path)
}
// Update storage metrics
func (p *xlStorageDiskIDCheck) updateStorageMetrics(s storageMetric, paths ...string) func() {
startTime := time.Now()


@@ -1078,6 +1078,11 @@ func (s *xlStorage) ReadVersion(ctx context.Context, volume, path, versionID str
if readData {
if len(fi.Data) > 0 || fi.Size == 0 {
if len(fi.Data) > 0 {
if _, ok := fi.Metadata[ReservedMetadataPrefixLower+"inline-data"]; !ok {
fi.Metadata[ReservedMetadataPrefixLower+"inline-data"] = "true"
}
}
return fi, nil
}
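This hunk backfills the inline-data marker for objects written before the metadata key existed: when a version comes back with data bytes attached but no marker, the marker is added on the fly so overwrite logic can tell inlined and non-inlined versions apart. A reduced sketch of the guard, assuming ReservedMetadataPrefixLower resolves to the x-minio-internal- prefix:

```go
package main

import "fmt"

const inlineDataKey = "x-minio-internal-inline-data"

type fileInfo struct {
	Data     []byte
	Metadata map[string]string
}

// markInline backfills the inline-data marker for legacy versions
// that carry inlined bytes without the metadata key.
func markInline(fi *fileInfo) {
	if len(fi.Data) == 0 {
		return
	}
	if _, ok := fi.Metadata[inlineDataKey]; !ok {
		fi.Metadata[inlineDataKey] = "true"
	}
}

func main() {
	fi := &fileInfo{Data: []byte("abc"), Metadata: map[string]string{}}
	markInline(fi)
	fmt.Println(fi.Metadata[inlineDataKey]) // true
}
```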
@@ -2018,6 +2023,8 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath string, f
// Purge the destination path as we are not preserving anything
// versioned object was not requested.
oldDstDataPath = pathJoin(dstVolumeDir, dstPath, ofi.DataDir)
xlMeta.data.remove(nullVersionID)
xlMeta.data.remove(ofi.DataDir)
}
}
}
@@ -2206,3 +2213,32 @@ func (s *xlStorage) VerifyFile(ctx context.Context, volume, path string, fi File
return nil
}
func (s *xlStorage) StatInfoFile(ctx context.Context, volume, path string) (stat StatInfo, err error) {
volumeDir, err := s.getVolDir(volume)
if err != nil {
return stat, err
}
// Stat a volume entry.
if err = Access(volumeDir); err != nil {
if osIsNotExist(err) {
return stat, errVolumeNotFound
} else if isSysErrIO(err) {
return stat, errFaultyDisk
} else if osIsPermission(err) {
return stat, errVolumeAccessDenied
}
return stat, err
}
filePath := pathJoin(volumeDir, path)
if err := checkPathLength(filePath); err != nil {
return stat, err
}
st, _ := Lstat(filePath)
if st == nil {
return stat, errPathNotFound
}
return StatInfo{ModTime: st.ModTime(), Size: st.Size()}, nil
}
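Note that the implementation above collapses every Lstat failure into errPathNotFound; only volume-level errors are distinguished. A standalone sketch of the same stat flow using the standard library, with the simplification called out in comments (the length bound is a stand-in for checkPathLength):

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
	"time"
)

type statInfo struct {
	ModTime time.Time
	Size    int64
}

// statFile mirrors the flow above: resolve the path under the volume,
// bound its length, then Lstat and surface size and mtime.
func statFile(volumeDir, path string) (statInfo, error) {
	filePath := filepath.Join(volumeDir, path)
	if len(filePath) > 255 { // stand-in for checkPathLength
		return statInfo{}, fmt.Errorf("path too long")
	}
	st, err := os.Lstat(filePath)
	if err != nil {
		// The code above returns errPathNotFound for any Lstat failure.
		return statInfo{}, fmt.Errorf("%s: path not found", filePath)
	}
	return statInfo{ModTime: st.ModTime(), Size: st.Size()}, nil
}

func main() {
	fmt.Println(statFile(os.TempDir(), "does-not-exist"))
}
```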


@@ -0,0 +1,91 @@
package main
import (
"bufio"
"encoding/binary"
"encoding/hex"
"flag"
"fmt"
"hash/crc32"
"io"
"log"
"os"
"strings"
"github.com/secure-io/sio-go"
)
var (
key = flag.String("key", "", "decryption string")
//js = flag.Bool("json", false, "expect json input")
)
func main() {
flag.Parse()
args := flag.Args()
switch len(flag.Args()) {
case 0:
// Read from stdin, write to stdout.
decrypt(*key, os.Stdin, os.Stdout)
return
case 1:
r, err := os.Open(args[0])
fatalErr(err)
defer r.Close()
dstName := strings.TrimSuffix(args[0], ".enc") + ".zip"
w, err := os.Create(dstName)
fatalErr(err)
defer w.Close()
if len(*key) == 0 {
reader := bufio.NewReader(os.Stdin)
fmt.Print("Enter Decryption Key: ")
text, _ := reader.ReadString('\n')
// convert CRLF to LF
*key = strings.Replace(text, "\n", "", -1)
}
decrypt(*key, r, w)
fmt.Println("Output decrypted to", dstName)
return
default:
fatalIf(true, "Only 1 file can be decrypted")
os.Exit(1)
}
}
func decrypt(keyHex string, r io.Reader, w io.Writer) {
keyHex = strings.TrimSpace(keyHex)
fatalIf(len(keyHex) != 72, "Unexpected key length: %d, want 72", len(keyHex))
id, err := hex.DecodeString(keyHex[:8])
fatalErr(err)
key, err := hex.DecodeString(keyHex[8:])
fatalErr(err)
// Verify that CRC is ok.
want := binary.LittleEndian.Uint32(id)
got := crc32.ChecksumIEEE(key)
fatalIf(want != got, "Invalid key checksum, want %x, got %x", want, got)
stream, err := sio.AES_256_GCM.Stream(key)
fatalErr(err)
// Zero nonce, we only use each key once, and 32 bytes is plenty.
nonce := make([]byte, stream.NonceSize())
encr := stream.DecryptReader(r, nonce, nil)
_, err = io.Copy(w, encr)
fatalErr(err)
}
func fatalErr(err error) {
if err == nil {
return
}
log.Fatalln(err)
}
func fatalIf(b bool, msg string, v ...interface{}) {
if !b {
return
}
log.Fatalf(msg, v...)
}
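The 72-character key this tool accepts is plain hex: 8 characters encoding a little-endian CRC32 (IEEE) of the raw key, used as a checksum, followed by 64 characters encoding the 32-byte AES-256 key itself. A sketch that produces a key string the length and checksum tests in decrypt() would accept:

```go
package main

import (
	"crypto/rand"
	"encoding/binary"
	"encoding/hex"
	"fmt"
	"hash/crc32"
)

func main() {
	key := make([]byte, 32)
	if _, err := rand.Read(key); err != nil {
		panic(err)
	}
	// 4-byte little-endian CRC32 of the key, then the key, all hex:
	// 8 + 64 = 72 characters, matching the length check in decrypt().
	var id [4]byte
	binary.LittleEndian.PutUint32(id[:], crc32.ChecksumIEEE(key))
	fmt.Println(hex.EncodeToString(id[:]) + hex.EncodeToString(key))
}
```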

go.mod

@@ -47,7 +47,7 @@ require (
github.com/minio/cli v1.22.0
github.com/minio/highwayhash v1.0.2
github.com/minio/md5-simd v1.1.1 // indirect
-github.com/minio/minio-go/v7 v7.0.11-0.20210302210017-6ae69c73ce78
+github.com/minio/minio-go/v7 v7.0.14
github.com/minio/selfupdate v0.3.1
github.com/minio/sha256-simd v1.0.0
github.com/minio/simdjson-go v0.2.1
@@ -77,7 +77,6 @@ require (
github.com/tidwall/gjson v1.6.8
github.com/tidwall/sjson v1.0.4
github.com/tinylib/msgp v1.1.3
-github.com/ttacon/chalk v0.0.0-20160626202418-22c06c80ed31 // indirect
github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a
github.com/willf/bitset v1.1.11 // indirect
github.com/willf/bloom v2.0.3+incompatible

go.sum

@@ -365,8 +365,6 @@ github.com/lib/pq v1.9.0/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/lightstep/lightstep-tracer-common/golang/gogo v0.0.0-20190605223551-bc2310a04743/go.mod h1:qklhhLq1aX+mtWk9cPHPzaBjWImj5ULL6C7HFJtXQMM=
github.com/lightstep/lightstep-tracer-go v0.18.1/go.mod h1:jlF1pusYV4pidLvZ+XD0UBX0ZE6WURAspgAczcDHrL4=
github.com/lyft/protoc-gen-validate v0.0.13/go.mod h1:XbGvPuh87YZc5TdIa2/I4pLk0QoUACkjt2znoq26NVQ=
-github.com/magefile/mage v1.10.0 h1:3HiXzCUY12kh9bIuyXShaVe529fJfyqoVM42o/uom2g=
-github.com/magefile/mage v1.10.0/go.mod h1:z5UZb/iS3GoOSn0JgWuiw7dxlurVYTu+/jHXqQg881A=
github.com/mailru/easyjson v0.7.6 h1:8yTIVnZgCoiM1TgqoeTl+LfU5Jg6/xL3QhGQnimLYnA=
github.com/mailru/easyjson v0.7.6/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU=
@@ -398,8 +396,8 @@ github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLT
github.com/minio/md5-simd v1.1.0/go.mod h1:XpBqgZULrMYD3R+M28PcmP0CkI7PEMzB3U77ZrKZ0Gw=
github.com/minio/md5-simd v1.1.1 h1:9ojcLbuZ4gXbB2sX53MKn8JUZ0sB/2wfwsEcRw+I08U=
github.com/minio/md5-simd v1.1.1/go.mod h1:XpBqgZULrMYD3R+M28PcmP0CkI7PEMzB3U77ZrKZ0Gw=
-github.com/minio/minio-go/v7 v7.0.11-0.20210302210017-6ae69c73ce78 h1:v7OMbUnWkyRlO2MZ5AuYioELhwXF/BgZEznrQ1drBEM=
-github.com/minio/minio-go/v7 v7.0.11-0.20210302210017-6ae69c73ce78/go.mod h1:mTh2uJuAbEqdhMVl6CMIIZLUeiMiWtJR4JB8/5g2skw=
+github.com/minio/minio-go/v7 v7.0.14 h1:T7cw8P586gVwEEd0y21kTYtloD576XZgP62N8pE130s=
+github.com/minio/minio-go/v7 v7.0.14/go.mod h1:S23iSP5/gbMwtxeY5FM71R+TkAYyzEdoNEDDwpt8yWs=
github.com/minio/selfupdate v0.3.1 h1:BWEFSNnrZVMUWXbXIgLDNDjbejkmpAmZvy/nCz1HlEs=
github.com/minio/selfupdate v0.3.1/go.mod h1:b8ThJzzH7u2MkF6PcIra7KaXO9Khf6alWPvMSyTDCFM=
github.com/minio/sha256-simd v0.1.1/go.mod h1:B5e1o+1/KgNmWrSQK08Y6Z1Vb5pwIktudl0J58iy0KM=
@@ -556,8 +554,8 @@ github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeV
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88=
-github.com/sirupsen/logrus v1.8.0 h1:nfhvjKcUMhBMVqbKHJlk5RPrrfYr/NMo3692g0dwfWU=
-github.com/sirupsen/logrus v1.8.0/go.mod h1:4GuYW9TZmE769R5STWrRakJc4UqQ3+QQ95fyz7ENv1A=
+github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE=
+github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
github.com/smartystreets/assertions v1.1.1 h1:T/YLemO5Yp7KPzS+lVtu+WsHn8yoSwTfItdAd1r3cck=
github.com/smartystreets/assertions v1.1.1/go.mod h1:tcbTF8ujkAEcZ8TElKY+i30BzYlVhC/LOxJk7iOWnoo=
@@ -598,8 +596,6 @@ github.com/tinylib/msgp v1.1.3 h1:3giwAkmtaEDLSV0MdO1lDLuPgklgPzmk8H9+So2BVfA=
github.com/tinylib/msgp v1.1.3/go.mod h1:+d+yLhGm8mzTaHzB+wgMYrodPfmZrzkirds8fDWklFE=
github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8 h1:ndzgwNDnKIqyCvHTXaCqh9KlOWKvBry6nuXMJmonVsE=
github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
-github.com/ttacon/chalk v0.0.0-20160626202418-22c06c80ed31 h1:OXcKh35JaYsGMRzpvFkLv/MEyPuL49CThT1pZ8aSml4=
-github.com/ttacon/chalk v0.0.0-20160626202418-22c06c80ed31/go.mod h1:onvgF043R+lC5RZ8IT9rBXDaEDnpnw/Cl+HFiw+v/7Q=
github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM=
github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA=
github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
@@ -657,6 +653,7 @@ golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPh
golang.org/x/crypto v0.0.0-20200709230013-948cd5f35899/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20201112155050-0c6587e931a9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
+golang.org/x/crypto v0.0.0-20201216223049-8b5274cf687f/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I=
golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83 h1:/ZScEX8SfEmUGRHs0gxpqteO5nfNW6axyZbBdw9A12g=
golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=


@@ -53,6 +53,8 @@ const (
HealthInfoAdminAction = "admin:OBDInfo"
// BandwidthMonitorAction - allow monitoring bandwidth usage
BandwidthMonitorAction = "admin:BandwidthMonitor"
// InspectDataAction - allows downloading raw files from backend
InspectDataAction = "admin:InspectData"
// ServerUpdateAdminAction - allow MinIO binary update
ServerUpdateAdminAction = "admin:ServerUpdate"


@@ -526,6 +526,17 @@ OuterLoop:
}
outputQueue[len(outputQueue)-1] = outputRecord
if s3Select.statement.LimitReached() {
if !sendRecord() {
break
}
if err = writer.Finish(s3Select.getProgress()); err != nil {
// FIXME: log this error.
err = nil
}
return
}
if len(outputQueue) < cap(outputQueue) {
continue
}
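When the LIMIT is reached mid-batch, the fix drains what is already queued: send the pending record, finish the writer with final progress, and return instead of reading further input. A compact model of that control flow with toy types:

```go
package main

import "fmt"

type engine struct {
	limit, sent int
	queue       []string
}

func (e *engine) limitReached() bool { return e.limit > -1 && e.sent >= e.limit }

func (e *engine) run(records []string) {
	for _, rec := range records {
		e.queue = append(e.queue, rec)
		e.sent++
		if e.limitReached() {
			e.flush() // emit what is queued, then stop reading input
			return
		}
		if len(e.queue) < 3 { // batch until the queue is full
			continue
		}
		e.flush()
	}
	e.flush()
}

func (e *engine) flush() {
	for _, r := range e.queue {
		fmt.Println(r)
	}
	e.queue = e.queue[:0]
}

func main() {
	(&engine{limit: 1}).run([]string{"a", "b", "c"}) // prints only "a"
}
```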


@@ -403,6 +403,74 @@ func TestJSONQueries(t *testing.T) {
query: `SELECT 3.0 / 2, 5 / 2.0 FROM S3Object LIMIT 1`,
wantResult: `{"_1":1.5,"_2":2.5}`,
},
{
name: "limit-1",
query: `SELECT * FROM S3Object[*].elements[*] LIMIT 1`,
requestXML: []byte(`
<?xml version="1.0" encoding="UTF-8"?>
<SelectObjectContentRequest>
<Expression>select * from s3object[*].elements[*] s where s.element_type = '__elem__merfu'</Expression>
<ExpressionType>SQL</ExpressionType>
<InputSerialization>
<CompressionType>NONE</CompressionType>
<JSON>
<Type>DOCUMENT</Type>
</JSON>
</InputSerialization>
<OutputSerialization>
<JSON>
</JSON>
</OutputSerialization>
<RequestProgress>
<Enabled>FALSE</Enabled>
</RequestProgress>
</SelectObjectContentRequest>`),
wantResult: `{"element_type":"__elem__merfu","element_id":"d868aefe-ef9a-4be2-b9b2-c9fd89cc43eb","attributes":{"__attr__image_dpi":300,"__attr__image_size":[2550,3299],"__attr__image_index":2,"__attr__image_format":"JPEG","__attr__file_extension":"jpg","__attr__data":null}}`,
withJSON: `
{
"name": "small_pdf1.pdf",
"lume_id": "9507193e-572d-4f95-bcf1-e9226d96be65",
"elements": [
{
"element_type": "__elem__image",
"element_id": "859d09c4-7cf1-4a37-9674-3a7de8b56abc",
"attributes": {
"__attr__image_dpi": 300,
"__attr__image_size": [
2550,
3299
],
"__attr__image_index": 1,
"__attr__image_format": "JPEG",
"__attr__file_extension": "jpg",
"__attr__data": null
}
},
{
"element_type": "__elem__merfu",
"element_id": "d868aefe-ef9a-4be2-b9b2-c9fd89cc43eb",
"attributes": {
"__attr__image_dpi": 300,
"__attr__image_size": [
2550,
3299
],
"__attr__image_index": 2,
"__attr__image_format": "JPEG",
"__attr__file_extension": "jpg",
"__attr__data": null
}
}
],
"data": "asdascasdc1234e123erdasdas"
}`,
},
{
name: "limit-2",
query: `select * from s3object[*].person[*] limit 1`,
wantResult: `{"Id":1,"Name":"Anshu","Address":"Templestowe","Car":"Jeep"}`,
withJSON: `{ "person": [ { "Id": 1, "Name": "Anshu", "Address": "Templestowe", "Car": "Jeep" }, { "Id": 2, "Name": "Ben Mostafa", "Address": "Las Vegas", "Car": "Mustang" }, { "Id": 3, "Name": "Rohan Wood", "Address": "Wooddon", "Car": "VW" } ] }`,
},
}
defRequest := `<?xml version="1.0" encoding="UTF-8"?>


@@ -331,6 +331,7 @@ func TestFromClauseJSONPath(t *testing.T) {
"select * from s3object[*].books[*].name as s",
"select * from s3object s where name > 2",
"select * from s3object[*].name as s where name > 2",
"select * from s3object[*].books[*] limit 1",
}
for i, tc := range cases {
err := p.ParseString(tc, &s)


@@ -301,9 +301,8 @@ func (e *SelectStatement) Eval(input, output Record) (Record, error) {
// .. WHERE ..`
// Update count of records output.
-if e.limitValue > -1 {
-e.outputCount++
-}
+e.outputCount++
return input.Clone(output), nil
}
@@ -327,9 +326,7 @@
}
// Update count of records output.
-if e.limitValue > -1 {
-e.outputCount++
-}
+e.outputCount++
return output, nil
}
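With both hunks above, outputCount is bumped for every emitted row rather than only when a LIMIT clause is present, so the counter doubles as a general output statistic and LimitReached can stay a pure comparison. A sketch of how the counter and the check plausibly interact (the body of limitReached here is assumed, consistent with limitValue == -1 meaning no LIMIT):

```go
package main

import "fmt"

type selectStatement struct {
	limitValue  int64 // -1 when the query has no LIMIT clause
	outputCount int64
}

// limitReached is a pure comparison now that outputCount is
// maintained unconditionally.
func (e *selectStatement) limitReached() bool {
	return e.limitValue > -1 && e.outputCount >= e.limitValue
}

func (e *selectStatement) emit(row string) {
	fmt.Println(row)
	e.outputCount++ // counted whether or not a LIMIT exists
}

func main() {
	e := &selectStatement{limitValue: 2}
	for _, r := range []string{"r1", "r2", "r3"} {
		if e.limitReached() {
			break
		}
		e.emit(r)
	}
	fmt.Println("rows out:", e.outputCount) // 2
}
```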