Compare commits

26 Commits

Author SHA1 Message Date
Poorna Krishnamoorthy e46577116f
Avoid replication if remote target is offline (#13157)
Co-authored-by: Poorna Krishnamoorthy <poorna@minio.io>
2021-09-07 09:30:28 -07:00
Harshavardhana de8d75b788 fix: ignore delete-marker healing during headObject()/disksWithAllParts 2021-07-29 11:45:42 -07:00
Klaus Post f3cd606976 Grab read lock while reading usage cache (#12111)
Signed-off-by: Klaus Post <klauspost@gmail.com>
2021-05-11 10:26:42 -07:00
Klaus Post 14105a1689 odirectReader: handle EOF correctly (#11998)
EOF may be sent along with data, so queue it up and
return it when the buffer is empty.

Also, when reading data without direct I/O, don't add a buffer
that only results in an extra memcopy.
2021-05-11 10:26:01 -07:00
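For context, a minimal sketch of the pattern this commit describes: a read error (typically io.EOF) is queued alongside the data and surfaced only once the internal buffer drains. The type and field names are illustrative, not the actual odirectReader layout.

```
import "io"

// bufReader defers the error from the last src.Read until its buffer
// has been fully delivered to the caller.
type bufReader struct {
	src io.Reader
	buf []byte // undelivered bytes from the last read
	err error  // queued error, returned once buf is empty
}

func (r *bufReader) Read(p []byte) (int, error) {
	if len(r.buf) == 0 {
		if r.err != nil {
			return 0, r.err // buffer drained: surface the queued error
		}
		chunk := make([]byte, 16<<10)
		n, err := r.src.Read(chunk)
		if n == 0 {
			return 0, err
		}
		// Data and error may arrive together: keep the data, queue the error.
		r.buf, r.err = chunk[:n], err
	}
	n := copy(p, r.buf)
	r.buf = r.buf[n:]
	return n, nil
}
```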
Harshavardhana 28a9409da7 fix: remove auto-close GetObjectReader (#12009)
locks can get relinquished when Read() sees io.EOF,
leading to premature closing of the readers

concurrent writes on the same object can have
undesired consequences when these locks
are relinquished.
2021-05-11 10:25:45 -07:00
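The hazard, in outline: when Read() closes the reader as soon as it sees io.EOF, any lock tied to the reader's lifetime is released while the caller may still be working with the object, and a concurrent write can then slip in. A hedged before/after sketch with an illustrative type, not the actual GetObjectReader:

```
// Before: auto-close on error relinquishes the object lock prematurely.
func (g *lockedReader) Read(p []byte) (n int, err error) {
	n, err = g.pReader.Read(p)
	if err != nil {
		g.Close() // releases the lock even though the caller still holds g
	}
	return
}

// After: Read only forwards; the caller decides when to Close (and unlock).
func (g *lockedReader) Read(p []byte) (int, error) {
	return g.pReader.Read(p)
}
```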
Klaus Post 2965348694 Alloc less for metacache decompression (#12134)
Network streams are limited to 16K blocks. Don't alloc more upfront.

Signed-off-by: Klaus Post <klauspost@gmail.com>
2021-05-11 10:18:44 -07:00
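As a usage sketch, assuming github.com/klauspost/compress/s2: pooled decoders are created with a capped allocation block, so each reader allocates at most 16 KiB upfront instead of the library's larger default. The decompress helper is hypothetical, added only to show reuse of the pooled reader.

```
import (
	"io"
	"sync"

	"github.com/klauspost/compress/s2"
)

var s2DecPool = sync.Pool{New: func() interface{} {
	// Network streams never carry blocks larger than 16 KiB,
	// so cap the upfront allocation accordingly.
	return s2.NewReader(nil, s2.ReaderAllocBlock(16<<10))
}}

// decompress shows how a pooled decoder would be retargeted and reused.
func decompress(src io.Reader, dst io.Writer) error {
	dec := s2DecPool.Get().(*s2.Reader)
	defer s2DecPool.Put(dec)
	dec.Reset(src) // point the pooled decoder at the new stream
	_, err := io.Copy(dst, dec)
	return err
}
```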
Anis Elleuch 857ea511f1 scanner: Shuffle disks to scan (#12036)
Randomize the association between disk and bucket in each crawling
iteration so that ILM applies correctly to objects not present on
all disks.
2021-05-10 10:26:07 -07:00
Harshavardhana 3f74303d1c fix: optimize ListBuckets for anonymous users (#12182)
anonymous users are never allowed to call listBuckets(),
so we do not need to further validate the policy; we can
simply reject the request if credentials are empty.
2021-05-10 10:26:05 -07:00
Harshavardhana afa90189ae fix: do not heal when disks are down (#12186)
HeadObject() was erroneously attempting
a heal when disks were down; avoid it.
2021-05-10 10:22:21 -07:00
Harshavardhana e3f2a260aa add missing responseBody drain (#12147)
Signed-off-by: Harshavardhana <harsha@minio.io>
2021-04-29 03:30:55 -07:00
Harshavardhana 82780ec10e remove CreateFile with streaming response
Signed-off-by: Harshavardhana <harsha@minio.io>
2021-04-29 03:28:43 -07:00
Klaus Post b57735ec53 Fix listPathRaw/WalkDir cancelation (#11905)
In #11888 we observe a lot of running WalkDir calls.

There don't appear to be any listeners for these calls, so they should be aborted.

Ensure that WalkDir aborts when the upstream cancels the request.

Fixes #11888
2021-04-29 03:13:41 -07:00
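The core of the fix is a non-blocking probe of ctx.Done(); a minimal sketch of the helper (matching the diff further below) plus a hypothetical caller showing why checking between entries lets an abandoned listing abort promptly instead of scanning to completion:

```
import "context"

// contextCanceled reports whether ctx has been canceled, without blocking.
func contextCanceled(ctx context.Context) bool {
	select {
	case <-ctx.Done():
		return true
	default:
		return false
	}
}

// walkEntries is an illustrative caller, not the actual WalkDir code.
func walkEntries(ctx context.Context, entries []string, process func(string)) error {
	for _, entry := range entries {
		if contextCanceled(ctx) {
			return ctx.Err() // upstream gave up: stop walking now
		}
		process(entry)
	}
	return nil
}
```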
Poorna Krishnamoorthy bfab990c33 Improve error message from SetRemoteTargetHandler (#11909) 2021-04-06 12:42:30 -07:00
Harshavardhana 94018588fe unmarshal both LegalHold and ObjectLockLegalHold XML types (#11921)
Because of silly AWS S3 behavior we need to handle both types.

fixes #11920
2021-04-06 12:41:56 -07:00
Anis Elleuch 8b76ba8d5d crawling: Apply lifecycle then decide healing action (#11563)
It is inefficient to decide to heal an object before checking its
lifecycle for expiration or transition. This commit simply reverses
the order of actions: evaluate the lifecycle first and heal only if
asked to and the lifecycle evaluation resulted in a NoneAction.
2021-04-06 12:41:51 -07:00
Harshavardhana 7eb7f65e48 add policy conditions support for signatureVersion and authType (#11947)
https://docs.aws.amazon.com/AmazonS3/latest/API/bucket-policy-s3-sigv4-conditions.html

fixes #11944
2021-04-06 12:41:31 -07:00
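For reference, a bucket policy using these condition keys might look like the sketch below; the key names follow the AWS document linked above, while the bucket name is illustrative. This statement denies any request not signed with Signature Version 4:

```
{
  "Version": "2012-10-17",
  "Statement": [{
    "Sid": "DenyNonSigV4",
    "Effect": "Deny",
    "Principal": "*",
    "Action": "s3:*",
    "Resource": "arn:aws:s3:::examplebucket/*",
    "Condition": {
      "StringNotEquals": {
        "s3:signatureversion": "AWS4-HMAC-SHA256"
      }
    }
  }]
}
```

The companion key s3:authType distinguishes how the request was authenticated (REST-HEADER, REST-QUERY-STRING, or POST), matching the values populated in the getConditionValues diff below.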
Harshavardhana c608c0688a fix: properly close leaking bandwidth monitor channel (#11967)
This PR fixes

- close the leaking bandwidth report channel
- remove the closer requirement for the bandwidth monitor;
  instead, if Read() fails, remember the error and return
  that error for all subsequent reads (sketched below)
- use locking for usage-cache.bin updates; with inline
  data we cannot afford concurrent writes to
  usage-cache.bin corrupting xl.meta
2021-04-06 12:40:42 -07:00
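The "remember the error" idea is a sticky-error reader; a minimal sketch, not the actual MonitoredReader:

```
import "io"

// stickyErrReader records the first error from the wrapped reader and
// returns it on every subsequent Read, so no Close() is ever required
// to stop the consumer.
type stickyErrReader struct {
	r   io.Reader
	err error
}

func (s *stickyErrReader) Read(p []byte) (int, error) {
	if s.err != nil {
		return 0, s.err
	}
	n, err := s.r.Read(p)
	s.err = err // io.EOF is sticky too: later reads keep returning it
	return n, err
}
```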
Aditya Manthramurthy 41a9d1d778 Fix S3Select SQL column reference handling (#11957)
This change fixes handling of these types of queries:

- Double quoted column names with special characters:
    SELECT "column.name" FROM s3object
- Double quoted column names with reserved keywords:
    SELECT "CAST" FROM s3object
- Table name as prefix for column names:
    SELECT S3Object."CAST" FROM s3object
2021-04-06 12:40:28 -07:00
Klaus Post e21e80841e Fix data race when connecting disks (#11983)
Multiple disks from the same set would be writing concurrently.

```
WARNING: DATA RACE
Write at 0x00c002100ce0 by goroutine 166:
  github.com/minio/minio/cmd.(*erasureSets).connectDisks.func1()
      d:/minio/minio/cmd/erasure-sets.go:254 +0x82f

Previous write at 0x00c002100ce0 by goroutine 129:
  github.com/minio/minio/cmd.(*erasureSets).connectDisks.func1()
      d:/minio/minio/cmd/erasure-sets.go:254 +0x82f

Goroutine 166 (running) created at:
  github.com/minio/minio/cmd.(*erasureSets).connectDisks()
      d:/minio/minio/cmd/erasure-sets.go:210 +0x324
  github.com/minio/minio/cmd.(*erasureSets).monitorAndConnectEndpoints()
      d:/minio/minio/cmd/erasure-sets.go:288 +0x244

Goroutine 129 (finished) created at:
  github.com/minio/minio/cmd.(*erasureSets).connectDisks()
      d:/minio/minio/cmd/erasure-sets.go:210 +0x324
  github.com/minio/minio/cmd.(*erasureSets).monitorAndConnectEndpoints()
      d:/minio/minio/cmd/erasure-sets.go:288 +0x244
```
2021-04-06 12:39:59 -07:00
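The underlying rule the race detector enforces: every write to state shared across the connect goroutines must happen inside the critical section. A hedged sketch with illustrative names; the actual fix simply moves one assignment above the Unlock():

```
import "sync"

type diskSet struct {
	mu        sync.Mutex
	endpoints []string
	connected []bool
}

func (s *diskSet) markConnected(idx int, endpoint string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	// Both writes stay inside the critical section; moving either one
	// after the unlock reintroduces the race reported above.
	s.endpoints[idx] = endpoint
	s.connected[idx] = true
}
```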
Klaus Post 98c792bbeb Fix disk info race (#11984)
Protect updated members in xlStorage.

```
WARNING: DATA RACE
Write at 0x00c004b4ee78 by goroutine 1491:
  github.com/minio/minio/cmd.(*xlStorage).GetDiskID()
      d:/minio/minio/cmd/xl-storage.go:590 +0x1078
  github.com/minio/minio/cmd.(*xlStorageDiskIDCheck).checkDiskStale()
      d:/minio/minio/cmd/xl-storage-disk-id-check.go:195 +0x84
  github.com/minio/minio/cmd.(*xlStorageDiskIDCheck).StatVol()
      d:/minio/minio/cmd/xl-storage-disk-id-check.go:284 +0x16a
  github.com/minio/minio/cmd.erasureObjects.getBucketInfo.func1()
      d:/minio/minio/cmd/erasure-bucket.go:100 +0x1a5
  github.com/minio/minio/pkg/sync/errgroup.(*Group).Go.func1()
      d:/minio/minio/pkg/sync/errgroup/errgroup.go:122 +0xd7

Previous read at 0x00c004b4ee78 by goroutine 1087:
  github.com/minio/minio/cmd.(*xlStorage).CheckFile.func1()
      d:/minio/minio/cmd/xl-storage.go:1699 +0x384
  github.com/minio/minio/cmd.(*xlStorage).CheckFile()
      d:/minio/minio/cmd/xl-storage.go:1726 +0x13c
  github.com/minio/minio/cmd.(*xlStorageDiskIDCheck).CheckFile()
      d:/minio/minio/cmd/xl-storage-disk-id-check.go:446 +0x23b
  github.com/minio/minio/cmd.erasureObjects.parentDirIsObject.func1()
      d:/minio/minio/cmd/erasure-common.go:173 +0x194
  github.com/minio/minio/pkg/sync/errgroup.(*Group).Go.func1()
      d:/minio/minio/pkg/sync/errgroup/errgroup.go:122 +0xd7
```
2021-04-06 12:39:57 -07:00
Klaus Post f687ba53bc Fix Access Key requests (#11979)
Fix accessing claims when the auth error is unchecked.

Replaced only where the error was unchecked and the change is clearly without side effects.

Fixes #11959
2021-04-06 11:03:55 -07:00
Harshavardhana e3da59c923 fix possible crash in bucket bandwidth monitor (#11986) 2021-04-06 11:03:41 -07:00
Harshavardhana 781b9b051c fix: service accounts policy enforcement regression (#11910)
service accounts were no longer inheriting parent policies
due to refactors of PolicyDBGet() in the latest release;
fix this behavior properly.
2021-04-06 08:58:05 -07:00
Harshavardhana 438becfde8 fix: delete/delete marker replication versions consistent (#11932)
replication didn't work as expected when deletion of
delete markers was requested via the DeleteMultipleObjects
API; this was due to incorrect lookup elements being
used to look for delete markers.
2021-04-06 08:57:36 -07:00
Harshavardhana 16ef338649 fix: notify parent user in notification events (#11934)
fixes #11885
2021-04-06 08:55:37 -07:00
Harshavardhana 3242847ec0 avoid network read errors crashing CreateFile call (#11939)
Thanks to @dvaldivia for reproducing this
2021-04-06 08:55:30 -07:00
47 changed files with 757 additions and 378 deletions

View File

@ -172,7 +172,12 @@ func (a adminAPIHandlers) SetRemoteTargetHandler(w http.ResponseWriter, r *http.
}
if err = globalBucketTargetSys.SetTarget(ctx, bucket, &target, update); err != nil {
writeErrorResponseJSON(ctx, w, toAPIError(ctx, err), r.URL)
switch err.(type) {
case BucketRemoteConnectionErr:
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErrWithErr(ErrReplicationRemoteConnectionError, err), r.URL)
default:
writeErrorResponseJSON(ctx, w, toAPIError(ctx, err), r.URL)
}
return
}
targets, err := globalBucketTargetSys.ListBucketTargets(ctx, bucket)

View File

@ -24,6 +24,7 @@ import (
"errors"
"fmt"
"io"
"math/rand"
"net/http"
"net/url"
"os"
@ -1470,30 +1471,33 @@ func (a adminAPIHandlers) BandwidthMonitorHandler(w http.ResponseWriter, r *http
return
}
rnd := rand.New(rand.NewSource(time.Now().UnixNano()))
setEventStreamHeaders(w)
reportCh := make(chan bandwidth.Report, 1)
reportCh := make(chan bandwidth.Report)
keepAliveTicker := time.NewTicker(500 * time.Millisecond)
defer keepAliveTicker.Stop()
bucketsRequestedString := r.URL.Query().Get("buckets")
bucketsRequested := strings.Split(bucketsRequestedString, ",")
go func() {
defer close(reportCh)
for {
reportCh <- globalNotificationSys.GetBandwidthReports(ctx, bucketsRequested...)
select {
case <-ctx.Done():
return
default:
time.Sleep(2 * time.Second)
case reportCh <- globalNotificationSys.GetBandwidthReports(ctx, bucketsRequested...):
time.Sleep(time.Duration(rnd.Float64() * float64(2*time.Second)))
}
}
}()
for {
select {
case report := <-reportCh:
enc := json.NewEncoder(w)
err := enc.Encode(report)
if err != nil {
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrInternalError), r.URL)
case report, ok := <-reportCh:
if !ok {
return
}
if err := json.NewEncoder(w).Encode(report); err != nil {
writeErrorResponseJSON(ctx, w, toAPIError(ctx, err), r.URL)
return
}
w.(http.Flusher).Flush()

View File

@ -298,6 +298,12 @@ func (api objectAPIHandlers) ListBucketsHandler(w http.ResponseWriter, r *http.R
return
}
// Anonymous users, should be rejected.
if cred.AccessKey == "" {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrAccessDenied), r.URL, guessIsBrowserReq(r))
return
}
// If etcd, dns federation configured list buckets from etcd.
var bucketsInfo []BucketInfo
if globalDNSConfig != nil && globalBucketFederation {
@ -496,7 +502,7 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter,
object.PurgeTransitioned = goi.TransitionStatus
}
if replicateDeletes {
delMarker, replicate, repsync := checkReplicateDelete(ctx, bucket, ObjectToDelete{
replicate, repsync := checkReplicateDelete(ctx, bucket, ObjectToDelete{
ObjectName: object.ObjectName,
VersionID: object.VersionID,
}, goi, gerr)
@ -511,9 +517,6 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter,
}
if object.VersionID != "" {
object.VersionPurgeStatus = Pending
if delMarker {
object.DeleteMarkerVersionID = object.VersionID
}
} else {
object.DeleteMarkerReplicationStatus = string(replication.Pending)
}
@ -557,13 +560,18 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter,
})
deletedObjects := make([]DeletedObject, len(deleteObjects.Objects))
for i := range errs {
dindex := objectsToDelete[ObjectToDelete{
// DeleteMarkerVersionID is not used specifically to avoid
// lookup errors, since DeleteMarkerVersionID is only
// created during DeleteMarker creation when client didn't
// specify a versionID.
objToDel := ObjectToDelete{
ObjectName: dObjects[i].ObjectName,
VersionID: dObjects[i].VersionID,
VersionPurgeStatus: dObjects[i].VersionPurgeStatus,
DeleteMarkerReplicationStatus: dObjects[i].DeleteMarkerReplicationStatus,
PurgeTransitioned: dObjects[i].PurgeTransitioned,
}]
}
dindex := objectsToDelete[objToDel]
if errs[i] == nil || isErrObjectNotFound(errs[i]) || isErrVersionNotFound(errs[i]) {
if replicateDeletes {
dObjects[i].DeleteMarkerReplicationStatus = deleteList[i].DeleteMarkerReplicationStatus
@ -619,12 +627,12 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter,
eventName := event.ObjectRemovedDelete
objInfo := ObjectInfo{
Name: dobj.ObjectName,
VersionID: dobj.VersionID,
Name: dobj.ObjectName,
VersionID: dobj.VersionID,
DeleteMarker: dobj.DeleteMarker,
}
if dobj.DeleteMarker {
objInfo.DeleteMarker = dobj.DeleteMarker
if objInfo.DeleteMarker {
objInfo.VersionID = dobj.DeleteMarkerVersionID
eventName = event.ObjectRemovedDeleteMarkerCreated
}

View File

@ -530,7 +530,7 @@ type SelectParameters struct {
// IsEmpty returns true if no select parameters set
func (sp *SelectParameters) IsEmpty() bool {
return sp == nil || sp.S3Select == s3select.S3Select{}
return sp == nil
}
var (

View File

@ -83,17 +83,38 @@ func getConditionValues(r *http.Request, lc string, username string, claims map[
}
}
authType := getRequestAuthType(r)
var signatureVersion string
switch authType {
case authTypeSignedV2, authTypePresignedV2:
signatureVersion = signV2Algorithm
case authTypeSigned, authTypePresigned, authTypeStreamingSigned, authTypePostPolicy:
signatureVersion = signV4Algorithm
}
var authtype string
switch authType {
case authTypePresignedV2, authTypePresigned:
authtype = "REST-QUERY-STRING"
case authTypeSignedV2, authTypeSigned, authTypeStreamingSigned:
authtype = "REST-HEADER"
case authTypePostPolicy:
authtype = "POST"
}
args := map[string][]string{
"CurrentTime": {currTime.Format(time.RFC3339)},
"EpochTime": {strconv.FormatInt(currTime.Unix(), 10)},
"SecureTransport": {strconv.FormatBool(r.TLS != nil)},
"SourceIp": {handlers.GetSourceIP(r)},
"UserAgent": {r.UserAgent()},
"Referer": {r.Referer()},
"principaltype": {principalType},
"userid": {username},
"username": {username},
"versionid": {vid},
"CurrentTime": {currTime.Format(time.RFC3339)},
"EpochTime": {strconv.FormatInt(currTime.Unix(), 10)},
"SecureTransport": {strconv.FormatBool(r.TLS != nil)},
"SourceIp": {handlers.GetSourceIP(r)},
"UserAgent": {r.UserAgent()},
"Referer": {r.Referer()},
"principaltype": {principalType},
"userid": {username},
"username": {username},
"versionid": {vid},
"signatureversion": {signatureVersion},
"authType": {authtype},
}
if lc != "" {

View File

@ -175,10 +175,10 @@ func isStandardHeader(matchHeaderKey string) bool {
}
// returns whether object version is a deletemarker and if object qualifies for replication
func checkReplicateDelete(ctx context.Context, bucket string, dobj ObjectToDelete, oi ObjectInfo, gerr error) (dm, replicate, sync bool) {
func checkReplicateDelete(ctx context.Context, bucket string, dobj ObjectToDelete, oi ObjectInfo, gerr error) (replicate, sync bool) {
rcfg, err := getReplicationConfig(ctx, bucket)
if err != nil || rcfg == nil {
return false, false, sync
return false, sync
}
opts := replication.ObjectOpts{
Name: dobj.ObjectName,
@ -198,19 +198,19 @@ func checkReplicateDelete(ctx context.Context, bucket string, dobj ObjectToDelet
validReplStatus = true
}
if oi.DeleteMarker && (validReplStatus || replicate) {
return oi.DeleteMarker, true, sync
return true, sync
}
// can be the case that other cluster is down and duplicate `mc rm --vid`
// is issued - this still needs to be replicated back to the other target
return oi.DeleteMarker, oi.VersionPurgeStatus == Pending || oi.VersionPurgeStatus == Failed, sync
return oi.VersionPurgeStatus == Pending || oi.VersionPurgeStatus == Failed, sync
}
tgt := globalBucketTargetSys.GetRemoteTargetClient(ctx, rcfg.RoleArn)
// the target online status should not be used here while deciding
// whether to replicate deletes as the target could be temporarily down
if tgt == nil {
return oi.DeleteMarker, false, false
return false, false
}
return oi.DeleteMarker, replicate, tgt.replicateSync
return replicate, tgt.replicateSync
}
// replicate deletes to the designated replication target if replication configuration
@ -263,6 +263,21 @@ func replicateDelete(ctx context.Context, dobj DeletedObjectVersionInfo, objectA
})
return
}
if tgt.isOffline() {
logger.LogIf(ctx, fmt.Errorf("remote target is offline for bucket:%s arn:%s", bucket, rcfg.RoleArn))
sendEvent(eventArgs{
BucketName: bucket,
Object: ObjectInfo{
Bucket: bucket,
Name: dobj.ObjectName,
VersionID: versionID,
DeleteMarker: dobj.DeleteMarker,
},
Host: "Internal: [Replication]",
EventName: event.ObjectReplicationNotTracked,
})
return
}
rmErr := tgt.RemoveObject(ctx, rcfg.GetDestination().Bucket, dobj.ObjectName, miniogo.RemoveObjectOptions{
VersionID: versionID,
@ -594,6 +609,17 @@ func replicateObject(ctx context.Context, objInfo ObjectInfo, objectAPI ObjectLa
})
return
}
if tgt.isOffline() {
logger.LogIf(ctx, fmt.Errorf("remote target is offline for bucket:%s arn:%s", bucket, cfg.RoleArn))
sendEvent(eventArgs{
EventName: event.ObjectReplicationNotTracked,
BucketName: bucket,
Object: objInfo,
Host: "Internal: [Replication]",
})
return
}
gr, err := objectAPI.GetObjectNInfo(ctx, bucket, object, nil, http.Header{}, readLock, ObjectOptions{
VersionID: objInfo.VersionID,
})
@ -697,19 +723,25 @@ func replicateObject(ctx context.Context, objInfo ObjectInfo, objectAPI ObjectLa
if totalNodesCount == 0 {
totalNodesCount = 1 // For standalone erasure coding
}
b := target.BandwidthLimit / int64(totalNodesCount)
var headerSize int
for k, v := range putOpts.Header() {
headerSize += len(k) + len(v)
}
// r takes over closing gr.
r := bandwidth.NewMonitoredReader(ctx, globalBucketMonitor, objInfo.Bucket, objInfo.Name, gr, headerSize, b, target.BandwidthLimit)
opts := &bandwidth.MonitorReaderOptions{
Bucket: objInfo.Bucket,
Object: objInfo.Name,
HeaderSize: headerSize,
BandwidthBytesPerSec: target.BandwidthLimit / int64(totalNodesCount),
ClusterBandwidth: target.BandwidthLimit,
}
r := bandwidth.NewMonitoredReader(ctx, globalBucketMonitor, gr, opts)
if _, err = c.PutObject(ctx, dest.Bucket, object, r, size, "", "", putOpts); err != nil {
replicationStatus = replication.Failed
logger.LogIf(ctx, fmt.Errorf("Unable to replicate for object %s/%s(%s): %w", bucket, objInfo.Name, objInfo.VersionID, err))
}
defer r.Close()
}
objInfo.UserDefined[xhttp.AmzBucketReplicationStatus] = replicationStatus.String()

View File

@ -100,7 +100,7 @@ func (sys *BucketTargetSys) SetTarget(ctx context.Context, bucket string, tgt *m
if minio.ToErrorResponse(err).Code == "NoSuchBucket" {
return BucketRemoteTargetNotFound{Bucket: tgt.TargetBucket}
}
return BucketRemoteConnectionErr{Bucket: tgt.TargetBucket}
return BucketRemoteConnectionErr{Bucket: tgt.TargetBucket, Err: err}
}
if tgt.Type == madmin.ReplicationService {
if !globalIsErasure {
@ -111,7 +111,7 @@ func (sys *BucketTargetSys) SetTarget(ctx context.Context, bucket string, tgt *m
}
vcfg, err := clnt.GetBucketVersioning(ctx, tgt.TargetBucket)
if err != nil {
return BucketRemoteConnectionErr{Bucket: tgt.TargetBucket}
return BucketRemoteConnectionErr{Bucket: tgt.TargetBucket, Err: err}
}
if vcfg.Status != string(versioning.Enabled) {
return BucketRemoteTargetNotVersioned{Bucket: tgt.TargetBucket}
@ -124,7 +124,7 @@ func (sys *BucketTargetSys) SetTarget(ctx context.Context, bucket string, tgt *m
if minio.ToErrorResponse(err).Code == "NoSuchBucket" {
return BucketRemoteTargetNotFound{Bucket: tgt.TargetBucket}
}
return BucketRemoteConnectionErr{Bucket: tgt.TargetBucket}
return BucketRemoteConnectionErr{Bucket: tgt.TargetBucket, Err: err}
}
if vcfg.Status != string(versioning.Enabled) {
return BucketRemoteTargetNotVersioned{Bucket: tgt.TargetBucket}

View File

@ -428,3 +428,13 @@ func getTLSConfig() (x509Certs []*x509.Certificate, manager *certs.Manager, secu
secureConn = true
return x509Certs, manager, secureConn, nil
}
// contextCanceled returns whether a context is canceled.
func contextCanceled(ctx context.Context) bool {
select {
case <-ctx.Done():
return true
default:
return false
}
}

View File

@ -797,42 +797,39 @@ type actionMeta struct {
var applyActionsLogPrefix = color.Green("applyActions:")
// applyActions will apply lifecycle checks on to a scanned item.
// The resulting size on disk will always be returned.
// The metadata will be compared to consensus on the object layer before any changes are applied.
// If no metadata is supplied, -1 is returned if no action is taken.
func (i *scannerItem) applyActions(ctx context.Context, o ObjectLayer, meta actionMeta) (size int64) {
func (i *scannerItem) applyHealing(ctx context.Context, o ObjectLayer, meta actionMeta) (size int64) {
if i.debug {
if meta.oi.VersionID != "" {
console.Debugf(applyActionsLogPrefix+" heal checking: %v/%v v(%s)\n", i.bucket, i.objectPath(), meta.oi.VersionID)
} else {
console.Debugf(applyActionsLogPrefix+" heal checking: %v/%v\n", i.bucket, i.objectPath())
}
}
healOpts := madmin.HealOpts{Remove: healDeleteDangling}
if meta.bitRotScan {
healOpts.ScanMode = madmin.HealDeepScan
}
res, err := o.HealObject(ctx, i.bucket, i.objectPath(), meta.oi.VersionID, healOpts)
if isErrObjectNotFound(err) || isErrVersionNotFound(err) {
return 0
}
if err != nil && !errors.Is(err, NotImplemented{}) {
logger.LogIf(ctx, err)
return 0
}
return res.ObjectSize
}
func (i *scannerItem) applyLifecycle(ctx context.Context, o ObjectLayer, meta actionMeta) (applied bool, size int64) {
size, err := meta.oi.GetActualSize()
if i.debug {
logger.LogIf(ctx, err)
}
if i.heal {
if i.debug {
if meta.oi.VersionID != "" {
console.Debugf(applyActionsLogPrefix+" heal checking: %v/%v v(%s)\n", i.bucket, i.objectPath(), meta.oi.VersionID)
} else {
console.Debugf(applyActionsLogPrefix+" heal checking: %v/%v\n", i.bucket, i.objectPath())
}
}
healOpts := madmin.HealOpts{Remove: healDeleteDangling}
if meta.bitRotScan {
healOpts.ScanMode = madmin.HealDeepScan
}
res, err := o.HealObject(ctx, i.bucket, i.objectPath(), meta.oi.VersionID, healOpts)
if isErrObjectNotFound(err) || isErrVersionNotFound(err) {
return 0
}
if err != nil && !errors.Is(err, NotImplemented{}) {
logger.LogIf(ctx, err)
return 0
}
size = res.ObjectSize
}
if i.lifeCycle == nil {
if i.debug {
console.Debugf(applyActionsLogPrefix+" no lifecycle rules to apply: %q\n", i.objectPath())
}
return size
return false, size
}
versionID := meta.oi.VersionID
@ -866,7 +863,7 @@ func (i *scannerItem) applyActions(ctx context.Context, o ObjectLayer, meta acti
if i.debug {
console.Debugf(applyActionsLogPrefix+" object not expirable: %q\n", i.objectPath())
}
return size
return false, size
}
obj, err := o.GetObjectInfo(ctx, i.bucket, i.objectPath(), ObjectOptions{
@ -878,19 +875,18 @@ func (i *scannerItem) applyActions(ctx context.Context, o ObjectLayer, meta acti
if !obj.DeleteMarker { // if this is not a delete marker log and return
// Do nothing - heal in the future.
logger.LogIf(ctx, err)
return size
return false, size
}
case ObjectNotFound, VersionNotFound:
// object not found or version not found return 0
return 0
return false, 0
default:
// All other errors proceed.
logger.LogIf(ctx, err)
return size
return false, size
}
}
var applied bool
action = evalActionFromLifecycle(ctx, *i.lifeCycle, obj, i.debug)
if action != lifecycle.NoneAction {
applied = applyLifecycleAction(ctx, action, o, obj)
@ -899,9 +895,26 @@ func (i *scannerItem) applyActions(ctx context.Context, o ObjectLayer, meta acti
if applied {
switch action {
case lifecycle.TransitionAction, lifecycle.TransitionVersionAction:
default: // for all lifecycle actions that remove data
return 0
return true, size
}
// For all other lifecycle actions that remove data
return true, 0
}
return false, size
}
// applyActions will apply lifecycle checks on to a scanned item.
// The resulting size on disk will always be returned.
// The metadata will be compared to consensus on the object layer before any changes are applied.
// If no metadata is supplied, -1 is returned if no action is taken.
func (i *scannerItem) applyActions(ctx context.Context, o ObjectLayer, meta actionMeta) int64 {
applied, size := i.applyLifecycle(ctx, o, meta)
// For instance, an applied lifecycle means we remove/transitioned an object
// from the current deployment, which means we don't have to call healing
// routine even if we are asked to do via heal flag.
if !applied && i.heal {
size = i.applyHealing(ctx, o, meta)
}
return size
}

View File

@ -485,7 +485,7 @@ type objectIO interface {
// Only backend errors are returned as errors.
// If the object is not found or unable to deserialize d is cleared and nil error is returned.
func (d *dataUsageCache) load(ctx context.Context, store objectIO, name string) error {
r, err := store.GetObjectNInfo(ctx, dataUsageBucket, name, nil, http.Header{}, noLock, ObjectOptions{})
r, err := store.GetObjectNInfo(ctx, dataUsageBucket, name, nil, http.Header{}, readLock, ObjectOptions{})
if err != nil {
switch err.(type) {
case ObjectNotFound:
@ -522,7 +522,7 @@ func (d *dataUsageCache) save(ctx context.Context, store objectIO, name string)
dataUsageBucket,
name,
NewPutObjReader(r),
ObjectOptions{NoLock: true})
ObjectOptions{})
if isErrBucketNotFound(err) {
return nil
}

View File

@ -170,15 +170,17 @@ func disksWithAllParts(ctx context.Context, onlineDisks []StorageAPI, partsMetad
// consider the offline disks as consistent.
continue
}
if len(meta.Erasure.Distribution) != len(onlineDisks) {
// Erasure distribution seems to have lesser
// number of items than number of online disks.
inconsistent++
continue
}
if meta.Erasure.Distribution[i] != meta.Erasure.Index {
// Mismatch indexes with distribution order
inconsistent++
if !meta.Deleted {
if len(meta.Erasure.Distribution) != len(onlineDisks) {
// Erasure distribution seems to have lesser
// number of items than number of online disks.
inconsistent++
continue
}
if meta.Erasure.Distribution[i] != meta.Erasure.Index {
// Mismatch indexes with distribution order
inconsistent++
}
}
}
@ -204,20 +206,22 @@ func disksWithAllParts(ctx context.Context, onlineDisks []StorageAPI, partsMetad
continue
}
if len(meta.Erasure.Distribution) != len(onlineDisks) {
// Erasure distribution is not the same as onlineDisks
// attempt a fix if possible, assuming other entries
// might have the right erasure distribution.
partsMetadata[i] = FileInfo{}
dataErrs[i] = errFileCorrupt
continue
}
if !meta.Deleted {
if len(meta.Erasure.Distribution) != len(onlineDisks) {
// Erasure distribution is not the same as onlineDisks
// attempt a fix if possible, assuming other entries
// might have the right erasure distribution.
partsMetadata[i] = FileInfo{}
dataErrs[i] = errFileCorrupt
continue
}
// Since erasure.Distribution is trustable we can fix the mismatching erasure.Index
if meta.Erasure.Distribution[i] != meta.Erasure.Index {
partsMetadata[i] = FileInfo{}
dataErrs[i] = errFileCorrupt
continue
// Since erasure.Distribution is trustable we can fix the mismatching erasure.Index
if meta.Erasure.Distribution[i] != meta.Erasure.Index {
partsMetadata[i] = FileInfo{}
dataErrs[i] = errFileCorrupt
continue
}
}
}
@ -226,9 +230,13 @@ func disksWithAllParts(ctx context.Context, onlineDisks []StorageAPI, partsMetad
// disk has a valid xl.meta but may not have all the
// parts. This is considered an outdated disk, since
// it needs healing too.
dataErrs[i] = onlineDisk.VerifyFile(ctx, bucket, object, partsMetadata[i])
if !partsMetadata[i].Deleted {
dataErrs[i] = onlineDisk.VerifyFile(ctx, bucket, object, partsMetadata[i])
}
case madmin.HealNormalScan:
dataErrs[i] = onlineDisk.CheckParts(ctx, bucket, object, partsMetadata[i])
if !partsMetadata[i].Deleted {
dataErrs[i] = onlineDisk.CheckParts(ctx, bucket, object, partsMetadata[i])
}
}
if dataErrs[i] == nil {

View File

@ -494,6 +494,7 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
n, err := erasure.Encode(ctx, data, writers, buffer, writeQuorum)
closeBitrotWriters(writers)
if err != nil {
logger.LogIf(ctx, err)
return pi, toObjectErr(err, bucket, object)
}
@ -528,6 +529,7 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
partPath := pathJoin(uploadIDPath, fi.DataDir, partSuffix)
onlineDisks, err = rename(ctx, onlineDisks, minioMetaTmpBucket, tmpPartPath, minioMetaMultipartBucket, partPath, false, writeQuorum, nil)
if err != nil {
logger.LogIf(ctx, err)
return pi, toObjectErr(err, minioMetaMultipartBucket, partPath)
}
@ -544,7 +546,8 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
// Pick one from the first valid metadata.
fi, err = pickValidFileInfo(ctx, partsMetadata, modTime, writeQuorum)
if err != nil {
return pi, err
logger.LogIf(ctx, err)
return pi, toObjectErr(err, bucket, object)
}
// Once part is successfully committed, proceed with updating erasure metadata.
@ -571,6 +574,7 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
// Writes update `xl.meta` format for each disk.
if _, err = writeUniqueFileInfo(ctx, onlineDisks, minioMetaMultipartBucket, uploadIDPath, partsMetadata, writeQuorum); err != nil {
logger.LogIf(ctx, err)
return pi, toObjectErr(err, minioMetaMultipartBucket, uploadIDPath)
}

View File

@ -447,6 +447,12 @@ func (er erasureObjects) getObjectFileInfo(ctx context.Context, bucket, object s
return fi, nil, nil, err
}
// if one of the disks is offline, return right here, no need
// to attempt a heal on the object.
if countErrs(errs, errDiskNotFound) > 0 {
return fi, metaArr, onlineDisks, nil
}
var missingBlocks int
for i, err := range errs {
if err != nil && errors.Is(err, errFileNotFound) {
@ -460,7 +466,7 @@ func (er erasureObjects) getObjectFileInfo(ctx context.Context, bucket, object s
}
// if missing metadata can be reconstructed, attempt to reconstruct.
if missingBlocks > 0 && missingBlocks < readQuorum {
if !fi.Deleted && missingBlocks > 0 && missingBlocks < readQuorum {
if _, healing := er.getOnlineDisksWithHealing(); !healing {
go healObject(bucket, object, fi.VersionID, madmin.HealNormalScan)
}
@ -801,11 +807,13 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st
// Write unique `xl.meta` for each disk.
if onlineDisks, err = writeUniqueFileInfo(ctx, onlineDisks, minioMetaTmpBucket, tempObj, partsMetadata, writeQuorum); err != nil {
logger.LogIf(ctx, err)
return ObjectInfo{}, toObjectErr(err, bucket, object)
}
// Rename the successfully written temporary object to final location.
if onlineDisks, err = renameData(ctx, onlineDisks, minioMetaTmpBucket, tempObj, fi.DataDir, bucket, object, writeQuorum, nil); err != nil {
logger.LogIf(ctx, err)
return ObjectInfo{}, toObjectErr(err, bucket, object)
}

View File

@ -250,8 +250,8 @@ func (s *erasureSets) connectDisks() {
}
disk.SetDiskLoc(s.poolIndex, setIndex, diskIndex)
s.endpointStrings[setIndex*s.setDriveCount+diskIndex] = disk.String()
s.erasureDisksMu.Unlock()
setsJustConnected[setIndex] = true
s.erasureDisksMu.Unlock()
}(endpoint)
}

View File

@ -435,6 +435,11 @@ func (er erasureObjects) nsScanner(ctx context.Context, buckets []BucketInfo, bf
}
}()
// Shuffle disks to ensure a total randomness of bucket/disk association to ensure
// that objects that are not present in all disks are accounted and ILM applied.
r := rand.New(rand.NewSource(time.Now().UnixNano()))
r.Shuffle(len(disks), func(i, j int) { disks[i], disks[j] = disks[j], disks[i] })
// Start one scanner per disk
var wg sync.WaitGroup
wg.Add(len(disks))

View File

@ -233,10 +233,15 @@ func extractReqParams(r *http.Request) map[string]string {
region := globalServerRegion
cred := getReqAccessCred(r, region)
principalID := cred.AccessKey
if cred.ParentUser != "" {
principalID = cred.ParentUser
}
// Success.
m := map[string]string{
"region": region,
"accessKey": cred.AccessKey,
"principalId": principalID,
"sourceIPAddress": handlers.GetSourceIP(r),
// Add more fields here.
}

View File

@ -1704,7 +1704,7 @@ func (sys *IAMSys) PolicyDBGet(name string, isGroup bool, groups ...string) ([]s
// information in IAM (i.e sys.iam*Map) - this info is stored only in the STS
// generated credentials. Thus we skip looking up group memberships, user map,
// and group map and check the appropriate policy maps directly.
func (sys *IAMSys) policyDBGet(name string, isGroup bool) ([]string, error) {
func (sys *IAMSys) policyDBGet(name string, isGroup bool) (policies []string, err error) {
if isGroup {
if sys.usersSysType == MinIOUsersSysType {
g, ok := sys.iamGroupsMap[name]
@ -1719,8 +1719,7 @@ func (sys *IAMSys) policyDBGet(name string, isGroup bool) ([]string, error) {
}
}
mp := sys.iamGroupPolicyMap[name]
return mp.toSlice(), nil
return sys.iamGroupPolicyMap[name].toSlice(), nil
}
var u auth.Credentials
@ -1738,8 +1737,6 @@ func (sys *IAMSys) policyDBGet(name string, isGroup bool) ([]string, error) {
}
}
var policies []string
mp, ok := sys.iamUserPolicyMap[name]
if !ok {
if u.ParentUser != "" {
@ -1757,8 +1754,7 @@ func (sys *IAMSys) policyDBGet(name string, isGroup bool) ([]string, error) {
continue
}
p := sys.iamGroupPolicyMap[group]
policies = append(policies, p.toSlice()...)
policies = append(policies, sys.iamGroupPolicyMap[group].toSlice()...)
}
return policies, nil
@ -1788,8 +1784,9 @@ func (sys *IAMSys) IsAllowedServiceAccount(args iampolicy.Args, parent string) b
}
// Check policy for this service account.
svcPolicies, err := sys.PolicyDBGet(args.AccountName, false)
svcPolicies, err := sys.PolicyDBGet(parent, false, args.Groups...)
if err != nil {
logger.LogIf(GlobalContext, err)
return false
}
@ -2072,7 +2069,7 @@ func (sys *IAMSys) IsAllowed(args iampolicy.Args) bool {
}
// Continue with the assumption of a regular user
policies, err := sys.PolicyDBGet(args.AccountName, false)
policies, err := sys.PolicyDBGet(args.AccountName, false, args.Groups...)
if err != nil {
return false
}

View File

@ -81,6 +81,15 @@ type MapClaims struct {
jwtgo.MapClaims
}
// GetAccessKey will return the access key.
// If nil an empty string will be returned.
func (c *MapClaims) GetAccessKey() string {
if c == nil {
return ""
}
return c.AccessKey
}
// NewStandardClaims - initializes standard claims
func NewStandardClaims() *StandardClaims {
return &StandardClaims{}

View File

@ -811,6 +811,9 @@ func listPathRaw(ctx context.Context, opts listPathRawOptions) (err error) {
if len(disks) == 0 {
return fmt.Errorf("listPathRaw: 0 drives provided")
}
// Cancel upstream if we finish before we expect.
ctx, cancel := context.WithCancel(ctx)
defer cancel()
askDisks := len(disks)
readers := make([]*metacacheReader, askDisks)
@ -821,6 +824,8 @@ func listPathRaw(ctx context.Context, opts listPathRawOptions) (err error) {
if err != nil {
return err
}
// Make sure we close the pipe so blocked writes don't stay around.
defer r.CloseWithError(context.Canceled)
// Send request to each disk.
go func() {
werr := d.WalkDir(ctx, WalkDirOptions{
@ -832,7 +837,10 @@ func listPathRaw(ctx context.Context, opts listPathRawOptions) (err error) {
ForwardTo: opts.forwardTo,
}, w)
w.CloseWithError(werr)
if werr != io.EOF && werr != nil && werr.Error() != errFileNotFound.Error() && werr.Error() != errVolumeNotFound.Error() {
if werr != io.EOF && werr != nil &&
werr.Error() != errFileNotFound.Error() &&
werr.Error() != errVolumeNotFound.Error() &&
!errors.Is(werr, context.Canceled) {
logger.LogIf(ctx, werr)
}
}()

View File

@ -230,7 +230,8 @@ func (w *metacacheWriter) Reset(out io.Writer) {
}
var s2DecPool = sync.Pool{New: func() interface{} {
return s2.NewReader(nil)
// Default alloc block for network transfer.
return s2.NewReader(nil, s2.ReaderAllocBlock(16<<10))
}}
// metacacheReader allows reading a cache stream.

View File

@ -27,6 +27,7 @@ import (
"strings"
"github.com/gorilla/mux"
xhttp "github.com/minio/minio/cmd/http"
"github.com/minio/minio/cmd/logger"
xioutil "github.com/minio/minio/pkg/ioutil"
)
@ -107,6 +108,9 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
forward := opts.ForwardTo
var scanDir func(path string) error
scanDir = func(current string) error {
if contextCanceled(ctx) {
return ctx.Err()
}
entries, err := s.ListDir(ctx, opts.Bucket, current, -1)
if err != nil {
// Folder could have gone away in-between
@ -142,6 +146,9 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
// Do not retain the file.
entries[i] = ""
if contextCanceled(ctx) {
return ctx.Err()
}
// If root was an object return it as such.
if HasSuffix(entry, xlStorageFormatFile) {
var meta metaCacheEntry
@ -183,6 +190,9 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
if entry == "" {
continue
}
if contextCanceled(ctx) {
return ctx.Err()
}
meta := metaCacheEntry{name: PathJoin(current, entry)}
// If directory entry on stack before this, pop it now.
@ -282,6 +292,7 @@ func (client *storageRESTClient) WalkDir(ctx context.Context, opts WalkDirOption
logger.LogIf(ctx, err)
return err
}
defer xhttp.DrainBody(respBody)
return waitForHTTPStream(respBody, wr)
}

View File

@ -1368,7 +1368,7 @@ func (args eventArgs) ToEvent(escape bool) event.Event {
AwsRegion: args.ReqParams["region"],
EventTime: eventTime.Format(event.AMZTimeFormat),
EventName: args.EventName,
UserIdentity: event.Identity{PrincipalID: args.ReqParams["accessKey"]},
UserIdentity: event.Identity{PrincipalID: args.ReqParams["principalId"]},
RequestParameters: args.ReqParams,
ResponseElements: respElements,
S3: event.Metadata{
@ -1376,7 +1376,7 @@ func (args eventArgs) ToEvent(escape bool) event.Event {
ConfigurationID: "Config",
Bucket: event.Bucket{
Name: args.BucketName,
OwnerIdentity: event.Identity{PrincipalID: args.ReqParams["accessKey"]},
OwnerIdentity: event.Identity{PrincipalID: args.ReqParams["principalId"]},
ARN: policy.ResourceARNPrefix + args.BucketName,
},
Object: event.Object{

View File

@ -426,7 +426,7 @@ func (e BucketRemoteTargetNotFound) Error() string {
type BucketRemoteConnectionErr GenericError
func (e BucketRemoteConnectionErr) Error() string {
return "Remote service endpoint or target bucket not available: " + e.Bucket
return fmt.Sprintf("Remote service endpoint or target bucket not available: %s \n\t%s", e.Bucket, e.Err.Error())
}
// BucketRemoteAlreadyExists remote already exists for this target type.

View File

@ -814,13 +814,7 @@ func (g *GetObjectReader) Close() error {
// Read - to implement Reader interface.
func (g *GetObjectReader) Read(p []byte) (n int, err error) {
n, err = g.pReader.Read(p)
if err != nil {
// Calling code may not Close() in case of error, so
// we ensure it.
g.Close()
}
return
return g.pReader.Read(p)
}
//SealMD5CurrFn seals md5sum with object encryption key and returns sealed

View File

@ -2817,7 +2817,8 @@ func (api objectAPIHandlers) DeleteObjectHandler(w http.ResponseWriter, r *http.
VersionID: opts.VersionID,
})
}
_, replicateDel, replicateSync := checkReplicateDelete(ctx, bucket, ObjectToDelete{ObjectName: object, VersionID: opts.VersionID}, goi, gerr)
replicateDel, replicateSync := checkReplicateDelete(ctx, bucket, ObjectToDelete{ObjectName: object, VersionID: opts.VersionID}, goi, gerr)
if replicateDel {
if opts.VersionID != "" {
opts.VersionPurgeStatus = Pending
@ -2825,6 +2826,7 @@ func (api objectAPIHandlers) DeleteObjectHandler(w http.ResponseWriter, r *http.
opts.DeleteMarkerReplicationStatus = string(replication.Pending)
}
}
vID := opts.VersionID
if r.Header.Get(xhttp.AmzBucketReplicationStatus) == replication.Replica.String() {
// check if replica has permission to be deleted.

View File

@ -127,6 +127,7 @@ func (c *Client) Call(ctx context.Context, method string, values url.Values, bod
}
req.Header.Set("Authorization", "Bearer "+c.newAuthToken(req.URL.RawQuery))
req.Header.Set("X-Minio-Time", time.Now().UTC().Format(time.RFC3339))
req.Header.Set("Expect", "100-continue") // set expect continue to honor expect continue timeouts
if length > 0 {
req.ContentLength = length
}

View File

@ -207,7 +207,7 @@ func (client *storageRESTClient) NSScanner(ctx context.Context, cache dataUsageC
pw.CloseWithError(cache.serializeTo(pw))
}()
respBody, err := client.call(ctx, storageRESTMethodNSScanner, url.Values{}, pr, -1)
defer http.DrainBody(respBody)
defer xhttp.DrainBody(respBody)
if err != nil {
pr.Close()
return cache, err
@ -221,7 +221,7 @@ func (client *storageRESTClient) NSScanner(ctx context.Context, cache dataUsageC
}()
err = newCache.deserialize(pr)
pr.CloseWithError(err)
return newCache, err
return newCache, toStorageErr(err)
}
func (client *storageRESTClient) GetDiskID() (string, error) {
@ -238,6 +238,14 @@ func (client *storageRESTClient) SetDiskID(id string) {
// DiskInfo - fetch disk information for a remote disk.
func (client *storageRESTClient) DiskInfo(ctx context.Context) (info DiskInfo, err error) {
if !client.IsOnline() {
// make sure to check if the disk is offline, since the underlying
// value is cached we should attempt to invalidate it if such calls
// were attempted. This can lead to false success under certain conditions
// - this change attempts to avoid stale information if the underlying
// transport is already down.
return info, errDiskNotFound
}
client.diskInfoCache.Once.Do(func() {
client.diskInfoCache.TTL = time.Second
client.diskInfoCache.Update = func() (interface{}, error) {
@ -247,7 +255,7 @@ func (client *storageRESTClient) DiskInfo(ctx context.Context) (info DiskInfo, e
if err != nil {
return info, err
}
defer http.DrainBody(respBody)
defer xhttp.DrainBody(respBody)
if err = msgp.Decode(respBody, &info); err != nil {
return info, err
}
@ -270,7 +278,7 @@ func (client *storageRESTClient) MakeVolBulk(ctx context.Context, volumes ...str
values := make(url.Values)
values.Set(storageRESTVolumes, strings.Join(volumes, ","))
respBody, err := client.call(ctx, storageRESTMethodMakeVolBulk, values, nil, -1)
defer http.DrainBody(respBody)
defer xhttp.DrainBody(respBody)
return err
}
@ -279,7 +287,7 @@ func (client *storageRESTClient) MakeVol(ctx context.Context, volume string) (er
values := make(url.Values)
values.Set(storageRESTVolume, volume)
respBody, err := client.call(ctx, storageRESTMethodMakeVol, values, nil, -1)
defer http.DrainBody(respBody)
defer xhttp.DrainBody(respBody)
return err
}
@ -289,7 +297,7 @@ func (client *storageRESTClient) ListVols(ctx context.Context) (vols []VolInfo,
if err != nil {
return
}
defer http.DrainBody(respBody)
defer xhttp.DrainBody(respBody)
vinfos := VolsInfo(vols)
err = msgp.Decode(respBody, &vinfos)
return vinfos, err
@ -303,7 +311,7 @@ func (client *storageRESTClient) StatVol(ctx context.Context, volume string) (vo
if err != nil {
return
}
defer http.DrainBody(respBody)
defer xhttp.DrainBody(respBody)
err = msgp.Decode(respBody, &vol)
return vol, err
}
@ -316,7 +324,7 @@ func (client *storageRESTClient) DeleteVol(ctx context.Context, volume string, f
values.Set(storageRESTForceDelete, "true")
}
respBody, err := client.call(ctx, storageRESTMethodDeleteVol, values, nil, -1)
defer http.DrainBody(respBody)
defer xhttp.DrainBody(respBody)
return err
}
@ -327,7 +335,7 @@ func (client *storageRESTClient) AppendFile(ctx context.Context, volume string,
values.Set(storageRESTFilePath, path)
reader := bytes.NewReader(buf)
respBody, err := client.call(ctx, storageRESTMethodAppendFile, values, reader, -1)
defer http.DrainBody(respBody)
defer xhttp.DrainBody(respBody)
return err
}
@ -337,12 +345,7 @@ func (client *storageRESTClient) CreateFile(ctx context.Context, volume, path st
values.Set(storageRESTFilePath, path)
values.Set(storageRESTLength, strconv.Itoa(int(size)))
respBody, err := client.call(ctx, storageRESTMethodCreateFile, values, ioutil.NopCloser(reader), size)
if err != nil {
return err
}
waitReader, err := waitForHTTPResponse(respBody)
defer http.DrainBody(ioutil.NopCloser(waitReader))
defer respBody.Close()
defer xhttp.DrainBody(respBody)
return err
}
@ -357,7 +360,7 @@ func (client *storageRESTClient) WriteMetadata(ctx context.Context, volume, path
}
respBody, err := client.call(ctx, storageRESTMethodWriteMetadata, values, &reader, -1)
defer http.DrainBody(respBody)
defer xhttp.DrainBody(respBody)
return err
}
@ -373,7 +376,7 @@ func (client *storageRESTClient) DeleteVersion(ctx context.Context, volume, path
}
respBody, err := client.call(ctx, storageRESTMethodDeleteVersion, values, &buffer, -1)
defer http.DrainBody(respBody)
defer xhttp.DrainBody(respBody)
return err
}
@ -383,7 +386,7 @@ func (client *storageRESTClient) WriteAll(ctx context.Context, volume string, pa
values.Set(storageRESTVolume, volume)
values.Set(storageRESTFilePath, path)
respBody, err := client.call(ctx, storageRESTMethodWriteAll, values, bytes.NewBuffer(b), int64(len(b)))
defer http.DrainBody(respBody)
defer xhttp.DrainBody(respBody)
return err
}
@ -393,7 +396,7 @@ func (client *storageRESTClient) CheckFile(ctx context.Context, volume string, p
values.Set(storageRESTVolume, volume)
values.Set(storageRESTFilePath, path)
respBody, err := client.call(ctx, storageRESTMethodCheckFile, values, nil, -1)
defer http.DrainBody(respBody)
defer xhttp.DrainBody(respBody)
return err
}
@ -410,7 +413,7 @@ func (client *storageRESTClient) CheckParts(ctx context.Context, volume string,
}
respBody, err := client.call(ctx, storageRESTMethodCheckParts, values, &reader, -1)
defer http.DrainBody(respBody)
defer xhttp.DrainBody(respBody)
return err
}
@ -424,7 +427,6 @@ func (client *storageRESTClient) RenameData(ctx context.Context, srcVolume, srcP
values.Set(storageRESTDstPath, dstPath)
respBody, err := client.call(ctx, storageRESTMethodRenameData, values, nil, -1)
defer http.DrainBody(respBody)
return err
}
@ -454,13 +456,13 @@ func (client *storageRESTClient) ReadVersion(ctx context.Context, volume, path,
if err != nil {
return fi, err
}
defer http.DrainBody(respBody)
defer xhttp.DrainBody(respBody)
dec := msgpNewReader(respBody)
defer readMsgpReaderPool.Put(dec)
err = fi.DecodeMsg(dec)
return fi, err
return fi, toStorageErr(err)
}
// ReadAll - reads all contents of a file.
@ -472,7 +474,7 @@ func (client *storageRESTClient) ReadAll(ctx context.Context, volume string, pat
if err != nil {
return nil, err
}
defer http.DrainBody(respBody)
defer xhttp.DrainBody(respBody)
return ioutil.ReadAll(respBody)
}
@ -483,11 +485,7 @@ func (client *storageRESTClient) ReadFileStream(ctx context.Context, volume, pat
values.Set(storageRESTFilePath, path)
values.Set(storageRESTOffset, strconv.Itoa(int(offset)))
values.Set(storageRESTLength, strconv.Itoa(int(length)))
respBody, err := client.call(ctx, storageRESTMethodReadFileStream, values, nil, -1)
if err != nil {
return nil, err
}
return respBody, nil
return client.call(ctx, storageRESTMethodReadFileStream, values, nil, -1)
}
// ReadFile - reads section of a file.
@ -508,9 +506,9 @@ func (client *storageRESTClient) ReadFile(ctx context.Context, volume string, pa
if err != nil {
return 0, err
}
defer http.DrainBody(respBody)
defer xhttp.DrainBody(respBody)
n, err := io.ReadFull(respBody, buf)
return int64(n), err
return int64(n), toStorageErr(err)
}
// ListDir - lists a directory.
@ -523,9 +521,9 @@ func (client *storageRESTClient) ListDir(ctx context.Context, volume, dirPath st
if err != nil {
return nil, err
}
defer http.DrainBody(respBody)
defer xhttp.DrainBody(respBody)
err = gob.NewDecoder(respBody).Decode(&entries)
return entries, err
return entries, toStorageErr(err)
}
// DeleteFile - deletes a file.
@ -536,7 +534,7 @@ func (client *storageRESTClient) Delete(ctx context.Context, volume string, path
values.Set(storageRESTRecursive, strconv.FormatBool(recursive))
respBody, err := client.call(ctx, storageRESTMethodDeleteFile, values, nil, -1)
defer http.DrainBody(respBody)
defer xhttp.DrainBody(respBody)
return err
}
@ -560,7 +558,7 @@ func (client *storageRESTClient) DeleteVersions(ctx context.Context, volume stri
errs = make([]error, len(versions))
respBody, err := client.call(ctx, storageRESTMethodDeleteVersions, values, &buffer, -1)
defer http.DrainBody(respBody)
defer xhttp.DrainBody(respBody)
if err != nil {
for i := range errs {
errs[i] = err
@ -571,7 +569,7 @@ func (client *storageRESTClient) DeleteVersions(ctx context.Context, volume stri
reader, err := waitForHTTPResponse(respBody)
if err != nil {
for i := range errs {
errs[i] = err
errs[i] = toStorageErr(err)
}
return errs
}
@ -599,7 +597,7 @@ func (client *storageRESTClient) RenameFile(ctx context.Context, srcVolume, srcP
values.Set(storageRESTDstVolume, dstVolume)
values.Set(storageRESTDstPath, dstPath)
respBody, err := client.call(ctx, storageRESTMethodRenameFile, values, nil, -1)
defer http.DrainBody(respBody)
defer xhttp.DrainBody(respBody)
return err
}
@ -614,19 +612,19 @@ func (client *storageRESTClient) VerifyFile(ctx context.Context, volume, path st
}
respBody, err := client.call(ctx, storageRESTMethodVerifyFile, values, &reader, -1)
defer http.DrainBody(respBody)
defer xhttp.DrainBody(respBody)
if err != nil {
return err
}
respReader, err := waitForHTTPResponse(respBody)
if err != nil {
return err
return toStorageErr(err)
}
verifyResp := &VerifyFileResp{}
if err = gob.NewDecoder(respReader).Decode(verifyResp); err != nil {
return err
return toStorageErr(err)
}
return toStorageErr(verifyResp.Err)

View File

@ -17,7 +17,7 @@
package cmd
const (
storageRESTVersion = "v29" // Removed WalkVersions()
storageRESTVersion = "v30" // CreateFile is back to non-streaming
storageRESTVersionPrefix = SlashSeparator + storageRESTVersion
storageRESTPrefix = minioReservedBucketPath + "/storage"
)

View File

@ -288,8 +288,10 @@ func (s *storageRESTServer) CreateFileHandler(w http.ResponseWriter, r *http.Req
return
}
done := keepHTTPResponseAlive(w)
done(s.storage.CreateFile(r.Context(), volume, filePath, int64(fileSize), r.Body))
err = s.storage.CreateFile(r.Context(), volume, filePath, int64(fileSize), r.Body)
if err != nil {
s.writeErrorResponse(w, err)
}
}
// DeleteVersion delete updated metadata.
@ -693,6 +695,7 @@ func keepHTTPResponseAlive(w http.ResponseWriter) func(error) {
if doneCh == nil {
return
}
// Indicate we are ready to write.
doneCh <- err
@ -732,23 +735,6 @@ func waitForHTTPResponse(respBody io.Reader) (io.Reader, error) {
}
}
// drainCloser can be used for wrapping an http response.
// It will drain the body before closing.
type drainCloser struct {
rc io.ReadCloser
}
// Read forwards the read operation.
func (f drainCloser) Read(p []byte) (n int, err error) {
return f.rc.Read(p)
}
// Close drains the body and closes the upstream.
func (f drainCloser) Close() error {
xhttp.DrainBody(f.rc)
return nil
}
// httpStreamResponse allows streaming a response, but still send an error.
type httpStreamResponse struct {
done chan error
@ -840,7 +826,6 @@ func waitForHTTPStream(respBody io.ReadCloser, w io.Writer) error {
case 0:
// 0 is unbuffered, copy the rest.
_, err := io.Copy(w, respBody)
respBody.Close()
if err == io.EOF {
return nil
}
@ -850,18 +835,7 @@ func waitForHTTPStream(respBody io.ReadCloser, w io.Writer) error {
if err != nil {
return err
}
respBody.Close()
return errors.New(string(errorText))
case 3:
// gob style is already deprecated, we can remove this when
// storage API version will be greater or equal to 23.
defer respBody.Close()
dec := gob.NewDecoder(respBody)
var err error
if de := dec.Decode(&err); de == nil {
return err
}
return errors.New("rpc error")
case 2:
// Block of data
var tmp [4]byte
@ -878,7 +852,6 @@ func waitForHTTPStream(respBody io.ReadCloser, w io.Writer) error {
case 32:
continue
default:
go xhttp.DrainBody(respBody)
return fmt.Errorf("unexpected filler byte: %d", tmp[0])
}
}

View File

@ -455,7 +455,7 @@ func newInternodeHTTPTransport(tlsConfig *tls.Config, dialTimeout time.Duration)
WriteBufferSize: 32 << 10, // 32KiB moving up from 4KiB default
ReadBufferSize: 32 << 10, // 32KiB moving up from 4KiB default
IdleConnTimeout: 15 * time.Second,
ResponseHeaderTimeout: 3 * time.Minute, // Set conservative timeouts for MinIO internode.
ResponseHeaderTimeout: 15 * time.Minute, // Set conservative timeouts for MinIO internode.
TLSHandshakeTimeout: 15 * time.Second,
ExpectContinueTimeout: 15 * time.Second,
TLSClientConfig: tlsConfig,

View File

@ -226,7 +226,7 @@ func (web *webAPIHandlers) MakeBucket(r *http.Request, args *MakeBucketArgs, rep
reply.UIVersion = Version
reqParams := extractReqParams(r)
reqParams["accessKey"] = claims.AccessKey
reqParams["accessKey"] = claims.GetAccessKey()
sendEvent(eventArgs{
EventName: event.BucketCreated,
@ -723,7 +723,7 @@ func (web *webAPIHandlers) RemoveObject(r *http.Request, args *RemoveObjectArgs,
)
reqParams := extractReqParams(r)
reqParams["accessKey"] = claims.AccessKey
reqParams["accessKey"] = claims.GetAccessKey()
sourceIP := handlers.GetSourceIP(r)
next:
@ -767,7 +767,7 @@ next:
}
if hasReplicationRules(ctx, args.BucketName, []ObjectToDelete{{ObjectName: objectName}}) || hasLifecycleConfig {
goi, gerr = getObjectInfoFn(ctx, args.BucketName, objectName, opts)
if _, replicateDel, replicateSync = checkReplicateDelete(ctx, args.BucketName, ObjectToDelete{
if replicateDel, replicateSync = checkReplicateDelete(ctx, args.BucketName, ObjectToDelete{
ObjectName: objectName,
VersionID: goi.VersionID,
}, goi, gerr); replicateDel {
@ -903,7 +903,7 @@ next:
}
}
}
_, replicateDel, _ := checkReplicateDelete(ctx, args.BucketName, ObjectToDelete{ObjectName: obj.Name, VersionID: obj.VersionID}, obj, nil)
replicateDel, _ := checkReplicateDelete(ctx, args.BucketName, ObjectToDelete{ObjectName: obj.Name, VersionID: obj.VersionID}, obj, nil)
// since versioned delete is not available on web browser, yet - this is a simple DeleteMarker replication
objToDel := ObjectToDelete{ObjectName: obj.Name}
if replicateDel {
@ -1340,7 +1340,7 @@ func (web *webAPIHandlers) Upload(w http.ResponseWriter, r *http.Request) {
}
reqParams := extractReqParams(r)
reqParams["accessKey"] = claims.AccessKey
reqParams["accessKey"] = claims.GetAccessKey()
// Notify object created event.
sendEvent(eventArgs{
@ -1529,7 +1529,7 @@ func (web *webAPIHandlers) Download(w http.ResponseWriter, r *http.Request) {
}
reqParams := extractReqParams(r)
reqParams["accessKey"] = claims.AccessKey
reqParams["accessKey"] = claims.GetAccessKey()
// Notify object accessed via a GET request.
sendEvent(eventArgs{
@ -1684,7 +1684,7 @@ func (web *webAPIHandlers) DownloadZip(w http.ResponseWriter, r *http.Request) {
defer archive.Close()
reqParams := extractReqParams(r)
reqParams["accessKey"] = claims.AccessKey
reqParams["accessKey"] = claims.GetAccessKey()
respElements := extractRespElements(w)
for i, object := range args.Objects {

View File

@ -347,6 +347,8 @@ func (s *xlStorage) IsLocal() bool {
// Retrieve location indexes.
func (s *xlStorage) GetDiskLoc() (poolIdx, setIdx, diskIdx int) {
s.RLock()
defer s.RUnlock()
// If unset, see if we can locate it.
if s.poolIndex < 0 || s.setIndex < 0 || s.diskIndex < 0 {
return getXLDiskLoc(s.diskID)
@ -1032,11 +1034,13 @@ func (s *xlStorage) ReadVersion(ctx context.Context, volume, path, versionID str
}
func (s *xlStorage) readAllData(volumeDir string, filePath string, requireDirectIO bool) (buf []byte, err error) {
var f *os.File
var r io.ReadCloser
if requireDirectIO {
var f *os.File
f, err = disk.OpenFileDirectIO(filePath, readMode, 0666)
r = &odirectReader{f, nil, nil, true, true, s, nil}
} else {
f, err = OpenFile(filePath, readMode, 0)
r, err = OpenFile(filePath, readMode, 0)
}
if err != nil {
if osIsNotExist(err) {
@ -1071,10 +1075,8 @@ func (s *xlStorage) readAllData(volumeDir string, filePath string, requireDirect
return nil, err
}
or := &odirectReader{f, nil, nil, true, true, s, nil}
defer or.Close()
buf, err = ioutil.ReadAll(or)
defer r.Close()
buf, err = ioutil.ReadAll(r)
if err != nil {
err = osErrToFileErr(err)
}
@ -1247,7 +1249,7 @@ type odirectReader struct {
// Read - Implements Reader interface.
func (o *odirectReader) Read(buf []byte) (n int, err error) {
if o.err != nil {
if o.err != nil && (len(o.buf) == 0 || o.freshRead) {
return 0, o.err
}
if o.buf == nil {
@ -1274,20 +1276,22 @@ func (o *odirectReader) Read(buf []byte) (n int, err error) {
}
}
if n == 0 {
// err is io.EOF
// err is likely io.EOF
o.err = err
return n, err
}
o.err = err
o.buf = o.buf[:n]
o.freshRead = false
}
if len(buf) >= len(o.buf) {
n = copy(buf, o.buf)
o.freshRead = true
return n, nil
return n, o.err
}
n = copy(buf, o.buf)
o.buf = o.buf[n:]
// There is more left in buffer, do not return any EOF yet.
return n, nil
}
@ -1615,6 +1619,9 @@ func (s *xlStorage) CheckFile(ctx context.Context, volume string, path string) e
if err != nil {
return err
}
s.RLock()
formatLegacy := s.formatLegacy
s.RUnlock()
var checkFile func(p string) error
checkFile = func(p string) error {
@ -1626,10 +1633,10 @@ func (s *xlStorage) CheckFile(ctx context.Context, volume string, path string) e
if err := checkPathLength(filePath); err != nil {
return err
}
st, _ := Lstat(filePath)
if st == nil {
if !s.formatLegacy {
if !formatLegacy {
return errPathNotFound
}
@ -1880,10 +1887,13 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath, dataDir,
legacyPreserved = true
}
} else {
s.RLock()
formatLegacy := s.formatLegacy
s.RUnlock()
// It is possible that some drives may not have `xl.meta` file
// in such scenarios verify if at least `part.1` files exist
// to verify for legacy version.
if s.formatLegacy {
if formatLegacy {
// We only need this code if we are moving
// from `xl.json` to `xl.meta`, we can avoid
// one extra readdir operation here for all

@ -123,8 +123,12 @@ func (m *Monitor) getReport(selectBucket SelectionFunction) *bandwidth.Report {
if !selectBucket(bucket) {
continue
}
bucketThrottle, ok := m.bucketThrottle[bucket]
if !ok {
continue
}
report.BucketStats[bucket] = bandwidth.Details{
LimitInBytesPerSecond: m.bucketThrottle[bucket].clusterBandwidth,
LimitInBytesPerSecond: bucketThrottle.clusterBandwidth,
CurrentBandwidthInBytesPerSecond: bucketMeasurement.getExpMovingAvgBytesPerSecond(),
}
}
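The guard added above is Go's comma-ok map lookup: indexing m.bucketThrottle with a bucket that was never tracked yields a nil pointer, and dereferencing it for clusterBandwidth would panic. A tiny sketch with hypothetical types:

type throttle struct{ clusterBandwidth int64 }

func limitFor(throttles map[string]*throttle, bucket string) (int64, bool) {
	t, ok := throttles[bucket] // comma-ok: a nil entry means "not tracked"
	if !ok {
		return 0, false // skip the bucket instead of panicking
	}
	return t.clusterBandwidth, true
}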

@ -25,62 +25,61 @@ import (
// MonitoredReader monitors the bandwidth
type MonitoredReader struct {
bucket string // Token to track bucket
opts *MonitorReaderOptions
bucketMeasurement *bucketMeasurement // bucket measurement object
object string // Token to track object
reader io.ReadCloser // Reader to wrap
reader io.Reader // Reader to wrap
lastStop time.Time // Last timestamp for a measurement
headerSize int // Size of the header not captured by reader
throttle *throttle // throttle the rate at which replication occurs
monitor *Monitor // Monitor reference
closed bool // Reader is closed
lastErr error // last error reported; if this is non-nil, all reads will fail.
}
// NewMonitoredReader returns an io.ReadCloser that reports bandwidth details.
// The supplied reader will be closed.
func NewMonitoredReader(ctx context.Context, monitor *Monitor, bucket string, object string, reader io.ReadCloser, headerSize int, bandwidthBytesPerSecond int64, clusterBandwidth int64) *MonitoredReader {
// MonitorReaderOptions provides configurable options for monitor reader implementation.
type MonitorReaderOptions struct {
Bucket string
Object string
HeaderSize int
BandwidthBytesPerSec int64
ClusterBandwidth int64
}
// NewMonitoredReader returns an io.Reader that reports bandwidth details.
func NewMonitoredReader(ctx context.Context, monitor *Monitor, reader io.Reader, opts *MonitorReaderOptions) *MonitoredReader {
timeNow := time.Now()
b := monitor.track(bucket, object, timeNow)
b := monitor.track(opts.Bucket, opts.Object, timeNow)
return &MonitoredReader{
bucket: bucket,
object: object,
opts: opts,
bucketMeasurement: b,
reader: reader,
lastStop: timeNow,
headerSize: headerSize,
throttle: monitor.throttleBandwidth(ctx, bucket, bandwidthBytesPerSecond, clusterBandwidth),
throttle: monitor.throttleBandwidth(ctx, opts.Bucket, opts.BandwidthBytesPerSec, opts.ClusterBandwidth),
monitor: monitor,
}
}
// Read wraps the underlying reader's Read
func (m *MonitoredReader) Read(p []byte) (n int, err error) {
if m.closed {
err = io.ErrClosedPipe
if m.lastErr != nil {
err = m.lastErr
return
}
p = p[:m.throttle.GetLimitForBytes(int64(len(p)))]
n, err = m.reader.Read(p)
stop := time.Now()
update := uint64(n + m.headerSize)
update := uint64(n + m.opts.HeaderSize)
m.bucketMeasurement.incrementBytes(update)
m.lastStop = stop
unused := len(p) - (n + m.headerSize)
m.headerSize = 0 // Set to 0 post first read
unused := len(p) - (n + m.opts.HeaderSize)
m.opts.HeaderSize = 0 // Set to 0 post first read
if unused > 0 {
m.throttle.ReleaseUnusedBandwidth(int64(unused))
}
if err != nil {
m.lastErr = err
}
return
}
// Close stops tracking the I/O
func (m *MonitoredReader) Close() error {
if m.closed {
return nil
}
m.closed = true
return m.reader.Close()
}
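A sketch of how a caller might use the refactored constructor; the context wiring, bucket/object names, and limits are invented, and monitor is assumed to be an initialized *Monitor from this package:

import (
	"context"
	"io"
	"strings"
)

// copyThrottled sketches the new call shape under the assumptions above.
func copyThrottled(ctx context.Context, monitor *Monitor, payload string) (int64, error) {
	r := NewMonitoredReader(ctx, monitor, strings.NewReader(payload), &MonitorReaderOptions{
		Bucket:               "mybucket", // illustrative
		Object:               "obj",      // illustrative
		BandwidthBytesPerSec: 1 << 20,    // 1 MiB/s for this transfer
		ClusterBandwidth:     8 << 20,    // 8 MiB/s cluster-wide cap
	})
	return io.Copy(io.Discard, r)
}

Since the wrapper now takes a plain io.Reader and latches lastErr instead of tracking a closed flag, it never closes the stream itself; the lifetime of the underlying ReadCloser stays with the caller.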

@ -18,6 +18,7 @@ package lifecycle
import (
"encoding/xml"
"fmt"
"io"
"strings"
"time"
@ -71,7 +72,8 @@ func (lc *Lifecycle) UnmarshalXML(d *xml.Decoder, start xml.StartElement) (err e
switch start.Name.Local {
case "LifecycleConfiguration", "BucketLifecycleConfiguration":
default:
return errUnknownXMLTag
return xml.UnmarshalError(fmt.Sprintf("expected element type <LifecycleConfiguration>/<BucketLifecycleConfiguration> but have <%s>",
start.Name.Local))
}
for {
// Read tokens from the XML document in a stream.
@ -93,7 +95,7 @@ func (lc *Lifecycle) UnmarshalXML(d *xml.Decoder, start xml.StartElement) (err e
}
lc.Rules = append(lc.Rules, r)
default:
return errUnknownXMLTag
return xml.UnmarshalError(fmt.Sprintf("expected element type <Rule> but have <%s>", se.Name.Local))
}
}
}

@ -489,6 +489,41 @@ type ObjectLegalHold struct {
Status LegalHoldStatus `xml:"Status,omitempty"`
}
// UnmarshalXML - decodes XML data.
func (l *ObjectLegalHold) UnmarshalXML(d *xml.Decoder, start xml.StartElement) (err error) {
switch start.Name.Local {
case "LegalHold", "ObjectLockLegalHold":
default:
return xml.UnmarshalError(fmt.Sprintf("expected element type <LegalHold>/<ObjectLockLegalHold> but have <%s>",
start.Name.Local))
}
for {
// Read tokens from the XML document in a stream.
t, err := d.Token()
if err != nil {
if err == io.EOF {
break
}
return err
}
switch se := t.(type) {
case xml.StartElement:
switch se.Name.Local {
case "Status":
var st LegalHoldStatus
if err = d.DecodeElement(&st, &se); err != nil {
return err
}
l.Status = st
default:
return xml.UnmarshalError(fmt.Sprintf("expected element type <Status> but have <%s>", se.Name.Local))
}
}
}
return nil
}
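A short usage sketch of the decoder above, showing that both root spellings unmarshal into the same type (the helper name is illustrative; assumes this package's ObjectLegalHold and encoding/xml):

func parseLegalHoldVariants() error {
	docs := []string{
		`<LegalHold><Status>ON</Status></LegalHold>`,
		`<ObjectLockLegalHold><Status>ON</Status></ObjectLockLegalHold>`,
	}
	for _, doc := range docs {
		var lh ObjectLegalHold
		// Both spellings decode; any other root yields an xml.UnmarshalError.
		if err := xml.Unmarshal([]byte(doc), &lh); err != nil {
			return err
		}
	}
	return nil
}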
// IsEmpty returns true if struct is empty
func (l *ObjectLegalHold) IsEmpty() bool {
return !l.Status.Valid()

@ -18,6 +18,7 @@ package lock
import (
"encoding/xml"
"errors"
"fmt"
"net/http"
"reflect"
@ -467,6 +468,23 @@ func TestParseObjectLegalHold(t *testing.T) {
expectedErr: nil,
expectErr: false,
},
{
value: `<?xml version="1.0" encoding="UTF-8"?><ObjectLockLegalHold xmlns="http://s3.amazonaws.com/doc/2006-03-01/"><Status>ON</Status></ObjectLockLegalHold>`,
expectedErr: nil,
expectErr: false,
},
// invalid Status key
{
value: `<?xml version="1.0" encoding="UTF-8"?><ObjectLockLegalHold xmlns="http://s3.amazonaws.com/doc/2006-03-01/"><MyStatus>ON</MyStatus></ObjectLockLegalHold>`,
expectedErr: errors.New("expected element type <Status> but have <MyStatus>"),
expectErr: true,
},
// invalid XML attr
{
value: `<?xml version="1.0" encoding="UTF-8"?><UnknownLegalHold xmlns="http://s3.amazonaws.com/doc/2006-03-01/"><Status>ON</Status></UnknownLegalHold>`,
expectedErr: errors.New("expected element type <LegalHold>/<ObjectLockLegalHold> but have <UnknownLegalHold>"),
expectErr: true,
},
{
value: `<?xml version="1.0" encoding="UTF-8"?><LegalHold xmlns="http://s3.amazonaws.com/doc/2006-03-01/"><Status>On</Status></LegalHold>`,
expectedErr: ErrMalformedXML,

@ -110,10 +110,18 @@ const (
// AWSUsername - user-friendly name; in MinIO this value is the same as your user Access Key.
AWSUsername Key = "aws:username"
// S3SignatureVersion - identifies the version of AWS Signature that you want to support for authenticated requests.
S3SignatureVersion = "s3:signatureversion"
// S3AuthType - optionally use this condition key to restrict incoming requests to use a specific authentication method.
S3AuthType = "s3:authType"
)
// AllSupportedKeys - is the list of all supported keys.
var AllSupportedKeys = append([]Key{
S3SignatureVersion,
S3AuthType,
S3XAmzCopySource,
S3XAmzServerSideEncryption,
S3XAmzServerSideEncryptionCustomerAlgorithm,
@ -144,6 +152,8 @@ var AllSupportedKeys = append([]Key{
// CommonKeys - is the list of all common condition keys.
var CommonKeys = append([]Key{
S3SignatureVersion,
S3AuthType,
S3XAmzContentSha256,
S3LocationConstraint,
AWSReferer,
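For reference, these keys mirror AWS's condition keys of the same names: per AWS's documentation, s3:signatureversion takes values such as "AWS" (Signature Version 2) and "AWS4-HMAC-SHA256" (Signature Version 4), and s3:authType takes "REST-HEADER", "REST-QUERY-STRING", or "POST". A sketch of a bucket policy denying anything but Signature Version 4 (the bucket name is illustrative):

{
  "Version": "2012-10-17",
  "Statement": [{
    "Sid": "DenyNonSigV4",
    "Effect": "Deny",
    "Principal": "*",
    "Action": "s3:*",
    "Resource": "arn:aws:s3:::mybucket/*",
    "Condition": {
      "StringNotEquals": { "s3:signatureversion": "AWS4-HMAC-SHA256" }
    }
  }]
}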

@ -1,5 +1,5 @@
/*
* MinIO Cloud Storage, (C) 2019 MinIO, Inc.
* MinIO Cloud Storage, (C) 2019-2021 MinIO, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -216,6 +216,7 @@ type S3Select struct {
statement *sql.SelectStatement
progressReader *progressReader
recordReader recordReader
close func() error
}
var (
@ -311,6 +312,7 @@ func (s3Select *S3Select) Open(getReader func(offset, length int64) (io.ReadClos
}
return err
}
s3Select.close = rc.Close
return nil
case jsonFormat:
rc, err := getReader(0, -1)
@ -333,6 +335,8 @@ func (s3Select *S3Select) Open(getReader func(offset, length int64) (io.ReadClos
} else {
s3Select.recordReader = json.NewReader(s3Select.progressReader, &s3Select.Input.JSONArgs)
}
s3Select.close = rc.Close
return nil
case parquetFormat:
if !strings.EqualFold(os.Getenv("MINIO_API_SELECT_PARQUET"), "on") {
@ -396,6 +400,12 @@ func (s3Select *S3Select) marshal(buf *bytes.Buffer, record sql.Record) error {
// Evaluate - filters and sends records read from opened reader as per select statement to http response writer.
func (s3Select *S3Select) Evaluate(w http.ResponseWriter) {
defer func() {
if s3Select.close != nil {
s3Select.close()
}
}()
getProgressFunc := s3Select.getProgress
if !s3Select.Progress.Enabled {
getProgressFunc = nil
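The thread through this file: Open records the ReadCloser's Close method and Evaluate invokes it after streaming, so readers no longer need to auto-close themselves on io.EOF. A minimal sketch of that ownership pattern (the type and method names are illustrative):

import "io"

// streamCloser is an illustrative stand-in for S3Select.
type streamCloser struct {
	close func() error
}

func (s *streamCloser) open(getReader func() (io.ReadCloser, error)) error {
	rc, err := getReader()
	if err != nil {
		return err
	}
	s.close = rc.Close // remember how to release the stream
	return nil
}

func (s *streamCloser) evaluate() {
	defer func() {
		if s.close != nil {
			s.close() // release exactly once, after streaming ends
		}
	}()
	// ... filter records and write them to the response ...
}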

@ -739,6 +739,152 @@ func TestCSVQueries2(t *testing.T) {
}
}
func TestCSVQueries3(t *testing.T) {
input := `na.me,qty,CAST
apple,1,true
mango,3,false
`
var testTable = []struct {
name string
query string
requestXML []byte // override request XML
wantResult string
}{
{
name: "Select a column containing dot",
query: `select "na.me" from S3Object s`,
wantResult: `apple
mango`,
},
{
name: "Select column containing dot with table name prefix",
query: `select count(S3Object."na.me") from S3Object`,
wantResult: `2`,
},
{
name: "Select column containing dot with table alias prefix",
query: `select s."na.me" from S3Object as s`,
wantResult: `apple
mango`,
},
{
name: "Select column simplest",
query: `select qty from S3Object`,
wantResult: `1
3`,
},
{
name: "Select column with table name prefix",
query: `select S3Object.qty from S3Object`,
wantResult: `1
3`,
},
{
name: "Select column without table alias",
query: `select qty from S3Object s`,
wantResult: `1
3`,
},
{
name: "Select column with table alias",
query: `select s.qty from S3Object s`,
wantResult: `1
3`,
},
{
name: "Select reserved word column",
query: `select "CAST" from s3object`,
wantResult: `true
false`,
},
{
name: "Select reserved word column with table alias",
query: `select S3Object."CAST" from s3object`,
wantResult: `true
false`,
},
{
name: "Select reserved word column with unused table alias",
query: `select "CAST" from s3object s`,
wantResult: `true
false`,
},
{
name: "Select reserved word column with table alias",
query: `select s."CAST" from s3object s`,
wantResult: `true
false`,
},
{
name: "Select reserved word column with table alias",
query: `select NOT CAST(s."CAST" AS Bool) from s3object s`,
wantResult: `false
true`,
},
}
defRequest := `<?xml version="1.0" encoding="UTF-8"?>
<SelectObjectContentRequest>
<Expression>%s</Expression>
<ExpressionType>SQL</ExpressionType>
<InputSerialization>
<CompressionType>NONE</CompressionType>
<CSV>
<FileHeaderInfo>USE</FileHeaderInfo>
<QuoteCharacter>"</QuoteCharacter>
</CSV>
</InputSerialization>
<OutputSerialization>
<CSV/>
</OutputSerialization>
<RequestProgress>
<Enabled>FALSE</Enabled>
</RequestProgress>
</SelectObjectContentRequest>`
for _, testCase := range testTable {
t.Run(testCase.name, func(t *testing.T) {
testReq := testCase.requestXML
if len(testReq) == 0 {
testReq = []byte(fmt.Sprintf(defRequest, testCase.query))
}
s3Select, err := NewS3Select(bytes.NewReader(testReq))
if err != nil {
t.Fatal(err)
}
if err = s3Select.Open(func(offset, length int64) (io.ReadCloser, error) {
return ioutil.NopCloser(bytes.NewBufferString(input)), nil
}); err != nil {
t.Fatal(err)
}
w := &testResponseWriter{}
s3Select.Evaluate(w)
s3Select.Close()
resp := http.Response{
StatusCode: http.StatusOK,
Body: ioutil.NopCloser(bytes.NewReader(w.response)),
ContentLength: int64(len(w.response)),
}
res, err := minio.NewSelectResults(&resp, "testbucket")
if err != nil {
t.Error(err)
return
}
got, err := ioutil.ReadAll(res)
if err != nil {
t.Error(err)
return
}
gotS := strings.TrimSpace(string(got))
if gotS != testCase.wantResult {
t.Errorf("received response does not match the expected reply.\nQuery: %s\n=====\ngot: %s\n=====\nwant: %s\n=====\n", testCase.query, gotS, testCase.wantResult)
}
})
}
}
func TestCSVInput(t *testing.T) {
var testTable = []struct {
requestXML []byte

@ -63,7 +63,7 @@ func newAggVal(fn FuncName) *aggVal {
// current row and stores the result.
//
// On success, it returns nil.
func (e *FuncExpr) evalAggregationNode(r Record) error {
func (e *FuncExpr) evalAggregationNode(r Record, tableAlias string) error {
// It is assumed that this function is called only when
// `e` is an aggregation function.
@ -77,13 +77,13 @@ func (e *FuncExpr) evalAggregationNode(r Record) error {
return nil
}
val, err = e.Count.ExprArg.evalNode(r)
val, err = e.Count.ExprArg.evalNode(r, tableAlias)
if err != nil {
return err
}
} else {
// Evaluate the (only) argument
val, err = e.SFunc.ArgsList[0].evalNode(r)
val, err = e.SFunc.ArgsList[0].evalNode(r, tableAlias)
if err != nil {
return err
}
@ -149,13 +149,13 @@ func (e *FuncExpr) evalAggregationNode(r Record) error {
return err
}
func (e *AliasedExpression) aggregateRow(r Record) error {
return e.Expression.aggregateRow(r)
func (e *AliasedExpression) aggregateRow(r Record, tableAlias string) error {
return e.Expression.aggregateRow(r, tableAlias)
}
func (e *Expression) aggregateRow(r Record) error {
func (e *Expression) aggregateRow(r Record, tableAlias string) error {
for _, ex := range e.And {
err := ex.aggregateRow(r)
err := ex.aggregateRow(r, tableAlias)
if err != nil {
return err
}
@ -163,9 +163,9 @@ func (e *Expression) aggregateRow(r Record) error {
return nil
}
func (e *ListExpr) aggregateRow(r Record) error {
func (e *ListExpr) aggregateRow(r Record, tableAlias string) error {
for _, ex := range e.Elements {
err := ex.aggregateRow(r)
err := ex.aggregateRow(r, tableAlias)
if err != nil {
return err
}
@ -173,9 +173,9 @@ func (e *ListExpr) aggregateRow(r Record) error {
return nil
}
func (e *AndCondition) aggregateRow(r Record) error {
func (e *AndCondition) aggregateRow(r Record, tableAlias string) error {
for _, ex := range e.Condition {
err := ex.aggregateRow(r)
err := ex.aggregateRow(r, tableAlias)
if err != nil {
return err
}
@ -183,15 +183,15 @@ func (e *AndCondition) aggregateRow(r Record) error {
return nil
}
func (e *Condition) aggregateRow(r Record) error {
func (e *Condition) aggregateRow(r Record, tableAlias string) error {
if e.Operand != nil {
return e.Operand.aggregateRow(r)
return e.Operand.aggregateRow(r, tableAlias)
}
return e.Not.aggregateRow(r)
return e.Not.aggregateRow(r, tableAlias)
}
func (e *ConditionOperand) aggregateRow(r Record) error {
err := e.Operand.aggregateRow(r)
func (e *ConditionOperand) aggregateRow(r Record, tableAlias string) error {
err := e.Operand.aggregateRow(r, tableAlias)
if err != nil {
return err
}
@ -202,38 +202,38 @@ func (e *ConditionOperand) aggregateRow(r Record) error {
switch {
case e.ConditionRHS.Compare != nil:
return e.ConditionRHS.Compare.Operand.aggregateRow(r)
return e.ConditionRHS.Compare.Operand.aggregateRow(r, tableAlias)
case e.ConditionRHS.Between != nil:
err = e.ConditionRHS.Between.Start.aggregateRow(r)
err = e.ConditionRHS.Between.Start.aggregateRow(r, tableAlias)
if err != nil {
return err
}
return e.ConditionRHS.Between.End.aggregateRow(r)
return e.ConditionRHS.Between.End.aggregateRow(r, tableAlias)
case e.ConditionRHS.In != nil:
elt := e.ConditionRHS.In.ListExpression
err = elt.aggregateRow(r)
err = elt.aggregateRow(r, tableAlias)
if err != nil {
return err
}
return nil
case e.ConditionRHS.Like != nil:
err = e.ConditionRHS.Like.Pattern.aggregateRow(r)
err = e.ConditionRHS.Like.Pattern.aggregateRow(r, tableAlias)
if err != nil {
return err
}
return e.ConditionRHS.Like.EscapeChar.aggregateRow(r)
return e.ConditionRHS.Like.EscapeChar.aggregateRow(r, tableAlias)
default:
return errInvalidASTNode
}
}
func (e *Operand) aggregateRow(r Record) error {
err := e.Left.aggregateRow(r)
func (e *Operand) aggregateRow(r Record, tableAlias string) error {
err := e.Left.aggregateRow(r, tableAlias)
if err != nil {
return err
}
for _, rt := range e.Right {
err = rt.Right.aggregateRow(r)
err = rt.Right.aggregateRow(r, tableAlias)
if err != nil {
return err
}
@ -241,13 +241,13 @@ func (e *Operand) aggregateRow(r Record) error {
return nil
}
func (e *MultOp) aggregateRow(r Record) error {
err := e.Left.aggregateRow(r)
func (e *MultOp) aggregateRow(r Record, tableAlias string) error {
err := e.Left.aggregateRow(r, tableAlias)
if err != nil {
return err
}
for _, rt := range e.Right {
err = rt.Right.aggregateRow(r)
err = rt.Right.aggregateRow(r, tableAlias)
if err != nil {
return err
}
@ -255,29 +255,29 @@ func (e *MultOp) aggregateRow(r Record) error {
return nil
}
func (e *UnaryTerm) aggregateRow(r Record) error {
func (e *UnaryTerm) aggregateRow(r Record, tableAlias string) error {
if e.Negated != nil {
return e.Negated.Term.aggregateRow(r)
return e.Negated.Term.aggregateRow(r, tableAlias)
}
return e.Primary.aggregateRow(r)
return e.Primary.aggregateRow(r, tableAlias)
}
func (e *PrimaryTerm) aggregateRow(r Record) error {
func (e *PrimaryTerm) aggregateRow(r Record, tableAlias string) error {
switch {
case e.ListExpr != nil:
return e.ListExpr.aggregateRow(r)
return e.ListExpr.aggregateRow(r, tableAlias)
case e.SubExpression != nil:
return e.SubExpression.aggregateRow(r)
return e.SubExpression.aggregateRow(r, tableAlias)
case e.FuncCall != nil:
return e.FuncCall.aggregateRow(r)
return e.FuncCall.aggregateRow(r, tableAlias)
}
return nil
}
func (e *FuncExpr) aggregateRow(r Record) error {
func (e *FuncExpr) aggregateRow(r Record, tableAlias string) error {
switch e.getFunctionName() {
case aggFnAvg, aggFnSum, aggFnMax, aggFnMin, aggFnCount:
return e.evalAggregationNode(r)
return e.evalAggregationNode(r, tableAlias)
default:
// TODO: traverse arguments and call aggregateRow on
// them if they could be an ancestor of an

@ -19,6 +19,7 @@ package sql
import (
"errors"
"fmt"
"strings"
)
// Query analysis - The query is analyzed to determine if it involves
@ -177,7 +178,7 @@ func (e *PrimaryTerm) analyze(s *Select) (result qProp) {
case e.JPathExpr != nil:
// Check if the path expression is valid
if len(e.JPathExpr.PathExpr) > 0 {
if e.JPathExpr.BaseKey.String() != s.From.As {
if e.JPathExpr.BaseKey.String() != s.From.As && strings.ToLower(e.JPathExpr.BaseKey.String()) != baseTableName {
result = qProp{err: errInvalidKeypath}
return
}

@ -21,7 +21,6 @@ import (
"errors"
"fmt"
"math"
"strings"
"github.com/bcicen/jstream"
"github.com/minio/simdjson-go"
@ -47,21 +46,21 @@ var (
// of child nodes. The final result row is returned after all rows are
// processed, and the `getAggregate` function is called.
func (e *AliasedExpression) evalNode(r Record) (*Value, error) {
return e.Expression.evalNode(r)
func (e *AliasedExpression) evalNode(r Record, tableAlias string) (*Value, error) {
return e.Expression.evalNode(r, tableAlias)
}
func (e *Expression) evalNode(r Record) (*Value, error) {
func (e *Expression) evalNode(r Record, tableAlias string) (*Value, error) {
if len(e.And) == 1 {
// In this case, result is not required to be boolean
// type.
return e.And[0].evalNode(r)
return e.And[0].evalNode(r, tableAlias)
}
// Compute OR of conditions
result := false
for _, ex := range e.And {
res, err := ex.evalNode(r)
res, err := ex.evalNode(r, tableAlias)
if err != nil {
return nil, err
}
@ -74,16 +73,16 @@ func (e *Expression) evalNode(r Record) (*Value, error) {
return FromBool(result), nil
}
func (e *AndCondition) evalNode(r Record) (*Value, error) {
func (e *AndCondition) evalNode(r Record, tableAlias string) (*Value, error) {
if len(e.Condition) == 1 {
// In this case, result does not have to be boolean
return e.Condition[0].evalNode(r)
return e.Condition[0].evalNode(r, tableAlias)
}
// Compute AND of conditions
result := true
for _, ex := range e.Condition {
res, err := ex.evalNode(r)
res, err := ex.evalNode(r, tableAlias)
if err != nil {
return nil, err
}
@ -96,14 +95,14 @@ func (e *AndCondition) evalNode(r Record) (*Value, error) {
return FromBool(result), nil
}
func (e *Condition) evalNode(r Record) (*Value, error) {
func (e *Condition) evalNode(r Record, tableAlias string) (*Value, error) {
if e.Operand != nil {
// In this case, result does not have to be boolean
return e.Operand.evalNode(r)
return e.Operand.evalNode(r, tableAlias)
}
// Compute NOT of condition
res, err := e.Not.evalNode(r)
res, err := e.Not.evalNode(r, tableAlias)
if err != nil {
return nil, err
}
@ -114,8 +113,8 @@ func (e *Condition) evalNode(r Record) (*Value, error) {
return FromBool(!b), nil
}
func (e *ConditionOperand) evalNode(r Record) (*Value, error) {
opVal, opErr := e.Operand.evalNode(r)
func (e *ConditionOperand) evalNode(r Record, tableAlias string) (*Value, error) {
opVal, opErr := e.Operand.evalNode(r, tableAlias)
if opErr != nil || e.ConditionRHS == nil {
return opVal, opErr
}
@ -123,7 +122,7 @@ func (e *ConditionOperand) evalNode(r Record) (*Value, error) {
// Need to evaluate the ConditionRHS
switch {
case e.ConditionRHS.Compare != nil:
cmpRight, cmpRErr := e.ConditionRHS.Compare.Operand.evalNode(r)
cmpRight, cmpRErr := e.ConditionRHS.Compare.Operand.evalNode(r, tableAlias)
if cmpRErr != nil {
return nil, cmpRErr
}
@ -132,26 +131,26 @@ func (e *ConditionOperand) evalNode(r Record) (*Value, error) {
return FromBool(b), err
case e.ConditionRHS.Between != nil:
return e.ConditionRHS.Between.evalBetweenNode(r, opVal)
return e.ConditionRHS.Between.evalBetweenNode(r, opVal, tableAlias)
case e.ConditionRHS.Like != nil:
return e.ConditionRHS.Like.evalLikeNode(r, opVal)
return e.ConditionRHS.Like.evalLikeNode(r, opVal, tableAlias)
case e.ConditionRHS.In != nil:
return e.ConditionRHS.In.evalInNode(r, opVal)
return e.ConditionRHS.In.evalInNode(r, opVal, tableAlias)
default:
return nil, errInvalidASTNode
}
}
func (e *Between) evalBetweenNode(r Record, arg *Value) (*Value, error) {
stVal, stErr := e.Start.evalNode(r)
func (e *Between) evalBetweenNode(r Record, arg *Value, tableAlias string) (*Value, error) {
stVal, stErr := e.Start.evalNode(r, tableAlias)
if stErr != nil {
return nil, stErr
}
endVal, endErr := e.End.evalNode(r)
endVal, endErr := e.End.evalNode(r, tableAlias)
if endErr != nil {
return nil, endErr
}
@ -174,7 +173,7 @@ func (e *Between) evalBetweenNode(r Record, arg *Value) (*Value, error) {
return FromBool(result), nil
}
func (e *Like) evalLikeNode(r Record, arg *Value) (*Value, error) {
func (e *Like) evalLikeNode(r Record, arg *Value, tableAlias string) (*Value, error) {
inferTypeAsString(arg)
s, ok := arg.ToString()
@ -183,7 +182,7 @@ func (e *Like) evalLikeNode(r Record, arg *Value) (*Value, error) {
return nil, errLikeInvalidInputs(err)
}
pattern, err1 := e.Pattern.evalNode(r)
pattern, err1 := e.Pattern.evalNode(r, tableAlias)
if err1 != nil {
return nil, err1
}
@ -199,7 +198,7 @@ func (e *Like) evalLikeNode(r Record, arg *Value) (*Value, error) {
escape := runeZero
if e.EscapeChar != nil {
escapeVal, err2 := e.EscapeChar.evalNode(r)
escapeVal, err2 := e.EscapeChar.evalNode(r, tableAlias)
if err2 != nil {
return nil, err2
}
@ -230,14 +229,14 @@ func (e *Like) evalLikeNode(r Record, arg *Value) (*Value, error) {
return FromBool(matchResult), nil
}
func (e *ListExpr) evalNode(r Record) (*Value, error) {
func (e *ListExpr) evalNode(r Record, tableAlias string) (*Value, error) {
res := make([]Value, len(e.Elements))
if len(e.Elements) == 1 {
// If length 1, treat as single value.
return e.Elements[0].evalNode(r)
return e.Elements[0].evalNode(r, tableAlias)
}
for i, elt := range e.Elements {
v, err := elt.evalNode(r)
v, err := elt.evalNode(r, tableAlias)
if err != nil {
return nil, err
}
@ -248,7 +247,7 @@ func (e *ListExpr) evalNode(r Record) (*Value, error) {
const floatCmpTolerance = 0.000001
func (e *In) evalInNode(r Record, lhs *Value) (*Value, error) {
func (e *In) evalInNode(r Record, lhs *Value, tableAlias string) (*Value, error) {
// Compare two values in terms of in-ness.
var cmp func(a, b Value) bool
cmp = func(a, b Value) bool {
@ -283,7 +282,7 @@ func (e *In) evalInNode(r Record, lhs *Value) (*Value, error) {
var rhs Value
if elt := e.ListExpression; elt != nil {
eltVal, err := elt.evalNode(r)
eltVal, err := elt.evalNode(r, tableAlias)
if err != nil {
return nil, err
}
@ -304,8 +303,8 @@ func (e *In) evalInNode(r Record, lhs *Value) (*Value, error) {
return FromBool(cmp(rhs, *lhs)), nil
}
func (e *Operand) evalNode(r Record) (*Value, error) {
lval, lerr := e.Left.evalNode(r)
func (e *Operand) evalNode(r Record, tableAlias string) (*Value, error) {
lval, lerr := e.Left.evalNode(r, tableAlias)
if lerr != nil || len(e.Right) == 0 {
return lval, lerr
}
@ -315,7 +314,7 @@ func (e *Operand) evalNode(r Record) (*Value, error) {
// symbols.
for _, rightTerm := range e.Right {
op := rightTerm.Op
rval, rerr := rightTerm.Right.evalNode(r)
rval, rerr := rightTerm.Right.evalNode(r, tableAlias)
if rerr != nil {
return nil, rerr
}
@ -327,8 +326,8 @@ func (e *Operand) evalNode(r Record) (*Value, error) {
return lval, nil
}
func (e *MultOp) evalNode(r Record) (*Value, error) {
lval, lerr := e.Left.evalNode(r)
func (e *MultOp) evalNode(r Record, tableAlias string) (*Value, error) {
lval, lerr := e.Left.evalNode(r, tableAlias)
if lerr != nil || len(e.Right) == 0 {
return lval, lerr
}
@ -337,7 +336,7 @@ func (e *MultOp) evalNode(r Record) (*Value, error) {
// AST node is for terms separated by *, / or % symbols.
for _, rightTerm := range e.Right {
op := rightTerm.Op
rval, rerr := rightTerm.Right.evalNode(r)
rval, rerr := rightTerm.Right.evalNode(r, tableAlias)
if rerr != nil {
return nil, rerr
}
@ -350,12 +349,12 @@ func (e *MultOp) evalNode(r Record) (*Value, error) {
return lval, nil
}
func (e *UnaryTerm) evalNode(r Record) (*Value, error) {
func (e *UnaryTerm) evalNode(r Record, tableAlias string) (*Value, error) {
if e.Negated == nil {
return e.Primary.evalNode(r)
return e.Primary.evalNode(r, tableAlias)
}
v, err := e.Negated.Term.evalNode(r)
v, err := e.Negated.Term.evalNode(r, tableAlias)
if err != nil {
return nil, err
}
@ -368,19 +367,15 @@ func (e *UnaryTerm) evalNode(r Record) (*Value, error) {
return nil, errArithMismatchedTypes
}
func (e *JSONPath) evalNode(r Record) (*Value, error) {
// Strip the table name from the keypath.
keypath := e.String()
if strings.Contains(keypath, ".") {
ps := strings.SplitN(keypath, ".", 2)
if len(ps) == 2 {
keypath = ps[1]
}
func (e *JSONPath) evalNode(r Record, tableAlias string) (*Value, error) {
alias := tableAlias
if tableAlias == "" {
alias = baseTableName
}
pathExpr := e.StripTableAlias(alias)
_, rawVal := r.Raw()
switch rowVal := rawVal.(type) {
case jstream.KVS, simdjson.Object:
pathExpr := e.PathExpr
if len(pathExpr) == 0 {
pathExpr = []*JSONPathElement{{Key: &ObjectKey{ID: e.BaseKey}}}
}
@ -392,7 +387,10 @@ func (e *JSONPath) evalNode(r Record) (*Value, error) {
return jsonToValue(result)
default:
return r.Get(keypath)
if pathExpr[len(pathExpr)-1].Key == nil {
return nil, errInvalidKeypath
}
return r.Get(pathExpr[len(pathExpr)-1].Key.keyString())
}
}
@ -447,28 +445,28 @@ func jsonToValue(result interface{}) (*Value, error) {
return nil, fmt.Errorf("Unhandled value type: %T", result)
}
func (e *PrimaryTerm) evalNode(r Record) (res *Value, err error) {
func (e *PrimaryTerm) evalNode(r Record, tableAlias string) (res *Value, err error) {
switch {
case e.Value != nil:
return e.Value.evalNode(r)
case e.JPathExpr != nil:
return e.JPathExpr.evalNode(r)
return e.JPathExpr.evalNode(r, tableAlias)
case e.ListExpr != nil:
return e.ListExpr.evalNode(r)
return e.ListExpr.evalNode(r, tableAlias)
case e.SubExpression != nil:
return e.SubExpression.evalNode(r)
return e.SubExpression.evalNode(r, tableAlias)
case e.FuncCall != nil:
return e.FuncCall.evalNode(r)
return e.FuncCall.evalNode(r, tableAlias)
}
return nil, errInvalidASTNode
}
func (e *FuncExpr) evalNode(r Record) (res *Value, err error) {
func (e *FuncExpr) evalNode(r Record, tableAlias string) (res *Value, err error) {
switch e.getFunctionName() {
case aggFnCount, aggFnAvg, aggFnMax, aggFnMin, aggFnSum:
return e.getAggregate()
default:
return e.evalSQLFnNode(r)
return e.evalSQLFnNode(r, tableAlias)
}
}

@ -84,35 +84,35 @@ func (e *FuncExpr) getFunctionName() FuncName {
// evalSQLFnNode assumes that the FuncExpr is not an aggregation
// function.
func (e *FuncExpr) evalSQLFnNode(r Record) (res *Value, err error) {
func (e *FuncExpr) evalSQLFnNode(r Record, tableAlias string) (res *Value, err error) {
// Handle functions that have phrase arguments
switch e.getFunctionName() {
case sqlFnCast:
expr := e.Cast.Expr
res, err = expr.castTo(r, strings.ToUpper(e.Cast.CastType))
res, err = expr.castTo(r, strings.ToUpper(e.Cast.CastType), tableAlias)
return
case sqlFnSubstring:
return handleSQLSubstring(r, e.Substring)
return handleSQLSubstring(r, e.Substring, tableAlias)
case sqlFnExtract:
return handleSQLExtract(r, e.Extract)
return handleSQLExtract(r, e.Extract, tableAlias)
case sqlFnTrim:
return handleSQLTrim(r, e.Trim)
return handleSQLTrim(r, e.Trim, tableAlias)
case sqlFnDateAdd:
return handleDateAdd(r, e.DateAdd)
return handleDateAdd(r, e.DateAdd, tableAlias)
case sqlFnDateDiff:
return handleDateDiff(r, e.DateDiff)
return handleDateDiff(r, e.DateDiff, tableAlias)
}
// For all simple argument functions, we evaluate the arguments here
argVals := make([]*Value, len(e.SFunc.ArgsList))
for i, arg := range e.SFunc.ArgsList {
argVals[i], err = arg.evalNode(r)
argVals[i], err = arg.evalNode(r, tableAlias)
if err != nil {
return nil, err
}
@ -219,8 +219,8 @@ func upperCase(v *Value) (*Value, error) {
return FromString(strings.ToUpper(s)), nil
}
func handleDateAdd(r Record, d *DateAddFunc) (*Value, error) {
q, err := d.Quantity.evalNode(r)
func handleDateAdd(r Record, d *DateAddFunc, tableAlias string) (*Value, error) {
q, err := d.Quantity.evalNode(r, tableAlias)
if err != nil {
return nil, err
}
@ -230,7 +230,7 @@ func handleDateAdd(r Record, d *DateAddFunc) (*Value, error) {
return nil, fmt.Errorf("QUANTITY must be a numeric argument to %s()", sqlFnDateAdd)
}
ts, err := d.Timestamp.evalNode(r)
ts, err := d.Timestamp.evalNode(r, tableAlias)
if err != nil {
return nil, err
}
@ -245,8 +245,8 @@ func handleDateAdd(r Record, d *DateAddFunc) (*Value, error) {
return dateAdd(strings.ToUpper(d.DatePart), qty, t)
}
func handleDateDiff(r Record, d *DateDiffFunc) (*Value, error) {
tval1, err := d.Timestamp1.evalNode(r)
func handleDateDiff(r Record, d *DateDiffFunc, tableAlias string) (*Value, error) {
tval1, err := d.Timestamp1.evalNode(r, tableAlias)
if err != nil {
return nil, err
}
@ -258,7 +258,7 @@ func handleDateDiff(r Record, d *DateDiffFunc) (*Value, error) {
return nil, fmt.Errorf("%s() expects two timestamp arguments", sqlFnDateDiff)
}
tval2, err := d.Timestamp2.evalNode(r)
tval2, err := d.Timestamp2.evalNode(r, tableAlias)
if err != nil {
return nil, err
}
@ -277,12 +277,12 @@ func handleUTCNow() (*Value, error) {
return FromTimestamp(time.Now().UTC()), nil
}
func handleSQLSubstring(r Record, e *SubstringFunc) (val *Value, err error) {
func handleSQLSubstring(r Record, e *SubstringFunc, tableAlias string) (val *Value, err error) {
// Both forms `SUBSTRING('abc' FROM 2 FOR 1)` and
// SUBSTRING('abc', 2, 1) are supported.
// Evaluate the string argument
v1, err := e.Expr.evalNode(r)
v1, err := e.Expr.evalNode(r, tableAlias)
if err != nil {
return nil, err
}
@ -301,7 +301,7 @@ func handleSQLSubstring(r Record, e *SubstringFunc) (val *Value, err error) {
}
// Evaluate the FROM argument
v2, err := arg2.evalNode(r)
v2, err := arg2.evalNode(r, tableAlias)
if err != nil {
return nil, err
}
@ -315,7 +315,7 @@ func handleSQLSubstring(r Record, e *SubstringFunc) (val *Value, err error) {
length := -1
// Evaluate the optional FOR argument
if arg3 != nil {
v3, err := arg3.evalNode(r)
v3, err := arg3.evalNode(r, tableAlias)
if err != nil {
return nil, err
}
@ -336,11 +336,11 @@ func handleSQLSubstring(r Record, e *SubstringFunc) (val *Value, err error) {
return FromString(res), err
}
func handleSQLTrim(r Record, e *TrimFunc) (res *Value, err error) {
func handleSQLTrim(r Record, e *TrimFunc, tableAlias string) (res *Value, err error) {
chars := ""
ok := false
if e.TrimChars != nil {
charsV, cerr := e.TrimChars.evalNode(r)
charsV, cerr := e.TrimChars.evalNode(r, tableAlias)
if cerr != nil {
return nil, cerr
}
@ -351,7 +351,7 @@ func handleSQLTrim(r Record, e *TrimFunc) (res *Value, err error) {
}
}
fromV, ferr := e.TrimFrom.evalNode(r)
fromV, ferr := e.TrimFrom.evalNode(r, tableAlias)
if ferr != nil {
return nil, ferr
}
@ -368,8 +368,8 @@ func handleSQLTrim(r Record, e *TrimFunc) (res *Value, err error) {
return FromString(result), nil
}
func handleSQLExtract(r Record, e *ExtractFunc) (res *Value, err error) {
timeVal, verr := e.From.evalNode(r)
func handleSQLExtract(r Record, e *ExtractFunc, tableAlias string) (res *Value, err error) {
timeVal, verr := e.From.evalNode(r, tableAlias)
if verr != nil {
return nil, verr
}
@ -406,8 +406,8 @@ const (
castTimestamp = "TIMESTAMP"
)
func (e *Expression) castTo(r Record, castType string) (res *Value, err error) {
v, err := e.evalNode(r)
func (e *Expression) castTo(r Record, castType string, tableAlias string) (res *Value, err error) {
v, err := e.evalNode(r, tableAlias)
if err != nil {
return nil, err
}

@ -119,7 +119,9 @@ type JSONPath struct {
PathExpr []*JSONPathElement `parser:"(@@)*"`
// Cached values:
pathString string
pathString string
strippedTableAlias string
strippedPathExpr []*JSONPathElement
}
// AliasedExpression is an expression that can be optionally named

@ -46,6 +46,9 @@ type SelectStatement struct {
// Count of rows that have been output.
outputCount int64
// Table alias
tableAlias string
}
// ParseSelectStatement - parses a select query from the given string
@ -107,6 +110,9 @@ func ParseSelectStatement(s string) (stmt SelectStatement, err error) {
if err != nil {
err = errQueryAnalysisFailure(err)
}
// Set table alias
stmt.tableAlias = selectAST.From.As
return
}
@ -226,7 +232,7 @@ func (e *SelectStatement) IsAggregated() bool {
// records have been processed. Applies only to aggregation queries.
func (e *SelectStatement) AggregateResult(output Record) error {
for i, expr := range e.selectAST.Expression.Expressions {
v, err := expr.evalNode(nil)
v, err := expr.evalNode(nil, e.tableAlias)
if err != nil {
return err
}
@ -246,7 +252,7 @@ func (e *SelectStatement) isPassingWhereClause(input Record) (bool, error) {
if e.selectAST.Where == nil {
return true, nil
}
value, err := e.selectAST.Where.evalNode(input)
value, err := e.selectAST.Where.evalNode(input, e.tableAlias)
if err != nil {
return false, err
}
@ -272,7 +278,7 @@ func (e *SelectStatement) AggregateRow(input Record) error {
}
for _, expr := range e.selectAST.Expression.Expressions {
err := expr.aggregateRow(input)
err := expr.aggregateRow(input, e.tableAlias)
if err != nil {
return err
}
@ -302,7 +308,7 @@ func (e *SelectStatement) Eval(input, output Record) (Record, error) {
}
for i, expr := range e.selectAST.Expression.Expressions {
v, err := expr.evalNode(input)
v, err := expr.evalNode(input, e.tableAlias)
if err != nil {
return nil, err
}

@ -36,6 +36,27 @@ func (e *JSONPath) String() string {
return e.pathString
}
// StripTableAlias removes a table alias from the path. The result is also
// cached for repeated lookups during SQL query evaluation.
func (e *JSONPath) StripTableAlias(tableAlias string) []*JSONPathElement {
if e.strippedTableAlias == tableAlias {
return e.strippedPathExpr
}
hasTableAlias := e.BaseKey.String() == tableAlias || strings.ToLower(e.BaseKey.String()) == baseTableName
var pathExpr []*JSONPathElement
if hasTableAlias {
pathExpr = e.PathExpr
} else {
pathExpr = make([]*JSONPathElement, len(e.PathExpr)+1)
pathExpr[0] = &JSONPathElement{Key: &ObjectKey{ID: e.BaseKey}}
copy(pathExpr[1:], e.PathExpr)
}
e.strippedTableAlias = tableAlias
e.strippedPathExpr = pathExpr
return e.strippedPathExpr
}
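To make the cached stripping concrete, a worked example against the queries in TestCSVQueries3 above, assuming baseTableName is the lowercased "s3object" (the arrow notation is illustrative):

// `select s."na.me" from S3Object as s`:
//   BaseKey = "s", PathExpr = ["na.me"]
//   StripTableAlias("s")        -> ["na.me"]   (alias matched and dropped)
//
// `select qty from S3Object` (no alias; evalNode passes baseTableName):
//   BaseKey = "qty", PathExpr = []
//   StripTableAlias("s3object") -> ["qty"]     (BaseKey prepended as a key)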
func (e *JSONPathElement) String() string {
switch {
case e.Key != nil: