minio/cmd/erasure.go
Harshavardhana a20d4568a2
fix: make sure to use uniform drive count calculation (#10208)
It is possible in situations when server was deployed
in asymmetric configuration in the past such as

```
minio server ~/fs{1...4}/disk{1...5}
```

Results in setDriveCount of 10 in older releases
but with fairly recent releases we have moved to
having server affinity which means that a set drive
count ascertained from above config will be now '4'

While the object layer make sure that we honor
`format.json` the storageClass configuration however
was by mistake was using the global value obtained
by heuristics. Which leads to prematurely using
lower parity without being requested by the an
administrator.

This PR fixes this behavior.
2020-08-05 13:31:12 -07:00

405 lines
11 KiB
Go

/*
* MinIO Cloud Storage, (C) 2016-2020 MinIO, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import (
"context"
"errors"
"fmt"
"sort"
"sync"
"time"
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/bpool"
"github.com/minio/minio/pkg/color"
"github.com/minio/minio/pkg/dsync"
"github.com/minio/minio/pkg/madmin"
"github.com/minio/minio/pkg/sync/errgroup"
)
// OfflineDisk represents an unavailable disk.
var OfflineDisk StorageAPI // zero value is nil
// partialOperation is a successful upload/delete of an object
// but not written in all disks (having quorum)
type partialOperation struct {
bucket string
object string
versionID string
failedSet int
}
// erasureObjects - Implements ER object layer.
type erasureObjects struct {
GatewayUnsupported
// getDisks returns list of storageAPIs.
getDisks func() []StorageAPI
// getLockers returns list of remote and local lockers.
getLockers func() []dsync.NetLocker
// getEndpoints returns list of endpoint strings belonging this set.
// some may be local and some remote.
getEndpoints func() []string
// Locker mutex map.
nsMutex *nsLockMap
// Byte pools used for temporary i/o buffers.
bp *bpool.BytePoolCap
mrfOpCh chan partialOperation
}
// NewNSLock - initialize a new namespace RWLocker instance.
func (er erasureObjects) NewNSLock(ctx context.Context, bucket string, objects ...string) RWLocker {
return er.nsMutex.NewNSLock(ctx, er.getLockers, bucket, objects...)
}
// SetDriveCount returns the current drives per set.
func (er erasureObjects) SetDriveCount() int {
return len(er.getDisks())
}
// Shutdown function for object storage interface.
func (er erasureObjects) Shutdown(ctx context.Context) error {
// Add any object layer shutdown activities here.
closeStorageDisks(er.getDisks())
return nil
}
// byDiskTotal is a collection satisfying sort.Interface.
type byDiskTotal []madmin.Disk
func (d byDiskTotal) Len() int { return len(d) }
func (d byDiskTotal) Swap(i, j int) { d[i], d[j] = d[j], d[i] }
func (d byDiskTotal) Less(i, j int) bool {
return d[i].TotalSpace < d[j].TotalSpace
}
func diskErrToDriveState(err error) (state string) {
state = madmin.DriveStateUnknown
switch {
case errors.Is(err, errDiskNotFound):
state = madmin.DriveStateOffline
case errors.Is(err, errCorruptedFormat):
state = madmin.DriveStateCorrupt
case errors.Is(err, errUnformattedDisk):
state = madmin.DriveStateUnformatted
case errors.Is(err, errDiskAccessDenied):
state = madmin.DriveStatePermission
case errors.Is(err, errFaultyDisk):
state = madmin.DriveStateFaulty
case err == nil:
state = madmin.DriveStateOk
}
return
}
// getDisksInfo - fetch disks info across all other storage API.
func getDisksInfo(disks []StorageAPI, endpoints []string) (disksInfo []madmin.Disk, errs []error, onlineDisks, offlineDisks madmin.BackendDisks) {
disksInfo = make([]madmin.Disk, len(disks))
onlineDisks = make(madmin.BackendDisks)
offlineDisks = make(madmin.BackendDisks)
for _, ep := range endpoints {
if _, ok := offlineDisks[ep]; !ok {
offlineDisks[ep] = 0
}
if _, ok := onlineDisks[ep]; !ok {
onlineDisks[ep] = 0
}
}
g := errgroup.WithNErrs(len(disks))
for index := range disks {
index := index
g.Go(func() error {
if disks[index] == OfflineDisk {
disksInfo[index] = madmin.Disk{
State: diskErrToDriveState(errDiskNotFound),
Endpoint: endpoints[index],
}
// Storage disk is empty, perhaps ignored disk or not available.
return errDiskNotFound
}
info, err := disks[index].DiskInfo()
if err != nil {
if !IsErr(err, baseErrs...) {
reqInfo := (&logger.ReqInfo{}).AppendTags("disk", disks[index].String())
ctx := logger.SetReqInfo(GlobalContext, reqInfo)
logger.LogIf(ctx, err)
}
}
di := madmin.Disk{
Endpoint: endpoints[index],
DrivePath: info.MountPath,
TotalSpace: info.Total,
UsedSpace: info.Used,
AvailableSpace: info.Free,
UUID: info.ID,
State: diskErrToDriveState(err),
}
if info.Total > 0 {
di.Utilization = float64(info.Used / info.Total * 100)
}
disksInfo[index] = di
return err
}, index)
}
errs = g.Wait()
// Wait for the routines.
for i, diskInfoErr := range errs {
ep := endpoints[i]
if diskInfoErr != nil {
offlineDisks[ep]++
continue
}
onlineDisks[ep]++
}
// Success.
return disksInfo, errs, onlineDisks, offlineDisks
}
// Get an aggregated storage info across all disks.
func getStorageInfo(disks []StorageAPI, endpoints []string) (StorageInfo, []error) {
disksInfo, errs, onlineDisks, offlineDisks := getDisksInfo(disks, endpoints)
// Sort so that the first element is the smallest.
sort.Sort(byDiskTotal(disksInfo))
storageInfo := StorageInfo{
Disks: disksInfo,
}
storageInfo.Backend.Type = BackendErasure
storageInfo.Backend.OnlineDisks = onlineDisks
storageInfo.Backend.OfflineDisks = offlineDisks
return storageInfo, errs
}
// StorageInfo - returns underlying storage statistics.
func (er erasureObjects) StorageInfo(ctx context.Context, local bool) (StorageInfo, []error) {
disks := er.getDisks()
endpoints := er.getEndpoints()
if local {
var localDisks []StorageAPI
var localEndpoints []string
for i, disk := range disks {
if disk != nil {
if disk.IsLocal() {
// Append this local disk since local flag is true
localDisks = append(localDisks, disk)
localEndpoints = append(localEndpoints, endpoints[i])
}
}
}
disks = localDisks
endpoints = localEndpoints
}
return getStorageInfo(disks, endpoints)
}
// GetMetrics - is not implemented and shouldn't be called.
func (er erasureObjects) GetMetrics(ctx context.Context) (*Metrics, error) {
logger.LogIf(ctx, NotImplemented{})
return &Metrics{}, NotImplemented{}
}
// CrawlAndGetDataUsage collects usage from all buckets.
// updates are sent as different parts of the underlying
// structure has been traversed.
func (er erasureObjects) CrawlAndGetDataUsage(ctx context.Context, bf *bloomFilter, updates chan<- DataUsageInfo) error {
return NotImplemented{API: "CrawlAndGetDataUsage"}
}
// CrawlAndGetDataUsage will start crawling buckets and send updated totals as they are traversed.
// Updates are sent on a regular basis and the caller *must* consume them.
func (er erasureObjects) crawlAndGetDataUsage(ctx context.Context, buckets []BucketInfo, bf *bloomFilter, updates chan<- dataUsageCache) error {
var disks []StorageAPI
for _, d := range er.getLoadBalancedDisks() {
if d == nil || !d.IsOnline() {
continue
}
disks = append(disks, d)
}
if len(disks) == 0 || len(buckets) == 0 {
return nil
}
// Load bucket totals
oldCache := dataUsageCache{}
err := oldCache.load(ctx, er, dataUsageCacheName)
if err != nil {
return err
}
// New cache..
cache := dataUsageCache{
Info: dataUsageCacheInfo{
Name: dataUsageRoot,
NextCycle: oldCache.Info.NextCycle,
},
Cache: make(map[string]dataUsageEntry, len(oldCache.Cache)),
}
bloom := bf.bytes()
// Put all buckets into channel.
bucketCh := make(chan BucketInfo, len(buckets))
// Add new buckets first
for _, b := range buckets {
if oldCache.find(b.Name) == nil {
bucketCh <- b
}
}
// Add existing buckets if changes or lifecycles.
for _, b := range buckets {
e := oldCache.find(b.Name)
if e != nil {
cache.replace(b.Name, dataUsageRoot, *e)
lc, err := globalLifecycleSys.Get(b.Name)
activeLC := err == nil && lc.HasActiveRules("", true)
if activeLC || bf == nil || bf.containsDir(b.Name) {
bucketCh <- b
} else {
if intDataUpdateTracker.debug {
logger.Info(color.Green("crawlAndGetDataUsage:")+" Skipping bucket %v, not updated", b.Name)
}
}
}
}
close(bucketCh)
bucketResults := make(chan dataUsageEntryInfo, len(disks))
// Start async collector/saver.
// This goroutine owns the cache.
var saverWg sync.WaitGroup
saverWg.Add(1)
go func() {
const updateTime = 30 * time.Second
t := time.NewTicker(updateTime)
defer t.Stop()
defer saverWg.Done()
var lastSave time.Time
saveLoop:
for {
select {
case <-ctx.Done():
// Return without saving.
return
case <-t.C:
if cache.Info.LastUpdate.Equal(lastSave) {
continue
}
logger.LogIf(ctx, cache.save(ctx, er, dataUsageCacheName))
updates <- cache.clone()
lastSave = cache.Info.LastUpdate
case v, ok := <-bucketResults:
if !ok {
break saveLoop
}
cache.replace(v.Name, v.Parent, v.Entry)
cache.Info.LastUpdate = time.Now()
}
}
// Save final state...
cache.Info.NextCycle++
cache.Info.LastUpdate = time.Now()
logger.LogIf(ctx, cache.save(ctx, er, dataUsageCacheName))
updates <- cache
}()
// Start one crawler per disk
var wg sync.WaitGroup
wg.Add(len(disks))
for i := range disks {
go func(i int) {
defer wg.Done()
disk := disks[i]
for bucket := range bucketCh {
select {
case <-ctx.Done():
return
default:
}
// Load cache for bucket
cacheName := pathJoin(bucket.Name, dataUsageCacheName)
cache := dataUsageCache{}
logger.LogIf(ctx, cache.load(ctx, er, cacheName))
if cache.Info.Name == "" {
cache.Info.Name = bucket.Name
}
cache.Info.BloomFilter = bloom
if cache.Info.Name != bucket.Name {
logger.LogIf(ctx, fmt.Errorf("cache name mismatch: %s != %s", cache.Info.Name, bucket.Name))
cache.Info = dataUsageCacheInfo{
Name: bucket.Name,
LastUpdate: time.Time{},
NextCycle: 0,
}
}
// Calc usage
before := cache.Info.LastUpdate
cache, err = disk.CrawlAndGetDataUsage(ctx, cache)
cache.Info.BloomFilter = nil
if err != nil {
logger.LogIf(ctx, err)
if cache.Info.LastUpdate.After(before) {
logger.LogIf(ctx, cache.save(ctx, er, cacheName))
}
continue
}
var root dataUsageEntry
if r := cache.root(); r != nil {
root = cache.flatten(*r)
}
bucketResults <- dataUsageEntryInfo{
Name: cache.Info.Name,
Parent: dataUsageRoot,
Entry: root,
}
// Save cache
logger.LogIf(ctx, cache.save(ctx, er, cacheName))
}
}(i)
}
wg.Wait()
close(bucketResults)
saverWg.Wait()
return nil
}
// Health shouldn't be called directly - will panic
func (er erasureObjects) Health(ctx context.Context, _ HealthOptions) HealthResult {
logger.CriticalIf(ctx, NotImplemented{})
return HealthResult{}
}