minio/cmd/xl-v1.go
Harshavardhana 0cd0f6c255
Avoid error modification during IAM migration (#8156)
The underlying errors are important, for IAM
requirements and should wait appropriately at
the caller level, this allows for distributed
setups to run properly and not fail prematurely
during startup.

Also additionally fix the onlineDisk counting
2019-08-30 10:41:02 -07:00

170 lines
4.6 KiB
Go

/*
* MinIO Cloud Storage, (C) 2016, 2017, 2018 MinIO, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import (
"context"
"sort"
"sync"
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/bpool"
)
// XL constants.
const (
// XL metadata file carries per object metadata.
xlMetaJSONFile = "xl.json"
)
// OfflineDisk represents an unavailable disk.
var OfflineDisk StorageAPI // zero value is nil
// xlObjects - Implements XL object layer.
type xlObjects struct {
// name space mutex for object layer.
nsMutex *nsLockMap
// getDisks returns list of storageAPIs.
getDisks func() []StorageAPI
// Byte pools used for temporary i/o buffers.
bp *bpool.BytePoolCap
// TODO: Deprecated only kept here for tests, should be removed in future.
storageDisks []StorageAPI
// TODO: ListObjects pool management, should be removed in future.
listPool *TreeWalkPool
}
// Shutdown function for object storage interface.
func (xl xlObjects) Shutdown(ctx context.Context) error {
// Add any object layer shutdown activities here.
closeStorageDisks(xl.getDisks())
return nil
}
// byDiskTotal is a collection satisfying sort.Interface.
type byDiskTotal []DiskInfo
func (d byDiskTotal) Len() int { return len(d) }
func (d byDiskTotal) Swap(i, j int) { d[i], d[j] = d[j], d[i] }
func (d byDiskTotal) Less(i, j int) bool {
return d[i].Total < d[j].Total
}
// getDisksInfo - fetch disks info across all other storage API.
func getDisksInfo(disks []StorageAPI) (disksInfo []DiskInfo, onlineDisks int, offlineDisks int) {
disksInfo = make([]DiskInfo, len(disks))
errs := make([]error, len(disks))
var wg sync.WaitGroup
for i, storageDisk := range disks {
if storageDisk == nil {
// Storage disk is empty, perhaps ignored disk or not available.
errs[i] = errDiskNotFound
continue
}
wg.Add(1)
go func(id int, sDisk StorageAPI) {
defer wg.Done()
info, err := sDisk.DiskInfo()
if err != nil {
reqInfo := (&logger.ReqInfo{}).AppendTags("disk", sDisk.String())
ctx := logger.SetReqInfo(context.Background(), reqInfo)
logger.LogIf(ctx, err)
if IsErr(err, baseErrs...) {
errs[id] = err
return
}
}
disksInfo[id] = info
}(i, storageDisk)
}
// Wait for the routines.
wg.Wait()
for _, err := range errs {
if err != nil {
offlineDisks++
continue
}
onlineDisks++
}
// Success.
return disksInfo, onlineDisks, offlineDisks
}
// returns sorted disksInfo slice which has only valid entries.
// i.e the entries where the total size of the disk is not stated
// as 0Bytes, this means that the disk is not online or ignored.
func sortValidDisksInfo(disksInfo []DiskInfo) []DiskInfo {
var validDisksInfo []DiskInfo
for _, diskInfo := range disksInfo {
if diskInfo.Total == 0 {
continue
}
validDisksInfo = append(validDisksInfo, diskInfo)
}
sort.Sort(byDiskTotal(validDisksInfo))
return validDisksInfo
}
// Get an aggregated storage info across all disks.
func getStorageInfo(disks []StorageAPI) StorageInfo {
disksInfo, onlineDisks, offlineDisks := getDisksInfo(disks)
// Sort so that the first element is the smallest.
validDisksInfo := sortValidDisksInfo(disksInfo)
// If there are no valid disks, set total and free disks to 0
if len(validDisksInfo) == 0 {
return StorageInfo{}
}
// Combine all disks to get total usage
var used, total, available uint64
for _, di := range validDisksInfo {
used = used + di.Used
total = total + di.Total
available = available + di.Free
}
_, sscParity := getRedundancyCount(standardStorageClass, len(disks))
_, rrscparity := getRedundancyCount(reducedRedundancyStorageClass, len(disks))
storageInfo := StorageInfo{
Used: used,
Total: total,
Available: available,
}
storageInfo.Backend.Type = BackendErasure
storageInfo.Backend.OnlineDisks = onlineDisks
storageInfo.Backend.OfflineDisks = offlineDisks
storageInfo.Backend.StandardSCParity = sscParity
storageInfo.Backend.RRSCParity = rrscparity
return storageInfo
}
// StorageInfo - returns underlying storage statistics.
func (xl xlObjects) StorageInfo(ctx context.Context) StorageInfo {
return getStorageInfo(xl.getDisks())
}