Compare commits
20 commits
master
...
RELEASE.20
Author | SHA1 | Date | |
---|---|---|---|
e16e75ce30 | |||
ab5bc50847 | |||
59fc075b8a | |||
c896954fea | |||
1221feca72 | |||
11005331cd | |||
cf89776b40 | |||
c60f17c89b | |||
b97131446c | |||
74ad69f31e | |||
4a2307612d | |||
6e90939ecf | |||
89ab3dfd40 | |||
795a24ea71 | |||
3ae18ce672 | |||
f179fc0e37 | |||
2fc63ae45b | |||
a9451eaca8 | |||
80d4fc863f | |||
8c654a725e |
|
@ -11,7 +11,7 @@ RUN \
|
|||
git clone https://github.com/minio/minio && cd minio && \
|
||||
git checkout master && go install -v -ldflags "$(go run buildscripts/gen-ldflags.go)"
|
||||
|
||||
FROM registry.access.redhat.com/ubi8/ubi-minimal:8.3
|
||||
FROM registry.access.redhat.com/ubi8/ubi-minimal:8.4
|
||||
|
||||
ENV MINIO_ACCESS_KEY_FILE=access_key \
|
||||
MINIO_SECRET_KEY_FILE=secret_key \
|
||||
|
|
|
@ -11,7 +11,7 @@ RUN \
|
|||
git clone https://github.com/minio/minio && cd minio && \
|
||||
git checkout master && go install -v -ldflags "$(go run buildscripts/gen-ldflags.go)"
|
||||
|
||||
FROM registry.access.redhat.com/ubi8/ubi-minimal:8.3
|
||||
FROM registry.access.redhat.com/ubi8/ubi-minimal:8.4
|
||||
|
||||
ARG TARGETARCH
|
||||
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
FROM registry.access.redhat.com/ubi8/ubi-minimal:8.3
|
||||
FROM registry.access.redhat.com/ubi8/ubi-minimal:8.4
|
||||
|
||||
ARG TARGETARCH
|
||||
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
FROM registry.access.redhat.com/ubi8/ubi-minimal:8.3
|
||||
FROM registry.access.redhat.com/ubi8/ubi-minimal:8.4
|
||||
|
||||
ARG TARGETARCH
|
||||
|
||||
|
|
|
@ -18,6 +18,7 @@ package cmd
|
|||
|
||||
import (
|
||||
"context"
|
||||
crand "crypto/rand"
|
||||
"crypto/subtle"
|
||||
"crypto/tls"
|
||||
"encoding/json"
|
||||
|
@ -36,6 +37,7 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/klauspost/compress/zip"
|
||||
"github.com/minio/minio/cmd/config"
|
||||
"github.com/minio/minio/cmd/crypto"
|
||||
xhttp "github.com/minio/minio/cmd/http"
|
||||
|
@ -49,7 +51,8 @@ import (
|
|||
"github.com/minio/minio/pkg/kms"
|
||||
"github.com/minio/minio/pkg/madmin"
|
||||
xnet "github.com/minio/minio/pkg/net"
|
||||
trace "github.com/minio/minio/pkg/trace"
|
||||
"github.com/minio/minio/pkg/trace"
|
||||
"github.com/secure-io/sio-go"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -1856,3 +1859,104 @@ func checkConnection(endpointStr string, timeout time.Duration) error {
|
|||
defer xhttp.DrainBody(resp.Body)
|
||||
return nil
|
||||
}
|
||||
|
||||
// getRawDataer provides an interface for getting raw FS files.
|
||||
type getRawDataer interface {
|
||||
GetRawData(ctx context.Context, volume, file string, fn func(r io.Reader, host string, disk string, filename string, size int64, modtime time.Time) error) error
|
||||
}
|
||||
|
||||
// InspectDataHandler - GET /minio/admin/v3/inspect-data
|
||||
// ----------
|
||||
// Download file from all nodes in a zip format
|
||||
func (a adminAPIHandlers) InspectDataHandler(w http.ResponseWriter, r *http.Request) {
|
||||
ctx := newContext(r, w, "InspectData")
|
||||
|
||||
// Validate request signature.
|
||||
_, adminAPIErr := checkAdminRequestAuth(ctx, r, iampolicy.InspectDataAction, "")
|
||||
if adminAPIErr != ErrNone {
|
||||
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(adminAPIErr), r.URL)
|
||||
return
|
||||
}
|
||||
defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))
|
||||
|
||||
o, ok := newObjectLayerFn().(getRawDataer)
|
||||
if !ok {
|
||||
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL)
|
||||
return
|
||||
}
|
||||
|
||||
volume := r.URL.Query().Get("volume")
|
||||
file := r.URL.Query().Get("file")
|
||||
if len(volume) == 0 {
|
||||
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrInvalidBucketName), r.URL)
|
||||
return
|
||||
}
|
||||
if len(file) == 0 {
|
||||
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrInvalidRequest), r.URL)
|
||||
return
|
||||
}
|
||||
|
||||
var key [32]byte
|
||||
// MUST use crypto/rand
|
||||
n, err := crand.Read(key[:])
|
||||
if err != nil || n != len(key) {
|
||||
logger.LogIf(ctx, err)
|
||||
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrInternalError), r.URL)
|
||||
return
|
||||
}
|
||||
stream, err := sio.AES_256_GCM.Stream(key[:])
|
||||
if err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrInternalError), r.URL)
|
||||
return
|
||||
}
|
||||
// Zero nonce, we only use each key once, and 32 bytes is plenty.
|
||||
nonce := make([]byte, stream.NonceSize())
|
||||
encw := stream.EncryptWriter(w, nonce, nil)
|
||||
|
||||
defer encw.Close()
|
||||
|
||||
// Write a version for making *incompatible* changes.
|
||||
// The AdminClient will reject any version it does not know.
|
||||
w.Write([]byte{1})
|
||||
|
||||
// Write key first (without encryption)
|
||||
_, err = w.Write(key[:])
|
||||
if err != nil {
|
||||
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrInternalError), r.URL)
|
||||
return
|
||||
}
|
||||
|
||||
// Initialize a zip writer which will provide a zipped content
|
||||
// of profiling data of all nodes
|
||||
zipWriter := zip.NewWriter(encw)
|
||||
defer zipWriter.Close()
|
||||
|
||||
err = o.GetRawData(ctx, volume, file, func(r io.Reader, host, disk, filename string, size int64, modtime time.Time) error {
|
||||
// Prefix host+disk
|
||||
filename = path.Join(host, disk, filename)
|
||||
header, zerr := zip.FileInfoHeader(dummyFileInfo{
|
||||
name: filename,
|
||||
size: size,
|
||||
mode: 0600,
|
||||
modTime: modtime,
|
||||
isDir: false,
|
||||
sys: nil,
|
||||
})
|
||||
if zerr != nil {
|
||||
logger.LogIf(ctx, zerr)
|
||||
return nil
|
||||
}
|
||||
header.Method = zip.Deflate
|
||||
zwriter, zerr := zipWriter.CreateHeader(header)
|
||||
if zerr != nil {
|
||||
logger.LogIf(ctx, zerr)
|
||||
return nil
|
||||
}
|
||||
if _, err = io.Copy(zwriter, r); err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
logger.LogIf(ctx, err)
|
||||
}
|
||||
|
|
|
@ -56,6 +56,7 @@ func registerAdminRouter(router *mux.Router, enableConfigOps, enableIAMOps bool)
|
|||
|
||||
// Info operations
|
||||
adminRouter.Methods(http.MethodGet).Path(adminVersion + "/info").HandlerFunc(httpTraceAll(adminAPI.ServerInfoHandler))
|
||||
adminRouter.Methods(http.MethodGet).Path(adminVersion+"/inspect-data").HandlerFunc(httpTraceHdrs(adminAPI.InspectDataHandler)).Queries("volume", "{volume:.*}", "file", "{file:.*}")
|
||||
|
||||
// StorageInfo operations
|
||||
adminRouter.Methods(http.MethodGet).Path(adminVersion + "/storageinfo").HandlerFunc(httpTraceAll(adminAPI.StorageInfoHandler))
|
||||
|
|
|
@ -440,7 +440,7 @@ var errorCodes = errorCodeMap{
|
|||
},
|
||||
ErrInvalidMaxParts: {
|
||||
Code: "InvalidArgument",
|
||||
Description: "Argument max-parts must be an integer between 0 and 2147483647",
|
||||
Description: "Part number must be an integer between 1 and 10000, inclusive",
|
||||
HTTPStatusCode: http.StatusBadRequest,
|
||||
},
|
||||
ErrInvalidPartNumberMarker: {
|
||||
|
|
|
@ -1287,6 +1287,12 @@ func (api objectAPIHandlers) PutBucketObjectLockConfigHandler(w http.ResponseWri
|
|||
return
|
||||
}
|
||||
|
||||
// Before proceeding validate if bucket exists.
|
||||
if _, err := objectAPI.GetBucketInfo(ctx, bucket); err != nil {
|
||||
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
|
||||
return
|
||||
}
|
||||
|
||||
config, err := objectlock.ParseObjectLockConfig(r.Body)
|
||||
if err != nil {
|
||||
apiErr := errorCodes.ToAPIErr(ErrMalformedXML)
|
||||
|
@ -1341,6 +1347,12 @@ func (api objectAPIHandlers) GetBucketObjectLockConfigHandler(w http.ResponseWri
|
|||
return
|
||||
}
|
||||
|
||||
// Before proceeding validate if bucket exists.
|
||||
if _, err := objectAPI.GetBucketInfo(ctx, bucket); err != nil {
|
||||
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
|
||||
return
|
||||
}
|
||||
|
||||
config, err := globalBucketMetadataSys.GetObjectLockConfig(bucket)
|
||||
if err != nil {
|
||||
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
|
||||
|
@ -1378,6 +1390,12 @@ func (api objectAPIHandlers) PutBucketTaggingHandler(w http.ResponseWriter, r *h
|
|||
return
|
||||
}
|
||||
|
||||
// Before proceeding validate if bucket exists.
|
||||
if _, err := objectAPI.GetBucketInfo(ctx, bucket); err != nil {
|
||||
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
|
||||
return
|
||||
}
|
||||
|
||||
tags, err := tags.ParseBucketXML(io.LimitReader(r.Body, r.ContentLength))
|
||||
if err != nil {
|
||||
apiErr := errorCodes.ToAPIErr(ErrMalformedXML)
|
||||
|
@ -1423,6 +1441,12 @@ func (api objectAPIHandlers) GetBucketTaggingHandler(w http.ResponseWriter, r *h
|
|||
return
|
||||
}
|
||||
|
||||
// Before proceeding validate if bucket exists.
|
||||
if _, err := objectAPI.GetBucketInfo(ctx, bucket); err != nil {
|
||||
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
|
||||
return
|
||||
}
|
||||
|
||||
config, err := globalBucketMetadataSys.GetTaggingConfig(bucket)
|
||||
if err != nil {
|
||||
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
|
||||
|
@ -1460,6 +1484,12 @@ func (api objectAPIHandlers) DeleteBucketTaggingHandler(w http.ResponseWriter, r
|
|||
return
|
||||
}
|
||||
|
||||
// Before proceeding validate if bucket exists.
|
||||
if _, err := objectAPI.GetBucketInfo(ctx, bucket); err != nil {
|
||||
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
|
||||
return
|
||||
}
|
||||
|
||||
if err := globalBucketMetadataSys.Update(bucket, bucketTaggingConfig, nil); err != nil {
|
||||
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
|
||||
return
|
||||
|
|
|
@ -19,6 +19,7 @@ package cmd
|
|||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"reflect"
|
||||
"strings"
|
||||
|
@ -35,6 +36,7 @@ import (
|
|||
"github.com/minio/minio/pkg/bucket/bandwidth"
|
||||
"github.com/minio/minio/pkg/bucket/replication"
|
||||
"github.com/minio/minio/pkg/event"
|
||||
"github.com/minio/minio/pkg/hash"
|
||||
iampolicy "github.com/minio/minio/pkg/iam/policy"
|
||||
"github.com/minio/minio/pkg/madmin"
|
||||
)
|
||||
|
@ -717,11 +719,19 @@ func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI Obje
|
|||
}
|
||||
|
||||
r := bandwidth.NewMonitoredReader(ctx, globalBucketMonitor, gr, opts)
|
||||
if objInfo.isMultipart() {
|
||||
if err := replicateObjectWithMultipart(ctx, c, dest.Bucket, object,
|
||||
r, objInfo, putOpts); err != nil {
|
||||
replicationStatus = replication.Failed
|
||||
logger.LogIf(ctx, fmt.Errorf("Unable to replicate for object %s/%s(%s): %s", bucket, objInfo.Name, objInfo.VersionID, err))
|
||||
}
|
||||
} else {
|
||||
if _, err = c.PutObject(ctx, dest.Bucket, object, r, size, "", "", putOpts); err != nil {
|
||||
replicationStatus = replication.Failed
|
||||
logger.LogIf(ctx, fmt.Errorf("Unable to replicate for object %s/%s(%s): %w", bucket, objInfo.Name, objInfo.VersionID, err))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
prevReplStatus := objInfo.ReplicationStatus
|
||||
objInfo.UserDefined[xhttp.AmzBucketReplicationStatus] = replicationStatus.String()
|
||||
|
@ -779,6 +789,55 @@ func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI Obje
|
|||
}
|
||||
}
|
||||
|
||||
func replicateObjectWithMultipart(ctx context.Context, c *miniogo.Core, bucket, object string, r io.Reader, objInfo ObjectInfo, opts miniogo.PutObjectOptions) (err error) {
|
||||
var uploadedParts []miniogo.CompletePart
|
||||
uploadID, err := c.NewMultipartUpload(context.Background(), bucket, object, opts)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if err != nil {
|
||||
// block and abort remote upload upon failure.
|
||||
if aerr := c.AbortMultipartUpload(ctx, bucket, object, uploadID); aerr != nil {
|
||||
aerr = fmt.Errorf("Unable to cleanup failed multipart replication %s on remote %s/%s: %w", uploadID, bucket, object, aerr)
|
||||
logger.LogIf(ctx, aerr)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
var (
|
||||
hr *hash.Reader
|
||||
pInfo miniogo.ObjectPart
|
||||
)
|
||||
|
||||
for _, partInfo := range objInfo.Parts {
|
||||
hr, err = hash.NewReader(r, partInfo.ActualSize, "", "", partInfo.ActualSize)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
pInfo, err = c.PutObjectPart(ctx, bucket, object, uploadID, partInfo.Number, hr, partInfo.ActualSize, "", "", opts.ServerSideEncryption)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if pInfo.Size != partInfo.ActualSize {
|
||||
return fmt.Errorf("Part size mismatch: got %d, want %d", pInfo.Size, partInfo.ActualSize)
|
||||
}
|
||||
uploadedParts = append(uploadedParts, miniogo.CompletePart{
|
||||
PartNumber: pInfo.PartNumber,
|
||||
ETag: pInfo.ETag,
|
||||
})
|
||||
}
|
||||
|
||||
_, err = c.CompleteMultipartUpload(ctx, bucket, object, uploadID, uploadedParts, miniogo.PutObjectOptions{
|
||||
Internal: miniogo.AdvancedPutOptions{
|
||||
SourceMTime: objInfo.ModTime,
|
||||
// always set this to distinguish between `mc mirror` replication and serverside
|
||||
ReplicationRequest: true,
|
||||
}})
|
||||
return err
|
||||
}
|
||||
|
||||
// filterReplicationStatusMetadata filters replication status metadata for COPY
|
||||
func filterReplicationStatusMetadata(metadata map[string]string) map[string]string {
|
||||
// Copy on write
|
||||
|
|
|
@ -89,16 +89,19 @@ func (c *OperatorDNS) Put(bucket string) error {
|
|||
if err = c.addAuthHeader(req); err != nil {
|
||||
return newError(bucket, err)
|
||||
}
|
||||
|
||||
resp, err := c.httpClient.Do(req)
|
||||
if err != nil {
|
||||
if derr := c.Delete(bucket); derr != nil {
|
||||
return newError(bucket, derr)
|
||||
}
|
||||
return err
|
||||
}
|
||||
defer xhttp.DrainBody(resp.Body)
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
var errorStringBuilder strings.Builder
|
||||
io.Copy(&errorStringBuilder, io.LimitReader(resp.Body, resp.ContentLength))
|
||||
xhttp.DrainBody(resp.Body)
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
errorString := errorStringBuilder.String()
|
||||
switch resp.StatusCode {
|
||||
case http.StatusConflict:
|
||||
|
|
|
@ -528,12 +528,17 @@ type objectIO interface {
|
|||
// Only backend errors are returned as errors.
|
||||
// If the object is not found or unable to deserialize d is cleared and nil error is returned.
|
||||
func (d *dataUsageCache) load(ctx context.Context, store objectIO, name string) error {
|
||||
// Abandon if more than 5 minutes, so we don't hold up scanner.
|
||||
ctx, cancel := context.WithTimeout(ctx, 5*time.Minute)
|
||||
defer cancel()
|
||||
|
||||
r, err := store.GetObjectNInfo(ctx, dataUsageBucket, name, nil, http.Header{}, readLock, ObjectOptions{})
|
||||
if err != nil {
|
||||
switch err.(type) {
|
||||
case ObjectNotFound:
|
||||
case BucketNotFound:
|
||||
case InsufficientReadQuorum:
|
||||
case StorageErr:
|
||||
default:
|
||||
return toObjectErr(err, dataUsageBucket, name)
|
||||
}
|
||||
|
@ -561,6 +566,10 @@ func (d *dataUsageCache) save(ctx context.Context, store objectIO, name string)
|
|||
return err
|
||||
}
|
||||
|
||||
// Abandon if more than 5 minutes, so we don't hold up scanner.
|
||||
ctx, cancel := context.WithTimeout(ctx, 5*time.Minute)
|
||||
defer cancel()
|
||||
|
||||
_, err = store.PutObject(ctx,
|
||||
dataUsageBucket,
|
||||
name,
|
||||
|
|
|
@ -65,6 +65,29 @@ const (
|
|||
|
||||
)
|
||||
|
||||
// isMultipart returns true if the current object is
|
||||
// uploaded by the user using multipart mechanism:
|
||||
// initiate new multipart, upload part, complete upload
|
||||
func (o *ObjectInfo) isMultipart() bool {
|
||||
if len(o.Parts) == 0 {
|
||||
return false
|
||||
}
|
||||
_, encrypted := crypto.IsEncrypted(o.UserDefined)
|
||||
if encrypted && !crypto.IsMultiPart(o.UserDefined) {
|
||||
return false
|
||||
}
|
||||
for _, part := range o.Parts {
|
||||
_, err := sio.DecryptedSize(uint64(part.Size))
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
}
|
||||
// Further check if this object is uploaded using multipart mechanism
|
||||
// by the user and it is not about Erasure internally splitting the
|
||||
// object into parts in PutObject()
|
||||
return !(o.backendType == BackendErasure && len(o.ETag) == 32)
|
||||
}
|
||||
|
||||
// isEncryptedMultipart returns true if the current object is
|
||||
// uploaded by the user using multipart mechanism:
|
||||
// initiate new multipart, upload part, complete upload
|
||||
|
|
|
@ -39,7 +39,11 @@ func commonTime(modTimes []time.Time, dataDirs []string) (modTime time.Time, dat
|
|||
}
|
||||
|
||||
for _, dataDir := range dataDirs {
|
||||
if dataDir == "" {
|
||||
if dataDir == errorDir {
|
||||
continue
|
||||
}
|
||||
if dataDir == delMarkerDir {
|
||||
dataDirOccurenceMap[delMarkerDir]++
|
||||
continue
|
||||
}
|
||||
dataDirOccurenceMap[dataDir]++
|
||||
|
@ -96,6 +100,11 @@ func listObjectModtimes(partsMetadata []FileInfo, errs []error) (modTimes []time
|
|||
return modTimes
|
||||
}
|
||||
|
||||
const (
|
||||
errorDir = "error-dir"
|
||||
delMarkerDir = ""
|
||||
)
|
||||
|
||||
// Notes:
|
||||
// There are 5 possible states a disk could be in,
|
||||
// 1. __online__ - has the latest copy of xl.meta - returned by listOnlineDisks
|
||||
|
@ -129,6 +138,7 @@ func listOnlineDisks(disks []StorageAPI, partsMetadata []FileInfo, errs []error)
|
|||
dataDirs := make([]string, len(partsMetadata))
|
||||
for idx, fi := range partsMetadata {
|
||||
if errs[idx] != nil {
|
||||
dataDirs[idx] = errorDir
|
||||
continue
|
||||
}
|
||||
dataDirs[idx] = fi.DataDir
|
||||
|
@ -150,9 +160,9 @@ func listOnlineDisks(disks []StorageAPI, partsMetadata []FileInfo, errs []error)
|
|||
}
|
||||
|
||||
// Returns the latest updated FileInfo files and error in case of failure.
|
||||
func getLatestFileInfo(ctx context.Context, partsMetadata []FileInfo, errs []error) (FileInfo, error) {
|
||||
func getLatestFileInfo(ctx context.Context, partsMetadata []FileInfo, errs []error, quorum int) (FileInfo, error) {
|
||||
// There should be atleast half correct entries, if not return failure
|
||||
if reducedErr := reduceReadQuorumErrs(ctx, errs, objectOpIgnoredErrs, len(partsMetadata)/2); reducedErr != nil {
|
||||
if reducedErr := reduceReadQuorumErrs(ctx, errs, objectOpIgnoredErrs, quorum); reducedErr != nil {
|
||||
return FileInfo{}, reducedErr
|
||||
}
|
||||
|
||||
|
@ -181,7 +191,8 @@ func getLatestFileInfo(ctx context.Context, partsMetadata []FileInfo, errs []err
|
|||
count++
|
||||
}
|
||||
}
|
||||
if count < len(partsMetadata)/2 {
|
||||
|
||||
if count < quorum {
|
||||
return FileInfo{}, errErasureReadQuorum
|
||||
}
|
||||
|
||||
|
|
|
@ -189,7 +189,7 @@ func TestListOnlineDisks(t *testing.T) {
|
|||
}
|
||||
|
||||
partsMetadata, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "", false)
|
||||
fi, err := getLatestFileInfo(ctx, partsMetadata, errs)
|
||||
fi, err := getLatestFileInfo(ctx, partsMetadata, errs, getReadQuorum(len(disks)))
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to getLatestFileInfo %v", err)
|
||||
}
|
||||
|
@ -364,7 +364,7 @@ func TestListOnlineDisksSmallObjects(t *testing.T) {
|
|||
}
|
||||
|
||||
partsMetadata, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "", true)
|
||||
fi, err := getLatestFileInfo(ctx, partsMetadata, errs)
|
||||
fi, err := getLatestFileInfo(ctx, partsMetadata, errs, getReadQuorum(len(disks)))
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to getLatestFileInfo %v", err)
|
||||
}
|
||||
|
@ -420,7 +420,7 @@ func TestListOnlineDisksSmallObjects(t *testing.T) {
|
|||
|
||||
}
|
||||
partsMetadata, errs = readAllFileInfo(ctx, erasureDisks, bucket, object, "", true)
|
||||
_, err = getLatestFileInfo(ctx, partsMetadata, errs)
|
||||
_, err = getLatestFileInfo(ctx, partsMetadata, errs, len(disks)/2)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to getLatestFileInfo %v", err)
|
||||
}
|
||||
|
|
|
@ -258,6 +258,10 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
|
|||
// Re-read when we have lock...
|
||||
partsMetadata, errs := readAllFileInfo(ctx, storageDisks, bucket, object, versionID, true)
|
||||
|
||||
if _, err = getLatestFileInfo(ctx, partsMetadata, errs, er.defaultParityCount); err != nil {
|
||||
return er.purgeObjectDangling(ctx, bucket, object, versionID, partsMetadata, errs, []error{}, opts)
|
||||
}
|
||||
|
||||
// List of disks having latest version of the object er.meta
|
||||
// (by modtime).
|
||||
latestDisks, modTime, dataDir := listOnlineDisks(storageDisks, partsMetadata, errs)
|
||||
|
@ -853,8 +857,9 @@ func (er erasureObjects) HealObject(ctx context.Context, bucket, object, version
|
|||
versionID = nullVersionID
|
||||
}
|
||||
|
||||
partsMetadata, errs := readAllFileInfo(healCtx, storageDisks, bucket, object, versionID, false)
|
||||
|
||||
// Perform quick read without lock.
|
||||
// This allows to quickly check if all is ok or all are missing.
|
||||
_, errs := readAllFileInfo(healCtx, storageDisks, bucket, object, versionID, false)
|
||||
if isAllNotFound(errs) {
|
||||
err = toObjectErr(errFileNotFound, bucket, object)
|
||||
if versionID != "" {
|
||||
|
@ -864,11 +869,6 @@ func (er erasureObjects) HealObject(ctx context.Context, bucket, object, version
|
|||
return defaultHealResult(FileInfo{}, storageDisks, storageEndpoints, errs, bucket, object, versionID, er.defaultParityCount), err
|
||||
}
|
||||
|
||||
_, err = getLatestFileInfo(healCtx, partsMetadata, errs)
|
||||
if err != nil {
|
||||
return er.purgeObjectDangling(healCtx, bucket, object, versionID, partsMetadata, errs, []error{}, opts)
|
||||
}
|
||||
|
||||
// Heal the object.
|
||||
return er.healObject(healCtx, bucket, object, versionID, opts)
|
||||
}
|
||||
|
|
|
@ -215,7 +215,7 @@ func TestHealObjectCorrupted(t *testing.T) {
|
|||
}
|
||||
|
||||
fileInfos, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "", false)
|
||||
fi, err := getLatestFileInfo(ctx, fileInfos, errs)
|
||||
fi, err := getLatestFileInfo(ctx, fileInfos, errs, er.defaultParityCount)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to getLatestFileInfo - %v", err)
|
||||
}
|
||||
|
@ -240,7 +240,7 @@ func TestHealObjectCorrupted(t *testing.T) {
|
|||
}
|
||||
|
||||
fileInfos, errs = readAllFileInfo(ctx, erasureDisks, bucket, object, "", false)
|
||||
nfi, err := getLatestFileInfo(ctx, fileInfos, errs)
|
||||
nfi, err := getLatestFileInfo(ctx, fileInfos, errs, er.defaultParityCount)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to getLatestFileInfo - %v", err)
|
||||
}
|
||||
|
@ -266,7 +266,7 @@ func TestHealObjectCorrupted(t *testing.T) {
|
|||
}
|
||||
|
||||
fileInfos, errs = readAllFileInfo(ctx, erasureDisks, bucket, object, "", false)
|
||||
nfi, err = getLatestFileInfo(ctx, fileInfos, errs)
|
||||
nfi, err = getLatestFileInfo(ctx, fileInfos, errs, er.defaultParityCount)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to getLatestFileInfo - %v", err)
|
||||
}
|
||||
|
|
|
@ -317,7 +317,7 @@ func writeUniqueFileInfo(ctx context.Context, disks []StorageAPI, bucket, prefix
|
|||
// writeQuorum is the min required disks to write data.
|
||||
func objectQuorumFromMeta(ctx context.Context, partsMetaData []FileInfo, errs []error, defaultParityCount int) (objectReadQuorum, objectWriteQuorum int, err error) {
|
||||
// get the latest updated Metadata and a count of all the latest updated FileInfo(s)
|
||||
latestFileInfo, err := getLatestFileInfo(ctx, partsMetaData, errs)
|
||||
latestFileInfo, err := getLatestFileInfo(ctx, partsMetaData, errs, defaultParityCount)
|
||||
if err != nil {
|
||||
return 0, 0, err
|
||||
}
|
||||
|
|
|
@ -378,15 +378,25 @@ func (er erasureObjects) CopyObjectPart(ctx context.Context, srcBucket, srcObjec
|
|||
//
|
||||
// Implements S3 compatible Upload Part API.
|
||||
func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID string, partID int, r *PutObjReader, opts ObjectOptions) (pi PartInfo, err error) {
|
||||
uploadIDLock := er.NewNSLock(bucket, pathJoin(object, uploadID))
|
||||
ctx, err = uploadIDLock.GetRLock(ctx, globalOperationTimeout)
|
||||
// Write lock for this part ID.
|
||||
// Held throughout the operation.
|
||||
partIDLock := er.NewNSLock(bucket, pathJoin(object, uploadID, strconv.Itoa(partID)))
|
||||
pctx, err := partIDLock.GetLock(ctx, globalOperationTimeout)
|
||||
if err != nil {
|
||||
return PartInfo{}, err
|
||||
}
|
||||
defer partIDLock.Unlock()
|
||||
|
||||
// Read lock for upload id.
|
||||
// Only held while reading the upload metadata.
|
||||
uploadIDRLock := er.NewNSLock(bucket, pathJoin(object, uploadID))
|
||||
rctx, err := uploadIDRLock.GetRLock(ctx, globalOperationTimeout)
|
||||
if err != nil {
|
||||
return PartInfo{}, err
|
||||
}
|
||||
readLocked := true
|
||||
defer func() {
|
||||
if readLocked {
|
||||
uploadIDLock.RUnlock()
|
||||
if uploadIDRLock != nil {
|
||||
uploadIDRLock.RUnlock()
|
||||
}
|
||||
}()
|
||||
|
||||
|
@ -402,23 +412,27 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
|
|||
uploadIDPath := er.getUploadIDDir(bucket, object, uploadID)
|
||||
|
||||
// Validates if upload ID exists.
|
||||
if err = er.checkUploadIDExists(ctx, bucket, object, uploadID); err != nil {
|
||||
if err = er.checkUploadIDExists(rctx, bucket, object, uploadID); err != nil {
|
||||
return pi, toObjectErr(err, bucket, object, uploadID)
|
||||
}
|
||||
|
||||
storageDisks := er.getDisks()
|
||||
|
||||
// Read metadata associated with the object from all disks.
|
||||
partsMetadata, errs = readAllFileInfo(ctx, storageDisks, minioMetaMultipartBucket,
|
||||
partsMetadata, errs = readAllFileInfo(rctx, storageDisks, minioMetaMultipartBucket,
|
||||
uploadIDPath, "", false)
|
||||
|
||||
// Unlock upload id locks before, so others can get it.
|
||||
uploadIDRLock.RUnlock()
|
||||
uploadIDRLock = nil
|
||||
|
||||
// get Quorum for this object
|
||||
_, writeQuorum, err := objectQuorumFromMeta(ctx, partsMetadata, errs, er.defaultParityCount)
|
||||
_, writeQuorum, err := objectQuorumFromMeta(pctx, partsMetadata, errs, er.defaultParityCount)
|
||||
if err != nil {
|
||||
return pi, toObjectErr(err, bucket, object)
|
||||
}
|
||||
|
||||
reducedErr := reduceWriteQuorumErrs(ctx, errs, objectOpIgnoredErrs, writeQuorum)
|
||||
reducedErr := reduceWriteQuorumErrs(pctx, errs, objectOpIgnoredErrs, writeQuorum)
|
||||
if reducedErr == errErasureWriteQuorum {
|
||||
return pi, toObjectErr(reducedErr, bucket, object)
|
||||
}
|
||||
|
@ -427,7 +441,7 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
|
|||
onlineDisks, modTime, dataDir := listOnlineDisks(storageDisks, partsMetadata, errs)
|
||||
|
||||
// Pick one from the first valid metadata.
|
||||
fi, err := pickValidFileInfo(ctx, partsMetadata, modTime, dataDir, writeQuorum)
|
||||
fi, err := pickValidFileInfo(pctx, partsMetadata, modTime, dataDir, writeQuorum)
|
||||
if err != nil {
|
||||
return pi, err
|
||||
}
|
||||
|
@ -449,7 +463,7 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
|
|||
}
|
||||
}()
|
||||
|
||||
erasure, err := NewErasure(ctx, fi.Erasure.DataBlocks, fi.Erasure.ParityBlocks, fi.Erasure.BlockSize)
|
||||
erasure, err := NewErasure(pctx, fi.Erasure.DataBlocks, fi.Erasure.ParityBlocks, fi.Erasure.BlockSize)
|
||||
if err != nil {
|
||||
return pi, toObjectErr(err, bucket, object)
|
||||
}
|
||||
|
@ -486,7 +500,7 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
|
|||
erasure.ShardFileSize(data.Size()), DefaultBitrotAlgorithm, erasure.ShardSize(), false)
|
||||
}
|
||||
|
||||
n, err := erasure.Encode(ctx, data, writers, buffer, writeQuorum)
|
||||
n, err := erasure.Encode(pctx, data, writers, buffer, writeQuorum)
|
||||
closeBitrotWriters(writers)
|
||||
if err != nil {
|
||||
return pi, toObjectErr(err, bucket, object)
|
||||
|
@ -504,31 +518,29 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
|
|||
}
|
||||
}
|
||||
|
||||
// Unlock here before acquiring write locks all concurrent
|
||||
// PutObjectParts would serialize here updating `xl.meta`
|
||||
uploadIDLock.RUnlock()
|
||||
readLocked = false
|
||||
ctx, err = uploadIDLock.GetLock(ctx, globalOperationTimeout)
|
||||
// Acquire write lock to update metadata.
|
||||
uploadIDWLock := er.NewNSLock(bucket, pathJoin(object, uploadID))
|
||||
wctx, err := uploadIDWLock.GetLock(pctx, globalOperationTimeout)
|
||||
if err != nil {
|
||||
return PartInfo{}, err
|
||||
}
|
||||
defer uploadIDLock.Unlock()
|
||||
defer uploadIDWLock.Unlock()
|
||||
|
||||
// Validates if upload ID exists.
|
||||
if err = er.checkUploadIDExists(ctx, bucket, object, uploadID); err != nil {
|
||||
if err = er.checkUploadIDExists(wctx, bucket, object, uploadID); err != nil {
|
||||
return pi, toObjectErr(err, bucket, object, uploadID)
|
||||
}
|
||||
|
||||
// Rename temporary part file to its final location.
|
||||
partPath := pathJoin(uploadIDPath, fi.DataDir, partSuffix)
|
||||
onlineDisks, err = rename(ctx, onlineDisks, minioMetaTmpBucket, tmpPartPath, minioMetaMultipartBucket, partPath, false, writeQuorum, nil)
|
||||
onlineDisks, err = rename(wctx, onlineDisks, minioMetaTmpBucket, tmpPartPath, minioMetaMultipartBucket, partPath, false, writeQuorum, nil)
|
||||
if err != nil {
|
||||
return pi, toObjectErr(err, minioMetaMultipartBucket, partPath)
|
||||
}
|
||||
|
||||
// Read metadata again because it might be updated with parallel upload of another part.
|
||||
partsMetadata, errs = readAllFileInfo(ctx, onlineDisks, minioMetaMultipartBucket, uploadIDPath, "", false)
|
||||
reducedErr = reduceWriteQuorumErrs(ctx, errs, objectOpIgnoredErrs, writeQuorum)
|
||||
partsMetadata, errs = readAllFileInfo(wctx, onlineDisks, minioMetaMultipartBucket, uploadIDPath, "", false)
|
||||
reducedErr = reduceWriteQuorumErrs(wctx, errs, objectOpIgnoredErrs, writeQuorum)
|
||||
if reducedErr == errErasureWriteQuorum {
|
||||
return pi, toObjectErr(reducedErr, bucket, object)
|
||||
}
|
||||
|
@ -537,7 +549,7 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
|
|||
onlineDisks, modTime, dataDir = listOnlineDisks(onlineDisks, partsMetadata, errs)
|
||||
|
||||
// Pick one from the first valid metadata.
|
||||
fi, err = pickValidFileInfo(ctx, partsMetadata, modTime, dataDir, writeQuorum)
|
||||
fi, err = pickValidFileInfo(wctx, partsMetadata, modTime, dataDir, writeQuorum)
|
||||
if err != nil {
|
||||
return pi, err
|
||||
}
|
||||
|
@ -565,7 +577,7 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
|
|||
}
|
||||
|
||||
// Writes update `xl.meta` format for each disk.
|
||||
if _, err = writeUniqueFileInfo(ctx, onlineDisks, minioMetaMultipartBucket, uploadIDPath, partsMetadata, writeQuorum); err != nil {
|
||||
if _, err = writeUniqueFileInfo(wctx, onlineDisks, minioMetaMultipartBucket, uploadIDPath, partsMetadata, writeQuorum); err != nil {
|
||||
return pi, toObjectErr(err, minioMetaMultipartBucket, uploadIDPath)
|
||||
}
|
||||
|
||||
|
|
|
@ -203,36 +203,6 @@ func (er erasureObjects) GetObjectNInfo(ctx context.Context, bucket, object stri
|
|||
return fn(pr, h, opts.CheckPrecondFn, pipeCloser)
|
||||
}
|
||||
|
||||
// GetObject - reads an object erasured coded across multiple
|
||||
// disks. Supports additional parameters like offset and length
|
||||
// which are synonymous with HTTP Range requests.
|
||||
//
|
||||
// startOffset indicates the starting read location of the object.
|
||||
// length indicates the total length of the object.
|
||||
func (er erasureObjects) GetObject(ctx context.Context, bucket, object string, startOffset int64, length int64, writer io.Writer, etag string, opts ObjectOptions) (err error) {
|
||||
// Lock the object before reading.
|
||||
lk := er.NewNSLock(bucket, object)
|
||||
ctx, err = lk.GetRLock(ctx, globalOperationTimeout)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer lk.RUnlock()
|
||||
|
||||
// Start offset cannot be negative.
|
||||
if startOffset < 0 {
|
||||
logger.LogIf(ctx, errUnexpected, logger.Application)
|
||||
return errUnexpected
|
||||
}
|
||||
|
||||
// Writer cannot be nil.
|
||||
if writer == nil {
|
||||
logger.LogIf(ctx, errUnexpected)
|
||||
return errUnexpected
|
||||
}
|
||||
|
||||
return er.getObject(ctx, bucket, object, startOffset, length, writer, opts)
|
||||
}
|
||||
|
||||
func (er erasureObjects) getObjectWithFileInfo(ctx context.Context, bucket, object string, startOffset int64, length int64, writer io.Writer, fi FileInfo, metaArr []FileInfo, onlineDisks []StorageAPI) error {
|
||||
// Reorder online disks based on erasure distribution order.
|
||||
// Reorder parts metadata based on erasure distribution order.
|
||||
|
@ -272,6 +242,27 @@ func (er erasureObjects) getObjectWithFileInfo(ctx context.Context, bucket, obje
|
|||
if err != nil {
|
||||
return toObjectErr(err, bucket, object)
|
||||
}
|
||||
|
||||
// This hack is needed to avoid a bug found when overwriting
|
||||
// the inline data object with a un-inlined version, for the
|
||||
// time being we need this as we have released inline-data
|
||||
// version already and this bug is already present in newer
|
||||
// releases.
|
||||
//
|
||||
// This mainly happens with objects < smallFileThreshold when
|
||||
// they are overwritten with un-inlined objects >= smallFileThreshold,
|
||||
// due to a bug in RenameData() the fi.Data is not niled leading to
|
||||
// GetObject thinking that fi.Data is valid while fi.Size has
|
||||
// changed already.
|
||||
if _, ok := fi.Metadata[ReservedMetadataPrefixLower+"inline-data"]; ok {
|
||||
shardFileSize := erasure.ShardFileSize(fi.Size)
|
||||
if shardFileSize >= 0 && shardFileSize >= smallFileThreshold {
|
||||
for i := range metaArr {
|
||||
metaArr[i].Data = nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
var healOnce sync.Once
|
||||
|
||||
for ; partIndex <= lastPartIndex; partIndex++ {
|
||||
|
@ -355,23 +346,6 @@ func (er erasureObjects) getObjectWithFileInfo(ctx context.Context, bucket, obje
|
|||
return nil
|
||||
}
|
||||
|
||||
// getObject wrapper for erasure GetObject
|
||||
func (er erasureObjects) getObject(ctx context.Context, bucket, object string, startOffset, length int64, writer io.Writer, opts ObjectOptions) error {
|
||||
fi, metaArr, onlineDisks, err := er.getObjectFileInfo(ctx, bucket, object, opts, true)
|
||||
if err != nil {
|
||||
return toObjectErr(err, bucket, object)
|
||||
}
|
||||
if fi.Deleted {
|
||||
if opts.VersionID == "" {
|
||||
return toObjectErr(errFileNotFound, bucket, object)
|
||||
}
|
||||
// Make sure to return object info to provide extra information.
|
||||
return toObjectErr(errMethodNotAllowed, bucket, object)
|
||||
}
|
||||
|
||||
return er.getObjectWithFileInfo(ctx, bucket, object, startOffset, length, writer, fi, metaArr, onlineDisks)
|
||||
}
|
||||
|
||||
// GetObjectInfo - reads object metadata and replies back ObjectInfo.
|
||||
func (er erasureObjects) GetObjectInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (info ObjectInfo, err error) {
|
||||
if !opts.NoLock {
|
||||
|
@ -787,6 +761,13 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st
|
|||
partsMetadata[index].ModTime = modTime
|
||||
}
|
||||
|
||||
if len(inlineBuffers) > 0 {
|
||||
// Set an additional header when data is inlined.
|
||||
for index := range partsMetadata {
|
||||
partsMetadata[index].Metadata[ReservedMetadataPrefixLower+"inline-data"] = "true"
|
||||
}
|
||||
}
|
||||
|
||||
// Rename the successfully written temporary object to final location.
|
||||
if onlineDisks, err = renameData(ctx, onlineDisks, minioMetaTmpBucket, tempObj, partsMetadata, bucket, object, writeQuorum); err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
|
|
|
@ -322,9 +322,18 @@ func TestGetObjectNoQuorum(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
err = xl.GetObject(ctx, bucket, object, 0, int64(len(buf)), ioutil.Discard, "", opts)
|
||||
if err != toObjectErr(errFileNotFound, bucket, object) {
|
||||
t.Errorf("Expected GetObject to fail with %v, but failed with %v", toObjectErr(errErasureWriteQuorum, bucket, object), err)
|
||||
gr, err := xl.GetObjectNInfo(ctx, bucket, object, nil, nil, readLock, opts)
|
||||
if err != nil {
|
||||
if err != toObjectErr(errErasureReadQuorum, bucket, object) {
|
||||
t.Errorf("Expected GetObject to fail with %v, but failed with %v", toObjectErr(errErasureReadQuorum, bucket, object), err)
|
||||
}
|
||||
}
|
||||
if gr != nil {
|
||||
_, err = io.Copy(ioutil.Discard, gr)
|
||||
if err != toObjectErr(errErasureReadQuorum, bucket, object) {
|
||||
t.Errorf("Expected GetObject to fail with %v, but failed with %v", toObjectErr(errErasureReadQuorum, bucket, object), err)
|
||||
}
|
||||
gr.Close()
|
||||
}
|
||||
|
||||
// Test use case 2: Make 9 disks offline, which leaves less than quorum number of disks
|
||||
|
@ -358,9 +367,18 @@ func TestGetObjectNoQuorum(t *testing.T) {
|
|||
}
|
||||
z.serverPools[0].erasureDisksMu.Unlock()
|
||||
// Fetch object from store.
|
||||
err = xl.GetObject(ctx, bucket, object, 0, int64(len("abcd")), ioutil.Discard, "", opts)
|
||||
gr, err := xl.GetObjectNInfo(ctx, bucket, object, nil, nil, readLock, opts)
|
||||
if err != nil {
|
||||
if err != toObjectErr(errErasureReadQuorum, bucket, object) {
|
||||
t.Errorf("Expected GetObject to fail with %v, but failed with %v", toObjectErr(errErasureWriteQuorum, bucket, object), err)
|
||||
t.Errorf("Expected GetObject to fail with %v, but failed with %v", toObjectErr(errErasureReadQuorum, bucket, object), err)
|
||||
}
|
||||
}
|
||||
if gr != nil {
|
||||
_, err = io.Copy(ioutil.Discard, gr)
|
||||
if err != toObjectErr(errErasureReadQuorum, bucket, object) {
|
||||
t.Errorf("Expected GetObject to fail with %v, but failed with %v", toObjectErr(errErasureReadQuorum, bucket, object), err)
|
||||
}
|
||||
gr.Close()
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -147,6 +147,38 @@ func (z *erasureServerPools) GetDisksID(ids ...string) []StorageAPI {
|
|||
return res
|
||||
}
|
||||
|
||||
// GetRawFile will return all files with a given raw path to the callback.
|
||||
// For now only direct paths are supported.
|
||||
func (z *erasureServerPools) GetRawData(ctx context.Context, volume, file string, fn func(r io.Reader, host string, disk string, filename string, size int64, modtime time.Time) error) error {
|
||||
for _, s := range z.serverPools {
|
||||
for _, disks := range s.erasureDisks {
|
||||
for i, disk := range disks {
|
||||
if disk == OfflineDisk {
|
||||
continue
|
||||
}
|
||||
si, err := disk.StatInfoFile(ctx, volume, file)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
r, err := disk.ReadFileStream(ctx, volume, file, 0, si.Size)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
defer r.Close()
|
||||
did, err := disk.GetDiskID()
|
||||
if err != nil {
|
||||
did = fmt.Sprintf("disk-%d", i)
|
||||
}
|
||||
err = fn(r, disk.Hostname(), did, pathJoin(volume, file), si.Size, si.ModTime)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (z *erasureServerPools) SetDriveCounts() []int {
|
||||
setDriveCounts := make([]int, len(z.serverPools))
|
||||
for i := range z.serverPools {
|
||||
|
@ -244,6 +276,13 @@ func (z *erasureServerPools) getServerPoolsAvailableSpace(ctx context.Context, s
|
|||
return serverPools
|
||||
}
|
||||
|
||||
// poolObjInfo represents the state of an object per pool
|
||||
type poolObjInfo struct {
|
||||
PoolIndex int
|
||||
ObjInfo ObjectInfo
|
||||
Err error
|
||||
}
|
||||
|
||||
// getPoolIdxExisting returns the (first) found object pool index containing an object.
|
||||
// If the object exists, but the latest version is a delete marker, the index with it is still returned.
|
||||
// If the object does not exist ObjectNotFound error is returned.
|
||||
|
@ -254,35 +293,49 @@ func (z *erasureServerPools) getPoolIdxExisting(ctx context.Context, bucket, obj
|
|||
return 0, nil
|
||||
}
|
||||
|
||||
errs := make([]error, len(z.serverPools))
|
||||
objInfos := make([]ObjectInfo, len(z.serverPools))
|
||||
poolObjInfos := make([]poolObjInfo, len(z.serverPools))
|
||||
|
||||
var wg sync.WaitGroup
|
||||
for i, pool := range z.serverPools {
|
||||
wg.Add(1)
|
||||
go func(i int, pool *erasureSets) {
|
||||
defer wg.Done()
|
||||
objInfos[i], errs[i] = pool.GetObjectInfo(ctx, bucket, object, ObjectOptions{})
|
||||
// remember the pool index, we may sort the slice original index might be lost.
|
||||
pinfo := poolObjInfo{
|
||||
PoolIndex: i,
|
||||
}
|
||||
pinfo.ObjInfo, pinfo.Err = pool.GetObjectInfo(ctx, bucket, object, ObjectOptions{})
|
||||
poolObjInfos[i] = pinfo
|
||||
}(i, pool)
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
for i, err := range errs {
|
||||
if err == nil {
|
||||
return i, nil
|
||||
// Sort the objInfos such that we always serve latest
|
||||
// this is a defensive change to handle any duplicate
|
||||
// content that may have been created, we always serve
|
||||
// the latest object.
|
||||
sort.Slice(poolObjInfos, func(i, j int) bool {
|
||||
mtime1 := poolObjInfos[i].ObjInfo.ModTime
|
||||
mtime2 := poolObjInfos[j].ObjInfo.ModTime
|
||||
return mtime1.After(mtime2)
|
||||
})
|
||||
|
||||
for _, pinfo := range poolObjInfos {
|
||||
if pinfo.Err != nil && !isErrObjectNotFound(pinfo.Err) {
|
||||
return -1, pinfo.Err
|
||||
}
|
||||
if isErrObjectNotFound(err) {
|
||||
if isErrObjectNotFound(pinfo.Err) {
|
||||
// No object exists or its a delete marker,
|
||||
// check objInfo to confirm.
|
||||
if objInfos[i].DeleteMarker && objInfos[i].Name != "" {
|
||||
return i, nil
|
||||
if pinfo.ObjInfo.DeleteMarker && pinfo.ObjInfo.Name != "" {
|
||||
return pinfo.PoolIndex, nil
|
||||
}
|
||||
|
||||
// objInfo is not valid, truly the object doesn't
|
||||
// exist proceed to next pool.
|
||||
continue
|
||||
}
|
||||
return -1, err
|
||||
return pinfo.PoolIndex, nil
|
||||
}
|
||||
|
||||
return -1, toObjectErr(errFileNotFound, bucket, object)
|
||||
|
@ -629,45 +682,86 @@ func (z *erasureServerPools) GetObjectNInfo(ctx context.Context, bucket, object
|
|||
unlockOnDefer = true
|
||||
}
|
||||
|
||||
errs := make([]error, len(z.serverPools))
|
||||
grs := make([]*GetObjectReader, len(z.serverPools))
|
||||
|
||||
lockType = noLock // do not take locks at lower levels
|
||||
checkPrecondFn := opts.CheckPrecondFn
|
||||
opts.CheckPrecondFn = nil
|
||||
results := make([]struct {
|
||||
zIdx int
|
||||
gr *GetObjectReader
|
||||
err error
|
||||
}, len(z.serverPools))
|
||||
|
||||
var wg sync.WaitGroup
|
||||
for i, pool := range z.serverPools {
|
||||
wg.Add(1)
|
||||
go func(i int, pool *erasureSets) {
|
||||
defer wg.Done()
|
||||
grs[i], errs[i] = pool.GetObjectNInfo(ctx, bucket, object, rs, h, lockType, opts)
|
||||
results[i].zIdx = i
|
||||
results[i].gr, results[i].err = pool.GetObjectNInfo(ctx, bucket, object, rs, h, lockType, opts)
|
||||
}(i, pool)
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
var found int = -1
|
||||
for i, err := range errs {
|
||||
if err == nil {
|
||||
// Sort the objInfos such that we always serve latest
|
||||
// this is a defensive change to handle any duplicate
|
||||
// content that may have been created, we always serve
|
||||
// the latest object.
|
||||
sort.Slice(results, func(i, j int) bool {
|
||||
var mtimeI, mtimeJ time.Time
|
||||
|
||||
if results[i].gr != nil {
|
||||
mtimeI = results[i].gr.ObjInfo.ModTime
|
||||
}
|
||||
if results[j].gr != nil {
|
||||
mtimeJ = results[j].gr.ObjInfo.ModTime
|
||||
}
|
||||
// On tiebreaks, choose the earliest.
|
||||
if mtimeI.Equal(mtimeJ) {
|
||||
return results[i].zIdx < results[j].zIdx
|
||||
}
|
||||
return mtimeI.After(mtimeJ)
|
||||
})
|
||||
|
||||
var found = -1
|
||||
for i, res := range results {
|
||||
if res.err == nil {
|
||||
// Select earliest result
|
||||
found = i
|
||||
break
|
||||
}
|
||||
if !isErrObjectNotFound(err) && !isErrVersionNotFound(err) {
|
||||
for _, grr := range grs {
|
||||
if grr != nil {
|
||||
grr.Close()
|
||||
if !isErrObjectNotFound(res.err) && !isErrVersionNotFound(res.err) {
|
||||
for _, res := range results {
|
||||
res.gr.Close()
|
||||
}
|
||||
}
|
||||
return gr, err
|
||||
return nil, res.err
|
||||
}
|
||||
}
|
||||
|
||||
if found >= 0 {
|
||||
return grs[found], nil
|
||||
}
|
||||
|
||||
if found < 0 {
|
||||
object = decodeDirObject(object)
|
||||
if opts.VersionID != "" {
|
||||
return gr, VersionNotFound{Bucket: bucket, Object: object, VersionID: opts.VersionID}
|
||||
}
|
||||
return gr, ObjectNotFound{Bucket: bucket, Object: object}
|
||||
}
|
||||
|
||||
// Check preconditions.
|
||||
if checkPrecondFn != nil && checkPrecondFn(results[found].gr.ObjInfo) {
|
||||
for _, res := range results {
|
||||
res.gr.Close()
|
||||
}
|
||||
return nil, PreConditionFailed{}
|
||||
}
|
||||
|
||||
// Close all others
|
||||
for i, res := range results {
|
||||
if i == found {
|
||||
continue
|
||||
}
|
||||
res.gr.Close()
|
||||
}
|
||||
|
||||
return results[found].gr, nil
|
||||
}
|
||||
|
||||
func (z *erasureServerPools) GetObjectInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, err error) {
|
||||
|
@ -689,37 +783,48 @@ func (z *erasureServerPools) GetObjectInfo(ctx context.Context, bucket, object s
|
|||
}
|
||||
defer lk.RUnlock()
|
||||
|
||||
errs := make([]error, len(z.serverPools))
|
||||
objInfos := make([]ObjectInfo, len(z.serverPools))
|
||||
|
||||
results := make([]struct {
|
||||
zIdx int
|
||||
oi ObjectInfo
|
||||
err error
|
||||
}, len(z.serverPools))
|
||||
opts.NoLock = true // avoid taking locks at lower levels for multi-pool setups.
|
||||
var wg sync.WaitGroup
|
||||
for i, pool := range z.serverPools {
|
||||
wg.Add(1)
|
||||
go func(i int, pool *erasureSets) {
|
||||
defer wg.Done()
|
||||
objInfos[i], errs[i] = pool.GetObjectInfo(ctx, bucket, object, opts)
|
||||
results[i].zIdx = i
|
||||
results[i].oi, results[i].err = pool.GetObjectInfo(ctx, bucket, object, opts)
|
||||
}(i, pool)
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
var found int = -1
|
||||
for i, err := range errs {
|
||||
// Sort the objInfos such that we always serve latest
|
||||
// this is a defensive change to handle any duplicate
|
||||
// content that may have been created, we always serve
|
||||
// the latest object.
|
||||
sort.Slice(results, func(i, j int) bool {
|
||||
a, b := results[i], results[j]
|
||||
if a.oi.ModTime.Equal(b.oi.ModTime) {
|
||||
// On tiebreak, select the lowest zone index.
|
||||
return a.zIdx < b.zIdx
|
||||
}
|
||||
return a.oi.ModTime.After(b.oi.ModTime)
|
||||
})
|
||||
for _, res := range results {
|
||||
// Return first found.
|
||||
err := res.err
|
||||
if err == nil {
|
||||
found = i
|
||||
break
|
||||
return res.oi, nil
|
||||
}
|
||||
if !isErrObjectNotFound(err) && !isErrVersionNotFound(err) {
|
||||
// some errors such as MethodNotAllowed for delete marker
|
||||
// should be returned upwards.
|
||||
return objInfos[i], err
|
||||
return res.oi, err
|
||||
}
|
||||
}
|
||||
|
||||
if found >= 0 {
|
||||
return objInfos[found], nil
|
||||
}
|
||||
|
||||
object = decodeDirObject(object)
|
||||
if opts.VersionID != "" {
|
||||
return objInfo, VersionNotFound{Bucket: bucket, Object: object, VersionID: opts.VersionID}
|
||||
|
|
|
@ -19,11 +19,13 @@ package cmd
|
|||
import (
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"hash/crc32"
|
||||
"math/rand"
|
||||
"net/http"
|
||||
"path"
|
||||
"sort"
|
||||
"sync"
|
||||
"time"
|
||||
|
@ -499,6 +501,21 @@ type auditObjectOp struct {
|
|||
Disks []string `json:"disks"`
|
||||
}
|
||||
|
||||
type auditObjectErasureMap struct {
|
||||
sync.Map
|
||||
}
|
||||
|
||||
// Define how to marshal auditObjectErasureMap so it can be
|
||||
// printed in the audit webhook notification request.
|
||||
func (a *auditObjectErasureMap) MarshalJSON() ([]byte, error) {
|
||||
mapCopy := make(map[string]auditObjectOp)
|
||||
a.Range(func(k, v interface{}) bool {
|
||||
mapCopy[k.(string)] = v.(auditObjectOp)
|
||||
return true
|
||||
})
|
||||
return json.Marshal(mapCopy)
|
||||
}
|
||||
|
||||
func auditObjectErasureSet(ctx context.Context, object string, set *erasureObjects) {
|
||||
if len(logger.AuditTargets) == 0 {
|
||||
return
|
||||
|
@ -512,20 +529,20 @@ func auditObjectErasureSet(ctx context.Context, object string, set *erasureObjec
|
|||
Disks: set.getEndpoints(),
|
||||
}
|
||||
|
||||
var objectErasureSetTag map[string]auditObjectOp
|
||||
var objectErasureSetTag *auditObjectErasureMap
|
||||
reqInfo := logger.GetReqInfo(ctx)
|
||||
for _, kv := range reqInfo.GetTags() {
|
||||
if kv.Key == objectErasureMapKey {
|
||||
objectErasureSetTag = kv.Val.(map[string]auditObjectOp)
|
||||
objectErasureSetTag = kv.Val.(*auditObjectErasureMap)
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if objectErasureSetTag == nil {
|
||||
objectErasureSetTag = make(map[string]auditObjectOp)
|
||||
objectErasureSetTag = &auditObjectErasureMap{}
|
||||
}
|
||||
|
||||
objectErasureSetTag[object] = op
|
||||
objectErasureSetTag.Store(object, op)
|
||||
reqInfo.SetTags(objectErasureMapKey, objectErasureSetTag)
|
||||
}
|
||||
|
||||
|
@ -863,10 +880,19 @@ func (s *erasureSets) GetObjectNInfo(ctx context.Context, bucket, object string,
|
|||
}
|
||||
|
||||
func (s *erasureSets) parentDirIsObject(ctx context.Context, bucket, parent string) bool {
|
||||
if parent == "." {
|
||||
var isParentDirObject func(string) bool
|
||||
isParentDirObject = func(p string) bool {
|
||||
if p == "." || p == SlashSeparator {
|
||||
return false
|
||||
}
|
||||
return s.getHashedSet(parent).parentDirIsObject(ctx, bucket, parent)
|
||||
if s.getHashedSet(p).parentDirIsObject(ctx, bucket, p) {
|
||||
// If there is already a file at prefix "p", return true.
|
||||
return true
|
||||
}
|
||||
// Check if there is a file as one of the parent paths.
|
||||
return isParentDirObject(path.Dir(p))
|
||||
}
|
||||
return isParentDirObject(parent)
|
||||
}
|
||||
|
||||
// PutObject - writes an object to hashedSet based on the object name.
|
||||
|
|
14
cmd/fs-v1.go
14
cmd/fs-v1.go
|
@ -26,6 +26,7 @@ import (
|
|||
"os"
|
||||
"os/user"
|
||||
"path"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
|
@ -1607,3 +1608,16 @@ func (fs *FSObjects) ReadHealth(ctx context.Context) bool {
|
|||
_, err := os.Stat(fs.fsPath)
|
||||
return err == nil
|
||||
}
|
||||
|
||||
func (fs *FSObjects) GetRawData(ctx context.Context, volume, file string, fn func(r io.Reader, host string, disk string, filename string, size int64, modtime time.Time) error) error {
|
||||
f, err := os.Open(filepath.Join(fs.fsPath, volume, file))
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
defer f.Close()
|
||||
st, err := f.Stat()
|
||||
if err != nil || st.IsDir() {
|
||||
return nil
|
||||
}
|
||||
return fn(f, "fs", fs.fsUUID, file, st.Size(), st.ModTime())
|
||||
}
|
||||
|
|
|
@ -380,7 +380,7 @@ func (l *s3Objects) ListObjects(ctx context.Context, bucket string, prefix strin
|
|||
|
||||
// ListObjectsV2 lists all blobs in S3 bucket filtered by prefix
|
||||
func (l *s3Objects) ListObjectsV2(ctx context.Context, bucket, prefix, continuationToken, delimiter string, maxKeys int, fetchOwner bool, startAfter string) (loi minio.ListObjectsV2Info, e error) {
|
||||
result, err := l.Client.ListObjectsV2(bucket, prefix, continuationToken, fetchOwner, delimiter, maxKeys)
|
||||
result, err := l.Client.ListObjectsV2(bucket, prefix, startAfter, continuationToken, delimiter, maxKeys)
|
||||
if err != nil {
|
||||
return loi, minio.ErrorRespToObjectError(err, bucket)
|
||||
}
|
||||
|
@ -670,7 +670,7 @@ func (l *s3Objects) AbortMultipartUpload(ctx context.Context, bucket string, obj
|
|||
|
||||
// CompleteMultipartUpload completes ongoing multipart upload and finalizes object
|
||||
func (l *s3Objects) CompleteMultipartUpload(ctx context.Context, bucket string, object string, uploadID string, uploadedParts []minio.CompletePart, opts minio.ObjectOptions) (oi minio.ObjectInfo, e error) {
|
||||
etag, err := l.Client.CompleteMultipartUpload(ctx, bucket, object, uploadID, minio.ToMinioClientCompleteParts(uploadedParts))
|
||||
etag, err := l.Client.CompleteMultipartUpload(ctx, bucket, object, uploadID, minio.ToMinioClientCompleteParts(uploadedParts), miniogo.PutObjectOptions{})
|
||||
if err != nil {
|
||||
return oi, minio.ErrorRespToObjectError(err, bucket, object)
|
||||
}
|
||||
|
|
|
@ -188,11 +188,16 @@ func (z *erasureServerPools) listPath(ctx context.Context, o listPathOptions) (e
|
|||
if err != nil {
|
||||
return true
|
||||
}
|
||||
oFIV, err := existing.fileInfo(o.Bucket)
|
||||
oFIV, err := other.fileInfo(o.Bucket)
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
// Replace if modtime is newer
|
||||
if !oFIV.ModTime.Equal(eFIV.ModTime) {
|
||||
return oFIV.ModTime.After(eFIV.ModTime)
|
||||
}
|
||||
// Use NumVersions as a final tiebreaker.
|
||||
return oFIV.NumVersions > eFIV.NumVersions
|
||||
})
|
||||
if entries.len() > o.Limit {
|
||||
allAtEOF = false
|
||||
|
|
|
@ -27,6 +27,7 @@ import (
|
|||
"strings"
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
xhttp "github.com/minio/minio/cmd/http"
|
||||
"github.com/minio/minio/cmd/logger"
|
||||
xioutil "github.com/minio/minio/pkg/ioutil"
|
||||
)
|
||||
|
@ -293,6 +294,7 @@ func (client *storageRESTClient) WalkDir(ctx context.Context, opts WalkDirOption
|
|||
logger.LogIf(ctx, err)
|
||||
return err
|
||||
}
|
||||
defer xhttp.DrainBody(respBody)
|
||||
return waitForHTTPStream(respBody, wr)
|
||||
}
|
||||
|
||||
|
|
|
@ -285,3 +285,10 @@ func (d *naughtyDisk) VerifyFile(ctx context.Context, volume, path string, fi Fi
|
|||
}
|
||||
return d.disk.VerifyFile(ctx, volume, path, fi)
|
||||
}
|
||||
|
||||
func (d *naughtyDisk) StatInfoFile(ctx context.Context, volume, path string) (stat StatInfo, err error) {
|
||||
if err := d.calcError(); err != nil {
|
||||
return stat, err
|
||||
}
|
||||
return d.disk.StatInfoFile(ctx, volume, path)
|
||||
}
|
||||
|
|
|
@ -805,6 +805,9 @@ func NewGetObjectReader(rs *HTTPRangeSpec, oi ObjectInfo, opts ObjectOptions, cl
|
|||
|
||||
// Close - calls the cleanup actions in reverse order
|
||||
func (g *GetObjectReader) Close() error {
|
||||
if g == nil {
|
||||
return nil
|
||||
}
|
||||
// sync.Once is used here to ensure that Close() is
|
||||
// idempotent.
|
||||
g.once.Do(func() {
|
||||
|
|
|
@ -386,6 +386,12 @@ func (api objectAPIHandlers) GetObjectHandler(w http.ResponseWriter, r *http.Req
|
|||
var rangeErr error
|
||||
rangeHeader := r.Header.Get(xhttp.Range)
|
||||
if rangeHeader != "" {
|
||||
// Both 'Range' and 'partNumber' cannot be specified at the same time
|
||||
if opts.PartNumber > 0 {
|
||||
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidRangePartNumber), r.URL, guessIsBrowserReq(r))
|
||||
return
|
||||
}
|
||||
|
||||
rs, rangeErr = parseRequestRangeSpec(rangeHeader)
|
||||
// Handle only errInvalidRange. Ignore other
|
||||
// parse error and treat it as regular Get
|
||||
|
@ -399,12 +405,6 @@ func (api objectAPIHandlers) GetObjectHandler(w http.ResponseWriter, r *http.Req
|
|||
}
|
||||
}
|
||||
|
||||
// Both 'bytes' and 'partNumber' cannot be specified at the same time
|
||||
if rs != nil && opts.PartNumber > 0 {
|
||||
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidRangePartNumber), r.URL, guessIsBrowserReq(r))
|
||||
return
|
||||
}
|
||||
|
||||
// Validate pre-conditions if any.
|
||||
opts.CheckPrecondFn = func(oi ObjectInfo) bool {
|
||||
if objectAPI.IsEncryptionSupported() {
|
||||
|
@ -680,6 +680,12 @@ func (api objectAPIHandlers) HeadObjectHandler(w http.ResponseWriter, r *http.Re
|
|||
var rs *HTTPRangeSpec
|
||||
rangeHeader := r.Header.Get(xhttp.Range)
|
||||
if rangeHeader != "" {
|
||||
// Both 'Range' and 'partNumber' cannot be specified at the same time
|
||||
if opts.PartNumber > 0 {
|
||||
writeErrorResponseHeadersOnly(w, errorCodes.ToAPIErr(ErrInvalidRangePartNumber))
|
||||
return
|
||||
}
|
||||
|
||||
if rs, err = parseRequestRangeSpec(rangeHeader); err != nil {
|
||||
// Handle only errInvalidRange. Ignore other
|
||||
// parse error and treat it as regular Get
|
||||
|
@ -693,12 +699,6 @@ func (api objectAPIHandlers) HeadObjectHandler(w http.ResponseWriter, r *http.Re
|
|||
}
|
||||
}
|
||||
|
||||
// Both 'bytes' and 'partNumber' cannot be specified at the same time
|
||||
if rs != nil && opts.PartNumber > 0 {
|
||||
writeErrorResponseHeadersOnly(w, errorCodes.ToAPIErr(ErrInvalidRangePartNumber))
|
||||
return
|
||||
}
|
||||
|
||||
// Set encryption response headers
|
||||
if objectAPI.IsEncryptionSupported() {
|
||||
switch kind, _ := crypto.IsEncrypted(objInfo.UserDefined); kind {
|
||||
|
@ -1683,6 +1683,10 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req
|
|||
if replicate, sync := mustReplicate(ctx, r, bucket, object, metadata, ""); replicate {
|
||||
scheduleReplication(ctx, objInfo.Clone(), objectAPI, sync, replication.ObjectReplicationType)
|
||||
}
|
||||
if _, ok := r.Header[xhttp.MinIOSourceReplicationRequest]; ok {
|
||||
size, _ := objInfo.GetActualSize()
|
||||
defer globalReplicationStats.Update(bucket, size, replication.Replica, replication.StatusType(""), replication.ObjectReplicationType)
|
||||
}
|
||||
setPutObjHeaders(w, objInfo, false)
|
||||
|
||||
writeSuccessResponseHeadersOnly(w)
|
||||
|
@ -1974,9 +1978,11 @@ func (api objectAPIHandlers) PutObjectExtractHandler(w http.ResponseWriter, r *h
|
|||
if replicate, sync := mustReplicate(ctx, r, bucket, object, metadata, ""); replicate {
|
||||
scheduleReplication(ctx, objInfo.Clone(), objectAPI, sync, replication.ObjectReplicationType)
|
||||
}
|
||||
|
||||
if _, ok := r.Header[xhttp.MinIOSourceReplicationRequest]; ok {
|
||||
size, _ := objInfo.GetActualSize()
|
||||
defer globalReplicationStats.Update(bucket, size, replication.Replica, replication.StatusType(""), replication.ObjectReplicationType)
|
||||
}
|
||||
}
|
||||
|
||||
untar(hreader, putObjectTar)
|
||||
|
||||
w.Header()[xhttp.ETag] = []string{`"` + hex.EncodeToString(hreader.MD5Current()) + `"`}
|
||||
|
@ -3081,6 +3087,10 @@ func (api objectAPIHandlers) CompleteMultipartUploadHandler(w http.ResponseWrite
|
|||
if replicate, sync := mustReplicate(ctx, r, bucket, object, objInfo.UserDefined, objInfo.ReplicationStatus.String()); replicate {
|
||||
scheduleReplication(ctx, objInfo.Clone(), objectAPI, sync, replication.ObjectReplicationType)
|
||||
}
|
||||
if _, ok := r.Header[xhttp.MinIOSourceReplicationRequest]; ok {
|
||||
size, _ := objInfo.GetActualSize()
|
||||
defer globalReplicationStats.Update(bucket, size, replication.Replica, replication.StatusType(""), replication.ObjectReplicationType)
|
||||
}
|
||||
|
||||
// Write success response.
|
||||
writeSuccessResponseXML(w, encodedSuccessResponse)
|
||||
|
|
|
@ -127,6 +127,7 @@ func (c *Client) Call(ctx context.Context, method string, values url.Values, bod
|
|||
}
|
||||
req.Header.Set("Authorization", "Bearer "+c.newAuthToken(req.URL.RawQuery))
|
||||
req.Header.Set("X-Minio-Time", time.Now().UTC().Format(time.RFC3339))
|
||||
req.Header.Set("Expect", "100-continue")
|
||||
if length > 0 {
|
||||
req.ContentLength = length
|
||||
}
|
||||
|
|
|
@ -2427,7 +2427,7 @@ func (s *TestSuiteCommon) TestObjectMultipartListError(c *check) {
|
|||
c.Assert(err, nil)
|
||||
// Since max-keys parameter in the ListMultipart request set to invalid value of -2,
|
||||
// its expected to fail with error message "InvalidArgument".
|
||||
verifyError(c, response4, "InvalidArgument", "Argument max-parts must be an integer between 0 and 2147483647", http.StatusBadRequest)
|
||||
verifyError(c, response4, "InvalidArgument", "Part number must be an integer between 1 and 10000, inclusive", http.StatusBadRequest)
|
||||
}
|
||||
|
||||
// TestObjectValidMD5 - First uploads an object with a valid Content-Md5 header and verifies the status,
|
||||
|
|
|
@ -70,6 +70,7 @@ type StorageAPI interface {
|
|||
CheckFile(ctx context.Context, volume string, path string) (err error)
|
||||
Delete(ctx context.Context, volume string, path string, recursive bool) (err error)
|
||||
VerifyFile(ctx context.Context, volume, path string, fi FileInfo) error
|
||||
StatInfoFile(ctx context.Context, volume, path string) (stat StatInfo, err error)
|
||||
|
||||
// Write all data, syncs the data to disk.
|
||||
// Should be used for smaller payloads.
|
||||
|
|
|
@ -31,7 +31,6 @@ import (
|
|||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/minio/minio/cmd/http"
|
||||
xhttp "github.com/minio/minio/cmd/http"
|
||||
"github.com/minio/minio/cmd/logger"
|
||||
"github.com/minio/minio/cmd/rest"
|
||||
|
@ -207,7 +206,7 @@ func (client *storageRESTClient) NSScanner(ctx context.Context, cache dataUsageC
|
|||
pw.CloseWithError(cache.serializeTo(pw))
|
||||
}()
|
||||
respBody, err := client.call(ctx, storageRESTMethodNSScanner, url.Values{}, pr, -1)
|
||||
defer http.DrainBody(respBody)
|
||||
defer xhttp.DrainBody(respBody)
|
||||
if err != nil {
|
||||
pr.Close()
|
||||
return cache, err
|
||||
|
@ -247,7 +246,7 @@ func (client *storageRESTClient) DiskInfo(ctx context.Context) (info DiskInfo, e
|
|||
if err != nil {
|
||||
return info, err
|
||||
}
|
||||
defer http.DrainBody(respBody)
|
||||
defer xhttp.DrainBody(respBody)
|
||||
if err = msgp.Decode(respBody, &info); err != nil {
|
||||
return info, err
|
||||
}
|
||||
|
@ -270,7 +269,7 @@ func (client *storageRESTClient) MakeVolBulk(ctx context.Context, volumes ...str
|
|||
values := make(url.Values)
|
||||
values.Set(storageRESTVolumes, strings.Join(volumes, ","))
|
||||
respBody, err := client.call(ctx, storageRESTMethodMakeVolBulk, values, nil, -1)
|
||||
defer http.DrainBody(respBody)
|
||||
defer xhttp.DrainBody(respBody)
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -279,7 +278,7 @@ func (client *storageRESTClient) MakeVol(ctx context.Context, volume string) (er
|
|||
values := make(url.Values)
|
||||
values.Set(storageRESTVolume, volume)
|
||||
respBody, err := client.call(ctx, storageRESTMethodMakeVol, values, nil, -1)
|
||||
defer http.DrainBody(respBody)
|
||||
defer xhttp.DrainBody(respBody)
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -289,7 +288,7 @@ func (client *storageRESTClient) ListVols(ctx context.Context) (vols []VolInfo,
|
|||
if err != nil {
|
||||
return
|
||||
}
|
||||
defer http.DrainBody(respBody)
|
||||
defer xhttp.DrainBody(respBody)
|
||||
vinfos := VolsInfo(vols)
|
||||
err = msgp.Decode(respBody, &vinfos)
|
||||
return vinfos, err
|
||||
|
@ -303,7 +302,7 @@ func (client *storageRESTClient) StatVol(ctx context.Context, volume string) (vo
|
|||
if err != nil {
|
||||
return
|
||||
}
|
||||
defer http.DrainBody(respBody)
|
||||
defer xhttp.DrainBody(respBody)
|
||||
err = msgp.Decode(respBody, &vol)
|
||||
return vol, err
|
||||
}
|
||||
|
@ -316,7 +315,7 @@ func (client *storageRESTClient) DeleteVol(ctx context.Context, volume string, f
|
|||
values.Set(storageRESTForceDelete, "true")
|
||||
}
|
||||
respBody, err := client.call(ctx, storageRESTMethodDeleteVol, values, nil, -1)
|
||||
defer http.DrainBody(respBody)
|
||||
defer xhttp.DrainBody(respBody)
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -327,7 +326,7 @@ func (client *storageRESTClient) AppendFile(ctx context.Context, volume string,
|
|||
values.Set(storageRESTFilePath, path)
|
||||
reader := bytes.NewReader(buf)
|
||||
respBody, err := client.call(ctx, storageRESTMethodAppendFile, values, reader, -1)
|
||||
defer http.DrainBody(respBody)
|
||||
defer xhttp.DrainBody(respBody)
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -337,11 +336,7 @@ func (client *storageRESTClient) CreateFile(ctx context.Context, volume, path st
|
|||
values.Set(storageRESTFilePath, path)
|
||||
values.Set(storageRESTLength, strconv.Itoa(int(size)))
|
||||
respBody, err := client.call(ctx, storageRESTMethodCreateFile, values, ioutil.NopCloser(reader), size)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = waitForHTTPResponse(respBody)
|
||||
defer http.DrainBody(respBody)
|
||||
defer xhttp.DrainBody(respBody)
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -356,7 +351,7 @@ func (client *storageRESTClient) WriteMetadata(ctx context.Context, volume, path
|
|||
}
|
||||
|
||||
respBody, err := client.call(ctx, storageRESTMethodWriteMetadata, values, &reader, -1)
|
||||
defer http.DrainBody(respBody)
|
||||
defer xhttp.DrainBody(respBody)
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -371,7 +366,7 @@ func (client *storageRESTClient) UpdateMetadata(ctx context.Context, volume, pat
|
|||
}
|
||||
|
||||
respBody, err := client.call(ctx, storageRESTMethodUpdateMetadata, values, &reader, -1)
|
||||
defer http.DrainBody(respBody)
|
||||
defer xhttp.DrainBody(respBody)
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -387,7 +382,7 @@ func (client *storageRESTClient) DeleteVersion(ctx context.Context, volume, path
|
|||
}
|
||||
|
||||
respBody, err := client.call(ctx, storageRESTMethodDeleteVersion, values, &buffer, -1)
|
||||
defer http.DrainBody(respBody)
|
||||
defer xhttp.DrainBody(respBody)
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -397,7 +392,7 @@ func (client *storageRESTClient) WriteAll(ctx context.Context, volume string, pa
|
|||
values.Set(storageRESTVolume, volume)
|
||||
values.Set(storageRESTFilePath, path)
|
||||
respBody, err := client.call(ctx, storageRESTMethodWriteAll, values, bytes.NewBuffer(b), int64(len(b)))
|
||||
defer http.DrainBody(respBody)
|
||||
defer xhttp.DrainBody(respBody)
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -407,7 +402,7 @@ func (client *storageRESTClient) CheckFile(ctx context.Context, volume string, p
|
|||
values.Set(storageRESTVolume, volume)
|
||||
values.Set(storageRESTFilePath, path)
|
||||
respBody, err := client.call(ctx, storageRESTMethodCheckFile, values, nil, -1)
|
||||
defer http.DrainBody(respBody)
|
||||
defer xhttp.DrainBody(respBody)
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -424,7 +419,7 @@ func (client *storageRESTClient) CheckParts(ctx context.Context, volume string,
|
|||
}
|
||||
|
||||
respBody, err := client.call(ctx, storageRESTMethodCheckParts, values, &reader, -1)
|
||||
defer http.DrainBody(respBody)
|
||||
defer xhttp.DrainBody(respBody)
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -442,7 +437,7 @@ func (client *storageRESTClient) RenameData(ctx context.Context, srcVolume, srcP
|
|||
}
|
||||
|
||||
respBody, err := client.call(ctx, storageRESTMethodRenameData, values, &reader, -1)
|
||||
defer http.DrainBody(respBody)
|
||||
defer xhttp.DrainBody(respBody)
|
||||
|
||||
return err
|
||||
}
|
||||
|
@ -473,7 +468,7 @@ func (client *storageRESTClient) ReadVersion(ctx context.Context, volume, path,
|
|||
if err != nil {
|
||||
return fi, err
|
||||
}
|
||||
defer http.DrainBody(respBody)
|
||||
defer xhttp.DrainBody(respBody)
|
||||
|
||||
dec := msgpNewReader(respBody)
|
||||
defer readMsgpReaderPool.Put(dec)
|
||||
|
@ -491,7 +486,7 @@ func (client *storageRESTClient) ReadAll(ctx context.Context, volume string, pat
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer http.DrainBody(respBody)
|
||||
defer xhttp.DrainBody(respBody)
|
||||
return ioutil.ReadAll(respBody)
|
||||
}
|
||||
|
||||
|
@ -527,7 +522,7 @@ func (client *storageRESTClient) ReadFile(ctx context.Context, volume string, pa
|
|||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
defer http.DrainBody(respBody)
|
||||
defer xhttp.DrainBody(respBody)
|
||||
n, err := io.ReadFull(respBody, buf)
|
||||
return int64(n), err
|
||||
}
|
||||
|
@ -542,7 +537,7 @@ func (client *storageRESTClient) ListDir(ctx context.Context, volume, dirPath st
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer http.DrainBody(respBody)
|
||||
defer xhttp.DrainBody(respBody)
|
||||
err = gob.NewDecoder(respBody).Decode(&entries)
|
||||
return entries, err
|
||||
}
|
||||
|
@ -555,7 +550,7 @@ func (client *storageRESTClient) Delete(ctx context.Context, volume string, path
|
|||
values.Set(storageRESTRecursive, strconv.FormatBool(recursive))
|
||||
|
||||
respBody, err := client.call(ctx, storageRESTMethodDeleteFile, values, nil, -1)
|
||||
defer http.DrainBody(respBody)
|
||||
defer xhttp.DrainBody(respBody)
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -579,7 +574,7 @@ func (client *storageRESTClient) DeleteVersions(ctx context.Context, volume stri
|
|||
errs = make([]error, len(versions))
|
||||
|
||||
respBody, err := client.call(ctx, storageRESTMethodDeleteVersions, values, &buffer, -1)
|
||||
defer http.DrainBody(respBody)
|
||||
defer xhttp.DrainBody(respBody)
|
||||
if err != nil {
|
||||
for i := range errs {
|
||||
errs[i] = err
|
||||
|
@ -618,7 +613,7 @@ func (client *storageRESTClient) RenameFile(ctx context.Context, srcVolume, srcP
|
|||
values.Set(storageRESTDstVolume, dstVolume)
|
||||
values.Set(storageRESTDstPath, dstPath)
|
||||
respBody, err := client.call(ctx, storageRESTMethodRenameFile, values, nil, -1)
|
||||
defer http.DrainBody(respBody)
|
||||
defer xhttp.DrainBody(respBody)
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -633,7 +628,7 @@ func (client *storageRESTClient) VerifyFile(ctx context.Context, volume, path st
|
|||
}
|
||||
|
||||
respBody, err := client.call(ctx, storageRESTMethodVerifyFile, values, &reader, -1)
|
||||
defer http.DrainBody(respBody)
|
||||
defer xhttp.DrainBody(respBody)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -651,6 +646,23 @@ func (client *storageRESTClient) VerifyFile(ctx context.Context, volume, path st
|
|||
return toStorageErr(verifyResp.Err)
|
||||
}
|
||||
|
||||
func (client *storageRESTClient) StatInfoFile(ctx context.Context, volume, path string) (stat StatInfo, err error) {
|
||||
values := make(url.Values)
|
||||
values.Set(storageRESTVolume, volume)
|
||||
values.Set(storageRESTFilePath, path)
|
||||
respBody, err := client.call(ctx, storageRESTMethodStatInfoFile, values, nil, -1)
|
||||
if err != nil {
|
||||
return stat, err
|
||||
}
|
||||
defer xhttp.DrainBody(respBody)
|
||||
respReader, err := waitForHTTPResponse(respBody)
|
||||
if err != nil {
|
||||
return stat, err
|
||||
}
|
||||
err = stat.DecodeMsg(msgpNewReader(respReader))
|
||||
return stat, err
|
||||
}
|
||||
|
||||
// Close - marks the client as closed.
|
||||
func (client *storageRESTClient) Close() error {
|
||||
client.restClient.Close()
|
||||
|
|
|
@ -17,7 +17,7 @@
|
|||
package cmd
|
||||
|
||||
const (
|
||||
storageRESTVersion = "v31" // Added RenameData with fileInfo()
|
||||
storageRESTVersion = "v32" // Inline bugfix needs all servers to be updated
|
||||
storageRESTVersionPrefix = SlashSeparator + storageRESTVersion
|
||||
storageRESTPrefix = minioReservedBucketPath + "/storage"
|
||||
)
|
||||
|
@ -51,6 +51,7 @@ const (
|
|||
storageRESTMethodRenameFile = "/renamefile"
|
||||
storageRESTMethodVerifyFile = "/verifyfile"
|
||||
storageRESTMethodWalkDir = "/walkdir"
|
||||
storageRESTMethodStatInfoFile = "/statfile"
|
||||
)
|
||||
|
||||
const (
|
||||
|
|
|
@ -770,23 +770,6 @@ func waitForHTTPResponse(respBody io.Reader) (io.Reader, error) {
|
|||
}
|
||||
}
|
||||
|
||||
// drainCloser can be used for wrapping an http response.
|
||||
// It will drain the body before closing.
|
||||
type drainCloser struct {
|
||||
rc io.ReadCloser
|
||||
}
|
||||
|
||||
// Read forwards the read operation.
|
||||
func (f drainCloser) Read(p []byte) (n int, err error) {
|
||||
return f.rc.Read(p)
|
||||
}
|
||||
|
||||
// Close drains the body and closes the upstream.
|
||||
func (f drainCloser) Close() error {
|
||||
xhttp.DrainBody(f.rc)
|
||||
return nil
|
||||
}
|
||||
|
||||
// httpStreamResponse allows streaming a response, but still send an error.
|
||||
type httpStreamResponse struct {
|
||||
done chan error
|
||||
|
@ -878,7 +861,6 @@ func waitForHTTPStream(respBody io.ReadCloser, w io.Writer) error {
|
|||
case 0:
|
||||
// 0 is unbuffered, copy the rest.
|
||||
_, err := io.Copy(w, respBody)
|
||||
respBody.Close()
|
||||
if err == io.EOF {
|
||||
return nil
|
||||
}
|
||||
|
@ -888,18 +870,7 @@ func waitForHTTPStream(respBody io.ReadCloser, w io.Writer) error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
respBody.Close()
|
||||
return errors.New(string(errorText))
|
||||
case 3:
|
||||
// gob style is already deprecated, we can remove this when
|
||||
// storage API version will be greater or equal to 23.
|
||||
defer respBody.Close()
|
||||
dec := gob.NewDecoder(respBody)
|
||||
var err error
|
||||
if de := dec.Decode(&err); de == nil {
|
||||
return err
|
||||
}
|
||||
return errors.New("rpc error")
|
||||
case 2:
|
||||
// Block of data
|
||||
var tmp [4]byte
|
||||
|
@ -916,7 +887,6 @@ func waitForHTTPStream(respBody io.ReadCloser, w io.Writer) error {
|
|||
case 32:
|
||||
continue
|
||||
default:
|
||||
go xhttp.DrainBody(respBody)
|
||||
return fmt.Errorf("unexpected filler byte: %d", tmp[0])
|
||||
}
|
||||
}
|
||||
|
@ -1023,6 +993,23 @@ func logFatalErrs(err error, endpoint Endpoint, exit bool) {
|
|||
}
|
||||
}
|
||||
|
||||
// StatInfoFile returns file stat info.
|
||||
func (s *storageRESTServer) StatInfoFile(w http.ResponseWriter, r *http.Request) {
|
||||
if !s.IsValid(w, r) {
|
||||
return
|
||||
}
|
||||
vars := mux.Vars(r)
|
||||
volume := vars[storageRESTVolume]
|
||||
filePath := vars[storageRESTFilePath]
|
||||
done := keepHTTPResponseAlive(w)
|
||||
si, err := s.storage.StatInfoFile(r.Context(), volume, filePath)
|
||||
done(err)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
msgp.Encode(w, &si)
|
||||
}
|
||||
|
||||
// registerStorageRPCRouter - register storage rpc router.
|
||||
func registerStorageRESTHandlers(router *mux.Router, endpointServerPools EndpointServerPools) {
|
||||
for _, ep := range endpointServerPools {
|
||||
|
@ -1093,6 +1080,8 @@ func registerStorageRESTHandlers(router *mux.Router, endpointServerPools Endpoin
|
|||
Queries(restQueries(storageRESTVolume, storageRESTFilePath)...)
|
||||
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodWalkDir).HandlerFunc(httpTraceHdrs(server.WalkDirHandler)).
|
||||
Queries(restQueries(storageRESTVolume, storageRESTDirPath, storageRESTRecursive)...)
|
||||
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodStatInfoFile).HandlerFunc(httpTraceHdrs(server.StatInfoFile)).
|
||||
Queries(restQueries(storageRESTVolume, storageRESTFilePath)...)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -32,12 +32,13 @@ func _() {
|
|||
_ = x[storageMetricUpdateMetadata-21]
|
||||
_ = x[storageMetricReadVersion-22]
|
||||
_ = x[storageMetricReadAll-23]
|
||||
_ = x[storageMetricLast-24]
|
||||
_ = x[storageStatInfoFile-24]
|
||||
_ = x[storageMetricLast-25]
|
||||
}
|
||||
|
||||
const _storageMetric_name = "MakeVolBulkMakeVolListVolsStatVolDeleteVolWalkDirListDirReadFileAppendFileCreateFileReadFileStreamRenameFileRenameDataCheckPartsCheckFileDeleteDeleteVersionsVerifyFileWriteAllDeleteVersionWriteMetadataUpdateMetadataReadVersionReadAllLast"
|
||||
const _storageMetric_name = "MakeVolBulkMakeVolListVolsStatVolDeleteVolWalkDirListDirReadFileAppendFileCreateFileReadFileStreamRenameFileRenameDataCheckPartsCheckFileDeleteDeleteVersionsVerifyFileWriteAllDeleteVersionWriteMetadataUpdateMetadataReadVersionReadAllstorageStatInfoFileLast"
|
||||
|
||||
var _storageMetric_index = [...]uint8{0, 11, 18, 26, 33, 42, 49, 56, 64, 74, 84, 98, 108, 118, 128, 137, 143, 157, 167, 175, 188, 201, 215, 226, 233, 237}
|
||||
var _storageMetric_index = [...]uint16{0, 11, 18, 26, 33, 42, 49, 56, 64, 74, 84, 98, 108, 118, 128, 137, 143, 157, 167, 175, 188, 201, 215, 226, 233, 252, 256}
|
||||
|
||||
func (i storageMetric) String() string {
|
||||
if i >= storageMetric(len(_storageMetric_index)-1) {
|
||||
|
|
|
@ -456,7 +456,7 @@ func newInternodeHTTPTransport(tlsConfig *tls.Config, dialTimeout time.Duration)
|
|||
WriteBufferSize: 32 << 10, // 32KiB moving up from 4KiB default
|
||||
ReadBufferSize: 32 << 10, // 32KiB moving up from 4KiB default
|
||||
IdleConnTimeout: 15 * time.Second,
|
||||
ResponseHeaderTimeout: 3 * time.Minute, // Set conservative timeouts for MinIO internode.
|
||||
ResponseHeaderTimeout: 15 * time.Minute, // Set conservative timeouts for MinIO internode.
|
||||
TLSHandshakeTimeout: 15 * time.Second,
|
||||
ExpectContinueTimeout: 15 * time.Second,
|
||||
TLSClientConfig: tlsConfig,
|
||||
|
|
|
@ -57,6 +57,7 @@ const (
|
|||
storageMetricUpdateMetadata
|
||||
storageMetricReadVersion
|
||||
storageMetricReadAll
|
||||
storageStatInfoFile
|
||||
|
||||
// .... add more
|
||||
|
||||
|
@ -619,6 +620,22 @@ func storageTrace(s storageMetric, startTime time.Time, duration time.Duration,
|
|||
}
|
||||
}
|
||||
|
||||
func (p *xlStorageDiskIDCheck) StatInfoFile(ctx context.Context, volume, path string) (stat StatInfo, err error) {
|
||||
defer p.updateStorageMetrics(storageStatInfoFile, volume, path)()
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return StatInfo{}, ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
if err = p.checkDiskStale(); err != nil {
|
||||
return StatInfo{}, err
|
||||
}
|
||||
|
||||
return p.storage.StatInfoFile(ctx, volume, path)
|
||||
}
|
||||
|
||||
// Update storage metrics
|
||||
func (p *xlStorageDiskIDCheck) updateStorageMetrics(s storageMetric, paths ...string) func() {
|
||||
startTime := time.Now()
|
||||
|
|
|
@ -1078,6 +1078,11 @@ func (s *xlStorage) ReadVersion(ctx context.Context, volume, path, versionID str
|
|||
|
||||
if readData {
|
||||
if len(fi.Data) > 0 || fi.Size == 0 {
|
||||
if len(fi.Data) > 0 {
|
||||
if _, ok := fi.Metadata[ReservedMetadataPrefixLower+"inline-data"]; !ok {
|
||||
fi.Metadata[ReservedMetadataPrefixLower+"inline-data"] = "true"
|
||||
}
|
||||
}
|
||||
return fi, nil
|
||||
}
|
||||
|
||||
|
@ -2018,6 +2023,8 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath string, f
|
|||
// Purge the destination path as we are not preserving anything
|
||||
// versioned object was not requested.
|
||||
oldDstDataPath = pathJoin(dstVolumeDir, dstPath, ofi.DataDir)
|
||||
xlMeta.data.remove(nullVersionID)
|
||||
xlMeta.data.remove(ofi.DataDir)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -2206,3 +2213,32 @@ func (s *xlStorage) VerifyFile(ctx context.Context, volume, path string, fi File
|
|||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *xlStorage) StatInfoFile(ctx context.Context, volume, path string) (stat StatInfo, err error) {
|
||||
volumeDir, err := s.getVolDir(volume)
|
||||
if err != nil {
|
||||
return stat, err
|
||||
}
|
||||
|
||||
// Stat a volume entry.
|
||||
if err = Access(volumeDir); err != nil {
|
||||
if osIsNotExist(err) {
|
||||
return stat, errVolumeNotFound
|
||||
} else if isSysErrIO(err) {
|
||||
return stat, errFaultyDisk
|
||||
} else if osIsPermission(err) {
|
||||
return stat, errVolumeAccessDenied
|
||||
}
|
||||
return stat, err
|
||||
}
|
||||
filePath := pathJoin(volumeDir, path)
|
||||
if err := checkPathLength(filePath); err != nil {
|
||||
return stat, err
|
||||
}
|
||||
st, _ := Lstat(filePath)
|
||||
if st == nil {
|
||||
return stat, errPathNotFound
|
||||
}
|
||||
|
||||
return StatInfo{ModTime: st.ModTime(), Size: st.Size()}, nil
|
||||
}
|
||||
|
|
91
docs/debugging/inspect/main.go
Normal file
91
docs/debugging/inspect/main.go
Normal file
|
@ -0,0 +1,91 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"encoding/binary"
|
||||
"encoding/hex"
|
||||
"flag"
|
||||
"fmt"
|
||||
"hash/crc32"
|
||||
"io"
|
||||
"log"
|
||||
"os"
|
||||
"strings"
|
||||
|
||||
"github.com/secure-io/sio-go"
|
||||
)
|
||||
|
||||
var (
|
||||
key = flag.String("key", "", "decryption string")
|
||||
//js = flag.Bool("json", false, "expect json input")
|
||||
)
|
||||
|
||||
func main() {
|
||||
flag.Parse()
|
||||
args := flag.Args()
|
||||
switch len(flag.Args()) {
|
||||
case 0:
|
||||
// Read from stdin, write to stdout.
|
||||
decrypt(*key, os.Stdin, os.Stdout)
|
||||
return
|
||||
case 1:
|
||||
r, err := os.Open(args[0])
|
||||
fatalErr(err)
|
||||
defer r.Close()
|
||||
dstName := strings.TrimSuffix(args[0], ".enc") + ".zip"
|
||||
w, err := os.Create(dstName)
|
||||
fatalErr(err)
|
||||
defer w.Close()
|
||||
if len(*key) == 0 {
|
||||
reader := bufio.NewReader(os.Stdin)
|
||||
fmt.Print("Enter Decryption Key: ")
|
||||
|
||||
text, _ := reader.ReadString('\n')
|
||||
// convert CRLF to LF
|
||||
*key = strings.Replace(text, "\n", "", -1)
|
||||
}
|
||||
decrypt(*key, r, w)
|
||||
fmt.Println("Output decrypted to", dstName)
|
||||
return
|
||||
default:
|
||||
fatalIf(true, "Only 1 file can be decrypted")
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
||||
|
||||
func decrypt(keyHex string, r io.Reader, w io.Writer) {
|
||||
keyHex = strings.TrimSpace(keyHex)
|
||||
fatalIf(len(keyHex) != 72, "Unexpected key length: %d, want 72", len(keyHex))
|
||||
id, err := hex.DecodeString(keyHex[:8])
|
||||
fatalErr(err)
|
||||
key, err := hex.DecodeString(keyHex[8:])
|
||||
fatalErr(err)
|
||||
|
||||
// Verify that CRC is ok.
|
||||
want := binary.LittleEndian.Uint32(id)
|
||||
got := crc32.ChecksumIEEE(key)
|
||||
fatalIf(want != got, "Invalid key checksum, want %x, got %x", want, got)
|
||||
|
||||
stream, err := sio.AES_256_GCM.Stream(key)
|
||||
fatalErr(err)
|
||||
|
||||
// Zero nonce, we only use each key once, and 32 bytes is plenty.
|
||||
nonce := make([]byte, stream.NonceSize())
|
||||
encr := stream.DecryptReader(r, nonce, nil)
|
||||
_, err = io.Copy(w, encr)
|
||||
fatalErr(err)
|
||||
}
|
||||
|
||||
func fatalErr(err error) {
|
||||
if err == nil {
|
||||
return
|
||||
}
|
||||
log.Fatalln(err)
|
||||
}
|
||||
|
||||
func fatalIf(b bool, msg string, v ...interface{}) {
|
||||
if !b {
|
||||
return
|
||||
}
|
||||
log.Fatalf(msg, v...)
|
||||
}
|
3
go.mod
3
go.mod
|
@ -47,7 +47,7 @@ require (
|
|||
github.com/minio/cli v1.22.0
|
||||
github.com/minio/highwayhash v1.0.2
|
||||
github.com/minio/md5-simd v1.1.1 // indirect
|
||||
github.com/minio/minio-go/v7 v7.0.11-0.20210302210017-6ae69c73ce78
|
||||
github.com/minio/minio-go/v7 v7.0.14
|
||||
github.com/minio/selfupdate v0.3.1
|
||||
github.com/minio/sha256-simd v1.0.0
|
||||
github.com/minio/simdjson-go v0.2.1
|
||||
|
@ -77,7 +77,6 @@ require (
|
|||
github.com/tidwall/gjson v1.6.8
|
||||
github.com/tidwall/sjson v1.0.4
|
||||
github.com/tinylib/msgp v1.1.3
|
||||
github.com/ttacon/chalk v0.0.0-20160626202418-22c06c80ed31 // indirect
|
||||
github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a
|
||||
github.com/willf/bitset v1.1.11 // indirect
|
||||
github.com/willf/bloom v2.0.3+incompatible
|
||||
|
|
13
go.sum
13
go.sum
|
@ -365,8 +365,6 @@ github.com/lib/pq v1.9.0/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
|
|||
github.com/lightstep/lightstep-tracer-common/golang/gogo v0.0.0-20190605223551-bc2310a04743/go.mod h1:qklhhLq1aX+mtWk9cPHPzaBjWImj5ULL6C7HFJtXQMM=
|
||||
github.com/lightstep/lightstep-tracer-go v0.18.1/go.mod h1:jlF1pusYV4pidLvZ+XD0UBX0ZE6WURAspgAczcDHrL4=
|
||||
github.com/lyft/protoc-gen-validate v0.0.13/go.mod h1:XbGvPuh87YZc5TdIa2/I4pLk0QoUACkjt2znoq26NVQ=
|
||||
github.com/magefile/mage v1.10.0 h1:3HiXzCUY12kh9bIuyXShaVe529fJfyqoVM42o/uom2g=
|
||||
github.com/magefile/mage v1.10.0/go.mod h1:z5UZb/iS3GoOSn0JgWuiw7dxlurVYTu+/jHXqQg881A=
|
||||
github.com/mailru/easyjson v0.7.6 h1:8yTIVnZgCoiM1TgqoeTl+LfU5Jg6/xL3QhGQnimLYnA=
|
||||
github.com/mailru/easyjson v0.7.6/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
|
||||
github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU=
|
||||
|
@ -398,8 +396,8 @@ github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLT
|
|||
github.com/minio/md5-simd v1.1.0/go.mod h1:XpBqgZULrMYD3R+M28PcmP0CkI7PEMzB3U77ZrKZ0Gw=
|
||||
github.com/minio/md5-simd v1.1.1 h1:9ojcLbuZ4gXbB2sX53MKn8JUZ0sB/2wfwsEcRw+I08U=
|
||||
github.com/minio/md5-simd v1.1.1/go.mod h1:XpBqgZULrMYD3R+M28PcmP0CkI7PEMzB3U77ZrKZ0Gw=
|
||||
github.com/minio/minio-go/v7 v7.0.11-0.20210302210017-6ae69c73ce78 h1:v7OMbUnWkyRlO2MZ5AuYioELhwXF/BgZEznrQ1drBEM=
|
||||
github.com/minio/minio-go/v7 v7.0.11-0.20210302210017-6ae69c73ce78/go.mod h1:mTh2uJuAbEqdhMVl6CMIIZLUeiMiWtJR4JB8/5g2skw=
|
||||
github.com/minio/minio-go/v7 v7.0.14 h1:T7cw8P586gVwEEd0y21kTYtloD576XZgP62N8pE130s=
|
||||
github.com/minio/minio-go/v7 v7.0.14/go.mod h1:S23iSP5/gbMwtxeY5FM71R+TkAYyzEdoNEDDwpt8yWs=
|
||||
github.com/minio/selfupdate v0.3.1 h1:BWEFSNnrZVMUWXbXIgLDNDjbejkmpAmZvy/nCz1HlEs=
|
||||
github.com/minio/selfupdate v0.3.1/go.mod h1:b8ThJzzH7u2MkF6PcIra7KaXO9Khf6alWPvMSyTDCFM=
|
||||
github.com/minio/sha256-simd v0.1.1/go.mod h1:B5e1o+1/KgNmWrSQK08Y6Z1Vb5pwIktudl0J58iy0KM=
|
||||
|
@ -556,8 +554,8 @@ github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeV
|
|||
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
|
||||
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
|
||||
github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88=
|
||||
github.com/sirupsen/logrus v1.8.0 h1:nfhvjKcUMhBMVqbKHJlk5RPrrfYr/NMo3692g0dwfWU=
|
||||
github.com/sirupsen/logrus v1.8.0/go.mod h1:4GuYW9TZmE769R5STWrRakJc4UqQ3+QQ95fyz7ENv1A=
|
||||
github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE=
|
||||
github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
|
||||
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
|
||||
github.com/smartystreets/assertions v1.1.1 h1:T/YLemO5Yp7KPzS+lVtu+WsHn8yoSwTfItdAd1r3cck=
|
||||
github.com/smartystreets/assertions v1.1.1/go.mod h1:tcbTF8ujkAEcZ8TElKY+i30BzYlVhC/LOxJk7iOWnoo=
|
||||
|
@ -598,8 +596,6 @@ github.com/tinylib/msgp v1.1.3 h1:3giwAkmtaEDLSV0MdO1lDLuPgklgPzmk8H9+So2BVfA=
|
|||
github.com/tinylib/msgp v1.1.3/go.mod h1:+d+yLhGm8mzTaHzB+wgMYrodPfmZrzkirds8fDWklFE=
|
||||
github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8 h1:ndzgwNDnKIqyCvHTXaCqh9KlOWKvBry6nuXMJmonVsE=
|
||||
github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
|
||||
github.com/ttacon/chalk v0.0.0-20160626202418-22c06c80ed31 h1:OXcKh35JaYsGMRzpvFkLv/MEyPuL49CThT1pZ8aSml4=
|
||||
github.com/ttacon/chalk v0.0.0-20160626202418-22c06c80ed31/go.mod h1:onvgF043R+lC5RZ8IT9rBXDaEDnpnw/Cl+HFiw+v/7Q=
|
||||
github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM=
|
||||
github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA=
|
||||
github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
|
||||
|
@ -657,6 +653,7 @@ golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPh
|
|||
golang.org/x/crypto v0.0.0-20200709230013-948cd5f35899/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
|
||||
golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
|
||||
golang.org/x/crypto v0.0.0-20201112155050-0c6587e931a9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
|
||||
golang.org/x/crypto v0.0.0-20201216223049-8b5274cf687f/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I=
|
||||
golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83 h1:/ZScEX8SfEmUGRHs0gxpqteO5nfNW6axyZbBdw9A12g=
|
||||
golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I=
|
||||
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
|
||||
|
|
|
@ -53,6 +53,8 @@ const (
|
|||
HealthInfoAdminAction = "admin:OBDInfo"
|
||||
// BandwidthMonitorAction - allow monitoring bandwidth usage
|
||||
BandwidthMonitorAction = "admin:BandwidthMonitor"
|
||||
// InspectDataAction - allows downloading raw files from backend
|
||||
InspectDataAction = "admin:InspectData"
|
||||
|
||||
// ServerUpdateAdminAction - allow MinIO binary update
|
||||
ServerUpdateAdminAction = "admin:ServerUpdate"
|
||||
|
|
|
@ -526,6 +526,17 @@ OuterLoop:
|
|||
}
|
||||
|
||||
outputQueue[len(outputQueue)-1] = outputRecord
|
||||
if s3Select.statement.LimitReached() {
|
||||
if !sendRecord() {
|
||||
break
|
||||
}
|
||||
if err = writer.Finish(s3Select.getProgress()); err != nil {
|
||||
// FIXME: log this error.
|
||||
err = nil
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
if len(outputQueue) < cap(outputQueue) {
|
||||
continue
|
||||
}
|
||||
|
|
|
@ -403,6 +403,74 @@ func TestJSONQueries(t *testing.T) {
|
|||
query: `SELECT 3.0 / 2, 5 / 2.0 FROM S3Object LIMIT 1`,
|
||||
wantResult: `{"_1":1.5,"_2":2.5}`,
|
||||
},
|
||||
{
|
||||
name: "limit-1",
|
||||
query: `SELECT * FROM S3Object[*].elements[*] LIMIT 1`,
|
||||
requestXML: []byte(`
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<SelectObjectContentRequest>
|
||||
<Expression>select * from s3object[*].elements[*] s where s.element_type = '__elem__merfu'</Expression>
|
||||
<ExpressionType>SQL</ExpressionType>
|
||||
<InputSerialization>
|
||||
<CompressionType>NONE</CompressionType>
|
||||
<JSON>
|
||||
<Type>DOCUMENT</Type>
|
||||
</JSON>
|
||||
</InputSerialization>
|
||||
<OutputSerialization>
|
||||
<JSON>
|
||||
</JSON>
|
||||
</OutputSerialization>
|
||||
<RequestProgress>
|
||||
<Enabled>FALSE</Enabled>
|
||||
</RequestProgress>
|
||||
</SelectObjectContentRequest>`),
|
||||
wantResult: `{"element_type":"__elem__merfu","element_id":"d868aefe-ef9a-4be2-b9b2-c9fd89cc43eb","attributes":{"__attr__image_dpi":300,"__attr__image_size":[2550,3299],"__attr__image_index":2,"__attr__image_format":"JPEG","__attr__file_extension":"jpg","__attr__data":null}}`,
|
||||
withJSON: `
|
||||
{
|
||||
"name": "small_pdf1.pdf",
|
||||
"lume_id": "9507193e-572d-4f95-bcf1-e9226d96be65",
|
||||
"elements": [
|
||||
{
|
||||
"element_type": "__elem__image",
|
||||
"element_id": "859d09c4-7cf1-4a37-9674-3a7de8b56abc",
|
||||
"attributes": {
|
||||
"__attr__image_dpi": 300,
|
||||
"__attr__image_size": [
|
||||
2550,
|
||||
3299
|
||||
],
|
||||
"__attr__image_index": 1,
|
||||
"__attr__image_format": "JPEG",
|
||||
"__attr__file_extension": "jpg",
|
||||
"__attr__data": null
|
||||
}
|
||||
},
|
||||
{
|
||||
"element_type": "__elem__merfu",
|
||||
"element_id": "d868aefe-ef9a-4be2-b9b2-c9fd89cc43eb",
|
||||
"attributes": {
|
||||
"__attr__image_dpi": 300,
|
||||
"__attr__image_size": [
|
||||
2550,
|
||||
3299
|
||||
],
|
||||
"__attr__image_index": 2,
|
||||
"__attr__image_format": "JPEG",
|
||||
"__attr__file_extension": "jpg",
|
||||
"__attr__data": null
|
||||
}
|
||||
}
|
||||
],
|
||||
"data": "asdascasdc1234e123erdasdas"
|
||||
}`,
|
||||
},
|
||||
{
|
||||
name: "limit-2",
|
||||
query: `select * from s3object[*].person[*] limit 1`,
|
||||
wantResult: `{"Id":1,"Name":"Anshu","Address":"Templestowe","Car":"Jeep"}`,
|
||||
withJSON: `{ "person": [ { "Id": 1, "Name": "Anshu", "Address": "Templestowe", "Car": "Jeep" }, { "Id": 2, "Name": "Ben Mostafa", "Address": "Las Vegas", "Car": "Mustang" }, { "Id": 3, "Name": "Rohan Wood", "Address": "Wooddon", "Car": "VW" } ] }`,
|
||||
},
|
||||
}
|
||||
|
||||
defRequest := `<?xml version="1.0" encoding="UTF-8"?>
|
||||
|
|
|
@ -331,6 +331,7 @@ func TestFromClauseJSONPath(t *testing.T) {
|
|||
"select * from s3object[*].books[*].name as s",
|
||||
"select * from s3object s where name > 2",
|
||||
"select * from s3object[*].name as s where name > 2",
|
||||
"select * from s3object[*].books[*] limit 1",
|
||||
}
|
||||
for i, tc := range cases {
|
||||
err := p.ParseString(tc, &s)
|
||||
|
|
|
@ -301,9 +301,8 @@ func (e *SelectStatement) Eval(input, output Record) (Record, error) {
|
|||
// .. WHERE ..`
|
||||
|
||||
// Update count of records output.
|
||||
if e.limitValue > -1 {
|
||||
e.outputCount++
|
||||
}
|
||||
|
||||
return input.Clone(output), nil
|
||||
}
|
||||
|
||||
|
@ -327,9 +326,7 @@ func (e *SelectStatement) Eval(input, output Record) (Record, error) {
|
|||
}
|
||||
|
||||
// Update count of records output.
|
||||
if e.limitValue > -1 {
|
||||
e.outputCount++
|
||||
}
|
||||
|
||||
return output, nil
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue