Compare commits
26 commits: master...RELEASE.20

Author | SHA1
---|---
Klaus Post | ba04da8bf2
Harshavardhana | f4c4e5fc33
Klaus Post | f3cd606976
Klaus Post | 14105a1689
Harshavardhana | 28a9409da7
Klaus Post | 2965348694
Anis Elleuch | 857ea511f1
Harshavardhana | 3f74303d1c
Harshavardhana | afa90189ae
Harshavardhana | e3f2a260aa
Harshavardhana | 82780ec10e
Klaus Post | b57735ec53
Poorna Krishnamoorthy | bfab990c33
Harshavardhana | 94018588fe
Anis Elleuch | 8b76ba8d5d
Harshavardhana | 7eb7f65e48
Harshavardhana | c608c0688a
Aditya Manthramurthy | 41a9d1d778
Klaus Post | e21e80841e
Klaus Post | 98c792bbeb
Klaus Post | f687ba53bc
Harshavardhana | e3da59c923
Harshavardhana | 781b9b051c
Harshavardhana | 438becfde8
Harshavardhana | 16ef338649
Harshavardhana | 3242847ec0

@@ -172,7 +172,12 @@ func (a adminAPIHandlers) SetRemoteTargetHandler(w http.ResponseWriter, r *http.
 	}

 	if err = globalBucketTargetSys.SetTarget(ctx, bucket, &target, update); err != nil {
-		writeErrorResponseJSON(ctx, w, toAPIError(ctx, err), r.URL)
+		switch err.(type) {
+		case BucketRemoteConnectionErr:
+			writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErrWithErr(ErrReplicationRemoteConnectionError, err), r.URL)
+		default:
+			writeErrorResponseJSON(ctx, w, toAPIError(ctx, err), r.URL)
+		}
 		return
 	}
 	targets, err := globalBucketTargetSys.ListBucketTargets(ctx, bucket)

@@ -24,6 +24,7 @@ import (
 	"errors"
 	"fmt"
 	"io"
+	"math/rand"
 	"net/http"
 	"net/url"
 	"os"

@@ -1470,30 +1471,33 @@ func (a adminAPIHandlers) BandwidthMonitorHandler(w http.ResponseWriter, r *http
 		return
 	}

+	rnd := rand.New(rand.NewSource(time.Now().UnixNano()))
+
 	setEventStreamHeaders(w)
-	reportCh := make(chan bandwidth.Report, 1)
+	reportCh := make(chan bandwidth.Report)
 	keepAliveTicker := time.NewTicker(500 * time.Millisecond)
 	defer keepAliveTicker.Stop()
 	bucketsRequestedString := r.URL.Query().Get("buckets")
 	bucketsRequested := strings.Split(bucketsRequestedString, ",")
 	go func() {
+		defer close(reportCh)
 		for {
-			reportCh <- globalNotificationSys.GetBandwidthReports(ctx, bucketsRequested...)
 			select {
 			case <-ctx.Done():
 				return
-			default:
-				time.Sleep(2 * time.Second)
+			case reportCh <- globalNotificationSys.GetBandwidthReports(ctx, bucketsRequested...):
+				time.Sleep(time.Duration(rnd.Float64() * float64(2*time.Second)))
 			}
 		}
 	}()
 	for {
 		select {
-		case report := <-reportCh:
-			enc := json.NewEncoder(w)
-			err := enc.Encode(report)
-			if err != nil {
-				writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrInternalError), r.URL)
+		case report, ok := <-reportCh:
+			if !ok {
 				return
 			}
+			if err := json.NewEncoder(w).Encode(report); err != nil {
+				writeErrorResponseJSON(ctx, w, toAPIError(ctx, err), r.URL)
+				return
+			}
 			w.(http.Flusher).Flush()

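Note: the rewrite above moves the channel send into the `select` so the producer goroutine can observe cancellation instead of blocking forever on an abandoned unbuffered channel. A minimal, self-contained sketch of that pattern (the names here are illustrative, not MinIO APIs):

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// produce sends periodic reports until ctx is canceled. Because the send
// itself is a select case, the goroutine exits even if the consumer is gone.
func produce(ctx context.Context, out chan<- int) {
	defer close(out)
	for i := 0; ; i++ {
		select {
		case <-ctx.Done():
			return
		case out <- i:
			time.Sleep(100 * time.Millisecond)
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 350*time.Millisecond)
	defer cancel()

	ch := make(chan int)
	go produce(ctx, ch)
	for v := range ch { // range ends when produce closes the channel
		fmt.Println("report", v)
	}
}
```
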
@@ -298,6 +298,12 @@ func (api objectAPIHandlers) ListBucketsHandler(w http.ResponseWriter, r *http.R
 		return
 	}

+	// Anonymous users, should be rejected.
+	if cred.AccessKey == "" {
+		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrAccessDenied), r.URL, guessIsBrowserReq(r))
+		return
+	}
+
 	// If etcd, dns federation configured list buckets from etcd.
 	var bucketsInfo []BucketInfo
 	if globalDNSConfig != nil && globalBucketFederation {

@@ -496,7 +502,7 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter,
 				object.PurgeTransitioned = goi.TransitionStatus
 			}
 			if replicateDeletes {
-				delMarker, replicate, repsync := checkReplicateDelete(ctx, bucket, ObjectToDelete{
+				replicate, repsync := checkReplicateDelete(ctx, bucket, ObjectToDelete{
 					ObjectName: object.ObjectName,
 					VersionID:  object.VersionID,
 				}, goi, gerr)

@@ -511,9 +517,6 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter,
 				}
 				if object.VersionID != "" {
 					object.VersionPurgeStatus = Pending
-					if delMarker {
-						object.DeleteMarkerVersionID = object.VersionID
-					}
 				} else {
 					object.DeleteMarkerReplicationStatus = string(replication.Pending)
 				}

@@ -557,13 +560,18 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter,
 	})
 	deletedObjects := make([]DeletedObject, len(deleteObjects.Objects))
 	for i := range errs {
-		dindex := objectsToDelete[ObjectToDelete{
+		// DeleteMarkerVersionID is not used specifically to avoid
+		// lookup errors, since DeleteMarkerVersionID is only
+		// created during DeleteMarker creation when client didn't
+		// specify a versionID.
+		objToDel := ObjectToDelete{
 			ObjectName:                    dObjects[i].ObjectName,
 			VersionID:                     dObjects[i].VersionID,
 			VersionPurgeStatus:            dObjects[i].VersionPurgeStatus,
 			DeleteMarkerReplicationStatus: dObjects[i].DeleteMarkerReplicationStatus,
 			PurgeTransitioned:             dObjects[i].PurgeTransitioned,
-		}]
+		}
+		dindex := objectsToDelete[objToDel]
 		if errs[i] == nil || isErrObjectNotFound(errs[i]) || isErrVersionNotFound(errs[i]) {
 			if replicateDeletes {
 				dObjects[i].DeleteMarkerReplicationStatus = deleteList[i].DeleteMarkerReplicationStatus

@@ -619,12 +627,12 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter,

 		eventName := event.ObjectRemovedDelete
 		objInfo := ObjectInfo{
-			Name:      dobj.ObjectName,
-			VersionID: dobj.VersionID,
+			Name:         dobj.ObjectName,
+			VersionID:    dobj.VersionID,
+			DeleteMarker: dobj.DeleteMarker,
 		}

-		if dobj.DeleteMarker {
-			objInfo.DeleteMarker = dobj.DeleteMarker
+		if objInfo.DeleteMarker {
 			objInfo.VersionID = dobj.DeleteMarkerVersionID
 			eventName = event.ObjectRemovedDeleteMarkerCreated
 		}

@@ -530,7 +530,7 @@ type SelectParameters struct {

 // IsEmpty returns true if no select parameters set
 func (sp *SelectParameters) IsEmpty() bool {
-	return sp == nil || sp.S3Select == s3select.S3Select{}
+	return sp == nil
 }

 var (

@@ -83,17 +83,38 @@ func getConditionValues(r *http.Request, lc string, username string, claims map[
 		}
 	}

+	authType := getRequestAuthType(r)
+	var signatureVersion string
+	switch authType {
+	case authTypeSignedV2, authTypePresignedV2:
+		signatureVersion = signV2Algorithm
+	case authTypeSigned, authTypePresigned, authTypeStreamingSigned, authTypePostPolicy:
+		signatureVersion = signV4Algorithm
+	}
+
+	var authtype string
+	switch authType {
+	case authTypePresignedV2, authTypePresigned:
+		authtype = "REST-QUERY-STRING"
+	case authTypeSignedV2, authTypeSigned, authTypeStreamingSigned:
+		authtype = "REST-HEADER"
+	case authTypePostPolicy:
+		authtype = "POST"
+	}
+
 	args := map[string][]string{
-		"CurrentTime":     {currTime.Format(time.RFC3339)},
-		"EpochTime":       {strconv.FormatInt(currTime.Unix(), 10)},
-		"SecureTransport": {strconv.FormatBool(r.TLS != nil)},
-		"SourceIp":        {handlers.GetSourceIP(r)},
-		"UserAgent":       {r.UserAgent()},
-		"Referer":         {r.Referer()},
-		"principaltype":   {principalType},
-		"userid":          {username},
-		"username":        {username},
-		"versionid":       {vid},
+		"CurrentTime":      {currTime.Format(time.RFC3339)},
+		"EpochTime":        {strconv.FormatInt(currTime.Unix(), 10)},
+		"SecureTransport":  {strconv.FormatBool(r.TLS != nil)},
+		"SourceIp":         {handlers.GetSourceIP(r)},
+		"UserAgent":        {r.UserAgent()},
+		"Referer":          {r.Referer()},
+		"principaltype":    {principalType},
+		"userid":           {username},
+		"username":         {username},
+		"versionid":        {vid},
+		"signatureversion": {signatureVersion},
+		"authType":         {authtype},
 	}

 	if lc != "" {

@@ -175,10 +175,10 @@ func isStandardHeader(matchHeaderKey string) bool {
 }

 // returns whether object version is a deletemarker and if object qualifies for replication
-func checkReplicateDelete(ctx context.Context, bucket string, dobj ObjectToDelete, oi ObjectInfo, gerr error) (dm, replicate, sync bool) {
+func checkReplicateDelete(ctx context.Context, bucket string, dobj ObjectToDelete, oi ObjectInfo, gerr error) (replicate, sync bool) {
 	rcfg, err := getReplicationConfig(ctx, bucket)
 	if err != nil || rcfg == nil {
-		return false, false, sync
+		return false, sync
 	}
 	opts := replication.ObjectOpts{
 		Name: dobj.ObjectName,

@@ -198,19 +198,19 @@ func checkReplicateDelete(ctx context.Context, bucket string, dobj ObjectToDelet
 			validReplStatus = true
 		}
 		if oi.DeleteMarker && (validReplStatus || replicate) {
-			return oi.DeleteMarker, true, sync
+			return true, sync
 		}
 		// can be the case that other cluster is down and duplicate `mc rm --vid`
 		// is issued - this still needs to be replicated back to the other target
-		return oi.DeleteMarker, oi.VersionPurgeStatus == Pending || oi.VersionPurgeStatus == Failed, sync
+		return oi.VersionPurgeStatus == Pending || oi.VersionPurgeStatus == Failed, sync
 	}
 	tgt := globalBucketTargetSys.GetRemoteTargetClient(ctx, rcfg.RoleArn)
 	// the target online status should not be used here while deciding
 	// whether to replicate deletes as the target could be temporarily down
 	if tgt == nil {
-		return oi.DeleteMarker, false, false
+		return false, false
 	}
-	return oi.DeleteMarker, replicate, tgt.replicateSync
+	return replicate, tgt.replicateSync
 }

 // replicate deletes to the designated replication target if replication configuration

@@ -697,19 +697,25 @@ func replicateObject(ctx context.Context, objInfo ObjectInfo, objectAPI ObjectLa
 		if totalNodesCount == 0 {
 			totalNodesCount = 1 // For standalone erasure coding
 		}
-		b := target.BandwidthLimit / int64(totalNodesCount)

 		var headerSize int
 		for k, v := range putOpts.Header() {
 			headerSize += len(k) + len(v)
 		}

+		opts := &bandwidth.MonitorReaderOptions{
+			Bucket:               objInfo.Bucket,
+			Object:               objInfo.Name,
+			HeaderSize:           headerSize,
+			BandwidthBytesPerSec: target.BandwidthLimit / int64(totalNodesCount),
+			ClusterBandwidth:     target.BandwidthLimit,
+		}
+
 		// r takes over closing gr.
-		r := bandwidth.NewMonitoredReader(ctx, globalBucketMonitor, objInfo.Bucket, objInfo.Name, gr, headerSize, b, target.BandwidthLimit)
+		r := bandwidth.NewMonitoredReader(ctx, globalBucketMonitor, gr, opts)
 		if _, err = c.PutObject(ctx, dest.Bucket, object, r, size, "", "", putOpts); err != nil {
 			replicationStatus = replication.Failed
 			logger.LogIf(ctx, fmt.Errorf("Unable to replicate for object %s/%s(%s): %w", bucket, objInfo.Name, objInfo.VersionID, err))
 		}
 		defer r.Close()
 	}

 	objInfo.UserDefined[xhttp.AmzBucketReplicationStatus] = replicationStatus.String()

@@ -100,7 +100,7 @@ func (sys *BucketTargetSys) SetTarget(ctx context.Context, bucket string, tgt *m
 		if minio.ToErrorResponse(err).Code == "NoSuchBucket" {
 			return BucketRemoteTargetNotFound{Bucket: tgt.TargetBucket}
 		}
-		return BucketRemoteConnectionErr{Bucket: tgt.TargetBucket}
+		return BucketRemoteConnectionErr{Bucket: tgt.TargetBucket, Err: err}
 	}
 	if tgt.Type == madmin.ReplicationService {
 		if !globalIsErasure {

@@ -111,7 +111,7 @@ func (sys *BucketTargetSys) SetTarget(ctx context.Context, bucket string, tgt *m
 		}
 		vcfg, err := clnt.GetBucketVersioning(ctx, tgt.TargetBucket)
 		if err != nil {
-			return BucketRemoteConnectionErr{Bucket: tgt.TargetBucket}
+			return BucketRemoteConnectionErr{Bucket: tgt.TargetBucket, Err: err}
 		}
 		if vcfg.Status != string(versioning.Enabled) {
 			return BucketRemoteTargetNotVersioned{Bucket: tgt.TargetBucket}

@@ -124,7 +124,7 @@ func (sys *BucketTargetSys) SetTarget(ctx context.Context, bucket string, tgt *m
 		if minio.ToErrorResponse(err).Code == "NoSuchBucket" {
 			return BucketRemoteTargetNotFound{Bucket: tgt.TargetBucket}
 		}
-		return BucketRemoteConnectionErr{Bucket: tgt.TargetBucket}
+		return BucketRemoteConnectionErr{Bucket: tgt.TargetBucket, Err: err}
 	}
 	if vcfg.Status != string(versioning.Enabled) {
 		return BucketRemoteTargetNotVersioned{Bucket: tgt.TargetBucket}

@@ -428,3 +428,13 @@ func getTLSConfig() (x509Certs []*x509.Certificate, manager *certs.Manager, secu
 	secureConn = true
 	return x509Certs, manager, secureConn, nil
 }
+
+// contextCanceled returns whether a context is canceled.
+func contextCanceled(ctx context.Context) bool {
+	select {
+	case <-ctx.Done():
+		return true
+	default:
+		return false
+	}
+}

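Note: the helper added above is a non-blocking poll of `ctx.Done()`, used later in this compare to bail out of directory walks early. A hedged sketch of how such a helper behaves in a loop (the loop body is hypothetical):

```go
package main

import (
	"context"
	"fmt"
)

// contextCanceled reports, without blocking, whether ctx is already canceled.
func contextCanceled(ctx context.Context) bool {
	select {
	case <-ctx.Done():
		return true
	default:
		return false
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	for i := 0; i < 5; i++ {
		if contextCanceled(ctx) {
			fmt.Println("stopping early:", ctx.Err())
			return
		}
		if i == 2 {
			cancel() // simulate the caller giving up mid-walk
		}
		fmt.Println("processing entry", i)
	}
}
```
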
@@ -797,42 +797,39 @@ type actionMeta struct {

 var applyActionsLogPrefix = color.Green("applyActions:")

-// applyActions will apply lifecycle checks on to a scanned item.
-// The resulting size on disk will always be returned.
-// The metadata will be compared to consensus on the object layer before any changes are applied.
-// If no metadata is supplied, -1 is returned if no action is taken.
-func (i *scannerItem) applyActions(ctx context.Context, o ObjectLayer, meta actionMeta) (size int64) {
+func (i *scannerItem) applyHealing(ctx context.Context, o ObjectLayer, meta actionMeta) (size int64) {
+	if i.debug {
+		if meta.oi.VersionID != "" {
+			console.Debugf(applyActionsLogPrefix+" heal checking: %v/%v v(%s)\n", i.bucket, i.objectPath(), meta.oi.VersionID)
+		} else {
+			console.Debugf(applyActionsLogPrefix+" heal checking: %v/%v\n", i.bucket, i.objectPath())
+		}
+	}
+	healOpts := madmin.HealOpts{Remove: healDeleteDangling}
+	if meta.bitRotScan {
+		healOpts.ScanMode = madmin.HealDeepScan
+	}
+	res, err := o.HealObject(ctx, i.bucket, i.objectPath(), meta.oi.VersionID, healOpts)
+	if isErrObjectNotFound(err) || isErrVersionNotFound(err) {
+		return 0
+	}
+	if err != nil && !errors.Is(err, NotImplemented{}) {
+		logger.LogIf(ctx, err)
+		return 0
+	}
+	return res.ObjectSize
+}
+
+func (i *scannerItem) applyLifecycle(ctx context.Context, o ObjectLayer, meta actionMeta) (applied bool, size int64) {
 	size, err := meta.oi.GetActualSize()
 	if i.debug {
 		logger.LogIf(ctx, err)
 	}
-	if i.heal {
-		if i.debug {
-			if meta.oi.VersionID != "" {
-				console.Debugf(applyActionsLogPrefix+" heal checking: %v/%v v(%s)\n", i.bucket, i.objectPath(), meta.oi.VersionID)
-			} else {
-				console.Debugf(applyActionsLogPrefix+" heal checking: %v/%v\n", i.bucket, i.objectPath())
-			}
-		}
-		healOpts := madmin.HealOpts{Remove: healDeleteDangling}
-		if meta.bitRotScan {
-			healOpts.ScanMode = madmin.HealDeepScan
-		}
-		res, err := o.HealObject(ctx, i.bucket, i.objectPath(), meta.oi.VersionID, healOpts)
-		if isErrObjectNotFound(err) || isErrVersionNotFound(err) {
-			return 0
-		}
-		if err != nil && !errors.Is(err, NotImplemented{}) {
-			logger.LogIf(ctx, err)
-			return 0
-		}
-		size = res.ObjectSize
-	}
 	if i.lifeCycle == nil {
 		if i.debug {
 			console.Debugf(applyActionsLogPrefix+" no lifecycle rules to apply: %q\n", i.objectPath())
 		}
-		return size
+		return false, size
 	}

 	versionID := meta.oi.VersionID

@@ -866,7 +863,7 @@ func (i *scannerItem) applyActions(ctx context.Context, o ObjectLayer, meta acti
 		if i.debug {
 			console.Debugf(applyActionsLogPrefix+" object not expirable: %q\n", i.objectPath())
 		}
-		return size
+		return false, size
 	}

 	obj, err := o.GetObjectInfo(ctx, i.bucket, i.objectPath(), ObjectOptions{

@@ -878,19 +875,18 @@ func (i *scannerItem) applyActions(ctx context.Context, o ObjectLayer, meta acti
 			if !obj.DeleteMarker { // if this is not a delete marker log and return
 				// Do nothing - heal in the future.
 				logger.LogIf(ctx, err)
-				return size
+				return false, size
 			}
 		case ObjectNotFound, VersionNotFound:
 			// object not found or version not found return 0
-			return 0
+			return false, 0
 		default:
 			// All other errors proceed.
 			logger.LogIf(ctx, err)
-			return size
+			return false, size
 		}
 	}

-	var applied bool
 	action = evalActionFromLifecycle(ctx, *i.lifeCycle, obj, i.debug)
 	if action != lifecycle.NoneAction {
 		applied = applyLifecycleAction(ctx, action, o, obj)

@@ -899,9 +895,26 @@ func (i *scannerItem) applyActions(ctx context.Context, o ObjectLayer, meta acti
 	if applied {
 		switch action {
 		case lifecycle.TransitionAction, lifecycle.TransitionVersionAction:
-		default: // for all lifecycle actions that remove data
-			return 0
+			return true, size
 		}
+		// For all other lifecycle actions that remove data
+		return true, 0
 	}
+
+	return false, size
+}
+
+// applyActions will apply lifecycle checks on to a scanned item.
+// The resulting size on disk will always be returned.
+// The metadata will be compared to consensus on the object layer before any changes are applied.
+// If no metadata is supplied, -1 is returned if no action is taken.
+func (i *scannerItem) applyActions(ctx context.Context, o ObjectLayer, meta actionMeta) int64 {
+	applied, size := i.applyLifecycle(ctx, o, meta)
+	// For instance, an applied lifecycle means we remove/transitioned an object
+	// from the current deployment, which means we don't have to call healing
+	// routine even if we are asked to do via heal flag.
+	if !applied && i.heal {
+		size = i.applyHealing(ctx, o, meta)
+	}
+	return size
+}

@@ -485,7 +485,7 @@ type objectIO interface {
 // Only backend errors are returned as errors.
 // If the object is not found or unable to deserialize d is cleared and nil error is returned.
 func (d *dataUsageCache) load(ctx context.Context, store objectIO, name string) error {
-	r, err := store.GetObjectNInfo(ctx, dataUsageBucket, name, nil, http.Header{}, noLock, ObjectOptions{})
+	r, err := store.GetObjectNInfo(ctx, dataUsageBucket, name, nil, http.Header{}, readLock, ObjectOptions{})
 	if err != nil {
 		switch err.(type) {
 		case ObjectNotFound:

@@ -522,7 +522,7 @@ func (d *dataUsageCache) save(ctx context.Context, store objectIO, name string)
 		dataUsageBucket,
 		name,
 		NewPutObjReader(r),
-		ObjectOptions{NoLock: true})
+		ObjectOptions{})
 	if isErrBucketNotFound(err) {
 		return nil
 	}

@@ -115,7 +115,11 @@ func (er erasureObjects) renameAll(ctx context.Context, bucket, prefix string) {
 		wg.Add(1)
 		go func(disk StorageAPI) {
 			defer wg.Done()
-			disk.RenameFile(ctx, bucket, prefix, minioMetaTmpBucket, mustGetUUID())
+
+			ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
+			defer cancel()
+
+			disk.RenameFile(ctx, bucket, prefix, minioMetaTmpDeletedBucket, mustGetUUID())
 		}(disk)
 	}
 	wg.Wait()

@@ -130,6 +134,10 @@ func (er erasureObjects) deleteAll(ctx context.Context, bucket, prefix string) {
 		wg.Add(1)
 		go func(disk StorageAPI) {
 			defer wg.Done()
+
+			ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
+			defer cancel()
+
 			disk.Delete(ctx, bucket, prefix, true)
 		}(disk)
 	}

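Note: both loops above give each per-disk goroutine its own shadowed 5-second deadline, so one hung disk cannot stall the whole fan-out. A self-contained sketch of that shape (the work function is a stand-in for disk.Delete/RenameFile):

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

func main() {
	// work blocks for id seconds, or until its context expires.
	work := func(ctx context.Context, id int) error {
		select {
		case <-time.After(time.Duration(id) * time.Second):
			return nil
		case <-ctx.Done():
			return ctx.Err()
		}
	}

	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			// Per-goroutine deadline: only the slow "disk" times out.
			ctx, cancel := context.WithTimeout(context.Background(), 1500*time.Millisecond)
			defer cancel()
			fmt.Printf("disk %d: %v\n", id, work(ctx, id))
		}(i)
	}
	wg.Wait()
}
```
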
@@ -494,6 +502,7 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
 	n, err := erasure.Encode(ctx, data, writers, buffer, writeQuorum)
 	closeBitrotWriters(writers)
 	if err != nil {
+		logger.LogIf(ctx, err)
 		return pi, toObjectErr(err, bucket, object)
 	}

@@ -528,6 +537,7 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
 	partPath := pathJoin(uploadIDPath, fi.DataDir, partSuffix)
 	onlineDisks, err = rename(ctx, onlineDisks, minioMetaTmpBucket, tmpPartPath, minioMetaMultipartBucket, partPath, false, writeQuorum, nil)
 	if err != nil {
+		logger.LogIf(ctx, err)
 		return pi, toObjectErr(err, minioMetaMultipartBucket, partPath)
 	}

@@ -544,7 +554,8 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
 	// Pick one from the first valid metadata.
 	fi, err = pickValidFileInfo(ctx, partsMetadata, modTime, writeQuorum)
 	if err != nil {
-		return pi, err
+		logger.LogIf(ctx, err)
+		return pi, toObjectErr(err, bucket, object)
 	}

 	// Once part is successfully committed, proceed with updating erasure metadata.

@@ -571,6 +582,7 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo

 	// Writes update `xl.meta` format for each disk.
 	if _, err = writeUniqueFileInfo(ctx, onlineDisks, minioMetaMultipartBucket, uploadIDPath, partsMetadata, writeQuorum); err != nil {
+		logger.LogIf(ctx, err)
 		return pi, toObjectErr(err, minioMetaMultipartBucket, uploadIDPath)
 	}

@@ -447,6 +447,12 @@ func (er erasureObjects) getObjectFileInfo(ctx context.Context, bucket, object s
 		return fi, nil, nil, err
 	}

+	// if one of the disk is offline, return right here no need
+	// to attempt a heal on the object.
+	if countErrs(errs, errDiskNotFound) > 0 {
+		return fi, metaArr, onlineDisks, nil
+	}
+
 	var missingBlocks int
 	for i, err := range errs {
 		if err != nil && errors.Is(err, errFileNotFound) {

@@ -801,11 +807,13 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st

 	// Write unique `xl.meta` for each disk.
 	if onlineDisks, err = writeUniqueFileInfo(ctx, onlineDisks, minioMetaTmpBucket, tempObj, partsMetadata, writeQuorum); err != nil {
+		logger.LogIf(ctx, err)
 		return ObjectInfo{}, toObjectErr(err, bucket, object)
 	}

 	// Rename the successfully written temporary object to final location.
 	if onlineDisks, err = renameData(ctx, onlineDisks, minioMetaTmpBucket, tempObj, fi.DataDir, bucket, object, writeQuorum, nil); err != nil {
+		logger.LogIf(ctx, err)
 		return ObjectInfo{}, toObjectErr(err, bucket, object)
 	}

@@ -963,6 +963,21 @@ func maxKeysPlusOne(maxKeys int, addOne bool) int {
 func (z *erasureServerPools) ListObjects(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, error) {
 	var loi ListObjectsInfo

+	if len(prefix) > 0 && maxKeys == 1 && delimiter == "" && marker == "" {
+		// Optimization for certain applications like
+		// - Cohesity
+		// - Actifio, Splunk etc.
+		// which send ListObjects requests where the actual object
+		// itself is the prefix and max-keys=1 in such scenarios
+		// we can simply verify locally if such an object exists
+		// to avoid the need for ListObjects().
+		objInfo, err := z.GetObjectInfo(ctx, bucket, prefix, ObjectOptions{NoLock: true})
+		if err == nil {
+			loi.Objects = append(loi.Objects, objInfo)
+			return loi, nil
+		}
+	}
+
 	merged, err := z.listPath(ctx, listPathOptions{
 		Bucket:    bucket,
 		Prefix:    prefix,

@@ -1292,18 +1307,6 @@ func (z *erasureServerPools) DeleteBucket(ctx context.Context, bucket string, fo
 	return nil
 }

-// deleteAll will delete a bucket+prefix unconditionally across all disks.
-// Note that set distribution is ignored so it should only be used in cases where
-// data is not distributed across sets.
-// Errors are logged but individual disk failures are not returned.
-func (z *erasureServerPools) deleteAll(ctx context.Context, bucket, prefix string) {
-	for _, servers := range z.serverPools {
-		for _, set := range servers.sets {
-			set.deleteAll(ctx, bucket, prefix)
-		}
-	}
-}
-
 // renameAll will rename bucket+prefix unconditionally across all disks to
 // minioMetaTmpBucket + unique uuid,
 // Note that set distribution is ignored so it should only be used in cases where

@@ -250,8 +250,8 @@ func (s *erasureSets) connectDisks() {
 			}
 			disk.SetDiskLoc(s.poolIndex, setIndex, diskIndex)
 			s.endpointStrings[setIndex*s.setDriveCount+diskIndex] = disk.String()
-			s.erasureDisksMu.Unlock()
 			setsJustConnected[setIndex] = true
+			s.erasureDisksMu.Unlock()
 		}(endpoint)
 	}

@@ -435,6 +435,11 @@ func (er erasureObjects) nsScanner(ctx context.Context, buckets []BucketInfo, bf
 		}
 	}()

+	// Shuffle disks to ensure a total randomness of bucket/disk association to ensure
+	// that objects that are not present in all disks are accounted and ILM applied.
+	r := rand.New(rand.NewSource(time.Now().UnixNano()))
+	r.Shuffle(len(disks), func(i, j int) { disks[i], disks[j] = disks[j], disks[i] })
+
 	// Start one scanner per disk
 	var wg sync.WaitGroup
 	wg.Add(len(disks))

@@ -233,10 +233,15 @@ func extractReqParams(r *http.Request) map[string]string {
 	region := globalServerRegion
 	cred := getReqAccessCred(r, region)

+	principalID := cred.AccessKey
+	if cred.ParentUser != "" {
+		principalID = cred.ParentUser
+	}
+
 	// Success.
 	m := map[string]string{
 		"region":          region,
 		"accessKey":       cred.AccessKey,
+		"principalId":     principalID,
 		"sourceIPAddress": handlers.GetSourceIP(r),
 		// Add more fields here.
 	}

cmd/iam.go (15 changes)

@@ -1704,7 +1704,7 @@ func (sys *IAMSys) PolicyDBGet(name string, isGroup bool, groups ...string) ([]s
 // information in IAM (i.e sys.iam*Map) - this info is stored only in the STS
 // generated credentials. Thus we skip looking up group memberships, user map,
 // and group map and check the appropriate policy maps directly.
-func (sys *IAMSys) policyDBGet(name string, isGroup bool) ([]string, error) {
+func (sys *IAMSys) policyDBGet(name string, isGroup bool) (policies []string, err error) {
 	if isGroup {
 		if sys.usersSysType == MinIOUsersSysType {
 			g, ok := sys.iamGroupsMap[name]

@@ -1719,8 +1719,7 @@ func (sys *IAMSys) policyDBGet(name string, isGroup bool) ([]string, error) {
 			}
 		}

-		mp := sys.iamGroupPolicyMap[name]
-		return mp.toSlice(), nil
+		return sys.iamGroupPolicyMap[name].toSlice(), nil
 	}

 	var u auth.Credentials

@@ -1738,8 +1737,6 @@ func (sys *IAMSys) policyDBGet(name string, isGroup bool) ([]string, error) {
 		}
 	}

-	var policies []string
-
 	mp, ok := sys.iamUserPolicyMap[name]
 	if !ok {
 		if u.ParentUser != "" {

@@ -1757,8 +1754,7 @@ func (sys *IAMSys) policyDBGet(name string, isGroup bool) ([]string, error) {
 			continue
 		}

-		p := sys.iamGroupPolicyMap[group]
-		policies = append(policies, p.toSlice()...)
+		policies = append(policies, sys.iamGroupPolicyMap[group].toSlice()...)
 	}

 	return policies, nil

@@ -1788,8 +1784,9 @@ func (sys *IAMSys) IsAllowedServiceAccount(args iampolicy.Args, parent string) b
 	}

 	// Check policy for this service account.
-	svcPolicies, err := sys.PolicyDBGet(args.AccountName, false)
+	svcPolicies, err := sys.PolicyDBGet(parent, false, args.Groups...)
 	if err != nil {
 		logger.LogIf(GlobalContext, err)
 		return false
 	}

@@ -2072,7 +2069,7 @@ func (sys *IAMSys) IsAllowed(args iampolicy.Args) bool {
 	}

 	// Continue with the assumption of a regular user
-	policies, err := sys.PolicyDBGet(args.AccountName, false)
+	policies, err := sys.PolicyDBGet(args.AccountName, false, args.Groups...)
 	if err != nil {
 		return false
 	}

|
@ -81,6 +81,15 @@ type MapClaims struct {
|
|||
jwtgo.MapClaims
|
||||
}
|
||||
|
||||
// GetAccessKey will return the access key.
|
||||
// If nil an empty string will be returned.
|
||||
func (c *MapClaims) GetAccessKey() string {
|
||||
if c == nil {
|
||||
return ""
|
||||
}
|
||||
return c.AccessKey
|
||||
}
|
||||
|
||||
// NewStandardClaims - initializes standard claims
|
||||
func NewStandardClaims() *StandardClaims {
|
||||
return &StandardClaims{}
|
||||
|
|
|
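Note: GetAccessKey is a nil-safe accessor. Go methods with pointer receivers can be invoked on a nil receiver, so the guard turns what would be a nil-pointer panic at `claims.AccessKey` into an empty string; the web handlers later in this compare switch to it for exactly that reason. A minimal illustration:

```go
package main

import "fmt"

type Claims struct {
	AccessKey string
}

// GetAccessKey returns the access key, or "" when the receiver is nil.
func (c *Claims) GetAccessKey() string {
	if c == nil {
		return ""
	}
	return c.AccessKey
}

func main() {
	var c *Claims // nil: e.g. a request that carried no JWT claims
	fmt.Printf("%q\n", c.GetAccessKey()) // "" instead of a panic
}
```
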
@@ -811,6 +811,9 @@ func listPathRaw(ctx context.Context, opts listPathRawOptions) (err error) {
 	if len(disks) == 0 {
 		return fmt.Errorf("listPathRaw: 0 drives provided")
 	}
+	// Cancel upstream if we finish before we expect.
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()

 	askDisks := len(disks)
 	readers := make([]*metacacheReader, askDisks)

@@ -821,6 +824,8 @@ func listPathRaw(ctx context.Context, opts listPathRawOptions) (err error) {
 		if err != nil {
 			return err
 		}
+		// Make sure we close the pipe so blocked writes doesn't stay around.
+		defer r.CloseWithError(context.Canceled)
 		// Send request to each disk.
 		go func() {
 			werr := d.WalkDir(ctx, WalkDirOptions{

@@ -832,7 +837,10 @@ func listPathRaw(ctx context.Context, opts listPathRawOptions) (err error) {
 				ForwardTo: opts.forwardTo,
 			}, w)
 			w.CloseWithError(werr)
-			if werr != io.EOF && werr != nil && werr.Error() != errFileNotFound.Error() && werr.Error() != errVolumeNotFound.Error() {
+			if werr != io.EOF && werr != nil &&
+				werr.Error() != errFileNotFound.Error() &&
+				werr.Error() != errVolumeNotFound.Error() &&
+				!errors.Is(werr, context.Canceled) {
 				logger.LogIf(ctx, werr)
 			}
 		}()

@@ -230,7 +230,8 @@ func (w *metacacheWriter) Reset(out io.Writer) {
 }

 var s2DecPool = sync.Pool{New: func() interface{} {
-	return s2.NewReader(nil)
+	// Default alloc block for network transfer.
+	return s2.NewReader(nil, s2.ReaderAllocBlock(16<<10))
 }}

 // metacacheReader allows reading a cache stream.

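Note: s2DecPool hands out reusable decompression readers, and capping the allocation block keeps each pooled reader small for short network streams. The general pool-and-reset pattern, sketched with only the standard library (flate stands in for the s2 reader here):

```go
package main

import (
	"bytes"
	"compress/flate"
	"fmt"
	"io"
	"sync"
)

// readerPool reuses flate decoders the way s2DecPool reuses s2 readers:
// construct once with a nil source, Reset onto a real stream per use.
var readerPool = sync.Pool{New: func() interface{} {
	return flate.NewReader(nil)
}}

func decompress(src io.Reader) ([]byte, error) {
	r := readerPool.Get().(io.ReadCloser)
	defer readerPool.Put(r)
	// flate readers implement Resetter for reuse, like s2.Reader.Reset.
	if err := r.(flate.Resetter).Reset(src, nil); err != nil {
		return nil, err
	}
	return io.ReadAll(r)
}

func main() {
	var buf bytes.Buffer
	w, _ := flate.NewWriter(&buf, flate.BestSpeed)
	w.Write([]byte("hello pooled decoders"))
	w.Close()

	out, err := decompress(&buf)
	fmt.Println(string(out), err)
}
```
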
@@ -27,6 +27,7 @@ import (
 	"strings"

 	"github.com/gorilla/mux"
+	xhttp "github.com/minio/minio/cmd/http"
 	"github.com/minio/minio/cmd/logger"
 	xioutil "github.com/minio/minio/pkg/ioutil"
 )

@@ -107,7 +108,12 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
 	forward := opts.ForwardTo
 	var scanDir func(path string) error
 	scanDir = func(current string) error {
+		if contextCanceled(ctx) {
+			return ctx.Err()
+		}
+		s.walkMu.Lock()
 		entries, err := s.ListDir(ctx, opts.Bucket, current, -1)
+		s.walkMu.Unlock()
 		if err != nil {
 			// Folder could have gone away in-between
 			if err != errVolumeNotFound && err != errFileNotFound {

@@ -142,10 +148,15 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
 			// Do do not retain the file.
 			entries[i] = ""

+			if contextCanceled(ctx) {
+				return ctx.Err()
+			}
 			// If root was an object return it as such.
 			if HasSuffix(entry, xlStorageFormatFile) {
 				var meta metaCacheEntry
+				s.walkMu.Lock()
 				meta.metadata, err = xioutil.ReadFile(pathJoin(volumeDir, current, entry))
+				s.walkMu.Unlock()
 				if err != nil {
 					logger.LogIf(ctx, err)
 					continue

@@ -160,7 +171,9 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
 			// Check legacy.
 			if HasSuffix(entry, xlStorageFormatFileV1) {
 				var meta metaCacheEntry
+				s.walkMu.Lock()
 				meta.metadata, err = xioutil.ReadFile(pathJoin(volumeDir, current, entry))
+				s.walkMu.Unlock()
 				if err != nil {
 					logger.LogIf(ctx, err)
 					continue

@@ -183,6 +196,9 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
 			if entry == "" {
 				continue
 			}
+			if contextCanceled(ctx) {
+				return ctx.Err()
+			}
 			meta := metaCacheEntry{name: PathJoin(current, entry)}

 			// If directory entry on stack before this, pop it now.

@@ -207,7 +223,9 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
 				meta.name = meta.name[:len(meta.name)-1] + globalDirSuffixWithSlash
 			}

+			s.walkMu.Lock()
 			meta.metadata, err = xioutil.ReadFile(pathJoin(volumeDir, meta.name, xlStorageFormatFile))
+			s.walkMu.Unlock()
 			switch {
 			case err == nil:
 				// It was an object

@@ -216,7 +234,9 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
 				}
 				out <- meta
 			case osIsNotExist(err):
+				s.walkMu.Lock()
 				meta.metadata, err = xioutil.ReadFile(pathJoin(volumeDir, meta.name, xlStorageFormatFileV1))
+				s.walkMu.Unlock()
 				if err == nil {
 					// Maybe rename? Would make it inconsistent across disks though.
 					// os.Rename(pathJoin(volumeDir, meta.name, xlStorageFormatFileV1), pathJoin(volumeDir, meta.name, xlStorageFormatFile))

@@ -282,6 +302,7 @@ func (client *storageRESTClient) WalkDir(ctx context.Context, opts WalkDirOption
 		logger.LogIf(ctx, err)
 		return err
 	}
+	defer xhttp.DrainBody(respBody)
 	return waitForHTTPStream(respBody, wr)
 }

@@ -300,7 +321,7 @@ func (s *storageRESTServer) WalkDirHandler(w http.ResponseWriter, r *http.Reques
 	}

 	var reportNotFound bool
-	if v := vars[storageRESTReportNotFound]; v != "" {
+	if v := r.URL.Query().Get(storageRESTReportNotFound); v != "" {
 		reportNotFound, err = strconv.ParseBool(v)
 		if err != nil {
 			s.writeErrorResponse(w, err)

@@ -18,7 +18,6 @@ package cmd

 import (
 	"context"
-	"errors"
 	"fmt"
 	"path"
 	"strings"

@@ -238,14 +237,10 @@ func (m *metacache) delete(ctx context.Context) {
 		logger.LogIf(ctx, fmt.Errorf("metacache.delete: bucket (%s) or id (%s) empty", m.bucket, m.id))
 	}
 	objAPI := newObjectLayerFn()
-	if objAPI == nil {
-		logger.LogIf(ctx, errors.New("metacache.delete: no object layer"))
-		return
+	if objAPI != nil {
+		ez, ok := objAPI.(*erasureServerPools)
+		if ok {
+			ez.renameAll(ctx, minioMetaBucket, metacachePrefixForID(m.bucket, m.id))
+		}
 	}
-	ez, ok := objAPI.(*erasureServerPools)
-	if !ok {
-		logger.LogIf(ctx, errors.New("metacache.delete: expected objAPI to be *erasureServerPools"))
-		return
-	}
-	ez.deleteAll(ctx, minioMetaBucket, metacachePrefixForID(m.bucket, m.id))
 }

@@ -1368,7 +1368,7 @@ func (args eventArgs) ToEvent(escape bool) event.Event {
 		AwsRegion:         args.ReqParams["region"],
 		EventTime:         eventTime.Format(event.AMZTimeFormat),
 		EventName:         args.EventName,
-		UserIdentity:      event.Identity{PrincipalID: args.ReqParams["accessKey"]},
+		UserIdentity:      event.Identity{PrincipalID: args.ReqParams["principalId"]},
 		RequestParameters: args.ReqParams,
 		ResponseElements:  respElements,
 		S3: event.Metadata{

@@ -1376,7 +1376,7 @@ func (args eventArgs) ToEvent(escape bool) event.Event {
 			ConfigurationID: "Config",
 			Bucket: event.Bucket{
 				Name:          args.BucketName,
-				OwnerIdentity: event.Identity{PrincipalID: args.ReqParams["accessKey"]},
+				OwnerIdentity: event.Identity{PrincipalID: args.ReqParams["principalId"]},
 				ARN:           policy.ResourceARNPrefix + args.BucketName,
 			},
 			Object: event.Object{

@@ -426,7 +426,7 @@ func (e BucketRemoteTargetNotFound) Error() string {
 type BucketRemoteConnectionErr GenericError

 func (e BucketRemoteConnectionErr) Error() string {
-	return "Remote service endpoint or target bucket not available: " + e.Bucket
+	return fmt.Sprintf("Remote service endpoint or target bucket not available: %s \n\t%s", e.Bucket, e.Err.Error())
 }

 // BucketRemoteAlreadyExists remote already exists for this target type.

@@ -814,13 +814,7 @@ func (g *GetObjectReader) Close() error {

 // Read - to implement Reader interface.
 func (g *GetObjectReader) Read(p []byte) (n int, err error) {
-	n, err = g.pReader.Read(p)
-	if err != nil {
-		// Calling code may not Close() in case of error, so
-		// we ensure it.
-		g.Close()
-	}
-	return
+	return g.pReader.Read(p)
 }

 //SealMD5CurrFn seals md5sum with object encryption key and returns sealed

@@ -2817,7 +2817,8 @@ func (api objectAPIHandlers) DeleteObjectHandler(w http.ResponseWriter, r *http.
 			VersionID: opts.VersionID,
 		})
 	}
-	_, replicateDel, replicateSync := checkReplicateDelete(ctx, bucket, ObjectToDelete{ObjectName: object, VersionID: opts.VersionID}, goi, gerr)
+
+	replicateDel, replicateSync := checkReplicateDelete(ctx, bucket, ObjectToDelete{ObjectName: object, VersionID: opts.VersionID}, goi, gerr)
 	if replicateDel {
 		if opts.VersionID != "" {
 			opts.VersionPurgeStatus = Pending

@@ -2825,6 +2826,7 @@ func (api objectAPIHandlers) DeleteObjectHandler(w http.ResponseWriter, r *http.
 			opts.DeleteMarkerReplicationStatus = string(replication.Pending)
 		}
 	}
+
 	vID := opts.VersionID
 	if r.Header.Get(xhttp.AmzBucketReplicationStatus) == replication.Replica.String() {
 		// check if replica has permission to be deleted.

@@ -127,6 +127,7 @@ func (c *Client) Call(ctx context.Context, method string, values url.Values, bod
 	}
 	req.Header.Set("Authorization", "Bearer "+c.newAuthToken(req.URL.RawQuery))
 	req.Header.Set("X-Minio-Time", time.Now().UTC().Format(time.RFC3339))
+	req.Header.Set("Expect", "100-continue") // set expect continue to honor expect continue timeouts
 	if length > 0 {
 		req.ContentLength = length
 	}

@@ -207,7 +207,7 @@ func (client *storageRESTClient) NSScanner(ctx context.Context, cache dataUsageC
 		pw.CloseWithError(cache.serializeTo(pw))
 	}()
 	respBody, err := client.call(ctx, storageRESTMethodNSScanner, url.Values{}, pr, -1)
-	defer http.DrainBody(respBody)
+	defer xhttp.DrainBody(respBody)
 	if err != nil {
 		pr.Close()
 		return cache, err

@@ -221,7 +221,7 @@ func (client *storageRESTClient) NSScanner(ctx context.Context, cache dataUsageC
 	}()
 	err = newCache.deserialize(pr)
 	pr.CloseWithError(err)
-	return newCache, err
+	return newCache, toStorageErr(err)
 }

 func (client *storageRESTClient) GetDiskID() (string, error) {

@@ -238,6 +238,14 @@ func (client *storageRESTClient) SetDiskID(id string) {

 // DiskInfo - fetch disk information for a remote disk.
 func (client *storageRESTClient) DiskInfo(ctx context.Context) (info DiskInfo, err error) {
+	if !client.IsOnline() {
+		// make sure to check if the disk is offline, since the underlying
+		// value is cached we should attempt to invalidate it if such calls
+		// were attempted. This can lead to false success under certain conditions
+		// - this change attempts to avoid stale information if the underlying
+		// transport is already down.
+		return info, errDiskNotFound
+	}
 	client.diskInfoCache.Once.Do(func() {
 		client.diskInfoCache.TTL = time.Second
 		client.diskInfoCache.Update = func() (interface{}, error) {

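Note: DiskInfo now refuses to serve a cached value when the transport is known to be down, and lazily initializes a once-per-client TTL cache. A hedged sketch of that shape; the timedValue type below is hypothetical, loosely modeled on the cache used in the diff:

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

var errOffline = errors.New("disk not found")

// timedValue caches the result of update() for ttl. Hypothetical helper.
type timedValue struct {
	mu      sync.Mutex
	value   string
	updated time.Time
	ttl     time.Duration
	update  func() (string, error)
}

func (t *timedValue) Get() (string, error) {
	t.mu.Lock()
	defer t.mu.Unlock()
	if time.Since(t.updated) < t.ttl {
		return t.value, nil // serve cached value inside the TTL window
	}
	v, err := t.update()
	if err != nil {
		return "", err
	}
	t.value, t.updated = v, time.Now()
	return v, nil
}

type client struct {
	online bool
	once   sync.Once
	cache  timedValue
}

func (c *client) DiskInfo() (string, error) {
	if !c.online {
		return "", errOffline // never trust the cache when transport is down
	}
	c.once.Do(func() {
		c.cache.ttl = time.Second
		c.cache.update = func() (string, error) {
			return "disk-info@" + time.Now().Format(time.StampMilli), nil
		}
	})
	return c.cache.Get()
}

func main() {
	c := &client{online: true}
	fmt.Println(c.DiskInfo())
	fmt.Println(c.DiskInfo()) // same cached value within the 1s TTL
}
```
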
@@ -247,7 +255,7 @@ func (client *storageRESTClient) DiskInfo(ctx context.Context) (info DiskInfo, e
 			if err != nil {
 				return info, err
 			}
-			defer http.DrainBody(respBody)
+			defer xhttp.DrainBody(respBody)
 			if err = msgp.Decode(respBody, &info); err != nil {
 				return info, err
 			}

@@ -270,7 +278,7 @@ func (client *storageRESTClient) MakeVolBulk(ctx context.Context, volumes ...str
 	values := make(url.Values)
 	values.Set(storageRESTVolumes, strings.Join(volumes, ","))
 	respBody, err := client.call(ctx, storageRESTMethodMakeVolBulk, values, nil, -1)
-	defer http.DrainBody(respBody)
+	defer xhttp.DrainBody(respBody)
 	return err
 }

@@ -279,7 +287,7 @@ func (client *storageRESTClient) MakeVol(ctx context.Context, volume string) (er
 	values := make(url.Values)
 	values.Set(storageRESTVolume, volume)
 	respBody, err := client.call(ctx, storageRESTMethodMakeVol, values, nil, -1)
-	defer http.DrainBody(respBody)
+	defer xhttp.DrainBody(respBody)
 	return err
 }

@@ -289,7 +297,7 @@ func (client *storageRESTClient) ListVols(ctx context.Context) (vols []VolInfo,
 	if err != nil {
 		return
 	}
-	defer http.DrainBody(respBody)
+	defer xhttp.DrainBody(respBody)
 	vinfos := VolsInfo(vols)
 	err = msgp.Decode(respBody, &vinfos)
 	return vinfos, err

@@ -303,7 +311,7 @@ func (client *storageRESTClient) StatVol(ctx context.Context, volume string) (vo
 	if err != nil {
 		return
 	}
-	defer http.DrainBody(respBody)
+	defer xhttp.DrainBody(respBody)
 	err = msgp.Decode(respBody, &vol)
 	return vol, err
 }

@@ -316,7 +324,7 @@ func (client *storageRESTClient) DeleteVol(ctx context.Context, volume string, f
 		values.Set(storageRESTForceDelete, "true")
 	}
 	respBody, err := client.call(ctx, storageRESTMethodDeleteVol, values, nil, -1)
-	defer http.DrainBody(respBody)
+	defer xhttp.DrainBody(respBody)
 	return err
 }

@@ -327,7 +335,7 @@ func (client *storageRESTClient) AppendFile(ctx context.Context, volume string,
 	values.Set(storageRESTFilePath, path)
 	reader := bytes.NewReader(buf)
 	respBody, err := client.call(ctx, storageRESTMethodAppendFile, values, reader, -1)
-	defer http.DrainBody(respBody)
+	defer xhttp.DrainBody(respBody)
 	return err
 }

@@ -337,12 +345,7 @@ func (client *storageRESTClient) CreateFile(ctx context.Context, volume, path st
 	values.Set(storageRESTFilePath, path)
 	values.Set(storageRESTLength, strconv.Itoa(int(size)))
 	respBody, err := client.call(ctx, storageRESTMethodCreateFile, values, ioutil.NopCloser(reader), size)
-	if err != nil {
-		return err
-	}
-	waitReader, err := waitForHTTPResponse(respBody)
-	defer http.DrainBody(ioutil.NopCloser(waitReader))
-	defer respBody.Close()
+	defer xhttp.DrainBody(respBody)
 	return err
 }

@@ -357,7 +360,7 @@ func (client *storageRESTClient) WriteMetadata(ctx context.Context, volume, path
 	}

 	respBody, err := client.call(ctx, storageRESTMethodWriteMetadata, values, &reader, -1)
-	defer http.DrainBody(respBody)
+	defer xhttp.DrainBody(respBody)
 	return err
 }

@@ -373,7 +376,7 @@ func (client *storageRESTClient) DeleteVersion(ctx context.Context, volume, path
 	}

 	respBody, err := client.call(ctx, storageRESTMethodDeleteVersion, values, &buffer, -1)
-	defer http.DrainBody(respBody)
+	defer xhttp.DrainBody(respBody)
 	return err
 }

@@ -383,7 +386,7 @@ func (client *storageRESTClient) WriteAll(ctx context.Context, volume string, pa
 	values.Set(storageRESTVolume, volume)
 	values.Set(storageRESTFilePath, path)
 	respBody, err := client.call(ctx, storageRESTMethodWriteAll, values, bytes.NewBuffer(b), int64(len(b)))
-	defer http.DrainBody(respBody)
+	defer xhttp.DrainBody(respBody)
 	return err
 }

@@ -393,7 +396,7 @@ func (client *storageRESTClient) CheckFile(ctx context.Context, volume string, p
 	values.Set(storageRESTVolume, volume)
 	values.Set(storageRESTFilePath, path)
 	respBody, err := client.call(ctx, storageRESTMethodCheckFile, values, nil, -1)
-	defer http.DrainBody(respBody)
+	defer xhttp.DrainBody(respBody)
 	return err
 }

@@ -410,7 +413,7 @@ func (client *storageRESTClient) CheckParts(ctx context.Context, volume string,
 	}

 	respBody, err := client.call(ctx, storageRESTMethodCheckParts, values, &reader, -1)
-	defer http.DrainBody(respBody)
+	defer xhttp.DrainBody(respBody)
 	return err
 }

@@ -424,7 +427,6 @@ func (client *storageRESTClient) RenameData(ctx context.Context, srcVolume, srcP
 	values.Set(storageRESTDstPath, dstPath)
 	respBody, err := client.call(ctx, storageRESTMethodRenameData, values, nil, -1)
-	defer http.DrainBody(respBody)
-
+	defer xhttp.DrainBody(respBody)
 	return err
 }

@@ -454,13 +456,13 @@ func (client *storageRESTClient) ReadVersion(ctx context.Context, volume, path,
 	if err != nil {
 		return fi, err
 	}
-	defer http.DrainBody(respBody)
+	defer xhttp.DrainBody(respBody)

 	dec := msgpNewReader(respBody)
 	defer readMsgpReaderPool.Put(dec)

 	err = fi.DecodeMsg(dec)
-	return fi, err
+	return fi, toStorageErr(err)
 }

 // ReadAll - reads all contents of a file.

@@ -472,7 +474,7 @@ func (client *storageRESTClient) ReadAll(ctx context.Context, volume string, pat
 	if err != nil {
 		return nil, err
 	}
-	defer http.DrainBody(respBody)
+	defer xhttp.DrainBody(respBody)
 	return ioutil.ReadAll(respBody)
 }

@@ -483,11 +485,7 @@ func (client *storageRESTClient) ReadFileStream(ctx context.Context, volume, pat
 	values.Set(storageRESTFilePath, path)
 	values.Set(storageRESTOffset, strconv.Itoa(int(offset)))
 	values.Set(storageRESTLength, strconv.Itoa(int(length)))
-	respBody, err := client.call(ctx, storageRESTMethodReadFileStream, values, nil, -1)
-	if err != nil {
-		return nil, err
-	}
-	return respBody, nil
+	return client.call(ctx, storageRESTMethodReadFileStream, values, nil, -1)
 }

 // ReadFile - reads section of a file.

@@ -508,9 +506,9 @@ func (client *storageRESTClient) ReadFile(ctx context.Context, volume string, pa
 	if err != nil {
 		return 0, err
 	}
-	defer http.DrainBody(respBody)
+	defer xhttp.DrainBody(respBody)
 	n, err := io.ReadFull(respBody, buf)
-	return int64(n), err
+	return int64(n), toStorageErr(err)
 }

 // ListDir - lists a directory.

@@ -523,9 +521,9 @@ func (client *storageRESTClient) ListDir(ctx context.Context, volume, dirPath st
 	if err != nil {
 		return nil, err
 	}
-	defer http.DrainBody(respBody)
+	defer xhttp.DrainBody(respBody)
 	err = gob.NewDecoder(respBody).Decode(&entries)
-	return entries, err
+	return entries, toStorageErr(err)
 }

 // DeleteFile - deletes a file.

@@ -536,7 +534,7 @@ func (client *storageRESTClient) Delete(ctx context.Context, volume string, path
 	values.Set(storageRESTRecursive, strconv.FormatBool(recursive))

 	respBody, err := client.call(ctx, storageRESTMethodDeleteFile, values, nil, -1)
-	defer http.DrainBody(respBody)
+	defer xhttp.DrainBody(respBody)
 	return err
 }

@@ -560,7 +558,7 @@ func (client *storageRESTClient) DeleteVersions(ctx context.Context, volume stri
 	errs = make([]error, len(versions))

 	respBody, err := client.call(ctx, storageRESTMethodDeleteVersions, values, &buffer, -1)
-	defer http.DrainBody(respBody)
+	defer xhttp.DrainBody(respBody)
 	if err != nil {
 		for i := range errs {
 			errs[i] = err

@@ -571,7 +569,7 @@ func (client *storageRESTClient) DeleteVersions(ctx context.Context, volume stri
 	reader, err := waitForHTTPResponse(respBody)
 	if err != nil {
 		for i := range errs {
-			errs[i] = err
+			errs[i] = toStorageErr(err)
 		}
 		return errs
 	}

@@ -599,7 +597,7 @@ func (client *storageRESTClient) RenameFile(ctx context.Context, srcVolume, srcP
 	values.Set(storageRESTDstVolume, dstVolume)
 	values.Set(storageRESTDstPath, dstPath)
 	respBody, err := client.call(ctx, storageRESTMethodRenameFile, values, nil, -1)
-	defer http.DrainBody(respBody)
+	defer xhttp.DrainBody(respBody)
 	return err
 }

@@ -614,19 +612,19 @@ func (client *storageRESTClient) VerifyFile(ctx context.Context, volume, path st
 	}

 	respBody, err := client.call(ctx, storageRESTMethodVerifyFile, values, &reader, -1)
-	defer http.DrainBody(respBody)
+	defer xhttp.DrainBody(respBody)
 	if err != nil {
 		return err
 	}

 	respReader, err := waitForHTTPResponse(respBody)
 	if err != nil {
-		return err
+		return toStorageErr(err)
 	}

 	verifyResp := &VerifyFileResp{}
 	if err = gob.NewDecoder(respReader).Decode(verifyResp); err != nil {
-		return err
+		return toStorageErr(err)
 	}

 	return toStorageErr(verifyResp.Err)

@@ -17,7 +17,7 @@
 package cmd

 const (
-	storageRESTVersion       = "v29" // Removed WalkVersions()
+	storageRESTVersion       = "v30" // CreateFile is back to non-streaming
 	storageRESTVersionPrefix = SlashSeparator + storageRESTVersion
 	storageRESTPrefix        = minioReservedBucketPath + "/storage"
 )

@@ -288,8 +288,10 @@ func (s *storageRESTServer) CreateFileHandler(w http.ResponseWriter, r *http.Req
 		return
 	}

-	done := keepHTTPResponseAlive(w)
-	done(s.storage.CreateFile(r.Context(), volume, filePath, int64(fileSize), r.Body))
+	err = s.storage.CreateFile(r.Context(), volume, filePath, int64(fileSize), r.Body)
+	if err != nil {
+		s.writeErrorResponse(w, err)
+	}
 }

 // DeleteVersion delete updated metadata.

@@ -693,6 +695,7 @@ func keepHTTPResponseAlive(w http.ResponseWriter) func(error) {
 		if doneCh == nil {
 			return
 		}
+
 		// Indicate we are ready to write.
 		doneCh <- err

@@ -732,23 +735,6 @@ func waitForHTTPResponse(respBody io.Reader) (io.Reader, error) {
 	}
 }

-// drainCloser can be used for wrapping an http response.
-// It will drain the body before closing.
-type drainCloser struct {
-	rc io.ReadCloser
-}
-
-// Read forwards the read operation.
-func (f drainCloser) Read(p []byte) (n int, err error) {
-	return f.rc.Read(p)
-}
-
-// Close drains the body and closes the upstream.
-func (f drainCloser) Close() error {
-	xhttp.DrainBody(f.rc)
-	return nil
-}
-
 // httpStreamResponse allows streaming a response, but still send an error.
 type httpStreamResponse struct {
 	done chan error

@@ -840,7 +826,6 @@ func waitForHTTPStream(respBody io.ReadCloser, w io.Writer) error {
 	case 0:
 		// 0 is unbuffered, copy the rest.
 		_, err := io.Copy(w, respBody)
-		respBody.Close()
 		if err == io.EOF {
 			return nil
 		}

@@ -850,18 +835,7 @@ func waitForHTTPStream(respBody io.ReadCloser, w io.Writer) error {
 		if err != nil {
 			return err
 		}
-		respBody.Close()
 		return errors.New(string(errorText))
-	case 3:
-		// gob style is already deprecated, we can remove this when
-		// storage API version will be greater or equal to 23.
-		defer respBody.Close()
-		dec := gob.NewDecoder(respBody)
-		var err error
-		if de := dec.Decode(&err); de == nil {
-			return err
-		}
-		return errors.New("rpc error")
 	case 2:
 		// Block of data
 		var tmp [4]byte

@@ -878,7 +852,6 @@ func waitForHTTPStream(respBody io.ReadCloser, w io.Writer) error {
 		case 32:
 			continue
 		default:
-			go xhttp.DrainBody(respBody)
 			return fmt.Errorf("unexpected filler byte: %d", tmp[0])
 		}
 	}

@@ -455,7 +455,7 @@ func newInternodeHTTPTransport(tlsConfig *tls.Config, dialTimeout time.Duration)
 		WriteBufferSize:       32 << 10, // 32KiB moving up from 4KiB default
 		ReadBufferSize:        32 << 10, // 32KiB moving up from 4KiB default
 		IdleConnTimeout:       15 * time.Second,
-		ResponseHeaderTimeout: 3 * time.Minute, // Set conservative timeouts for MinIO internode.
+		ResponseHeaderTimeout: 15 * time.Minute, // Set conservative timeouts for MinIO internode.
 		TLSHandshakeTimeout:   15 * time.Second,
 		ExpectContinueTimeout: 15 * time.Second,
 		TLSClientConfig:       tlsConfig,

@@ -226,7 +226,7 @@ func (web *webAPIHandlers) MakeBucket(r *http.Request, args *MakeBucketArgs, rep
 	reply.UIVersion = Version

 	reqParams := extractReqParams(r)
-	reqParams["accessKey"] = claims.AccessKey
+	reqParams["accessKey"] = claims.GetAccessKey()

 	sendEvent(eventArgs{
 		EventName: event.BucketCreated,

@@ -723,7 +723,7 @@ func (web *webAPIHandlers) RemoveObject(r *http.Request, args *RemoveObjectArgs,
 	)

 	reqParams := extractReqParams(r)
-	reqParams["accessKey"] = claims.AccessKey
+	reqParams["accessKey"] = claims.GetAccessKey()
 	sourceIP := handlers.GetSourceIP(r)

next:

@@ -767,7 +767,7 @@ next:
 		}
 		if hasReplicationRules(ctx, args.BucketName, []ObjectToDelete{{ObjectName: objectName}}) || hasLifecycleConfig {
 			goi, gerr = getObjectInfoFn(ctx, args.BucketName, objectName, opts)
-			if _, replicateDel, replicateSync = checkReplicateDelete(ctx, args.BucketName, ObjectToDelete{
+			if replicateDel, replicateSync = checkReplicateDelete(ctx, args.BucketName, ObjectToDelete{
 				ObjectName: objectName,
 				VersionID:  goi.VersionID,
 			}, goi, gerr); replicateDel {

@@ -903,7 +903,7 @@ next:
 				}
 			}
 		}
-		_, replicateDel, _ := checkReplicateDelete(ctx, args.BucketName, ObjectToDelete{ObjectName: obj.Name, VersionID: obj.VersionID}, obj, nil)
+		replicateDel, _ := checkReplicateDelete(ctx, args.BucketName, ObjectToDelete{ObjectName: obj.Name, VersionID: obj.VersionID}, obj, nil)
 		// since versioned delete is not available on web browser, yet - this is a simple DeleteMarker replication
 		objToDel := ObjectToDelete{ObjectName: obj.Name}
 		if replicateDel {

@@ -1340,7 +1340,7 @@ func (web *webAPIHandlers) Upload(w http.ResponseWriter, r *http.Request) {
 	}

 	reqParams := extractReqParams(r)
-	reqParams["accessKey"] = claims.AccessKey
+	reqParams["accessKey"] = claims.GetAccessKey()

 	// Notify object created event.
 	sendEvent(eventArgs{

@@ -1529,7 +1529,7 @@ func (web *webAPIHandlers) Download(w http.ResponseWriter, r *http.Request) {
 	}

 	reqParams := extractReqParams(r)
-	reqParams["accessKey"] = claims.AccessKey
+	reqParams["accessKey"] = claims.GetAccessKey()

 	// Notify object accessed via a GET request.
 	sendEvent(eventArgs{

@@ -1684,7 +1684,7 @@ func (web *webAPIHandlers) DownloadZip(w http.ResponseWriter, r *http.Request) {
 	defer archive.Close()

 	reqParams := extractReqParams(r)
-	reqParams["accessKey"] = claims.AccessKey
+	reqParams["accessKey"] = claims.GetAccessKey()
 	respElements := extractRespElements(w)

 	for i, object := range args.Objects {

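The same one-line change repeats across MakeBucket, RemoveObject, Upload, Download and DownloadZip. A plausible motivation, which is an assumption on our part rather than something stated in the diff, is that a getter can guard against a nil claims value where a direct field access would panic:

```go
package main

import "fmt"

// Hypothetical shape of the claims type; the real MinIO type differs, only
// the nil-receiver guard is the point of this sketch.
type Claims struct {
	AccessKey string
}

// GetAccessKey is safe to call even when c is nil, unlike reading c.AccessKey.
func (c *Claims) GetAccessKey() string {
	if c == nil {
		return ""
	}
	return c.AccessKey
}

func main() {
	var c *Claims // e.g. a request that carried no claims
	fmt.Printf("%q\n", c.GetAccessKey()) // prints "" instead of panicking
}
```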
@@ -118,6 +118,9 @@ type xlStorage struct {

 	ctx context.Context
 	sync.RWMutex
+
+	// mutex to prevent concurrent read operations overloading walks.
+	walkMu sync.Mutex
 }

 // checkPathLength - returns error if given path name length more than 255

@@ -347,6 +350,8 @@ func (s *xlStorage) IsLocal() bool {

 // Retrieve location indexes.
 func (s *xlStorage) GetDiskLoc() (poolIdx, setIdx, diskIdx int) {
+	s.RLock()
+	defer s.RUnlock()
 	// If unset, see if we can locate it.
 	if s.poolIndex < 0 || s.setIndex < 0 || s.diskIndex < 0 {
 		return getXLDiskLoc(s.diskID)

@@ -1032,11 +1037,13 @@ func (s *xlStorage) ReadVersion(ctx context.Context, volume, path, versionID str
 }

 func (s *xlStorage) readAllData(volumeDir string, filePath string, requireDirectIO bool) (buf []byte, err error) {
-	var f *os.File
+	var r io.ReadCloser
 	if requireDirectIO {
+		var f *os.File
 		f, err = disk.OpenFileDirectIO(filePath, readMode, 0666)
+		r = &odirectReader{f, nil, nil, true, true, s, nil}
 	} else {
-		f, err = OpenFile(filePath, readMode, 0)
+		r, err = OpenFile(filePath, readMode, 0)
 	}
 	if err != nil {
 		if osIsNotExist(err) {

@@ -1071,10 +1078,8 @@ func (s *xlStorage) readAllData(volumeDir string, filePath string, requireDirect
 		return nil, err
 	}

-	or := &odirectReader{f, nil, nil, true, true, s, nil}
-	defer or.Close()
-
-	buf, err = ioutil.ReadAll(or)
+	defer r.Close()
+	buf, err = ioutil.ReadAll(r)
 	if err != nil {
 		err = osErrToFileErr(err)
 	}

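The two hunks above converge both open paths onto a single io.ReadCloser, so the close/read tail is shared. A standalone sketch of that shape; openDirect is a hypothetical stand-in for disk.OpenFileDirectIO plus the odirectReader wrapper, not a real MinIO API:

```go
package main

import (
	"fmt"
	"io"
	"os"
)

// readAll mirrors the refactored shape: choose the io.ReadCloser up front,
// then run one common close/read path regardless of how the file was opened.
func readAll(path string, direct bool) ([]byte, error) {
	var r io.ReadCloser
	var err error
	if direct {
		r, err = openDirect(path) // hypothetical O_DIRECT opener
	} else {
		r, err = os.Open(path)
	}
	if err != nil {
		return nil, err
	}
	defer r.Close()
	return io.ReadAll(r)
}

// openDirect stands in for a platform-specific direct-I/O open; here it just
// falls back to a buffered open so the sketch runs anywhere.
func openDirect(path string) (io.ReadCloser, error) { return os.Open(path) }

func main() {
	buf, err := readAll("/etc/hostname", false)
	fmt.Println(len(buf), err)
}
```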
@@ -1247,7 +1252,7 @@ type odirectReader struct {

 // Read - Implements Reader interface.
 func (o *odirectReader) Read(buf []byte) (n int, err error) {
-	if o.err != nil {
+	if o.err != nil && (len(o.buf) == 0 || o.freshRead) {
 		return 0, o.err
 	}
 	if o.buf == nil {

@@ -1274,20 +1279,22 @@ func (o *odirectReader) Read(buf []byte) (n int, err error) {
 			}
 		}
 		if n == 0 {
-			// err is io.EOF
+			// err is likely io.EOF
 			o.err = err
 			return n, err
 		}
+		o.err = err
 		o.buf = o.buf[:n]
+		o.freshRead = false
 	}
 	if len(buf) >= len(o.buf) {
 		n = copy(buf, o.buf)
+		o.freshRead = true
-		return n, nil
+		return n, o.err
 	}
 	n = copy(buf, o.buf)
 	o.buf = o.buf[n:]
+	// There is more left in buffer, do not return any EOF yet.
 	return n, nil
 }

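Together these two hunks make the reader hold on to a read error (typically io.EOF) until its internal buffer is fully drained, so a short final read still delivers its data. The same idea in a self-contained sketch:

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

// bufferedEOFReader remembers a source error but keeps serving buffered
// bytes, only surfacing the error once the buffer is empty.
type bufferedEOFReader struct {
	src io.Reader
	buf []byte
	err error
}

func (b *bufferedEOFReader) Read(p []byte) (int, error) {
	if len(b.buf) == 0 {
		if b.err != nil {
			return 0, b.err
		}
		chunk := make([]byte, 8)
		n, err := b.src.Read(chunk)
		b.err = err // may be io.EOF; hold it until the buffer drains
		b.buf = chunk[:n]
		if n == 0 {
			return 0, err
		}
	}
	n := copy(p, b.buf)
	b.buf = b.buf[n:]
	if len(b.buf) == 0 {
		return n, b.err // data plus the pending error in the same call
	}
	return n, nil // more buffered, do not return EOF yet
}

func main() {
	r := &bufferedEOFReader{src: strings.NewReader("hello world")}
	out, _ := io.ReadAll(r)
	fmt.Println(string(out))
}
```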
@@ -1615,6 +1622,9 @@ func (s *xlStorage) CheckFile(ctx context.Context, volume string, path string) e
 	if err != nil {
 		return err
 	}
+	s.RLock()
+	formatLegacy := s.formatLegacy
+	s.RUnlock()

 	var checkFile func(p string) error
 	checkFile = func(p string) error {

@@ -1626,10 +1636,10 @@ func (s *xlStorage) CheckFile(ctx context.Context, volume string, path string) e
 		if err := checkPathLength(filePath); err != nil {
 			return err
 		}

 		st, _ := Lstat(filePath)
 		if st == nil {
-			if !s.formatLegacy {
+			if !formatLegacy {
 				return errPathNotFound
 			}

@@ -1880,10 +1890,13 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath, dataDir,
 			legacyPreserved = true
 		}
 	} else {
+		s.RLock()
+		formatLegacy := s.formatLegacy
+		s.RUnlock()
 		// It is possible that some drives may not have `xl.meta` file
 		// in such scenarios verify if at least `part.1` files exist
 		// to verify for legacy version.
-		if s.formatLegacy {
+		if formatLegacy {
 			// We only need this code if we are moving
 			// from `xl.json` to `xl.meta`, we can avoid
 			// one extra readdir operation here for all

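CheckFile, RenameData and GetDiskLoc all pick up the same idiom: snapshot the shared field under a short read lock, then work from the local copy so the lock is never held across I/O. In miniature:

```go
package main

import (
	"fmt"
	"sync"
)

// Sketch of the read-lock snapshot idiom from the hunks above; the store
// type is invented, only the locking shape matters.
type store struct {
	sync.RWMutex
	formatLegacy bool
}

func (s *store) work() {
	s.RLock()
	formatLegacy := s.formatLegacy // copy under the lock
	s.RUnlock()

	// Long-running work proceeds lock-free on the snapshot.
	if formatLegacy {
		fmt.Println("legacy path")
	}
}

func main() {
	s := &store{formatLegacy: true}
	s.work()
}
```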
@@ -123,8 +123,12 @@ func (m *Monitor) getReport(selectBucket SelectionFunction) *bandwidth.Report {
 		if !selectBucket(bucket) {
 			continue
 		}
+		bucketThrottle, ok := m.bucketThrottle[bucket]
+		if !ok {
+			continue
+		}
 		report.BucketStats[bucket] = bandwidth.Details{
-			LimitInBytesPerSecond:            m.bucketThrottle[bucket].clusterBandwidth,
+			LimitInBytesPerSecond:            bucketThrottle.clusterBandwidth,
 			CurrentBandwidthInBytesPerSecond: bucketMeasurement.getExpMovingAvgBytesPerSecond(),
 		}
 	}

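The guarded lookup matters because indexing the map for a missing bucket yields the zero value; if the map holds pointers, dereferencing that nil result panics. Reduced to a runnable sketch:

```go
package main

import "fmt"

type throttle struct{ clusterBandwidth int64 }

func main() {
	throttles := map[string]*throttle{"bucket-a": {clusterBandwidth: 100}}

	// Comma-ok lookup: skip buckets without a throttle instead of panicking
	// on throttles[b].clusterBandwidth when the key is absent.
	for _, b := range []string{"bucket-a", "bucket-b"} {
		t, ok := throttles[b]
		if !ok {
			continue // bucket-b has no throttle entry
		}
		fmt.Println(b, t.clusterBandwidth)
	}
}
```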
@@ -25,62 +25,61 @@ import (

 // MonitoredReader monitors the bandwidth
 type MonitoredReader struct {
-	bucket            string             // Token to track bucket
+	opts              *MonitorReaderOptions
 	bucketMeasurement *bucketMeasurement // bucket measurement object
-	object            string             // Token to track object
-	reader            io.ReadCloser      // Reader to wrap
+	reader            io.Reader          // Reader to wrap
 	lastStop          time.Time          // Last timestamp for a measurement
-	headerSize        int                // Size of the header not captured by reader
 	throttle          *throttle          // throttle the rate at which replication occurs
 	monitor           *Monitor           // Monitor reference
-	closed            bool               // Reader is closed
+	lastErr           error              // last error reported; if this is non-nil all reads will fail.
 }

-// NewMonitoredReader returns an io.ReadCloser that reports bandwidth details.
-// The supplied reader will be closed.
-func NewMonitoredReader(ctx context.Context, monitor *Monitor, bucket string, object string, reader io.ReadCloser, headerSize int, bandwidthBytesPerSecond int64, clusterBandwidth int64) *MonitoredReader {
+// MonitorReaderOptions provides configurable options for the monitor reader implementation.
+type MonitorReaderOptions struct {
+	Bucket               string
+	Object               string
+	HeaderSize           int
+	BandwidthBytesPerSec int64
+	ClusterBandwidth     int64
+}
+
+// NewMonitoredReader returns an io.Reader that reports bandwidth details.
+func NewMonitoredReader(ctx context.Context, monitor *Monitor, reader io.Reader, opts *MonitorReaderOptions) *MonitoredReader {
 	timeNow := time.Now()
-	b := monitor.track(bucket, object, timeNow)
+	b := monitor.track(opts.Bucket, opts.Object, timeNow)
 	return &MonitoredReader{
-		bucket:            bucket,
-		object:            object,
+		opts:              opts,
 		bucketMeasurement: b,
 		reader:            reader,
 		lastStop:          timeNow,
-		headerSize:        headerSize,
-		throttle:          monitor.throttleBandwidth(ctx, bucket, bandwidthBytesPerSecond, clusterBandwidth),
+		throttle:          monitor.throttleBandwidth(ctx, opts.Bucket, opts.BandwidthBytesPerSec, opts.ClusterBandwidth),
 		monitor:           monitor,
 	}
 }

 // Read wraps the read reader
 func (m *MonitoredReader) Read(p []byte) (n int, err error) {
-	if m.closed {
-		err = io.ErrClosedPipe
+	if m.lastErr != nil {
+		err = m.lastErr
 		return
 	}

 	p = p[:m.throttle.GetLimitForBytes(int64(len(p)))]

 	n, err = m.reader.Read(p)
 	stop := time.Now()
-	update := uint64(n + m.headerSize)
+	update := uint64(n + m.opts.HeaderSize)

 	m.bucketMeasurement.incrementBytes(update)
 	m.lastStop = stop
-	unused := len(p) - (n + m.headerSize)
-	m.headerSize = 0 // Set to 0 post first read
+	unused := len(p) - (n + m.opts.HeaderSize)
+	m.opts.HeaderSize = 0 // Set to 0 post first read

 	if unused > 0 {
 		m.throttle.ReleaseUnusedBandwidth(int64(unused))
 	}
+	if err != nil {
+		m.lastErr = err
+	}
 	return
 }
-
-// Close stops tracking the io
-func (m *MonitoredReader) Close() error {
-	if m.closed {
-		return nil
-	}
-	m.closed = true
-	return m.reader.Close()
-}

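Two things change here: the long positional constructor becomes an options struct, and the wrapper now takes a plain io.Reader with a sticky lastErr instead of owning Close. A compressed, runnable sketch of both; the monitoring internals are elided and the surrounding values are invented:

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

// Options-struct constructor pattern from the diff; field names mirror
// MonitorReaderOptions, the throttle/measurement logic is left out.
type MonitorReaderOptions struct {
	Bucket               string
	Object               string
	HeaderSize           int
	BandwidthBytesPerSec int64
	ClusterBandwidth     int64
}

type MonitoredReader struct {
	opts    *MonitorReaderOptions
	reader  io.Reader
	lastErr error // sticky: once set, all further reads fail with it
}

func NewMonitoredReader(reader io.Reader, opts *MonitorReaderOptions) *MonitoredReader {
	return &MonitoredReader{opts: opts, reader: reader}
}

func (m *MonitoredReader) Read(p []byte) (int, error) {
	if m.lastErr != nil {
		return 0, m.lastErr
	}
	n, err := m.reader.Read(p)
	if err != nil {
		m.lastErr = err
	}
	return n, err
}

func main() {
	r := NewMonitoredReader(strings.NewReader("data"), &MonitorReaderOptions{
		Bucket: "source-bucket", Object: "path/to/object", // invented names
	})
	b, _ := io.ReadAll(r)
	fmt.Println(string(b))
}
```

Dropping the Close method shifts ownership of the wrapped reader's lifetime to the caller, which matches the new io.Reader signature.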
@@ -18,6 +18,7 @@ package lifecycle

 import (
 	"encoding/xml"
+	"fmt"
 	"io"
 	"strings"
 	"time"

@@ -71,7 +72,8 @@ func (lc *Lifecycle) UnmarshalXML(d *xml.Decoder, start xml.StartElement) (err e
 	switch start.Name.Local {
 	case "LifecycleConfiguration", "BucketLifecycleConfiguration":
 	default:
-		return errUnknownXMLTag
+		return xml.UnmarshalError(fmt.Sprintf("expected element type <LifecycleConfiguration>/<BucketLifecycleConfiguration> but have <%s>",
+			start.Name.Local))
 	}
 	for {
 		// Read tokens from the XML document in a stream.

@@ -93,7 +95,7 @@ func (lc *Lifecycle) UnmarshalXML(d *xml.Decoder, start xml.StartElement) (err e
 			}
 			lc.Rules = append(lc.Rules, r)
 		default:
-			return errUnknownXMLTag
+			return xml.UnmarshalError(fmt.Sprintf("expected element type <Rule> but have <%s>", se.Name.Local))
 		}
 	}
 }

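Both lifecycle hunks, and the ObjectLegalHold decoder added below, replace an opaque sentinel error with an xml.UnmarshalError that names the offending element. The validation style, as a minimal self-contained sketch with an invented Config type:

```go
package main

import (
	"encoding/xml"
	"fmt"
)

type Config struct {
	Name string `xml:"Name"`
}

// UnmarshalXML rejects unexpected root element names with a descriptive
// xml.UnmarshalError instead of a generic "unknown XML tag" error.
func (c *Config) UnmarshalXML(d *xml.Decoder, start xml.StartElement) error {
	switch start.Name.Local {
	case "Config", "LegacyConfig": // accepted root elements
	default:
		return xml.UnmarshalError(fmt.Sprintf("expected element type <Config>/<LegacyConfig> but have <%s>", start.Name.Local))
	}
	// Decode the body with a shadow type to avoid infinite recursion.
	type alias Config
	var a alias
	if err := d.DecodeElement(&a, &start); err != nil {
		return err
	}
	*c = Config(a)
	return nil
}

func main() {
	var c Config
	err := xml.Unmarshal([]byte(`<Wrong><Name>x</Name></Wrong>`), &c)
	fmt.Println(err) // expected element type <Config>/<LegacyConfig> but have <Wrong>
}
```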
@@ -489,6 +489,41 @@ type ObjectLegalHold struct {
 	Status LegalHoldStatus `xml:"Status,omitempty"`
 }

+// UnmarshalXML - decodes XML data.
+func (l *ObjectLegalHold) UnmarshalXML(d *xml.Decoder, start xml.StartElement) (err error) {
+	switch start.Name.Local {
+	case "LegalHold", "ObjectLockLegalHold":
+	default:
+		return xml.UnmarshalError(fmt.Sprintf("expected element type <LegalHold>/<ObjectLockLegalHold> but have <%s>",
+			start.Name.Local))
+	}
+	for {
+		// Read tokens from the XML document in a stream.
+		t, err := d.Token()
+		if err != nil {
+			if err == io.EOF {
+				break
+			}
+			return err
+		}
+
+		switch se := t.(type) {
+		case xml.StartElement:
+			switch se.Name.Local {
+			case "Status":
+				var st LegalHoldStatus
+				if err = d.DecodeElement(&st, &se); err != nil {
+					return err
+				}
+				l.Status = st
+			default:
+				return xml.UnmarshalError(fmt.Sprintf("expected element type <Status> but have <%s>", se.Name.Local))
+			}
+		}
+	}
+	return nil
+}
+
 // IsEmpty returns true if struct is empty
 func (l *ObjectLegalHold) IsEmpty() bool {
 	return !l.Status.Valid()

@@ -18,6 +18,7 @@ package lock

 import (
 	"encoding/xml"
 	"errors"
 	"fmt"
 	"net/http"
 	"reflect"

@@ -467,6 +468,23 @@ func TestParseObjectLegalHold(t *testing.T) {
 			expectedErr: nil,
 			expectErr:   false,
 		},
+		{
+			value:       `<?xml version="1.0" encoding="UTF-8"?><ObjectLockLegalHold xmlns="http://s3.amazonaws.com/doc/2006-03-01/"><Status>ON</Status></ObjectLockLegalHold>`,
+			expectedErr: nil,
+			expectErr:   false,
+		},
+		// invalid Status key
+		{
+			value:       `<?xml version="1.0" encoding="UTF-8"?><ObjectLockLegalHold xmlns="http://s3.amazonaws.com/doc/2006-03-01/"><MyStatus>ON</MyStatus></ObjectLockLegalHold>`,
+			expectedErr: errors.New("expected element type <Status> but have <MyStatus>"),
+			expectErr:   true,
+		},
+		// invalid XML attr
+		{
+			value:       `<?xml version="1.0" encoding="UTF-8"?><UnknownLegalHold xmlns="http://s3.amazonaws.com/doc/2006-03-01/"><Status>ON</Status></UnknownLegalHold>`,
+			expectedErr: errors.New("expected element type <LegalHold>/<ObjectLockLegalHold> but have <UnknownLegalHold>"),
+			expectErr:   true,
+		},
 		{
 			value:       `<?xml version="1.0" encoding="UTF-8"?><LegalHold xmlns="http://s3.amazonaws.com/doc/2006-03-01/"><Status>On</Status></LegalHold>`,
 			expectedErr: ErrMalformedXML,

@@ -110,10 +110,18 @@ const (

 	// AWSUsername - user friendly name, in MinIO this value is same as your user Access Key.
 	AWSUsername Key = "aws:username"
+
+	// S3SignatureVersion - identifies the version of AWS Signature that you want to support for authenticated requests.
+	S3SignatureVersion = "s3:signatureversion"
+
+	// S3AuthType - optionally use this condition key to restrict incoming requests to use a specific authentication method.
+	S3AuthType = "s3:authType"
 )

 // AllSupportedKeys - is list of all supported keys.
 var AllSupportedKeys = append([]Key{
+	S3SignatureVersion,
+	S3AuthType,
 	S3XAmzCopySource,
 	S3XAmzServerSideEncryption,
 	S3XAmzServerSideEncryptionCustomerAlgorithm,
@@ -144,6 +152,8 @@ var AllSupportedKeys = append([]Key{

 // CommonKeys - is list of all common condition keys.
 var CommonKeys = append([]Key{
+	S3SignatureVersion,
+	S3AuthType,
 	S3XAmzContentSha256,
 	S3LocationConstraint,
 	AWSReferer,

@@ -1,5 +1,5 @@
 /*
- * MinIO Cloud Storage, (C) 2019 MinIO, Inc.
+ * MinIO Cloud Storage, (C) 2019-2021 MinIO, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.

@@ -216,6 +216,7 @@ type S3Select struct {
 	statement      *sql.SelectStatement
 	progressReader *progressReader
 	recordReader   recordReader
+	close          func() error
 }

 var (

@@ -311,6 +312,7 @@ func (s3Select *S3Select) Open(getReader func(offset, length int64) (io.ReadClos
 			}
 			return err
 		}
+		s3Select.close = rc.Close
 		return nil
 	case jsonFormat:
 		rc, err := getReader(0, -1)

@@ -333,6 +335,8 @@ func (s3Select *S3Select) Open(getReader func(offset, length int64) (io.ReadClos
 		} else {
 			s3Select.recordReader = json.NewReader(s3Select.progressReader, &s3Select.Input.JSONArgs)
 		}
+
+		s3Select.close = rc.Close
 		return nil
 	case parquetFormat:
 		if !strings.EqualFold(os.Getenv("MINIO_API_SELECT_PARQUET"), "on") {

@@ -396,6 +400,12 @@ func (s3Select *S3Select) marshal(buf *bytes.Buffer, record sql.Record) error {

 // Evaluate - filters and sends records read from opened reader as per select statement to http response writer.
 func (s3Select *S3Select) Evaluate(w http.ResponseWriter) {
+	defer func() {
+		if s3Select.close != nil {
+			s3Select.close()
+		}
+	}()
+
 	getProgressFunc := s3Select.getProgress
 	if !s3Select.Progress.Enabled {
 		getProgressFunc = nil

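The new close field plus the deferred nil-checked call ensure the reader opened in Open is released whenever Evaluate finishes, even on early return. The lifetime pattern, sketched with invented types:

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

type selector struct {
	close func() error // recorded by Open; may stay nil if Open failed
}

func (s *selector) Open(rc io.ReadCloser) {
	s.close = rc.Close
}

func (s *selector) Evaluate() {
	defer func() {
		if s.close != nil {
			s.close() // release exactly once, even on panic or early return
		}
	}()
	fmt.Println("evaluating records…")
}

func main() {
	s := &selector{}
	s.Open(io.NopCloser(strings.NewReader("data")))
	s.Evaluate()
}
```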
@@ -739,6 +739,152 @@ func TestCSVQueries2(t *testing.T) {
 	}
 }

+func TestCSVQueries3(t *testing.T) {
+	input := `na.me,qty,CAST
+apple,1,true
+mango,3,false
+`
+	var testTable = []struct {
+		name       string
+		query      string
+		requestXML []byte // override request XML
+		wantResult string
+	}{
+		{
+			name:  "Select a column containing dot",
+			query: `select "na.me" from S3Object s`,
+			wantResult: `apple
+mango`,
+		},
+		{
+			name:       "Select column containing dot with table name prefix",
+			query:      `select count(S3Object."na.me") from S3Object`,
+			wantResult: `2`,
+		},
+		{
+			name:  "Select column containing dot with table alias prefix",
+			query: `select s."na.me" from S3Object as s`,
+			wantResult: `apple
+mango`,
+		},
+		{
+			name:  "Select column simplest",
+			query: `select qty from S3Object`,
+			wantResult: `1
+3`,
+		},
+		{
+			name:  "Select column with table name prefix",
+			query: `select S3Object.qty from S3Object`,
+			wantResult: `1
+3`,
+		},
+		{
+			name:  "Select column without table alias",
+			query: `select qty from S3Object s`,
+			wantResult: `1
+3`,
+		},
+		{
+			name:  "Select column with table alias",
+			query: `select s.qty from S3Object s`,
+			wantResult: `1
+3`,
+		},
+		{
+			name:  "Select reserved word column",
+			query: `select "CAST" from s3object`,
+			wantResult: `true
+false`,
+		},
+		{
+			name:  "Select reserved word column with table alias",
+			query: `select S3Object."CAST" from s3object`,
+			wantResult: `true
+false`,
+		},
+		{
+			name:  "Select reserved word column with unused table alias",
+			query: `select "CAST" from s3object s`,
+			wantResult: `true
+false`,
+		},
+		{
+			name:  "Select reserved word column with table alias",
+			query: `select s."CAST" from s3object s`,
+			wantResult: `true
+false`,
+		},
+		{
+			name:  "Select reserved word column with table alias",
+			query: `select NOT CAST(s."CAST" AS Bool) from s3object s`,
+			wantResult: `false
+true`,
+		},
+	}
+
+	defRequest := `<?xml version="1.0" encoding="UTF-8"?>
+<SelectObjectContentRequest>
+    <Expression>%s</Expression>
+    <ExpressionType>SQL</ExpressionType>
+    <InputSerialization>
+        <CompressionType>NONE</CompressionType>
+        <CSV>
+            <FileHeaderInfo>USE</FileHeaderInfo>
+            <QuoteCharacter>"</QuoteCharacter>
+        </CSV>
+    </InputSerialization>
+    <OutputSerialization>
+        <CSV/>
+    </OutputSerialization>
+    <RequestProgress>
+        <Enabled>FALSE</Enabled>
+    </RequestProgress>
+</SelectObjectContentRequest>`
+
+	for _, testCase := range testTable {
+		t.Run(testCase.name, func(t *testing.T) {
+			testReq := testCase.requestXML
+			if len(testReq) == 0 {
+				testReq = []byte(fmt.Sprintf(defRequest, testCase.query))
+			}
+			s3Select, err := NewS3Select(bytes.NewReader(testReq))
+			if err != nil {
+				t.Fatal(err)
+			}
+
+			if err = s3Select.Open(func(offset, length int64) (io.ReadCloser, error) {
+				return ioutil.NopCloser(bytes.NewBufferString(input)), nil
+			}); err != nil {
+				t.Fatal(err)
+			}
+
+			w := &testResponseWriter{}
+			s3Select.Evaluate(w)
+			s3Select.Close()
+			resp := http.Response{
+				StatusCode:    http.StatusOK,
+				Body:          ioutil.NopCloser(bytes.NewReader(w.response)),
+				ContentLength: int64(len(w.response)),
+			}
+			res, err := minio.NewSelectResults(&resp, "testbucket")
+			if err != nil {
+				t.Error(err)
+				return
+			}
+			got, err := ioutil.ReadAll(res)
+			if err != nil {
+				t.Error(err)
+				return
+			}
+			gotS := strings.TrimSpace(string(got))
+			if gotS != testCase.wantResult {
+				t.Errorf("received response does not match with expected reply.\nQuery: %s\n=====\ngot: %s\n=====\nwant: %s\n=====\n", testCase.query, gotS, testCase.wantResult)
+			}
+		})
+	}
+}
+
 func TestCSVInput(t *testing.T) {
 	var testTable = []struct {
 		requestXML []byte

@@ -63,7 +63,7 @@ func newAggVal(fn FuncName) *aggVal {
 // current row and stores the result.
 //
 // On success, it returns (nil, nil).
-func (e *FuncExpr) evalAggregationNode(r Record) error {
+func (e *FuncExpr) evalAggregationNode(r Record, tableAlias string) error {
 	// It is assumed that this function is called only when
 	// `e` is an aggregation function.

@@ -77,13 +77,13 @@ func (e *FuncExpr) evalAggregationNode(r Record) error {
 			return nil
 		}

-		val, err = e.Count.ExprArg.evalNode(r)
+		val, err = e.Count.ExprArg.evalNode(r, tableAlias)
 		if err != nil {
 			return err
 		}
 	} else {
 		// Evaluate the (only) argument
-		val, err = e.SFunc.ArgsList[0].evalNode(r)
+		val, err = e.SFunc.ArgsList[0].evalNode(r, tableAlias)
 		if err != nil {
 			return err
 		}
@@ -149,13 +149,13 @@ func (e *FuncExpr) evalAggregationNode(r Record) error {
 	return err
 }

-func (e *AliasedExpression) aggregateRow(r Record) error {
-	return e.Expression.aggregateRow(r)
+func (e *AliasedExpression) aggregateRow(r Record, tableAlias string) error {
+	return e.Expression.aggregateRow(r, tableAlias)
 }

-func (e *Expression) aggregateRow(r Record) error {
+func (e *Expression) aggregateRow(r Record, tableAlias string) error {
 	for _, ex := range e.And {
-		err := ex.aggregateRow(r)
+		err := ex.aggregateRow(r, tableAlias)
 		if err != nil {
 			return err
 		}
@@ -163,9 +163,9 @@ func (e *Expression) aggregateRow(r Record) error {
 	return nil
 }

-func (e *ListExpr) aggregateRow(r Record) error {
+func (e *ListExpr) aggregateRow(r Record, tableAlias string) error {
 	for _, ex := range e.Elements {
-		err := ex.aggregateRow(r)
+		err := ex.aggregateRow(r, tableAlias)
 		if err != nil {
 			return err
 		}
@@ -173,9 +173,9 @@ func (e *ListExpr) aggregateRow(r Record) error {
 	return nil
 }

-func (e *AndCondition) aggregateRow(r Record) error {
+func (e *AndCondition) aggregateRow(r Record, tableAlias string) error {
 	for _, ex := range e.Condition {
-		err := ex.aggregateRow(r)
+		err := ex.aggregateRow(r, tableAlias)
 		if err != nil {
 			return err
 		}
@@ -183,15 +183,15 @@ func (e *AndCondition) aggregateRow(r Record) error {
 	return nil
 }

-func (e *Condition) aggregateRow(r Record) error {
+func (e *Condition) aggregateRow(r Record, tableAlias string) error {
 	if e.Operand != nil {
-		return e.Operand.aggregateRow(r)
+		return e.Operand.aggregateRow(r, tableAlias)
 	}
-	return e.Not.aggregateRow(r)
+	return e.Not.aggregateRow(r, tableAlias)
 }

-func (e *ConditionOperand) aggregateRow(r Record) error {
-	err := e.Operand.aggregateRow(r)
+func (e *ConditionOperand) aggregateRow(r Record, tableAlias string) error {
+	err := e.Operand.aggregateRow(r, tableAlias)
 	if err != nil {
 		return err
 	}
@@ -202,38 +202,38 @@ func (e *ConditionOperand) aggregateRow(r Record) error {

 	switch {
 	case e.ConditionRHS.Compare != nil:
-		return e.ConditionRHS.Compare.Operand.aggregateRow(r)
+		return e.ConditionRHS.Compare.Operand.aggregateRow(r, tableAlias)
 	case e.ConditionRHS.Between != nil:
-		err = e.ConditionRHS.Between.Start.aggregateRow(r)
+		err = e.ConditionRHS.Between.Start.aggregateRow(r, tableAlias)
 		if err != nil {
 			return err
 		}
-		return e.ConditionRHS.Between.End.aggregateRow(r)
+		return e.ConditionRHS.Between.End.aggregateRow(r, tableAlias)
 	case e.ConditionRHS.In != nil:
 		elt := e.ConditionRHS.In.ListExpression
-		err = elt.aggregateRow(r)
+		err = elt.aggregateRow(r, tableAlias)
 		if err != nil {
 			return err
 		}
 		return nil
 	case e.ConditionRHS.Like != nil:
-		err = e.ConditionRHS.Like.Pattern.aggregateRow(r)
+		err = e.ConditionRHS.Like.Pattern.aggregateRow(r, tableAlias)
 		if err != nil {
 			return err
 		}
-		return e.ConditionRHS.Like.EscapeChar.aggregateRow(r)
+		return e.ConditionRHS.Like.EscapeChar.aggregateRow(r, tableAlias)
 	default:
 		return errInvalidASTNode
 	}
 }

-func (e *Operand) aggregateRow(r Record) error {
-	err := e.Left.aggregateRow(r)
+func (e *Operand) aggregateRow(r Record, tableAlias string) error {
+	err := e.Left.aggregateRow(r, tableAlias)
 	if err != nil {
 		return err
 	}
 	for _, rt := range e.Right {
-		err = rt.Right.aggregateRow(r)
+		err = rt.Right.aggregateRow(r, tableAlias)
 		if err != nil {
 			return err
 		}
@@ -241,13 +241,13 @@ func (e *Operand) aggregateRow(r Record) error {
 	return nil
 }

-func (e *MultOp) aggregateRow(r Record) error {
-	err := e.Left.aggregateRow(r)
+func (e *MultOp) aggregateRow(r Record, tableAlias string) error {
+	err := e.Left.aggregateRow(r, tableAlias)
 	if err != nil {
 		return err
 	}
 	for _, rt := range e.Right {
-		err = rt.Right.aggregateRow(r)
+		err = rt.Right.aggregateRow(r, tableAlias)
 		if err != nil {
 			return err
 		}
@@ -255,29 +255,29 @@ func (e *MultOp) aggregateRow(r Record) error {
 	return nil
 }

-func (e *UnaryTerm) aggregateRow(r Record) error {
+func (e *UnaryTerm) aggregateRow(r Record, tableAlias string) error {
 	if e.Negated != nil {
-		return e.Negated.Term.aggregateRow(r)
+		return e.Negated.Term.aggregateRow(r, tableAlias)
 	}
-	return e.Primary.aggregateRow(r)
+	return e.Primary.aggregateRow(r, tableAlias)
 }

-func (e *PrimaryTerm) aggregateRow(r Record) error {
+func (e *PrimaryTerm) aggregateRow(r Record, tableAlias string) error {
 	switch {
 	case e.ListExpr != nil:
-		return e.ListExpr.aggregateRow(r)
+		return e.ListExpr.aggregateRow(r, tableAlias)
 	case e.SubExpression != nil:
-		return e.SubExpression.aggregateRow(r)
+		return e.SubExpression.aggregateRow(r, tableAlias)
 	case e.FuncCall != nil:
-		return e.FuncCall.aggregateRow(r)
+		return e.FuncCall.aggregateRow(r, tableAlias)
 	}
 	return nil
 }

-func (e *FuncExpr) aggregateRow(r Record) error {
+func (e *FuncExpr) aggregateRow(r Record, tableAlias string) error {
 	switch e.getFunctionName() {
 	case aggFnAvg, aggFnSum, aggFnMax, aggFnMin, aggFnCount:
-		return e.evalAggregationNode(r)
+		return e.evalAggregationNode(r, tableAlias)
 	default:
 		// TODO: traverse arguments and call aggregateRow on
 		// them if they could be an ancestor of an

@@ -19,6 +19,7 @@ package sql
 import (
 	"errors"
 	"fmt"
+	"strings"
 )

 // Query analysis - The query is analyzed to determine if it involves
@@ -177,7 +178,7 @@ func (e *PrimaryTerm) analyze(s *Select) (result qProp) {
 	case e.JPathExpr != nil:
 		// Check if the path expression is valid
 		if len(e.JPathExpr.PathExpr) > 0 {
-			if e.JPathExpr.BaseKey.String() != s.From.As {
+			if e.JPathExpr.BaseKey.String() != s.From.As && strings.ToLower(e.JPathExpr.BaseKey.String()) != baseTableName {
 				result = qProp{err: errInvalidKeypath}
 				return
 			}

@@ -21,7 +21,6 @@ import (
 	"errors"
 	"fmt"
 	"math"
-	"strings"

 	"github.com/bcicen/jstream"
 	"github.com/minio/simdjson-go"
@@ -47,21 +46,21 @@ var (
 // of child nodes. The final result row is returned after all rows are
 // processed, and the `getAggregate` function is called.

-func (e *AliasedExpression) evalNode(r Record) (*Value, error) {
-	return e.Expression.evalNode(r)
+func (e *AliasedExpression) evalNode(r Record, tableAlias string) (*Value, error) {
+	return e.Expression.evalNode(r, tableAlias)
 }

-func (e *Expression) evalNode(r Record) (*Value, error) {
+func (e *Expression) evalNode(r Record, tableAlias string) (*Value, error) {
 	if len(e.And) == 1 {
 		// In this case, result is not required to be boolean
 		// type.
-		return e.And[0].evalNode(r)
+		return e.And[0].evalNode(r, tableAlias)
 	}

 	// Compute OR of conditions
 	result := false
 	for _, ex := range e.And {
-		res, err := ex.evalNode(r)
+		res, err := ex.evalNode(r, tableAlias)
 		if err != nil {
 			return nil, err
 		}
@@ -74,16 +73,16 @@ func (e *Expression) evalNode(r Record) (*Value, error) {
 	return FromBool(result), nil
 }

-func (e *AndCondition) evalNode(r Record) (*Value, error) {
+func (e *AndCondition) evalNode(r Record, tableAlias string) (*Value, error) {
 	if len(e.Condition) == 1 {
 		// In this case, result does not have to be boolean
-		return e.Condition[0].evalNode(r)
+		return e.Condition[0].evalNode(r, tableAlias)
 	}

 	// Compute AND of conditions
 	result := true
 	for _, ex := range e.Condition {
-		res, err := ex.evalNode(r)
+		res, err := ex.evalNode(r, tableAlias)
 		if err != nil {
 			return nil, err
 		}
@@ -96,14 +95,14 @@ func (e *AndCondition) evalNode(r Record) (*Value, error) {
 	return FromBool(result), nil
 }

-func (e *Condition) evalNode(r Record) (*Value, error) {
+func (e *Condition) evalNode(r Record, tableAlias string) (*Value, error) {
 	if e.Operand != nil {
 		// In this case, result does not have to be boolean
-		return e.Operand.evalNode(r)
+		return e.Operand.evalNode(r, tableAlias)
 	}

 	// Compute NOT of condition
-	res, err := e.Not.evalNode(r)
+	res, err := e.Not.evalNode(r, tableAlias)
 	if err != nil {
 		return nil, err
 	}
@@ -114,8 +113,8 @@ func (e *Condition) evalNode(r Record) (*Value, error) {
 	return FromBool(!b), nil
 }

-func (e *ConditionOperand) evalNode(r Record) (*Value, error) {
-	opVal, opErr := e.Operand.evalNode(r)
+func (e *ConditionOperand) evalNode(r Record, tableAlias string) (*Value, error) {
+	opVal, opErr := e.Operand.evalNode(r, tableAlias)
 	if opErr != nil || e.ConditionRHS == nil {
 		return opVal, opErr
 	}
@@ -123,7 +122,7 @@ func (e *ConditionOperand) evalNode(r Record) (*Value, error) {
 	// Need to evaluate the ConditionRHS
 	switch {
 	case e.ConditionRHS.Compare != nil:
-		cmpRight, cmpRErr := e.ConditionRHS.Compare.Operand.evalNode(r)
+		cmpRight, cmpRErr := e.ConditionRHS.Compare.Operand.evalNode(r, tableAlias)
 		if cmpRErr != nil {
 			return nil, cmpRErr
 		}
@@ -132,26 +131,26 @@ func (e *ConditionOperand) evalNode(r Record) (*Value, error) {
 		return FromBool(b), err

 	case e.ConditionRHS.Between != nil:
-		return e.ConditionRHS.Between.evalBetweenNode(r, opVal)
+		return e.ConditionRHS.Between.evalBetweenNode(r, opVal, tableAlias)

 	case e.ConditionRHS.Like != nil:
-		return e.ConditionRHS.Like.evalLikeNode(r, opVal)
+		return e.ConditionRHS.Like.evalLikeNode(r, opVal, tableAlias)

 	case e.ConditionRHS.In != nil:
-		return e.ConditionRHS.In.evalInNode(r, opVal)
+		return e.ConditionRHS.In.evalInNode(r, opVal, tableAlias)

 	default:
 		return nil, errInvalidASTNode
 	}
 }

-func (e *Between) evalBetweenNode(r Record, arg *Value) (*Value, error) {
-	stVal, stErr := e.Start.evalNode(r)
+func (e *Between) evalBetweenNode(r Record, arg *Value, tableAlias string) (*Value, error) {
+	stVal, stErr := e.Start.evalNode(r, tableAlias)
 	if stErr != nil {
 		return nil, stErr
 	}

-	endVal, endErr := e.End.evalNode(r)
+	endVal, endErr := e.End.evalNode(r, tableAlias)
 	if endErr != nil {
 		return nil, endErr
 	}
@@ -174,7 +173,7 @@ func (e *Between) evalBetweenNode(r Record, arg *Value) (*Value, error) {
 	return FromBool(result), nil
 }

-func (e *Like) evalLikeNode(r Record, arg *Value) (*Value, error) {
+func (e *Like) evalLikeNode(r Record, arg *Value, tableAlias string) (*Value, error) {
 	inferTypeAsString(arg)

 	s, ok := arg.ToString()
@@ -183,7 +182,7 @@ func (e *Like) evalLikeNode(r Record, arg *Value) (*Value, error) {
 		return nil, errLikeInvalidInputs(err)
 	}

-	pattern, err1 := e.Pattern.evalNode(r)
+	pattern, err1 := e.Pattern.evalNode(r, tableAlias)
 	if err1 != nil {
 		return nil, err1
 	}
@@ -199,7 +198,7 @@ func (e *Like) evalLikeNode(r Record, arg *Value) (*Value, error) {

 	escape := runeZero
 	if e.EscapeChar != nil {
-		escapeVal, err2 := e.EscapeChar.evalNode(r)
+		escapeVal, err2 := e.EscapeChar.evalNode(r, tableAlias)
 		if err2 != nil {
 			return nil, err2
 		}
@@ -230,14 +229,14 @@ func (e *Like) evalLikeNode(r Record, arg *Value) (*Value, error) {
 	return FromBool(matchResult), nil
 }

-func (e *ListExpr) evalNode(r Record) (*Value, error) {
+func (e *ListExpr) evalNode(r Record, tableAlias string) (*Value, error) {
 	res := make([]Value, len(e.Elements))
 	if len(e.Elements) == 1 {
 		// If length 1, treat as single value.
-		return e.Elements[0].evalNode(r)
+		return e.Elements[0].evalNode(r, tableAlias)
 	}
 	for i, elt := range e.Elements {
-		v, err := elt.evalNode(r)
+		v, err := elt.evalNode(r, tableAlias)
 		if err != nil {
 			return nil, err
 		}
@@ -248,7 +247,7 @@ func (e *ListExpr) evalNode(r Record) (*Value, error) {

 const floatCmpTolerance = 0.000001

-func (e *In) evalInNode(r Record, lhs *Value) (*Value, error) {
+func (e *In) evalInNode(r Record, lhs *Value, tableAlias string) (*Value, error) {
 	// Compare two values in terms of in-ness.
 	var cmp func(a, b Value) bool
 	cmp = func(a, b Value) bool {
@@ -283,7 +282,7 @@ func (e *In) evalInNode(r Record, lhs *Value) (*Value, error) {

 	var rhs Value
 	if elt := e.ListExpression; elt != nil {
-		eltVal, err := elt.evalNode(r)
+		eltVal, err := elt.evalNode(r, tableAlias)
 		if err != nil {
 			return nil, err
 		}
@@ -304,8 +303,8 @@ func (e *In) evalInNode(r Record, lhs *Value) (*Value, error) {
 	return FromBool(cmp(rhs, *lhs)), nil
 }

-func (e *Operand) evalNode(r Record) (*Value, error) {
-	lval, lerr := e.Left.evalNode(r)
+func (e *Operand) evalNode(r Record, tableAlias string) (*Value, error) {
+	lval, lerr := e.Left.evalNode(r, tableAlias)
 	if lerr != nil || len(e.Right) == 0 {
 		return lval, lerr
 	}
@@ -315,7 +314,7 @@ func (e *Operand) evalNode(r Record) (*Value, error) {
 	// symbols.
 	for _, rightTerm := range e.Right {
 		op := rightTerm.Op
-		rval, rerr := rightTerm.Right.evalNode(r)
+		rval, rerr := rightTerm.Right.evalNode(r, tableAlias)
 		if rerr != nil {
 			return nil, rerr
 		}
@@ -327,8 +326,8 @@ func (e *Operand) evalNode(r Record) (*Value, error) {
 	return lval, nil
 }

-func (e *MultOp) evalNode(r Record) (*Value, error) {
-	lval, lerr := e.Left.evalNode(r)
+func (e *MultOp) evalNode(r Record, tableAlias string) (*Value, error) {
+	lval, lerr := e.Left.evalNode(r, tableAlias)
 	if lerr != nil || len(e.Right) == 0 {
 		return lval, lerr
 	}
@@ -337,7 +336,7 @@ func (e *MultOp) evalNode(r Record) (*Value, error) {
 	// AST node is for terms separated by *, / or % symbols.
 	for _, rightTerm := range e.Right {
 		op := rightTerm.Op
-		rval, rerr := rightTerm.Right.evalNode(r)
+		rval, rerr := rightTerm.Right.evalNode(r, tableAlias)
 		if rerr != nil {
 			return nil, rerr
 		}
@@ -350,12 +349,12 @@ func (e *MultOp) evalNode(r Record) (*Value, error) {
 	return lval, nil
 }

-func (e *UnaryTerm) evalNode(r Record) (*Value, error) {
+func (e *UnaryTerm) evalNode(r Record, tableAlias string) (*Value, error) {
 	if e.Negated == nil {
-		return e.Primary.evalNode(r)
+		return e.Primary.evalNode(r, tableAlias)
 	}

-	v, err := e.Negated.Term.evalNode(r)
+	v, err := e.Negated.Term.evalNode(r, tableAlias)
 	if err != nil {
 		return nil, err
 	}
@@ -368,19 +367,15 @@ func (e *UnaryTerm) evalNode(r Record) (*Value, error) {
 	return nil, errArithMismatchedTypes
 }

-func (e *JSONPath) evalNode(r Record) (*Value, error) {
-	// Strip the table name from the keypath.
-	keypath := e.String()
-	if strings.Contains(keypath, ".") {
-		ps := strings.SplitN(keypath, ".", 2)
-		if len(ps) == 2 {
-			keypath = ps[1]
-		}
+func (e *JSONPath) evalNode(r Record, tableAlias string) (*Value, error) {
+	alias := tableAlias
+	if tableAlias == "" {
+		alias = baseTableName
 	}
+	pathExpr := e.StripTableAlias(alias)
 	_, rawVal := r.Raw()
 	switch rowVal := rawVal.(type) {
 	case jstream.KVS, simdjson.Object:
-		pathExpr := e.PathExpr
 		if len(pathExpr) == 0 {
 			pathExpr = []*JSONPathElement{{Key: &ObjectKey{ID: e.BaseKey}}}
 		}
@@ -392,7 +387,10 @@ func (e *JSONPath) evalNode(r Record) (*Value, error) {

 		return jsonToValue(result)
 	default:
-		return r.Get(keypath)
+		if pathExpr[len(pathExpr)-1].Key == nil {
+			return nil, errInvalidKeypath
+		}
+		return r.Get(pathExpr[len(pathExpr)-1].Key.keyString())
 	}
 }
@@ -447,28 +445,28 @@ func jsonToValue(result interface{}) (*Value, error) {
 	return nil, fmt.Errorf("Unhandled value type: %T", result)
 }

-func (e *PrimaryTerm) evalNode(r Record) (res *Value, err error) {
+func (e *PrimaryTerm) evalNode(r Record, tableAlias string) (res *Value, err error) {
 	switch {
 	case e.Value != nil:
 		return e.Value.evalNode(r)
 	case e.JPathExpr != nil:
-		return e.JPathExpr.evalNode(r)
+		return e.JPathExpr.evalNode(r, tableAlias)
 	case e.ListExpr != nil:
-		return e.ListExpr.evalNode(r)
+		return e.ListExpr.evalNode(r, tableAlias)
 	case e.SubExpression != nil:
-		return e.SubExpression.evalNode(r)
+		return e.SubExpression.evalNode(r, tableAlias)
 	case e.FuncCall != nil:
-		return e.FuncCall.evalNode(r)
+		return e.FuncCall.evalNode(r, tableAlias)
 	}
 	return nil, errInvalidASTNode
 }

-func (e *FuncExpr) evalNode(r Record) (res *Value, err error) {
+func (e *FuncExpr) evalNode(r Record, tableAlias string) (res *Value, err error) {
 	switch e.getFunctionName() {
 	case aggFnCount, aggFnAvg, aggFnMax, aggFnMin, aggFnSum:
 		return e.getAggregate()
 	default:
-		return e.evalSQLFnNode(r)
+		return e.evalSQLFnNode(r, tableAlias)
 	}
 }

@@ -84,35 +84,35 @@ func (e *FuncExpr) getFunctionName() FuncName {

 // evalSQLFnNode assumes that the FuncExpr is not an aggregation
 // function.
-func (e *FuncExpr) evalSQLFnNode(r Record) (res *Value, err error) {
+func (e *FuncExpr) evalSQLFnNode(r Record, tableAlias string) (res *Value, err error) {
 	// Handle functions that have phrase arguments
 	switch e.getFunctionName() {
 	case sqlFnCast:
 		expr := e.Cast.Expr
-		res, err = expr.castTo(r, strings.ToUpper(e.Cast.CastType))
+		res, err = expr.castTo(r, strings.ToUpper(e.Cast.CastType), tableAlias)
 		return

 	case sqlFnSubstring:
-		return handleSQLSubstring(r, e.Substring)
+		return handleSQLSubstring(r, e.Substring, tableAlias)

 	case sqlFnExtract:
-		return handleSQLExtract(r, e.Extract)
+		return handleSQLExtract(r, e.Extract, tableAlias)

 	case sqlFnTrim:
-		return handleSQLTrim(r, e.Trim)
+		return handleSQLTrim(r, e.Trim, tableAlias)

 	case sqlFnDateAdd:
-		return handleDateAdd(r, e.DateAdd)
+		return handleDateAdd(r, e.DateAdd, tableAlias)

 	case sqlFnDateDiff:
-		return handleDateDiff(r, e.DateDiff)
+		return handleDateDiff(r, e.DateDiff, tableAlias)

 	}

 	// For all simple argument functions, we evaluate the arguments here
 	argVals := make([]*Value, len(e.SFunc.ArgsList))
 	for i, arg := range e.SFunc.ArgsList {
-		argVals[i], err = arg.evalNode(r)
+		argVals[i], err = arg.evalNode(r, tableAlias)
 		if err != nil {
 			return nil, err
 		}
@@ -219,8 +219,8 @@ func upperCase(v *Value) (*Value, error) {
 	return FromString(strings.ToUpper(s)), nil
 }

-func handleDateAdd(r Record, d *DateAddFunc) (*Value, error) {
-	q, err := d.Quantity.evalNode(r)
+func handleDateAdd(r Record, d *DateAddFunc, tableAlias string) (*Value, error) {
+	q, err := d.Quantity.evalNode(r, tableAlias)
 	if err != nil {
 		return nil, err
 	}
@@ -230,7 +230,7 @@ func handleDateAdd(r Record, d *DateAddFunc) (*Value, error) {
 		return nil, fmt.Errorf("QUANTITY must be a numeric argument to %s()", sqlFnDateAdd)
 	}

-	ts, err := d.Timestamp.evalNode(r)
+	ts, err := d.Timestamp.evalNode(r, tableAlias)
 	if err != nil {
 		return nil, err
 	}
@@ -245,8 +245,8 @@ func handleDateAdd(r Record, d *DateAddFunc) (*Value, error) {
 	return dateAdd(strings.ToUpper(d.DatePart), qty, t)
 }

-func handleDateDiff(r Record, d *DateDiffFunc) (*Value, error) {
-	tval1, err := d.Timestamp1.evalNode(r)
+func handleDateDiff(r Record, d *DateDiffFunc, tableAlias string) (*Value, error) {
+	tval1, err := d.Timestamp1.evalNode(r, tableAlias)
 	if err != nil {
 		return nil, err
 	}
@@ -258,7 +258,7 @@ func handleDateDiff(r Record, d *DateDiffFunc) (*Value, error) {
 		return nil, fmt.Errorf("%s() expects two timestamp arguments", sqlFnDateDiff)
 	}

-	tval2, err := d.Timestamp2.evalNode(r)
+	tval2, err := d.Timestamp2.evalNode(r, tableAlias)
 	if err != nil {
 		return nil, err
 	}
@@ -277,12 +277,12 @@ func handleUTCNow() (*Value, error) {
 	return FromTimestamp(time.Now().UTC()), nil
 }

-func handleSQLSubstring(r Record, e *SubstringFunc) (val *Value, err error) {
+func handleSQLSubstring(r Record, e *SubstringFunc, tableAlias string) (val *Value, err error) {
 	// Both forms `SUBSTRING('abc' FROM 2 FOR 1)` and
 	// SUBSTRING('abc', 2, 1) are supported.

 	// Evaluate the string argument
-	v1, err := e.Expr.evalNode(r)
+	v1, err := e.Expr.evalNode(r, tableAlias)
 	if err != nil {
 		return nil, err
 	}
@@ -301,7 +301,7 @@ func handleSQLSubstring(r Record, e *SubstringFunc) (val *Value, err error) {
 	}

 	// Evaluate the FROM argument
-	v2, err := arg2.evalNode(r)
+	v2, err := arg2.evalNode(r, tableAlias)
 	if err != nil {
 		return nil, err
 	}
@@ -315,7 +315,7 @@ func handleSQLSubstring(r Record, e *SubstringFunc) (val *Value, err error) {
 	length := -1
 	// Evaluate the optional FOR argument
 	if arg3 != nil {
-		v3, err := arg3.evalNode(r)
+		v3, err := arg3.evalNode(r, tableAlias)
 		if err != nil {
 			return nil, err
 		}
@@ -336,11 +336,11 @@ func handleSQLSubstring(r Record, e *SubstringFunc) (val *Value, err error) {
 	return FromString(res), err
 }

-func handleSQLTrim(r Record, e *TrimFunc) (res *Value, err error) {
+func handleSQLTrim(r Record, e *TrimFunc, tableAlias string) (res *Value, err error) {
 	chars := ""
 	ok := false
 	if e.TrimChars != nil {
-		charsV, cerr := e.TrimChars.evalNode(r)
+		charsV, cerr := e.TrimChars.evalNode(r, tableAlias)
 		if cerr != nil {
 			return nil, cerr
 		}
@@ -351,7 +351,7 @@ func handleSQLTrim(r Record, e *TrimFunc) (res *Value, err error) {
 		}
 	}

-	fromV, ferr := e.TrimFrom.evalNode(r)
+	fromV, ferr := e.TrimFrom.evalNode(r, tableAlias)
 	if ferr != nil {
 		return nil, ferr
 	}
@@ -368,8 +368,8 @@ func handleSQLTrim(r Record, e *TrimFunc) (res *Value, err error) {
 	return FromString(result), nil
 }

-func handleSQLExtract(r Record, e *ExtractFunc) (res *Value, err error) {
-	timeVal, verr := e.From.evalNode(r)
+func handleSQLExtract(r Record, e *ExtractFunc, tableAlias string) (res *Value, err error) {
+	timeVal, verr := e.From.evalNode(r, tableAlias)
 	if verr != nil {
 		return nil, verr
 	}
@@ -406,8 +406,8 @@ const (
 	castTimestamp = "TIMESTAMP"
 )

-func (e *Expression) castTo(r Record, castType string) (res *Value, err error) {
-	v, err := e.evalNode(r)
+func (e *Expression) castTo(r Record, castType string, tableAlias string) (res *Value, err error) {
+	v, err := e.evalNode(r, tableAlias)
 	if err != nil {
 		return nil, err
 	}

@@ -119,7 +119,9 @@ type JSONPath struct {
 	PathExpr []*JSONPathElement `parser:"(@@)*"`

 	// Cached values:
-	pathString string
+	pathString         string
+	strippedTableAlias string
+	strippedPathExpr   []*JSONPathElement
 }

 // AliasedExpression is an expression that can be optionally named

@@ -46,6 +46,9 @@ type SelectStatement struct {

 	// Count of rows that have been output.
 	outputCount int64
+
+	// Table alias
+	tableAlias string
 }

 // ParseSelectStatement - parses a select query from the given string
@@ -107,6 +110,9 @@ func ParseSelectStatement(s string) (stmt SelectStatement, err error) {
 	if err != nil {
 		err = errQueryAnalysisFailure(err)
 	}
+
+	// Set table alias
+	stmt.tableAlias = selectAST.From.As
 	return
 }
@@ -226,7 +232,7 @@ func (e *SelectStatement) IsAggregated() bool {
 // records have been processed. Applies only to aggregation queries.
 func (e *SelectStatement) AggregateResult(output Record) error {
 	for i, expr := range e.selectAST.Expression.Expressions {
-		v, err := expr.evalNode(nil)
+		v, err := expr.evalNode(nil, e.tableAlias)
 		if err != nil {
 			return err
 		}
@@ -246,7 +252,7 @@ func (e *SelectStatement) isPassingWhereClause(input Record) (bool, error) {
 	if e.selectAST.Where == nil {
 		return true, nil
 	}
-	value, err := e.selectAST.Where.evalNode(input)
+	value, err := e.selectAST.Where.evalNode(input, e.tableAlias)
 	if err != nil {
 		return false, err
 	}
@@ -272,7 +278,7 @@ func (e *SelectStatement) AggregateRow(input Record) error {
 	}

 	for _, expr := range e.selectAST.Expression.Expressions {
-		err := expr.aggregateRow(input)
+		err := expr.aggregateRow(input, e.tableAlias)
 		if err != nil {
 			return err
 		}
@@ -302,7 +308,7 @@ func (e *SelectStatement) Eval(input, output Record) (Record, error) {
 	}

 	for i, expr := range e.selectAST.Expression.Expressions {
-		v, err := expr.evalNode(input)
+		v, err := expr.evalNode(input, e.tableAlias)
 		if err != nil {
 			return nil, err
 		}

@@ -36,6 +36,27 @@ func (e *JSONPath) String() string {
 	return e.pathString
 }

+// StripTableAlias removes a table alias from the path. The result is also
+// cached for repeated lookups during SQL query evaluation.
+func (e *JSONPath) StripTableAlias(tableAlias string) []*JSONPathElement {
+	if e.strippedTableAlias == tableAlias {
+		return e.strippedPathExpr
+	}
+
+	hasTableAlias := e.BaseKey.String() == tableAlias || strings.ToLower(e.BaseKey.String()) == baseTableName
+	var pathExpr []*JSONPathElement
+	if hasTableAlias {
+		pathExpr = e.PathExpr
+	} else {
+		pathExpr = make([]*JSONPathElement, len(e.PathExpr)+1)
+		pathExpr[0] = &JSONPathElement{Key: &ObjectKey{ID: e.BaseKey}}
+		copy(pathExpr[1:], e.PathExpr)
+	}
+	e.strippedTableAlias = tableAlias
+	e.strippedPathExpr = pathExpr
+	return e.strippedPathExpr
+}
+
 func (e *JSONPathElement) String() string {
 	switch {
 	case e.Key != nil:

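StripTableAlias runs once per expression per record, so it memoizes its result keyed on the alias it last saw. The caching idiom in isolation, sketched with an invented string-based path type:

```go
package main

import (
	"fmt"
	"strings"
)

type path struct {
	raw string

	// cached values:
	strippedAlias string
	strippedPath  []string
}

// stripAlias drops a leading table alias (e.g. "s" in s.qty) and caches the
// result so repeated per-row evaluation of the same path does the work once.
func (p *path) stripAlias(alias string) []string {
	if p.strippedAlias == alias && p.strippedPath != nil {
		return p.strippedPath // hit: same alias as the previous call
	}
	parts := strings.Split(p.raw, ".")
	if len(parts) > 1 && parts[0] == alias {
		parts = parts[1:]
	}
	p.strippedAlias = alias
	p.strippedPath = parts
	return parts
}

func main() {
	p := &path{raw: "s.qty"}
	fmt.Println(p.stripAlias("s")) // [qty], computed
	fmt.Println(p.stripAlias("s")) // [qty], served from the cache
}
```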