Compare commits

...

9 commits

Author SHA1 Message Date
Harshavardhana 68c5ad83fb
fix: backend not reachable should be more descriptive (#13634) 2021-11-10 22:33:17 -08:00
Harshavardhana 5acc8c0134
add multi-site replication tests (#13631) 2021-11-10 18:18:09 -08:00
Klaus Post c897b6a82d
fix: missing entries on first list resume (#13627)
On first list resume or when specifying a custom markers entries could be missed in rare cases.

Do conservative truncation of entries when forwarding.

Replaces #13619
2021-11-10 10:41:21 -08:00
Shireesh Anjal d008e90d50
Support dynamic reset of minio config (#13626)
If a given MinIO config is dynamic (can be changed without restart),
ensure that it can be reset also without restart.

Signed-off-by: Shireesh Anjal <shireesh@minio.io>
2021-11-10 10:01:32 -08:00
Harshavardhana ea820b30bf
fix: use equalFold() instead of lower and compare (#13624) 2021-11-10 08:12:50 -08:00
Poorna K 03725dc015
Default multipart caching to writethrough (#13613)
when `MINIO_CACHE_COMMIT` is set.

- `writeback` caching applies only to single 
uploads. When cache commit mode is 
`writeback`, default multipart caching to be
synchronous.

- Add writethrough caching for single uploads
2021-11-10 08:12:03 -08:00
Harshavardhana 0a6f9bc1eb
allocate new highwayhash for each string hash (#13623)
fixes #13622
2021-11-09 15:28:08 -08:00
Aditya Manthramurthy 1946922de3
Add CI for etcd IAM backend (#13614)
Runs when ETCD_SERVER env var is set
2021-11-09 09:25:13 -08:00
Minio Trusted edf1f4233b Update yaml files to latest version RELEASE.2021-11-09T03-21-45Z 2021-11-09 04:51:05 +00:00
32 changed files with 713 additions and 192 deletions

125
.github/workflows/iam-integrations.yaml vendored Normal file
View file

@ -0,0 +1,125 @@
name: IAM integration with external systems
on:
pull_request:
branches:
- master
# This ensures that previous jobs for the PR are canceled when the PR is
# updated.
concurrency:
group: ${{ github.workflow }}-${{ github.head_ref }}
cancel-in-progress: true
jobs:
ldap-test:
name: LDAP Tests with Go ${{ matrix.go-version }}
runs-on: ubuntu-latest
services:
openldap:
image: quay.io/minio/openldap
ports:
- "389:389"
- "636:636"
env:
LDAP_ORGANIZATION: "MinIO Inc"
LDAP_DOMAIN: "min.io"
LDAP_ADMIN_PASSWORD: "admin"
strategy:
matrix:
go-version: [1.16.x, 1.17.x]
steps:
- uses: actions/checkout@v2
- uses: actions/setup-go@v2
with:
go-version: ${{ matrix.go-version }}
- name: Test LDAP
env:
LDAP_TEST_SERVER: "localhost:389"
run: |
sudo sysctl net.ipv6.conf.all.disable_ipv6=0
sudo sysctl net.ipv6.conf.default.disable_ipv6=0
make test-iam
etcd-test:
name: Etcd Backend Tests with Go ${{ matrix.go-version }}
runs-on: ubuntu-latest
services:
etcd:
image: "quay.io/coreos/etcd:v3.5.1"
env:
ETCD_LISTEN_CLIENT_URLS: "http://0.0.0.0:2379"
ETCD_ADVERTISE_CLIENT_URLS: "http://0.0.0.0:2379"
ports:
- "2379:2379"
options: >-
--health-cmd "etcdctl endpoint health"
--health-interval 10s
--health-timeout 5s
--health-retries 5
strategy:
matrix:
go-version: [1.16.x, 1.17.x]
steps:
- uses: actions/checkout@v2
- uses: actions/setup-go@v2
with:
go-version: ${{ matrix.go-version }}
- name: Test Etcd IAM backend
env:
ETCD_SERVER: "http://localhost:2379"
run: |
sudo sysctl net.ipv6.conf.all.disable_ipv6=0
sudo sysctl net.ipv6.conf.default.disable_ipv6=0
make test-iam
iam-etcd-test:
name: Etcd Backend + LDAP Tests with Go ${{ matrix.go-version }}
runs-on: ubuntu-latest
services:
openldap:
image: quay.io/minio/openldap
ports:
- "389:389"
- "636:636"
env:
LDAP_ORGANIZATION: "MinIO Inc"
LDAP_DOMAIN: "min.io"
LDAP_ADMIN_PASSWORD: "admin"
etcd:
image: "quay.io/coreos/etcd:v3.5.1"
env:
ETCD_LISTEN_CLIENT_URLS: "http://0.0.0.0:2379"
ETCD_ADVERTISE_CLIENT_URLS: "http://0.0.0.0:2379"
ports:
- "2379:2379"
options: >-
--health-cmd "etcdctl endpoint health"
--health-interval 10s
--health-timeout 5s
--health-retries 5
strategy:
matrix:
go-version: [1.16.x, 1.17.x]
steps:
- uses: actions/checkout@v2
- uses: actions/setup-go@v2
with:
go-version: ${{ matrix.go-version }}
- name: Test Etcd IAM backend with LDAP IDP
env:
ETCD_SERVER: "http://localhost:2379"
LDAP_TEST_SERVER: "localhost:389"
run: |
sudo sysctl net.ipv6.conf.all.disable_ipv6=0
sudo sysctl net.ipv6.conf.default.disable_ipv6=0
make test-iam

View file

@ -1,4 +1,4 @@
name: External IDP Integration Tests
name: Multi-site replication tests
on:
pull_request:
@ -7,39 +7,27 @@ on:
# This ensures that previous jobs for the PR are canceled when the PR is
# updated.
concurrency:
concurrency:
group: ${{ github.workflow }}-${{ github.head_ref }}
cancel-in-progress: true
jobs:
ldap-test:
name: LDAP Tests with Go ${{ matrix.go-version }}
replication-test:
name: Replication Tests with Go ${{ matrix.go-version }}
runs-on: ubuntu-latest
services:
openldap:
image: quay.io/minio/openldap
ports:
- 389:389
- 636:636
env:
LDAP_ORGANIZATION: "MinIO Inc"
LDAP_DOMAIN: "min.io"
LDAP_ADMIN_PASSWORD: "admin"
strategy:
matrix:
go-version: [1.16.x, 1.17.x]
steps:
- uses: actions/checkout@v2
- uses: actions/setup-go@v2
with:
go-version: ${{ matrix.go-version }}
- name: Test LDAP
env:
LDAP_TEST_SERVER: "localhost:389"
- name: Test Replication
run: |
sudo sysctl net.ipv6.conf.all.disable_ipv6=0
sudo sysctl net.ipv6.conf.default.disable_ipv6=0
make test-ldap
go install -v github.com/minio/mc@latest
make test-replication

View file

@ -42,15 +42,19 @@ test: verifiers build ## builds minio, runs linters, tests
@echo "Running unit tests"
@GO111MODULE=on CGO_ENABLED=0 go test -tags kqueue ./... 1>/dev/null
test-race: verifiers build
test-race: verifiers build ## builds minio, runs linters, tests (race)
@echo "Running unit tests under -race"
@(env bash $(PWD)/buildscripts/race.sh)
test-ldap: build
@echo "Running tests for LDAP integration"
@CGO_ENABLED=0 go test -tags kqueue -v -run TestIAMWithLDAPServerSuite ./cmd
@echo "Running tests for LDAP integration with -race"
@CGO_ENABLED=1 go test -race -tags kqueue -v -run TestIAMWithLDAPServerSuite ./cmd
test-iam: build ## verify IAM (external IDP, etcd backends)
@echo "Running tests for IAM (external IDP, etcd backends)"
@CGO_ENABLED=0 go test -tags kqueue -v -run TestIAM* ./cmd
@echo "Running tests for IAM (external IDP, etcd backends) with -race"
@CGO_ENABLED=1 go test -race -tags kqueue -v -run TestIAM* ./cmd
test-replication: install ## verify multi site replication
@echo "Running tests for Replication three sites"
@(env bash $(PWD)/docs/bucket/replication/setup_3site_replication.sh)
verify: ## verify minio various setups
@echo "Verifying build with race"

View file

@ -19,6 +19,7 @@ package cmd
import (
"bytes"
"context"
"encoding/json"
"io"
"net/http"
@ -78,6 +79,22 @@ func (a adminAPIHandlers) DelConfigKVHandler(w http.ResponseWriter, r *http.Requ
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
return
}
dynamic := config.SubSystemsDynamic.Contains(string(kvBytes))
if dynamic {
applyDynamic(ctx, objectAPI, cfg, r, w)
}
}
func applyDynamic(ctx context.Context, objectAPI ObjectLayer, cfg config.Config, r *http.Request, w http.ResponseWriter) {
// Apply dynamic values.
if err := applyDynamicConfig(GlobalContext, objectAPI, cfg); err != nil {
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
return
}
globalNotificationSys.SignalService(serviceReloadDynamic)
// Tell the client that dynamic config was applied.
w.Header().Set(madmin.ConfigAppliedHeader, madmin.ConfigAppliedTrue)
}
// SetConfigKVHandler - PUT /minio/admin/v3/set-config-kv
@ -135,14 +152,7 @@ func (a adminAPIHandlers) SetConfigKVHandler(w http.ResponseWriter, r *http.Requ
}
if dynamic {
// Apply dynamic values.
if err := applyDynamicConfig(GlobalContext, objectAPI, cfg); err != nil {
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
return
}
globalNotificationSys.SignalService(serviceReloadDynamic)
// If all values were dynamic, tell the client.
w.Header().Set(madmin.ConfigAppliedHeader, madmin.ConfigAppliedTrue)
applyDynamic(ctx, objectAPI, cfg, r, w)
}
writeSuccessResponseHeadersOnly(w)
}

View file

@ -15,6 +15,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
//go:build !race
// +build !race
// Tests in this file are not run under the `-race` flag as they are too slow
@ -40,20 +41,34 @@ func runAllIAMConcurrencyTests(suite *TestSuiteIAM, c *check) {
}
func TestIAMInternalIDPConcurrencyServerSuite(t *testing.T) {
testCases := []*TestSuiteIAM{
baseTestCases := []TestSuiteCommon{
// Init and run test on FS backend with signature v4.
newTestSuiteIAM(TestSuiteCommon{serverType: "FS", signer: signerV4}),
{serverType: "FS", signer: signerV4},
// Init and run test on FS backend, with tls enabled.
newTestSuiteIAM(TestSuiteCommon{serverType: "FS", signer: signerV4, secure: true}),
{serverType: "FS", signer: signerV4, secure: true},
// Init and run test on Erasure backend.
newTestSuiteIAM(TestSuiteCommon{serverType: "Erasure", signer: signerV4}),
{serverType: "Erasure", signer: signerV4},
// Init and run test on ErasureSet backend.
newTestSuiteIAM(TestSuiteCommon{serverType: "ErasureSet", signer: signerV4}),
{serverType: "ErasureSet", signer: signerV4},
}
testCases := []*TestSuiteIAM{}
for _, bt := range baseTestCases {
testCases = append(testCases,
newTestSuiteIAM(bt, false),
newTestSuiteIAM(bt, true),
)
}
for i, testCase := range testCases {
t.Run(fmt.Sprintf("Test: %d, ServerType: %s", i+1, testCase.serverType), func(t *testing.T) {
runAllIAMConcurrencyTests(testCase, &check{t, testCase.serverType})
})
etcdStr := ""
if testCase.withEtcdBackend {
etcdStr = " (with etcd backend)"
}
t.Run(
fmt.Sprintf("Test: %d, ServerType: %s%s", i+1, testCase.serverType, etcdStr),
func(t *testing.T) {
runAllIAMConcurrencyTests(testCase, &check{t, testCase.serverType})
},
)
}
}

View file

@ -20,6 +20,7 @@ package cmd
import (
"context"
"fmt"
"os"
"strings"
"testing"
"time"
@ -39,13 +40,16 @@ const (
type TestSuiteIAM struct {
TestSuiteCommon
// Flag to turn on tests for etcd backend IAM
withEtcdBackend bool
endpoint string
adm *madmin.AdminClient
client *minio.Client
}
func newTestSuiteIAM(c TestSuiteCommon) *TestSuiteIAM {
return &TestSuiteIAM{TestSuiteCommon: c}
func newTestSuiteIAM(c TestSuiteCommon, withEtcdBackend bool) *TestSuiteIAM {
return &TestSuiteIAM{TestSuiteCommon: c, withEtcdBackend: withEtcdBackend}
}
func (s *TestSuiteIAM) iamSetup(c *check) {
@ -73,10 +77,42 @@ func (s *TestSuiteIAM) iamSetup(c *check) {
}
}
const (
EnvTestEtcdBackend = "ETCD_SERVER"
)
func (s *TestSuiteIAM) setUpEtcd(c *check, etcdServer string) {
ctx, cancel := context.WithTimeout(context.Background(), testDefaultTimeout)
defer cancel()
configCmds := []string{
"etcd",
"endpoints=" + etcdServer,
"path_prefix=" + mustGetUUID(),
}
_, err := s.adm.SetConfigKV(ctx, strings.Join(configCmds, " "))
if err != nil {
c.Fatalf("unable to setup Etcd for tests: %v", err)
}
s.RestartIAMSuite(c)
}
func (s *TestSuiteIAM) SetUpSuite(c *check) {
// If etcd backend is specified and etcd server is not present, the test
// is skipped.
etcdServer := os.Getenv(EnvTestEtcdBackend)
if s.withEtcdBackend && etcdServer == "" {
c.Skip("Skipping etcd backend IAM test as no etcd server is configured.")
}
s.TestSuiteCommon.SetUpSuite(c)
s.iamSetup(c)
if s.withEtcdBackend {
s.setUpEtcd(c, etcdServer)
}
}
func (s *TestSuiteIAM) RestartIAMSuite(c *check) {
@ -108,20 +144,34 @@ func runAllIAMTests(suite *TestSuiteIAM, c *check) {
}
func TestIAMInternalIDPServerSuite(t *testing.T) {
testCases := []*TestSuiteIAM{
baseTestCases := []TestSuiteCommon{
// Init and run test on FS backend with signature v4.
newTestSuiteIAM(TestSuiteCommon{serverType: "FS", signer: signerV4}),
{serverType: "FS", signer: signerV4},
// Init and run test on FS backend, with tls enabled.
newTestSuiteIAM(TestSuiteCommon{serverType: "FS", signer: signerV4, secure: true}),
{serverType: "FS", signer: signerV4, secure: true},
// Init and run test on Erasure backend.
newTestSuiteIAM(TestSuiteCommon{serverType: "Erasure", signer: signerV4}),
{serverType: "Erasure", signer: signerV4},
// Init and run test on ErasureSet backend.
newTestSuiteIAM(TestSuiteCommon{serverType: "ErasureSet", signer: signerV4}),
{serverType: "ErasureSet", signer: signerV4},
}
testCases := []*TestSuiteIAM{}
for _, bt := range baseTestCases {
testCases = append(testCases,
newTestSuiteIAM(bt, false),
newTestSuiteIAM(bt, true),
)
}
for i, testCase := range testCases {
t.Run(fmt.Sprintf("Test: %d, ServerType: %s", i+1, testCase.serverType), func(t *testing.T) {
runAllIAMTests(testCase, &check{t, testCase.serverType})
})
etcdStr := ""
if testCase.withEtcdBackend {
etcdStr = " (with etcd backend)"
}
t.Run(
fmt.Sprintf("Test: %d, ServerType: %s%s", i+1, testCase.serverType, etcdStr),
func(t *testing.T) {
runAllIAMTests(testCase, &check{t, testCase.serverType})
},
)
}
}

View file

@ -1362,8 +1362,8 @@ var errorCodes = errorCodeMap{
},
ErrBackendDown: {
Code: "XMinioBackendDown",
Description: "Object storage backend is unreachable",
HTTPStatusCode: http.StatusServiceUnavailable,
Description: "Remote backend is unreachable",
HTTPStatusCode: http.StatusBadRequest,
},
ErrIncorrectContinuationToken: {
Code: "InvalidArgument",
@ -2129,6 +2129,11 @@ func toAPIError(ctx context.Context, err error) APIError {
}
}
if apiErr.Code == "XMinioBackendDown" {
apiErr.Description = fmt.Sprintf("%s (%v)", apiErr.Description, err)
return apiErr
}
if apiErr.Code == "InternalError" {
// If we see an internal error try to interpret
// any underlying errors if possible depending on

View file

@ -654,6 +654,50 @@ func (c *diskCache) saveMetadata(ctx context.Context, bucket, object string, met
return jsonSave(f, m)
}
// updates the ETag and ModTime on cache with ETag from backend
func (c *diskCache) updateMetadata(ctx context.Context, bucket, object, etag string, modTime time.Time, size int64) error {
cachedPath := getCacheSHADir(c.dir, bucket, object)
metaPath := pathJoin(cachedPath, cacheMetaJSONFile)
// Create cache directory if needed
if err := os.MkdirAll(cachedPath, 0777); err != nil {
return err
}
f, err := os.OpenFile(metaPath, os.O_RDWR, 0666)
if err != nil {
return err
}
defer f.Close()
m := &cacheMeta{
Version: cacheMetaVersion,
Bucket: bucket,
Object: object,
}
if err := jsonLoad(f, m); err != nil && err != io.EOF {
return err
}
if m.Meta == nil {
m.Meta = make(map[string]string)
}
var key []byte
var objectEncryptionKey crypto.ObjectKey
if globalCacheKMS != nil {
// Calculating object encryption key
key, err = decryptObjectInfo(key, bucket, object, m.Meta)
if err != nil {
return err
}
copy(objectEncryptionKey[:], key)
m.Meta["etag"] = hex.EncodeToString(objectEncryptionKey.SealETag([]byte(etag)))
} else {
m.Meta["etag"] = etag
}
m.Meta["last-modified"] = modTime.UTC().Format(http.TimeFormat)
m.Meta["Content-Length"] = strconv.Itoa(int(size))
return jsonSave(f, m)
}
func getCacheSHADir(dir, bucket, object string) string {
return pathJoin(dir, getSHA256Hash([]byte(pathJoin(bucket, object))))
}
@ -755,22 +799,36 @@ func newCacheEncryptMetadata(bucket, object string, metadata map[string]string)
metadata[SSECacheEncrypted] = ""
return objectKey[:], nil
}
func (c *diskCache) GetLockContext(ctx context.Context, bucket, object string) (RWLocker, LockContext, error) {
cachePath := getCacheSHADir(c.dir, bucket, object)
cLock := c.NewNSLockFn(cachePath)
lkctx, err := cLock.GetLock(ctx, globalOperationTimeout)
return cLock, lkctx, err
}
// Caches the object to disk
func (c *diskCache) Put(ctx context.Context, bucket, object string, data io.Reader, size int64, rs *HTTPRangeSpec, opts ObjectOptions, incHitsOnly bool) (oi ObjectInfo, err error) {
func (c *diskCache) Put(ctx context.Context, bucket, object string, data io.Reader, size int64, rs *HTTPRangeSpec, opts ObjectOptions, incHitsOnly, writeback bool) (oi ObjectInfo, err error) {
if !c.diskSpaceAvailable(size) {
io.Copy(ioutil.Discard, data)
return oi, errDiskFull
}
cachePath := getCacheSHADir(c.dir, bucket, object)
cLock := c.NewNSLockFn(cachePath)
lkctx, err := cLock.GetLock(ctx, globalOperationTimeout)
cLock, lkctx, err := c.GetLockContext(ctx, bucket, object)
if err != nil {
return oi, err
}
ctx = lkctx.Context()
defer cLock.Unlock(lkctx.Cancel)
return c.put(ctx, bucket, object, data, size, rs, opts, incHitsOnly, writeback)
}
// Caches the object to disk
func (c *diskCache) put(ctx context.Context, bucket, object string, data io.Reader, size int64, rs *HTTPRangeSpec, opts ObjectOptions, incHitsOnly, writeback bool) (oi ObjectInfo, err error) {
if !c.diskSpaceAvailable(size) {
io.Copy(ioutil.Discard, data)
return oi, errDiskFull
}
cachePath := getCacheSHADir(c.dir, bucket, object)
meta, _, numHits, err := c.statCache(ctx, cachePath)
// Case where object not yet cached
if osIsNotExist(err) && c.after >= 1 {
@ -819,7 +877,7 @@ func (c *diskCache) Put(ctx context.Context, bucket, object string, data io.Read
removeAll(cachePath)
return oi, IncompleteBody{Bucket: bucket, Object: object}
}
if c.commitWriteback {
if writeback {
metadata["content-md5"] = md5sum
if md5bytes, err := base64.StdEncoding.DecodeString(md5sum); err == nil {
metadata["etag"] = hex.EncodeToString(md5bytes)
@ -1073,7 +1131,7 @@ func (c *diskCache) Get(ctx context.Context, bucket, object string, rs *HTTPRang
// the remaining parts.
partOffset = 0
} // End of read all parts loop.
pr.CloseWithError(err)
pw.CloseWithError(err)
}()
} else {
go func() {
@ -1105,8 +1163,15 @@ func (c *diskCache) Get(ctx context.Context, bucket, object string, rs *HTTPRang
return gr, numHits, nil
}
// deletes the cached object - caller should have taken write lock
func (c *diskCache) delete(bucket, object string) (err error) {
cacheObjPath := getCacheSHADir(c.dir, bucket, object)
return removeAll(cacheObjPath)
}
// Deletes the cached object
func (c *diskCache) delete(ctx context.Context, cacheObjPath string) (err error) {
func (c *diskCache) Delete(ctx context.Context, bucket, object string) (err error) {
cacheObjPath := getCacheSHADir(c.dir, bucket, object)
cLock := c.NewNSLockFn(cacheObjPath)
lkctx, err := cLock.GetLock(ctx, globalOperationTimeout)
if err != nil {
@ -1116,12 +1181,6 @@ func (c *diskCache) delete(ctx context.Context, cacheObjPath string) (err error)
return removeAll(cacheObjPath)
}
// Deletes the cached object
func (c *diskCache) Delete(ctx context.Context, bucket, object string) (err error) {
cacheObjPath := getCacheSHADir(c.dir, bucket, object)
return c.delete(ctx, cacheObjPath)
}
// convenience function to check if object is cached on this diskCache
func (c *diskCache) Exists(ctx context.Context, bucket, object string) bool {
if _, err := os.Stat(getCacheSHADir(c.dir, bucket, object)); err != nil {
@ -1199,14 +1258,11 @@ func (c *diskCache) NewMultipartUpload(ctx context.Context, bucket, object, uID
}
m.Meta = opts.UserDefined
m.Meta[ReservedMetadataPrefix+"Encrypted-Multipart"] = ""
m.Checksum = CacheChecksumInfoV1{Algorithm: HighwayHash256S.String(), Blocksize: cacheBlkSize}
if c.commitWriteback {
m.Meta[writeBackStatusHeader] = CommitPending.String()
}
m.Stat.ModTime = UTCNow()
if globalCacheKMS != nil {
m.Meta[ReservedMetadataPrefix+"Encrypted-Multipart"] = ""
if _, err := newCacheEncryptMetadata(bucket, object, m.Meta); err != nil {
return uploadID, err
}

View file

@ -110,7 +110,7 @@ func cacheControlOpts(o ObjectInfo) *cacheControl {
var headerVal string
for k, v := range m {
if strings.ToLower(k) == "cache-control" {
if strings.EqualFold(k, "cache-control") {
headerVal = v
}

View file

@ -368,7 +368,7 @@ func (c *cacheObjects) GetObjectNInfo(ctx context.Context, bucket, object string
// use a new context to avoid locker prematurely timing out operation when the GetObjectNInfo returns.
dcache.Put(GlobalContext, bucket, object, bReader, bReader.ObjInfo.Size, rs, ObjectOptions{
UserDefined: getMetadata(bReader.ObjInfo),
}, false)
}, false, false)
return
}
}()
@ -386,7 +386,7 @@ func (c *cacheObjects) GetObjectNInfo(ctx context.Context, bucket, object string
io.LimitReader(pr, bkReader.ObjInfo.Size),
bkReader.ObjInfo.Size, rs, ObjectOptions{
UserDefined: userDefined,
}, false)
}, false, false)
// close the read end of the pipe, so the error gets
// propagated to teeReader
pr.CloseWithError(putErr)
@ -678,31 +678,82 @@ func (c *cacheObjects) PutObject(ctx context.Context, bucket, object string, r *
return putObjectFn(ctx, bucket, object, r, opts)
}
if c.commitWriteback {
oi, err := dcache.Put(ctx, bucket, object, r, r.Size(), nil, opts, false)
oi, err := dcache.Put(ctx, bucket, object, r, r.Size(), nil, opts, false, true)
if err != nil {
return ObjectInfo{}, err
}
go c.uploadObject(GlobalContext, oi)
return oi, nil
}
objInfo, err = putObjectFn(ctx, bucket, object, r, opts)
if err == nil {
go func() {
// fill cache in the background
bReader, bErr := c.InnerGetObjectNInfoFn(GlobalContext, bucket, object, nil, http.Header{}, readLock, ObjectOptions{})
if bErr != nil {
return
}
defer bReader.Close()
oi, _, err := dcache.Stat(GlobalContext, bucket, object)
// avoid cache overwrite if another background routine filled cache
if err != nil || oi.ETag != bReader.ObjInfo.ETag {
dcache.Put(GlobalContext, bucket, object, bReader, bReader.ObjInfo.Size, nil, ObjectOptions{UserDefined: getMetadata(bReader.ObjInfo)}, false)
}
}()
if !c.commitWritethrough {
objInfo, err = putObjectFn(ctx, bucket, object, r, opts)
if err == nil {
go func() {
// fill cache in the background
bReader, bErr := c.InnerGetObjectNInfoFn(GlobalContext, bucket, object, nil, http.Header{}, readLock, ObjectOptions{})
if bErr != nil {
return
}
defer bReader.Close()
oi, _, err := dcache.Stat(GlobalContext, bucket, object)
// avoid cache overwrite if another background routine filled cache
if err != nil || oi.ETag != bReader.ObjInfo.ETag {
dcache.Put(GlobalContext, bucket, object, bReader, bReader.ObjInfo.Size, nil, ObjectOptions{UserDefined: getMetadata(bReader.ObjInfo)}, false, true)
}
}()
}
return objInfo, err
}
return objInfo, err
cLock, lkctx, cerr := dcache.GetLockContext(GlobalContext, bucket, object)
if cerr != nil {
return putObjectFn(ctx, bucket, object, r, opts)
}
defer cLock.Unlock(lkctx.Cancel)
// Initialize pipe to stream data to backend
pipeReader, pipeWriter := io.Pipe()
hashReader, err := hash.NewReader(pipeReader, size, "", "", r.ActualSize())
if err != nil {
return
}
// Initialize pipe to stream data to cache
rPipe, wPipe := io.Pipe()
infoCh := make(chan ObjectInfo)
errorCh := make(chan error)
go func() {
info, err := putObjectFn(ctx, bucket, object, NewPutObjReader(hashReader), opts)
if err != nil {
close(infoCh)
pipeReader.CloseWithError(err)
rPipe.CloseWithError(err)
errorCh <- err
return
}
close(errorCh)
infoCh <- info
}()
go func() {
_, err := dcache.put(lkctx.Context(), bucket, object, rPipe, r.Size(), nil, opts, false, false)
if err != nil {
rPipe.CloseWithError(err)
return
}
}()
mwriter := cacheMultiWriter(pipeWriter, wPipe)
_, err = io.Copy(mwriter, r)
pipeWriter.Close()
wPipe.Close()
if err != nil {
err = <-errorCh
return ObjectInfo{}, err
}
info := <-infoCh
if cerr = dcache.updateMetadata(lkctx.Context(), bucket, object, info.ETag, info.ModTime, info.Size); cerr != nil {
dcache.delete(bucket, object)
}
return info, err
}
// upload cached object to backend in async commit mode.
@ -922,7 +973,7 @@ func (c *cacheObjects) NewMultipartUpload(ctx context.Context, bucket, object st
dcache.Delete(ctx, bucket, object)
return newMultipartUploadFn(ctx, bucket, object, opts)
}
if !c.commitWritethrough {
if !c.commitWritethrough && !c.commitWriteback {
return newMultipartUploadFn(ctx, bucket, object, opts)
}
@ -941,7 +992,7 @@ func (c *cacheObjects) PutObjectPart(ctx context.Context, bucket, object, upload
return putObjectPartFn(ctx, bucket, object, uploadID, partID, data, opts)
}
if !c.commitWritethrough {
if !c.commitWritethrough && !c.commitWriteback {
return putObjectPartFn(ctx, bucket, object, uploadID, partID, data, opts)
}
if c.skipCache() {
@ -1039,7 +1090,7 @@ func (c *cacheObjects) CopyObjectPart(ctx context.Context, srcBucket, srcObject,
return copyObjectPartFn(ctx, srcBucket, srcObject, dstBucket, dstObject, uploadID, partID, startOffset, length, srcInfo, srcOpts, dstOpts)
}
if !c.commitWritethrough {
if !c.commitWritethrough && !c.commitWriteback {
return copyObjectPartFn(ctx, srcBucket, srcObject, dstBucket, dstObject, uploadID, partID, startOffset, length, srcInfo, srcOpts, dstOpts)
}
if err := dcache.uploadIDExists(dstBucket, dstObject, uploadID); err != nil {
@ -1077,7 +1128,7 @@ func (c *cacheObjects) CopyObjectPart(ctx context.Context, srcBucket, srcObject,
// finalizes the upload saved in cache multipart dir.
func (c *cacheObjects) CompleteMultipartUpload(ctx context.Context, bucket, object, uploadID string, uploadedParts []CompletePart, opts ObjectOptions) (oi ObjectInfo, err error) {
completeMultipartUploadFn := c.InnerCompleteMultipartUploadFn
if !c.commitWritethrough {
if !c.commitWritethrough && !c.commitWriteback {
return completeMultipartUploadFn(ctx, bucket, object, uploadID, uploadedParts, opts)
}
dcache, err := c.getCacheToLoc(ctx, bucket, object)
@ -1102,7 +1153,7 @@ func (c *cacheObjects) CompleteMultipartUpload(ctx context.Context, bucket, obje
oi, _, err := dcache.Stat(GlobalContext, bucket, object)
// avoid cache overwrite if another background routine filled cache
if err != nil || oi.ETag != bReader.ObjInfo.ETag {
dcache.Put(GlobalContext, bucket, object, bReader, bReader.ObjInfo.Size, nil, ObjectOptions{UserDefined: getMetadata(bReader.ObjInfo)}, false)
dcache.Put(GlobalContext, bucket, object, bReader, bReader.ObjInfo.Size, nil, ObjectOptions{UserDefined: getMetadata(bReader.ObjInfo)}, false, false)
}
}
}()
@ -1113,7 +1164,7 @@ func (c *cacheObjects) CompleteMultipartUpload(ctx context.Context, bucket, obje
// AbortMultipartUpload - aborts multipart upload on backend and cache.
func (c *cacheObjects) AbortMultipartUpload(ctx context.Context, bucket, object, uploadID string, opts ObjectOptions) error {
abortMultipartUploadFn := c.InnerAbortMultipartUploadFn
if !c.commitWritethrough {
if !c.commitWritethrough && !c.commitWriteback {
return abortMultipartUploadFn(ctx, bucket, object, uploadID, opts)
}
dcache, err := c.getCacheToLoc(ctx, bucket, object)

View file

@ -287,7 +287,7 @@ func ErrorRespToObjectError(err error, params ...string) error {
}
if xnet.IsNetworkOrHostDown(err, false) {
return BackendDown{}
return BackendDown{Err: err.Error()}
}
minioErr, ok := err.(minio.ErrorResponse)

View file

@ -116,6 +116,7 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
forward := ""
if len(opts.ForwardTo) > 0 && strings.HasPrefix(opts.ForwardTo, current) {
forward = strings.TrimPrefix(opts.ForwardTo, current)
// Trim further directories and trailing slash.
if idx := strings.IndexByte(forward, '/'); idx > 0 {
forward = forward[:idx]
}
@ -163,7 +164,7 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
entries[i] = entry
continue
}
// Trim slash, maybe compiler is clever?
// Trim slash, since we don't know if this is folder or object.
entries[i] = entries[i][:len(entry)-1]
continue
}
@ -214,9 +215,12 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
dirStack := make([]string, 0, 5)
prefix = "" // Remove prefix after first level as we have already filtered the list.
if len(forward) > 0 {
idx := sort.SearchStrings(entries, forward)
if idx > 0 {
entries = entries[idx:]
// Conservative forwarding. Entries may be either objects or prefixes.
for i, entry := range entries {
if entry >= forward || strings.HasPrefix(forward, entry) {
entries = entries[i:]
break
}
}
}

View file

@ -645,10 +645,12 @@ func (e UnsupportedMetadata) Error() string {
}
// BackendDown is returned for network errors or if the gateway's backend is down.
type BackendDown struct{}
type BackendDown struct {
Err string
}
func (e BackendDown) Error() string {
return "Backend down"
return e.Err
}
// isErrBucketNotFound - Check if error type is BucketNotFound.

View file

@ -1378,6 +1378,152 @@ func testListObjectVersions(obj ObjectLayer, instanceType string, t1 TestErrHand
}
}
// Wrapper for calling ListObjects continuation tests for both Erasure multiple disks and single node setup.
func TestListObjectsContinuation(t *testing.T) {
ExecObjectLayerTest(t, testListObjectsContinuation)
}
// Unit test for ListObjects in general.
func testListObjectsContinuation(obj ObjectLayer, instanceType string, t1 TestErrHandler) {
t, _ := t1.(*testing.T)
testBuckets := []string{
// This bucket is used for testing ListObject operations.
"test-bucket-list-object-continuation-1",
"test-bucket-list-object-continuation-2",
}
for _, bucket := range testBuckets {
err := obj.MakeBucketWithLocation(context.Background(), bucket, BucketOptions{})
if err != nil {
t.Fatalf("%s : %s", instanceType, err.Error())
}
}
var err error
testObjects := []struct {
parentBucket string
name string
content string
meta map[string]string
}{
{testBuckets[0], "a/1.txt", "contentstring", nil},
{testBuckets[0], "a-1.txt", "contentstring", nil},
{testBuckets[0], "a.txt", "contentstring", nil},
{testBuckets[0], "apache2-doc/1.txt", "contentstring", nil},
{testBuckets[0], "apache2/1.txt", "contentstring", nil},
{testBuckets[0], "apache2/-sub/2.txt", "contentstring", nil},
{testBuckets[1], "azerty/1.txt", "contentstring", nil},
{testBuckets[1], "apache2-doc/1.txt", "contentstring", nil},
{testBuckets[1], "apache2/1.txt", "contentstring", nil},
}
for _, object := range testObjects {
md5Bytes := md5.Sum([]byte(object.content))
_, err = obj.PutObject(context.Background(), object.parentBucket, object.name, mustGetPutObjReader(t, bytes.NewBufferString(object.content),
int64(len(object.content)), hex.EncodeToString(md5Bytes[:]), ""), ObjectOptions{UserDefined: object.meta})
if err != nil {
t.Fatalf("%s : %s", instanceType, err.Error())
}
}
// Formualting the result data set to be expected from ListObjects call inside the tests,
// This will be used in testCases and used for asserting the correctness of ListObjects output in the tests.
resultCases := []ListObjectsInfo{
{
Objects: []ObjectInfo{
{Name: "a-1.txt"},
{Name: "a.txt"},
{Name: "a/1.txt"},
{Name: "apache2-doc/1.txt"},
{Name: "apache2/-sub/2.txt"},
{Name: "apache2/1.txt"},
},
},
{
Objects: []ObjectInfo{
{Name: "apache2-doc/1.txt"},
{Name: "apache2/1.txt"},
},
},
{
Prefixes: []string{"apache2-doc/", "apache2/", "azerty/"},
},
}
testCases := []struct {
// Inputs to ListObjects.
bucketName string
prefix string
delimiter string
page int
// Expected output of ListObjects.
result ListObjectsInfo
}{
{testBuckets[0], "", "", 1, resultCases[0]},
{testBuckets[0], "a", "", 1, resultCases[0]},
{testBuckets[1], "apache", "", 1, resultCases[1]},
{testBuckets[1], "", "/", 1, resultCases[2]},
}
for i, testCase := range testCases {
testCase := testCase
t.Run(fmt.Sprintf("%s-Test%d", instanceType, i+1), func(t *testing.T) {
var foundObjects []ObjectInfo
var foundPrefixes []string
var marker = ""
for {
result, err := obj.ListObjects(context.Background(), testCase.bucketName,
testCase.prefix, marker, testCase.delimiter, testCase.page)
if err != nil {
t.Fatalf("Test %d: %s: Expected to pass, but failed with: <ERROR> %s", i+1, instanceType, err.Error())
}
foundObjects = append(foundObjects, result.Objects...)
foundPrefixes = append(foundPrefixes, result.Prefixes...)
if !result.IsTruncated {
break
}
marker = result.NextMarker
if len(result.Objects) > 0 {
// Discard marker, so it cannot resume listing.
marker = result.Objects[len(result.Objects)-1].Name
}
}
if len(testCase.result.Objects) != len(foundObjects) {
t.Logf("want: %v", objInfoNames(testCase.result.Objects))
t.Logf("got: %v", objInfoNames(foundObjects))
t.Errorf("Test %d: %s: Expected number of objects in the result to be '%d', but found '%d' objects instead",
i+1, instanceType, len(testCase.result.Objects), len(foundObjects))
}
for j := 0; j < len(testCase.result.Objects); j++ {
if j >= len(foundObjects) {
t.Errorf("Test %d: %s: Expected object name to be \"%s\", but not nothing instead", i+1, instanceType, testCase.result.Objects[j].Name)
continue
}
if testCase.result.Objects[j].Name != foundObjects[j].Name {
t.Errorf("Test %d: %s: Expected object name to be \"%s\", but found \"%s\" instead", i+1, instanceType, testCase.result.Objects[j].Name, foundObjects[j].Name)
}
}
if len(testCase.result.Prefixes) != len(foundPrefixes) {
t.Logf("want: %v", testCase.result.Prefixes)
t.Logf("got: %v", foundPrefixes)
t.Errorf("Test %d: %s: Expected number of prefixes in the result to be '%d', but found '%d' prefixes instead",
i+1, instanceType, len(testCase.result.Prefixes), len(foundPrefixes))
}
for j := 0; j < len(testCase.result.Prefixes); j++ {
if j >= len(foundPrefixes) {
t.Errorf("Test %d: %s: Expected prefix name to be \"%s\", but found no result", i+1, instanceType, testCase.result.Prefixes[j])
continue
}
if testCase.result.Prefixes[j] != foundPrefixes[j] {
t.Errorf("Test %d: %s: Expected prefix name to be \"%s\", but found \"%s\" instead", i+1, instanceType, testCase.result.Prefixes[j], foundPrefixes[j])
}
}
})
}
}
// Initialize FS backend for the benchmark.
func initFSObjectsB(disk string, t *testing.B) (obj ObjectLayer) {
var err error

View file

@ -36,20 +36,34 @@ func runAllIAMSTSTests(suite *TestSuiteIAM, c *check) {
}
func TestIAMInternalIDPSTSServerSuite(t *testing.T) {
testCases := []*TestSuiteIAM{
baseTestCases := []TestSuiteCommon{
// Init and run test on FS backend with signature v4.
newTestSuiteIAM(TestSuiteCommon{serverType: "FS", signer: signerV4}),
{serverType: "FS", signer: signerV4},
// Init and run test on FS backend, with tls enabled.
newTestSuiteIAM(TestSuiteCommon{serverType: "FS", signer: signerV4, secure: true}),
{serverType: "FS", signer: signerV4, secure: true},
// Init and run test on Erasure backend.
newTestSuiteIAM(TestSuiteCommon{serverType: "Erasure", signer: signerV4}),
{serverType: "Erasure", signer: signerV4},
// Init and run test on ErasureSet backend.
newTestSuiteIAM(TestSuiteCommon{serverType: "ErasureSet", signer: signerV4}),
{serverType: "ErasureSet", signer: signerV4},
}
testCases := []*TestSuiteIAM{}
for _, bt := range baseTestCases {
testCases = append(testCases,
newTestSuiteIAM(bt, false),
newTestSuiteIAM(bt, true),
)
}
for i, testCase := range testCases {
t.Run(fmt.Sprintf("Test: %d, ServerType: %s", i+1, testCase.serverType), func(t *testing.T) {
runAllIAMSTSTests(testCase, &check{t, testCase.serverType})
})
etcdStr := ""
if testCase.withEtcdBackend {
etcdStr = " (with etcd backend)"
}
t.Run(
fmt.Sprintf("Test: %d, ServerType: %s%s", i+1, testCase.serverType, etcdStr),
func(t *testing.T) {
runAllIAMSTSTests(testCase, &check{t, testCase.serverType})
},
)
}
}
@ -135,21 +149,16 @@ func (s *TestSuiteIAM) TestSTS(c *check) {
}
}
const (
EnvTestLDAPServer = "LDAP_TEST_SERVER"
)
func (s *TestSuiteIAM) GetLDAPServer(c *check) string {
return os.Getenv(EnvTestLDAPServer)
}
// SetUpLDAP - expects to setup an LDAP test server using the test LDAP
// container and canned data from https://github.com/minio/minio-ldap-testing
func (s *TestSuiteIAM) SetUpLDAP(c *check) {
func (s *TestSuiteIAM) SetUpLDAP(c *check, serverAddr string) {
ctx, cancel := context.WithTimeout(context.Background(), testDefaultTimeout)
defer cancel()
serverAddr := s.GetLDAPServer(c)
configCmds := []string{
"identity_ldap",
fmt.Sprintf("server_addr=%s", serverAddr),
@ -169,31 +178,50 @@ func (s *TestSuiteIAM) SetUpLDAP(c *check) {
s.RestartIAMSuite(c)
}
const (
EnvTestLDAPServer = "LDAP_TEST_SERVER"
)
func TestIAMWithLDAPServerSuite(t *testing.T) {
testCases := []*TestSuiteIAM{
baseTestCases := []TestSuiteCommon{
// Init and run test on FS backend with signature v4.
newTestSuiteIAM(TestSuiteCommon{serverType: "FS", signer: signerV4}),
{serverType: "FS", signer: signerV4},
// Init and run test on FS backend, with tls enabled.
newTestSuiteIAM(TestSuiteCommon{serverType: "FS", signer: signerV4, secure: true}),
{serverType: "FS", signer: signerV4, secure: true},
// Init and run test on Erasure backend.
newTestSuiteIAM(TestSuiteCommon{serverType: "Erasure", signer: signerV4}),
{serverType: "Erasure", signer: signerV4},
// Init and run test on ErasureSet backend.
newTestSuiteIAM(TestSuiteCommon{serverType: "ErasureSet", signer: signerV4}),
{serverType: "ErasureSet", signer: signerV4},
}
testCases := []*TestSuiteIAM{}
for _, bt := range baseTestCases {
testCases = append(testCases,
newTestSuiteIAM(bt, false),
newTestSuiteIAM(bt, true),
)
}
for i, testCase := range testCases {
t.Run(fmt.Sprintf("Test: %d, ServerType: %s", i+1, testCase.serverType), func(t *testing.T) {
c := &check{t, testCase.serverType}
suite := testCase
etcdStr := ""
if testCase.withEtcdBackend {
etcdStr = " (with etcd backend)"
}
t.Run(
fmt.Sprintf("Test: %d, ServerType: %s%s", i+1, testCase.serverType, etcdStr),
func(t *testing.T) {
c := &check{t, testCase.serverType}
suite := testCase
if suite.GetLDAPServer(c) == "" {
return
}
ldapServer := os.Getenv(EnvTestLDAPServer)
if ldapServer == "" {
c.Skip("Skipping LDAP test as no LDAP server is provided.")
}
suite.SetUpSuite(c)
suite.SetUpLDAP(c)
suite.TestLDAPSTS(c)
suite.TearDownSuite(c)
})
suite.SetUpSuite(c)
suite.SetUpLDAP(c, ldapServer)
suite.TestLDAPSTS(c)
suite.TearDownSuite(c)
},
)
}
}

View file

@ -355,6 +355,8 @@ func initTestServerWithBackend(ctx context.Context, t TestErrHandler, testServer
newAllSubsystems()
globalEtcdClient = nil
initAllSubsystems(ctx, objLayer)
globalIAMSys.Init(ctx, objLayer, globalEtcdClient, 2*time.Second)

View file

@ -47,7 +47,7 @@ func (az *warmBackendAzure) getDest(object string) string {
}
func (az *warmBackendAzure) tier() azblob.AccessTierType {
for _, t := range azblob.PossibleAccessTierTypeValues() {
if strings.ToLower(az.StorageClass) == strings.ToLower(string(t)) {
if strings.EqualFold(az.StorageClass, string(t)) {
return t
}
}

View file

@ -50,6 +50,10 @@ func checkWarmBackend(ctx context.Context, w WarmBackend) error {
var empty bytes.Reader
rv, err := w.Put(ctx, probeObject, &empty, 0)
if err != nil {
switch err.(type) {
case BackendDown:
return err
}
return tierPermErr{
Op: tierPut,
Err: err,
@ -58,6 +62,10 @@ func checkWarmBackend(ctx context.Context, w WarmBackend) error {
_, err = w.Get(ctx, probeObject, rv, WarmBackendGetOpts{})
if err != nil {
switch err.(type) {
case BackendDown:
return err
}
switch {
case isErrBucketNotFound(err):
return errTierBucketNotFound
@ -72,6 +80,10 @@ func checkWarmBackend(ctx context.Context, w WarmBackend) error {
}
if err = w.Remove(ctx, probeObject, rv); err != nil {
switch err.(type) {
case BackendDown:
return err
}
return tierPermErr{
Op: tierDelete,
Err: err,

View file

@ -1,10 +1,22 @@
#!/usr/bin/env bash
trap 'catch' ERR
trap 'catch $LINENO' ERR
# shellcheck disable=SC2120
catch() {
if [ $# -ne 0 ]; then
echo "error on line $1"
for site in sitea siteb sitec; do
echo "$site server logs ========="
cat "/tmp/${site}_1.log"
echo "==========================="
cat "/tmp/${site}_2.log"
done
fi
echo "Cleaning up instances of MinIO"
pkill minio
pkill -9 minio
rm -rf /tmp/multisitea
rm -rf /tmp/multisiteb
rm -rf /tmp/multisitec
@ -13,47 +25,47 @@ catch() {
catch
set -e
go install -v
export MINIO_BROWSER=off
export MINIO_ROOT_USER="minio"
export MINIO_ROOT_PASSWORD="minio123"
export MINIO_PROMETHEUS_AUTH_TYPE=public
export PATH=${GOPATH}/bin:${PATH}
minio server --address :9001 "http://localhost:9001/tmp/multisitea/data/disterasure/xl{1...4}" \
"http://localhost:9002/tmp/multisitea/data/disterasure/xl{5...8}" >/tmp/sitea_1.log 2>&1 &
minio server --address :9002 "http://localhost:9001/tmp/multisitea/data/disterasure/xl{1...4}" \
"http://localhost:9002/tmp/multisitea/data/disterasure/xl{5...8}" >/tmp/sitea_2.log 2>&1 &
minio server --address 127.0.0.1:9001 "http://127.0.0.1:9001/tmp/multisitea/data/disterasure/xl{1...4}" \
"http://127.0.0.1:9002/tmp/multisitea/data/disterasure/xl{5...8}" >/tmp/sitea_1.log 2>&1 &
minio server --address 127.0.0.1:9002 "http://127.0.0.1:9001/tmp/multisitea/data/disterasure/xl{1...4}" \
"http://127.0.0.1:9002/tmp/multisitea/data/disterasure/xl{5...8}" >/tmp/sitea_2.log 2>&1 &
minio server --address :9003 "http://localhost:9003/tmp/multisiteb/data/disterasure/xl{1...4}" \
"http://localhost:9004/tmp/multisiteb/data/disterasure/xl{5...8}" >/tmp/siteb_1.log 2>&1 &
minio server --address :9004 "http://localhost:9003/tmp/multisiteb/data/disterasure/xl{1...4}" \
"http://localhost:9004/tmp/multisiteb/data/disterasure/xl{5...8}" >/tmp/siteb_2.log 2>&1 &
minio server --address 127.0.0.1:9003 "http://127.0.0.1:9003/tmp/multisiteb/data/disterasure/xl{1...4}" \
"http://127.0.0.1:9004/tmp/multisiteb/data/disterasure/xl{5...8}" >/tmp/siteb_1.log 2>&1 &
minio server --address 127.0.0.1:9004 "http://127.0.0.1:9003/tmp/multisiteb/data/disterasure/xl{1...4}" \
"http://127.0.0.1:9004/tmp/multisiteb/data/disterasure/xl{5...8}" >/tmp/siteb_2.log 2>&1 &
minio server --address :9005 "http://localhost:9005/tmp/multisitec/data/disterasure/xl{1...4}" \
"http://localhost:9006/tmp/multisitec/data/disterasure/xl{5...8}" >/tmp/sitec_1.log 2>&1 &
minio server --address :9006 "http://localhost:9005/tmp/multisitec/data/disterasure/xl{1...4}" \
"http://localhost:9006/tmp/multisitec/data/disterasure/xl{5...8}" >/tmp/sitec_2.log 2>&1 &
minio server --address 127.0.0.1:9005 "http://127.0.0.1:9005/tmp/multisitec/data/disterasure/xl{1...4}" \
"http://127.0.0.1:9006/tmp/multisitec/data/disterasure/xl{5...8}" >/tmp/sitec_1.log 2>&1 &
minio server --address 127.0.0.1:9006 "http://127.0.0.1:9005/tmp/multisitec/data/disterasure/xl{1...4}" \
"http://127.0.0.1:9006/tmp/multisitec/data/disterasure/xl{5...8}" >/tmp/sitec_2.log 2>&1 &
sleep 30
mc alias set sitea http://localhost:9001 minio minio123
mc alias set sitea http://127.0.0.1:9001 minio minio123
mc mb sitea/bucket
mc version enable sitea/bucket
mc mb -l sitea/olockbucket
mc alias set siteb http://localhost:9004 minio minio123
mc alias set siteb http://127.0.0.1:9004 minio minio123
mc mb siteb/bucket/
mc version enable siteb/bucket/
mc mb -l siteb/olockbucket/
mc alias set sitec http://localhost:9006 minio minio123
mc alias set sitec http://127.0.0.1:9006 minio minio123
mc mb sitec/bucket/
mc version enable sitec/bucket/
mc mb -l sitec/olockbucket
echo "adding replication config for site a -> site b"
remote_arn=$(mc admin bucket remote add sitea/bucket/ \
http://minio:minio123@localhost:9004/bucket \
http://minio:minio123@127.0.0.1:9004/bucket \
--service "replication" --json | jq -r ".RemoteARN")
echo "adding replication rule for a -> b : ${remote_arn}"
sleep 1
@ -64,7 +76,7 @@ sleep 1
echo "adding replication config for site b -> site a"
remote_arn=$(mc admin bucket remote add siteb/bucket/ \
http://minio:minio123@localhost:9001/bucket \
http://minio:minio123@127.0.0.1:9001/bucket \
--service "replication" --json | jq -r ".RemoteARN")
sleep 1
echo "adding replication rule for b -> a : ${remote_arn}"
@ -75,7 +87,7 @@ sleep 1
echo "adding replication config for site a -> site c"
remote_arn=$(mc admin bucket remote add sitea/bucket/ \
http://minio:minio123@localhost:9006/bucket \
http://minio:minio123@127.0.0.1:9006/bucket \
--service "replication" --json | jq -r ".RemoteARN")
sleep 1
echo "adding replication rule for a -> c : ${remote_arn}"
@ -85,7 +97,7 @@ mc replicate add sitea/bucket/ \
sleep 1
echo "adding replication config for site c -> site a"
remote_arn=$(mc admin bucket remote add sitec/bucket/ \
http://minio:minio123@localhost:9001/bucket \
http://minio:minio123@127.0.0.1:9001/bucket \
--service "replication" --json | jq -r ".RemoteARN")
sleep 1
echo "adding replication rule for c -> a : ${remote_arn}"
@ -95,7 +107,7 @@ mc replicate add sitec/bucket/ \
sleep 1
echo "adding replication config for site b -> site c"
remote_arn=$(mc admin bucket remote add siteb/bucket/ \
http://minio:minio123@localhost:9006/bucket \
http://minio:minio123@127.0.0.1:9006/bucket \
--service "replication" --json | jq -r ".RemoteARN")
sleep 1
echo "adding replication rule for b -> c : ${remote_arn}"
@ -106,7 +118,7 @@ sleep 1
echo "adding replication config for site c -> site b"
remote_arn=$(mc admin bucket remote add sitec/bucket \
http://minio:minio123@localhost:9004/bucket \
http://minio:minio123@127.0.0.1:9004/bucket \
--service "replication" --json | jq -r ".RemoteARN")
sleep 1
echo "adding replication rule for c -> b : ${remote_arn}"
@ -116,7 +128,7 @@ mc replicate add sitec/bucket/ \
sleep 1
echo "adding replication config for olockbucket site a -> site b"
remote_arn=$(mc admin bucket remote add sitea/olockbucket/ \
http://minio:minio123@localhost:9004/olockbucket \
http://minio:minio123@127.0.0.1:9004/olockbucket \
--service "replication" --json | jq -r ".RemoteARN")
sleep 1
echo "adding replication rule for olockbucket a -> b : ${remote_arn}"
@ -126,7 +138,7 @@ mc replicate add sitea/olockbucket/ \
sleep 1
echo "adding replication config for site b -> site a"
remote_arn=$(mc admin bucket remote add siteb/olockbucket/ \
http://minio:minio123@localhost:9001/olockbucket \
http://minio:minio123@127.0.0.1:9001/olockbucket \
--service "replication" --json | jq -r ".RemoteARN")
sleep 1
echo "adding replication rule for olockbucket b -> a : ${remote_arn}"
@ -136,7 +148,7 @@ mc replicate add siteb/olockbucket/ \
sleep 1
echo "adding replication config for olockbucket site a -> site c"
remote_arn=$(mc admin bucket remote add sitea/olockbucket/ \
http://minio:minio123@localhost:9006/olockbucket \
http://minio:minio123@127.0.0.1:9006/olockbucket \
--service "replication" --json | jq -r ".RemoteARN")
sleep 1
echo "adding replication rule for olockbucket a -> c : ${remote_arn}"
@ -146,7 +158,7 @@ mc replicate add sitea/olockbucket/ \
sleep 1
echo "adding replication config for site c -> site a"
remote_arn=$(mc admin bucket remote add sitec/olockbucket/ \
http://minio:minio123@localhost:9001/olockbucket \
http://minio:minio123@127.0.0.1:9001/olockbucket \
--service "replication" --json | jq -r ".RemoteARN")
sleep 1
echo "adding replication rule for olockbucket c -> a : ${remote_arn}"
@ -156,7 +168,7 @@ mc replicate add sitec/olockbucket/ \
sleep 1
echo "adding replication config for site b -> site c"
remote_arn=$(mc admin bucket remote add siteb/olockbucket/ \
http://minio:minio123@localhost:9006/olockbucket \
http://minio:minio123@127.0.0.1:9006/olockbucket \
--service "replication" --json | jq -r ".RemoteARN")
sleep 1
echo "adding replication rule for olockbucket b -> c : ${remote_arn}"
@ -166,7 +178,7 @@ mc replicate add siteb/olockbucket/ \
sleep 1
echo "adding replication config for site c -> site b"
remote_arn=$(mc admin bucket remote add sitec/olockbucket \
http://minio:minio123@localhost:9004/olockbucket \
http://minio:minio123@127.0.0.1:9004/olockbucket \
--service "replication" --json | jq -r ".RemoteARN")
sleep 1
echo "adding replication rule for olockbucket c -> b : ${remote_arn}"

View file

@ -93,13 +93,13 @@ master key to automatically encrypt all cached content.
Note that cache KMS master key is not recommended for use in production deployments. If the MinIO server/gateway machine is ever compromised, the cache KMS master key must also be treated as compromised.
Support for external KMS to manage cache KMS keys is on the roadmap,and would be ideal for production use cases.
- `MINIO_CACHE_COMMIT` setting of `writethrough` allows caching of multipart uploads synchronously if enabled. By default, single PUT operations are already cached on write without any special setting.
- `MINIO_CACHE_COMMIT` setting of `writethrough` allows caching of single and multipart uploads synchronously if enabled. By default, however single PUT operations are cached asynchronously on write without any special setting.
- Partially cached stale uploads older than 24 hours are automatically cleaned up.
- Expiration happens automatically based on the configured interval as explained above, frequently accessed objects stay alive in cache for a significantly longer time.
> NOTE: `MINIO_CACHE_COMMIT` also has a value of `writeback` which allows staging single uploads in cache before committing to remote. However, for consistency reasons, `writeback` staging uploads in the cache are not permitted for multipart uploads.
> NOTE: `MINIO_CACHE_COMMIT` also has a value of `writeback` which allows staging single uploads in cache before committing to remote. It is not possible to stage multipart uploads in the cache for consistency reasons - hence, multipart uploads will be cached synchronously even if `writeback` is set.
### Crash Recovery

View file

@ -2,7 +2,7 @@ version: '3.7'
# Settings and configurations that are common for all containers
x-minio-common: &minio-common
image: quay.io/minio/minio:RELEASE.2021-11-05T09-16-26Z
image: quay.io/minio/minio:RELEASE.2021-11-09T03-21-45Z
command: server --console-address ":9001" http://minio{1...4}/data{1...2}
expose:
- "9000"

View file

@ -382,7 +382,7 @@ func IsObjectLockLegalHoldRequested(h http.Header) bool {
// IsObjectLockGovernanceBypassSet returns true if object lock governance bypass header is set.
func IsObjectLockGovernanceBypassSet(h http.Header) bool {
return strings.ToLower(h.Get(AmzObjectLockBypassRetGovernance)) == "true"
return strings.EqualFold(h.Get(AmzObjectLockBypassRetGovernance), "true")
}
// IsObjectLockRequested returns true if legal hold or object lock retention headers are requested.

View file

@ -217,6 +217,20 @@ func (kvs KVS) Empty() bool {
return len(kvs) == 0
}
// Clone - returns a copy of the KVS
func (kvs KVS) Clone() KVS {
return append(make(KVS, 0, len(kvs)), kvs...)
}
// GetWithDefault - returns default value if key not set
func (kvs KVS) GetWithDefault(key string, defaultKVS KVS) string {
v := kvs.Get(key)
if len(v) == 0 {
return defaultKVS.Get(key)
}
return v
}
// Keys returns the list of keys for the current KVS
func (kvs KVS) Keys() []string {
var keys = make([]string, len(kvs))
@ -759,10 +773,12 @@ func (c Config) SetKVS(s string, defaultKVS map[string]KVS) (dynamic bool, err e
kvs.Set(Enable, EnableOn)
}
currKVS, ok := c[subSys][tgt]
var currKVS KVS
ck, ok := c[subSys][tgt]
if !ok {
currKVS = defaultKVS[subSys]
currKVS = defaultKVS[subSys].Clone()
} else {
currKVS = ck.Clone()
for _, kv := range defaultKVS[subSys] {
if _, ok = currKVS.Lookup(kv.Key); !ok {
currKVS.Set(kv.Key, kv.Value)

View file

@ -150,15 +150,15 @@ func LookupConfig(kvs config.KVS) (cfg Config, err error) {
if err = config.CheckValidKeys(config.HealSubSys, kvs, DefaultKVS); err != nil {
return cfg, err
}
cfg.Bitrot, err = config.ParseBool(env.Get(EnvBitrot, kvs.Get(Bitrot)))
cfg.Bitrot, err = config.ParseBool(env.Get(EnvBitrot, kvs.GetWithDefault(Bitrot, DefaultKVS)))
if err != nil {
return cfg, fmt.Errorf("'heal:bitrotscan' value invalid: %w", err)
}
cfg.Sleep, err = time.ParseDuration(env.Get(EnvSleep, kvs.Get(Sleep)))
cfg.Sleep, err = time.ParseDuration(env.Get(EnvSleep, kvs.GetWithDefault(Sleep, DefaultKVS)))
if err != nil {
return cfg, fmt.Errorf("'heal:max_sleep' value invalid: %w", err)
}
cfg.IOCount, err = strconv.Atoi(env.Get(EnvIOCount, kvs.Get(IOCount)))
cfg.IOCount, err = strconv.Atoi(env.Get(EnvIOCount, kvs.GetWithDefault(IOCount, DefaultKVS)))
if err != nil {
return cfg, fmt.Errorf("'heal:max_io' value invalid: %w", err)
}

View file

@ -95,7 +95,7 @@ func LookupConfig(kvs config.KVS) (cfg Config, err error) {
}
delay := env.Get(EnvDelayLegacy, "")
if delay == "" {
delay = env.Get(EnvDelay, kvs.Get(Delay))
delay = env.Get(EnvDelay, kvs.GetWithDefault(Delay, DefaultKVS))
}
cfg.Delay, err = strconv.ParseFloat(delay, 64)
if err != nil {
@ -103,14 +103,14 @@ func LookupConfig(kvs config.KVS) (cfg Config, err error) {
}
maxWait := env.Get(EnvMaxWaitLegacy, "")
if maxWait == "" {
maxWait = env.Get(EnvMaxWait, kvs.Get(MaxWait))
maxWait = env.Get(EnvMaxWait, kvs.GetWithDefault(MaxWait, DefaultKVS))
}
cfg.MaxWait, err = time.ParseDuration(maxWait)
if err != nil {
return cfg, err
}
cfg.Cycle, err = time.ParseDuration(env.Get(EnvCycle, kvs.Get(Cycle)))
cfg.Cycle, err = time.ParseDuration(env.Get(EnvCycle, kvs.GetWithDefault(Cycle, DefaultKVS)))
if err != nil {
return cfg, err
}

View file

@ -46,7 +46,8 @@ func (sses3) String() string { return "SSE-S3" }
func (sses3) IsRequested(h http.Header) bool {
_, ok := h[xhttp.AmzServerSideEncryption]
return ok && strings.ToLower(h.Get(xhttp.AmzServerSideEncryption)) != xhttp.AmzEncryptionKMS // Return only true if the SSE header is specified and does not contain the SSE-KMS value
// Return only true if the SSE header is specified and does not contain the SSE-KMS value
return ok && !strings.EqualFold(h.Get(xhttp.AmzServerSideEncryption), xhttp.AmzEncryptionKMS)
}
// ParseHTTP parses the SSE-S3 related HTTP headers and checks

View file

@ -23,7 +23,6 @@ import (
"errors"
"fmt"
"go/build"
"hash"
"net/http"
"path/filepath"
"reflect"
@ -39,8 +38,6 @@ import (
var (
// HighwayHash key for logging in anonymous mode
magicHighwayHash256Key = []byte("\x4b\xe7\x34\xfa\x8e\x23\x8a\xcd\x26\x3e\x83\xe6\xbb\x96\x85\x52\x04\x0f\x93\x5d\xa3\x9f\x44\x14\x97\xe0\x9d\x13\x22\xde\x36\xa0")
// HighwayHash hasher for logging in anonymous mode
loggerHighwayHasher hash.Hash
)
// Disable disables all logging, false by default. (used for "go test")
@ -207,8 +204,6 @@ func Init(goPath string, goRoot string) {
// paths like "{GOROOT}/src/github.com/minio/minio"
// and "{GOPATH}/src/github.com/minio/minio"
trimStrings = append(trimStrings, filepath.Join("github.com", "minio", "minio")+string(filepath.Separator))
loggerHighwayHasher, _ = highwayhash.New(magicHighwayHash256Key) // New will never return error since key is 256 bit
}
func trimTrace(f string) string {
@ -263,10 +258,9 @@ func getTrace(traceLevel int) []string {
// Return the highway hash of the passed string
func hashString(input string) string {
defer loggerHighwayHasher.Reset()
loggerHighwayHasher.Write([]byte(input))
checksum := loggerHighwayHasher.Sum(nil)
return hex.EncodeToString(checksum)
hh, _ := highwayhash.New(magicHighwayHash256Key)
hh.Write([]byte(input))
return hex.EncodeToString(hh.Sum(nil))
}
// Kind specifies the kind of error log

View file

@ -387,7 +387,7 @@ func (s3Select *S3Select) marshal(buf *bytes.Buffer, record sql.Record) error {
FieldDelimiter: []rune(s3Select.Output.CSVArgs.FieldDelimiter)[0],
Quote: []rune(s3Select.Output.CSVArgs.QuoteCharacter)[0],
QuoteEscape: []rune(s3Select.Output.CSVArgs.QuoteEscapeCharacter)[0],
AlwaysQuote: strings.ToLower(s3Select.Output.CSVArgs.QuoteFields) == "always",
AlwaysQuote: strings.EqualFold(s3Select.Output.CSVArgs.QuoteFields, "always"),
}
err := record.WriteCSV(bufioWriter, opts)
if err != nil {

View file

@ -179,7 +179,7 @@ func (e *PrimaryTerm) analyze(s *Select) (result qProp) {
case e.JPathExpr != nil:
// Check if the path expression is valid
if len(e.JPathExpr.PathExpr) > 0 {
if e.JPathExpr.BaseKey.String() != s.From.As && strings.ToLower(e.JPathExpr.BaseKey.String()) != baseTableName {
if e.JPathExpr.BaseKey.String() != s.From.As && !strings.EqualFold(e.JPathExpr.BaseKey.String(), baseTableName) {
result = qProp{err: errInvalidKeypath}
return
}

View file

@ -31,7 +31,7 @@ type Boolean bool
// Capture interface used by participle
func (b *Boolean) Capture(values []string) error {
*b = strings.ToLower(values[0]) == "true"
*b = Boolean(strings.EqualFold(values[0], "true"))
return nil
}

View file

@ -118,7 +118,7 @@ func ParseSelectStatement(s string) (stmt SelectStatement, err error) {
}
func validateTableName(from *TableExpression) error {
if strings.ToLower(from.Table.BaseKey.String()) != baseTableName {
if !strings.EqualFold(from.Table.BaseKey.String(), baseTableName) {
return errBadTableName(errors.New("table name must be `s3object`"))
}

View file

@ -44,7 +44,7 @@ func (e *JSONPath) StripTableAlias(tableAlias string) []*JSONPathElement {
return e.strippedPathExpr
}
hasTableAlias := e.BaseKey.String() == tableAlias || strings.ToLower(e.BaseKey.String()) == baseTableName
hasTableAlias := e.BaseKey.String() == tableAlias || strings.EqualFold(e.BaseKey.String(), baseTableName)
var pathExpr []*JSONPathElement
if hasTableAlias {
pathExpr = e.PathExpr