minio/cmd/gateway/oss/gateway-oss.go
2018-03-26 09:11:39 -07:00

991 lines
31 KiB
Go
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

/*
* Minio Cloud Storage, (C) 2017, 2018 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package oss
import (
"context"
"encoding/xml"
"fmt"
"io"
"io/ioutil"
"net/http"
"strconv"
"strings"
"github.com/aliyun/aliyun-oss-go-sdk/oss"
"github.com/dustin/go-humanize"
"github.com/minio/cli"
"github.com/minio/minio-go/pkg/policy"
minio "github.com/minio/minio/cmd"
"github.com/minio/minio/pkg/auth"
"github.com/minio/minio/pkg/errors"
"github.com/minio/minio/pkg/hash"
)
const (
ossS3MinPartSize = 5 * humanize.MiByte
ossMaxParts = 1000
ossMaxKeys = 1000
ossBackend = "oss"
)
func init() {
const ossGatewayTemplate = `NAME:
{{.HelpName}} - {{.Usage}}
USAGE:
{{.HelpName}} {{if .VisibleFlags}}[FLAGS]{{end}} [ENDPOINT]
{{if .VisibleFlags}}
FLAGS:
{{range .VisibleFlags}}{{.}}
{{end}}{{end}}
ENDPOINT:
OSS server endpoint. Default ENDPOINT is https://oss.aliyuncs.com
ENVIRONMENT VARIABLES:
ACCESS:
MINIO_ACCESS_KEY: Username or access key of OSS storage.
MINIO_SECRET_KEY: Password or secret key of OSS storage.
BROWSER:
MINIO_BROWSER: To disable web browser access, set this value to "off".
UPDATE:
MINIO_UPDATE: To turn off in-place upgrades, set this value to "off".
DOMAIN:
MINIO_DOMAIN: To enable virtual-host-style requests. Set this value to Minio host domain name.
EXAMPLES:
1. Start minio gateway server for Aliyun OSS backend.
$ export MINIO_ACCESS_KEY=accesskey
$ export MINIO_SECRET_KEY=secretkey
$ {{.HelpName}}
2. Start minio gateway server for Aliyun OSS backend on custom endpoint.
$ export MINIO_ACCESS_KEY=Q3AM3UQ867SPQQA43P2F
$ export MINIO_SECRET_KEY=zuf+tfteSlswRu7BJ86wekitnifILbZam1KYY3TG
$ {{.HelpName}} https://oss.example.com
`
minio.RegisterGatewayCommand(cli.Command{
Name: "oss",
Usage: "Alibaba Cloud (Aliyun) Object Storage Service (OSS).",
Action: ossGatewayMain,
CustomHelpTemplate: ossGatewayTemplate,
HideHelpCommand: true,
})
}
// Handler for 'minio gateway oss' command line.
func ossGatewayMain(ctx *cli.Context) {
if ctx.Args().First() == "help" {
cli.ShowCommandHelpAndExit(ctx, ossBackend, 1)
}
// Validate gateway arguments.
host := ctx.Args().First()
minio.FatalIf(minio.ValidateGatewayArguments(ctx.GlobalString("address"), host), "Invalid argument")
minio.StartGateway(ctx, &OSS{host})
}
// OSS implements Gateway.
type OSS struct {
host string
}
// Name implements Gateway interface.
func (g *OSS) Name() string {
return ossBackend
}
// NewGatewayLayer implements Gateway interface and returns OSS ObjectLayer.
func (g *OSS) NewGatewayLayer(creds auth.Credentials) (minio.ObjectLayer, error) {
var err error
// Regions and endpoints
// https://www.alibabacloud.com/help/doc-detail/31837.htm
if g.host == "" {
g.host = "https://oss.aliyuncs.com"
}
// Initialize oss client object.
client, err := oss.New(g.host, creds.AccessKey, creds.SecretKey)
if err != nil {
return nil, err
}
return &ossObjects{
Client: client,
}, nil
}
// Production - oss is not production ready yet.
func (g *OSS) Production() bool {
return false
}
// appendS3MetaToOSSOptions converts metadata meant for S3 PUT/COPY
// object into oss.Option.
//
// S3 user-metadata is translated to OSS metadata by removing the
// `X-Amz-Meta-` prefix and converted into `X-Oss-Meta-`.
//
// Header names are canonicalized as in http.Header.
func appendS3MetaToOSSOptions(opts []oss.Option, s3Metadata map[string]string) ([]oss.Option, error) {
if opts == nil {
opts = make([]oss.Option, 0, len(s3Metadata))
}
for k, v := range s3Metadata {
k = http.CanonicalHeaderKey(k)
switch {
case strings.HasPrefix(k, "X-Amz-Meta-"):
metaKey := k[len("X-Amz-Meta-"):]
// NOTE(timonwong): OSS won't allow headers with underscore(_).
if strings.Contains(metaKey, "_") {
return nil, errors.Trace(minio.UnsupportedMetadata{})
}
opts = append(opts, oss.Meta(metaKey, v))
case k == "X-Amz-Acl":
// Valid values: public-read, private, and public-read-write
opts = append(opts, oss.ObjectACL(oss.ACLType(v)))
case k == "X-Amz-Server-Side-Encryption":
opts = append(opts, oss.ServerSideEncryption(v))
case k == "X-Amz-Copy-Source-If-Match":
opts = append(opts, oss.CopySourceIfMatch(v))
case k == "X-Amz-Copy-Source-If-None-Match":
opts = append(opts, oss.CopySourceIfNoneMatch(v))
case k == "X-Amz-Copy-Source-If-Unmodified-Since":
if v, err := http.ParseTime(v); err == nil {
opts = append(opts, oss.CopySourceIfUnmodifiedSince(v))
}
case k == "X-Amz-Copy-Source-If-Modified-Since":
if v, err := http.ParseTime(v); err == nil {
opts = append(opts, oss.CopySourceIfModifiedSince(v))
}
case k == "Accept-Encoding":
opts = append(opts, oss.AcceptEncoding(v))
case k == "Cache-Control":
opts = append(opts, oss.CacheControl(v))
case k == "Content-Disposition":
opts = append(opts, oss.ContentDisposition(v))
case k == "Content-Encoding":
opts = append(opts, oss.ContentEncoding(v))
case k == "Content-Length":
if v, err := strconv.ParseInt(v, 10, 64); err == nil {
opts = append(opts, oss.ContentLength(v))
}
case k == "Content-MD5":
opts = append(opts, oss.ContentMD5(v))
case k == "Content-Type":
opts = append(opts, oss.ContentType(v))
case k == "Expires":
if v, err := http.ParseTime(v); err == nil {
opts = append(opts, oss.Expires(v))
}
}
}
return opts, nil
}
// ossMetaToS3Meta converts OSS metadata to S3 metadata.
// It is the reverse of appendS3MetaToOSSOptions.
func ossHeaderToS3Meta(header http.Header) map[string]string {
// Decoding technique for each key is used here is as follows
// Each '_' is converted to '-'
// Each '__' is converted to '_'
// With this basic assumption here are some of the expected
// translations for these keys.
// i: 'x_s3cmd__attrs' -> o: 'x-s3cmd_attrs' (mixed)
// i: 'x____test____value' -> o: 'x__test__value' (double '_')
decodeKey := func(key string) string {
tokens := strings.Split(key, "__")
for i := range tokens {
tokens[i] = strings.Replace(tokens[i], "_", "-", -1)
}
return strings.Join(tokens, "_")
}
s3Metadata := make(map[string]string)
for k := range header {
k = http.CanonicalHeaderKey(k)
switch {
case strings.HasPrefix(k, oss.HTTPHeaderOssMetaPrefix):
// Add amazon s3 meta prefix
metaKey := k[len(oss.HTTPHeaderOssMetaPrefix):]
metaKey = "X-Amz-Meta-" + decodeKey(metaKey)
metaKey = http.CanonicalHeaderKey(metaKey)
s3Metadata[metaKey] = header.Get(k)
case k == "Cache-Control":
fallthrough
case k == "Content-Encoding":
fallthrough
case k == "Content-Disposition":
fallthrough
case k == "Content-Length":
fallthrough
case k == "Content-MD5":
fallthrough
case k == "Content-Type":
s3Metadata[k] = header.Get(k)
}
}
return s3Metadata
}
// ossToObjectError converts OSS errors to minio object layer errors.
func ossToObjectError(err error, params ...string) error {
if err == nil {
return nil
}
e, ok := err.(*errors.Error)
if !ok {
// Code should be fixed if this function is called without doing errors.Trace()
// Else handling different situations in this function makes this function complicated.
minio.ErrorIf(err, "Expected type *Error")
return err
}
err = e.Cause
bucket := ""
object := ""
uploadID := ""
switch len(params) {
case 3:
uploadID = params[2]
fallthrough
case 2:
object = params[1]
fallthrough
case 1:
bucket = params[0]
}
ossErr, ok := err.(oss.ServiceError)
if !ok {
// We don't interpret non OSS errors. As oss errors will
// have StatusCode to help to convert to object errors.
return e
}
switch ossErr.Code {
case "BucketAlreadyExists":
err = minio.BucketAlreadyOwnedByYou{Bucket: bucket}
case "BucketNotEmpty":
err = minio.BucketNotEmpty{Bucket: bucket}
case "InvalidBucketName":
err = minio.BucketNameInvalid{Bucket: bucket}
case "NoSuchBucket":
err = minio.BucketNotFound{Bucket: bucket}
case "NoSuchKey":
if object != "" {
err = minio.ObjectNotFound{Bucket: bucket, Object: object}
} else {
err = minio.BucketNotFound{Bucket: bucket}
}
case "InvalidObjectName":
err = minio.ObjectNameInvalid{Bucket: bucket, Object: object}
case "AccessDenied":
err = minio.PrefixAccessDenied{Bucket: bucket, Object: object}
case "NoSuchUpload":
err = minio.InvalidUploadID{UploadID: uploadID}
case "EntityTooSmall":
err = minio.PartTooSmall{}
case "SignatureDoesNotMatch":
err = minio.SignatureDoesNotMatch{}
case "InvalidPart":
err = minio.InvalidPart{}
}
e.Cause = err
return e
}
// ossObjects implements gateway for Aliyun Object Storage Service.
type ossObjects struct {
minio.GatewayUnsupported
Client *oss.Client
}
// Shutdown saves any gateway metadata to disk
// if necessary and reload upon next restart.
func (l *ossObjects) Shutdown(ctx context.Context) error {
return nil
}
// StorageInfo is not relevant to OSS backend.
func (l *ossObjects) StorageInfo(ctx context.Context) (si minio.StorageInfo) {
return
}
// ossIsValidBucketName verifies whether a bucket name is valid.
func ossIsValidBucketName(bucket string) bool {
// dot is not allowed in bucket name
if strings.Contains(bucket, ".") {
return false
}
if !minio.IsValidBucketName(bucket) {
return false
}
return true
}
// MakeBucketWithLocation creates a new container on OSS backend.
func (l *ossObjects) MakeBucketWithLocation(ctx context.Context, bucket, location string) error {
if !ossIsValidBucketName(bucket) {
return errors.Trace(minio.BucketNameInvalid{Bucket: bucket})
}
err := l.Client.CreateBucket(bucket)
return ossToObjectError(errors.Trace(err), bucket)
}
// ossGeBucketInfo gets bucket metadata.
func ossGeBucketInfo(client *oss.Client, bucket string) (bi minio.BucketInfo, err error) {
if !ossIsValidBucketName(bucket) {
return bi, errors.Trace(minio.BucketNameInvalid{Bucket: bucket})
}
bgir, err := client.GetBucketInfo(bucket)
if err != nil {
return bi, ossToObjectError(errors.Trace(err), bucket)
}
return minio.BucketInfo{
Name: bgir.BucketInfo.Name,
Created: bgir.BucketInfo.CreationDate,
}, nil
}
// GetBucketInfo gets bucket metadata.
func (l *ossObjects) GetBucketInfo(ctx context.Context, bucket string) (bi minio.BucketInfo, err error) {
return ossGeBucketInfo(l.Client, bucket)
}
// ListBuckets lists all OSS buckets.
func (l *ossObjects) ListBuckets(ctx context.Context) (buckets []minio.BucketInfo, err error) {
marker := oss.Marker("")
for {
lbr, err := l.Client.ListBuckets(marker)
if err != nil {
return nil, ossToObjectError(errors.Trace(err))
}
for _, bi := range lbr.Buckets {
buckets = append(buckets, minio.BucketInfo{
Name: bi.Name,
Created: bi.CreationDate,
})
}
marker = oss.Marker(lbr.NextMarker)
if !lbr.IsTruncated {
break
}
}
return buckets, nil
}
// DeleteBucket deletes a bucket on OSS.
func (l *ossObjects) DeleteBucket(ctx context.Context, bucket string) error {
err := l.Client.DeleteBucket(bucket)
if err != nil {
return ossToObjectError(errors.Trace(err), bucket)
}
return nil
}
// fromOSSClientObjectProperties converts oss ObjectProperties to ObjectInfo.
func fromOSSClientObjectProperties(bucket string, o oss.ObjectProperties) minio.ObjectInfo {
// NOTE(timonwong): No Content-Type and user defined metadata.
// https://www.alibabacloud.com/help/doc-detail/31965.htm
return minio.ObjectInfo{
Bucket: bucket,
Name: o.Key,
ModTime: o.LastModified,
Size: o.Size,
ETag: minio.ToS3ETag(o.ETag),
}
}
// fromOSSClientListObjectsResult converts oss ListBucketResult to ListObjectsInfo.
func fromOSSClientListObjectsResult(bucket string, lor oss.ListObjectsResult) minio.ListObjectsInfo {
objects := make([]minio.ObjectInfo, len(lor.Objects))
for i, oi := range lor.Objects {
objects[i] = fromOSSClientObjectProperties(bucket, oi)
}
prefixes := make([]string, len(lor.CommonPrefixes))
copy(prefixes, lor.CommonPrefixes)
return minio.ListObjectsInfo{
IsTruncated: lor.IsTruncated,
NextMarker: lor.NextMarker,
Objects: objects,
Prefixes: prefixes,
}
}
// ossListObjects lists all blobs in OSS bucket filtered by prefix.
func ossListObjects(client *oss.Client, bucket, prefix, marker, delimiter string, maxKeys int) (loi minio.ListObjectsInfo, err error) {
buck, err := client.Bucket(bucket)
if err != nil {
return loi, ossToObjectError(errors.Trace(err), bucket)
}
// maxKeys should default to 1000 or less.
if maxKeys == 0 || maxKeys > ossMaxKeys {
maxKeys = ossMaxKeys
}
lor, err := buck.ListObjects(oss.Prefix(prefix), oss.Marker(marker), oss.Delimiter(delimiter), oss.MaxKeys(maxKeys))
if err != nil {
return loi, ossToObjectError(errors.Trace(err), bucket)
}
return fromOSSClientListObjectsResult(bucket, lor), nil
}
// ossListObjectsV2 lists all blobs in OSS bucket filtered by prefix.
func ossListObjectsV2(client *oss.Client, bucket, prefix, continuationToken, delimiter string, maxKeys int,
fetchOwner bool, startAfter string) (loi minio.ListObjectsV2Info, err error) {
// fetchOwner and startAfter are not supported and unused.
marker := continuationToken
resultV1, err := ossListObjects(client, bucket, prefix, marker, delimiter, maxKeys)
if err != nil {
return loi, err
}
return minio.ListObjectsV2Info{
Objects: resultV1.Objects,
Prefixes: resultV1.Prefixes,
ContinuationToken: continuationToken,
NextContinuationToken: resultV1.NextMarker,
IsTruncated: resultV1.IsTruncated,
}, nil
}
// ListObjects lists all blobs in OSS bucket filtered by prefix.
func (l *ossObjects) ListObjects(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (loi minio.ListObjectsInfo, err error) {
return ossListObjects(l.Client, bucket, prefix, marker, delimiter, maxKeys)
}
// ListObjectsV2 lists all blobs in OSS bucket filtered by prefix
func (l *ossObjects) ListObjectsV2(ctx context.Context, bucket, prefix, continuationToken, delimiter string, maxKeys int,
fetchOwner bool, startAfter string) (loi minio.ListObjectsV2Info, err error) {
return ossListObjectsV2(l.Client, bucket, prefix, continuationToken, delimiter, maxKeys, fetchOwner, startAfter)
}
// ossGetObject reads an object on OSS. Supports additional
// parameters like offset and length which are synonymous with
// HTTP Range requests.
//
// startOffset indicates the starting read location of the object.
// length indicates the total length of the object.
func ossGetObject(client *oss.Client, bucket, key string, startOffset, length int64, writer io.Writer, etag string) error {
if length < 0 && length != -1 {
return ossToObjectError(errors.Trace(fmt.Errorf("Invalid argument")), bucket, key)
}
bkt, err := client.Bucket(bucket)
if err != nil {
return ossToObjectError(errors.Trace(err), bucket, key)
}
var opts []oss.Option
if startOffset >= 0 && length >= 0 {
opts = append(opts, oss.Range(startOffset, startOffset+length-1))
}
object, err := bkt.GetObject(key, opts...)
if err != nil {
return ossToObjectError(errors.Trace(err), bucket, key)
}
defer object.Close()
if _, err := io.Copy(writer, object); err != nil {
return ossToObjectError(errors.Trace(err), bucket, key)
}
return nil
}
// GetObject reads an object on OSS. Supports additional
// parameters like offset and length which are synonymous with
// HTTP Range requests.
//
// startOffset indicates the starting read location of the object.
// length indicates the total length of the object.
func (l *ossObjects) GetObject(ctx context.Context, bucket, key string, startOffset, length int64, writer io.Writer, etag string) error {
return ossGetObject(l.Client, bucket, key, startOffset, length, writer, etag)
}
func translatePlainError(err error) error {
errString := err.Error()
switch errString {
case "oss: service returned without a response body (404 Not Found)":
return oss.ServiceError{Code: "NoSuchKey"}
case "oss: service returned without a response body (400 Bad Request)":
return oss.ServiceError{Code: "AccessDenied"}
}
return err
}
// ossGetObjectInfo reads object info and replies back ObjectInfo.
func ossGetObjectInfo(client *oss.Client, bucket, object string) (objInfo minio.ObjectInfo, err error) {
bkt, err := client.Bucket(bucket)
if err != nil {
return objInfo, ossToObjectError(errors.Trace(err), bucket, object)
}
header, err := bkt.GetObjectDetailedMeta(object)
if err != nil {
return objInfo, ossToObjectError(errors.Trace(translatePlainError(err)), bucket, object)
}
// Build S3 metadata from OSS metadata
userDefined := ossHeaderToS3Meta(header)
modTime, _ := http.ParseTime(header.Get("Last-Modified"))
size, _ := strconv.ParseInt(header.Get("Content-Length"), 10, 64)
return minio.ObjectInfo{
Bucket: bucket,
Name: object,
ModTime: modTime,
Size: size,
ETag: minio.ToS3ETag(header.Get("ETag")),
UserDefined: userDefined,
ContentType: header.Get("Content-Type"),
ContentEncoding: header.Get("Content-Encoding"),
}, nil
}
// GetObjectInfo reads object info and replies back ObjectInfo.
func (l *ossObjects) GetObjectInfo(ctx context.Context, bucket, object string) (objInfo minio.ObjectInfo, err error) {
return ossGetObjectInfo(l.Client, bucket, object)
}
// ossPutObject creates a new object with the incoming data.
func ossPutObject(client *oss.Client, bucket, object string, data *hash.Reader, metadata map[string]string) (objInfo minio.ObjectInfo, err error) {
bkt, err := client.Bucket(bucket)
if err != nil {
return objInfo, ossToObjectError(errors.Trace(err), bucket, object)
}
// Build OSS metadata
opts, err := appendS3MetaToOSSOptions(nil, metadata)
if err != nil {
return objInfo, ossToObjectError(err, bucket, object)
}
err = bkt.PutObject(object, data, opts...)
if err != nil {
return objInfo, ossToObjectError(errors.Trace(err), bucket, object)
}
return ossGetObjectInfo(client, bucket, object)
}
// PutObject creates a new object with the incoming data.
func (l *ossObjects) PutObject(ctx context.Context, bucket, object string, data *hash.Reader, metadata map[string]string) (objInfo minio.ObjectInfo, err error) {
return ossPutObject(l.Client, bucket, object, data, metadata)
}
// CopyObject copies an object from source bucket to a destination bucket.
func (l *ossObjects) CopyObject(ctx context.Context, srcBucket, srcObject, dstBucket, dstObject string, srcInfo minio.ObjectInfo) (objInfo minio.ObjectInfo, err error) {
bkt, err := l.Client.Bucket(srcBucket)
if err != nil {
return objInfo, ossToObjectError(errors.Trace(err), srcBucket, srcObject)
}
opts := make([]oss.Option, 0, len(srcInfo.UserDefined)+1)
// Set this header such that following CopyObject() always sets the right metadata on the destination.
// metadata input is already a trickled down value from interpreting x-oss-metadata-directive at
// handler layer. So what we have right now is supposed to be applied on the destination object anyways.
// So preserve it by adding "REPLACE" directive to save all the metadata set by CopyObject API.
opts = append(opts, oss.MetadataDirective(oss.MetaReplace))
// Build OSS metadata
opts, err = appendS3MetaToOSSOptions(opts, srcInfo.UserDefined)
if err != nil {
return objInfo, ossToObjectError(err, srcBucket, srcObject)
}
if _, err = bkt.CopyObjectTo(dstBucket, dstObject, srcObject, opts...); err != nil {
return objInfo, ossToObjectError(errors.Trace(err), srcBucket, srcObject)
}
return l.GetObjectInfo(ctx, dstBucket, dstObject)
}
// DeleteObject deletes a blob in bucket.
func (l *ossObjects) DeleteObject(ctx context.Context, bucket, object string) error {
bkt, err := l.Client.Bucket(bucket)
if err != nil {
return ossToObjectError(errors.Trace(err), bucket, object)
}
err = bkt.DeleteObject(object)
if err != nil {
return ossToObjectError(errors.Trace(err), bucket, object)
}
return nil
}
// fromOSSClientListMultipartsInfo converts oss ListMultipartUploadResult to ListMultipartsInfo
func fromOSSClientListMultipartsInfo(lmur oss.ListMultipartUploadResult) minio.ListMultipartsInfo {
uploads := make([]minio.MultipartInfo, len(lmur.Uploads))
for i, um := range lmur.Uploads {
uploads[i] = minio.MultipartInfo{
Object: um.Key,
UploadID: um.UploadID,
Initiated: um.Initiated,
}
}
commonPrefixes := make([]string, len(lmur.CommonPrefixes))
copy(commonPrefixes, lmur.CommonPrefixes)
return minio.ListMultipartsInfo{
KeyMarker: lmur.KeyMarker,
UploadIDMarker: lmur.UploadIDMarker,
NextKeyMarker: lmur.NextKeyMarker,
NextUploadIDMarker: lmur.NextUploadIDMarker,
MaxUploads: lmur.MaxUploads,
IsTruncated: lmur.IsTruncated,
Uploads: uploads,
Prefix: lmur.Prefix,
Delimiter: lmur.Delimiter,
CommonPrefixes: commonPrefixes,
}
}
// ListMultipartUploads lists all multipart uploads.
func (l *ossObjects) ListMultipartUploads(ctx context.Context, bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (lmi minio.ListMultipartsInfo, err error) {
bkt, err := l.Client.Bucket(bucket)
if err != nil {
return lmi, ossToObjectError(errors.Trace(err), bucket)
}
lmur, err := bkt.ListMultipartUploads(oss.Prefix(prefix), oss.KeyMarker(keyMarker), oss.UploadIDMarker(uploadIDMarker),
oss.Delimiter(delimiter), oss.MaxUploads(maxUploads))
if err != nil {
return lmi, ossToObjectError(errors.Trace(err), bucket)
}
return fromOSSClientListMultipartsInfo(lmur), nil
}
// NewMultipartUpload upload object in multiple parts.
func (l *ossObjects) NewMultipartUpload(ctx context.Context, bucket, object string, metadata map[string]string) (uploadID string, err error) {
bkt, err := l.Client.Bucket(bucket)
if err != nil {
return uploadID, ossToObjectError(errors.Trace(err), bucket, object)
}
// Build OSS metadata
opts, err := appendS3MetaToOSSOptions(nil, metadata)
if err != nil {
return uploadID, ossToObjectError(err, bucket, object)
}
lmur, err := bkt.InitiateMultipartUpload(object, opts...)
if err != nil {
return uploadID, ossToObjectError(errors.Trace(err), bucket, object)
}
return lmur.UploadID, nil
}
// PutObjectPart puts a part of object in bucket.
func (l *ossObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID string, partID int, data *hash.Reader) (pi minio.PartInfo, err error) {
bkt, err := l.Client.Bucket(bucket)
if err != nil {
return pi, ossToObjectError(errors.Trace(err), bucket, object)
}
imur := oss.InitiateMultipartUploadResult{
Bucket: bucket,
Key: object,
UploadID: uploadID,
}
size := data.Size()
up, err := bkt.UploadPart(imur, data, size, partID)
if err != nil {
return pi, ossToObjectError(errors.Trace(err), bucket, object)
}
return minio.PartInfo{
Size: size,
ETag: minio.ToS3ETag(up.ETag),
// NOTE(timonwong): LastModified is not supported
PartNumber: up.PartNumber,
}, nil
}
func ossBuildListObjectPartsParams(uploadID string, partNumberMarker, maxParts int) map[string]interface{} {
return map[string]interface{}{
"uploadId": uploadID,
"part-number-marker": strconv.Itoa(partNumberMarker),
"max-parts": strconv.Itoa(maxParts),
}
}
// fromOSSClientListPartsInfo converts OSS ListUploadedPartsResult to ListPartsInfo
func fromOSSClientListPartsInfo(lupr oss.ListUploadedPartsResult, partNumberMarker int) minio.ListPartsInfo {
parts := make([]minio.PartInfo, len(lupr.UploadedParts))
for i, up := range lupr.UploadedParts {
parts[i] = minio.PartInfo{
PartNumber: up.PartNumber,
LastModified: up.LastModified,
ETag: minio.ToS3ETag(up.ETag),
Size: int64(up.Size),
}
}
nextPartNumberMarker, _ := strconv.Atoi(lupr.NextPartNumberMarker)
return minio.ListPartsInfo{
Bucket: lupr.Bucket,
Object: lupr.Key,
UploadID: lupr.UploadID,
PartNumberMarker: partNumberMarker,
NextPartNumberMarker: nextPartNumberMarker,
MaxParts: lupr.MaxParts,
IsTruncated: lupr.IsTruncated,
Parts: parts,
}
}
func ossListObjectParts(client *oss.Client, bucket, object, uploadID string, partNumberMarker, maxParts int) (lupr oss.ListUploadedPartsResult, err error) {
params := ossBuildListObjectPartsParams(uploadID, partNumberMarker, maxParts)
resp, err := client.Conn.Do("GET", bucket, object, params, nil, nil, 0, nil)
if err != nil {
return lupr, err
}
defer func() {
// always drain output (response body)
io.CopyN(ioutil.Discard, resp.Body, 512)
resp.Body.Close()
}()
err = xml.NewDecoder(resp.Body).Decode(&lupr)
if err != nil {
return lupr, err
}
return lupr, nil
}
// CopyObjectPart creates a part in a multipart upload by copying
// existing object or a part of it.
func (l *ossObjects) CopyObjectPart(ctx context.Context, srcBucket, srcObject, destBucket, destObject, uploadID string,
partID int, startOffset, length int64, srcInfo minio.ObjectInfo) (p minio.PartInfo, err error) {
bkt, err := l.Client.Bucket(destBucket)
if err != nil {
return p, ossToObjectError(errors.Trace(err), destBucket)
}
// Build OSS metadata
opts, err := appendS3MetaToOSSOptions(nil, srcInfo.UserDefined)
if err != nil {
return p, ossToObjectError(err, srcBucket, srcObject)
}
completePart, err := bkt.UploadPartCopy(oss.InitiateMultipartUploadResult{
Key: destObject,
UploadID: uploadID,
}, srcBucket, srcObject, startOffset, length, partID, opts...)
if err != nil {
return p, ossToObjectError(errors.Trace(err), srcBucket, srcObject)
}
p.PartNumber = completePart.PartNumber
p.ETag = minio.ToS3ETag(completePart.ETag)
return p, nil
}
// ListObjectParts returns all object parts for specified object in specified bucket
func (l *ossObjects) ListObjectParts(ctx context.Context, bucket, object, uploadID string, partNumberMarker, maxParts int) (lpi minio.ListPartsInfo, err error) {
lupr, err := ossListObjectParts(l.Client, bucket, object, uploadID, partNumberMarker, maxParts)
if err != nil {
return lpi, ossToObjectError(errors.Trace(err), bucket, object, uploadID)
}
return fromOSSClientListPartsInfo(lupr, partNumberMarker), nil
}
// AbortMultipartUpload aborts a ongoing multipart upload.
func (l *ossObjects) AbortMultipartUpload(ctx context.Context, bucket, object, uploadID string) error {
bkt, err := l.Client.Bucket(bucket)
if err != nil {
return ossToObjectError(errors.Trace(err), bucket, object)
}
err = bkt.AbortMultipartUpload(oss.InitiateMultipartUploadResult{
Bucket: bucket,
Key: object,
UploadID: uploadID,
})
if err != nil {
return ossToObjectError(errors.Trace(err), bucket, object)
}
return nil
}
// CompleteMultipartUpload completes ongoing multipart upload and finalizes object.
func (l *ossObjects) CompleteMultipartUpload(ctx context.Context, bucket, object, uploadID string, uploadedParts []minio.CompletePart) (oi minio.ObjectInfo, err error) {
client := l.Client
bkt, err := client.Bucket(bucket)
if err != nil {
return oi, ossToObjectError(errors.Trace(err), bucket, object)
}
// Error out if uploadedParts except last part sizing < 5MiB.
// NOTE(timonwong): Actually, OSS wont't throw EntityTooSmall error, doing this check just for mint :(
var partNumberMarker int
lupr := oss.ListUploadedPartsResult{IsTruncated: true}
for lupr.IsTruncated {
lupr, err = ossListObjectParts(client, bucket, object, uploadID, partNumberMarker, ossMaxParts)
if err != nil {
return oi, ossToObjectError(errors.Trace(err), bucket, object, uploadID)
}
uploadedParts := lupr.UploadedParts
if !lupr.IsTruncated {
if len(uploadedParts) < 1 {
uploadedParts = nil
} else {
uploadedParts = uploadedParts[:len(uploadedParts)-1]
}
}
for _, part := range uploadedParts {
if part.Size < ossS3MinPartSize {
return oi, errors.Trace(minio.PartTooSmall{
PartNumber: part.PartNumber,
PartSize: int64(part.Size),
PartETag: minio.ToS3ETag(part.ETag),
})
}
}
partNumberMarker, _ = strconv.Atoi(lupr.NextPartNumberMarker)
}
imur := oss.InitiateMultipartUploadResult{
Bucket: bucket,
Key: object,
UploadID: uploadID,
}
parts := make([]oss.UploadPart, len(uploadedParts))
for i, up := range uploadedParts {
parts[i] = oss.UploadPart{
PartNumber: up.PartNumber,
ETag: strings.TrimSuffix(up.ETag, "-1"), // Trim "-1" suffix in ETag as PutObjectPart().
}
}
_, err = bkt.CompleteMultipartUpload(imur, parts)
if err != nil {
return oi, ossToObjectError(errors.Trace(err), bucket, object)
}
return l.GetObjectInfo(ctx, bucket, object)
}
// SetBucketPolicy sets policy on bucket.
// OSS supports three types of bucket policies:
// oss.ACLPublicReadWrite: readwrite in minio terminology
// oss.ACLPublicRead: readonly in minio terminology
// oss.ACLPrivate: none in minio terminology
func (l *ossObjects) SetBucketPolicy(ctx context.Context, bucket string, policyInfo policy.BucketAccessPolicy) error {
bucketPolicies := policy.GetPolicies(policyInfo.Statements, bucket, "")
if len(bucketPolicies) != 1 {
return errors.Trace(minio.NotImplemented{})
}
prefix := bucket + "/*" // For all objects inside the bucket.
for policyPrefix, bucketPolicy := range bucketPolicies {
if policyPrefix != prefix {
return errors.Trace(minio.NotImplemented{})
}
var acl oss.ACLType
switch bucketPolicy {
case policy.BucketPolicyNone:
acl = oss.ACLPrivate
case policy.BucketPolicyReadOnly:
acl = oss.ACLPublicRead
case policy.BucketPolicyReadWrite:
acl = oss.ACLPublicReadWrite
default:
return errors.Trace(minio.NotImplemented{})
}
err := l.Client.SetBucketACL(bucket, acl)
if err != nil {
return ossToObjectError(errors.Trace(err), bucket)
}
}
return nil
}
// GetBucketPolicy will get policy on bucket.
func (l *ossObjects) GetBucketPolicy(ctx context.Context, bucket string) (policy.BucketAccessPolicy, error) {
result, err := l.Client.GetBucketACL(bucket)
if err != nil {
return policy.BucketAccessPolicy{}, ossToObjectError(errors.Trace(err))
}
policyInfo := policy.BucketAccessPolicy{Version: "2012-10-17"}
switch result.ACL {
case string(oss.ACLPrivate):
// By default, all buckets starts with a "private" policy.
return policy.BucketAccessPolicy{}, ossToObjectError(errors.Trace(minio.PolicyNotFound{}), bucket)
case string(oss.ACLPublicRead):
policyInfo.Statements = policy.SetPolicy(policyInfo.Statements, policy.BucketPolicyReadOnly, bucket, "")
case string(oss.ACLPublicReadWrite):
policyInfo.Statements = policy.SetPolicy(policyInfo.Statements, policy.BucketPolicyReadWrite, bucket, "")
default:
return policy.BucketAccessPolicy{}, errors.Trace(minio.NotImplemented{})
}
return policyInfo, nil
}
// DeleteBucketPolicy deletes all policies on bucket.
func (l *ossObjects) DeleteBucketPolicy(ctx context.Context, bucket string) error {
err := l.Client.SetBucketACL(bucket, oss.ACLPrivate)
if err != nil {
return ossToObjectError(errors.Trace(err), bucket)
}
return nil
}