Compare commits

...

2 Commits

Author SHA1 Message Date
Harshavardhana c81a2e2d53 add format header and version 2021-11-09 16:40:38 -08:00
Harshavardhana bf19e9b52b feat: decommission feature for pools 2021-11-09 16:40:38 -08:00
26 changed files with 2069 additions and 333 deletions

View File

@ -66,7 +66,7 @@ function start_minio_pool_erasure_sets_ipv6()
{
export MINIO_ROOT_USER=$ACCESS_KEY
export MINIO_ROOT_PASSWORD=$SECRET_KEY
export MINIO_ENDPOINTS="http://[::1]:9000${WORK_DIR}/pool-disk-sets{1...4} http://[::1]:9001${WORK_DIR}/pool-disk-sets{5...8}"
export MINIO_ENDPOINTS="http://[::1]:9000${WORK_DIR}/pool-disk-sets-ipv6{1...4} http://[::1]:9001${WORK_DIR}/pool-disk-sets-ipv6{5...8}"
"${MINIO[@]}" server --address="[::1]:9000" > "$WORK_DIR/pool-minio-ipv6-9000.log" 2>&1 &
"${MINIO[@]}" server --address="[::1]:9001" > "$WORK_DIR/pool-minio-ipv6-9001.log" 2>&1 &

View File

@ -18,7 +18,7 @@ function start_minio_3_node() {
export MINIO_ROOT_PASSWORD=minio123
export MINIO_ERASURE_SET_DRIVE_COUNT=6
start_port=$(shuf -i 10000-65000 -n 1)
start_port=$2
args=""
for i in $(seq 1 3); do
args="$args http://127.0.0.1:$[$start_port+$i]${WORK_DIR}/$i/1/ http://127.0.0.1:$[$start_port+$i]${WORK_DIR}/$i/2/ http://127.0.0.1:$[$start_port+$i]${WORK_DIR}/$i/3/ http://127.0.0.1:$[$start_port+$i]${WORK_DIR}/$i/4/ http://127.0.0.1:$[$start_port+$i]${WORK_DIR}/$i/5/ http://127.0.0.1:$[$start_port+$i]${WORK_DIR}/$i/6/"
@ -85,14 +85,14 @@ function __init__()
}
function perform_test() {
start_minio_3_node 120
start_minio_3_node 120 $2
echo "Testing Distributed Erasure setup healing of drives"
echo "Remove the contents of the disks belonging to '${1}' erasure set"
rm -rf ${WORK_DIR}/${1}/*/
start_minio_3_node 120
start_minio_3_node 120 $2
rv=$(check_online)
if [ "$rv" == "1" ]; then
@ -109,9 +109,12 @@ function perform_test() {
function main()
{
perform_test "2"
perform_test "1"
perform_test "3"
# use same ports for all tests
start_port=$(shuf -i 10000-65000 -n 1)
perform_test "2" ${start_port}
perform_test "1" ${start_port}
perform_test "3" ${start_port}
}
( __init__ "$@" && main "$@" )

cmd/admin-handlers-pools.go (new file, 179 lines)
View File

@ -0,0 +1,179 @@
// Copyright (c) 2015-2021 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package cmd
import (
"encoding/json"
"net/http"
"github.com/gorilla/mux"
"github.com/minio/minio/internal/logger"
iampolicy "github.com/minio/pkg/iam/policy"
)
func (a adminAPIHandlers) StartDecommission(w http.ResponseWriter, r *http.Request) {
ctx := newContext(r, w, "StartDecommission")
defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))
objectAPI, _ := validateAdminReq(ctx, w, r, iampolicy.DecommissionAdminAction)
if objectAPI == nil {
return
}
// Legacy args style, i.e. the non-ellipses style, is not supported with this API.
if globalEndpoints.Legacy() {
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL)
return
}
pools, ok := objectAPI.(*erasureServerPools)
if !ok {
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL)
return
}
vars := mux.Vars(r)
v := vars["pool"]
idx := globalEndpoints.GetPoolIdx(v)
if idx == -1 {
// We didn't find any matching pools, invalid input
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, errInvalidArgument), r.URL)
return
}
if err := pools.Decommission(r.Context(), idx); err != nil {
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
return
}
}
func (a adminAPIHandlers) CancelDecommission(w http.ResponseWriter, r *http.Request) {
ctx := newContext(r, w, "CancelDecommission")
defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))
objectAPI, _ := validateAdminReq(ctx, w, r, iampolicy.DecommissionAdminAction)
if objectAPI == nil {
return
}
// Legacy args style, i.e. the non-ellipses style, is not supported with this API.
if globalEndpoints.Legacy() {
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL)
return
}
pools, ok := objectAPI.(*erasureServerPools)
if !ok {
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL)
return
}
vars := mux.Vars(r)
v := vars["pool"]
idx := globalEndpoints.GetPoolIdx(v)
if idx == -1 {
// We didn't find any matching pools, invalid input
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, errInvalidArgument), r.URL)
return
}
if err := pools.DecommissionCancel(idx); err != nil {
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
return
}
}
func (a adminAPIHandlers) StatusPool(w http.ResponseWriter, r *http.Request) {
ctx := newContext(r, w, "StatusPool")
defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))
objectAPI, _ := validateAdminReq(ctx, w, r, iampolicy.DecommissionAdminAction)
if objectAPI == nil {
return
}
// Legacy args style, i.e. the non-ellipses style, is not supported with this API.
if globalEndpoints.Legacy() {
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL)
return
}
pools, ok := objectAPI.(*erasureServerPools)
if !ok {
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL)
return
}
vars := mux.Vars(r)
v := vars["pool"]
idx := globalEndpoints.GetPoolIdx(v)
if idx == -1 {
// We didn't find any matching pools, invalid input
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, errInvalidArgument), r.URL)
return
}
status, err := pools.Status(r.Context(), idx)
if err != nil {
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
return
}
logger.LogIf(r.Context(), json.NewEncoder(w).Encode(&status))
}
func (a adminAPIHandlers) ListPools(w http.ResponseWriter, r *http.Request) {
ctx := newContext(r, w, "ListPools")
defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))
objectAPI, _ := validateAdminReq(ctx, w, r, iampolicy.DecommissionAdminAction)
if objectAPI == nil {
return
}
// Legacy args style, i.e. the non-ellipses style, is not supported with this API.
if globalEndpoints.Legacy() {
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL)
return
}
pools, ok := objectAPI.(*erasureServerPools)
if !ok {
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL)
return
}
poolsStatus := make([]PoolStatus, len(globalEndpoints))
for idx := range globalEndpoints {
status, err := pools.Status(r.Context(), idx)
if err != nil {
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
return
}
poolsStatus[idx] = status
}
logger.LogIf(r.Context(), json.NewEncoder(w).Encode(poolsStatus))
}

View File

@ -80,8 +80,12 @@ func registerAdminRouter(router *mux.Router, enableConfigOps bool) {
adminRouter.Methods(http.MethodPost).Path(adminVersion + "/background-heal/status").HandlerFunc(gz(httpTraceAll(adminAPI.BackgroundHealStatusHandler)))
/// Health operations
/// Pool operations
adminRouter.Methods(http.MethodGet).Path(adminVersion + "/pools/list").HandlerFunc(gz(httpTraceAll(adminAPI.ListPools)))
adminRouter.Methods(http.MethodGet).Path(adminVersion+"/pools/status").HandlerFunc(gz(httpTraceAll(adminAPI.StatusPool))).Queries("pool", "{pool:.*}")
adminRouter.Methods(http.MethodPost).Path(adminVersion+"/pools/decomission").HandlerFunc(gz(httpTraceAll(adminAPI.StartDecommission))).Queries("pool", "{pool:.*}")
adminRouter.Methods(http.MethodPost).Path(adminVersion+"/pools/cancel").HandlerFunc(gz(httpTraceAll(adminAPI.CancelDecommission))).Queries("pool", "{pool:.*}")
}
// Profiling operations
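
The routes registered above make up the new pool admin API: list pools, query a pool's status, start a decommission, and cancel it. A rough sketch of the request shapes follows; the /minio/admin prefix and the v3 admin version are assumptions about the usual admin path layout rather than something shown in this diff, and real requests additionally need MinIO admin (AWS Signature V4) signing, which is omitted here.

package main

import (
	"fmt"
	"net/url"
)

func main() {
	// The pool query parameter must be the pool's command-line argument
	// verbatim, since GetPoolIdx matches on the raw string.
	v := url.Values{}
	v.Set("pool", "http://server{1...4}/disk{1...4}") // hypothetical pool argument

	// Paths mirror the registrations above (the decommission route is
	// spelled "decomission" exactly as registered).
	fmt.Println("GET  /minio/admin/v3/pools/list")
	fmt.Println("GET  /minio/admin/v3/pools/status?" + v.Encode())
	fmt.Println("POST /minio/admin/v3/pools/decomission?" + v.Encode())
	fmt.Println("POST /minio/admin/v3/pools/cancel?" + v.Encode())
}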

View File

@ -355,9 +355,11 @@ func createServerEndpoints(serverAddr string, args ...string) (
return nil, -1, err
}
endpointServerPools = append(endpointServerPools, PoolEndpoints{
Legacy: true,
SetCount: len(setArgs),
DrivesPerSet: len(setArgs[0]),
Endpoints: endpointList,
CmdLine: strings.Join(args, " "),
})
setupType = newSetupType
return endpointServerPools, setupType, nil
@ -378,6 +380,7 @@ func createServerEndpoints(serverAddr string, args ...string) (
SetCount: len(setArgs),
DrivesPerSet: len(setArgs[0]),
Endpoints: endpointList,
CmdLine: arg,
}); err != nil {
return nil, -1, err
}

View File

@ -197,14 +197,28 @@ func NewEndpoint(arg string) (ep Endpoint, e error) {
// PoolEndpoints represents endpoints in a given pool
// along with its setCount and setDriveCount.
type PoolEndpoints struct {
// indicates if endpoints are provided in non-ellipses style
Legacy bool
SetCount int
DrivesPerSet int
Endpoints Endpoints
CmdLine string
}
// EndpointServerPools - list of list of endpoints
type EndpointServerPools []PoolEndpoints
// GetPoolIdx returns the pool index for the given pool command-line string
func (l EndpointServerPools) GetPoolIdx(pool string) int {
for id, ep := range globalEndpoints {
if ep.CmdLine != pool {
continue
}
return id
}
return -1
}
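
GetPoolIdx above resolves a pool purely by string equality against the pool's original command-line argument (the CmdLine field populated in createServerEndpoints), so callers must pass that argument verbatim. A standalone sketch of the same matching idea, using a simplified stand-in type rather than the server's own PoolEndpoints:

package main

import "fmt"

// poolEndpoints is a simplified stand-in for the server's PoolEndpoints.
type poolEndpoints struct{ CmdLine string }

// getPoolIdx mirrors the lookup above: return the index of the pool whose
// command-line argument matches exactly, or -1 when nothing matches.
func getPoolIdx(pools []poolEndpoints, pool string) int {
	for id, ep := range pools {
		if ep.CmdLine == pool {
			return id
		}
	}
	return -1
}

func main() {
	pools := []poolEndpoints{
		{CmdLine: "http://server{1...4}/disk{1...4}"},
		{CmdLine: "http://server{5...8}/disk{1...4}"},
	}
	fmt.Println(getPoolIdx(pools, "http://server{5...8}/disk{1...4}")) // 1
	fmt.Println(getPoolIdx(pools, "http://server9/disk1"))             // -1
}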
// GetLocalPoolIdx returns the pool which endpoint belongs to locally.
// if ep is remote, this code will return -1 as the poolIndex
func (l EndpointServerPools) GetLocalPoolIdx(ep Endpoint) int {
@ -220,6 +234,13 @@ func (l EndpointServerPools) GetLocalPoolIdx(ep Endpoint) int {
return -1
}
// Legacy returns 'true' if the MinIO server command line was
// provided with no ellipses pattern; such deployments are
// considered legacy.
func (l EndpointServerPools) Legacy() bool {
return len(l) == 1 && l[0].Legacy
}
// Add adds pool endpoints
func (l *EndpointServerPools) Add(zeps PoolEndpoints) error {
existSet := set.NewStringSet()

View File

@ -22,6 +22,38 @@ import (
"sync"
)
func (er erasureObjects) getOnlineDisks() (newDisks []StorageAPI) {
disks := er.getDisks()
var wg sync.WaitGroup
var mu sync.Mutex
for _, i := range hashOrder(UTCNow().String(), len(disks)) {
i := i
wg.Add(1)
go func() {
defer wg.Done()
if disks[i-1] == nil {
return
}
di, err := disks[i-1].DiskInfo(context.Background())
if err != nil || di.Healing {
// - Do not consume disks which are not reachable,
// unformatted, or simply not accessible for some reason.
//
// - Do not consume disks which are being healed
//
// - Future: skip busy disks
return
}
mu.Lock()
newDisks = append(newDisks, disks[i-1])
mu.Unlock()
}()
}
wg.Wait()
return newDisks
}
func (er erasureObjects) getLoadBalancedLocalDisks() (newDisks []StorageAPI) {
disks := er.getDisks()
// Based on the random shuffling return back randomized disks.

View File

@ -0,0 +1,637 @@
// Copyright (c) 2015-2021 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package cmd
import (
"bytes"
"context"
"encoding/binary"
"fmt"
"io"
"io/ioutil"
"net/http"
"time"
"github.com/minio/madmin-go"
"github.com/minio/minio/internal/config/storageclass"
"github.com/minio/minio/internal/hash"
"github.com/minio/minio/internal/logger"
)
// PoolDecommissionInfo captures the currently running decommissioning information
type PoolDecommissionInfo struct {
StartTime time.Time `json:"startTime" msg:"st"`
StartSize int64 `json:"startSize" msg:"ss"`
TotalSize int64 `json:"totalSize" msg:"ts"`
CurrentSize int64 `json:"currentSize" msg:"cs"`
Complete bool `json:"complete" msg:"cmp"`
Failed bool `json:"failed" msg:"fl"`
}
// PoolStatus captures current pool status
type PoolStatus struct {
ID int `json:"id" msg:"id"`
CmdLine string `json:"cmdline" msg:"cl"`
LastUpdate time.Time `json:"lastUpdate" msg:"lu"`
Decommission *PoolDecommissionInfo `json:"decomissionInfo,omitempty" msg:"dec"`
}
//go:generate msgp -file $GOFILE -unexported
type poolMeta struct {
Version int `msg:"v"`
Pools []PoolStatus `msg:"pls"`
}
func (p poolMeta) DecommissionComplete(idx int) bool {
p.Pools[idx].LastUpdate = time.Now()
if p.Pools[idx].Decommission != nil {
p.Pools[idx].Decommission.Complete = true
p.Pools[idx].Decommission.Failed = false
return true
}
return false
}
func (p poolMeta) DecommissionFailed(idx int) bool {
p.Pools[idx].LastUpdate = time.Now()
if p.Pools[idx].Decommission != nil {
p.Pools[idx].Decommission.Complete = false
p.Pools[idx].Decommission.Failed = true
return true
}
return false
}
func (p poolMeta) Decommission(idx int, info StorageInfo) bool {
if p.Pools[idx].Decommission != nil && !p.Pools[idx].Decommission.StartTime.IsZero() {
// Decommission is already in progress, starting another one cannot be allowed.
return false
}
startSize := int64(TotalUsableCapacityFree(info))
totalSize := int64(TotalUsableCapacity(info))
p.Pools[idx].LastUpdate = time.Now()
p.Pools[idx].Decommission = &PoolDecommissionInfo{
StartTime: UTCNow(),
StartSize: startSize,
CurrentSize: startSize,
TotalSize: totalSize,
}
return true
}
func (p poolMeta) IsSuspended(idx int) bool {
return p.Pools[idx].Decommission != nil
}
func (p *poolMeta) load(ctx context.Context, set *erasureSets, sets []*erasureSets) (bool, error) {
gr, err := set.GetObjectNInfo(ctx, minioMetaBucket, poolMetaName,
nil, http.Header{}, readLock, ObjectOptions{})
if err != nil {
if isErrObjectNotFound(err) {
return true, nil
}
return false, err
}
defer gr.Close()
data, err := ioutil.ReadAll(gr)
if err != nil {
return false, err
}
if len(data) == 0 {
return true, nil
}
if len(data) <= 4 {
return false, fmt.Errorf("poolMeta: no data")
}
// Read header
switch binary.LittleEndian.Uint16(data[0:2]) {
case poolMetaFormat:
default:
return false, fmt.Errorf("poolMeta: unknown format: %d", binary.LittleEndian.Uint16(data[0:2]))
}
switch binary.LittleEndian.Uint16(data[2:4]) {
case poolMetaVersion:
default:
return false, fmt.Errorf("poolMeta: unknown version: %d", binary.LittleEndian.Uint16(data[2:4]))
}
// OK, parse data.
if _, err = p.UnmarshalMsg(data[4:]); err != nil {
return false, err
}
switch p.Version {
case poolMetaVersionV1:
default:
return false, fmt.Errorf("unexpected pool meta version: %d", p.Version)
}
// The total number of pools cannot be reduced upon restart, but
// completely decommissioned pools are allowed to be removed.
var rpools int
for _, pool := range p.Pools {
if pool.Decommission == nil {
// pools never decommissioned
rpools++
}
if pool.Decommission != nil && !pool.Decommission.Complete {
// pools were attempted to be decommissioned but not finished yet
rpools++
}
}
npools := len(sets)
if rpools > npools {
return false, fmt.Errorf("unexpected number of pools provided expecting %d, found %d - please check your command line",
rpools, npools)
}
if npools == rpools {
// verify that the argument order is the same when nothing has changed in the setup.
for idx, pool := range p.Pools {
if sets[idx].endpoints.CmdLine != pool.CmdLine {
return false, fmt.Errorf("ordering change detected in pools expected %s at pool(%d), instead found %s",
pool.CmdLine, idx+1,
sets[idx].endpoints.CmdLine)
}
}
}
return rpools < npools, nil
}
func (p poolMeta) Clone() poolMeta {
meta := poolMeta{
Version: p.Version,
}
meta.Pools = append(meta.Pools, p.Pools...)
return meta
}
func (p poolMeta) save(ctx context.Context, sets []*erasureSets) error {
data := make([]byte, 4, p.Msgsize()+4)
// Initialize the header.
binary.LittleEndian.PutUint16(data[0:2], poolMetaFormat)
binary.LittleEndian.PutUint16(data[2:4], poolMetaVersion)
buf, err := p.MarshalMsg(data)
if err != nil {
return err
}
br := bytes.NewReader(buf)
for _, set := range sets {
r, err := hash.NewReader(br, br.Size(), "", "", br.Size())
if err != nil {
return err
}
if _, err = set.PutObject(ctx, minioMetaBucket, poolMetaName,
NewPutObjReader(r), ObjectOptions{}); err != nil {
return err
}
br.Seek(0, io.SeekStart) // seek back to the start of the metadata reader.
}
return nil
}
const (
poolMetaName = "pool.meta"
poolMetaFormat = 1
poolMetaVersionV1 = 1
poolMetaVersion = poolMetaVersionV1
)
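
save() and load() above frame the msgp payload of pool.meta with a fixed 4-byte header: a little-endian uint16 format identifier followed by a little-endian uint16 version, then the serialized poolMeta. A minimal standalone sketch of that framing, reusing the constant values above:

package main

import (
	"encoding/binary"
	"fmt"
)

const (
	poolMetaFormat  = 1
	poolMetaVersion = 1
)

// frame prepends the 4-byte little-endian header that load() expects:
// bytes 0-1 carry the format, bytes 2-3 the version, the rest is msgp data.
func frame(payload []byte) []byte {
	data := make([]byte, 4, 4+len(payload))
	binary.LittleEndian.PutUint16(data[0:2], poolMetaFormat)
	binary.LittleEndian.PutUint16(data[2:4], poolMetaVersion)
	return append(data, payload...)
}

func main() {
	framed := frame([]byte("msgp-bytes-here")) // placeholder payload
	fmt.Println(binary.LittleEndian.Uint16(framed[0:2]), binary.LittleEndian.Uint16(framed[2:4]))
}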
// Init() initializes pools and saves additional information about them
// in pool.meta, which is eventually used for decommissioning,
// suspending and resuming the pool.
func (z *erasureServerPools) Init(ctx context.Context) error {
meta := poolMeta{}
update, err := meta.load(ctx, z.serverPools[0], z.serverPools)
if err != nil {
return err
}
// if no update is needed return right away.
if !update {
z.poolMeta = meta
return nil
}
meta = poolMeta{}
// looks like a new pool was added and we need to update,
// or this is a fresh installation (or an existing
// installation with a pool removed)
meta.Version = poolMetaVersion
for idx, pool := range z.serverPools {
meta.Pools = append(meta.Pools, PoolStatus{
CmdLine: pool.endpoints.CmdLine,
ID: idx,
LastUpdate: time.Now(),
})
}
if err = meta.save(ctx, z.serverPools); err != nil {
return err
}
z.poolMeta = meta
return nil
}
func (z *erasureServerPools) decomissionObject(ctx context.Context, bucket string, gr *GetObjectReader) (err error) {
defer gr.Close()
objInfo := gr.ObjInfo
if objInfo.isMultipart() {
uploadID, err := z.NewMultipartUpload(ctx, bucket, objInfo.Name,
ObjectOptions{
VersionID: objInfo.VersionID,
MTime: objInfo.ModTime,
UserDefined: objInfo.UserDefined,
})
if err != nil {
return err
}
defer z.AbortMultipartUpload(ctx, bucket, objInfo.Name, uploadID, ObjectOptions{})
var parts = make([]CompletePart, 0, len(objInfo.Parts))
for _, part := range objInfo.Parts {
hr, err := hash.NewReader(gr, part.Size, "", "", part.Size)
if err != nil {
return err
}
_, err = z.PutObjectPart(ctx, bucket, objInfo.Name, uploadID,
part.Number,
NewPutObjReader(hr),
ObjectOptions{})
if err != nil {
return err
}
parts = append(parts, CompletePart{
PartNumber: part.Number,
ETag: part.ETag,
})
}
_, err = z.CompleteMultipartUpload(ctx, bucket, objInfo.Name, uploadID, parts, ObjectOptions{
MTime: objInfo.ModTime,
})
return err
}
hr, err := hash.NewReader(gr, objInfo.Size, "", "", objInfo.Size)
if err != nil {
return err
}
_, err = z.PutObject(ctx,
bucket,
objInfo.Name,
NewPutObjReader(hr),
ObjectOptions{
VersionID: objInfo.VersionID,
MTime: objInfo.ModTime,
UserDefined: objInfo.UserDefined,
})
return err
}
func (z *erasureServerPools) decomissionInBackground(ctx context.Context, idx int, buckets []BucketInfo) error {
pool := z.serverPools[idx]
for _, bi := range buckets {
versioned := globalBucketVersioningSys.Enabled(bi.Name)
for _, set := range pool.sets {
disks := set.getOnlineDisks()
if len(disks) == 0 {
logger.LogIf(GlobalContext, fmt.Errorf("no online disks found for set with endpoints %s",
set.getEndpoints()))
continue
}
decomissionEntry := func(entry metaCacheEntry) {
if entry.isDir() {
return
}
fivs, err := entry.fileInfoVersions(bi.Name)
if err != nil {
return
}
// we need a reversed order for Decommissioning,
// to create the appropriate stack.
versionsSorter(fivs.Versions).reverse()
for _, version := range fivs.Versions {
// Skip transitioned objects for now.
if version.IsRemote() {
continue
}
// We will skip decommissioning delete markers
// with a single version; it is as good as
// having no data associated with the
// object.
if version.Deleted && len(fivs.Versions) == 1 {
continue
}
if version.Deleted {
_, err := z.DeleteObject(ctx,
bi.Name,
version.Name,
ObjectOptions{
Versioned: versioned,
VersionID: version.VersionID,
MTime: version.ModTime,
DeleteReplication: version.ReplicationState,
})
if err != nil {
logger.LogIf(ctx, err)
} else {
set.DeleteObject(ctx,
bi.Name,
version.Name,
ObjectOptions{
VersionID: version.VersionID,
DeletePrefix: true,
})
}
continue
}
gr, err := set.GetObjectNInfo(ctx,
bi.Name,
version.Name,
nil,
http.Header{},
noLock, // all mutations are blocked, reads are safe without locks.
ObjectOptions{
VersionID: version.VersionID,
})
if err != nil {
logger.LogIf(ctx, err)
continue
}
if err = z.decomissionObject(ctx, bi.Name, gr); err != nil {
logger.LogIf(ctx, err)
continue
}
set.DeleteObject(ctx,
bi.Name,
version.Name,
ObjectOptions{
VersionID: version.VersionID,
})
}
}
// How to resolve partial results.
resolver := metadataResolutionParams{
dirQuorum: len(disks) / 2,
objQuorum: len(disks) / 2,
bucket: bi.Name,
}
if err := listPathRaw(ctx, listPathRawOptions{
disks: disks,
bucket: bi.Name,
path: "",
recursive: true,
forwardTo: "",
minDisks: len(disks),
reportNotFound: false,
agreed: decomissionEntry,
partial: func(entries metaCacheEntries, nAgreed int, errs []error) {
entry, ok := entries.resolve(&resolver)
if ok {
decomissionEntry(*entry)
}
},
finished: nil,
}); err != nil {
// Decommissioning failed and won't continue
return err
}
}
}
return nil
}
// DecommissionCancel cancels any active decommission.
func (z *erasureServerPools) DecommissionCancel(idx int) error {
if idx < 0 {
return errInvalidArgument
}
if z.SinglePool() {
return errInvalidArgument
}
// Cancel the active decommission.
z.decomissionCancelers[idx]()
return nil
}
// Decommission - starts a decommission session.
func (z *erasureServerPools) Decommission(ctx context.Context, idx int) error {
if idx < 0 {
return errInvalidArgument
}
if z.SinglePool() {
return errInvalidArgument
}
// Make the pool unwritable before decommissioning.
if err := z.StartDecommission(ctx, idx); err != nil {
return err
}
buckets, err := z.ListBuckets(ctx)
if err != nil {
return err
}
go func() {
z.poolMetaMutex.Lock()
var dctx context.Context
dctx, z.decomissionCancelers[idx] = context.WithCancel(GlobalContext)
z.poolMetaMutex.Unlock()
if err := z.decomissionInBackground(dctx, idx, buckets); err != nil {
logger.LogIf(GlobalContext, err)
logger.LogIf(GlobalContext, z.DecommissionFailed(dctx, idx))
return
}
// Complete the decommission.
logger.LogIf(GlobalContext, z.CompleteDecommission(dctx, idx))
}()
// Successfully started decommissioning.
return nil
}
func (z *erasureServerPools) Status(ctx context.Context, idx int) (PoolStatus, error) {
if idx < 0 {
return PoolStatus{}, errInvalidArgument
}
z.poolMetaMutex.RLock()
defer z.poolMetaMutex.RUnlock()
if idx+1 > len(z.poolMeta.Pools) {
return PoolStatus{}, errInvalidArgument
}
pool := z.serverPools[idx]
info, _ := pool.StorageInfo(ctx)
info.Backend.Type = madmin.Erasure
scParity := globalStorageClass.GetParityForSC(storageclass.STANDARD)
if scParity <= 0 {
scParity = z.serverPools[0].defaultParityCount
}
rrSCParity := globalStorageClass.GetParityForSC(storageclass.RRS)
info.Backend.StandardSCData = append(info.Backend.StandardSCData, pool.SetDriveCount()-scParity)
info.Backend.RRSCData = append(info.Backend.RRSCData, pool.SetDriveCount()-rrSCParity)
info.Backend.StandardSCParity = scParity
info.Backend.RRSCParity = rrSCParity
currentSize := int64(TotalUsableCapacityFree(info))
poolInfo := z.poolMeta.Pools[idx]
if poolInfo.Decommission != nil {
poolInfo.Decommission.CurrentSize = currentSize
} else {
poolInfo.Decommission = &PoolDecommissionInfo{
CurrentSize: currentSize,
TotalSize: int64(TotalUsableCapacity(info)),
}
}
return poolInfo, nil
}
func (z *erasureServerPools) ReloadPoolMeta(ctx context.Context) (err error) {
meta := poolMeta{}
if _, err = meta.load(ctx, z.serverPools[0], z.serverPools); err != nil {
return err
}
z.poolMetaMutex.Lock()
defer z.poolMetaMutex.Unlock()
z.poolMeta = meta
return nil
}
func (z *erasureServerPools) DecommissionFailed(ctx context.Context, idx int) (err error) {
if idx < 0 {
return errInvalidArgument
}
if z.SinglePool() {
return errInvalidArgument
}
z.poolMetaMutex.Lock()
defer z.poolMetaMutex.Unlock()
meta := z.poolMeta.Clone()
if meta.DecommissionFailed(idx) {
defer func() {
if err == nil {
z.poolMeta.DecommissionFailed(idx)
globalNotificationSys.ReloadPoolMeta(ctx)
}
}()
return meta.save(ctx, z.serverPools)
}
return nil
}
func (z *erasureServerPools) CompleteDecommission(ctx context.Context, idx int) (err error) {
if idx < 0 {
return errInvalidArgument
}
if z.SinglePool() {
return errInvalidArgument
}
z.poolMetaMutex.Lock()
defer z.poolMetaMutex.Unlock()
meta := z.poolMeta.Clone()
if meta.DecommissionComplete(idx) {
defer func() {
if err == nil {
z.poolMeta.DecommissionComplete(idx)
globalNotificationSys.ReloadPoolMeta(ctx)
}
}()
return meta.save(ctx, z.serverPools)
}
return nil
}
func (z *erasureServerPools) StartDecommission(ctx context.Context, idx int) (err error) {
if idx < 0 {
return errInvalidArgument
}
if z.SinglePool() {
return errInvalidArgument
}
var pool *erasureSets
for pidx := range z.serverPools {
if pidx == idx {
pool = z.serverPools[idx]
break
}
}
if pool == nil {
return errInvalidArgument
}
info, _ := pool.StorageInfo(ctx)
info.Backend.Type = madmin.Erasure
scParity := globalStorageClass.GetParityForSC(storageclass.STANDARD)
if scParity <= 0 {
scParity = z.serverPools[0].defaultParityCount
}
rrSCParity := globalStorageClass.GetParityForSC(storageclass.RRS)
info.Backend.StandardSCData = append(info.Backend.StandardSCData, pool.SetDriveCount()-scParity)
info.Backend.RRSCData = append(info.Backend.RRSCData, pool.SetDriveCount()-rrSCParity)
info.Backend.StandardSCParity = scParity
info.Backend.RRSCParity = rrSCParity
z.poolMetaMutex.Lock()
defer z.poolMetaMutex.Unlock()
meta := z.poolMeta.Clone()
if meta.Decommission(idx, info) {
defer func() {
if err == nil {
z.poolMeta.Decommission(idx, info)
globalNotificationSys.ReloadPoolMeta(ctx)
}
}()
return meta.save(ctx, z.serverPools)
}
return nil
}
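
Taken together, the methods in this file form a small lifecycle: StartDecommission records a Decommission entry and makes the pool unwritable for new objects, Decommission drains it in a background goroutine, and CompleteDecommission or DecommissionFailed record the outcome using the same clone, save, and peer-reload pattern. A toy model of just the state transitions, with simplified stand-in types rather than the server's poolMeta:

package main

import (
	"fmt"
	"time"
)

type decomInfo struct {
	StartTime time.Time
	Complete  bool
	Failed    bool
}

type pool struct{ Decommission *decomInfo }

// start mirrors poolMeta.Decommission: refuse if a session is already running.
func (p *pool) start() bool {
	if p.Decommission != nil && !p.Decommission.StartTime.IsZero() {
		return false
	}
	p.Decommission = &decomInfo{StartTime: time.Now()}
	return true
}

// finish mirrors DecommissionComplete / DecommissionFailed.
func (p *pool) finish(failed bool) {
	if p.Decommission != nil {
		p.Decommission.Complete = !failed
		p.Decommission.Failed = failed
	}
}

func main() {
	var p pool
	fmt.Println(p.start()) // true: session begins
	fmt.Println(p.start()) // false: already in progress
	p.finish(false)
	fmt.Println(p.Decommission.Complete, p.Decommission.Failed) // true false
}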

View File

@ -0,0 +1,627 @@
package cmd
// Code generated by github.com/tinylib/msgp DO NOT EDIT.
import (
"github.com/tinylib/msgp/msgp"
)
// DecodeMsg implements msgp.Decodable
func (z *PoolDecommissionInfo) DecodeMsg(dc *msgp.Reader) (err error) {
var field []byte
_ = field
var zb0001 uint32
zb0001, err = dc.ReadMapHeader()
if err != nil {
err = msgp.WrapError(err)
return
}
for zb0001 > 0 {
zb0001--
field, err = dc.ReadMapKeyPtr()
if err != nil {
err = msgp.WrapError(err)
return
}
switch msgp.UnsafeString(field) {
case "st":
z.StartTime, err = dc.ReadTime()
if err != nil {
err = msgp.WrapError(err, "StartTime")
return
}
case "ss":
z.StartSize, err = dc.ReadInt64()
if err != nil {
err = msgp.WrapError(err, "StartSize")
return
}
case "ts":
z.TotalSize, err = dc.ReadInt64()
if err != nil {
err = msgp.WrapError(err, "TotalSize")
return
}
case "cs":
z.CurrentSize, err = dc.ReadInt64()
if err != nil {
err = msgp.WrapError(err, "CurrentSize")
return
}
case "cmp":
z.Complete, err = dc.ReadBool()
if err != nil {
err = msgp.WrapError(err, "Complete")
return
}
case "fl":
z.Failed, err = dc.ReadBool()
if err != nil {
err = msgp.WrapError(err, "Failed")
return
}
default:
err = dc.Skip()
if err != nil {
err = msgp.WrapError(err)
return
}
}
}
return
}
// EncodeMsg implements msgp.Encodable
func (z *PoolDecommissionInfo) EncodeMsg(en *msgp.Writer) (err error) {
// map header, size 6
// write "st"
err = en.Append(0x86, 0xa2, 0x73, 0x74)
if err != nil {
return
}
err = en.WriteTime(z.StartTime)
if err != nil {
err = msgp.WrapError(err, "StartTime")
return
}
// write "ss"
err = en.Append(0xa2, 0x73, 0x73)
if err != nil {
return
}
err = en.WriteInt64(z.StartSize)
if err != nil {
err = msgp.WrapError(err, "StartSize")
return
}
// write "ts"
err = en.Append(0xa2, 0x74, 0x73)
if err != nil {
return
}
err = en.WriteInt64(z.TotalSize)
if err != nil {
err = msgp.WrapError(err, "TotalSize")
return
}
// write "cs"
err = en.Append(0xa2, 0x63, 0x73)
if err != nil {
return
}
err = en.WriteInt64(z.CurrentSize)
if err != nil {
err = msgp.WrapError(err, "CurrentSize")
return
}
// write "cmp"
err = en.Append(0xa3, 0x63, 0x6d, 0x70)
if err != nil {
return
}
err = en.WriteBool(z.Complete)
if err != nil {
err = msgp.WrapError(err, "Complete")
return
}
// write "fl"
err = en.Append(0xa2, 0x66, 0x6c)
if err != nil {
return
}
err = en.WriteBool(z.Failed)
if err != nil {
err = msgp.WrapError(err, "Failed")
return
}
return
}
// MarshalMsg implements msgp.Marshaler
func (z *PoolDecommissionInfo) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.Require(b, z.Msgsize())
// map header, size 6
// string "st"
o = append(o, 0x86, 0xa2, 0x73, 0x74)
o = msgp.AppendTime(o, z.StartTime)
// string "ss"
o = append(o, 0xa2, 0x73, 0x73)
o = msgp.AppendInt64(o, z.StartSize)
// string "ts"
o = append(o, 0xa2, 0x74, 0x73)
o = msgp.AppendInt64(o, z.TotalSize)
// string "cs"
o = append(o, 0xa2, 0x63, 0x73)
o = msgp.AppendInt64(o, z.CurrentSize)
// string "cmp"
o = append(o, 0xa3, 0x63, 0x6d, 0x70)
o = msgp.AppendBool(o, z.Complete)
// string "fl"
o = append(o, 0xa2, 0x66, 0x6c)
o = msgp.AppendBool(o, z.Failed)
return
}
// UnmarshalMsg implements msgp.Unmarshaler
func (z *PoolDecommissionInfo) UnmarshalMsg(bts []byte) (o []byte, err error) {
var field []byte
_ = field
var zb0001 uint32
zb0001, bts, err = msgp.ReadMapHeaderBytes(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
for zb0001 > 0 {
zb0001--
field, bts, err = msgp.ReadMapKeyZC(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
switch msgp.UnsafeString(field) {
case "st":
z.StartTime, bts, err = msgp.ReadTimeBytes(bts)
if err != nil {
err = msgp.WrapError(err, "StartTime")
return
}
case "ss":
z.StartSize, bts, err = msgp.ReadInt64Bytes(bts)
if err != nil {
err = msgp.WrapError(err, "StartSize")
return
}
case "ts":
z.TotalSize, bts, err = msgp.ReadInt64Bytes(bts)
if err != nil {
err = msgp.WrapError(err, "TotalSize")
return
}
case "cs":
z.CurrentSize, bts, err = msgp.ReadInt64Bytes(bts)
if err != nil {
err = msgp.WrapError(err, "CurrentSize")
return
}
case "cmp":
z.Complete, bts, err = msgp.ReadBoolBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Complete")
return
}
case "fl":
z.Failed, bts, err = msgp.ReadBoolBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Failed")
return
}
default:
bts, err = msgp.Skip(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
}
}
o = bts
return
}
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z *PoolDecommissionInfo) Msgsize() (s int) {
s = 1 + 3 + msgp.TimeSize + 3 + msgp.Int64Size + 3 + msgp.Int64Size + 3 + msgp.Int64Size + 4 + msgp.BoolSize + 3 + msgp.BoolSize
return
}
// DecodeMsg implements msgp.Decodable
func (z *PoolStatus) DecodeMsg(dc *msgp.Reader) (err error) {
var field []byte
_ = field
var zb0001 uint32
zb0001, err = dc.ReadMapHeader()
if err != nil {
err = msgp.WrapError(err)
return
}
for zb0001 > 0 {
zb0001--
field, err = dc.ReadMapKeyPtr()
if err != nil {
err = msgp.WrapError(err)
return
}
switch msgp.UnsafeString(field) {
case "id":
z.ID, err = dc.ReadInt()
if err != nil {
err = msgp.WrapError(err, "ID")
return
}
case "cl":
z.CmdLine, err = dc.ReadString()
if err != nil {
err = msgp.WrapError(err, "CmdLine")
return
}
case "lu":
z.LastUpdate, err = dc.ReadTime()
if err != nil {
err = msgp.WrapError(err, "LastUpdate")
return
}
case "dec":
if dc.IsNil() {
err = dc.ReadNil()
if err != nil {
err = msgp.WrapError(err, "Decommission")
return
}
z.Decommission = nil
} else {
if z.Decommission == nil {
z.Decommission = new(PoolDecommissionInfo)
}
err = z.Decommission.DecodeMsg(dc)
if err != nil {
err = msgp.WrapError(err, "Decommission")
return
}
}
default:
err = dc.Skip()
if err != nil {
err = msgp.WrapError(err)
return
}
}
}
return
}
// EncodeMsg implements msgp.Encodable
func (z *PoolStatus) EncodeMsg(en *msgp.Writer) (err error) {
// map header, size 4
// write "id"
err = en.Append(0x84, 0xa2, 0x69, 0x64)
if err != nil {
return
}
err = en.WriteInt(z.ID)
if err != nil {
err = msgp.WrapError(err, "ID")
return
}
// write "cl"
err = en.Append(0xa2, 0x63, 0x6c)
if err != nil {
return
}
err = en.WriteString(z.CmdLine)
if err != nil {
err = msgp.WrapError(err, "CmdLine")
return
}
// write "lu"
err = en.Append(0xa2, 0x6c, 0x75)
if err != nil {
return
}
err = en.WriteTime(z.LastUpdate)
if err != nil {
err = msgp.WrapError(err, "LastUpdate")
return
}
// write "dec"
err = en.Append(0xa3, 0x64, 0x65, 0x63)
if err != nil {
return
}
if z.Decommission == nil {
err = en.WriteNil()
if err != nil {
return
}
} else {
err = z.Decommission.EncodeMsg(en)
if err != nil {
err = msgp.WrapError(err, "Decommission")
return
}
}
return
}
// MarshalMsg implements msgp.Marshaler
func (z *PoolStatus) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.Require(b, z.Msgsize())
// map header, size 4
// string "id"
o = append(o, 0x84, 0xa2, 0x69, 0x64)
o = msgp.AppendInt(o, z.ID)
// string "cl"
o = append(o, 0xa2, 0x63, 0x6c)
o = msgp.AppendString(o, z.CmdLine)
// string "lu"
o = append(o, 0xa2, 0x6c, 0x75)
o = msgp.AppendTime(o, z.LastUpdate)
// string "dec"
o = append(o, 0xa3, 0x64, 0x65, 0x63)
if z.Decommission == nil {
o = msgp.AppendNil(o)
} else {
o, err = z.Decommission.MarshalMsg(o)
if err != nil {
err = msgp.WrapError(err, "Decommission")
return
}
}
return
}
// UnmarshalMsg implements msgp.Unmarshaler
func (z *PoolStatus) UnmarshalMsg(bts []byte) (o []byte, err error) {
var field []byte
_ = field
var zb0001 uint32
zb0001, bts, err = msgp.ReadMapHeaderBytes(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
for zb0001 > 0 {
zb0001--
field, bts, err = msgp.ReadMapKeyZC(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
switch msgp.UnsafeString(field) {
case "id":
z.ID, bts, err = msgp.ReadIntBytes(bts)
if err != nil {
err = msgp.WrapError(err, "ID")
return
}
case "cl":
z.CmdLine, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err, "CmdLine")
return
}
case "lu":
z.LastUpdate, bts, err = msgp.ReadTimeBytes(bts)
if err != nil {
err = msgp.WrapError(err, "LastUpdate")
return
}
case "dec":
if msgp.IsNil(bts) {
bts, err = msgp.ReadNilBytes(bts)
if err != nil {
return
}
z.Decommission = nil
} else {
if z.Decommission == nil {
z.Decommission = new(PoolDecommissionInfo)
}
bts, err = z.Decommission.UnmarshalMsg(bts)
if err != nil {
err = msgp.WrapError(err, "Decommission")
return
}
}
default:
bts, err = msgp.Skip(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
}
}
o = bts
return
}
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z *PoolStatus) Msgsize() (s int) {
s = 1 + 3 + msgp.IntSize + 3 + msgp.StringPrefixSize + len(z.CmdLine) + 3 + msgp.TimeSize + 4
if z.Decommission == nil {
s += msgp.NilSize
} else {
s += z.Decommission.Msgsize()
}
return
}
// DecodeMsg implements msgp.Decodable
func (z *poolMeta) DecodeMsg(dc *msgp.Reader) (err error) {
var field []byte
_ = field
var zb0001 uint32
zb0001, err = dc.ReadMapHeader()
if err != nil {
err = msgp.WrapError(err)
return
}
for zb0001 > 0 {
zb0001--
field, err = dc.ReadMapKeyPtr()
if err != nil {
err = msgp.WrapError(err)
return
}
switch msgp.UnsafeString(field) {
case "v":
z.Version, err = dc.ReadInt()
if err != nil {
err = msgp.WrapError(err, "Version")
return
}
case "pls":
var zb0002 uint32
zb0002, err = dc.ReadArrayHeader()
if err != nil {
err = msgp.WrapError(err, "Pools")
return
}
if cap(z.Pools) >= int(zb0002) {
z.Pools = (z.Pools)[:zb0002]
} else {
z.Pools = make([]PoolStatus, zb0002)
}
for za0001 := range z.Pools {
err = z.Pools[za0001].DecodeMsg(dc)
if err != nil {
err = msgp.WrapError(err, "Pools", za0001)
return
}
}
default:
err = dc.Skip()
if err != nil {
err = msgp.WrapError(err)
return
}
}
}
return
}
// EncodeMsg implements msgp.Encodable
func (z *poolMeta) EncodeMsg(en *msgp.Writer) (err error) {
// map header, size 2
// write "v"
err = en.Append(0x82, 0xa1, 0x76)
if err != nil {
return
}
err = en.WriteInt(z.Version)
if err != nil {
err = msgp.WrapError(err, "Version")
return
}
// write "pls"
err = en.Append(0xa3, 0x70, 0x6c, 0x73)
if err != nil {
return
}
err = en.WriteArrayHeader(uint32(len(z.Pools)))
if err != nil {
err = msgp.WrapError(err, "Pools")
return
}
for za0001 := range z.Pools {
err = z.Pools[za0001].EncodeMsg(en)
if err != nil {
err = msgp.WrapError(err, "Pools", za0001)
return
}
}
return
}
// MarshalMsg implements msgp.Marshaler
func (z *poolMeta) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.Require(b, z.Msgsize())
// map header, size 2
// string "v"
o = append(o, 0x82, 0xa1, 0x76)
o = msgp.AppendInt(o, z.Version)
// string "pls"
o = append(o, 0xa3, 0x70, 0x6c, 0x73)
o = msgp.AppendArrayHeader(o, uint32(len(z.Pools)))
for za0001 := range z.Pools {
o, err = z.Pools[za0001].MarshalMsg(o)
if err != nil {
err = msgp.WrapError(err, "Pools", za0001)
return
}
}
return
}
// UnmarshalMsg implements msgp.Unmarshaler
func (z *poolMeta) UnmarshalMsg(bts []byte) (o []byte, err error) {
var field []byte
_ = field
var zb0001 uint32
zb0001, bts, err = msgp.ReadMapHeaderBytes(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
for zb0001 > 0 {
zb0001--
field, bts, err = msgp.ReadMapKeyZC(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
switch msgp.UnsafeString(field) {
case "v":
z.Version, bts, err = msgp.ReadIntBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Version")
return
}
case "pls":
var zb0002 uint32
zb0002, bts, err = msgp.ReadArrayHeaderBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Pools")
return
}
if cap(z.Pools) >= int(zb0002) {
z.Pools = (z.Pools)[:zb0002]
} else {
z.Pools = make([]PoolStatus, zb0002)
}
for za0001 := range z.Pools {
bts, err = z.Pools[za0001].UnmarshalMsg(bts)
if err != nil {
err = msgp.WrapError(err, "Pools", za0001)
return
}
}
default:
bts, err = msgp.Skip(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
}
}
o = bts
return
}
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z *poolMeta) Msgsize() (s int) {
s = 1 + 2 + msgp.IntSize + 4 + msgp.ArrayHeaderSize
for za0001 := range z.Pools {
s += z.Pools[za0001].Msgsize()
}
return
}

View File

@ -0,0 +1,349 @@
package cmd
// Code generated by github.com/tinylib/msgp DO NOT EDIT.
import (
"bytes"
"testing"
"github.com/tinylib/msgp/msgp"
)
func TestMarshalUnmarshalPoolDecommissionInfo(t *testing.T) {
v := PoolDecommissionInfo{}
bts, err := v.MarshalMsg(nil)
if err != nil {
t.Fatal(err)
}
left, err := v.UnmarshalMsg(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left)
}
left, err = msgp.Skip(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after Skip(): %q", len(left), left)
}
}
func BenchmarkMarshalMsgPoolDecommissionInfo(b *testing.B) {
v := PoolDecommissionInfo{}
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
v.MarshalMsg(nil)
}
}
func BenchmarkAppendMsgPoolDecommissionInfo(b *testing.B) {
v := PoolDecommissionInfo{}
bts := make([]byte, 0, v.Msgsize())
bts, _ = v.MarshalMsg(bts[0:0])
b.SetBytes(int64(len(bts)))
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
bts, _ = v.MarshalMsg(bts[0:0])
}
}
func BenchmarkUnmarshalPoolDecommissionInfo(b *testing.B) {
v := PoolDecommissionInfo{}
bts, _ := v.MarshalMsg(nil)
b.ReportAllocs()
b.SetBytes(int64(len(bts)))
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := v.UnmarshalMsg(bts)
if err != nil {
b.Fatal(err)
}
}
}
func TestEncodeDecodePoolDecommissionInfo(t *testing.T) {
v := PoolDecommissionInfo{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
m := v.Msgsize()
if buf.Len() > m {
t.Log("WARNING: TestEncodeDecodePoolDecommissionInfo Msgsize() is inaccurate")
}
vn := PoolDecommissionInfo{}
err := msgp.Decode(&buf, &vn)
if err != nil {
t.Error(err)
}
buf.Reset()
msgp.Encode(&buf, &v)
err = msgp.NewReader(&buf).Skip()
if err != nil {
t.Error(err)
}
}
func BenchmarkEncodePoolDecommissionInfo(b *testing.B) {
v := PoolDecommissionInfo{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
b.SetBytes(int64(buf.Len()))
en := msgp.NewWriter(msgp.Nowhere)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
v.EncodeMsg(en)
}
en.Flush()
}
func BenchmarkDecodePoolDecommissionInfo(b *testing.B) {
v := PoolDecommissionInfo{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
b.SetBytes(int64(buf.Len()))
rd := msgp.NewEndlessReader(buf.Bytes(), b)
dc := msgp.NewReader(rd)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
err := v.DecodeMsg(dc)
if err != nil {
b.Fatal(err)
}
}
}
func TestMarshalUnmarshalPoolStatus(t *testing.T) {
v := PoolStatus{}
bts, err := v.MarshalMsg(nil)
if err != nil {
t.Fatal(err)
}
left, err := v.UnmarshalMsg(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left)
}
left, err = msgp.Skip(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after Skip(): %q", len(left), left)
}
}
func BenchmarkMarshalMsgPoolStatus(b *testing.B) {
v := PoolStatus{}
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
v.MarshalMsg(nil)
}
}
func BenchmarkAppendMsgPoolStatus(b *testing.B) {
v := PoolStatus{}
bts := make([]byte, 0, v.Msgsize())
bts, _ = v.MarshalMsg(bts[0:0])
b.SetBytes(int64(len(bts)))
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
bts, _ = v.MarshalMsg(bts[0:0])
}
}
func BenchmarkUnmarshalPoolStatus(b *testing.B) {
v := PoolStatus{}
bts, _ := v.MarshalMsg(nil)
b.ReportAllocs()
b.SetBytes(int64(len(bts)))
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := v.UnmarshalMsg(bts)
if err != nil {
b.Fatal(err)
}
}
}
func TestEncodeDecodePoolStatus(t *testing.T) {
v := PoolStatus{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
m := v.Msgsize()
if buf.Len() > m {
t.Log("WARNING: TestEncodeDecodePoolStatus Msgsize() is inaccurate")
}
vn := PoolStatus{}
err := msgp.Decode(&buf, &vn)
if err != nil {
t.Error(err)
}
buf.Reset()
msgp.Encode(&buf, &v)
err = msgp.NewReader(&buf).Skip()
if err != nil {
t.Error(err)
}
}
func BenchmarkEncodePoolStatus(b *testing.B) {
v := PoolStatus{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
b.SetBytes(int64(buf.Len()))
en := msgp.NewWriter(msgp.Nowhere)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
v.EncodeMsg(en)
}
en.Flush()
}
func BenchmarkDecodePoolStatus(b *testing.B) {
v := PoolStatus{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
b.SetBytes(int64(buf.Len()))
rd := msgp.NewEndlessReader(buf.Bytes(), b)
dc := msgp.NewReader(rd)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
err := v.DecodeMsg(dc)
if err != nil {
b.Fatal(err)
}
}
}
func TestMarshalUnmarshalpoolMeta(t *testing.T) {
v := poolMeta{}
bts, err := v.MarshalMsg(nil)
if err != nil {
t.Fatal(err)
}
left, err := v.UnmarshalMsg(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left)
}
left, err = msgp.Skip(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after Skip(): %q", len(left), left)
}
}
func BenchmarkMarshalMsgpoolMeta(b *testing.B) {
v := poolMeta{}
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
v.MarshalMsg(nil)
}
}
func BenchmarkAppendMsgpoolMeta(b *testing.B) {
v := poolMeta{}
bts := make([]byte, 0, v.Msgsize())
bts, _ = v.MarshalMsg(bts[0:0])
b.SetBytes(int64(len(bts)))
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
bts, _ = v.MarshalMsg(bts[0:0])
}
}
func BenchmarkUnmarshalpoolMeta(b *testing.B) {
v := poolMeta{}
bts, _ := v.MarshalMsg(nil)
b.ReportAllocs()
b.SetBytes(int64(len(bts)))
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := v.UnmarshalMsg(bts)
if err != nil {
b.Fatal(err)
}
}
}
func TestEncodeDecodepoolMeta(t *testing.T) {
v := poolMeta{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
m := v.Msgsize()
if buf.Len() > m {
t.Log("WARNING: TestEncodeDecodepoolMeta Msgsize() is inaccurate")
}
vn := poolMeta{}
err := msgp.Decode(&buf, &vn)
if err != nil {
t.Error(err)
}
buf.Reset()
msgp.Encode(&buf, &v)
err = msgp.NewReader(&buf).Skip()
if err != nil {
t.Error(err)
}
}
func BenchmarkEncodepoolMeta(b *testing.B) {
v := poolMeta{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
b.SetBytes(int64(buf.Len()))
en := msgp.NewWriter(msgp.Nowhere)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
v.EncodeMsg(en)
}
en.Flush()
}
func BenchmarkDecodepoolMeta(b *testing.B) {
v := poolMeta{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
b.SetBytes(int64(buf.Len()))
rd := msgp.NewEndlessReader(buf.Bytes(), b)
dc := msgp.NewReader(rd)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
err := v.DecodeMsg(dc)
if err != nil {
b.Fatal(err)
}
}
}

View File

@ -42,10 +42,15 @@ import (
type erasureServerPools struct {
GatewayUnsupported
serverPools []*erasureSets
poolMetaMutex sync.RWMutex
poolMeta poolMeta
serverPools []*erasureSets
// Shut down async operations
// Shutdown async operations
shutdown context.CancelFunc
// Active decommission cancelers, one per pool
decomissionCancelers []context.CancelFunc
}
func (z *erasureServerPools) SinglePool() bool {
@ -62,7 +67,9 @@ func newErasureServerPools(ctx context.Context, endpointServerPools EndpointServ
formats = make([]*formatErasureV3, len(endpointServerPools))
storageDisks = make([][]StorageAPI, len(endpointServerPools))
z = &erasureServerPools{serverPools: make([]*erasureSets, len(endpointServerPools))}
z = &erasureServerPools{
serverPools: make([]*erasureSets, len(endpointServerPools)),
}
)
var localDrives []string
@ -108,11 +115,26 @@ func newErasureServerPools(ctx context.Context, endpointServerPools EndpointServ
return nil, fmt.Errorf("All serverPools should have same deployment ID expected %s, got %s", deploymentID, formats[i].ID)
}
z.serverPools[i], err = newErasureSets(ctx, ep.Endpoints, storageDisks[i], formats[i], commonParityDrives, i)
z.serverPools[i], err = newErasureSets(ctx, ep, storageDisks[i], formats[i], commonParityDrives, i)
if err != nil {
return nil, err
}
}
z.decomissionCancelers = make([]context.CancelFunc, len(z.serverPools))
r := rand.New(rand.NewSource(time.Now().UnixNano()))
for {
err := z.Init(ctx)
if err != nil {
if !configRetriableErrors(err) {
logger.Fatal(err, "Unable to initialize backend")
}
time.Sleep(time.Duration(r.Float64() * float64(5*time.Second)))
continue
}
break
}
ctx, z.shutdown = context.WithCancel(ctx)
go intDataUpdateTracker.start(ctx, localDrives...)
return z, nil
@ -249,6 +271,10 @@ func (z *erasureServerPools) getServerPoolsAvailableSpace(ctx context.Context, b
g := errgroup.WithNErrs(len(z.serverPools))
for index := range z.serverPools {
index := index
// skip suspended pools for any new I/O.
if z.poolMeta.IsSuspended(index) {
continue
}
g.Go(func() error {
// Get the set where it would be placed.
storageInfos[index] = getDiskInfos(ctx, z.serverPools[index].getHashedSet(object).getDisks())
@ -302,6 +328,7 @@ func (z *erasureServerPools) getPoolIdxExistingWithOpts(ctx context.Context, buc
pinfo := poolObjInfo{
PoolIndex: i,
}
opts.VersionID = "" // no need to check for specific versionId
pinfo.ObjInfo, pinfo.Err = pool.GetObjectInfo(ctx, bucket, object, opts)
poolObjInfos[i] = pinfo
}(i, pool)
@ -322,6 +349,12 @@ func (z *erasureServerPools) getPoolIdxExistingWithOpts(ctx context.Context, buc
if pinfo.Err != nil && !isErrObjectNotFound(pinfo.Err) {
return -1, pinfo.Err
}
// skip all objects from suspended pools for mutating calls.
if z.poolMeta.IsSuspended(pinfo.PoolIndex) && opts.Mutate {
continue
}
if isErrObjectNotFound(pinfo.Err) {
// No object exists or it's a delete marker,
// check objInfo to confirm.
@ -333,23 +366,23 @@ func (z *erasureServerPools) getPoolIdxExistingWithOpts(ctx context.Context, buc
// exist proceed to next pool.
continue
}
return pinfo.PoolIndex, nil
}
return -1, toObjectErr(errFileNotFound, bucket, object)
}
func (z *erasureServerPools) getPoolIdxExistingNoLock(ctx context.Context, bucket, object string) (idx int, err error) {
return z.getPoolIdxExistingWithOpts(ctx, bucket, object, ObjectOptions{NoLock: true})
}
// getPoolIdxExisting returns the (first) found object pool index containing an object.
// getPoolIdxExistingNoLock returns the (first) found object pool index containing an object.
// If the object exists, but the latest version is a delete marker, the index with it is still returned.
// If the object does not exist ObjectNotFound error is returned.
// If any other error is found, it is returned.
// The check is skipped if there is only one zone, and 0, nil is always returned in that case.
func (z *erasureServerPools) getPoolIdxExisting(ctx context.Context, bucket, object string) (idx int, err error) {
return z.getPoolIdxExistingWithOpts(ctx, bucket, object, ObjectOptions{})
func (z *erasureServerPools) getPoolIdxExistingNoLock(ctx context.Context, bucket, object string) (idx int, err error) {
return z.getPoolIdxExistingWithOpts(ctx, bucket, object, ObjectOptions{
NoLock: true,
Mutate: true,
})
}
func (z *erasureServerPools) getPoolIdxNoLock(ctx context.Context, bucket, object string, size int64) (idx int, err error) {
@ -369,9 +402,10 @@ func (z *erasureServerPools) getPoolIdxNoLock(ctx context.Context, bucket, objec
}
// getPoolIdx returns the found previous object and its corresponding pool idx,
// if none are found falls back to most available space pool.
// if none are found falls back to most available space pool, this function is
// designed to be only used by PutObject, CopyObject (newObject creation) and NewMultipartUpload.
func (z *erasureServerPools) getPoolIdx(ctx context.Context, bucket, object string, size int64) (idx int, err error) {
idx, err = z.getPoolIdxExisting(ctx, bucket, object)
idx, err = z.getPoolIdxExistingWithOpts(ctx, bucket, object, ObjectOptions{Mutate: true})
if err != nil && !isErrObjectNotFound(err) {
return idx, err
}
@ -844,7 +878,11 @@ func (z *erasureServerPools) PutObject(ctx context.Context, bucket string, objec
}
func (z *erasureServerPools) deletePrefix(ctx context.Context, bucket string, prefix string) error {
for _, zone := range z.serverPools {
for idx, zone := range z.serverPools {
if z.poolMeta.IsSuspended(idx) {
logger.LogIf(ctx, fmt.Errorf("pool %d is suspended, all writes are suspended", idx+1))
continue
}
_, err := zone.DeleteObject(ctx, bucket, prefix, ObjectOptions{DeletePrefix: true})
if err != nil {
return err
@ -868,7 +906,8 @@ func (z *erasureServerPools) DeleteObject(ctx context.Context, bucket string, ob
return z.serverPools[0].DeleteObject(ctx, bucket, object, opts)
}
idx, err := z.getPoolIdxExisting(ctx, bucket, object)
opts.Mutate = true
idx, err := z.getPoolIdxExistingWithOpts(ctx, bucket, object, opts)
if err != nil {
return objInfo, err
}
@ -1984,7 +2023,8 @@ func (z *erasureServerPools) PutObjectMetadata(ctx context.Context, bucket, obje
}
// We don't know the size here, set at least 1GiB.
idx, err := z.getPoolIdxExisting(ctx, bucket, object)
opts.Mutate = true
idx, err := z.getPoolIdxExistingWithOpts(ctx, bucket, object, opts)
if err != nil {
return ObjectInfo{}, err
}
@ -2000,7 +2040,8 @@ func (z *erasureServerPools) PutObjectTags(ctx context.Context, bucket, object s
}
// We don't know the size here, set at least 1GiB.
idx, err := z.getPoolIdxExisting(ctx, bucket, object)
opts.Mutate = true
idx, err := z.getPoolIdxExistingWithOpts(ctx, bucket, object, opts)
if err != nil {
return ObjectInfo{}, err
}
@ -2015,7 +2056,8 @@ func (z *erasureServerPools) DeleteObjectTags(ctx context.Context, bucket, objec
return z.serverPools[0].DeleteObjectTags(ctx, bucket, object, opts)
}
idx, err := z.getPoolIdxExisting(ctx, bucket, object)
opts.Mutate = true
idx, err := z.getPoolIdxExistingWithOpts(ctx, bucket, object, opts)
if err != nil {
return ObjectInfo{}, err
}
@ -2030,7 +2072,8 @@ func (z *erasureServerPools) GetObjectTags(ctx context.Context, bucket, object s
return z.serverPools[0].GetObjectTags(ctx, bucket, object, opts)
}
idx, err := z.getPoolIdxExisting(ctx, bucket, object)
opts.Mutate = false
idx, err := z.getPoolIdxExistingWithOpts(ctx, bucket, object, opts)
if err != nil {
return nil, err
}
@ -2045,7 +2088,8 @@ func (z *erasureServerPools) TransitionObject(ctx context.Context, bucket, objec
return z.serverPools[0].TransitionObject(ctx, bucket, object, opts)
}
idx, err := z.getPoolIdxExisting(ctx, bucket, object)
opts.Mutate = true
idx, err := z.getPoolIdxExistingWithOpts(ctx, bucket, object, opts)
if err != nil {
return err
}
@ -2060,7 +2104,8 @@ func (z *erasureServerPools) RestoreTransitionedObject(ctx context.Context, buck
return z.serverPools[0].RestoreTransitionedObject(ctx, bucket, object, opts)
}
idx, err := z.getPoolIdxExisting(ctx, bucket, object)
opts.Mutate = true
idx, err := z.getPoolIdxExistingWithOpts(ctx, bucket, object, opts)
if err != nil {
return err
}

View File

@ -70,7 +70,7 @@ type erasureSets struct {
erasureLockOwner string
// List of endpoints provided on the command line.
endpoints Endpoints
endpoints PoolEndpoints
// String version of all the endpoints, an optimization
// to avoid url.String() conversion taking CPU on
@ -207,7 +207,7 @@ func (s *erasureSets) connectDisks() {
var wg sync.WaitGroup
var setsJustConnected = make([]bool, s.setCount)
diskMap := s.getDiskMap()
for _, endpoint := range s.endpoints {
for _, endpoint := range s.endpoints.Endpoints {
if isEndpointConnectionStable(diskMap, endpoint, s.lastConnectDisksOpTime) {
continue
}
@ -317,7 +317,7 @@ func (s *erasureSets) GetEndpoints(setIndex int) func() []Endpoint {
eps := make([]Endpoint, s.setDriveCount)
for i := 0; i < s.setDriveCount; i++ {
eps[i] = s.endpoints[setIndex*s.setDriveCount+i]
eps[i] = s.endpoints.Endpoints[setIndex*s.setDriveCount+i]
}
return eps
}
@ -339,12 +339,12 @@ func (s *erasureSets) GetDisks(setIndex int) func() []StorageAPI {
const defaultMonitorConnectEndpointInterval = defaultMonitorNewDiskInterval + time.Second*5
// Initialize new set of erasure coded sets.
func newErasureSets(ctx context.Context, endpoints Endpoints, storageDisks []StorageAPI, format *formatErasureV3, defaultParityCount, poolIdx int) (*erasureSets, error) {
func newErasureSets(ctx context.Context, endpoints PoolEndpoints, storageDisks []StorageAPI, format *formatErasureV3, defaultParityCount, poolIdx int) (*erasureSets, error) {
setCount := len(format.Erasure.Sets)
setDriveCount := len(format.Erasure.Sets[0])
endpointStrings := make([]string, len(endpoints))
for i, endpoint := range endpoints {
endpointStrings := make([]string, len(endpoints.Endpoints))
for i, endpoint := range endpoints.Endpoints {
endpointStrings[i] = endpoint.String()
}
@ -388,7 +388,7 @@ func newErasureSets(ctx context.Context, endpoints Endpoints, storageDisks []Sto
}
var erasureLockers = map[string]dsync.NetLocker{}
for _, endpoint := range endpoints {
for _, endpoint := range endpoints.Endpoints {
if _, ok := erasureLockers[endpoint.Host]; !ok {
erasureLockers[endpoint.Host] = newLockAPI(endpoint)
}
@ -397,7 +397,7 @@ func newErasureSets(ctx context.Context, endpoints Endpoints, storageDisks []Sto
for i := 0; i < setCount; i++ {
var lockerEpSet = set.NewStringSet()
for j := 0; j < setDriveCount; j++ {
endpoint := endpoints[i*setDriveCount+j]
endpoint := endpoints.Endpoints[i*setDriveCount+j]
// Only add one locker per endpoint and per erasure set.
if locker, ok := erasureLockers[endpoint.Host]; ok && !lockerEpSet.Contains(endpoint.Host) {
lockerEpSet.Add(endpoint.Host)
@ -1213,7 +1213,7 @@ func markRootDisksAsDown(storageDisks []StorageAPI, errs []error) {
// HealFormat - heals missing `format.json` on fresh unformatted disks.
func (s *erasureSets) HealFormat(ctx context.Context, dryRun bool) (res madmin.HealResultItem, err error) {
storageDisks, _ := initStorageDisksWithErrorsWithoutHealthCheck(s.endpoints)
storageDisks, _ := initStorageDisksWithErrorsWithoutHealthCheck(s.endpoints.Endpoints)
defer func(storageDisks []StorageAPI) {
if err != nil {
@ -1243,7 +1243,7 @@ func (s *erasureSets) HealFormat(ctx context.Context, dryRun bool) (res madmin.H
}
// Fetch all the drive info status.
beforeDrives := formatsToDrivesInfo(s.endpoints, formats, sErrs)
beforeDrives := formatsToDrivesInfo(s.endpoints.Endpoints, formats, sErrs)
res.After.Drives = make([]madmin.HealDriveInfo, len(beforeDrives))
res.Before.Drives = make([]madmin.HealDriveInfo, len(beforeDrives))

View File

@ -190,7 +190,8 @@ func TestNewErasureSets(t *testing.T) {
t.Fatalf("Unable to format disks for erasure, %s", err)
}
if _, err := newErasureSets(ctx, endpoints, storageDisks, format, ecDrivesNoConfig(16), 0); err != nil {
ep := PoolEndpoints{Endpoints: endpoints}
if _, err := newErasureSets(ctx, ep, storageDisks, format, ecDrivesNoConfig(16), 0); err != nil {
t.Fatalf("Unable to initialize erasure")
}
}

View File

@ -23,7 +23,6 @@ import (
// GetTotalCapacity gets the total capacity in the cluster.
func GetTotalCapacity(diskInfo []madmin.Disk) (capacity uint64) {
for _, disk := range diskInfo {
capacity += disk.TotalSpace
}

View File

@ -616,6 +616,26 @@ func (sys *NotificationSys) GetClusterBucketStats(ctx context.Context, bucketNam
return bucketStats
}
// ReloadPoolMeta notifies all peers to reload pool metadata from its on-disk copy
func (sys *NotificationSys) ReloadPoolMeta(ctx context.Context) {
ng := WithNPeers(len(sys.peerClients))
for idx, client := range sys.peerClients {
if client == nil {
continue
}
client := client
ng.Go(ctx, func() error {
return client.ReloadPoolMeta(ctx)
}, idx, *client.host)
}
for _, nErr := range ng.Wait() {
reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", nErr.Host.String())
if nErr.Err != nil {
logger.LogIf(logger.SetReqInfo(ctx, reqInfo), nErr.Err)
}
}
}
// LoadTransitionTierConfig notifies remote peers to load their remote tier
// configs from config store.
func (sys *NotificationSys) LoadTransitionTierConfig(ctx context.Context) {

View File

@ -46,6 +46,16 @@ const (
// StorageInfo - represents total capacity of underlying storage.
type StorageInfo = madmin.StorageInfo
// TotalUsableCapacity - total usable capacity
func TotalUsableCapacity(s StorageInfo) float64 {
return GetTotalUsableCapacity(s.Disks, s)
}
// TotalUsableCapacityFree - total usable capacity free
func TotalUsableCapacityFree(s StorageInfo) float64 {
return GetTotalUsableCapacityFree(s.Disks, s)
}
// objectHistogramInterval is an interval that will be
// used to report the histogram of objects data sizes
type objectHistogramInterval struct {

View File

@ -71,6 +71,9 @@ type ObjectOptions struct {
// Use the maximum parity (N/2), used when saving server configuration files
MaxParity bool
// Mutate is set to 'true' if the call is a namespace mutation call
Mutate bool
}
// ExpirationOptions represents object options for object expiration at objectLayer.

View File

@ -752,6 +752,16 @@ func (client *peerRESTClient) UpdateMetacacheListing(ctx context.Context, m meta
}
func (client *peerRESTClient) ReloadPoolMeta(ctx context.Context) error {
respBody, err := client.callWithContext(ctx, peerRESTMethodReloadPoolMeta, nil, nil, 0)
if err != nil {
logger.LogIf(ctx, err)
return err
}
defer http.DrainBody(respBody)
return nil
}
func (client *peerRESTClient) LoadTransitionTierConfig(ctx context.Context) error {
respBody, err := client.callWithContext(ctx, peerRESTMethodLoadTransitionTierConfig, nil, nil, 0)
if err != nil {

View File

@ -18,7 +18,7 @@
package cmd
const (
peerRESTVersion = "v15" // Add LoadTransitionTierConfig
peerRESTVersion = "v16" // Add LoadPoolMeta
peerRESTVersionPrefix = SlashSeparator + peerRESTVersion
peerRESTPrefix = minioReservedBucketPath + "/peer"
peerRESTPath = peerRESTPrefix + peerRESTVersionPrefix
@ -67,6 +67,7 @@ const (
peerRESTMethodLoadTransitionTierConfig = "/loadtransitiontierconfig"
peerRESTMethodSpeedtest = "/speedtest"
peerRESTMethodReloadSiteReplicationConfig = "/reloadsitereplicationconfig"
peerRESTMethodReloadPoolMeta = "/reloadpoolmeta"
)
const (

View File

@ -1025,6 +1025,27 @@ func (s *peerRESTServer) BackgroundHealStatusHandler(w http.ResponseWriter, r *h
logger.LogIf(ctx, gob.NewEncoder(w).Encode(state))
}
func (s *peerRESTServer) ReloadPoolMetaHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
s.writeErrorResponse(w, errors.New("invalid request"))
return
}
objAPI := newObjectLayerFn()
if objAPI == nil {
s.writeErrorResponse(w, errServerNotInitialized)
return
}
pools, ok := objAPI.(*erasureServerPools)
if !ok {
return
}
if err := pools.ReloadPoolMeta(r.Context()); err != nil {
s.writeErrorResponse(w, err)
return
}
}
func (s *peerRESTServer) LoadTransitionTierConfigHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
s.writeErrorResponse(w, errors.New("invalid request"))
@ -1360,4 +1381,5 @@ func registerPeerRESTHandlers(router *mux.Router) {
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodLoadTransitionTierConfig).HandlerFunc(httpTraceHdrs(server.LoadTransitionTierConfigHandler))
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodSpeedtest).HandlerFunc(httpTraceHdrs(server.SpeedtestHandler))
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodReloadSiteReplicationConfig).HandlerFunc(httpTraceHdrs(server.ReloadSiteReplicationConfigHandler))
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodReloadPoolMeta).HandlerFunc(httpTraceHdrs(server.ReloadPoolMetaHandler))
}

View File

@ -72,11 +72,12 @@ type FilesInfo struct {
IsTruncated bool
}
// FilesInfoVersions represents a list of file versions,
// additionally indicates if the list is last.
type FilesInfoVersions struct {
FilesVersions []FileInfoVersions
IsTruncated bool
// Size returns the total size of all versions for the object 'Name'
func (f FileInfoVersions) Size() (size int64) {
for _, v := range f.Versions {
size += v.Size
}
return size
}
// FileInfoVersions represent a list of versions for a given file.
@ -200,7 +201,9 @@ func (fi *FileInfo) SetInlineData() {
}
// VersionPurgeStatusKey denotes purge status in metadata
const VersionPurgeStatusKey = "purgestatus"
const (
VersionPurgeStatusKey = ReservedMetadataPrefixLower + "purgestatus"
)
// newFileInfo - initializes new FileInfo, allocates a fresh erasure info.
func newFileInfo(object string, dataBlocks, parityBlocks int) (fi FileInfo) {

View File

@ -1454,178 +1454,6 @@ func (z *FilesInfo) Msgsize() (s int) {
return
}
// DecodeMsg implements msgp.Decodable
func (z *FilesInfoVersions) DecodeMsg(dc *msgp.Reader) (err error) {
var field []byte
_ = field
var zb0001 uint32
zb0001, err = dc.ReadMapHeader()
if err != nil {
err = msgp.WrapError(err)
return
}
for zb0001 > 0 {
zb0001--
field, err = dc.ReadMapKeyPtr()
if err != nil {
err = msgp.WrapError(err)
return
}
switch msgp.UnsafeString(field) {
case "FilesVersions":
var zb0002 uint32
zb0002, err = dc.ReadArrayHeader()
if err != nil {
err = msgp.WrapError(err, "FilesVersions")
return
}
if cap(z.FilesVersions) >= int(zb0002) {
z.FilesVersions = (z.FilesVersions)[:zb0002]
} else {
z.FilesVersions = make([]FileInfoVersions, zb0002)
}
for za0001 := range z.FilesVersions {
err = z.FilesVersions[za0001].DecodeMsg(dc)
if err != nil {
err = msgp.WrapError(err, "FilesVersions", za0001)
return
}
}
case "IsTruncated":
z.IsTruncated, err = dc.ReadBool()
if err != nil {
err = msgp.WrapError(err, "IsTruncated")
return
}
default:
err = dc.Skip()
if err != nil {
err = msgp.WrapError(err)
return
}
}
}
return
}
// EncodeMsg implements msgp.Encodable
func (z *FilesInfoVersions) EncodeMsg(en *msgp.Writer) (err error) {
// map header, size 2
// write "FilesVersions"
err = en.Append(0x82, 0xad, 0x46, 0x69, 0x6c, 0x65, 0x73, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x73)
if err != nil {
return
}
err = en.WriteArrayHeader(uint32(len(z.FilesVersions)))
if err != nil {
err = msgp.WrapError(err, "FilesVersions")
return
}
for za0001 := range z.FilesVersions {
err = z.FilesVersions[za0001].EncodeMsg(en)
if err != nil {
err = msgp.WrapError(err, "FilesVersions", za0001)
return
}
}
// write "IsTruncated"
err = en.Append(0xab, 0x49, 0x73, 0x54, 0x72, 0x75, 0x6e, 0x63, 0x61, 0x74, 0x65, 0x64)
if err != nil {
return
}
err = en.WriteBool(z.IsTruncated)
if err != nil {
err = msgp.WrapError(err, "IsTruncated")
return
}
return
}
// MarshalMsg implements msgp.Marshaler
func (z *FilesInfoVersions) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.Require(b, z.Msgsize())
// map header, size 2
// string "FilesVersions"
o = append(o, 0x82, 0xad, 0x46, 0x69, 0x6c, 0x65, 0x73, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x73)
o = msgp.AppendArrayHeader(o, uint32(len(z.FilesVersions)))
for za0001 := range z.FilesVersions {
o, err = z.FilesVersions[za0001].MarshalMsg(o)
if err != nil {
err = msgp.WrapError(err, "FilesVersions", za0001)
return
}
}
// string "IsTruncated"
o = append(o, 0xab, 0x49, 0x73, 0x54, 0x72, 0x75, 0x6e, 0x63, 0x61, 0x74, 0x65, 0x64)
o = msgp.AppendBool(o, z.IsTruncated)
return
}
// UnmarshalMsg implements msgp.Unmarshaler
func (z *FilesInfoVersions) UnmarshalMsg(bts []byte) (o []byte, err error) {
var field []byte
_ = field
var zb0001 uint32
zb0001, bts, err = msgp.ReadMapHeaderBytes(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
for zb0001 > 0 {
zb0001--
field, bts, err = msgp.ReadMapKeyZC(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
switch msgp.UnsafeString(field) {
case "FilesVersions":
var zb0002 uint32
zb0002, bts, err = msgp.ReadArrayHeaderBytes(bts)
if err != nil {
err = msgp.WrapError(err, "FilesVersions")
return
}
if cap(z.FilesVersions) >= int(zb0002) {
z.FilesVersions = (z.FilesVersions)[:zb0002]
} else {
z.FilesVersions = make([]FileInfoVersions, zb0002)
}
for za0001 := range z.FilesVersions {
bts, err = z.FilesVersions[za0001].UnmarshalMsg(bts)
if err != nil {
err = msgp.WrapError(err, "FilesVersions", za0001)
return
}
}
case "IsTruncated":
z.IsTruncated, bts, err = msgp.ReadBoolBytes(bts)
if err != nil {
err = msgp.WrapError(err, "IsTruncated")
return
}
default:
bts, err = msgp.Skip(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
}
}
o = bts
return
}
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z *FilesInfoVersions) Msgsize() (s int) {
s = 1 + 14 + msgp.ArrayHeaderSize
for za0001 := range z.FilesVersions {
s += z.FilesVersions[za0001].Msgsize()
}
s += 12 + msgp.BoolSize
return
}
// DecodeMsg implements msgp.Decodable
func (z *VolInfo) DecodeMsg(dc *msgp.Reader) (err error) {
var zb0001 uint32

View File

@ -574,119 +574,6 @@ func BenchmarkDecodeFilesInfo(b *testing.B) {
}
}
func TestMarshalUnmarshalFilesInfoVersions(t *testing.T) {
v := FilesInfoVersions{}
bts, err := v.MarshalMsg(nil)
if err != nil {
t.Fatal(err)
}
left, err := v.UnmarshalMsg(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left)
}
left, err = msgp.Skip(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after Skip(): %q", len(left), left)
}
}
func BenchmarkMarshalMsgFilesInfoVersions(b *testing.B) {
v := FilesInfoVersions{}
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
v.MarshalMsg(nil)
}
}
func BenchmarkAppendMsgFilesInfoVersions(b *testing.B) {
v := FilesInfoVersions{}
bts := make([]byte, 0, v.Msgsize())
bts, _ = v.MarshalMsg(bts[0:0])
b.SetBytes(int64(len(bts)))
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
bts, _ = v.MarshalMsg(bts[0:0])
}
}
func BenchmarkUnmarshalFilesInfoVersions(b *testing.B) {
v := FilesInfoVersions{}
bts, _ := v.MarshalMsg(nil)
b.ReportAllocs()
b.SetBytes(int64(len(bts)))
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := v.UnmarshalMsg(bts)
if err != nil {
b.Fatal(err)
}
}
}
func TestEncodeDecodeFilesInfoVersions(t *testing.T) {
v := FilesInfoVersions{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
m := v.Msgsize()
if buf.Len() > m {
t.Log("WARNING: TestEncodeDecodeFilesInfoVersions Msgsize() is inaccurate")
}
vn := FilesInfoVersions{}
err := msgp.Decode(&buf, &vn)
if err != nil {
t.Error(err)
}
buf.Reset()
msgp.Encode(&buf, &v)
err = msgp.NewReader(&buf).Skip()
if err != nil {
t.Error(err)
}
}
func BenchmarkEncodeFilesInfoVersions(b *testing.B) {
v := FilesInfoVersions{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
b.SetBytes(int64(buf.Len()))
en := msgp.NewWriter(msgp.Nowhere)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
v.EncodeMsg(en)
}
en.Flush()
}
func BenchmarkDecodeFilesInfoVersions(b *testing.B) {
v := FilesInfoVersions{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
b.SetBytes(int64(buf.Len()))
rd := msgp.NewEndlessReader(buf.Bytes(), b)
dc := msgp.NewReader(rd)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
err := v.DecodeMsg(dc)
if err != nil {
b.Fatal(err)
}
}
}
func TestMarshalUnmarshalVolInfo(t *testing.T) {
v := VolInfo{}
bts, err := v.MarshalMsg(nil)

View File

@ -38,6 +38,12 @@ func (v versionsSorter) sort() {
})
}
func (v versionsSorter) reverse() {
sort.Slice(v, func(i, j int) bool {
return v[i].ModTime.Before(v[j].ModTime)
})
}
func getFileInfoVersions(xlMetaBuf []byte, volume, path string) (FileInfoVersions, error) {
fivs, err := getAllFileInfoVersions(xlMetaBuf, volume, path)
if err != nil {

View File

@ -966,6 +966,7 @@ func (j xlMetaV2DeleteMarker) ToFileInfo(volume, path string) (FileInfo, error)
VersionID: versionID,
Deleted: true,
}
fi.ReplicationState = GetInternalReplicationState(j.MetaSys)
if j.FreeVersion() {

45
docs/distributed/POOLS.md Normal file
View File

@ -0,0 +1,45 @@
## Decommissioning
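For context, decommissioning always operates on a server pool that was defined on the `minio server` command line. A two-pool deployment such as the one used in the examples below might be started as follows (hostnames and drive paths are illustrative):
```
minio server http://minio{1...2}/data{1...4} http://minio{3...4}/data{1...4}
```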
### How to decommission a pool?
```
$ mc admin decommission start alias/ http://minio{1...2}/data{1...4}
```
### Status of decommissioning a pool
#### Status without arguments lists all pools
```
$ mc admin decommission status alias/
| ID | Pools | Capacity | Status |
|----|---------------------------------|----------------------|----------|
| 0 | http://minio{1...2}/data{1...4} | N (used) / M (total) | Active |
| 1 | http://minio{3...4}/data{1...4} | N (used) / M (total) | Draining |
```
#### Decommissioning status
```
$ mc admin decommission status alias/ http://minio{1...2}/data{1...4}
Progress: ===================> [1GiB/sec] [15%] [4TiB/50TiB]
Time Remaining: 4 hours (started 3 hours ago)
```
#### A pool not under decommissioning returns an error
```
$ mc admin decommission status alias/ http://minio{1...2}/data{1...4}
ERROR: This pool is not scheduled for decommissioning currently.
```
### Canceling decommissioning of a pool (to make it active again)
```
$ mc admin decommission cancel alias/
| ID | Pools | Capacity | Status |
|----|---------------------------------|----------------------|----------|
| 0 | http://minio{1...2}/data{1...4} | N (used) / M (total) | Draining |
```
```
$ mc admin decommission cancel alias/ http://minio{1...2}/data{1...4}
| ID | Pools | Capacity | Status |
|----|---------------------------------|----------------------|--------|
| 0 | http://minio{1...2}/data{1...4} | N (used) / M (total) | Active |
```
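Once a pool reports as fully drained, the expected follow-up (sketched here, not enforced by the commands above) is to restart the cluster with only the remaining pools on the `minio server` command line. For example, if `http://minio{1...2}/data{1...4}` was the drained pool:
```
minio server http://minio{3...4}/data{1...4}
```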